LSST Applications g0b6bd0c080+a72a5dd7e6,g1182afd7b4+2a019aa3bb,g17e5ecfddb+2b8207f7de,g1d67935e3f+06cf436103,g38293774b4+ac198e9f13,g396055baef+6a2097e274,g3b44f30a73+6611e0205b,g480783c3b1+98f8679e14,g48ccf36440+89c08d0516,g4b93dc025c+98f8679e14,g5c4744a4d9+a302e8c7f0,g613e996a0d+e1c447f2e0,g6c8d09e9e7+25247a063c,g7271f0639c+98f8679e14,g7a9cd813b8+124095ede6,g9d27549199+a302e8c7f0,ga1cf026fa3+ac198e9f13,ga32aa97882+7403ac30ac,ga786bb30fb+7a139211af,gaa63f70f4e+9994eb9896,gabf319e997+ade567573c,gba47b54d5d+94dc90c3ea,gbec6a3398f+06cf436103,gc6308e37c7+07dd123edb,gc655b1545f+ade567573c,gcc9029db3c+ab229f5caf,gd01420fc67+06cf436103,gd877ba84e5+06cf436103,gdb4cecd868+6f279b5b48,ge2d134c3d5+cc4dbb2e3f,ge448b5faa6+86d1ceac1d,gecc7e12556+98f8679e14,gf3ee170dca+25247a063c,gf4ac96e456+ade567573c,gf9f5ea5b4d+ac198e9f13,gff490e6085+8c2580be5c,w.2022.27
LSST Data Management Base Package
ingestIndexReferenceTask.py
Go to the documentation of this file.
1# This file is part of meas_algorithms.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22
23# TODO DM-31698: post-gen2 removal notes
24# `DatasetConfig`, `ConvertReferenceCatalogBase`, and `ConvertReferenceCatalogConfig`
25# should all be moved to to `convertReferenceCatalog.py` once gen2 butler
26# has been removed.
27
__all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
           "ConvertReferenceCatalogBase", "ConvertReferenceCatalogConfig"]
30
31import abc
32import os.path
33
34import astropy.units
35
36import lsst.pex.config as pexConfig
37import lsst.pipe.base as pipeBase
38import lsst.geom
39import lsst.sphgeom
40import lsst.afw.table as afwTable
41from lsst.daf.base import PropertyList
42from .indexerRegistry import IndexerRegistry
43from .readTextCatalogTask import ReadTextCatalogTask
44from .loadReferenceObjects import ReferenceObjectLoaderBase
45from . import convertRefcatManager
46
# The most recent Indexed Reference Catalog on-disk format version.
LATEST_FORMAT_VERSION = 1


def addRefCatMetadata(catalog):
    """Add metadata to a new (not yet populated) reference catalog.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached.  Will be modified
        in-place.
    """
    md = catalog.getMetadata()
    if md is None:
        md = PropertyList()
    # Record the on-disk format version so readers can interpret flux units.
    md.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(md)
65
66
class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester (gen2 version).

    Data IDs are ignored so the runner should just run the task on the parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            An empty struct if self.doReturnResults, else None.
        """
        files = parsedCmd.files
        butler = parsedCmd.butler
        task = self.TaskClass(config=self.config, log=self.log, butler=butler)
        # Persist the task configuration to the repository before running.
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        task.run(files)
        if self.doReturnResults:
            return pipeBase.Struct()
96
97
class DatasetConfig(pexConfig.Config):
    """The description of the on-disk storage format for the persisted
    reference catalog.
    """
    format_version = pexConfig.Field(
        dtype=int,
        doc="Version number of the persisted on-disk storage format."
        "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
        "\nVersion 1 had nJy as flux units.",
        default=0  # This needs to always be 0, so that unversioned catalogs are interpreted as version 0.
    )
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        # TODO DM-31817: remove this default value.
        default='cal_ref_cat',
        doc="Name of this reference catalog to be used in the butler registry.",
    )
    indexer = IndexerRegistry.makeField(
        default='HTM',
        # NOTE(review): fixed typo "algoritm" -> "algorithm" in the help text.
        doc='Name of indexer algorithm to use. Default is HTM',
    )
119
120
class ConvertReferenceCatalogConfig(pexConfig.Config):
    """Configuration for reading an external catalog and writing it out as an
    LSST indexed (sharded) reference catalog: column-name mappings, scale
    factors, and the on-disk dataset description.
    """
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    n_processes = pexConfig.Field(
        dtype=int,
        doc=("Number of python processes to use when ingesting."),
        default=1
    )
    manager = pexConfig.ConfigurableField(
        target=convertRefcatManager.ConvertRefcatManager,
        doc="Multiprocessing manager to perform the actual conversion of values, file-by-file."
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files. Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column (values in decimal degrees)",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column (values in decimal degrees)",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    coord_err_unit = pexConfig.Field(
        dtype=str,
        doc="Unit of RA/Dec error fields (astropy.unit.Unit compatible)",
        optional=True
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information. At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def setDefaults(self):
        # Newly ingested reference catalogs always have the latest format_version.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION
        # gen3 refcats are all depth=7
        self.dataset_config.indexer['HTM'].depth = 7

    def validate(self):
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or are
            all none (or blank)
            """
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))

        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
        assertAllOrNone("ra_err_name", "dec_err_name", "coord_err_unit")
        if self.coord_err_unit is not None:
            result = astropy.units.Unit(self.coord_err_unit, parse_strict='silent')
            if isinstance(result, astropy.units.UnrecognizedUnit):
                msg = f"{self.coord_err_unit} is not a valid astropy unit string."
                # Report the error against this class's own field (previously
                # referenced the gen2 subclass, a stale leftover).
                raise pexConfig.FieldValidationError(ConvertReferenceCatalogConfig.coord_err_unit, self, msg)

        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        assertAllOrNone("parallax_name", "parallax_err_name")
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
297
298
class IngestIndexedReferenceConfig(ConvertReferenceCatalogConfig):
    """For gen2 backwards compatibility.
    """
    pass
