# Public API of this module.
__all__ = ["IngestIndexManager", "IngestGaiaManager"]
 
   26 import multiprocessing
 
   29 import astropy.units 
as u
 
   46     Ingest a reference catalog from external files into a butler repository, 
   47     using a multiprocessing Pool to speed up the work. 
   51     filenames : `dict` [`int`, `str`] 
   52         The HTM pixel id and filenames to ingest the catalog into. 
   53     config : `lsst.meas.algorithms.IngestIndexedReferenceConfig` 
   54         The Task configuration holding the field names. 
   55     file_reader : `lsst.pipe.base.Task` 
   56         The file reader to use to load the files. 
   57     indexer : `lsst.meas.algorithms.HtmIndexer` 
   58         The class used to compute the HTM pixel per coordinate. 
   59     schema : `lsst.afw.table.Schema` 
   60         The schema of the output catalog. 
   61     key_map : `dict` [`str`, `lsst.afw.table.Key`] 
   62         The mapping from output field names to keys in the Schema. 
   63     htmRange : `tuple` [`int`] 
   64         The start and end HTM pixel ids. 
   65     addRefCatMetadata : callable 
   66         A function called to add extra metadata to each output Catalog. 
   68         The log to send messages to. 
   70     _flags = [
'photometric', 
'resolved', 
'variable']
 
   72     def __init__(self, filenames, config, file_reader, indexer,
 
   73                  schema, key_map, htmRange, addRefCatMetadata, log):
 
   86     def run(self, inputFiles):
 
   87         """Index a set of input files from a reference catalog, and write the 
   88         output to the appropriate filenames, in parallel. 
   93             A list of file paths to read data from. 
   95         global COUNTER, FILE_PROGRESS
 
   98         with multiprocessing.Manager() 
as manager:
 
   99             COUNTER = multiprocessing.Value(
'i', 0)
 
  100             FILE_PROGRESS = multiprocessing.Value(
'i', 0)
 
  101             fileLocks = manager.dict()
 
  104                 fileLocks[i] = manager.Lock()
 
  105             self.
log.
info(
"File locks created.")
 
  106             with multiprocessing.Pool(self.
config.n_processes) 
as pool:
 
  107                 pool.starmap(self.
_ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
 
  109     def _ingestOneFile(self, filename, fileLocks):
 
  110         """Read and process one file, and write its records to the correct 
  111         indexed files, while handling exceptions in a useful way so that they 
  112         don't get swallowed by the multiprocess pool. 
  118         fileLocks : `dict` [`int`, `multiprocessing.Lock`] 
  119             A Lock for each HTM pixel; each pixel gets one file written, and 
  120             we need to block when one process is accessing that file. 
  126         matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
 
  127                                                  inputData[self.
config.dec_name])
 
  128         pixel_ids = 
set(matchedPixels)
 
  129         for pixelId 
in pixel_ids:
 
  130             with fileLocks[pixelId]:
 
  131                 self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
 
  132         with FILE_PROGRESS.get_lock():
 
  133             oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles 
  134             FILE_PROGRESS.value += 1
 
  135             percent = 100 * FILE_PROGRESS.value / self.
nInputFiles 
  137             if np.floor(percent) - np.floor(oldPercent) >= 1:
 
  138                 self.
