Skip to content
GitLab

Real-Time Event Pipeline

A complete working example of a real-time event processing pipeline on the Akili platform. Ingests clickstream events from a Redpanda topic via CDC, applies sessionization transforms, enforces quality gates, and serves the results through both the analytics engine (for dashboards) and a lookup store (for real-time queries).

%% Pipeline topology: two external sources feed the Akili platform,
%% which lands results in the lake and fans out to two serving stores.
%%{init: {'flowchart': {'curve': 'basis'}}}%%
flowchart LR
    %% External producers of raw data.
    subgraph Source["Event Source"]
        CLICKS[Clickstream Events]
        USERS[User Profiles]
    end

    %% Processing stages inside the platform.
    subgraph Akili["Akili Platform"]
        CDC[CDC Ingestion] --> TRANSFORM[Sessionization\nTransform]
        INC[Incremental Sync] --> TRANSFORM
        TRANSFORM --> QUALITY[Quality Gates]
        QUALITY --> LAKE[Data Lake]
    end

    %% Downstream query engines.
    subgraph Serving["Serving Stores"]
        SR[Analytics Engine\nStarRocks]
        PG[Lookup Store\nPostgreSQL]
    end

    %% Wire sources into ingestion, and the lake into both serving stores.
    CLICKS --> CDC
    USERS --> INC
    LAKE --> SR
    LAKE --> PG

Product identity for a streaming clickstream sessionization pipeline.

# Product manifest -- streaming event pipeline.
# NOTE(review): indentation was restored -- the flattened original parsed as
# `product: null` plus unrelated top-level keys, which no loader would accept.
product:
  name: user_sessions
  namespace: analytics
  version: "1.0.0"
  archetype: source
  description: >
    Sessionizes raw clickstream events into user sessions.
    Streams from the production clickstream topic via CDC.
    Sessions are defined by a 30-minute inactivity window.
  owner: analytics-team@example.com
  tags:
    - streaming
    - clickstream
    - sessions
    - real-time

Two inputs: clickstream events via CDC from Redpanda, and user profiles via incremental sync from PostgreSQL.

# Input ports -- event stream + enrichment table.
# NOTE(review): indentation was restored -- the flattened original was not
# valid nested YAML (every key sat at column 0).
inputs:
  # Raw page-view events streamed from the Redpanda topic via CDC,
  # cursored on event_time, polled every 5 minutes.
  - name: clickstream
    kind: source
    connection: production-events
    resource: clickstream.page-views
    strategy: cdc
    cursor_field: event_time
    schedule: "*/5 * * * *"
    columns:
      - event_id
      - user_id
      - page_url
      - referrer_url
      - user_agent
      - ip_address
      - event_time
      - session_id
  # User profile table incrementally synced hourly from Postgres,
  # used to enrich sessions with account metadata.
  - name: user_profiles
    kind: source
    connection: production-db
    resource: public.users
    strategy: incremental
    cursor_field: updated_at
    schedule: "0 * * * *"
    columns:
      - user_id
      - username
      - account_type
      - signup_date
      - country

Sessionization transform that groups clickstream events into sessions using a 30-minute inactivity window, then enriches with user profile data.

-- Sessionize clickstream events.
-- A session ends after 30 minutes of inactivity.
WITH ordered_events AS (
    SELECT
        e.user_id,
        e.page_url,
        e.referrer_url,
        e.event_time,
        -- Previous event time per user, used below for gap detection.
        LAG(e.event_time) OVER (
            PARTITION BY e.user_id
            ORDER BY e.event_time
        ) AS prev_event_time
    FROM {{ ref('clickstream') }} e
),
session_boundaries AS (
    SELECT
        *,
        -- A new session starts on the user's first event, or after a gap
        -- of more than 30 minutes since their previous event.
        CASE
            WHEN prev_event_time IS NULL
                 OR DATEDIFF('minute', prev_event_time, event_time) > 30
            THEN 1
            ELSE 0
        END AS is_new_session
    FROM ordered_events
),
session_ids AS (
    SELECT
        *,
        -- Running count of session starts yields a per-user session number.
        SUM(is_new_session) OVER (
            PARTITION BY user_id
            ORDER BY event_time
        ) AS session_num
    FROM session_boundaries
)
SELECT
    s.user_id,
    u.username,
    u.account_type,
    u.country,
    s.session_num,
    MIN(s.event_time) AS session_start,
    MAX(s.event_time) AS session_end,
    DATEDIFF('second', MIN(s.event_time), MAX(s.event_time)) AS session_duration_seconds,
    COUNT(*) AS page_views,
    -- FIX: landing/exit/referrer must be the chronologically first/last values.
    -- The previous MIN(page_url)/MAX(page_url)/MIN(referrer_url) returned the
    -- ALPHABETICALLY first/last URL, contradicting the output schema
    -- descriptions ("First page visited" / "Last page visited").
    -- NOTE(review): MIN_BY/MAX_BY require a dialect that supports them
    -- (Snowflake, DuckDB, Trino) -- confirm against the target engine.
    MIN_BY(s.page_url, s.event_time) AS landing_page,
    MAX_BY(s.page_url, s.event_time) AS exit_page,
    MIN_BY(s.referrer_url, s.event_time) AS entry_referrer
FROM session_ids s
LEFT JOIN {{ ref('user_profiles') }} u ON s.user_id = u.user_id
GROUP BY s.user_id, u.username, u.account_type, u.country, s.session_num

Output stored in Iceberg format, partitioned by date and country for efficient analytical queries.

