Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Rust Client

The official Rust client for Durable Streams protocol with async/await support.

Installation

Add to your Cargo.toml:
[dependencies]
durable-streams = "0.1"
tokio = { version = "1", features = ["full"] }

Quick Start

use durable_streams::{Client, Offset};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a client with default settings (use Client::builder() for
    // custom base URL, timeout, or headers).
    let client = Client::new();
    // Obtain a handle to the stream; this performs no network I/O.
    let stream = client.stream("https://streams.example.com/my-stream");

    // Create the stream on the server.
    stream.create().await?;

    // Append a JSON payload (serialize your data before appending).
    stream.append(b"{\"message\":\"hello\"}").await?;

    // Read from the beginning of the stream, long-polling for new data.
    // NOTE: `Offset` is a struct, not an enum — use the `Offset::beginning()`
    // constructor rather than a nonexistent `Offset::Beginning` variant.
    let mut reader = stream.read()
        .offset(Offset::beginning())
        .live_mode(durable_streams::LiveMode::LongPoll)
        .build()?;

    // next_chunk() yields Ok(None) when iteration is complete.
    while let Some(chunk) = reader.next_chunk().await? {
        println!("Got {} bytes", chunk.data.len());
    }

    Ok(())
}

Core APIs

Client

Client::new()
function
Create a new client with default settings. Returns: Client
Client::builder()
function
Create a builder for custom configuration. Returns: ClientBuilder
Example:
use durable_streams::Client;
use std::time::Duration;

let client = Client::builder()
    .base_url("https://streams.example.com")
    .timeout(Duration::from_secs(30))
    .header("Authorization", "Bearer token")
    .build()?;
stream()
method
Get a handle to a stream (no network I/O)
url
&str
required
Stream URL (absolute or relative to base_url)
Returns: DurableStream

DurableStream

Handle to a durable stream for read/write operations.

Write Operations

create()
async method
Create a new stream with default settings. Returns: Result<(), StreamError>. Errors: StreamError::AlreadyExists if the stream already exists
create_with_options()
async method
Create a stream with options
options
CreateOptions
required
content_type
&str
Content type (default: "application/octet-stream")
ttl
Duration
Time-to-live
expires_at
DateTime<Utc>
Absolute expiry time
closed
bool
default:"false"
Create in closed state
Returns: Result<(), StreamError>
Example:
use durable_streams::CreateOptions;
use std::time::Duration;

stream.create_with_options(
    CreateOptions::new()
        .content_type("application/json")
        .ttl(Duration::from_secs(86400))
).await?;
append()
async method
Append data to the stream
data
&[u8]
required
Data to append (for JSON, serialize first)
Returns: Result<AppendResponse, StreamError> with next_offset. Errors: StreamError::NotFound, StreamError::Closed, etc.
Example:
// JSON stream - serialize first
use serde_json::json;

let message = json!({"message": "hello"});
let json = serde_json::to_vec(&message)?;
let response = stream.append(&json).await?;
println!("Offset: {:?}", response.next_offset);

// Byte stream
stream.append(b"raw bytes").await?;
append_with_options()
async method
Append with options
data
&[u8]
required
Data to append
options
AppendOptions
seq
&str
Writer coordination sequence
Returns: Result<AppendResponse, StreamError>
close()
async method
Close the stream permanently (no more appends). Returns: Result<CloseResponse, StreamError> with final_offset. Errors: StreamError::Closed if already closed with data
close_with_data()
async method
Close with optional final message
data
&[u8]
Final message
Returns: Result<CloseResponse, StreamError>
delete()
async method
Delete the stream. Returns: Result<(), StreamError>. Errors: StreamError::NotFound if the stream doesn't exist

Read Operations

read()
method
Create a builder for reading the stream. Returns: ReadBuilder
Example:
use durable_streams::{Offset, LiveMode};

let mut reader = stream.read()
    .offset(last_offset)
    .live_mode(LiveMode::LongPoll)
    .build()?;

while let Some(chunk) = reader.next_chunk().await? {
    println!("Got {} bytes at offset {:?}", chunk.data.len(), chunk.offset);
    // Save checkpoint
    save_offset(&chunk.offset).await?;
}
head()
async method
Get stream metadata via a HEAD request. Returns: Result<HeadResponse, StreamError> with content_type, offset, closed

ChunkIterator

Asynchronous iterator for reading stream chunks.
next_chunk()
async method
Get the next chunk. Returns: Result<Option<Chunk>, StreamError>. Returns Ok(None) when iteration is complete.
Chunk
struct
data
Vec<u8>
Chunk data
offset
Offset
Offset after this chunk
up_to_date
bool
Whether we’ve reached the current end
cursor
Option<String>
Stream cursor for CDN collapsing

Producer

Idempotent producer for exactly-once writes with automatic batching.
use durable_streams::{Producer, ProducerBuilder};

let producer = Producer::builder()
    .url("https://streams.example.com/orders")
    .producer_id("order-service-1")
    .epoch(0)
    .auto_claim(true)
    .max_batch_bytes(1024 * 1024)
    .build()?;

// Fire-and-forget writes
producer.append(b"{\"orderId\":\"123\"}").await?;
producer.append(b"{\"orderId\":\"456\"}").await?;

