Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions packages/realm-server/tests/queue-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,90 @@ module(basename(__filename), function () {
);
});

test('from-scratch dedup: a duplicate publish for an in-flight from-scratch attaches as a late waiter', async function (assert) {
// From-scratch reindex is the maximal indexing operation for a
// realm: any same-realm from-scratch already running subsumes a
// second concurrent from-scratch by definition. A worker
// claiming the first publish before the second arrives must not
// force a second canonical row, because the second caller's
// result is already covered by what the in-flight job will
// produce.
await runner.destroy();
let realmURL = 'http://example.com/from-scratch-in-flight-dedup/';
let started = new Deferred<void>();
let release = new Deferred<void>();

let worker = new PgQueueRunner({
adapter,
workerId: 'from-scratch-in-flight-worker',
});
worker.register('from-scratch-index', async () => {
started.fulfill();
await release.promise;
return {
invalidations: [],
ignoreData: {},
stats: {
instancesIndexed: 0,
filesIndexed: 0,
instanceErrors: 0,
fileErrors: 0,
totalIndexEntries: 0,
},
};
});

try {
await worker.start();

let first = await publishFromScratchIndexJob({
priority: 0,
args: {
realmURL,
realmUsername: 'owner',
},
});
// Wait for the worker to actually claim the job — that moves
// the row from `candidates` to `inFlightCandidates`, which is
// the precondition for the pending-candidate path to miss and
// the in-flight fallback to fire.
await started.promise;

let second = await publishFromScratchIndexJob({
priority: userInitiatedPriority,
args: {
realmURL,
realmUsername: 'owner',
},
});

assert.strictEqual(
first.id,
second.id,
'duplicate publish reuses the in-flight job id instead of creating a new row',
);

let rows = (await adapter.execute(
`SELECT id
FROM jobs
WHERE concurrency_group = $1
AND status = 'unfulfilled'`,
{ bind: [`indexing:${realmURL}`] },
)) as { id: number }[];
assert.strictEqual(
rows.length,
1,
'only one from-scratch-index row exists; duplicate did not enqueue a second job',
);

release.fulfill();
await Promise.all([first.done, second.done]);
} finally {
release.fulfill();
await worker.destroy();
}
});

test('from-scratch does not coalesce onto pending incremental in same group', async function (assert) {
await runner.destroy();

Expand Down
52 changes: 34 additions & 18 deletions packages/runtime-common/tasks/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,29 +239,45 @@ function chooseIncrementalCoalesceDecision(
function chooseFromScratchCoalesceDecision(
context: QueueCoalesceContext,
): QueueCoalesceDecision {
let { incoming, candidates } = context;
let { incoming, candidates, inFlightCandidates } = context;
let sameTypeCandidate = candidates.find(
(candidate) => candidate.jobType === incoming.jobType,
);
if (!sameTypeCandidate) {
return { type: 'insert' };
if (sameTypeCandidate) {
return {
type: 'join',
jobId: sameTypeCandidate.id,
update: {
...maxPriorityAndTimeout(sameTypeCandidate, incoming),
args: {
...(isObjectLike(sameTypeCandidate.args)
? sameTypeCandidate.args
: {}),
...(isObjectLike(incoming.args) ? incoming.args : {}),
coalescedCallers: mergeCoalescedCallers(
getCoalescedCallers(sameTypeCandidate.args),
getCoalescedCallers(incoming.args),
),
},
},
};
}

return {
type: 'join',
jobId: sameTypeCandidate.id,
update: {
...maxPriorityAndTimeout(sameTypeCandidate, incoming),
args: {
...(isObjectLike(sameTypeCandidate.args) ? sameTypeCandidate.args : {}),
...(isObjectLike(incoming.args) ? incoming.args : {}),
coalescedCallers: mergeCoalescedCallers(
getCoalescedCallers(sameTypeCandidate.args),
getCoalescedCallers(incoming.args),
),
},
},
};
// No still-pending candidate. Attach to an in-flight same-realm
// from-scratch instead — same concurrency group + same jobType is
// sufficient because a from-scratch reindex subsumes any other
// from-scratch for that realm by definition. Without this fallback,
// a worker claiming the first enqueue between two pre-claim publishes
// forces the second to insert a fresh row at its own priority, even
// though the in-flight job will produce exactly the result the second
// caller wanted.
for (let candidate of inFlightCandidates) {
if (candidate.jobType === incoming.jobType) {
return { type: 'join', jobId: candidate.id };
Comment thread
habdelra marked this conversation as resolved.
Outdated
}
}

return { type: 'insert' };
}

registerQueueJobDefinition({
Expand Down
Loading