Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions plugins/replicator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Replicator Plugin

Pulls data from a configured **external data source** (e.g. a Postgres database
on Supabase) into the **internal Durable Object SQLite** so a StarbaseDB
instance can serve as a close-to-edge replica that can be queried instead of
hitting the upstream database directly.

Replication is a **pull** mechanism and is **append-only**: for each table you
register a monotonically increasing _tracking column_ (e.g. `id` or
`created_at`). On every sync the plugin pulls only the rows whose tracking
column is greater than the last value it has already seen, and upserts them
into the matching internal table with `INSERT OR REPLACE`.

## Requirements

An external data source must be configured (see the `EXTERNAL_DB_*` /
`HYPERDRIVE` settings in `wrangler.toml`). Without one, the management
endpoints still work, but `POST /replicator/sync` will report an error for
each table.

## Endpoints

All endpoints require an **admin** authorization token.

### `GET /replicator/tables`

Lists every table configured for replication, including the last synced
watermark (`last_value`) and `last_synced_at`.

### `POST /replicator/tables`

Registers (or updates) a table for replication.

```json
{
    "table": "orders",
    "schema": "public",
    "trackingColumn": "id",
    "intervalSeconds": 300,
    "batchSize": 1000,
    "isActive": true
}
```

- `table` (required) – name of the table in both the external and internal DB.
- `trackingColumn` (required) – append-only column used as the watermark.
- `schema` – optional schema name for the external table.
- `intervalSeconds` – how often the table should be polled (default `300`).
- `batchSize` – max rows pulled per sync (default `1000`).
- `isActive` – set to `false` to pause replication for the table.

### `DELETE /replicator/tables/:table`

Removes a table from replication. Existing internal data is left untouched.

### `POST /replicator/sync`

Triggers a sync immediately. Pass `?table=<name>` to sync a single table,
otherwise every active table is synced. Returns the number of rows replicated
per table. This endpoint can be driven by a Cloudflare Cron Trigger, the cron
plugin, or any external scheduler.

## Automatic polling

By default the plugin also syncs opportunistically: on incoming requests it
checks whether any table's `intervalSeconds` has elapsed since its last sync
and, if so, replicates it in the background (via `ctx.waitUntil`). To disable
this, register the plugin as `new ReplicatorPlugin({ autoSyncOnRequest: false })`
in `src/index.ts` and rely solely on the `/replicator/sync` endpoint instead.
247 changes: 247 additions & 0 deletions plugins/replicator/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import { ReplicatorPlugin, ReplicatedTable } from './index'
import { executeQuery } from '../../src/operation'
import { DataSource } from '../../src/types'

// Replace the shared operation module with a stub whose `executeQuery` is a
// bare `vi.fn()`, so each test scripts what that call returns or throws.
vi.mock('../../src/operation', () => ({
executeQuery: vi.fn(),
}))

// Typed handle on the mocked `executeQuery`, used to program its results
// (`mockResolvedValue` / `mockRejectedValue`) and to assert on its calls.
const mockedExecuteQuery = vi.mocked(executeQuery)

// Shared fixtures; all three are reassigned in `beforeEach` before every test.
let plugin: ReplicatorPlugin
// Mock wired in as `dataSource.rpc.executeQuery`.
let internalQuery: ReturnType<typeof vi.fn>
let dataSource: DataSource

/**
 * Builds a `ReplicatedTable` fixture for the tests below.
 *
 * The baseline row describes an active `orders` table tracked by `id`, with a
 * 300 second interval, a batch size of 1000, and no watermark or sync
 * timestamp yet.
 *
 * @param overrides - Fields that should replace the baseline values.
 * @returns A new object with the baseline values and `overrides` applied on top.
 */
function makeTable(overrides: Partial<ReplicatedTable> = {}): ReplicatedTable {
    const baseline: ReplicatedTable = {
        table_name: 'orders',
        source_schema: null,
        tracking_column: 'id',
        last_value: null,
        interval_seconds: 300,
        batch_size: 1000,
        is_active: 1,
        last_synced_at: null,
    }

    return { ...baseline, ...overrides }
}

// Rebuilds every shared fixture so each test starts from a clean plugin, a
// fresh internal-query mock, and cleared mock call history.
beforeEach(() => {
    vi.clearAllMocks()

    // Internal queries resolve to an empty result set unless a test overrides it.
    internalQuery = vi.fn().mockResolvedValue([])

    const stubbedSource = {
        rpc: { executeQuery: internalQuery },
        source: 'internal',
        external: { dialect: 'postgresql' },
    }
    dataSource = stubbedSource as unknown as DataSource

    plugin = new ReplicatorPlugin()

    // Inject the data source the way the register() middleware would.
    const pluginInternals = plugin as any
    pluginInternals.dataSource = dataSource
    pluginInternals.config = { role: 'admin' }
})

