Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
fail-fast: false
max-parallel: 6
matrix:
library:
Expand Down
5 changes: 5 additions & 0 deletions src/core/tracing/SpanUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ export class SpanUtils {
logger.debug(
`[SpanUtils] Stopping recording of child spans for span ${spanContext.spanId}, packageName: ${options.packageName}, instrumentationName: ${options.instrumentationName}`,
);
if (mode === TuskDriftMode.REPLAY) {
throw new Error(
`Unexpected child span in replay mode for span ${spanContext.spanId}, packageName: ${options.packageName}, instrumentationName: ${options.instrumentationName}`,
);
}
return originalFunctionCall();
}
}
Expand Down
242 changes: 207 additions & 35 deletions src/instrumentation/libraries/mysql2/Instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { TdMysql2ConnectionMock } from "./mocks/TdMysql2ConnectionMock";
import { TdMysql2QueryMock } from "./mocks/TdMysql2QueryMock";
import { captureStackTrace } from "src/instrumentation/core/utils";
import { TdMysql2ConnectionEventMock } from "./mocks/TdMysql2ConnectionEventMock";
import * as diagnosticsChannel from "node:diagnostics_channel";
import { EventEmitter } from "events";

// Version ranges for mysql2
Expand All @@ -34,8 +35,13 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
private readonly INSTRUMENTATION_NAME = "Mysql2Instrumentation";
private readonly CONTEXT_BOUND_CONNECTION = Symbol("mysql2-context-bound-connection");
private readonly CONTEXT_BOUND_PARENT_CONTEXT = Symbol("mysql2-bound-parent-context");
private readonly MYSQL2_NATIVE_QUERY_CHANNEL =
typeof diagnosticsChannel.tracingChannel === "function"
? diagnosticsChannel.tracingChannel("mysql2:query")
: undefined;
private mode: TuskDriftMode;
private queryMock: TdMysql2QueryMock;
private baseConnectionCreateQuery?: Function;

constructor(config: Mysql2InstrumentationConfig = {}) {
super("mysql2", config);
Expand Down Expand Up @@ -104,6 +110,10 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
return BaseConnectionClass;
}

if (typeof BaseConnectionClass.createQuery === "function") {
this.baseConnectionCreateQuery = BaseConnectionClass.createQuery.bind(BaseConnectionClass);
}

// Wrap BaseConnection.prototype.query
if (BaseConnectionClass.prototype && BaseConnectionClass.prototype.query) {
if (!isWrapped(BaseConnectionClass.prototype.query)) {
Expand Down Expand Up @@ -1427,6 +1437,65 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
return sql;
}

private _shouldEmitMysql2NativeQueryEvents(): boolean {
const channel: any = this.MYSQL2_NATIVE_QUERY_CHANNEL;
if (!channel) {
return false;
}

return channel.hasSubscribers ?? channel.start?.hasSubscribers ?? false;
}

private _buildMysql2NativeQueryTraceContext(connectionContext: any, cmdQuery: any) {
const config = connectionContext?.config || {};

if (config.socketPath) {
return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || "",
serverAddress: config.socketPath,
serverPort: undefined,
};
}

return {
query: cmdQuery.sql,
values: cmdQuery.values,
database: config.database || "",
serverAddress: config.host || "localhost",
serverPort: config.port || 3306,
};
}

private _addMysql2CommandWithNativeTracing(connectionContext: any, cmdQuery: any): any {
const channel: any = this.MYSQL2_NATIVE_QUERY_CHANNEL;

if (!this._shouldEmitMysql2NativeQueryEvents() || typeof channel?.tracePromise !== "function") {
return connectionContext.addCommand(cmdQuery);
}

const traceContext = this._buildMysql2NativeQueryTraceContext(connectionContext, cmdQuery);
const result = connectionContext.addCommand(cmdQuery);

if (result && typeof result.once === "function") {
void channel
.tracePromise(
() =>
new Promise<void>((resolve, reject) => {
result.once("error", reject);
result.once("end", () => resolve());
}),
traceContext,
)
.catch(() => {
// Query errors are already surfaced via mysql2's emitter/callback contract.
});
}

return result;
}

