Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt

Use this file to discover all available pages before exploring further.

Swift Client

The official Swift client for Durable Streams protocol with async/await and AsyncSequence support.

Installation

Swift Package Manager

Add to your Package.swift:
dependencies: [
    .package(url: "https://github.com/durable-streams/durable-streams.git", from: "0.1.0")
]
Or in Xcode: File → Add Packages → Enter repository URL

Quick Start

import DurableStreams

// Create a stream
let stream = try await DurableStream.create(
    url: URL(string: "https://streams.example.com/my-stream")!,
    contentType: "application/json"
)

// Append data
let json = try JSONEncoder().encode(["message": "hello"])
try await stream.appendSync(json)

// Read data as async sequence
for try await message in stream.messages(as: Event.self) {
    print("Event: \(message.id)")
}

Core APIs

DurableStream

Actor-based stream handle (thread-safe by design).

Static Factory Methods

DurableStream.create()
static async
Create a new stream
url
URL
required
Stream URL
contentType
String
default:"\"application/json\""
Content type
ttlSeconds
Int?
Time-to-live in seconds
expiresAt
String?
Absolute expiry time (ISO8601)
closed
Bool
default:"false"
Create in closed state
config
Configuration
default:".default"
Client configuration
Returns: DurableStreamThrows: DurableStreamError.conflictExists if stream already exists
DurableStream.connect()
static async
Connect to an existing stream (validates via HEAD)
url
URL
required
Stream URL
config
Configuration
default:".default"
Client configuration
Returns: DurableStreamThrows: DurableStreamError.notFound if stream doesn’t exist
Example:
let stream = try await DurableStream.create(
    url: URL(string: "https://streams.example.com/events")!,
    contentType: "application/json",
    ttlSeconds: 86400,  // 24 hours
    config: DurableStream.Configuration(
        headers: ["Authorization": "Bearer token"],
        timeout: 30
    )
)

Write Operations

appendSync()
async
Append data to the stream (awaits server acknowledgment)
data
Data
required
Data to append (for JSON, encode first)
contentType
String?
Content type override
seq
Int?
Writer coordination sequence
Returns: AppendResult with offset and isDuplicateThrows: DurableStreamError on failure
Example:
// JSON stream
struct Message: Codable {
    let id: String
    let text: String
}

let message = Message(id: "123", text: "Hello")
let json = try JSONEncoder().encode(message)
let result = try await stream.appendSync(json)
print("Offset: \(result.offset.rawValue)")

// String append
let result = try await stream.appendSync("raw text")
close()
async
Close the stream permanently (no more appends)
data
Data?
Optional final message
contentType
String?
Content type override
Returns: CloseResult with finalOffsetThrows: DurableStreamError.streamClosed if called with data on already-closed stream
DurableStream.delete()
static async
Delete a stream
url
URL
required
Stream URL
config
Configuration
default:".default"
Client configuration
Throws: DurableStreamError.notFound if stream doesn’t exist

Read Operations (AsyncSequence)

messages()
method
Stream individual JSON messages as AsyncSequence
as
T.Type
required
Decodable type for JSON items
from
Offset
default:".start"
Starting offset
decoder
JSONDecoder
default:"JSONDecoder()"
Custom JSON decoder
Returns: AsyncThrowingStream<T, Error>
Example:
struct Event: Codable {
    let id: String
    let type: String
    let data: [String: String]
}

// Stream messages with automatic long-polling
for try await event in stream.messages(as: Event.self, from: lastOffset) {
    print("Event \(event.id): \(event.type)")
    // Process event
}
jsonBatches()
method
Stream JSON batches (for checkpointing)
as
T.Type
required
Decodable type
from
Offset
default:".start"
Starting offset
decoder
JSONDecoder
default:"JSONDecoder()"
JSON decoder
Returns: AsyncThrowingStream<JsonBatch<T>, Error>
Example:
for try await batch in stream.jsonBatches(as: Event.self) {
    // Process batch atomically
    try await database.transaction { tx in
        for event in batch.items {
            try await tx.insert(event)
        }
        try await tx.saveOffset(batch.offset)
    }
}
byteChunks()
method
Stream raw byte chunks
from
Offset
default:".start"
Starting offset
Returns: AsyncThrowingStream<ByteChunk, Error>
textChunks()
method
Stream text chunks
from
Offset
default:".start"
Starting offset
Returns: AsyncThrowingStream<TextChunk, Error>

