Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The Node.js server package (@durable-streams/server) provides a reference implementation of the Durable Streams protocol. It supports both in-memory and file-backed storage modes, making it suitable for development, testing, and production workloads.
This is a development/testing server. For production deployments, consider using the Caddy-based server.
Installation
npm install @durable-streams/server
Quick Start
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
host: "127.0.0.1",
})
await server.start()
console.log(`Server running at ${server.url}`)
DurableStreamTestServer
Constructor
const server = new DurableStreamTestServer(options?: TestServerOptions)
Configuration options for the serverhost
string
default:"127.0.0.1"
Host to bind to
Default long-poll timeout in milliseconds
Data directory for file-backed storage. If provided, enables file-backed mode using LMDB and append-only logs. If omitted, uses in-memory storage.
Hook called when a stream is created
Hook called when a stream is deleted
Enable gzip/deflate compression for responses
Interval in seconds for cursor calculation. Used for CDN cache collapsing to prevent infinite cache loops.
cursorEpoch
Date
default:"October 9, 2024 00:00:00 UTC"
Epoch timestamp for cursor interval calculation
Methods
start()
Starts the server and returns the server URL.
await server.start(): Promise<string>
Returns: Server URL (e.g., http://127.0.0.1:4437)
stop()
Stops the server and cleans up resources.
await server.stop(): Promise<void>
clear()
Clears all streams from the store.
injectFault()
Injects a fault to be triggered on the next N requests to a path. Used for testing retry/resilience behavior.
server.injectFault(path: string, fault: InjectedFault): void
Stream path to inject fault for
Fault configurationHTTP status code to return (if set, returns error response)
Number of times to trigger this fault
Retry-After header value in seconds
Delay in milliseconds before responding
Drop the connection after sending headers (simulates network failure)
Truncate response body to this many bytes
Probability of triggering fault (0-1)
Only match specific HTTP method (GET, POST, PUT, DELETE)
Corrupt the response body by flipping random bits
Add jitter to delay (random 0-jitterMs added to delayMs)
Properties
url
The server URL.
store
The underlying storage instance.
server.store: StreamStore | FileBackedStreamStore
Storage Modes
In-Memory Mode
Fast, ephemeral storage for development and testing:
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
// No dataDir = in-memory storage
})
await server.start()
File-Backed Mode
Persistent storage with streams stored as log files and LMDB for metadata:
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
dataDir: "./data/streams",
})
await server.start()
StreamStore
In-memory stream storage implementation.
Methods
create()
Creates a new stream or returns existing stream if it matches configuration.
store.create(path: string, options: CreateOptions): Stream
Content type of the stream
Absolute expiry time (ISO 8601)
Optional initial data to append on creation
Create the stream in closed state
get()
Retrieves a stream by path.
store.get(path: string): Stream | undefined
has()
Checks if a stream exists.
store.has(path: string): boolean
delete()
Deletes a stream.
store.delete(path: string): boolean
append()
Appends data to a stream.
store.append(path: string, data: Uint8Array, options?: AppendOptions): StreamMessage
Sequence number for write coordination
Content type (must match stream’s content type)
Producer ID for idempotent writes
Producer epoch for idempotent writes
Producer sequence for idempotent writes
Close the stream after appending
read()
Reads messages from a stream starting from an offset.
store.read(path: string, offset?: string): ReadResult
Returns:
interface ReadResult {
messages: StreamMessage[]
upToDate: boolean
}
Waits for new messages on a stream (for long-polling).
store.waitForMessages(
path: string,
offset: string,
timeout: number
): Promise<WaitResult>
Returns:
interface WaitResult {
messages: StreamMessage[]
timedOut: boolean
streamClosed: boolean
}
closeStream()
Closes a stream to prevent further appends.
store.closeStream(path: string): { finalOffset: string } | null
Formats messages for HTTP response (wraps JSON in array brackets).
store.formatResponse(path: string, messages: StreamMessage[]): Uint8Array
FileBackedStreamStore
Persistent file-backed storage implementation using LMDB for metadata and append-only log files for stream data.
Provides the same interface as StreamStore with additional cleanup methods:
close()
Closes the store and releases file handles.
await store.close(): Promise<void>
Registry Hooks
Track stream lifecycle events (creation, deletion) with a registry stream:
import {
DurableStreamTestServer,
createRegistryHooks,
} from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
onStreamCreated: createRegistryHooks({
registryPath: "__registry__",
}).onStreamCreated,
onStreamDeleted: createRegistryHooks({
registryPath: "__registry__",
}).onStreamDeleted,
})
The registry maintains a system stream that tracks all stream creates and deletes, useful for building admin UIs or monitoring.
Cursor Management
Cursor utilities for CDN cache collision prevention:
import {
calculateCursor,
generateResponseCursor,
handleCursorCollision,
DEFAULT_CURSOR_EPOCH,
DEFAULT_CURSOR_INTERVAL_SECONDS,
} from "@durable-streams/server"
calculateCursor()
Calculates a time-based interval cursor.
calculateCursor(options?: CursorOptions): string
epoch
Date
default:"October 9, 2024 00:00:00 UTC"
Epoch timestamp for interval calculation
generateResponseCursor()
Generates a response cursor ensuring monotonic progression.
generateResponseCursor(
clientCursor?: string,
options?: CursorOptions
): string
handleCursorCollision()
Handles cursor collision by adding jitter.
handleCursorCollision(
clientCursor: string,
currentCursor: string
): string
Path Encoding
Utilities for encoding/decoding stream paths:
import {
encodeStreamPath,
decodeStreamPath,
} from "@durable-streams/server"
const encoded = encodeStreamPath("/my/stream/path")
const decoded = decodeStreamPath(encoded)
Types
StreamMessage
interface StreamMessage {
data: Uint8Array
offset: string // Format: "<read-seq>_<byte-offset>"
timestamp: number
}
Stream
interface Stream {
path: string
contentType?: string
messages: StreamMessage[]
currentOffset: string
lastSeq?: string
ttlSeconds?: number
expiresAt?: string
createdAt: number
producers?: Map<string, ProducerState>
closed?: boolean
closedBy?: {
producerId: string
epoch: number
seq: number
}
}
StreamLifecycleEvent
interface StreamLifecycleEvent {
type: "created" | "deleted"
path: string
contentType?: string
timestamp: number
}
ProducerState
interface ProducerState {
epoch: number
lastSeq: number
lastUpdated: number
}
Testing
Use the conformance test suite to validate protocol compliance:
import { runConformanceTests } from "@durable-streams/server-conformance-tests"
runConformanceTests({
baseUrl: "http://localhost:4437",
})
Examples
Basic Server
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
})
await server.start()
console.log(`Server listening at ${server.url}`)
// Cleanup
process.on("SIGINT", async () => {
await server.stop()
process.exit(0)
})
Persistent Storage
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
dataDir: "./data",
})
await server.start()
With Lifecycle Hooks
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
onStreamCreated: async (event) => {
console.log(`Stream created: ${event.path}`)
},
onStreamDeleted: async (event) => {
console.log(`Stream deleted: ${event.path}`)
},
})
await server.start()
Custom Timeouts
import { DurableStreamTestServer } from "@durable-streams/server"
const server = new DurableStreamTestServer({
port: 4437,
longPollTimeout: 60000, // 60 seconds
compression: true,
})
await server.start()