22__all__ = [
"ConvertRefcatManager", 
"ConvertGaiaManager"]
 
   24from ctypes 
import c_int
 
   30import astropy.units 
as u
 
   41COUNTER = multiprocessing.Value(c_int, 0)
 
   43FILE_PROGRESS = multiprocessing.Value(c_int, 0)
 
   47    """Placeholder for ConfigurableField validation; refcat convert is 
   48    configured by the parent convert Task. 
   55    Convert a reference catalog from external files into the LSST HTM sharded
 
   56    format, using a multiprocessing Pool to speed up the work.
 
   60    filenames : `dict` [`int`, `str`]
 
   61        The HTM pixel id 
and filenames to convert the catalog into.
 
   63        The Task configuration holding the field names.
 
   64    file_reader : `lsst.pipe.base.Task`
 
   65        The file reader to use to load the files.
 
   67        The 
class used 
to compute the HTM pixel per coordinate.
 
   69        The schema of the output catalog.
 
   71        The mapping 
from output field names to keys 
in the Schema.
 
   72    htmRange : `tuple` [`int`]
 
   73        The start 
and end HTM pixel ids.
 
   74    addRefCatMetadata : callable
 
   75        A function called to add extra metadata to each output Catalog.
 
   77        The log to send messages to.
 
   79    _flags = ['photometric', 
'resolved', 
'variable']
 
   80    _DefaultName = 
'convertRefcatManager' 
   81    ConfigClass = ConvertRefcatManagerConfig
 
   83    def __init__(self, filenames, config, file_reader, indexer,
 
   84                 schema, key_map, htmRange, addRefCatMetadata, log):
 
   94        if self.
config.coord_err_unit 
is not None:
 
   98    def run(self, inputFiles):
 
   99        """Index a set of input files from a reference catalog, and write the 
  100        output to the appropriate filenames, in parallel.
 
  105            A list of file paths to read data 
from.
 
  109        output : `dict` [`int`, `str`]
 
  110            The htm ids 
and the filenames that were written to.
 
  112        global COUNTER, FILE_PROGRESS
 
  115        with multiprocessing.Manager() 
as manager:
 
  117            FILE_PROGRESS.value = 0
 
  118            fileLocks = manager.dict()
 
  121                fileLocks[i] = manager.Lock()
 
  122            self.
log.info(
"File locks created.")
 
  123            with multiprocessing.Pool(self.
config.n_processes) 
as pool:
 
  124                result = pool.starmap(self.
_convertOneFile, zip(inputFiles, itertools.repeat(fileLocks)))
 
  125            return {id: self.
filenames[id] 
for item 
in result 
for id 
in item}
 
  127    def _convertOneFile(self, filename, fileLocks):
 
  128        """Read and process one file, and write its records to the correct 
  129        indexed files, while handling exceptions 
in a useful way so that they
 
  130        don
't get swallowed by the multiprocess pool. 
  136        fileLocks : `dict` [`int`, `multiprocessing.Lock`] 
  137            A Lock for each HTM pixel; each pixel gets one file written, 
and 
  138            we need to block when one process 
is accessing that file.
 
  142        pixels, files : `list` [`int`]
 
  143            The pixel ids that were written to.
 
  149        matchedPixels = self.
indexer.indexPoints(inputData[self.
config.ra_name],
 
  150                                                 inputData[self.
config.dec_name])
 
  151        pixel_ids = 
set(matchedPixels)
 
  152        for pixelId 
in pixel_ids:
 
  153            with fileLocks[pixelId]:
 
  154                self.
_doOnePixel(inputData, matchedPixels, pixelId, fluxes, coordErr)
 
  155        with FILE_PROGRESS.get_lock():
 
  156            oldPercent = 100 * FILE_PROGRESS.value / self.
nInputFiles 
  157            FILE_PROGRESS.value += 1
 
  158            percent = 100 * FILE_PROGRESS.value / self.
nInputFiles 
  160            if np.floor(percent) - np.floor(oldPercent) >= 1:
 
  161                self.
log.info(
"Completed %d / %d files: %d %% complete ",
 
  167    def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr):
 
  168        """Process one HTM pixel, appending to an existing catalog or creating 
  169        a new catalog, as needed.
 
  173        inputData : `numpy.ndarray`
 
  174            The data 
from one input file.
 
  175        matchedPixels : `numpy.ndarray`
 
  176            The row-matched pixel indexes corresponding to ``inputData``.
 
  178            The pixel index we are currently processing.
 
  179        fluxes : `dict` [`str`, `numpy.ndarray`]
 
  180            The values that will go into the flux 
and fluxErr fields 
in the
 
  182        coordErr : `dict` [`str`, `numpy.ndarray`]
 
  183            The values that will go into the coord_raErr, coord_decErr, 
and 
  184            coord_ra_dec_Cov fields 
in the output catalog (
in radians).
 
  186        idx = np.where(matchedPixels == pixelId)[0] 
  188        for outputRow, inputRow 
