Overview

The Node.js server package (@durable-streams/server) provides a reference implementation of the Durable Streams protocol. It supports both in-memory and file-backed storage modes and is intended for development and testing.
For production deployments, consider using the Caddy-based server.

Installation

npm install @durable-streams/server

Quick Start

import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  host: "127.0.0.1",
})

await server.start()
console.log(`Server running at ${server.url}`)

DurableStreamTestServer

Constructor

const server = new DurableStreamTestServer(options?: TestServerOptions)
options (TestServerOptions): Configuration options for the server
  port (number, default: 4437): Port to listen on
  host (string, default: "127.0.0.1"): Host to bind to
  longPollTimeout (number, default: 30000): Default long-poll timeout in milliseconds
  dataDir (string): Data directory for file-backed storage. If provided, enables file-backed mode using LMDB and append-only logs. If omitted, uses in-memory storage.
  onStreamCreated (StreamLifecycleHook): Hook called when a stream is created
  onStreamDeleted (StreamLifecycleHook): Hook called when a stream is deleted
  compression (boolean, default: true): Enable gzip/deflate compression for responses
  cursorIntervalSeconds (number, default: 20): Interval in seconds for cursor calculation. Used for CDN cache collapsing to prevent infinite cache loops.
  cursorEpoch (Date, default: October 9, 2024 00:00:00 UTC): Epoch timestamp for cursor interval calculation

Methods

start()

Starts the server and returns the server URL.
server.start(): Promise<string>
Returns: Server URL (e.g., http://127.0.0.1:4437)

stop()

Stops the server and cleans up resources.
server.stop(): Promise<void>

clear()

Clears all streams from the store.
server.clear(): void

injectFault()

Injects a fault to be triggered on the next N requests to a path. Used for testing retry/resilience behavior.
server.injectFault(path: string, fault: InjectedFault): void
path (string, required): Stream path to inject fault for
fault (InjectedFault): Fault configuration
  status (number): HTTP status code to return (if set, returns error response)
  count (number, default: 1): Number of times to trigger this fault
  retryAfter (number): Retry-After header value in seconds
  delayMs (number): Delay in milliseconds before responding
  dropConnection (boolean): Drop the connection after sending headers (simulates network failure)
  truncateBodyBytes (number): Truncate response body to this many bytes
  probability (number, default: 1.0): Probability of triggering fault (0-1)
  method (string): Only match specific HTTP method (GET, POST, PUT, DELETE)
  corruptBody (boolean): Corrupt the response body by flipping random bits
  jitterMs (number): Add jitter to delay (random 0-jitterMs added to delayMs)
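
How the method, count, probability, and delay fields might combine can be pictured with a small standalone sketch. It does not import the package: `FaultSketch`, `shouldTrigger`, and `computeDelayMs` are hypothetical names, and the evaluation order (method filter, then remaining count, then probability) is an assumption rather than the server's documented behavior.

```typescript
// Hypothetical mirror of a few InjectedFault fields (not the package's type).
interface FaultSketch {
  probability?: number // documented default: 1.0
  method?: string
  delayMs?: number
  jitterMs?: number
}

// Decide whether a fault fires for one request.
// `remaining` is how many of the fault's `count` triggers are left.
// `rand` is injectable so the decision is deterministic in tests.
function shouldTrigger(
  fault: FaultSketch,
  requestMethod: string,
  remaining: number,
  rand: () => number = Math.random
): boolean {
  if (fault.method && fault.method.toUpperCase() !== requestMethod.toUpperCase()) {
    return false // method filter does not match
  }
  if (remaining <= 0) return false // count exhausted
  return rand() < (fault.probability ?? 1.0)
}

// Total delay: delayMs plus a random amount between 0 and jitterMs.
function computeDelayMs(fault: FaultSketch, rand: () => number = Math.random): number {
  const base = fault.delayMs ?? 0
  const jitter = (fault.jitterMs ?? 0) * rand()
  return base + jitter
}
```

Passing a fixed `rand` makes a probabilistic fault reproducible, which is convenient when asserting on retry behavior.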

Properties

url

The server URL.
server.url: string

store

The underlying storage instance.
server.store: StreamStore | FileBackedStreamStore

Storage Modes

In-Memory Mode

Fast, ephemeral storage for development and testing:
import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  // No dataDir = in-memory storage
})

await server.start()

File-Backed Mode

Persistent storage with streams stored as log files and LMDB for metadata:
import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  dataDir: "./data/streams",
})

await server.start()

StreamStore

In-memory stream storage implementation.

Methods

create()

Creates a new stream, or returns the existing stream if its configuration matches.
store.create(path: string, options: CreateOptions): Stream
path (string, required): Stream path (URL path)
options (CreateOptions)
  contentType (string): Content type of the stream
  ttlSeconds (number): TTL in seconds
  expiresAt (string): Absolute expiry time (ISO 8601)
  initialData (Uint8Array): Optional initial data to append on creation
  closed (boolean): Create the stream in closed state
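
The two expiry options can be reduced to a single absolute timestamp. The sketch below is standalone and illustrative only: `ExpiryFields`, `resolveExpiryMs`, and `isExpired` are hypothetical names, and it assumes the TTL is measured from the stream's `createdAt` time in epoch milliseconds and that `expiresAt` takes precedence when both are set.

```typescript
// Hypothetical subset of stream fields relevant to expiry.
interface ExpiryFields {
  createdAt: number // epoch milliseconds
  ttlSeconds?: number
  expiresAt?: string // ISO 8601
}

// Returns the absolute expiry in epoch ms, or undefined if no expiry is set.
function resolveExpiryMs(s: ExpiryFields): number | undefined {
  if (s.expiresAt !== undefined) {
    const parsed = Date.parse(s.expiresAt)
    if (Number.isNaN(parsed)) {
      throw new RangeError(`Invalid expiresAt: ${s.expiresAt}`)
    }
    return parsed
  }
  if (s.ttlSeconds !== undefined) {
    return s.createdAt + s.ttlSeconds * 1000
  }
  return undefined
}

// A stream counts as expired once the current time reaches the expiry.
function isExpired(s: ExpiryFields, nowMs: number): boolean {
  const expiry = resolveExpiryMs(s)
  return expiry !== undefined && nowMs >= expiry
}
```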

get()

Retrieves a stream by path.
store.get(path: string): Stream | undefined

has()

Checks if a stream exists.
store.has(path: string): boolean

delete()

Deletes a stream.
store.delete(path: string): boolean

append()

Appends data to a stream.
store.append(path: string, data: Uint8Array, options?: AppendOptions): StreamMessage
options (AppendOptions)
  seq (string): Sequence number for write coordination
  contentType (string): Content type (must match stream’s content type)
  producerId (string): Producer ID for idempotent writes
  producerEpoch (number): Producer epoch for idempotent writes
  producerSeq (number): Producer sequence for idempotent writes
  close (boolean): Close the stream after appending

read()

Reads messages from a stream starting from an offset.
store.read(path: string, offset?: string): ReadResult
Returns:
interface ReadResult {
  messages: StreamMessage[]
  upToDate: boolean
}
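
A caller typically wants the decoded payloads and a position to resume from. The following standalone sketch mirrors the `ReadResult` and `StreamMessage` shapes shown on this page; `summarizeRead` is a hypothetical helper, and it assumes that message bodies are UTF-8 text and that the last message's `offset` is a suitable value to pass to the next `read()` call.

```typescript
// Local copies of the documented shapes so the sketch runs on its own.
interface StreamMessage {
  data: Uint8Array
  offset: string
  timestamp: number
}

interface ReadResult {
  messages: StreamMessage[]
  upToDate: boolean
}

// Decode each message body as UTF-8 and report where to resume reading.
function summarizeRead(result: ReadResult, previousOffset?: string) {
  const decoder = new TextDecoder()
  const texts = result.messages.map((m) => decoder.decode(m.data))
  const last = result.messages[result.messages.length - 1]
  return {
    texts,
    // With no new messages, keep the offset the caller already had.
    nextOffset: last ? last.offset : previousOffset,
    upToDate: result.upToDate,
  }
}
```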

waitForMessages()

Waits for new messages on a stream (for long-polling).
store.waitForMessages(
  path: string,
  offset: string,
  timeout: number
): Promise<WaitResult>
Returns:
interface WaitResult {
  messages: StreamMessage[]
  timedOut: boolean
  streamClosed: boolean
}
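
Because `WaitResult` carries three independent fields, a long-poll handler usually has to decide which one wins. The sketch below shows one possible precedence (data first, then closure, then timeout); `WaitOutcome` and `classifyWait` are hypothetical names, and the ordering is an assumption, not something the package specifies here.

```typescript
// Local copies of the documented shapes so the sketch runs on its own.
interface StreamMessage {
  data: Uint8Array
  offset: string
  timestamp: number
}

interface WaitResult {
  messages: StreamMessage[]
  timedOut: boolean
  streamClosed: boolean
}

type WaitOutcome =
  | { kind: "data"; messages: StreamMessage[] }
  | { kind: "closed" }
  | { kind: "timeout" }
  | { kind: "empty" }

// Collapse a WaitResult into a single outcome a handler can switch on.
function classifyWait(result: WaitResult): WaitOutcome {
  if (result.messages.length > 0) {
    return { kind: "data", messages: result.messages }
  }
  if (result.streamClosed) return { kind: "closed" }
  if (result.timedOut) return { kind: "timeout" }
  return { kind: "empty" }
}
```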

closeStream()

Closes a stream to prevent further appends.
store.closeStream(path: string): { finalOffset: string } | null

formatResponse()

Formats messages for HTTP response (wraps JSON in array brackets).
store.formatResponse(path: string, messages: StreamMessage[]): Uint8Array
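
To illustrate what wrapping JSON in array brackets means, here is a standalone sketch. `wrapJsonMessages` is a hypothetical helper, not the package's implementation, and it assumes each message body is one complete UTF-8 encoded JSON value.

```typescript
// Join already-serialized JSON values into a single JSON array, as bytes.
function wrapJsonMessages(bodies: Uint8Array[]): Uint8Array {
  const decoder = new TextDecoder()
  const parts = bodies.map((body) => decoder.decode(body).trim())
  return new TextEncoder().encode(`[${parts.join(",")}]`)
}
```

With this approach the individual messages are never re-parsed; they are concatenated as text, so the result is valid JSON only if every input already is.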

FileBackedStreamStore

Persistent file-backed storage implementation using LMDB for metadata and append-only log files for stream data. Provides the same interface as StreamStore, plus an additional cleanup method:

close()

Closes the store and releases file handles.
store.close(): Promise<void>

Registry Hooks

Track stream lifecycle events (creation, deletion) with a registry stream:
import {
  DurableStreamTestServer,
  createRegistryHooks,
} from "@durable-streams/server"

// Create the hooks once so both callbacks come from the same hooks object
const registryHooks = createRegistryHooks({
  registryPath: "__registry__",
})

const server = new DurableStreamTestServer({
  port: 4437,
  onStreamCreated: registryHooks.onStreamCreated,
  onStreamDeleted: registryHooks.onStreamDeleted,
})
The registry maintains a system stream that tracks all stream creates and deletes, useful for building admin UIs or monitoring.

Cursor Management

Cursor utilities for CDN cache collision prevention:
import {
  calculateCursor,
  generateResponseCursor,
  handleCursorCollision,
  DEFAULT_CURSOR_EPOCH,
  DEFAULT_CURSOR_INTERVAL_SECONDS,
} from "@durable-streams/server"

calculateCursor()

Calculates a time-based interval cursor.
calculateCursor(options?: CursorOptions): string
options (CursorOptions)
  epoch (Date, default: October 9, 2024 00:00:00 UTC): Epoch timestamp for interval calculation
  intervalSeconds (number, default: 20): Interval in seconds
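
The idea of a time-based interval cursor can be shown with simple arithmetic. The sketch below is standalone: `sketchCursor` and the two constants are hypothetical stand-ins, and it assumes the cursor is the number of whole intervals elapsed since the epoch, rendered as a decimal string. The package's real output format may differ.

```typescript
// Stand-ins for the documented defaults (assumed values, taken from this page).
const SKETCH_EPOCH = new Date(Date.UTC(2024, 9, 9, 0, 0, 0)) // October 9, 2024 00:00:00 UTC
const SKETCH_INTERVAL_SECONDS = 20

// Number of whole intervals between `epoch` and `now`, as a string.
function sketchCursor(
  now: Date,
  epoch: Date = SKETCH_EPOCH,
  intervalSeconds: number = SKETCH_INTERVAL_SECONDS
): string {
  const elapsedMs = now.getTime() - epoch.getTime()
  return String(Math.floor(elapsedMs / (intervalSeconds * 1000)))
}
```

Every request made within the same 20-second window yields the same cursor under this scheme, which is what would let a CDN treat those requests as the same cache key.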

generateResponseCursor()

Generates a response cursor ensuring monotonic progression.
generateResponseCursor(
  clientCursor?: string,
  options?: CursorOptions
): string

handleCursorCollision()

Handles cursor collision by adding jitter.
handleCursorCollision(
  clientCursor: string,
  currentCursor: string
): string
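
Monotonic progression with jitter can be sketched as follows. This is a standalone illustration under stated assumptions: cursors are treated as decimal interval numbers, `sketchResponseCursor` is a hypothetical name, and the jitter range of 1 to 3 intervals is invented for the example. The package's actual collision handling may work differently.

```typescript
// Return a cursor that never moves backwards relative to the client's cursor.
// `jitterIntervals` is injectable so the result is deterministic in tests.
function sketchResponseCursor(
  currentInterval: number,
  clientCursor?: string,
  jitterIntervals: () => number = () => 1 + Math.floor(Math.random() * 3)
): string {
  if (clientCursor === undefined) return String(currentInterval)

  const client = Number.parseInt(clientCursor, 10)
  // Unparseable or older client cursor: the current interval is already ahead.
  if (Number.isNaN(client) || client < currentInterval) {
    return String(currentInterval)
  }
  // Client is at or ahead of the current interval: step strictly past it.
  return String(client + jitterIntervals())
}
```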

Path Encoding

Utilities for encoding/decoding stream paths:
import {
  encodeStreamPath,
  decodeStreamPath,
} from "@durable-streams/server"

const encoded = encodeStreamPath("/my/stream/path")
const decoded = decodeStreamPath(encoded)

Types

StreamMessage

interface StreamMessage {
  data: Uint8Array
  offset: string // Format: "<read-seq>_<byte-offset>"
  timestamp: number
}
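
The offset format noted in the comment above can be split into its two parts. This standalone sketch uses hypothetical helpers (`parseOffset`, `compareOffsets`) and assumes both parts are non-negative decimal integers; any string that does not match that shape yields `null`.

```typescript
interface ParsedOffset {
  readSeq: number
  byteOffset: number
}

// Split "<read-seq>_<byte-offset>" into numeric parts, or return null.
function parseOffset(offset: string): ParsedOffset | null {
  const match = /^(\d+)_(\d+)$/.exec(offset)
  if (!match) return null
  return { readSeq: Number(match[1]), byteOffset: Number(match[2]) }
}

// Order offsets by read sequence first, then by byte offset.
function compareOffsets(a: ParsedOffset, b: ParsedOffset): number {
  return a.readSeq - b.readSeq || a.byteOffset - b.byteOffset
}
```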

Stream

interface Stream {
  path: string
  contentType?: string
  messages: StreamMessage[]
  currentOffset: string
  lastSeq?: string
  ttlSeconds?: number
  expiresAt?: string
  createdAt: number
  producers?: Map<string, ProducerState>
  closed?: boolean
  closedBy?: {
    producerId: string
    epoch: number
    seq: number
  }
}

StreamLifecycleEvent

interface StreamLifecycleEvent {
  type: "created" | "deleted"
  path: string
  contentType?: string
  timestamp: number
}
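
An admin UI or monitor could fold a sequence of these events into the set of streams that currently exist. The sketch below is standalone; `livePaths` is a hypothetical helper and it assumes the events arrive in chronological order.

```typescript
// Local copy of the documented shape so the sketch runs on its own.
interface StreamLifecycleEvent {
  type: "created" | "deleted"
  path: string
  contentType?: string
  timestamp: number
}

// Replay events in order and return the sorted paths that still exist.
function livePaths(events: StreamLifecycleEvent[]): string[] {
  const live = new Set<string>()
  for (const event of events) {
    if (event.type === "created") {
      live.add(event.path)
    } else {
      live.delete(event.path)
    }
  }
  return Array.from(live).sort()
}
```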

ProducerState

interface ProducerState {
  epoch: number
  lastSeq: number
  lastUpdated: number
}
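
One way such state could support idempotent writes is to compare each incoming (epoch, sequence) pair against the last accepted one. The rules below are an assumption made for illustration (a new epoch must start at sequence 0, and sequences must increase by exactly 1); `Verdict` and `checkProducerWrite` are hypothetical names and this is not the package's documented algorithm.

```typescript
// Local copy of the documented shape so the sketch runs on its own.
interface ProducerState {
  epoch: number
  lastSeq: number
  lastUpdated: number
}

type Verdict = "accept" | "duplicate" | "stale-epoch" | "sequence-gap"

// Classify an incoming write against the producer's last known state.
function checkProducerWrite(
  state: ProducerState | undefined,
  epoch: number,
  seq: number
): Verdict {
  // Unknown producer or newer epoch: assumed to restart at sequence 0.
  if (!state || epoch > state.epoch) {
    return seq === 0 ? "accept" : "sequence-gap"
  }
  if (epoch < state.epoch) return "stale-epoch"
  // Same epoch: anything at or below lastSeq was already applied.
  if (seq <= state.lastSeq) return "duplicate"
  return seq === state.lastSeq + 1 ? "accept" : "sequence-gap"
}
```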

Testing

Use the conformance test suite to validate protocol compliance:
import { runConformanceTests } from "@durable-streams/server-conformance-tests"

runConformanceTests({
  baseUrl: "http://localhost:4437",
})

Examples

Basic Server

import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
})

await server.start()
console.log(`Server listening at ${server.url}`)

// Cleanup
process.on("SIGINT", async () => {
  await server.stop()
  process.exit(0)
})

Persistent Storage

import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  dataDir: "./data",
})

await server.start()

With Lifecycle Hooks

import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  onStreamCreated: async (event) => {
    console.log(`Stream created: ${event.path}`)
  },
  onStreamDeleted: async (event) => {
    console.log(`Stream deleted: ${event.path}`)
  },
})

await server.start()

Custom Timeouts

import { DurableStreamTestServer } from "@durable-streams/server"

const server = new DurableStreamTestServer({
  port: 4437,
  longPollTimeout: 60000, // 60 seconds
  compression: true,
})

await server.start()