LSST Applications  21.0.0-147-g0e635eb1+1acddb5be5,22.0.0+052faf71bd,22.0.0+1ea9a8b2b2,22.0.0+6312710a6c,22.0.0+729191ecac,22.0.0+7589c3a021,22.0.0+9f079a9461,22.0.1-1-g7d6de66+b8044ec9de,22.0.1-1-g87000a6+536b1ee016,22.0.1-1-g8e32f31+6312710a6c,22.0.1-10-gd060f87+016f7cdc03,22.0.1-12-g9c3108e+df145f6f68,22.0.1-16-g314fa6d+c825727ab8,22.0.1-19-g93a5c75+d23f2fb6d8,22.0.1-19-gb93eaa13+aab3ef7709,22.0.1-2-g8ef0a89+b8044ec9de,22.0.1-2-g92698f7+9f079a9461,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gac51dbf+052faf71bd,22.0.1-2-gb66926d+6312710a6c,22.0.1-2-gcb770ba+09e3807989,22.0.1-20-g32debb5+b8044ec9de,22.0.1-23-gc2439a9a+fb0756638e,22.0.1-3-g496fd5d+09117f784f,22.0.1-3-g59f966b+1e6ba2c031,22.0.1-3-g849a1b8+f8b568069f,22.0.1-3-gaaec9c0+c5c846a8b1,22.0.1-32-g5ddfab5d3+60ce4897b0,22.0.1-4-g037fbe1+64e601228d,22.0.1-4-g8623105+b8044ec9de,22.0.1-5-g096abc9+d18c45d440,22.0.1-5-g15c806e+57f5c03693,22.0.1-7-gba73697+57f5c03693,master-g6e05de7fdc+c1283a92b8,master-g72cdda8301+729191ecac,w.2021.39
LSST Data Management Base Package
ingestIndexReferenceTask.py
Go to the documentation of this file.
1 # This file is part of meas_algorithms.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 
23 # TODO DM-31698: post-gen2 removal notes
24 # `DatasetConfig`, `ConvertReferenceCatalogBase`, and `ConvertReferenceCatalogConfig`
25 # should all be moved to to `convertReferenceCatalog.py` once gen2 butler
26 # has been removed.
27 
28 __all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
29  "ConvertReferenceCatalogBase", "ConvertReferenceCatalogConfig"]
30 
31 import abc
32 import os.path
33 
34 import astropy.units
35 
36 import lsst.pex.config as pexConfig
37 import lsst.pipe.base as pipeBase
38 import lsst.geom
39 import lsst.sphgeom
40 import lsst.afw.table as afwTable
41 from lsst.daf.base import PropertyList
42 from .indexerRegistry import IndexerRegistry
43 from .readTextCatalogTask import ReadTextCatalogTask
44 from .loadReferenceObjects import LoadReferenceObjectsTask
45 from . import convertRefcatManager
46 
47 # The most recent Indexed Reference Catalog on-disk format version.
48 LATEST_FORMAT_VERSION = 1
49 
50 
def addRefCatMetadata(catalog):
    """Attach the refcat format-version metadata to a new (not yet
    populated) reference catalog.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached.  Will be modified
        in-place.
    """
    metadata = catalog.getMetadata()
    if metadata is None:
        # Catalog was created without metadata; start a fresh PropertyList.
        metadata = PropertyList()
    metadata.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(metadata)
