Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion packages/realm-server/tests/full-reindex-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ module(basename(__filename), function (hooks) {
realmUrls: [sourceRealmURL, publishedRealmURL],
});

type JobArgs = { realmURL: string; realmUsername: string };
type JobArgs = {
realmURL: string;
realmUsername: string;
clearLastModified: boolean;
};
type JobRow = {
job_type: string;
concurrency_group: string | null;
Expand Down Expand Up @@ -116,6 +120,12 @@ module(basename(__filename), function (hooks) {
{
realmURL: sourceRealmURL,
realmUsername: 'owner',
// full-reindex enqueues with clearLastModified: true so every
// file re-renders even when its mtime is unchanged. Surfaced in
// args so the from-scratch coalesce can refuse to attach a
// clearing publish to an already-running same-realm
// from-scratch.
clearLastModified: true,
},
'source job args are correct',
);
Expand All @@ -137,6 +147,7 @@ module(basename(__filename), function (hooks) {
{
realmURL: publishedRealmURL,
realmUsername: 'owner',
clearLastModified: true,
},
'published job args use the source owner',
);
Expand Down
170 changes: 168 additions & 2 deletions packages/realm-server/tests/queue-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ module(basename(__filename), function () {
});

async function publishFromScratchIndexJob(args: {
args: FromScratchArgs;
args: Omit<FromScratchArgs, 'clearLastModified'> & {
clearLastModified?: boolean;
};
priority: number;
}) {
return await publisher.publish<FromScratchResult>({
jobType: 'from-scratch-index',
concurrencyGroup: `indexing:${args.args.realmURL}`,
timeout: FROM_SCRATCH_JOB_TIMEOUT_SEC,
priority: args.priority,
args: args.args,
args: { clearLastModified: false, ...args.args },
});
}

Expand Down Expand Up @@ -323,6 +325,170 @@ module(basename(__filename), function () {
);
});

test('from-scratch dedup: a duplicate publish for an in-flight from-scratch attaches as a late waiter', async function (assert) {
// From-scratch reindex is the maximal indexing operation for a
// realm: any same-realm from-scratch already running subsumes a
// second concurrent from-scratch by definition. A worker
// claiming the first publish before the second arrives must not
// force a second canonical row, because the second caller's
// result is already covered by what the in-flight job will
// produce.
// Tear down the module-level runner so only the dedicated worker
// below can claim jobs in this test.
await runner.destroy();
let realmURL = 'http://example.com/from-scratch-in-flight-dedup/';
// `started` resolves once the worker has claimed the job;
// `release` keeps that job in flight until the test has made its
// assertions about the second publish.
let started = new Deferred<void>();
let release = new Deferred<void>();

let worker = new PgQueueRunner({
adapter,
workerId: 'from-scratch-in-flight-worker',
});
worker.register('from-scratch-index', async () => {
started.fulfill();
await release.promise;
// Minimal successful from-scratch result; the stats values are
// irrelevant to the dedup behavior under test.
return {
invalidations: [],
ignoreData: {},
stats: {
instancesIndexed: 0,
filesIndexed: 0,
instanceErrors: 0,
fileErrors: 0,
totalIndexEntries: 0,
},
};
});

try {
await worker.start();

let first = await publishFromScratchIndexJob({
priority: 0,
args: {
realmURL,
realmUsername: 'owner',
},
});
// Wait for the worker to actually claim the job — that moves
// the row from `candidates` to `inFlightCandidates`, which is
// the precondition for the pending-candidate path to miss and
// the in-flight fallback to fire.
await started.promise;

// Second publish for the same realm while the first is still
// running; it must coalesce onto the in-flight job.
let second = await publishFromScratchIndexJob({
priority: userInitiatedPriority,
args: {
realmURL,
realmUsername: 'owner',
},
});

assert.strictEqual(
first.id,
second.id,
'duplicate publish reuses the in-flight job id instead of creating a new row',
);

// Confirm at the DB level too: exactly one unfulfilled row in the
// realm's concurrency group.
let rows = (await adapter.execute(
`SELECT id
FROM jobs
WHERE concurrency_group = $1
AND status = 'unfulfilled'`,
{ bind: [`indexing:${realmURL}`] },
)) as { id: number }[];
assert.strictEqual(
rows.length,
1,
'only one from-scratch-index row exists; duplicate did not enqueue a second job',
);

release.fulfill();
await Promise.all([first.done, second.done]);
} finally {
// Release again so the worker can finish even if an assertion
// threw before the happy-path release above.
// NOTE(review): assumes Deferred tolerates a second fulfill — confirm.
release.fulfill();
await worker.destroy();
}
});

test('from-scratch dedup: a clearLastModified publish does NOT attach to an in-flight from-scratch', async function (assert) {
// A clearLastModified publish has already nulled
// boxel_index.last_modified for the realm so the next from-scratch
// pass re-renders every row. An already-running from-scratch read
// its mtimes snapshot before that clear, so joining the running
// job would let the caller observe a successful job that did NOT
// re-render the swapped files (e.g. a publish-realm caller would
// return ok despite never having indexed the new content).
// Tear down the module-level runner so only the dedicated worker
// below can claim jobs in this test.
await runner.destroy();
let realmURL = 'http://example.com/from-scratch-clear-last-modified/';
// `started` resolves once the worker has claimed the first job;
// `release` keeps it in flight while the second publish is made.
let started = new Deferred<void>();
let release = new Deferred<void>();

let worker = new PgQueueRunner({
adapter,
workerId: 'from-scratch-clear-worker',
});
worker.register('from-scratch-index', async () => {
started.fulfill();
await release.promise;
// Minimal successful from-scratch result; the stats values are
// irrelevant to the coalesce behavior under test.
return {
invalidations: [],
ignoreData: {},
stats: {
instancesIndexed: 0,
filesIndexed: 0,
instanceErrors: 0,
fileErrors: 0,
totalIndexEntries: 0,
},
};
});

try {
await worker.start();

let first = await publishFromScratchIndexJob({
priority: 0,
args: { realmURL, realmUsername: 'owner' },
});
// Ensure the first job has been claimed (is in flight) before the
// second publish arrives.
await started.promise;

// Second publish flags clearLastModified — must not coalesce
// onto the running first job.
let second = await publishFromScratchIndexJob({
priority: userInitiatedPriority,
args: {
realmURL,
realmUsername: 'owner',
clearLastModified: true,
},
});

assert.notStrictEqual(
first.id,
second.id,
'clearLastModified publish does not attach to the in-flight job',
);

// Confirm at the DB level: two distinct rows exist in the realm's
// concurrency group (the in-flight job plus the fresh insert).
let rows = (await adapter.execute(
`SELECT id, status
FROM jobs
WHERE concurrency_group = $1
ORDER BY id`,
{ bind: [`indexing:${realmURL}`] },
)) as { id: number; status: string }[];
assert.strictEqual(
rows.length,
2,
'a fresh row is inserted for the clearLastModified publish',
);

release.fulfill();
await Promise.all([first.done, second.done]);
} finally {
// Release again so the worker can finish even if an assertion
// threw before the happy-path release above.
// NOTE(review): assumes Deferred tolerates a second fulfill — confirm.
release.fulfill();
await worker.destroy();
}
});

test('from-scratch does not coalesce onto pending incremental in same group', async function (assert) {
await runner.destroy();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,12 @@ module(`server-endpoints/${basename(__filename)}`, function () {
{
realmURL,
realmUsername: owner,
// The grafana reindex path passes clearLastModified: true so
// every file in boxel_index re-renders even when its mtime
// hasn't changed. Surfaced in args so the from-scratch
// coalesce can refuse to attach this kind of publish to an
// already-running same-realm from-scratch.
clearLastModified: true,
},
'realm args are correct',
);
Expand Down
7 changes: 6 additions & 1 deletion packages/runtime-common/jobs/reindex-realm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@ export async function enqueueReindexRealmJob(
priority: number,
opts?: EnqueueReindexRealmJobOptions,
) {
// Flagging the args so the from-scratch coalesce can refuse to attach
// a forced-refresh publish to an already-running same-realm
// from-scratch whose mtimes snapshot pre-dates the clear below.
let clearLastModified = opts?.clearLastModified === true;
let args = {
realmURL: realmUrl,
realmUsername,
clearLastModified,
};
if (opts?.clearLastModified) {
if (clearLastModified) {
await query(dbAdapter, [
`UPDATE boxel_index SET last_modified = NULL WHERE realm_url =`,
param(realmUrl),
Expand Down
79 changes: 60 additions & 19 deletions packages/runtime-common/tasks/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,18 @@ export interface IncrementalDoneResult extends IncrementalResult {
clientRequestId: string | null;
}

export type FromScratchArgs = WorkerArgs;
export interface FromScratchArgs extends WorkerArgs {
// True when the caller cleared `boxel_index.last_modified` for the
// realm before publishing. The worker doesn't need to act on this
// (the clear already happened in the DB) — it's surfaced in args
// so the coalesce decision can refuse to attach a clearing publish
// to an already-running same-realm from-scratch whose
// `Batch.getModifiedTimes` snapshot pre-dates the clear, which would
// otherwise let the running job report success without re-rendering
// the swapped files. Always present (non-optional) so the args
// object satisfies WorkerArgs's JSON-shape index signature.
clearLastModified: boolean;
}

export interface FromScratchResult extends JSONTypes.Object {
invalidations: string[];
Expand Down Expand Up @@ -239,29 +250,59 @@ function chooseIncrementalCoalesceDecision(
function chooseFromScratchCoalesceDecision(
context: QueueCoalesceContext,
): QueueCoalesceDecision {
let { incoming, candidates } = context;
let { incoming, candidates, inFlightCandidates } = context;
let sameTypeCandidate = candidates.find(
(candidate) => candidate.jobType === incoming.jobType,
);
if (!sameTypeCandidate) {
return { type: 'insert' };
if (sameTypeCandidate) {
return {
type: 'join',
jobId: sameTypeCandidate.id,
update: {
...maxPriorityAndTimeout(sameTypeCandidate, incoming),
args: {
...(isObjectLike(sameTypeCandidate.args)
? sameTypeCandidate.args
: {}),
...(isObjectLike(incoming.args) ? incoming.args : {}),
coalescedCallers: mergeCoalescedCallers(
getCoalescedCallers(sameTypeCandidate.args),
getCoalescedCallers(incoming.args),
),
},
},
};
}

return {
type: 'join',
jobId: sameTypeCandidate.id,
update: {
...maxPriorityAndTimeout(sameTypeCandidate, incoming),
args: {
...(isObjectLike(sameTypeCandidate.args) ? sameTypeCandidate.args : {}),
...(isObjectLike(incoming.args) ? incoming.args : {}),
coalescedCallers: mergeCoalescedCallers(
getCoalescedCallers(sameTypeCandidate.args),
getCoalescedCallers(incoming.args),
),
},
},
};
// No still-pending candidate. Attach to an in-flight same-realm
// from-scratch instead — same concurrency group + same jobType is
// sufficient because a from-scratch reindex subsumes any other
// from-scratch for that realm by definition. Without this fallback,
// a worker claiming the first enqueue between two pre-claim publishes
// forces the second to insert a fresh row at its own priority, even
// though the in-flight job will produce exactly the result the second
// caller wanted.
//
// Exception: a publish carrying `clearLastModified: true` has already
// nulled `boxel_index.last_modified` for the realm so the next
// from-scratch pass re-renders every row even where mtimes didn't
// change. An already-running from-scratch read its mtimes snapshot
// before that clear, so attaching this publish to it would let the
// caller observe a successful job that did NOT actually re-render
// the swapped files. Force a fresh row instead.
if (!incomingClearsLastModified(incoming.args)) {
for (let candidate of inFlightCandidates) {
if (candidate.jobType === incoming.jobType) {
return { type: 'join', jobId: candidate.id };
}
}
}

return { type: 'insert' };
}

// True only when the publish's args object explicitly carries
// `clearLastModified: true`; non-object args or a missing/falsy
// flag both mean the publish does not clear last-modified.
function incomingClearsLastModified(args: unknown): boolean {
if (!isObjectLike(args)) {
return false;
}
return args.clearLastModified === true;
}

registerQueueJobDefinition({
Expand Down
Loading