LSST Applications  21.0.0-141-gec8a224e+c9dc52cf11,22.0.0+052faf71bd,22.0.0+1c4650f311,22.0.0+40ce427c77,22.0.0+5b6c068b1a,22.0.0+7589c3a021,22.0.0+b84b5806bd,22.0.1-1-g7d6de66+961d22230d,22.0.1-1-g87000a6+314cd8b7ea,22.0.1-1-g8760c09+052faf71bd,22.0.1-1-g8e32f31+5b6c068b1a,22.0.1-10-g779eefa+7d186d72ce,22.0.1-12-g3bd7ecb+bec1274511,22.0.1-15-g63cc0c1+2a7037787d,22.0.1-17-ge5a99e88+2652af7731,22.0.1-19-g88addfe+961d22230d,22.0.1-2-g1cb3e5b+9c7a6aab6f,22.0.1-2-g8ef0a89+961d22230d,22.0.1-2-g92698f7+1c4650f311,22.0.1-2-ga9b0f51+052faf71bd,22.0.1-2-gb66926d+5b6c068b1a,22.0.1-2-gcb770ba+0723a13595,22.0.1-2-ge470956+36d961e575,22.0.1-21-g532228a4+2ac85e833c,22.0.1-29-g184b6e44e+8b185d4e2d,22.0.1-3-g59f966b+89b97c2637,22.0.1-3-g8c1d971+f90df4c6d0,22.0.1-3-g997b569+d69a7aa2f8,22.0.1-3-gaaec9c0+4d194bf81c,22.0.1-4-g1930a60+283d9d2f1a,22.0.1-4-g5b7b756+c1283a92b8,22.0.1-4-g8623105+961d22230d,22.0.1-7-gba73697+283d9d2f1a,22.0.1-8-g47d23f5+39e151d95f,master-g5f2689bdc5+40ce427c77,w.2021.38
LSST Data Management Base Package
ingestIndexManager.py
Go to the documentation of this file.
1 # This file is part of meas_algorithms.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 __all__ = ["IngestIndexManager", "IngestGaiaManager"]
23 
24 from ctypes import c_int
25 import os.path
26 import itertools
27 import multiprocessing
28 
29 import astropy.time
30 import astropy.units as u
31 import numpy as np
32 
33 import lsst.sphgeom
34 import lsst.afw.table as afwTable
35 from lsst.afw.image import fluxErrFromABMagErr
36 
37 
38 # global shared counter to keep track of source ids
39 # (multiprocess sharing is most easily done with a global)
40 COUNTER = multiprocessing.Value(c_int, 0)
41 # global shared counter to keep track of number of files processed.
42 FILE_PROGRESS = multiprocessing.Value(c_int, 0)
43 
44 
46  """
47  Ingest a reference catalog from external files into a butler repository,
48  using a multiprocessing Pool to speed up the work.
49 
50  Parameters
51  ----------
52  filenames : `dict` [`int`, `str`]
53  The HTM pixel id and filenames to ingest the catalog into.
54  config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
55  The Task configuration holding the field names.
56  file_reader : `lsst.pipe.base.Task`
57  The file reader to use to load the files.
58  indexer : `lsst.meas.algorithms.HtmIndexer`
59  The class used to compute the HTM pixel per coordinate.
60  schema : `lsst.afw.table.Schema`
61  The schema of the output catalog.
62  key_map : `dict` [`str`, `lsst.afw.table.Key`]
63  The mapping from output field names to keys in the Schema.
64  htmRange : `tuple` [`int`]
65  The start and end HTM pixel ids.
66  addRefCatMetadata : callable
67  A function called to add extra metadata to each output Catalog.
68  log : `lsst.log.Log`
69  The log to send messages to.
70  """
71  _flags = ['photometric', 'resolved', 'variable']
72 
73  def __init__(self, filenames, config, file_reader, indexer,
74  schema, key_map, htmRange, addRefCatMetadata, log):
75  self.filenamesfilenames = filenames
76  self.configconfig = config
77  self.file_readerfile_reader = file_reader
78  self.indexerindexer = indexer
79  self.schemaschema = schema
80  self.key_mapkey_map = key_map
81  self.htmRangehtmRange = htmRange
82  self.addRefCatMetadataaddRefCatMetadata = addRefCatMetadata
83  self.loglog = log
84  if self.configconfig.coord_err_unit is not None:
85  # cache this to speed up coordinate conversions
86  self.coord_err_unitcoord_err_unit = u.Unit(self.configconfig.coord_err_unit)
87 
88  def run(self, inputFiles):
89  """Index a set of input files from a reference catalog, and write the
90  output to the appropriate filenames, in parallel.
91 
92  Parameters
93  ----------
94  inputFiles : `list`
95  A list of file paths to read data from.
96  """
97  global COUNTER, FILE_PROGRESS
98  self.nInputFilesnInputFiles = len(inputFiles)
99 
100  with multiprocessing.Manager() as manager:
101  COUNTER.value = 0
102  FILE_PROGRESS.value = 0
103  fileLocks = manager.dict()
104  self.loglog.info("Creating %s file locks.", self.htmRangehtmRange[1] - self.htmRangehtmRange[0])
105  for i in range(self.htmRangehtmRange[0], self.htmRangehtmRange[1]):
106  fileLocks[i] = manager.Lock()
107  self.loglog.info("File locks created.")
108  with multiprocessing.Pool(self.configconfig.n_processes) as pool:
109  pool.starmap(self._ingestOneFile_ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
110 
111  def _ingestOneFile(self, filename, fileLocks):
112  """Read and process one file, and write its records to the correct
113  indexed files, while handling exceptions in a useful way so that they
114  don't get swallowed by the multiprocess pool.
115 
116  Parameters
117  ----------
118  filename : `str`
119  The file to process.
120  fileLocks : `dict` [`int`, `multiprocessing.Lock`]
121  A Lock for each HTM pixel; each pixel gets one file written, and
122  we need to block when one process is accessing that file.
123  """
124  global FILE_PROGRESS
125  inputData = self.file_readerfile_reader.run(filename)
126  fluxes = self._getFluxes_getFluxes(inputData)
127  coordErr = self._getCoordErr_getCoordErr(inputData)
128  matchedPixels = self.indexerindexer.indexPoints(inputData[self.configconfig.ra_name],
129  inputData[self.configconfig.dec_name])
130  pixel_ids = set(matchedPixels)
131  for pixelId in pixel_ids:
132  with fileLocks[pixelId]:
133  self._doOnePixel_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
134  with FILE_PROGRESS.get_lock():
135  oldPercent = 100 * FILE_PROGRESS.value / self.nInputFilesnInputFiles
136  FILE_PROGRESS.value += 1
137  percent = 100 * FILE_PROGRESS.value / self.nInputFilesnInputFiles
138  # only log each "new percent"
139  if np.floor(percent) - np.floor(oldPercent) >= 1:
140  self.loglog.info("Completed %d / %d files: %d %% complete ",
141  FILE_PROGRESS.value,
142  self.nInputFilesnInputFiles,
143  percent)
144 
145  def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
146  """Process one HTM pixel, appending to an existing catalog or creating
147  a new catalog, as needed.
148 
149  Parameters
150  ----------
151  inputData : `numpy.ndarray`
152  The data from one input file.
153  matchedPixels : `numpy.ndarray`
154  The row-matched pixel indexes corresponding to ``inputData``.
155  pixelId : `int`
156  The pixel index we are currently processing.
157  fluxes : `dict` [`str`, `numpy.ndarray`]
158  The values that will go into the flux and fluxErr fields in the
159  output catalog.
160  coordErr : `dict` [`str`, `numpy.ndarray`]
161  The values that will go into the coord_raErr, coord_decErr, and
162  coord_ra_dec_Cov fields in the output catalog (in radians).
163  """
164  idx = np.where(matchedPixels == pixelId)[0]
165  catalog = self.getCataloggetCatalog(pixelId, self.schemaschema, len(idx))
166  for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
167  self._fillRecord_fillRecord(outputRow, inputRow)
168 
169  global COUNTER
170  with COUNTER.get_lock():
171  self._setIds_setIds(inputData[idx], catalog)
172 
173  # set fluxes from the pre-computed array
174  for name, array in fluxes.items():
175  catalog[self.key_mapkey_map[name]][-len(idx):] = array[idx]
176 
177  # set coordinate errors from the pre-computed array
178  for name, array in coordErr.items():
179  catalog[name][-len(idx):] = array[idx]
180 
181  catalog.writeFits(self.filenamesfilenames[pixelId])
182 
183  def _setIds(self, inputData, catalog):
184  """Fill the `id` field of catalog with a running index, filling the
185  last values up to the length of ``inputData``.
186 
187  Fill with `self.config.id_name` if specified, otherwise use the
188  global running counter value.
189 
190  Parameters
191  ----------
192  inputData : `numpy.ndarray`
193  The input data that is being processed.
194  catalog : `lsst.afw.table.SimpleCatalog`
195  The output catalog to fill the ids.
196  """
197  global COUNTER
198  size = len(inputData)
199  if self.configconfig.id_name:
200  catalog['id'][-size:] = inputData[self.configconfig.id_name]
201  else:
202  idEnd = COUNTER.value + size
203  catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
204  COUNTER.value = idEnd
205 
206  def getCatalog(self, pixelId, schema, nNewElements):
207  """Get a catalog from disk or create it if it doesn't exist.
208 
209  Parameters
210  ----------
211  pixelId : `dict`
212  Identifier for catalog to retrieve
213  schema : `lsst.afw.table.Schema`
214  Schema to use in catalog creation it does not exist.
215  nNewElements : `int`
216  The number of new elements that will be added to the catalog,
217  so space can be preallocated.
218 
219  Returns
220  -------
221  catalog : `lsst.afw.table.SimpleCatalog`
222  The new or read-and-resized catalog specified by `dataId`.
223  """
224  # This is safe, because we lock on this file before getCatalog is called.
225  if os.path.isfile(self.filenamesfilenames[pixelId]):
226  catalog = afwTable.SimpleCatalog.readFits(self.filenamesfilenames[pixelId])
227  catalog.resize(len(catalog) + nNewElements)
228  return catalog.copy(deep=True) # ensure contiguity, so that column-assignment works
229  catalog = afwTable.SimpleCatalog(schema)
230  catalog.resize(nNewElements)
231  self.addRefCatMetadataaddRefCatMetadata(catalog)
232  return catalog
233 
234  @staticmethod
235  def computeCoord(row, ra_name, dec_name):
236  """Create an ICRS coord. from a row of a catalog being ingested.
237 
238  Parameters
239  ----------
240  row : `numpy.ndarray`
241  Row from catalog being ingested.
242  ra_name : `str`
243  Name of RA key in catalog being ingested.
244  dec_name : `str`
245  Name of Dec key in catalog being ingested.
246 
247  Returns
248  -------
249  coord : `lsst.geom.SpherePoint`
250  ICRS coordinate.
251  """
252  return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
253 
254  def _getCoordErr(self, inputData, ):
255  """Compute the ra/dec error fields that will go into the output catalog.
256 
257  Parameters
258  ----------
259  inputData : `numpy.ndarray`
260  The input data to compute fluxes for.
261 
262  Returns
263  -------
264  coordErr : `dict` [`str`, `numpy.ndarray`]
265  The values that will go into the coord_raErr, coord_decErr, fields
266  in the output catalog (in radians).
267 
268  Notes
269  -----
270  This does not currently handle the ra/dec covariance field,
271  ``coord_ra_dec_Cov``. That field may require extra work, as its units
272  may be more complicated in external catalogs.
273  """
274  result = {}
275  if hasattr(self, "coord_err_unit"):
276  result['coord_raErr'] = u.Quantity(inputData[self.configconfig.ra_err_name],
277  self.coord_err_unitcoord_err_unit).to_value(u.radian)
278  result['coord_decErr'] = u.Quantity(inputData[self.configconfig.dec_err_name],
279  self.coord_err_unitcoord_err_unit).to_value(u.radian)
280  return result
281 
282  def _setFlags(self, record, row):
283  """Set flags in an output record.
284 
285  Parameters
286  ----------
287  record : `lsst.afw.table.SimpleRecord`
288  Row from indexed catalog to modify.
289  row : `numpy.ndarray`
290  Row from catalog being ingested.
291  """
292  names = record.schema.getNames()
293  for flag in self._flags_flags:
294  if flag in names:
295  attr_name = 'is_{}_name'.format(flag)
296  record.set(self.key_mapkey_map[flag], bool(row[getattr(self.configconfig, attr_name)]))
297 
298  def _getFluxes(self, inputData):
299  """Compute the flux fields that will go into the output catalog.
300 
301  Parameters
302  ----------
303  inputData : `numpy.ndarray`
304  The input data to compute fluxes for.
305 
306  Returns
307  -------
308  fluxes : `dict` [`str`, `numpy.ndarray`]
309  The values that will go into the flux and fluxErr fields in the
310  output catalog.
311  """
312  result = {}
313  for item in self.configconfig.mag_column_list:
314  result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
315  if len(self.configconfig.mag_err_column_map) > 0:
316  for err_key in self.configconfig.mag_err_column_map.keys():
317  error_col_name = self.configconfig.mag_err_column_map[err_key]
318  # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
319  # NOTE: copy the arrays because the numpy strides may not be useable by C++.
320  fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
321  inputData[err_key].copy())*1e9
322  result[err_key+'_fluxErr'] = fluxErr
323  return result
324 
325  def _setProperMotion(self, record, row):
326  """Set proper motion fields in a record of an indexed catalog.
327 
328  The proper motions are read from the specified columns,
329  scaled appropriately, and installed in the appropriate
330  columns of the output.
331 
332  Parameters
333  ----------
334  record : `lsst.afw.table.SimpleRecord`
335  Row from indexed catalog to modify.
336  row : structured `numpy.array`
337  Row from catalog being ingested.
338  """
339  if self.configconfig.pm_ra_name is None: # IngestIndexedReferenceConfig.validate ensures all or none
340  return
341  radPerOriginal = np.radians(self.configconfig.pm_scale)/(3600*1000)
342  record.set(self.key_mapkey_map["pm_ra"], row[self.configconfig.pm_ra_name]*radPerOriginal*lsst.geom.radians)
343  record.set(self.key_mapkey_map["pm_dec"], row[self.configconfig.pm_dec_name]*radPerOriginal*lsst.geom.radians)
344  record.set(self.key_mapkey_map["epoch"], self._epochToMjdTai_epochToMjdTai(row[self.configconfig.epoch_name]))
345  if self.configconfig.pm_ra_err_name is not None: # pm_dec_err_name also, by validation
346  record.set(self.key_mapkey_map["pm_raErr"], row[self.configconfig.pm_ra_err_name]*radPerOriginal)
347  record.set(self.key_mapkey_map["pm_decErr"], row[self.configconfig.pm_dec_err_name]*radPerOriginal)
348 
349  def _setParallax(self, record, row):
350  """Set the parallax fields in a record of a refcat.
351  """
352  if self.configconfig.parallax_name is None:
353  return
354  scale = self.configconfig.parallax_scale*lsst.geom.milliarcseconds
355  record.set(self.key_mapkey_map['parallax'], row[self.configconfig.parallax_name]*scale)
356  record.set(self.key_mapkey_map['parallaxErr'], row[self.configconfig.parallax_err_name]*scale)
357 
358  def _epochToMjdTai(self, nativeEpoch):
359  """Convert an epoch in native format to TAI MJD (a float).
360  """
361  return astropy.time.Time(nativeEpoch, format=self.configconfig.epoch_format,
362  scale=self.configconfig.epoch_scale).tai.mjd
363 
364  def _setExtra(self, record, row):
365  """Set extra data fields in a record of an indexed catalog.
366 
367  Parameters
368  ----------
369  record : `lsst.afw.table.SimpleRecord`
370  Row from indexed catalog to modify.
371  row : structured `numpy.array`
372  Row from catalog being ingested.
373  """
374  for extra_col in self.configconfig.extra_col_names:
375  value = row[extra_col]
376  # If data read from a text file contains string like entires,
377  # numpy stores this as its own internal type, a numpy.str_
378  # object. This seems to be a consequence of how numpy stores
379  # string like objects in fixed column arrays. This checks
380  # if any of the values to be added to the catalog are numpy
381  # string types, and if they are, casts them to a python string
382  # which is what the python c++ records expect
383  if isinstance(value, np.str_):
384  value = str(value)
385  record.set(self.key_mapkey_map[extra_col], value)
386 
387  def _fillRecord(self, record, row):
388  """Fill a record in an indexed catalog to be persisted.
389 
390  Parameters
391  ----------
392  record : `lsst.afw.table.SimpleRecord`
393  Row from indexed catalog to modify.
394  row : structured `numpy.array`
395  Row from catalog being ingested.
396  """
397  record.setCoord(self.computeCoordcomputeCoord(row, self.configconfig.ra_name, self.configconfig.dec_name))
398 
399  self._setFlags_setFlags(record, row)
400  self._setProperMotion_setProperMotion(record, row)
401  self._setParallax_setParallax(record, row)
402  self._setExtra_setExtra(record, row)
403 
404 
406  """Special-case ingest manager to deal with Gaia fluxes.
407  """
408  def _getFluxes(self, input):
409  result = {}
410 
411  def gaiaFluxToFlux(flux, zeroPoint):
412  """Equations 5.19 and 5.30 from the Gaia calibration document define the
413  conversion from Gaia electron/second fluxes to AB magnitudes.
414  https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
415  """
416  result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
417  # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
418  result[flux == 0] = 0
419  return result
420 
421  # Some fluxes are 0, so log10(flux) can give warnings. We handle the
422  # zeros explicitly, so they warnings are irrelevant.
423  with np.errstate(invalid='ignore', divide='ignore'):
424  # The constants below come from table 5.3 in this document;
425  # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
426  result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
427  result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
428  result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)
429 
430  result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
431  result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
432  result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']
433 
434  return result
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: SortedCatalog.h:42
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
daf::base::PropertySet * set
Definition: fits.cc:912
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:60
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174