
Step 1: Declare

Scaffold the product directory with all six manifest files plus the SQL transform stub:

```sh
akili init customer-orders --archetype aggregate
```

This creates:

```
customer-orders/
  product.yaml
  inputs.yaml
  output.yaml
  serving.yaml
  quality.yaml
  compute.yaml
  logic/
    transform.sql
```

Replace the scaffolded files with the following content.

product.yaml

```yaml
apiVersion: akili/v1
kind: DataProduct
metadata:
  name: customer-orders
  domain: commerce
  version: 1.0.0
  owner: analytics-team
  description: >
    Aggregates cleaned customer data with order events to produce
    per-customer order metrics. Powers the customer insights dashboard
    and the mobile app customer profile.
  tags:
    - customers
    - orders
    - commerce
  classification: internal
  contacts:
    - name: Amina Wanjiku
      role: product-owner
      email: amina.wanjiku@example.com
retention:
  period: "730d"
  basis: event_time
  review_date: "2027-01-01"
```

Key decisions:

  • domain: commerce — groups this product with other commerce data products
  • classification: internal — accessible to all team members (no PII in the output)
  • retention: 730d — two years of data retention
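
The retention period is a compact duration string. A minimal sketch of reading it, assuming only the `<N>d` day form seen in this manifest (Akili's actual duration grammar may accept more units):

```python
from datetime import timedelta

def parse_retention(period: str) -> timedelta:
    # Handles only the "<N>d" day form used in product.yaml ("730d");
    # the platform's real duration parser may be richer.
    if not period.endswith("d"):
        raise ValueError(f"unsupported duration: {period}")
    return timedelta(days=int(period[:-1]))

retention = parse_retention("730d")  # 730 days, i.e. two years
```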
inputs.yaml

```yaml
apiVersion: akili/v1
kind: Inputs
inputs:
  - id: cleaned-customers
    type: data_product
    version: ">=1.0.0"
    timeout: 6h
    fallback: use_cached
    optional: false
  - id: cleaned-orders
    type: data_product
    version: ">=1.0.0"
    timeout: 4h
    fallback: fail
    partition_mapping: same_day
defaults:
  timeout: 4h
  fallback: fail
```

Key decisions:

  • cleaned-customers uses fallback: use_cached — if the customer data is late, use the last successful version rather than blocking
  • cleaned-orders uses fallback: fail — order data is critical, we cannot proceed without it
  • partition_mapping: same_day — orders and the output share the same daily partition
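
How the scheduler applies these policies is platform-internal, but the decision logic they imply can be sketched. A hypothetical illustration (function and status names are invented, not Akili API):

```python
def resolve_input(cfg: dict, fresh_ready: bool, cached_version: str = None) -> str:
    # Hypothetical sketch of fallback resolution, not the platform's code:
    # prefer fresh data; otherwise apply the input's declared fallback.
    if fresh_ready:
        return "fresh"
    if cfg["fallback"] == "use_cached" and cached_version is not None:
        return f"cached:{cached_version}"
    if cfg.get("optional"):
        return "skipped"
    raise RuntimeError(f"input {cfg['id']} unavailable and fallback is 'fail'")

customers = {"id": "cleaned-customers", "fallback": "use_cached", "optional": False}
orders = {"id": "cleaned-orders", "fallback": "fail"}
```

With these two configs, a late `cleaned-customers` run falls back to the last cached version, while a missing `cleaned-orders` input aborts the run.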
output.yaml

```yaml
apiVersion: akili/v1
kind: Output
schema:
  - name: customer_id
    type: string
    primary_key: true
    role: identity
    nullable: false
    description: Unique customer identifier
  - name: customer_name
    type: string
    nullable: false
    role: attribute
    description: Customer display name
  - name: email
    type: string
    nullable: true
    role: attribute
    description: Customer email address
  - name: order_date
    type: date
    primary_key: true
    role: attribute
    nullable: false
    description: Date of aggregated orders
  - name: total_orders
    type: integer
    nullable: false
    role: measure
    description: Number of orders placed on this date
  - name: total_revenue
    type: "decimal(18,2)"
    nullable: false
    role: measure
    description: Sum of order amounts for the day
  - name: avg_order_value
    type: "decimal(10,2)"
    nullable: true
    role: measure
    description: Average order value (null if zero orders)
  - name: first_order_at
    type: timestamp
    nullable: true
    role: attribute
    description: Timestamp of first order on this date
  - name: last_order_at
    type: timestamp
    nullable: true
    role: attribute
    description: Timestamp of last order on this date
  - name: updated_at
    type: timestamp
    nullable: false
    description: When this row was last computed
format: parquet
partitioning:
  - field: order_date
    granularity: day
```

Key decisions:

  • Composite primary key: customer_id + order_date (one row per customer per day)
  • customer_id has role: identity — it is the entity’s natural key
  • Numeric aggregation columns use role: measure — enabling correct aggregation in analytics tools
  • Partitioned by order_date for efficient time-range queries
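
With parquet output partitioned daily on order_date, each day's rows land in their own partition, so a time-range query only touches the matching directories. A sketch assuming a Hive-style `column=value` layout (an assumption; the platform's actual storage layout is not specified here):

```python
from datetime import date, timedelta

def partition_path(product: str, day: date) -> str:
    # Hive-style daily partition directory (assumed layout, for illustration)
    return f"{product}/order_date={day.isoformat()}/"

# a three-day range query would prune down to exactly these partitions
start = date(2024, 1, 15)
paths = [partition_path("customer-orders", start + timedelta(days=i)) for i in range(3)]
# paths[0] -> "customer-orders/order_date=2024-01-15/"
```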
serving.yaml

```yaml
apiVersion: akili/v1
kind: Serving
endpoints:
  - type: lookup
    description: Customer order details for Portal profile pages
    config:
      index_columns:
        - customer_id
        - order_date
  - type: analytics
    description: Customer order analytics for Superset dashboards
  - type: realtime
    description: Today's order totals per customer for live dashboard
    config:
      key_template: "customer:{customer_id}:orders:{order_date}"
      ttl: 24h
      include_columns:
        - customer_id
        - order_date
        - total_orders
        - total_revenue
visualization:
  enabled: true
  dashboard_template: customer-orders-overview
  refresh_interval: 15m
```

Three serving endpoints:

  1. Lookup — Portal fetches individual customer profiles by ID
  2. Analytics — StarRocks federation for OLAP dashboards (no data movement)
  3. Realtime — Redis for live dashboard tiles with 24h TTL
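
The realtime endpoint's key_template is a plain placeholder expansion. A sketch of the key a live dashboard tile would read, using Python's `str.format` (assuming the platform's templating behaves the same way; the customer id is made up):

```python
key_template = "customer:{customer_id}:orders:{order_date}"

# the Redis key under which one customer's totals for one day would be cached
key = key_template.format(customer_id="C1042", order_date="2024-01-15")
# -> "customer:C1042:orders:2024-01-15"

ttl_seconds = 24 * 60 * 60  # the configured 24h TTL, in seconds
```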
quality.yaml

```yaml
apiVersion: akili/v1
kind: Quality
checks:
  # Primary key integrity
  - name: customer_id_complete
    type: completeness
    config:
      column: customer_id
      threshold: 1.0
    severity: error
  - name: unique_customer_day
    type: uniqueness
    config:
      columns:
        - customer_id
        - order_date
    severity: error
  # Freshness
  - name: data_freshness
    type: freshness
    config:
      column: updated_at
      max_age: 6h
    severity: error
  # Value constraints
  - name: revenue_non_negative
    type: range
    config:
      column: total_revenue
      min: 0
    severity: error
  - name: orders_non_negative
    type: range
    config:
      column: total_orders
      min: 0
    severity: error
  # Referential integrity
  - name: valid_customer_ref
    type: referential
    config:
      column: customer_id
      reference_product: cleaned-customers
      reference_column: customer_id
    severity: error
  # Volume sanity
  - name: row_volume
    type: volume
    config:
      min_rows: 10
      max_rows: 10000000
    severity: warn
  # Business logic
  - name: avg_matches_total
    type: custom_expression
    config:
      expression: "total_orders = 0 OR avg_order_value IS NOT NULL"
      threshold: 1.0
    severity: error
  # Cross-check via SQL
  - name: revenue_consistent
    type: custom_sql
    sql: |
      SELECT COUNT(*) AS inconsistent
      FROM {output}
      WHERE total_orders > 0
        AND total_revenue <= 0
    expect: "inconsistent = 0"
    severity: error
```

Nine quality checks covering:

  • Primary key integrity (completeness + uniqueness)
  • Data freshness (within 6 hours)
  • Value constraints (non-negative revenue and orders)
  • Referential integrity (customers exist in upstream)
  • Volume sanity (warning on anomalies)
  • Business logic (average order value consistency)
  • Cross-check (revenue should be positive when orders exist)
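
The custom_expression check is the least self-explanatory of the nine. A sketch of its semantics over in-memory rows (hypothetical evaluation code, not the platform's checker): with threshold: 1.0, a single violating row fails the check.

```python
def avg_matches_total(row: dict) -> bool:
    # custom_expression: total_orders = 0 OR avg_order_value IS NOT NULL
    return row["total_orders"] == 0 or row["avg_order_value"] is not None

rows = [
    {"total_orders": 2, "avg_order_value": 30.0},  # passes
    {"total_orders": 0, "avg_order_value": None},  # passes (no orders)
    {"total_orders": 3, "avg_order_value": None},  # violates the expression
]
pass_rate = sum(avg_matches_total(r) for r in rows) / len(rows)
check_ok = pass_rate >= 1.0  # threshold: 1.0 means every row must pass
```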
compute.yaml

```yaml
apiVersion: akili/v1
kind: Compute
runtime: sql
mode: transform
engine: auto
schedule:
  type: event
resources:
  cpu: "1"
  memory: "2Gi"
timeout: 30m
entrypoint: logic/transform.sql
retry:
  max_attempts: 3
  backoff: exponential
  initial_delay: 30s
```

Key decisions:

  • schedule.type: event — runs when both inputs are materialized (not on a cron)
  • engine: auto — platform selects DuckDB or Spark based on input size
  • resources — adequate for a medium-sized join
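
The retry block translates into a bounded delay schedule. Assuming the conventional doubling form of exponential backoff with no jitter (the manifest does not pin down the multiplier), three attempts yield two waits:

```python
from datetime import timedelta

def backoff_delays(max_attempts: int, initial: timedelta) -> list:
    # delay before attempt i+2 is initial * 2**i (assumed: doubling, no jitter)
    return [initial * (2 ** i) for i in range(max_attempts - 1)]

delays = backoff_delays(3, timedelta(seconds=30))
# waits of 30s and 60s before the second and third attempts
```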
logic/transform.sql

```sql
-- Inputs available as tables matching their ids from inputs.yaml:
-- cleaned_customers, cleaned_orders
SELECT
  c.customer_id,
  c.customer_name,
  c.email,
  o.order_date,
  COUNT(o.order_id) AS total_orders,
  COALESCE(SUM(o.amount), 0) AS total_revenue,
  CASE
    WHEN COUNT(o.order_id) > 0
    THEN SUM(o.amount) / COUNT(o.order_id)
    ELSE NULL
  END AS avg_order_value,
  MIN(o.order_timestamp) AS first_order_at,
  MAX(o.order_timestamp) AS last_order_at,
  NOW() AS updated_at
FROM cleaned_customers c
LEFT JOIN cleaned_orders o
  ON c.customer_id = o.customer_id
GROUP BY
  c.customer_id,
  c.customer_name,
  c.email,
  o.order_date
```

Note: Input table names match the id fields from inputs.yaml, with hyphens converted to underscores. cleaned-customers becomes cleaned_customers.
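
The join-then-aggregate shape of the transform can be exercised locally. A sketch using Python's built-in sqlite3 (with NOW() and the timestamp columns dropped, since SQLite differs there): note that the LEFT JOIN gives a customer with no orders one row with a NULL order_date, zero totals, and a NULL average, which is exactly what the avg_matches_total check tolerates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE cleaned_customers (customer_id TEXT, customer_name TEXT, email TEXT);
    CREATE TABLE cleaned_orders (order_id TEXT, customer_id TEXT,
                                 order_date TEXT, amount REAL);
    INSERT INTO cleaned_customers VALUES ('C1', 'Amina', 'a@example.com');
    INSERT INTO cleaned_customers VALUES ('C2', 'Brian', NULL);
    INSERT INTO cleaned_orders VALUES ('O1', 'C1', '2024-01-15', 40.0);
    INSERT INTO cleaned_orders VALUES ('O2', 'C1', '2024-01-15', 20.0);
""")

rows = conn.execute("""
    SELECT c.customer_id,
           o.order_date,
           COUNT(o.order_id)          AS total_orders,
           COALESCE(SUM(o.amount), 0) AS total_revenue,
           CASE WHEN COUNT(o.order_id) > 0
                THEN SUM(o.amount) / COUNT(o.order_id)
                ELSE NULL END         AS avg_order_value
    FROM cleaned_customers c
    LEFT JOIN cleaned_orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, o.order_date
    ORDER BY c.customer_id
""").fetchall()
# rows[0] -> ('C1', '2024-01-15', 2, 60.0, 30.0)
# rows[1] -> ('C2', None, 0, 0, None)
```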