Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 161 additions & 18 deletions lib/base/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ const Packets = require('../packets/index.js');
const Commands = require('../commands/index.js');
const ConnectionConfig = require('../connection_config.js');
const CharsetToEncoding = require('../constants/charset_encodings.js');
const {
traceQuery,
traceExecute,
traceConnect,
getServerContext,
queryChannel,
executeChannel,
connectChannel,
} = require('../tracing.js');

let _connectionId = 0;

Expand Down Expand Up @@ -141,6 +150,40 @@ class BaseConnection extends EventEmitter {
this._notifyError(err);
});
this.addCommand(handshakeCommand);

// Trace the connection handshake
if (connectChannel?.hasSubscribers) {
const config = this.config;
traceConnect(
() =>
new Promise((resolve, reject) => {
/* eslint-disable prefer-const */
let onConnect, onError;
onConnect = (param) => {
this.removeListener('error', onError);
resolve(param);
};
onError = (err) => {
this.removeListener('connect', onConnect);
reject(err);
};
/* eslint-enable prefer-const */
this.once('connect', onConnect);
this.once('error', onError);
}),
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
user: config.user || '',
};
}
).catch(() => {
// errors are already handled by the handshake error listener
});
}
}
// in case there was no initial handshake but we need to read sting, assume it utf-8
// most common example: "Too many connections" error ( packet is sent immediately on connection attempt, we don't know server encoding yet)
Expand Down Expand Up @@ -601,7 +644,48 @@ class BaseConnection extends EventEmitter {
cmdQuery.values !== undefined ? cmdQuery.values : []
);
cmdQuery.sql = rawSql;
return this.addCommand(cmdQuery);

if (!queryChannel?.hasSubscribers) {
return this.addCommand(cmdQuery);
}

const config = this.config;
const origCb = cmdQuery.onResult;

traceQuery(
() =>
new Promise((resolve, reject) => {
if (origCb) {
cmdQuery.onResult = function (err, results, fields) {
if (err) reject(err);
else resolve({ results, fields });
};
} else {
cmdQuery.once('error', reject);
cmdQuery.once('end', () => resolve());
}
this.addCommand(cmdQuery);
}),
() => {
const server = getServerContext(config);
return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).then(
(result) => {
if (origCb) origCb.call(cmdQuery, null, result.results, result.fields);
},
(err) => {
if (origCb) origCb.call(cmdQuery, err);
}
);

return cmdQuery;
}

pause() {
Expand Down Expand Up @@ -702,25 +786,84 @@ class BaseConnection extends EventEmitter {
});
}
const executeCommand = new Commands.Execute(options, cb);
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function () {
return null;
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);

if (!executeChannel?.hasSubscribers) {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
// skip execute command if prepare failed, we have main
// combined callback here
executeCommand.start = function () {
return null;
};
if (cb) {
cb(err);
} else {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
return;
}
executeCommand.emit('end');
return;
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
return executeCommand;
}

const config = this.config;
const origExecCb = executeCommand.onResult;

traceExecute(
() =>
new Promise((resolve, reject) => {
const prepareCommand = new Commands.Prepare(options, (err, stmt) => {
if (err) {
executeCommand.start = function () {
return null;
};
if (!origExecCb) {
executeCommand.emit('error', err);
}
executeCommand.emit('end');
reject(err);
return;
}
executeCommand.statement = stmt;
});

if (origExecCb) {
executeCommand.onResult = function (err, results, fields) {
if (err) reject(err);
else resolve({ results, fields });
};
} else {
executeCommand.once('error', reject);
executeCommand.once('end', () => resolve());
}

this.addCommand(prepareCommand);
this.addCommand(executeCommand);
}),
() => {
const server = getServerContext(config);
return {
query: options.sql,
values: options.values,
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
executeCommand.statement = stmt;
});
this.addCommand(prepareCommand);
this.addCommand(executeCommand);
).then(
(result) => {
if (origExecCb)
origExecCb.call(executeCommand, null, result.results, result.fields);
},
(err) => {
if (origExecCb) origExecCb.call(executeCommand, err);
}
);

return executeCommand;
}

Expand Down
32 changes: 32 additions & 0 deletions lib/base/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ const PoolConnection = require('../pool_connection.js');
const Queue = require('denque');
const BaseConnection = require('./connection.js');
const Errors = require('../constants/errors.js');
const {
tracePoolConnect,
getServerContext,
poolConnectChannel,
} = require('../tracing.js');

// Source: https://github.com/go-sql-driver/mysql/blob/76c00e35a8d48f8f70f0e7dffe584692bd3fa612/packets.go#L598-L613
function isReadOnlyError(err) {
Expand Down Expand Up @@ -49,6 +54,33 @@ class BasePool extends EventEmitter {
}

getConnection(cb) {
if (!poolConnectChannel?.hasSubscribers) {
return this._getConnection(cb);
}
const config = this.config.connectionConfig;
tracePoolConnect(
() =>
new Promise((resolve, reject) => {
this._getConnection((err, connection) => {
if (err) reject(err);
else resolve(connection);
});
}),
() => {
const server = getServerContext(config);
return {
database: config.database || '',
serverAddress: server.serverAddress,
serverPort: server.serverPort,
};
}
).then(
(connection) => cb(null, connection),
(err) => cb(err)
);
}

_getConnection(cb) {
if (this._closed) {
return process.nextTick(() => cb(new Error('Pool is closed.')));
}
Expand Down
69 changes: 69 additions & 0 deletions lib/tracing.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { TracingChannel } from 'node:diagnostics_channel';

export interface QueryTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ExecuteTraceContext {
query: string;
values: any;
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export interface ConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
user: string;
}

export interface PoolConnectTraceContext {
database: string;
serverAddress: string;
serverPort: number | undefined;
}

export declare const queryChannel:
| TracingChannel<QueryTraceContext>
| undefined;
export declare const executeChannel:
| TracingChannel<ExecuteTraceContext>
| undefined;
export declare const connectChannel:
| TracingChannel<ConnectTraceContext>
| undefined;
export declare const poolConnectChannel:
| TracingChannel<PoolConnectTraceContext>
| undefined;

export declare function getServerContext(config: {
socketPath?: string;
host?: string;
port?: number;
}): { serverAddress: string; serverPort: number | undefined };

export declare function traceQuery<T>(
fn: () => Promise<T>,
contextFactory: () => QueryTraceContext
): Promise<T>;

export declare function traceExecute<T>(
fn: () => Promise<T>,
contextFactory: () => ExecuteTraceContext
): Promise<T>;

export declare function traceConnect<T>(
fn: () => Promise<T>,
contextFactory: () => ConnectTraceContext
): Promise<T>;

export declare function tracePoolConnect<T>(
fn: () => Promise<T>,
contextFactory: () => PoolConnectTraceContext
): Promise<T>;
Loading
Loading