Transaction Builder
===================

The Transaction Builder constructs complex, multi-step SQLAlchemy transactions with dependency resolution and pre-generated IDs.

.. contents:: Table of Contents
   :local:
   :depth: 2

Purpose
-------

**Problem**: Observatory operations often require multiple related database records (observation + package + files). These records must be written atomically (all succeed or all fail), even when buffered.

**Solution**: The Transaction Builder constructs a multi-step transaction specification that can be:

* Serialized to JSON and buffered in Redis
* Executed atomically later by the background processor
* Referenced immediately via pre-generated IDs

Core Implementation
-------------------

The transaction builder is defined in ``transaction_builder.py``:

.. literalinclude:: ../../../ccat_ops_db_api/transaction_buffering/transaction_builder.py
   :language: python
   :lines: 1-28

Key Features
------------

1. Pre-Generated IDs
~~~~~~~~~~~~~~~~~~~~

IDs are generated **before** database insertion:

.. code-block:: python

   from ccat_ops_db_api.transaction_buffering import get_id_manager

   id_manager = get_id_manager()
   obs_id = id_manager.generate_id("ExecutedObsUnit")  # Returns UUID

   # Use immediately, even though not in DB yet
   client_response = {"id": obs_id, "status": "buffered"}

**Why this matters**:

* Client can reference the record immediately
* Related records can use the ID (foreign keys)
* Status queries work before replication

2. Supported Operations
~~~~~~~~~~~~~~~~~~~~~~~

.. list-table::
   :header-rows: 1
   :widths: 20 50 30

   * - Operation
     - Description
     - Example Use Case
   * - **CREATE**
     - Insert single record
     - Create observation
   * - **UPDATE**
     - Update existing record
     - Finish observation
   * - **DELETE**
     - Delete record(s)
     - Cancel observation
   * - **BULK_CREATE**
     - Insert multiple records
     - Register 100+ data files

3. Dependency Syntax
~~~~~~~~~~~~~~~~~~~~

Reference IDs from previous steps using ``${step_id}.field``:

.. code-block:: python

   # Step 1: Create observation
   obs_step = builder.create(
       model_class=models.ExecutedObsUnit,
       data={"id": uuid.uuid4(), "status": "running"},
       step_id="create_obs"
   )

   # Step 2: Create package (references observation ID)
   pkg_step = builder.create(
       model_class=models.RawDataPackage,
       data={
           "id": uuid.uuid4(),
           # Doubled braces emit literal { and }, giving "${create_obs}.id"
           "executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"  # Reference!
       },
       step_id="create_pkg"
   )

**At execution time**, the transaction executor resolves dependencies:

.. code-block:: python

   # Executor resolves "${create_obs}.id" to actual UUID
   executed_obs_unit_id = resolved_values["create_obs"]["id"]

Complete Working Example
------------------------

Create observation with package and files:

.. code-block:: python

   from ccat_ops_db_api.transaction_buffering import (
       get_transaction_builder,
       get_transaction_manager,
   )
   from ccat_ops_db import models
   import uuid


   async def create_observation_with_files():
       # Get builder instance
       builder = get_transaction_builder()
       builder.start_new_transaction(
           endpoint="create_observation_with_files",
           site="observatory"
       )

       # Step 1: Create observation
       obs_id = uuid.uuid4()
       obs_step = builder.create(
           model_class=models.ExecutedObsUnit,
           data={
               "id": obs_id,
               "obs_unit_id": 123,
               "start_time": "2025-01-01T00:00:00Z",
               "status": "running",
               "mean_pwv": 2.5,
               "mean_elevation": 45.0,
               "instrument_module_configuration_id": 1
           },
           step_id="create_obs"
       )

       # Step 2: Create data package
       pkg_id = uuid.uuid4()
       pkg_step = builder.create(
           model_class=models.RawDataPackage,
           data={
               "id": pkg_id,
               "name": "obs_123_package_001",
               "executed_obs_unit_id": obs_id,  # Can use directly or via ${}
               "status": "building"
           },
           step_id="create_pkg"
       )

       # Step 3: Bulk create data files
       file_data = []
       for i in range(10):
           file_data.append({
               "id": uuid.uuid4(),
               "name": f"obs_123_det_{i:03d}.fits",
               "path": f"/data/obs_123/det_{i:03d}.fits",
               "size": 1048576,
               "checksum": f"sha256_{i}",
               "file_type": "fits",
               "raw_data_package_id": pkg_id
           })

       files_step = builder.bulk_create(
           model_class=models.RawDataFile,
           data_list=file_data,
           dependencies=[pkg_step.step_id],  # Wait for package creation
           step_id="create_files"
       )

       # Build final transaction
       transaction = builder.build()

       # Buffer for async execution
       manager = get_transaction_manager()
       transaction_id = await manager.buffer_transaction(transaction)

       # Return pre-generated IDs to client
       return {
           "observation_id": obs_id,
           "package_id": pkg_id,
           "transaction_id": transaction_id,
           "status": "buffered"
       }

Using in Endpoints
------------------

The ``@critical_operation`` decorator automatically provides a transaction builder:

.. code-block:: python

   from fastapi import APIRouter, Depends
   from ccat_ops_db import models
   from ccat_ops_db_api.transaction_buffering import (
       critical_operation,
       SQLAlchemyTransactionBuilder,
       get_transaction_builder
   )

   router = APIRouter()

   @router.post("/executed_obs_units/start")
   @critical_operation
   async def start_observation(
       obs_data: ExecutedObsUnitCreate,
       _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
   ):
       # Build transaction
       obs_step = _transaction_builder.create(
           model_class=models.ExecutedObsUnit,
           data=obs_data.dict(),
           step_id="create_observation"
       )

       # Decorator handles buffering automatically
       return {
           "id": obs_step.data["id"],
           "status": "buffered"
       }

Builder Methods
---------------

create()
~~~~~~~~

.. code-block:: python

   step = builder.create(
       model_class=models.ExecutedObsUnit,  # SQLAlchemy model class
       data={"id": uuid.uuid4(), "status": "running"},  # Record data
       step_id="create_obs"  # Unique step identifier
   )

**Returns**: ``SQLAlchemyTransactionStep`` with pre-generated data

update()
~~~~~~~~

.. code-block:: python

   step = builder.update(
       model_class=models.ExecutedObsUnit,
       conditions={"id": obs_id},  # Which record(s) to update
       updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
       step_id="finish_obs"
   )

delete()
~~~~~~~~

.. code-block:: python

   step = builder.delete(
       model_class=models.RawDataFile,
       conditions={"raw_data_package_id": pkg_id},  # Which records to delete
       step_id="delete_files"
   )

bulk_create()
~~~~~~~~~~~~~

.. code-block:: python

   step = builder.bulk_create(
       model_class=models.RawDataFile,
       data_list=[
           {"id": uuid.uuid4(), "name": "file1.fits", ...},
           {"id": uuid.uuid4(), "name": "file2.fits", ...},
           # ... 100+ files
       ],
       dependencies=["create_package"],  # Wait for package first
       step_id="create_files"
   )

build()
~~~~~~~

.. code-block:: python

   transaction = builder.build()
   # Returns: SQLAlchemyBufferedTransaction ready for buffering

Transaction Validation
----------------------

The builder validates transactions before buffering:

**Checks**:

* All required fields present
* Data types correct
* Foreign key references valid
* Dependencies form a DAG (no cycles)
* Model classes exist

**Example error**:

.. code-block:: python

   # Missing required field
   builder.create(
       model_class=models.ExecutedObsUnit,
       data={"status": "running"},  # Missing obs_unit_id!
       step_id="create_obs"
   )
   # Raises: ValidationError

Dependency Resolution
---------------------

The transaction executor resolves dependencies at execution time:

.. code-block:: python

   # From transaction_executor.py (simplified)
   async def resolve_dependencies(step, resolved_values):
       data = step.data.copy()
       for key, value in data.items():
           if isinstance(value, str) and value.startswith("${"):
               # Parse "${step1}.field": drop the leading "${",
               # then split on "}." into step ID and field name
               step_id, field = value[2:].split("}.", 1)
               # Look up resolved value
               data[key] = resolved_values[step_id][field]
       return data

**Execution order**: Steps run in topological order of their dependencies.

Advanced Patterns
-----------------

Conditional Steps
~~~~~~~~~~~~~~~~~

.. code-block:: python

   # Create observation
   obs_step = builder.create(...)

   # Conditionally create package
   if include_package:
       pkg_step = builder.create(
           model_class=models.RawDataPackage,
           # Doubled braces emit literal { and } around the step ID
           data={"executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"},
           step_id="create_pkg"
       )

Complex Dependencies
~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   # Multiple dependencies
   final_step = builder.create(
       model_class=models.SomeModel,
       data={
           # Plain strings: the executor resolves these references later
           "obs_id": "${create_obs}.id",
           "pkg_id": "${create_pkg}.id",
           "file_count": 10
       },
       dependencies=["create_obs", "create_pkg"],  # Wait for both
       step_id="final_step"
   )

Metadata and Context
~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

   transaction = builder.build()
   transaction.metadata = {
       "user_id": current_user.id,
       "source": "api",
       "request_id": request_id
   }

Testing Transaction Building
----------------------------

Unit Test Example
~~~~~~~~~~~~~~~~~

.. code-block:: python

   import uuid

   from ccat_ops_db import models
   from ccat_ops_db_api.transaction_buffering import SQLAlchemyTransactionBuilder

   def test_transaction_builder():
       builder = SQLAlchemyTransactionBuilder(
           endpoint="test",
           site="test"
       )

       # Create step
       step = builder.create(
           model_class=models.ExecutedObsUnit,
           data={"id": uuid.uuid4(), "status": "running"},
           step_id="test_step"
       )

       # Build
       transaction = builder.build()

       # Assertions
       assert transaction.endpoint == "test"
       assert len(transaction.steps) == 1
       assert transaction.steps[0].operation == "create"

Summary
-------

The Transaction Builder:

* **Constructs** multi-step database transactions
* **Pre-generates** IDs for immediate client access
* **Handles** dependencies between steps
* **Validates** transaction structure
* **Serializes** to JSON for Redis buffering
* **Enables** atomic buffered operations

Key methods: ``create()``, ``update()``, ``delete()``, ``bulk_create()``, ``build()``

Next Steps
----------

* :doc:`transaction-manager` - Buffering transactions to Redis
* :doc:`../routers/operations-routers` - Using in endpoints
* :doc:`../../tutorials/complex-endpoints/buffered-critical-operations` - Tutorial
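
Appendix: Execution Order Sketch
--------------------------------

The Dependency Resolution section states that steps execute in topological order of their dependencies. The following is a minimal sketch of that idea, assuming dependencies are available as a plain mapping from each step ID to the step IDs it waits for. It uses Python's standard ``graphlib`` module (Python 3.9+). The ``execution_order`` helper and the ``steps`` mapping are hypothetical illustrations and not part of ``ccat_ops_db_api``; the actual executor may order steps differently.

.. code-block:: python

   from graphlib import TopologicalSorter


   def execution_order(dependencies):
       """Return step IDs ordered so each step follows its dependencies.

       ``dependencies`` maps a step ID to the step IDs it waits for
       (hypothetical input shape, chosen for illustration).
       """
       return list(TopologicalSorter(dependencies).static_order())


   # Hypothetical dependency map mirroring the observation example
   steps = {
       "create_obs": [],
       "create_pkg": ["create_obs"],
       "create_files": ["create_pkg"],
   }

   order = execution_order(steps)
   print(order)

If the mapping contains a cycle, ``static_order()`` raises ``graphlib.CycleError``, which corresponds to the "no cycles" check listed under Transaction Validation.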