multiBand.py
#!/usr/bin/env python
#
# LSST Data Management System
# Copyright 2008-2015 AURA/LSST.
#
# This product includes software developed by the
# LSST Project (http://www.lsst.org/).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the LSST License Statement and
# the GNU General Public License along with this program.  If not,
# see <https://www.lsstcorp.org/LegalNotices/>.
#
import numpy

from lsst.coadd.utils.coaddDataIdContainer import ExistingCoaddDataIdContainer
from lsst.pipe.base import CmdLineTask, Struct, TaskRunner, ArgumentParser, ButlerInitializedTaskRunner
from lsst.pex.config import Config, Field, ListField, ConfigurableField, RangeField, ConfigField
from lsst.meas.algorithms import SourceDetectionTask
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.meas.deblender import SourceDeblendTask
from lsst.pipe.tasks.coaddBase import getSkyInfo
from lsst.meas.astrom import ANetAstrometryTask
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable
import lsst.afw.math as afwMath
import lsst.afw.geom as afwGeom
import lsst.afw.detection as afwDetect
from lsst.daf.base import PropertyList

40 """
41 New dataset types:
42 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
43 * deepCoadd_mergeDet: merged detections (tract, patch)
44 * deepCoadd_meas: measurements of merged detections (tract, patch, filter)
45 * deepCoadd_ref: reference sources (tract, patch)
46 All of these have associated *_schema catalogs that require no data ID and hold no records.
47 
48 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
49 the mergeDet, meas, and ref dataset Footprints:
50 * deepCoadd_peak_schema
51 """


def _makeGetSchemaCatalogs(datasetSuffix):
    """Construct a getSchemaCatalogs instance method

    These are identical for most of the classes here, so we'll consolidate
    the code.

    datasetSuffix: Suffix of dataset name, e.g., "src" for "deepCoadd_src"
    """
    def getSchemaCatalogs(self):
        """Return a dict of empty catalogs for each catalog dataset produced by this task."""
        src = afwTable.SourceCatalog(self.schema)
        if hasattr(self, "algMetadata"):
            src.getTable().setMetadata(self.algMetadata)
        return {self.config.coaddName + "Coadd_" + datasetSuffix: src}
    return getSchemaCatalogs

def _makeMakeIdFactory(datasetName):
    """Construct a makeIdFactory instance method

    These are identical for all the classes here, so this consolidates
    the code.

    datasetName: Dataset name without the coadd name prefix, e.g., "CoaddId" for "deepCoaddId"
    """
    def makeIdFactory(self, dataRef):
        """Return an IdFactory for setting the detection identifiers

        The actual parameters used in the IdFactory are provided by
        the butler (through the provided data reference).
        """
        expBits = dataRef.get(self.config.coaddName + datasetName + "_bits")
        expId = long(dataRef.get(self.config.coaddName + datasetName))
        return afwTable.IdFactory.makeSource(expId, 64 - expBits)
    return makeIdFactory

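# A worked example of the ID packing in _makeMakeIdFactory above (the numbers
# are hypothetical): if the butler reports expBits = 16, then
# makeSource(expId, 64 - 16) reserves the low 48 bits of each 64-bit source ID
# for a per-source counter and the high 16 bits for expId, so a source's ID is
# effectively (expId << 48) | sourceNumber.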
90  """Given a longer, camera-specific filter name (e.g. "HSC-I") return its shorthand name ("i").
91  """
92  # I'm not sure if this is the way this is supposed to be implemented, but it seems to work,
93  # and its the only way I could get it to work.
94  return afwImage.Filter(name).getFilterProperty().getName()
95 

##############################################################################################################

class DetectCoaddSourcesConfig(Config):
    doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
    detection = ConfigurableField(target=SourceDetectionTask, doc="Source detection")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        Config.setDefaults(self)
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'


class DetectCoaddSourcesTask(CmdLineTask):
    """Detect sources on a coadd

    This operation is performed separately in each band. The detections from
    each band will be merged before performing the measurement stage.
    """
    _DefaultName = "detectCoaddSources"
    ConfigClass = DetectCoaddSourcesConfig
    getSchemaCatalogs = _makeGetSchemaCatalogs("det")
    makeIdFactory = _makeMakeIdFactory("CoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        return parser

    def __init__(self, schema=None, **kwargs):
        """Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        - schema: the initial schema for the output catalog, modified in place to include all
          fields set by this task.  If None, the source minimal schema will be used.
        """
        CmdLineTask.__init__(self, **kwargs)
        if schema is None:
            schema = afwTable.SourceTable.makeMinimalSchema()
        self.schema = schema
        self.makeSubtask("detection", schema=self.schema)

    def run(self, patchRef):
        """Run detection on a coadd"""
        exposure = patchRef.get(self.config.coaddName + "Coadd", immediate=True)
        if self.config.doScaleVariance:
            self.scaleVariance(exposure.getMaskedImage())
        results = self.runDetection(exposure, self.makeIdFactory(patchRef))
        self.write(exposure, results, patchRef)

    def scaleVariance(self, maskedImage):
        """Scale the variance in a maskedImage

        Scales the variance plane to match the measured variance.
        """
        ctrl = afwMath.StatisticsControl()
        ctrl.setAndMask(~0x0)
        var = maskedImage.getVariance()
        mask = maskedImage.getMask()
        dstats = afwMath.makeStatistics(maskedImage, afwMath.VARIANCECLIP, ctrl).getValue()
        vstats = afwMath.makeStatistics(var, mask, afwMath.MEANCLIP, ctrl).getValue()
        vrat = dstats / vstats
        self.log.info("Renormalising variance by %f" % (vrat))
        var *= vrat

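    # A worked example of scaleVariance above, with made-up numbers: if the
    # clipped variance measured directly from the image pixels (dstats) is
    # 1.44 while the clipped mean of the variance plane (vstats) is 1.00, the
    # plane underestimates the true noise, so every variance pixel is scaled
    # by vrat = 1.44; this keeps detection thresholds expressed in
    # "pixel_stdev" units honest.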
    def runDetection(self, exposure, idFactory):
        """Run detection on an exposure

        exposure: Exposure on which to detect
        idFactory: IdFactory to set source identifiers

        Returns: Struct(sources: catalog of detections,
                        backgrounds: list of backgrounds
                        )
        """
        backgrounds = afwMath.BackgroundList()
        table = afwTable.SourceTable.make(self.schema, idFactory)
        detections = self.detection.makeSourceCatalog(table, exposure)
        sources = detections.sources
        fpSets = detections.fpSets
        if fpSets.background:
            backgrounds.append(fpSets.background)
        return Struct(sources=sources, backgrounds=backgrounds)

    def write(self, exposure, results, patchRef):
        """!Write out results from runDetection

        @param exposure: Exposure to write out
        @param results: Struct returned from runDetection
        @param patchRef: data reference for patch
        """
        coaddName = self.config.coaddName + "Coadd"
        patchRef.put(results.backgrounds, coaddName + "_calexp_detBackground")
        patchRef.put(results.sources, coaddName + "_det")
        patchRef.put(exposure, coaddName + "_calexp_det")

##############################################################################################################

class MergeSourcesRunner(TaskRunner):
    def makeTask(self, parsedCmd=None, args=None):
        """Provide a butler to the Task constructor"""
        if parsedCmd is not None:
            butler = parsedCmd.butler
        elif args is not None:
            dataRefList, kwargs = args
            butler = dataRefList[0].getButler()
        else:
            raise RuntimeError("Neither parsedCmd nor args specified")
        return self.TaskClass(config=self.config, log=self.log, butler=butler)

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        """Provide a list of patch references for each patch

        The patch references within the list will have different filters.
        """
        refList = {}  # Will index this as refList[tract][patch][filter] = ref
        for ref in parsedCmd.id.refList:
            tract = ref.dataId["tract"]
            patch = ref.dataId["patch"]
            filter = ref.dataId["filter"]
            if tract not in refList:
                refList[tract] = {}
            if patch not in refList[tract]:
                refList[tract][patch] = {}
            if filter in refList[tract][patch]:
                raise RuntimeError("Multiple versions of %s" % (ref.dataId,))
            refList[tract][patch][filter] = ref
        return [(p.values(), kwargs) for t in refList.itervalues() for p in t.itervalues()]

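# An illustration of getTargetList's output shape (data IDs hypothetical):
# given references for tract=0, patch="1,1" in filters g and r, refList
# becomes {0: {"1,1": {"g": refG, "r": refR}}}, and the method returns
# [([refG, refR], kwargs)] -- one entry per patch, holding all of that
# patch's filters, which is exactly what MergeSourcesTask.run expects.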

class MergeSourcesConfig(Config):
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def validate(self):
        Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")


class MergeSourcesTask(CmdLineTask):
    """A base class for merging source catalogs

    Merging detections (MergeDetectionsTask) and merging measurements
    (MergeMeasurementsTask) are currently so similar that it makes
    sense to re-use the code, in the form of this abstract base class.

    Sub-classes should set the following class variables:
    * _DefaultName: name of Task
    * inputDataset: name of dataset to read
    * outputDataset: name of dataset to write
    * getSchemaCatalogs to the output of _makeGetSchemaCatalogs(outputDataset)

    In addition, sub-classes must implement the mergeCatalogs method.
    """
    _DefaultName = None
    ConfigClass = MergeSourcesConfig
    RunnerClass = MergeSourcesRunner
    inputDataset = None
    outputDataset = None
    getSchemaCatalogs = None

    @classmethod
    def _makeArgumentParser(cls):
        """Create a suitable ArgumentParser

        We will use the ArgumentParser to provide a list of data
        references for patches; the RunnerClass will sort them into lists
        of data references for the same patch.
        """
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_" + cls.inputDataset,
                               ContainerClass=ExistingCoaddDataIdContainer,
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i")
        return parser


    def getInputSchema(self, butler=None, schema=None):
        if schema is None:
            assert butler is not None, "Neither butler nor schema specified"
            schema = butler.get(self.config.coaddName + "Coadd_" + self.inputDataset + "_schema",
                                immediate=True).schema
        return schema

    def __init__(self, butler=None, schema=None, **kwargs):
        """Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        - schema: the schema of the detection catalogs used as input to this one
        - butler: a butler used to read the input schema from disk, if schema is None

        Derived classes should use the getInputSchema() method to handle the additional
        arguments and retrieve the actual input schema.
        """
        CmdLineTask.__init__(self, **kwargs)

    def run(self, patchRefList):
        """Merge coadd sources from multiple bands

        patchRefList: list of patch data references, one for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        mergedCatalog = self.mergeCatalogs(catalogs, patchRefList[0])
        self.write(patchRefList[0], mergedCatalog)

    def readCatalog(self, patchRef):
        """Read input catalog

        We read the input dataset provided by the 'inputDataset'
        class variable.
        """
        filterName = patchRef.dataId["filter"]
        catalog = patchRef.get(self.config.coaddName + "Coadd_" + self.inputDataset, immediate=True)
        self.log.info("Read %d sources for filter %s: %s" % (len(catalog), filterName, patchRef.dataId))
        return filterName, catalog

    def mergeCatalogs(self, catalogs, patchRef):
        """Merge multiple catalogs

        catalogs: dict mapping filter name to source catalog

        Returns: merged catalog
        """
        raise NotImplementedError()

    def write(self, patchRef, catalog):
        """Write the output

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # since the filter isn't actually part of the data ID for the dataset we're saving,
        # it's confusing to see it in the log message, even if the butler simply ignores it.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))

    def writeMetadata(self, dataRefList):
        """No metadata to write, and not sure how to write it for a list of dataRefs"""
        pass


class CullPeaksConfig(Config):
    """Configuration for culling garbage peaks after merging Footprints.

    Peaks may also be culled after detection or during deblending; this configuration object
    only deals with culling after merging Footprints.

    These cuts are based on three quantities:
    - nBands: the number of bands in which the peak was detected
    - peakRank: the position of the peak within its family, sorted from brightest to faintest.
    - peakRankNormalized: the peak rank divided by the total number of peaks in the family.

    The formula that identifies peaks to cull is:

      nBands < nBandsSufficient
      AND (rank >= rankSufficient)
      AND (rank >= rankConsidered OR rankNormalized >= rankNormalizedConsidered)

    To disable peak culling, simply set nBandsSufficient=1.
    """

    nBandsSufficient = RangeField(dtype=int, default=2, min=1,
                                  doc="Always keep peaks detected in this many bands")
    rankSufficient = RangeField(dtype=int, default=20, min=1,
                                doc="Always keep this many peaks in each family")
    rankConsidered = RangeField(dtype=int, default=30, min=1,
                                doc=("Keep peaks with less than this rank that also match the "
                                     "rankNormalizedConsidered condition."))
    rankNormalizedConsidered = RangeField(dtype=float, default=0.7, min=0.0,
                                          doc=("Keep peaks with less than this normalized rank that"
                                               " also match the rankConsidered condition."))

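# A worked example of the cull formula above, with hypothetical numbers and
# the default config values: a peak seen in only one band (nBands = 1 < 2)
# that ranks 35th in a family of 40 fails every protection (35 >= 20,
# 35 >= 30) and is culled; the same peak ranked 25th is kept, because
# 25 < 30 and its normalized rank 25/40 = 0.625 is below 0.7.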

class MergeDetectionsConfig(MergeSourcesConfig):
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")

    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks closer than this distance "
                            "(in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")


class MergeDetectionsTask(MergeSourcesTask):
    """Merge detections from multiple bands"""
    ConfigClass = MergeDetectionsConfig
    _DefaultName = "mergeCoaddDetections"
    inputDataset = "det"
    outputDataset = "mergeDet"
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")

    def __init__(self, butler=None, schema=None, **kwargs):
        """Initialize the task.

        Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
        - schema: the schema of the detection catalogs used as input to this one
        - butler: a butler used to read the input schema from disk, if schema is None

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
        self.schema = self.getInputSchema(butler=butler, schema=schema)
        self.merged = afwDetect.FootprintMergeList(
            self.schema,
            [getShortFilterName(name) for name in self.config.priorityList]
        )

    def mergeCatalogs(self, catalogs, patchRef):
        """Merge multiple catalogs
        """

        # Convert distances to tract coordinates
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
        tractWcs = skyInfo.wcs
        peakDistance = self.config.minNewPeak / tractWcs.pixelScale().asArcseconds()
        samePeakDistance = self.config.maxSamePeak / tractWcs.pixelScale().asArcseconds()

        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]

        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, self.makeIdFactory(patchRef),
                                                        samePeakDistance)

        # Sort Peaks from brightest to faintest
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        # Attempt to remove garbage peaks
        self.cullPeaks(mergedList)
        return mergedList

    def cullPeaks(self, catalog):
        """Attempt to remove garbage peaks (mostly on the outskirts of large blends)"""
        keys = [item.key for item in self.merged.getPeakSchema().extract("merge.peak.*").itervalues()]
        totalPeaks = 0
        culledPeaks = 0
        for parentSource in catalog:
            # Make a list copy so we can clear the attached PeakCatalog and append the ones we're keeping
            # to it (which is easier than deleting as we iterate).
            keptPeaks = parentSource.getFootprint().getPeaks()
            oldPeaks = list(keptPeaks)
            keptPeaks.clear()
            familySize = len(oldPeaks)
            totalPeaks += familySize
            for rank, peak in enumerate(oldPeaks):
                if ((rank < self.config.cullPeaks.rankSufficient) or
                    (self.config.cullPeaks.nBandsSufficient > 1 and
                     sum([peak.get(k) for k in keys]) >= self.config.cullPeaks.nBandsSufficient) or
                    (rank < self.config.cullPeaks.rankConsidered and
                     rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
                    keptPeaks.append(peak)
                else:
                    culledPeaks += 1
        self.log.info("Culled %d of %d peaks" % (culledPeaks, totalPeaks))

    def getSchemaCatalogs(self):
        """Return a dict of empty catalogs for each catalog dataset produced by this task."""
        mergeDet = afwTable.SourceCatalog(self.schema)
        peak = afwDetect.PeakCatalog(self.merged.getPeakSchema())
        return {self.config.coaddName + "Coadd_mergeDet": mergeDet,
                self.config.coaddName + "Coadd_peak": peak}

##############################################################################################################

class MeasureMergedCoaddSourcesConfig(Config):
    doDeblend = Field(dtype=bool, default=True, doc="Deblend sources?")
    deblend = ConfigurableField(target=SourceDeblendTask, doc="Deblend sources")
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    doMatchSources = Field(dtype=bool, default=False, doc="Match sources to reference catalog?")
    astrometry = ConfigurableField(target=ANetAstrometryTask, doc="Astrometric matching")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        Config.setDefaults(self)
        self.deblend.propagateAllPeaks = True

class MeasureMergedCoaddSourcesTask(CmdLineTask):
    """Measure sources using the merged catalog of detections

    This operation is performed separately on each band.  We deblend and measure on
    the list of merged detections.  The results from each band will subsequently
    be merged to create a final reference catalog for forced measurement.
    """
    _DefaultName = "measureCoaddSources"
    ConfigClass = MeasureMergedCoaddSourcesConfig
    RunnerClass = ButlerInitializedTaskRunner
    getSchemaCatalogs = _makeGetSchemaCatalogs("meas")
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")  # The IDs we already have are of this type

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        return parser

    def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
        """Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        - schema: the schema of the merged detection catalog used as input to this one
        - peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
        - butler: a butler used to read the input schemas from disk, if schema or peakSchema is None

        The task will set its own self.schema attribute to the schema of the output measurement catalog.
        This will include all fields from the input schema, as well as additional fields for all the
        measurements.
        """
        CmdLineTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        self.algMetadata = PropertyList()
        if self.config.doDeblend:
            if peakSchema is None:
                assert butler is not None, "Neither butler nor peakSchema is defined"
                peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema
            self.makeSubtask("deblend", schema=self.schema, peakSchema=peakSchema)
        self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
        self.makeSubtask("astrometry", schema=self.schema)

        self.isPatchInnerKey = self.schema.addField(
            "detect_isPatchInner", type="Flag",
            doc="true if source is in the inner region of a coadd patch",
        )
        self.isTractInnerKey = self.schema.addField(
            "detect_isTractInner", type="Flag",
            doc="true if source is in the inner region of a coadd tract",
        )
        self.isPrimaryKey = self.schema.addField(
            "detect_isPrimary", type="Flag",
            doc="true if source has no children and is in the inner region of a coadd patch "
                "and is in the inner region of a coadd tract",
        )

    def run(self, patchRef):
        """Measure and deblend"""
        exposure = patchRef.get(self.config.coaddName + "Coadd_calexp_det", immediate=True)
        sources = self.readSources(patchRef)
        if self.config.doDeblend:
            self.deblend.run(exposure, sources, exposure.getPsf())

            bigKey = sources.schema["deblend_parentTooBig"].asKey()
            numBig = sum((s.get(bigKey) for s in sources))  # catalog is non-contiguous so can't extract column
            if numBig > 0:
                self.log.warn("Patch %s contains %d large footprints that were not deblended" %
                              (patchRef.dataId, numBig))
        self.measurement.run(exposure, sources)
        self.setIsPrimaryFlag(sources, patchRef)
        if self.config.doMatchSources:
            self.writeMatches(patchRef, exposure, sources)
        self.write(patchRef, sources)

    def readSources(self, dataRef):
        """Read input sources

        We also need to add columns to hold the measurements we're about to make
        so we can measure in-place.
        """
        merged = dataRef.get(self.config.coaddName + "Coadd_mergeDet", immediate=True)
        self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
        idFactory = self.makeIdFactory(dataRef)
        for s in merged:
            idFactory.notify(s.getId())
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(merged, self.schemaMapper)
        return sources

    def setIsPrimaryFlag(self, sources, patchRef):
        """Set is-primary and related flags on sources

        sources: a SourceCatalog
        - reads centroid fields and an nChild field
        - writes is-patch-inner, is-tract-inner and is-primary flags
        patchRef: a patch reference (for retrieving sky info)

        Will raise RuntimeError if self.config.doDeblend and the nChild key is not found in the table
        """
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)

        # Test for the presence of the nChild key instead of checking config.doDeblend because sources
        # might be unpersisted with deblend info, even if deblending is not run again.
        nChildKeyName = "deblend_nChild"
        try:
            nChildKey = self.schema.find(nChildKeyName).key
        except Exception:
            nChildKey = None

        if self.config.doDeblend and nChildKey is None:
            # deblending was run but the nChildKey was not found; this suggests a variant deblender
            # was used that we cannot use the output from, or some obscure error.
            raise RuntimeError("Ran the deblender but cannot find %r in the source table" % (nChildKeyName,))

        # set inner flags for each source and set primary flags for sources with no children
        # (or all sources if deblend info not available)
        innerFloatBBox = afwGeom.Box2D(skyInfo.patchInfo.getInnerBBox())
        tractId = skyInfo.tractInfo.getId()
        for source in sources:
            if source.getCentroidFlag():
                # centroid unknown, so leave the inner and primary flags False
                continue

            centroidPos = source.getCentroid()
            # Skip sources whose centroidPos is NaN
            # I do not know why this can happen (NY)
            if centroidPos[0] != centroidPos[0] or centroidPos[1] != centroidPos[1]:
                continue
            isPatchInner = innerFloatBBox.contains(centroidPos)
            source.setFlag(self.isPatchInnerKey, isPatchInner)

            skyPos = source.getCoord()
            sourceInnerTractId = skyInfo.skyMap.findTract(skyPos).getId()
            isTractInner = sourceInnerTractId == tractId
            source.setFlag(self.isTractInnerKey, isTractInner)

            if nChildKey is None or source.get(nChildKey) == 0:
                source.setFlag(self.isPrimaryKey, isPatchInner and isTractInner)

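    # In effect, the loop in setIsPrimaryFlag above makes detect_isPrimary
    # True only for undeblended leaves (nChild == 0, when deblend info is
    # available) whose centroid lies inside the patch's inner bounding box
    # and whose sky position resolves to this tract -- so, despite tract and
    # patch overlaps, each object is primary in exactly one (tract, patch).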
    def writeMatches(self, dataRef, exposure, sources):
        """Write matches of the sources to the astrometric reference catalog

        We use the Wcs in the exposure to match sources.

        dataRef: data reference
        exposure: exposure with Wcs
        sources: source catalog
        """
        result = self.astrometry.loadAndMatch(exposure=exposure, sourceCat=sources)
        if result.matches:
            matches = afwTable.packMatches(result.matches)
            matches.table.setMetadata(result.matchMeta)
            dataRef.put(matches, self.config.coaddName + "Coadd_srcMatch")

    def write(self, dataRef, sources):
        """Write the source catalog"""
        dataRef.put(sources, self.config.coaddName + "Coadd_meas")
        self.log.info("Wrote %d sources: %s" % (len(sources), dataRef.dataId))


##############################################################################################################

652  """Measure measurements from multiple bands"""
653  _DefaultName = "mergeCoaddMeasurements"
654  inputDataset = "meas"
655  outputDataset = "ref"
656  getSchemaCatalogs = _makeGetSchemaCatalogs("ref")
657 
658  def __init__(self, butler=None, schema=None, **kwargs):
659  """Initialize the task.
660 
661  Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
662  - schema: the schema of the detection catalogs used as input to this one
663  - butler: a butler used to read the input schema from disk, if schema is None
664 
665  The task will set its own self.schema attribute to the schema of the output merged catalog.
666  """
667  MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
668  inputSchema = self.getInputSchema(butler=butler, schema=schema)
669  self.schemaMapper = afwTable.SchemaMapper(inputSchema, True)
670  self.schemaMapper.addMinimalSchema(inputSchema, True)
671  self.flagKeys = {}
672  for band in self.config.priorityList:
673  short = getShortFilterName(band)
674  outputKey = self.schemaMapper.editOutputSchema().addField(
675  "merge_measurement_%s" % short,
676  type="Flag",
677  doc="Flag field set if the measurements here are from the %s filter" % band
678  )
679  peakKey = inputSchema.find("merge_peak_%s" % short).key
680  footprintKey = inputSchema.find("merge_footprint_%s" % short).key
681  self.flagKeys[band] = Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
682  self.schema = self.schemaMapper.getOutputSchema()
683 
    def mergeCatalogs(self, catalogs, patchRef):
        """Merge measurement catalogs to create a single reference catalog for forced photometry

        For parent sources, we choose the first band in config.priorityList for which the
        merge_footprint flag for that band is True.

        For child sources, the logic is the same, except that we use the merge_peak flags.
        """
        # Put catalogs, filters in priority order
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList if band in catalogs.keys()]
        orderedKeys = [self.flagKeys[band] for band in self.config.priorityList if band in catalogs.keys()]

        mergedCatalog = afwTable.SourceCatalog(self.schema)
        mergedCatalog.reserve(len(orderedCatalogs[0]))

        idKey = orderedCatalogs[0].table.getIdKey()
        for catalog in orderedCatalogs[1:]:
            if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
                raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")

        # This first zip iterates over all the catalogs simultaneously, yielding a sequence of one
        # record for each band, in order.
        for n, orderedRecords in enumerate(zip(*orderedCatalogs)):
            # Now we iterate over those record-band pairs, until we find the one with the right flag set.
            for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
                bestParent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
                bestChild = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
                if bestParent or bestChild:
                    outputRecord = mergedCatalog.addNew()
                    outputRecord.assign(inputRecord, self.schemaMapper)
                    outputRecord.set(flagKeys.output, True)
                    break
            else:  # if we didn't break (i.e. didn't find any record with the right flag set)
                raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
                                 inputRecord.getId())

        # more checking for sane inputs, since zip silently iterates over the smallest sequence
        for inputCatalog in orderedCatalogs:
            if len(mergedCatalog) != len(inputCatalog):
                raise ValueError("Mismatch between catalog sizes: %s != %s" %
                                 (len(mergedCatalog), len(inputCatalog)))

        return mergedCatalog
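
# An illustration of the reference-band selection in mergeCatalogs above
# (bands and flag values hypothetical): with priorityList = ["i", "r", "g"],
# a parent source whose merge_footprint_i flag is False but whose
# merge_footprint_r flag is True is copied into the reference catalog from
# the r-band catalog and gets merge_measurement_r set; each of its children
# is chosen the same way, using the merge_peak_* flags instead.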