24 __all__ = [
"IngestIndexedReferenceConfig",
"IngestIndexedReferenceTask",
"DatasetConfig"]
36 from .indexerRegistry
import IndexerRegistry
37 from .readTextCatalogTask
import ReadTextCatalogTask
38 from .loadReferenceObjects
import LoadReferenceObjectsTask
# Unit-conversion factors for ingested catalog values: input positions and
# errors are decimal degrees, proper motions/parallaxes milliarcseconds;
# output table keys are radians.
_RAD_PER_DEG = math.pi / 180
_RAD_PER_MILLIARCSEC = _RAD_PER_DEG/(3600*1000)
45 """Task runner for the reference catalog ingester 47 Data IDs are ignored so the runner should just run the task on the parsed command. 50 def run(self, parsedCmd):
53 Several arguments need to be collected to send on to the task methods. 57 parsedCmd : `argparse.Namespace` 62 results : `lsst.pipe.base.Struct` or `None` 63 A empty struct if self.doReturnResults, else None 65 files = parsedCmd.files
66 butler = parsedCmd.butler
67 task = self.TaskClass(config=self.config, log=self.log, butler=butler)
68 task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
70 task.createIndexedCatalog(files)
71 if self.doReturnResults:
72 return pipeBase.Struct()
# NOTE(review): the extracted text lost most `dtype=`, `default=`, and
# `optional=` lines for these config fields. The doc strings below are
# byte-exact from the extraction; the lost keyword arguments are
# reconstructed (string column names optional where the doc says
# "(optional)", unit scales defaulting to 1.0) — verify each against
# upstream before relying on defaults.
ref_dataset_name = pexConfig.Field(
    dtype=str,
    default='cal_ref_cat',
    doc='String to pass to the butler to retrieve persisted files.',
)
indexer = IndexerRegistry.makeField(
    default='HTM',
    doc='Name of indexer algoritm to use. Default is HTM',
)
dataset_config = pexConfig.ConfigField(
    dtype=DatasetConfig,
    doc="Configuration for reading the ingested data",
)
file_reader = pexConfig.ConfigurableField(
    target=ReadTextCatalogTask,
    doc='Task to use to read the files. Default is to expect text files.'
)
ra_name = pexConfig.Field(
    dtype=str,
    doc="Name of RA column",
)
dec_name = pexConfig.Field(
    dtype=str,
    doc="Name of Dec column",
)
ra_err_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of RA error column",
)
dec_err_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of Dec error column",
)
mag_column_list = pexConfig.ListField(
    dtype=str,
    doc="The values in the reference catalog are assumed to be in AB magnitudes. "
        "List of column names to use for photometric information. At least one entry is required."
)
mag_err_column_map = pexConfig.DictField(
    keytype=str,
    itemtype=str,
    default={},
    doc="A map of magnitude column name (key) to magnitude error column (value)."
)
is_photometric_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc='Name of column stating if satisfactory for photometric calibration (optional).'
)
is_resolved_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc='Name of column stating if the object is resolved (optional).'
)
is_variable_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc='Name of column stating if the object is measured to be variable (optional).'
)
id_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc='Name of column to use as an identifier (optional).'
)
pm_ra_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of proper motion RA column",
)
pm_dec_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of proper motion Dec column",
)
pm_ra_err_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of proper motion RA error column",
)
pm_dec_err_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of proper motion Dec error column",
)
pm_scale = pexConfig.Field(
    dtype=float,
    default=1.0,
    doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
)
parallax_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of parallax column",
)
parallax_err_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of parallax error column",
)
parallax_scale = pexConfig.Field(
    dtype=float,
    default=1.0,
    doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
)
epoch_name = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Name of epoch column",
)
epoch_format = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
)
epoch_scale = pexConfig.Field(
    dtype=str,
    optional=True,
    doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
)
extra_col_names = pexConfig.ListField(
    dtype=str,
    default=[],
    doc='Extra columns to add to the reference catalog.'
)
def validate(self):
    """Validate the configured column names for mutual consistency.

    Raises
    ------
    ValueError
        If required names are missing or dependent names are set
        without their prerequisites.
    """
    # NOTE(review): the `def` line and several `if` condition lines were
    # lost in extraction; the conditions below are reconstructed from the
    # visible error messages — verify against upstream.
    pexConfig.Config.validate(self)

    def assertAllOrNone(*names):
        """Raise ValueError unless all the named fields are set or are
        all unset (or blank).
        """
        setNames = [name for name in names if bool(getattr(self, name))]
        if len(setNames) in (len(names), 0):
            return
        prefix = "Both or neither" if len(names) == 2 else "All or none"
        raise ValueError("{} of {} must be set, but only {} are set".format(
            prefix, ", ".join(names), ", ".join(setNames)))

    if not (self.ra_name and self.dec_name and self.mag_column_list):
        raise ValueError(
            "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
    if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
        raise ValueError(
            "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
    assertAllOrNone("ra_err_name", "dec_err_name")
    assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
    assertAllOrNone("pm_ra_name", "pm_dec_name")
    assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
    if (self.pm_ra_err_name or self.pm_dec_err_name) and not (self.pm_ra_name and self.pm_dec_name):
        raise ValueError(
            '"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
    if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
        raise ValueError(
            '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
239 """Class for producing and loading indexed reference catalogs. 241 This implements an indexing scheme based on hierarchical triangular 242 mesh (HTM). The term index really means breaking the catalog into 243 localized chunks called shards. In this case each shard contains 244 the entries from the catalog in a single HTM trixel 246 For producing catalogs this task makes the following assumptions 247 about the input catalogs: 248 - RA, Dec, RA error and Dec error are all in decimal degrees. 249 - Epoch is available in a column, in a format supported by astropy.time.Time. 250 - There are no off-diagonal covariance terms, such as covariance 251 between RA and Dec, or between PM RA and PM Dec. Gaia is a well 252 known example of a catalog that has such terms, and thus should not 253 be ingested with this task. 257 butler : `lsst.daf.persistence.Butler` 258 Data butler for reading and writing catalogs 260 canMultiprocess =
False 261 ConfigClass = IngestIndexedReferenceConfig
262 RunnerClass = IngestReferenceRunner
263 _DefaultName =
'IngestIndexedReferenceTask' 265 _flags = [
'photometric',
'resolved',
'variable']
@classmethod
def _makeArgumentParser(cls):
    """Create an argument parser.

    This returns a standard parser with an extra "files" argument.
    """
    parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
    parser.add_argument("files", nargs="+", help="Names of files to index")
    # NOTE(review): the extracted text lost the final line; a parser
    # factory must return the parser it builds.
    return parser
