22 from __future__
import absolute_import, division, print_function
31 from .indexerRegistry
import IndexerRegistry
32 from .readTextCatalogTask
import ReadTextCatalogTask
# Public API of this module (fragment of a doxygen source listing; the
# leading "34" is the original file's line number fused into the text).
34 __all__ = [
"IngestIndexedReferenceConfig",
"IngestIndexedReferenceTask",
"DatasetConfig"]
# NOTE(review): garbled doxygen source-listing fragment. The integer at the
# start of each line is the original file's line number fused into the text;
# the enclosing class header (presumably a pipeBase.TaskRunner subclass —
# TODO confirm against the original file) and the keyword arguments of the
# returned Struct are missing from this extraction.
38 """!Task runner for the reference catalog ingester

40 Data IDs are ignored so the runner should just run the task on the parsed command.
# run(): builds the task with the butler from the parsed command, persists the
# task config via writeConfig, then indexes the listed files.
43 def run(self, parsedCmd):
45 Several arguments need to be collected to send on to the task methods.
47 @param[in] parsedCmd Parsed command including command line arguments.
48 @param[out] Struct containing the result of the indexing.
50 files = parsedCmd.files
51 butler = parsedCmd.butler
52 task = self.TaskClass(config=self.config, log=self.log, butler=butler)
53 task.writeConfig(parsedCmd.butler, clobber=self.clobberConfig, doBackup=self.doBackup)
55 result = task.create_indexed_catalog(files)
# Results are only returned when the runner was configured to do so
# (doReturnResults); the Struct's field list is lost in this extraction.
56 if self.doReturnResults:
57 return pipeBase.Struct(
# NOTE(review): interior of a pexConfig.Config subclass (class header missing
# from this extraction). Each Field declaration has lost its dtype=/default=/
# optional= keyword lines except where shown; only the doc strings and field
# names survive. Do not treat this fragment as a complete field list.
# Dataset name handed to the butler when persisting/retrieving the catalog.
62 ref_dataset_name = pexConfig.Field(
64 default=
'cal_ref_cat',
65 doc=
'String to pass to the butler to retrieve persisted files.',
# Registry-backed choice of spatial indexer.
# NOTE(review): "algoritm" is a typo for "algorithm" in the runtime doc
# string below — left byte-identical here since it is program text.
67 indexer = IndexerRegistry.makeField(
69 doc=
'Name of indexer algoritm to use. Default is HTM',
73 dataset_config = pexConfig.ConfigField(
75 doc=
"Configuration for reading the ingested data",
# Subtask used to parse the input files; defaults to a text reader.
77 file_reader = pexConfig.ConfigurableField(
78 target=ReadTextCatalogTask,
79 doc=
'Task to use to read the files. Default is to expect text files.'
# Column-name mappings from the input catalog into the output schema.
81 ra_name = pexConfig.Field(
83 doc=
"Name of RA column",
85 dec_name = pexConfig.Field(
87 doc=
"Name of Dec column",
89 mag_column_list = pexConfig.ListField(
91 doc=
"The values in the reference catalog are assumed to be in AB magnitudes. "
92 "List of column names to use for photometric information. At least one entry is required."
94 mag_err_column_map = pexConfig.DictField(
98 doc=
"A map of magnitude column name (key) to magnitude error column (value)."
# Optional flag/id columns; empty values mean "not present in the input".
100 is_photometric_name = pexConfig.Field(
103 doc=
'Name of column stating if satisfactory for photometric calibration (optional).'
105 is_resolved_name = pexConfig.Field(
108 doc=
'Name of column stating if the object is resolved (optional).'
110 is_variable_name = pexConfig.Field(
113 doc=
'Name of column stating if the object is measured to be variable (optional).'
115 id_name = pexConfig.Field(
118 doc=
'Name of column to use as an identifier (optional).'
120 extra_col_names = pexConfig.ListField(
123 doc=
'Extra columns to add to the reference catalog.'
# validate(): the `def` line and the `if` conditions guarding these raises
# are missing from this extraction — presumably it checks that ra_name,
# dec_name and mag_column_list are set, and that mag_err_column_map (when
# non-empty) covers every magnitude column. TODO confirm against original.
127 pexConfig.Config.validate(self)
129 raise ValueError(
"ra_name and dec_name and at least one entry in mag_column_list must be" +
132 raise ValueError(
"If magnitude errors are provided, all magnitudes must have an error column")
# NOTE(review): class docstring and class-level attributes of
# IngestIndexedReferenceTask; the `class` statement itself is missing from
# this extraction (per RunnerClass/ConfigClass it is a pipeBase.Task —
# TODO confirm).
136 """!Class for both producing indexed reference catalogs and for loading them.

