
Orchestration

Akili’s execution engine translates 6 YAML manifest files into assets, sensors, store writers, and quality checks automatically via akili codegen. You write your business logic in SQL or Python — the platform generates all orchestration code.

Each manifest file maps to a platform concept:

| Manifest | Platform Concept | Purpose |
|---|---|---|
| source.yaml | Asset (one per source) | Ingestion layer |
| pipeline.yaml | Asset / multi-asset | Transformations |
| quality.yaml | Quality check | Blocking on severity: error |
| serving.yaml | Store writer | Intent-based store routing |
| schedule.yaml | Deadline timer + retry config | Scheduling and fallback |
| governance.yaml | Asset metadata + tags | Retention, ownership |
| contract.ext.yaml | Registry subscriptions | Event routing |

Asset key convention: AssetKey(["<tenant>", "<product>", "<asset>"])
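Concretely, a key for a hypothetical tenant/product/asset triple could be built like this (the helper and all names are illustrative, not part of the platform API):

```python
# Illustrative sketch of the three-part asset key convention.
def asset_key(tenant: str, product: str, asset: str) -> list[str]:
    """Build the key parts for AssetKey(["<tenant>", "<product>", "<asset>"])."""
    return [tenant, product, asset]

key = asset_key("acme", "customer_360", "daily_orders")
# key == ["acme", "customer_360", "daily_orders"]
```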

The orchestration layer is organized into independent sub-concerns: input correlation, dispatch and failure handling, scheduling, and code generation.

A data product often depends on multiple inputs. The correlation engine detects when all inputs are ready within a time window before triggering execution.

%%{init: {'sequence': {'mirrorActors': false}}}%%
sequenceDiagram
    participant EB as Event Bus
    participant CE as Correlation Engine
    participant D as Dispatcher

    EB->>CE: data.available (input A, window W1)
    CE->>CE: Store: A ready for W1
    CE->>CE: Check: all inputs for W1 ready? No
    EB->>CE: data.available (input B, window W1)
    CE->>CE: Store: B ready for W1
    CE->>CE: Check: all inputs for W1 ready? Yes
    CE->>D: All inputs correlated — trigger execution

Correlation key: (tenant_id, product_id, window_id). Each input event is matched to its correlation window. Once all declared inputs have a data.available event in the same window, the dispatcher is triggered.

Timeout: If not all inputs arrive within the configured deadline, the dispatcher enters TIMEOUT state and applies the fallback policy.
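The correlation and timeout behavior above can be sketched roughly as follows; the class shape, method names, and clock handling are assumptions, not the engine's actual implementation:

```python
# Minimal correlation-engine sketch: track readiness per correlation key
# (tenant_id, product_id, window_id) and signal when all declared inputs arrive.
import time

class CorrelationEngine:
    def __init__(self, declared_inputs: set[str], deadline_s: float):
        self.declared = declared_inputs          # inputs required per window
        self.deadline_s = deadline_s             # deadline from schedule.yaml
        self.ready: dict[tuple, set[str]] = {}   # correlation key -> inputs seen
        self.opened: dict[tuple, float] = {}     # correlation key -> first-event time

    def on_data_available(self, tenant_id, product_id, window_id, input_name) -> bool:
        key = (tenant_id, product_id, window_id)  # correlation key
        self.opened.setdefault(key, time.monotonic())
        seen = self.ready.setdefault(key, set())
        seen.add(input_name)
        return seen >= self.declared             # True => trigger the dispatcher

    def timed_out(self, key) -> bool:
        # Deadline exceeded before all inputs arrived => apply fallback policy
        return (key in self.opened
                and self.ready.get(key, set()) < self.declared
                and time.monotonic() - self.opened[key] > self.deadline_s)
```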

The dispatcher manages the execution lifecycle through five states:

stateDiagram-v2
    [*] --> WAIT: Inputs declared
    WAIT --> EXECUTE: All inputs correlated
    WAIT --> TIMEOUT: Deadline exceeded
    TIMEOUT --> EXECUTE: Fallback policy = run_partial
    TIMEOUT --> FAILSAFE: Fallback policy = skip
    EXECUTE --> [*]: Success
    EXECUTE --> DLQ: Execution failed
    DLQ --> EXECUTE: Retry (within policy)
    DLQ --> FAILSAFE: Retries exhausted
    FAILSAFE --> [*]: Alert + skip

| State | Behavior |
|---|---|
| WAIT | Correlation engine is collecting input events |
| TIMEOUT | Deadline exceeded — apply fallback policy from schedule.yaml |
| EXECUTE | Compute job launched, transform running |
| DLQ | Execution failed; entry queued for retry or manual replay |
| FAILSAFE | All retries exhausted — alert, serve last-known-good data |
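A rough sketch of these transitions, following the state diagram above (the function shape and event names are assumptions):

```python
# State-machine sketch for the dispatcher lifecycle. Fallback policy names
# (run_partial, skip) come from the diagram; everything else is illustrative.
from enum import Enum, auto

class State(Enum):
    WAIT = auto()
    EXECUTE = auto()
    TIMEOUT = auto()
    DLQ = auto()
    FAILSAFE = auto()
    DONE = auto()

def next_state(state: State, event: str, fallback: str = "run_partial",
               retries_left: int = 0) -> State:
    if state is State.WAIT:
        return State.EXECUTE if event == "correlated" else State.TIMEOUT
    if state is State.TIMEOUT:
        return State.EXECUTE if fallback == "run_partial" else State.FAILSAFE
    if state is State.EXECUTE:
        return State.DONE if event == "success" else State.DLQ
    if state is State.DLQ:
        return State.EXECUTE if retries_left > 0 else State.FAILSAFE
    return state
```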

When execution fails, the run is routed to the Dead Letter Queue (DLQ) with a retry policy:

| Policy | Behavior |
|---|---|
| exponential_backoff | 1s, 2s, 4s, 8s… up to max retries |
| fixed_interval | Fixed delay between retries |
| none | No retry — straight to DLQ for manual inspection |
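The delay schedules can be sketched as follows (the function name and parameters are illustrative):

```python
# Sketch of retry delay computation for the three policies.
def retry_delays(policy: str, max_retries: int, interval_s: float = 1.0) -> list[float]:
    if policy == "exponential_backoff":
        # 1s, 2s, 4s, 8s... doubling up to max retries
        return [interval_s * 2 ** i for i in range(max_retries)]
    if policy == "fixed_interval":
        return [interval_s] * max_retries
    return []  # "none": no retry, straight to the DLQ
```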

Failed executions in the DLQ can be inspected via GET /api/v1/dlq and replayed via POST /api/v1/dlq/{id}/replay. A circuit breaker prevents cascading failures — if a product fails 3 consecutive times, further executions are suspended until manual intervention.
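A minimal sketch of the 3-strike circuit breaker described above; the class and method names are assumptions:

```python
# Circuit-breaker sketch: 3 consecutive failures suspend a product's
# executions until manual intervention resets it.
class CircuitBreaker:
    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures: dict[str, int] = {}   # product_id -> consecutive failures
        self.suspended: set[str] = set()

    def record(self, product_id: str, success: bool) -> None:
        if success:
            self.failures[product_id] = 0    # any success resets the streak
        else:
            self.failures[product_id] = self.failures.get(product_id, 0) + 1
            if self.failures[product_id] >= self.threshold:
                self.suspended.add(product_id)

    def allows(self, product_id: str) -> bool:
        return product_id not in self.suspended

    def reset(self, product_id: str) -> None:
        # Manual intervention: clear the suspension and the failure count
        self.suspended.discard(product_id)
        self.failures[product_id] = 0
```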

The path from a manifest change to deployment runs through a validation pipeline:

%%{init: {'flowchart': {'curve': 'basis'}}}%%
flowchart LR
    A[YAML Validation] --> B[Code Generation]
    B --> C[Lint & Check]
    C --> D[Unit Tests]
    D --> E[Integration Tests]
    E --> F[Pre-push Hook]
    F --> G[CI Pipeline]
    G --> H[Deploy & Sync]

At runtime, the end-to-end execution flow looks like this:

%%{init: {'sequence': {'mirrorActors': false, 'messageMargin': 40}}}%%
sequenceDiagram
    participant API as Control Plane
    participant Engine as Execution Engine
    participant Sensor as Sensor
    participant Job as Compute Job
    participant QC as Quality Checks
    participant SW as Store Writers
    participant Stores as Serving Stores
    participant EB as Event Bus

    API->>Engine: Deploy product
    Engine->>Sensor: Create sensor for inputs
    EB->>Sensor: data.available events
    Sensor->>Sensor: Correlate — all inputs ready?
    Sensor->>Engine: Yield run request
    Engine->>Job: Launch compute job
    Job->>Job: Execute SQL/Python transform
    Job->>QC: Run quality checks
    QC-->>QC: Blocking checks must pass
    QC->>SW: Quality gate passed
    SW->>Stores: Write to data lake
    SW->>Stores: Write to serving stores
    SW->>EB: Publish data.available
    EB->>Sensor: Trigger downstream products

When all inputs for a data product are available:

  1. Sensor detection — A platform sensor detects all required data.available events
  2. Run request — Sensor yields a run request for the product’s asset
  3. Compute job — The execution engine launches a job with resources from compute.yaml
  4. Transform execution — SQL or Python runs in an isolated compute container
  5. Quality gate — Quality checks execute; blocking checks must pass
  6. Store routing — Store writers write to the data lake (always) plus serving stores per serving.yaml
  7. Event publication — data.available published to the event bus, triggering downstream sensors
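Step 5's blocking semantics can be sketched as follows; the result type and field names are assumptions:

```python
# Quality-gate sketch: checks with severity "error" block the pipeline,
# lower severities only warn. The CheckResult shape is illustrative.
from dataclasses import dataclass

@dataclass
class CheckResult:
    name: str
    passed: bool
    severity: str  # "error" checks are blocking

def quality_gate(results: list[CheckResult]) -> bool:
    """Return True only if every blocking (severity: error) check passed."""
    blocking_failures = [r for r in results if r.severity == "error" and not r.passed]
    return not blocking_failures
```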

Three scheduling modes are supported:

| Mode | Trigger | Use Case |
|---|---|---|
| cron | Time-based schedule | Regular batch processing (daily, hourly) |
| event | Upstream data.available event | Reactive pipelines triggered by data readiness |
| adaptive | Event-driven with debouncing and fallback cron | Near-real-time freshness SLAs (sub-5-minute) |

The adaptive mode (ADR-034) deploys a per-product trigger agent that consumes event bus messages, debounces them, and fires materializations when freshness would breach the SLA.
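A rough sketch of the debounce-with-SLA-fallback logic; the class shape, thresholds, and explicit clock injection are illustrative, not the trigger agent's actual code:

```python
# Adaptive-trigger sketch: wait for event bursts to settle (debounce), but
# fire anyway if freshness would otherwise breach the SLA. Time is passed in
# explicitly to keep the sketch testable.
class DebouncedTrigger:
    def __init__(self, quiet_period_s: float, sla_s: float, now: float = 0.0):
        self.quiet_period_s = quiet_period_s  # debounce window
        self.sla_s = sla_s                    # freshness SLA (e.g. 300s)
        self.last_event = None                # time of last bus event, or None
        self.last_run = now                   # time of last materialization

    def on_event(self, now: float) -> None:
        self.last_event = now

    def should_fire(self, now: float) -> bool:
        quiet = (self.last_event is not None
                 and now - self.last_event >= self.quiet_period_s)
        sla_at_risk = now - self.last_run >= self.sla_s  # fallback path
        return quiet or sla_at_risk
```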

akili codegen <product> is a deterministic transformation: same YAML input always produces byte-for-byte identical Python output. The pipeline:

  1. Parse — Read all 6 manifest files
  2. Resolve dependencies — Build the dependency graph
  3. Resolve templates — Substitute {{ intent() }} calls with concrete values from upstream semantic intents
  4. Compile — Generate execution engine code (assets, checks, store writers, sensors)

Template resolution enables semantic intent contracts — downstream products reference upstream meaning (e.g., “high-value customer”) rather than specific column values, so upstream changes propagate automatically at codegen time.
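A minimal sketch of how {{ intent() }} substitution might work at codegen time; the regex, registry shape, and intent definition are hypothetical:

```python
# Template-resolution sketch: replace {{ intent('<name>') }} placeholders with
# concrete expressions resolved from upstream semantic intents.
import re

INTENT_REGISTRY = {  # hypothetical upstream intent definitions
    "high_value_customer": "lifetime_value > 10000",
}

def resolve_templates(sql: str, registry: dict[str, str]) -> str:
    def substitute(match: re.Match) -> str:
        return registry[match.group(1)]
    return re.sub(r"\{\{\s*intent\(['\"](\w+)['\"]\)\s*\}\}", substitute, sql)

sql = "SELECT * FROM customers WHERE {{ intent('high_value_customer') }}"
resolved = resolve_templates(sql, INTENT_REGISTRY)
# resolved == "SELECT * FROM customers WHERE lifetime_value > 10000"
```

Because substitution happens at codegen time, a change to the upstream intent definition propagates to every downstream product on its next akili codegen run.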