Custom Rules

For complex business logic that cannot be expressed with built-in types. Write SQL directly and declare an expected result.

```yaml
checks:
  - name: no_duplicate_outlets_per_day
    type: custom_sql
    sql: |
      SELECT COUNT(*) as failures
      FROM {output}
      WHERE (outlet_id, sale_date) IN (
        SELECT outlet_id, sale_date
        FROM {output}
        GROUP BY outlet_id, sale_date
        HAVING COUNT(*) > 1
      )
    severity: error
    expect: "failures = 0"
```

Key points:

  • Use {output} as a placeholder for the materialized table. The platform injects the correct tenant-scoped, partition-scoped table name at execution time.
  • The expect field is a simple assertion of the form "column_name operator value"; the value may be a literal or an arithmetic expression over other result columns (e.g. expected * 0.95).
  • Supported operators: =, <, >, <=, >=, !=.
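The expect grammar is small enough to evaluate in a few lines. A minimal sketch of how such an assertion could be checked against a result row; the evaluate_expect helper is hypothetical, not part of the platform, and handles only literal values (not expressions like expected * 0.95):

```python
import operator

# Operators mirror the documented set: =, <, >, <=, >=, !=.
OPS = {
    "=": operator.eq,
    "<": operator.lt,
    ">": operator.gt,
    "<=": operator.le,
    ">=": operator.ge,
    "!=": operator.ne,
}

def evaluate_expect(expect: str, row: dict) -> bool:
    """Evaluate an assertion like 'failures = 0' against one result row."""
    column, op, value = expect.split(maxsplit=2)
    return OPS[op](float(row[column]), float(value))

# The no_duplicate_outlets_per_day check passes only when failures = 0.
print(evaluate_expect("failures = 0", {"failures": 0}))  # True
print(evaluate_expect("failures = 0", {"failures": 3}))  # False
```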
```yaml
- name: territory_coverage
  type: custom_sql
  sql: |
    SELECT COUNT(DISTINCT territory) as actual,
           (SELECT COUNT(*) FROM territories WHERE active) as expected
    FROM {output}
  expect: "actual >= expected * 0.95"
  severity: warn

- name: no_future_dates
  type: custom_sql
  sql: |
    SELECT COUNT(*) as future_records
    FROM {output}
    WHERE sale_date > CURRENT_DATE
  expect: "future_records = 0"
  severity: error

- name: revenue_sums_match
  type: custom_sql
  sql: |
    SELECT ABS(
      (SELECT SUM(total_revenue) FROM {output}) -
      (SELECT SUM(line_amount) FROM {output})
    ) as drift
  expect: "drift < 0.01"
  severity: error
```
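Because {output} is just a placeholder, a custom SQL check can be smoke-tested locally by substituting a real table name. A sketch using an in-memory SQLite database; the table and sample rows are illustrative only:

```python
import sqlite3

# Tiny stand-in for the materialized output, with one duplicate pair.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (outlet_id TEXT, sale_date TEXT)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("A1", "2024-05-01"), ("A1", "2024-05-01"), ("B2", "2024-05-02")],
)

# The no_duplicate_outlets_per_day SQL, with {output} substituted.
sql = """
SELECT COUNT(*) AS failures
FROM {output}
WHERE (outlet_id, sale_date) IN (
  SELECT outlet_id, sale_date
  FROM {output}
  GROUP BY outlet_id, sale_date
  HAVING COUNT(*) > 1
)
""".replace("{output}", "sales")

failures = conn.execute(sql).fetchone()[0]
print(failures)  # 2 duplicate rows, so expect "failures = 0" would fail
```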

For statistical analysis, ML drift detection, or any other logic that requires Python libraries, use a custom_python check.

```yaml
checks:
  - name: revenue_distribution_stable
    type: custom_python
    entrypoint: checks/revenue_drift.py
    severity: warn
```

The entrypoint file must expose a function with this signature:

```python
# checks/revenue_drift.py
import pandas as pd
from dataclasses import dataclass


@dataclass
class CheckResult:
    passed: bool
    message: str
    metadata: dict


def check(df: pd.DataFrame, context: dict) -> CheckResult:
    """
    Args:
        df: The materialized output as a DataFrame.
        context: Dict with keys:
            - tenant_id: Current tenant
            - product_name: Product being checked
            - partition_key: Current partition
            - previous_df: Last successful materialization (or None)

    Returns:
        CheckResult with passed, message, and metadata.
    """
    if context["previous_df"] is None:
        return CheckResult(
            passed=True,
            message="No previous data to compare",
            metadata={},
        )

    current_mean = df["total_revenue"].mean()
    previous_mean = context["previous_df"]["total_revenue"].mean()
    drift = abs(current_mean - previous_mean) / previous_mean

    return CheckResult(
        passed=drift < 0.3,  # 30% drift threshold
        message=f"Revenue drift: {drift:.1%}",
        metadata={"current_mean": current_mean, "previous_mean": previous_mean},
    )
```

The context["previous_df"] enables drift detection by comparing the current materialization against the previous one.
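The drift computation at the heart of the check can be exercised locally with two small DataFrames before wiring the file into the platform; the revenue figures here are made up for illustration:

```python
import pandas as pd

# Two tiny materializations: the previous and current revenue columns.
previous_df = pd.DataFrame({"total_revenue": [1000.0, 1200.0, 1100.0]})
current_df = pd.DataFrame({"total_revenue": [1500.0, 1600.0, 1700.0]})

# Same drift computation as the check above.
current_mean = current_df["total_revenue"].mean()    # 1600.0
previous_mean = previous_df["total_revenue"].mean()  # 1100.0
drift = abs(current_mean - previous_mean) / previous_mean

print(f"Revenue drift: {drift:.1%}")  # 45.5%
print(drift < 0.3)  # False: the check would fail (severity warn)
```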


A complete quality specification combining built-in and custom checks:

```yaml
apiVersion: akili/v1
kind: Quality
checks:
  # Primary key integrity
  - name: pk_not_null
    type: completeness
    config:
      column: outlet_id
      threshold: 1.0
    severity: error
  - name: pk_unique
    type: uniqueness
    config:
      columns: [outlet_id, sale_date]
    severity: error

  # Freshness
  - name: data_fresh
    type: freshness
    config:
      column: updated_at
      max_age: 6h
    severity: error

  # Value constraints
  - name: revenue_positive
    type: range
    config:
      column: total_revenue
      min: 0
    severity: error
  - name: valid_territory
    type: accepted_values
    config:
      column: territory_code
      values: [NAI, MBS, KSM, NKR, ELD, THK]
    severity: error

  # Referential integrity
  - name: valid_outlet
    type: referential
    config:
      column: outlet_id
      reference_product: cleaned-outlets
      reference_column: outlet_id
    severity: error

  # Volume sanity
  - name: row_count
    type: volume
    config:
      min_rows: 50
      max_rows: 100000
    severity: warn

  # Statistical stability
  - name: avg_revenue_stable
    type: statistical
    config:
      column: total_revenue
      metric: mean
      min: 500
      max: 50000
    severity: warn

  # Business logic
  - name: revenue_matches_count
    type: custom_expression
    config:
      expression: "transaction_count > 0 OR total_revenue = 0"
      threshold: 1.0
    severity: error

  # Complex cross-check
  - name: territory_coverage
    type: custom_sql
    sql: |
      SELECT COUNT(DISTINCT territory_code) as covered
      FROM {output}
    expect: "covered >= 5"
    severity: warn
```
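One way to sanity-check a spec like this before committing it is to verify that every check names a known type and carries the fields that type needs. The required-field mapping below is inferred from the examples on this page, not an official schema:

```python
# Minimal spec linter. REQUIRED is an assumption inferred from the
# examples above, not a documented schema.
REQUIRED = {
    "completeness": {"config"},
    "uniqueness": {"config"},
    "freshness": {"config"},
    "range": {"config"},
    "accepted_values": {"config"},
    "referential": {"config"},
    "volume": {"config"},
    "statistical": {"config"},
    "custom_expression": {"config"},
    "custom_sql": {"sql", "expect"},
    "custom_python": {"entrypoint"},
}

def lint_checks(checks: list[dict]) -> list[str]:
    """Return human-readable problems; an empty list means the spec passes."""
    problems = []
    for check in checks:
        name = check.get("name", "<unnamed>")
        ctype = check.get("type")
        if ctype not in REQUIRED:
            problems.append(f"{name}: unknown type {ctype!r}")
            continue
        missing = REQUIRED[ctype] - check.keys()
        if missing:
            problems.append(f"{name}: missing {sorted(missing)}")
        if check.get("severity") not in ("error", "warn"):
            problems.append(f"{name}: severity must be 'error' or 'warn'")
    return problems

# A custom_sql check without an expect field is flagged.
print(lint_checks([{"name": "bad", "type": "custom_sql",
                    "sql": "...", "severity": "error"}]))
```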