# Advanced Orchestration
This guide covers advanced orchestration topics for data products that need fine-grained control over execution timing, input correlation, streaming offset management, and retry behavior.
## Execution Correlation

When a data product has multiple inputs, the platform's correlation engine ensures all inputs are ready before triggering execution. This prevents partial materializations, where some inputs have fresh data and others are stale.
### How Correlation Works

```mermaid
%%{init: {'sequence': {'mirrorActors': false, 'messageMargin': 40}}}%%
sequenceDiagram
    participant A as Product A
    participant B as Product B
    participant EB as Event Bus
    participant CE as Correlation Engine
    participant C as Product C
    A->>EB: data.available (A)
    EB->>CE: Receive event for A
    CE->>CE: Check: is B also ready?
    Note over CE: B not yet ready -- wait
    B->>EB: data.available (B)
    EB->>CE: Receive event for B
    CE->>CE: Check: A ready + B ready
    Note over CE: All inputs ready!
    CE->>C: Trigger execution
```
The correlation engine maintains a window for each product’s inputs. When all inputs have published data.available within the window, execution triggers.
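The readiness check can be sketched as a small in-memory model. This is illustrative only: `CorrelationEngine` and its method names are assumptions, not the platform's internal API.

```python
import time

class CorrelationEngine:
    """Tracks data.available events and triggers once all inputs are ready
    within the correlation window (illustrative sketch)."""

    def __init__(self, inputs, window_seconds):
        self.inputs = set(inputs)     # input names the product depends on
        self.window = window_seconds  # correlation window
        self.ready = {}               # input name -> arrival timestamp

    def on_data_available(self, input_name, now=None):
        """Record an event; return True if execution should trigger."""
        now = time.time() if now is None else now
        # Drop arrivals that have fallen outside the window
        self.ready = {k: t for k, t in self.ready.items()
                      if now - t <= self.window}
        if input_name in self.inputs:
            self.ready[input_name] = now
        return set(self.ready) == self.inputs

engine = CorrelationEngine(["A", "B"], window_seconds=1800)
engine.on_data_available("A", now=0)                 # B not ready yet
triggered = engine.on_data_available("B", now=600)   # both ready now
```

If `B` had arrived more than 30 minutes after `A`, the stale arrival for `A` would be pruned and the check would fail, matching the windowed behavior described above.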
### Correlation Window

The correlation window is the maximum time the engine will wait for all inputs to become ready. Configure it per input in inputs.yaml:

```yaml
inputs:
  - name: orders
    kind: product
    product: raw-orders
    timeout: 30m          # Wait up to 30 minutes for this input
    fallback: use_cached  # If the timeout expires, use the last successful data

  - name: customers
    kind: product
    product: customer-master
    timeout: 15m
    fallback: fail        # If the timeout expires, fail the execution
```

### Fallback Strategies

| Strategy | Behavior | Use Case |
|---|---|---|
| `use_cached` | Use the last successful materialization of the input | Enrichment data that changes slowly |
| `skip` | Proceed without the input (the input asset is optional) | Optional supplementary data |
| `fail` | Fail the execution if the input is not ready in time | Critical inputs that must be fresh |
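The three strategies can be summarized as a single decision function. This is a hedged sketch: `resolve_on_timeout` and its arguments are illustrative names, not platform internals.

```python
def resolve_on_timeout(input_cfg, cached_materialization):
    """Decide what to do when an input's correlation timeout expires."""
    strategy = input_cfg["fallback"]
    if strategy == "use_cached":
        if cached_materialization is None:
            raise RuntimeError(f"no cached data for {input_cfg['name']}")
        return cached_materialization  # last successful materialization
    if strategy == "skip":
        return None                    # proceed without this optional input
    if strategy == "fail":
        raise TimeoutError(f"input {input_cfg['name']} not ready in time")
    raise ValueError(f"unknown fallback strategy: {strategy}")

customers = {"name": "customers", "fallback": "use_cached"}
result = resolve_on_timeout(customers, cached_materialization={"rows": 1200})
```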
### Multi-Input Timing

For products with many inputs, consider the interaction between timeouts:

```yaml
# Product with 3 inputs -- staggered timeouts
inputs:
  - name: transactions
    kind: product
    product: raw-transactions
    timeout: 45m   # Primary input -- longest wait
    fallback: fail

  - name: exchange-rates
    kind: product
    product: daily-rates
    timeout: 15m   # Reference data -- shorter wait
    fallback: use_cached

  - name: country-codes
    kind: product
    product: ref-countries
    timeout: 5m    # Static reference -- minimal wait
    fallback: use_cached
```

The correlation engine uses the longest timeout as the overall window. If `transactions` arrives at minute 40, the engine still waits up to 5 more minutes for `exchange-rates` and `country-codes`.
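The window rule above reduces to two lines of arithmetic; a minimal sketch (function names are illustrative):

```python
def overall_window(inputs):
    """The overall correlation window is the longest per-input timeout."""
    return max(cfg["timeout_minutes"] for cfg in inputs)

def remaining_wait(window, arrival_minute):
    """Minutes the engine keeps waiting after an input arrives."""
    return max(0, window - arrival_minute)

inputs = [
    {"name": "transactions", "timeout_minutes": 45},
    {"name": "exchange-rates", "timeout_minutes": 15},
    {"name": "country-codes", "timeout_minutes": 5},
]
window = overall_window(inputs)              # 45-minute overall window
remaining = remaining_wait(window, 40)       # transactions at minute 40
```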
## Offset Management

For kind: source inputs using CDC or incremental strategies, the platform tracks ingestion offsets to enable exactly-once semantics.
### CDC Offset Tracking

CDC (Change Data Capture) offsets are tracked via Redpanda consumer offsets. Each source port gets a dedicated consumer group:

```
{tenant}.{product}.{port_name}.cdc
```

The offset advances only after the CDC batch lands successfully in the data lake. If ingestion fails mid-batch, the offset does not advance, and the next run retries from the same position.
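The commit-after-landing rule can be sketched with an in-memory stand-in for the consumer group's committed offset (no real Redpanda client involved; all names are illustrative):

```python
class CdcIngestor:
    def __init__(self):
        self.committed_offset = 0  # offset of the next record to consume

    def run_batch(self, records, land_batch):
        """Land a batch in the lake; advance the offset only on success."""
        batch = records[self.committed_offset:]
        try:
            land_batch(batch)
        except Exception:
            # Landing failed mid-batch: the offset stays put, so the next
            # run retries from the same position.
            return False
        self.committed_offset += len(batch)
        return True

ingestor = CdcIngestor()
records = ["r0", "r1", "r2"]

def flaky_landing(batch):
    raise IOError("lake write failed")

ok1 = ingestor.run_batch(records, flaky_landing)       # fails, offset stays 0
ok2 = ingestor.run_batch(records, lambda batch: None)  # succeeds, offset -> 3
```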
### Incremental Cursor Tracking

Incremental ingestion tracks a high-water mark per source port:

```sql
-- Ingestion state table
SELECT cursor_field, high_water_mark, last_run_at, rows_extracted, status
FROM ingestion_state
WHERE tenant_id = $1 AND product_id = $2 AND port_name = $3;
```

The high-water mark updates atomically after a successful extraction. You can inspect the current state:

```sh
# View ingestion state for a product
akili ingestion state daily-orders
```

### Offset Recovery

If an offset needs to be manually adjusted (rare, but necessary after data corruption):

```sh
# View current ingestion state
akili ingestion state daily-orders

# Trigger a backfill from a specific point
akili ingestion backfill daily-orders \
  --from "2026-03-01T00:00:00Z" \
  --to "2026-03-15T00:00:00Z" \
  --strategy incremental
```

## Dispatcher Configuration
The dispatcher is the state machine that manages the execution lifecycle for each product. It transitions through four states:

```mermaid
%%{init: {'stateDiagram': {'curve': 'basis'}}}%%
stateDiagram-v2
    [*] --> WAIT: Deployed
    WAIT --> EXECUTE: All inputs ready
    WAIT --> TIMEOUT: Correlation window expired
    TIMEOUT --> EXECUTE: Fallback = use_cached/skip
    TIMEOUT --> FAILSAFE: Fallback = fail
    EXECUTE --> WAIT: Success
    EXECUTE --> FAILSAFE: Failure (retries exhausted)
    FAILSAFE --> WAIT: Manual reset or auto-recovery
```
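The diagram above maps directly to a transition table. A minimal sketch (state names follow the diagram; the event names and table form are assumptions, not the platform's implementation):

```python
# (state, event) -> next state, taken from the state diagram
TRANSITIONS = {
    ("WAIT", "all_inputs_ready"):      "EXECUTE",
    ("WAIT", "window_expired"):        "TIMEOUT",
    ("TIMEOUT", "fallback_use_cached"): "EXECUTE",
    ("TIMEOUT", "fallback_skip"):       "EXECUTE",
    ("TIMEOUT", "fallback_fail"):       "FAILSAFE",
    ("EXECUTE", "success"):             "WAIT",
    ("EXECUTE", "retries_exhausted"):   "FAILSAFE",
    ("FAILSAFE", "manual_reset"):       "WAIT",
    ("FAILSAFE", "auto_recovery"):      "WAIT",
}

def step(state, event):
    return TRANSITIONS[(state, event)]

state = "WAIT"                              # entered on deploy
state = step(state, "window_expired")       # correlation window expired
state = step(state, "fallback_use_cached")  # proceed with cached input
state = step(state, "success")              # back to WAIT for the next cycle
```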
### Execution Priority

When multiple products are ready for execution simultaneously, the dispatcher uses priority ordering:
| Factor | Weight | Description |
|---|---|---|
| SLA urgency | High | Products close to SLA breach execute first |
| Downstream count | Medium | Products with more downstream dependents get priority |
| Historical duration | Low | Longer-running products start earlier to reduce total wait |
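One way to realize the weighted ordering is a scoring function. The weights and the scoring formula below are assumptions chosen to mirror the High/Medium/Low ranking in the table; the platform does not document an exact formula.

```python
# Assumed weights mirroring the High/Medium/Low ranking above.
WEIGHTS = {"sla_urgency": 3.0, "downstream_count": 2.0, "historical_duration": 1.0}

def priority(product):
    """Higher score executes first. Each factor is normalized to [0, 1]."""
    return (WEIGHTS["sla_urgency"] * product["sla_urgency"]
            + WEIGHTS["downstream_count"] * product["downstream_count"]
            + WEIGHTS["historical_duration"] * product["historical_duration"])

ready = [
    {"name": "a", "sla_urgency": 0.1, "downstream_count": 1.0,
     "historical_duration": 0.5},
    {"name": "b", "sla_urgency": 0.9, "downstream_count": 0.1,
     "historical_duration": 0.1},
]
ordered = sorted(ready, key=priority, reverse=True)  # b's SLA urgency wins
```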
### Parallelism

The dispatcher controls how many products execute concurrently per tenant:
| Tenant Tier | Default Parallelism | Max Parallelism |
|---|---|---|
| Starter | 2 | 4 |
| Standard | 4 | 8 |
| Enterprise | 8 | 16 |
Parallelism is bounded by the compute resource quota for the tenant.
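Admission control under these limits can be sketched as follows. The tier numbers come from the table above; the gating function itself is illustrative.

```python
# Default parallelism per tenant tier (from the table above)
TIER_PARALLELISM = {"starter": 2, "standard": 4, "enterprise": 8}

def can_start(tier, running_count, quota_slots):
    """A new execution starts only if the tenant is under both its tier's
    default parallelism and its compute resource quota."""
    limit = min(TIER_PARALLELISM[tier], quota_slots)
    return running_count < limit

ok = can_start("standard", running_count=3, quota_slots=6)       # under limit
blocked = can_start("standard", running_count=4, quota_slots=6)  # at limit
quota_bound = can_start("enterprise", running_count=5, quota_slots=5)
```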
## Custom Retry Policies

The default retry policy (3 retries, exponential backoff starting at 30s) works for most products. For specialized needs, configure retry behavior in compute.yaml:

```yaml
compute:
  engine: dagster
  schedule: "0 6 * * *"
  resources:
    cpu: "2"
    memory: 4Gi
  timeout: 3600
  retries: 5   # Default is 3
  tags:
    - high-priority
```

### Retry Behavior by Failure Type

| Failure Type | Default Retries | Backoff | DLQ After |
|---|---|---|---|
| `runtime_error` | Per `compute.yaml` | Exponential | Retries exhausted |
| `quality_gate_failure` | Per `compute.yaml` | Exponential | Retries exhausted |
| `transient_io_error` | Per `compute.yaml` | Exponential | Retries exhausted |
| `schema_mismatch` | 0 (non-retryable) | N/A | Immediately |
| `permission_denied` | 0 (non-retryable) | N/A | Immediately |
### Backoff Parameters

The backoff calculation uses fixed platform parameters:

```
initial_delay = 30s
multiplier    = 2
max_delay     = 300s (5 minutes)
jitter        = 0-20% random
```

These parameters are not configurable per product. The number of retries is the primary tuning knob.
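Plugging those parameters into code, the delay before each retry looks like this (a sketch of the documented schedule, not the platform's actual implementation):

```python
import random

def backoff_delay(attempt, rng=random.random):
    """Delay in seconds before retry number `attempt` (1-based):
    30s initial, doubling each time, capped at 300s, plus 0-20% jitter."""
    base = min(30 * (2 ** (attempt - 1)), 300)
    return base * (1 + 0.2 * rng())

# With jitter disabled, the schedule is 30s, 60s, 120s, 240s, 300s, 300s, ...
delays = [backoff_delay(n, rng=lambda: 0.0) for n in range(1, 6)]
```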
## Adaptive Scheduling

For products that need near-real-time freshness (sub-5-minute SLAs), use the adaptive scheduling mode:

```yaml
compute:
  schedule: adaptive
  sla_freshness: 5m   # Target freshness SLA
```

Adaptive mode deploys a per-product trigger agent that:

- Consumes `data.available` events from upstream products
- Debounces rapid-fire events (prevents over-materialization)
- Fires a materialization when freshness would breach the SLA if not refreshed
- Falls back to a cron schedule if no events arrive within the SLA window
This mode is more resource-intensive than cron or event scheduling because it maintains a persistent consumer per product. Use it only when the freshness SLA demands it.
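The agent's decision rule can be sketched as debounce-plus-SLA-deadline logic. The class, thresholds, and loop shape below are illustrative assumptions, not the trigger agent's real interface.

```python
class AdaptiveTrigger:
    def __init__(self, sla_freshness_s, debounce_s):
        self.sla = sla_freshness_s       # target freshness SLA
        self.debounce = debounce_s       # quiet period after an event burst
        self.last_materialized = 0.0
        self.last_event = None

    def on_event(self, now):
        """Record an upstream data.available event."""
        self.last_event = now

    def should_fire(self, now):
        # Fire once a burst of events has settled (debounce elapsed) ...
        if self.last_event is not None and now - self.last_event >= self.debounce:
            return True
        # ... or when freshness would otherwise breach the SLA (cron fallback).
        return now >= self.last_materialized + self.sla

    def fired(self, now):
        self.last_materialized = now
        self.last_event = None

trigger = AdaptiveTrigger(sla_freshness_s=300, debounce_s=30)
trigger.on_event(now=10)
burst = trigger.should_fire(now=15)    # still inside the debounce window
settled = trigger.should_fire(now=45)  # 35s since the last event: fire
```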
## Execution Correlation Across Products

For cross-product orchestration, the platform provides a correlation ID that traces events across the entire dependency chain:

```sh
# View execution with correlation details
akili run get run-abc123 --json

# The correlation_id field links this execution to upstream triggers
# and downstream consumers
```

The correlation ID is set when the first event in a chain fires and is propagated through all downstream executions. This enables end-to-end tracing from initial data ingestion to final serving.
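The mint-once, propagate-everywhere behavior can be sketched in a few lines (the event shape and function name are illustrative, not the platform's event schema):

```python
import uuid

def emit_event(product, upstream_event=None):
    """The first event in a chain mints a correlation ID;
    every downstream event reuses it."""
    correlation_id = (upstream_event["correlation_id"]
                      if upstream_event else str(uuid.uuid4()))
    return {"type": "data.available",
            "product": product,
            "correlation_id": correlation_id}

root = emit_event("raw-orders")                         # chain starts here
child = emit_event("daily-orders", upstream_event=root)
grandchild = emit_event("orders-report", upstream_event=child)
```

Because every downstream event copies the ID forward, filtering the event log by a single `correlation_id` reconstructs the whole chain.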
## Related

- Orchestration — execution engine overview
- Data Lifecycle — lifecycle stages
- DLQ Management — handling failed executions
- Event System — event-driven coordination