Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Durable Streams provides structured error types and built-in retry mechanisms to help you build resilient applications.

Error Types

The client library exports three main error types:
import { DurableStreamError } from "@durable-streams/client"

// Protocol-level errors with structured codes
try {
  await stream.create()
} catch (err) {
  if (err instanceof DurableStreamError) {
    console.log(err.code)    // "CONFLICT_EXISTS"
    console.log(err.status)  // 409
    console.log(err.details) // Response body
  }
}

Error Codes

All DurableStreamError instances include a structured error code:
CodeHTTP StatusDescription
NOT_FOUND404Stream doesn’t exist
CONFLICT_EXISTS409Stream already exists (on create)
CONFLICT_SEQ409Sequence gap (idempotent producer)
STREAM_CLOSED409Cannot append to closed stream
UNAUTHORIZED401Authentication failed
FORBIDDEN403Authorization failed or stale epoch
BAD_REQUEST400Invalid request
RATE_LIMITED429Too many requests
BUSY503Server temporarily unavailable
UNKNOWNOtherUnclassified error

Handling Specific Errors

Stream Not Found

Create the stream if it doesn’t exist:
import { DurableStream, DurableStreamError } from "@durable-streams/client"

async function getOrCreateStream(url: string, contentType: string) {
  try {
    // Try connecting to existing stream
    const stream = await DurableStream.connect({ url })
    return stream
  } catch (err) {
    if (err instanceof DurableStreamError && err.code === "NOT_FOUND") {
      // Stream doesn't exist, create it
      return await DurableStream.create({ url, contentType })
    }
    throw err
  }
}

Stream Already Exists

Handle race conditions when multiple clients try to create:
async function ensureStreamExists(url: string, contentType: string) {
  try {
    return await DurableStream.create({ url, contentType })
  } catch (err) {
    if (err instanceof DurableStreamError && err.code === "CONFLICT_EXISTS") {
      // Another client created it, connect instead
      return await DurableStream.connect({ url })
    }
    throw err
  }
}

Stream Closed

Detect and handle closed streams:
import { StreamClosedError } from "@durable-streams/client"

try {
  await stream.append(JSON.stringify({ event: "user.created" }))
} catch (err) {
  if (err instanceof StreamClosedError) {
    console.log(`Stream was closed at offset: ${err.finalOffset}`)
    
    // Option 1: Create a new stream with timestamp suffix
    const newStream = await DurableStream.create({
      url: `${baseUrl}-${Date.now()}`,
      contentType: "application/json",
    })
    
    // Option 2: Log and move to a different stream
    await logToArchive({ closedAt: err.finalOffset })
    
    return
  }
  throw err
}

Authentication Errors

Refresh tokens automatically on 401 errors:
import { DurableStream } from "@durable-streams/client"

class AuthenticatedStreamClient {
  private tokenRefreshPromise: Promise<string> | null = null

  async getStream(url: string) {
    return new DurableStream({
      url,
      headers: {
        Authorization: async () => {
          const token = await this.getValidToken()
          return `Bearer ${token}`
        },
      },
    })
  }

  private async getValidToken(): Promise<string> {
    // Deduplicate concurrent refresh requests
    if (this.tokenRefreshPromise) {
      return this.tokenRefreshPromise
    }

    const cachedToken = this.getCachedToken()
    if (cachedToken && !this.isExpired(cachedToken)) {
      return cachedToken
    }

    this.tokenRefreshPromise = this.refreshToken()
    try {
      const newToken = await this.tokenRefreshPromise
      this.cacheToken(newToken)
      return newToken
    } finally {
      this.tokenRefreshPromise = null
    }
  }

  private getCachedToken(): string | null {
    return localStorage.getItem("auth-token")
  }

  private cacheToken(token: string): void {
    localStorage.setItem("auth-token", token)
  }

  private isExpired(token: string): boolean {
    // Implement token expiry check
    return false
  }

