__all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
           "IngestGaiaReferenceTask"]
 
import os.path

import astropy.units

import lsst.sphgeom
import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
from lsst.daf.base import PropertyList
 
from .indexerRegistry import IndexerRegistry
from .readTextCatalogTask import ReadTextCatalogTask
from .loadReferenceObjects import LoadReferenceObjectsTask
from . import ingestIndexManager
 

# The most recent on-disk format version of persisted reference catalogs.
LATEST_FORMAT_VERSION = 1
 

def addRefCatMetadata(catalog):
    """Add metadata to a new (not yet populated) reference catalog.

    Parameters
    ----------
    catalog : `lsst.afw.table.SimpleCatalog`
        Catalog to which metadata should be attached.  Will be modified
        in-place.
    """
    md = catalog.getMetadata()
    if md is None:
        md = PropertyList()
    md.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
    catalog.setMetadata(md)
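
# For illustration: a reader of a catalog written by this task could recover
# the version tag from the attached metadata (a sketch; assumes ``catalog``
# was previously written with the metadata above):
#
#     md = catalog.getMetadata()
#     version = md.getScalar("REFCAT_FORMAT_VERSION") if md is not None else 0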
 

class IngestReferenceRunner(pipeBase.TaskRunner):
    """Task runner for the reference catalog ingester.

    Data IDs are ignored so the runner should just run the task on the parsed command.
    """

    def run(self, parsedCmd):
        """Run the task.

        Several arguments need to be collected to send on to the task methods.

        Parameters
        ----------
        parsedCmd : `argparse.Namespace`
            Parsed command.

        Returns
        -------
        results : `lsst.pipe.base.Struct` or `None`
            An empty struct if self.doReturnResults, else None.
        """
        files = parsedCmd.files
        butler = parsedCmd.butler
        task = self.TaskClass(config=self.config, log=self.log, butler=butler)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        task.createIndexedCatalog(files)
        if self.doReturnResults:
            return pipeBase.Struct()
 

class DatasetConfig(pexConfig.Config):
    """The description of the on-disk storage format for the persisted
    reference catalog.
    """
    format_version = pexConfig.Field(
        dtype=int,
        doc="Version number of the persisted on-disk storage format."
        "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
        "\nVersion 1 had nJy as flux units.",
        default=0,  # must remain 0 so that unversioned catalogs are read as version 0
    )
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        default='cal_ref_cat',
        doc='String to pass to the butler to retrieve persisted files.',
    )
    indexer = IndexerRegistry.makeField(
        default='HTM',
        doc='Name of indexer algorithm to use.  Default is HTM',
    )


class IngestIndexedReferenceConfig(pexConfig.Config):
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    n_processes = pexConfig.Field(
        dtype=int,
        doc="Number of python processes to use when ingesting.",
        default=1,
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files.  Default is to expect text files.',
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column (values in decimal degrees)",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column (values in decimal degrees)",
    )
    ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA error column",
        optional=True,
    )
    dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec error column",
        optional=True,
    )
    coord_err_unit = pexConfig.Field(
        dtype=str,
        doc="Unit of RA/Dec error fields (astropy.unit.Unit compatible)",
        optional=True,
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required.",
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value).",
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).',
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).',
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).',
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).',
    )
    pm_ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA column",
        optional=True,
    )
    pm_dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec column",
        optional=True,
    )
    pm_ra_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion RA error column",
        optional=True,
    )
    pm_dec_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of proper motion Dec error column",
        optional=True,
    )
    pm_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
        default=1.0,
    )
    parallax_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax column",
        optional=True,
    )
    parallax_err_name = pexConfig.Field(
        dtype=str,
        doc="Name of parallax error column",
        optional=True,
    )
    parallax_scale = pexConfig.Field(
        dtype=float,
        doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
        default=1.0,
    )
    epoch_name = pexConfig.Field(
        dtype=str,
        doc="Name of epoch column",
        optional=True,
    )
    epoch_format = pexConfig.Field(
        dtype=str,
        doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
        optional=True,
    )
    epoch_scale = pexConfig.Field(
        dtype=str,
        doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
        optional=True,
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.',
    )
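
    # An illustrative override file for this config; the column names below
    # are hypothetical and must match the input catalog being ingested:
    #
    #     config.dataset_config.ref_dataset_name = "my_ref_cat"
    #     config.ra_name = "RA"
    #     config.dec_name = "DEC"
    #     config.mag_column_list = ["g", "r"]
    #     config.mag_err_column_map = {"g": "g_err", "r": "r_err"}
    #     config.id_name = "OBJID"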

    def validate(self):
        pexConfig.Config.validate(self)
 
        def assertAllOrNone(*names):
            """Raise ValueError unless all the named fields are set or are
            all none (or blank).
            """
            setNames = [name for name in names if bool(getattr(self, name))]
            if len(setNames) in (len(names), 0):
                return
            prefix = "Both or neither" if len(names) == 2 else "All or none"
            raise ValueError("{} of {} must be set, but only {} are set".format(
                prefix, ", ".join(names), ", ".join(setNames)))
 
        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError(
                "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
        if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
            raise ValueError(
                "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
                    sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
 
        assertAllOrNone("ra_err_name", "dec_err_name", "coord_err_unit")
        if self.coord_err_unit is not None:
            result = astropy.units.Unit(self.coord_err_unit, parse_strict='silent')
            if isinstance(result, astropy.units.UnrecognizedUnit):
                msg = f"{self.coord_err_unit} is not a valid astropy unit string."
                raise pexConfig.FieldValidationError(IngestIndexedReferenceConfig.coord_err_unit, self, msg)
 
        assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
        assertAllOrNone("pm_ra_name", "pm_dec_name")
        assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
        assertAllOrNone("parallax_name", "parallax_err_name")
 
        if self.pm_ra_err_name and not self.pm_ra_name:
            raise ValueError(
                '"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
        if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
            raise ValueError(
                '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
 

class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """Class for producing and loading indexed reference catalogs.

    This implements an indexing scheme based on hierarchical triangular
    mesh (HTM). The term index really means breaking the catalog into
    localized chunks called shards.  In this case each shard contains
    the entries from the catalog in a single HTM trixel.

    For producing catalogs this task makes the following assumptions
    about the input catalogs:

    - RA, Dec are in decimal degrees.
    - Epoch is available in a column, in a format supported by astropy.time.Time.
    - There are no off-diagonal covariance terms, such as covariance
      between RA and Dec, or between PM RA and PM Dec. Support for such
      covariance would have to be added to the config, including consideration
      of the units in the input catalog.

    Parameters
    ----------
    butler : `lsst.daf.persistence.Butler`
        Data butler for reading and writing catalogs.
    """
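    # Sharding granularity is set by the HTM depth in ``dataset_config.indexer``:
    # level N divides the sky into 8 * 4**N trixels, e.g. 131072 shards at
    # level 7 (an illustrative choice of depth, not a default asserted here).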
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This returns a standard parser with an extra "files" argument.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser
 
    def __init__(self, *args, butler=None, **kwargs):
        self.butler = butler
        super().__init__(*args, **kwargs)
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')
        self.IngestManager = ingestIndexManager.IngestIndexManager
 
    def createIndexedCatalog(self, inputFiles):
        """Index a set of files comprising a reference catalog.

        Outputs are persisted in the butler repository.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read.
        """
        schema, key_map = self._saveMasterSchema(inputFiles[0])
        # create an HTM pixelization we can interrogate about pixel ids
        htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
        filenames = self._getButlerFilenames(htm)
        worker = self.IngestManager(filenames,
                                    self.config,
                                    self.file_reader,
                                    self.indexer,
                                    schema,
                                    key_map,
                                    htm.universe()[0],
                                    addRefCatMetadata,
                                    self.log)
        worker.run(inputFiles)
 
        # write the config that was used to generate the refcat
        dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)
 
    def _saveMasterSchema(self, filename):
        """Generate and save the master catalog schema.

        Parameters
        ----------
        filename : `str`
            An input file to read to get the input dtype.
        """
        arr = self.file_reader.run(filename)
        schema, key_map = self.makeSchema(arr.dtype)
        dataId = self.indexer.makeDataId('master_schema',
                                         self.config.dataset_config.ref_dataset_name)
        catalog = afwTable.SimpleCatalog(schema)
        addRefCatMetadata(catalog)
        self.butler.put(catalog, 'ref_cat', dataId=dataId)
        return schema, key_map
 
    def _getButlerFilenames(self, htm):
        """Get filenames from the butler for each output pixel."""
        filenames = {}
        start, end = htm.universe()[0]
        # path manipulation because butler.get() per pixel will take forever
        dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
        path = self.butler.get('ref_cat_filename', dataId=dataId)[0]
        base = os.path.join(os.path.dirname(path), "%d" + os.path.splitext(path)[1])
 
        for pixelId in range(start, end):
            filenames[pixelId] = base % pixelId

        return filenames
 
    def makeSchema(self, dtype):
        """Make the schema to use in constructing the persisted catalogs.

        Parameters
        ----------
        dtype : `numpy.dtype`
            Data type describing each entry in ``config.extra_col_names``
            for the catalogs being ingested.

        Returns
        -------
        schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
            A tuple containing two items:

            - The schema for the output source catalog.
            - A map of catalog keys to use in filling the record.
        """
        # make a schema with the standard fields
        schema = LoadReferenceObjectsTask.makeMinimalSchema(
            filterNameList=self.config.mag_column_list,
            addIsPhotometric=bool(self.config.is_photometric_name),
            addIsResolved=bool(self.config.is_resolved_name),
            addIsVariable=bool(self.config.is_variable_name),
            coordErrDim=2 if bool(self.config.ra_err_name) else 0,
            addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
            properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
            addParallax=bool(self.config.parallax_name),
        )
        keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
        key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
                   if fieldName not in keysToSkip}
 

        def addField(name):
            if dtype[name].kind == 'U':
                # dealing with a string like thing.  Need to get type and size.
                at_size = dtype[name].itemsize
                return schema.addField(name, type=str, size=at_size)
            else:
                at_type = dtype[name].type
                return schema.addField(name, at_type)
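
        # Illustrative dtype: for numpy.dtype([("comment", "U16"), ("flux", "f8")]),
        # "comment" has kind 'U' and is added as a string field sized by its
        # itemsize in bytes, while "flux" is added with its numpy scalar type.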
 
        for col in self.config.extra_col_names:
            key_map[col] = addField(col)
 
        return schema, key_map
 

class IngestGaiaReferenceTask(IngestIndexedReferenceTask):
    """A special-cased version of the refcat ingester for Gaia DR2.
    """