

Durable Streams provides the synchronization layer for building collaborative editing experiences similar to Google Docs, Figma, or Notion. Stream CRDTs, operational transforms, or presence updates to clients with resume capability and conflict-free merging.

Why Durable Streams for collaboration

Collaborative editing requires:
  • Real-time sync - Changes appear instantly for all participants
  • Offline resilience - Users can work offline and sync when reconnected
  • Conflict resolution - Multiple simultaneous edits merge correctly
  • History replay - New joiners catch up on all prior changes
  • Presence awareness - Track who’s viewing and editing
Durable Streams handles the synchronization layer while you focus on your CRDT or OT implementation.

Architecture overview

1. Choose your conflict resolution strategy

Durable Streams works with any CRDT library or operational transform approach:
  • CRDTs - Yjs, Automerge, etc.
  • Operational Transforms - ShareDB, OT.js
  • Custom - Your own conflict resolution logic
2. Stream updates through durable streams

Append updates to a durable stream. All clients read from the stream to stay synchronized.
3. Clients replay and live tail

New clients replay history to build current state, then subscribe to live updates.
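Conceptually, the replay step is a fold over the stream's historical items. The sketch below uses a toy counter as the "document" state; a real client would apply CRDT or OT updates instead:

```typescript
// Replaying history is a fold over the stream's items. A counter stands in
// for a real CRDT document here.
interface CounterOp {
  type: "add"
  amount: number
}

function replay(ops: CounterOp[]): number {
  // Start from empty state and apply every historical operation in order.
  return ops.reduce((state, op) => state + op.amount, 0)
}
```

Once replay completes, the client holds the current state and only needs to apply live updates as they arrive.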

Yjs integration

The @durable-streams/y-durable-streams package provides first-class Yjs integration:
import * as Y from "yjs"
import { DurableStreamProvider } from "@durable-streams/y-durable-streams"

// Create a Yjs document
const doc = new Y.Doc()

// Connect to durable stream
const provider = new DurableStreamProvider(
  `https://your-server.com/v1/stream/doc/${documentId}`,
  doc,
  {
    // Optional: provide auth headers
    headers: {
      Authorization: `Bearer ${token}`,
    },
  }
)

// Use Yjs data types as normal
const yText = doc.getText("content")
yText.insert(0, "Hello, collaborative world!")

// Changes are automatically synced through the durable stream

How it works

Write path

Local edits generate Yjs update messages. The provider batches and appends them to the durable stream using IdempotentProducer.

Read path

The provider reads all updates from the stream (catching up from offset -1), applies them to the Yjs doc, then subscribes to live updates.

Offline mode

Yjs continues working offline. When reconnected, the provider catches up from its last known offset and syncs changes.
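The catch-up bookkeeping amounts to remembering the highest offset already applied and resuming from it. A minimal sketch of that logic, assuming offsets are plain numbers (the real provider manages this internally):

```typescript
// Track the highest offset processed so a reconnecting client can resume
// without replaying the whole stream.
function nextResumeOffset(current: number, batchOffsets: number[]): number {
  // The resume point is the largest offset we have already applied.
  return batchOffsets.reduce((max, o) => Math.max(max, o), current)
}
```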

Multi-tab

Multiple tabs can share the same document. Each tab syncs independently through the stream.

Complete Yjs example

Here’s a complete collaborative text editor built with React:
import * as Y from "yjs"
import { DurableStreamProvider } from "@durable-streams/y-durable-streams"
import { useEffect, useState } from "react"

function CollaborativeEditor({ documentId }: { documentId: string }) {
  const [doc] = useState(() => new Y.Doc())
  const [text, setText] = useState("")
  const [synced, setSynced] = useState(false)

  useEffect(() => {
    // Create provider
    const provider = new DurableStreamProvider(
      `https://your-server.com/v1/stream/doc/${documentId}`,
      doc,
      {
        headers: {
          Authorization: `Bearer ${getAuthToken()}`,
        },
      }
    )

    // Wait for initial sync
    provider.on("synced", () => {
      setSynced(true)
    })

    // Get Yjs text type
    const yText = doc.getText("content")

    // Subscribe to changes
    const observer = () => {
      setText(yText.toString())
    }
    yText.observe(observer)
    setText(yText.toString())

    return () => {
      yText.unobserve(observer)
      provider.destroy()
    }
  }, [documentId, doc])

  const handleChange = (newText: string) => {
    const yText = doc.getText("content")
    doc.transact(() => {
      yText.delete(0, yText.length)
      yText.insert(0, newText)
    })
  }

  if (!synced) {
    return <div>Loading document...</div>
  }

  return (
    <textarea
      value={text}
      onChange={(e) => handleChange(e.target.value)}
      placeholder="Start typing..."
    />
  )
}

Custom CRDT implementation

If you’re building your own CRDT or using a different library:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

const docStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/doc/${documentId}`,
  contentType: "application/json",
})

const producer = new IdempotentProducer(docStream, `client-${clientId}`, {
  epoch: Date.now(), // Use timestamp as epoch for client sessions
  autoClaim: true,
})

// Append operations as they happen
function applyLocalOp(op: Operation) {
  // Apply to local CRDT state
  crdt.apply(op)
  
  // Broadcast to stream
  producer.append(JSON.stringify({
    type: "operation",
    clientId,
    operation: op,
    timestamp: Date.now(),
  }))
}

await producer.flush()
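As one concrete example of "your own conflict resolution logic", here is a minimal last-writer-wins (LWW) register. The types and tie-breaking rule are illustrative, not part of the client library:

```typescript
// A minimal last-writer-wins register. Each client proposes a value with a
// timestamp; merging keeps the later write, breaking timestamp ties
// deterministically by clientId so every replica converges.
interface LwwOp {
  clientId: string
  value: string
  timestamp: number
}

function mergeLww(current: LwwOp | null, incoming: LwwOp): LwwOp {
  if (current === null) return incoming
  if (incoming.timestamp > current.timestamp) return incoming
  if (incoming.timestamp === current.timestamp && incoming.clientId > current.clientId) {
    return incoming
  }
  return current
}
```

Because the merge is commutative and deterministic, clients can apply stream items in any interleaving and still agree on the final value.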

Presence and awareness

Track who’s currently viewing or editing:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

const presenceStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/doc/${documentId}/presence`,
  contentType: "application/json",
})

const producer = new IdempotentProducer(
  presenceStream,
  `presence-${userId}`,
  { autoClaim: true }
)

// Announce presence
producer.append(JSON.stringify({
  type: "join",
  userId,
  userName: "Alice",
  color: "#3b82f6",
  timestamp: Date.now(),
}))

// Send cursor updates
function updateCursor(position: { x: number; y: number }) {
  producer.append(JSON.stringify({
    type: "cursor",
    userId,
    position,
    timestamp: Date.now(),
  }))
}

// Send typing indicator
function updateTyping(isTyping: boolean) {
  producer.append(JSON.stringify({
    type: "typing",
    userId,
    isTyping,
    timestamp: Date.now(),
  }))
}

// Cleanup on disconnect (best effort: the browser may unload the page
// before async work in a beforeunload handler completes)
window.addEventListener("beforeunload", () => {
  producer.close(JSON.stringify({
    type: "leave",
    userId,
    timestamp: Date.now(),
  }))
})
Receive presence updates:
const res = await stream({
  url: `https://your-server.com/v1/stream/doc/${documentId}/presence`,
  offset: "now", // Only live updates, skip history
  live: true,
})

const activeUsers = new Map<string, User>()

res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    switch (event.type) {
      case "join":
        activeUsers.set(event.userId, {
          name: event.userName,
          color: event.color,
        })
        break
      
      case "leave":
        activeUsers.delete(event.userId)
        break
      
      case "cursor":
        updateUserCursor(event.userId, event.position)
        break
      
      case "typing":
        updateTypingIndicator(event.userId, event.isTyping)
        break
    }
  }
  
  renderActiveUsers(Array.from(activeUsers.values()))
})
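Presence events carry no built-in liveness guarantee: a tab that crashes never sends its "leave" event. A common pattern is to expire entries that haven't been refreshed recently. This helper and its field names are assumptions for illustration:

```typescript
// Expire presence entries whose last event is older than a TTL, so crashed
// or disconnected clients eventually disappear from the active-users list.
interface PresenceEntry {
  userId: string
  lastSeen: number // ms timestamp of the most recent event from this user
}

function pruneStale(
  entries: PresenceEntry[],
  now: number,
  ttlMs: number
): PresenceEntry[] {
  return entries.filter((entry) => now - entry.lastSeen <= ttlMs)
}
```

Run this on an interval (updating `lastSeen` whenever any event arrives for a user) and re-render the active-users list with the pruned result.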

Conflict-free edits with operational transforms

For OT-based systems:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"
import { type, transformX } from "ot-text" // Example OT library

interface Operation {
  clientId: string
  revision: number
  ops: any[]
}

let localRevision = 0
let pendingOps: Operation[] = []

// Apply local operation
function applyLocalOp(ops: any[]) {
  const op: Operation = {
    clientId,
    revision: localRevision++,
    ops,
  }
  
  // Apply locally
  document = type.apply(document, ops)
  
  // Send to stream via an IdempotentProducer for this document's stream
  producer.append(JSON.stringify(op))
  
  // Track as pending
  pendingOps.push(op)
}

// Receive remote operations
res.subscribeJson(async (batch) => {
  for (const remoteOp of batch.items) {
    if (remoteOp.clientId === clientId) {
      // Our own operation confirmed - remove from pending
      pendingOps = pendingOps.filter(op => op.revision !== remoteOp.revision)
      continue
    }
    
    // Transform remote operation against pending local operations
    let transformedOp = remoteOp.ops
    for (const pendingOp of pendingOps) {
      transformedOp = transformX(transformedOp, pendingOp.ops)[0]
    }
    
    // Apply transformed operation
    document = type.apply(document, transformedOp)
    updateUI(document)
  }
})
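To make the transform step concrete, here is a toy transform for single-character inserts only. Real OT libraries also handle deletes, retains, and richer tie-breaking; the priority rule below is an arbitrary assumption:

```typescript
// Transform two concurrent single-character inserts so that applying
// (a then b') and (b then a') to the same document yield the same result.
interface InsertOp {
  pos: number
  char: string
}

function transformInsertX(a: InsertOp, b: InsertOp): [InsertOp, InsertOp] {
  // a takes priority on position ties; the other op's position shifts right.
  if (a.pos <= b.pos) {
    return [a, { ...b, pos: b.pos + 1 }]
  }
  return [{ ...a, pos: a.pos + 1 }, b]
}

function applyInsert(doc: string, op: InsertOp): string {
  return doc.slice(0, op.pos) + op.char + doc.slice(op.pos)
}
```

The defining property is convergence: both application orders produce the same document, which is exactly what the transform loop above relies on.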

Multiplayer cursors and selections

Show where other users are editing in real-time:
interface Selection {
  userId: string
  userName: string
  color: string
  start: number
  end: number
}

const selections = new Map<string, Selection>()

// Send selection changes
function updateSelection(start: number, end: number) {
  producer.append(JSON.stringify({
    type: "selection",
    userId,
    userName,
    color: userColor,
    start,
    end,
    timestamp: Date.now(),
  }))
}

// Receive selection updates
res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    if (event.type === "selection" && event.userId !== userId) {
      selections.set(event.userId, {
        userId: event.userId,
        userName: event.userName,
        color: event.color,
        start: event.start,
        end: event.end,
      })
      
      renderSelections(Array.from(selections.values()))
    }
  }
})

Handling disconnections

Durable Streams handles reconnection automatically, so your CRDT state stays consistent across network drops.
const res = await stream({
  url: docStreamUrl,
  offset: lastSeenOffset,
  live: true,
  onError: async (error) => {
    // Connection lost - will automatically retry with backoff
    showDisconnectedBanner()
    
    // Optionally refresh auth on 401
    if (error instanceof FetchError && error.status === 401) {
      const newToken = await refreshAuthToken()
      hideDisconnectedBanner()
      return { headers: { Authorization: `Bearer ${newToken}` } }
    }
  },
})

// When reconnected, the stream automatically catches up from saved offset
res.subscribeJson(async (batch) => {
  if (batch.items.length > 0) {
    hideDisconnectedBanner() // We're back online and caught up
  }
  // ... process updates
})

Performance optimization

Batching updates

const producer = new IdempotentProducer(stream, clientId, {
  lingerMs: 50,           // Batch updates for 50ms
  maxBatchBytes: 65536,   // Or until 64KB
  maxInFlight: 3,         // Pipeline up to 3 batches
})

// Rapid edits are automatically batched
for (let i = 0; i < 100; i++) {
  producer.append(JSON.stringify({ type: "insert", char: "a" }))
}
// Only a few HTTP requests sent instead of 100

Throttling presence updates

import { throttle } from "lodash"

const sendCursorUpdate = throttle((position) => {
  producer.append(JSON.stringify({
    type: "cursor",
    userId,
    position,
  }))
}, 100) // Max one update per 100ms

document.addEventListener("mousemove", (e) => {
  sendCursorUpdate({ x: e.clientX, y: e.clientY })
})

Authorization

Implement document-level authorization by validating access tokens on stream creation and reads.
// Server-side middleware
app.put("/v1/stream/doc/:docId", async (req, res) => {
  const { docId } = req.params
  const token = req.headers.authorization
  
  // Validate user has write access to this document
  const canWrite = await checkPermission(token, docId, "write")
  if (!canWrite) {
    return res.status(403).json({ error: "Forbidden" })
  }
  
  // Create stream
  // ...
})

app.get("/v1/stream/doc/:docId", async (req, res) => {
  const { docId } = req.params
  const token = req.headers.authorization
  
  // Validate user has read access
  const canRead = await checkPermission(token, docId, "read")
  if (!canRead) {
    return res.status(403).json({ error: "Forbidden" })
  }
  
  // Read from stream
  // ...
})

Production considerations

Scale

CDN caching means thousands of viewers can watch the same document with minimal origin load.

Latency

Production deployments typically see sub-15ms delivery. Use SSE mode for the lowest latency on capable clients.

Offline-first

CRDTs continue working offline. Durable Streams syncs changes when reconnected.

History

Full edit history is preserved in the stream. Build version control or time-travel debugging.
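Time travel falls out of replay: rebuild state using only items at or below a target offset. The op shape below is a toy assumption (text appended per item), not the stream's actual format:

```typescript
// Rebuild historical state by replaying only operations up to a chosen
// offset -- the basis for version history or time-travel debugging.
interface HistoryItem {
  offset: number
  text: string // toy op: text appended by this item
}

function stateAt(items: HistoryItem[], targetOffset: number): string {
  return items
    .filter((item) => item.offset <= targetOffset)
    .reduce((doc, item) => doc + item.text, "")
}
```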

Next steps

Yjs Integration

Complete guide to using Yjs with Durable Streams

Idempotent Producer

Learn about exactly-once delivery for conflict-free syncing

State Protocol

Build materialized views and projections from streams

Live Modes

Optimize real-time performance with SSE vs long-poll