log.
info(
"Completed %d / %d files: %d %% complete ",
 
  143     def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
 
  144         """Process one HTM pixel, appending to an existing catalog or creating 
  145         a new catalog, as needed. 
  149         inputData : `numpy.ndarray` 
  150             The data from one input file. 
  151         matchedPixels : `numpy.ndarray` 
  152             The row-matched pixel indexes corresponding to ``inputData``. 
  154             The pixel index we are currently processing. 
  155         fluxes : `dict` [`str`, `numpy.ndarray`] 
  156             The values that will go into the flux and fluxErr fields in the 
  158         coordErr : `dict` [`str`, `numpy.ndarray`] 
  159             The values that will go into the coord_raErr, coord_decErr, and 
  160             coord_ra_dec_Cov fields in the output catalog (in radians). 
  162         idx = np.where(matchedPixels == pixelId)[0]
 
  164         for outputRow, inputRow 
in zip(catalog[-len(idx):], inputData[idx]):
 
  168         with COUNTER.get_lock():
 
  169             self.
_setIds(inputData[idx], catalog)
 
  172         for name, array 
in fluxes.items():
 
  173             catalog[self.
key_map[name]][-len(idx):] = array[idx]
 
  176         for name, array 
in coordErr.items():
 
  177             catalog[name][-len(idx):] = array[idx]
 
  179         catalog.writeFits(self.
filenames[pixelId])
 
  181     def _setIds(self, inputData, catalog):
 
  182         """Fill the `id` field of catalog with a running index, filling the 
  183         last values up to the length of ``inputData``. 
  185         Fill with `self.config.id_name` if specified, otherwise use the 
  186         global running counter value. 
  190         inputData : `numpy.ndarray` 
  191             The input data that is being processed. 
  192         catalog : `lsst.afw.table.SimpleCatalog` 
  193             The output catalog to fill the ids. 
  196         size = len(inputData)
 
  198             catalog[
'id'][-size:] = inputData[self.
config.id_name]
 
  200             idEnd = COUNTER.value + size
 
  201             catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
 
  202             COUNTER.value = idEnd
 
  205         """Get a catalog from disk or create it if it doesn't exist. 
  210             Identifier for catalog to retrieve 
  211         schema : `lsst.afw.table.Schema` 
  212             Schema to use in catalog creation it does not exist. 
  214             The number of new elements that will be added to the catalog, 
  215             so space can be preallocated. 
  219         catalog : `lsst.afw.table.SimpleCatalog` 
  220             The new or read-and-resized catalog specified by `dataId`. 
  223         if os.path.isfile(self.
filenames[pixelId]):
 
  224             catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
 
  225             catalog.resize(len(catalog) + nNewElements)
 
  226             return catalog.copy(deep=
True)  
 
  228         catalog.resize(nNewElements)
 
  234         """Create an ICRS coord. from a row of a catalog being ingested. 
  238         row : `numpy.ndarray` 
  239             Row from catalog being ingested. 
  241             Name of RA key in catalog being ingested. 
  243             Name of Dec key in catalog being ingested. 
  247         coord : `lsst.geom.SpherePoint` 
  252     def _getCoordErr(self, inputData, ):
 
  253         """Compute the ra/dec error fields that will go into the output catalog. 
  257         inputData : `numpy.ndarray` 
  258             The input data to compute fluxes for. 
  262         coordErr : `dict` [`str`, `numpy.ndarray`] 
  263             The values that will go into the coord_raErr, coord_decErr, fields 
  264             in the output catalog (in radians). 
  268         This does not currently handle the ra/dec covariance field, 
  269         ``coord_ra_dec_Cov``. That field may require extra work, as its units 
  270         may be more complicated in external catalogs. 
  273         result[
'coord_raErr'] = u.Quantity(inputData[self.
config.ra_err_name],
 
  275         result[
'coord_decErr'] = u.Quantity(inputData[self.
config.dec_err_name],
 
  279     def _setFlags(self, record, row):
 
  280         """Set flags in an output record. 
  284         record : `lsst.afw.table.SimpleRecord` 
  285             Row from indexed catalog to modify. 
  286         row : `numpy.ndarray` 
  287             Row from catalog being ingested. 
  289         names = record.schema.getNames()
 
  292                 attr_name = 
'is_{}_name'.
format(flag)
 
  293                 record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
 
  295     def _getFluxes(self, inputData):
 
  296         """Compute the flux fields that will go into the output catalog. 
  300         inputData : `numpy.ndarray` 
  301             The input data to compute fluxes for. 
  305         fluxes : `dict` [`str`, `numpy.ndarray`] 
  306             The values that will go into the flux and fluxErr fields in the 
  310         for item 
in self.
config.mag_column_list:
 
  311             result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
 
  312         if len(self.
config.mag_err_column_map) > 0:
 
  313             for err_key 
in self.
config.mag_err_column_map.keys():
 
  314                 error_col_name = self.
config.mag_err_column_map[err_key]
 
  318                                               inputData[err_key].copy())*1e9
 
  319                 result[err_key+
'_fluxErr'] = fluxErr
 
  322     def _setProperMotion(self, record, row):
 
  323         """Set proper motion fields in a record of an indexed catalog. 
  325         The proper motions are read from the specified columns, 
  326         scaled appropriately, and installed in the appropriate 
  327         columns of the output. 
  331         record : `lsst.afw.table.SimpleRecord` 
  332             Row from indexed catalog to modify. 
  333         row : structured `numpy.array` 
  334             Row from catalog being ingested. 
  336         if self.
config.pm_ra_name 
is None:  
 
  338         radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
 
  339         record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
 
  340         record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
 
  342         if self.
config.pm_ra_err_name 
is not None:  
 
  343             record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
 
  344             record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
 
  346     def _setParallax(self, record, row):
 
  347         """Set the parallax fields in a record of a refcat. 
  349         if self.
config.parallax_name 
is None:
 
  351         scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
 
  352         record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
 
  353         record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
 
  355     def _epochToMjdTai(self, nativeEpoch):
 
  356         """Convert an epoch in native format to TAI MJD (a float). 
  358         return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
 
  359                                  scale=self.
config.epoch_scale).tai.mjd
 
  361     def _setExtra(self, record, row):
 
  362         """Set extra data fields in a record of an indexed catalog. 
  366         record : `lsst.afw.table.SimpleRecord` 
  367             Row from indexed catalog to modify. 
  368         row : structured `numpy.array` 
  369             Row from catalog being ingested. 
  371         for extra_col 
in self.
config.extra_col_names:
 
  372             value = row[extra_col]
 
  380             if isinstance(value, np.str_):
 
  382             record.set(self.
key_map[extra_col], value)
 
  384     def _fillRecord(self, record, row):
 
  385         """Fill a record in an indexed catalog to be persisted. 
  389         record : `lsst.afw.table.SimpleRecord` 
  390             Row from indexed catalog to modify. 
  391         row : structured `numpy.array` 
  392             Row from catalog being ingested. 
  403     """Special-case ingest manager to deal with Gaia fluxes. 
  405     def _getFluxes(self, input):
 
  408         def gaiaFluxToFlux(flux, zeroPoint):
 
  409             """Equations 5.19 and 5.30 from the Gaia calibration document define the 
  410             conversion from Gaia electron/second fluxes to AB magnitudes. 
  411             https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html 
  413             result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
 
  415             result[flux == 0] = 0
 
  420         with np.errstate(invalid=
'ignore', divide=
'ignore'):
 
  423             result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
 
  424             result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
 
  425             result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
 
  427         result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
 
  428         result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
 
  429         result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']