Durable Streams provides two conformance test suites for checking that an implementation complies with the protocol:
- Server Conformance Tests: Verify that your server correctly implements the Durable Streams protocol
- Client Conformance Tests: Validate client implementations across any programming language
Server Conformance Tests
The server conformance test suite verifies that your server implementation correctly handles all aspects of the Durable Streams protocol.
Installation
npm install @durable-streams/server-conformance-tests
Running Tests
CI Mode (Run Once)
Run the tests once and exit, which suits continuous integration:
npx @durable-streams/server-conformance-tests --run http://localhost:4437
Watch Mode (Development)
Automatically rerun tests when source files change:
# Watch a single directory
npx @durable-streams/server-conformance-tests --watch src http://localhost:4437
# Watch multiple directories
npx @durable-streams/server-conformance-tests --watch src lib http://localhost:4437
Programmatic Usage
Integrate conformance tests into your existing test suite:
import { runConformanceTests } from "@durable-streams/server-conformance-tests"

describe("My Server Implementation", () => {
  const config = { baseUrl: "" }
  // Declared outside the hooks so that afterAll can reach it
  let server: Awaited<ReturnType<typeof startMyServer>>

  beforeAll(async () => {
    // Start your server (startMyServer is your own helper)
    server = await startMyServer({ port: 0 })
    config.baseUrl = server.url
  })

  afterAll(async () => {
    await server.stop()
  })

  // Run all conformance tests
  runConformanceTests(config)
})
CI Integration
Example GitHub Actions workflow:
jobs:
  conformance:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: "20"
      - name: Install dependencies
        run: npm install
      - name: Start server
        run: npm run start:server &
      - name: Wait for server
        run: npx wait-on http://localhost:4437
      - name: Run conformance tests
        run: npx @durable-streams/server-conformance-tests --run http://localhost:4437
Test Coverage
The server conformance suite validates:
Core Operations
- Stream creation and deletion
- Idempotent operations
- Append operations (string, binary, chunking)
- Read operations (empty/full streams, offset handling)
- HEAD metadata requests
Live Streaming
- Long-poll operations (waiting, immediate returns)
- SSE (Server-Sent Events) mode
- JSON mode with array flattening
Protocol Compliance
- HTTP headers and status codes
- Content-Type validation and enforcement
- Cache headers (ETag, 304 responses)
- TTL and Expires-At handling
- Offset validation and monotonicity
Data Integrity
- Byte-exactness guarantees
- Read-your-writes consistency
- Message ordering preservation
- Sequence ordering validation
Edge Cases & Robustness
- Empty request bodies
- Large payloads and chunking
- Binary data handling
- Malformed input fuzzing
- Property-based testing with random sequences
Client Conformance Tests
The client conformance test suite uses a language-agnostic architecture to validate client implementations in any programming language.
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Test Runner (Node.js) │
│ - Reads test cases from YAML │
│ - Manages reference server lifecycle │
│ - Orchestrates client adapter process │
│ - Compares results against expectations │
└────────────────────────┬────────────────────────────────────────┘
│ stdin/stdout (JSON lines)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Client Adapter (any language) │
│ - Reads test commands from stdin │
│ - Uses native SDK to execute operations │
│ - Reports results to stdout │
└─────────────────────────────────────────────────────────────────┘
│ HTTP
▼
┌─────────────────────────────────────────────────────────────────┐
│ Reference Server (TypeScript) │
│ - Full protocol compliance │
│ - Validates client behavior │
└─────────────────────────────────────────────────────────────────┘
Installation
npm install @durable-streams/client-conformance-tests
Running Tests
Test Built-in TypeScript Client
npx @durable-streams/client-conformance-tests --run ts
Test Custom Client Adapter
# Python client
npx @durable-streams/client-conformance-tests --run ./my-python-adapter.py
# Go client
npx @durable-streams/client-conformance-tests --run ./my-go-adapter
# Any executable
npx @durable-streams/client-conformance-tests --run /path/to/adapter
Test Specific Suites
# Test only producer functionality
npx @durable-streams/client-conformance-tests --run ts --suite producer
# Test only consumer functionality
npx @durable-streams/client-conformance-tests --run ts --suite consumer
# Test specific tags
npx @durable-streams/client-conformance-tests --run ts --tag core
Advanced Options
# Verbose output and fail-fast
npx @durable-streams/client-conformance-tests --run ts --verbose --fail-fast
# Custom timeout
npx @durable-streams/client-conformance-tests --run ts --timeout 60000
# Custom server port
npx @durable-streams/client-conformance-tests --run ts --port 8080
Implementing a Client Adapter
A client adapter is an executable that communicates with the test runner over stdin/stdout using a JSON Lines protocol: one JSON object per line.
Protocol Overview
- Test runner starts your adapter as a subprocess
- Runner sends JSON commands to stdin (one per line)
- Adapter executes commands using your client SDK
- Adapter sends JSON results to stdout (one per line)
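The four steps above can be tried locally without the test runner. The following is a minimal sketch, not the runner's actual implementation: it starts an adapter as a subprocess, writes one JSON command per line to its stdin, and reads one JSON result per line from its stdout. The `exchange` and `run_session` helpers and the inline echo adapter are hypothetical names introduced for illustration.

```python
import json
import subprocess
import sys

# Hypothetical stand-in adapter: replies to every command with a success
# result of the same type and exits after "shutdown".
ECHO_ADAPTER = r'''
import json, sys
for line in sys.stdin:
    if not line.strip():
        continue
    cmd = json.loads(line)
    print(json.dumps({"type": cmd["type"], "success": True}), flush=True)
    if cmd["type"] == "shutdown":
        break
'''


def exchange(proc, command):
    """Send one command as a JSON line and read back one JSON result line."""
    proc.stdin.write(json.dumps(command) + "\n")
    proc.stdin.flush()  # without a flush the adapter may never see the line
    return json.loads(proc.stdout.readline())


def run_session(argv, commands):
    """Start the adapter at argv, send each command, and collect the results."""
    with subprocess.Popen(
        argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True
    ) as proc:
        return [exchange(proc, command) for command in commands]


if __name__ == "__main__":
    results = run_session(
        [sys.executable, "-c", ECHO_ADAPTER],
        [{"type": "init", "serverUrl": "http://localhost:3000"}, {"type": "shutdown"}],
    )
    print(results)
```

Replacing the `argv` list with the path to your own adapter gives a quick smoke test of its line handling. An adapter that buffers stdout will make `exchange` block, which is why the Python examples print with `flush=True`.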
Command Types
Init Command (first command, always sent):
// stdin
{"type":"init","serverUrl":"http://localhost:3000"}
// stdout
{"type":"init","success":true,"clientName":"my-client","clientVersion":"1.0.0","features":{"batching":true,"sse":true,"longPoll":true}}
Create Command:
// stdin
{"type":"create","path":"/my-stream","contentType":"text/plain"}
// stdout (success)
{"type":"create","success":true,"status":201,"offset":"0"}
// stdout (error)
{"type":"error","success":false,"commandType":"create","status":409,"errorCode":"CONFLICT","message":"Stream already exists"}
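Every failed command reports the same envelope, so an adapter can build it in one place. A minimal sketch follows; the helper name `error_result` is hypothetical, and leaving out `status` when no HTTP response was received is an assumption rather than documented behavior.

```python
import json


def error_result(command_type, error_code, message, status=None):
    """Build an error result shaped like the create error example above."""
    result = {
        "type": "error",
        "success": False,
        "commandType": command_type,
    }
    # Assumption: include the HTTP status only when a response was received;
    # failures such as NETWORK_ERROR have none.
    if status is not None:
        result["status"] = status
    result["errorCode"] = error_code
    result["message"] = message
    return result


print(json.dumps(error_result("create", "CONFLICT", "Stream already exists", 409)))
```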
Append Command:
// stdin
{"type":"append","path":"/my-stream","data":"Hello, World!","seq":1}
// stdout
{"type":"append","success":true,"status":200,"offset":"13"}
Read Command:
// stdin
{"type":"read","path":"/my-stream","offset":"0","live":"long-poll","timeoutMs":5000}
// stdout
{"type":"read","success":true,"status":200,"chunks":[{"data":"Hello, World!","offset":"13"}],"offset":"13","upToDate":true}
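A read result combines the chunks received with a final offset and an up-to-date flag. The sketch below assembles one from `(data, offset)` pairs. It assumes the top-level `offset` repeats the last chunk's offset, as in the example above, and falls back to the requested offset when no chunks arrived; the helper name `read_result` is hypothetical.

```python
def read_result(chunks, start_offset, up_to_date, status=200):
    """Assemble a read result from (data, offset) pairs in arrival order."""
    # Assumption: the top-level offset is the offset of the last chunk, or the
    # requested offset when nothing was returned.
    final_offset = chunks[-1][1] if chunks else start_offset
    return {
        "type": "read",
        "success": True,
        "status": status,
        "chunks": [{"data": data, "offset": offset} for data, offset in chunks],
        "offset": final_offset,
        "upToDate": up_to_date,
    }
```

Calling `read_result([("Hello, World!", "13")], "0", True)` reproduces the stdout example above.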
Head Command:
// stdin
{"type":"head","path":"/my-stream"}
// stdout
{"type":"head","success":true,"status":200,"offset":"13","contentType":"text/plain"}
Delete Command:
// stdin
{"type":"delete","path":"/my-stream"}
// stdout
{"type":"delete","success":true,"status":200}
Shutdown Command:
// stdin
{"type":"shutdown"}
// stdout
{"type":"shutdown","success":true}
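Before dispatching, an adapter can check each incoming line against the command shapes shown above. The following sketch is not the `parseCommand` function exported by the TypeScript package. The name `check_command`, the choice of which fields count as required, and the error codes used for malformed input are all assumptions drawn from the examples.

```python
import json

# Fields each command carries in the examples above. Treating them as
# required is an assumption of this sketch.
REQUIRED_FIELDS = {
    "init": ["serverUrl"],
    "create": ["path"],
    "append": ["path", "data"],
    "read": ["path"],
    "head": ["path"],
    "delete": ["path"],
    "shutdown": [],
}


def _error(command_type, error_code, message):
    return {
        "type": "error",
        "success": False,
        "commandType": command_type,
        "errorCode": error_code,
        "message": message,
    }


def check_command(line):
    """Parse one stdin line; return (command, None) or (None, error_result)."""
    try:
        command = json.loads(line)
    except json.JSONDecodeError as exc:
        return None, _error(None, "INTERNAL_ERROR", f"Invalid JSON: {exc}")
    command_type = command.get("type") if isinstance(command, dict) else None
    if not isinstance(command_type, str) or command_type not in REQUIRED_FIELDS:
        return None, _error(
            command_type, "NOT_SUPPORTED", f"Unknown command type: {command_type!r}"
        )
    missing = [name for name in REQUIRED_FIELDS[command_type] if name not in command]
    if missing:
        return None, _error(
            command_type, "INTERNAL_ERROR", "Missing fields: " + ", ".join(missing)
        )
    return command, None
```

Returning the error as a value rather than raising keeps the main loop simple: it prints whichever of the two results is not `None` and moves on to the next line.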
Standard Error Codes
NETWORK_ERROR - Network connection failed
TIMEOUT - Operation timed out
CONFLICT - Stream already exists (409)
NOT_FOUND - Stream not found (404)
SEQUENCE_CONFLICT - Sequence number conflict (409)
INVALID_OFFSET - Invalid offset format
UNEXPECTED_STATUS - Unexpected HTTP status
PARSE_ERROR - Failed to parse response
INTERNAL_ERROR - Client internal error
NOT_SUPPORTED - Operation not supported
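An adapter has to translate its SDK's failures into these codes. One possible mapping is sketched below. The function name `map_error_code`, its parameters, and the `.status` attribute read from the exception are assumptions, as is the rule that a 409 on `append` means a sequence conflict while any other 409 means the stream already exists. `INVALID_OFFSET` is left out because the text above does not say how an SDK would signal it.

```python
import json


def map_error_code(exc=None, status=None, command_type=None):
    """Pick one of the standard error codes for a failed command."""
    # Transport and parsing failures are identified by exception type.
    if isinstance(exc, TimeoutError):
        return "TIMEOUT"
    if isinstance(exc, ConnectionError):
        return "NETWORK_ERROR"
    if isinstance(exc, json.JSONDecodeError):
        return "PARSE_ERROR"
    if isinstance(exc, NotImplementedError):
        return "NOT_SUPPORTED"
    # Assumption: an SDK exception may expose the HTTP status as `.status`.
    if status is None:
        status = getattr(exc, "status", None)
    if status == 404:
        return "NOT_FOUND"
    if status == 409:
        # Assumption: a 409 on append is a sequence conflict; any other 409
        # means the stream already exists.
        return "SEQUENCE_CONFLICT" if command_type == "append" else "CONFLICT"
    if status is not None:
        return "UNEXPECTED_STATUS"
    return "INTERNAL_ERROR"
```

Checking exception types before status codes means a timeout is reported as `TIMEOUT` even if the exception also carries a stale status.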
Example: Python Adapter
#!/usr/bin/env python3
import sys
import json
from durable_streams import DurableStream, DurableStreamError


def main():
    server_url = ""
    for line in sys.stdin:
        if not line.strip():
            continue
        command = json.loads(line)
        # Remember the server URL from init for later commands
        if command["type"] == "init":
            server_url = command["serverUrl"]
        result = handle_command(command, server_url)
        # Flush so the runner sees each result immediately
        print(json.dumps(result), flush=True)
        if command["type"] == "shutdown":
            break


def handle_command(cmd, server_url):
    try:
        if cmd["type"] == "init":
            return {
                "type": "init",
                "success": True,
                "clientName": "durable-streams-python",
                "clientVersion": "0.1.0",
                "features": {"batching": False, "sse": True, "longPoll": True}
            }
        elif cmd["type"] == "create":
            url = f"{server_url}{cmd['path']}"
            stream = DurableStream.create(url, content_type=cmd.get("contentType"))
            return {"type": "create", "success": True, "status": 201}
        elif cmd["type"] == "append":
            url = f"{server_url}{cmd['path']}"
            stream = DurableStream(url)
            stream.append(cmd["data"], seq=cmd.get("seq"))
            return {"type": "append", "success": True, "status": 200}
        # ... implement other commands
    except DurableStreamError as e:
        return {
            "type": "error",
            "success": False,
            "commandType": cmd["type"],
            # map_error_code is a helper you write to translate SDK errors
            # into the standard error codes
            "errorCode": map_error_code(e),
            "message": str(e)
        }


if __name__ == "__main__":
    main()
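The adapter above leaves several commands unimplemented. One way to organize the full set is a dispatch table with one handler per command type. The sketch below shows that structure against a hypothetical in-memory dictionary instead of the SDK, so it runs without a server; a real adapter would replace the dictionary operations with SDK calls. Byte-count offsets, the 409 on any repeated create, and the default content type are assumptions taken from the examples, not protocol guarantees, and `read` is omitted for brevity.

```python
# Hypothetical in-memory stand-in for a server plus SDK: path -> stream state.
STREAMS = {}


def _error(command_type, status, error_code, message):
    return {"type": "error", "success": False, "commandType": command_type,
            "status": status, "errorCode": error_code, "message": message}


def handle_create(cmd):
    if cmd["path"] in STREAMS:
        # Simplification: the server's idempotent-creation rules are not modeled.
        return _error("create", 409, "CONFLICT", "Stream already exists")
    # Assumption: fall back to a generic content type when none is given.
    content_type = cmd.get("contentType", "application/octet-stream")
    STREAMS[cmd["path"]] = {"contentType": content_type, "data": b""}
    return {"type": "create", "success": True, "status": 201, "offset": "0"}


def handle_append(cmd):
    stream = STREAMS.get(cmd["path"])
    if stream is None:
        return _error("append", 404, "NOT_FOUND", "Stream not found")
    stream["data"] += cmd["data"].encode("utf-8")
    # Assumption: offsets are byte counts, as the "13" in the examples suggests.
    return {"type": "append", "success": True, "status": 200,
            "offset": str(len(stream["data"]))}


def handle_head(cmd):
    stream = STREAMS.get(cmd["path"])
    if stream is None:
        return _error("head", 404, "NOT_FOUND", "Stream not found")
    return {"type": "head", "success": True, "status": 200,
            "offset": str(len(stream["data"])),
            "contentType": stream["contentType"]}


def handle_delete(cmd):
    if STREAMS.pop(cmd["path"], None) is None:
        return _error("delete", 404, "NOT_FOUND", "Stream not found")
    return {"type": "delete", "success": True, "status": 200}


def handle_shutdown(cmd):
    return {"type": "shutdown", "success": True}


HANDLERS = {
    "create": handle_create,
    "append": handle_append,
    "head": handle_head,
    "delete": handle_delete,
    "shutdown": handle_shutdown,
}


def dispatch(cmd):
    """Route a command to its handler, or report it as unsupported."""
    handler = HANDLERS.get(cmd["type"])
    if handler is None:
        return {"type": "error", "success": False, "commandType": cmd["type"],
                "errorCode": "NOT_SUPPORTED", "message": "Operation not supported"}
    return handler(cmd)
```

Adding a command then means writing one function and registering it in `HANDLERS`, and any type the adapter does not know is answered with `NOT_SUPPORTED` rather than silence.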
Example: Go Adapter
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	// Uncomment once handleCommand calls the SDK; Go rejects unused imports.
	// durable "github.com/durable-streams/go-client"
)

type Command struct {
	Type      string `json:"type"`
	ServerURL string `json:"serverUrl,omitempty"`
	Path      string `json:"path,omitempty"`
	Data      string `json:"data,omitempty"`
}

type Result struct {
	Type    string `json:"type"`
	Success bool   `json:"success"`
	Status  int    `json:"status,omitempty"`
}

func main() {
	scanner := bufio.NewScanner(os.Stdin)
	// Raise the default 64 KiB line limit so large payloads fit
	// (16 MiB is an arbitrary choice).
	scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
	var serverURL string
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" {
			continue
		}
		var cmd Command
		if err := json.Unmarshal([]byte(line), &cmd); err != nil {
			// Report malformed input instead of dispatching an empty command.
			fmt.Println(`{"type":"error","success":false}`)
			continue
		}
		// Remember the server URL from init for later commands.
		if cmd.Type == "init" {
			serverURL = cmd.ServerURL
		}
		result := handleCommand(cmd, serverURL)
		output, _ := json.Marshal(result)
		fmt.Println(string(output))
		if cmd.Type == "shutdown" {
			break
		}
	}
}

func handleCommand(cmd Command, serverURL string) Result {
	switch cmd.Type {
	case "init":
		return Result{
			Type:    "init",
			Success: true,
		}
	case "create":
		// Use your Go client SDK
		return Result{Type: "create", Success: true, Status: 201}
		// ... handle other commands
	}
	return Result{Type: "error", Success: false}
}
Test Coverage
Producer Tests (packages/client-conformance-tests/src/test-cases/producer/):
- Stream Creation (create-stream.yaml)
  - Basic creation with default settings
  - Custom content types
  - Idempotent creation
  - TTL handling
  - Custom headers
  - Binary streams
- Append Operations (append-data.yaml)
  - String and binary data
  - Unicode handling
  - Large payloads
  - Empty data
- Sequence Ordering (sequence-ordering.yaml)
  - Monotonic sequence numbers
  - Conflict detection
  - Sequence validation
- Batching (batching.yaml)
  - Concurrent appends
  - Order preservation
  - High-throughput scenarios
- Idempotent Producers (idempotent/)
  - Producer ID and epoch handling
  - Exactly-once semantics
  - JSON batching with idempotency
- Error Handling (error-handling.yaml, error-context.yaml)
  - 404 handling (stream not found)
  - 409 handling (conflicts)
  - Network errors
  - Validation errors
Consumer Tests (packages/client-conformance-tests/src/test-cases/consumer/):
- Catch-up Reads (read-catchup.yaml, read-auto.yaml)
  - Empty stream reads
  - Full stream reads
  - Offset resumption
  - Pagination
- Long-Poll (read-longpoll.yaml)
  - Waiting for new data
  - Immediate returns when data exists
  - Timeout handling
  - Up-to-date detection
- SSE Mode (read-sse.yaml, read-sse-base64.yaml)
  - Event streaming
  - Reconnection handling
  - Base64 encoding (for binary)
  - Event parsing
- Offset Handling (offset-handling.yaml, offset-resumption.yaml)
  - Monotonicity validation
  - Byte-exactness
  - Invalid offset errors
  - Resume from specific offset
- Message Ordering (message-ordering.yaml)
  - Order preservation
  - Sequential delivery
  - No duplicates
- Fault Injection (fault-injection.yaml)
  - Network interruptions
  - Server errors
  - Retry behavior
- Error Handling (error-handling.yaml, json-parsing-errors.yaml, sse-parsing-errors.yaml)
  - Deleted streams
  - Malformed data
  - Parsing errors
  - Protocol violations
- Cache Headers (cache-headers.yaml)
  - ETag support
  - 304 responses
  - Conditional requests
- Streaming Equivalence (streaming-equivalence.yaml)
  - Long-poll vs SSE consistency
  - Mode switching
  - Same data guarantees
Lifecycle Tests (packages/client-conformance-tests/src/test-cases/lifecycle/):
- Complete workflows: create → append → read → delete
- Custom headers and authentication
- Metadata operations (HEAD)
- Stream lifecycle management
Programmatic Usage
import { runConformanceTests } from "@durable-streams/client-conformance-tests"
const summary = await runConformanceTests({
clientAdapter: "ts", // or path to your adapter
suites: ["producer", "consumer"],
verbose: true,
failFast: false,
timeout: 30000,
})
console.log(`Passed: ${summary.passed}/${summary.total}`)
console.log(`Failed: ${summary.failed}/${summary.total}`)
TypeScript Protocol Types
For TypeScript/JavaScript adapters, import protocol types:
import {
type TestCommand,
type TestResult,
parseCommand,
serializeResult,
ErrorCodes,
} from "@durable-streams/client-conformance-tests/protocol"
Best Practices
Test-Driven Development
When fixing bugs or adding features:
- Write a failing conformance test that demonstrates the issue
- Add the test to the appropriate YAML file in test-cases/
- Fix the implementation until the test passes
- Verify all implementations pass the new test
This ensures:
- Regression protection across all clients
- Documentation of expected behavior
- Cross-language consistency
Running in CI
Always run conformance tests in your CI pipeline:
# Run server conformance tests
- name: Server Conformance
  run: npx @durable-streams/server-conformance-tests --run http://localhost:4437

# Run client conformance tests for each implementation
- name: TypeScript Client
  run: npx @durable-streams/client-conformance-tests --run ts
- name: Python Client
  run: npx @durable-streams/client-conformance-tests --run ./client-py/adapter.py
- name: Go Client
  run: npx @durable-streams/client-conformance-tests --run ./client-go/adapter
Debugging Failures
Use verbose mode to see detailed operation logs:
npx @durable-streams/client-conformance-tests --run ts --verbose
Use fail-fast to stop at the first failure:
npx @durable-streams/client-conformance-tests --run ts --fail-fast