LSSTApplications  18.1.0
LSSTDataManagementBasePackage
ingestIndexReferenceTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 #
4 # Copyright 2008-2017 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 __all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig"]
25 
26 import math
27 
28 import astropy.time
29 import astropy.units as u
30 import numpy as np
31 
32 import lsst.pex.config as pexConfig
33 import lsst.pipe.base as pipeBase
34 import lsst.geom
35 import lsst.afw.table as afwTable
36 from lsst.daf.base import PropertyList
37 from lsst.afw.image import fluxErrFromABMagErr
38 from .indexerRegistry import IndexerRegistry
39 from .readTextCatalogTask import ReadTextCatalogTask
40 from .loadReferenceObjects import LoadReferenceObjectsTask
41 
# Unit-conversion factors: input catalogs carry angles in decimal degrees or
# milliarcseconds, while the output records store radians (see _setCoordErr
# and _setProperMotion below).
_RAD_PER_DEG = math.pi / 180
_RAD_PER_MILLIARCSEC = _RAD_PER_DEG/(3600*1000)

# The most recent Indexed Reference Catalog on-disk format version.
LATEST_FORMAT_VERSION = 1
47 
48 
def addRefCatMetadata(catalog):
    """Add metadata to a new (not yet populated) reference catalog.

    Stamps the catalog with the current on-disk format version
    (``REFCAT_FORMAT_VERSION``), creating a metadata property list
    if the catalog does not have one yet.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached.  Will be modified
        in-place.
    """
    metadata = catalog.getMetadata()
    if metadata is None:
        metadata = PropertyList()
    metadata.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(metadata)