One-Shot Reads

readJSON()
async
Read all data and decode as JSON array
as
T.Type
required
Decodable type
offset
Offset
default:".start"
Starting offset
decoder
JSONDecoder
default:"JSONDecoder()"
JSON decoder
Returns: JsonBatch<T>
readBytes()
async
Read all raw bytes
offset
Offset
default:".start"
Starting offset
Returns: ByteResult with data and offset
readText()
async
Read all data as text
offset
Offset
default:".start"
Starting offset
Returns: TextResult with text and offset

Metadata

DurableStream.head()
static async
Get stream metadata without establishing a handle
url
URL
required
Stream URL
config
Configuration
default:".default"
Configuration
Returns: StreamInfo with offset, contentType, streamClosed

IdempotentProducer

Exactly-once writes with automatic batching (coming soon in Swift client).

Types

Offset

public struct Offset: Codable, Hashable {
    public let rawValue: String

    public static let start = Offset(rawValue: "-1")

    public init(rawValue: String) {
        self.rawValue = rawValue
    }
}

JsonBatch<T>

public struct JsonBatch<T: Decodable>: Sendable {
    public let items: [T]
    public let offset: Offset
    public let upToDate: Bool
    public let cursor: String?
}

ByteChunk / TextChunk

public struct ByteChunk: Sendable {
    public let data: Data
    public let offset: Offset
    public let upToDate: Bool
    public let cursor: String?
}

public struct TextChunk: Sendable {
    public let text: String
    public let offset: Offset
    public let upToDate: Bool
    public let cursor: String?
}

Configuration

public struct Configuration: Sendable {
    public var headers: HeadersRecord
    public var params: ParamsRecord
    public var timeout: TimeInterval
    public var longPollTimeout: TimeInterval
    public var session: URLSession

    public static let `default` = Configuration()
}

Error Handling

import DurableStreams

do {
    try await stream.appendSync(data)
} catch let error as DurableStreamError {
    switch error.code {
    case .notFound:
        print("Stream not found")
    case .streamClosed:
        print("Stream is closed")
    case .conflictSeq:
        print("Sequence conflict")
    case .unauthorized:
        print("Not authorized")
    default:
        print("Error: \(error.message)")
    }
} catch {
    print("Unexpected error: \(error)")
}

DurableStreamError.Code

public enum Code: String {
    case notFound
    case conflictExists
    case conflictSeq
    case streamClosed
    case unauthorized
    case forbidden
    case rateLimited
    case serverBusy
    case timeout
    case retentionExpired
    case badRequest
    case staleEpoch
    case sequenceGap
    case parseError
}

Advanced Features

Custom URLSession

let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = 30
configuration.httpMaximumConnectionsPerHost = 10

let session = URLSession(configuration: configuration)

let stream = try await DurableStream.connect(
    url: streamURL,
    config: DurableStream.Configuration(
        session: session
    )
)

Dynamic Headers

var headers: HeadersRecord = [:]
headers["Authorization"] = { await authService.getToken() }
headers["X-Request-ID"] = { UUID().uuidString }

let stream = try await DurableStream.connect(
    url: streamURL,
    config: DurableStream.Configuration(headers: headers)
)

Task Cancellation

let task = Task {
    for try await event in stream.messages(as: Event.self) {
        if Task.isCancelled { break }
        await process(event)
    }
}

// Later: cancel the task
task.cancel()

SwiftUI Integration

struct EventsView: View {
    @State private var events: [Event] = []

    var body: some View {
        List(events) { event in
            EventRow(event: event)
        }
        .task {
            await streamEvents()
        }
    }

    func streamEvents() async {
        let stream = try? await DurableStream.connect(
            url: URL(string: "https://streams.example.com/events")!
        )

        guard let stream = stream else { return }

        for try await event in stream.messages(as: Event.self) {
            events.append(event)
        }
    }
}

Platform Support

  • iOS 15.0+
  • macOS 12.0+
  • tvOS 15.0+
  • watchOS 8.0+
  • Linux (with Foundation)

Source Code

Source: packages/client-swift/Sources/DurableStreams/