__all__ = ["IngestIndexManager", "IngestGaiaManager"]
import os.path
import itertools
import multiprocessing

import astropy.time
import astropy.units as u
import numpy as np

import lsst.geom
import lsst.afw.table as afwTable
from lsst.afw.image import fluxErrFromABMagErr
46 Ingest a reference catalog from external files into a butler repository,
47 using a multiprocessing Pool to speed up the work.
51 filenames : `dict` [`int`, `str`]
52 The HTM pixel id and filenames to ingest the catalog into.
53 config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
54 The Task configuration holding the field names.
55 file_reader : `lsst.pipe.base.Task`
56 The file reader to use to load the files.
57 indexer : `lsst.meas.algorithms.HtmIndexer`
58 The class used to compute the HTM pixel per coordinate.
59 schema : `lsst.afw.table.Schema`
60 The schema of the output catalog.
61 key_map : `dict` [`str`, `lsst.afw.table.Key`]
62 The mapping from output field names to keys in the Schema.
63 htmRange : `tuple` [`int`]
64 The start and end HTM pixel ids.
65 addRefCatMetadata : callable
66 A function called to add extra metadata to each output Catalog.
68 The log to send messages to.
# Boolean flag columns; for each one, the config supplies the name of the
# source column via an ``is_{flag}_name`` attribute (see _setFlags).
_flags = ['photometric', 'resolved', 'variable']
def __init__(self, filenames, config, file_reader, indexer,
             schema, key_map, htmRange, addRefCatMetadata, log):
    # Store all collaborators; see the class docstring for their meanings.
    self.filenames = filenames
    self.config = config
    self.file_reader = file_reader
    self.indexer = indexer
    self.schema = schema
    self.key_map = key_map
    self.htmRange = htmRange
    self.addRefCatMetadata = addRefCatMetadata
    self.log = log
    if self.config.coord_err_unit is not None:
        # Cache the parsed unit so _getCoordErr does not re-parse the
        # unit string for every input file. Note that this attribute is
        # only set when coordinate errors are configured; _getCoordErr
        # tests for its presence with hasattr().
        self.coord_err_unit = u.Unit(self.config.coord_err_unit)
def run(self, inputFiles):
    """Index a set of input files from a reference catalog, and write the
    output to the appropriate filenames, in parallel.

    Parameters
    ----------
    inputFiles : `list`
        A list of file paths to read data from.
    """
    global COUNTER, FILE_PROGRESS
    # Used by _ingestOneFile for percent-complete progress reporting.
    self.nInputFiles = len(inputFiles)

    with multiprocessing.Manager() as manager:
        COUNTER = multiprocessing.Value('i', 0)
        FILE_PROGRESS = multiprocessing.Value('i', 0)
        # One lock per HTM pixel: each pixel maps to one output file, and
        # two workers must not write the same file at the same time.
        fileLocks = manager.dict()
        for i in range(self.htmRange[0], self.htmRange[1]):
            fileLocks[i] = manager.Lock()
        self.log.info("File locks created.")
        with multiprocessing.Pool(self.config.n_processes) as pool:
            pool.starmap(self._ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
def _ingestOneFile(self, filename, fileLocks):
    """Read and process one file, and write its records to the correct
    indexed files, while handling exceptions in a useful way so that they
    don't get swallowed by the multiprocess pool.

    Parameters
    ----------
    filename : `str`
        The file to process.
    fileLocks : `dict` [`int`, `multiprocessing.Lock`]
        A Lock for each HTM pixel; each pixel gets one file written, and
        we need to block when one process is accessing that file.
    """
    inputData = self.file_reader.run(filename)
    # Pre-compute the per-row flux and coordinate-error arrays once for the
    # whole file; _doOnePixel then just slices them per pixel.
    fluxes = self._getFluxes(inputData)
    coordErr = self._getCoordErr(inputData)
    matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
                                             inputData[self.config.dec_name])
    pixel_ids = set(matchedPixels)
    for pixelId in pixel_ids:
        with fileLocks[pixelId]:
            self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)

    with FILE_PROGRESS.get_lock():
        oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
        FILE_PROGRESS.value += 1
        percent = 100 * FILE_PROGRESS.value / self.nInputFiles
        # Log only when the integer percentage ticks over, to avoid spamming
        # the log with one message per file.
        if np.floor(percent) - np.floor(oldPercent) >= 1:
            self.log.info("Completed %d / %d files: %d %% complete ",
                          FILE_PROGRESS.value, self.nInputFiles, percent)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
    """Process one HTM pixel, appending to an existing catalog or creating
    a new catalog, as needed.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The data from one input file.
    matchedPixels : `numpy.ndarray`
        The row-matched pixel indexes corresponding to ``inputData``.
    pixelId : `int`
        The pixel index we are currently processing.
    fluxes : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the flux and fluxErr fields in the
        output catalog.
    coordErr : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the coord_raErr, coord_decErr, and
        coord_ra_dec_Cov fields in the output catalog (in radians).
    """
    idx = np.where(matchedPixels == pixelId)[0]
    catalog = self.getCatalog(pixelId, self.schema, len(idx))
    # New rows were appended at the end by getCatalog's resize; fill them.
    for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
        self._fillRecord(outputRow, inputRow)

    # Take the shared-counter lock so ids stay unique across processes.
    with COUNTER.get_lock():
        self._setIds(inputData[idx], catalog)

    # Fluxes are pre-computed for the whole input file; copy this pixel's rows.
    for name, array in fluxes.items():
        catalog[self.key_map[name]][-len(idx):] = array[idx]

    # Likewise for the pre-computed coordinate errors.
    for name, array in coordErr.items():
        catalog[name][-len(idx):] = array[idx]

    catalog.writeFits(self.filenames[pixelId])
