22 __all__ = [
"IngestIndexManager",
"IngestGaiaManager"]
26 import multiprocessing
29 import astropy.units
as u
    """Ingest a reference catalog from external files into a butler repository,
    using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        The HTM pixel id and filenames to ingest the catalog into.
    config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : `lsst.meas.algorithms.HtmIndexer`
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict` [`str`, `lsst.afw.table.Key`]
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : `lsst.log.Log`
        The log to send messages to.
    """
    # Optional boolean flag columns; _setFlags copies each of these into the
    # output record only when a field of that name exists in the output schema.
    _flags = ['photometric', 'resolved', 'variable']
72 def __init__(self, filenames, config, file_reader, indexer,
73 schema, key_map, htmRange, addRefCatMetadata, log):
86 def run(self, inputFiles):
87 """Index a set of input files from a reference catalog, and write the
88 output to the appropriate filenames, in parallel.
93 A list of file paths to read data from.
95 global COUNTER, FILE_PROGRESS
98 with multiprocessing.Manager()
as manager:
99 COUNTER = multiprocessing.Value(
'i', 0)
100 FILE_PROGRESS = multiprocessing.Value(
'i', 0)
101 fileLocks = manager.dict()
104 fileLocks[i] = manager.Lock()
105 self.
log.
info(
"File locks created.")
106 with multiprocessing.Pool(self.
config.n_processes)
as pool:
107 pool.starmap(self.
_ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
109 def _ingestOneFile(self, filename, fileLocks):
110 """Read and process one file, and write its records to the correct
111 indexed files, while handling exceptions in a useful way so that they
112 don't get swallowed by the multiprocess pool.
118 fileLocks : `dict` [`int`, `multiprocessing.Lock`]
119 A Lock for each HTM pixel; each pixel gets one file written, and
120 we need to block when one process is accessing that file.
126 matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
127 inputData[self.
config.dec_name])
128 pixel_ids =
set(matchedPixels)
129 for pixelId
in pixel_ids:
130 with fileLocks[pixelId]:
131 self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
132 with FILE_PROGRESS.get_lock():
133 oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles
134 FILE_PROGRESS.value += 1
135 percent = 100 * FILE_PROGRESS.value / self.
nInputFiles
137 if np.floor(percent) - np.floor(oldPercent) >= 1:
138 self.
log.
info(
"Completed %d / %d files: %d %% complete ",
143 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
144 """Process one HTM pixel, appending to an existing catalog or creating
145 a new catalog, as needed.
149 inputData : `numpy.ndarray`
150 The data from one input file.
151 matchedPixels : `numpy.ndarray`
152 The row-matched pixel indexes corresponding to ``inputData``.
154 The pixel index we are currently processing.
155 fluxes : `dict` [`str`, `numpy.ndarray`]
156 The values that will go into the flux and fluxErr fields in the
158 coordErr : `dict` [`str`, `numpy.ndarray`]
159 The values that will go into the coord_raErr, coord_decErr, and
160 coord_ra_dec_Cov fields in the output catalog (in radians).
162 idx = np.where(matchedPixels == pixelId)[0]
164 for outputRow, inputRow
in zip(catalog[-len(idx):], inputData[idx]):
168 with COUNTER.get_lock():
169 self.
_setIds(inputData[idx], catalog)
172 for name, array
in fluxes.items():
173 catalog[self.
key_map[name]][-len(idx):] = array[idx]
176 for name, array
in coordErr.items():
177 catalog[name][-len(idx):] = array[idx]
179 catalog.writeFits(self.
filenames[pixelId])
181 def _setIds(self, inputData, catalog):
182 """Fill the `id` field of catalog with a running index, filling the
183 last values up to the length of ``inputData``.
185 Fill with `self.config.id_name` if specified, otherwise use the
186 global running counter value.
190 inputData : `numpy.ndarray`
191 The input data that is being processed.
192 catalog : `lsst.afw.table.SimpleCatalog`
193 The output catalog to fill the ids.
196 size = len(inputData)
198 catalog[
'id'][-size:] = inputData[self.
config.id_name]
200 idEnd = COUNTER.value + size
201 catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
202 COUNTER.value = idEnd
205 """Get a catalog from disk or create it if it doesn't exist.
210 Identifier for catalog to retrieve
211 schema : `lsst.afw.table.Schema`
212 Schema to use in catalog creation it does not exist.
214 The number of new elements that will be added to the catalog,
215 so space can be preallocated.
219 catalog : `lsst.afw.table.SimpleCatalog`
220 The new or read-and-resized catalog specified by `dataId`.
223 if os.path.isfile(self.
filenames[pixelId]):
224 catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
225 catalog.resize(len(catalog) + nNewElements)
226 return catalog.copy(deep=
True)
228 catalog.resize(nNewElements)
234 """Create an ICRS coord. from a row of a catalog being ingested.
238 row : `numpy.ndarray`
239 Row from catalog being ingested.
241 Name of RA key in catalog being ingested.
243 Name of Dec key in catalog being ingested.
247 coord : `lsst.geom.SpherePoint`
252 def _getCoordErr(self, inputData, ):
253 """Compute the ra/dec error fields that will go into the output catalog.
257 inputData : `numpy.ndarray`
258 The input data to compute fluxes for.
262 coordErr : `dict` [`str`, `numpy.ndarray`]
263 The values that will go into the coord_raErr, coord_decErr, fields
264 in the output catalog (in radians).
268 This does not currently handle the ra/dec covariance field,
269 ``coord_ra_dec_Cov``. That field may require extra work, as its units
270 may be more complicated in external catalogs.
273 result[
'coord_raErr'] = u.Quantity(inputData[self.
config.ra_err_name],
275 result[
'coord_decErr'] = u.Quantity(inputData[self.
config.dec_err_name],
279 def _setFlags(self, record, row):
280 """Set flags in an output record.
284 record : `lsst.afw.table.SimpleRecord`
285 Row from indexed catalog to modify.
286 row : `numpy.ndarray`
287 Row from catalog being ingested.
289 names = record.schema.getNames()
292 attr_name =
'is_{}_name'.
format(flag)
293 record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
295 def _getFluxes(self, inputData):
296 """Compute the flux fields that will go into the output catalog.
300 inputData : `numpy.ndarray`
301 The input data to compute fluxes for.
305 fluxes : `dict` [`str`, `numpy.ndarray`]
306 The values that will go into the flux and fluxErr fields in the
310 for item
in self.
config.mag_column_list:
311 result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
312 if len(self.
config.mag_err_column_map) > 0:
313 for err_key
in self.
config.mag_err_column_map.keys():
314 error_col_name = self.
config.mag_err_column_map[err_key]
318 inputData[err_key].copy())*1e9
319 result[err_key+
'_fluxErr'] = fluxErr
322 def _setProperMotion(self, record, row):
323 """Set proper motion fields in a record of an indexed catalog.
325 The proper motions are read from the specified columns,
326 scaled appropriately, and installed in the appropriate
327 columns of the output.
331 record : `lsst.afw.table.SimpleRecord`
332 Row from indexed catalog to modify.
333 row : structured `numpy.array`
334 Row from catalog being ingested.
336 if self.
config.pm_ra_name
is None:
338 radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
339 record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
340 record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
342 if self.
config.pm_ra_err_name
is not None:
343 record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
344 record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
346 def _setParallax(self, record, row):
347 """Set the parallax fields in a record of a refcat.
349 if self.
config.parallax_name
is None:
351 scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
352 record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
353 record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
355 def _epochToMjdTai(self, nativeEpoch):
356 """Convert an epoch in native format to TAI MJD (a float).
358 return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
359 scale=self.
config.epoch_scale).tai.mjd
361 def _setExtra(self, record, row):
362 """Set extra data fields in a record of an indexed catalog.
366 record : `lsst.afw.table.SimpleRecord`
367 Row from indexed catalog to modify.
368 row : structured `numpy.array`
369 Row from catalog being ingested.
371 for extra_col
in self.
config.extra_col_names:
372 value = row[extra_col]
380 if isinstance(value, np.str_):
382 record.set(self.
key_map[extra_col], value)
384 def _fillRecord(self, record, row):
385 """Fill a record in an indexed catalog to be persisted.
389 record : `lsst.afw.table.SimpleRecord`
390 Row from indexed catalog to modify.
391 row : structured `numpy.array`
392 Row from catalog being ingested.
403 """Special-case ingest manager to deal with Gaia fluxes.
405 def _getFluxes(self, input):
408 def gaiaFluxToFlux(flux, zeroPoint):
409 """Equations 5.19 and 5.30 from the Gaia calibration document define the
410 conversion from Gaia electron/second fluxes to AB magnitudes.
411 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
413 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
415 result[flux == 0] = 0
420 with np.errstate(invalid=
'ignore', divide=
'ignore'):
423 result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
424 result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
425 result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
427 result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
428 result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
429 result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']