Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Java Client

The official Java client for Durable Streams protocol using java.net.http.HttpClient.

Installation

Maven

<dependency>
    <groupId>com.durablestreams</groupId>
    <artifactId>durable-streams</artifactId>
    <version>0.1.0</version>
</dependency>

Gradle

implementation 'com.durablestreams:durable-streams:0.1.0'
Requires Java 11 or later

Quick Start

import com.durablestreams.*;
import com.durablestreams.model.*;
import com.google.gson.Gson;

public class Example {
    public static void main(String[] args) {
        // Create client
        DurableStream client = DurableStream.create();

        String url = "https://streams.example.com/my-stream";

        // Create stream
        client.create(url, "application/json");

        // Append data
        String json = new Gson().toJson(Map.of("message", "hello"));
        AppendResult result = client.append(url, json.getBytes());
        System.out.println("Offset: " + result.nextOffset().getValue());

        // Read data
        try (JsonIterator<Event> it = client.readJson(url, json -> 
            new Gson().fromJson(json, new TypeToken<List<Event>>(){}.getType())
        )) {
            for (Event event : it.items()) {
                System.out.println("Event: " + event.getId());
            }
        }
    }
}

Core APIs

DurableStream

Main client class (thread-safe, use singleton pattern).
DurableStream.create()
static
Create a client with default settingsReturns: DurableStream
DurableStream.builder()
static
Create a builder for custom configurationReturns: DurableStream.Builder
Example:
DurableStream client = DurableStream.builder()
    .httpClient(customHttpClient)
    .retryPolicy(RetryPolicy.defaults())
    .header("Authorization", "Bearer token")
    .header("X-Request-ID", () -> UUID.randomUUID().toString())
    .build();

Write Operations

create()
method
Create a new stream
url
String
required
Stream URL
contentType
String
required
Content type (e.g., "application/json")
Throws: StreamExistsException if stream already exists
create()
method (overload)
Create a stream with full options
url
String
required
Stream URL
contentType
String
required
Content type
ttl
Duration
Time-to-live
expiresAt
Instant
Absolute expiry time
closed
boolean
default:"false"
Create in closed state
data
byte[]
Optional initial data
append()
method
Append data to a stream
url
String
required
Stream URL
data
byte[]
required
Data to append (for JSON, marshal first)
Returns: AppendResult with nextOffset() and etag()Throws: StreamNotFoundException, StreamClosedException, DurableStreamException
Example:
// JSON stream - marshal first
Gson gson = new Gson();
Map<String, String> message = Map.of("message", "hello");
byte[] json = gson.toJson(message).getBytes();

AppendResult result = client.append(url, json);
System.out.println("Next offset: " + result.nextOffset().getValue());

// Byte stream
client.append(url, "raw bytes".getBytes());
appendAsync()
method
Append data asynchronously
url
String
required
Stream URL
data
byte[]
required
Data to append
Returns: CompletableFuture<AppendResult>
close()
method
Close a stream permanently (no more appends)
url
String
required
Stream URL
Returns: CloseResult with finalOffset()Throws: StreamClosedException if already closed with data
close()
method (overload)
Close with optional final message
url
String
required
Stream URL
data
byte[]
Final message
contentType
String
Content type
Returns: CloseResult
delete()
method
Delete a stream
url
String
required
Stream URL
Throws: StreamNotFoundException if stream doesn’t exist

Read Operations

read()
method
Read from a stream (catch-up mode, from beginning)
url
String
required
Stream URL
Returns: ChunkIterator
read()
method (overload)
Read with options
url
String
required
Stream URL
options
ReadOptions
required
offset(Offset)
builder method
Starting offset (default: Offset.START)
liveMode(LiveMode)
builder method
Live mode: OFF, LONG_POLL, SSE
timeout(Duration)
builder method
Request timeout
cursor(String)
builder method
Stream cursor for CDN collapsing
Returns: ChunkIterator
Example:
import com.durablestreams.model.ReadOptions;
import com.durablestreams.model.LiveMode;
import com.durablestreams.model.Offset;

try (ChunkIterator it = client.read(url, 
    ReadOptions.builder()
        .offset(lastOffset)
        .liveMode(LiveMode.LONG_POLL)
        .timeout(Duration.ofSeconds(30))
        .build()
)) {
    while (it.hasNext()) {
        Chunk chunk = it.next();
        System.out.printf("Got %d bytes at offset %s%n", 
            chunk.data().length, chunk.offset().getValue());

        // Save checkpoint
        saveOffset(chunk.offset());
    }
}
readJson()
method
Read JSON with type-safe parsing
url
String
required
Stream URL
parser
Function<String, List<T>>
required
JSON parser function (use Gson, Jackson, etc.)
Returns: JsonIterator<T>
Example with Gson:
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;

Gson gson = new Gson();
Type listType = new TypeToken<List<Event>>(){}.getType();

try (JsonIterator<Event> it = client.readJson(url, json -> 
    gson.fromJson(json, listType)
)) {
    for (Event event : it.items()) {
        System.out.println("Event: " + event.getId());
    }

    // Save checkpoint per batch
    saveOffset(it.currentBatch().offset());
}
head()
method
Get stream metadata
url
String
required
Stream URL
Returns: Metadata with contentType(), nextOffset(), streamClosed()

