ingestIndexReferenceTask.py
#
# LSST Data Management System
# Copyright 2008-2016 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
#
from __future__ import absolute_import, division, print_function
import numpy as np

import lsst.pex.config as pexConfig
import lsst.pipe.base as pipeBase
import lsst.afw.table as afwTable
import lsst.afw.coord as afwCoord
import lsst.afw.geom as afwGeom
from lsst.afw.image import fluxFromABMag, fluxErrFromABMagErr
from .indexerRegistry import IndexerRegistry
from .readTextCatalogTask import ReadTextCatalogTask

__all__ = ["IngestIndexedReferenceConfig", "IngestIndexedReferenceTask", "DatasetConfig"]


class IngestReferenceRunner(pipeBase.TaskRunner):
    """!Task runner for the reference catalog ingester

    Data IDs are ignored, so the runner simply runs the task on the parsed command.
    """

    def run(self, parsedCmd):
        """!Run the task.

        Several arguments need to be collected and passed on to the task methods.

        @param[in] parsedCmd  Parsed command including command line arguments.
        @return Struct containing the result of the indexing.
        """
        files = parsedCmd.files
        butler = parsedCmd.butler
        task = self.TaskClass(config=self.config, log=self.log, butler=butler)
        task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)

        result = task.create_indexed_catalog(files)
        if self.doReturnResults:
            return pipeBase.Struct(
                result=result,
            )


class DatasetConfig(pexConfig.Config):
    ref_dataset_name = pexConfig.Field(
        dtype=str,
        default='cal_ref_cat',
        doc='String to pass to the butler to retrieve persisted files.',
    )
    indexer = IndexerRegistry.makeField(
        default='HTM',
        doc='Name of indexer algorithm to use.  Default is HTM',
    )
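
# Note: create_indexed_catalog (below) persists this DatasetConfig to the data
# repository as the 'ref_cat_config' dataset, so the dataset name and indexing
# scheme can be recovered when the catalog is loaded later.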


class IngestIndexedReferenceConfig(pexConfig.Config):
    dataset_config = pexConfig.ConfigField(
        dtype=DatasetConfig,
        doc="Configuration for reading the ingested data",
    )
    file_reader = pexConfig.ConfigurableField(
        target=ReadTextCatalogTask,
        doc='Task to use to read the files.  Default is to expect text files.'
    )
    ra_name = pexConfig.Field(
        dtype=str,
        doc="Name of RA column",
    )
    dec_name = pexConfig.Field(
        dtype=str,
        doc="Name of Dec column",
    )
    mag_column_list = pexConfig.ListField(
        dtype=str,
        doc="The values in the reference catalog are assumed to be in AB magnitudes. "
            "List of column names to use for photometric information.  At least one entry is required."
    )
    mag_err_column_map = pexConfig.DictField(
        keytype=str,
        itemtype=str,
        default={},
        doc="A map of magnitude column name (key) to magnitude error column (value)."
    )
    is_photometric_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if satisfactory for photometric calibration (optional).'
    )
    is_resolved_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is resolved (optional).'
    )
    is_variable_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column stating if the object is measured to be variable (optional).'
    )
    id_name = pexConfig.Field(
        dtype=str,
        optional=True,
        doc='Name of column to use as an identifier (optional).'
    )
    extra_col_names = pexConfig.ListField(
        dtype=str,
        default=[],
        doc='Extra columns to add to the reference catalog.'
    )

    def validate(self):
        pexConfig.Config.validate(self)
        if not (self.ra_name and self.dec_name and self.mag_column_list):
            raise ValueError("ra_name and dec_name and at least one entry in mag_column_list must be"
                             " supplied.")
        if len(self.mag_err_column_map) > 0 and not len(self.mag_column_list) == len(self.mag_err_column_map):
            raise ValueError("If magnitude errors are provided, all magnitudes must have an error column")


class IngestIndexedReferenceTask(pipeBase.CmdLineTask):
    """!Class for both producing indexed reference catalogs and for loading them.

    This implements an indexing scheme based on the hierarchical triangular mesh (HTM).
    The term "index" really means breaking the catalog into localized chunks called
    shards.  In this case each shard contains the entries from the catalog in a single
    HTM trixel.
    """
    canMultiprocess = False
    ConfigClass = IngestIndexedReferenceConfig
    RunnerClass = IngestReferenceRunner
    _DefaultName = 'IngestIndexedReferenceTask'

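    # Each entry in _flags corresponds to an optional config field named
    # 'is_<flag>_name' that names the input column carrying that flag
    # (see _set_flags and make_schema).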
    _flags = ['photometric', 'resolved', 'variable']

    @classmethod
    def _makeArgumentParser(cls):
        """Create an argument parser.

        This overrides the original because we need the file arguments.
        """
        parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
        parser.add_argument("files", nargs="+", help="Names of files to index")
        return parser

    def __init__(self, *args, **kwargs):
        """!Constructor for the HTM indexing engine

        @param[in] butler  dafPersistence.Butler object for reading and writing catalogs
        """
        self.butler = kwargs.pop('butler')
        pipeBase.Task.__init__(self, *args, **kwargs)
        self.indexer = IndexerRegistry[self.config.dataset_config.indexer.name](
            self.config.dataset_config.indexer.active)
        self.makeSubtask('file_reader')

    def create_indexed_catalog(self, files):
        """!Index a set of files comprising a reference catalog.

        Outputs are persisted in the data repository.

        @param[in] files  A list of file names to read.
        """
        rec_num = 0
        first = True
        for filename in files:
            arr = self.file_reader.run(filename)
            index_list = self.indexer.index_points(arr[self.config.ra_name], arr[self.config.dec_name])
            if first:
                schema, key_map = self.make_schema(arr.dtype)
                # persist empty catalog to hold the master schema
                dataId = self.indexer.make_data_id('master_schema',
                                                   self.config.dataset_config.ref_dataset_name)
                self.butler.put(self.get_catalog(dataId, schema), 'ref_cat',
                                dataId=dataId)
                first = False
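            # Group the input rows by shard (pixel id); get_catalog returns any
            # previously persisted shard catalog, so rows from successive input
            # files accumulate into the same set of shards.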
            pixel_ids = set(index_list)
            for pixel_id in pixel_ids:
                dataId = self.indexer.make_data_id(pixel_id, self.config.dataset_config.ref_dataset_name)
                catalog = self.get_catalog(dataId, schema)
                els = np.where(index_list == pixel_id)
                for row in arr[els]:
                    record = catalog.addNew()
                    rec_num = self._fill_record(record, row, rec_num, key_map)
                self.butler.put(catalog, 'ref_cat', dataId=dataId)
        dataId = self.indexer.make_data_id(None, self.config.dataset_config.ref_dataset_name)
        self.butler.put(self.config.dataset_config, 'ref_cat_config', dataId=dataId)

    @staticmethod
    def compute_coord(row, ra_name, dec_name):
        """!Create an afwCoord object from a np.array row

        @param[in] row  dict-like object with RA/Dec info in degrees
        @param[in] ra_name  name of RA key
        @param[in] dec_name  name of Dec key
        @return IcrsCoord object constructed from the RA/Dec values
        """
        return afwCoord.IcrsCoord(row[ra_name]*afwGeom.degrees,
                                  row[dec_name]*afwGeom.degrees)

    def _set_flags(self, record, row, key_map):
        """!Set the flags for a record.  Relies on the _flags class attribute.

        @param[in,out] record  SourceCatalog record to modify
        @param[in] row  dict-like object containing flag info
        @param[in] key_map  Map of catalog keys to use in filling the record
        """
        names = record.schema.getNames()
        for flag in self._flags:
            if flag in names:
                attr_name = 'is_{}_name'.format(flag)
                record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))

    def _set_mags(self, record, row, key_map):
        """!Set the flux records from the input magnitudes

        @param[in,out] record  SourceCatalog record to modify
        @param[in] row  dict-like object containing magnitude values
        @param[in] key_map  Map of catalog keys to use in filling the record
        """
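        # fluxFromABMag and fluxErrFromABMagErr convert the AB magnitudes (and
        # their errors) to fluxes in Janskys before the values are stored.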
        for item in self.config.mag_column_list:
            record.set(key_map[item+'_flux'], fluxFromABMag(row[item]))
        if len(self.config.mag_err_column_map) > 0:
            for err_key in self.config.mag_err_column_map.keys():
                error_col_name = self.config.mag_err_column_map[err_key]
                record.set(key_map[err_key+'_fluxSigma'],
                           fluxErrFromABMagErr(row[error_col_name], row[err_key]))

    def _set_extra(self, record, row, key_map):
        """!Copy the extra column information to the record

        @param[in,out] record  SourceCatalog record to modify
        @param[in] row  dict-like object containing the column values
        @param[in] key_map  Map of catalog keys to use in filling the record
        """
        for extra_col in self.config.extra_col_names:
            value = row[extra_col]
            # If data read from a text file contains string-like entries,
            # numpy stores them as its own internal type, numpy.str_.
            # This seems to be a consequence of how numpy stores string-like
            # objects in fixed-width column arrays.  Check whether any values
            # to be added to the catalog are numpy string types and, if so,
            # cast them to plain Python strings, which is what the C++-backed
            # records expect.
            if isinstance(value, np.str_):
                value = str(value)
            record.set(key_map[extra_col], value)

    def _fill_record(self, record, row, rec_num, key_map):
        """!Fill a record to put in the persisted indexed catalogs

        @param[in,out] record  afwTable.SourceRecord in a reference catalog to fill.
        @param[in] row  A row from a numpy array constructed from the input catalogs.
        @param[in] rec_num  Starting integer to increment for the unique id
        @param[in] key_map  Map of catalog keys to use in filling the record
        @return The updated rec_num, to pass to the next call.
        """
        record.setCoord(self.compute_coord(row, self.config.ra_name, self.config.dec_name))
        if self.config.id_name:
            record.setId(row[self.config.id_name])
        else:
            rec_num += 1
            record.setId(rec_num)
        # No parents
        record.setParent(-1)

        self._set_flags(record, row, key_map)
        self._set_mags(record, row, key_map)
        self._set_extra(record, row, key_map)
        return rec_num

    def get_catalog(self, dataId, schema):
        """!Get a catalog from the butler or create it if it doesn't exist

        @param[in] dataId  Identifier for the catalog to retrieve
        @param[in] schema  Schema to use in catalog creation if the butler can't get it
        @return afwTable.SourceCatalog for the specified identifier
        """
        if self.butler.datasetExists('ref_cat', dataId=dataId):
            return self.butler.get('ref_cat', dataId=dataId)
        return afwTable.SourceCatalog(schema)

    def make_schema(self, dtype):
        """!Make the schema to use in constructing the persisted catalogs.

        @param[in] dtype  A np.dtype to use in constructing the schema
        @return a tuple (schema, key_map): the schema for the output source
                catalog and a map of catalog keys to use in filling the record
        """
        key_map = {}
        mag_column_list = self.config.mag_column_list
        mag_err_column_map = self.config.mag_err_column_map
        if len(mag_err_column_map) > 0 and (
                not len(mag_column_list) == len(mag_err_column_map) or
                not sorted(mag_column_list) == sorted(mag_err_column_map.keys())):
            raise ValueError("Every magnitude column must have a corresponding error column")
        # makes a schema with a coord, id and parent_id
        schema = afwTable.SourceTable.makeMinimalSchema()

        def add_field(name):
            if dtype[name].kind == 'U':
                # dealing with a string-like thing.  Need to get the type and size.
                at_type = afwTable.aliases[str]
                at_size = dtype[name].itemsize
                return schema.addField(name, type=at_type, size=at_size)
            else:
                at_type = afwTable.aliases[dtype[name].type]
                return schema.addField(name, at_type)

        for item in mag_column_list:
            key_map[item+'_flux'] = schema.addField(item+'_flux', float)
        if len(mag_err_column_map) > 0:
            for err_item in mag_err_column_map.keys():
                key_map[err_item+'_fluxSigma'] = schema.addField(err_item+'_fluxSigma', float)
        for flag in self._flags:
            attr_name = 'is_{}_name'.format(flag)
            if getattr(self.config, attr_name):
                key_map[flag] = schema.addField(flag, 'Flag')
        for col in self.config.extra_col_names:
            key_map[col] = add_field(col)
        return schema, key_map
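
Example usage: IngestIndexedReferenceTask is a CmdLineTask, so ingestion is normally driven from the command line. The sketch below is illustrative only; the driver script name, repository path, file names, and column names are assumptions that must be adapted to the input catalog.

    # Hypothetical invocation (script name and paths are placeholders):
    #   ingestReferenceCatalog.py /path/to/repo refcat_*.txt --configfile ingest_config.py
    #
    # where ingest_config.py might set the required column mappings:
    #   config.ra_name = "RA"
    #   config.dec_name = "DEC"
    #   config.mag_column_list = ["g", "r"]
    #   config.mag_err_column_map = {"g": "g_err", "r": "r_err"}
    #   config.id_name = "obj_id"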