Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Rust Client
The official Rust client for the Durable Streams protocol, with async/await support.
Installation
Add to your Cargo.toml:
[dependencies]
durable-streams = "0.1"
tokio = { version = "1", features = ["full"] }
Quick Start
use durable_streams::{Client, Offset};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create client
    let client = Client::new();
    let stream = client.stream("https://streams.example.com/my-stream");

    // Create stream
    stream.create().await?;

    // Append data
    stream.append(b"{\"message\":\"hello\"}").await?;

    // Read with iterator
    let mut reader = stream.read()
        .offset(Offset::beginning())
        .live_mode(durable_streams::LiveMode::LongPoll)
        .build()?;

    while let Some(chunk) = reader.next_chunk().await? {
        println!("Got {} bytes", chunk.data.len());
    }

    Ok(())
}
Core APIs
Client
new() — Create a new client with default settings. Returns: Client
builder() — Create a builder for custom configuration. Returns: ClientBuilder
Example:
use durable_streams::Client;
use std::time::Duration;
let client = Client::builder()
    .base_url("https://streams.example.com")
    .timeout(Duration::from_secs(30))
    .header("Authorization", "Bearer token")
    .build()?;
stream(url) — Get a handle to a stream (no network I/O).
url: Stream URL (absolute or relative to base_url)
Returns: DurableStream
DurableStream
Handle to a durable stream for read/write operations.
Write Operations
create() — Create a new stream with default settings.
Returns: Result<(), StreamError>
Errors: StreamError::AlreadyExists if the stream already exists
create_with_options(options) — Create a stream with options.
content_type: Content type (default: "application/octet-stream")
Returns: Result<(), StreamError>
Example:
use durable_streams::CreateOptions;
use std::time::Duration;
stream.create_with_options(
    CreateOptions::new()
        .content_type("application/json")
        .ttl(Duration::from_secs(86400))
).await?;
append(data) — Append data to the stream.
data: Data to append (for JSON, serialize first)
Returns: Result<AppendResponse, StreamError> with next_offset
Errors: StreamError::NotFound, StreamError::Closed, etc.
Example:
// JSON stream - serialize first
use serde_json::json;
let message = json!({"message": "hello"});
let json = serde_json::to_vec(&message)?;
let response = stream.append(&json).await?;
println!("Offset: {:?}", response.next_offset);
// Byte stream
stream.append(b"raw bytes").await?;
Append with options; options include a writer coordination sequence.
Returns: Result<AppendResponse, StreamError>
Close the stream permanently (no more appends).
Returns: Result<CloseResponse, StreamError> with final_offset
Errors: StreamError::Closed if already closed with data
Close with an optional final message.
Returns: Result<CloseResponse, StreamError>
Delete the stream.
Returns: Result<(), StreamError>
Errors: StreamError::NotFound if the stream doesn't exist
Read Operations
read() — Create a builder for reading the stream. Returns: ReadBuilder
Example:
use durable_streams::{Offset, LiveMode};
// last_offset: a previously saved checkpoint (or Offset::beginning())
let mut reader = stream.read()
    .offset(last_offset)
    .live_mode(LiveMode::LongPoll)
    .build()?;

while let Some(chunk) = reader.next_chunk().await? {
    println!("Got {} bytes at offset {:?}", chunk.data.len(), chunk.offset);
    // Save checkpoint (save_offset is application code)
    save_offset(&chunk.offset).await?;
}
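The save_offset call above is application code, not part of the client. A minimal synchronous, file-based sketch of such a checkpoint (the path handling and signatures are illustrative assumptions; the example above awaits an async variant):

```rust
use std::fs;
use std::io;
use std::path::Path;

// Persist the last processed offset so a restart can resume from it.
// Writing to a temp file and renaming keeps the checkpoint update atomic
// on most filesystems.
fn save_offset(path: &Path, offset: &str) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, offset)?;
    fs::rename(&tmp, path)
}

// Load the checkpoint, falling back to the beginning ("-1") on first run.
fn load_offset(path: &Path) -> String {
    fs::read_to_string(path).unwrap_or_else(|_| "-1".to_string())
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("my-stream.offset");
    save_offset(&path, "1024")?;
    assert_eq!(load_offset(&path), "1024");
    Ok(())
}
```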
Get stream metadata via a HEAD request.
Returns: Result<HeadResponse, StreamError> with content_type, offset, closed
ChunkIterator
Asynchronous iterator for reading stream chunks.
next_chunk() — Get the next chunk.
Returns: Result<Option<Chunk>, StreamError>; Ok(None) when iteration is complete.
The iterator also exposes whether it has reached the current end of the stream, and a stream cursor used for CDN request collapsing.
Producer
Idempotent producer for exactly-once writes with automatic batching.
use durable_streams::Producer;

let producer = Producer::builder()
    .url("https://streams.example.com/orders")
    .producer_id("order-service-1")
    .epoch(0)
    .auto_claim(true)
    .max_batch_bytes(1024 * 1024)
    .build()?;

// Fire-and-forget writes
producer.append(b"{\"orderId\":\"123\"}").await?;
producer.append(b"{\"orderId\":\"456\"}").await?;

// Ensure delivery
producer.flush().await?;
producer.close().await?;
builder() — Create a producer builder. Returns: ProducerBuilder
auto_claim: auto-retry with epoch + 1 on 403
A further builder option sets the max wait before sending a batch.
append(data) — Append data (fire-and-forget after buffering). Returns: Result<(), ProducerError>
flush() — Send the pending batch and wait for all in-flight batches. Returns: Result<(), ProducerError>
close() — Flush and close the stream (EOF). Returns: Result<CloseResponse, ProducerError>
Close with an optional final message (idempotent). Returns: Result<CloseResponse, ProducerError>
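The automatic batching can be pictured with a minimal size-based buffer. This is a local sketch of the idea, not the crate's implementation: records accumulate until adding another would exceed the byte limit, then the batch is handed off as one append.

```rust
// Minimal sketch of size-based batching: records accumulate in a buffer
// and are flushed as a single batch once adding another record would
// exceed the configured byte limit (cf. max_batch_bytes).
struct Batcher {
    max_batch_bytes: usize,
    buffer: Vec<u8>,
    flushed: Vec<Vec<u8>>, // stands in for batches sent over the network
}

impl Batcher {
    fn new(max_batch_bytes: usize) -> Self {
        Self { max_batch_bytes, buffer: Vec::new(), flushed: Vec::new() }
    }

    fn append(&mut self, record: &[u8]) {
        // Flush first if this record would push the batch over the limit.
        if !self.buffer.is_empty()
            && self.buffer.len() + record.len() > self.max_batch_bytes
        {
            self.flush();
        }
        self.buffer.extend_from_slice(record);
    }

    fn flush(&mut self) {
        if !self.buffer.is_empty() {
            self.flushed.push(std::mem::take(&mut self.buffer));
        }
    }
}

fn main() {
    let mut b = Batcher::new(8);
    b.append(b"aaaa"); // 4 bytes buffered
    b.append(b"bbbb"); // 8 bytes buffered (fits exactly)
    b.append(b"cc");   // would exceed 8 -> first batch flushed
    b.flush();         // flush the remainder
    assert_eq!(b.flushed.len(), 2);
    assert_eq!(b.flushed[0].len(), 8);
    assert_eq!(b.flushed[1], b"cc");
}
```

The real producer additionally flushes on a time limit and tracks per-record sequence numbers for idempotence; this sketch covers only the size trigger.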
Types
Offset
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Offset(String);

impl Offset {
    // Not `const`: String::from allocates, which const fns cannot do.
    pub fn beginning() -> Self {
        Self(String::from("-1"))
    }

    pub fn new(value: impl Into<String>) -> Self {
        Self(value.into())
    }

    pub fn as_str(&self) -> &str {
        &self.0
    }
}
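A quick usage check, over a self-contained copy of the type, showing that offsets are opaque strings with "-1" addressing the beginning:

```rust
// Self-contained copy of the Offset type for a quick usage check.
// Note: beginning() is not `const` because String::from allocates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Offset(String);

impl Offset {
    pub fn beginning() -> Self {
        Self(String::from("-1"))
    }
    pub fn new(value: impl Into<String>) -> Self {
        Self(value.into())
    }
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

fn main() {
    // Offsets are opaque strings; "-1" addresses the beginning.
    assert_eq!(Offset::beginning().as_str(), "-1");
    let checkpoint = Offset::new("1024");
    assert_eq!(checkpoint.as_str(), "1024");
    assert_eq!(checkpoint.clone(), checkpoint);
}
```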
LiveMode
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LiveMode {
    Off,      // Catch-up only
    LongPoll, // Long-polling
    Sse,      // Server-Sent Events
}
AppendResponse
pub struct AppendResponse {
    pub next_offset: Offset,
    pub etag: Option<String>,
}
HeadResponse
pub struct HeadResponse {
    pub content_type: Option<String>,
    pub offset: Offset,
    pub closed: bool,
    pub ttl: Option<Duration>,
}
CloseResponse
pub struct CloseResponse {
    pub final_offset: Offset,
}
Error Handling
use durable_streams::StreamError;

match stream.append(data).await {
    Ok(response) => println!("Appended at {:?}", response.next_offset),
    Err(StreamError::NotFound) => eprintln!("Stream not found"),
    Err(StreamError::Closed) => eprintln!("Stream is closed"),
    Err(StreamError::Conflict) => eprintln!("Conflict"),
    Err(e) => eprintln!("Error: {}", e),
}
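RateLimited is the retryable case. A minimal capped exponential-backoff schedule a caller might sleep through before retrying (an illustrative sketch; the client is not documented here as retrying automatically, and the delay values are assumptions):

```rust
use std::time::Duration;

// Compute capped exponential backoff delays: 100ms, 200ms, 400ms, ...
// up to a 5s ceiling. A caller would sleep for backoff(attempt) before
// retrying an append that failed with StreamError::RateLimited.
fn backoff(attempt: u32) -> Duration {
    let base_ms: u64 = 100;
    let cap_ms: u64 = 5_000;
    let delay_ms = base_ms.saturating_mul(1u64 << attempt.min(16));
    Duration::from_millis(delay_ms.min(cap_ms))
}

fn main() {
    assert_eq!(backoff(0), Duration::from_millis(100));
    assert_eq!(backoff(1), Duration::from_millis(200));
    assert_eq!(backoff(3), Duration::from_millis(800));
    assert_eq!(backoff(10), Duration::from_millis(5_000)); // capped
}
```

Adding random jitter to each delay is a common refinement to avoid synchronized retries.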
StreamError Variants
pub enum StreamError {
    NotFound,
    AlreadyExists,
    Closed,
    Conflict,
    Unauthorized,
    Forbidden,
    RateLimited,
    ServerError(u16),
    Network(reqwest::Error),
    InvalidHeader(InvalidHeaderError),
}
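Most variants correspond to HTTP statuses. A sketch of one plausible mapping, over a simplified local copy of the enum (the exact mapping is an assumption, not taken from the crate; the payload-carrying Network and InvalidHeader variants are omitted):

```rust
// Simplified local copy of StreamError to illustrate a plausible mapping
// from HTTP status codes. AlreadyExists and Closed are context-dependent
// (e.g. a 409 on create vs. on append) and are not reachable from a bare
// status code here.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum StreamError {
    NotFound,
    AlreadyExists,
    Closed,
    Conflict,
    Unauthorized,
    Forbidden,
    RateLimited,
    ServerError(u16),
}

fn from_status(status: u16) -> Option<StreamError> {
    match status {
        200..=299 => None, // success
        401 => Some(StreamError::Unauthorized),
        403 => Some(StreamError::Forbidden),
        404 => Some(StreamError::NotFound),
        409 => Some(StreamError::Conflict),
        429 => Some(StreamError::RateLimited),
        s @ 500..=599 => Some(StreamError::ServerError(s)),
        s => Some(StreamError::ServerError(s)), // fallback for unmodeled statuses
    }
}

fn main() {
    assert_eq!(from_status(200), None);
    assert_eq!(from_status(404), Some(StreamError::NotFound));
    assert_eq!(from_status(429), Some(StreamError::RateLimited));
    assert_eq!(from_status(503), Some(StreamError::ServerError(503)));
}
```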
Advanced Features
Custom HTTP Client
use reqwest::Client as HttpClient;
use std::time::Duration;

let http_client = HttpClient::builder()
    .timeout(Duration::from_secs(30))
    .pool_max_idle_per_host(10)
    .build()?;

let client = Client::builder()
    .http_client(http_client)
    .build()?;
Serde Integration
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct Event {
    id: String,
    event_type: String,
    data: serde_json::Value,
}

// Serialize and append
let event = Event {
    id: "123".to_string(),
    event_type: "order.created".to_string(),
    data: serde_json::json!({"orderId": "456"}),
};
let json = serde_json::to_vec(&event)?;
stream.append(&json).await?;

// Read and deserialize
let mut reader = stream.read().build()?;
while let Some(chunk) = reader.next_chunk().await? {
    // Parse JSON array
    let events: Vec<Event> = serde_json::from_slice(&chunk.data)?;
    for event in events {
        println!("Event {}: {}", event.id, event.event_type);
    }
}
Tokio Streams
let mut reader = stream.read().build()?;

while let Some(result) = reader.next_chunk().await.transpose() {
    match result {
        Ok(chunk) => {
            // Process chunk
            println!("Got {} bytes", chunk.data.len());
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}
Error Context with anyhow
use anyhow::Context;

stream.append(data).await
    .context("Failed to append to stream")?;
Source Code
Source: packages/client-rust/src/