__all__ = [
    "IngestIndexManager",
    "IngestGaiaManager",
]

import itertools
import multiprocessing
import os.path

import astropy.time
import astropy.units as u
import numpy as np

import lsst.geom
import lsst.afw.table as afwTable
from lsst.afw.image import fluxErrFromABMagErr
46 Ingest a reference catalog from external files into a butler repository, 47 using a multiprocessing Pool to speed up the work. 51 filenames : `dict` [`int`, `str`] 52 The HTM pixel id and filenames to ingest the catalog into. 53 config : `lsst.meas.algorithms.IngestIndexedReferenceConfig` 54 The Task configuration holding the field names. 55 file_reader : `lsst.pipe.base.Task` 56 The file reader to use to load the files. 57 indexer : `lsst.meas.algorithms.HtmIndexer` 58 The class used to compute the HTM pixel per coordinate. 59 schema : `lsst.afw.table.Schema` 60 The schema of the output catalog. 61 key_map : `dict` [`str`, `lsst.afw.table.Key`] 62 The mapping from output field names to keys in the Schema. 63 htmRange : `tuple` [`int`] 64 The start and end HTM pixel ids. 65 addRefCatMetadata : callable 66 A function called to add extra metadata to each output Catalog. 68 The log to send messages to. 70 _flags = [
'photometric',
'resolved',
'variable']
72 def __init__(self, filenames, config, file_reader, indexer,
73 schema, key_map, htmRange, addRefCatMetadata, log):
84 def run(self, inputFiles):
85 """Index a set of input files from a reference catalog, and write the 86 output to the appropriate filenames, in parallel. 91 A list of file paths to read data from. 93 global COUNTER, FILE_PROGRESS
96 with multiprocessing.Manager()
as manager:
97 COUNTER = multiprocessing.Value(
'i', 0)
98 FILE_PROGRESS = multiprocessing.Value(
'i', 0)
99 fileLocks = manager.dict()
102 fileLocks[i] = manager.Lock()
103 self.
log.
info(
"File locks created.")
104 with multiprocessing.Pool(self.
config.n_processes)
as pool:
105 pool.starmap(self.
_ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
107 def _ingestOneFile(self, filename, fileLocks):
108 """Read and process one file, and write its records to the correct 109 indexed files, while handling exceptions in a useful way so that they 110 don't get swallowed by the multiprocess pool. 116 fileLocks : `dict` [`int`, `multiprocessing.Lock`] 117 A Lock for each HTM pixel; each pixel gets one file written, and 118 we need to block when one process is accessing that file. 123 matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
124 inputData[self.
config.dec_name])
125 pixel_ids =
set(matchedPixels)
126 for pixelId
in pixel_ids:
127 with fileLocks[pixelId]:
128 self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes)
129 with FILE_PROGRESS.get_lock():
130 oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles 131 FILE_PROGRESS.value += 1
132 percent = 100 * FILE_PROGRESS.value / self.
nInputFiles 134 if np.floor(percent) - np.floor(oldPercent) >= 1:
135 self.
log.
info(
"Completed %d / %d files: %d %% complete ",
140 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes):
141 """Process one HTM pixel, appending to an existing catalog or creating 142 a new catalog, as needed. 146 inputData : `numpy.ndarray` 147 The data from one input file. 148 matchedPixels : `numpy.ndarray` 149 The row-matched pixel indexes corresponding to ``inputData``. 151 The pixel index we are currently processing. 152 fluxes : `dict` [`str`, `numpy.ndarray`] 153 The values that will go into the flux and fluxErr fields in the 156 idx = np.where(matchedPixels == pixelId)[0]
158 for outputRow, inputRow
in zip(catalog[-len(idx):], inputData[idx]):
162 with COUNTER.get_lock():
163 self.
_setIds(inputData[idx], catalog)
165 for name, array
in fluxes.items():
166 catalog[self.
key_map[name]][-len(idx):] = array[idx]
168 catalog.writeFits(self.
filenames[pixelId])
170 def _setIds(self, inputData, catalog):
171 """Fill the `id` field of catalog with a running index, filling the 172 last values up to the length of ``inputData``. 174 Fill with `self.config.id_name` if specified, otherwise use the 175 global running counter value. 179 inputData : `numpy.ndarray` 180 The input data that is being processed. 181 catalog : `lsst.afw.table.SimpleCatalog` 182 The output catalog to fill the ids. 185 size = len(inputData)
187 catalog[
'id'][-size:] = inputData[self.
config.id_name]
189 idEnd = COUNTER.value + size
190 catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
191 COUNTER.value = idEnd
194 """Get a catalog from disk or create it if it doesn't exist. 199 Identifier for catalog to retrieve 200 schema : `lsst.afw.table.Schema` 201 Schema to use in catalog creation it does not exist. 203 The number of new elements that will be added to the catalog, 204 so space can be preallocated. 208 catalog : `lsst.afw.table.SimpleCatalog` 209 The new or read-and-resized catalog specified by `dataId`. 212 if os.path.isfile(self.
filenames[pixelId]):
213 catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
214 catalog.resize(len(catalog) + nNewElements)
215 return catalog.copy(deep=
True)
217 catalog.resize(nNewElements)
223 """Create an ICRS coord. from a row of a catalog being ingested. 227 row : `numpy.ndarray` 228 Row from catalog being ingested. 230 Name of RA key in catalog being ingested. 232 Name of Dec key in catalog being ingested. 236 coord : `lsst.geom.SpherePoint` 241 def _setCoordErr(self, record, row):
242 """Set coordinate error in a record of an indexed catalog. 244 The errors are read from the specified columns, and installed 245 in the appropriate columns of the output. 249 record : `lsst.afw.table.SimpleRecord` 250 Row from indexed catalog to modify. 251 row : `numpy.ndarray` 252 Row from catalog being ingested. 254 if self.
config.ra_err_name:
255 record.set(self.
key_map[
"coord_raErr"], np.radians(row[self.
config.ra_err_name]))
256 record.set(self.
key_map[
"coord_decErr"], np.radians(row[self.
config.dec_err_name]))
258 def _setFlags(self, record, row):
259 """Set flags in an output record. 263 record : `lsst.afw.table.SimpleRecord` 264 Row from indexed catalog to modify. 265 row : `numpy.ndarray` 266 Row from catalog being ingested. 268 names = record.schema.getNames()
271 attr_name =
'is_{}_name'.
format(flag)
272 record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
274 def _getFluxes(self, inputData):
275 """Compute the flux fields that will go into the output catalog. 279 inputData : `numpy.ndarray` 280 The input data to compute fluxes for. 284 fluxes : `dict` [`str`, `numpy.ndarray`] 285 The values that will go into the flux and fluxErr fields in the 289 for item
in self.
config.mag_column_list:
290 result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
291 if len(self.
config.mag_err_column_map) > 0:
292 for err_key
in self.
config.mag_err_column_map.keys():
293 error_col_name = self.
config.mag_err_column_map[err_key]
297 inputData[err_key].copy())*1e9
298 result[err_key+
'_fluxErr'] = fluxErr
301 def _setProperMotion(self, record, row):
302 """Set proper motion fields in a record of an indexed catalog. 304 The proper motions are read from the specified columns, 305 scaled appropriately, and installed in the appropriate 306 columns of the output. 310 record : `lsst.afw.table.SimpleRecord` 311 Row from indexed catalog to modify. 312 row : structured `numpy.array` 313 Row from catalog being ingested. 315 if self.
config.pm_ra_name
is None:
317 radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
318 record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
319 record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
321 if self.
config.pm_ra_err_name
is not None:
322 record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
323 record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
325 def _setParallax(self, record, row):
326 """Set the parallax fields in a record of a refcat. 328 if self.
config.parallax_name
is None:
330 scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
331 record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
332 record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
334 def _epochToMjdTai(self, nativeEpoch):
335 """Convert an epoch in native format to TAI MJD (a float). 337 return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
338 scale=self.
config.epoch_scale).tai.mjd
340 def _setExtra(self, record, row):
341 """Set extra data fields in a record of an indexed catalog. 345 record : `lsst.afw.table.SimpleRecord` 346 Row from indexed catalog to modify. 347 row : structured `numpy.array` 348 Row from catalog being ingested. 350 for extra_col
in self.
config.extra_col_names:
351 value = row[extra_col]
359 if isinstance(value, np.str_):
361 record.set(self.
key_map[extra_col], value)
363 def _fillRecord(self, record, row):
364 """Fill a record in an indexed catalog to be persisted. 368 record : `lsst.afw.table.SimpleRecord` 369 Row from indexed catalog to modify. 370 row : structured `numpy.array` 371 Row from catalog being ingested. 383 """Special-case ingest manager to deal with Gaia fluxes. 385 def _getFluxes(self, input):
388 def gaiaFluxToFlux(flux, zeroPoint):
389 """Equations 5.19 and 5.30 from the Gaia calibration document define the 390 conversion from Gaia electron/second fluxes to AB magnitudes. 391 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html 393 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
395 result[flux == 0] = 0
400 with np.errstate(invalid=
'ignore', divide=
'ignore'):
403 result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
404 result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
405 result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
407 result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
408 result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
409 result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
def _setProperMotion(self, record, row)
def _setCoordErr(self, record, row)
def _setExtra(self, record, row)
def computeCoord(row, ra_name, dec_name)
def getCatalog(self, pixelId, schema, nNewElements)
def _setFlags(self, record, row)
daf::base::PropertySet * set
def _setIds(self, inputData, catalog)
def _fillRecord(self, record, row)
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
def _getFluxes(self, inputData)
def _ingestOneFile(self, filename, fileLocks)
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
Point in an unspecified spherical coordinate system.
def run(self, inputFiles)
def _epochToMjdTai(self, nativeEpoch)
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes)
def _setParallax(self, record, row)