Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Durable Streams provides two comprehensive conformance test suites to ensure protocol compliance across all implementations:
  • Server Conformance Tests: Verify that your server correctly implements the Durable Streams protocol
  • Client Conformance Tests: Validate client implementations across any programming language

Server Conformance Tests

The server conformance test suite verifies that your server implementation correctly handles all aspects of the Durable Streams protocol.

Installation

npm install @durable-streams/server-conformance-tests

Running Tests

CI Mode (Run Once)

Run tests once and exit - perfect for continuous integration:
npx @durable-streams/server-conformance-tests --run http://localhost:4437

Watch Mode (Development)

Automatically rerun tests when source files change:
# Watch a single directory
npx @durable-streams/server-conformance-tests --watch src http://localhost:4437

# Watch multiple directories
npx @durable-streams/server-conformance-tests --watch src lib http://localhost:4437

Programmatic Usage

Integrate conformance tests into your existing test suite:
import { runConformanceTests } from "@durable-streams/server-conformance-tests"

describe("My Server Implementation", () => {
  const config = { baseUrl: "" }

  beforeAll(async () => {
    // Start your server
    const server = await startMyServer({ port: 0 })
    config.baseUrl = server.url
  })

  afterAll(async () => {
    await server.stop()
  })

  // Run all conformance tests
  runConformanceTests(config)
})

CI Integration

Example GitHub Actions workflow:
jobs:
  conformance:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with:
          node-version: "20"

      - name: Install dependencies
        run: npm install

      - name: Start server
        run: npm run start:server &

      - name: Wait for server
        run: npx wait-on http://localhost:4437

      - name: Run conformance tests
        run: npx @durable-streams/server-conformance-tests --run http://localhost:4437

Test Coverage

The server conformance suite validates: Core Operations
  • Stream creation and deletion
  • Idempotent operations
  • Append operations (string, binary, chunking)
  • Read operations (empty/full streams, offset handling)
  • HEAD metadata requests
Live Streaming
  • Long-poll operations (waiting, immediate returns)
  • SSE (Server-Sent Events) mode
  • JSON mode with array flattening
Protocol Compliance
  • HTTP headers and status codes
  • Content-Type validation and enforcement
  • Cache headers (ETag, 304 responses)
  • TTL and Expires-At handling
  • Offset validation and monotonicity
Data Integrity
  • Byte-exactness guarantees
  • Read-your-writes consistency
  • Message ordering preservation
  • Sequence ordering validation
Edge Cases & Robustness
  • Empty request bodies
  • Large payloads and chunking
  • Binary data handling
  • Malformed input fuzzing
  • Property-based testing with random sequences

Client Conformance Tests

The client conformance test suite uses a language-agnostic architecture to validate client implementations in any programming language.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Test Runner (Node.js)                        │
│  - Reads test cases from YAML                                  │
│  - Manages reference server lifecycle                          │
│  - Orchestrates client adapter process                         │
│  - Compares results against expectations                       │
└────────────────────────┬────────────────────────────────────────┘
                         │ stdin/stdout (JSON lines)

┌─────────────────────────────────────────────────────────────────┐
│              Client Adapter (any language)                      │
│  - Reads test commands from stdin                              │
│  - Uses native SDK to execute operations                       │
│  - Reports results to stdout                                   │
└─────────────────────────────────────────────────────────────────┘
                         │ HTTP

┌─────────────────────────────────────────────────────────────────┐
│              Reference Server (TypeScript)                      │
│  - Full protocol compliance                                    │
│  - Validates client behavior                                   │
└─────────────────────────────────────────────────────────────────┘

Installation

npm install @durable-streams/client-conformance-tests

Running Tests

Test Built-in TypeScript Client

npx @durable-streams/client-conformance-tests --run ts

Test Custom Client Adapter

# Python client
npx @durable-streams/client-conformance-tests --run ./my-python-adapter.py

# Go client
npx @durable-streams/client-conformance-tests --run ./my-go-adapter

# Any executable
npx @durable-streams/client-conformance-tests --run /path/to/adapter

Test Specific Suites

# Test only producer functionality
npx @durable-streams/client-conformance-tests --run ts --suite producer

# Test only consumer functionality
npx @durable-streams/client-conformance-tests --run ts --suite consumer

# Test specific tags
npx @durable-streams/client-conformance-tests --run ts --tag core

Advanced Options

# Verbose output and fail-fast
npx @durable-streams/client-conformance-tests --run ts --verbose --fail-fast

# Custom timeout
npx @durable-streams/client-conformance-tests --run ts --timeout 60000

# Custom server port
npx @durable-streams/client-conformance-tests --run ts --port 8080

Implementing a Client Adapter

A client adapter is an executable that communicates via stdin/stdout using JSON-line protocol.

