Elixir Client

The official Elixir client for the Durable Streams protocol, with Stream-based consumption.

Installation

Add to your mix.exs:
defp deps do
  [
    {:durable_streams, "~> 0.1.0"}
  ]
end
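
Then fetch the dependency:

mix deps.get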

Quick Start

alias DurableStreams.Client
alias DurableStreams.Stream, as: DSStream

# Create a client
client = Client.new("https://streams.example.com")

# Get a stream handle
stream = Client.stream(client, "/my-stream")

# Create the stream
{:ok, _} = DSStream.create(stream, content_type: "application/json")

# Append data
{:ok, result} = DSStream.append(stream, ~s({"message":"hello"}))
IO.puts("Offset: #{result.next_offset}")

# Read data
{:ok, consumer} = DSStream.consumer(stream, offset: "-1", live: :long_poll)

for {:ok, batch} <- consumer do
  IO.inspect(batch.items, label: "Items")
  # Save checkpoint
  save_offset(batch.offset)
end

Core APIs

Client

Client.new/2
Create a new client.

Parameters:
  base_url (String.t(), required) - Base URL for the streams server
  opts (keyword())
    :headers (map()) - Default headers
    :timeout (pos_integer(), default: 30_000) - Request timeout in milliseconds

Returns: Client.t()

Example:
client = Client.new("https://streams.example.com",
  headers: %{"Authorization" => "Bearer #{token}"},
  timeout: 60_000
)

Client.stream/2
Get a stream handle.

Parameters:
  client (Client.t(), required) - Client instance
  path (String.t(), required) - Stream path (e.g., "/events")

Returns: DurableStreams.Stream.t()
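
Example (a minimal sketch; the "/events" path is illustrative):
client = Client.new("https://streams.example.com")
events = Client.stream(client, "/events")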

DurableStreams.Stream

Handle to a durable stream for read/write operations.

Write Operations

Stream.create/2
Create a new stream.

Parameters:
  stream (Stream.t(), required) - Stream handle
  opts (keyword())
    :content_type (String.t()) - Content type (e.g., "application/json")
    :ttl_seconds (integer()) - Time-to-live in seconds
    :expires_at (String.t()) - Absolute expiry time (ISO 8601)
    :closed (boolean(), default: false) - Create in closed state

Returns: {:ok, Stream.t()} | {:error, term()}
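
Example (a sketch using the options above; the path and TTL values are illustrative):
stream = Client.stream(client, "/orders")
{:ok, _} = Stream.create(stream, content_type: "application/json", ttl_seconds: 86_400)
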
Stream.append/3
Append data to the stream.

Parameters:
  stream (Stream.t(), required) - Stream handle
  data (binary() | map() | list(), required) - Data to append (for JSON streams, pass a map/list or a pre-encoded string)
  opts (keyword())
    :seq (String.t()) - Writer coordination sequence

Returns: {:ok, %{next_offset: String.t()}} | {:error, term()}

Example:
# JSON stream - pass map (auto-encoded)
{:ok, result} = Stream.append(stream, %{message: "hello"})
IO.puts("Next offset: #{result.next_offset}")

# Or pre-encode JSON
{:ok, result} = Stream.append(stream, ~s({"message":"hello"}))

# Byte stream
{:ok, result} = Stream.append(stream, "raw bytes")

Stream.close/2
Close the stream permanently (no more appends).

Parameters:
  stream (Stream.t(), required) - Stream handle
  opts (keyword())
    :data (binary()) - Optional final message

Returns: {:ok, %{final_offset: String.t()}} | {:error, term()}
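
Example (a sketch; the final message payload is illustrative):
{:ok, %{final_offset: final_offset}} = Stream.close(stream, data: ~s({"event":"done"}))
IO.puts("Stream closed at offset #{final_offset}")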

Read Operations

Stream.consumer/2
Create a consumer for reading the stream.

Parameters:
  stream (Stream.t(), required) - Stream handle
  opts (keyword())
    :offset (String.t(), default: "-1") - Starting offset
    :live (:off | :long_poll | :sse, default: :off) - Live mode
    :cursor (String.t()) - Stream cursor for CDN collapsing

Returns: {:ok, Stream.t()} | {:error, term()}

Example:
{:ok, consumer} = Stream.consumer(stream, offset: last_offset, live: :long_poll)

for {:ok, batch} <- consumer do
  case batch do
    %{items: items, offset: offset} when is_list(items) ->
      # JSON batch
      Enum.each(items, &process_item/1)
      save_checkpoint(offset)

    %{data: data, offset: offset} ->
      # Byte chunk
      process_bytes(data)
      save_checkpoint(offset)
  end
end

Stream.head/1
Get stream metadata via a HEAD request.

Parameters:
  stream (Stream.t(), required) - Stream handle

Returns: {:ok, %{content_type: String.t(), offset: String.t(), stream_closed: boolean()}} | {:error, term()}
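
Example (a sketch; checks whether the stream still accepts appends before writing):
{:ok, meta} = Stream.head(stream)

unless meta.stream_closed do
  {:ok, _} = Stream.append(stream, %{message: "still open"})
end
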
Stream.delete/1
Delete the stream.

Parameters:
  stream (Stream.t(), required) - Stream handle

Returns: :ok | {:error, term()}
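
Example (a sketch; treats an already-missing stream as success):
case Stream.delete(stream) do
  :ok -> :ok
  {:error, :stream_not_found} -> :ok
  {:error, reason} -> {:error, reason}
end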

DurableStreams.Writer

Idempotent producer for exactly-once writes.
alias DurableStreams.Writer

{:ok, writer} = Writer.start_link(
  stream: stream,
  producer_id: "order-service-1",
  epoch: 0,
  auto_claim: true,
  max_batch_bytes: 1_048_576,
  linger_ms: 5
)

# Fire-and-forget writes
Writer.append(writer, ~s({"orderId":"123"}))
Writer.append(writer, ~s({"orderId":"456"}))

# Ensure delivery
:ok = Writer.flush(writer)

Writer.start_link/1
Start an idempotent producer.

Parameters:
  opts (keyword(), required)
    :stream (Stream.t(), required) - Stream handle
    :producer_id (String.t(), required) - Stable identifier
    :epoch (non_neg_integer(), default: 0) - Starting epoch
    :auto_claim (boolean(), default: false) - Auto-retry with epoch + 1 on 403
    :max_batch_bytes (pos_integer(), default: 1_048_576) - Max batch size (1 MB)
    :linger_ms (non_neg_integer(), default: 5) - Max wait before sending a batch

Returns: {:ok, pid()} | {:error, term()}

Writer.append/2
Fire-and-forget append.

Parameters:
  writer (pid(), required) - Writer process
  data (binary(), required) - Data to append

Returns: :ok

Writer.flush/1
Send the pending batch and wait for all in-flight batches.

Parameters:
  writer (pid(), required) - Writer process

Returns: :ok | {:error, term()}
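
Example (a sketch; flush before shutdown so queued writes are not lost):
require Logger

Writer.append(writer, ~s({"orderId":"789"}))

case Writer.flush(writer) do
  :ok -> :ok
  {:error, reason} -> Logger.error("Flush failed: #{inspect(reason)}")
end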

Types

Offset

@type offset :: String.t()
# "-1" for start of stream
# "123_456" for specific position

LiveMode

@type live_mode :: :off | :long_poll | :sse
# :off - catch-up only
# :long_poll - long-polling for live updates
# :sse - Server-Sent Events for live updates
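
A common pattern is to persist the last processed offset and resume from it on restart. A sketch, assuming load_offset/0, save_offset/1, and process_item/1 are your own checkpoint and processing helpers:

# Resume from the last checkpoint, or from the start of the stream
start_offset = load_offset() || "-1"

{:ok, consumer} = Stream.consumer(stream, offset: start_offset, live: :long_poll)

for {:ok, batch} <- consumer do
  Enum.each(batch.items, &process_item/1)
  # Persist the batch offset so a restart resumes from here
  save_offset(batch.offset)
end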

Error Handling

case Stream.append(stream, data) do
  {:ok, result} ->
    IO.puts("Appended at offset #{result.next_offset}")

  {:error, :stream_not_found} ->
    IO.puts("Stream doesn't exist")

  {:error, :stream_closed} ->
    IO.puts("Stream is closed")

  {:error, :seq_conflict} ->
    IO.puts("Sequence conflict")

  {:error, reason} ->
    IO.inspect(reason, label: "Error")
end

Error Atoms

  • :stream_not_found - Stream doesn’t exist (404)
  • :stream_exists - Stream already exists (409 on create)
  • :stream_closed - Stream is closed (409 with Stream-Closed header)
  • :seq_conflict - Sequence conflict (409)
  • :unauthorized - Auth required (401)
  • :forbidden - Access denied (403)
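
These atoms are convenient to pattern match for recovery. For example, a write path can create the stream on demand when it does not yet exist; a sketch, where MyApp.Appender is a hypothetical module of your own:

defmodule MyApp.Appender do
  alias DurableStreams.Stream

  # Append, creating the stream on first use and retrying once
  def ensure_append(stream, data) do
    case Stream.append(stream, data) do
      {:ok, result} ->
        {:ok, result}

      {:error, :stream_not_found} ->
        {:ok, _} = Stream.create(stream, content_type: "application/json")
        Stream.append(stream, data)

      {:error, _} = error ->
        error
    end
  end
end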

Advanced Features

Supervision

children = [
  {DurableStreams.Writer, [
    stream: stream,
    producer_id: "my-producer",
    epoch: 0,
    name: MyApp.Writer
  ]}
]

Supervisor.start_link(children, strategy: :one_for_one)

# Use named writer
Writer.append(MyApp.Writer, data)

JSON Decoding

{:ok, consumer} = Stream.consumer(stream, offset: "-1")

for {:ok, batch} <- consumer do
  # If items arrive as raw JSON strings, decode each one individually
  items = Enum.map(batch.items, &Jason.decode!/1)
  Enum.each(items, fn item ->
    IO.inspect(item)
  end)
end

Stream Processing with Flow

# Requires the :flow dependency, e.g. {:flow, "~> 1.2"} in mix.exs

{:ok, consumer} = Stream.consumer(stream, live: :long_poll)

consumer
|> Flow.from_enumerable()
|> Flow.flat_map(fn {:ok, batch} -> batch.items end)
|> Flow.map(&process_item/1)
|> Flow.partition()
|> Flow.reduce(fn -> [] end, fn item, acc -> [item | acc] end)
|> Enum.to_list()

Source Code

Source: packages/client-elixir/lib/durable_streams/