From 1c56188b4d1fb45fbb1ac206a7943e11ba6a5eae Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 8 Apr 2026 18:25:01 +0200 Subject: [PATCH 01/24] feat(tesseract): Support separate pre-aggregations for different multi-stage subqueries --- .../src/adapter/BaseQuery.js | 21 +- .../src/adapter/PreAggregations.ts | 26 ++ .../compiled_pre_aggregation.rs | 1 + .../optimizers/pre_aggregation/optimizer.rs | 252 ++++++++++++++---- .../pre_aggregations_compiler.rs | 1 + .../src/logical_plan/pre_aggregation.rs | 6 + .../processors/pre_aggregation.rs | 6 +- .../cubesqlplanner/src/planner/base_query.rs | 63 +++-- .../src/planner/top_level_planner.rs | 25 +- .../test_fixtures/test_utils/test_context.rs | 9 +- 10 files changed, 309 insertions(+), 101 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 186116124c423..fb850e3bc7c88 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -959,10 +959,16 @@ export class BaseQuery { try { const buildResult = nativeBuildSqlAndParams(queryParams); - const [query, params, preAggregation] = buildResult; + const [query, params, preAggregationUsages] = buildResult; const paramsArray = [...params]; - if (preAggregation) { - this.preAggregations.preAggregationForQuery = preAggregation; + if (preAggregationUsages && Array.isArray(preAggregationUsages) && preAggregationUsages.length > 0) { + this.preAggregations.preAggregationUsages = preAggregationUsages; + // Backward compat: set preAggregationForQuery from first usage + const first = preAggregationUsages[0]; + this.preAggregations.preAggregationForQuery = this.getPreAggregationByName( + first.cubeName, + first.preAggregationName, + ); } return [query, paramsArray]; } catch (e) { @@ -1009,8 +1015,13 @@ export class BaseQuery { const buildResult = nativeBuildSqlAndParams(queryParams); - const [, , preAggregation] = buildResult; - return preAggregation; + const [, , preAggregationUsages] = buildResult; + if (preAggregationUsages && Array.isArray(preAggregationUsages) && preAggregationUsages.length > 0) { + this.preAggregations.preAggregationUsages = preAggregationUsages; + const first = preAggregationUsages[0]; + return this.getPreAggregationByName(first.cubeName, first.preAggregationName); + } + return preAggregationUsages; } allCubeMembers(path) { diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index 16899de004b31..c766795e563fb 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -107,6 +107,8 @@ export class PreAggregations { public preAggregationForQuery: PreAggregationForQuery | undefined = undefined; + public preAggregationUsages: any[] | undefined = undefined; + public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) { this.query = query; this.historyQueries = historyQueries; @@ -136,6 +138,10 @@ export class PreAggregations { private preAggregationsDescriptionLocal(): FullPreAggregationDescription[] { const isInPreAggregationQuery = this.query.options.preAggregationQuery; if (!isInPreAggregationQuery) { + // Tesseract multi-usage path: generate per-usage descriptions with unique placeholders + if (this.preAggregationUsages && this.preAggregationUsages.length > 0) { + return this.preAggregationDescriptionsForUsages(this.preAggregationUsages); + } const preAggregationForQuery = this.findPreAggregationForQuery(); if (preAggregationForQuery) { return this.preAggregationDescriptionsFor(preAggregationForQuery); @@ -165,6 +171,26 @@ export class PreAggregations { return join.joins.map(j => j.originalTo).concat([join.root]); } + private preAggregationDescriptionsForUsages(usages: any[]): FullPreAggregationDescription[] { + return usages.flatMap(usage => { + const preAggObj = this.getRollupPreAggregationByName(usage.cubeName, usage.preAggregationName); + if (!preAggObj || !('preAggregationName' in preAggObj)) { + return []; + } + const foundPreAgg = preAggObj as PreAggregationForQuery; + + // Generate description via the standard path + const descriptions = this.preAggregationDescriptionsFor(foundPreAgg); + + // Override tableName and matchedTimeDimensionDateRange for each description + return descriptions.map(desc => ({ + ...desc, + tableName: usage.placeholder, + matchedTimeDimensionDateRange: usage.dateRange || desc.matchedTimeDimensionDateRange, + })); + }); + } + private preAggregationDescriptionsFor(foundPreAggregation: PreAggregationForQuery): FullPreAggregationDescription[] { let preAggregations: PreAggregationForQuery[] = [foundPreAggregation]; if (foundPreAggregation.preAggregation.type === 'rollupJoin') { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs index dd8a0bffc962f..1fb2ff414b451 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/compiled_pre_aggregation.rs @@ -29,6 +29,7 @@ pub struct PreAggregationTable { pub cube_alias: String, pub name: String, pub alias: Option, + pub usage_index: Option, } #[derive(Clone, Debug)] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index f0a2bd5b54066..3c0299760408a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -3,18 +3,40 @@ use super::*; use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult}; use crate::logical_plan::*; use crate::plan::FilterItem; +use crate::planner::filter::FilterOperator; use crate::planner::join_hints::JoinHints; use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::MemberSymbol; use cubenativeutils::CubeError; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::rc::Rc; +pub struct PreAggregationUsage { + pub index: usize, + pub pre_aggregation: Rc, + pub date_range: Option<(String, String)>, +} + +impl PreAggregationUsage { + pub fn name(&self) -> &String { + self.pre_aggregation.name() + } + + pub fn cube_name(&self) -> &String { + self.pre_aggregation.cube_name() + } + + pub fn external(&self) -> bool { + self.pre_aggregation.external() + } +} + pub struct PreAggregationOptimizer { query_tools: Rc, allow_multi_stage: bool, - used_pre_aggregations: HashMap<(String, String), Rc>, + usages: Vec, + usage_counter: usize, } impl PreAggregationOptimizer { @@ -22,7 +44,8 @@ impl PreAggregationOptimizer { Self { query_tools, allow_multi_stage, - used_pre_aggregations: HashMap::new(), + usages: Vec::new(), + usage_counter: 0, } } @@ -38,14 +61,27 @@ impl PreAggregationOptimizer { let compiled_pre_aggregations = compiler.compile_all_pre_aggregations(disable_external_pre_aggregations)?; - for pre_aggregation in compiled_pre_aggregations.iter() { - if let Some(id) = pre_aggregation_id { - let full_name = format!("{}.{}", pre_aggregation.cube_name, pre_aggregation.name); - if full_name != id { - continue; - } - } - let new_query = self.try_rewrite_query(plan.clone(), pre_aggregation)?; + let filtered_pre_aggregations: Vec<_> = if let Some(id) = pre_aggregation_id { + compiled_pre_aggregations + .iter() + .filter(|pa| format!("{}.{}", pa.cube_name, pa.name) == id) + .cloned() + .collect() + } else { + compiled_pre_aggregations + }; + + if !plan.multistage_members().is_empty() && self.allow_multi_stage { + return self + .try_rewrite_query_with_multistages(&plan, &filtered_pre_aggregations); + } + + for pre_aggregation in filtered_pre_aggregations.iter() { + let new_query = self.try_rewrite_simple_query( + &plan, + pre_aggregation, + None, + )?; if new_query.is_some() { return Ok(new_query); } @@ -54,35 +90,26 @@ impl PreAggregationOptimizer { Ok(None) } - pub fn get_used_pre_aggregations(&self) -> Vec> { - self.used_pre_aggregations.values().cloned().collect() + pub fn get_usages(&self) -> &Vec { + &self.usages } - fn try_rewrite_query( - &mut self, - query: Rc, - pre_aggregation: &Rc, - ) -> Result>, CubeError> { - if query.multistage_members().is_empty() { - self.try_rewrite_simple_query(&query, pre_aggregation) - } else if !self.allow_multi_stage { - Ok(None) - } else { - self.try_rewrite_query_with_multistages(&query, pre_aggregation) - } + pub fn take_usages(&mut self) -> Vec { + std::mem::take(&mut self.usages) } fn try_rewrite_simple_query( &mut self, query: &Rc, pre_aggregation: &Rc, + date_range: Option<(String, String)>, ) -> Result>, CubeError> { if let Some(matched_measures) = self.is_schema_and_filters_match(&query.schema(), &query.filter(), pre_aggregation)? { let mut new_query = query.as_ref().clone(); new_query.set_source( - self.make_pre_aggregation_source(pre_aggregation, &matched_measures)? + self.make_pre_aggregation_source(pre_aggregation, &matched_measures, date_range)? .into(), ); Ok(Some(Rc::new(new_query))) @@ -91,22 +118,51 @@ impl PreAggregationOptimizer { } } + fn try_rewrite_leaf_query( + &mut self, + query: Rc, + compiled_pre_aggregations: &[Rc], + ) -> Result>, CubeError> { + let date_range = Self::extract_date_range(&query.filter()); + + if !query.multistage_members().is_empty() { + // Nested multi-stage: recurse with full list + return self.try_rewrite_query_with_multistages(&query, compiled_pre_aggregations); + } + + for pre_aggregation in compiled_pre_aggregations.iter() { + let result = self.try_rewrite_simple_query( + &query, + pre_aggregation, + date_range.clone(), + )?; + if result.is_some() { + return Ok(result); + } + } + Ok(None) + } + fn try_rewrite_query_with_multistages( &mut self, query: &Rc, - pre_aggregation: &Rc, + compiled_pre_aggregations: &[Rc], ) -> Result>, CubeError> { let rewriter = LogicalPlanRewriter::new(); let mut has_unrewritten_leaf = false; + // Save state in case we need to rollback + let saved_usages_len = self.usages.len(); + let saved_counter = self.usage_counter; + let mut rewritten_multistages = Vec::new(); for multi_stage in query.multistage_members() { let rewritten = rewriter.rewrite_top_down_with(multi_stage.clone(), |plan_node| { let res = match plan_node { PlanNode::MultiStageLeafMeasure(multi_stage_leaf_measure) => { - if let Some(rewritten) = self.try_rewrite_query( + if let Some(rewritten) = self.try_rewrite_leaf_query( multi_stage_leaf_measure.query.clone(), - pre_aggregation, + compiled_pre_aggregations, )? { let new_leaf = Rc::new(MultiStageLeafMeasure { measure: multi_stage_leaf_measure.measure.clone(), @@ -134,6 +190,9 @@ impl PreAggregationOptimizer { } if has_unrewritten_leaf { + // Rollback usages added during failed attempt + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; return Ok(None); } @@ -145,31 +204,47 @@ impl PreAggregationOptimizer { resolver_multiplied_measures, ) = resolver_multiplied_measures { - if let Some(matched_measures) = self.is_schema_and_filters_match( - &resolver_multiplied_measures.schema, - &resolver_multiplied_measures.filter, - &pre_aggregation, - )? { - let pre_aggregation_source = - self.make_pre_aggregation_source(pre_aggregation, &matched_measures)?; - - let pre_aggregation_query = Query::builder() - .schema(resolver_multiplied_measures.schema.clone()) - .filter(resolver_multiplied_measures.filter.clone()) - .modifers(Rc::new(LogicalQueryModifiers { - offset: None, - limit: None, - ungrouped: false, - order_by: vec![], - })) - .source(pre_aggregation_source.into()) - .build(); - Some(ResolvedMultipliedMeasures::PreAggregation(Rc::new( - pre_aggregation_query, - ))) - } else { + // Try each pre-aggregation for the multiplied measures resolver + let mut result_source = None; + for pre_aggregation in compiled_pre_aggregations.iter() { + if let Some(matched_measures) = self.is_schema_and_filters_match( + &resolver_multiplied_measures.schema, + &resolver_multiplied_measures.filter, + pre_aggregation, + )? { + let date_range = Self::extract_date_range( + &resolver_multiplied_measures.filter, + ); + let pre_aggregation_source = self.make_pre_aggregation_source( + pre_aggregation, + &matched_measures, + date_range, + )?; + + let pre_aggregation_query = Query::builder() + .schema(resolver_multiplied_measures.schema.clone()) + .filter(resolver_multiplied_measures.filter.clone()) + .modifers(Rc::new(LogicalQueryModifiers { + offset: None, + limit: None, + ungrouped: false, + order_by: vec![], + })) + .source(pre_aggregation_source.into()) + .build(); + result_source = Some(ResolvedMultipliedMeasures::PreAggregation( + Rc::new(pre_aggregation_query), + )); + break; + } + } + if result_source.is_none() { + // Rollback + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; return Ok(None); } + result_source } else { Some(resolver_multiplied_measures.clone()) } @@ -202,7 +277,11 @@ impl PreAggregationOptimizer { &mut self, pre_aggregation: &Rc, matched_measures: &HashSet, + date_range: Option<(String, String)>, ) -> Result, CubeError> { + let usage_index = self.usage_counter; + self.usage_counter += 1; + let filtered_measures: Vec> = pre_aggregation .measures .iter() @@ -221,13 +300,17 @@ impl PreAggregationOptimizer { measures: filtered_measures.clone(), multiplied_measures: HashSet::new(), }; + + // Set usage_index on the source table so the physical plan can generate unique placeholders + let source = Self::source_with_usage_index(&pre_aggregation.source, usage_index); + // Measures are filtered to only those actually consumed during matching. // This prevents calculated measures (e.g. amount_per_count) from getting a // direct column reference when they should be decomposed to base measures. // Dimensions are intentionally NOT filtered: unlike measures (where // sum(precomputed_ratio) != sum(a)/sum(b)), extra dimension references // are harmless — they're simply unused if the query doesn't select them. - let pre_aggregation = PreAggregation::builder() + let pre_aggregation_node = PreAggregation::builder() .name(pre_aggregation.name.clone()) .time_dimensions(pre_aggregation.time_dimensions.clone()) .dimensions(pre_aggregation.dimensions.clone()) @@ -236,17 +319,68 @@ impl PreAggregationOptimizer { .schema(Rc::new(schema)) .external(pre_aggregation.external.unwrap_or_default()) .granularity(pre_aggregation.granularity.clone()) - .source(pre_aggregation.source.clone()) + .source(source) .cube_name(pre_aggregation.cube_name.clone()) + .usage_index(Some(usage_index)) .build(); - let result = Rc::new(pre_aggregation); - self.used_pre_aggregations.insert( - (result.cube_name().clone(), result.name().clone()), - result.clone(), - ); + let result = Rc::new(pre_aggregation_node); + + self.usages.push(PreAggregationUsage { + index: usage_index, + pre_aggregation: result.clone(), + date_range, + }); + Ok(result) } + fn source_with_usage_index( + source: &Rc, + usage_index: usize, + ) -> Rc { + match source.as_ref() { + PreAggregationSource::Single(table) => { + Rc::new(PreAggregationSource::Single(PreAggregationTable { + usage_index: Some(usage_index), + ..table.clone() + })) + } + PreAggregationSource::Union(union) => { + let items = union + .items + .iter() + .map(|t| { + Rc::new(PreAggregationTable { + usage_index: Some(usage_index), + ..t.as_ref().clone() + }) + }) + .collect(); + Rc::new(PreAggregationSource::Union(PreAggregationUnion { items })) + } + PreAggregationSource::Join(_) => { + // Join pre-aggregations: usage_index is set on the PreAggregation node itself + source.clone() + } + } + } + + fn extract_date_range(filter: &LogicalFilter) -> Option<(String, String)> { + for item in &filter.time_dimensions_filters { + if let FilterItem::Item(base_filter) = item { + if *base_filter.filter_operator() == FilterOperator::InDateRange { + let values = base_filter.values(); + if values.len() >= 2 { + if let (Some(from), Some(to)) = (&values[0], &values[1]) { + return Some((from.clone(), to.clone())); + } + } + } + } + } + None + } + fn is_schema_and_filters_match( &self, schema: &Rc, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs index 6b559125faf9d..717dea63bfc27 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/pre_aggregations_compiler.rs @@ -265,6 +265,7 @@ impl PreAggregationsCompiler { cube_alias, name: name.name.clone(), alias: static_data.sql_alias.clone(), + usage_index: None, }) }; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs index b5a35d2c67fdb..3c1c6f0e5a6dc 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/pre_aggregation.rs @@ -22,6 +22,8 @@ pub struct PreAggregation { granularity: Option, source: Rc, cube_name: String, + #[builder(default)] + usage_index: Option, } impl PreAggregation { @@ -64,6 +66,10 @@ impl PreAggregation { pub fn cube_name(&self) -> &String { &self.cube_name } + + pub fn usage_index(&self) -> Option { + self.usage_index + } } impl LogicalNode for PreAggregation { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs index 8e58f3e3976c6..b2558e405dfb8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/physical_plan_builder/processors/pre_aggregation.rs @@ -26,9 +26,13 @@ impl PreAggregationProcessor<'_> { ) -> Result { let query_tools = self.builder.query_tools(); let name = table.alias.clone().unwrap_or_else(|| table.name.clone()); - let table_name = query_tools + let base_table_name = query_tools .base_tools() .pre_aggregation_table_name(table.cube_name.clone(), name.clone())?; + let table_name = match table.usage_index { + Some(idx) => format!("{}__usage_{}", base_table_name, idx), + None => base_table_name, + }; let alias = PlanSqlTemplates::member_alias_name(&table.cube_name, &name, &None); let res = SingleAliasedSource::new_from_table_reference( table_name, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 92ef1b9a824ba..6cc94814a958c 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -2,15 +2,26 @@ use super::query_tools::QueryTools; use super::top_level_planner::TopLevelPlanner; use super::QueryProperties; use crate::cube_bridge::base_query_options::BaseQueryOptions; -use crate::cube_bridge::pre_aggregation_obj::NativePreAggregationObj; use cubenativeutils::wrappers::inner_types::InnerTypes; use cubenativeutils::wrappers::object::NativeArray; use cubenativeutils::wrappers::serializer::NativeSerialize; use cubenativeutils::wrappers::NativeType; use cubenativeutils::wrappers::{NativeContextHolder, NativeObjectHandle}; use cubenativeutils::CubeError; +use serde::Serialize; use std::rc::Rc; +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct PreAggregationUsageInfo { + cube_name: String, + pre_aggregation_name: String, + placeholder: String, + #[serde(skip_serializing_if = "Option::is_none")] + date_range: Option>, + external: bool, +} + pub struct BaseQuery { context: NativeContextHolder, query_tools: Rc, @@ -59,12 +70,12 @@ impl BaseQuery { self.cubestore_support_multistage, ); - let (sql, used_pre_aggregations) = planner.plan()?; + let (sql, usages) = planner.plan()?; - let is_external = if !used_pre_aggregations.is_empty() { - used_pre_aggregations + let is_external = if !usages.is_empty() { + usages .iter() - .all(|pre_aggregation| pre_aggregation.external()) + .all(|usage| usage.pre_aggregation.external()) } else { false }; @@ -77,20 +88,36 @@ impl BaseQuery { let res = self.context.empty_array()?; res.set(0, result_sql.to_native(self.context.clone())?)?; res.set(1, params.to_native(self.context.clone())?)?; - if let Some(used_pre_aggregation) = used_pre_aggregations.first() { - let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name( - used_pre_aggregation.cube_name().clone(), - used_pre_aggregation.name().clone(), - )?; - res.set( - 2, - pre_aggregation_obj - .as_any() - .downcast::>() - .unwrap() - .to_native(self.context.clone())?, - )?; + + if !usages.is_empty() { + let base_tools = self.query_tools.base_tools(); + let usages_info: Vec = usages + .iter() + .map(|usage| { + let pre_agg = &usage.pre_aggregation; + let name = pre_agg.name().clone(); + let cube_name = pre_agg.cube_name().clone(); + let placeholder = base_tools + .pre_aggregation_table_name(cube_name.clone(), name.clone()) + .map(|base| match usage.index { + idx => format!("{}__usage_{}", base, idx), + }) + .unwrap_or_default(); + PreAggregationUsageInfo { + cube_name, + pre_aggregation_name: name, + placeholder, + date_range: usage + .date_range + .as_ref() + .map(|(from, to)| vec![from.clone(), to.clone()]), + external: pre_agg.external(), + } + }) + .collect(); + res.set(2, usages_info.to_native(self.context.clone())?)?; } + let result = NativeObjectHandle::new(res.into_object()); Ok(result) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs index 3b69cd453491e..9653f6035c436 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs @@ -2,8 +2,8 @@ use super::planners::QueryPlanner; use super::query_tools::QueryTools; use super::QueryProperties; use crate::logical_plan::OriginalSqlCollector; -use crate::logical_plan::PreAggregation; use crate::logical_plan::PreAggregationOptimizer; +use crate::logical_plan::PreAggregationUsage; use crate::logical_plan::Query; use crate::physical_plan_builder::PhysicalPlanBuilder; use cubenativeutils::CubeError; @@ -29,17 +29,16 @@ impl TopLevelPlanner { } } - pub fn plan(&self) -> Result<(String, Vec>), CubeError> { + pub fn plan(&self) -> Result<(String, Vec), CubeError> { let query_planner = QueryPlanner::new(self.request.clone(), self.query_tools.clone()); let logical_plan = query_planner.plan()?; - let (optimized_plan, used_pre_aggregations) = - self.try_pre_aggregations(logical_plan.clone())?; + let (optimized_plan, usages) = self.try_pre_aggregations(logical_plan.clone())?; - let is_external = if !used_pre_aggregations.is_empty() { - used_pre_aggregations + let is_external = if !usages.is_empty() { + usages .iter() - .all(|pre_aggregation| pre_aggregation.external()) + .all(|usage| usage.pre_aggregation.external()) } else { false }; @@ -62,13 +61,13 @@ impl TopLevelPlanner { let sql = physical_plan.to_sql(&templates)?; - Ok((sql, used_pre_aggregations)) + Ok((sql, usages)) } fn try_pre_aggregations( &self, plan: Rc, - ) -> Result<(Rc, Vec>), CubeError> { + ) -> Result<(Rc, Vec), CubeError> { let result = if !self.request.is_pre_aggregation_query() { let mut pre_aggregation_optimizer = PreAggregationOptimizer::new( self.query_tools.clone(), @@ -82,11 +81,9 @@ impl TopLevelPlanner { disable_external_pre_aggregations, pre_aggregation_id, )? { - if pre_aggregation_optimizer.get_used_pre_aggregations().len() == 1 { - ( - result, - pre_aggregation_optimizer.get_used_pre_aggregations(), - ) + let usages = pre_aggregation_optimizer.take_usages(); + if !usages.is_empty() { + (result, usages) } else { (plan.clone(), Vec::new()) } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs index 4af4fadfbaf9e..a2acfcd94d517 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs @@ -1,6 +1,6 @@ use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::join_hints::JoinHintItem; -use crate::logical_plan::PreAggregation; +use crate::logical_plan::{PreAggregation, PreAggregationUsage}; #[cfg(feature = "integration-postgres")] use crate::logical_plan::{PreAggregationSource, PreAggregationTable}; use crate::plan::Filter; @@ -382,7 +382,7 @@ impl TestContext { pub fn build_sql_with_used_pre_aggregations( &self, query: &str, - ) -> Result<(String, Vec>), cubenativeutils::CubeError> { + ) -> Result<(String, Vec), cubenativeutils::CubeError> { let options = self.create_query_options_from_yaml(query); let ctx = self.for_options(options.as_ref())?; let request = QueryProperties::try_new(ctx.query_tools.clone(), options)?; @@ -448,9 +448,10 @@ impl TestContext { async fn create_pre_agg_tables( &self, client: &tokio_postgres::Client, - pre_aggregations: &[Rc], + pre_aggregations: &[PreAggregationUsage], ) { - for pre_agg in pre_aggregations { + for usage in pre_aggregations { + let pre_agg = &usage.pre_aggregation; let tables = Self::collect_pre_agg_source_tables(pre_agg.source()); let yaml = Self::build_pre_agg_query_yaml(pre_agg); From 8ad35faeeff1dc5bbc2b1159788826d021f09d1b Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Thu, 9 Apr 2026 17:20:22 +0200 Subject: [PATCH 02/24] in work --- .../src/adapter/BaseQuery.js | 35 ++++++++++------- .../cubesqlplanner/src/planner/base_query.rs | 39 +++++++++++++++---- 2 files changed, 52 insertions(+), 22 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index fb850e3bc7c88..7c776a22ec3e6 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -959,16 +959,16 @@ export class BaseQuery { try { const buildResult = nativeBuildSqlAndParams(queryParams); - const [query, params, preAggregationUsages] = buildResult; + const [query, params, preAggResult] = buildResult; const paramsArray = [...params]; - if (preAggregationUsages && Array.isArray(preAggregationUsages) && preAggregationUsages.length > 0) { - this.preAggregations.preAggregationUsages = preAggregationUsages; - // Backward compat: set preAggregationForQuery from first usage - const first = preAggregationUsages[0]; - this.preAggregations.preAggregationForQuery = this.getPreAggregationByName( - first.cubeName, - first.preAggregationName, - ); + if (preAggResult) { + if (Array.isArray(preAggResult)) { + // Multi-usage format: array of usage info objects + this.preAggregations.preAggregationUsages = preAggResult; + } else { + // Single-usage format: old-style pre-aggregation object + this.preAggregations.preAggregationForQuery = preAggResult; + } } return [query, paramsArray]; } catch (e) { @@ -1015,13 +1015,18 @@ export class BaseQuery { const buildResult = nativeBuildSqlAndParams(queryParams); - const [, , preAggregationUsages] = buildResult; - if (preAggregationUsages && Array.isArray(preAggregationUsages) && preAggregationUsages.length > 0) { - this.preAggregations.preAggregationUsages = preAggregationUsages; - const first = preAggregationUsages[0]; - return this.getPreAggregationByName(first.cubeName, first.preAggregationName); + const [, , preAggResult] = buildResult; + if (preAggResult) { + if (Array.isArray(preAggResult)) { + // Multi-usage format: array of usage info objects + this.preAggregations.preAggregationUsages = preAggResult; + const first = preAggResult[0]; + return this.getPreAggregationByName(first.cubeName, first.preAggregationName); + } + // Single-usage format: old-style pre-aggregation object + return preAggResult; } - return preAggregationUsages; + return undefined; } allCubeMembers(path) { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 6cc94814a958c..8b77cf45f108a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -2,6 +2,7 @@ use super::query_tools::QueryTools; use super::top_level_planner::TopLevelPlanner; use super::QueryProperties; use crate::cube_bridge::base_query_options::BaseQueryOptions; +use crate::cube_bridge::pre_aggregation_obj::NativePreAggregationObj; use cubenativeutils::wrappers::inner_types::InnerTypes; use cubenativeutils::wrappers::object::NativeArray; use cubenativeutils::wrappers::serializer::NativeSerialize; @@ -85,11 +86,24 @@ impl BaseQuery { .query_tools .build_sql_and_params(&sql, true, &templates)?; + let needs_usage_suffix = usages.len() > 1; + + // For single usage, strip __usage_N suffix from SQL to maintain backward compat + let final_sql = if !needs_usage_suffix && usages.len() == 1 { + result_sql.replace( + &format!("__usage_{}", usages[0].index), + "", + ) + } else { + result_sql + }; + let res = self.context.empty_array()?; - res.set(0, result_sql.to_native(self.context.clone())?)?; + res.set(0, final_sql.to_native(self.context.clone())?)?; res.set(1, params.to_native(self.context.clone())?)?; - if !usages.is_empty() { + if needs_usage_suffix { + // Multiple usages: return array of usage info objects (new format) let base_tools = self.query_tools.base_tools(); let usages_info: Vec = usages .iter() @@ -97,16 +111,13 @@ impl BaseQuery { let pre_agg = &usage.pre_aggregation; let name = pre_agg.name().clone(); let cube_name = pre_agg.cube_name().clone(); - let placeholder = base_tools + let base_table = base_tools .pre_aggregation_table_name(cube_name.clone(), name.clone()) - .map(|base| match usage.index { - idx => format!("{}__usage_{}", base, idx), - }) .unwrap_or_default(); PreAggregationUsageInfo { cube_name, pre_aggregation_name: name, - placeholder, + placeholder: format!("{}__usage_{}", base_table, usage.index), date_range: usage .date_range .as_ref() @@ -116,6 +127,20 @@ impl BaseQuery { }) .collect(); res.set(2, usages_info.to_native(self.context.clone())?)?; + } else if let Some(usage) = usages.first() { + // Single usage: return old-style pre-aggregation object for backward compat + let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name( + usage.pre_aggregation.cube_name().clone(), + usage.pre_aggregation.name().clone(), + )?; + res.set( + 2, + pre_aggregation_obj + .as_any() + .downcast::>() + .unwrap() + .to_native(self.context.clone())?, + )?; } let result = NativeObjectHandle::new(res.into_object()); From ddbd7a2563a295bb54eaa31255fec30249ce1334 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Thu, 9 Apr 2026 17:47:25 +0200 Subject: [PATCH 03/24] in work --- .../PreAggregationPartitionRangeLoader.ts | 43 ++++++++- .../src/orchestrator/PreAggregations.ts | 2 + .../src/orchestrator/QueryCache.ts | 17 ++-- .../src/adapter/BaseQuery.js | 15 ++-- .../src/adapter/PreAggregations.ts | 22 +++-- .../cubesqlplanner/src/planner/base_query.rs | 90 +++++++++++-------- 6 files changed, 129 insertions(+), 60 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index f36c7f6d15f8e..6498ea2eea5d2 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -297,14 +297,55 @@ export class PreAggregationPartitionRangeLoader { .map(targetTableName => `SELECT * FROM ${targetTableName}${emptyResult ? ' WHERE 1 = 0' : ''}`) .join(' UNION ALL '); + const baseTargetTableName = allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`; + + // Build per-usage target table names if usageMapping is present + let usageTargetTableNames: Record | undefined; + if (this.preAggregation.usageMapping) { + usageTargetTableNames = {}; + for (const [suffix, usageInfo] of Object.entries(this.preAggregation.usageMapping)) { + if (usageInfo.dateRange && this.preAggregation.partitionGranularity) { + // Load partition ranges specific to this usage's dateRange + const usageDateRange = PreAggregationPartitionRangeLoader.intersectDateRanges( + [loadResults[0]?.buildRangeEnd ? loadResults[0].partitionRange?.[0] : null, loadResults[loadResults.length - 1]?.buildRangeEnd || null] as QueryDateRange, + usageInfo.dateRange as QueryDateRange, + ); + if (usageDateRange) { + const usagePartitions = loadResults.filter(r => { + if (!r.partitionRange) return true; + const [pStart, pEnd] = r.partitionRange; + const [uStart, uEnd] = usageDateRange; + return pEnd >= uStart && pStart <= uEnd; + }); + const usageTableNames = usagePartitions.map(r => r.targetTableName); + if (usageTableNames.length === 1) { + usageTargetTableNames[suffix] = usageTableNames[0]; + } else if (usageTableNames.length > 0) { + const usageUnion = usageTableNames + .map(t => `SELECT * FROM ${t}`) + .join(' UNION ALL '); + usageTargetTableNames[suffix] = `(${usageUnion})`; + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } else { + usageTargetTableNames[suffix] = baseTargetTableName; + } + } + } + return { - targetTableName: allTableTargetNames.length === 1 && !emptyResult ? allTableTargetNames[0] : `(${unionTargetTableName})`, + targetTableName: baseTargetTableName, refreshKeyValues: loadResults.map(t => t.refreshKeyValues), lastUpdatedAt, buildRangeEnd: !emptyResult && loadResults.length && loadResults[loadResults.length - 1].buildRangeEnd, lambdaTable, rollupLambdaId: this.preAggregation.rollupLambdaId, isMultiTableUnion: allTableTargetNames.length > 1, + usageTargetTableNames, }; } else { return new PreAggregationLoader( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 7d00e5fe1e91b..a0f9d48660464 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -149,6 +149,7 @@ export type LoadPreAggregationResult = { rollupLambdaId?: string; partitionRange?: QueryDateRange; isMultiTableUnion?: boolean; + usageTargetTableNames?: Record; }; export type PreAggregationTableToTempTable = [string, LoadPreAggregationResult]; @@ -192,6 +193,7 @@ export type PreAggregationDescription = { sealAt?: string; rollupLambdaId?: string; lastRollupLambda?: boolean; + usageMapping?: Record; }; export const tablesToVersionEntries = (schema, tables: TableCacheEntry[]): VersionEntry[] => R.sortBy( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index c4919daa1bb6e..d2601661c3b4c 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -115,7 +115,7 @@ export type PreAggTableToTempTable = [ TempTable, ]; -export type PreAggTableToTempTableNames = [string, { targetTableName: string; }]; +export type PreAggTableToTempTableNames = [string, { targetTableName: string; usageTargetTableNames?: Record; }]; export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined; @@ -428,10 +428,17 @@ export class QueryCache { const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []]; - const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( - (query, [tableName, { targetTableName }]) => ( - QueryCache.replaceAll(tableName, targetTableName, query) - ), + let replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( + (query, [tableName, { targetTableName, usageTargetTableNames }]) => { + // First replace usage-specific placeholders (e.g. tableName__usage_0) + if (usageTargetTableNames) { + for (const [suffix, usageTargetName] of Object.entries(usageTargetTableNames)) { + query = QueryCache.replaceAll(`${tableName}${suffix}`, usageTargetName, query); + } + } + // Then replace base table name for any remaining references + return QueryCache.replaceAll(tableName, targetTableName, query); + }, keyQuery ); return Array.isArray(queryAndParams) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 7c776a22ec3e6..210664670d5f5 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -963,10 +963,13 @@ export class BaseQuery { const paramsArray = [...params]; if (preAggResult) { if (Array.isArray(preAggResult)) { - // Multi-usage format: array of usage info objects - this.preAggregations.preAggregationUsages = preAggResult; + // Grouped usage info objects (multiple usages) + this.preAggregations.preAggregationUsageInfos = preAggResult; + const first = preAggResult[0]; + this.preAggregations.preAggregationForQuery = + this.getPreAggregationByName(first.cubeName, first.preAggregationName); } else { - // Single-usage format: old-style pre-aggregation object + // Single-usage: old-style pre-aggregation object this.preAggregations.preAggregationForQuery = preAggResult; } } @@ -1018,12 +1021,12 @@ export class BaseQuery { const [, , preAggResult] = buildResult; if (preAggResult) { if (Array.isArray(preAggResult)) { - // Multi-usage format: array of usage info objects - this.preAggregations.preAggregationUsages = preAggResult; + // Grouped usage info objects (multiple usages) + this.preAggregations.preAggregationUsageInfos = preAggResult; const first = preAggResult[0]; return this.getPreAggregationByName(first.cubeName, first.preAggregationName); } - // Single-usage format: old-style pre-aggregation object + // Single-usage: old-style pre-aggregation object return preAggResult; } return undefined; diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index c766795e563fb..ce5cf96c5f332 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -107,7 +107,7 @@ export class PreAggregations { public preAggregationForQuery: PreAggregationForQuery | undefined = undefined; - public preAggregationUsages: any[] | undefined = undefined; + public preAggregationUsageInfos: any[] | undefined = undefined; public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) { this.query = query; @@ -138,11 +138,11 @@ export class PreAggregations { private preAggregationsDescriptionLocal(): FullPreAggregationDescription[] { const isInPreAggregationQuery = this.query.options.preAggregationQuery; if (!isInPreAggregationQuery) { - // Tesseract multi-usage path: generate per-usage descriptions with unique placeholders - if (this.preAggregationUsages && this.preAggregationUsages.length > 0) { - return this.preAggregationDescriptionsForUsages(this.preAggregationUsages); - } const preAggregationForQuery = this.findPreAggregationForQuery(); + // Check usageInfos after findPreAggregationForQuery (which may populate them) + if (this.preAggregationUsageInfos && this.preAggregationUsageInfos.length > 0) { + return this.preAggregationDescriptionsForUsageInfos(this.preAggregationUsageInfos); + } if (preAggregationForQuery) { return this.preAggregationDescriptionsFor(preAggregationForQuery); } @@ -171,22 +171,20 @@ export class PreAggregations { return join.joins.map(j => j.originalTo).concat([join.root]); } - private preAggregationDescriptionsForUsages(usages: any[]): FullPreAggregationDescription[] { - return usages.flatMap(usage => { - const preAggObj = this.getRollupPreAggregationByName(usage.cubeName, usage.preAggregationName); + private preAggregationDescriptionsForUsageInfos(usageInfos: any[]): FullPreAggregationDescription[] { + return usageInfos.flatMap(usageInfo => { + const preAggObj = this.getRollupPreAggregationByName(usageInfo.cubeName, usageInfo.preAggregationName); if (!preAggObj || !('preAggregationName' in preAggObj)) { return []; } const foundPreAgg = preAggObj as PreAggregationForQuery; - // Generate description via the standard path + // One description per physical pre-aggregation, with usageMapping attached const descriptions = this.preAggregationDescriptionsFor(foundPreAgg); - // Override tableName and matchedTimeDimensionDateRange for each description return descriptions.map(desc => ({ ...desc, - tableName: usage.placeholder, - matchedTimeDimensionDateRange: usage.dateRange || desc.matchedTimeDimensionDateRange, + usageMapping: usageInfo.usages, // { "__usage_0": { dateRange }, "__usage_1": { dateRange } } })); }); } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 8b77cf45f108a..91e22a637da1e 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -3,6 +3,7 @@ use super::top_level_planner::TopLevelPlanner; use super::QueryProperties; use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::pre_aggregation_obj::NativePreAggregationObj; +use crate::logical_plan::PreAggregationUsage; use cubenativeutils::wrappers::inner_types::InnerTypes; use cubenativeutils::wrappers::object::NativeArray; use cubenativeutils::wrappers::serializer::NativeSerialize; @@ -10,17 +11,23 @@ use cubenativeutils::wrappers::NativeType; use cubenativeutils::wrappers::{NativeContextHolder, NativeObjectHandle}; use cubenativeutils::CubeError; use serde::Serialize; +use std::collections::HashMap; use std::rc::Rc; #[derive(Serialize)] #[serde(rename_all = "camelCase")] -struct PreAggregationUsageInfo { - cube_name: String, - pre_aggregation_name: String, - placeholder: String, +struct UsageDateRange { #[serde(skip_serializing_if = "Option::is_none")] date_range: Option>, +} + +#[derive(Serialize)] +#[serde(rename_all = "camelCase")] +struct GroupedPreAggregationInfo { + cube_name: String, + pre_aggregation_name: String, external: bool, + usages: HashMap, } pub struct BaseQuery { @@ -86,14 +93,11 @@ impl BaseQuery { .query_tools .build_sql_and_params(&sql, true, &templates)?; - let needs_usage_suffix = usages.len() > 1; + let has_multiple_usages = usages.len() > 1; // For single usage, strip __usage_N suffix from SQL to maintain backward compat - let final_sql = if !needs_usage_suffix && usages.len() == 1 { - result_sql.replace( - &format!("__usage_{}", usages[0].index), - "", - ) + let final_sql = if !has_multiple_usages && usages.len() == 1 { + result_sql.replace(&format!("__usage_{}", usages[0].index), "") } else { result_sql }; @@ -102,31 +106,10 @@ impl BaseQuery { res.set(0, final_sql.to_native(self.context.clone())?)?; res.set(1, params.to_native(self.context.clone())?)?; - if needs_usage_suffix { - // Multiple usages: return array of usage info objects (new format) - let base_tools = self.query_tools.base_tools(); - let usages_info: Vec = usages - .iter() - .map(|usage| { - let pre_agg = &usage.pre_aggregation; - let name = pre_agg.name().clone(); - let cube_name = pre_agg.cube_name().clone(); - let base_table = base_tools - .pre_aggregation_table_name(cube_name.clone(), name.clone()) - .unwrap_or_default(); - PreAggregationUsageInfo { - cube_name, - pre_aggregation_name: name, - placeholder: format!("{}__usage_{}", base_table, usage.index), - date_range: usage - .date_range - .as_ref() - .map(|(from, to)| vec![from.clone(), to.clone()]), - external: pre_agg.external(), - } - }) - .collect(); - res.set(2, usages_info.to_native(self.context.clone())?)?; + if has_multiple_usages { + // Multiple usages: group by (cubeName, name), return array of grouped infos + let grouped = Self::group_usages(&usages, &self.query_tools); + res.set(2, grouped.to_native(self.context.clone())?)?; } else if let Some(usage) = usages.first() { // Single usage: return old-style pre-aggregation object for backward compat let pre_aggregation_obj = self.query_tools.base_tools().get_pre_aggregation_by_name( @@ -144,7 +127,42 @@ impl BaseQuery { } let result = NativeObjectHandle::new(res.into_object()); - Ok(result) } + + fn group_usages( + usages: &[PreAggregationUsage], + query_tools: &Rc, + ) -> Vec { + let base_tools = query_tools.base_tools(); + let mut groups: HashMap<(String, String), GroupedPreAggregationInfo> = HashMap::new(); + + for usage in usages { + let pre_agg = &usage.pre_aggregation; + let cube_name = pre_agg.cube_name().clone(); + let name = pre_agg.name().clone(); + let key = (cube_name.clone(), name.clone()); + + let suffix = format!("__usage_{}", usage.index); + + let group = groups.entry(key).or_insert_with(|| GroupedPreAggregationInfo { + cube_name, + pre_aggregation_name: name, + external: pre_agg.external(), + usages: HashMap::new(), + }); + + group.usages.insert( + suffix, + UsageDateRange { + date_range: usage + .date_range + .as_ref() + .map(|(from, to)| vec![from.clone(), to.clone()]), + }, + ); + } + + groups.into_values().collect() + } } From d92be682777cd6e4963810373d0cabb1c18c3fd3 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Thu, 9 Apr 2026 18:49:30 +0200 Subject: [PATCH 04/24] in work --- .../src/adapter/PreAggregations.ts | 15 +++++++++-- .../test/integration/utils/BaseDbRunner.ts | 10 ++++++++ .../optimizers/pre_aggregation/optimizer.rs | 25 ++++++++++++------- .../cubesqlplanner/src/planner/base_query.rs | 8 ++---- .../src/planner/filter/base_filter.rs | 4 +++ .../cubesqlplanner/src/planner/filter/mod.rs | 5 ++-- .../planner/filter/operators/date_range.rs | 7 ++++++ .../src/planner/filter/typed_filter.rs | 4 +++ 8 files changed, 59 insertions(+), 19 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index ce5cf96c5f332..07e67e2ac476a 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -94,6 +94,17 @@ export type FullPreAggregationDescription = any; */ export type TransformedQuery = any; +export type UsageDateRangeInfo = { + dateRange?: [string, string]; +}; + +export type PreAggregationUsageInfo = { + cubeName: string; + preAggregationName: string; + external: boolean; + usages: Record; +}; + export class PreAggregations { private readonly query: BaseQuery; @@ -107,7 +118,7 @@ export class PreAggregations { public preAggregationForQuery: PreAggregationForQuery | undefined = undefined; - public preAggregationUsageInfos: any[] | undefined = undefined; + public preAggregationUsageInfos: PreAggregationUsageInfo[] | undefined = undefined; public constructor(query: BaseQuery, historyQueries, cubeLatticeCache) { this.query = query; @@ -171,7 +182,7 @@ export class PreAggregations { return join.joins.map(j => j.originalTo).concat([join.root]); } - private preAggregationDescriptionsForUsageInfos(usageInfos: any[]): FullPreAggregationDescription[] { + private preAggregationDescriptionsForUsageInfos(usageInfos: PreAggregationUsageInfo[]): FullPreAggregationDescription[] { return usageInfos.flatMap(usageInfo => { const preAggObj = this.getRollupPreAggregationByName(usageInfo.cubeName, usageInfo.preAggregationName); if (!preAggObj || !('preAggregationName' in preAggObj)) { diff --git a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts index 80f3259ef4bbb..3eb7753699a79 100644 --- a/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts +++ b/packages/cubejs-schema-compiler/test/integration/utils/BaseDbRunner.ts @@ -95,6 +95,16 @@ export class BaseDbRunner { range => `SELECT * FROM ${PreAggregationPartitionRangeLoader.partitionTableName(desc.tableName, desc.partitionGranularity, range)}_${suffix}` ).join(' UNION ALL '); const targetTableName = desc.dateRange ? `(${partitionUnion})` : `${desc.tableName}_${suffix}`; + // Replace usage-suffixed placeholders first (e.g. tableName__usage_0) + if (desc.usageMapping) { + for (const usageSuffix of Object.keys(desc.usageMapping)) { + replacedQuery = replacedQuery.replace( + new RegExp(`${desc.tableName}${usageSuffix.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\s+`, 'g'), + `${targetTableName} ` + ); + } + } + // Replace base table name return replacedQuery.replace( new RegExp(`${desc.tableName}\\s+`, 'g'), `${targetTableName} ` diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 3c0299760408a..6f637cbc09ef3 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -3,7 +3,7 @@ use super::*; use crate::logical_plan::visitor::{LogicalPlanRewriter, NodeRewriteResult}; use crate::logical_plan::*; use crate::plan::FilterItem; -use crate::planner::filter::FilterOperator; +use crate::planner::filter::FilterOp; use crate::planner::join_hints::JoinHints; use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; use crate::planner::query_tools::QueryTools; @@ -123,7 +123,7 @@ impl PreAggregationOptimizer { query: Rc, compiled_pre_aggregations: &[Rc], ) -> Result>, CubeError> { - let date_range = Self::extract_date_range(&query.filter()); + let date_range = Self::extract_date_range(&query.filter(), &self.query_tools); if !query.multistage_members().is_empty() { // Nested multi-stage: recurse with full list @@ -214,6 +214,7 @@ impl PreAggregationOptimizer { )? { let date_range = Self::extract_date_range( &resolver_multiplied_measures.filter, + &self.query_tools, ); let pre_aggregation_source = self.make_pre_aggregation_source( pre_aggregation, @@ -365,15 +366,21 @@ impl PreAggregationOptimizer { } } - fn extract_date_range(filter: &LogicalFilter) -> Option<(String, String)> { + fn extract_date_range( + filter: &LogicalFilter, + query_tools: &Rc, + ) -> Option<(String, String)> { + let precision = query_tools + .base_tools() + .driver_tools(false) + .ok() + .and_then(|dt| dt.timestamp_precision().ok()) + .unwrap_or(3); for item in &filter.time_dimensions_filters { if let FilterItem::Item(base_filter) = item { - if *base_filter.filter_operator() == FilterOperator::InDateRange { - let values = base_filter.values(); - if values.len() >= 2 { - if let (Some(from), Some(to)) = (&values[0], &values[1]) { - return Some((from.clone(), to.clone())); - } + if let FilterOp::DateRange(date_range_op) = base_filter.operation() { + if let Ok(formatted) = date_range_op.formatted_date_range(precision) { + return Some(formatted); } } } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 91e22a637da1e..60a54aab65c8b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -108,7 +108,7 @@ impl BaseQuery { if has_multiple_usages { // Multiple usages: group by (cubeName, name), return array of grouped infos - let grouped = Self::group_usages(&usages, &self.query_tools); + let grouped = Self::group_usages(&usages); res.set(2, grouped.to_native(self.context.clone())?)?; } else if let Some(usage) = usages.first() { // Single usage: return old-style pre-aggregation object for backward compat @@ -130,11 +130,7 @@ impl BaseQuery { Ok(result) } - fn group_usages( - usages: &[PreAggregationUsage], - query_tools: &Rc, - ) -> Vec { - let base_tools = query_tools.base_tools(); + fn group_usages(usages: &[PreAggregationUsage]) -> Vec { let mut groups: HashMap<(String, String), GroupedPreAggregationInfo> = HashMap::new(); for usage in usages { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs index 00d5f7941f035..7015af24cdbd2 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/base_filter.rs @@ -105,6 +105,10 @@ impl BaseFilter { self.typed_filter.operator() } + pub fn operation(&self) -> &super::typed_filter::FilterOp { + self.typed_filter.operation() + } + pub fn use_raw_values(&self) -> bool { self.typed_filter.use_raw_values() } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs index 4825c52bb8abc..a567c9df0a5db 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/mod.rs @@ -2,10 +2,11 @@ pub mod base_filter; pub mod base_segment; pub mod compiler; pub mod filter_operator; -mod operators; +pub(crate) mod operators; pub mod typed_filter; pub use base_filter::BaseFilter; pub use base_segment::BaseSegment; pub use filter_operator::FilterOperator; -pub use typed_filter::resolve_base_symbol; +pub use operators::date_range::DateRangeOp; +pub use typed_filter::{resolve_base_symbol, FilterOp}; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs index 9aa395daefae5..93d359a32b0be 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/operators/date_range.rs @@ -1,4 +1,5 @@ use super::{FilterOperationSql, FilterSqlContext}; +use crate::planner::time_dimension::QueryDateTimeHelper; use cubenativeutils::CubeError; #[derive(Clone, Debug)] @@ -18,6 +19,12 @@ impl DateRangeOp { pub fn new(kind: DateRangeKind, from: String, to: String) -> Self { Self { kind, from, to } } + + pub fn formatted_date_range(&self, precision: u32) -> Result<(String, String), CubeError> { + let from = QueryDateTimeHelper::format_from_date(&self.from, precision)?; + let to = QueryDateTimeHelper::format_to_date(&self.to, precision)?; + Ok((from, to)) + } } impl FilterOperationSql for DateRangeOp { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs index eb75f26bdefac..f7d3951a3e84e 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/filter/typed_filter.rs @@ -91,6 +91,10 @@ impl TypedFilter { &self.values } + pub fn operation(&self) -> &FilterOp { + &self.op + } + pub fn use_raw_values(&self) -> bool { self.use_raw_values } From 7b41a181cfe0aef103980565f751426ea52c424d Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Mon, 13 Apr 2026 13:26:13 +0200 Subject: [PATCH 05/24] in work --- .../src/test_fixtures/test_utils/test_context.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs index a2acfcd94d517..8998ae873dbdf 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/test_utils/test_context.rs @@ -1,8 +1,8 @@ use crate::cube_bridge::base_query_options::BaseQueryOptions; use crate::cube_bridge::join_hints::JoinHintItem; -use crate::logical_plan::{PreAggregation, PreAggregationUsage}; +use crate::logical_plan::PreAggregationUsage; #[cfg(feature = "integration-postgres")] -use crate::logical_plan::{PreAggregationSource, PreAggregationTable}; +use crate::logical_plan::{PreAggregation, PreAggregationSource, PreAggregationTable}; use crate::plan::Filter; use crate::planner::filter::base_segment::BaseSegment; use crate::planner::query_tools::QueryTools; @@ -430,6 +430,11 @@ impl TestContext { .build_sql_and_params(&raw_sql, true, &templates) .expect("Failed to build SQL and params"); + // Strip __usage_N suffixes from SQL, same as base_query.rs does for single usage + let sql = pre_aggregations + .iter() + .fold(sql, |s, u| s.replace(&format!("__usage_{}", u.index), "")); + let final_sql = Self::inline_params(&sql, ¶ms); let messages = client.simple_query(&final_sql).await.unwrap_or_else(|e| { From 1ae0190e1732be73126612c2ee9da8ad291d095b Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Mon, 13 Apr 2026 13:43:55 +0200 Subject: [PATCH 06/24] in work --- .../multi_stage_separate_pre_aggs_test.yaml | 50 +++++++++++++++++++ .../multi_stage_separate_pre_aggs_tables.sql | 16 ++++++ .../tests/pre_aggregation_sql_generation.rs | 41 +++++++++++++++ ...lti_stage_separate_pre_aggs_pg_result.snap | 9 ++++ 4 files changed, 116 insertions(+) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml new file mode 100644 index 0000000000000..d8d51f6d929ec --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_separate_pre_aggs_test.yaml @@ -0,0 +1,50 @@ +cubes: + - name: orders + sql: "SELECT * FROM ms_pa_orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: status + type: string + sql: status + + measures: + - name: count + type: count + public: false + + - name: revenue + type: sum + sql: amount + public: false + + # Multi-stage: sum of count, reducing by status + - name: count_reduce_status + type: sum + sql: "{CUBE.count}" + multi_stage: true + reduce_by: + - orders.status + + # Multi-stage: sum of revenue, reducing by status + - name: revenue_reduce_status + type: sum + sql: "{CUBE.revenue}" + multi_stage: true + reduce_by: + - orders.status + + pre_aggregations: + - name: count_rollup + measures: + - count + dimensions: + - status + + - name: revenue_rollup + measures: + - revenue + dimensions: + - status diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql new file mode 100644 index 0000000000000..a23a2f07ed100 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_separate_pre_aggs_tables.sql @@ -0,0 +1,16 @@ +DROP TABLE IF EXISTS ms_pa_orders CASCADE; + +CREATE TABLE ms_pa_orders ( + id INTEGER PRIMARY KEY, + status VARCHAR(50) NOT NULL, + amount NUMERIC(10,2) NOT NULL +); + +INSERT INTO ms_pa_orders (id, status, amount) VALUES + (1, 'new', 100.00), + (2, 'new', 200.00), + (3, 'active', 150.00), + (4, 'active', 250.00), + (5, 'active', 350.00), + (6, 'completed', 300.00), + (7, 'completed', 400.00); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index d31d66f12a402..8d746e46fa61b 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -834,6 +834,47 @@ async fn test_multi_stage_count_distinct_sum_by_quarter_with_pre_aggregation() { } } +// --- Multi-stage with separate pre-aggregations --- + +#[tokio::test(flavor = "multi_thread")] +async fn test_multi_stage_separate_pre_aggregations() { + let schema = MockSchema::from_yaml_file("common/multi_stage_separate_pre_aggs_test.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - orders.count_reduce_status + - orders.revenue_reduce_status + cubestoreSupportMultistage: true + "}; + + let (_sql, pre_aggrs) = ctx + .build_sql_with_used_pre_aggregations(query_yaml) + .unwrap(); + + assert_eq!(pre_aggrs.len(), 2, "Expected 2 pre-aggregation usages"); + + let names: Vec<&str> = pre_aggrs + .iter() + .map(|u| u.pre_aggregation.name().as_str()) + .collect(); + assert!(names.contains(&"count_rollup"), "Expected count_rollup, got {:?}", names); + assert!(names.contains(&"revenue_rollup"), "Expected revenue_rollup, got {:?}", names); + + if let Some(result) = ctx + .try_execute_pg( + query_yaml, + "multi_stage_separate_pre_aggs_tables.sql", + ) + .await + { + insta::assert_snapshot!( + "multi_stage_separate_pre_aggs_pg_result", + result + ); + } +} + // --- rollupJoin with calculated measures through view --- #[test] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap new file mode 100644 index 0000000000000..b08f3c2536a06 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_pg_result.snap @@ -0,0 +1,9 @@ +--- +source: cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +assertion_line: 873 +expression: result +snapshot_kind: text +--- +orders__count_reduce_status | orders__revenue_reduce_status +----------------------------+------------------------------ +7 | 1750.00 From 34703336199952a757d9f1c8c7966ab325fde8a4 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Mon, 13 Apr 2026 14:04:59 +0200 Subject: [PATCH 07/24] in work --- .../optimizers/pre_aggregation/optimizer.rs | 37 +++++++++++ .../multi_stage_pre_agg_time_shift_test.yaml | 57 ++++++++++++++++ .../multi_stage_pre_agg_time_shift_tables.sql | 23 +++++++ .../tests/pre_aggregation_sql_generation.rs | 65 +++++++++++++++++++ ...eparate_pre_aggs_time_shift_pg_result.snap | 11 ++++ 5 files changed, 193 insertions(+) create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql create mode 100644 rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 6f637cbc09ef3..5aada5ccba78f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -6,8 +6,10 @@ use crate::plan::FilterItem; use crate::planner::filter::FilterOp; use crate::planner::join_hints::JoinHints; use crate::planner::multi_fact_join_groups::{MeasuresJoinHints, MultiFactJoinGroups}; +use crate::planner::planners::multi_stage::TimeShiftState; use crate::planner::query_tools::QueryTools; use crate::planner::sql_evaluator::MemberSymbol; +use crate::planner::time_dimension::QueryDateTime; use cubenativeutils::CubeError; use std::collections::HashSet; use std::rc::Rc; @@ -122,8 +124,10 @@ impl PreAggregationOptimizer { &mut self, query: Rc, compiled_pre_aggregations: &[Rc], + time_shifts: &TimeShiftState, ) -> Result>, CubeError> { let date_range = Self::extract_date_range(&query.filter(), &self.query_tools); + let date_range = self.apply_time_shifts_to_date_range(date_range, time_shifts); if !query.multistage_members().is_empty() { // Nested multi-stage: recurse with full list @@ -163,6 +167,7 @@ impl PreAggregationOptimizer { if let Some(rewritten) = self.try_rewrite_leaf_query( multi_stage_leaf_measure.query.clone(), compiled_pre_aggregations, + &multi_stage_leaf_measure.time_shifts, )? { let new_leaf = Rc::new(MultiStageLeafMeasure { measure: multi_stage_leaf_measure.measure.clone(), @@ -388,6 +393,38 @@ impl PreAggregationOptimizer { None } + /// Shift date_range by the negated time_shift interval. + /// The SQL renders `column + interval`, so to get the actual data range + /// we need `date_range - interval`. + fn apply_time_shifts_to_date_range( + &self, + date_range: Option<(String, String)>, + time_shifts: &TimeShiftState, + ) -> Option<(String, String)> { + let (from, to) = date_range?; + if time_shifts.is_empty() { + return Some((from, to)); + } + + // Use the first shift's interval (multi-stage typically has one time dimension) + let interval = time_shifts + .dimensions_shifts + .values() + .find_map(|shift| shift.interval.as_ref())?; + + let tz = self.query_tools.timezone(); + let shifted_from = QueryDateTime::from_date_str(tz, &from) + .and_then(|dt| dt.add_interval(&-interval.clone())) + .map(|dt| dt.default_format()) + .unwrap_or(from); + let shifted_to = QueryDateTime::from_date_str(tz, &to) + .and_then(|dt| dt.add_interval(&-interval.clone())) + .map(|dt| dt.default_format()) + .unwrap_or(to); + + Some((shifted_from, shifted_to)) + } + fn is_schema_and_filters_match( &self, schema: &Rc, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml new file mode 100644 index 0000000000000..5de1a4c0749d7 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/common/multi_stage_pre_agg_time_shift_test.yaml @@ -0,0 +1,57 @@ +cubes: + - name: orders + sql: "SELECT * FROM ms_pa_ts_orders" + dimensions: + - name: id + type: number + sql: id + primary_key: true + - name: status + type: string + sql: status + - name: created_at + type: time + sql: created_at + + measures: + - name: count + type: count + public: false + + - name: revenue + type: sum + sql: amount + public: false + + # Multi-stage: count with time shift (prior 1 month) + - name: count_prev_month + type: number + sql: "{CUBE.count}" + multi_stage: true + time_shift: + - interval: "1 month" + type: prior + timeDimension: orders.created_at + + # Multi-stage: revenue reducing by status + - name: revenue_reduce_status + type: sum + sql: "{CUBE.revenue}" + multi_stage: true + reduce_by: + - orders.status + + pre_aggregations: + - name: count_rollup + measures: + - count + time_dimension: created_at + granularity: day + + - name: revenue_rollup + measures: + - revenue + dimensions: + - status + time_dimension: created_at + granularity: day diff --git a/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql new file mode 100644 index 0000000000000..d1eb5e80465c5 --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/test_fixtures/schemas/yaml_files/seeds/multi_stage_pre_agg_time_shift_tables.sql @@ -0,0 +1,23 @@ +DROP TABLE IF EXISTS ms_pa_ts_orders CASCADE; + +CREATE TABLE ms_pa_ts_orders ( + id INTEGER PRIMARY KEY, + status VARCHAR(50) NOT NULL, + amount NUMERIC(10,2) NOT NULL, + created_at TIMESTAMP NOT NULL +); + +INSERT INTO ms_pa_ts_orders (id, status, amount, created_at) VALUES + -- December 2024 + (1, 'new', 80.00, '2024-12-10'), + (2, 'active', 120.00, '2024-12-20'), + -- January 2025 + (3, 'new', 100.00, '2025-01-15'), + (4, 'active', 150.00, '2025-01-20'), + -- February 2025 + (5, 'new', 200.00, '2025-02-10'), + (6, 'active', 250.00, '2025-02-15'), + (7, 'completed', 300.00, '2025-02-20'), + -- March 2025 + (8, 'completed', 400.00, '2025-03-10'), + (9, 'active', 350.00, '2025-03-25'); diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index 8d746e46fa61b..135e30116c14f 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -875,6 +875,71 @@ async fn test_multi_stage_separate_pre_aggregations() { } } +// --- Multi-stage with separate pre-aggregations and time shift --- + +#[tokio::test(flavor = "multi_thread")] +async fn test_multi_stage_separate_pre_aggs_with_time_shift() { + let schema = MockSchema::from_yaml_file("common/multi_stage_pre_agg_time_shift_test.yaml"); + let ctx = TestContext::new(schema).unwrap(); + + let query_yaml = indoc! {" + measures: + - orders.count_prev_month + - orders.revenue_reduce_status + time_dimensions: + - dimension: orders.created_at + granularity: month + dateRange: + - \"2025-01-01\" + - \"2025-03-31\" + cubestoreSupportMultistage: true + "}; + + let (_sql, pre_aggrs) = ctx + .build_sql_with_used_pre_aggregations(query_yaml) + .unwrap(); + + assert_eq!(pre_aggrs.len(), 2, "Expected 2 pre-aggregation usages"); + + // Find usages by pre-aggregation name + let count_usage = pre_aggrs + .iter() + .find(|u| u.pre_aggregation.name() == "count_rollup") + .expect("Expected count_rollup usage"); + let revenue_usage = pre_aggrs + .iter() + .find(|u| u.pre_aggregation.name() == "revenue_rollup") + .expect("Expected revenue_rollup usage"); + + // count_prev_month has time_shift prior 1 month, so date range should be shifted back + // TODO: currently time_shift is not propagated to date_range — fix in optimizer + assert_eq!( + count_usage.date_range, + Some(("2024-12-01T00:00:00.000".to_string(), "2025-02-28T23:59:59.999".to_string())), + "count_rollup should have date range shifted 1 month prior" + ); + + // revenue_reduce_status has no time shift, so original date range + assert_eq!( + revenue_usage.date_range, + Some(("2025-01-01T00:00:00.000".to_string(), "2025-03-31T23:59:59.999".to_string())), + "revenue_rollup should have original date range" + ); + + if let Some(result) = ctx + .try_execute_pg( + query_yaml, + "multi_stage_pre_agg_time_shift_tables.sql", + ) + .await + { + insta::assert_snapshot!( + "multi_stage_separate_pre_aggs_time_shift_pg_result", + result + ); + } +} + // --- rollupJoin with calculated measures through view --- #[test] diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap new file mode 100644 index 0000000000000..a0d46d261d64f --- /dev/null +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/snapshots/cubesqlplanner__tests__pre_aggregation_sql_generation__multi_stage_separate_pre_aggs_time_shift_pg_result.snap @@ -0,0 +1,11 @@ +--- +source: cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +assertion_line: 936 +expression: result +snapshot_kind: text +--- +orders__created_at_month | orders__count_prev_month | orders__revenue_reduce_status +-------------------------+--------------------------+------------------------------ +2025-01-01 00:00:00 | 2 | 250.00 +2025-02-01 00:00:00 | 2 | 750.00 +2025-03-01 00:00:00 | 3 | 750.00 From e25a879990bc5afcbb666935d1f2d997bb753c87 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 12:55:57 +0200 Subject: [PATCH 08/24] in work --- .../pre-aggregations-multi-stage.test.ts | 172 ++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts index 4a476040c0010..9c6f2f467f829 100644 --- a/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts +++ b/packages/cubejs-schema-compiler/test/integration/postgres/pre-aggregations-multi-stage.test.ts @@ -250,6 +250,86 @@ describe('PreAggregationsMultiStage', () => { }, }) + cube('monthly_data', { + sql: \` + SELECT 1 AS id, 10 AS amount, 'a' AS category, '2017-01-10'::TIMESTAMP AS created_at UNION ALL + SELECT 2 AS id, 20 AS amount, 'b' AS category, '2017-01-20'::TIMESTAMP AS created_at UNION ALL + SELECT 4 AS id, 100 AS amount, 'a' AS category, '2017-02-10'::TIMESTAMP AS created_at UNION ALL + SELECT 5 AS id, 100 AS amount, 'b' AS category, '2017-02-20'::TIMESTAMP AS created_at UNION ALL + SELECT 10 AS id, 200 AS amount, 'a' AS category, '2017-03-10'::TIMESTAMP AS created_at UNION ALL + SELECT 20 AS id, 200 AS amount, 'b' AS category, '2017-03-20'::TIMESTAMP AS created_at + \`, + + sqlAlias: 'md', + + dimensions: { + id: { + type: 'number', + sql: 'id', + primaryKey: true + }, + category: { + type: 'string', + sql: 'category' + }, + created_at: { + type: 'time', + sql: 'created_at' + }, + }, + + measures: { + revenue: { + sql: 'amount', + type: 'sum' + }, + count: { + type: 'count' + }, + revenue_per_id: { + multi_stage: true, + sql: \`\${revenue} / \${id}\`, + type: 'sum', + add_group_by: [monthly_data.id], + }, + count_by_category: { + multi_stage: true, + sql: \`\${count}\`, + type: 'sum', + add_group_by: [monthly_data.category], + }, + prev_month_revenue: { + multi_stage: true, + sql: \`\${revenue}\`, + type: 'number', + timeShift: [{ + timeDimension: created_at, + interval: '1 month', + type: 'prior', + }], + }, + }, + + preAggregations: { + revenueById: { + type: 'rollup', + measureReferences: [revenue], + dimensionReferences: [id], + timeDimensionReference: created_at, + granularity: 'day', + partitionGranularity: 'month', + }, + countByCat: { + type: 'rollup', + measureReferences: [revenue, count], + dimensionReferences: [category], + timeDimensionReference: created_at, + granularity: 'day', + partitionGranularity: 'month', + }, + }, + }) + `); if (getEnv('nativeSqlPlanner')) { @@ -388,6 +468,98 @@ describe('PreAggregationsMultiStage', () => { ); }); })); + + it('two multi-stage measures with different pre-aggregations', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'monthly_data.revenue_per_id', + 'monthly_data.count_by_category' + ], + timeDimensions: [{ + dimension: 'monthly_data.created_at', + granularity: 'month', + dateRange: ['2017-01-01', '2017-03-31'] + }], + timezone: 'UTC', + order: [{ + id: 'monthly_data.created_at' + }], + preAggregationsSchema: '', + cubestoreSupportMultistage: true + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + const tableNames = preAggregationsDescription.map((d: any) => d.tableName); + expect(tableNames).toContain('md_revenue_by_id'); + expect(tableNames).toContain('md_count_by_cat'); + expect(sqlAndParams[0]).toContain('md_revenue_by_id'); + expect(sqlAndParams[0]).toContain('md_count_by_cat'); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + md__created_at_month: '2017-01-01T00:00:00.000Z', + md__revenue_per_id: '20.0000000000000000', + md__count_by_category: '2' + }, + { + md__created_at_month: '2017-02-01T00:00:00.000Z', + md__revenue_per_id: '45.0000000000000000', + md__count_by_category: '2' + }, + { + md__created_at_month: '2017-03-01T00:00:00.000Z', + md__revenue_per_id: '30.0000000000000000', + md__count_by_category: '2' + } + ] + ); + }); + })); + + it('multi-stage with time_shift loading different pre-aggregation partitions', () => compiler.compile().then(() => { + const query = new PostgresQuery({ joinGraph, cubeEvaluator, compiler }, { + measures: [ + 'monthly_data.revenue_per_id', + 'monthly_data.prev_month_revenue' + ], + timeDimensions: [{ + dimension: 'monthly_data.created_at', + granularity: 'month', + dateRange: ['2017-02-01', '2017-03-31'] + }], + timezone: 'UTC', + order: [{ + id: 'monthly_data.created_at' + }], + preAggregationsSchema: '', + cubestoreSupportMultistage: true + }); + + const preAggregationsDescription: any = query.preAggregations?.preAggregationsDescription(); + const sqlAndParams = query.buildSqlAndParams(); + expect(preAggregationsDescription.length).toBeGreaterThanOrEqual(1); + expect(preAggregationsDescription.some((d: any) => d.tableName.startsWith('md_'))).toBe(true); + + return dbRunner.evaluateQueryWithPreAggregations(query).then(res => { + expect(res).toEqual( + [ + { + md__created_at_month: '2017-02-01T00:00:00.000Z', + md__revenue_per_id: '45.0000000000000000', + md__prev_month_revenue: '30' + }, + { + md__created_at_month: '2017-03-01T00:00:00.000Z', + md__revenue_per_id: '30.0000000000000000', + md__prev_month_revenue: '200' + } + ] + ); + }); + })); } else { it.skip('multi stage pre-aggregations', () => { // Skipping because it works only in Tesseract From f1b51bff6dbc3fe3e001ac63168c87f5559e8dcd Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 13:03:42 +0200 Subject: [PATCH 09/24] in work --- .../src/adapter/PreAggregations.ts | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index 07e67e2ac476a..9c8314d8059f1 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -193,13 +193,36 @@ export class PreAggregations { // One description per physical pre-aggregation, with usageMapping attached const descriptions = this.preAggregationDescriptionsFor(foundPreAgg); + // Compute the union of all usage date ranges so that partitions cover + // every usage (e.g. time_shift may require earlier partitions). + const mergedDateRange = PreAggregations.mergeUsageDateRanges(usageInfo.usages); + return descriptions.map(desc => ({ ...desc, - usageMapping: usageInfo.usages, // { "__usage_0": { dateRange }, "__usage_1": { dateRange } } + usageMapping: usageInfo.usages, + ...(mergedDateRange && desc.matchedTimeDimensionDateRange ? { matchedTimeDimensionDateRange: mergedDateRange } : {}), })); }); } + private static mergeUsageDateRanges(usages: Record): [string, string] | null { + let minDate: string | null = null; + let maxDate: string | null = null; + + for (const usage of Object.values(usages)) { + if (usage.dateRange) { + if (!minDate || usage.dateRange[0] < minDate) { + minDate = usage.dateRange[0]; + } + if (!maxDate || usage.dateRange[1] > maxDate) { + maxDate = usage.dateRange[1]; + } + } + } + + return minDate && maxDate ? [minDate, maxDate] : null; + } + private preAggregationDescriptionsFor(foundPreAggregation: PreAggregationForQuery): FullPreAggregationDescription[] { let preAggregations: PreAggregationForQuery[] = [foundPreAggregation]; if (foundPreAggregation.preAggregation.type === 'rollupJoin') { From 610935c6e2c9ff33aa60725151c474fe7607d9be Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 13:05:48 +0200 Subject: [PATCH 10/24] fmt --- .../optimizers/pre_aggregation/optimizer.rs | 16 ++----- .../cubesqlplanner/src/planner/base_query.rs | 18 ++++---- .../src/planner/top_level_planner.rs | 4 +- .../tests/pre_aggregation_sql_generation.rs | 42 ++++++++++--------- 4 files changed, 36 insertions(+), 44 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 5aada5ccba78f..d5b3a89589322 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -74,16 +74,11 @@ impl PreAggregationOptimizer { }; if !plan.multistage_members().is_empty() && self.allow_multi_stage { - return self - .try_rewrite_query_with_multistages(&plan, &filtered_pre_aggregations); + return self.try_rewrite_query_with_multistages(&plan, &filtered_pre_aggregations); } for pre_aggregation in filtered_pre_aggregations.iter() { - let new_query = self.try_rewrite_simple_query( - &plan, - pre_aggregation, - None, - )?; + let new_query = self.try_rewrite_simple_query(&plan, pre_aggregation, None)?; if new_query.is_some() { return Ok(new_query); } @@ -135,11 +130,8 @@ impl PreAggregationOptimizer { } for pre_aggregation in compiled_pre_aggregations.iter() { - let result = self.try_rewrite_simple_query( - &query, - pre_aggregation, - date_range.clone(), - )?; + let result = + self.try_rewrite_simple_query(&query, pre_aggregation, date_range.clone())?; if result.is_some() { return Ok(result); } diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 60a54aab65c8b..232a148cba266 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -81,9 +81,7 @@ impl BaseQuery { let (sql, usages) = planner.plan()?; let is_external = if !usages.is_empty() { - usages - .iter() - .all(|usage| usage.pre_aggregation.external()) + usages.iter().all(|usage| usage.pre_aggregation.external()) } else { false }; @@ -141,12 +139,14 @@ impl BaseQuery { let suffix = format!("__usage_{}", usage.index); - let group = groups.entry(key).or_insert_with(|| GroupedPreAggregationInfo { - cube_name, - pre_aggregation_name: name, - external: pre_agg.external(), - usages: HashMap::new(), - }); + let group = groups + .entry(key) + .or_insert_with(|| GroupedPreAggregationInfo { + cube_name, + pre_aggregation_name: name, + external: pre_agg.external(), + usages: HashMap::new(), + }); group.usages.insert( suffix, diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs index 9653f6035c436..77cac4501fc42 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/top_level_planner.rs @@ -36,9 +36,7 @@ impl TopLevelPlanner { let (optimized_plan, usages) = self.try_pre_aggregations(logical_plan.clone())?; let is_external = if !usages.is_empty() { - usages - .iter() - .all(|usage| usage.pre_aggregation.external()) + usages.iter().all(|usage| usage.pre_aggregation.external()) } else { false }; diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index 135e30116c14f..4f7329aa29a3a 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -858,20 +858,22 @@ async fn test_multi_stage_separate_pre_aggregations() { .iter() .map(|u| u.pre_aggregation.name().as_str()) .collect(); - assert!(names.contains(&"count_rollup"), "Expected count_rollup, got {:?}", names); - assert!(names.contains(&"revenue_rollup"), "Expected revenue_rollup, got {:?}", names); + assert!( + names.contains(&"count_rollup"), + "Expected count_rollup, got {:?}", + names + ); + assert!( + names.contains(&"revenue_rollup"), + "Expected revenue_rollup, got {:?}", + names + ); if let Some(result) = ctx - .try_execute_pg( - query_yaml, - "multi_stage_separate_pre_aggs_tables.sql", - ) + .try_execute_pg(query_yaml, "multi_stage_separate_pre_aggs_tables.sql") .await { - insta::assert_snapshot!( - "multi_stage_separate_pre_aggs_pg_result", - result - ); + insta::assert_snapshot!("multi_stage_separate_pre_aggs_pg_result", result); } } @@ -915,28 +917,28 @@ async fn test_multi_stage_separate_pre_aggs_with_time_shift() { // TODO: currently time_shift is not propagated to date_range — fix in optimizer assert_eq!( count_usage.date_range, - Some(("2024-12-01T00:00:00.000".to_string(), "2025-02-28T23:59:59.999".to_string())), + Some(( + "2024-12-01T00:00:00.000".to_string(), + "2025-02-28T23:59:59.999".to_string() + )), "count_rollup should have date range shifted 1 month prior" ); // revenue_reduce_status has no time shift, so original date range assert_eq!( revenue_usage.date_range, - Some(("2025-01-01T00:00:00.000".to_string(), "2025-03-31T23:59:59.999".to_string())), + Some(( + "2025-01-01T00:00:00.000".to_string(), + "2025-03-31T23:59:59.999".to_string() + )), "revenue_rollup should have original date range" ); if let Some(result) = ctx - .try_execute_pg( - query_yaml, - "multi_stage_pre_agg_time_shift_tables.sql", - ) + .try_execute_pg(query_yaml, "multi_stage_pre_agg_time_shift_tables.sql") .await { - insta::assert_snapshot!( - "multi_stage_separate_pre_aggs_time_shift_pg_result", - result - ); + insta::assert_snapshot!("multi_stage_separate_pre_aggs_time_shift_pg_result", result); } } From e1f43f24b13b56277d7021b5e64140979cbb301f Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 13:26:27 +0200 Subject: [PATCH 11/24] fix --- .../optimizers/pre_aggregation/optimizer.rs | 61 ++++++++----------- 1 file changed, 25 insertions(+), 36 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index d5b3a89589322..53fe3e76c76ef 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -121,8 +121,8 @@ impl PreAggregationOptimizer { compiled_pre_aggregations: &[Rc], time_shifts: &TimeShiftState, ) -> Result>, CubeError> { - let date_range = Self::extract_date_range(&query.filter(), &self.query_tools); - let date_range = self.apply_time_shifts_to_date_range(date_range, time_shifts); + let date_range = + Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts); if !query.multistage_members().is_empty() { // Nested multi-stage: recurse with full list @@ -212,6 +212,7 @@ impl PreAggregationOptimizer { let date_range = Self::extract_date_range( &resolver_multiplied_measures.filter, &self.query_tools, + &TimeShiftState::default(), ); let pre_aggregation_source = self.make_pre_aggregation_source( pre_aggregation, @@ -366,6 +367,7 @@ impl PreAggregationOptimizer { fn extract_date_range( filter: &LogicalFilter, query_tools: &Rc, + time_shifts: &TimeShiftState, ) -> Option<(String, String)> { let precision = query_tools .base_tools() @@ -376,8 +378,27 @@ impl PreAggregationOptimizer { for item in &filter.time_dimensions_filters { if let FilterItem::Item(base_filter) = item { if let FilterOp::DateRange(date_range_op) = base_filter.operation() { - if let Ok(formatted) = date_range_op.formatted_date_range(precision) { - return Some(formatted); + if let Ok((from, to)) = date_range_op.formatted_date_range(precision) { + // Apply time shift for this dimension if present. + // SQL renders `column + interval`, so actual data range is `date - interval`. + if let Some(interval) = time_shifts + .dimensions_shifts + .get(&base_filter.member_name()) + .and_then(|s| s.interval.as_ref()) + { + let tz = query_tools.timezone(); + let neg = -interval.clone(); + let shifted_from = QueryDateTime::from_date_str(tz, &from) + .and_then(|dt| dt.add_interval(&neg)) + .map(|dt| dt.default_format()) + .unwrap_or(from); + let shifted_to = QueryDateTime::from_date_str(tz, &to) + .and_then(|dt| dt.add_interval(&neg)) + .map(|dt| dt.default_format()) + .unwrap_or(to); + return Some((shifted_from, shifted_to)); + } + return Some((from, to)); } } } @@ -385,38 +406,6 @@ impl PreAggregationOptimizer { None } - /// Shift date_range by the negated time_shift interval. - /// The SQL renders `column + interval`, so to get the actual data range - /// we need `date_range - interval`. - fn apply_time_shifts_to_date_range( - &self, - date_range: Option<(String, String)>, - time_shifts: &TimeShiftState, - ) -> Option<(String, String)> { - let (from, to) = date_range?; - if time_shifts.is_empty() { - return Some((from, to)); - } - - // Use the first shift's interval (multi-stage typically has one time dimension) - let interval = time_shifts - .dimensions_shifts - .values() - .find_map(|shift| shift.interval.as_ref())?; - - let tz = self.query_tools.timezone(); - let shifted_from = QueryDateTime::from_date_str(tz, &from) - .and_then(|dt| dt.add_interval(&-interval.clone())) - .map(|dt| dt.default_format()) - .unwrap_or(from); - let shifted_to = QueryDateTime::from_date_str(tz, &to) - .and_then(|dt| dt.add_interval(&-interval.clone())) - .map(|dt| dt.default_format()) - .unwrap_or(to); - - Some((shifted_from, shifted_to)) - } - fn is_schema_and_filters_match( &self, schema: &Rc, From 97606cf2bbef549e184735ee94e59c3b2afd2dc5 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 13:37:06 +0200 Subject: [PATCH 12/24] fix --- .../cubesqlplanner/src/planner/base_query.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index 232a148cba266..c6837da799740 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -159,6 +159,12 @@ impl BaseQuery { ); } - groups.into_values().collect() + let mut result: Vec<_> = groups.into_values().collect(); + result.sort_by(|a, b| { + a.cube_name + .cmp(&b.cube_name) + .then(a.pre_aggregation_name.cmp(&b.pre_aggregation_name)) + }); + result } } From 669da14a75dfca5f74687cfb2f675a5683004379 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 14:53:04 +0200 Subject: [PATCH 13/24] fix --- .../src/adapter/BaseQuery.js | 36 ++++++++----------- .../tests/pre_aggregation_sql_generation.rs | 1 - 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index 210664670d5f5..da946405db615 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -961,18 +961,7 @@ export class BaseQuery { const [query, params, preAggResult] = buildResult; const paramsArray = [...params]; - if (preAggResult) { - if (Array.isArray(preAggResult)) { - // Grouped usage info objects (multiple usages) - this.preAggregations.preAggregationUsageInfos = preAggResult; - const first = preAggResult[0]; - this.preAggregations.preAggregationForQuery = - this.getPreAggregationByName(first.cubeName, first.preAggregationName); - } else { - // Single-usage: old-style pre-aggregation object - this.preAggregations.preAggregationForQuery = preAggResult; - } - } + this.applyNativePreAggResult(preAggResult); return [query, paramsArray]; } catch (e) { if (e.name === 'TesseractUserError') { @@ -1019,17 +1008,20 @@ export class BaseQuery { const buildResult = nativeBuildSqlAndParams(queryParams); const [, , preAggResult] = buildResult; - if (preAggResult) { - if (Array.isArray(preAggResult)) { - // Grouped usage info objects (multiple usages) - this.preAggregations.preAggregationUsageInfos = preAggResult; - const first = preAggResult[0]; - return this.getPreAggregationByName(first.cubeName, first.preAggregationName); - } - // Single-usage: old-style pre-aggregation object - return preAggResult; + this.applyNativePreAggResult(preAggResult); + return this.preAggregations.preAggregationForQuery; + } + + applyNativePreAggResult(preAggResult) { + if (!preAggResult) return; + if (Array.isArray(preAggResult)) { + this.preAggregations.preAggregationUsageInfos = preAggResult; + const first = preAggResult[0]; + this.preAggregations.preAggregationForQuery = + this.getPreAggregationByName(first.cubeName, first.preAggregationName); + } else { + this.preAggregations.preAggregationForQuery = preAggResult; } - return undefined; } allCubeMembers(path) { diff --git a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs index 4f7329aa29a3a..a19583bb70ad8 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/tests/pre_aggregation_sql_generation.rs @@ -914,7 +914,6 @@ async fn test_multi_stage_separate_pre_aggs_with_time_shift() { .expect("Expected revenue_rollup usage"); // count_prev_month has time_shift prior 1 month, so date range should be shifted back - // TODO: currently time_shift is not propagated to date_range — fix in optimizer assert_eq!( count_usage.date_range, Some(( From 849beacfd7a059e1d54f2d015e05238339d6f2d8 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 15:59:58 +0200 Subject: [PATCH 14/24] fmt --- .../src/logical_plan/optimizers/pre_aggregation/optimizer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 53fe3e76c76ef..894a92997f585 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -121,8 +121,7 @@ impl PreAggregationOptimizer { compiled_pre_aggregations: &[Rc], time_shifts: &TimeShiftState, ) -> Result>, CubeError> { - let date_range = - Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts); + let date_range = Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts); if !query.multistage_members().is_empty() { // Nested multi-stage: recurse with full list From 159e389e2e2d717a667c0fe52afadeaf96907af3 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 16:01:05 +0200 Subject: [PATCH 15/24] lint --- packages/cubejs-schema-compiler/src/adapter/BaseQuery.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js index da946405db615..30528a028ca41 100644 --- a/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/src/adapter/BaseQuery.js @@ -3507,7 +3507,7 @@ export class BaseQuery { } escapeStringLiteral(str) { - return `'${str.replace(/'/g, "''")}'`; + return `'${str.replace(/'/g, '\'\'')}'`; } autoPrefixAndEvaluateSql(cubeName, sql, isMemberExpr = false) { From 5f8a06056c44e3715bad970e3838407b9ceec5de Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 16:15:24 +0200 Subject: [PATCH 16/24] lint --- .../src/orchestrator/PreAggregationPartitionRangeLoader.ts | 2 +- .../cubejs-query-orchestrator/src/orchestrator/QueryCache.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index 6498ea2eea5d2..42421cd84405d 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -319,7 +319,7 @@ export class PreAggregationPartitionRangeLoader { }); const usageTableNames = usagePartitions.map(r => r.targetTableName); if (usageTableNames.length === 1) { - usageTargetTableNames[suffix] = usageTableNames[0]; + [usageTargetTableNames[suffix]] = usageTableNames; } else if (usageTableNames.length > 0) { const usageUnion = usageTableNames .map(t => `SELECT * FROM ${t}`) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index d2601661c3b4c..96cd09a8d17c4 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -428,7 +428,7 @@ export class QueryCache { const [keyQuery, params, queryOptions] = Array.isArray(queryAndParams) ? queryAndParams : [queryAndParams, []]; - let replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( + const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( (query, [tableName, { targetTableName, usageTargetTableNames }]) => { // First replace usage-specific placeholders (e.g. tableName__usage_0) if (usageTargetTableNames) { From 96444bc64a1760002edd10985063bd1d5077b9c9 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 16:24:20 +0200 Subject: [PATCH 17/24] fix --- .../cubesqlplanner/cubesqlplanner/src/planner/base_query.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs index c6837da799740..f64d382d9f3e7 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/planner/base_query.rs @@ -91,10 +91,8 @@ impl BaseQuery { .query_tools .build_sql_and_params(&sql, true, &templates)?; - let has_multiple_usages = usages.len() > 1; - // For single usage, strip __usage_N suffix from SQL to maintain backward compat - let final_sql = if !has_multiple_usages && usages.len() == 1 { + let final_sql = if usages.len() == 1 { result_sql.replace(&format!("__usage_{}", usages[0].index), "") } else { result_sql @@ -104,7 +102,7 @@ impl BaseQuery { res.set(0, final_sql.to_native(self.context.clone())?)?; res.set(1, params.to_native(self.context.clone())?)?; - if has_multiple_usages { + if usages.len() > 1 { // Multiple usages: group by (cubeName, name), return array of grouped infos let grouped = Self::group_usages(&usages); res.set(2, grouped.to_native(self.context.clone())?)?; From 58789ef09b9194873f98818f332bd2b1afb46dc0 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 16:31:25 +0200 Subject: [PATCH 18/24] fix --- .../src/adapter/PreAggregations.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts index 9c8314d8059f1..250e7661efea4 100644 --- a/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts +++ b/packages/cubejs-schema-compiler/src/adapter/PreAggregations.ts @@ -211,11 +211,12 @@ export class PreAggregations { for (const usage of Object.values(usages)) { if (usage.dateRange) { - if (!minDate || usage.dateRange[0] < minDate) { - minDate = usage.dateRange[0]; + const [from, to] = usage.dateRange; + if (!minDate || from < minDate) { + minDate = from; } - if (!maxDate || usage.dateRange[1] > maxDate) { - maxDate = usage.dateRange[1]; + if (!maxDate || to > maxDate) { + maxDate = to; } } } From dc794f6d174038e13e2c75546e9f17f7c8498bd7 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Tue, 14 Apr 2026 19:27:44 +0200 Subject: [PATCH 19/24] fix --- .../src/orchestrator/PreAggregationPartitionRangeLoader.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts index 42421cd84405d..352f7895d2e6b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationPartitionRangeLoader.ts @@ -305,9 +305,11 @@ export class PreAggregationPartitionRangeLoader { usageTargetTableNames = {}; for (const [suffix, usageInfo] of Object.entries(this.preAggregation.usageMapping)) { if (usageInfo.dateRange && this.preAggregation.partitionGranularity) { - // Load partition ranges specific to this usage's dateRange + // Load partition ranges specific to this usage's dateRange. + // Use partitionRange (generated locally via timeSeries, always in DEFAULT_TS_FORMAT) + // instead of buildRangeEnd (from DB, may include Z suffix depending on driver timestampFormat). const usageDateRange = PreAggregationPartitionRangeLoader.intersectDateRanges( - [loadResults[0]?.buildRangeEnd ? loadResults[0].partitionRange?.[0] : null, loadResults[loadResults.length - 1]?.buildRangeEnd || null] as QueryDateRange, + [loadResults[0]?.partitionRange?.[0] || null, loadResults[loadResults.length - 1]?.partitionRange?.[1] || null] as QueryDateRange, usageInfo.dateRange as QueryDateRange, ); if (usageDateRange) { From 51289313dccb604177f67aa89a00ab2a92ec23fc Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 15 Apr 2026 13:44:11 +0200 Subject: [PATCH 20/24] fix --- .../src/orchestrator/QueryCache.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 96cd09a8d17c4..6e9f744a7a0cf 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -436,8 +436,14 @@ export class QueryCache { query = QueryCache.replaceAll(`${tableName}${suffix}`, usageTargetName, query); } } - // Then replace base table name for any remaining references - return QueryCache.replaceAll(tableName, targetTableName, query); + // Replace base table name only when there are no usage-specific replacements. + // When usageTargetTableNames is present, all SQL references already use __usage_N suffixes + // and the base replacement would incorrectly match inside already-replaced target names + // (e.g. turning "preagg20200101_hash" into "preagg20200101_hash20200101_hash"). + if (!usageTargetTableNames || Object.keys(usageTargetTableNames).length === 0) { + return QueryCache.replaceAll(tableName, targetTableName, query); + } + return query; }, keyQuery ); From 4d9339e56888622417ded757d446d4add50e775d Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 15 Apr 2026 14:44:45 +0200 Subject: [PATCH 21/24] fix --- .../optimizers/pre_aggregation/optimizer.rs | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index 894a92997f585..ff7a87139dad3 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -121,16 +121,21 @@ impl PreAggregationOptimizer { compiled_pre_aggregations: &[Rc], time_shifts: &TimeShiftState, ) -> Result>, CubeError> { - let date_range = Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts); - if !query.multistage_members().is_empty() { // Nested multi-stage: recurse with full list return self.try_rewrite_query_with_multistages(&query, compiled_pre_aggregations); } for pre_aggregation in compiled_pre_aggregations.iter() { + let external = pre_aggregation.external.unwrap_or(false); + let date_range = Self::extract_date_range( + &query.filter(), + &self.query_tools, + time_shifts, + external, + ); let result = - self.try_rewrite_simple_query(&query, pre_aggregation, date_range.clone())?; + self.try_rewrite_simple_query(&query, pre_aggregation, date_range)?; if result.is_some() { return Ok(result); } @@ -212,6 +217,7 @@ impl PreAggregationOptimizer { &resolver_multiplied_measures.filter, &self.query_tools, &TimeShiftState::default(), + pre_aggregation.external.unwrap_or(false), ); let pre_aggregation_source = self.make_pre_aggregation_source( pre_aggregation, @@ -260,6 +266,17 @@ impl PreAggregationOptimizer { query.source().clone() }; + // Reject mixed external/non-external pre-aggregation usages + let new_usages = &self.usages[saved_usages_len..]; + if !new_usages.is_empty() { + let first_external = new_usages[0].external(); + if new_usages.iter().any(|u| u.external() != first_external) { + self.usages.truncate(saved_usages_len); + self.usage_counter = saved_counter; + return Ok(None); + } + } + let result = Query::builder() .multistage_members(rewritten_multistages) .schema(query.schema().clone()) @@ -367,10 +384,11 @@ impl PreAggregationOptimizer { filter: &LogicalFilter, query_tools: &Rc, time_shifts: &TimeShiftState, + external: bool, ) -> Option<(String, String)> { let precision = query_tools .base_tools() - .driver_tools(false) + .driver_tools(external) .ok() .and_then(|dt| dt.timestamp_precision().ok()) .unwrap_or(3); From 594dcdc6373749462626919e711c7d1f96937b66 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 15 Apr 2026 15:43:47 +0200 Subject: [PATCH 22/24] fmt --- .../optimizers/pre_aggregation/optimizer.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs index ff7a87139dad3..64a99d5690034 100644 --- a/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs +++ b/rust/cubesqlplanner/cubesqlplanner/src/logical_plan/optimizers/pre_aggregation/optimizer.rs @@ -128,14 +128,9 @@ impl PreAggregationOptimizer { for pre_aggregation in compiled_pre_aggregations.iter() { let external = pre_aggregation.external.unwrap_or(false); - let date_range = Self::extract_date_range( - &query.filter(), - &self.query_tools, - time_shifts, - external, - ); - let result = - self.try_rewrite_simple_query(&query, pre_aggregation, date_range)?; + let date_range = + Self::extract_date_range(&query.filter(), &self.query_tools, time_shifts, external); + let result = self.try_rewrite_simple_query(&query, pre_aggregation, date_range)?; if result.is_some() { return Ok(result); } From 0b35e136a49e11fd596ac4074036108897e0e872 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Wed, 15 Apr 2026 15:50:27 +0200 Subject: [PATCH 23/24] fix --- .../__snapshots__/mysql-full.test.ts.snap | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap b/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap index 83fb966c1071e..c8f5ecb5a8dd7 100644 --- a/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap +++ b/packages/cubejs-testing-drivers/test/__snapshots__/mysql-full.test.ts.snap @@ -9236,51 +9236,51 @@ Array [ exports[`Queries with the @cubejs-backend/mysql-driver querying BigECommerce: SeveralMultiStageMeasures 1`] = ` Array [ Object { - "BigECommerce.count": 2, + "BigECommerce.count": "2", "BigECommerce.orderDate": "2020-01-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-01-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, + "BigECommerce.percentageOfTotalForStatus": "100", "BigECommerce.totalCountRetailMonthAgo": null, "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 1, + "BigECommerce.count": "1", "BigECommerce.orderDate": "2020-02-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-02-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 2, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 2, + "BigECommerce.count": "2", "BigECommerce.orderDate": "2020-03-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-03-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 1, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 1, + "BigECommerce.count": "1", "BigECommerce.orderDate": "2020-04-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-04-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 2, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "2", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 5, + "BigECommerce.count": "5", "BigECommerce.orderDate": "2020-05-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-05-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 1, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "1", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 7, + "BigECommerce.count": "7", "BigECommerce.orderDate": "2020-06-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-06-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 5, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "5", "BigECommerce.totalProfitYearAgo": null, }, Object { @@ -9288,39 +9288,39 @@ Array [ "BigECommerce.orderDate": "2020-07-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-07-01T00:00:00.000", "BigECommerce.percentageOfTotalForStatus": null, - "BigECommerce.totalCountRetailMonthAgo": 7, + "BigECommerce.totalCountRetailMonthAgo": "7", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 6, + "BigECommerce.count": "6", "BigECommerce.orderDate": "2020-09-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-09-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, + "BigECommerce.percentageOfTotalForStatus": "100", "BigECommerce.totalCountRetailMonthAgo": null, "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 4, + "BigECommerce.count": "4", "BigECommerce.orderDate": "2020-10-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-10-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 6, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "6", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 9, + "BigECommerce.count": "9", "BigECommerce.orderDate": "2020-11-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-11-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 4, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "4", "BigECommerce.totalProfitYearAgo": null, }, Object { - "BigECommerce.count": 7, + "BigECommerce.count": "7", "BigECommerce.orderDate": "2020-12-01T00:00:00.000", "BigECommerce.orderDate.month": "2020-12-01T00:00:00.000", - "BigECommerce.percentageOfTotalForStatus": 100, - "BigECommerce.totalCountRetailMonthAgo": 9, + "BigECommerce.percentageOfTotalForStatus": "100", + "BigECommerce.totalCountRetailMonthAgo": "9", "BigECommerce.totalProfitYearAgo": null, }, ] From 64242fefcb250b31687fc6bc3bbd201730de1d37 Mon Sep 17 00:00:00 2001 From: Aleksandr Romanenko Date: Thu, 23 Apr 2026 16:23:25 +0200 Subject: [PATCH 24/24] fix --- .../src/orchestrator/PreAggregations.ts | 16 ++++++++++++++-- .../src/orchestrator/QueryCache.ts | 19 ++----------------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index a0f9d48660464..fc890b7df1b6c 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -572,10 +572,22 @@ export class PreAggregations { ); } - return [p.tableName, usedPreAggregation]; + return [p.tableName, usedPreAggregation] as PreAggregationTableToTempTable; }; - return preAggregationPromise().then(res => preAggregationsTablesToTempTables.concat([res])); + return preAggregationPromise().then(([tableName, result]) => { + const { usageTargetTableNames } = result; + if (usageTargetTableNames && Object.keys(usageTargetTableNames).length > 0) { + const entries: PreAggregationTableToTempTable[] = Object.entries(usageTargetTableNames).map( + ([suffix, usageTarget]) => [ + `${tableName}${suffix}`, + { ...result, targetTableName: usageTarget, usageTargetTableNames: undefined }, + ] + ); + return preAggregationsTablesToTempTables.concat(entries); + } + return preAggregationsTablesToTempTables.concat([[tableName, result]]); + }); }).reduce((promise, fn) => promise.then(fn), Promise.resolve([])); return preAggregationsTablesToTempTablesPromise.then(preAggregationsTablesToTempTables => ({ diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index 6e9f744a7a0cf..70b04ade38ddb 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -115,7 +115,7 @@ export type PreAggTableToTempTable = [ TempTable, ]; -export type PreAggTableToTempTableNames = [string, { targetTableName: string; usageTargetTableNames?: Record; }]; +export type PreAggTableToTempTableNames = [string, { targetTableName: string; }]; export type CacheKeyItem = string | string[] | QueryWithParams | QueryWithParams[] | undefined; @@ -429,22 +429,7 @@ export class QueryCache { ? queryAndParams : [queryAndParams, []]; const replacedKeyQuery: string = preAggregationsTablesToTempTables.reduce( - (query, [tableName, { targetTableName, usageTargetTableNames }]) => { - // First replace usage-specific placeholders (e.g. tableName__usage_0) - if (usageTargetTableNames) { - for (const [suffix, usageTargetName] of Object.entries(usageTargetTableNames)) { - query = QueryCache.replaceAll(`${tableName}${suffix}`, usageTargetName, query); - } - } - // Replace base table name only when there are no usage-specific replacements. - // When usageTargetTableNames is present, all SQL references already use __usage_N suffixes - // and the base replacement would incorrectly match inside already-replaced target names - // (e.g. turning "preagg20200101_hash" into "preagg20200101_hash20200101_hash"). - if (!usageTargetTableNames || Object.keys(usageTargetTableNames).length === 0) { - return QueryCache.replaceAll(tableName, targetTableName, query); - } - return query; - }, + (query, [tableName, { targetTableName }]) => QueryCache.replaceAll(tableName, targetTableName, query), keyQuery ); return Array.isArray(queryAndParams)