# Names exported by a star import of this module.
__all__ = [
    "IngestIndexedReferenceConfig",
    "IngestIndexedReferenceTask",
    "DatasetConfig",
]
29 import astropy.units
as u
38 from .indexerRegistry
import IndexerRegistry
39 from .readTextCatalogTask
import ReadTextCatalogTask
40 from .loadReferenceObjects
import LoadReferenceObjectsTask
# Conversion factor: radians per degree.
_RAD_PER_DEG = math.pi / 180
# Conversion factor: radians per milliarcsecond
# (3600 arcsec per degree, 1000 milliarcsec per arcsec).
_RAD_PER_MILLIARCSEC = _RAD_PER_DEG / (3600 * 1000)

# Value stored under the "REFCAT_FORMAT_VERSION" metadata key of newly
# created reference catalogs.
LATEST_FORMAT_VERSION = 1
50 """Add metadata to a new (not yet populated) reference catalog. 54 catalog : `lsst.afw.table.SimpleCatalog` 55 Catalog to which metadata should be attached. Will be modified 58 md = catalog.getMetadata()
61 md.set(
"REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
62 catalog.setMetadata(md)
"""Task runner for the reference catalog ingester.

Data IDs are ignored so the runner should just run the task on the parsed command.
"""


def run(self, parsedCmd):
    """Run the ingest task once on the parsed command.

    Several arguments need to be collected to send on to the task methods.

    Parameters
    ----------
    parsedCmd : `argparse.Namespace`
        Parsed command; its ``files`` and ``butler`` attributes are read.

    Returns
    -------
    results : `lsst.pipe.base.Struct` or `None`
        An empty struct if ``self.doReturnResults``, else `None`.
    """
    inputFiles = parsedCmd.files
    repoButler = parsedCmd.butler
    ingester = self.TaskClass(config=self.config, log=self.log, butler=repoButler)
    # Persist the task configuration before any catalog data is written.
    ingester.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

    ingester.createIndexedCatalog(inputFiles)
    if not self.doReturnResults:
        return None
    return pipeBase.Struct()
97 """The description of the on-disk storage format for the persisted 100 format_version = pexConfig.Field(
102 doc=
"Version number of the persisted on-disk storage format." 103 "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)." 104 "\nVersion 1 had nJy as flux units.",
107 ref_dataset_name = pexConfig.Field(
109 default=
'cal_ref_cat',
110 doc=
'String to pass to the butler to retrieve persisted files.',
112 indexer = IndexerRegistry.makeField(
114 doc=
'Name of indexer algoritm to use. Default is HTM',
119 dataset_config = pexConfig.ConfigField(
121 doc=
"Configuration for reading the ingested data",
123 file_reader = pexConfig.ConfigurableField(
124 target=ReadTextCatalogTask,
125 doc=
'Task to use to read the files. Default is to expect text files.' 127 ra_name = pexConfig.Field(
129 doc=
"Name of RA column",
131 dec_name = pexConfig.Field(
133 doc=
"Name of Dec column",
135 ra_err_name = pexConfig.Field(
137 doc=
"Name of RA error column",
140 dec_err_name = pexConfig.Field(
142 doc=
"Name of Dec error column",
145 mag_column_list = pexConfig.ListField(
147 doc=
"The values in the reference catalog are assumed to be in AB magnitudes. " 148 "List of column names to use for photometric information. At least one entry is required." 150 mag_err_column_map = pexConfig.DictField(
154 doc=
"A map of magnitude column name (key) to magnitude error column (value)." 156 is_photometric_name = pexConfig.Field(
159 doc=
'Name of column stating if satisfactory for photometric calibration (optional).' 161 is_resolved_name = pexConfig.Field(
164 doc=
'Name of column stating if the object is resolved (optional).' 166 is_variable_name = pexConfig.Field(
169 doc=
'Name of column stating if the object is measured to be variable (optional).' 171 id_name = pexConfig.Field(
174 doc=
'Name of column to use as an identifier (optional).' 176 pm_ra_name = pexConfig.Field(
178 doc=
"Name of proper motion RA column",
181 pm_dec_name = pexConfig.Field(
183 doc=
"Name of proper motion Dec column",
186 pm_ra_err_name = pexConfig.Field(
188 doc=
"Name of proper motion RA error column",
191 pm_dec_err_name = pexConfig.Field(
193 doc=
"Name of proper motion Dec error column",
196 pm_scale = pexConfig.Field(
198 doc=
"Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
201 parallax_name = pexConfig.Field(
203 doc=
"Name of parallax column",
206 parallax_err_name = pexConfig.Field(
208 doc=
"Name of parallax error column",
211 parallax_scale = pexConfig.Field(
213 doc=
"Scale factor by which to multiply parallax values to obtain units of milliarcsec",
216 epoch_name = pexConfig.Field(
218 doc=
"Name of epoch column",
221 epoch_format = pexConfig.Field(
223 doc=
"Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
226 epoch_scale = pexConfig.Field(
228 doc=
"Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
231 extra_col_names = pexConfig.ListField(
234 doc=
'Extra columns to add to the reference catalog.' 242 pexConfig.Config.validate(self)
244 def assertAllOrNone(*names):
245 """Raise ValueError unless all the named fields are set or are 248 setNames = [name
for name
in names
if bool(getattr(self, name))]
249 if len(setNames)
in (len(names), 0):
251 prefix =
"Both or neither" if len(names) == 2
else "All or none" 252 raise ValueError(
"{} of {} must be set, but only {} are set".
format(
253 prefix,
", ".join(names),
", ".join(setNames)))
257 "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
260 "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".
format(
262 assertAllOrNone(
"ra_err_name",
"dec_err_name")
263 assertAllOrNone(
"epoch_name",
"epoch_format",
"epoch_scale")
264 assertAllOrNone(
"pm_ra_name",
"pm_dec_name")
265 assertAllOrNone(
"pm_ra_err_name",
"pm_dec_err_name")
267 raise ValueError(
'"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
270 '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
274 """Class for producing and loading indexed reference catalogs. 276 This implements an indexing scheme based on hierarchical triangular 277 mesh (HTM). The term index really means breaking the catalog into 278 localized chunks called shards. In this case each shard contains 279 the entries from the catalog in a single HTM trixel 281 For producing catalogs this task makes the following assumptions 282 about the input catalogs: 283 - RA, Dec, RA error and Dec error are all in decimal degrees. 284 - Epoch is available in a column, in a format supported by astropy.time.Time. 285 - There are no off-diagonal covariance terms, such as covariance 286 between RA and Dec, or between PM RA and PM Dec. Gaia is a well 287 known example of a catalog that has such terms, and thus should not 288 be ingested with this task. 292 butler : `lsst.daf.persistence.Butler` 293 Data butler for reading and writing catalogs 295 canMultiprocess =
False 296 ConfigClass = IngestIndexedReferenceConfig
297 RunnerClass = IngestReferenceRunner
298 _DefaultName =
'IngestIndexedReferenceTask' 300 _flags = [
'photometric',
'resolved',
'variable']
def _makeArgumentParser(cls):
    """Create an argument parser.

    This returns a standard parser with an extra "files" argument.

    Returns
    -------
    parser : `pipeBase.InputOnlyArgumentParser`
        Parser named ``cls._DefaultName`` that additionally accepts one or
        more positional ``files`` (the names of the files to index).
    """
    # NOTE(review): takes ``cls``, so this is presumably decorated with
    # ``@classmethod`` on a line outside this block -- confirm.
    parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
    parser.add_argument("files", nargs="+", help="Names of files to index")
    # The docstring promises the parser is returned; without this the caller
    # would receive None.
    return parser
314 pipeBase.Task.__init__(self, *args, **kwargs)
315 self.
indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
316 self.config.dataset_config.indexer.active)
317 self.makeSubtask(
'file_reader')
320 """Index a set of files comprising a reference catalog. 322 Outputs are persisted in the data repository. 327 A list of file paths to read. 331 for filename
in files:
332 arr = self.file_reader.
run(filename)
333 index_list = self.
indexer.indexPoints(arr[self.config.ra_name], arr[self.config.dec_name])
337 dataId = self.
indexer.makeDataId(
'master_schema',
338 self.config.dataset_config.ref_dataset_name)
342 pixel_ids =
set(index_list)
343 for pixel_id
in pixel_ids:
344 dataId = self.
indexer.makeDataId(pixel_id, self.config.dataset_config.ref_dataset_name)
346 els = np.where(index_list == pixel_id)
348 record = catalog.addNew()
349 rec_num = self.
_fillRecord(record, row, rec_num, key_map)
350 self.
butler.put(catalog,
'ref_cat', dataId=dataId)
351 dataId = self.
indexer.makeDataId(
None, self.config.dataset_config.ref_dataset_name)
352 self.
butler.put(self.config.dataset_config,
'ref_cat_config', dataId=dataId)
356 """Create an ICRS coord. from a row of a catalog being ingested. 360 row : structured `numpy.array` 361 Row from catalog being ingested. 363 Name of RA key in catalog being ingested. 365 Name of Dec key in catalog being ingested. 369 coord : `lsst.geom.SpherePoint` 374 def _setCoordErr(self, record, row, key_map):
375 """Set coordinate error in a record of an indexed catalog. 377 The errors are read from the specified columns, and installed 378 in the appropriate columns of the output. 382 record : `lsst.afw.table.SimpleRecord` 383 Row from indexed catalog to modify. 384 row : structured `numpy.array` 385 Row from catalog being ingested. 386 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 389 if self.config.ra_err_name:
390 record.set(key_map[
"coord_raErr"], row[self.config.ra_err_name]*_RAD_PER_DEG)
391 record.set(key_map[
"coord_decErr"], row[self.config.dec_err_name]*_RAD_PER_DEG)
393 def _setFlags(self, record, row, key_map):
394 """Set flags in an output record 398 record : `lsst.afw.table.SimpleRecord` 399 Row from indexed catalog to modify. 400 row : structured `numpy.array` 401 Row from catalog being ingested. 402 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 405 names = record.schema.getNames()
408 attr_name =
'is_{}_name'.
format(flag)
409 record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))
411 def _setFlux(self, record, row, key_map):
412 """Set flux fields in a record of an indexed catalog. 416 record : `lsst.afw.table.SimpleRecord` 417 Row from indexed catalog to modify. 418 row : structured `numpy.array` 419 Row from catalog being ingested. 420 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 423 for item
in self.config.mag_column_list:
424 record.set(key_map[item+
'_flux'], (row[item]*u.ABmag).to_value(u.nJy))
425 if len(self.config.mag_err_column_map) > 0:
426 for err_key
in self.config.mag_err_column_map.keys():
427 error_col_name = self.config.mag_err_column_map[err_key]
430 if fluxErr
is not None:
432 record.set(key_map[err_key+
'_fluxErr'], fluxErr)
434 def _setProperMotion(self, record, row, key_map):
435 """Set proper motion fields in a record of an indexed catalog. 437 The proper motions are read from the specified columns, 438 scaled appropriately, and installed in the appropriate 439 columns of the output. 443 record : `lsst.afw.table.SimpleRecord` 444 Row from indexed catalog to modify. 445 row : structured `numpy.array` 446 Row from catalog being ingested. 447 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 450 if self.config.pm_ra_name
is None:
452 radPerOriginal = _RAD_PER_MILLIARCSEC*self.config.pm_scale
453 record.set(key_map[
"pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
454 record.set(key_map[
"pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
455 record.set(key_map[
"epoch"], self.
_epochToMjdTai(row[self.config.epoch_name]))
456 if self.config.pm_ra_err_name
is not None:
457 record.set(key_map[
"pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
458 record.set(key_map[
"pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
def _epochToMjdTai(self, nativeEpoch):
    """Convert an epoch in native format to TAI MJD (a float).

    Parameters
    ----------
    nativeEpoch
        Epoch value expressed in the format ``config.epoch_format`` and
        the time scale ``config.epoch_scale``.

    Returns
    -------
    mjdTai
        The ``.tai.mjd`` value of the `astropy.time.Time` built from
        ``nativeEpoch``.
    """
    epoch = astropy.time.Time(
        nativeEpoch,
        format=self.config.epoch_format,
        scale=self.config.epoch_scale,
    )
    return epoch.tai.mjd
466 def _setExtra(self, record, row, key_map):
467 """Set extra data fields in a record of an indexed catalog. 471 record : `lsst.afw.table.SimpleRecord` 472 Row from indexed catalog to modify. 473 row : structured `numpy.array` 474 Row from catalog being ingested. 475 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 478 for extra_col
in self.config.extra_col_names:
479 value = row[extra_col]
487 if isinstance(value, np.str_):
489 record.set(key_map[extra_col], value)
491 def _fillRecord(self, record, row, rec_num, key_map):
492 """Fill a record in an indexed catalog to be persisted. 496 record : `lsst.afw.table.SimpleRecord` 497 Row from indexed catalog to modify. 498 row : structured `numpy.array` 499 Row from catalog being ingested. 501 Starting integer to increment for the unique id 502 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 505 record.setCoord(self.
computeCoord(row, self.config.ra_name, self.config.dec_name))
506 if self.config.id_name:
507 record.setId(row[self.config.id_name])
510 record.setId(rec_num)
520 """Get a catalog from the butler or create it if it doesn't exist. 525 Identifier for catalog to retrieve 526 schema : `lsst.afw.table.Schema` 527 Schema to use in catalog creation if the butler can't get it 531 catalog : `lsst.afw.table.SimpleCatalog` 532 The catalog specified by `dataId` 534 if self.
butler.datasetExists(
'ref_cat', dataId=dataId):
535 return self.
butler.get(
'ref_cat', dataId=dataId)
541 """Make the schema to use in constructing the persisted catalogs. 545 dtype : `numpy.dtype` 546 Data type describing each entry in ``config.extra_col_names`` 547 for the catalogs being ingested. 551 schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`) 552 A tuple containing two items: 553 - The schema for the output source catalog. 554 - A map of catalog keys to use in filling the record 559 schema = LoadReferenceObjectsTask.makeMinimalSchema(
560 filterNameList=self.config.mag_column_list,
562 addIsPhotometric=bool(self.config.is_photometric_name),
563 addIsResolved=bool(self.config.is_resolved_name),
564 addIsVariable=bool(self.config.is_variable_name),
565 coordErrDim=2
if bool(self.config.ra_err_name)
else 0,
566 addProperMotion=2
if bool(self.config.pm_ra_name)
else 0,
567 properMotionErrDim=2
if bool(self.config.pm_ra_err_name)
else 0,
568 addParallax=bool(self.config.parallax_name),
569 addParallaxErr=bool(self.config.parallax_err_name),
571 keysToSkip =
set((
"id",
"centroid_x",
"centroid_y",
"hasCentroid"))
572 key_map = {fieldName: schema[fieldName].asKey()
for fieldName
in schema.getOrderedNames()
573 if fieldName
not in keysToSkip}
576 if dtype[name].kind ==
'U': 578 at_size = dtype[name].itemsize
579 return schema.addField(name, type=str, size=at_size)
581 at_type = dtype[name].type
582 return schema.addField(name, at_type)
584 for col
in self.config.extra_col_names:
585 key_map[col] = addField(col)
586 return schema, key_map
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Class for storing ordered metadata with comments.
def makeSchema(self, dtype)
def addRefCatMetadata(catalog)
def getCatalog(self, dataId, schema)
def _setFlags(self, record, row, key_map)
daf::base::PropertySet * set
def _epochToMjdTai(self, nativeEpoch)
def _setCoordErr(self, record, row, key_map)
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
def _fillRecord(self, record, row, rec_num, key_map)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
def _setFlux(self, record, row, key_map)
def createIndexedCatalog(self, files)
Point in an unspecified spherical coordinate system.
def _setExtra(self, record, row, key_map)
def computeCoord(row, ra_name, dec_name)
def _setProperMotion(self, record, row, key_map)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def __init__(self, args, kwargs)