22 """Base class for writing Gen3 raw data ingest tests.
25 __all__ = (
"IngestTestBase",)
34 from lsst.daf.butler
import Butler, ButlerURI
35 from lsst.daf.butler.cli.butler
import cli
as butlerCli
36 from lsst.daf.butler.cli.utils
import LogCliRunner
39 from .utils
import getInstrument
44 """Base class for tests of gen3 ingest. Subclass from this, then
45 `unittest.TestCase` to get a working test suite.
49 """Root path to ingest files into. Typically `obs_package/tests/`; the
50 actual directory will be a tempdir under this one.
54 """list of butler data IDs of files that should have been ingested."""
57 """Full path to a file to ingest in tests."""
60 """The lsst.afw.image.FilterLabel that should be returned by the above
63 rawIngestTask =
"lsst.obs.base.RawIngestTask"
64 """The task to use in the Ingest test."""
66 curatedCalibrationDatasetTypes =
None
67 """List or tuple of Datasets types that should be present after calling
68 writeCuratedCalibrations. If `None` writeCuratedCalibrations will
69 not be called and the test will be skipped."""
72 """The task to use to define visits from groups of exposures.
73 This is ignored if ``visits`` is `None`.
77 """A dictionary mapping visit data IDs the lists of exposure data IDs that
78 are associated with them.
79 If this is empty (but not `None`), visit definition will be run but no
80 visits will be expected (e.g. because no exposures are on-sky
87 """The fully qualified instrument class name.
92 The fully qualified instrument class name.
98 """The instrument class."""
103 """The name of the instrument.
108 The name of the instrument.
127 if os.path.exists(cls.
rootroot):
128 shutil.rmtree(cls.
rootroot, ignore_errors=
True)
132 Test that RawIngestTask ingested the expected files.
136 files : `list` [`str`], or None
137 List of files to be ingested, or None to use ``self.file``
138 fullCheck : `bool`, optional
139 If `True`, read the full raw dataset and check component
140 consistency. If `False` check that a component can be read
141 but do not read the entire raw exposure.
145 Reading all the ingested test data can be expensive. The code paths
146 for reading the second raw are the same as reading the first so
147 we do not gain anything by doing full checks of everything.
148 Only read full pixel data for first dataset from file.
149 Don't even do that if we are requested not to by the caller.
150 This only really affects files that contain multiple datasets.
153 datasets =
list(butler.registry.queryDatasets(
"raw", collections=self.
outputRunoutputRun))
154 self.assertEqual(len(datasets), len(self.
dataIdsdataIds))
158 datasetUri = butler.getURI(datasets[0])
159 self.assertIsNotNone(datasetUri.relative_to(butler.datastore.root))
161 for dataId
in self.
dataIdsdataIds:
163 metadata = butler.get(
"raw.metadata", dataId)
167 exposure = butler.get(
"raw", dataId)
168 self.assertEqual(metadata.toDict(), exposure.getMetadata().toDict())
173 wcs = butler.get(
"raw.wcs", dataId)
174 self.assertEqual(wcs, exposure.getWcs())
176 rawImage = butler.get(
"raw.image", dataId)
177 self.assertEqual(rawImage.getBBox(), exposure.getBBox())
180 filterLabel = butler.get(
"raw.filterLabel", dataId)
181 self.assertEqual(filterLabel, self.
filterLabelfilterLabel)
186 """Check the state of the repository after ingest.
188 This is an optional hook provided for subclasses; by default it does
193 files : `list` [`str`], or None
194 List of files to be ingested, or None to use ``self.file``
199 def _createRepo(cls):
200 """Use the Click `testing` module to call the butler command line api
201 to create a repository."""
202 runner = LogCliRunner()
203 result = runner.invoke(butlerCli, [
"create", cls.
rootroot])
205 assert result.exit_code == 0, f
"output: {result.output} exception: {result.exception}"
207 def _ingestRaws(self, transfer, file=None):
208 """Use the Click `testing` module to call the butler command line api
214 The external data transfer type.
216 Path to a file to ingest instead of the default associated with
221 runner = LogCliRunner()
222 result = runner.invoke(butlerCli, [
"ingest-raws", self.
rootroot, file,
224 "--transfer", transfer,
226 self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
229 def _registerInstrument(cls):
230 """Use the Click `testing` module to call the butler command line api
231 to register the instrument."""
232 runner = LogCliRunner()
233 result = runner.invoke(butlerCli, [
"register-instrument", cls.
rootroot, cls.
instrumentClassNameinstrumentClassName])
235 assert result.exit_code == 0, f
"output: {result.output} exception: {result.exception}"
237 def _writeCuratedCalibrations(self):
238 """Use the Click `testing` module to call the butler command line api
239 to write curated calibrations."""
240 runner = LogCliRunner()
241 result = runner.invoke(butlerCli, [
"write-curated-calibrations", self.
rootroot, self.
instrumentNameinstrumentName])
242 self.assertEqual(result.exit_code, 0, f
"output: {result.output} exception: {result.exception}")
256 srcUri = ButlerURI(self.
filefile)
258 datasets =
list(butler.registry.queryDatasets(
"raw", collections=self.
outputRunoutputRun))
259 datastoreUri = butler.getURI(datasets[0])
260 self.assertEqual(datastoreUri, srcUri)
276 except (AssertionError, PermissionError)
as err:
277 raise unittest.SkipTest(
"Skipping hard-link test because input data"
278 " is on a different filesystem.")
from err
282 """Test that files already in the directory can be added to the
287 pathInStore =
"prefix-" + os.path.basename(self.
filefile)
288 newPath = butler.datastore.root.join(pathInStore)
289 os.symlink(os.path.abspath(self.
filefile), newPath.ospath)
290 self.
_ingestRaws_ingestRaws(transfer=
"auto", file=newPath.ospath)
298 uri = butler.getURI(
"raw", self.
dataIdsdataIds[0])
299 self.assertEqual(uri.relative_to(butler.datastore.root), pathInStore)
302 """Re-ingesting the same data into the repository should fail.
305 with self.assertRaises(Exception):
309 """Test that we can ingest the curated calibrations, and read them
310 with `loadCamera` both before and after.
313 raise unittest.SkipTest(
"Class requests disabling of writeCuratedCalibrations test")
315 butler = Butler(self.
rootroot, writeable=
False)
316 collection = self.
instrumentClassinstrumentClass.makeCalibrationCollectionName()
320 with self.assertRaises(LookupError):
321 lsst.obs.base.loadCamera(butler, {
"exposure": 0}, collections=collection)
328 camera, isVersioned = lsst.obs.base.loadCamera(butler, self.
dataIdsdataIds[0], collections=collection)
329 self.assertFalse(isVersioned)
338 butler = Butler(self.
rootroot, writeable=
False)
341 with self.subTest(dtype=datasetTypeName):
343 butler.registry.queryDatasetAssociations(
345 collections=collection,
348 self.assertGreater(len(found), 0, f
"Checking {datasetTypeName}")
351 camera, isVersioned = lsst.obs.base.loadCamera(butler, self.
dataIdsdataIds[0], collections=collection)
352 self.assertTrue(isVersioned)
356 if self.
visitsvisits
is None:
357 self.skipTest(
"Expected visits were not defined.")
364 script.defineVisits(self.
rootroot, config_file=
None, collections=self.
outputRunoutputRun,
369 visits = butler.registry.queryDataIds([
"visit"]).expanded().toSet()
370 self.assertCountEqual(visits, self.
visitsvisits.
keys())
372 camera = instr.getCamera()
373 for foundVisit, (expectedVisit, expectedExposures)
in zip(visits, self.
visitsvisits.
items()):
375 foundExposures = butler.registry.queryDataIds([
"exposure"], dataId=expectedVisit
377 self.assertCountEqual(foundExposures, expectedExposures)
380 self.assertIsNotNone(foundVisit.region)
381 detectorVisitDataIds = butler.registry.queryDataIds([
"visit",
"detector"], dataId=expectedVisit
383 self.assertEqual(len(detectorVisitDataIds), len(camera))
384 for dataId
in detectorVisitDataIds:
385 self.assertTrue(foundVisit.region.contains(dataId.region))
std::vector< SchemaItem< Flag > > * items
An immutable representation of a camera.
def checkRepo(self, files=None)
def instrumentClassName(self)
def _writeCuratedCalibrations(self)
def _registerInstrument(cls)
def testDefineVisits(self)
curatedCalibrationDatasetTypes
def testWriteCuratedCalibrations(self)
def testFailOnConflict(self)
def _ingestRaws(self, transfer, file=None)
def verifyIngest(self, files=None, cli=False, fullCheck=False)
def instrumentClass(self)
std::string const & getName() const noexcept
Return a filter's name.
def getInstrument(instrumentName, registry=None)
daf::base::PropertyList * list