Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Python Client

The official Python client for Durable Streams protocol with both sync and async support.

Installation

pip install durable-streams
Requires Python 3.10+

Quick Start

Synchronous API

from durable_streams import stream, DurableStream

# Read-only streaming
with stream("https://streams.example.com/my-stream") as res:
    for item in res.iter_json():
        print(item)

# Read/write handle
handle = DurableStream.create(
    "https://streams.example.com/my-stream",
    content_type="application/json",
)

handle.append({"message": "hello"})

Asynchronous API

from durable_streams import astream, AsyncDurableStream
import asyncio

async def main():
    # Async streaming
    async with astream("https://streams.example.com/my-stream") as res:
        async for item in res.iter_json():
            print(item)

    # Async handle
    handle = await AsyncDurableStream.create(
        "https://streams.example.com/my-stream",
        content_type="application/json",
    )
    await handle.append({"message": "hello"})

asyncio.run(main())

Core APIs

stream()

Synchronous read API for consuming streams.
url
str
required
The full URL to the durable stream
offset
Offset | None
default:"-1"
Starting offset ("-1" for start of stream)
live
LiveMode
default:"True"
Live mode: False, True, "long-poll", or "sse"
headers
HeadersLike | None
HTTP headers (values can be strings or callables)
params
ParamsLike | None
Query parameters (values can be strings or callables)
client
httpx.Client | None
Optional httpx.Client for connection reuse
StreamResponse
object
Context manager for consuming stream data
iter_bytes()
Iterator[bytes]
Iterate over raw byte chunks
iter_json()
Iterator[Any]
Iterate over individual JSON items
iter_text()
Iterator[str]
Iterate over text chunks
read_all()
bytes
Accumulate all bytes until upToDate
json_all()
list[Any]
Accumulate all JSON items into list
offset
Offset
Current stream offset (advances with each chunk)
up_to_date
bool
Whether we’ve reached the current end of stream
Example:
from durable_streams import stream

with stream("https://streams.example.com/events", live=True) as res:
    for event in res.iter_json():
        print(f"Event: {event}")
        print(f"Offset: {res.offset}")
        # Save checkpoint
        db.save_offset(res.offset)

DurableStream

Synchronous handle class for read/write operations.

Static Methods

DurableStream.create()
classmethod
Create a new stream and return a handle
url
str
required
Stream URL
content_type
str | None
Content type (e.g., "application/json")
ttl_seconds
int | None
Time-to-live in seconds
expires_at
str | None
Absolute expiry time (RFC3339)
closed
bool
default:"False"
Create stream in closed state
client
httpx.Client | None
Optional httpx.Client
Returns: DurableStream
DurableStream.connect()
classmethod
Connect to an existing stream (validates via HEAD)
url
str
required
Stream URL
client
httpx.Client | None
Optional httpx.Client
Returns: DurableStream

Instance Methods

append()
method
Append data to the stream (with automatic batching)
data
bytes | str | Any
required
Data to append. For JSON streams, pass dict/list (auto-serialized) or pre-serialized JSON string
seq
str | None
Writer coordination sequence
content_type
str | None
Content type override
Returns: AppendResult with next_offset
Example:
handle = DurableStream("https://streams.example.com/events")

# JSON stream - pass dict (auto-serialized)
result = handle.append({"message": "hello"})
print(f"Next offset: {result.next_offset}")

# Or pre-serialize
result = handle.append('{"message":"hello"}')

# Byte stream
result = handle.append(b"raw bytes")
stream()
method
Start a read session
offset
Offset | None
Starting offset
live
LiveMode
default:"True"
Live mode
Returns: StreamResponse (context manager)
close_stream()
method
Close the stream permanently (no more appends)
data
bytes | str | Any | None
Optional final message
Returns: CloseResult with final_offset
head()
method
Get stream metadata via HEAD request. Returns: HeadResult with content_type, offset, stream_closed
delete()
method
Delete the stream

AsyncDurableStream

Asynchronous handle class (same API as DurableStream, but async).
import asyncio
from durable_streams import AsyncDurableStream