// Checks the identity values a freshly constructed plugin exposes.
describe('ReplicatorPlugin - initialization', () => {
    it('registers with the expected name and route prefix', () => {
        const pluginName = plugin.name
        expect(pluginName).toBe('starbasedb:replicator')

        const routePrefix = plugin.pathPrefix
        expect(routePrefix).toBe('/replicator')

        const authRequired = plugin.opts.requiresAuth
        expect(authRequired).toBe(true)
    })
})

// Identifier quoting: double quotes unless the dialect is mysql, and embedded
// quote characters are doubled.
describe('ReplicatorPlugin - quoteIdentifier', () => {
    it('uses double quotes by default', () => {
        const quoted = plugin.quoteIdentifier('orders')
        expect(quoted).toBe('"orders"')
    })

    it('uses backticks for mysql', () => {
        const quoted = plugin.quoteIdentifier('orders', 'mysql')
        expect(quoted).toBe('`orders`')
    })

    it('escapes embedded quote characters', () => {
        const defaultQuoted = plugin.quoteIdentifier('we"ird')
        expect(defaultQuoted).toBe('"we""ird"')

        const mysqlQuoted = plugin.quoteIdentifier('we`ird', 'mysql')
        expect(mysqlQuoted).toBe('`we``ird`')
    })
})

// Literal rendering: bare numbers, single-quoted strings with doubled quotes,
// and NULL for missing values.
describe('ReplicatorPlugin - quoteLiteral', () => {
    it('emits numbers bare', () => {
        const rendered = plugin.quoteLiteral(42)
        expect(rendered).toBe('42')
    })

    it('single-quotes strings and escapes quotes', () => {
        const rendered = plugin.quoteLiteral("O'Brien")
        expect(rendered).toBe("'O''Brien'")
    })

    it('renders null/undefined as NULL', () => {
        const fromNull = plugin.quoteLiteral(null)
        expect(fromNull).toBe('NULL')

        const fromUndefined = plugin.quoteLiteral(undefined)
        expect(fromUndefined).toBe('NULL')
    })
})

// Pins the exact SELECT text generated for the external (postgresql) source.
describe('ReplicatorPlugin - buildSelectQuery', () => {
    it('selects all rows when there is no watermark yet', () => {
        const table = makeTable()
        const expected = 'SELECT * FROM "orders" ORDER BY "id" ASC LIMIT 1000'

        expect(plugin.buildSelectQuery(table, 'postgresql')).toBe(expected)
    })

    it('filters by the tracking column once a watermark exists', () => {
        const table = makeTable({ last_value: '100', batch_size: 50 })
        const generated = plugin.buildSelectQuery(table, 'postgresql')

        // The stored watermark is rendered as a quoted literal.
        const expected =
            'SELECT * FROM "orders" WHERE "id" > \'100\' ORDER BY "id" ASC LIMIT 50'
        expect(generated).toBe(expected)
    })

    it('qualifies the table with its schema when provided', () => {
        const table = makeTable({ source_schema: 'public' })
        const generated = plugin.buildSelectQuery(table, 'postgresql')

        const expected =
            'SELECT * FROM "public"."orders" ORDER BY "id" ASC LIMIT 1000'
        expect(generated).toBe(expected)
    })
})

// Pins the upsert statement text and the order of its bound parameters.
describe('ReplicatorPlugin - buildUpsertQuery', () => {
    it('builds a parameterized INSERT OR REPLACE statement', () => {
        const row = { id: 1, name: 'Alice' }

        const statement = plugin.buildUpsertQuery('orders', row)

        // One placeholder per column, values bound in column order.
        expect(statement.sql).toBe(
            'INSERT OR REPLACE INTO "orders" ("id", "name") VALUES (?, ?)'
        )
        expect(statement.params).toEqual([1, 'Alice'])
    })
})

// Scheduling decisions made by `isDue`, driven by `is_active`,
// `interval_seconds` and `last_synced_at`.
describe('ReplicatorPlugin - isDue', () => {
    // Formats an epoch-millisecond instant as `YYYY-MM-DD HH:MM:SS` (UTC) by
    // trimming the ISO string: 'T' becomes a space, fractional seconds and the
    // trailing 'Z' are dropped.
    const toStoredTimestamp = (epochMs: number): string => {
        const iso = new Date(epochMs).toISOString()
        return iso.replace('T', ' ').replace(/\.\d+Z$/, '')
    }

    it('is due when a table has never synced', () => {
        const neverSynced = makeTable()
        expect(plugin.isDue(neverSynced)).toBe(true)
    })

    it('is not due when the interval has not elapsed', () => {
        const now = Date.now()
        // Synced one minute ago against a five minute interval.
        const recentlySynced = makeTable({
            interval_seconds: 300,
            last_synced_at: toStoredTimestamp(now - 60_000),
        })

        expect(plugin.isDue(recentlySynced, now)).toBe(false)
    })

    it('is due once the interval has elapsed', () => {
        const now = Date.now()
        // Synced two minutes ago against a one minute interval.
        const staleTable = makeTable({
            interval_seconds: 60,
            last_synced_at: toStoredTimestamp(now - 120_000),
        })

        expect(plugin.isDue(staleTable, now)).toBe(true)
    })

    it('is never due when the table is inactive', () => {
        const paused = makeTable({ is_active: 0 })
        expect(plugin.isDue(paused)).toBe(false)
    })
})

