Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Ruby Client

The official Ruby client for Durable Streams protocol with enumerable streaming.

Installation

Add to your Gemfile:
gem 'durable_streams'
Or install directly:
gem install durable_streams
Requires Ruby 3.0 or later

Quick Start

require 'durable_streams'

# Create a client
client = DurableStreams::Client.new(base_url: 'https://streams.example.com')
stream = client.stream('/my-stream')

# Create the stream
stream.create(content_type: 'application/json')

# Append data
stream.append({message: 'hello'}.to_json)

# Read data
stream.read(offset: '-1', live: :long_poll).each do |batch|
  batch.items.each do |item|
    puts "Item: #{item}"
  end
  # Save checkpoint
  save_offset(batch.offset)
end

Core APIs

Client

Client.new
method
Create a new client
base_url
String | nil
Base URL for relative paths
headers
Hash
Default headers (values can be strings or callables)
timeout
Numeric
default:"30"
Request timeout in seconds
retry_policy
RetryPolicy | nil
Custom retry configuration
Returns: Client
Example:
client = DurableStreams::Client.new(
  base_url: 'https://streams.example.com',
  headers: {
    'Authorization' => 'Bearer token123',
    'X-Request-ID' => -> { SecureRandom.uuid }
  },
  timeout: 60
)
stream
method
Get a stream handle
url
String
required
Stream URL or path
options
Hash
Additional options
Returns: Stream
Client.open
class method
Open a client with block form for automatic cleanup
options
Hash
required
Client options
block
Block
required
Block to execute
Returns: Block’s return value
Example:
DurableStreams::Client.open(base_url: 'https://...') do |client|
  stream = client.stream('/events')
  # ...
end # auto-closes

Stream

Handle to a durable stream for read/write operations.

Static Factory Methods

Stream.create
class method
Create a new stream and return a handle
url
String
required
Stream URL or path
content_type
String
required
Content type (e.g., "application/json")
context
Context
Client context
ttl_seconds
Integer | nil
Time-to-live in seconds
expires_at
String | nil
Absolute expiry time (ISO8601)
closed
Boolean
default:"false"
Create in closed state
Returns: StreamRaises: StreamExistsError if stream already exists
Stream.connect
class method
Connect to an existing stream (validates via HEAD)
url
String
required
Stream URL or path
context
Context
Client context
Returns: StreamRaises: StreamNotFoundError if stream doesn’t exist
Example:
stream = DurableStreams::Stream.create(
  'https://streams.example.com/events',
  content_type: 'application/json',
  ttl_seconds: 86400  # 24 hours
)

Write Operations

append
method
Append data to the stream
data
String | Hash | Array
required
Data to append (for JSON streams, pass Hash/Array or pre-encoded string)
seq
String | nil
Writer coordination sequence
Returns: {next_offset: String}Raises: StreamClosedError, SeqConflictError, DurableStreamError
Example:
# JSON stream - pass hash (auto-encoded)
result = stream.append({message: 'hello'})
puts "Offset: #{result[:next_offset]}"

# Or pre-encode JSON
result = stream.append('{"message":"hello"}')

# Byte stream
result = stream.append('raw bytes')

# With sequence
result = stream.append({message: 'world'}, seq: 'seq-001')
close_stream
method
Close the stream permanently (no more appends)
data
String | Hash | nil
Optional final message
Returns: {final_offset: String}Raises: StreamClosedError if called with data on already-closed stream
delete
method
Delete this streamRaises: StreamNotFoundError if stream doesn’t exist

Read Operations

read
method
Read from the stream as an Enumerable
offset
String
default:"'-1'"
Starting offset
live
:off | :long_poll | :sse
default:":off"
Live mode
cursor
String | nil
Stream cursor for CDN collapsing
Returns: Enumerable (yields batches)
Example:
stream.read(offset: last_offset, live: :long_poll).each do |batch|
  # For JSON streams
  if batch.respond_to?(:items)
    batch.items.each do |item|
      puts "Item: #{item.inspect}"
    end
  else
    # For byte streams
    puts "Got #{batch.data.bytesize} bytes"
  end

  # Save checkpoint
  save_checkpoint(batch.offset)
end
read_json
method
Read JSON items as an Enumerable
offset
String
default:"'-1'"
Starting offset
live
:off | :long_poll | :sse
default:":off"
Live mode
Returns: Enumerable (yields individual items, not batches)
Example:
stream.read_json(live: :long_poll).each do |item|
  puts "Event #{item['id']}: #{item['type']}"
  process_event(item)
