diff --git a/.changeset/add-onerror-retry-backoff.md b/.changeset/add-onerror-retry-backoff.md
new file mode 100644
index 0000000000..588d5dfa38
--- /dev/null
+++ b/.changeset/add-onerror-retry-backoff.md
@@ -0,0 +1,5 @@
+---
+'@electric-sql/client': patch
+---
+
+Add exponential backoff to onError-driven retries to prevent tight loops on persistent 4xx errors (e.g. expired auth tokens returning 403). The backoff uses jitter with a 100ms base and 30s cap, and is abort-aware so stream teardown remains responsive.
diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts
index bc64a06054..70439290b6 100644
--- a/packages/typescript-client/src/client.ts
+++ b/packages/typescript-client/src/client.ts
@@ -621,6 +621,10 @@ export class ShapeStream<T extends Row<unknown> = Row>
   #fastLoopBackoffMaxMs = 5_000
   #fastLoopConsecutiveCount = 0
   #fastLoopMaxCount = 5
+  // onError retry backoff: prevent tight loops when onError always returns {}
+  #onErrorRetryCount = 0
+  #onErrorBackoffBaseMs = 100
+  #onErrorBackoffMaxMs = 30_000
   #pendingRequestShapeCacheBuster?: string
   #maxSnapshotRetries = 5
 
@@ -770,6 +774,45 @@ export class ShapeStream<T extends Row<unknown> = Row>
 
       this.#fastLoopConsecutiveCount = 0
       this.#recentRequestEntries = []
+      // Apply exponential backoff with jitter to prevent tight retry loops
+      // (e.g., when onError always returns {} on persistent 4xx errors).
+      // NOTE(review): #onErrorRetryCount is never reset after a successful
+      // request — confirm whether recovery should restore the base delay.
+      const retryCount = ++this.#onErrorRetryCount
+      const maxDelay = Math.min(
+        this.#onErrorBackoffMaxMs,
+        this.#onErrorBackoffBaseMs * Math.pow(2, retryCount - 1)
+      )
+      // Full jitter: uniform in [0, maxDelay) to de-synchronize clients
+      const delayMs = Math.floor(Math.random() * maxDelay)
+      if (retryCount > 1) {
+        console.warn(
+          `[Electric] onError retry backoff: waiting ${delayMs}ms before retry ` +
+            `(attempt ${retryCount}). ` +
+            `Previous error: ${(err as Error)?.message ?? err}`
+        )
+      }
+      // Abort-aware sleep: resolves immediately if already aborted, and
+      // resolves early (clearing the timer) if aborted while waiting.
+      await new Promise<void>((resolve) => {
+        const signal = this.options.signal
+        if (signal?.aborted) {
+          resolve()
+          return
+        }
+        const onAbort = () => {
+          clearTimeout(timer)
+          resolve()
+        }
+        const timer = setTimeout(() => {
+          signal?.removeEventListener(`abort`, onAbort)
+          resolve()
+        }, delayMs)
+        signal?.addEventListener(`abort`, onAbort, { once: true })
+      })
+      if (this.options.signal?.aborted) {
+        this.#teardown()
+        return
+      }
       // Restart from current offset
       this.#started = false
       await this.#start()
diff --git a/packages/typescript-client/test/stream.test.ts b/packages/typescript-client/test/stream.test.ts
index fba7e8e885..6b5d07bfee 100644
--- a/packages/typescript-client/test/stream.test.ts
+++ b/packages/typescript-client/test/stream.test.ts
@@ -706,4 +706,209 @@ describe(`ShapeStream`, () => {
 
     warnSpy.mockRestore()
   })