// Checks the parameters `registerTable` binds on its single internal query.
describe('ReplicatorPlugin - registerTable', () => {
    it('upserts the table configuration with defaults', async () => {
        const minimalOptions = { table: 'orders', trackingColumn: 'id' }

        await plugin.registerTable(minimalOptions)

        expect(internalQuery).toHaveBeenCalledTimes(1)

        // Omitted options fall back to: no schema, 300s interval,
        // batch size 1000, active flag 1.
        const boundQuery = internalQuery.mock.calls[0][0]
        expect(boundQuery.params).toEqual(['orders', null, 'id', 300, 1000, 1])
    })

    it('honors provided options', async () => {
        const explicitOptions = {
            table: 'orders',
            schema: 'public',
            trackingColumn: 'created_at',
            intervalSeconds: 30,
            batchSize: 10,
            isActive: false,
        }

        await plugin.registerTable(explicitOptions)

        // `isActive: false` is bound as the integer 0.
        const expectedParams = ['orders', 'public', 'created_at', 30, 10, 0]
        const boundQuery = internalQuery.mock.calls[0][0]
        expect(boundQuery.params).toEqual(expectedParams)
    })
})

// Single-table sync: rows returned by the mocked external query are upserted
// internally and the watermark is then persisted.
describe('ReplicatorPlugin - syncTable', () => {
    it('pulls external rows, upserts them and advances the watermark', async () => {
        const externalRows = [
            { id: 1, name: 'Alice' },
            { id: 2, name: 'Bob' },
        ]
        mockedExecuteQuery.mockResolvedValue(externalRows as any)

        const outcome = await plugin.syncTable(makeTable())

        expect(outcome.rowsReplicated).toBe(2)
        expect(outcome.lastValue).toBe('2')

        // 2 upserts + 1 progress update on the internal database.
        expect(internalQuery).toHaveBeenCalledTimes(3)

        const internalCalls = internalQuery.mock.calls
        const firstUpsert = internalCalls[0][0]
        expect(firstUpsert.sql).toContain('INSERT OR REPLACE INTO "orders"')
        expect(firstUpsert.params).toEqual([1, 'Alice'])

        const progressUpdate = internalCalls[2][0]
        expect(progressUpdate.params).toEqual(['2', 'orders'])
    })

    it('keeps the previous watermark when no new rows are returned', async () => {
        mockedExecuteQuery.mockResolvedValue([] as any)
        const alreadySynced = makeTable({ last_value: '99' })

        const outcome = await plugin.syncTable(alreadySynced)

        expect(outcome.rowsReplicated).toBe(0)
        expect(outcome.lastValue).toBe('99')

        // Only the progress update runs.
        expect(internalQuery).toHaveBeenCalledTimes(1)
        const progressUpdate = internalQuery.mock.calls[0][0]
        expect(progressUpdate.params).toEqual(['99', 'orders'])
    })
})

// Multi-table sync: a failing table is reported in its own result, and
// inactive tables are skipped.
describe('ReplicatorPlugin - sync', () => {
    it('captures per-table errors instead of failing the whole run', async () => {
        // The first internal query resolves to one configured table
        // (presumably the table listing that sync() reads); the external
        // pull then rejects.
        const configuredTables = [makeTable()]
        internalQuery.mockResolvedValueOnce(configuredTables)
        mockedExecuteQuery.mockRejectedValue(new Error('connection refused'))

        const outcomes = await plugin.sync()

        expect(outcomes).toHaveLength(1)
        const [failed] = outcomes
        expect(failed.error).toBe('connection refused')
        expect(failed.rowsReplicated).toBe(0)
    })

    it('skips inactive tables when syncing everything', async () => {
        const pausedTable = makeTable({ table_name: 'orders', is_active: 0 })
        internalQuery.mockResolvedValueOnce([pausedTable])

        const outcomes = await plugin.sync()

        expect(outcomes).toHaveLength(0)
        // The external source must not be touched for a paused table.
        expect(mockedExecuteQuery).not.toHaveBeenCalled()
    })
})
Loading
Loading