ingestIndexReferenceTask.py
#
# LSST Data Management System
#
# Copyright 2008-2017 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
#

__all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
           "IngestGaiaReferenceTask"]

import os.path

import astropy.units

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.geom
import lsst.sphgeom
import lsst.afw.table as afwTable
from lsst.daf.base import PropertyList
from .indexerRegistry import IndexerRegistry
from .readTextCatalogTask import ReadTextCatalogTask
from .loadReferenceObjects import LoadReferenceObjectsTask
from . import ingestIndexManager

# The most recent Indexed Reference Catalog on-disk format version.
LATEST_FORMAT_VERSION = 1


def addRefCatMetadata(catalog):
    """Add metadata to a new (not yet populated) reference catalog.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached.  Will be modified
        in-place.
    """
    md = catalog.getMetadata()
    if md is None:
        md = PropertyList()
    md.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(md)
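
# An illustrative sketch (not part of this module's API) of how
# addRefCatMetadata is used: create an empty catalog, stamp it with the
# current format version, then check the metadata before persisting.
#
#     import lsst.afw.table as afwTable
#     catalog = afwTable.SimpleCatalog(afwTable.SimpleTable.makeMinimalSchema())
#     addRefCatMetadata(catalog)
#     md = catalog.getMetadata()
#     assert md.getScalar("REFCAT_FORMAT_VERSION") == LATEST_FORMAT_VERSION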


class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester.

    Data IDs are ignored, so the runner should just run the task on the
    parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            An empty struct if ``self.doReturnResults`` is true, else `None`.
        """
        files = parsedCmd.files
        butler = parsedCmd.butler
        task = self.TaskClass(config=self.config, log=self.log, butler=butler)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        task.createIndexedCatalog(files)
        if self.doReturnResults:
            return pipeBase.Struct()


class DatasetConfig(pexConfig.Config):
    """The description of the on-disk storage format for the persisted
    reference catalog.
    """
    format_version = pexConfig.Field(
        dtype=int,
        doc="Version number of the persisted on-disk storage format."
        "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
        "\nVersion 1 had nJy as flux units.",
        default=0  # This needs to always be 0, so that unversioned catalogs are interpreted as version 0.
    )
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        default='cal_ref_cat',
        doc='String to pass to the butler to retrieve persisted files.',
    )
    indexer = IndexerRegistry.makeField(
        default='HTM',
        doc='Name of indexer algorithm to use.  Default is HTM',
    )


class IngestIndexedReferenceConfig(pexConfig.Config):
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    n_processes = pexConfig.Field(
        dtype=int,
        doc="Number of python processes to use when ingesting.",
        default=1
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files.  Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column (values in decimal degrees)",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column (values in decimal degrees)",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    coord_err_unit = pexConfig.Field(
        dtype=str,
        doc="Unit of RA/Dec error fields (astropy.unit.Unit compatible)",
        optional=True
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def setDefaults(self):
        # Newly ingested reference catalogs always have the latest format_version.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION

    def validate(self):
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or all
            are `None` (or blank).
            """
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))

        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
        assertAllOrNone("ra_err_name", "dec_err_name", "coord_err_unit")
        if self.coord_err_unit is not None:
            result = astropy.units.Unit(self.coord_err_unit, parse_strict='silent')
            if isinstance(result, astropy.units.UnrecognizedUnit):
                msg = f"{self.coord_err_unit} is not a valid astropy unit string."
                raise pexConfig.FieldValidationError(IngestIndexedReferenceConfig.coord_err_unit, self, msg)

        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        assertAllOrNone("parallax_name", "parallax_err_name")
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
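
# A minimal config-override sketch for ingesting a hypothetical text catalog;
# every column name below is an example and must match the input files:
#
#     config.dataset_config.ref_dataset_name = "my_ref_cat"
#     config.ra_name = "RA"
#     config.dec_name = "DEC"
#     config.mag_column_list = ["g", "r"]
#     config.mag_err_column_map = {"g": "g_err", "r": "r_err"}
#     config.id_name = "obj_id"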


class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs.

    This implements an indexing scheme based on the hierarchical triangular
    mesh (HTM).  The term "index" really means breaking the catalog into
    localized chunks called shards.  In this case each shard contains
    the entries from the catalog in a single HTM trixel.

    For producing catalogs this task makes the following assumptions
    about the input catalogs:

    - RA, Dec are in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec.  Support for such
      covariance would have to be added to the config, including consideration
      of the units in the input catalog.

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs.
    """
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, butler=None, **kwargs):
        self.butler = butler
        super().__init__(*args, **kwargs)
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')
        self.IngestManager = ingestIndexManager.IngestIndexManager

    def createIndexedCatalog(self, inputFiles):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the butler repository.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read.
        """
        schema, key_map = self._saveMasterSchema(inputFiles[0])
        # create an HTM we can interrogate about pixel ids
        htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
        filenames = self._getButlerFilenames(htm)
        worker = self.IngestManager(filenames,
                                    self.config,
                                    self.file_reader,
                                    self.indexer,
                                    schema,
                                    key_map,
                                    htm.universe()[0],
                                    addRefCatMetadata,
                                    self.log)
        worker.run(inputFiles)

        # write the config that was used to generate the refcat
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)
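
    # An illustrative invocation sketch: as a CmdLineTask this is normally run
    # through a command-line driver script (assumed here to be
    # ``ingestReferenceCatalog.py``; the repo path, input files, and config
    # file name are all hypothetical):
    #
    #     ingestReferenceCatalog.py /path/to/repo cat1.txt cat2.txt \
    #         --configfile my_ingest_config.py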

    def _saveMasterSchema(self, filename):
        """Generate and save the master catalog schema.

        Parameters
        ----------
        filename : `str`
            An input file to read to get the input dtype.

        Returns
        -------
        schema : `lsst.afw.table.Schema`
            The schema for the output source catalog.
        key_map : `dict`
            A map of catalog keys to use in filling the record.
        """
        arr = self.file_reader.run(filename)
        schema, key_map = self.makeSchema(arr.dtype)
        dataId = self.indexer.makeDataId('master_schema',
                                         self.config.dataset_config.ref_dataset_name)

        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        self.butler.put(catalog, 'ref_cat', dataId=dataId)
        return schema, key_map

    def _getButlerFilenames(self, htm):
        """Get filenames from the butler for each output pixel."""
        filenames = {}
        start, end = htm.universe()[0]
        # path manipulation because butler.get() per pixel will take forever
        dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
        path = self.butler.get('ref_cat_filename', dataId=dataId)[0]
        base = os.path.join(os.path.dirname(path), "%d" + os.path.splitext(path)[1])
        for pixelId in range(start, end):
            filenames[pixelId] = base % pixelId

        return filenames
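
    # For illustration (hypothetical paths): if the butler resolves shard
    # ``start`` to "/repo/ref_cats/cal_ref_cat/1024.fits", then ``base``
    # becomes "/repo/ref_cats/cal_ref_cat/%d.fits" and each shard ID is
    # substituted in directly, avoiding one butler.get() call per pixel.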

    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:
            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record.
        """
        # make a schema with the standard fields
        schema = LoadReferenceObjectsTask.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addCentroid=False,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
        )
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}

        def addField(name):
            if dtype[name].kind == 'U':
                # dealing with a string-like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)

        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
        return schema, key_map
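
# An illustrative sketch of makeSchema input: the dtype normally comes from
# the array returned by the file_reader subtask.  ``task`` and the column
# names here are hypothetical.
#
#     import numpy as np
#     arr = np.zeros(1, dtype=[("RA", "f8"), ("DEC", "f8"),
#                              ("g", "f4"), ("g_err", "f4")])
#     schema, key_map = task.makeSchema(arr.dtype)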


class IngestGaiaReferenceTask(IngestIndexedReferenceTask):
    """A special-cased version of the refcat ingester for Gaia DR2.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Use the Gaia-specific ingest manager instead of the default one.
        self.IngestManager = ingestIndexManager.IngestGaiaManager