TypeScript Client
The official TypeScript/JavaScript client for the Durable Streams protocol.
Installation
npm install @durable-streams/client
# or
pnpm add @durable-streams/client
# or
yarn add @durable-streams/client
Quick Start
import { stream, DurableStream } from "@durable-streams/client";
// Read-only API (fetch-like)
const res = await stream({ url: "https://streams.example.com/my-stream" });
const items = await res.json();
// Read/Write handle API
const handle = await DurableStream.create({
  url: "https://streams.example.com/my-stream",
  contentType: "application/json",
});
await handle.append(JSON.stringify({ message: "hello" }));
Core APIs
stream()
Fetch-like read API for consuming streams.
Configuration for the stream session
The full URL to the durable stream
HTTP headers (values can be static strings or async functions)
Query parameters (values can be static strings or async functions)
Starting offset (-1 for start of stream)
Live mode: false, true, "long-poll", or "sse"
Hint: treat content as JSON even if Content-Type doesn’t indicate it
AbortSignal for cancellation
Error handler for recoverable errors
Stream session with multiple consumption APIs
Current offset (advances as data is consumed)
Whether we’ve reached the current end of stream
Whether the stream is permanently closed (no more data will ever be appended)
body()
() => Promise<Uint8Array>
Accumulate raw bytes until upToDate, then resolve
Accumulate JSON items into array, resolve at upToDate
Accumulate text chunks into string, resolve at upToDate
bodyStream()
() => ReadableStream<Uint8Array>
Raw bytes as ReadableStream (async iterable)
jsonStream()
() => ReadableStream<TJson>
Individual JSON items as ReadableStream (async iterable)
subscribeJson()
(subscriber: (batch: JsonBatch<T>) => void | Promise<void>) => () => void
Subscribe to JSON batches with backpressure control
Example: Streaming JSON
const res = await stream<{ message: string }>({
  url: "https://streams.example.com/chat",
  live: true,
});
// Subscribe to batches
res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    console.log(item.message);
  }
  // Save checkpoint
  await db.saveOffset(batch.offset);
});
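The checkpoint call above relies on a `db` object that the snippet leaves undefined. As a minimal sketch, an in-memory stand-in could look like the following. `InMemoryOffsetStore` and its method names are hypothetical and not part of the client, the store is keyed by stream URL (an assumption), and a real application would persist offsets durably.

```typescript
// Hypothetical checkpoint store; not part of @durable-streams/client.
type Offset = string;

class InMemoryOffsetStore {
  private readonly offsets = new Map<string, Offset>();

  // Record the offset of the last fully processed batch for a stream.
  saveOffset(streamUrl: string, offset: Offset): void {
    this.offsets.set(streamUrl, offset);
  }

  // Return the saved offset, or "-1" (start of stream) if none exists.
  loadOffset(streamUrl: string): Offset {
    return this.offsets.get(streamUrl) ?? "-1";
  }
}
```

On restart, the loaded value would be supplied as the starting offset when opening the stream, so consumption resumes after the last saved batch rather than from the beginning.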
DurableStream
Handle class for read/write operations.
Static Methods
Create a new stream and return a handle
Content type (e.g., "application/json")
Absolute expiry time (RFC3339)
Create stream in closed state
Validate that a stream exists via HEAD and return a handle
options
DurableStreamOptions
required
Instance Methods
Append data to the stream (with automatic batching)
body
Uint8Array | string | Promise<Uint8Array | string>
required
Data to append (for JSON streams, pass pre-serialized JSON)
Writer coordination sequence (monotonic, lexicographic)
Example:
const stream = new DurableStream({ url: "https://..." });
// JSON stream
await stream.append(JSON.stringify({ message: "hello" }));
// Byte stream
await stream.append(new Uint8Array([1, 2, 3]));
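The writer coordination sequence is described as monotonic and lexicographic, so plain decimal strings are not safe: compared as strings, "10" sorts before "9". One way to produce values whose string order matches their numeric order is fixed-width zero padding. This is only a sketch; `formatSeq` is a hypothetical helper, and the exact format a server expects is an assumption.

```typescript
// Hypothetical helper: encode a counter so that string order matches numeric order.
function formatSeq(counter: number, width = 16): string {
  if (!Number.isSafeInteger(counter) || counter < 0) {
    throw new RangeError("counter must be a non-negative safe integer");
  }
  const digits = counter.toString(10);
  if (digits.length > width) {
    throw new RangeError(`counter does not fit in ${width} digits`);
  }
  // Left-pad with zeros so every value has the same length.
  return digits.padStart(width, "0");
}

console.log(formatSeq(9, 4));  // "0009"
console.log(formatSeq(10, 4)); // "0010"
```

The default width of 16 is enough for any safe integer in JavaScript, since `Number.MAX_SAFE_INTEGER` has 16 decimal digits.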
Start a read session using the handle’s URL/headers
options
Omit<StreamOptions, 'url'>
Stream options (url inherited from handle)
Returns: Promise<StreamResponse<TJson>>
Close the stream permanently (no more appends allowed)
Optional final message to append atomically
Returns: Promise<CloseResult> with finalOffset
Get stream metadata via HEAD request
Returns: Promise<HeadResult> with contentType, offset, streamClosed
IdempotentProducer
Exactly-once writes with Kafka-style producer semantics.
import { IdempotentProducer, DurableStream } from "@durable-streams/client";
const stream = new DurableStream({ url: "https://..." });
const producer = new IdempotentProducer(stream, "order-service-1", {
  epoch: 0,
  autoClaim: true,
  maxBatchBytes: 1024 * 1024, // 1MB
  lingerMs: 5,
  maxInFlight: 5,
});
// Fire-and-forget writes (returns immediately)
producer.append(JSON.stringify({ orderId: "123" }));
producer.append(JSON.stringify({ orderId: "456" }));
// Ensure all messages are delivered
await producer.flush();
await producer.close();
Create an idempotent producer
Stable identifier (e.g., “order-service-1”)
options
IdempotentProducerOptions
Starting epoch (increment on restart)
Auto-retry with epoch+1 on 403 (for serverless)
Max batch size (1MB default)
Max wait time before sending batch
Error callback for async failures
Fire-and-forget append (returns immediately)
body
Uint8Array | string
required
Data to append (for JSON, pass pre-serialized JSON)
Send pending batch and wait for all in-flight batches
Returns: Promise<void>
Flush and close the stream (EOF)
Optional final message (idempotent with producer headers)
Returns: Promise<CloseResult>
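The `maxBatchBytes` and `lingerMs` options describe a common batching pattern: buffer appends, then send when the batch is large enough or has waited long enough. The following sketches that pattern in isolation. It is not the client's implementation; `Batcher` and its `send` callback are hypothetical names, and in-flight limits and retries are left out.

```typescript
// Illustrative size-and-time batcher; not the library's actual implementation.
class Batcher {
  private pending: Uint8Array[] = [];
  private pendingBytes = 0;
  private timer: ReturnType<typeof setTimeout> | undefined = undefined;
  private readonly send: (batch: Uint8Array[]) => void;
  private readonly maxBatchBytes: number;
  private readonly lingerMs: number;

  constructor(send: (batch: Uint8Array[]) => void, maxBatchBytes: number, lingerMs: number) {
    this.send = send;
    this.maxBatchBytes = maxBatchBytes;
    this.lingerMs = lingerMs;
  }

  append(chunk: Uint8Array): void {
    // Send what is buffered first if this chunk would overflow the batch.
    if (this.pending.length > 0 && this.pendingBytes + chunk.byteLength > this.maxBatchBytes) {
      this.flush();
    }
    this.pending.push(chunk);
    this.pendingBytes += chunk.byteLength;
    if (this.pendingBytes >= this.maxBatchBytes) {
      this.flush(); // batch is full, send immediately
    } else if (this.timer === undefined) {
      // Otherwise wait at most lingerMs for more data to arrive.
      this.timer = setTimeout(() => this.flush(), this.lingerMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.pendingBytes = 0;
    this.send(batch);
  }
}
```

A small `lingerMs` trades a few milliseconds of latency for fewer, larger requests; setting it to 0 would send each append almost immediately.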
Types
Offset
An opaque string representing a position in the stream. Use "-1" for the start of the stream.
LiveMode
type LiveMode = boolean | "long-poll" | "sse";
// false: catch-up only
// true: auto-select (SSE for JSON, long-poll for binary)
// "long-poll": explicit long-polling
// "sse": explicit Server-Sent Events
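The comments above imply a simple resolution rule for `live: true`. As a sketch of that rule, the helper below maps a `LiveMode` to a concrete transport. `resolveTransport` and the `Transport` type are hypothetical names for illustration, not client exports.

```typescript
type LiveMode = boolean | "long-poll" | "sse";
type Transport = "catch-up" | "long-poll" | "sse";

// Hypothetical helper mirroring the LiveMode comments; not a client export.
function resolveTransport(live: LiveMode, isJson: boolean): Transport {
  if (live === false) return "catch-up"; // read existing data, then stop
  if (live === true) return isJson ? "sse" : "long-poll"; // auto-select
  return live; // explicit "long-poll" or "sse"
}
```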
JsonBatch<T>
interface JsonBatch<T = unknown> {
  items: ReadonlyArray<T>;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}
ByteChunk / TextChunk
interface ByteChunk {
  data: Uint8Array;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}
interface TextChunk {
  text: string;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}
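The `upToDate` flag on each chunk is what the accumulating readers such as `body()` are described as waiting for. A rough sketch of that accumulation over an array of chunks might look like this; `accumulateBytes` is a hypothetical helper, and the client's own logic may differ.

```typescript
interface ByteChunk {
  data: Uint8Array;
  offset: string;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}

// Concatenate chunk payloads up to and including the first chunk marked upToDate.
function accumulateBytes(chunks: ReadonlyArray<ByteChunk>): Uint8Array {
  const parts: Uint8Array[] = [];
  let total = 0;
  for (const chunk of chunks) {
    parts.push(chunk.data);
    total += chunk.data.byteLength;
    if (chunk.upToDate) break; // reached the current end of the stream
  }
  // Copy every collected part into one contiguous buffer.
  const out = new Uint8Array(total);
  let pos = 0;
  for (const part of parts) {
    out.set(part, pos);
    pos += part.byteLength;
  }
  return out;
}
```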
Error Handling
import { DurableStreamError, FetchError } from "@durable-streams/client";
try {
  await stream.append(data);
} catch (error) {
  if (error instanceof DurableStreamError) {
    console.log(error.code); // "NOT_FOUND" | "CONFLICT_SEQ" | ...
    console.log(error.status); // HTTP status code
  }
}
Error Codes
NOT_FOUND - Stream doesn’t exist
CONFLICT_SEQ - Sequence conflict
CONFLICT_EXISTS - Stream already exists (create)
STREAM_CLOSED - Stream is permanently closed
RATE_LIMITED - Rate limit exceeded (429)
UNAUTHORIZED - Auth required (401)
FORBIDDEN - Access denied (403)
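Callers often want to turn an error code into a next step. The sketch below shows one possible policy. The mapping is an assumption made for illustration, not guidance from the library, and `suggestAction` is a hypothetical helper.

```typescript
type Action = "retry-later" | "refresh-credentials" | "give-up";

// Hypothetical policy: which codes are worth retrying is an assumption.
function suggestAction(code: string): Action {
  switch (code) {
    case "RATE_LIMITED":
      return "retry-later"; // 429: back off, then try again
    case "UNAUTHORIZED":
      return "refresh-credentials"; // 401: obtain a new token first
    default:
      // NOT_FOUND, CONFLICT_*, STREAM_CLOSED, FORBIDDEN, and unknown codes
      // are treated as non-retryable in this sketch.
      return "give-up";
  }
}
```

In a `catch` block like the one above, `suggestAction(error.code)` could then drive the retry decision.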
Advanced Features
Dynamic Headers
const res = await stream({
  url: "https://...",
  headers: {
    Authorization: () => getToken(), // Called per-request
    "X-Request-ID": () => crypto.randomUUID(),
  },
});
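To make "called per-request" concrete, here is a sketch of how a mix of static and function-valued headers could be resolved before each request. `resolveHeaders` and `HeaderValue` are hypothetical names, and this is not the client's internal code.

```typescript
type HeaderValue = string | (() => string | Promise<string>);

// Hypothetical sketch of per-request header resolution; not the client's internals.
async function resolveHeaders(
  headers: Record<string, HeaderValue>,
): Promise<Record<string, string>> {
  const resolved: Record<string, string> = {};
  for (const [name, value] of Object.entries(headers)) {
    // Function values are invoked on every call, so tokens and IDs stay fresh.
    resolved[name] = typeof value === "function" ? await value() : value;
  }
  return resolved;
}
```

Because the functions run on every resolution, a value such as a request ID differs between requests, while static strings are passed through unchanged.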
WritableStream Integration
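const stream = new DurableStream({ url: "https://..." });
const writable = stream.writable({
  producerId: "my-producer",
  lingerMs: 10,
});
// Pipe from fetch
const response = await fetch("https://source.com/data");
// response.body is null for bodyless responses, so check before piping
if (!response.body) {
  throw new Error("Response has no body to pipe");
}
await response.body.pipeTo(writable);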
Error Recovery
const res = await stream({
  url: "https://...",
  onError: async (error) => {
    if (error instanceof FetchError && error.status === 401) {
      const newToken = await refreshToken();
      return { headers: { Authorization: `Bearer ${newToken}` } };
    }
    // Return undefined to propagate error
  },
});
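The contract shown above (return new headers to retry, return undefined to propagate) can be pictured as a small retry loop. The following is a sketch under that reading only. `requestWithRecovery`, `HeaderMap`, and the `maxRetries` limit are hypothetical, and the client's real behavior may differ.

```typescript
type HeaderMap = Record<string, string>;
type Recovery = { headers?: HeaderMap } | undefined;

// Hypothetical retry loop showing one way an onError hook could be honored.
async function requestWithRecovery<T>(
  attempt: (headers: HeaderMap) => Promise<T>,
  onError: (error: unknown) => Promise<Recovery> | Recovery,
  initialHeaders: HeaderMap = {},
  maxRetries = 3,
): Promise<T> {
  let headers = initialHeaders;
  for (let tries = 0; ; tries++) {
    try {
      return await attempt(headers);
    } catch (error) {
      // Ask the handler for a recovery only while retries remain.
      const recovery = tries < maxRetries ? await onError(error) : undefined;
      if (recovery === undefined) throw error; // propagate to the caller
      // Merge any replacement headers and try again.
      headers = { ...headers, ...recovery.headers };
    }
  }
}
```

The retry cap is there so that a handler which always returns new headers cannot loop forever.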
Browser Support
Requires modern browsers with:
- Fetch API
- ReadableStream
- AbortController
For older browsers, use polyfills:
npm install whatwg-fetch web-streams-polyfill
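Before loading polyfills, it can help to check which of the required globals are actually missing. A minimal detection sketch follows. `missingFeatures` is a hypothetical helper, and the scope object is passed in explicitly so the check works against any global object.

```typescript
// Names of the globals listed in the requirements above.
const REQUIRED_GLOBALS = ["fetch", "ReadableStream", "AbortController"] as const;

// Return the required globals that are missing from the given scope.
function missingFeatures(scope: Record<string, unknown>): string[] {
  return REQUIRED_GLOBALS.filter((name) => typeof scope[name] === "undefined");
}
```

In a browser this could be called as `missingFeatures(globalThis as unknown as Record<string, unknown>)`, loading polyfills only when the returned list is non-empty.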
Source Code
Source: packages/client/src/