diff --git a/apps/worker/src/log-processing.spec.ts b/apps/worker/src/log-processing.spec.ts index c6c376955..eac58f9b4 100644 --- a/apps/worker/src/log-processing.spec.ts +++ b/apps/worker/src/log-processing.spec.ts @@ -1,6 +1,14 @@ import { randomUUID } from "node:crypto"; -import { afterAll, beforeEach, describe, expect, test } from "vitest"; +import { + afterAll, + afterEach, + beforeEach, + describe, + expect, + test, + vi, +} from "vitest"; import { eq, @@ -16,6 +24,9 @@ import { import { batchProcessLogs } from "./worker.js"; describe("Log Processing", () => { + const previousProviderErrorDiscordUrl = + process.env.PROVIDER_ERROR_DISCORD_URL; + interface TestIds { apiKeyId: string; email: string; @@ -112,6 +123,12 @@ describe("Log Processing", () => { await cleanupLogProcessingTestData(currentTestIds); }); + afterEach(() => { + process.env.PROVIDER_ERROR_DISCORD_URL = previousProviderErrorDiscordUrl; + vi.unstubAllGlobals(); + vi.restoreAllMocks(); + }); + describe("batchProcessLogs", () => { test("should process logs and set processedAt timestamp", async () => { // Insert unprocessed log directly @@ -571,5 +588,243 @@ describe("Log Processing", () => { expect(Number(updatedOrg!.credits)).toBe(initialCredits); }); + + test.each([ + { + statusCode: 429, + statusText: "Too Many Requests", + unifiedFinishReason: "upstream_error", + }, + { + statusCode: 401, + statusText: "Unauthorized", + unifiedFinishReason: "gateway_error", + }, + ])( + "should report $unifiedFinishReason logs to Discord even for non-5xx statuses", + async ({ unifiedFinishReason, statusCode, statusText }) => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 204 })); + vi.stubGlobal("fetch", fetchMock); + process.env.PROVIDER_ERROR_DISCORD_URL = + "https://discord.example/provider-errors"; + const requestId = `test-request-${unifiedFinishReason}`; + const traceId = `trace-${unifiedFinishReason}`; + + const insertedLogs = await db + .insert(log) + .values({ + 
requestId, + organizationId: testOrg.id, + projectId: testProject.id, + apiKeyId: testApiKey.id, + cost: 0, + cached: false, + usedMode: "credits", + duration: 2450, + requestedModel: "openai/gpt-4o-mini", + requestedProvider: "openai", + usedModel: "gpt-4o-mini", + usedModelMapping: "gpt-4o-mini", + usedProvider: "openai", + responseSize: 150, + mode: "credits", + hasError: true, + errorDetails: { + statusCode, + statusText, + responseText: + "provider timed out for support@example.com " + + `request ${requestId} in project ${testProject.id}`, + cause: `trace ${traceId}`, + }, + unifiedFinishReason, + traceId, + }) + .returning({ + id: log.id, + }); + const insertedLogId = insertedLogs[0]?.id ?? ""; + + await batchProcessLogs(); + + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock).toHaveBeenCalledWith( + "https://discord.example/provider-errors", + expect.objectContaining({ + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }), + ); + + const payloadText = String(fetchMock.mock.calls[0]?.[1]?.body ?? 
"{}"); + const payload = JSON.parse(payloadText) as { + embeds?: Array<{ + description?: string; + fields?: Array<{ name: string; value: string }>; + title?: string; + }>; + }; + + expect(payload.embeds?.[0]?.title).toContain( + unifiedFinishReason.replaceAll("_", " "), + ); + expect(payload.embeds?.[0]?.description).toContain( + "provider timed out", + ); + expect(payload.embeds?.[0]?.description).toContain(""); + expect(payloadText).not.toContain(requestId); + expect(payloadText).not.toContain(traceId); + expect(payloadText).not.toContain(testProject.id); + expect(payloadText).not.toContain(testOrg.id); + expect(insertedLogId).not.toBe(""); + expect(payloadText).not.toContain(insertedLogId); + expect(payload.embeds?.[0]?.fields).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + name: "Trace ID", + value: expect.stringMatching(/^redacted:/), + }), + expect.objectContaining({ + name: "Request ID", + value: expect.stringMatching(/^redacted:/), + }), + expect.objectContaining({ + name: "Project", + value: expect.stringMatching(/^redacted:/), + }), + expect.objectContaining({ + name: "Organization", + value: expect.stringMatching(/^redacted:/), + }), + expect.objectContaining({ + name: "Log ID", + value: expect.stringMatching(/^redacted:/), + }), + ]), + ); + }, + ); + + test("should not report to Discord when the webhook URL is blank", async () => { + const fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + process.env.PROVIDER_ERROR_DISCORD_URL = " "; + + await db.insert(log).values({ + requestId: "test-request-blank-webhook-url", + organizationId: testOrg.id, + projectId: testProject.id, + apiKeyId: testApiKey.id, + cost: 0, + cached: false, + usedMode: "credits", + duration: 1200, + requestedModel: "openai/gpt-4o-mini", + requestedProvider: "openai", + usedModel: "gpt-4o-mini", + usedProvider: "openai", + responseSize: 150, + mode: "credits", + hasError: true, + errorDetails: { + statusCode: 502, + statusText: "Bad Gateway", + 
responseText: "upstream unavailable", + }, + unifiedFinishReason: "upstream_error", + }); + + await batchProcessLogs(); + + expect(fetchMock).not.toHaveBeenCalled(); + }); + + test("should not report client errors to Discord", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 204 })); + vi.stubGlobal("fetch", fetchMock); + process.env.PROVIDER_ERROR_DISCORD_URL = + "https://discord.example/provider-errors"; + + await db.insert(log).values({ + requestId: "test-request-client-error", + organizationId: testOrg.id, + projectId: testProject.id, + apiKeyId: testApiKey.id, + cost: 0, + cached: false, + usedMode: "credits", + duration: 900, + requestedModel: "openai/gpt-4o-mini", + requestedProvider: "openai", + usedModel: "gpt-4o-mini", + usedProvider: "openai", + responseSize: 150, + mode: "credits", + hasError: true, + errorDetails: { + statusCode: 400, + statusText: "Bad Request", + responseText: "invalid input", + }, + unifiedFinishReason: "client_error", + }); + + await batchProcessLogs(); + + expect(fetchMock).not.toHaveBeenCalled(); + }); + + test("should report provider errors without a finish reason", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(new Response(null, { status: 204 })); + vi.stubGlobal("fetch", fetchMock); + process.env.PROVIDER_ERROR_DISCORD_URL = + "https://discord.example/provider-errors"; + + await db.insert(log).values({ + requestId: "test-request-missing-finish-reason", + organizationId: testOrg.id, + projectId: testProject.id, + apiKeyId: testApiKey.id, + cost: 0, + cached: false, + usedMode: "credits", + duration: 1200, + requestedModel: "openai/gpt-4o-mini", + requestedProvider: "openai", + usedModel: "gpt-4o-mini", + usedProvider: "openai", + responseSize: 150, + mode: "credits", + hasError: true, + errorDetails: { + statusCode: 502, + statusText: "Bad Gateway", + responseText: "upstream unavailable", + }, + unifiedFinishReason: null, + }); + + await batchProcessLogs(); + + 
expect(fetchMock).toHaveBeenCalledTimes(1); + + const payload = JSON.parse( + String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}"), + ) as { + embeds?: Array<{ + title?: string; + }>; + }; + + expect(payload.embeds?.[0]?.title).toBe("Provider error"); + }); }); }); diff --git a/apps/worker/src/services/provider-error-discord.ts b/apps/worker/src/services/provider-error-discord.ts new file mode 100644 index 000000000..753b35a83 --- /dev/null +++ b/apps/worker/src/services/provider-error-discord.ts @@ -0,0 +1,233 @@ +import { createHash } from "node:crypto"; + +import { logger } from "@llmgateway/logger"; + +interface ErrorDetails { + statusCode: number; + statusText: string; + responseText: string; + cause?: string; +} + +export interface ProviderErrorNotificationLog { + duration: number; + errorDetails: ErrorDetails | null; + logId: string; + organizationId: string; + projectId: string; + requestId: string; + requestedModel: string; + requestedProvider: string | null; + traceId: string | null; + unifiedFinishReason: string | null; + usedModel: string; + usedModelMapping: string | null; + usedProvider: string; +} + +interface DiscordEmbed { + title: string; + description?: string; + color?: number; + fields?: Array<{ + name: string; + value: string; + inline?: boolean; + }>; + timestamp?: string; +} + +const DISCORD_WEBHOOK_TIMEOUT_MS = 5000; +const REDACTED_EMAIL = ""; +const EMAIL_PATTERN = /\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b/gi; + +function getProviderErrorDiscordUrl(): string | null { + const url = process.env.PROVIDER_ERROR_DISCORD_URL?.trim(); + if (!url) { + return null; + } + + return url; +} + +function truncate(value: string, maxLength: number): string { + if (value.length <= maxLength) { + return value; + } + + return `${value.slice(0, Math.max(0, maxLength - 3))}...`; +} + +function hashValue(value: string): string { + return createHash("sha256").update(value).digest("hex").slice(0, 12); +} + +function formatIdentifier(value: string | null): string 
{ + if (!value) { + return "n/a"; + } + + return `redacted:${hashValue(value)}`; +} + +function sanitizeErrorText( + value: string, + log: ProviderErrorNotificationLog, +): string { + let sanitized = value; + + for (const [label, rawValue] of [ + ["request_id", log.requestId], + ["project_id", log.projectId], + ["organization_id", log.organizationId], + ["trace_id", log.traceId], + ["log_id", log.logId], + ] as const) { + if (!rawValue) { + continue; + } + + sanitized = sanitized + .split(rawValue) + .join(`<${label}:${formatIdentifier(rawValue)}>`); + } + + return sanitized.replace(EMAIL_PATTERN, REDACTED_EMAIL); +} + +function formatErrorDescription( + errorDetails: ErrorDetails | null, + log: ProviderErrorNotificationLog, +): string { + if (!errorDetails) { + return "No error details captured."; + } + + const lines = [ + `${errorDetails.statusCode} ${errorDetails.statusText}`.trim(), + errorDetails.cause + ? `Cause: ${sanitizeErrorText(errorDetails.cause, log)}` + : null, + errorDetails.responseText + ? sanitizeErrorText(errorDetails.responseText, log) + : null, + ].filter((line): line is string => Boolean(line)); + + return truncate(lines.join("\n"), 4000); +} + +function buildDiscordEmbed(log: ProviderErrorNotificationLog): { + embeds: DiscordEmbed[]; +} { + const errorType = log.unifiedFinishReason?.replaceAll("_", " ") ?? "error"; + + return { + embeds: [ + { + title: `Provider ${errorType}`, + description: formatErrorDescription(log.errorDetails, log), + color: 0xef4444, + fields: [ + { + name: "Provider", + value: truncate(log.usedProvider, 1024), + inline: true, + }, + { + name: "Model", + value: truncate(log.usedModelMapping ?? log.usedModel, 1024), + inline: true, + }, + { + name: "Requested", + value: truncate( + `${log.requestedProvider ?? 
"auto"} / ${log.requestedModel}`, + 1024, + ), + inline: true, + }, + { + name: "Request ID", + value: formatIdentifier(log.requestId), + }, + { + name: "Project", + value: formatIdentifier(log.projectId), + inline: true, + }, + { + name: "Organization", + value: formatIdentifier(log.organizationId), + inline: true, + }, + { + name: "Duration", + value: `${log.duration}ms`, + inline: true, + }, + { + name: "Trace ID", + value: formatIdentifier(log.traceId), + inline: true, + }, + { + name: "Log ID", + value: formatIdentifier(log.logId), + inline: true, + }, + ], + timestamp: new Date().toISOString(), + }, + ], + }; +} + +export async function notifyProviderError( + log: ProviderErrorNotificationLog, +): Promise { + const providerErrorDiscordUrl = getProviderErrorDiscordUrl(); + + if (!providerErrorDiscordUrl) { + return; + } + + try { + const controller = new AbortController(); + const timeout = setTimeout(() => { + controller.abort(); + }, DISCORD_WEBHOOK_TIMEOUT_MS); + try { + const response = await fetch(providerErrorDiscordUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(buildDiscordEmbed(log)), + signal: controller.signal, + }); + + if (!response.ok) { + const errorText = await response.text(); + throw new Error( + `Discord webhook error: ${response.status} - ${errorText}`, + ); + } + } finally { + clearTimeout(timeout); + } + } catch (error) { + const errorToLog = + error instanceof Error && error.name === "AbortError" + ? new Error( + `Discord webhook timed out after ${DISCORD_WEBHOOK_TIMEOUT_MS}ms`, + ) + : error instanceof Error + ? 
error + : new Error(String(error)); + + logger.error( + `Failed to send provider error Discord notification for log ${log.logId}`, + errorToLog, + ); + } +} diff --git a/apps/worker/src/worker.ts b/apps/worker/src/worker.ts index d11777b74..e09ad7304 100644 --- a/apps/worker/src/worker.ts +++ b/apps/worker/src/worker.ts @@ -39,6 +39,10 @@ import { PROJECT_STATS_REFRESH_INTERVAL_SECONDS, refreshProjectHourlyStats, } from "./services/project-stats-aggregator.js"; +import { + notifyProviderError, + type ProviderErrorNotificationLog, +} from "./services/provider-error-discord.js"; import { backfillHistoryIfNeeded, calculateAggregatedStatistics, @@ -205,6 +209,30 @@ const schema = z.object({ unified_finish_reason: z.string().nullable(), }); +type ProcessedLogRow = z.infer<typeof schema>; + +const NOTIFIABLE_PROVIDER_ERROR_REASONS = new Set([ + "gateway_error", + "upstream_error", +]); + +function shouldNotifyProviderError(logRow: ProcessedLogRow): boolean { + if (logRow.hasError !== true) { + return false; + } + + if (logRow.unified_finish_reason === "client_error") { + return false; + } + + if (logRow.unified_finish_reason !== null) { + return NOTIFIABLE_PROVIDER_ERROR_REASONS.has(logRow.unified_finish_reason); + } + + const statusCode = logRow.error_details?.statusCode; + return statusCode !== undefined && statusCode >= 500; +} + export async function acquireLock(key: string): Promise<boolean> { // eslint-disable-next-line no-mixed-operators const lockExpiry = new Date(Date.now() - LOCK_DURATION_MINUTES * 60 * 1000); @@ -678,6 +706,8 @@ export async function batchProcessLogs(): Promise<void> { } const deductedOrgIds: string[] = []; + const providerErrorLogs: ProviderErrorNotificationLog[] = []; + let shouldSendProviderErrorNotifications = false; try { await db.transaction(async (tx) => { @@ -781,6 +811,24 @@ export async function batchProcessLogs(): Promise<void> { unifiedFinishReason: row.unified_finish_reason, }); + if (shouldNotifyProviderError(row)) { + providerErrorLogs.push({ + duration: 
row.duration, + errorDetails: row.error_details, + logId: row.id, + organizationId: row.organization_id, + projectId: row.project_id, + requestId: row.request_id, + requestedModel: row.requested_model, + requestedProvider: row.requested_provider, + traceId: row.trace_id, + unifiedFinishReason: row.unified_finish_reason, + usedModel: row.used_model, + usedModelMapping: row.used_model_mapping, + usedProvider: row.used_provider, + }); + } + if (row.cost && row.cost > 0 && !row.cached) { const apiKeyCost = new Decimal(row.cost); const existingEvents = apiKeyEvents.get(row.api_key_id) ?? []; @@ -984,11 +1032,7 @@ export async function batchProcessLogs(): Promise<void> { logger.debug(`Marked ${logIds.length} logs as processed`); }); - - // Async low-balance alert check (outside transaction, non-blocking) - if (deductedOrgIds.length > 0) { - void checkLowBalanceAlerts(deductedOrgIds); - } + shouldSendProviderErrorNotifications = true; } catch (error) { logger.error( "Error processing batch credit deductions", @@ -997,6 +1041,14 @@ export async function batchProcessLogs(): Promise<void> { } finally { await releaseLock(CREDIT_PROCESSING_LOCK_KEY); } + + if (deductedOrgIds.length > 0) { + void checkLowBalanceAlerts(deductedOrgIds); + } + + if (shouldSendProviderErrorNotifications && providerErrorLogs.length > 0) { + await Promise.all(providerErrorLogs.map(notifyProviderError)); + } } async function checkLowBalanceAlerts(orgIds: string[]): Promise<void> {