LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.45
LSSTDataManagementBasePackage
ingestIndexReferenceTask.py
Go to the documentation of this file.
1 #
2 # LSST Data Management System
3 #
4 # Copyright 2008-2017 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
23 
24 __all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig",
25  "IngestGaiaReferenceTask"]
26 
27 import os.path
28 
29 import lsst.pex.config as pexConfig
30 import lsst.pipe.base as pipeBase
31 import lsst.geom
32 import lsst.sphgeom
33 import lsst.afw.table as afwTable
34 from lsst.daf.base import PropertyList
35 from .indexerRegistry import IndexerRegistry
36 from .readTextCatalogTask import ReadTextCatalogTask
37 from .loadReferenceObjects import LoadReferenceObjectsTask
38 from . import ingestIndexManager
39 
40 # The most recent Indexed Reference Catalog on-disk format version.
41 LATEST_FORMAT_VERSION = 1
42 
43 
44 def addRefCatMetadata(catalog):
45  """Add metadata to a new (not yet populated) reference catalog.
46 
47  Parameters
48  ----------
49  catalog : `lsst.afw.table.SimpleCatalog`
50  Catalog to which metadata should be attached. Will be modified
51  in-place.
52  """
53  md = catalog.getMetadata()
54  if md is None:
55  md = PropertyList()
56  md.set("REFCAT_FORMAT_VERSION", LATEST_FORMAT_VERSION)
57  catalog.setMetadata(md)
58 
59 
60 class IngestReferenceRunner(pipeBase.TaskRunner):
61  """Task runner for the reference catalog ingester
62 
63  Data IDs are ignored so the runner should just run the task on the parsed command.
64  """
65 
66  def run(self, parsedCmd):
67  """Run the task.
68 
69  Several arguments need to be collected to send on to the task methods.
70 
71  Parameters
72  ----------
73  parsedCmd : `argparse.Namespace`
74  Parsed command.
75 
76  Returns
77  -------
78  results : `lsst.pipe.base.Struct` or `None`
79  A empty struct if self.doReturnResults, else None
80  """
81  files = parsedCmd.files
82  butler = parsedCmd.butler
83  task = self.TaskClass(config=self.config, log=self.log, butler=butler)
84  task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
85 
86  task.createIndexedCatalog(files)
87  if self.doReturnResults:
88  return pipeBase.Struct()
89 
90 
91 class DatasetConfig(pexConfig.Config):
92  """The description of the on-disk storage format for the persisted
93  reference catalog.
94  """
95  format_version = pexConfig.Field(
96  dtype=int,
97  doc="Version number of the persisted on-disk storage format."
98  "\nVersion 0 had Jy as flux units (default 0 for unversioned catalogs)."
99  "\nVersion 1 had nJy as flux units.",
100  default=0 # This needs to always be 0, so that unversioned catalogs are interpreted as version 0.
101  )
102  ref_dataset_name = pexConfig.Field(
103  dtype=str,
104  default='cal_ref_cat',
105  doc='String to pass to the butler to retrieve persisted files.',
106  )
107  indexer = IndexerRegistry.makeField(
108  default='HTM',
109  doc='Name of indexer algoritm to use. Default is HTM',
110  )
111 
112 
113 class IngestIndexedReferenceConfig(pexConfig.Config):
114  dataset_config = pexConfig.ConfigField(
115  dtype=DatasetConfig,
116  doc="Configuration for reading the ingested data",
117  )
118  n_processes = pexConfig.Field(
119  dtype=int,
120  doc=("Number of python processes to use when ingesting."),
121  default=1
122  )
123  file_reader = pexConfig.ConfigurableField(
124  target=ReadTextCatalogTask,
125  doc='Task to use to read the files. Default is to expect text files.'
126  )
127  ra_name = pexConfig.Field(
128  dtype=str,
129  doc="Name of RA column",
130  )
131  dec_name = pexConfig.Field(
132  dtype=str,
133  doc="Name of Dec column",
134  )
135  ra_err_name = pexConfig.Field(
136  dtype=str,
137  doc="Name of RA error column",
138  optional=True,
139  )
140  dec_err_name = pexConfig.Field(
141  dtype=str,
142  doc="Name of Dec error column",
143  optional=True,
144  )
145  mag_column_list = pexConfig.ListField(
146  dtype=str,
147  doc="The values in the reference catalog are assumed to be in AB magnitudes. "
148  "List of column names to use for photometric information. At least one entry is required."
149  )
150  mag_err_column_map = pexConfig.DictField(
151  keytype=str,
152  itemtype=str,
153  default={},
154  doc="A map of magnitude column name (key) to magnitude error column (value)."
155  )
156  is_photometric_name = pexConfig.Field(
157  dtype=str,
158  optional=True,
159  doc='Name of column stating if satisfactory for photometric calibration (optional).'
160  )
161  is_resolved_name = pexConfig.Field(
162  dtype=str,
163  optional=True,
164  doc='Name of column stating if the object is resolved (optional).'
165  )
166  is_variable_name = pexConfig.Field(
167  dtype=str,
168  optional=True,
169  doc='Name of column stating if the object is measured to be variable (optional).'
170  )
171  id_name = pexConfig.Field(
172  dtype=str,
173  optional=True,
174  doc='Name of column to use as an identifier (optional).'
175  )
176  pm_ra_name = pexConfig.Field(
177  dtype=str,
178  doc="Name of proper motion RA column",
179  optional=True,
180  )
181  pm_dec_name = pexConfig.Field(
182  dtype=str,
183  doc="Name of proper motion Dec column",
184  optional=True,
185  )
186  pm_ra_err_name = pexConfig.Field(
187  dtype=str,
188  doc="Name of proper motion RA error column",
189  optional=True,
190  )
191  pm_dec_err_name = pexConfig.Field(
192  dtype=str,
193  doc="Name of proper motion Dec error column",
194  optional=True,
195  )
196  pm_scale = pexConfig.Field(
197  dtype=float,
198  doc="Scale factor by which to multiply proper motion values to obtain units of milliarcsec/year",
199  default=1.0,
200  )
201  parallax_name = pexConfig.Field(
202  dtype=str,
203  doc="Name of parallax column",
204  optional=True,
205  )
206  parallax_err_name = pexConfig.Field(
207  dtype=str,
208  doc="Name of parallax error column",
209  optional=True,
210  )
211  parallax_scale = pexConfig.Field(
212  dtype=float,
213  doc="Scale factor by which to multiply parallax values to obtain units of milliarcsec",
214  default=1.0,
215  )
216  epoch_name = pexConfig.Field(
217  dtype=str,
218  doc="Name of epoch column",
219  optional=True,
220  )
221  epoch_format = pexConfig.Field(
222  dtype=str,
223  doc="Format of epoch column: any value accepted by astropy.time.Time, e.g. 'iso' or 'unix'",
224  optional=True,
225  )
226  epoch_scale = pexConfig.Field(
227  dtype=str,
228  doc="Scale of epoch column: any value accepted by astropy.time.Time, e.g. 'utc'",
229  optional=True,
230  )
231  extra_col_names = pexConfig.ListField(
232  dtype=str,
233  default=[],
234  doc='Extra columns to add to the reference catalog.'
235  )
236 
237  def setDefaults(self):
238  # Newly ingested reference catalogs always have the latest format_version.
239  self.dataset_config.format_version = LATEST_FORMAT_VERSION
240 
241  def validate(self):
242  pexConfig.Config.validate(self)
243 
244  def assertAllOrNone(*names):
245  """Raise ValueError unless all the named fields are set or are
246  all none (or blank)
247  """
248  setNames = [name for name in names if bool(getattr(self, name))]
249  if len(setNames) in (len(names), 0):
250  return
251  prefix = "Both or neither" if len(names) == 2 else "All or none"
252  raise ValueError("{} of {} must be set, but only {} are set".format(
253  prefix, ", ".join(names), ", ".join(setNames)))
254 
255  if not (self.ra_name and self.dec_name and self.mag_column_list):
256  raise ValueError(
257  "ra_name and dec_name and at least one entry in mag_column_list must be supplied.")
258  if self.mag_err_column_map and set(self.mag_column_list) != set(self.mag_err_column_map.keys()):
259  raise ValueError(
260  "mag_err_column_map specified, but keys do not match mag_column_list: {} != {}".format(
261  sorted(self.mag_err_column_map.keys()), sorted(self.mag_column_list)))
262  assertAllOrNone("ra_err_name", "dec_err_name")
263  assertAllOrNone("epoch_name", "epoch_format", "epoch_scale")
264  assertAllOrNone("pm_ra_name", "pm_dec_name")
265  assertAllOrNone("pm_ra_err_name", "pm_dec_err_name")
266  assertAllOrNone("parallax_name", "parallax_err_name")
267  if self.pm_ra_err_name and not self.pm_ra_name:
268  raise ValueError('"pm_ra/dec_name" must be specified if "pm_ra/dec_err_name" are specified')
269  if (self.pm_ra_name or self.parallax_name) and not self.epoch_name:
270  raise ValueError(
271  '"epoch_name" must be specified if "pm_ra/dec_name" or "parallax_name" are specified')
272 
273 
274 class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
275  """Class for producing and loading indexed reference catalogs.
276 
277  This implements an indexing scheme based on hierarchical triangular
278  mesh (HTM). The term index really means breaking the catalog into
279  localized chunks called shards. In this case each shard contains
280  the entries from the catalog in a single HTM trixel
281 
282  For producing catalogs this task makes the following assumptions
283  about the input catalogs:
284  - RA, Dec, RA error and Dec error are all in decimal degrees.
285  - Epoch is available in a column, in a format supported by astropy.time.Time.
286  - There are no off-diagonal covariance terms, such as covariance
287  between RA and Dec, or between PM RA and PM Dec. Gaia is a well
288  known example of a catalog that has such terms, and thus should not
289  be ingested with this task.
290 
291  Parameters
292  ----------
293  butler : `lsst.daf.persistence.Butler`
294  Data butler for reading and writing catalogs
295  """
296  canMultiprocess = False
297  ConfigClass = IngestIndexedReferenceConfig
298  RunnerClass = IngestReferenceRunner
299  _DefaultName = 'IngestIndexedReferenceTask'
300 
301  @classmethod
302  def _makeArgumentParser(cls):
303  """Create an argument parser.
304 
305  This returns a standard parser with an extra "files" argument.
306  """
307  parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
308  parser.add_argument("files", nargs="+", help="Names of files to index")
309  return parser
310 
311  def __init__(self, *args, butler=None, **kwargs):
312  self.butler = butler
313  super().__init__(*args, **kwargs)
314  self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
315  self.config.dataset_config.indexer.active)
316  self.makeSubtask('file_reader')
318 
319  def createIndexedCatalog(self, inputFiles):
320  """Index a set of files comprising a reference catalog.
321 
322  Outputs are persisted in the butler repository.
323 
324  Parameters
325  ----------
326  inputFiles : `list`
327  A list of file paths to read.
328  """
329  schema, key_map = self._saveMasterSchema(inputFiles[0])
330  # create an HTM we can interrogate about pixel ids
331  htm = lsst.sphgeom.HtmPixelization(self.indexer.htm.get_depth())
332  filenames = self._getButlerFilenames(htm)
333  worker = self.IngestManager(filenames,
334  self.config,
335  self.file_reader,
336  self.indexer,
337  schema,
338  key_map,
339  htm.universe()[0],
340  addRefCatMetadata,
341  self.log)
342  worker.run(inputFiles)
343 
344  # write the config that was used to generate the refcat
345  dataId = self.indexer.makeDataId(None, self.config.dataset_config.ref_dataset_name)
346  self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)
347 
348  def _saveMasterSchema(self, filename):
349  """Generate and save the master catalog schema.
350 
351  Parameters
352  ----------
353  filename : `str`
354  An input file to read to get the input dtype.
355  """
356  arr = self.file_reader.run(filename)
357  schema, key_map = self.makeSchema(arr.dtype)
358  dataId = self.indexer.makeDataId('master_schema',
359  self.config.dataset_config.ref_dataset_name)
360 
361  catalog = afwTable.SimpleCatalog(schema)
362  addRefCatMetadata(catalog)
363  self.butler.put(catalog, 'ref_cat', dataId=dataId)
364  return schema, key_map
365 
366  def _getButlerFilenames(self, htm):
367  """Get filenames from the butler for each output pixel."""
368  filenames = {}
369  start, end = htm.universe()[0]
370  # path manipulation because butler.get() per pixel will take forever
371  dataId = self.indexer.makeDataId(start, self.config.dataset_config.ref_dataset_name)
372  path = self.butler.get('ref_cat_filename', dataId=dataId)[0]
373  base = os.path.join(os.path.dirname(path), "%d"+os.path.splitext(path)[1])
374  for pixelId in range(start, end):
375  filenames[pixelId] = base % pixelId
376 
377  return filenames
378 
379  def makeSchema(self, dtype):
380  """Make the schema to use in constructing the persisted catalogs.
381 
382  Parameters
383  ----------
384  dtype : `numpy.dtype`
385  Data type describing each entry in ``config.extra_col_names``
386  for the catalogs being ingested.
387 
388  Returns
389  -------
390  schemaAndKeyMap : `tuple` of (`lsst.afw.table.Schema`, `dict`)
391  A tuple containing two items:
392  - The schema for the output source catalog.
393  - A map of catalog keys to use in filling the record
394  """
395  # make a schema with the standard fields
396  schema = LoadReferenceObjectsTask.makeMinimalSchema(
397  filterNameList=self.config.mag_column_list,
398  addCentroid=False,
399  addIsPhotometric=bool(self.config.is_photometric_name),
400  addIsResolved=bool(self.config.is_resolved_name),
401  addIsVariable=bool(self.config.is_variable_name),
402  coordErrDim=2 if bool(self.config.ra_err_name) else 0,
403  addProperMotion=2 if bool(self.config.pm_ra_name) else 0,
404  properMotionErrDim=2 if bool(self.config.pm_ra_err_name) else 0,
405  addParallax=bool(self.config.parallax_name),
406  )
407  keysToSkip = set(("id", "centroid_x", "centroid_y", "hasCentroid"))
408  key_map = {fieldName: schema[fieldName].asKey() for fieldName in schema.getOrderedNames()
409  if fieldName not in keysToSkip}
410 
411  def addField(name):
412  if dtype[name].kind == 'U':
413  # dealing with a string like thing. Need to get type and size.
414  at_size = dtype[name].itemsize
415  return schema.addField(name, type=str, size=at_size)
416  else:
417  at_type = dtype[name].type
418  return schema.addField(name, at_type)
419 
420  for col in self.config.extra_col_names:
421  key_map[col] = addField(col)
422  return schema, key_map
423 
424 
426  """A special-cased version of the refcat ingester for Gaia DR2.
427  """
428  def __init__(self, *args, **kwargs):
429  super().__init__(*args, **kwargs)
def format(config, name=None, writeSourceLine=True, prefix="", verbose=False)
Definition: history.py:174
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
daf::base::PropertySet * set
Definition: fits.cc:902
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
Definition: fwd.h:63
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
HtmPixelization provides HTM indexing of points and regions.