end
head
method
Get stream metadata via HEAD requestReturns: {content_type: String, offset: String, stream_closed: Boolean}Raises: StreamNotFoundError if stream doesn’t exist

Producer

Idempotent producer for exactly-once writes.
producer = DurableStreams::Producer.new(
  stream: stream,
  producer_id: 'order-service-1',
  epoch: 0,
  auto_claim: true,
  max_batch_bytes: 1024 * 1024,
  linger_ms: 5
)

# Fire-and-forget writes
producer.append({orderId: '123'}.to_json)
producer.append({orderId: '456'}.to_json)

# Ensure delivery
producer.flush
producer.close
Producer.new
method
Create an idempotent producer
stream
Stream
required
Target stream
producer_id
String
required
Stable identifier
epoch
Integer
default:"0"
Starting epoch
auto_claim
Boolean
default:"false"
Auto-retry with epoch+1 on 403
max_batch_bytes
Integer
default:"1048576"
Max batch size (1MB)
linger_ms
Integer
default:"5"
Max wait before sending batch
on_error
Proc | nil
Error callback
Returns: Producer
append
method
Fire-and-forget append (returns immediately)
data
String
required
Data to append (for JSON, pass pre-encoded JSON)
flush
method
Send pending batch and wait for all in-flight batchesRaises: ProducerError on failure
close
method
Flush and close (no stream close)
close_stream
method
Close the stream with producer headers (idempotent)
data
String | nil
Optional final message
Returns: {final_offset: String}

Types

Offset

# String type - use "-1" for start of stream
offset = "-1"
offset = "123_456"

LiveMode

# Symbols
:off        # Catch-up only
:long_poll  # Long-polling
:sse        # Server-Sent Events

Error Handling

require 'durable_streams'

begin
  stream.append(data)
rescue DurableStreams::StreamClosedError => e
  puts "Stream is closed: #{e.url}"
rescue DurableStreams::SeqConflictError
  puts "Sequence conflict"
rescue DurableStreams::StreamNotFoundError => e
  puts "Stream not found: #{e.url}"
rescue DurableStreams::DurableStreamError => e
  puts "Error #{e.status_code}: #{e.message}"
end

Error Hierarchy

  • DurableStreamError - Base exception
    • StreamNotFoundError - 404
    • StreamExistsError - 409 on create
    • StreamClosedError - 409 with Stream-Closed header
    • SeqConflictError - 409 sequence conflict
    • StaleEpochError - 403 for producers
    • UnauthorizedError - 401
    • ForbiddenError - 403
    • RateLimitedError - 429

Advanced Features

Block Form

DurableStreams::Client.open(base_url: 'https://...') do |client|
  stream = client.stream('/events')
  stream.create(content_type: 'application/json')

  stream.append({event: 'test'})
end # auto-closes client

Dynamic Headers

client = DurableStreams::Client.new(
  base_url: 'https://...',
  headers: {
    'Authorization' => -> { "Bearer #{fetch_token}" },
    'X-Request-ID' => -> { SecureRandom.uuid }
  }
)

Enumerable Chaining

stream.read_json(live: :long_poll)
  .lazy
  .select { |item| item['type'] == 'order.created' }
  .map { |item| process_order(item) }
  .each { |result| save_result(result) }

Rails Integration

# config/initializers/durable_streams.rb
RAILS_DURABLE_STREAMS = DurableStreams::Client.new(
  base_url: ENV['STREAMS_URL'],
  headers: { 'Authorization' => "Bearer #{ENV['STREAMS_TOKEN']}" }
)

# In your model/service
class EventPublisher
  def self.publish(event)
    stream = RAILS_DURABLE_STREAMS.stream('/events')
    stream.append(event.to_json)
  end
end

Thread Safety

# Client is thread-safe
client = DurableStreams::Client.new(base_url: 'https://...')

threads = 10.times.map do |i|
  Thread.new do
    stream = client.stream("/stream-#{i}")
    stream.create(content_type: 'application/json')
    stream.append({index: i})
  end
end

threads.each(&:join)

Testing with RSpec

RSpec.describe EventService do
  let(:client) { DurableStreams::Client.new(base_url: test_server_url) }
  let(:stream) { client.stream('/test-stream') }

  before do
    stream.create(content_type: 'application/json')
  end

  it 'publishes events' do
    stream.append({event: 'test'})

    items = stream.read.first.items
    expect(items).to include(hash_including('event' => 'test'))
  end
end

Source Code

Source: packages/client-rb/lib/durable_streams/