Protocol Overview

  1. Test runner starts your adapter as a subprocess
  2. Runner sends JSON commands to stdin (one per line)
  3. Adapter executes commands using your client SDK
  4. Adapter sends JSON results to stdout (one per line)

Command Types

Init Command (first command, always sent):
// stdin
{"type":"init","serverUrl":"http://localhost:3000"}

// stdout
{"type":"init","success":true,"clientName":"my-client","clientVersion":"1.0.0","features":{"batching":true,"sse":true,"longPoll":true}}
Create Command:
// stdin
{"type":"create","path":"/my-stream","contentType":"text/plain"}

// stdout (success)
{"type":"create","success":true,"status":201,"offset":"0"}

// stdout (error)
{"type":"error","success":false,"commandType":"create","status":409,"errorCode":"CONFLICT","message":"Stream already exists"}
Append Command:
// stdin
{"type":"append","path":"/my-stream","data":"Hello, World!","seq":1}

// stdout
{"type":"append","success":true,"status":200,"offset":"13"}
Read Command:
// stdin
{"type":"read","path":"/my-stream","offset":"0","live":"long-poll","timeoutMs":5000}

// stdout
{"type":"read","success":true,"status":200,"chunks":[{"data":"Hello, World!","offset":"13"}],"offset":"13","upToDate":true}
Head Command:
// stdin
{"type":"head","path":"/my-stream"}

// stdout
{"type":"head","success":true,"status":200,"offset":"13","contentType":"text/plain"}
Delete Command:
// stdin
{"type":"delete","path":"/my-stream"}

// stdout
{"type":"delete","success":true,"status":200}
Shutdown Command:
// stdin
{"type":"shutdown"}

// stdout
{"type":"shutdown","success":true}

Standard Error Codes

  • NETWORK_ERROR - Network connection failed
  • TIMEOUT - Operation timed out
  • CONFLICT - Stream already exists (409)
  • NOT_FOUND - Stream not found (404)
  • SEQUENCE_CONFLICT - Sequence number conflict (409)
  • INVALID_OFFSET - Invalid offset format
  • UNEXPECTED_STATUS - Unexpected HTTP status
  • PARSE_ERROR - Failed to parse response
  • INTERNAL_ERROR - Client internal error
  • NOT_SUPPORTED - Operation not supported

Example: Python Adapter

#!/usr/bin/env python3
import sys
import json
from durable_streams import DurableStream, DurableStreamError

def main():
    server_url = ""

    for line in sys.stdin:
        if not line.strip():
            continue

        command = json.loads(line)
        result = handle_command(command, server_url)

        if command["type"] == "init":
            server_url = command["serverUrl"]

        print(json.dumps(result), flush=True)

        if command["type"] == "shutdown":
            break

def handle_command(cmd, server_url):
    try:
        if cmd["type"] == "init":
            return {
                "type": "init",
                "success": True,
                "clientName": "durable-streams-python",
                "clientVersion": "0.1.0",
                "features": {"batching": False, "sse": True, "longPoll": True}
            }

        elif cmd["type"] == "create":
            url = f"{server_url}{cmd['path']}"
            stream = DurableStream.create(url, content_type=cmd.get("contentType"))
            return {"type": "create", "success": True, "status": 201}

        elif cmd["type"] == "append":
            url = f"{server_url}{cmd['path']}"
            stream = DurableStream(url)
            stream.append(cmd["data"], seq=cmd.get("seq"))
            return {"type": "append", "success": True, "status": 200}

        # ... implement other commands

    except DurableStreamError as e:
        return {
            "type": "error",
            "success": False,
            "commandType": cmd["type"],
            "errorCode": map_error_code(e),
            "message": str(e)
        }

if __name__ == "__main__":
    main()

Example: Go Adapter

package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "os"

    durable "github.com/durable-streams/go-client"
)

type Command struct {
    Type      string `json:"type"`
    ServerURL string `json:"serverUrl,omitempty"`
    Path      string `json:"path,omitempty"`
    Data      string `json:"data,omitempty"`
}

type Result struct {
    Type    string `json:"type"`
    Success bool   `json:"success"`
    Status  int    `json:"status,omitempty"`
}

func main() {
    scanner := bufio.NewScanner(os.Stdin)
    var serverURL string

    for scanner.Scan() {
        line := scanner.Text()
        if line == "" {
            continue
        }

        var cmd Command
        json.Unmarshal([]byte(line), &cmd)

        result := handleCommand(cmd, serverURL)

        if cmd.Type == "init" {
            serverURL = cmd.ServerURL
        }

        output, _ := json.Marshal(result)
        fmt.Println(string(output))

        if cmd.Type == "shutdown" {
            break
        }
    }
}

