import numpy

import lsst.afw.detection as afwDetect
import lsst.afw.geom as afwGeom
import lsst.afw.image as afwImage
import lsst.afw.math as afwMath
import lsst.afw.table as afwTable
# NOTE: the module paths below are assumed from the contemporary LSST stack layout.
from lsst.coadd.utils import ExistingCoaddDataIdContainer
from lsst.daf.base import PropertyList
from lsst.meas.algorithms import SourceDetectionTask
from lsst.meas.astrom import ANetAstrometryTask
from lsst.meas.base import SingleFrameMeasurementTask
from lsst.meas.deblender import SourceDeblendTask
from lsst.pipe.base import CmdLineTask, Struct, TaskRunner, ArgumentParser, ButlerInitializedTaskRunner
from lsst.pex.config import Config, Field, ListField, ConfigurableField, RangeField, ConfigField
from .coaddBase import getSkyInfo
"""
New dataset types:

* deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
* deepCoadd_mergeDet: merged detections (tract, patch)
* deepCoadd_meas: measurements of merged detections (tract, patch, filter)
* deepCoadd_ref: reference sources (tract, patch)

All of these have associated *_schema catalogs that require no data ID and hold no records.

In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
the mergeDet, meas, and ref dataset Footprints:

* deepCoadd_peak_schema
"""
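# For illustration only (a sketch; the repository path and data ID values are
# hypothetical), these datasets are read through a data butler:
#
#     butler = lsst.daf.persistence.Butler("/path/to/repo")
#     mergeDet = butler.get("deepCoadd_mergeDet", tract=0, patch="1,1")
#     meas = butler.get("deepCoadd_meas", tract=0, patch="1,1", filter="r")
#     schema = butler.get("deepCoadd_meas_schema")  # schema datasets take no data ID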
55 """Construct a getSchemaCatalogs instance method
57 These are identical for most of the classes here, so we'll consolidate
60 datasetSuffix: Suffix of dataset name, e.g., "src" for "deepCoadd_src"
62 def getSchemaCatalogs(self):
63 """Return a dict of empty catalogs for each catalog dataset produced by this task."""
65 if hasattr(self,
"algMetadata"):
66 src.getTable().setMetadata(self.algMetadata)
67 return {self.config.coaddName +
"Coadd_" + datasetSuffix: src}
68 return getSchemaCatalogs
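# Usage sketch (the class name is hypothetical; "det" selects the deepCoadd_det dataset):
#
#     class ExampleCoaddTask(CmdLineTask):
#         getSchemaCatalogs = _makeGetSchemaCatalogs("det")  # serves <coaddName>Coadd_det_schema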
71 """Construct a makeIdFactory instance method
73 These are identical for all the classes here, so this consolidates
76 datasetName: Dataset name without the coadd name prefix, e.g., "CoaddId" for "deepCoaddId"
78 def makeIdFactory(self, dataRef):
79 """Return an IdFactory for setting the detection identifiers
81 The actual parameters used in the IdFactory are provided by
82 the butler (through the provided data reference.
84 expBits = dataRef.get(self.config.coaddName + datasetName +
"_bits")
85 expId = long(dataRef.get(self.config.coaddName + datasetName))
86 return afwTable.IdFactory.makeSource(expId, 64 - expBits)
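# For example (numbers illustrative): if the butler reports expBits = 10, the
# exposure ID occupies the top 10 bits of each 64-bit source ID, leaving
# 2**54 distinct source IDs available below it.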
90 """Given a longer, camera-specific filter name (e.g. "HSC-I") return its shorthand name ("i").
class DetectCoaddSourcesConfig(Config):
    doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
    detection = ConfigurableField(target=SourceDetectionTask, doc="Source detection")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        Config.setDefaults(self)
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
112 """Detect sources on a coadd
114 This operation is performed separately in each band. The detections from
115 each band will be merged before performing the measurement stage.
117 _DefaultName =
"detectCoaddSources"
118 ConfigClass = DetectCoaddSourcesConfig
124 parser = ArgumentParser(name=cls._DefaultName)
125 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
126 ContainerClass=ExistingCoaddDataIdContainer)
130 """Initialize the task.
132 Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
133 - schema: the initial schema for the output catalog, modified-in place to include all
134 fields set by this task. If None, the source minimal schema will be used.
136 CmdLineTask.__init__(self, **kwargs)
138 schema = afwTable.SourceTable.makeMinimalSchema()
140 self.makeSubtask(
"detection", schema=self.
schema)
143 """Run detection on a coadd"""
144 exposure = patchRef.get(self.config.coaddName +
"Coadd", immediate=
True)
145 if self.config.doScaleVariance:
148 self.
write(exposure, results, patchRef)
151 """Scale the variance in a maskedImage
153 Scales the variance plane to match the measured variance.
156 ctrl.setAndMask(~0x0)
157 var = maskedImage.getVariance()
158 mask = maskedImage.getMask()
161 vrat = dstats / vstats
162 self.log.info(
"Renormalising variance by %f" % (vrat))
166 """Run detection on an exposure
168 exposure: Exposure on which to detect
169 idFactory: IdFactory to set source identifiers
171 Returns: Struct(sources: catalog of detections,
172 backgrounds: list of backgrounds
175 backgrounds = afwMath.BackgroundList()
176 table = afwTable.SourceTable.make(self.
schema, idFactory)
177 detections = self.detection.makeSourceCatalog(table, exposure)
178 sources = detections.sources
179 fpSets = detections.fpSets
180 if fpSets.background:
181 backgrounds.append(fpSets.background)
182 return Struct(sources=sources, backgrounds=backgrounds)
    def write(self, exposure, results, patchRef):
        """!Write out results from runDetection

        @param exposure: Exposure to write out
        @param results: Struct returned from runDetection
        @param patchRef: data reference for patch
        """
        coaddName = self.config.coaddName + "Coadd"
        patchRef.put(results.backgrounds, coaddName + "_calexp_detBackground")
        patchRef.put(results.sources, coaddName + "_det")
        patchRef.put(exposure, coaddName + "_calexp_det")
200 """Provide a butler to the Task constructor"""
201 if parsedCmd
is not None:
202 butler = parsedCmd.butler
203 elif args
is not None:
204 dataRefList, kwargs = args
205 butler = dataRefList[0].getButler()
207 raise RuntimeError(
"Neither parsedCmd or args specified")
208 return self.TaskClass(config=self.config, log=self.log, butler=butler)
212 """Provide a list of patch references for each patch
214 The patch references within the list will have different filters.
217 for ref
in parsedCmd.id.refList:
218 tract = ref.dataId[
"tract"]
219 patch = ref.dataId[
"patch"]
220 filter = ref.dataId[
"filter"]
221 if not tract
in refList:
223 if not patch
in refList[tract]:
224 refList[tract][patch] = {}
225 if filter
in refList[tract][patch]:
226 raise RuntimeError(
"Multiple versions of %s" % (ref.dataId,))
227 refList[tract][patch][filter] = ref
228 return [(p.values(), kwargs)
for t
in refList.itervalues()
for p
in t.itervalues()]
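    # Shape of the result (illustrative): with one tract, one patch, and data in
    # the g and r filters, getTargetList returns
    #     [([<g patch ref>, <r patch ref>], kwargs)]
    # i.e. one entry per (tract, patch), holding that patch's per-filter references.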
class MergeSourcesConfig(Config):
    priorityList = ListField(dtype=str, default=[],
                             doc="Priority-ordered list of bands for the merge.")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def validate(self):
        Config.validate(self)
        if len(self.priorityList) == 0:
            raise RuntimeError("No priority list provided")
243 """A base class for merging source catalogs
245 Merging detections (MergeDetectionsTask) and merging measurements
246 (MergeMeasurementsTask) are currently so similar that it makes
247 sense to re-use the code, in the form of this abstract base class.
249 Sub-classes should set the following class variables:
250 * _DefaultName: name of Task
251 * inputDataset: name of dataset to read
252 * outputDataset: name of dataset to write
253 * getSchemaCatalogs to the output of _makeGetSchemaCatalogs(outputDataset)
255 In addition, sub-classes must implement the mergeCatalogs method.
    _DefaultName = None
    ConfigClass = MergeSourcesConfig
    RunnerClass = MergeSourcesRunner
    inputDataset = None
    outputDataset = None
    getSchemaCatalogs = None
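    # Skeleton of a concrete subclass (illustrative; the task and dataset names
    # here are hypothetical):
    #
    #     class MergeExampleTask(MergeSourcesTask):
    #         _DefaultName = "mergeExample"
    #         inputDataset = "det"
    #         outputDataset = "example"
    #         getSchemaCatalogs = _makeGetSchemaCatalogs("example")
    #
    #         def mergeCatalogs(self, catalogs, patchRef):
    #             merged = combinePerFilterCatalogs(catalogs)  # hypothetical helper
    #             return merged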
266 """Create a suitable ArgumentParser
268 We will use the ArgumentParser to get a provide a list of data
269 references for patches; the RunnerClass will sort them into lists
270 of data references for the same patch
272 parser = ArgumentParser(name=cls._DefaultName)
273 parser.add_id_argument(
"--id",
"deepCoadd_" + cls.inputDataset,
274 ContainerClass=ExistingCoaddDataIdContainer,
275 help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i")
    def getInputSchema(self, butler=None, schema=None):
        if schema is None:
            assert butler is not None, "Neither butler nor schema specified"
            schema = butler.get(self.config.coaddName + "Coadd_" + self.inputDataset + "_schema",
                                immediate=True).schema
        return schema
    def __init__(self, butler=None, schema=None, **kwargs):
        """Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        - schema: the schema of the detection catalogs used as input to this one
        - butler: a butler used to read the input schema from disk, if schema is None

        Derived classes should use the getInputSchema() method to handle the additional
        arguments and retrieve the actual input schema.
        """
        CmdLineTask.__init__(self, **kwargs)
    def run(self, patchRefList):
        """Merge coadd sources from multiple bands

        patchRefList: list of patch data references, one for each filter
        """
        catalogs = dict(self.readCatalog(patchRef) for patchRef in patchRefList)
        mergedCatalog = self.mergeCatalogs(catalogs, patchRefList[0])
        self.write(patchRefList[0], mergedCatalog)
307 """Read input catalog
309 We read the input dataset provided by the 'inputDataset'
312 filterName = patchRef.dataId[
"filter"]
313 catalog = patchRef.get(self.config.coaddName +
"Coadd_" + self.
inputDataset, immediate=
True)
314 self.log.info(
"Read %d sources for filter %s: %s" % (len(catalog), filterName, patchRef.dataId))
315 return filterName, catalog
318 """Merge multiple catalogs
320 catalogs: dict mapping filter name to source catalog
322 Returns: merged catalog
324 raise NotImplementedError()
    def write(self, patchRef, catalog):
        """Write the output

        We write as the dataset provided by the 'outputDataset'
        class variable.
        """
        patchRef.put(catalog, self.config.coaddName + "Coadd_" + self.outputDataset)
        # The filter is not part of the data ID for the merged dataset, so remove it
        # from the log message to avoid confusion.
        mergeDataId = patchRef.dataId.copy()
        del mergeDataId["filter"]
        self.log.info("Wrote merged catalog: %s" % (mergeDataId,))
340 """No metadata to write, and not sure how to write it for a list of dataRefs"""
class CullPeaksConfig(Config):
    """Configuration for culling garbage peaks after merging Footprints.

    Peaks may also be culled after detection or during deblending; this configuration object
    only deals with culling after merging Footprints.

    These cuts are based on three quantities:
    - nBands: the number of bands in which the peak was detected
    - peakRank: the position of the peak within its family, sorted from brightest to faintest.
    - peakRankNormalized: the peak rank divided by the total number of peaks in the family.

    The formula that identifies peaks to cull is:

        nBands < nBandsSufficient
        AND (peakRank >= rankSufficient)
        AND (peakRank >= rankConsidered OR peakRankNormalized >= rankNormalizedConsidered)

    (cullPeaks keeps exactly the complement of this condition.) To disable peak culling,
    simply set nBandsSufficient=1.
    """
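    # Worked example with the defaults below (nBandsSufficient=2, rankSufficient=20,
    # rankConsidered=30, rankNormalizedConsidered=0.7): in a family of 100 peaks, a
    # single-band peak of rank 25 is kept (25 < 30 and 25/100 < 0.7), while a
    # single-band peak of rank 80 is culled (80 >= 20 and 80 >= 30).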
    nBandsSufficient = RangeField(dtype=int, default=2, min=1,
                                  doc="Always keep peaks detected in this many bands")
    rankSufficient = RangeField(dtype=int, default=20, min=1,
                                doc="Always keep this many peaks in each family")
    rankConsidered = RangeField(dtype=int, default=30, min=1,
                                doc=("Keep peaks with less than this rank that also match the "
                                     "rankNormalizedConsidered condition."))
    rankNormalizedConsidered = RangeField(dtype=float, default=0.7, min=0.0,
                                          doc=("Keep peaks with less than this normalized rank that"
                                               " also match the rankConsidered condition."))
class MergeDetectionsConfig(MergeSourcesConfig):
    minNewPeak = Field(dtype=float, default=1,
                       doc="Minimum distance from closest peak to create a new one (in arcsec).")
    maxSamePeak = Field(dtype=float, default=0.3,
                        doc="When adding new catalogs to the merge, all peaks less than this distance "
                            "(in arcsec) to an existing peak will be flagged as detected in that catalog.")
    cullPeaks = ConfigField(dtype=CullPeaksConfig, doc="Configuration for how to cull peaks.")
387 """Merge detections from multiple bands"""
388 ConfigClass = MergeDetectionsConfig
389 _DefaultName =
"mergeCoaddDetections"
391 outputDataset =
"mergeDet"
    def __init__(self, butler=None, schema=None, **kwargs):
        """Initialize the task.

        Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
        - schema: the schema of the detection catalogs used as input to this one
        - butler: a butler used to read the input schema from disk, if schema is None

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
        self.schema = self.getInputSchema(butler=butler, schema=schema)
        # The FootprintMergeList does the actual merging of Footprints and Peaks
        # across bands; it also defines the peak schema used below.
        self.merged = afwDetect.FootprintMergeList(
            self.schema, [getShortFilterName(name) for name in self.config.priorityList])
411 """Merge multiple catalogs
415 skyInfo =
getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)
416 tractWcs = skyInfo.wcs
417 peakDistance = self.config.minNewPeak / tractWcs.pixelScale().asArcseconds()
418 samePeakDistance = self.config.maxSamePeak / tractWcs.pixelScale().asArcseconds()
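        # e.g. with the default minNewPeak of 1 arcsec and a hypothetical tract pixel
        # scale of 0.2 arcsec/pixel, peakDistance comes out to 5 pixels.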
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList
                           if band in catalogs.keys()]
        orderedBands = [getShortFilterName(band) for band in self.config.priorityList
                        if band in catalogs.keys()]
        mergedList = self.merged.getMergedSourceCatalog(orderedCatalogs, orderedBands, peakDistance,
                                                        self.schema, self.makeIdFactory(patchRef),
                                                        samePeakDistance)
        # Sort Peaks from brightest to faintest, so the rank-based culling below is meaningful.
        for record in mergedList:
            record.getFootprint().sortPeaks()
        self.log.info("Merged to %d sources" % len(mergedList))
        self.cullPeaks(mergedList)
        return mergedList
438 """Attempt to remove garbage peaks (mostly on the outskirts of large blends)"""
439 keys = [item.key
for item
in self.merged.getPeakSchema().extract(
"merge.peak.*").itervalues()]
442 for parentSource
in catalog:
445 keptPeaks = parentSource.getFootprint().getPeaks()
446 oldPeaks = list(keptPeaks)
448 familySize = len(oldPeaks)
449 totalPeaks += familySize
450 for rank, peak
in enumerate(oldPeaks):
451 if ((rank < self.config.cullPeaks.rankSufficient)
or
452 (self.config.cullPeaks.nBandsSufficient > 1
and
453 sum([peak.get(k)
for k
in keys]) >= self.config.cullPeaks.nBandsSufficient)
or
454 (rank < self.config.cullPeaks.rankConsidered
and
455 rank < self.config.cullPeaks.rankNormalizedConsidered * familySize)):
456 keptPeaks.append(peak)
459 self.log.info(
"Culled %d of %d peaks" % (culledPeaks, totalPeaks))
462 """Return a dict of empty catalogs for each catalog dataset produced by this task."""
465 return {self.config.coaddName +
"Coadd_mergeDet": mergeDet,
466 self.config.coaddName +
"Coadd_peak": peak}
class MeasureMergedCoaddSourcesConfig(Config):
    doDeblend = Field(dtype=bool, default=True, doc="Deblend sources?")
    deblend = ConfigurableField(target=SourceDeblendTask, doc="Deblend sources")
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    doMatchSources = Field(dtype=bool, default=False, doc="Match sources to reference catalog?")
    astrometry = ConfigurableField(target=ANetAstrometryTask, doc="Astrometric matching")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")

    def setDefaults(self):
        Config.setDefaults(self)
        self.deblend.propagateAllPeaks = True
483 """Measure sources using the merged catalog of detections
485 This operation is performed separately on each band. We deblend and measure on
486 the list of merge detections. The results from each band will subsequently
487 be merged to create a final reference catalog for forced measurement.
489 _DefaultName =
"measureCoaddSources"
490 ConfigClass = MeasureMergedCoaddSourcesConfig
491 RunnerClass = ButlerInitializedTaskRunner
497 parser = ArgumentParser(name=cls._DefaultName)
498 parser.add_id_argument(
"--id",
"deepCoadd", help=
"data ID, e.g. --id tract=12345 patch=1,2 filter=r",
499 ContainerClass=ExistingCoaddDataIdContainer)
    def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
        """Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        - schema: the schema of the merged detection catalog used as input to this one
        - peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
        - butler: a butler used to read the input schemas from disk, if schema or peakSchema is None

        The task will set its own self.schema attribute to the schema of the output measurement catalog.
        This will include all fields from the input schema, as well as additional fields for all the
        measurements.
        """
        CmdLineTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        self.algMetadata = PropertyList()
        if self.config.doDeblend:
            if peakSchema is None:
                assert butler is not None, "Neither butler nor peakSchema is defined"
                peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema
            self.makeSubtask("deblend", schema=self.schema, peakSchema=peakSchema)
        self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
        self.makeSubtask("astrometry", schema=self.schema)
531 "detect_isPatchInner", type=
"Flag",
532 doc=
"true if source is in the inner region of a coadd patch",
535 "detect_isTractInner", type=
"Flag",
536 doc=
"true if source is in the inner region of a coadd tract",
539 "detect_isPrimary", type=
"Flag",
540 doc=
"true if source has no children and is in the inner region of a coadd patch " \
541 +
"and is in the inner region of a coadd tract",
545 """Measure and deblend"""
546 exposure = patchRef.get(self.config.coaddName +
"Coadd_calexp_det", immediate=
True)
548 if self.config.doDeblend:
549 self.deblend.run(exposure, sources, exposure.getPsf())
551 bigKey = sources.schema[
"deblend_parentTooBig"].asKey()
552 numBig =
sum((s.get(bigKey)
for s
in sources))
554 self.log.warn(
"Patch %s contains %d large footprints that were not deblended" %
555 (patchRef.dataId, numBig))
556 self.measurement.run(exposure, sources)
558 if self.config.doMatchSources:
560 self.
write(patchRef, sources)
563 """Read input sources
565 We also need to add columns to hold the measurements we're about to make
566 so we can measure in-place.
568 merged = dataRef.get(self.config.coaddName +
"Coadd_mergeDet", immediate=
True)
569 self.log.info(
"Read %d detections: %s" % (len(merged), dataRef.dataId))
572 idFactory.notify(s.getId())
573 table = afwTable.SourceTable.make(self.
schema, idFactory)
579 """Set is-primary and related flags on sources
581 sources: a SourceTable
582 - reads centroid fields and an nChild field
583 - writes is-patch-inner, is-tract-inner and is-primary flags
584 patchRef: a patch reference (for retrieving sky info)
586 Will raise RuntimeError if self.config.doDeblend and the nChild key is not found in the table
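        # Summary of the flag logic: a source is "primary" iff it has no deblended
        # children (nChild == 0), its centroid lies in the patch's inner bbox, and
        # its sky position falls in this tract according to the sky map.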
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)

        nChildKeyName = "deblend_nChild"
        try:
            nChildKey = self.schema.find(nChildKeyName).key
        except Exception:
            nChildKey = None
        if self.config.doDeblend and nChildKey is None:
            # The deblender was run but the nChild field is missing: the schema is
            # inconsistent with the configuration, so fail rather than mislabel sources.
            raise RuntimeError("Ran the deblender but cannot find %r in the source table" % (nChildKeyName,))
        innerFloatBBox = afwGeom.Box2D(skyInfo.patchInfo.getInnerBBox())
        tractId = skyInfo.tractInfo.getId()
        for source in sources:
            if source.getCentroidFlag():
                # Centroid measurement failed; we cannot tell whether the source is in
                # the inner regions, so leave the flags at their default (False).
                continue
            centroidPos = source.getCentroid()
            # A coordinate that is not equal to itself is NaN: again, skip the source.
            if centroidPos[0] != centroidPos[0] or centroidPos[1] != centroidPos[1]:
                continue
            isPatchInner = innerFloatBBox.contains(centroidPos)
            source.setFlag(self.isPatchInnerKey, isPatchInner)

            skyPos = source.getCoord()
            sourceInnerTractId = skyInfo.skyMap.findTract(skyPos).getId()
            isTractInner = sourceInnerTractId == tractId
            source.setFlag(self.isTractInnerKey, isTractInner)

            if nChildKey is None or source.get(nChildKey) == 0:
                source.setFlag(self.isPrimaryKey, isPatchInner and isTractInner)
629 """Write matches of the sources to the astrometric reference catalog
631 We use the Wcs in the exposure to match sources.
633 dataRef: data reference
634 exposure: exposure with Wcs
635 sources: source catalog
637 result = self.astrometry.loadAndMatch(exposure=exposure, sourceCat=sources)
640 matches.table.setMetadata(result.matchMeta)
641 dataRef.put(matches, self.config.coaddName +
"Coadd_srcMatch")
644 """Write the source catalog"""
645 dataRef.put(sources, self.config.coaddName +
"Coadd_meas")
646 self.log.info(
"Wrote %d sources: %s" % (len(sources), dataRef.dataId))
652 """Measure measurements from multiple bands"""
653 _DefaultName =
"mergeCoaddMeasurements"
654 inputDataset =
"meas"
655 outputDataset =
"ref"
    def __init__(self, butler=None, schema=None, **kwargs):
        """Initialize the task.

        Additional keyword arguments (forwarded to MergeSourcesTask.__init__):
        - schema: the schema of the detection catalogs used as input to this one
        - butler: a butler used to read the input schema from disk, if schema is None

        The task will set its own self.schema attribute to the schema of the output merged catalog.
        """
        MergeSourcesTask.__init__(self, butler=butler, schema=schema, **kwargs)
        inputSchema = self.getInputSchema(butler=butler, schema=schema)
        self.schemaMapper = afwTable.SchemaMapper(inputSchema)
        self.schemaMapper.addMinimalSchema(inputSchema, True)
        self.flagKeys = {}
        for band in self.config.priorityList:
            short = getShortFilterName(band)
            outputKey = self.schemaMapper.editOutputSchema().addField(
                "merge_measurement_%s" % short,
                type="Flag",
                doc="Flag field set if the measurements here are from the %s filter" % band,
            )
            peakKey = inputSchema.find("merge_peak_%s" % short).key
            footprintKey = inputSchema.find("merge_footprint_%s" % short).key
            self.flagKeys[band] = Struct(peak=peakKey, footprint=footprintKey, output=outputKey)
        self.schema = self.schemaMapper.getOutputSchema()
685 """Merge measurement catalogs to create a single reference catalog for forced photometry
687 For parent sources, we choose the first band in config.priorityList for which the
688 merge_footprint flag for that band is is True.
690 For child sources, the logic is the same, except that we use the merge_peak flags.
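        # Example (illustrative): with priorityList = ["i", "r", "g"], a parent whose
        # merge_footprint_i flag is False but whose merge_footprint_r flag is True is
        # copied from the r catalog, and merge_measurement_r is set on the output record.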
        orderedCatalogs = [catalogs[band] for band in self.config.priorityList
                           if band in catalogs.keys()]
        orderedKeys = [self.flagKeys[band] for band in self.config.priorityList
                       if band in catalogs.keys()]

        mergedCatalog = afwTable.SourceCatalog(self.schema)
        mergedCatalog.reserve(len(orderedCatalogs[0]))

        idKey = orderedCatalogs[0].table.getIdKey()
        for catalog in orderedCatalogs[1:]:
            if numpy.any(orderedCatalogs[0].get(idKey) != catalog.get(idKey)):
                raise ValueError("Error in inputs to MergeCoaddMeasurements: source IDs do not match")
        # For each source, pick the first catalog in priority order whose flag is set
        # and copy that record into the output.
        for n, orderedRecords in enumerate(zip(*orderedCatalogs)):
            for inputRecord, flagKeys in zip(orderedRecords, orderedKeys):
                bestParent = (inputRecord.getParent() == 0 and inputRecord.get(flagKeys.footprint))
                bestChild = (inputRecord.getParent() != 0 and inputRecord.get(flagKeys.peak))
                if bestParent or bestChild:
                    outputRecord = mergedCatalog.addNew()
                    outputRecord.assign(inputRecord, self.schemaMapper)
                    outputRecord.set(flagKeys.output, True)
                    break
            else:  # no record in any band had the appropriate flag set
                raise ValueError("Error in inputs to MergeCoaddMeasurements: no valid reference for %s" %
                                 inputRecord.getId())
        # Sanity check: every input catalog should have the same length as the output.
        for inputCatalog in orderedCatalogs:
            if len(mergedCatalog) != len(inputCatalog):
                raise ValueError("Mismatch between catalog sizes: %s != %s" %
                                 (len(mergedCatalog), len(inputCatalog)))

        return mergedCatalog