TypeScript Client

The official TypeScript/JavaScript client for the Durable Streams protocol.

Installation

npm install @durable-streams/client
# or
pnpm add @durable-streams/client
# or
yarn add @durable-streams/client

Quick Start

import { stream, DurableStream } from "@durable-streams/client";

// Read-only API (fetch-like)
const res = await stream({ url: "https://streams.example.com/my-stream" });
const items = await res.json();

// Read/Write handle API
const handle = await DurableStream.create({
  url: "https://streams.example.com/my-stream",
  contentType: "application/json",
});

await handle.append(JSON.stringify({ message: "hello" }));

Core APIs

stream()

Fetch-like read API for consuming streams.

Parameters:
  • options (StreamOptions, required) - Configuration for the stream session
      • url (string | URL, required) - The full URL to the durable stream
      • headers (HeadersRecord) - HTTP headers (values can be static strings or async functions)
      • params (ParamsRecord) - Query parameters (values can be static strings or async functions)
      • offset (Offset, default: "-1") - Starting offset ("-1" for start of stream)
      • live (LiveMode, default: true) - Live mode: false, true, "long-poll", or "sse"
      • json (boolean) - Hint: treat content as JSON even if Content-Type doesn’t indicate it
      • signal (AbortSignal) - AbortSignal for cancellation
      • onError (StreamErrorHandler) - Error handler for recoverable errors

StreamResponse<TJson> (object) - Stream session with multiple consumption APIs
  • url (string) - The stream URL
  • offset (Offset) - Current offset (advances as data is consumed)
  • upToDate (boolean) - Whether we’ve reached the current end of stream
  • streamClosed (boolean) - Whether the stream is permanently closed (no more data will ever be appended)
  • body(): () => Promise<Uint8Array> - Accumulate raw bytes until upToDate, then resolve
  • json<T>(): () => Promise<Array<T>> - Accumulate JSON items into array, resolve at upToDate
  • text(): () => Promise<string> - Accumulate text chunks into string, resolve at upToDate
  • bodyStream(): () => ReadableStream<Uint8Array> - Raw bytes as ReadableStream (async iterable)
  • jsonStream(): () => ReadableStream<TJson> - Individual JSON items as ReadableStream (async iterable)
  • subscribeJson(): (subscriber: (batch: JsonBatch<T>) => void | Promise<void>) => () => void - Subscribe to JSON batches with backpressure control

Example: Streaming JSON
const res = await stream<{ message: string }>({
  url: "https://streams.example.com/chat",
  live: true,
});

// Subscribe to batches
res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    console.log(item.message);
  }
  // Save checkpoint
  await db.saveOffset(batch.offset);
});
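
The checkpoint pattern above pairs with the offset option: a saved offset can be passed back to stream() after a restart. The following is a minimal sketch of that idea, not part of the client. MemoryCheckpointStore and checkpointingSubscriber are hypothetical names standing in for the `db` object, and the JsonBatch shape is copied from the Types section. It assumes batch.offset is the position to resume from.

```typescript
// Local copy of the JsonBatch shape documented under Types.
type Offset = string;
interface JsonBatch<T = unknown> {
  items: ReadonlyArray<T>;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}

// Hypothetical checkpoint store; swap in a database in real code.
class MemoryCheckpointStore {
  private saved: Offset | undefined;

  async saveOffset(offset: Offset): Promise<void> {
    this.saved = offset;
  }

  // Falls back to "-1" (start of stream) when nothing has been saved yet.
  async loadOffset(): Promise<Offset> {
    return this.saved ?? "-1";
  }
}

// Builds a subscriber that handles every item, then records the batch offset.
function checkpointingSubscriber<T>(
  store: MemoryCheckpointStore,
  handle: (item: T) => void,
): (batch: JsonBatch<T>) => Promise<void> {
  return async (batch) => {
    for (const item of batch.items) handle(item);
    // Save only after the whole batch was handled, so a crash replays it.
    await store.saveOffset(batch.offset);
  };
}
```

On restart, something like `stream({ url, offset: await store.loadOffset(), live: true })` would resume from the last checkpoint. Saving after handling gives at-least-once processing, so items from the last batch may be seen again after a crash.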

DurableStream

Handle class for read/write operations.

Static Methods

DurableStream.create() (static)

Create a new stream and return a handle.
  • options (CreateOptions, required)
      • url (string | URL, required) - Stream URL
      • contentType (string) - Content type (e.g., "application/json")
      • ttlSeconds (number) - Time-to-live in seconds
      • expiresAt (string) - Absolute expiry time (RFC 3339)
      • closed (boolean, default: false) - Create stream in closed state

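The two expiry options take different inputs: ttlSeconds is a relative lifetime, while expiresAt is an absolute RFC 3339 timestamp. The sketch below shows one way to build either form. CreateOptionsSketch, withTtl, and withDeadline are hypothetical helpers, and the positive-integer check on ttlSeconds is an assumption rather than a documented constraint.

```typescript
// Local sketch of the documented CreateOptions fields used here.
interface CreateOptionsSketch {
  url: string;
  contentType?: string;
  ttlSeconds?: number;
  expiresAt?: string;
}

// Relative lifetime, in seconds.
function withTtl(url: string, seconds: number): CreateOptionsSketch {
  // Assumption: a TTL should be a positive whole number of seconds.
  if (!Number.isInteger(seconds) || seconds <= 0) {
    throw new RangeError("ttlSeconds must be a positive integer");
  }
  return { url, contentType: "application/json", ttlSeconds: seconds };
}

// Absolute deadline. Date.prototype.toISOString() produces a UTC timestamp
// in a format that is valid RFC 3339.
function withDeadline(url: string, deadline: Date): CreateOptionsSketch {
  return { url, contentType: "application/json", expiresAt: deadline.toISOString() };
}
```

Either result could then be passed to DurableStream.create().
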
DurableStream.connect() (static)

Validate that a stream exists via HEAD and return a handle.
  • options (DurableStreamOptions, required)
      • url (string | URL, required) - Stream URL
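
A common need is "create the stream if it is missing, otherwise attach to it". A possible sketch follows, assuming create() fails with the CONFLICT_EXISTS code listed under Error Codes when the stream already exists. createOrConnect is a hypothetical helper, and both operations are injected as functions so the sketch does not depend on a running server.

```typescript
// Minimal shape of the error this sketch inspects. The Error Handling
// section shows that DurableStreamError carries a `code`.
interface CodedError {
  code?: string;
}

// Tries `create` first and falls back to `connect` when the stream exists.
async function createOrConnect<H>(
  create: () => Promise<H>,
  connect: () => Promise<H>,
): Promise<H> {
  try {
    return await create();
  } catch (error) {
    if ((error as CodedError | null)?.code === "CONFLICT_EXISTS") {
      return connect();
    }
    throw error; // any other failure is propagated unchanged
  }
}
```

With the real client this might be called as `createOrConnect(() => DurableStream.create({ url, contentType }), () => DurableStream.connect({ url }))`.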

Instance Methods

append()

Append data to the stream (with automatic batching).
  • body (Uint8Array | string | Promise<Uint8Array | string>, required) - Data to append (for JSON streams, pass pre-serialized JSON)
  • options (AppendOptions)
      • seq (string) - Writer coordination sequence (monotonic, lexicographic)
      • contentType (string) - Content type override
Example:
const stream = new DurableStream({ url: "https://..." });

// JSON stream
await stream.append(JSON.stringify({ message: "hello" }));

// Byte stream
await stream.append(new Uint8Array([1, 2, 3]));
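
Because seq is compared lexicographically, a plain counter rendered as a string does not stay monotonic ("10" sorts before "9"). One way around this is fixed-width zero padding, sketched below. makeSeqGenerator is a hypothetical helper, and the exact seq format the server accepts is not specified on this page, so treat the width as an assumption.

```typescript
// Returns a generator of fixed-width, zero-padded sequence strings.
// Fixed width matters: as plain strings "10" < "9", but "0010" > "0009".
function makeSeqGenerator(width = 16, start = 0): () => string {
  let next = start;
  return () => {
    const seq = String(next).padStart(width, "0");
    if (seq.length > width) {
      throw new RangeError("sequence counter overflowed the fixed width");
    }
    next += 1;
    return seq;
  };
}
```

Each append could then pass `{ seq: nextSeq() }` as its options, where nextSeq is the function returned by makeSeqGenerator().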

stream()

Start a read session using the handle’s URL/headers.
  • options (Omit<StreamOptions, 'url'>) - Stream options (url inherited from handle)
Returns: Promise<StreamResponse<TJson>>

close()

Close the stream permanently (no more appends allowed).
  • options (CloseOptions)
      • body (Uint8Array | string) - Optional final message to append atomically
Returns: Promise<CloseResult> with finalOffset

head()

Get stream metadata via HEAD request.
Returns: Promise<HeadResult> with contentType, offset, streamClosed

delete()

Delete the stream.

IdempotentProducer

Exactly-once writes with Kafka-style producer semantics.
import { IdempotentProducer, DurableStream } from "@durable-streams/client";

const stream = new DurableStream({ url: "https://..." });
const producer = new IdempotentProducer(stream, "order-service-1", {
  epoch: 0,
  autoClaim: true,
  maxBatchBytes: 1024 * 1024, // 1MB
  lingerMs: 5,
  maxInFlight: 5,
});

// Fire-and-forget writes (returns immediately)
producer.append(JSON.stringify({ orderId: "123" }));
producer.append(JSON.stringify({ orderId: "456" }));

// Ensure all messages are delivered
await producer.flush();
await producer.close();

constructor

Create an idempotent producer.
  • stream (DurableStream, required) - Target stream
  • producerId (string, required) - Stable identifier (e.g., "order-service-1")
  • options (IdempotentProducerOptions)
      • epoch (number, default: 0) - Starting epoch (increment on restart)
      • autoClaim (boolean, default: false) - Auto-retry with epoch+1 on 403 (for serverless)
      • maxBatchBytes (number, default: 1048576) - Max batch size (1MB default)
      • lingerMs (number, default: 5) - Max wait time before sending batch
      • maxInFlight (number, default: 5) - Max concurrent batches
      • onError ((error: Error) => void) - Error callback for async failures

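The epoch option is described as something to increment on restart, which implies persisting it between process runs. A minimal sketch of that bookkeeping follows. EpochStore, MemoryEpochStore, and nextEpoch are hypothetical names, and the in-memory map stands in for durable storage such as a file or database row.

```typescript
// Hypothetical persistence interface for producer epochs.
interface EpochStore {
  load(producerId: string): Promise<number | undefined>;
  save(producerId: string, epoch: number): Promise<void>;
}

// In-memory stand-in; a real deployment needs storage that survives restarts.
class MemoryEpochStore implements EpochStore {
  private epochs = new Map<string, number>();

  async load(producerId: string): Promise<number | undefined> {
    return this.epochs.get(producerId);
  }

  async save(producerId: string, epoch: number): Promise<void> {
    this.epochs.set(producerId, epoch);
  }
}

// Returns the epoch to use for this process start: 0 on the first run,
// previous + 1 afterwards. The new value is persisted before it is returned.
async function nextEpoch(store: EpochStore, producerId: string): Promise<number> {
  const previous = await store.load(producerId);
  const epoch = previous === undefined ? 0 : previous + 1;
  await store.save(producerId, epoch);
  return epoch;
}
```

The result could be passed as `{ epoch: await nextEpoch(store, producerId) }` when constructing the producer. Where durable storage is unavailable, the documented autoClaim option is the alternative.
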
append()

Fire-and-forget append (returns immediately).
  • body (Uint8Array | string, required) - Data to append (for JSON, pass pre-serialized JSON)

flush()

Send pending batch and wait for all in-flight batches.
Returns: Promise<void>

close()

Flush and close the stream (EOF).
  • finalMessage (Uint8Array | string) - Optional final message (idempotent with producer headers)
Returns: Promise<CloseResult>

Types

Offset

Opaque string representing a stream position. Use "-1" for the start of the stream.
type Offset = string;

LiveMode

type LiveMode = boolean | "long-poll" | "sse";
// false: catch-up only
// true: auto-select (SSE for JSON, long-poll for binary)
// "long-poll": explicit long-polling
// "sse": explicit Server-Sent Events
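
The comments above can be read as a small decision table. The sketch below restates it as a function for illustration only. resolveTransport and the Transport type are hypothetical and do not reflect the client's internal code.

```typescript
type LiveMode = boolean | "long-poll" | "sse";
// Hypothetical label for the behavior each mode results in.
type Transport = "catch-up" | "long-poll" | "sse";

// Maps a LiveMode to the transport described in the comments above.
function resolveTransport(live: LiveMode, isJson: boolean): Transport {
  if (live === false) return "catch-up"; // read existing data, then stop
  if (live === true) return isJson ? "sse" : "long-poll"; // auto-select
  return live; // explicit "long-poll" or "sse"
}
```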

JsonBatch<T>

interface JsonBatch<T = unknown> {
  items: ReadonlyArray<T>;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}

ByteChunk / TextChunk

interface ByteChunk {
  data: Uint8Array;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}

interface TextChunk {
  text: string;
  offset: Offset;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}
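
To illustrate how the upToDate flag on a chunk can be used, the sketch below accumulates ByteChunk payloads until the first chunk marked upToDate, which is how body() is described earlier. It works over an in-memory list of chunks. collectUntilUpToDate is a hypothetical helper, and how the client delivers ByteChunk values is not shown on this page.

```typescript
// Local copy of the ByteChunk shape documented above.
interface ByteChunk {
  data: Uint8Array;
  offset: string;
  upToDate: boolean;
  cursor?: string;
  streamClosed: boolean;
}

// Concatenates payloads up to and including the first chunk flagged upToDate,
// and reports the offset of the last chunk consumed.
function collectUntilUpToDate(
  chunks: Iterable<ByteChunk>,
): { bytes: Uint8Array; offset: string } {
  const parts: Uint8Array[] = [];
  let total = 0;
  let offset = "-1";
  for (const chunk of chunks) {
    parts.push(chunk.data);
    total += chunk.data.byteLength;
    offset = chunk.offset;
    if (chunk.upToDate) break;
  }
  // Copy every part into one contiguous buffer.
  const bytes = new Uint8Array(total);
  let position = 0;
  for (const part of parts) {
    bytes.set(part, position);
    position += part.byteLength;
  }
  return { bytes, offset };
}
```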

Error Handling

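import { DurableStream, DurableStreamError, FetchError } from "@durable-streams/client";

const handle = new DurableStream({ url: "https://..." });

try {
  await handle.append(JSON.stringify({ message: "hello" }));
} catch (error) {
  if (error instanceof DurableStreamError) {
    console.log(error.code); // "NOT_FOUND" | "CONFLICT_SEQ" | ...
    console.log(error.status); // HTTP status code
  } else if (error instanceof FetchError) {
    console.log(error.status); // HTTP status code
  } else {
    throw error; // not a client error; let it propagate
  }
}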

Error Codes

  • NOT_FOUND - Stream doesn’t exist
  • CONFLICT_SEQ - Sequence conflict
  • CONFLICT_EXISTS - Stream already exists (create)
  • STREAM_CLOSED - Stream is permanently closed
  • RATE_LIMITED - Rate limit exceeded (429)
  • UNAUTHORIZED - Auth required (401)
  • FORBIDDEN - Access denied (403)
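
Callers usually need to decide what to do for each code. The sketch below shows one possible policy, offered as an assumption rather than the client's built-in behavior: back off on RATE_LIMITED, refresh credentials on UNAUTHORIZED, and fail on everything else. actionFor and backoffMs are hypothetical helpers.

```typescript
type Action = "retry-with-backoff" | "refresh-credentials" | "fail";

// One possible policy. Accepts any string because the code list may grow.
function actionFor(code: string): Action {
  switch (code) {
    case "RATE_LIMITED":
      return "retry-with-backoff";
    case "UNAUTHORIZED":
      return "refresh-credentials";
    default:
      // NOT_FOUND, CONFLICT_SEQ, CONFLICT_EXISTS, STREAM_CLOSED, FORBIDDEN, ...
      return "fail";
  }
}

// Capped exponential backoff: base, 2x base, 4x base, ... up to capMs.
// Jitter is left out to keep the sketch deterministic.
function backoffMs(attempt: number, baseMs = 100, capMs = 10000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}
```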

Advanced Features

Dynamic Headers

const res = await stream({
  url: "https://...",
  headers: {
    Authorization: () => getToken(), // Called per-request
    "X-Request-ID": () => crypto.randomUUID(),
  },
});
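
To make the "called per-request" behavior concrete, the sketch below shows how a record mixing static values and functions might be resolved into plain strings. The HeaderValue type is an assumption based on the description "values can be static strings or async functions". HeadersRecord's actual definition is not shown on this page, and resolveHeaders is a hypothetical helper rather than the client's code.

```typescript
// Assumed shape: a string, or a (possibly async) function returning one.
type HeaderValue = string | (() => string | Promise<string>);
type HeadersRecordSketch = Record<string, HeaderValue>;

// Resolves every entry to a plain string. Functions are invoked on each call,
// which is what allows fresh tokens or request IDs per request.
async function resolveHeaders(
  record: HeadersRecordSketch,
): Promise<Record<string, string>> {
  const resolved: Record<string, string> = {};
  for (const [name, value] of Object.entries(record)) {
    resolved[name] = typeof value === "function" ? await value() : value;
  }
  return resolved;
}
```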

WritableStream Integration

const stream = new DurableStream({ url: "https://..." });
const writable = stream.writable({
  producerId: "my-producer",
  lingerMs: 10,
});

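// Pipe from fetch (response.body is null when the response has no body)
const response = await fetch("https://source.com/data");
if (!response.body) throw new Error("Response has no body");
await response.body.pipeTo(writable);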

Error Recovery

const res = await stream({
  url: "https://...",
  onError: async (error) => {
    if (error instanceof FetchError && error.status === 401) {
      const newToken = await refreshToken();
      return { headers: { Authorization: `Bearer ${newToken}` } };
    }
    // Return undefined to propagate error
  },
});
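
A handler like the one above would refresh indefinitely if the new token were also rejected. The sketch below bounds the number of refresh attempts. FetchErrorSketch is a local stand-in for the client's FetchError (only `status` is used), makeAuthRecovery is a hypothetical helper, and the return shape follows the example above.

```typescript
// Minimal stand-in for the client's FetchError.
class FetchErrorSketch extends Error {
  readonly status: number;

  constructor(status: number) {
    super(`HTTP ${status}`);
    this.status = status;
  }
}

type RetryOverrides = { headers: Record<string, string> } | undefined;

// Builds an onError-style handler that refreshes the token at most
// `maxRefreshes` times, then returns undefined so the error propagates.
function makeAuthRecovery(
  refreshToken: () => Promise<string>,
  maxRefreshes = 1,
): (error: unknown) => Promise<RetryOverrides> {
  let refreshes = 0;
  return async (error) => {
    const isUnauthorized = error instanceof FetchErrorSketch && error.status === 401;
    if (isUnauthorized && refreshes < maxRefreshes) {
      refreshes += 1;
      const token = await refreshToken();
      return { headers: { Authorization: `Bearer ${token}` } };
    }
    return undefined;
  };
}
```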

Browser Support

Requires modern browsers with:
  • Fetch API
  • ReadableStream
  • AbortController
For older browsers, use polyfills:
npm install whatwg-fetch web-streams-polyfill
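
Before loading polyfills, an application can check which of the required globals are missing. A small feature-detection sketch follows. missingFeatures is a hypothetical helper, and the `scope` parameter exists only so the check can be exercised outside a browser.

```typescript
// The three globals listed above.
const REQUIRED_GLOBALS = ["fetch", "ReadableStream", "AbortController"] as const;

// Returns the names of required globals that are not defined on `scope`.
function missingFeatures(
  scope: Record<string, unknown> = globalThis as unknown as Record<string, unknown>,
): string[] {
  return REQUIRED_GLOBALS.filter((name) => typeof scope[name] === "undefined");
}
```

An empty result means no polyfill is needed. Otherwise the returned names indicate which polyfills to load.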

Source Code

Source: packages/client/src/