65 
66 
class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester (gen2 version).

    Data IDs are ignored so the runner should just run the task on the
    parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            An empty struct if self.doReturnResults, else None.
        """
        task = self.TaskClass(config=self.config, log=self.log, butler=parsedCmd.butler)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        task.run(parsedCmd.files)
        return pipeBase.Struct() if self.doReturnResults else None
96 
97 
class DatasetConfig(pexConfig.Config):
    """The description of the on-disk storage format for the persisted
    reference catalog.
    """
    format_version = pexConfig.Field(
        dtype=int,
        doc="Version number of the persisted on-disk storage format."
        "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
        "\nVersion 1 had nJy as flux units.",
        # This needs to always be 0, so that unversioned catalogs are
        # interpreted as version 0.
        default=0
    )
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        # TODO DM-31817: remove this default value.
        default='cal_ref_cat',
        doc="Name of this reference catalog to be used in the butler registry.",
    )
    indexer = IndexerRegistry.makeField(
        # Fixed typo in user-visible doc string: "algoritm" -> "algorithm".
        default='HTM',
        doc='Name of indexer algorithm to use. Default is HTM',
    )
119 
120 
class ConvertReferenceCatalogConfig(pexConfig.Config):
    """Configuration for converting external catalog files into an
    HTM-indexed reference catalog.
    """
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    n_processes = pexConfig.Field(
        dtype=int,
        doc=("Number of python processes to use when ingesting."),
        default=1
    )
    manager = pexConfig.ConfigurableField(
        # Restored target= line (lost in extraction): the ConfigurableField
        # requires a target, and `convertRefcatManager` is imported above
        # solely to provide it.
        target=convertRefcatManager.ConvertRefcatManager,
        doc="Multiprocessing manager to perform the actual conversion of values, file-by-file."
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files. Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column (values in decimal degrees)",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column (values in decimal degrees)",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    coord_err_unit = pexConfig.Field(
        dtype=str,
        doc="Unit of RA/Dec error fields (astropy.unit.Unit compatible)",
        optional=True
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information. At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def setDefaults(self):
        # Newly ingested reference catalogs always have the latest format_version.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION
        # gen3 refcats are all depth=7
        self.dataset_config.indexer['HTM'].depth = 7

    def validate(self):
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or are
            all none (or blank).
            """
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))

        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
        assertAllOrNone("ra_err_name", "dec_err_name", "coord_err_unit")
        if self.coord_err_unit is not None:
            result = astropy.units.Unit(self.coord_err_unit, parse_strict='silent')
            if isinstance(result, astropy.units.UnrecognizedUnit):
                msg = f"{self.coord_err_unit} is not a valid astropy unit string."
                # Use this class's own field (inherited unchanged by the gen2
                # subclass) instead of forward-referencing the subclass.
                raise pexConfig.FieldValidationError(ConvertReferenceCatalogConfig.coord_err_unit,
                                                     self, msg)

        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        assertAllOrNone("parallax_name", "parallax_err_name")
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
297 
298 
300  """For gen2 backwards compatibility.
301  """
302  pass
303 
304 
class ConvertReferenceCatalogBase(pipeBase.Task, abc.ABC):
    """Base class for producing and loading indexed reference catalogs,
    shared between gen2 and gen3.

    This implements an indexing scheme based on hierarchical triangular
    mesh (HTM). The term index really means breaking the catalog into
    localized chunks called shards. In this case each shard contains
    the entries from the catalog in a single HTM trixel.

    For producing catalogs this task makes the following assumptions
    about the input catalogs:

    - RA, Dec are in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec. Support for such
      covariance would have to be added to the config, including consideration
      of the units in the input catalog.
    """
    canMultiprocess = False
    ConfigClass = ConvertReferenceCatalogConfig

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Instantiate the configured indexer (e.g. HTM) from the registry.
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def run(self, inputFiles):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the butler repository.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read.
        """
        self._preRun()
        schema, key_map = self._saveMasterSchema(inputFiles[0])
        # create an HTM we can interrogate about pixel ids
        htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
        filenames = self._getButlerFilenames(htm)
        worker = self.config.manager.target(filenames,
                                            self.config,
                                            self.file_reader,
                                            self.indexer,
                                            schema,
                                            key_map,
                                            htm.universe()[0],
                                            addRefCatMetadata,
                                            self.log)
        result = worker.run(inputFiles)

        self._persistConfig()
        self._postRun(result)

    def _preRun(self):
        """Any setup that has to be performed at the start of ``run``, but that
        cannot be performed during ``__init__`` (e.g. making directories).
        """
        pass

    def _postRun(self, result):
        """Any tasks that have to happen at the end of ``run``.

        Parameters
        ----------
        result
            The result returned from ``worker.run()``.
        """
        pass

    def _getButlerFilenames(self, htm):
        """Get filenames from the butler for each output htm pixel.

        Parameters
        ----------
        htm : `lsst.sphgeom.HtmPixelization`
            The HTM pixelization scheme to be used to build filenames.

        Returns
        -------
        filenames : `dict` [`int`, `str`]
            Map of HTM pixel id to the filename to write that pixel to.
        """
        filenames = {}
        start, end = htm.universe()[0]
        # path manipulation because butler.get() per pixel will take forever
        path = self._getOnePixelFilename(start)
        base = os.path.join(os.path.dirname(path), "%d"+os.path.splitext(path)[1])
        for pixelId in range(start, end):
            filenames[pixelId] = base % pixelId

        return filenames

    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:
            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record
        """
        # make a schema with the standard fields
        schema = LoadReferenceObjectsTask.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addCentroid=False,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
        )
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}

        def addField(name):
            # Add one extra column to the schema, preserving its numpy type.
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)

        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
        return schema, key_map

    def _saveMasterSchema(self, filename):
        """Generate and save the master catalog schema.

        Parameters
        ----------
        filename : `str`
            An input file to read to get the input dtype.
        """
        arr = self.file_reader.run(filename)
        schema, key_map = self.makeSchema(arr.dtype)

        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        self._writeMasterSchema(catalog)
        return schema, key_map

    @abc.abstractmethod
    def _getOnePixelFilename(self, start):
        """Return one example filename to help construct the rest of the
        per-htm pixel filenames.

        Parameters
        ----------
        start : `int`
            The first HTM index in this HTM pixelization.

        Returns
        -------
        filename : `str`
            Path to a single file that would be written to the output location.
        """
        pass

    @abc.abstractmethod
    def _persistConfig(self):
        """Write the config that was used to generate the refcat.
        """
        pass

    @abc.abstractmethod
    def _writeMasterSchema(self, catalog):
        """Butler put the master catalog schema.

        Parameters
        ----------
        catalog : `lsst.afw.table.SimpleCatalog`
            An empty catalog with a fully-defined schema that matches the
            schema used in each of the HTM pixel files.
        """
        pass
495 
496 
class IngestIndexedReferenceTask(ConvertReferenceCatalogBase, pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs (gen2 version).

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs
    """
    RunnerClass = IngestReferenceRunner
    ConfigClass = IngestIndexedReferenceConfig
    _DefaultName = 'IngestIndexedReferenceTask'

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, butler=None, **kwargs):
        # Store the butler before Task.__init__, which may use subtasks.
        self.butler = butler
        super().__init__(*args, **kwargs)

    def _persistConfig(self):
        # Persist the dataset config under the refcat's "config" dataId.
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    def _getOnePixelFilename(self, start):
        dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
        return self.butler.get('ref_cat_filename', dataId=dataId)[0]

    def _writeMasterSchema(self, catalog):
        dataId = self.indexer.makeDataId('master_schema', self.config.dataset_config.ref_dataset_name)
        self.butler.put(catalog, 'ref_cat', dataId=dataId)
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: SortedCatalog.h:42
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
HtmPixelization provides HTM indexing of points and regions.
daf::base::PropertySet * set
Definition: fits.cc:912
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174