Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Durable Streams provides structured error types and built-in retry mechanisms to help you build resilient applications.
Error Types
The client library exports three main error types:
import { DurableStreamError } from "@durable-streams/client"
// Protocol-level errors with structured codes
try {
await stream.create()
} catch (err) {
if (err instanceof DurableStreamError) {
console.log(err.code) // "CONFLICT_EXISTS"
console.log(err.status) // 409
console.log(err.details) // Response body
}
}
Error Codes
All DurableStreamError instances include a structured error code:
| Code | HTTP Status | Description |
|---|
NOT_FOUND | 404 | Stream doesn’t exist |
CONFLICT_EXISTS | 409 | Stream already exists (on create) |
CONFLICT_SEQ | 409 | Sequence gap (idempotent producer) |
STREAM_CLOSED | 409 | Cannot append to closed stream |
UNAUTHORIZED | 401 | Authentication failed |
FORBIDDEN | 403 | Authorization failed or stale epoch |
BAD_REQUEST | 400 | Invalid request |
RATE_LIMITED | 429 | Too many requests |
BUSY | 503 | Server temporarily unavailable |
UNKNOWN | Other | Unclassified error |
Handling Specific Errors
Stream Not Found
Create the stream if it doesn’t exist:
import { DurableStream, DurableStreamError } from "@durable-streams/client"
async function getOrCreateStream(url: string, contentType: string) {
try {
// Try connecting to existing stream
const stream = await DurableStream.connect({ url })
return stream
} catch (err) {
if (err instanceof DurableStreamError && err.code === "NOT_FOUND") {
// Stream doesn't exist, create it
return await DurableStream.create({ url, contentType })
}
throw err
}
}
Stream Already Exists
Handle race conditions when multiple clients try to create:
async function ensureStreamExists(url: string, contentType: string) {
try {
return await DurableStream.create({ url, contentType })
} catch (err) {
if (err instanceof DurableStreamError && err.code === "CONFLICT_EXISTS") {
// Another client created it, connect instead
return await DurableStream.connect({ url })
}
throw err
}
}
Stream Closed
Detect and handle closed streams:
import { StreamClosedError } from "@durable-streams/client"
try {
await stream.append(JSON.stringify({ event: "user.created" }))
} catch (err) {
if (err instanceof StreamClosedError) {
console.log(`Stream was closed at offset: ${err.finalOffset}`)
// Option 1: Create a new stream with timestamp suffix
const newStream = await DurableStream.create({
url: `${baseUrl}-${Date.now()}`,
contentType: "application/json",
})
// Option 2: Log and move to a different stream
await logToArchive({ closedAt: err.finalOffset })
return
}
throw err
}
Authentication Errors
Refresh tokens automatically on 401 errors:
import { DurableStream } from "@durable-streams/client"
class AuthenticatedStreamClient {
private tokenRefreshPromise: Promise<string> | null = null
async getStream(url: string) {
return new DurableStream({
url,
headers: {
Authorization: async () => {
const token = await this.getValidToken()
return `Bearer ${token}`
},
},
})
}
private async getValidToken(): Promise<string> {
// Deduplicate concurrent refresh requests
if (this.tokenRefreshPromise) {
return this.tokenRefreshPromise
}
const cachedToken = this.getCachedToken()
if (cachedToken && !this.isExpired(cachedToken)) {
return cachedToken
}
this.tokenRefreshPromise = this.refreshToken()
try {
const newToken = await this.tokenRefreshPromise
this.cacheToken(newToken)
return newToken
} finally {
this.tokenRefreshPromise = null
}
}
private getCachedToken(): string | null {
return localStorage.getItem("auth-token")
}
private cacheToken(token: string): void {
localStorage.setItem("auth-token", token)
}
private isExpired(token: string): boolean {
// Implement token expiry check
return false
}
private async refreshToken(): Promise<string> {
// Implement token refresh logic
const response = await fetch("/api/auth/refresh", { method: "POST" })
const { token } = await response.json()
return token
}
}
Automatic Retry with Backoff
Durable Streams includes built-in exponential backoff for transient failures:
const stream = new DurableStream({
url: "https://streams.example.com/v1/stream/events",
backoffOptions: {
maxRetries: 10,
initialDelayMs: 100,
maxDelayMs: 30000,
backoffMultiplier: 2,
jitter: true, // Add randomness to prevent thundering herd
},
})
Backoff applies to network errors and server errors (5xx). Protocol errors (4xx) fail immediately without retry.
Custom Retry Logic
Implement application-specific retry strategies:
import { FetchError } from "@durable-streams/client"
async function retryWithStrategy<T>(
operation: () => Promise<T>,
shouldRetry: (error: Error, attempt: number) => boolean,
maxAttempts = 5
): Promise<T> {
let lastError: Error
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await operation()
} catch (err) {
lastError = err as Error
if (!shouldRetry(lastError, attempt)) {
throw lastError
}
const delayMs = Math.min(1000 * Math.pow(2, attempt - 1), 30000)
await new Promise((resolve) => setTimeout(resolve, delayMs))
}
}
throw lastError!
}
// Usage
await retryWithStrategy(
() => stream.append(data),
(error, attempt) => {
// Retry on network errors and 503
if (error instanceof FetchError) {
return error.status === 503 || error.status >= 500
}
return false
}
)
Error Handling in Subscriptions
Graceful Degradation
const res = await stream.stream({ live: "long-poll" })
res.subscribeJson(async (batch) => {
for (const item of batch.items) {
try {
await processItem(item)
} catch (err) {
// Log error but continue processing other items
console.error(`Failed to process item:`, err)
await logFailedItem(item, err)
}
}
})
Dead Letter Queue Pattern
const res = await stream.stream({ live: "long-poll" })
res.subscribeJson(async (batch) => {
const failedItems: Array<{ item: any; error: Error }> = []
for (const item of batch.items) {
try {
await processItem(item)
} catch (err) {
failedItems.push({ item, error: err as Error })
}
}
// Send failed items to DLQ
if (failedItems.length > 0) {
await dlqStream.append(
JSON.stringify({
sourceStream: stream.url,
offset: batch.offset,
failedItems: failedItems.map((f) => ({
item: f.item,
error: f.error.message,
})),
timestamp: new Date().toISOString(),
})
)
}
})
onError Callback
Use the onError callback for custom error handling during reads:
const res = await stream.stream({
live: "long-poll",
onError: async (error) => {
console.error("Stream error:", error)
// Return undefined to stop
if (error instanceof DurableStreamError && error.code === "NOT_FOUND") {
return undefined
}
// Refresh auth on 401
if (error instanceof FetchError && error.status === 401) {
const newToken = await refreshAuthToken()
return {
headers: { Authorization: `Bearer ${newToken}` },
}
}
// Update params on 403
if (error instanceof FetchError && error.status === 403) {
const newTenantId = await switchTenant()
return {
params: { tenantId: newTenantId },
}
}
// For other errors, let default retry handle it
return undefined
},
})
If onError returns undefined, the stream stops retrying and the error is thrown. Return an object with updated headers or params to retry.
IdempotentProducer Error Handling
Handle producer errors with the onError callback:
import {
IdempotentProducer,
StaleEpochError,
SequenceGapError
} from "@durable-streams/client"
const producer = new IdempotentProducer(stream, "my-producer", {
epoch: 0,
autoClaim: true,
onError: (error) => {
if (error instanceof StaleEpochError) {
console.error(`Stale epoch. Current: ${error.currentEpoch}`)
// autoClaim will handle this automatically
} else if (error instanceof SequenceGapError) {
console.error(
`Sequence gap: expected ${error.expectedSeq}, got ${error.receivedSeq}`
)
} else {
console.error("Producer error:", error)
}
},
})
// Fire-and-forget writes
producer.append(JSON.stringify({ event: "data" }))
// Ensure delivery before shutdown
try {
await producer.flush()
await producer.close()
} catch (err) {
console.error("Failed to flush producer:", err)
// Decide: retry, log, or abort
}
With autoClaim: true, the producer automatically handles StaleEpochError by incrementing the epoch and retrying.
Circuit Breaker Pattern
Prevent cascading failures with a circuit breaker:
enum CircuitState {
CLOSED,
OPEN,
HALF_OPEN,
}
class CircuitBreaker {
private state = CircuitState.CLOSED
private failureCount = 0
private lastFailureTime = 0
private readonly threshold = 5
private readonly timeout = 60000 // 1 minute
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === CircuitState.OPEN) {
if (Date.now() - this.lastFailureTime > this.timeout) {
this.state = CircuitState.HALF_OPEN
} else {
throw new Error("Circuit breaker is OPEN")
}
}
try {
const result = await operation()
this.onSuccess()
return result
} catch (err) {
this.onFailure()
throw err
}
}
private onSuccess(): void {
this.failureCount = 0
this.state = CircuitState.CLOSED
}
private onFailure(): void {
this.failureCount++
this.lastFailureTime = Date.now()
if (this.failureCount >= this.threshold) {
this.state = CircuitState.OPEN
}
}
}
// Usage
const breaker = new CircuitBreaker()
try {
await breaker.execute(() => stream.append(data))
} catch (err) {
console.error("Operation failed or circuit open:", err)
}
Monitoring and Observability
Log errors with context for debugging:
import { DurableStreamError, FetchError } from "@durable-streams/client"
function logStreamError(error: Error, context: Record<string, any>): void {
const baseLog = {
timestamp: new Date().toISOString(),
...context,
}
if (error instanceof DurableStreamError) {
console.error("DurableStreamError:", {
...baseLog,
code: error.code,
status: error.status,
details: error.details,
message: error.message,
})
} else if (error instanceof FetchError) {
console.error("FetchError:", {
...baseLog,
status: error.status,
url: error.url,
headers: error.headers,
text: error.text,
message: error.message,
})
} else {
console.error("Unknown error:", {
...baseLog,
message: error.message,
stack: error.stack,
})
}
}
// Usage
try {
await stream.append(data)
} catch (err) {
logStreamError(err as Error, {
operation: "append",
streamUrl: stream.url,
dataSize: data.length,
})
}
Testing Error Scenarios
Test error handling with mock responses:
import { DurableStream, DurableStreamError } from "@durable-streams/client"
describe("Error handling", () => {
it("handles stream not found", async () => {
const mockFetch = async () =>
new Response(null, {
status: 404,
statusText: "Not Found"
})
const stream = new DurableStream({
url: "https://streams.example.com/v1/stream/missing",
fetch: mockFetch,
})
await expect(stream.head()).rejects.toThrow(DurableStreamError)
})
it("retries on 503", async () => {
let attempts = 0
const mockFetch = async () => {
attempts++
if (attempts < 3) {
return new Response(null, { status: 503 })
}
return new Response(null, { status: 200 })
}
const stream = new DurableStream({
url: "https://streams.example.com/v1/stream/test",
fetch: mockFetch,
backoffOptions: { maxRetries: 5, initialDelayMs: 10 },
})
await stream.head()
expect(attempts).toBe(3)
})
})
Next Steps
Production Deployment
Deploy Durable Streams to production with monitoring and scaling