diff --git a/packages/realm-server/tests/full-reindex-test.ts b/packages/realm-server/tests/full-reindex-test.ts index cd3e798c0cd..34349dbd92a 100644 --- a/packages/realm-server/tests/full-reindex-test.ts +++ b/packages/realm-server/tests/full-reindex-test.ts @@ -87,7 +87,11 @@ module(basename(__filename), function (hooks) { realmUrls: [sourceRealmURL, publishedRealmURL], }); - type JobArgs = { realmURL: string; realmUsername: string }; + type JobArgs = { + realmURL: string; + realmUsername: string; + clearLastModified: boolean; + }; type JobRow = { job_type: string; concurrency_group: string | null; @@ -116,6 +120,12 @@ module(basename(__filename), function (hooks) { { realmURL: sourceRealmURL, realmUsername: 'owner', + // full-reindex enqueues with clearLastModified: true so every + // file re-renders even when its mtime is unchanged. Surfaced in + // args so the from-scratch coalesce can refuse to attach a + // clearing publish to an already-running same-realm + // from-scratch. + clearLastModified: true, }, 'source job args are correct', ); @@ -137,6 +147,7 @@ module(basename(__filename), function (hooks) { { realmURL: publishedRealmURL, realmUsername: 'owner', + clearLastModified: true, }, 'published job args use the source owner', ); diff --git a/packages/realm-server/tests/queue-test.ts b/packages/realm-server/tests/queue-test.ts index db2f1661974..57ead25c539 100644 --- a/packages/realm-server/tests/queue-test.ts +++ b/packages/realm-server/tests/queue-test.ts @@ -54,7 +54,9 @@ module(basename(__filename), function () { }); async function publishFromScratchIndexJob(args: { - args: FromScratchArgs; + args: Omit & { + clearLastModified?: boolean; + }; priority: number; }) { return await publisher.publish({ @@ -62,7 +64,7 @@ module(basename(__filename), function () { concurrencyGroup: `indexing:${args.args.realmURL}`, timeout: FROM_SCRATCH_JOB_TIMEOUT_SEC, priority: args.priority, - args: args.args, + args: { clearLastModified: false, ...args.args }, }); } @@ -323,6 +325,170 @@ module(basename(__filename), function () { ); }); + test('from-scratch dedup: a duplicate publish for an in-flight from-scratch attaches as a late waiter', async function (assert) { + // From-scratch reindex is the maximal indexing operation for a + // realm: any same-realm from-scratch already running subsumes a + // second concurrent from-scratch by definition. A worker + // claiming the first publish before the second arrives must not + // force a second canonical row, because the second caller's + // result is already covered by what the in-flight job will + // produce. + await runner.destroy(); + let realmURL = 'http://example.com/from-scratch-in-flight-dedup/'; + let started = new Deferred(); + let release = new Deferred(); + + let worker = new PgQueueRunner({ + adapter, + workerId: 'from-scratch-in-flight-worker', + }); + worker.register('from-scratch-index', async () => { + started.fulfill(); + await release.promise; + return { + invalidations: [], + ignoreData: {}, + stats: { + instancesIndexed: 0, + filesIndexed: 0, + instanceErrors: 0, + fileErrors: 0, + totalIndexEntries: 0, + }, + }; + }); + + try { + await worker.start(); + + let first = await publishFromScratchIndexJob({ + priority: 0, + args: { + realmURL, + realmUsername: 'owner', + }, + }); + // Wait for the worker to actually claim the job — that moves + // the row from `candidates` to `inFlightCandidates`, which is + // the precondition for the pending-candidate path to miss and + // the in-flight fallback to fire. + await started.promise; + + let second = await publishFromScratchIndexJob({ + priority: userInitiatedPriority, + args: { + realmURL, + realmUsername: 'owner', + }, + }); + + assert.strictEqual( + first.id, + second.id, + 'duplicate publish reuses the in-flight job id instead of creating a new row', + ); + + let rows = (await adapter.execute( + `SELECT id + FROM jobs + WHERE concurrency_group = $1 + AND status = 'unfulfilled'`, + { bind: [`indexing:${realmURL}`] }, + )) as { id: number }[]; + assert.strictEqual( + rows.length, + 1, + 'only one from-scratch-index row exists; duplicate did not enqueue a second job', + ); + + release.fulfill(); + await Promise.all([first.done, second.done]); + } finally { + release.fulfill(); + await worker.destroy(); + } + }); + + test('from-scratch dedup: a clearLastModified publish does NOT attach to an in-flight from-scratch', async function (assert) { + // A clearLastModified publish has already nulled + // boxel_index.last_modified for the realm so the next from-scratch + // pass re-renders every row. An already-running from-scratch read + // its mtimes snapshot before that clear, so joining the running + // job would let the caller observe a successful job that did NOT + // re-render the swapped files (e.g. a publish-realm caller would + // return ok despite never having indexed the new content). + await runner.destroy(); + let realmURL = 'http://example.com/from-scratch-clear-last-modified/'; + let started = new Deferred(); + let release = new Deferred(); + + let worker = new PgQueueRunner({ + adapter, + workerId: 'from-scratch-clear-worker', + }); + worker.register('from-scratch-index', async () => { + started.fulfill(); + await release.promise; + return { + invalidations: [], + ignoreData: {}, + stats: { + instancesIndexed: 0, + filesIndexed: 0, + instanceErrors: 0, + fileErrors: 0, + totalIndexEntries: 0, + }, + }; + }); + + try { + await worker.start(); + + let first = await publishFromScratchIndexJob({ + priority: 0, + args: { realmURL, realmUsername: 'owner' }, + }); + await started.promise; + + // Second publish flags clearLastModified — must not coalesce + // onto the running first job. + let second = await publishFromScratchIndexJob({ + priority: userInitiatedPriority, + args: { + realmURL, + realmUsername: 'owner', + clearLastModified: true, + }, + }); + + assert.notStrictEqual( + first.id, + second.id, + 'clearLastModified publish does not attach to the in-flight job', + ); + + let rows = (await adapter.execute( + `SELECT id, status + FROM jobs + WHERE concurrency_group = $1 + ORDER BY id`, + { bind: [`indexing:${realmURL}`] }, + )) as { id: number; status: string }[]; + assert.strictEqual( + rows.length, + 2, + 'a fresh row is inserted for the clearLastModified publish', + ); + + release.fulfill(); + await Promise.all([first.done, second.done]); + } finally { + release.fulfill(); + await worker.destroy(); + } + }); + test('from-scratch does not coalesce onto pending incremental in same group', async function (assert) { await runner.destroy(); diff --git a/packages/realm-server/tests/server-endpoints/maintenance-endpoints-test.ts b/packages/realm-server/tests/server-endpoints/maintenance-endpoints-test.ts index 3eed1d5a108..bc647748776 100644 --- a/packages/realm-server/tests/server-endpoints/maintenance-endpoints-test.ts +++ b/packages/realm-server/tests/server-endpoints/maintenance-endpoints-test.ts @@ -665,6 +665,12 @@ module(`server-endpoints/${basename(__filename)}`, function () { { realmURL, realmUsername: owner, + // The grafana reindex path passes clearLastModified: true so + // every file in boxel_index re-renders even when its mtime + // hasn't changed. Surfaced in args so the from-scratch + // coalesce can refuse to attach this kind of publish to an + // already-running same-realm from-scratch. + clearLastModified: true, }, 'realm args are correct', ); diff --git a/packages/runtime-common/jobs/reindex-realm.ts b/packages/runtime-common/jobs/reindex-realm.ts index ceeda0b00ab..281aa24a855 100644 --- a/packages/runtime-common/jobs/reindex-realm.ts +++ b/packages/runtime-common/jobs/reindex-realm.ts @@ -17,11 +17,16 @@ export async function enqueueReindexRealmJob( priority: number, opts?: EnqueueReindexRealmJobOptions, ) { + // Flagging the args so the from-scratch coalesce can refuse to attach + // a forced-refresh publish to an already-running same-realm + // from-scratch whose mtimes snapshot pre-dates the clear below. + let clearLastModified = opts?.clearLastModified === true; let args = { realmURL: realmUrl, realmUsername, + clearLastModified, }; - if (opts?.clearLastModified) { + if (clearLastModified) { await query(dbAdapter, [ `UPDATE boxel_index SET last_modified = NULL WHERE realm_url =`, param(realmUrl), diff --git a/packages/runtime-common/tasks/indexer.ts b/packages/runtime-common/tasks/indexer.ts index 76bfd16eccc..aa8c584e6e2 100644 --- a/packages/runtime-common/tasks/indexer.ts +++ b/packages/runtime-common/tasks/indexer.ts @@ -55,7 +55,18 @@ export interface IncrementalDoneResult extends IncrementalResult { clientRequestId: string | null; } -export type FromScratchArgs = WorkerArgs; +export interface FromScratchArgs extends WorkerArgs { + // True when the caller cleared `boxel_index.last_modified` for the + // realm before publishing. The worker doesn't need to act on this + // (the clear already happened in the DB) — it's surfaced in args + // so the coalesce decision can refuse to attach a clearing publish + // to an already-running same-realm from-scratch whose + // `Batch.getModifiedTimes` snapshot pre-dates the clear, which would + // otherwise let the running job report success without re-rendering + // the swapped files. Always present (non-optional) so the args + // object satisfies WorkerArgs's JSON-shape index signature. + clearLastModified: boolean; +} export interface FromScratchResult extends JSONTypes.Object { invalidations: string[]; @@ -239,29 +250,59 @@ function chooseIncrementalCoalesceDecision( function chooseFromScratchCoalesceDecision( context: QueueCoalesceContext, ): QueueCoalesceDecision { - let { incoming, candidates } = context; + let { incoming, candidates, inFlightCandidates } = context; let sameTypeCandidate = candidates.find( (candidate) => candidate.jobType === incoming.jobType, ); - if (!sameTypeCandidate) { - return { type: 'insert' }; + if (sameTypeCandidate) { + return { + type: 'join', + jobId: sameTypeCandidate.id, + update: { + ...maxPriorityAndTimeout(sameTypeCandidate, incoming), + args: { + ...(isObjectLike(sameTypeCandidate.args) + ? sameTypeCandidate.args + : {}), + ...(isObjectLike(incoming.args) ? incoming.args : {}), + coalescedCallers: mergeCoalescedCallers( + getCoalescedCallers(sameTypeCandidate.args), + getCoalescedCallers(incoming.args), + ), + }, + }, + }; } - return { - type: 'join', - jobId: sameTypeCandidate.id, - update: { - ...maxPriorityAndTimeout(sameTypeCandidate, incoming), - args: { - ...(isObjectLike(sameTypeCandidate.args) ? sameTypeCandidate.args : {}), - ...(isObjectLike(incoming.args) ? incoming.args : {}), - coalescedCallers: mergeCoalescedCallers( - getCoalescedCallers(sameTypeCandidate.args), - getCoalescedCallers(incoming.args), - ), - }, - }, - }; + // No still-pending candidate. Attach to an in-flight same-realm + // from-scratch instead — same concurrency group + same jobType is + // sufficient because a from-scratch reindex subsumes any other + // from-scratch for that realm by definition. Without this fallback, + // a worker claiming the first enqueue between two pre-claim publishes + // forces the second to insert a fresh row at its own priority, even + // though the in-flight job will produce exactly the result the second + // caller wanted. + // + // Exception: a publish carrying `clearLastModified: true` has already + // nulled `boxel_index.last_modified` for the realm so the next + // from-scratch pass re-renders every row even where mtimes didn't + // change. An already-running from-scratch read its mtimes snapshot + // before that clear, so attaching this publish to it would let the + // caller observe a successful job that did NOT actually re-render + // the swapped files. Force a fresh row instead. + if (!incomingClearsLastModified(incoming.args)) { + for (let candidate of inFlightCandidates) { + if (candidate.jobType === incoming.jobType) { + return { type: 'join', jobId: candidate.id }; + } + } + } + + return { type: 'insert' }; +} + +function incomingClearsLastModified(args: unknown): boolean { + return isObjectLike(args) && args.clearLastModified === true; } registerQueueJobDefinition({