Idempotent Producers

Idempotent producers provide exactly-once write semantics for Durable Streams, eliminating duplicates from client retries while enabling fire-and-forget writes with automatic batching and pipelining.

Why idempotent producers?

Without idempotency, network retries cause duplicate writes:
// Without idempotent producer
try {
  await stream.append('{"order_id": 123, "amount": 100}')
} catch (error) {
  // Network timeout - did the append succeed or not?
  // Retrying might create duplicate order
  await stream.append('{"order_id": 123, "amount": 100}')  // Duplicate?
}

How it works

Idempotent producers use a three-part tuple for deduplication:
1. Producer ID

Client-supplied stable identifier (e.g., "order-service-1" or a UUID):

const producer = new IdempotentProducer(
  stream,
  'order-service-1',  // Producer ID
  { epoch: 0 }
)

2. Epoch

Client-declared, server-validated monotonic counter. Increment it on restart:

// Initial start
const producer1 = new IdempotentProducer(stream, 'my-producer', { epoch: 0 })

// After restart
const producer2 = new IdempotentProducer(stream, 'my-producer', { epoch: 1 })

3. Sequence number

Monotonically increasing per epoch, assigned per batch:

// Automatic sequence tracking
producer.append('message 1')  // seq=0
producer.append('message 2')  // seq=0 (batched with message 1)
await producer.flush()        // Sends batch with seq=0

producer.append('message 3')  // seq=1
await producer.flush()        // Sends batch with seq=1
The server tracks (producerId, epoch, seq) tuples and deduplicates retries:
  • First request with (producerId=order-service-1, epoch=0, seq=0): Accepted (200 OK)
  • Retry with same tuple: Deduplicated (204 No Content)
  • New message with seq=1: Accepted (200 OK)
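
A minimal sketch of this bookkeeping, assuming a simple in-memory map keyed by producer ID and epoch (an illustration of the idea, not the server's actual implementation):

```typescript
// Hypothetical server-side dedup state: highest sequence number
// seen per (producerId, epoch). Retries at or below it are duplicates.
class DedupTable {
  private lastSeq = new Map<string, number>()

  check(producerId: string, epoch: number, seq: number): 'accepted' | 'duplicate' {
    const key = `${producerId}:${epoch}`
    const last = this.lastSeq.get(key) ?? -1
    if (seq <= last) return 'duplicate'  // retry of an earlier batch -> 204
    this.lastSeq.set(key, seq)
    return 'accepted'                    // new batch -> 200
  }
}

const table = new DedupTable()
table.check('order-service-1', 0, 0)  // 'accepted'   (200 OK)
table.check('order-service-1', 0, 0)  // 'duplicate'  (204 No Content)
table.check('order-service-1', 0, 1)  // 'accepted'   (200 OK)
```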

Basic usage

import { DurableStream, IdempotentProducer } from '@durable-streams/client'

// Create or connect to stream
const stream = await DurableStream.create({
  url: 'https://streams.example.com/orders',
  contentType: 'application/json'
})

// Create idempotent producer
const producer = new IdempotentProducer(
  stream,
  'order-service-1',  // Producer ID (stable across restarts)
  {
    epoch: 0,         // Increment on producer restart
    autoClaim: true   // Auto-claim on epoch conflicts
  }
)

// Fire-and-forget writes
producer.append(JSON.stringify({ orderId: 123, amount: 100 }))
producer.append(JSON.stringify({ orderId: 124, amount: 200 }))
producer.append(JSON.stringify({ orderId: 125, amount: 300 }))

// Ensure all messages are delivered before shutdown
await producer.flush()
await producer.close()

Fire-and-forget semantics

Unlike regular appends, producer.append() returns immediately without waiting for the network:
// Regular append - waits for server response
await stream.append(data)  // Blocks until server ACKs

// Idempotent producer - returns immediately
producer.append(data)  // Returns instantly, batches in background
producer.append(data)  // Returns instantly
producer.append(data)  // Returns instantly

// Wait for all pending messages
await producer.flush()  // Now waits for server ACKs
Fire-and-forget means you can write thousands of messages without blocking. The producer automatically batches and pipelines requests for maximum throughput.
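
The non-blocking contract can be mimicked with a toy queue: append only pushes to memory, and only flush awaits delivery. The `send` method here is a stand-in for the HTTP request, not the library's code:

```typescript
// Toy illustration of fire-and-forget: append is synchronous,
// flush awaits the (stubbed) network send.
class ToyProducer {
  private queue: string[] = []
  sent: string[][] = []

  append(data: string): void {
    this.queue.push(data)  // returns immediately, no network I/O
  }

  async flush(): Promise<void> {
    if (this.queue.length === 0) return
    const batch = this.queue
    this.queue = []
    await this.send(batch)  // only flush waits for the "server"
  }

  private async send(batch: string[]): Promise<void> {
    this.sent.push(batch)  // stand-in for the HTTP request
  }
}
```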

Automatic batching

The producer automatically batches multiple append() calls into single HTTP requests:
const producer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  maxBatchBytes: 1024 * 1024,  // 1MB batch size
  lingerMs: 5                   // Wait up to 5ms for more messages
})

// These 3 appends are batched into 1 HTTP request
producer.append('{"id": 1}')
producer.append('{"id": 2}')
producer.append('{"id": 3}')

// Batch is sent when:
// 1. maxBatchBytes is reached, OR
// 2. lingerMs elapses, OR
// 3. flush() is called
Batching parameters:
  • maxBatchBytes: Maximum batch size (default: 1MB)
  • lingerMs: Maximum time to wait for more messages (default: 5ms)
Tune these based on your workload:
  • High-frequency writes: Use larger batches (1-10MB) and longer linger (10-50ms)
  • Low-latency requirements: Use smaller batches (64-256KB) and shorter linger (1-5ms)
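
The send conditions above reduce to a simple predicate. This sketch reuses the documented option names but is an illustration, not the library's internal code:

```typescript
// Sketch: a pending batch is sent when either trigger fires.
interface BatchConfig {
  maxBatchBytes: number
  lingerMs: number
}

function shouldSend(pendingBytes: number, elapsedMs: number, cfg: BatchConfig): boolean {
  return pendingBytes >= cfg.maxBatchBytes || elapsedMs >= cfg.lingerMs
}

const cfg = { maxBatchBytes: 1024 * 1024, lingerMs: 5 }
shouldSend(2 * 1024 * 1024, 0, cfg)  // true: size limit reached
shouldSend(100, 2, cfg)              // false: keep lingering
shouldSend(100, 5, cfg)              // true: linger elapsed
```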

Pipelining for throughput

Idempotent producers support pipelining - multiple batches in flight simultaneously:
const producer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  maxInFlight: 5,  // Up to 5 concurrent HTTP requests
  maxBatchBytes: 256 * 1024,  // 256KB batches
  lingerMs: 10
})

// Producer can have up to 5 batches in flight at once
for (let i = 0; i < 1000; i++) {
  producer.append(JSON.stringify({ id: i }))
}

await producer.flush()
Pipelining dramatically improves throughput over high-latency connections. With maxInFlight=5 and 100ms latency, you can achieve 5x the throughput compared to serial requests.
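
That 5x figure is straightforward arithmetic: a serial producer completes one batch per round trip, so pipelining multiplies the rate by the number of requests in flight (ignoring bandwidth limits):

```typescript
// Back-of-envelope: batch completions per second over a high-latency link.
function batchesPerSecond(latencyMs: number, maxInFlight: number): number {
  return (1000 / latencyMs) * maxInFlight
}

batchesPerSecond(100, 1)  // 10 batches/s (serial)
batchesPerSecond(100, 5)  // 50 batches/s (5x throughput)
```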

Epoch management

Manual epoch management

Increment epoch on producer restart to establish a new session:
// Persistent storage for epoch
const savedEpoch = await db.getProducerEpoch('my-producer') ?? 0

const producer = new IdempotentProducer(stream, 'my-producer', {
  epoch: savedEpoch
})

// On restart, increment epoch
await producer.restart()
console.log(producer.epoch)  // savedEpoch + 1

// Save new epoch
await db.saveProducerEpoch('my-producer', producer.epoch)

Auto-claim mode

For ephemeral producers (serverless, testing), use autoClaim to automatically claim epochs:
const producer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  autoClaim: true  // Auto-retry with epoch+1 on stale epoch errors
})

// If another producer already claimed epoch=0,
// this producer automatically retries with epoch=1
Auto-claim should be used cautiously. In production, prefer explicit epoch management with persistent storage; otherwise a recovered zombie can auto-claim a higher epoch and fence out the live producer.

Zombie fencing

Epochs provide zombie fencing - old producers are automatically rejected:
1. Producer 1 starts

const producer1 = new IdempotentProducer(stream, 'my-producer', { epoch: 0 })
producer1.append('message 1')  // Accepted (epoch=0, seq=0)

2. Producer 1 crashes; Producer 2 starts with a higher epoch

const producer2 = new IdempotentProducer(stream, 'my-producer', { epoch: 1 })
producer2.append('message 2')  // Accepted (epoch=1, seq=0)

3. Producer 1 recovers (zombie) and its requests are rejected

producer1.append('message 3')  // Rejected! 403 Forbidden (stale epoch)
// Server returns: Producer-Epoch: 1
This prevents split-brain scenarios where two producers think they’re the active writer.
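
The fencing rule itself is a one-line comparison. A hypothetical sketch of the server-side check, using the status codes from the example above:

```typescript
// Sketch: server-side epoch fencing, one current epoch per producer ID.
class EpochFence {
  private currentEpoch = new Map<string, number>()

  validate(producerId: string, epoch: number): number {
    const current = this.currentEpoch.get(producerId) ?? -1
    if (epoch < current) return 403          // stale producer (zombie) rejected
    this.currentEpoch.set(producerId, epoch) // equal or higher epoch wins
    return 200
  }
}

const fence = new EpochFence()
fence.validate('my-producer', 0)  // 200: producer 1 writes
fence.validate('my-producer', 1)  // 200: producer 2 takes over
fence.validate('my-producer', 0)  // 403: producer 1 is now a zombie
```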

Error handling

Fire-and-forget with error callback

Since append() returns immediately, errors are reported via callback:
const producer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  onError: (error) => {
    if (error instanceof StaleEpochError) {
      console.error('Producer is stale, shutting down')
      producer.detach()
    } else {
      console.error('Append error:', error)
      // Log to monitoring system
    }
  }
})

producer.append('data')  // Errors reported via onError callback

Flush for immediate error detection

Call flush() to wait for pending batches and detect errors:
try {
  producer.append('message 1')
  producer.append('message 2')
  await producer.flush()  // Throws if any batch failed
} catch (error) {
  if (error instanceof StaleEpochError) {
    console.error('Stale epoch, need to restart')
  }
}

Closing streams with idempotent producers

Idempotent producer close is fully idempotent, even with a final message:
// Close with final message (idempotent)
await producer.close(JSON.stringify({ status: 'complete' }))

// Safe to retry - deduplication works
await producer.close(JSON.stringify({ status: 'complete' }))
// Returns same finalOffset, no duplicate

// Or close without final message
await producer.close()
Unlike DurableStream.close({ body }) which is not idempotent with a body, IdempotentProducer.close() uses producer headers for deduplication, making it safe to retry.

Detach vs. close

Two ways to stop a producer:

detach() - Stop without closing stream

// Stop this producer but keep stream open
await producer.detach()

// Another producer can continue writing
const producer2 = new IdempotentProducer(stream, 'producer-2', { epoch: 0 })
producer2.append('more data')

close() - Close the stream (EOF)

// Close the stream (no further writes permitted)
await producer.close()

// Future appends will fail with 409 Conflict
Use detach() when handing off writing to another producer. Use close() when signaling end-of-stream to readers.

Advanced: Writable streams

Create a WritableStream that pipes to an idempotent producer:
import { DurableStream } from '@durable-streams/client'

const stream = await DurableStream.create({
  url: 'https://streams.example.com/data',
  contentType: 'application/octet-stream'
})

// Create writable stream with idempotent producer under the hood
const writable = stream.writable({
  producerId: 'my-producer',
  lingerMs: 10,
  maxBatchBytes: 64 * 1024
})

// Pipe from any source
const response = await fetch('https://example.com/large-file.dat')
await response.body!.pipeTo(writable)

console.log('File uploaded to durable stream')
The WritableStream:
  • Uses IdempotentProducer internally for exactly-once delivery
  • Supports backpressure
  • Automatically batches chunks
  • Closes the stream on WritableStream close
  • Detaches (doesn’t close) on WritableStream abort
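
Independent of @durable-streams/client, the batch-then-flush behavior can be demonstrated with a plain WritableStream sink. The batch size of 2 chunks is a toy value; the producer-backed writable batches by bytes and time instead:

```typescript
// Plain Web Streams sketch (no library imports): a sink that batches
// chunks and flushes any remainder on close, mirroring the contract
// the producer-backed writable implements.
async function demo(): Promise<string[][]> {
  const batches: string[][] = []
  let pending: string[] = []

  const writable = new WritableStream<string>({
    write(chunk) {
      pending.push(chunk)
      if (pending.length >= 2) {  // toy batch size of 2 chunks
        batches.push(pending)
        pending = []
      }
    },
    close() {
      // Final flush: close delivers whatever is still pending.
      if (pending.length > 0) batches.push(pending)
    },
  })

  const writer = writable.getWriter()
  await writer.write('a')
  await writer.write('b')
  await writer.write('c')
  await writer.close()
  return batches  // [['a', 'b'], ['c']]
}
```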

Performance tuning

1. Tune batch size for your workload

// High-throughput workload (large batches)
const bulkProducer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  maxBatchBytes: 10 * 1024 * 1024,  // 10MB batches
  lingerMs: 50,                      // Wait up to 50ms
  maxInFlight: 10                    // 10 concurrent requests
})

// Low-latency workload (small batches)
const fastProducer = new IdempotentProducer(stream, 'my-producer', {
  epoch: 0,
  maxBatchBytes: 64 * 1024,  // 64KB batches
  lingerMs: 1,               // Send after 1ms
  maxInFlight: 3             // 3 concurrent requests
})

2. Monitor in-flight state

console.log('Pending messages:', producer.pendingCount)
console.log('Batches in flight:', producer.inFlightCount)
console.log('Current epoch:', producer.epoch)
console.log('Next sequence:', producer.nextSeq)

3. Flush before shutdown

process.on('SIGTERM', async () => {
  console.log('Shutting down, flushing producer')
  await producer.flush()
  await producer.close()
  process.exit(0)
})

Best practices

Always flush before shutdown to ensure pending messages are delivered.
Use stable producer IDs that persist across restarts (e.g., service name + instance ID).
Persist epochs in production for explicit session management and zombie prevention.
Use autoClaim: true only for ephemeral producers (testing, serverless) where zombie risk is acceptable.
Don’t share producer instances across threads or workers. Each thread should have its own producer with a unique producer ID.

Next steps

Caching and Fanout

Scale reads to millions of viewers with CDN caching

Protocol Overview

Review the core protocol concepts