One of Durable Streams’ core features is offset-based resumability. Your application can resume reading from exactly where it left off, even after page refreshes, network failures, or crashes.

Understanding Offsets

Every read operation returns an offset representing your position in the stream:
const res = await stream.stream({ 
  offset: "-1",  // Start from beginning
  live: false,
})

const items = await res.json()

// Save this offset to resume later
const currentOffset = res.offset
console.log("Current position:", currentOffset)
Offsets are opaque strings (e.g., "42_1024") containing internal state. Always save and use the exact offset value returned by the server; never construct or modify offsets manually.
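
For example, persist the returned value verbatim rather than deriving your own (the saveOffset helper below is hypothetical):
// Do: store exactly what the server returned
saveOffset(res.offset)

// Don't: build an offset by hand; the internal format may change
// saveOffset(`${lastIndex + 1}_0`)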

Saving and Resuming

1. Save offset after processing

Store the offset after successfully processing each batch:
const res = await stream.stream({ live: false })
const items = await res.json()

// Process items
for (const item of items) {
  await processItem(item)
}

// Save offset for next read
localStorage.setItem("stream-offset", res.offset)

2. Resume from saved offset

On restart, load the saved offset and continue:
const savedOffset = localStorage.getItem("stream-offset") ?? "-1"

const res = await stream.stream({ 
  offset: savedOffset,
  live: false,
})

// Only new data since saved offset
const newItems = await res.json()

3. Update offset incrementally

For live streams, update the offset continuously:
const res = await stream.stream({ 
  offset: savedOffset,
  live: "long-poll",
})

res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    await processItem(item)
  }
  
  // Update offset after each batch
  localStorage.setItem("stream-offset", batch.offset)
})

Resumability Patterns

Pattern 1: Checkpoint on Batch Boundaries

Process and checkpoint data in batches for efficiency:
async function consumeStream(
  stream: DurableStream,
  startOffset: string
) {
  const res = await stream.stream({
    offset: startOffset,
    live: "long-poll",
  })

  res.subscribeJson(async (batch) => {
    // Process the entire batch in parallel
    await Promise.all(
      batch.items.map(item => processItem(item))
    )

    // Checkpoint only after successful batch processing
    await saveCheckpoint({
      offset: batch.offset,
      processedAt: new Date(),
      itemCount: batch.items.length,
    })
  })
}
Checkpointing on batch boundaries reduces I/O overhead. Because the offset is saved only after the whole batch succeeds, a crash mid-batch replays at most one batch on restart, so item processing should be idempotent.
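
Since a batch can be replayed after a crash, make processItem safe to run twice. A minimal sketch, assuming the same ./database module used in Pattern 3 below and a hypothetical results.upsert keyed by a stable item ID:
import { db } from "./database"

async function processItem(item: { id: string; payload: unknown }) {
  // Upserting by the item's stable ID makes replays harmless
  await db.results.upsert({
    id: item.id,
    payload: item.payload,
    processedAt: new Date(),
  })
}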

Pattern 2: Multi-tab Coordination

Share a single offset across browser tabs using shared storage:
import { DurableStream } from "@durable-streams/client"

class SharedStreamConsumer {
  private stream: DurableStream
  private storageKey: string

  constructor(url: string, storageKey: string) {
    this.stream = new DurableStream({ url })
    this.storageKey = storageKey

    // Listen for offset updates from other tabs
    window.addEventListener("storage", (e) => {
      if (e.key === this.storageKey && e.newValue) {
        console.log("Offset updated by another tab:", e.newValue)
      }
    })
  }

  async start() {
    const savedOffset = localStorage.getItem(this.storageKey) ?? "-1"

    const res = await this.stream.stream({
      offset: savedOffset,
      live: "long-poll",
    })

    res.subscribeJson(async (batch) => {
      for (const item of batch.items) {
        await this.processItem(item)
      }

      // Updates are automatically visible to other tabs
      localStorage.setItem(this.storageKey, batch.offset)
    })
  }

  private async processItem(item: any) {
    // Your processing logic
  }
}
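Usage is then identical in every tab; each tab reads and writes the same storage key (the URL and key below are illustrative):
const consumer = new SharedStreamConsumer(
  "https://streams.example.com/v1/stream/orders",
  "orders-offset"
)

await consumer.start()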

Pattern 3: Database-backed Checkpoints

For server-side consumers, persist offsets to a database:
import { DurableStream } from "@durable-streams/client"
import { db } from "./database"

async function consumeStreamWithDBCheckpoint(
  streamId: string,
  consumerId: string
) {
  // Load last checkpoint from database
  const checkpoint = await db.checkpoints.findOne({
    streamId,
    consumerId,
  })

  const startOffset = checkpoint?.offset ?? "-1"

  const stream = new DurableStream({
    url: `https://streams.example.com/v1/stream/${streamId}`,
  })

  const res = await stream.stream({
    offset: startOffset,
    live: "long-poll",
  })

  res.subscribeJson(async (batch) => {
    // Process items within a transaction
    await db.transaction(async (tx) => {
      // Insert processed items
      for (const item of batch.items) {
        await tx.processedItems.insert({
          streamId,
          data: item,
          processedAt: new Date(),
        })
      }

      // Update checkpoint atomically
      await tx.checkpoints.upsert({
        streamId,
        consumerId,
        offset: batch.offset,
        updatedAt: new Date(),
      })
    })
  })
}
Always update the checkpoint within the same transaction that stores the processed data; the atomic commit is what turns at-least-once delivery into exactly-once processing.
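
The checkpoint record only needs to identify the consumer and its position. A sketch of the shape assumed by the example above (table and field names are illustrative):
interface Checkpoint {
  streamId: string    // which stream this checkpoint tracks
  consumerId: string  // allows multiple independent consumers per stream
  offset: string      // opaque offset returned by the server
  updatedAt: Date
}

// Make (streamId, consumerId) unique so each upsert replaces the old position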

Exactly-Once Delivery with Idempotent Producers

For write-side resumability, use IdempotentProducer to guarantee exactly-once delivery:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

const stream = await DurableStream.create({
  url: "https://streams.example.com/v1/stream/orders",
  contentType: "application/json",
})

const producer = new IdempotentProducer(stream, "order-service-1", {
  epoch: 0,
  autoClaim: true, // Automatically handle epoch conflicts
  onError: (err) => console.error("Write failed:", err),
})

// Fire-and-forget writes (synchronous, returns immediately)
for (const order of orders) {
  producer.append(JSON.stringify(order))
}

// Ensure all messages are delivered
await producer.flush()
The producer uses (producerId, epoch, seq) headers to deduplicate writes. If the producer crashes and restarts, retried writes are automatically detected and ignored by the server.

How It Works

1. Producer identifies itself

Each producer has a unique ID (e.g., "order-service-1"):
const producer = new IdempotentProducer(
  stream, 
  "order-service-1",  // Stable producer ID
  { epoch: 0 }
)

2. Sequence numbers ensure order

Each batch gets a monotonic sequence number:
POST /v1/stream/orders
Producer-Id: order-service-1
Producer-Epoch: 0
Producer-Seq: 42

3. Server deduplicates retries

If the same (producerId, epoch, seq) arrives twice, the server returns 204 No Content without duplicating data:
// First attempt: 200 OK - data written
// Retry after crash: 204 No Content - deduplicated

4. Epochs fence zombies

When restarting a producer, increment the epoch to fence old instances:
// Old producer (crashed)
const oldProducer = new IdempotentProducer(stream, "worker-1", { epoch: 0 })

// New producer (after restart)
const newProducer = new IdempotentProducer(stream, "worker-1", { epoch: 1 })

// Old producer's retries now fail with 403 Forbidden
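The fenced instance should treat this as fatal rather than retrying. A sketch, assuming producer failures surface through the same onError callback and FetchError type used elsewhere on this page:
// Old instance (epoch 0): stop on fencing instead of retrying
const producer = new IdempotentProducer(stream, "worker-1", {
  epoch: 0,
  onError: (err) => {
    // Assumption: the server's 403 arrives here as a FetchError
    if (err instanceof FetchError && err.status === 403) {
      console.error("Fenced by a newer epoch; stopping this producer")
    }
  },
})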

Handling Network Interruptions

Durable Streams automatically handles transient failures with exponential backoff:
const res = await stream.stream({
  offset: savedOffset,
  live: "long-poll",
  backoffOptions: {
    maxRetries: 10,
    initialDelayMs: 100,
    maxDelayMs: 30000,
    backoffMultiplier: 2,
  },
})
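Assuming each retry doubles the previous delay, starting at initialDelayMs and capped at maxDelayMs, the options above produce roughly this schedule:
// attempt:   1    2    3    4    5     6     7     8      9      10
// delay ms:  100  200  400  800  1600  3200  6400  12800  25600  30000 (capped)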
For custom retry logic, use the onError callback:
const res = await stream.stream({
  offset: savedOffset,
  live: "long-poll",
  onError: async (error) => {
    if (error instanceof FetchError && error.status === 401) {
      // Refresh auth token
      const newToken = await refreshAuthToken()
      
      // Return new headers to retry with
      return {
        headers: { Authorization: `Bearer ${newToken}` },
      }
    }
    
    // Return undefined to stop retrying
    return undefined
  },
})
The onError handler is called before each retry. Return updated headers or params to retry with fresh credentials, or return undefined to abort.

Cursor-based Caching

Durable Streams includes cursor-based cache busting for CDN compatibility:
const res = await stream.stream({ 
  offset: "100_2048",
  live: false,
})

// Response includes cursor for cache coordination
const cursor = res.cursor
Cursors change when new data is written, ensuring CDN caches are invalidated:
GET /v1/stream/my-stream?offset=100_2048&cursor=abc123
Cache-Control: public, max-age=60
Stream-Next-Cursor: def456
The client automatically manages cursors. You don’t need to handle them explicitly unless building custom caching logic.
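
If you do build custom caching logic, key requests on the offset and cursor together; because the cursor changes whenever new data is written, stale cache entries are busted automatically. A sketch using the raw HTTP shape shown above (the base URL is illustrative):
// Build a cache-friendly request URL from the last response
const url = new URL("https://streams.example.com/v1/stream/my-stream")
url.searchParams.set("offset", res.offset)
if (res.cursor) url.searchParams.set("cursor", res.cursor)

// Same offset + cursor => same URL => CDN cache hit;
// a write bumps the cursor, producing a fresh URL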

Testing Resumability

Use these patterns to verify your application handles interruptions correctly:
import { DurableStream } from "@durable-streams/client"

describe("Stream resumability", () => {
  it("resumes from saved offset after disconnect", async () => {
    const stream = new DurableStream({
      url: "https://streams.example.com/v1/stream/test",
    })

    // First read
    const res1 = await stream.stream({ offset: "-1", live: false })
    const items1 = await res1.json()
    const offset1 = res1.offset

    // Write more data
    await stream.append(JSON.stringify({ new: "data" }))

    // Resume from saved offset
    const res2 = await stream.stream({ offset: offset1, live: false })
    const items2 = await res2.json()

    // Should only get new data
    expect(items2).toHaveLength(1)
    expect(items2[0]).toEqual({ new: "data" })
  })
})

Next Steps

Real-time Subscriptions: Stream live updates with long-poll and SSE modes.

Error Handling: Implement robust error handling and retry logic.