From 21f9330ad3054ad735f4a44418ff21d6111977e2 Mon Sep 17 00:00:00 2001 From: Alec Wang Date: Tue, 7 Apr 2026 00:30:37 -0700 Subject: [PATCH] fix out-of-order session recovery Treat forward sequence gaps as unrecoverable session state so reconnects create a fresh connection instead of reusing a poisoned session. --- transport/client.ts | 9 +- transport/server.ts | 4 +- .../sessionStateMachine/SessionConnected.ts | 13 +- .../sessionStateMachine/stateMachine.test.ts | 58 ++++++++ transport/transport.test.ts | 127 ++++++++++++++++++ 5 files changed, 201 insertions(+), 10 deletions(-) diff --git a/transport/client.ts b/transport/client.ts index 0d5d876c..99429cce 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -328,10 +328,12 @@ export abstract class ClientTransport< onMessage: (msg) => { this.handleMsg(msg); }, - onInvalidMessage: (reason) => { + onInvalidMessage: (reason, transportMessage, options) => { + const sessionTo = connectedSession.to; + this.log?.error(`invalid message: ${reason}`, { ...connectedSession.loggingMetadata, - transportMessage: msg, + transportMessage: transportMessage ?? msg, }); this.protocolError({ @@ -339,6 +341,9 @@ export abstract class ClientTransport< message: reason, }); this.deleteSession(connectedSession, { unhealthy: true }); + if (options?.reconnect) { + this.tryReconnecting(sessionTo); + } }, onMessageSendFailure: (msg, reason) => { this.log?.error(`failed to send message: ${reason}`, { diff --git a/transport/server.ts b/transport/server.ts index 78cf8042..853e0f15 100644 --- a/transport/server.ts +++ b/transport/server.ts @@ -530,10 +530,10 @@ export abstract class ServerTransport< onMessage: (msg) => { this.handleMsg(msg); }, - onInvalidMessage: (reason) => { + onInvalidMessage: (reason, transportMessage) => { this.log?.error(`invalid message: ${reason}`, { ...connectedSession.loggingMetadata, - transportMessage: msg, + transportMessage: transportMessage ?? msg, }); this.protocolError({ diff --git a/transport/sessionStateMachine/SessionConnected.ts b/transport/sessionStateMachine/SessionConnected.ts index 1316ca5f..baa29f11 100644 --- a/transport/sessionStateMachine/SessionConnected.ts +++ b/transport/sessionStateMachine/SessionConnected.ts @@ -21,7 +21,11 @@ export interface SessionConnectedListeners extends IdentifiedSessionListeners { onConnectionErrored: (err: unknown) => void; onConnectionClosed: () => void; onMessage: (msg: OpaqueTransportMessage) => void; - onInvalidMessage: (reason: string) => void; + onInvalidMessage: ( + reason: string, + transportMessage?: OpaqueTransportMessage, + options?: { reconnect: boolean }, + ) => void; } export interface SessionConnectedProps @@ -203,11 +207,10 @@ export class SessionConnected< }, ); } else { - const reason = `received out-of-order msg, closing connection (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`; + const reason = `received out-of-order msg, closing session (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`; this.log?.error(reason, { ...this.loggingMetadata, transportMessage: parsedMsg, - tags: ['invariant-violation'], }); this.telemetry.span.setStatus({ @@ -215,9 +218,7 @@ export class SessionConnected< message: reason, }); - // try to recover by closing the connection and re-handshaking - // with the session intact - this.conn.close(); + this.listeners.onInvalidMessage(reason, parsedMsg, { reconnect: true }); } return; diff --git a/transport/sessionStateMachine/stateMachine.test.ts b/transport/sessionStateMachine/stateMachine.test.ts index 0d825daa..96eda738 100644 --- a/transport/sessionStateMachine/stateMachine.test.ts +++ b/transport/sessionStateMachine/stateMachine.test.ts @@ -1982,6 +1982,64 @@ describe('session state machine', () => { // should not have transitioned to the next state expect(session.state).toBe(SessionState.Connected); }); + + test('connected event listeners: parse failures are invalid without reconnect hints', async () => { + const sessionHandle = await createSessionConnected(); + const session = sessionHandle.session; + const conn = session.conn; + + conn.emitData(new Uint8Array([255])); + + await waitFor(async () => { + expect(sessionHandle.onInvalidMessage).toHaveBeenCalledTimes(1); + expect(sessionHandle.onInvalidMessage).toHaveBeenCalledWith( + expect.stringContaining('could not parse message:'), + ); + expect(sessionHandle.onConnectionClosed).not.toHaveBeenCalled(); + }); + + expect(conn.status).toBe('open'); + expect(session.state).toBe(SessionState.Connected); + }); + + test('connected event listeners: out-of-order messages are invalid', async () => { + const sessionHandle = await createSessionConnected(); + const session = sessionHandle.session; + const conn = session.conn; + const reason = + 'received out-of-order msg, closing session (got seq: 1, wanted seq: 0)'; + + conn.emitData( + session.options.codec.toBuffer({ + id: 'msgid', + to: session.from, + from: session.to, + seq: 1, + ack: 0, + streamId: 'stream', + controlFlags: 0, + payload: 'hello', + }), + ); + + await waitFor(async () => { + expect(sessionHandle.onInvalidMessage).toHaveBeenCalledTimes(1); + expect(sessionHandle.onInvalidMessage).toHaveBeenCalledWith( + reason, + expect.objectContaining({ + seq: 1, + ack: 0, + streamId: 'stream', + }), + { reconnect: true }, + ); + expect(sessionHandle.onConnectionClosed).not.toHaveBeenCalled(); + expect(sessionHandle.onMessage).not.toHaveBeenCalled(); + }); + + expect(conn.status).toBe('open'); + expect(session.state).toBe(SessionState.Connected); + }); }); describe('heartbeats', () => { diff --git a/transport/transport.test.ts b/transport/transport.test.ts index 8c93f287..a6a3f821 100644 --- a/transport/transport.test.ts +++ b/transport/transport.test.ts @@ -166,6 +166,133 @@ describe.each(testMatrix())( }); }); + test('out-of-order messages delete poisoned sessions before reconnect', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + clientTransport.connect(serverTransport.clientId); + + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + await waitFor(() => { + expect(numberOfConnections(clientTransport)).toBe(1); + expect(numberOfConnections(serverTransport)).toBe(1); + }); + + const clientSession = clientTransport.sessions.get('SERVER'); + assert(clientSession); + expect(clientSession.state).toBe(SessionState.Connected); + + if (clientSession.state !== SessionState.Connected) { + throw new Error('expected a connected client session'); + } + + const badMsgRes = clientSession.codec.toBuffer({ + id: 'bad-msg', + to: serverTransport.clientId, + from: clientTransport.clientId, + seq: clientSession.seq + 1, + ack: clientSession.ack, + streamId: 'stream', + controlFlags: 0, + payload: { msg: 'bad' }, + }); + expect(badMsgRes.ok).toBe(true); + + if (!badMsgRes.ok) { + throw new Error('expected out-of-order message to encode'); + } + + expect(clientSession.conn.send(badMsgRes.value)).toBe(true); + + await waitFor(() => + expect(serverTransport.sessions.has('client')).toBe(false), + ); + await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0)); + await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0)); + + await advanceFakeTimersByConnectionBackoff(); + await waitFor(() => { + expect(numberOfConnections(clientTransport)).toBe(1); + expect(numberOfConnections(serverTransport)).toBe(1); + }); + + const sendFn = getClientSendFn(clientTransport, serverTransport); + const msg = createDummyTransportMessage(); + const msgId = sendFn(msg); + await expect( + waitForMessage(serverTransport, (recv) => recv.id === msgId), + ).resolves.toStrictEqual(msg.payload); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + + test('client hard reconnects after receiving out-of-order messages', async () => { + const clientTransport = getClientTransport('client'); + const serverTransport = getServerTransport(); + clientTransport.connect(serverTransport.clientId); + + addPostTestCleanup(async () => { + await cleanupTransports([clientTransport, serverTransport]); + }); + + await waitFor(() => { + expect(numberOfConnections(clientTransport)).toBe(1); + expect(numberOfConnections(serverTransport)).toBe(1); + }); + + const serverSession = serverTransport.sessions.get('client'); + assert(serverSession); + expect(serverSession.state).toBe(SessionState.Connected); + + if (serverSession.state !== SessionState.Connected) { + throw new Error('expected a connected server session'); + } + + const badMsgRes = serverSession.codec.toBuffer({ + id: 'bad-msg', + to: clientTransport.clientId, + from: serverTransport.clientId, + seq: serverSession.seq + 1, + ack: serverSession.ack, + streamId: 'stream', + controlFlags: 0, + payload: { msg: 'bad' }, + }); + expect(badMsgRes.ok).toBe(true); + + if (!badMsgRes.ok) { + throw new Error('expected out-of-order message to encode'); + } + + expect(serverSession.conn.send(badMsgRes.value)).toBe(true); + + await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0)); + await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0)); + + await advanceFakeTimersByConnectionBackoff(); + await waitFor(() => { + expect(numberOfConnections(clientTransport)).toBe(1); + expect(numberOfConnections(serverTransport)).toBe(1); + }); + + const sendFn = getServerSendFn(serverTransport, clientTransport); + const msg = createDummyTransportMessage(); + const msgId = sendFn(msg); + await expect( + waitForMessage(clientTransport, (recv) => recv.id === msgId), + ).resolves.toStrictEqual(msg.payload); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + test('idle transport cleans up nicely', async () => { const clientTransport = getClientTransport('client'); const serverTransport = getServerTransport();