LSSTApplications  11.0-13-gbb96280,12.1.rc1,12.1.rc1+1,12.1.rc1+2,12.1.rc1+5,12.1.rc1+8,12.1.rc1-1-g06d7636+1,12.1.rc1-1-g253890b+5,12.1.rc1-1-g3d31b68+7,12.1.rc1-1-g3db6b75+1,12.1.rc1-1-g5c1385a+3,12.1.rc1-1-g83b2247,12.1.rc1-1-g90cb4cf+6,12.1.rc1-1-g91da24b+3,12.1.rc1-2-g3521f8a,12.1.rc1-2-g39433dd+4,12.1.rc1-2-g486411b+2,12.1.rc1-2-g4c2be76,12.1.rc1-2-gc9c0491,12.1.rc1-2-gda2cd4f+6,12.1.rc1-3-g3391c73+2,12.1.rc1-3-g8c1bd6c+1,12.1.rc1-3-gcf4b6cb+2,12.1.rc1-4-g057223e+1,12.1.rc1-4-g19ed13b+2,12.1.rc1-4-g30492a7
LSSTDataManagementBasePackage
ingestIndexReferenceTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 # Copyright 2008-2016 AURA/LSST.
4 #
5 # This product includes software developed by the
6 # LSST Project (http://www.lsst.org/).
7 #
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the LSST License Statement and
19 # the GNU General Public License along with this program. If not,
20 # see <https://www.lsstcorp.org/LegalNotices/>.
21 #
22 from __future__ import absolute_import, division, print_function
23 import numpy as np
24 
25 import lsst.pex.config as pexConfig
26 import lsst.pipe.base as pipeBase
27 import lsst.afw.table as afwTable
28 import lsst.afw.coord as afwCoord
29 import lsst.afw.geom as afwGeom
30 from lsst.afw.image import fluxFromABMag, fluxErrFromABMagErr
31 from .htmIndexer import HtmIndexer as Indexer
32 from .readTextCatalogTask import ReadTextCatalogTask
33 
34 __all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask"]
35 
36 
class IngestReferenceRunner(pipeBase.TaskRunner):
    """!Task runner for the reference catalog ingester

    Data IDs are ignored so the runner should just run the task on the parsed command.
    """

    def run(self, parsedCmd):
        """!Run the task.
        Several arguments need to be collected to send on to the task methods.

        @param[in] parsedCmd Parsed command including command line arguments.
        @param[out] Struct containing the result of the indexing.
        """
        butler = parsedCmd.butler
        task = self.TaskClass(config=self.config, log=self.log, butler=butler)
        task.writeConfig(butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        result = task.create_indexed_catalog(parsedCmd.files)
        if not self.doReturnResults:
            return None
        return pipeBase.Struct(
            result=result,
        )
60 
61 
class IngestIndexedReferenceConfig(pexConfig.Config):
    """!Configuration for IngestIndexedReferenceTask."""

    ref_dataset_name = pexConfig.Field(
        doc="String to pass to the butler to retrieve persisted files.",
        dtype=str,
        default="cal_ref_cat",
    )
    level = pexConfig.Field(
        doc="Default HTM level. Level 8 gives ~0.08 sq deg per trixel.",
        dtype=int,
        default=8,
    )
    file_reader = pexConfig.ConfigurableField(
        doc="Task to use to read the files. Default is to expect text files.",
        target=ReadTextCatalogTask,
    )
    ra_name = pexConfig.Field(
        doc="Name of RA column",
        dtype=str,
    )
    dec_name = pexConfig.Field(
        doc="Name of Dec column",
        dtype=str,
    )
    mag_column_list = pexConfig.ListField(
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information. At least one entry is required.",
        dtype=str,
    )
    mag_err_column_map = pexConfig.DictField(
        doc="A map of magnitude column name (key) to magnitude error column (value).",
        keytype=str,
        itemtype=str,
        default={},
    )
    is_photometric_name = pexConfig.Field(
        doc="Name of column stating if satisfactory for photometric calibration (optional).",
        dtype=str,
        optional=True,
    )
    is_resolved_name = pexConfig.Field(
        doc="Name of column stating if the object is resolved (optional).",
        dtype=str,
        optional=True,
    )
    is_variable_name = pexConfig.Field(
        doc="Name of column stating if the object is measured to be variable (optional).",
        dtype=str,
        optional=True,
    )
    id_name = pexConfig.Field(
        doc="Name of column to use as an identifier (optional).",
        dtype=str,
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        doc="Extra columns to add to the reference catalog.",
        dtype=str,
        default=[],
    )

    def validate(self):
        """!Check that position and photometry column settings are self-consistent."""
        pexConfig.Config.validate(self)
        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError("ra_name and dec_name and at least one entry in mag_column_list must be"
                             " supplied.")
        if len(self.mag_err_column_map) > 0 and len(self.mag_column_list) != len(self.mag_err_column_map):
            raise ValueError("If magnitude errors are provided, all magnitudes must have an error column")
129 
130 
class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """!Class for both producing indexed reference catalogs and for loading them.

    This implements an indexing scheme based on hierarchical triangular mesh (HTM).
    The term index really means breaking the catalog into localized chunks called
    shards.  In this case each shard contains the entries from the catalog in a
    single HTM trixel.
    """
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

    # Optional boolean columns; each flag has a matching is_<flag>_name config
    # field giving the name of the corresponding column in the input catalog.
    _flags = ['photometric', 'resolved', 'variable']

    @classmethod
    def _makeArgumentParser(cls):
        # NOTE(review): the ``def`` line was missing between the decorator and
        # this docstring in the extracted source; restored per the standard
        # CmdLineTask override named in the docstring below.
        """!Create an argument parser

        This overrides the original because we need the file arguments
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, **kwargs):
        """!Constructor for the HTM indexing engine

        @param[in] butler dafPersistence.Butler object for reading and writing catalogs
        """
        # The butler is passed by the runner as a keyword argument and must be
        # removed before forwarding the rest to the Task constructor.
        self.butler = kwargs.pop('butler')
        pipeBase.Task.__init__(self, *args, **kwargs)
        self.indexer = Indexer(self.config.level)
        self.makeSubtask('file_reader')

    def create_indexed_catalog(self, files):
        """!Index a set of files comprising a reference catalog.

        Outputs are persisted in the data repository.

        @param[in] files A list of file names to read.
        """
        rec_num = 0
        first = True
        for filename in files:
            arr = self.file_reader.run(filename)
            index_list = self.indexer.index_points(arr[self.config.ra_name], arr[self.config.dec_name])
            if first:
                # The schema is built from the first file's dtype and reused
                # for all subsequent files.
                schema, key_map = self.make_schema(arr.dtype)
                # persist empty catalog to hold the master schema
                dataId = self.make_data_id('master_schema')
                self.butler.put(self.get_catalog(dataId, schema), self.config.ref_dataset_name,
                                dataId=dataId)
                first = False
            pixel_ids = set(index_list)
            for pixel_id in pixel_ids:
                dataId = self.make_data_id(pixel_id)
                # Re-fetch any existing shard so rows accumulate across files.
                catalog = self.get_catalog(dataId, schema)
                els = np.where(index_list == pixel_id)
                for row in arr[els]:
                    record = catalog.addNew()
                    rec_num = self._fill_record(record, row, rec_num, key_map)
                self.butler.put(catalog, self.config.ref_dataset_name, dataId=dataId)

    @staticmethod
    def make_data_id(pixel_id):
        """!Make a data id.  Meant to be overridden.

        @param[in] pixel_id An identifier for the pixel in question.
        @param[out] dataId (dictionary)
        """
        return {'pixel_id': pixel_id}

    @staticmethod
    def compute_coord(row, ra_name, dec_name):
        """!Create an afwCoord object from a np.array row

        @param[in] row dict like object with ra/dec info in degrees
        @param[in] ra_name name of RA key
        @param[in] dec_name name of Dec key
        @param[out] IcrsCoord object constructed from the RA/Dec values
        """
        return afwCoord.IcrsCoord(row[ra_name]*afwGeom.degrees,
                                  row[dec_name]*afwGeom.degrees)

    def _set_flags(self, record, row, key_map):
        """!Set the flags for a record.  Relies on the _flags class attribute

        @param[in,out] record SourceCatalog record to modify
        @param[in] row dict like object containing flag info
        @param[in] key_map Map of catalog keys to use in filling the record
        """
        names = record.schema.getNames()
        for flag in self._flags:
            # Only flags that were added to the schema (i.e. configured) are set.
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _set_mags(self, record, row, key_map):
        """!Set the flux records from the input magnitudes

        @param[in,out] record SourceCatalog record to modify
        @param[in] row dict like object containing magnitude values
        @param[in] key_map Map of catalog keys to use in filling the record
        """
        for item in self.config.mag_column_list:
            record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
        if len(self.config.mag_err_column_map) > 0:
            # Iterate (magnitude column, error column) pairs directly rather
            # than looking each value up by key.
            for err_key, error_col_name in self.config.mag_err_column_map.items():
                record.set(key_map[err_key+'_fluxSigma'],
                           fluxErrFromABMagErr(row[error_col_name], row[err_key]))

    def _set_extra(self, record, row, key_map):
        """!Copy the extra column information to the record

        @param[in,out] record SourceCatalog record to modify
        @param[in] row dict like object containing the column values
        @param[in] key_map Map of catalog keys to use in filling the record
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string like entries,
            # numpy stores this as its own internal type, a numpy.str_
            # object.  This seems to be a consequence of how numpy stores
            # string like objects in fixed column arrays.  This checks
            # if any of the values to be added to the catalog are numpy
            # string types, and if they are, casts them to a python string
            # which is what the python c++ records expect
            if isinstance(value, np.str_):
                value = str(value)
            record.set(key_map[extra_col], value)

    def _fill_record(self, record, row, rec_num, key_map):
        """!Fill a record to put in the persisted indexed catalogs

        @param[in,out] record afwTable.SourceRecord in a reference catalog to fill.
        @param[in] row A row from a numpy array constructed from the input catalogs.
        @param[in] rec_num Starting integer to increment for the unique id
        @param[in] key_map Map of catalog keys to use in filling the record
        @param[out] The (possibly incremented) running record counter.
        """
        record.setCoord(self.compute_coord(row, self.config.ra_name, self.config.dec_name))
        if self.config.id_name:
            record.setId(row[self.config.id_name])
        else:
            # No id column configured: assign sequential ids starting at 1.
            rec_num += 1
            record.setId(rec_num)
        # No parents
        record.setParent(-1)

        self._set_flags(record, row, key_map)
        self._set_mags(record, row, key_map)
        self._set_extra(record, row, key_map)
        return rec_num

    def get_catalog(self, dataId, schema):
        """!Get a catalog from the butler or create it if it doesn't exist

        @param[in] dataId Identifier for catalog to retrieve
        @param[in] schema Schema to use in catalog creation if the butler can't get it
        @param[out] afwTable.SourceCatalog for the specified identifier
        """
        if self.butler.datasetExists(self.config.ref_dataset_name, dataId=dataId):
            return self.butler.get(self.config.ref_dataset_name, dataId=dataId)
        return afwTable.SourceCatalog(schema)

    def make_schema(self, dtype):
        """!Make the schema to use in constructing the persisted catalogs.

        @param[in] dtype A np.dtype to use in constructing the schema
        @param[out] The schema for the output source catalog.
        @param[out] A map of catalog keys to use in filling the record
        """
        key_map = {}
        mag_column_list = self.config.mag_column_list
        mag_err_column_map = self.config.mag_err_column_map
        # Stronger check than config.validate(): the error-map keys must
        # exactly match the magnitude columns, not merely match in count.
        if len(mag_err_column_map) > 0 and (
                not len(mag_column_list) == len(mag_err_column_map) or
                not sorted(mag_column_list) == sorted(mag_err_column_map.keys())):
            raise ValueError("Every magnitude column must have a corresponding error column")
        # makes a schema with a coord, id and parent_id
        schema = afwTable.SourceTable.makeMinimalSchema()

        def add_field(name):
            # Maps the numpy dtype of an extra column onto an afw schema field.
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_type = afwTable.aliases[str]
                at_size = dtype[name].itemsize
                return schema.addField(name, type=at_type, size=at_size)
            else:
                at_type = afwTable.aliases[dtype[name].type]
                return schema.addField(name, at_type)

        for item in mag_column_list:
            key_map[item+'_flux'] = schema.addField(item+'_flux', float)
        if len(mag_err_column_map) > 0:
            for err_item in mag_err_column_map.keys():
                key_map[err_item+'_fluxSigma'] = schema.addField(err_item+'_fluxSigma', float)
        for flag in self._flags:
            attr_name = 'is_{}_name'.format(flag)
            if getattr(self.config, attr_name):
                key_map[flag] = schema.addField(flag, 'Flag')
        for col in self.config.extra_col_names:
            key_map[col] = add_field(col)
        return schema, key_map
def _fill_record
Fill a record to put in the persisted indexed catalogs.
double fluxErrFromABMagErr(double magErr, double mag)
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:74
double fluxFromABMag(double mag)
Compute flux in Janskys from AB magnitude.
Definition: Calib.h:69
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
Definition: fwd.h:55
def make_schema
Make the schema to use in constructing the persisted catalogs.
def create_indexed_catalog
Index a set of files comprising a reference catalog.
def get_catalog
Get a catalog from the butler or create it if it doesn't exist.
Class for both producing indexed reference catalogs and for loading them.
A class to handle Icrs coordinates (inherits from Coord)
Definition: Coord.h:156