# Event System

Akili uses an event-driven architecture to coordinate data product execution, governance enforcement, and system observability. Every significant platform action emits a domain event to the Redpanda event bus. Downstream consumers react to these events — triggering pipeline execution, updating governance state, and feeding the audit trail.

```mermaid
%%{init: {'sequence': {'mirrorActors': false, 'messageMargin': 40}}}%%
sequenceDiagram
    participant SVC as Service Layer
    participant EB as Redpanda Event Bus
    participant SENSOR as Dagster Sensor
    participant GOV as Governance Consumer
    participant DLQ as DLQ Monitor
    participant AUDIT as Audit Trail

    SVC->>EB: ProductRegistered
    SVC->>EB: ProductDeployed
    EB->>SENSOR: data.available
    SENSOR->>SENSOR: Correlate inputs
    SENSOR->>SVC: Trigger execution
    SVC->>EB: ExecutionStarted
    SVC->>EB: ExecutionCompleted
    SVC->>EB: QualityCheckCompleted
    EB->>GOV: Quality + SLA events
    EB->>DLQ: Failed events (after retry)
    EB->>AUDIT: All events (append-only)
```

Every domain event follows a standard envelope format:

```json
{
  "event_id": "evt-abc123",
  "event_type": "ProductDeployed",
  "tenant_id": "550e8400-e29b-41d4-a716-446655440000",
  "correlation_id": "corr-xyz789",
  "timestamp": "2026-03-16T10:30:00Z",
  "payload": { }
}
```
| Field | Description |
| --- | --- |
| `event_id` | Globally unique identifier for this event instance |
| `event_type` | Discriminator string identifying the event kind |
| `tenant_id` | Tenant scope; all events are tenant-isolated |
| `correlation_id` | Traces related events across an execution lifecycle |
| `timestamp` | ISO 8601 timestamp of when the event occurred |
| `payload` | Event-specific data (varies by event type) |
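As a sketch, constructing a conforming envelope might look like the following. The `make_envelope` helper is illustrative only, not part of the platform API; it simply fills every field the table above requires:

```python
# Illustrative helper for building a standard event envelope.
# make_envelope is a hypothetical name, not the platform's actual API.
import json
import uuid
from datetime import datetime, timezone

def make_envelope(event_type, tenant_id, correlation_id, payload):
    """Build a domain-event envelope with all required fields."""
    return {
        "event_id": f"evt-{uuid.uuid4().hex[:12]}",
        "event_type": event_type,
        "tenant_id": tenant_id,
        "correlation_id": correlation_id,
        # ISO 8601 UTC timestamp, matching the documented format
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payload": payload,
    }

event = make_envelope(
    "ProductDeployed",
    "550e8400-e29b-41d4-a716-446655440000",
    "corr-xyz789",
    {"product_id": "prod-001"},
)
print(json.dumps(event, indent=2))
```

Producers serialize the envelope to JSON before publishing; consumers can then rely on the fixed field set regardless of event type.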

The platform emits 20+ event types across four categories: product lifecycle, execution, governance, and semantics.

**Product lifecycle events**

| Event Type | Trigger | Key Payload Fields |
| --- | --- | --- |
| ProductRegistered | New product created via API | `product_id`, `name`, `namespace`, `archetype` |
| ProductDeployed | Product deployed to execution engine | `product_id`, `version`, `manifest_hash` |
| ProductUpdated | Product metadata changed | `product_id`, `changed_fields` |
| ProductDeprecated | Product marked for deprecation | `product_id`, `deprecation_date` |
| ProductDeleted | Product permanently removed | `product_id` |
**Execution events**

| Event Type | Trigger | Key Payload Fields |
| --- | --- | --- |
| ExecutionStarted | Pipeline execution begins | `execution_id`, `product_id`, `partition_key` |
| ExecutionCompleted | Pipeline execution succeeds | `execution_id`, `duration_ms`, `row_count` |
| ExecutionFailed | Pipeline execution fails (after retries) | `execution_id`, `error`, `retry_count` |
| DataAvailable | New data materialized and quality-checked | `product_id`, `snapshot_id`, `freshness` |
**Governance events**

| Event Type | Trigger | Key Payload Fields |
| --- | --- | --- |
| QualityCheckCompleted | Quality gate evaluated | `product_id`, `check_name`, `passed`, `severity` |
| QualityCheckFailed | Blocking quality check fails | `product_id`, `check_name`, `error_detail` |
| SlaBreached | Freshness or availability SLA exceeded | `product_id`, `sla_type`, `threshold`, `actual` |
| ClassificationViolation | Classification laundering attempt | `product_id`, `declared`, `required` |
| RetentionExpired | Data exceeds retention period | `product_id`, `retention_period`, `oldest_record` |
| DeletionRequested | Right-to-erasure request filed | `entity_type`, `identity_value`, `reason` |
| DeletionCompleted | Deletion executed and verified | `request_id`, `products_affected`, `rows_deleted` |
**Semantic events**

| Event Type | Trigger | Key Payload Fields |
| --- | --- | --- |
| ConceptCreated | New business concept extracted | `concept_name`, `domain`, `source_product` |
| ConceptPromoted | Concept maturity advanced | `concept_name`, `from_state`, `to_state` |
| SemanticContractBroken | Upstream removed a referenced intent | `contract_id`, `upstream_product`, `downstream_products` |
| SemanticContractStale | Upstream values changed since compile | `contract_id`, `upstream_product` |
| ConnectionCreated | New external connection registered | `connection_id`, `connector_type` |
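Consumers typically branch on the `event_type` discriminator. A minimal sketch of mapping the catalog above onto its four categories; the category labels and the `category_of` helper are assumptions inferred from the tables, not an official enum:

```python
# Illustrative mapping of catalog event types to their categories.
# Labels ("product", "execution", ...) are inferred, not platform constants.
EVENT_CATEGORIES = {
    "product": {"ProductRegistered", "ProductDeployed", "ProductUpdated",
                "ProductDeprecated", "ProductDeleted"},
    "execution": {"ExecutionStarted", "ExecutionCompleted",
                  "ExecutionFailed", "DataAvailable"},
    "governance": {"QualityCheckCompleted", "QualityCheckFailed", "SlaBreached",
                   "ClassificationViolation", "RetentionExpired",
                   "DeletionRequested", "DeletionCompleted"},
    "semantic": {"ConceptCreated", "ConceptPromoted", "SemanticContractBroken",
                 "SemanticContractStale", "ConnectionCreated"},
}

def category_of(event_type: str) -> str:
    """Return the catalog category for an event type, or 'unknown'."""
    for category, types in EVENT_CATEGORIES.items():
        if event_type in types:
            return category
    return "unknown"
```

A governance consumer, for example, can use such a lookup to ignore events outside its category before deserializing the payload.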

Akili uses Redpanda as the event bus. Redpanda provides Kafka-compatible APIs with lower operational overhead.

All topics are tenant-prefixed to enforce isolation:

```
{tenant_slug}.{product_name}.data.available
{tenant_slug}.{product_name}.execution.events
{tenant_slug}.governance.events
{tenant_slug}.dlq.{stage}
```
| Topic Pattern | Purpose |
| --- | --- |
| `{tenant}.{product}.data.available` | Signals that new data is ready for downstream consumption |
| `{tenant}.{product}.execution.events` | Execution lifecycle events for a specific product |
| `{tenant}.governance.events` | All governance events for a tenant |
| `{tenant}.dlq.{stage}` | Dead letter queue entries by failure stage |
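The patterns above are plain string templates, so producers and consumers can derive topic names deterministically. A hedged sketch (the function names are illustrative, not the platform's actual API):

```python
# Illustrative helpers for the tenant-prefixed topic patterns above.
def data_available_topic(tenant_slug: str, product_name: str) -> str:
    return f"{tenant_slug}.{product_name}.data.available"

def execution_topic(tenant_slug: str, product_name: str) -> str:
    return f"{tenant_slug}.{product_name}.execution.events"

def governance_topic(tenant_slug: str) -> str:
    return f"{tenant_slug}.governance.events"

def dlq_topic(tenant_slug: str, stage: str) -> str:
    # stage identifies the failure stage; "ingest" below is an example value
    return f"{tenant_slug}.dlq.{stage}"
```

Because the tenant slug leads every topic name, per-tenant ACLs on the topic prefix are enough to enforce the isolation described above.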

Each consumer maintains its own offset position via a consumer group:

| Consumer | Group ID Pattern | Purpose |
| --- | --- | --- |
| Dagster Sensor | `{tenant}.{product}.sensor` | Detect upstream data availability |
| Governance | `{tenant}.governance.consumer` | Process governance events |
| DLQ Monitor | `{tenant}.dlq.monitor` | Alert on new DLQ entries |
| Audit Logger | `{tenant}.audit.consumer` | Append all events to the audit trail |

The `data.available` event is the primary orchestration primitive. When a data product completes materialization and passes its quality gates, it publishes `data.available` to the event bus. Downstream products subscribe to these events through their `inputs.yaml` declarations.

```mermaid
%%{init: {'flowchart': {'curve': 'basis'}}}%%
flowchart LR
    subgraph Product_A["Product A: raw-orders"]
        A_EXEC[Execute] --> A_QC[Quality Check]
        A_QC --> A_PUB[Publish data.available]
    end

    subgraph Event_Bus["Redpanda"]
        A_PUB --> TOPIC[tenant.raw-orders\n.data.available]
    end

    subgraph Product_B["Product B: daily-summary"]
        TOPIC --> B_SENSOR[Sensor detects event]
        B_SENSOR --> B_CORR[Correlate all inputs]
        B_CORR --> B_EXEC[Execute when all ready]
    end
```

When a data product has multiple inputs, the correlation engine waits until all inputs have published `data.available` within the configured time window before triggering execution. This prevents partial materializations.
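The waiting rule can be sketched as follows, assuming an in-memory correlator; `InputCorrelator` is a hypothetical name, and a real sensor would persist this state across ticks:

```python
# Sketch of multi-input correlation: fire only when every declared input
# has published data.available inside the configured window.
from datetime import datetime, timedelta

class InputCorrelator:
    def __init__(self, required_inputs, window):
        self.required = set(required_inputs)
        self.window = window
        self.seen = {}  # upstream product -> last data.available time

    def record(self, product, at):
        """Record a data.available event; return True once all inputs are ready."""
        self.seen[product] = at
        # forget arrivals that have fallen outside the time window
        cutoff = at - self.window
        self.seen = {p: t for p, t in self.seen.items() if t >= cutoff}
        return self.required <= set(self.seen)

corr = InputCorrelator({"raw-orders", "raw-customers"}, timedelta(hours=1))
t0 = datetime(2026, 3, 16, 10, 0)
corr.record("raw-orders", t0)                                # one of two inputs
ready = corr.record("raw-customers", t0 + timedelta(minutes=20))  # both present
```

If the second input arrives after the window has elapsed, the first arrival is evicted and execution is not triggered, which is exactly how partial materializations are avoided.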

The platform builds a subscription map from all deployed products' `inputs.yaml` files. Each `kind: product` input creates a subscription entry:

```yaml
# Product B (inputs.yaml)
- name: orders
  kind: product
  product: raw-orders
```

Resulting subscription: `tenant.raw-orders.data.available` → Product B sensor
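A minimal sketch of deriving that subscription map from parsed `inputs.yaml` entries; the `build_subscriptions` helper and its input shape are assumptions for illustration:

```python
# Hypothetical sketch: build a topic -> subscribers map from each product's
# parsed inputs.yaml entries. Only kind: product inputs create entries.
def build_subscriptions(tenant: str, products: dict) -> dict:
    """products maps a product name to its list of inputs.yaml entries."""
    subs: dict = {}
    for downstream, inputs in products.items():
        for inp in inputs:
            if inp.get("kind") == "product":
                topic = f"{tenant}.{inp['product']}.data.available"
                subs.setdefault(topic, []).append(downstream)
    return subs

subs = build_subscriptions("tenant", {
    "daily-summary": [
        {"name": "orders", "kind": "product", "product": "raw-orders"},
    ],
})
```

Inputs of other kinds (for example, external connections) produce no subscription entry, since only products publish `data.available`.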
| Guarantee | Implementation |
| --- | --- |
| At-least-once delivery | Redpanda consumer offsets commit after processing |
| Ordering | Per-partition ordering within a topic |
| Durability | Redpanda replication factor 3 |
| Deduplication | `event_id` used for idempotent processing |
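The deduplication guarantee relies on consumers treating `event_id` as an idempotency key: under at-least-once delivery, a redelivered event must be recognized and skipped. A sketch, assuming an in-memory seen-set (`IdempotentProcessor` is a hypothetical wrapper; production consumers would persist the set durably):

```python
# Sketch: event_id-based idempotent processing under at-least-once delivery.
class IdempotentProcessor:
    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # processed event_ids; durable store in production

    def process(self, event: dict) -> bool:
        """Handle the event exactly once; return False on duplicate redelivery."""
        if event["event_id"] in self.seen:
            return False
        self.handler(event)
        self.seen.add(event["event_id"])
        return True

handled = []
proc = IdempotentProcessor(handled.append)
proc.process({"event_id": "evt-abc123", "event_type": "ProductDeployed"})
proc.process({"event_id": "evt-abc123", "event_type": "ProductDeployed"})  # duplicate, skipped
```

Marking the event as seen only after the handler succeeds keeps the scheme consistent with committing offsets after processing: a crash mid-handle leads to a retry, not a lost event.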

Events that fail processing are sent to the dead letter queue after retry exhaustion. See DLQ Management for details.