Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Elixir Client
The official Elixir client for Durable Streams protocol with Stream-based consumption.
Installation
Add to your mix.exs:
defp deps do
[
{:durable_streams, "~> 0.1.0"}
]
end
Quick Start
alias DurableStreams.Client
alias DurableStreams.Stream, as: DSStream
# Create a client
client = Client.new("https://streams.example.com")
# Get a stream handle
stream = Client.stream(client, "/my-stream")
# Create the stream
{:ok, _} = DSStream.create(stream, content_type: "application/json")
# Append data
{:ok, result} = DSStream.append(stream, ~s({"message":"hello"}))
IO.puts("Offset: #{result.next_offset}")
# Read data
{:ok, consumer} = DSStream.consumer(stream, offset: "-1", live: :long_poll)
for {:ok, batch} <- consumer do
IO.inspect(batch.items, label: "Items")
# Save checkpoint
save_offset(batch.offset)
end
Core APIs
Client
Create a new client.

Base URL for the streams server
:timeout
pos_integer()
default:"30000"
Request timeout in milliseconds
Returns: Client.t()
Example:
client = Client.new("https://streams.example.com",
headers: %{"Authorization" => "Bearer #{token}"},
timeout: 60_000
)
Get a stream handle.

Stream path (e.g., "/events")
Returns: DurableStreams.Stream.t()
DurableStreams.Stream
Handle to a durable stream for read/write operations.
Write Operations
Create a new stream.

Content type (e.g., "application/json")
Absolute expiry time (ISO8601)
Returns: {:ok, Stream.t()} | {:error, term()}
Append data to the stream.

data
binary() | map() | list()
required
Data to append (for JSON streams, pass map/list or pre-encoded string)
:seq — Writer coordination sequence
Returns: {:ok, %{next_offset: String.t()}} | {:error, term()}
Example:
# JSON stream - pass map (auto-encoded)
{:ok, result} = Stream.append(stream, %{message: "hello"})
IO.puts("Next offset: #{result.next_offset}")
# Or pre-encode JSON
{:ok, result} = Stream.append(stream, ~s({"message":"hello"}))
# Byte stream
{:ok, result} = Stream.append(stream, "raw bytes")
Close the stream permanently (no more appends).

Returns: {:ok, %{final_offset: String.t()}} | {:error, term()}
Read Operations
Create a consumer for reading the stream.

:offset
String.t()
default:"\"-1\""
Starting offset
:live
:off | :long_poll | :sse
default:":off"
Live mode
:cursor — Stream cursor for CDN collapsing
Returns: {:ok, Stream.t()} | {:error, term()}
Example:
{:ok, consumer} = Stream.consumer(stream, offset: last_offset, live: :long_poll)
for {:ok, batch} <- consumer do
case batch do
%{items: items, offset: offset} when is_list(items) ->
# JSON batch
Enum.each(items, &process_item/1)
save_checkpoint(offset)
%{data: data, offset: offset} ->
# Byte chunk
process_bytes(data)
save_checkpoint(offset)
end
end
Get stream metadata via HEAD request.

Returns: {:ok, %{content_type: String.t(), offset: String.t(), stream_closed: boolean()}} | {:error, term()}
Delete the stream.

Returns: :ok | {:error, term()}
DurableStreams.Writer
Idempotent producer for exactly-once writes.
alias DurableStreams.Writer
{:ok, writer} = Writer.start_link(
stream: stream,
producer_id: "order-service-1",
epoch: 0,
auto_claim: true,
max_batch_bytes: 1_048_576,
linger_ms: 5
)
# Fire-and-forget writes
Writer.append(writer, ~s({"orderId":"123"}))
Writer.append(writer, ~s({"orderId":"456"}))
# Ensure delivery
:ok = Writer.flush(writer)
Start an idempotent producer.

:epoch
non_neg_integer()
default:"0"
Starting epoch
:auto_claim — Auto-retry with epoch+1 on 403
:max_batch_bytes
pos_integer()
default:"1048576"
Max batch size (1MB)
:linger_ms
non_neg_integer()
default:"5"
Max wait before sending batch
Returns: {:ok, pid()} | {:error, term()}
Fire-and-forget append.

Returns: :ok
Send pending batch and wait for all in-flight batches.

Returns: :ok | {:error, term()}
Types
Offset
@type offset :: String.t()
# "-1" for start of stream
# "123_456" for specific position
LiveMode
@type live_mode :: :off | :long_poll | :sse
# :off - catch-up only
# :long_poll - long-polling for live updates
# :sse - Server-Sent Events for live updates
Error Handling
case Stream.append(stream, data) do
{:ok, result} ->
IO.puts("Appended at offset #{result.next_offset}")
{:error, :stream_not_found} ->
IO.puts("Stream doesn't exist")
{:error, :stream_closed} ->
IO.puts("Stream is closed")
{:error, :seq_conflict} ->
IO.puts("Sequence conflict")
{:error, reason} ->
IO.inspect(reason, label: "Error")
end
Error Atoms
:stream_not_found - Stream doesn’t exist (404)
:stream_exists - Stream already exists (409 on create)
:stream_closed - Stream is closed (409 with Stream-Closed header)
:seq_conflict - Sequence conflict (409)
:unauthorized - Auth required (401)
:forbidden - Access denied (403)
Advanced Features
Supervision
children = [
{DurableStreams.Writer, [
stream: stream,
producer_id: "my-producer",
epoch: 0,
name: MyApp.Writer
]}
]
Supervisor.start_link(children, strategy: :one_for_one)
# Use named writer
Writer.append(MyApp.Writer, data)
JSON Decoding
{:ok, consumer} = Stream.consumer(stream, offset: "-1")
for {:ok, batch} <- consumer do
items = Jason.decode!(batch.items)
Enum.each(items, fn item ->
IO.inspect(item)
end)
end
Stream Processing with Flow
alias Experimental.Flow
{:ok, consumer} = Stream.consumer(stream, live: :long_poll)
consumer
|> Flow.from_enumerable()
|> Flow.flat_map(fn {:ok, batch} -> batch.items end)
|> Flow.map(&process_item/1)
|> Flow.partition()
|> Flow.reduce(fn -> [] end, fn item, acc -> [item | acc] end)
|> Enum.to_list()
Source Code
Source: packages/client-elixir/lib/durable_streams/