Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 201 additions & 19 deletions lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ const Packets = require('../packets/index.js');
const Commands = require('../commands/index.js');
const ConnectionConfig = require('../connection_config.js');
const CharsetToEncoding = require('../constants/charset_encodings.js');
const {
traceCallback,
tracePromise,
getServerContext,
shouldTrace,
queryChannel,
executeChannel,
connectChannel,
} = require('../tracing.js');

let _connectionId = 0;

Expand Down Expand Up @@ -141,6 +150,41 @@ class BaseConnection extends EventEmitter {
this._notifyError(err);
});
this.addCommand(handshakeCommand);

// Trace the connection handshake
if (shouldTrace(connectChannel)) {
const config = this.config;
tracePromise(
connectChannel,
() =>
new Promise((resolve, reject) => {
/* eslint-disable prefer-const */
let onConnect, onError;
onConnect = (param) => {
this.removeListener('error', onError);
resolve(param);
};
onError = (err) => {
this.removeListener('connect', onConnect);
reject(err);
};
/* eslint-enable prefer-const */
this.once('connect', onConnect);
this.once('error', onError);
}),
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
user: config.user || '',
};
}
).catch(() => {
// errors are already handled by the handshake error listener
});
}
}
// in case there was no initial handshake but we need to read sting, assume it utf-8
// most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
Expand Down Expand Up @@ -601,7 +645,62 @@ class BaseConnection extends EventEmitter {
cmdQuery.values !== undefined ? cmdQuery.values : []
);
cmdQuery.sql = rawSql;
return this.addCommand(cmdQuery);

if (!shouldTrace(queryChannel)) {
return this.addCommand(cmdQuery);
}

const config = this.config;
const origCb = cmdQuery.onResult;

if (origCb) {
// Callback mode: use traceCallback for synchronous wrapping
traceCallback(
queryChannel,
(wrappedCb) => {
cmdQuery.onResult = wrappedCb;
this.addCommand(cmdQuery);
},
0,
() => {
const server = getServerContext(config);
return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
},
null,
origCb
);
} else {
// Event-emitter mode: use tracePromise since there is no callback
tracePromise(
queryChannel,
() =>
new Promise((resolve, reject) => {
cmdQuery.once('error', reject);
cmdQuery.once('end', () => resolve());
this.addCommand(cmdQuery);
}),
() => {
const server = getServerContext(config);
return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).catch(() => {
// errors are already emitted on the command
});
}

return cmdQuery;
}

pause() {
Expand Down Expand Up @@ -702,25 +801,108 @@ class BaseConnection extends EventEmitter {
});
}
const executeCommand = new Commands.Execute(options, cb);
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function () {
return null;
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);

if (!shouldTrace(executeChannel)) {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function () {
return null;
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
return;
}
executeCommand.emit('end');
return;
}
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
return executeCommand;
}

const config = this.config;
const origExecCb = executeCommand.onResult;

if (origExecCb) {
// Callback mode: use traceCallback for synchronous wrapping
traceCallback(
executeChannel,
(wrappedCb) => {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
executeCommand.start = function () {
return null;
};
executeCommand.emit('end');
wrappedCb(err);
return;
}
executeCommand.statement = stmt;
});
executeCommand.onResult = wrappedCb;
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
},
0,
() => {
const server = getServerContext(config);
return {
query: options.sql,
values: options.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
},
null,
origExecCb
);
} else {
// Event-emitter mode: use tracePromise since there is no callback
tracePromise(
executeChannel,
() =>
new Promise((resolve, reject) => {
const prepareCommand = new Commands.Prepare(
options,
(err, stmt) => {
if (err) {
executeCommand.start = function () {
return null;
};
executeCommand.emit('error', err);
executeCommand.emit('end');
reject(err);
return;
}
executeCommand.statement = stmt;
}
);
executeCommand.once('error', reject);
executeCommand.once('end', () => resolve());
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
}),
() => {
const server = getServerContext(config);
return {
query: options.sql,
values: options.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).catch(() => {
// errors are already emitted on the command
});
}

return executeCommand;
}

Expand Down
28 changes: 28 additions & 0 deletions lib/base/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ const PoolConnection = require('../pool_connection.js');
const Queue = require('denque');
const BaseConnection = require('./connection.js');
const Errors = require('../constants/errors.js');
const {
traceCallback,
getServerContext,
shouldTrace,
poolConnectChannel,
} = require('../tracing.js');

// Source: https://github.com/go-sql-driver/mysql/blob/76c00e35a8d48f8f70f0e7dffe584692bd3fa612/packets.go#L598-L613
function isReadOnlyError(err) {
Expand Down Expand Up @@ -49,6 +55,28 @@ class BasePool extends EventEmitter {
}

getConnection(cb) {
if (!shouldTrace(poolConnectChannel)) {
return this._getConnection(cb);
}
const config = this.config.connectionConfig;
traceCallback(
poolConnectChannel,
this._getConnection.bind(this),
0,
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
},
null,
cb
);
}

_getConnection(cb) {
if (this._closed) {
return process.nextTick(() => cb(new Error('Pool is closed.')));
}
Expand Down
71 changes: 71 additions & 0 deletions lib/tracing.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import type { TracingChannel } from 'node:diagnostics_channel';

export interface QueryTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ExecuteTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
user: string;
}

export interface PoolConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export declare const dc: typeof import('node:diagnostics_channel') | undefined;
export declare const hasTracingChannel: boolean;

export declare function shouldTrace(
channel: TracingChannel<object> | undefined | null
): boolean;

export declare function traceCallback<T extends object>(
channel: TracingChannel<T> | undefined | null,
fn: (...args: any[]) => any,
position: number,
contextFactory: () => T,
thisArg: any,
...args: any[]
): any;

export declare function tracePromise<T extends object, R>(
channel: TracingChannel<T> | undefined | null,
fn: () => Promise<R>,
contextFactory: () => T
): Promise<R>;

export declare const queryChannel:
| TracingChannel<QueryTraceContext>
| undefined;
export declare const executeChannel:
| TracingChannel<ExecuteTraceContext>
| undefined;
export declare const connectChannel:
| TracingChannel<ConnectTraceContext>
| undefined;
export declare const poolConnectChannel:
| TracingChannel<PoolConnectTraceContext>
| undefined;

export declare function getServerContext(config: {
socketPath?: string;
host?: string;
port?: number;
}): { serverAddress: string; serverPort: number | undefined };
Loading
Loading