def _setIds(self, inputData, catalog):
    """Fill the `id` field of catalog with a running index, filling the
    last values up to the length of ``inputData``.

    Fill with `self.config.id_name` if specified, otherwise use the
    global running counter value.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data that is being processed.
    catalog : `lsst.afw.table.SimpleCatalog`
        The output catalog to fill the ids.
    """
    size = len(inputData)
    if self.config.id_name:
        # The input catalog supplies its own ids.
        catalog['id'][-size:] = inputData[self.config.id_name]
    else:
        # Otherwise assign sequential ids from the shared counter; the
        # caller holds COUNTER's lock, so this read-modify-write is safe.
        idEnd = COUNTER.value + size
        catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
        COUNTER.value = idEnd
def getCatalog(self, pixelId, schema, nNewElements):
    """Get a catalog from disk or create it if it doesn't exist.

    Parameters
    ----------
    pixelId : `int`
        Identifier for catalog to retrieve.
    schema : `lsst.afw.table.Schema`
        Schema to use in catalog creation if it does not exist.
    nNewElements : `int`
        The number of new elements that will be added to the catalog,
        so space can be preallocated.

    Returns
    -------
    catalog : `lsst.afw.table.SimpleCatalog`
        The new or read-and-resized catalog specified by ``pixelId``.
    """
    # NOTE(review): not protected against concurrent access here; callers
    # hold the per-pixel file lock before calling (see _ingestOneFile).
    if os.path.isfile(self.filenames[pixelId]):
        catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
        catalog.resize(len(catalog) + nNewElements)
        # Deep copy so the records are contiguous and column assignment works.
        return catalog.copy(deep=True)
    catalog = afwTable.SimpleCatalog(schema)
    catalog.resize(nNewElements)
    self.addRefCatMetadata(catalog)
    return catalog
@staticmethod
def computeCoord(row, ra_name, dec_name):
    """Create an ICRS coord. from a row of a catalog being ingested.

    Parameters
    ----------
    row : `numpy.ndarray`
        Row from catalog being ingested.
    ra_name : `str`
        Name of RA key in catalog being ingested.
    dec_name : `str`
        Name of Dec key in catalog being ingested.

    Returns
    -------
    coord : `lsst.geom.SpherePoint`
        ICRS coordinate.
    """
    # NOTE(review): assumes the input catalog coordinates are in degrees —
    # confirm against the ingest configuration.
    return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
def _getCoordErr(self, inputData):
    """Compute the ra/dec error fields that will go into the output catalog.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data to compute coordinate errors for.

    Returns
    -------
    coordErr : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the coord_raErr, coord_decErr, fields
        in the output catalog (in radians).

    Notes
    -----
    This does not currently handle the ra/dec covariance field,
    ``coord_ra_dec_Cov``. That field may require extra work, as its units
    may be more complicated in external catalogs.
    """
    result = {}
    # coord_err_unit is only set in __init__ when the config provides a
    # unit, so its presence signals that errors are configured at all.
    if hasattr(self, "coord_err_unit"):
        result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
                                           self.coord_err_unit).to_value(u.radian)
        result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
                                            self.coord_err_unit).to_value(u.radian)
    return result
