svelte-realtime

svelte-realtime

Realtime RPC and reactive subscriptions for SvelteKit, built on svelte-adapter-uws.

Documentation · Tutorial · Live Demo


Write server functions. Import them in components. Call them over WebSocket. No boilerplate, no manual pub/sub wiring, no protocol design.

Upgrading from 0.4.x? See the migration guide for every breaking change between 0.4.x and 0.5.x.


Quick start

npx svelte-realtime my-app
cd my-app
npm run dev

This creates a SvelteKit project with svelte-realtime fully wired: adapter, vite plugins, WebSocket hooks, and a working counter example you can open in your browser right away.


Version compatibility

The three ecosystem packages move together. Bump them as a group:

svelte-adapter-uws svelte-realtime svelte-adapter-uws-extensions Notes
^0.4.x ^0.4.x ^0.4.x Legacy stable
^0.5.0 ^0.5.0 ^0.5.0 Current. Node 22+ required. See MIGRATION.md if upgrading from 0.4.

Mixed-version installs are rejected at install time with a peer-dep warning.


Manual setup

Starting from a SvelteKit project. If you do not have one yet, run npx sv create my-app && cd my-app && npm install first.

Step 1: Install everything

npm install svelte-adapter-uws svelte-realtime
npm install uNetworking/uWebSockets.js#v20.60.0
npm install -D ws

What each package does:

  • svelte-adapter-uws (>=0.4.0) - the SvelteKit adapter that runs your app on uWebSockets.js with built-in WebSocket support
  • svelte-realtime - this library (RPC + streams on top of the adapter)
  • uWebSockets.js - the native C++ HTTP/WebSocket server (installed from GitHub, not npm)
  • ws - dev dependency used by the adapter during npm run dev (not needed in production)

Step 2: Configure the adapter

Open svelte.config.js and replace the default adapter:

// svelte.config.js
import adapter from 'svelte-adapter-uws';

export default {
  kit: {
    adapter: adapter({ websocket: true })
  }
};

Step 3: Add the Vite plugins

Open vite.config.js and add the adapter and realtime plugins:

// vite.config.js
import { sveltekit } from '@sveltejs/kit/vite';
import uws from 'svelte-adapter-uws/vite';
import realtime from 'svelte-realtime/vite';

export default {
  plugins: [sveltekit(), uws(), realtime()]
};

All three plugins are required. Order does not matter.

Step 4: Create the WebSocket hooks file

Create src/hooks.ws.js in your project root. This file tells the adapter how to handle WebSocket connections and messages.

// src/hooks.ws.js
export { message } from 'svelte-realtime/server';

export function upgrade({ cookies }) {
  // Return user data to attach to the connection, or false to reject.
  // This runs on every new WebSocket connection.
  const session = validateSession(cookies.session_id);
  if (!session) return false;
  return { id: session.userId, name: session.name };
}

message is a ready-made hook that routes incoming WebSocket messages to your live functions. upgrade decides who can connect and attaches user data to the connection.

This file is required. The Vite plugin will warn at startup if it finds live modules in src/live/ but no src/hooks.ws.js (or .ts). Without it, WebSocket messages have nothing on the server side to route them, and all RPC calls will silently time out.

Step 5: Write a server function

Create the src/live/ directory. Every .js file in this directory becomes a module of callable server functions.

// src/live/chat.js
import { live, LiveError } from 'svelte-realtime/server';
import { db } from '$lib/server/db';

// A plain RPC function - clients can call this like a regular async function
export const sendMessage = live(async (ctx, text) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED', 'Login required');

  const msg = await db.messages.insert({ userId: ctx.user.id, text });
  ctx.publish('messages', 'created', msg);
  return msg;
});

// A stream - clients get a Svelte store with initial data + live updates
export const messages = live.stream('messages', async (ctx) => {
  return db.messages.latest(50);
}, { merge: 'crud', key: 'id', prepend: true });

live() marks a function as callable over WebSocket. The first argument is always ctx (context), which contains the user data from upgrade(), the WebSocket connection, and a publish function for sending events to all subscribers.

live.stream() creates a reactive subscription. When a client subscribes, it gets the initial data from the function, then receives live updates whenever someone publishes to that topic.

Step 6: Use it in a component

<!-- src/routes/chat/+page.svelte -->
<script>
  import { sendMessage, messages } from '$live/chat';
  let text = $state('');

  async function send() {
    await sendMessage(text);
    text = '';
  }
</script>

