Fetch the complete documentation index at https://mintlify.com/durable-streams/durable-streams/llms.txt to discover all available pages before exploring further.
Durable Streams provides a simple HTTP-based API for working with persistent, resumable data streams. This guide covers the essential operations you’ll use in most applications.
## Reading from a Stream

### Fetch-like API

The `stream()` function provides a fetch-like interface for reading streams:

```typescript
import { stream } from "@durable-streams/client"

// Read from the beginning (catch-up mode)
const res = await stream<{ message: string }>({
  url: "https://streams.example.com/v1/stream/my-stream",
  offset: "-1", // Special value: start of stream
  live: false, // Catch-up only, don't wait for new data
})

const items = await res.json()
console.log(items) // Array of all messages
```
The offset `-1` is a special value meaning “start of stream”. To resume from where you left off, always pass the offset value returned by a previous read.
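That resume rule can be sketched without the client at all. In the hypothetical sketch below, `readFrom` stands in for a `stream()` call and `load`/`save` for wherever you persist offsets; only the `"-1"` convention and “persist the returned offset” come from the API above.

```typescript
// Hypothetical resume loop: read from the last saved offset, then persist
// the offset the read returns so the next run continues from there.
type ReadResult = { items: string[]; offset: string }

function resumeRead(
  readFrom: (offset: string) => ReadResult, // stand-in for stream()
  load: () => string | null,                // fetch the saved offset, if any
  save: (offset: string) => void            // persist the new offset
): string[] {
  const start = load() ?? "-1" // no saved position: start of stream
  const res = readFrom(start)
  save(res.offset)
  return res.items
}
```

Running this twice against the same source returns all items the first time and nothing the second, because the second run starts at the saved offset.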
### Using Handles

For applications that both read and write, use `DurableStream` to create a reusable handle:

```typescript
import { DurableStream } from "@durable-streams/client"

// Create a handle (no network request yet)
const handle = new DurableStream({
  url: "https://streams.example.com/v1/stream/my-stream",
  headers: { Authorization: `Bearer ${token}` },
})

// Validate that the stream exists (HEAD request)
await handle.head()

// Read data
const res = await handle.stream<{ event: string }>({
  offset: "-1",
  live: false,
})
```
The same read in TypeScript, Python, and Go:

```typescript
import { DurableStream } from "@durable-streams/client"

const handle = new DurableStream({
  url: "https://streams.example.com/v1/stream/chat-room-1",
})

const res = await handle.stream({ live: false })
const text = await res.text()
```

```python
from durable_streams import DurableStream

stream = DurableStream(
    url="https://streams.example.com/v1/stream/chat-room-1"
)

text = stream.read(live=False)
```

```go
import "github.com/durable-streams/client-go"

stream := client.New("https://streams.example.com/v1/stream/chat-room-1")

data, err := stream.Read(client.ReadOptions{
    Live: false,
})
```
## Writing to a Stream

### Creating a Stream

Before writing, create the stream with a content type:

```typescript
const stream = await DurableStream.create({
  url: "https://streams.example.com/v1/stream/my-stream",
  contentType: "application/json",
})
```
The `contentType` is set once at creation and preserved for all reads. For JSON streams, the server automatically handles message boundaries.
### Appending Data

**Single message.** Use `append()` for individual writes:

```typescript
await stream.append(JSON.stringify({ message: "hello" }))
```
**Multiple messages.** Call `append()` multiple times; by default, messages are batched automatically:

```typescript
await stream.append(JSON.stringify({ event: "user.created", userId: "123" }))
await stream.append(JSON.stringify({ event: "user.updated", userId: "123" }))
```
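The batching strategy itself isn't specified here. Purely as an illustrative sketch (not the client's actual implementation), coalescing could look like this: appends made in the same tick are flushed together as one batch.

```typescript
// Illustrative append coalescer: collect messages synchronously and flush
// them as one batch on the next microtask. A real client would also cap
// batch size and deal with in-flight requests.
class AppendBatcher {
  private pending: string[] = []
  private scheduled = false
  readonly batches: string[][] = [] // stand-in for network writes

  append(message: string): void {
    this.pending.push(message)
    if (!this.scheduled) {
      this.scheduled = true
      queueMicrotask(() => this.flush())
    }
  }

  private flush(): void {
    this.batches.push(this.pending)
    this.pending = []
    this.scheduled = false
  }
}
```

With this shape, the two `append()` calls above would leave the event loop as a single write rather than two.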
**Streaming upload.** For large data, stream directly from a source:

```typescript
const response = await fetch("https://example.com/data")
await stream.appendStream(response.body!)
```
## Content Types

Durable Streams supports two modes for handling data.
### JSON Mode

For structured data, use `application/json`:

```typescript
const stream = await DurableStream.create({
  url: "https://streams.example.com/v1/stream/events",
  contentType: "application/json",
})

// Each append stores one message
await stream.append(JSON.stringify({ type: "order.created", id: 123 }))
await stream.append(JSON.stringify({ type: "order.shipped", id: 123 }))

// Read returns individual messages
const res = await stream.stream({ live: false })
for await (const message of res.jsonStream()) {
  console.log(message)
  // { type: "order.created", id: 123 }
  // { type: "order.shipped", id: 123 }
}
```
In JSON mode, the server automatically preserves message boundaries. Each append() call stores exactly one message, which you receive as a separate item when reading.
### Byte Stream Mode

For raw data, use any other content type:

```typescript
const stream = await DurableStream.create({
  url: "https://streams.example.com/v1/stream/logs",
  contentType: "text/plain",
})

// Data is concatenated
await stream.append("Line 1\n")
await stream.append("Line 2\n")

// Read returns the concatenated data
const res = await stream.stream({ live: false })
const text = await res.text()
console.log(text) // "Line 1\nLine 2\n"
```
In byte stream mode, you must implement your own message framing (e.g., newline-delimited JSON). The server concatenates all appends.
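With newline-delimited JSON, for example, the writer appends one JSON object plus `"\n"` per message, and the reader splits the concatenated text on newlines. A minimal framing helper (generic code, not part of the Durable Streams client):

```typescript
// Split concatenated newline-delimited JSON back into individual messages.
// Blank lines (e.g. a trailing newline) are skipped.
function parseNdjson(text: string): unknown[] {
  return text
    .split("\n")
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line))
}
```

Note that this simple reader assumes it sees whole lines; if you consume partial chunks, keep the trailing incomplete line in a buffer until the next chunk arrives.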
## Consumption Patterns

The `StreamResponse` interface provides several ways to consume data: accumulate all, async iteration, a subscriber pattern, and a raw `ReadableStream`. The simplest is to accumulate everything into memory:

```typescript
// Load all data into memory
const res = await stream.stream({ live: false })
const items = await res.json()
```
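The `ReadableStream` option exposes the standard web-streams reader loop. The helper below is generic web-streams code rather than anything specific to `StreamResponse`; it drains any `ReadableStream`:

```typescript
// Drain a ReadableStream with the standard getReader()/read() loop.
async function drain(stream: ReadableStream<string>): Promise<string[]> {
  const reader = stream.getReader()
  const chunks: string[] = []
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    chunks.push(value)
  }
  return chunks
}
```

Accumulating suits small, finite streams; the reader loop (or async iteration) keeps memory flat on large or long-lived ones.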
## Closing Streams

Close a stream to mark it as complete and prevent further writes:

```typescript
// Close without a final message
await stream.close()

// Close with a final message
await stream.close({
  body: JSON.stringify({ event: "stream.completed" }),
})
```
Once closed, the stream remains readable but rejects further appends with a 409 Conflict error.
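At the protocol level that contract is easy to handle generically. The sketch below is hypothetical (it injects a `fetchLike` function instead of using the real client, and ignores the wire format) and only demonstrates the 409-means-closed branch:

```typescript
// Protocol-level sketch: append via POST and treat 409 as "stream closed".
type AppendOutcome = "appended" | "closed"

async function tryAppend(
  fetchLike: (
    url: string,
    init: { method: string; body: string }
  ) => Promise<{ status: number }>,
  url: string,
  body: string
): Promise<AppendOutcome> {
  const res = await fetchLike(url, { method: "POST", body })
  if (res.status === 409) return "closed" // closed streams reject appends
  if (res.status >= 400) throw new Error(`append failed: ${res.status}`)
  return "appended"
}
```

A writer can use the `"closed"` outcome to stop cleanly instead of retrying, since a closed stream never reopens for appends.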
## Error Handling

Durable Streams provides structured error types:

```typescript
import {
  DurableStreamError,
  StreamClosedError,
  FetchError,
} from "@durable-streams/client"

try {
  await stream.append(data)
} catch (err) {
  if (err instanceof StreamClosedError) {
    console.log("Stream is closed at offset:", err.finalOffset)
  } else if (err instanceof DurableStreamError) {
    console.log("Protocol error:", err.code, err.status)
  } else if (err instanceof FetchError) {
    console.log("Network error:", err.status)
  }
}
```
Common error codes:

- `NOT_FOUND` (404): Stream doesn’t exist
- `CONFLICT_EXISTS` (409): Stream already exists (on create)
- `STREAM_CLOSED` (409): Cannot append to a closed stream
- `UNAUTHORIZED` (401): Authentication failed
- `FORBIDDEN` (403): Authorization failed
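One practical use of these codes is deciding whether a retry can ever succeed. The classification below is an assumption about typical usage, not part of the library: all five codes above are deterministic, so only unlisted transport-level failures are worth retrying.

```typescript
// Rough retry classification for the error codes listed above. Retrying a
// deterministic protocol error won't change the outcome; unknown or
// transport-level failures may be transient.
function isRetryable(code: string): boolean {
  switch (code) {
    case "NOT_FOUND":
    case "CONFLICT_EXISTS":
    case "STREAM_CLOSED":
    case "UNAUTHORIZED":
    case "FORBIDDEN":
      return false // deterministic: fix the cause instead of retrying
    default:
      return true // possibly transient: retry with backoff
  }
}
```

The Error Handling guide linked below covers full retry logic; this only shows the branch point.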
## Next Steps

- **Resumable Streaming**: Learn how to resume streams from saved offsets
- **Real-time Subscriptions**: Stream live updates with long-poll and SSE
- **Error Handling**: Handle errors and implement retry logic
- **Production Deployment**: Deploy Durable Streams to production