ingestIndexManager.py
# This file is part of meas_algorithms.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

__all__ = ["IngestIndexManager", "IngestGaiaManager"]

import os.path
import itertools
import multiprocessing

import astropy.time
import astropy.units as u
import numpy as np

import lsst.geom
import lsst.sphgeom
import lsst.afw.table as afwTable
from lsst.afw.image import fluxErrFromABMagErr


# Global shared counter to keep track of source ids
# (multiprocess sharing is most easily done with a global).
COUNTER = 0
# Global shared counter to keep track of the number of files processed.
FILE_PROGRESS = 0
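
# A minimal sketch of the shared-counter pattern used below (illustrative,
# not part of the pipeline): `run` replaces these module-level placeholders
# with `multiprocessing.Value` instances, and workers increment them under
# the Value's own lock, e.g.:
#
#     counter = multiprocessing.Value('i', 0)
#     with counter.get_lock():
#         counter.value += 1
#
# Forked worker processes inherit module globals, which is why the counters
# live at module scope rather than on the manager instance.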

class IngestIndexManager:
    """
    Ingest a reference catalog from external files into a butler repository,
    using a multiprocessing Pool to speed up the work.

    Parameters
    ----------
    filenames : `dict` [`int`, `str`]
        A mapping from HTM pixel id to the output filename for that pixel.
    config : `lsst.meas.algorithms.IngestIndexedReferenceConfig`
        The Task configuration holding the field names.
    file_reader : `lsst.pipe.base.Task`
        The file reader to use to load the files.
    indexer : `lsst.meas.algorithms.HtmIndexer`
        The class used to compute the HTM pixel per coordinate.
    schema : `lsst.afw.table.Schema`
        The schema of the output catalog.
    key_map : `dict` [`str`, `lsst.afw.table.Key`]
        The mapping from output field names to keys in the Schema.
    htmRange : `tuple` [`int`]
        The start and end HTM pixel ids.
    addRefCatMetadata : callable
        A function called to add extra metadata to each output Catalog.
    log : `lsst.log.Log`
        The log to send messages to.
    """
    _flags = ['photometric', 'resolved', 'variable']

    def __init__(self, filenames, config, file_reader, indexer,
                 schema, key_map, htmRange, addRefCatMetadata, log):
        self.filenames = filenames
        self.config = config
        self.file_reader = file_reader
        self.indexer = indexer
        self.schema = schema
        self.key_map = key_map
        self.htmRange = htmRange
        self.addRefCatMetadata = addRefCatMetadata
        self.log = log

    def run(self, inputFiles):
        """Index a set of input files from a reference catalog, and write the
        output to the appropriate filenames, in parallel.

        Parameters
        ----------
        inputFiles : `list`
            A list of file paths to read data from.
        """
        global COUNTER, FILE_PROGRESS
        self.nInputFiles = len(inputFiles)

        with multiprocessing.Manager() as manager:
            COUNTER = multiprocessing.Value('i', 0)
            FILE_PROGRESS = multiprocessing.Value('i', 0)
            fileLocks = manager.dict()
            self.log.info("Creating %s file locks.", self.htmRange[1] - self.htmRange[0])
            for i in range(self.htmRange[0], self.htmRange[1]):
                fileLocks[i] = manager.Lock()
            self.log.info("File locks created.")
            with multiprocessing.Pool(self.config.n_processes) as pool:
                pool.starmap(self._ingestOneFile, zip(inputFiles, itertools.repeat(fileLocks)))

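    # A usage sketch (illustrative only; the arguments follow this class's
    # constructor signature): a driver task builds the manager and hands it
    # the raw catalog files, roughly:
    #
    #     manager = IngestIndexManager(filenames, config, file_reader,
    #                                  indexer, schema, key_map, htmRange,
    #                                  addRefCatMetadata, log)
    #     manager.run(inputFiles)
    #
    # `filenames` maps each HTM pixel id to its output FITS path, and each
    # worker process takes the per-pixel lock before touching that file.
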
    def _ingestOneFile(self, filename, fileLocks):
        """Read and process one file, and write its records to the correct
        indexed files, while handling exceptions in a useful way so that they
        don't get swallowed by the multiprocess pool.

        Parameters
        ----------
        filename : `str`
            The file to process.
        fileLocks : `dict` [`int`, `multiprocessing.Lock`]
            A Lock for each HTM pixel; each pixel gets one file written, and
            we need to block when one process is accessing that file.
        """
        global FILE_PROGRESS
        inputData = self.file_reader.run(filename)
        fluxes = self._getFluxes(inputData)
        matchedPixels = self.indexer.indexPoints(inputData[self.config.ra_name],
                                                 inputData[self.config.dec_name])
        pixel_ids = set(matchedPixels)
        for pixelId in pixel_ids:
            with fileLocks[pixelId]:
                self._doOnePixel(inputData, matchedPixels, pixelId, fluxes)
        with FILE_PROGRESS.get_lock():
            oldPercent = 100 * FILE_PROGRESS.value / self.nInputFiles
            FILE_PROGRESS.value += 1
            percent = 100 * FILE_PROGRESS.value / self.nInputFiles
            # Only log each "new percent".
            if np.floor(percent) - np.floor(oldPercent) >= 1:
                self.log.info("Completed %d / %d files: %d %% complete ",
                              FILE_PROGRESS.value,
                              self.nInputFiles,
                              percent)

    def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes):
        """Process one HTM pixel, appending to an existing catalog or creating
        a new catalog, as needed.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The data from one input file.
        matchedPixels : `numpy.ndarray`
            The row-matched pixel indexes corresponding to ``inputData``.
        pixelId : `int`
            The pixel index we are currently processing.
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        """
        idx = np.where(matchedPixels == pixelId)[0]
        catalog = self.getCatalog(pixelId, self.schema, len(idx))
        for outputRow, inputRow in zip(catalog[-len(idx):], inputData[idx]):
            self._fillRecord(outputRow, inputRow)

        global COUNTER
        with COUNTER.get_lock():
            self._setIds(inputData[idx], catalog)

        for name, array in fluxes.items():
            catalog[self.key_map[name]][-len(idx):] = array[idx]

        catalog.writeFits(self.filenames[pixelId])

    def _setIds(self, inputData, catalog):
        """Fill the ``id`` field of ``catalog``, setting the last
        ``len(inputData)`` values.

        Use the input's ``id_name`` column if the config specifies one;
        otherwise assign from the global running counter.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data that is being processed.
        catalog : `lsst.afw.table.SimpleCatalog`
            The output catalog to fill the ids of.
        """
        global COUNTER
        size = len(inputData)
        if self.config.id_name:
            catalog['id'][-size:] = inputData[self.config.id_name]
        else:
            idEnd = COUNTER.value + size
            catalog['id'][-size:] = np.arange(COUNTER.value, idEnd)
            COUNTER.value = idEnd

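    # Worked example of the running counter (illustrative): if COUNTER.value
    # is 10 and three new rows were appended, the trailing ids become
    # [10, 11, 12] and COUNTER.value advances to 13, so the next pixel's
    # rows continue the sequence without gaps or overlap.
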
    def getCatalog(self, pixelId, schema, nNewElements):
        """Get a catalog from disk, or create it if it doesn't exist.

        Parameters
        ----------
        pixelId : `int`
            Identifier of the catalog to retrieve.
        schema : `lsst.afw.table.Schema`
            Schema to use in catalog creation, if the catalog does not exist.
        nNewElements : `int`
            The number of new elements that will be added to the catalog,
            so space can be preallocated.

        Returns
        -------
        catalog : `lsst.afw.table.SimpleCatalog`
            The new or read-and-resized catalog specified by ``pixelId``.
        """
        # This is safe, because we lock on this file before getCatalog is called.
        if os.path.isfile(self.filenames[pixelId]):
            catalog = afwTable.SimpleCatalog.readFits(self.filenames[pixelId])
            catalog.resize(len(catalog) + nNewElements)
            return catalog.copy(deep=True)  # ensure contiguity, so that column-assignment works
        catalog = afwTable.SimpleCatalog(schema)
        catalog.resize(nNewElements)
        self.addRefCatMetadata(catalog)
        return catalog

    @staticmethod
    def computeCoord(row, ra_name, dec_name):
        """Create an ICRS coordinate from a row of a catalog being ingested.

        Parameters
        ----------
        row : `numpy.ndarray`
            Row from catalog being ingested.
        ra_name : `str`
            Name of RA key in catalog being ingested.
        dec_name : `str`
            Name of Dec key in catalog being ingested.

        Returns
        -------
        coord : `lsst.geom.SpherePoint`
            ICRS coordinate.
        """
        return lsst.geom.SpherePoint(row[ra_name], row[dec_name], lsst.geom.degrees)

    def _setCoordErr(self, record, row):
        """Set coordinate error in a record of an indexed catalog.

        The errors are read from the specified columns, and installed
        in the appropriate columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : `numpy.ndarray`
            Row from catalog being ingested.
        """
        if self.config.ra_err_name:  # IngestIndexedReferenceConfig.validate ensures all or none
            record.set(self.key_map["coord_raErr"], np.radians(row[self.config.ra_err_name]))
            record.set(self.key_map["coord_decErr"], np.radians(row[self.config.dec_err_name]))

    def _setFlags(self, record, row):
        """Set flags in an output record.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : `numpy.ndarray`
            Row from catalog being ingested.
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(self.key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _getFluxes(self, inputData):
        """Compute the flux fields that will go into the output catalog.

        Parameters
        ----------
        inputData : `numpy.ndarray`
            The input data to compute fluxes for.

        Returns
        -------
        fluxes : `dict` [`str`, `numpy.ndarray`]
            The values that will go into the flux and fluxErr fields in the
            output catalog.
        """
        result = {}
        for item in self.config.mag_column_list:
            result[item+'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
        if len(self.config.mag_err_column_map) > 0:
            for err_key in self.config.mag_err_column_map.keys():
                error_col_name = self.config.mag_err_column_map[err_key]
                # TODO: multiply by 1e9 here until we have a replacement (see DM-16903)
                # NOTE: copy the arrays because the numpy strides may not be useable by C++.
                fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
                                              inputData[err_key].copy())*1e9
                result[err_key+'_fluxErr'] = fluxErr
        return result

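    # Worked example of the magnitude-to-flux conversion above
    # (illustrative): AB mag 0 corresponds to about 3631 Jy, so
    # (20.0 * u.ABmag).to_value(u.nJy) is 3631e9 * 10**(-20/2.5),
    # about 3.63e4 nJy.
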
    def _setProperMotion(self, record, row):
        """Set proper motion fields in a record of an indexed catalog.

        The proper motions are read from the specified columns,
        scaled appropriately, and installed in the appropriate
        columns of the output.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        """
        if self.config.pm_ra_name is None:  # IngestIndexedReferenceConfig.validate ensures all or none
            return
        radPerOriginal = np.radians(self.config.pm_scale)/(3600*1000)
        record.set(self.key_map["pm_ra"], row[self.config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
        record.set(self.key_map["pm_dec"], row[self.config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
        record.set(self.key_map["epoch"], self._epochToMjdTai(row[self.config.epoch_name]))
        if self.config.pm_ra_err_name is not None:  # pm_dec_err_name also, by validation
            record.set(self.key_map["pm_raErr"], row[self.config.pm_ra_err_name]*radPerOriginal)
            record.set(self.key_map["pm_decErr"], row[self.config.pm_dec_err_name]*radPerOriginal)

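    # Worked example of the scale factor above (illustrative): with
    # pm_scale == 1.0, i.e. input proper motions in mas/yr,
    # radPerOriginal = np.radians(1.0)/(3600*1000) ~= 4.8481e-9, the
    # number of radians per milliarcsecond.
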
    def _setParallax(self, record, row):
        """Set the parallax fields in a record of a refcat.
        """
        if self.config.parallax_name is None:
            return
        scale = self.config.parallax_scale*lsst.geom.milliarcseconds
        record.set(self.key_map['parallax'], row[self.config.parallax_name]*scale)
        record.set(self.key_map['parallaxErr'], row[self.config.parallax_err_name]*scale)

    def _epochToMjdTai(self, nativeEpoch):
        """Convert an epoch in native format to TAI MJD (a float).
        """
        return astropy.time.Time(nativeEpoch, format=self.config.epoch_format,
                                 scale=self.config.epoch_scale).tai.mjd

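    # Illustrative check of the conversion above: with
    # epoch_format="jyear" and epoch_scale="tai", an epoch of 2015.5
    # becomes astropy.time.Time(2015.5, format="jyear", scale="tai").tai.mjd,
    # i.e. about 57205.875.
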
    def _setExtra(self, record, row):
        """Set extra data fields in a record of an indexed catalog.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string-like entries,
            # numpy stores them as its own internal type, a numpy.str_
            # object. This seems to be a consequence of how numpy stores
            # string-like objects in fixed-column arrays. This checks
            # whether any of the values to be added to the catalog are numpy
            # string types, and if they are, casts them to a python string,
            # which is what the C++-backed records expect.
            if isinstance(value, np.str_):
                value = str(value)
            record.set(self.key_map[extra_col], value)

    def _fillRecord(self, record, row):
        """Fill a record in an indexed catalog to be persisted.

        Parameters
        ----------
        record : `lsst.afw.table.SimpleRecord`
            Row from indexed catalog to modify.
        row : structured `numpy.array`
            Row from catalog being ingested.
        """
        record.setCoord(self.computeCoord(row, self.config.ra_name, self.config.dec_name))

        self._setCoordErr(record, row)
        self._setFlags(record, row)
        self._setProperMotion(record, row)
        self._setParallax(record, row)
        self._setExtra(record, row)


class IngestGaiaManager(IngestIndexManager):
    """Special-case ingest manager to deal with Gaia fluxes.
    """
    def _getFluxes(self, input):
        result = {}

        def gaiaFluxToFlux(flux, zeroPoint):
            """Equations 5.19 and 5.30 from the Gaia calibration document define
            the conversion from Gaia electron/second fluxes to AB magnitudes.
            https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
            """
            result = ((zeroPoint - 2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy)
            # Set 0 instrumental fluxes to 0 (instead of NaN/inf from the math).
            result[flux == 0] = 0
            return result

        # Some fluxes are 0, so log10(flux) can give warnings. We handle the
        # zeros explicitly, so the warnings are irrelevant.
        with np.errstate(invalid='ignore', divide='ignore'):
            # The constants below come from table 5.3 in this document:
            # https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
            result['phot_g_mean_flux'] = gaiaFluxToFlux(input['phot_g_mean_flux'], 25.7934)
            result['phot_bp_mean_flux'] = gaiaFluxToFlux(input['phot_bp_mean_flux'], 25.3806)
            result['phot_rp_mean_flux'] = gaiaFluxToFlux(input['phot_rp_mean_flux'], 25.1161)

        result['phot_g_mean_fluxErr'] = result['phot_g_mean_flux'] / input['phot_g_mean_flux_over_error']
        result['phot_bp_mean_fluxErr'] = result['phot_bp_mean_flux'] / input['phot_bp_mean_flux_over_error']
        result['phot_rp_mean_fluxErr'] = result['phot_rp_mean_flux'] / input['phot_rp_mean_flux_over_error']

        return result
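
    # Worked example of the Gaia conversion above (illustrative): a G-band
    # flux of 1e4 e-/s gives G = 25.7934 - 2.5*np.log10(1e4) = 15.7934 AB mag,
    # which the astropy conversion then turns into roughly 1.7e6 nJy.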