__all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
           "ConvertReferenceCatalogBase", "ConvertReferenceCatalogConfig"]

import abc
import os

import astropy.units

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
import lsst.sphgeom
from lsst.daf.base import PropertyList

from .indexerRegistry import IndexerRegistry
from .readTextCatalogTask import ReadTextCatalogTask
from .loadReferenceObjects import ReferenceObjectLoaderBase
from . import convertRefcatManager

# The most recent on-disk format version for persisted reference catalogs;
# see DatasetConfig.format_version for the version history.
LATEST_FORMAT_VERSION = 1
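

# Illustrative sketch, not part of the original module: how a reader of a persisted
# refcat shard might use the REFCAT_FORMAT_VERSION metadata written below.  Version 0
# stored fluxes in Jy and version 1 stores them in nJy (see DatasetConfig.format_version),
# so a version-0 shard needs a Jy -> nJy scaling of 1e9 on read.  The function name is
# hypothetical; the PropertyList accessors are assumed from lsst.daf.base.
def _exampleFluxScaleForVersion(md):
    # Unversioned catalogs are treated as format version 0 (fluxes in Jy).
    if md is None or not md.exists("REFCAT_FORMAT_VERSION"):
        version = 0
    else:
        version = md.getScalar("REFCAT_FORMAT_VERSION")
    return 1e9 if version == 0 else 1.0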
52 """Add metadata to a new (not yet populated) reference catalog.
57 Catalog to which metadata should be attached. Will be modified
60 md = catalog.getMetadata()
63 md.set(
"REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
64 catalog.setMetadata(md)
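

# A minimal usage sketch (not from the original module) of addRefCatMetadata: build an
# empty catalog for a new shard, attach the version metadata before filling it, and read
# the version back.  The helper name is hypothetical; the afw.table calls are the standard
# SimpleCatalog constructors.
def _exampleNewShardCatalog():
    schema = afwTable.SimpleTable.makeMinimalSchema()
    catalog = afwTable.SimpleCatalog(schema)
    addRefCatMetadata(catalog)
    # Newly written shards always carry LATEST_FORMAT_VERSION (nJy fluxes).
    return catalog.getMetadata().getScalar("REFCAT_FORMAT_VERSION")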
68 """Task runner for the reference catalog ingester (gen2 version).
70 Data IDs are ignored so the runner should just run the task on the parsed command.
73 def run(self, parsedCmd):
76 Several arguments need to be collected to send on to the task methods.
80 parsedCmd : `argparse.Namespace`
85 results : `lsst.pipe.base.Struct` or `
None`
86 A empty struct
if self.doReturnResults,
else None
88 files = parsedCmd.files
89 butler = parsedCmd.butler
90 task = self.TaskClass(config=self.config, log=self.log, butler=butler)
91 task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
94 if self.doReturnResults:
95 return pipeBase.Struct()
99 """The description of the on-disk storage format for the persisted
102 format_version = pexConfig.Field(
104 doc="Version number of the persisted on-disk storage format."
105 "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
106 "\nVersion 1 had nJy as flux units.",
109 ref_dataset_name = pexConfig.Field(
112 default=
'cal_ref_cat',
113 doc=
"Name of this reference catalog to be used in the butler registry.",
115 indexer = IndexerRegistry.makeField(
117 doc=
'Name of indexer algoritm to use. Default is HTM',


class ConvertReferenceCatalogConfig(pexConfig.Config):
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    n_processes = pexConfig.Field(
        dtype=int,
        doc=("Number of python processes to use when ingesting."),
        default=1,
    )
    manager = pexConfig.ConfigurableField(
        target=convertRefcatManager.ConvertRefcatManager,
        doc="Multiprocessing manager to perform the actual conversion of values, file-by-file.",
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files.  Default is to expect text files.',
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column (values in decimal degrees)",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column (values in decimal degrees)",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    coord_err_unit = pexConfig.Field(
        dtype=str,
        doc="Unit of RA/Dec error fields (astropy.unit.Unit compatible)",
        optional=True,
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required.",
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value).",
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).',
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).',
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).',
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).',
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.',
    )

    def setDefaults(self):
        # Newly ingested reference catalogs are always written in the latest format.
        self.dataset_config.format_version = LATEST_FORMAT_VERSION

    def validate(self):
        pexConfig.Config.validate(self)

        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or all
            are unset (or blank).
            """
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))
276 "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
279 "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".
format(
        assertAllOrNone("ra_err_name", "dec_err_name", "coord_err_unit")
        if self.coord_err_unit is not None:
            result = astropy.units.Unit(self.coord_err_unit, parse_strict='silent')
            if isinstance(result, astropy.units.UnrecognizedUnit):
                msg = f"{self.coord_err_unit} is not a valid astropy unit string."
                raise pexConfig.FieldValidationError(IngestIndexedReferenceConfig.coord_err_unit, self, msg)

        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        assertAllOrNone("parallax_name", "parallax_err_name")
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
300 """For gen2 backwards compatibility.
306 """Base class for producing and loading indexed reference catalogs,
307 shared between gen2 and gen3.
309 This implements an indexing scheme based on hierarchical triangular
310 mesh (HTM). The term index really means breaking the catalog into
311 localized chunks called shards. In this case each shard contains
312 the entries
from the catalog
in a single HTM trixel
314 For producing catalogs this task makes the following assumptions
315 about the input catalogs:
316 - RA, Dec are
in decimal degrees.
317 - Epoch
is available
in a column,
in a format supported by astropy.time.Time.
318 - There are no off-diagonal covariance terms, such
as covariance
319 between RA
and Dec,
or between PM RA
and PM Dec. Support
for such
320 covariance would have to be added to to the config, including consideration
321 of the units
in the input catalog.
323 canMultiprocess = False
324 ConfigClass = ConvertReferenceCatalogConfig

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def run(self, inputFiles):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the butler repository.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read.
        """
        self._preRun()
        schema, key_map = self._saveMasterSchema(inputFiles[0])
        # Create an HTM pixelization we can interrogate about pixel ids.
        htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
        filenames = self._getButlerFilenames(htm)
        # The manager subtask performs the per-file conversion and writes each
        # HTM shard to its output filename.
        worker = self.config.manager.target(filenames, ...)
        result = worker.run(inputFiles)
        self._postRun(result)
362 """Any setup that has to be performed at the start of ``run``, but that
363 cannot be performed during ``__init__`` (e.g. making directories).
367 def _postRun(self, result):
368 """Any tasks that have to happen at the end of ``run``.
373 The result returned from``worker.run()``.

    def _getButlerFilenames(self, htm):
        """Get filenames from the butler for each output htm pixel.

        Parameters
        ----------
        htm : `lsst.sphgeom.HtmPixelization`
            The HTM pixelization scheme to be used to build filenames.

        Returns
        -------
        filenames : `dict` [`int`, `str`]
            Map of HTM pixel id to the filename that pixel's shard will be
            written to.
        """
        filenames = {}
        start, end = htm.universe()[0]
        # Get one filename from the butler and use it as a template for the rest,
        # rather than querying the butler once per pixel.
        path = self._getOnePixelFilename(start)
        base = os.path.join(os.path.dirname(path), "%d" + os.path.splitext(path)[1])
        for pixelId in range(start, end):
            filenames[pixelId] = base % pixelId
        return filenames
401 """Make the schema to use in constructing the persisted catalogs.
405 dtype : `numpy.dtype`
406 Data type describing each entry in ``config.extra_col_names``
407 for the catalogs being ingested.
412 A tuple containing two items:
413 - The schema
for the output source catalog.
414 - A map of catalog keys to use
in filling the record
417 schema = ReferenceObjectLoaderBase.makeMinimalSchema(
418 filterNameList=self.config.mag_column_list,
420 addIsPhotometric=bool(self.config.is_photometric_name),
421 addIsResolved=bool(self.config.is_resolved_name),
422 addIsVariable=bool(self.config.is_variable_name),
423 coordErrDim=2
if bool(self.config.ra_err_name)
else 0,
424 addProperMotion=2
if bool(self.config.pm_ra_name)
else 0,
425 properMotionErrDim=2
if bool(self.config.pm_ra_err_name)
else 0,
426 addParallax=bool(self.config.parallax_name),
428 keysToSkip =
set((
"id",
"centroid_x",
"centroid_y",
"hasCentroid"))
429 key_map = {fieldName: schema[fieldName].asKey()
for fieldName
in schema.getOrderedNames()
430 if fieldName
not in keysToSkip}
433 if dtype[name].kind ==
'U':
435 at_size = dtype[name].itemsize
436 return schema.addField(name, type=str, size=at_size)
438 at_type = dtype[name].type
439 return schema.addField(name, at_type)
441 for col
in self.config.extra_col_names:
442 key_map[col] = addField(col)
443 return schema, key_map

    def _saveMasterSchema(self, filename):
        """Generate and save the master catalog schema.

        Parameters
        ----------
        filename : `str`
            An input file to read to get the input dtype.
        """
        arr = self.file_reader.run(filename)
        schema, key_map = self.makeSchema(arr.dtype)
        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        self._writeMasterSchema(catalog)
        return schema, key_map

    @abc.abstractmethod
    def _getOnePixelFilename(self, start):
        """Return one example filename to help construct the rest of the
        per-htm pixel filenames.

        Parameters
        ----------
        start : `int`
            The first HTM index in this HTM pixelization.

        Returns
        -------
        filename : `str`
            Path to a single file that would be written to the output location.
        """
        pass

    @abc.abstractmethod
    def _persistConfig(self):
        """Write the config that was used to generate the refcat.
        """
        pass

    @abc.abstractmethod
    def _writeMasterSchema(self, catalog):
        """Butler put the master catalog schema.

        Parameters
        ----------
        catalog : `lsst.afw.table.SimpleCatalog`
            An empty catalog with a fully-defined schema that matches the
            schema used in each of the HTM pixel files.
        """
        pass
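

# Sketch (not part of the original module) of the HTM sharding described in
# ConvertReferenceCatalogBase: each reference object's RA/Dec falls in exactly one HTM
# trixel at the chosen depth, and every object in that trixel is written to the same
# shard file.  Depth 7 and the function name are illustrative assumptions.
def _exampleShardIndex(ra_deg, dec_deg, depth=7):
    pixelization = lsst.sphgeom.HtmPixelization(depth)
    position = lsst.sphgeom.UnitVector3d(lsst.sphgeom.LonLat.fromDegrees(ra_deg, dec_deg))
    # The returned integer is the HTM trixel (shard) id for this position.
    return pixelization.index(position)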
498 """Class for producing and loading indexed reference catalogs (gen2 version).
503 Data butler for reading
and writing catalogs
505 RunnerClass = IngestReferenceRunner
506 ConfigClass = IngestIndexedReferenceConfig
507 _DefaultName = 'IngestIndexedReferenceTask'

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, butler=None, **kwargs):
        self.butler = butler
        super().__init__(*args, **kwargs)

    def _persistConfig(self):
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    def _getOnePixelFilename(self, start):
        dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
        return self.butler.get('ref_cat_filename', dataId=dataId)[0]

    def _writeMasterSchema(self, catalog):
        dataId = self.indexer.makeDataId('master_schema', self.config.dataset_config.ref_dataset_name)
        self.butler.put(catalog, 'ref_cat', dataId=dataId)
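

# End-to-end sketch (not from the original module) of driving the gen2 ingestion
# programmatically rather than through the command-line driver.  The butler, file list,
# column names, and refcat name are all assumptions for illustration.
def _exampleIngest(butler, textFiles):
    config = IngestIndexedReferenceConfig()
    config.ra_name = "ra"
    config.dec_name = "dec"
    config.mag_column_list = ["g"]
    config.dataset_config.ref_dataset_name = "my_ref_cat"
    task = IngestIndexedReferenceTask(config=config, butler=butler)
    task.run(textFiles)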