Reactive Graph Sync Engine for Go — real-time data synchronization between PostgreSQL and any frontend via normalized diffs.
Website | Quick Start | API Reference | TypeScript SDK
Inspired by ZeroCache, Replicache, and Electric SQL — built for Go backends with a TypeScript SDK for frontends.
table_diff (row data, broadcast) + view_diff (structure, per-client)go get github.com/FrankFMY/arcana
package main
import (
"context"
"net/http"
"github.com/FrankFMY/arcana"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
pool, _ := pgxpool.New(context.Background(), "postgres://...")
engine := arcana.New(arcana.Config{
Pool: arcana.PgxQuerier(pool),
Transport: arcana.NewCentrifugoTransport(arcana.CentrifugoConfig{
APIURL: "http://localhost:8000",
APIKey: "your-api-key",
}),
AuthFunc: func(r *http.Request) (*arcana.Identity, error) {
return &arcana.Identity{
SeanceID: r.Header.Get("X-Seance-ID"),
UserID: "user-1",
WorkspaceID: "workspace-1",
}, nil
},
})
engine.Register(UserList)
engine.Start(context.Background())
defer engine.Stop()
mux := http.NewServeMux()
mux.Handle("/arcana/", http.StripPrefix("/arcana", engine.Handler()))
http.ListenAndServe(":8080", mux)
}
A graph is a named, parameterized SQL query that produces normalized data. When the underlying data changes, Arcana re-runs the query and pushes only the diff to subscribed clients.
var UserList = arcana.GraphDef{
Key: "user_list",
// Typed parameters with validation
Params: arcana.ParamSchema{
"org_id": arcana.ParamUUID().Required(),
"limit": arcana.ParamInt().Default(50),
"role": arcana.ParamString().OneOf("admin", "member", "viewer").Build(),
},
// Declares which tables/columns this graph depends on.
// Changes to these columns trigger re-evaluation.
Deps: []arcana.TableDep{
{Table: "users", Columns: []string{"id", "name", "email", "role"}},
},
// Factory runs the SQL query and returns normalized refs + rows.
Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
wsID := arcana.WorkspaceID(ctx)
rows, err := q.Query(ctx,
`SELECT id, name, email, role FROM users
WHERE workspace_id = $1 AND org_id = $2 ORDER BY name LIMIT $3`,
wsID, p.UUID("org_id"), p.Int("limit"),
)
if err != nil {
return nil, err
}
defer rows.Close()
result := arcana.NewResult()
for rows.Next() {
var id, name, email, role string
rows.Scan(&id, &name, &email, &role)
// AddRow stores the normalized row data
result.AddRow("users", id, map[string]any{
"id": id, "name": name, "email": email, "role": role,
})
// AddRef defines the view structure (which rows belong to this view)
result.AddRef(arcana.Ref{
Table: "users", ID: id,
Fields: []string{"id", "name", "email", "role"},
})
}
return result, nil
},
}
Arcana maintains a 4-level normalized in-memory store: workspace -> table -> row_id -> fields. Multiple subscriptions can reference the same row — a RefCount GC automatically cleans up rows no longer referenced by any subscription.
When data changes, Arcana sends two types of messages:
| Stream | Channel | Content | Scope |
|---|---|---|---|
table_diff |
workspace:{id} |
Row field changes (JSON Patch) | Broadcast to all clients in workspace |
view_diff |
views:{seanceID} |
Refs structure changes + new row data | Per-client (seance) |
This separation means row data is sent once to the workspace (shared), while each client only receives structural changes relevant to their subscriptions.
Every subscription runs within an authenticated context:
type Identity struct {
SeanceID string // Unique client session ID
UserID string // Authenticated user
WorkspaceID string // Tenant/organization scope
Role string // User role
Permissions []string // Fine-grained permissions
}
Inside a factory, use context helpers:
Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
wsID := arcana.WorkspaceID(ctx) // Workspace isolation
user := arcana.User(ctx) // Full identity
if !user.HasPermission("read:users") {
return nil, arcana.ErrForbidden
}
// ...
}
Arcana supports three modes for detecting data changes:
Call engine.Notify() after database mutations. Simple, requires app instrumentation.
func (r *UserRepo) Update(ctx context.Context, id, name string) error {
_, err := r.pool.Exec(ctx, "UPDATE users SET name = $1 WHERE id = $2", name, id)
if err == nil {
engine.NotifyTable(ctx, "users", id, []string{"name"})
}
return err
}
Auto-generated triggers send change notifications via PostgreSQL's NOTIFY mechanism. Zero app instrumentation needed after setup.
// Generate and install triggers for all registered tables
stmts := arcana.GenerateTriggerSQL("arcana_changes", engine.Registry().RepTable())
for _, sql := range stmts {
pool.Exec(ctx, sql)
}
// Or use the convenience function:
arcana.EnsureTriggers(ctx, pool, "arcana_changes", engine.Registry().RepTable())
// Configure the engine to listen
engine := arcana.New(arcana.Config{
ChangeDetector: arcana.NewPGNotifyListener(arcana.PGNotifyConfig{
Conn: pgConn,
Channel: "arcana_changes",
}),
// ...
})
PGNotifyListener features:
Listens to PostgreSQL's logical replication stream (pgoutput plugin). Most reliable — captures all DML operations with zero app changes. Requires wal_level=logical.
engine := arcana.New(arcana.Config{
ChangeDetector: arcana.NewWALListener(arcana.WALConfig{
ConnString: "postgres://user:pass@localhost:5432/mydb",
SlotName: "arcana_slot", // default
Publication: "arcana_pub", // default
Tables: []string{"users", "orders"}, // or empty for all
StandbyTimeout: 10 * time.Second, // default
}),
// ...
})
WALListener features:
Note: You can combine modes. When using PGNotifyListener or WALListener as the ChangeDetector, explicit engine.Notify() calls still work — both paths feed into the same invalidation pipeline.
The Transport interface abstracts message delivery to clients:
type Transport interface {
SendToSeance(ctx context.Context, seanceID string, msg Message) error
SendToWorkspace(ctx context.Context, workspaceID string, msg Message) error
DisconnectSeance(ctx context.Context, seanceID string) error
}
transport := arcana.NewCentrifugoTransport(arcana.CentrifugoConfig{
APIURL: "http://localhost:8000", // Base URL (without /api — appended automatically)
APIKey: "your-centrifugo-api-key",
HTTPClient: &http.Client{Timeout: 5 * time.Second}, // optional
Retries: 3, // optional, default: 0
})
Important: The Centrifugo transport appends /api to the APIURL internally. Pass the base URL (e.g., http://localhost:8000), not http://localhost:8000/api.
Features:
/api/batchworkspace:{id} for table diffs, views:{seanceID} for view diffsImplement the Transport interface for any delivery mechanism (raw WebSocket, SSE, gRPC streams):
type MyTransport struct{}
func (t *MyTransport) SendToSeance(ctx context.Context, seanceID string, msg arcana.Message) error {
// Deliver msg to the specific client session
return nil
}
func (t *MyTransport) SendToWorkspace(ctx context.Context, wsID string, msg arcana.Message) error {
// Broadcast msg to all clients in the workspace
return nil
}
func (t *MyTransport) DisconnectSeance(ctx context.Context, seanceID string) error {
// Force-disconnect a client session
return nil
}
Mount the handler on your router:
mux.Handle("/arcana/", http.StripPrefix("/arcana", engine.Handler()))
| Method | Path | Description |
|---|---|---|
POST |
/subscribe |
Subscribe to a graph view |
POST |
/unsubscribe |
Unsubscribe from a view |
POST |
/sync |
Reconnect sync (catch-up or snapshot) |
GET |
/active |
List active subscriptions for the current seance |
GET |
/schema |
Representation table (all tables/columns across graphs) |
GET |
/health |
Health check with engine stats |
All endpoints require authentication via AuthFunc. Responses use the envelope format: {"ok": true, "data": {...}} or {"ok": false, "error": "..."}.
POST /subscribe
{
"view": "user_list",
"params": {"org_id": "550e8400-...", "limit": 20}
}
Response:
{
"ok": true,
"data": {
"params_hash": "a1b2c3d4",
"version": 1,
"refs": [
{"table": "users", "id": "u1", "fields": ["id", "name", "email"]}
],
"tables": {
"users": {
"u1": {"id": "u1", "name": "Alice", "email": "[email protected]"}
}
},
"total": 150
}
}
// "total" is included when the factory calls result.SetTotal(n).
// Useful for paginated views where clients need the full count.
When a client reconnects, it sends its last known version for each view. The engine responds with either catch-up patches or a full snapshot:
POST /sync
{
"views": [
{"view": "user_list", "params_hash": "a1b2c3d4", "version": 3}
]
}
Response (catch-up mode):
{
"ok": true,
"data": {
"views": [{
"view": "user_list",
"params_hash": "a1b2c3d4",
"mode": "catch_up",
"patches": [
{"version": 4, "refs_patch": [...], "tables": {...}},
{"version": 5, "refs_patch": [...], "tables": {...}}
]
}]
}
}
Response (snapshot mode — version gap too large):
{
"ok": true,
"data": {
"views": [{
"view": "user_list",
"params_hash": "a1b2c3d4",
"mode": "snapshot",
"version": 42,
"refs": [...],
"tables": {...}
}]
}
}
The SnapshotThreshold config controls when sync falls back to a full snapshot (default: 50 versions).
arcana.Config{
// Required
Pool: arcana.PgxQuerier(pool), // PostgreSQL connection pool
Transport: transport, // Message delivery
// Authentication
AuthFunc: func(r *http.Request) (*arcana.Identity, error) { ... },
// Change detection (nil = ExplicitNotifier)
ChangeDetector: nil,
// Tuning
InvalidationDebounce: 50 * time.Millisecond, // Batch changes within window
MaxSubscriptionsPerSeance: 100, // Per-client subscription limit
SnapshotThreshold: 50, // Version gap for full snapshot vs catch-up
GCInterval: time.Minute, // Unreferenced row cleanup interval
}
arcana.ParamSchema{
"org_id": arcana.ParamUUID().Required(), // Required UUID
"limit": arcana.ParamInt().Default(50), // Optional int with default
"status": arcana.ParamString().OneOf("active", "archived").Build(), // Enum
"verbose": arcana.ParamBool().Default(false), // Optional boolean
"min_price": arcana.ParamFloat().Build(), // Optional float
}
Factory: func(ctx context.Context, q arcana.Querier, p arcana.Params) (*arcana.Result, error) {
orgID := p.UUID("org_id") // string
limit := p.Int("limit") // int (50 if not provided)
status := p.String("status") // string
verbose := p.Bool("verbose") // bool
minPrice := p.Float("min_price") // float64
raw := p.Raw() // map[string]any
// ...
}
resolved, err := arcana.ValidateParams(schema, rawInput)
// strict mode rejects unknown parameters:
resolved, err := arcana.ValidateParams(schema, rawInput, true)
result := arcana.NewResult()
// Add a normalized row (table, row_id, fields)
result.AddRow("users", "u1", map[string]any{
"id": "u1", "name": "Alice", "email": "[email protected]",
})
// Add a ref (defines view structure — which rows belong to this view)
result.AddRef(arcana.Ref{
Table: "users", ID: "u1",
Fields: []string{"id", "name", "email"},
})
// Nested refs for hierarchical data
result.AddRef(arcana.Ref{
Table: "orders", ID: "o1",
Fields: []string{"id", "total"},
Nested: map[string]arcana.Ref{
"customer": {Table: "users", ID: "u1", Fields: []string{"id", "name"}},
},
})
// Pagination: set total count for paginated results
result.SetTotal(150) // total matching rows (before LIMIT/OFFSET)
// Inspect
result.RowCount() // number of rows added
result.Refs() // all refs
result.Tables() // map[table]map[rowID]map[field]any
result.Total() // total count set via SetTotal (0 if not set)
arcana.ErrForbidden // 403 — permission denied
arcana.ErrNotFound // 404 — graph or subscription not found
arcana.ErrInvalidParams // 400 — parameter validation failed
arcana.ErrTooManySubscriptions // 429 — exceeded MaxSubscriptionsPerSeance
arcana.ErrAlreadyStarted // engine.Start() called twice
arcana.ErrNotStarted // engine.Stop() called before Start()
stats := engine.Stats()
// EngineStats{
// Running: true,
// RegisteredGraphs: 5,
// ActiveSubscriptions: 42,
// SeancesWithSubs: 12,
// DataStoreRows: 350,
// }
Generate type-safe TypeScript definitions from your Go graph registry:
go run ./cmd/arcana-gen -output ./sdk/generated/
Produces tables.d.ts (all table schemas) and views.d.ts (graph parameters and dependencies).
Arcana uses a Querier interface compatible with any SQL driver:
type Querier interface {
Query(ctx context.Context, sql string, args ...any) (Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) Row
}
For pgx v5:
pool, _ := pgxpool.New(ctx, connStr)
querier := arcana.PgxQuerier(pool)
arcana.go Engine: New, Start, Stop, Notify, Handler, Stats
config.go Configuration with sensible defaults
types.go Core types: GraphDef, Ref, PatchOp, Params, ParamSchema, Identity
result.go Factory result accumulator (AddRow, AddRef, SetTotal)
errors.go Exported error values
context.go Identity context helpers (WithIdentity, WorkspaceID, User)
registry.go Graph registry with inverted table->graph index
store.go 4-level normalized DataStore with RefCount GC
diff.go JSON Patch (RFC 6902) diff engine
invalidator.go Change -> Factory re-run -> diff -> transport pipeline
manager.go Subscription lifecycle, sync (catch-up/snapshot)
subscription.go Subscription with version history ring buffer
handler.go HTTP endpoints (/subscribe, /unsubscribe, /sync, /active, /schema, /health)
middleware.go Auth middleware for HTTP handler
transport.go Transport interface
transport_centrifugo.go Centrifugo HTTP API (publish, batch, retry with backoff)
change.go ChangeDetector interface
change_explicit.go Explicit notify (default, channel-based)
change_pgnotify.go PostgreSQL LISTEN/NOTIFY with auto-reconnect
change_wal.go PostgreSQL WAL logical replication (pgoutput)
pgnotify_triggers.go Auto-generate PostgreSQL trigger functions
pgx_adapter.go pgxpool.Pool -> Querier adapter
codegen.go TypeScript type generation
sdk/ TypeScript client SDK with Svelte 5 adapter
cmd/arcana-gen/ Codegen CLI tool
examples/basic/ Minimal working example
# Unit tests (fast, no Docker)
go test -short ./...
# Full suite including integration tests (requires Docker for testcontainers)
go test -race ./... -count=1
# Integration tests only
go test -run Integration -v -timeout 120s
See CONTRIBUTING.md for guidelines.
Artem Pryanishnikov