diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index f36c7f6d15f8e..352f7895d2e6b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -297,14 +297,57 @@ export class PreAggregationPartitionRangeLoader { .map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`) .join(' UNION ALL '); + const baseTargetTableName = allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`; + + // Build per-usage target table names if usageMapping is present + let usageTargetTableNames: Record | undefined; + if (this.preAggregation.usageMapping) { + usageTargetTableNames = {}; + for (const [suffix, usageInfo] of Object.entries(this.preAggregation.usageMapping)) { + if (usageInfo.dateRange && this.preAggregation.partitionGranularity) { + // Load partition ranges specific to this usage's dateRange. + // Use partitionRange (generated locally via timeSeries, always in DEFAULT_TS_FORMAT) + // instead of buildRangeEnd (from DB, may include Z suffix depending on driver timestampFormat). + const usageDateRange = PreAggregationPartitionRangeLoader.intersectDateRanges( + [loadResults[0]?.partitionRange?.[0] || null, loadResults[loadResults.length - 1]?.partitionRange?.[1] || null] as QueryDateRange, + usageInfo.dateRange as QueryDateRange, + ); + if (usageDateRange) { + const usagePartitions = loadResults.filter(r => { + if (!r.partitionRange) return true; + const [pStart, pEnd] = r.partitionRange; + const [uStart, uEnd] = usageDateRange; + return pEnd >= uStart && pStart <= uEnd; + }); + const usageTableNames = usagePartitions.map(r => r.targetTableName); + if (usageTableNames.length === 1) { + [usageTargetTableNames[suffix]] = usageTableNames; + } else if (usageTableNames.length > 0) { + const usageUnion = usageTableNames + .map(t => `SELECT * FROM ${t}`) + .join(' UNION ALL '); + usageTargetTableNames[suffix] = `(${usageUnion})`; + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } + } + return { - targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`, + targetTableName: baseTargetTableName, refreshKeyValues: loadResults.map(t => t.refreshKeyValues), lastUpdatedAt, buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd, lambdaTable, rollupLambdaId: this.preAggregation.rollupLambdaId, isMultiTableUnion: allTableTargetNames.length > 1, + usageTargetTableNames, }; } else { return new PreAggregationLoader( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 71eea71b64cf1..55ee2b9eb40e8 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -149,6 +149,7 @@ export type LoadPreAggregationResult = { rollupLambdaId?: string; partitionRange?: QueryDateRange; isMultiTableUnion?: boolean; + usageTargetTableNames?: Record; }; export type PreAggregationTableToTempTable = [string, LoadPreAggregationResult]; @@ -192,6 +193,7 @@ export type PreAggregationDescription = { sealAt?: string; rollupLambdaId?: string; lastRollupLambda?: boolean; + usageMapping?: Record; }; export const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index c4919daa1bb6e..6e9f744a7a0cf 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -115,7 +115,7 @@ export type PreAggTableToTempTable = [ TempTable, ]; -export type PreAggTableToTempTableNames = [string, { targetTableName: string; }]; +export type PreAggTableToTempTableNames = [string, { targetTableName: string; usageTargetTableNames?: Record; }]; export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined; @@ -429,9 +429,22 @@ export class QueryCache { ? queryAndParams : [queryAndParams, []]; const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( - (query, [tableName, { targetTableName }]) => ( - QueryCache.replaceAll(tableName, targetTableName, query) - ), + (query, [tableName, { targetTableName, usageTargetTableNames }]) => { + // First replace usage-specific placeholders (e.g. tableName__usage_0) + if (usageTargetTableNames) { + for (const [suffix, usageTargetName] of Object.entries(usageTargetTableNames)) { + query = QueryCache.replaceAll(`${tableName}${suffix}`, usageTargetName, query); + } + } + // Replace base table name only when there are no usage-specific replacements. + // When usageTargetTableNames is present, all SQL references already use __usage_N suffixes + // and the base replacement would incorrectly match inside already-replaced target names + // (e.g. turning "preagg20200101_hash" into "preagg20200101_hash20200101_hash"). + if (!usageTargetTableNames || Object.keys(usageTargetTableNames).length === 0) { + return QueryCache.replaceAll(tableName, targetTableName, query); + } + return query; + }, keyQuery ); return Array.isArray(queryAndParams) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 7adad3e85ec0d..e15ee7bc1a8bf 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -959,11 +959,9 @@ export class BaseQuery { try { const buildResult = nativeBuildSqlAndParams(queryParams); - const [query, params, preAggregation] = buildResult; + const [query, params, preAggResult] = buildResult; const paramsArray = [...params]; - if (preAggregation) { - this.preAggregations.preAggregationForQuery = preAggregation; - } + this.applyNativePreAggResult(preAggResult); return [query, paramsArray]; } catch (e) { if (e.name === 'TesseractUserError') { @@ -1009,8 +1007,21 @@ export class BaseQuery { const buildResult = nativeBuildSqlAndParams(queryParams); - const [, , preAggregation] = buildResult; - return preAggregation; + const [, , preAggResult] = buildResult; + this.applyNativePreAggResult(preAggResult); + return this.preAggregations.preAggregationForQuery; + } + + applyNativePreAggResult(preAggResult) { + if (!preAggResult) return; + if (Array.isArray(preAggResult)) { + this.preAggregations.preAggregationUsageInfos = preAggResult; + const first = preAggResult[0]; + this.preAggregations.preAggregationForQuery = + this.getPreAggregationByName(first.cubeName, first.preAggregationName); + } else { + this.preAggregations.preAggregationForQuery = preAggResult; + } } allCubeMembers(path) { @@ -3491,7 +3502,7 @@ export class BaseQuery { } escapeStringLiteral(str) { - return `'${str.replace(/'/g, "''")}'`; + return `'${str.replace(/'/g, '\'\'')}'`; } autoPrefixAndEvaluateSql(cubeName, sql, isMemberExpr = false) { diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index 16899de004b31..250e7661efea4 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -94,6 +94,17 @@ export type FullPreAggregationDescription = any; */ export type TransformedQuery = any; +export type UsageDateRangeInfo = { + dateRange?: [string, string]; +}; + +export type PreAggregationUsageInfo = { + cubeName: string; + preAggregationName: string; + external: boolean; + usages: Record; +}; + export class PreAggregations { private readonly query: BaseQuery; @@ -107,6 +118,8 @@ export class PreAggregations { public preAggregationForQuery: PreAggregationForQuery | undefined = undefined; + public preAggregationUsageInfos: PreAggregationUsageInfo[] | undefined = undefined; + public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) { this.query = query; this.historyQueries = historyQueries; @@ -137,6 +150,10 @@ export class PreAggregations { const isInPreAggregationQuery = this.query.options.preAggregationQuery; if (!isInPreAggregationQuery) { const preAggregationForQuery = this.findPreAggregationForQuery(); + // Check usageInfos after findPreAggregationForQuery (which may populate them) + if (this.preAggregationUsageInfos && this.preAggregationUsageInfos.length > 0) { + return this.preAggregationDescriptionsForUsageInfos(this.preAggregationUsageInfos); + } if (preAggregationForQuery) { return this.preAggregationDescriptionsFor(preAggregationForQuery); } @@ -165,6 +182,48 @@ export class PreAggregations { return join.joins.map(j => j.originalTo).concat([join.root]); } + private preAggregationDescriptionsForUsageInfos(usageInfos: PreAggregationUsageInfo[]): FullPreAggregationDescription[] { + return usageInfos.flatMap(usageInfo => { + const preAggObj = this.getRollupPreAggregationByName(usageInfo.cubeName, usageInfo.preAggregationName); + if (!preAggObj || !('preAggregationName' in preAggObj)) { + return []; + } + const foundPreAgg = preAggObj as PreAggregationForQuery; + + // One description per physical pre-aggregation, with usageMapping attached + const descriptions = this.preAggregationDescriptionsFor(foundPreAgg); + + // Compute the union of all usage date ranges so that partitions cover + // every usage (e.g. time_shift may require earlier partitions). + const mergedDateRange = PreAggregations.mergeUsageDateRanges(usageInfo.usages); + + return descriptions.map(desc => ({ + ...desc, + usageMapping: usageInfo.usages, + ...(mergedDateRange && desc.matchedTimeDimensionDateRange ? { matchedTimeDimensionDateRange: mergedDateRange } : {}), + })); + }); + } + + private static mergeUsageDateRanges(usages: Record): [string, string] | null { + let minDate: string | null = null; + let maxDate: string | null = null; + + for (const usage of Object.values(usages)) { + if (usage.dateRange) { + const [from, to] = usage.dateRange; + if (!minDate || from < minDate) { + minDate = from; + } + if (!maxDate || to > maxDate) { + maxDate = to; + } + } + } + + return minDate && maxDate ? [minDate, maxDate] : null; + } + private preAggregationDescriptionsFor(foundPreAggregation: PreAggregationForQuery): FullPreAggregationDescription[] { let preAggregations: PreAggregationForQuery[] = [foundPreAggregation]; if (foundPreAggregation.preAggregation.type === 'rollupJoin') { diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts index 4a476040c0010..9c6f2f467f829 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts @@ -250,6 +250,86 @@ describe('PreAggregationsMultiStage', () => { }, }) + cube('monthly_data', { + sql: \` + SELECT 1 AS id, 10 AS amount, 'a' AS category, '2017-01-10'::TIMESTAMP AS created_at UNION ALL + SELECT 2 AS id, 20 AS amount, 'b' AS category, '2017-01-20'::TIMESTAMP AS created_at UNION ALL + SELECT 4 AS id, 100 AS amount, 'a' AS category, '2017-02-10'::TIMESTAMP AS created_at UNION ALL + SELECT 5 AS id, 100 AS amount, 'b' AS category, '2017-02-20'::TIMESTAMP AS created_at UNION ALL + SELECT 10 AS id, 200 AS amount, 'a' AS category, '2017-03-10'::TIMESTAMP AS created_at UNION ALL + SELECT 20 AS id, 200 AS amount, 'b' AS category, '2017-03-20'::TIMESTAMP AS created_at + \`, + + sqlAlias: 'md', + + dimensions: { + id: { + type: 'number', + sql: 'id', + primaryKey: true + }, + category: { + type: 'string', + sql: 'category' + }, + created_at: { + type: 'time', + sql: 'created_at' + }, + }, + + measures: { + revenue: { + sql: 'amount', + type: 'sum' + }, + count: { + type: 'count' + }, + revenue_per_id: { + multi_stage: true, + sql: \`\${revenue} / \${id}\`, + type: 'sum', + add_group_by: [monthly_data.id], + }, + count_by_category: { + multi_stage: true, + sql: \`\${count}\`, + type: 'sum', + add_group_by: [monthly_data.category], + }, + prev_month_revenue: { + multi_stage: true, + sql: \`\${revenue}\`, + type: 'number', + timeShift: [{ + timeDimension: created_at, + interval: '1 month', + type: 'prior', + }], + }, + }, + + preAggregations: { + revenueById: { + type: 'rollup', + measureReferences: [revenue], + dimensionReferences: [id], + timeDimensionReference: created_at, + granularity: 'day', + partitionGranularity: 'month', + }, + countByCat: { + type: 'rollup', + measureReferences: [revenue, count], + dimensionReferences: [category], + timeDimensionReference: created_at, + granularity: 'day', + partitionGranularity: 'month', + }, + }, + }) + `); if (getEnv('nativeSqlPlanner')) { @@ -388,6 +468,98 @@ describe('PreAggregationsMultiStage', () => { ); }); })); + + it('two multi-stage measures with different pre-aggregations', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'monthly_data.revenue_per_id', + 'monthly_data.count_by_category' + ], + timeDimensions: [{ + dimension: 'monthly_data.created_at', + granularity: 'month', + dateRange: ['2017-01-01', '2017-03-31'] + }], + timezone: 'UTC', + order: [{ + id: 'monthly_data.created_at' + }], + preAggregationsSchema: '', + cubestoreSupportMultistage: true + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + const tableNames = preAggregationsDescription.map((d: any) => d.tableName); + expect(tableNames).toContain('md_revenue_by_id'); + expect(tableNames).toContain('md_count_by_cat'); + expect(sqlAndParams[0]).toContain('md_revenue_by_id'); + expect(sqlAndParams[0]).toContain('md_count_by_cat'); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + md__created_at_month: '2017-01-01T00:00:00.000Z', + md__revenue_per_id: '20.0000000000000000', + md__count_by_category: '2' + }, + { + md__created_at_month: '2017-02-01T00:00:00.000Z', + md__revenue_per_id: '45.0000000000000000', + md__count_by_category: '2' + }, + { + md__created_at_month: '2017-03-01T00:00:00.000Z', + md__revenue_per_id: '30.0000000000000000', + md__count_by_category: '2' + } + ] + ); + }); + })); + + it('multi-stage with time_shift loading different pre-aggregation partitions', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'monthly_data.revenue_per_id', + 'monthly_data.prev_month_revenue' + ], + timeDimensions: [{ + dimension: 'monthly_data.created_at', + granularity: 'month', + dateRange: ['2017-02-01', '2017-03-31'] + }], + timezone: 'UTC', + order: [{ + id: 'monthly_data.created_at' + }], + preAggregationsSchema: '', + cubestoreSupportMultistage: true + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + expect(preAggregationsDescription.length).toBeGreaterThanOrEqual(1); + expect(preAggregationsDescription.some((d: any) => d.tableName.startsWith('md_'))).toBe(true); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + md__created_at_month: '2017-02-01T00:00:00.000Z', + md__revenue_per_id: '45.0000000000000000', + md__prev_month_revenue: '30' + }, + { + md__created_at_month: '2017-03-01T00:00:00.000Z', + md__revenue_per_id: '30.0000000000000000', + md__prev_month_revenue: '200' + } + ] + ); + }); + })); } else { it.skip('multi stage pre-aggregations', () => { // Skipping because it works only in Tesseract diff --git a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts index 80f3259ef4bbb..3eb7753699a79 100644 --- a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts +++ b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts @@ -95,6 +95,16 @@ export class BaseDbRunner { range => `SELECT * FROM ${PreAggregationPartitionRangeLoader.partitionTableName(desc.tableName, desc.partitionGranularity, range)}_${suffix}` ).join(' UNION ALL '); const targetTableName = desc.dateRange ? `(${partitionUnion})` : `${desc.tableName}_${suffix}`; + // Replace usage-suffixed placeholders first (e.g. tableName__usage_0) + if (desc.usageMapping) { + for (const usageSuffix of Object.keys(desc.usageMapping)) { + replacedQuery = replacedQuery.replace( + new RegExp(`${desc.tableName}${usageSuffix.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s+`, 'g'), + `${targetTableName} ` + ); + } + } + // Replace base table name return replacedQuery.replace( new RegExp(`${desc.tableName}\\s+`, 'g'), `${targetTableName} ` diff --git a/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap b/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap index 83fb966c1071e..c8f5ecb5a8dd7 100644 --- a/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap +++ b/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap @@ -9236,51 +9236,51 @@ Array [ exports[`Queries with the @cubejs-backend/mysql-driver querying BigECommerce: SeveralMultiStageMeasures 1`] = ` Array [ Object { - "BigECommerce.count": 2, + "BigECommerce.count": "2", "BigECommerce.orderDate": "2020-01-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-01-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, + "BigECommerce.percentageOfTotalForStatus": "100", "BigECommerce.totalCountRetailMonthAgo": null, "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 1, + "BigECommerce.count": "1", "BigECommerce.orderDate": "2020-02-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-02-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 2, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 2, + "BigECommerce.count": "2", "BigECommerce.orderDate": "2020-03-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-03-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 1, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 1, + "BigECommerce.count": "1", "BigECommerce.orderDate": "2020-04-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-04-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 2, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 5, + "BigECommerce.count": "5", "BigECommerce.orderDate": "2020-05-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-05-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 1, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 7, + "BigECommerce.count": "7", "BigECommerce.orderDate": "2020-06-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-06-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 5, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "5", "BigECommerce.totalProfitYearAgo": null, }, Object { @@ -9288,39 +9288,39 @@ Array [ "BigECommerce.orderDate": "2020-07-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-07-01T00:00:00.000", "BigECommerce.percentageOfTotalForStatus": null, - "BigECommerce.totalCountRetailMonthAgo": 7, + "BigECommerce.totalCountRetailMonthAgo": "7", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 6, + "BigECommerce.count": "6", "BigECommerce.orderDate": "2020-09-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-09-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, + "BigECommerce.percentageOfTotalForStatus": "100", "BigECommerce.totalCountRetailMonthAgo": null, "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 4, + "BigECommerce.count": "4", "BigECommerce.orderDate": "2020-10-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-10-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 6, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "6", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 9, + "BigECommerce.count": "9", "BigECommerce.orderDate": "2020-11-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-11-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 4, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "4", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 7, + "BigECommerce.count": "7", "BigECommerce.orderDate": "2020-12-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-12-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 9, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "9", "BigECommerce.totalProfitYearAgo": null, }, ] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs index dd8a0bffc962f..1fb2ff414b451 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs @@ -29,6 +29,7 @@ pub struct PreAggregationTable { pub cube_alias: String, pub name: String, pub alias: Option, + pub usage_index: Option, } #[derive(Clone, Debug)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index f0a2bd5b54066..64a99d5690034 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -3,18 +3,42 @@ use super::*; use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult}; use crate::logical_plan::*; use crate::plan::FilterItem; +use crate::planner::filter::FilterOp; use crate::planner::join_hints::JoinHints; use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; +use crate::planner::planners::multi_stage::TimeShiftState; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::MemberSymbol; +use crate::planner::time_dimension::QueryDateTime; use cubenativeutils::CubeError; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::rc::Rc; +pub struct PreAggregationUsage { + pub index: usize, + pub pre_aggregation: Rc, + pub date_range: Option<(String, String)>, +} + +impl PreAggregationUsage { + pub fn name(&self) -> &String { + self.pre_aggregation.name() + } + + pub fn cube_name(&self) -> &String { + self.pre_aggregation.cube_name() + } + + pub fn external(&self) -> bool { + self.pre_aggregation.external() + } +} + pub struct PreAggregationOptimizer { query_tools: Rc, allow_multi_stage: bool, - used_pre_aggregations: HashMap<(String, String), Rc>, + usages: Vec, + usage_counter: usize, } impl PreAggregationOptimizer { @@ -22,7 +46,8 @@ impl PreAggregationOptimizer { Self { query_tools, allow_multi_stage, - used_pre_aggregations: HashMap::new(), + usages: Vec::new(), + usage_counter: 0, } } @@ -38,14 +63,22 @@ impl PreAggregationOptimizer { let compiled_pre_aggregations = compiler.compile_all_pre_aggregations(disable_external_pre_aggregations)?; - for pre_aggregation in compiled_pre_aggregations.iter() { - if let Some(id) = pre_aggregation_id { - let full_name = format!("{}.{}", pre_aggregation.cube_name, pre_aggregation.name); - if full_name != id { - continue; - } - } - let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?; + let filtered_pre_aggregations: Vec<_> = if let Some(id) = pre_aggregation_id { + compiled_pre_aggregations + .iter() + .filter(|pa| format!("{}.{}", pa.cube_name, pa.name) == id) + .cloned() + .collect() + } else { + compiled_pre_aggregations + }; + + if !plan.multistage_members().is_empty() && self.allow_multi_stage { + return self.try_rewrite_query_with_multistages(&plan, &filtered_pre_aggregations); + } + + for pre_aggregation in filtered_pre_aggregations.iter() { + let new_query = self.try_rewrite_simple_query(&plan, pre_aggregation, None)?; if new_query.is_some() { return Ok(new_query); } @@ -54,35 +87,26 @@ impl PreAggregationOptimizer { Ok(None) } - pub fn get_used_pre_aggregations(&self) -> Vec> { - self.used_pre_aggregations.values().cloned().collect() + pub fn get_usages(&self) -> &Vec { + &self.usages } - fn try_rewrite_query( - &mut self, - query: Rc, - pre_aggregation: &Rc, - ) -> Result>, CubeError> { - if query.multistage_members().is_empty() { - self.try_rewrite_simple_query(&query, pre_aggregation) - } else if !self.allow_multi_stage { - Ok(None) - } else { - self.try_rewrite_query_with_multistages(&query, pre_aggregation) - } + pub fn take_usages(&mut self) -> Vec { + std::mem::take(&mut self.usages) } fn try_rewrite_simple_query( &mut self, query: &Rc, pre_aggregation: &Rc, + date_range: Option<(String, String)>, ) -> Result>, CubeError> { if let Some(matched_measures) = self.is_schema_and_filters_match(&query.schema(), &query.filter(), pre_aggregation)? { let mut new_query = query.as_ref().clone(); new_query.set_source( - self.make_pre_aggregation_source(pre_aggregation, &matched_measures)? + self.make_pre_aggregation_source(pre_aggregation, &matched_measures, date_range)? .into(), ); Ok(Some(Rc::new(new_query))) @@ -91,22 +115,50 @@ impl PreAggregationOptimizer { } } + fn try_rewrite_leaf_query( + &mut self, + query: Rc, + compiled_pre_aggregations: &[Rc], + time_shifts: &TimeShiftState, + ) -> Result>, CubeError> { + if !query.multistage_members().is_empty() { + // Nested multi-stage: recurse with full list + return self.try_rewrite_query_with_multistages(&query, compiled_pre_aggregations); + } + + for pre_aggregation in compiled_pre_aggregations.iter() { + let external = pre_aggregation.external.unwrap_or(false); + let date_range = + Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts, external); + let result = self.try_rewrite_simple_query(&query, pre_aggregation, date_range)?; + if result.is_some() { + return Ok(result); + } + } + Ok(None) + } + fn try_rewrite_query_with_multistages( &mut self, query: &Rc, - pre_aggregation: &Rc, + compiled_pre_aggregations: &[Rc], ) -> Result>, CubeError> { let rewriter = LogicalPlanRewriter::new(); let mut has_unrewritten_leaf = false; + // Save state in case we need to rollback + let saved_usages_len = self.usages.len(); + let saved_counter = self.usage_counter; + let mut rewritten_multistages = Vec::new(); for multi_stage in query.multistage_members() { let rewritten = rewriter.rewrite_top_down_with(multi_stage.clone(), |plan_node| { let res = match plan_node { PlanNode::MultiStageLeafMeasure(multi_stage_leaf_measure) => { - if let Some(rewritten) = self.try_rewrite_query( + if let Some(rewritten) = self.try_rewrite_leaf_query( multi_stage_leaf_measure.query.clone(), - pre_aggregation, + compiled_pre_aggregations, + &multi_stage_leaf_measure.time_shifts, )? { let new_leaf = Rc::new(MultiStageLeafMeasure { measure: multi_stage_leaf_measure.measure.clone(), @@ -134,6 +186,9 @@ impl PreAggregationOptimizer { } if has_unrewritten_leaf { + // Rollback usages added during failed attempt + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; return Ok(None); } @@ -145,31 +200,50 @@ impl PreAggregationOptimizer { resolver_multiplied_measures, ) = resolver_multiplied_measures { - if let Some(matched_measures) = self.is_schema_and_filters_match( - &resolver_multiplied_measures.schema, - &resolver_multiplied_measures.filter, - &pre_aggregation, - )? { - let pre_aggregation_source = - self.make_pre_aggregation_source(pre_aggregation, &matched_measures)?; - - let pre_aggregation_query = Query::builder() - .schema(resolver_multiplied_measures.schema.clone()) - .filter(resolver_multiplied_measures.filter.clone()) - .modifers(Rc::new(LogicalQueryModifiers { - offset: None, - limit: None, - ungrouped: false, - order_by: vec![], - })) - .source(pre_aggregation_source.into()) - .build(); - Some(ResolvedMultipliedMeasures::PreAggregation(Rc::new( - pre_aggregation_query, - ))) - } else { + // Try each pre-aggregation for the multiplied measures resolver + let mut result_source = None; + for pre_aggregation in compiled_pre_aggregations.iter() { + if let Some(matched_measures) = self.is_schema_and_filters_match( + &resolver_multiplied_measures.schema, + &resolver_multiplied_measures.filter, + pre_aggregation, + )? { + let date_range = Self::extract_date_range( + &resolver_multiplied_measures.filter, + &self.query_tools, + &TimeShiftState::default(), + pre_aggregation.external.unwrap_or(false), + ); + let pre_aggregation_source = self.make_pre_aggregation_source( + pre_aggregation, + &matched_measures, + date_range, + )?; + + let pre_aggregation_query = Query::builder() + .schema(resolver_multiplied_measures.schema.clone()) + .filter(resolver_multiplied_measures.filter.clone()) + .modifers(Rc::new(LogicalQueryModifiers { + offset: None, + limit: None, + ungrouped: false, + order_by: vec![], + })) + .source(pre_aggregation_source.into()) + .build(); + result_source = Some(ResolvedMultipliedMeasures::PreAggregation( + Rc::new(pre_aggregation_query), + )); + break; + } + } + if result_source.is_none() { + // Rollback + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; return Ok(None); } + result_source } else { Some(resolver_multiplied_measures.clone()) } @@ -187,6 +261,17 @@ impl PreAggregationOptimizer { query.source().clone() }; + // Reject mixed external/non-external pre-aggregation usages + let new_usages = &self.usages[saved_usages_len..]; + if !new_usages.is_empty() { + let first_external = new_usages[0].external(); + if new_usages.iter().any(|u| u.external() != first_external) { + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; + return Ok(None); + } + } + let result = Query::builder() .multistage_members(rewritten_multistages) .schema(query.schema().clone()) @@ -202,7 +287,11 @@ impl PreAggregationOptimizer { &mut self, pre_aggregation: &Rc, matched_measures: &HashSet, + date_range: Option<(String, String)>, ) -> Result, CubeError> { + let usage_index = self.usage_counter; + self.usage_counter += 1; + let filtered_measures: Vec> = pre_aggregation .measures .iter() @@ -221,13 +310,17 @@ impl PreAggregationOptimizer { measures: filtered_measures.clone(), multiplied_measures: HashSet::new(), }; + + // Set usage_index on the source table so the physical plan can generate unique placeholders + let source = Self::source_with_usage_index(&pre_aggregation.source, usage_index); + // Measures are filtered to only those actually consumed during matching. // This prevents calculated measures (e.g. amount_per_count) from getting a // direct column reference when they should be decomposed to base measures. // Dimensions are intentionally NOT filtered: unlike measures (where // sum(precomputed_ratio) != sum(a)/sum(b)), extra dimension references // are harmless — they're simply unused if the query doesn't select them. - let pre_aggregation = PreAggregation::builder() + let pre_aggregation_node = PreAggregation::builder() .name(pre_aggregation.name.clone()) .time_dimensions(pre_aggregation.time_dimensions.clone()) .dimensions(pre_aggregation.dimensions.clone()) @@ -236,17 +329,95 @@ impl PreAggregationOptimizer { .schema(Rc::new(schema)) .external(pre_aggregation.external.unwrap_or_default()) .granularity(pre_aggregation.granularity.clone()) - .source(pre_aggregation.source.clone()) + .source(source) .cube_name(pre_aggregation.cube_name.clone()) + .usage_index(Some(usage_index)) .build(); - let result = Rc::new(pre_aggregation); - self.used_pre_aggregations.insert( - (result.cube_name().clone(), result.name().clone()), - result.clone(), - ); + let result = Rc::new(pre_aggregation_node); + + self.usages.push(PreAggregationUsage { + index: usage_index, + pre_aggregation: result.clone(), + date_range, + }); + Ok(result) } + fn source_with_usage_index( + source: &Rc, + usage_index: usize, + ) -> Rc { + match source.as_ref() { + PreAggregationSource::Single(table) => { + Rc::new(PreAggregationSource::Single(PreAggregationTable { + usage_index: Some(usage_index), + ..table.clone() + })) + } + PreAggregationSource::Union(union) => { + let items = union + .items + .iter() + .map(|t| { + Rc::new(PreAggregationTable { + usage_index: Some(usage_index), + ..t.as_ref().clone() + }) + }) + .collect(); + Rc::new(PreAggregationSource::Union(PreAggregationUnion { items })) + } + PreAggregationSource::Join(_) => { + // Join pre-aggregations: usage_index is set on the PreAggregation node itself + source.clone() + } + } + } + + fn extract_date_range( + filter: &LogicalFilter, + query_tools: &Rc, + time_shifts: &TimeShiftState, + external: bool, + ) -> Option<(String, String)> { + let precision = query_tools + .base_tools() + .driver_tools(external) + .ok() + .and_then(|dt| dt.timestamp_precision().ok()) + .unwrap_or(3); + for item in &filter.time_dimensions_filters { + if let FilterItem::Item(base_filter) = item { + if let FilterOp::DateRange(date_range_op) = base_filter.operation() { + if let Ok((from, to)) = date_range_op.formatted_date_range(precision) { + // Apply time shift for this dimension if present. + // SQL renders `column + interval`, so actual data range is `date - interval`. + if let Some(interval) = time_shifts + .dimensions_shifts + .get(&base_filter.member_name()) + .and_then(|s| s.interval.as_ref()) + { + let tz = query_tools.timezone(); + let neg = -interval.clone(); + let shifted_from = QueryDateTime::from_date_str(tz, &from) + .and_then(|dt| dt.add_interval(&neg)) + .map(|dt| dt.default_format()) + .unwrap_or(from); + let shifted_to = QueryDateTime::from_date_str(tz, &to) + .and_then(|dt| dt.add_interval(&neg)) + .map(|dt| dt.default_format()) + .unwrap_or(to); + return Some((shifted_from, shifted_to)); + } + return Some((from, to)); + } + } + } + } + None + } + fn is_schema_and_filters_match( &self, schema: &Rc, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs index 6b559125faf9d..717dea63bfc27 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs @@ -265,6 +265,7 @@ impl PreAggregationsCompiler { cube_alias, name: name.name.clone(), alias: static_data.sql_alias.clone(), + usage_index: None, }) }; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs index b5a35d2c67fdb..3c1c6f0e5a6dc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs @@ -22,6 +22,8 @@ pub struct PreAggregation { granularity: Option, source: Rc, cube_name: String, + #[builder(default)] + usage_index: Option, } impl PreAggregation { @@ -64,6 +66,10 @@ impl PreAggregation { pub fn cube_name(&self) -> &String { &self.cube_name } + + pub fn usage_index(&self) -> Option { + self.usage_index + } } impl LogicalNode for PreAggregation { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs index 8e58f3e3976c6..b2558e405dfb8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs @@ -26,9 +26,13 @@ impl PreAggregationProcessor<'_> { ) -> Result { let query_tools = self.builder.query_tools(); let name = table.alias.clone().unwrap_or_else(|| table.name.clone()); - let table_name = query_tools + let base_table_name = query_tools .base_tools() .pre_aggregation_table_name(table.cube_name.clone(), name.clone())?; + let table_name = match table.usage_index { + Some(idx) => format!("{}__usage_{}", base_table_name, idx), + None => base_table_name, + }; let alias = PlanSqlTemplates::member_alias_name(&table.cube_name, &name, &None); let res = SingleAliasedSource::new_from_table_reference( table_name, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 92ef1b9a824ba..f64d382d9f3e7 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -3,14 +3,33 @@ use super::top_level_planner::TopLevelPlanner; use super::QueryProperties; use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::pre_aggregation_obj::NativePreAggregationObj; +use crate::logical_plan::PreAggregationUsage; use cubenativeutils::wrappers::inner_types::InnerTypes; use cubenativeutils::wrappers::object::NativeArray; use cubenativeutils::wrappers::serializer::NativeSerialize; use cubenativeutils::wrappers::NativeType; use cubenativeutils::wrappers::{NativeContextHolder, NativeObjectHandle}; use cubenativeutils::CubeError; +use serde::Serialize; +use std::collections::HashMap; use std::rc::Rc; +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct UsageDateRange { + #[serde(skip_serializing_if = "Option::is_none")] + date_range: Option>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GroupedPreAggregationInfo { + cube_name: String, + pre_aggregation_name: String, + external: bool, + usages: HashMap, +} + pub struct BaseQuery { context: NativeContextHolder, query_tools: Rc, @@ -59,12 +78,10 @@ impl BaseQuery { self.cubestore_support_multistage, ); - let (sql, used_pre_aggregations) = planner.plan()?; + let (sql, usages) = planner.plan()?; - let is_external = if !used_pre_aggregations.is_empty() { - used_pre_aggregations - .iter() - .all(|pre_aggregation| pre_aggregation.external()) + let is_external = if !usages.is_empty() { + usages.iter().all(|usage| usage.pre_aggregation.external()) } else { false }; @@ -74,13 +91,26 @@ impl BaseQuery { .query_tools .build_sql_and_params(&sql, true, &templates)?; + // For single usage, strip __usage_N suffix from SQL to maintain backward compat + let final_sql = if usages.len() == 1 { + result_sql.replace(&format!("__usage_{}", usages[0].index), "") + } else { + result_sql + }; + let res = self.context.empty_array()?; - res.set(0, result_sql.to_native(self.context.clone())?)?; + res.set(0, final_sql.to_native(self.context.clone())?)?; res.set(1, params.to_native(self.context.clone())?)?; - if let Some(used_pre_aggregation) = used_pre_aggregations.first() { + + if usages.len() > 1 { + // Multiple usages: group by (cubeName, name), return array of grouped infos + let grouped = Self::group_usages(&usages); + res.set(2, grouped.to_native(self.context.clone())?)?; + } else if let Some(usage) = usages.first() { + // Single usage: return old-style pre-aggregation object for backward compat let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name( - used_pre_aggregation.cube_name().clone(), - used_pre_aggregation.name().clone(), + usage.pre_aggregation.cube_name().clone(), + usage.pre_aggregation.name().clone(), )?; res.set( 2, @@ -91,8 +121,48 @@ impl BaseQuery { .to_native(self.context.clone())?, )?; } - let result = NativeObjectHandle::new(res.into_object()); + let result = NativeObjectHandle::new(res.into_object()); Ok(result) } + + fn group_usages(usages: &[PreAggregationUsage]) -> Vec { + let mut groups: HashMap<(String, String), GroupedPreAggregationInfo> = HashMap::new(); + + for usage in usages { + let pre_agg = &usage.pre_aggregation; + let cube_name = pre_agg.cube_name().clone(); + let name = pre_agg.name().clone(); + let key = (cube_name.clone(), name.clone()); + + let suffix = format!("__usage_{}", usage.index); + + let group = groups + .entry(key) + .or_insert_with(|| GroupedPreAggregationInfo { + cube_name, + pre_aggregation_name: name, + external: pre_agg.external(), + usages: HashMap::new(), + }); + + group.usages.insert( + suffix, + UsageDateRange { + date_range: usage + .date_range + .as_ref() + .map(|(from, to)| vec![from.clone(), to.clone()]), + }, + ); + } + + let mut result: Vec<_> = groups.into_values().collect(); + result.sort_by(|a, b| { + a.cube_name + .cmp(&b.cube_name) + .then(a.pre_aggregation_name.cmp(&b.pre_aggregation_name)) + }); + result + } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs index 00d5f7941f035..7015af24cdbd2 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs @@ -105,6 +105,10 @@ impl BaseFilter { self.typed_filter.operator() } + pub fn operation(&self) -> &super::typed_filter::FilterOp { + self.typed_filter.operation() + } + pub fn use_raw_values(&self) -> bool { self.typed_filter.use_raw_values() } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs index 4825c52bb8abc..a567c9df0a5db 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs @@ -2,10 +2,11 @@ pub mod base_filter; pub mod base_segment; pub mod compiler; pub mod filter_operator; -mod operators; +pub(crate) mod operators; pub mod typed_filter; pub use base_filter::BaseFilter; pub use base_segment::BaseSegment; pub use filter_operator::FilterOperator; -pub use typed_filter::resolve_base_symbol; +pub use operators::date_range::DateRangeOp; +pub use typed_filter::{resolve_base_symbol, FilterOp}; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs index 9aa395daefae5..93d359a32b0be 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs @@ -1,4 +1,5 @@ use super::{FilterOperationSql, FilterSqlContext}; +use crate::planner::time_dimension::QueryDateTimeHelper; use cubenativeutils::CubeError; #[derive(Clone, Debug)] @@ -18,6 +19,12 @@ impl DateRangeOp { pub fn new(kind: DateRangeKind, from: String, to: String) -> Self { Self { kind, from, to } } + + pub fn formatted_date_range(&self, precision: u32) -> Result<(String, String), CubeError> { + let from = QueryDateTimeHelper::format_from_date(&self.from, precision)?; + let to = QueryDateTimeHelper::format_to_date(&self.to, precision)?; + Ok((from, to)) + } } impl FilterOperationSql for DateRangeOp { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs index eb75f26bdefac..f7d3951a3e84e 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs @@ -91,6 +91,10 @@ impl TypedFilter { &self.values } + pub fn operation(&self) -> &FilterOp { + &self.op + } + pub fn use_raw_values(&self) -> bool { self.use_raw_values } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs index 3b69cd453491e..77cac4501fc42 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs @@ -2,8 +2,8 @@ use super::planners::QueryPlanner; use super::query_tools::QueryTools; use super::QueryProperties; use crate::logical_plan::OriginalSqlCollector; -use crate::logical_plan::PreAggregation; use crate::logical_plan::PreAggregationOptimizer; +use crate::logical_plan::PreAggregationUsage; use crate::logical_plan::Query; use crate::physical_plan_builder::PhysicalPlanBuilder; use cubenativeutils::CubeError; @@ -29,17 +29,14 @@ impl TopLevelPlanner { } } - pub fn plan(&self) -> Result<(String, Vec>), CubeError> { + pub fn plan(&self) -> Result<(String, Vec), CubeError> { let query_planner = QueryPlanner::new(self.request.clone(), self.query_tools.clone()); let logical_plan = query_planner.plan()?; - let (optimized_plan, used_pre_aggregations) = - self.try_pre_aggregations(logical_plan.clone())?; + let (optimized_plan, usages) = self.try_pre_aggregations(logical_plan.clone())?; - let is_external = if !used_pre_aggregations.is_empty() { - used_pre_aggregations - .iter() - .all(|pre_aggregation| pre_aggregation.external()) + let is_external = if !usages.is_empty() { + usages.iter().all(|usage| usage.pre_aggregation.external()) } else { false }; @@ -62,13 +59,13 @@ impl TopLevelPlanner { let sql = physical_plan.to_sql(&templates)?; - Ok((sql, used_pre_aggregations)) + Ok((sql, usages)) } fn try_pre_aggregations( &self, plan: Rc, - ) -> Result<(Rc, Vec>), CubeError> { + ) -> Result<(Rc, Vec), CubeError> { let result = if !self.request.is_pre_aggregation_query() { let mut pre_aggregation_optimizer = PreAggregationOptimizer::new( self.query_tools.clone(), @@ -82,11 +79,9 @@ impl TopLevelPlanner { disable_external_pre_aggregations, pre_aggregation_id, )? { - if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 { - ( - result, - pre_aggregation_optimizer.get_used_pre_aggregations(), - ) + let usages = pre_aggregation_optimizer.take_usages(); + if !usages.is_empty() { + (result, usages) } else { (plan.clone(), Vec::new()) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml new file mode 100644 index 0000000000000..5de1a4c0749d7 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml @@ -0,0 +1,57 @@ +cubes: + - name: orders + sql: "SELECT * FROM ms_pa_ts_orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: status + type: string + sql: status + - name: created_at + type: time + sql: created_at + + measures: + - name: count + type: count + public: false + + - name: revenue + type: sum + sql: amount + public: false + + # Multi-stage: count with time shift (prior 1 month) + - name: count_prev_month + type: number + sql: "{CUBE.count}" + multi_stage: true + time_shift: + - interval: "1 month" + type: prior + timeDimension: orders.created_at + + # Multi-stage: revenue reducing by status + - name: revenue_reduce_status + type: sum + sql: "{CUBE.revenue}" + multi_stage: true + reduce_by: + - orders.status + + pre_aggregations: + - name: count_rollup + measures: + - count + time_dimension: created_at + granularity: day + + - name: revenue_rollup + measures: + - revenue + dimensions: + - status + time_dimension: created_at + granularity: day diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml new file mode 100644 index 0000000000000..d8d51f6d929ec --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml @@ -0,0 +1,50 @@ +cubes: + - name: orders + sql: "SELECT * FROM ms_pa_orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: status + type: string + sql: status + + measures: + - name: count + type: count + public: false + + - name: revenue + type: sum + sql: amount + public: false + + # Multi-stage: sum of count, reducing by status + - name: count_reduce_status + type: sum + sql: "{CUBE.count}" + multi_stage: true + reduce_by: + - orders.status + + # Multi-stage: sum of revenue, reducing by status + - name: revenue_reduce_status + type: sum + sql: "{CUBE.revenue}" + multi_stage: true + reduce_by: + - orders.status + + pre_aggregations: + - name: count_rollup + measures: + - count + dimensions: + - status + + - name: revenue_rollup + measures: + - revenue + dimensions: + - status diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql new file mode 100644 index 0000000000000..d1eb5e80465c5 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql @@ -0,0 +1,23 @@ +DROP TABLE IF EXISTS ms_pa_ts_orders CASCADE; + +CREATE TABLE ms_pa_ts_orders ( + id INTEGER PRIMARY KEY, + status VARCHAR(50) NOT NULL, + amount NUMERIC(10,2) NOT NULL, + created_at TIMESTAMP NOT NULL +); + +INSERT INTO ms_pa_ts_orders (id, status, amount, created_at) VALUES + -- December 2024 + (1, 'new', 80.00, '2024-12-10'), + (2, 'active', 120.00, '2024-12-20'), + -- January 2025 + (3, 'new', 100.00, '2025-01-15'), + (4, 'active', 150.00, '2025-01-20'), + -- February 2025 + (5, 'new', 200.00, '2025-02-10'), + (6, 'active', 250.00, '2025-02-15'), + (7, 'completed', 300.00, '2025-02-20'), + -- March 2025 + (8, 'completed', 400.00, '2025-03-10'), + (9, 'active', 350.00, '2025-03-25'); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql new file mode 100644 index 0000000000000..a23a2f07ed100 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS ms_pa_orders CASCADE; + +CREATE TABLE ms_pa_orders ( + id INTEGER PRIMARY KEY, + status VARCHAR(50) NOT NULL, + amount NUMERIC(10,2) NOT NULL +); + +INSERT INTO ms_pa_orders (id, status, amount) VALUES + (1, 'new', 100.00), + (2, 'new', 200.00), + (3, 'active', 150.00), + (4, 'active', 250.00), + (5, 'active', 350.00), + (6, 'completed', 300.00), + (7, 'completed', 400.00); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs index 4af4fadfbaf9e..8998ae873dbdf 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs @@ -1,8 +1,8 @@ use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::join_hints::JoinHintItem; -use crate::logical_plan::PreAggregation; +use crate::logical_plan::PreAggregationUsage; #[cfg(feature = "integration-postgres")] -use crate::logical_plan::{PreAggregationSource, PreAggregationTable}; +use crate::logical_plan::{PreAggregation, PreAggregationSource, PreAggregationTable}; use crate::plan::Filter; use crate::planner::filter::base_segment::BaseSegment; use crate::planner::query_tools::QueryTools; @@ -382,7 +382,7 @@ impl TestContext { pub fn build_sql_with_used_pre_aggregations( &self, query: &str, - ) -> Result<(String, Vec>), cubenativeutils::CubeError> { + ) -> Result<(String, Vec), cubenativeutils::CubeError> { let options = self.create_query_options_from_yaml(query); let ctx = self.for_options(options.as_ref())?; let request = QueryProperties::try_new(ctx.query_tools.clone(), options)?; @@ -430,6 +430,11 @@ impl TestContext { .build_sql_and_params(&raw_sql, true, &templates) .expect("Failed to build SQL and params"); + // Strip __usage_N suffixes from SQL, same as base_query.rs does for single usage + let sql = pre_aggregations + .iter() + .fold(sql, |s, u| s.replace(&format!("__usage_{}", u.index), "")); + let final_sql = Self::inline_params(&sql, ¶ms); let messages = client.simple_query(&final_sql).await.unwrap_or_else(|e| { @@ -448,9 +453,10 @@ impl TestContext { async fn create_pre_agg_tables( &self, client: &tokio_postgres::Client, - pre_aggregations: &[Rc], + pre_aggregations: &[PreAggregationUsage], ) { - for pre_agg in pre_aggregations { + for usage in pre_aggregations { + let pre_agg = &usage.pre_aggregation; let tables = Self::collect_pre_agg_source_tables(pre_agg.source()); let yaml = Self::build_pre_agg_query_yaml(pre_agg); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index d31d66f12a402..a19583bb70ad8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -834,6 +834,113 @@ async fn test_multi_stage_count_distinct_sum_by_quarter_with_pre_aggregation() { } } +// --- Multi-stage with separate pre-aggregations --- + +#[tokio::test(flavor = "multi_thread")] +async fn test_multi_stage_separate_pre_aggregations() { + let schema = MockSchema::from_yaml_file("common/multi_stage_separate_pre_aggs_test.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - orders.count_reduce_status + - orders.revenue_reduce_status + cubestoreSupportMultistage: true + "}; + + let (_sql, pre_aggrs) = ctx + .build_sql_with_used_pre_aggregations(query_yaml) + .unwrap(); + + assert_eq!(pre_aggrs.len(), 2, "Expected 2 pre-aggregation usages"); + + let names: Vec<&str> = pre_aggrs + .iter() + .map(|u| u.pre_aggregation.name().as_str()) + .collect(); + assert!( + names.contains(&"count_rollup"), + "Expected count_rollup, got {:?}", + names + ); + assert!( + names.contains(&"revenue_rollup"), + "Expected revenue_rollup, got {:?}", + names + ); + + if let Some(result) = ctx + .try_execute_pg(query_yaml, "multi_stage_separate_pre_aggs_tables.sql") + .await + { + insta::assert_snapshot!("multi_stage_separate_pre_aggs_pg_result", result); + } +} + +// --- Multi-stage with separate pre-aggregations and time shift --- + +#[tokio::test(flavor = "multi_thread")] +async fn test_multi_stage_separate_pre_aggs_with_time_shift() { + let schema = MockSchema::from_yaml_file("common/multi_stage_pre_agg_time_shift_test.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - orders.count_prev_month + - orders.revenue_reduce_status + time_dimensions: + - dimension: orders.created_at + granularity: month + dateRange: + - \"2025-01-01\" + - \"2025-03-31\" + cubestoreSupportMultistage: true + "}; + + let (_sql, pre_aggrs) = ctx + .build_sql_with_used_pre_aggregations(query_yaml) + .unwrap(); + + assert_eq!(pre_aggrs.len(), 2, "Expected 2 pre-aggregation usages"); + + // Find usages by pre-aggregation name + let count_usage = pre_aggrs + .iter() + .find(|u| u.pre_aggregation.name() == "count_rollup") + .expect("Expected count_rollup usage"); + let revenue_usage = pre_aggrs + .iter() + .find(|u| u.pre_aggregation.name() == "revenue_rollup") + .expect("Expected revenue_rollup usage"); + + // count_prev_month has time_shift prior 1 month, so date range should be shifted back + assert_eq!( + count_usage.date_range, + Some(( + "2024-12-01T00:00:00.000".to_string(), + "2025-02-28T23:59:59.999".to_string() + )), + "count_rollup should have date range shifted 1 month prior" + ); + + // revenue_reduce_status has no time shift, so original date range + assert_eq!( + revenue_usage.date_range, + Some(( + "2025-01-01T00:00:00.000".to_string(), + "2025-03-31T23:59:59.999".to_string() + )), + "revenue_rollup should have original date range" + ); + + if let Some(result) = ctx + .try_execute_pg(query_yaml, "multi_stage_pre_agg_time_shift_tables.sql") + .await + { + insta::assert_snapshot!("multi_stage_separate_pre_aggs_time_shift_pg_result", result); + } +} + // --- rollupJoin with calculated measures through view --- #[test] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap new file mode 100644 index 0000000000000..b08f3c2536a06 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap @@ -0,0 +1,9 @@ +--- +source: cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +assertion_line: 873 +expression: result +snapshot_kind: text +--- +orders__count_reduce_status | orders__revenue_reduce_status +----------------------------+------------------------------ +7 | 1750.00 diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap new file mode 100644 index 0000000000000..a0d46d261d64f --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap @@ -0,0 +1,11 @@ +--- +source: cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +assertion_line: 936 +expression: result +snapshot_kind: text +--- +orders__created_at_month | orders__count_prev_month | orders__revenue_reduce_status +-------------------------+--------------------------+------------------------------ +2025-01-01 00:00:00 | 2 | 250.00 +2025-02-01 00:00:00 | 2 | 750.00 +2025-03-01 00:00:00 | 3 | 750.00