{#if $messages === undefined}
  <p>Loading...</p>
{:else}
  {#each $messages as msg (msg.id)}
    <p><b>{msg.userId}:</b> {msg.text}</p>
  {/each}
{/if}

<input bind:value={text} />
<button onclick={send}>Send</button>

$live/chat is a virtual import. The Vite plugin reads your src/live/chat.js file, sees which functions are wrapped in live() and live.stream(), and generates lightweight client stubs that call them over WebSocket.

  • sendMessage becomes a regular async function on the client.
  • messages becomes a Svelte store. It starts as undefined (loading), then populates with the initial data, then merges live updates as they arrive.

That is the entire setup. Run npm run dev and it works.


How it works

You write:     src/live/chat.js       (server functions)
You import:    $live/chat             (auto-generated client stubs)

live()         -> RPC call over WebSocket (async function)
live.stream()  -> Svelte store with initial data + live updates

The ctx object passed to every server function contains:

Field Description
ctx.user Whatever upgrade() returned (your user data)
ctx.ws The raw WebSocket connection
ctx.platform The adapter platform API
ctx.publish Shorthand for platform.publish(). Rejects __-prefixed topics with LiveError('INVALID_TOPIC') (those are framework-internal channels; use ctx.platform.publish() directly if you genuinely need to broadcast on one)
ctx.cursor Cursor from a loadMore() call, or null
ctx.requestId Correlation id from platform.requestId (per WS connection or per HTTP request); honors X-Request-ID
ctx.throttle (topic, event, data, ms) - publish at most once per ms ms
ctx.debounce (topic, event, data, ms) - publish after ms ms of silence
ctx.signal (userId, event, data) - point-to-point message
ctx.batch (messages) - publish multiple messages in one call via platform.batch()

Note: ctx.user may contain adapter-injected properties (__subscriptions, remoteAddress) in addition to whatever your upgrade() function returned. These are stripped automatically by the adapter before broadcasting to other clients.


Table of contents

Core

Client features

Server features

Deployment

Reference


Merge strategies

The merge option on live.stream() controls how live pub/sub events are applied to the store.

crud (default)

Handles created, updated, deleted events. The store maintains an array, keyed by id (configurable with key). Set max to cap the buffer size and drop the oldest items when exceeded (useful for live feeds with prepend: true).

// Server
export const todos = live.stream('todos', async (ctx) => {
  return db.todos.all();
}, { merge: 'crud', key: 'id' });

export const addTodo = live(async (ctx, text) => {
  const todo = await db.todos.insert({ text });
  ctx.publish('todos', 'created', todo);
  return todo;
});
<!-- Client -->
<script>
  import { todos, addTodo } from '$live/todos';
</script>

{#each $todos as todo (todo.id)}
  <p>{todo.text}</p>
{/each}

latest

Ring buffer of the last N events. Good for activity feeds and logs.

export const activity = live.stream('activity', async (ctx) => {
  return db.activity.recent(100);
}, { merge: 'latest', max: 100 });

set

Replaces the entire value. Good for counters, status indicators, and aggregated data.

export const stats = live.stream('stats', async (ctx) => {
  return { users: 42, messages: 1337 };
}, { merge: 'set' });

presence

Tracks connected users with join and leave events. Items are keyed by .key.

export const presence = live.stream(
  (ctx, roomId) => 'presence:' + roomId,
  async (ctx, roomId) => [],
  { merge: 'presence' }
);

export const join = live(async (ctx, roomId) => {
  ctx.publish('presence:' + roomId, 'join', { key: ctx.user.id, name: ctx.user.name });
});
<script>
  import { presence } from '$live/room';
  const users = presence(data.roomId);
</script>

{#each $users as user (user.key)}
  <span>{user.name}</span>
{/each}

Events: join (add/update by key), leave (remove by key), set (replace all).

cursor

Tracks cursor positions with update and remove events. Items are keyed by .key.

export const cursors = live.stream(
  (ctx, docId) => 'cursors:' + docId,
  async (ctx, docId) => [],
  { merge: 'cursor' }
);

export const moveCursor = live(async (ctx, docId, x, y) => {
  ctx.publish('cursors:' + docId, 'update', { key: ctx.user.id, x, y, color: ctx.user.color });
});

Events: update (add/update by key), remove (remove by key), set (replace all).

Stream options reference

Option Default Description
merge 'crud' Merge strategy: 'crud', 'latest', 'set', 'presence', 'cursor'
key 'id' Key field for crud mode
prepend false Prepend new items instead of appending (crud mode)
max 50 / 0 Max items to keep. Defaults to 50 for latest, 0 (unlimited) for crud. Oldest items are dropped when exceeded
replay false Enable seq-based replay for gap-free reconnection
args - Standard Schema (Zod / ArkType / Valibot) for stream arguments. Validated before topic resolution - prevents topic injection via malformed dynamic-topic args
transform - (data) => projection applied to BOTH initial-load data (per-item for arrays) AND every live publish for this topic. Ship a wide row from the database, emit a narrow shape on the wire
coalesceBy - (data) => key extractor. Publishes fan out via per-socket sendCoalesced; the latest value for each (topic, key) pair wins. For high-frequency latest-value streams (prices, cursors, presence). Cannot combine with volatile
volatile false Mark messages fire-and-forget. Disables seq stamping for this topic so reconnects with lastSeenSeq won't try to backfill. Wire-level drop-on-backpressure is the adapter's default. For typing indicators, telemetry pings, cursors
staleAfterMs - Per-topic staleness watchdog. If no events arrive for N ms, the loader re-runs and the result broadcasts as a refreshed event. Useful for streams whose source can quietly stop emitting. See Stream lifecycle hooks
invalidateOn - String or array of glob-style topic patterns (e.g. 'todos:*'). When ctx.publish hits a matching topic, the stream's loader reruns and the result broadcasts as a refreshed event. See Stream lifecycle hooks
onError - (err, ctx, topic) per-stream observer. Fires on loader throws (subscribe / stale-reload / .load() SSR). Errors thrown inside are silently swallowed
classOfService - Names a class registered via live.admission(). New subscribes are shed under matching pressure. See Load shedding
onSubscribe - Callback (ctx, topic) fired when a client subscribes
onUnsubscribe - Callback (ctx, topic, remainingSubscribers) fired when a client disconnects. remainingSubscribers counts OTHER WebSockets still on the topic - use it to tear down upstream feeds at zero
filter / access - Per-connection publish filter (see Access control)
delta - Delta sync config (see Delta sync and replay)
version - Schema version (see Schema evolution)
migrate - Migration functions (see Schema evolution)

Reconnection

When the WebSocket reconnects, streams automatically refetch initial data and resubscribe. The store keeps showing stale data during the refetch - it does not reset to undefined.

Mid-flight RPCs reject with DISCONNECTED. If the WS drops while an RPC or mutate is awaiting a response, the corresponding promise rejects with RpcError('DISCONNECTED'). Callers that wrap the call in mutate(asyncOp, change) get auto-rollback for free: the optimistic-queue entry settles as failed, removing the placeholder from the displayed state. Concurrent in-flight mutates each roll back independently - if A and B are both pending when the WS drops, both promises reject and both placeholders are removed in one display recompute.

Catch-up via initial fetch on resubscribe. Once the WS reconnects, every active stream re-runs its loader and broadcasts the result through the same merge strategy used on first subscribe. Pub/sub events that fired during the disconnect window arrive as part of the fresh fetch (they're materialized in the loader's data source). For tighter "no-frame-loss" guarantees use the delta configuration (see Delta sync and replay), which fills small gaps via the per-topic seq-numbered replay buffer and falls back to delta sync for larger gaps.

Triggering reconnect. The next RPC call after the WS drops triggers the adapter's reconnect logic. Most apps don't need to do anything special - the user clicks something, the RPC fires, the adapter reconnects, the call lands. If you want to display a reconnecting banner, watch the status store from svelte-adapter-uws/client for the 'reconnecting' -> 'open' transition.


Connection state stores

Four reactive stores re-export from svelte-realtime/client for rendering connection state without app-side WebSocket plumbing.

<script>
  import { status, failure, quiescent, health } from 'svelte-realtime/client';
</script>

{#if $health === 'degraded'}
  <Banner severity="warn">Real-time updates paused, reconnecting...</Banner>
{:else if $failure?.class === 'TERMINAL'}
  <Banner severity="error">Session expired. <a href="/login">Sign in again</a></Banner>
{:else if $failure?.class === 'THROTTLE'}
  <Banner severity="warn">Server is busy. Reconnecting more slowly...</Banner>
{:else if $status === 'disconnected'}
  <Banner severity="info">Reconnecting...</Banner>
{/if}

{#if !$quiescent}
  <Spinner />
{/if}
Store Type Behavior
status 'connecting' | 'open' | 'suspended' | 'disconnected' | 'failed' Connection state machine. suspended = tab in background; failed = terminal (auth denied or close() called)
failure { kind, class, code, reason } | null Cause of the most recent non-open transition. class is TERMINAL (auth) / EXHAUSTED (max retries) / THROTTLE (4429) / RETRY / AUTH (HTTP preflight). Cleared on next 'open'. Not set on intentional close()
quiescent Readable<boolean> true when every active stream has settled (initial load + all reconnects). Continuous signal - a false -> true transition after a reconnect cycle marks "everything caught up"
health 'healthy' | 'degraded' System-wide health, sourced from degraded / recovered events on the __realtime topic. Stays 'healthy' until something publishes - typically the extensions package's pub/sub bus circuit breaker

failure and quiescent are pure additions; apps that don't use them pay nothing. health lazily subscribes to __realtime only on first read; never reading it = no subscription.

Apps that need richer health detail (reason strings, timestamps) can listen to the topic directly:

import { on } from 'svelte-adapter-uws/client';
on('__realtime').subscribe((envelope) => { /* full payload */ });

Svelte 5 store helpers

Generated $live/* stream stores work out of the box as Svelte 4 Readable<T> values via the $store auto-subscribe syntax. For Svelte 5 apps, two methods are exposed alongside the existing subscribe interface so component code stays terse without reaching for $derived.by(() => $store ?? []) boilerplate.

store.rune() - Svelte 5 reactive object

Returns an object with a single current getter, backed by Svelte's fromStore from svelte/store. Reading current inside an effect or component subscribes via Svelte's createSubscriber for fine-grained reactivity; reading it outside an effect synchronously returns the latest value.

<script>
  import { todos } from '$live/todos';
  const items = todos.rune();
</script>

<p>{items.current?.length ?? 0} items</p>
{#each items.current ?? [] as todo}
  <li>{todo.title}</li>
{/each}

rune() requires Svelte 5 (the fromStore export is not available in Svelte 4) and throws a descriptive error if called against an older runtime. Apps still on Svelte 4 use the $store auto-subscribe syntax instead.

store.map(fn) - per-item projection

Returns a mapped store with the same { subscribe, rune, map } shape as the source. Idiomatic alternative to $derived.by(() => ($stream ?? []).map(...)) and avoids the $derived(() => ...) footgun where storing a function reference instead of its return value silently breaks rendering.

<script>
  import { todos } from '$live/todos';
  const titles = todos.map(t => t.title);
  // Or compose with rune() for Svelte 5:
  const titlesRune = todos.map(t => t.title).rune();
</script>

{#each $titles as title}<li>{title}</li>{/each}

Semantics match the documented ($stream ?? []).map(fn) pattern: a null or undefined source emits []; an array source emits source.map(fn); a non-array source (set-merge stream, paginated wrapper) emits [] after a dev-mode console.warn pointing at the merge-strategy docs. Subscriptions are lazy: the source is only subscribed while at least one mapped consumer is active. Chains via further .map() calls preserve the same shape.

empty - bundled placeholder store

Every generated $live/<name>.js re-exports an empty store that holds undefined. Use it as the fallback for conditional streams without importing readable from svelte/store:

<script>
  import { todos, empty } from '$live/todos';
  let { user, orgId } = $props();
  const items = $derived(user ? todos(orgId) : empty);
</script>

{#each $items ?? [] as todo}
  <li>{todo.title}</li>
{/each}

Auto-imported alongside the stream itself; nothing extra to wire.


Error handling

The data store value never changes shape. It is always your data type or undefined while loading. Errors and connection status live on separate reactive stores so a network failure can never crash your UI:

Property Type Description
$store T | undefined Your data. Never replaced by an error object. On failure, the last loaded value is preserved.
store.error Readable<RpcError | null> Current error, or null when healthy.
store.status Readable<'loading' | 'connected' | 'reconnecting' | 'error'> Connection status.

Handle loading in your template:

{#if $messages === undefined}
  <p>Loading...</p>
{:else}
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}
{/if}

To show errors, subscribe to the .error store:

<script>
  import { messages } from '$live/chat';

  const err = messages.error;
  const status = messages.status;
</script>

{#if $err}
  <p>Error: {$err.message} ({$err.code})</p>
{/if}

{#if $status === 'reconnecting'}
  <p>Reconnecting...</p>
{/if}

Defensive patterns like ($store ?? []).filter(...) work correctly because $store is always an array or undefined.

For RPC calls, errors are thrown as RpcError with a code field:

import { sendMessage } from '$live/chat';

try {
  await sendMessage(text);
} catch (err) {
  if (err.code === 'VALIDATION') {
    // handle validation error - err.issues has details
  } else if (err.code === 'UNAUTHORIZED') {
    // redirect to login
  }
}

Terminal close codes

When the adapter's ready() promise rejects (terminal close codes 1008, 4401, 4403, exhausted retries, or explicit close()), svelte-realtime:

  • Rejects all pending RPCs immediately with RpcError('CONNECTION_CLOSED', ...)
  • Sets .error on all active stream stores (the data value is preserved)
  • Drains the offline queue with errors

RPCs called after a terminal close reject immediately without sending.

Reusable error boundary component

For Svelte 5, you can build a reusable boundary that handles loading and error states:

<!-- src/lib/StreamView.svelte -->
<script>
  /** @type {{ store: any, children: import('svelte').Snippet, loading?: import('svelte').Snippet, error?: import('svelte').Snippet<[any]> }} */
  let { store, children, loading, error } = $props();

  let value = $derived($store);
  const err = store.error;
</script>

{#if value === undefined}
  {#if loading}
    {@render loading()}
  {:else}
    <p>Loading...</p>
  {/if}
{:else if $err}
  {#if error}
    {@render error($err)}
  {:else}
    <p>Error: {$err.message}</p>
  {/if}
{:else}
  {@render children()}
{/if}

Use it to wrap any stream:

<script>
  import StreamView from '$lib/StreamView.svelte';
  import { messages, sendMessage } from '$live/chat';
</script>

<StreamView store={messages}>
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}

  {#snippet loading()}
    <div class="skeleton-loader">Loading messages...</div>
  {/snippet}

  {#snippet error(err)}
    <div class="error-banner">
      <p>Could not load messages: {err.message}</p>
      <button onclick={() => location.reload()}>Retry</button>
    </div>
  {/snippet}
</StreamView>

With default slots, the minimal version is just:

<StreamView store={messages}>
  {#each $messages as msg (msg.id)}
    <p>{msg.text}</p>
  {/each}
</StreamView>

This removes the {#if}/{:else} boilerplate from every page that uses a stream.


Per-module auth

Every file in src/live/ can export a _guard that runs before all functions in that file.

// src/live/admin.js
import { live, guard, LiveError } from 'svelte-realtime/server';

export const _guard = guard((ctx) => {
  if (ctx.user?.role !== 'admin')
    throw new LiveError('FORBIDDEN', 'Admin only');
});

export const deleteUser = live(async (ctx, userId) => {
  await db.users.delete(userId);
});

export const banUser = live(async (ctx, userId) => {
  await db.users.ban(userId);
});

Both deleteUser and banUser require admin access. No need to check in each function.

guard() accepts multiple functions for composable middleware chains. They run in order, and earlier ones can enrich ctx for later ones:

export const _guard = guard(
  (ctx) => { if (!ctx.user) throw new LiveError('UNAUTHORIZED'); },
  (ctx) => { ctx.permissions = lookupPermissions(ctx.user.id); },
  (ctx) => { if (!ctx.permissions.includes('write')) throw new LiveError('FORBIDDEN'); }
);

Dynamic topics

Use a function instead of a string as the first argument to live.stream() for per-entity streams. The client-side stub becomes a factory function - call it with arguments to get a cached store for that entity.

The first argument's shape decides the client export. If the first argument is a string, the export is a Svelte store and you read it as $messages. If the first argument is a function, the export is a factory that returns a store, and you must call it first: const messages = roomMessages(data.roomId); ... $messages. Forgetting the call gives Svelte error: store_invalid_shape - 'roomMessages' is not a store with a 'subscribe' method during SSR. If the topic does not depend on arguments, prefer the string form.

// src/live/rooms.js
import { live } from 'svelte-realtime/server';

export const roomMessages = live.stream(
  (ctx, roomId) => 'chat:' + roomId,
  async (ctx, roomId) => db.messages.forRoom(roomId),
  { merge: 'crud', key: 'id' }
);

export const sendToRoom = live(async (ctx, roomId, text) => {
  const msg = await db.messages.insert({ roomId, userId: ctx.user.id, text });
  ctx.publish('chat:' + roomId, 'created', msg);
  return msg;
});
<!-- src/routes/rooms/[id]/+page.svelte -->
<script>
  import { roomMessages, sendToRoom } from '$live/rooms';
  let { data } = $props();

  // roomMessages is a function - call it with the room ID to get a store
  const messages = roomMessages(data.roomId);
</script>

{#each $messages as msg (msg.id)}
  <p>{msg.text}</p>
{/each}

Same arguments return the same cached store instance. The cache is cleaned up when all subscribers unsubscribe.


Topic registry

Centralize topic strings so the SQL trigger and the stream definition reference one source of truth. defineTopics(map) validates the map at boot and exposes __patterns for tooling.

// src/lib/topics.js
import { defineTopics } from 'svelte-realtime/server';

export const TOPICS = defineTopics({
  audit: (orgId) => `audit:${orgId}`,
  security: (orgId) => `security:${orgId}`,
  systemNotices: 'system:notices'
});

// Stream definitions reference the registry
live.stream((ctx, orgId) => TOPICS.audit(orgId), loadAudit, { merge: 'crud' });

// Tooling reads __patterns to derive shapes
TOPICS.__patterns;
// => { audit: 'audit:{arg0}', security: 'security:{arg0}', systemNotices: 'system:notices' }

Map values can be strings (static topics) or (...args) => string functions (dynamic topics). The helper validates non-empty strings and rejects reserved names (__patterns, __definedTopics). Pattern derivation calls each function with sentinel placeholders ({arg0}, {arg1}, ...) and falls back to '<dynamic>' if the function throws on placeholders or returns a non-string.

Build-time registry check

When the Vite plugin sees a defineTopics({...}) call anywhere under src/, it builds a registry of patterns and validates string-literal topics passed to live.stream(...) and live.channel(...) against it. A literal that does not match any registered pattern triggers a one-shot warning per (file, topic) pair:

[svelte-realtime] src/live/feed.js: live.stream topic 'mistyped-topic' is not in
your TOPICS registry. Either add it to defineTopics({...}) or call TOPICS.<name>(...)
instead of passing a string literal.

The check covers static-string patterns (feed: 'feed:notices') and arrow-return template literals (audit: (orgId) => \audit:${orgId}`); template interpolations match .+so'audit:org-123'and'audit:any-id'both pass against theauditpattern. Function references and other dynamic value shapes are silently skipped at parse time - the warning only fires for literal topics under a confidently parsed registry. If your project does not calldefineTopics` at all, the check is disabled.


Schema validation

Use live.validated(schema, fn) to validate the first argument against a schema before the function runs. Any Standard Schema-compatible validator is supported, including Zod, ArkType, Valibot, and others.

import { z } from 'zod';
import { live } from 'svelte-realtime/server';

const CreateTodo = z.object({
  text: z.string().min(1).max(200),
  priority: z.enum(['low', 'medium', 'high']).optional()
});

export const addTodo = live.validated(CreateTodo, async (ctx, input) => {
  const todo = await db.todos.insert({ ...input, userId: ctx.user.id });
  ctx.publish('todos', 'created', todo);
  return todo;
});

Because live.validated() uses the Standard Schema interface, you can swap in any compatible validator:

import { type } from 'arktype';
import { live } from 'svelte-realtime/server';

const CreateTodo = type({ text: 'string>0', priority: '"low"|"medium"|"high"|undefined' });

export const addTodo = live.validated(CreateTodo, async (ctx, input) => {
  const todo = await db.todos.insert({ ...input, userId: ctx.user.id });
  ctx.publish('todos', 'created', todo);
  return todo;
});

On the client, validated exports work like regular live() calls. Validation errors are thrown as RpcError with code: 'VALIDATION' and an issues array.

Validating stream arguments

Stream arguments validate via the args option on live.stream(). Validation runs BEFORE topic resolution, so a malformed argument can never reach a dynamic topic function.

import { z } from 'zod';

export const auditFeed = live.stream(
  (ctx, orgId) => `audit:${orgId}`,
  async (ctx, orgId) => loadAudit(orgId),
  { args: z.tuple([z.string().uuid()]) }
);

Validation failures reject the subscribe RPC with { code: 'VALIDATION', issues }. Both the WebSocket and .load() SSR paths apply the schema. Coerced values from the schema (e.g. Zod transforms) flow through to the loader and the topic function.


Channels

Ephemeral pub/sub topics with no database initialization. Clients subscribe and receive live events immediately.

// src/live/typing.js
import { live } from 'svelte-realtime/server';

export const typing = live.channel('typing:lobby', { merge: 'presence' });
<script>
  import { typing } from '$live/typing';
</script>

{#each $typing as user (user.key)}
  <span>{user.data.name} is typing...</span>
{/each}

Dynamic channels work the same way:

export const cursors = live.channel(
  (ctx, docId) => 'cursors:' + docId,
  { merge: 'cursor' }
);

For high-frequency streams where a missed frame is acceptable (typing indicators, telemetry pings, raw cursor positions you don't need replayed on reconnect), set volatile: true on the stream:

export const cursors = live.stream(
  (ctx, roomId) => `room:${roomId}:cursors`,
  loader,
  { merge: 'cursor', volatile: true }
);

Two effects: per-event seq stamping is skipped for the topic (so reconnect with lastSeenSeq won't try to backfill), and the option declares intent at the call site. Wire-level "drop on backpressure" is the adapter's default behavior already - uWS auto-skips a subscriber whose outbound buffer is over maxBackpressure (default 64 KB). Cannot combine with coalesceBy (queue vs drop - different intents) or replay (volatile messages aren't buffered for resume).


SSR hydration

Call live functions from +page.server.js to load data server-side, then hydrate the client-side stream store to avoid loading spinners.

// src/routes/chat/+page.server.js
export async function load({ platform, locals }) {
  const { messages } = await import('$live/chat');
  const data = await messages.load(platform, { user: locals.user });
  return { messages: data };
}
<!-- src/routes/chat/+page.svelte -->
<script>
  import { messages } from '$live/chat';
  let { data } = $props();

  // Pre-populate the store with SSR data - no loading spinner
  const msgs = messages.hydrate(data.messages);
</script>

{#each $msgs as msg (msg.id)}
  <p>{msg.text}</p>
{/each}

The hydrated store still subscribes for live updates on first render. It keeps the SSR data visible instead of showing undefined during the initial fetch. Guards still run during .load() calls. Pass { user } as the second argument if your guard or init function needs user data.

For dynamic streams (streams with a topic function), call the stream first to get the store, then hydrate:

// src/routes/team/[id]/+page.server.js
export async function load({ platform, locals, params }) {
  const { invitations } = await import('$live/invitation');
  const data = await invitations.load(platform, { args: [params.id], user: locals.user });
  return { invitations: data };
}
<!-- src/routes/team/[id]/+page.svelte -->
<script>
  import { invitations } from '$live/invitation';
  import { page } from '$app/state';
  let { data } = $props();

  const invites = invitations(page.params.id).hydrate(data.invitations);
</script>

{#each $invites as invite (invite.id)}
  <p>{invite.email}</p>
{/each}

Batching

Group multiple RPC calls into a single WebSocket frame to reduce round trips.

<script>
  import { batch } from 'svelte-realtime/client';
  import { createBoard, addColumn, addCard } from '$live/boards';

  async function setupBoard() {
    const [board, column, card] = await batch(() => [
      createBoard('My Board'),
      addColumn('To Do'),
      addCard('First task')
    ]);
  }
</script>

By default, calls in a batch run in parallel on the server. Pass { sequential: true } when order matters:

const [board, column] = await batch(() => [
  createBoard('My Board'),
  addColumn(boardId, 'To Do')
], { sequential: true });

Each call resolves or rejects independently - one failure does not cancel the others. Batches are limited to 50 calls - enforced both client-side (rejects before sending) and server-side.

Server-side batching

Use ctx.batch() inside RPC handlers to publish multiple messages in a single call:

export const resetBoard = live(async (ctx, boardId) => {
  await db.boards.reset(boardId);
  ctx.batch([
    { topic: `board:${boardId}`, event: 'set', data: [] },
    { topic: `board:${boardId}:presence`, event: 'set', data: [] }
  ]);
});

Optimistic updates

Apply changes to a stream store instantly, then roll back if the server call fails.

<script>
  import { todos, addTodo } from '$live/todos';

  async function add(text) {
    const tempId = 'temp-' + Date.now();
    const rollback = todos.optimistic('created', { id: tempId, text });

    try {
      await addTodo(text);
      // Server broadcasts the real 'created' event, which replaces the
      // optimistic entry (matched by key) with the confirmed data.
    } catch {
      rollback();
    }
  }
</script>

optimistic(event, data) returns a rollback function that restores the store to its previous state. It works with all merge strategies:

Merge Events Behavior
crud created, updated, deleted Modifies array by key. Server event with same key replaces the optimistic entry.
latest any event name Appends data to the ring buffer.
set any event name Replaces the entire value.

Auto-rollback with store.mutate()

store.mutate(asyncOp, optimisticChange) wraps the apply-await-rollback pattern. Applies the optimistic change synchronously, awaits the RPC, and on rejection rolls back and re-throws.

// Event-based: server's confirming event reconciles the placeholder by key
const todo = await todos.mutate(
  () => addTodo(text),
  { event: 'created', data: { id: tempId(), text } }
);

// Free-form mutator: arbitrary local change, no merge-strategy assumptions
await todos.mutate(
  () => removeTodo(id),
  (current) => current.filter((t) => t.id !== id)
);

Returns the result of asyncOp on success. The free-form mutator receives a shallow copy: top-level array shape changes (push, pop, filter, splice) roll back cleanly; in-place item field mutations (draft[0].name = 'x') do NOT, because the draft and the prior items share item references. Replace whole items instead: draft[i] = { ...draft[i], name: 'x' }.

Concurrent mutates roll back independently. Pending mutations are tracked in an in-flight queue and the displayed value is recomputed by replaying that queue against the un-overlaid server state after every server event and every settle. If mutate A and mutate B are both in flight and both fail, the displayed state returns to the latest server state with no phantom traces of either A or B. Server events with a key matching a queue entry's optimistic key absorb the entry, so the typical "client generates UUID, server confirms with same id" flow does not flicker.

RPC-bound shorthand: rpc.createOptimistic()

Every generated RPC stub also exposes a .createOptimistic(store, callArgs, optimisticChange) method. It threads callArgs into the optimistic-change callback so the call site doesn't have to capture them in a closure:

import { sendMessage, messages } from '$live/chat';

await sendMessage.createOptimistic(
  messages,
  ['Hello!'],
  (current, args) => [...current, { id: tempId(), text: args[0] }]
);

callArgs is always passed as an array (so multi-argument RPCs work the same as single-argument ones; pass [arg] for the single-arg case). The third argument accepts the same two shapes as store.mutate(): a (current, args) => newValue function or a { event, data } object. Equivalent to:

store.mutate(() => rpc(...callArgs), wrappedChange);

so behavior on success/rollback/server-confirmation is identical to store.mutate(). The shorthand is purely syntactic; reach for store.mutate() directly when the asyncOp isn't an RPC (third-party API call, multi-step flow, etc.).

Curried form - bind once, call many times. Pass two arguments instead of three (store, change) and createOptimistic returns a callable bound to that store + change:

const optimisticSend = sendMessage.createOptimistic(
  messages,
  (current, args) => [...current, { id: tempId(), text: args[0] }]
);
await optimisticSend('Hello!');
await optimisticSend('There!');

Stream-side spelling - the same flow can be expressed from the stream's perspective via store.createOptimistic(rpc, callArgs, change):

await messages.createOptimistic(
  sendMessage,
  ['Hello!'],
  (current, args) => [...current, { id: tempId(), text: args[0] }]
);

Identical semantics to the RPC-side spelling; pick whichever reads more naturally for your call site (stream-focused code prefers store.createOptimistic; RPC-focused code prefers rpc.createOptimistic).


Stream pagination

For large datasets, return { data, hasMore, cursor } from your stream init function to enable cursor-based pagination.

// src/live/feed.js
import { live } from 'svelte-realtime/server';

export const posts = live.stream('posts', async (ctx) => {
  const limit = 20;
  const rows = await db.posts.list({ limit: limit + 1, after: ctx.cursor });
  const hasMore = rows.length > limit;
  const data = hasMore ? rows.slice(0, limit) : rows;
  const cursor = data.length > 0 ? data[data.length - 1].id : null;
  return { data, hasMore, cursor };
}, { merge: 'crud', key: 'id' });
<script>
  import { posts } from '$live/feed';

  async function loadNext() {
    await posts.loadMore();
  }
</script>

{#each $posts as post (post.id)}
  <p>{post.title}</p>
{/each}

{#if posts.hasMore}
  <button onclick={loadNext}>Load more</button>
{/if}

The server detects the { data, hasMore } shape automatically. ctx.cursor contains the cursor value sent by the client on subsequent loadMore() calls (null on the first request).


Undo and redo

Stream stores support history tracking for undo/redo.

<script>
  import { todos } from '$live/todos';

  todos.enableHistory(100); // max 100 entries

  function handleUndo() {
    todos.undo();
  }
</script>

<button onclick={handleUndo} disabled={!todos.canUndo}>Undo</button>
<button onclick={() => todos.redo()} disabled={!todos.canRedo}>Redo</button>

History is recorded after every mutation (both live events and optimistic updates). Call enableHistory() once to start tracking.


Request deduplication

Identical RPC calls made within the same microtask are automatically coalesced into a single request.

// These two calls happen in the same microtask - only one request is sent
const [a, b] = await Promise.all([
  getUser(userId),
  getUser(userId) // same call, same args - reuses the first request
]);

To bypass deduplication and force a fresh request:

const result = await getUser.fresh(userId); // always sends a new request

Dev-mode coalesce warning. Stress tests and parallel-fan-out patterns like Promise.allSettled(Array.from({ length: 25 }, () => buyProduct('phone'))) expect N wire requests but get one - all 25 promises resolve to the same response, the server-side state shows a single decrement, and the failure is silent. To make this discoverable, the client logs a one-time console.warn on the first coalesce per RPC path per session, with a one-line pointer to .fresh(...). Dev-only (stripped under NODE_ENV=production); double-tap dedup on the same path warns once, never again.

[svelte-realtime] coalesced two or more identical calls to 'shop/buy' within one microtask - only one wire request was sent and all callers received the same response. ... If you wanted N parallel requests (stress test, fan-out), call `.fresh(...args)` on the rpc to bypass dedup. Warned once per path per session.

Idempotency keys

Microtask deduplication only collapses calls within the same tick. For durable safety against retries that span reconnects, tab reloads, or offline replay, wrap the handler with live.idempotent(config, fn). Identical calls (by key) return the cached result without re-running the handler.

// Server: server-derived key (Stripe-style)
import { live } from 'svelte-realtime/server';

export const createOrder = live.idempotent(
  { keyFrom: (ctx, input) => `order:${ctx.user.id}:${input.clientOrderId}`, ttl: 48 * 3600 },
  live.validated(OrderSchema, async (ctx, input) => {
    return db.orders.insert({ userId: ctx.user.id, ...input });
  })
);
// Client: envelope-supplied key (uuid per intent)
import { createOrder } from '$live/orders';

const intentId = crypto.randomUUID();
const order = await createOrder.with({ idempotencyKey: intentId })(payload);

Resolution: keyFrom(ctx, ...args) if defined, otherwise the client envelope's idempotencyKey, otherwise the wrapper is a no-op. Only successful results cache; throwing handlers abort the slot so the next caller re-runs. Default store is in-process and bounded; for multi-instance deployments pass store: createIdempotencyStore(redis) from svelte-adapter-uws-extensions.

Field name divergence with live.lock. live.idempotent uses keyFrom; live.lock uses key (which accepts a string OR a function). Mistakenly mirroring the wrong helper's shape used to silently fall through to the no-key bypass, breaking the one-per-key guarantee with no warning. Unknown config fields now throw at registration time with a cross-helper hint:

[svelte-realtime] live.idempotent: unknown config field 'key'. Allowed: keyFrom, store, ttl. Hint: live.lock uses 'key' but live.idempotent uses 'keyFrom' (the names diverged historically).
Option Default Description
keyFrom - (ctx, ...args) => string | null | undefined. null/undefined falls back to the envelope key
store in-process Any object exposing acquire(key, ttlSec) matching the extensions store contract
ttl 172800 (48h) TTL in seconds. 0 skips the cache write (concurrent waiters re-run after the first finishes)

__rpc().with({ ... }) composes options on the same surface:

// Compose idempotency + per-call timeout
await createOrder.with({ idempotencyKey: id, timeout: 60_000 })(payload);
.with({}) option Description
idempotencyKey Carried in the envelope for the server's live.idempotent wrapper
timeout Per-call timeout in ms; overrides the global configure({ timeout }) default. Sleep-detect threshold scales with the override

Offline queue

Queue RPC calls when the WebSocket is disconnected and replay them on reconnect.

import { configure } from 'svelte-realtime/client';

configure({
  offline: {
    queue: true,        // enable offline queuing
    maxQueue: 100,      // drop oldest if queue exceeds this (default: 100)
    maxAge: 60000,      // auto-reject queued calls older than this (ms)
    beforeReplay(call) {
      // Return false to drop stale mutations
      return Date.now() - call.queuedAt < 60000; // drop if older than 1 minute
    },
    onReplayError(call, error) {
      console.warn('Replay failed:', call.path, error);
    }
  }
});

When offline queuing is enabled, RPC calls made while disconnected return promises that resolve when the call is replayed after reconnection. If the queue overflows, the oldest entry is dropped and its promise rejects with QUEUE_FULL. If maxAge is set, queued calls older than that threshold are rejected with STALE at replay time.


Connection hooks

Use configure() on the client to react to WebSocket connection state changes.

<!-- src/routes/+layout.svelte -->
<script>
  import { configure } from 'svelte-realtime/client';

  configure({
    onConnect() {
      // Reconnected after a drop
      invalidateAll();
    },
    onDisconnect() {
      showBanner('Connection lost, reconnecting...');
    }
  });
</script>

Call configure() once at app startup. The hooks fire on state transitions only (not on the initial connection).

Option Description
url Full WebSocket URL for cross-origin or native app usage (e.g. 'wss://api.example.com/ws')
auth true (or a custom path) to enable an HTTP preflight before each WebSocket upgrade so cookies set by the server's authenticate hook ride a normal HTTP response. Required behind Cloudflare Tunnel and other proxies that drop Set-Cookie on 101 responses. Requires svelte-adapter-uws >= 0.4.12.
onConnect() Called when the WebSocket connection opens after a reconnect
onDisconnect() Called when the WebSocket connection closes
beforeReconnect() Called before each reconnection attempt (can be async)

Cross-origin and native app usage

When using svelte-realtime from a client that runs on a different origin (Svelte Native, React Native, or any standalone app), pass the url option to point at your SvelteKit backend:

import { configure, __rpc, __stream } from 'svelte-realtime/client';

configure({ url: 'wss://my-sveltekit-app.com/ws' });

// Call a live function (equivalent to $live/chat.sendMessage, but untyped)
const sendMessage = __rpc('chat/sendMessage');
await sendMessage('hello');

// Subscribe to a stream (returns a Svelte store)
const messages = __stream('chat/messages', { merge: 'crud', key: 'id' });
messages.subscribe((value) => console.log(value));

The typed $live/* imports and stream hydration are generated by the Vite plugin and only work inside a SvelteKit project. Outside SvelteKit, use __rpc() and __stream() directly. You get the same reconnection, offline queue, and batching - just without codegen and types.

When url is set, the default same-origin WebSocket URL is bypassed entirely. Requires svelte-adapter-uws 0.4.8+.

Browser clients authenticate via cookies set during login. Native clients typically use a token instead. Your upgrade hook can support both:

// src/hooks.ws.js
export { message } from 'svelte-realtime/server';

export function upgrade({ cookies, url }) {
  // Browser - cookie auth
  const session = cookies.session_id;
  if (session) return validateSession(session);

  // Native app - token auth via query string
  const token = new URL(url, 'http://n').searchParams.get('token');
  if (token) return validateToken(token);

  return false;
}

The native client passes the token in the URL:

configure({ url: 'wss://my-sveltekit-app.com/ws?token=...' });

Refreshing session cookies on connect (Cloudflare Tunnel and friends)

Cloudflare Tunnel and other strict edge proxies silently drop the Set-Cookie header on WebSocket 101 Switching Protocols responses. The connection appears to open server-side, then the client immediately sees close 1006 and never receives a single frame. The classic symptom for this in production: WebSockets work locally and on a bare server, then break the moment you put Cloudflare in front.

Fix it in three pieces:

  1. Export an authenticate hook from src/hooks.ws.{js,ts}. It runs as a normal HTTP POST /__ws/auth before every upgrade (including reconnects), so cookies you set ride a 204 No Content response that proxies route correctly.
  2. Opt into the client preflight with configure({ auth: true }).
  3. Use svelte-adapter-uws >= 0.4.12.
// src/hooks.ws.js
export { message, close, unsubscribe } from 'svelte-realtime/server';

export function upgrade({ cookies }) {
  const session = validateSession(cookies.session_id);
  return session ? { id: session.userId, name: session.name } : false;
}

export function authenticate({ cookies }) {
  const session = validateSession(cookies.get('session_id'));
  if (!session) return false;

  if (shouldRotate(session)) {
    cookies.set('session_id', rotate(session), {
      httpOnly: true,
      secure: true,
      sameSite: 'lax',
      path: '/'
    });
  }
  return { id: session.userId, name: session.name };
}
<!-- src/routes/+layout.svelte -->
<script>
  import { configure } from 'svelte-realtime/client';
  configure({ auth: true });
</script>

The client coalesces concurrent connects into a single in-flight preflight, treats 4xx as terminal, and falls back to normal reconnect backoff on 5xx and network errors.

Detector: if the client sees two consecutive WebSocket open->close cycles inside one second with no traffic, it logs a one-shot console.warn pointing at this section. That is the Cloudflare-Tunnel-eating-cookies fingerprint.


SvelteKit transport

realtimeTransport() from svelte-realtime/hooks is a SvelteKit transport-hook preset that auto-registers RpcError and LiveError serialization across the SSR / client boundary. Without it, typed errors thrown from +page.server.js load() arrive at +error.svelte as plain Error instances and lose their code field.

// src/hooks.js  (NOT hooks.server.js - the shared hook is required)
import { realtimeTransport } from 'svelte-realtime/hooks';

export const transport = realtimeTransport();

Compose with app-defined types via the optional extras parameter (user entries appear after defaults so they win on key conflict):

// src/hooks.js
import { realtimeTransport } from 'svelte-realtime/hooks';
import { Vector } from '$lib/geometry';

export const transport = realtimeTransport({
  Vector: {
    encode: (v) => v instanceof Vector && [v.x, v.y],
    decode: ([x, y]) => new Vector(x, y)
  }
});

Wire from src/hooks.js, NOT hooks.server.js. SvelteKit's transport primitive needs both encode (server-side) and decode (client-side hydration) visible at build time - the wrong file silently half-works (encode runs but decode never reaches the client). RpcError's optional issues field (carried by live.validated() failures) survives the round-trip. Validation runs at registration: malformed extras throw immediately so misconfiguration fails fast at app boot.


Combine stores

Compose multiple stream stores into a single derived store. When any source updates, the combining function re-runs.

<script>
  import { combine } from 'svelte-realtime/client';
  import { orders, inventory } from '$live/dashboard';

  const dashboard = combine(orders, inventory, (o, i) => ({
    pendingOrders: o?.filter(x => x.status === 'pending').length ?? 0,
    lowStock: i?.filter(x => x.qty < 10) ?? []
  }));
</script>

<p>Pending: {$dashboard.pendingOrders}</p>

combine() accepts 2-6 stores with typed overloads, plus a variadic fallback for more. Zero network overhead - all computation happens client-side.


Global middleware

Use live.middleware() to register cross-cutting logic that runs before per-module guards on every RPC and stream call.

import { live, LiveError } from 'svelte-realtime/server';

// Logging middleware
live.middleware(async (ctx, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`[${ctx.user?.id}] took ${Date.now() - start}ms`);
  return result;
});

// Auth middleware - rejects unauthenticated requests globally
live.middleware(async (ctx, next) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED', 'Login required');
  return next();
});

Middleware runs in registration order. Each must call next() to continue the chain. When no middleware is registered, there is zero overhead.


Throttle and debounce

Use ctx.throttle() and ctx.debounce() inside any live() function to rate-limit publishes.

export const updatePosition = live(async (ctx, x, y) => {
  // Throttle: publishes immediately, then at most once per 50ms (trailing edge guaranteed)
  ctx.throttle('cursors', 'update', { key: ctx.user.id, x, y }, 50);
});

export const saveSearch = live(async (ctx, query) => {
  // Debounce: waits for 300ms of silence before publishing
  ctx.debounce('search:' + ctx.user.id, 'set', { query }, 300);
});

ctx.throttle publishes the first call immediately, stores subsequent calls, and sends the last value when the interval expires (trailing edge). ctx.debounce resets the timer on each call and only publishes after silence.


Stream lifecycle hooks

Use onSubscribe and onUnsubscribe in stream options to run logic when clients join or leave a stream.

export const presence = live.stream('room:lobby', async (ctx) => {
  return db.presence.list('lobby');
}, {
  merge: 'presence',
  onSubscribe(ctx, topic) {
    ctx.publish(topic, 'join', { key: ctx.user.id, name: ctx.user.name });
  },
  onUnsubscribe(ctx, topic, remainingSubscribers) {
    ctx.publish(topic, 'leave', { key: ctx.user.id });
    if (remainingSubscribers === 0) stopUpstreamFeed(topic);
  }
});

onSubscribe fires after ws.subscribe(topic) and the initial data fetch. onUnsubscribe fires in real time when a client unsubscribes from a topic (adapter 0.4.0+), and also when the WebSocket closes for any remaining topics. Export both hooks from your hooks.ws.js:

export { message, close, unsubscribe } from 'svelte-realtime/server';

onUnsubscribe fires for both static and dynamic topics. For dynamic topics, the server tracks which stream produced each subscription and fires the correct hook. The unsubscribe hook fires as soon as the client drops a topic; close only fires for topics still active at disconnect time. There is no double-firing.

The third argument remainingSubscribers counts OTHER WebSockets still holding a realtime-stream subscription to the topic after the current one drops. Use it to tear down upstream feeds (CDC connections, polling loops, external pub/sub follows) when the count reaches zero. Existing 2-argument (ctx, topic) => ... handlers continue to work; the third arg is silently ignored.

Staleness watchdog and per-stream onError

Streams whose underlying source can quietly stop emitting (CDC drops, polling stalls, upstream cache evicts the key) declare staleAfterMs to arm a per-topic watchdog. Every ctx.publish to the topic resets the timer; if no events arrive for the configured duration, the realtime layer re-runs the loader and broadcasts the result as a refreshed event. The client merges refreshed as a full-state replacement across every merge strategy.

export const auditFeed = live.stream(
  (ctx, orgId) => `audit:${orgId}`,
  async (ctx, orgId) => loadAudit(orgId),
  {
    merge: 'crud',
    key: 'id',
    staleAfterMs: 30_000,
    onError: (err, ctx, topic) => log.warn({ err, topic }, 'audit stream error')
  }
);

Watchdog state is per-topic. Multiple subscribers share one timer; the timer arms on the first subscribe and clears when the last subscriber leaves. The reload uses the first subscriber's ctx and args, which is correct for shared topics since the loader's output is identical regardless of which subscriber's ctx triggers it.

onError(err, ctx, topic) is observer-only. It fires on loader throws across three paths - the initial subscribe, the staleness-driven reload, and the .load() SSR path. Errors thrown inside onError are silently swallowed so a buggy logger never breaks the original error path. Apps that want a topic-scoped degraded signal can publish a system event from inside the handler:

onError: (err, ctx, topic) => {
  ctx.publish(`__system:${topic}`, 'degraded', { reason: err.message });
}

Topic-driven invalidation

For mutations whose effects don't fit the merge-strategy model cleanly (bulk operations, server-side recomputation, cascading writes), declare an invalidateOn pattern so any matching ctx.publish triggers a loader rerun:

export const todos = live.stream('todos', loadTodos, {
  merge: 'crud',
  invalidateOn: 'todos:*'
});

// Anywhere in your live functions:
ctx.publish('todos:bulk-imported', 'created', { count: 42 });
// -> matches 'todos:*', the todos loader reruns, the result is broadcast
//    as a 'refreshed' event, and every subscriber gets the new state.

invalidateOn accepts a single string or an array of strings. * is a wildcard that matches any sequence of one or more characters; other regex specials are escaped. Multiple patterns are OR-ed (any match triggers the reload).

The reload reuses the staleness machinery: it captures the first subscriber's ctx + args, applies any configured init transform, and publishes a refreshed event to the stream's own topic. The client merges refreshed as a full-state replacement.

refreshed events are themselves excluded from the invalidation check, so a stream whose own topic happens to match its invalidateOn pattern (e.g., 'todos*' matching topic 'todos') won't loop. Concurrent triggers while a reload is in flight are deduped via a per-watcher reloading flag.

Errors thrown by the loader during an invalidateOn reload route through the same onError(err, ctx, topic) observer as staleness-driven reloads.


Access control

Use the filter / access option on live.stream() to control who can subscribe. The predicate receives ctx and is checked once at subscription time. If it returns false, the subscription is denied with { ok: false, code: 'FORBIDDEN', error: 'Access denied' } and no data is sent. For per-event projection or filtering on a live stream, use the transform option on live.stream({ transform }).

import { live } from 'svelte-realtime/server';

// Only admins can subscribe
export const adminFeed = live.stream('admin-feed', async (ctx) => {
  return db.adminEvents.recent();
}, {
  merge: 'crud',
  access: (ctx) => ctx.user?.role === 'admin'
});

// Role-based: different roles get different access
export const items = live.stream('items', async (ctx) => {
  return db.items.all();
}, {
  merge: 'crud',
  access: live.access.role({
    admin: true,
    viewer: false
  })
});

For per-user data isolation, use dynamic topics so each user subscribes to their own topic:

// Each user gets their own topic - no cross-user data leakage
export const myOrders = live.stream(
  (ctx) => `orders:${ctx.user.id}`,
  async (ctx) => db.orders.forUser(ctx.user.id),
  { merge: 'crud', key: 'id' }
);
Helper Description
live.access.owner(field?) Subscription allowed if ctx.user[field] is present (default: 'id')
live.access.team() Subscription allowed if ctx.user.teamId is present
live.access.role(map) Role-based: { admin: true, viewer: (ctx) => ... }
live.access.org(opts?) Subscription allowed if args[0] matches ctx.user.organization_id. Configurable via { from, orgField }
live.access.user(opts?) Subscription allowed if args[0] matches ctx.user.user_id. Configurable via { from, userField }
live.access.any(...predicates) OR: any predicate returning true allows the subscription
live.access.all(...predicates) AND: all predicates must return true

live.access.org and live.access.user follow the SQL [table]_id convention. Override the field for non-default user shapes:

// Stream that takes an orgId arg and verifies the caller belongs to that org
export const auditFeed = live.stream(
  (ctx, orgId) => `audit:${orgId}`,
  async (ctx, orgId) => loadAudit(orgId),
  { access: live.access.org() }
);

// Custom user-shape (e.g. SvelteKit locals.user with camelCase)
access: live.access.org({ orgField: 'organizationId' })

Authentication shorthand on guards

guard() accepts an options object alongside the existing variadic-function shape. { authenticated: true } rejects calls with no ctx.user as UNAUTHENTICATED:

// src/live/_guard.js
import { guard } from 'svelte-realtime/server';

// Bare authenticated check
export const _guard = guard({ authenticated: true });

// Compose with custom predicates
export const _guard = guard({ authenticated: true }, (ctx) => ctx.user.role === 'admin');

Bare Error throws from any guard auto-classify: thrown errors against an anonymous user produce LiveError('UNAUTHENTICATED'); thrown errors with a user produce LiveError('FORBIDDEN'). Original errors travel on .cause for server-side logging. Throw LiveError(code, message) explicitly to control the wire-visible code and message verbatim.

live.scoped(predicate, fn)

Wrap an RPC handler with a per-call access predicate that throws UNAUTHENTICATED (no user) or FORBIDDEN (predicate rejects). Composes with live.validated, live.rateLimit, etc.

export const editOrgSettings = live.scoped(
  live.access.org(),
  live.validated(SettingsSchema, async (ctx, orgId, patch) => {
    return db.orgs.update(orgId, patch);
  })
);

For streams, use the access option directly. live.scoped is the RPC equivalent.

Subscribe-denial codes on stream stores

When a server-side subscribe denial fires (guard rejection, access predicate, rate limit, invalid topic), the typed reason flows through the stream store's error slot as an RpcError with the canonical code:

<script>
  import { auditFeed } from '$live/audit';
  const err = auditFeed.error;
</script>

{#if $err?.code === 'UNAUTHENTICATED'}
  <p>Please sign in to view audit history.</p>
{:else if $err?.code === 'FORBIDDEN'}
  <p>You don't have access to this organization's audit log.</p>
{:else if $err?.code === 'RATE_LIMITED'}
  <p>Too many requests. Please wait a moment.</p>
{:else if $err}
  <p>Audit feed unavailable: {$err.message}</p>
{/if}

Custom denial reasons returned from a server-side subscribe hook (e.g. 'KYC_PENDING') flow through verbatim as the code field. The same denial fans out to every stream subscribed to the topic.


Rate limiting

Per-function rate limiting

Use live.rateLimit() to apply a sliding window rate limiter to a single function:

export const sendMessage = live.rateLimit({ points: 5, window: 10000 }, async (ctx, text) => {
  const msg = await db.messages.insert({ userId: ctx.user.id, text });
  ctx.publish('messages', 'created', msg);
  return msg;
});

Registry-level rate limiting

For the common case of "a default for everyone, with a few path overrides and a few exemptions" you can configure the registry once at startup with live.rateLimits():

// hooks.ws.js or any startup module
import { live } from 'svelte-realtime/server';

live.rateLimits({
  default: { points: 200, window: 10_000 },
  overrides: {
    'chat/sendMessage': { points: 50, window: 10_000 },
    'orders/create':    { points: 5,  window: 60_000 }
  },
  exempt: ['presence/moveCursor', 'cursor/move']
});

Resolution order per call: exempt -> per-handler live.rateLimit(...) wrapping (explicit wins over central) -> overrides[path] -> default -> none. Stream subscribes are not rate-limited by this primitive. Pass null to clear the registry.

Global rate limiting with Redis

Use the beforeExecute hook with the rate limit extension for global per-connection throttling:

import { createMessage, LiveError } from 'svelte-realtime/server';
import { createRedis, createRateLimit } from 'svelte-adapter-uws-extensions/redis';

const redis = createRedis();
const limiter = createRateLimit(redis, { points: 30, interval: 10000 });

export const message = createMessage({
  async beforeExecute(ws, rpcPath) {
    const { allowed, resetMs } = await limiter.consume(ws);
    if (!allowed)
      throw new LiveError('RATE_LIMITED', `Too many requests. Retry in ${Math.ceil(resetMs / 1000)}s`);
  }
});

Load shedding

Under sustained pressure, drop low-priority traffic before it reaches the handler. live.admission(config) registers named pressure rules; ctx.shed(className) checks them; the classOfService stream option gates new subscribes automatically.

import { live } from 'svelte-realtime/server';

// Register classes once at startup
live.admission({
  classes: {
    background: ['PUBLISH_RATE', 'SUBSCRIBERS', 'MEMORY'],   // shed under any pressure
    nonCritical: ['MEMORY'],                                  // shed only on memory pressure
    realtime: (snapshot) => snapshot.publishRate > 8000      // custom predicate
  }
});

// Stream gating: new subscribes shed under matching pressure
export const browseList = live.stream('browse:list', loader, {
  merge: 'crud',
  classOfService: 'background'
});

// RPC gating: per-call decision
export const expensiveSearch = live(async (ctx, query) => {
  if (ctx.shed('background')) {
    throw new LiveError('OVERLOADED', 'Server is busy, try again shortly');
  }
  return search(query);
});

platform.pressure.reason is a precedence-ordered enum (MEMORY > PUBLISH_RATE > SUBSCRIBERS > NONE); class rules using a string-array match if the active reason is in the array. Predicate rules receive the full pressure snapshot. Existing subscribers are unaffected - shedding applies to NEW subscribes and RPC calls only.

live.admission() validates rules at registration; unknown reasons throw with a [svelte-realtime]-prefixed error so typos fail fast at boot. Pass null to clear.


Concurrency control

live.lock(keyOrConfig, fn) serializes concurrent RPC calls that resolve to the same key. Composes with live.validated, live.idempotent, live.rateLimit, etc.

// Per-org leaderboard recompute: one in-flight per org, others wait for the result
export const recomputeLeaderboard = live.lock(
  (ctx) => `leaderboard:${ctx.user.organization_id}`,
  async (ctx) => {
    const rows = await db.expensive.recompute(ctx.user.organization_id);
    ctx.publish(`org:${ctx.user.organization_id}:leaderboard`, 'set', rows);
    return rows;
  }
);

// Static key (single global section)
export const rebuildSearchIndex = live.lock('search-index-rebuild', async (ctx) => {
  return rebuildIndex();
});

// Distributed lock via the extensions package
import { createDistributedLock } from 'svelte-adapter-uws-extensions/redis/lock';
const distributedLock = createDistributedLock(redis);

export const settleInvoice = live.lock(
  { key: (ctx, id) => `invoice:${id}`, lock: distributedLock },
  live.validated(InvoiceIdSchema, async (ctx, id) => settle(id))
);

Concurrent callers wait in FIFO order for the holder's result. A null / undefined / empty key bypasses the lock (the handler runs unguarded for that call). Custom lock implementations need a single method: withLock(key, fn, opts?) -> Promise<result>.

Default lock is in-process and bounded. For multi-instance deployments, pass lock: createDistributedLock(redis) from the extensions package - any object exposing the withLock(key, fn, opts?) contract works.

Bounded wait with maxWaitMs

Pass maxWaitMs (in the config-object form) to bound how long a queued caller will wait before giving up. On timeout the wrapper rejects with LiveError('LOCK_TIMEOUT', ...) so the client receives a typed error with .code === 'LOCK_TIMEOUT' (plus .key and .maxWaitMs fields for observability).

export const settleInvoice = live.lock(
  { key: (ctx, id) => `invoice:${id}`, maxWaitMs: 5000 },
  async (ctx, id) => settle(id)
);

The current holder is not interrupted when a waiter times out - only the waiting caller gives up. Subsequent waiters on the same key are unaffected and continue in their original order. This is the right primitive for "fail fast under contention" rather than "cancel work in flight."

For custom lock implementations, the option is forwarded as the third argument: lockInst.withLock(key, fn, { maxWaitMs }). The default in-process lock and createDistributedLock from the extensions package both honor it.


Server-initiated push

live.push({ userId }, event, data, options?) sends a server-initiated request to a connected user and awaits the reply. Routes through a per-instance userId -> WebSocket registry maintained by a small pair of hooks.

Trust model: live.push and live.notify are server-trust primitives. The userId you pass is whatever you pass; the framework does NOT check that the calling context is allowed to address that user. If your call site interpolates a wire-supplied value, read Trust model: target.userId is whatever the caller passes below before shipping.

// hooks.ws.js - wire the registry once
import { pushHooks } from 'svelte-realtime/server';

export const open = pushHooks.open;
export const close = pushHooks.close;
// Anywhere on the server (admin RPC, cron, webhook receiver, etc.)
import { live } from 'svelte-realtime/server';

const reply = await live.push(
  { userId: 'u-123' },
  'confirm-delete',
  { itemId: 42 },
  { timeoutMs: 30_000 }
);
if (reply.confirmed) await actuallyDelete(42);
<!-- Client: register handlers per event -->
<script>
  import { onPush } from 'svelte-realtime/client';

  onPush('confirm-delete', async ({ itemId }) => {
    return { confirmed: confirm(`Delete item ${itemId}?`) };
  });
</script>

The default identifier reads ws.getUserData()?.user_id ?? ws.getUserData()?.userId. Override for custom userData shapes:

import { live } from 'svelte-realtime/server';
live.configurePush({ identify: (ws) => ws.getUserData()?.account?.id });

Rejects with a typed LiveError so callers can discriminate via err.code instead of string-matching err.message:

err.code When
VALIDATION Bad target / event / options / timeoutMs (caught at the call site, before any wire I/O).
NOT_FOUND No connection registered for the userId (and no remoteRegistry is configured).
TIMEOUT Recipient did not reply within timeoutMs (default 5000ms).
import { live, LiveError } from 'svelte-realtime/server';

try {
  const reply = await live.push({ userId }, 'confirm-delete', { itemId }, { timeoutMs: 8000 });
  // ...use reply...
} catch (err) {
  if (err instanceof LiveError) {
    switch (err.code) {
      case 'NOT_FOUND': /* user has no active connection */ break;
      case 'TIMEOUT':   /* recipient saw the prompt but didn't respond */ break;
      case 'VALIDATION': /* bad arguments - programmer error, surface in dev only */ break;
    }
  }
  // Anything else is either a recipient-thrown handler error (caller-defined
  // shape) or `Error('connection closed')` from the adapter - those pass
  // through unchanged.
}

Message text on the wrapped TIMEOUT is preserved verbatim from the underlying primitive ('request timed out'), so existing substring-matching callers keep working while they migrate to the structured err.code.

Multi-device users see most-recent-connection-wins routing within each instance, and cluster-wide most-recent-wins via the registry's Redis hash when cluster routing is configured (see Cluster routing below). Older connections still receive topic publishes via their own subscriptions; only push routing flips. Anonymous connections (identify returning null/undefined) are silently skipped at registration so they cannot be push targets.

onPush(event, handler) multiplexes multiple events over the adapter's single onRequest channel. Returning a value sends it as the reply; throwing rejects the server-side promise. Returns an unsubscribe function.

Trust model: target.userId is whatever the caller passes

live.push and live.notify are server-trust primitives. Calling live.push({ userId: someId }, event, data) delivers event to whichever connection is registered under someId, full stop. The framework does NOT check that the server context invoking the push is allowed to address that user. It cannot - your authorization model lives outside the framework.

This matters most when a message handler interpolates a wire-supplied target:

// BUG: `msg.to` is client-controlled. Any authenticated user can push
// arbitrary events to any other user - including admins, including
// users in other tenants.
export const dm = live(async (ctx, msg) => {
  await live.notify({ userId: msg.to }, 'message', {
    from: ctx.user.id,
    text: msg.text
  });
});

The exploit shape is one line of client code: dm.invoke({ to: 'admin-1', text: 'fake admin notification' }). The fix is a tiny ownership check at the handler boundary - keep it generic and reuse it everywhere the handler routes to a userId from the wire:

import { live, RpcError } from 'svelte-realtime/server';

function mustOwnUser(ctx, targetUserId) {
  if (ctx.user?.id === targetUserId) return;        // self-targeted (allowed)
  if (ctx.user?.role === 'admin') return;           // admins can address anyone
  if (sameTenant(ctx.user, targetUserId)) return;   // tenant peers
  throw new RpcError('FORBIDDEN', 'You cannot push to that user');
}

export const dm = live(async (ctx, msg) => {
  mustOwnUser(ctx, msg.to); // <-- decision lives here, not in live.push
  await live.notify({ userId: msg.to }, 'message', {
    from: ctx.user.id,
    text: msg.text
  });
});

The rule of thumb: every userId you hand to live.push / live.notify must come from a value the server trusts. Safe sources:

  • ctx.user.id - you put it there in upgrade(); the framework guarantees provenance.
  • A database row your server just looked up (the DB is server-trusted).
  • A cron job's iteration target (cron payloads are server-authored).
  • A webhook payload after you've verified the source and confirmed it's allowed to address that userId. The userId field of an unverified third-party webhook is just another wire value.

Unsafe sources without an ownership check:

  • msg.targetUserId, payload.to, args.recipient - any field that arrived on the WebSocket wire.
  • searchParams.get('user') on a webhook endpoint - same shape, different transport.
  • A row id chosen by a client even if you fetched the row server-side - the row is trusted, but the choice of WHICH row to fetch was not.

The same contract applies to any future push-target shape ({ group, role, tenant }, etc.): the framework treats the target as an instruction, not as an authorization claim. Authorization is your handler's job; the framework is the delivery primitive.

Fire-and-forget: live.notify

For server-initiated events where you don't need a reply - progress notifications, "upload complete" pings, "new message available" hints, cron-driven price ticks fanned out to many users - use live.notify(target, event, data) instead of live.push:

// Inside an upload completion handler:
live.notify({ userId: upload.userId }, 'upload:complete', { id: upload.id });
// Returns Promise<void> immediately. No await needed.
// If the user is offline, silently drops - they'll see the result on next page load.

The wire path is identical to live.push: the client's onPush(event, handler) still fires for the event. The difference is caller-side - live.notify returns immediately without waiting for the handler's return value, and it never rejects in normal operation (offline user, timeout, client handler error - all silent). Validation throws synchronously for programming errors (bad target, empty event name) so those still surface loud at the call site.

Don't use live.push({ timeoutMs: 0 }) for fire-and-forget. It throws synchronously since timeoutMs must be positive. Wrapping the throw in .catch(() => {}) silently swallows it - the push never fires, the recipient never sees anything. Use live.notify instead.

Cluster routing

For multi-instance deploys, wire the connection registry from svelte-adapter-uws-extensions so a live.push originating on any instance reaches the user's owning instance. With svelte-adapter-uws >= 0.5.0-next.15, the init({ platform }) hook is the recommended call site - the Redis client is connected by then and the registry is wired before the first upgrade / open runs:

// hooks.ws.js
import { live } from 'svelte-realtime/server';
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';

const redis = createRedisClient({ url: process.env.REDIS_URL });
const registry = createConnectionRegistry(redis, {
  identify: (ws) => ws.getUserData()?.userId
});

export function init({ platform }) {
  // Tell live.push to fall back to the registry for cross-instance lookups.
  live.configurePush({ remoteRegistry: registry });
}

// Wire the registry's own connection hooks (NOT pushHooks.* in this mode --
// the registry tracks ownership in Redis and short-circuits same-instance
// requests internally).
export const open = registry.hooks.open;
export const close = registry.hooks.close;

Lookup order inside live.push:

  1. Local registry - the per-instance Map populated by pushHooks.open / pushHooks.close. Resolves directly via platform.request(ws, ...) with no I/O.
  2. Remote registry - when configured via live.configurePush({ remoteRegistry }), used as a fallback when the userId is not registered locally. The extensions registry looks up the owning instance in Redis and either short-circuits to a local platform.request or forwards the envelope on a per-instance push channel and awaits the reply.

Errors with a remote registry come from the registry layer: typically an offline rejection when the user has no active connection cluster-wide, a timeout when routing succeeded but the client did not reply within timeoutMs, or a propagated handler error from the receiving instance. The realtime layer translates "timed out" rejections from either path into LiveError('TIMEOUT') so callers see the same code regardless of whether the timeout originated in the local adapter primitive or the remote registry; offline / handler-error shapes pass through unchanged so the caller can distinguish them.

You can wire BOTH (pushHooks.* + remoteRegistry); the local Map wins when an entry is present and the remote registry is consulted only as fallback. In practice pick one of the two patterns - the registry-only setup is simpler and the registry already does same-instance short-circuit on its own.

Single-instance setups without a registry continue to throw LiveError('NOT_FOUND') for unknown userIds, unchanged.


Prometheus metrics

Opt-in instrumentation for RPC calls, stream subscriptions, and cron executions. Zero overhead if not called.

live.metrics(registry) is a one-time setup call. The top of src/hooks.ws.{js,ts} is a natural place, since it loads once when the server boots. Pair it with the createMetrics() registry from svelte-adapter-uws-extensions/prometheus:

// src/hooks.ws.js
import { live } from 'svelte-realtime/server';
import { createMetrics } from 'svelte-adapter-uws-extensions/prometheus';

const metrics = createMetrics();

live.metrics({
  counter:   ({ name, help, labelNames }) => metrics.counter(name, help, labelNames),
  histogram: ({ name, help, labelNames }) => metrics.histogram(name, help, labelNames),
  gauge:     ({ name, help, labelNames }) => metrics.gauge(name, help, labelNames)
});

export { message, close, unsubscribe } from 'svelte-realtime/server';

Mount the metrics endpoint on your uWS app (typically in svelte.config.js or wherever you build the app):

app.get('/metrics', metrics.handler);

The six-line shim adapts realtime's options-object call shape to the extensions registry's positional create methods. Once a metric is registered, every increment, observation, and gauge update flows directly to the extensions registry, so the emitted Prometheus output is exactly what metrics.serialize() produces.

Registered metrics

  • svelte_realtime_rpc_total - RPC call count by path and status
  • svelte_realtime_rpc_duration_seconds - RPC latency by path
  • svelte_realtime_rpc_errors_total - RPC errors by path and code
  • svelte_realtime_stream_subscriptions - active stream subscription gauge
  • svelte_realtime_cron_total - cron execution count by path and status
  • svelte_realtime_cron_errors_total - cron errors by path
  • svelte_realtime_assertion_violations_total - production-assertion violations by category (see "Production assertions" below)

Production assertions

svelte-realtime instruments each internal invariant with assert(cond, category, context). Behavior:

  • In production, a violation increments an in-memory per-category counter, fires the Prometheus counter svelte_realtime_assertion_violations_total{category} (when live.metrics(...) is wired), and logs a single [realtime/assert] {...} line at console.error. The assert does NOT throw - a thrown exception inside a publish hot-path microtask or a subscribe callback could leave a half-applied bookkeeping update or a corrupted index. Counter + log give observability without the corruption risk.
  • In test mode (process.env.VITEST or NODE_ENV === 'test') the assert THROWS so vitest surfaces the failure as a normal test error.

Categories are stable strings prefixed realtime/<module>.<invariant> (so the Prometheus label cardinality is bounded and won't collide with the adapter's extensions_assertion_violations_total). Today's categories:

Category Where
realtime/handleRpc.envelope.non-empty RPC frame has non-empty rpc and id
realtime/subscription.bookkeeping.ws-was-tracked Unsubscribe path: ws was in the topic set
realtime/push-registry.entry-tracked Close hook: registry entry exists for userId
realtime/lock.waiter.shape Dequeued lock waiter has resolve+reject
realtime/optimistic.queue.serverValue-iff-nonempty Server-merge path: _serverValue set when queue non-empty
realtime/optimistic.queue.drain-precondition _drainQueue called only with empty queue
realtime/optimistic.queue.entry.shape Settle path: queue entry has expected fields

Read the live counter map programmatically:

import { getAssertionCounters } from 'svelte-realtime/server';

setInterval(() => {
  for (const [category, count] of getAssertionCounters()) {
    if (count > 0) console.warn(`assertion ${category}: ${count} violations`);
  }
}, 60_000);

The client-side assert helper is exported from svelte-realtime/client with the same shape (sans Prometheus wiring - the browser has no metrics registry; use the in-memory counter or log shipping instead).

Registry shape

live.metrics() accepts any object exposing:

  • counter({ name, help, labelNames }) -> { inc(labels?) }
  • histogram({ name, help, labelNames }) -> { observe(labels, valueSeconds) }
  • gauge({ name, help }) -> { inc(), dec() }

If you prefer prom-client, wire it the same way: counter: ({ name, help, labelNames }) => new client.Counter({ name, help, labelNames, registers: [register] }), and likewise for Histogram and Gauge.


Circuit breaker

Wrap a stream or RPC init function with a circuit breaker from svelte-adapter-uws-extensions. When the breaker is open, returns a fallback value or throws SERVICE_UNAVAILABLE.

import { live } from 'svelte-realtime/server';
import { createBreaker } from 'svelte-adapter-uws-extensions/breaker';

const dbBreaker = createBreaker({ threshold: 5, resetMs: 30000 });

export const items = live.stream('items',
  live.breaker({ breaker: dbBreaker, fallback: [] }, async (ctx) => {
    return db.items.list();
  })
);

If fallback is omitted and the circuit is open, the call throws LiveError('SERVICE_UNAVAILABLE', ...).


Request correlation

Every live function receives ctx.requestId - a stable identifier for the originating RPC envelope. The id flows in three directions automatically:

  • From clients: WebSocket clients tag every RPC envelope with a generated id; HTTP request handlers honor X-Request-ID headers (and generate one if absent). The adapter writes the resolved id to platform.requestId, and the realtime layer copies it to ctx.requestId for the duration of that handler.
  • Through your handlers: pass it to whatever you do downstream so log lines, traces, and persisted rows all share one key.
  • Into background work: the svelte-adapter-uws-extensions postgres tasks/jobs APIs persist request_id on every row. Pass it explicitly or pipe ctx.platform and the helpers extract it for you.
import { live } from 'svelte-realtime/server';
import { createTasks } from 'svelte-adapter-uws-extensions/postgres/tasks';
import { createJobs } from 'svelte-adapter-uws-extensions/postgres/jobs';

const tasks = createTasks({ client: pgClient });
const jobs = createJobs({ client: pgClient });

export const submitOrder = live(async (ctx, input) => {
  // Option A: explicit - works anywhere ctx.requestId is in scope
  const order = await tasks.run('processOrder', input, {
    requestId: ctx.requestId
  });

  // Option B: pipe ctx.platform; the helpers read platform.requestId for you
  await jobs.enqueue('shipment-notice', { orderId: order.id }, {
    platform: ctx.platform
  });

  return order;
});

Once the rows land, you can join them back to the originating RPC for debugging or audit:

-- Trace one user request across the websocket -> task -> job pipeline
SELECT
  t.svti_tasks_id, t.name AS task_name, t.status, t.created_at AS task_created,
  j.svti_jobs_id,  j.queue,             j.attempts,
  t.request_id
FROM ws_tasks t
LEFT JOIN ws_jobs j ON j.request_id = t.request_id
WHERE t.request_id = $1
ORDER BY task_created;

The id passes opaquely - no validation, no length cap from the realtime layer. If you generate ids with a structured prefix (o- for orders, a- for audits), every downstream record carries that prefix too.

If you also instrument with Prometheus metrics, include requestId in your log fields rather than as a metric label - it's high-cardinality and would blow up your label space.


Cron scheduling

Use live.cron() to run server-side functions on a schedule and publish results to a topic.

import { live } from 'svelte-realtime/server';

export const refreshStats = live.cron('*/5 * * * *', 'stats', async () => {
  return { users: await db.users.count(), orders: await db.orders.todayCount() };
});

The cron function publishes its return value as a set event on the given topic. Pair it with a merge: 'set' stream:

export const stats = live.stream('stats', async (ctx) => {
  return db.stats();
}, { merge: 'set' });

The function receives a ctx object with publish, throttle, debounce, and signal - the same helpers available in RPC handlers (minus user and ws, since cron runs outside a connection). Use ctx.publish for fine-grained control, e.g. publishing individual created/deleted events on a crud stream:

export const cleanup = live.cron('0 * * * *', 'boards', async (ctx) => {
  const stale = await listStaleBoards();
  for (const board of stale) {
    await deleteBoard(board.board_id);
    ctx.publish('boards', 'deleted', { board_id: board.board_id });
  }
  // returning undefined skips the automatic 'set' publish
});

If the function returns a value, it is published as a set event (same as before). If it returns undefined, no automatic publish happens - this lets you use ctx.publish exclusively without an unwanted set event overwriting your crud updates.

Cron expressions use 5 fields: minute hour day month weekday. Supported syntax: *, single values, ranges (9-17), lists (0,15,30), and steps (*/5).

Wiring the platform

Cron registers at module load but the tick can only publish once a platform reference has been captured. With svelte-adapter-uws >= 0.5.0-next.15, wire it from the new init({ platform }) hook so the tick is ready from boot:

// src/hooks.ws.js
import { setCronPlatform, pushHooks, message, upgrade } from 'svelte-realtime/server';

export { upgrade, message };
export const open = pushHooks.open;
export const close = pushHooks.close;

export function init({ platform }) {
  setCronPlatform(platform);
}

On older adapters (open(ws, platform) is the only available hand-off point), call setCronPlatform(platform) from open instead. Cron ticks fired before the first connection log a single deduped warning and become no-ops.

Cluster mode

Each worker process runs its own cron tick. In a single-process deployment that's exactly what you want. In a clustered deployment - whether CLUSTER_MODE=reuseport on Linux (N kernel workers per replica), acceptor mode on Windows / macOS (N internal workers per process), or N Docker replicas, or any combination - every worker fires every job in parallel by default. For "send the daily summary at 9am" jobs, that's almost certainly wrong.

Wire a cluster-wide leader gate via configureCron({ leader, bus }). The canonical leader implementation lives in svelte-adapter-uws-extensions/redis/leader (Redis SETNX lease) and the canonical bus is svelte-adapter-uws-extensions/redis/pubsub:

// src/hooks.ws.js (clustered, with extensions)
import { setCronPlatform, configureCron } from 'svelte-realtime/server';
import { createLeader } from 'svelte-adapter-uws-extensions/redis/leader';
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';

const redis = ...; // your shared Redis client

const leader = createLeader(redis); // instanceId defaults to a random hex; override for diagnostics
const bus = createPubSubBus(redis);

export async function init({ platform }) {
  await bus.activate(platform);
  setCronPlatform(platform);
  configureCron({ leader: leader.isLeader, bus });
}

export async function shutdown() {
  // Best-effort lease release so a sibling can take over within renewMs
  // (default 10s) instead of waiting for the full lease to expire.
  await leader.stop();
  await bus.deactivate();
}

Behavior with a leader configured:

  • Only the worker whose leader() returns true proceeds with the tick. All other workers exit early and increment a cron{status:'not-leader'} metric.
  • A throwing leader() is fail-closed: this worker skips the tick and logs in dev. Better to miss a tick than to double-fire because the leader-election machinery is broken.
  • Single-flight, single-fire semantics still apply per cron path within the elected worker.

Why also pass bus?

With only leader configured, the elected worker's cron publishes still go to uWS subscribers on that worker's process only. Subscribers connected to non-leader instances see nothing - because no other worker independently produced the publish. Wiring bus routes every cron fire through bus.wrap(platform) so the leader's publishes relay over the bus's pubsub channel and reach every cluster instance. This applies to both the return value auto-publish and the cron handler's own ctx.publish(...).

bus is the extensions-package pubsub bus (redis/pubsub for the broadcast channel, redis/sharded-pubsub for per-shard channels at scale). svelte-realtime consumes it structurally as { wrap(platform): wrapped } - any pubsub primitive exposing that shape works.

Setting leader without bus emits a single dev warning at configureCron time, since cluster intent without cluster fan-out is almost always a misconfig. Production deployments don't see the warning (it's _IS_DEV-gated like the rest of the cron diagnostics), so suppress is via fixing the wiring rather than via a config flag.

Without a leader configured (the default), every worker fires every job. svelte-realtime stays cluster-agnostic by design; the cluster transport (Redis or otherwise) is the extensions package's domain.


Derived streams

Server-side computed streams that recompute when any source topic publishes.

import { live } from 'svelte-realtime/server';

export const dashboardStats = live.derived(
  ['orders', 'inventory', 'users'],
  async () => {
    return {
      totalOrders: await db.orders.count(),
      lowStock: await db.inventory.lowStockCount(),
      activeUsers: await db.users.activeCount()
    };
  },
  { debounce: 500 }
);

On the client, derived streams work like regular streams:

<script>
  import { dashboardStats } from '$live/dashboard';
</script>

<p>Orders: {$dashboardStats?.totalOrders}</p>

Dynamic derived streams

When source topics depend on runtime arguments (e.g., an org ID, a room ID), pass a source factory function instead of a static array. The factory receives the same args the client passes at subscribe time:

export const orgStats = live.derived(
  (orgId) => [`memberships:${orgId}`, `emails:${orgId}`, `audit:${orgId}`],
  async (ctx, orgId) => {
    const [members, emails, auditCount] = await Promise.all([
      db.query('SELECT count(*) FROM memberships WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM emails WHERE org_id = $1', [orgId]),
      db.query('SELECT count(*) FROM audit_log WHERE org_id = $1', [orgId])
    ]);
    return { members, emails, auditCount };
  },
  { debounce: 100 }
);

On the client, dynamic derived streams are called like functions:

<script>
  import { orgStats } from '$live/dashboard';
  let { orgId } = $props();
</script>

<p>Members: {$orgStats(orgId)?.members}</p>

Each unique set of args creates an independent instance with its own source subscriptions. Instances are created when the first subscriber connects and cleaned up when the last subscriber disconnects.

Activation

Call _activateDerived(platform) in your open hook to enable derived stream listeners:

import { _activateDerived } from 'svelte-realtime/server';

export function open(ws, { platform }) {
  _activateDerived(platform);
}

Without this call, derived streams will still serve their initial SSR data but will never receive live updates. In dev mode, a console warning is emitted when a client subscribes to a derived stream and _activateDerived has not been called.

Dynamic derived compute functions receive ctx.user from the subscribing client, so auth checks like if (orgId !== ctx.user.organization_id) throw new LiveError("FORBIDDEN") work the same as they do in regular stream handlers.

Option Default Description
merge 'set' Merge strategy for the derived topic
debounce 0 Debounce recomputation by this many milliseconds

Effects

Server-side reactive side effects that fire when source topics publish. Fire-and-forget - no topic, no client subscription.

// src/live/notifications.js
import { live } from 'svelte-realtime/server';

export const orderNotifications = live.effect(['orders'], async (event, data, platform) => {
  if (event === 'created') {
    await email.send(data.userEmail, 'Order confirmed', templates.orderConfirm(data));
  }
});

Effects are server-only. They fire whenever a matching topic publishes and cannot be subscribed to from the client.


Aggregates

Real-time incremental aggregations. Reducers run on each event, maintaining O(1) state.

// src/live/stats.js
import { live } from 'svelte-realtime/server';

export const orderStats = live.aggregate('orders', {
  count: { init: () => 0, reduce: (acc, event) => event === 'created' ? acc + 1 : acc },
  total: { init: () => 0, reduce: (acc, event, data) => event === 'created' ? acc + data.amount : acc },
  avg: { compute: (state) => state.count > 0 ? state.total / state.count : 0 }
}, { topic: 'order-stats' });

The aggregate publishes its state to the output topic on every event. Clients subscribe to the output topic as a regular stream.

Time windows

Pass a windows option to maintain one state slice per declared window with its own output topic. Three window types:

  • lifetime - never resets. Equivalent to a single-state aggregate, exposed as a named output for symmetry.
  • tumbling - boundary-anchored. period: 'minute' | 'hour' | 'daily' | 'monthly' resets at the configured tz's natural boundary; durationMs + anchor resets at fixed intervals from a custom epoch. On boundary cross, the closing window publishes one final pre-reset state, then state is init()-cleared for the new window.
  • sliding - hop-window. durationMs + slideMs partitions state into ceil(durationMs / slideMs) hop buckets. Each event reduces into the current hop; on each slide tick, drop the oldest bucket and start a new one. Reducers MUST provide a combine(...buckets) field so cross-bucket state can be recomputed.
import { live, combineCounts } from 'svelte-realtime/server';

export const trending = live.aggregate('events:view', {
  counts: {
    init: () => ({}),
    reduce: (acc, event, data) => event === 'viewed'
      ? { ...acc, [data.itemId]: (acc[data.itemId] ?? 0) + 1 }
      : acc,
    combine: combineCounts
  },
  top: {
    compute: (state) => Object.entries(state.counts)
      .sort((a, b) => b[1] - a[1])
      .slice(0, 10)
      .map(([itemId, count]) => ({ itemId, count }))
  }
}, {
  topic: 'events:view:topk',
  windows: {
    last10min: { type: 'sliding',  durationMs: 600_000, slideMs: 30_000 },
    today:     { type: 'tumbling', period: 'daily',   tz: 'UTC' },
    thisMonth: { type: 'tumbling', period: 'monthly', tz: 'UTC' },
    lifetime:  { type: 'lifetime' }
  }
});

Output topics: events:view:topk:last10min, events:view:topk:today, events:view:topk:thisMonth, events:view:topk:lifetime. The client export becomes a namespace object keyed by window name:

<script>
  import { trending } from '$live/topk';
</script>

<h2>Last 10 min</h2>
<ul>{#each $trending.last10min?.top ?? [] as row}<li>{row.itemId}: {row.count}</li>{/each}</ul>

<h2>Today</h2>
<ul>{#each $trending.today?.top ?? [] as row}<li>{row.itemId}: {row.count}</li>{/each}</ul>

Built-in combine helpers

For the common reducer shapes:

Helper Reducer state shape
combineSum number
combineMax number
combineMin number
combineCounts Record<string, number>
combineMerge Record<string, any> (last-write-wins per key)

Hand-roll your own combine(...buckets) for non-trivial reducers (top-K, percentile sketches, custom shapes).

Per-window snapshots

The single snapshot option only restores the single-state form. For windowed aggregates, pass snapshots keyed by window name - each restores one window's state on registration in parallel:

{
  topic: 'events:view:topk',
  snapshots: {
    today:    () => db.query('select counts_json from topk_daily where day = today()').then(r => r[0]?.counts_json ?? {}),
    lifetime: () => db.query('select counts_json from topk_lifetime').then(r => r[0]?.counts_json ?? {})
  },
  windows: { /* ... */ }
}

Sliding windows are not snapshot-restorable - their bucket boundaries are tied to wall-clock time and would not survive a restart coherently. Pass tumbling and lifetime here.

Cluster mode

Today's aggregate runs on every worker, fed by the source topic via the adapter's cluster bus. State converges across workers as long as the source topic fans out to every worker (the default with createPubSubBus/createShardedBus from svelte-adapter-uws-extensions). For sharded source topics where each worker sees a partition rather than the full firehose, per-worker state diverges and per-window publishes will be inconsistent across instances. A configureAggregate({ leader }) hook may land later (symmetric to configureCron({ leader })); for now, prefer fanout-to-every-worker source topics for windowed aggregates.

Capacity bound

MAX_AGGREGATE_BUCKETS (default 1000) caps a single sliding window's hop-bucket count. A 10-hour sliding window with 1-minute slides allocates 600 buckets - well under the cap. Misconfigurations like a 1ms slide on a 1s window (1000 buckets) are caught at registration with a clear error pointing at the relevant window name.


Gates

Conditional stream activation. On the server, a predicate controls whether the client subscribes. On the client, .when() manages the subscription lifecycle.

// src/live/beta.js
import { live } from 'svelte-realtime/server';

export const betaFeed = live.gate(
  (ctx) => ctx.user?.flags?.includes('beta'),
  live.stream('beta-feed', async (ctx) => db.betaFeed.latest(50), { merge: 'latest' })
);
<script>
  import { betaFeed } from '$live/beta';

  import { writable } from 'svelte/store';

  const tabActive = writable(true);
  const feed = betaFeed.when(tabActive);
</script>

{#if $feed !== undefined}
  {#each $feed as item (item.id)}
    <p>{item.title}</p>
  {/each}
{/if}

When the predicate returns false, the server responds with a graceful no-op (no error, no subscription). The client store stays undefined. .when() accepts a boolean, a Svelte store, or a getter function. When given a store, it subscribes/unsubscribes reactively as the value changes. Getter functions are evaluated once at subscribe time; for reactivity with Svelte 5 $state, wrap in $derived or pass a store.


Pipes

Composable server-side stream transforms. Apply filter, sort, limit, and join operations to both initial data and live events.

// src/live/notifications.js
import { live, pipe } from 'svelte-realtime/server';

export const myNotifications = pipe(
  live.stream('notifications', async (ctx) => {
    return db.notifications.forUser(ctx.user.id);
  }, { merge: 'crud', key: 'id' }),

  pipe.filter((ctx, item) => !item.dismissed),
  pipe.sort('createdAt', 'desc'),
  pipe.limit(20),
  pipe.join('authorId', async (id) => db.users.getName(id), 'authorName')
);
Transform Initial data Live events
pipe.filter(predicate) Filters the array Initial data only
pipe.sort(field, dir) Sorts the array Initial data only
pipe.limit(n) Slices to N items Initial data only
pipe.join(field, resolver, as) Enriches each item Initial data only

Piped functions preserve all stream metadata. The client receives already-transformed data.


Binary RPC

Use live.binary() to send raw binary data (file uploads, images, protobuf) over WebSocket without base64 encoding.

// src/live/upload.js
import { live } from 'svelte-realtime/server';

export const uploadAvatar = live.binary(async (ctx, buffer, filename) => {
  await storage.put(`avatars/${ctx.user.id}/${filename}`, buffer);
  return { url: `/avatars/${ctx.user.id}/${filename}` };
}, { maxSize: 5 * 1024 * 1024 }); // reject payloads over 5MB (default: 10MB)
<script>
  import { uploadAvatar } from '$live/upload';

  async function handleFile(e) {
    const file = e.target.files[0];
    const buffer = await file.arrayBuffer();
    const { url } = await uploadAvatar(buffer, file.name);
  }
</script>

<input type="file" accept="image/*" onchange={handleFile} />

The wire format uses a compact binary frame: 0x00 marker byte + uint16 BE header length + JSON header + raw binary payload. This avoids base64 overhead entirely.


Streaming uploads

Use live.upload() when live.binary()'s atomic-one-frame contract starts to hurt - multi-megabyte files, slow connections, anywhere you want progress, cancellation, and bounded server memory. The handler consumes an async-iterable of chunks, returns a JSON-serialisable result when done, and gets an AbortSignal that fires on client cancel, WS disconnect, or capacity-cap rejection.

// src/live/uploads.js
import { live, LiveError } from 'svelte-realtime/server';
import { open } from 'node:fs/promises';

export const avatar = live.upload(async (ctx, name, mime) => {
  if (!ctx.user) throw new LiveError('UNAUTHORIZED');

  const sink = await open(`/var/uploads/${ctx.user.id}/${name}`, 'w');
  let bytes = 0;
  try {
    for await (const chunk of ctx.stream) {
      ctx.signal.throwIfAborted();
      await sink.write(chunk);
      bytes += chunk.byteLength;
    }
    return { url: `/uploads/${ctx.user.id}/${name}`, bytes, mime };
  } finally {
    await sink.close();
  }
}, {
  maxSize: 50 * 1024 * 1024,         // hard cap per upload (default 100MB)
  maxConcurrentPerSession: 2,         // protect a node from one chatty client (default 4)
  maxConcurrentTotal: 1000,           // global cap (default unbounded; opt in)
  maxBufferedChunks: 64,              // backpressure - chunks queued before refusing more (default 64)
  reauthEvery: 5 * 1024 * 1024        // re-run module guards every 5 MB received (default unset)
});

The handler signature is (ctx, ...args) like a regular RPC. ctx carries the usual fields plus three streaming extras:

  • ctx.stream - AsyncIterable<Uint8Array> yielding chunks in arrival order.
  • ctx.signal - AbortSignal that aborts on client cancel, WS disconnect, maxSize exceeded, or maxBufferedChunks overflow. Wire any cleanup you need to it.
  • ctx.upload - { id } for log correlation.

Guards and global middleware run once before the first chunk is consumed. An unauthorised client never gets to send bytes.

For long uploads where the user's session can be revoked mid-stream (token expiry, explicit logout, role downgrade), pass reauthEvery: <bytes>. The module guard is re-run against the live ctx every N bytes received past the last re-auth; on rejection the upload aborts with the error code (UNAUTHENTICATED / FORBIDDEN) and the consumer observes ctx.signal.aborted. Default is unset (legacy single-check behavior at chunk-0). Reauth runs as a fire-and-forget task off the chunk-receive path so the receive loop stays sync; concurrent reauths on the same upload are coalesced.

Client

The Vite plugin generates a client stub for every live.upload() export in src/live/, so usage looks like a normal RPC import:

<script>
  import { avatar } from '$live/uploads';

  let uploading = $state(false);
  let percent = $state(0);
  let pendingHandle = $state(null);

  async function upload(file) {
    uploading = true;
    percent = 0;

    const handle = avatar(file, file.name, file.type);
    pendingHandle = handle;
    handle.on('progress', (p) => { percent = (p.percent ?? 0) * 100; });

    try {
      const result = await handle;
      console.log('uploaded:', result);
    } catch (err) {
      if (err.code !== 'CANCELLED') console.error('upload failed:', err);
    } finally {
      uploading = false;
      pendingHandle = null;
    }
  }

  function cancel() {
    pendingHandle?.cancel();
  }
</script>

<input type="file" onchange={(e) => upload(e.target.files[0])} disabled={uploading} />
{#if uploading}
  <progress value={percent} max="100"></progress>
  <button onclick={cancel}>Cancel</button>
{/if}

The handle is a thenable - await handle resolves with the handler's return value or rejects with an RpcError. The same handle exposes:

  • Events via handle.on(event, cb) returning an unsubscribe:

    • progress - { sent, total?, percent?, chunks, bytesPerSec }. total and percent are undefined for ReadableStream sources unless you compute the total yourself.
    • complete - the server's return value.
    • cancel - the cancel reason (fires immediately before error when cancelled).
    • error - the RpcError.
  • Snapshot getters: handle.sent, handle.total, handle.chunks, handle.progress, handle.bytesPerSec, handle.streamId, handle.streamIdHex.

  • Cancellation: handle.cancel(reason?) sends a control frame to the server and rejects the promise. Compose with AbortController:

    ac.signal.addEventListener('abort', () => handle.cancel());
    

Frame size is auto-discovered, hard-capped, and structurally safe. The server announces its platform.maxPayloadLength on the first upload response per connection; subsequent uploads use that value as the wire frame size, with the framework subtracting envelope overhead (10 bytes on chunks 1+, 12 + argsLen on chunk 0) per chunk internally. The first upload uses a conservative 12KB default that fits under any realistic adapter cap. To pin an explicit value (e.g. for memory-constrained clients), configure({ upload: { frameSize: 32 * 1024 } }) -- user-configured wins over discovery, but is silently clamped down to the discovered cap with a one-time dev warn if it exceeds the adapter's limit. The framework guarantees no wire frame ever exceeds the adapter's maxPayloadLength, eliminating the silent close-with-1009 failure mode that the pre-rename chunkSize knob could trigger when a user mirrored the adapter's cap value verbatim.

The chunkSize field is accepted as a deprecated alias for frameSize; existing config still works, with a one-time dev warn pointing at the rename. See MIGRATION.md for the rationale and migration shape.

The handle auto-starts but the first chunk is sent on the next microtask, so attaching listeners on the same line as construction (const h = avatar(file); h.on('progress', ...);) never misses early events.

Wire format

Two frame markers join 0x00 (binary RPC) and 0x7B (JSON RPC) on the existing realtime channel:

Chunk frame (client -> server):
  [0]      0x01
  [1]      flags
            bit 0: hasArgs   (set on chunk 0 only)
            bit 1: isLast
            bits 2-7: reserved (must be 0)
  [2..5]   streamId, big-endian uint32
  [6..9]   seq, big-endian uint32 (0-indexed, contiguous)
  [10..]   if hasArgs:  [10..11] argsLen u16  + JSON header { rpc, args }  + payload bytes
           else:        payload bytes

Cancel frame (client -> server):
  [0]      0x02
  [1]      0x10 (cancel)
  [2..5]   streamId

Per-chunk overhead is 10 bytes (12 bytes + JSON args length on chunk 0). On 64KB chunks that's 0.015% wire overhead.

Server -> client responses ride the existing __upload topic via platform.send(ws, '__upload', streamIdHex, payload), with payload shaped as either { ok: true, data: <handler return> } or { ok: false, code, error }.

Status

live.upload ships complete: server primitive with capacity caps + abort-on-disconnect, client UploadHandle with progress / cancel / events / AbortController integration, auto-discovery of the adapter's maxPayloadLength for chunk-size tuning, and Vite plugin generation of typed client stubs (UploadHandle<T> in $live/<module> types). Adapter follow-ups (raise default maxPayloadLength, expose bufferedAmount(ws) for true backpressure-aware pumping) are tracked separately and don't block this surface.


Rooms

Bundle data, presence, cursors, and scoped actions into a single declaration.

// src/live/collab.js
import { live } from 'svelte-realtime/server';

export const board = live.room({
  topic: (ctx, boardId) => 'board:' + boardId,
  init: async (ctx, boardId) => db.cards.forBoard(boardId),
  presence: (ctx) => ({ name: ctx.user.name, avatar: ctx.user.avatar }),
  cursors: true,
  guard: async (ctx) => {
    if (!ctx.user) throw new LiveError('UNAUTHORIZED');
  },
  actions: {
    addCard: async (ctx, boardId, title) => {
      const card = await db.cards.insert({ boardId, title });
      ctx.publish('created', card);
      return card;
    }
  }
});

On the client, the room export becomes an object with sub-streams and actions. Room actions receive the same leading arguments as the topic function (boardId in this case), followed by any action-specific arguments:

<script>
  import { board } from '$live/collab';

  const data = board.data(boardId);         // main data stream
  const users = board.presence(boardId);     // presence stream
  const cursors = board.cursors(boardId);    // cursor stream
</script>

{#each $data as card (card.id)}
  <Card {card} />
{/each}

<button onclick={() => board.addCard(boardId, 'New card')}>Add</button>

Room hooks shortcut

Rooms expose a .hooks property for one-liner wiring in hooks.ws.js:

// src/hooks.ws.js
import { board } from './live/collab.js';

export const { message, close, unsubscribe } = board.hooks;

Webhooks

Bridge external HTTP webhooks into your pub/sub topics.

// src/live/integrations.js
import { live } from 'svelte-realtime/server';

export const stripeEvents = live.webhook('payments', {
  verify({ body, headers }) {
    return stripe.webhooks.constructEvent(body, headers['stripe-signature'], webhookSecret);
  },
  transform(event) {
    if (event.type === 'payment_intent.succeeded') {
      return { event: 'created', data: event.data.object };
    }
    return null; // ignore other event types
  }
});

Use the handler in a SvelteKit endpoint:

// src/routes/api/stripe/+server.js
import { stripeEvents } from '$live/integrations';

export async function POST({ request, platform }) {
  const body = await request.text();
  const headers = Object.fromEntries(request.headers);
  const result = await stripeEvents.handle({ body, headers, platform });
  return new Response(result.body, { status: result.status });
}

Signals

Point-to-point ephemeral messaging. Send a signal to a specific user without broadcasting to a topic.

// Server: send a signal
const handler = live(async (ctx, targetUserId, offer) => {
  ctx.signal(targetUserId, 'call:offer', offer);
});
// Client: receive signals
import { onSignal } from 'svelte-realtime/client';

const unsub = onSignal(currentUser.id, (event, data) => {
  if (event === 'call:offer') showIncomingCall(data);
});

Enable signal delivery in your open hook:

import { enableSignals } from 'svelte-realtime/server';
export function open(ws) { enableSignals(ws); }

Schema evolution

Versioned streams with declarative migration functions. When you change a data shape, old clients receive migrated data automatically.

export const todos = live.stream('todos', async (ctx) => {
  return db.todos.all();
}, {
  merge: 'crud',
  key: 'id',
  version: 3,
  migrate: {
    // v1 -> v2: add priority field
    1: (item) => ({ ...item, priority: item.priority ?? 'medium' }),
    // v2 -> v3: rename 'done' to 'completed'
    2: (item) => {
      const { done, ...rest } = item;
      return { ...rest, completed: done ?? false };
    }
  }
});

The Vite plugin includes the stream version in the client stub. On reconnect, the client sends its version. If the server is ahead, migration functions chain in order (v1 -> v2 -> v3). If versions match, no migration runs.

Demo + e2e: subscribeAt(stream, { schemaVersion })

The migration codepath only fires across a real deploy boundary - a v1 server is replaced with a v2 server, a previously-connected v1 client reconnects, the cached _schemaVersion rides up. There's no path in a fresh tab to observe the migrate chain end-to-end, which makes demos and e2e tests awkward.

subscribeAt(stream, { schemaVersion }) from svelte-realtime/test-client creates a parallel store that subscribes pretending to be a stale client at the chosen version. The wire envelope carries schemaVersion: N, the server runs the registered migrate chain forward, and the parallel store renders the migrated payload. Use it for side-by-side demo panels and for e2e assertions on the migrate chain output.

<script>
  import { todos } from '$live/todos';
  import { subscribeAt } from 'svelte-realtime/test-client';

  // Production store at the current server version (no migration on its responses).
  // Parallel stores pretending to be stale clients - each triggers the migrate
  // chain forward from its declared schemaVersion to the server's current version.
  const todosAsV1 = subscribeAt(todos, { schemaVersion: 1 });
  const todosAsV2 = subscribeAt(todos, { schemaVersion: 2 });
</script>

<section>
  <h3>Live (v3): {JSON.stringify($todos)}</h3>
  <h3>Stale v1 client would see: {JSON.stringify($todosAsV1)}</h3>
  <h3>Stale v2 client would see: {JSON.stringify($todosAsV2)}</h3>
</section>

For dynamic streams, call the factory first and pass the cached store:

import { messages } from '$live/chat';
import { subscribeAt } from 'svelte-realtime/test-client';

const v1Messages = subscribeAt(messages('room-1'), { schemaVersion: 1 });

Faithful production semantics. Migration is applied ONCE on the initial subscribe response, just as in production. Subsequent live publishes arrive as raw current-version events and merge into the migrated base, exactly as a real reconnected stale client would experience - the panel shows the migrated initial state, then forward-merges new events at the server's current shape.

Why this lives in /test-client and not the main client surface. A public client-side API for "pin my schema version" would let production code chain through migrations on every fetch, which is wasteful and confusing. Schema migration is fundamentally about long-disconnected clients catching up, not opt-in version pinning. The /test-client import path makes the test/demo intent loud at every call site.


Delta sync and replay

Delta sync

Enable delta sync for efficient reconnection on streams with large datasets. Instead of refetching all data, the server sends only what changed since the client's last known version.

export const inventory = live.stream('inventory', async (ctx) => {
  return db.inventory.all();
}, {
  merge: 'crud',
  key: 'sku',
  delta: {
    version: () => db.inventory.lastModified(),
    diff: async (sinceVersion) => {
      const changes = await db.inventory.changedSince(sinceVersion);
      return changes; // null to force full refetch
    }
  }
});

How it works:

  • On first connect, the client gets the full dataset plus a version value
  • On reconnect, the client sends its last known version
  • If versions match: server responds with { unchanged: true } (nearly zero bytes)
  • If versions differ: server calls diff(sinceVersion) and sends only the changes
  • If diff returns null: falls back to full refetch

Three-tier reconnect with delta.fromSeq

For seq-based reconnects where the bounded replay buffer cannot satisfy the gap (the client's lastSeenSeq is older than the oldest entry in the buffer), delta.fromSeq(sinceSeq) is the user-supplied bridge to the durable store. The server falls back to full rehydrate only when fromSeq returns null / undefined.

export const auditFeed = live.stream(
  (ctx, orgId) => `audit:${orgId}`,
  async (ctx, orgId) => loadAudit(orgId, { limit: 200 }),
  {
    merge: 'crud',
    key: 'id',
    replay: true,
    delta: {
      fromSeq: async (sinceSeq) => {
        const events = await db.auditEvents.where('seq', '>', sinceSeq).orderBy('seq').limit(500);
        if (events.length === 0) return [];        // nothing missed, no-op
        if (events.length === 500) return null;     // too many -> fall through to full rehydrate
        return events;
      }
    }
  }
);

Resolution order on reconnect-with-seq: replay buffer (bounded, fast) -> delta.fromSeq(clientSeq) (this hook) -> full rehydrate via the loader (always safe). Returning [] signals "nothing missed" (client no-op). Each event should carry a seq field so the client's _lastSeq advances; if the events come from a Postgres column with a seq per row, the client tracks correctly without extra plumbing.

Replay

Enable seq-based replay for gap-free stream reconnection. When a client reconnects, it sends its last known sequence number. If the server has the missed events buffered, it sends only those instead of a full refetch.

export const feed = live.stream('feed', async (ctx) => {
  return db.feed.latest(50);
}, { merge: 'latest', max: 50, replay: true });

Replay requires the replay extension from svelte-adapter-uws-extensions. When replay is not available or the gap is too large, the client falls back to a full refetch automatically.

With adapter 0.4.0+, the replay end marker sends { reqId } (replay complete) or { reqId, truncated: true } (cache miss). When truncated, the client automatically resets its sequence number and triggers a full refetch.

Auto-routing: replay: true is the only declaration you need

Once platform.replay is exposed (the standard install pattern is platform.replay = createReplay(redisClient) in your hooks), the framework auto-routes every publish to a replay-eligible topic through platform.replay.publish regardless of which seam the publisher sits on. live.stream(topic, loader, { replay: true }) registers the topic at declaration time; static topics are registered up-front and dynamic topics are registered at first-subscribe time when they resolve.

This applies to every framework publish surface — ctx.publish from RPC handlers, cron auto-publish (live.cron('* * * * * *', topic, async (ctx) => result) where result !== undefined), and ctx.publish from inside cron handlers — without the user wiring anything beyond replay: true. Pre-fix, the user was responsible for wrapping the platform with a wrapWithReplay proxy at every seam (the docs showed it on createMessage only; cron was a separate setCronPlatform(platform) capture, and cron-published events silently bypassed the buffer because the wrap was missing there). The auto-routing makes that asymmetry impossible by construction.

If you need to keep your own platform-wrapping proxy (custom topic patterns, additional intercepts), set [WRAPPED_FOR_REPLAY] = true on the proxy:

import { WRAPPED_FOR_REPLAY } from 'svelte-realtime/server';

function wrapWithReplay(p) {
  const wrapped = new Proxy(p, { /* ...your intercepts... */ });
  wrapped[WRAPPED_FOR_REPLAY] = true; // tell the framework "I own replay routing"
  return wrapped;
}

The framework defers entirely when this marker is present (no double-write to Redis). Without the marker, the framework's auto-routing runs alongside the user proxy's routing and will issue duplicate Redis writes — explicit opt-out is required.

If replay: true is declared but platform.replay is never set, dev-mode logs a one-time console.warn per topic on the first publish, with the install pointer for the replay extension. Production runs silently (no per-publish overhead) and the local broadcast still happens.


Redis multi-instance

Use createMessage with the Redis pub/sub bus for multi-instance deployments. ctx.publish automatically goes through Redis when the platform is wrapped.

// src/hooks.ws.js
import { createMessage } from 'svelte-realtime/server';
import { createRedis, createPubSubBus } from 'svelte-adapter-uws-extensions/redis';

const redis = createRedis();
const bus = createPubSubBus(redis);

export function open(ws, { platform }) {
  bus.activate(platform);
}

export function upgrade({ cookies }) {
  return validateSession(cookies.session_id) || false;
}

export const message = createMessage({ platform: (p) => bus.wrap(p) });

No changes needed in your live modules. ctx.publish delegates to whatever platform was passed in, so Redis wrapping is transparent.

If you already run Postgres and don't need Redis, you can use the LISTEN/NOTIFY bridge instead for cross-instance pub/sub.

What the extensions handle

When you add the Redis extensions from svelte-adapter-uws-extensions, you get:

  • Cross-instance pub/sub with echo suppression (messages from the same instance are dropped on receive) and microtask-batched Redis pipelines (multiple publishes in one event loop tick become a single Redis roundtrip)
  • Distributed presence with heartbeat-based zombie cleanup - dead sockets are detected by probing getBufferedAmount(), and stale Redis entries are cleaned server-side by a Lua script after a configurable TTL (default 90s)
  • Replay buffers with atomic sequence numbering via Lua INCR + sorted sets - per-topic ordering is strict, and gap detection triggers a truncation event before replaying what's available
  • Cross-instance rate limiting via atomic Lua scripts that use redis.call('TIME') to avoid clock skew between app servers
  • Circuit breakers with a three-state machine (healthy / broken / probing) - when Redis goes down, the breaker trips after a configurable failure threshold, local delivery continues, and a single probe request tests recovery before resuming full traffic

Combined: Redis + rate limiting

import { createMessage, LiveError } from 'svelte-realtime/server';
import { createRedis, createPubSubBus, createRateLimit } from 'svelte-adapter-uws-extensions/redis';

const redis = createRedis();
const bus = createPubSubBus(redis);
const limiter = createRateLimit(redis, { points: 30, interval: 10000 });

export function open(ws, { platform }) { bus.activate(platform); }
export function upgrade({ cookies }) { return validateSession(cookies.session_id) || false; }

export const message = createMessage({
  platform: (p) => bus.wrap(p),
  async beforeExecute(ws, rpcPath) {
    const { allowed, resetMs } = await limiter.consume(ws);
    if (!allowed)
      throw new LiveError('RATE_LIMITED', `Retry in ${Math.ceil(resetMs / 1000)}s`);
  }
});

Postgres NOTIFY

Combine live.stream with the Postgres NOTIFY bridge for zero-code reactivity. A DB trigger fires pg_notify(), the bridge calls platform.publish(), and the stream auto-updates.

// src/hooks.ws.js
export { message } from 'svelte-realtime/server';
import { createPgClient, createNotifyBridge } from 'svelte-adapter-uws-extensions/postgres';

const pg = createPgClient({ connectionString: process.env.DATABASE_URL });
const notify = createNotifyBridge(pg, {
  channel: 'table_changes',
  parse: (payload) => JSON.parse(payload)
});

export function open(ws, { platform }) {
  notify.activate(platform);
}
// src/live/orders.js - no ctx.publish needed, the DB trigger handles it
export const createOrder = live(async (ctx, items) => {
  return db.orders.insert({ userId: ctx.user.id, items });
});

export const orders = live.stream('orders', async (ctx) => {
  return db.orders.forUser(ctx.user.id);
}, { merge: 'crud', key: 'id' });

Failure modes

Redis goes down

All Redis extensions accept an optional circuit breaker. The breaker trips after a configurable number of consecutive failures (default 5). Once broken, cross-instance pub/sub, presence writes, replay buffering, and distributed rate limiting are skipped entirely - no retries, no queuing, no thundering herd. Local delivery continues normally: ctx.publish() still reaches subscribers on the same instance and across workers. After a configurable timeout (default 30s), the breaker enters a probing state where a single request is allowed through. If it succeeds, the breaker resets to healthy and all extensions resume.

Instance crashes mid-session

The distributed presence extension runs a heartbeat cycle (default 30s) that probes each tracked WebSocket with getBufferedAmount(). Under mass disconnect, the runtime may drop close events entirely - the heartbeat catches these and triggers a synchronous leave. On the Redis side, stale presence entries are cleaned by a server-side Lua script that scans the hash and removes fields older than the configurable TTL (default 90s). The LEAVE_SCRIPT atomically checks whether the same user is still connected on another instance before broadcasting a leave event, so users don't appear to leave and rejoin when a single instance restarts.

Client reconnects after a long disconnect

Reconnection uses up to three tiers depending on what's available and how large the gap is. The replay buffer (configurable, default 1000 messages per topic) fills small gaps with strict per-topic ordering via atomic Lua sequence numbering. If the gap is too large for replay, delta sync kicks in - the client sends its last known version, and the server returns only the changes since that version (or {unchanged: true} if nothing changed). If neither replay nor delta sync can cover the gap, the client falls back to a full refetch of the init function. All three paths are automatic and require no client-side code changes.

Send buffer overflow

Each WebSocket connection has a send buffer limit (default 1MB, configurable via maxBackpressure in the adapter). When the buffer is full, messages are silently dropped. In dev mode, handleRpc logs a warning when a response fails to deliver. For streams that produce high-frequency output, wrap the source with live.breaker() or use live.throttle() / live.debounce() to control the publish rate.

Batch and queue limits

A single batch() call is capped at 50 RPC calls - the client rejects before sending, and the server enforces the same cap as a safety net. The adapter's client-side send queue holds up to 1000 messages; when full, the oldest item is dropped. The adapter rate-limits WebSocket upgrades per IP with a sliding window (default 10 per 10s) to prevent connection floods.


Clustering

svelte-realtime works with the adapter's CLUSTER_WORKERS mode. The adapter spawns N worker threads (default: number of CPUs). On Linux, workers share the port via SO_REUSEPORT and the kernel distributes incoming connections. On macOS and Windows, a primary thread accepts connections and routes them to workers via uWS child app descriptors.

Cross-worker ctx.publish() calls are batched via microtask coalescing - all publishes within one event loop tick are bundled into a single postMessage to the primary thread, which fans them out to other workers. This keeps IPC overhead constant regardless of publish volume.

Workers are health-checked every 10 seconds. If a worker fails to respond within 30 seconds, it is terminated and restarted with exponential backoff (starting at 100ms, max 5s, up to 50 restart attempts before the process exits). On graceful shutdown (SIGTERM / SIGINT), the primary stops accepting connections, sends a shutdown signal to all workers, and waits for them to drain in-flight requests and close WebSocket connections with code 1001 (Going Away) so clients reconnect to another instance.

Method Cross-worker? Safe in live()?
ctx.publish() Yes (relayed) Yes
ctx.platform.send() N/A (single ws) Yes
ctx.platform.sendTo() No (local only) Use with caution
ctx.platform.subscribers() No (local only) Use with caution
ctx.platform.connections No (local only) Use with caution

ctx.publish() is always safe - it relays across workers and, with Redis wrapping, across instances. For targeted messaging, prefer publish() with a user-specific topic over sendTo().


Capacity model

Every internal Map / Set / array with caller-driven growth is bounded by default. Numbers are deliberately generous - far above any healthy single-instance workload - so they catch obvious bugs (subscribe-leak, register-without-deregister) without biting real apps.

Each cap is one of three saturation behaviors:

  • REJECT - caller gets an explicit error or a documented silent skip.
  • WARN-ONLY - logs once per category; structure keeps growing because eviction would corrupt routing.
  • FIFO-evict - drops oldest insertion-order entries; safe for dedup state where re-warn or duplicate is acceptable.
Cap Default Behavior Notes
MAX_PUSH_REGISTRY 10,000,000 WARN+skip Per-userId connection registry for live.push({ userId }).
TOPIC_WS_COUNTS_WARN_THRESHOLD 1,000,000 WARN-only Per-topic subscriber index. Eviction would corrupt routing.
SILENT_TOPIC_WARN_DEDUP_MAX 1,000,000 FIFO-evict Dev-mode silent-topic warning dedup set.
PUBLISH_RATE_WARN_DEDUP_MAX 1,000,000 FIFO-evict Dev-mode publish-rate warning dedup set.
MAX_PRESENCE_REF 1,000,000 WARN+skip In-memory presence-ref map (single-instance dev / small-prod path).
MAX_OPTIMISTIC_QUEUE_DEPTH 1,000 REJECT Per-stream in-flight mutate() queue depth.
Rate-limit identities 5,000 REJECT Per-function live.rateLimit() buckets after stale-sweep.
Throttle/debounce timers 5,000 direct At cap, publishes immediately instead of dropping.
Idempotency results 10,000 FIFO-evict In-process live.idempotent() store; evicts oldest 10%.
Per-stream history 50 FIFO store.history undo/redo ring; configurable via enableHistory.
Per-stream devtools events 20 FIFO Recent-events ring shown in __devtools.

The first six are exported as named constants from svelte-realtime/server and svelte-realtime/client so apps writing tools or dashboards can read them programmatically. The remaining caps are internal; their values are documented here for capacity planning.

MAX_PUSH_REGISTRY (10,000,000)

Per-process Map of userId -> { ws, platform } populated by pushHooks.open and drained by pushHooks.close. When the registry reaches the cap, new userIds are not registered (the connection still works, it just can't be the target of live.push({ userId }) until existing entries clear). A one-shot warning surfaces the saturation. Hitting this typically means push registrations are not being released on disconnect - check hooks.ws.js wires pushHooks.close.

TOPIC_WS_COUNTS_WARN_THRESHOLD (1,000,000)

Per-process Map<topic, Set> tracking which sockets hold an active stream subscription per topic. Used by the realtime layer to pass remainingSubscribers to __onUnsubscribe. Eviction would corrupt subscribe / unsubscribe routing, so this is WARN-ONLY: the map keeps growing past the threshold, but a one-shot warning surfaces. Hitting this typically means runaway dynamic-topic generation (per-request topic strings); prefer aggregating into stable topic names.

SILENT_TOPIC_WARN_DEDUP_MAX (1,000,000)

Dev-mode set of topics that have already fired the silent-topic warning. FIFO-evicts at the cap (dropping the oldest topic just lets it re-warn on its next over-threshold subscribe). Dev-only; production code paths are constant-folded out.

PUBLISH_RATE_WARN_DEDUP_MAX (1,000,000)

Dev-mode set of topics that have already fired the high-frequency publish-rate warning. FIFO-evicts at the cap (dropping the oldest topic just lets it re-warn on its next sample tick). Dev-only.

MAX_OPTIMISTIC_QUEUE_DEPTH (1,000)

Per-stream upper bound on concurrent in-flight store.mutate(asyncOp, change) calls. When the queue depth equals the cap, the next mutate() rejects with MAX_OPTIMISTIC_QUEUE_DEPTH=1000. Hitting this means either the server is unresponsive (mutates are not settling), the call site is firing mutates faster than the server can confirm, or a bulk operation is being submitted as N individual mutates instead of one batch. For bulk actions over 1000 items, prefer batch() or a single bulk-RPC handler. The cap protects the worst-case display-recompute cost during slow-server scenarios; recompute walks the full queue on every server event so cost grows linearly.

Rate-limit identities (5,000)

Per-function rate limiting (live.rateLimit()) tracks sliding-window buckets in memory. When the bucket map reaches the cap, stale buckets are swept first. If still full, new identities are rejected with a RATE_LIMITED error. Existing identities are unaffected.

Throttle/debounce timers (5,000)

Active per-key throttle and debounce entries (ctx.throttle() / ctx.debounce()). At capacity, new entries bypass the timer and publish immediately so data is never silently dropped.

MAX_PRESENCE_REF (1,000,000)

In-memory map of ${topic}\0${userId} -> { count, timer, data } populated by live.room's data-stream onSubscribe and drained on the 5-second grace timer set by onUnsubscribe. Two roles:

  1. Refcounts joins per (user, room) pair so multiple WebSocket subscriptions from the same user in the same room don't double-count, and so a brief disconnect-reconnect within the grace window doesn't fire a leave/rejoin pair.
  2. Backs the in-memory presence-roster fallback in live.room's presence-stream init when platform.presence.list isn't wired (the zero-config dev / single-instance path). The user-supplied presence(ctx) payload is held alongside the refcount so a fresh subscriber can reconstruct the existing roster without a cluster-aware backend.

When the map reaches the cap: entries with a pending leave timer are evicted first (they were already on their way out). If still full after eviction, the new join is dropped - no entry is created, no 'join' is published - and a one-shot warning surfaces. New joiners in this state are invisible in any subscriber's roster until existing entries clear.

For multi-instance deploys, wire a cluster-aware platform.presence (e.g. from svelte-adapter-uws-extensions/presence). When platform.presence.list is a function, the in-memory fallback is bypassed entirely and this cap stops mattering.

Idempotency results (10,000)

In-process live.idempotent() store. At capacity, evicts the oldest 10% of entries to make room. The TTL set per-call also prunes expired entries every 30 seconds.


Production limits

maxPayloadLength (default: 1MB)

Maximum size of a single WebSocket message. If an RPC request exceeds this, the adapter closes the connection (uWS behavior). Adjust maxPayloadLength in the adapter's websocket config if your app needs a different cap.

maxBackpressure (default: 1MB)

Per-connection send buffer. When exceeded, messages are silently dropped. handleRpc checks the return value of platform.send() and warns in dev mode when a response is not delivered.

Client send queue (max 1000)

The adapter's sendQueued() drops the oldest item when the queue exceeds 1000 messages. This queue buffers messages while the WebSocket is reconnecting.

Batch size (max 50)

A single batch() call is limited to 50 RPC calls. The client rejects before sending if the limit is exceeded, and the server enforces the same limit as a safety net. Split into multiple batch() calls if you need more.

Topic length (max 256 characters)

The adapter rejects topic names longer than 256 characters or containing control characters (byte value < 32). This applies to subscribe, unsubscribe, and batch-subscribe messages.

ws.subscribe() vs the subscribe hook

live.stream() calls ws.subscribe(topic) server-side, bypassing the adapter's subscribe hook entirely. This is correct - stream topics are gated by guard(), not the subscribe hook.


Error reporting

onError hook

Both handleRpc and createMessage accept an onError callback for non-LiveError exceptions. LiveError throws are expected errors sent to the client; everything else is an unexpected failure that should be reported.

export const message = createMessage({
  onError(path, error, ctx) {
    sentry.captureException(error, {
      tags: { rpc: path },
      user: { id: ctx.user?.id }
    });
  }
});

Standalone onError

For errors in cron jobs, effects, and derived streams, use the standalone onError function:

import { onError } from 'svelte-realtime/server';

onError((path, error) => {
  sentry.captureException(error, { tags: { live: path } });
});

onCronError still works but is deprecated - use onError instead.

Dev-mode publish rate warning

In development, the framework samples platform.pressure.topPublishers at a configurable interval and logs a one-shot warning per topic when message rate crosses a threshold. Suggests coalesceBy and volatile: true mitigations and suppresses automatically when the topic is already configured with either.

import { live } from 'svelte-realtime/server';

// Defaults: enabled, threshold 200 events/sec, intervalMs 5000
live.publishRateWarning({ threshold: 500, intervalMs: 10_000 });

// Disable entirely
live.publishRateWarning(false);

Production builds constant-fold the activation branch to dead code - zero overhead. The sampler runs once per platform on the first ctx-helpers cache miss; per-publish cost is unchanged. Topics already in _topicCoalesce or _topicVolatile are skipped (the user has already addressed them).

Dev-mode silent-topic warning

In development, the framework arms a one-shot timer when a stream first subscribes to a topic. If no events arrive within thresholdMs (default 30000), it logs a warning naming the topic and the common causes:

[svelte-realtime] Topic 'audit:org-1' has subscribers but no events arrived within 30000ms.
  Common causes:
    - missing pg_notify trigger on the underlying table
    - no ctx.publish() call in the relevant handler
    - intentionally low-traffic topic (extend threshold or suppress)
  Configure: live.silentTopicWarning({ thresholdMs: 60000 })
  Suppress:  live.silentTopicWarning({ suppress: ['audit:org-1'] })
  Disable:   live.silentTopicWarning(false)
  See: https://svti.me/silent-topic
import { live } from 'svelte-realtime/server';

// Lower the bar (default 30s)
live.silentTopicWarning({ thresholdMs: 5000 });

// Suppress per-topic for known-quiet streams (admin views, scheduled reports)
live.silentTopicWarning({ suppress: ['admin:audit', 'cron:reports'] });

// Disable globally
live.silentTopicWarning(false);

Topics starting with __ (system topics: __realtime, __signal:*, __custom) are always skipped automatically; you don't need to add them to suppress. Each topic warns at most once per process; the warning never fires for a topic that has been live, and re-subscribing after a warn does not re-fire. Hard-gated to development - production builds constant-fold the activation branch to dead code, so apps not in dev mode pay zero cost regardless of configuration.

The watchdog reuses the same lifecycle hooks as the staleness watchdog (staleAfterMs): arms on first sub for the topic, observed on every publish, disarms on last unsub. Apps using both features share the per-topic timer machinery without paying twice.

Per-stream onError for loader observability

For streams whose loader can fail, add onError(err, ctx, topic) to the stream options. See Stream lifecycle hooks for the full pattern. Per-stream observers fire alongside the global onError setter, not instead of it.


Custom message handling

When you need to mix RPC with custom WebSocket messages, use onUnhandled or drop to handleRpc directly.

With createMessage:

export const message = createMessage({
  onUnhandled(ws, data, platform) {
    // handle non-RPC messages (binary data, custom protocols, etc.)
  }
});

With handleRpc:

import { handleRpc } from 'svelte-realtime/server';

export function message(ws, { data, platform }) {
  if (handleRpc(ws, data, platform)) return;
  // your custom message handling
}

Progression: export { message } -> createMessage({...}) -> manual handleRpc. Start simple, add options when needed, drop to full control only if you have to.


Server-Side HMR

Changes to files in src/live/ are hot-reloaded on the server without restarting npm run dev. When you save a file, the plugin:

  1. Invalidates the changed module in Vite's server module graph
  2. Clears all server-side registrations (RPC handlers, guards, cron jobs, derived streams, effects, aggregates)
  3. Re-imports the registry module so every __register* call runs with the updated handler functions

This applies to all handler types - live(), live.stream(), live.cron(), live.derived(), live.effect(), live.aggregate(), live.room(), guard(), and everything else. Adding or deleting files in src/live/ also triggers a full re-registration.

Error recovery: if the edited file has a syntax error, the previous handlers are restored so the server keeps working. Fix the error and save again.

Active subscriptions: existing stream subscribers keep their current data and connection. They will receive new events published by the updated handler, but the init function only runs on new subscriptions. A full page reload picks up the latest init logic.

Cron jobs: old intervals are cleared and restarted with the updated schedule and handler.


DevTools

In dev mode, the Vite plugin injects an in-browser overlay that shows active streams, RPC history, and connection status. Toggle with Ctrl+Shift+L.

The Streams tab lists every store currently mounted on the page. For each one it shows:

Field Meaning
topic The wire topic the store is subscribed to (or ? while loading).
merge The store's merge strategy (crud, latest, set, presence, cursor).
subs Active subscriber count; the entry disappears when this hits zero.
last Event name of the most recent pub/sub frame and its relative age (12s ago).
err Error code + message if the stream is in the error state; cleared on recovery.

Click any stream row to expand a per-stream payload preview - the most recent 20 envelopes, time + event name + JSON data. Toggle Pretty / Raw via the header buttons (Raw shows full JSON up to ~500 chars; Pretty truncates at ~200 with overflow indicator). Pause stops capturing new events without affecting the live last: timestamp; Clear events drops every stream's ring buffer in one click. Pretty/Raw + Pause states persist across reloads via localStorage.

Privacy. Captured payloads are walked once at write time with key-based redaction. The default redact list covers password, token, apiKey / api_key, secret, authorization, cookie, sessionid / session_id, csrf / csrftoken. Override or extend at runtime:

import { __devtools } from 'svelte-realtime/client';
if (__devtools) {
  __devtools.redactKeys.add('paymentMethod');
  __devtools.redactKeys.add('ssn');
}

Match is case-insensitive and exact-key (no substring fuzz). Redacted values render as '[REDACTED]'. Recursion is capped at depth 5 (deeper structures show '[depth-cap]') and arrays at 50 items so a malformed-large payload doesn't pin a graph in memory.

The RPC tab shows pending calls (with elapsed time) and a 50-entry ring buffer of recent results (ok/err, duration). The Connection tab summarizes the same counters.

The overlay is stripped from production builds. Disable it in dev with:

realtime({ devtools: false })

Testing

Use createTestEnv() from svelte-realtime/test to test your live functions without a real WebSocket server.

import { describe, it, expect, afterEach } from 'vitest';
import { createTestEnv } from 'svelte-realtime/test';
import * as chat from '../src/live/chat.js';

describe('chat module', () => {
  const env = createTestEnv();
  afterEach(() => env.cleanup());

  it('sends and receives messages', async () => {
    env.register('chat', chat);

    const alice = env.connect({ id: 'alice', name: 'Alice' });
    const bob = env.connect({ id: 'bob', name: 'Bob' });

    // Subscribe Bob to the messages stream
    const stream = bob.subscribe('chat/messages');
    await new Promise(r => setTimeout(r, 10));

    // Alice sends a message
    const msg = await alice.call('chat/sendMessage', 'Hello!');
    expect(msg.text).toBe('Hello!');

    // Bob receives the live update
    await new Promise(r => setTimeout(r, 10));
    expect(stream.events).toHaveLength(1);
  });
});

TestEnv API:

Method Description
register(moduleName, exports) Register a module's live functions
connect(userData) Create a fake connected client
cleanup() Clear all state (call in afterEach)
platform The mock platform object

TestClient API:

Method Description
call(path, ...args) Call a live() function
subscribe(path, ...args) Subscribe to a live.stream()
binary(path, buffer, ...args) Call a live.binary() function
disconnect() / reconnect() Simulate connection state changes

TestStream API:

Property Description
value Latest value from the stream
error Error if the stream failed
topic The topic the stream is subscribed to
events All pub/sub events received
hasMore Whether more pages are available
waitFor(predicate, timeout?) Wait for a value matching a predicate
simulatePublish(event, data) Publish a server-side event to this stream's topic. Equivalent to env.platform.publish(stream.topic, event, data), but discoverable on the stream return where the test is already focused. Throws if the topic is not yet known (await the initial subscribe first).

Chaos harness

createTestEnv({ chaos: { dropRate, seed } }) enables fault injection on platform.publish so tests can verify resilience to message drops without spinning a real cluster.

import { createTestEnv } from 'svelte-realtime/test';

// 50% drop rate, deterministic via seed
const env = createTestEnv({
  chaos: { dropRate: 0.5, seed: 'rep-1234' }
});

env.register('chat', chat);
const client = env.connect({ id: 'u1' });
const stream = client.subscribe('chat/messages');

// Publish 100 events; ~50 of them get dropped.
for (let i = 0; i < 100; i++) {
  env.platform.publish('chat-messages', 'created', { id: i });
}
// Same seed -> same drop sequence across runs, so failures replay.

Runtime control via env.chaos:

Method/property Description
env.chaos.set({ dropRate?, seed? }) Apply (or replace) chaos config mid-test.
env.chaos.disable() Equivalent to set(null).
env.chaos.config Current { dropRate, seed } or null.
env.chaos.dropped Running count of platform.publish drops.
env.chaos.resetCounter() Zero the counter without changing config.

Currently models the drop-outbound scenario only - platform.publish events to subscribers are dropped at the platform layer. RPC replies (platform.send) are exempt because timing them out would just hang test code; the chaos harness is for testing pub/sub resilience, not RPC retry behavior.

Direct ctx unit tests

createTestContext({ user }) builds a ctx-shaped object suitable for direct unit tests of guards and predicates - helper methods are no-ops, the user / cursor / requestId can be overridden. Use this when the function under test takes ctx and synchronously returns a value; reach for createTestEnv() only when you need full publish/subscribe round-trips.

import { createTestContext } from 'svelte-realtime/test';

const adminOnly = (ctx) => ctx.user?.role === 'admin';

expect(adminOnly(createTestContext({ user: { role: 'admin' } }))).toBe(true);
expect(adminOnly(createTestContext({ user: { role: 'viewer' } }))).toBe(false);
expect(adminOnly(createTestContext())).toBe(false);

The returned shape mirrors the production _buildCtx: user, ws, platform, publish, cursor, throttle, debounce, signal, batch, shed, requestId. Helpers default to no-op stubs (publish returns true, shed returns false, etc.), which is correct for predicates that only read ctx.user or ctx.cursor.

Asserting guard rejections

expectGuardRejects(promise, expectedCode?) is a small ergonomic wrapper for the common "this call should be denied" pattern. It awaits the promise, asserts it rejected with a LiveError of the expected code (default 'FORBIDDEN'), and returns the error so further assertions can run on it.

import { createTestEnv, expectGuardRejects } from 'svelte-realtime/test';

const env = createTestEnv();
env.register('admin', adminModule);

const user = env.connect({ role: 'viewer' });

// Guards that throw FORBIDDEN are the default case
await expectGuardRejects(user.call('admin/destroyAll'));

// Anonymous calls typically reject with UNAUTHENTICATED
const anon = env.connect(null);
await expectGuardRejects(anon.call('admin/destroyAll'), 'UNAUTHENTICATED');

// The rejected error is returned for further assertions
const err = await expectGuardRejects(user.call('admin/destroyAll'));
expect(err.message).toMatch(/admin role/);

Server API reference

Import from svelte-realtime/server.

Export Description
live(fn) Mark a function as RPC-callable
live.stream(topic, initFn, options?) Create a reactive stream
live.channel(topic, options?) Create an ephemeral pub/sub channel
live.binary(fn, options?) Mark a function as a binary RPC handler (maxSize limits payload, default 10MB)
live.upload(fn, options?) Streaming upload handler (chunked, abortable async-iterable; maxSize 100MB, maxConcurrentPerSession 4, maxBufferedChunks 64)
live.validated(schema, fn) RPC with Standard Schema input validation (Zod, ArkType, Valibot, etc.)
live.cron(schedule, topic, fn) Server-side scheduled function
live.derived(sources, fn, options?) Server-side computed stream (static or dynamic sources)
live.effect(sources, fn, options?) Server-side reactive side effect
live.aggregate(source, reducers, options) Real-time incremental aggregation
live.room(config) Collaborative room (data + presence + cursors + actions)
live.webhook(topic, config) HTTP webhook-to-stream bridge
live.gate(predicate, fn) Conditional stream activation
live.rateLimit(config, fn) Per-function sliding window rate limiter
live.rateLimits(config) Registry-level rate limits with default / overrides / exempt
live.middleware(fn) Global middleware (runs before guards)
live.access.* Subscribe-time access control helpers
guard(...fns) Per-module auth middleware
LiveError(code, message?) Typed error (propagates to client)
handleRpc(ws, data, platform, options?) Low-level RPC handler
message Ready-made message hook
createMessage(options?) Custom message hook factory
pipe(stream, ...transforms) Composable stream transforms
close Ready-made close hook (fires onUnsubscribe for remaining topics)
unsubscribe Ready-made unsubscribe hook (fires onUnsubscribe in real time)
setCronPlatform(platform) Capture platform for cron jobs (call from init({ platform }))
configureCron({ leader }) Cluster-mode leader gate for cron (default: every worker fires)
onError(handler) Global error handler for cron, effects, and derived
onCronError(handler) Deprecated alias for onError
enableSignals(ws) Enable point-to-point signal delivery
_activateDerived(platform) Enable derived stream listeners
live.metrics(registry) Opt-in Prometheus metrics
live.breaker(options, fn) Circuit breaker wrapper

Client API reference

Import from svelte-realtime/client.

Export Description
RpcError Typed error with code field
UploadHandle<T> Type for live.upload client handles (thenable + events + cancel)
batch(fn, options?) Group RPC calls into one WebSocket frame
configure(config) Connection hooks, offline queue, upload frame size
combine(...stores, fn) Multi-store composition
onSignal(userId, callback) Listen for point-to-point signals
onDerived Re-exported from adapter: reactive derived topic subscription

Stream store methods (on $live/ stream imports):

Method/Property Description
error Readable<RpcError | null> - current error, or null when healthy
status Readable<'loading' | 'connected' | 'reconnecting' | 'error'> - connection status
optimistic(event, data) Apply instant UI update, returns rollback function
hydrate(initialData) Pre-populate with SSR data
loadMore(...extraArgs) Load next page (cursor-based)
hasMore Whether more pages are available
enableHistory(maxSize?) Start tracking for undo/redo
undo() / redo() Navigate history
canUndo / canRedo Whether undo/redo is available
when(condition) Conditional subscription

Vite plugin options

Import from svelte-realtime/vite.

import realtime from 'svelte-realtime/vite';

export default {
  plugins: [sveltekit(), uws(), realtime({ dir: 'src/live' })]
};
Option Default Description
dir 'src/live' Directory containing live modules
typedImports true Generate .d.ts for typed $live/ imports
devtools true Enable the in-browser DevTools overlay in dev mode

The plugin resolves $live/chat to src/live/chat.js, generates client stubs, supports nested directories ($live/rooms/lobby), and watches for file changes in dev mode. When typedImports is enabled, it generates type declarations that strip the ctx parameter and infer return types.


Benchmarks

The benchmark suite measures the full-stack overhead added by svelte-realtime on top of raw WebSocket messaging: JSON serialization, RPC path resolution, registry lookup, context construction, handler execution, and response encoding. These run in-process with mock objects and isolate the framework cost from network latency.

Run with:

node bench/rpc.js

What gets measured:

  • RPC dispatch overhead: time for handleRpc to parse, look up the registry, build ctx, execute, and respond - compared to calling the function directly
  • Stream merge throughput: operations per second for each merge strategy (crud, latest, set, presence, cursor) applying events to arrays of varying sizes
  • Fast-path rejection: how quickly non-RPC messages are identified and skipped

Merge strategies use an internal Map<key, index> for O(1) lookups instead of linear scans. Updates and upserts on keyed strategies (crud, presence, cursor) are constant-time regardless of array size. Deletes and prepends require an index rebuild (linear), which matches the cost of the delete itself.

Event batching (browser)

In the browser, incoming pub/sub events are queued and flushed once per requestAnimationFrame instead of triggering a Svelte store update per event. This is automatic - no configuration needed.

With high-frequency streams (e.g. 1000 cursors at 20 updates/sec), this reduces reactive store updates from ~20,000/sec to ~60/sec (one per frame). All merge operations still run, but Svelte only diffs and re-renders once per frame.

In Node/SSR (tests, __directCall, etc.), events apply synchronously - no batching overhead.

See bench/rpc.js for the full source.


You can also run the package's own tests:

npm test

Tauri and Capacitor

svelte-realtime works with Tauri and Capacitor without any static build or architectural changes.

Both runtimes let you point their webview at a live URL instead of local files. Your SvelteKit app runs on the server as normal - SSR, WebSocket hydration, live stores, RPC - and the native wrapper adds platform APIs (camera, push notifications, filesystem, etc.) on top.

Capacitor - capacitor.config.ts:

import { CapacitorConfig } from '@capacitor/cli';

const config: CapacitorConfig = {
  appId: 'com.example.app',
  appName: 'My App',
  server: {
    url: 'https://yourapp.com'
  }
};

export default config;

Tauri - tauri.conf.json:

{
  "build": {
    "devPath": "https://yourapp.com",
    "distDir": "https://yourapp.com"
  }
}

The webview loads your server directly. No static adapter, no URL configuration in the client, nothing special in your SvelteKit code.


License

MIT

Top categories

Loading Svelte Themes