convertRefcatManager.py
# This file is part of meas_algorithms.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["ConvertRefcatManager", "ConvertGaiaManager", "ConvertGaiaXpManager"]

from ctypes import c_int
import os.path
import itertools
import multiprocessing
import time

import astropy.time
import astropy.units as u
import numpy as np

import lsst.geom  # used below for SpherePoint, degrees, radians, milliarcseconds
import lsst.sphgeom
import lsst.afw.table as afwTable
from lsst.afw.image import fluxErrFromABMagErr
import lsst.pex.config as pexConfig


# Global shared counter to keep track of source ids
# (multiprocess sharing is most easily done with a global).
COUNTER = multiprocessing.Value(c_int, 0)
# Global shared counter to keep track of the number of files processed.
FILE_PROGRESS = multiprocessing.Value(c_int, 0)


class ConvertRefcatManagerConfig(pexConfig.Config):
    """Placeholder for ConfigurableField validation; refcat conversion is
    configured by the parent convert Task.
    """
    pass


class ConvertRefcatManager:
    """
    Convert a reference catalog from external files into the LSST HTM sharded
    format, using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        The HTM pixel ids and the filenames to write the converted catalog to.
    config : `lsst.meas.algorithms.ConvertReferenceCatalogConfig`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : `lsst.meas.algorithms.HtmIndexer`
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict` [`str`, `lsst.afw.table.Key`]
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : `lsst.log.Log` or `logging.Logger`
        The log to send messages to.
    """
    _flags = ['photometric', 'resolved', 'variable']
    _DefaultName = 'convertRefcatManager'
    ConfigClass = ConvertRefcatManagerConfig

    def __init__(self, filenames, config, file_reader, indexer,
                 schema, key_map, htmRange, addRefCatMetadata, log):
        self.filenames = filenames
        self.config = config
        self.file_reader = file_reader
        self.indexer = indexer
        self.schema = schema
        self.key_map = key_map
        self.htmRange = htmRange
        self.addRefCatMetadata = addRefCatMetadata
        self.log = log

        if self.config.coord_err_unit is not None:
            # cache this to speed up coordinate conversions.
            self.coord_err_unit = u.Unit(self.config.coord_err_unit)

    def run(self, inputFiles):
        """Index a set of input files from a reference catalog, and write the
        output to the appropriate filenames, in parallel.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read data from.

        Returns
        -------
        output : `dict` [`int`, `str`]
            The HTM pixel ids and the filenames that were written to.
        """
        self.nInputFiles = len(inputFiles)

        with multiprocessing.Manager() as manager:
            COUNTER.value = 0
            FILE_PROGRESS.value = 0
            fileLocks = manager.dict()
            self.log.info("Creating %s file locks.", self.htmRange[1] - self.htmRange[0])
            for i in range(self.htmRange[0], self.htmRange[1]):
                fileLocks[i] = manager.Lock()
            self.log.info("File locks created.")

            start_time = time.perf_counter()
            with multiprocessing.Pool(self.config.n_processes) as pool:
                result = pool.starmap(self._convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
            end_time = time.perf_counter()
            self.log.info("Finished writing files. Elapsed time: %.2f seconds", end_time - start_time)

        return {id: self.filenames[id] for item in result for id in item}

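    # A minimal usage sketch (hypothetical, not part of this module): the
    # parent ConvertReferenceCatalogTask normally constructs and drives this
    # manager; all names below are illustrative placeholders.
    #
    #   manager = ConvertRefcatManager(filenames, config, file_reader,
    #                                  indexer, schema, key_map,
    #                                  htmRange=(begin, end),
    #                                  addRefCatMetadata=addMetadata, log=log)
    #   written = manager.run(["cat_0.fits", "cat_1.fits"])
    #   # `written` maps each HTM pixel id that was touched to the shard
    #   # file that now holds its records.
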
    def _convertOneFile(self, filename, fileLocks):
        """Read and process one file, and write its records to the correct
        indexed files, while handling exceptions in a useful way so that they
        don't get swallowed by the multiprocess pool.

        Parameters
        ----------
        filename : `str`
            The file to process.
        fileLocks : `dict` [`int`, `multiprocessing.Lock`]
            A Lock for each HTM pixel; each pixel gets one file written, and
            we need to block when one process is accessing that file.

        Returns
        -------
        pixel_ids : `set` [`int`]
            The HTM pixel ids that were written to.
        """
        self.log.debug("Processing: %s", filename)
        inputData = self.file_reader.run(filename)
        try:
            fluxes = self._getFluxes(inputData)
            coordErr = self._getCoordErr(inputData)
            matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
                                                     inputData[self.config.dec_name])
        except Exception:
            self.log.error("Failure preparing data for: %s", filename)
            raise

        pixel_ids = set(matchedPixels)
        for pixelId in pixel_ids:
            with fileLocks[pixelId]:
                try:
                    self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
                except Exception:
                    self.log.error("Failure processing filename: %s, pixelId: %s",
                                   filename, pixelId)
                    raise
        with FILE_PROGRESS.get_lock():
            oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
            FILE_PROGRESS.value += 1
            percent = 100 * FILE_PROGRESS.value / self.nInputFiles
            # only log each "new percent"
            if np.floor(percent) - np.floor(oldPercent) >= 1:
                self.log.info("Completed %d / %d files: %d %% complete",
                              FILE_PROGRESS.value,
                              self.nInputFiles,
                              percent)
        return pixel_ids

    def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
        """Process one HTM pixel, appending to an existing catalog or creating
        a new catalog, as needed.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The data from one input file.
        matchedPixels : `numpy.ndarray`
            The row-matched pixel indexes corresponding to ``inputData``.
        pixelId : `int`
            The pixel index we are currently processing.
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        coordErr : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the coord_raErr, coord_decErr, and
            coord_ra_dec_Cov fields in the output catalog (in radians).
        """
        idx = np.where(matchedPixels == pixelId)[0]
        catalog = self.getCatalog(pixelId, self.schema, len(idx))
        for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
            self._fillRecord(outputRow, inputRow)

        with COUNTER.get_lock():
            self._setIds(inputData[idx], catalog)

        # set fluxes from the pre-computed array
        for name, array in fluxes.items():
            catalog[self.key_map[name]][-len(idx):] = array[idx]

        # set coordinate errors from the pre-computed array
        for name, array in coordErr.items():
            catalog[name][-len(idx):] = array[idx]

        catalog.writeFits(self.filenames[pixelId])

    def _setIds(self, inputData, catalog):
        """Fill the `id` field of catalog with a running index, filling the
        last values up to the length of ``inputData``.

        Fill with `self.config.id_name` if specified, otherwise use the
        global running counter value.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data that is being processed.
        catalog : `lsst.afw.table.SimpleCatalog`
            The output catalog to fill the ids.
        """
        size = len(inputData)
        if self.config.id_name:
            catalog['id'][-size:] = inputData[self.config.id_name]
        else:
            idEnd = COUNTER.value + size
            catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
            COUNTER.value = idEnd

    def getCatalog(self, pixelId, schema, nNewElements):
        """Get a catalog from disk or create it if it doesn't exist.

        Parameters
        ----------
        pixelId : `int`
            Identifier for the catalog to retrieve.
        schema : `lsst.afw.table.Schema`
            Schema to use in catalog creation if it does not exist.
        nNewElements : `int`
            The number of new elements that will be added to the catalog,
            so space can be preallocated.

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            The new or read-and-resized catalog specified by ``pixelId``.
        """
        # This is safe, because we lock on this file before getCatalog is called.
        if os.path.isfile(self.filenames[pixelId]):
            catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
            catalog.resize(len(catalog) + nNewElements)
            return catalog.copy(deep=True)  # ensure contiguity, so that column-assignment works
        catalog = afwTable.SimpleCatalog(schema)
        catalog.resize(nNewElements)
        self.addRefCatMetadata(catalog)
        return catalog

    @staticmethod
    def computeCoord(row, ra_name, dec_name):
        """Create an ICRS coord. from a row of a catalog being converted.

        Parameters
        ----------
        row : `numpy.ndarray`
            Row from catalog being converted.
        ra_name : `str`
            Name of RA key in catalog being converted.
        dec_name : `str`
            Name of Dec key in catalog being converted.

        Returns
        -------
        coord : `lsst.geom.SpherePoint`
            ICRS coordinate.
        """
        return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

    def _getCoordErr(self, inputData):
        """Compute the ra/dec error fields that will go into the output catalog.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute coordinate errors for.

        Returns
        -------
        coordErr : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the coord_raErr and coord_decErr
            fields in the output catalog (in radians).

        Notes
        -----
        This does not handle the ra/dec covariance field,
        ``coord_ra_coord_dec_Cov``. That field is handled in
        `_setCoordinateCovariance`.
        """
        result = {}
        if hasattr(self, "coord_err_unit"):
            result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
                                               self.coord_err_unit).to_value(u.radian)
            result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
                                                self.coord_err_unit).to_value(u.radian)
        return result

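    # Hypothetical worked example (comment only, not executed): with
    # config.coord_err_unit = "milliarcsecond", an input error of 10 becomes
    # u.Quantity(10, u.milliarcsecond).to_value(u.radian) ~= 4.848e-8 rad,
    # since 1 mas = pi/(180*3600*1000) rad ~= 4.848e-9 rad.
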
    def _setFlags(self, record, row):
        """Set flags in an output record.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : `numpy.ndarray`
            Row from catalog being converted.
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _getFluxes(self, inputData):
        """Compute the flux fields that will go into the output catalog.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute fluxes for.

        Returns
        -------
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        """
        result = {}
        for item in self.config.mag_column_list:
            result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
        if len(self.config.mag_err_column_map) > 0:
            for err_key in self.config.mag_err_column_map.keys():
                error_col_name = self.config.mag_err_column_map[err_key]
                # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
                # NOTE: copy the arrays because the numpy strides may not be useable by C++.
                fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
                                              inputData[err_key].copy())*1e9
                result[err_key+'_fluxErr'] = fluxErr
        return result

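    # Hypothetical worked example (comment only, not executed): an AB
    # magnitude of 20.0 converts as (20.0*u.ABmag).to_value(u.nJy)
    # = 3631e9 * 10**(-0.4*20.0) ~= 3.631e4 nJy, since the AB system zero
    # point is 3631 Jy.
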
    def _setProperMotion(self, record, row):
        """Set proper motion fields in a record of an indexed catalog.

        The proper motions are read from the specified columns,
        scaled appropriately, and installed in the appropriate
        columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        if self.config.pm_ra_name is None:  # ConvertReferenceCatalogConfig.validate ensures all or none
            return
        radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
        record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
        record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
        record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.pm_ra_err_name is not None:  # pm_dec_err_name also, by validation
            record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
            record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

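    # Hypothetical worked example (comment only, not executed): with
    # config.pm_scale = 1.0 (input proper motions in mas/yr),
    # radPerOriginal = np.radians(1.0)/(3600*1000) ~= 4.848e-9, i.e. the
    # mas-to-radian conversion factor applied to each proper motion column.
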
    def _setParallax(self, record, row):
        """Set the parallax fields in a record of a refcat.
        """
        if self.config.parallax_name is None:
            return
        scale = self.config.parallax_scale*lsst.geom.milliarcseconds
        record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
        record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)

    def _epochToMjdTai(self, nativeEpoch):
        """Convert an epoch in native format to TAI MJD (a float).
        """
        return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                                 scale=self.config.epoch_scale).tai.mjd

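    # Hypothetical worked example (comment only, not executed): a Gaia-style
    # epoch of 2015.5 with epoch_format="jyear" and epoch_scale="tcb" gives
    # astropy.time.Time(2015.5, format="jyear", scale="tcb").tai.mjd
    # ~= 57205.87.
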
    def _setCoordinateCovariance(self, record, row):
        """Set the off-diagonal position covariance in a record of an indexed
        catalog.

        There is no generic way to determine covariance. Override this method
        in a subclass specialized for your dataset.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        raise NotImplementedError("There is no default method for setting the covariance. Override this "
                                  "method in a subclass specialized for your dataset.")

    def _setExtra(self, record, row):
        """Set extra data fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string-like entries,
            # numpy stores them as its own internal type, a numpy.str_
            # object. This seems to be a consequence of how numpy stores
            # string-like objects in fixed column arrays. This checks
            # if any of the values to be added to the catalog are numpy
            # string types, and if they are, casts them to a python string,
            # which is what the python-wrapped C++ records expect.
            if isinstance(value, np.str_):
                value = str(value)
            record.set(self.key_map[extra_col], value)

    def _fillRecord(self, record, row):
        """Fill a record in an indexed catalog to be persisted.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))

        self._setFlags(record, row)
        if self.config.full_position_information:
            self._setProperMotion(record, row)
            self._setParallax(record, row)
            self._setCoordinateCovariance(record, row)
        self._setExtra(record, row)


class ConvertGaiaManager(ConvertRefcatManager):
    """Special-case convert manager to deal with Gaia fluxes.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.properMotionUnit = self.config.pm_scale * u.milliarcsecond
        self.parallaxUnit = self.config.parallax_scale * u.milliarcsecond
        self.outputUnit = u.radian * u.radian

    def _getFluxes(self, input):
        """Compute fluxes and flux errors from Gaia's electron/second fluxes
        and flux-over-error ratios.
        """
        result = {}

        def gaiaFluxToFlux(flux, zeroPoint):
            """Equations 5.19 and 5.30 from the Gaia calibration document define the
            conversion from Gaia electron/second fluxes to AB magnitudes.
            https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
            """
            result = ((zeroPoint - 2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
            # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
            result[flux == 0] = 0
            return result

        # Some fluxes are 0, so log10(flux) can give warnings. We handle the
        # zeros explicitly, so the warnings are irrelevant.
        with np.errstate(invalid='ignore', divide='ignore'):
            # The constants below come from table 5.3 in this document:
            # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
            result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
            result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
            result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)

        result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
        result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
        result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']

        return result

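    # Hypothetical worked example (comment only, not executed): a G-band
    # flux of 1000 e-/s corresponds to AB magnitude
    # 25.7934 - 2.5*log10(1000) = 18.2934, which gaiaFluxToFlux converts to
    # 3631e9 * 10**(-0.4*18.2934) ~= 1.75e5 nJy.
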
    def _setCoordinateCovariance(self, record, row):
        """Set the off-diagonal position covariance in a record of an indexed
        catalog.

        Convert the Gaia coordinate correlations into covariances.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being converted.
        """
        inputParams = ['ra', 'dec', 'parallax', 'pmra', 'pmdec']
        outputParams = ['coord_ra', 'coord_dec', 'parallax', 'pm_ra', 'pm_dec']
        # The Gaia standard for naming is to order the parameters as
        # (coordinates, parallax, proper motion), so they need to be reordered
        # as (coordinates, proper motion, parallax) to match the order used
        # in LSST code (e.g. 'coord_parallax_pm_ra_Cov' becomes
        # 'coord_pm_ra_parallax_Cov').
        reorder = [0, 1, 4, 2, 3]

        inputUnits = [self.coord_err_unit, self.coord_err_unit, self.parallaxUnit, self.properMotionUnit,
                      self.properMotionUnit]

        for i in range(5):
            for j in range(i):
                j_error = row[f'{inputParams[j]}_error'] * inputUnits[j]
                i_error = row[f'{inputParams[i]}_error'] * inputUnits[i]
                ij_corr = row[f'{inputParams[j]}_{inputParams[i]}_corr']
                cov = (i_error * j_error * ij_corr).to_value(self.outputUnit)

                # Switch from order of Gaia parallax and proper motion
                # parameters to the desired schema:
                a = (i if (reorder[i] < reorder[j]) else j)
                b = (j if (reorder[i] < reorder[j]) else i)

                record.set(self.key_map[f'{outputParams[a]}_{outputParams[b]}_Cov'], cov)

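    # Hypothetical worked example (comment only, not executed): for the Gaia
    # column 'parallax_pmra_corr' (i=3, j=2), reorder[3]=2 < reorder[2]=4
    # gives a=3, b=2, so the value pmra_error * parallax_error *
    # parallax_pmra_corr (converted to rad^2) lands in the schema field
    # 'pm_ra_parallax_Cov'.
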

class ConvertGaiaXpManager(ConvertGaiaManager):
    """Special-case convert manager for Gaia XP spectrophotometry catalogs,
    which have fluxes/flux errors instead of magnitudes/mag errors. The input
    flux and error values are in units of W/Hz/m^2 (Gaia Collaboration,
    Montegriffo et al. 2022). The flux and fluxErr fields in the output
    catalog have units of nJy.
    """

    def _getFluxes(self, inputData):
        """Compute fluxes and flux errors from the XP flux columns,
        converting W/Hz/m^2 to nJy.
        """
        result = {}
        for item in self.config.mag_column_list:

            error_col_name = item.replace("_flux_", "_flux_error_")

            result[item + "_flux"] = (
                inputData[item] * u.Watt / u.Hz / u.meter / u.meter
            ).to_value(u.nJy)
            result[item + "_fluxErr"] = (
                inputData[error_col_name] * u.Watt / u.Hz / u.meter / u.meter
            ).to_value(u.nJy)

        return result
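
    # Hypothetical worked example (comment only, not executed): since
    # 1 Jy = 1e-26 W/Hz/m^2, an input value of 1e-30 W/Hz/m^2 corresponds
    # to 1e-4 Jy = 1e5 nJy.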