LSSTApplications  20.0.0
LSSTDataManagementBasePackage
ingestIndexManager.py
Go to the documentation of this file.
1 # This file is part of meas_algorithms.
2 #
3 # Developed for the LSST Data Management System.
4 # This product includes software developed by the LSST Project
5 # (https://www.lsst.org).
6 # See the COPYRIGHT file at the top-level directory of this distribution
7 # for details of code ownership.
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program. If not, see <https://www.gnu.org/licenses/>.
21 
22 __all__ = ["IngestIndexManager", "IngestGaiaManager"]
23 
import itertools
import multiprocessing
import os.path

import astropy.time
import astropy.units as u
import numpy as np

import lsst.afw.table as afwTable
import lsst.geom
import lsst.sphgeom
from lsst.afw.image import fluxErrFromABMagErr
35 
36 
# global shared counter to keep track of source ids
# (multiprocess sharing is most easily done with a global);
# rebound to a multiprocessing.Value in IngestIndexManager.run()
COUNTER = 0
# global shared counter to keep track of number of files processed;
# rebound to a multiprocessing.Value in IngestIndexManager.run()
FILE_PROGRESS = 0
42 
43 
    """
    Ingest a reference catalog from external files into a butler repository,
    using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        The HTM pixel id and filenames to ingest the catalog into.
    config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : `lsst.meas.algorithms.HtmIndexer`
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict` [`str`, `lsst.afw.table.Key`]
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : `lsst.log.Log`
        The log to send messages to.
    """
    # Optional boolean output fields; each is copied from the input in
    # _setFlags() only when the output schema defines a field of that name.
    _flags = ['photometric', 'resolved', 'variable']
71 
    def __init__(self, filenames, config, file_reader, indexer,
                 schema, key_map, htmRange, addRefCatMetadata, log):
        """Store the ingest inputs; see the class docstring for parameter
        descriptions. All arguments are kept as same-named attributes.
        """
        self.filenames = filenames
        self.config = config
        self.file_reader = file_reader
        self.indexer = indexer
        self.schema = schema
        self.key_map = key_map
        self.htmRange = htmRange
        self.addRefCatMetadata = addRefCatMetadata
        self.log = log
        # cache this to speed up coordinate conversions
        self.coord_err_unit = u.Unit(self.config.coord_err_unit)