def _setFlags(self, record, row):
    """Set flags in an output record.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : `numpy.ndarray`
        Row from catalog being ingested.
    """
    names = record.schema.getNames()
    for flag in self._flags:
        if flag in names:
            # Config stores the source column for each flag under an
            # ``is_{flag}_name`` attribute.
            attr_name = 'is_{}_name'.format(flag)
            record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))
def _getFluxes(self, inputData):
    """Compute the flux fields that will go into the output catalog.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data to compute fluxes for.

    Returns
    -------
    fluxes : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the flux and fluxErr fields in the
        output catalog (in nJy).
    """
    result = {}
    for item in self.config.mag_column_list:
        # Convert AB magnitudes to nJy fluxes.
        result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
    if len(self.config.mag_err_column_map) > 0:
        for err_key in self.config.mag_err_column_map.keys():
            error_col_name = self.config.mag_err_column_map[err_key]
            # NOTE: copy the arrays because the numpy strides may not be
            # usable by the C++ layer. fluxErrFromABMagErr returns Jy;
            # scale to nJy.
            fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
                                          inputData[err_key].copy())*1e9
            result[err_key+'_fluxErr'] = fluxErr
    return result
def _setProperMotion(self, record, row):
    """Set proper motion fields in a record of an indexed catalog.

    The proper motions are read from the specified columns,
    scaled appropriately, and installed in the appropriate
    columns of the output.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    if self.config.pm_ra_name is None:
        # Proper motion is not configured for this catalog.
        return
    # pm_scale is in mas/year; convert to radians/year.
    radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
    record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
    record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
    record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
    if self.config.pm_ra_err_name is not None:
        record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
        record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
def _setParallax(self, record, row):
    """Set the parallax fields in a record of a refcat.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    if self.config.parallax_name is None:
        # Parallax is not configured for this catalog.
        return
    scale = self.config.parallax_scale*lsst.geom.milliarcseconds
    record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
    record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)
def _epochToMjdTai(self, nativeEpoch):
    """Convert an epoch in native format to TAI MJD (a float).

    Parameters
    ----------
    nativeEpoch : `float` or `str`
        Epoch in the format/scale given by ``config.epoch_format`` and
        ``config.epoch_scale``.

    Returns
    -------
    mjdTai : `float`
        The epoch as a TAI Modified Julian Date.
    """
    return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                             scale=self.config.epoch_scale).tai.mjd
def _setExtra(self, record, row):
    """Set extra data fields in a record of an indexed catalog.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    for extra_col in self.config.extra_col_names:
        value = row[extra_col]
        # Strings read through numpy come back as numpy.str_; cast to a
        # plain python str before storing in the record.
        if isinstance(value, np.str_):
            value = str(value)
        record.set(self.key_map[extra_col], value)
def _fillRecord(self, record, row):
    """Fill a record in an indexed catalog to be persisted.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    # NOTE(review): body reconstructed from the setter methods defined in
    # this file — confirm against the original source. The proper-motion
    # and parallax setters are no-ops when their config fields are None,
    # so calling them unconditionally is safe.
    record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
    self._setFlags(record, row)
    self._setProperMotion(record, row)
    self._setParallax(record, row)
    self._setExtra(record, row)
405 """Special-case ingest manager to deal with Gaia fluxes.
def _getFluxes(self, input):
    """Compute the flux fields that will go into the output catalog,
    applying the Gaia DR2 zeropoints to the instrumental fluxes.

    Parameters
    ----------
    input : `numpy.ndarray`
        The input data to compute fluxes for.

    Returns
    -------
    fluxes : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the flux and fluxErr fields in the
        output catalog (in nJy).
    """
    result = {}

    def gaiaFluxToFlux(flux, zeroPoint):
        """Equations 5.19 and 5.30 from the Gaia calibration document define the
        conversion from Gaia electron/second fluxes to AB magnitudes.
        https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
        """
        result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
        # Zero instrumental flux gives log10 == -inf; store 0 flux instead.
        result[flux == 0] = 0
        return result

    # Zero fluxes are handled explicitly above, so suppress the log10/divide
    # warnings they would otherwise trigger.
    with np.errstate(invalid='ignore', divide='ignore'):
        # Zeropoints for the G, BP, and RP bands from the Gaia DR2
        # external calibration (see link in gaiaFluxToFlux).
        result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
        result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
        result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)

    # Gaia provides flux/error ratios rather than errors directly.
    result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
    result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
    result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']

    return result