Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,22 @@ export abstract class ClientTransport<
onMessage: (msg) => {
this.handleMsg(msg);
},
onInvalidMessage: (reason) => {
onInvalidMessage: (reason, transportMessage, options) => {
const sessionTo = connectedSession.to;

this.log?.error(`invalid message: ${reason}`, {
...connectedSession.loggingMetadata,
transportMessage: msg,
transportMessage: transportMessage ?? msg,
});

this.protocolError({
type: ProtocolError.InvalidMessage,
message: reason,
});
this.deleteSession(connectedSession, { unhealthy: true });
if (options?.reconnect) {
this.tryReconnecting(sessionTo);
}
},
onMessageSendFailure: (msg, reason) => {
this.log?.error(`failed to send message: ${reason}`, {
Expand Down
4 changes: 2 additions & 2 deletions transport/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -530,10 +530,10 @@ export abstract class ServerTransport<
onMessage: (msg) => {
this.handleMsg(msg);
},
onInvalidMessage: (reason) => {
onInvalidMessage: (reason, transportMessage) => {
this.log?.error(`invalid message: ${reason}`, {
...connectedSession.loggingMetadata,
transportMessage: msg,
transportMessage: transportMessage ?? msg,
});

this.protocolError({
Expand Down
13 changes: 7 additions & 6 deletions transport/sessionStateMachine/SessionConnected.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ export interface SessionConnectedListeners extends IdentifiedSessionListeners {
onConnectionErrored: (err: unknown) => void;
onConnectionClosed: () => void;
onMessage: (msg: OpaqueTransportMessage) => void;
onInvalidMessage: (reason: string) => void;
onInvalidMessage: (
reason: string,
transportMessage?: OpaqueTransportMessage,
options?: { reconnect: boolean },
) => void;
}

export interface SessionConnectedProps<ConnType extends Connection>
Expand Down Expand Up @@ -203,21 +207,18 @@ export class SessionConnected<
},
);
} else {
const reason = `received out-of-order msg, closing connection (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`;
const reason = `received out-of-order msg, closing session (got seq: ${parsedMsg.seq}, wanted seq: ${this.ack})`;
this.log?.error(reason, {
...this.loggingMetadata,
transportMessage: parsedMsg,
tags: ['invariant-violation'],
});

this.telemetry.span.setStatus({
code: SpanStatusCode.ERROR,
message: reason,
});

// try to recover by closing the connection and re-handshaking
// with the session intact
this.conn.close();
this.listeners.onInvalidMessage(reason, parsedMsg, { reconnect: true });
}

return;
Expand Down
58 changes: 58 additions & 0 deletions transport/sessionStateMachine/stateMachine.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,64 @@ describe('session state machine', () => {
// should not have transitioned to the next state
expect(session.state).toBe(SessionState.Connected);
});

test('connected event listeners: parse failures are invalid without reconnect hints', async () => {
const sessionHandle = await createSessionConnected();
const session = sessionHandle.session;
const conn = session.conn;

conn.emitData(new Uint8Array([255]));

await waitFor(async () => {
expect(sessionHandle.onInvalidMessage).toHaveBeenCalledTimes(1);
expect(sessionHandle.onInvalidMessage).toHaveBeenCalledWith(
expect.stringContaining('could not parse message:'),
);
expect(sessionHandle.onConnectionClosed).not.toHaveBeenCalled();
});

expect(conn.status).toBe('open');
expect(session.state).toBe(SessionState.Connected);
});

test('connected event listeners: out-of-order messages are invalid', async () => {
const sessionHandle = await createSessionConnected();
const session = sessionHandle.session;
const conn = session.conn;
const reason =
'received out-of-order msg, closing session (got seq: 1, wanted seq: 0)';

conn.emitData(
session.options.codec.toBuffer({
id: 'msgid',
to: session.from,
from: session.to,
seq: 1,
ack: 0,
streamId: 'stream',
controlFlags: 0,
payload: 'hello',
}),
);

await waitFor(async () => {
expect(sessionHandle.onInvalidMessage).toHaveBeenCalledTimes(1);
expect(sessionHandle.onInvalidMessage).toHaveBeenCalledWith(
reason,
expect.objectContaining({
seq: 1,
ack: 0,
streamId: 'stream',
}),
{ reconnect: true },
);
expect(sessionHandle.onConnectionClosed).not.toHaveBeenCalled();
expect(sessionHandle.onMessage).not.toHaveBeenCalled();
});

expect(conn.status).toBe('open');
expect(session.state).toBe(SessionState.Connected);
});
});

describe('heartbeats', () => {
Expand Down
127 changes: 127 additions & 0 deletions transport/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,133 @@ describe.each(testMatrix())(
});
});

test('out-of-order messages delete poisoned sessions before reconnect', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
clientTransport.connect(serverTransport.clientId);

addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => {
expect(numberOfConnections(clientTransport)).toBe(1);
expect(numberOfConnections(serverTransport)).toBe(1);
});

const clientSession = clientTransport.sessions.get('SERVER');
assert(clientSession);
expect(clientSession.state).toBe(SessionState.Connected);

if (clientSession.state !== SessionState.Connected) {
throw new Error('expected a connected client session');
}

const badMsgRes = clientSession.codec.toBuffer({
id: 'bad-msg',
to: serverTransport.clientId,
from: clientTransport.clientId,
seq: clientSession.seq + 1,
ack: clientSession.ack,
streamId: 'stream',
controlFlags: 0,
payload: { msg: 'bad' },
});
expect(badMsgRes.ok).toBe(true);

if (!badMsgRes.ok) {
throw new Error('expected out-of-order message to encode');
}

expect(clientSession.conn.send(badMsgRes.value)).toBe(true);

await waitFor(() =>
expect(serverTransport.sessions.has('client')).toBe(false),
);
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));