85 
86  def run(self, inputFiles):
87  """Index a set of input files from a reference catalog, and write the
88  output to the appropriate filenames, in parallel.
89 
90  Parameters
91  ----------
92  inputFiles : `list`
93  A list of file paths to read data from.
94  """
95  global COUNTER, FILE_PROGRESS
96  self.nInputFiles = len(inputFiles)
97 
98  with multiprocessing.Manager() as manager:
99  COUNTER = multiprocessing.Value('i', 0)
100  FILE_PROGRESS = multiprocessing.Value('i', 0)
101  fileLocks = manager.dict()
102  self.log.info("Creating %s file locks.", self.htmRange[1] - self.htmRange[0])
103  for i in range(self.htmRange[0], self.htmRange[1]):
104  fileLocks[i] = manager.Lock()
105  self.log.info("File locks created.")
106  with multiprocessing.Pool(self.config.n_processes) as pool:
107  pool.starmap(self._ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
108 
109  def _ingestOneFile(self, filename, fileLocks):
110  """Read and process one file, and write its records to the correct
111  indexed files, while handling exceptions in a useful way so that they
112  don't get swallowed by the multiprocess pool.
113 
114  Parameters
115  ----------
116  filename : `str`
117  The file to process.
118  fileLocks : `dict` [`int`, `multiprocessing.Lock`]
119  A Lock for each HTM pixel; each pixel gets one file written, and
120  we need to block when one process is accessing that file.
121  """
122  global FILE_PROGRESS
123  inputData = self.file_reader.run(filename)
124  fluxes = self._getFluxes(inputData)
125  coordErr = self._getCoordErr(inputData)
126  matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
127  inputData[self.config.dec_name])
128  pixel_ids = set(matchedPixels)
129  for pixelId in pixel_ids:
130  with fileLocks[pixelId]:
131  self._doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
132  with FILE_PROGRESS.get_lock():
133  oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
134  FILE_PROGRESS.value += 1
135  percent = 100 * FILE_PROGRESS.value / self.nInputFiles
136  # only log each "new percent"
137  if np.floor(percent) - np.floor(oldPercent) >= 1:
138  self.log.info("Completed %d / %d files: %d %% complete ",
139  FILE_PROGRESS.value,
140  self.nInputFiles,
141  percent)
142 
143  def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
144  """Process one HTM pixel, appending to an existing catalog or creating
145  a new catalog, as needed.
146 
147  Parameters
148  ----------
149  inputData : `numpy.ndarray`
150  The data from one input file.
151  matchedPixels : `numpy.ndarray`
152  The row-matched pixel indexes corresponding to ``inputData``.
153  pixelId : `int`
154  The pixel index we are currently processing.
155  fluxes : `dict` [`str`, `numpy.ndarray`]
156  The values that will go into the flux and fluxErr fields in the
157  output catalog.
158  coordErr : `dict` [`str`, `numpy.ndarray`]
159  The values that will go into the coord_raErr, coord_decErr, and
160  coord_ra_dec_Cov fields in the output catalog (in radians).
161  """
162  idx = np.where(matchedPixels == pixelId)[0]
163  catalog = self.getCatalog(pixelId, self.schema, len(idx))
164  for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
165  self._fillRecord(outputRow, inputRow)
166 
167  global COUNTER
168  with COUNTER.get_lock():
169  self._setIds(inputData[idx], catalog)
170 
171  # set fluxes from the pre-computed array
172  for name, array in fluxes.items():
173  catalog[self.key_map[name]][-len(idx):] = array[idx]
174 
175  # set coordinate errors from the pre-computed array
176  for name, array in coordErr.items():
177  catalog[name][-len(idx):] = array[idx]
178 
179  catalog.writeFits(self.filenames[pixelId])
180 
181  def _setIds(self, inputData, catalog):
182  """Fill the `id` field of catalog with a running index, filling the
183  last values up to the length of ``inputData``.
184 
185  Fill with `self.config.id_name` if specified, otherwise use the
186  global running counter value.
187 
188  Parameters
189  ----------
190  inputData : `numpy.ndarray`
191  The input data that is being processed.
192  catalog : `lsst.afw.table.SimpleCatalog`
193  The output catalog to fill the ids.
194  """
195  global COUNTER
196  size = len(inputData)
197  if self.config.id_name:
198  catalog['id'][-size:] = inputData[self.config.id_name]
199  else:
200  idEnd = COUNTER.value + size
201  catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
202  COUNTER.value = idEnd
203 
204  def getCatalog(self, pixelId, schema, nNewElements):
205  """Get a catalog from disk or create it if it doesn't exist.
206 
207  Parameters
208  ----------
209  pixelId : `dict`
210  Identifier for catalog to retrieve
211  schema : `lsst.afw.table.Schema`
212  Schema to use in catalog creation it does not exist.
213  nNewElements : `int`
214  The number of new elements that will be added to the catalog,
215  so space can be preallocated.
216 
217  Returns
218  -------
219  catalog : `lsst.afw.table.SimpleCatalog`
220  The new or read-and-resized catalog specified by `dataId`.
221  """
222  # This is safe, because we lock on this file before getCatalog is called.
223  if os.path.isfile(self.filenames[pixelId]):
224  catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
225  catalog.resize(len(catalog) + nNewElements)
226  return catalog.copy(deep=True) # ensure contiguity, so that column-assignment works
227  catalog = afwTable.SimpleCatalog(schema)
228  catalog.resize(nNewElements)
229  self.addRefCatMetadata(catalog)
230  return catalog
231 
232  @staticmethod
233  def computeCoord(row, ra_name, dec_name):
234  """Create an ICRS coord. from a row of a catalog being ingested.
235 
236  Parameters
237  ----------
238  row : `numpy.ndarray`
239  Row from catalog being ingested.
240  ra_name : `str`
241  Name of RA key in catalog being ingested.
242  dec_name : `str`
243  Name of Dec key in catalog being ingested.
244 
245  Returns
246  -------
247  coord : `lsst.geom.SpherePoint`
248  ICRS coordinate.
249  """
250  return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)
251 
252  def _getCoordErr(self, inputData, ):
253  """Compute the ra/dec error fields that will go into the output catalog.
254 
255  Parameters
256  ----------
257  inputData : `numpy.ndarray`
258  The input data to compute fluxes for.
259 
260  Returns
261  -------
262  coordErr : `dict` [`str`, `numpy.ndarray`]
263  The values that will go into the coord_raErr, coord_decErr, fields
264  in the output catalog (in radians).
265 
266  Notes
267  -----
268  This does not currently handle the ra/dec covariance field,
269  ``coord_ra_dec_Cov``. That field may require extra work, as its units
270  may be more complicated in external catalogs.
271  """
272  result = {}
273  result['coord_raErr'] = u.Quantity(inputData[self.config.ra_err_name],
274  self.coord_err_unit).to_value(u.radian)
275  result['coord_decErr'] = u.Quantity(inputData[self.config.dec_err_name],
276  self.coord_err_unit).to_value(u.radian)
277  return result
278 
279  def _setFlags(self, record, row):
280  """Set flags in an output record.
281 
282  Parameters
283  ----------
284  record : `lsst.afw.table.SimpleRecord`
285  Row from indexed catalog to modify.
286  row : `numpy.ndarray`
287  Row from catalog being ingested.
288  """
289  names = record.schema.getNames()
290  for flag in self._flags:
291  if flag in names:
292  attr_name = 'is_{}_name'.format(flag)
293  record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))
294 
295  def _getFluxes(self, inputData):
296  """Compute the flux fields that will go into the output catalog.
297 
298  Parameters
299  ----------
300  inputData : `numpy.ndarray`
301  The input data to compute fluxes for.
302 
303  Returns
304  -------
305  fluxes : `dict` [`str`, `numpy.ndarray`]
306  The values that will go into the flux and fluxErr fields in the
307  output catalog.
308  """
309  result = {}
310  for item in self.config.mag_column_list:
311  result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
312  if len(self.config.mag_err_column_map) > 0:
313  for err_key in self.config.mag_err_column_map.keys():
314  error_col_name = self.config.mag_err_column_map[err_key]
315  # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
316  # NOTE: copy the arrays because the numpy strides may not be useable by C++.
317  fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
318  inputData[err_key].copy())*1e9
319  result[err_key+'_fluxErr'] = fluxErr
320  return result
321 
322  def _setProperMotion(self, record, row):
323  """Set proper motion fields in a record of an indexed catalog.
324 
325  The proper motions are read from the specified columns,
326  scaled appropriately, and installed in the appropriate
327  columns of the output.
328 
329  Parameters
330  ----------
331  record : `lsst.afw.table.SimpleRecord`
332  Row from indexed catalog to modify.
333  row : structured `numpy.array`
334  Row from catalog being ingested.
335  """
336  if self.config.pm_ra_name is None: # IngestIndexedReferenceConfig.validate ensures all or none
337  return
338  radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
339  record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
340  record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
341  record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
342  if self.config.pm_ra_err_name is not None: # pm_dec_err_name also, by validation
343  record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
344  record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)
345 
346  def _setParallax(self, record, row):
347  """Set the parallax fields in a record of a refcat.
348  """
349  if self.config.parallax_name is None:
350  return
351  scale = self.config.parallax_scale*lsst.geom.milliarcseconds
352  record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
353  record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)
354 
355  def _epochToMjdTai(self, nativeEpoch):
356  """Convert an epoch in native format to TAI MJD (a float).
357  """
358  return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
359  scale=self.config.epoch_scale).tai.mjd
360 
361  def _setExtra(self, record, row):
362  """Set extra data fields in a record of an indexed catalog.
363 
364  Parameters
365  ----------
366  record : `lsst.afw.table.SimpleRecord`
367  Row from indexed catalog to modify.
368  row : structured `numpy.array`
369  Row from catalog being ingested.
370  """
371  for extra_col in self.config.extra_col_names:
372  value = row[extra_col]
373  # If data read from a text file contains string like entires,
374  # numpy stores this as its own internal type, a numpy.str_
375  # object. This seems to be a consequence of how numpy stores
376  # string like objects in fixed column arrays. This checks
377  # if any of the values to be added to the catalog are numpy
378  # string types, and if they are, casts them to a python string
379  # which is what the python c++ records expect
380  if isinstance(value, np.str_):
381  value = str(value)
382  record.set(self.key_map[extra_col], value)
383 
    def _fillRecord(self, record, row):
        """Fill a record in an indexed catalog to be persisted.

        Note: fluxes, coordinate errors, and ids are filled column-wise in
        ``_doOnePixel``, not here.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        """
        # Set the coordinate first; flags, proper motion, parallax and any
        # extra configured columns are each handled by a dedicated helper.
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))

        self._setFlags(record, row)
        self._setProperMotion(record, row)
        self._setParallax(record, row)
        self._setExtra(record, row)
