LSST Applications  21.0.0-172-gfb10e10a+18fedfabac,22.0.0+297cba6710,22.0.0+80564b0ff1,22.0.0+8d77f4f51a,22.0.0+a28f4c53b1,22.0.0+dcf3732eb2,22.0.1-1-g7d6de66+2a20fdde0d,22.0.1-1-g8e32f31+297cba6710,22.0.1-1-geca5380+7fa3b7d9b6,22.0.1-12-g44dc1dc+2a20fdde0d,22.0.1-15-g6a90155+515f58c32b,22.0.1-16-g9282f48+790f5f2caa,22.0.1-2-g92698f7+dcf3732eb2,22.0.1-2-ga9b0f51+7fa3b7d9b6,22.0.1-2-gd1925c9+bf4f0e694f,22.0.1-24-g1ad7a390+a9625a72a8,22.0.1-25-g5bf6245+3ad8ecd50b,22.0.1-25-gb120d7b+8b5510f75f,22.0.1-27-g97737f7+2a20fdde0d,22.0.1-32-gf62ce7b1+aa4237961e,22.0.1-4-g0b3f228+2a20fdde0d,22.0.1-4-g243d05b+871c1b8305,22.0.1-4-g3a563be+32dcf1063f,22.0.1-4-g44f2e3d+9e4ab0f4fa,22.0.1-42-gca6935d93+ba5e5ca3eb,22.0.1-5-g15c806e+85460ae5f3,22.0.1-5-g58711c4+611d128589,22.0.1-5-g75bb458+99c117b92f,22.0.1-6-g1c63a23+7fa3b7d9b6,22.0.1-6-g50866e6+84ff5a128b,22.0.1-6-g8d3140d+720564cf76,22.0.1-6-gd805d02+cc5644f571,22.0.1-8-ge5750ce+85460ae5f3,master-g6e05de7fdc+babf819c66,master-g99da0e417a+8d77f4f51a,w.2021.48
LSST Data Management Base Package
convertRefcatManager.py
Go to the documentation of this file.
1 # This file is part of meas_algorithms.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 __all__ = ["ConvertRefcatManager", "ConvertGaiaManager"]
23 
24 from ctypes import c_int
25 import os.path
26 import itertools
27 import multiprocessing
28 
29 import astropy.time
30 import astropy.units as u
31 import numpy as np
32 
33 import lsst.sphgeom
34 import lsst.afw.table as afwTable
35 from lsst.afw.image import fluxErrFromABMagErr
36 import lsst.pex.config as pexConfig
37 
38 
39 # global shared counter to keep track of source ids
40 # (multiprocess sharing is most easily done with a global)
41 COUNTER = multiprocessing.Value(c_int, 0)
42 # global shared counter to keep track of number of files processed.
43 FILE_PROGRESS = multiprocessing.Value(c_int, 0)
44 
45 
46 class ConvertRefcatManagerConfig(pexConfig.Config):
47  """Placeholder for ConfigurableField validation; refcat convert is
48  configured by the parent convert Task.
49  """
50  pass
51 
52 
54  """
55  Convert a reference catalog from external files into the LSST HTM sharded
56  format, using a multiprocessing Pool to speed up the work.
57 
58  Parameters
59  ----------
60  filenames : `dict` [`int`, `str`]
61  The HTM pixel id and filenames to convert the catalog into.
62  config : `lsst.meas.algorithms.ConvertReferenceCatalogConfig`
63  The Task configuration holding the field names.
64  file_reader : `lsst.pipe.base.Task`
65  The file reader to use to load the files.
66  indexer : `lsst.meas.algorithms.HtmIndexer`
67  The class used to compute the HTM pixel per coordinate.
68  schema : `lsst.afw.table.Schema`
69  The schema of the output catalog.
70  key_map : `dict` [`str`, `lsst.afw.table.Key`]
71  The mapping from output field names to keys in the Schema.
72  htmRange : `tuple` [`int`]
73  The start and end HTM pixel ids.
74  addRefCatMetadata : callable
75  A function called to add extra metadata to each output Catalog.
76  log : `lsst.log.Log` or `logging.Logger`
77  The log to send messages to.
78  """
79  _flags = ['photometric', 'resolved', 'variable']
80  _DefaultName = 'convertRefcatManager'
81  ConfigClass = ConvertRefcatManagerConfig
82 
83  def __init__(self, filenames, config, file_reader, indexer,
84  schema, key_map, htmRange, addRefCatMetadata, log):
85  self.filenamesfilenames = filenames
86  self.configconfig = config
87  self.file_readerfile_reader = file_reader
88  self.indexerindexer = indexer
89  self.schemaschema = schema
90  self.key_mapkey_map = key_map
91  self.htmRangehtmRange = htmRange
92  self.addRefCatMetadataaddRefCatMetadata = addRefCatMetadata
93  self.loglog = log
94  if self.configconfig.coord_err_unit is not None:
95  # cache this to speed up coordinate conversions
96  self.coord_err_unitcoord_err_unit = u.Unit(self.configconfig.coord_err_unit)
97 
98  def run(self, inputFiles):
99  """Index a set of input files from a reference catalog, and write the
100  output to the appropriate filenames, in parallel.
101 
102  Parameters
103  ----------
104  inputFiles : `list`
105  A list of file paths to read data from.
106 
107  Returns
108  -------
109  output : `dict` [`int`, `str`]
110  The htm ids and the filenames that were written to.
111  """
112  global COUNTER, FILE_PROGRESS
113  self.nInputFilesnInputFiles = len(inputFiles)
114 
115  with multiprocessing.Manager() as manager:
116  COUNTER.value = 0
117  FILE_PROGRESS.value = 0
118  fileLocks = manager.dict()
119  self.loglog.info("Creating %s file locks.", self.htmRangehtmRange[1] - self.htmRangehtmRange[0])
120  for i in range(self.htmRangehtmRange[0], self.htmRangehtmRange[1]):
121  fileLocks[i] = manager.Lock()
122  self.loglog.info("File locks created.")
123  with multiprocessing.Pool(self.configconfig.n_processes) as pool:
124  result = pool.starmap(self._convertOneFile_convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
125  return {id: self.filenamesfilenames[id] for item in result for id in item}
126 
127  def _convertOneFile(self, filename, fileLocks):
128  """Read and process one file, and write its records to the correct
129  indexed files, while handling exceptions in a useful way so that they
130  don't get swallowed by the multiprocess pool.
131 
132  Parameters
133  ----------
134  filename : `str`
135  The file to process.
136  fileLocks : `dict` [`int`, `multiprocessing.Lock`]
137  A Lock for each HTM pixel; each pixel gets one file written, and
138  we need to block when one process is accessing that file.
139 
140  Returns
141  -------
142  pixels, files : `list` [`int`]
143  The pixel ids that were written to.
144  """
145  global FILE_PROGRESS
146  inputData = self.file_readerfile_reader.run(filename)
147  fluxes = self._getFluxes_getFluxes(inputData)
148  coordErr = self._getCoordErr_getCoordErr(inputData)
149  matchedPixels = self.indexerindexer.indexPoints(inputData[self.configconfig.ra_name],
150  inputData[self.configconfig.dec_name])
151  pixel_ids = set(matchedPixels)
152  for pixelId in pixel_ids:
153  with fileLocks[pixelId]:
154  self._doOnePixel_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
155  with FILE_PROGRESS.get_lock():
156  oldPercent = 100 * FILE_PROGRESS.value / self.nInputFilesnInputFiles
157  FILE_PROGRESS.value += 1
158  percent = 100 * FILE_PROGRESS.value / self.nInputFilesnInputFiles
159  # only log each "new percent"
160  if np.floor(percent) - np.floor(oldPercent) >= 1:
161  self.loglog.info("Completed %d / %d files: %d %% complete ",
162  FILE_PROGRESS.value,
163  self.nInputFilesnInputFiles,
164  percent)
165  return pixel_ids
166 
167  def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
168  """Process one HTM pixel, appending to an existing catalog or creating
169  a new catalog, as needed.
170 
171  Parameters
172  ----------
173  inputData : `numpy.ndarray`
174  The data from one input file.
175  matchedPixels : `numpy.ndarray`
176  The row-matched pixel indexes corresponding to ``inputData``.
177  pixelId : `int`
178  The pixel index we are currently processing.
179  fluxes : `dict` [`str`, `numpy.ndarray`]
180  The values that will go into the flux and fluxErr fields in the
181  output catalog.
182  coordErr : `dict` [`str`, `numpy.ndarray`]
183  The values that will go into the coord_raErr, coord_decErr, and
184  coord_ra_dec_Cov fields in the output catalog (in radians).
185  """
186  idx = np.where(matchedPixels == pixelId)[0]
187  catalog = self.getCataloggetCatalog(pixelId, self.schemaschema, len(idx))
188  for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
189  self._fillRecord_fillRecord(outputRow, inputRow)
190 
191  global COUNTER
192  with COUNTER.get_lock():
193  self._setIds_setIds(inputData[idx], catalog)
194 
195  # set fluxes from the pre-computed array
196  for name, array in fluxes.items():
197  catalog[self.key_mapkey_map[name]][-len(idx):] = array[idx]
198 
199  # set coordinate errors from the pre-computed array
200  for name, array in coordErr.items():
201  catalog[name][-len(idx):] = array[idx]
202 
203  catalog.writeFits(self.filenamesfilenames[pixelId])
204 
205  def _setIds(self, inputData, catalog):
206  """Fill the `id` field of catalog with a running index, filling the
207  last values up to the length of ``inputData``.
208 
209  Fill with `self.config.id_name` if specified, otherwise use the
210  global running counter value.
211 
212  Parameters
213  ----------
214  inputData : `numpy.ndarray`
215  The input data that is being processed.
216  catalog : `lsst.afw.table.SimpleCatalog`
217  The output catalog to fill the ids.
218  """
219  global COUNTER
220  size = len(inputData)
221  if self.configconfig.id_name:
222  catalog['id'][-size:] = inputData[self.configconfig.id_name]
223  else:
224  idEnd = COUNTER.value + size
225  catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
226  COUNTER.value = idEnd
227 
228  def getCatalog(self, pixelId, schema, nNewElements):
229  """Get a catalog from disk or create it if it doesn't exist.
230 
231  Parameters
232  ----------
233  pixelId : `dict`
234  Identifier for catalog to retrieve
235  schema : `lsst.afw.table.Schema`
236  Schema to use in catalog creation it does not exist.
237  nNewElements : `int`
238  The number of new elements that will be added to the catalog,
239  so space can be preallocated.
240 
241  Returns
242  -------
243  catalog : `lsst.afw.table.SimpleCatalog`
244  The new or read-and-resized catalog specified by `dataId`.
245  """
246  # This is safe, because we lock on this file before getCatalog is called.
247  if os.path.isfile(self.filenamesfilenames[pixelId]):
248  catalog = afwTable.SimpleCatalog.readFits(self.filenamesfilenames[pixelId])
249  catalog.resize(len(catalog) + nNewElements)
250  return catalog.copy(deep=True) # ensure contiguity, so that column-assignment works
251  catalog = afwTable.SimpleCatalog(schema)
252  catalog.resize(nNewElements)
253  self.addRefCatMetadataaddRefCatMetadata(catalog)
254  return catalog
255 
256  @staticmethod
257  def computeCoord(row, ra_name, dec_name):
258  """Create an ICRS coord. from a row of a catalog being converted.
259 
260  Parameters
261  ----------
262  row : `numpy.ndarray`
263  Row from catalog being converted.
264  ra_name : `str`
265  Name of RA key in catalog being converted.
266  dec_name : `str`
267  Name of Dec key in catalog being converted.
268 
269  Returns
270  -------
271  coord : `lsst.geom.SpherePoint`
272  ICRS coordinate.
273  """
274  return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
275 
276  def _getCoordErr(self, inputData, ):
277  """Compute the ra/dec error fields that will go into the output catalog.
278 
279  Parameters
280  ----------
281  inputData : `numpy.ndarray`
282  The input data to compute fluxes for.
283 
284  Returns
285  -------
286  coordErr : `dict` [`str`, `numpy.ndarray`]
287  The values that will go into the coord_raErr, coord_decErr, fields
288  in the output catalog (in radians).
289 
290  Notes
291  -----
292  This does not currently handle the ra/dec covariance field,
293  ``coord_ra_dec_Cov``. That field may require extra work, as its units
294  may be more complicated in external catalogs.
295  """
296  result = {}
297  if hasattr(self, "coord_err_unit"):
298  result['coord_raErr'] = u.Quantity(inputData[self.configconfig.ra_err_name],
299  self.coord_err_unitcoord_err_unit).to_value(u.radian)
300  result['coord_decErr'] = u.Quantity(inputData[self.configconfig.dec_err_name],
301  self.coord_err_unitcoord_err_unit).to_value(u.radian)
302  return result
303 
304  def _setFlags(self, record, row):
305  """Set flags in an output record.
306 
307  Parameters
308  ----------
309  record : `lsst.afw.table.SimpleRecord`
310  Row from indexed catalog to modify.
311  row : `numpy.ndarray`
312  Row from catalog being converted.
313  """
314  names = record.schema.getNames()
315  for flag in self._flags_flags:
316  if flag in names:
317  attr_name = 'is_{}_name'.format(flag)
318  record.set(self.key_mapkey_map[flag], bool(row[getattr(self.configconfig, attr_name)]))
319 
320  def _getFluxes(self, inputData):
321  """Compute the flux fields that will go into the output catalog.
322 
323  Parameters
324  ----------
325  inputData : `numpy.ndarray`
326  The input data to compute fluxes for.
327 
328  Returns
329  -------
330  fluxes : `dict` [`str`, `numpy.ndarray`]
331  The values that will go into the flux and fluxErr fields in the
332  output catalog.
333  """
334  result = {}
335  for item in self.configconfig.mag_column_list:
336  result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
337  if len(self.configconfig.mag_err_column_map) > 0:
338  for err_key in self.configconfig.mag_err_column_map.keys():
339  error_col_name = self.configconfig.mag_err_column_map[err_key]
340  # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
341  # NOTE: copy the arrays because the numpy strides may not be useable by C++.
342  fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
343  inputData[err_key].copy())*1e9
344  result[err_key+'_fluxErr'] = fluxErr
345  return result
346 
347  def _setProperMotion(self, record, row):
348  """Set proper motion fields in a record of an indexed catalog.
349 
350  The proper motions are read from the specified columns,
351  scaled appropriately, and installed in the appropriate
352  columns of the output.
353 
354  Parameters
355  ----------
356  record : `lsst.afw.table.SimpleRecord`
357  Row from indexed catalog to modify.
358  row : structured `numpy.array`
359  Row from catalog being converted.
360  """
361  if self.configconfig.pm_ra_name is None: # ConvertReferenceCatalogConfig.validate ensures all or none
362  return
363  radPerOriginal = np.radians(self.configconfig.pm_scale)/(3600*1000)
364  record.set(self.key_mapkey_map["pm_ra"], row[self.configconfig.pm_ra_name]*radPerOriginal*lsst.geom.radians)
365  record.set(self.key_mapkey_map["pm_dec"], row[self.configconfig.pm_dec_name]*radPerOriginal*lsst.geom.radians)
366  record.set(self.key_mapkey_map["epoch"], self._epochToMjdTai_epochToMjdTai(row[self.configconfig.epoch_name]))
367  if self.configconfig.pm_ra_err_name is not None: # pm_dec_err_name also, by validation
368  record.set(self.key_mapkey_map["pm_raErr"], row[self.configconfig.pm_ra_err_name]*radPerOriginal)
369  record.set(self.key_mapkey_map["pm_decErr"], row[self.configconfig.pm_dec_err_name]*radPerOriginal)
370 
371  def _setParallax(self, record, row):
372  """Set the parallax fields in a record of a refcat.
373  """
374  if self.configconfig.parallax_name is None:
375  return
376  scale = self.configconfig.parallax_scale*lsst.geom.milliarcseconds
377  record.set(self.key_mapkey_map['parallax'], row[self.configconfig.parallax_name]*scale)
378  record.set(self.key_mapkey_map['parallaxErr'], row[self.configconfig.parallax_err_name]*scale)
379 
380  def _epochToMjdTai(self, nativeEpoch):
381  """Convert an epoch in native format to TAI MJD (a float).
382  """
383  return astropy.time.Time(nativeEpoch, format=self.configconfig.epoch_format,
384  scale=self.configconfig.epoch_scale).tai.mjd
385 
386  def _setExtra(self, record, row):
387  """Set extra data fields in a record of an indexed catalog.
388 
389  Parameters
390  ----------
391  record : `lsst.afw.table.SimpleRecord`
392  Row from indexed catalog to modify.
393  row : structured `numpy.array`
394  Row from catalog being converted.
395  """
396  for extra_col in self.configconfig.extra_col_names:
397  value = row[extra_col]
398  # If data read from a text file contains string like entires,
399  # numpy stores this as its own internal type, a numpy.str_
400  # object. This seems to be a consequence of how numpy stores
401  # string like objects in fixed column arrays. This checks
402  # if any of the values to be added to the catalog are numpy
403  # string types, and if they are, casts them to a python string
404  # which is what the python c++ records expect
405  if isinstance(value, np.str_):
406  value = str(value)
407  record.set(self.key_mapkey_map[extra_col], value)
408 
409  def _fillRecord(self, record, row):
410  """Fill a record in an indexed catalog to be persisted.
411 
412  Parameters
413  ----------
414  record : `lsst.afw.table.SimpleRecord`
415  Row from indexed catalog to modify.
416  row : structured `numpy.array`
417  Row from catalog being converted.
418  """
419  record.setCoord(self.computeCoordcomputeCoord(row, self.configconfig.ra_name, self.configconfig.dec_name))
420 
421  self._setFlags_setFlags(record, row)
422  self._setProperMotion_setProperMotion(record, row)
423  self._setParallax_setParallax(record, row)
424  self._setExtra_setExtra(record, row)
425 
426 
428  """Special-case convert manager to deal with Gaia fluxes.
429  """
430  def _getFluxes(self, input):
431  result = {}
432 
433  def gaiaFluxToFlux(flux, zeroPoint):
434  """Equations 5.19 and 5.30 from the Gaia calibration document define the
435  conversion from Gaia electron/second fluxes to AB magnitudes.
436  https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
437  """
438  result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
439  # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
440  result[flux == 0] = 0
441  return result
442 
443  # Some fluxes are 0, so log10(flux) can give warnings. We handle the
444  # zeros explicitly, so they warnings are irrelevant.
445  with np.errstate(invalid='ignore', divide='ignore'):
446  # The constants below come from table 5.3 in this document;
447  # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
448  result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
449  result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
450  result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)
451 
452  result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
453  result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
454  result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']
455 
456  return result
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: SortedCatalog.h:42
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
daf::base::PropertySet * set
Definition: fits.cc:912
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:60
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174