.NET Client
The official .NET client for the Durable Streams protocol, with async/await support.
Installation
dotnet add package DurableStreams
Requires .NET 6.0 or later.
Quick Start
using System.Text;
using System.Text.Json;
using DurableStreams;

// Create a client (singleton pattern recommended)
var client = new DurableStreamClient();

// Get a stream handle
var stream = client.GetStream("https://streams.example.com/my-stream");

// Create the stream
await stream.CreateAsync(new CreateStreamOptions
{
    ContentType = "application/json"
});

// Append data
var json = JsonSerializer.Serialize(new { message = "hello" });
await stream.AppendAsync(Encoding.UTF8.GetBytes(json));

// Read data (Event is your own record type; SaveOffsetAsync is your own checkpoint helper)
await foreach (var batch in stream.ReadJsonAsync<Event>())
{
    foreach (var item in batch.Items)
    {
        Console.WriteLine($"Event: {item.Id}");
    }

    // Save checkpoint
    await SaveOffsetAsync(batch.Offset);
}
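The Quick Start calls `SaveOffsetAsync` without defining it. One possible shape for that helper is sketched below, assuming a single reader process and a local file as the checkpoint store. `FileOffsetStore`, `SaveAsync`, and `LoadAsync` are hypothetical names introduced here for illustration; they are not part of the DurableStreams package.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of DurableStreams: persists the last
// processed offset string to a local file so a reader can resume later.
public sealed class FileOffsetStore
{
    private readonly string _path;

    public FileOffsetStore(string path) => _path = path;

    // Write to a temp file first, then swap it in, so a crash mid-write
    // does not leave a truncated checkpoint behind.
    public async Task SaveAsync(string offsetValue, CancellationToken ct = default)
    {
        var tmp = _path + ".tmp";
        await File.WriteAllTextAsync(tmp, offsetValue, ct);
        File.Move(tmp, _path, overwrite: true);
    }

    // Returns null when no checkpoint exists yet, so the caller can
    // fall back to reading from the start of the stream.
    public async Task<string?> LoadAsync(CancellationToken ct = default)
    {
        if (!File.Exists(_path)) return null;
        return await File.ReadAllTextAsync(_path, ct);
    }
}
```

Since `Offset` is a record wrapping a string `Value` (see Types), the read loop could call `store.SaveAsync(batch.Offset.Value)` and, on restart, rebuild the starting point with `new Offset(saved)` when `LoadAsync` returns a value.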
Core APIs
DurableStreamClient
The main client class (thread-safe, designed for singleton use).
new DurableStreamClient()
Create a client with default options
new DurableStreamClient(options)
Create a client with custom options
options
DurableStreamClientOptions
BaseUrl: Base URL for relative stream paths
DefaultHeaders: Static headers for all requests
DynamicHeaders: Dynamic headers (evaluated per-request)
Example:
var client = new DurableStreamClient(new DurableStreamClientOptions
{
    BaseUrl = "https://streams.example.com",
    Timeout = TimeSpan.FromSeconds(30),
    DefaultHeaders = new Dictionary<string, string>
    {
        ["Authorization"] = "Bearer token123"
    },
    DynamicHeaders = new Dictionary<string, Func<CancellationToken, Task<string>>>
    {
        // Evaluated on every request; nothing to await, so return a completed task
        ["X-Request-ID"] = ct => Task.FromResult(Guid.NewGuid().ToString())
    }
});
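Dynamic headers are a natural fit for credentials that expire. The sketch below shows one way to cache a bearer token and refresh it shortly before expiry, exposing a method whose shape matches the `Func<CancellationToken, Task<string>>` delegate used in the example above. `CachedTokenProvider` and its `fetch` delegate are hypothetical and not part of the DurableStreams package; the 30-second refresh margin is an arbitrary choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of DurableStreams: caches a bearer token
// and refreshes it shortly before it expires.
public sealed class CachedTokenProvider
{
    private readonly Func<CancellationToken, Task<(string Token, DateTimeOffset ExpiresAt)>> _fetch;
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private string? _token;
    private DateTimeOffset _expiresAt = DateTimeOffset.MinValue;

    public CachedTokenProvider(
        Func<CancellationToken, Task<(string Token, DateTimeOffset ExpiresAt)>> fetch)
        => _fetch = fetch;

    // Same shape as Func<CancellationToken, Task<string>>.
    public async Task<string> GetHeaderValueAsync(CancellationToken ct)
    {
        // Serialize refreshes so concurrent requests trigger a single fetch.
        await _gate.WaitAsync(ct);
        try
        {
            // Refresh 30 seconds early so requests do not carry a token
            // that expires while they are in flight.
            if (_token is null || DateTimeOffset.UtcNow + TimeSpan.FromSeconds(30) >= _expiresAt)
            {
                (_token, _expiresAt) = await _fetch(ct);
            }
            return "Bearer " + _token;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

Assuming `DynamicHeaders` accepts the delegate type shown in the example, the provider could be wired in as `["Authorization"] = tokens.GetHeaderValueAsync`, replacing the static `Authorization` entry in `DefaultHeaders`.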
GetStream
Get a handle to a stream (no network I/O)
Parameter: Stream URL (absolute or relative to BaseUrl)
Returns: DurableStream
Create a new stream and return a handle
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task<DurableStream>
Connect to an existing stream (validates via HEAD)
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task<DurableStream>
DurableStream
Handle to a durable stream for read/write operations.
Write Operations
CreateAsync
Create this stream on the server
ContentType: Content type (e.g., "application/json")
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task
AppendAsync
Append data to the stream
Data to append (for JSON streams, serialize first)
Writer coordination sequence
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task<AppendResult> with NextOffset
Example:
// JSON stream
var json = JsonSerializer.Serialize(new { message = "hello" });
var result = await stream.AppendAsync(Encoding.UTF8.GetBytes(json));
Console.WriteLine($"Offset: {result.NextOffset}");

// Byte stream (separate variable name so both snippets compile in one scope)
var rawResult = await stream.AppendAsync(Encoding.UTF8.GetBytes("raw text"));
Close the stream permanently (no more appends)
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task<CloseResult> with FinalOffset
Delete the stream
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task
Read Operations
ReadJsonAsync<T>
Read JSON batches as an async enumerable
Offset (Offset?, default: Offset.Start): Starting offset
LiveMode (LiveMode, default: LiveMode.Auto): Live mode: Off, Auto, LongPoll, Sse
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: IAsyncEnumerable<JsonBatch<T>>
Example:
public record Event(string Id, string Type, object Data);

await foreach (var batch in stream.ReadJsonAsync<Event>(new ReadOptions
{
    Offset = lastOffset,
    LiveMode = LiveMode.LongPoll
}))
{
    foreach (var evt in batch.Items)
    {
        Console.WriteLine($"Event {evt.Id}: {evt.Type}");
    }

    // Save checkpoint
    await db.SaveOffsetAsync(batch.Offset);
}
Read raw byte chunks as an async enumerable
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: IAsyncEnumerable<ByteChunk>
Get stream metadata via HEAD request
cancellationToken (CancellationToken, default: default): Cancellation token
Returns: Task<StreamCheckpoint> with ContentType, Offset, StreamClosed
IdempotentProducer
Exactly-once writes with automatic batching and pipelining.
var producer = new IdempotentProducer(stream, "order-service-1", new IdempotentProducerOptions
{
    Epoch = 0,
    AutoClaim = true,
    MaxBatchBytes = 1024 * 1024,
    LingerMs = 5,
    MaxInFlight = 5
});

// Fire-and-forget writes
producer.Append(Encoding.UTF8.GetBytes("{\"orderId\":\"123\"}"));
producer.Append(Encoding.UTF8.GetBytes("{\"orderId\":\"456\"}"));

// Ensure delivery
await producer.FlushAsync();
await producer.CloseAsync();
Create an idempotent producer
options
IdempotentProducerOptions
AutoClaim: Auto-retry with epoch+1 on 403
LingerMs: Max wait before sending batch
Append: Fire-and-forget append (synchronous, returns immediately)
FlushAsync: Send pending batch and wait for all in-flight batches. Returns: Task
CloseAsync: Close the stream with producer headers (idempotent). Returns: Task<CloseResult>
Types
Offset
public record Offset(string Value)
{
    public static Offset Start { get; } = new Offset("-1");
    public bool IsStart => Value == "-1";
}
LiveMode
public enum LiveMode
{
    Off,      // Catch-up only
    Auto,     // Auto-select (SSE for JSON, long-poll for binary)
    LongPoll, // Explicit long-polling
    Sse       // Explicit Server-Sent Events
}
JsonBatch<T>
public record JsonBatch<T>
{
    public IReadOnlyList<T> Items { get; init; }
    public Offset Offset { get; init; }
    public bool UpToDate { get; init; }
    public string? Cursor { get; init; }
    public bool StreamClosed { get; init; }
}
Error Handling
using DurableStreams.Exceptions;

try
{
    await stream.AppendAsync(data);
}
catch (StreamNotFoundException ex)
{
    Console.WriteLine($"Stream not found: {ex.Url}");
}
catch (StreamClosedException ex)
{
    Console.WriteLine($"Stream closed: {ex.Url}");
}
catch (DurableStreamException ex)
{
    Console.WriteLine($"Error {ex.StatusCode}: {ex.Message}");
}
Exception Hierarchy
DurableStreamException - Base exception
StreamNotFoundException - 404
StreamExistsException - 409 on create
StreamClosedException - 409 with Stream-Closed header
SequenceConflictException - 409 sequence conflict
StaleEpochException - 403 for producers
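Three of the exceptions above share HTTP status 409, so the status code alone does not identify the failure. The sketch below illustrates how the listed conditions could be told apart. It is not the client's actual logic: `StatusClassifier`, `StreamOperation`, and the parameters are hypothetical, and treating every remaining 409 as a sequence conflict is an assumption made for the example.

```csharp
using System;

// Hypothetical operation kinds, introduced only for this illustration.
public enum StreamOperation { Create, Append, ProducerAppend, Other }

// Illustrative only: the exception names come from the hierarchy in this
// document, but this classifier is not part of the DurableStreams client.
public static class StatusClassifier
{
    public static string Classify(
        int statusCode, StreamOperation operation, bool hasStreamClosedHeader)
    {
        switch (statusCode)
        {
            case 404:
                return "StreamNotFoundException";
            // A 409 carrying the Stream-Closed header means the stream is closed.
            case 409 when hasStreamClosedHeader:
                return "StreamClosedException";
            // A 409 on create means the stream already exists.
            case 409 when operation == StreamOperation.Create:
                return "StreamExistsException";
            // Assumption: any other 409 is a sequence conflict.
            case 409:
                return "SequenceConflictException";
            // A 403 on a producer write signals a stale epoch.
            case 403 when operation == StreamOperation.ProducerAppend:
                return "StaleEpochException";
            default:
                return "DurableStreamException";
        }
    }
}
```

In application code the practical consequence is ordering: catch the specific exception types first and `DurableStreamException` last, as the Error Handling example does.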
Advanced Features
Dependency Injection
// In Startup.cs or Program.cs
services.AddSingleton(new DurableStreamClient(new DurableStreamClientOptions
{
    BaseUrl = "https://streams.example.com"
}));

// In your service
public class EventService
{
    private readonly DurableStreamClient _client;

    public EventService(DurableStreamClient client)
    {
        _client = client;
    }

    public async Task PublishAsync(Event evt)
    {
        var stream = _client.GetStream("/events");
        var json = JsonSerializer.Serialize(evt);
        await stream.AppendAsync(Encoding.UTF8.GetBytes(json));
    }
}
IHttpClientFactory Integration
services.AddHttpClient("DurableStreams")
    .ConfigurePrimaryHttpMessageHandler(() => new SocketsHttpHandler
    {
        PooledConnectionLifetime = TimeSpan.FromMinutes(15),
        MaxConnectionsPerServer = 100
    });

var httpClient = serviceProvider
    .GetRequiredService<IHttpClientFactory>()
    .CreateClient("DurableStreams");

var client = new DurableStreamClient(
    new DurableStreamClientOptions(),
    httpClient
);
Cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

try
{
    await foreach (var batch in stream.ReadJsonAsync<Event>(cancellationToken: cts.Token))
    {
        // Process batch
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation cancelled");
}
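A long-running reader usually has to honor two signals at once: a per-read timeout and an application shutdown token. The sketch below combines them with `CancellationTokenSource.CreateLinkedTokenSource` from the base class library. `FakeBatchesAsync` is a stand-in for a live read loop so the example runs without a server; it is not a DurableStreams API.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedCancellationDemo
{
    // Stand-in for a live read loop: yields a counter until cancelled.
    public static async IAsyncEnumerable<int> FakeBatchesAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(10, cancellationToken);
            yield return i;
        }
    }

    // Reads until either the caller's token or the timeout fires,
    // then returns how many batches were seen.
    public static async Task<int> ReadWithTimeoutAsync(
        CancellationToken shutdownToken, TimeSpan timeout)
    {
        // The linked source cancels when shutdownToken cancels
        // or when the timeout elapses, whichever comes first.
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken);
        linked.CancelAfter(timeout);

        var count = 0;
        try
        {
            await foreach (var batch in FakeBatchesAsync(linked.Token))
            {
                count++;
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when either source cancels.
        }
        return count;
    }
}
```

With the real client, the same linked token would be passed as the `cancellationToken` argument of `ReadJsonAsync`, as in the example above.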
Source Code
Source: packages/client-dotnet/src/DurableStreams/