LSSTApplications  16.0-10-g0ee56ad+5,16.0-11-ga33d1f2+5,16.0-12-g3ef5c14+3,16.0-12-g71e5ef5+18,16.0-12-gbdf3636+3,16.0-13-g118c103+3,16.0-13-g8f68b0a+3,16.0-15-gbf5c1cb+4,16.0-16-gfd17674+3,16.0-17-g7c01f5c+3,16.0-18-g0a50484+1,16.0-20-ga20f992+8,16.0-21-g0e05fd4+6,16.0-21-g15e2d33+4,16.0-22-g62d8060+4,16.0-22-g847a80f+4,16.0-25-gf00d9b8+1,16.0-28-g3990c221+4,16.0-3-gf928089+3,16.0-32-g88a4f23+5,16.0-34-gd7987ad+3,16.0-37-gc7333cb+2,16.0-4-g10fc685+2,16.0-4-g18f3627+26,16.0-4-g5f3a788+26,16.0-5-gaf5c3d7+4,16.0-5-gcc1f4bb+1,16.0-6-g3b92700+4,16.0-6-g4412fcd+3,16.0-6-g7235603+4,16.0-69-g2562ce1b+2,16.0-8-g14ebd58+4,16.0-8-g2df868b+1,16.0-8-g4cec79c+6,16.0-8-gadf6c7a+1,16.0-8-gfc7ad86,16.0-82-g59ec2a54a+1,16.0-9-g5400cdc+2,16.0-9-ge6233d7+5,master-g2880f2d8cf+3,v17.0.rc1
LSSTDataManagementBasePackage
ingestIndexReferenceTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 #
4 # Copyright 2008-2017 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 __all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig"]
25 
26 import math
27 
28 import astropy.time
29 import numpy as np
30 
31 import lsst.pex.config as pexConfig
32 import lsst.pipe.base as pipeBase
33 import lsst.geom
34 import lsst.afw.table as afwTable
35 from lsst.afw.image import fluxFromABMag, fluxErrFromABMagErr
36 from .indexerRegistry import IndexerRegistry
37 from .readTextCatalogTask import ReadTextCatalogTask
38 from .loadReferenceObjects import LoadReferenceObjectsTask
39 
# Unit-conversion factors used when standardizing input catalog columns:
# coordinate errors arrive in decimal degrees (see class docs below),
# proper motion and parallax in milliarcseconds (after pm/parallax_scale).
_RAD_PER_DEG = math.pi / 180
_RAD_PER_MILLIARCSEC = _RAD_PER_DEG/(3600*1000)
42 
43 
class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester

    Data IDs are ignored so the runner should just run the task on the parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            A empty struct if self.doReturnResults, else None
        """
        # Construct the task directly from the parsed command; the butler is
        # handed to the task constructor (the task pops it from kwargs).
        task = self.TaskClass(config=self.config, log=self.log, butler=parsedCmd.butler)
        # Persist the task configuration before doing any ingestion work.
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        task.createIndexedCatalog(parsedCmd.files)
        return pipeBase.Struct() if self.doReturnResults else None
74 
class DatasetConfig(pexConfig.Config):
    """Configuration describing how an ingested reference catalog is named
    and spatially indexed.

    This config is persisted alongside the ingested catalog (see
    ``IngestIndexedReferenceTask.createIndexedCatalog``) so readers can
    locate and decode the shards.
    """
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        default='cal_ref_cat',
        doc='String to pass to the butler to retrieve persisted files.',
    )
    indexer = IndexerRegistry.makeField(
        default='HTM',
        # Fixed typo: "algoritm" -> "algorithm"
        doc='Name of indexer algorithm to use. Default is HTM',
    )
85 
86 
class IngestIndexedReferenceConfig(pexConfig.Config):
    """Configuration for IngestIndexedReferenceTask: maps column names of the
    catalog files being ingested onto the standard reference-catalog schema,
    plus optional unit-scale factors for proper motion and parallax.

    Only ``ra_name``, ``dec_name`` and ``mag_column_list`` are required;
    everything else is optional, with cross-field consistency enforced by
    ``validate``.
    """
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files. Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def validate(self):
        """Check cross-field consistency of the configuration.

        Raises
        ------
        ValueError
            If the required fields are unset, if ``mag_err_column_map`` keys
            do not match ``mag_column_list``, or if any group of mutually
            dependent optional fields is only partially set.
        """
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or are
            all none (or blank)
            """
            # bool(...) treats empty strings the same as None, so a blank
            # column name counts as "unset".
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))

        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
        assertAllOrNone("ra_err_name", "dec_err_name")
        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