63 
64 
class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester.

    Data IDs are ignored so the runner should just run the task on the parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            An empty struct if self.doReturnResults, else None
        """
        # Construct the task directly (data IDs are irrelevant here), write
        # its config to the repository, then ingest the listed files.
        task = self.TaskClass(config=self.config, log=self.log, butler=parsedCmd.butler)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
        task.createIndexedCatalog(parsedCmd.files)
        if self.doReturnResults:
            return pipeBase.Struct()
94 
95 
class DatasetConfig(pexConfig.Config):
    """The description of the on-disk storage format for the persisted
    reference catalog.
    """
    # Default must stay 0: IngestIndexedReferenceConfig.setDefaults overrides
    # it with LATEST_FORMAT_VERSION for newly ingested catalogs, so a 0 read
    # back from disk means "unversioned" (i.e. version 0).
    format_version = pexConfig.Field(
        dtype=int,
        doc="Version number of the persisted on-disk storage format."
        "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
        "\nVersion 1 had nJy as flux units.",
        default=0  # This needs to always be 0, so that unversioned catalogs are interpreted as version 0.
    )
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        default='cal_ref_cat',
        doc='String to pass to the butler to retrieve persisted files.',
    )
    indexer = IndexerRegistry.makeField(
        # Typo fix in the help text: "algoritm" -> "algorithm".
        default='HTM',
        doc='Name of indexer algorithm to use. Default is HTM',
    )
116 
117 
class IngestIndexedReferenceConfig(pexConfig.Config):
    """Configuration for IngestIndexedReferenceTask.

    Describes the on-disk dataset to write (``dataset_config``) and maps the
    column names/units of the catalog being ingested onto the standard
    reference-catalog schema.  Most column-name fields are optional;
    `validate` enforces the combinations that must be supplied together.
    """
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files. Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def setDefaults(self):
        # Newly ingested reference catalogs always have the latest format_version.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION

    def validate(self):
        # Run the standard per-field validation first.
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or are
            all none (or blank)
            """
            # NOTE: closure over ``self`` (defined inside validate), not a method.
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))

        # Position and at least one magnitude are the bare minimum.
        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        # If magnitude errors are given at all, they must cover exactly the
        # same set of bands as mag_column_list.
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
        # Fields that only make sense as complete groups.
        assertAllOrNone("ra_err_name", "dec_err_name")
        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        # Proper-motion errors require the proper motions themselves.
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        # Proper motion and parallax are epoch-dependent quantities.
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
271 
272 
class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs.

    This implements an indexing scheme based on hierarchical triangular
    mesh (HTM). The term index really means breaking the catalog into
    localized chunks called shards.  In this case each shard contains
    the entries from the catalog in a single HTM trixel

    For producing catalogs this task makes the following assumptions
    about the input catalogs:
    - RA, Dec, RA error and Dec error are all in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec. Gaia is a well
      known example of a catalog that has such terms, and thus should not
      be ingested with this task.

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs
    """
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

    # Optional boolean columns; copied into the output by _setFlags when the
    # corresponding ``is_<flag>_name`` config field was set (which is also what
    # causes makeSchema to create the flag field).
    _flags = ['photometric', 'resolved', 'variable']

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, **kwargs):
        # The butler is passed by IngestReferenceRunner.run; pop it before
        # handing the remaining arguments to the base Task constructor.
        self.butler = kwargs.pop('butler')
        pipeBase.Task.__init__(self, *args, **kwargs)
        # Instantiate the configured spatial indexer (e.g. HTM).
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def createIndexedCatalog(self, files):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the data repository.

        Parameters
        ----------
        files : `list`
            A list of file paths to read.
        """
        rec_num = 0
        first = True
        for filename in files:
            arr = self.file_reader.run(filename)
            # One index (shard id) per input row.
            index_list = self.indexer.indexPoints(arr[self.config.ra_name], arr[self.config.dec_name])
            if first:
                schema, key_map = self.makeSchema(arr.dtype)
                # persist empty catalog to hold the master schema
                dataId = self.indexer.makeDataId('master_schema',
                                                 self.config.dataset_config.ref_dataset_name)
                self.butler.put(self.getCatalog(dataId, schema), 'ref_cat',
                                dataId=dataId)
                first = False
            pixel_ids = set(index_list)
            for pixel_id in pixel_ids:
                dataId = self.indexer.makeDataId(pixel_id, self.config.dataset_config.ref_dataset_name)
                # Read back any existing shard so multiple input files append.
                catalog = self.getCatalog(dataId, schema)
                els = np.where(index_list == pixel_id)
                for row in arr[els]:
                    record = catalog.addNew()
                    rec_num = self._fillRecord(record, row, rec_num, key_map)
                self.butler.put(catalog, 'ref_cat', dataId=dataId)
        # Persist the dataset config so readers know the on-disk format.
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    @staticmethod
    def computeCoord(row, ra_name, dec_name):
        """Create an ICRS coord. from a row of a catalog being ingested.

        Parameters
        ----------
        row : structured `numpy.array`
            Row from catalog being ingested.
        ra_name : `str`
            Name of RA key in catalog being ingested.
        dec_name : `str`
            Name of Dec key in catalog being ingested.

        Returns
        -------
        coord : `lsst.geom.SpherePoint`
            ICRS coordinate.
        """
        return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

    def _setCoordErr(self, record, row, key_map):
        """Set coordinate error in a record of an indexed catalog.

        The errors are read from the specified columns, and installed
        in the appropriate columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        if self.config.ra_err_name:  # IngestIndexedReferenceConfig.validate ensures all or none
            record.set(key_map["coord_raErr"], row[self.config.ra_err_name]*_RAD_PER_DEG)
            record.set(key_map["coord_decErr"], row[self.config.dec_err_name]*_RAD_PER_DEG)

    def _setFlags(self, record, row, key_map):
        """Set flags in an output record

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _setFlux(self, record, row, key_map):
        """Set flux fields in a record of an indexed catalog.

        Input magnitudes are assumed to be AB; fluxes are stored in nJy.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        for item in self.config.mag_column_list:
            record.set(key_map[item+'_flux'], (row[item]*u.ABmag).to_value(u.nJy))
        if len(self.config.mag_err_column_map) > 0:
            for err_key in self.config.mag_err_column_map.keys():
                error_col_name = self.config.mag_err_column_map[err_key]
                # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
                fluxErr = fluxErrFromABMagErr(row[error_col_name], row[err_key])
                if fluxErr is not None:
                    fluxErr *= 1e9
                record.set(key_map[err_key+'_fluxErr'], fluxErr)

    def _setProperMotion(self, record, row, key_map):
        """Set proper motion fields in a record of an indexed catalog.

        The proper motions are read from the specified columns,
        scaled appropriately, and installed in the appropriate
        columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        if self.config.pm_ra_name is None:  # IngestIndexedReferenceConfig.validate ensures all or none
            return
        radPerOriginal = _RAD_PER_MILLIARCSEC*self.config.pm_scale
        record.set(key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
        record.set(key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
        record.set(key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.pm_ra_err_name is not None:  # pm_dec_err_name also, by validation
            record.set(key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
            record.set(key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

    def _setParallax(self, record, row, key_map):
        """Set the parallax fields in a record of a reference catalog.

        Bug fix: makeSchema creates parallax fields when ``parallax_name`` is
        configured, but previously nothing ever copied the values from the
        input catalog, leaving those columns empty.  This mirrors
        ``_setProperMotion``.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        if self.config.parallax_name is None:  # validate requires epoch_name when parallax_name is set
            return
        scale = self.config.parallax_scale*_RAD_PER_MILLIARCSEC
        # NOTE(review): field names "parallax"/"parallaxErr" are assumed to be
        # what LoadReferenceObjectsTask.makeMinimalSchema creates for
        # addParallax/addParallaxErr -- confirm against that schema.
        record.set(key_map['parallax'], row[self.config.parallax_name]*scale*lsst.geom.radians)
        record.set(key_map['epoch'], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.parallax_err_name is not None:
            record.set(key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)

    def _epochToMjdTai(self, nativeEpoch):
        """Convert an epoch in native format to TAI MJD (a float).
        """
        return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                                 scale=self.config.epoch_scale).tai.mjd

    def _setExtra(self, record, row, key_map):
        """Set extra data fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string like entires,
            # numpy stores this as its own internal type, a numpy.str_
            # object. This seems to be a consequence of how numpy stores
            # string like objects in fixed column arrays. This checks
            # if any of the values to be added to the catalog are numpy
            # string types, and if they are, casts them to a python string
            # which is what the python c++ records expect
            if isinstance(value, np.str_):
                value = str(value)
            record.set(key_map[extra_col], value)

    def _fillRecord(self, record, row, rec_num, key_map):
        """Fill a record in an indexed catalog to be persisted.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        rec_num : `int`
            Starting integer to increment for the unique id
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.

        Returns
        -------
        rec_num : `int`
            The (possibly incremented) unique id counter, to pass to the
            next call.
        """
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
        if self.config.id_name:
            record.setId(row[self.config.id_name])
        else:
            # No id column configured: assign sequential ids.
            rec_num += 1
            record.setId(rec_num)

        self._setCoordErr(record, row, key_map)
        self._setFlags(record, row, key_map)
        self._setFlux(record, row, key_map)
        self._setProperMotion(record, row, key_map)
        self._setParallax(record, row, key_map)
        self._setExtra(record, row, key_map)
        return rec_num

    def getCatalog(self, dataId, schema):
        """Get a catalog from the butler or create it if it doesn't exist.

        Parameters
        ----------
        dataId : `dict`
            Identifier for catalog to retrieve
        schema : `lsst.afw.table.Schema`
            Schema to use in catalog creation if the butler can't get it

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            The catalog specified by `dataId`
        """
        if self.butler.datasetExists('ref_cat', dataId=dataId):
            return self.butler.get('ref_cat', dataId=dataId)
        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        return catalog

    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:
            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record
        """
        self.config.validate()  # just to be sure

        # make a schema with the standard fields
        schema = LoadReferenceObjectsTask.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addCentroid=False,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
            addParallaxErr=bool(self.config.parallax_err_name),
        )
        # Centroid fields are filled on load, not at ingest; id is set
        # explicitly in _fillRecord.
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}

        def addField(name):
            # Map each extra input column's numpy dtype onto an afw field.
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)

        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
        return schema, key_map
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:60
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
daf::base::PropertySet * set
Definition: fits.cc:884
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
Definition: fwd.h:63
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...