Durable Streams provides the synchronization layer for building collaborative editing experiences similar to Google Docs, Figma, or Notion. Stream CRDT updates, operational transforms, or presence events to clients with resumable reads, and let your CRDT or OT layer merge concurrent edits.
Why Durable Streams for collaboration
Collaborative editing requires:
Real-time sync - Changes appear instantly for all participants
Offline resilience - Users can work offline and sync when reconnected
Conflict resolution - Multiple simultaneous edits merge correctly
History replay - New joiners catch up on all prior changes
Presence awareness - Track who’s viewing and editing
Durable Streams handles the synchronization layer while you focus on your CRDT or OT implementation.
Architecture overview
Choose your conflict resolution strategy
Durable Streams works with any CRDT library or operational transform approach:
CRDTs - Yjs, Automerge, etc.
Operational Transforms - ShareDB, OT.js
Custom - Your own conflict resolution logic
Stream updates through durable streams
Append updates to a durable stream. All clients read from the stream to stay synchronized.
Clients replay and live tail
New clients replay history to build current state, then subscribe to live updates.
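The replay-then-live-tail flow above can be sketched with an in-memory stand-in for a stream. This is only an illustration of the pattern: `InMemoryStream`, its methods, and the sample items are hypothetical and are not part of the Durable Streams client.

```typescript
// Illustrative in-memory stand-in for a durable stream; not the Durable Streams API.
type Listener<T> = (item: T, offset: number) => void

class InMemoryStream<T> {
  private items: T[] = []
  private listeners: Listener<T>[] = []

  // Append an item, notify live subscribers, and return the offset it was stored at.
  append(item: T): number {
    this.items.push(item)
    const offset = this.items.length - 1
    for (const listener of this.listeners) listener(item, offset)
    return offset
  }

  // Replay everything at or after `fromOffset`, then keep delivering live items.
  // Returns an unsubscribe function.
  subscribe(fromOffset: number, listener: Listener<T>): () => void {
    for (let i = fromOffset; i < this.items.length; i++) listener(this.items[i], i)
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener)
    }
  }
}

const log = new InMemoryStream<string>()
log.append("insert:H")
log.append("insert:i")

// A late joiner replays history first, then receives live updates.
const seen: string[] = []
let nextOffset = 0
const unsubscribe = log.subscribe(0, (item, offset) => {
  seen.push(item)
  nextOffset = offset + 1 // persist this value to resume after a disconnect
})
log.append("insert:!")
unsubscribe()
log.append("insert:?") // appended while this client is disconnected

// Resuming from the saved offset delivers only what was missed.
const resumed: string[] = []
log.subscribe(nextOffset, (item) => {
  resumed.push(item)
})
```

Saving the next offset after each delivered item is what makes resumption cheap: the client never needs to re-read history it has already applied.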
Yjs integration
The @durable-streams/y-durable-streams package provides first-class Yjs integration:
import * as Y from "yjs"
import { DurableStreamProvider } from "@durable-streams/y-durable-streams"

// Create a Yjs document
const doc = new Y.Doc()

// Connect to durable stream
const provider = new DurableStreamProvider(
  `https://your-server.com/v1/stream/doc/${documentId}`,
  doc,
  {
    // Optional: provide auth headers
    headers: {
      Authorization: `Bearer ${token}`,
    },
  }
)

// Use Yjs data types as normal
const yText = doc.getText("content")
yText.insert(0, "Hello, collaborative world!")

// Changes are automatically synced through the durable stream
How it works
Write path - Local edits generate Yjs update messages. The provider batches and appends them to the durable stream using IdempotentProducer.
Read path - The provider reads all updates from the stream (catching up from offset -1), applies them to the Yjs doc, then subscribes to live updates.
Offline mode - Yjs continues working offline. When reconnected, the provider catches up from its last known offset and syncs changes.
Multi-tab - Multiple tabs can share the same document. Each tab syncs independently through the stream.
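To make the write path more concrete, here is a minimal sketch of how idempotent appends can turn a retried request into a no-op. It assumes the server deduplicates on a (producer ID, epoch, sequence number) triple; the `DedupingLog` class and its result strings are hypothetical and do not describe the actual server implementation.

```typescript
// Hypothetical server-side dedupe keyed by (producerId, epoch, seq).
interface AppendRequest {
  producerId: string
  epoch: number
  seq: number
  data: string
}

type AppendResult = "appended" | "duplicate" | "stale-epoch" | "gap"

class DedupingLog {
  readonly entries: string[] = []
  private producers = new Map<string, { epoch: number; lastSeq: number }>()

  append(req: AppendRequest): AppendResult {
    const prev = this.producers.get(req.producerId)
    // A request from an older session of the same producer is rejected.
    if (prev && req.epoch < prev.epoch) return "stale-epoch"
    // A newer epoch restarts sequence numbering from 0.
    const lastSeq = prev && prev.epoch === req.epoch ? prev.lastSeq : -1
    if (req.seq <= lastSeq) return "duplicate" // retry of an already stored append
    if (req.seq > lastSeq + 1) return "gap" // an earlier append is still missing
    this.entries.push(req.data)
    this.producers.set(req.producerId, { epoch: req.epoch, lastSeq: req.seq })
    return "appended"
  }
}

const dedupLog = new DedupingLog()
const firstTry = dedupLog.append({ producerId: "client-1", epoch: 1, seq: 0, data: "op-a" })
// The response was lost, so the client retries the same request.
const retried = dedupLog.append({ producerId: "client-1", epoch: 1, seq: 0, data: "op-a" })
const nextOp = dedupLog.append({ producerId: "client-1", epoch: 1, seq: 1, data: "op-b" })
```

Under these assumptions the retry is acknowledged without being stored twice, so the log holds each update exactly once even when the network drops responses.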
Complete Yjs example
Here’s a collaborative text editor:
import * as Y from "yjs"
import { DurableStreamProvider } from "@durable-streams/y-durable-streams"
import { useEffect, useState } from "react"

function CollaborativeEditor({ documentId, userId }) {
  const [doc] = useState(() => new Y.Doc())
  const [text, setText] = useState("")
  const [synced, setSynced] = useState(false)

  useEffect(() => {
    // Create provider (getAuthToken is your app's own token helper)
    const provider = new DurableStreamProvider(
      `https://your-server.com/v1/stream/doc/${documentId}`,
      doc,
      {
        headers: {
          Authorization: `Bearer ${getAuthToken()}`,
        },
      }
    )

    // Wait for initial sync
    provider.on("synced", () => {
      setSynced(true)
    })

    // Get Yjs text type
    const yText = doc.getText("content")

    // Subscribe to changes
    const observer = () => {
      setText(yText.toString())
    }
    yText.observe(observer)
    setText(yText.toString())

    return () => {
      yText.unobserve(observer)
      provider.destroy()
    }
  }, [documentId, doc])

  const handleChange = (newText: string) => {
    const yText = doc.getText("content")
    // Replaces the whole text on every change. This keeps the example short,
    // but concurrent edits merge poorly; a production editor should apply
    // minimal inserts and deletes or use an editor binding.
    doc.transact(() => {
      yText.delete(0, yText.length)
      yText.insert(0, newText)
    })
  }

  if (!synced) {
    return <div>Loading document...</div>
  }

  return (
    <textarea
      value={text}
      onChange={(e) => handleChange(e.target.value)}
      placeholder="Start typing..."
    />
  )
}
Custom CRDT implementation
If you’re building your own CRDT or using a different library:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

// documentId, clientId, crdt, and the Operation type come from your application
const docStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/doc/${documentId}`,
  contentType: "application/json",
})

const producer = new IdempotentProducer(docStream, `client-${clientId}`, {
  epoch: Date.now(), // Use timestamp as epoch for client sessions
  autoClaim: true,
})

// Append operations as they happen
function applyLocalOp(op: Operation) {
  // Apply to local CRDT state
  crdt.apply(op)

  // Broadcast to stream
  producer.append(JSON.stringify({
    type: "operation",
    clientId,
    operation: op,
    timestamp: Date.now(),
  }))
}

// Wait for queued operations to be sent, for example before shutting down
await producer.flush()
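For the `crdt.apply(op)` step, a minimal sketch of a custom CRDT may help. The example below is a last-writer-wins map, chosen only because it is small; the `SetOp` shape, the tie-breaking rule, and the function names are assumptions for illustration, not something the client library provides.

```typescript
// A last-writer-wins (LWW) map: each key keeps the operation with the highest
// (timestamp, clientId) pair, so replicas converge regardless of apply order.
interface SetOp {
  key: string
  value: string
  timestamp: number
  clientId: string
}

type LwwState = Record<string, SetOp>

// True if `a` should replace `b`. Ties on timestamp are broken by clientId so
// every replica makes the same choice.
function wins(a: SetOp, b: SetOp): boolean {
  if (a.timestamp !== b.timestamp) return a.timestamp > b.timestamp
  return a.clientId > b.clientId
}

function applyOp(state: LwwState, op: SetOp): LwwState {
  const current = state[op.key]
  if (current && !wins(op, current)) return state // older or duplicate op: no change
  return { ...state, [op.key]: op }
}

// Fold a sequence of operations into a plain key/value view.
function materialize(ops: SetOp[]): Record<string, string> {
  let state: LwwState = {}
  for (const op of ops) state = applyOp(state, op)
  const view: Record<string, string> = {}
  for (const key of Object.keys(state)) view[key] = state[key].value
  return view
}

const opA: SetOp = { key: "title", value: "Draft", timestamp: 1, clientId: "alice" }
const opB: SetOp = { key: "title", value: "Final", timestamp: 2, clientId: "bob" }
const opC: SetOp = { key: "owner", value: "alice", timestamp: 2, clientId: "alice" }

const inOrder = materialize([opA, opB, opC])
// Different order, with opB delivered twice: the result is the same.
const shuffled = materialize([opC, opB, opA, opB])
```

Because applying an operation is commutative and idempotent here, a client can apply its own edit immediately and then see the same edit again when it arrives from the stream without ending up in a different state.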
Presence and awareness
Track who’s currently viewing or editing:
import { DurableStream, IdempotentProducer } from "@durable-streams/client"

const presenceStream = await DurableStream.create({
  url: `https://your-server.com/v1/stream/doc/${documentId}/presence`,
  contentType: "application/json",
})

const producer = new IdempotentProducer(
  presenceStream,
  `presence-${userId}`,
  { autoClaim: true }
)

// Announce presence
producer.append(JSON.stringify({
  type: "join",
  userId,
  userName: "Alice",
  color: "#3b82f6",
  timestamp: Date.now(),
}))

// Send cursor updates
function updateCursor(position: { x: number; y: number }) {
  producer.append(JSON.stringify({
    type: "cursor",
    userId,
    position,
    timestamp: Date.now(),
  }))
}

// Send typing indicator
function updateTyping(isTyping: boolean) {
  producer.append(JSON.stringify({
    type: "typing",
    userId,
    isTyping,
    timestamp: Date.now(),
  }))
}

// Cleanup on disconnect. Browsers do not wait for async work started in
// beforeunload, so treat the "leave" event as best effort and expire stale
// users on the reading side as well.
window.addEventListener("beforeunload", async () => {
  await producer.close(JSON.stringify({
    type: "leave",
    userId,
    timestamp: Date.now(),
  }))
})
Receive presence updates:
import { stream } from "@durable-streams/client"

// Shape of an entry in the active-user list
interface User {
  name: string
  color: string
}

const res = await stream({
  url: `https://your-server.com/v1/stream/doc/${documentId}/presence`,
  offset: "now", // Only live updates, skip history
  live: true,
})

const activeUsers = new Map<string, User>()

res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    switch (event.type) {
      case "join":
        activeUsers.set(event.userId, {
          name: event.userName,
          color: event.color,
        })
        break
      case "leave":
        activeUsers.delete(event.userId)
        break
      case "cursor":
        updateUserCursor(event.userId, event.position)
        break
      case "typing":
        updateTypingIndicator(event.userId, event.isTyping)
        break
    }
  }
  renderActiveUsers(Array.from(activeUsers.values()))
})
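Because a "leave" event may never arrive (a crashed tab, a lost connection), it can help to fold presence events into state with a pure function and prune entries that have gone quiet. The sketch below assumes the event shapes used above; `reducePresence`, `pruneStale`, and the 30-second TTL are hypothetical choices for illustration.

```typescript
type PresenceEvent =
  | { type: "join"; userId: string; userName: string; color: string; timestamp: number }
  | { type: "leave"; userId: string; timestamp: number }
  | { type: "cursor"; userId: string; position: { x: number; y: number }; timestamp: number }

interface PresenceEntry {
  name: string
  color: string
  lastSeen: number
  cursor?: { x: number; y: number }
}

type PresenceState = Record<string, PresenceEntry>

// Fold one event into the presence state without mutating the input.
function reducePresence(state: PresenceState, ev: PresenceEvent): PresenceState {
  const next: PresenceState = { ...state }
  switch (ev.type) {
    case "join":
      next[ev.userId] = { name: ev.userName, color: ev.color, lastSeen: ev.timestamp }
      break
    case "leave":
      delete next[ev.userId]
      break
    case "cursor": {
      const entry = next[ev.userId]
      // Cursor events from users whose join was never seen are ignored here.
      if (entry) next[ev.userId] = { ...entry, cursor: ev.position, lastSeen: ev.timestamp }
      break
    }
  }
  return next
}

// Drop users that have not sent any event within `ttlMs`.
function pruneStale(state: PresenceState, now: number, ttlMs: number): PresenceState {
  const next: PresenceState = {}
  for (const userId of Object.keys(state)) {
    if (now - state[userId].lastSeen <= ttlMs) next[userId] = state[userId]
  }
  return next
}

const presenceEvents: PresenceEvent[] = [
  { type: "join", userId: "alice", userName: "Alice", color: "#3b82f6", timestamp: 1000 },
  { type: "join", userId: "bob", userName: "Bob", color: "#ef4444", timestamp: 1000 }, // never sends "leave"
  { type: "join", userId: "carol", userName: "Carol", color: "#10b981", timestamp: 2000 },
  { type: "leave", userId: "carol", timestamp: 3000 },
  { type: "cursor", userId: "alice", position: { x: 10, y: 20 }, timestamp: 20000 },
]

let presence: PresenceState = {}
for (const ev of presenceEvents) presence = reducePresence(presence, ev)

// At t = 40s with a 30s TTL, only Alice (last seen at 20s) is still active.
const activeNow = pruneStale(presence, 40000, 30000)
```

Running the prune step on a timer, or before each render, keeps the active-user list accurate without relying on every client shutting down cleanly.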
For OT-based systems:
import { DurableStream } from "@durable-streams/client"
import { type, transformX } from "ot-text" // Example OT library

interface Operation {
  clientId: string
  revision: number
  ops: any[]
}

// `stream` is a DurableStream handle for this document and `res` is a live
// read of the same stream, both created as in the earlier examples.
let localRevision = 0
let pendingOps: Operation[] = []
let docText = "" // Local document state

// Apply local operation
function applyLocalOp(ops: any[]) {
  const op: Operation = {
    clientId,
    revision: localRevision++,
    ops,
  }

  // Apply locally
  docText = type.apply(docText, ops)

  // Send to stream
  stream.append(op)

  // Track as pending
  pendingOps.push(op)
}

// Receive remote operations
res.subscribeJson(async (batch) => {
  for (const remoteOp of batch.items) {
    if (remoteOp.clientId === clientId) {
      // Our own operation confirmed - remove from pending
      pendingOps = pendingOps.filter(op => op.revision !== remoteOp.revision)
      continue
    }

    // Transform the remote operation against each pending local operation,
    // and keep the transformed pending operation so later remote operations
    // are transformed against the correct version.
    let transformedOp = remoteOp.ops
    for (const pendingOp of pendingOps) {
      [transformedOp, pendingOp.ops] = transformX(transformedOp, pendingOp.ops)
    }

    // Apply transformed operation
    docText = type.apply(docText, transformedOp)
    updateUI(docText)
  }
})
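To show what a transform function has to guarantee, here is a self-contained sketch for insert-only operations. It is not the `ot-text` API: the `Insert` type, `transformInsert`, and the clientId tie-break are assumptions made for this example.

```typescript
interface Insert {
  pos: number
  text: string
  clientId: string
}

function applyInsert(doc: string, op: Insert): string {
  return doc.slice(0, op.pos) + op.text + doc.slice(op.pos)
}

// Rewrite `op` so it can be applied after `against` has already been applied.
// If both insert at the same position, the lower clientId goes first so that
// every site orders the two insertions the same way.
function transformInsert(op: Insert, against: Insert): Insert {
  const shifts =
    against.pos < op.pos ||
    (against.pos === op.pos && against.clientId < op.clientId)
  return shifts ? { ...op, pos: op.pos + against.text.length } : op
}

const baseDoc = "abc"
const fromAlice: Insert = { pos: 1, text: "X", clientId: "alice" }
const fromBob: Insert = { pos: 1, text: "Y", clientId: "bob" }

// Alice applies her own edit first, then Bob's edit transformed against hers.
const atAlice = applyInsert(
  applyInsert(baseDoc, fromAlice),
  transformInsert(fromBob, fromAlice)
)

// Bob applies his own edit first, then Alice's edit transformed against his.
const atBob = applyInsert(
  applyInsert(baseDoc, fromBob),
  transformInsert(fromAlice, fromBob)
)
// Both sites end with "aXYbc".
```

The property being checked is convergence: applying the two concurrent operations in either order, with the second one transformed, yields the same document. A real OT type also has to handle deletes and compound operations.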
Multiplayer cursors and selections
Show where other users are editing in real-time:
interface Selection {
  userId: string
  userName: string
  color: string
  start: number
  end: number
}

const selections = new Map<string, Selection>()

// Send selection changes
function updateSelection(start: number, end: number) {
  producer.append(JSON.stringify({
    type: "selection",
    userId,
    userName,
    color: userColor,
    start,
    end,
    timestamp: Date.now(),
  }))
}

// Receive selection updates
res.subscribeJson(async (batch) => {
  for (const event of batch.items) {
    if (event.type === "selection" && event.userId !== userId) {
      selections.set(event.userId, {
        userId: event.userId,
        userName: event.userName,
        color: event.color,
        start: event.start,
        end: event.end,
      })
      renderSelections(Array.from(selections.values()))
    }
  }
})
Handling disconnections
Durable Streams automatically handles reconnection. Your CRDT state remains consistent.
import { stream, FetchError } from "@durable-streams/client"

const res = await stream({
  url: docStreamUrl,
  offset: lastSeenOffset,
  live: true,
  onError: async (error) => {
    // Connection lost - will automatically retry with backoff
    showDisconnectedBanner()

    // Optionally refresh auth on 401
    if (error instanceof FetchError && error.status === 401) {
      const newToken = await refreshAuthToken()
      hideDisconnectedBanner()
      return { headers: { Authorization: `Bearer ${newToken}` } }
    }
  },
})

// When reconnected, the stream automatically catches up from saved offset
res.subscribeJson(async (batch) => {
  if (batch.items.length > 0) {
    hideDisconnectedBanner() // We're back online and caught up
  }
  // ... process updates
})
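As a rough illustration of what "retry with backoff" means, the sketch below computes capped exponential delays. The function name, the default base and cap, and the optional jitter are assumptions for this example; the client's actual retry schedule may differ.

```typescript
// Capped exponential backoff: the delay doubles with each failed attempt
// until it reaches `maxMs`. `random` scales the delay for jitter; the default
// returns 1, which disables jitter and makes the schedule deterministic.
function backoffDelayMs(
  attempt: number,
  baseMs: number = 100,
  maxMs: number = 5000,
  random: () => number = () => 1
): number {
  const capped = Math.min(maxMs, baseMs * Math.pow(2, attempt))
  return Math.floor(capped * random())
}

const schedule = [0, 1, 2, 3, 4, 5, 6].map((attempt) => backoffDelayMs(attempt))
// → [100, 200, 400, 800, 1600, 3200, 5000]
```

Passing `Math.random` as the last argument spreads reconnect attempts out, which avoids many clients hitting the server at the same instant after an outage.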
Batching updates
const producer = new IdempotentProducer(stream, clientId, {
  lingerMs: 50, // Batch updates for 50ms
  maxBatchBytes: 65536, // Or until 64KB
  maxInFlight: 3, // Pipeline up to 3 batches
})

// Rapid edits are automatically batched
for (let i = 0; i < 100; i++) {
  producer.append(JSON.stringify({ type: "insert", char: "a" }))
}
// Only a few HTTP requests sent instead of 100
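The effect of `lingerMs` and `maxBatchBytes` can be sketched with a pure function that groups timestamped messages into batches. This is a simplified model, not the producer's implementation: `planBatches` and `TimedMessage` are hypothetical, and payload size is approximated by string length.

```typescript
interface TimedMessage {
  atMs: number // time the message was appended
  payload: string
}

// Group messages into batches. A batch is closed when the next message arrives
// `lingerMs` or more after the batch was opened, or when adding it would push
// the batch past `maxBatchBytes` (approximated here by string length).
function planBatches(
  messages: TimedMessage[],
  lingerMs: number,
  maxBatchBytes: number
): string[][] {
  const batches: string[][] = []
  let current: string[] = []
  let openedAt = 0
  let size = 0

  for (const msg of messages) {
    const expired = msg.atMs - openedAt >= lingerMs
    const overflow = size + msg.payload.length > maxBatchBytes
    if (current.length > 0 && (expired || overflow)) {
      batches.push(current)
      current = []
      size = 0
    }
    if (current.length === 0) openedAt = msg.atMs
    current.push(msg.payload)
    size += msg.payload.length
  }
  if (current.length > 0) batches.push(current)
  return batches
}

// 100 edits, one per millisecond, with a 50ms linger and a 64KB size limit.
const payload = JSON.stringify({ type: "insert", char: "a" })
const edits: TimedMessage[] = []
for (let i = 0; i < 100; i++) edits.push({ atMs: i, payload })

const batchPlan = planBatches(edits, 50, 65536)
// Two batches of 50 messages each instead of 100 separate requests.
```

A longer linger means fewer requests at the cost of added latency for the first edit in each batch, so interactive editors usually keep it in the tens of milliseconds.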
Throttling presence updates
import { throttle } from "lodash"

const sendCursorUpdate = throttle((position) => {
  producer.append(JSON.stringify({
    type: "cursor",
    userId,
    position,
  }))
}, 100) // Max one update per 100ms

document.addEventListener("mousemove", (e) => {
  sendCursorUpdate({ x: e.clientX, y: e.clientY })
})
Authorization
Implement document-level authorization by validating access tokens on stream creation and reads.
// Server-side middleware
// Server-side middleware
app.put("/v1/stream/doc/:docId", async (req, res) => {
  const { docId } = req.params
  const token = req.headers.authorization

  // Validate user has write access to this document
  const canWrite = await checkPermission(token, docId, "write")
  if (!canWrite) {
    return res.status(403).json({ error: "Forbidden" })
  }

  // Create stream
  // ...
})

app.get("/v1/stream/doc/:docId", async (req, res) => {
  const { docId } = req.params
  const token = req.headers.authorization

  // Validate user has read access
  const canRead = await checkPermission(token, docId, "read")
  if (!canRead) {
    return res.status(403).json({ error: "Forbidden" })
  }

  // Read from stream
  // ...
})
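The `checkPermission` helper above is left to your application. One possible shape, sketched with in-memory lookups, is shown below; the token table, the ACL layout, and the `canAccess` name are all hypothetical, and a real service would verify tokens and query its own permission store.

```typescript
type Access = "read" | "write"

// Hypothetical in-memory stores standing in for token verification and an ACL.
const tokenToUser = new Map<string, string>([
  ["token-alice", "alice"],
  ["token-bob", "bob"],
])

const docAcl = new Map<string, Map<string, Access>>([
  ["doc-1", new Map<string, Access>([["alice", "write"], ["bob", "read"]])],
])

// Resolve the user from an "Authorization: Bearer <token>" header value.
function userFromHeader(authorization: string | undefined): string | null {
  const prefix = "Bearer "
  if (!authorization || authorization.indexOf(prefix) !== 0) return null
  const user = tokenToUser.get(authorization.slice(prefix.length))
  return user === undefined ? null : user
}

// Write access implies read access; anything not explicitly granted is denied.
function canAccess(
  authorization: string | undefined,
  docId: string,
  needed: Access
): boolean {
  const user = userFromHeader(authorization)
  if (user === null) return false
  const grants = docAcl.get(docId)
  const granted = grants ? grants.get(user) : undefined
  if (granted === undefined) return false
  return needed === "read" || granted === "write"
}

const aliceCanWrite = canAccess("Bearer token-alice", "doc-1", "write")
const bobCanWrite = canAccess("Bearer token-bob", "doc-1", "write")
const bobCanRead = canAccess("Bearer token-bob", "doc-1", "read")
```

Denying by default means a missing header, an unknown token, or an unknown document all fall through to the same 403 path in the middleware.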
Production considerations
Scale - CDN caching means thousands of viewers can watch the same document with minimal origin load.
Latency - Sub-15ms delivery in production. Use SSE mode for lowest latency on capable clients.
Offline-first - CRDTs continue working offline. Durable Streams syncs changes when reconnected.
History - Full edit history is preserved in the stream. Build version control or time-travel debugging.
Next steps
Yjs Integration - Complete guide to using Yjs with Durable Streams
Idempotent Producer - Learn about exactly-once delivery for conflict-free syncing
State Protocol - Build materialized views and projections from streams
Live Modes - Optimize real-time performance with SSE vs long-poll