Transaction Buffering Overview
==============================

A comprehensive overview of the transaction buffering system architecture,
lifecycle, and data structures.

.. contents:: Table of Contents
   :local:
   :depth: 2

The Problem Statement
---------------------

**Scenario**: The CCAT observatory, at 5600 m in Chile, needs to record
telescope observations. The main database is in Cologne, Germany, more than
11,000 km away, and network connectivity is unreliable.

**Requirements**:

1. Observations must be recorded immediately (cannot wait for the network)
2. Data must eventually reach the main database (observations cannot be lost)
3. Operators need immediate feedback (success/failure)
4. Local reads must reflect buffered writes (consistency)
5. The system must know when data has replicated (for cache cleanup)

**Traditional approaches fail**:

* Direct DB writes: block on network latency, fail when the network is down
* Local DB with sync: complex conflict resolution and merge logic
* Fire-and-forget queue: no way to query buffered data, unclear replication state

**Our solution**: transaction buffering with LSN tracking and smart queries.

System Architecture
-------------------

Complete Component Diagram
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. mermaid::

   graph TB
       subgraph "Client Layer"
           Client[Observatory Script]
       end

       subgraph "API Layer"
           Router["FastAPI Router<br/>@critical_operation"]
           Deps[Dependencies]
       end

       subgraph "Transaction Building"
           Builder[Transaction Builder]
           IDGen[ID Generator]
       end

       subgraph "Transaction Management"
           Manager[Transaction Manager]
           Redis[(Redis)]
           Cache[Write-Through Cache]
           BufCache[Buffered Data Cache]
       end

       subgraph "Background Processing"
           BG[Background Processor]
           Executor[Transaction Executor]
           Retry[Retry Logic]
       end

       subgraph "Replication Tracking"
           LSN[LSN Tracker]
           MainDB[("Main Database<br/>Cologne")]
           Replica[("Local Replica<br/>Observatory")]
       end

       subgraph "Query Layer"
           Smart[Smart Query Manager]
           ReadBuf[Read Buffer Manager]
       end

       Client --> Router
       Router --> Deps
       Deps --> Builder
       Builder --> IDGen
       Builder --> Manager
       Manager --> Redis
       Manager --> Cache
       Manager --> BufCache
       Redis --> BG
       BG --> Executor
       Executor --> Retry
       Executor --> MainDB
       Executor --> LSN
       LSN --> Replica
       MainDB -.WAL Stream.-> Replica
       Smart --> Replica
       Smart --> Redis
       Smart --> ReadBuf
       Client -.Query.-> Smart

       style Redis fill:#FFD700
       style MainDB fill:#90EE90
       style Replica fill:#FFB6C1

Transaction Lifecycle
---------------------

Phase 1: Transaction Building
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The client makes a request to a critical endpoint:

.. code-block:: python

   # Client code
   response = requests.post(
       "http://api:8000/executed_obs_units/start",
       headers={"Authorization": f"Bearer {token}"},
       json={
           "obs_unit_id": 123,
           "start_time": "2025-01-01T00:00:00Z",
           # ... more fields
       }
   )

The router receives the request:

.. code-block:: python

   @router.post("/executed_obs_units/start")
   @critical_operation  # Decorator enables buffering
   async def start_observation(
       obs_data: ExecutedObsUnitCreate,
       _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
   ):
       # Build transaction
       obs_step = _transaction_builder.create(
           model_class=models.ExecutedObsUnit,
           data={
               "id": uuid.uuid4(),  # Pre-generated!
               **obs_data.dict()
           },
           step_id="create_observation"
       )
       # Decorator handles buffering automatically
       return {"id": obs_step.data["id"], "status": "buffered"}

The transaction builder constructs the transaction. It:

* Pre-generates UUIDs for all records
* Captures dependencies between steps
* Serializes to JSON for Redis storage

**Key innovation**: pre-generated IDs mean the client can reference the
observation immediately, even though it is not in the database yet.

Phase 2: Buffering to Redis
~~~~~~~~~~~~~~~~~~~~~~~~~~~

The transaction manager buffers the transaction to Redis:

.. code-block:: python

   # From transaction_manager.py
   async def buffer_transaction(transaction):
       # Push to queue
       await redis.lpush(
           f"site:{site_name}:transaction_buffer",
           transaction.model_dump_json()
       )

       # Store status
       await redis.setex(
           f"site:{site_name}:transaction:{transaction.transaction_id}",
           3600,  # 1 hour TTL
           transaction.model_dump_json()
       )

       # Write-through cache: store generated IDs
       for step in transaction.steps:
           if "id" in step.data:
               await redis.setex(
                   f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}",
                   3600,
                   json.dumps(step.data)
               )

       # Cache buffered data for smart queries
       for step in transaction.steps:
           await redis.setex(
               f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}",
               3600,
               json.dumps(step.data)
           )

       return transaction.transaction_id

**Latency**: < 10 ms typical (all in-memory operations)

**Result**: the client receives an immediate response containing the
pre-generated UUID.

Phase 3: Background Processing
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The background processor runs continuously:

.. code-block:: python

   # From background_processor.py
   async def _process_transactions(self):
       while self.is_running:
           # Check buffer size
           buffer_size = await redis.llen(buffer_key)
           if buffer_size == 0:
               await asyncio.sleep(0.1)
               continue

           # Check database connectivity
           if not await check_database_connectivity():
               await asyncio.sleep(10)  # Wait longer when the DB is down
               continue

           # Process transaction
           transaction_json = await redis.rpop(buffer_key)
           transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)

           try:
               await process_single_transaction(transaction)
           except Exception as e:
               await handle_failure(transaction, e)

Processing a single transaction:

.. code-block:: python

   async def process_single_transaction(transaction):
       # Execute transaction
       async with main_db_session() as session:
           for step in transaction.steps:
               if step.operation == "create":
                   record = step.model_class(**step.data)
                   session.add(record)
               elif step.operation == "update":
                   ...  # update logic
               # ... other operations

           await session.commit()

           # Capture LSN
           result = await session.execute(text("SELECT pg_current_wal_lsn()"))
           lsn = result.scalar()

       # Track LSN for replication
       await lsn_tracker.wait_for_replication(lsn, transaction.transaction_id)

**Throughput**: ~10 transactions/second typical

**Error handling**: automatic retry with exponential backoff

Phase 4: LSN Tracking
~~~~~~~~~~~~~~~~~~~~~

After executing on the main database, track replication:

.. code-block:: python

   # From lsn_tracker.py
   async def wait_for_replication(target_lsn: str, transaction_id: str):
       start_time = time.time()

       while (time.time() - start_time) < timeout:
           # Query the replica: has it replayed past the target LSN?
           # LSNs must be compared numerically (pg_wal_lsn_diff), not as
           # strings -- lexicographic comparison of "0/..." text is wrong.
           async with replica_session() as session:
               result = await session.execute(
                   text("SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), :target)"),
                   {"target": target_lsn}
               )
               lag_bytes = result.scalar()

           if lag_bytes is not None and lag_bytes >= 0:
               # Replicated!
               await cleanup_caches(transaction_id)
               return True

           # Not yet, wait and retry
           await asyncio.sleep(0.1)

       # Timeout: extend cache TTL
       await extend_cache_ttl(transaction_id)
       return False

**Why LSN tracking matters**:

* Know precisely when data has replicated (no guessing)
* Smart cache management (clean up when safe)
* Monitoring and alerting (detect lag)

Phase 5: Cache Cleanup
~~~~~~~~~~~~~~~~~~~~~~

When the LSN confirms replication:

.. code-block:: python

   async def cleanup_caches(transaction_id):
       transaction = await get_transaction(transaction_id)

       for step in transaction.steps:
           # Remove from write-through cache
           await redis.delete(
               f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}"
           )

           # Remove from buffered data cache
           await redis.delete(
               f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}"
           )

           # Remove from read buffer (if it exists)
           await redis.delete(
               f"site:{site_name}:read_buffer:{step.model_class}:{step.data['id']}"
           )

       # Remove transaction status
       await redis.delete(f"site:{site_name}:transaction:{transaction_id}")

**Why cleanup**: frees memory, prevents stale data, and signals that
replication is complete.

Smart Query Integration
-----------------------

While a transaction is buffered or replicating, smart queries provide a
consistent view:

.. code-block:: python

   # Client queries for observations
   observations = await smart_query.search_records(
       "ExecutedObsUnit",
       {"obs_unit_id": 123}
   )

The smart query manager:

1. **Queries the database** (may not have the buffered data yet)
2. **Queries the buffer cache** (data still in the buffer)
3. **Queries the read buffer** (updates to buffered records)
4. **Merges results** (buffer and read buffer override the database)
5. **Deduplicates** by ID
6. **Returns** the complete view

**Result**: the client always sees the latest state, even if it is not yet
in the database. See :doc:`smart-query-manager` for details.

Read Buffer for Mutable Updates
-------------------------------

**Problem**: a buffered record needs an update before replication:

.. code-block:: python

   # Start observation (buffered)
   obs_id = await start_observation(...)  # Returns pre-generated UUID

   # 10 seconds later: finish observation.
   # But the record is still in the buffer (not in the DB yet)
   await finish_observation(obs_id, end_time="...")

**Solution**: the read buffer tracks updates:

.. code-block:: python

   await read_buffer_manager.update(
       model_class="ExecutedObsUnit",
       record_id=obs_id,
       updates={"status": "completed", "end_time": "..."}
   )

The smart query manager applies read buffer updates when merging. See
:doc:`read-buffer-manager` for details.

Data Structures in Detail
-------------------------

SQLAlchemyBufferedTransaction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Complete model:

.. code-block:: python

   class SQLAlchemyBufferedTransaction(BaseModel):
       transaction_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
       endpoint: str                    # Endpoint that created it
       site: str                        # Site name
       timestamp: datetime = Field(default_factory=datetime.utcnow)
       status: str = "pending"          # pending, processing, completed, failed
       retry_count: int = 0
       max_retries: int = 3
       steps: List[SQLAlchemyTransactionStep] = []
       metadata: Dict[str, Any] = {}

SQLAlchemyTransactionStep
~~~~~~~~~~~~~~~~~~~~~~~~~

Complete model:

.. code-block:: python

   class SQLAlchemyTransactionStep(BaseModel):
       step_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
       operation: SQLAlchemyOperationType   # create, update, delete, bulk_create
       model_class: str                     # "ExecutedObsUnit", "RawDataFile", etc.
       data: Dict[str, Any] = {}
       conditions: Dict[str, Any] = {}      # For updates/deletes
       dependencies: List[str] = []         # step_ids this step depends on
       expected_result: Optional[str] = None

Example Complete Transaction
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Create an observation with files:

.. code-block:: json

   {
     "transaction_id": "abc-123-def",
     "endpoint": "create_observation_with_files",
     "site": "observatory",
     "timestamp": "2025-01-01T00:00:00Z",
     "status": "pending",
     "retry_count": 0,
     "steps": [
       {
         "step_id": "step1",
         "operation": "create",
         "model_class": "ExecutedObsUnit",
         "data": {
           "id": "uuid-1",
           "obs_unit_id": 123,
           "start_time": "2025-01-01T00:00:00Z",
           "status": "running"
         }
       },
       {
         "step_id": "step2",
         "operation": "create",
         "model_class": "RawDataPackage",
         "data": {
           "id": "uuid-2",
           "name": "obs_123_package",
           "executed_obs_unit_id": "uuid-1"
         },
         "dependencies": ["step1"]
       },
       {
         "step_id": "step3",
         "operation": "bulk_create",
         "model_class": "RawDataFile",
         "data": [
           {
             "id": "uuid-3",
             "name": "file1.fits",
             "raw_data_package_id": "uuid-2"
           },
           {
             "id": "uuid-4",
             "name": "file2.fits",
             "raw_data_package_id": "uuid-2"
           }
         ],
         "dependencies": ["step2"]
       }
     ]
   }

Failure Handling
----------------

Retry Logic
~~~~~~~~~~~

.. code-block:: python

   async def execute_with_retry(transaction):
       for attempt in range(transaction.max_retries):
           try:
               await execute_transaction(transaction)
               return  # Success
           except Exception as e:
               if attempt < transaction.max_retries - 1:
                   delay = 5 * (2 ** attempt)  # Exponential backoff
                   logger.warning(f"Retry {attempt + 1} in {delay}s: {e}")
                   await asyncio.sleep(delay)
               else:
                   # Final failure
                   await move_to_failed_queue(transaction)
                   raise

Failed Transaction Queue
~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: text

   Key: site:{site_name}:failed_transactions
   Type: List
   Content: JSON-serialized transactions that exceeded max retries

Failed transactions require manual intervention.
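For reference, the exponential backoff schedule used in the retry logic (``5 * 2 ** attempt`` seconds, one delay per attempt that still has a retry remaining) can be computed in isolation. ``backoff_delays`` is an illustrative helper, not part of the codebase:

.. code-block:: python

   def backoff_delays(max_retries: int = 3, base: int = 5) -> list[int]:
       # One sleep before each retry; the final attempt fails without sleeping
       return [base * (2 ** attempt) for attempt in range(max_retries - 1)]

   print(backoff_delays())    # [5, 10]
   print(backoff_delays(5))   # [5, 10, 20, 40]

With the default ``max_retries = 3``, a transaction therefore spends at most 15 seconds in backoff before landing in the failed queue.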
Monitoring and Observability
----------------------------

Key Metrics
~~~~~~~~~~~

**Buffer health**:

* ``transaction_buffer_size`` - Current buffer size
* ``failed_transaction_count`` - Transactions in the failed queue
* ``oldest_pending_age_seconds`` - Age of the oldest pending transaction

**Processing performance**:

* ``transactions_processed_total`` - Cumulative count
* ``transaction_processing_rate`` - Transactions per second
* ``transaction_execution_time_ms`` - Average execution time

**Replication lag**:

* ``replication_lag_seconds`` - Time lag between main and replica
* ``replication_lag_bytes`` - Byte lag between main and replica
* ``lsn_tracking_timeout_count`` - Timeouts while waiting for replication

Health Endpoint
~~~~~~~~~~~~~~~

.. code-block:: bash

   curl http://localhost:8000/health

.. code-block:: json

   {
     "status": "healthy",
     "transaction_buffer": {
       "size": 5,
       "failed": 0,
       "oldest_pending_seconds": 2.5
     },
     "background_processor": {
       "status": "running",
       "last_run": "2025-01-01T00:00:05Z",
       "transactions_processed": 150
     },
     "replication_lag": {
       "seconds": 1.2,
       "main_lsn": "0/12345678",
       "replica_lsn": "0/12345600"
     }
   }

Summary
-------

The transaction buffering system provides:

* **Immediate responses**: < 20 ms typical for buffered operations
* **Guaranteed execution**: Redis persistence plus automatic retry
* **Precise tracking**: LSN-based replication monitoring
* **Consistent views**: smart queries merge buffer and database
* **Mutable updates**: the read buffer tracks changes to buffered records

Key architectural decisions:

* **Pre-generated IDs**: enable immediate client access
* **LSN tracking**: eliminates guesswork about replication
* **Smart queries**: merge multiple data sources transparently
* **Read buffer**: supports mutable buffered records
* **Background processing**: decouples buffering from execution

This architecture ensures that **observatory operations never fail due to
network issues** while maintaining eventual data consistency.
Next Steps
----------

* :doc:`transaction-builder` - Building transactions
* :doc:`transaction-manager` - Buffering and state management
* :doc:`background-processor` - Async processing
* :doc:`lsn-tracking` - Replication tracking
* :doc:`smart-query-manager` - Querying buffered data
* :doc:`../../tutorials/complex-endpoints/buffered-critical-operations` - Using in endpoints