Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 121 additions & 18 deletions lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ const Packets = require('../packets/index.js');
const Commands = require('../commands/index.js');
const ConnectionConfig = require('../connection_config.js');
const CharsetToEncoding = require('../constants/charset_encodings.js');
const {
traceQuery,
traceExecute,
traceConnect,
getServerContext,
connectChannel,
} = require('../tracing.js');

let _connectionId = 0;

Expand Down Expand Up @@ -141,6 +148,29 @@ class BaseConnection extends EventEmitter {
this._notifyError(err);
});
this.addCommand(handshakeCommand);

// Trace the connection handshake
if (connectChannel?.hasSubscribers) {
const config = this.config;
traceConnect(
() =>
new Promise((resolve, reject) => {
this.once('connect', resolve);
this.once('error', reject);
}),
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
user: config.user || '',
};
}
).catch(() => {
// errors are already handled by the handshake error listener
});
}
}
// in case there was no initial handshake but we need to read sting, assume it utf-8
// most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
Expand Down Expand Up @@ -601,7 +631,44 @@ class BaseConnection extends EventEmitter {
cmdQuery.values !== undefined ? cmdQuery.values : []
);
cmdQuery.sql = rawSql;
return this.addCommand(cmdQuery);

const config = this.config;
const origCb = cmdQuery.onResult;

traceQuery(
() =>
new Promise((resolve, reject) => {
if (origCb) {
cmdQuery.onResult = (err, results, fields) => {
if (err) reject(err);
else resolve({ results, fields });
};
} else {
cmdQuery.once('error', reject);
cmdQuery.once('end', () => resolve());
}
this.addCommand(cmdQuery);
}),
() => {
const server = getServerContext(config);
return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).then(
(result) => {
if (origCb) origCb(null, result.results, result.fields);
},
(err) => {
if (origCb) origCb(err);
}
);

return cmdQuery;
}

pause() {
Expand Down Expand Up @@ -701,26 +768,62 @@ class BaseConnection extends EventEmitter {
}
});
}
const config = this.config;
const executeCommand = new Commands.Execute(options, cb);
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function () {
return null;
const origExecCb = executeCommand.onResult;

traceExecute(
() =>
new Promise((resolve, reject) => {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
executeCommand.start = function () {
return null;
};
if (origExecCb) {
// Let the promise handle the error
} else {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
reject(err);
return;
}
executeCommand.statement = stmt;
});

if (origExecCb) {
executeCommand.onResult = (err, results, fields) => {
if (err) reject(err);
else resolve({ results, fields });
};
} else {
executeCommand.once('error', reject);
executeCommand.once('end', () => resolve());
}

this.addCommand(prepareCommand);
this.addCommand(executeCommand);
}),
() => {
const server = getServerContext(config);
return {
query: options.sql,
values: options.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
return;
}
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
).then(
(result) => {
if (origExecCb) origExecCb(null, result.results, result.fields);
},
(err) => {
if (origExecCb) origExecCb(err);
}
);

return executeCommand;
}

Expand Down
32 changes: 32 additions & 0 deletions lib/base/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ const PoolConnection = require('../pool_connection.js');
const Queue = require('denque');
const BaseConnection = require('./connection.js');
const Errors = require('../constants/errors.js');
const {
tracePoolConnect,
getServerContext,
poolConnectChannel,
} = require('../tracing.js');

// Source: https://github.com/go-sql-driver/mysql/blob/76c00e35a8d48f8f70f0e7dffe584692bd3fa612/packets.go#L598-L613
function isReadOnlyError(err) {
Expand Down Expand Up @@ -49,6 +54,33 @@ class BasePool extends EventEmitter {
}

getConnection(cb) {
if (!poolConnectChannel?.hasSubscribers) {
return this._getConnection(cb);
}
const config = this.config.connectionConfig;
tracePoolConnect(
() =>
new Promise((resolve, reject) => {
this._getConnection((err, connection) => {
if (err) reject(err);
else resolve(connection);
});
}),
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).then(
(connection) => cb(null, connection),
(err) => cb(err)
);
}

_getConnection(cb) {
if (this._closed) {
return process.nextTick(() => cb(new Error('Pool is closed.')));
}
Expand Down
69 changes: 69 additions & 0 deletions lib/tracing.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { TracingChannel } from 'node:diagnostics_channel';

export interface QueryTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ExecuteTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
user: string;
}

export interface PoolConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export declare const queryChannel:
| TracingChannel<QueryTraceContext>
| undefined;
export declare const executeChannel:
| TracingChannel<ExecuteTraceContext>
| undefined;
export declare const connectChannel:
| TracingChannel<ConnectTraceContext>
| undefined;
export declare const poolConnectChannel:
| TracingChannel<PoolConnectTraceContext>
| undefined;

export declare function getServerContext(config: {
socketPath?: string;
host?: string;
port?: number;
}): { serverAddress: string; serverPort: number | undefined };

export declare function traceQuery<T>(
fn: () => Promise<T>,
contextFactory: () => QueryTraceContext
): Promise<T>;

export declare function traceExecute<T>(
fn: () => Promise<T>,
contextFactory: () => ExecuteTraceContext
): Promise<T>;

export declare function traceConnect<T>(
fn: () => Promise<T>,
contextFactory: () => ConnectTraceContext
): Promise<T>;

export declare function tracePoolConnect<T>(
fn: () => Promise<T>,
contextFactory: () => PoolConnectTraceContext
): Promise<T>;
82 changes: 82 additions & 0 deletions lib/tracing.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
'use strict';

const process = require('process');

// Safe load: use getBuiltinModule if available, fallback to require, catch if unavailable
const dc = (() => {
try {
return 'getBuiltinModule' in process
? process.getBuiltinModule('node:diagnostics_channel')
: require('node:diagnostics_channel');
} catch {
return undefined;
}
})();

const hasTracingChannel = typeof dc?.tracingChannel === 'function';

const queryChannel = hasTracingChannel
? dc.tracingChannel('mysql2:query')
: undefined;

const executeChannel = hasTracingChannel
? dc.tracingChannel('mysql2:execute')
: undefined;

const connectChannel = hasTracingChannel
? dc.tracingChannel('mysql2:connect')
: undefined;

const poolConnectChannel = hasTracingChannel
? dc.tracingChannel('mysql2:pool:connect')
: undefined;

function getServerContext(config) {
if (config.socketPath) {
return { serverAddress: config.socketPath, serverPort: undefined };
}
return {
serverAddress: config.host || 'localhost',
serverPort: config.port || 3306,
};
}

function traceQuery(fn, contextFactory) {
if (queryChannel?.hasSubscribers) {
return queryChannel.tracePromise(fn, contextFactory());
}
return fn();
}

function traceExecute(fn, contextFactory) {
if (executeChannel?.hasSubscribers) {
return executeChannel.tracePromise(fn, contextFactory());
}
return fn();
}

function traceConnect(fn, contextFactory) {
if (connectChannel?.hasSubscribers) {
return connectChannel.tracePromise(fn, contextFactory());
}
return fn();
}

function tracePoolConnect(fn, contextFactory) {
if (poolConnectChannel?.hasSubscribers) {
return poolConnectChannel.tracePromise(fn, contextFactory());
}
return fn();
}

module.exports = {
queryChannel,
executeChannel,
connectChannel,
poolConnectChannel,
getServerContext,
traceQuery,
traceExecute,
traceConnect,
tracePoolConnect,
};
Loading
Loading