PHP Client
The official PHP client for the Durable Streams protocol, with PSR-18 HTTP client support.
Installation
composer require durable-streams/client
Requires PHP 8.1 or later.
Quick Start
<?php
use DurableStreams\DurableStream;
use DurableStreams\LiveMode;
// Create a stream
$stream = DurableStream::create(
    'https://streams.example.com/my-stream',
    contentType: 'application/json'
);
// Append data
$stream->append(json_encode(['message' => 'hello']));
// Read data
$response = $stream->read(offset: '-1', live: LiveMode::LongPoll);
foreach ($response->iterJson() as $item) {
    echo "Item: " . json_encode($item) . "\n";
}
Core APIs
DurableStream
Handle for interacting with a durable stream.
Static Factory Methods
Create a new stream.
contentType (string, default: 'application/octet-stream'): Content type
Absolute expiry (ISO 8601)
Optional HTTP client (PSR-18)
Returns: DurableStream
Throws: StreamExistsException if the stream already exists
Connect to an existing stream (validates via HEAD).
Returns: DurableStream
Throws: StreamNotFoundException if the stream doesn't exist
Example:
$stream = DurableStream::create(
    'https://streams.example.com/events',
    contentType: 'application/json',
    ttlSeconds: 86400, // 24 hours
    headers: ['Authorization' => 'Bearer token123']
);
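The options above mention both a relative TTL (ttlSeconds) and an absolute ISO 8601 expiry. If you need to derive an absolute timestamp from a TTL, a minimal sketch could look like the following. expiryFromTtl is a hypothetical helper and not part of the client; how the resulting string is passed to the client is not shown here.

```php
<?php
declare(strict_types=1);

// Hypothetical helper (not part of the client): turn a relative TTL into an
// absolute ISO 8601 timestamp expressed in UTC.
function expiryFromTtl(int $ttlSeconds, ?DateTimeImmutable $now = null): string
{
    if ($ttlSeconds < 0) {
        throw new InvalidArgumentException('TTL must not be negative');
    }
    $now ??= new DateTimeImmutable('now');

    return $now
        ->setTimezone(new DateTimeZone('UTC'))          // normalise to UTC first
        ->add(new DateInterval("PT{$ttlSeconds}S"))     // add the TTL in seconds
        ->format(DateTimeInterface::ATOM);              // e.g. 2024-01-02T00:00:00+00:00
}

// 24 hours after midnight UTC on 2024-01-01
echo expiryFromTtl(86400, new DateTimeImmutable('2024-01-01T00:00:00+00:00')), "\n";
```

Passing the current time as an argument keeps the helper deterministic, which makes it easier to test.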
Write Operations
Append data to the stream.
Data to append (for JSON streams, pass a pre-encoded JSON string)
Writer coordination sequence
Additional headers for this request
Returns: AppendResult with offset and duplicate
Throws: StreamClosedException, SeqConflictException, DurableStreamException
Example:
// JSON stream - pass pre-encoded JSON
$result = $stream->append(json_encode(['message' => 'hello']));
echo "Offset: {$result->offset}\n";
// With sequence for coordination
$result = $stream->append(
    json_encode(['message' => 'world']),
    seq: 'seq-001'
);
// Byte stream
$result = $stream->append('raw text data');
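The example above uses 'seq-001' as a sequence value. One possible way to generate such values is sketched below. formatSeq is a hypothetical helper, and the sketch assumes that sequence values are compared as strings, in which case fixed-width zero padding keeps string order aligned with numeric order. Check the protocol documentation before relying on that assumption.

```php
<?php
declare(strict_types=1);

// Hypothetical helper (not part of the client): build zero-padded sequence
// values such as "seq-001".
// Assumption: sequence values are compared as strings, so every value must
// have the same width for the ordering to stay monotonic.
function formatSeq(int $n, int $width = 3): string
{
    if ($n < 0 || strlen((string) $n) > $width) {
        throw new OutOfRangeException("Sequence {$n} does not fit in {$width} digits");
    }

    return 'seq-' . str_pad((string) $n, $width, '0', STR_PAD_LEFT);
}

echo formatSeq(1), "\n"; // seq-001

// Padded values sort in numeric order...
var_dump(strcmp(formatSeq(9), formatSeq(10)) < 0); // bool(true)
// ...while unpadded ones do not: "seq-9" sorts after "seq-10".
var_dump(strcmp('seq-9', 'seq-10') < 0);           // bool(false)
```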
Close the stream permanently (no more appends).
Returns: CloseResult with finalOffset
Throws: StreamClosedException if called with data on an already-closed stream
Delete this stream.
Throws: StreamNotFoundException if the stream doesn't exist
DurableStream::deleteStatic()
Delete a stream without creating a handle
Read Operations
Read from the stream.
live (LiveMode, default: LiveMode::Off): Live mode, one of LiveMode::Off, LiveMode::LongPoll, LiveMode::SSE
Returns: StreamResponse
Example:
use DurableStreams\LiveMode;
$response = $stream->read(
    offset: $lastOffset,
    live: LiveMode::LongPoll
);
// Iterate over JSON items
foreach ($response->iterJson() as $item) {
    echo "Item: " . json_encode($item) . "\n";
}
// Or get all items at once
$items = $response->jsonAll();
foreach ($items as $item) {
    processItem($item);
}
// Save checkpoint
$checkpoint = $response->offset();
saveOffset($checkpoint);
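The read example relies on $lastOffset and saveOffset(), which the application has to provide. A minimal file-backed sketch is shown below. Both functions are hypothetical and not part of the client; unlike the placeholder call above, this version takes the file path as a second argument. The default of '-1' mirrors the starting offset used in the Quick Start.

```php
<?php
declare(strict_types=1);

// Hypothetical checkpoint helpers (not part of the client).
function saveOffset(string $offset, string $path): void
{
    // Write to a temporary file and rename it into place, so a crash
    // mid-write does not leave a truncated checkpoint behind.
    $tmp = $path . '.tmp';
    if (file_put_contents($tmp, $offset, LOCK_EX) === false) {
        throw new RuntimeException("Cannot write checkpoint to {$tmp}");
    }
    if (!rename($tmp, $path)) {
        throw new RuntimeException("Cannot move checkpoint into place at {$path}");
    }
}

function loadOffset(string $path, string $default = '-1'): string
{
    if (!is_file($path)) {
        return $default; // no checkpoint stored yet
    }
    $offset = file_get_contents($path);

    return ($offset === false || $offset === '') ? $default : $offset;
}
```

With these in place, $lastOffset could be obtained with loadOffset($path) before calling read(), and the checkpoint stored with saveOffset($response->offset(), $path) afterwards.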
Get stream metadata via HEAD request.
Returns: HeadResult with offset, contentType, streamClosed
DurableStream::headStatic()
Get metadata without creating a handle.
Returns: HeadResult
StreamResponse
Response object for reading stream data.
Iterate over individual JSON items. Returns: Iterator<mixed>
Iterate over byte chunks. Returns: Iterator<string>
Get all JSON items as an array. Returns: array<mixed>
Get all bytes. Returns: string
Get the current stream offset. Returns: string
Check whether the current end of the stream has been reached. Returns: bool
IdempotentProducer
Exactly-once writes with automatic batching.
use DurableStreams\IdempotentProducer;
$producer = new IdempotentProducer(
    url: 'https://streams.example.com/orders',
    producerId: 'order-service-1',
    epoch: 0,
    autoClaim: true,
    maxBatchBytes: 1024 * 1024,
    lingerMs: 5
);
// Fire-and-forget writes
$producer->append(json_encode(['orderId' => '123']));
$producer->append(json_encode(['orderId' => '456']));
// Ensure delivery
$producer->flush();
Create an idempotent producer.
Auto-retry with epoch+1 on 403
Max wait before sending batch
contentType (string, default: 'application/octet-stream'): Content type
Fire-and-forget append (returns immediately).
Data to append (for JSON, pass pre-encoded JSON)
Send pending batch and wait for completion.
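To illustrate how a size limit (maxBatchBytes) and a maximum wait (lingerMs) can interact, here is a small stand-alone sketch. BatchBuffer is a hypothetical class written for this page; it is not the producer's actual implementation, and it only decides when a batch is due rather than sending anything.

```php
<?php
declare(strict_types=1);

// Hypothetical buffer illustrating size- and time-based batching.
final class BatchBuffer
{
    /** @var list<string> */
    private array $items = [];
    private int $bytes = 0;
    private ?float $firstAt = null; // time (ms) the oldest queued item arrived

    public function __construct(
        private readonly int $maxBatchBytes,
        private readonly int $lingerMs,
    ) {
    }

    /** Queue one item; returns true when the batch should be sent now. */
    public function add(string $data, float $nowMs): bool
    {
        $this->firstAt ??= $nowMs;
        $this->items[] = $data;
        $this->bytes += strlen($data);

        return $this->isDue($nowMs);
    }

    /** A batch is due once it is large enough or its oldest item has waited long enough. */
    public function isDue(float $nowMs): bool
    {
        if ($this->items === []) {
            return false;
        }

        return $this->bytes >= $this->maxBatchBytes
            || ($nowMs - $this->firstAt) >= $this->lingerMs;
    }

    /** @return list<string> the queued items; the buffer is reset afterwards */
    public function drain(): array
    {
        $batch = $this->items;
        $this->items = [];
        $this->bytes = 0;
        $this->firstAt = null;

        return $batch;
    }
}

$buf = new BatchBuffer(maxBatchBytes: 10, lingerMs: 5);
var_dump($buf->add('abcd', 0.0)); // bool(false): 4 bytes, 0 ms waited
var_dump($buf->add('efgh', 1.0)); // bool(false): 8 bytes, 1 ms waited
var_dump($buf->add('ij', 2.0));   // bool(true): 10 bytes reaches the size limit
```

Timestamps are passed in explicitly so the behaviour can be checked without real delays.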
Types
LiveMode
enum LiveMode: string
{
    case Off = 'off';
    case LongPoll = 'long-poll';
    case SSE = 'sse';
}
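Because LiveMode is a string-backed enum, configuration values such as 'long-poll' can be converted with PHP's built-in from() and tryFrom(). The sketch below declares a local copy of the enum so it runs on its own; liveModeFromConfig is a hypothetical helper, and falling back to Off for unknown values is a design choice of this example, not documented client behaviour.

```php
<?php
declare(strict_types=1);

// Local copy of the enum shown above so the snippet is self-contained.
enum LiveMode: string
{
    case Off = 'off';
    case LongPoll = 'long-poll';
    case SSE = 'sse';
}

// Hypothetical helper: parse a config value, falling back to Off when the
// value is missing or unrecognised.
function liveModeFromConfig(?string $value): LiveMode
{
    return LiveMode::tryFrom(strtolower(trim($value ?? ''))) ?? LiveMode::Off;
}

var_dump(liveModeFromConfig('long-poll') === LiveMode::LongPoll); // bool(true)
var_dump(liveModeFromConfig('bogus') === LiveMode::Off);          // bool(true)
echo LiveMode::SSE->value, "\n";                                  // sse
```

If an unknown value should be treated as an error instead, LiveMode::from() throws a ValueError.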
AppendResult
final class AppendResult
{
    public readonly string $offset;
    public readonly int $status;
    public readonly bool $duplicate;
}
HeadResult
final class HeadResult
{
    public readonly string $offset;
    public readonly ?string $contentType;
    public readonly bool $streamClosed;
}
CloseResult
final class CloseResult
{
    public readonly string $finalOffset;
}
Error Handling
use DurableStreams\Exception\{
    DurableStreamException,
    StreamNotFoundException,
    StreamClosedException,
    StreamExistsException,
    SeqConflictException
};
try {
    $stream->append($data);
} catch (StreamClosedException $e) {
    echo "Stream is closed: {$e->getUrl()}\n";
} catch (SeqConflictException $e) {
    echo "Sequence conflict\n";
} catch (StreamNotFoundException $e) {
    echo "Stream not found: {$e->getUrl()}\n";
} catch (DurableStreamException $e) {
    echo "Error {$e->getStatusCode()}: {$e->getMessage()}\n";
}
Exception Hierarchy
DurableStreamException - Base exception
StreamNotFoundException - 404
StreamExistsException - 409 on create
StreamClosedException - 409 with Stream-Closed header
SeqConflictException - 409 sequence conflict
StaleEpochException - 403 for producers
UnauthorizedException - 401
RateLimitedException - 429
MessageTooLargeException - 413
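The list above can be read as a mapping from HTTP status to exception type. The sketch below spells that mapping out as a plain function. exceptionNameFor and its parameters are hypothetical, it returns class names as strings so it runs without the library, and the order in which the three 409 cases are checked is an assumption of this example.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration of the status-to-exception list above.
// Returns the short class name only; it does not instantiate anything.
function exceptionNameFor(
    int $status,
    bool $isCreate = false,           // the failed request was a create
    bool $streamClosedHeader = false  // the response carried a Stream-Closed header
): string {
    return match (true) {
        $status === 401 => 'UnauthorizedException',
        $status === 403 => 'StaleEpochException',      // listed for producers
        $status === 404 => 'StreamNotFoundException',
        $status === 409 && $isCreate => 'StreamExistsException',
        $status === 409 && $streamClosedHeader => 'StreamClosedException',
        $status === 409 => 'SeqConflictException',
        $status === 413 => 'MessageTooLargeException',
        $status === 429 => 'RateLimitedException',
        default => 'DurableStreamException',           // base exception
    };
}

echo exceptionNameFor(409, streamClosedHeader: true), "\n"; // StreamClosedException
echo exceptionNameFor(500), "\n";                            // DurableStreamException
```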
Advanced Features
Custom HTTP Client (PSR-18)
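use GuzzleHttp\Client;
use DurableStreams\DurableStream;
use DurableStreams\Internal\Psr18HttpClient;
// Configure Guzzle with request and connect timeouts (in seconds)
$guzzle = new Client([
    'timeout' => 30,
    'connect_timeout' => 10,
]);
// Wrap the Guzzle client for use with DurableStream
$httpClient = new Psr18HttpClient($guzzle);
$stream = new DurableStream(
    url: 'https://streams.example.com/events',
    client: $httpClient
);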
Streaming Large Reads
$response = $stream->read(live: LiveMode::LongPoll);
foreach ($response->iterJson() as $item) {
    // Process one item at a time (memory efficient)
    processItem($item);
    // Save checkpoint periodically
    if (shouldCheckpoint()) {
        saveOffset($response->offset());
    }
}
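shouldCheckpoint() in the loop above is left to the application. One possible policy is to checkpoint every N items or every T seconds, whichever comes first. The sketch below shows that idea; CheckpointPolicy is a hypothetical class and not part of the client.

```php
<?php
declare(strict_types=1);

// Hypothetical policy: a checkpoint is due after a fixed number of items or
// after a fixed amount of time, whichever comes first.
final class CheckpointPolicy
{
    private int $sinceLast = 0; // items processed since the last checkpoint
    private float $lastAt;      // time (seconds) of the last checkpoint

    public function __construct(
        private readonly int $everyItems,
        private readonly float $everySeconds,
        ?float $now = null,
    ) {
        $this->lastAt = $now ?? microtime(true);
    }

    /** Call once per processed item; returns true when a checkpoint is due. */
    public function shouldCheckpoint(?float $now = null): bool
    {
        $now ??= microtime(true);
        $this->sinceLast++;

        if ($this->sinceLast >= $this->everyItems
            || ($now - $this->lastAt) >= $this->everySeconds
        ) {
            // Reset both counters so the next interval starts from here.
            $this->sinceLast = 0;
            $this->lastAt = $now;

            return true;
        }

        return false;
    }
}

$policy = new CheckpointPolicy(everyItems: 100, everySeconds: 5.0);
// Inside the read loop, the condition would become:
//   if ($policy->shouldCheckpoint()) { saveOffset($response->offset()); }
```

The optional $now argument exists so the policy can be exercised with fixed timestamps.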
JSON Batching
// For JSON streams, DurableStream automatically wraps data in arrays
// No need to wrap manually
$stream = DurableStream::create(
    'https://streams.example.com/events',
    contentType: 'application/json'
);
// This is automatically wrapped as [{"event":"test"}] by the client
$stream->append(json_encode(['event' => 'test']));
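To make the wrapping described in the comment concrete, the sketch below builds a JSON array body from items that are already JSON-encoded. wrapJsonBatch is a hypothetical function that only illustrates the resulting shape; it is not the client's internal code.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration: each item is already a JSON string, and the
// result is a JSON array containing those items in order.
function wrapJsonBatch(array $encodedItems): string
{
    foreach ($encodedItems as $item) {
        // Reject anything that is not valid JSON so the array stays well-formed.
        json_decode($item, flags: JSON_THROW_ON_ERROR);
    }

    return '[' . implode(',', $encodedItems) . ']';
}

echo wrapJsonBatch([json_encode(['event' => 'test'])]), "\n";
// [{"event":"test"}]
```

Concatenating the pre-encoded strings avoids decoding and re-encoding each item, which is also why append() takes a JSON string rather than a PHP array.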
Helper Functions
The library provides standalone helper functions in the DurableStreams namespace:
use function DurableStreams\stream;
// Read from a stream in a single call
$response = stream([
    'url' => 'https://streams.example.com/events',
    'offset' => '-1',
    'live' => 'long-poll',
]);
foreach ($response->iterJson() as $item) {
    echo json_encode($item) . "\n";
}
Source Code
Source: packages/client-php/src/