await advanceFakeTimersByConnectionBackoff();
await waitFor(() => {
expect(numberOfConnections(clientTransport)).toBe(1);
expect(numberOfConnections(serverTransport)).toBe(1);
});

const sendFn = getClientSendFn(clientTransport, serverTransport);
const msg = createDummyTransportMessage();
const msgId = sendFn(msg);
await expect(
waitForMessage(serverTransport, (recv) => recv.id === msgId),
).resolves.toStrictEqual(msg.payload);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});

test('client hard reconnects after receiving out-of-order messages', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
clientTransport.connect(serverTransport.clientId);

addPostTestCleanup(async () => {
await cleanupTransports([clientTransport, serverTransport]);
});

await waitFor(() => {
expect(numberOfConnections(clientTransport)).toBe(1);
expect(numberOfConnections(serverTransport)).toBe(1);
});

const serverSession = serverTransport.sessions.get('client');
assert(serverSession);
expect(serverSession.state).toBe(SessionState.Connected);

if (serverSession.state !== SessionState.Connected) {
throw new Error('expected a connected server session');
}

const badMsgRes = serverSession.codec.toBuffer({
id: 'bad-msg',
to: clientTransport.clientId,
from: serverTransport.clientId,
seq: serverSession.seq + 1,
ack: serverSession.ack,
streamId: 'stream',
controlFlags: 0,
payload: { msg: 'bad' },
});
expect(badMsgRes.ok).toBe(true);

if (!badMsgRes.ok) {
throw new Error('expected out-of-order message to encode');
}

expect(serverSession.conn.send(badMsgRes.value)).toBe(true);

await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));

await advanceFakeTimersByConnectionBackoff();
await waitFor(() => {
expect(numberOfConnections(clientTransport)).toBe(1);
expect(numberOfConnections(serverTransport)).toBe(1);
});

const sendFn = getServerSendFn(serverTransport, clientTransport);
const msg = createDummyTransportMessage();
const msgId = sendFn(msg);
await expect(
waitForMessage(clientTransport, (recv) => recv.id === msgId),
).resolves.toStrictEqual(msg.payload);

await testFinishesCleanly({
clientTransports: [clientTransport],
serverTransport,
});
});

test('idle transport cleans up nicely', async () => {
const clientTransport = getClientTransport('client');
const serverTransport = getServerTransport();
Expand Down
Loading