Durable Streams supports two live modes for real-time updates: long-polling and Server-Sent Events (SSE). Both enable efficient live tailing of streams while maintaining full resumability through offsets.

Overview

Live modes allow your application to receive new data as it arrives, without constantly polling the server:

Long-polling

The server waits up to a timeout for new data before responding. Efficient for binary streams and mobile apps.

Server-Sent Events

A persistent connection that streams data as it arrives. Ideal for JSON streams and real-time dashboards.

Choosing a live mode

Client libraries automatically select the best mode based on your stream’s content type:
import { stream } from '@durable-streams/client'

// Auto-select (recommended)
const response = await stream({
  url: 'https://streams.example.com/events',
  live: true  // Auto-selects SSE for JSON, long-poll for binary
})

// Explicit mode
const sseResponse = await stream({
  url: 'https://streams.example.com/events',
  live: 'sse'  // Force SSE
})

const longPollResponse = await stream({
  url: 'https://streams.example.com/events',
  live: 'long-poll'  // Force long-polling
})

// Catch-up only (no live updates)
const catchUpResponse = await stream({
  url: 'https://streams.example.com/events',
  live: false  // Stop after reaching current end
})
Recommendation: Use live: true to let the client choose the optimal mode. It selects SSE for JSON streams and long-polling for binary streams.
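
The auto-selection rule can be pictured as a small function of the stream's content type. This is only a sketch of the behavior described above, not the client library's actual code: selectLiveMode is a hypothetical name, and treating every non-JSON content type as binary is an assumption.

```typescript
type LiveMode = 'sse' | 'long-poll'

// Hypothetical helper: mirrors "SSE for JSON, long-poll for binary".
// Assumption: anything that is not JSON is treated as binary here.
function selectLiveMode(contentType: string): LiveMode {
  // Drop parameters such as "; charset=utf-8" and normalize case.
  const mediaType = (contentType.split(';')[0] ?? '').trim().toLowerCase()
  const isJson = mediaType === 'application/json' || mediaType.endsWith('+json')
  return isJson ? 'sse' : 'long-poll'
}
```

Under these assumptions, selectLiveMode('application/json; charset=utf-8') returns 'sse' and selectLiveMode('application/octet-stream') returns 'long-poll'.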

Long-polling mode

In long-polling mode, the client sends a GET request with live=long-poll. If no data is available at the requested offset, the server waits up to a timeout for new data to arrive.

How it works

1. Client requests with offset

GET /stream?offset=100_5678&live=long-poll

2. Server response scenarios

Case 1: Data available

HTTP/1.1 200 OK
Stream-Next-Offset: 100_9999
Stream-Cursor: abc123

[data bytes]

Case 2: Timeout (no new data)

HTTP/1.1 204 No Content
Stream-Next-Offset: 100_5678
Stream-Up-To-Date: true
Stream-Cursor: abc123

Case 3: Stream closed

HTTP/1.1 204 No Content
Stream-Next-Offset: 100_5678
Stream-Up-To-Date: true
Stream-Closed: true

3. Client reconnects automatically

After receiving a response, the client immediately makes another request with the new offset and cursor, unless the response carried Stream-Closed: true.
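
The three response cases above can be folded into one decision function. The following is a minimal sketch, assuming the caller has already lower-cased the header names; LongPollResponse, LongPollOutcome, and interpretLongPoll are hypothetical names for illustration, not part of the client library.

```typescript
// Hypothetical shape for the parts of a long-poll response we care about.
interface LongPollResponse {
  status: number
  // Assumption: header names were lower-cased by the caller.
  headers: Record<string, string | undefined>
}

interface LongPollOutcome {
  hasData: boolean            // 200: there is a body to process
  upToDate: boolean           // caught up with the current end
  closed: boolean             // no more data will ever be appended
  nextOffset: string          // offset to send on the next request
  cursor: string | undefined  // cursor to echo back, when the server sent one
}

function interpretLongPoll(res: LongPollResponse): LongPollOutcome {
  if (res.status !== 200 && res.status !== 204) {
    throw new Error(`Unexpected long-poll status: ${res.status}`)
  }
  const nextOffset = res.headers['stream-next-offset']
  if (nextOffset === undefined) {
    throw new Error('Missing Stream-Next-Offset header')
  }
  return {
    hasData: res.status === 200,
    upToDate: res.headers['stream-up-to-date'] === 'true',
    closed: res.headers['stream-closed'] === 'true',
    nextOffset,
    cursor: res.headers['stream-cursor'],
  }
}
```

A polling loop built on this would process the body when hasData is true, stop when closed is true, and otherwise issue the next request with nextOffset and cursor.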

Example: Long-polling in TypeScript

import { stream } from '@durable-streams/client'

const response = await stream({
  url: 'https://streams.example.com/notifications',
  offset: '-1',
  live: 'long-poll'
})

// Subscribe to batches as they arrive
response.subscribeJson(async (batch) => {
  console.log(`Received ${batch.items.length} notifications`)
  
  for (const notification of batch.items) {
    showNotification(notification)
  }
  
  // Automatically reconnects for next batch
})

// Later, when you no longer need updates, cancel the subscription
response.cancel()

Advantages of long-polling

Firewall friendly

Works through restrictive firewalls and proxies that block persistent connections.

Mobile efficient

Better for mobile apps where network conditions change frequently.

Simple infrastructure

No special server configuration needed beyond HTTP.

Automatic reconnection

Naturally handles connection resets and network changes.

Server-Sent Events (SSE)

SSE mode establishes a persistent HTTP connection that streams data as it arrives. The server sends events in a standardized format.

SSE event format

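Durable Streams uses two event types: data and control. Control events carry stream metadata such as the next offset and cursor (see the connection lifecycle below). Data events contain the actual stream data: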
event: data
data: [
data: {"id":1,"message":"hello"},
data: {"id":2,"message":"world"}
data: ]

For JSON streams, data events contain JSON arrays. For binary streams, data is base64-encoded.
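
To make the framing concrete, here is a rough sketch of how a multi-line data event like the one above could be turned back into JSON items. It follows the standard SSE rule that consecutive data: lines are joined with newlines; parseSseEvent and parseJsonDataEvent are hypothetical helper names, and the client library does this work for you.

```typescript
interface SseEvent {
  event: string
  data: string
}

// Parse one SSE event block (the lines between two blank lines).
function parseSseEvent(block: string): SseEvent {
  let event = 'message' // SSE default when no "event:" field is present
  const dataLines: string[] = []
  for (const line of block.split(/\r?\n/)) {
    if (line.startsWith('event:')) {
      event = line.slice('event:'.length).trim()
    } else if (line.startsWith('data:')) {
      // The SSE format strips a single leading space after the colon.
      const value = line.slice('data:'.length)
      dataLines.push(value.startsWith(' ') ? value.slice(1) : value)
    }
  }
  // Multiple data lines are joined with newlines.
  return { event, data: dataLines.join('\n') }
}

// Decode a "data" event from a JSON stream into its array of items.
function parseJsonDataEvent<T>(block: string): T[] {
  const parsed = parseSseEvent(block)
  if (parsed.event !== 'data') {
    throw new Error(`Expected a data event, got "${parsed.event}"`)
  }
  return JSON.parse(parsed.data) as T[]
}
```

Applied to the example event above, parseJsonDataEvent yields an array of two objects, with ids 1 and 2.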

Example: SSE in TypeScript

import { stream } from '@durable-streams/client'

const response = await stream<{ id: number; message: string }>({
  url: 'https://streams.example.com/chat/room-1',
  offset: '-1',
  live: 'sse'
})

// Process messages as they arrive
response.subscribeJson(async (batch) => {
  for (const msg of batch.items) {
    appendToChat(msg)
  }
  
  // Update UI with stream status
  updateStatus({
    upToDate: batch.upToDate,
    offset: batch.offset,
    closed: batch.streamClosed
  })
})

// Or use async iteration
for await (const message of response.jsonStream()) {
  appendToChat(message)
}

Binary streams with SSE

For non-text content types, SSE automatically base64-encodes data:
const response = await stream({
  url: 'https://streams.example.com/images',
  live: 'sse'
})

// Check for base64 encoding
if (response.headers.get('stream-sse-data-encoding') === 'base64') {
  // Data events are base64-encoded
  // Client library handles decoding automatically
}

for await (const chunk of response.bodyStream()) {
  // chunk is already decoded Uint8Array
  processImageData(chunk)
}
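
For readers curious about what "handles decoding automatically" involves, the sketch below decodes a base64 payload into bytes by hand. It assumes the standard base64 alphabet with = padding, which this page does not spell out; decodeBase64 is a hypothetical helper, and in practice you would rely on the client library or a platform API instead.

```typescript
const BASE64_ALPHABET =
  'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'

// Hypothetical helper: decode standard, padded base64 into raw bytes.
function decodeBase64(input: string): Uint8Array {
  const clean = input.replace(/=+$/, '') // padding carries no data
  const bytes: number[] = []
  let buffer = 0 // accumulates 6-bit groups
  let bits = 0   // number of not-yet-emitted bits in the buffer
  for (let i = 0; i < clean.length; i++) {
    const value = BASE64_ALPHABET.indexOf(clean.charAt(i))
    if (value === -1) {
      throw new Error(`Invalid base64 character at index ${i}`)
    }
    buffer = (buffer << 6) | value
    bits += 6
    if (bits >= 8) {
      bits -= 8
      bytes.push((buffer >> bits) & 0xff) // emit the oldest complete byte
    }
  }
  return new Uint8Array(bytes)
}
```

For example, decodeBase64('aGVsbG8=') produces the five bytes 104, 101, 108, 108, 111, the ASCII encoding of "hello".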

SSE connection lifecycle

1. Initial connection

GET /stream?offset=-1&live=sse

2. Server streams events

event: data
data: [{"id":1}]

event: control
data: {"streamNextOffset":"100_1","streamCursor":"abc"}

event: data
data: [{"id":2}]

event: control
data: {"streamNextOffset":"100_2","streamCursor":"def"}

3. Server closes connection (~60s)

Servers close SSE connections periodically (roughly every 60 seconds) to enable CDN request collapsing.

4. Client reconnects automatically

GET /stream?offset=100_2&cursor=def&live=sse
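
The reconnect step amounts to copying the last control event's fields into the next request's query string. A minimal sketch, assuming the control payload has the JSON shape shown in the lifecycle; ControlEvent and nextSseUrl are hypothetical names rather than library API.

```typescript
// Fields seen in control events on this page.
interface ControlEvent {
  streamNextOffset: string
  streamCursor?: string
  streamClosed?: boolean
}

// Build the URL for the next SSE request, or return null when the
// stream is closed and the client should stop reconnecting.
function nextSseUrl(baseUrl: string, controlData: string): string | null {
  const control = JSON.parse(controlData) as ControlEvent
  if (control.streamClosed === true) {
    return null
  }
  const params = [`offset=${encodeURIComponent(control.streamNextOffset)}`]
  if (control.streamCursor !== undefined) {
    params.push(`cursor=${encodeURIComponent(control.streamCursor)}`)
  }
  params.push('live=sse')
  return `${baseUrl}?${params.join('&')}`
}
```

With the last control event from the lifecycle, nextSseUrl('/stream', '{"streamNextOffset":"100_2","streamCursor":"def"}') returns '/stream?offset=100_2&cursor=def&live=sse', matching the reconnect request.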

Advantages of SSE

Low latency

Data arrives as soon as it is available, with no polling delay.

Efficient

Single persistent connection reduces overhead compared to repeated requests.

Browser native

Built into browsers via the EventSource API (client libraries handle this for you).

Streaming friendly

Natural fit for continuous data like chat, logs, or real-time updates.

Catch-up mode (no live updates)

Set live: false to read only existing data and stop at the current end:
import { stream } from '@durable-streams/client'

const response = await stream({
  url: 'https://streams.example.com/archive',
  offset: '-1',
  live: false  // Stop after catching up
})

// Get all existing data
const allData = await response.json()
console.log(`Retrieved ${allData.length} items`)

// response.closed promise resolves when done
await response.closed
Catch-up mode is useful for:
  • Batch processing of historical data
  • One-time data exports
  • Backfilling databases
  • Testing and debugging

Handling up-to-date state

Both live modes provide an upToDate status that indicates when you have caught up with the current end of the stream:
response.subscribeJson(async (batch) => {
  if (batch.upToDate) {
    console.log('Caught up! Now receiving live updates')
    hideLoadingSpinner()
  }
  
  processBatch(batch.items)
})
upToDate: true does not mean EOF. More data may arrive in the future. Only streamClosed: true indicates no more data will ever be appended.
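
One way to keep the two flags apart in application code is to map them onto an explicit phase. This is an illustrative sketch only: StreamPhase and phaseOf are hypothetical names, and the field names follow the batch objects used elsewhere on this page.

```typescript
type StreamPhase = 'catching-up' | 'live' | 'ended'

interface BatchStatus {
  upToDate: boolean
  streamClosed: boolean
}

// streamClosed wins: a closed stream is finished even though it is
// also reported as up to date.
function phaseOf(status: BatchStatus): StreamPhase {
  if (status.streamClosed) {
    return 'ended'
  }
  return status.upToDate ? 'live' : 'catching-up'
}
```

A UI could show a loading spinner for 'catching-up', a live indicator for 'live', and a completion message only for 'ended'.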

Stream closure in live modes

When a stream is closed, live modes behave differently:

Long-polling

HTTP/1.1 204 No Content
Stream-Closed: true
Stream-Up-To-Date: true
Stream-Next-Offset: 100_9999
The server returns immediately (no waiting) with the closure signal.

SSE

event: control
data: {"streamNextOffset":"100_9999","streamClosed":true,"upToDate":true}
The server sends a final control event, then closes the connection.

Handling closure

response.subscribeJson(async (batch) => {
  processBatch(batch.items)
  
  if (batch.streamClosed) {
    console.log('Stream ended at offset:', batch.offset)
    showCompletionMessage()
    // Client automatically stops reconnecting
  }
})

// Or wait for the closed promise
await response.closed
console.log('Stream has ended')

Resumability with live modes

Live modes are fully resumable through offsets:
import { stream } from '@durable-streams/client'

// Initial connection
const response = await stream({
  url: 'https://streams.example.com/events',
  offset: '-1',
  live: true
})

let lastOffset = '-1'

response.subscribeJson(async (batch) => {
  await processBatch(batch.items)  // Finish processing before checkpointing
  
  // Save offset after each batch
  lastOffset = batch.offset
  await db.saveCheckpoint(lastOffset)
})

// Later, after a disconnect, resume from the saved offset
// (after a process restart, load it from your checkpoint store first)
const resumed = await stream({
  url: 'https://streams.example.com/events',
  offset: lastOffset,  // Resume from checkpoint
  live: true
})
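
The ordering matters here: the checkpoint should advance only after a batch has been fully processed, so that a crash mid-batch replays that batch instead of skipping it. The sketch below shows that ordering with an in-memory checkpoint; CheckpointedConsumer is a hypothetical class, and a real application would persist the offset instead of holding it in a field.

```typescript
interface Batch<T> {
  items: T[]
  offset: string
}

// Hypothetical consumer that advances its checkpoint only after a
// whole batch has been handled successfully.
class CheckpointedConsumer<T> {
  private checkpoint: string
  private readonly handle: (item: T) => void

  constructor(startOffset: string, handle: (item: T) => void) {
    this.checkpoint = startOffset
    this.handle = handle
  }

  consume(batch: Batch<T>): void {
    for (const item of batch.items) {
      this.handle(item) // may throw; the checkpoint is then left untouched
    }
    this.checkpoint = batch.offset
  }

  // Offset to pass as `offset` when reconnecting.
  resumeOffset(): string {
    return this.checkpoint
  }
}
```

Because a failed batch is replayed from the previous checkpoint, items handled before the failure are seen again on resume, so handlers should tolerate duplicates.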

Skip to live with offset=now

Skip all historical data and start from the current tail:
const response = await stream({
  url: 'https://streams.example.com/notifications',
  offset: 'now',  // Skip to current end
  live: true      // Then follow live
})

// Only receive new notifications from this point forward
response.subscribeJson(async (batch) => {
  for (const notification of batch.items) {
    showNotification(notification)  // Only new ones
  }
})
offset=now with live=true is well suited to:
  • Real-time dashboards (skip historical data)
  • Late joiners to a conversation
  • Monitoring and alerting (only new events)
  • Presence tracking

Error handling and resilience

Automatic reconnection

Client libraries automatically reconnect on network failures:
const response = await stream({
  url: 'https://streams.example.com/events',
  live: true,
  // Automatic exponential backoff on reconnection failures
  backoffOptions: {
    initialDelayMs: 100,
    maxDelayMs: 30000,
    maxAttempts: 10
  }
})
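
To see how these three options interact, here is a sketch of one common exponential backoff schedule. The doubling factor and the absence of jitter are assumptions made for illustration; the client library's actual schedule may differ, and backoffDelay is a hypothetical helper.

```typescript
interface BackoffOptions {
  initialDelayMs: number
  maxDelayMs: number
  maxAttempts: number
}

// Delay before retry number `attempt` (0-based), or null once the
// attempts are exhausted. Assumes the delay doubles on every attempt.
function backoffDelay(attempt: number, opts: BackoffOptions): number | null {
  if (attempt >= opts.maxAttempts) {
    return null
  }
  const uncapped = opts.initialDelayMs * Math.pow(2, attempt)
  return Math.min(uncapped, opts.maxDelayMs)
}
```

With the options shown above, this schedule waits 100 ms, 200 ms, 400 ms, and so on, reaches the 30000 ms cap at the tenth retry (attempt 9), and gives up after that.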

SSE fallback to long-polling

If SSE connections fail repeatedly (due to proxy buffering or infrastructure issues), clients can automatically fall back to long-polling:
const response = await stream({
  url: 'https://streams.example.com/events',
  live: 'sse',
  sseResilience: {
    minConnectionDuration: 1000,     // Connections < 1s are "short"
    maxShortConnections: 3,          // After 3 short connections, fall back
    backoffBaseDelay: 100,           // Backoff between retries
    backoffMaxDelay: 5000,           // Max backoff delay
    logWarnings: true                // Log fallback warnings
  }
})
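
The fallback decision can be read as a counter over connection durations. The sketch below is one plausible interpretation of minConnectionDuration and maxShortConnections, not the library's implementation; createFallbackTracker is a hypothetical name, and resetting the counter after a healthy connection is an assumption.

```typescript
interface SseResilienceOptions {
  minConnectionDuration: number // ms; shorter connections count as "short"
  maxShortConnections: number   // short connections tolerated before fallback
}

// Hypothetical tracker for deciding when to switch to long-polling.
function createFallbackTracker(opts: SseResilienceOptions) {
  let shortConnections = 0
  return {
    // Record a finished SSE connection. Returns true once the client
    // should fall back to long-polling.
    record(durationMs: number): boolean {
      if (durationMs < opts.minConnectionDuration) {
        shortConnections += 1
      } else {
        shortConnections = 0 // assumption: a healthy connection resets the count
      }
      return shortConnections >= opts.maxShortConnections
    },
  }
}
```

With the settings shown above, three consecutive connections that each end in under a second would trigger the fallback, while a single long-lived connection in between would start the count over.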

Custom error handling

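// Assumption: FetchError is exported by the client package alongside stream
import { stream, FetchError } from '@durable-streams/client'

const response = await stream({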
  url: 'https://streams.example.com/events',
  live: true,
  onError: async (error) => {
    if (error instanceof FetchError && error.status === 401) {
      // Refresh auth token
      const newToken = await refreshAuthToken()
      return { headers: { Authorization: `Bearer ${newToken}` } }
    }
    // Let other errors propagate
  }
})

Best practices

1. Let the client choose the mode

Use live: true for automatic mode selection:

// ✅ Good: Auto-select
const response = await stream({ url, live: true })

// ❌ Less ideal: Manual selection without reason
const forcedSse = await stream({ url, live: 'sse' })

2. Save offsets regularly

Checkpoint your position for resumability:

response.subscribeJson(async (batch) => {
  await processBatch(batch.items)
  await db.saveCheckpoint(batch.offset)  // Save after processing
})

3. Handle closure gracefully

Detect when streams end:

response.subscribeJson(async (batch) => {
  processBatch(batch.items)
  
  if (batch.streamClosed) {
    console.log('Stream completed')
    cleanupResources()
  }
})

4. Use appropriate timeouts

For mobile apps or unreliable networks, configure aggressive timeouts:

const response = await stream({
  url,
  live: 'long-poll',  // Better for mobile
  signal: AbortSignal.timeout(30000)  // 30s timeout
})

Next steps

Idempotent Producers

Implement exactly-once write semantics

Caching and Fanout

Scale to millions of viewers with CDN caching