350 filterNames):
351 """Run each of the plugins on the catalog.
352
353 For catalog column names see the lsst.cat schema definitions for the
354 DiaObject and DiaSource tables (http://github.com/lsst/cat).
355
356 Parameters
357 ----------
358 diaObjectCat : `pandas.DataFrame`
359 DiaObjects to update values of and append new objects to. DataFrame
360 should be indexed on "diaObjectId"
361 diaSourceCat : `pandas.DataFrame`
362 DiaSources associated with the DiaObjects in diaObjectCat.
363 DataFrame must be indexed on
364 ["diaObjectId", "band", "diaSourceId"]`
365 updatedDiaObjectIds : `numpy.ndarray`
366 Integer ids of the DiaObjects to update and create.
367 filterNames : `list` of `str`
368 List of string names of filters to be being processed.
369
370 Returns
371 -------
372 returnStruct : `lsst.pipe.base.Struct`
373 Struct containing:
374
375 ``diaObjectCat``
376 Full set of DiaObjects including both un-updated and
377 updated/new DiaObjects (`pandas.DataFrame`).
378 ``updatedDiaObjects``
379 Catalog of DiaObjects that were updated or created by this
380 task (`pandas.DataFrame`).
381
382 Raises
383 ------
384 KeyError
385 Raises if `pandas.DataFrame` indexing is not properly set.
386 """
387
388 diaObjectsToUpdate = diaObjectCat.loc[updatedDiaObjectIds, :]
389 self.log.info("Calculating summary stats for %i DiaObjects",
390 len(diaObjectsToUpdate))
391
392 updatingDiaSources = diaSourceCat.loc[updatedDiaObjectIds, :]
393 diaSourcesGB = updatingDiaSources.groupby(level=0)
394 for runlevel in sorted(self.executionDict):
395 for plug in self.executionDict[runlevel].single:
396 if plug.needsFilter:
397 continue
398 for updatedDiaObjectId in updatedDiaObjectIds:
399
400
401 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
402
403
404 with CCContext(plug, updatedDiaObjectId, self.log):
405
406
407
408 plug.calculate(diaObjects=diaObjectsToUpdate,
409 diaObjectId=updatedDiaObjectId,
410 diaSources=objDiaSources,
411 filterDiaSources=None,
412 band=None)
413 for plug in self.executionDict[runlevel].multi:
414 if plug.needsFilter:
415 continue
416 with CCContext(plug, diaObjectsToUpdate, self.log):
417 plug.calculate(diaObjects=diaObjectsToUpdate,
418 diaSources=diaSourcesGB,
419 filterDiaSources=None,
420 band=None)
421
422 for band in filterNames:
423 try:
424 updatingFilterDiaSources = updatingDiaSources.loc[
425 (slice(None), band), :
426 ]
427 except KeyError:
428 self.log.warning("No DiaSource data with fitler=%s. "
429 "Continuing...", band)
430 continue
431
432 filterDiaSourcesGB = updatingFilterDiaSources.groupby(level=0)
433
434 for runlevel in sorted(self.executionDict):
435 for plug in self.executionDict[runlevel].single:
436 if not plug.needsFilter:
437 continue
438 for updatedDiaObjectId in updatedDiaObjectIds:
439
440
441 objDiaSources = updatingDiaSources.loc[updatedDiaObjectId]
442
443
444 try:
445 filterObjDiaSources = objDiaSources.loc[band]
446 except KeyError:
447 self.log.warning(
448 "DiaObjectId={updatedDiaObjectId} has no "
449 "DiaSources for filter=%s. "
450 "Continuing...", band)
451 with CCContext(plug, updatedDiaObjectId, self.log):
452
453
454
455 plug.calculate(diaObjects=diaObjectsToUpdate,
456 diaObjectId=updatedDiaObjectId,
457 diaSources=objDiaSources,
458 filterDiaSources=filterObjDiaSources,
459 band=band)
460 for plug in self.executionDict[runlevel].multi:
461 if not plug.needsFilter:
462 continue
463 with CCContext(plug, diaObjectsToUpdate, self.log):
464 plug.calculate(diaObjects=diaObjectsToUpdate,
465 diaSources=diaSourcesGB,
466 filterDiaSources=filterDiaSourcesGB,
467 band=band)
468
469
470
471 diaObjectCat.loc[updatedDiaObjectIds, :] = diaObjectsToUpdate
472 return lsst.pipe.base.Struct(
473 diaObjectCat=diaObjectCat,
474 updatedDiaObjects=diaObjectsToUpdate)
475