Typed PostgreSQL LISTEN/NOTIFY helpers for SvelteKit query.live(...) remote functions.
This package gives you a single factory API:
createPgLiveQuery<Channels>(options?)
You define your channel-to-payload types once, then use pgLiveQuery.fn(...) inside query.live(...).
npm install svelte-pg-live-query pg-listen
onInit + onNotified live query model
Recommended notify payload shape from SQL trigger:
{
"operation": "INSERT | UPDATE | DELETE",
"table": "User",
"row": { "...": "row data" }
}
Using a single row field is easier to type than new/old branching.
-- Trigger function: sends one JSON notification per changed row.
-- The channel name is taken from the first trigger argument (TG_ARGV[0]).
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  changed_row RECORD;
BEGIN
  -- DELETE reports the OLD row; INSERT and UPDATE report the NEW row.
  IF TG_OP = 'DELETE' THEN
    changed_row := OLD;
  ELSE
    changed_row := NEW;
  END IF;

  PERFORM pg_notify(
    TG_ARGV[0],
    json_build_object(
      'operation', TG_OP,
      'table', TG_TABLE_NAME,
      'row', row_to_json(changed_row)
    )::text
  );

  -- Same row the payload was built from: OLD on DELETE, NEW otherwise.
  RETURN changed_row;
END;
$$ LANGUAGE plpgsql;
// src/routes/some.remote.ts
import { query } from '$app/server';
import { createPgLiveQuery } from 'svelte-pg-live-query';
/** Maps each NOTIFY channel name to the payload type delivered on it. */
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

/** Live query on the `user_changes` channel; both callbacks resolve to `{ ok: true }`. */
export const usersLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async () => ({ ok: true }),
    // payload is fully typed by channel
    onNotified: async ({ payload: { operation, row } }) => {
      console.log(operation, row.id);
      return { ok: true };
    }
  })
);
// Factory created with every documented option set explicitly.
const pgLiveQuery = createPgLiveQuery<Channels>({
  debug: true,
  debounceMs: 50,
  postgres: {
    connectionString: process.env.DATABASE_URL,
    // any pg-listen subscriber options except connectionString
    subscriberConfig: { retryInterval: 200, retryTimeout: 5000 },
    // Kept inline so `error` is typed from the option's declared signature.
    onError: (error) => console.error('listener error', error)
  }
});
debug?: boolean
debounceMs?: number
postgres?:
  connectionString?: string
  subscriberConfig?: Omit<pg-listen config, 'connectionString'>
  onError?: (error: Error) => void
fn(...) options
pgLiveQuery.fn({
channel: 'user_changes',
id: 'users-stream',
debug: true,
onInit: async ({ input }) => {
return null;
},
onNotified: async ({ input, payload }) => {
return payload;
},
onServer: ({ input, payload }) => {
// optional side effects/logging
}
});
The onInit return value is the first value yielded.
The onNotified return value is yielded to clients.
SKIP to ignore a notification and not emit an update
import { SKIP } from 'svelte-pg-live-query';
Use SKIP when a notification is valid but not relevant to the current live-query input.
import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
/** Maps each NOTIFY channel name to the payload type delivered on it. */
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

/** Live query scoped to one user id: notifications about other users are skipped. */
export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async ({ input }: { input: { id: number } }) => ({ id: input.id }),
    onNotified: async ({ input, payload }) =>
      // Ignore notifications for other users
      payload.row.id === input.id ? payload.row : SKIP
  })
);
This library does not create database triggers for you. You must create your own Postgres NOTIFY triggers/channels that match the channel names and payload shape used in your createPgLiveQuery config.
PostgreSQL trigger docs: https://www.postgresql.org/docs/current/sql-createtrigger.html
Below is a minimal example using Prisma migrations to create NOTIFY triggers.
npx prisma migrate dev --name add_user_changes_listener --create-only
This creates a new migration folder with migration.sql that you can edit before applying.
migration.sql
-- Function that sends one consistent payload shape
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  changed_row RECORD;
BEGIN
  -- Row shape returned to your live query payload: OLD for DELETE, NEW otherwise
  IF TG_OP = 'DELETE' THEN
    changed_row := OLD;
  ELSE
    changed_row := NEW;
  END IF;

  PERFORM pg_notify(
    -- Channel name your app listens to (must match `pgLiveQuery.fn({ channel: ... })`)
    'user_changes',
    json_build_object(
      'operation', TG_OP,        -- operation type: INSERT | UPDATE | DELETE
      'table', TG_TABLE_NAME,    -- table name that fired the trigger
      'row', row_to_json(changed_row)
    )::text
  );

  -- Same row the payload was built from: OLD on DELETE, NEW otherwise.
  RETURN changed_row;
END;
$$ LANGUAGE plpgsql;
-- Trigger name (you choose this; useful for identifying/dropping later)
-- Calls notify_table_change() once per affected row, after each INSERT, UPDATE,
-- or DELETE on the "User" table.
CREATE TRIGGER user_notify_changes
AFTER INSERT OR UPDATE OR DELETE ON "User"
FOR EACH ROW
EXECUTE FUNCTION notify_table_change();
npx prisma migrate dev
// Channel map passed to createPgLiveQuery<Channels>(): each key is a NOTIFY channel
// name and each value is the type of the JSON payload delivered on that channel.
type Channels = {
user_changes: {
operation: 'INSERT' | 'UPDATE' | 'DELETE'; // from payload.operation
table: string; // from payload.table
row: { id: number; email: string }; // from payload.row
};
};
import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
import { prisma } from '$lib/server/db'; // your Prisma client path
/** Maps each NOTIFY channel name to the payload type delivered on it. */
type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

/** Looks up one user by id; shared by the initial load and every refresh. */
const loadUser = (id: number) => prisma.user.findUnique({ where: { id } });

/** Live query for a single user: re-reads the row whenever a notification concerns that id. */
export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes', // must match pg_notify('user_changes', ...)
    onInit: async ({ input }: { input: { id: number } }) => loadUser(input.id),
    onNotified: async ({ input, payload }) => {
      // ignore unrelated updates
      if (payload.row.id !== input.id) return SKIP;
      return loadUser(input.id);
    }
  })
);