in zip(catalog[-len(idx):], inputData[idx]):
 
  192        with COUNTER.get_lock():
 
  193            self.
_setIds(inputData[idx], catalog)
 
  196        for name, array 
in fluxes.items():
 
  197            catalog[self.
key_map[name]][-len(idx):] = array[idx]
 
  200        for name, array 
in coordErr.items():
 
  201            catalog[name][-len(idx):] = array[idx]
 
  203        catalog.writeFits(self.
filenames[pixelId])
 
  205    def _setIds(self, inputData, catalog):
 
  206        """Fill the `id` field of catalog with a running index, filling the 
  207        last values up to the length of ``inputData``. 
  209        Fill with `self.
config.id_name` 
if specified, otherwise use the
 
  210        global running counter value.
 
  214        inputData : `numpy.ndarray`
 
  215            The input data that 
is being processed.
 
  217            The output catalog to fill the ids.
 
  220        size = len(inputData)
 
  222            catalog[
'id'][-size:] = inputData[self.
config.id_name]
 
  224            idEnd = COUNTER.value + size
 
  225            catalog[
'id'][-size:] = np.arange(COUNTER.value, idEnd)
 
  226            COUNTER.value = idEnd
 
  229        """Get a catalog from disk or create it if it doesn't exist. 
  234            Identifier for catalog to retrieve
 
  236            Schema to use 
in catalog creation it does 
not exist.
 
  238            The number of new elements that will be added to the catalog,
 
  239            so space can be preallocated.
 
  244            The new 
or read-
and-resized catalog specified by `dataId`.
 
  247        if os.path.isfile(self.
filenames[pixelId]):
 
  248            catalog = afwTable.SimpleCatalog.readFits(self.
filenames[pixelId])
 
  249            catalog.resize(len(catalog) + nNewElements)
 
  250            return catalog.copy(deep=
True)  
 
  252        catalog.resize(nNewElements)
 
  258        """Create an ICRS coord. from a row of a catalog being converted. 
  262        row : `numpy.ndarray` 
  263            Row from catalog being converted.
 
  265            Name of RA key 
in catalog being converted.
 
  267            Name of Dec key 
in catalog being converted.
 
  276    def _getCoordErr(self, inputData, ):
 
  277        """Compute the ra/dec error fields that will go into the output catalog. 
  281        inputData : `numpy.ndarray` 
  282            The input data to compute fluxes for.
 
  286        coordErr : `dict` [`str`, `numpy.ndarray`]
 
  287            The values that will go into the coord_raErr, coord_decErr, fields
 
  288            in the output catalog (
in radians).
 
  292        This does 
not currently handle the ra/dec covariance field,
 
  293        ``coord_ra_dec_Cov``. That field may require extra work, 
as its units
 
  294        may be more complicated 
