sveltekit-pgboss Svelte Themes

Sveltekit Pgboss

@segbedji/sveltekit-pgboss

A reusable pg-boss job system for SvelteKit projects. Provides a single factory function that sets up a pg-boss instance with queue management, worker registration, schedule registration, orphan cleanup, and a dashboard data layer — so you can drop background jobs into any SvelteKit app without re-writing the boilerplate.

Install

npm install @segbedji/sveltekit-pgboss
# or
bun add @segbedji/sveltekit-pgboss

Quick Start

import { createJobSystem, queue } from '@segbedji/sveltekit-pgboss';

const { send, initJobs } = createJobSystem({
  connectionString: process.env.DATABASE_URL!,
  queues: {
    'send-email': queue<{ to: string; subject: string }>({ retryLimit: 3 }),
    'generate-report': queue<{ type: string }>({ expireInSeconds: 3600 }),
  },
  schedules: [
    { queue: 'generate-report', cron: '0 8 * * 1' }, // every Monday 8 AM
  ],
});

// Initialize with handlers
await initJobs({
  'send-email': async (data) => {
    console.log('Sending email to', data.to);
  },
  'generate-report': async (data) => {
    console.log('Generating report', data.type);
  },
});

// Type-safe: queue name and payload are checked at compile time
await send({ name: 'send-email', data: { to: '[email protected]', subject: 'Hello' } });

// TS error: 'nope' is not a valid queue name
await send({ name: 'nope', data: {} });

// TS error: wrong payload shape for 'send-email'
await send({ name: 'send-email', data: { type: 'weekly' } });

API Reference

createJobSystem(config)

Returns { send, getBoss, stopBoss, initJobs, dashboard }.

Config

Field Type Default Description
connectionString string required PostgreSQL connection string
schema string 'pgboss' pg-boss schema name
queues Record<string, QueueConfig<T>> required Queue definitions created with queue<T>()
schedules ScheduleConfig[] [] Cron schedules
cleanOrphans boolean true Fail orphaned active jobs on startup
onError (err: Error) => void console.error Error handler

queue<T>(config?)

Creates a typed queue definition. The type parameter T defines the payload shape for send() and the handler.

import { queue } from '@segbedji/sveltekit-pgboss';

// With config
queue<{ to: string; subject: string }>({ retryLimit: 3, expireInSeconds: 3600 })

// Without config (defaults only)
queue<{ to: string }>()
Field Type Default Description
batchSize number 1 Jobs per batch
expireInSeconds number pg-boss default Job expiration
retryLimit number pg-boss default Max retries
retryDelay number pg-boss default Delay between retries (seconds)
localConcurrency number pg-boss default Max concurrent jobs per worker
onFailed (opts: { data: T; error: unknown }) => Promise<void> Called when a job exhausts all retries

Returned Object

Property Type Description
send (opts: { name, data, options? }) => Promise<string | null> Type-safe job sender — queue name and payload are validated at compile time
getBoss () => Promise<PgBoss> Get the raw pg-boss instance (starts it on first call)
stopBoss () => Promise<void> Graceful shutdown
initJobs (handlers) => Promise<void> Initialize: clean orphans, create queues, register workers & schedules. Handlers are required for every queue.
dashboard.getData({ page?, perPage? }) () => Promise<DashboardData> Queue stats + paginated jobs (default: page 1, 50 per page)
dashboard.rerunJob({ queue, jobId }) () => Promise<{ queued: true }> Re-queue a job by ID
dashboard.getStats() () => Promise<QueueStats[]> Queue stats only
dashboard.getRecentJobs({ page?, perPage? }) () => Promise<{ jobs: JobInfo[], pagination: PaginationInfo }> Paginated jobs (default: page 1, 50 per page)

Usage with SvelteKit

Define your job system

Queue definitions and handlers live in separate files. This avoids circular imports when handlers need to call send.

// src/lib/server/jobs/system.ts — defines queues, exports send
import { createJobSystem, queue } from '@segbedji/sveltekit-pgboss';

const { send, getBoss, stopBoss, initJobs, dashboard } = createJobSystem({
  connectionString: process.env.DATABASE_URL!,
  queues: {
    'send-email': queue<{ to: string; subject: string }>({ retryLimit: 3 }),
    'generate-report': queue<{ type: string }>({
      expireInSeconds: 3600,
      retryLimit: 2,
    }),
  },
  schedules: [
    { queue: 'generate-report', cron: '0 8 * * 1' },
  ],
});

