Durable Streams provides a natural foundation for event-sourced architectures, offering durable append-only logs with offset-based replay and exactly-once write semantics.

Why Durable Streams for event sourcing

While backend event stores like Kafka excel at server-to-server event processing, they don’t extend cleanly to client applications. Durable Streams bridges this gap:
  • HTTP-native - Works in browsers, mobile apps, IoT devices—anywhere HTTP works
  • Offset-based replay - Resume from any point in the event stream
  • Exactly-once writes - IdempotentProducer prevents duplicate events
  • CDN-friendly - Historical replays leverage edge caching for massive scale
  • Live tailing - Seamlessly transition from replay to real-time event streaming

Basic event sourcing pattern

1. Append events to the log

Events are appended to a durable stream as they occur. Each event is immutable and ordered.
import { DurableStream } from "@durable-streams/client"

const eventLog = await DurableStream.create({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  contentType: "application/json",
})

// Append events as they happen
await eventLog.append({ type: "OrderCreated", orderId: "123", total: 99.99 })
await eventLog.append({ type: "OrderPaid", orderId: "123", paymentId: "pay_xyz" })
await eventLog.append({ type: "OrderShipped", orderId: "123", trackingId: "track_abc" })

2. Replay events to rebuild state

Read all events from the beginning to reconstruct current state. This is the core of event sourcing.
import { stream } from "@durable-streams/client"

// Replay from beginning
const res = await stream({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  offset: "-1", // Start from beginning
  live: false,  // Catch-up only
})

const events = await res.json()
const state = events.reduce(applyEvent, initialState)

console.log("Current order state:", state)

3. Subscribe to new events

After catching up, subscribe to new events in real-time to keep state synchronized.
// Replay then live tail
const res = await stream({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  offset: "-1",
  live: true, // Continue after catching up
})

let state = initialState

res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    state = applyEvent(state, event)
  }
  saveCheckpoint(batch.offset)
})
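
For reference, here is a minimal sketch of the saveCheckpoint helper used above, plus a matching loadCheckpoint for resuming after a restart. The localStorage backend and key scheme are assumptions; any durable key-value store works:
// Hypothetical checkpoint helpers; swap localStorage for any durable store
function saveCheckpoint(offset: string): void {
  localStorage.setItem(`checkpoint:orders:${orderId}`, offset)
}

function loadCheckpoint(): string {
  // "-1" means no checkpoint yet: replay from the beginning
  return localStorage.getItem(`checkpoint:orders:${orderId}`) ?? "-1"
}

// On restart, resume from the checkpoint instead of replaying everything
const resumed = await stream({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  offset: loadCheckpoint(),
  live: true,
})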

Event reducer pattern

Define how each event type transforms your state:
interface OrderState {
  status: "created" | "paid" | "shipped" | "delivered"
  total: number
  paymentId?: string
  trackingId?: string
  deliveredAt?: number
}

const initialState: OrderState = {
  status: "created",
  total: 0,
}

function applyEvent(state: OrderState, event: any): OrderState {
  switch (event.type) {
    case "OrderCreated":
      return { ...state, total: event.total }
    
    case "OrderPaid":
      return { ...state, status: "paid", paymentId: event.paymentId }
    
    case "OrderShipped":
      return { ...state, status: "shipped", trackingId: event.trackingId }
    
    case "OrderDelivered":
      return { 
        ...state, 
        status: "delivered", 
        deliveredAt: event.timestamp 
      }
    
    default:
      return state
  }
}
Now you can rebuild state at any point:
const res = await stream({
  url: eventLogUrl,
  offset: "-1",
  live: false,
})

const events = await res.json()
const currentState = events.reduce(applyEvent, initialState)

Exactly-once event writes

Use IdempotentProducer to guarantee exactly-once event delivery even with retries:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

const eventLog = await DurableStream.create({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  contentType: "application/json",
})

const producer = new IdempotentProducer(eventLog, "order-service-1", {
  epoch: 0,
  autoClaim: true, // Auto-recover on failover
})

// Fire-and-forget writes - deduplicated by (producerId, epoch, seq)
producer.append(JSON.stringify({ type: "OrderCreated", orderId, total }))
producer.append(JSON.stringify({ type: "OrderPaid", orderId, paymentId }))

// Ensure all events are written
await producer.flush()
await producer.close()
If the network fails mid-batch and you retry, the server deduplicates using producer headers—no duplicate events in your log.
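
As a sketch of what a retry loop can look like (assuming flush() rejects on failure and re-sends its still-pending events when called again; the helper and backoff policy are illustrative):
// Illustrative retry wrapper: re-running flush() is safe because the
// server drops events it has already seen, keyed by (producerId, epoch, seq)
async function appendWithRetry(
  producer: IdempotentProducer,
  event: object,
  maxAttempts = 3
): Promise<void> {
  producer.append(JSON.stringify(event))

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await producer.flush()
      return
    } catch (err) {
      if (attempt === maxAttempts) throw err
      // Back off before retrying: 100ms, 200ms, 400ms...
      await new Promise((r) => setTimeout(r, 100 * 2 ** (attempt - 1)))
    }
  }
}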

Snapshots for performance

For long event streams, replaying from the beginning can be slow. Use snapshots to checkpoint state:
interface Snapshot {
  offset: string    // Event log offset where snapshot was taken
  state: OrderState // State at that offset
  timestamp: number
}

// Take periodic snapshots
async function takeSnapshot(
  orderId: string,
  state: OrderState,
  offset: string
): Promise<void> {
  const snapshot: Snapshot = {
    offset,
    state,
    timestamp: Date.now(),
  }
  
  await saveSnapshot(orderId, snapshot)
}

// Restore from snapshot then replay recent events
async function restoreState(orderId: string): Promise<OrderState> {
  const snapshot = await loadSnapshot(orderId)
  
  if (!snapshot) {
    // No snapshot, replay from beginning
    return replayFromBeginning(orderId)
  }
  
  // Replay events since snapshot
  const res = await stream({
    url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
    offset: snapshot.offset,
    live: false,
  })
  
  const recentEvents = await res.json()
  return recentEvents.reduce(applyEvent, snapshot.state)
}
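
saveSnapshot, loadSnapshot, and replayFromBeginning are left abstract above. One possible shape, using an in-memory Map as a stand-in for a real database or object store:
// Hypothetical snapshot store; production code would use a database
// or object storage keyed by aggregate id
const snapshotStore = new Map<string, Snapshot>()

async function saveSnapshot(orderId: string, snapshot: Snapshot): Promise<void> {
  snapshotStore.set(orderId, snapshot)
}

async function loadSnapshot(orderId: string): Promise<Snapshot | null> {
  return snapshotStore.get(orderId) ?? null
}

// Full replay fallback when no snapshot exists
async function replayFromBeginning(orderId: string): Promise<OrderState> {
  const res = await stream({
    url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
    offset: "-1",
    live: false,
  })
  const events = await res.json()
  return events.reduce(applyEvent, initialState)
}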

Projections and views

Build multiple read models (projections) from the same event stream:
// Lightweight view for listings
interface OrderSummary {
  orderId: string
  status: string
  total: number
  createdAt: number
}

function projectToSummary(
  summary: OrderSummary | null,
  event: any
): OrderSummary | null {
  if (event.type === "OrderCreated") {
    return {
      orderId: event.orderId,
      status: "created",
      total: event.total,
      createdAt: event.timestamp,
    }
  }
  
  if (!summary) return null
  
  if (event.type === "OrderPaid") {
    return { ...summary, status: "paid" }
  }
  
  // ... handle other events
  return summary
}
Build all projections from the same event stream:
const res = await stream({
  url: eventLogUrl,
  offset: "-1",
  live: true,
})

let summary: OrderSummary | null = null
let auditTrail: AuditEntry[] = []

res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    summary = projectToSummary(summary, event)
    auditTrail = projectToAudit(auditTrail, event)
  }
  
  await saveSummary(summary)
  await saveAuditTrail(auditTrail)
  saveCheckpoint(batch.offset)
})
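
AuditEntry and projectToAudit above are placeholders; a minimal version might record every event in an append-only trail:
// Minimal audit projection: one entry per event, in order
interface AuditEntry {
  type: string
  timestamp: number
  event: any
}

function projectToAudit(trail: AuditEntry[], event: any): AuditEntry[] {
  return [...trail, { type: event.type, timestamp: event.timestamp, event }]
}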

Aggregate streams

Organize events by aggregate (e.g., one stream per order, user, or shopping cart):
// Each order has its own event stream
const orderStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
  contentType: "application/json",
})

// Each user has their own event stream
const userStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/users/${userId}/events`,
  contentType: "application/json",
})

// Workspace-level events
const workspaceStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/workspaces/${workspaceId}/events`,
  contentType: "application/json",
})
This pattern provides:
  • Isolation - Each aggregate’s events are independent
  • Performance - Replay only relevant events for an aggregate
  • Authorization - Control access per aggregate
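
Since every aggregate stream follows the same URL scheme, a small helper (illustrative; the base URL mirrors the examples above) keeps the paths consistent:
// Illustrative helper for deriving aggregate stream URLs
function aggregateStreamUrl(
  kind: "orders" | "users" | "workspaces",
  id: string
): string {
  return `https://your-server.com/v1/stream/${kind}/${id}/events`
}

const cartEvents = await DurableStream.create({
  url: aggregateStreamUrl("orders", orderId),
  contentType: "application/json",
})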

Time travel and debugging

Because events are immutable and replayable, you can reconstruct state at any point in time for debugging.
// Debug: What was the order state at timestamp X?
async function getStateAtTime(
  orderId: string, 
  targetTimestamp: number
): Promise<OrderState> {
  const res = await stream({
    url: `https://your-server.com/v1/stream/orders/${orderId}/events`,
    offset: "-1",
    live: false,
  })
  
  const events = await res.json()
  
  // Filter events up to target time
  const eventsUntilTime = events.filter(
    e => e.timestamp <= targetTimestamp
  )
  
  return eventsUntilTime.reduce(applyEvent, initialState)
}

// "What did this order look like yesterday at 3pm?"
const stateYesterday = await getStateAtTime(
  orderId,
  Date.now() - 24 * 60 * 60 * 1000
)
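
The same replay makes before-and-after comparisons cheap; for example, diffing state across an incident window (incidentStart and incidentEnd are hypothetical timestamps, and shallowDiff is a sketch):
// Sketch: which fields changed between two points in time?
function shallowDiff(a: OrderState, b: OrderState): Partial<OrderState> {
  const changed: any = {}
  for (const key of Object.keys(b) as (keyof OrderState)[]) {
    if (a[key] !== b[key]) changed[key] = b[key]
  }
  return changed
}

const before = await getStateAtTime(orderId, incidentStart)
const after = await getStateAtTime(orderId, incidentEnd)
console.log("Fields changed during incident:", shallowDiff(before, after))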

Event versioning

Handle schema evolution by versioning events:
interface Event {
  version: number
  type: string
  timestamp: number
  data: any
}

function applyEvent(state: OrderState, event: Event): OrderState {
  // Handle different event versions
  if (event.type === "OrderCreated") {
    if (event.version === 1) {
      // Old schema: { total }
      return { ...state, total: event.data.total }
    }
    if (event.version === 2) {
      // New schema: { total, currency, items }
      // (assumes OrderState gained optional currency and items fields)
      return {
        ...state,
        total: event.data.total,
        currency: event.data.currency,
        items: event.data.items,
      }
    }
  }
  
  // ... handle other events
  return state
}
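
An alternative to branching on versions inside the reducer is to upcast old events to the latest schema first, so the reducer only ever handles one shape. A sketch; the defaults filled in for v1 events are assumptions:
// Sketch: normalize v1 events to the v2 shape before reducing
function upcast(event: Event): Event {
  if (event.type === "OrderCreated" && event.version === 1) {
    return {
      ...event,
      version: 2,
      // v1 had no currency or items; these defaults are assumptions
      data: { ...event.data, currency: "USD", items: [] },
    }
  }
  return event
}

const currentState = events.map(upcast).reduce(applyEvent, initialState)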

CQRS pattern

Combine event sourcing with Command Query Responsibility Segregation:

Write side (Commands)

Commands validate and append events to the stream.
async function handleCreateOrder(cmd) {
  // Validate command
  if (cmd.total <= 0) {
    throw new Error("Invalid total")
  }
  
  // Append event
  await eventLog.append({
    type: "OrderCreated",
    orderId: cmd.orderId,
    total: cmd.total,
    timestamp: Date.now(),
  })
}

Read side (Queries)

Queries read from optimized projections built from events.
async function getOrderSummary(orderId) {
  // Read from materialized view
  return await db.query(
    "SELECT * FROM order_summaries WHERE id = ?",
    [orderId]
  )
}

// Projection builder keeps view up-to-date
res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    await updateMaterializedView(event)
  }
})
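
updateMaterializedView is left abstract above; one possible shape, reusing the illustrative db.query API from the query handler:
// Sketch: apply each event to the order_summaries view
async function updateMaterializedView(event: any): Promise<void> {
  switch (event.type) {
    case "OrderCreated":
      await db.query(
        "INSERT INTO order_summaries (id, status, total) VALUES (?, 'created', ?)",
        [event.orderId, event.total]
      )
      break
    case "OrderPaid":
      await db.query(
        "UPDATE order_summaries SET status = 'paid' WHERE id = ?",
        [event.orderId]
      )
      break
    // ... handle other events
  }
}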

Production considerations

For high-scale production deployments, configure your CDN to cache historical event replays and collapse concurrent requests.

CDN configuration

# Nginx example
location ~ ^/v1/stream/ {
    proxy_pass http://origin;
    
    # Cache historical reads (offset != "now")
    proxy_cache events_cache;
    proxy_cache_key "$uri$is_args$args";
    proxy_cache_valid 200 60s;
    proxy_cache_use_stale error timeout updating;
    
    # Collapse concurrent requests to same offset
    proxy_cache_lock on;
    proxy_cache_lock_timeout 5s;
}

Performance characteristics

From production usage:
  • Write throughput - IdempotentProducer achieves 10,000+ events/sec with batching and pipelining
  • Read latency - Sub-15ms for historical reads via CDN edge caching
  • Replay performance - 100,000+ events/sec for catch-up reads
  • Scale - CDN caching enables millions of concurrent readers without origin overload

Next steps

  • Idempotent Producer - Deep dive into exactly-once write semantics
  • State Protocol - Learn about the State Protocol for CQRS and projections
  • Offset Management - Understand offset semantics and checkpointing
  • Live Modes - Choose between long-poll and SSE for live tailing