Source code for ccat_ops_db.utils

import logging
from datetime import datetime

import sqlalchemy
from astropy import units as u
from astropy.coordinates import SkyCoord
from sqlalchemy import exc
import json

from . import models


def add_source(session, values, source_type=None):
    log = logging.getLogger(__name__)
    log.debug("Adding or updating source: %s", values)

    if source_type is None or source_type == "fixed_source":
        values["type"] = "fixed_source"
        source_model = models.FixedSource

        if values["frame"] == "icrs":
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.hourangle, u.deg),
                frame=values["frame"],
            )
            values["ra_deg"], values["dec_deg"] = float(coord.ra.deg), float(
                coord.dec.deg
            )
        if values["frame"] == "galactic":
            coord = SkyCoord(
                values["slam"],
                values["sbet"],
                unit=(u.deg, u.deg),
                frame=values["frame"],
            )
            equatorial = coord.transform_to("icrs")
            values["ra_deg"], values["dec_deg"] = float(equatorial.ra.deg), float(
                equatorial.dec.deg
            )
    elif source_type == "solar_system_object":
        values["type"] = source_type
        source_model = models.SolarSystemObject

    elif source_type == "constant_elevation_source":
        values["type"] = source_type
        source_model = models.ConstantElevationSource
        coord_min = SkyCoord(
            values["slam_min"],
            values["sbet_min"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"],
        )
        values["ra_deg_min"], values["dec_deg_min"] = float(coord_min.ra.deg), float(
            coord_min.dec.deg
        )
        coord_max = SkyCoord(
            values["slam_max"],
            values["sbet_max"],
            unit=(u.hourangle, u.deg),
            frame=values["frame"],
        )
        values["ra_deg_max"], values["dec_deg_max"] = float(coord_max.ra.deg), float(
            coord_max.dec.deg
        )

    try:
        # Check if a source with the given name already exists
        try:
            source = (
                session.query(source_model)
                .filter_by(
                    name=values["name"],
                    type=values["type"],
                )
                .one()
            )

        except sqlalchemy.orm.exc.NoResultFound:
            log.debug("No source found with name %s", values["name"])
            source = None

        if source is not None:
            log.warning("Source found with name %s. Skipping this row", values["name"])

        else:
            # If not, create a new source record
            filtered_values = {}
            for key in values:
                if hasattr(source_model, key.lower()):
                    try:
                        filtered_values[key.lower()] = values[key]
                    except AttributeError:
                        filtered_values[key.lower()] = values[key]

            filtered_values["version"] = 1

            source = source_model(
                **filtered_values,
            )
            session.add(source)

        # Commit the transaction
        session.commit()
        log.debug("Source added or updated successfully.")
    except exc.IntegrityError as e:
        log.error(e)
        # Handle the integrity error (e.g., duplicate name)
        session.rollback()
        log.warning("A record with the name %s already exists.", values.get("name", "<unknown>"))


[docs] def add_inpar_and_tiling( session, observation_configuration, inpar_parameters_list=None, tiling_parameters=None, ): """ Add inpar and tiling parameters to the observation_configuration Parameters ---------- session : database session observation_configuration : ccat.opsdb.observation_configuration Observation configuration with inpar and tiling to be associated inpar_parameters_list : list List of dictionary of inpar_parameters tiling_parameters : list List of dictionary of each line in tiling Returns ------- observation_configuration : ccat.opsdb.observation_configuration Updated observation configuration """ log = logging.getLogger(__name__) if inpar_parameters_list is not None: for inpar_parameters in inpar_parameters_list: # check whether this inpar file already exists inpar_entry = ( session.query(models.ChaiInparParameter) .filter_by(name=inpar_parameters["name"]) .all() ) if len(inpar_entry) == 0: # add as a new one filtered_inpar_parameter_values = {} for key in inpar_parameters: if hasattr(models.ChaiInparParameter, key): filtered_inpar_parameter_values[key] = inpar_parameters[key] filtered_inpar_parameter_values["version"] = 1 chai_inpar_parameter = models.ChaiInparParameter( **filtered_inpar_parameter_values, ) session.add(chai_inpar_parameter) elif len(inpar_entry) == 1: # update with a warning history = inpar_entry[0].history if history is not None: history = json.loads(history) if not isinstance(history, list): history = [history] updateflg = 0 for column in inpar_entry[0].__dict__: column_name = column if column_name in inpar_parameters: if ( getattr(inpar_entry[0], column_name) != inpar_parameters[column_name] ): log.warning( f"inpar parameter {column_name} in {inpar_entry[0].name} is updated from {getattr(inpar_entry[0], column_name)} to {inpar_parameters[column_name]}" ) newhistory = { "timestamp": datetime.now().isoformat(), "key": column_name, "old_value": getattr(inpar_entry[0], column_name), } if history is None: history = [newhistory] else: history.append(newhistory) setattr( inpar_entry[0], column_name, inpar_parameters[column_name], ) updateflg = 1 if updateflg == 1: setattr(inpar_entry[0], "history", json.dumps(history)) ver = inpar_entry[0].version setattr(inpar_entry[0], "version", ver + 1) else: log.error("Two inpar files with same name!") exit(1) if tiling_parameters is not None: for tiling in tiling_parameters: # Fill in the tiling values filtered_tiling_values = {} for key in tiling: if hasattr(models.ChaiTiling, key): filtered_tiling_values[key] = tiling[key] # associate to inpar file try: inpar_parameters = ( session.query(models.ChaiInparParameter) .filter_by(name=tiling["inpar_file"]) .one() ) except sqlalchemy.orm.exc.NoResultFound: print(f"inpar {tiling['inpar_file']} not found") exit(1) filtered_tiling_values["chai_inpar_parameter"] = inpar_parameters filtered_tiling_values["version"] = 1 chai_tiling = models.ChaiTiling( **filtered_tiling_values, ) session.add(chai_tiling) observation_configuration.chai_tilings += [chai_tiling] return observation_configuration
def add_obs_unit( session, values, obs_unit_type="chai_obs_unit", inpar_parameters_list=None, tiling_parameters=None, instrument_module_configurations=None, azimuth_range=None, mapping_parameters=None, ): log = logging.getLogger(__name__) try: # values["source"] = values["name"] # Check if an user with the given name already exists obs_unit = ( session.query(models.ObsUnit) .filter_by( name=values["name"], ) .first() ) if obs_unit: log.warning("ObsUnit found with name %s. Skipping this row.", values["name"]) else: # First we need to check if the observation_configuration is a # chai_observation_configuration or a prime_cam_observation_configuration # and create the appropriate object if obs_unit_type == "chai_obs_unit": # If it is a chai_observation_configuration we need to fill in the # chai_inpar_parameter and chai_tiling observation_configuration = models.ChaiObservationConfiguration( type="chai_observation_configuration", ) session.add(observation_configuration) # Add inpar and tiling parameters to the observation_configuration observation_configuration = add_inpar_and_tiling( session, observation_configuration, inpar_parameters_list=inpar_parameters_list, tiling_parameters=tiling_parameters, ) # Add ntilelines observation_configuration.ntilelines = values["ntilelines"] elif obs_unit_type == "prime_cam_obs_unit": # If it is a prime_cam_observation_configuration observation_configuration = models.PrimeCamObservationConfiguration( type="prime_cam_observation_configuration", ) session.add(observation_configuration) if mapping_parameters: setattr( observation_configuration, "mapping_parameters", mapping_parameters, ) setattr(observation_configuration, "version", 1) # add azimuth range if azimuth_range is not None: setattr(observation_configuration, "azimuth_range", azimuth_range) values["observation_configuration"] = observation_configuration instrument_module_configuration_list = [] primary_instrument_module_configuration = None for instrument_module_configuration in instrument_module_configurations: # Here we have to create or load the module configuration and log them # in the database try: primary = instrument_module_configuration["primary"] except KeyError: primary = False if ( instrument_module_configuration["type"] == "chai_module_configuration" ): instrument = ( session.query(models.Instrument) .filter_by(name=instrument_module_configuration["instrument"]) .one() ) instrument_module = ( session.query(models.InstrumentModule) .filter_by( instrument=instrument, name=instrument_module_configuration["instrument_module"], ) .one() ) line = ( session.query(models.Line) .filter_by(name=instrument_module_configuration["line"]) .one() ) if instrument_module_configuration["if_ghz"] is None: if_ghz = None else: if_ghz = float(instrument_module_configuration["if_ghz"]) try: instrument_module_configuration_db = ( session.query(models.ChaiModuleConfiguration) .filter_by( instrument_module=instrument_module, line=line, if_ghz=if_ghz, ) .one() ) except sqlalchemy.orm.exc.NoResultFound: instrument_module_configuration_db = ( models.ChaiModuleConfiguration( instrument_module=instrument_module, line=line, if_ghz=if_ghz, ) ) session.add(instrument_module_configuration_db) elif ( instrument_module_configuration["type"] == "prime_cam_module_configuration" ): instrument = ( session.query(models.Instrument) .filter_by(name=instrument_module_configuration["instrument"]) .one() ) instrument_module = ( session.query(models.InstrumentModule) .filter_by( instrument=instrument, name=instrument_module_configuration["instrument_module"], ) .one() ) try: instrument_module_configuration_db = ( session.query(models.PrimeCamModuleConfiguration) .filter_by( instrument_module=instrument_module, ) .one() ) except sqlalchemy.orm.exc.NoResultFound: instrument_module_configuration_db = ( models.PrimeCamModuleConfiguration( instrument_module=instrument_module, ) ) session.add(instrument_module_configuration_db) if primary: primary_instrument_module_configuration = ( instrument_module_configuration_db ) instrument_module_configuration_list += [ instrument_module_configuration_db ] # Now we can create the obs_unit filtered_values = {} for key in values: if hasattr(models.ObsUnit, key): # we need to look up the objects for the source, observing_program, # sub_observing_program # from the database and add them to the filtered_values if key == "observing_program": try: observing_program = ( session.query(models.ObservingProgram) .filter_by(short_name=values["observing_program"]) .one() ) except sqlalchemy.orm.exc.NoResultFound: print( "ObservingProgram not found", values["observing_program"], ) observing_program = None filtered_values[key] = observing_program elif key == "source": source = ( session.query(models.Source) .filter_by(name=values["source"]) .one() ) filtered_values[key] = source elif key == "sub_observing_program": if values["sub_observing_program"] == "None": sub_observing_program = None else: try: observing_program = ( session.query(models.ObservingProgram) .filter_by(short_name=values["observing_program"]) .one() ) except sqlalchemy.exc.NoResultFound: log.info( "ObservingProgram: %s not found", values["observing_program"], ) log.debug( "SubObservingProgram: %s", values["sub_observing_program"], ) sub_observing_program = ( session.query(models.SubObservingProgram) .filter_by( short_name=values["sub_observing_program"], observing_program=observing_program, ) .one() ) filtered_values["sub_observing_program"] = ( sub_observing_program ) elif key == "obs_mode": obs_mode = ( session.query(models.ObsMode) .filter_by(name=values["obs_mode"]) .one() ) filtered_values[key] = obs_mode elif key == "observation_configuration": logging.debug("Updating observation_configuration") filtered_values["observation_configuration"] = ( observation_configuration ) else: filtered_values[key] = values[key] filtered_values["instrument_module_configurations"] = ( instrument_module_configuration_list ) filtered_values["primary_instrument_module_configuration"] = ( primary_instrument_module_configuration # None if no primary flagged ) filtered_values["version"] = 1 obs_unit = models.ObsUnit( **filtered_values, ) session.add(obs_unit) # Commit the transaction session.commit() except exc.IntegrityError as e: # Handle the integrity error (e.g., duplicate name) session.rollback() print(e)
[docs] def add_pre_scheduled_slot( session, values, ): log = logging.getLogger(__name__) # add pre scheduled slot filtered_values = {} for key in values: if key == "obsunit_name": # check that the refered ObsUnit exists try: obs_unit = ( session.query(models.ObsUnit) .filter_by(name=values["obsunit_name"]) .one() ) filtered_values["obs_unit"] = obs_unit except sqlalchemy.orm.exc.NoResultFound: log.error("No obsunit found with name %s", values["obsunit_name"]) exit(1) elif key == "start_time" or key == "end_time": if values[key].endswith("Z"): date_str = values[key].replace("Z", "+00:00") else: date_str = values[key] parsed = datetime.fromisoformat(date_str) if parsed.tzinfo is None or parsed.tzinfo.utcoffset(parsed) is None: raise ValueError( f"{key} must be timezone-aware, got {values[key]!r}") filtered_values[key] = parsed else: filtered_values[key] = values[key] slot = models.PreScheduledSlot( **filtered_values, ) session.add(slot) # Commit the transaction session.commit() log.debug("Pre-scheduled slots added successfully.")