export { send, getBoss, stopBoss, initJobs, dashboard };
// src/lib/server/jobs/index.ts — wires handlers
import { initJobs } from './system';
import { handleSendEmail } from './handlers/send-email';
import { handleGenerateReport } from './handlers/generate-report';

const init = () =>
  initJobs({
    'send-email': handleSendEmail,
    'generate-report': handleGenerateReport,
  });

export { init as initJobs };

Handlers can safely import send from system.ts without creating a circular dependency:

// src/lib/server/jobs/handlers/send-email.ts
import { send } from '../system';

const handleSendEmail = async (data: { to: string; subject: string }) => {
  // ... can use send() to enqueue other jobs
};

export { handleSendEmail };

Start workers in hooks.server.ts

// src/hooks.server.ts
import { initJobs } from '$lib/server/jobs';
import { building } from '$app/environment';

export const init = async () => {
  if (!building && process.env.ENABLE_WORKER === 'true') {
    await initJobs();
  }
};

Set ENABLE_WORKER=true on the process that should run workers. This lets you run workers in-process during development and in a separate container in production.

Send jobs from anywhere

import { send } from '$lib/server/jobs/system';

// Type-safe: TS validates queue name and payload
await send({ name: 'send-email', data: { to: '[email protected]', subject: 'Hello' } });

Dashboard remote functions

Wrap the dashboard helpers in SvelteKit remote functions for your admin panel:

// src/lib/remote-functions/admin/jobs.remote.ts
import { command, query } from '$app/server';
import { dashboard } from '$lib/server/jobs/system';
import { z } from 'zod';

const getJobsDashboard = query(
  z.object({ page: z.number().optional(), perPage: z.number().optional() }),
  async ({ page, perPage }) => {
    // Add your own auth check here
    return dashboard.getData({ page, perPage });
  }
);

const rerunJob = command(
  z.object({ queue: z.string(), jobId: z.string() }),
  async ({ queue, jobId }) => {
    // Add your own auth check here
    return dashboard.rerunJob({ queue, jobId });
  }
);

export { getJobsDashboard, rerunJob };

Admin page example

A minimal admin page using the remote functions above:

<!-- src/routes/admin/jobs/+page.svelte -->
<script lang="ts">
  import { getJobsDashboard, rerunJob } from '$lib/remote-functions/admin/jobs.remote';

  let data = $state(getJobsDashboard());

  const handleRerun = async (queue: string, jobId: string) => {
    await rerunJob({ queue, jobId });
    data = getJobsDashboard();
  };
</script>

{#await data}
  <p>Loading...</p>
{:then { queues, jobs, pagination }}
  <h2>Queues</h2>
  <table>
    <thead>
      <tr><th>Queue</th><th>Queued</th><th>Active</th><th>Deferred</th><th>Total</th></tr>
    </thead>
    <tbody>
      {#each queues as q}
        <tr>
          <td>{q.name}</td>
          <td>{q.queuedCount}</td>
          <td>{q.activeCount}</td>
          <td>{q.deferredCount}</td>
          <td>{q.totalCount}</td>
        </tr>
      {/each}
    </tbody>
  </table>

  <h2>Recent Jobs</h2>
  <table>
    <thead>
      <tr><th>ID</th><th>Queue</th><th>State</th><th>Created</th><th>Actions</th></tr>
    </thead>
    <tbody>
      {#each jobs as job}
        <tr>
          <td>{job.id.slice(0, 8)}</td>
          <td>{job.name}</td>
          <td>{job.state}</td>
          <td>{new Date(job.createdOn).toLocaleString()}</td>
          <td>
            {#if job.state === 'failed' || job.state === 'completed'}
              <button onclick={() => handleRerun(job.name, job.id)}>Rerun</button>
            {/if}
          </td>
        </tr>
      {/each}
    </tbody>
  </table>
{/await}

Docker Compose worker pattern

Run workers separately from your web server:

# docker-compose.yml
services:
  web:
    build: .
    environment:
      - ENABLE_WORKER=false

  worker:
    build: .
    environment:
      - ENABLE_WORKER=true

Types

All types are exported:

import type {
  JobSystemConfig,
  PayloadMap,
  HandlersMap,
  QueueConfig,
  ScheduleConfig,
  QueueStats,
  JobInfo,
  PaginationInfo,
  DashboardData,
} from '@segbedji/sveltekit-pgboss';

License

MIT

Top categories

Loading Svelte Themes