26     "GroupExposuresConfig",
 
   28     "VisitDefinitionData",
 
   31 from abc 
import ABCMeta, abstractmethod
 
   32 from collections 
import defaultdict
 
   34 from typing 
import Any, Dict, Iterable, List, Optional, Tuple
 
   35 from multiprocessing 
import Pool
 
   37 from lsst.daf.butler 
import (
 
   46 from lsst.pex.config 
import Config, Field, makeRegistry, registerConfigurable
 
   49 from lsst.sphgeom import ConvexPolygon, Region, UnitVector3d
 
   50 from ._instrument 
import loadCamera
 
   53 @dataclasses.dataclass
 
   55     """Struct representing a group of exposures that will be used to define a 
   60     """Name of the instrument this visit will be associated with. 
   64     """Integer ID of the visit. 
   66     This must be unique across all visit systems for the instrument. 
   70     """String name for the visit. 
   72     This must be unique across all visit systems for the instrument. 
   75     exposures: List[DimensionRecord] = dataclasses.field(default_factory=list)
 
   76     """Dimension records for the exposures that are part of this visit. 
   80 @dataclasses.dataclass
 
   82     """Struct containing the dimension records associated with a visit. 
   85     visit: DimensionRecord
 
   86     """Record for the 'visit' dimension itself. 
   89     visit_definition: List[DimensionRecord]
 
   90     """Records for 'visit_definition', which relates 'visit' to 'exposure'. 
   93     visit_detector_region: List[DimensionRecord]
 
   94     """Records for 'visit_detector_region', which associates the combination 
   95     of a 'visit' and a 'detector' with a region on the sky. 
  103 class GroupExposuresTask(Task, metaclass=ABCMeta):
 
  104     """Abstract base class for the subtask of `DefineVisitsTask` that is 
  105     responsible for grouping exposures into visits. 
  107     Subclasses should be registered with `GroupExposuresTask.registry` to 
  108     enable use by `DefineVisitsTask`, and should generally correspond to a 
  109     particular 'visit_system' dimension value.  They are also responsible for 
  110     defining visit IDs and names that are unique across all visit systems in 
  111     use by an instrument. 
  115     config : `GroupExposuresConfig` 
  116         Configuration information. 
  118         Additional keyword arguments forwarded to the `Task` constructor. 
  120     def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
 
  121         Task.__init__(self, config=config, **kwargs)
 
  123     ConfigClass = GroupExposuresConfig
 
  125     _DefaultName = 
"groupExposures" 
  128         doc=
"Registry of algorithms for grouping exposures into visits.",
 
  129         configBaseType=GroupExposuresConfig,
 
  133     def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
 
  134         """Group the given exposures into visits. 
  138         exposures : `list` [ `DimensionRecord` ] 
  139             DimensionRecords (for the 'exposure' dimension) describing the 
  144         visits : `Iterable` [ `VisitDefinitionData` ] 
  145             Structs identifying the visits and the exposures associated with 
  146             them.  This may be an iterator or a container. 
  148         raise NotImplementedError()
 
  152         """Return identifiers for the 'visit_system' dimension this 
  153         algorithm implements. 
  158             Integer ID for the visit system (given an instrument). 
  160             Unique string identifier for the visit system (given an 
  163         raise NotImplementedError()
 
  170         doc=(
"Pad raw image bounding boxes with specified number of pixels " 
  171              "when calculating their (conservatively large) region on the " 
  177     """Abstract base class for the subtask of `DefineVisitsTask` that is 
  178     responsible for extracting spatial regions for visits and visit+detector 
  181     Subclasses should be registered with `ComputeVisitRegionsTask.registry` to 
  182     enable use by `DefineVisitsTask`. 
  186     config : `ComputeVisitRegionsConfig` 
  187         Configuration information. 
  189         Additional keyword arguments forwarded to the `Task` constructor. 
  191     def __init__(self, config: ComputeVisitRegionsConfig, *, butler: Butler, **kwargs: Any):
 
  192         Task.__init__(self, config=config, **kwargs)
 
  195     ConfigClass = ComputeVisitRegionsConfig
 
  197     _DefaultName = 
"computeVisitRegions" 
  200         doc=(
"Registry of algorithms for computing on-sky regions for visits " 
  201              "and visit+detector combinations."),
 
  202         configBaseType=ComputeVisitRegionsConfig,
 
  206     def compute(self, visit: VisitDefinitionData, *, collections: Any = 
None 
  207                 ) -> Tuple[Region, Dict[int, Region]]:
 
  208         """Compute regions for the given visit and all detectors in that visit. 
  212         visit : `VisitDefinitionData` 
  213             Struct describing the visit and the exposures associated with it. 
  214         collections : Any, optional 
  215             Collections to be searched for raws and camera geometry, overriding 
  216             ``self.butler.collections``. 
  217             Can be any of the types supported by the ``collections`` argument 
  218             to butler construction. 
  222         visitRegion : `lsst.sphgeom.Region` 
  223             Region for the full visit. 
  224         visitDetectorRegions : `dict` [ `int`, `lsst.sphgeom.Region` ] 
  225             Dictionary mapping detector ID to the region for that detector. 
  226             Should include all detectors in the visit. 
  228         raise NotImplementedError()
 
  232     groupExposures = GroupExposuresTask.registry.makeField(
 
  233         doc=
"Algorithm for grouping exposures into visits.",
 
  234         default=
"one-to-one",
 
  236     computeVisitRegions = ComputeVisitRegionsTask.registry.makeField(
 
  237         doc=
"Algorithm from computing visit and visit+detector regions.",
 
  238         default=
"single-raw-wcs",
 
  240     ignoreNonScienceExposures = Field(
 
  241         doc=(
"If True, silently ignore input exposures that do not have " 
  242              "observation_type=SCIENCE.  If False, raise an exception if one " 
  251     """Driver Task for defining visits (and their spatial regions) in Gen3 
  256     config : `DefineVisitsConfig` 
  257         Configuration for the task. 
  258     butler : `~lsst.daf.butler.Butler` 
  259         Writeable butler instance.  Will be used to read `raw.wcs` and `camera` 
  260         datasets and insert/sync dimension data. 
  262         Additional keyword arguments are forwarded to the `lsst.pipe.base.Task` 
  267     Each instance of `DefineVisitsTask` reads from / writes to the same Butler. 
  268     Each invocation of `DefineVisitsTask.run` processes an independent group of 
  269     exposures into one or more new vists, all belonging to the same visit 
  270     system and instrument. 
  272     The actual work of grouping exposures and computing regions is delegated 
  273     to pluggable subtasks (`GroupExposuresTask` and `ComputeVisitRegionsTask`), 
  274     respectively.  The defaults are to create one visit for every exposure, 
  275     and to use exactly one (arbitrary) detector-level raw dataset's WCS along 
  276     with camera geometry to compute regions for all detectors.  Other 
  277     implementations can be created and configured for instruments for which 
  278     these choices are unsuitable (e.g. because visits and exposures are not 
  279     one-to-one, or because ``raw.wcs`` datasets for different detectors may not 
  280     be consistent with camera geomery). 
  282     It is not necessary in general to ingest all raws for an exposure before 
  283     defining a visit that includes the exposure; this depends entirely on the 
  284     `ComputeVisitRegionTask` subclass used.  For the default configuration, 
  285     a single raw for each exposure is sufficient. 
  287     def __init__(self, config: Optional[DefineVisitsConfig] = 
None, *, butler: Butler, **kwargs: Any):
 
  295     ConfigClass = DefineVisitsConfig
 
  297     _DefaultName = 
"defineVisits" 
  299     def _buildVisitRecords(self, definition: VisitDefinitionData, *,
 
  300                            collections: Any = 
None) -> _VisitRecords:
 
  301         """Build the DimensionRecords associated with a visit. 
  305         definition : `VisitDefinition` 
  306             Struct with identifiers for the visit and records for its 
  307             constituent exposures. 
  308         collections : Any, optional 
  309             Collections to be searched for raws and camera geometry, overriding 
  310             ``self.butler.collections``. 
  311             Can be any of the types supported by the ``collections`` argument 
  312             to butler construction. 
  316         records : `_VisitRecords` 
  317             Struct containing DimensionRecords for the visit, including 
  318             associated dimension elements. 
  321         visitRegion, visitDetectorRegions = self.computeVisitRegions.compute(definition,
 
  322                                                                              collections=collections)
 
  325             begin=_reduceOrNone(min, (e.timespan.begin 
for e 
in definition.exposures)),
 
  326             end=_reduceOrNone(max, (e.timespan.end 
for e 
in definition.exposures)),
 
  328         exposure_time = _reduceOrNone(sum, (e.exposure_time 
for e 
in definition.exposures))
 
  329         physical_filter = _reduceOrNone(
lambda a, b: a 
if a == b 
else None,
 
  330                                         (e.physical_filter 
for e 
in definition.exposures))
 
  333             visit=self.
universe[
"visit"].RecordClass.fromDict({
 
  334                 "instrument": definition.instrument,
 
  336                 "name": definition.name,
 
  337                 "physical_filter": physical_filter,
 
  338                 "visit_system": self.groupExposures.getVisitSystem()[0],
 
  339                 "exposure_time": exposure_time,
 
  340                 TIMESPAN_FIELD_SPECS.begin.name: timespan.begin,
 
  341                 TIMESPAN_FIELD_SPECS.end.name: timespan.end,
 
  342                 "region": visitRegion,
 
  348                 self.
universe[
"visit_definition"].RecordClass.fromDict({
 
  349                     "instrument": definition.instrument,
 
  350                     "visit": definition.id,
 
  351                     "exposure": exposure.id,
 
  352                     "visit_system": self.groupExposures.getVisitSystem()[0],
 
  354                 for exposure 
in definition.exposures
 
  356             visit_detector_region=[
 
  357                 self.
universe[
"visit_detector_region"].RecordClass.fromDict({
 
  358                     "instrument": definition.instrument,
 
  359                     "visit": definition.id,
 
  360                     "detector": detectorId,
 
  361                     "region": detectorRegion,
 
  363                 for detectorId, detectorRegion 
in visitDetectorRegions.items()
 
  367     def run(self, dataIds: Iterable[DataId], *,
 
  368             pool: Optional[Pool] = 
None,
 
  370             collections: Optional[str] = 
None):
 
  371         """Add visit definitions to the registry for the given exposures. 
  375         dataIds : `Iterable` [ `dict` or `DataCoordinate` ] 
  376             Exposure-level data IDs.  These must all correspond to the same 
  377             instrument, and are expected to be on-sky science exposures. 
  378         pool : `multiprocessing.Pool`, optional 
  379             If not `None`, a process pool with which to parallelize some 
  381         processes : `int`, optional 
  382             The number of processes to use.  Ignored if ``pool`` is not `None`. 
  383         collections : Any, optional 
  384             Collections to be searched for raws and camera geometry, overriding 
  385             ``self.butler.collections``. 
  386             Can be any of the types supported by the ``collections`` argument 
  387             to butler construction. 
  390         if pool 
is None and processes > 1:
 
  391             pool = Pool(processes)
 
  392         mapFunc = map 
if pool 
is None else pool.imap_unordered
 
  394         self.
log.
info(
"Preprocessing data IDs.")
 
  395         dimensions = DimensionGraph(self.
universe, names=[
"exposure"])
 
  396         dataIds = 
set(mapFunc(
lambda d: self.
butler.registry.expandDataId(d, graph=dimensions), dataIds))
 
  398             raise RuntimeError(
"No exposures given.")
 
  403         for dataId 
in dataIds:
 
  404             record = dataId.records[
"exposure"]
 
  405             if record.observation_type != 
"science":
 
  406                 if self.
config.ignoreNonScienceExposures:
 
  409                     raise RuntimeError(f
"Input exposure {dataId} has observation_type " 
  410                                        f
"{record.observation_type}, not 'science'.")
 
  411             instruments.add(dataId[
"instrument"])
 
  412             exposures.append(record)
 
  414             self.
log.
info(
"No science exposures found after filtering.")
 
  416         if len(instruments) > 1:
 
  418                 f
"All data IDs passed to DefineVisitsTask.run must be " 
  419                 f
"from the same instrument; got {instruments}." 
  421         instrument, = instruments
 
  424         visitSystemId, visitSystemName = self.groupExposures.getVisitSystem()
 
  425         self.
log.
info(
"Registering visit_system %d: %s.", visitSystemId, visitSystemName)
 
  426         self.
butler.registry.syncDimensionData(
 
  428             {
"instrument": instrument, 
"id": visitSystemId, 
"name": visitSystemName}
 
  431         self.
log.
info(
"Grouping %d exposure(s) into visits.", len(exposures))
 
  432         definitions = 
list(self.groupExposures.
group(exposures))
 
  436         self.
log.
info(
"Computing regions and other metadata for %d visit(s).", len(definitions))
 
  437         allRecords = mapFunc(
lambda d: self.
_buildVisitRecords(d, collections=collections), definitions)
 
  440         for visitRecords 
in allRecords:
 
  441             with self.
butler.registry.transaction():
 
  442                 self.
butler.registry.insertDimensionData(
"visit", visitRecords.visit)
 
  443                 self.
butler.registry.insertDimensionData(
"visit_definition",
 
  444                                                          *visitRecords.visit_definition)
 
  445                 self.
butler.registry.insertDimensionData(
"visit_detector_region",
 
  446                                                          *visitRecords.visit_detector_region)
 
  449 def _reduceOrNone(func, iterable):
 
  450     """Apply a binary function to pairs of elements in an iterable until a 
  451     single value is returned, but return `None` if any element is `None` or 
  452     there are no elements. 
  466     visitSystemId = Field(
 
  467         doc=(
"Integer ID of the visit_system implemented by this grouping " 
  472     visitSystemName = Field(
 
  473         doc=(
"String name of the visit_system implemented by this grouping " 
  476         default=
"one-to-one",
 
  482     """An exposure grouping algorithm that simply defines one visit for each 
  483     exposure, reusing the exposures identifiers for the visit. 
  486     ConfigClass = _GroupExposuresOneToOneConfig
 
  488     def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
 
  490         for exposure 
in exposures:
 
  492                 instrument=exposure.instrument,
 
  495                 exposures=[exposure],
 
  500         return (self.
config.visitSystemId, self.
config.visitSystemName)
 
  504     visitSystemId = Field(
 
  505         doc=(
"Integer ID of the visit_system implemented by this grouping " 
  510     visitSystemName = Field(
 
  511         doc=(
"String name of the visit_system implemented by this grouping " 
  514         default=
"by-group-metadata",
 
  520     """An exposure grouping algorithm that uses exposure.group_name and 
  523     This algorithm _assumes_ exposure.group_id (generally populated from 
  524     `astro_metadata_translator.ObservationInfo.visit_id`) is not just unique, 
  525     but disjoint from all `ObservationInfo.exposure_id` values - if it isn't, 
  526     it will be impossible to ever use both this grouping algorithm and the 
  527     one-to-one algorithm for a particular camera in the same data repository. 
  530     ConfigClass = _GroupExposuresOneToOneConfig
 
  532     def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
 
  534         groups = defaultdict(list)
 
  535         for exposure 
in exposures:
 
  536             groups[exposure.group_name].
append(exposure)
 
  537         for visitName, exposuresInGroup 
in groups.items():
 
  538             instrument = exposuresInGroup[0].instrument
 
  539             visitId = exposuresInGroup[0].group_id
 
  540             assert all(e.group_id == visitId 
for e 
in exposuresInGroup), \
 
  541                 "Grouping by exposure.group_name does not yield consistent group IDs" 
  543                                       exposures=exposuresInGroup)
 
  547         return (self.
config.visitSystemId, self.
config.visitSystemName)
 
  551     mergeExposures = Field(
 
  552         doc=(
"If True, merge per-detector regions over all exposures in a " 
  553              "visit (via convex hull) instead of using the first exposure and " 
  554              "assuming its regions are valid for all others."),
 
  559         doc=(
"Load the WCS for the detector with this ID.  If None, use an " 
  560              "arbitrary detector (the first found in a query of the data " 
  561              "repository for each exposure (or all exposures, if " 
  562              "mergeExposures is True)."),
 
  567     requireVersionedCamera = Field(
 
  568         doc=(
"If True, raise LookupError if version camera geometry cannot be " 
  569              "loaded for an exposure.  If False, use the nominal camera from " 
  570              "the Instrument class instead."),
 
  579     """A visit region calculator that uses a single raw WCS and a camera to 
  580     project the bounding boxes of all detectors onto the sky, relating 
  581     different detectors by their positions in focal plane coordinates. 
  585     Most instruments should have their raw WCSs determined from a combination 
  586     of boresight angle, rotator angle, and camera geometry, and hence this 
  587     algorithm should produce stable results regardless of which detector the 
  588     raw corresponds to.  If this is not the case (e.g. because a per-file FITS 
  589     WCS is used instead), either the ID of the detector should be fixed (see 
  590     the ``detectorId`` config parameter) or a different algorithm used. 
  593     ConfigClass = _ComputeVisitRegionsFromSingleRawWcsConfig
 
  596                               ) -> Dict[int, List[UnitVector3d]]:
 
  597         """Compute the lists of unit vectors on the sphere that correspond to 
  598         the sky positions of detector corners. 
  602         exposure : `DimensionRecord` 
  603             Dimension record for the exposure. 
  604         collections : Any, optional 
  605             Collections to be searched for raws and camera geometry, overriding 
  606             ``self.butler.collections``. 
  607             Can be any of the types supported by the ``collections`` argument 
  608             to butler construction. 
  613             Dictionary mapping detector ID to a list of unit vectors on the 
  614             sphere representing that detector's corners projected onto the sky. 
  616         if collections 
is None:
 
  617             collections = self.
butler.collections
 
  618         camera, versioned = 
loadCamera(self.
butler, exposure.dataId, collections=collections)
 
  619         if not versioned 
and self.
config.requireVersionedCamera:
 
  620             raise LookupError(f
"No versioned camera found for exposure {exposure.dataId}.")
 
  621         if self.
config.detectorId 
is None:
 
  622             wcsRefs = 
list(self.
butler.registry.queryDatasets(
"raw.wcs", dataId=exposure.dataId,
 
  623                                                               collections=collections))
 
  625                 raise LookupError(f
"No raw.wcs datasets found for data ID {exposure.dataId} " 
  626                                   f
"in collections {collections}.")
 
  627             wcsDetector = camera[wcsRefs[0].dataId[
"detector"]]
 
  628             wcs = self.
butler.getDirect(wcsRefs[0])
 
  630             wcsDetector = camera[self.
config.detectorId]
 
  631             wcs = self.
butler.get(
"raw.wcs", dataId=exposure.dataId, detector=self.
config.detectorId,
 
  632                                   collections=collections)
 
  633         fpToSky = wcsDetector.getTransform(FOCAL_PLANE, PIXELS).
then(wcs.getTransform())
 
  635         for detector 
in camera:
 
  636             pixelsToSky = detector.getTransform(PIXELS, FOCAL_PLANE).
then(fpToSky)
 
  637             pixCorners = 
Box2D(detector.getBBox().dilatedBy(self.
config.padding)).getCorners()
 
  638             bounds[detector.getId()] = [
 
  639                 skyCorner.getVector() 
for skyCorner 
in pixelsToSky.applyForward(pixCorners)
 
  643     def compute(self, visit: VisitDefinitionData, *, collections: Any = 
None 
  644                 ) -> Tuple[Region, Dict[int, Region]]:
 
  646         if self.
config.mergeExposures:
 
  647             detectorBounds = defaultdict(list)
 
  648             for exposure 
in visit.exposures:
 
  650                 for detectorId, bounds 
in exposureDetectorBounds.items():
 
  651                     detectorBounds[detectorId].extend(bounds)
 
  656         for detectorId, bounds 
in detectorBounds.items():
 
  657             detectorRegions[detectorId] = ConvexPolygon.convexHull(bounds)
 
  658             visitBounds.extend(bounds)
 
  659         return ConvexPolygon.convexHull(visitBounds), detectorRegions