237 
class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs.

    This implements an indexing scheme based on hierarchical triangular
    mesh (HTM). The term index really means breaking the catalog into
    localized chunks called shards. In this case each shard contains
    the entries from the catalog in a single HTM trixel

    For producing catalogs this task makes the following assumptions
    about the input catalogs:
    - RA, Dec, RA error and Dec error are all in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec. Gaia is a well
      known example of a catalog that has such terms, and thus should not
      be ingested with this task.

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs
    """
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

    # Boolean flag fields that makeSchema may add to the output schema;
    # _setFlags fills each one present from the configured is_<flag>_name
    # input column.
    _flags = ['photometric', 'resolved', 'variable']

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, **kwargs):
        # The butler is passed by IngestReferenceRunner as a keyword; pop it
        # before delegating so the base Task constructor does not see it.
        self.butler = kwargs.pop('butler')
        pipeBase.Task.__init__(self, *args, **kwargs)
        # Instantiate the configured indexer (e.g. HTM) from the registry.
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def createIndexedCatalog(self, files):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the data repository.

        Parameters
        ----------
        files : `list`
            A list of file paths to read.
        """
        rec_num = 0
        first = True
        for filename in files:
            arr = self.file_reader.run(filename)
            index_list = self.indexer.indexPoints(arr[self.config.ra_name], arr[self.config.dec_name])
            if first:
                # The schema is derived from the first file's dtype and
                # reused for all subsequent files.
                schema, key_map = self.makeSchema(arr.dtype)
                # persist empty catalog to hold the master schema
                dataId = self.indexer.makeDataId('master_schema',
                                                 self.config.dataset_config.ref_dataset_name)
                self.butler.put(self.getCatalog(dataId, schema), 'ref_cat',
                                dataId=dataId)
                first = False
            pixel_ids = set(index_list)
            for pixel_id in pixel_ids:
                dataId = self.indexer.makeDataId(pixel_id, self.config.dataset_config.ref_dataset_name)
                # getCatalog re-reads any existing shard so rows from
                # multiple input files accumulate in the same trixel.
                catalog = self.getCatalog(dataId, schema)
                els = np.where(index_list == pixel_id)
                for row in arr[els]:
                    record = catalog.addNew()
                    rec_num = self._fillRecord(record, row, rec_num, key_map)
                self.butler.put(catalog, 'ref_cat', dataId=dataId)
        # Persist the dataset config so readers can locate/decode the shards.
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    @staticmethod
    def computeCoord(row, ra_name, dec_name):
        """Create an ICRS coord. from a row of a catalog being ingested.

        Parameters
        ----------
        row : structured `numpy.array`
            Row from catalog being ingested.
        ra_name : `str`
            Name of RA key in catalog being ingested.
        dec_name : `str`
            Name of Dec key in catalog being ingested.

        Returns
        -------
        coord : `lsst.geom.SpherePoint`
            ICRS coordinate.
        """
        return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

    def _setCoordErr(self, record, row, key_map):
        """Set coordinate error in a record of an indexed catalog.

        The errors are read from the specified columns, and installed
        in the appropriate columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        if self.config.ra_err_name:  # IngestIndexedReferenceConfig.validate ensures all or none
            record.set(key_map["coord_raErr"], row[self.config.ra_err_name]*_RAD_PER_DEG)
            record.set(key_map["coord_decErr"], row[self.config.dec_err_name]*_RAD_PER_DEG)

    def _setFlags(self, record, row, key_map):
        """Set flags in an output record

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _setFlux(self, record, row, key_map):
        """Set flux fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        for item in self.config.mag_column_list:
            record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
        # An empty mag_err_column_map simply results in no error fields being
        # set (the previous explicit len() > 0 guard was redundant).
        for mag_col, err_col in self.config.mag_err_column_map.items():
            record.set(key_map[mag_col+'_fluxErr'],
                       fluxErrFromABMagErr(row[err_col], row[mag_col]))

    def _setProperMotion(self, record, row, key_map):
        """Set proper motion fields in a record of an indexed catalog.

        The proper motions are read from the specified columns,
        scaled appropriately, and installed in the appropriate
        columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        # Test truthiness rather than "is None" for consistency with
        # validate() and makeSchema(), which treat an empty string as unset;
        # otherwise an empty pm_ra_name would get past this guard and
        # KeyError on the (absent) "pm_ra" schema key.
        if not self.config.pm_ra_name:  # validate() ensures all or none are set
            return
        radPerOriginal = _RAD_PER_MILLIARCSEC*self.config.pm_scale
        record.set(key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
        record.set(key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
        record.set(key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.pm_ra_err_name:  # pm_dec_err_name also, by validation
            record.set(key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
            record.set(key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

    def _epochToMjdTai(self, nativeEpoch):
        """Convert an epoch in native format to TAI MJD (a float).
        """
        return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                                 scale=self.config.epoch_scale).tai.mjd

    def _setExtra(self, record, row, key_map):
        """Set extra data fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string like entries,
            # numpy stores this as its own internal type, a numpy.str_
            # object. This seems to be a consequence of how numpy stores
            # string like objects in fixed column arrays. This checks
            # if any of the values to be added to the catalog are numpy
            # string types, and if they are, casts them to a python string
            # which is what the python c++ records expect
            if isinstance(value, np.str_):
                value = str(value)
            record.set(key_map[extra_col], value)

    def _fillRecord(self, record, row, rec_num, key_map):
        """Fill a record in an indexed catalog to be persisted.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        rec_num : `int`
            Starting integer to increment for the unique id
        key_map : `dict` mapping `str` to `lsst.afw.table.Key`
            Map of catalog keys.

        Returns
        -------
        rec_num : `int`
            The updated unique-id counter (unchanged when ``id_name`` is set).
        """
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
        if self.config.id_name:
            record.setId(row[self.config.id_name])
        else:
            rec_num += 1
            record.setId(rec_num)

        self._setCoordErr(record, row, key_map)
        self._setFlags(record, row, key_map)
        self._setFlux(record, row, key_map)
        self._setProperMotion(record, row, key_map)
        self._setExtra(record, row, key_map)
        return rec_num

    def getCatalog(self, dataId, schema):
        """Get a catalog from the butler or create it if it doesn't exist.

        Parameters
        ----------
        dataId : `dict`
            Identifier for catalog to retrieve
        schema : `lsst.afw.table.Schema`
            Schema to use in catalog creation if the butler can't get it

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            The catalog specified by `dataId`
        """
        if self.butler.datasetExists('ref_cat', dataId=dataId):
            return self.butler.get('ref_cat', dataId=dataId)
        return afwTable.SimpleCatalog(schema)

    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:
            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record
        """
        self.config.validate()  # just to be sure

        # make a schema with the standard fields
        # NOTE(review): addProperMotion is given 2 (not a bool) like the
        # *Dim arguments -- confirm against makeMinimalSchema's signature.
        schema = LoadReferenceObjectsTask.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addCentroid=False,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
            addParallaxErr=bool(self.config.parallax_err_name),
        )
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}

        def addField(name):
            # Map a numpy dtype entry onto an afw schema field.
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)

        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
        return schema, key_map
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:69
daf::base::PropertySet * set
Definition: fits.cc:832
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
Definition: fwd.h:63
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:168
double fluxFromABMag(double mag) noexcept
Compute flux in Janskys from AB magnitude.
Definition: Calib.h:66
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57