
# Ingestion Patterns

Input ports are how data enters a data product. Every data product declares its inputs in `inputs.yaml`. This guide covers the available ingestion strategies, how to configure connections, and how to monitor ingestion health.

Akili uses a unified `inputs.yaml` file that handles two kinds of inputs:

| Kind | Purpose | Used by |
| --- | --- | --- |
| `type: connector` | External data from operational systems, files, APIs, or streams | Source-aligned products |
| `type: data_product` | Internal data from another data product on the mesh | Aggregate and consumer products |

Source-aligned products must have at least one connector input. Aggregate and consumer products consume only from other data products.
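
The examples in this guide all use `type: connector`. A `data_product` input is declared in the same file; a minimal sketch for orientation — the `product` reference field name below is an assumption, not confirmed by this guide, so check your platform's manifest schema:

```yaml
inputs:
  - id: orders-enriched        # local name for this input port
    type: data_product         # consume another product on the mesh
    product: sales/orders-enriched   # assumed field name for the upstream product reference
    timeout: 2h
    fallback: use_cached
```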


## Registering Connections

Before referencing external sources, a platform admin registers connections. Connections are tenant-scoped and referenced by name in manifests — developers never handle credentials directly.

```sh
akili connections create \
  --name fmcg-erp-db \
  --type postgresql \
  --host erp.fmcg-tenant.internal \
  --port 5432 \
  --database erp_production \
  --credentials-from vault://fmcg/erp-db
```

The API equivalent is `POST /api/v1/connections`. Credentials are stored encrypted and never appear in manifests or logs.

Each connector type defines a health check query. After registration, verify:

```sh
akili connections test fmcg-erp-db
```

The platform also runs periodic health checks and reports status in the Portal.


## The Connector Registry

The connector registry is a plugin architecture: each connector type is defined by a YAML descriptor in the platform and supports a specific set of ingestion strategies.

| Connector | Category | Supported strategies |
| --- | --- | --- |
| PostgreSQL | Database | `cdc`, `incremental`, `full_refresh`, `snapshot_diff` |
| MySQL | Database | `cdc`, `incremental`, `full_refresh`, `snapshot_diff` |
| REST API | API | `api_poll`, `incremental` |
| S3/SFTP | File | `file_watch`, `full_refresh` |
| Event Bus | Streaming | `stream`, `event` |
| Oracle | Database | `incremental`, `full_refresh`, `snapshot_diff` (CDC planned for v2) |

## Change Data Capture (`cdc`)

Best for capturing real-time changes from transactional databases. The platform reads the database’s write-ahead log (WAL) — it never queries the source for “what changed.”

How it works:

  1. The CDC connector maintains a continuous change stream from the source WAL
  2. Each source gets a dedicated event stream topic
  3. New batches of change events trigger a materialization of the source asset

```yaml
inputs:
  - id: raw-transactions
    type: connector
    connector_ref: pg-production
    ingestion_strategy: cdc
    ingestion_config:
      primary_key: transaction_id
    timeout: 2h
    fallback: fail
```
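The batch-apply step in the flow above can be modeled in a few lines of Python. This is an illustrative sketch, not platform code: change events keyed by the configured `primary_key` are applied to the materialized asset in order.

```python
# Minimal model of applying a batch of CDC change events to a
# materialized asset. Illustrative only -- not Akili platform code.

def apply_changes(asset: dict, events: list[dict], primary_key: str) -> dict:
    """Apply insert/update/delete events, keyed by primary_key, in order."""
    for event in events:
        key = event["row"][primary_key]
        if event["op"] in ("insert", "update"):
            asset[key] = event["row"]      # upsert the new row image
        elif event["op"] == "delete":
            asset.pop(key, None)           # remove the row if present
    return asset

asset = {}
batch = [
    {"op": "insert", "row": {"transaction_id": 1, "amount": 100}},
    {"op": "update", "row": {"transaction_id": 1, "amount": 120}},
    {"op": "insert", "row": {"transaction_id": 2, "amount": 50}},
    {"op": "delete", "row": {"transaction_id": 2}},
]
apply_changes(asset, batch, "transaction_id")
# asset now holds one row: transaction_id 1 with amount 120
```

Ordering matters: applying the same events out of order would resurrect the deleted row, which is why each source gets a dedicated, ordered event stream topic.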

Supported sources: PostgreSQL, MySQL (v1). Oracle and SQL Server are planned for v2.

Setting up CDC for PostgreSQL:

  1. Enable logical replication on the source database

     In `postgresql.conf`:

     ```
     wal_level = logical
     max_replication_slots = 4
     max_wal_senders = 4
     ```
  2. Create a replication user

     ```sql
     CREATE ROLE akili_cdc WITH REPLICATION LOGIN PASSWORD 'secure-password';
     GRANT SELECT ON ALL TABLES IN SCHEMA public TO akili_cdc;
     ```
  3. Register the connection in Akili

     ```sh
     akili connections create \
       --name pg-production \
       --type postgresql \
       --host db.production.internal \
       --port 5432 \
       --database production \
       --credentials-from vault://prod/pg-cdc
     ```
  4. Configure the input port

     ```yaml
     inputs:
       - id: raw-orders
         type: connector
         connector_ref: pg-production
         ingestion_strategy: cdc
         ingestion_config:
           primary_key: order_id
     ```
  5. Deploy and verify

     ```sh
     akili validate my-product/
     akili deploy my-product/
     ```

    The platform creates a replication slot, a dedicated event stream topic, and a platform sensor that triggers on new change events.

## Incremental (`incremental`)

Best for sources with a reliable “last modified” timestamp. Each run queries records newer than the previous high-water mark.

How it works:

  1. Each run executes `SELECT * FROM source WHERE {cursor_field} > {high_water_mark}`
  2. After successful extraction, the high-water mark advances atomically
  3. If extraction fails mid-run, the high-water mark stays put — the next run retries from the same point

```yaml
inputs:
  - id: raw-customers
    type: connector
    connector_ref: crm-database
    ingestion_strategy: incremental
    ingestion_config:
      cursor_field: updated_at
      primary_key: customer_id
    timeout: 2h
    fallback: fail
```

State tracking: The platform stores the high-water mark in the state store:

```sql
-- Platform-managed table (you don't create this)
ingestion_state (
  tenant_id, product_id, port_name,
  cursor_field, high_water_mark,
  last_run_at, rows_extracted, status
)
```
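The cursor-advance behavior can be modeled in a short Python sketch — illustrative only, since the real state lives in the platform's state store. The key property: the high-water mark only moves after a successful extraction, so a failed run retries the same window.

```python
# Model of high-water-mark tracking for incremental ingestion.
# Illustrative only -- real state is managed by the platform.

def run_incremental(rows: list[dict], state: dict, cursor_field: str,
                    extract_ok: bool = True) -> list[dict]:
    """Extract rows newer than the stored high-water mark."""
    hwm = state.get("high_water_mark")
    batch = [r for r in rows if hwm is None or r[cursor_field] > hwm]
    if not extract_ok:
        return []                      # failure: high-water mark stays put
    if batch:
        # Advance only after the whole batch succeeds (atomic in practice)
        state["high_water_mark"] = max(r[cursor_field] for r in batch)
    return batch

rows = [{"customer_id": i, "updated_at": i} for i in range(1, 6)]
state = {}
first = run_incremental(rows, state, "updated_at")         # all 5 rows
rows.append({"customer_id": 6, "updated_at": 6})
failed = run_incremental(rows, state, "updated_at", extract_ok=False)
retry = run_incremental(rows, state, "updated_at")         # just the new row
```

After the failed run, `state["high_water_mark"]` is unchanged, so the retry picks up exactly the rows the failed run attempted.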

## Full Refresh (`full_refresh`)

Best for small reference tables or sources with no reliable change tracking. Every run extracts the complete dataset and replaces the previous landing.

```yaml
inputs:
  - id: country-codes
    type: connector
    connector_ref: reference-db
    ingestion_strategy: full_refresh
    timeout: 30m
    fallback: fail
```

No cursor or state tracking. The platform records only execution history (when, duration, row count) for observability.

## Snapshot Diff (`snapshot_diff`)

Best for legacy systems with no WAL access and no reliable timestamps. Takes a full refresh but compares against the previous snapshot to detect inserts, updates, and deletes — CDC-like behavior without requiring WAL access.

```yaml
inputs:
  - id: customer-master
    type: connector
    connector_ref: oracle-readonly
    ingestion_strategy: snapshot_diff
    ingestion_config:
      primary_key: [customer_id]
    timeout: 1h
    fallback: fail
```

More expensive than cursor-based strategies (every run reads the full dataset), but it works on any source that supports full reads.
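The diffing idea is straightforward to sketch. An illustrative Python model, not platform code: both snapshots are indexed by primary key, and comparing them yields CDC-style change events.

```python
# Model of snapshot_diff: compare the current full extract against the
# previous snapshot to derive CDC-like change events. Illustrative only.

def snapshot_diff(previous: dict, current: dict) -> list[dict]:
    """Both snapshots map primary key -> full row."""
    changes = []
    for key, row in current.items():
        if key not in previous:
            changes.append({"op": "insert", "key": key, "row": row})
        elif previous[key] != row:
            changes.append({"op": "update", "key": key, "row": row})
    for key in previous.keys() - current.keys():
        changes.append({"op": "delete", "key": key})    # vanished rows
    return changes

prev = {1: {"name": "Acme"}, 2: {"name": "Globex"}}
curr = {1: {"name": "Acme Corp"}, 3: {"name": "Initech"}}
ops = {(c["op"], c["key"]) for c in snapshot_diff(prev, curr)}
# ops == {("update", 1), ("insert", 3), ("delete", 2)}
```

Note that delete detection is what a plain full refresh cannot give you incrementally — it falls out of the key-set comparison.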

## Streaming (`stream`)

Best for real-time event consumption from message brokers. The platform registers a consumer group and processes events as they arrive.

```yaml
inputs:
  - id: payment-events
    type: connector
    connector_ref: payments-redpanda
    ingestion_strategy: stream
    ingestion_config:
      topic: payments.completed
      consumer_group: analytics-payments  # optional
```

State tracking uses event bus consumer group offsets — no platform-side cursor tracking needed.
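Offset semantics can be sketched with a toy model — this is not the platform's broker client, just an illustration of why no platform-side cursor is needed: the committed offset records how far processing got, so a restarted consumer resumes without replaying acknowledged events.

```python
# Toy model of consumer-group offset tracking. Illustrative only --
# real offsets are managed by the event bus itself.

class ConsumerGroup:
    def __init__(self) -> None:
        self.committed = 0           # next offset to read

    def poll(self, log: list, max_events: int = 10) -> list:
        """Read events starting at the committed offset."""
        return log[self.committed:self.committed + max_events]

    def commit(self, n: int) -> None:
        """Acknowledge n processed events; the resume point advances."""
        self.committed += n

log = ["evt-%d" % i for i in range(5)]
group = ConsumerGroup()
batch = group.poll(log)              # reads all 5 events
group.commit(len(batch))
log.append("evt-5")
later = group.poll(log)              # only the new event is read
```

Committing after processing (not before) gives at-least-once delivery: a crash between `poll` and `commit` replays the batch rather than losing it.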

## File Watch (`file_watch`)

Best for batch file drops from legacy systems or partners. The platform watches a directory for new files and tracks processed files to prevent reprocessing.

```yaml
inputs:
  - id: daily-inventory
    type: connector
    connector_ref: legacy-sftp
    ingestion_strategy: file_watch
    ingestion_config:
      resource: /exports/INV_*.DAT
      format: fixed_width
      format_spec:
        columns:
          - name: sku_code
            start: 0
            width: 12
          - name: warehouse
            start: 12
            width: 8
          - name: quantity
            start: 20
            width: 10
            type: integer
```

Supports glob patterns. Files are tracked by path + SHA-256 hash.
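The `format_spec` above reads mechanically: each column is a slice of the line starting at `start` for `width` characters. A sketch of that parsing logic — assuming (not confirmed by this guide) that the platform strips padding and casts `type: integer` fields:

```python
# Sketch of fixed-width parsing driven by a format_spec like the one
# above. Assumes whitespace padding and integer casting; illustrative only.

COLUMNS = [
    {"name": "sku_code",  "start": 0,  "width": 12},
    {"name": "warehouse", "start": 12, "width": 8},
    {"name": "quantity",  "start": 20, "width": 10, "type": "integer"},
]

def parse_line(line: str, columns: list[dict]) -> dict:
    record = {}
    for col in columns:
        raw = line[col["start"]:col["start"] + col["width"]].strip()
        record[col["name"]] = int(raw) if col.get("type") == "integer" else raw
    return record

# 12-char sku, 8-char warehouse, 10-char right-aligned quantity
line = "SKU-0001    WH-NBO        1500"
parse_line(line, COLUMNS)
# {'sku_code': 'SKU-0001', 'warehouse': 'WH-NBO', 'quantity': 1500}
```

The sample line is hypothetical; the column offsets are the ones declared in the manifest above.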

## API Polling (`api_poll`)

Best for REST APIs that support pagination. The platform polls on schedule and handles pagination automatically.

```yaml
inputs:
  - id: weather-data
    type: connector
    connector_ref: weather-api
    ingestion_strategy: api_poll
    ingestion_config:
      url: /api/v2/weather/daily
      method: GET
      headers:
        Accept: application/json
      pagination:
        type: cursor
        cursor_param: next_cursor
```
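Cursor pagination as declared above works roughly like this — a sketch against a fake in-memory API, since the real polling happens inside the platform: each response carries a cursor, which is passed back as the configured `cursor_param` until the source stops returning one.

```python
# Sketch of cursor-based pagination. fetch_page stands in for the real
# HTTP call; the loop mirrors what an api_poll run would do.

def fetch_page(params: dict) -> dict:
    """Fake API: 7 records served 3 at a time, with a next_cursor."""
    data = list(range(7))
    start = int(params.get("next_cursor", 0))
    nxt = start + 3 if start + 3 < len(data) else None
    return {"records": data[start:start + 3], "next_cursor": nxt}

def poll_all(cursor_param: str = "next_cursor") -> list:
    records, params = [], {}
    while True:
        resp = fetch_page(params)
        records.extend(resp["records"])
        if resp["next_cursor"] is None:
            break                                # last page reached
        params = {cursor_param: resp["next_cursor"]}
    return records

poll_all()   # [0, 1, 2, 3, 4, 5, 6]
```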

## Fallbacks and Retries

When an input times out or fails, the fallback field determines behavior:

| Fallback | Behavior |
| --- | --- |
| `fail` (default) | Abort execution, emit a failure event, and enter the retry/DLQ flow. |
| `skip` | Remove this input and proceed if the remaining required inputs are ready. |
| `use_cached` | Substitute the last successful materialization and re-evaluate. |

Retries are configured in `compute.yaml` (not `inputs.yaml`) since they apply to the entire execution:

```yaml
# compute.yaml
retry:
  max_attempts: 3
  backoff: exponential  # fixed | exponential
  initial_delay: 30s
```

With exponential backoff, delays double each attempt: 30s, 60s, 120s. After all retries are exhausted, the execution enters the Dead Letter Queue (DLQ).
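The delay sequence is easy to compute. A sketch, assuming (as the 30s/60s/120s sequence in the text implies) that one delay precedes each of the `max_attempts` retries — this is an illustrative helper, not a platform API:

```python
# Compute retry delays (in seconds) for a retry config like the one above.
# Illustrative only -- the platform evaluates this internally.

def retry_delays(max_attempts: int, backoff: str, initial_delay: int) -> list[int]:
    """One delay before each retry; exponential doubles each time."""
    if backoff == "fixed":
        return [initial_delay] * max_attempts
    return [initial_delay * 2 ** i for i in range(max_attempts)]

retry_delays(3, "exponential", 30)   # [30, 60, 120]
retry_delays(3, "fixed", 30)         # [30, 30, 30]
```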

## Fitness Functions

Validate data quality before it enters the transform pipeline:

```yaml
inputs:
  - id: raw-transactions
    type: connector
    connector_ref: pg-production
    ingestion_strategy: incremental
    ingestion_config:
      cursor_field: updated_at
      primary_key: transaction_id
    fitness:
      - type: row_count_min
        threshold: 100
      - type: schema_match
        severity: error  # error = block, warn = log and continue
```

If an `error`-severity fitness function fails, the input is rejected and the fallback logic activates; `warn`-severity failures are logged but do not block execution.
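The severity semantics can be sketched as a small evaluation loop — illustrative only, with deliberately simplified check logic standing in for the platform's real validators:

```python
# Sketch of fitness-function evaluation: error-severity failures block
# the input; warn-severity failures only log. Simplified check logic.

def evaluate_fitness(batch: list[dict], expected_schema: set,
                     checks: list[dict]) -> tuple[bool, list[str]]:
    accepted, warnings = True, []
    for check in checks:
        if check["type"] == "row_count_min":
            ok = len(batch) >= check["threshold"]
        elif check["type"] == "schema_match":
            ok = all(set(row) == expected_schema for row in batch)
        else:
            continue                               # unknown check type
        if not ok:
            if check.get("severity", "error") == "warn":
                warnings.append(check["type"])     # log and continue
            else:
                accepted = False                   # block: trigger fallback
    return accepted, warnings

batch = [{"transaction_id": 1, "amount": 10}] * 50
checks = [
    {"type": "row_count_min", "threshold": 100, "severity": "warn"},
    {"type": "schema_match", "severity": "error"},
]
accepted, warnings = evaluate_fitness(batch, {"transaction_id", "amount"}, checks)
# accepted is True (schema matches); warnings == ["row_count_min"]
```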


## Input Correlation

When a product has multiple inputs, the platform correlates their availability before execution:

```
For each input:
  if materialized for current partition -> mark READY
  else -> mark WAITING

if all required inputs READY -> EXECUTE
else if any input past timeout -> apply fallback
else -> WAIT (sensor polls every 30s)
```

This is implemented as a platform sensor. The platform handles all the coordination — you just declare timeouts and fallbacks.
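The decision the sensor makes on each poll can be written out directly — an illustrative Python model of the pseudocode above, not the sensor's actual implementation:

```python
# Model of the input-correlation decision made by the platform sensor
# on each 30s poll. Illustrative only.

def correlate(inputs: list[dict]) -> str:
    """Each input: {materialized: bool, past_timeout: bool, fallback: str}."""
    if all(i["materialized"] for i in inputs):
        return "EXECUTE"
    for i in inputs:
        if not i["materialized"] and i["past_timeout"]:
            return "FALLBACK:" + i["fallback"]  # apply that input's fallback
    return "WAIT"                               # sensor polls again in 30s

ready = {"materialized": True, "past_timeout": False, "fallback": "fail"}
late = {"materialized": False, "past_timeout": True, "fallback": "use_cached"}
pending = {"materialized": False, "past_timeout": False, "fallback": "fail"}

correlate([ready, ready])    # "EXECUTE"
correlate([ready, pending])  # "WAIT"
correlate([ready, late])     # "FALLBACK:use_cached"
```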


## Monitoring Ingestion Health

| Metric | Source | Alert threshold |
| --- | --- | --- |
| Freshness | Last materialization timestamp | 2x the schedule interval |
| Volume | Row count per extraction | Anomaly detection (>50% deviation) |
| Duration | Extraction wall time | 3x the historical p95 |
| Error rate | Failed extractions / total | Any persistent failure |

```sh
# Check ingestion state for a product
akili product status my-product

# View recent extraction history
akili product runs my-product --limit 10
```

The Portal displays ingestion health dashboards with freshness, volume trends, and error history for each product.