Event System
Akili uses an event-driven architecture to coordinate data product execution, governance enforcement, and system observability. Every significant platform action emits a domain event to the Redpanda event bus. Downstream consumers react to these events — triggering pipeline execution, updating governance state, and feeding the audit trail.
Event Architecture
%%{init: {'sequence': {'mirrorActors': false, 'messageMargin': 40}}}%%
sequenceDiagram
participant SVC as Service Layer
participant EB as Redpanda Event Bus
participant SENSOR as Dagster Sensor
participant GOV as Governance Consumer
participant DLQ as DLQ Monitor
participant AUDIT as Audit Trail
SVC->>EB: ProductRegistered
SVC->>EB: ProductDeployed
EB->>SENSOR: data.available
SENSOR->>SENSOR: Correlate inputs
SENSOR->>SVC: Trigger execution
SVC->>EB: ExecutionStarted
SVC->>EB: ExecutionCompleted
EB->>GOV: Quality + SLA events
SVC->>EB: QualityCheckCompleted
EB->>DLQ: Failed events (after retry)
EB->>AUDIT: All events (append-only)
Domain Events
Every domain event follows a standard envelope format:
{
  "event_id": "evt-abc123",
  "event_type": "ProductDeployed",
  "tenant_id": "550e8400-e29b-41d4-a716-446655440000",
  "correlation_id": "corr-xyz789",
  "timestamp": "2026-03-16T10:30:00Z",
  "payload": {}
}

| Field | Description |
|---|---|
| event_id | Globally unique identifier for this event instance |
| event_type | Discriminator string identifying the event kind |
| tenant_id | Tenant scope; all events are tenant-isolated |
| correlation_id | Traces related events across an execution lifecycle |
| timestamp | ISO 8601 timestamp of when the event occurred |
| payload | Event-specific data (varies by event type) |
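The envelope can be mirrored in code so that producers and consumers agree on the same six fields. The following is a minimal sketch, not the platform's implementation: the `DomainEvent` class, the `REQUIRED_FIELDS` tuple, and the generated `evt-` identifier format are assumptions made for illustration, based only on the field names and the sample values shown above.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any

# Field names taken from the envelope table; the tuple itself is illustrative.
REQUIRED_FIELDS = (
    "event_id", "event_type", "tenant_id",
    "correlation_id", "timestamp", "payload",
)


def _utc_now() -> str:
    """Current time as an ISO 8601 UTC string, e.g. 2026-03-16T10:30:00Z."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass(frozen=True)
class DomainEvent:
    """Hypothetical in-code mirror of the event envelope."""
    event_type: str
    tenant_id: str
    correlation_id: str
    payload: dict[str, Any] = field(default_factory=dict)
    # Assumed ID scheme: the real platform may generate IDs differently.
    event_id: str = field(default_factory=lambda: f"evt-{uuid.uuid4().hex[:12]}")
    timestamp: str = field(default_factory=_utc_now)

    def to_json(self) -> str:
        """Serialize the envelope to a JSON string."""
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, raw: str) -> "DomainEvent":
        """Parse an envelope, rejecting messages that lack a required field."""
        data = json.loads(raw)
        missing = [name for name in REQUIRED_FIELDS if name not in data]
        if missing:
            raise ValueError(f"envelope is missing fields: {missing}")
        return cls(**{name: data[name] for name in REQUIRED_FIELDS})
```

A consumer could call `DomainEvent.from_json(message)` on each incoming message and route on `event_type`, leaving the interpretation of `payload` to the handler for that event kind.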
Event Types
The platform emits 20+ event types across four categories.
Product Lifecycle Events
| Event Type | Trigger | Key Payload Fields |
|---|---|---|
| ProductRegistered | New product created via API | product_id, name, namespace, archetype |
| ProductDeployed | Product deployed to execution engine | product_id, version, manifest_hash |
| ProductUpdated | Product metadata changed | product_id, changed_fields |
| ProductDeprecated | Product marked for deprecation | product_id, deprecation_date |
| ProductDeleted | Product permanently removed | product_id |
Execution Events
| Event Type | Trigger | Key Payload Fields |
|---|---|---|
| ExecutionStarted | Pipeline execution begins | execution_id, product_id, partition_key |
| ExecutionCompleted | Pipeline execution succeeds | execution_id, duration_ms, row_count |
| ExecutionFailed | Pipeline execution fails (after retries) | execution_id, error, retry_count |
| DataAvailable | New data materialized and quality-checked | product_id, snapshot_id, freshness |
Quality and Governance Events
| Event Type | Trigger | Key Payload Fields |
|---|---|---|
| QualityCheckCompleted | Quality gate evaluated | product_id, check_name, passed, severity |
| QualityCheckFailed | Blocking quality check fails | product_id, check_name, error_detail |
| SlaBreached | Freshness or availability SLA exceeded | product_id, sla_type, threshold, actual |
| ClassificationViolation | Classification laundering attempt | product_id, declared, required |
| RetentionExpired | Data exceeds retention period | product_id, retention_period, oldest_record |
| DeletionRequested | Right-to-erasure request filed | entity_type, identity_value, reason |
| DeletionCompleted | Deletion executed and verified | request_id, products_affected, rows_deleted |
Registry and Concept Events
| Event Type | Trigger | Key Payload Fields |
|---|---|---|
| ConceptCreated | New business concept extracted | concept_name, domain, source_product |
| ConceptPromoted | Concept maturity advanced | concept_name, from_state, to_state |
| SemanticContractBroken | Upstream removed a referenced intent | contract_id, upstream_product, downstream_products |
| SemanticContractStale | Upstream values changed since compile | contract_id, upstream_product |
| ConnectionCreated | New external connection registered | connection_id, connector_type |
Event Bus: Redpanda
Akili uses Redpanda as the event bus. Redpanda provides Kafka-compatible APIs with lower operational overhead.
Topic Naming Convention
All topics are tenant-prefixed to enforce isolation:
{tenant_slug}.{product_name}.data.available
{tenant_slug}.{product_name}.execution.events
{tenant_slug}.governance.events
{tenant_slug}.dlq.{stage}

| Topic Pattern | Purpose |
|---|---|
| {tenant}.{product}.data.available | Signals that new data is ready for downstream consumption |
| {tenant}.{product}.execution.events | Execution lifecycle events for a specific product |
| {tenant}.governance.events | All governance events for a tenant |
| {tenant}.dlq.{stage} | Dead letter queue entries by failure stage |
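Building topic names through small helpers keeps the tenant prefix from being forgotten or misspelled at call sites. The sketch below follows the four patterns above. The function names are hypothetical, as is the rule that a segment may not be empty or contain a dot; that rule is assumed here only so a tenant slug cannot spill into the next segment.

```python
def _segment(value: str) -> str:
    """Validate one topic segment (assumed rule: non-empty, no dots)."""
    if not value or "." in value:
        raise ValueError(f"invalid topic segment: {value!r}")
    return value


def data_available_topic(tenant_slug: str, product_name: str) -> str:
    """Topic that signals new data for one product."""
    return f"{_segment(tenant_slug)}.{_segment(product_name)}.data.available"


def execution_events_topic(tenant_slug: str, product_name: str) -> str:
    """Topic carrying execution lifecycle events for one product."""
    return f"{_segment(tenant_slug)}.{_segment(product_name)}.execution.events"


def governance_events_topic(tenant_slug: str) -> str:
    """Topic carrying all governance events for a tenant."""
    return f"{_segment(tenant_slug)}.governance.events"


def dlq_topic(tenant_slug: str, stage: str) -> str:
    """Dead letter queue topic for one failure stage."""
    return f"{_segment(tenant_slug)}.dlq.{_segment(stage)}"


def belongs_to_tenant(topic: str, tenant_slug: str) -> bool:
    """Check the tenant prefix, including the separator, before consuming."""
    return topic.startswith(f"{_segment(tenant_slug)}.")
```

For a hypothetical tenant slug `acme`, `data_available_topic("acme", "raw-orders")` yields `acme.raw-orders.data.available`. Checking the prefix together with its trailing dot means a tenant named `acme` does not match topics that belong to `acme-eu`.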
Consumer Groups
Each consumer maintains its own offset position via a consumer group:
| Consumer | Group ID Pattern | Purpose |
|---|---|---|
| Dagster Sensor | {tenant}.{product}.sensor | Detect upstream data availability |
| Governance | {tenant}.governance.consumer | Process governance events |
| DLQ Monitor | {tenant}.dlq.monitor | Alert on new DLQ entries |
| Audit Logger | {tenant}.audit.consumer | Append all events to audit trail |
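To see why separate groups matter, it helps to model a topic as an append-only log with one committed offset per group. The class below is an in-memory stand-in written for illustration only. It is not a Redpanda or Kafka client, and the `TopicLog` name and its methods are assumptions.

```python
from collections import defaultdict


class TopicLog:
    """In-memory model of one topic partition with per-group offsets."""

    def __init__(self) -> None:
        self._records: list[str] = []
        # group_id -> offset of the next record that group should read
        self._committed: dict[str, int] = defaultdict(int)

    def append(self, record: str) -> None:
        """Producer side: add a record to the end of the log."""
        self._records.append(record)

    def poll(self, group_id: str, max_records: int = 10) -> list[str]:
        """Return records after the group's committed offset, without committing."""
        start = self._committed[group_id]
        return self._records[start:start + max_records]

    def commit(self, group_id: str, count: int) -> None:
        """Advance one group's offset; other groups are unaffected."""
        new_offset = self._committed[group_id] + count
        self._committed[group_id] = min(new_offset, len(self._records))


log = TopicLog()
for event_type in ("ExecutionStarted", "ExecutionCompleted", "QualityCheckCompleted"):
    log.append(event_type)

audit_group = "acme.audit.consumer"            # hypothetical tenant slug "acme"
governance_group = "acme.governance.consumer"

batch = log.poll(audit_group)
log.commit(audit_group, len(batch))            # the audit logger is now caught up

audit_backlog = log.poll(audit_group)           # nothing left for this group
governance_backlog = log.poll(governance_group) # still sees every record
```

Because each group tracks its own position, a slow or paused governance consumer does not delay the audit logger, and each consumer can be restarted from its own last committed offset.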
Event-Driven Orchestration
The data.available event is the primary orchestration primitive. When a data product completes materialization and passes its quality gates, it publishes data.available to the event bus. Downstream products subscribe to these events through their inputs.yaml declarations.
%%{init: {'flowchart': {'curve': 'basis'}}}%%
flowchart LR
subgraph Product_A["Product A: raw-orders"]
A_EXEC[Execute] --> A_QC[Quality Check]
A_QC --> A_PUB[Publish data.available]
end
subgraph Event_Bus["Redpanda"]
A_PUB --> TOPIC[tenant.raw-orders\n.data.available]
end
subgraph Product_B["Product B: daily-summary"]
TOPIC --> B_SENSOR[Sensor detects event]
B_SENSOR --> B_CORR[Correlate all inputs]
B_CORR --> B_EXEC[Execute when all ready]
end
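The link between a product's declared inputs and the topics its sensor listens to can be pictured as a lookup from topic to subscribing products. The sketch below is one possible shape, under stated assumptions: input declarations are already parsed into dictionaries with `kind` and `product` keys, only `kind: product` inputs produce subscriptions, and the `build_subscription_map` function and the `connection` kind in the sample data are hypothetical.

```python
from collections import defaultdict


def build_subscription_map(
    tenant_slug: str,
    deployed_inputs: dict[str, list[dict[str, str]]],
) -> dict[str, list[str]]:
    """Map each data.available topic to the products that consume it.

    deployed_inputs maps a consuming product name to its parsed input
    declarations (assumed to be dictionaries, one per declared input).
    """
    subscriptions: dict[str, list[str]] = defaultdict(list)
    for consumer_product, declarations in deployed_inputs.items():
        for declaration in declarations:
            # Only upstream data products publish data.available.
            if declaration.get("kind") != "product":
                continue
            topic = f"{tenant_slug}.{declaration['product']}.data.available"
            subscriptions[topic].append(consumer_product)
    return dict(subscriptions)


subscription_map = build_subscription_map(
    "tenant",
    {
        "daily-summary": [
            {"name": "orders", "kind": "product", "product": "raw-orders"},
            # Hypothetical non-product input, ignored by the map.
            {"name": "crm", "kind": "connection"},
        ],
    },
)
```

With this sample input, the map holds a single entry that routes `tenant.raw-orders.data.available` to `daily-summary`, which matches the flow in the diagram above.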
Correlation Engine
When a data product has multiple inputs, the correlation engine waits until all inputs have published data.available within the configured time window before triggering execution. This prevents partial materializations.
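The waiting rule can be sketched as a small state machine that remembers the latest arrival per input and fires once every input has arrived inside the window. This is an illustration, not the platform's engine. It assumes the window is measured back from the most recent arrival, that arrivals are observed in time order, and that state resets after each trigger. The `InputCorrelator` name and its parameters are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class InputCorrelator:
    """Tracks data.available arrivals for one downstream product."""
    required_inputs: frozenset[str]
    window_seconds: float
    # input name -> arrival time in seconds (any monotonic clock)
    _seen: dict[str, float] = field(default_factory=dict, init=False)

    def observe(self, input_name: str, arrived_at: float) -> bool:
        """Record an arrival; return True when execution should be triggered."""
        if input_name not in self.required_inputs:
            return False  # not an input of this product
        self._seen[input_name] = arrived_at
        # Forget arrivals that are older than the window, measured from now.
        cutoff = arrived_at - self.window_seconds
        self._seen = {
            name: seen_at for name, seen_at in self._seen.items()
            if seen_at >= cutoff
        }
        if set(self._seen) == set(self.required_inputs):
            self._seen.clear()  # start a fresh round after triggering
            return True
        return False
```

For a product with inputs `orders` and `customers` and a one-hour window, an `orders` arrival alone returns `False`, and a `customers` arrival within the following hour returns `True`. If the second input arrives after the window has passed, the first arrival is discarded and the product keeps waiting, which is how a partial materialization is avoided.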
Subscription Maps
The platform builds a subscription map from all deployed products’ inputs.yaml files. Each kind: product input creates a subscription entry:
Product B (inputs.yaml)
  - name: orders
    kind: product
    product: raw-orders
  --> Subscription: tenant.raw-orders.data.available -> Product B sensor
Event Delivery Guarantees
| Guarantee | Implementation |
|---|---|
| At-least-once delivery | Redpanda consumer offset commits after processing |
| Ordering | Per-partition ordering within a topic |
| Durability | Redpanda replication factor 3 |
| Deduplication | event_id used for idempotent processing |
Events that fail processing are sent to the dead letter queue after retry exhaustion. See DLQ Management for details.
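At-least-once delivery means a consumer can see the same event twice, so handlers need to be idempotent. The sketch below combines the deduplication and dead-letter behaviour described above under several assumptions: events are dictionaries carrying `event_id`, processed IDs live in a set (a real consumer would need a durable store), the retry limit of three is arbitrary, and `process_with_guarantees` is a hypothetical name.

```python
from typing import Any, Callable

Event = dict[str, Any]


def process_with_guarantees(
    events: list[Event],
    handler: Callable[[Event], None],
    processed_ids: set[str],
    dlq: list[dict[str, Any]],
    max_attempts: int = 3,
) -> None:
    """Process a batch so that redelivered events are applied only once."""
    for event in events:
        event_id = event["event_id"]
        if event_id in processed_ids:
            continue  # duplicate delivery: already applied, skip it
        for attempt in range(1, max_attempts + 1):
            try:
                handler(event)
            except Exception as exc:
                if attempt == max_attempts:
                    # Retries exhausted: park the event for the DLQ monitor.
                    dlq.append(
                        {"event": event, "error": str(exc), "attempts": attempt}
                    )
            else:
                processed_ids.add(event_id)
                break
    # In a real consumer, the offset commit would happen here, after the
    # whole batch has been handled, which is what makes delivery at-least-once.
```

Committing only after the batch is handled means a crash mid-batch causes redelivery rather than loss, and the `event_id` check is what turns that redelivery into a no-op.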
Related
- Orchestration: how events drive pipeline execution
- Governance Model: governance events and enforcement
- DLQ Management: handling failed events