__all__ = ["IngestIndexManager", "IngestGaiaManager"]
from ctypes import c_int
import itertools
import multiprocessing
import os.path

import astropy.time
import astropy.units as u
import numpy as np

import lsst.afw.table as afwTable
from lsst.afw.image import fluxErrFromABMagErr
import lsst.geom
# Process-shared counters, inherited by the multiprocessing Pool workers.
# COUNTER: running source-id counter used by _setIds when config.id_name is
# not set. FILE_PROGRESS: number of input files fully ingested so far, read
# by _ingestOneFile for percent-complete logging.
COUNTER = multiprocessing.Value(c_int, 0)
FILE_PROGRESS = multiprocessing.Value(c_int, 0)
Ingest a reference catalog from external files into a butler repository,
using a multiprocessing Pool to speed up the work.

Parameters
----------
filenames : `dict` [`int`, `str`]
    The HTM pixel id and filenames to ingest the catalog into.
config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
    The Task configuration holding the field names.
file_reader : `lsst.pipe.base.Task`
    The file reader to use to load the files.
indexer : `lsst.meas.algorithms.HtmIndexer`
    The class used to compute the HTM pixel per coordinate.
schema : `lsst.afw.table.Schema`
    The schema of the output catalog.
key_map : `dict` [`str`, `lsst.afw.table.Key`]
    The mapping from output field names to keys in the Schema.
htmRange : `tuple` [`int`]
    The start and end HTM pixel ids.
addRefCatMetadata : callable
    A function called to add extra metadata to each output Catalog.
log : `lsst.log.Log`
    The log to send messages to.
# Boolean flag columns that may be copied from the input catalog; see
# _setFlags for how each maps to a config ``is_<flag>_name`` attribute.
_flags = ['photometric', 'resolved', 'variable']
def __init__(self, filenames, config, file_reader, indexer,
             schema, key_map, htmRange, addRefCatMetadata, log):
    # NOTE(review): the attribute assignments below were reconstructed from
    # their usage elsewhere in this file; the original body was garbled.
    self.filenames = filenames
    self.config = config
    self.file_reader = file_reader
    self.indexer = indexer
    self.schema = schema
    self.key_map = key_map
    self.htmRange = htmRange
    self.addRefCatMetadata = addRefCatMetadata
    self.log = log
    if self.config.coord_err_unit is not None:
        # Cache the parsed unit; _getCoordErr tests for this attribute with
        # hasattr() to decide whether coordinate errors are ingested.
        self.coord_err_unit = u.Unit(self.config.coord_err_unit)
def run(self, inputFiles):
    """Index a set of input files from a reference catalog, and write the
    output to the appropriate filenames, in parallel.

    Parameters
    ----------
    inputFiles : `list`
        A list of file paths to read data from.
    """
    global COUNTER, FILE_PROGRESS
    # Total file count is read by _ingestOneFile for progress logging.
    self.nInputFiles = len(inputFiles)

    with multiprocessing.Manager() as manager:
        COUNTER.value = 0
        FILE_PROGRESS.value = 0
        # One lock per HTM pixel: each pixel maps to one output file, and
        # only one worker may read/append/write that file at a time.
        fileLocks = manager.dict()
        for i in range(self.htmRange[0], self.htmRange[1]):
            fileLocks[i] = manager.Lock()
        self.log.info("File locks created.")
        with multiprocessing.Pool(self.config.n_processes) as pool:
            pool.starmap(self._ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
def _ingestOneFile(self, filename, fileLocks):
    """Read and process one file, and write its records to the correct
    indexed files, while handling exceptions in a useful way so that they
    don't get swallowed by the multiprocess pool.

    Parameters
    ----------
    filename : `str`
        The file to process.
    fileLocks : `dict` [`int`, `multiprocessing.Lock`]
        A Lock for each HTM pixel; each pixel gets one file written, and
        we need to block when one process is accessing that file.
    """
    global FILE_PROGRESS
    inputData = self.file_reader.run(filename)
    # Pre-compute the flux and coordinate-error columns once per file, so
    # the per-pixel loop below only does array slicing.
    fluxes = self._getFluxes(inputData)
    coordErr = self._getCoordErr(inputData)
    matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
                                             inputData[self.config.dec_name])
    pixel_ids = set(matchedPixels)
    for pixelId in pixel_ids:
        with fileLocks[pixelId]:
            self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
    with FILE_PROGRESS.get_lock():
        oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
        FILE_PROGRESS.value += 1
        percent = 100 * FILE_PROGRESS.value / self.nInputFiles
        # Only log on whole-percent transitions, to minimize log spam.
        if np.floor(percent) - np.floor(oldPercent) >= 1:
            self.log.info("Completed %d / %d files: %d %% complete ",
                          FILE_PROGRESS.value,
                          self.nInputFiles,
                          percent)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
    """Process one HTM pixel, appending to an existing catalog or creating
    a new catalog, as needed.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The data from one input file.
    matchedPixels : `numpy.ndarray`
        The row-matched pixel indexes corresponding to ``inputData``.
    pixelId : `int`
        The pixel index we are currently processing.
    fluxes : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the flux and fluxErr fields in the
        output catalog.
    coordErr : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the coord_raErr, coord_decErr, and
        coord_ra_dec_Cov fields in the output catalog (in radians).
    """
    idx = np.where(matchedPixels == pixelId)[0]
    catalog = self.getCatalog(pixelId, self.schema, len(idx))
    # getCatalog pre-extended the catalog by len(idx); fill those new
    # records at the end.
    for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
        self._fillRecord(outputRow, inputRow)

    # Hold the shared-counter lock for the whole id assignment.
    with COUNTER.get_lock():
        self._setIds(inputData[idx], catalog)

    # Fluxes are keyed by output field name via key_map.
    for name, array in fluxes.items():
        catalog[self.key_map[name]][-len(idx):] = array[idx]

    # Coordinate errors are keyed directly by output column name.
    for name, array in coordErr.items():
        catalog[name][-len(idx):] = array[idx]

    catalog.writeFits(self.filenames[pixelId])