138 This implements an indexing scheme based on hierarchical triangular mesh (HTM).
139 The term index really means breaking the catalog into localized chunks called
140 shards. In this case each shard contains the entries from the catalog in a single
# Ingest writes through one butler; parallel workers would collide.
143 canMultiprocess =
False
144 ConfigClass = IngestIndexedReferenceConfig
145 RunnerClass = IngestReferenceRunner
146 _DefaultName =
'IngestIndexedReferenceTask'
# Flag names handled generically by _set_flags via the per-flag
# 'is_<flag>_name' config fields.
148 _flags = [
'photometric',
'resolved',
'variable']
# NOTE(review): body of the argument-parser factory classmethod; its `def`
# line (and @classmethod decorator) are missing from this extraction.
# Builds an input-only parser and adds a positional, one-or-more "files"
# argument naming the catalogs to ingest.
152 """Create an argument parser

154 This overrides the original because we need the file arguments
156 parser = pipeBase.InputOnlyArgumentParser(name=cls._DefaultName)
157 parser.add_argument(
"files", nargs=
"+", help=
"Names of files to index")
# NOTE(review): __init__ body; the `def __init__(...)` line is missing from
# this extraction. Instantiates the configured indexer from IndexerRegistry
# (keyed by the registry choice name, configured with its active config) and
# creates the file_reader subtask.
161 """!Constructor for the HTM indexing engine