+
+  it(`should apply exponential backoff on onError retries for persistent 4xx errors`, async () => {
+    // When onError always returns {} on a persistent 4xx error, the retry
+    // delay should increase exponentially rather than retrying immediately.
+    const requestTimestamps: number[] = []
+    const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
+
+    const fetchMock = (
+      ..._args: Parameters<typeof fetch>
+    ): Promise<Response> => {
+      requestTimestamps.push(Date.now())
+      return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
+    }
+
+    const stream = new ShapeStream({
+      url: shapeUrl,
+      params: { table: `test` },
+      signal: aborter.signal,
+      fetchClient: fetchMock,
+      subscribe: false,
+      onError: () => ({}),
+    })
+
+    stream.subscribe(() => {})
+
+    // Wait for enough retries so we can compare early vs late gaps
+    await vi.waitFor(
+      () => {
+        expect(requestTimestamps.length).toBeGreaterThanOrEqual(6)
+      },
+      { timeout: 15_000 }
+    )
+
+    // Verify gaps between requests grow over time (exponential backoff).
+    // Compare the sum of the first half vs the second half of gaps to be
+    // robust against jitter on any individual gap.
+    const gaps = requestTimestamps
+      .slice(1)
+      .map((t, i) => t - requestTimestamps[i]!)
+
+    const mid = Math.floor(gaps.length / 2)
+    const earlySum = gaps.slice(0, mid).reduce((a, b) => a + b, 0)
+    const lateSum = gaps.slice(mid).reduce((a, b) => a + b, 0)
+    expect(lateSum).toBeGreaterThan(earlySum)
+
+    warnSpy.mockRestore()
+  })
+
+  it(`should tear down immediately when aborted during onError backoff`, async () => {
+    // When the stream is in the middle of a backoff delay and the user
+    // aborts, it should tear down promptly rather than waiting for the timer.
+    let requestCount = 0
+    const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
+
+    const fetchMock = (
+      ..._args: Parameters<typeof fetch>
+    ): Promise<Response> => {
+      requestCount++
+      return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
+    }
+
+    const localAborter = new AbortController()
+    const stream = new ShapeStream({
+      url: shapeUrl,
+      params: { table: `test` },
+      signal: localAborter.signal,
+      fetchClient: fetchMock,
+      subscribe: false,
+      onError: () => ({}),
+    })
+
+    stream.subscribe(() => {})
+
+    // Wait for at least one retry so we know backoff is active
+    await vi.waitFor(
+      () => {
+        expect(requestCount).toBeGreaterThanOrEqual(2)
+      },
+      { timeout: 5_000 }
+    )
+
+    const countBeforeAbort = requestCount
+
+    // Abort the stream
+    localAborter.abort()
+
+    // Give a tick for teardown
+    await resolveInMacrotask(undefined)
+
+    // No more requests should have been made after abort
+    expect(requestCount).toBe(countBeforeAbort)
+
+    warnSpy.mockRestore()
+  })
+
+  it(`should warn on 2nd+ onError retry attempt`, async () => {
+    // The stream should log a console.warn starting from the 2nd retry
+    // to help developers diagnose persistent error loops.
+    const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
+    let requestCount = 0
+
+    const fetchMock = (
+      ..._args: Parameters<typeof fetch>
+    ): Promise<Response> => {
+      requestCount++
+      return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
+    }
+
+    const stream = new ShapeStream({
+      url: shapeUrl,
+      params: { table: `test` },
+      signal: aborter.signal,
+      fetchClient: fetchMock,
+      subscribe: false,
+      onError: () => ({}),
+    })
+
+    stream.subscribe(() => {})
+
+    // Wait for enough retries to trigger the warning
+    await vi.waitFor(
+      () => {
+        expect(requestCount).toBeGreaterThanOrEqual(3)
+      },
+      { timeout: 15_000 }
+    )
+
+    expect(warnSpy).toHaveBeenCalledWith(
+      expect.stringContaining(`onError retry backoff`)
+    )
+
+    warnSpy.mockRestore()
+  })
+
+  it(`should clean up abort listeners after onError backoff timer expires`, async () => {
+    // When the backoff timer expires normally (not via abort), the abort
+    // listener must be removed to prevent closure accumulation on
+    // long-lived streams with many recoverable errors.
+    let requestCount = 0
+    const warnSpy = vi.spyOn(console, `warn`).mockImplementation(() => {})
+    const addSpy = vi.fn()
+    const removeSpy = vi.fn()
+
+    const localAborter = new AbortController()
+    const originalAdd = localAborter.signal.addEventListener.bind(
+      localAborter.signal
+    )
+    const originalRemove = localAborter.signal.removeEventListener.bind(
+      localAborter.signal
+    )
+    localAborter.signal.addEventListener = (
+      ...args: Parameters<typeof originalAdd>
+    ) => {
+      addSpy(...args)
+      return originalAdd(...args)
+    }
+    localAborter.signal.removeEventListener = (
+      ...args: Parameters<typeof originalRemove>
+    ) => {
+      removeSpy(...args)
+      return originalRemove(...args)
+    }
+
+    const fetchMock = (
+      ..._args: Parameters<typeof fetch>
+    ): Promise<Response> => {
+      requestCount++
+      return Promise.resolve(new Response(`Forbidden`, { status: 403 }))
+    }
+
+    const stream = new ShapeStream({
+      url: shapeUrl,
+      params: { table: `test` },
+      signal: localAborter.signal,
+      fetchClient: fetchMock,
+      subscribe: false,
+      onError: () => ({}),
+    })
+
+    stream.subscribe(() => {})
+
+    // Wait for several retries so multiple backoff timers expire normally
+    await vi.waitFor(
+      () => {
+        expect(requestCount).toBeGreaterThanOrEqual(4)
+      },
+      { timeout: 15_000 }
+    )
+
+    localAborter.abort()
+
+    // Each backoff cycle should have added AND removed an abort listener.
+    // The remove count should match the add count (minus 1 for the final
+    // cycle that was interrupted by abort, where { once: true } handles cleanup).
+    const abortAdds = addSpy.mock.calls.filter(
+      (args: unknown[]) => args[0] === `abort`
+    ).length
+    const abortRemoves = removeSpy.mock.calls.filter(
+      (args: unknown[]) => args[0] === `abort`
+    ).length
+    expect(abortAdds).toBeGreaterThanOrEqual(3)
+    expect(abortRemoves).toBeGreaterThanOrEqual(abortAdds - 1)
+
+    warnSpy.mockRestore()
+  })
 })
diff --git a/packages/typescript-client/test/wake-detection.test.ts b/packages/typescript-client/test/wake-detection.test.ts
index 28a83650a4..0aa92f10d7 100644
--- a/packages/typescript-client/test/wake-detection.test.ts
+++ b/packages/typescript-client/test/wake-detection.test.ts
@@ -257,10 +257,10 @@ describe(`Wake detection`, () => {
     const unsub = stream.subscribe(() => {})
 
     // Let the stream start, hit the 400 error, and retry via onError.
-    // The error retry path (#start lines 767-769) calls #start() recursively
-    // WITHOUT calling #teardown() first, so the timer is still alive.
-    await vi.advanceTimersByTimeAsync(0)
+    // The error retry path calls #start() recursively after an exponential
+    // backoff delay, so we need to advance time enough to cover it.
     await vi.advanceTimersByTimeAsync(0)
+    await vi.advanceTimersByTimeAsync(200)
     await vi.advanceTimersByTimeAsync(0)
 
     expect(fetchCallCount).toBeGreaterThanOrEqual(2)