lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask Class Reference

Merge measurements from multiple bands. More...

Inheritance diagram for lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask:

Public Member Functions

def getInputSchema (self, butler=None, schema=None)
 
def getInitOutputDatasets (self)
 
def adaptArgsAndRun (self, inputData, inputDataIds, outputDataIds, butler)
 
def __init__ (self, butler=None, schema=None, initInputs=None, **kwargs)
 Initialize the task. More...
 
def runDataRef (self, patchRefList)
 Merge coadd sources from multiple bands. More...
 
def run (self, catalogs)
 Merge measurement catalogs to create a single reference catalog for forced photometry. More...
 
def write (self, patchRef, catalog)
 Write the output. More...
 
def writeMetadata (self, dataRefList)
 No metadata to write, and not sure how to write it for a list of dataRefs. More...
 

Public Attributes

 schemaMapper
 
 instFluxKey
 
 instFluxErrKey
 
 fluxFlagKey
 
 flagKeys
 
 schema
 
 pseudoFilterKeys
 
 badFlags
 

Static Public Attributes

 ConfigClass = MergeMeasurementsConfig
 
 RunnerClass = MergeSourcesRunner
 
string inputDataset = "meas"
 
string outputDataset = "ref"
 
 getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
 

Detailed Description

Merge measurements from multiple bands.


Description

Command-line task that merges measurements from multiple bands.

Combines consistent (i.e. with the same peaks and footprints) catalogs of sources from multiple filter bands to construct a unified catalog that is suitable for driving forced photometry. Every source is required to have centroid, shape and flux measurements in each band.

Inputs:
    deepCoadd_meas{tract,patch,filter}: SourceCatalog
Outputs:
    deepCoadd_ref{tract,patch}: SourceCatalog
Data Unit:
    tract, patch

MergeMeasurementsTask subclasses CmdLineTask.
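The dataset names above are ordinary Butler dataset types. A minimal sketch, assuming a Gen2 Butler repository laid out like the ci_hsc example further below (the repository path and data ID values are illustrative), of reading one per-band input catalog and the merged reference catalog:

    # Minimal sketch: read the input/output datasets listed above from a Gen2 butler.
    # The repository path and data ID values are assumptions for illustration.
    import lsst.daf.persistence as dafPersist

    butler = dafPersist.Butler("DATA")

    # Per-band measurement catalog (input to the merge), keyed by tract/patch/filter.
    meas = butler.get("deepCoadd_meas", tract=0, patch="5,4", filter="HSC-I")

    # Merged reference catalog (output of this task), keyed by tract/patch only.
    ref = butler.get("deepCoadd_ref", tract=0, patch="5,4")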

Task initialization

Initialize the task.

Parameters
    [in] schema   the schema of the detection catalogs used as input to this one
    [in] butler   a butler used to read the input schema from disk, if schema is None

The task will set its own self.schema attribute to the schema of the output merged catalog.
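A minimal sketch of constructing the task directly in Python, reusing the butler from the sketch above so that the input schema is read from disk; the priorityList values are illustrative:

    # Minimal sketch: construct the task, letting it read the input ("meas") schema
    # via the butler from the earlier sketch.  The priorityList values are illustrative.
    from lsst.pipe.tasks.mergeMeasurements import MergeMeasurementsTask

    config = MergeMeasurementsTask.ConfigClass()
    config.priorityList = ["HSC-I", "HSC-R"]   # bands in order of preference

    task = MergeMeasurementsTask(butler=butler, config=config)
    print(task.schema)   # schema of the output merged catalog, set in __init__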

Invoking the Task

Merge measurement catalogs to create a single reference catalog for forced photometry.

Parameters
    [in] catalogs   the catalogs to be merged

For parent sources, we choose the first band in config.priorityList for which the merge_footprint flag for that band is True.

For child sources, the logic is the same, except that we use the merge_peak flags.
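This selection test can be paraphrased in a few lines; the sketch below is illustrative only and simply restates the flag logic used in run() (record and flagKeys stand in for the per-band record and its peak/footprint keys):

    # Illustrative restatement of the per-band eligibility test in run():
    # a parent is eligible if that band's merge_footprint flag is set,
    # a child is eligible if that band's merge_peak flag is set.
    def isEligible(record, flagKeys):
        parent = record.getParent() == 0 and record.get(flagKeys.footprint)
        child = record.getParent() != 0 and record.get(flagKeys.peak)
        return parent or child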

Configuration parameters

See MergeMeasurementsConfig.
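The source listings below reference config fields such as priorityList, pseudoFilterList, flags, snName, minSN, minSNDiff, and coaddName. A hedged sketch of a config override file for mergeCoaddMeasurements.py (passed via the command-line task's --configfile/-C option); the values are illustrative, not defaults:

    # Hedged sketch of a config override file; field names are taken from the
    # attributes used in the source listings below, values are illustrative.
    config.priorityList = ["HSC-I", "HSC-R", "HSC-G"]   # bands in merge priority order
    config.snName = "base_PsfFlux"     # measurement used for the S/N comparison
    config.minSN = 10.0                # priority-band S/N below which another band may be chosen
    config.minSNDiff = 3.0             # required S/N margin before switching away from the priority band
    config.pseudoFilterList = ["sky"]  # pseudo-filters whose peaks always keep priority-list order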

Debug variables

The command line task interface supports a flag -d to import debug.py from your PYTHONPATH; see Using lsstDebug to control debugging output for more about debug.py files.

MergeMeasurementsTask has no debug variables.

A complete example of using MergeMeasurementsTask

MergeMeasurementsTask is meant to be run after deblending & measuring sources in every band. The purpose of the task is to generate a catalog of sources suitable for driving forced photometry in coadds and individual exposures. Command-line usage of MergeMeasurementsTask expects a data reference to the coadds to be processed. A list of the available optional arguments can be obtained by calling mergeCoaddMeasurements.py with the --help command line argument:

mergeCoaddMeasurements.py --help

To demonstrate usage of MergeMeasurementsTask in the larger context of multi-band processing, we will process HSC data in the ci_hsc package. Assuming one has finished step 7 of pipeTasks_multiBand, one may merge the catalogs generated after deblending and measuring as follows:

mergeCoaddMeasurements.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I^HSC-R

This will merge the HSC-I & HSC-R band catalogs. The results are written in $CI_HSC_DIR/DATA/deepCoadd-results/.
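The same merge can be driven from Python by handing run() a dict of per-band catalogs, which is what runDataRef() (listed below) does internally. A minimal sketch, reusing the butler and task objects from the earlier sketches and the data ID from the command line above:

    # Minimal sketch: call run() directly with catalogs keyed by filter name.
    catalogs = {
        band: butler.get("deepCoadd_meas", tract=0, patch="5,4", filter=band)
        for band in ("HSC-I", "HSC-R")
    }
    result = task.run(catalogs)        # returns a Struct with a mergedCatalog field
    print("merged %d sources" % len(result.mergedCatalog))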

Definition at line 126 of file mergeMeasurements.py.

Constructor & Destructor Documentation

◆ __init__()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.__init__ (   self,
  butler = None,
  schema = None,
  initInputs = None,
  **kwargs 
)

Initialize the task.

Parameters
    [in] schema   the schema of the detection catalogs used as input to this one
    [in] butler   a butler used to read the input schema from disk, if schema is None

The task will set its own self.schema attribute to the schema of the output merged catalog.

Definition at line 225 of file mergeMeasurements.py.

225  def __init__(self, butler=None, schema=None, initInputs=None, **kwargs):
226  """!
227  Initialize the task.
228 
229  @param[in] schema: the schema of the detection catalogs used as input to this one
230  @param[in] butler: a butler used to read the input schema from disk, if schema is None
231 
232  The task will set its own self.schema attribute to the schema of the output merged catalog.
233  """
234  super().__init__(**kwargs)
235 
236  if initInputs is not None:
237  inputSchema = initInputs['inputSchema'].schema
238  else:
239  inputSchema = self.getInputSchema(butler=butler, schema=schema)
240  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
241  self.schemaMapper.addMinimalSchema(inputSchema, True)
242  self.instFluxKey = inputSchema.find(self.config.snName + "_instFlux").getKey()
243  self.instFluxErrKey = inputSchema.find(self.config.snName + "_instFluxErr").getKey()
244  self.fluxFlagKey = inputSchema.find(self.config.snName + "_flag").getKey()
245 
246  self.flagKeys = {}
247  for band in self.config.priorityList:
248  short = getShortFilterName(band)
249  outputKey = self.schemaMapper.editOutputSchema().addField(
250  "merge_measurement_%s" % short,
251  type="Flag",
252  doc="Flag field set if the measurements here are from the %s filter" % band
253  )
254  peakKey = inputSchema.find("merge_peak_%s" % short).key
255  footprintKey = inputSchema.find("merge_footprint_%s" % short).key
256  self.flagKeys[band] = pipeBase.Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
257  self.schema = self.schemaMapper.getOutputSchema()
258 
259  self.pseudoFilterKeys = []
260  for filt in self.config.pseudoFilterList:
261  try:
262  self.pseudoFilterKeys.append(self.schema.find("merge_peak_%s" % filt).getKey())
263  except Exception as e:
264  self.log.warn("merge_peak is not set for pseudo-filter %s: %s" % (filt, e))
265 
266  self.badFlags = {}
267  for flag in self.config.flags:
268  try:
269  self.badFlags[flag] = self.schema.find(flag).getKey()
270  except KeyError as exc:
271  self.log.warn("Can't find flag %s in schema: %s" % (flag, exc,))
272 

Member Function Documentation

◆ adaptArgsAndRun()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.adaptArgsAndRun (   self,
  inputData,
  inputDataIds,
  outputDataIds,
  butler 
)

Definition at line 218 of file mergeMeasurements.py.

218  def adaptArgsAndRun(self, inputData, inputDataIds, outputDataIds, butler):
219  catalogDict = {dataId['abstract_filter']: cat for dataId, cat in zip(inputDataIds['catalogs'],
220  inputData['catalogs'])}
221  inputData['catalogs'] = catalogDict
222 
223  return super().adaptArgsAndRun(inputData, inputDataIds, outputDataIds, butler)
224 

◆ getInitOutputDatasets()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.getInitOutputDatasets (   self)

Definition at line 215 of file mergeMeasurements.py.

215  def getInitOutputDatasets(self):
216  return {"outputSchema": afwTable.SourceCatalog(self.schema), }
217 

◆ getInputSchema()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.getInputSchema (   self,
  butler = None,
  schema = None 
)

Definition at line 212 of file mergeMeasurements.py.

212  def getInputSchema(self, butler=None, schema=None):
213  return getInputSchema(self, butler, schema)
214 

◆ run()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.run (   self,
  catalogs 
)

Merge measurement catalogs to create a single reference catalog for forced photometry.

Parameters
    [in] catalogs   the catalogs to be merged

For parent sources, we choose the first band in config.priorityList for which the merge_footprint flag for that band is True.

For child sources, the logic is the same, except that we use the merge_peak flags.

Definition at line 282 of file mergeMeasurements.py.

282  def run(self, catalogs):
283  """!
284  Merge measurement catalogs to create a single reference catalog for forced photometry
285 
286  @param[in] catalogs: the catalogs to be merged
287 
288  For parent sources, we choose the first band in config.priorityList for which the
289  merge_footprint flag for that band is True.
290 
291  For child sources, the logic is the same, except that we use the merge_peak flags.
292  """
293  # Put catalogs, filters in priority order
294  orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
295  orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]
296 
297  mergedCatalog = afwTable.SourceCatalog(self.schema)
298  mergedCatalog.reserve(len(orderedCatalogs[0]))
299 
300  idKey = orderedCatalogs[0].table.getIdKey()
301  for catalog in orderedCatalogs[1:]:
302  if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
303  raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
304 
305  # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
306  # record for each band, in priority order.
307  for orderedRecords in zip(*orderedCatalogs):
308 
309  maxSNRecord = None
310  maxSNFlagKeys = None
311  maxSN = 0.
312  priorityRecord = None
313  priorityFlagKeys = None
314  prioritySN = 0.
315  hasPseudoFilter = False
316 
317  # Now we iterate over those record-band pairs, keeping track of the priority and the
318  # largest S/N band.
319  for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
320  parent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
321  child = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
322 
323  if not (parent or child):
324  for pseudoFilterKey in self.pseudoFilterKeys:
325  if inputRecord.get(pseudoFilterKey):
326  hasPseudoFilter = True
327  priorityRecord = inputRecord
328  priorityFlagKeys = flagKeys
329  break
330  if hasPseudoFilter:
331  break
332 
333  isBad = any(inputRecord.get(flag) for flag in self.badFlags)
334  if isBad or inputRecord.get(self.fluxFlagKey) or inputRecord.get(self.instFluxErrKey) == 0:
335  sn = 0.
336  else:
337  sn = inputRecord.get(self.instFluxKey)/inputRecord.get(self.instFluxErrKey)
338  if numpy.isnan(sn) or sn < 0.:
339  sn = 0.
340  if (parent or child) and priorityRecord is None:
341  priorityRecord = inputRecord
342  priorityFlagKeys = flagKeys
343  prioritySN = sn
344  if sn > maxSN:
345  maxSNRecord = inputRecord
346  maxSNFlagKeys = flagKeys
347  maxSN = sn
348 
349  # If the priority band has a low S/N we would like to choose the band with the highest S/N as
350  # the reference band instead. However, we only want to choose the highest S/N band if it is
351  # significantly better than the priority band. Therefore, to choose a band other than the
352  # priority, we require that the priority S/N is below the minimum threshold and that the
353  # difference between the priority and highest S/N is larger than the difference threshold.
354  #
355  # For pseudo-filter objects we always choose the first band in the priority list.
356  bestRecord = None
357  bestFlagKeys = None
358  if hasPseudoFilter:
359  bestRecord = priorityRecord
360  bestFlagKeys = priorityFlagKeys
361  elif (prioritySN < self.config.minSN and (maxSN - prioritySN) > self.config.minSNDiff and
362  maxSNRecord is not None):
363  bestRecord = maxSNRecord
364  bestFlagKeys = maxSNFlagKeys
365  elif priorityRecord is not None:
366  bestRecord = priorityRecord
367  bestFlagKeys = priorityFlagKeys
368 
369  if bestRecord is not None and bestFlagKeys is not None:
370  outputRecord = mergedCatalog.addNew()
371  outputRecord.assign(bestRecord, self.schemaMapper)
372  outputRecord.set(bestFlagKeys.output, True)
373  else: # if we didn't find any records
374  raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
375  inputRecord.getId())
376 
377  # more checking for sane inputs, since zip silently iterates over the smallest sequence
378  for inputCatalog in orderedCatalogs:
379  if len(mergedCatalog) != len(inputCatalog):
380  raise ValueError("Mismatch between catalog sizes: %s != %s" %
381  (len(mergedCatalog), len(inputCatalog)))
382 
383  return pipeBase.Struct(
384  mergedCatalog=mergedCatalog
385  )
386 

◆ runDataRef()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.runDataRef (   self,
  patchRefList 
)

Merge coadd sources from multiple bands.

Calls run.

Parameters
    [in] patchRefList   list of data references, one for each filter

Definition at line 273 of file mergeMeasurements.py.

273  def runDataRef(self, patchRefList):
274  """!
275  @brief Merge coadd sources from multiple bands. Calls @ref `run`.
276  @param[in] patchRefList list of data references for each filter
277  """
278  catalogs = dict(readCatalog(self, patchRef) for patchRef in patchRefList)
279  mergedCatalog = self.run(catalogs).mergedCatalog
280  self.write(patchRefList[0], mergedCatalog)
281 

◆ write()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.write (   self,
  patchRef,
  catalog 
)

Write the output.

Parameters
    [in] patchRef   data reference for the patch
    [in] catalog    the catalog to write

We write as the dataset provided by the 'outputDataset' class variable.
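With the usual coaddName of "deep" (as in the Inputs/Outputs above) and this class's outputDataset of "ref", the dataset written is "deepCoadd_ref"; in other words, the put in the listing below reduces to:

    # self.config.coaddName + "Coadd_" + self.outputDataset  ->  "deepCoadd_ref"
    patchRef.put(catalog, "deepCoadd_ref")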

Definition at line 387 of file mergeMeasurements.py.

387  def write(self, patchRef, catalog):
388  """!
389  @brief Write the output.
390 
391  @param[in] patchRef data reference for patch
392  @param[in] catalog catalog
393 
394  We write as the dataset provided by the 'outputDataset'
395  class variable.
396  """
397  patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
398  # since the filter isn't actually part of the data ID for the dataset we're saving,
399  # it's confusing to see it in the log message, even if the butler simply ignores it.
400  mergeDataId = patchRef.dataId.copy()
401  del mergeDataId["filter"]
402  self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
403 

◆ writeMetadata()

def lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.writeMetadata (   self,
  dataRefList 
)

No metadata to write, and not sure how to write it for a list of dataRefs.

Definition at line 404 of file mergeMeasurements.py.

404  def writeMetadata(self, dataRefList):
405  """!
406  @brief No metadata to write, and not sure how to write it for a list of dataRefs.
407  """
408  pass
409 

Member Data Documentation

◆ badFlags

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.badFlags

Definition at line 266 of file mergeMeasurements.py.

◆ ConfigClass

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.ConfigClass = MergeMeasurementsConfig
static

Definition at line 202 of file mergeMeasurements.py.

◆ flagKeys

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.flagKeys

Definition at line 246 of file mergeMeasurements.py.

◆ fluxFlagKey

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.fluxFlagKey

Definition at line 244 of file mergeMeasurements.py.

◆ getSchemaCatalogs

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
static

Definition at line 206 of file mergeMeasurements.py.

◆ inputDataset

string lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.inputDataset = "meas"
static

Definition at line 204 of file mergeMeasurements.py.

◆ instFluxErrKey

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.instFluxErrKey

Definition at line 243 of file mergeMeasurements.py.

◆ instFluxKey

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.instFluxKey

Definition at line 242 of file mergeMeasurements.py.

◆ outputDataset

string lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.outputDataset = "ref"
static

Definition at line 205 of file mergeMeasurements.py.

◆ pseudoFilterKeys

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.pseudoFilterKeys

Definition at line 259 of file mergeMeasurements.py.

◆ RunnerClass

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.RunnerClass = MergeSourcesRunner
static

Definition at line 203 of file mergeMeasurements.py.

◆ schema

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.schema

Definition at line 257 of file mergeMeasurements.py.

◆ schemaMapper

lsst.pipe.tasks.mergeMeasurements.MergeMeasurementsTask.schemaMapper

Definition at line 240 of file mergeMeasurements.py.


The documentation for this class was generated from the following file:
    mergeMeasurements.py