# compute.yaml

Declares how the product executes. Most fields have sensible defaults; many products won’t need this file at all beyond the entrypoint.

```yaml
apiVersion: akili/v1
kind: Compute
runtime: sql
mode: transform
engine: auto
schedule:
  type: cron
  expression: "0 6 * * *"
  timezone: Africa/Nairobi
resources:
  cpu: "500m"
  memory: "1Gi"
  timeout: 30m
entrypoint: logic/transform.sql
retry:
  max_attempts: 3
  backoff: exponential
  initial_delay: 30s
requirements:  # Python only
  - pandas>=2.0
  - scikit-learn>=1.3
```
| Field | Type | Required | Default | Validation | Description |
|---|---|---|---|---|---|
| `runtime` | enum | No | Inferred from entrypoint extension | `sql`, `python` | `sql` runs via DuckDB/Spark SQL; `python` runs a Python process. |
| `mode` | enum | No | `transform` | `transform`, `train`, `inference` | Execution mode. See ML as Data Products below. |
| `engine` | enum | No | `auto` | `auto`, `duckdb`, `spark` | `auto` selects DuckDB (<10 GB) or Spark (>=10 GB). |
| `schedule.type` | enum | No | `event` | `cron`, `event`, `manual` | Trigger type. |
| `schedule.expression` | string | Cron only | none | Standard 5-field cron | Cron schedule expression. |
| `schedule.timezone` | string | No | `UTC` | IANA timezone | Schedule timezone. |
| `resources.cpu` | string | No | `500m` | K8s CPU format | CPU request. |
| `resources.memory` | string | No | `1Gi` | K8s memory format | Memory request. |
| `resources.timeout` | duration | No | `30m` | Duration string | Max execution wall time. |
| `entrypoint` | string | No | `logic/transform.sql` or `logic/transform.py` | Relative path, must exist | Path to logic file. |
| `retry.max_attempts` | int | No | `3` | | Total attempts, including the first. |
| `retry.backoff` | enum | No | `exponential` | `fixed`, `exponential` | `fixed` keeps a constant delay; `exponential` doubles it each attempt. |
| `retry.initial_delay` | duration | No | `30s` | Duration string | Delay before the first retry. |
| `requirements` | string[] | No | none | Python package specifiers | Additional Python packages. Pre-installed: pandas, numpy, scikit-learn, pyarrow. |
| `schedule.type` | Dagster Implementation | When It Runs |
|---|---|---|
| `cron` | `ScheduleDefinition` with the cron expression | At the scheduled time |
| `event` | `@sensor` watching upstream `data.available` events on Redpanda | When all required inputs are materialized |
| `manual` | No automation | Only via the `akili run` CLI or `POST /api/v1/products/:id/run` |

The default is `event`: the product runs when its inputs are ready. For source-aligned products with no internal inputs, `cron` or `manual` is typical.
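For instance, a cron-scheduled source-aligned product can override just the schedule block and lean on the defaults for everything else. A minimal sketch (the expression, timezone, and entrypoint values are illustrative):

```yaml
apiVersion: akili/v1
kind: Compute
entrypoint: logic/transform.py
schedule:
  type: cron
  expression: "0 5 * * 1-5"   # weekdays at 05:00 (illustrative)
  timezone: Africa/Nairobi
```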

The `mode` field enables three data product behaviors through the same manifest structure:

| Mode | Entrypoint | Inputs | Output |
|---|---|---|---|
| `transform` | `transform.sql` or `transform.py` | DataFrames | DataFrame to Iceberg table |
| `train` | `train.py` | DataFrames | Model artifact to S3 (`/tenants/{id}/models/{name}/{version}/`) |
| `inference` | `predict.py` | DataFrames + model artifact | DataFrame to Iceberg table |

Training entrypoint:

```python
def train(inputs: dict[str, pd.DataFrame], context: dict) -> ModelArtifact:
    """Train a model and return it for storage."""
```

Inference entrypoint:

```python
def predict(inputs: dict[str, pd.DataFrame], model: Any, context: dict) -> pd.DataFrame:
    """Load model, score inputs, return predictions."""
```
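A minimal matched pair under these signatures might look like the following sketch. The scikit-learn model, the column names, and the `cleaned_transactions` input id are illustrative assumptions, and the fitted model object stands in for the platform's `ModelArtifact` return type:

```python
from typing import Any

import pandas as pd
from sklearn.linear_model import LinearRegression


def train(inputs: dict[str, pd.DataFrame], context: dict) -> Any:
    """Fit a regression on a hypothetical transactions input; return the model."""
    df = inputs["cleaned_transactions"]
    model = LinearRegression()
    model.fit(df[["transaction_count"]], df["total_revenue"])
    return model  # the platform would persist this as the model artifact


def predict(inputs: dict[str, pd.DataFrame], model: Any, context: dict) -> pd.DataFrame:
    """Score the same input with the trained model and return predictions."""
    df = inputs["cleaned_transactions"]
    out = df[["outlet_id"]].copy()
    out["predicted_revenue"] = model.predict(df[["transaction_count"]])
    return out
```

Because both functions are pure (data in, data out), the pair can be exercised locally with toy DataFrames before the platform ever runs them.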

SQL transforms run against DuckDB (the default) or Spark SQL (large data). Input data products are available as table references matching their `id` from inputs.yaml, with hyphens replaced by underscores.

```sql
-- logic/transform.sql
-- Inputs available as tables: cleaned_outlets, cleaned_transactions
SELECT
    t.outlet_id,
    t.transaction_date AS sale_date,
    SUM(t.amount) AS total_revenue,
    COUNT(*) AS transaction_count,
    AVG(t.amount) AS avg_basket_size,
    o.territory_code,
    NOW() AS updated_at
FROM cleaned_transactions t
JOIN cleaned_outlets o ON t.outlet_id = o.outlet_id
WHERE t.transaction_date = '{{ partition_key }}'
GROUP BY t.outlet_id, t.transaction_date, o.territory_code
```

`{{ partition_key }}` is injected by the platform at execution time. For non-partitioned products, the full dataset is available without filtering.

logic/transform.py:

```python
import pandas as pd


def transform(inputs: dict[str, pd.DataFrame]) -> pd.DataFrame:
    """
    Args:
        inputs: Dict mapping input id to DataFrame.
            Keys match inputs.yaml ids (hyphens replaced with underscores).

    Returns:
        DataFrame matching the schema declared in output.yaml.
    """
    transactions = inputs["cleaned_transactions"]
    outlets = inputs["cleaned_outlets"]
    merged = transactions.merge(outlets, on="outlet_id")
    result = (
        merged
        .groupby(["outlet_id", "transaction_date", "territory_code"])
        .agg(
            total_revenue=("amount", "sum"),
            transaction_count=("amount", "count"),
            avg_basket_size=("amount", "mean"),
        )
        .reset_index()
        .rename(columns={"transaction_date": "sale_date"})
    )
    result["updated_at"] = pd.Timestamp.now(tz="UTC")
    return result
```

Downstream products reference upstream semantic intents via template functions:

```sql
-- Reference semantic intent tier values in SQL
SELECT customer_id, store_id,
       SUM(net_revenue) AS total_spend
FROM store_performance
WHERE customer_segment IN ({{ intent('customer_classification', 'high_value') }})
GROUP BY 1, 2
```

| Function | Description | Resolves To |
|---|---|---|
| `{{ intent('<name>', '<tier>') }}` | Current tier values as a comma-separated quoted list | `'Engaged'` or `'Engaged', 'Premium'` |
| `{{ intent_column('<name>') }}` | Current column name for the intent | `customer_segment` |

Template resolution happens during codegen (`akili codegen`), not at raw SQL execution time. Unresolvable templates fail the build with a clear error.
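Conceptually, resolution is a substitution pass over the SQL text that fails loudly on unknown names. The sketch below illustrates that behavior only; the registry shape, function names, and regexes are assumptions, not the actual codegen implementation:

```python
import re

# Hypothetical registry snapshot: intent name -> (column, tier -> current values).
INTENTS = {
    "customer_classification": (
        "customer_segment",
        {"high_value": ["Engaged", "Premium"]},
    ),
}


def resolve_intents(sql: str) -> str:
    """Expand {{ intent(...) }} and {{ intent_column(...) }} templates."""
    def expand_intent(m: re.Match) -> str:
        name, tier = m.group(1), m.group(2)
        _column, tiers = INTENTS[name]  # KeyError -> unresolvable template
        return ", ".join(f"'{v}'" for v in tiers[tier])

    sql = re.sub(r"\{\{\s*intent\('([\w-]+)',\s*'([\w-]+)'\)\s*\}\}",
                 expand_intent, sql)
    sql = re.sub(r"\{\{\s*intent_column\('([\w-]+)'\)\s*\}\}",
                 lambda m: INTENTS[m.group(1)][0], sql)
    return sql
```

An unknown intent name raises during the pass, which mirrors the documented build-time failure for unresolvable templates.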

  1. SQL files must be valid SQL syntax (parse-checked by `akili validate`).
  2. Python files must expose a `transform()`, `train()`, or `predict()` function matching the `mode` in compute.yaml.
  3. Python files can import from the `requirements` listed in compute.yaml plus the pre-installed packages.
  4. Logic files must not perform I/O directly. All I/O is handled by the platform.
  5. Logic files receive data and return data. Side effects are forbidden.
  6. Internal decomposition (CTEs, helper functions, modular SQL) is private. No external product can reference internal structures. A product’s public interface is its output schema only.