System Overview
===============

.. verified:: 2025-11-12
   :reviewer: Christof Buchbender

A comprehensive overview of the ops-db-api system components, their interactions, and data flow patterns.

.. contents:: Table of Contents
   :local:
   :depth: 2

Component Architecture
----------------------

The system is organized into distinct layers, each with specific responsibilities:

Application Layer
~~~~~~~~~~~~~~~~~

**FastAPI Application**: The main application is initialized in ``ccat_ops_db_api/main.py``:

.. literalinclude:: ../../ccat_ops_db_api/main.py
   :language: python
   :lines: 1-50
   :emphasize-lines: 8-25

Key features:

* **Lifespan Context Manager**: Handles startup/shutdown of background services
* **CORS Middleware**: Enables cross-origin requests from the web frontend
* **Router Registration**: Mounts all API routers with appropriate prefixes
* **WebSocket Support**: Tracks active WebSocket connections globally

**Router Organization**: Routers are organized by functional area:

.. code-block:: text

   ccat_ops_db_api/routers/
   ├── auth.py                  # Basic authentication endpoints
   ├── github_auth.py           # GitHub OAuth flow
   ├── api_tokens.py            # API token management
   ├── user_preferences.py      # User settings
   ├── transfer.py              # Data transfer monitoring (UI)
   ├── obs_unit.py              # Observation units (UI)
   ├── observing_program.py     # Observing programs (UI)
   ├── sources.py               # Astronomical sources (UI)
   ├── visibility.py            # Source visibility (UI)
   ├── instruments.py           # Instruments and modules (UI)
   ├── executed_obs_units.py    # Observation execution (Ops)
   ├── raw_data_files.py        # Data file registration (Ops)
   ├── raw_data_package.py      # Data package management (Ops)
   ├── staging.py               # Data staging (Ops)
   ├── site.py                  # Site information
   └── demo.py                  # Demo/testing endpoints

Transaction Buffering Layer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Site Configuration**: Determines system behavior based on site type:

.. literalinclude:: ../../ccat_ops_db_api/transaction_buffering/site_config.py
   :language: python
   :lines: 17-29, 83-91, 142-164
   :emphasize-lines: 8-10, 25-27

**Transaction Builder**: Constructs multi-step transactions with dependencies:

.. literalinclude:: ../../ccat_ops_db_api/transaction_buffering/transaction_builder.py
   :language: python
   :lines: 1-50

**Transaction Manager**: Buffers transactions to Redis and manages execution:

.. literalinclude:: ../../ccat_ops_db_api/transaction_buffering/transaction_manager.py
   :language: python
   :lines: 42-78

**Background Processor**: Continuously processes buffered transactions:

.. literalinclude:: ../../ccat_ops_db_api/transaction_buffering/background_processor.py
   :language: python
   :lines: 19-43, 69-100

Authentication Layer
~~~~~~~~~~~~~~~~~~~~

**Unified Authentication**: Handles both GitHub OAuth and API tokens:

.. literalinclude:: ../../ccat_ops_db_api/auth/unified_auth.py
   :language: python
   :lines: 1-50

Database Layer
~~~~~~~~~~~~~~

**Connection Management**: The system maintains separate connection pools for the main and local databases:

.. code-block:: python

   # From dependencies.py
   from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
   from sqlalchemy.orm import sessionmaker

   from ccat_ops_db_api.transaction_buffering import get_site_config

   def get_main_db_session():
       """Return an async session factory for the main database (writes)"""
       site_config = get_site_config()
       engine = create_async_engine(
           site_config.get_main_database_url()
       )
       return sessionmaker(engine, class_=AsyncSession)

   def get_local_db_session():
       """Return an async session factory for the local database (reads)"""
       site_config = get_site_config()
       engine = create_async_engine(
           site_config.get_local_database_url()
       )
       return sessionmaker(engine, class_=AsyncSession)

Data Flow Patterns
------------------

UI Request Flow
~~~~~~~~~~~~~~~

Typical flow for a UI read request:

1. **Request arrives** at the FastAPI application
2. **CORS middleware** validates the origin
3. **Authentication** verifies the JWT token
4. **Router** handles the request
5. **Query local database** for data
6. **Format response** with Pydantic schemas
7. **Return JSON** to the frontend

Example: Transfer Overview

.. code-block:: python

   @router.get("/api/transfer/overview")
   async def get_transfer_overview(
       current_user: User = Depends(get_current_user),
       db: Session = Depends(get_db)
   ):
       # Query various transfer-related tables
       packages = db.query(RawDataPackage).count()
       transfers = db.query(DataTransfer).filter(
           DataTransfer.status == "active"
       ).count()

       return {
           "total_packages": packages,
           "active_transfers": transfers,
           # ... more stats
       }

Observatory Write Flow (Buffered)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Flow for a critical operation at a secondary site:

1. **Request arrives** with an API token
2. **Authentication** verifies the token and its permissions
3. **Router** is marked with the ``@critical_operation`` decorator
4. **Site config** determines that buffering is needed (secondary site)
5. **Transaction builder** constructs the transaction with pre-generated IDs
6. **Transaction manager** buffers it to Redis (LPUSH)
7. **Write-through cache** stores the generated IDs
8. **Immediate response** is returned to the client with the UUID
9. **Background processor** later executes the transaction on the main DB
10. **LSN tracker** monitors replication
11. **Cache cleanup** occurs once the change has been replicated

Example: Start Observation

.. code-block:: python

   @router.post("/executed_obs_units/start")
   @critical_operation
   async def start_observation(
       obs_data: ExecutedObsUnitCreate,
       _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
   ):
       # Build transaction
       obs_step = _transaction_builder.create(
           model_class=models.ExecutedObsUnit,
           data=obs_data.dict(),
           step_id="create_observation"
       )

       # Return immediately (buffered by decorator)
       return {
           "id": obs_step.data["id"],  # Pre-generated UUID
           "status": "buffered"
       }

Read with Buffer Merge Flow
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Flow for reading potentially buffered data:

1. **Request arrives** at a read endpoint
2. **Smart query manager** is invoked
3. **Parallel queries**: Database + Redis buffer + Redis read buffer
4. **Merge results**: Buffer overrides database (fresher data)
5. **Deduplicate** by record ID
6. **Return merged view** to the client

Example: Get Executed Observations

.. code-block:: python

   @router.get("/executed_obs_units/{obs_unit_id}")
   async def get_executed_obs_units(
       obs_unit_id: int,
       db: Session = Depends(get_db)
   ):
       smart_query = get_smart_query_manager()

       # Merges DB + buffer + read buffer
       executed_units = await smart_query.search_records(
           "ExecutedObsUnit",
           {"obs_unit_id": obs_unit_id},
           limit=100
       )

       return executed_units

WebSocket Flow
~~~~~~~~~~~~~~

Real-time updates via WebSockets:

1. **Client connects** to the WebSocket endpoint
2. **Authentication** via query parameter token
3. **Redis pub/sub** subscription is created
4. **Initial data** is sent immediately
5. **Stream updates** as Redis notifications arrive
6. **Client disconnects** and the subscription is cleaned up

Example: Transfer Monitoring

.. code-block:: python

   @router.websocket("/api/transfer/ws/overview")
   async def websocket_transfer_overview(
       websocket: WebSocket,
       token: str = Query(...)
   ):
       # Token validation is omitted in this excerpt
       await websocket.accept()

       # Subscribe to Redis pub/sub
       pubsub = redis.pubsub()
       await pubsub.subscribe("transfer_updates")

       try:
           # Send initial data
           overview = await get_transfer_overview_data()
           await websocket.send_json(overview)

           # Stream updates
           async for message in pubsub.listen():
               if message["type"] == "message":
                   await websocket.send_json(message["data"])
       finally:
           # Clean up the subscription when the client disconnects
           await pubsub.unsubscribe("transfer_updates")

Critical Paths
--------------

Observatory Operation Critical Path
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Requirement**: Operation completes in < 100ms regardless of network state

**Path**:

1. API receives request (< 1ms)
2. Token validation (< 5ms, cached)
3. Transaction building (< 10ms)
4. Redis LPUSH (< 1ms, in-memory)
5. Response generation (< 1ms)

**Total**: < 20ms typical

**No network dependency**: All operations are local

Background Processing Path
~~~~~~~~~~~~~~~~~~~~~~~~~~~

**Frequency**: Continuous polling (1-second intervals when buffer has data)

**Path**:

1. RPOP from Redis buffer (< 1ms)
2. Parse transaction (< 1ms)
3. Execute on main database (50-500ms depending on complexity)
4. Capture LSN (< 5ms)
5. Check replica LSN (< 10ms)
6. Update cache TTL or cleanup (< 5ms)

**Total**: 100ms - 1s per transaction

LSN Tracking Path
~~~~~~~~~~~~~~~~~

**Trigger**: After each buffered transaction executes

**Path**:

1. Capture main DB LSN (< 5ms)
2. Poll replica LSN every 100ms
3. Compare LSNs (< 1ms)
4. If replicated: Clean up caches
5. If not: Extend cache TTL and retry

**Timeout**: 30 seconds default (configurable)

Component Interactions
----------------------

Dependency Graph
~~~~~~~~~~~~~~~~

.. mermaid::

   graph TB
       Main[main.py]
       Deps[dependencies.py]
       Routers[routers/*]
       Auth[auth/*]
       TxBuffer[transaction_buffering/*]
       Schemas[schemas.py]
       Models[ccat_ops_db models]

       Main --> Deps
       Main --> Routers
       Routers --> Auth
       Routers --> TxBuffer
       Routers --> Schemas
       Routers --> Deps
       TxBuffer --> Models
       Auth --> Models
       Schemas --> Models
       Deps --> TxBuffer

       style Main fill:#90EE90
       style TxBuffer fill:#FFD700
       style Models fill:#87CEEB

Initialization Sequence
~~~~~~~~~~~~~~~~~~~~~~~

Application startup sequence:

.. mermaid::

   sequenceDiagram
       participant Main
       participant Deps
       participant TxBuffer
       participant BgProc as Background Processor
       participant Redis
       participant DB

       Main->>Deps: initialize_transaction_buffering()
       Deps->>Redis: Connect
       Redis-->>Deps: Connection established
       Deps->>TxBuffer: Create transaction manager
       Deps->>BgProc: Create background processor
       Deps-->>Main: Initialization complete
       Main->>BgProc: start()
       BgProc->>Redis: Check buffer
       BgProc->>DB: Verify connectivity
       BgProc-->>Main: Started
       Main->>Main: Register routers
       Main-->>Main: Ready for requests

Configuration Management
------------------------

Environment Variables
~~~~~~~~~~~~~~~~~~~~~

Key configuration sources (in order of precedence):

1. Environment variables (highest priority)
2. ``.env`` file (development)
3. ``config/settings.toml`` (defaults)

Critical settings:

.. code-block:: bash

   # Site Configuration
   SITE_NAME=observatory
   SITE_TYPE=secondary  # or "main"

   # Database Connections
   MAIN_DB_HOST=main-db.example.com
   MAIN_DB_PORT=5432
   LOCAL_DB_HOST=localhost
   LOCAL_DB_PORT=5432

   # Redis
   REDIS_HOST=localhost
   REDIS_PORT=6379

   # Transaction Buffering
   TRANSACTION_BUFFER_SIZE=1000
   TRANSACTION_RETRY_ATTEMPTS=3
   TRANSACTION_RETRY_DELAY=5
   BACKGROUND_PROCESSING_INTERVAL=1.0

   # LSN Tracking
   LSN_TRACKING_ENABLED=true
   LSN_CHECK_INTERVAL=0.1
   LSN_TIMEOUT=30

Runtime Configuration Access
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   from ccat_ops_db_api.transaction_buffering import get_site_config

   site_config = get_site_config()

   # Check site type
   if site_config.is_secondary_site:
       # Enable buffering
       should_buffer = site_config.should_buffer_operation("critical")

   # Get database URL
   db_url = site_config.get_database_url("default")

   # Get Redis keys
   buffer_key = site_config.get_transaction_buffer_key()
   # Returns: "site:observatory:transaction_buffer"

Monitoring Endpoints
--------------------

Health Check
~~~~~~~~~~~~

.. code-block:: bash

   GET /health

Returns system health:

.. code-block:: json

   {
     "status": "healthy",
     "database": "connected",
     "redis": "connected",
     "transaction_buffer": {
       "size": 5,
       "failed": 0
     },
     "background_processor": "running"
   }

Buffer Statistics
~~~~~~~~~~~~~~~~~

.. code-block:: bash

   GET /buffer-stats

Returns buffering metrics:

.. code-block:: json

   {
     "pending_transactions": 5,
     "failed_transactions": 0,
     "processing_rate": 9.5,
     "average_execution_time_ms": 125
   }

Site Information
~~~~~~~~~~~~~~~~

.. code-block:: bash

   GET /api/site/info

Returns site configuration:

.. code-block:: json

   {
     "site_name": "observatory",
     "site_type": "secondary",
     "buffering_enabled": true,
     "main_db_host": "main-db.example.com",
     "local_db_host": "localhost"
   }

Summary
-------

The ops-db-api system is structured in layers:

* **Application Layer**: FastAPI with organized routers
* **Transaction Buffering**: Site-aware buffering with LSN tracking
* **Authentication**: Unified GitHub OAuth + API tokens
* **Database**: Main + replica with streaming replication

Key characteristics:

* **Async throughout**: AsyncIO, async SQLAlchemy, async Redis
* **Site-aware**: Behavior adapts to MAIN vs SECONDARY
* **Network resilient**: Buffering ensures critical operations never block on the link to the main database
* **Precise tracking**: LSN-based replication monitoring
* **Dual purpose**: Serves both UI and observatory needs

Next Steps
----------

* :doc:`database-topology` - Database architecture details
* :doc:`site-configuration` - Site configuration deep dive
* :doc:`authentication-system` - Authentication architecture
* :doc:`../deep-dive/transaction-buffering/overview` - Buffering system details
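
As a closing illustration of the merge rule from the Read with Buffer Merge Flow section (buffer overrides database, deduplicate by record ID), the following is a minimal sketch and not the smart query manager's actual implementation. The function name ``merge_records``, the ``"id"`` key, and the field-level override are assumptions made for this example only.

.. code-block:: python

   from typing import Any, Dict, Iterable, List


   def merge_records(
       db_rows: Iterable[Dict[str, Any]],
       buffered_rows: Iterable[Dict[str, Any]],
   ) -> List[Dict[str, Any]]:
       """Merge two record lists keyed by "id"; buffered rows win on conflict.

       Hypothetical helper for illustration; not part of ops-db-api.
       """
       merged: Dict[Any, Dict[str, Any]] = {}
       for row in db_rows:
           merged[row["id"]] = dict(row)
       for row in buffered_rows:
           # Assumed rule: buffered fields replace database fields of the same ID
           merged[row["id"]] = {**merged.get(row["id"], {}), **row}
       # One entry per ID, in first-seen order
       return list(merged.values())


   db_rows = [
       {"id": "a", "status": "running"},
       {"id": "b", "status": "completed"},
   ]
   buffered_rows = [
       {"id": "a", "status": "completed"},  # newer state, not yet replicated
       {"id": "c", "status": "running"},    # exists only in the buffer
   ]
   merged = merge_records(db_rows, buffered_rows)

Keying the intermediate dictionary by record ID performs the override and the deduplication in a single pass, so each ID appears exactly once in the merged view.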