Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Go Client
The official Go client for the Durable Streams protocol, with iterator-based streaming.
Installation
go get github.com/durable-streams/durable-streams/packages/client-go
Requires Go 1.21+
Quick Start
import (
durablestreams "github.com/durable-streams/durable-streams/packages/client-go"
)
func main() {
client := durablestreams.NewClient()
stream := client.Stream("https://streams.example.com/my-stream")
// Create stream
err := stream.Create(ctx, durablestreams.WithContentType("application/json"))
// Append data
result, err := stream.Append(ctx, []byte(`{"message":"hello"}`))
fmt.Println("Next offset:", result.NextOffset)
// Read with iterator
it := stream.Read(ctx, durablestreams.WithLive(durablestreams.LiveModeLongPoll))
defer it.Close()
for {
chunk, err := it.Next()
if errors.Is(err, durablestreams.Done) {
break
}
fmt.Printf("Got %d bytes\n", len(chunk.Data))
}
}
Core APIs
Client
Create a new client with default settings. Returns: *Client
Options:
client := durablestreams.NewClient(
durablestreams.WithBaseURL("https://streams.example.com"),
durablestreams.WithHTTPClient(httpClient),
)
Stream
Handle to a durable stream for read/write operations.
Stream Methods
Create a new stream (idempotent).
WithContentType: content type (e.g., "application/json")
Returns: error
Append data to the stream with automatic retries.
data: data to append (for JSON, pass pre-marshaled JSON)
WithSeq: writer coordination sequence
Returns: (*AppendResult, error)
NextOffset: stream offset after append
The result also carries an ETag for conditional requests.
Example:
// JSON stream
data, _ := json.Marshal(map[string]string{"message": "hello"})
result, err := stream.Append(ctx, data)
if err != nil {
log.Fatal(err)
}
fmt.Println("Offset:", result.NextOffset)
// With sequence for coordination
result, err = stream.Append(ctx, data, durablestreams.WithSeq("seq-001"))
Append JSON data (auto-marshals). Returns: (*AppendResult, error)
Example:
result, err := stream.AppendJSON(ctx, map[string]string{
"event": "order.created",
"orderId": "123",
})
Create an iterator for reading stream chunks.
WithOffset: starting offset (default: StartOffset = "-1")
WithLive: live mode, one of LiveModeNone, LiveModeLongPoll, LiveModeSSE
WithTimeout(time.Duration): request timeout
Returns: *ChunkIterator
Example:
it := stream.Read(ctx,
durablestreams.WithOffset(lastOffset),
durablestreams.WithLive(durablestreams.LiveModeLongPoll),
)
defer it.Close()
for {
chunk, err := it.Next()
if errors.Is(err, durablestreams.Done) {
break
}
if err != nil {
log.Fatal(err)
}
// Process chunk
fmt.Printf("Got %d bytes at offset %s\n", len(chunk.Data), chunk.Offset)
// Save checkpoint
saveOffset(chunk.Offset)
}
Close the stream permanently (no more appends). Returns: (*CloseResult, error). The result carries the stream offset after close.
Get stream metadata via HEAD request. Returns: (*Metadata, error). The metadata includes whether the stream is closed (EOF).
Delete the stream. Returns: error
ChunkIterator
Iterator for reading stream chunks.
Get the next chunk. Returns: (*Chunk, error). Returns durablestreams.Done when iteration is complete.
Additional fields: whether we’ve reached the current end; stream cursor for CDN collapsing.
Close the iterator and release resources. Returns: error
JSONIterator
Type-safe JSON iteration with Go generics.
import "encoding/json"
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
}
it := stream.Read(ctx)
defer it.Close()
jsonIt := durablestreams.NewJSONIterator[Event](it)
for {
batch, err := jsonIt.Next()
if errors.Is(err, durablestreams.Done) {
break
}
if err != nil {
log.Fatal(err)
}
for _, event := range batch.Items {
fmt.Printf("Event %s: %s\n", event.ID, event.Type)
}
// Save checkpoint using batch offset
saveOffset(batch.Offset)
}
IdempotentProducer
Exactly-once writes with automatic batching and pipelining.
import durablestreams "github.com/durable-streams/durable-streams/packages/client-go"
producer, err := client.IdempotentProducer(
"https://streams.example.com/orders",
"order-service-1",
durablestreams.IdempotentProducerConfig{
Epoch: 0,
AutoClaim: true,
MaxBatchBytes: 1024 * 1024,
LingerMs: 5,
MaxInFlight: 5,
},
)
if err != nil {
log.Fatal(err)
}
defer producer.Close()
// Fire-and-forget writes
producer.Append([]byte(`{"orderId":"123"}`))
producer.Append([]byte(`{"orderId":"456"}`))
// Ensure delivery
if err := producer.Flush(ctx); err != nil {
log.Fatal(err)
}
Client.IdempotentProducer()
Create an idempotent producer.
Producer identifier: stable identifier (e.g., "order-service-1")
config (IdempotentProducerConfig, required):
Epoch: starting epoch (increment on restart)
AutoClaim: auto-retry with epoch+1 on 403
LingerMs: max wait before sending batch
Returns: (*IdempotentProducer, error)
Fire-and-forget append (returns immediately).
data: data to append ([]byte or string)
Returns: error
Send pending batch and wait for all in-flight batches. Returns: error
Close the stream with producer headers (idempotent). Returns: (IdempotentAppendResult, error)
Types
Offset
type Offset string
const (
StartOffset Offset = "-1" // Start of stream
)
// Create from string
offset := Offset("123_456")
// Check if start
if offset.IsStart() {
// ...
}
LiveMode
type LiveMode int
const (
LiveModeNone LiveMode = 0 // Catch-up only
LiveModeLongPoll LiveMode = 1 // Long-polling
LiveModeSSE LiveMode = 2 // Server-Sent Events
)
Error Handling
import (
durablestreams "github.com/durable-streams/durable-streams/packages/client-go"
)
result, err := stream.Append(ctx, data)
if err != nil {
switch {
case errors.Is(err, durablestreams.ErrStreamNotFound):
// Handle 404
case errors.Is(err, durablestreams.ErrStreamClosed):
// Handle closed stream
case errors.Is(err, durablestreams.ErrSeqConflict):
// Handle sequence conflict
default:
log.Fatal(err)
}
}
Error Variables
ErrStreamNotFound - Stream doesn’t exist (404)
ErrStreamExists - Stream already exists (409 on create)
ErrStreamClosed - Stream is closed (409 with Stream-Closed header)
ErrSeqConflict - Sequence conflict (409)
ErrStaleEpoch - Producer epoch is stale (403)
ErrEmptyAppend - Cannot append empty data
Done - Iterator exhausted
Advanced Features
Custom HTTP Client
import (
"net/http"
"time"
)
httpClient := &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
},
}
client := durablestreams.NewClient(
durablestreams.WithHTTPClient(httpClient),
)
Retry Policy
retryPolicy := durablestreams.RetryPolicy{
MaxRetries: 5,
InitialDelay: 100 * time.Millisecond,
MaxDelay: 5 * time.Second,
BackoffFactor: 2.0,
JitterFraction: 0.1,
}
client := durablestreams.NewClient(
durablestreams.WithRetryPolicy(retryPolicy),
)
Context Cancellation
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
it := stream.Read(ctx)
defer it.Close()
for {
chunk, err := it.Next()
if err == context.DeadlineExceeded {
fmt.Println("Timeout")
break
}
// ...
}
Source Code
Source: packages/client-go/