# Ingestion Patterns
Input ports are how data enters a data product. Every data product declares its inputs in inputs.yaml. This guide covers the available ingestion strategies, how to configure connections, and how to monitor ingestion health.
## Input Port Types

Akili uses a unified `inputs.yaml` file that handles two kinds of inputs:
| Kind | Purpose | Used By |
|---|---|---|
| `type: connector` | External data from operational systems, files, APIs, or streams | Source-aligned products |
| `type: data_product` | Internal data from another data product on the mesh | Aggregate and consumer products |
Source-aligned products must have at least one connector input. Aggregate and consumer products consume only from other data products.
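This guide's examples all use `type: connector`. For contrast, a `type: data_product` input on an aggregate product might look like the following sketch. Note that the `product_ref` and `port` field names are assumptions by analogy with the connector examples, not confirmed schema; see the manifest reference for the authoritative fields.

```yaml
inputs:
  - id: upstream-orders
    type: data_product
    # Hypothetical field names -- consult the Writing Manifests reference
    product_ref: fmcg/clean-orders
    port: default
```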
## Connection Configuration

Before referencing external sources, a platform admin registers connections. Connections are tenant-scoped and referenced by name in manifests — developers never handle credentials directly.
### Register a Connection

```sh
akili connections create \
  --name fmcg-erp-db \
  --type postgresql \
  --host erp.fmcg-tenant.internal \
  --port 5432 \
  --database erp_production \
  --credentials-from vault://fmcg/erp-db
```

The API equivalent is `POST /api/v1/connections`. Credentials are stored encrypted and never appear in manifests or logs.
### Connection Health Checks

Each connector type defines a health check query. After registration, verify:

```sh
akili connections test fmcg-erp-db
```

The platform also runs periodic health checks and reports status in the Portal.
## Connector Catalog

The connector registry is a plugin architecture: each connector type is defined by a YAML file in the platform and supports a specific set of ingestion strategies.
| Connector | Category | Supported Strategies |
|---|---|---|
| PostgreSQL | Database | `cdc`, `incremental`, `full_refresh`, `snapshot_diff` |
| MySQL | Database | `cdc`, `incremental`, `full_refresh`, `snapshot_diff` |
| REST API | API | `api_poll`, `incremental` |
| S3/SFTP | File | `file_watch`, `full_refresh` |
| Event Bus | Streaming | `stream`, `event` |
| Oracle | Database | `incremental`, `full_refresh`, `snapshot_diff` (CDC in v2) |
## Ingestion Strategies

### CDC (Change Data Capture)

Best for capturing real-time changes from transactional databases. The platform reads the database’s write-ahead log (WAL) — it never queries the source for “what changed.”
How it works:
- The CDC connector maintains a continuous change stream from the source WAL
- Each source gets a dedicated event stream topic
- New batches of change events trigger a materialization of the source asset
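The batch-triggered flow above can be sketched as a minimal consumer loop. This is illustrative only: the `materialize` hook is a hypothetical stand-in for the platform's asset materialization, and a real CDC connector reads from the WAL-backed event stream rather than an in-memory iterable.

```python
def process_change_batches(change_stream, materialize, batch_size=100):
    """Accumulate change events from the per-source topic and trigger a
    materialization whenever a full batch has arrived."""
    batch = []
    for event in change_stream:      # events from the source's dedicated topic
        batch.append(event)
        if len(batch) >= batch_size:
            materialize(batch)       # platform re-materializes the source asset
            batch = []
    if batch:                        # flush any trailing partial batch
        materialize(batch)
```

The key property is that materialization is driven by arriving change events, not by polling the source.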
```yaml
inputs:
  - id: raw-transactions
    type: connector
    connector_ref: pg-production
    ingestion_strategy: cdc
    ingestion_config:
      primary_key: transaction_id
    timeout: 2h
    fallback: fail
```

Supported sources: PostgreSQL, MySQL (v1). Oracle, SQL Server planned for v2.
### PostgreSQL CDC Setup Walkthrough

1. **Enable logical replication on the source database**

   In `postgresql.conf`:

   ```ini
   wal_level = logical
   max_replication_slots = 4
   max_wal_senders = 4
   ```

2. **Create a replication user**

   ```sql
   CREATE ROLE akili_cdc WITH REPLICATION LOGIN PASSWORD 'secure-password';
   GRANT SELECT ON ALL TABLES IN SCHEMA public TO akili_cdc;
   ```

3. **Register the connection in Akili**

   ```sh
   akili connections create \
     --name pg-production \
     --type postgresql \
     --host db.production.internal \
     --port 5432 \
     --database production \
     --credentials-from vault://prod/pg-cdc
   ```

4. **Configure the input port**

   ```yaml
   inputs:
     - id: raw-orders
       type: connector
       connector_ref: pg-production
       ingestion_strategy: cdc
       ingestion_config:
         primary_key: order_id
   ```

5. **Deploy and verify**

   ```sh
   akili validate my-product/
   akili deploy my-product/
   ```

   The platform creates a replication slot, a dedicated event stream topic, and a platform sensor that triggers on new change events.
### Incremental (Cursor-Based)

Best for sources with a reliable “last modified” timestamp. Each run queries records newer than the previous high-water mark.
How it works:
- Each run executes `SELECT * FROM source WHERE {cursor_field} > {high_water_mark}`
- After successful extraction, the high-water mark advances atomically
- If extraction fails mid-run, the high-water mark stays put — the next run retries from the same point
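The failure semantics above can be sketched in a few lines. This is a sketch under stated assumptions: `extract`, `load`, and the `state` dict are hypothetical stand-ins for the connector and the platform's state store, not real Akili APIs.

```python
def run_incremental(extract, load, state):
    """One incremental run: query rows past the high-water mark, and only
    advance the mark after the load succeeds, so a mid-run failure means
    the next run retries from the same point."""
    hwm = state.get("high_water_mark")
    rows = extract(hwm)        # SELECT ... WHERE cursor_field > high_water_mark
    load(rows)                 # raises on failure -> mark is NOT advanced
    if rows:
        state["high_water_mark"] = max(r["updated_at"] for r in rows)
    return len(rows)
```

The essential design choice is ordering: the high-water mark only moves after the load commits, which makes the run at-least-once rather than lossy.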
```yaml
inputs:
  - id: raw-customers
    type: connector
    connector_ref: crm-database
    ingestion_strategy: incremental
    ingestion_config:
      cursor_field: updated_at
      primary_key: customer_id
    timeout: 2h
    fallback: fail
```

**State tracking:** The platform stores the high-water mark in the state store:
```sql
-- Platform-managed table (you don't create this)
ingestion_state (
  tenant_id,
  product_id,
  port_name,
  cursor_field,
  high_water_mark,
  last_run_at,
  rows_extracted,
  status
)
```

### Full Refresh

Best for small reference tables or sources with no reliable change tracking. Every run extracts the complete dataset and replaces the previous landing.
```yaml
inputs:
  - id: country-codes
    type: connector
    connector_ref: reference-db
    ingestion_strategy: full_refresh
    timeout: 30m
    fallback: fail
```

No cursor or state tracking. The platform records only execution history (when, duration, row count) for observability.
### Snapshot Diff

Best for legacy systems with no WAL access and no reliable timestamps. Each run takes a full extract but compares it against the previous snapshot to detect inserts, updates, and deletes — CDC-like output without touching the WAL.
```yaml
inputs:
  - id: customer-master
    type: connector
    connector_ref: oracle-readonly
    ingestion_strategy: snapshot_diff
    ingestion_config:
      primary_key: [customer_id]
    timeout: 1h
    fallback: fail
```

More expensive than cursor-based (full read every time) but works on any source that supports full reads.
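The diff itself is a keyed comparison of two snapshots. A minimal sketch of the idea (the platform's actual implementation is not shown in this guide):

```python
def snapshot_diff(previous, current, primary_key="customer_id"):
    """Compare two full snapshots keyed by primary key and classify each
    row as an insert, update, or delete."""
    prev = {r[primary_key]: r for r in previous}
    curr = {r[primary_key]: r for r in current}
    inserts = [curr[k] for k in curr.keys() - prev.keys()]
    deletes = [prev[k] for k in prev.keys() - curr.keys()]
    updates = [curr[k] for k in curr.keys() & prev.keys() if curr[k] != prev[k]]
    return inserts, updates, deletes
```

This also shows why the strategy needs a declared `primary_key`: without a stable key there is no way to distinguish an update from a delete plus an insert.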
### Stream

Best for real-time event consumption from message brokers. The platform registers a consumer group and processes events as they arrive.
```yaml
inputs:
  - id: payment-events
    type: connector
    connector_ref: payments-redpanda
    ingestion_strategy: stream
    ingestion_config:
      topic: payments.completed
      consumer_group: analytics-payments  # optional
```

State tracking uses event bus consumer group offsets — no platform-side cursor tracking needed.
### File Watch

Best for batch file drops from legacy systems or partners. The platform watches a directory for new files and tracks processed files to prevent reprocessing.
```yaml
inputs:
  - id: daily-inventory
    type: connector
    connector_ref: legacy-sftp
    ingestion_strategy: file_watch
    ingestion_config:
      resource: /exports/INV_*.DAT
      format: fixed_width
      format_spec:
        columns:
          - name: sku_code
            start: 0
            width: 12
          - name: warehouse
            start: 12
            width: 8
          - name: quantity
            start: 20
            width: 10
            type: integer
```

Supports glob patterns. Files are tracked by path + SHA-256 hash.
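To make the `format_spec` concrete, here is a sketch of how a fixed-width record would be sliced according to the columns above, plus the path-and-hash fingerprinting used for dedup. The function names are illustrative, not platform APIs.

```python
import hashlib

COLUMNS = [  # mirrors the format_spec in the manifest above
    {"name": "sku_code", "start": 0, "width": 12},
    {"name": "warehouse", "start": 12, "width": 8},
    {"name": "quantity", "start": 20, "width": 10, "type": "integer"},
]

def parse_fixed_width(line, columns=COLUMNS):
    """Slice one fixed-width record into named fields per the column spec."""
    row = {}
    for col in columns:
        raw = line[col["start"]:col["start"] + col["width"]].strip()
        row[col["name"]] = int(raw) if col.get("type") == "integer" else raw
    return row

def file_fingerprint(path, content: bytes):
    """Identify a processed file by path plus SHA-256 of its contents,
    so a re-dropped file with identical bytes is not reprocessed."""
    return (path, hashlib.sha256(content).hexdigest())
```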
### API Polling

Best for REST APIs that support pagination. The platform polls on a schedule and handles pagination automatically.
```yaml
inputs:
  - id: weather-data
    type: connector
    connector_ref: weather-api
    ingestion_strategy: api_poll
    ingestion_config:
      url: /api/v2/weather/daily
      method: GET
      headers:
        Accept: application/json
      pagination:
        type: cursor
        cursor_param: next_cursor
```

## Error Handling and Retries

### Fallback Strategies

When an input times out or fails, the `fallback` field determines behavior:
| Fallback | Behavior |
|---|---|
| `fail` (default) | Abort execution. Emit failure event. Enter retry/DLQ flow. |
| `skip` | Remove this input and proceed if remaining required inputs are ready |
| `use_cached` | Substitute last successful materialization and re-evaluate |
### Retry Configuration

Retries are configured in `compute.yaml` (not `inputs.yaml`) since they apply to the entire execution:
```yaml
retry:
  max_attempts: 3
  backoff: exponential  # fixed | exponential
  initial_delay: 30s
```

With exponential backoff, delays double each attempt: 30s, 60s, 120s. After all retries are exhausted, the execution enters the Dead Letter Queue (DLQ).
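The delay schedule implied by that config can be computed directly. A minimal sketch (the function is illustrative, not a platform API):

```python
def retry_delays(max_attempts=3, initial_delay=30, backoff="exponential"):
    """Return the delay in seconds before each attempt: a fixed backoff
    repeats initial_delay; an exponential backoff doubles it each time,
    giving 30s, 60s, 120s for the configuration above."""
    if backoff == "fixed":
        return [initial_delay] * max_attempts
    return [initial_delay * 2 ** i for i in range(max_attempts)]
```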
## Ingestion-Time Fitness Functions

Validate data quality before it enters the transform pipeline:
```yaml
inputs:
  - id: raw-transactions
    type: connector
    connector_ref: pg-production
    ingestion_strategy: incremental
    ingestion_config:
      cursor_field: updated_at
      primary_key: transaction_id
    fitness:
      - type: row_count_min
        threshold: 100
      - type: schema_match
        severity: error  # error = block, warn = log and continue
```

If an `error`-severity fitness function fails, the input is rejected and the fallback logic activates. `warn`-severity failures are logged but do not block execution.
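The block-versus-log semantics can be sketched as follows, assuming (hypothetically) that checks are evaluated over the extracted batch; the evaluator below is an illustration of the severity rules, not the platform's implementation.

```python
def evaluate_fitness(rows, expected_schema, checks):
    """Run ingestion-time checks: a failed 'error'-severity check blocks
    the input; failed 'warn'-severity checks are only collected for logging."""
    blocked, warnings = False, []
    for check in checks:
        if check["type"] == "row_count_min":
            ok = len(rows) >= check["threshold"]
        elif check["type"] == "schema_match":
            ok = all(set(r) == set(expected_schema) for r in rows)
        else:
            continue  # unknown check types ignored in this sketch
        if not ok:
            if check.get("severity", "error") == "error":
                blocked = True
            else:
                warnings.append(check["type"])
    return blocked, warnings
```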
## Multi-Input Correlation

When a product has multiple inputs, the platform correlates their availability before execution:
```
For each input:
  if materialized for current partition -> mark READY
  else                                  -> mark WAITING

if all required inputs READY   -> EXECUTE
else if any input past timeout -> apply fallback
else                           -> WAIT (sensor polls every 30s)
```

This is implemented as a platform sensor. The platform handles all the coordination — you just declare timeouts and fallbacks.
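The correlation decision reduces to one pure function over input state. A sketch of that evaluation (field names like `materialized` and `timeout_at` are assumed for illustration):

```python
def correlate_inputs(inputs, now):
    """Evaluate multi-input readiness: EXECUTE when every required input
    is materialized, apply the fallback once any missing input is past
    its timeout, otherwise keep waiting for the next sensor poll."""
    if all(i["materialized"] for i in inputs):
        return "EXECUTE"
    if any(not i["materialized"] and now > i["timeout_at"] for i in inputs):
        return "FALLBACK"
    return "WAIT"
```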
## Monitoring Ingestion Health

### Key Metrics

| Metric | Source | Alert Threshold |
|---|---|---|
| Freshness | Last materialization timestamp | 2x schedule interval |
| Volume | Row count per extraction | Anomaly detection (>50% deviation) |
| Duration | Extraction wall time | 3x historical p95 |
| Error rate | Failed extractions / total | Any persistent failure |
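The first two thresholds are simple enough to express directly. A sketch of the alert conditions (function names are illustrative, not platform APIs):

```python
def freshness_alert(last_materialized_at, schedule_interval, now):
    """Alert when the last materialization is older than 2x the schedule
    interval (all values in seconds)."""
    return (now - last_materialized_at) > 2 * schedule_interval

def volume_anomaly(row_count, historical_avg, threshold=0.5):
    """Alert when an extraction's row count deviates more than 50% from
    the historical average."""
    return abs(row_count - historical_avg) > threshold * historical_avg
```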
### Checking Status

```sh
# Check ingestion state for a product
akili product status my-product

# View recent extraction history
akili product runs my-product --limit 10
```

The Portal displays ingestion health dashboards with freshness, volume trends, and error history for each product.
## Next Steps

- Writing Manifests — Full reference for all 6 manifest files
- Quality Rules — Post-ingestion quality gates
- End-to-End Tutorial — Complete lifecycle walkthrough