Redis and Postgres extensions for svelte-adapter-uws.
The core adapter keeps everything in-process memory. That works great for single-server deployments, but the moment you scale to multiple instances you need shared state. This package provides drop-in replacements backed by Redis and Postgres, with the same API shapes you already know from the core plugins.
platform.publish() reaches all instances, not just the local one.
Upgrading from 0.4.x? See the migration guide for every breaking change between 0.4.x and 0.5.x.
Getting started
Clients
Redis extensions
Postgres extensions
Cross-backend
Observability
Reliability
Operations
More
Getting started
The three ecosystem packages move together. Bump them as a group:
| svelte-adapter-uws | svelte-realtime | svelte-adapter-uws-extensions | Notes |
|---|---|---|---|
| ^0.4.x | ^0.4.x | ^0.4.x | Legacy stable |
| ^0.5.0 | ^0.5.0 | ^0.5.0 | Current. Node 22+ required. Redis 7+ for `createShardedBus` / `createFunctionLibrary`. Redis 7.4+ for `createPresence` (per-field `HEXPIRE`). See MIGRATION.md if upgrading from 0.4. |
Mixed-version installs are rejected at install time with a peer-dep warning.
npm install svelte-adapter-uws-extensions ioredis
Postgres support is optional:
npm install pg
Requires svelte-adapter-uws >= 0.2.0 as a peer dependency.
Clients
Factory that wraps ioredis with lifecycle management. All Redis extensions accept this client.
// src/lib/server/redis.js
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
export const redis = createRedisClient({
url: 'redis://localhost:6379',
keyPrefix: 'myapp:' // optional, prefixes all keys
});
| Option | Default | Description |
|---|---|---|
| `url` | `'redis://localhost:6379'` | Redis connection URL |
| `keyPrefix` | `''` | Prefix for all keys |
| `autoShutdown` | `true` | Disconnect on `sveltekit:shutdown` |
| `options` | `{}` | Extra ioredis options |

| Method | Description |
|---|---|
| `redis.redis` | The underlying ioredis instance |
| `redis.key(k)` | Returns `keyPrefix + k` |
| `redis.duplicate(overrides?)` | New connection with same config. Pass ioredis options to override defaults. |
| `redis.quit()` | Gracefully disconnect all connections |
Factory that wraps pg Pool with lifecycle management. Two construction modes:
// src/lib/server/pg.js
import { createPgClient } from 'svelte-adapter-uws-extensions/postgres';
// (1) Owned-pool: the client constructs and owns its own pg.Pool.
export const pg = createPgClient({
connectionString: 'postgres://localhost:5432/mydb'
});
// (2) Wrapped-pool: pass an existing pg.Pool to share with raw `pg` use
// elsewhere, another framework integration, or a different module
// in the same app - single connection footprint against the database.
import pg from 'pg';
const pool = new pg.Pool({ connectionString: env.DATABASE_URL });
export const wrapped = createPgClient({ pool });
// pg.end() is a no-op now - the caller owns `pool`'s lifecycle.
| Option | Default | Description |
|---|---|---|
| `connectionString` | - | Postgres connection string. Required UNLESS `pool` is provided. |
| `pool` | - | An existing `pg.Pool` to wrap. When provided, `autoShutdown` defaults to `false` and `end()` becomes a no-op. Pass `connectionString` alongside `pool` if you also need `createClient()`. |
| `autoShutdown` | `true` (owned pool) / `false` (wrapped pool) | Disconnect on `sveltekit:shutdown`. |
| `options` | `{}` | Extra pg Pool options. Ignored when `pool` is provided. |

| Method | Description |
|---|---|
| `pg.pool` | The underlying pg Pool (provided or owned). |
| `pg.query(text, values?)` | Run a query. |
| `pg.createClient()` | New standalone `pg.Client` with the same config. Throws when only `pool` was provided - pass `connectionString` alongside `pool` to enable this path. |
| `pg.end()` | Gracefully close the pool. No-op when wrapping an externally-provided pool. |
Every extension in this package is an authorization-free primitive. None of them know who the caller is, what roles the caller has, or whether the requested action is allowed - they execute whatever the caller passes against the shared Redis or Postgres backend. Calling replay.replay(ws, topic, since), idempotency.handle(key, fn), lock.withLock(key, fn), or queue.enqueue(task) is no more an authorization check than redis.GET key or pg.query('SELECT ...') is one.
Your message handler is the gate. Identity is established at connect time by the adapter's upgrade() hook and stashed on the socket via ws.getUserData(). Your message() handler reads that identity, decides whether the action is allowed, and only then invokes the extension:
// hooks.ws.js
import { idempotency } from './idempotency.js';
export async function message(ws, { data }) {
  const { action, input } = JSON.parse(Buffer.from(data).toString());
  const { userId, role } = ws.getUserData() ?? {};

  // 1. Authentication: did upgrade() reject? If not, ws.getUserData() is non-empty.
  if (!userId) return;

  // 2. Authorization: this handler decides. The extension does not.
  if (action === 'place-order' && !canPlaceOrders(userId, role)) return;

  // 3. Only now is the extension invoked. The key is prefixed with the
  //    trusted userId so a wire-supplied clientOrderId cannot collide
  //    with another user's namespace.
  const key = `order:${userId}:${input.clientOrderId}`;
  await idempotency.handle(key, () => persistOrder(userId, input));
}
Higher-level frameworks built on this adapter (e.g. svelte-realtime) wrap this pattern: ctx.user is the same identity object the adapter's upgrade() hook returned, and the framework's _guard / live.public() / // realtime-allow-public machinery is the authorization layer at the RPC seam. The extensions below still do not gate anything themselves; the framework's auth lives outside them.
Two failure modes deserve specific call-out for distributed extensions:
A `lock.withLock('account:42', ...)` call on a shared Redis hits the same lock entry regardless of which app instance or which tenant issued it. If your handler builds the key from a wire field without checking ownership, an unauthorized caller can grab the lock and stall any legitimate owner. Derive keys from a trusted prefix (`account:${assertedAccountId(ctx, payload.accountId)}`), and consider per-tenant key prefixes on the client itself (``createRedisClient({ keyPrefix: `tenant:${tenantId}:` })``) when you operate one Redis across multiple tenants - see shared-Redis tenancy notes for the deployment shape.

The same pattern applies to every extension in this README: read identity, decide, derive keys/topics from trusted identity prefixes, then invoke. An extension that "looks like an auth gate" by virtue of accepting an identity-shaped key is just substituting whatever string the caller hands it.
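The "derive keys from trusted identity" step might look like the sketch below. `assertedAccountId` is named in the text above but never defined there, so its body, the `ctx.user.accountIds` shape, and the `lockKeyFor` helper are all assumptions made for illustration, not part of this package.

```javascript
// Hypothetical helper: returns the account id only when the authenticated
// caller owns it. The ctx.user.accountIds shape is an assumption.
function assertedAccountId(ctx, requestedAccountId) {
  const owned = ctx?.user?.accountIds ?? [];
  if (!owned.includes(requestedAccountId)) {
    throw new Error('forbidden: caller does not own this account');
  }
  return requestedAccountId;
}

// Build the lock key from the checked id, never from the raw wire field.
function lockKeyFor(ctx, payload) {
  return `account:${assertedAccountId(ctx, payload.accountId)}`;
}

const ctx = { user: { id: 'u1', accountIds: ['42'] } };
console.log(lockKeyFor(ctx, { accountId: '42' })); // account:42
```

A request naming an account the caller does not own throws before any lock key exists, so the extension is never invoked with an attacker-chosen key.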
Redis extensions
Distributes platform.publish() calls across multiple server instances via Redis pub/sub. Each instance publishes locally AND to Redis. Incoming Redis messages are forwarded to the local platform with echo suppression (messages originating from the same instance are dropped on receive, keyed by a per-process instance ID).
Multiple publish() calls within the same event-loop tick are coalesced into a single Redis pipeline via microtask batching. This means a form action that publishes to three topics results in one pipelined round trip, not three independent commands.
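To make the coalescing concrete, here is a minimal sketch of microtask batching. It is not the bus's actual implementation: `createBatcher` and the `sendPipeline` callback are hypothetical names standing in for whatever writes the pipeline to Redis.

```javascript
// Minimal sketch of microtask coalescing. `sendPipeline` is a hypothetical
// callback standing in for the code that writes one pipeline to Redis.
function createBatcher(sendPipeline) {
  let pending = [];
  let scheduled = false;

  function flush() {
    scheduled = false;
    if (pending.length === 0) return;
    const batch = pending;
    pending = [];
    sendPipeline(batch); // one round trip for the whole batch
  }

  function publish(topic, event, data) {
    pending.push({ topic, event, data });
    if (!scheduled) {
      scheduled = true;
      queueMicrotask(flush); // runs once the current synchronous code finishes
    }
  }

  return { publish, flush };
}

const pipelines = [];
const batcher = createBatcher((batch) => pipelines.push(batch));
batcher.publish('chat', 'created', { id: 1 });
batcher.publish('audit', 'created', { id: 1 });
batcher.publish('stats', 'updated', { n: 1 });
```

All three `publish()` calls land in the same pending array, and the single scheduled `flush` hands them to `sendPipeline` together.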
Authorization: the bus delivers whatever publishes arrive to whoever is subscribed. It does not gate topic-subscribe or topic-publish. If a topic should be limited to a subset of users, enforce that in your subscribe / message handler. See Authorization model.
// src/lib/server/bus.js
import { redis } from './redis.js';
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';
export const bus = createPubSubBus(redis);
// src/hooks.ws.js
import { bus } from '$lib/server/bus';
let distributed;

export async function open(ws, ctx) {
  // Activates the Redis subscriber (idempotent) AND subscribes the
  // connection to the bus's systemChannel so degraded / recovered
  // events reach it. One line, zero ceremony.
  await bus.hooks.open(ws, ctx);
  // Wrapped platform: publish() reaches local clients AND every other instance.
  distributed = bus.wrap(ctx.platform);
}

export function message(ws, { data, platform }) {
  const msg = JSON.parse(Buffer.from(data).toString());
  distributed.publish('chat', 'message', msg);
}
If your open hook does nothing else, drop the ceremony entirely:
// src/hooks.ws.js
import { bus } from '$lib/server/bus';
export const { open } = bus.hooks;
publishBatched
When a single request publishes many events at once - bulk imports, room state resets, audit fanouts - use publishBatched instead of a publish loop. It ships one Redis envelope for the whole batch, and receivers fan out via platform.publishBatched so each subscriber sees one WebSocket frame per call.
distributed.publishBatched([
{ topic: 'org:42:items', event: 'updated', data: a },
{ topic: 'org:42:items', event: 'updated', data: b },
{ topic: 'org:42:audit', event: 'created', data: c }
]);
Trade-offs vs wrapped.publish in a tight loop:
- Redis publish count drops linearly with batch size, and per-subscriber wire frames drop from N to 1 (bench/01-publish-batched-bus.mjs).
- Per-message relay: false is honored. Flagged messages still publish locally but are excluded from the Redis envelope.
- wrapped.batch(messages) is left in place for callers that explicitly want N independent publishes; it does not produce wire batching. Use publishBatched whenever you want one frame per subscriber per call.
| Option | Default | Description |
|---|---|---|
| `channel` | `'uws:pubsub'` | Redis channel name |
| `systemChannel` | `'__realtime'` | Topic for auto-emitted `degraded` / `recovered` events. `null` or `false` to disable. Requires a breaker |
| `onDegraded` | - | Server-side handler invoked once when the breaker leaves the healthy state |
| `onRecovered` | - | Server-side handler invoked once when the breaker returns to the healthy state |
| `maxEnvelopeBytes` | `1048576` (1 MB) | Reject inbound bus envelopes larger than this before `JSON.parse` runs. Defends against a hostile co-tenant or compromised peer flooding the bus with oversized payloads. |
| `allowSystemTopics` | `false` | When `false` (default), inbound envelopes addressed to `__`-prefixed topics are dropped; the configured `systemChannel` (default `__realtime`) remains in an explicit allowlist so the bus's own `degraded` / `recovered` events still flow. Closes the bus-injection class in shared-Redis deployments where a foreign publisher could otherwise inject forged `__signal:*` / `__rpc` / plugin-internal frames into the local platform. Apps that legitimately bus-relay user-defined `__`-prefixed topics (rare) can opt back in with `true`. |
See Notifying clients of degradation for the full pattern.
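The `maxEnvelopeBytes` and `allowSystemTopics` options amount to a size check before parsing and a topic-prefix filter after it. The sketch below shows that order of operations. The function name `acceptInbound`, the `{ topic, ... }` envelope shape, and the choice to drop malformed JSON silently are assumptions for illustration; the bus's real internals may differ.

```javascript
// Sketch of the two inbound checks. Envelope shape and the handling of
// malformed JSON are assumptions, not the package's documented behavior.
function acceptInbound(raw, opts = {}) {
  const {
    maxEnvelopeBytes = 1048576,
    allowSystemTopics = false,
    systemChannel = '__realtime'
  } = opts;

  // The size check runs on the raw string, before any parsing.
  if (Buffer.byteLength(raw, 'utf8') > maxEnvelopeBytes) return null;

  let envelope;
  try {
    envelope = JSON.parse(raw);
  } catch {
    return null; // assumption: malformed payloads are dropped
  }
  if (typeof envelope?.topic !== 'string') return null;

  const isSystem = envelope.topic.startsWith('__');
  if (isSystem && !allowSystemTopics && envelope.topic !== systemChannel) {
    return null; // forged __signal:* / __rpc style frames stop here
  }
  return envelope;
}

console.log(acceptInbound(JSON.stringify({ topic: '__rpc' }))); // null
```

Checking the byte length first means an oversized payload never reaches `JSON.parse`, which is the point of the option.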
| Method | Description |
|---|---|
| `bus.wrap(platform)` | Returns a new Platform whose `publish()`, `batch()`, and `publishBatched()` send to Redis + local. Other Platform methods (`send`, `sendCoalesced`, `request`, `pressure`, etc.) pass through unchanged. App-stashed convention slots (`platform.replay` for svelte-realtime's auto-replay routing) are forwarded as live getters so framework discovery survives the wrap on the cron seam |
| `bus.hooks` | Ready-made WebSocket hooks. `open(ws, ctx)` activates the Redis subscriber (idempotent) AND subscribes `ws` to the bus's `systemChannel` so `degraded` / `recovered` events are delivered. Destructure for one-line hooks.ws.js wiring: `export const { open } = bus.hooks;` |
| `bus.activate(platform)` | Start the Redis subscriber (idempotent). Equivalent to the subscriber half of `bus.hooks.open`; prefer `bus.hooks.open` for new code |
| `bus.deactivate()` | Stop the subscriber |
createPubSubBus uses one channel for every message, so in a Redis Cluster every node receives every publish via the cluster bus. For deployments with many fine-grained topics where each subscriber only cares about a small subset (chat with millions of rooms, per-document collaboration, per-user feeds), most of that fan-out is wasted bandwidth.
createShardedBus (svelte-adapter-uws-extensions/redis/sharded-pubsub) is the SPUBLISH/SSUBSCRIBE variant: per-topic channels, dynamic subscription via follow(topic) / unfollow(topic), no wildcards. In Redis Cluster, messages stay on the shard that owns each channel rather than fanning out to every node.
Requires Redis 7+. activate() runs INFO server and throws on older servers; use createPubSubBus for Redis 6 / older Valkey.
Authorization: same as the unsharded bus - the sharded variant is a delivery optimization, not an access gate. Gate follow(topic) calls and publishes in your handler. See Authorization model.
import { createShardedBus } from 'svelte-adapter-uws-extensions/redis/sharded-pubsub';
export const bus = createShardedBus(redis, {
shardKey: (topic) => topic.split(':')[0] // optional grouping
});
// src/hooks.ws.js
import { bus } from '$lib/server/bus';
let distributed;
export async function open(ws, { platform }) {
await bus.activate(platform);
distributed = bus.wrap(platform);
}
// Wire follow / unfollow against WebSocket subscribe / unsubscribe:
export const { subscribe, unsubscribe, close } = bus.hooks;
// Or manually:
// await bus.follow('chat:room-7');
// distributed.publish('chat:room-7', 'msg', { text: 'hi' });
// await bus.unfollow('chat:room-7');
bus.hooks is the recommended path - it tracks per-ws subscription state and refcounts so the bus only SSUBSCRIBEs on the first follower per channel and SUNSUBSCRIBEs on the last one out.
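The first-in / last-out behavior can be pictured with a small refcount sketch. This is a simplification under stated assumptions: `createFollowTracker`, `ssubscribe`, `sunsubscribe`, and `channelFor` are hypothetical names, and the real hooks also track per-ws state, which is omitted here.

```javascript
// Sketch of per-channel refcounting. `ssubscribe` / `sunsubscribe` are
// hypothetical callbacks standing in for the real Redis commands.
function createFollowTracker({ ssubscribe, sunsubscribe, channelFor }) {
  const refs = new Map(); // channel -> follower count

  return {
    follow(topic) {
      const channel = channelFor(topic);
      const count = refs.get(channel) ?? 0;
      refs.set(channel, count + 1);
      if (count === 0) ssubscribe(channel); // first follower in
    },
    unfollow(topic) {
      const channel = channelFor(topic);
      const count = refs.get(channel) ?? 0;
      if (count === 0) return; // never followed; nothing to undo
      if (count === 1) {
        refs.delete(channel);
        sunsubscribe(channel); // last follower out
      } else {
        refs.set(channel, count - 1);
      }
    }
  };
}

const calls = [];
const tracker = createFollowTracker({
  ssubscribe: (c) => calls.push(`SSUBSCRIBE ${c}`),
  sunsubscribe: (c) => calls.push(`SUNSUBSCRIBE ${c}`),
  channelFor: (topic) => 'uws:sharded:' + topic
});
tracker.follow('chat:room-7');
tracker.follow('chat:room-7');
tracker.unfollow('chat:room-7');
tracker.unfollow('chat:room-7');
console.log(calls.length); // 2
```

Two followers and two unfollows produce exactly one subscribe and one unsubscribe; the middle calls only move the counter.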
followBatch, bus.hooks.subscribeBatch
bus.followBatch(topics) groups the input topics by shard channel and SSUBSCRIBEs any new channels in one round trip. Pairs with the adapter's subscribeBatch hook (hooks.ws.subscribeBatch) so an N-topic subscribe batch lands as one round-trip-per-channel rather than one round-trip-per-topic. With the adapter's client-side coalescing (next.7+), the win covers initial-mount subscribes too, not just reconnect resubscribes.
// Today (works, but N round trips):
export const subscribeBatch = async (ws, topics) => {
for (const topic of topics) await bus.follow(topic);
};
// Recommended - one round trip per shard channel:
export const { subscribeBatch } = bus.hooks;
Single follow / unfollow keep their existing semantics; followBatch is purely additive. Refcount semantics for individual topics match follow: each call to followBatch bumps every input topic's refcount by 1, and only channel transitions trigger Redis traffic. Empty arrays no-op; duplicate topics in the input collapse to one refcount bump.
bus.hooks.subscribeBatch skips __-prefixed topics like the per-topic subscribe hook does, and skips topics this ws is already following so a duplicate batch from a flaky reconnect doesn't leak refcount.
publishBatched
distributed.publishBatched(messages) ships one SPUBLISH envelope per shard channel per call. Receivers fan out via platform.publishBatched so each follower sees one WebSocket frame per call.
distributed.publishBatched([
{ topic: 'chat:room1', event: 'msg', data: a },
{ topic: 'chat:room2', event: 'msg', data: b },
{ topic: 'audit:org1', event: 'created', data: c }
]);
// With shardKey: (t) => t.split(':')[0], this is two SPUBLISH envelopes
// (one to the 'chat' shard channel, one to 'audit') instead of three.
Same trade-offs as the unsharded bus: linear Redis-publish-count reduction with batch size, per-subscriber wire frames drop from N to 1, per-message relay: false honored. Use publishBatched for bulk operations; wrapped.batch(messages) remains a per-event loop for callers that explicitly want N separate publishes.
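The envelope count follows directly from grouping messages by shard channel. The sketch below reproduces that grouping under stated assumptions: `buildEnvelopes` is a hypothetical name, and carrying `relay: false` as a field on each message object is an assumed shape.

```javascript
// Sketch: group a batch into one envelope per shard channel.
// The per-message `relay` field is an assumed shape.
function buildEnvelopes(messages, { channelPrefix = 'uws:sharded:', shardKey = (t) => t } = {}) {
  const byChannel = new Map(); // channel -> messages for that envelope
  for (const msg of messages) {
    if (msg.relay === false) continue; // local-only; stays out of Redis
    const channel = channelPrefix + shardKey(msg.topic);
    if (!byChannel.has(channel)) byChannel.set(channel, []);
    byChannel.get(channel).push({ topic: msg.topic, event: msg.event, data: msg.data });
  }
  return byChannel;
}

const envelopes = buildEnvelopes(
  [
    { topic: 'chat:room1', event: 'msg', data: 'a' },
    { topic: 'chat:room2', event: 'msg', data: 'b' },
    { topic: 'audit:org1', event: 'created', data: 'c' },
    { topic: 'chat:room1', event: 'typing', data: 'd', relay: false }
  ],
  { shardKey: (t) => t.split(':')[0] }
);
console.log([...envelopes.keys()]); // [ 'uws:sharded:chat', 'uws:sharded:audit' ]
```

Three relayed messages collapse into two envelopes, and the `relay: false` message never enters either one.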
| Option | Default | Description |
|---|---|---|
| `channelPrefix` | `'uws:sharded:'` | Prefix for sharded pub/sub channels |
| `shardKey` | `(topic) => topic` | Map a topic to a shard label. The channel is `channelPrefix + shardKey(topic)`. Default: identity (one channel per topic). |
| `maxEnvelopeBytes` | `1048576` (1 MB) | Reject inbound bus envelopes larger than this before `JSON.parse` runs. Defends against a hostile co-tenant or compromised peer flooding the cluster bus with oversized payloads. |
| `allowSystemTopics` | `false` | When `false` (default), inbound envelopes addressed to `__`-prefixed topics are dropped. Closes the bus-injection class in shared-Redis deployments. Apps that legitimately bus-relay user-defined `__`-prefixed topics (rare) can opt back in with `true`. |

| | `createPubSubBus` | `createShardedBus` |
|---|---|---|
| Redis version | any | 7+ |
| Topology | standalone or cluster | meaningful only in cluster |
| Channel model | one shared channel | per-topic (or per shard) |
| Subscription | every instance auto-subscribes | dynamic via `follow` / hooks |
| Best fit | most apps; broad-interest topics | many fine-grained topics with narrow audiences |
If you don't have a concrete cluster + fine-grained-topics use case, createPubSubBus is simpler and sufficient.
Same API as the core createReplay plugin, but backed by Redis sorted sets. Messages survive restarts and are shared across instances.
Sequence numbers are incremented atomically via a Lua script (INCR + ZADD + trim in a single EVAL), so concurrent publishes from multiple instances produce strictly ordered, gap-free sequences per topic. When the buffer exceeds size, the oldest entries are removed inside the same Lua script - no second round trip required.
When a client requests replay, the buffer checks whether the client's last-seen sequence is older than the oldest buffered entry. If it is (the buffer was trimmed past the client's position), a truncated event fires on __replay:{topic} before any msg events, so the client knows it missed messages and can do a full reload. This also fires when the buffer is completely empty but the sequence counter has advanced past the client's position (e.g. all entries expired via TTL).
Authorization: the replay buffer is identity-blind - it hands history to whoever subscribes to __replay:{topic}. Gate the topic-subscribe in your handler before invoking the replay extension. See Authorization model.
The same gap state is exposed as a callable: gap(topic, lastSeenSeq) returns { truncated, missingFrom } without driving a full WebSocket replay. Useful for SSR loaders that want to decide between an incremental since() fetch and a full reload before the page even opens its socket.
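For intuition, the sequence, trim, and gap rules can be modelled in memory for a single topic. This is a sketch, not the Redis-backed implementation: `createTopicBuffer` is a hypothetical name, and the meaning given to `missingFrom` (the first sequence number the client lacks, or `null` when nothing was lost) is an assumption, since the text above only names the field.

```javascript
// In-memory sketch of the sequence + trim + gap logic for ONE topic.
// The real extension does this in Redis; missingFrom semantics are assumed.
function createTopicBuffer(size) {
  let seq = 0;
  let entries = []; // [{ seq, event, data }], oldest first

  return {
    publish(event, data) {
      seq += 1; // counterpart of INCR
      entries.push({ seq, event, data }); // counterpart of ZADD
      if (entries.length > size) entries = entries.slice(-size); // trim oldest
      return seq;
    },
    since(lastSeenSeq) {
      return entries.filter((e) => e.seq > lastSeenSeq);
    },
    gap(lastSeenSeq) {
      // With an empty buffer, anything below the counter counts as lost.
      const oldest = entries.length > 0 ? entries[0].seq : seq + 1;
      const truncated = lastSeenSeq + 1 < oldest;
      return { truncated, missingFrom: truncated ? lastSeenSeq + 1 : null };
    }
  };
}

const buf = createTopicBuffer(3);
for (let i = 1; i <= 5; i++) buf.publish('created', { i });
// The buffer now holds seq 3..5.
console.log(buf.gap(1)); // { truncated: true, missingFrom: 2 }
console.log(buf.gap(2)); // { truncated: false, missingFrom: null }
```

A client at seq 2 can still be served incrementally because seq 3 is the oldest retained entry; a client at seq 1 has lost seq 2 and needs a full reload.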
Replay buffers track one sequence per topic, so the topic boundary is also the gap-detection boundary. Map topics to aggregates (auction:a1b2, chat:room-7, doc:abc123) rather than broadcast channels (auctions:all, chat:everyone). With one topic per aggregate, the buffer size budget covers a real history window per aggregate and gap detection is meaningful: if a client missed seq 14 of chat:room-7, you know exactly which room to refetch. With one broadcast topic, a hot aggregate can rotate the buffer past every other aggregate's history within seconds, so any reconnecting client looks "truncated" even when the aggregate they care about hasn't changed in an hour.
// src/lib/server/replay.js
import { redis } from './redis.js';
import { createReplay } from 'svelte-adapter-uws-extensions/redis/replay';
export const replay = createReplay(redis, {
size: 500,
ttl: 3600 // expire after 1 hour
});
// In a form action or API route
export const actions = {
  send: async ({ request, platform }) => {
    const data = Object.fromEntries(await request.formData());
    const msg = await db.createMessage(data);
    await replay.publish(platform, 'chat', 'created', msg);
  }
};

// In +page.server.js
export async function load() {
  const messages = await db.getRecentMessages();
  return { messages, seq: await replay.seq('chat') };
}

// In hooks.ws.js - handle replay requests
export async function message(ws, { data, platform }) {
  const msg = JSON.parse(Buffer.from(data).toString());
  if (msg.type === 'replay') {
    await replay.replay(ws, msg.topic, msg.since, platform);
    return;
  }
}
resumeHook
The adapter's WebSocket resume hook fires on reconnect when the client presents per-topic lastSeenSeqs from sessionStorage. resumeHook() returns a hook function that drives gap-fill across every topic the client cared about, in one line:
// src/lib/server/replay.js
export const replay = createReplay(redis);
// src/hooks.ws.js
import { replay } from '$lib/server/replay';
export const resume = replay.resumeHook();
The returned hook iterates the client's lastSeenSeqs and calls replay.replay(ws, topic, sinceSeq, platform) per topic. Per-topic truncation detection still happens inside replay() - a client whose buffer rolled gets a truncated event on __replay:{topic} so it can do a full reload for that aggregate while other topics continue with incremental gap-fill.
For finer control - custom truncation handling, gathering several gap-fills before flushing, mixing in other resume work - compose by hand:
export async function resume(ws, { lastSeenSeqs, platform }) {
  for (const [topic, sinceSeq] of Object.entries(lastSeenSeqs)) {
    await replay.replay(ws, topic, sinceSeq, platform);
  }
  // ... your own resume work alongside replay
}
The same resumeHook() is available on the Postgres backend; behavior is identical.
| Option | Default | Description |
|---|---|---|
| `storage` | `'sortedset'` | Backend: `'sortedset'` (default) uses ZADD; `'stream'` uses XADD. See Stream backend. |
| `size` | `1000` | Max messages per topic |
| `ttl` | `0` | Key expiry in seconds (0 = never) |
| `durability` | - | Set to `'replicated'` for per-publish replication signalling. See Replicated durability. |
| `minReplicas` | `1` | Minimum replicas that must ack (only with `durability: 'replicated'`). |
| `replicationTimeoutMs` | `1000` | Per-publish replication timeout in ms. 0 blocks indefinitely (Redis WAIT semantics). |
storage: 'stream' dispatches to a Redis Streams implementation (XADD/XRANGE) instead of the default sorted-set one. Same external contract - the same publish / seq / gap / since / replay / clear methods, same durability: 'replicated' mode, same metrics. Two changes under the hood:
- Entry IDs take the form `<seq>-0`, where seq is the same INCR counter the sorted-set backend uses. XRANGE against `(seq-0` filters natively by sequence number, so range queries skip the app-side scan the sorted-set backend does for some paths.

const replay = createReplay(redis, {
storage: 'stream',
size: 10000
});
Both backends use the same seq-counter key ({prefix}replay:seq:{topic}) but different buf-key prefixes (replay:buf:{topic} for sorted-set, replay:streambuf:{topic} for stream), so they can coexist on the same Redis without WRONGTYPE collisions. A single topic should pick one backend at startup and stay there - there is no built-in migration helper for switching an existing topic from one backend to the other (greenfield deployments don't need it; if you have one in flight and need to migrate, drain consumers, copy entries with a one-off script, and switch).
The stream backend works on Redis 5+; listpack encoding is the Redis 7+ default that delivers the memory win.
For producers that need at-most-once semantics under retry, the stream backend exposes publishIdempotent:
const replay = createReplay(redis, { storage: 'stream' });
const { seq, isDuplicate } = await replay.publishIdempotent(platform, 'orders', 'created', order, {
producerId: 'order-service',
requestId: order.clientOrderId // stable per-operation id supplied by the caller
});
On a fresh (producerId, requestId) tuple, the call performs the same INCR + XADD + broadcast as publish() and stashes seq in a per-(producer, topic) dedup hash. On a repeat tuple within idempotencyTtl (default 48 hours), the call returns the cached seq, skips the XADD, and skips the local broadcast - the original publish already broadcast to live consumers, and reconnecting consumers pick the entry up via replay() from the buffer.
The seq counter only advances on fresh writes, so duplicate retries do not introduce gaps that would trigger false-positive truncation events on consumers.
The dedup cache key is {prefix}replay:idmp:{producerId}:{topic} - topic-scoped so the same requestId can be reused across topics without collision. Override the TTL per call via opts.idempotencyTtl, or globally via idempotencyTtl on createReplay.
This pairs with the durable task runner (postgres/tasks): a task that publishes to the replay buffer can pass its task id as requestId so worker-crash retries don't double-publish.
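The dedup behavior described above can be modelled without Redis. The sketch below is an in-memory approximation under stated assumptions: `createIdempotentPublisher` is a hypothetical name, the `broadcast` callback stands in for the XADD plus local broadcast, and TTL expiry of the dedup cache is left out.

```javascript
// In-memory sketch of per-(producer, topic) dedup. TTL expiry is omitted.
function createIdempotentPublisher(broadcast) {
  const seqs = new Map();  // topic -> last seq
  const dedup = new Map(); // `${producerId}:${topic}` -> Map(requestId -> seq)

  return function publishIdempotent(topic, event, data, { producerId, requestId }) {
    const cacheKey = `${producerId}:${topic}`; // topic-scoped, like the real key
    if (!dedup.has(cacheKey)) dedup.set(cacheKey, new Map());
    const cache = dedup.get(cacheKey);

    if (cache.has(requestId)) {
      // Repeat tuple: return the cached seq, no append, no broadcast.
      return { seq: cache.get(requestId), isDuplicate: true };
    }
    const seq = (seqs.get(topic) ?? 0) + 1; // advances on fresh writes only
    seqs.set(topic, seq);
    cache.set(requestId, seq);
    broadcast(topic, event, data, seq);
    return { seq, isDuplicate: false };
  };
}

const sent = [];
const publishIdempotent = createIdempotentPublisher((topic, event, data, seq) => sent.push(seq));
const ids = { producerId: 'order-service', requestId: 'r1' };
const first = publishIdempotent('orders', 'created', { id: 'o1' }, ids);
const retry = publishIdempotent('orders', 'created', { id: 'o1' }, ids);
const next = publishIdempotent('orders', 'created', { id: 'o2' }, { ...ids, requestId: 'r2' });
console.log(first.seq, retry.seq, next.seq); // 1 1 2
```

The retry returns seq 1 without broadcasting, and the next fresh write gets seq 2, so consumers never see a gap caused by a duplicate.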
For loss-sensitive flows (audit logs, financial events) opt in with durability: 'replicated'. After the write to the master, publish() runs WAIT minReplicas replicationTimeoutMs. If fewer than minReplicas replicas ack within the timeout, publish() throws ReplicationTimeoutError and skips the local broadcast - the data is on the master only, and broadcasting would commit live consumers to state that could be lost if the master fails before replicas catch up.
import { createReplay, ReplicationTimeoutError } from 'svelte-adapter-uws-extensions/redis/replay';
const replay = createReplay(redis, {
durability: 'replicated',
minReplicas: 1,
replicationTimeoutMs: 1000
});
try {
  await replay.publish(platform, 'orders', 'created', order);
} catch (err) {
  if (err instanceof ReplicationTimeoutError) {
    // err.ack, err.minReplicas, err.timeoutMs available for logging
    // Caller decides: retry, fail the request, or accept best-effort
  }
  throw err;
}
The data is in the buffer on the master regardless of the WAIT outcome - other instances doing replay() will still see it. Only the local broadcast is suppressed when the durability signal fails. WAIT command-level errors (network/protocol) bubble up as the original error and DO count as a circuit-breaker failure; an under-acked timeout is a separate signal layer and does NOT trip the breaker.
All methods are async (they hit Redis). The API otherwise matches the core plugin exactly:
| Method | Description |
|---|---|
| `publish(platform, topic, event, data)` | Store + broadcast. May throw `ReplicationTimeoutError` when `durability: 'replicated'`. |
| `seq(topic)` | Current sequence number |
| `gap(topic, lastSeenSeq)` | Probe for a buffer gap. Returns `{ truncated, missingFrom }` |
| `since(topic, seq)` | Messages after a sequence |
| `replay(ws, topic, sinceSeq, platform)` | Send missed messages to one client |
| `clear()` | Delete all replay data |
| `clearTopic(topic)` | Delete replay data for one topic |
Same API as the core createPresence plugin, but backed by Redis hashes. Presence state is shared across instances with cross-instance join/leave notifications via Redis pub/sub.
Requires Redis 7.4+. Uses the per-field hash TTL primitive (HEXPIRE family) so staleness is enforced atomically by Redis rather than by an application-side cleanup script. createPresence runs INFO server on first use and throws on older servers; fall back to the in-memory createPresence from svelte-adapter-uws/plugins/presence for single-instance deployments on older Redis.
Authorization: same as the in-memory presence plugin - the plugin shows whoever subscribes to the topic. Gate topic-subscribe in your handler. The select callback only chooses which fields of ws.getUserData() to publish; it does not gate identity. See Authorization model.
Clients see three event types on __presence:{topic}. Mirrors the adapter's bundled createPresence plugin so a single client decoder handles both single-instance and cluster deployments:
| Event | When | Payload | Direction |
|---|---|---|---|
| `state` | Once on subscribe | `{[userKey]: data}` - flat snapshot of current presence | Server -> single connection |
| `diff` | Microtask-batched after joins / leaves / updates | `{joins: {[key]: data}, leaves: {[key]: data}}` | Server -> topic subscribers |
| `heartbeat` | Per heartbeat interval | `string[]` - array of currently-known user keys | Server -> topic subscribers |
diff collapses by key per-tick: if the same user joins and leaves in the same microtask, only the latest op survives on the wire. An update (same user re-joins with different data) appears as a joins entry carrying the new data, since clients overwrite the existing entry via Map.set on the same key.
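A latest-op-wins buffer is enough to get this collapse. The sketch below shows one way to build it; `createDiffBuffer`, `record`, and the `emit` callback are hypothetical names, and the rollback case where a compensating join+leave cancels out entirely is not modelled.

```javascript
// Sketch of a per-topic diff buffer that collapses by key. `emit` is a
// hypothetical callback that would publish the diff frame to subscribers.
function createDiffBuffer(emit) {
  const pending = new Map(); // userKey -> { op: 'join' | 'leave', data }
  let scheduled = false;

  function flush() {
    scheduled = false;
    if (pending.size === 0) return;
    const diff = { joins: {}, leaves: {} };
    for (const [key, { op, data }] of pending) {
      (op === 'join' ? diff.joins : diff.leaves)[key] = data;
    }
    pending.clear();
    emit(diff);
  }

  function record(op, key, data) {
    pending.set(key, { op, data }); // latest op for a key wins
    if (!scheduled) {
      scheduled = true;
      queueMicrotask(flush);
    }
  }

  return { record, flush };
}

const frames = [];
const buffer = createDiffBuffer((diff) => frames.push(diff));
buffer.record('join', 'alice', { name: 'Alice' });
buffer.record('leave', 'alice', { name: 'Alice' });
buffer.record('join', 'bob', { name: 'Bob' });
buffer.flush(); // drain synchronously instead of waiting for the microtask
console.log(Object.keys(frames[0].joins), Object.keys(frames[0].leaves)); // [ 'bob' ] [ 'alice' ]
```

Three recorded ops produce one frame, and alice appears only under `leaves` because that was her latest op.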
Cross-instance traffic on the dedicated presence:events:{topic} Redis pub/sub channel is {instanceId, topic, event, payload} with event in 'join' | 'leave' | 'updated'. Receivers route inbound events into their local diff buffer for client fan-out, so clients only ever see the unified state / diff shape regardless of which instance the change originated on.
Joins are staged with full rollback on failure: local state is set up first, then the Redis hashes are written, then the WebSocket is subscribed. If any step fails (circuit breaker trips, Redis is down, WebSocket closed during an async gap), all prior steps are undone - local maps, the Redis state, and any buffered diff entry are reversed. Compensating join+leave ops on the same key in the same tick collapse to nothing on the wire.
Storage layout (two hashes per topic). presence:topic:{topic} is a hash keyed by userKey -> JSON {data, ts}; one field per unique user on the topic. It backs list() and count(). presence:user:{topic}:{userKey} is a hash keyed by instanceId -> ts; one field per instance currently presenting this user. It is what LEAVE_SCRIPT consults via HLEN to decide whether to broadcast a leave (HLEN == 0 after the script's HDEL means this instance was the last one). The per-field TTL on both hashes is set with HPEXPIRE, so a crashed instance's entries auto-expire field-by-field without an application-side cleanup pass. Whole-key TTL is intentionally NOT set; the key implicitly disappears when its last field expires.
Leaves use an atomic Lua script (LEAVE_SCRIPT) that does HDEL instanceId on the per-user hash and an O(1) HLEN check. If zero remaining, it also HDELs the userKey from the per-topic hash and returns 1; the application broadcasts a leave. Mass disconnect of N users is therefore O(N) Redis-blocked Lua time, regardless of the topic's total user count.
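The leave decision can be modelled with two Maps standing in for the two hashes of a single topic. This is an illustrative sketch only: the function names are hypothetical, TTLs are ignored, and returning 0 for a user with no per-user hash is a choice made here, not documented behavior of `LEAVE_SCRIPT`.

```javascript
// In-memory model of the two hashes for ONE topic and the leave decision.
const topicHash = new Map();  // userKey -> { data, ts }
const userHashes = new Map(); // userKey -> Map(instanceId -> ts)

function join(userKey, instanceId, data, now = Date.now()) {
  if (!userHashes.has(userKey)) userHashes.set(userKey, new Map());
  userHashes.get(userKey).set(instanceId, now);
  topicHash.set(userKey, { data, ts: now });
}

// Follows the described script steps; returns 1 when the caller should
// broadcast a leave, 0 otherwise.
function leave(userKey, instanceId) {
  const instances = userHashes.get(userKey);
  if (!instances) return 0;          // sketch-only choice for unknown users
  instances.delete(instanceId);      // HDEL instanceId on the per-user hash
  if (instances.size > 0) return 0;  // HLEN > 0: still present elsewhere
  userHashes.delete(userKey);
  topicHash.delete(userKey);         // HDEL userKey from the per-topic hash
  return 1;
}

join('alice', 'inst-a', { name: 'Alice' });
join('alice', 'inst-b', { name: 'Alice' });
const firstLeave = leave('alice', 'inst-a');
const secondLeave = leave('alice', 'inst-b');
console.log(firstLeave, secondLeave); // 0 1
```

Each leave touches only the one user's entries, which is why the cost of a mass disconnect scales with the number of leaving users rather than with the size of the topic.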
Crashed-instance cleanup is implicit: the heartbeat refreshes per-field TTLs via HPEXPIRE on every tick, so an instance that stops heartbeating loses its presence entries field-by-field as Redis expires them. The previous heartbeat-driven CLEANUP_SCRIPT pass is no longer needed - Redis 7.4+ does the work in its background expiry task. Zombie cleanup of locally-dead WebSockets still runs on the heartbeat interval: each tick probes every tracked WebSocket via getBufferedAmount() and synchronously purges any whose call throws before the HPEXPIRE refresh runs.
// src/lib/server/presence.js
import { redis } from './redis.js';
import { createPresence } from 'svelte-adapter-uws-extensions/redis/presence';
export const presence = createPresence(redis, {
key: 'id',
select: (userData) => ({ id: userData.id, name: userData.name }),
heartbeat: 30000,
ttl: 90
});
// src/hooks.ws.js
import { presence } from '$lib/server/presence';
export async function subscribe(ws, topic, { platform }) {
  await presence.join(ws, topic, platform);
}

export async function close(ws, { platform }) {
  await presence.leave(ws, platform);
}
| Option | Default | Description |
|---|---|---|
| `key` | `'id'` | Field for user dedup (multi-tab) |
| `select` | strips `__`-prefixed keys | Extract public fields from userData |
| `heartbeat` | `30000` | TTL refresh interval in ms |
| `ttl` | `90` | Per-entry expiry in seconds. Entries from crashed instances expire individually after this period, even if other instances are still active on the same topic. |
| `keyspaceNotifications` | `false` | Subscribe to Redis `__keyevent@*__:expired`. When a presence hash key expires (instance-died scenario), this instance's local subscribers receive an empty `state` event. See Keyspace cleanup mode. |

| Method | Description |
|---|---|
| `join(ws, topic, platform)` | Add connection to presence |
| `leave(ws, platform, topic?)` | Remove from a specific topic, or all topics if omitted |
| `sync(ws, topic, platform)` | Send list without joining |
| `list(topic)` | Get current users |
| `count(topic)` | Count unique users |
| `metrics()` | Synchronous snapshot: `{ totalOnline, heartbeatLatencyMs, staleCleanedTotal }`. See Metrics snapshot. |
| `flushDiffs()` | Drain the pending diff buffer synchronously. Use in graceful-shutdown paths or tests that need the diff to land before the await chain continues. |
| `clear()` | Reset all presence state |
| `destroy()` | Stop heartbeat and subscriber |
| `hooks` | `{ subscribe, close }` - ready-made WebSocket hooks. Destructure for one-line hooks.ws.js setup. |
presence.metrics() returns a synchronous snapshot of in-memory state:
| Field | Description |
|---|---|
| totalOnline | Sum of unique-users-per-topic across all topics this instance is locally tracking. The same user in two topics counts twice; per-topic counts sum cleanly. |
| heartbeatLatencyMs | Duration of the most recent heartbeat tick in milliseconds. Useful as a rough Redis-health indicator - a tick that suddenly takes longer than usual is likely waiting on a slow Redis. |
| staleCleanedTotal | Reserved for backward compatibility. Always 0 since per-field staleness is now enforced atomically by Redis via HPEXPIRE rather than by an application-side cleanup script. The field stays for callers that read it. |
The same numbers are exposed as Prometheus metrics when a metrics registry is attached: presence_total_online{topic="..."} (gauge), presence_heartbeat_latency_ms (gauge). The pre-Design-G presence_stale_cleaned_total counter is no longer registered.
Two additional counters track the diff-protocol behavior:
| Metric | Description |
|---|---|
| presence_diff_frames_total{topic="..."} | Diff frames published to topic subscribers. Compared against presence_joins_total + presence_leaves_total it tells you how much per-tick coalescing the buffer is doing - the bigger the gap, the more bandwidth saved versus per-event broadcast. |
| presence_diff_coalesced_total{topic="..."} | Buffered diff entries overwritten by a later op in the same tick. A non-zero rate confirms the same-key collapse is working (e.g. a user reconnecting fast enough to leave-then-join in one tick). Zero is also a valid state under steady traffic. |
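To make the two counters concrete, here is a minimal sketch of a per-tick diff buffer with same-key collapse. Everything in it is an assumption for illustration: `createDiffBuffer`, the `{ joins, leaves }` frame shape, and the `stats()` helper are hypothetical names, not the extension's internals.

```js
// Hypothetical per-topic diff buffer: one pending op per user key, latest op wins.
function createDiffBuffer() {
  const pending = new Map(); // userKey -> { op: 'join' | 'leave', data }
  let coalesced = 0;
  let frames = 0;
  return {
    push(userKey, op, data) {
      if (pending.has(userKey)) coalesced++; // an earlier op in this tick is overwritten
      pending.set(userKey, { op, data });
    },
    // Called once per tick: emit a single frame for everything buffered.
    flush() {
      if (pending.size === 0) return null;
      const frame = { joins: [], leaves: [] };
      for (const [userKey, entry] of pending) {
        if (entry.op === 'join') frame.joins.push({ key: userKey, data: entry.data });
        else frame.leaves.push(userKey);
      }
      pending.clear();
      frames++;
      return frame;
    },
    stats: () => ({ frames, coalesced })
  };
}

const buffer = createDiffBuffer();
buffer.push('u1', 'leave');
buffer.push('u1', 'join', { name: 'Ada' }); // same tick: overwrites the leave
buffer.push('u2', 'leave');
const frame = buffer.flush();
console.log(frame.joins.length, frame.leaves.length, buffer.stats());
// 1 1 { frames: 1, coalesced: 1 }
```

Three events produced one frame and one coalesced entry, which is the gap the two metrics let you observe.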
By default a sync-only observer (a connection that called presence.sync() to watch a room without joining it) only learns about leaves when the tracking instance broadcasts a diff with the user in leaves. If the tracking instance crashes, the broadcast never fires and the observer's UI shows stale data until the page is reloaded.
keyspaceNotifications: true closes that gap by psubscribe-ing to __keyevent@*__:expired. When the presence hash key for a topic expires (which happens once no instance is heartbeating the topic anymore - typically because the only tracker crashed), this instance emits an empty state event on __presence:<topic> so local subscribers can replace their entire local map with "no one here."
const presence = createPresence(redis, {
key: 'id',
keyspaceNotifications: true
});
Operator burden: Redis must be configured to publish keyspace events:
CONFIG SET notify-keyspace-events Ex
(or any flagset that includes both K/E and x - e.g. Ex, KEA, etc.) If the psubscribe call fails because keyspace events are off, the failure is logged once and the rest of the tracker keeps working without the keyspace branch.
Scope: this hooks into whole-key expiry of presence:topic:{topic}, which fires once every field has expired (no live instances presenting any user on this topic). Per-field expiry from individual crashed instances is handled atomically by Redis 7.4+ HEXPIRE and does NOT trigger this notification - the user just disappears from list() / count() results without an explicit "user left" event. Apps that need an explicit "user left" event for crashed instances either accept the eventual-consistency story (subscribers see the user disappear on the next presence query) or wire a sweeper that subscribes to __keyevent@*__:hexpired (per-field expiry events, separate flag from whole-key :expired).
Instead of writing subscribe and close handlers manually, destructure presence.hooks:
// src/hooks.ws.js
import { presence } from '$lib/server/presence';
export const { subscribe, close } = presence.hooks;
subscribe handles both regular topics (calls join) and __presence:* topics (calls sync so the client gets the current list). close calls leave.
If you need custom logic (auth gating, logging), wrap the hooks:
import { presence } from '$lib/server/presence';
export async function subscribe(ws, topic, ctx) {
if (!ctx.platform.getUserData(ws).authenticated) return;
await presence.hooks.subscribe(ws, topic, ctx);
}
export const { close } = presence.hooks;
The adapter's platform.request(ws, ...) is single-instance: it takes a local ws reference, so it only works against connections owned by the calling instance. createConnectionRegistry is the cluster-routed counterpart - a userId -> {instanceId, sessionId, ts} map in Redis plus a per-instance push channel that lets any instance route a request to whichever one currently owns a given user's WebSocket.
// src/lib/server/registry.js
import { redis } from './redis.js';
import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';
export const registry = createConnectionRegistry(redis, {
identify: (ws) => ws.getUserData()?.userId
});
Wire the open / close hooks so each connection is tracked cluster-wide:
// src/hooks.ws.js
import { registry } from '$lib/server/registry';
export const open = registry.hooks.open;
export const close = registry.hooks.close;
The identify function returns the user identity for a WebSocket (return null / undefined for anonymous connections; the registry skips them). The registry reads the per-connection WS_SESSION_ID slot the adapter stamps on userData, so no other configuration is required.
// From any instance:
const reply = await registry.request('user-123', 'confirm-action', { op: 'delete' }, {
timeoutMs: 5000
});
if (reply.confirmed) await actuallyDelete();
The lookup resolves which instance currently owns user-123's connection. If that's the calling instance, the request short-circuits to a local platform.request(ws, ...) - no Redis hop. Otherwise the request envelope ships across the per-instance push channel, the owning instance calls platform.request locally, and the reply ships back on the origin's push channel.
Wire envelopes (internal):
| Direction | Channel | Payload |
|---|---|---|
| Origin -> owner | {prefix}__push:{ownerInstanceId} | {type:'request', ref, sessionId, event, data, replyTo, timeoutMs} |
| Owner -> origin | {prefix}__push:{originInstanceId} | {type:'reply', ref, data} or {type:'reply', ref, error} |
request(...) rejects on:

- a timeout (timeoutMs exceeded)
- an error reply from the owning instance's platform.request

Mid-flight migration (user reconnects to a different instance between lookup and reply) surfaces as a timeout on the origin; the owning instance's late reply lands on a missing pending entry and is dropped with a push_late_replies_total increment. See Edge cases below.
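The pending-entry bookkeeping behind timeouts and late replies can be sketched as follows. This is a simplified model under stated assumptions: `createPendingRequests`, `register`, `expire`, and `handleReply` are hypothetical names, and the timeout is triggered by hand instead of by a timer so the example stays deterministic. Only the envelope shape comes from the table above.

```js
// Hypothetical origin-side bookkeeping for in-flight requests.
function createPendingRequests() {
  const pending = new Map(); // ref -> { resolve, reject }
  let lateReplies = 0;
  return {
    register(ref, resolve, reject) {
      pending.set(ref, { resolve, reject });
    },
    // Would run when timeoutMs elapses: reject and forget the request.
    expire(ref) {
      const entry = pending.get(ref);
      if (!entry) return;
      pending.delete(ref);
      entry.reject(new Error('request timed out'));
    },
    // Handles {type:'reply', ref, data} or {type:'reply', ref, error} envelopes.
    handleReply(envelope) {
      const entry = pending.get(envelope.ref);
      if (!entry) {
        lateReplies++; // no pending entry: the reply arrived too late
        return 'late';
      }
      pending.delete(envelope.ref);
      if ('error' in envelope) entry.reject(new Error(String(envelope.error)));
      else entry.resolve(envelope.data);
      return 'delivered';
    },
    lateReplies: () => lateReplies
  };
}

const requests = createPendingRequests();
const outcomes = [];
const onOk = (data) => outcomes.push(['ok', data]);
const onErr = (err) => outcomes.push(['err', err.message]);
requests.register('r1', onOk, onErr);
requests.register('r2', onOk, onErr);

requests.handleReply({ type: 'reply', ref: 'r1', data: { confirmed: true } }); // normal reply
requests.expire('r2'); // the timeout fires first
const late = requests.handleReply({ type: 'reply', ref: 'r2', data: {} }); // owner answers too late
console.log(late, requests.lateReplies()); // late 1
```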
| Key | Shape | Notes |
|---|---|---|
| {prefix}conns:{userId} | Hash {instanceId, sessionId, ts, attrs?} | Most-recent-connection-wins. A second device on the same userId replaces the first; targeting from request(...) always reaches the most recent connection. attrs is a JSON-encoded snapshot of the optional attributes(ws) callback's return value - present only when attributes is wired and returns at least one value. |
| {prefix}__push:{instanceId} | Pub/sub channel | Each instance subscribes to its own channel. Inbound messages dispatch by envelope.type. |
| {prefix}__registry-events | Pub/sub channel | Shared across all instances. Carries `{type:'open' |
Compare-and-delete on close: a Lua-atomic check ensures the close hook only removes the registry entry when the stored instanceId still matches this instance. Prevents a stale close from clobbering a registration that already migrated to another instance via a fast laptop-then-phone reconnect.
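The race that compare-and-delete guards against can be modelled in a few lines. This is an in-memory stand-in, not the extension's Lua script: the `conns` Map replaces the Redis hash and `compareAndDelete` is a hypothetical name. In Redis the check and the delete have to run as one atomic script; in single-threaded JavaScript a plain function gives the same effect.

```js
// In-memory stand-in for the Redis hash at {prefix}conns:{userId}.
const conns = new Map();

// Hypothetical model of the atomic check: delete only if we still own the entry.
function compareAndDelete(userId, instanceId) {
  const entry = conns.get(userId);
  if (!entry || entry.instanceId !== instanceId) return false; // another instance owns it now
  conns.delete(userId);
  return true;
}

conns.set('user-123', { instanceId: 'A', sessionId: 's1', ts: 1 }); // laptop connects to A
conns.set('user-123', { instanceId: 'B', sessionId: 's2', ts: 2 }); // phone connects to B and replaces the entry

const staleClose = compareAndDelete('user-123', 'A'); // laptop's close arrives late on A
const stillRegistered = conns.has('user-123');
const liveClose = compareAndDelete('user-123', 'B'); // phone disconnects from B later
console.log(staleClose, stillRegistered, liveClose); // false true true
```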
| Option | Default | Description |
|---|---|---|
| identify | (required) | `(ws) => userId |
| attributes | - | `(ws) => Record<string, string |
| keyPrefix | '' | Prefix prepended to all registry keys and channels. Stacks with the underlying client's keyPrefix. |
| ttl | 90 | Expiry on registry entries in seconds. Should be > heartbeat * 3 so a missed beat doesn't drop a live user. |
| heartbeat | 30000 | TTL refresh interval in ms. Each tick EXPIREs every locally-owned entry. |
| requestTimeoutMs | 5000 | Default timeout for request(...) calls. Overridable per call via options.timeoutMs. |
| breaker | - | Optional circuit breaker for Redis ops. |
| metrics | - | Optional Prometheus metrics registry. |
| Method | Description |
|---|---|
| lookup(userId) | Resolve a userId to its current entry ({instanceId, sessionId, ts}) or null. |
| request(target, event, data?, opts?) | Cluster-routed request/reply. Resolves with the reply. |
| send(target, topic, event, data?) | Cluster-routed platform.send counterpart. Fire-and-forget. See Targeted sends below. |
| sendCoalesced(target, message) | Cluster-routed coalesce-by-key send. Fire-and-forget. See Coalesced sends below. |
| sendTo(criteria, topic, event, data?) | Attribute-targeted broadcast across the cluster. Requires attributes option. See Attribute-targeted broadcast below. |
| size() | Count of users registered to THIS instance (local view, scrape-time). |
| instanceId | Stable id for this instance, also the name of its push channel. |
| hooks.open / hooks.close | Wire as ready-made WebSocket hooks. |
| destroy() | Stop the heartbeat timer and Redis subscriber. |
registry.send(target, topic, event, data) is the cluster-routed counterpart to the adapter's platform.send(ws, topic, event, data). Lookup resolves the owning instance, self-targeting short-circuits to a local platform.send, otherwise a fire-and-forget envelope {type:'send', sessionId, topic, event, data} ships on the owner's push channel.
registry.send('user-123', 'notifications', 'incoming', { id: 42 });
Fire-and-forget: no Promise<reply>; use request(...) instead when you need a reply. A user offline at lookup-time silently drops with push_sends_total{result="offline"}. Mid-flight migration (user disconnects between lookup and arrival) drops on the receiver with push_sends_total{result="late"}.
registry.sendCoalesced(target, { key, topic, event, data }) is the cluster-routed counterpart to the adapter's platform.sendCoalesced(ws, ...) - one slot per (connection, key) tuple, latest-value-wins. Fire-and-forget; no reply path, no Promise<reply>.
registry.sendCoalesced('user-123', {
key: 'cursor:doc-7',
topic: 'doc:7',
event: 'cursor',
data: { x: 410, y: 220 }
});
Routing follows the same shape as request(...): lookup the owning instance, self-target short-circuits to a local platform.sendCoalesced(ws, ...), otherwise a fire-and-forget envelope {type:'coalesced', sessionId, key, topic, event, data} ships on the owner's push channel and the receiver calls platform.sendCoalesced locally.
Per-(connection, key) replacement happens on the receiver via the adapter's existing coalesce semantics, so a duplicate or out-of-order envelope from a flaky link is collapsed on arrival rather than producing a stutter on the wire. Ordering is preserved within a (user, key) tuple as long as the user does not move instances mid-flight; instance migration triggers one transient out-of-order moment that the per-connection coalesce collapses on the new instance.
Best fit: targeted latest-value streams where the target is a user, not a topic. Cursor positions inside a doc, typing indicators between two users, presence-state pushes from a moderator to a single subscriber. Topic-broadcast coalesce (every subscriber sees the same stream) already works cluster-wide via bus.wrap(platform).publish(...) on either bus and per-receiver A1 logic; this method covers the remaining gap.
registry.sendTo(criteria, topic, event, data) is the cluster-routed counterpart to the adapter's platform.sendTo(filter, ...). Captures per-user attributes at registration time via the attributes(ws) option, indexes them in memory on every instance, and resolves a match into one envelope per owning instance. Keys are entirely user-defined - whatever you return from attributes(ws) is what sendTo can match against. The adapter's platform.sendTo examples use userData.role === 'admin'; the cluster version follows the same shape:
import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';
const registry = createConnectionRegistry(redis, {
identify: (ws) => ws.getUserData()?.userId,
attributes: (ws) => {
const ud = ws.getUserData();
return { role: ud.role, region: ud.region };
}
});
// Broadcast to every connection where attributes.role === 'admin':
registry.sendTo({ role: 'admin' }, 'alerts', 'warning', { message: 'High load' });
// Compound match (AND across keys):
registry.sendTo({ role: 'admin', region: 'eu' }, 'audit', 'created', payload);
platform.sendTo(filter, topic, event, data) accepts a filter function - functions don't serialize across instances, so the filter-function escape hatch is deliberately not lifted here. The cluster shape is shallow equality only: one literal value per attribute key, AND across keys. No regex, no array containment, no nested-object queries. Apps that need richer queries should publish to a dedicated topic and route their own broadcasts.
How it works:
1. Each instance keeps an in-memory index: Map<attrKey, Map<attrValue, Set<userId>>> plus a shadow userId -> attrs map and a userId -> instanceId map.
2. Index updates travel over the shared {prefix}__registry-events Redis pub/sub channel. Every successful hooks.open publishes {type:'open', userId, instanceId, attrs}; every successful close publishes {type:'close', userId, instanceId} (compare-and-delete already protects against a stale close from a previous owner).
3. The index bootstraps by scanning the {prefix}conns:* hashes and calling HGETALL on each to populate from the live registry state. Subscribe-first / SCAN-second keeps the bootstrap race-tolerant under set semantics.
4. sendTo(criteria, ...) resolves matching userIds via the local index, groups them by their owning instance, and fires once per owning instance on the existing {prefix}__push: channel: {type:'sendTo', criteria, topic, event, data}.
5. The receiving instance re-matches the criteria against its own connections and calls platform.send(ws, topic, event, data) for each match. Authoritative-on-receiver matching tolerates one round of sender-index staleness from a fast user migration.

sendTo is fire-and-forget. No reply, no acknowledgement, no per-target outcome. The single push_sendto_total counter records sender-side outcomes:
| result label | Meaning |
|---|---|
| ok | At least one match resolved; envelopes published successfully (or only self-matches that delivered locally without Redis traffic). |
| empty | No matches resolved by the local index. The call no-ops. |
| error | At least one Redis publish failed; partial delivery still occurred for the successful publishes and any local self-matches. |
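The index-and-match flow described under How it works can be sketched in memory. The snippet follows the documented Map<attrKey, Map<attrValue, Set<userId>>> shape, but `indexUser`, `matchUsers`, and `groupByOwner` are hypothetical names, removal on close is omitted, and treating empty criteria as "no match" is an assumption made here, not documented behavior.

```js
// Hypothetical in-memory index: attrKey -> attrValue -> Set of userIds.
const index = new Map();
const ownerOf = new Map(); // userId -> instanceId

function indexUser(userId, instanceId, attrs) {
  ownerOf.set(userId, instanceId);
  for (const [key, value] of Object.entries(attrs)) {
    if (!index.has(key)) index.set(key, new Map());
    const byValue = index.get(key);
    if (!byValue.has(value)) byValue.set(value, new Set());
    byValue.get(value).add(userId);
  }
}

// Shallow equality, AND across keys: intersect the per-key user sets.
function matchUsers(criteria) {
  let result = null;
  for (const [key, value] of Object.entries(criteria)) {
    const users = index.get(key)?.get(value) ?? new Set();
    result = result === null
      ? new Set(users)
      : new Set([...result].filter((userId) => users.has(userId)));
  }
  return result ?? new Set(); // assumption: empty criteria match nobody
}

// Group matches by owning instance: one envelope per instance.
function groupByOwner(userIds) {
  const groups = new Map();
  for (const userId of userIds) {
    const owner = ownerOf.get(userId);
    if (!groups.has(owner)) groups.set(owner, []);
    groups.get(owner).push(userId);
  }
  return groups;
}

indexUser('u1', 'inst-a', { role: 'admin', region: 'eu' });
indexUser('u2', 'inst-b', { role: 'admin', region: 'us' });
indexUser('u3', 'inst-a', { role: 'member', region: 'eu' });

console.log([...matchUsers({ role: 'admin' })]); // [ 'u1', 'u2' ]
console.log([...matchUsers({ role: 'admin', region: 'eu' })]); // [ 'u1' ]
console.log(groupByOwner(matchUsers({ role: 'admin' })).size); // 2
```

Two admins on two instances resolve to two envelopes, one per owner, regardless of how many users each instance holds.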
Eventual consistency caveats:
- The attrs field on the Redis hash stays authoritative for lookup(userId). The next call to ensureSubscriber() (e.g., on the next hooks.open) re-bootstraps via SCAN.
- platform.send/platform.publish reach the receiving connections; matched users with no active subscription on the targeted topic still receive the event since platform.send is per-connection, not per-topic.

For exact targeting (audit log, billing, transactional broadcasts), use request(...) per userId or fan out via topic subscribers with the bus.
| Metric | Description |
|---|---|
| `push_requests_total{result="ok | offline |
| push_reply_latency_ms | Histogram of request-publish to reply-receive in milliseconds (success path). |
| push_registry_size | Gauge: connections registered to this instance. Scrape-time, no continuous accounting. |
| push_late_replies_total | Counter: replies that arrived after their request expired or migrated. |
| `push_coalesced_total{result="ok | self |
| `push_sends_total{result="ok | self |
| `push_sendto_total{result="ok | empty |
Edge cases:

- Mid-flight migration: the origin's request times out and the owning instance's late reply is dropped (push_late_replies_total increments).
- The owning instance dies before it can answer through platform.request. Same shape as above - request times out. The Redis entry remains until the TTL expires (sliding heartbeat cleared by the dead instance), after which subsequent request(...) calls see result="offline" from the lookup.
- Self-targeting: a request for a user owned by the calling instance goes straight to the local platform.request(ws, ...) without round-tripping Redis. One conditional in the dispatcher; not a special case at the API surface.
- identify(ws) returning null / undefined makes the open / close hooks no-op. Anonymous users are not addressable through the registry by design.

Same API as the core createRateLimit plugin, but backed by Redis using an atomic Lua script. Rate limits are enforced across all server instances with exactly one Redis roundtrip per consume() call.
Authorization: rate limiting is anti-abuse, not authorization. Identity-based access checks still live in your handler. The two layers compose: gate auth first, then meter. See Authorization model.
// src/lib/server/ratelimit.js
import { redis } from './redis.js';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';
export const limiter = createRateLimit(redis, {
points: 10,
interval: 1000,
blockDuration: 30000
});
// src/hooks.ws.js
import { limiter } from '$lib/server/ratelimit';
export async function message(ws, { data, platform }) {
const { allowed } = await limiter.consume(ws);
if (!allowed) return; // drop the message
// ... handle message
}
| Option | Default | Description |
|---|---|---|
| points | required | Tokens available per interval |
| interval | required | Refill interval in ms |
| blockDuration | 0 | Auto-ban duration in ms (0 = no ban) |
| keyBy | 'ip' | 'ip', 'connection', or a function |
All methods are async (they hit Redis). The API otherwise matches the core plugin:
| Method | Description |
|---|---|
| consume(ws, cost?) | Attempt to consume tokens. cost must be a positive integer. |
| reset(key) | Clear the bucket for a key |
| ban(key, duration?) | Manually ban a key |
| unban(key) | Remove a ban |
| clear() | Reset all state |
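How points, interval, and blockDuration interact is easier to see in a single-process model. The sketch below is not the Redis Lua script: it assumes a simple full refill once per interval, takes the clock as an explicit `now` argument, and uses the hypothetical name `createBucketModel`. Only the `{ allowed }` result field is taken from the usage example above.

```js
// Hypothetical single-process model; `now` is injected so the run is deterministic.
function createBucketModel({ points, interval, blockDuration = 0 }) {
  const buckets = new Map(); // key -> { tokens, resetAt, bannedUntil }
  return {
    consume(key, now, cost = 1) {
      let bucket = buckets.get(key);
      if (!bucket || now >= bucket.resetAt) {
        // Assumed refill rule: a full bucket at the start of every interval.
        bucket = {
          tokens: points,
          resetAt: now + interval,
          bannedUntil: bucket ? bucket.bannedUntil : 0
        };
        buckets.set(key, bucket);
      }
      if (now < bucket.bannedUntil) return { allowed: false };
      if (bucket.tokens < cost) {
        if (blockDuration > 0) bucket.bannedUntil = now + blockDuration; // auto-ban
        return { allowed: false };
      }
      bucket.tokens -= cost;
      return { allowed: true };
    }
  };
}

const model = createBucketModel({ points: 2, interval: 1000, blockDuration: 5000 });
const results = [
  model.consume('1.2.3.4', 0).allowed, // first token
  model.consume('1.2.3.4', 10).allowed, // second token
  model.consume('1.2.3.4', 20).allowed, // bucket empty: ban starts, ends at 5020
  model.consume('1.2.3.4', 1500).allowed, // refilled, but still banned
  model.consume('1.2.3.4', 5100).allowed // ban over
];
console.log(results); // [ true, true, false, false, true ]
```

The shared-state version needs the same read-check-write sequence to be atomic across instances, which is why the extension runs it as one Lua script per consume() call.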
Same API as the core createGroup plugin, but membership is stored in Redis so groups work across multiple server instances. Local WebSocket tracking is maintained per-instance, and cross-instance events are relayed via Redis pub/sub.
Authorization: the onJoin hook is the join-decision site; the plugin itself does not authorize. Returning a role admits the socket; throwing rejects. See Authorization model.
// src/lib/server/lobby.js
import { redis } from './redis.js';
import { createGroup } from 'svelte-adapter-uws-extensions/redis/groups';
export const lobby = createGroup(redis, 'lobby', {
maxMembers: 50,
meta: { game: 'chess' }
});
Note: the API signature is createGroup(client, name, options) instead of createGroup(name, options) - the Redis client is the first argument.
// src/hooks.ws.js
import { lobby } from '$lib/server/lobby';
export async function subscribe(ws, topic, { platform }) {
if (topic === 'lobby') await lobby.join(ws, platform);
}
export async function close(ws, { platform }) {
await lobby.leave(ws, platform);
}
| Option | Default | Description |
|---|---|---|
| maxMembers | Infinity | Maximum members allowed (enforced atomically) |
| meta | {} | Initial group metadata |
| memberTtl | 120 | Member entry TTL in seconds. Entries from crashed instances expire after this period. |
| onJoin | - | Called after a member joins |
| onLeave | - | Called after a member leaves |
| onFull | - | Called when a join is rejected (full) |
| onClose | - | Called when the group is closed |
| Method | Description |
|---|---|
| join(ws, platform, role?) | Add a member (returns false if full/closed) |
| leave(ws, platform) | Remove a member |
| publish(platform, event, data, role?) | Broadcast to all or filter by role |
| send(platform, ws, event, data) | Send to a single member |
| localMembers() | Members on this instance |
| count() | Total members across all instances |
| has(ws) | Check membership on this instance |
| getMeta() / setMeta(meta) | Read/write group metadata |
| close(platform) | Dissolve the group |
| destroy() | Stop the Redis subscriber |
Same API as the core createCursor plugin, but cursor positions are shared across instances via Redis. Each instance throttles locally (same leading/trailing edge logic as the core), then relays broadcasts through Redis pub/sub so subscribers on other instances see cursor updates.
Out of the box the broadcast path is a 60Hz world-state tick: each topic emits at most one frame per topicThrottle window, carrying the latest position for every cursor that moved. Bandwidth per peer scales with active-mover count, not with mover-count times per-mover rate - 100 cursors moving at 60Hz cost the same per peer as 100 cursors moving at 1Hz, because every peer receives one bulk frame per tick regardless. For high-density rooms (>200 active movers) where the bulk-frame size becomes the bottleneck, lower the tick by raising topicThrottle to 33 (30Hz).
Hash entries have a TTL so stale cursors from crashed instances get cleaned up automatically.
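The world-state tick can be illustrated with a small batcher that keeps only the latest position per cursor and emits one bulk frame per topic. This is a sketch under assumptions: `createCursorTick` and the frame shape are hypothetical, and `tick()` is called by hand where a timer firing every topicThrottle ms would normally drive it.

```js
// Hypothetical per-topic batcher: latest position per cursor, one frame per tick.
function createCursorTick(publish) {
  const dirty = new Map(); // topic -> Map<cursorId, position>
  return {
    update(topic, cursorId, position) {
      if (!dirty.has(topic)) dirty.set(topic, new Map());
      dirty.get(topic).set(cursorId, position); // overwrite: latest position wins
    },
    tick() {
      for (const [topic, moved] of dirty) {
        publish(topic, Object.fromEntries(moved)); // one bulk frame per topic
      }
      dirty.clear();
    }
  };
}

const frames = [];
const cursorTick = createCursorTick((topic, positions) => frames.push({ topic, positions }));
cursorTick.update('board:1', 'alice', { x: 1, y: 1 });
cursorTick.update('board:1', 'alice', { x: 5, y: 9 }); // replaces the earlier sample
cursorTick.update('board:1', 'bob', { x: 3, y: 4 });
cursorTick.tick();
cursorTick.tick(); // nothing moved since the last tick: no frame
console.log(frames.length, frames[0].positions);
// 1 { alice: { x: 5, y: 9 }, bob: { x: 3, y: 4 } }
```

Three updates collapse into one frame, so the frame count per peer depends on the tick rate rather than on how often each cursor reports.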
Authorization: cursors broadcast whatever the caller publishes to whoever is subscribed. Gate topic-subscribe and topic-publish in your handler. See Authorization model.
// src/lib/server/cursors.js
import { redis } from './redis.js';
import { createCursor } from 'svelte-adapter-uws-extensions/redis/cursor';
export const cursors = createCursor(redis, {
select: (userData) => ({ id: userData.id, name: userData.name, color: userData.color }),
ttl: 30
});
Cursor publishes go to the internal __cursor:{topic} channel. Clients receive those frames only if the extension has subscribed them server-side - the adapter's wire-level __-prefix gate intentionally denies client-sent __cursor: subscribe frames. Call cursors.attach(ws, topic, platform) from your "join room" RPC (mirroring presence.join); without it, every update fans out to an empty subscriber set.
// src/lib/server/rpc.js (or wherever you handle joinBoard / leaveBoard)
import { cursors } from '$lib/server/cursors';
export async function joinBoard(ws, { topic, platform }) {
await cursors.attach(ws, topic, platform); // subscribes ws + sends snapshot
}
export function leaveBoard(ws, { topic, platform }) {
cursors.detach(ws, topic, platform); // unsubscribes (only needed if the user stays connected)
}
// src/hooks.ws.js
import { cursors } from '$lib/server/cursors';
export function message(ws, { data, platform }) {
const msg = JSON.parse(Buffer.from(data).toString());
if (msg.type === 'cursor') {
cursors.update(ws, msg.topic, msg.position, platform);
}
}
export function close(ws, { platform }) {
cursors.remove(ws, platform);
// No per-topic detach needed - uWS releases all subscriptions on disconnect.
}
| Option | Default | Description |
|---|---|---|
| throttle | 16 | Minimum ms between broadcasts per user per topic. 60Hz default matches the world-state tick so an individual cursor stays smooth at the per-peer wire rate. |
| topicThrottle | 16 | World-state tick rate in ms. Each topic emits at most one bulk frame per window, carrying the latest position for every cursor that moved. Raise to 33 (30Hz) for high-density rooms; 0 disables the tick. |
| select | strips __-prefixed keys | Extract user data to broadcast alongside position |
| ttl | 30 | Per-entry TTL in seconds (auto-refreshed on each broadcast). Stale entries from crashed instances are filtered out individually, even if other instances are still active on the same topic. |
| maxEnvelopeBytes | 1048576 (1 MB) | Reject inbound cursor envelopes larger than this before JSON.parse runs. The inner topic is always validated against the __ denylist (the module constructs its own __cursor: wrapper prefix), so no allowSystemTopics knob is needed here. |
| Method | Description |
|---|---|
| attach(ws, topic, platform) | Opt the connection into receiving cursor updates for topic (subscribes to __cursor:{topic} server-side and sends a snapshot). Required for any client to see cursor frames. |
| detach(ws, topic, platform) | Stop the connection from receiving updates for topic. Only needed for explicit leave; uWS handles disconnect cleanup. |
| update(ws, topic, data, platform) | Broadcast cursor position (throttled per user per topic) |
| remove(ws, platform, topic?) | Remove from a specific topic, or all topics if omitted |
| list(topic) | Get current positions across all instances |
| clear() | Reset all local and Redis state |
| destroy() | Stop the Redis subscriber and clear timers |
Postgres extensions
Same API as the Redis replay buffer, but backed by a Postgres table. Best suited for durable audit trails or history that needs to survive longer than Redis TTLs. Sequence numbers are generated atomically via a dedicated _seq table using INSERT ... ON CONFLICT DO UPDATE, so concurrent publishes from multiple instances produce strictly ordered sequences with no duplicates or gaps.
Buffer trimming runs after each publish by deleting rows with seq <= currentSeq - maxSize. If the trim query fails, the publish still succeeds - the periodic background cleanup (configurable via cleanupInterval) catches any excess rows later.
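As a worked example of the trim rule, the arithmetic below models a topic that has published 1500 messages with size: 1000. `trimCutoff` is a hypothetical helper that only restates the seq <= currentSeq - maxSize condition; the array stands in for the table rows.

```js
// Rows with seq <= cutoff are eligible for deletion after a publish.
function trimCutoff(currentSeq, maxSize) {
  return currentSeq - maxSize;
}

// Model a topic holding seq 1..1500 with a buffer size of 1000.
const rows = Array.from({ length: 1500 }, (_, i) => i + 1);
const cutoff = trimCutoff(1500, 1000);
const kept = rows.filter((seq) => seq > cutoff);
console.log(cutoff, kept.length, kept[0], kept[kept.length - 1]); // 500 1000 501 1500
```

Because sequences are gap-free per topic, keeping everything above the cutoff leaves exactly the newest 1000 rows.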
Same gap detection behavior as the Redis replay buffer: if the client's last-seen sequence is older than the oldest buffered row, or the buffer is empty but the sequence counter has advanced, a truncated event fires before replay. The standalone gap(topic, lastSeenSeq) probe is also available with the same { truncated, missingFrom } shape; the gap query uses the (topic, seq) index for an O(log n) seek rather than scanning the buffer.
Authorization: same as the Redis replay buffer - history goes to whoever subscribes to __replay:{topic}. Gate topic-subscribe in your handler. See Authorization model.
The aggregate-vs-broadcast guidance from the Redis replay section applies equally here - one topic per aggregate keeps the buffer size budget meaningful and gap detection actionable.
resumeHook() is available with identical semantics to the Redis backend; see Session resumption.
// src/lib/server/replay.js
import { pg } from './pg.js';
import { createReplay } from 'svelte-adapter-uws-extensions/postgres/replay';
export const replay = createReplay(pg, {
table: 'svti_replay',
size: 1000,
ttl: 86400, // 24 hours
autoMigrate: true // auto-create table
});
The table is created automatically on first use (if autoMigrate is true):
CREATE TABLE IF NOT EXISTS svti_replay (
svti_replay_id BIGSERIAL PRIMARY KEY,
topic TEXT NOT NULL,
seq BIGINT NOT NULL,
event TEXT NOT NULL,
data JSONB,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_svti_replay_topic_seq ON svti_replay (topic, seq);
CREATE TABLE IF NOT EXISTS svti_replay_seq (
topic TEXT PRIMARY KEY,
seq BIGINT NOT NULL DEFAULT 0
);
| Option | Default | Description |
|---|---|---|
| table | 'svti_replay' | Table name |
| size | 1000 | Max messages per topic |
| ttl | 0 | Row expiry in seconds (0 = never) |
| autoMigrate | true | Auto-create table |
| cleanupInterval | 60000 | Periodic cleanup interval in ms (0 to disable) |
Same as Replay buffer (Redis), plus:
| Method | Description |
|---|---|
| destroy() | Stop the cleanup timer |
Listens on a Postgres channel for notifications and forwards them to platform.publish(). You provide the trigger on your table - this module handles the listening side.
Uses a standalone connection (not from the pool) since LISTEN requires a persistent connection that stays open for the lifetime of the bridge.
Authorization: the bridge forwards every NOTIFY payload from Postgres straight to platform.publish(). The trigger that emits the NOTIFY is the trust boundary - if your trigger fires on every row mutation regardless of tenant, the published topic must encode the tenant so subscribers see only their own data, AND your subscribe gate must enforce that tenants only subscribe to their own topics. Treating a NOTIFY payload as "already authenticated because it came from the DB" is incorrect; the DB does not know who is subscribing. See Authorization model.
// src/lib/server/notify.js
import { pg } from './pg.js';
import { createNotifyBridge } from 'svelte-adapter-uws-extensions/postgres/notify';
export const bridge = createNotifyBridge(pg, {
channel: 'table_changes',
parse: (payload) => {
const row = JSON.parse(payload);
return { topic: row.table, event: row.op, data: row.data };
}
});
// src/hooks.ws.js
import { bridge } from '$lib/server/notify';
let activated = false;
export function open(ws, { platform }) {
if (!activated) {
activated = true;
bridge.activate(platform);
}
}
Create a trigger function and attach it to your table:
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('table_changes', json_build_object(
'table', TG_TABLE_NAME,
'op', lower(TG_OP),
'data', CASE TG_OP
WHEN 'DELETE' THEN row_to_json(OLD)
ELSE row_to_json(NEW)
END
)::text);
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER messages_notify
AFTER INSERT OR UPDATE OR DELETE ON messages
FOR EACH ROW EXECUTE FUNCTION notify_table_change();
Now any INSERT, UPDATE, or DELETE on the messages table will fire a notification. The bridge parses it and calls platform.publish(), which reaches all connected WebSocket clients subscribed to the topic.
The client side needs no changes - the core crud('messages') store already handles created, updated, and deleted events.
| Option | Default | Description |
|---|---|---|
| channel | required | Postgres LISTEN channel name |
| parse | JSON with { topic, event, data } | Parse notification payload into a publish call. Return null to skip. |
| autoReconnect | true | Reconnect on connection loss |
| reconnectInterval | 3000 | ms between reconnect attempts |
| multiListener | 'all' | 'all': every replica opens its own LISTEN (current default). 'advisory': leader-elected via pg_try_advisory_lock. See Single-listener mode. |
| lockId | - | Advisory lock id. Required when multiListener: 'advisory'. |
| pollInterval | 5000 | ms between leader-election polls (advisory mode only). |
| maxEnvelopeBytes | 1048576 (1 MB) | Reject inbound NOTIFY payloads larger than this before JSON.parse runs. Defends against a buggy or hostile trigger flooding the bridge with oversized payloads. |
| allowSystemTopics | false | When false (default), parsed envelopes addressed to __-prefixed topics are dropped before the publish call. Closes the NOTIFY-injection class where a foreign publisher (or hostile DBA) could inject forged __signal:* / __rpc / plugin-internal frames via pg_notify. Apps that legitimately bridge user-defined __-prefixed topics (rare) can opt back in with true. |
By default each replica in an N-replica deployment opens its own LISTEN connection. That's N persistent Postgres connections doing the same work, plus N copies of the same notification.
multiListener: 'advisory' elects a single leader via Postgres advisory locks. One replica wins pg_try_advisory_lock(lockId) and holds the LISTEN connection; others poll for the lock every pollInterval ms. If the leader's connection drops, the session-scoped lock auto-releases and another replica picks it up on its next poll.
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';
import { createNotifyBridge } from 'svelte-adapter-uws-extensions/postgres/notify';
const bus = createPubSubBus(redis);
const bridge = createNotifyBridge(pg, {
channel: 'table_changes',
multiListener: 'advisory',
lockId: 0x6e6f7466 // any stable 32-bit id; e.g. CRC32 of the channel name
});
export function open(ws, { platform }) {
bus.activate(platform);
bridge.activate(bus.wrap(platform));
}
Requires a cross-instance pub/sub bus. In 'all' mode the bridge passes relay: false because every replica's local LISTEN already delivers the notification. In 'advisory' mode only the leader has LISTEN active, so the leader publishes with relay - the bus fans out to non-leader replicas via Redis. Without a bus the leader's local clients receive notifications but other replicas' clients don't.
Choosing a lockId. Pick a stable 32-bit signed integer that's unique per channel within your deployment. CRC32 of the channel name is a reasonable hash; multiple channels in the same database need distinct ids.
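One way to derive such an id is to hash the channel name and fold the result into the signed 32-bit range. The sketch below implements CRC32 inline so it has no dependencies; `crc32` and `lockIdFor` are hypothetical helper names, not exports of this package.

```js
// Bitwise CRC32 (reflected polynomial 0xEDB88320), returned as an unsigned 32-bit number.
function crc32(text) {
  let crc = 0xffffffff;
  for (const byte of new TextEncoder().encode(text)) {
    crc ^= byte;
    for (let bit = 0; bit < 8; bit++) {
      crc = (crc >>> 1) ^ (0xedb88320 & -(crc & 1));
    }
  }
  return (crc ^ 0xffffffff) >>> 0;
}

// Hypothetical helper: fold the hash into the signed 32-bit range.
function lockIdFor(channel) {
  return crc32(channel) | 0;
}

console.log(crc32('123456789').toString(16)); // cbf43926 (the standard CRC32 check value)
const lockId = lockIdFor('table_changes');
console.log(Number.isInteger(lockId) && lockId >= -(2 ** 31) && lockId < 2 ** 31); // true
```

The id is stable across deploys because it depends only on the channel name. Two channels can still collide in principle, so it is worth confirming that the ids used within one database are distinct.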
| Method | Description |
|---|---|
| activate(platform) | Start listening (idempotent) |
| deactivate() | Stop listening and release the connection |
Postgres caps NOTIFY payloads at just under 8000 bytes in its default configuration (pg_notify silently truncates or errors above this). This is a Postgres constraint, not a library limitation. The bridge warns at 7500 bytes. For large rows, send the row ID in the notification and let the client fetch the full row via an API call.

If your real-time events are driven by database writes and you do not need Redis for other extensions (presence, rate limiting, groups, cursors), the LISTEN/NOTIFY bridge is a simpler deployment: no Redis infrastructure, no separate pub/sub channel management, and your notifications are inherently tied to committed transactions. Use the Redis pub/sub bus when you need to broadcast events that do not originate from database writes, or when you are already running Redis for other extensions.
createJobQueue (svelte-adapter-uws-extensions/postgres/jobs) is a minimal SELECT ... FOR UPDATE SKIP LOCKED queue that works on vanilla Postgres 9.5+ - no extensions required.
The shape: enqueue jobs into a named queue, claim batches atomically, mark complete (delete) or fail (release for retry). Visibility timeout means a worker that crashes mid-processing has its claim auto-expire so another worker can pick the job up. Max-attempts and dead-letter behavior are intentionally NOT baked in - the attempts counter is exposed on every claim, callers track it and decide when to give up.
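To make the lifecycle concrete, here is a small in-memory model of the claim / complete / fail shape with a visibility timeout. It is an illustration only, not the library's implementation: the clock is passed in explicitly, and it assumes `attempts` is incremented on every claim.

```javascript
// In-memory model of the queue lifecycle (illustration only).
// Assumption: `attempts` increments each time a job is claimed.
function createMemoryQueue(visibilityTimeoutMs = 30000) {
  const rows = [];
  let nextId = 1;
  return {
    enqueue(payload) {
      const row = { id: nextId++, payload, attempts: 0, claimedUntil: null };
      rows.push(row);
      return row.id;
    },
    // A job is claimable when unclaimed or when its claim has expired.
    claim(now, batchSize = 1) {
      const claimable = rows
        .filter((r) => r.claimedUntil === null || r.claimedUntil <= now)
        .slice(0, batchSize);
      for (const r of claimable) {
        r.attempts += 1;
        r.claimedUntil = now + visibilityTimeoutMs;
      }
      return claimable.map((r) => ({ id: r.id, payload: r.payload, attempts: r.attempts }));
    },
    // Success deletes the row.
    complete(id) {
      const i = rows.findIndex((r) => r.id === id);
      if (i !== -1) rows.splice(i, 1);
    },
    // Failure releases the claim so another worker can retry immediately.
    fail(id) {
      const row = rows.find((r) => r.id === id);
      if (row) row.claimedUntil = null;
    }
  };
}
```

A worker that crashes never calls `complete` or `fail`, so the job simply becomes claimable again once `claimedUntil` passes, with a higher `attempts` value the next caller can inspect.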
Authorization: the queue does not check who is allowed to enqueue. If your handler calls queue.enqueue('emails', payload) with a wire-supplied payload, an attacker can enqueue arbitrary background work for your workers to execute. Validate every enqueue at the handler boundary; treat the claimed payload server-side as untrusted input. See Authorization model.
// src/lib/server/jobs.js
import { pg } from './pg.js';
import { createJobQueue } from 'svelte-adapter-uws-extensions/postgres/jobs';
export const jobs = createJobQueue(pg, {
visibilityTimeout: 60000 // claims are held for 60s by default here (library default is 30000); per-call override on claim()
});
// In a request handler:
const id = await jobs.enqueue(
'email',
{ to: '[email protected]', subject: 'Welcome' },
{ platform } // captures platform.requestId onto the row, surfaced as job.requestId on claim
);
The third argument is an options bag; platform (the SvelteKit event.platform) auto-captures the originating request id, or pass requestId explicitly to override. The captured id surfaces on job.requestId when the row is claimed, so the worker can correlate logs back to the request that enqueued the job.
enqueue() returns the row id verbatim from pg, which serialises BIGINT/BIGSERIAL columns as strings by default to avoid precision loss past Number.MAX_SAFE_INTEGER. Pass it through to claim()/complete()/fail()/extend() as-is. If you want a JS number for logging or comparison, coerce explicitly with Number(id) (safe up to 2^53 - still ~9 quadrillion rows of headroom).
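A short illustration of why the id stays a string: past 2^53, distinct ids can collapse to the same JS number. The two id values below are made up for the demonstration.

```javascript
// Two distinct BIGINT ids as pg would return them (strings).
const idA = '9007199254740993'; // 2^53 + 1
const idB = '9007199254740992'; // 2^53

// Coercing to Number loses precision: both become 2^53.
const collideAsNumbers = Number(idA) === Number(idB); // true

// Comparing the raw strings, or BigInt values, stays exact.
const equalAsStrings = idA === idB;              // false
const aIsLarger = BigInt(idA) > BigInt(idB);     // true
```

For equality checks the raw strings are enough; reach for `BigInt` only when you need ordering.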
// In a separate worker process or background loop:
async function workerLoop() {
while (running) {
const batch = await jobs.claim('email', { batchSize: 5, visibilityTimeoutMs: 30000 });
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
for (const job of batch) {
try {
await sendEmail(job.payload);
await jobs.complete(job.id);
} catch (err) {
if (job.attempts >= 5) {
await logToDeadLetter(job, err); // record first, so a logging failure cannot lose the job
await jobs.complete(job.id); // then give up after 5 tries
} else {
await jobs.fail(job.id); // release for retry
}
}
}
}
}
For long-running jobs that need more visibility headroom, call jobs.extend(id, additionalMs) periodically while processing.
The table is created automatically on first use (if autoMigrate is true):
CREATE TABLE IF NOT EXISTS svti_jobs (
svti_jobs_id BIGSERIAL PRIMARY KEY,
queue TEXT NOT NULL,
payload JSONB,
request_id TEXT, -- originating platform.requestId, or null
claimed_at TIMESTAMPTZ,
claimed_until TIMESTAMPTZ,
attempts INTEGER NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_svti_jobs_queue_pending
ON svti_jobs (queue, svti_jobs_id) WHERE claimed_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_svti_jobs_visibility
ON svti_jobs (claimed_until) WHERE claimed_at IS NOT NULL;
Existing 0.5.0-next.1 deployments forward-migrate via ALTER TABLE ... ADD COLUMN IF NOT EXISTS request_id TEXT on first use; idempotent and zero-downtime.
| Option | Default | Description |
|---|---|---|
| table | 'svti_jobs' | Table name |
| autoMigrate | true | Auto-create the table on first use |
| visibilityTimeout | 30000 | Default ms a claim is held before another worker can re-claim |
| Method | Description |
|---|---|
| enqueue(queue, payload, opts?) | Insert a job; returns the job id. Opts: { requestId?, platform? } - platform.requestId is captured automatically when platform is passed |
| claim(queue, opts?) | SELECT ... FOR UPDATE SKIP LOCKED claim; opts: { batchSize?, visibilityTimeoutMs? }. Each returned job carries id, queue, payload, requestId, attempts, created_at |
| complete(idOrIds) | Delete the job(s) on success |
| fail(idOrIds) | Release the claim for retry |
| extend(idOrIds, ms) | Push back the visibility deadline |
| pending(queue?) | Count of unclaimed jobs |
| clear(queue?) | Delete all jobs (useful for tests) |
| | createJobQueue | createTaskRunner |
|---|---|---|
| Surface | minimal: claim / complete / fail | full state machine: idempotency + fence + retry + result tracking |
| Best fit | "ingest event, defer work" with caller-driven retry | tasks that must complete exactly once with cross-instance recovery |
| Result tracking | none (caller tracks via DB writes) | yes (run / await) |
If your handler must run exactly once and you want the runtime to track the result, reach for createTaskRunner. If you want a lighter producer/consumer split with caller-driven retry, this is the simpler shape.
Cross-backend
Caches the result of an effectful operation under a stable key so retries within ttl return the original outcome rather than re-executing. Use it for HTTP/RPC retries, webhook redeliveries, and any handler where the caller may legitimately repeat a request that must execute at most once - charge-customer, send-email, create-order.
The store exposes three states via acquire(key):
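- acquired: this caller owns the slot. Run the operation, then call commit(result) on success or abort() on failure.
- pending: another caller currently owns the slot and has not committed or aborted yet.
- result: an earlier call already committed; the cached result is returned instead of re-executing.
A short acquireTtl (default 60 seconds) bounds how long a pending slot can hold the key, so a crashed owner cannot deadlock retries forever. On commit the longer ttl (default 48 hours) replaces the sentinel and governs the cache lifetime.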
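The three states and the two TTLs can be modelled in memory as sketched below. This is a simplified, synchronous illustration with an explicit clock in seconds, not the Redis or Postgres implementation; the ownership guard on commit/abort is an assumption of the model.

```javascript
// In-memory model of acquire(key) (illustration only; the real store is async).
function createMemoryIdempotency({ ttl = 172800, acquireTtl = 60 } = {}) {
  const slots = new Map(); // key -> { status, result, expiresAt }
  return {
    acquire(key, nowSec) {
      const slot = slots.get(key);
      if (slot && slot.expiresAt > nowSec) {
        // Live slot: either still in flight, or already committed.
        return slot.status === 'pending'
          ? { acquired: false, pending: true }
          : { acquired: false, result: slot.result };
      }
      // Free or expired: take the slot with the short pending lifetime.
      const mine = { status: 'pending', result: undefined, expiresAt: nowSec + acquireTtl };
      slots.set(key, mine);
      return {
        acquired: true,
        commit(result, atSec = nowSec) {
          // Model assumption: a stale owner must not overwrite a newer slot.
          if (slots.get(key) !== mine) return;
          slots.set(key, { status: 'committed', result, expiresAt: atSec + ttl });
        },
        abort() {
          if (slots.get(key) === mine) slots.delete(key);
        }
      };
    }
  };
}
```

A caller that acquires and then crashes leaves a pending slot behind, and the model hands the key to the next caller once `acquireTtl` has elapsed.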
Two backends share the same contract: pick whichever your stack already runs. The adapter's in-memory Dedup plugin is the zero-config fallback for single-instance deployments.
Authorization: the idempotency key is whatever the caller passes. If your handler builds the key from a wire field without prefixing the trusted userId (e.g. key: payload.clientOrderId), one user can collide with another's slot and block their next legitimate request - or read their cached result if acquire returns the prior commit. Always derive the key from a trusted identity prefix - e.g. `order:${userId}:${payload.clientOrderId}`. See Authorization model.
// src/lib/server/idempotency.js
import { redis } from './redis.js';
import { createIdempotencyStore } from 'svelte-adapter-uws-extensions/redis/idempotency';
export const idempotency = createIdempotencyStore(redis, {
keyPrefix: 'idem:',
ttl: 48 * 3600, // result cache lifetime (48h)
acquireTtl: 60 // pending-slot lifetime (60s)
});
Backed by a single Redis string per key. The acquire path is one Lua-script round trip.
// src/lib/server/idempotency.js
import { pg } from './pg.js';
import { createIdempotencyStore } from 'svelte-adapter-uws-extensions/postgres/idempotency';
export const idempotency = createIdempotencyStore(pg, {
table: 'svti_idempotency',
ttl: 48 * 3600,
acquireTtl: 60,
autoMigrate: true
});
The Postgres backend periodically deletes expired rows (configurable via cleanupInterval, default 60s, 0 to disable). Stale pending rows clear on the next sweep without manual intervention.
The Postgres table is created automatically on first use:
CREATE TABLE IF NOT EXISTS svti_idempotency (
svti_idempotency_key TEXT PRIMARY KEY,
status TEXT NOT NULL,
result JSONB,
expires_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_svti_idempotency_expires_at ON svti_idempotency (expires_at);
// Wrap an effectful handler. The caller passes a stable key per logical
// operation; identical retries return the cached result.
export async function placeOrder(input, ctx) {
const idempotencyKey = `order:${ctx.user.id}:${input.clientOrderId}`;
const slot = await idempotency.acquire(idempotencyKey);
if (slot.acquired) {
try {
const order = await db.createOrder(input);
await slot.commit(order);
return order;
} catch (err) {
await slot.abort();
throw err;
}
}
if (slot.pending) {
throw new Error('duplicate request in flight');
}
return slot.result;
}
| Option | Default | Description |
|---|---|---|
| keyPrefix (Redis) | 'idem:' | Prepended to every Redis key after the client keyPrefix |
| table (Postgres) | 'svti_idempotency' | Table name |
| ttl | 172800 (48h) | Result cache lifetime in seconds |
| acquireTtl | 60 | Pending-slot lifetime in seconds (anti-deadlock) |
| autoMigrate (Postgres) | true | Auto-create the table on first use |
| cleanupInterval (Postgres) | 60000 | Periodic expired-row cleanup interval in ms (0 to disable) |
| breaker | - | Circuit breaker; bypassed when broken |
| metrics | - | Prometheus registry; emits idempotency_*_total counters |
| Method | Description |
|---|---|
| acquire(key) | Returns {acquired, commit, abort} or {acquired: false, pending: true} or {acquired: false, result} |
| purge(key) | Drop a single cached result |
| clear() | Drop every key under the configured prefix / table |
| destroy() (Postgres only) | Stop the cleanup timer |
acquireTtl is the upper bound on how long a single execution of the wrapped operation can run before retries see the slot as available again. Set it longer than your worst expected latency for the wrapped handler, but short enough that a crashed instance does not block retries for too long. The default (60 seconds) suits most HTTP and RPC handlers; bump it for long-running tasks (large file uploads, multi-step workflows) and trim it for tight read-heavy paths.
The adapter's in-memory createDedup plugin (svelte-adapter-uws/plugins/dedup) is the single-instance fallback for the same contract. The shape is identical, so swapping the backend is a one-line change. Use the in-memory plugin for tests and single-process deployments; reach for this store the moment a second instance enters the picture.
Cluster-wide mutual-exclusion primitive. The adapter ships an in-memory Lock plugin that serializes withLock(key, fn) per key on a single instance via a Map<string, Promise>; this is the Redis-backed swap for multi-instance deployments. Distinct from redis/fence (B2c): the fence module is task-runner-specific (one fence per taskId, paired with the Postgres state machine); this is a general-purpose mutex any user code can grab.
Authorization: withLock(key, fn) serializes whoever calls it under that key; it does not check whether the caller owns the resource the key represents. A wire-supplied lock key lets an attacker grab a lock on a resource they don't own and stall any legitimate owner. Derive lock keys from a trusted prefix - e.g. `account:${assertedAccountId(ctx, payload.accountId)}`. See Authorization model.
import { redis } from './redis.js';
import { createDistributedLock } from 'svelte-adapter-uws-extensions/redis/lock';
export const lock = createDistributedLock(redis, {
defaultTtlMs: 30_000,
maxWaitMs: 5000
});
import { lock } from '$lib/server/lock';
await lock.withLock('order-42', async (signal) => {
// serialized cluster-wide. Bail when `signal.aborted` if the heartbeat
// detects we lost ownership mid-flight.
if (signal.aborted) return;
await processOrder(42);
});
withLock(key, fn, options?) runs fn while holding a cluster-wide mutex on key. The user fn cannot forget to release - the lock module owns the release path. fn's return value is forwarded through; errors thrown by fn propagate after the lock is released.
1. Acquire: SET <fullKey> <fenceToken> NX PX <ttlMs> - atomic test-and-set with a TTL. The fence token is a per-call random UUID so a stale heartbeat or release from a previous holder can never affect a new holder.
2. Retry: if the key is already held, sleep retryDelayMs and try again. After maxWaitMs total wait, throw LockAcquireTimeoutError.
3. Heartbeat: while fn is running, a heartbeat tick every heartbeatMs (defaults to defaultTtlMs / 3) refreshes the TTL via Lua-atomic if get == fenceToken then pexpire end. If the heartbeat returns 0 (we no longer own the key - operator force-takeover, TTL elapsed before we could refresh), the supplied AbortSignal fires with a LockLostError and the heartbeat stops.
4. Release: on fn's completion (success or error), Lua-atomic if get == fenceToken then del end releases the key. Skipped if we already lost ownership (no-op via the compare guard regardless).
The AbortSignal shape is the cluster-correctness story: when the heartbeat detects loss, your code learns immediately and can bail instead of continuing to mutate state another holder now thinks it owns. Listen for the abort event or check signal.aborted at long-running checkpoints.
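The role of the fence token in the heartbeat and release steps can be illustrated with a fake in-memory store. This is a model of the semantics only: `createFakeRedis` and its method names are hypothetical stand-ins for SET NX PX and the two compare-guarded Lua scripts, and time is passed in explicitly.

```javascript
// Fake store mimicking the three operations described above (illustration only).
function createFakeRedis() {
  const store = new Map(); // key -> { value, expiresAt }
  const live = (key, now) => {
    const entry = store.get(key);
    if (entry && entry.expiresAt <= now) {
      store.delete(key); // emulate server-side TTL expiry
      return undefined;
    }
    return entry;
  };
  return {
    // SET key value NX PX ttl
    setNxPx(key, value, ttlMs, now) {
      if (live(key, now)) return false;
      store.set(key, { value, expiresAt: now + ttlMs });
      return true;
    },
    // if get == value then pexpire end
    compareAndPexpire(key, value, ttlMs, now) {
      const entry = live(key, now);
      if (!entry || entry.value !== value) return 0;
      entry.expiresAt = now + ttlMs;
      return 1;
    },
    // if get == value then del end
    compareAndDel(key, value, now) {
      const entry = live(key, now);
      if (!entry || entry.value !== value) return 0;
      store.delete(key);
      return 1;
    }
  };
}

// Holder A stalls past its TTL; holder B takes over; A's late calls are no-ops.
const fake = createFakeRedis();
const aAcquired = fake.setNxPx('lock:order-42', 'token-A', 30000, 0);
const bAcquired = fake.setNxPx('lock:order-42', 'token-B', 30000, 30000);
const aHeartbeat = fake.compareAndPexpire('lock:order-42', 'token-A', 30000, 30001);
const aRelease = fake.compareAndDel('lock:order-42', 'token-A', 30001);
const bRelease = fake.compareAndDel('lock:order-42', 'token-B', 30002);
```

In this scenario A's heartbeat returning 0 is the moment the real module would abort the signal with a LockLostError, and A's release leaves B's lock untouched.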
| Option | Default | Description |
|---|---|---|
| keyPrefix | 'lock:' | Prefix prepended (after the client keyPrefix) to every lock key. |
| defaultTtlMs | 30000 | TTL on a held lock in milliseconds. The heartbeat refreshes this back to the original value before it elapses; if the holder dies and stops heartbeating, the lock auto-expires after at most ttlMs so cluster work is not permanently blocked. |
| retryDelayMs | 50 | Sleep between acquire retries when the key is held by another instance. Constant retry; no exponential backoff. |
| maxWaitMs | 5000 | Total time to wait before throwing LockAcquireTimeoutError. Override per call via withLock(key, fn, { maxWaitMs }). |
| heartbeatMs | defaultTtlMs / 3 | Heartbeat refresh interval. Default keeps margin for one missed beat. |
| mapKey | identity | Map lock key names to bounded label values for metric cardinality. |
| breaker | - | Optional circuit breaker for the Redis ops. |
| metrics | - | Optional Prometheus metrics registry. |
Per-call options for withLock(key, fn, options?):
| Option | Description |
|---|---|
| ttlMs | Override the holder TTL for this call. |
| maxWaitMs | Override the maximum acquire wait for this call. |
| signal | External cancellation signal. Aborts the acquire loop and the inner-fn execution; the lock is still released cleanly on the way out (we held it; we give it back). |
- LockAcquireTimeoutError - thrown by withLock when maxWaitMs elapses without a successful acquire. Properties: key, waitedMs.
- LockLostError - surfaced via signal.reason (and controller.abort(...) on the user fn's signal) when the heartbeat detects we no longer own the key. The user fn keeps running through this; it's up to your code to react. Property: key.
| Metric | Description |
|---|---|
| lock_acquired_total{key_class} | Counter of successful acquires. key_class is mapKey(key). |
| lock_acquire_wait_ms | Histogram of time waited from withLock call to successful acquire (ms). Observed only on success. |
| lock_acquire_timeouts_total{key_class} | Counter of withLock calls that exceeded maxWaitMs without acquiring. |
| lock_lost_total{key_class} | Counter of locks lost mid-flight via heartbeat detection. A non-zero rate signals operator force-takeover or holder TTL elapsing - both indicate defaultTtlMs should be raised or work should be split into smaller chunks. |
- Use createDistributedLock when business logic needs "only one instance runs this critical section at a time" - dedicated lookups against rate-limited APIs, periodic cluster work that must not double-fire, transactional state machines that don't fit createTaskRunner's shape.
- Use createTaskRunner when the work is a durable side-effect that must finish exactly once across crashes (charge customer, send email). The runner pairs a Postgres state machine with the Redis fence to guarantee at-most-one and at-least-once delivery.
- Use createIdempotencyStore when the contract is "this operation has a result, and a retry within the TTL must return the same result." Mutex semantics are not the goal - caching the outcome is.
- Use createLeader (next section) when the question is "which one of N workers should fire this scheduled job right now," not "who runs this critical section." Distinct from withLock: leader is a long-lived synchronous observer, lock is a request-scoped serializer.
Cluster-wide leader-election primitive via Redis lease. One worker across the cluster holds the lease at any moment; the synchronous isLeader() getter is microsecond-cost and cached. Use for cluster-wide singletons (cron schedulers, periodic cleanup, health probes that should run from one place) where firing N times across N workers would be wrong. Designed to plug into svelte-realtime's live.configureCron({ leader }) hook.
// src/lib/server/leader.js
import { redis } from './redis.js';
import { createLeader } from 'svelte-adapter-uws-extensions/redis/leader';
export const leader = createLeader(redis);
// src/hooks.ws.js
import { leader } from '$lib/server/leader';
import { configureCron } from 'svelte-realtime/server';
export function init() {
configureCron({ leader: leader.isLeader });
}
export async function shutdown() {
// Best-effort release so a sibling takes over within `renewMs`
// instead of waiting for the full lease to expire.
await leader.stop();
}
isLeader() is the only call on the hot path. It reads a cached boolean (no Redis I/O, microsecond cost) and is safe to call at every cron tick / scheduled-job entry. Falsy means "another worker holds the lease" or "no Redis, fail closed" - in both cases the consumer skips.
1. Acquire: SET <fullKey> <instanceId> NX PX <leaseMs> on construction. If the key was free, this worker holds the lease and _isLeader flips true.
2. Renew: every renewMs (default leaseMs / 3), Lua-atomic if get == instanceId then pexpire end. Returns 1 -> still ours, refreshed; returns 0 -> we lost it (force-takeover or TTL elapsed before renewal could land), _isLeader flips false.
3. Re-acquire: while _isLeader is false, the same renewal tick attempts a fresh SET NX. As soon as the holder releases or the lease expires server-side, the next non-leader to tick wins.
4. Release: stop() runs Lua-atomic if get == instanceId then del end. Skipped if we already lost ownership; the compare guard means we never accidentally delete a sibling's lease.
The compare-on-mutate guard on both renew and release means a stale tick from a worker that already lost leadership cannot extend or release somebody else's lease. Same shape and the same shared Lua scripts as redis/lock's heartbeat - intentionally identical so the two primitives can't drift on lease semantics.
A renewal that throws (Redis disconnect, breaker open, network partition) drops _isLeader to false and surfaces the error to onError. The renewal interval keeps ticking so leadership can recover when Redis recovers. Errors never escape the interval - a Redis blip cannot crash the worker.
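The renewal tick and its fail-closed behaviour can be sketched as a small state holder. This is a synchronous model for illustration; `createLeaderState`, `tryAcquire`, and `renew` are hypothetical names standing in for the SET NX and compare-and-pexpire calls, which are asynchronous in the real module.

```javascript
// Model of the leader tick (illustration only, not the library's code).
// tryAcquire(): returns true when a fresh SET NX succeeded.
// renew(): returns 1 when the lease was refreshed, 0 when it was lost.
function createLeaderState({ tryAcquire, renew, onError = () => {} }) {
  let isLeader = false;
  return {
    // Hot-path check: reads a cached boolean, no I/O.
    isLeader: () => isLeader,
    // One tick: leaders renew, non-leaders attempt a fresh acquire.
    tick() {
      try {
        isLeader = isLeader ? renew() === 1 : tryAcquire();
      } catch (err) {
        isLeader = false; // fail closed on any store error
        onError(err);     // surface it, but never let it escape the tick
      }
    }
  };
}
```

Because the tick swallows errors after reporting them, calling it from a `setInterval` cannot crash the worker, which mirrors the behaviour described above.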
Across the cluster, a partitioned Redis means the lease expires server-side and no worker holds leadership until the partition heals - jobs miss ticks rather than double-fire. Better-safe-than-double-fire is the deliberate choice: across most cron consumers, missing one tick is acceptable while running a job twice is not.
GC-pause caveat: a long stop-the-world pause on the leader can cause brief overlap with a freshly-elected successor. Make jobs idempotent at the consumer; this primitive does not provide fencing tokens (consumer sinks for cron-style work rarely have the machinery to consume them anyway).
| Option | Default | Description |
|---|---|---|
| key | 'leader' | Redis key for the lease (prefixed by the client keyPrefix). |
| instanceId | random hex | This worker's identity. Override only if you want a stable identity for diagnostics; correctness does not depend on it. |
| leaseMs | 30000 | TTL on the lease in milliseconds. Worst-case window between leader death and successor takeover. |
| renewMs | leaseMs / 3 | Renewal interval, also the interval at which non-leaders attempt fresh acquire. Must be < leaseMs. |
| onError | - | Called on every Redis failure. Use for structured logging. Errors never escape the renewal interval regardless. |
| mapKey | identity | Map the lease key to a bounded label value for cardinality control on the four leader_* counters. |
| breaker | - | Optional circuit breaker. Renewal failures count via breaker.failure(err); successes via breaker.success(). |
| metrics | - | Optional Prometheus metrics registry. |
| Method | Description |
|---|---|
| isLeader() | Synchronous cached check. Microsecond-cost. Call at the top of every scheduled-job entry. |
| currentLeader() | Single-GET diagnostic read of the current owner's instanceId. Returns null if unowned or on Redis failure. |
| stop() | Stop the renewal interval and best-effort release the lease via compare-and-delete. Idempotent. Never throws. |
| instanceId | This worker's identity (provided or generated). |
| key | The fully-prefixed lease key (useful for diagnostics). |
| Metric | Description |
|---|---|
| leader_acquired_total{key_class} | Counter of successful acquires (transitions to leader). |
| leader_lost_total{key_class} | Counter of leadership losses (transitions to non-leader, including renewal failure). |
| leader_renewals_total{key_class} | Counter of successful renewals. |
| leader_renewal_failures_total{key_class} | Counter of renewal calls that threw or returned 0 (lease vanished or taken over). |
- Use createLeader for "exactly one of N workers fires this scheduled job." Cluster-wide singleton observation. The primitive holds long-lived state (the lease); the consumer polls isLeader() synchronously at every tick.
- Use createDistributedLock for "only one of N workers runs this critical section right now." Per-call mutual exclusion around a function that returns. The primitive holds the lock only while the function is running and forwards the function's return value.
Both use the same backing Lua scripts and the same lease semantics; they differ in consumer shape (long-lived observer vs. scoped serializer).
Cluster-wide session store with sliding TTL. The adapter ships an in-memory Session plugin (svelte-adapter-uws/plugins/session) that holds Map<token, data> in process memory; this is the Redis-backed swap for multi-instance deployments where a session created on instance A must be readable from instance B after a load-balancer hop.
Pairs with Connection registry: when both are wired, the session provides the durable per-user state (survives disconnect, persists across reconnect, available before a WS even opens), while the registry provides the live "where is this user right now" pointer (only meaningful while the WS is connected). They answer different questions and compose without overlap.
Authorization: the session store maps tokens to data. Producing the token-to-user binding (your auth layer) and validating the token before lookup (your upgrade() hook or middleware) are NOT the plugin's job. If your handler calls session.get(payload.token) with a wire-supplied token without first confirming the caller owns it, an attacker can lift any active token they happen to know and impersonate its owner. See Authorization model.
import { redis } from './redis.js';
import { createDistributedSession } from 'svelte-adapter-uws-extensions/redis/session';
export const sessions = createDistributedSession(redis, {
ttlMs: 30 * 60 * 1000 // 30 minutes
});
// On login (HTTP handler):
await sessions.set(token, { userId: 42, role: 'admin' });
// On WS upgrade or HTTP request:
const data = await sessions.get(token);
if (!data) return error(401, 'session expired');
// Refresh without reading data:
await sessions.touch(token);
// Logout:
await sessions.delete(token);
Each session is one Redis string at {prefix}sess:{token} containing the JSON-encoded data, with a TTL of ttlMs. Single round-trip for every operation: SET key json PX ttl to write, GET key plus PEXPIRE key ttl to read with sliding refresh, PEXPIRE to touch, UNLINK to delete. JSON blob keeps the whole record atomic on replace; partial-field updates are not a feature - callers do set(token, { ...await get(token), changed: 'value' }) for that.
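The read-modify-write pattern for partial updates could be wrapped in a small helper like the sketch below. `patchSession` is a hypothetical name; it only assumes a store with the async get(token) and set(token, data) methods described above.

```javascript
// Sketch: partial update via read-modify-write (hypothetical helper).
// `sessions` is any store exposing async get(token) and set(token, data).
async function patchSession(sessions, token, patch) {
  const current = await sessions.get(token);
  if (!current) return null; // missing or expired: nothing to patch
  const next = { ...current, ...patch };
  await sessions.set(token, next); // replaces the whole record and resets the TTL
  return next;
}
```

Note that this is not atomic: two concurrent patches to the same token can overwrite each other, since each one writes back the full record it read.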
Every set resets the TTL to ttlMs. By default get and touch also extend the TTL on a hit (the adapter's bundled Session plugin does the same). Disable read-time refresh via refreshOnGet: false for read-only flows where reads should not act as liveness signals.
touch(token) and delete(token) return true if the entry was present and the operation succeeded, false if the entry was missing or already expired. get(token) returns null for missing, expired, or corrupt (non-JSON) entries; the corrupt-entry path is treated as a miss so the next set cleanly overwrites.
| Option | Default | Description |
|---|---|---|
| keyPrefix | 'sess:' | Prefix prepended (after the client keyPrefix) to every session key. |
| ttlMs | 86_400_000 (24h) | Sliding TTL window in milliseconds. |
| refreshOnGet | true | Whether get(token) extends the TTL on a hit. |
| breaker | - | Optional circuit breaker for the Redis ops. |
| metrics | - | Optional Prometheus metrics registry. |
The shape is identical to the adapter's bundled Session plugin: same get / set / delete / touch / clear names, same sliding-TTL semantics. The Redis-backed methods return Promise<T> while the bundled plugin's are synchronous, so a swap requires await on the call sites - but otherwise the contract is shared. Use the bundled plugin for tests and single-process deployments; reach for this store the moment a second instance enters the picture.
Sessions and the Connection registry answer different questions: the session holds durable per-user state that survives disconnects, while the registry holds the live "where is this user right now" pointer.
Wire them together by storing the auth token both on the WS userData (so the registry can identify the user) and using the same token to look up session data:
// hooks.ws.js
import { sessions } from '$lib/server/sessions';
import { registry } from '$lib/server/registry';
export async function upgrade({ cookies }) {
const token = cookies.get('session_id');
if (!token) return false;
const session = await sessions.get(token);
if (!session) return false;
return { token, userId: session.userId };
}
export const open = registry.hooks.open;
export const close = registry.hooks.close;
export async function message(ws) {
await sessions.touch(ws.getUserData().token);
}
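| Metric | Description |
|---|---|
| `session_get_total{result="hit\|miss"}` | Counter of get calls, labelled by result. |
| session_set_total | Counter of set calls. |
| `session_delete_total{result="present\|absent"}` | Counter of delete calls, labelled by result. |
| `session_touch_total{result="present\|absent"}` | Counter of touch calls, labelled by result. |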
clear() and operational notes
clear() removes every session under the configured keyPrefix via SCAN + UNLINK. Cluster-wide cost scales with total session count - not a hot-path operation. Use for graceful-shutdown teardowns, test harnesses, or operator-initiated wipes (e.g., post-incident "log everyone out"). Other keys outside the prefix are untouched.
There is intentionally no size() method: a cluster-wide count of session keys requires SCAN every time, and exposing it as a synchronous-looking accessor would be misleading. Apps that want session counts should track a separate Redis SET on set / delete and SCARD it.
Wraps an effectful operation in a state machine that survives process crashes and naturally fans across cluster instances. Use it for background work that absolutely must finish exactly once: charging a customer, sending a transactional email, posting to a webhook, kicking off a long-running pipeline.
Requires Postgres 13+. Uses the built-in gen_random_uuid() function (added to core in 13; older versions need the pgcrypto extension explicitly enabled, which the runner does not do for you).
Task names must match /^[a-zA-Z][a-zA-Z0-9_-]*$/ - start with a letter, then letters/digits/underscores/hyphens. Names starting with _ or a digit are rejected at register() time. Trips test fixtures most often (__noop -> noop).
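If you want to catch bad names before they reach register(), the same pattern can be checked up front. `isValidTaskName` below is a hypothetical helper that simply applies the regex quoted above.

```javascript
// Sketch: pre-check a task name against the documented pattern.
const TASK_NAME_PATTERN = /^[a-zA-Z][a-zA-Z0-9_-]*$/;

function isValidTaskName(name) {
  return typeof name === 'string' && TASK_NAME_PATTERN.test(name);
}

const examples = ['charge-customer', 'noop', '__noop', '1st-task', 'send_email2'].map(
  (name) => [name, isValidTaskName(name)]
);
```

Here `__noop` and `1st-task` fail the check, while the other three names pass.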
Authorization: run(taskName, args) executes the registered handler with whatever args the caller passes. If your message handler forwards a wire payload straight to run(...) without validating that the caller is allowed to invoke this task and own the inputs, an attacker can drive arbitrary registered tasks with arbitrary inputs - charge an account they don't own, send a webhook on someone else's behalf. The handler is server-trusted; the args are not. Validate at the handler boundary, treat args server-side as untrusted input. See Authorization model.
Three guarantees:
- Caller-side idempotency via the idempotency option. When a caller passes the same idempotencyKey twice, the second call returns the cached result instead of re-running the handler.
- Fenced execution: each attempt holds a fence with a fence_expires_at timestamp. The conditional commit UPDATE ... WHERE fence = $current_fence is atomic, so a stuck attempt that comes back from the dead cannot overwrite a completed attempt's result. A periodic recovery sweep reclaims rows whose fence has expired and re-drives the handler in any live instance.
- Downstream de-duplication: the idempotencyKey is forwarded to the handler, where you pass it on to Stripe / SendGrid / S3 so the side-effect target de-duplicates retries too.
// src/lib/server/tasks.js
import { pg } from './pg.js';
import { idempotency } from './idempotency.js';
import { createTaskRunner } from 'svelte-adapter-uws-extensions/postgres/tasks';
export const tasks = createTaskRunner(pg, {
idempotency, // optional but recommended
fenceTtl: 60, // seconds; per-attempt fence lifetime
recoveryInterval: 30000,
cleanupInterval: 3600000,
rowTtl: 7 * 24 * 3600 // keep terminal rows for 7 days
});
tasks.register('charge-customer', async ({ input, idempotencyKey, requestId, signal }) => {
log.info({ requestId, customerId: input.customerId }, 'charging customer');
return await stripe.paymentIntents.create(
{ amount: input.amount, customer: input.customerId },
{ idempotencyKey, signal }
);
}, {
retry: {
maxAttempts: 5,
backoff: (attempt) => Math.min(1000 * 2 ** (attempt - 1), 60000),
on: (err) => err.type === 'StripeAPIError'
}
});
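To see what the backoff function in the registration above produces, the sketch below evaluates it for the first eight attempts. It assumes `attempt` is 1-based, which the source does not state explicitly.

```javascript
// Same backoff expression as in the register() example above.
// Assumption: `attempt` starts at 1 for the first retry.
const backoff = (attempt) => Math.min(1000 * 2 ** (attempt - 1), 60000);

const schedule = [1, 2, 3, 4, 5, 6, 7, 8].map(backoff);
// schedule is [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
```

The delay doubles each time until the 60-second cap takes over at the seventh attempt; with maxAttempts: 5 only the first few values are ever used.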
// In a form action, RPC handler, anywhere with an awaited result
import { tasks } from '$lib/server/tasks';
export const actions = {
pay: async ({ request, locals, platform }) => {
const { amount } = Object.fromEntries(await request.formData());
const result = await tasks.run('charge-customer', {
input: { amount, customerId: locals.user.stripeCustomerId },
idempotencyKey: `charge-${locals.user.id}-${request.headers.get('idempotency-key')}`,
platform // captures platform.requestId onto the row, exposed as ctx.requestId in the handler
});
return { success: true, paymentIntentId: result.id };
}
};
Pass platform (the SvelteKit event.platform) to capture the originating request id automatically - it lands on svti_tasks.request_id and surfaces as ctx.requestId in the handler so logs from inside the task correlate back to the WS / HTTP request that started it. Override explicitly via the requestId option for non-request contexts (cron, recovery, manual invocation).
The table is created automatically on first use (if autoMigrate is true):
CREATE TABLE IF NOT EXISTS svti_tasks (
svti_tasks_id UUID PRIMARY KEY,
name TEXT NOT NULL,
input JSONB,
svti_idempotency_key TEXT,
request_id TEXT, -- originating platform.requestId, or null
status TEXT NOT NULL, -- 'pending' | 'running' | 'committed' | 'failed'
result JSONB,
error JSONB,
fence UUID NOT NULL,
fence_expires_at TIMESTAMPTZ NOT NULL,
attempts INT NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS idx_svti_tasks_running_fence
ON svti_tasks (fence_expires_at) WHERE status = 'running';
CREATE INDEX IF NOT EXISTS idx_svti_tasks_terminal_updated
ON svti_tasks (updated_at) WHERE status IN ('committed', 'failed');
Existing 0.5.0-next.1 deployments forward-migrate via ALTER TABLE ... ADD COLUMN IF NOT EXISTS request_id TEXT on first use; idempotent and zero-downtime.
| Option | Default | Description |
|---|---|---|
| table | 'svti_tasks' | Table name |
| idempotency | - | An idempotency store (above). When provided, results are cached per idempotencyKey. Strongly recommended. |
| fenceTtl | 60 | Per-attempt fence lifetime in seconds. Heartbeat extends it while the handler runs. |
| heartbeatInterval | fenceTtl * 1000 / 3 | ms between fence heartbeats |
| recoveryInterval | 30000 | ms between recovery sweeps. 0 disables. |
| recoveryBatchSize | 10 | Max rows reclaimed per sweep |
| dispatchInterval | 5000 | ms between dispatch sweeps that claim enqueued pending rows. 0 disables. |
| dispatchBatchSize | 10 | Max pending rows claimed per dispatch sweep |
| awaitPollInterval | 500 | ms between row reads while await() waits |
| awaitTimeout | 60000 | ms after which await() rejects if the task is still not terminal. 0 = no timeout. |
| cleanupInterval | 3600000 | ms between cleanup sweeps. 0 disables. |
| rowTtl | 604800 (7 days) | Seconds to keep terminal rows before deletion |
| autoMigrate | true | Auto-create the table on first use. Migration is kicked off at construction; await tasks.ready() to block until it lands. |
| onStateChange | - | Local-worker callback fired AFTER each state-machine transition commits. See Live observation below. |
| breaker | - | Circuit breaker; bypassed when broken |
| metrics | - | Prometheus registry; emits tasks_*_total counters |
The runner owns every state-machine transition; expose what it knows via the onStateChange callback or read directly with list() / counts(). Both work without standing up postgres/notify.
const tasks = createTaskRunner(pg, {
onStateChange(event) {
// event.{ taskId, name, oldStatus, newStatus, attempt, requestId, result?, error? }
metrics.inc(`tasks.${event.name}.${event.newStatus}`);
if (event.newStatus === 'committed') {
bus.publish(`task:${event.taskId}`, 'committed', { result: event.result });
}
}
});
// Wait for migration before polling.
await tasks.ready();
// Dashboard / admin polling tick.
const recent = await tasks.list({ limit: 30 });
const summary = await tasks.counts({ name: 'charge-customer' });
// summary = { pending, running, committed, failed, total }
Transitions fired:
- null -> pending - enqueue() inserts the row.
- null -> running - run() inserts the first attempt directly.
- pending -> running - the dispatch sweep claims the row.
- running -> running - retry rearm (within the same run() loop) OR recovery sweep (a sibling instance reclaims an expired fence). attempt bumps either way.
- running -> committed - handler succeeded. event.result carries the value.
- running -> failed - handler errored past retry.maxAttempts. event.error carries the JSON-safe { name, message, stack?, code?, cause? } shape.

Errors thrown from the callback (sync or via promise rejection) are caught and console.warned; they do NOT roll back the state machine. Listeners run on the same tick as the SQL commit but are awaited fire-and-forget; the runner never blocks on them.
Cluster-wide fan-out is a separate concern: onStateChange fires on the worker that performed the transition, not across the cluster. Use postgres/notify if every instance needs to react.
// Drain this instance: kick its in-flight tasks off so the recovery sweep
// on a healthy instance picks them up faster than waiting for the
// fence_expires_at deadline.
const took = await tasks.takeover(taskId);
// took === true if a row was running and got taken over.
takeover(taskId) expires the Postgres fence AND (when a Redis fence provider is configured) releases the Redis mirror key, cutting abort latency from O(heartbeatInterval) to one tick. The recovery sweep then reclaims the row and re-drives the handler under the registered retry policy.
Retry is declared at registration so the policy travels with the handler, not with each call site:
tasks.register('flaky-webhook', handler, {
retry: {
maxAttempts: 5,
backoff: (attempt, err) => Math.min(1000 * 2 ** (attempt - 1), 60000),
on: (err) => !(err instanceof PermanentError)
}
});
Default is no retry on handler-thrown errors - safe for non-idempotent tasks. Stuck recovery (fence-expired-while-running) is always on; it is not the same thing as retry-on-failure.
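To see what the backoff expression in the example produces, here is a minimal sketch that tabulates the delays. It assumes the runner calls backoff with the 1-based number of the attempt that just failed, which the text above does not state explicitly; backoffSchedule is a hypothetical helper, not part of the package.

```javascript
// Same expression as the `backoff` option in the registration example.
const backoff = (attempt) => Math.min(1000 * 2 ** (attempt - 1), 60000);

// Hypothetical helper: delays (ms) inserted between attempts.
// Assumption: with maxAttempts = N there are at most N - 1 waits, because the
// final failure marks the row failed instead of scheduling another attempt.
function backoffSchedule(maxAttempts) {
  const delays = [];
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(backoff(attempt));
  }
  return delays;
}

console.log(backoffSchedule(5)); // [ 1000, 2000, 4000, 8000 ]
console.log(backoff(7)); // 60000 (1000 * 2^6 = 64000, capped)
```

The cap matters once maxAttempts grows past 7: without Math.min the wait would keep doubling.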
{
input: TInput, // the input passed to run()
idempotencyKey: string | undefined, // forward to external services
fence: string, // this attempt's fence UUID, read-only
signal: AbortSignal, // aborts when the fence is lost
attempt: number // 1-based attempt counter
}
The signal fires when the heartbeat detects another worker has reclaimed the row (your fence_expires_at passed and the recovery sweep took over). Pass it to fetch, Stripe, anything that supports cancellation - the handler should bail gracefully when it fires rather than racing the new owner.
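For work that does not go through a cancellable API, the handler can check the signal between units of work. The sketch below is synchronous for brevity (real handlers are usually async), and processBatch is a hypothetical helper; the abort inside the callback stands in for the runner's heartbeat detecting a lost fence.

```javascript
// Hypothetical helper: process items one by one, stopping as soon as the
// signal reports that this attempt no longer owns the task.
function processBatch(items, signal, handleItem) {
  const done = [];
  for (const item of items) {
    if (signal.aborted) break; // fence lost: do not start the next item
    done.push(handleItem(item));
  }
  return { done, aborted: signal.aborted };
}

// Usage: simulate the fence being lost while the second item is processed.
const controller = new AbortController();
const outcome = processBatch([1, 2, 3, 4], controller.signal, (n) => {
  if (n === 2) controller.abort(); // stands in for the runner's heartbeat
  return n * 10;
});
console.log(outcome); // { done: [ 10, 20 ], aborted: true }
```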
run() throws three error shapes:
- UnknownTaskError - no handler registered for that name in this process. Recovery does not throw on unknown names because the handler may live on a different deployment.
- TaskInFlightError - the idempotency store reports the slot as pending (another caller is mid-flight for the same key). Caller may surface a 409 to the upstream HTTP request or retry after a backoff.
- The handler's own error - serialized as {name, message, stack, code, cause} for the failed-row record and reconstructed as a plain Error if a sibling caller reads the row.

enqueue + await

run() blocks the calling process until the handler finishes. Two more verbs let you decouple submission from completion:

- enqueue(name, opts) - fire-and-forget. Inserts the row with status='pending' and returns the taskId immediately. A dispatch sweep on any live instance picks up the row and runs the handler in the background.
- await(taskId, opts?) - block until the task reaches a terminal status. Returns the committed result, throws the stored error, or rejects with a timeout if the task is still pending/running past awaitTimeout.

import { tasks } from '$lib/server/tasks';
// Submit a job that will be processed elsewhere
const taskId = await tasks.enqueue('send-welcome-email', {
input: { userId: locals.user.id },
idempotencyKey: `welcome-${locals.user.id}`
});
// Optionally block on completion (or fire-and-forget by skipping this)
const result = await tasks.await(taskId, { timeout: 30000 });
Use cases:
- enqueue and respond with the taskId. The client polls a status endpoint that reads the row.
- running until another instance reclaims them.

The dispatch loop runs every dispatchInterval ms (default 5000) and claims up to dispatchBatchSize pending rows per sweep via FOR UPDATE SKIP LOCKED. Set dispatchInterval: 0 to disable dispatch entirely (use only run() paths).
await polls the task row every awaitPollInterval ms (default 500) until terminal or awaitTimeout ms (default 60000). For most use cases the runner-level defaults are fine; per-call overrides are available:
await tasks.await(taskId, { pollInterval: 100, timeout: 10000 });
Errors thrown by the handler are reconstructed from the stored row (name, message, stack, code, cause) when await reads a failed row. The reconstructed error is a plain Error; the original prototype chain does not survive the JSON round-trip.
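The practical consequence is that instanceof checks on the caller side stop working, while name and code survive. A minimal sketch of the round-trip, with hypothetical helpers toStoredError and fromStoredError standing in for whatever the runner does internally:

```javascript
// A handler-side error class; the caller may not even have it loaded.
class PaymentDeclinedError extends Error {
  constructor(message) {
    super(message);
    this.name = 'PaymentDeclinedError';
    this.code = 'DECLINED';
  }
}

// Hypothetical: flatten an error into the JSON-safe shape stored on the row.
function toStoredError(err) {
  return { name: err.name, message: err.message, stack: err.stack, code: err.code };
}

// Hypothetical: rebuild a plain Error from the stored shape.
function fromStoredError(stored) {
  const err = new Error(stored.message);
  err.name = stored.name;
  if (stored.code !== undefined) err.code = stored.code;
  if (stored.stack !== undefined) err.stack = stored.stack;
  return err;
}

const original = new PaymentDeclinedError('card declined');
const stored = JSON.parse(JSON.stringify(toStoredError(original)));
const rebuilt = fromStoredError(stored);

console.log(rebuilt instanceof PaymentDeclinedError); // false
console.log(rebuilt.name, rebuilt.code); // PaymentDeclinedError DECLINED
```

Callers of await that need to branch on the failure type should therefore compare err.name or err.code rather than use instanceof.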
By default the handler runs in the current process. For CPU-bound work that would otherwise block the event loop - image resize, hashing, large JSON parse, anything that genuinely consumes a CPU core for long enough to matter - you can opt in to running the handler in a worker thread.
The handler lives in a separate file whose default export is the handler. The runner spawns a thread pool per task; each thread imports the handler file once at startup and reuses the import across runs. Database and Redis clients cannot be shared across worker threads (native handles do not cross thread boundaries), so the worker file boots its own.
// src/lib/server/workers/resize.js
import sharp from 'sharp';
export default async function resize({ input, signal }) {
return await sharp(input.imageBuffer, { signal })
.resize(input.width, input.height)
.toBuffer();
}
// src/lib/server/tasks.js
import { tasks } from './tasks.js';
tasks.register('resize-image', null, {
worker: new URL('./workers/resize.js', import.meta.url)
});
// Or with explicit pool config:
tasks.register('resize-image', null, {
worker: {
path: new URL('./workers/resize.js', import.meta.url),
pool: { size: 4, idleTimeout: 30000 }
}
});
When worker is set, the handler argument to register must be null or omitted - the handler lives in the worker file, not at the registration site. Pool defaults: size: 1, idleTimeout: 30000 ms (set to 0 to keep workers warm forever). Workers spawn lazily on first run; idle workers past idleTimeout are terminated.
The signal in the handler context fires when the runner detects a fence loss, exactly as for in-process handlers. The runner forwards an abort message to the worker; the worker translates it into a local AbortController.abort() so the handler can bail.
When not to use this:
When this is the right tool: a single handler that synchronously consumes the event loop for tens of milliseconds or more, where blocking the cluster instance's other work is unacceptable.
Pass an optional fence provider to add a second source of truth for "is this attempt's fence still alive". The Postgres row remains the canonical record of task state; the provider mirrors the fence value to an external store with a short TTL refreshed by heartbeat. On every heartbeat tick the runner consults both sources - either reporting "lost" aborts the handler.
The primary value is force-takeover detection. If an operator manually deletes the fence key (or another instance forcibly releases it), the heartbeat sees the divergence and bails immediately, even though the Postgres fence_expires_at deadline has not yet passed. Useful for ops scenarios like "drain this instance, kick its in-flight tasks off so the recovery sweep on a healthy instance picks them up faster than waiting for the Postgres deadline."
import { createRedisFence } from 'svelte-adapter-uws-extensions/redis/fence';
import { createTaskRunner } from 'svelte-adapter-uws-extensions/postgres/tasks';
export const tasks = createTaskRunner(pg, {
idempotency,
fence: createRedisFence(redis, { keyPrefix: 'fence:' })
});
The provider exposes acquire/heartbeat/release. The runner pairs each with the matching Postgres operation: acquire runs after the row is inserted/rearmed; heartbeat runs before the Postgres heartbeat each tick and short-circuits the abort path on its own; release is best-effort after a terminal commit/fail.
createRedisFence options:
| Option | Default | Description |
|---|---|---|
| keyPrefix | 'fence:' | Prefix prepended (after the client keyPrefix) to every fence key |
The Redis side uses two atomic Lua scripts: heartbeat is if get == fence then pexpire end, release is if get == fence then del end. No fence held by another owner can be released or refreshed by accident.
Observability
Exposes extension metrics in Prometheus text exposition format. No external dependencies. Zero overhead when not enabled - every metric call uses optional chaining on a nullish reference, so V8 short-circuits on a single pointer check.
// src/lib/server/metrics.js
import { createMetrics } from 'svelte-adapter-uws-extensions/prometheus';
export const metrics = createMetrics({
prefix: 'myapp_',
mapTopic: (topic) => topic.startsWith('room:') ? 'room:*' : topic
});
Pass the metrics object to any extension via its options:
import { metrics } from './metrics.js';
import { redis } from './redis.js';
import { createPresence } from 'svelte-adapter-uws-extensions/redis/presence';
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';
import { createReplay } from 'svelte-adapter-uws-extensions/redis/replay';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';
import { createGroup } from 'svelte-adapter-uws-extensions/redis/groups';
import { createCursor } from 'svelte-adapter-uws-extensions/redis/cursor';
export const bus = createPubSubBus(redis, { metrics });
export const presence = createPresence(redis, { metrics, key: 'id' });
export const replay = createReplay(redis, { metrics });
export const limiter = createRateLimit(redis, { points: 10, interval: 1000, metrics });
export const lobby = createGroup(redis, 'lobby', { metrics });
export const cursors = createCursor(redis, { metrics });
With uWebSockets.js:
app.get('/metrics', metrics.handler);
Or use metrics.serialize() to get the raw text and serve it however you like.
Recommended deployment shape: mount the metrics endpoint behind a network barrier whenever possible - a private scrape-only port, an internal load balancer, or a sidecar scrape target. The default metrics.handler does no auth; the assumption is that your scraper is the only thing that can reach the listener.
When same-listener mount is unavoidable (the metrics endpoint shares its port with public traffic), use metrics.authedHandler(predicate):
// Token-based auth, token from env
const expectedToken = process.env.METRICS_SCRAPE_TOKEN;
app.get('/metrics', metrics.authedHandler(
(res, req) => req.getHeader('x-scrape-token') === expectedToken
));
The predicate receives (res, req) and returns truthy to allow or falsy to deny. Async predicates are awaited. Predicate exceptions are caught and treated as denial. Denials return 401 Unauthorized with no metrics body and no internal error info leaked.
| Option | Default | Description |
|---|---|---|
| prefix | '' | Prefix for all metric names |
| mapTopic | identity | Map topic names to bounded label values for cardinality control |
| defaultBuckets | [1, 5, 10, 25, 50, 100, 250, 500, 1000] | Default histogram buckets |
| maxSeries | 10_000 | Per-metric series cap; past this, new labelsets are dropped and prometheus_series_dropped_total{metric} counts them. Pass Infinity to disable. |
| maxBuckets | 32 | Maximum buckets a histogram may declare. Throws at registration if exceeded. Pass Infinity to disable. |
Metric names must match [a-zA-Z_:][a-zA-Z0-9_:]* and label names must match [a-zA-Z_][a-zA-Z0-9_]* (no __ prefix). Invalid names throw at registration time. HELP text containing backslashes or newlines is escaped automatically.
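The naming rules can be checked ahead of registration. A minimal sketch using the two patterns quoted above; isValidMetricName and isValidLabelName are hypothetical helpers, not exports of the package:

```javascript
// Patterns copied from the rule above, anchored to match the whole name.
const METRIC_NAME = /^[a-zA-Z_:][a-zA-Z0-9_:]*$/;
const LABEL_NAME = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

function isValidMetricName(name) {
  return METRIC_NAME.test(name);
}

function isValidLabelName(name) {
  // Label names additionally must not start with a double underscore.
  return LABEL_NAME.test(name) && !name.startsWith('__');
}

console.log(isValidMetricName('myapp_requests_total')); // true
console.log(isValidMetricName('2xx_total')); // false (leading digit)
console.log(isValidLabelName('topic')); // true
console.log(isValidLabelName('__name__')); // false (__ prefix)
console.log(isValidLabelName('room:id')); // false (colon only allowed in metric names)
```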
If your topics are user-generated (e.g. room:abc123), per-topic labels will grow unbounded. Use mapTopic to collapse them:
const metrics = createMetrics({
mapTopic: (topic) => {
if (topic.startsWith('room:')) return 'room:*';
if (topic.startsWith('user:')) return 'user:*';
return topic;
}
});
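A quick way to check that a mapTopic function actually bounds cardinality is to run it over a sample of real topic names and count the distinct labels. A minimal sketch, with made-up topic names:

```javascript
// Same collapsing function as in the example above.
const mapTopic = (topic) => {
  if (topic.startsWith('room:')) return 'room:*';
  if (topic.startsWith('user:')) return 'user:*';
  return topic;
};

// Illustrative topic names only.
const topics = ['room:abc123', 'room:def456', 'user:42', 'user:43', 'lobby'];
const labels = new Set(topics.map(mapTopic));

console.log([...labels]); // [ 'room:*', 'user:*', 'lobby' ]
console.log(`${topics.length} topics -> ${labels.size} label values`);
```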
Two drop-in wirers for adapter telemetry:
import {
createMetrics,
wirePublishRateMetrics,
connectionMetricsHook
} from 'svelte-adapter-uws-extensions/prometheus';
export const metrics = createMetrics({ prefix: 'myapp_' });
// In setup, once you have a `platform`:
wirePublishRateMetrics(platform, metrics, { topN: 10 });
// In hooks.ws.js:
export const close = connectionMetricsHook(metrics);
wirePublishRateMetrics registers ws_topic_publish_rate{topic="..."} and ws_topic_publish_bytes{topic="..."} gauges that read platform.pressure.topPublishers at scrape time - no continuous accounting on the publish hot path. The topN cap (default 10) bounds gauge cardinality; the registry's mapTopic (or an inline mapTopic option) can further collapse user-generated topic names.
connectionMetricsHook(metrics, userClose?) returns a close-hook that emits per-connection histograms (ws_connection_duration_seconds, ws_connection_messages_in / _out, ws_connection_bytes_in / _out) plus a ws_connection_close_total{code} counter from the close-ctx fields the adapter populates. Compose with your own close logic by passing a function as the second argument; it runs after the metrics are recorded:
export const close = connectionMetricsHook(metrics, async (ws, ctx) => {
// your own teardown - runs after metrics, with the same ctx
});
Requires svelte-adapter-uws >= 0.5.0-next.4: the topPublishers field on the pressure snapshot and the duration / messages / bytes fields on the close ctx are only populated by that version.
Pub/sub bus
| Metric | Type | Description |
|---|---|---|
| pubsub_messages_relayed_total | counter | Messages relayed to Redis |
| pubsub_messages_received_total | counter | Messages received from Redis |
| pubsub_echo_suppressed_total | counter | Messages dropped by echo suppression |
| pubsub_parse_errors_total | counter | Malformed envelopes dropped on receive |
| pubsub_relay_batch_size | histogram | Relay batch size per flush |
| pubsub_degraded_total | counter | Auto-emitted degraded events |
| pubsub_recovered_total | counter | Auto-emitted recovered events |
Sharded pub/sub bus
| Metric | Type | Labels | Description |
|---|---|---|---|
| sharded_pubsub_messages_relayed_total | counter | topic | Messages SPUBLISHed |
| sharded_pubsub_messages_received_total | counter | topic | Messages received via SSUBSCRIBE |
| sharded_pubsub_echo_suppressed_total | counter | | Sharded messages dropped by echo suppression |
| sharded_pubsub_parse_errors_total | counter | | Malformed envelopes dropped on receive |
| sharded_pubsub_ssubscribes_total | counter | | SSUBSCRIBE calls (first follower per channel) |
| sharded_pubsub_sunsubscribes_total | counter | | SUNSUBSCRIBE calls (last follower out) |
Adapter telemetry (wirePublishRateMetrics + connectionMetricsHook)
| Metric | Type | Labels | Description |
|---|---|---|---|
| ws_topic_publish_rate | gauge | topic | Messages per second sampled from platform.pressure.topPublishers (top N) |
| ws_topic_publish_bytes | gauge | topic | Bytes per second sampled from platform.pressure.topPublishers (top N) |
| ws_connection_duration_seconds | histogram | | Connection duration in seconds at close |
| ws_connection_messages_in | histogram | | Messages received per connection at close |
| ws_connection_messages_out | histogram | | Messages sent per connection at close |
| ws_connection_bytes_in | histogram | | Bytes received per connection at close |
| ws_connection_bytes_out | histogram | | Bytes sent per connection at close |
| ws_connection_close_total | counter | code | Connections closed by close code |
Presence
| Metric | Type | Labels | Description |
|---|---|---|---|
| presence_joins_total | counter | topic | Join events |
| presence_leaves_total | counter | topic | Leave events |
| presence_heartbeats_total | counter | | Heartbeat refresh cycles |
| presence_stale_cleaned_total | counter | | Stale entries removed by cleanup |
| presence_total_online | gauge | topic | Unique users present per topic on this instance |
| presence_heartbeat_latency_ms | gauge | | Duration of the most recent heartbeat tick in ms |
| presence_keyspace_cleanups_total | counter | | Topic hash expiries that triggered an empty-list emit (keyspace mode only) |
Replay buffer (Redis and Postgres)
| Metric | Type | Labels | Description |
|---|---|---|---|
| replay_publishes_total | counter | topic | Messages published |
| replay_messages_replayed_total | counter | topic | Messages replayed to clients |
| replay_truncations_total | counter | topic | Truncation events detected |
| replay_replications_total | counter | | Publishes confirmed replicated within timeout (Redis only, durability: 'replicated' mode) |
| replay_replication_timeouts_total | counter | | Publishes that did not reach minReplicas within timeout |
| replay_idmp_hits_total | counter | topic | publishIdempotent calls served from the dedup cache (no XADD) |
| replay_idmp_writes_total | counter | topic | publishIdempotent calls that produced a new entry |
Rate limiting
| Metric | Type | Description |
|---|---|---|
| ratelimit_allowed_total | counter | Requests allowed |
| ratelimit_denied_total | counter | Requests denied |
| ratelimit_bans_total | counter | Bans applied |
Broadcast groups
| Metric | Type | Labels | Description |
|---|---|---|---|
| group_joins_total | counter | group | Join events |
| group_joins_rejected_total | counter | group | Joins rejected (full) |
| group_leaves_total | counter | group | Leave events |
| group_publishes_total | counter | group | Publish events |
Cursor
| Metric | Type | Labels | Description |
|---|---|---|---|
| cursor_updates_total | counter | topic | Cursor update calls |
| cursor_broadcasts_total | counter | topic | Broadcasts actually sent |
| cursor_throttled_total | counter | topic | Updates deferred by throttle |
LISTEN/NOTIFY bridge
| Metric | Type | Labels | Description |
|---|---|---|---|
| notify_received_total | counter | channel | Notifications received |
| notify_parse_errors_total | counter | channel | Parse failures |
| notify_reconnects_total | counter | | Reconnect attempts |
Admission control
| Metric | Type | Labels | Description |
|---|---|---|---|
| admission_accepted_total | counter | class | shouldAccept calls that returned true |
| admission_rejected_total | counter | class, reason | shouldAccept calls that returned false, labeled with the pressure reason that caused rejection |
Job queue
| Metric | Type | Labels | Description |
|---|---|---|---|
| jobs_enqueued_total | counter | queue | Jobs enqueued |
| jobs_claimed_total | counter | queue | Jobs claimed (rows returned by claim) |
| jobs_completed_total | counter | queue | Jobs completed (deleted) |
| jobs_failed_total | counter | queue | Jobs released via fail() for retry |
Redis Functions
| Metric | Type | Labels | Description |
|---|---|---|---|
| redis_function_loads_total | counter | library | FUNCTION LOAD calls |
| redis_function_calls_total | counter | library, function | FCALL calls |
| redis_function_errors_total | counter | library, function | FCALL calls that threw |
Three helpers in shared/sensitive.js (re-exported from svelte-adapter-uws-extensions/sensitive) cover the recurring "this value might be a credential" problem so callers do not have to write a redactor per call site.
stripInternal(obj) -- safe to spread, safe to log

Recursively strips sensitive and adapter-internal keys from a user-supplied object, returning a clone you can hand to Object.assign, JSON.stringify, or console.log without leaking credentials or polluting the prototype chain.
import { stripInternal } from 'svelte-adapter-uws-extensions/sensitive';
const userData = JSON.parse(req.body);
const safe = stripInternal(userData);
console.log('upload from', safe); // safe to log
Object.assign(target, safe); // safe to spread
JSON.stringify(safe); // safe to serialize
Safe to spread (no prototype pollution at the target).
- Keys starting with __ are dropped at every depth. Catches __proto__ on a JSON.parse('{"__proto__":{...}}') payload before it can reach an Object.assign target.
- constructor and prototype are dropped. Catches the parallel "shadow via own property on the target" class.
- The result is a plain {} (not Object.create(null)) so callers can depend on .toString(), .hasOwnProperty(), and instanceof Object. The spread-safety guarantee comes from filtering the dangerous keys at iteration, not from a null-prototype result.

Safe to log (no credentials in logs).

- Keys matching /token|secret|password|auth|session|cookie|jwt|credential/i are dropped at every depth. Conservative substring match: a session_id field is dropped; a payment_method field is not. If your domain field name happens to contain one of those substrings, give the public-surface field a different name.
- Binary values (Buffer, Uint8Array, any TypedArray, DataView, raw ArrayBuffer) are replaced with the string '[bytes: <byteLength>]'. Naively walking a Buffer via Object.keys yields {"0":byte0,"1":byte1,...} which JSON.stringify happily serializes -- if those bytes were a JWT, the credential lands in stderr.

Cycle-safe. A per-call WeakSet tracks ancestors; the second visit to a node returns undefined rather than recursing forever.
Not safe for keys whose name does not match the conservative pattern. account_balance, internal_billing_ref, medical_record_id -- the helper does not know those are sensitive in your domain. If your app has domain-specific PII, layer a second redactor on top of stripInternal.
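A second, domain-specific pass can be as small as a key denylist. The sketch below is one way to layer it; redactDomainFields and the DOMAIN_SENSITIVE set are hypothetical, and the function assumes its input has already been through stripInternal (so it is a plain, acyclic object graph with no __-prefixed keys).

```javascript
// Hypothetical domain denylist: exact key names the generic pattern cannot know about.
const DOMAIN_SENSITIVE = new Set(['account_balance', 'internal_billing_ref', 'medical_record_id']);

// Hypothetical second-pass redactor. Not cycle-safe on its own: it relies on
// the input being the acyclic clone produced by a first pass.
function redactDomainFields(value) {
  if (Array.isArray(value)) return value.map(redactDomainFields);
  if (value === null || typeof value !== 'object') return value;
  const out = {};
  for (const [key, child] of Object.entries(value)) {
    if (DOMAIN_SENSITIVE.has(key)) continue; // drop the field entirely
    out[key] = redactDomainFields(child);
  }
  return out;
}

// Stand-in for the output of stripInternal(userData).
const firstPass = {
  user: { name: 'Ada', account_balance: 1200 },
  visits: [{ clinic: 'north', medical_record_id: 'MR-1' }]
};
console.log(JSON.stringify(redactDomainFields(firstPass)));
// {"user":{"name":"Ada"},"visits":[{"clinic":"north"}]}
```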
redactConnectionUrl(url) -- safe to embed in logs and errors

Redacts the password segment of a connection URL so the URL is safe to include in error messages, structured log lines, or assertion context. Substitutes :password@ userinfo and password= / pass= / pwd= query params with ***. Other URL bytes pass through unchanged.
import { redactConnectionUrl } from 'svelte-adapter-uws-extensions/sensitive';
const url = 'postgres://user:hunter2@db.internal:5432/app?sslmode=require';
console.error('connect failed:', redactConnectionUrl(url));
// connect failed: postgres://user:***@db.internal:5432/app?sslmode=require
Three classes the byte-level scan handles correctly that a naive regex misses:
- Passwords containing @ (redis://user:p@ssword@host) -- a first-@ regex stops too early and leaks the password tail; the scan walks to the LAST @ in the authority region.
- IPv6 hosts (redis://:secret@[::1]:6379) -- bracket-aware scanning suspends authority-terminator detection inside [...] so the : and @ of an IPv6 host are not confused with userinfo.
- Query-string passwords (postgres://host/db?password=hunter2) -- pg accepts password as a connection parameter; redaction is case-insensitive and also matches pass and pwd.

Non-string input passes through String(url) without scanning, so callers can pipe arbitrary error context through without a type guard. Used internally by every connection failure log so a leaked DSN cannot escape via an unwrapped err.message.
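The first case in the list above is easy to reproduce. The following is a deliberately simplified sketch, not the library's scanner: it handles only userinfo passwords, ignores IPv6 brackets and query parameters, and all function names are hypothetical. It compares choosing the first @ with choosing the last @ before the end of the authority.

```javascript
// Hypothetical, simplified redactor: replaces the text between the first ':'
// after the scheme and the '@' chosen by `pickAt` with '***'.
function redactUserinfoPassword(url, pickAt) {
  const schemeEnd = url.indexOf('://');
  if (schemeEnd === -1) return url;
  const authorityStart = schemeEnd + 3;
  const at = pickAt(url, authorityStart);
  if (at < authorityStart) return url; // no userinfo at all
  const colon = url.indexOf(':', authorityStart);
  if (colon === -1 || colon > at) return url; // username without password
  return url.slice(0, colon + 1) + '***' + url.slice(at);
}

// Naive choice: the first '@' after the scheme.
const firstAt = (url, from) => url.indexOf('@', from);

// Safer choice: the last '@' before the authority ends at '/', '?' or '#'.
const lastAt = (url, from) => {
  const rel = url.slice(from).search(/[/?#]/);
  const authorityEnd = rel === -1 ? url.length : from + rel;
  return url.lastIndexOf('@', authorityEnd - 1);
};

const dsn = 'redis://user:p@ssword@host';
console.log(redactUserinfoPassword(dsn, firstAt)); // redis://user:***@ssword@host (tail leaks)
console.log(redactUserinfoPassword(dsn, lastAt)); // redis://user:***@host
```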
createSensitiveWarner(prefix) -- one-shot dev warning

Returns a function that scans a userData object (up to depth 3) and calls console.warn once if it finds a key matching /token|secret|password|key|auth|session|cookie|jwt|credential/i. After the first warning the function latches and is a no-op. Designed for plugin select callbacks: the plugin wires a warner per topic and surfaces a one-time hint when a developer forgets to strip credentials from broadcast data.
import { createSensitiveWarner } from 'svelte-adapter-uws-extensions/sensitive';
const warn = createSensitiveWarner('redis/cursor');
warn(userData); // logs once if userData has anything credential-shaped
warn(userData); // no-op after the latch
The warner uses the broader pattern (the bare substring "key" is included) because at the warning stage the cost of a false positive is just a developer-facing console.warn line, not silent data loss. The redaction pattern used by stripInternal deliberately omits "key" to avoid dropping legitimate id-like fields.
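The latch itself is a small closure. A minimal sketch of the idea, under two assumptions that the text does not pin down: "depth 3" is read as three levels of nested keys, and the warning text is made up. createOneShotWarner is a hypothetical name, not the package export.

```javascript
// The broader pattern quoted above (includes the bare substring "key").
const BROAD_PATTERN = /token|secret|password|key|auth|session|cookie|jwt|credential/i;

// Hypothetical re-creation of the one-shot behavior. `log` is injectable so
// the latch can be observed without touching the real console.
function createOneShotWarner(prefix, log = console.warn) {
  let warned = false;
  const hasSensitiveKey = (value, depth) => {
    if (depth > 3 || value === null || typeof value !== 'object') return false;
    return Object.entries(value).some(
      ([key, child]) => BROAD_PATTERN.test(key) || hasSensitiveKey(child, depth + 1)
    );
  };
  return function warn(userData) {
    if (warned) return; // latched: later calls do no work
    if (hasSensitiveKey(userData, 1)) {
      warned = true;
      log(`[${prefix}] userData contains a credential-shaped key`);
    }
  };
}

const lines = [];
const warn = createOneShotWarner('redis/cursor', (line) => lines.push(line));
warn({ name: 'Ada' }); // nothing credential-shaped, no warning, no latch
warn({ profile: { apiKey: 'abc' } }); // warns once
warn({ profile: { apiKey: 'abc' } }); // no-op after the latch
console.log(lines.length); // 1
```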
- zod, valibot, custom) at the message-handler boundary.
- stripInternal works but is a heavy hammer -- a typed select projection is usually clearer.

Reliability
Every internal Map / Set / queue of factory-or-module-level scope declares an explicit upper bound and a documented saturation behavior, so a runaway publisher, a subscribe-in-loop bug, or a topic-cardinality leak can no longer exhaust process memory silently. Mirrors the adapter's 0.5.0-next.8 capacity-cap pattern, scaled one tier up: where the adapter caps per-connection at 1M, we cap per-instance at 10M (an instance can hold ~1M concurrent uWS connections with ~10 entries of state per connection on average), and cluster-wide warn-only caps land at 100M (beyond which the in-memory index itself becomes a real memory concern, ~3.2GB on each instance maintaining the index).
Saturation behavior is matched to each data structure's contract:
- userToInstance for sendTo routing, the secondary index buckets, the cluster-wide user index): warn-only. Eviction would corrupt routing or matching, so a single structured console.warn fires the first time the cap is crossed - surfacing the leak shape - and the index keeps growing.
- WS_COALESCED and the throttle/debounce plugins).

Caps live as named constants in shared/caps.js. They are not currently configurable per-instance; if a real workload needs a tighter or looser bound for a specific module, raise an issue and we'll add a per-factory option.
| Cap | Default | Saturation | Notes |
|---|---|---|---|
| MAX_REGISTRY_SESSIONS_PER_INSTANCE | 10_000_000 | reject new | hooks.open skips the registration past the cap |
| MAX_REGISTRY_PENDING_REQUESTS | 10_000_000 | reject new | request(...) rejects with "pending requests exceeded" |
| MAX_REGISTRY_USER_INDEX | 100_000_000 | warn-only | cluster-wide; eviction would mis-route sendTo |
| MAX_REGISTRY_INDEX_VALUES_PER_KEY | 10_000_000 | warn-only | per-attribute-key bucket in the secondary index |
| MAX_SHARDED_BUS_TOPICS | 10_000_000 | reject new | follow / followBatch reject distinct-new past the cap |
| MAX_SHARDED_BUS_BATCH_CHANNELS_PER_TICK | 1_000_000 | warn-only | per-microtask outbound batch |
| MAX_PUBSUB_RELAY_BATCH_PER_TICK | 1_000_000 | warn-only | per-microtask outbound batch |
| MAX_PRESENCE_WS | 10_000_000 | reject new | per-instance ws joins |
| MAX_PRESENCE_TOPICS | 10_000_000 | reject new | per-instance topic count |
| MAX_CURSOR_WS | 10_000_000 | reject new | per-instance ws cursor activity |
| MAX_CURSOR_TOPICS | 10_000_000 | reject new | per-instance cursor topic count |
| MAX_GROUPS_LOCAL_MEMBERS | 10_000_000 | reject new | treated as "group full" |
| MAX_TASK_HANDLERS | 10_000 | reject new | bootstrap-time register(name, handler) calls |
| MAX_REDIS_DUPLICATES_PER_CLIENT | 1_000 | warn-only | duplicate ioredis connections per client wrapper |
| MAX_AGGREGATOR_REMOTE_INSTANCES | 10_000 | warn-only | sibling instances on the publish-rate aggregator |
| MAX_BREAKER_LISTENERS | 10_000 | reject new | listeners on a single breaker |
Aggregate-memory protection still belongs to the adapter's upgradeAdmission.maxConcurrent (see the adapter's "Layered admission" section); per-instance caps are not the right place to defend against a 10M-connection DoS.
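The two saturation behaviors that appear in the table, reject new and warn-only, differ only in what happens on the first insert past the cap. A minimal sketch with a hypothetical createCappedMap helper (the real caps live inside each module and are not exposed this way):

```javascript
// Hypothetical capped Map wrapper illustrating both saturation modes.
function createCappedMap(cap, mode, warn = console.warn) {
  const map = new Map();
  let warned = false;
  return {
    map,
    // Returns false when the entry was rejected, true when it was stored.
    set(key, value) {
      const isNewKey = !map.has(key);
      if (isNewKey && map.size >= cap) {
        if (mode === 'reject-new') return false; // existing entries stay intact
        if (!warned) {
          warned = true; // warn once, then keep growing
          warn(`cap of ${cap} entries crossed`);
        }
      }
      map.set(key, value);
      return true;
    }
  };
}

const rejecting = createCappedMap(2, 'reject-new');
console.log(rejecting.set('a', 1), rejecting.set('b', 2), rejecting.set('c', 3)); // true true false
console.log(rejecting.set('a', 9)); // true (updating an existing key is always allowed)

const warnings = [];
const warnOnly = createCappedMap(2, 'warn-only', (line) => warnings.push(line));
['a', 'b', 'c', 'd'].forEach((key) => warnOnly.set(key, true));
console.log(warnOnly.map.size, warnings.length); // 4 1
```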
Critical invariants across the extensions are checked at runtime via a two-tier assertion helper that mirrors the adapter's 0.5.0-next.8 shape. Two helpers, assert(cond, category, context) and devAssert(cond, message, context), live in shared/assert.js and fire at ~10 invariant sites today (envelope shape on inbound pubsub frames, registry session-shadow consistency, registry events-channel payload type, secondary-index consistency, lock heartbeat-vs-signal-aborted invariant, etc).
| Mode | assert behavior | devAssert behavior |
|---|---|---|
| Production (NODE_ENV === 'production') | counter++, structured [extensions/assert] log line, does NOT throw - a thrown exception inside a Redis pubsub callback or a publish hot-path microtask could leave a half-applied transaction or a corrupted local index | full no-op |
| Test (process.env.VITEST or NODE_ENV === 'test') | counter++, log, and throws so vitest surfaces the failure as a test error | log only, never throws |
| Development (otherwise) | counter++, log, no throw | log only |
devAssert is for cosmetic / DX hints (schema-mismatch warnings, etc); assert is for invariants whose violation indicates corrupted internal state. The DX framing matches the adapter's: hard assertions never fire on healthy code; if they fire, something is genuinely wrong, and the metric + log surface it without taking the worker down.
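The mode table can be read as a small decision function. The sketch below is an illustration of that table, not the contents of shared/assert.js: createAssertPair is a hypothetical factory, the environment is passed in explicitly instead of reading process.env, and checking production before test is an assumption for the case where both are set.

```javascript
// Resolve the mode per the table. Precedence between production and test is assumed.
function resolveMode(env) {
  if (env.NODE_ENV === 'production') return 'production';
  if (env.VITEST || env.NODE_ENV === 'test') return 'test';
  return 'development';
}

// Hypothetical factory returning both tiers plus the live counter Map.
function createAssertPair(env, log = console.error) {
  const mode = resolveMode(env);
  const counters = new Map();
  function assert(cond, category, context) {
    if (cond) return;
    counters.set(category, (counters.get(category) ?? 0) + 1);
    log('[extensions/assert]', category, context);
    if (mode === 'test') throw new Error(`assertion failed: ${category}`);
  }
  function devAssert(cond, message, context) {
    if (cond || mode === 'production') return; // full no-op in production
    log('[extensions/assert]', message, context);
  }
  return { assert, devAssert, counters };
}

const logged = [];
const prod = createAssertPair({ NODE_ENV: 'production' }, (...args) => logged.push(args));
prod.assert(false, 'pubsub.envelope.shape', { got: 'string' }); // counted and logged, no throw
prod.devAssert(false, 'schema mismatch'); // ignored in production
console.log(prod.counters.get('pubsub.envelope.shape'), logged.length); // 1 1
```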
import { createMetrics, wireAssertionMetrics } from 'svelte-adapter-uws-extensions/prometheus';
const metrics = createMetrics();
wireAssertionMetrics(metrics);
After wiring, every assert violation increments extensions_assertion_violations_total{category}. The label cardinality is bounded by the number of distinct categories declared in the source - not user-input-driven.
If a counter goes non-zero in production: file an issue with the category name, the log entries, and a description of the workload. The category names follow the convention <module>.<invariant> (e.g. registry.session-shadow.consistency, pubsub.envelope.shape).
getAssertionCounters() from shared/assert.js returns the live counter Map. Mirrors the adapter's platform.assertions shape - the Map is the live state, not a snapshot, so consumers holding the reference see updates automatically.
import { getAssertionCounters } from 'svelte-adapter-uws-extensions/assert';
if (getAssertionCounters().size > 0) {
console.warn('extensions: invariant violations detected', getAssertionCounters());
}
Every Redis and Postgres extension accepts an optional breaker option - a shared circuit breaker that tracks backend health across all extensions wired to it. When the breaker trips, each extension degrades differently depending on whether the operation is critical or best-effort:
| Extension | Awaited operations (join, consume, publish) | Fire-and-forget operations |
|---|---|---|
| Pub/sub bus | wrap().publish() queues to local platform only; relay to Redis is skipped silently | Microtask relay flush is skipped entirely |
| Presence | join() / leave() throw CircuitBrokenError | Heartbeat refresh and stale cleanup are skipped |
| Replay buffer | publish() / replay() / seq() throw CircuitBrokenError | - |
| Rate limiting | consume() throws CircuitBrokenError (fail-closed - requests are blocked, not allowed through) | - |
| Broadcast groups | join() / leave() throw CircuitBrokenError | Heartbeat refresh is skipped |
| Cursor | - | Hash writes and cross-instance relay are skipped; local throttle continues |
| LISTEN/NOTIFY | activate() throws; auto-reconnect retries on its own interval | - |
The breaker is a three-state machine: healthy (all requests pass through) -> broken after N consecutive failures (all requests fail fast via CircuitBrokenError) -> probing after a timeout (one request is allowed through to test recovery) -> back to healthy on success. See Circuit breaker for configuration.
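The state machine described above can be sketched in a few lines. This is an illustration only, not the package's implementation: `createSketchBreaker` and the injected `now` clock are names made up here, and the sketch throws a plain `Error` where the real breaker throws `CircuitBrokenError`.

```javascript
// Illustrative three-state breaker (healthy -> broken -> probing -> healthy).
// NOT the package's code; `now` is an injected clock, assumed for testability.
function createSketchBreaker({ failureThreshold = 5, resetTimeout = 30000, now = Date.now } = {}) {
  let state = 'healthy';
  let failures = 0;
  let brokenAt = 0;
  let probeInFlight = false;

  // Lazily move broken -> probing once resetTimeout has elapsed.
  function currentState() {
    if (state === 'broken' && now() - brokenAt >= resetTimeout) {
      state = 'probing';
      probeInFlight = false;
    }
    return state;
  }

  return {
    get state() {
      return currentState();
    },
    guard() {
      const s = currentState();
      if (s === 'healthy') return;
      if (s === 'probing' && !probeInFlight) {
        probeInFlight = true; // exactly one probe request gets through
        return;
      }
      throw new Error('circuit broken'); // fail fast, never touch the backend
    },
    success() {
      state = 'healthy';
      failures = 0;
      probeInFlight = false;
    },
    failure() {
      const s = currentState();
      if (s === 'broken') return; // already tripped; nothing more to record
      failures += 1;
      if (s === 'probing' || failures >= failureThreshold) {
        state = 'broken';
        brokenAt = now();
        probeInFlight = false;
      }
    }
  };
}
```

In this sketch a failed probe sends the breaker straight back to broken and restarts the timeout; whether the real breaker behaves identically in that corner is not confirmed here.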
When Redis pub/sub fails, live streams on other replicas stop receiving updates. Connected clients continue showing stale data with no indication that the stream is degraded. The pub/sub bus surfaces this state directly: when the shared breaker leaves the healthy state, a degraded event fires on the bus's systemChannel (default '__realtime'); when it returns to healthy, a recovered event fires.
// src/lib/server/bus.js
import { createCircuitBreaker } from 'svelte-adapter-uws-extensions/breaker';
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';
export const breaker = createCircuitBreaker({ failureThreshold: 5, resetTimeout: 30000 });
export const bus = createPubSubBus(redis, {
breaker,
// optional handlers for server-side reactions (logging, alerts):
onDegraded: () => console.warn('pubsub bus degraded'),
onRecovered: () => console.info('pubsub bus recovered')
});
// src/hooks.ws.js
import { bus } from '$lib/server/bus';
// bus.hooks.open subscribes every connection to the systemChannel.
// Without this wiring, degraded / recovered events publish into an
// empty subscriber set and clients never hear about the outage.
export const { open } = bus.hooks;
On the client, subscribe to the __realtime topic and show a banner when the degraded event fires. On recovered, dismiss the banner and refetch stale data. The event payload is { at: <epoch ms> } so a client can show "lost connection 12s ago".
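A minimal sketch of the client-side state handling, assuming events arrive as an event name plus the documented `{ at }` payload. `reduceStreamHealth` and `bannerText` are hypothetical helpers, and the subscription call itself is left out because it depends on your client setup.

```javascript
// Hypothetical reducer: `event` is 'degraded' or 'recovered',
// `payload` is the documented { at: <epoch ms> } shape.
function reduceStreamHealth(state, event, payload) {
  if (event === 'degraded') return { degraded: true, since: payload.at };
  if (event === 'recovered') return { degraded: false, since: null };
  return state; // ignore anything else on the system channel
}

// Turn the state into banner text; an empty string means "no banner".
function bannerText(state, now = Date.now()) {
  if (!state.degraded) return '';
  const seconds = Math.max(0, Math.round((now - state.since) / 1000));
  return `Live updates paused: lost connection ${seconds}s ago`;
}
```

Refetching stale data on recovered stays with the caller: trigger it when the reducer moves from degraded to not degraded.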
Both the topic name and the auto-emission are configurable:
| Option | Default | Description |
|---|---|---|
| systemChannel | '__realtime' | Topic used for degraded / recovered events. Set to null or false to disable auto-emission. |
| onDegraded | - | Server-side handler invoked once on the healthy -> non-healthy transition |
| onRecovered | - | Server-side handler invoked once on the non-healthy -> healthy transition |
Auto-emission is local-only - Redis is what's degraded, so the event reaches local clients via the underlying platform without attempting a relay. Each instance reports its own breaker state to its own clients. If you need different semantics (cross-instance forwarding, custom payload, filtering by failure type), use breaker.subscribe(handler) to register your own listener and emit through whichever channel you prefer.
createPublishRateAggregator (svelte-adapter-uws-extensions/redis/publish-rate) gives every instance a cluster-wide view of which topics are hottest across the whole deployment. Each instance broadcasts its own platform.pressure.topPublishers slice on a Redis pub/sub channel; every instance maintains a sliding-window view of all instances' slices and merges them into a cluster-wide top-N. No leader election - each instance is its own aggregator. Storage cost is O(instances * topN) per instance, bounded and small.
// src/lib/server/publish-rate.js
import { redis } from './redis.js';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';
export const aggregator = createPublishRateAggregator(redis, {
publishInterval: 5000,
staleAfter: 12000,
topN: 20
});
// In your open hook (or any startup path with a platform reference):
export async function open(ws, { platform }) {
await aggregator.activate(platform);
}
| Member | Description |
|---|---|
| instanceId | Stable id for this instance, used as the from-tag on outbound slice envelopes. |
| topPublishers | Cluster-wide top publishers, merged from this instance's local slice (read fresh from platform.pressure.topPublishers) and the cached remote slices (stale entries dropped). Sorted descending by messagesPerSec, capped at topN. Each entry is {topic, messagesPerSec, bytesPerSec, contributingInstances}. Pure memory computation. |
| rateOf(topic) | Cluster-wide messagesPerSec for a topic, or 0 if not in the merged top-N. Used by the clusterTopPublisher admission rule. |
| subscribersOf(topic) | Cluster-wide subscriber count for a topic, summed across this instance's live local count (from the optional subjects callback) and cached non-stale remote contributions. Returns 0 when no subjects is wired and no remote instance has reported the topic. The sharded bus's bus.subscribers(topic) delegates here when an aggregator is wired. |
| activate(platform) | Open the subscriber and start the broadcast timer. Idempotent. |
| deactivate() | Stop the timer, drop the subscriber, clear cached slices. |
Channel: {channel} (default: 'uws:pressure:rates')
Payload: {instanceId, ts, slice: [{topic, messagesPerSec, bytesPerSec}, ...], subs?: [{topic, count}, ...]}
Receivers merge into a per-instanceId map keyed on the broadcasting instance; entries older than staleAfter are dropped on the next merge. The subs field is omitted when no subjects callback is configured. Aggregators on either side of a version skew tolerate envelopes with or without subs (forward and backward compatible).
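The merge step can be sketched as a pure function. This is an illustration under assumptions, not the aggregator's code: `mergeSlices` is a made-up name, the cache is modelled as a Map keyed on instanceId, and `contributingInstances` is assumed to be a count.

```javascript
// slicesByInstance: Map<instanceId, { ts, slice: [{ topic, messagesPerSec, bytesPerSec }] }>
// Returns the merged top-N, sorted descending by messagesPerSec.
function mergeSlices(slicesByInstance, { now, staleAfter, topN }) {
  const merged = new Map();
  for (const [instanceId, entry] of slicesByInstance) {
    if (now - entry.ts > staleAfter) {
      slicesByInstance.delete(instanceId); // stale: dropped on this merge
      continue;
    }
    for (const { topic, messagesPerSec, bytesPerSec } of entry.slice) {
      const acc = merged.get(topic) ??
        { topic, messagesPerSec: 0, bytesPerSec: 0, contributingInstances: 0 };
      acc.messagesPerSec += messagesPerSec;
      acc.bytesPerSec += bytesPerSec;
      acc.contributingInstances += 1; // assumed to be a count, not a list
      merged.set(topic, acc);
    }
  }
  return [...merged.values()]
    .sort((a, b) => b.messagesPerSec - a.messagesPerSec)
    .slice(0, topN);
}
```

Because each instance's cache holds at most one slice of at most topN entries per sibling, the function's input stays within the O(instances * topN) bound mentioned earlier.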
| Option | Default | Description |
|---|---|---|
| channel | 'uws:pressure:rates' | Redis channel for slice broadcasts. |
| publishInterval | 5000 | How often this instance broadcasts its slice (ms). |
| staleAfter | 12000 | Drop a remote instance's slice if no fresher one arrives within this window (ms). Should be at least 2 * publishInterval. |
| topN | 20 | Cap on per-instance slice and merged result. Bounds storage cost. Also caps the subs slice (sorted descending by count). |
| subjects | - | Optional () => Array<{topic, count}> contributor for cluster-wide subscriber counts. Called fresh on every broadcast tick. When wired, the envelope grows a subs field and subscribersOf(topic) returns the merged sum. Pair with the sharded bus via subjects: () => bus.localSubjects(platform). |
| breaker | - | Optional circuit breaker for the publish call. |
| metrics | - | Optional Prometheus metrics registry. |
createAdmissionControl({ aggregator, classes: { hot: { clusterTopPublisher: { threshold } } } }) consults aggregator.rateOf(topic) on every shouldAccept call. Memory-only lookup, no Redis traffic on the hot path. See Cluster-aware shedding.
The aggregator's subjects option is the channel for cluster-wide subscriber counts. Wire the sharded bus's localSubjects(platform) helper as the contributor; the bus exposes a matching bus.subscribers(topic) that delegates to aggregator.subscribersOf(topic):
import { createShardedBus } from 'svelte-adapter-uws-extensions/redis/sharded-pubsub';
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';
const bus = createShardedBus(redis);
const aggregator = createPublishRateAggregator(redis, {
subjects: () => bus.localSubjects(platform)
});
// Optionally, also pass subscribersAggregator into the bus so
// bus.subscribers(topic) returns the cluster-wide count rather than
// just the local one:
const busWithCluster = createShardedBus(redis, { subscribersAggregator: aggregator });
await bus.activate(platform);
await aggregator.activate(platform);
const cluster = busWithCluster.subscribers('chat:room-7');
// Local count + sum from non-stale remote instances.
Without an aggregator wired, bus.subscribers(topic) returns the local count only - same number platform.subscribers(topic) reports. Eventually-consistent within publishInterval for the remote contribution; the local read is always live. For exact counts (audit log, billing), track a Redis SET cluster-wide on subscribe / unsubscribe and SCARD it.
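For the exact-count case, a minimal sketch of the SET-based approach. It assumes a client that exposes `sadd` / `srem` / `scard` (ioredis does); the `subs:` key layout, the member format and all three function names are made up for illustration.

```javascript
// Hypothetical key layout: one Redis SET per topic.
const subsKey = (topic) => `subs:${topic}`;

// Members must be unique cluster-wide, so combine instance and connection ids.
const memberOf = (instanceId, connectionId) => `${instanceId}:${connectionId}`;

async function trackSubscribe(redis, topic, instanceId, connectionId) {
  await redis.sadd(subsKey(topic), memberOf(instanceId, connectionId));
}

async function trackUnsubscribe(redis, topic, instanceId, connectionId) {
  await redis.srem(subsKey(topic), memberOf(instanceId, connectionId));
}

// SCARD is O(1) and returns the exact set size.
async function exactSubscribers(redis, topic) {
  return redis.scard(subsKey(topic));
}
```

An instance that crashes never runs its unsubscribe path, so a real deployment would also need some way to expire or sweep members left behind by dead instances; that part is not shown.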
The unsharded createPubSubBus does not track per-topic state (it forwards every topic through a single Redis channel), so it does not expose localSubjects / subscribers. Apps that need cluster-wide subscriber counts on the unsharded bus thread their own per-topic state into the aggregator's subjects callback.
wireClusterPublishRateMetrics(aggregator, metrics, { topN }) registers two gauges that scrape the merged top-N at collect time:
- cluster_topic_publish_rate{topic} - cluster-wide messagesPerSec, summed across instances
- cluster_topic_publish_bytes{topic} - cluster-wide bytesPerSec, summed across instances
Both wirers (per-instance via wirePublishRateMetrics, cluster via wireClusterPublishRateMetrics) can be active simultaneously. The local view shows hot-shard pressure; the cluster view shows global capacity.
| Metric | Description |
|---|---|
| cluster_publish_rate_broadcasts_total | Slice envelopes published by this instance. |
| cluster_publish_rate_received_total | Slice envelopes received from sibling instances. |
| cluster_publish_rate_parse_errors_total | Malformed envelopes dropped on receive. |
| cluster_publish_rate_instance_count | Gauge: sibling instances contributing slices (excluding self) at scrape time. Useful for cluster-size monitoring. |
Prevents thundering herd when a backend goes down. When Redis or Postgres becomes unreachable, every extension that uses the breaker fails fast instead of queueing up timeouts, and fire-and-forget operations (heartbeats, relay flushes, cursor broadcasts) are skipped entirely.
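Three states:
- healthy - all requests pass through.
- broken - entered after failureThreshold consecutive failures; all requests fail fast with CircuitBrokenError.
- probing - entered resetTimeout ms after breaking; one request is allowed through to test recovery, and a success returns the breaker to healthy.
// src/lib/server/breaker.js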
import { createCircuitBreaker } from 'svelte-adapter-uws-extensions/breaker';
export const breaker = createCircuitBreaker({
failureThreshold: 5,
resetTimeout: 30000,
onStateChange: (from, to) => console.log(`circuit: ${from} -> ${to}`)
});
Pass the same breaker to all extensions that share a backend:
import { breaker } from './breaker.js';
export const bus = createPubSubBus(redis, { breaker });
export const presence = createPresence(redis, { breaker, key: 'id' });
export const replay = createReplay(redis, { breaker });
export const limiter = createRateLimit(redis, { points: 10, interval: 1000, breaker });
Failures from any extension contribute to the same breaker. When one trips it, all others fail fast.
| Option | Default | Description |
|---|---|---|
| failureThreshold | 5 | Consecutive failures before breaking |
| resetTimeout | 30000 | Ms before transitioning from broken to probing |
| onStateChange | - | Called on state transitions: (from, to) => void |
| Method / Property | Description |
|---|---|
| breaker.state | 'healthy', 'broken', or 'probing' |
| breaker.isHealthy | true only when state is 'healthy' |
| breaker.failures | Current consecutive failure count |
| breaker.guard() | Throws CircuitBrokenError if the circuit is broken |
| breaker.success() | Record a successful operation |
| breaker.failure() | Record a failed operation |
| breaker.reset() | Force back to healthy |
| breaker.destroy() | Clear internal timers |
Awaited operations (join, consume, publish) call guard() before the Redis/Postgres call, success() after, and failure() in the catch block. When the circuit is broken, guard() throws CircuitBrokenError and the operation never reaches the backend.
Fire-and-forget operations (heartbeat refresh, relay flush, cursor broadcast) check isHealthy and skip entirely when the circuit is not healthy. This prevents piling up commands on a dead connection.
import { CircuitBrokenError } from 'svelte-adapter-uws-extensions/breaker';
try {
await replay.publish(platform, 'chat', 'msg', data);
} catch (err) {
if (err instanceof CircuitBrokenError) {
// Backend is down - degrade gracefully
platform.publish('chat', 'msg', data); // local-only delivery
} else {
throw err; // unrelated failures should not be swallowed
}
}
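If you have backend calls of your own that should share the breaker, the guard / success / failure protocol described above can be wrapped in a small helper. `withBreaker` is a hypothetical name; it relies only on the three documented breaker methods.

```javascript
// Run `operation` under the breaker: fail fast when broken, and feed the
// outcome back so the breaker can track consecutive failures.
async function withBreaker(breaker, operation) {
  breaker.guard(); // throws before the backend is touched if the circuit is broken
  try {
    const result = await operation();
    breaker.success();
    return result;
  } catch (err) {
    breaker.failure();
    throw err; // callers still see the original error
  }
}
```

A call then looks like `await withBreaker(breaker, () => redis.redis.get('some-key'))`, where the key name is only an example. Whether every error should count as a backend failure (for example, validation errors thrown inside `operation`) is a decision this sketch does not make.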
Pressure-aware companion to the circuit breaker. Where the breaker answers "is the backend up?", admission control answers "are we OK to take more work right now?" - using the adapter's platform.pressure signal (memory, publish rate, subscriber ratio) to gate non-critical work before it ever reaches a backend.
Requires svelte-adapter-uws >= 0.5.0-next.1 (the version that ships platform.pressure).
// src/lib/server/admission.js
import { createAdmissionControl } from 'svelte-adapter-uws-extensions/admission';
export const ac = createAdmissionControl({
classes: {
critical: ['MEMORY'], // refuse only on memory pressure
normal: ['MEMORY', 'PUBLISH_RATE'], // refuse on memory or publish rate
background: ['MEMORY', 'PUBLISH_RATE', 'SUBSCRIBERS'] // refuse on any pressure
}
});
// In a server endpoint or RPC handler:
import { ac } from '$lib/server/admission';
export async function POST({ platform, request }) {
if (!ac.shouldAccept('background', platform)) {
return new Response('busy', { status: 503 });
}
// ...proceed with the request...
}
Each class is independently configured. The adapter has already collapsed concurrent signals (memory, publish rate, subscribers) into a single most-urgent reason - this controller just maps the resolved reason to a per-class accept/reject decision.
A class rule is either an array of pressure reasons that should block this class, or a predicate function:
classes: {
// Array form: block when reason is in this list
critical: ['MEMORY'],
// Predicate form: block when the predicate returns truthy
streaming: (snapshot) => snapshot.subscriberRatio > 50
}
Predicates receive the full PressureSnapshot so they can apply custom thresholds (e.g. block above a specific publish-rate that's tighter than the adapter's). Array form is the simple-case shorthand and is what 90% of callers should use.
Valid reason strings: 'NONE', 'PUBLISH_RATE', 'SUBSCRIBERS', 'MEMORY'. Including 'NONE' in a block list means "always block this class," which is occasionally useful for kill-switching a class without removing the wiring.
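The two rule forms can be read as one small evaluation function. This is a sketch of the semantics described above, not the controller's code: `isBlocked` is a made-up name and `snapshot.reason` is an assumed field name for the adapter's resolved reason.

```javascript
const VALID_REASONS = new Set(['NONE', 'PUBLISH_RATE', 'SUBSCRIBERS', 'MEMORY']);

// Returns true when the class should be shed for this pressure snapshot.
// `snapshot.reason` is an ASSUMED field name, not confirmed by the docs.
function isBlocked(rule, snapshot) {
  if (typeof rule === 'function') {
    return Boolean(rule(snapshot)); // predicate form: truthy means block
  }
  if (Array.isArray(rule)) {
    for (const reason of rule) {
      if (!VALID_REASONS.has(reason)) {
        throw new TypeError(`unknown pressure reason: ${reason}`);
      }
    }
    return rule.includes(snapshot.reason); // array form: block on listed reasons
  }
  throw new TypeError('rule must be an array of reasons or a predicate');
}
```

With this reading, `['NONE']` blocks whenever the system is calm, and adding the other reasons to the list makes the kill switch unconditional.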
| Option | Default | Description |
|---|---|---|
| classes | (required) | Map of class name to admission rule. Must define at least one class. |
| metrics | - | Prometheus metrics registry. |
| Method | Description |
|---|---|
| shouldAccept(className, platform) | Returns true to admit, false to shed. Throws on unknown class name (typo defense) or missing platform.pressure. |
shouldAccept reads platform.pressure via a property access - no I/O, safe to call on every request hot path. The reason-precedence math (memory > publish rate > subscribers > none) lives in the adapter; this method only checks the resolved reason against the configured rule.
Admission control and the circuit breaker check independent signals. Use them together:
export async function POST({ platform, request }) {
// Local pressure check first - cheaper, no Redis call.
if (!ac.shouldAccept('normal', platform)) {
return new Response('busy', { status: 503 });
}
// Then attempt the backend call. CircuitBrokenError surfaces if the
// breaker is open.
try {
await replay.publish(platform, 'chat', 'msg', data);
} catch (err) {
if (err instanceof CircuitBrokenError) {
return new Response('backend unavailable', { status: 503 });
}
throw err;
}
}
The two layers complement each other: admission control prevents new work from piling up under server-local pressure; the breaker prevents thundering-herd retries against a dead backend.
Cluster-aware shedding (clusterTopPublisher rule)
platform.pressure.topPublishers is per-instance. In an N-instance cluster, a topic that's hot across every instance simultaneously looks the same locally as one that's only hot here - but it warrants a different response. The clusterTopPublisher rule consults a createPublishRateAggregator (redis/publish-rate) to shed at the cluster layer:
import { createPublishRateAggregator } from 'svelte-adapter-uws-extensions/redis/publish-rate';
import { createAdmissionControl } from 'svelte-adapter-uws-extensions/admission';
const aggregator = createPublishRateAggregator(redis);
await aggregator.activate(platform);
export const ac = createAdmissionControl({
aggregator,
classes: {
nonCritical: { clusterTopPublisher: { threshold: 5000 } }
}
});
// In a request handler that publishes to a hot topic:
if (!ac.shouldAccept('nonCritical', platform, { topic: 'org:42:audit' })) {
return new Response('busy', { status: 503 });
}
The rule's check is a memory-only lookup (aggregator.rateOf(topic) against the merged top-N); no Redis traffic on the hot path. Rejected admissions surface in admission_rejected_total{class, reason="CLUSTER_TOP_PUBLISHER"}. See Cluster publish-rate aggregator for aggregator setup.
The adapter ships a separate admission layer at the WebSocket handshake path - before any TLS / header work - via the upgradeAdmission option on its wsOptions (and on createTestServer for test harnesses). The two layers operate at different points in the connection lifecycle and are configured independently:
| Layer | Where | Sheds | Configured via |
|---|---|---|---|
| Handshake | Adapter, before res.upgrade() | Concurrent in-flight upgrades and per-tick handshake budget | wsOptions.upgradeAdmission = { maxConcurrent, perTickBudget } |
| Message / RPC | Extensions, in your handler | Per-class load-shedding against platform.pressure | createAdmissionControl({ classes }) plus a shouldAccept(...) check |
Wire both for full coverage of "new connections under storm" and "established connections under pressure":
// svelte.config.js (or wherever you configure the adapter)
import adapter from 'svelte-adapter-uws';
export default {
kit: {
adapter: adapter({
websocket: {
upgradeAdmission: { maxConcurrent: 200, perTickBudget: 50 }
}
})
}
};
// In your message / RPC handler
import { ac } from '$lib/server/admission';
export function message(ws, { data, platform }) {
if (!ac.shouldAccept('background', platform)) {
ws.send(JSON.stringify({ error: 'overloaded' }));
return;
}
// ...handle the message...
}
Connections that make it past the handshake are not exempt from message-tier shedding, and message-tier shedding cannot rescue a connection that lost the handshake race - the layers compose without overlap. See the adapter's Layered admission section for the handshake-tier reference.
createFunctionLibrary (svelte-adapter-uws-extensions/redis/functions) is a thin wrapper over Redis 7+ FUNCTION LOAD / FCALL. Versioned, hot-reloadable server-side scripts: ship a new library version and load() swaps it in atomically without an app redeploy.
The library code is plain Lua and must start with #!lua name=<libname> - the wrapper parses the name from the shebang. Inside the library, declare functions via redis.register_function(...). Function names are global on the Redis server (not namespaced by library), which is why call(funcName, ...) keys on function name only.
import { createFunctionLibrary } from 'svelte-adapter-uws-extensions/redis/functions';
const lib = createFunctionLibrary(redis, `#!lua name=ws-presence
redis.register_function('cleanup', function(keys, args)
-- args[1] = now (ms), args[2] = ttl (ms)
local now = tonumber(args[1])
local ttl = tonumber(args[2])
local removed = 0
-- ... iterate hash fields, HDEL stale ...
return removed
end)
`);
await lib.load();
const removed = await lib.call('cleanup', {
keys: ['presence:room1'],
args: [Date.now(), 90000]
});
await lib.delete(); // FUNCTION DELETE
| Option | Default | Description |
|---|---|---|
| metrics | - | Prometheus metrics registry. |
| breaker | - | Circuit breaker instance. |
| Method / Property | Description |
|---|---|
| lib.name | Library name parsed from the shebang |
| lib.load() | FUNCTION LOAD REPLACE. Runs INFO server on first call and throws on Redis < 7. Idempotent. |
| lib.call(funcName, { keys?, args? }) | FCALL - returns the function's return value |
| lib.delete() | FUNCTION DELETE <libname> |
redis.eval is fine for one-off scripts that ship inside the app code. Use createFunctionLibrary when:
- eval becomes a measurable cost.
Requires Redis 7+. There is no built-in fallback to EVALSHA for older servers because that would require maintaining each function in two forms (library function + plain Lua); on Redis 6 just call redis.eval directly with your own SHA caching.
Operations
All clients listen for the sveltekit:shutdown event and disconnect cleanly by default. You can disable this with autoShutdown: false and manage the lifecycle yourself.
// Manual shutdown
await redis.quit();
await pg.end();
presence.destroy();
When two or more tenants share one Redis (or one Postgres) instance, the defaults will collide silently. The failure mode is the worst-case shape: cross-tenant data leakage on subscribe, cross-tenant cache hits on idempotency keys, cross-tenant lock contention on lock keys. Nothing throws; the wrong tenant just sees the wrong data.
The package gives you the knobs to fix this, but the defaults are tuned for the single-tenant 80% case. Multi-tenant deployments must explicitly opt in. Two namespaces matter: keys and channels. They are separate concerns and need separate handling.
Keys (keyPrefix)
Every Redis extension that stores state in keys honors a keyPrefix option that stacks on top of the underlying client's keyPrefix. Setting the client's prefix once isolates all key-based extensions for that client at the same time:
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
const redis = createRedisClient({
url: 'redis://localhost:6379',
keyPrefix: `tenant-${tenantId}:` // every key written by every extension below gets this prefix
});
// All of these now write keys under `tenant-${tenantId}:`...
const lock = createDistributedLock(redis); // keys: tenant-A:lock:...
const sess = createDistributedSession(redis); // keys: tenant-A:sess:...
const idem = createIdempotencyStore(redis); // keys: tenant-A:idem:...
const registry = createConnectionRegistry(redis); // keys: tenant-A:conns:...
Each extension has its own per-extension default sub-prefix (lock:, sess:, idem:, fence:, '' for registry) that stacks after the client's prefix, so two extensions on the same client cannot collide with each other either.
Channels (channel / channelPrefix)
ioredis's keyPrefix does NOT apply to pub/sub commands (PUBLISH, SUBSCRIBE, SSUBSCRIBE). Redis channels are a separate namespace from keys, and ioredis only rewrites the key-shaped commands. Channels are not auto-prefixed. A createPubSubBus(redis) call on a keyPrefix: 'tenant-A:' client still publishes on the literal channel name uws:pubsub -- the same channel every other tenant on the same Redis is subscribed to.
The three extensions that default to a globally-named channel must be overridden explicitly per tenant:
| Extension | Default | Multi-tenant override |
|---|---|---|
| createPubSubBus | channel: 'uws:pubsub' | channel: `tenant-${tenantId}:uws:pubsub` |
| createShardedBus | channelPrefix: 'uws:sharded:' | channelPrefix: `tenant-${tenantId}:uws:sharded:` |
| createPublishRateAggregator | channel: 'uws:pressure:rates' | channel: `tenant-${tenantId}:uws:pressure:rates` |
const bus = createPubSubBus(redis, {
channel: `tenant-${tenantId}:uws:pubsub`
});
const sharded = createShardedBus(redis, {
channelPrefix: `tenant-${tenantId}:uws:sharded:`
});
The createConnectionRegistry extension is the exception in this category: its channels are built via client.key(keyPrefix + '__registry-events') at the application layer, so a registry-level keyPrefix option DOES isolate registry channels too. Pass createConnectionRegistry(redis, { keyPrefix: `tenant-${tenantId}:` }) if you mount it on a shared client without a client-level prefix.
createShardedBus uses one Redis channel per topic (after the channelPrefix). Topic names are user-supplied. Two tenants that both use a topic literally called 'alerts' land on different channels as long as their channelPrefix values differ, because the prefix is part of the channel name. The collision comes back if topic names travel through your client API (e.g. a URL path or a wire field) and a subscriber re-uses one bus across tenants. Keep one bus per tenant, or build the tenant id into the topic name itself if you must share a bus.
The LISTEN/NOTIFY channel name is supplied by the caller (createNotifyBridge(pg, { channel: '...' })). It is NOT auto-prefixed. Multi-tenant deployments must either run one channel per tenant (channel: `tenant-${tenantId}:table_changes`) or have the trigger emit a payload that carries the tenant id and gate the forward path on the subscriber side. The former is cleaner; the latter is necessary only when a single trigger has to fan out to multiple subscribers AND the trigger cannot encode the tenant in the channel name.
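For the payload-gating variant, a minimal sketch of the subscriber-side check. It assumes the trigger emits a JSON payload with a `tenantId` field; both that field name and the `shouldForward` helper are hypothetical.

```javascript
// Decide whether a NOTIFY payload belongs to this tenant.
// Fails closed: anything unparseable or untagged is NOT forwarded.
function shouldForward(rawPayload, tenantId) {
  let parsed;
  try {
    parsed = JSON.parse(rawPayload);
  } catch {
    return false; // malformed payloads never cross the tenant boundary
  }
  return parsed !== null && typeof parsed === 'object' && parsed.tenantId === tenantId;
}
```

Failing closed matters here because the failure mode of a permissive gate is the silent cross-tenant leak this section is about.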
If your Redis is configured with multiple databases (the db option on the connection string), each tenant can use a different db number for full isolation without any per-extension overrides. Costs: limited to 16 databases by default, more complex to monitor, and Redis Cluster does not support multiple databases at all. Per-prefix isolation is the recommended approach for any deployment that might scale past 16 tenants or onto Cluster.
// src/lib/server/redis-per-tenant.js
import { createRedisClient } from 'svelte-adapter-uws-extensions/redis';
import { createPubSubBus } from 'svelte-adapter-uws-extensions/redis/pubsub';
import { createDistributedLock } from 'svelte-adapter-uws-extensions/redis/lock';
import { createIdempotencyStore } from 'svelte-adapter-uws-extensions/redis/idempotency';
import { createConnectionRegistry } from 'svelte-adapter-uws-extensions/redis/registry';
export function tenantStack(tenantId) {
// One isolated namespace per tenant - both keys and channels.
const redis = createRedisClient({
url: process.env.REDIS_URL,
keyPrefix: `tenant-${tenantId}:` // isolates KEYS for every extension below
});
const bus = createPubSubBus(redis, {
channel: `tenant-${tenantId}:uws:pubsub` // isolates the bus CHANNEL (keyPrefix doesn't apply to pubsub)
});
const lock = createDistributedLock(redis); // keys auto-prefixed via client
const idem = createIdempotencyStore(redis); // keys auto-prefixed via client
const registry = createConnectionRegistry(redis); // keys + channels auto-prefixed via client
return { redis, bus, lock, idem, registry };
}
Then route every per-request piece of state through the tenant-specific stack:
// hooks.ws.js
import { tenantStack } from '$lib/server/redis-per-tenant.js';
const stacks = new Map();
function stackFor(tenantId) {
let s = stacks.get(tenantId);
if (!s) { s = tenantStack(tenantId); stacks.set(tenantId, s); }
return s;
}
export async function upgrade({ cookies }) {
const { tenantId, userId } = await getSession(cookies);
if (!tenantId) return false;
// ws.getUserData() gets tenantId so every message handler can find its stack
return { tenantId, userId };
}
export async function message(ws, { data, platform }) {
const { tenantId } = ws.getUserData();
const { bus, lock } = stackFor(tenantId);
// ...use bus / lock / idem from THIS tenant's stack only.
}
A handler that pulls extensions from the wrong tenant's stack -- e.g. by reading tenantId from the wire payload instead of from ws.getUserData() -- defeats the isolation. Treat the tenantId on the socket as the trust anchor; ws.getUserData().tenantId is server-trusted because you put it there in upgrade(). The same trust contract as live.push (Trust model) applies here.
Per-tenant prefixes add a handful of bytes to every key name and every published message. Redis pricing is dominated by op count and memory, not channel-name length, so the overhead is unmeasurable on production workloads. The bigger cost is operational: per-tenant prefixes require operator discipline to apply consistently, and a missed override surfaces only as silent cross-tenant data. The default-permissive shape is a deliberate trade -- single-tenant deployments are the 80% case; multi-tenant deployments are an explicit opt-in.
This repo runs tests in two layers. Both stay green; you can run either independently.
npm test # mock layer (24 files, 861 tests, no services needed)
npm run test:integration # integration layer (real Redis 7 + Postgres 16 in Docker)
Mock layer (test/)
In-memory mocks for Redis and Postgres that mirror the public APIs closely enough to drive the extensions through their happy paths and edge cases. Fast feedback (~15s for the full suite), no Docker required.
The mocks live at testing/mock-redis.js and testing/mock-pg.js and are exported as svelte-adapter-uws-extensions/testing so consumers of this package can use them too. See Testing your own code below.
Integration layer (test/integration/)
Exercises the same modules against real services in docker compose. Picks up cases the mocks can only approximate: Lua atomicity inside Redis EVAL, Postgres LISTEN/NOTIFY cross-connection delivery, real TTL/EXPIRE behaviour, partial-index plans on the job queue.
The compose stack at test/integration/docker-compose.yml binds non-default host ports so it does not clash with a locally running Postgres/Redis: Postgres on 55432, Redis on 56379. test/integration/global-setup.js runs docker compose up -d --wait before the suite, exports INTEGRATION_REDIS_URL / INTEGRATION_POSTGRES_URL for the tests to read, and tears the stack down with docker compose down -v afterwards.
The host ports and compose project name are env-var overridable for running multiple stacks side-by-side on the same machine:
INTEGRATION_REDIS_HOST_PORT=56380 \
INTEGRATION_POSTGRES_HOST_PORT=55433 \
INTEGRATION_COMPOSE_PROJECT=my-slice \
npm run test:integration
Project name auto-derives from the port pair when overridden, so unique ports also mean unique container names.
Drop a *.test.js file under test/integration/redis/ or test/integration/postgres/.
In beforeAll, build a real client:
import { createRedisClient } from '../../../redis/index.js';
// or: import { createPgClient } from '../../../postgres/index.js';
beforeAll(() => {
client = createRedisClient({
url: process.env.INTEGRATION_REDIS_URL,
keyPrefix: 'inttest-yourmodule:', // namespace per test file
autoShutdown: false // tests own the lifecycle
});
});
In beforeEach, wipe state under your prefix (Redis: SCAN MATCH prefix* + UNLINK; Postgres: TRUNCATE your test tables, or use distinct channels for LISTEN/NOTIFY).
In afterAll, await client.quit() / await client.end().
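The Redis wipe in the beforeEach step can be sketched as a SCAN loop. `wipePrefix` is a hypothetical helper, and `raw` is assumed to be an ioredis-compatible connection created WITHOUT keyPrefix: ioredis does not apply keyPrefix to the SCAN pattern or strip it from the returned keys, so passing those keys back through a prefixed client would prefix them twice.

```javascript
// Delete every key under `prefix` using SCAN MATCH + UNLINK.
// `raw` is ASSUMED to be an un-prefixed ioredis-compatible client.
async function wipePrefix(raw, prefix) {
  let cursor = '0';
  let removed = 0;
  do {
    // SCAN returns [nextCursor, keys]; a cursor of '0' ends the iteration.
    const [next, keys] = await raw.scan(cursor, 'MATCH', `${prefix}*`, 'COUNT', 100);
    if (keys.length > 0) {
      removed += await raw.unlink(...keys); // UNLINK frees memory asynchronously
    }
    cursor = next;
  } while (cursor !== '0');
  return removed;
}
```

In a test file this would run as `beforeEach(() => wipePrefix(raw, 'inttest-yourmodule:'))`, with the same prefix the client under test was built with.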
The integration layer is additive: the mock-based test for a module stays in place when you add the integration counterpart. They cover different failure modes.
The svelte-adapter-uws-extensions/testing entry point exports the same in-memory mocks used by the extensions' own test suite. Use them to test your extension-consuming code without running Redis or Postgres:
import { mockRedisClient, mockPlatform, mockWs } from 'svelte-adapter-uws-extensions/testing';
import { createPresence } from 'svelte-adapter-uws-extensions/redis/presence';
import { createRateLimit } from 'svelte-adapter-uws-extensions/redis/ratelimit';
import { describe, it, expect } from 'vitest';
describe('presence', () => {
it('tracks users across topics', async () => {
const client = mockRedisClient();
const platform = mockPlatform();
const presence = createPresence(client, { key: 'id' });
const ws = mockWs({ id: 'user-1', name: 'Alice' });
await presence.join(ws, 'room:lobby', platform);
expect(await presence.count('room:lobby')).toBe(1);
expect(platform.published.some(p => p.event === 'join')).toBe(true);
presence.destroy();
});
});
describe('rate limiting', () => {
it('blocks after exhausting points', async () => {
const client = mockRedisClient();
const limiter = createRateLimit(client, { points: 3, interval: 10000 });
const ws = mockWs({ remoteAddress: '1.2.3.4' });
for (let i = 0; i < 3; i++) {
expect((await limiter.consume(ws)).allowed).toBe(true);
}
expect((await limiter.consume(ws)).allowed).toBe(false);
});
});
| Export | What it mocks | Supports |
|---|---|---|
| mockRedisClient(prefix?) | createRedisClient() | Strings, hashes, sorted sets, pub/sub, pipelines, scan, Lua eval for all extension scripts |
| mockPlatform() | Platform API | publish(), send(), batch(), topic() - records all calls in .published and .sent |
| mockWs(userData?) | uWS WebSocket | subscribe(), unsubscribe(), getUserData(), getBufferedAmount(), close() |
| mockPgClient() | createPgClient() | SQL parsing for replay buffer operations, sequence counters |
The circuit breaker (createCircuitBreaker()) is pure logic with no I/O - use it directly in tests, no mock needed.
svelte-adapter-uws-extensions/testing also re-exports the curated wire-protocol helpers and userData slot constants the adapter exposes from svelte-adapter-uws/testing, so test code asserting on the wire format or per-connection state has one import location alongside the mocks:
import {
// Wire-protocol helpers
esc, completeEnvelope, wrapBatchEnvelope, isValidWireTopic, createScopedTopic,
// Behavior helpers
collapseByCoalesceKey, resolveRequestId, createChaosState,
// userData slot constants
WS_SUBSCRIPTIONS, WS_COALESCED, WS_SESSION_ID, WS_PENDING_REQUESTS,
WS_STATS, WS_PLATFORM, WS_CAPS, WS_REQUEST_ID_KEY,
// Plus the in-memory mocks
mockRedisClient, mockPlatform, mockWs, mockPgClient
} from 'svelte-adapter-uws-extensions/testing';
The re-exported names are the exact same identities as the adapter source (expect(extensionsTesting.wrapBatchEnvelope).toBe(adapterTesting.wrapBatchEnvelope)); a surface-lock test in this package pins the set so a future adapter refactor that drops one fails here. See the adapter's testing entry point docs for the per-helper reference.
createTestServer is intentionally not re-exported - it boots a real uWebSockets.js instance, which is the adapter's responsibility; import it directly from svelte-adapter-uws/testing if you need it.
The adapter's __chaos harness on createTestServer covers the WS-frame outbound path (drop / delay frames going to connected clients). It does not reach traffic on other transports - ioredis, pg, NATS, custom HTTP backends - because each of those goes through its own client, not through the test server's outbound chokepoint. To inject faults at those wires, wrap the transport client in a chaos proxy: createChaosState is re-exported above and composes with any client method via a small Proxy wrapper. See the adapter's Wrap your own transport for cross-wire chaos section for the pattern. The __chaos JSDoc on the adapter's testing.d.ts names the WS-only scope explicitly so tests reaching for cross-wire coverage see the boundary at the type level.
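The general shape of such a wrapper can be sketched with a plain Proxy. This is not the adapter's documented pattern: `withChaos` and `shouldFail` are hypothetical names, and in a real test `shouldFail` could consult whatever state createChaosState returns (its API is not shown here). The sketch assumes the wrapped methods are async.

```javascript
// Wrap a transport client so selected method calls fail on demand.
// `shouldFail(methodName)` is a HYPOTHETICAL fault decision supplied by the test.
function withChaos(client, shouldFail) {
  return new Proxy(client, {
    get(target, prop) {
      const value = target[prop];
      if (typeof value !== 'function') return value; // plain properties pass through
      return (...args) => {
        if (shouldFail(String(prop))) {
          return Promise.reject(new Error(`chaos: injected failure on ${String(prop)}`));
        }
        return value.apply(target, args); // call on the real client, not the proxy
      };
    }
  });
}
```

Pass the wrapped client to the extension under test in place of the real one; combined with a breaker, this lets a test drive the degraded paths without taking a real backend down.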
MIT