163 @param[in] butler dafPersistence.Butler object for reading and writing catalogs
166 pipeBase.Task.__init__(self, *args, **kwargs)
167 self.
indexer = IndexerRegistry[self.config.dataset_config.indexer.name](self.config.dataset_config.indexer.active)
168 self.makeSubtask(
'file_reader')
# NOTE(review): heavily gutted fragment — the `def` line, schema/key_map
# construction, the per-row loop header, and rec_num initialization are all
# missing from this extraction. Visible flow: read each file, spatially index
# its rows, persist a master-schema shard plus one 'ref_cat' shard per pixel
# id, then persist the dataset config under 'ref_cat_config'.
171 """!Index a set of files comprising a reference catalog. Outputs are persisted in the

174 @param[in] files A list of file names to read.
178 for filename
in files:
179 arr = self.file_reader.run(filename)
# index_points maps each (RA, Dec) to a pixel/shard id.
180 index_list = self.indexer.index_points(arr[self.config.ra_name], arr[self.config.dec_name])
# Persist an empty catalog carrying the master schema.
184 dataId = self.indexer.make_data_id(
'master_schema', self.config.dataset_config.ref_dataset_name)
185 self.butler.put(self.
get_catalog(dataId, schema),
'ref_cat',
# One output shard per distinct pixel id.
188 pixel_ids = set(index_list)
189 for pixel_id
in pixel_ids:
190 dataId = self.indexer.make_data_id(pixel_id, self.config.dataset_config.ref_dataset_name)
# Select the rows of this file that fall in the current shard.
192 els = np.where(index_list == pixel_id)
194 record = catalog.addNew()
# _fill_record returns the next unique id to use — presumably rec_num
# threads through all shards/files (TODO confirm; loop header missing).
195 rec_num = self.
_fill_record(record, row, rec_num, key_map)
196 self.butler.put(catalog,
'ref_cat', dataId=dataId)
# Finally persist the dataset config itself (dataId with pixel id None).
197 dataId = self.indexer.make_data_id(
None, self.config.dataset_config.ref_dataset_name)
198 self.butler.put(self.config.dataset_config,
'ref_cat_config', dataId=dataId)
# NOTE(review): the `def compute_coord(...)` line and the first half of the
# return expression (the RA term) are missing from this extraction; only the
# Dec term of the IcrsCoord construction survives.
202 """!Create an afwCoord object from a np.array row
203 @param[in] row dict like object with ra/dec info in degrees
204 @param[in] ra_name name of RA key
205 @param[in] dec_name name of Dec key
206 @param[out] IcrsCoord object constructed from the RA/Dec values
209 row[dec_name]*afwGeom.degrees)
# NOTE(review): the `def` line and the loop header over self._flags (plus the
# guard that presumably checks `flag in names` — TODO confirm) are missing
# from this extraction. For each configured flag, reads the input column
# named by the 'is_<flag>_name' config field and sets the record's flag key.
212 """!Set the flags for a record. Relies on the _flags class attribute
213 @param[in,out] record SourceCatalog record to modify
214 @param[in] row dict like object containing flag info
215 @param[in] key_map Map of catalog keys to use in filling the record
217 names = record.schema.getNames()
220 attr_name =
'is_{}_name'.
format(flag)
221 record.set(key_map[flag], bool(row[getattr(self.config, attr_name)]))
# NOTE(review): the `def` line, the flux-setting statement inside the first
# loop, and the value expression of the error record.set() call are missing
# from this extraction — presumably magnitudes/errors are converted to
# fluxes via fluxFromABMag/fluxErrFromABMagErr (see the trailing doxygen
# index in this listing); TODO confirm against the original file.
224 """!Set the flux records from the input magnitudes
225 @param[in,out] record SourceCatalog record to modify
226 @param[in] row dict like object containing magnitude values
227 @param[in] key_map Map of catalog keys to use in filling the record
229 for item
in self.config.mag_column_list:
# Error columns are only populated when a magnitude->error map was given.
231 if len(self.config.mag_err_column_map) > 0:
232 for err_key
in self.config.mag_err_column_map.keys():
233 error_col_name = self.config.mag_err_column_map[err_key]
234 record.set(key_map[err_key+
'_fluxSigma'],
# NOTE(review): the `def` line and the body of the np.str_ branch are missing
# from this extraction — presumably numpy strings are converted to plain str
# before record.set (TODO confirm). Copies each configured extra column's
# value from the input row into the record via key_map.
238 """!Copy the extra column information to the record
239 @param[in,out] record SourceCatalog record to modify
240 @param[in] row dict like object containing the column values
241 @param[in] key_map Map of catalog keys to use in filling the record
243 for extra_col
in self.config.extra_col_names:
244 value = row[extra_col]
252 if isinstance(value, np.str_):
254 record.set(key_map[extra_col], value)
# NOTE(review): the `def` line, the `else`/rec_num-increment branch, and the
# trailing calls to _set_flags/_set_mags/_set_extra plus the return of the
# updated rec_num are missing from this extraction (the doxygen index in
# this listing names those helpers; TODO confirm the exact body).
# Sets the record's ICRS coordinate, then its id: either the input catalog's
# id column when configured, or the running counter rec_num.
257 """!Fill a record to put in the persisted indexed catalogs

259 @param[in,out] record afwTable.SourceRecord in a reference catalog to fill.
260 @param[in] row A row from a numpy array constructed from the input catalogs.
261 @param[in] rec_num Starting integer to increment for the unique id
262 @param[in] key_map Map of catalog keys to use in filling the record
264 record.setCoord(self.
compute_coord(row, self.config.ra_name, self.config.dec_name))
265 if self.config.id_name:
266 record.setId(row[self.config.id_name])
269 record.setId(rec_num)
# NOTE(review): the `def` line and the fallback branch (creating an empty
# afwTable.SourceCatalog from `schema` when the dataset does not exist) are
# missing from this extraction — TODO confirm against the original file.
# Visible behavior: return the persisted 'ref_cat' dataset when it exists.
279 """!Get a catalog from the butler or create it if it doesn't exist

281 @param[in] dataId Identifier for catalog to retrieve
282 @param[in] schema Schema to use in catalog creation if the butler can't get it
283 @param[out] afwTable.SourceCatalog for the specified identifier
285 if self.butler.datasetExists(
'ref_cat', dataId=dataId):
286 return self.butler.get(
'ref_cat', dataId=dataId)
# NOTE(review): the `def make_schema(self, dtype)` line, the key_map
# initialization, the nested `def add_field(name)` header (the add_field body
# is visible below), and the loop header over self._flags are all missing
# from this extraction.
290 """!Make the schema to use in constructing the persisted catalogs.

292 @param[in] dtype A np.dtype to use in constructing the schema
293 @param[out] The schema for the output source catalog.
294 @param[out] A map of catalog keys to use in filling the record
297 mag_column_list = self.config.mag_column_list
298 mag_err_column_map = self.config.mag_err_column_map
# Every magnitude column must have exactly one matching error column:
# same count and (after sorting) the same names.
299 if len(mag_err_column_map) > 0
and (
300 not len(mag_column_list) == len(mag_err_column_map)
or
301 not sorted(mag_column_list) == sorted(mag_err_column_map.keys())):
302 raise ValueError(
"Every magnitude column must have a corresponding error column")
304 schema = afwTable.SourceTable.makeMinimalSchema()
# --- body of the nested add_field helper (its `def` line is missing):
# numpy 'U' (unicode) columns need an explicit string size; other dtypes
# map through afwTable.aliases directly.
307 if dtype[name].kind ==
'U':
309 at_type = afwTable.aliases[str]
310 at_size = dtype[name].itemsize
311 return schema.addField(name, type=at_type, size=at_size)
313 at_type = afwTable.aliases[dtype[name].type]
314 return schema.addField(name, at_type)
# Add one float '<mag>_flux' field per magnitude column, plus
# '<mag>_fluxSigma' fields when an error map was configured.
316 for item
in mag_column_list:
317 key_map[item+
'_flux'] = schema.addField(item+
'_flux', float)
318 if len(mag_err_column_map) > 0:
319 for err_item
in mag_err_column_map.keys():
320 key_map[err_item+
'_fluxSigma'] = schema.addField(err_item+
'_fluxSigma', float)
# Per-flag Flag fields, only for flags whose 'is_<flag>_name' config is set
# (the loop over self._flags is missing from this extraction).
322 attr_name =
'is_{}_name'.
format(flag)
323 if getattr(self.config, attr_name):
324 key_map[flag] = schema.addField(flag,
'Flag')
# Extra passthrough columns use the dtype-driven add_field helper.
325 for col
in self.config.extra_col_names:
326 key_map[col] = add_field(col)
327 return schema, key_map
def _fill_record
Fill a record to put in the persisted indexed catalogs.
def _set_flags
Set the flags for a record.
double fluxErrFromABMagErr(double magErr, double mag)
Compute flux error in Janskys from AB magnitude error and AB magnitude.
double fluxFromABMag(double mag)
Compute flux in Janskys from AB magnitude.
def _set_extra
Copy the extra column information to the record.
Custom catalog class for record/table subclasses that are guaranteed to have an ID, and should generally be sorted by that ID.
def _set_mags
Set the flux records from the input magnitudes.
def make_schema
Make the schema to use in constructing the persisted catalogs.
def create_indexed_catalog
Index a set of files comprising a reference catalog.
def get_catalog
Get a catalog from the butler or create it if it doesn't exist.
def compute_coord
Create an afwCoord object from a np.array row.
Class for both producing indexed reference catalogs and for loading them.
Task runner for the reference catalog ingester.
def __init__
Constructor for the HTM indexing engine.
A class to handle Icrs coordinates (inherits from Coord)