LSSTApplications  17.0+124,17.0+14,17.0+73,18.0.0+37,18.0.0+80,18.0.0-4-g68ffd23+4,18.1.0-1-g0001055+12,18.1.0-1-g03d53ef+5,18.1.0-1-g1349e88+55,18.1.0-1-g2505f39+44,18.1.0-1-g5315e5e+4,18.1.0-1-g5e4b7ea+14,18.1.0-1-g7e8fceb+4,18.1.0-1-g85f8cd4+48,18.1.0-1-g8ff0b9f+4,18.1.0-1-ga2c679d+1,18.1.0-1-gd55f500+35,18.1.0-10-gb58edde+2,18.1.0-11-g0997b02+4,18.1.0-13-gfe4edf0b+12,18.1.0-14-g259bd21+21,18.1.0-19-gdb69f3f+2,18.1.0-2-g5f9922c+24,18.1.0-2-gd3b74e5+11,18.1.0-2-gfbf3545+32,18.1.0-26-g728bddb4+5,18.1.0-27-g6ff7ca9+2,18.1.0-3-g52aa583+25,18.1.0-3-g8ea57af+9,18.1.0-3-gb69f684+42,18.1.0-3-gfcaddf3+6,18.1.0-32-gd8786685a,18.1.0-4-gf3f9b77+6,18.1.0-5-g1dd662b+2,18.1.0-5-g6dbcb01+41,18.1.0-6-gae77429+3,18.1.0-7-g9d75d83+9,18.1.0-7-gae09a6d+30,18.1.0-9-gc381ef5+4,w.2019.46
LSSTDataManagementBasePackage
multiBand.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 #
3 # LSST Data Management System
4 # Copyright 2008-2015 AURA/LSST.
5 #
6 # This product includes software developed by the
7 # LSST Project (http://www.lsst.org/).
8 #
9 # This program is free software: you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation, either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the LSST License Statement and
20 # the GNU General Public License along with this program. If not,
21 # see <https://www.lsstcorp.org/LegalNotices/>.
22 #
from lsst.coadd.utils.coaddDataIdContainer import ExistingCoaddDataIdContainer
from lsst.pipe.base import (CmdLineTask, Struct, ArgumentParser, ButlerInitializedTaskRunner,
                            PipelineTask, PipelineTaskConfig, PipelineTaskConnections)
import lsst.pipe.base.connectionTypes as cT
from lsst.pex.config import Config, Field, ConfigurableField
from lsst.meas.algorithms import DynamicDetectionTask, ReferenceObjectLoader
from lsst.meas.base import SingleFrameMeasurementTask, ApplyApCorrTask, CatalogCalculationTask
from lsst.meas.deblender import SourceDeblendTask
from lsst.meas.extensions.scarlet import ScarletDeblendTask
from lsst.pipe.tasks.coaddBase import getSkyInfo
from lsst.pipe.tasks.scaleVariance import ScaleVarianceTask
from lsst.meas.astrom import DirectMatchTask, denormalizeMatches
from lsst.pipe.tasks.fakes import BaseFakeSourcesTask
from lsst.pipe.tasks.setPrimaryFlags import SetPrimaryFlagsTask
from lsst.pipe.tasks.propagateVisitFlags import PropagateVisitFlagsTask
import lsst.afw.image as afwImage
import lsst.afw.table as afwTable
import lsst.afw.math as afwMath
from lsst.daf.base import PropertyList

