Ruby Client
The official Ruby client for the Durable Streams protocol, with enumerable streaming.
Installation
Add to your Gemfile:
gem 'durable_streams'
Or install directly:
gem install durable_streams
Requires Ruby 3.0 or later
Quick Start
require 'durable_streams'
# Create a client
client = DurableStreams::Client.new(base_url: 'https://streams.example.com')
stream = client.stream('/my-stream')
# Create the stream
stream.create(content_type: 'application/json')
# Append data
stream.append({message: 'hello'}.to_json)
# Read data
stream.read(offset: '-1', live: :long_poll).each do |batch|
  batch.items.each do |item|
    puts "Item: #{item}"
  end

  # Save checkpoint
  save_offset(batch.offset)
end
Core APIs
Client
Create a new client
base_url: Base URL for relative paths
headers: Default headers (values can be strings or callables)
timeout: Request timeout in seconds
Custom retry configuration
Returns: Client
Example:
client = DurableStreams::Client.new(
  base_url: 'https://streams.example.com',
  headers: {
    'Authorization' => 'Bearer token123',
    'X-Request-ID' => -> { SecureRandom.uuid }
  },
  timeout: 60
)
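The custom retry configuration option is not spelled out above. As an illustration of the kind of policy such a setting typically controls, here is a minimal exponential-backoff loop in plain Ruby; with_retries and its parameters are hypothetical names for this sketch, not the gem's API:

```ruby
# Hypothetical sketch: retry a block with exponential backoff.
# max_attempts and base_delay are illustrative names, not gem options.
def with_retries(max_attempts: 3, base_delay: 0.1)
  attempt = 0
  begin
    attempt += 1
    yield
  rescue StandardError
    raise if attempt >= max_attempts
    sleep(base_delay * (2**(attempt - 1))) # 0.1s, 0.2s, 0.4s, ...
    retry
  end
end

# Usage: the block fails twice, then succeeds on the third attempt.
calls = 0
result = with_retries(max_attempts: 5, base_delay: 0.01) do
  calls += 1
  raise 'transient' if calls < 3
  :ok
end
```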
Get a stream handle
Returns: Stream
Open a client with block form for automatic cleanup
Returns: Block's return value
Example:
DurableStreams::Client.open(base_url: 'https://...') do |client|
  stream = client.stream('/events')
  # ...
end # auto-closes
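Client.open's block form follows the standard Ruby acquire/yield/ensure idiom, the same pattern as File.open. A self-contained sketch with a stand-in class (FakeClient is not part of the gem) shows how the block's return value propagates while close is guaranteed to run:

```ruby
# Stand-in class illustrating the acquire/yield/ensure idiom behind
# the block form of open (FakeClient is not part of the gem).
class FakeClient
  def initialize
    @closed = false
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  # Yields a new instance, guarantees close, returns the block's value.
  def self.open
    client = new
    begin
      yield client
    ensure
      client.close
    end
  end
end

captured = nil
value = FakeClient.open do |c|
  captured = c
  :did_work
end
```

The ensure clause runs even if the block raises, which is what makes the block form safe for resource cleanup.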
Stream
Handle to a durable stream for read/write operations.
Static Factory Methods
Create a new stream and return a handle
content_type: Content type (e.g., "application/json")
Absolute expiry time (ISO8601)
Returns: Stream
Raises: StreamExistsError if the stream already exists
Connect to an existing stream (validates via HEAD)
Returns: Stream
Raises: StreamNotFoundError if the stream doesn't exist
Example:
stream = DurableStreams::Stream.create(
  'https://streams.example.com/events',
  content_type: 'application/json',
  ttl_seconds: 86400 # 24 hours
)
Write Operations
Append data to the stream
data
String | Hash | Array
required
Data to append (for JSON streams, pass Hash/Array or a pre-encoded string)
seq: Writer coordination sequence
Returns: {next_offset: String}
Raises: StreamClosedError, SeqConflictError, DurableStreamError
Example:
# JSON stream - pass hash (auto-encoded)
result = stream.append({message: 'hello'})
puts "Offset: #{result[:next_offset]}"
# Or pre-encode JSON
result = stream.append('{"message":"hello"}')
# Byte stream
result = stream.append('raw bytes')
# With sequence
result = stream.append({message: 'world'}, seq: 'seq-001')
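The docs above don't specify how the server interprets seq beyond raising SeqConflictError on conflict. One plausible model is a monotonic guard that rejects non-increasing sequences; the sketch below is an assumption for illustration, not the protocol's definition:

```ruby
# Hypothetical model of server-side seq handling: reject any sequence
# that is not strictly greater than the last accepted one.
# Actual protocol semantics may differ.
class SeqGuard
  def initialize
    @last = nil
  end

  # Returns true if the write is accepted; false models the case
  # where the server would respond with a SeqConflictError (409).
  def accept?(seq)
    return true if seq.nil?               # no coordination requested
    return false if @last && seq <= @last # duplicate or out of order
    @last = seq
    true
  end
end

guard = SeqGuard.new
first = guard.accept?('seq-001') # true: first use of this sequence
dup   = guard.accept?('seq-001') # false: duplicate, would conflict
nxt   = guard.accept?('seq-002') # true: strictly increasing
```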
Close the stream permanently (no more appends)
Returns: {final_offset: String}
Raises: StreamClosedError if called with data on an already-closed stream
Delete this stream
Raises: StreamNotFoundError if the stream doesn't exist
Read Operations
Read from the stream as an Enumerable
live
:off | :long_poll | :sse
default: :off
Live mode
Stream cursor for CDN collapsing
Returns: Enumerable (yields batches)
Example:
stream.read(offset: last_offset, live: :long_poll).each do |batch|
  # For JSON streams
  if batch.respond_to?(:items)
    batch.items.each do |item|
      puts "Item: #{item.inspect}"
    end
  else
    # For byte streams
    puts "Got #{batch.data.bytesize} bytes"
  end

  # Save checkpoint
  save_checkpoint(batch.offset)
end
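save_checkpoint above is application code, not part of the gem. A minimal file-based implementation, assuming the offset is an opaque string (the class and method names here are illustrative):

```ruby
require 'tmpdir'

# Illustrative file-based checkpoint store for resumable reads.
class FileCheckpoint
  def initialize(path)
    @path = path
  end

  def save(offset)
    File.write(@path, offset)
  end

  # Fall back to "-1" (start of stream) when no checkpoint exists yet.
  def load
    File.exist?(@path) ? File.read(@path) : '-1'
  end
end

checkpoint = FileCheckpoint.new(File.join(Dir.mktmpdir, 'offset'))
initial = checkpoint.load # "-1" before any save
checkpoint.save('123_456')
resumed = checkpoint.load
```

On restart, pass the loaded value back as the offset argument to read to resume where the previous run left off.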
Read JSON items as an Enumerable
live
:off | :long_poll | :sse
default: :off
Live mode
Returns: Enumerable (yields individual items, not batches)
Example:
stream.read_json(live: :long_poll).each do |item|
  puts "Event #{item['id']}: #{item['type']}"
  process_event(item)
end
Get stream metadata via HEAD request
Returns: {content_type: String, offset: String, stream_closed: Boolean}
Raises: StreamNotFoundError if the stream doesn't exist
Producer
Idempotent producer for exactly-once writes.
producer = DurableStreams::Producer.new(
  stream: stream,
  producer_id: 'order-service-1',
  epoch: 0,
  auto_claim: true,
  max_batch_bytes: 1024 * 1024,
  linger_ms: 5
)
# Fire-and-forget writes
producer.append({orderId: '123'}.to_json)
producer.append({orderId: '456'}.to_json)
# Ensure delivery
producer.flush
producer.close
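max_batch_bytes and linger_ms govern batching: appends are buffered until the batch would exceed the byte cap or the linger window expires. The synchronous sketch below models only the size-based half of that behavior and is not the gem's implementation:

```ruby
# Simplified batching buffer: flushes the pending batch when adding a
# record would push the buffered size past the byte cap.
class BatchBuffer
  attr_reader :flushed

  def initialize(max_batch_bytes:)
    @max = max_batch_bytes
    @pending = []
    @bytes = 0
    @flushed = [] # each flush is an array of records
  end

  def append(record)
    flush if @bytes + record.bytesize > @max && !@pending.empty?
    @pending << record
    @bytes += record.bytesize
  end

  def flush
    return if @pending.empty?
    @flushed << @pending
    @pending = []
    @bytes = 0
  end
end

buffer = BatchBuffer.new(max_batch_bytes: 10)
%w[aaaa bbbb cccc].each { |r| buffer.append(r) } # 4 bytes each
buffer.flush # final flush, as producer.flush does for pending records
```

A real producer additionally flushes when linger_ms elapses with data pending, so small batches still ship promptly under low traffic.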
Create an idempotent producer
auto_claim: Auto-retry with epoch+1 on 403
linger_ms: Max wait before sending a batch
Returns: Producer
Fire-and-forget append (returns immediately)
Data to append (for JSON, pass pre-encoded JSON)
Send pending batch and wait for all in-flight batchesRaises: ProducerError on failure
Flush and close (no stream close)
Close the stream with producer headers (idempotent)
Returns: {final_offset: String}
Types
Offset
# String type - use "-1" for start of stream
offset = "-1"
offset = "123_456"
LiveMode
# Symbols
:off # Catch-up only
:long_poll # Long-polling
:sse # Server-Sent Events
Error Handling
require 'durable_streams'
begin
  stream.append(data)
rescue DurableStreams::StreamClosedError => e
  puts "Stream is closed: #{e.url}"
rescue DurableStreams::SeqConflictError
  puts "Sequence conflict"
rescue DurableStreams::StreamNotFoundError => e
  puts "Stream not found: #{e.url}"
rescue DurableStreams::DurableStreamError => e
  puts "Error #{e.status_code}: #{e.message}"
end
Error Hierarchy
DurableStreamError - Base exception
StreamNotFoundError - 404
StreamExistsError - 409 on create
StreamClosedError - 409 with Stream-Closed header
SeqConflictError - 409 sequence conflict
StaleEpochError - 403 for producers
UnauthorizedError - 401
ForbiddenError - 403
RateLimitedError - 429
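A hierarchy like this is ordinary Ruby subclassing, which is what lets a single rescue on the base DurableStreamError catch every subclass, as in the error-handling example above. A self-contained mirror of the idea (DemoStreams and the attribute names shown are assumptions for this sketch, not the gem's definitions):

```ruby
# Illustrative mirror of the exception hierarchy; class names follow the
# docs, but the module name and attributes here are assumptions.
module DemoStreams
  class DurableStreamError < StandardError
    attr_reader :status_code

    def initialize(message = nil, status_code: nil)
      super(message)
      @status_code = status_code
    end
  end

  class StreamNotFoundError < DurableStreamError; end
  class StreamClosedError   < DurableStreamError; end
end

# A rescue on the base class catches every subclass.
caught = begin
  raise DemoStreams::StreamNotFoundError.new('missing', status_code: 404)
rescue DemoStreams::DurableStreamError => e
  e.status_code
end
```

Rescue specific subclasses first when you need distinct handling, and fall through to the base class for everything else.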
Advanced Features
DurableStreams::Client.open(base_url: 'https://...') do |client|
  stream = client.stream('/events')
  stream.create(content_type: 'application/json')
  stream.append({event: 'test'})
end # auto-closes client

client = DurableStreams::Client.new(
  base_url: 'https://...',
  headers: {
    'Authorization' => -> { "Bearer #{fetch_token}" },
    'X-Request-ID' => -> { SecureRandom.uuid }
  }
)
Enumerable Chaining
stream.read_json(live: :long_poll)
      .lazy
      .select { |item| item['type'] == 'order.created' }
      .map { |item| process_order(item) }
      .each { |result| save_result(result) }
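Because read_json returns an Enumerable, anything from Enumerable and Enumerator::Lazy composes with it. The same pipeline over a plain array shows the mechanics without a server; lazy evaluation matters on live streams because the source never ends, so each item must flow through the whole chain as it arrives:

```ruby
# Stand-in for stream items: the same select/map chain as above,
# run over an in-memory array instead of a live stream.
items = [
  { 'type' => 'order.created', 'id' => 1 },
  { 'type' => 'order.deleted', 'id' => 2 },
  { 'type' => 'order.created', 'id' => 3 }
]

ids = items.lazy
           .select { |item| item['type'] == 'order.created' }
           .map    { |item| item['id'] }
           .to_a
```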
Rails Integration
# config/initializers/durable_streams.rb
RAILS_DURABLE_STREAMS = DurableStreams::Client.new(
  base_url: ENV['STREAMS_URL'],
  headers: { 'Authorization' => "Bearer #{ENV['STREAMS_TOKEN']}" }
)

# In your model/service
class EventPublisher
  def self.publish(event)
    stream = RAILS_DURABLE_STREAMS.stream('/events')
    stream.append(event.to_json)
  end
end
Thread Safety
# Client is thread-safe
client = DurableStreams::Client.new(base_url: 'https://...')

threads = 10.times.map do |i|
  Thread.new do
    stream = client.stream("/stream-#{i}")
    stream.create(content_type: 'application/json')
    stream.append({index: i})
  end
end

threads.each(&:join)
Testing with RSpec
RSpec.describe EventService do
  let(:client) { DurableStreams::Client.new(base_url: test_server_url) }
  let(:stream) { client.stream('/test-stream') }

  before do
    stream.create(content_type: 'application/json')
  end

  it 'publishes events' do
    stream.append({event: 'test'})

    items = stream.read.first.items
    expect(items).to include(hash_including('event' => 'test'))
  end
end
Source Code
Source: packages/client-rb/lib/durable_streams/