def _setIds(self, inputData, catalog):
    """Fill the `id` field of catalog with a running index, filling the
    last values up to the length of ``inputData``.

    Fill with `self.config.id_name` if specified, otherwise use the
    global running counter value.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data that is being processed.
    catalog : `lsst.afw.table.SimpleCatalog`
        The output catalog to fill the ids.
    """
    global COUNTER
    size = len(inputData)
    if self.config.id_name:
        catalog['id'][-size:] = inputData[self.config.id_name]
    else:
        # No id column in the input: assign from the shared counter.
        # The caller must hold COUNTER.get_lock() (see _doOnePixel).
        idEnd = COUNTER.value + size
        catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
        COUNTER.value = idEnd
207 """Get a catalog from disk or create it if it doesn't exist.
212 Identifier for catalog to retrieve
213 schema : `lsst.afw.table.Schema`
214 Schema to use in catalog creation it does not exist.
216 The number of new elements that will be added to the catalog,
217 so space can be preallocated.
221 catalog : `lsst.afw.table.SimpleCatalog`
222 The new or read-and-resized catalog specified by `dataId`.
225 if os.path.isfile(self.
filenamesfilenames[pixelId]):
226 catalog = afwTable.SimpleCatalog.readFits(self.
filenamesfilenames[pixelId])
227 catalog.resize(len(catalog) + nNewElements)
228 return catalog.copy(deep=
True)
230 catalog.resize(nNewElements)
236 """Create an ICRS coord. from a row of a catalog being ingested.
240 row : `numpy.ndarray`
241 Row from catalog being ingested.
243 Name of RA key in catalog being ingested.
245 Name of Dec key in catalog being ingested.
249 coord : `lsst.geom.SpherePoint`
def _getCoordErr(self, inputData):
    """Compute the ra/dec error fields that will go into the output catalog.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data to compute fluxes for.

    Returns
    -------
    coordErr : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the coord_raErr, coord_decErr, fields
        in the output catalog (in radians).

    Notes
    -----
    This does not currently handle the ra/dec covariance field,
    ``coord_ra_dec_Cov``. That field may require extra work, as its units
    may be more complicated in external catalogs.
    """
    result = {}
    # coord_err_unit is only set by __init__ when the config specifies a
    # unit; no attribute means no coordinate errors to ingest.
    if hasattr(self, "coord_err_unit"):
        result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
                                           self.coord_err_unit).to_value(u.radian)
        result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
                                            self.coord_err_unit).to_value(u.radian)
    return result
def _setFlags(self, record, row):
    """Set flags in an output record.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : `numpy.ndarray`
        Row from catalog being ingested.
    """
    names = record.schema.getNames()
    for flag in self._flags:
        # Only set flags that exist in the output schema (the guard was
        # reconstructed: ``names`` is otherwise unused).
        if flag in names:
            attr_name = 'is_{}_name'.format(flag)
            record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))
