22 """Base class for writing Gen3 raw data ingest tests. 
   25 __all__ = (
"IngestTestBase",)
 
   34 from lsst.daf.butler 
import Butler
 
   35 from lsst.daf.butler.cli.butler 
import cli 
as butlerCli
 
   38 from .utils 
import getInstrument
 
   42     """Base class for tests of gen3 ingest. Subclass from this, then 
   43     `unittest.TestCase` to get a working test suite. 
   47     """Root path to ingest files into. Typically `obs_package/tests/`; the 
   48     actual directory will be a tempdir under this one. 
   52     """list of butler data IDs of files that should have been ingested.""" 
   55     """Full path to a file to ingest in tests.""" 
   57     rawIngestTask = 
"lsst.obs.base.RawIngestTask" 
   58     """The task to use in the Ingest test.""" 
   60     curatedCalibrationDatasetTypes = 
None 
   61     """List or tuple of Datasets types that should be present after calling 
   62     writeCuratedCalibrations. If `None` writeCuratedCalibrations will 
   63     not be called and the test will be skipped.""" 
   66     """The task to use to define visits from groups of exposures. 
   67     This is ignored if ``visits`` is `None`. 
   71     """A dictionary mapping visit data IDs the lists of exposure data IDs that 
   72     are associated with them. 
   73     If this is empty (but not `None`), visit definition will be run but no 
   74     visits will be expected (e.g. because no exposures are on-sky 
   79     """The name of the output run to use in tests. 
   85         """The fully qualified instrument class name. 
   90             The fully qualified instrument class name. 
   96         """The instrument class.""" 
  101         """The name of the instrument. 
  106             The name of the instrument. 
  119         if os.path.exists(self.
root):
 
  120             shutil.rmtree(self.
root, ignore_errors=
True)
 
  124         Test that RawIngestTask ingested the expected files. 
  128         files : `list` [`str`], or None 
  129             List of files to be ingested, or None to use ``self.file`` 
  132         datasets = butler.registry.queryDatasets(self.
outputRun, collections=...)
 
  133         self.assertEqual(len(
list(datasets)), len(self.
dataIds))
 
  135             exposure = butler.get(self.
outputRun, dataId)
 
  136             metadata = butler.get(
"raw.metadata", dataId)
 
  137             self.assertEqual(metadata.toDict(), exposure.getMetadata().toDict())
 
  142             wcs = butler.get(
"raw.wcs", dataId)
 
  143             self.assertEqual(wcs, exposure.getWcs())
 
  145             rawImage = butler.get(
"raw.image", dataId)
 
  146             self.assertEqual(rawImage.getBBox(), exposure.getBBox())
 
  151         """Check the state of the repository after ingest. 
  153         This is an optional hook provided for subclasses; by default it does 
  158         files : `list` [`str`], or None 
  159             List of files to be ingested, or None to use ``self.file`` 
  163     def _createRepo(self):
 
  164         """Use the Click `testing` module to call the butler command line api 
  165         to create a repository.""" 
  166         runner = click.testing.CliRunner()
 
  167         result = runner.invoke(butlerCli, [
"create", self.root])
 
  168         self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
 
  170     def _ingestRaws(self, transfer):
 
  171         """Use the Click `testing` module to call the butler command line api 
  177             The external data transfer type. 
  179         runner = click.testing.CliRunner()
 
  180         result = runner.invoke(butlerCli, [
"ingest-raws", self.root,
 
  181                                            "--output-run", self.outputRun,
 
  183                                            "--transfer", transfer,
 
  184                                            "--ingest-task", self.rawIngestTask])
 
  185         self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
 
  187     def _registerInstrument(self):
 
  188         """Use the Click `testing` module to call the butler command line api 
  189         to register the instrument.""" 
  190         runner = click.testing.CliRunner()
 
  191         result = runner.invoke(butlerCli, [
"register-instrument", self.root,
 
  192                                            "--instrument", self.instrumentClassName])
 
  193         self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
 
  195     def _writeCuratedCalibrations(self):
 
  196         """Use the Click `testing` module to call the butler command line api 
  197         to write curated calibrations.""" 
  198         runner = click.testing.CliRunner()
 
  199         result = runner.invoke(butlerCli, [
"write-curated-calibrations", self.root,
 
  200                                            "--instrument", self.instrumentName,
 
  201                                            "--output-run", self.outputRun])
 
  202         self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
 
  220         except PermissionError 
as err:
 
  221             raise unittest.SkipTest(
"Skipping hard-link test because input data" 
  222                                     " is on a different filesystem.") 
from err
 
  225         """Test that files already in the directory can be added to the 
  230         newPath = os.path.join(butler.datastore.root, os.path.basename(self.
file))
 
  231         os.symlink(os.path.abspath(self.
file), newPath)
 
  236         """Re-ingesting the same data into the repository should fail. 
  239         with self.assertRaises(Exception):
 
  243         """Test that we can ingest the curated calibrations""" 
  245             raise unittest.SkipTest(
"Class requests disabling of writeCuratedCalibrations test")
 
  252             with self.subTest(dtype=datasetTypeName, dataId=dataId):
 
  253                 datasets = 
list(butler.registry.queryDatasets(datasetTypeName, collections=...,
 
  255                 self.assertGreater(len(datasets), 0, f
"Checking {datasetTypeName}")
 
  259             self.skipTest(
"Expected visits were not defined.")
 
  270         visits = 
set(butler.registry.queryDimensions([
"visit"], expand=
True))
 
  271         self.assertCountEqual(visits, self.
visits.
keys())
 
  272         camera = instr.getCamera()
 
  273         for foundVisit, (expectedVisit, expectedExposures) 
in zip(visits, self.
visits.
items()):
 
  275             foundExposures = 
set(butler.registry.queryDimensions([
"exposure"], dataId=expectedVisit,
 
  277             self.assertCountEqual(foundExposures, expectedExposures)
 
  280             self.assertIsNotNone(foundVisit.region)
 
  281             detectorVisitDataIds = 
set(butler.registry.queryDimensions([
"visit", 
"detector"],
 
  282                                                                        dataId=expectedVisit,
 
  284             self.assertEqual(len(detectorVisitDataIds), len(camera))
 
  285             for dataId 
in detectorVisitDataIds:
 
  286                 self.assertTrue(foundVisit.region.contains(dataId.region))