diff --git a/packages/appkit-ui/src/react/charts/types.ts b/packages/appkit-ui/src/react/charts/types.ts index 65804a74..fdcc55f1 100644 --- a/packages/appkit-ui/src/react/charts/types.ts +++ b/packages/appkit-ui/src/react/charts/types.ts @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow"; // ============================================================================ /** Supported data formats for analytics queries */ -export type DataFormat = "json" | "arrow" | "auto"; +export type DataFormat = "json" | "arrow" | "arrow_stream" | "auto"; /** Chart orientation */ export type Orientation = "vertical" | "horizontal"; diff --git a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts index 3d5e96f1..32ce52cb 100644 --- a/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts +++ b/packages/appkit-ui/src/react/hooks/__tests__/use-chart-data.test.ts @@ -205,7 +205,7 @@ describe("useChartData", () => { ); }); - test("auto-selects JSON by default when no heuristics match", () => { + test("auto-selects ARROW_STREAM by default when no heuristics match", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -223,11 +223,11 @@ describe("useChartData", () => { expect(mockUseAnalyticsQuery).toHaveBeenCalledWith( "test", { limit: 100 }, - expect.objectContaining({ format: "JSON" }), + expect.objectContaining({ format: "ARROW_STREAM" }), ); }); - test("defaults to auto format (JSON) when format is not specified", () => { + test("defaults to auto format (ARROW_STREAM) when format is not specified", () => { mockUseAnalyticsQuery.mockReturnValue({ data: [], loading: false, @@ -243,7 +243,7 @@ describe("useChartData", () => { expect(mockUseAnalyticsQuery).toHaveBeenCalledWith( "test", undefined, - expect.objectContaining({ format: "JSON" }), + expect.objectContaining({ format: "ARROW_STREAM" }), ); }); }); diff --git a/packages/appkit-ui/src/react/hooks/types.ts b/packages/appkit-ui/src/react/hooks/types.ts index 5db725fc..f25f17dd 100644 --- a/packages/appkit-ui/src/react/hooks/types.ts +++ b/packages/appkit-ui/src/react/hooks/types.ts @@ -5,7 +5,7 @@ import type { Table } from "apache-arrow"; // ============================================================================ /** Supported response formats for analytics queries */ -export type AnalyticsFormat = "JSON" | "ARROW"; +export type AnalyticsFormat = "JSON" | "ARROW" | "ARROW_STREAM"; /** * Typed Arrow Table - preserves row type information for type inference. @@ -32,8 +32,10 @@ export interface TypedArrowTable< // ============================================================================ /** Options for configuring an analytics SSE query */ -export interface UseAnalyticsQueryOptions { - /** Response format - "JSON" returns typed arrays, "ARROW" returns TypedArrowTable */ +export interface UseAnalyticsQueryOptions< + F extends AnalyticsFormat = "ARROW_STREAM", +> { + /** Response format - "ARROW_STREAM" (default) uses inline Arrow, "JSON" returns typed arrays, "ARROW" uses external links */ format?: F; /** Maximum size of serialized parameters in bytes */ diff --git a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts index 24e03ea3..7d13648f 100644 --- a/packages/appkit-ui/src/react/hooks/use-analytics-query.ts +++ b/packages/appkit-ui/src/react/hooks/use-analytics-query.ts @@ -54,13 +54,13 @@ function getArrowStreamUrl(id: string) { export function useAnalyticsQuery< T = unknown, K extends QueryKey = QueryKey, - F extends AnalyticsFormat = "JSON", + F extends AnalyticsFormat = "ARROW_STREAM", >( queryKey: K, parameters?: InferParams | null, options: UseAnalyticsQueryOptions = {} as UseAnalyticsQueryOptions, ): UseAnalyticsQueryResult> { - const format = options?.format ?? "JSON"; + const format = options?.format ?? "ARROW_STREAM"; const maxParametersSize = options?.maxParametersSize ?? 100 * 1024; const autoStart = options?.autoStart ?? true; diff --git a/packages/appkit-ui/src/react/hooks/use-chart-data.ts b/packages/appkit-ui/src/react/hooks/use-chart-data.ts index d8d0bd38..1d1da2dd 100644 --- a/packages/appkit-ui/src/react/hooks/use-chart-data.ts +++ b/packages/appkit-ui/src/react/hooks/use-chart-data.ts @@ -50,10 +50,11 @@ export interface UseChartDataResult { function resolveFormat( format: DataFormat, parameters?: Record, -): "JSON" | "ARROW" { +): "JSON" | "ARROW" | "ARROW_STREAM" { // Explicit format selection if (format === "json") return "JSON"; if (format === "arrow") return "ARROW"; + if (format === "arrow_stream") return "ARROW_STREAM"; // Auto-selection heuristics if (format === "auto") { @@ -72,10 +73,10 @@ function resolveFormat( return "ARROW"; } - return "JSON"; + return "ARROW_STREAM"; } - return "JSON"; + return "ARROW_STREAM"; } // ============================================================================ diff --git a/packages/appkit/src/connectors/sql-warehouse/defaults.ts b/packages/appkit/src/connectors/sql-warehouse/defaults.ts index 994f11da..506fa52d 100644 --- a/packages/appkit/src/connectors/sql-warehouse/defaults.ts +++ b/packages/appkit/src/connectors/sql-warehouse/defaults.ts @@ -12,7 +12,7 @@ interface ExecuteStatementDefaults { export const executeStatementDefaults: ExecuteStatementDefaults = { wait_timeout: "30s", disposition: "INLINE", - format: "JSON_ARRAY", + format: "ARROW_STREAM", on_wait_timeout: "CONTINUE", timeout: 60000, }; diff --git a/packages/appkit/src/plugins/analytics/analytics.ts b/packages/appkit/src/plugins/analytics/analytics.ts index 86b60986..6fc3357e 100644 --- a/packages/appkit/src/plugins/analytics/analytics.ts +++ b/packages/appkit/src/plugins/analytics/analytics.ts @@ -19,6 +19,7 @@ import { queryDefaults } from "./defaults"; import manifest from "./manifest.json"; import { QueryProcessor } from "./query"; import type { + AnalyticsFormat, AnalyticsQueryResponse, IAnalyticsConfig, IAnalyticsQueryRequest, @@ -119,7 +120,8 @@ export class AnalyticsPlugin extends Plugin { res: express.Response, ): Promise { const { query_key } = req.params; - const { parameters, format = "JSON" } = req.body as IAnalyticsQueryRequest; + const { parameters, format = "ARROW_STREAM" } = + req.body as IAnalyticsQueryRequest; // Request-scoped logging with WideEvent tracking logger.debug(req, "Executing query: %s (format=%s)", query_key, format); @@ -155,19 +157,6 @@ export class AnalyticsPlugin extends Plugin { const userKey = getCurrentUserId(); const executorKey = isAsUser ? userKey : "global"; - const queryParameters = - format === "ARROW" - ? { - formatParameters: { - disposition: "EXTERNAL_LINKS", - format: "ARROW_STREAM", - }, - type: "arrow", - } - : { - type: "result", - }; - const hashedQuery = this.queryProcessor.hashQuery(query); const defaultConfig: PluginExecuteConfig = { @@ -197,20 +186,115 @@ export class AnalyticsPlugin extends Plugin { parameters, ); - const result = await executor.query( + return this._executeWithFormatFallback( + executor, query, processedParams, - queryParameters.formatParameters, + format, signal, ); - - return { type: queryParameters.type, ...result }; }, streamExecutionSettings, executorKey, ); } + /** Format configurations in fallback order. */ + private static readonly FORMAT_CONFIGS = { + ARROW_STREAM: { + formatParameters: { disposition: "INLINE", format: "ARROW_STREAM" }, + type: "result" as const, + }, + JSON: { + formatParameters: undefined, + type: "result" as const, + }, + ARROW: { + formatParameters: { + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }, + type: "arrow" as const, + }, + }; + + /** + * Execute a query with automatic format fallback. + * + * For the default ARROW_STREAM format, tries formats in order until one + * succeeds: ARROW_STREAM → JSON → ARROW. This handles warehouses that + * only support a subset of format/disposition combinations. + * + * Explicit format requests (JSON, ARROW) are not retried. + */ + private async _executeWithFormatFallback( + executor: AnalyticsPlugin, + query: string, + processedParams: + | Record + | undefined, + requestedFormat: AnalyticsFormat, + signal?: AbortSignal, + ): Promise<{ type: string; [key: string]: any }> { + // Explicit format — no fallback. + if (requestedFormat === "JSON" || requestedFormat === "ARROW") { + const config = AnalyticsPlugin.FORMAT_CONFIGS[requestedFormat]; + const result = await executor.query( + query, + processedParams, + config.formatParameters, + signal, + ); + return { type: config.type, ...result }; + } + + // Default (ARROW_STREAM) — try each format in order. + const fallbackOrder: AnalyticsFormat[] = ["ARROW_STREAM", "JSON", "ARROW"]; + + for (let i = 0; i < fallbackOrder.length; i++) { + const fmt = fallbackOrder[i]; + const config = AnalyticsPlugin.FORMAT_CONFIGS[fmt]; + try { + const result = await executor.query( + query, + processedParams, + config.formatParameters, + signal, + ); + if (i > 0) { + logger.info( + "Query succeeded with fallback format %s (preferred %s was rejected)", + fmt, + fallbackOrder[0], + ); + } + return { type: config.type, ...result }; + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + const isFormatError = + msg.includes("ARROW_STREAM") || + msg.includes("JSON_ARRAY") || + msg.includes("EXTERNAL_LINKS") || + msg.includes("INVALID_PARAMETER_VALUE") || + msg.includes("NOT_IMPLEMENTED"); + + if (!isFormatError || i === fallbackOrder.length - 1) { + throw err; + } + + logger.warn( + "Format %s rejected by warehouse, falling back to %s: %s", + fmt, + fallbackOrder[i + 1], + msg, + ); + } + } + + // Unreachable — last format in fallbackOrder throws on failure. + throw new Error("All format fallbacks exhausted"); + } + /** * Execute a SQL query using the current execution context. * diff --git a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts index 71a8fd98..27637540 100644 --- a/packages/appkit/src/plugins/analytics/tests/analytics.test.ts +++ b/packages/appkit/src/plugins/analytics/tests/analytics.test.ts @@ -525,6 +525,298 @@ describe("Analytics Plugin", () => { ); }); + test("/query/:query_key should pass INLINE + ARROW_STREAM format parameters when format is ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW_STREAM" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "INLINE", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should pass EXTERNAL_LINKS + ARROW_STREAM format parameters when format is ARROW", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "ARROW" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + statement: "SELECT * FROM test", + warehouse_id: "test-warehouse-id", + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should use INLINE + ARROW_STREAM by default when no format specified", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + disposition: "INLINE", + format: "ARROW_STREAM", + }), + expect.any(AbortSignal), + ); + }); + + test("/query/:query_key should not pass format parameters when format is explicitly JSON", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi.fn().mockResolvedValue({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + const callArgs = executeMock.mock.calls[0][1]; + expect(callArgs).not.toHaveProperty("disposition"); + expect(callArgs).not.toHaveProperty("format"); + }); + + test("/query/:query_key should fall back from ARROW_STREAM to JSON when warehouse rejects ARROW_STREAM", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error( + "INVALID_PARAMETER_VALUE: Inline disposition only supports JSON_ARRAY format", + ), + ) + .mockResolvedValueOnce({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // First call: ARROW_STREAM (rejected) + expect(executeMock.mock.calls[0][1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + // Second call: JSON (no format params, uses defaults) + const secondCallArgs = executeMock.mock.calls[1][1]; + expect(secondCallArgs).not.toHaveProperty("disposition"); + expect(secondCallArgs).not.toHaveProperty("format"); + }); + + test("/query/:query_key should fall back through all formats when each is rejected", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValueOnce( + new Error("INVALID_PARAMETER_VALUE: only supports JSON_ARRAY"), + ) + .mockRejectedValueOnce( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ) + .mockResolvedValueOnce({ + result: { data: [{ id: 1 }] }, + }); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + expect(executeMock).toHaveBeenCalledTimes(3); + // Third call: ARROW (EXTERNAL_LINKS) + expect(executeMock.mock.calls[2][1]).toMatchObject({ + disposition: "EXTERNAL_LINKS", + format: "ARROW_STREAM", + }); + }); + + test("/query/:query_key should not fall back for non-format errors", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue(new Error("PERMISSION_DENIED: no access")); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {} }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls use same format (ARROW_STREAM) — no format fallback occurred. + // (executeStream's retry interceptor may retry, but always with the same format.) + for (const call of executeMock.mock.calls) { + expect(call[1]).toMatchObject({ + disposition: "INLINE", + format: "ARROW_STREAM", + }); + } + }); + + test("/query/:query_key should not fall back when format is explicitly JSON", async () => { + const plugin = new AnalyticsPlugin(config); + const { router, getHandler } = createMockRouter(); + + (plugin as any).app.getAppQuery = vi.fn().mockResolvedValue({ + query: "SELECT * FROM test", + isAsUser: false, + }); + + const executeMock = vi + .fn() + .mockRejectedValue( + new Error("INVALID_PARAMETER_VALUE: only supports ARROW_STREAM"), + ); + (plugin as any).SQLClient.executeStatement = executeMock; + + plugin.injectRoutes(router); + + const handler = getHandler("POST", "/query/:query_key"); + const mockReq = createMockRequest({ + params: { query_key: "test_query" }, + body: { parameters: {}, format: "JSON" }, + }); + const mockRes = createMockResponse(); + + await handler(mockReq, mockRes); + + // All calls have no disposition/format — explicit JSON uses defaults, no fallback. + for (const call of executeMock.mock.calls) { + expect(call[1]).not.toHaveProperty("disposition"); + expect(call[1]).not.toHaveProperty("format"); + } + }); + test("should return 404 when query file is not found", async () => { const plugin = new AnalyticsPlugin(config); const { router, getHandler } = createMockRouter(); diff --git a/packages/appkit/src/plugins/analytics/types.ts b/packages/appkit/src/plugins/analytics/types.ts index c58b6ecf..bc7568f9 100644 --- a/packages/appkit/src/plugins/analytics/types.ts +++ b/packages/appkit/src/plugins/analytics/types.ts @@ -4,7 +4,7 @@ export interface IAnalyticsConfig extends BasePluginConfig { timeout?: number; } -export type AnalyticsFormat = "JSON" | "ARROW"; +export type AnalyticsFormat = "JSON" | "ARROW" | "ARROW_STREAM"; export interface IAnalyticsQueryRequest { parameters?: Record; format?: AnalyticsFormat; diff --git a/packages/appkit/src/type-generator/query-registry.ts b/packages/appkit/src/type-generator/query-registry.ts index 0ed785cb..7d31ca3e 100644 --- a/packages/appkit/src/type-generator/query-registry.ts +++ b/packages/appkit/src/type-generator/query-registry.ts @@ -266,10 +266,32 @@ export async function generateQueriesFromDescribe( sqlHash, cleanedSql, }: (typeof uncachedQueries)[number]): Promise => { - const result = (await client.statementExecution.executeStatement({ - statement: `DESCRIBE QUERY ${cleanedSql}`, - warehouse_id: warehouseId, - })) as DatabricksStatementExecutionResponse; + let result: DatabricksStatementExecutionResponse; + try { + // Prefer JSON_ARRAY for predictable data_array parsing. + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + format: "JSON_ARRAY", + disposition: "INLINE", + })) as DatabricksStatementExecutionResponse; + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err); + if (msg.includes("ARROW_STREAM") || msg.includes("JSON_ARRAY")) { + // Warehouse doesn't support JSON_ARRAY inline — retry with no format + // to let it use its default (typically ARROW_STREAM inline). + logger.debug( + "Warehouse rejected JSON_ARRAY for %s, retrying with default format", + queryName, + ); + result = (await client.statementExecution.executeStatement({ + statement: `DESCRIBE QUERY ${cleanedSql}`, + warehouse_id: warehouseId, + })) as DatabricksStatementExecutionResponse; + } else { + throw err; + } + } completed++; spinner.update(