Core Concepts
=============

.. verified:: 2025-10-16
   :reviewer: Christof Buchbender

The Data Transfer System is built on several foundational concepts that work
together to enable automated, distributed data management. This section
explains each concept and how they relate.

Sites
-----

A **Site** represents a physical location where data can be stored and
processed. Sites are geographically distributed across the CCAT collaboration.

**Current Sites:**

* **CCAT (Chile)**: Observatory site where data originates
* **Cologne (Germany)**: Primary development and long-term archive site
* **Cornell (USA, optional)**: Example of an additional site, e.g. a US-based
  long-term archive and processing site

**Database Model**: :py:class:`ccat_ops_db.models.Site`

**Key Attributes:**

* ``name``: Full site name (e.g., "CCAT Observatory")
* ``short_name``: Used in queue names (e.g., "ccat", "cologne", "us")
* ``location``: Geographic information for coordination

**Purpose:** Sites group related storage locations and provide geographic
context for routing decisions. When data needs to be replicated, the system
ensures copies exist at multiple sites for redundancy.

Data Locations
--------------

A **DataLocation** represents a specific storage location at a site. Each
location has a type that defines its role in the data pipeline.

**Database Model**: :py:class:`ccat_ops_db.models.DataLocation`

Location Types
~~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.LocationType` defines four types:

**SOURCE**
   Where data originates (e.g. telescope instrument computers)

   * Raw data files are created here by instruments
   * Packaged and moved to buffers
   * Examples: ``primecam_raw_data``, ``chai_raw_data``

**BUFFER**
   Intermediate staging areas for transfers

   * Aggregation point before site-to-site transfers
   * Temporary storage during pipeline processing
   * Each site typically has one or more buffers
   * Failover is supported by having multiple buffers with different priorities
   * Examples: ``output_buffer``, ``input_buffer``

**LONG_TERM_ARCHIVE**
   Permanent data storage locations

   * Final destination for all data
   * Multiple copies across sites for redundancy
   * May eventually use S3, tape, or high-capacity disk storage; currently
     supported storage is S3
   * Examples: ``long_term_archive``

**PROCESSING**
   Temporary locations for scientific data analysis

   * Data is staged here from archives when needed
   * Cleaned up after processing completes
   * Examples: ``ramses_processing``

Storage Technologies
~~~~~~~~~~~~~~~~~~~~

Each DataLocation uses a specific storage technology:

**DiskDataLocation**
   :py:class:`ccat_ops_db.models.DiskDataLocation`

   * Traditional filesystem storage (local or network-mounted)
   * Example: ``/data/ccat/buffer`` on Cologne servers

**S3DataLocation**
   :py:class:`ccat_ops_db.models.S3DataLocation`

   * Object storage (e.g. AWS S3 or compatible)
   * Example: CCAT project on DataStorage.NRW

**TapeDataLocation**
   :py:class:`ccat_ops_db.models.TapeDataLocation`

   * Tape library systems (future capability)
   * Not currently implemented

Priority and Failover
~~~~~~~~~~~~~~~~~~~~~

Multiple locations of the same type can exist at a site:

.. code-block:: text

   # Example: Multiple buffers at Cologne
   cologne_buffer_primary   (priority=1, active=True)
   cologne_buffer_secondary (priority=2, active=True)
   cologne_buffer_backup    (priority=3, active=False)

The system uses:

* **Priority** (lower number = higher priority): Determines which location to
  use first
* **Active** flag: Allows temporarily disabling locations for maintenance

Implementation:
:py:meth:`ccat_data_transfer.queue_discovery.QueueDiscoveryService.get_primary_buffer_for_site`

Data Packages
-------------

Raw data files are grouped into :py:class:`ccat_ops_db.models.RawDataPackage`
records for efficient transfer and management.

RawDataFile
~~~~~~~~~~~

:py:class:`ccat_ops_db.models.RawDataFile`

Individual files created by instruments:

* Original filename and size
* Checksum for integrity verification
* Relationship to observation (ExecutedObsUnit)
* Relationship to instrument component (InstrumentModule)

RawDataPackage
~~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.RawDataPackage`

Logical grouping of related raw data files:

* Typically all files from one observation
* Created at SOURCE locations by
  :py:mod:`ccat_data_transfer.raw_data_package_manager`
* Archived as tar files for efficient storage and transfer
* Tracked independently through pipeline stages

**States**: :py:class:`ccat_ops_db.models.Status`

* ``PENDING``: Exists only at source, not yet packaged
* ``SCHEDULED``: A Celery packaging task has been scheduled
* ``IN_PROGRESS``: Packaging in progress, i.e. the Celery task is running
* ``COMPLETED``: Packaging completed, i.e. the Celery task finished
  successfully
* ``FAILED``: Packaging failed, i.e. the Celery task failed

DataTransferPackage
~~~~~~~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.DataTransferPackage`

Temporary container bundling multiple RawDataPackages for efficient transfer:

* Aggregates packages up to a size limit (e.g., 100 GB)
* Created at BUFFER locations by
  :py:mod:`ccat_data_transfer.data_transfer_package_manager`
* Tar archive containing multiple RawDataPackage tar files
* Deleted after successful unpacking at the destination

**Purpose**: Optimize network efficiency by amortizing transfer overhead
across many packages.

Physical Copy Tracking
~~~~~~~~~~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.PhysicalCopy` (and subclasses)

Tracks where each file/package physically exists:

* Every file in the system has a PhysicalCopy record for each location where
  it exists
* Status indicates the current state (PRESENT, DELETED, etc.)
* Enables a complete audit trail of data movement
* Used by the deletion manager to determine cleanup eligibility

Example:

.. code-block:: python

   # RawDataPackage "obs_001" exists at three locations
   PhysicalCopy(package=obs_001, location=ccat_buffer, status=DELETED)
   PhysicalCopy(package=obs_001, location=cologne_lta, status=PRESENT)
   PhysicalCopy(package=obs_001, location=cornell_lta, status=PRESENT)

Operations
----------

An **Operation** is a unit of work performed by the data transfer system.
Operations are defined by
:py:class:`ccat_data_transfer.operation_types.OperationType`.

Operation Types
~~~~~~~~~~~~~~~

.. list-table::
   :header-rows: 1
   :widths: 30 50 20

   * - Operation
     - Description
     - Primary Location
   * - ``RAW_DATA_PACKAGE_CREATION``
     - Package raw files into tar archives
     - SOURCE
   * - ``DATA_TRANSFER_PACKAGE_CREATION``
     - Bundle packages for transfer
     - BUFFER
   * - ``DATA_TRANSFER``
     - Move packages between sites
     - BUFFER
   * - ``DATA_TRANSFER_UNPACKING``
     - Extract transferred archives
     - BUFFER
   * - ``LONG_TERM_ARCHIVE_TRANSFER``
     - Move to permanent storage
     - LTA
   * - ``STAGING``
     - Copy to processing locations
     - PROCESSING
   * - ``DELETION``
     - Remove temporary files
     - ALL
   * - ``MONITORING``
     - Disk usage and health checks
     - ALL

Operation-Location Matrix
~~~~~~~~~~~~~~~~~~~~~~~~~

Not all operations apply to all location types. The system uses
:py:data:`ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
to determine which operations are valid for each location:

.. code-block:: text

   SOURCE locations:
     • RAW_DATA_PACKAGE_CREATION
     • DELETION
     • MONITORING

   BUFFER locations:
     • DATA_TRANSFER_PACKAGE_CREATION
     • DATA_TRANSFER
     • DATA_TRANSFER_UNPACKING
     • DELETION
     • MONITORING

   LONG_TERM_ARCHIVE locations:
     • LONG_TERM_ARCHIVE_TRANSFER
     • MONITORING

   PROCESSING locations:
     • STAGING
     • DELETION
     • MONITORING

Managers
--------

**Managers** are Python processes that orchestrate the data pipeline by:

1. Scanning the database for work to be done
2. Creating database records for new operations
3. Submitting Celery tasks to appropriate queues
4. Running in loops with configurable sleep intervals

**Key Characteristic**: Managers need database access but NOT direct data
access. They can run anywhere (typically centrally in Cologne).
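The manager pattern above can be sketched as a minimal loop. This is an
illustration, not the production code; ``find_work`` and ``submit_task`` are
hypothetical stand-ins for a concrete manager's database query and Celery
task submission:

.. code-block:: python

   import time


   def run_manager_loop(find_work, submit_task, sleep_seconds=60, max_cycles=None):
       """Scan for pending work, submit one task per item, then sleep.

       `find_work` and `submit_task` are hypothetical callables standing in
       for a real manager's database query and Celery submission.
       `max_cycles` bounds the loop for demonstration; real managers run
       until stopped.
       """
       cycles = 0
       while max_cycles is None or cycles < max_cycles:
           for item in find_work():      # 1. scan the database for work
               submit_task(item)         # 2./3. record operation + submit task
           cycles += 1
           time.sleep(sleep_seconds)     # 4. configurable sleep interval

Each concrete manager (packaging, transfer, deletion) specializes the two
callables while keeping this loop shape.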
Manager Examples
~~~~~~~~~~~~~~~~

**Raw Data Package Manager**
   :py:mod:`ccat_data_transfer.raw_data_package_manager`

   * Finds unpackaged RawDataFiles
   * Groups files by observation
   * Creates RawDataPackage records
   * Submits packaging tasks to SOURCE location queues

**Transfer Manager**
   :py:mod:`ccat_data_transfer.transfer_manager`

   * Finds DataTransferPackages ready to transfer
   * Determines routes using round-robin
   * Creates DataTransfer records
   * Submits transfer tasks to BUFFER location queues

**Deletion Manager**
   :py:mod:`ccat_data_transfer.deletion_manager`

   * Finds packages eligible for deletion
   * Checks retention policies
   * Ensures data is safely archived before deletion
   * Submits deletion tasks to appropriate queues

Health Checks
~~~~~~~~~~~~~

All managers integrate with
:py:class:`ccat_data_transfer.health_check.HealthCheck`:

* Register service startup
* Send periodic heartbeats
* Report when stopping
* Enables monitoring of service health

Celery Workers
--------------

**Celery Workers** are distributed processes that execute the actual work:

1. Listen to queues for specific location/operation combinations
2. Perform file operations (copy, checksum, delete, etc.)
3. Update the database with results
4. Must run on machines with access to the data location

**Key Characteristic**: Workers need direct data access. They run on the
servers/computers where the storage is mounted.

Worker Assignment
~~~~~~~~~~~~~~~~~

Workers are assigned to queues by location:

.. code-block:: bash

   # Cologne buffer worker handles all buffer operations
   celery worker -Q cologne_buffer_data_transfer_package_creation,cologne_buffer_data_transfer,cologne_buffer_data_transfer_unpacking,cologne_buffer_deletion

   # CCAT telescope worker handles source operations
   celery worker -Q ccat_telescope_computer_raw_data_package_creation,ccat_telescope_computer_deletion

Task Implementation
~~~~~~~~~~~~~~~~~~~

Workers execute tasks defined as Celery functions.
Example from :py:mod:`ccat_data_transfer.raw_data_package_manager`:

.. code-block:: python

   @app.task(base=make_celery_task())
   def create_raw_data_package(package_id, verbose=False):
       """
       Create tar archive from raw data files.

       Updates database with checksum and status.
       """
       # Implementation details...

All tasks use the base class returned by
:py:func:`ccat_data_transfer.setup_celery_app.make_celery_task`, which
provides:

* Automatic session management
* Heartbeat tracking
* Retry logic
* Error handling

Queues
------

**Queues** are named channels in Redis where tasks are placed for workers to
consume. Queue names are automatically generated from the database.

Queue Naming Convention
~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: text

   {site_short_name}_{data_location_name}_{operation_type}

   Examples:
   ccat_telescope_computer_raw_data_package_creation
   cologne_buffer_data_transfer
   cologne_lta_long_term_archive_transfer
   ramses_processing_staging

This convention:

* Makes queue purpose self-documenting
* Enables automatic worker assignment
* Prevents configuration drift
* Simplifies debugging (the queue name tells you exactly what it does)

Queue Discovery
~~~~~~~~~~~~~~~

:py:class:`ccat_data_transfer.queue_discovery.QueueDiscoveryService`

Automatically discovers queues from the database:

1. Query all active :py:class:`~ccat_ops_db.models.DataLocation` records
2. For each location, determine applicable operations from
   :py:data:`~ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
3. Generate queue names using the naming convention
4. Configure Celery routing to map tasks to queues

This happens at:

* Application startup
* When new locations are added to the database
* When the ``list-queues`` CLI command is run

Implementation:
:py:meth:`ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues`

Routes and Routing
------------------

**Routes** define how data flows between sites and locations.
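The round-robin destination choice that route discovery relies on can be
sketched with in-memory state (a plain dict here; in production the counter
lives in Redis, as described below, and all names in this sketch are
illustrative):

.. code-block:: python

   def pick_destination(source_short_name, lta_sites, counters):
       """Pick the next LTA destination for a source site, round-robin.

       `counters` is a plain dict standing in for the Redis-backed state,
       mapping each source site's short name to its last-used index.
       """
       idx = (counters.get(source_short_name, -1) + 1) % len(lta_sites)
       counters[source_short_name] = idx      # persist for the next pick
       return lta_sites[idx]

Successive calls for the same source site cycle evenly through the LTA
sites, which is the load-balancing behavior the route discovery functions
implement.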
Automatic Route Discovery
~~~~~~~~~~~~~~~~~~~~~~~~~

The system discovers routes by analyzing site topology:

**Primary Routes** (SOURCE → LTA)
   From any SOURCE site to all LTA sites

   * Created automatically by
     :py:func:`ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes`
   * Distributes data from the telescope to the archives
   * Uses round-robin for load balancing

**Secondary Routes** (LTA → LTA)
   Between all LTA sites

   * Created automatically by
     :py:func:`ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes`
   * Ensures redundant copies at all archive sites
   * Also uses round-robin

Round-Robin State
~~~~~~~~~~~~~~~~~

Round-robin distribution state is tracked in Redis:

.. code-block:: python

   redis_key = f"round_robin:{source_site.short_name}"
   last_index = redis.get(redis_key)
   next_index = (last_index + 1) % num_lta_sites
   redis.set(redis_key, next_index)

This ensures an even distribution across LTA sites over time while surviving
service restarts.

Custom Routes
~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.DataTransferRoute`

Manual route overrides for special cases:

* ``DIRECT``: Skip buffers, transfer directly to the destination
* ``RELAY``: Route through an intermediate site
* ``CUSTOM``: Specific location-to-location override

These are not currently used in production but are available for operational
flexibility.

Data Flow Summary
-----------------

Putting it all together:

.. mermaid::

   flowchart TD
       A[Instrument creates file] --> B[Manager detects file]
       B --> C[Manager creates package]
       C --> D[Worker transfers to BUFFER]
       D --> E[Manager creates transfer]
       E --> F[Manager determines route]
       F --> G[Worker transfers data]
       G --> H[Manager submits unpack]
       H --> I[Worker unpacks & verifies]
       I --> J[Manager submits archive]
       J --> K[Worker moves to LTA]
       K --> L[State: ARCHIVED]
       L --> M[Cleanup]

       classDef instrument fill:#fff4e6,stroke:#e65100,stroke-width:2px
       classDef manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
       classDef worker fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
       classDef archived fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px

       class A instrument
       class B,C,E,F,H,J manager
       class D,G,I,K worker
       class L archived
       class M manager

Each step involves:

* A database state update
* Task submission to the appropriate queue
* Worker execution at the correct location
* Result verification and recording

Next Steps
----------

* :doc:`pipeline` - Detailed look at each of the 7 pipeline stages
* :doc:`routing` - Deep dive into queue discovery and task routing
* :doc:`monitoring` - How the system tracks health and recovers from failures