Real-Time Event Pipeline
A complete working example of a real-time event processing pipeline on the Akili platform. It ingests clickstream events from a Redpanda topic via CDC, enriches them with user profiles synced incrementally from PostgreSQL, applies a sessionization transform, enforces quality gates, and serves the results through both the analytics engine (for dashboards) and a lookup store (for real-time queries).
%%{init: {'flowchart': {'curve': 'basis'}}}%%
flowchart LR
subgraph Source["Event Source"]
CLICKS[Clickstream Events]
USERS[User Profiles]
end
subgraph Akili["Akili Platform"]
CDC[CDC Ingestion] --> TRANSFORM[Sessionization\nTransform]
INC[Incremental Sync] --> TRANSFORM
TRANSFORM --> QUALITY[Quality Gates]
QUALITY --> LAKE[Data Lake]
end
subgraph Serving["Serving Stores"]
SR[Analytics Engine\nStarRocks]
PG[Lookup Store\nPostgreSQL]
end
CLICKS --> CDC
USERS --> INC
LAKE --> SR
LAKE --> PG
Manifest Files
product.yaml
Product identity for a streaming clickstream sessionization pipeline.
# Product manifest -- streaming event pipeline
product:
  name: user_sessions
  namespace: analytics
  version: "1.0.0"
  archetype: source
  description: >
    Sessionizes raw clickstream events into user sessions.
    Streams from the production clickstream topic via CDC.
    Sessions are defined by a 30-minute inactivity window.
  owner: analytics-team@example.com
  tags:
    - streaming
    - clickstream
    - sessions
    - real-time

inputs.yaml
Two inputs: clickstream events via CDC from Redpanda, and user profiles via incremental sync from PostgreSQL.
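As a rough illustration of what an incremental strategy with a `cursor_field` usually means, the sketch below pulls only rows whose cursor value is newer than the last one seen and then advances the cursor. The function `incremental_sync` and the sample rows are hypothetical and are not part of the Akili platform; how the platform actually stores and advances cursors is not described here.

```python
from datetime import datetime


def incremental_sync(rows, cursor_field, last_cursor):
    """Return rows newer than last_cursor, plus the advanced cursor value.

    Hypothetical helper: a cursor of None means "no previous run".
    """
    fresh = [
        row for row in rows
        if last_cursor is None or row[cursor_field] > last_cursor
    ]
    # If nothing new arrived, the cursor stays where it was.
    new_cursor = max((row[cursor_field] for row in fresh), default=last_cursor)
    return fresh, new_cursor


# Illustrative rows shaped like public.users (only two columns shown).
users = [
    {"user_id": "u1", "updated_at": datetime(2024, 1, 1, 9, 0)},
    {"user_id": "u2", "updated_at": datetime(2024, 1, 1, 10, 30)},
    {"user_id": "u3", "updated_at": datetime(2024, 1, 1, 11, 15)},
]

# First run: no stored cursor, so every row is pulled.
batch, cursor = incremental_sync(users, "updated_at", None)

# A profile changes; the next run pulls only that row.
users.append({"user_id": "u1", "updated_at": datetime(2024, 1, 1, 12, 0)})
batch2, cursor2 = incremental_sync(users, "updated_at", cursor)
```

CDC differs in that changes are delivered as a stream of events rather than discovered by re-querying with a cursor, which is why it suits the high-volume clickstream topic while the slower-changing profile table uses an hourly incremental pull.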
# Input ports -- event stream + enrichment table
inputs:
  - name: clickstream
    kind: source
    connection: production-events
    resource: clickstream.page-views
    strategy: cdc
    cursor_field: event_time
    schedule: "*/5 * * * *"
    columns:
      - event_id
      - user_id
      - page_url
      - referrer_url
      - user_agent
      - ip_address
      - event_time
      - session_id

  - name: user_profiles
    kind: source
    connection: production-db
    resource: public.users
    strategy: incremental
    cursor_field: updated_at
    schedule: "0 * * * *"
    columns:
      - user_id
      - username
      - account_type
      - signup_date
      - country

transform.sql
Sessionization transform that groups clickstream events into sessions using a 30-minute inactivity window, then enriches with user profile data.
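The 30-minute inactivity rule can be easier to follow in procedural form first. The sketch below is a minimal Python illustration of gap-based sessionization, not the platform's implementation: the `sessionize` function and the sample events are hypothetical, and it compares exact elapsed time, whereas some SQL dialects count minute boundaries crossed in `DATEDIFF`, so results near the 30-minute edge may differ slightly.

```python
from datetime import datetime, timedelta

SESSION_GAP = timedelta(minutes=30)  # inactivity window from the product description


def sessionize(events):
    """Group (user_id, event_time, page_url) tuples into per-user sessions."""
    sessions = []
    current_by_user = {}
    # Process each user's events in time order.
    for user_id, event_time, page_url in sorted(events, key=lambda e: (e[0], e[1])):
        current = current_by_user.get(user_id)
        # Start a new session on the first event or after a gap over 30 minutes.
        if current is None or event_time - current["session_end"] > SESSION_GAP:
            current = {
                "user_id": user_id,
                "session_num": current["session_num"] + 1 if current else 1,
                "session_start": event_time,
                "session_end": event_time,
                "page_views": 0,
                "landing_page": page_url,
                "exit_page": page_url,
            }
            current_by_user[user_id] = current
            sessions.append(current)
        current["session_end"] = event_time
        current["page_views"] += 1
        current["exit_page"] = page_url
    return sessions


events = [
    ("u1", datetime(2024, 1, 1, 9, 0), "/home"),
    ("u1", datetime(2024, 1, 1, 9, 10), "/pricing"),
    ("u1", datetime(2024, 1, 1, 9, 50), "/docs"),  # 40-minute gap: new session
    ("u2", datetime(2024, 1, 1, 9, 5), "/home"),
]
sessions = sessionize(events)
```

The SQL transform expresses the same idea declaratively: `LAG` finds the previous event per user, a flag marks gaps over 30 minutes, and a running sum of that flag numbers the sessions.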
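-- Sessionize clickstream events
-- A session ends after 30 minutes of inactivity

WITH ordered_events AS (
    SELECT
        e.user_id,
        e.page_url,
        e.referrer_url,
        e.event_time,
        LAG(e.event_time) OVER (
            PARTITION BY e.user_id
            ORDER BY e.event_time
        ) AS prev_event_time
    FROM {{ ref('clickstream') }} e
),

session_boundaries AS (
    SELECT
        *,
        CASE
            WHEN prev_event_time IS NULL
                OR DATEDIFF('minute', prev_event_time, event_time) > 30
            THEN 1
            ELSE 0
        END AS is_new_session
    FROM ordered_events
),

session_ids AS (
    SELECT
        *,
        SUM(is_new_session) OVER (
            PARTITION BY user_id
            ORDER BY event_time
        ) AS session_num
    FROM session_boundaries
),

-- Pick pages and referrer by event order within each session.
-- MIN/MAX on the URL itself would sort alphabetically, not by time.
session_pages AS (
    SELECT
        *,
        FIRST_VALUE(page_url) OVER (
            PARTITION BY user_id, session_num
            ORDER BY event_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS landing_page,
        LAST_VALUE(page_url) OVER (
            PARTITION BY user_id, session_num
            ORDER BY event_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS exit_page,
        FIRST_VALUE(referrer_url) OVER (
            PARTITION BY user_id, session_num
            ORDER BY event_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
        ) AS entry_referrer
    FROM session_ids
)

SELECT
    s.user_id,
    u.username,
    u.account_type,
    u.country,
    s.session_num,
    MIN(s.event_time) AS session_start,
    MAX(s.event_time) AS session_end,
    DATEDIFF('second', MIN(s.event_time), MAX(s.event_time)) AS session_duration_seconds,
    COUNT(*) AS page_views,
    -- These three are constant within a session, so MIN just collapses them
    MIN(s.landing_page) AS landing_page,
    MIN(s.exit_page) AS exit_page,
    MIN(s.entry_referrer) AS entry_referrer
FROM session_pages s
LEFT JOIN {{ ref('user_profiles') }} u
    ON s.user_id = u.user_id
GROUP BY
    s.user_id,
    u.username,
    u.account_type,
    u.country,
    s.session_num

output.yaml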
The output is stored in Iceberg format, partitioned by date and country for efficient analytical queries.
# Output specification
output:
  format: iceberg
  partition_key:
    - session_start_date
    - country
  retention:
    days: 90
    policy: delete
  schema:
    - name: user_id
      type: string
      description: Unique user identifier
    - name: username
      type: string
      description: User display name
    - name: account_type
      type: string
      description: Account tier (free, pro, enterprise)
    - name: country
      type: string
      description: User country from profile
    - name: session_num
      type: integer
      description: Session sequence number for user
    - name: session_start
      type: timestamp
      description: First event timestamp in session
    - name: session_end
      type: timestamp
      description: Last event timestamp in session
    - name: session_duration_seconds
      type: integer
      description: Duration in seconds
    - name: page_views
      type: integer
      description: Number of page views in session
    - name: landing_page
      type: string
      description: First page visited
    - name: exit_page
      type: string
      description: Last page visited
    - name: entry_referrer
      type: string
      description: Referrer URL for session entry

quality.yaml
Quality checks to ensure session data integrity and freshness for real-time use.
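One plausible reading of how such checks gate a run is sketched below. It assumes, without confirmation from the platform documentation, that each check's SQL returns a single number, that a check fails when that number exceeds its `threshold`, and that only `critical` failures block publication while `warning` failures are merely reported. The `Check` class, the `evaluate` function, and the sample results are hypothetical; the check names are taken from the manifest in this section.

```python
from dataclasses import dataclass


@dataclass
class Check:
    name: str
    severity: str      # "critical" or "warning"
    threshold: float   # assumed meaning: fail when result > threshold


def evaluate(checks, results):
    """Split failing checks into blocking (critical) and non-blocking (warning)."""
    blocking, warnings = [], []
    for check in checks:
        if results[check.name] > check.threshold:
            target = blocking if check.severity == "critical" else warnings
            target.append(check.name)
    return blocking, warnings


checks = [
    Check("no_null_users", "critical", 0),
    Check("reasonable_page_views", "warning", 0),
    Check("freshness", "critical", 15),
]

# Hypothetical query results: no null users, two suspicious sessions,
# and data that is 22 minutes stale.
results = {"no_null_users": 0, "reasonable_page_views": 2, "freshness": 22}

blocking, warnings = evaluate(checks, results)
```

Under these assumptions the run would be blocked by `freshness` (22 minutes against a limit of 15), while `reasonable_page_views` would only raise a warning.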
# Quality checks -- critical for real-time pipeline
quality:
  checks:
    - name: no_null_users
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE user_id IS NULL"
      severity: critical
      threshold: 0
      description: User ID must never be null

    - name: positive_duration
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE session_duration_seconds < 0"
      severity: critical
      threshold: 0
      description: Session duration must be non-negative

    - name: reasonable_page_views
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE page_views > 10000"
      severity: warning
      threshold: 0
      description: Extremely high page view count may indicate bot traffic

    - name: freshness
      sql: "SELECT DATEDIFF('minute', MAX(session_end), CURRENT_TIMESTAMP()) FROM {{ this }}"
      severity: critical
      threshold: 15
      description: Sessions must be no more than 15 minutes stale

    - name: session_count_anomaly
      sql: >
        SELECT CASE
          WHEN COUNT(*) < 100
            AND EXTRACT(HOUR FROM CURRENT_TIMESTAMP()) BETWEEN 8 AND 22
          THEN 1
          ELSE 0
        END
        FROM {{ this }}
        WHERE session_start >= CURRENT_DATE()
      severity: warning
      threshold: 0
      description: Unusually low session count during business hours

serving.yaml
Dual-mode serving: analytics engine for dashboards and OLAP queries, lookup store for real-time session lookups by user ID.
# Serving configuration -- dual mode for analytics + real-time lookup
serving:
  modes:
    - intent: analytics
      config:
        engine: starrocks
        materialized: true
        refresh_interval: "5m"
        indexes:
          - columns: [session_start, country]
            type: sort_key

    - intent: lookup
      config:
        engine: postgresql
        cache_ttl: "1m"

compute.yaml
The pipeline runs every 5 minutes to keep results near real time, with generous resource allocation for the sessionization window functions.
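A quick back-of-the-envelope check shows how the schedule relates to the 15-minute freshness gate in quality.yaml. The sketch assumes that `timeout: 240` is expressed in seconds (the manifest does not state the unit) and uses a deliberately simple model: the worst case is one full schedule interval plus one run that takes the entire timeout. It ignores retries and upstream ingestion lag, so treat the numbers as an estimate only.

```python
SCHEDULE_INTERVAL_S = 5 * 60   # cron "*/5 * * * *"
TIMEOUT_S = 240                # compute.yaml timeout, assumed to be seconds
FRESHNESS_LIMIT_S = 15 * 60    # freshness check threshold of 15 minutes


def worst_case_staleness(interval_s, runtime_s):
    """Oldest the data can get just before the next run finishes (simple model)."""
    return interval_s + runtime_s


staleness = worst_case_staleness(SCHEDULE_INTERVAL_S, TIMEOUT_S)
headroom = FRESHNESS_LIMIT_S - staleness

# A run that hits the timeout still ends before the next one is due.
fits_in_interval = TIMEOUT_S < SCHEDULE_INTERVAL_S

print(f"worst-case staleness: {staleness}s, headroom: {headroom}s")
```

Under this model a single slow run still leaves several minutes of headroom before the freshness check would fail, though repeated retries could use it up.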
# Compute specification -- high-frequency for real-time
compute:
  engine: dagster
  schedule: "*/5 * * * *"
  resources:
    cpu: "2"
    memory: 4Gi
  timeout: 240
  retries: 3
  tags:
    - streaming
    - high-priority

Deploy This Example
# 1. Register connections
akili connection create \
  --name production-events \
  --connector-type redpanda \
  --config '{"brokers": ["broker-1:9092"], "security_protocol": "SASL_SSL"}'

akili connection create \
  --name production-db \
  --connector-type postgres \
  --config '{"host": "db.example.com", "port": 5432, "database": "app"}'

# 2. Test connectivity
akili connection test production-events
akili connection test production-db

# 3. Register the product
akili product create \
  --name user_sessions \
  --namespace analytics \
  --archetype source

# 4. Upload manifests and deploy
akili product deploy user_sessions --version 1.0.0

# 5. Monitor execution
akili run list user_sessions
akili governance quality user_sessions
akili governance sla user_sessions

Querying the Data
Once the product is deployed and materialized, it can be queried from the CLI:
# Analytics query -- top countries by session volume
akili query "
  SELECT country, COUNT(*) AS sessions, AVG(session_duration_seconds) AS avg_duration
  FROM analytics.user_sessions
  WHERE session_start >= CURRENT_DATE()
  GROUP BY country
  ORDER BY sessions DESC
  LIMIT 10
"

# Lookup query -- recent sessions for a specific user
akili query "
  SELECT session_start, session_end, page_views, landing_page
  FROM analytics.user_sessions
  WHERE user_id = 'user-12345'
  ORDER BY session_start DESC
  LIMIT 5
"

Related
- Ingestion Patterns: CDC and incremental strategies
- Serving Configuration: intent-based routing details
- Advanced Orchestration: adaptive scheduling for real-time