svelte-pg-live-query

Typed PostgreSQL LISTEN/NOTIFY helpers for SvelteKit query.live(...) remote functions.

This package gives you a single factory API:

  • createPgLiveQuery<Channels>(options?)

You define your channel-to-payload types once, then use pgLiveQuery.fn(...) inside query.live(...).

Install

npm install svelte-pg-live-query pg-listen

What you get

  • Strongly-typed payloads per channel
  • Shared Postgres listener scaffolding under the hood
  • Easy onInit + onNotified live query model
  • Configurable listener connection/options via factory

Payload shape

Recommended NOTIFY payload shape, as emitted by the SQL trigger:

{
  "operation": "INSERT | UPDATE | DELETE",
  "table": "User",
  "row": { "...": "row data" }
}

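Using a single row field (NEW for INSERT and UPDATE, OLD for DELETE) is easier to type than branching on separate new/old fields. Keep notified rows small: in the default PostgreSQL configuration, a NOTIFY payload must be shorter than 8000 bytes.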
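The payload shape above can be checked at runtime before it is trusted. The following is a minimal sketch, not part of this package: `ChangePayload` and `toChangePayload` are hypothetical names, and because this README does not state whether handlers receive raw text or an already-parsed object, the helper accepts both. It validates only the envelope (operation, table, row); the fields inside row are left to the caller.

```typescript
type Operation = 'INSERT' | 'UPDATE' | 'DELETE';

// Hypothetical type mirroring the recommended payload shape.
interface ChangePayload<Row> {
  operation: Operation;
  table: string;
  row: Row;
}

const OPERATIONS: readonly string[] = ['INSERT', 'UPDATE', 'DELETE'];

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

// Accepts either the raw NOTIFY text or an already-parsed value.
// Returns null when the envelope does not match the recommended shape.
function toChangePayload(value: unknown): ChangePayload<Record<string, unknown>> | null {
  let data: unknown = value;
  if (typeof value === 'string') {
    try {
      data = JSON.parse(value);
    } catch {
      return null; // not valid JSON
    }
  }
  if (!isRecord(data)) return null;
  const { operation, table, row } = data;
  if (typeof operation !== 'string' || OPERATIONS.indexOf(operation) === -1) return null;
  if (typeof table !== 'string' || !isRecord(row)) return null;
  return { operation: operation as Operation, table, row };
}
```

A handler could call `toChangePayload(payload)` first and ignore the notification when it returns null, so a malformed trigger payload never reaches typed code.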

SQL trigger example

CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,
    'table', TG_TABLE_NAME,
    'row', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE row_to_json(NEW) END
  );

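  -- The channel name comes from the first trigger argument, for example:
  -- CREATE TRIGGER ... EXECUTE FUNCTION notify_table_change('user_changes');
  PERFORM pg_notify(TG_ARGV[0], payload::text);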

  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;

  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Basic usage

// src/routes/some.remote.ts
import { query } from '$app/server';
import { createPgLiveQuery } from 'svelte-pg-live-query';

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const usersLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async () => {
      return { ok: true };
    },
    onNotified: async ({ payload }) => {
      // payload is fully typed by channel
      console.log(payload.operation, payload.row.id);
      return { ok: true };
    }
  })
);

Factory options

const pgLiveQuery = createPgLiveQuery<Channels>({
  debug: true,
  debounceMs: 50,
  postgres: {
    connectionString: process.env.DATABASE_URL,
    subscriberConfig: {
      // any pg-listen subscriber options except connectionString
      retryInterval: 200,
      retryTimeout: 5000
    },
    onError: (error) => {
      console.error('listener error', error);
    }
  }
});

Option reference

  • debug?: boolean
  • debounceMs?: number
  • postgres?:
    • connectionString?: string
    • subscriberConfig?: Omit<pg-listen config, 'connectionString'>
    • onError?: (error: Error) => void
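This README does not describe what debounceMs does internally. If it behaves like a conventional trailing debounce (an assumption, not confirmed here), a burst of notifications within the window would collapse into a single handler call with the latest payload. The sketch below illustrates that general technique only; `debounceLatest`, `Schedule`, and `timeoutSchedule` are hypothetical names and not part of this package.

```typescript
type Cancel = () => void;
// A scheduler is injected so the timing can be replaced in tests.
type Schedule = (fn: () => void, ms: number) => Cancel;

// Default scheduler backed by setTimeout.
const timeoutSchedule: Schedule = (fn, ms) => {
  const handle = setTimeout(fn, ms);
  return () => clearTimeout(handle);
};

// Trailing debounce: each call cancels the pending one, so only the
// last value seen within `ms` reaches the handler.
function debounceLatest<T>(
  handler: (value: T) => void,
  ms: number,
  schedule: Schedule = timeoutSchedule
): (value: T) => void {
  let cancel: Cancel | undefined;
  return (value: T) => {
    if (cancel) cancel();
    cancel = schedule(() => {
      cancel = undefined;
      handler(value);
    }, ms);
  };
}
```

Under this model, a larger window means fewer updates sent to clients at the cost of extra latency, which is the usual trade-off to weigh when choosing a value such as the 50 used above.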

fn(...) options

pgLiveQuery.fn({
  channel: 'user_changes',
  id: 'users-stream',
  debug: true,
  onInit: async ({ input }) => {
    return null;
  },
  onNotified: async ({ input, payload }) => {
    return payload;
  },
  onServer: ({ input, payload }) => {
    // optional side effects/logging
  }
});

Return semantics

  • The onInit return value is the first value yielded to clients.
  • Each onNotified return value is yielded to clients as an update.
  • Return SKIP to ignore a notification without emitting an update.

import { SKIP } from 'svelte-pg-live-query';
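The three rules above can be modeled with a small synchronous generator. This is a simplified sketch of the semantics, not the library's implementation: the real flow is asynchronous and driven by Postgres notifications, and the `SKIP` symbol and `liveValues` function here are local stand-ins so the example runs on its own.

```typescript
// Local stand-in for the library's SKIP export (assumption: a unique sentinel value).
const SKIP = Symbol('SKIP');

// Yields the initial value first, then one value per notification,
// except for notifications whose handler returns SKIP.
function* liveValues<P, T>(
  initial: T,
  notifications: Iterable<P>,
  onNotified: (payload: P) => T | typeof SKIP
): Generator<T> {
  yield initial; // rule 1: the onInit result is emitted first
  for (const payload of notifications) {
    const next = onNotified(payload);
    if (next === SKIP) continue; // rule 3: skipped notifications emit nothing
    yield next as T; // rule 2: every other result is emitted
  }
}
```

For instance, feeding the notifications 1, 2, 3, 4 through a handler that skips odd numbers produces the initial value followed by two updates, one for 2 and one for 4.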

SKIP example (ignore unrelated updates)

Use SKIP when a notification is valid but not relevant to the current live-query input.

import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes',
    onInit: async ({ input }: { input: { id: number } }) => {
      return { id: input.id };
    },
    onNotified: async ({ input, payload }) => {
      // Ignore notifications for other users
      if (payload.row.id !== input.id) return SKIP;

      return payload.row;
    }
  })
);

Database setup requirement

This library does not create database triggers for you. You must create your own Postgres NOTIFY triggers/channels that match the channel names and payload shape used in your createPgLiveQuery config.

For details, see the PostgreSQL documentation on CREATE TRIGGER and NOTIFY.

Prisma migration example (create + apply trigger)

Below is a minimal example using Prisma migrations to create NOTIFY triggers.

1) Create an empty migration

npx prisma migrate dev --name add_user_changes_listener --create-only

This creates a new migration folder with migration.sql that you can edit before applying.

2) Edit migration.sql

-- Function that sends one consistent payload shape
CREATE OR REPLACE FUNCTION notify_table_change() RETURNS trigger AS $$
DECLARE
  payload JSON;
BEGIN
  payload := json_build_object(
    'operation', TG_OP,          -- operation type: INSERT | UPDATE | DELETE
    'table', TG_TABLE_NAME,      -- table name that fired the trigger
    'row', CASE                  -- row shape returned to your live query payload
      WHEN TG_OP = 'DELETE' THEN row_to_json(OLD)
      ELSE row_to_json(NEW)
    END
  );

  -- Channel name your app listens to (must match `pgLiveQuery.fn({ channel: ... })`)
  PERFORM pg_notify('user_changes', payload::text);

  IF (TG_OP = 'DELETE') THEN
    RETURN OLD;
  END IF;
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Trigger name (you choose this; useful for identifying/dropping later)
CREATE TRIGGER user_notify_changes
AFTER INSERT OR UPDATE OR DELETE ON "User"
FOR EACH ROW
EXECUTE FUNCTION notify_table_change();

3) Apply migration to database

npx prisma migrate dev

4) Match channel + payload type in your live query

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';                // from payload.operation
    table: string;                                             // from payload.table
    row: { id: number; email: string; name: string | null };  // from payload.row
  };
};

import { query } from '$app/server';
import { createPgLiveQuery, SKIP } from 'svelte-pg-live-query';
import { prisma } from '$lib/server/db'; // your Prisma client path

type Channels = {
  user_changes: {
    operation: 'INSERT' | 'UPDATE' | 'DELETE';
    table: string;
    row: { id: number; email: string; name: string | null };
  };
};

const pgLiveQuery = createPgLiveQuery<Channels>();

export const userByIdLive = query.live(
  pgLiveQuery.fn({
    channel: 'user_changes', // must match pg_notify('user_changes', ...)
    onInit: async ({ input }: { input: { id: number } }) => {
      return prisma.user.findUnique({ where: { id: input.id } });
    },
    onNotified: async ({ input, payload }) => {
      if (payload.row.id !== input.id) return SKIP; // ignore unrelated updates
      return prisma.user.findUnique({ where: { id: input.id } });
    }
  })
);
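The example above re-queries the database on every relevant notification. Because the payload already carries the changed row, a list-style live query could instead fold each change into an in-memory array. The following is a sketch under that assumption; `applyUserChange` is a hypothetical helper, not part of this package, and it assumes the trigger's row matches the `User` type declared here.

```typescript
type User = { id: number; email: string; name: string | null };

type UserChange = {
  operation: 'INSERT' | 'UPDATE' | 'DELETE';
  table: string;
  row: User;
};

// Returns a new array with the change applied; the input is not mutated.
function applyUserChange(users: readonly User[], change: UserChange): User[] {
  switch (change.operation) {
    case 'DELETE':
      // Drop the deleted row (OLD) by id.
      return users.filter((u) => u.id !== change.row.id);
    case 'INSERT':
    case 'UPDATE': {
      // Replace the row if it is already present, otherwise append it.
      const exists = users.some((u) => u.id === change.row.id);
      return exists
        ? users.map((u) => (u.id === change.row.id ? change.row : u))
        : [...users, change.row];
    }
  }
}
```

Treating INSERT and UPDATE the same way keeps the helper tolerant of missed or duplicated notifications. Re-querying, as in the Prisma example, remains the safer choice when the row sent by the trigger may be stale or incomplete.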
