# Public API of this module.
__all__ = ["ConvertRefcatManager",
           "ConvertGaiaManager",
           "ConvertGaiaXpManager"]
# NOTE(review): the extraction dropped most of the original import block; the
# lines below restore the names the code body visibly references (np, time,
# os, itertools, multiprocessing, astropy.time, lsst.geom, afwTable,
# fluxErrFromABMagErr) — verify exact module paths against upstream.
import itertools
import multiprocessing
import os
import time
from ctypes import c_int

import astropy.time
import astropy.units as u
import numpy as np

import lsst.geom
import lsst.afw.table as afwTable
import lsst.pex.config
from lsst.afw.image import fluxErrFromABMagErr
# Process-shared progress counters, created at module import time so that
# multiprocessing worker processes inherit (and therefore share) the same
# underlying values.
COUNTER = multiprocessing.Value(c_int, 0)  # running source-id counter
FILE_PROGRESS = multiprocessing.Value(c_int, 0)  # number of input files completed
# NOTE(review): the class statement itself was lost in extraction; the base
# class is reconstructed per LSST convention (pex_config) — confirm upstream.
class ConvertRefcatManagerConfig(lsst.pex.config.Config):
    """Placeholder for ConfigurableField validation; refcat convert is
    configured by the parent convert Task.
    """
    pass
