26 "GroupExposuresConfig",
28 "VisitDefinitionData",
31 from abc
import ABCMeta, abstractmethod
32 from collections
import defaultdict
34 from typing
import Any, Dict, Iterable, List, Optional, Tuple
35 from multiprocessing
import Pool
37 from lsst.daf.butler
import (
46 from lsst.pex.config
import Config, Field, makeRegistry, registerConfigurable
49 from lsst.sphgeom import ConvexPolygon, Region, UnitVector3d
50 from ._instrument
import loadCamera
53 @dataclasses.dataclass
55 """Struct representing a group of exposures that will be used to define a
60 """Name of the instrument this visit will be associated with.
64 """Integer ID of the visit.
66 This must be unique across all visit systems for the instrument.
70 """String name for the visit.
72 This must be unique across all visit systems for the instrument.
75 exposures: List[DimensionRecord] = dataclasses.field(default_factory=list)
76 """Dimension records for the exposures that are part of this visit.
80 @dataclasses.dataclass
82 """Struct containing the dimension records associated with a visit.
85 visit: DimensionRecord
86 """Record for the 'visit' dimension itself.
89 visit_definition: List[DimensionRecord]
90 """Records for 'visit_definition', which relates 'visit' to 'exposure'.
93 visit_detector_region: List[DimensionRecord]
94 """Records for 'visit_detector_region', which associates the combination
95 of a 'visit' and a 'detector' with a region on the sky.
103 class GroupExposuresTask(Task, metaclass=ABCMeta):
104 """Abstract base class for the subtask of `DefineVisitsTask` that is
105 responsible for grouping exposures into visits.
107 Subclasses should be registered with `GroupExposuresTask.registry` to
108 enable use by `DefineVisitsTask`, and should generally correspond to a
109 particular 'visit_system' dimension value. They are also responsible for
110 defining visit IDs and names that are unique across all visit systems in
111 use by an instrument.
115 config : `GroupExposuresConfig`
116 Configuration information.
118 Additional keyword arguments forwarded to the `Task` constructor.
120 def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
121 Task.__init__(self, config=config, **kwargs)
123 ConfigClass = GroupExposuresConfig
125 _DefaultName =
"groupExposures"
128 doc=
"Registry of algorithms for grouping exposures into visits.",
129 configBaseType=GroupExposuresConfig,
133 def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
134 """Group the given exposures into visits.
138 exposures : `list` [ `DimensionRecord` ]
139 DimensionRecords (for the 'exposure' dimension) describing the
144 visits : `Iterable` [ `VisitDefinitionData` ]
145 Structs identifying the visits and the exposures associated with
146 them. This may be an iterator or a container.
148 raise NotImplementedError()
152 """Return identifiers for the 'visit_system' dimension this
153 algorithm implements.
158 Integer ID for the visit system (given an instrument).
160 Unique string identifier for the visit system (given an
163 raise NotImplementedError()
170 doc=(
"Pad raw image bounding boxes with specified number of pixels "
171 "when calculating their (conservatively large) region on the "
177 """Abstract base class for the subtask of `DefineVisitsTask` that is
178 responsible for extracting spatial regions for visits and visit+detector
181 Subclasses should be registered with `ComputeVisitRegionsTask.registry` to
182 enable use by `DefineVisitsTask`.
186 config : `ComputeVisitRegionsConfig`
187 Configuration information.
189 Additional keyword arguments forwarded to the `Task` constructor.
191 def __init__(self, config: ComputeVisitRegionsConfig, *, butler: Butler, **kwargs: Any):
192 Task.__init__(self, config=config, **kwargs)
195 ConfigClass = ComputeVisitRegionsConfig
197 _DefaultName =
"computeVisitRegions"
200 doc=(
"Registry of algorithms for computing on-sky regions for visits "
201 "and visit+detector combinations."),
202 configBaseType=ComputeVisitRegionsConfig,
206 def compute(self, visit: VisitDefinitionData, *, collections: Any =
None
207 ) -> Tuple[Region, Dict[int, Region]]:
208 """Compute regions for the given visit and all detectors in that visit.
212 visit : `VisitDefinitionData`
213 Struct describing the visit and the exposures associated with it.
214 collections : Any, optional
215 Collections to be searched for raws and camera geometry, overriding
216 ``self.butler.collections``.
217 Can be any of the types supported by the ``collections`` argument
218 to butler construction.
222 visitRegion : `lsst.sphgeom.Region`
223 Region for the full visit.
224 visitDetectorRegions : `dict` [ `int`, `lsst.sphgeom.Region` ]
225 Dictionary mapping detector ID to the region for that detector.
226 Should include all detectors in the visit.
228 raise NotImplementedError()
232 groupExposures = GroupExposuresTask.registry.makeField(
233 doc=
"Algorithm for grouping exposures into visits.",
234 default=
"one-to-one",
236 computeVisitRegions = ComputeVisitRegionsTask.registry.makeField(
237 doc=
"Algorithm from computing visit and visit+detector regions.",
238 default=
"single-raw-wcs",
240 ignoreNonScienceExposures = Field(
241 doc=(
"If True, silently ignore input exposures that do not have "
242 "observation_type=SCIENCE. If False, raise an exception if one "
251 """Driver Task for defining visits (and their spatial regions) in Gen3
256 config : `DefineVisitsConfig`
257 Configuration for the task.
258 butler : `~lsst.daf.butler.Butler`
259 Writeable butler instance. Will be used to read `raw.wcs` and `camera`
260 datasets and insert/sync dimension data.
262 Additional keyword arguments are forwarded to the `lsst.pipe.base.Task`
267 Each instance of `DefineVisitsTask` reads from / writes to the same Butler.
268 Each invocation of `DefineVisitsTask.run` processes an independent group of
269 exposures into one or more new vists, all belonging to the same visit
270 system and instrument.
272 The actual work of grouping exposures and computing regions is delegated
273 to pluggable subtasks (`GroupExposuresTask` and `ComputeVisitRegionsTask`),
274 respectively. The defaults are to create one visit for every exposure,
275 and to use exactly one (arbitrary) detector-level raw dataset's WCS along
276 with camera geometry to compute regions for all detectors. Other
277 implementations can be created and configured for instruments for which
278 these choices are unsuitable (e.g. because visits and exposures are not
279 one-to-one, or because ``raw.wcs`` datasets for different detectors may not
280 be consistent with camera geomery).
282 It is not necessary in general to ingest all raws for an exposure before
283 defining a visit that includes the exposure; this depends entirely on the
284 `ComputeVisitRegionTask` subclass used. For the default configuration,
285 a single raw for each exposure is sufficient.
287 def __init__(self, config: Optional[DefineVisitsConfig] =
None, *, butler: Butler, **kwargs: Any):
295 ConfigClass = DefineVisitsConfig
297 _DefaultName =
"defineVisits"
299 def _buildVisitRecords(self, definition: VisitDefinitionData, *,
300 collections: Any =
None) -> _VisitRecords:
301 """Build the DimensionRecords associated with a visit.
305 definition : `VisitDefinition`
306 Struct with identifiers for the visit and records for its
307 constituent exposures.
308 collections : Any, optional
309 Collections to be searched for raws and camera geometry, overriding
310 ``self.butler.collections``.
311 Can be any of the types supported by the ``collections`` argument
312 to butler construction.
316 records : `_VisitRecords`
317 Struct containing DimensionRecords for the visit, including
318 associated dimension elements.
321 visitRegion, visitDetectorRegions = self.computeVisitRegions.compute(definition,
322 collections=collections)
325 begin=_reduceOrNone(min, (e.timespan.begin
for e
in definition.exposures)),
326 end=_reduceOrNone(max, (e.timespan.end
for e
in definition.exposures)),
328 exposure_time = _reduceOrNone(sum, (e.exposure_time
for e
in definition.exposures))
329 physical_filter = _reduceOrNone(
lambda a, b: a
if a == b
else None,
330 (e.physical_filter
for e
in definition.exposures))
333 visit=self.
universe[
"visit"].RecordClass.fromDict({
334 "instrument": definition.instrument,
336 "name": definition.name,
337 "physical_filter": physical_filter,
338 "visit_system": self.groupExposures.getVisitSystem()[0],
339 "exposure_time": exposure_time,
340 TIMESPAN_FIELD_SPECS.begin.name: timespan.begin,
341 TIMESPAN_FIELD_SPECS.end.name: timespan.end,
342 "region": visitRegion,
348 self.
universe[
"visit_definition"].RecordClass.fromDict({
349 "instrument": definition.instrument,
350 "visit": definition.id,
351 "exposure": exposure.id,
352 "visit_system": self.groupExposures.getVisitSystem()[0],
354 for exposure
in definition.exposures
356 visit_detector_region=[
357 self.
universe[
"visit_detector_region"].RecordClass.fromDict({
358 "instrument": definition.instrument,
359 "visit": definition.id,
360 "detector": detectorId,
361 "region": detectorRegion,
363 for detectorId, detectorRegion
in visitDetectorRegions.items()
367 def run(self, dataIds: Iterable[DataId], *,
368 pool: Optional[Pool] =
None,
370 collections: Optional[str] =
None):
371 """Add visit definitions to the registry for the given exposures.
375 dataIds : `Iterable` [ `dict` or `DataCoordinate` ]
376 Exposure-level data IDs. These must all correspond to the same
377 instrument, and are expected to be on-sky science exposures.
378 pool : `multiprocessing.Pool`, optional
379 If not `None`, a process pool with which to parallelize some
381 processes : `int`, optional
382 The number of processes to use. Ignored if ``pool`` is not `None`.
383 collections : Any, optional
384 Collections to be searched for raws and camera geometry, overriding
385 ``self.butler.collections``.
386 Can be any of the types supported by the ``collections`` argument
387 to butler construction.
390 if pool
is None and processes > 1:
391 pool = Pool(processes)
392 mapFunc = map
if pool
is None else pool.imap_unordered
394 self.
log.
info(
"Preprocessing data IDs.")
395 dimensions = DimensionGraph(self.
universe, names=[
"exposure"])
396 dataIds =
set(mapFunc(
lambda d: self.
butler.registry.expandDataId(d, graph=dimensions), dataIds))
398 raise RuntimeError(
"No exposures given.")
403 for dataId
in dataIds:
404 record = dataId.records[
"exposure"]
405 if record.observation_type !=
"science":
406 if self.
config.ignoreNonScienceExposures:
409 raise RuntimeError(f
"Input exposure {dataId} has observation_type "
410 f
"{record.observation_type}, not 'science'.")
411 instruments.add(dataId[
"instrument"])
412 exposures.append(record)
414 self.
log.
info(
"No science exposures found after filtering.")
416 if len(instruments) > 1:
418 f
"All data IDs passed to DefineVisitsTask.run must be "
419 f
"from the same instrument; got {instruments}."
421 instrument, = instruments
424 visitSystemId, visitSystemName = self.groupExposures.getVisitSystem()
425 self.
log.
info(
"Registering visit_system %d: %s.", visitSystemId, visitSystemName)
426 self.
butler.registry.syncDimensionData(
428 {
"instrument": instrument,
"id": visitSystemId,
"name": visitSystemName}
431 self.
log.
info(
"Grouping %d exposure(s) into visits.", len(exposures))
432 definitions =
list(self.groupExposures.
group(exposures))
436 self.
log.
info(
"Computing regions and other metadata for %d visit(s).", len(definitions))
437 allRecords = mapFunc(
lambda d: self.
_buildVisitRecords(d, collections=collections), definitions)
440 for visitRecords
in allRecords:
441 with self.
butler.registry.transaction():
442 self.
butler.registry.insertDimensionData(
"visit", visitRecords.visit)
443 self.
butler.registry.insertDimensionData(
"visit_definition",
444 *visitRecords.visit_definition)
445 self.
butler.registry.insertDimensionData(
"visit_detector_region",
446 *visitRecords.visit_detector_region)
449 def _reduceOrNone(func, iterable):
450 """Apply a binary function to pairs of elements in an iterable until a
451 single value is returned, but return `None` if any element is `None` or
452 there are no elements.
466 visitSystemId = Field(
467 doc=(
"Integer ID of the visit_system implemented by this grouping "
472 visitSystemName = Field(
473 doc=(
"String name of the visit_system implemented by this grouping "
476 default=
"one-to-one",
482 """An exposure grouping algorithm that simply defines one visit for each
483 exposure, reusing the exposures identifiers for the visit.
486 ConfigClass = _GroupExposuresOneToOneConfig
488 def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
490 for exposure
in exposures:
492 instrument=exposure.instrument,
495 exposures=[exposure],
500 return (self.
config.visitSystemId, self.
config.visitSystemName)
504 visitSystemId = Field(
505 doc=(
"Integer ID of the visit_system implemented by this grouping "
510 visitSystemName = Field(
511 doc=(
"String name of the visit_system implemented by this grouping "
514 default=
"by-group-metadata",
520 """An exposure grouping algorithm that uses exposure.group_name and
523 This algorithm _assumes_ exposure.group_id (generally populated from
524 `astro_metadata_translator.ObservationInfo.visit_id`) is not just unique,
525 but disjoint from all `ObservationInfo.exposure_id` values - if it isn't,
526 it will be impossible to ever use both this grouping algorithm and the
527 one-to-one algorithm for a particular camera in the same data repository.
530 ConfigClass = _GroupExposuresOneToOneConfig
532 def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
534 groups = defaultdict(list)
535 for exposure
in exposures:
536 groups[exposure.group_name].
append(exposure)
537 for visitName, exposuresInGroup
in groups.items():
538 instrument = exposuresInGroup[0].instrument
539 visitId = exposuresInGroup[0].group_id
540 assert all(e.group_id == visitId
for e
in exposuresInGroup), \
541 "Grouping by exposure.group_name does not yield consistent group IDs"
543 exposures=exposuresInGroup)
547 return (self.
config.visitSystemId, self.
config.visitSystemName)
551 mergeExposures = Field(
552 doc=(
"If True, merge per-detector regions over all exposures in a "
553 "visit (via convex hull) instead of using the first exposure and "
554 "assuming its regions are valid for all others."),
559 doc=(
"Load the WCS for the detector with this ID. If None, use an "
560 "arbitrary detector (the first found in a query of the data "
561 "repository for each exposure (or all exposures, if "
562 "mergeExposures is True)."),
567 requireVersionedCamera = Field(
568 doc=(
"If True, raise LookupError if version camera geometry cannot be "
569 "loaded for an exposure. If False, use the nominal camera from "
570 "the Instrument class instead."),
579 """A visit region calculator that uses a single raw WCS and a camera to
580 project the bounding boxes of all detectors onto the sky, relating
581 different detectors by their positions in focal plane coordinates.
585 Most instruments should have their raw WCSs determined from a combination
586 of boresight angle, rotator angle, and camera geometry, and hence this
587 algorithm should produce stable results regardless of which detector the
588 raw corresponds to. If this is not the case (e.g. because a per-file FITS
589 WCS is used instead), either the ID of the detector should be fixed (see
590 the ``detectorId`` config parameter) or a different algorithm used.
593 ConfigClass = _ComputeVisitRegionsFromSingleRawWcsConfig
596 ) -> Dict[int, List[UnitVector3d]]:
597 """Compute the lists of unit vectors on the sphere that correspond to
598 the sky positions of detector corners.
602 exposure : `DimensionRecord`
603 Dimension record for the exposure.
604 collections : Any, optional
605 Collections to be searched for raws and camera geometry, overriding
606 ``self.butler.collections``.
607 Can be any of the types supported by the ``collections`` argument
608 to butler construction.
613 Dictionary mapping detector ID to a list of unit vectors on the
614 sphere representing that detector's corners projected onto the sky.
616 if collections
is None:
617 collections = self.
butler.collections
618 camera, versioned =
loadCamera(self.
butler, exposure.dataId, collections=collections)
619 if not versioned
and self.
config.requireVersionedCamera:
620 raise LookupError(f
"No versioned camera found for exposure {exposure.dataId}.")
621 if self.
config.detectorId
is None:
622 wcsRefs =
list(self.
butler.registry.queryDatasets(
"raw.wcs", dataId=exposure.dataId,
623 collections=collections))
625 raise LookupError(f
"No raw.wcs datasets found for data ID {exposure.dataId} "
626 f
"in collections {collections}.")
627 wcsDetector = camera[wcsRefs[0].dataId[
"detector"]]
628 wcs = self.
butler.getDirect(wcsRefs[0])
630 wcsDetector = camera[self.
config.detectorId]
631 wcs = self.
butler.get(
"raw.wcs", dataId=exposure.dataId, detector=self.
config.detectorId,
632 collections=collections)
633 fpToSky = wcsDetector.getTransform(FOCAL_PLANE, PIXELS).
then(wcs.getTransform())
635 for detector
in camera:
636 pixelsToSky = detector.getTransform(PIXELS, FOCAL_PLANE).
then(fpToSky)
637 pixCorners =
Box2D(detector.getBBox().dilatedBy(self.
config.padding)).getCorners()
638 bounds[detector.getId()] = [
639 skyCorner.getVector()
for skyCorner
in pixelsToSky.applyForward(pixCorners)
643 def compute(self, visit: VisitDefinitionData, *, collections: Any =
None
644 ) -> Tuple[Region, Dict[int, Region]]:
646 if self.
config.mergeExposures:
647 detectorBounds = defaultdict(list)
648 for exposure
in visit.exposures:
650 for detectorId, bounds
in exposureDetectorBounds.items():
651 detectorBounds[detectorId].extend(bounds)
656 for detectorId, bounds
in detectorBounds.items():
657 detectorRegions[detectorId] = ConvexPolygon.convexHull(bounds)
658 visitBounds.extend(bounds)
659 return ConvexPolygon.convexHull(visitBounds), detectorRegions