.NET Client

The official .NET client for the Durable Streams protocol, with async/await support.

Installation

dotnet add package DurableStreams
Requires .NET 6.0 or later

Quick Start

using System.Text;        // Encoding
using System.Text.Json;   // JsonSerializer
using DurableStreams;

// Create a client (singleton pattern recommended)
var client = new DurableStreamClient();

// Get a stream handle
var stream = client.GetStream("https://streams.example.com/my-stream");

// Create the stream
await stream.CreateAsync(new CreateStreamOptions
{
    ContentType = "application/json"
});

// Append data
var json = JsonSerializer.Serialize(new { message = "hello" });
await stream.AppendAsync(Encoding.UTF8.GetBytes(json));

// Read data (Event is your own record type; SaveOffsetAsync is your own checkpoint helper)
await foreach (var batch in stream.ReadJsonAsync<Event>())
{
    foreach (var item in batch.Items)
    {
        Console.WriteLine($"Event: {item.Id}");
    }
    // Save checkpoint
    await SaveOffsetAsync(batch.Offset);
}

Core APIs

DurableStreamClient

The main client class (thread-safe, designed for singleton use).

new DurableStreamClient() (constructor)
Creates a client with default options.

new DurableStreamClient(options) (constructor)
Creates a client with custom options.
  • options (DurableStreamClientOptions)
    • BaseUrl (string) - Base URL for relative stream paths
    • Timeout (TimeSpan?) - Default request timeout
    • DefaultHeaders (Dictionary<string, string>) - Static headers sent with every request
    • DynamicHeaders (Dictionary<string, Func<CancellationToken, Task<string>>>) - Dynamic headers, evaluated per request
Example:
var client = new DurableStreamClient(new DurableStreamClientOptions
{
    BaseUrl = "https://streams.example.com",
    Timeout = TimeSpan.FromSeconds(30),
    DefaultHeaders = new Dictionary<string, string>
    {
        ["Authorization"] = "Bearer token123"
    },
    DynamicHeaders = new Dictionary<string, Func<CancellationToken, Task<string>>>
    {
        // No await is needed here, so return a completed task instead of using an async lambda
        ["X-Request-ID"] = ct => Task.FromResult(Guid.NewGuid().ToString())
    }
});
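
Because DynamicHeaders values are evaluated per request, they are a natural place for credentials that expire. The following is a minimal sketch, not part of the library: CachedTokenHeader and the fetch delegate are hypothetical names, and the 30-second refresh margin is an arbitrary assumption. It only relies on the Func<CancellationToken, Task<string>> shape shown above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of DurableStreams): caches a bearer token and
// refreshes it shortly before it expires.
public sealed class CachedTokenHeader
{
    private readonly Func<CancellationToken, Task<(string Token, DateTimeOffset ExpiresAt)>> _fetch;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private string _token = string.Empty;
    private DateTimeOffset _expiresAt = DateTimeOffset.MinValue;

    public CachedTokenHeader(
        Func<CancellationToken, Task<(string Token, DateTimeOffset ExpiresAt)>> fetch)
        => _fetch = fetch;

    // Signature matches Func<CancellationToken, Task<string>>.
    public async Task<string> GetValueAsync(CancellationToken ct)
    {
        // Serialize refreshes so concurrent requests trigger at most one fetch.
        await _gate.WaitAsync(ct);
        try
        {
            var refreshDue = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(30) >= _expiresAt;
            if (_token.Length == 0 || refreshDue)
            {
                (_token, _expiresAt) = await _fetch(ct);
            }
            return "Bearer " + _token;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

To use it, assign the method group as the header value, for example ["Authorization"] = tokenHeader.GetValueAsync inside DynamicHeaders, where tokenHeader is a single shared CachedTokenHeader instance.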

GetStream() (method)
Gets a handle to a stream. Performs no network I/O.
  • url (string, required) - Stream URL (absolute or relative to BaseUrl)
Returns: DurableStream

CreateStreamAsync() (method)
Creates a new stream and returns a handle to it.
  • url (string, required) - Stream URL
  • options (CreateStreamOptions?) - Creation options
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task<DurableStream>

ConnectAsync() (method)
Connects to an existing stream, validating it with a HEAD request.
  • url (string, required) - Stream URL
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task<DurableStream>

DurableStream

Handle to a durable stream for read/write operations.

Write Operations

CreateAsync() (method)
Creates this stream on the server.
  • options (CreateStreamOptions?)
    • ContentType (string) - Content type (e.g., "application/json")
    • TtlSeconds (int?) - Time-to-live in seconds
    • ExpiresAt (DateTimeOffset?) - Absolute expiry time
    • Closed (bool, default: false) - Create the stream in the closed state
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task

AppendAsync() (method)
Appends data to the stream.
  • data (byte[], required) - Data to append (for JSON streams, serialize first)
  • options (AppendOptions?)
    • Seq (string) - Writer coordination sequence
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task<AppendResult> with NextOffset
Example:
// JSON stream
var json = JsonSerializer.Serialize(new { message = "hello" });
var result = await stream.AppendAsync(Encoding.UTF8.GetBytes(json));
Console.WriteLine($"Offset: {result.NextOffset}");

// Byte stream (a distinct variable name avoids redeclaring result in the same scope)
var textResult = await stream.AppendAsync(Encoding.UTF8.GetBytes("raw text"));

CloseAsync() (method)
Closes the stream permanently. No further appends are accepted.
  • options (CloseOptions?)
    • Data (byte[]) - Optional final message
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task<CloseResult> with FinalOffset

DeleteAsync() (method)
Deletes the stream.
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task

Read Operations

ReadJsonAsync<T>() (method)
Reads JSON batches as an async enumerable.
  • options (ReadOptions?)
    • Offset (Offset?, default: Offset.Start) - Starting offset
    • LiveMode (LiveMode, default: LiveMode.Auto) - Live mode: Off, Auto, LongPoll, or Sse
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: IAsyncEnumerable<JsonBatch<T>>
Example:
// lastOffset and db are placeholders for your own checkpoint storage.
await foreach (var batch in stream.ReadJsonAsync<Event>(new ReadOptions
{
    Offset = lastOffset,
    LiveMode = LiveMode.LongPoll
}))
{
    foreach (var evt in batch.Items)
    {
        Console.WriteLine($"Event {evt.Id}: {evt.Type}");
    }

    // Save checkpoint
    await db.SaveOffsetAsync(batch.Offset);
}

// With top-level statements, type declarations must come after the statements.
public record Event(string Id, string Type, object Data);

ReadBytesAsync() (method)
Reads raw byte chunks as an async enumerable.
  • options (ReadOptions?) - Read options
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: IAsyncEnumerable<ByteChunk>

HeadAsync() (method)
Gets stream metadata via a HEAD request.
  • cancellationToken (CancellationToken, default: default) - Cancellation token
Returns: Task<StreamCheckpoint> with ContentType, Offset, StreamClosed

IdempotentProducer

Exactly-once writes with automatic batching and pipelining.
var producer = new IdempotentProducer(stream, "order-service-1", new IdempotentProducerOptions
{
    Epoch = 0,
    AutoClaim = true,
    MaxBatchBytes = 1024 * 1024,
    LingerMs = 5,
    MaxInFlight = 5
});

// Fire-and-forget writes
producer.Append(Encoding.UTF8.GetBytes("{\"orderId\":\"123\"}"));
producer.Append(Encoding.UTF8.GetBytes("{\"orderId\":\"456\"}"));

// Ensure delivery
await producer.FlushAsync();
await producer.CloseAsync();

new IdempotentProducer() (constructor)
Creates an idempotent producer.
  • stream (DurableStream, required) - Target stream
  • producerId (string, required) - Stable identifier
  • options (IdempotentProducerOptions)
    • Epoch (int, default: 0) - Starting epoch
    • AutoClaim (bool, default: false) - Automatically retry with epoch+1 on a 403 response
    • MaxBatchBytes (int, default: 1048576) - Maximum batch size in bytes (1 MiB)
    • LingerMs (int, default: 5) - Maximum wait in milliseconds before sending a batch
    • MaxInFlight (int, default: 5) - Maximum number of concurrent in-flight batches
    • OnError (Action<Exception>) - Error callback
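
The AutoClaim option is described as retrying with epoch+1 on a 403. The sketch below illustrates that retry rule in isolation. It is not the library's implementation: AutoClaimSketch, the send delegate (one append attempt that returns an HTTP status code), and the maxClaims bound are all hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Illustrative only: models "retry with epoch+1 on 403" with a bounded number of claims.
public static class AutoClaimSketch
{
    // send: hypothetical delegate that performs one append using the given epoch
    // and returns the HTTP status code of the response.
    public static async Task<(int Status, int Epoch)> SendAsync(
        Func<int, Task<int>> send, int epoch, bool autoClaim, int maxClaims = 3)
    {
        for (var claims = 0; ; claims++)
        {
            var status = await send(epoch);

            // Stop on any non-403 result, when auto-claim is off,
            // or when the claim budget is used up.
            if (status != 403 || !autoClaim || claims >= maxClaims)
            {
                return (status, epoch);
            }

            epoch++; // claim the next epoch and try again
        }
    }
}
```

With AutoClaim disabled, the same 403 is returned to the caller unchanged, which corresponds to the StaleEpochException case in the exception hierarchy.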

Append() (method)
Fire-and-forget append. Synchronous; returns immediately.
  • data (byte[], required) - Data to append

FlushAsync() (method)
Sends the pending batch and waits for all in-flight batches to complete.
Returns: Task

CloseAsync() (method)
Closes the stream with producer headers (idempotent).
  • finalMessage (byte[]?) - Optional final message
Returns: Task<CloseResult>

Types

Offset

public record Offset(string Value)
{
    public static Offset Start { get; } = new Offset("-1");
    public bool IsStart => Value == "-1";
}
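
Since an Offset wraps a single string, persisting a checkpoint can be as simple as storing that string. The following is a rough sketch under that assumption. OffsetCheckpoint and its file layout are hypothetical and not part of the library, and the Offset record is a local copy of the definition above so the example compiles on its own.

```csharp
using System;
using System.IO;

// Local copy of the Offset record shown above.
public record Offset(string Value)
{
    public static Offset Start { get; } = new Offset("-1");
    public bool IsStart => Value == "-1";
}

// Hypothetical helper: stores the last processed offset in a small text file.
public static class OffsetCheckpoint
{
    // Returns Offset.Start when no usable checkpoint exists yet.
    public static Offset Load(string path)
    {
        if (!File.Exists(path)) return Offset.Start;
        var value = File.ReadAllText(path).Trim();
        return value.Length == 0 ? Offset.Start : new Offset(value);
    }

    // Writes to a temporary file first, then replaces the checkpoint file,
    // so a crash mid-write does not leave a truncated checkpoint.
    public static void Save(string path, Offset offset)
    {
        var tmp = path + ".tmp";
        File.WriteAllText(tmp, offset.Value);
        File.Move(tmp, path, overwrite: true);
    }
}
```

A reader would call Load once at startup, pass the result as ReadOptions.Offset, and call Save with batch.Offset after each batch has been processed.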

LiveMode

public enum LiveMode
{
    Off,       // Catch-up only
    Auto,      // Auto-select (SSE for JSON, long-poll for binary)
    LongPoll,  // Explicit long-polling
    Sse        // Explicit Server-Sent Events
}
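
The comment on Auto says it selects SSE for JSON and long-polling for binary. As an illustration of that rule only, the sketch below resolves a requested mode against a content type. LiveModeSketch.Resolve is a hypothetical function, the enum is a local copy, and treating exactly "application/json" as the JSON case is an assumption; the client's actual detection logic may differ.

```csharp
using System;

// Local copy of the LiveMode enum shown above.
public enum LiveMode { Off, Auto, LongPoll, Sse }

public static class LiveModeSketch
{
    // Hypothetical: picks a concrete mode when Auto is requested.
    public static LiveMode Resolve(LiveMode requested, string contentType)
    {
        // Explicit choices are passed through untouched.
        if (requested != LiveMode.Auto) return requested;

        // Ignore parameters such as "; charset=utf-8".
        var mediaType = contentType.Split(';')[0].Trim();

        return mediaType.Equals("application/json", StringComparison.OrdinalIgnoreCase)
            ? LiveMode.Sse
            : LiveMode.LongPoll;
    }
}
```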

JsonBatch<T>

public record JsonBatch<T>
{
    public IReadOnlyList<T> Items { get; init; }
    public Offset Offset { get; init; }
    public bool UpToDate { get; init; }
    public string? Cursor { get; init; }
    public bool StreamClosed { get; init; }
}
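
The UpToDate and StreamClosed flags make it possible to stop a read loop once the backlog has been consumed. The sketch below assumes UpToDate means the reader has reached the current end of the stream, which the page does not state explicitly. CatchUp.DrainAsync is a hypothetical helper, and the records are local copies (with default initializers added) so the example stands alone.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local copies of the types shown above.
public record Offset(string Value);

public record JsonBatch<T>
{
    public IReadOnlyList<T> Items { get; init; } = new List<T>();
    public Offset Offset { get; init; } = new Offset("-1");
    public bool UpToDate { get; init; }
    public string? Cursor { get; init; }
    public bool StreamClosed { get; init; }
}

public static class CatchUp
{
    // Hypothetical helper: collects items until the stream reports it is
    // caught up or closed, and returns the last offset seen (null if none).
    public static async Task<(List<T> Items, Offset? Last)> DrainAsync<T>(
        IAsyncEnumerable<JsonBatch<T>> batches, CancellationToken ct = default)
    {
        var items = new List<T>();
        Offset? last = null;

        await foreach (var batch in batches.WithCancellation(ct))
        {
            items.AddRange(batch.Items);
            last = batch.Offset;
            if (batch.UpToDate || batch.StreamClosed) break;
        }

        return (items, last);
    }
}
```

Passing the result of stream.ReadJsonAsync<T>() as the batches argument would give a one-shot catch-up read even when a live mode is enabled.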

Error Handling

using DurableStreams.Exceptions;

try
{
    await stream.AppendAsync(data);
}
catch (StreamNotFoundException ex)
{
    Console.WriteLine($"Stream not found: {ex.Url}");
}
catch (StreamClosedException ex)
{
    Console.WriteLine($"Stream closed: {ex.Url}");
}
catch (DurableStreamException ex)
{
    Console.WriteLine($"Error {ex.StatusCode}: {ex.Message}");
}

Exception Hierarchy

  • DurableStreamException - Base exception
    • StreamNotFoundException - 404
    • StreamExistsException - 409 on create
    • StreamClosedException - 409 with Stream-Closed header
    • SequenceConflictException - 409 sequence conflict
    • StaleEpochException - 403 for producers
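
To make the status-code mapping in this list concrete, here is an illustrative sketch that turns a response into one of the listed exception types. The classes are local stand-ins that only borrow the names above; the real types live in DurableStreams.Exceptions and may have different constructors. ErrorMapping.FromResponse and the order in which the three 409 cases are checked are assumptions.

```csharp
using System;

// Local stand-ins named after the exceptions listed above.
public class DurableStreamException : Exception
{
    public int StatusCode { get; }
    public DurableStreamException(int statusCode, string message) : base(message)
        => StatusCode = statusCode;
}
public class StreamNotFoundException : DurableStreamException
{ public StreamNotFoundException() : base(404, "Stream not found") { } }
public class StreamExistsException : DurableStreamException
{ public StreamExistsException() : base(409, "Stream already exists") { } }
public class StreamClosedException : DurableStreamException
{ public StreamClosedException() : base(409, "Stream is closed") { } }
public class SequenceConflictException : DurableStreamException
{ public SequenceConflictException() : base(409, "Sequence conflict") { } }
public class StaleEpochException : DurableStreamException
{ public StaleEpochException() : base(403, "Stale producer epoch") { } }

public static class ErrorMapping
{
    // Hypothetical mapping from response details to an exception type.
    public static DurableStreamException FromResponse(
        int status, bool isCreate, bool hasStreamClosedHeader) =>
        status switch
        {
            404 => new StreamNotFoundException(),
            403 => new StaleEpochException(),
            409 when isCreate => new StreamExistsException(),
            409 when hasStreamClosedHeader => new StreamClosedException(),
            409 => new SequenceConflictException(),
            _ => new DurableStreamException(status, $"Unexpected status {status}")
        };
}
```

Because three different exceptions share status 409, catching the specific types is more reliable than branching on StatusCode alone.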

Advanced Features

Dependency Injection

// In Startup.cs or Program.cs
services.AddSingleton(new DurableStreamClient(new DurableStreamClientOptions
{
    BaseUrl = "https://streams.example.com"
}));

// In your service
public class EventService
{
    private readonly DurableStreamClient _client;

    public EventService(DurableStreamClient client)
    {
        _client = client;
    }

    public async Task PublishAsync(Event evt)
    {
        var stream = _client.GetStream("/events");
        var json = JsonSerializer.Serialize(evt);
        await stream.AppendAsync(Encoding.UTF8.GetBytes(json));
    }
}

IHttpClientFactory Integration

services.AddHttpClient("DurableStreams")
    .ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
    {
        PooledConnectionLifetime = TimeSpan.FromMinutes(15),
        MaxConnectionsPerServer = 100
    });

var httpClient = serviceProvider
    .GetRequiredService<IHttpClientFactory>()
    .CreateClient("DurableStreams");

var client = new DurableStreamClient(
    new DurableStreamClientOptions(),
    httpClient
);

Cancellation

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
    await foreach (var batch in stream.ReadJsonAsync<Event>(cancellationToken: cts.Token))
    {
        // Process batch
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation cancelled");
}
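
A long-running reader often needs to stop on either an application shutdown signal or a timeout. One way to combine the two is a linked CancellationTokenSource, sketched below. TicksAsync is a stand-in for a live read such as ReadJsonAsync, and LinkedCancellationSketch is a hypothetical name; neither is part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedCancellationSketch
{
    // Stand-in for a live read: yields a counter until the token is cancelled.
    public static async IAsyncEnumerable<int> TicksAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(10, ct);
            yield return i;
        }
    }

    // Reads until either the shutdown token fires or the timeout elapses,
    // and returns how many items were seen.
    public static async Task<int> ReadUntilCancelledAsync(
        CancellationToken shutdown, TimeSpan timeout)
    {
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(shutdown);
        linked.CancelAfter(timeout);

        var seen = 0;
        try
        {
            await foreach (var tick in TicksAsync(linked.Token))
            {
                seen++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when either source of cancellation fires.
        }
        return seen;
    }
}
```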

Source Code

Source: packages/client-dotnet/src/DurableStreams/