ChunkIterator

Iterator for reading raw byte chunks (implements Iterator<Chunk> and AutoCloseable).
hasNext()
method
Check if more chunks availableReturns: boolean
next()
method
Get next chunkReturns: Chunk with data(), offset(), upToDate(), cursor()Throws: NoSuchElementException if no more chunks
close()
method
Close the iterator and release resources

JsonIterator<T>

Type-safe JSON iteration (implements Iterator<JsonBatch<T>> and AutoCloseable).
public class Event {
    private String id;
    private String type;
    private Map<String, Object> data;
    // getters/setters
}

try (JsonIterator<Event> it = client.readJson(url, parser)) {
    for (Event event : it.items()) {
        processEvent(event);
    }

    // Get current batch for checkpointing
    JsonBatch<Event> batch = it.currentBatch();
    saveOffset(batch.offset());
}
items()
method
Iterate over individual items across all batchesReturns: Iterable<T>
batches()
method
Iterate over batches (for checkpointing)Returns: Iterable<JsonBatch<T>>
currentBatch()
method
Get the current batchReturns: JsonBatch<T>

IdempotentProducer

Exactly-once writes with automatic batching and pipelining.
import com.durablestreams.IdempotentProducer;

IdempotentProducer producer = client.producer(
    url,
    "order-service-1",
    IdempotentProducer.Config.defaults()
        .withEpoch(0)
        .withAutoClaim(true)
        .withMaxBatchBytes(1024 * 1024)
);

// Fire-and-forget writes
producer.append("{\"orderId\":\"123\"}".getBytes());
producer.append("{\"orderId\":\"456\"}".getBytes());

// Ensure delivery
producer.flush();
producer.close();
producer()
method
Create an idempotent producer
url
String
required
Stream URL
producerId
String
required
Stable identifier
config
IdempotentProducer.Config
epoch
int
default:"0"
Starting epoch
autoClaim
boolean
default:"false"
Auto-retry with epoch+1 on 403
maxBatchBytes
int
default:"1048576"
Max batch size (1MB)
lingerMs
int
default:"5"
Max wait before sending batch
maxInFlight
int
default:"5"
Max concurrent batches
Returns: IdempotentProducer
append()
method
Fire-and-forget append (blocks until added to batch)
data
byte[]
required
Data to append
Throws: DurableStreamException if producer is closed
flush()
method
Send pending batch and wait for all in-flight batchesThrows: DurableStreamException on failure
close()
method (AutoCloseable)
Flush and close the producer

Types

Offset

public record Offset(String value) {
    public static final Offset START = new Offset("-1");

    public static Offset of(String value) {
        return new Offset(value);
    }

    public String getValue() {
        return value;
    }
}

LiveMode

public enum LiveMode {
    OFF,        // Catch-up only
    LONG_POLL,  // Long-polling
    SSE;        // Server-Sent Events

    String getWireValue() {
        return switch (this) {
            case LONG_POLL -> "long-poll";
            case SSE -> "sse";
            default -> null;
        };
    }
}

JsonBatch<T>

public record JsonBatch<T>(
    List<T> items,
    Offset offset,
    boolean upToDate,
    String cursor
) {}

Error Handling

import com.durablestreams.exception.*;

try {
    client.append(url, data);
} catch (StreamNotFoundException e) {
    System.err.println("Stream not found: " + e.getUrl());
} catch (StreamClosedException e) {
    System.err.println("Stream closed: " + e.getUrl());
} catch (SequenceConflictException e) {
    System.err.println("Sequence conflict");
} catch (DurableStreamException e) {
    System.err.println("Error: " + e.getMessage());
    e.getStatusCode().ifPresent(code -> 
        System.err.println("Status: " + code));
}

Exception Hierarchy

  • DurableStreamException - Base exception
    • StreamNotFoundException - 404
    • StreamExistsException - 409 on create
    • StreamClosedException - 409 with Stream-Closed header
    • SequenceConflictException - 409 sequence conflict
    • StaleEpochException - 403 for producers
    • OffsetGoneException - 410 retention expired
    • ParseErrorException - JSON parse error

Advanced Features

Custom HTTP Client

import java.net.http.HttpClient;
import java.time.Duration;

HttpClient httpClient = HttpClient.newBuilder()
    .version(HttpClient.Version.HTTP_2)
    .connectTimeout(Duration.ofSeconds(30))
    .executor(Executors.newFixedThreadPool(10))
    .build();

DurableStream client = DurableStream.builder()
    .httpClient(httpClient)
    .build();

Retry Policy

import com.durablestreams.internal.RetryPolicy;

RetryPolicy retryPolicy = RetryPolicy.builder()
    .maxRetries(5)
    .initialDelay(Duration.ofMillis(100))
    .maxDelay(Duration.ofSeconds(5))
    .backoffMultiplier(2.0)
    .build();

DurableStream client = DurableStream.builder()
    .retryPolicy(retryPolicy)
    .build();

Try-with-resources

try (DurableStream client = DurableStream.create()) {
    // Use client
} // Automatically closes executor

Source Code

Source: packages/client-java/src/main/java/com/durablestreams/