from .mergeDetections import MergeDetectionsConfig, MergeDetectionsTask  # noqa: F401
from .mergeMeasurements import MergeMeasurementsConfig, MergeMeasurementsTask  # noqa: F401
from .multiBandUtils import MergeSourcesRunner, CullPeaksConfig, _makeGetSchemaCatalogs  # noqa: F401
from .multiBandUtils import getInputSchema, getShortFilterName, readCatalog, _makeMakeIdFactory  # noqa: F401
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesSingleConfig  # noqa: F401
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesSingleTask  # noqa: F401
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiConfig  # noqa: F401
from .deblendCoaddSourcesPipeline import DeblendCoaddSourcesMultiTask  # noqa: F401
51 
52 
53 """
54 New set types:
55 * deepCoadd_det: detections from what used to be processCoadd (tract, patch, filter)
56 * deepCoadd_mergeDet: merged detections (tract, patch)
57 * deepCoadd_meas: measurements of merged detections (tract, patch, filter)
58 * deepCoadd_ref: reference sources (tract, patch)
59 All of these have associated *_schema catalogs that require no data ID and hold no records.
60 
61 In addition, we have a schema-only dataset, which saves the schema for the PeakRecords in
62 the mergeDet, meas, and ref dataset Footprints:
63 * deepCoadd_peak_schema
64 """
65 
66 
67 
class DetectCoaddSourcesConnections(PipelineTaskConnections,
                                    dimensions=("tract", "patch", "abstract_filter", "skymap"),
                                    defaultTemplates={"inputCoaddName": "deep", "outputCoaddName": "deep"}):
    """Butler dataset connections for ``DetectCoaddSourcesTask``.

    The input is a single-band coadd; the outputs are the detection catalog,
    the background model used during detection, and the post-detection
    ("calexp") version of the coadd exposure.  Dataset name prefixes are
    controlled by the ``inputCoaddName``/``outputCoaddName`` templates
    (both default to "deep").
    """
    # Empty catalog carrying the output schema, persisted at task-init time.
    detectionSchema = cT.InitOutput(
        doc="Schema of the detection catalog",
        name="{outputCoaddName}Coadd_det_schema",
        storageClass="SourceCatalog",
    )
    # The coadd on which detection is run.
    exposure = cT.Input(
        doc="Exposure on which detections are to be performed",
        name="{inputCoaddName}Coadd",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Background list accumulated while detecting (see DetectCoaddSourcesTask.run).
    outputBackgrounds = cT.Output(
        doc="Output Backgrounds used in detection",
        name="{outputCoaddName}Coadd_calexp_background",
        storageClass="Background",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # Catalog of detected (parent) sources.
    outputSources = cT.Output(
        doc="Detected sources catalog",
        name="{outputCoaddName}Coadd_det",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    # The input exposure as modified by the detection run.
    outputExposure = cT.Output(
        doc="Exposure post detection",
        name="{outputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
99 
100 
class DetectCoaddSourcesConfig(PipelineTaskConfig, pipelineConnections=DetectCoaddSourcesConnections):
    """!
    @anchor DetectCoaddSourcesConfig_

    @brief Configuration parameters for the DetectCoaddSourcesTask
    """
    doScaleVariance = Field(dtype=bool, default=True, doc="Scale variance plane using empirical noise?")
    scaleVariance = ConfigurableField(target=ScaleVarianceTask, doc="Variance rescaling")
    detection = ConfigurableField(target=DynamicDetectionTask, doc="Source detection")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    doInsertFakes = Field(dtype=bool, default=False,
                          doc="Run fake sources injection task")
    insertFakes = ConfigurableField(target=BaseFakeSourcesTask,
                                    doc="Injection of fake sources for testing "
                                        "purposes (must be retargeted)")
    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data."
    )

    def setDefaults(self):
        """Set detection defaults appropriate for coadds rather than single visits."""
        super().setDefaults()
        # Threshold in units of the per-pixel standard deviation, i.e. using
        # the (rescaled) variance plane rather than a global sigma estimate.
        self.detection.thresholdType = "pixel_stdev"
        self.detection.isotropicGrow = True
        # Coadds are made from background-subtracted CCDs, so any background subtraction
        # should be very basic
        self.detection.reEstimateBackground = False
        self.detection.background.useApprox = False
        self.detection.background.binSize = 4096
        self.detection.background.undersampleStyle = 'REDUCE_INTERP_ORDER'
        self.detection.doTempWideBackground = True  # Suppress large footprints that overwhelm the deblender
133 
139 
140 
class DetectCoaddSourcesTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor DetectCoaddSourcesTask_

    @brief Detect sources on a coadd

    @section pipe_tasks_multiBand_Contents Contents

      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Run
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Config
      - @ref pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug
      - @ref pipe_tasks_multiband_DetectCoaddSourcesTask_Example

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Purpose	Description

    Command-line task that detects sources on a coadd of exposures obtained with a single filter.

    Coadding individual visits requires each exposure to be warped. This introduces covariance in the noise
    properties across pixels. Before detection, we correct the coadd variance by scaling the variance plane
    in the coadd to match the observed variance. This is an approximate approach -- strictly, we should
    propagate the full covariance matrix -- but it is simple and works well in practice.

    After scaling the variance plane, we detect sources and generate footprints by delegating to the @ref
    SourceDetectionTask_ "detection" subtask.

      @par Inputs:
        deepCoadd{tract,patch,filter}: ExposureF
      @par Outputs:
        deepCoadd_det{tract,patch,filter}: SourceCatalog (only parent Footprints)
        @n deepCoadd_calexp{tract,patch,filter}: Variance scaled, background-subtracted input
           exposure (ExposureF)
        @n deepCoadd_calexp_background{tract,patch,filter}: BackgroundList
      @par Data Unit:
        tract, patch, filter

    DetectCoaddSourcesTask delegates most of its work to the @ref SourceDetectionTask_ "detection" subtask.
    You can retarget this subtask if you wish.

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Initialize	Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Run	Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Config	Configuration parameters

    See @ref DetectCoaddSourcesConfig_ "DetectSourcesConfig"

    @section pipe_tasks_multiBand_DetectCoaddSourcesTask_Debug	Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
    files.

    DetectCoaddSourcesTask has no debug variables of its own because it relegates all the work to
    @ref SourceDetectionTask_ "SourceDetectionTask"; see the documentation for
    @ref SourceDetectionTask_ "SourceDetectionTask" for further information.

    @section pipe_tasks_multiband_DetectCoaddSourcesTask_Example	A complete example
    of using DetectCoaddSourcesTask

    DetectCoaddSourcesTask is meant to be run after assembling a coadded image in a given band. The purpose of
    the task is to update the background, detect all sources in a single band and generate a set of parent
    footprints. Subsequent tasks in the multi-band processing procedure will merge sources across bands and,
    eventually, perform forced photometry. Command-line usage of DetectCoaddSourcesTask expects a data
    reference to the coadd to be processed. A list of the available optional arguments can be obtained by
    calling detectCoaddSources.py with the `--help` command line argument:
    @code
    detectCoaddSources.py --help
    @endcode

    To demonstrate usage of the DetectCoaddSourcesTask in the larger context of multi-band processing, we
    will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming one has followed
    steps 1 - 4 at @ref pipeTasks_multiBand, one may detect all the sources in each coadd as follows:
    @code
    detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
    @endcode
    that will process the HSC-I band data. The results are written to
    `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I`.

    It is also necessary to run:
    @code
    detectCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
    @endcode
    to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
    processing procedure: @ref MergeDetectionsTask_ "MergeDetectionsTask".
    """
    _DefaultName = "detectCoaddSources"
    ConfigClass = DetectCoaddSourcesConfig
    # Gen2 helpers: schema-catalog provider and per-patch IdFactory maker,
    # both generated from the shared multiBandUtils factories.
    getSchemaCatalogs = _makeGetSchemaCatalogs("det")
    makeIdFactory = _makeMakeIdFactory("CoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        # Gen2 command-line parser: a single --id argument selecting coadd patches.
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd", help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
                               ContainerClass=ExistingCoaddDataIdContainer)
        return parser

    def __init__(self, schema=None, **kwargs):
        """!
        @brief Initialize the task. Create the @ref SourceDetectionTask_ "detection" subtask.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):

        @param[in] schema:   initial schema for the output catalog, modified-in place to include all
                             fields set by this task.  If None, the source minimal schema will be used.
        @param[in] **kwargs: keyword arguments to be passed to lsst.pipe.base.task.Task.__init__
        """
        # N.B. Super is used here to handle the multiple inheritance of PipelineTasks, the init tree
        # call structure has been reviewed carefully to be sure super will work as intended.
        super().__init__(**kwargs)
        if schema is None:
            schema = afwTable.SourceTable.makeMinimalSchema()
        if self.config.doInsertFakes:
            self.makeSubtask("insertFakes")
        self.schema = schema
        self.makeSubtask("detection", schema=self.schema)
        if self.config.doScaleVariance:
            self.makeSubtask("scaleVariance")

        # Empty catalog exposing the final schema; persisted via the
        # detectionSchema init-output connection in Gen3.
        self.detectionSchema = afwTable.SourceCatalog(self.schema)

    def runDataRef(self, patchRef):
        """!
        @brief Run detection on a coadd (Gen2 entry point).

        Invokes @ref run and then uses @ref write to output the
        results.

        @param[in] patchRef: data reference for patch
        """
        # Read the fake-source variant of the coadd when configured to do so.
        if self.config.hasFakes:
            exposure = patchRef.get("fakes_" + self.config.coaddName + "Coadd", immediate=True)
        else:
            exposure = patchRef.get(self.config.coaddName + "Coadd", immediate=True)
        expId = int(patchRef.get(self.config.coaddName + "CoaddId"))
        results = self.run(exposure, self.makeIdFactory(patchRef), expId=expId)
        self.write(results, patchRef)
        return results

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        # Gen3 entry point: fetch inputs, derive a deterministic IdFactory and
        # RNG seed from the packed data ID, then run and persist the outputs.
        inputs = butlerQC.get(inputRefs)
        packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch_abstract_filter", returnMaxBits=True)
        # Source IDs carry the packed data ID in their upper bits.
        inputs["idFactory"] = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        inputs["expId"] = packedId
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

    def run(self, exposure, idFactory, expId):
        """!
        @brief Run detection on an exposure.

        First scale the variance plane to match the observed variance
        using @ref ScaleVarianceTask. Then invoke the @ref SourceDetectionTask_ "detection" subtask to
        detect sources.

        @param[in,out] exposure: Exposure on which to detect (may be background-subtracted and scaled,
                                 depending on configuration).
        @param[in] idFactory: IdFactory to set source identifiers
        @param[in] expId: Exposure identifier (integer) for RNG seed

        @return a pipe.base.Struct with fields
        - sources: catalog of detections
        - backgrounds: list of backgrounds
        """
        if self.config.doScaleVariance:
            # Record the applied scaling so downstream consumers can recover it.
            varScale = self.scaleVariance.run(exposure.maskedImage)
            exposure.getMetadata().add("variance_scale", varScale)
        backgrounds = afwMath.BackgroundList()
        if self.config.doInsertFakes:
            self.insertFakes.run(exposure, background=backgrounds)
        table = afwTable.SourceTable.make(self.schema, idFactory)
        detections = self.detection.makeSourceCatalog(table, exposure, expId=expId)
        sources = detections.sources
        fpSets = detections.fpSets
        # Collect any backgrounds the detection subtask fit (e.g. the temporary
        # wide background), so they are persisted with the rest.
        if hasattr(fpSets, "background") and fpSets.background:
            for bg in fpSets.background:
                backgrounds.append(bg)
        return Struct(outputSources=sources, outputBackgrounds=backgrounds, outputExposure=exposure)

    def write(self, results, patchRef):
        """!
        @brief Write out results from run (Gen2 only).

        @param[in] results: Struct returned from run
        @param[in] patchRef: data reference for patch
        """
        coaddName = self.config.coaddName + "Coadd"
        patchRef.put(results.outputBackgrounds, coaddName + "_calexp_background")
        patchRef.put(results.outputSources, coaddName + "_det")
        # Keep fake-source outputs in a separate dataset namespace.
        if self.config.hasFakes:
            patchRef.put(results.outputExposure, "fakes_" + coaddName + "_calexp")
        else:
            patchRef.put(results.outputExposure, coaddName + "_calexp")
341 
342 
343 
344 
class DeblendCoaddSourcesConfig(Config):
    """DeblendCoaddSourcesConfig

    Configuration parameters for the `DeblendCoaddSourcesTask`.

    Holds both deblender subtask configurations; ``simultaneous`` selects
    whether the multi-band (SCARLET) or single-band deblender is used.
    """
    singleBandDeblend = ConfigurableField(
        target=SourceDeblendTask,
        doc="Deblend sources separately in each band")
    multiBandDeblend = ConfigurableField(
        target=ScarletDeblendTask,
        doc="Deblend sources simultaneously across bands")
    simultaneous = Field(dtype=bool, default=False, doc="Simultaneously deblend all bands?")
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data.")

    def setDefaults(self):
        super().setDefaults()
        # Propagate every peak from the merged catalog through the
        # single-band deblender.
        self.singleBandDeblend.propagateAllPeaks = True
364 
class DeblendCoaddSourcesRunner(MergeSourcesRunner):
    """Task runner for `DeblendCoaddSourcesTask`.

    Required because the run method requires a list of
    dataRefs rather than a single dataRef.
    """
    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        """Provide a list of patch references for each patch, tract, filter combo.

        Parameters
        ----------
        parsedCmd:
            The parsed command
        kwargs:
            Keyword arguments passed to the task

        Returns
        -------
        targetList: list
            List of tuples, where each tuple is a (dataRef, kwargs) pair.
        """
        refDict = MergeSourcesRunner.buildRefDict(parsedCmd)
        kwargs["psfCache"] = parsedCmd.psfCache
        # One target per (tract, patch): the full list of per-filter dataRefs
        # for that patch, paired with the shared keyword arguments.
        targets = []
        for tractRefs in refDict.values():
            for patchRefs in tractRefs.values():
                targets.append((list(patchRefs.values()), kwargs))
        return targets
391 
class DeblendCoaddSourcesTask(CmdLineTask):
    """Deblend the sources in a merged catalog

    Deblend sources from master catalog in each coadd.
    This can either be done separately in each band using the HSC-SDSS deblender
    (`DeblendCoaddSourcesTask.config.simultaneous==False`)
    or use SCARLET to simultaneously fit the blend in all bands
    (`DeblendCoaddSourcesTask.config.simultaneous==True`).
    The task will set its own `self.schema` attribute to the `Schema` of the
    output deblended catalog.
    This will include all fields from the input `Schema`, as well as additional fields
    from the deblender.

    Parameters
    ----------
    butler: `Butler`
        Butler used to read the input schemas from disk, if
        `schema` or `peakSchema` is not supplied.
    schema: `Schema`
        The schema of the merged detection catalog as an input to this task.
    peakSchema: `Schema`
        The schema of the `PeakRecord`s in the `Footprint`s in the merged detection catalog
    """
    ConfigClass = DeblendCoaddSourcesConfig
    RunnerClass = DeblendCoaddSourcesRunner
    _DefaultName = "deblendCoaddSources"
    # Per-patch IdFactory maker seeded from the merged-coadd ID.
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")

    @classmethod
    def _makeArgumentParser(cls):
        # Gen2 parser: patch selection plus the CoaddPsf cache size.
        parser = ArgumentParser(name=cls._DefaultName)
        parser.add_id_argument("--id", "deepCoadd_calexp",
                               help="data ID, e.g. --id tract=12345 patch=1,2 filter=g^r^i",
                               ContainerClass=ExistingCoaddDataIdContainer)
        parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
        return parser

    def __init__(self, butler=None, schema=None, peakSchema=None, **kwargs):
        """Initialize the task, resolving schemas and creating the configured deblender subtask."""
        CmdLineTask.__init__(self, **kwargs)
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + "Coadd_mergeDet_schema", immediate=True).schema
        # Map the merged-detection schema into the output schema; the deblender
        # subtask adds its own fields on top of this.
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        if peakSchema is None:
            assert butler is not None, "Neither butler nor peakSchema is defined"
            peakSchema = butler.get(self.config.coaddName + "Coadd_peak_schema", immediate=True).schema

        if self.config.simultaneous:
            self.makeSubtask("multiBandDeblend", schema=self.schema, peakSchema=peakSchema)
        else:
            self.makeSubtask("singleBandDeblend", schema=self.schema, peakSchema=peakSchema)

    def getSchemaCatalogs(self):
        """Return a dict of empty catalogs for each catalog dataset produced by this task.

        Returns
        -------
        result: dict
            Dictionary of empty catalogs, with catalog names as keys.
        """
        catalog = afwTable.SourceCatalog(self.schema)
        return {self.config.coaddName + "Coadd_deblendedFlux": catalog,
                self.config.coaddName + "Coadd_deblendedModel": catalog}

    def runDataRef(self, patchRefList, psfCache=100):
        """Deblend the patch

        Deblend each source simultaneously or separately
        (depending on `DeblendCoaddSourcesTask.config.simultaneous`).
        Set `is-primary` and related flags.
        Propagate flags from individual visits.
        Write the deblended sources out.

        Parameters
        ----------
        patchRefList: list
            List of data references for each filter
        psfCache: int
            Size of the CoaddPsf cache (used in the single-band path only).
        """

        if self.config.hasFakes:
            coaddType = "fakes_" + self.config.coaddName
        else:
            coaddType = self.config.coaddName

        if self.config.simultaneous:
            # Use SCARLET to simultaneously deblend across filters
            filters = []
            exposures = []
            for patchRef in patchRefList:
                exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
                filters.append(patchRef.dataId["filter"])
                exposures.append(exposure)
            # The input sources are the same for all bands, since it is a merged catalog
            sources = self.readSources(patchRef)
            exposure = afwImage.MultibandExposure.fromExposures(filters, exposures)
            fluxCatalogs, templateCatalogs = self.multiBandDeblend.run(exposure, sources)
            for n in range(len(patchRefList)):
                # fluxCatalogs may be None when the deblender is not configured
                # to conserve flux; write() handles that case.
                fluxCat = fluxCatalogs if fluxCatalogs is None else fluxCatalogs[filters[n]]
                self.write(patchRefList[n], fluxCat, templateCatalogs[filters[n]])
        else:
            # Use the single-band deblender to deblend each band separately
            for patchRef in patchRefList:
                exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
                exposure.getPsf().setCacheCapacity(psfCache)
                sources = self.readSources(patchRef)
                self.singleBandDeblend.run(exposure, sources)
                self.write(patchRef, sources)

    def readSources(self, dataRef):
        """Read merged catalog

        Read the catalog of merged detections and create a catalog
        in a single band.

        Parameters
        ----------
        dataRef: data reference
            Data reference for catalog of merged detections

        Returns
        -------
        sources: `SourceCatalog`
            List of sources in merged catalog

        Notes
        -----
        We also need to add columns to hold the measurements we're about to make
        so we can measure in-place.
        """
        merged = dataRef.get(self.config.coaddName + "Coadd_mergeDet", immediate=True)
        self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
        idFactory = self.makeIdFactory(dataRef)
        # Reserve the existing IDs so newly generated ones don't collide.
        for s in merged:
            idFactory.notify(s.getId())
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(merged, self.schemaMapper)
        return sources

    def write(self, dataRef, flux_sources, template_sources=None):
        """Write the source catalog(s)

        Parameters
        ----------
        dataRef: Data Reference
            Reference to the output catalog.
        flux_sources: `SourceCatalog`
            Flux conserved sources to write to file.
            If using the single band deblender, this is the catalog
            generated.
        template_sources: `SourceCatalog`
            Source catalog using the multiband template models
            as footprints.
        """
        # The multiband deblender does not have to conserve flux,
        # so only write the flux conserved catalog if it exists
        if flux_sources is not None:
            assert not self.config.simultaneous or self.config.multiBandDeblend.conserveFlux
            dataRef.put(flux_sources, self.config.coaddName + "Coadd_deblendedFlux")
            self.log.info("Wrote %d sources: %s" % (len(flux_sources), dataRef.dataId))
        # Only the multiband deblender has the option to output the
        # template model catalog, which can optionally be used
        # in MeasureMergedCoaddSources
        if template_sources is not None:
            assert self.config.multiBandDeblend.saveTemplates
            dataRef.put(template_sources, self.config.coaddName + "Coadd_deblendedModel")
            self.log.info("Wrote %d sources: %s" % (len(template_sources), dataRef.dataId))

    def writeMetadata(self, dataRefList):
        """Write the metadata produced from processing the data.

        Parameters
        ----------
        dataRefList
            List of Butler data references used to write the metadata.
            The metadata is written to dataset type `CmdLineTask._getMetadataName`.
        """
        for dataRef in dataRefList:
            try:
                metadataName = self._getMetadataName()
                if metadataName is not None:
                    dataRef.put(self.getFullMetadata(), metadataName)
            except Exception as e:
                # Metadata persistence is best-effort; a failure here should
                # not abort the processing run.
                self.log.warn("Could not persist metadata for dataId=%s: %s", dataRef.dataId, e)

    def getExposureId(self, dataRef):
        """Get the ExposureId (merged-coadd ID as an int) from a data reference
        """
        return int(dataRef.get(self.config.coaddName + "CoaddId"))
584 
585 
class MeasureMergedCoaddSourcesConnections(PipelineTaskConnections,
                                           dimensions=("tract", "patch", "abstract_filter", "skymap"),
                                           defaultTemplates={"inputCoaddName": "deep",
                                                             "outputCoaddName": "deep"}):
    """Butler dataset connections for ``MeasureMergedCoaddSourcesTask``.

    Optional connections (``visitCatalogs``, ``matchResult``,
    ``denormMatches``) are removed in ``__init__`` according to the
    ``doPropagateFlags``, ``doMatchSources`` and ``doWriteMatchesDenormalized``
    config flags.
    """
    inputSchema = cT.InitInput(
        doc="Input schema for measure merged task produced by a deblender or detection task",
        name="{inputCoaddName}Coadd_deblendedFlux_schema",
        storageClass="SourceCatalog"
    )
    outputSchema = cT.InitOutput(
        doc="Output schema after all new fields are added by task",
        name="{inputCoaddName}Coadd_meas_schema",
        storageClass="SourceCatalog"
    )
    refCat = cT.PrerequisiteInput(
        doc="Reference catalog used to match measured sources against known sources",
        name="ref_cat",
        storageClass="SimpleCatalog",
        dimensions=("skypix",),
        deferLoad=True,
        multiple=True
    )
    exposure = cT.Input(
        doc="Input coadd image",
        name="{inputCoaddName}Coadd_calexp",
        storageClass="ExposureF",
        dimensions=("tract", "patch", "abstract_filter", "skymap")
    )
    skyMap = cT.Input(
        doc="SkyMap to use in processing",
        name="{inputCoaddName}Coadd_skyMap",
        storageClass="SkyMap",
        dimensions=("skymap",),
    )
    visitCatalogs = cT.Input(
        # Fixed typo in user-visible doc: "codd" -> "coadd".
        doc="Source catalogs for visits which overlap input tract, patch, abstract_filter. Will be "
            "further filtered in the task for the purpose of propagating flags from image calibration "
            "and characterization to coadd objects",
        name="src",
        dimensions=("instrument", "visit", "detector"),
        storageClass="SourceCatalog",
        multiple=True
    )
    inputCatalog = cT.Input(
        # Fixed unbalanced quotes and missing inter-sentence spaces that
        # produced garbled help text (e.g. "use.If ... 'deblendedFlux.If").
        doc=("Name of the input catalog to use. "
             "If the single band deblender was used this should be 'deblendedFlux'. "
             "If the multi-band deblender was used this should be 'deblendedModel', "
             "or 'deblendedFlux' if the multiband deblender was configured to output "
             "deblended flux catalogs. If no deblending was performed this should "
             "be 'mergeDet'."),
        name="{inputCoaddName}Coadd_deblendedFlux",
        storageClass="SourceCatalog",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
    )
    outputSources = cT.Output(
        doc="Source catalog containing all the measurement information generated in this task",
        name="{outputCoaddName}Coadd_meas",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="SourceCatalog",
    )
    matchResult = cT.Output(
        doc="Match catalog produced by configured matcher, optional on doMatchSources",
        name="{outputCoaddName}Coadd_measMatch",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="Catalog",
    )
    denormMatches = cT.Output(
        doc="Denormalized Match catalog produced by configured matcher, optional on "
            "doWriteMatchesDenormalized",
        name="{outputCoaddName}Coadd_measMatchFull",
        dimensions=("tract", "patch", "abstract_filter", "skymap"),
        storageClass="Catalog",
    )

    def __init__(self, *, config=None):
        """Prune optional connections that the given config disables."""
        super().__init__(config=config)
        if config.doPropagateFlags is False:
            self.inputs -= set(("visitCatalogs",))

        if config.doMatchSources is False:
            self.outputs -= set(("matchResult",))

        if config.doWriteMatchesDenormalized is False:
            self.outputs -= set(("denormMatches",))
670 
class MeasureMergedCoaddSourcesConfig(PipelineTaskConfig,
                                      pipelineConnections=MeasureMergedCoaddSourcesConnections):
    """!
    @anchor MeasureMergedCoaddSourcesConfig_

    @brief Configuration parameters for the MeasureMergedCoaddSourcesTask
    """
    # Fixed unbalanced quotes and missing inter-sentence spaces in the
    # user-visible doc string (previously rendered as "use.If ... 'deblendedFlux.If").
    inputCatalog = Field(dtype=str, default="deblendedFlux",
                         doc=("Name of the input catalog to use. "
                              "If the single band deblender was used this should be 'deblendedFlux'. "
                              "If the multi-band deblender was used this should be 'deblendedModel'. "
                              "If no deblending was performed this should be 'mergeDet'."))
    measurement = ConfigurableField(target=SingleFrameMeasurementTask, doc="Source measurement")
    setPrimaryFlags = ConfigurableField(target=SetPrimaryFlagsTask, doc="Set flags for primary tract/patch")
    doPropagateFlags = Field(
        dtype=bool, default=True,
        doc="Whether to match sources to CCD catalogs to propagate flags (to e.g. identify PSF stars)"
    )
    propagateFlags = ConfigurableField(target=PropagateVisitFlagsTask, doc="Propagate visit flags to coadd")
    doMatchSources = Field(dtype=bool, default=True, doc="Match sources to reference catalog?")
    match = ConfigurableField(target=DirectMatchTask, doc="Matching to reference catalog")
    doWriteMatchesDenormalized = Field(
        dtype=bool,
        default=False,
        doc=("Write reference matches in denormalized format? "
             "This format uses more disk space, but is more convenient to read."),
    )
    coaddName = Field(dtype=str, default="deep", doc="Name of coadd")
    psfCache = Field(dtype=int, default=100, doc="Size of psfCache")
    checkUnitsParseStrict = Field(
        doc="Strictness of Astropy unit compatibility check, can be 'raise', 'warn' or 'silent'",
        dtype=str,
        default="raise",
    )
    doApCorr = Field(
        dtype=bool,
        default=True,
        doc="Apply aperture corrections"
    )
    applyApCorr = ConfigurableField(
        target=ApplyApCorrTask,
        doc="Subtask to apply aperture corrections"
    )
    doRunCatalogCalculation = Field(
        dtype=bool,
        default=True,
        doc='Run catalogCalculation task'
    )
    catalogCalculation = ConfigurableField(
        target=CatalogCalculationTask,
        doc="Subtask to run catalogCalculation plugins on catalog"
    )

    hasFakes = Field(
        dtype=bool,
        default=False,
        doc="Should be set to True if fake sources have been inserted into the input data."
    )

    @property
    def refObjLoader(self):
        # Convenience alias: the reference-object loader config lives on the
        # "match" subtask.
        return self.match.refObjLoader

    def setDefaults(self):
        super().setDefaults()
        self.measurement.plugins.names |= ['base_InputCount', 'base_Variance']
        self.measurement.plugins['base_PixelFlags'].masksFpAnywhere = ['CLIPPED', 'SENSOR_EDGE',
                                                                       'INEXACT_PSF']
        self.measurement.plugins['base_PixelFlags'].masksFpCenter = ['CLIPPED', 'SENSOR_EDGE',
                                                                     'INEXACT_PSF']

    def validate(self):
        super().validate()
        # The Gen2 (refObjLoader.ref_dataset_name) and Gen3 (connections.refCat)
        # reference catalog names must agree until Gen2 middleware is retired.
        refCatGen2 = getattr(self.refObjLoader, "ref_dataset_name", None)
        if refCatGen2 is not None and refCatGen2 != self.connections.refCat:
            raise ValueError(
                f"Gen2 ({refCatGen2}) and Gen3 ({self.connections.refCat}) reference catalogs "
                f"are different. These options must be kept in sync until Gen2 is retired."
            )
750 
751 
752 
758 
759 
class MeasureMergedCoaddSourcesRunner(ButlerInitializedTaskRunner):
    """Task runner that forwards the --psfCache command-line setting to
    MeasureMergedCoaddSourcesTask.
    """

    @staticmethod
    def getTargetList(parsedCmd, **kwargs):
        # Delegate to the parent runner, injecting psfCache from the parsed
        # command line.  Other keyword arguments are deliberately not
        # forwarded, matching the established behaviour of this runner.
        psfCache = parsedCmd.psfCache
        return ButlerInitializedTaskRunner.getTargetList(parsedCmd, psfCache=psfCache)
765 
766 
class MeasureMergedCoaddSourcesTask(PipelineTask, CmdLineTask):
    r"""!
    @anchor MeasureMergedCoaddSourcesTask_

    @brief Deblend sources from master catalog in each coadd separately and measure.

    @section pipe_tasks_multiBand_Contents Contents

    - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose
    - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize
    - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run
    - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config
    - @ref pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug
    - @ref pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Purpose Description

    Command-line task that uses peaks and footprints from a master catalog to perform deblending and
    measurement in each coadd.

    Given a master input catalog of sources (peaks and footprints) or deblender outputs
    (including a HeavyFootprint in each band), measure each source on the
    coadd. Repeating this procedure with the same master catalog across multiple coadds will generate a
    consistent set of child sources.

    The deblender retains all peaks and deblends any missing peaks (dropouts in that band) as PSFs. Source
    properties are measured and the @c is-primary flag (indicating sources with no children) is set. Visit
    flags are propagated to the coadd sources.

    Optionally, we can match the coadd sources to an external reference catalog.

    @par Inputs:
    deepCoadd_mergeDet{tract,patch} or deepCoadd_deblend{tract,patch}: SourceCatalog
    @n deepCoadd_calexp{tract,patch,filter}: ExposureF
    @par Outputs:
    deepCoadd_meas{tract,patch,filter}: SourceCatalog
    @par Data Unit:
    tract, patch, filter

    MeasureMergedCoaddSourcesTask delegates most of its work to a set of sub-tasks:

    <DL>
    <DT> @ref SingleFrameMeasurementTask_ "measurement"
    <DD> Measure source properties of deblended sources.</DD>
    <DT> @ref SetPrimaryFlagsTask_ "setPrimaryFlags"
    <DD> Set flag 'is-primary' as well as related flags on sources. 'is-primary' is set for sources that are
    not at the edge of the field and that have either not been deblended or are the children of deblended
    sources</DD>
    <DT> @ref PropagateVisitFlagsTask_ "propagateFlags"
    <DD> Propagate flags set in individual visits to the coadd.</DD>
    <DT> @ref DirectMatchTask_ "match"
    <DD> Match input sources to a reference catalog (optional).
    </DD>
    </DL>
    These subtasks may be retargeted as required.

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Initialize Task initialization

    @copydoc \_\_init\_\_

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Run Invoking the Task

    @copydoc run

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Config Configuration parameters

    See @ref MeasureMergedCoaddSourcesConfig_

    @section pipe_tasks_multiBand_MeasureMergedCoaddSourcesTask_Debug Debug variables

    The @link lsst.pipe.base.cmdLineTask.CmdLineTask command line task@endlink interface supports a
    flag @c -d to import @b debug.py from your @c PYTHONPATH; see @ref baseDebug for more about @b debug.py
    files.

    MeasureMergedCoaddSourcesTask has no debug variables of its own because it delegates all the work to
    the various sub-tasks. See the documentation for individual sub-tasks for more information.

    @section pipe_tasks_multiband_MeasureMergedCoaddSourcesTask_Example A complete example of using
    MeasureMergedCoaddSourcesTask

    After MeasureMergedCoaddSourcesTask has been run on multiple coadds, we have a set of per-band catalogs.
    The next stage in the multi-band processing procedure will merge these measurements into a suitable
    catalog for driving forced photometry.

    Command-line usage of MeasureMergedCoaddSourcesTask expects a data reference to the coadds
    to be processed.
    A list of the available optional arguments can be obtained by calling measureCoaddSources.py with the
    `--help` command line argument:
    @code
    measureCoaddSources.py --help
    @endcode

    To demonstrate usage of the MeasureMergedCoaddSourcesTask in the larger context of multi-band
    processing, we will process HSC data in the [ci_hsc](https://github.com/lsst/ci_hsc) package. Assuming
    one has finished step 6 at @ref pipeTasks_multiBand, one may perform deblending and measure sources in
    the HSC-I band coadd as follows:
    @code
    measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-I
    @endcode
    This will process the HSC-I band data. The results are written in
    `$CI_HSC_DIR/DATA/deepCoadd-results/HSC-I/0/5,4/meas-HSC-I-0-5,4.fits`.

    It is also necessary to run
    @code
    measureCoaddSources.py $CI_HSC_DIR/DATA --id patch=5,4 tract=0 filter=HSC-R
    @endcode
    to generate the sources catalogs for the HSC-R band required by the next step in the multi-band
    procedure: @ref MergeMeasurementsTask_ "MergeMeasurementsTask".
    """
    _DefaultName = "measureCoaddSources"
    ConfigClass = MeasureMergedCoaddSourcesConfig
    RunnerClass = MeasureMergedCoaddSourcesRunner
    # Schema catalogs for the "<coaddName>Coadd_meas" output dataset.
    getSchemaCatalogs = _makeGetSchemaCatalogs("meas")
    makeIdFactory = _makeMakeIdFactory("MergedCoaddId")  # The IDs we already have are of this type
881 
882  @classmethod
883  def _makeArgumentParser(cls):
884  parser = ArgumentParser(name=cls._DefaultName)
885  parser.add_id_argument("--id", "deepCoadd_calexp",
886  help="data ID, e.g. --id tract=12345 patch=1,2 filter=r",
887  ContainerClass=ExistingCoaddDataIdContainer)
888  parser.add_argument("--psfCache", type=int, default=100, help="Size of CoaddPsf cache")
889  return parser
890 
    def __init__(self, butler=None, schema=None, peakSchema=None, refObjLoader=None, initInputs=None,
                 **kwargs):
        """!
        @brief Initialize the task.

        Keyword arguments (in addition to those forwarded to CmdLineTask.__init__):
        @param[in] schema: the schema of the merged detection catalog used as input to this one
        @param[in] peakSchema: the schema of the PeakRecords in the Footprints in the merged detection catalog
        @param[in] refObjLoader: an instance of LoadReferenceObjectsTasks that supplies an external reference
            catalog. May be None if the loader can be constructed from the butler argument or all steps
            requiring a reference catalog are disabled.
        @param[in] butler: a butler used to read the input schemas from disk or construct the reference
            catalog loader, if schema or peakSchema or refObjLoader is None
        @param[in] initInputs: Gen3 init-input mapping; if provided, its 'inputSchema' entry
            overrides the schema argument

        The task will set its own self.schema attribute to the schema of the output measurement catalog.
        This will include all fields from the input schema, as well as additional fields for all the
        measurements.
        """
        super().__init__(**kwargs)
        # "deblended*" input catalogs carry deblender outputs (see class docs).
        self.deblended = self.config.inputCatalog.startswith("deblended")
        self.inputCatalog = "Coadd_" + self.config.inputCatalog
        # Gen3: the input schema arrives via initInputs rather than the butler.
        if initInputs is not None:
            schema = initInputs['inputSchema'].schema
        if schema is None:
            assert butler is not None, "Neither butler nor schema is defined"
            schema = butler.get(self.config.coaddName + self.inputCatalog + "_schema", immediate=True).schema
        # Map the input schema into the output schema; measurement fields are
        # appended to self.schema as each subtask is constructed below.
        self.schemaMapper = afwTable.SchemaMapper(schema)
        self.schemaMapper.addMinimalSchema(schema)
        self.schema = self.schemaMapper.getOutputSchema()
        self.algMetadata = PropertyList()
        self.makeSubtask("measurement", schema=self.schema, algMetadata=self.algMetadata)
        self.makeSubtask("setPrimaryFlags", schema=self.schema)
        if self.config.doMatchSources:
            self.makeSubtask("match", butler=butler, refObjLoader=refObjLoader)
        if self.config.doPropagateFlags:
            self.makeSubtask("propagateFlags", schema=self.schema)
        # Enforce the configured strictness for Astropy unit parsing.
        self.schema.checkUnits(parse_strict=self.config.checkUnitsParseStrict)
        if self.config.doApCorr:
            self.makeSubtask("applyApCorr", schema=self.schema)
        if self.config.doRunCatalogCalculation:
            self.makeSubtask("catalogCalculation", schema=self.schema)

        # Empty catalog carrying the final output schema (Gen3 init output).
        self.outputSchema = afwTable.SourceCatalog(self.schema)
934 
    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        """Gen3 middleware entry point: fetch the quantum's inputs, adapt them
        to the ``run`` signature, invoke ``run``, and persist the outputs.
        """
        inputs = butlerQC.get(inputRefs)

        # Build a reference-object loader over the reference-catalog shards
        # that overlap this quantum.
        refObjLoader = ReferenceObjectLoader([ref.datasetRef.dataId for ref in inputRefs.refCat],
                                             inputs.pop('refCat'), config=self.config.refObjLoader,
                                             log=self.log)
        # NOTE(review): self.match is only created when config.doMatchSources
        # is True (see __init__); this call assumes matching is enabled in
        # Gen3 execution -- confirm before disabling doMatchSources.
        self.match.setRefObjLoader(refObjLoader)

        # Set psfcache
        # move this to run after gen2 deprecation
        inputs['exposure'].getPsf().setCacheCapacity(self.config.psfCache)

        # Get unique integer ID for IdFactory and RNG seeds
        packedId, maxBits = butlerQC.quantum.dataId.pack("tract_patch", returnMaxBits=True)
        inputs['exposureId'] = packedId
        idFactory = afwTable.IdFactory.makeSource(packedId, 64 - maxBits)
        # Transform inputCatalog: copy the merged detections into a catalog
        # using the output schema (via self.schemaMapper) and the new IdFactory.
        table = afwTable.SourceTable.make(self.schema, idFactory)
        sources = afwTable.SourceCatalog(table)
        sources.extend(inputs.pop('inputCatalog'), self.schemaMapper)
        table = sources.getTable()
        table.setMetadata(self.algMetadata)  # Capture algorithm metadata to write out to the source catalog.
        inputs['sources'] = sources

        # Reconstruct the skyInfo struct that run() expects from the skymap
        # and the (tract, patch) of the input catalog.
        skyMap = inputs.pop('skyMap')
        tractNumber = inputRefs.inputCatalog.dataId['tract']
        tractInfo = skyMap[tractNumber]
        patchInfo = tractInfo.getPatchInfo(inputRefs.inputCatalog.dataId['patch'])
        skyInfo = Struct(
            skyMap=skyMap,
            tractInfo=tractInfo,
            patchInfo=patchInfo,
            wcs=tractInfo.getWcs(),
            bbox=patchInfo.getOuterBBox()
        )
        inputs['skyInfo'] = skyInfo

        if self.config.doPropagateFlags:
            # Filter out any visit catalog that is not coadd inputs
            ccdInputs = inputs['exposure'].getInfo().getCoaddInputs().ccds
            visitKey = ccdInputs.schema.find("visit").key
            ccdKey = ccdInputs.schema.find("ccd").key
            inputVisitIds = set()
            ccdRecordsWcs = {}
            for ccdRecord in ccdInputs:
                visit = ccdRecord.get(visitKey)
                ccd = ccdRecord.get(ccdKey)
                inputVisitIds.add((visit, ccd))
                ccdRecordsWcs[(visit, ccd)] = ccdRecord.getWcs()

            # Keep only the visit catalogs whose (visit, detector) actually
            # contributed to this coadd, with their matching WCS objects.
            inputCatalogsToKeep = []
            inputCatalogWcsUpdate = []
            for i, dataRef in enumerate(inputRefs.visitCatalogs):
                key = (dataRef.dataId['visit'], dataRef.dataId['detector'])
                if key in inputVisitIds:
                    inputCatalogsToKeep.append(inputs['visitCatalogs'][i])
                    inputCatalogWcsUpdate.append(ccdRecordsWcs[key])
            inputs['visitCatalogs'] = inputCatalogsToKeep
            inputs['wcsUpdates'] = inputCatalogWcsUpdate
            inputs['ccdInputs'] = ccdInputs

        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)
998 
    def runDataRef(self, patchRef, psfCache=100):
        """!
        @brief Deblend and measure.

        @param[in] patchRef: Patch reference.
        @param[in] psfCache: size of the CoaddPsf cache set on the exposure's PSF

        Set 'is-primary' and related flags. Propagate flags
        from individual visits. Optionally match the sources to a reference catalog and write the matches.
        Finally, write the deblended sources and measurements out.
        """
        # When fakes have been inserted, read the coadd that contains them.
        if self.config.hasFakes:
            coaddType = "fakes_" + self.config.coaddName
        else:
            coaddType = self.config.coaddName
        exposure = patchRef.get(coaddType + "Coadd_calexp", immediate=True)
        exposure.getPsf().setCacheCapacity(psfCache)
        sources = self.readSources(patchRef)
        table = sources.getTable()
        table.setMetadata(self.algMetadata)  # Capture algorithm metadata to write out to the source catalog.
        skyInfo = getSkyInfo(coaddName=self.config.coaddName, patchRef=patchRef)

        if self.config.doPropagateFlags:
            ccdInputs = self.propagateFlags.getCcdInputs(exposure)
        else:
            ccdInputs = None

        results = self.run(exposure=exposure, sources=sources,
                           ccdInputs=ccdInputs,
                           skyInfo=skyInfo, butler=patchRef.getButler(),
                           exposureId=self.getExposureId(patchRef))

        if self.config.doMatchSources:
            self.writeMatches(patchRef, results)
        self.write(patchRef, results.outputSources)
1033 
1034  def run(self, exposure, sources, skyInfo, exposureId, ccdInputs=None, visitCatalogs=None, wcsUpdates=None,
1035  butler=None):
1036  """Run measurement algorithms on the input exposure, and optionally populate the
1037  resulting catalog with extra information.
1038 
1039  Parameters
1040  ----------
1041  exposure : `lsst.afw.exposure.Exposure`
1042  The input exposure on which measurements are to be performed
1043  sources : `lsst.afw.table.SourceCatalog`
1044  A catalog built from the results of merged detections, or
1045  deblender outputs.
1046  skyInfo : `lsst.pipe.base.Struct`
1047  A struct containing information about the position of the input exposure within
1048  a `SkyMap`, the `SkyMap`, its `Wcs`, and its bounding box
1049  exposureId : `int` or `bytes`
1050  packed unique number or bytes unique to the input exposure
1051  ccdInputs : `lsst.afw.table.ExposureCatalog`
1052  Catalog containing information on the individual visits which went into making
1053  the exposure
1054  visitCatalogs : list of `lsst.afw.table.SourceCatalogs` or `None`
1055  A list of source catalogs corresponding to measurements made on the individual
1056  visits which went into the input exposure. If None and butler is `None` then
1057  the task cannot propagate visit flags to the output catalog.
1058  wcsUpdates : list of `lsst.afw.geom.SkyWcs` or `None`
1059  If visitCatalogs is not `None` this should be a list of wcs objects which correspond
1060  to the input visits. Used to put all coordinates to common system. If `None` and
1061  butler is `None` then the task cannot propagate visit flags to the output catalog.
1062  butler : `lsst.daf.butler.Butler` or `lsst.daf.persistence.Butler`
1063  Either a gen2 or gen3 butler used to load visit catalogs
1064 
1065  Returns
1066  -------
1067  results : `lsst.pipe.base.Struct`
1068  Results of running measurement task. Will contain the catalog in the
1069  sources attribute. Optionally will have results of matching to a
1070  reference catalog in the matchResults attribute, and denormalized
1071  matches in the denormMatches attribute.
1072  """
1073  self.measurement.run(sources, exposure, exposureId=exposureId)
1074 
1075  if self.config.doApCorr:
1076  self.applyApCorr.run(
1077  catalog=sources,
1078  apCorrMap=exposure.getInfo().getApCorrMap()
1079  )
1080 
1081  # TODO DM-11568: this contiguous check-and-copy could go away if we
1082  # reserve enough space during SourceDetection and/or SourceDeblend.
1083  # NOTE: sourceSelectors require contiguous catalogs, so ensure
1084  # contiguity now, so views are preserved from here on.
1085  if not sources.isContiguous():
1086  sources = sources.copy(deep=True)
1087 
1088  if self.config.doRunCatalogCalculation:
1089  self.catalogCalculation.run(sources)
1090 
1091  self.setPrimaryFlags.run(sources, skyInfo.skyMap, skyInfo.tractInfo, skyInfo.patchInfo,
1092  includeDeblend=self.deblended)
1093  if self.config.doPropagateFlags:
1094  self.propagateFlags.run(butler, sources, ccdInputs, exposure.getWcs(), visitCatalogs, wcsUpdates)
1095 
1096  results = Struct()
1097 
1098  if self.config.doMatchSources:
1099  matchResult = self.match.run(sources, exposure.getInfo().getFilter().getName())
1100  matches = afwTable.packMatches(matchResult.matches)
1101  matches.table.setMetadata(matchResult.matchMeta)
1102  results.matchResult = matches
1103  if self.config.doWriteMatchesDenormalized:
1104  if matchResult.matches:
1105  denormMatches = denormalizeMatches(matchResult.matches, matchResult.matchMeta)
1106  else:
1107  self.log.warn("No matches, so generating dummy denormalized matches file")
1108  denormMatches = afwTable.BaseCatalog(afwTable.Schema())
1109  denormMatches.setMetadata(PropertyList())
1110  denormMatches.getMetadata().add("COMMENT",
1111  "This catalog is empty because no matches were found.")
1112  results.denormMatches = denormMatches
1113  results.denormMatches = denormMatches
1114 
1115  results.outputSources = sources
1116  return results
1117 
1118  def readSources(self, dataRef):
1119  """!
1120  @brief Read input sources.
1121 
1122  @param[in] dataRef: Data reference for catalog of merged detections
1123  @return List of sources in merged catalog
1124 
1125  We also need to add columns to hold the measurements we're about to make
1126  so we can measure in-place.
1127  """
1128  merged = dataRef.get(self.config.coaddName + self.inputCatalog, immediate=True)
1129  self.log.info("Read %d detections: %s" % (len(merged), dataRef.dataId))
1130  idFactory = self.makeIdFactory(dataRef)
1131  for s in merged:
1132  idFactory.notify(s.getId())
1133  table = afwTable.SourceTable.make(self.schema, idFactory)
1134  sources = afwTable.SourceCatalog(table)
1135  sources.extend(merged, self.schemaMapper)
1136  return sources
1137 
1138  def writeMatches(self, dataRef, results):
1139  """!
1140  @brief Write matches of the sources to the astrometric reference catalog.
1141 
1142  @param[in] dataRef: data reference
1143  @param[in] results: results struct from run method
1144  """
1145  if hasattr(results, "matchResult"):
1146  dataRef.put(results.matchResult, self.config.coaddName + "Coadd_measMatch")
1147  if hasattr(results, "denormMatches"):
1148  dataRef.put(results.denormMatches, self.config.coaddName + "Coadd_measMatchFull")
1149 
1150  def write(self, dataRef, sources):
1151  """!
1152  @brief Write the source catalog.
1153 
1154  @param[in] dataRef: data reference
1155  @param[in] sources: source catalog
1156  """
1157  dataRef.put(sources, self.config.coaddName + "Coadd_meas")
1158  self.log.info("Wrote %d sources: %s" % (len(sources), dataRef.dataId))
1159 
1160  def getExposureId(self, dataRef):
1161  return int(dataRef.get(self.config.coaddName + "CoaddId"))
1162 
def write(self, patchRef, catalog)
Write the output.
Defines the fields and offsets for a table.
Definition: Schema.h:50
Class for storing ordered metadata with comments.
Definition: PropertyList.h:68
A mapping between the keys of two Schemas, used to copy data between them.
Definition: SchemaMapper.h:21
def denormalizeMatches(matches, matchMeta=None)
Fit spatial kernel using approximate fluxes for candidates, and solving a linear system of equations...
daf::base::PropertySet * set
Definition: fits.cc:902
template BaseCatalog packMatches(SourceMatchVector const &)
def __init__(self, minimum, dataRange, Q)
def run(self, skyInfo, tempExpRefList, imageScalerList, weightList, altMaskList=None, mask=None, supplementaryData=None)
def getSkyInfo(coaddName, patchRef)
Return the SkyMap, tract and patch information, wcs, and outer bbox of the patch to be coadded...
Definition: coaddBase.py:231
Backwards-compatibility support for depersisting the old Calib (FluxMag0/FluxMag0Err) objects...
def writeMetadata(self, dataRefList)
No metadata to write, and not sure how to write it for a list of dataRefs.
daf::base::PropertyList * list
Definition: fits.cc:903