// Ensure delivery
producer.flush().await?;
producer.close().await?;
Producer::builder()
function
Create a producer builder. Returns: ProducerBuilder
url
&str
required
Stream URL
producer_id
&str
required
Stable identifier
epoch
u64
default:"0"
Starting epoch
auto_claim
bool
default:"false"
Auto-retry with epoch+1 on 403
max_batch_bytes
usize
default:"1048576"
Max batch size (1MB)
linger_ms
u64
default:"5"
Max wait before sending batch
max_in_flight
usize
default:"5"
Max concurrent batches
append()
async method
Append data (fire-and-forget after buffering)
data
&[u8]
required
Data to append
Returns: Result<(), ProducerError>
flush()
async method
Send the pending batch and wait for all in-flight batches. Returns: Result<(), ProducerError>
close()
async method
Flush and close the stream (EOF). Returns: Result<CloseResponse, ProducerError>
close_with_data()
async method
Close with optional final message (idempotent)
data
&[u8]
Final message
Returns: Result<CloseResponse, ProducerError>

Types

Offset

/// An opaque position within a durable stream.
///
/// Offsets are server-assigned strings; clients should treat them as
/// opaque tokens, persisting and comparing them only via this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Offset(String);

impl Offset {
    /// Offset representing the beginning of the stream.
    ///
    /// NOTE: this cannot be a `const fn` — `String::from` is not callable
    /// in a const context, so the original `pub const fn` did not compile.
    pub fn beginning() -> Self {
        Self(String::from("-1"))
    }

    /// Wrap an arbitrary offset value (e.g. one previously saved from a
    /// `Chunk` or `AppendResponse`).
    pub fn new(value: impl Into<String>) -> Self {
        Self(value.into())
    }

    /// Borrow the raw offset string (e.g. for checkpoint persistence).
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

LiveMode

/// How a reader behaves once it has caught up to the end of the stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LiveMode {
    /// Catch-up only: stop when the current end of the stream is reached.
    Off,
    /// Long-polling: wait for new data via long-poll requests.
    LongPoll,
    /// Server-Sent Events: receive new data over an SSE connection.
    Sse,
}

AppendResponse

/// Response returned by a successful append.
pub struct AppendResponse {
    /// Offset of the end of the stream after this append.
    pub next_offset: Offset,
    /// ETag for the appended data, when the server provides one.
    pub etag: Option<String>,
}

HeadResponse

/// Stream metadata returned by `head()` (HTTP HEAD request).
pub struct HeadResponse {
    /// Content type of the stream, if one was set at creation.
    pub content_type: Option<String>,
    /// Current end offset of the stream.
    pub offset: Offset,
    /// Whether the stream has been closed (no further appends accepted).
    pub closed: bool,
    /// Time-to-live, if one was configured.
    // NOTE(review): presumably std::time::Duration — confirm the import.
    pub ttl: Option<Duration>,
}

CloseResponse

/// Response returned when a stream is closed.
pub struct CloseResponse {
    /// Final end offset of the stream at the time it was closed.
    pub final_offset: Offset,
}

Error Handling

use durable_streams::StreamError;

match stream.append(data).await {
    Ok(response) => println!("Appended at {:?}", response.next_offset),
    Err(StreamError::NotFound) => eprintln!("Stream not found"),
    Err(StreamError::Closed) => eprintln!("Stream is closed"),
    Err(StreamError::Conflict) => eprintln!("Conflict"),
    Err(e) => eprintln!("Error: {}", e),
}

StreamError Variants

/// Errors returned by stream operations.
pub enum StreamError {
    /// The stream does not exist (e.g. append/delete on a missing stream).
    NotFound,
    /// The stream already exists (returned by `create()`).
    AlreadyExists,
    /// The stream is closed and rejects further appends.
    Closed,
    /// A conflicting operation was detected.
    Conflict,
    /// The request was not authenticated.
    Unauthorized,
    /// The request was authenticated but not permitted.
    Forbidden,
    /// The server is rate-limiting this client.
    RateLimited,
    /// Other server-side failure; carries the HTTP status code.
    ServerError(u16),
    /// Transport-level failure from the underlying HTTP client.
    Network(reqwest::Error),
    /// A header value could not be constructed or parsed.
    InvalidHeader(InvalidHeaderError),
}

Advanced Features

Custom HTTP Client

use reqwest::Client as HttpClient;
use std::time::Duration;

let http_client = HttpClient::builder()
    .timeout(Duration::from_secs(30))
    .pool_max_idle_per_host(10)
    .build()?;

let client = Client::builder()
    .http_client(http_client)
    .build()?;

Serde Integration

use serde::{Deserialize, Serialize};
use serde_json;

#[derive(Debug, Serialize, Deserialize)]
struct Event {
    id: String,
    event_type: String,
    data: serde_json::Value,
}

// Serialize and append
let event = Event {
    id: "123".to_string(),
    event_type: "order.created".to_string(),
    data: serde_json::json!({"orderId": "456"}),
};

let json = serde_json::to_vec(&event)?;
stream.append(&json).await?;

// Read and deserialize
let mut reader = stream.read().build()?;
while let Some(chunk) = reader.next_chunk().await? {
    // Parse JSON array
    let events: Vec<Event> = serde_json::from_slice(&chunk.data)?;
    for event in events {
        println!("Event {}: {}", event.id, event.event_type);
    }
}

Tokio Streams

use futures::StreamExt;

let mut reader = stream.read().build()?;

while let Some(result) = reader.next_chunk().await.transpose() {
    match result {
        Ok(chunk) => {
            // Process chunk
            println!("Got {} bytes", chunk.data.len());
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}

Error Context with anyhow

use anyhow::Context;

stream.append(data).await
    .context("Failed to append to stream")?;

Source Code

Source: packages/client-rust/src/