LSST Applications g0f08755f38+82efc23009,g12f32b3c4e+e7bdf1200e,g1653933729+a8ce1bb630,g1a0ca8cf93+50eff2b06f,g28da252d5a+52db39f6a5,g2bbee38e9b+37c5a29d61,g2bc492864f+37c5a29d61,g2cdde0e794+c05ff076ad,g3156d2b45e+41e33cbcdc,g347aa1857d+37c5a29d61,g35bb328faa+a8ce1bb630,g3a166c0a6a+37c5a29d61,g3e281a1b8c+fb992f5633,g414038480c+7f03dfc1b0,g41af890bb2+11b950c980,g5fbc88fb19+17cd334064,g6b1c1869cb+12dd639c9a,g781aacb6e4+a8ce1bb630,g80478fca09+72e9651da0,g82479be7b0+04c31367b4,g858d7b2824+82efc23009,g9125e01d80+a8ce1bb630,g9726552aa6+8047e3811d,ga5288a1d22+e532dc0a0b,gae0086650b+a8ce1bb630,gb58c049af0+d64f4d3760,gc28159a63d+37c5a29d61,gcf0d15dbbd+2acd6d4d48,gd7358e8bfb+778a810b6e,gda3e153d99+82efc23009,gda6a2b7d83+2acd6d4d48,gdaeeff99f8+1711a396fd,ge2409df99d+6b12de1076,ge79ae78c31+37c5a29d61,gf0baf85859+d0a5978c5a,gf3967379c6+4954f8c433,gfb92a5be7c+82efc23009,gfec2e1e490+2aaed99252,w.2024.46
LSST Data Management Base Package
Loading...
Searching...
No Matches
convertRefcatManager.py
Go to the documentation of this file.
1# This file is part of meas_algorithms.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["ConvertRefcatManager", "ConvertGaiaManager", "ConvertGaiaXpManager"]
23
24from ctypes import c_int
25import os.path
26import itertools
27import multiprocessing
28import time
29
30import astropy.time
31import astropy.units as u
32import numpy as np
33
34import lsst.sphgeom
35import lsst.afw.table as afwTable
36from lsst.afw.image import fluxErrFromABMagErr
37import lsst.pex.config as pexConfig
38
39
40# global shared counter to keep track of source ids
41# (multiprocess sharing is most easily done with a global)
42COUNTER = multiprocessing.Value(c_int, 0)
43# global shared counter to keep track of number of files processed.
44FILE_PROGRESS = multiprocessing.Value(c_int, 0)
45
46
47class ConvertRefcatManagerConfig(pexConfig.Config):
48 """Placeholder for ConfigurableField validation; refcat convert is
49 configured by the parent convert Task.
50 """
51 pass
52
53
55 """
56 Convert a reference catalog from external files into the LSST HTM sharded
57 format, using a multiprocessing Pool to speed up the work.
58
59 Parameters
60 ----------
61 filenames : `dict` [`int`, `str`]
62 The HTM pixel id and filenames to convert the catalog into.
63 config : `lsst.meas.algorithms.ConvertReferenceCatalogConfig`
64 The Task configuration holding the field names.
65 file_reader : `lsst.pipe.base.Task`
66 The file reader to use to load the files.
67 indexer : `lsst.meas.algorithms.HtmIndexer`
68 The class used to compute the HTM pixel per coordinate.
69 schema : `lsst.afw.table.Schema`
70 The schema of the output catalog.
71 key_map : `dict` [`str`, `lsst.afw.table.Key`]
72 The mapping from output field names to keys in the Schema.
73 htmRange : `tuple` [`int`]
74 The start and end HTM pixel ids.
75 addRefCatMetadata : callable
76 A function called to add extra metadata to each output Catalog.
77 log : `lsst.log.Log` or `logging.Logger`
78 The log to send messages to.
79 """
80 _flags = ['photometric', 'resolved', 'variable']
81 _DefaultName = 'convertRefcatManager'
82 ConfigClass = ConvertRefcatManagerConfig
83
84 def __init__(self, filenames, config, file_reader, indexer,
85 schema, key_map, htmRange, addRefCatMetadata, log):
86 self.filenames = filenames
87 self.config = config
88 self.file_reader = file_reader
89 self.indexer = indexer
90 self.schema = schema
91 self.key_map = key_map
92 self.htmRange = htmRange
93 self.addRefCatMetadata = addRefCatMetadata
94 self.log = log
95
96 if self.config.coord_err_unit is not None:
97 # cache this to speed up coordinate conversions.
98 self.coord_err_unit = u.Unit(self.config.coord_err_unit)
99
100 def run(self, inputFiles):
101 """Index a set of input files from a reference catalog, and write the
102 output to the appropriate filenames, in parallel.
103
104 Parameters
105 ----------
106 inputFiles : `list`
107 A list of file paths to read data from.
108
109 Returns
110 -------
111 output : `dict` [`int`, `str`]
112 The htm ids and the filenames that were written to.
113 """
114 global COUNTER, FILE_PROGRESS
115 self.nInputFiles = len(inputFiles)
116
117 with multiprocessing.Manager() as manager:
118 COUNTER.value = 0
119 FILE_PROGRESS.value = 0
120 fileLocks = manager.dict()
121 self.log.info("Creating %s file locks.", self.htmRange[1] - self.htmRange[0])
122 for i in range(self.htmRange[0], self.htmRange[1]):
123 fileLocks[i] = manager.Lock()
124 self.log.info("File locks created.")
125
126 start_time = time.perf_counter()
127 with multiprocessing.Pool(self.config.n_processes) as pool:
128 result = pool.starmap(self._convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
129 end_time = time.perf_counter()
130 self.log.info("Finished writing files. Elapsed time: %.2f seconds", end_time-start_time)
131
132 return {id: self.filenames[id] for item in result for id in item}
133
134 def _convertOneFile(self, filename, fileLocks):
135 """Read and process one file, and write its records to the correct
136 indexed files, while handling exceptions in a useful way so that they
137 don't get swallowed by the multiprocess pool.
138
139 Parameters
140 ----------
141 filename : `str`
142 The file to process.
143 fileLocks : `dict` [`int`, `multiprocessing.Lock`]
144 A Lock for each HTM pixel; each pixel gets one file written, and
145 we need to block when one process is accessing that file.
146
147 Returns
148 -------
149 pixels, files : `list` [`int`]
150 The pixel ids that were written to.
151 """
152 global FILE_PROGRESS
153 self.log.debug("Processing: %s", filename)
154 inputData = self.file_reader.run(filename)
155 try:
156 fluxes = self._getFluxes(inputData)
157 coordErr = self._getCoordErr(inputData)
158 matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
159 inputData[self.config.dec_name])
160 except Exception:
161 self.log.error("Failure preparing data for: %s", filename)
162 raise
163
164 pixel_ids = set(matchedPixels)
165 for pixelId in pixel_ids:
166 with fileLocks[pixelId]:
167 try:
168 self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
169 except Exception:
170 self.log.error("Failure processing filename: %s, pixelId: %s",
171 filename, pixelId)
172 raise
173 with FILE_PROGRESS.get_lock():
174 oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
175 FILE_PROGRESS.value += 1
176 percent = 100 * FILE_PROGRESS.value / self.nInputFiles
177 # only log each "new percent"
178 if np.floor(percent) - np.floor(oldPercent) >= 1:
179 self.log.info("Completed %d / %d files: %d %% complete ",
180 FILE_PROGRESS.value,
181 self.nInputFiles,
182 percent)
183 return pixel_ids
184
185 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
186 """Process one HTM pixel, appending to an existing catalog or creating
187 a new catalog, as needed.
188
189 Parameters
190 ----------
191 inputData : `numpy.ndarray`
192 The data from one input file.
193 matchedPixels : `numpy.ndarray`
194 The row-matched pixel indexes corresponding to ``inputData``.
195 pixelId : `int`
196 The pixel index we are currently processing.
197 fluxes : `dict` [`str`, `numpy.ndarray`]
198 The values that will go into the flux and fluxErr fields in the
199 output catalog.
200 coordErr : `dict` [`str`, `numpy.ndarray`]
201 The values that will go into the coord_raErr, coord_decErr, and
202 coord_ra_dec_Cov fields in the output catalog (in radians).
203 """
204 idx = np.where(matchedPixels == pixelId)[0]
205 catalog = self.getCatalog(pixelId, self.schema, len(idx))
206 for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
207 self._fillRecord(outputRow, inputRow)
208
209 global COUNTER
210 with COUNTER.get_lock():
211 self._setIds(inputData[idx], catalog)
212
213 # set fluxes from the pre-computed array
214 for name, array in fluxes.items():
215 catalog[self.key_map[name]][-len(idx):] = array[idx]
216
217 # set coordinate errors from the pre-computed array
218 for name, array in coordErr.items():
219 catalog[name][-len(idx):] = array[idx]
220
221 catalog.writeFits(self.filenames[pixelId])
222
223 def _setIds(self, inputData, catalog):
224 """Fill the `id` field of catalog with a running index, filling the
225 last values up to the length of ``inputData``.
226
227 Fill with `self.config.id_name` if specified, otherwise use the
228 global running counter value.
229
230 Parameters
231 ----------
232 inputData : `numpy.ndarray`
233 The input data that is being processed.
234 catalog : `lsst.afw.table.SimpleCatalog`
235 The output catalog to fill the ids.
236 """
237 global COUNTER
238 size = len(inputData)
239 if self.config.id_name:
240 catalog['id'][-size:] = inputData[self.config.id_name]
241 else:
242 idEnd = COUNTER.value + size
243 catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
244 COUNTER.value = idEnd
245
246 def getCatalog(self, pixelId, schema, nNewElements):
247 """Get a catalog from disk or create it if it doesn't exist.
248
249 Parameters
250 ----------
251 pixelId : `dict`
252 Identifier for catalog to retrieve
253 schema : `lsst.afw.table.Schema`
254 Schema to use in catalog creation it does not exist.
255 nNewElements : `int`
256 The number of new elements that will be added to the catalog,
257 so space can be preallocated.
258
259 Returns
260 -------
261 catalog : `lsst.afw.table.SimpleCatalog`
262 The new or read-and-resized catalog specified by `dataId`.
263 """
264 # This is safe, because we lock on this file before getCatalog is called.
265 if os.path.isfile(self.filenames[pixelId]):
266 catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
267 catalog.resize(len(catalog) + nNewElements)
268 return catalog.copy(deep=True) # ensure contiguity, so that column-assignment works
269 catalog = afwTable.SimpleCatalog(schema)
270 catalog.resize(nNewElements)
271 self.addRefCatMetadata(catalog)
272 return catalog
273
274 @staticmethod
275 def computeCoord(row, ra_name, dec_name):
276 """Create an ICRS coord. from a row of a catalog being converted.
277
278 Parameters
279 ----------
280 row : `numpy.ndarray`
281 Row from catalog being converted.
282 ra_name : `str`
283 Name of RA key in catalog being converted.
284 dec_name : `str`
285 Name of Dec key in catalog being converted.
286
287 Returns
288 -------
289 coord : `lsst.geom.SpherePoint`
290 ICRS coordinate.
291 """
292 return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
293
294 def _getCoordErr(self, inputData, ):
295 """Compute the ra/dec error fields that will go into the output catalog.
296
297 Parameters
298 ----------
299 inputData : `numpy.ndarray`
300 The input data to compute fluxes for.
301
302 Returns
303 -------
304 coordErr : `dict` [`str`, `numpy.ndarray`]
305 The values that will go into the coord_raErr, coord_decErr, fields
306 in the output catalog (in radians).
307
308 Notes
309 -----
310 This does not handle the ra/dec covariance field,
311 ``coord_ra_coord_dec_Cov``. That field is handled in
312 `_setCoordinateCovariance`.
313 """
314 result = {}
315 if hasattr(self, "coord_err_unit"):
316 result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
317 self.coord_err_unit).to_value(u.radian)
318 result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
319 self.coord_err_unit).to_value(u.radian)
320 return result
321
322 def _setFlags(self, record, row):
323 """Set flags in an output record.
324
325 Parameters
326 ----------
327 record : `lsst.afw.table.SimpleRecord`
328 Row from indexed catalog to modify.
329 row : `numpy.ndarray`
330 Row from catalog being converted.
331 """
332 names = record.schema.getNames()
333 for flag in self._flags:
334 if flag in names:
335 attr_name = 'is_{}_name'.format(flag)
336 record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))
337
338 def _getFluxes(self, inputData):
339 """Compute the flux fields that will go into the output catalog.
340
341 Parameters
342 ----------
343 inputData : `numpy.ndarray`
344 The input data to compute fluxes for.
345
346 Returns
347 -------
348 fluxes : `dict` [`str`, `numpy.ndarray`]
349 The values that will go into the flux and fluxErr fields in the
350 output catalog.
351 """
352 result = {}
353 for item in self.config.mag_column_list:
354 result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
355 if len(self.config.mag_err_column_map) > 0:
356 for err_key in self.config.mag_err_column_map.keys():
357 error_col_name = self.config.mag_err_column_map[err_key]
358 # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
359 # NOTE: copy the arrays because the numpy strides may not be useable by C++.
360 fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
361 inputData[err_key].copy())*1e9
362 result[err_key+'_fluxErr'] = fluxErr
363 return result
364
365 def _setProperMotion(self, record, row):
366 """Set proper motion fields in a record of an indexed catalog.
367
368 The proper motions are read from the specified columns,
369 scaled appropriately, and installed in the appropriate
370 columns of the output.
371
372 Parameters
373 ----------
374 record : `lsst.afw.table.SimpleRecord`
375 Row from indexed catalog to modify.
376 row : structured `numpy.array`
377 Row from catalog being converted.
378 """
379 if self.config.pm_ra_name is None: # ConvertReferenceCatalogConfig.validate ensures all or none
380 return
381 radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
382 record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
383 record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
384 record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
385 if self.config.pm_ra_err_name is not None: # pm_dec_err_name also, by validation
386 record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
387 record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
388
389 def _setParallax(self, record, row):
390 """Set the parallax fields in a record of a refcat.
391 """
392 if self.config.parallax_name is None:
393 return
394 scale = self.config.parallax_scale*lsst.geom.milliarcseconds
395 record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
396 record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)
397
398 def _epochToMjdTai(self, nativeEpoch):
399 """Convert an epoch in native format to TAI MJD (a float).
400 """
401 return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
402 scale=self.config.epoch_scale).tai.mjd
403
404 def _setCoordinateCovariance(self, record, row):
405 """Set the off-diagonal position covariance in a record of an indexed
406 catalog.
407
408 There is no generic way to determine covariance. Override this method
409 in a subclass specialized for your dataset.
410
411 Parameters
412 ----------
413 record : `lsst.afw.table.SimpleRecord`
414 Row from indexed catalog to modify.
415 row : structured `numpy.array`
416 Row from catalog being converted.
417 """
418 raise NotImplementedError("There is no default method for setting the covariance. Override this "
419 "method in a subclass specialized for your dataset.")
420
421 def _setExtra(self, record, row):
422 """Set extra data fields in a record of an indexed catalog.
423
424 Parameters
425 ----------
426 record : `lsst.afw.table.SimpleRecord`
427 Row from indexed catalog to modify.
428 row : structured `numpy.array`
429 Row from catalog being converted.
430 """
431 for extra_col in self.config.extra_col_names:
432 value = row[extra_col]
433 # If data read from a text file contains string like entires,
434 # numpy stores this as its own internal type, a numpy.str_
435 # object. This seems to be a consequence of how numpy stores
436 # string like objects in fixed column arrays. This checks
437 # if any of the values to be added to the catalog are numpy
438 # string types, and if they are, casts them to a python string
439 # which is what the python c++ records expect
440 if isinstance(value, np.str_):
441 value = str(value)
442 record.set(self.key_map[extra_col], value)
443
444 def _fillRecord(self, record, row):
445 """Fill a record in an indexed catalog to be persisted.
446
447 Parameters
448 ----------
449 record : `lsst.afw.table.SimpleRecord`
450 Row from indexed catalog to modify.
451 row : structured `numpy.array`
452 Row from catalog being converted.
453 """
454 record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
455
456 self._setFlags(record, row)
457 if self.config.full_position_information:
458 self._setProperMotion(record, row)
459 self._setParallax(record, row)
460 self._setCoordinateCovariance(record, row)
461 self._setExtra(record, row)
462
463
465 """Special-case convert manager to deal with Gaia fluxes.
466 """
467 def __init__(self, *args, **kwargs):
468 super().__init__(*args, **kwargs)
469 self.properMotionUnit = self.config.pm_scale * u.milliarcsecond
470 self.parallaxUnit = self.config.parallax_scale * u.milliarcsecond
471 self.outputUnit = u.radian * u.radian
472
473 def _getFluxes(self, input):
474 result = {}
475
476 def gaiaFluxToFlux(flux, zeroPoint):
477 """Equations 5.19 and 5.30 from the Gaia calibration document define the
478 conversion from Gaia electron/second fluxes to AB magnitudes.
479 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
480 """
481 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
482 # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
483 result[flux == 0] = 0
484 return result
485
486 # Some fluxes are 0, so log10(flux) can give warnings. We handle the
487 # zeros explicitly, so they warnings are irrelevant.
488 with np.errstate(invalid='ignore', divide='ignore'):
489 # The constants below come from table 5.3 in this document;
490 # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
491 result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
492 result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
493 result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)
494
495 result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
496 result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
497 result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']
498
499 return result
500
501 def _setCoordinateCovariance(self, record, row):
502 """Set the off-diagonal position covariance in a record of an indexed
503 catalog.
504
505 Convert the Gaia coordinate correlations into covariances.
506
507 Parameters
508 ----------
509 record : `lsst.afw.table.SimpleRecord`
510 Row from indexed catalog to modify.
511 row : structured `numpy.array`
512 Row from catalog being converted.
513 """
514 inputParams = ['ra', 'dec', 'parallax', 'pmra', 'pmdec']
515 outputParams = ['coord_ra', 'coord_dec', 'parallax', 'pm_ra', 'pm_dec']
516 # The Gaia standard for naming is to order the parameters as
517 # (coordinates, parallax, proper motion), so they need to be reordered
518 # as (coordinates, proper motion, parallax) to match the order used
519 # in LSST code (i.g. 'coord_parallax_pm_ra_Cov' becomes
520 # 'coord_pm_ra_parallax_Cov').
521 reorder = [0, 1, 4, 2, 3]
522
523 inputUnits = [self.coord_err_unit, self.coord_err_unit, self.parallaxUnit, self.properMotionUnit,
524 self.properMotionUnit]
525
526 for i in range(5):
527 for j in range(i):
528 j_error = row[f'{inputParams[j]}_error'] * inputUnits[j]
529 i_error = row[f'{inputParams[i]}_error'] * inputUnits[i]
530 ij_corr = row[f'{inputParams[j]}_{inputParams[i]}_corr']
531 cov = (i_error * j_error * ij_corr).to_value(self.outputUnit)
532
533 # Switch from order of Gaia parallax and proper motion
534 # parameters to the desired schema:
535 a = (i if (reorder[i] < reorder[j]) else j)
536 b = (j if (reorder[i] < reorder[j]) else i)
537
538 record.set(self.key_map[f'{outputParams[a]}_{outputParams[b]}_Cov'], cov)
539
540
542 """Special-case convert manager for Gaia XP spectrophotometry catalogs,
543 that have fluxes/flux errors, instead of magnitudes/mag errors. The input
544 flux and error values are in units of W/Hz/(m^2) (Gaia Collaboration, Montegriffo et al. 2022).
545 The the flux and fluxErr fields in the output catalog have units of nJy.
546 """
547
548 def _getFluxes(self, inputData):
549 result = {}
550 for item in self.config.mag_column_list:
551
552 error_col_name = item.replace("_flux_", "_flux_error_")
553
554 result[item + "_flux"] = (
555 inputData[item] * u.Watt / u.Hz / u.meter / u.meter
556 ).to_value(u.nJy)
557 result[item + "_fluxErr"] = (
558 inputData[error_col_name] * u.Watt / u.Hz / u.meter / u.meter
559 ).to_value(u.nJy)
560
561 return result
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Point in an unspecified spherical coordinate system.
Definition SpherePoint.h:57
__init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
_doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)