LSST Applications g070148d5b3+33e5256705,g0d53e28543+25c8b88941,g0da5cf3356+2dd1178308,g1081da9e2a+62d12e78cb,g17e5ecfddb+7e422d6136,g1c76d35bf8+ede3a706f7,g295839609d+225697d880,g2e2c1a68ba+cc1f6f037e,g2ffcdf413f+853cd4dcde,g38293774b4+62d12e78cb,g3b44f30a73+d953f1ac34,g48ccf36440+885b902d19,g4b2f1765b6+7dedbde6d2,g5320a0a9f6+0c5d6105b6,g56b687f8c9+ede3a706f7,g5c4744a4d9+ef6ac23297,g5ffd174ac0+0c5d6105b6,g6075d09f38+66af417445,g667d525e37+2ced63db88,g670421136f+2ced63db88,g71f27ac40c+2ced63db88,g774830318a+463cbe8d1f,g7876bc68e5+1d137996f1,g7985c39107+62d12e78cb,g7fdac2220c+0fd8241c05,g96f01af41f+368e6903a7,g9ca82378b8+2ced63db88,g9d27549199+ef6ac23297,gabe93b2c52+e3573e3735,gb065e2a02a+3dfbe639da,gbc3249ced9+0c5d6105b6,gbec6a3398f+0c5d6105b6,gc9534b9d65+35b9f25267,gd01420fc67+0c5d6105b6,geee7ff78d7+a14128c129,gf63283c776+ede3a706f7,gfed783d017+0c5d6105b6,w.2022.47
LSST Data Management Base Package
Loading...
Searching...
No Matches
convertRefcatManager.py
Go to the documentation of this file.
1# This file is part of meas_algorithms.
2#
3# Developed for the LSST Data Management System.
4# This product includes software developed by the LSST Project
5# (https://www.lsst.org).
6# See the COPYRIGHT file at the top-level directory of this distribution
7# for details of code ownership.
8#
9# This program is free software: you can redistribute it and/or modify
10# it under the terms of the GNU General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# This program is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU General Public License for more details.
18#
19# You should have received a copy of the GNU General Public License
20# along with this program. If not, see <https://www.gnu.org/licenses/>.
21
22__all__ = ["ConvertRefcatManager", "ConvertGaiaManager"]
23
24from ctypes import c_int
25import os.path
26import itertools
27import multiprocessing
28
29import astropy.time
30import astropy.units as u
31import numpy as np
32
33import lsst.sphgeom
34import lsst.afw.table as afwTable
35from lsst.afw.image import fluxErrFromABMagErr
36import lsst.pex.config as pexConfig
37
38
39# global shared counter to keep track of source ids
40# (multiprocess sharing is most easily done with a global)
41COUNTER = multiprocessing.Value(c_int, 0)
42# global shared counter to keep track of number of files processed.
43FILE_PROGRESS = multiprocessing.Value(c_int, 0)
44
45
46class ConvertRefcatManagerConfig(pexConfig.Config):
47 """Placeholder for ConfigurableField validation; refcat convert is
48 configured by the parent convert Task.
49 """
50 pass
51
52
54 """
55 Convert a reference catalog from external files into the LSST HTM sharded
56 format, using a multiprocessing Pool to speed up the work.
57
58 Parameters
59 ----------
60 filenames : `dict` [`int`, `str`]
61 The HTM pixel id and filenames to convert the catalog into.
63 The Task configuration holding the field names.
64 file_reader : `lsst.pipe.base.Task`
65 The file reader to use to load the files.
67 The class used to compute the HTM pixel per coordinate.
68 schema : `lsst.afw.table.Schema`
69 The schema of the output catalog.
70 key_map : `dict` [`str`, `lsst.afw.table.Key`]
71 The mapping from output field names to keys in the Schema.
72 htmRange : `tuple` [`int`]
73 The start and end HTM pixel ids.
74 addRefCatMetadata : callable
75 A function called to add extra metadata to each output Catalog.
76 log : `lsst.log.Log` or `logging.Logger`
77 The log to send messages to.
78 """
79 _flags = ['photometric', 'resolved', 'variable']
80 _DefaultName = 'convertRefcatManager'
81 ConfigClass = ConvertRefcatManagerConfig
82
83 def __init__(self, filenames, config, file_reader, indexer,
84 schema, key_map, htmRange, addRefCatMetadata, log):
85 self.filenames = filenames
86 self.config = config
87 self.file_reader = file_reader
88 self.indexer = indexer
89 self.schema = schema
90 self.key_map = key_map
91 self.htmRange = htmRange
92 self.addRefCatMetadata = addRefCatMetadata
93 self.log = log
94 if self.config.coord_err_unit is not None:
95 # cache this to speed up coordinate conversions
96 self.coord_err_unit = u.Unit(self.config.coord_err_unit)
97
98 def run(self, inputFiles):
99 """Index a set of input files from a reference catalog, and write the
100 output to the appropriate filenames, in parallel.
101
102 Parameters
103 ----------
104 inputFiles : `list`
105 A list of file paths to read data from.
106
107 Returns
108 -------
109 output : `dict` [`int`, `str`]
110 The htm ids and the filenames that were written to.
111 """
112 global COUNTER, FILE_PROGRESS
113 self.nInputFiles = len(inputFiles)
114
115 with multiprocessing.Manager() as manager:
116 COUNTER.value = 0
117 FILE_PROGRESS.value = 0
118 fileLocks = manager.dict()
119 self.log.info("Creating %s file locks.", self.htmRange[1] - self.htmRange[0])
120 for i in range(self.htmRange[0], self.htmRange[1]):
121 fileLocks[i] = manager.Lock()
122 self.log.info("File locks created.")
123 with multiprocessing.Pool(self.config.n_processes) as pool:
124 result = pool.starmap(self._convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
125 return {id: self.filenames[id] for item in result for id in item}
126
127 def _convertOneFile(self, filename, fileLocks):
128 """Read and process one file, and write its records to the correct
129 indexed files, while handling exceptions in a useful way so that they
130 don't get swallowed by the multiprocess pool.
131
132 Parameters
133 ----------
134 filename : `str`
135 The file to process.
136 fileLocks : `dict` [`int`, `multiprocessing.Lock`]
137 A Lock for each HTM pixel; each pixel gets one file written, and
138 we need to block when one process is accessing that file.
139
140 Returns
141 -------
142 pixels, files : `list` [`int`]
143 The pixel ids that were written to.
144 """
145 global FILE_PROGRESS
146 inputData = self.file_reader.run(filename)
147 fluxes = self._getFluxes(inputData)
148 coordErr = self._getCoordErr(inputData)
149 matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
150 inputData[self.config.dec_name])
151 pixel_ids = set(matchedPixels)
152 for pixelId in pixel_ids:
153 with fileLocks[pixelId]:
154 self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
155 with FILE_PROGRESS.get_lock():
156 oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
157 FILE_PROGRESS.value += 1
158 percent = 100 * FILE_PROGRESS.value / self.nInputFiles
159 # only log each "new percent"
160 if np.floor(percent) - np.floor(oldPercent) >= 1:
161 self.log.info("Completed %d / %d files: %d %% complete ",
162 FILE_PROGRESS.value,
163 self.nInputFiles,
164 percent)
165 return pixel_ids
166
167 def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
168 """Process one HTM pixel, appending to an existing catalog or creating
169 a new catalog, as needed.
170
171 Parameters
172 ----------
173 inputData : `numpy.ndarray`
174 The data from one input file.
175 matchedPixels : `numpy.ndarray`
176 The row-matched pixel indexes corresponding to ``inputData``.
177 pixelId : `int`
178 The pixel index we are currently processing.
179 fluxes : `dict` [`str`, `numpy.ndarray`]
180 The values that will go into the flux and fluxErr fields in the
181 output catalog.
182 coordErr : `dict` [`str`, `numpy.ndarray`]
183 The values that will go into the coord_raErr, coord_decErr, and
184 coord_ra_dec_Cov fields in the output catalog (in radians).
185 """
186 idx = np.where(matchedPixels == pixelId)[0]
187 catalog = self.getCatalog(pixelId, self.schema, len(idx))
188 for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
189 self._fillRecord(outputRow, inputRow)
190
191 global COUNTER
192 with COUNTER.get_lock():
193 self._setIds(inputData[idx], catalog)
194
195 # set fluxes from the pre-computed array
196 for name, array in fluxes.items():
197 catalog[self.key_map[name]][-len(idx):] = array[idx]
198
199 # set coordinate errors from the pre-computed array
200 for name, array in coordErr.items():
201 catalog[name][-len(idx):] = array[idx]
202
203 catalog.writeFits(self.filenames[pixelId])
204
205 def _setIds(self, inputData, catalog):
206 """Fill the `id` field of catalog with a running index, filling the
207 last values up to the length of ``inputData``.
208
209 Fill with `self.config.id_name` if specified, otherwise use the
210 global running counter value.
211
212 Parameters
213 ----------
214 inputData : `numpy.ndarray`
215 The input data that is being processed.
217 The output catalog to fill the ids.
218 """
219 global COUNTER
220 size = len(inputData)
221 if self.config.id_name:
222 catalog['id'][-size:] = inputData[self.config.id_name]
223 else:
224 idEnd = COUNTER.value + size
225 catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
226 COUNTER.value = idEnd
227
228 def getCatalog(self, pixelId, schema, nNewElements):
229 """Get a catalog from disk or create it if it doesn't exist.
230
231 Parameters
232 ----------
233 pixelId : `dict`
234 Identifier for catalog to retrieve
235 schema : `lsst.afw.table.Schema`
236 Schema to use in catalog creation it does not exist.
237 nNewElements : `int`
238 The number of new elements that will be added to the catalog,
239 so space can be preallocated.
240
241 Returns
242 -------
244 The new or read-and-resized catalog specified by `dataId`.
245 """
246 # This is safe, because we lock on this file before getCatalog is called.
247 if os.path.isfile(self.filenames[pixelId]):
248 catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
249 catalog.resize(len(catalog) + nNewElements)
250 return catalog.copy(deep=True) # ensure contiguity, so that column-assignment works
251 catalog = afwTable.SimpleCatalog(schema)
252 catalog.resize(nNewElements)
253 self.addRefCatMetadata(catalog)
254 return catalog
255
256 @staticmethod
257 def computeCoord(row, ra_name, dec_name):
258 """Create an ICRS coord. from a row of a catalog being converted.
259
260 Parameters
261 ----------
262 row : `numpy.ndarray`
263 Row from catalog being converted.
264 ra_name : `str`
265 Name of RA key in catalog being converted.
266 dec_name : `str`
267 Name of Dec key in catalog being converted.
268
269 Returns
270 -------
271 coord : `lsst.geom.SpherePoint`
272 ICRS coordinate.
273 """
274 return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
275
276 def _getCoordErr(self, inputData, ):
277 """Compute the ra/dec error fields that will go into the output catalog.
278
279 Parameters
280 ----------
281 inputData : `numpy.ndarray`
282 The input data to compute fluxes for.
283
284 Returns
285 -------
286 coordErr : `dict` [`str`, `numpy.ndarray`]
287 The values that will go into the coord_raErr, coord_decErr, fields
288 in the output catalog (in radians).
289
290 Notes
291 -----
292 This does not currently handle the ra/dec covariance field,
293 ``coord_ra_dec_Cov``. That field may require extra work, as its units
294 may be more complicated in external catalogs.
295 """
296 result = {}
297 if hasattr(self, "coord_err_unit"):
298 result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
299 self.coord_err_unit).to_value(u.radian)
300 result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
301 self.coord_err_unit).to_value(u.radian)
302 return result
303
304 def _setFlags(self, record, row):
305 """Set flags in an output record.
306
307 Parameters
308 ----------
310 Row from indexed catalog to modify.
311 row : `numpy.ndarray`
312 Row from catalog being converted.
313 """
314 names = record.schema.getNames()
315 for flag in self._flags:
316 if flag in names:
317 attr_name = 'is_{}_name'.format(flag)
318 record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))
319
320 def _getFluxes(self, inputData):
321 """Compute the flux fields that will go into the output catalog.
322
323 Parameters
324 ----------
325 inputData : `numpy.ndarray`
326 The input data to compute fluxes for.
327
328 Returns
329 -------
330 fluxes : `dict` [`str`, `numpy.ndarray`]
331 The values that will go into the flux and fluxErr fields in the
332 output catalog.
333 """
334 result = {}
335 for item in self.config.mag_column_list:
336 result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
337 if len(self.config.mag_err_column_map) > 0:
338 for err_key in self.config.mag_err_column_map.keys():
339 error_col_name = self.config.mag_err_column_map[err_key]
340 # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
341 # NOTE: copy the arrays because the numpy strides may not be useable by C++.
342 fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
343 inputData[err_key].copy())*1e9
344 result[err_key+'_fluxErr'] = fluxErr
345 return result
346
347 def _setProperMotion(self, record, row):
348 """Set proper motion fields in a record of an indexed catalog.
349
350 The proper motions are read from the specified columns,
351 scaled appropriately, and installed in the appropriate
352 columns of the output.
353
354 Parameters
355 ----------
357 Row from indexed catalog to modify.
358 row : structured `numpy.array`
359 Row from catalog being converted.
360 """
361 if self.config.pm_ra_name is None: # ConvertReferenceCatalogConfig.validate ensures all or none
362 return
363 radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
364 record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
365 record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
366 record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
367 if self.config.pm_ra_err_name is not None: # pm_dec_err_name also, by validation
368 record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
369 record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
370
371 def _setParallax(self, record, row):
372 """Set the parallax fields in a record of a refcat.
373 """
374 if self.config.parallax_name is None:
375 return
376 scale = self.config.parallax_scale*lsst.geom.milliarcseconds
377 record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
378 record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)
379
380 def _epochToMjdTai(self, nativeEpoch):
381 """Convert an epoch in native format to TAI MJD (a float).
382 """
383 return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
384 scale=self.config.epoch_scale).tai.mjd
385
386 def _setExtra(self, record, row):
387 """Set extra data fields in a record of an indexed catalog.
388
389 Parameters
390 ----------
392 Row from indexed catalog to modify.
393 row : structured `numpy.array`
394 Row from catalog being converted.
395 """
396 for extra_col in self.config.extra_col_names:
397 value = row[extra_col]
398 # If data read from a text file contains string like entires,
399 # numpy stores this as its own internal type, a numpy.str_
400 # object. This seems to be a consequence of how numpy stores
401 # string like objects in fixed column arrays. This checks
402 # if any of the values to be added to the catalog are numpy
403 # string types, and if they are, casts them to a python string
404 # which is what the python c++ records expect
405 if isinstance(value, np.str_):
406 value = str(value)
407 record.set(self.key_map[extra_col], value)
408
409 def _fillRecord(self, record, row):
410 """Fill a record in an indexed catalog to be persisted.
411
412 Parameters
413 ----------
415 Row from indexed catalog to modify.
416 row : structured `numpy.array`
417 Row from catalog being converted.
418 """
419 record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))
420
421 self._setFlags(record, row)
422 self._setProperMotion(record, row)
423 self._setParallax(record, row)
424 self._setExtra(record, row)
425
426
428 """Special-case convert manager to deal with Gaia fluxes.
429 """
430 def _getFluxes(self, input):
431 result = {}
432
433 def gaiaFluxToFlux(flux, zeroPoint):
434 """Equations 5.19 and 5.30 from the Gaia calibration document define the
435 conversion from Gaia electron/second fluxes to AB magnitudes.
436 https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
437 """
438 result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
439 # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
440 result[flux == 0] = 0
441 return result
442
443 # Some fluxes are 0, so log10(flux) can give warnings. We handle the
444 # zeros explicitly, so they warnings are irrelevant.
445 with np.errstate(invalid='ignore', divide='ignore'):
446 # The constants below come from table 5.3 in this document;
447 # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
448 result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
449 result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
450 result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)
451
452 result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
453 result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
454 result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']
455
456 return result
table::Key< int > to
A class used as a handle to a particular field in a table.
Definition: Key.h:53
Defines the fields and offsets for a table.
Definition: Schema.h:51
Record class that must contain a unique ID field and a celestial coordinate field.
Definition: Simple.h:48
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: SortedCatalog.h:42
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
This static class includes a variety of methods for interacting with the the logging module.
Definition: Log.h:724
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
daf::base::PropertySet * set
Definition: fits.cc:927