Complete and type-safe Server-Sent Events (SSE) implementation for Svelte 5 and SvelteKit, with automatic reconnection, reactive state, multiple topics support, and TypeScript.
- Built on Svelte 5 runes ($state, $effect)
Get SSE working in your SvelteKit project in 3 simple steps:
Create a file at src/routes/api/events/+server.ts:
import type { RequestHandler } from './$types';
// Define your event types
interface SSETopics {
notification: { message: string; timestamp: number };
counter: { count: number };
}
export const GET: RequestHandler = () => {
  const encoder = new TextEncoder();
  // Timer handles live outside start() so cancel() can clear them
  let keepAlive: ReturnType<typeof setInterval>;
  let interval: ReturnType<typeof setInterval>;
  const stream = new ReadableStream({
    start(controller) {
      // Keep-alive comment every 15 seconds
      keepAlive = setInterval(() => {
        controller.enqueue(encoder.encode(': keep-alive\n\n'));
      }, 15000);
      // Send a notification every 3 seconds
      let count = 0;
      interval = setInterval(() => {
        const notification: SSETopics['notification'] = {
          message: `Notification #${count}`,
          timestamp: Date.now()
        };
        const payload = `event: notification\ndata: ${JSON.stringify(notification)}\n\n`;
        controller.enqueue(encoder.encode(payload));
        count++;
      }, 3000);
    },
    // Cleanup on disconnect: a function returned from start() is never called,
    // so the timers are cleared in cancel(), which runs when the client goes away
    cancel() {
      clearInterval(keepAlive);
      clearInterval(interval);
    },
  });
  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
};
Create a file at src/lib/hooks/sse.hook.svelte.ts:
interface SSEOptions<TTopics> {
topics: { [K in keyof TTopics]?: (data: TTopics[K]) => void };
autoConnect?: boolean;
debug?: boolean;
}
export class SSEClient<TTopics extends Record<string, any>> {
status = $state<'idle' | 'connecting' | 'connected' | 'error'>('idle');
error = $state<Error | null>(null);
#eventSource: EventSource | null = null;
#baseURL: string;
#options: SSEOptions<TTopics>;
constructor(baseURL: string, options: SSEOptions<TTopics>) {
this.#baseURL = baseURL;
this.#options = { autoConnect: true, ...options };
if (this.#options.autoConnect) {
$effect(() => {
this.connect();
return () => this.close();
});
}
}
connect = () => {
if (this.#eventSource) return;
this.status = 'connecting';
this.#eventSource = new EventSource(this.#baseURL);
this.#eventSource.onopen = () => {
this.status = 'connected';
this.error = null;
};
this.#eventSource.onerror = () => {
this.status = 'error';
this.error = new Error('Connection lost');
};
// Subscribe to topics
for (const [topic, callback] of Object.entries(this.#options.topics)) {
this.#eventSource.addEventListener(topic, (event: MessageEvent) => {
const data = JSON.parse(event.data);
callback?.(data);
});
}
};
close = () => {
this.#eventSource?.close();
this.#eventSource = null;
this.status = 'idle';
};
}
Use the client in a Svelte component:
<script lang="ts">
import { SSEClient } from '$lib/hooks/sse.hook.svelte';
interface SSETopics {
notification: { message: string; timestamp: number };
}
let notifications = $state<{ message: string; timestamp: number }[]>([]);
const stream = new SSEClient<SSETopics>('/api/events', {
topics: {
notification: (data) => {
notifications.push(data);
notifications = notifications.slice(-5); // Keep last 5
}
},
debug: true
});
</script>
<div>
<h2>Connection: {stream.status}</h2>
{#each notifications as notif}
<div class="notification">
{notif.message} - {new Date(notif.timestamp).toLocaleTimeString()}
</div>
{/each}
</div>
That's it! You now have a working SSE implementation. For production use with advanced features like message replay and dynamic topics, see the full implementation in this repository.
This implementation follows modern web development patterns:
- Svelte 5 runes ($state, $effect, $derived) for automatic UI updates
This implementation goes beyond basic SSE with several production-ready features:
When a client reconnects (due to network issues or manual reconnection), the server automatically replays any missed messages:
// Server automatically handles replay
// Client receives:
// id: 42
// event: message
// data: {"text": "Hello"}
//
// On reconnect with Last-Event-ID: 42
// Client receives messages 43, 44, 45...
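The replay step above comes down to filtering a stored history by the ID the client last saw. The following is a minimal sketch of that idea, not the repository's code: StoredMessage and messagesAfter are hypothetical names, and it assumes IDs are increasing integers sent as strings in the Last-Event-ID header.

```typescript
// Hypothetical shape of a stored message (assumption, not the repository's type).
interface StoredMessage {
  id: number; // global sequence ID
  topic: string;
  data: unknown;
}

/**
 * Returns the messages a reconnecting client has not seen yet.
 * `lastEventID` is the raw Last-Event-ID value, or null on a first connection.
 */
function messagesAfter(
  stored: readonly StoredMessage[],
  lastEventID: string | null
): StoredMessage[] {
  if (lastEventID === null) return []; // first connection: nothing to replay
  const lastSeen = Number.parseInt(lastEventID, 10);
  // An unparsable ID is treated as "seen nothing": replay the whole buffer.
  if (Number.isNaN(lastSeen)) return [...stored];
  return stored.filter((message) => message.id > lastSeen);
}

const storedHistory: StoredMessage[] = [
  { id: 42, topic: "message", data: { text: "Hello" } },
  { id: 43, topic: "message", data: { text: "How are you?" } },
  { id: 44, topic: "message", data: { text: "Still there?" } },
];

const missed = messagesAfter(storedHistory, "42");
console.log(missed.map((message) => message.id)); // [ 43, 44 ]
```

Replaying the whole buffer for an unparsable ID is one possible policy; rejecting the request would be an equally reasonable choice.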
When clients dynamically add new topics to an existing connection, the server detects this and handles it intelligently:
// Initial connection: topics = ["chat"]
// User enables notifications: topics = ["chat", "notifications"]
// Server detects "notifications" is NEW
// → Replays ALL notification history
// → Replays only MISSED chat messages
This prevents data loss when users toggle features on/off.
Add or remove topics without losing your connection state:
// Start with chat only
const stream = new SSEClient<Topics>("/api/events", {
topics: {
chat: (msg) => console.log(msg)
}
});
// Later, add notifications dynamically
stream.updateTopics({
addTopics: ["notifications"],
// Server will replay any missed notifications automatically
});
// Remove a topic
stream.updateTopics({
removeTopics: ["chat"]
});
Track activity on each topic in real-time:
const stream = new SSEClient<Topics>("/api/events", {
topics: {
chat: (msg) => console.log(msg),
notifications: (notif) => console.log(notif)
}
});
// Access reactive counters
console.log(stream.topicCounters.chat); // 42
console.log(stream.topicCounters.notifications); // 7
Perfect for showing unread counts or activity indicators in the UI.
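One way such counters could be maintained is by wrapping each topic handler so that every delivered message increments a count. This is a rough sketch under assumptions: withCounters is a hypothetical helper, and a plain object stands in for the reactive $state the real client would need so the UI updates.

```typescript
type Handler = (data: unknown) => void;

/**
 * Wraps every handler so each delivered message bumps a per-topic counter.
 * A plain object is used here so the sketch runs outside Svelte.
 */
function withCounters(handlers: Record<string, Handler>): {
  handlers: Record<string, Handler>;
  counters: Record<string, number>;
} {
  const counters: Record<string, number> = {};
  const wrapped: Record<string, Handler> = {};
  for (const topic of Object.keys(handlers)) {
    const original = handlers[topic];
    counters[topic] = 0;
    wrapped[topic] = (data) => {
      counters[topic] = (counters[topic] ?? 0) + 1;
      original?.(data);
    };
  }
  return { handlers: wrapped, counters };
}

const received: unknown[] = [];
const { handlers, counters } = withCounters({
  chat: (data) => received.push(data),
  notifications: () => {},
});

handlers["chat"]?.({ text: "hi" });
handlers["chat"]?.({ text: "again" });
handlers["notifications"]?.({ message: "ping" });
console.log(counters); // { chat: 2, notifications: 1 }
```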
Server maintains a ring buffer of recent messages per session:
// Server-side session management
const session = {
id: sessionID,
history: [], // Ring buffer of recent messages
emitter: null, // Current SSE emitter
};
// Automatically persists messages
function emitWithHistory({ event, data }) {
const message = {
id: globalSequenceID++,
topic: event,
data: data,
timestamp: Date.now(),
};
session.history.push(message);
emit({ event, data, id: message.id });
}
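The snippet above appends to the history without a limit, so a bounded buffer needs an eviction step. Here is a minimal sketch of one, assuming a fixed capacity per session; HistoryBuffer and its methods are hypothetical names, not part of this repository.

```typescript
interface HistoryMessage {
  id: number;
  topic: string;
  data: unknown;
  timestamp: number;
}

/** Fixed-capacity history: once full, the oldest message is dropped for each new one. */
class HistoryBuffer {
  private items: HistoryMessage[] = [];
  private readonly capacity: number;

  constructor(capacity: number) {
    if (!Number.isInteger(capacity) || capacity < 1) {
      throw new RangeError("capacity must be a positive integer");
    }
    this.capacity = capacity;
  }

  push(message: HistoryMessage): void {
    this.items.push(message);
    if (this.items.length > this.capacity) this.items.shift(); // evict the oldest
  }

  /** Messages with an ID greater than `lastSeenID`, oldest first. */
  since(lastSeenID: number): HistoryMessage[] {
    return this.items.filter((message) => message.id > lastSeenID);
  }

  get size(): number {
    return this.items.length;
  }
}

const recent = new HistoryBuffer(3);
for (let id = 1; id <= 5; id++) {
  recent.push({ id, topic: "chat", data: { text: `message ${id}` }, timestamp: Date.now() });
}
console.log(recent.size); // 3
console.log(recent.since(3).map((message) => message.id)); // [ 4, 5 ]
```

Array.prototype.shift is linear in the buffer length, which is fine for small histories; a true circular array would avoid the copy for large ones.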
This enables:
sequenceDiagram
participant C as Client
participant S as Server
participant H as History Buffer
Note over C,S: Initial Connection
C->>S: GET /api/events?topics=chat
S->>H: Create session history
S-->>C: 200 OK (SSE stream)
S->>H: Store message #1
S->>C: id: 1, event: chat, data: {...}
S->>H: Store message #2
S->>C: id: 2, event: chat, data: {...}
Note over C,S: Connection Lost
C-xS: Network error
S->>H: Store message #3
S->>H: Store message #4
Note over C,S: Reconnect with Last-Event-ID
C->>S: GET /api/events?topics=chat<br/>Last-Event-ID: 2
S->>H: Query messages since ID 2
H-->>S: Messages 3, 4
S->>C: id: 3, event: chat, data: {...}
S->>C: id: 4, event: chat, data: {...}
Note over C: No messages lost!
Note over C,S: Add New Topic
C->>C: User enables notifications
C->>S: GET /api/events?topics=chat,notifications<br/>lastEventID: 4
S->>S: analyzeTopicSafety()
Note right of S: "notifications" is NEW<br/>"chat" is SAFE
S->>H: Get ALL notifications
S->>H: Get chat messages since 4
S->>C: Full notification history
S->>C: Missed chat messages
| Browser | Support |
|---|---|
| Chrome | ✅ All versions |
| Firefox | ✅ All versions |
| Safari | ✅ All versions |
| Edge | ✅ All versions |
| Opera | ✅ All versions |
| IE 11 | ❌ Not supported |
Note for IE support: Use the event-source-polyfill package:
pnpm add event-source-polyfill
// src/lib/hooks/sse.hook.svelte.ts
import { EventSourcePolyfill } from "event-source-polyfill";
// Replace native EventSource with polyfill
const EventSourceClass = typeof EventSource !== "undefined"
? EventSource
: EventSourcePolyfill;
this.#eventSource = new EventSourceClass(url.toString());
Server-Sent Events (SSE) is a web technology that enables servers to push real-time updates to clients over a persistent HTTP connection. It's part of the HTML5 standard and provides a simple, efficient way to stream data from server to client.
| Feature | SSE | WebSocket | Long Polling |
|---|---|---|---|
| Direction | Server → Client only | Bidirectional | Client → Server (request) |
| Protocol | HTTP | WebSocket (ws://) | HTTP |
| Complexity | Simple | Complex | Very Simple |
| Auto-reconnect | ✅ Built-in | ❌ Manual | ❌ Manual |
| Event Types | ✅ Named events | ❌ Raw messages | ❌ Raw messages |
| Browser Support | ✅ All modern | ✅ All modern | ✅ Universal |
| Firewall/Proxy | ✅ Compatible | ⚠️ May block | ✅ Compatible |
| Overhead | Low | Very Low | High (repeated requests) |
| Best for | Real-time updates | Real-time chat, games | Simple polling |
sequenceDiagram
autonumber
participant C as Client (SSEClient)
participant B as Browser (EventSource API)
participant S as Server (produceSSE)
Note over C,S: Initial Connection
C->>C: new SSEClient() with topics
C->>B: Create EventSource with topics in URL
B->>+S: GET /api/events?topics=chat&topics=notifications
S-->>B: 200 OK (Content-Type: text/event-stream)
Note over B,S: Persistent Connection Established
S->>B: event: chat\ndata: {"text":"Hello"}\n\n
B->>C: Trigger "chat" event listener
C->>C: Update $state reactively
S->>B: : keep-alive\n\n
Note right of S: Every 15s to prevent timeout
rect rgba(100, 200, 100, 0.1)
Note over S: Server emits notification
S->>B: event: notifications\ndata: {"message":"Alert!"}\n\n
B->>C: Trigger "notifications" listener
C->>C: Update $state
end
Note over C,S: Connection Lost (Network/Server Issue)
S-xB: Connection dropped
B->>C: onerror triggered
C->>C: status = "error"
Note right of C: Wait reconnectWait (3s)
C->>B: Reconnect automatically
B->>S: GET /api/events?topics=chat&topics=notifications
S-->>B: Connection restored
Note over B,S: Client receives missed events (if implemented)
Note over C,S: Manual Cleanup
C->>B: close()
B->>S: Close connection
S->>S: cleanup() called
SSE uses a simple text-based protocol. Messages are sent as UTF-8 text with specific field formats:
event: notification
data: {"id": "123", "message": "Hello World"}
: This is a comment (keep-alive)
event: message
data: {"text": "Multi-line messages"}
data: {"can": "span multiple data fields"}
Field types:
- `event:` Event name (e.g., "notification", "message")
- `data:` Event data (usually JSON)
- `id:` Event ID for resumption (used here for message replay; the browser sends it back as Last-Event-ID when it reconnects)
- `retry:` Reconnection time in milliseconds
- `:` Comment (used for keep-alive)
Each message ends with two newlines (\n\n).
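The field rules above can be sketched as a small serializer. This is an illustration only, not the repository's produceSSE: SSEFrame, formatSSE, and formatComment are names made up for this example.

```typescript
interface SSEFrame {
  event?: string;
  data: string;
  id?: string;
  retry?: number;
}

/** Serializes one message; multi-line data becomes one `data:` line per line. */
function formatSSE(frame: SSEFrame): string {
  const lines: string[] = [];
  if (frame.id !== undefined) lines.push(`id: ${frame.id}`);
  if (frame.event !== undefined) lines.push(`event: ${frame.event}`);
  if (frame.retry !== undefined) lines.push(`retry: ${frame.retry}`);
  // A raw line break inside a field would end the field early, so split on all of them.
  for (const line of frame.data.split(/\r\n|\r|\n/)) lines.push(`data: ${line}`);
  return lines.join("\n") + "\n\n"; // blank line terminates the message
}

/** Serializes a comment line, as used for keep-alive pings. */
function formatComment(text: string): string {
  return `: ${text}\n\n`;
}

const wire = formatSSE({
  id: "42",
  event: "notification",
  data: JSON.stringify({ id: "123", message: "Hello World" }),
});
console.log(JSON.stringify(wire));
// "id: 42\nevent: notification\ndata: {\"id\":\"123\",\"message\":\"Hello World\"}\n\n"
```

On the client, EventSource joins consecutive `data:` lines with a newline, so splitting multi-line payloads this way round-trips the original text.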
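Before you begin, make sure you have Node.js (see .nvmrc for the version) and pnpm installed. Then clone the repository and start the development server: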
git clone https://github.com/gustavomorinaga/sveltekit-sse.git
cd sveltekit-sse
pnpm install
pnpm dev
The project will be available at:
pnpm dev
pnpm build
pnpm preview
- `pnpm dev`: starts development server
- `pnpm build`: creates production build
- `pnpm preview`: previews production build
- `pnpm check`: checks TypeScript and Svelte types
- `pnpm check:watch`: checks types in watch mode
- `pnpm format`: formats code with Ultracite
- `pnpm lint`: runs linting with Ultracite
sveltekit-sse/
├─ src/
│  ├─ lib/
│  │  ├─ components/                  # Svelte components
│  │  │  ├─ chat/
│  │  │  │  ├─ chat.component.svelte  # Interactive chat UI
│  │  │  │  └─ index.ts               # Component exports
│  │  │  ├─ notifications/
│  │  │  │  ├─ notifications.component.svelte  # Notifications display
│  │  │  │  └─ index.ts
│  │  │  └─ toolbar/
│  │  │     ├─ toolbar.component.svelte  # Connection status toolbar
│  │  │     └─ index.ts
│  │  ├─ contexts/                    # Svelte 5 contexts
│  │  │  └─ events.context.svelte.ts  # Global SSE state management
│  │  ├─ hooks/                       # Custom Svelte hooks
│  │  │  └─ sse.hook.svelte.ts        # Reactive SSE client class
│  │  ├─ mock/                        # Mock data for demo
│  │  │  ├─ chat.mock.ts              # Story script data
│  │  │  └─ notifications.mock.ts     # Sample notifications
│  │  ├─ server/                      # Server-side utilities
│  │  │  ├─ sse.ts                    # SSE response producer
│  │  │  └─ story-engine.ts           # Chat story state machine
│  │  └─ ts/                          # TypeScript definitions
│  │     ├─ chat.ts                   # Chat message types
│  │     ├─ notification.ts           # Notification types
│  │     ├─ sse-topics.ts             # SSE topics map (type-safe)
│  │     └─ index.ts                  # Type exports
│  ├─ routes/                         # SvelteKit routes
│  │  ├─ +layout.svelte               # Root layout
│  │  ├─ +page.svelte                 # Home page (demo)
│  │  ├─ layout.css                   # Global styles
│  │  └─ api/                         # API endpoints
│  │     ├─ chat/
│  │     │  └─ +server.ts             # HTTP POST for chat actions
│  │     └─ events/
│  │        └─ +server.ts             # SSE endpoint (main)
│  ├─ app.d.ts                        # TypeScript app definitions
│  └─ app.html                        # HTML template
├─ static/                            # Static assets
│  ├─ assets/                         # Images, fonts, etc.
│  └─ robots.txt
├─ .nvmrc                             # Node version specification
├─ biome.jsonc                        # Biome (linter/formatter) config
├─ package.json                       # Dependencies and scripts
├─ pnpm-lock.yaml                     # pnpm lock file
├─ pnpm-workspace.yaml                # pnpm workspace configuration
├─ svelte.config.js                   # SvelteKit configuration
├─ tsconfig.json                      # TypeScript configuration
├─ vite.config.ts                     # Vite configuration
└─ README.md                          # This documentation
| File | Purpose |
|---|---|
| src/lib/hooks/sse.hook.svelte.ts | Core SSE client: Reactive EventSource wrapper with auto-reconnect |
| src/lib/server/sse.ts | Server producer: Creates SSE Response with keep-alive |
| src/lib/contexts/events.context.svelte.ts | Global state: Manages SSE connection and data across components |
| src/lib/ts/sse-topics.ts | Type safety: Defines all SSE topics and their data shapes |
| src/routes/api/events/+server.ts | Main SSE endpoint: Streams multiple topics (chat, notifications) |
| src/routes/api/chat/+server.ts | Chat actions: HTTP POST endpoint for user interactions |
| src/lib/server/story-engine.ts | Story logic: State machine for interactive chat demo |
| src/lib/components/*/ | UI components: Reusable Svelte 5 components with runes |
This implementation uses a topics-based approach where you define a map of event types and their corresponding data shapes:
// Define your topics and their data types
interface SSETopicsMap {
chat: { id: string; text: string; sender: string };
notification: { id: string; message: string; type: "info" | "error" };
progress: { percent: number; taskId: string };
}
import { SSEClient } from "$lib/hooks/sse.hook.svelte";
const client = new SSEClient<SSETopicsMap>("/api/events", {
topics: {
// Subscribe to specific topics with callbacks
chat: (data) => {
console.log("New chat message:", data.text);
},
notification: (data) => {
console.log(`${data.type}: ${data.message}`);
},
// You can subscribe to some topics and ignore others
},
reconnectWait: 3000, // Wait time for reconnection (default: 3000ms)
autoConnect: true, // Connect automatically (default: true)
debug: false, // Enable console logs (default: false)
});
interface SSEOptions<TTopics> {
  // Map of topic names to their callback functions
  topics: {
    [K in keyof TTopics]?: (data: TTopics[K]) => void;
  };
  // Wait time before reconnecting, in milliseconds
  reconnectWait?: number; // default: 3000
  // Automatically connect on instantiation
  autoConnect?: boolean; // default: true
  // Enable debug logging in console
  debug?: boolean; // default: false
}
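To illustrate how a topics map ties each event name to its payload type, here is a minimal sketch of a typed dispatcher. It is an assumption about how such a client could route messages, not the repository's implementation; TopicHandlers and dispatch are hypothetical names.

```typescript
interface TopicsMap {
  chat: { id: string; text: string; sender: string };
  progress: { percent: number; taskId: string };
}

type TopicHandlers<TTopics> = {
  [K in keyof TTopics]?: (data: TTopics[K]) => void;
};

/**
 * Parses a raw SSE `data` string and passes it to the handler registered for `topic`.
 * Returns false when no handler is registered or the payload is not valid JSON.
 */
function dispatch<TTopics, K extends keyof TTopics>(
  handlers: TopicHandlers<TTopics>,
  topic: K,
  rawData: string
): boolean {
  const handler = handlers[topic];
  if (!handler) return false;
  let parsed: unknown;
  try {
    parsed = JSON.parse(rawData);
  } catch {
    return false;
  }
  // JSON.parse cannot verify the shape; the cast trusts the server to honor the map.
  handler(parsed as TTopics[K]);
  return true;
}

const seen: string[] = [];
const handlers: TopicHandlers<TopicsMap> = {
  // `data` is inferred as TopicsMap["chat"], so `sender` and `text` are type-checked
  chat: (data) => seen.push(`${data.sender}: ${data.text}`),
};

console.log(dispatch(handlers, "chat", '{"id":"1","text":"hi","sender":"Ana"}')); // true
console.log(dispatch(handlers, "progress", '{"percent":50,"taskId":"t1"}')); // false (no handler)
console.log(dispatch(handlers, "chat", "not json")); // false (invalid JSON)
```

The type safety is compile-time only. If payloads come from an untrusted source, a runtime validator would be needed in place of the cast.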
The client exposes reactive properties using Svelte 5 runes:
// Connection status (reactive)
client.status // "idle" | "connecting" | "connected" | "error"
// Error object if status is "error" (reactive)
client.error // Error | null
// Last global sequence ID received (reactive)
client.lastEventID // string | null
// Per-topic message counters (reactive)
client.topicCounters // Record<string, number>
// Example: { chat: 42, notifications: 7 }
// Manually connect (if autoConnect: false)
client.connect();
// Disconnect and cleanup
client.close();
// Dynamically update subscribed topics (every field is optional)
client.updateTopics({
  addTopics: ["newTopic1", "newTopic2"],
  removeTopics: ["oldTopic"],
  // OR completely replace:
  // nextTopics: { topic1: callback1, topic2: callback2 }
});
const stream = new SSEClient<TopicsMap>("/api/events", {
topics: {
chat: (msg) => console.log("Chat:", msg)
}
});
// Later, user enables notifications
stream.updateTopics({
addTopics: ["notifications"]
});
// Server will automatically replay missed notifications
// User disables chat
stream.updateTopics({
removeTopics: ["chat"]
});
// Or replace everything at once
stream.updateTopics({
nextTopics: {
notifications: (notif) => console.log("Notification:", notif),
logs: (log) => console.log("Log:", log)
}
});
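An EventSource cannot change its URL once created, and the native API offers no way to set request headers, so a topic change plausibly means opening a new connection whose URL carries both the new topic list and the resume point. The sketch below shows that bookkeeping under those assumptions; applyTopicUpdate and buildEventsURL are hypothetical helpers, and the lastEventID query parameter mirrors the one the server examples in this README read.

```typescript
interface TopicUpdate {
  addTopics?: string[];
  removeTopics?: string[];
}

/** Applies additions and removals to the current topic list, without duplicates. */
function applyTopicUpdate(current: readonly string[], update: TopicUpdate): string[] {
  const removed = new Set(update.removeTopics ?? []);
  const next: string[] = [];
  for (const topic of [...current, ...(update.addTopics ?? [])]) {
    // A topic listed in both addTopics and removeTopics ends up removed.
    if (!removed.has(topic) && !next.includes(topic)) next.push(topic);
  }
  return next;
}

/** Builds the connection URL: one `topics` param per topic, plus the resume point. */
function buildEventsURL(
  path: string,
  topics: readonly string[],
  lastEventID: string | null
): string {
  const params = new URLSearchParams();
  for (const topic of topics) params.append("topics", topic);
  if (lastEventID !== null) params.set("lastEventID", lastEventID);
  const query = params.toString();
  return query ? `${path}?${query}` : path;
}

const topics = applyTopicUpdate(["chat"], { addTopics: ["notifications"] });
console.log(buildEventsURL("/api/events", topics, "4"));
// /api/events?topics=chat&topics=notifications&lastEventID=4
```

Repeating the `topics` parameter matches a server that reads it with url.searchParams.getAll("topics").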
<script lang="ts">
import { SSEClient } from "$lib/hooks/sse.hook.svelte";
interface Message {
id: string;
text: string;
timestamp: number;
}
interface Notification {
id: string;
message: string;
type: "info" | "error";
}
interface TopicsMap {
message: Message;
notification: Notification;
}
let messages = $state<Message[]>([]);
let notifications = $state<Notification[]>([]);
// Subscribe to multiple topics in one connection
const stream = new SSEClient<TopicsMap>("/api/stream", {
topics: {
message: (msg) => {
messages = [...messages, msg];
},
notification: (notif) => {
notifications = [notif, ...notifications].slice(0, 5); // Keep last 5
},
},
debug: true, // See logs in console
});
</script>
<div class="dashboard">
<!-- Connection Status Indicator -->
<div class="status" data-status={stream.status}>
{#if stream.status === "connected"}
🟢 Connected
{:else if stream.status === "connecting"}
🟡 Connecting...
{:else if stream.status === "error"}
🔴 Error: {stream.error?.message}
<button onclick={stream.connect}>Retry</button>
{:else}
⚪ Disconnected
<button onclick={stream.connect}>Connect</button>
{/if}
</div>
<!-- Messages List -->
<section>
<h2>Messages</h2>
{#each messages as message (message.id)}
<div class="message">
{message.text}
<small>{new Date(message.timestamp).toLocaleTimeString()}</small>
</div>
{/each}
</section>
<!-- Notifications List -->
<section>
<h2>Notifications</h2>
{#each notifications as notification (notification.id)}
<div class="notification" data-type={notification.type}>
{notification.message}
</div>
{/each}
</section>
</div>
<style>
.status[data-status="connected"] { color: green; }
.status[data-status="error"] { color: red; }
.notification[data-type="error"] { background: #fee; }
</style>
For global state management across components:
// events.context.svelte.ts
import { getContext, setContext } from "svelte";
import { SSEClient } from "$lib/hooks/sse.hook.svelte";
interface TopicsMap {
message: { text: string };
notification: { message: string };
}
class EventsContext {
messages = $state<string[]>([]);
readonly stream = new SSEClient<TopicsMap>("/api/events", {
topics: {
message: (data) => this.messages.push(data.text),
notification: (data) => alert(data.message),
},
});
}
const KEY = Symbol("events");
export function setEventsContext() {
return setContext(KEY, new EventsContext());
}
export function getEventsContext() {
return getContext<EventsContext>(KEY);
}
<!-- +layout.svelte -->
<script lang="ts">
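import { setEventsContext } from "$lib/contexts/events.context.svelte";
// Svelte 5: layouts receive their content as the `children` snippet (<slot /> is deprecated)
let { children } = $props();
setEventsContext();
</script>
{@render children()}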
<!-- +page.svelte -->
<script lang="ts">
import { getEventsContext } from "$lib/contexts/events.context.svelte";
const { messages, stream } = getEventsContext();
</script>
<div>
Status: {stream.status}
{#each messages as message}
<p>{message}</p>
{/each}
</div>
The produceSSE function creates a Server-Sent Events response stream with automatic keep-alive and proper cleanup.
// src/routes/api/events/+server.ts
import { produceSSE } from "$lib/server/sse";
interface TopicsMap {
message: { id: string; text: string };
notification: { id: string; message: string; type: string };
}
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
// Send events periodically
const interval = setInterval(() => {
emit({
event: "message",
data: {
id: crypto.randomUUID(),
text: "New message!",
},
id: String(Date.now()) // Optional: sequence ID for replay
});
}, 1000);
// Cleanup function (called when connection closes)
return () => {
clearInterval(interval);
console.log("Client disconnected");
};
});
};
interface SSEEmitOptions<TTopics, K extends keyof TTopics> {
/** The event/topic name */
event: K;
/** The event data payload */
data: TTopics[K];
/** Optional unique ID for message recovery on reconnection */
id?: string;
}
type SSEEmitter<TTopics> = <K extends keyof TTopics>(
options: SSEEmitOptions<TTopics, K>
) => void;
type SSEProducer<TTopics> = (
emit: SSEEmitter<TTopics>
) => () => void;
function produceSSE<TTopics>(
producer: SSEProducer<TTopics>
): Response
Parameters:
- producer: Function that receives an emit callback and returns a cleanup function
  - emit({ event, data, id? }): Sends a typed event to the client with optional sequence ID
  - return: Cleanup function executed when the connection is closed
Returns:
- Response: HTTP response with configured SSE stream and headers:
  - Content-Type: text/event-stream
  - Cache-Control: no-cache, no-transform
  - Connection: keep-alive
Automatic Features:
- Periodic keep-alive comments (: keep-alive\n\n)
For production use, implement message replay to handle reconnections:
import { produceSSE } from "$lib/server/sse";
import {
createEmitWithHistory,
replayMissedMessages,
setTopicSubscription
} from "$lib/server/sse-helpers";
export const GET = ({ cookies, request, url }) => {
const requestedTopics = url.searchParams.getAll("topics");
const lastEventID =
request.headers.get("last-event-id") ??
url.searchParams.get("lastEventID");
let sessionID = cookies.get("session_id");
if (!sessionID) {
sessionID = crypto.randomUUID();
cookies.set("session_id", sessionID, { path: "/" });
}
return produceSSE<TopicsMap>((emit) => {
// Wrap emitter to automatically track history
const emitWithHistory = createEmitWithHistory({ sessionID, emit });
// Replay missed messages on reconnection
if (lastEventID) {
replayMissedMessages({
sessionID,
lastEventID,
requestedTopics,
emit
});
}
// Remember current topics for next reconnection
setTopicSubscription(sessionID, requestedTopics);
// Now emit using the history-aware wrapper
const interval = setInterval(() => {
emitWithHistory({
event: "message",
data: { text: "Hello" }
// ID is auto-assigned by createEmitWithHistory
});
}, 1000);
return () => clearInterval(interval);
});
};
createEmitWithHistory
Wraps your emitter to automatically persist messages to a session history buffer with global sequence IDs:
const emitWithHistory = createEmitWithHistory({ sessionID, emit });
// Every call is automatically stored with an ID
emitWithHistory({ event: "chat", data: { text: "Hello" } });
// → Stored as { id: "123", topic: "chat", data: {...}, timestamp: ... }
// → Sent to client with id: 123
replayMissedMessages
Intelligently replays messages based on topic safety analysis:
replayMissedMessages({
sessionID,
lastEventID: "42",
requestedTopics: ["chat", "notifications"], // notifications is NEW
emit
});
// Behavior:
// - "chat" was already subscribed → replay only messages > 42
// - "notifications" is new → replay ALL notification history
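The behavior described above can be expressed as a single filter over the stored history. The sketch below is an assumption about the selection logic, not the repository's replayMissedMessages: selectReplay is a hypothetical function that takes the previous topic list as an explicit argument instead of looking it up by session.

```typescript
interface StoredMessage {
  id: number;
  topic: string;
  data: unknown;
}

/**
 * Picks the messages to replay: everything for newly added topics,
 * only IDs above `lastSeenID` for topics the client was already subscribed to.
 */
function selectReplay(
  stored: readonly StoredMessage[],
  lastSeenID: number,
  requestedTopics: readonly string[],
  previousTopics: readonly string[]
): StoredMessage[] {
  const requested = new Set(requestedTopics);
  const previous = new Set(previousTopics);
  return stored.filter((message) => {
    if (!requested.has(message.topic)) return false; // not subscribed at all
    if (!previous.has(message.topic)) return true; // new topic: full history
    return message.id > lastSeenID; // known topic: only what was missed
  });
}

const stored: StoredMessage[] = [
  { id: 40, topic: "chat", data: {} },
  { id: 41, topic: "notifications", data: {} },
  { id: 42, topic: "chat", data: {} },
  { id: 43, topic: "chat", data: {} },
  { id: 44, topic: "notifications", data: {} },
  { id: 45, topic: "logs", data: {} },
];

const replay = selectReplay(stored, 42, ["chat", "notifications"], ["chat"]);
console.log(replay.map((message) => message.id)); // [ 41, 43, 44 ]
```

Message 41 is replayed even though its ID is below 42, because the client was not subscribed to notifications when it was sent.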
analyzeTopicSafety
Determines which topics are new vs. previously subscribed:
const { safeTopics, newTopics } = analyzeTopicSafety(
sessionID,
["chat", "notifications"]
);
// safeTopics: ["chat"] - was in previous connection
// newTopics: ["notifications"] - newly added
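A classification like this only needs the topic list from the session's previous connection. The following sketch assumes an in-memory Map keyed by session ID; rememberTopics and classifyTopics are hypothetical stand-ins written for this example, not the repository's setTopicSubscription and analyzeTopicSafety.

```typescript
// Hypothetical in-memory store: session ID -> topics of the previous connection.
const subscriptions = new Map<string, string[]>();

/** Records the topics of the current connection for the next reconnect. */
function rememberTopics(sessionID: string, topics: readonly string[]): void {
  subscriptions.set(sessionID, [...topics]);
}

/** Splits requested topics into previously subscribed ("safe") and newly added ones. */
function classifyTopics(
  sessionID: string,
  requestedTopics: readonly string[]
): { safeTopics: string[]; newTopics: string[] } {
  const previous = new Set(subscriptions.get(sessionID) ?? []);
  const safeTopics: string[] = [];
  const newTopics: string[] = [];
  for (const topic of requestedTopics) {
    (previous.has(topic) ? safeTopics : newTopics).push(topic);
  }
  return { safeTopics, newTopics };
}

rememberTopics("session-1", ["chat"]);
const result = classifyTopics("session-1", ["chat", "notifications"]);
console.log(result); // { safeTopics: [ 'chat' ], newTopics: [ 'notifications' ] }
```

A session with no recorded subscription treats every requested topic as new, which errs on the side of replaying too much rather than too little.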
Use SvelteKit's RequestEvent parameter to access cookies, headers, URL params, etc.:
import type { RequestEvent } from "@sveltejs/kit";
export const GET = ({ cookies, url, request, locals }: RequestEvent) => {
// Get requested topics from URL query params
const topics = url.searchParams.getAll("topics");
// Access authentication
const userId = locals.user?.id;
// Get session
const sessionId = cookies.get("session_id");
return produceSSE<TopicsMap>((emit) => {
// Only send events for requested topics
if (topics.includes("notifications")) {
const interval = setInterval(() => {
emit({
event: "notifications",
data: {
userId,
message: "New notification",
}
});
}, 5000);
return () => clearInterval(interval);
}
return () => {}; // Empty cleanup if no topics
});
};
interface TopicsMap {
chat: { id: string; text: string; sender: string };
notifications: { id: string; message: string };
metrics: { cpu: number; memory: number };
}
export const GET = ({ url }) => {
const requestedTopics = url.searchParams.getAll("topics");
return produceSSE<TopicsMap>((emit) => {
const cleanupFunctions: Array<() => void> = [];
// Chat messages (if requested)
if (requestedTopics.includes("chat")) {
const unsubscribeChat = subscribeToChat((message) => {
emit({ event: "chat", data: message });
});
cleanupFunctions.push(unsubscribeChat);
}
// Notifications (if requested)
if (requestedTopics.includes("notifications")) {
const unsubscribeNotif = subscribeToNotifications((notif) => {
emit({ event: "notifications", data: notif });
});
cleanupFunctions.push(unsubscribeNotif);
}
// System metrics (if requested)
if (requestedTopics.includes("metrics")) {
const metricsInterval = setInterval(async () => {
const metrics = await getSystemMetrics();
emit({ event: "metrics", data: metrics });
}, 5000);
cleanupFunctions.push(() => clearInterval(metricsInterval));
}
// Cleanup all subscriptions
return () => {
cleanupFunctions.forEach(fn => fn());
};
});
};
import { db } from "$lib/server/db";
interface TopicsMap {
order: {
id: string;
status: "pending" | "completed";
total: number;
};
}
export const GET = ({ locals }) => {
if (!locals.user) {
return new Response("Unauthorized", { status: 401 });
}
return produceSSE<TopicsMap>((emit) => {
// Subscribe to database changes
const unsubscribe = db.orders
.where("userId", "==", locals.user.id)
.onSnapshot((snapshot) => {
snapshot.docChanges().forEach((change) => {
if (change.type === "modified" || change.type === "added") {
const order = change.doc.data();
emit({
event: "order",
data: {
id: order.id,
status: order.status,
total: order.total,
}
});
}
});
});
return () => {
unsubscribe();
console.log(`User ${locals.user.id} disconnected`);
};
});
};
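import { EventEmitter } from "node:events";
import { json } from "@sveltejs/kit";
import { produceSSE } from "$lib/server/sse";
interface TopicsMap {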
payment: {
orderId: string;
status: "success" | "failed";
amount: number;
};
}
// Global event emitter for webhooks
const paymentEmitter = new EventEmitter();
// Webhook endpoint
export const POST = async ({ request }) => {
const webhook = await request.json();
// Emit to all connected SSE clients
paymentEmitter.emit("payment", {
orderId: webhook.orderId,
status: webhook.status,
amount: webhook.amount,
});
return json({ received: true });
};
// SSE endpoint
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
const handler = (data: TopicsMap["payment"]) => {
emit({ event: "payment", data });
};
paymentEmitter.on("payment", handler);
return () => {
paymentEmitter.off("payment", handler);
};
});
};
This project includes a complete demo showcasing SSE capabilities with three real-time features running simultaneously:
An interactive text adventure that demonstrates:
// src/routes/api/events/+server.ts
export const GET = ({ cookies, request, url }) => {
const requestedTopics = url.searchParams.getAll("topics");
const lastEventID =
request.headers.get("last-event-id") ?? url.searchParams.get("lastEventID");
let sessionID = cookies.get("story_session");
if (!sessionID) {
sessionID = crypto.randomUUID();
cookies.set("story_session", sessionID, { path: "/", httpOnly: true });
}
const connectionType = lastEventID ? "RECONNECT" : "NEW";
console.log(`[SSE] ${connectionType} connection for session ${sessionID}`);
return produceSSE<SSETopicsMap>((emit) => {
const session = getSession(sessionID);
const emitWithHistory = createEmitWithHistory({ sessionID, emit });
session.emitter = emitWithHistory;
// Replay missed messages if reconnecting (topic-safety-aware)
if (lastEventID) {
replayMissedMessages({ sessionID, lastEventID, requestedTopics, emit });
}
// Persist current topic subscription for next reconnect
setTopicSubscription(sessionID, requestedTopics);
// Handle chat topics if requested
const chatTopics = ["message", "prompt", "end", "history"];
const isChatTopicRequested = requestedTopics.some((topic) =>
chatTopics.includes(topic)
);
if (isChatTopicRequested) {
handleChatTopics({ sessionID, lastEventID, emitWithHistory, request });
}
// Setup polling for notifications and logs
const notificationsInterval = setupNotificationsPolling({
requestedTopics,
emitWithHistory,
});
const logsInterval = setupLogsPolling({ requestedTopics, emitWithHistory });
// Cleanup on disconnect
return () => {
if (session.timeoutID) clearTimeout(session.timeoutID);
clearInterval(notificationsInterval);
clearInterval(logsInterval);
};
});
};
// src/lib/server/story-engine.ts
interface PlayerSession {
step: number;
timeoutID?: ReturnType<typeof setTimeout>;
emitter: SSEEmitter<SSETopicsMap> | null;
history: ChatMessage[];
}
export function playStory(sessionId: string) {
const session = getSession(sessionId);
if (!session?.emitter) return;
const node = SCRIPT[session.step];
if (node.type === "npc") {
// NPC message - send and auto-advance after delay
const message: ChatMessage = {
id: crypto.randomUUID(),
sender: node.name,
text: node.text,
};
session.history.push(message);
session.emitter({ event: "message", data: message });
session.step++;
session.timeoutID = setTimeout(() => playStory(sessionId), node.delay);
} else if (node.type === "prompt") {
// User input required - send prompt and wait
session.emitter({
event: "prompt",
data: {
id: crypto.randomUUID(),
sender: "System",
text: node.text,
}
});
}
}
// src/lib/contexts/events.context.svelte.ts
class EventsContext {
readonly stream = new SSEClient<SSETopicsMap>(resolve("/api/events"), {
debug: dev,
topics: this.#buildTopicHandlers(["notifications", "logs"]),
});
activeTopics = new SvelteSet<StreamTopic>(["notifications", "logs"]);
chat = $state<ChatMessage[]>([]);
notifications = $state<Notification[]>([]);
logs = $state<LogEntry[]>([]);
toggleTopic(topic: StreamTopic) {
if (this.activeTopics.has(topic)) {
this.activeTopics.delete(topic);
this.stream.updateTopics({ removeTopics: [topic] });
} else {
this.activeTopics.add(topic);
this.stream.updateTopics({
addTopics: [topic],
nextTopics: this.#buildTopicHandlers([...this.activeTopics])
});
}
}
#buildTopicHandlers(streamTopics: StreamTopic[]) {
const handlers = {
// Chat handlers (always active)
message: (data) => this.chat.unshift(data),
prompt: (data) => (this.expectedPrompt = data.text),
end: (data) => {
this.chat.unshift(data);
this.ended = true;
},
history: (data) => (this.chat = [...data].reverse()),
};
// Optional stream handlers
if (streamTopics.includes("notifications")) {
handlers.notifications = (data) => {
this.notifications.unshift(data);
if (this.notifications.length > 5) this.notifications.pop();
};
}
if (streamTopics.includes("logs")) {
handlers.logs = (data) => {
this.logs.unshift(data);
if (this.logs.length > 30) this.logs.pop();
};
}
return handlers;
}
}
<!-- src/lib/components/chat/chat.component.svelte -->
<script lang="ts">
import { getEventsContext } from "$lib/contexts/events.context.svelte";
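const events = getEventsContext();
// Read state through the context object: destructuring `chat` or `expectedPrompt`
// would copy their current values and lose reactivity when they are reassigned
const chat = $derived(events.chat);
const expectedPrompt = $derived(events.expectedPrompt);
// Bind the methods so `this` still points at the context when used as handlers
const sendPrompt = events.sendPrompt.bind(events);
const resetChat = events.resetChat.bind(events);
const isWaitingForUser = $derived(expectedPrompt !== null);
</script>
<section class="chat">
<header>
<h3>💬 Chat</h3>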
<button onclick={resetChat}>Reset</button>
</header>
<!-- Messages displayed in reverse chronological order -->
<ul class="messages">
{#each chat as message (message.id)}
<li class:mine={message.isMe}>
<strong>{message.sender}</strong>
<p>{message.text}</p>
</li>
{/each}
</ul>
<!-- User input -->
<footer>
<form onsubmit={sendPrompt}>
<input
value={expectedPrompt || "Waiting for story..."}
readonly
disabled={!isWaitingForUser}
/>
<button type="submit" disabled={!isWaitingForUser}>
Send
</button>
</form>
</footer>
</section>
Demonstrates periodic server-sent notifications with topic toggling:
// Part of the same /api/events endpoint
export function setupNotificationsPolling({ requestedTopics, emitWithHistory }) {
return setInterval(() => {
if (requestedTopics.includes("notifications")) {
const randomNotification = getRandomNotification();
emitWithHistory({
event: "notifications",
data: {
id: crypto.randomUUID(),
...randomNotification,
timestamp: new Date().toLocaleTimeString(),
}
});
}
}, 3000); // Every 3 seconds
}
<!-- src/lib/components/notifications/notifications.component.svelte -->
<script lang="ts">
import { getEventsContext } from "$lib/contexts/events.context.svelte";
const events = getEventsContext();
const { notifications, stream, activeTopics } = events;
const isActive = $derived(activeTopics.has("notifications"));
const count = $derived(stream.topicCounters.notifications ?? 0);
</script>
<section class="notifications">
<header>
<h3>🔔 Notifications</h3>
<!-- Call toggleTopic on the context object so `this` stays bound -->
<button onclick={() => events.toggleTopic("notifications")}>
{isActive ? "Disable" : "Enable"}
</button>
<span class="count">{count} total</span>
</header>
<ul>
{#each notifications as notif (notif.id)}
<li class:error={notif.type === "error"}>
<strong>{notif.timestamp}</strong>
{notif.message}
</li>
{/each}
</ul>
</section>
Demonstrates a third topic that can be toggled independently:
// Server
export function setupLogsPolling({ requestedTopics, emitWithHistory }) {
return setInterval(() => {
if (requestedTopics.includes("logs")) {
const randomLog = getRandomLog();
emitWithHistory({
event: "logs",
data: {
id: crypto.randomUUID(),
...randomLog,
timestamp: Date.now(),
}
});
}
}, 2000); // Every 2 seconds
}
Multiple Topics in One Connection
Message Replay on Reconnection
Topic Safety Analysis
Session Persistence
Hybrid Communication
Proper Resource Management
Dynamic UI Updates
Scenario: Monitor system health with live CPU, memory, and network metrics.
// Server: src/routes/api/dashboard/+server.ts
interface DashboardTopics {
metrics: {
cpu: number;
memory: number;
network: { in: number; out: number };
};
alerts: {
level: "info" | "warning" | "critical";
message: string;
};
}
export const GET = () => {
return produceSSE<DashboardTopics>((emit) => {
// Send metrics every 2 seconds
const metricsInterval = setInterval(async () => {
const metrics = await getSystemMetrics();
emit({ event: "metrics", data: metrics });
// Send alert if CPU is high
if (metrics.cpu > 80) {
emit({
event: "alerts",
data: {
level: "warning",
message: `High CPU usage: ${metrics.cpu}%`,
}
});
}
}, 2000);
return () => clearInterval(metricsInterval);
});
};
<!-- Client -->
<script lang="ts">
let cpuHistory = $state<number[]>([]);
const stream = new SSEClient<DashboardTopics>("/api/dashboard", {
topics: {
metrics: (data) => {
cpuHistory = [...cpuHistory, data.cpu].slice(-20); // Keep last 20
},
alerts: (data) => {
toast.show(data.message, data.level);
},
},
});
</script>
<Dashboard {cpuHistory} status={stream.status} />
Scenario: Social feed with likes, comments, and follows in real-time.
// Server
interface FeedTopics {
like: { postId: string; userId: string; userName: string };
comment: { postId: string; text: string; userId: string };
follow: { followerId: string; followingId: string };
}
export const GET = ({ locals }) => {
if (!locals.user) {
return new Response("Unauthorized", { status: 401 });
}
return produceSSE<FeedTopics>((emit) => {
const userId = locals.user.id;
// Subscribe to relevant activities
const unsubscribeLikes = subscribeToLikes(userId, (like) => {
emit({ event: "like", data: like });
});
const unsubscribeComments = subscribeToComments(userId, (comment) => {
emit({ event: "comment", data: comment });
});
const unsubscribeFollows = subscribeToFollows(userId, (follow) => {
emit({ event: "follow", data: follow });
});
return () => {
unsubscribeLikes();
unsubscribeComments();
unsubscribeFollows();
};
});
};
<!-- Client -->
<script lang="ts">
let activities = $state<Activity[]>([]);
const feed = new SSEClient<FeedTopics>("/api/feed", {
topics: {
like: (data) => {
activities.unshift({
type: "like",
text: `${data.userName} liked your post`,
timestamp: Date.now(),
});
},
comment: (data) => {
activities.unshift({
type: "comment",
text: `New comment: "${data.text}"`,
timestamp: Date.now(),
});
// Update post comments count
updatePostComments(data.postId);
},
follow: (data) => {
activities.unshift({
type: "follow",
text: "Someone started following you",
timestamp: Date.now(),
});
},
},
});
</script>
Scenario: Track progress of file uploads, data processing, or batch operations.
// Server
interface TaskTopics {
progress: { taskId: string; percent: number; step: string };
complete: { taskId: string; result: unknown };
error: { taskId: string; error: string };
}
export const GET = ({ url, locals }) => {
const taskId = url.searchParams.get("taskId");
if (!taskId) {
return new Response("Missing taskId", { status: 400 });
}
return produceSSE<TaskTopics>((emit) => {
const task = getTask(taskId);
// Task progress updates
task.on("progress", (data) => {
emit({
event: "progress",
data: {
taskId,
percent: data.percent,
step: data.step,
}
});
});
// Task completion
task.on("complete", (result) => {
emit({ event: "complete", data: { taskId, result } });
});
// Task errors
task.on("error", (error) => {
emit({ event: "error", data: { taskId, error: error.message } });
});
return () => task.cleanup();
});
};
<!-- Client -->
<script lang="ts">
let progress = $state(0);
let currentStep = $state("Initializing...");
let result = $state<unknown>(null);
const taskStream = new SSEClient<TaskTopics>(
`/api/task?taskId=${taskId}`,
{
topics: {
progress: (data) => {
progress = data.percent;
currentStep = data.step;
},
complete: (data) => {
result = data.result;
taskStream.close();
},
error: (data) => {
alert(`Error: ${data.error}`);
taskStream.close();
},
},
}
);
</script>
<div>
<progress value={progress} max="100"></progress>
<p>{currentStep} - {progress}%</p>
{#if result}
<div>✅ Task completed! Result: {JSON.stringify(result)}</div>
{/if}
</div>
Scenario: Show who's online and where they're editing in real-time.
// Server
interface CollabTopics {
presence: { userId: string; userName: string; status: "online" | "offline" };
cursor: { userId: string; position: { line: number; column: number } };
edit: { userId: string; changes: TextChange[] };
}
export const GET = ({ url, locals }) => {
const documentId = url.searchParams.get("docId");
return produceSSE<CollabTopics>((emit) => {
const userId = locals.user.id;
// Broadcast user presence
broadcastPresence(documentId, {
userId,
userName: locals.user.name,
status: "online",
});
// Subscribe to other users' activities
const unsubPresence = subscribeToPresence(documentId, (data) => {
if (data.userId !== userId) emit({ event: "presence", data });
});
const unsubCursors = subscribeToCursors(documentId, (data) => {
if (data.userId !== userId) emit({ event: "cursor", data });
});
const unsubEdits = subscribeToEdits(documentId, (data) => {
if (data.userId !== userId) emit({ event: "edit", data });
});
return () => {
// Broadcast offline status
broadcastPresence(documentId, {
userId,
userName: locals.user.name,
status: "offline",
});
unsubPresence();
unsubCursors();
unsubEdits();
};
});
};
Scenario: Stream AI-generated text token by token.
// Server
interface AITopics {
token: { text: string; index: number };
complete: { fullText: string; tokensUsed: number };
}
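export const GET = ({ url }) => {
// EventSource can only issue GET requests, so the prompt arrives as a query parameter
const prompt = url.searchParams.get("prompt");
if (!prompt) {
return new Response("Missing prompt", { status: 400 });
}
return produceSSE<AITopics>((emit) => {
let index = 0;
let fullText = "";
const abortController = new AbortController();
(async () => {
// With stream: true, create() resolves to an async iterable of chunks
const stream = await openai.chat.completions.create(
{
model: "gpt-4",
messages: [{ role: "user", content: prompt }],
stream: true,
},
{ signal: abortController.signal }
);
for await (const chunk of stream) {
const token = chunk.choices[0]?.delta?.content || "";
if (token) {
fullText += token;
emit({ event: "token", data: { text: token, index: index++ } });
}
}
emit({
event: "complete",
data: {
fullText,
tokensUsed: index, // Number of streamed chunks, an approximation of tokens
}
});
})().catch((error) => {
// An abort caused by a client disconnect also lands here
console.error("[SSE] AI stream failed:", error);
});
return () => {
// Cancel stream if client disconnects
abortController.abort();
};
});
};
<!-- Client -->
<script lang="ts">
let promptValue = $state("");
let response = $state("");
let isComplete = $state(false);
function askAI(prompt: string) {
response = "";
isComplete = false;
const stream = new SSEClient<AITopics>(
`/api/ai?prompt=${encodeURIComponent(prompt)}`,
{
autoConnect: false, // Created outside component init, so connect manually
topics: {
token: (data) => {
response += data.text;
},
complete: (data) => {
isComplete = true;
console.log(`Used ${data.tokensUsed} tokens`);
stream.close();
},
},
}
);
stream.connect();
}
</script>
<div>
<form onsubmit={(e) => { e.preventDefault(); askAI(promptValue); }}>
<input bind:value={promptValue} />
</form>
<div class="response">
{response}
{#if !isComplete}<span class="cursor">▋</span>{/if}
</div>
</div>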
Always verify user identity before sending sensitive data:
import type { RequestEvent } from "@sveltejs/kit";
export const GET = async ({ locals, cookies }: RequestEvent) => {
// Check if user is authenticated
if (!locals.user) {
return new Response("Unauthorized", { status: 401 });
}
return produceSSE<TopicsMap>((emit) => {
const userId = locals.user.id;
// Only send events relevant to this user
const unsubscribe = subscribeToUserEvents(userId, (event) => {
emit({ event: "notification", data: event });
});
return unsubscribe;
});
};
Prevent abuse by limiting connections per IP or user:
// src/lib/server/rate-limit.ts
const connections = new Map<string, number>();
export function checkRateLimit(identifier: string, max = 5): boolean {
const count = connections.get(identifier) || 0;
if (count >= max) {
return false; // Exceeded limit
}
connections.set(identifier, count + 1);
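// Release the slot after 1 minute; delete empty entries so the Map does not grow forever
setTimeout(() => {
const remaining = (connections.get(identifier) || 1) - 1;
if (remaining <= 0) connections.delete(identifier);
else connections.set(identifier, remaining);
}, 60_000);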
return true;
}
// src/routes/api/events/+server.ts
export const GET = ({ getClientAddress, locals }) => {
const identifier = locals.user?.id || getClientAddress();
if (!checkRateLimit(identifier)) {
return new Response("Too Many Requests", {
status: 429,
headers: { "Retry-After": "60" },
});
}
return produceSSE<TopicsMap>((emit) => {
// ...
});
};
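The limiter above counts connection attempts per minute. As a complementary sketch, assuming you would rather cap the number of streams that are open at the same time, the helper below hands out a release function that can be called from the SSE cleanup callback. The names `createConnectionLimiter`, `acquire`, and `size` are hypothetical and not part of this library.

```typescript
// Hypothetical helper: caps concurrently open connections per identifier.
function createConnectionLimiter(max = 5) {
  const open = new Map<string, number>();

  return {
    // Returns a release function, or null when the identifier is at its limit.
    acquire(identifier: string): (() => void) | null {
      const count = open.get(identifier) ?? 0;
      if (count >= max) return null;
      open.set(identifier, count + 1);

      let released = false;
      return () => {
        if (released) return; // Guard against releasing the same slot twice
        released = true;
        const next = (open.get(identifier) ?? 1) - 1;
        if (next <= 0) open.delete(identifier); // Do not keep empty entries
        else open.set(identifier, next);
      };
    },
    // Number of identifiers that currently hold at least one slot.
    size: () => open.size,
  };
}
```

In an endpoint, a `null` result from `acquire` would map to the 429 response shown above, and the returned function would be called inside the cleanup callback.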
Prevent infinite connections that consume server resources:
const MAX_CONNECTION_TIME = 15 * 60 * 1000; // 15 minutes
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
// Auto-close after timeout
const timeout = setTimeout(() => {
emit({
event: "timeout",
data: {
message: "Connection expired. Please reconnect.",
}
});
// Connection will be closed automatically
}, MAX_CONNECTION_TIME);
return () => {
clearTimeout(timeout);
};
});
};
Validate topic subscriptions to prevent abuse:
const ALLOWED_TOPICS = ["chat", "notifications", "metrics"] as const;
export const GET = ({ url }) => {
const requestedTopics = url.searchParams.getAll("topics");
// Validate topics
const invalidTopics = requestedTopics.filter(
(topic) => !ALLOWED_TOPICS.includes(topic as any)
);
if (invalidTopics.length > 0) {
return new Response(`Invalid topics: ${invalidTopics.join(", ")}`, {
status: 400,
});
}
// Limit number of topics
if (requestedTopics.length > 10) {
return new Response("Too many topics requested", { status: 400 });
}
return produceSSE<TopicsMap>((emit) => {
// ...
});
};
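The checks above can also be collected into one pure function that is easy to unit test. This is a minimal sketch: `parseTopics` and `isTopic` are hypothetical names, and accepting both repeated (`?topics=a&topics=b`) and comma-separated (`?topics=a,b`) parameters is an assumption about how clients build the URL.

```typescript
const ALLOWED_TOPICS = ["chat", "notifications", "metrics"] as const;
type Topic = (typeof ALLOWED_TOPICS)[number];

type TopicsResult =
  | { ok: true; topics: Topic[] }
  | { ok: false; error: string };

// Type guard: narrows an arbitrary string to a known topic without `as any`.
function isTopic(value: string): value is Topic {
  return (ALLOWED_TOPICS as readonly string[]).includes(value);
}

// Hypothetical helper: normalizes, de-duplicates, and validates requested topics.
function parseTopics(params: URLSearchParams, maxTopics = 10): TopicsResult {
  const requested = Array.from(
    new Set(
      params
        .getAll("topics")
        .join(",") // Treat repeated and comma-separated forms the same way
        .split(",")
        .map((value) => value.trim())
        .filter((value) => value.length > 0)
    )
  );

  if (requested.length > maxTopics) {
    return { ok: false, error: "Too many topics requested" };
  }
  const invalid = requested.filter((topic) => !isTopic(topic));
  if (invalid.length > 0) {
    return { ok: false, error: `Invalid topics: ${invalid.join(", ")}` };
  }
  return { ok: true, topics: requested.filter(isTopic) };
}
```

An endpoint could call `parseTopics(url.searchParams)` and return a 400 response with `error` whenever `ok` is false.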
Never send sensitive data without filtering:
export const GET = ({ locals }) => {
return produceSSE<TopicsMap>((emit) => {
const userId = locals.user.id;
const unsubscribe = db.users.onChange((user) => {
// ❌ BAD: Sending everything
// emit({ event: "user", data: user });
// ✅ GOOD: Only send what's needed
emit({
event: "user",
data: {
id: user.id,
name: user.name,
avatar: user.avatar,
// Don't send: password, email, tokens, etc.
}
});
});
return unsubscribe;
});
};
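One way to make the filtering hard to bypass is to give the streamed payload its own type and build it field by field. The sketch below assumes a `UserRecord` shape that is purely illustrative; `toPublicUser` is a hypothetical helper name.

```typescript
// Hypothetical shape of a full database record.
interface UserRecord {
  id: string;
  name: string;
  avatar: string;
  email: string;
  passwordHash: string;
}

// Only the fields that are safe to stream to clients.
type PublicUser = Pick<UserRecord, "id" | "name" | "avatar">;

// Allow-list: fields are copied explicitly, so a column added to
// UserRecord later is never streamed by accident.
function toPublicUser(user: UserRecord): PublicUser {
  return { id: user.id, name: user.name, avatar: user.avatar };
}
```

Spreading the record and deleting sensitive keys would work as a deny-list instead, but it fails open when a new sensitive field is added.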
Don't expose internal errors to clients:
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
try {
const interval = setInterval(async () => {
try {
const data = await fetchExternalAPI();
emit({ event: "data", data });
} catch (error) {
// Log internally
console.error("[SSE] Error fetching data:", error);
// Send generic error to client
emit({
event: "error",
data: {
message: "Failed to fetch data. Retrying...",
// Don't send: error.stack, internal details
}
});
}
}, 5000);
return () => clearInterval(interval);
} catch (error) {
console.error("[SSE] Fatal error:", error);
throw error; // Will send 500 to client
}
});
};
Configure CORS properly for cross-origin requests:
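// src/hooks.server.ts
// SvelteKit has no CORS option in svelte.config.js; set the headers in a handle hook instead
import type { Handle } from "@sveltejs/kit";
const ALLOWED_ORIGIN = "https://your-domain.com";
export const handle: Handle = async ({ event, resolve }) => {
const response = await resolve(event);
if (event.url.pathname.startsWith("/api/")) {
// A wildcard origin is not allowed together with credentials, so name the origin explicitly
response.headers.set("Access-Control-Allow-Origin", ALLOWED_ORIGIN);
response.headers.set("Access-Control-Allow-Credentials", "true");
}
return response;
};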
// For specific endpoints
export const GET = () => {
const response = produceSSE<TopicsMap>((emit) => {
// ...
});
// Add CORS headers if needed
response.headers.set("Access-Control-Allow-Origin", "https://your-domain.com");
response.headers.set("Access-Control-Allow-Credentials", "true");
return response;
};
Always cleanup resources to prevent memory leaks:
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
// ✅ Track all resources
const intervals: ReturnType<typeof setInterval>[] = [];
const timeouts: ReturnType<typeof setTimeout>[] = [];
const subscriptions: Array<() => void> = [];
// Create interval
const interval = setInterval(() => {
emit("ping", { timestamp: Date.now() });
}, 5000);
intervals.push(interval);
// Subscribe to events
const unsubscribe = eventEmitter.on("data", (data) => {
emit("data", data);
});
subscriptions.push(unsubscribe);
// ✅ Cleanup ALL resources
return () => {
intervals.forEach((id) => clearInterval(id));
timeouts.forEach((id) => clearTimeout(id));
subscriptions.forEach((unsub) => unsub());
console.log("[SSE] All resources cleaned up");
};
});
};
Optimize for scalability:
// ❌ BAD: Sending too much data too frequently
const interval = setInterval(() => {
emit("data", hugeObject); // Large payload
}, 100); // Every 100ms
// ✅ GOOD: Throttle and compress
const interval = setInterval(() => {
const summary = compressData(data); // Send only what's needed
emit("data", summary);
}, 5000); // Reasonable interval
Batch multiple updates:
// ❌ BAD: Emit on every change
db.collection.onChange((doc) => {
emit("update", doc); // Can fire 100s of times per second
});
// ✅ GOOD: Batch updates
let batch: Doc[] = [];
let batchTimeout: ReturnType<typeof setTimeout> | undefined;
db.collection.onChange((doc) => {
batch.push(doc);
if (batchTimeout !== undefined) return; // A flush is already scheduled
batchTimeout = setTimeout(() => {
emit("updates", batch); // Send batch
batch = [];
batchTimeout = undefined;
}, 1000); // Flush at most once per second, even under a constant stream of changes
});
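The batching idea above can be packaged as a small reusable helper. This is a sketch under assumptions: `createBatcher`, `maxSize`, and `maxWaitMs` are hypothetical names, and flushing early when the batch is full is an extra design choice beyond what the snippet above does.

```typescript
// Hypothetical helper: collects items and hands them to `send` in batches.
function createBatcher<T>(
  send: (items: T[]) => void,
  maxSize = 50,
  maxWaitMs = 1000
) {
  let batch: T[] = [];
  let timer: ReturnType<typeof setTimeout> | undefined;

  // Sends whatever is pending and cancels the scheduled flush, if any.
  function flush(): void {
    if (timer !== undefined) {
      clearTimeout(timer);
      timer = undefined;
    }
    if (batch.length === 0) return;
    const items = batch;
    batch = [];
    send(items);
  }

  function push(item: T): void {
    batch.push(item);
    if (batch.length >= maxSize) {
      flush(); // Size limit reached: send right away
    } else if (timer === undefined) {
      timer = setTimeout(flush, maxWaitMs); // First item starts the clock
    }
  }

  return { push, flush };
}
```

Calling `flush` from the SSE cleanup callback sends any pending items and ensures no timer outlives the connection.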
Track SSE connections for debugging and analytics:
// src/lib/server/sse-monitor.ts
export const activeConnections = new Map<string, {
userId: string;
connectedAt: Date;
topics: string[];
}>();
export function trackConnection(id: string, metadata: any) {
activeConnections.set(id, {
...metadata,
connectedAt: new Date(),
});
console.log(`[SSE] Active connections: ${activeConnections.size}`);
}
export function untrackConnection(id: string) {
activeConnections.delete(id);
console.log(`[SSE] Active connections: ${activeConnections.size}`);
}
// Use in endpoint
export const GET = ({ locals, url }) => {
const requestedTopics = url.searchParams.getAll("topics");
const connectionId = crypto.randomUUID();
trackConnection(connectionId, {
userId: locals.user?.id,
topics: requestedTopics,
});
return produceSSE<TopicsMap>((emit) => {
// ...
return () => {
untrackConnection(connectionId);
};
});
};
// Client-side debugging
const client = new SSEClient<TopicsMap>("/api/events", {
topics: { /* ... */ },
debug: true, // Enable console logs
});
// Now you'll see:
// [SSE] SSEClient initialized { baseURL: "/api/events", options: {...} }
// [SSE] Connecting to SSE { url: "...", topics: ["chat", "notifications"] }
// [SSE] SSE connection opened
// [SSE] Event received on topic "chat" { id: "...", text: "..." }
1. Network Tab
Content-Type: text/event-stream
2. Console
debug: true, see all client-side logs
export const GET = ({ locals }) => {
const userId = locals.user?.id || "anonymous";
console.log(`[SSE] New connection from user: ${userId}`);
return produceSSE<TopicsMap>((emit) => {
console.log(`[SSE] Starting event stream for ${userId}`);
const interval = setInterval(() => {
console.log(`[SSE] Emitting event to ${userId}`);
emit("data", { timestamp: Date.now() });
}, 5000);
return () => {
console.log(`[SSE] Connection closed for ${userId}`);
clearInterval(interval);
};
});
};
Symptoms:
status transitions from "connecting" to "error" or "idle"
Possible causes:
Server error before streaming starts
// ❌ BAD: Error before produceSSE
export const GET = () => {
throw new Error("Oops"); // Kills connection
return produceSSE((emit) => { /* ... */ });
};
// ✅ GOOD: Error handling
export const GET = () => {
try {
validateRequest();
return produceSSE((emit) => { /* ... */ });
} catch (error) {
console.error(error);
return new Response("Internal Error", { status: 500 });
}
};
Missing return statement
// ❌ BAD: No Response returned
export const GET = () => {
produceSSE((emit) => { /* ... */ }); // Missing return!
};
// ✅ GOOD
export const GET = () => {
return produceSSE((emit) => { /* ... */ });
};
Symptoms:
status === "connected"
Possible causes:
Topic mismatch
// Client subscribes to "message"
const client = new SSEClient<TopicsMap>("/api/events", {
topics: {
message: (data) => console.log(data),
},
});
// Server sends "msg" (different name!)
emit("msg", { text: "Hello" }); // ❌ Won't be received
// Fix: Match topic names
emit("message", { text: "Hello" }); // ✅
Client not subscribed to topic
// Client only subscribes to "chat"
const client = new SSEClient<TopicsMap>("/api/events", {
topics: {
chat: (data) => console.log(data),
// notifications: ... (not subscribed)
},
});
// Server sends "notifications" - client ignores it
emit("notifications", { message: "Hello" }); // Client won't receive
Server not checking requested topics
// ❌ BAD: Sending all topics regardless of request
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
// Always sends chat, even if client doesn't want it
setInterval(() => emit("chat", { /* ... */ }), 1000);
});
};
// ✅ GOOD: Check requested topics
export const GET = ({ url }) => {
const topics = url.searchParams.getAll("topics");
return produceSSE<TopicsMap>((emit) => {
if (topics.includes("chat")) {
setInterval(() => emit("chat", { /* ... */ }), 1000);
}
});
};
Symptoms:
Possible causes:
Server immediately closing connection
// ❌ BAD: Returns empty cleanup (connection closes immediately)
return produceSSE((emit) => {
emit("message", { text: "Hello" });
return () => {}; // Connection closed after emit!
});
// ✅ GOOD: Keep connection alive
return produceSSE((emit) => {
const interval = setInterval(() => {
emit("message", { text: "Hello" });
}, 1000);
return () => clearInterval(interval);
});
Server error in emit callback
return produceSSE((emit) => {
const interval = setInterval(() => {
// ❌ This throws and kills connection
const data = null;
emit("message", { text: data.text });
}, 1000);
return () => clearInterval(interval);
});
// ✅ GOOD: Error handling
return produceSSE((emit) => {
const interval = setInterval(() => {
try {
const data = getData();
emit("message", { text: data.text });
} catch (error) {
console.error("[SSE] Error:", error);
// Connection stays open
}
}, 1000);
return () => clearInterval(interval);
});
Symptoms:
Solution: Always cleanup resources
// ✅ Comprehensive cleanup
export const GET = () => {
return produceSSE<TopicsMap>((emit) => {
const intervals: ReturnType<typeof setInterval>[] = [];
const subscriptions: Array<() => void> = [];
// Track all resources
const i1 = setInterval(() => emit("data1", {}), 1000);
const i2 = setInterval(() => emit("data2", {}), 2000);
intervals.push(i1, i2);
const unsub = eventEmitter.on("event", (data) => emit("event", data));
subscriptions.push(unsub);
// Clean up EVERYTHING
return () => {
intervals.forEach(clearInterval);
subscriptions.forEach((fn) => fn());
console.log("[SSE] Cleanup completed");
};
});
};
Symptoms:
emit()
data parameter has wrong type
Solution: Define proper TopicsMap
// ❌ BAD: No type safety
const client = new SSEClient<any>("/api/events", {
topics: {
message: (data) => console.log(data.text), // No autocomplete
},
});
// ✅ GOOD: Proper types
interface TopicsMap {
message: { id: string; text: string };
notification: { id: string; message: string };
}
const client = new SSEClient<TopicsMap>("/api/events", {
topics: {
message: (data) => console.log(data.text), // ✅ Autocomplete works!
},
});
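The same topics map can also constrain what the server is allowed to emit. The sketch below is not this library's `emit`; `createFormatter` is a hypothetical helper that serializes one event into the SSE wire format while tying each payload type to its topic name at compile time.

```typescript
interface TopicsMap {
  message: { id: string; text: string };
  notification: { id: string; message: string };
}

// Hypothetical helper: returns a formatter bound to one topics map.
function createFormatter<T>() {
  // K links the event name to the payload type declared for it in T.
  return <K extends keyof T & string>(event: K, data: T[K], id?: string): string => {
    const idLine = id === undefined ? "" : `id: ${id}\n`;
    // A blank line terminates the frame.
    return `${idLine}event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
  };
}

const format = createFormatter<TopicsMap>();
const frame = format("message", { id: "1", text: "Hello" }, "42");

// The next line would not compile: `message` requires a `text` field.
// format("message", { id: "1", message: "Hello" });
```

Because `JSON.stringify` never emits raw newlines, the payload always fits on a single `data:` line.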
Using curl:
# Test SSE endpoint
curl -N -H "Accept: text/event-stream" "http://localhost:5173/api/events?topics=chat"
# You should see:
# : keep-alive
#
# event: chat
# data: {"id":"123","text":"Hello"}
#
Using JavaScript:
// Quick test in browser console
const es = new EventSource("/api/events?topics=chat");
es.addEventListener("chat", (e) => console.log(JSON.parse(e.data)));
es.onerror = (e) => console.error("Error:", e);
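When reading raw curl output it helps to know how each frame breaks down into fields. The following is a simplified sketch of that parsing, assuming one frame per call (the text between two blank lines); `parseSSEFrame` is a hypothetical helper, and the browser's `EventSource` already does this for you.

```typescript
interface ParsedFrame {
  id?: string;
  event: string;
  data: string;
}

// Parses a single frame. Returns null for frames without data,
// such as the ": keep-alive" comment.
function parseSSEFrame(frame: string): ParsedFrame | null {
  let id: string | undefined;
  let event = "message"; // Default event name when no "event:" field is sent
  const dataLines: string[] = [];

  for (const line of frame.split(/\r?\n/)) {
    if (line === "" || line.startsWith(":")) continue; // Blank line or comment
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // One optional leading space
    if (field === "event") event = value;
    else if (field === "data") dataLines.push(value);
    else if (field === "id") id = value;
  }

  if (dataLines.length === 0) return null;
  return { id, event, data: dataLines.join("\n") };
}
```

Applied to the curl output above, the keep-alive frame yields `null` and the chat frame yields an object whose `data` can be passed to `JSON.parse`.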
Monitor connection health:
<script lang="ts">
let eventCount = $state(0);
let lastEventTime = $state<Date | null>(null);
let avgLatency = $state(0);
const client = new SSEClient<TopicsMap>("/api/events", {
topics: {
message: (data) => {
eventCount++;
lastEventTime = new Date();
// Calculate latency if timestamp is included
if ("timestamp" in data) {
const latency = Date.now() - (data as any).timestamp;
avgLatency = (avgLatency * (eventCount - 1) + latency) / eventCount;
}
},
},
});
</script>
<div class="metrics">
<div>Status: {client.status}</div>
<div>Events received: {eventCount}</div>
<div>Last event: {lastEventTime?.toLocaleTimeString() || "N/A"}</div>
<div>Avg latency: {avgLatency.toFixed(0)}ms</div>
</div>
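The average in the component above is an incremental mean: each new sample is folded into the previous average without storing the full history. A small worked example, with `updateAverage` as a hypothetical helper name:

```typescript
// Incremental mean: combines the average of the first n - 1 samples
// with the n-th sample.
function updateAverage(previousAvg: number, sample: number, n: number): number {
  return (previousAvg * (n - 1) + sample) / n;
}

// Latencies of 100, 200, and 300 ms arrive one at a time.
let avg = 0;
[100, 200, 300].forEach((latency, i) => {
  avg = updateAverage(avg, latency, i + 1);
});
// Steps: 100, then (100 * 1 + 200) / 2 = 150, then (150 * 2 + 300) / 3 = 200
```

Two caveats apply to the component: `n` should count only events that carry a timestamp, and the measured latency includes any clock difference between server and client.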
A: No, SSE is unidirectional (server → client only). For client → server communication:
Example:
// Receive via SSE
const stream = new SSEClient<TopicsMap>("/api/events", { /* ... */ });
// Send via HTTP POST
async function sendMessage(text: string) {
await fetch("/api/messages", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ text }),
});
}
A:
Best practice: Use one connection with multiple topics instead of multiple connections.
// ❌ BAD: Multiple connections
const chatStream = new SSEClient<{chat: Message}>("/api/chat", ...);
const notifStream = new SSEClient<{notif: Notif}>("/api/notifications", ...);
// ✅ GOOD: One connection, multiple topics
const stream = new SSEClient<{chat: Message, notif: Notif}>("/api/events", {
topics: {
chat: (msg) => handleChat(msg),
notif: (n) => handleNotif(n),
},
});
A: The client will automatically:
Detect the disconnection (onerror triggered)
Wait reconnectWait milliseconds (default: 3000ms)
Note: Any server-side state (like session data) will be lost unless persisted to a database.
A: It depends:
For long-lived connections, prefer traditional servers or platforms with SSE support.
A: Use cookies or query parameters (cookies are better for security):
// Client: cookies are sent automatically
const stream = new SSEClient<TopicsMap>("/api/events", { /* ... */ });
// Server: access via locals or cookies
export const GET = ({ locals, cookies }) => {
const user = locals.user; // From session middleware
if (!user) {
return new Response("Unauthorized", { status: 401 });
}
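return produceSSE<TopicsMap>((emit) => {
// Send user-specific events; the returned unsubscribe function doubles as cleanup
return subscribeToUserEvents(user.id, (event) => {
emit({ event: "notification", data: event });
});
});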
};
Avoid tokens in URL: They can leak in logs and browser history.
A: The system uses global sequence IDs to track and replay missed messages:
lastEventID state
// Server automatically assigns IDs
const emitWithHistory = createEmitWithHistory({ sessionID, emit });
emitWithHistory({ event: "chat", data: { text: "Hello" } });
// → Sent with id: 42
// Client receives and tracks
// lastEventID = "42"
// On network drop + reconnect:
// Client: GET /api/events?lastEventID=42
// Server: Replays messages 43, 44, 45...
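The replay step boils down to filtering the history buffer by sequence ID. Here is a minimal sketch, assuming numeric IDs stored in ascending order; `StoredMessage` and `getMissedMessages` are hypothetical names, and replaying nothing on a first connection is a design choice rather than documented behavior.

```typescript
interface StoredMessage {
  id: number; // Global sequence ID
  topic: string;
  data: unknown;
}

// Hypothetical helper: returns the messages a reconnecting client missed.
function getMissedMessages(
  history: StoredMessage[],
  lastEventID: string | null,
  topics: string[]
): StoredMessage[] {
  // An absent, empty, or malformed ID is treated as "nothing to replay".
  const lastSeen = lastEventID ? Number(lastEventID) : NaN;
  if (!Number.isFinite(lastSeen)) return [];

  return history.filter(
    (msg) => msg.id > lastSeen && topics.includes(msg.topic)
  );
}
```

With a history holding IDs 42 to 45 and `lastEventID` set to `"42"`, only the messages after 42 on the subscribed topics are returned, in their original order.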
Benefits:
A: Topic safety prevents data inconsistency when dynamically changing subscriptions.
The Problem:
// Connection 1: subscribed to ["chat"]
// Last-Event-ID: 100
// User enables notifications
// Connection 2: subscribed to ["chat", "notifications"]
// If we just replay from ID 100...
// ❌ We'd miss all notifications sent before ID 100!
The Solution: Server analyzes which topics are NEW vs. SAFE:
const { safeTopics, newTopics } = analyzeTopicSafety(
sessionID,
["chat", "notifications"] // Current request
);
// safeTopics: ["chat"] → delta replay
// newTopics: ["notifications"] → full replay
This ensures you get all notification history even if they were sent while you had them disabled.
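To make the distinction concrete, here is a sketch of how the split and the resulting replay selection could work. It is not the library's `analyzeTopicSafety`: the helpers `splitTopics` and `selectReplay` are hypothetical and take the previous connection's topics directly instead of looking them up by session.

```typescript
interface HistoryEntry {
  id: number;
  topic: string;
}

// Topics the previous connection already had are "safe"; the rest are "new".
function splitTopics(previousTopics: string[], requestedTopics: string[]) {
  const previous = new Set(previousTopics);
  return {
    safeTopics: requestedTopics.filter((topic) => previous.has(topic)),
    newTopics: requestedTopics.filter((topic) => !previous.has(topic)),
  };
}

// Delta replay for safe topics, full replay for new topics.
function selectReplay(
  history: HistoryEntry[],
  lastEventID: number,
  previousTopics: string[],
  requestedTopics: string[]
): HistoryEntry[] {
  const { safeTopics, newTopics } = splitTopics(previousTopics, requestedTopics);
  return history.filter(
    (msg) =>
      newTopics.includes(msg.topic) ||
      (safeTopics.includes(msg.topic) && msg.id > lastEventID)
  );
}
```

With the scenario above (last ID 100, `chat` already subscribed, `notifications` newly enabled), a notification with ID 98 is replayed while a chat message with ID 99 is skipped.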
A: Use the updateTopics method:
const stream = new SSEClient<Topics>("/api/events", {
topics: {
chat: (msg) => console.log(msg)
}
});
// Add notifications later
stream.updateTopics({
addTopics: ["notifications"]
});
// → Closes current connection
// → Opens new with topics=chat,notifications
// → Server replays missed notifications
// Remove chat
stream.updateTopics({
removeTopics: ["chat"]
});
// Replace all topics
stream.updateTopics({
nextTopics: {
logs: (log) => console.log(log)
}
});
The lastEventID is automatically preserved across topic changes for intelligent replay.
A: The topicCounters property tracks how many messages were received per topic:
const stream = new SSEClient<Topics>("/api/events", {
topics: {
chat: (msg) => handleChat(msg),
notifications: (n) => handleNotification(n)
}
});
// Access counters reactively
console.log(stream.topicCounters.chat); // 42
console.log(stream.topicCounters.notifications); // 7
Use cases:
Example UI:
<button>
Notifications
{#if stream.topicCounters.notifications > 0}
<badge>{stream.topicCounters.notifications}</badge>
{/if}
</button>
A: It depends on your use case:
Small buffer (50-100 messages):
Large buffer (1000+ messages):
Best practice: Use a ring buffer with TTL:
const MAX_HISTORY_SIZE = 100;
const MAX_HISTORY_AGE = 5 * 60 * 1000; // 5 minutes
function pushMessage({ sessionID, topic, data }) {
const session = getSession(sessionID);
// Add message
session.history.push({
id: String(globalSequenceID++),
topic,
data,
timestamp: Date.now(),
});
// Remove old messages (by size)
if (session.history.length > MAX_HISTORY_SIZE) {
session.history.shift();
}
// Remove old messages (by age)
const now = Date.now();
session.history = session.history.filter(
(msg) => now - msg.timestamp < MAX_HISTORY_AGE
);
}
For production: Consider persisting to Redis or a database for longer retention.
A: Yes, but you need centralized state management:
Problem: Each server instance has its own memory
Solution 1: Sticky sessions (load balancer)
# Nginx
upstream backend {
ip_hash; # Routes same IP to same server
server backend1:3000;
server backend2:3000;
}
Solution 2: Shared state (Redis)
import { Redis } from "ioredis";
const redis = new Redis();
// Store history in Redis
function pushMessage({ sessionID, topic, data }) {
const message = {
id: String(Date.now()),
topic,
data,
};
redis.lpush(`history:${sessionID}`, JSON.stringify(message));
redis.ltrim(`history:${sessionID}`, 0, 99); // Keep last 100
redis.expire(`history:${sessionID}`, 300); // 5 min TTL
}
// Retrieve on reconnect
async function getHistory(sessionID: string) {
const items = await redis.lrange(`history:${sessionID}`, 0, -1);
// LPUSH stores the newest message first; reverse so replay runs oldest to newest
return items.map((item) => JSON.parse(item)).reverse();
}
Solution 3: Pub/Sub (for live messages)
// Server 1 emits event
redis.publish("events:chat", JSON.stringify(message));
// All servers listen (a connection in subscriber mode cannot publish, so use a second one)
const subscriber = redis.duplicate();
subscriber.subscribe("events:chat");
subscriber.on("message", (channel, message) => {
// Forward to connected clients on this instance
broadcastToLocalClients(JSON.parse(message));
});
A: SSEClient is a wrapper around the browser's native EventSource API with additional features:
EventSource is the low-level browser API that only provides basic SSE functionality.
A: No, SSE only supports text (UTF-8). For binary data:
// ❌ BAD: Binary not supported
emit({ event: "image", data: binaryImageData });
// ✅ OPTION 1: Base64 encode
emit({
event: "image",
data: {
data: Buffer.from(binaryImageData).toString("base64"),
type: "image/png",
}
});
// ✅ OPTION 2: Send URL instead
emit({
event: "image",
data: {
url: "/api/images/123.png",
}
});
A: Multiple ways:
1. Browser DevTools:
2. curl:
curl -N -H "Accept: text/event-stream" http://localhost:5173/api/events
3. Browser console:
const es = new EventSource("/api/events?topics=chat");
es.addEventListener("chat", (e) => console.log(e.data));
4. Enable debug mode:
const client = new SSEClient<TopicsMap>("/api/events", {
topics: { /* ... */ },
debug: true, // See all logs
});
A: Not directly, but you can control it:
const client = new SSEClient<TopicsMap>("/api/events", {
autoConnect: false, // Don't connect automatically
topics: { /* ... */ },
});
// Manually control
function play() {
client.connect();
}
function pause() {
client.close();
}
Note: Closing and reopening creates a new connection.
A: The server's ReadableStream automatically handles backpressure. If a client is slow:
Solution: Implement client timeouts and limits:
const MAX_QUEUE_SIZE = 100;
return produceSSE<TopicsMap>((emit) => {
let queueSize = 0; // Per connection, so one slow client does not throttle the others
const originalEmit = emit;
// Wrapped emit with queue tracking
const safeEmit: typeof emit = (topic, data) => {
if (queueSize > MAX_QUEUE_SIZE) {
console.warn("[SSE] Client too slow, disconnecting");
return; // Stop sending
}
queueSize++;
originalEmit(topic, data);
setTimeout(() => queueSize--, 100);
};
// Use safeEmit instead of emit
});
A: Minimal:
Keep-alive comment every 15 seconds: 14 bytes (: keep-alive\n\n)
For 1000 concurrent connections: ~930 bytes/second total.
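The figure above can be checked with a few lines of arithmetic, assuming the 15-second keep-alive interval used in the quick-start endpoint. Only the SSE payload is counted; TCP and TLS framing are not included.

```typescript
const KEEP_ALIVE = ": keep-alive\n\n";
const INTERVAL_SECONDS = 15;

// Size of one keep-alive comment on the wire (payload only).
const bytesPerPing = new TextEncoder().encode(KEEP_ALIVE).length; // 14

// Average idle traffic for a given number of open connections.
const bytesPerSecond = (connections: number): number =>
  (bytesPerPing / INTERVAL_SECONDS) * connections;

const total = bytesPerSecond(1000); // 14 / 15 * 1000 ≈ 933 bytes/second
```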
A: One connection with multiple topics is more efficient:
One connection:
Multiple connections:
Exception: If topics have very different data rates or lifetimes, separate connections might make sense.
A: Depends on:
Rough estimates:
1000 concurrent connections: ~50-100 MB (idle) + event data
Tip: Monitor your server's memory usage and implement connection limits.
SvelteKit works perfectly on Vercel with SSE, but be aware of time limits:
Install adapter:
pnpm add -D @sveltejs/adapter-vercel
Configure:
// svelte.config.js
import adapter from "@sveltejs/adapter-vercel";
export default {
kit: {
adapter: adapter({
// Function execution time limits
maxDuration: 60, // Pro plan: up to 900s (15 min)
}),
},
};
⚠️ Important notes:
Similar to Vercel, with function timeout limits:
pnpm add -D @sveltejs/adapter-netlify
// svelte.config.js
import adapter from "@sveltejs/adapter-netlify";
export default {
kit: {
adapter: adapter(),
},
};
Limits:
Best option for long-lived SSE connections.
pnpm add -D @sveltejs/adapter-node
// svelte.config.js
import adapter from "@sveltejs/adapter-node";
export default {
kit: {
adapter: adapter({
out: "build",
}),
},
};
Build and run:
# Build
pnpm build
# Run
node build/index.js
# Or with PM2
pm2 start build/index.js --name sveltekit-sse
Advantages:
# Dockerfile
FROM node:20-alpine
WORKDIR /app
# Copy package files
COPY package.json pnpm-lock.yaml ./
# Install pnpm
RUN npm install -g pnpm
# Install dependencies
RUN pnpm install --frozen-lockfile --prod
# Copy built files (package.json was already copied above)
COPY build ./build
EXPOSE 3000
CMD ["node", "build/index.js"]
Deploy:
# Build SvelteKit app
pnpm build
# Build Docker image
docker build -t sveltekit-sse .
# Run container
docker run -p 3000:3000 sveltekit-sse
⚠️ Limited SSE support on edge runtime. Consider alternatives:
Option 1: Use Durable Objects (advanced)
pnpm add -D @sveltejs/adapter-cloudflare
Option 2: Use Workers with WebSocket
Option 3: Hybrid approach
These platforms work great with SSE (no time limits):
pnpm add -D @sveltejs/adapter-node
Railway:
railway up
Render:
# render.yaml
services:
- type: web
name: sveltekit-sse
env: node
buildCommand: pnpm install && pnpm build
startCommand: node build/index.js
Fly.io:
# fly.toml
app = "sveltekit-sse"
[build]
builder = "paketobuildpacks/builder:base"
buildpacks = ["gcr.io/paketo-buildpacks/nodejs"]
[[services]]
internal_port = 3000
protocol = "tcp"
[[services.ports]]
port = 80
handlers = ["http"]
[[services.ports]]
port = 443
handlers = ["tls", "http"]
# .env
PUBLIC_API_URL=https://your-domain.com
ORIGIN=https://your-domain.com
// Access in SvelteKit
import { PUBLIC_API_URL } from "$env/static/public";
const stream = new SSEClient<TopicsMap>(`${PUBLIC_API_URL}/api/events`, {
// ...
});
If using Nginx in front of your Node.js server:
# /etc/nginx/sites-available/sveltekit-sse
server {
listen 80;
server_name your-domain.com;
location / {
proxy_pass http://localhost:3000;
proxy_http_version 1.1;
# SSE-specific headers
proxy_set_header Connection '';
proxy_set_header Cache-Control 'no-cache';
proxy_set_header X-Accel-Buffering 'no';
proxy_buffering off;
# Standard proxy headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts (important for SSE)
proxy_read_timeout 1h;
proxy_send_timeout 1h;
}
}
Key settings for SSE:
proxy_buffering off: Disable buffering
X-Accel-Buffering 'no': Disable Nginx buffering
Connection '': Clear connection header
proxy_read_timeout 1h: Long timeout for SSE
Health check endpoint:
// src/routes/api/health/+server.ts
export const GET = () => {
return new Response(JSON.stringify({
status: "ok",
timestamp: new Date().toISOString(),
activeConnections: getActiveConnectionCount(),
}), {
headers: { "Content-Type": "application/json" },
});
};
Connection monitoring:
// src/lib/server/monitor.ts
let connectionCount = 0;
export function incrementConnections() {
connectionCount++;
console.log(`[SSE] Active connections: ${connectionCount}`);
}
export function decrementConnections() {
connectionCount--;
console.log(`[SSE] Active connections: ${connectionCount}`);
}
export function getActiveConnectionCount() {
return connectionCount;
}
// Use in SSE endpoint
import { incrementConnections, decrementConnections } from "$lib/server/monitor";
export const GET = () => {
incrementConnections();
return produceSSE<TopicsMap>((emit) => {
// ...
return () => {
decrementConnections();
};
});
};
Contributions are welcome! Feel free to:
Create your feature branch (git checkout -b feat/my-feature)
Commit your changes (git commit -m 'feat: add my feature')
Push to the branch (git push origin feat/my-feature)
✅ Perfect for:
❌ Not ideal for:
When implementing SSE in production, consider these features:
Simple pattern (single endpoint):
Client ← SSE(/api/events?topics=chat,notifications) ← Server
Hybrid pattern (SSE + HTTP):
Client ← SSE(/api/events) ← Server (receive)
Client → POST(/api/messages) → Server (send)
Scalable pattern (with pub/sub):
Client ← SSE ← App Server ← Redis Pub/Sub ← Background Workers
With message replay:
Client (reconnect with Last-Event-ID: 42)
↓
Server checks history buffer
↓
Replays messages 43, 44, 45...
↓
Resumes live stream from 46+
This project is licensed under the MIT License. See the LICENSE file for more details.
⭐ Made with ❤️ by Gustavo Morinaga
If you found this project helpful, please consider:
⭐ Giving it a star on GitHub
🐦 Sharing it on social media
💬 Contributing with improvements or bug reports
Happy coding! 🚀