Streaming

Streams are durable, append-only tables that serve as the ingestion layer for event data. They provide HTTP webhook endpoints, a programmatic publish/subscribe API, and cursor-based batch consumption – all backed by your database.

Defining a Stream

Use the @stream decorator to declare a stream. The decorated function itself is never executed – it serves as a registration point for stream metadata.

from interlace import stream

@stream(
    name="user_events",
    schema="events",
    connection="default",
    cursor="rowid",
    fields={"user_id": "string", "action": "string", "timestamp": "timestamp"},
    description="User interaction events",
    tags=["ingestion", "clickstream"],
    owner="data-team",
)
def user_events():
    pass

Decorator Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str \| None | function name | Stream name (defaults to the function name) |
| schema | str | "events" | Database schema for the stream table |
| connection | str \| None | project default | Named connection to use |
| cursor | str | "rowid" | Cursor column for incremental processing |
| fields | dict \| None | None | Schema definition (e.g. {"id": "int64", "data": "string"}) |
| description | str \| None | None | Human-readable description |
| tags | list[str] | [] | Tags for categorisation and filtering |
| owner | str \| None | None | Owner identifier |
| auth | dict \| None | None | Per-stream authentication config |
| rate_limit | dict \| None | None | Rate limiting config |
| validate_schema | bool | False | Validate incoming events against fields |
| retention | dict \| None | None | Data retention policy |

Publishing Events

The publish function appends events to a stream table. It works both as a standalone call and inside the Interlace service (where it also triggers downstream models automatically).

Async

from interlace import publish

# Single event
result = await publish("user_events", {"user_id": "u_123", "action": "click"})

# Batch of events
result = await publish("user_events", [
    {"user_id": "u_123", "action": "click"},
    {"user_id": "u_124", "action": "signup"},
])

# Using the decorated function reference instead of a string name
result = await publish(user_events, {"user_id": "u_123", "action": "click"})

Sync

from interlace import publish_sync

result = publish_sync("user_events", {"user_id": "u_123", "action": "click"})

Publish Result

Both functions return a dict describing the outcome:

{
    "status": "accepted",
    "stream": "user_events",
    "rows_received": 2,
    "publish_id": "a1b2c3d4-...",
    "triggered_models": ["user_sessions", "daily_active_users"]
}

Each published row automatically receives _interlace_published_at and _interlace_publish_id metadata columns.

Consuming Events

Interlace provides two consumption patterns: real-time in-process subscription and cursor-based batch consumption.

Real-time Subscription

subscribe returns an async iterator that yields events as they are published within the same process:

from interlace import subscribe

# One event at a time
async for event in subscribe("user_events"):
    print(f"New event: {event}")

# Batched (yield every 10 events or on timeout)
async for batch in subscribe("user_events", batch_size=10, timeout=5.0):
    process_batch(batch)

# With filtering
async for event in subscribe("user_events", filter_fn=lambda e: e.get("action") == "signup"):
    handle_signup(event)

Subscribe parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| batch_size | int | 1 | Events to buffer before yielding (1 = one at a time) |
| timeout | float \| None | None | Seconds to wait between events before yielding a partial batch |
| filter_fn | Callable[[dict], bool] \| None | None | Only yield events where the function returns True |

Batch Consumption (Cursor-based)

consume and ack provide reliable, at-least-once processing backed by database cursors. Use this for batch processing, multi-process consumers, or distributed workloads.

from interlace import consume, ack

# Pull a batch of unprocessed events
events = await consume("user_events", "analytics_worker", batch_size=100)

for event in events:
    process(event)

# Acknowledge processing to advance the cursor
await ack("user_events", "analytics_worker", events)

Each consumer tracks its own cursor position, so multiple consumers can process the same stream independently at different rates.
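For example, two consumers with different names can read the same stream without interfering with each other. A minimal sketch using only the calls shown above; the consumer names are illustrative:

from interlace import consume, ack

# A fast consumer that forwards events somewhere else
forward_batch = await consume("user_events", "forwarder", batch_size=500)
forward(forward_batch)
await ack("user_events", "forwarder", forward_batch)

# A slower consumer that aggregates in smaller batches; its cursor
# lags behind the forwarder's without any conflict between the two
analytics_batch = await consume("user_events", "aggregator", batch_size=50)
process_batch(analytics_batch)
await ack("user_events", "aggregator", analytics_batch)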

Consume parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| stream_name | str | required | Name of the stream to consume from |
| consumer_name | str | required | Unique consumer identifier (tracks cursor) |
| batch_size | int | 100 | Maximum number of events to return |
| connection | str \| None | None | Override connection name |

HTTP Endpoints

When the Interlace service is running (interlace serve), streams are automatically exposed via REST endpoints.

Publish

# Single event
curl -X POST http://localhost:8080/api/v1/streams/user_events \
  -H "Content-Type: application/json" \
  -d '{"user_id": "u_123", "action": "click"}'

# Batch
curl -X POST http://localhost:8080/api/v1/streams/user_events \
  -H "Content-Type: application/json" \
  -d '[{"user_id": "u_123", "action": "click"}, {"user_id": "u_124", "action": "signup"}]'

Returns 202 Accepted with the publish result.

Subscribe (Server-Sent Events)

curl -N http://localhost:8080/api/v1/streams/user_events/subscribe

Returns a persistent SSE connection. Events are delivered as they are published:

event: stream.event
data: {"stream": "user_events", "event": {"user_id": "u_123", "action": "click"}}

Consume Batch

curl -X POST http://localhost:8080/api/v1/streams/user_events/consume \
  -H "Content-Type: application/json" \
  -d '{"consumer": "analytics_worker", "batch_size": 100}'

Returns:

{
  "stream": "user_events",
  "consumer": "analytics_worker",
  "events": [{"user_id": "u_123", "action": "click", "_interlace_rowid": 1}],
  "count": 1
}

Acknowledge

curl -X POST http://localhost:8080/api/v1/streams/user_events/ack \
  -H "Content-Type: application/json" \
  -d '{"consumer": "analytics_worker", "events": [{"_interlace_rowid": 1}]}'

Endpoint Summary

| Method | Path | Description |
| --- | --- | --- |
| GET | /api/v1/streams | List all registered streams |
| GET | /api/v1/streams/{name} | Get stream details and row count |
| POST | /api/v1/streams/{name} | Publish event(s) |
| GET | /api/v1/streams/{name}/subscribe | Subscribe via SSE |
| POST | /api/v1/streams/{name}/consume | Consume a batch of events |
| POST | /api/v1/streams/{name}/ack | Acknowledge processed events |
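
The two read-only endpoints can be exercised the same way. A minimal sketch with the requests library (an assumption), against the same local service used in the examples above:

import requests

base = "http://localhost:8080/api/v1"

# List every registered stream
print(requests.get(f"{base}/streams").json())

# Fetch details and row count for a single stream
print(requests.get(f"{base}/streams/user_events").json())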

Streams as Model Dependencies

Stream tables are regular database tables, so downstream @model functions can reference them as dependencies. When events are published via the service, Interlace automatically triggers any models that depend on the stream.

from interlace import model, stream

@stream(name="orders", schema="events", fields={"order_id": "string", "total": "float64"})
def orders():
    pass

@model(
    name="daily_revenue",
    materialise="table",
    dependencies=["orders"],
)
def daily_revenue(orders):
    return (
        orders.mutate(order_date=orders._interlace_published_at.cast("date"))
        .group_by("order_date")
        .aggregate(revenue=orders.total.sum(), order_count=orders.order_id.count())
    )

When you POST to /api/v1/streams/orders, the daily_revenue model is automatically enqueued for re-execution.
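
For example, publishing an order over HTTP should report the triggered model in the response. A minimal sketch with the requests library (an assumption), relying only on the publish result shape shown earlier:

import requests

response = requests.post(
    "http://localhost:8080/api/v1/streams/orders",
    json={"order_id": "ord_1", "total": 42.5},
)
print(response.status_code)                     # 202
print(response.json().get("triggered_models"))  # e.g. ["daily_revenue"]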

Authentication

Configure per-stream authentication to protect HTTP endpoints:

# Bearer token authentication
@stream(
    name="webhooks",
    auth={"type": "bearer", "token": "sk_live_abc123"},
)
def webhooks():
    pass

# API key authentication
@stream(
    name="partner_events",
    auth={"type": "api_key", "header": "X-API-Key", "key": "partner_key_abc"},
)
def partner_events():
    pass

Requests without valid credentials receive a 401 Unauthorized response. Authentication is enforced only on the HTTP endpoints – the programmatic publish() API bypasses it.
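
Clients then present the configured credential on every request. A minimal sketch with the requests library, assuming the bearer type expects a standard Authorization: Bearer header; the API key header name comes straight from the decorator config above, and the request bodies are illustrative:

import requests

# Bearer token stream
requests.post(
    "http://localhost:8080/api/v1/streams/webhooks",
    headers={"Authorization": "Bearer sk_live_abc123"},
    json={"event": "invoice.paid"},
)

# API key stream
requests.post(
    "http://localhost:8080/api/v1/streams/partner_events",
    headers={"X-API-Key": "partner_key_abc"},
    json={"partner": "acme", "action": "sync"},
)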

Rate Limiting

Protect streams from excessive traffic with per-stream rate limits:

@stream(
    name="high_volume",
    rate_limit={"requests_per_second": 100},
)
def high_volume():
    pass

Requests exceeding the limit receive a 429 Too Many Requests response with a retry_after hint.
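
Clients can honour the hint by backing off before retrying. A minimal sketch with the requests library, assuming the retry_after hint is readable from the JSON response body (it falls back to a one-second delay if not):

import time
import requests

def publish_with_backoff(payload, retries=3):
    """POST to the stream, sleeping for the hinted delay on 429 responses."""
    url = "http://localhost:8080/api/v1/streams/high_volume"
    response = None
    for _ in range(retries):
        response = requests.post(url, json=payload)
        if response.status_code != 429:
            return response
        try:
            delay = float(response.json().get("retry_after", 1))
        except ValueError:
            delay = 1.0
        time.sleep(delay)
    return response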

Schema Validation

When validate_schema=True and fields are defined, incoming events are validated before insertion. Events missing required fields are rejected:

@stream(
    name="typed_events",
    fields={"user_id": "string", "amount": "float64", "currency": "string"},
    validate_schema=True,
)
def typed_events():
    pass

# This succeeds
await publish("typed_events", {"user_id": "u_1", "amount": 9.99, "currency": "GBP"})

# This raises ValueError -- missing "currency"
await publish("typed_events", {"user_id": "u_1", "amount": 9.99})

When validate_schema=False (the default), events are accepted regardless of shape. If fields are defined but validation is off, the fields are still used to create the initial table schema.

Retention

Configure automatic data retention policies:

@stream(
    name="ephemeral_events",
    retention={"max_age_days": 30},
)
def ephemeral_events():
    pass

@stream(
    name="bounded_events",
    retention={"max_rows": 1_000_000},
)
def bounded_events():
    pass

Tips

  • Streams create append-only tables. Data is never updated in place – each publish adds new rows.
  • Use fields for predictable schemas. Without fields, the table schema is inferred from the first batch of events, which can lead to type mismatches on later batches.
  • Choose the right consumption pattern. Use subscribe() for real-time, in-process reactions. Use consume()/ack() for reliable batch processing across restarts.
  • Name consumers uniquely. Each consumer_name in consume() tracks its own cursor, so two workers with the same name will compete for events rather than process independently.
  • Stream tables work with incremental models. Use the _interlace_published_at column or rowid cursor in downstream models for efficient incremental reads (see the sketch below).
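
A minimal sketch of that last tip, written in the same expression style as the daily_revenue example. The cutoff is hardcoded purely for illustration; how you track the high-water mark (a state table, a model parameter, or the rowid cursor) depends on your incremental setup:

from datetime import datetime, timezone

from interlace import model

@model(name="recent_orders", materialise="table", dependencies=["orders"])
def recent_orders(orders):
    # Only keep rows published after the last processed point;
    # the fixed cutoff below stands in for a real high-water mark.
    cutoff = datetime(2025, 1, 1, tzinfo=timezone.utc)
    return orders.filter(orders._interlace_published_at > cutoff)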