Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,55 @@ export class PreAggregationPartitionRangeLoader {
.map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`)
.join(' UNION ALL ');

const baseTargetTableName = allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`;

// Build per-usage target table names if usageMapping is present
let usageTargetTableNames: Record<string, string> | undefined;
if (this.preAggregation.usageMapping) {
usageTargetTableNames = {};
for (const [suffix, usageInfo] of Object.entries(this.preAggregation.usageMapping)) {
if (usageInfo.dateRange && this.preAggregation.partitionGranularity) {
// Load partition ranges specific to this usage's dateRange
const usageDateRange = PreAggregationPartitionRangeLoader.intersectDateRanges(
[loadResults[0]?.buildRangeEnd ? loadResults[0].partitionRange?.[0] : null, loadResults[loadResults.length - 1]?.buildRangeEnd || null] as QueryDateRange,
usageInfo.dateRange as QueryDateRange,
);
if (usageDateRange) {
const usagePartitions = loadResults.filter(r => {
if (!r.partitionRange) return true;
const [pStart, pEnd] = r.partitionRange;
const [uStart, uEnd] = usageDateRange;
return pEnd >= uStart && pStart <= uEnd;
});
const usageTableNames = usagePartitions.map(r => r.targetTableName);
if (usageTableNames.length === 1) {
[usageTargetTableNames[suffix]] = usageTableNames;
} else if (usageTableNames.length > 0) {
const usageUnion = usageTableNames
.map(t => `SELECT * FROM ${t}`)
.join(' UNION ALL ');
usageTargetTableNames[suffix] = `(${usageUnion})`;
} else {
usageTargetTableNames[suffix] = baseTargetTableName;
}
} else {
usageTargetTableNames[suffix] = baseTargetTableName;
}
} else {
usageTargetTableNames[suffix] = baseTargetTableName;
}
}
}

return {
targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`,
targetTableName: baseTargetTableName,
refreshKeyValues: loadResults.map(t => t.refreshKeyValues),
lastUpdatedAt,
buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd,
lambdaTable,
rollupLambdaId: this.preAggregation.rollupLambdaId,
isMultiTableUnion: allTableTargetNames.length > 1,
usageTargetTableNames,
};
} else {
return new PreAggregationLoader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ export type LoadPreAggregationResult = {
rollupLambdaId?: string;
partitionRange?: QueryDateRange;
isMultiTableUnion?: boolean;
usageTargetTableNames?: Record<string, string>;
};

export type PreAggregationTableToTempTable = [string, LoadPreAggregationResult];
Expand Down Expand Up @@ -192,6 +193,7 @@ export type PreAggregationDescription = {
sealAt?: string;
rollupLambdaId?: string;
lastRollupLambda?: boolean;
usageMapping?: Record<string, { dateRange?: QueryDateRange }>;
};

export const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ export type PreAggTableToTempTable = [
TempTable,
];

export type PreAggTableToTempTableNames = [string, { targetTableName: string; }];
export type PreAggTableToTempTableNames = [string, { targetTableName: string; usageTargetTableNames?: Record<string, string>; }];

export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined;

Expand Down Expand Up @@ -429,9 +429,16 @@ export class QueryCache {
? queryAndParams
: [queryAndParams, []];
const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce(
(query, [tableName, { targetTableName }]) => (
QueryCache.replaceAll(tableName, targetTableName, query)
),
(query, [tableName, { targetTableName, usageTargetTableNames }]) => {
// First replace usage-specific placeholders (e.g. tableName__usage_0)
if (usageTargetTableNames) {
for (const [suffix, usageTargetName] of Object.entries(usageTargetTableNames)) {
query = QueryCache.replaceAll(`${tableName}${suffix}`, usageTargetName, query);
}
}
// Then replace base table name for any remaining references
return QueryCache.replaceAll(tableName, targetTableName, query);
},
keyQuery
);
return Array.isArray(queryAndParams)
Expand Down
25 changes: 18 additions & 7 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -959,11 +959,9 @@ export class BaseQuery {
try {
const buildResult = nativeBuildSqlAndParams(queryParams);

const [query, params, preAggregation] = buildResult;
const [query, params, preAggResult] = buildResult;
const paramsArray = [...params];
if (preAggregation) {
this.preAggregations.preAggregationForQuery = preAggregation;
}
this.applyNativePreAggResult(preAggResult);
return [query, paramsArray];
} catch (e) {
if (e.name === 'TesseractUserError') {
Expand Down Expand Up @@ -1009,8 +1007,21 @@ export class BaseQuery {

const buildResult = nativeBuildSqlAndParams(queryParams);

const [, , preAggregation] = buildResult;
return preAggregation;
const [, , preAggResult] = buildResult;
this.applyNativePreAggResult(preAggResult);
return this.preAggregations.preAggregationForQuery;
}

applyNativePreAggResult(preAggResult) {
if (!preAggResult) return;
if (Array.isArray(preAggResult)) {
this.preAggregations.preAggregationUsageInfos = preAggResult;
const first = preAggResult[0];
this.preAggregations.preAggregationForQuery =
this.getPreAggregationByName(first.cubeName, first.preAggregationName);
} else {
this.preAggregations.preAggregationForQuery = preAggResult;
}
}

allCubeMembers(path) {
Expand Down Expand Up @@ -3491,7 +3502,7 @@ export class BaseQuery {
}

escapeStringLiteral(str) {
return `'${str.replace(/'/g, "''")}'`;
return `'${str.replace(/'/g, '\'\'')}'`;
}