400 
401 
403  """Special-case ingest manager to deal with Gaia fluxes.
404  """
405  def _getFluxes(self, input):
406  result = {}
407 
408  def gaiaFluxToFlux(flux, zeroPoint):
409  """Equations 5.19 and 5.30 from the Gaia calibration document define the
410  conversion from Gaia electron/second fluxes to AB magnitudes.
411  https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
412  """
413  result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
414  # set 0 instrumental fluxes to 0 (instead of NaN/inf from the math)
415  result[flux == 0] = 0
416  return result
417 
418  # Some fluxes are 0, so log10(flux) can give warnings. We handle the
419  # zeros explicitly, so they warnings are irrelevant.
420  with np.errstate(invalid='ignore', divide='ignore'):
421  # The constants below come from table 5.3 in this document;
422  # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
423  result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
424  result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
425  result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)
426 
427  result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
428  result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
429  result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']
430 
431  return result
lsst::afw::image
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects.
Definition: imageAlgorithm.dox:1
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._flags
list _flags
Definition: ingestIndexManager.py:70
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.coord_err_unit
coord_err_unit
Definition: ingestIndexManager.py:83
lsst::log.log.logContinued.info
def info(fmt, *args)
Definition: logContinued.py:198
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._epochToMjdTai
def _epochToMjdTai(self, nativeEpoch)
Definition: ingestIndexManager.py:355
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._setFlags
def _setFlags(self, record, row)
Definition: ingestIndexManager.py:279
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.schema
schema
Definition: ingestIndexManager.py:77
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._setIds
def _setIds(self, inputData, catalog)
Definition: ingestIndexManager.py:181
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._doOnePixel
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
Definition: ingestIndexManager.py:143
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._ingestOneFile
def _ingestOneFile(self, filename, fileLocks)
Definition: ingestIndexManager.py:109
pex.config.history.format
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.key_map
key_map
Definition: ingestIndexManager.py:78
lsst::meas::algorithms.ingestIndexManager.IngestGaiaManager
Definition: ingestIndexManager.py:402
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager
Definition: ingestIndexManager.py:44
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._setParallax
def _setParallax(self, record, row)
Definition: ingestIndexManager.py:346
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._getCoordErr
def _getCoordErr(self, inputData)
Definition: ingestIndexManager.py:252
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.log
log
Definition: ingestIndexManager.py:81
lsst::afw::image::fluxErrFromABMagErr
double fluxErrFromABMagErr(double magErr, double mag) noexcept
Compute flux error in Janskys from AB magnitude error and AB magnitude.
Definition: Calib.h:60
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._setProperMotion
def _setProperMotion(self, record, row)
Definition: ingestIndexManager.py:322
lsst::sphgeom
Definition: Angle.h:38
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.indexer
indexer
Definition: ingestIndexManager.py:76
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._getFluxes
def _getFluxes(self, inputData)
Definition: ingestIndexManager.py:295
lsst::afw::table
Definition: table.dox:3
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.run
def run(self, inputFiles)
Definition: ingestIndexManager.py:86
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._fillRecord
def _fillRecord(self, record, row)
Definition: ingestIndexManager.py:384
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager._setExtra
def _setExtra(self, record, row)
Definition: ingestIndexManager.py:361
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.computeCoord
def computeCoord(row, ra_name, dec_name)
Definition: ingestIndexManager.py:233
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.filenames
filenames
Definition: ingestIndexManager.py:73
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.addRefCatMetadata
addRefCatMetadata
Definition: ingestIndexManager.py:80
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.htmRange
htmRange
Definition: ingestIndexManager.py:79
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.__init__
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
Definition: ingestIndexManager.py:72
lsst::geom::SpherePoint
Point in an unspecified spherical coordinate system.
Definition: SpherePoint.h:57
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.file_reader
file_reader
Definition: ingestIndexManager.py:75
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.config
config
Definition: ingestIndexManager.py:74
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.getCatalog
def getCatalog(self, pixelId, schema, nNewElements)
Definition: ingestIndexManager.py:204
set
daf::base::PropertySet * set
Definition: fits.cc:912
lsst::afw::table::SortedCatalogT
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Definition: fwd.h:63
lsst::meas::algorithms.ingestIndexManager.IngestIndexManager.nInputFiles
nInputFiles
Definition: ingestIndexManager.py:96