def _getFluxes(self, inputData):
    """Compute the flux fields that will go into the output catalog.

    Parameters
    ----------
    inputData : `numpy.ndarray`
        The input data to compute fluxes for.

    Returns
    -------
    fluxes : `dict` [`str`, `numpy.ndarray`]
        The values that will go into the flux and fluxErr fields in the
        output catalog.
    """
    result = {}
    for item in self.config.mag_column_list:
        # AB magnitude -> flux in nanojansky.
        result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
    if len(self.config.mag_err_column_map) > 0:
        for err_key in self.config.mag_err_column_map.keys():
            error_col_name = self.config.mag_err_column_map[err_key]
            # fluxErrFromABMagErr returns Jy; the 1e9 converts to nJy.
            fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
                                          inputData[err_key].copy())*1e9
            result[err_key+'_fluxErr'] = fluxErr
    return result
def _setProperMotion(self, record, row):
    """Set proper motion fields in a record of an indexed catalog.

    The proper motions are read from the specified columns,
    scaled appropriately, and installed in the appropriate
    columns of the output.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    if self.config.pm_ra_name is None:
        # Assumes config validation guarantees the PM fields are set (or
        # unset) together, so checking one suffices — TODO confirm.
        return
    # radians per input PM unit: pm_scale is in mas, and 3600*1000 is
    # mas per degree.
    radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
    record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
    record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
    # NOTE(review): epoch line reconstructed from _epochToMjdTai's presence
    # in this file; the original statement was lost in garbling.
    record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
    if self.config.pm_ra_err_name is not None:
        record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
        record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
def _setParallax(self, record, row):
    """Set the parallax fields in a record of a refcat.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    if self.config.parallax_name is None:
        return
    scale = self.config.parallax_scale*lsst.geom.milliarcseconds
    record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
    record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)
def _epochToMjdTai(self, nativeEpoch):
    """Convert an epoch in native format to TAI MJD (a float).

    Parameters
    ----------
    nativeEpoch
        Epoch value in the configured native representation
        (``config.epoch_format`` / ``config.epoch_scale``).

    Returns
    -------
    mjdTai : `float`
        The epoch converted to a TAI MJD.
    """
    return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                             scale=self.config.epoch_scale).tai.mjd
def _setExtra(self, record, row):
    """Set extra data fields in a record of an indexed catalog.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    for extra_col in self.config.extra_col_names:
        value = row[extra_col]
        # Text read from files arrives as numpy's own numpy.str_ type;
        # convert to a plain Python str before storing in the record
        # (the conversion line was reconstructed: the isinstance check is
        # otherwise a no-op).
        if isinstance(value, np.str_):
            value = str(value)
        record.set(self.key_map[extra_col], value)
def _fillRecord(self, record, row):
    """Fill a record in an indexed catalog to be persisted.

    Parameters
    ----------
    record : `lsst.afw.table.SimpleRecord`
        Row from indexed catalog to modify.
    row : structured `numpy.array`
        Row from catalog being ingested.
    """
    # NOTE(review): body reconstructed from the helper methods defined in
    # this file; the original statements were lost in garbling.
    record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
    self._setFlags(record, row)
    self._setProperMotion(record, row)
    self._setParallax(record, row)
    self._setExtra(record, row)
406 """Special-case ingest manager to deal with Gaia fluxes.
408 def _getFluxes(self, input):
411 def gaiaFluxToFlux(flux, zeroPoint):
412 """Equations 5.19 and 5.30 from the Gaia calibration document define the
413 conversion from Gaia electron/second fluxes to AB magnitudes.
414 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
416 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
418 result[flux == 0] = 0
423 with np.errstate(invalid=
'ignore', divide=
'ignore'):
426 result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
427 result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
428 result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
430 result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
431 result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
432 result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Point in an unspecified spherical coordinate system.
def getCatalog(self, pixelId, schema, nNewElements)
def _setProperMotion(self, record, row)
def _getCoordErr(self, inputData)
def _getFluxes(self, inputData)
def _ingestOneFile(self, filename, fileLocks)
def run(self, inputFiles)
def _setIds(self, inputData, catalog)
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def _setParallax(self, record, row)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
def computeCoord(row, ra_name, dec_name)
def _epochToMjdTai(self, nativeEpoch)
def _fillRecord(self, record, row)
def _setFlags(self, record, row)
def _setExtra(self, record, row)
daf::base::PropertySet * set
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)