autoPrefixAndEvaluateSql(cubeName, sql, isMemberExpr = false) {
Expand Down
59 changes: 59 additions & 0 deletions packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ export type FullPreAggregationDescription = any;
*/
export type TransformedQuery = any;

export type UsageDateRangeInfo = {
dateRange?: [string, string];
};

export type PreAggregationUsageInfo = {
cubeName: string;
preAggregationName: string;
external: boolean;
usages: Record<string, UsageDateRangeInfo>;
};

export class PreAggregations {
private readonly query: BaseQuery;

Expand All @@ -107,6 +118,8 @@ export class PreAggregations {

public preAggregationForQuery: PreAggregationForQuery | undefined = undefined;

public preAggregationUsageInfos: PreAggregationUsageInfo[] | undefined = undefined;

public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) {
this.query = query;
this.historyQueries = historyQueries;
Expand Down Expand Up @@ -137,6 +150,10 @@ export class PreAggregations {
const isInPreAggregationQuery = this.query.options.preAggregationQuery;
if (!isInPreAggregationQuery) {
const preAggregationForQuery = this.findPreAggregationForQuery();
// Check usageInfos after findPreAggregationForQuery (which may populate them)
if (this.preAggregationUsageInfos && this.preAggregationUsageInfos.length > 0) {
return this.preAggregationDescriptionsForUsageInfos(this.preAggregationUsageInfos);
}
if (preAggregationForQuery) {
return this.preAggregationDescriptionsFor(preAggregationForQuery);
}
Expand Down Expand Up @@ -165,6 +182,48 @@ export class PreAggregations {
return join.joins.map(j => j.originalTo).concat([join.root]);
}

private preAggregationDescriptionsForUsageInfos(usageInfos: PreAggregationUsageInfo[]): FullPreAggregationDescription[] {
return usageInfos.flatMap(usageInfo => {
const preAggObj = this.getRollupPreAggregationByName(usageInfo.cubeName, usageInfo.preAggregationName);
if (!preAggObj || !('preAggregationName' in preAggObj)) {
return [];
}
const foundPreAgg = preAggObj as PreAggregationForQuery;

// One description per physical pre-aggregation, with usageMapping attached
const descriptions = this.preAggregationDescriptionsFor(foundPreAgg);

// Compute the union of all usage date ranges so that partitions cover
// every usage (e.g. time_shift may require earlier partitions).
const mergedDateRange = PreAggregations.mergeUsageDateRanges(usageInfo.usages);

return descriptions.map(desc => ({
...desc,
usageMapping: usageInfo.usages,
...(mergedDateRange && desc.matchedTimeDimensionDateRange ? { matchedTimeDimensionDateRange: mergedDateRange } : {}),
}));
});
}

private static mergeUsageDateRanges(usages: Record<string, UsageDateRangeInfo>): [string, string] | null {
let minDate: string | null = null;
let maxDate: string | null = null;

for (const usage of Object.values(usages)) {
if (usage.dateRange) {
const [from, to] = usage.dateRange;
if (!minDate || from < minDate) {
minDate = from;
}
if (!maxDate || to > maxDate) {
maxDate = to;
}
}
}

return minDate && maxDate ? [minDate, maxDate] : null;
}

private preAggregationDescriptionsFor(foundPreAggregation: PreAggregationForQuery): FullPreAggregationDescription[] {
let preAggregations: PreAggregationForQuery[] = [foundPreAggregation];
if (foundPreAggregation.preAggregation.type === 'rollupJoin') {
Expand Down
Loading
Loading