
Advanced Orchestration

This guide covers advanced orchestration topics for data products that need fine-grained control over execution timing, input correlation, streaming offset management, and retry behavior.

When a data product has multiple inputs, the platform’s correlation engine ensures all inputs are ready before triggering execution. This prevents partial materializations where some inputs have fresh data and others are stale.

%%{init: {'sequence': {'mirrorActors': false, 'messageMargin': 40}}}%%
sequenceDiagram
    participant A as Product A
    participant B as Product B
    participant EB as Event Bus
    participant CE as Correlation Engine
    participant C as Product C

    A->>EB: data.available (A)
    EB->>CE: Receive event for A
    CE->>CE: Check: is B also ready?
    Note over CE: B not yet ready -- wait
    B->>EB: data.available (B)
    EB->>CE: Receive event for B
    CE->>CE: Check: A ready + B ready
    Note over CE: All inputs ready!
    CE->>C: Trigger execution

The correlation engine maintains a window for each product’s inputs. When all inputs have published data.available within the window, execution triggers.
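The readiness check described above can be sketched as follows. This is an illustrative model, not the platform's implementation: the function name and data shapes are assumptions.

```python
from datetime import datetime, timedelta

def all_inputs_ready(ready_at: dict, inputs: list, window: timedelta, now: datetime) -> bool:
    """Return True when every input has published data.available
    within the correlation window ending at `now`."""
    window_start = now - window
    return all(
        name in ready_at and ready_at[name] >= window_start
        for name in inputs
    )

now = datetime(2026, 3, 1, 6, 0)
ready_at = {"A": now - timedelta(minutes=10), "B": now - timedelta(minutes=2)}
print(all_inputs_ready(ready_at, ["A", "B"], timedelta(minutes=30), now))       # True
print(all_inputs_ready(ready_at, ["A", "B", "D"], timedelta(minutes=30), now))  # False
```

The second call is False because input D has not published `data.available` at all, mirroring the "B not yet ready -- wait" step in the sequence diagram.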

The correlation window is the maximum time the engine will wait for all inputs to become ready. Configure it per input in inputs.yaml:

inputs:
  - name: orders
    kind: product
    product: raw-orders
    timeout: 30m # Wait up to 30 minutes for this input
    fallback: use_cached # If timeout expires, use the last successful data
  - name: customers
    kind: product
    product: customer-master
    timeout: 15m
    fallback: fail # If timeout expires, fail the execution
| Strategy | Behavior | Use Case |
| --- | --- | --- |
| use_cached | Use the last successful materialization of the input | Enrichment data that changes slowly |
| skip | Proceed without the input (input asset is optional) | Optional supplementary data |
| fail | Fail the execution if the input is not ready in time | Critical inputs that must be fresh |
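A sketch of how the three strategies could resolve an input once its timeout expires. The function name and the cached-value handling are illustrative assumptions; only the strategy names come from inputs.yaml.

```python
def resolve_timed_out_input(fallback: str, cached):
    """Decide what to do with an input whose timeout has expired."""
    if fallback == "use_cached":
        if cached is None:
            # No prior successful materialization to fall back on.
            raise RuntimeError("no cached materialization available")
        return cached
    if fallback == "skip":
        return None  # proceed without this optional input
    if fallback == "fail":
        raise TimeoutError("input not ready within its timeout")
    raise ValueError(f"unknown fallback strategy: {fallback}")

print(resolve_timed_out_input("use_cached", "orders@2026-02-28"))  # orders@2026-02-28
print(resolve_timed_out_input("skip", None))                       # None
```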

For products with many inputs, consider the interaction between timeouts:

# Product with 3 inputs -- staggered timeouts
inputs:
  - name: transactions
    kind: product
    product: raw-transactions
    timeout: 45m # Primary input -- longest wait
    fallback: fail
  - name: exchange-rates
    kind: product
    product: daily-rates
    timeout: 15m # Reference data -- shorter wait
    fallback: use_cached
  - name: country-codes
    kind: product
    product: ref-countries
    timeout: 5m # Static reference -- minimal wait
    fallback: use_cached

The correlation engine uses the longest timeout as the overall window. If transactions arrives at minute 40, it still waits up to 5 more minutes for exchange-rates and country-codes.
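The overall-window rule reduces to taking the maximum of the per-input timeouts, as in this small sketch (the dict layout is illustrative):

```python
from datetime import timedelta

timeouts = {
    "transactions": timedelta(minutes=45),
    "exchange-rates": timedelta(minutes=15),
    "country-codes": timedelta(minutes=5),
}
# The correlation window is the longest per-input timeout.
window = max(timeouts.values())
print(window)  # 0:45:00
```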

For kind: source inputs using CDC or incremental strategies, the platform tracks ingestion offsets to enable exactly-once semantics.

CDC (Change Data Capture) offsets are tracked via Redpanda consumer offsets. Each source port gets a dedicated consumer group:

Consumer group: {tenant}.{product}.{port_name}.cdc

The offset advances only after successful landing of the CDC batch in the data lake. If ingestion fails mid-batch, the offset does not advance — the next run retries from the same position.
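The commit-after-landing pattern can be sketched as below. `FakeConsumer`, `ingest`, and `flaky_land` are hypothetical stand-ins for the Redpanda consumer and the lake writer, not platform APIs.

```python
class FakeConsumer:
    """Toy consumer that tracks a committed offset over a list of batches."""
    def __init__(self, batches):
        self.batches = batches
        self.committed = 0  # index of the next uncommitted batch

    def __iter__(self):
        return iter(self.batches[self.committed:])

    def commit(self):
        self.committed += 1

def ingest(consumer, land_batch):
    for batch in consumer:
        try:
            land_batch(batch)    # write the batch to the data lake first
        except OSError:
            return               # offset stays put; the next run retries here
        consumer.commit()        # advance the offset only after success

def flaky_land(batch):
    if batch == "b2":
        raise OSError("lake write failed mid-batch")

consumer = FakeConsumer(["b0", "b1", "b2"])
ingest(consumer, flaky_land)
print(consumer.committed)  # 2 -- b2 failed, so its offset did not advance
```

Because the commit happens strictly after the landing succeeds, a crash between the two yields a duplicate batch rather than a lost one; idempotent landing then makes delivery effectively exactly-once.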

Incremental ingestion tracks a high-water mark per source port:

-- Ingestion state table
SELECT cursor_field, high_water_mark, last_run_at, rows_extracted, status
FROM ingestion_state
WHERE tenant_id = $1 AND product_id = $2 AND port_name = $3;

The high-water mark updates atomically after successful extraction. You can inspect current state:

# View ingestion state for a product
akili ingestion state daily-orders
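The atomic update of the high-water mark can be sketched with an in-memory SQLite table. The table here is a simplified stand-in for the real ingestion_state store; column names follow the query above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ingestion_state (
        port_name       TEXT PRIMARY KEY,
        high_water_mark TEXT,
        rows_extracted  INTEGER,
        status          TEXT
    )""")
conn.execute("INSERT INTO ingestion_state VALUES ('orders', '2026-02-28T00:00:00Z', 0, 'idle')")

def record_successful_extraction(conn, port, new_mark, rows):
    # One transaction: mark, row count, and status advance together or not at all.
    with conn:
        conn.execute(
            "UPDATE ingestion_state "
            "SET high_water_mark = ?, rows_extracted = rows_extracted + ?, status = 'ok' "
            "WHERE port_name = ?",
            (new_mark, rows, port),
        )

record_successful_extraction(conn, "orders", "2026-03-01T00:00:00Z", 1200)
print(conn.execute(
    "SELECT high_water_mark, rows_extracted FROM ingestion_state").fetchone())
# ('2026-03-01T00:00:00Z', 1200)
```

The `with conn:` context manager commits on success and rolls back on exception, which is what makes the update atomic.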

If an offset needs to be adjusted manually (rare, but sometimes necessary after data corruption):

# View current ingestion state
akili ingestion state daily-orders
# Trigger a backfill from a specific point
akili ingestion backfill daily-orders \
--from "2026-03-01T00:00:00Z" \
--to "2026-03-15T00:00:00Z" \
--strategy incremental

The dispatcher is the state machine that manages execution lifecycle for each product. It transitions through four states:

%%{init: {'stateDiagram': {'curve': 'basis'}}}%%
stateDiagram-v2
    [*] --> WAIT: Deployed
    WAIT --> EXECUTE: All inputs ready
    WAIT --> TIMEOUT: Correlation window expired
    TIMEOUT --> EXECUTE: Fallback = use_cached/skip
    TIMEOUT --> FAILSAFE: Fallback = fail
    EXECUTE --> WAIT: Success
    EXECUTE --> FAILSAFE: Failure (retries exhausted)
    FAILSAFE --> WAIT: Manual reset or auto-recovery
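The diagram's transitions can be written down as a table-driven state machine. The transition map mirrors the diagram; the event names and code structure are illustrative.

```python
# (current_state, event) -> next_state, taken from the state diagram above.
TRANSITIONS = {
    ("WAIT", "all_inputs_ready"):         "EXECUTE",
    ("WAIT", "window_expired"):           "TIMEOUT",
    ("TIMEOUT", "fallback_cached_or_skip"): "EXECUTE",
    ("TIMEOUT", "fallback_fail"):         "FAILSAFE",
    ("EXECUTE", "success"):               "WAIT",
    ("EXECUTE", "retries_exhausted"):     "FAILSAFE",
    ("FAILSAFE", "reset"):                "WAIT",
}

def step(state: str, event: str) -> str:
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state} on {event}")

state = "WAIT"
for event in ["all_inputs_ready", "success", "window_expired", "fallback_fail", "reset"]:
    state = step(state, event)
print(state)  # WAIT -- a full cycle through EXECUTE, TIMEOUT, and FAILSAFE
```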

When multiple products are ready for execution simultaneously, the dispatcher uses priority ordering:

| Factor | Weight | Description |
| --- | --- | --- |
| SLA urgency | High | Products close to SLA breach execute first |
| Downstream count | Medium | Products with more downstream dependents get priority |
| Historical duration | Low | Longer-running products start earlier to reduce total wait |
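One way to combine the three factors is a weighted score; the numeric weights and the formula below are illustrative assumptions, not the platform's published algorithm.

```python
# Hypothetical weights reflecting High/Medium/Low from the factor table.
WEIGHTS = {"sla_urgency": 3.0, "downstream_count": 2.0, "historical_duration": 1.0}

def priority(sla_urgency: float, downstream_count: float, historical_duration: float) -> float:
    """Higher score = dispatched first. Each factor is pre-normalized to 0..1."""
    return (WEIGHTS["sla_urgency"] * sla_urgency
            + WEIGHTS["downstream_count"] * downstream_count
            + WEIGHTS["historical_duration"] * historical_duration)

ready = {
    "near-sla-breach": priority(0.9, 0.2, 0.1),  # about to miss its SLA
    "many-dependents": priority(0.2, 0.9, 0.3),  # large downstream fan-out
}
print(max(ready, key=ready.get))  # near-sla-breach
```

With these weights, SLA urgency dominates: a product close to breach outranks one with many dependents.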

The dispatcher controls how many products execute concurrently per tenant:

| Tenant Tier | Default Parallelism | Max Parallelism |
| --- | --- | --- |
| Starter | 2 | 4 |
| Standard | 4 | 8 |
| Enterprise | 8 | 16 |

Parallelism is bounded by the compute resource quota for the tenant.

The default retry policy (3 retries, exponential backoff starting at 30s) works for most products. For specialized needs, configure retry behavior in compute.yaml:

compute:
  engine: dagster
  schedule: "0 6 * * *"
  resources:
    cpu: "2"
    memory: 4Gi
  timeout: 3600
  retries: 5 # Default is 3
  tags:
    - high-priority
| Failure Type | Default Retries | Backoff | DLQ After |
| --- | --- | --- | --- |
| runtime_error | Per compute.yaml | Exponential | Retries exhausted |
| quality_gate_failure | Per compute.yaml | Exponential | Retries exhausted |
| transient_io_error | Per compute.yaml | Exponential | Retries exhausted |
| schema_mismatch | 0 (non-retryable) | N/A | Immediately |
| permission_denied | 0 (non-retryable) | N/A | Immediately |

The backoff calculation uses fixed platform parameters:

initial_delay = 30s
multiplier = 2
max_delay = 300s (5 minutes)
jitter = 0-20% random

These are not configurable per-product. The number of retries is the primary tuning knob.
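The schedule produced by those fixed parameters can be computed directly; only the helper's name is an assumption.

```python
import random

INITIAL_DELAY, MULTIPLIER, MAX_DELAY = 30, 2, 300  # fixed platform parameters

def backoff_delay(attempt: int, jitter: bool = True) -> float:
    """Delay in seconds before retry `attempt` (0-based)."""
    delay = min(INITIAL_DELAY * MULTIPLIER ** attempt, MAX_DELAY)
    if jitter:
        delay *= 1 + random.uniform(0, 0.20)  # 0-20% random jitter
    return delay

print([backoff_delay(a, jitter=False) for a in range(5)])  # [30, 60, 120, 240, 300]
```

Note the cap: the fifth attempt would be 480s unclamped, but max_delay holds it at 300s.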

For products that need near-real-time freshness (sub-5-minute SLAs), use the adaptive scheduling mode:

compute:
  schedule: adaptive
  sla_freshness: 5m # Target freshness SLA

Adaptive mode deploys a per-product trigger agent that:

  1. Consumes data.available events from upstream products
  2. Debounces rapid-fire events (prevents over-materialization)
  3. Fires a materialization when freshness would breach the SLA if not refreshed
  4. Falls back to a cron schedule if no events arrive within the SLA window

This mode is more resource-intensive than cron or event scheduling because it maintains a persistent consumer per product. Use it only when the freshness SLA demands it.
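The debouncing in step 2 can be sketched as a trailing debounce over event timestamps: a burst fires once, a quiet period after its last event. The function name and the 60-second quiet period are illustrative assumptions.

```python
def debounced_fire_times(event_times, quiet_period=60):
    """Given sorted event timestamps (seconds), return when materializations
    would fire: once per burst, quiet_period after the burst's last event."""
    fires = []
    for i, t in enumerate(event_times):
        nxt = event_times[i + 1] if i + 1 < len(event_times) else None
        if nxt is None or nxt - t > quiet_period:
            fires.append(t + quiet_period)  # burst ended; fire after the quiet period
    return fires

# Three rapid events collapse into one materialization; the lone event
# at t=200 fires separately.
print(debounced_fire_times([0, 10, 20, 200]))  # [80, 260]
```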

For cross-product orchestration, the platform provides a correlation ID that traces events across the entire dependency chain:

# View execution with correlation details
akili run get run-abc123 --json
# The correlation_id field links this execution to upstream triggers
# and downstream consumers

The correlation ID is set when the first event in a chain fires and is propagated through all downstream executions. This enables end-to-end tracing from initial data ingestion to final serving.