private _handleRecordQueryInSpan(
spanInfo: SpanInfo,
originalQuery: Function,
Expand Down Expand Up @@ -1500,47 +1569,139 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {

return invokeOriginal(args);
} else {
// Promise-based query or streaming query (no callback)
const result = invokeOriginal(args);

// For streaming queries (event emitters), attach event listeners
// In mysql2, streaming queries are identified by checking if result has 'on' method
// and we're NOT using it as a promise (not calling .then() explicitly)
if (result && typeof result.on === "function") {
// Collect data for streaming queries
const streamResults: any[] = [];
const connectionContext = context as any;
const canConstructQueryDirectly =
!!connectionContext &&
typeof connectionContext.addCommand === "function" &&
typeof connectionContext.format === "function" &&
typeof connectionContext._resolveNamedPlaceholders === "function" &&
typeof this.baseConnectionCreateQuery === "function";

let result: any;

if (canConstructQueryDirectly) {
try {
/**
* mysql2 3.20 introduced native TracingChannel support inside BaseConnection.query().
* That means callback-less queries can now route through additional internal tracing
* branches before the command is added to the connection. Our SDK also monkey-patches
* query() and expects the no-callback form to behave like the pre-3.20 path: build the
* Query command, enqueue it, then observe the returned EventEmitter.
*
* The `/test/stream-query` E2E endpoint depends on that exact EventEmitter lifecycle:
* it only sends the HTTP response when the returned query emits `error` or `end`.
* When we delegate to mysql2's newer callback-less query path, we become coupled to its
* internal tracing implementation and the endpoint can hang before `end` reaches the app.
*
* To keep our instrumentation stable across mysql2 versions, we intentionally recreate
* the old query() steps for connection-style, callback-less queries:
* 1. Reuse an existing Query command if one was provided.
* 2. Otherwise, construct the Query command with BaseConnection.createQuery captured when
* mysql2's base connection class was patched. We intentionally use the unwrapped base
* helper here because wrapped connection constructors may not preserve static helpers
* like createQuery on the instance constructor.
* 3. Resolve named placeholders and format SQL exactly like mysql2 query() does.
* 4. Add the command directly to the connection and observe the returned emitter.
* 5. If some other tool is subscribed to mysql2's native `mysql2:query`
* TracingChannel, emit equivalent lifecycle events around the direct command path
* so native subscribers (for example OTEL/APM integrations) still observe the query.
*
* This avoids mysql2's internal tracing branch for the emitter-style path while still
* recording the query correctly from the command lifecycle. Callback-based queries stay
* on the normal upstream path because they are not affected by this regression.
*/
const firstArg = args[0];
const isExistingQueryCommand =
!!firstArg &&
typeof firstArg === "object" &&
typeof firstArg.on === "function" &&
(typeof firstArg.onResult === "function" || firstArg.constructor?.name === "Query");

const cmdQuery = isExistingQueryCommand
? firstArg
: this.baseConnectionCreateQuery!(
args[0],
args[1],
undefined,
connectionContext.config,
);

connectionContext._resolveNamedPlaceholders(cmdQuery);
const rawSql = connectionContext.format(
cmdQuery.sql,
cmdQuery.values !== undefined ? cmdQuery.values : [],
);
cmdQuery.sql = rawSql;
result = this._addMysql2CommandWithNativeTracing(connectionContext, cmdQuery);
} catch (error) {
logger.debug(
`[Mysql2Instrumentation] direct query construction failed, falling back to original query path`,
error,
);
result = invokeOriginal(args);
}
} else {
result = invokeOriginal(args);
}

const isEmitterLike =
!!result &&
typeof result.on === "function" &&
typeof result.once === "function" &&
typeof result.emit === "function";

// mysql2 3.20 can return callback-less query objects that are both thenable
// and EventEmitter-like. For stream-style queries the application relies on
// the emitter contract, so prefer the emitter path whenever those methods exist.
if (isEmitterLike) {
let streamFields: any = null;
const streamResults: any[] = [];
let finished = false;

const finalize = (error?: Error) => {
if (finished) return;
finished = true;

setImmediate(() => {
if (error) {
logger.debug(
`[Mysql2Instrumentation] MySQL2 stream query error: ${error.message} (${SpanUtils.getTraceInfo()})`,
);
try {
SpanUtils.endSpan(spanInfo.span, {
code: SpanStatusCode.ERROR,
message: error.message,
});
} catch (endError) {
logger.error(`[Mysql2Instrumentation] error ending span:`, endError);
}
return;
}

result
.on("error", (error: Error) => {
logger.debug(
`[Mysql2Instrumentation] MySQL2 stream query error: ${error.message} (${SpanUtils.getTraceInfo()})`,
`[Mysql2Instrumentation] MySQL2 stream query completed (${SpanUtils.getTraceInfo()})`,
);
try {
SpanUtils.endSpan(spanInfo.span, {
code: SpanStatusCode.ERROR,
message: error.message,
});
} catch (error) {
logger.error(`[Mysql2Instrumentation] error ending span:`, error);
this._addOutputAttributesToSpan(spanInfo, streamResults, streamFields);
SpanUtils.endSpan(spanInfo.span, { code: SpanStatusCode.OK });
} catch (endError) {
logger.error(`[Mysql2Instrumentation] error ending span:`, endError);
}
});
};

result
.once("error", (error: Error) => {
finalize(error);
})
.on("fields", (fields: any) => {
streamFields = fields;
})
.on("result", (row: any) => {
streamResults.push(row);
})
.on("end", () => {
logger.debug(
`[Mysql2Instrumentation] MySQL2 stream query completed (${SpanUtils.getTraceInfo()})`,
);
try {
this._addOutputAttributesToSpan(spanInfo, streamResults, streamFields);
SpanUtils.endSpan(spanInfo.span, { code: SpanStatusCode.OK });
} catch (error) {
logger.error(`[Mysql2Instrumentation] error ending span:`, error);
}
.once("end", () => {
finalize();
});
}

Expand Down Expand Up @@ -1614,7 +1775,9 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
return otelContext.with(parentContext, () => callback(error, scopedConnection));
};

return otelContext.with(parentContext, () => originalGetConnection.call(context, wrappedCallback));
return otelContext.with(parentContext, () =>
originalGetConnection.call(context, wrappedCallback),
);
} else {
// Promise-based getConnection
const promise = otelContext.with(parentContext, () => originalGetConnection.call(context));
Expand All @@ -1635,7 +1798,10 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
});
SpanUtils.endSpan(spanInfo.span, { code: SpanStatusCode.OK });
} catch (error) {
logger.error(`[Mysql2Instrumentation] error processing getConnection response:`, error);
logger.error(
`[Mysql2Instrumentation] error processing getConnection response:`,
error,
);
}
return scopedConnection;
});
Expand Down Expand Up @@ -1733,15 +1899,21 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {

// Create the patched constructor function
function TdPatchedConnection(this: any, ...args: any[]) {
// Capture new.target so that subclasses (e.g. PoolConnection) get the
// correct prototype chain when we delegate to OriginalConnection.
// Without this, methods like _removeFromPool and _realEnd that live on
// PoolConnection.prototype would be missing from the returned instance.
const constructTarget = new.target || TdPatchedConnection;

const inputValue = { method: "createConnection" };
// RECORD mode: create real connection and record connect/error events
if (self.mode === TuskDriftMode.RECORD) {
return handleRecordMode({
originalFunctionCall: () => new OriginalConnection(...args),
originalFunctionCall: () => Reflect.construct(OriginalConnection, args, constructTarget),
recordModeHandler: ({ isPreAppStart }) => {
return SpanUtils.createAndExecuteSpan(
self.mode,
() => new OriginalConnection(...args),
() => Reflect.construct(OriginalConnection, args, constructTarget),
{
name: `mysql2.connection.create`,
kind: SpanKind.CLIENT,
Expand All @@ -1753,7 +1925,7 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
isPreAppStart,
},
(spanInfo) => {
const connection = new OriginalConnection(...args);
const connection = Reflect.construct(OriginalConnection, args, constructTarget);

// Listen for successful connection - record via span
connection.on("connect", (connectionObj: any) => {
Expand Down Expand Up @@ -1801,7 +1973,7 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
replayModeHandler: () => {
return SpanUtils.createAndExecuteSpan(
self.mode,
() => new OriginalConnection(...args),
() => Reflect.construct(OriginalConnection, args, constructTarget),
{
name: `mysql2.connection.create`,
kind: SpanKind.CLIENT,
Expand Down Expand Up @@ -1956,7 +2128,7 @@ export class Mysql2Instrumentation extends TdInstrumentationBase {
}

// Fallback for disabled mode
return new OriginalConnection(...args);
return Reflect.construct(OriginalConnection, args, constructTarget);
}

// Copy static properties from original class
Expand Down
Loading
Loading