Routing & Queue Discovery
=========================

.. verified:: 2025-11-05
   :reviewer: Christof Buchbender

The Data Transfer System uses dynamic queue discovery and intelligent routing to
ensure work reaches the right workers without manual configuration. This section
explains how the system automatically determines where tasks should execute.

Celery tasks can be routed to specific queues, and each Celery worker can be
configured to listen to one or more queues. A worker needs physical access to
the data it works on, so routing a task to the right queue is what ensures it
runs on a machine that can reach that data.

The Routing Problem
-------------------

The routing system has to handle:

* Multiple sites (Chile, Germany, USA) with different storage locations
* Locations that each require dedicated workers with access to their data
* Operations that vary by location type (``SOURCE``, ``BUFFER``, ``LONG_TERM_ARCHIVE``, ``PROCESSING``)
* New sites and locations that are added without code changes
* Workers that may come online or go offline dynamically

Queue Naming Convention
-----------------------

Queue names follow a strict three-part convention:

.. code-block:: text

   {site_short_name}_{data_location_name}_{operation_type}

   Components:
   • site_short_name:    From Site.short_name (e.g., "ccat", "cologne", "us")
   • data_location_name: From DataLocation.name (e.g., "buffer", "lta", "telescope_computer")
   • operation_type:     From the OperationType enum (e.g., "raw_data_package_creation")

**Examples**:

.. code-block:: text

   ccat_telescope_computer_raw_data_package_creation
   └─┬┘ └──────────┬─────┘ └──────────┬────────────┘
    Site     Location Name     Operation Type

   cologne_buffer_data_transfer
   cologne_buffer_data_transfer_unpacking
   cologne_buffer_deletion
   cologne_lta_long_term_archive_transfer
   us_lta_long_term_archive_transfer
   ramses_processing_staging

**Benefits**:

* **Self-Documenting**: The queue name states exactly what the queue does and where
* **No Configuration Drift**: Queues are generated from the database, so they are always in sync
* **Predictable**: The queue name for any location/operation pair is easy to determine
* **Debuggable**: Logs show clear, descriptive queue names

Queue Discovery Service
-----------------------

:py:class:`ccat_data_transfer.queue_discovery.QueueDiscoveryService`

This service automatically discovers all required queues from the database.

Discovery Algorithm
~~~~~~~~~~~~~~~~~~~

**Implementation**:
:py:meth:`~ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues`

.. literalinclude:: ../../ccat_data_transfer/queue_discovery.py
   :pyobject: QueueDiscoveryService.discover_all_queues
   :language: python

**Operation Matrix**:

The mapping
:py:data:`ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
defines which operations apply to each location type:

.. literalinclude:: ../../ccat_data_transfer/operation_types.py
   :start-after: # Define operations by location type
   :end-before: # End of QUEUE_OPERATIONS_BY_LOCATION_TYPE
   :language: python

This ensures that:

* Queues exist only for operations that are valid for each location type
* Queue sets are consistent across all sites
* New operation types are easy to add

Celery Configuration
~~~~~~~~~~~~~~~~~~~~

Queue discovery happens at application startup in
:py:mod:`ccat_data_transfer.setup_celery_app`:

.. literalinclude:: ../../ccat_data_transfer/setup_celery_app.py
   :pyobject: configure_dynamic_queues
   :language: python

Discovery therefore runs:

* When workers start
* When managers start
* When the CLI command ``list-queues`` runs

How Routing Integrates with Managers
------------------------------------

Managers call the routing system at key decision points in the data pipeline:

**1. RawDataPackage Creation**

When
:py:func:`~ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service`
creates a package, it determines the queue:

.. code-block:: python

   # In raw_data_package_manager.py
   source_location = package.source_location
   queue = route_task_by_location(
       OperationType.RAW_DATA_PACKAGE_CREATION,
       source_location,
   )
   create_raw_data_package.apply_async(
       args=[package.id],
       queue=queue,
   )

**2. DataTransferPackage Assembly**

When ``data_transfer_package_manager.py`` completes a package, it creates the
primary transfers:

.. code-block:: python

   # In data_transfer_package_manager.py
   def handle_completed_package(session, package, source_buffer):
       # Discover routes and pick an LTA site via round-robin
       create_primary_data_transfers(session, package, source_buffer)

This in turn calls:

* :py:func:`~ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes` - Get all SOURCE→LTA routes
* :py:func:`~ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin` - Pick the next LTA site for this SOURCE
* :py:func:`~ccat_data_transfer.data_transfer_package_manager.get_primary_buffer_for_site` - Get the destination buffer
* :py:func:`~ccat_data_transfer.queue_discovery.route_task_by_location` - Determine the queue for the transfer task

**3. Secondary Replication**

The
:py:func:`~ccat_data_transfer.data_transfer_package_manager.create_secondary_data_transfers`
function runs periodically:

.. code-block:: python

   # Scheduled task in data-transfer-package-manager
   create_secondary_data_transfers(session)

Each run:

* Calls :py:func:`~ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes` - Get all LTA↔LTA routes
* Scans all completed packages
* Creates transfers to LTA sites that don't have the package yet

**Key Insight**: Managers never hard-code queue names or routing decisions.
They always:

1. Query the database for current sites/locations
2. Call routing functions to determine queues
3. Submit tasks to the dynamically determined queues

This separation allows the routing logic to evolve without changing manager code.

Task Routing
------------

Once queues exist, tasks must be routed to the appropriate queue.

Route Determination
~~~~~~~~~~~~~~~~~~~

:py:func:`ccat_data_transfer.queue_discovery.route_task_by_location`

Given an operation type and a data location, this function returns the queue name:

.. literalinclude:: ../../ccat_data_transfer/queue_discovery.py
   :pyobject: route_task_by_location
   :language: python

**Usage in Managers**:

.. code-block:: python

   # In raw_data_package_manager.py

   # Find the location where the package will be created
   source_location = package.source_location

   # Determine the queue
   queue = route_task_by_location(
       OperationType.RAW_DATA_PACKAGE_CREATION,
       source_location,
   )

   # Submit the task
   create_raw_data_package.apply_async(
       args=[package.id],
       queue=queue,
   )

This ensures that:

* The task is sent to the queue matching the worker's location
* The worker has access to the necessary storage
* Manager code contains no hard-coded queue names

Transfer Task Selection
~~~~~~~~~~~~~~~~~~~~~~~

Transfer operations are special because they involve two locations, an origin
and a destination. The system selects the appropriate transfer task based on the
storage types involved.

:py:func:`ccat_data_transfer.queue_discovery.get_transfer_task`

.. literalinclude:: ../../ccat_data_transfer/queue_discovery.py
   :pyobject: get_transfer_task
   :language: python

**Routing for Transfers**:

Transfers are routed to the **origin** location's queue:

.. code-block:: python

   # Transfer task executes at the origin (push model)
   queue = route_task_by_location(
       OperationType.DATA_TRANSFER,
       origin_location,
   )

This works because:

* The origin worker has the source data to push
* The origin controls when the transfer starts
* The destination worker handles unpacking separately

Worker Assignment
-----------------

Each worker must subscribe only to queues for locations whose storage it can access.

Manual Queue Selection
~~~~~~~~~~~~~~~~~~~~~~

When starting a worker, specify its queues explicitly:

.. code-block:: bash

   # Worker on Cologne buffer server
   # (-Q takes a single comma-separated list with no spaces)
   celery -A ccat_data_transfer.setup_celery_app worker \
       -Q cologne_buffer_data_transfer_package_creation,cologne_buffer_data_transfer,cologne_buffer_data_transfer_unpacking,cologne_buffer_deletion

   # Worker on CCAT telescope computer
   celery -A ccat_data_transfer.setup_celery_app worker \
       -Q ccat_telescope_computer_raw_data_package_creation,ccat_telescope_computer_deletion

Helper CLI Commands
~~~~~~~~~~~~~~~~~~~

The system provides CLI commands to assist with worker assignment.

**List All Queues**:

.. code-block:: bash

   ccat_data_transfer list-queues

**List Queues for Specific Location**:

.. code-block:: bash

   ccat_data_transfer list-queues cologne_buffer

Site-to-Site Routing
--------------------

For inter-site transfers, the system must determine which sites should exchange data.

Route Discovery
~~~~~~~~~~~~~~~

:py:func:`ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes`

This function discovers the primary routes from SOURCE sites to LTA sites:

.. literalinclude:: ../../ccat_data_transfer/data_transfer_package_manager.py
   :pyobject: discover_automatic_routes
   :language: python

**Example**:

Given:

* SOURCE sites: CCAT (Chile)
* LTA sites: Cologne (Germany), Cornell (USA)

Routes discovered:

* CCAT → Cologne
* CCAT → Cornell

Secondary Routes
~~~~~~~~~~~~~~~~

:py:func:`ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes`

After primary distribution, packages are replicated between LTA sites to ensure redundancy:

.. literalinclude:: ../../ccat_data_transfer/data_transfer_package_manager.py
   :pyobject: discover_secondary_routes
   :language: python

**Example**:

Given LTA sites: Cologne, Cornell

Routes discovered:

* Cologne → Cornell
* Cornell → Cologne

**How Secondary Transfers Work**:

Unlike primary transfers, which use round-robin, secondary transfers are **opportunistic**:

1. The ``create_secondary_data_transfers()`` function periodically scans all
   completed DataTransferPackages.
2. For each package, it checks which LTA sites already have it by looking at its
   physical copies.
3. It creates transfers from sites that **have** the package to sites that
   **don't have** it.
4. Over time, this ensures every LTA site gets every package without explicit scheduling.

This approach is more resilient than round-robin because it:

* Works even if some LTA sites are temporarily offline
* Self-heals if transfers fail, since the next cycle retries
* Has no state to track (unlike the round-robin index)
* Naturally focuses on packages that are still missing copies

**Example Flow**:

.. code-block:: text

   T=0: Package arrives at Cologne (from CCAT via primary transfer)
   T=1: create_secondary_data_transfers() runs
        → Sees Cologne has the package, Cornell doesn't
        → Creates transfer: Cologne → Cornell
   T=2: Transfer completes, both sites now have the package
   T=3: create_secondary_data_transfers() runs again
        → Both sites have it, no new transfers created

Round-Robin Distribution
------------------------

To balance load across LTA sites, the system distributes **primary transfers**
using round-robin.

**Note**: Round-robin is **only used for primary transfers** (SOURCE → LTA).
Secondary transfers (LTA → LTA) use opportunistic replication instead.

Algorithm
~~~~~~~~~

:py:func:`ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin`

For a SOURCE site with multiple LTA destinations:

.. literalinclude:: ../../ccat_data_transfer/data_transfer_package_manager.py
   :pyobject: get_next_lta_site_round_robin
   :language: python

**Example Sequence**:

Given LTA sites: [Cologne, Cornell]

.. code-block:: text

   Transfer 1: CCAT → Cologne (index 0)
   Transfer 2: CCAT → Cornell (index 1)
   Transfer 3: CCAT → Cologne (index 0)
   Transfer 4: CCAT → Cornell (index 1)
   ...

This ensures:

* Even distribution over time
* No single LTA site is overwhelmed
* Fairness to all LTA sites
* State that survives service restarts (persisted in Redis)

**Redis Key Format**:

The round-robin state is stored in Redis under the key:

.. code-block:: text

   round_robin:source:{site_short_name}

   Examples:
     round_robin:source:ccat
     round_robin:source:apex

Custom Routing (Not Implemented)
--------------------------------

The :py:class:`ccat_ops_db.models.DataTransferRoute` model exists in the database
schema, but custom routing is **not currently implemented** in the transfer logic.
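
Because these records are not consulted, the LTA site that receives a package's primary transfer is currently decided only by automatic route discovery plus the per-source round-robin counter described under `Round-Robin Distribution`_. Below is a minimal in-memory sketch of such a counter. It assumes the candidate LTA sites are passed in a stable (for example sorted) order. The ``RoundRobin`` class and its ``next_lta_site`` method are hypothetical illustrations, not the real :py:func:`~ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin`, which keeps its state in Redis.

.. code-block:: python

   from __future__ import annotations

   from collections import defaultdict


   class RoundRobin:
       """Hypothetical in-memory stand-in for the per-source round-robin state."""

       def __init__(self) -> None:
           # One counter per SOURCE site; keys mimic round_robin:source:{site_short_name}
           self._counters: defaultdict[str, int] = defaultdict(int)

       def next_lta_site(self, source_site: str, lta_sites: list[str]) -> str:
           """Return the next LTA site for source_site and advance its counter.

           lta_sites must be given in a stable order so indices stay meaningful.
           """
           if not lta_sites:
               raise ValueError(f"no active LTA sites for source {source_site!r}")
           key = f"round_robin:source:{source_site}"
           index = self._counters[key] % len(lta_sites)
           self._counters[key] += 1
           return lta_sites[index]


   rr = RoundRobin()
   print([rr.next_lta_site("ccat", ["cologne", "us"]) for _ in range(4)])
   # ['cologne', 'us', 'cologne', 'us']

The sketch reproduces the example sequence above. Taking a modulo of an ever-increasing counter keeps the result well-defined even if the list of LTA sites changes between calls. When several manager processes share the counter, the increment must be atomic. Redis ``INCR`` is one way to get that.
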
DataTransferRoute Model
~~~~~~~~~~~~~~~~~~~~~~~

:py:class:`ccat_ops_db.models.DataTransferRoute`

The :py:class:`ccat_ops_db.models.DataTransferRoute` model allows custom routes
to be specified, but the
:py:func:`~ccat_data_transfer.data_transfer_package_manager.find_route_overrides`
function currently only logs these records without acting on them:

.. literalinclude:: ../../ccat_data_transfer/data_transfer_package_manager.py
   :pyobject: find_route_overrides
   :language: python

**Intended Route Types** (when implemented):

* ``DIRECT``: Skip intermediate steps and go directly to the destination
* ``RELAY``: Route through a specific intermediate site
* ``CUSTOM``: Override the normal location selection

**Intended Use Cases** (when implemented):

* Testing specific routes
* Temporary rerouting during maintenance
* Optimizing for specific network paths
* Manual intervention during incidents

**Current Workaround**:

To control routing manually today, you must:

1. Temporarily deactivate unwanted LTA sites in the database (set ``active=false``).
2. Let automatic route discovery use the remaining sites.
3. Reactivate the sites when ready.

This workaround is coarse. Proper custom routing should be implemented for production use.

CLI Tools
---------

**List All Queues**:

.. code-block:: bash

   ccat_data_transfer list-queues

Shows all dynamically discovered queues based on active DataLocations.

**List Queues for Specific Location**:

.. code-block:: bash

   ccat_data_transfer list-queues cologne_buffer

Shows only the queues for the specified location.

**List All Locations**:

.. code-block:: bash

   ccat_data_transfer list-locations

The output lists sites, their locations, and the location types:

.. code-block:: text

   **CCAT Observatory - ccat**
     - telescope_computer (SOURCE)
     - buffer (BUFFER)

   **University of Cologne - cologne**
     - buffer (BUFFER)
     - lta (LONG_TERM_ARCHIVE)
     - processing (PROCESSING)

   **Cornell University - us**
     - buffer (BUFFER)
     - lta (LONG_TERM_ARCHIVE)

**Note**: There is no ``show-routes`` command.
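
As a rough offline substitute, you can derive the routes implied by a set of active locations by applying the two rules described under `Site-to-Site Routing`_. Every SOURCE site routes to every LTA site, and every LTA site routes to every other LTA site. Following the route query in the Database Queries section, the sketch assumes that a site never routes to itself. It works on a hypothetical in-memory ``Location`` tuple rather than on the database. ``show_routes`` is an illustrative helper, not part of ``ccat_data_transfer``. It does not call the real ``discover_automatic_routes`` or ``discover_secondary_routes``.

.. code-block:: python

   from __future__ import annotations

   from itertools import permutations
   from typing import NamedTuple


   class Location(NamedTuple):
       """Hypothetical flattened view of one active DataLocation row."""

       site: str           # Site.short_name
       location_type: str  # e.g. "SOURCE", "BUFFER", "LONG_TERM_ARCHIVE"


   def show_routes(
       locations: list[Location],
   ) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
       """Return (primary, secondary) routes as (from_site, to_site) pairs."""
       source_sites = sorted({l.site for l in locations if l.location_type == "SOURCE"})
       lta_sites = sorted({l.site for l in locations if l.location_type == "LONG_TERM_ARCHIVE"})

       # Primary: every SOURCE site to every LTA site at a different site
       primary = [(src, lta) for src in source_sites for lta in lta_sites if src != lta]
       # Secondary: every ordered pair of distinct LTA sites
       secondary = list(permutations(lta_sites, 2))
       return primary, secondary


   active = [
       Location("ccat", "SOURCE"),
       Location("ccat", "BUFFER"),
       Location("cologne", "BUFFER"),
       Location("cologne", "LONG_TERM_ARCHIVE"),
       Location("us", "BUFFER"),
       Location("us", "LONG_TERM_ARCHIVE"),
   ]
   primary, secondary = show_routes(active)
   for src, dst in primary:
       print(f"primary:   {src} -> {dst}")
   for src, dst in secondary:
       print(f"secondary: {src} -> {dst}")
   # primary:   ccat -> cologne
   # primary:   ccat -> us
   # secondary: cologne -> us
   # secondary: us -> cologne

With the sites from the ``list-locations`` example, this yields the same routes as the examples under `Route Discovery`_ and `Secondary Routes`_.
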
To see routing in action, check the logs from the ``data-transfer-package-manager``
or query the database directly (see `Database Queries`_ below).

Database Queries
~~~~~~~~~~~~~~~~

For deeper investigation:

.. code-block:: sql

   -- Show all active locations (the inputs to queue discovery)
   SELECT
       s.short_name AS site,
       dl.name AS location,
       dl.location_type,
       dl.priority,
       dl.active
   FROM data_location dl
   JOIN site s ON dl.site_id = s.id
   WHERE dl.active = true
   ORDER BY s.short_name, dl.location_type, dl.priority;

   -- Show recent transfers and their routes
   SELECT
       dt.id,
       origin.name AS origin,
       dest.name AS destination,
       dt.status,
       dt.start_time
   FROM data_transfer dt
   JOIN data_location origin ON dt.origin_location_id = origin.id
   JOIN data_location dest ON dt.destination_location_id = dest.id
   ORDER BY dt.start_time DESC
   LIMIT 100;

   -- Show the SOURCE -> LTA site pairs that automatic route discovery should find
   SELECT DISTINCT
       source_site.short_name AS source,
       lta_site.short_name AS lta_destination
   FROM site source_site
   CROSS JOIN site lta_site
   WHERE EXISTS (
       SELECT 1 FROM data_location dl1
       WHERE dl1.site_id = source_site.id
         AND dl1.location_type = 'SOURCE'
         AND dl1.active = true
   )
   AND EXISTS (
       SELECT 1 FROM data_location dl2
       WHERE dl2.site_id = lta_site.id
         AND dl2.location_type = 'LONG_TERM_ARCHIVE'
         AND dl2.active = true
   )
   AND source_site.id != lta_site.id;

Adding New Sites/Locations
---------------------------

Because routing is database-driven, adding a site requires no code changes.

Process
~~~~~~~

1. **Add Site to Database**:

   .. code-block:: python

      new_site = Site(
          name="New Observatory",
          short_name="newobs",
          location="Antarctica",
      )
      session.add(new_site)

2. **Add Data Locations**:

   .. code-block:: python

      source = DiskDataLocation(
          name="telescope_computer",
          site=new_site,
          location_type=LocationType.SOURCE,
          path="/data/raw",
          active=True,
          priority=1,
      )
      buffer = DiskDataLocation(
          name="buffer",
          site=new_site,
          location_type=LocationType.BUFFER,
          path="/data/buffer",
          active=True,
          priority=1,
      )
      session.add_all([source, buffer])

      # Persist the new site and its locations
      session.commit()

3. **Restart Services**: Queue discovery runs at startup, so restarted services
   pick up the new locations.

4. **Start Workers**:

   .. code-block:: bash

      # On the new site's servers
      # (-Q takes a single comma-separated list with no spaces)
      celery -A ccat_data_transfer.setup_celery_app worker \
          -Q newobs_telescope_computer_raw_data_package_creation,newobs_telescope_computer_deletion,newobs_buffer_data_transfer_package_creation,newobs_buffer_data_transfer

5. **Routes Are Created Automatically**:

   * ``discover_automatic_routes`` finds the new SOURCE→LTA routes
   * ``discover_secondary_routes`` adds LTA↔LTA routes to and from the new site (if an LTA location was added)
   * Round-robin includes the new site in the rotation if it has an LTA location.
     A new SOURCE site gets its own counter (``round_robin:source:newobs``).

That's it: no code changes, just database configuration.

Troubleshooting Routing
------------------------

**Problem**: Tasks not being processed

**Checks**:

1. Is the location active? ``SELECT * FROM data_location WHERE name = '...'``
2. Is a worker consuming from this queue?
   ``celery -A ccat_data_transfer.setup_celery_app inspect active_queues``
3. Is the queue name correct? ``ccat_data_transfer list-queues``
4. Is the operation valid for this location type? Check ``QUEUE_OPERATIONS_BY_LOCATION_TYPE``

**Problem**: Tasks going to the wrong queue

**Checks**:

1. Verify the queue name generation logic
2. Check the ``route_task_by_location`` call site
3. Examine manager logs for routing decisions
4. Ensure the database has the correct site/location names

**Problem**: Uneven distribution across LTA sites (primary transfers)

**Checks**:

1. Verify the round-robin state in Redis: ``redis-cli GET round_robin:source:ccat``
2. Check that all LTA sites are active
3. Examine recent transfers: are they balanced?
4. Reset round-robin if needed: ``redis-cli DEL round_robin:source:ccat``

**Problem**: Packages not replicating to all LTA sites (secondary transfers)

**Checks**:

1. Verify that secondary routes are discovered: check the logs for ``discover_secondary_routes``
2. Check that ``create_secondary_data_transfers()`` is running periodically
3. Verify that all LTA sites have active buffer locations
4. Check for failed transfers: ``SELECT * FROM data_transfer WHERE status = 'FAILED'``
5. Look for packages whose completed copies exist at only some LTA sites:

   .. code-block:: sql

      -- Find packages present at fewer than all LTA sites
      SELECT dtp.id, dtp.name, COUNT(DISTINCT dl.site_id) AS sites_with_copy
      FROM data_transfer_package dtp
      JOIN data_transfer_package_physical_copy pc
          ON pc.data_transfer_package_id = dtp.id
      JOIN data_location dl ON pc.data_location_id = dl.id
      WHERE dl.location_type = 'BUFFER'
        AND pc.status = 'COMPLETED'
        -- Count only sites that host an active LTA location,
        -- so copies at SOURCE-site buffers are not counted
        AND dl.site_id IN (
            SELECT site_id
            FROM data_location
            WHERE location_type = 'LONG_TERM_ARCHIVE'
              AND active = true
        )
      GROUP BY dtp.id, dtp.name
      HAVING COUNT(DISTINCT dl.site_id) < (
          SELECT COUNT(DISTINCT site_id)
          FROM data_location
          WHERE location_type = 'LONG_TERM_ARCHIVE'
            AND active = true
      );

Next Steps
----------

* :doc:`monitoring` - Health checks and observability
* :doc:`lifecycle` - Deletion policies and data lifecycle management
* :doc:`philosophy` - Why routing works this way