async def main():
    handle = await AsyncDurableStream.create(
        "https://streams.example.com/events",
        content_type="application/json",
    )

    await handle.append({"message": "hello"})

    async with handle.stream() as res:
        async for event in res.iter_json():
            print(event)

asyncio.run(main())

IdempotentProducer

Exactly-once writes with automatic batching and pipelining.
from durable_streams import IdempotentProducer
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        producer = IdempotentProducer(
            url="https://streams.example.com/orders",
            producer_id="order-service-1",
            client=client,
            epoch=0,
            auto_claim=True,
            max_batch_bytes=1024 * 1024,
            linger_ms=5,
        )

        # Fire-and-forget (returns immediately)
        producer.append(b'{"orderId":"123"}')
        producer.append(b'{"orderId":"456"}')

        # Ensure delivery
        await producer.flush()
        await producer.close()
__init__()
method
Create an idempotent producer
url
str
required
Stream URL
producer_id
str
required
Stable identifier (e.g., “order-service-1”)
client
httpx.AsyncClient | None
HTTP client for connection reuse
epoch
int
default:"0"
Starting epoch (increment on restart)
auto_claim
bool
default:"False"
Auto-retry with epoch+1 on 403
max_batch_bytes
int
default:"1048576"
Max batch size (1MB)
linger_ms
int
default:"5"
Max wait before sending batch
max_in_flight
int
default:"5"
Max concurrent batches
on_error
Callable[[Exception], None] | None
Error callback
append()
method
Fire-and-forget append
body
bytes | str
required
Data to append (for JSON, pass pre-serialized JSON)
flush()
async method
Send the pending batch and wait for all in-flight batches. Returns: None
close_stream()
async method
Close the stream with producer headers (idempotent)
data
str | bytes | None
Optional final message
Returns: IdempotentAppendResult

Types

Offset

Offset = str  # e.g., "-1" for start, "123_456" for position

LiveMode

LiveMode = Literal[False, True, "long-poll", "sse"]
# False: catch-up only
# True: auto-select mode
# "long-poll": explicit long-polling
# "sse": explicit Server-Sent Events

HeadersLike / ParamsLike

HeadersLike = dict[str, str | Callable[[], str]]
ParamsLike = dict[str, str | Callable[[], str]]

Error Handling

from durable_streams import (
    DurableStreamError,
    StreamNotFoundError,
    StreamClosedError,
    SeqConflictError,
)

try:
    handle.append(data)
except StreamClosedError as e:
    print(f"Stream closed: {e.url}")
except SeqConflictError:
    print("Sequence conflict")
except DurableStreamError as e:
    print(f"Error {e.code}: {e}")

Exception Hierarchy

  • DurableStreamError - Base exception
    • StreamNotFoundError - 404
    • StreamExistsError - 409 on create
    • StreamClosedError - 409 with Stream-Closed header
    • SeqConflictError - 409 sequence conflict
    • RetentionGoneError - 410 offset expired
    • SSENotSupportedError - Server doesn’t support SSE

Advanced Features

Connection Pooling

import httpx
from durable_streams import DurableStream

with httpx.Client(timeout=30.0) as client:
    stream1 = DurableStream("https://.../stream1", client=client)
    stream2 = DurableStream("https://.../stream2", client=client)
    # Reuses connections

Dynamic Headers

from durable_streams import stream

def get_token():
    return "Bearer " + fetch_current_token()

with stream(
    "https://...",
    headers={
        "Authorization": get_token,  # Called per-request
        "X-Request-ID": lambda: str(uuid.uuid4()),
    },
) as res:
    for item in res.iter_json():
        print(item)

Type Annotations

from typing import TypedDict
from durable_streams import stream

class Event(TypedDict):
    id: str
    type: str
    data: dict

with stream("https://...") as res:
    for event in res.iter_json():
        event: Event  # Type hint for IDE
        print(event["id"])

Testing

The package includes a testing module:
from durable_streams.testing import InMemoryStreamServer

server = InMemoryStreamServer()
handle = DurableStream(server.url("/test"))
handle.create_stream(content_type="application/json")
handle.append({"test": "data"})

Source Code

Source: packages/client-py/src/durable_streams/