in external catalogs.
 
  297        if hasattr(self, 
"coord_err_unit"):
 
  298            result[
'coord_raErr'] = u.Quantity(inputData[self.
config.ra_err_name],
 
  300            result[
'coord_decErr'] = u.Quantity(inputData[self.
config.dec_err_name],
 
  304    def _setFlags(self, record, row):
 
  305        """Set flags in an output record. 
  310            Row from indexed catalog to modify.
 
  311        row : `numpy.ndarray`
 
  312            Row 
from catalog being converted.
 
  314        names = record.schema.getNames() 
  317                attr_name = 
'is_{}_name'.format(flag)
 
  318                record.set(self.
key_map[flag], bool(row[getattr(self.
config, attr_name)]))
 
  320    def _getFluxes(self, inputData):
 
  321        """Compute the flux fields that will go into the output catalog. 
  325        inputData : `numpy.ndarray` 
  326            The input data to compute fluxes for.
 
  330        fluxes : `dict` [`str`, `numpy.ndarray`]
 
  331            The values that will go into the flux 
and fluxErr fields 
in the
 
  335        for item 
in self.
config.mag_column_list:
 
  336            result[item+
'_flux'] = (inputData[item]*u.ABmag).to_value(u.nJy)
 
  337        if len(self.
config.mag_err_column_map) > 0:
 
  338            for err_key 
in self.
config.mag_err_column_map.keys():
 
  339                error_col_name = self.
config.mag_err_column_map[err_key]
 
  342                fluxErr = fluxErrFromABMagErr(inputData[error_col_name].copy(),
 
  343                                              inputData[err_key].copy())*1e9
 
  344                result[err_key+
'_fluxErr'] = fluxErr
 
  347    def _setProperMotion(self, record, row):
 
  348        """Set proper motion fields in a record of an indexed catalog. 
  350        The proper motions are read from the specified columns,
 
  351        scaled appropriately, 
and installed 
in the appropriate
 
  352        columns of the output.
 
  357            Row 
from indexed catalog to modify.
 
  358        row : structured `numpy.array`
 
  359            Row 
from catalog being converted.
 
  361        if self.
config.pm_ra_name 
is None:  
 
  363        radPerOriginal = np.radians(self.
config.pm_scale)/(3600*1000)
 
  364        record.set(self.
key_map[
"pm_ra"], row[self.
config.pm_ra_name]*radPerOriginal*lsst.geom.radians)
 
  365        record.set(self.
key_map[
"pm_dec"], row[self.
config.pm_dec_name]*radPerOriginal*lsst.geom.radians)
 
  367        if self.
config.pm_ra_err_name 
is not None:  
 
  368            record.set(self.
key_map[
"pm_raErr"], row[self.
config.pm_ra_err_name]*radPerOriginal)
 
  369            record.set(self.
key_map[
"pm_decErr"], row[self.
config.pm_dec_err_name]*radPerOriginal)
 
  371    def _setParallax(self, record, row):
 
  372        """Set the parallax fields in a record of a refcat. 
  374        if self.
config.parallax_name 
is None:
 
  376        scale = self.
config.parallax_scale*lsst.geom.milliarcseconds
 
  377        record.set(self.
key_map[
'parallax'], row[self.
config.parallax_name]*scale)
 
  378        record.set(self.
key_map[
'parallaxErr'], row[self.
config.parallax_err_name]*scale)
 
  380    def _epochToMjdTai(self, nativeEpoch):
 
  381        """Convert an epoch in native format to TAI MJD (a float). 
  383        return astropy.time.Time(nativeEpoch, format=self.
config.epoch_format,
 
  384                                 scale=self.
config.epoch_scale).tai.mjd
 
  386    def _setExtra(self, record, row):
 
  387        """Set extra data fields in a record of an indexed catalog. 
  392            Row from indexed catalog to modify.
 
  393        row : structured `numpy.array`
 
  394            Row 
from catalog being converted.
 
  396        for extra_col 
in self.
config.extra_col_names:
 
  397            value = row[extra_col]
 
  405            if isinstance(value, np.str_):
 
  407            record.set(self.
key_map[extra_col], value)
 
  409    def _fillRecord(self, record, row):
 
  410        """Fill a record in an indexed catalog to be persisted. 
  415            Row from indexed catalog to modify.
 
  416        row : structured `numpy.array`
 
  417            Row 
from catalog being converted.
 
  428    """Special-case convert manager to deal with Gaia fluxes. 
  430    def _getFluxes(self, input):
 
  433        def gaiaFluxToFlux(flux, zeroPoint):
 
  434            """Equations 5.19 and 5.30 from the Gaia calibration document define the 
  435            conversion from Gaia electron/second fluxes to AB magnitudes.
 
  436            https://gea.esac.esa.int/archive/documentation/GDR2/Data_processing/chap_cu5pho/sec_cu5pho_calibr/ssec_cu5pho_calibr_extern.html
 
  438            result = ((zeroPoint + -2.5 * np.log10(flux))*u.ABmag).to_value(u.nJy) 
  440            result[flux == 0] = 0
 
  445        with np.errstate(invalid=
'ignore', divide=
'ignore'):
 
  448            result[
'phot_g_mean_flux'] = gaiaFluxToFlux(input[
'phot_g_mean_flux'], 25.7934)
 
  449            result[
'phot_bp_mean_flux'] = gaiaFluxToFlux(input[
'phot_bp_mean_flux'], 25.3806)
 
  450            result[
'phot_rp_mean_flux'] = gaiaFluxToFlux(input[
'phot_rp_mean_flux'], 25.1161)
 
  452        result[
'phot_g_mean_fluxErr'] = result[
'phot_g_mean_flux'] / input[
'phot_g_mean_flux_over_error']
 
  453        result[
'phot_bp_mean_fluxErr'] = result[
'phot_bp_mean_flux'] / input[
'phot_bp_mean_flux_over_error']
 
  454        result[
'phot_rp_mean_fluxErr'] = result[
'phot_rp_mean_flux'] / input[
'phot_rp_mean_flux_over_error']
 
A class used as a handle to a particular field in a table.
Defines the fields and offsets for a table.
Record class that must contain a unique ID field and a celestial coordinate field.
Custom catalog class for record/table subclasses that are guaranteed to have an ID,...
Point in an unspecified spherical coordinate system.
This static class includes a variety of methods for interacting with the logging module.
def _setFlags(self, record, row)
def _epochToMjdTai(self, nativeEpoch)
def _setExtra(self, record, row)
def _setIds(self, inputData, catalog)
def _fillRecord(self, record, row)
def __init__(self, filenames, config, file_reader, indexer, schema, key_map, htmRange, addRefCatMetadata, log)
def computeCoord(row, ra_name, dec_name)
def _getFluxes(self, inputData)
def _setProperMotion(self, record, row)
def _getCoordErr(self, inputData)
def _doOnePixel(self, inputData, matchedPixels, pixelId, fluxes, coordErr)
def _setParallax(self, record, row)
def getCatalog(self, pixelId, schema, nNewElements)
def _convertOneFile(self, filename, fileLocks)
daf::base::PropertySet * set