class ConvertRefcatManager:
    """Convert a reference catalog from external files into the LSST HTM sharded
    format, using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        The HTM pixel id and filenames to convert the catalog into.
    config : `lsst.pex.config.Config`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : indexer object
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict`
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : logger
        The log to send messages to.
    """
    _flags = ['photometric', 'resolved', 'variable']
    _DefaultName = 'convertRefcatManager'
    ConfigClass = ConvertRefcatManagerConfig

    def __init__(self, filenames, config, file_reader, indexer,
                 schema, key_map, htmRange, addRefCatMetadata, log):
        # NOTE(review): the attribute assignments below were dropped by the
        # extraction and are restored in the obvious way — confirm upstream.
        self.filenames = filenames
        self.config = config
        self.file_reader = file_reader
        self.indexer = indexer
        self.schema = schema
        self.key_map = key_map
        self.htmRange = htmRange
        self.addRefCatMetadata = addRefCatMetadata
        self.log = log
        if self.config.coord_err_unit is not None:
            # Cache the parsed unit to speed up per-file error conversion.
            self.coord_err_unit = u.Unit(self.config.coord_err_unit)

    def run(self, inputFiles):
        """Index a set of input files from a reference catalog, and write the
        output to the appropriate filenames, in parallel.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read data from.

        Returns
        -------
        output : `dict` [`int`, `str`]
            The htm ids and the filenames that were written to.
        """
        global COUNTER, FILE_PROGRESS
        # Used by _convertOneFile for percent-complete logging.
        self.nInputFiles = len(inputFiles)

        with multiprocessing.Manager() as manager:
            COUNTER.value = 0
            FILE_PROGRESS.value = 0
            # One lock per HTM pixel, so only one worker writes a given
            # shard file at a time.
            fileLocks = manager.dict()
            for i in range(self.htmRange[0], self.htmRange[1]):
                fileLocks[i] = manager.Lock()
            self.log.info("File locks created.")

            start_time = time.perf_counter()
            with multiprocessing.Pool(self.config.n_processes) as pool:
                result = pool.starmap(self._convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
            end_time = time.perf_counter()
            self.log.info("Finished writing files. Elapsed time: %.2f seconds", end_time-start_time)

            # Flatten the per-file pixel-id collections into {pixelId: filename}.
            return {id: self.filenames[id] for item in result for id in item}

    def _convertOneFile(self, filename, fileLocks):
        """Read and process one file, and write its records to the correct
        indexed files, while handling exceptions in a useful way so that they
        don't get swallowed by the multiprocess pool.

        Parameters
        ----------
        filename : `str`
            The file to process.
        fileLocks : `dict` [`int`, `multiprocessing.Lock`]
            A Lock for each HTM pixel; each pixel gets one file written, and
            we need to block when one process is accessing that file.

        Returns
        -------
        pixels, files : `list` [`int`]
            The pixel ids that were written to.
        """
        global FILE_PROGRESS
        self.log.debug("Processing: %s", filename)
        # NOTE(review): the read/precompute lines and try/except structure
        # were partially lost in extraction; restored from context — confirm.
        inputData = self.file_reader.run(filename)
        try:
            fluxes = self._getFluxes(inputData)
            coordErr = self._getCoordErr(inputData)
            matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
                                                     inputData[self.config.dec_name])
        except Exception:
            self.log.error("Failure preparing data for: %s", filename)
            raise

        pixel_ids = set(matchedPixels)
        for pixelId in pixel_ids:
            with fileLocks[pixelId]:
                try:
                    self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
                except Exception:
                    self.log.error("Failure processing filename: %s, pixelId: %s",
                                   filename, pixelId)
                    raise

        with FILE_PROGRESS.get_lock():
            oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
            FILE_PROGRESS.value += 1
            percent = 100 * FILE_PROGRESS.value / self.nInputFiles
            # only log at every integer percent change, to limit log spam
            if np.floor(percent) - np.floor(oldPercent) >= 1:
                self.log.info("Completed %d / %d files: %d %% complete ",
                              FILE_PROGRESS.value, self.nInputFiles, percent)
        return pixel_ids

    def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
        """Process one HTM pixel, appending to an existing catalog or creating
        a new catalog, as needed.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The data from one input file.
        matchedPixels : `numpy.ndarray`
            The row-matched pixel indexes corresponding to ``inputData``.
        pixelId : `int`
            The pixel index we are currently processing.
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        coordErr : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the coord_raErr, coord_decErr, and
            coord_ra_dec_Cov fields in the output catalog (in radians).
        """
        idx = np.where(matchedPixels == pixelId)[0]
        catalog = self.getCatalog(pixelId, self.schema, len(idx))
        # Fill the newly-appended tail of the catalog, record by record.
        for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
            self._fillRecord(outputRow, inputRow)

        # Hold the counter lock while assigning ids so ids stay unique
        # across worker processes.
        with COUNTER.get_lock():
            self._setIds(inputData[idx], catalog)

        # set fluxes from the pre-computed array
        for name, array in fluxes.items():
            catalog[self.key_map[name]][-len(idx):] = array[idx]

        # set coordinate errors from the pre-computed array
        for name, array in coordErr.items():
            catalog[name][-len(idx):] = array[idx]

        catalog.writeFits(self.filenames[pixelId])

    def _setIds(self, inputData, catalog):
        """Fill the `id` field of catalog with a running index, filling the
        last values up to the length of ``inputData``.

        Fill with `self.config.id_name` if specified, otherwise use the
        global running counter value.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data that is being processed.
        catalog : `lsst.afw.table.SimpleCatalog`
            The output catalog to fill the ids.
        """
        global COUNTER
        size = len(inputData)
        if self.config.id_name:
            catalog['id'][-size:] = inputData[self.config.id_name]
        else:
            idEnd = COUNTER.value + size
            catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
            COUNTER.value = idEnd

    def getCatalog(self, pixelId, schema, nNewElements):
        """Get a catalog from disk or create it if it doesn't exist.

        Parameters
        ----------
        pixelId : `int`
            Identifier for catalog to retrieve.
        schema : `lsst.afw.table.Schema`
            Schema to use in catalog creation it does not exist.
        nNewElements : `int`
            The number of new elements that will be added to the catalog,
            so space can be preallocated.

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            The new or read-and-resized catalog specified by `dataId`.
        """
        # This is safe because the caller holds the per-pixel file lock.
        if os.path.isfile(self.filenames[pixelId]):
            catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
            catalog.resize(len(catalog) + nNewElements)
            # deep copy to guarantee contiguity for column assignment
            return catalog.copy(deep=True)
        # NOTE(review): creation branch restored from context — confirm.
        catalog = afwTable.SimpleCatalog(schema)
        catalog.resize(nNewElements)
        self.addRefCatMetadata(catalog)
        return catalog

    @staticmethod
    def computeCoord(row, ra_name, dec_name):
        """Create an ICRS coord. from a row of a catalog being converted.

        Parameters
        ----------
        row : `numpy.ndarray`
            Row from catalog being converted.
        ra_name : `str`
            Name of RA key in catalog being converted.
        dec_name : `str`
            Name of Dec key in catalog being converted.

        Returns
        -------
        coord : `lsst.geom.SpherePoint`
            ICRS coordinate.
        """
        # NOTE(review): body lost in extraction; restored assuming
        # degree inputs per upstream convention — confirm.
        return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

    def _getCoordErr(self, inputData):
        """Compute the ra/dec error fields that will go into the output catalog.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute fluxes for.

        Returns
        -------
        coordErr : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the coord_raErr, coord_decErr, fields
            in the output catalog (in radians).

        Notes
        -----
        This does not handle the ra/dec covariance field,
        ``coord_ra_coord_dec_Cov``. That field is handled in
        `_setCoordinateCovariance`.
        """
        result = {}
        # coord_err_unit is only set in __init__ when the config supplies a
        # unit; otherwise no errors are produced.
        if hasattr(self, "coord_err_unit"):
            result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
                                               self.coord_err_unit).to_value(u.radian)
            result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
                                                self.coord_err_unit).to_value(u.radian)
        return result

    def _setFlags(self, record, row):
        """Set flags in an output record.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : `numpy.ndarray`
            Row from catalog being converted.
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _getFluxes(self, inputData):
        """Compute the flux fields that will go into the output catalog.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute fluxes for.

        Returns
        -------
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        """
        result = {}
        for item in self.config.mag_column_list:
            # Input catalogs store AB magnitudes; output stores nJy fluxes.
            result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
        if len(self.config.mag_err_column_map) > 0:
            for err_key in self.config.mag_err_column_map.keys():
                error_col_name = self.config.mag_err_column_map[err_key]
                # 1e9 converts Jy to nJy
                fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
                                              inputData[err_key].copy())*1e9
                result[err_key+'_fluxErr'] = fluxErr
        return result

    def _setProperMotion(self, record, row):
        """Set proper motion fields in a record of an indexed catalog.

        The proper motions are read from the specified columns,
        scaled appropriately, and installed in the appropriate
        columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        if self.config.pm_ra_name is None:
            # Config validation ensures ra/dec PM names are both set or unset.
            return
        # pm_scale is in mas/year: convert to radians/year.
        radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
        record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
        record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
        # NOTE(review): the epoch-setting line was lost in extraction and is
        # restored from upstream — confirm.
        record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.pm_ra_err_name is not None:  # pm_dec_err_name also, by validation
            record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
            record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

    def _setParallax(self, record, row):
        """Set the parallax fields in a record of a refcat.
        """
        if self.config.parallax_name is None:
            return
        scale = self.config.parallax_scale*lsst.geom.milliarcseconds
        record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
        record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)

    def _epochToMjdTai(self, nativeEpoch):
        """Convert an epoch in native format to TAI MJD (a float).
        """
        return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                                 scale=self.config.epoch_scale).tai.mjd

    def _setCoordinateCovariance(self, record, row):
        """Set the off-diagonal position covariance in a record of an indexed
        catalog.

        There is no generic way to determine covariance. Override this method
        in a subclass specialized for your dataset.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        raise NotImplementedError("There is no default method for setting the covariance. Override this "
                                  "method in a subclass specialized for your dataset.")

    def _setExtra(self, record, row):
        """Set extra data fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # Strings read from text files come back as numpy.str_; cast to a
            # plain python str, which is what the C++-backed records expect.
            if isinstance(value, np.str_):
                value = str(value)
            record.set(self.key_map[extra_col], value)

    def _fillRecord(self, record, row):
        """Fill a record in an indexed catalog to be persisted.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        # NOTE(review): body partially lost in extraction; restored from the
        # visible ``full_position_information`` guard and upstream — confirm.
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))

        self._setFlags(record, row)
        if self.config.full_position_information:
            self._setProperMotion(record, row)
            self._setParallax(record, row)
            self._setCoordinateCovariance(record, row)
        self._setExtra(record, row)
class ConvertGaiaManager(ConvertRefcatManager):
    """Special-case convert manager to deal with Gaia fluxes.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Unit of the covariance entries written by _setCoordinateCovariance.
        # NOTE(review): restored from context (covariances are stored in
        # radian**2) — confirm upstream.
        self.outputUnit = u.radian * u.radian

    def _getFluxes(self, input):
        """Compute the Gaia flux fields (nJy) and their errors from the
        electron/second fluxes and flux_over_error columns.
        """
        result = {}

        def gaiaFluxToFlux(flux, zeroPoint):
            """Equations 5.19 and 5.30 from the Gaia calibration document define the
            conversion from Gaia electron/second fluxes to AB magnitudes.
            https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
            """
            result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
            # set 0 input fluxes to 0 output, instead of the NaN/inf the
            # log10 above produces
            result[flux == 0] = 0
            return result

        # Some fluxes are 0, so log10(flux) warns; zeros are handled
        # explicitly above, so suppress those warnings.
        with np.errstate(invalid='ignore', divide='ignore'):
            result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
            result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
            result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)

        result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
        result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
        result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']

        return result

    def _setCoordinateCovariance(self, record, row):
        """Set the off-diagonal position covariance in a record of an indexed
        catalog.

        Convert the Gaia coordinate correlations into covariances.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        inputParams = ['ra', 'dec', 'parallax', 'pmra', 'pmdec']
        outputParams = ['coord_ra', 'coord_dec', 'parallax', 'pm_ra', 'pm_dec']
        # Gaia orders parameters as (coordinates, parallax, proper motion);
        # reorder to (coordinates, proper motion, parallax) to match the
        # output covariance matrix ordering.
        reorder = [0, 1, 4, 2, 3]

        # NOTE(review): the units list and loop bounds were lost in
        # extraction; restored assuming Gaia's mas / mas*yr^-1 error
        # columns — confirm upstream.
        inputUnits = [u.mas, u.mas, u.mas, u.mas/u.yr, u.mas/u.yr]

        for i in range(5):
            for j in range(i):
                j_error = row[f'{inputParams[j]}_error'] * inputUnits[j]
                i_error = row[f'{inputParams[i]}_error'] * inputUnits[i]
                ij_corr = row[f'{inputParams[j]}_{inputParams[i]}_corr']
                cov = (i_error * j_error * ij_corr).to_value(self.outputUnit)

                # Pick field-name order matching the output matrix ordering.
                a = (i if (reorder[i] < reorder[j]) else j)
                b = (j if (reorder[i] < reorder[j]) else i)

                record.set(self.key_map[f'{outputParams[a]}_{outputParams[b]}_Cov'], cov)
class ConvertGaiaXpManager(ConvertRefcatManager):
    """Special-case convert manager for Gaia XP spectrophotometry catalogs,
    that have fluxes/flux errors, instead of magnitudes/mag errors. The input
    flux and error values are in units of W/Hz/(m^2) (Gaia Collaboration,
    Montegriffo et al. 2022). The flux and fluxErr fields in the output
    catalog have units of nJy.
    """

    def _getFluxes(self, inputData):
        """Compute the flux fields that will go into the output catalog,
        converting from W/Hz/m^2 to nJy.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute fluxes for.

        Returns
        -------
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        """
        result = {}
        for item in self.config.mag_column_list:
            # The error column for e.g. "xp_flux_g" is "xp_flux_error_g".
            error_col_name = item.replace("_flux_", "_flux_error_")

            result[item + "_flux"] = (
                inputData[item] * u.Watt / u.Hz / u.meter / u.meter
            ).to_value(u.nJy)
            result[item + "_fluxErr"] = (
                inputData[error_col_name] * u.Watt / u.Hz / u.meter / u.meter
            ).to_value(u.nJy)

        return result
A class used as a handle to a particular field in a table.
Defines the fields and offsets for a table.
Record class that must contain a unique ID field and a celestial coordinate field.
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Point in an unspecified spherical coordinate system.
This static class includes a variety of methods for interacting with the the logging module.
_setCoordinateCovariance(self, record, row)
__init__(self, *args, **kwargs)
_getFluxes(self, inputData)
_setParallax(self, record, row)
computeCoord(row, ra_name, dec_name)
_setFlags(self, record, row)
_setProperMotion(self, record, row)
_fillRecord(self, record, row)
_convertOneFile(self, filename, fileLocks)
_epochToMjdTai(self, nativeEpoch)
__init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
_setExtra(self, record, row)
_setCoordinateCovariance(self, record, row)
_getCoordErr(self, inputData)
_getFluxes(self, inputData)
_doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
_setIds(self, inputData, catalog)
getCatalog(self, pixelId, schema, nNewElements)
daf::base::PropertySet * set