func handleCommand(cmd Command, serverURL string) Result {
    switch cmd.Type {
    case "init":
        return Result{
            Type:    "init",
            Success: true,
        }
    case "create":
        // Use your Go client SDK
        return Result{Type: "create", Success: true, Status: 201}
    // ... handle other commands
    }
    return Result{Type: "error", Success: false}
}

Test Coverage

Producer Tests (packages/client-conformance-tests/src/test-cases/producer/):
  • Stream Creation (create-stream.yaml)
    • Basic creation with default settings
    • Custom content types
    • Idempotent creation
    • TTL handling
    • Custom headers
    • Binary streams
  • Append Operations (append-data.yaml)
    • String and binary data
    • Unicode handling
    • Large payloads
    • Empty data
  • Sequence Ordering (sequence-ordering.yaml)
    • Monotonic sequence numbers
    • Conflict detection
    • Sequence validation
  • Batching (batching.yaml)
    • Concurrent appends
    • Order preservation
    • High-throughput scenarios
  • Idempotent Producers (idempotent/)
    • Producer ID and epoch handling
    • Exactly-once semantics
    • JSON batching with idempotency
  • Error Handling (error-handling.yaml, error-context.yaml)
    • 404 handling (stream not found)
    • 409 handling (conflicts)
    • Network errors
    • Validation errors
Consumer Tests (packages/client-conformance-tests/src/test-cases/consumer/):
  • Catch-up Reads (read-catchup.yaml, read-auto.yaml)
    • Empty stream reads
    • Full stream reads
    • Offset resumption
    • Pagination
  • Long-Poll (read-longpoll.yaml)
    • Waiting for new data
    • Immediate returns when data exists
    • Timeout handling
    • Up-to-date detection
  • SSE Mode (read-sse.yaml, read-sse-base64.yaml)
    • Event streaming
    • Reconnection handling
    • Base64 encoding (for binary)
    • Event parsing
  • Offset Handling (offset-handling.yaml, offset-resumption.yaml)
    • Monotonicity validation
    • Byte-exactness
    • Invalid offset errors
    • Resume from specific offset
  • Message Ordering (message-ordering.yaml)
    • Order preservation
    • Sequential delivery
    • No duplicates
  • Fault Injection (fault-injection.yaml)
    • Network interruptions
    • Server errors
    • Retry behavior
  • Error Handling (error-handling.yaml, json-parsing-errors.yaml, sse-parsing-errors.yaml)
    • Deleted streams
    • Malformed data
    • Parsing errors
    • Protocol violations
  • Cache Headers (cache-headers.yaml)
    • ETag support
    • 304 responses
    • Conditional requests
  • Streaming Equivalence (streaming-equivalence.yaml)
    • Long-poll vs SSE consistency
    • Mode switching
    • Same data guarantees
Lifecycle Tests (packages/client-conformance-tests/src/test-cases/lifecycle/):
  • Complete workflows: create → append → read → delete
  • Custom headers and authentication
  • Metadata operations (HEAD)
  • Stream lifecycle management

Programmatic Usage

import { runConformanceTests } from "@durable-streams/client-conformance-tests"

const summary = await runConformanceTests({
  clientAdapter: "ts", // or path to your adapter
  suites: ["producer", "consumer"],
  verbose: true,
  failFast: false,
  timeout: 30000,
})

console.log(`Passed: ${summary.passed}/${summary.total}`)
console.log(`Failed: ${summary.failed}/${summary.total}`)

TypeScript Protocol Types

For TypeScript/JavaScript adapters, import protocol types:
import {
  type TestCommand,
  type TestResult,
  parseCommand,
  serializeResult,
  ErrorCodes,
} from "@durable-streams/client-conformance-tests/protocol"

Best Practices

Test-Driven Development

When fixing bugs or adding features:
  1. Write a failing conformance test that demonstrates the issue
  2. Add the test to the appropriate YAML file in test-cases/
  3. Fix the implementation until the test passes
  4. Verify all implementations pass the new test
This ensures:
  • Regression protection across all clients
  • Documentation of expected behavior
  • Cross-language consistency

Running in CI

Always run conformance tests in your CI pipeline:
# Run server conformance tests
- name: Server Conformance
  run: npx @durable-streams/server-conformance-tests --run http://localhost:4437

# Run client conformance tests for each implementation
- name: TypeScript Client
  run: npx @durable-streams/client-conformance-tests --run ts

- name: Python Client
  run: npx @durable-streams/client-conformance-tests --run ./client-py/adapter.py

- name: Go Client
  run: npx @durable-streams/client-conformance-tests --run ./client-go/adapter

Debugging Failures

Use verbose mode to see detailed operation logs:
npx @durable-streams/client-conformance-tests --run ts --verbose
Use fail-fast to stop at the first failure:
npx @durable-streams/client-conformance-tests --run ts --fail-fast