303
304
class ConvertReferenceCatalogBase(pipeBase.Task, abc.ABC):
    """Base class for producing and loading indexed reference catalogs,
    shared between gen2 and gen3.

    This implements an indexing scheme based on hierarchical triangular
    mesh (HTM). The term index really means breaking the catalog into
    localized chunks called shards.  In this case each shard contains
    the entries from the catalog in a single HTM trixel

    For producing catalogs this task makes the following assumptions
    about the input catalogs:
    - RA, Dec are in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec. Support for such
      covariance would have to be added to the config, including consideration
      of the units in the input catalog.
    """
    canMultiprocess = False
    ConfigClass = ConvertReferenceCatalogConfig

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Instantiate the indexer selected in the config registry field.
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def run(self, inputFiles):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the butler repository.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read.
        """
        self._preRun()
        schema, key_map = self._saveMasterSchema(inputFiles[0])
        # create an HTM we can interrogate about pixel ids
        htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
        filenames = self._getButlerFilenames(htm)
        worker = self.config.manager.target(filenames,
                                            self.config,
                                            self.file_reader,
                                            self.indexer,
                                            schema,
                                            key_map,
                                            htm.universe()[0],
                                            addRefCatMetadata,
                                            self.log)
        result = worker.run(inputFiles)

        self._persistConfig()
        self._postRun(result)

    def _preRun(self):
        """Any setup that has to be performed at the start of ``run``, but that
        cannot be performed during ``__init__`` (e.g. making directories).
        """
        pass

    def _postRun(self, result):
        """Any tasks that have to happen at the end of ``run``.

        Parameters
        ----------
        result
            The result returned from ``worker.run()``.
        """
        pass

    def _getButlerFilenames(self, htm):
        """Get filenames from the butler for each output htm pixel.

        Parameters
        ----------
        htm : `lsst.sphgeom.HtmPixelization`
            The HTM pixelization scheme to be used to build filenames.

        Returns
        -------
        filenames : `list [str]`
            List of filenames to write each HTM pixel to.
        """
        filenames = {}
        start, end = htm.universe()[0]
        # path manipulation because butler.get() per pixel will take forever
        path = self._getOnePixelFilename(start)
        base = os.path.join(os.path.dirname(path), "%d"+os.path.splitext(path)[1])
        for pixelId in range(start, end):
            filenames[pixelId] = base % pixelId

        return filenames

    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:
            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record
        """
        # make a schema with the standard fields
        schema = ReferenceObjectLoaderBase.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addCentroid=False,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
        )
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}

        def addField(name):
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)

        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
        return schema, key_map

    def _saveMasterSchema(self, filename):
        """Generate and save the master catalog schema.

        Parameters
        ----------
        filename : `str`
            An input file to read to get the input dtype.
        """
        arr = self.file_reader.run(filename)
        schema, key_map = self.makeSchema(arr.dtype)

        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        self._writeMasterSchema(catalog)
        return schema, key_map

    @abc.abstractmethod
    def _getOnePixelFilename(self, start):
        """Return one example filename to help construct the rest of the
        per-htm pixel filenames.

        Parameters
        ----------
        start : `int`
            The first HTM index in this HTM pixelization.

        Returns
        -------
        filename : `str`
            Path to a single file that would be written to the output location.
        """
        pass

    @abc.abstractmethod
    def _persistConfig(self):
        """Write the config that was used to generate the refcat.
        """
        pass

    @abc.abstractmethod
    def _writeMasterSchema(self, catalog):
        """Butler put the master catalog schema.

        Parameters
        ----------
        catalog : `lsst.afw.table.SimpleCatalog`
            An empty catalog with a fully-defined schema that matches the
            schema used in each of the HTM pixel files.
        """
        pass
495
496
class IngestIndexedReferenceTask(ConvertReferenceCatalogBase, pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs (gen2 version).

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs
    """
    RunnerClass = IngestReferenceRunner
    ConfigClass = IngestIndexedReferenceConfig
    _DefaultName = 'IngestIndexedReferenceTask'

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, butler=None, **kwargs):
        # Store the butler before super().__init__, since the base class
        # constructor may trigger code that needs it.
        self.butler = butler
        super().__init__(*args, **kwargs)

    def _persistConfig(self):
        # Butler-put the dataset config under the refcat's name, with no pixel id.
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    def _getOnePixelFilename(self, start):
        dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
        return self.butler.get('ref_cat_filename', dataId=dataId)[0]

    def _writeMasterSchema(self, catalog):
        dataId = self.indexer.makeDataId('master_schema', self.config.dataset_config.ref_dataset_name)
        self.butler.put(catalog, 'ref_cat', dataId=dataId)
Defines the fields and offsets for a table.
Definition: Schema.h:51
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: SortedCatalog.h:42
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
HtmPixelization provides HTM indexing of points and regions.
daf::base::PropertySet * set
Definition: fits.cc:912
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174