# Output specification.
# NOTE(review): indentation was restored -- the flattened original was not
# valid nested YAML.
output:
  format: iceberg
  # NOTE(review): session_start_date is not declared in the schema below --
  # presumably derived from session_start by the platform; confirm.
  partition_key:
    - session_start_date
    - country
  retention:
    days: 90
    policy: delete
  schema:
    - name: user_id
      type: string
      description: Unique user identifier
    - name: username
      type: string
      description: User display name
    - name: account_type
      type: string
      description: Account tier (free, pro, enterprise)
    - name: country
      type: string
      description: User country from profile
    - name: session_num
      type: integer
      description: Session sequence number for user
    - name: session_start
      type: timestamp
      description: First event timestamp in session
    - name: session_end
      type: timestamp
      description: Last event timestamp in session
    - name: session_duration_seconds
      type: integer
      description: Duration in seconds
    - name: page_views
      type: integer
      description: Number of page views in session
    - name: landing_page
      type: string
      description: First page visited
    - name: exit_page
      type: string
      description: Last page visited
    - name: entry_referrer
      type: string
      description: Referrer URL for session entry

Quality checks to ensure session data integrity and freshness for real-time use.

# Quality checks -- critical for real-time pipeline.
# NOTE(review): indentation was restored -- the flattened original was not
# valid nested YAML.
quality:
  checks:
    - name: no_null_users
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE user_id IS NULL"
      severity: critical
      threshold: 0
      description: User ID must never be null
    - name: positive_duration
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE session_duration_seconds < 0"
      severity: critical
      threshold: 0
      description: Session duration must be non-negative
    - name: reasonable_page_views
      sql: "SELECT COUNT(*) FROM {{ this }} WHERE page_views > 10000"
      severity: warning
      threshold: 0
      description: Extremely high page view count may indicate bot traffic
    - name: freshness
      sql: "SELECT DATEDIFF('minute', MAX(session_end), CURRENT_TIMESTAMP()) FROM {{ this }}"
      severity: critical
      threshold: 15
      description: Sessions must be no more than 15 minutes stale
    # Folded scalar (>) joins these lines into one SQL string; SQL is
    # whitespace-insensitive so uniform indentation is deliberate.
    - name: session_count_anomaly
      sql: >
        SELECT CASE
        WHEN COUNT(*) < 100 AND EXTRACT(HOUR FROM CURRENT_TIMESTAMP()) BETWEEN 8 AND 22
        THEN 1 ELSE 0 END
        FROM {{ this }}
        WHERE session_start >= CURRENT_DATE()
      severity: warning
      threshold: 0
      description: Unusually low session count during business hours

Dual-mode serving: analytics engine for dashboards and OLAP queries, lookup store for real-time session lookups by user ID.

# Serving configuration -- dual mode for analytics + real-time lookup.
# NOTE(review): indentation was restored -- the flattened original was not
# valid nested YAML.
serving:
  modes:
    # OLAP dashboards: materialized StarRocks table, refreshed every 5 minutes,
    # sort-keyed on (session_start, country) to match the common query shape.
    - intent: analytics
      config:
        engine: starrocks
        materialized: true
        refresh_interval: "5m"
        indexes:
          - columns: [session_start, country]
            type: sort_key
    # Real-time point lookups served from PostgreSQL with a 1-minute cache.
    - intent: lookup
      config:
        engine: postgresql
        cache_ttl: "1m"

Runs every 5 minutes to maintain real-time freshness, with generous resource allocation for the sessionization window function.

# Compute specification -- high-frequency for real-time.
# NOTE(review): indentation was restored -- the flattened original was not
# valid nested YAML.
compute:
  engine: dagster
  # Same 5-minute cadence as the clickstream input schedule.
  schedule: "*/5 * * * *"
  resources:
    cpu: "2"
    memory: 4Gi
  # NOTE(review): timeout units are not stated here -- presumably seconds; confirm.
  timeout: 240
  retries: 3
  tags:
    - streaming
    - high-priority
Terminal session -- register connections, deploy the product, and monitor it:
# 1. Register connections: the Redpanda event broker and the Postgres app DB.
akili connection create \
--name production-events \
--connector-type redpanda \
--config '{"brokers": ["broker-1:9092"], "security_protocol": "SASL_SSL"}'
akili connection create \
--name production-db \
--connector-type postgres \
--config '{"host": "db.example.com", "port": 5432, "database": "app"}'
# 2. Test connectivity before deploying anything that depends on it.
akili connection test production-events
akili connection test production-db
# 3. Register the product (name/namespace/archetype match the product manifest).
akili product create \
--name user_sessions \
--namespace analytics \
--archetype source
# 4. Upload manifests and deploy this version.
akili product deploy user_sessions --version 1.0.0
# 5. Monitor runs, quality-check results, and SLA status.
akili run list user_sessions
akili governance quality user_sessions
akili governance sla user_sessions

Once deployed and materialized:

Terminal session -- example queries against the deployed product:
# Analytics query -- top countries by session volume (analytics serving mode).
akili query "
SELECT country, COUNT(*) AS sessions, AVG(session_duration_seconds) AS avg_duration
FROM analytics.user_sessions
WHERE session_start >= CURRENT_DATE()
GROUP BY country
ORDER BY sessions DESC
LIMIT 10
"
# Lookup query -- recent sessions for a specific user (lookup serving mode).
akili query "
SELECT session_start, session_end, page_views, landing_page
FROM analytics.user_sessions
WHERE user_id = 'user-12345'
ORDER BY session_start DESC
LIMIT 5
"