# Public API of this module.
__all__ = ["ConvertRefcatManager",
           "ConvertGaiaManager"]
24from ctypes
import c_int
30import astropy.units
as u
# Global shared counters, inherited by the multiprocessing Pool workers.
# COUNTER holds the running source-id used by _setIds when no id column is
# configured; FILE_PROGRESS counts input files completed, for progress logging.
COUNTER = multiprocessing.Value(c_int, 0)
FILE_PROGRESS = multiprocessing.Value(c_int, 0)
47 """Placeholder for ConfigurableField validation; refcat convert is
48 configured by the parent convert Task.
    """
    Convert a reference catalog from external files into the LSST HTM sharded
    format, using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        The HTM pixel id and filenames to convert the catalog into.
    config : `lsst.pex.config.Config`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : `lsst.meas.algorithms.HtmIndexer`
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict` [`str`, `lsst.afw.table.Key`]
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : `lsst.log.Log`
        The log to send messages to.
    """
79 _flags = ['photometric',
'resolved',
'variable']
80 _DefaultName =
'convertRefcatManager'
81 ConfigClass = ConvertRefcatManagerConfig
83 def __init__(self, filenames, config, file_reader, indexer,
84 schema, key_map, htmRange, addRefCatMetadata, log):
94 if self.
config.coord_err_unit
is not None:
98 def run(self, inputFiles):
99 """Index a set of input files from a reference catalog, and write the
100 output to the appropriate filenames, in parallel.
105 A list of file paths to read data
from.
109 output : `dict` [`int`, `str`]
110 The htm ids
and the filenames that were written to.
112 global COUNTER, FILE_PROGRESS
115 with multiprocessing.Manager()
as manager:
117 FILE_PROGRESS.value = 0
118 fileLocks = manager.dict()
121 fileLocks[i] = manager.Lock()
122 self.
log.info(
"File locks created.")
123 with multiprocessing.Pool(self.
config.n_processes)
as pool:
124 result = pool.starmap(self.
_convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
125 return {id: self.
filenames[id]
for item
in result
for id
in item}
127 def _convertOneFile(self, filename, fileLocks):
128 """Read and process one file, and write its records to the correct
129 indexed files, while handling exceptions
in a useful way so that they
130 don
't get swallowed by the multiprocess pool.
136 fileLocks : `dict` [`int`, `multiprocessing.Lock`]
137 A Lock for each HTM pixel; each pixel gets one file written,
and
138 we need to block when one process
is accessing that file.
142 pixels, files : `list` [`int`]
143 The pixel ids that were written to.
149 matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
150 inputData[self.
config.dec_name])
151 pixel_ids =
set(matchedPixels)
152 for pixelId
in pixel_ids:
153 with fileLocks[pixelId]:
154 self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
155 with FILE_PROGRESS.get_lock():
156 oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles
157 FILE_PROGRESS.value += 1
158 percent = 100 * FILE_PROGRESS.value / self.
nInputFiles
160 if np.floor(percent) - np.floor(oldPercent) >= 1:
161 self.
log.info(
"Completed %d / %d files: %d %% complete ",
167 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
168 """Process one HTM pixel, appending to an existing catalog or creating
169 a new catalog, as needed.
173 inputData : `numpy.ndarray`
174 The data
from one input file.
175 matchedPixels : `numpy.ndarray`
176 The row-matched pixel indexes corresponding to ``inputData``.
178 The pixel index we are currently processing.
179 fluxes : `dict` [`str`, `numpy.ndarray`]
180 The values that will go into the flux
and fluxErr fields
in the
182 coordErr : `dict` [`str`, `numpy.ndarray`]
183 The values that will go into the coord_raErr, coord_decErr,
and
184 coord_ra_dec_Cov fields
in the output catalog (
in radians).
186 idx = np.where(matchedPixels == pixelId)[0]
188 for outputRow, inputRow
in zip(catalog[-len(idx):], inputData[idx]):
192 with COUNTER.get_lock():
193 self.
_setIds(inputData[idx], catalog)
196 for name, array
in fluxes.items():
197 catalog[self.
key_map[name]][-len(idx):] = array[idx]
200 for name, array
in coordErr.items():
201 catalog[name][-len(idx):] = array[idx]
203 catalog.writeFits(self.
filenames[pixelId])
205 def _setIds(self, inputData, catalog):
206 """Fill the `id` field of catalog with a running index, filling the
207 last values up to the length of ``inputData``.
209 Fill with `self.
config.id_name`
if specified, otherwise use the
210 global running counter value.
214 inputData : `numpy.ndarray`
215 The input data that
is being processed.
217 The output catalog to fill the ids.
220 size = len(inputData)
222 catalog[
'id'][-size:] = inputData[self.
config.id_name]
224 idEnd = COUNTER.value + size
225 catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
226 COUNTER.value = idEnd
229 """Get a catalog from disk or create it if it doesn't exist.
234 Identifier for catalog to retrieve
236 Schema to use
in catalog creation it does
not exist.
238 The number of new elements that will be added to the catalog,
239 so space can be preallocated.
244 The new
or read-
and-resized catalog specified by `dataId`.
247 if os.path.isfile(self.
filenames[pixelId]):
248 catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
249 catalog.resize(len(catalog) + nNewElements)
250 return catalog.copy(deep=
True)
252 catalog.resize(nNewElements)
258 """Create an ICRS coord. from a row of a catalog being converted.
262 row : `numpy.ndarray`
263 Row from catalog being converted.
265 Name of RA key
in catalog being converted.
267 Name of Dec key
in catalog being converted.
276 def _getCoordErr(self, inputData, ):
277 """Compute the ra/dec error fields that will go into the output catalog.
281 inputData : `numpy.ndarray`
282 The input data to compute fluxes for.
286 coordErr : `dict` [`str`, `numpy.ndarray`]
287 The values that will go into the coord_raErr, coord_decErr, fields
288 in the output catalog (
in radians).
292 This does
not currently handle the ra/dec covariance field,
293 ``coord_ra_dec_Cov``. That field may require extra work,
as its units
294 may be more complicated
in external catalogs.
297 if hasattr(self,
"coord_err_unit"):
298 result[
'coord_raErr'] = u.Quantity(inputData[self.
config.ra_err_name],
300 result[
'coord_decErr'] = u.Quantity(inputData[self.
config.dec_err_name],
304 def _setFlags(self, record, row):
305 """Set flags in an output record.
310 Row from indexed catalog to modify.
311 row : `numpy.ndarray`
312 Row
from catalog being converted.
314 names = record.schema.getNames()
317 attr_name =
'is_{}_name'.format(flag)
318 record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
320 def _getFluxes(self, inputData):
321 """Compute the flux fields that will go into the output catalog.
325 inputData : `numpy.ndarray`
326 The input data to compute fluxes for.
330 fluxes : `dict` [`str`, `numpy.ndarray`]
331 The values that will go into the flux
and fluxErr fields
in the
335 for item
in self.
config.mag_column_list:
336 result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
337 if len(self.
config.mag_err_column_map) > 0:
338 for err_key
in self.
config.mag_err_column_map.keys():
339 error_col_name = self.
config.mag_err_column_map[err_key]
342 fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
343 inputData[err_key].copy())*1e9
344 result[err_key+
'_fluxErr'] = fluxErr
347 def _setProperMotion(self, record, row):
348 """Set proper motion fields in a record of an indexed catalog.
350 The proper motions are read from the specified columns,
351 scaled appropriately,
and installed
in the appropriate
352 columns of the output.
357 Row
from indexed catalog to modify.
358 row : structured `numpy.array`
359 Row
from catalog being converted.
361 if self.
config.pm_ra_name
is None:
363 radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
364 record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
365 record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
367 if self.
config.pm_ra_err_name
is not None:
368 record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
369 record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
371 def _setParallax(self, record, row):
372 """Set the parallax fields in a record of a refcat.
374 if self.
config.parallax_name
is None:
376 scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
377 record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
378 record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
380 def _epochToMjdTai(self, nativeEpoch):
381 """Convert an epoch in native format to TAI MJD (a float).
383 return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
384 scale=self.
config.epoch_scale).tai.mjd
386 def _setExtra(self, record, row):
387 """Set extra data fields in a record of an indexed catalog.
392 Row from indexed catalog to modify.
393 row : structured `numpy.array`
394 Row
from catalog being converted.
396 for extra_col
in self.
config.extra_col_names:
397 value = row[extra_col]
405 if isinstance(value, np.str_):
407 record.set(self.
key_map[extra_col], value)
409 def _fillRecord(self, record, row):
410 """Fill a record in an indexed catalog to be persisted.
415 Row from indexed catalog to modify.
416 row : structured `numpy.array`
417 Row
from catalog being converted.
428 """Special-case convert manager to deal with Gaia fluxes.
430 def _getFluxes(self, input):
433 def gaiaFluxToFlux(flux, zeroPoint):
434 """Equations 5.19 and 5.30 from the Gaia calibration document define the
435 conversion from Gaia electron/second fluxes to AB magnitudes.
436 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
438 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
440 result[flux == 0] = 0
445 with np.errstate(invalid=
'ignore', divide=
'ignore'):
448 result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
449 result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
450 result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
452 result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
453 result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
454 result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']
A class used as a handle to a particular field in a table.
Defines the fields and offsets for a table.
Record class that must contain a unique ID field and a celestial coordinate field.
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Point in an unspecified spherical coordinate system.
This static class includes a variety of methods for interacting with the the logging module.
def _setFlags(self, record, row)
def _epochToMjdTai(self, nativeEpoch)
def _setExtra(self, record, row)
def _setIds(self, inputData, catalog)
def _fillRecord(self, record, row)
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def computeCoord(row, ra_name, dec_name)
def _getFluxes(self, inputData)
def _setProperMotion(self, record, row)
def _getCoordErr(self, inputData)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
def _setParallax(self, record, row)
def getCatalog(self, pixelId, schema, nNewElements)
def _convertOneFile(self, filename, fileLocks)
daf::base::PropertySet * set