  private async refreshToken(): Promise<string> {
    // Implement token refresh logic
    const response = await fetch("/api/auth/refresh", { method: "POST" })
    const { token } = await response.json()
    return token
  }
}

Automatic Retry with Backoff

Durable Streams includes built-in exponential backoff for transient failures:
const stream = new DurableStream({
  url: "https://streams.example.com/v1/stream/events",
  backoffOptions: {
    maxRetries: 10,
    initialDelayMs: 100,
    maxDelayMs: 30000,
    backoffMultiplier: 2,
    jitter: true, // Add randomness to prevent thundering herd
  },
})
Backoff applies to network errors and server errors (5xx). Protocol errors (4xx) fail immediately without retry.

Custom Retry Logic

Implement application-specific retry strategies:
import { FetchError } from "@durable-streams/client"

async function retryWithStrategy<T>(
  operation: () => Promise<T>,
  shouldRetry: (error: Error, attempt: number) => boolean,
  maxAttempts = 5
): Promise<T> {
  let lastError: Error
  
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await operation()
    } catch (err) {
      lastError = err as Error
      
      if (!shouldRetry(lastError, attempt)) {
        throw lastError
      }
      
      const delayMs = Math.min(1000 * Math.pow(2, attempt - 1), 30000)
      await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
  
  throw lastError!
}

// Usage
await retryWithStrategy(
  () => stream.append(data),
  (error, attempt) => {
    // Retry on network errors and 503
    if (error instanceof FetchError) {
      return error.status === 503 || error.status >= 500
    }
    return false
  }
)

Error Handling in Subscriptions

Graceful Degradation

const res = await stream.stream({ live: "long-poll" })

res.subscribeJson(async (batch) => {
  for (const item of batch.items) {
    try {
      await processItem(item)
    } catch (err) {
      // Log error but continue processing other items
      console.error(`Failed to process item:`, err)
      await logFailedItem(item, err)
    }
  }
})

Dead Letter Queue Pattern

const res = await stream.stream({ live: "long-poll" })

res.subscribeJson(async (batch) => {
  const failedItems: Array<{ item: any; error: Error }> = []
  
  for (const item of batch.items) {
    try {
      await processItem(item)
    } catch (err) {
      failedItems.push({ item, error: err as Error })
    }
  }
  
  // Send failed items to DLQ
  if (failedItems.length > 0) {
    await dlqStream.append(
      JSON.stringify({
        sourceStream: stream.url,
        offset: batch.offset,
        failedItems: failedItems.map((f) => ({
          item: f.item,
          error: f.error.message,
        })),
        timestamp: new Date().toISOString(),
      })
    )
  }
})

onError Callback

Use the onError callback for custom error handling during reads:
const res = await stream.stream({
  live: "long-poll",
  onError: async (error) => {
    console.error("Stream error:", error)
    
    // Return undefined to stop
    if (error instanceof DurableStreamError && error.code === "NOT_FOUND") {
      return undefined
    }
    
    // Refresh auth on 401
    if (error instanceof FetchError && error.status === 401) {
      const newToken = await refreshAuthToken()
      return {
        headers: { Authorization: `Bearer ${newToken}` },
      }
    }
    
    // Update params on 403
    if (error instanceof FetchError && error.status === 403) {
      const newTenantId = await switchTenant()
      return {
        params: { tenantId: newTenantId },
      }
    }
    
    // For other errors, let default retry handle it
    return undefined
  },
})
If onError returns undefined, the stream stops retrying and the error is thrown. Return an object with updated headers or params to retry.

IdempotentProducer Error Handling

Handle producer errors with the onError callback:
import { 
  IdempotentProducer, 
  StaleEpochError,
  SequenceGapError 
} from "@durable-streams/client"

const producer = new IdempotentProducer(stream, "my-producer", {
  epoch: 0,
  autoClaim: true,
  onError: (error) => {
    if (error instanceof StaleEpochError) {
      console.error(`Stale epoch. Current: ${error.currentEpoch}`)
      // autoClaim will handle this automatically
    } else if (error instanceof SequenceGapError) {
      console.error(
        `Sequence gap: expected ${error.expectedSeq}, got ${error.receivedSeq}`
      )
    } else {
      console.error("Producer error:", error)
    }
  },
})