def __init__(self, *args, **kwargs):
    """Construct the ingest task.

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs (keyword-only;
        popped before delegating to `pipeBase.Task`).
    """
    # NOTE(review): the `def` line and butler handling were lost in
    # extraction; reconstructed. The runner passes butler= as a keyword
    # and createIndexedCatalog/getCatalog read self.butler, so it must
    # be popped before Task.__init__ — verify against upstream.
    self.butler = kwargs.pop('butler')
    pipeBase.Task.__init__(self, *args, **kwargs)
    # Instantiate the configured indexer (e.g. HTM) from the registry.
    self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
        self.config.dataset_config.indexer.active)
    self.makeSubtask('file_reader')
285 """Index a set of files comprising a reference catalog. 287 Outputs are persisted in the data repository. 292 A list of file paths to read. 296 for filename
in files:
297 arr = self.file_reader.
run(filename)
298 index_list = self.
indexer.indexPoints(arr[self.config.ra_name], arr[self.config.dec_name])
302 dataId = self.
indexer.makeDataId(
'master_schema',
303 self.config.dataset_config.ref_dataset_name)
307 pixel_ids =
set(index_list)
308 for pixel_id
in pixel_ids:
309 dataId = self.
indexer.makeDataId(pixel_id, self.config.dataset_config.ref_dataset_name)
311 els = np.where(index_list == pixel_id)
313 record = catalog.addNew()
314 rec_num = self.
_fillRecord(record, row, rec_num, key_map)
315 self.
butler.put(catalog,
'ref_cat', dataId=dataId)
316 dataId = self.
indexer.makeDataId(
None, self.config.dataset_config.ref_dataset_name)
317 self.
butler.put(self.config.dataset_config,
'ref_cat_config', dataId=dataId)
321 """Create an ICRS coord. from a row of a catalog being ingested. 325 row : structured `numpy.array` 326 Row from catalog being ingested. 328 Name of RA key in catalog being ingested. 330 Name of Dec key in catalog being ingested. 334 coord : `lsst.geom.SpherePoint` 339 def _setCoordErr(self, record, row, key_map):
340 """Set coordinate error in a record of an indexed catalog. 342 The errors are read from the specified columns, and installed 343 in the appropriate columns of the output. 347 record : `lsst.afw.table.SimpleRecord` 348 Row from indexed catalog to modify. 349 row : structured `numpy.array` 350 Row from catalog being ingested. 351 key_map : `dict` mapping `str` to `lsst.afw.table.Key` 354 if self.config.ra_err_name:
355 record.set(key_map[
"coord_raErr"], row[self.config.ra_err_name]*_RAD_PER_DEG)
356 record.set(key_map[
"coord_decErr"], row[self.config.dec_err_name]*_RAD_PER_DEG)
def _setFlags(self, record, row, key_map):
    """Set flags in an output record.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    key_map : `dict` mapping `str` to `lsst.afw.table.Key`
        Map of output catalog keys.
    """
    names = record.schema.getNames()
    # NOTE(review): the loop/guard header lines were lost in extraction;
    # reconstructed from self._flags and the per-flag body below — verify
    # against upstream. Only flags present in the output schema are set.
    for flag in self._flags:
        if flag in names:
            attr_name = 'is_{}_name'.format(flag)
            record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))
def _setFlux(self, record, row, key_map):
    """Set flux fields in a record of an indexed catalog.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested (AB magnitudes, per config doc).
    key_map : `dict` mapping `str` to `lsst.afw.table.Key`
        Map of output catalog keys.
    """
    for item in self.config.mag_column_list:
        # NOTE(review): this statement was lost in extraction; reconstructed
        # from the '<name>_flux' key convention and the fluxFromABMag helper
        # referenced in the build's symbol index — verify against upstream.
        record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
    if len(self.config.mag_err_column_map) > 0:
        for err_key in self.config.mag_err_column_map.keys():
            error_col_name = self.config.mag_err_column_map[err_key]
            # NOTE(review): continuation line reconstructed (fluxErrFromABMagErr
            # takes magErr then mag, per its signature in the symbol index).
            record.set(key_map[err_key+'_fluxErr'],
                       fluxErrFromABMagErr(row[error_col_name], row[err_key]))
def _setProperMotion(self, record, row, key_map):
    """Set proper motion fields in a record of an indexed catalog.

    The proper motions are read from the specified columns,
    scaled appropriately, and installed in the appropriate
    columns of the output.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    key_map : `dict` mapping `str` to `lsst.afw.table.Key`
        Map of output catalog keys.
    """
    if self.config.pm_ra_name is None:
        # NOTE(review): the statement under this guard was lost in
        # extraction; a bare `return` is the only consistent reading,
        # since the next line runs unconditionally at method level.
        return
    # pm_scale converts the input columns to milliarcsec/year; output
    # keys are radians/year (Angle via lsst.geom.radians).
    radPerOriginal = _RAD_PER_MILLIARCSEC*self.config.pm_scale
    record.set(key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
    record.set(key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
    record.set(key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
    if self.config.pm_ra_err_name is not None:
        record.set(key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
        record.set(key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
def _epochToMjdTai(self, nativeEpoch):
    """Convert an epoch in native format to TAI MJD (a float).

    The native format and scale are taken from ``config.epoch_format``
    and ``config.epoch_scale`` (any values accepted by astropy.time.Time).
    """
    return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                             scale=self.config.epoch_scale).tai.mjd
def _setExtra(self, record, row, key_map):
    """Set extra data fields in a record of an indexed catalog.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    key_map : `dict` mapping `str` to `lsst.afw.table.Key`
        Map of output catalog keys.
    """
    for extra_col in self.config.extra_col_names:
        value = row[extra_col]
        # numpy stores string-like entries of structured arrays as its own
        # scalar type, numpy.str_; cast to a plain Python str before
        # handing the value to the record.
        # NOTE(review): the cast statement under this check was lost in
        # extraction; reconstructed — the isinstance test has no other
        # purpose. Verify against upstream.
        if isinstance(value, np.str_):
            value = str(value)
        record.set(key_map[extra_col], value)
def _fillRecord(self, record, row, rec_num, key_map):
    """Fill a record in an indexed catalog to be persisted.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    rec_num : `int`
        Starting integer to increment for the unique id.
    key_map : `dict` mapping `str` to `lsst.afw.table.Key`
        Map of output catalog keys.

    Returns
    -------
    rec_num : `int`
        The (possibly incremented) id counter, for the caller to thread
        into the next call.
    """
    record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
    if self.config.id_name:
        record.setId(row[self.config.id_name])
    else:
        # NOTE(review): the else branch and everything after setId were
        # lost in extraction; reconstructed from the _set* helpers defined
        # in this class and from createIndexedCatalog, which assigns this
        # method's return value back into rec_num — verify against upstream.
        rec_num += 1
        record.setId(rec_num)
    self._setCoordErr(record, row, key_map)
    self._setFlags(record, row, key_map)
    self._setFlux(record, row, key_map)
    self._setProperMotion(record, row, key_map)
    self._setExtra(record, row, key_map)
    return rec_num
482 """Get a catalog from the butler or create it if it doesn't exist. 487 Identifier for catalog to retrieve 488 schema : `lsst.afw.table.Schema` 489 Schema to use in catalog creation if the butler can't get it 493 catalog : `lsst.afw.table.SimpleCatalog` 494 The catalog specified by `dataId` 496 if self.
butler.datasetExists(
'ref_cat', dataId=dataId):
497 return self.
butler.get(
'ref_cat', dataId=dataId)
501 """Make the schema to use in constructing the persisted catalogs. 505 dtype : `numpy.dtype` 506 Data type describing each entry in ``config.extra_col_names`` 507 for the catalogs being ingested. 511 schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`) 512 A tuple containing two items: 513 - The schema for the output source catalog. 514 - A map of catalog keys to use in filling the record 519 schema = LoadReferenceObjectsTask.makeMinimalSchema(
520 filterNameList=self.config.mag_column_list,
522 addIsPhotometric=bool(self.config.is_photometric_name),
523 addIsResolved=bool(self.config.is_resolved_name),
524 addIsVariable=bool(self.config.is_variable_name),
525 coordErrDim=2
if bool(self.config.ra_err_name)
else 0,
526 addProperMotion=2
if bool(self.config.pm_ra_name)
else 0,
527 properMotionErrDim=2
if bool(self.config.pm_ra_err_name)
else 0,
528 addParallax=bool(self.config.parallax_name),
529 addParallaxErr=bool(self.config.parallax_err_name),
531 keysToSkip =
set((
"id",
"centroid_x",
"centroid_y",
"hasCentroid"))
532 key_map = {fieldName: schema[fieldName].asKey()
for fieldName
in schema.getOrderedNames()
533 if fieldName
not in keysToSkip}
536 if dtype[name].kind ==
'U': 538 at_size = dtype[name].itemsize
539 return schema.addField(name, type=str, size=at_size)
541 at_type = dtype[name].type
542 return schema.addField(name, at_type)
544 for col
in self.config.extra_col_names:
545 key_map[col] = addField(col)
546 return schema, key_map
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
def makeSchema(self, dtype)
def getCatalog(self, dataId, schema)
def _setFlags(self, record, row, key_map)
daf::base::PropertySet * set
def _epochToMjdTai(self, nativeEpoch)
def _setCoordErr(self, record, row, key_map)
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
def _fillRecord(self, record, row, rec_num, key_map)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
double fluxFromABMag(double mag) noexcept
Compute flux in Janskys from AB magnitude.
def _setFlux(self, record, row, key_map)
def createIndexedCatalog(self, files)
Point in an unspecified spherical coordinate system.
def _setExtra(self, record, row, key_map)
def computeCoord(row, ra_name, dec_name)
def _setProperMotion(self, record, row, key_map)
def __init__(self, args, kwargs)