Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Java Client
The official Java client for Durable Streams protocol using java.net.http.HttpClient.
Installation
Maven
<dependency>
<groupId>com.durablestreams</groupId>
<artifactId>durable-streams</artifactId>
<version>0.1.0</version>
</dependency>
Gradle
implementation 'com.durablestreams:durable-streams:0.1.0'
Requires Java 11 or later
Quick Start
import com.durablestreams.*;
import com.durablestreams.model.*;
import com.google.gson.Gson;
public class Example {
public static void main(String[] args) {
// Create client
DurableStream client = DurableStream.create();
String url = "https://streams.example.com/my-stream";
// Create stream
client.create(url, "application/json");
// Append data
String json = new Gson().toJson(Map.of("message", "hello"));
AppendResult result = client.append(url, json.getBytes());
System.out.println("Offset: " + result.nextOffset().getValue());
// Read data
try (JsonIterator<Event> it = client.readJson(url, json ->
new Gson().fromJson(json, new TypeToken<List<Event>>(){}.getType())
)) {
for (Event event : it.items()) {
System.out.println("Event: " + event.getId());
}
}
}
}
Core APIs
DurableStream
Main client class (thread-safe, use singleton pattern).
Create a client with default settingsReturns: DurableStream
Create a builder for custom configurationReturns: DurableStream.Builder
Example:
DurableStream client = DurableStream.builder()
.httpClient(customHttpClient)
.retryPolicy(RetryPolicy.defaults())
.header("Authorization", "Bearer token")
.header("X-Request-ID", () -> UUID.randomUUID().toString())
.build();
Write Operations
Create a new streamContent type (e.g., "application/json")
Throws: StreamExistsException if stream already exists
Create a stream with full options
Append data to a streamData to append (for JSON, marshal first)
Returns: AppendResult with nextOffset() and etag()Throws: StreamNotFoundException, StreamClosedException, DurableStreamException
Example:
// JSON stream - marshal first
Gson gson = new Gson();
Map<String, String> message = Map.of("message", "hello");
byte[] json = gson.toJson(message).getBytes();
AppendResult result = client.append(url, json);
System.out.println("Next offset: " + result.nextOffset().getValue());
// Byte stream
client.append(url, "raw bytes".getBytes());
Append data asynchronouslyReturns: CompletableFuture<AppendResult>
Close a stream permanently (no more appends)Returns: CloseResult with finalOffset()Throws: StreamClosedException if already closed with data
Close with optional final messageReturns: CloseResult
Delete a streamThrows: StreamNotFoundException if stream doesn’t exist
Read Operations
Read from a stream (catch-up mode, from beginning)Returns: ChunkIterator
Read with optionsStarting offset (default: Offset.START)
Live mode: OFF, LONG_POLL, SSE
Stream cursor for CDN collapsing
Returns: ChunkIterator
Example:
import com.durablestreams.model.ReadOptions;
import com.durablestreams.model.LiveMode;
import com.durablestreams.model.Offset;
try (ChunkIterator it = client.read(url,
ReadOptions.builder()
.offset(lastOffset)
.liveMode(LiveMode.LONG_POLL)
.timeout(Duration.ofSeconds(30))
.build()
)) {
while (it.hasNext()) {
Chunk chunk = it.next();
System.out.printf("Got %d bytes at offset %s%n",
chunk.data().length, chunk.offset().getValue());
// Save checkpoint
saveOffset(chunk.offset());
}
}
Read JSON with type-safe parsingparser
Function<String, List<T>>
required
JSON parser function (use Gson, Jackson, etc.)
Returns: JsonIterator<T>
Example with Gson:
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
Gson gson = new Gson();
Type listType = new TypeToken<List<Event>>(){}.getType();
try (JsonIterator<Event> it = client.readJson(url, json ->
gson.fromJson(json, listType)
)) {
for (Event event : it.items()) {
System.out.println("Event: " + event.getId());
}
// Save checkpoint per batch
saveOffset(it.currentBatch().offset());
}
Get stream metadataReturns: Metadata with contentType(), nextOffset(), streamClosed()
ChunkIterator
Iterator for reading raw byte chunks (implements Iterator<Chunk> and AutoCloseable).
Check if more chunks availableReturns: boolean
Get next chunkReturns: Chunk with data(), offset(), upToDate(), cursor()Throws: NoSuchElementException if no more chunks
Close the iterator and release resources
JsonIterator<T>
Type-safe JSON iteration (implements Iterator<JsonBatch<T>> and AutoCloseable).
public class Event {
private String id;
private String type;
private Map<String, Object> data;
// getters/setters
}
try (JsonIterator<Event> it = client.readJson(url, parser)) {
for (Event event : it.items()) {
processEvent(event);
}
// Get current batch for checkpointing
JsonBatch<Event> batch = it.currentBatch();
saveOffset(batch.offset());
}
Iterate over individual items across all batchesReturns: Iterable<T>
Iterate over batches (for checkpointing)Returns: Iterable<JsonBatch<T>>
Get the current batchReturns: JsonBatch<T>
IdempotentProducer
Exactly-once writes with automatic batching and pipelining.
import com.durablestreams.IdempotentProducer;
IdempotentProducer producer = client.producer(
url,
"order-service-1",
IdempotentProducer.Config.defaults()
.withEpoch(0)
.withAutoClaim(true)
.withMaxBatchBytes(1024 * 1024)
);
// Fire-and-forget writes
producer.append("{\"orderId\":\"123\"}".getBytes());
producer.append("{\"orderId\":\"456\"}".getBytes());
// Ensure delivery
producer.flush();
producer.close();
Create an idempotent producerconfig
IdempotentProducer.Config
Auto-retry with epoch+1 on 403
Max wait before sending batch
Returns: IdempotentProducer
Fire-and-forget append (blocks until added to batch)Throws: DurableStreamException if producer is closed
Send pending batch and wait for all in-flight batchesThrows: DurableStreamException on failure
Flush and close the producer
Types
Offset
public record Offset(String value) {
public static final Offset START = new Offset("-1");
public static Offset of(String value) {
return new Offset(value);
}
public String getValue() {
return value;
}
}
LiveMode
public enum LiveMode {
OFF, // Catch-up only
LONG_POLL, // Long-polling
SSE; // Server-Sent Events
String getWireValue() {
return switch (this) {
case LONG_POLL -> "long-poll";
case SSE -> "sse";
default -> null;
};
}
}
JsonBatch<T>
public record JsonBatch<T>(
List<T> items,
Offset offset,
boolean upToDate,
String cursor
) {}
Error Handling
import com.durablestreams.exception.*;
try {
client.append(url, data);
} catch (StreamNotFoundException e) {
System.err.println("Stream not found: " + e.getUrl());
} catch (StreamClosedException e) {
System.err.println("Stream closed: " + e.getUrl());
} catch (SequenceConflictException e) {
System.err.println("Sequence conflict");
} catch (DurableStreamException e) {
System.err.println("Error: " + e.getMessage());
e.getStatusCode().ifPresent(code ->
System.err.println("Status: " + code));
}
Exception Hierarchy
DurableStreamException - Base exception
StreamNotFoundException - 404
StreamExistsException - 409 on create
StreamClosedException - 409 with Stream-Closed header
SequenceConflictException - 409 sequence conflict
StaleEpochException - 403 for producers
OffsetGoneException - 410 retention expired
ParseErrorException - JSON parse error
Advanced Features
Custom HTTP Client
import java.net.http.HttpClient;
import java.time.Duration;
HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(30))
.executor(Executors.newFixedThreadPool(10))
.build();
DurableStream client = DurableStream.builder()
.httpClient(httpClient)
.build();
Retry Policy
import com.durablestreams.internal.RetryPolicy;
RetryPolicy retryPolicy = RetryPolicy.builder()
.maxRetries(5)
.initialDelay(Duration.ofMillis(100))
.maxDelay(Duration.ofSeconds(5))
.backoffMultiplier(2.0)
.build();
DurableStream client = DurableStream.builder()
.retryPolicy(retryPolicy)
.build();
Try-with-resources
try (DurableStream client = DurableStream.create()) {
// Use client
} // Automatically closes executor
Source Code
Source: packages/client-java/src/main/java/com/durablestreams/