A reusable pg-boss job system for SvelteKit projects. It provides a single factory function that sets up a pg-boss instance with queue management, worker registration, schedule registration, orphan cleanup, and a dashboard data layer, so you can add background jobs to any SvelteKit app without rewriting the boilerplate.
npm install @segbedji/sveltekit-pgboss
# or
bun add @segbedji/sveltekit-pgboss
import { createJobSystem, queue } from '@segbedji/sveltekit-pgboss';
const { send, initJobs } = createJobSystem({
  connectionString: process.env.DATABASE_URL!,
  queues: {
    'send-email': queue<{ to: string; subject: string }>({ retryLimit: 3 }),
    'generate-report': queue<{ type: string }>({ expireInSeconds: 3600 }),
  },
  schedules: [
    { queue: 'generate-report', cron: '0 8 * * 1' }, // every Monday at 08:00
  ],
});
// Initialize with handlers
await initJobs({
  'send-email': async (data) => {
    console.log('Sending email to', data.to);
  },
  'generate-report': async (data) => {
    console.log('Generating report', data.type);
  },
});
// Type-safe: queue name and payload are checked at compile time
await send({ name: 'send-email', data: { to: 'user@example.com', subject: 'Hello' } });
// TS error: 'nope' is not a valid queue name
await send({ name: 'nope', data: {} });
// TS error: wrong payload shape for 'send-email'
await send({ name: 'send-email', data: { type: 'weekly' } });
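The compile-time checks above can be pictured with a small, self-contained sketch. It is not the library's implementation: `defineQueue`, `makeSend`, and the in-memory `sent` array are hypothetical names, used only to show how a type parameter carried on each queue definition can tie a queue name to its payload shape.

```typescript
// Hypothetical stand-in for queue<T>(): the payload type T exists only at the
// type level (the optional __payload field is never set at runtime).
type SketchQueueOptions = { retryLimit?: number; expireInSeconds?: number };
type QueueDef<T> = { options: SketchQueueOptions; __payload?: T };

function defineQueue<T>(options: SketchQueueOptions = {}): QueueDef<T> {
  return { options };
}

// Recover the payload type a queue definition was declared with.
type PayloadOf<Q> = Q extends QueueDef<infer T> ? T : never;

// Build a send() whose `name` must be a key of `queues` and whose `data`
// must match that queue's payload type. Jobs are only recorded in memory.
function makeSend<Qs extends Record<string, QueueDef<unknown>>>(queues: Qs) {
  const sent: Array<{ name: string; data: unknown }> = [];
  function send<K extends keyof Qs & string>(job: { name: K; data: PayloadOf<Qs[K]> }): number {
    if (!(job.name in queues)) throw new Error(`Unknown queue: ${job.name}`);
    sent.push({ name: job.name, data: job.data });
    return sent.length; // number of jobs recorded so far
  }
  return { send, sent };
}

const demo = makeSend({
  'send-email': defineQueue<{ to: string; subject: string }>({ retryLimit: 3 }),
  'generate-report': defineQueue<{ type: string }>(),
});

demo.send({ name: 'send-email', data: { to: 'user@example.com', subject: 'Hello' } });
demo.send({ name: 'generate-report', data: { type: 'weekly' } });
// demo.send({ name: 'nope', data: {} });                       // rejected by the compiler
// demo.send({ name: 'send-email', data: { type: 'weekly' } }); // rejected by the compiler
```

The key design point is that `name` drives inference of `K`, and `data` is then checked against the payload declared for that key, so a mismatch surfaces before the code runs.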
### `createJobSystem(config)`
Returns `{ send, getBoss, stopBoss, initJobs, dashboard }`.
| Field | Type | Default | Description |
|---|---|---|---|
| `connectionString` | `string` | required | PostgreSQL connection string |
| `schema` | `string` | `'pgboss'` | pg-boss schema name |
| `queues` | `Record<string, QueueConfig<T>>` | required | Queue definitions created with `queue<T>()` |
| `schedules` | `ScheduleConfig[]` | `[]` | Cron schedules |
| `cleanOrphans` | `boolean` | `true` | Fail orphaned active jobs on startup |
| `onError` | `(err: Error) => void` | `console.error` | Error handler |
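To make the defaults in the table concrete, here is a minimal sketch of how optional fields could be filled in. `SketchConfig` and `resolveConfig` are hypothetical names, the `queues` field is left out for brevity, and the library may resolve its defaults differently.

```typescript
// Hypothetical, trimmed-down shape of the config described in the table above.
type SketchConfig = {
  connectionString: string;
  schema?: string;
  schedules?: Array<{ queue: string; cron: string }>;
  cleanOrphans?: boolean;
  onError?: (err: Error) => void;
};

// Fill in the documented default for every optional field. Using ?? (rather
// than ||) keeps an explicit `cleanOrphans: false` from being overridden.
function resolveConfig(config: SketchConfig): Required<SketchConfig> {
  return {
    connectionString: config.connectionString,
    schema: config.schema ?? 'pgboss',
    schedules: config.schedules ?? [],
    cleanOrphans: config.cleanOrphans ?? true,
    onError: config.onError ?? ((err) => console.error(err)),
  };
}

// The connection string below is a placeholder, not a real database.
const resolved = resolveConfig({ connectionString: 'postgres://localhost:5432/app' });
const withoutCleanup = resolveConfig({
  connectionString: 'postgres://localhost:5432/app',
  cleanOrphans: false,
});
```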
### `queue<T>(config?)`
Creates a typed queue definition. The type parameter `T` defines the payload shape for `send()` and the handler.
import { queue } from '@segbedji/sveltekit-pgboss';
// With config
queue<{ to: string; subject: string }>({ retryLimit: 3, expireInSeconds: 3600 })
// Without config (defaults only)
queue<{ to: string }>()
| Field | Type | Default | Description |
|---|---|---|---|
| `batchSize` | `number` | `1` | Jobs per batch |
| `expireInSeconds` | `number` | pg-boss default | Job expiration |
| `retryLimit` | `number` | pg-boss default | Max retries |
| `retryDelay` | `number` | pg-boss default | Delay between retries (seconds) |
| `localConcurrency` | `number` | pg-boss default | Max concurrent jobs per worker |
| `onFailed` | `(opts: { data: T; error: unknown }) => Promise<void>` | none | Called when a job exhausts all retries |
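The relationship between `retryLimit` and `onFailed` can be illustrated with an in-memory sketch. `runWithRetries` is a hypothetical helper, not part of the library, and it assumes that `retryLimit` counts retries after the first attempt (so a limit of 2 means up to 3 attempts); retry delays are ignored.

```typescript
type FailureHook<T> = (opts: { data: T; error: unknown }) => Promise<void>;

// Hypothetical runner: try the handler once, retry up to `retryLimit` more
// times, and call onFailed only after the final attempt has failed.
async function runWithRetries<T>(
  data: T,
  handler: (data: T) => Promise<void>,
  opts: { retryLimit: number; onFailed?: FailureHook<T> },
): Promise<{ attempts: number; succeeded: boolean }> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.retryLimit + 1; attempt++) {
    try {
      await handler(data);
      return { attempts: attempt, succeeded: true };
    } catch (error) {
      lastError = error; // remember the most recent failure for onFailed
    }
  }
  if (opts.onFailed) await opts.onFailed({ data, error: lastError });
  return { attempts: opts.retryLimit + 1, succeeded: false };
}

// A handler that always fails, to exercise the onFailed path.
const failedFor: string[] = [];
const outcome = runWithRetries(
  { to: 'user@example.com' },
  async () => {
    throw new Error('SMTP unavailable');
  },
  {
    retryLimit: 2,
    onFailed: async ({ data }) => {
      failedFor.push(data.to);
    },
  },
);
```

In a real queue, `onFailed` is a natural place for alerting or for writing the failure to an audit table, since it runs once per job rather than once per attempt.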
| Property | Type | Description |
|---|---|---|
| `send` | `(opts: { name, data, options? }) => Promise<string \| null>` | Type-safe job sender: queue name and payload are validated at compile time |
| `getBoss` | `() => Promise<PgBoss>` | Get the raw pg-boss instance (starts it on first call) |
| `stopBoss` | `() => Promise<void>` | Graceful shutdown |
| `initJobs` | `(handlers) => Promise<void>` | Initialize: clean orphans, create queues, register workers and schedules. Handlers are required for every queue. |
| `dashboard.getData({ page?, perPage? })` | `() => Promise<DashboardData>` | Queue stats plus paginated jobs (default: page 1, 50 per page) |
| `dashboard.rerunJob({ queue, jobId })` | `() => Promise<{ queued: true }>` | Re-queue a job by ID |
| `dashboard.getStats()` | `() => Promise<QueueStats[]>` | Queue stats only |
| `dashboard.getRecentJobs({ page?, perPage? })` | `() => Promise<{ jobs: JobInfo[], pagination: PaginationInfo }>` | Paginated jobs (default: page 1, 50 per page) |
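The pagination defaults mentioned for the dashboard helpers (page 1, 50 per page) can be sketched as a small pure function. `SketchPagination` and `paginate` are hypothetical; the fields of the exported `PaginationInfo` type may differ.

```typescript
// Hypothetical pagination shape; the real PaginationInfo may differ.
type SketchPagination = {
  page: number;
  perPage: number;
  total: number;
  totalPages: number;
  offset: number;
};

// Apply the documented defaults, clamp out-of-range input, and derive the
// row offset a SQL query would use (LIMIT perPage OFFSET offset).
function paginate(total: number, opts: { page?: number; perPage?: number } = {}): SketchPagination {
  const perPage = Math.max(1, Math.floor(opts.perPage ?? 50));
  const totalPages = Math.max(1, Math.ceil(total / perPage));
  const page = Math.min(Math.max(1, Math.floor(opts.page ?? 1)), totalPages);
  return { page, perPage, total, totalPages, offset: (page - 1) * perPage };
}

const firstPage = paginate(120);              // defaults: page 1, 50 per page
const lastPage = paginate(120, { page: 3 });  // third page of three
const clamped = paginate(120, { page: 9, perPage: 25 }); // page 9 does not exist
```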
Queue definitions and handlers live in separate files. This avoids circular imports when handlers need to call send.
// src/lib/server/jobs/system.ts — defines queues, exports send
import { createJobSystem, queue } from '@segbedji/sveltekit-pgboss';
const { send, getBoss, stopBoss, initJobs, dashboard } = createJobSystem({
  connectionString: process.env.DATABASE_URL!,
  queues: {
    'send-email': queue<{ to: string; subject: string }>({ retryLimit: 3 }),
    'generate-report': queue<{ type: string }>({
      expireInSeconds: 3600,
      retryLimit: 2,
    }),
  },
  schedules: [
    { queue: 'generate-report', cron: '0 8 * * 1' },
  ],
});
export { send, getBoss, stopBoss, initJobs, dashboard };
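For readers unfamiliar with the `cron` string used in `schedules`, the sketch below splits a standard five-field expression into named parts. `splitCron` is a hypothetical helper for illustration only; it does not validate field values and is not how the library or pg-boss parses schedules.

```typescript
type CronFields = {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
};

// Split a five-field cron expression into its named fields.
function splitCron(expression: string): CronFields {
  const parts = expression.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts as [string, string, string, string, string];
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

// '0 8 * * 1': minute 0, hour 8, any day of the month, any month, weekday 1 (Monday).
const mondayMorning = splitCron('0 8 * * 1');
```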
// src/lib/server/jobs/index.ts — wires handlers
import { initJobs } from './system';
import { handleSendEmail } from './handlers/send-email';
import { handleGenerateReport } from './handlers/generate-report';
const init = () =>
  initJobs({
    'send-email': handleSendEmail,
    'generate-report': handleGenerateReport,
  });
export { init as initJobs };
Handlers can safely import send from system.ts without creating a circular dependency:
// src/lib/server/jobs/handlers/send-email.ts
import { send } from '../system';
const handleSendEmail = async (data: { to: string; subject: string }) => {
  // ... can use send() to enqueue other jobs
};
export { handleSendEmail };
### `hooks.server.ts`
// src/hooks.server.ts
import { initJobs } from '$lib/server/jobs';
import { building } from '$app/environment';
export const init = async () => {
  if (!building && process.env.ENABLE_WORKER === 'true') {
    await initJobs();
  }
};
Set ENABLE_WORKER=true on the process that should run workers. This lets you run workers in-process during development and in a separate container in production.
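The gating condition in the hook can be pulled into a pure function, which makes its behavior easy to check. `shouldRunWorkers` is a hypothetical helper; `building` stands in for the flag from `$app/environment`, and the environment is passed in rather than read from `process.env`.

```typescript
// Decide whether this process should start workers. Only the exact string
// 'true' enables them, mirroring the strict comparison used in the hook.
function shouldRunWorkers(env: Record<string, string | undefined>, building: boolean): boolean {
  return !building && env['ENABLE_WORKER'] === 'true';
}

const workerProcess = shouldRunWorkers({ ENABLE_WORKER: 'true' }, false);  // workers start
const webProcess = shouldRunWorkers({ ENABLE_WORKER: 'false' }, false);    // workers stay off
const duringBuild = shouldRunWorkers({ ENABLE_WORKER: 'true' }, true);     // never during build
const looseValue = shouldRunWorkers({ ENABLE_WORKER: '1' }, false);        // '1' is not 'true'
```

Because the comparison is strict, values such as `1`, `TRUE`, or `yes` leave workers disabled, which is worth remembering when setting the variable in deployment configs.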
import { send } from '$lib/server/jobs/system';
// Type-safe: TS validates queue name and payload
await send({ name: 'send-email', data: { to: 'user@example.com', subject: 'Hello' } });
Wrap the dashboard helpers in SvelteKit remote functions for your admin panel:
// src/lib/remote-functions/admin/jobs.remote.ts
import { command, query } from '$app/server';
import { dashboard } from '$lib/server/jobs/system';
import { z } from 'zod';
const getJobsDashboard = query(
  z.object({ page: z.number().optional(), perPage: z.number().optional() }),
  async ({ page, perPage }) => {
    // Add your own auth check here
    return dashboard.getData({ page, perPage });
  }
);
const rerunJob = command(
  z.object({ queue: z.string(), jobId: z.string() }),
  async ({ queue, jobId }) => {
    // Add your own auth check here
    return dashboard.rerunJob({ queue, jobId });
  }
);
export { getJobsDashboard, rerunJob };
A minimal admin page using the remote functions above:
<!-- src/routes/admin/jobs/+page.svelte -->
<script lang="ts">
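  import { getJobsDashboard, rerunJob } from '$lib/remote-functions/admin/jobs.remote';
  // The query validates its argument against an object schema, so pass {}
  // to fall back to the default page and perPage.
  let data = $state(getJobsDashboard({}));
  const handleRerun = async (queue: string, jobId: string) => {
    await rerunJob({ queue, jobId });
    data = getJobsDashboard({});
  };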
</script>
{#await data}
  <p>Loading...</p>
{:then { queues, jobs, pagination }}
  <h2>Queues</h2>
  <table>
    <thead>
      <tr><th>Queue</th><th>Queued</th><th>Active</th><th>Deferred</th><th>Total</th></tr>
    </thead>
    <tbody>
      {#each queues as q}
        <tr>
          <td>{q.name}</td>
          <td>{q.queuedCount}</td>
          <td>{q.activeCount}</td>
          <td>{q.deferredCount}</td>
          <td>{q.totalCount}</td>
        </tr>
      {/each}
    </tbody>
  </table>
  <h2>Recent Jobs</h2>
  <table>
    <thead>
      <tr><th>ID</th><th>Queue</th><th>State</th><th>Created</th><th>Actions</th></tr>
    </thead>
    <tbody>
      {#each jobs as job}
        <tr>
          <td>{job.id.slice(0, 8)}</td>
          <td>{job.name}</td>
          <td>{job.state}</td>
          <td>{new Date(job.createdOn).toLocaleString()}</td>
          <td>
            {#if job.state === 'failed' || job.state === 'completed'}
              <button onclick={() => handleRerun(job.name, job.id)}>Rerun</button>
            {/if}
          </td>
        </tr>
      {/each}
    </tbody>
  </table>
{/await}
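If the page grows, the two inline expressions in the template (the rerun condition and the shortened ID) could be moved into small helpers so they can be unit tested. The names `canRerun` and `shortId` are hypothetical and not part of the library.

```typescript
// States taken from the template above: only finished jobs get a Rerun button.
const RERUNNABLE_STATES = ['failed', 'completed'];

function canRerun(state: string): boolean {
  return RERUNNABLE_STATES.includes(state);
}

// Show only the first few characters of a job ID in the table.
function shortId(id: string, visibleChars = 8): string {
  return id.slice(0, visibleChars);
}

// The ID below is a made-up example value.
const exampleShortId = shortId('0b9f6c1e-7a52-4d3e-9c1a-2f4e8d7b6a10');
```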
Run workers separately from your web server:
# docker-compose.yml
services:
  web:
    build: .
    environment:
      - ENABLE_WORKER=false
  worker:
    build: .
    environment:
      - ENABLE_WORKER=true
All types are exported:
import type {
  JobSystemConfig,
  PayloadMap,
  HandlersMap,
  QueueConfig,
  ScheduleConfig,
  QueueStats,
  JobInfo,
  PaginationInfo,
  DashboardData,
} from '@segbedji/sveltekit-pgboss';
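As an illustration of how a payload map and a handlers map can relate, the sketch below uses a mapped type to require one handler per queue. `SketchPayloads`, `SketchHandlers`, and `missingHandlers` are hypothetical; the exported `PayloadMap` and `HandlersMap` types may be defined differently.

```typescript
// Hypothetical payload map: queue name -> payload shape.
type SketchPayloads = {
  'send-email': { to: string; subject: string };
  'generate-report': { type: string };
};

// One async handler per queue, each typed with that queue's payload.
type SketchHandlers<P> = { [K in keyof P]: (data: P[K]) => Promise<void> };

const handled: string[] = [];
const sketchHandlers: SketchHandlers<SketchPayloads> = {
  'send-email': async (data) => {
    handled.push(`email:${data.to}`);
  },
  'generate-report': async (data) => {
    handled.push(`report:${data.type}`);
  },
};
// Omitting either key above is a compile-time error.

// Runtime counterpart of the same check, for handlers assembled dynamically.
function missingHandlers(queueNames: string[], handlers: Record<string, unknown>): string[] {
  return queueNames.filter((queueName) => typeof handlers[queueName] !== 'function');
}

const missing = missingHandlers(['send-email', 'generate-report', 'cleanup'], sketchHandlers);
```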
License: MIT