Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Go Client

The official Go client for the Durable Streams protocol, with iterator-based streaming.

Installation

go get github.com/durable-streams/durable-streams/packages/client-go
Requires Go 1.21+

Quick Start

import (
    durablestreams "github.com/durable-streams/durable-streams/packages/client-go"
)

func main() {
    client := durablestreams.NewClient()
    stream := client.Stream("https://streams.example.com/my-stream")

    // Create stream
    err := stream.Create(ctx, durablestreams.WithContentType("application/json"))

    // Append data
    result, err := stream.Append(ctx, []byte(`{"message":"hello"}`))
    fmt.Println("Next offset:", result.NextOffset)

    // Read with iterator
    it := stream.Read(ctx, durablestreams.WithLive(durablestreams.LiveModeLongPoll))
    defer it.Close()

    for {
        chunk, err := it.Next()
        if errors.Is(err, durablestreams.Done) {
            break
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Got %d bytes\n", len(chunk.Data))
    }
}

Core APIs

Client

NewClient()
function
Create a new client; default settings apply unless overridden by options
options
...ClientOption
Optional configuration
Returns: *Client
Options:
client := durablestreams.NewClient(
    durablestreams.WithBaseURL("https://streams.example.com"),
    durablestreams.WithHTTPClient(httpClient),
)

Stream

Handle to a durable stream for read/write operations.

Stream Methods

Create()
method
Create a new stream (idempotent)
ctx
context.Context
required
Context for cancellation
opts
...CreateOption
WithContentType(string)
option
Content type (e.g., "application/json")
WithTTL(time.Duration)
option
Time-to-live
WithExpiresAt(time.Time)
option
Absolute expiry time
WithClosed(bool)
option
Create in closed state
Returns: error
Append()
method
Append data to the stream with automatic retries
ctx
context.Context
required
Context for cancellation
data
[]byte
required
Data to append (for JSON, pass pre-marshaled JSON)
opts
...AppendOption
WithSeq(string)
option
Writer coordination sequence
WithHeaders(map[string]string)
option
Additional headers
Returns: (*AppendResult, error)
AppendResult
struct
NextOffset
Offset
Stream offset after append
ETag
string
ETag for conditional requests
Example:
// JSON stream
data, _ := json.Marshal(map[string]string{"message": "hello"})
result, err := stream.Append(ctx, data)
if err != nil {
    log.Fatal(err)
}
fmt.Println("Offset:", result.NextOffset)

// With sequence for coordination
result, err = stream.Append(ctx, data, durablestreams.WithSeq("seq-001"))
AppendJSON()
method
Append JSON data (auto-marshals)
ctx
context.Context
required
Context
v
any
required
Value to marshal as JSON
opts
...AppendOption
Append options
Returns: (*AppendResult, error)
Example:
result, err := stream.AppendJSON(ctx, map[string]string{
    "event": "order.created",
    "orderId": "123",
})
Read()
method
Create an iterator for reading stream chunks
ctx
context.Context
required
Context for cancellation
opts
...ReadOption
WithOffset(Offset)
option
Starting offset (default: StartOffset = "-1")
WithLive(LiveMode)
option
Live mode: LiveModeNone, LiveModeLongPoll, LiveModeSSE
WithTimeout(time.Duration)
option
Request timeout
WithHeaders(map[string]string)
option
Additional headers
Returns: *ChunkIterator
Example:
it := stream.Read(ctx,
    durablestreams.WithOffset(lastOffset),
    durablestreams.WithLive(durablestreams.LiveModeLongPoll),
)
defer it.Close()

for {
    chunk, err := it.Next()
    if errors.Is(err, durablestreams.Done) {
        break
    }
    if err != nil {
        log.Fatal(err)
    }

    // Process chunk
    fmt.Printf("Got %d bytes at offset %s\n", len(chunk.Data), chunk.Offset)

    // Save checkpoint
    saveOffset(chunk.Offset)
}
Close()
method
Close the stream permanently (no more appends)
ctx
context.Context
required
Context
opts
...CloseOption
WithCloseData([]byte)
option
Optional final message
Returns: (*CloseResult, error)
CloseResult
struct
FinalOffset
Offset
Stream offset after close
Head()
method
Get stream metadata via HEAD request
ctx
context.Context
required
Context
Returns: (*Metadata, error)
Metadata
struct
ContentType
string
Stream’s MIME type
NextOffset
Offset
Tail offset
StreamClosed
bool
Whether stream is closed (EOF)
TTL
*time.Duration
Remaining time-to-live
Delete()
method
Delete the stream
ctx
context.Context
required
Context
Returns: error

ChunkIterator

Iterator for reading stream chunks.
Next()
method
Get the next chunk
Returns: (*Chunk, error)
Returns durablestreams.Done when iteration is complete.
Chunk
struct
Data
[]byte
Chunk data
Offset
Offset
Offset after this chunk
UpToDate
bool
Whether we’ve reached the current end
Cursor
string
Stream cursor for CDN collapsing
Close()
method
Close the iterator and release resources
Returns: error

JSONIterator

Type-safe JSON iteration with Go generics.
import "encoding/json"

type Event struct {
    ID   string `json:"id"`
    Type string `json:"type"`
}

it := stream.Read(ctx)
defer it.Close()

jsonIt := durablestreams.NewJSONIterator[Event](it)
for {
    batch, err := jsonIt.Next()
    if errors.Is(err, durablestreams.Done) {
        break
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, event := range batch.Items {
        fmt.Printf("Event %s: %s\n", event.ID, event.Type)
    }

    // Save checkpoint using batch offset
    saveOffset(batch.Offset)
}

IdempotentProducer

Exactly-once writes with automatic batching and pipelining.
import durablestreams "github.com/durable-streams/durable-streams/packages/client-go"

producer, err := client.IdempotentProducer(
    "https://streams.example.com/orders",
    "order-service-1",
    durablestreams.IdempotentProducerConfig{
        Epoch:         0,
        AutoClaim:     true,
        MaxBatchBytes: 1024 * 1024,
        LingerMs:      5,
        MaxInFlight:   5,
    },
)
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// Fire-and-forget writes
producer.Append([]byte(`{"orderId":"123"}`))
producer.Append([]byte(`{"orderId":"456"}`))

// Ensure delivery
if err := producer.Flush(ctx); err != nil {
    log.Fatal(err)
}
Client.IdempotentProducer()
method
Create an idempotent producer
url
string
required
Stream URL
producerID
string
required
Stable identifier (e.g., "order-service-1")
config
IdempotentProducerConfig
required
Epoch
int
default:"0"
Starting epoch (increment on restart)
AutoClaim
bool
default:"false"
Auto-retry with epoch+1 on 403
MaxBatchBytes
int
default:"1048576"
Max batch size (1MB)
LingerMs
int
default:"5"
Max wait before sending batch
MaxInFlight
int
default:"5"
Max concurrent batches
OnError
func(error)
Error callback
Returns: (*IdempotentProducer, error)
Append()
method
Fire-and-forget append (returns immediately)
data
any
required
Data to append ([]byte or string)
Returns: error
Flush()
method
Send pending batch and wait for all in-flight batches
ctx
context.Context
required
Context for cancellation
Returns: error
CloseStream()
method
Close the stream with producer headers (idempotent)
ctx
context.Context
required
Context
data
[]byte
Optional final message
Returns: (IdempotentAppendResult, error)

Types

Offset

type Offset string

const (
    StartOffset Offset = "-1" // Start of stream
)

// Create from string
offset := Offset("123_456")

// Check if start
if offset.IsStart() {
    // ...
}

LiveMode

type LiveMode int

const (
    LiveModeNone     LiveMode = 0 // Catch-up only
    LiveModeLongPoll LiveMode = 1 // Long-polling
    LiveModeSSE      LiveMode = 2 // Server-Sent Events
)

Error Handling

import (
    durablestreams "github.com/durable-streams/durable-streams/packages/client-go"
)

result, err := stream.Append(ctx, data)
if err != nil {
    switch {
    case errors.Is(err, durablestreams.ErrStreamNotFound):
        // Handle 404
    case errors.Is(err, durablestreams.ErrStreamClosed):
        // Handle closed stream
    case errors.Is(err, durablestreams.ErrSeqConflict):
        // Handle sequence conflict
    default:
        log.Fatal(err)
    }
}

Error Variables

  • ErrStreamNotFound - Stream doesn’t exist (404)
  • ErrStreamExists - Stream already exists (409 on create)
  • ErrStreamClosed - Stream is closed (409 with Stream-Closed header)
  • ErrSeqConflict - Sequence conflict (409)
  • ErrStaleEpoch - Producer epoch is stale (403)
  • ErrEmptyAppend - Cannot append empty data
  • Done - Iterator exhausted

Advanced Features

Custom HTTP Client

import (
    "net/http"
    "time"
)

httpClient := &http.Client{
    Timeout: 30 * time.Second,
    Transport: &http.Transport{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
    },
}

client := durablestreams.NewClient(
    durablestreams.WithHTTPClient(httpClient),
)

Retry Policy

retryPolicy := durablestreams.RetryPolicy{
    MaxRetries:     5,
    InitialDelay:   100 * time.Millisecond,
    MaxDelay:       5 * time.Second,
    BackoffFactor:  2.0,
    JitterFraction: 0.1,
}

client := durablestreams.NewClient(
    durablestreams.WithRetryPolicy(retryPolicy),
)

Context Cancellation

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

it := stream.Read(ctx)
defer it.Close()

for {
    chunk, err := it.Next()
    if errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("Timeout")
        break
    }
    // ...
}

Source Code

Source: packages/client-go/