Documentation Index
Fetch the complete documentation index at: https://mintlify.com/durable-streams/durable-streams/llms.txt
Use this file to discover all available pages before exploring further.
Swift Client
The official Swift client for the Durable Streams protocol, with async/await and AsyncSequence support.
Installation
Swift Package Manager
Add to your Package.swift:
dependencies: [
.package(url: "https://github.com/durable-streams/durable-streams.git", from: "0.1.0")
]
Or in Xcode: File → Add Packages → Enter repository URL
Quick Start
import DurableStreams
// Create a stream
let stream = try await DurableStream.create(
url: URL(string: "https://streams.example.com/my-stream")!,
contentType: "application/json"
)
// Append data
let json = try JSONEncoder().encode(["message": "hello"])
try await stream.appendSync(json)
// Read data as async sequence
for try await message in stream.messages(as: Event.self) {
print("Event: \(message.id)")
}
Core APIs
DurableStream
Actor-based stream handle (thread-safe by design).
Static Factory Methods
Create a new stream
contentType
String
default:"\"application/json\""
Content type
Absolute expiry time (ISO8601)
config
Configuration
default:".default"
Client configuration
Returns: DurableStream
Throws: DurableStreamError.conflictExists if stream already exists
Connect to an existing stream (validates via HEAD)
config
Configuration
default:".default"
Client configuration
Returns: DurableStream
Throws: DurableStreamError.notFound if stream doesn’t exist
Example:
let stream = try await DurableStream.create(
url: URL(string: "https://streams.example.com/events")!,
contentType: "application/json",
ttlSeconds: 86400, // 24 hours
config: DurableStream.Configuration(
headers: ["Authorization": "Bearer token"],
timeout: 30
)
)
Write Operations
Append data to the stream (awaits server acknowledgment)
Data to append (for JSON, encode first)
Writer coordination sequence
Returns: AppendResult with offset and isDuplicate
Throws: DurableStreamError on failure
Example:
// JSON stream
struct Message: Codable {
let id: String
let text: String
}
let message = Message(id: "123", text: "Hello")
let json = try JSONEncoder().encode(message)
let result = try await stream.appendSync(json)
print("Offset: \(result.offset.rawValue)")
// String append
let result = try await stream.appendSync("raw text")
Close the stream permanently (no more appends)
Returns: CloseResult with finalOffset
Throws: DurableStreamError.streamClosed if called with data on an already-closed stream
Delete a stream
config
Configuration
default:".default"
Client configuration
Throws: DurableStreamError.notFound if stream doesn’t exist
Read Operations (AsyncSequence)
Stream individual JSON messages as an AsyncSequence
Decodable type for JSON items
decoder
JSONDecoder
default:"JSONDecoder()"
Custom JSON decoder
Returns: AsyncThrowingStream<T, Error>
Example:
struct Event: Codable {
let id: String
let type: String
let data: [String: String]
}
// Stream messages with automatic long-polling
for try await event in stream.messages(as: Event.self, from: lastOffset) {
print("Event \(event.id): \(event.type)")
// Process event
}
Stream JSON batches (for checkpointing)
decoder
JSONDecoder
default:"JSONDecoder()"
JSON decoder
Returns: AsyncThrowingStream<JsonBatch<T>, Error>
Example:
for try await batch in stream.jsonBatches(as: Event.self) {
// Process batch atomically
try await database.transaction { tx in
for event in batch.items {
try await tx.insert(event)
}
try await tx.saveOffset(batch.offset)
}
}
Stream raw byte chunks
Returns: AsyncThrowingStream<ByteChunk, Error>
Stream text chunks
Returns: AsyncThrowingStream<TextChunk, Error>
One-Shot Reads
Read all data and decode as JSON array
decoder
JSONDecoder
default:"JSONDecoder()"
JSON decoder
Returns: JsonBatch<T>
Read all raw bytes
Returns: ByteResult with data and offset
Read all data as text
Returns: TextResult with text and offset
Get stream metadata without establishing a handle
config
Configuration
default:".default"
Client configuration
Returns: StreamInfo with offset, contentType, streamClosed
IdempotentProducer
Exactly-once writes with automatic batching (coming soon in Swift client).
Types
Offset
public struct Offset: Codable, Hashable {
public let rawValue: String
public static let start = Offset(rawValue: "-1")
public init(rawValue: String) {
self.rawValue = rawValue
}
}
JsonBatch<T>
public struct JsonBatch<T: Decodable>: Sendable {
public let items: [T]
public let offset: Offset
public let upToDate: Bool
public let cursor: String?
}
ByteChunk / TextChunk
public struct ByteChunk: Sendable {
public let data: Data
public let offset: Offset
public let upToDate: Bool
public let cursor: String?
}
public struct TextChunk: Sendable {
public let text: String
public let offset: Offset
public let upToDate: Bool
public let cursor: String?
}
Configuration
public struct Configuration: Sendable {
public var headers: HeadersRecord
public var params: ParamsRecord
public var timeout: TimeInterval
public var longPollTimeout: TimeInterval
public var session: URLSession
public static let `default` = Configuration()
}
Error Handling
import DurableStreams
do {
try await stream.appendSync(data)
} catch let error as DurableStreamError {
switch error.code {
case .notFound:
print("Stream not found")
case .streamClosed:
print("Stream is closed")
case .conflictSeq:
print("Sequence conflict")
case .unauthorized:
print("Not authorized")
default:
print("Error: \(error.message)")
}
} catch {
print("Unexpected error: \(error)")
}
DurableStreamError.Code
public enum Code: String {
case notFound
case conflictExists
case conflictSeq
case streamClosed
case unauthorized
case forbidden
case rateLimited
case serverBusy
case timeout
case retentionExpired
case badRequest
case staleEpoch
case sequenceGap
case parseError
}
Advanced Features
Custom URLSession
let configuration = URLSessionConfiguration.default
configuration.timeoutIntervalForRequest = 30
configuration.httpMaximumConnectionsPerHost = 10
let session = URLSession(configuration: configuration)
let stream = try await DurableStream.connect(
url: streamURL,
config: DurableStream.Configuration(
session: session
)
)
var headers: HeadersRecord = [:]
headers["Authorization"] = { await authService.getToken() }
headers["X-Request-ID"] = { UUID().uuidString }
let stream = try await DurableStream.connect(
url: streamURL,
config: DurableStream.Configuration(headers: headers)
)
Task Cancellation
let task = Task {
for try await event in stream.messages(as: Event.self) {
if Task.isCancelled { break }
await process(event)
}
}
// Later: cancel the task
task.cancel()
SwiftUI Integration
struct EventsView: View {
@State private var events: [Event] = []
var body: some View {
List(events) { event in
EventRow(event: event)
}
.task {
await streamEvents()
}
}
func streamEvents() async {
let stream = try? await DurableStream.connect(
url: URL(string: "https://streams.example.com/events")!
)
guard let stream = stream else { return }
for try await event in stream.messages(as: Event.self) {
events.append(event)
}
}
}
Requirements
- iOS 15.0+
- macOS 12.0+
- tvOS 15.0+
- watchOS 8.0+
- Linux (with Foundation)
Source Code
Source: packages/client-swift/Sources/DurableStreams/