# Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.
# Python Client

The official Python client for the Durable Streams protocol, with both sync and async support.

## Installation

```shell
pip install durable-streams
```

Requires Python 3.10+.
## Quick Start

### Synchronous API

```python
from durable_streams import stream, DurableStream

# Read-only streaming
with stream("https://streams.example.com/my-stream") as res:
    for item in res.iter_json():
        print(item)

# Read/write handle
handle = DurableStream.create(
    "https://streams.example.com/my-stream",
    content_type="application/json",
)
handle.append({"message": "hello"})
```
### Asynchronous API

```python
import asyncio

from durable_streams import astream, AsyncDurableStream

async def main():
    # Async streaming
    async with astream("https://streams.example.com/my-stream") as res:
        async for item in res.iter_json():
            print(item)

    # Async handle
    handle = await AsyncDurableStream.create(
        "https://streams.example.com/my-stream",
        content_type="application/json",
    )
    await handle.append({"message": "hello"})

asyncio.run(main())
```
## Core APIs

### stream()

Synchronous read API for consuming streams.

**Parameters:**

- `url` (required): the full URL to the durable stream
- `offset` (`Offset | None`, default `"-1"`): starting offset (`"-1"` for the start of the stream)
- `live`: live mode: `False`, `True`, `"long-poll"`, or `"sse"`
- `headers`: HTTP headers (values can be strings or callables)
- `params`: query parameters (values can be strings or callables)
- `client`: optional `httpx.Client` for connection reuse

**Returns:** a `StreamResponse` context manager for consuming stream data. It provides iteration over raw byte chunks, iteration over individual JSON items (`iter_json()`), and readers that accumulate all bytes or all JSON items until up to date, along with `offset` (the current stream offset, which advances with each chunk) and a flag indicating whether the reader has reached the current end of the stream.
**Example:**

```python
from durable_streams import stream

with stream("https://streams.example.com/events", live=True) as res:
    for event in res.iter_json():
        print(f"Event: {event}")
        print(f"Offset: {res.offset}")
        # Save checkpoint
        db.save_offset(res.offset)
```
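Checkpointing pairs naturally with resuming a read via the `offset` parameter after a restart. A minimal, self-contained sketch of that pattern, with the client replaced by a stub and `save_offset`/`load_offset` as hypothetical helpers backed by an in-memory dict:

```python
import contextlib

# Hypothetical in-memory checkpoint store standing in for a real database.
_checkpoints: dict[str, str] = {}

def save_offset(url: str, offset: str) -> None:
    _checkpoints[url] = offset

def load_offset(url: str) -> str:
    return _checkpoints.get(url, "-1")  # "-1" = start of stream

# Stub standing in for durable_streams.stream(); yields (item, offset)
# pairs starting just past the requested offset.
@contextlib.contextmanager
def fake_stream(url: str, offset: str):
    items = [("a", "0"), ("b", "1"), ("c", "2")]
    start = 0 if offset == "-1" else int(offset) + 1
    yield iter(items[start:])

url = "https://streams.example.com/events"
seen = []
with fake_stream(url, offset=load_offset(url)) as res:
    for item, off in res:
        seen.append(item)
        save_offset(url, off)  # checkpoint after each item

# After a restart, the reader resumes past the last checkpoint
# instead of replaying the whole stream.
with fake_stream(url, offset=load_offset(url)) as res:
    resumed = [item for item, _ in res]
```

The real client's offsets are opaque strings (see the `Offset` type below), so a checkpoint store should persist them verbatim rather than doing arithmetic on them as this stub does.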
### DurableStream

Synchronous handle class for read/write operations.

**Static Methods**

- `create()`: create a new stream and return a handle. Parameters: the content type (e.g., `"application/json"`), an absolute expiry time (RFC3339), and whether to create the stream in a closed state. Returns: `DurableStream`
- Connect to an existing stream (validated via a HEAD request). Returns: `DurableStream`

**Instance Methods**

- `append(data)`: append data to the stream, with automatic batching. `data` (`bytes | str | Any`, required) is the data to append; for JSON streams, pass a dict/list (auto-serialized) or a pre-serialized JSON string. An optional writer coordination sequence may also be supplied. Returns: `AppendResult` with `next_offset`
**Example:**

```python
handle = DurableStream("https://streams.example.com/events")

# JSON stream - pass dict (auto-serialized)
result = handle.append({"message": "hello"})
print(f"Next offset: {result.next_offset}")

# Or pre-serialize
result = handle.append('{"message":"hello"}')

# Byte stream
result = handle.append(b"raw bytes")
```
Other instance methods:

- `stream()`: start a read session. Returns: `StreamResponse` (a context manager)
- Close the stream permanently (no more appends). Returns: `CloseResult` with `final_offset`
- Get stream metadata via a HEAD request. Returns: `HeadResult` with `content_type`, `offset`, `stream_closed`
### AsyncDurableStream

Asynchronous handle class (same API as `DurableStream`, but async).

```python
import asyncio

from durable_streams import AsyncDurableStream

async def main():
    handle = await AsyncDurableStream.create(
        "https://streams.example.com/events",
        content_type="application/json",
    )
    await handle.append({"message": "hello"})

    async with handle.stream() as res:
        async for event in res.iter_json():
            print(event)

asyncio.run(main())
```
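Because the async handle is ordinary asyncio code, it composes with standard patterns such as running a writer and a reader concurrently. A sketch of that shape with an `asyncio.Queue` standing in for the stream (the names `writer`/`reader` are illustrative, not part of the library):

```python
import asyncio

async def writer(q: asyncio.Queue) -> None:
    # Stand-in for a task making handle.append() calls.
    for i in range(3):
        await q.put({"message": f"hello-{i}"})
    await q.put(None)  # sentinel: no more items

async def reader(q: asyncio.Queue) -> list[dict]:
    # Stand-in for `async for item in res.iter_json()`.
    out = []
    while (item := await q.get()) is not None:
        out.append(item)
    return out

async def main() -> list[dict]:
    q: asyncio.Queue = asyncio.Queue()
    # Run both tasks concurrently; gather preserves return-value order.
    _, items = await asyncio.gather(writer(q), reader(q))
    return items

items = asyncio.run(main())
```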
### IdempotentProducer

Exactly-once writes with automatic batching and pipelining.

```python
import asyncio

import httpx

from durable_streams import IdempotentProducer

async def main():
    async with httpx.AsyncClient() as client:
        producer = IdempotentProducer(
            url="https://streams.example.com/orders",
            producer_id="order-service-1",
            client=client,
            epoch=0,
            auto_claim=True,
            max_batch_bytes=1024 * 1024,
            linger_ms=5,
        )

        # Fire-and-forget (returns immediately)
        producer.append(b'{"orderId":"123"}')
        producer.append(b'{"orderId":"456"}')

        # Ensure delivery
        await producer.flush()
        await producer.close()

asyncio.run(main())
```
**Constructor parameters:**

- `url`: the full URL to the durable stream
- `producer_id`: a stable identifier (e.g., `"order-service-1"`)
- `client`: HTTP client for connection reuse
- `epoch`: starting epoch (increment on restart)
- `auto_claim`: auto-retry with `epoch + 1` on a 403 response
- `linger_ms`: maximum wait before sending a batch
- `on_error` (`Callable[[Exception], None] | None`): error callback

**Methods:**

- `append(data)`: fire-and-forget append; `data` is the data to append (for JSON, pass pre-serialized JSON)
- `flush()`: send the pending batch and wait for all in-flight batches. Returns: `None`
- `close()`: close the stream with producer headers (idempotent). Returns: `IdempotentAppendResult`
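The interplay of fire-and-forget `append()`, a byte-bounded batch size, and explicit `flush()` can be illustrated with a toy batcher. This is an assumption-laden sketch, not the real producer, which also handles linger timers, pipelining, and idempotency headers:

```python
class TinyBatcher:
    """Illustrative size-bounded batcher; not the real IdempotentProducer."""

    def __init__(self, max_batch_bytes: int, send):
        self.max_batch_bytes = max_batch_bytes
        self.send = send  # callable that "ships" one assembled batch
        self.pending: list[bytes] = []
        self.pending_bytes = 0

    def append(self, data: bytes) -> None:
        # Fire-and-forget: buffer locally and return immediately.
        # Ship the current batch first if this record would overflow it.
        if self.pending_bytes + len(data) > self.max_batch_bytes:
            self.flush()
        self.pending.append(data)
        self.pending_bytes += len(data)

    def flush(self) -> None:
        # Ship whatever is buffered as a single batch.
        if self.pending:
            self.send(b"".join(self.pending))
            self.pending = []
            self.pending_bytes = 0

sent: list[bytes] = []
b = TinyBatcher(max_batch_bytes=8, send=sent.append)
b.append(b'{"a":1}')  # 7 bytes: buffered, nothing sent yet
b.append(b'{"b":2}')  # would exceed 8 bytes, so the first batch ships
b.flush()             # ship the remainder explicitly
```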
## Types

### Offset

```python
Offset = str  # e.g., "-1" for start, "123_456" for a position
```

### LiveMode

```python
LiveMode = Literal[False, True, "long-poll", "sse"]
# False: catch-up only
# True: auto-select mode
# "long-poll": explicit long-polling
# "sse": explicit Server-Sent Events
```

### Headers and Params

```python
HeadersLike = dict[str, str | Callable[[], str]]
ParamsLike = dict[str, str | Callable[[], str]]
```
## Error Handling

```python
from durable_streams import (
    DurableStreamError,
    StreamNotFoundError,
    StreamClosedError,
    SeqConflictError,
)

try:
    handle.append(data)
except StreamClosedError as e:
    print(f"Stream closed: {e.url}")
except SeqConflictError:
    print("Sequence conflict")
except DurableStreamError as e:
    print(f"Error {e.code}: {e}")
```
### Exception Hierarchy

- `DurableStreamError`: base exception
  - `StreamNotFoundError`: 404
  - `StreamExistsError`: 409 on create
  - `StreamClosedError`: 409 with a `Stream-Closed` header
  - `SeqConflictError`: 409 sequence conflict
  - `RetentionGoneError`: 410, offset expired
  - `SSENotSupportedError`: server doesn't support SSE
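One common pattern built on this hierarchy is falling back to the start of the stream when a checkpointed offset has aged out of retention. A sketch with the exceptions and the read stubbed out (in real code, import them from `durable_streams` and call the actual client):

```python
# Stubs mirroring the hierarchy above.
class DurableStreamError(Exception): ...
class RetentionGoneError(DurableStreamError): ...

def read_from(offset: str) -> list[str]:
    # Stand-in for a real read; pretend offsets below "100" have expired.
    if offset != "-1" and int(offset) < 100:
        raise RetentionGoneError(f"offset {offset} expired")
    return ["event"]

def read_with_fallback(offset: str) -> list[str]:
    try:
        return read_from(offset)
    except RetentionGoneError:
        # The checkpoint predates retention; restart from the beginning.
        return read_from("-1")

events = read_with_fallback("42")
```

Because `RetentionGoneError` subclasses `DurableStreamError`, a broad `except DurableStreamError` handler placed after it still catches everything else.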
## Advanced Features

### Connection Pooling

```python
import httpx

from durable_streams import DurableStream

with httpx.Client(timeout=30.0) as client:
    stream1 = DurableStream("https://.../stream1", client=client)
    stream2 = DurableStream("https://.../stream2", client=client)
    # Reuses connections
```
### Dynamic Headers

```python
import uuid

from durable_streams import stream

def get_token():
    return "Bearer " + fetch_current_token()

with stream(
    "https://...",
    headers={
        "Authorization": get_token,  # Called per-request
        "X-Request-ID": lambda: str(uuid.uuid4()),
    },
) as res:
    for item in res.iter_json():
        print(item)
```
### Type Annotations

```python
from typing import TypedDict

from durable_streams import stream

class Event(TypedDict):
    id: str
    type: str
    data: dict

with stream("https://...") as res:
    event: Event  # Type hint for IDEs
    for event in res.iter_json():
        print(event["id"])
```
## Testing

The package includes a testing module:

```python
from durable_streams import DurableStream
from durable_streams.testing import InMemoryStreamServer

server = InMemoryStreamServer()
handle = DurableStream(server.url("/test"))
handle.create_stream(content_type="application/json")
handle.append({"test": "data"})
```
## Source Code

Source: packages/client-py/src/durable_streams/