// Fire-and-forget writes
producer.append(JSON.stringify({ event: "data" }))

// Ensure delivery before shutdown
try {
  await producer.flush()
  await producer.close()
} catch (err) {
  console.error("Failed to flush producer:", err)
  // Decide: retry, log, or abort
}
With autoClaim: true, the producer automatically handles StaleEpochError by incrementing the epoch and retrying.

Circuit Breaker Pattern

Prevent cascading failures with a circuit breaker:
enum CircuitState {
  CLOSED,
  OPEN,
  HALF_OPEN,
}

class CircuitBreaker {
  private state = CircuitState.CLOSED
  private failureCount = 0
  private lastFailureTime = 0
  private readonly threshold = 5
  private readonly timeout = 60000 // 1 minute

  async execute<T>(operation: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() - this.lastFailureTime > this.timeout) {
        this.state = CircuitState.HALF_OPEN
      } else {
        throw new Error("Circuit breaker is OPEN")
      }
    }

    try {
      const result = await operation()
      this.onSuccess()
      return result
    } catch (err) {
      this.onFailure()
      throw err
    }
  }

  private onSuccess(): void {
    this.failureCount = 0
    this.state = CircuitState.CLOSED
  }

  private onFailure(): void {
    this.failureCount++
    this.lastFailureTime = Date.now()

    if (this.failureCount >= this.threshold) {
      this.state = CircuitState.OPEN
    }
  }
}

// Usage
const breaker = new CircuitBreaker()

try {
  await breaker.execute(() => stream.append(data))
} catch (err) {
  console.error("Operation failed or circuit open:", err)
}

Monitoring and Observability

Log errors with context for debugging:
import { DurableStreamError, FetchError } from "@durable-streams/client"

function logStreamError(error: Error, context: Record<string, any>): void {
  const baseLog = {
    timestamp: new Date().toISOString(),
    ...context,
  }

  if (error instanceof DurableStreamError) {
    console.error("DurableStreamError:", {
      ...baseLog,
      code: error.code,
      status: error.status,
      details: error.details,
      message: error.message,
    })
  } else if (error instanceof FetchError) {
    console.error("FetchError:", {
      ...baseLog,
      status: error.status,
      url: error.url,
      headers: error.headers,
      text: error.text,
      message: error.message,
    })
  } else {
    console.error("Unknown error:", {
      ...baseLog,
      message: error.message,
      stack: error.stack,
    })
  }
}

// Usage
try {
  await stream.append(data)
} catch (err) {
  logStreamError(err as Error, {
    operation: "append",
    streamUrl: stream.url,
    dataSize: data.length,
  })
}

Testing Error Scenarios

Test error handling with mock responses:
import { DurableStream, DurableStreamError } from "@durable-streams/client"

describe("Error handling", () => {
  it("handles stream not found", async () => {
    const mockFetch = async () =>
      new Response(null, { 
        status: 404, 
        statusText: "Not Found" 
      })

    const stream = new DurableStream({
      url: "https://streams.example.com/v1/stream/missing",
      fetch: mockFetch,
    })

    await expect(stream.head()).rejects.toThrow(DurableStreamError)
  })

  it("retries on 503", async () => {
    let attempts = 0
    const mockFetch = async () => {
      attempts++
      if (attempts < 3) {
        return new Response(null, { status: 503 })
      }
      return new Response(null, { status: 200 })
    }

    const stream = new DurableStream({
      url: "https://streams.example.com/v1/stream/test",
      fetch: mockFetch,
      backoffOptions: { maxRetries: 5, initialDelayMs: 10 },
    })

    await stream.head()
    expect(attempts).toBe(3)
  })
})

Next Steps

Production Deployment

Deploy Durable Streams to production with monitoring and scaling