diff --git a/.changeset/state-libsql.md b/.changeset/state-libsql.md new file mode 100644 index 00000000..ef6454ea --- /dev/null +++ b/.changeset/state-libsql.md @@ -0,0 +1,5 @@ +--- +"@chat-adapter/state-libsql": minor +--- + +Add `@chat-adapter/state-libsql`: libSQL / Turso state adapter backed by `@libsql/client`. Supports both local SQLite files (`file:`) and remote libSQL / Turso servers (`libsql:`/`http(s):`/`ws(s):`) with the same API. diff --git a/knip.json b/knip.json index eab64222..7b38df93 100644 --- a/knip.json +++ b/knip.json @@ -6,5 +6,10 @@ "rules": { "duplicates": "off", "types": "off" + }, + "workspaces": { + "packages/state-libsql": { + "ignoreDependencies": ["libsql", "@libsql/client"] + } } } diff --git a/packages/integration-tests/src/documentation-test-utils.ts b/packages/integration-tests/src/documentation-test-utils.ts index 40409a72..5a1290f6 100644 --- a/packages/integration-tests/src/documentation-test-utils.ts +++ b/packages/integration-tests/src/documentation-test-utils.ts @@ -23,11 +23,13 @@ export const VALID_PACKAGE_README_IMPORTS = [ "@chat-adapter/state-ioredis", "@chat-adapter/state-pg", "@chat-adapter/state-memory", + "@chat-adapter/state-libsql", "next/server", "redis", "ioredis", "pg", "postgres", + "@libsql/client", ]; export const VALID_DOC_PACKAGES = [ diff --git a/packages/state-libsql/CHANGELOG.md b/packages/state-libsql/CHANGELOG.md new file mode 100644 index 00000000..fc8621b5 --- /dev/null +++ b/packages/state-libsql/CHANGELOG.md @@ -0,0 +1 @@ +# @chat-adapter/state-libsql diff --git a/packages/state-libsql/README.md b/packages/state-libsql/README.md new file mode 100644 index 00000000..64016311 --- /dev/null +++ b/packages/state-libsql/README.md @@ -0,0 +1,175 @@ +# @chat-adapter/state-libsql + +[![npm version](https://img.shields.io/npm/v/@chat-adapter/state-libsql)](https://www.npmjs.com/package/@chat-adapter/state-libsql) +[![npm 
downloads](https://img.shields.io/npm/dm/@chat-adapter/state-libsql)](https://www.npmjs.com/package/@chat-adapter/state-libsql) + +libSQL / Turso state adapter for [Chat SDK](https://chat-sdk.dev). Ships two variants — import the one that matches your runtime. + +| Import path | Driver | Best for | +|---|---|---| +| `@chat-adapter/state-libsql` | [`libsql`](https://www.npmjs.com/package/libsql) (native binding) | Node. Fast local file access, also supports remote libSQL / Turso. | +| `@chat-adapter/state-libsql/client` | [`@libsql/client`](https://www.npmjs.com/package/@libsql/client) (pure JS) | Edge / serverless (Vercel) where native modules aren't available. | + +Both entry points expose the same chat-sdk `StateAdapter` surface with `createLibSqlState` and `LibSqlStateAdapter`. + +## Installation + +Install the package plus **one** of the two drivers: + +```bash +# Node / local-file primary +pnpm add @chat-adapter/state-libsql libsql + +# edge / Turso-primary +pnpm add @chat-adapter/state-libsql @libsql/client +``` + +Both drivers are declared `optional` peer dependencies — pnpm / npm won't complain if you only install one. + +## Usage + +### Node + local file + +```typescript +import { Chat } from "chat"; +import { createLibSqlState } from "@chat-adapter/state-libsql"; + +const bot = new Chat({ + userName: "mybot", + adapters: { /* ... 
*/ }, + state: createLibSqlState({ url: "file:./chat-state.db" }), +}); +``` + +### Edge / serverless + Turso + +```typescript +import { createLibSqlState } from "@chat-adapter/state-libsql/client"; + +const state = createLibSqlState({ + url: "libsql://your-db.turso.io", + authToken: process.env.TURSO_AUTH_TOKEN, +}); +``` + +### Auto-detect via env vars + +Both variants read `TURSO_DATABASE_URL` and `TURSO_AUTH_TOKEN` if no options are provided: + +```typescript +const state = createLibSqlState(); // uses TURSO_DATABASE_URL / TURSO_AUTH_TOKEN +``` + +### Injecting your own client + +Native: + +```typescript +import Database from "libsql/promise"; +import { createLibSqlState } from "@chat-adapter/state-libsql"; + +const db = new Database("file:./chat-state.db", {}); +const state = createLibSqlState({ client: db }); +``` + +`@libsql/client`: + +```typescript +import { createClient } from "@libsql/client"; +import { createLibSqlState } from "@chat-adapter/state-libsql/client"; + +const client = createClient({ + url: process.env.TURSO_DATABASE_URL!, + authToken: process.env.TURSO_AUTH_TOKEN, +}); +const state = createLibSqlState({ client }); +``` + +## Configuration + +Both entry points accept the same core options: + +| Option | Required | Description | +|--------|----------|-------------| +| `url` | No* | libSQL connection URL / path | +| `authToken` | No | Auth token for remote libSQL / Turso | +| `client` | No | Existing driver client instance | +| `keyPrefix` | No | Prefix for all state rows (default: `"chat-sdk"`) | +| `logger` | No | Logger instance (defaults to `ConsoleLogger("info").child("libsql")`) | + +*Either `url`, the `TURSO_DATABASE_URL` env var, or `client` is required. + +The default entry additionally accepts `syncUrl`, `syncPeriod`, `encryptionKey`, `offline`, `timeout`. +The `/client` entry additionally accepts a `config` pass-through for `@libsql/client` (`encryptionKey`, `syncUrl`, `intMode`, `tls`, …). 
+ +### URL schemes + +| Scheme | Default (`libsql`) | `/client` (`@libsql/client`) | +|--------|:------------------:|:----------------------------:| +| `file:...` | ✅ | ✅ | +| `:memory:` | ✅ | — | +| `libsql:...` | ✅ | ✅ | +| `http(s)://...` | ✅ | ✅ | +| `ws(s)://...` | — | ✅ | + +Always prefix local paths with `file:` — both drivers accept it, and it keeps your config portable if you later switch entry points. + +## Environment variables + +```bash +# Local file (works with both entries) +TURSO_DATABASE_URL=file:./chat-state.db + +# or remote libSQL / Turso +TURSO_DATABASE_URL=libsql://your-db.turso.io +TURSO_AUTH_TOKEN=your-token +``` + +## Data model + +The adapter creates these tables automatically on `connect()`: + +``` +chat_state_subscriptions +chat_state_locks +chat_state_cache +chat_state_lists +chat_state_queues +``` + +All rows are namespaced by `key_prefix`. Timestamps are stored as millisecond integers. + +## Features + +| Feature | Supported | +|---------|-----------| +| Persistence | Yes | +| Multi-instance | Yes (remote mode) | +| Subscriptions | Yes | +| Distributed locking | Yes | +| Key-value caching | Yes (with TTL) | +| Message queue | Yes | +| Ordered lists | Yes | +| Automatic table creation | Yes | +| Key prefix namespacing | Yes | + +## Locking considerations + +Lock acquisition runs inside a write transaction that clears any expired lock and then performs `INSERT ... ON CONFLICT DO NOTHING RETURNING`. This gives atomic compare-and-set semantics against both local SQLite files and remote libSQL / Turso servers. + +For multi-instance deployments, use a remote libSQL / Turso URL — a local file database only coordinates processes on the same host. + +## Expired row cleanup + +SQLite does not expire rows automatically. The adapter performs opportunistic cleanup on every relevant operation (`get`, `getList`, `dequeue`, lock acquisition). 
For long-running deployments you may want to run a periodic cleanup: + +```sql +DELETE FROM chat_state_locks WHERE expires_at <= strftime('%s','now') * 1000; +DELETE FROM chat_state_cache WHERE expires_at IS NOT NULL AND expires_at <= strftime('%s','now') * 1000; +DELETE FROM chat_state_queues WHERE expires_at <= strftime('%s','now') * 1000; +DELETE FROM chat_state_lists WHERE expires_at IS NOT NULL AND expires_at <= strftime('%s','now') * 1000; +``` + +## License + +MIT diff --git a/packages/state-libsql/package.json b/packages/state-libsql/package.json new file mode 100644 index 00000000..0e9e13e0 --- /dev/null +++ b/packages/state-libsql/package.json @@ -0,0 +1,75 @@ +{ + "name": "@chat-adapter/state-libsql", + "version": "0.0.0", + "description": "libSQL / Turso state adapter for chat (local file or remote)", + "type": "module", + "main": "./dist/index.js", + "module": "./dist/index.js", + "types": "./dist/index.d.ts", + "sideEffects": false, + "exports": { + ".": { + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + }, + "./client": { + "types": "./dist/client.d.ts", + "import": "./dist/client.js" + } + }, + "files": [ + "dist" + ], + "scripts": { + "build": "tsup", + "dev": "tsup --watch", + "test": "vitest run --coverage", + "test:watch": "vitest", + "typecheck": "tsc --noEmit", + "clean": "rm -rf dist" + }, + "dependencies": { + "chat": "workspace:*" + }, + "peerDependencies": { + "@libsql/client": ">=0.15.0", + "libsql": ">=0.5.0" + }, + "peerDependenciesMeta": { + "@libsql/client": { + "optional": true + }, + "libsql": { + "optional": true + } + }, + "repository": { + "type": "git", + "url": "git+https://github.com/vercel/chat.git", + "directory": "packages/state-libsql" + }, + "homepage": "https://github.com/vercel/chat#readme", + "bugs": { + "url": "https://github.com/vercel/chat/issues" + }, + "publishConfig": { + "access": "public" + }, + "devDependencies": { + "@libsql/client": "^0.15.15", + "@types/node": "^25.3.2", + 
"@vitest/coverage-v8": "^4.0.18", + "libsql": "^0.5.29", + "tsup": "^8.3.5", + "typescript": "^5.7.2", + "vitest": "^4.0.18" + }, + "keywords": [ + "chat", + "state", + "libsql", + "turso", + "sqlite" + ], + "license": "MIT" +} diff --git a/packages/state-libsql/src/client.test.ts b/packages/state-libsql/src/client.test.ts new file mode 100644 index 00000000..b0bf205c --- /dev/null +++ b/packages/state-libsql/src/client.test.ts @@ -0,0 +1,516 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { type Client, createClient } from "@libsql/client"; +import type { Lock, Logger } from "chat"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createLibSqlState, LibSqlStateAdapter } from "./client"; + +const mockLogger: Logger = { + child: vi.fn(() => mockLogger), + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +const LIBSQL_TOKEN_RE = /^libsql_/; + +function tmpFileUrl(): { url: string; cleanup: () => void } { + const dir = mkdtempSync(join(tmpdir(), "chat-libsql-client-")); + const file = join(dir, "state.db"); + return { + url: `file:${file}`, + cleanup: () => rmSync(dir, { recursive: true, force: true }), + }; +} + +function makeTmpClient(): { client: Client; cleanup: () => void } { + const { url, cleanup } = tmpFileUrl(); + const client = createClient({ url }); + return { + client, + cleanup: () => { + if (!client.closed) { + client.close(); + } + cleanup(); + }, + }; +} + +describe("LibSqlStateAdapter (@libsql/client)", () => { + it("exports createLibSqlState", () => { + expect(typeof createLibSqlState).toBe("function"); + }); + + it("exports LibSqlStateAdapter", () => { + expect(typeof LibSqlStateAdapter).toBe("function"); + }); + + describe("createLibSqlState", () => { + it("creates an adapter from a file URL", () => { + const { url, cleanup } = tmpFileUrl(); + try { + const adapter = createLibSqlState({ url, logger: 
mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + cleanup(); + } + }); + + it("accepts an existing Client", () => { + const { client, cleanup } = makeTmpClient(); + try { + const adapter = createLibSqlState({ client, logger: mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + cleanup(); + } + }); + + it("throws when no url or TURSO_DATABASE_URL is available", () => { + vi.stubEnv("TURSO_DATABASE_URL", ""); + try { + expect(() => createLibSqlState({ logger: mockLogger })).toThrow( + "libSQL url is required" + ); + } finally { + vi.unstubAllEnvs(); + } + }); + + it("uses TURSO_DATABASE_URL env var as fallback", () => { + const { url, cleanup } = tmpFileUrl(); + vi.stubEnv("TURSO_DATABASE_URL", url); + try { + const adapter = createLibSqlState({ logger: mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + vi.unstubAllEnvs(); + cleanup(); + } + }); + + describe("URL types", () => { + it("connects to a file: URL", async () => { + const { url, cleanup } = tmpFileUrl(); + try { + const adapter = createLibSqlState({ url, logger: mockLogger }); + await adapter.connect(); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.disconnect(); + } finally { + cleanup(); + } + }); + + it.each([ + ["libsql://db.turso.io"], + ["https://db.turso.io"], + ["http://127.0.0.1:8080"], + ["wss://db.turso.io"], + ["ws://127.0.0.1:8080"], + ])("accepts remote URL %s", (url) => { + const adapter = createLibSqlState({ + url, + authToken: "tok", + logger: mockLogger, + }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + }); + }); + }); + + describe("ensureConnected", () => { + let client: Client; + let cleanup: () => void; + + beforeEach(() => { + ({ client, cleanup } = makeTmpClient()); + }); + + afterEach(() => { + cleanup(); + }); + + it.each([ + ["subscribe", (a: LibSqlStateAdapter) => a.subscribe("t1")], + 
["unsubscribe", (a: LibSqlStateAdapter) => a.unsubscribe("t1")], + ["isSubscribed", (a: LibSqlStateAdapter) => a.isSubscribed("t1")], + ["acquireLock", (a: LibSqlStateAdapter) => a.acquireLock("t1", 5000)], + ["get", (a: LibSqlStateAdapter) => a.get("key")], + ["set", (a: LibSqlStateAdapter) => a.set("key", "value")], + [ + "setIfNotExists", + (a: LibSqlStateAdapter) => a.setIfNotExists("key", "value"), + ], + ["delete", (a: LibSqlStateAdapter) => a.delete("key")], + [ + "appendToList", + (a: LibSqlStateAdapter) => a.appendToList("list", "value"), + ], + ["getList", (a: LibSqlStateAdapter) => a.getList("list")], + [ + "enqueue", + (a: LibSqlStateAdapter) => + a.enqueue( + "t1", + { + message: { id: "m1" }, + enqueuedAt: 0, + expiresAt: 1, + } as never, + 10 + ), + ], + ["dequeue", (a: LibSqlStateAdapter) => a.dequeue("t1")], + ["queueDepth", (a: LibSqlStateAdapter) => a.queueDepth("t1")], + ])("throws when calling %s before connect", async (_, fn) => { + const adapter = new LibSqlStateAdapter({ client, logger: mockLogger }); + await expect(fn(adapter)).rejects.toThrow("not connected"); + }); + + it("throws for releaseLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ client, logger: mockLogger }); + const lock: Lock = { + threadId: "t1", + token: "tok", + expiresAt: Date.now(), + }; + await expect(adapter.releaseLock(lock)).rejects.toThrow("not connected"); + }); + + it("throws for extendLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ client, logger: mockLogger }); + const lock: Lock = { + threadId: "t1", + token: "tok", + expiresAt: Date.now(), + }; + await expect(adapter.extendLock(lock, 5000)).rejects.toThrow( + "not connected" + ); + }); + + it("throws for forceReleaseLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ client, logger: mockLogger }); + await expect(adapter.forceReleaseLock("t1")).rejects.toThrow( + "not connected" + ); + }); + }); + + describe("with a real 
libsql file client", () => { + let client: Client; + let cleanupClient: () => void; + let adapter: LibSqlStateAdapter; + + beforeEach(async () => { + ({ client, cleanup: cleanupClient } = makeTmpClient()); + adapter = new LibSqlStateAdapter({ client, logger: mockLogger }); + await adapter.connect(); + }); + + afterEach(async () => { + await adapter.disconnect(); + cleanupClient(); + }); + + describe("connect / disconnect", () => { + it("is idempotent on connect", async () => { + await adapter.connect(); + await adapter.connect(); + }); + + it("deduplicates concurrent connect calls", async () => { + const { client: c, cleanup } = makeTmpClient(); + try { + const a = new LibSqlStateAdapter({ client: c, logger: mockLogger }); + await Promise.all([a.connect(), a.connect()]); + await a.disconnect(); + } finally { + cleanup(); + } + }); + + it("is idempotent on disconnect", async () => { + await adapter.disconnect(); + await adapter.disconnect(); + await adapter.connect(); + }); + + it("does not close external client on disconnect", async () => { + await adapter.disconnect(); + const rs = await client.execute("SELECT 1 AS v"); + expect(rs.rows[0].v).toBe(1); + await adapter.connect(); + }); + + it("closes owned client on disconnect", async () => { + const { url, cleanup } = tmpFileUrl(); + try { + const a = createLibSqlState({ url, logger: mockLogger }); + await a.connect(); + const innerClient = a.getClient(); + await a.disconnect(); + expect(innerClient.closed).toBe(true); + } finally { + cleanup(); + } + }); + + it("handles connect failure and allows retry", async () => { + const { client: broken, cleanup } = makeTmpClient(); + try { + broken.close(); + const a = new LibSqlStateAdapter({ + client: broken, + logger: mockLogger, + }); + await expect(a.connect()).rejects.toThrow(); + expect(mockLogger.error).toHaveBeenCalled(); + await expect(a.connect()).rejects.toThrow(); + } finally { + cleanup(); + } + }); + }); + + describe("subscriptions", () => { + it("round-trips 
subscribe / isSubscribed / unsubscribe", async () => { + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(false); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.unsubscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(false); + }); + + it("isolates subscriptions by keyPrefix", async () => { + const other = new LibSqlStateAdapter({ + client, + keyPrefix: "other", + logger: mockLogger, + }); + await other.connect(); + await adapter.subscribe("t1"); + expect(await other.isSubscribed("t1")).toBe(false); + await other.disconnect(); + }); + }); + + describe("locking", () => { + it("acquires a lock with a token and expiry", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + expect(lock?.threadId).toBe("t1"); + expect(lock?.token).toMatch(LIBSQL_TOKEN_RE); + expect(lock?.expiresAt).toBeGreaterThan(Date.now()); + }); + + it("returns null when the lock is held", async () => { + const first = await adapter.acquireLock("t1", 5000); + expect(first).not.toBeNull(); + const second = await adapter.acquireLock("t1", 5000); + expect(second).toBeNull(); + }); + + it("allows reacquiring an expired lock", async () => { + const first = await adapter.acquireLock("t1", 1); + expect(first).not.toBeNull(); + await new Promise((r) => setTimeout(r, 10)); + const second = await adapter.acquireLock("t1", 5000); + expect(second).not.toBeNull(); + expect(second?.token).not.toBe(first?.token); + }); + + it("releases a lock only with the right token", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + await adapter.releaseLock({ + threadId: "t1", + token: "wrong", + expiresAt: lock?.expiresAt ?? 
0, + }); + expect(await adapter.acquireLock("t1", 5000)).toBeNull(); + if (lock) { + await adapter.releaseLock(lock); + } + expect(await adapter.acquireLock("t1", 5000)).not.toBeNull(); + }); + + it("extends a lock when the token matches", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + if (!lock) { + return; + } + const extended = await adapter.extendLock(lock, 10_000); + expect(extended).toBe(true); + }); + + it("returns false when extending with the wrong token", async () => { + await adapter.acquireLock("t1", 5000); + const extended = await adapter.extendLock( + { threadId: "t1", token: "nope", expiresAt: Date.now() + 5000 }, + 5000 + ); + expect(extended).toBe(false); + }); + + it("force-releases a lock without checking token", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + await adapter.forceReleaseLock("t1"); + expect(await adapter.acquireLock("t1", 5000)).not.toBeNull(); + }); + }); + + describe("cache", () => { + it("round-trips JSON values", async () => { + await adapter.set("key", { foo: "bar" }); + expect(await adapter.get("key")).toEqual({ foo: "bar" }); + }); + + it("returns null on miss", async () => { + expect(await adapter.get("missing")).toBeNull(); + }); + + it("respects TTL", async () => { + await adapter.set("key", "value", 1); + await new Promise((r) => setTimeout(r, 10)); + expect(await adapter.get("key")).toBeNull(); + }); + + it("setIfNotExists inserts only when absent", async () => { + expect(await adapter.setIfNotExists("key", "first")).toBe(true); + expect(await adapter.setIfNotExists("key", "second")).toBe(false); + expect(await adapter.get("key")).toBe("first"); + }); + + it("setIfNotExists succeeds after TTL expiry", async () => { + expect(await adapter.setIfNotExists("key", "first", 1)).toBe(true); + await new Promise((r) => setTimeout(r, 10)); + expect(await adapter.setIfNotExists("key", "second")).toBe(true); + expect(await 
adapter.get("key")).toBe("second"); + }); + + it("delete removes a value", async () => { + await adapter.set("key", "value"); + await adapter.delete("key"); + expect(await adapter.get("key")).toBeNull(); + }); + }); + + describe("lists", () => { + it("appends values and returns them in insertion order", async () => { + await adapter.appendToList("mylist", { id: 1 }); + await adapter.appendToList("mylist", { id: 2 }); + await adapter.appendToList("mylist", { id: 3 }); + expect(await adapter.getList("mylist")).toEqual([ + { id: 1 }, + { id: 2 }, + { id: 3 }, + ]); + }); + + it("trims to maxLength, keeping newest", async () => { + for (let i = 1; i <= 5; i++) { + await adapter.appendToList("mylist", { id: i }, { maxLength: 3 }); + } + expect(await adapter.getList("mylist")).toEqual([ + { id: 3 }, + { id: 4 }, + { id: 5 }, + ]); + }); + + it("expires the whole list when TTL passes", async () => { + await adapter.appendToList("mylist", { id: 1 }, { ttlMs: 1 }); + await new Promise((r) => setTimeout(r, 10)); + expect(await adapter.getList("mylist")).toEqual([]); + }); + + it("returns empty for unknown keys", async () => { + expect(await adapter.getList("nope")).toEqual([]); + }); + }); + + describe("queue", () => { + const makeEntry = (id: string, offsetMs = 90_000) => ({ + message: { id }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + offsetMs, + }); + + it("enqueues and dequeues in FIFO order", async () => { + await adapter.enqueue("t1", makeEntry("m1") as never, 10); + await adapter.enqueue("t1", makeEntry("m2") as never, 10); + await adapter.enqueue("t1", makeEntry("m3") as never, 10); + + const a = await adapter.dequeue("t1"); + const b = await adapter.dequeue("t1"); + const c = await adapter.dequeue("t1"); + const d = await adapter.dequeue("t1"); + + expect(a?.message.id).toBe("m1"); + expect(b?.message.id).toBe("m2"); + expect(c?.message.id).toBe("m3"); + expect(d).toBeNull(); + }); + + it("returns current depth from enqueue", async () => { + const d1 = 
await adapter.enqueue("t1", makeEntry("m1") as never, 10); + const d2 = await adapter.enqueue("t1", makeEntry("m2") as never, 10); + expect(d1).toBe(1); + expect(d2).toBe(2); + }); + + it("trims to maxSize, keeping newest entries", async () => { + for (let i = 1; i <= 5; i++) { + await adapter.enqueue("t1", makeEntry(`m${i}`) as never, 3); + } + expect(await adapter.queueDepth("t1")).toBe(3); + const entries: (string | undefined)[] = []; + let next = await adapter.dequeue("t1"); + while (next) { + entries.push(next.message.id as string); + next = await adapter.dequeue("t1"); + } + expect(entries).toEqual(["m3", "m4", "m5"]); + }); + + it("drops expired entries", async () => { + await adapter.enqueue("t1", makeEntry("old", 1) as never, 10); + await new Promise((r) => setTimeout(r, 10)); + await adapter.enqueue("t1", makeEntry("fresh") as never, 10); + const entry = await adapter.dequeue("t1"); + expect(entry?.message.id).toBe("fresh"); + }); + + it("queueDepth returns 0 for empty queues", async () => { + expect(await adapter.queueDepth("nobody")).toBe(0); + }); + }); + + describe("getClient", () => { + it("returns the underlying libsql Client", () => { + expect(adapter.getClient()).toBe(client); + }); + }); + }); + + describe.skip("integration tests against TURSO_DATABASE_URL", () => { + it("connects to the configured remote", async () => { + const adapter = createLibSqlState({ logger: mockLogger }); + await adapter.connect(); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.unsubscribe("slack:C1:1.2"); + await adapter.disconnect(); + }); + }); +}); diff --git a/packages/state-libsql/src/client.ts b/packages/state-libsql/src/client.ts new file mode 100644 index 00000000..095ee8d6 --- /dev/null +++ b/packages/state-libsql/src/client.ts @@ -0,0 +1,619 @@ +import { type Client, type Config, createClient } from "@libsql/client"; +import type { Lock, Logger, QueueEntry, StateAdapter } from "chat"; 
+import { ConsoleLogger } from "chat";
+
+export interface LibSqlStateAdapterOptions {
+  /** Auth token for remote libSQL / Turso connections. */
+  authToken?: string;
+  /** Additional `@libsql/client` options (encryption, sync, tls, intMode, …). */
+  config?: Omit<Config, "url" | "authToken">;
+  /** Key prefix for all rows (default: "chat-sdk") */
+  keyPrefix?: string;
+  /** Logger instance for error reporting */
+  logger?: Logger;
+  /** libSQL connection URL. Supports `file:`, `libsql:`, `http(s):`, `ws(s):`. */
+  url: string;
+}
+
+export interface LibSqlStateClientOptions {
+  /** Existing `@libsql/client` Client instance. */
+  client: Client;
+  /** Key prefix for all rows (default: "chat-sdk") */
+  keyPrefix?: string;
+  /** Logger instance for error reporting */
+  logger?: Logger;
+}
+
+export type CreateLibSqlStateOptions =
+  | (Partial<LibSqlStateAdapterOptions> & { client?: never })
+  | (Partial<Omit<LibSqlStateClientOptions, "client">> & {
+      client: Client;
+    });
+
+/**
+ * libSQL state adapter backed by `@libsql/client`.
+ *
+ * Use this when you want a pure-JS driver that tree-shakes cleanly in edge /
+ * serverless environments (Vercel Edge, Cloudflare Workers). For local Node
+ * with the native binding, import from `@chat-adapter/state-libsql` instead.
+ */
+export class LibSqlStateAdapter implements StateAdapter {
+  private readonly client: Client;
+  private readonly keyPrefix: string;
+  private readonly logger: Logger;
+  private readonly ownsClient: boolean;
+  private connected = false;
+  private connectPromise: Promise<void> | null = null;
+
+  constructor(options: LibSqlStateAdapterOptions | LibSqlStateClientOptions) {
+    if ("client" in options) {
+      this.client = options.client;
+      this.ownsClient = false;
+    } else {
+      this.client = createClient({
+        ...options.config,
+        url: options.url,
+        authToken: options.authToken,
+      });
+      this.ownsClient = true;
+    }
+
+    this.keyPrefix = options.keyPrefix || "chat-sdk";
+    this.logger = options.logger ??
 new ConsoleLogger("info").child("libsql");
+  }
+
+  async connect(): Promise<void> {
+    if (this.connected) {
+      return;
+    }
+
+    if (!this.connectPromise) {
+      this.connectPromise = (async () => {
+        try {
+          await this.client.execute("SELECT 1");
+          await this.ensureSchema();
+          this.connected = true;
+        } catch (error) {
+          this.connectPromise = null;
+          this.logger.error("libSQL connect failed", { error });
+          throw error;
+        }
+      })();
+    }
+
+    await this.connectPromise;
+  }
+
+  async disconnect(): Promise<void> {
+    if (!this.connected) {
+      return;
+    }
+
+    if (this.ownsClient) {
+      this.client.close();
+    }
+
+    this.connected = false;
+    this.connectPromise = null;
+  }
+
+  async subscribe(threadId: string): Promise<void> {
+    this.ensureConnected();
+
+    await this.client.execute({
+      sql: `INSERT INTO chat_state_subscriptions (key_prefix, thread_id)
+            VALUES (?, ?)
+            ON CONFLICT DO NOTHING`,
+      args: [this.keyPrefix, threadId],
+    });
+  }
+
+  async unsubscribe(threadId: string): Promise<void> {
+    this.ensureConnected();
+
+    await this.client.execute({
+      sql: `DELETE FROM chat_state_subscriptions
+            WHERE key_prefix = ? AND thread_id = ?`,
+      args: [this.keyPrefix, threadId],
+    });
+  }
+
+  async isSubscribed(threadId: string): Promise<boolean> {
+    this.ensureConnected();
+
+    const result = await this.client.execute({
+      sql: `SELECT 1 FROM chat_state_subscriptions
+            WHERE key_prefix = ? AND thread_id = ?
+            LIMIT 1`,
+      args: [this.keyPrefix, threadId],
+    });
+
+    return result.rows.length > 0;
+  }
+
+  async acquireLock(threadId: string, ttlMs: number): Promise<Lock | null> {
+    this.ensureConnected();
+
+    const token = generateToken();
+    const now = Date.now();
+    const expiresAt = now + ttlMs;
+
+    const tx = await this.client.transaction("write");
+    try {
+      await tx.execute({
+        sql: `DELETE FROM chat_state_locks
+              WHERE key_prefix = ? AND thread_id = ?
AND expires_at <= ?`, + args: [this.keyPrefix, threadId, now], + }); + + const result = await tx.execute({ + sql: `INSERT INTO chat_state_locks (key_prefix, thread_id, token, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (key_prefix, thread_id) DO NOTHING + RETURNING thread_id, token, expires_at`, + args: [this.keyPrefix, threadId, token, expiresAt, now], + }); + + await tx.commit(); + + if (result.rows.length === 0) { + return null; + } + + const row = result.rows[0]; + return { + threadId: row.thread_id as string, + token: row.token as string, + expiresAt: Number(row.expires_at as number), + }; + } catch (error) { + tx.close(); + throw error; + } + } + + async forceReleaseLock(threadId: string): Promise { + this.ensureConnected(); + + await this.client.execute({ + sql: `DELETE FROM chat_state_locks + WHERE key_prefix = ? AND thread_id = ?`, + args: [this.keyPrefix, threadId], + }); + } + + async releaseLock(lock: Lock): Promise { + this.ensureConnected(); + + await this.client.execute({ + sql: `DELETE FROM chat_state_locks + WHERE key_prefix = ? AND thread_id = ? AND token = ?`, + args: [this.keyPrefix, lock.threadId, lock.token], + }); + } + + async extendLock(lock: Lock, ttlMs: number): Promise { + this.ensureConnected(); + + const now = Date.now(); + const result = await this.client.execute({ + sql: `UPDATE chat_state_locks + SET expires_at = ?, updated_at = ? + WHERE key_prefix = ? + AND thread_id = ? + AND token = ? + AND expires_at > ? + RETURNING thread_id`, + args: [now + ttlMs, now, this.keyPrefix, lock.threadId, lock.token, now], + }); + + return result.rows.length > 0; + } + + async get(key: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + const result = await this.client.execute({ + sql: `SELECT value FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? + AND (expires_at IS NULL OR expires_at > ?) 
+ LIMIT 1`, + args: [this.keyPrefix, key, now], + }); + + if (result.rows.length === 0) { + await this.client.execute({ + sql: `DELETE FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? + AND expires_at IS NOT NULL AND expires_at <= ?`, + args: [this.keyPrefix, key, now], + }); + + return null; + } + + const value = result.rows[0].value as string; + try { + return JSON.parse(value) as T; + } catch { + return value as unknown as T; + } + } + + async set(key: string, value: T, ttlMs?: number): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(value); + const expiresAt = ttlMs ? now + ttlMs : null; + + await this.client.execute({ + sql: `INSERT INTO chat_state_cache (key_prefix, cache_key, value, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (key_prefix, cache_key) DO UPDATE + SET value = excluded.value, + expires_at = excluded.expires_at, + updated_at = excluded.updated_at`, + args: [this.keyPrefix, key, serialized, expiresAt, now], + }); + } + + async setIfNotExists( + key: string, + value: unknown, + ttlMs?: number + ): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(value); + const expiresAt = ttlMs ? now + ttlMs : null; + + const tx = await this.client.transaction("write"); + try { + await tx.execute({ + sql: `DELETE FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? + AND expires_at IS NOT NULL AND expires_at <= ?`, + args: [this.keyPrefix, key, now], + }); + + const result = await tx.execute({ + sql: `INSERT INTO chat_state_cache (key_prefix, cache_key, value, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT (key_prefix, cache_key) DO NOTHING + RETURNING cache_key`, + args: [this.keyPrefix, key, serialized, expiresAt, now], + }); + + await tx.commit(); + return result.rows.length > 0; + } catch (error) { + tx.close(); + throw error; + } + } + + async delete(key: string): Promise { + this.ensureConnected(); + + await this.client.execute({ + sql: `DELETE FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ?`, + args: [this.keyPrefix, key], + }); + } + + async appendToList( + key: string, + value: unknown, + options?: { maxLength?: number; ttlMs?: number } + ): Promise { + this.ensureConnected(); + + const serialized = JSON.stringify(value); + const expiresAt = options?.ttlMs ? Date.now() + options.ttlMs : null; + + const tx = await this.client.transaction("write"); + try { + await tx.execute({ + sql: `INSERT INTO chat_state_lists (key_prefix, list_key, value, expires_at) + VALUES (?, ?, ?, ?)`, + args: [this.keyPrefix, key, serialized, expiresAt], + }); + + if (expiresAt !== null) { + await tx.execute({ + sql: `UPDATE chat_state_lists + SET expires_at = ? + WHERE key_prefix = ? AND list_key = ?`, + args: [expiresAt, this.keyPrefix, key], + }); + } + + if (options?.maxLength && options.maxLength > 0) { + await tx.execute({ + sql: `DELETE FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? AND seq NOT IN ( + SELECT seq FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? + ORDER BY seq DESC + LIMIT ? + )`, + args: [this.keyPrefix, key, this.keyPrefix, key, options.maxLength], + }); + } + + await tx.commit(); + } catch (error) { + tx.close(); + throw error; + } + } + + async getList(key: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + + await this.client.execute({ + sql: `DELETE FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? 
+ AND expires_at IS NOT NULL AND expires_at <= ?`, + args: [this.keyPrefix, key, now], + }); + + const result = await this.client.execute({ + sql: `SELECT value FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? + ORDER BY seq ASC`, + args: [this.keyPrefix, key], + }); + + return result.rows.map((row) => { + const value = row.value as string; + try { + return JSON.parse(value) as T; + } catch { + return value as unknown as T; + } + }); + } + + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(entry); + + const tx = await this.client.transaction("write"); + try { + await tx.execute({ + sql: `DELETE FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at <= ?`, + args: [this.keyPrefix, threadId, now], + }); + + await tx.execute({ + sql: `INSERT INTO chat_state_queues (key_prefix, thread_id, value, expires_at) + VALUES (?, ?, ?, ?)`, + args: [this.keyPrefix, threadId, serialized, entry.expiresAt], + }); + + if (maxSize > 0) { + await tx.execute({ + sql: `DELETE FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND seq NOT IN ( + SELECT seq FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? + ORDER BY seq DESC + LIMIT ? + )`, + args: [this.keyPrefix, threadId, this.keyPrefix, threadId, maxSize], + }); + } + + const depthResult = await tx.execute({ + sql: `SELECT COUNT(*) AS depth FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ?`, + args: [this.keyPrefix, threadId, now], + }); + + await tx.commit(); + return toNumber(depthResult.rows[0].depth); + } catch (error) { + tx.close(); + throw error; + } + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + const tx = await this.client.transaction("write"); + try { + await tx.execute({ + sql: `DELETE FROM chat_state_queues + WHERE key_prefix = ? 
AND thread_id = ? AND expires_at <= ?`, + args: [this.keyPrefix, threadId, now], + }); + + const selected = await tx.execute({ + sql: `SELECT seq, value FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ? + ORDER BY seq ASC + LIMIT 1`, + args: [this.keyPrefix, threadId, now], + }); + + if (selected.rows.length === 0) { + await tx.commit(); + return null; + } + + const row = selected.rows[0]; + await tx.execute({ + sql: "DELETE FROM chat_state_queues WHERE seq = ?", + args: [row.seq as number], + }); + + await tx.commit(); + return JSON.parse(row.value as string) as QueueEntry; + } catch (error) { + tx.close(); + throw error; + } + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + + const result = await this.client.execute({ + sql: `SELECT COUNT(*) AS depth FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ?`, + args: [this.keyPrefix, threadId, Date.now()], + }); + + return toNumber(result.rows[0].depth); + } + + getClient(): Client { + return this.client; + } + + private async ensureSchema(): Promise { + await this.client.execute( + `CREATE TABLE IF NOT EXISTS chat_state_subscriptions ( + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch() * 1000), + PRIMARY KEY (key_prefix, thread_id) + )` + ); + await this.client.execute( + `CREATE TABLE IF NOT EXISTS chat_state_locks ( + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + token TEXT NOT NULL, + expires_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (key_prefix, thread_id) + )` + ); + await this.client.execute( + `CREATE TABLE IF NOT EXISTS chat_state_cache ( + key_prefix TEXT NOT NULL, + cache_key TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER, + updated_at INTEGER NOT NULL, + PRIMARY KEY (key_prefix, cache_key) + )` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_locks_expires_idx + ON chat_state_locks 
(expires_at)` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_cache_expires_idx + ON chat_state_cache (expires_at) + WHERE expires_at IS NOT NULL` + ); + await this.client.execute( + `CREATE TABLE IF NOT EXISTS chat_state_lists ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + key_prefix TEXT NOT NULL, + list_key TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER + )` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_lists_key_idx + ON chat_state_lists (key_prefix, list_key, seq)` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_lists_expires_idx + ON chat_state_lists (expires_at) + WHERE expires_at IS NOT NULL` + ); + await this.client.execute( + `CREATE TABLE IF NOT EXISTS chat_state_queues ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER NOT NULL + )` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_queues_thread_idx + ON chat_state_queues (key_prefix, thread_id, seq)` + ); + await this.client.execute( + `CREATE INDEX IF NOT EXISTS chat_state_queues_expires_idx + ON chat_state_queues (expires_at)` + ); + } + + private ensureConnected(): void { + if (!this.connected) { + throw new Error( + "LibSqlStateAdapter is not connected. Call connect() first." 
+ ); + } + } +} + +function toNumber(value: unknown): number { + if (typeof value === "bigint") { + return Number(value); + } + if (typeof value === "number") { + return value; + } + if (typeof value === "string") { + return Number.parseInt(value, 10); + } + return 0; +} + +function generateToken(): string { + return `libsql_${crypto.randomUUID()}`; +} + +export function createLibSqlState( + options: CreateLibSqlStateOptions = {} +): LibSqlStateAdapter { + if ("client" in options && options.client) { + return new LibSqlStateAdapter({ + client: options.client, + keyPrefix: options.keyPrefix, + logger: options.logger, + }); + } + + const url = options.url || process.env.TURSO_DATABASE_URL; + + if (!url) { + throw new Error( + "libSQL url is required. Set TURSO_DATABASE_URL or provide it in options." + ); + } + + const authToken = options.authToken || process.env.TURSO_AUTH_TOKEN; + + return new LibSqlStateAdapter({ + url, + authToken, + config: options.config, + keyPrefix: options.keyPrefix, + logger: options.logger, + }); +} diff --git a/packages/state-libsql/src/index.test.ts b/packages/state-libsql/src/index.test.ts new file mode 100644 index 00000000..0880371b --- /dev/null +++ b/packages/state-libsql/src/index.test.ts @@ -0,0 +1,567 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { Lock, Logger } from "chat"; +import Database from "libsql/promise"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { createLibSqlState, LibSqlStateAdapter } from "./index"; + +const mockLogger: Logger = { + child: vi.fn(() => mockLogger), + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), +}; + +const LIBSQL_TOKEN_RE = /^libsql_/; + +function tmpFilePath(): { path: string; cleanup: () => void } { + const dir = mkdtempSync(join(tmpdir(), "chat-libsql-")); + const file = join(dir, "state.db"); + return { + path: file, + cleanup: () => rmSync(dir, { 
recursive: true, force: true }), + }; +} + +function makeTmpDb(): { db: Database; cleanup: () => void } { + const { path, cleanup } = tmpFilePath(); + const db = new Database(path, {}); + return { + db, + cleanup: () => { + try { + db.close(); + } catch { + // already closed + } + cleanup(); + }, + }; +} + +describe("LibSqlStateAdapter (libsql/promise)", () => { + it("exports createLibSqlState", () => { + expect(typeof createLibSqlState).toBe("function"); + }); + + it("exports LibSqlStateAdapter", () => { + expect(typeof LibSqlStateAdapter).toBe("function"); + }); + + describe("createLibSqlState", () => { + it("creates an adapter from a file path", () => { + const { path, cleanup } = tmpFilePath(); + try { + const adapter = createLibSqlState({ url: path, logger: mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + cleanup(); + } + }); + + it("accepts an existing Database", () => { + const { db, cleanup } = makeTmpDb(); + try { + const adapter = createLibSqlState({ client: db, logger: mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + cleanup(); + } + }); + + it("throws when no url or TURSO_DATABASE_URL is available", () => { + vi.stubEnv("TURSO_DATABASE_URL", ""); + try { + expect(() => createLibSqlState({ logger: mockLogger })).toThrow( + "libSQL url is required" + ); + } finally { + vi.unstubAllEnvs(); + } + }); + + it("uses TURSO_DATABASE_URL env var as fallback", () => { + const { path, cleanup } = tmpFilePath(); + vi.stubEnv("TURSO_DATABASE_URL", path); + try { + const adapter = createLibSqlState({ logger: mockLogger }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + } finally { + vi.unstubAllEnvs(); + cleanup(); + } + }); + + describe("URL types", () => { + it("connects to a local file path", async () => { + const { path, cleanup } = tmpFilePath(); + try { + const adapter = createLibSqlState({ + url: path, + logger: mockLogger, + }); + await adapter.connect(); + await 
adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.disconnect(); + } finally { + cleanup(); + } + }); + + it("connects to a file: URL", async () => { + const { path, cleanup } = tmpFilePath(); + try { + const adapter = createLibSqlState({ + url: `file:${path}`, + logger: mockLogger, + }); + await adapter.connect(); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.disconnect(); + } finally { + cleanup(); + } + }); + + it("connects to an in-memory database", async () => { + const adapter = createLibSqlState({ + url: ":memory:", + logger: mockLogger, + }); + await adapter.connect(); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.disconnect(); + }); + + // Remote URLs the native binding supports at construction time. + // It lazily dials the network on the first query, so construction + // succeeds without a live server. `ws://`/`wss://` are only supported + // by `@chat-adapter/state-libsql/client` (`@libsql/client`), not by + // the native binding. 
+ it.each([ + ["libsql://db.turso.io"], + ["https://db.turso.io"], + ["http://127.0.0.1:8080"], + ])("accepts remote URL %s", (url) => { + const adapter = createLibSqlState({ + url, + authToken: "tok", + logger: mockLogger, + }); + expect(adapter).toBeInstanceOf(LibSqlStateAdapter); + }); + }); + }); + + describe("ensureConnected", () => { + let db: Database; + let cleanup: () => void; + + beforeEach(() => { + ({ db, cleanup } = makeTmpDb()); + }); + + afterEach(() => { + cleanup(); + }); + + it.each([ + ["subscribe", (a: LibSqlStateAdapter) => a.subscribe("t1")], + ["unsubscribe", (a: LibSqlStateAdapter) => a.unsubscribe("t1")], + ["isSubscribed", (a: LibSqlStateAdapter) => a.isSubscribed("t1")], + ["acquireLock", (a: LibSqlStateAdapter) => a.acquireLock("t1", 5000)], + ["get", (a: LibSqlStateAdapter) => a.get("key")], + ["set", (a: LibSqlStateAdapter) => a.set("key", "value")], + [ + "setIfNotExists", + (a: LibSqlStateAdapter) => a.setIfNotExists("key", "value"), + ], + ["delete", (a: LibSqlStateAdapter) => a.delete("key")], + [ + "appendToList", + (a: LibSqlStateAdapter) => a.appendToList("list", "value"), + ], + ["getList", (a: LibSqlStateAdapter) => a.getList("list")], + [ + "enqueue", + (a: LibSqlStateAdapter) => + a.enqueue( + "t1", + { + message: { id: "m1" }, + enqueuedAt: 0, + expiresAt: 1, + } as never, + 10 + ), + ], + ["dequeue", (a: LibSqlStateAdapter) => a.dequeue("t1")], + ["queueDepth", (a: LibSqlStateAdapter) => a.queueDepth("t1")], + ])("throws when calling %s before connect", async (_, fn) => { + const adapter = new LibSqlStateAdapter({ + client: db, + logger: mockLogger, + }); + await expect(fn(adapter)).rejects.toThrow("not connected"); + }); + + it("throws for releaseLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ + client: db, + logger: mockLogger, + }); + const lock: Lock = { + threadId: "t1", + token: "tok", + expiresAt: Date.now(), + }; + await expect(adapter.releaseLock(lock)).rejects.toThrow("not 
connected"); + }); + + it("throws for extendLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ + client: db, + logger: mockLogger, + }); + const lock: Lock = { + threadId: "t1", + token: "tok", + expiresAt: Date.now(), + }; + await expect(adapter.extendLock(lock, 5000)).rejects.toThrow( + "not connected" + ); + }); + + it("throws for forceReleaseLock before connect", async () => { + const adapter = new LibSqlStateAdapter({ + client: db, + logger: mockLogger, + }); + await expect(adapter.forceReleaseLock("t1")).rejects.toThrow( + "not connected" + ); + }); + }); + + describe("with a real libsql file database", () => { + let db: Database; + let cleanupDb: () => void; + let adapter: LibSqlStateAdapter; + + beforeEach(async () => { + ({ db, cleanup: cleanupDb } = makeTmpDb()); + adapter = new LibSqlStateAdapter({ client: db, logger: mockLogger }); + await adapter.connect(); + }); + + afterEach(async () => { + await adapter.disconnect(); + cleanupDb(); + }); + + describe("connect / disconnect", () => { + it("is idempotent on connect", async () => { + await adapter.connect(); + await adapter.connect(); + }); + + it("deduplicates concurrent connect calls", async () => { + const { db: d, cleanup } = makeTmpDb(); + try { + const a = new LibSqlStateAdapter({ client: d, logger: mockLogger }); + await Promise.all([a.connect(), a.connect()]); + await a.disconnect(); + } finally { + cleanup(); + } + }); + + it("is idempotent on disconnect", async () => { + await adapter.disconnect(); + await adapter.disconnect(); + await adapter.connect(); + }); + + it("does not close external client on disconnect", async () => { + await adapter.disconnect(); + // The Database is still usable — proving the adapter did not close + // a client it didn't own. 
+ const stmt = await db.prepare("SELECT 1 AS v"); + expect(stmt.get().v).toBe(1); + await adapter.connect(); + }); + + it("closes owned client on disconnect", async () => { + const { path, cleanup } = tmpFilePath(); + try { + const a = createLibSqlState({ url: path, logger: mockLogger }); + await a.connect(); + const innerDb = a.getClient(); + await a.disconnect(); + // libsql/promise doesn't flip `open` on close(), so assert via + // behaviour: the closed Database should reject further use. + await expect(innerDb.prepare("SELECT 1")).rejects.toThrow(); + } finally { + cleanup(); + } + }); + + it("handles connect failure and allows retry", async () => { + const { db: broken, cleanup } = makeTmpDb(); + try { + broken.close(); + const a = new LibSqlStateAdapter({ + client: broken, + logger: mockLogger, + }); + await expect(a.connect()).rejects.toThrow(); + expect(mockLogger.error).toHaveBeenCalled(); + await expect(a.connect()).rejects.toThrow(); + } finally { + cleanup(); + } + }); + }); + + describe("subscriptions", () => { + it("round-trips subscribe / isSubscribed / unsubscribe", async () => { + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(false); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.subscribe("slack:C1:1.2"); // idempotent + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.unsubscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(false); + }); + + it("isolates subscriptions by keyPrefix", async () => { + const other = new LibSqlStateAdapter({ + client: db, + keyPrefix: "other", + logger: mockLogger, + }); + await other.connect(); + await adapter.subscribe("t1"); + expect(await other.isSubscribed("t1")).toBe(false); + await other.disconnect(); + }); + }); + + describe("locking", () => { + it("acquires a lock with a token and expiry", async () => { + const lock = await adapter.acquireLock("t1", 5000); + 
expect(lock).not.toBeNull(); + expect(lock?.threadId).toBe("t1"); + expect(lock?.token).toMatch(LIBSQL_TOKEN_RE); + expect(lock?.expiresAt).toBeGreaterThan(Date.now()); + }); + + it("returns null when the lock is held", async () => { + const first = await adapter.acquireLock("t1", 5000); + expect(first).not.toBeNull(); + const second = await adapter.acquireLock("t1", 5000); + expect(second).toBeNull(); + }); + + it("allows reacquiring an expired lock", async () => { + const first = await adapter.acquireLock("t1", 1); + expect(first).not.toBeNull(); + await new Promise((r) => setTimeout(r, 10)); + const second = await adapter.acquireLock("t1", 5000); + expect(second).not.toBeNull(); + expect(second?.token).not.toBe(first?.token); + }); + + it("releases a lock only with the right token", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + await adapter.releaseLock({ + threadId: "t1", + token: "wrong", + expiresAt: lock?.expiresAt ?? 0, + }); + expect(await adapter.acquireLock("t1", 5000)).toBeNull(); + if (lock) { + await adapter.releaseLock(lock); + } + expect(await adapter.acquireLock("t1", 5000)).not.toBeNull(); + }); + + it("extends a lock when the token matches", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + if (!lock) { + return; + } + const extended = await adapter.extendLock(lock, 10_000); + expect(extended).toBe(true); + }); + + it("returns false when extending with the wrong token", async () => { + await adapter.acquireLock("t1", 5000); + const extended = await adapter.extendLock( + { threadId: "t1", token: "nope", expiresAt: Date.now() + 5000 }, + 5000 + ); + expect(extended).toBe(false); + }); + + it("force-releases a lock without checking token", async () => { + const lock = await adapter.acquireLock("t1", 5000); + expect(lock).not.toBeNull(); + await adapter.forceReleaseLock("t1"); + expect(await adapter.acquireLock("t1", 5000)).not.toBeNull(); + }); 
+ }); + + describe("cache", () => { + it("round-trips JSON values", async () => { + await adapter.set("key", { foo: "bar" }); + expect(await adapter.get("key")).toEqual({ foo: "bar" }); + }); + + it("returns null on miss", async () => { + expect(await adapter.get("missing")).toBeNull(); + }); + + it("respects TTL", async () => { + await adapter.set("key", "value", 1); + await new Promise((r) => setTimeout(r, 10)); + expect(await adapter.get("key")).toBeNull(); + }); + + it("setIfNotExists inserts only when absent", async () => { + expect(await adapter.setIfNotExists("key", "first")).toBe(true); + expect(await adapter.setIfNotExists("key", "second")).toBe(false); + expect(await adapter.get("key")).toBe("first"); + }); + + it("setIfNotExists succeeds after TTL expiry", async () => { + expect(await adapter.setIfNotExists("key", "first", 1)).toBe(true); + await new Promise((r) => setTimeout(r, 10)); + expect(await adapter.setIfNotExists("key", "second")).toBe(true); + expect(await adapter.get("key")).toBe("second"); + }); + + it("delete removes a value", async () => { + await adapter.set("key", "value"); + await adapter.delete("key"); + expect(await adapter.get("key")).toBeNull(); + }); + }); + + describe("lists", () => { + it("appends values and returns them in insertion order", async () => { + await adapter.appendToList("mylist", { id: 1 }); + await adapter.appendToList("mylist", { id: 2 }); + await adapter.appendToList("mylist", { id: 3 }); + expect(await adapter.getList("mylist")).toEqual([ + { id: 1 }, + { id: 2 }, + { id: 3 }, + ]); + }); + + it("trims to maxLength, keeping newest", async () => { + for (let i = 1; i <= 5; i++) { + await adapter.appendToList("mylist", { id: i }, { maxLength: 3 }); + } + expect(await adapter.getList("mylist")).toEqual([ + { id: 3 }, + { id: 4 }, + { id: 5 }, + ]); + }); + + it("expires the whole list when TTL passes", async () => { + await adapter.appendToList("mylist", { id: 1 }, { ttlMs: 1 }); + await new Promise((r) => 
setTimeout(r, 10)); + expect(await adapter.getList("mylist")).toEqual([]); + }); + + it("returns empty for unknown keys", async () => { + expect(await adapter.getList("nope")).toEqual([]); + }); + }); + + describe("queue", () => { + const makeEntry = (id: string, offsetMs = 90_000) => ({ + message: { id }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + offsetMs, + }); + + it("enqueues and dequeues in FIFO order", async () => { + await adapter.enqueue("t1", makeEntry("m1") as never, 10); + await adapter.enqueue("t1", makeEntry("m2") as never, 10); + await adapter.enqueue("t1", makeEntry("m3") as never, 10); + + const a = await adapter.dequeue("t1"); + const b = await adapter.dequeue("t1"); + const c = await adapter.dequeue("t1"); + const d = await adapter.dequeue("t1"); + + expect(a?.message.id).toBe("m1"); + expect(b?.message.id).toBe("m2"); + expect(c?.message.id).toBe("m3"); + expect(d).toBeNull(); + }); + + it("returns current depth from enqueue", async () => { + const d1 = await adapter.enqueue("t1", makeEntry("m1") as never, 10); + const d2 = await adapter.enqueue("t1", makeEntry("m2") as never, 10); + expect(d1).toBe(1); + expect(d2).toBe(2); + }); + + it("trims to maxSize, keeping newest entries", async () => { + for (let i = 1; i <= 5; i++) { + await adapter.enqueue("t1", makeEntry(`m${i}`) as never, 3); + } + expect(await adapter.queueDepth("t1")).toBe(3); + const entries: (string | undefined)[] = []; + let next = await adapter.dequeue("t1"); + while (next) { + entries.push(next.message.id as string); + next = await adapter.dequeue("t1"); + } + expect(entries).toEqual(["m3", "m4", "m5"]); + }); + + it("drops expired entries", async () => { + await adapter.enqueue("t1", makeEntry("old", 1) as never, 10); + await new Promise((r) => setTimeout(r, 10)); + await adapter.enqueue("t1", makeEntry("fresh") as never, 10); + const entry = await adapter.dequeue("t1"); + expect(entry?.message.id).toBe("fresh"); + }); + + it("queueDepth returns 0 for empty queues", 
async () => { + expect(await adapter.queueDepth("nobody")).toBe(0); + }); + }); + + describe("getClient", () => { + it("returns the underlying Database", () => { + expect(adapter.getClient()).toBe(db); + }); + }); + }); + + describe.skip("integration tests against TURSO_DATABASE_URL", () => { + it("connects to the configured remote", async () => { + const adapter = createLibSqlState({ logger: mockLogger }); + await adapter.connect(); + await adapter.subscribe("slack:C1:1.2"); + expect(await adapter.isSubscribed("slack:C1:1.2")).toBe(true); + await adapter.unsubscribe("slack:C1:1.2"); + await adapter.disconnect(); + }); + }); +}); diff --git a/packages/state-libsql/src/index.ts b/packages/state-libsql/src/index.ts new file mode 100644 index 00000000..a87941a3 --- /dev/null +++ b/packages/state-libsql/src/index.ts @@ -0,0 +1,637 @@ +import type { Lock, Logger, QueueEntry, StateAdapter } from "chat"; +import { ConsoleLogger } from "chat"; +import Database from "libsql/promise"; + +export interface LibSqlDatabaseOptions { + /** Auth token for remote libSQL / Turso connections. */ + authToken?: string; + /** Encryption key for encrypted local databases. */ + encryptionKey?: string; + /** Open the replica in offline mode. */ + offline?: boolean; + /** Sync period in seconds when using embedded replicas. */ + syncPeriod?: number; + /** Sync URL for embedded-replica mode (local file mirrored from remote). */ + syncUrl?: string; + /** Connection timeout in seconds. */ + timeout?: number; +} + +export interface LibSqlStateAdapterOptions extends LibSqlDatabaseOptions { + /** Key prefix for all rows (default: "chat-sdk") */ + keyPrefix?: string; + /** Logger instance for error reporting */ + logger?: Logger; + /** + * libSQL database URL. Use a local path (e.g. `./chat-state.db`), `:memory:`, + * or a remote URL (`libsql://`, `http(s)://`, `ws(s)://`). 
+ */ + url: string; +} + +export interface LibSqlStateClientOptions { + /** Existing libsql Database instance (opened via `libsql/promise`). */ + client: Database; + /** Key prefix for all rows (default: "chat-sdk") */ + keyPrefix?: string; + /** Logger instance for error reporting */ + logger?: Logger; +} + +export type CreateLibSqlStateOptions = + | (Partial & { client?: never }) + | (Partial> & { + client: Database; + }); + +// Minimal typing for libsql/promise — the shipped .d.ts is incomplete. +interface LibSqlStatement { + all(...args: unknown[]): Promise[]>; + get(...args: unknown[]): Record | undefined; + run(...args: unknown[]): { + changes: number; + lastInsertRowid: number | bigint; + }; +} + +interface LibSqlTxFn { + deferred(): Promise; + exclusive(): Promise; + immediate(): Promise; + (): Promise; +} + +interface LibSqlDatabase { + close(): void; + exec(sql: string): Promise; + open: boolean; + prepare(sql: string): Promise; + transaction(fn: () => R): LibSqlTxFn; +} + +function asDb(db: Database): LibSqlDatabase { + return db as unknown as LibSqlDatabase; +} + +export class LibSqlStateAdapter implements StateAdapter { + private readonly db: LibSqlDatabase; + private readonly keyPrefix: string; + private readonly logger: Logger; + private readonly ownsClient: boolean; + private connected = false; + private connectPromise: Promise | null = null; + + constructor(options: LibSqlStateAdapterOptions | LibSqlStateClientOptions) { + if ("client" in options) { + this.db = asDb(options.client); + this.ownsClient = false; + } else { + this.db = asDb( + new Database(options.url, { + authToken: options.authToken, + syncUrl: options.syncUrl, + syncPeriod: options.syncPeriod, + encryptionKey: options.encryptionKey, + offline: options.offline, + timeout: options.timeout, + }) + ); + this.ownsClient = true; + } + + this.keyPrefix = options.keyPrefix || "chat-sdk"; + this.logger = options.logger ?? 
new ConsoleLogger("info").child("libsql"); + } + + async connect(): Promise { + if (this.connected) { + return; + } + + if (!this.connectPromise) { + this.connectPromise = (async () => { + try { + await this.db.exec("SELECT 1"); + await this.ensureSchema(); + this.connected = true; + } catch (error) { + this.connectPromise = null; + this.logger.error("libSQL connect failed", { error }); + throw error; + } + })(); + } + + await this.connectPromise; + } + + async disconnect(): Promise { + if (!this.connected) { + return; + } + + if (this.ownsClient) { + this.db.close(); + } + + this.connected = false; + this.connectPromise = null; + } + + async subscribe(threadId: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `INSERT INTO chat_state_subscriptions (key_prefix, thread_id) + VALUES (?, ?) + ON CONFLICT DO NOTHING` + ); + stmt.run(this.keyPrefix, threadId); + } + + async unsubscribe(threadId: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `DELETE FROM chat_state_subscriptions + WHERE key_prefix = ? AND thread_id = ?` + ); + stmt.run(this.keyPrefix, threadId); + } + + async isSubscribed(threadId: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `SELECT 1 AS present FROM chat_state_subscriptions + WHERE key_prefix = ? AND thread_id = ? + LIMIT 1` + ); + const row = stmt.get(this.keyPrefix, threadId); + return row !== undefined; + } + + async acquireLock(threadId: string, ttlMs: number): Promise { + this.ensureConnected(); + + const token = generateToken(); + const now = Date.now(); + const expiresAt = now + ttlMs; + + const delExpired = await this.db.prepare( + `DELETE FROM chat_state_locks + WHERE key_prefix = ? AND thread_id = ? AND expires_at <= ?` + ); + const insert = await this.db.prepare( + `INSERT INTO chat_state_locks (key_prefix, thread_id, token, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) 
+ ON CONFLICT (key_prefix, thread_id) DO NOTHING + RETURNING thread_id, token, expires_at` + ); + + const row = await this.db + .transaction(() => { + delExpired.run(this.keyPrefix, threadId, now); + return insert.get(this.keyPrefix, threadId, token, expiresAt, now); + }) + .immediate(); + + if (!row) { + return null; + } + + return { + threadId: row.thread_id as string, + token: row.token as string, + expiresAt: Number(row.expires_at), + }; + } + + async forceReleaseLock(threadId: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `DELETE FROM chat_state_locks + WHERE key_prefix = ? AND thread_id = ?` + ); + stmt.run(this.keyPrefix, threadId); + } + + async releaseLock(lock: Lock): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `DELETE FROM chat_state_locks + WHERE key_prefix = ? AND thread_id = ? AND token = ?` + ); + stmt.run(this.keyPrefix, lock.threadId, lock.token); + } + + async extendLock(lock: Lock, ttlMs: number): Promise { + this.ensureConnected(); + + const now = Date.now(); + const stmt = await this.db.prepare( + `UPDATE chat_state_locks + SET expires_at = ?, updated_at = ? + WHERE key_prefix = ? AND thread_id = ? AND token = ? AND expires_at > ? + RETURNING thread_id` + ); + const row = stmt.get( + now + ttlMs, + now, + this.keyPrefix, + lock.threadId, + lock.token, + now + ); + return row !== undefined; + } + + async get(key: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + const select = await this.db.prepare( + `SELECT value FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? + AND (expires_at IS NULL OR expires_at > ?) + LIMIT 1` + ); + const row = select.get(this.keyPrefix, key, now); + + if (!row) { + // Opportunistic cleanup of any stale row. + const del = await this.db.prepare( + `DELETE FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? 
+ AND expires_at IS NOT NULL AND expires_at <= ?` + ); + del.run(this.keyPrefix, key, now); + return null; + } + + const value = row.value as string; + try { + return JSON.parse(value) as T; + } catch { + return value as unknown as T; + } + } + + async set(key: string, value: T, ttlMs?: number): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(value); + const expiresAt = ttlMs ? now + ttlMs : null; + + const stmt = await this.db.prepare( + `INSERT INTO chat_state_cache (key_prefix, cache_key, value, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (key_prefix, cache_key) DO UPDATE + SET value = excluded.value, + expires_at = excluded.expires_at, + updated_at = excluded.updated_at` + ); + stmt.run(this.keyPrefix, key, serialized, expiresAt, now); + } + + async setIfNotExists( + key: string, + value: unknown, + ttlMs?: number + ): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(value); + const expiresAt = ttlMs ? now + ttlMs : null; + + const delExpired = await this.db.prepare( + `DELETE FROM chat_state_cache + WHERE key_prefix = ? AND cache_key = ? + AND expires_at IS NOT NULL AND expires_at <= ?` + ); + const insert = await this.db.prepare( + `INSERT INTO chat_state_cache (key_prefix, cache_key, value, expires_at, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT (key_prefix, cache_key) DO NOTHING + RETURNING cache_key` + ); + + const row = await this.db + .transaction(() => { + delExpired.run(this.keyPrefix, key, now); + return insert.get(this.keyPrefix, key, serialized, expiresAt, now); + }) + .immediate(); + + return row !== undefined; + } + + async delete(key: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `DELETE FROM chat_state_cache + WHERE key_prefix = ? 
AND cache_key = ?` + ); + stmt.run(this.keyPrefix, key); + } + + async appendToList( + key: string, + value: unknown, + options?: { maxLength?: number; ttlMs?: number } + ): Promise { + this.ensureConnected(); + + const serialized = JSON.stringify(value); + const expiresAt = options?.ttlMs ? Date.now() + options.ttlMs : null; + + const insert = await this.db.prepare( + `INSERT INTO chat_state_lists (key_prefix, list_key, value, expires_at) + VALUES (?, ?, ?, ?)` + ); + const refreshTtl = await this.db.prepare( + `UPDATE chat_state_lists + SET expires_at = ? + WHERE key_prefix = ? AND list_key = ?` + ); + const trim = await this.db.prepare( + `DELETE FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? AND seq NOT IN ( + SELECT seq FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? + ORDER BY seq DESC + LIMIT ? + )` + ); + + await this.db + .transaction(() => { + insert.run(this.keyPrefix, key, serialized, expiresAt); + if (expiresAt !== null) { + refreshTtl.run(expiresAt, this.keyPrefix, key); + } + if (options?.maxLength && options.maxLength > 0) { + trim.run(this.keyPrefix, key, this.keyPrefix, key, options.maxLength); + } + }) + .immediate(); + } + + async getList(key: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + const del = await this.db.prepare( + `DELETE FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? + AND expires_at IS NOT NULL AND expires_at <= ?` + ); + del.run(this.keyPrefix, key, now); + + const select = await this.db.prepare( + `SELECT value FROM chat_state_lists + WHERE key_prefix = ? AND list_key = ? 
+ ORDER BY seq ASC` + ); + const rows = await select.all(this.keyPrefix, key); + return rows.map((row) => { + const value = row.value as string; + try { + return JSON.parse(value) as T; + } catch { + return value as unknown as T; + } + }); + } + + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + const now = Date.now(); + const serialized = JSON.stringify(entry); + + const purge = await this.db.prepare( + `DELETE FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at <= ?` + ); + const insert = await this.db.prepare( + `INSERT INTO chat_state_queues (key_prefix, thread_id, value, expires_at) + VALUES (?, ?, ?, ?)` + ); + const trim = await this.db.prepare( + `DELETE FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND seq NOT IN ( + SELECT seq FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? + ORDER BY seq DESC + LIMIT ? + )` + ); + const countStmt = await this.db.prepare( + `SELECT COUNT(*) AS depth FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ?` + ); + + const depth = await this.db + .transaction(() => { + purge.run(this.keyPrefix, threadId, now); + insert.run(this.keyPrefix, threadId, serialized, entry.expiresAt); + if (maxSize > 0) { + trim.run(this.keyPrefix, threadId, this.keyPrefix, threadId, maxSize); + } + const row = countStmt.get(this.keyPrefix, threadId, now); + return toNumber(row?.depth); + }) + .immediate(); + + return depth; + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + const now = Date.now(); + const purge = await this.db.prepare( + `DELETE FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at <= ?` + ); + const pick = await this.db.prepare( + `SELECT seq, value FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ? 
+ ORDER BY seq ASC + LIMIT 1` + ); + const del = await this.db.prepare( + "DELETE FROM chat_state_queues WHERE seq = ?" + ); + + const value = await this.db + .transaction(() => { + purge.run(this.keyPrefix, threadId, now); + const row = pick.get(this.keyPrefix, threadId, now); + if (!row) { + return null; + } + del.run(row.seq); + return row.value as string; + }) + .immediate(); + + if (value === null) { + return null; + } + + return JSON.parse(value) as QueueEntry; + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + const stmt = await this.db.prepare( + `SELECT COUNT(*) AS depth FROM chat_state_queues + WHERE key_prefix = ? AND thread_id = ? AND expires_at > ?` + ); + const row = stmt.get(this.keyPrefix, threadId, Date.now()); + return toNumber(row?.depth); + } + + getClient(): Database { + return this.db as unknown as Database; + } + + private async ensureSchema(): Promise { + await this.db.exec( + `CREATE TABLE IF NOT EXISTS chat_state_subscriptions ( + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch() * 1000), + PRIMARY KEY (key_prefix, thread_id) + )` + ); + await this.db.exec( + `CREATE TABLE IF NOT EXISTS chat_state_locks ( + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + token TEXT NOT NULL, + expires_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + PRIMARY KEY (key_prefix, thread_id) + )` + ); + await this.db.exec( + `CREATE TABLE IF NOT EXISTS chat_state_cache ( + key_prefix TEXT NOT NULL, + cache_key TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER, + updated_at INTEGER NOT NULL, + PRIMARY KEY (key_prefix, cache_key) + )` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_locks_expires_idx + ON chat_state_locks (expires_at)` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_cache_expires_idx + ON chat_state_cache (expires_at) + WHERE expires_at IS NOT NULL` + ); + await this.db.exec( + `CREATE TABLE IF NOT EXISTS 
chat_state_lists ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + key_prefix TEXT NOT NULL, + list_key TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER + )` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_lists_key_idx + ON chat_state_lists (key_prefix, list_key, seq)` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_lists_expires_idx + ON chat_state_lists (expires_at) + WHERE expires_at IS NOT NULL` + ); + await this.db.exec( + `CREATE TABLE IF NOT EXISTS chat_state_queues ( + seq INTEGER PRIMARY KEY AUTOINCREMENT, + key_prefix TEXT NOT NULL, + thread_id TEXT NOT NULL, + value TEXT NOT NULL, + expires_at INTEGER NOT NULL + )` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_queues_thread_idx + ON chat_state_queues (key_prefix, thread_id, seq)` + ); + await this.db.exec( + `CREATE INDEX IF NOT EXISTS chat_state_queues_expires_idx + ON chat_state_queues (expires_at)` + ); + } + + private ensureConnected(): void { + if (!this.connected) { + throw new Error( + "LibSqlStateAdapter is not connected. Call connect() first." + ); + } + } +} + +function toNumber(value: unknown): number { + if (typeof value === "bigint") { + return Number(value); + } + if (typeof value === "number") { + return value; + } + if (typeof value === "string") { + return Number.parseInt(value, 10); + } + return 0; +} + +function generateToken(): string { + return `libsql_${crypto.randomUUID()}`; +} + +export function createLibSqlState( + options: CreateLibSqlStateOptions = {} +): LibSqlStateAdapter { + if ("client" in options && options.client) { + return new LibSqlStateAdapter({ + client: options.client, + keyPrefix: options.keyPrefix, + logger: options.logger, + }); + } + + const url = options.url || process.env.TURSO_DATABASE_URL; + + if (!url) { + throw new Error( + "libSQL url is required. Set TURSO_DATABASE_URL or provide it in options." 
+ ); + } + + const authToken = options.authToken || process.env.TURSO_AUTH_TOKEN; + + return new LibSqlStateAdapter({ + url, + authToken, + syncUrl: options.syncUrl, + syncPeriod: options.syncPeriod, + encryptionKey: options.encryptionKey, + offline: options.offline, + timeout: options.timeout, + keyPrefix: options.keyPrefix, + logger: options.logger, + }); +} diff --git a/packages/state-libsql/tsconfig.json b/packages/state-libsql/tsconfig.json new file mode 100644 index 00000000..8768f5bd --- /dev/null +++ b/packages/state-libsql/tsconfig.json @@ -0,0 +1,10 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "./dist", + "rootDir": "./src", + "strictNullChecks": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist", "**/*.test.ts"] +} diff --git a/packages/state-libsql/tsup.config.ts b/packages/state-libsql/tsup.config.ts new file mode 100644 index 00000000..7e0f69b3 --- /dev/null +++ b/packages/state-libsql/tsup.config.ts @@ -0,0 +1,10 @@ +import { defineConfig } from "tsup"; + +export default defineConfig({ + entry: ["src/index.ts", "src/client.ts"], + format: ["esm"], + dts: true, + clean: true, + sourcemap: false, + external: ["libsql", "libsql/promise", "@libsql/client"], +}); diff --git a/packages/state-libsql/vitest.config.ts b/packages/state-libsql/vitest.config.ts new file mode 100644 index 00000000..edc2d946 --- /dev/null +++ b/packages/state-libsql/vitest.config.ts @@ -0,0 +1,14 @@ +import { defineProject } from "vitest/config"; + +export default defineProject({ + test: { + globals: true, + environment: "node", + coverage: { + provider: "v8", + reporter: ["text", "json-summary"], + include: ["src/**/*.ts"], + exclude: ["src/**/*.test.ts"], + }, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index dcf29975..14a88a29 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -587,6 +587,34 @@ importers: specifier: ^4.0.18 version: 
4.0.18(@opentelemetry/api@1.9.0)(@types/node@25.3.2)(jiti@2.6.1)(lightningcss@1.30.2)(tsx@4.21.0) + packages/state-libsql: + dependencies: + chat: + specifier: workspace:* + version: link:../chat + devDependencies: + '@libsql/client': + specifier: ^0.15.15 + version: 0.15.15 + '@types/node': + specifier: ^25.3.2 + version: 25.3.2 + '@vitest/coverage-v8': + specifier: ^4.0.18 + version: 4.0.18(vitest@4.0.18(@opentelemetry/api@1.9.0)(@types/node@25.3.2)(jiti@2.6.1)(lightningcss@1.30.2)(tsx@4.21.0)) + libsql: + specifier: ^0.5.29 + version: 0.5.29 + tsup: + specifier: ^8.3.5 + version: 8.5.1(jiti@2.6.1)(postcss@8.5.9)(tsx@4.21.0)(typescript@5.9.3) + typescript: + specifier: ^5.7.2 + version: 5.9.3 + vitest: + specifier: ^4.0.18 + version: 4.0.18(@opentelemetry/api@1.9.0)(@types/node@25.3.2)(jiti@2.6.1)(lightningcss@1.30.2)(tsx@4.21.0) + packages/state-memory: dependencies: chat: @@ -1297,6 +1325,67 @@ packages: '@jridgewell/trace-mapping@0.3.31': resolution: {integrity: sha512-zzNR+SdQSDJzc8joaeP8QQoCQr8NuYx2dIIytl1QeBEZHJ9uW6hebsrYgbz8hJwUQao3TWCMtmfV8Nu1twOLAw==} + '@libsql/client@0.15.15': + resolution: {integrity: sha512-twC0hQxPNHPKfeOv3sNT6u2pturQjLcI+CnpTM0SjRpocEGgfiZ7DWKXLNnsothjyJmDqEsBQJ5ztq9Wlu470w==} + + '@libsql/core@0.15.15': + resolution: {integrity: sha512-C88Z6UKl+OyuKKPwz224riz02ih/zHYI3Ho/LAcVOgjsunIRZoBw7fjRfaH9oPMmSNeQfhGklSG2il1URoOIsA==} + + '@libsql/darwin-arm64@0.5.29': + resolution: {integrity: sha512-K+2RIB1OGFPYQbfay48GakLhqf3ArcbHqPFu7EZiaUcRgFcdw8RoltsMyvbj5ix2fY0HV3Q3Ioa/ByvQdaSM0A==} + cpu: [arm64] + os: [darwin] + + '@libsql/darwin-x64@0.5.29': + resolution: {integrity: sha512-OtT+KFHsKFy1R5FVadr8FJ2Bb1mghtXTyJkxv0trocq7NuHntSki1eUbxpO5ezJesDvBlqFjnWaYYY516QNLhQ==} + cpu: [x64] + os: [darwin] + + '@libsql/hrana-client@0.7.0': + resolution: {integrity: sha512-OF8fFQSkbL7vJY9rfuegK1R7sPgQ6kFMkDamiEccNUvieQ+3urzfDFI616oPl8V7T9zRmnTkSjMOImYCAVRVuw==} + + '@libsql/isomorphic-fetch@0.3.1': + resolution: {integrity: 
sha512-6kK3SUK5Uu56zPq/Las620n5aS9xJq+jMBcNSOmjhNf/MUvdyji4vrMTqD7ptY7/4/CAVEAYDeotUz60LNQHtw==} + engines: {node: '>=18.0.0'} + + '@libsql/isomorphic-ws@0.1.5': + resolution: {integrity: sha512-DtLWIH29onUYR00i0GlQ3UdcTRC6EP4u9w/h9LxpUZJWRMARk6dQwZ6Jkd+QdwVpuAOrdxt18v0K2uIYR3fwFg==} + + '@libsql/linux-arm-gnueabihf@0.5.29': + resolution: {integrity: sha512-CD4n4zj7SJTHso4nf5cuMoWoMSS7asn5hHygsDuhRl8jjjCTT3yE+xdUvI4J7zsyb53VO5ISh4cwwOtf6k2UhQ==} + cpu: [arm] + os: [linux] + + '@libsql/linux-arm-musleabihf@0.5.29': + resolution: {integrity: sha512-2Z9qBVpEJV7OeflzIR3+l5yAd4uTOLxklScYTwpZnkm2vDSGlC1PRlueLaufc4EFITkLKXK2MWBpexuNJfMVcg==} + cpu: [arm] + os: [linux] + + '@libsql/linux-arm64-gnu@0.5.29': + resolution: {integrity: sha512-gURBqaiXIGGwFNEaUj8Ldk7Hps4STtG+31aEidCk5evMMdtsdfL3HPCpvys+ZF/tkOs2MWlRWoSq7SOuCE9k3w==} + cpu: [arm64] + os: [linux] + + '@libsql/linux-arm64-musl@0.5.29': + resolution: {integrity: sha512-fwgYZ0H8mUkyVqXZHF3mT/92iIh1N94Owi/f66cPVNsk9BdGKq5gVpoKO+7UxaNzuEH1roJp2QEwsCZMvBLpqg==} + cpu: [arm64] + os: [linux] + + '@libsql/linux-x64-gnu@0.5.29': + resolution: {integrity: sha512-y14V0vY0nmMC6G0pHeJcEarcnGU2H6cm21ZceRkacWHvQAEhAG0latQkCtoS2njFOXiYIg+JYPfAoWKbi82rkg==} + cpu: [x64] + os: [linux] + + '@libsql/linux-x64-musl@0.5.29': + resolution: {integrity: sha512-gquqwA/39tH4pFl+J9n3SOMSymjX+6kZ3kWgY3b94nXFTwac9bnFNMffIomgvlFaC4ArVqMnOZD3nuJ3H3VO1w==} + cpu: [x64] + os: [linux] + + '@libsql/win32-x64-msvc@0.5.29': + resolution: {integrity: sha512-4/0CvEdhi6+KjMxMaVbFM2n2Z44escBRoEYpR+gZg64DdetzGnYm8mcNLcoySaDJZNaBd6wz5DNdgRmcI4hXcg==} + cpu: [x64] + os: [win32] + '@linear/sdk@76.0.0': resolution: {integrity: sha512-Xt0x5Kl6qBoWhGFypb8ykyP+c5kT7scmRPs1uJidSPOaRgkMJ/4y41QpmZCWCBUMmZtf/O0VktgQio6rLXT94w==} engines: {node: '>=18.x'} @@ -1365,6 +1454,9 @@ packages: '@napi-rs/wasm-runtime@1.1.1': resolution: {integrity: sha512-p64ah1M1ld8xjWv3qbvFwHiFVWrq1yFvV4f7w+mzaqiR4IlSgkqhcRdHwsGgomwzBH51sRY4NEowLxnaBjcW/A==} + '@neon-rs/load@0.0.4': + 
resolution: {integrity: sha512-kTPhdZyTQxB+2wpiRcFWrDcejc4JI6tkPuS7UZCG4l6Zvc5kU/gGQ/ozvHTh1XR5tS+UlfAfGuPajjzQjCiHCw==} + '@next/env@16.2.3': resolution: {integrity: sha512-ZWXyj4uNu4GCWQw9cjRxWlbD+33mcDszIo9iQxFnBX3Wmgq9ulaSJcl6VhuWx5pCWqqD+9W6Wfz7N0lM5lYPMA==} @@ -3551,6 +3643,10 @@ packages: resolution: {integrity: sha512-reYkTUJAZb9gUuZ2RvVCNhVHdg62RHnJ7WJl8ftMi4diZ6NWlciOzQN88pUhSELEwflJht4oQDv0F0BMlwaYtA==} engines: {node: '>=8'} + detect-libc@2.0.2: + resolution: {integrity: sha512-UX6sGumvvqSaXgdKGUsgZWqcUyIXZ/vZTrlRT/iobiKhGL0zL4d3osHj3uqllWJK+i+sixDS/3COVEOFbupFyw==} + engines: {node: '>=8'} + detect-libc@2.1.2: resolution: {integrity: sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==} engines: {node: '>=8'} @@ -4218,6 +4314,9 @@ packages: resolution: {integrity: sha512-34wB/Y7MW7bzjKRjUKTa46I2Z7eV62Rkhva+KkopW7Qvv/OSWBqvkSY7vusOPrNuZcUG3tApvdVgNB8POj3SPw==} engines: {node: '>=10'} + js-base64@3.7.8: + resolution: {integrity: sha512-hNngCeKxIUQiEUN3GPJOkz4wF/YvdUdbNL9hsBcMQTkKzboD7T/q3OYOuuPZLUE6dBxSGpwhk5mwuDud7JVAow==} + js-tokens@10.0.0: resolution: {integrity: sha512-lM/UBzQmfJRo9ABXbPWemivdCW8V2G8FHaHdypQaIy523snUjog0W71ayWXTjiR+ixeMyVHN2XcpnTd/liPg/Q==} @@ -4290,6 +4389,11 @@ packages: layout-base@2.0.1: resolution: {integrity: sha512-dp3s92+uNI1hWIpPGH3jK2kxE2lMjdXdr+DH8ynZHpd6PUlH6x6cbuXnoMmiNumznqaNO31xu9e79F0uuZ0JFg==} + libsql@0.5.29: + resolution: {integrity: sha512-8lMP8iMgiBzzoNbAPQ59qdVcj6UaE/Vnm+fiwX4doX4Narook0a4GPKWBEv+CR8a1OwbfkgL18uBfBjWdF0Fzg==} + cpu: [x64, arm64, wasm32, arm] + os: [darwin, linux, win32] + lie@3.1.1: resolution: {integrity: sha512-RiNhHysUjhrDQntfYSfY4MU24coXXdEOgw9WGcKHNeEwffDYbF//u87M1EWaMGzuFoSbqW0C9C6lEEhDOAswfw==} @@ -5081,6 +5185,9 @@ packages: engines: {node: '>=10.13.0'} hasBin: true + promise-limit@2.7.0: + resolution: {integrity: sha512-7nJ6v5lnJsXwGprnGXga4wx6d1POjvi5Qmf1ivTRxTjH4Z/9Czja/UCMLVmB9N93GeWOU93XaFaEt6jbuoagNw==} + prop-types@15.8.1: resolution: 
{integrity: sha512-oj87CgZICdulUohogVAR7AjlC0327U4el4L6eAvOqCeudMDVU0NThNaV+b9Df4dXgSP1gXMTnPdhfe/2qDH5cg==} @@ -6600,6 +6707,68 @@ snapshots: '@jridgewell/resolve-uri': 3.1.2 '@jridgewell/sourcemap-codec': 1.5.5 + '@libsql/client@0.15.15': + dependencies: + '@libsql/core': 0.15.15 + '@libsql/hrana-client': 0.7.0 + js-base64: 3.7.8 + libsql: 0.5.29 + promise-limit: 2.7.0 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + + '@libsql/core@0.15.15': + dependencies: + js-base64: 3.7.8 + + '@libsql/darwin-arm64@0.5.29': + optional: true + + '@libsql/darwin-x64@0.5.29': + optional: true + + '@libsql/hrana-client@0.7.0': + dependencies: + '@libsql/isomorphic-fetch': 0.3.1 + '@libsql/isomorphic-ws': 0.1.5 + js-base64: 3.7.8 + node-fetch: 3.3.2 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + + '@libsql/isomorphic-fetch@0.3.1': {} + + '@libsql/isomorphic-ws@0.1.5': + dependencies: + '@types/ws': 8.18.1 + ws: 8.18.3 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + + '@libsql/linux-arm-gnueabihf@0.5.29': + optional: true + + '@libsql/linux-arm-musleabihf@0.5.29': + optional: true + + '@libsql/linux-arm64-gnu@0.5.29': + optional: true + + '@libsql/linux-arm64-musl@0.5.29': + optional: true + + '@libsql/linux-x64-gnu@0.5.29': + optional: true + + '@libsql/linux-x64-musl@0.5.29': + optional: true + + '@libsql/win32-x64-msvc@0.5.29': + optional: true + '@linear/sdk@76.0.0(graphql@15.10.1)': dependencies: '@graphql-typed-document-node/core': 3.2.0(graphql@15.10.1) @@ -6742,6 +6911,8 @@ snapshots: '@tybys/wasm-util': 0.10.1 optional: true + '@neon-rs/load@0.0.4': {} + '@next/env@16.2.3': {} '@next/swc-darwin-arm64@16.2.3': @@ -8850,6 +9021,8 @@ snapshots: detect-indent@6.1.0: {} + detect-libc@2.0.2: {} + detect-libc@2.1.2: {} detect-node-es@1.1.0: {} @@ -9698,6 +9871,8 @@ snapshots: joycon@3.1.1: {} + js-base64@3.7.8: {} + js-tokens@10.0.0: {} js-tokens@4.0.0: {} @@ -9796,6 +9971,21 @@ snapshots: layout-base@2.0.1: {} + 
libsql@0.5.29: + dependencies: + '@neon-rs/load': 0.0.4 + detect-libc: 2.0.2 + optionalDependencies: + '@libsql/darwin-arm64': 0.5.29 + '@libsql/darwin-x64': 0.5.29 + '@libsql/linux-arm-gnueabihf': 0.5.29 + '@libsql/linux-arm-musleabihf': 0.5.29 + '@libsql/linux-arm64-gnu': 0.5.29 + '@libsql/linux-arm64-musl': 0.5.29 + '@libsql/linux-x64-gnu': 0.5.29 + '@libsql/linux-x64-musl': 0.5.29 + '@libsql/win32-x64-msvc': 0.5.29 + lie@3.1.1: dependencies: immediate: 3.0.6 @@ -10822,6 +11012,8 @@ snapshots: prettier@2.8.8: {} + promise-limit@2.7.0: {} + prop-types@15.8.1: dependencies: loose-envify: 1.4.0