diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index ebe3664202fa..4d3513a21dbf 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -210,7 +210,7 @@ mod tests { use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; use crate::key::flow::FlowMetadataManager; - use crate::key::flow::flow_info::FlowInfoValue; + use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; @@ -242,11 +242,14 @@ mod tests { catalog_name: DEFAULT_CATALOG_NAME.to_string(), query_context: None, flow_name: "my_flow".to_string(), + all_source_table_names: vec![], + unresolved_source_table_names: vec![], raw_sql: "sql".to_string(), expire_after: Some(300), eval_interval_secs: None, comment: "comment".to_string(), options: Default::default(), + status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }, diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 7120e5042597..14a217a4d301 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -14,7 +14,7 @@ mod metadata; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use api::v1::ExpireAfter; @@ -34,13 +34,14 @@ use serde::{Deserialize, Serialize}; use snafu::{ResultExt, ensure}; use strum::AsRefStr; use table::metadata::TableId; +use table::table_name::TableName; use crate::cache_invalidator::Context; use crate::ddl::DdlContext; use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::error::{self, Result, UnexpectedSnafu}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; -use crate::key::flow::flow_info::FlowInfoValue; +use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus}; use crate::key::flow::flow_route::FlowRouteValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId}; @@ -67,6 +68,7 @@ impl CreateFlowProcedure { flow_id: None, peers: vec![], source_table_ids: vec![], + unresolved_source_table_names: vec![], flow_context: query_context.into(), // Convert to FlowQueryContext state: CreateFlowState::Prepare, prev_flow_info_value: None, @@ -89,6 +91,8 @@ impl CreateFlowProcedure { let create_if_not_exists = self.data.task.create_if_not_exists; let or_replace = self.data.task.or_replace; + validate_flow_options(&self.data.task)?; + let flow_name_value = self .context .flow_metadata_manager @@ -167,6 +171,21 @@ impl CreateFlowProcedure { } self.collect_source_tables().await?; + ensure!( + self.data.unresolved_source_table_names.is_empty() + || defer_on_missing_source(&self.data.task)?, + error::UnsupportedSnafu { + operation: format!( + "Create flow with missing source tables requires WITH ('{DEFER_ON_MISSING_SOURCE_KEY}'='true'): {}", + self.data + .unresolved_source_table_names + .iter() + .map(ToString::to_string) + .join(", ") + ) + } + ); + self.ensure_supported_replace_transition()?; // Validate that source and sink tables are not the same let sink_table_name = &self.data.task.sink_table_name; @@ -189,13 +208,38 @@ impl CreateFlowProcedure { if self.data.flow_id.is_none() { self.allocate_flow_id().await?; } - self.data.state = CreateFlowState::CreateFlows; - // determine flow type self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?); + self.data.state = if self.data.is_pending() { + self.data.peers.clear(); + CreateFlowState::CreateMetadata + } else { + CreateFlowState::CreateFlows + }; + Ok(Status::executing(true)) } + fn ensure_supported_replace_transition(&self) -> Result<()> { + if !self.data.task.or_replace { + return Ok(()); + } + + let Some(prev_flow_info) = self.data.prev_flow_info_value.as_ref() else { + return Ok(()); + }; + let prev_pending = prev_flow_info.get_inner_ref().is_pending(); + let new_pending = self.data.is_pending(); + ensure!( + prev_pending == new_pending, + error::UnsupportedSnafu { + operation: "Replacing between pending and active flow states is not supported yet" + } + ); + + Ok(()) + } + async fn on_flownode_create_flows(&mut self) -> Result { // Safety: must be allocated. let mut create_flow = Vec::with_capacity(self.data.peers.len()); @@ -365,6 +409,60 @@ pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result Result { + flow_task + .flow_options + .get(DEFER_ON_MISSING_SOURCE_KEY) + .map(|value| { + value + .trim() + .to_ascii_lowercase() + .parse::() + .map_err(|_| { + error::UnexpectedSnafu { + err_msg: format!( + "Invalid flow option '{DEFER_ON_MISSING_SOURCE_KEY}': {value}" + ), + } + .build() + }) + }) + .transpose() + .map(|value| value.unwrap_or(false)) +} + +pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> { + for key in flow_task.flow_options.keys() { + match key.as_str() { + DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {} + unknown => { + return UnexpectedSnafu { + err_msg: format!( + "Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}" + ), + } + .fail(); + } + } + } + + defer_on_missing_source(flow_task)?; + get_flow_type_from_options(flow_task)?; + Ok(()) +} + +fn user_runtime_flow_options(options: &HashMap) -> HashMap { + let mut options = options.clone(); + options.remove(DEFER_ON_MISSING_SOURCE_KEY); + options +} + +fn metadata_flow_options(options: &HashMap) -> HashMap { + options.clone() +} + /// The state of [CreateFlowProcedure]. #[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] pub enum CreateFlowState { @@ -411,6 +509,8 @@ pub struct CreateFlowData { pub(crate) flow_id: Option, pub(crate) peers: Vec, pub(crate) source_table_ids: Vec, + #[serde(default)] + pub(crate) unresolved_source_table_names: Vec, /// Use alias for backward compatibility with QueryContext serialized data #[serde(alias = "query_context")] pub(crate) flow_context: FlowQueryContext, @@ -424,6 +524,16 @@ pub struct CreateFlowData { pub(crate) flow_type: Option, } +impl CreateFlowData { + pub(crate) fn is_pending(&self) -> bool { + !self.unresolved_source_table_names.is_empty() + } + + pub(crate) fn is_active(&self) -> bool { + !self.is_pending() + } +} + impl From<&CreateFlowData> for CreateRequest { fn from(value: &CreateFlowData) -> Self { let flow_id = value.flow_id.unwrap(); @@ -446,7 +556,7 @@ impl From<&CreateFlowData> for CreateRequest { .map(|seconds| api::v1::EvalInterval { seconds }), comment: value.task.comment.clone(), sql: value.task.sql.clone(), - flow_options: value.task.flow_options.clone(), + flow_options: user_runtime_flow_options(&value.task.flow_options), }; let flow_type = value.flow_type.unwrap_or_default().to_string(); @@ -466,9 +576,9 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa eval_interval_secs: eval_interval, comment, sql, - flow_options: mut options, .. } = value.task.clone(); + let mut options = metadata_flow_options(&value.task.flow_options); let flownode_ids = value .peers @@ -484,7 +594,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa .collect::>(); let flow_type = value.flow_type.unwrap_or_default().to_string(); - options.insert("flow_type".to_string(), flow_type); + options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type); let mut create_time = chrono::Utc::now(); if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref() @@ -495,6 +605,8 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa let flow_info: FlowInfoValue = FlowInfoValue { source_table_ids: value.source_table_ids.clone(), + all_source_table_names: value.task.source_table_names.clone(), + unresolved_source_table_names: value.unresolved_source_table_names.clone(), sink_table_name, flownode_ids, catalog_name, @@ -506,6 +618,11 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa eval_interval_secs: eval_interval, comment, options, + status: if value.is_active() { + FlowStatus::Active + } else { + FlowStatus::PendingSources + }, created_time: create_time, updated_time: chrono::Utc::now(), }; diff --git a/src/common/meta/src/ddl/create_flow/metadata.rs b/src/common/meta/src/ddl/create_flow/metadata.rs index 27b85b794634..f97ecfdf4a86 100644 --- a/src/common/meta/src/ddl/create_flow/metadata.rs +++ b/src/common/meta/src/ddl/create_flow/metadata.rs @@ -12,10 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use snafu::OptionExt; - use crate::ddl::create_flow::CreateFlowProcedure; -use crate::error::{self, Result}; +use crate::error::Result; use crate::key::table_name::TableNameKey; impl CreateFlowProcedure { @@ -34,9 +32,8 @@ impl CreateFlowProcedure { Ok(()) } - /// Ensures all source tables exist and collects source table ids + /// Collects source table ids and keeps track of missing tables. pub(crate) async fn collect_source_tables(&mut self) -> Result<()> { - // Ensures all source tables exist. let keys = self .data .task @@ -52,22 +49,24 @@ impl CreateFlowProcedure { .batch_get(keys) .await?; - let source_table_ids = self + let mut resolved = Vec::with_capacity(self.data.task.source_table_names.len()); + let mut unresolved = Vec::new(); + + for (name, table_id) in self .data .task .source_table_names .iter() .zip(source_table_ids) - .map(|(name, table_id)| { - Ok(table_id - .with_context(|| error::TableNotFoundSnafu { - table_name: name.to_string(), - })? - .table_id()) - }) - .collect::>>()?; + { + match table_id { + Some(table_id) => resolved.push(table_id.table_id()), + None => unresolved.push(name.clone()), + } + } - self.data.source_table_ids = source_table_ids; + self.data.source_table_ids = resolved; + self.data.unresolved_source_table_names = unresolved; Ok(()) } } diff --git a/src/common/meta/src/ddl/drop_flow/metadata.rs b/src/common/meta/src/ddl/drop_flow/metadata.rs index 0437098be358..7afd00f9d5ac 100644 --- a/src/common/meta/src/ddl/drop_flow/metadata.rs +++ b/src/common/meta/src/ddl/drop_flow/metadata.rs @@ -43,7 +43,7 @@ impl DropFlowProcedure { .map(|(_, value)| value) .collect::>(); ensure!( - !flow_route_values.is_empty(), + flow_info_value.is_pending() || !flow_route_values.is_empty(), error::FlowRouteNotFoundSnafu { flow_name: format_full_flow_name(catalog_name, flow_name), } diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 344fc05024e6..a1a6c040f18f 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -16,12 +16,17 @@ use std::assert_matches; use std::collections::HashMap; use std::sync::Arc; +use api::v1::flow::CreateRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_procedure::Status; use common_procedure_test::execute_procedure_until_done; use table::table_name::TableName; use crate::ddl::DdlContext; -use crate::ddl::create_flow::{CreateFlowData, CreateFlowProcedure, CreateFlowState, FlowType}; +use crate::ddl::create_flow::{ + CreateFlowData, CreateFlowProcedure, CreateFlowState, DEFER_ON_MISSING_SOURCE_KEY, FlowType, + defer_on_missing_source, +}; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler; use crate::error; @@ -63,6 +68,11 @@ pub(crate) fn test_create_flow_task( } } +fn enable_defer_on_missing_source(task: &mut CreateFlowTask) { + task.flow_options + .insert(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string()); +} + #[tokio::test] async fn test_create_flow_source_table_not_found() { let source_table_names = vec![TableName::new( @@ -78,7 +88,261 @@ async fn test_create_flow_source_table_not_found() { let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); - assert_matches!(err, error::Error::TableNotFound { .. }); + assert_matches!(err, error::Error::Unsupported { .. }); + assert!( + err.to_string() + .contains("requires WITH ('defer_on_missing_source'='true')") + ); +} + +#[tokio::test] +async fn test_create_pending_flow_source_table_not_found_with_defer() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "my_table", + )]; + let sink_table_name = + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"); + let mut task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); + enable_defer_on_missing_source(&mut task); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context.clone()); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true, .. }); + assert_eq!(procedure.data.unresolved_source_table_names.len(), 1); + assert_eq!(procedure.data.source_table_ids, Vec::::new()); + + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = *output.downcast_ref::().unwrap(); + let flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert_eq!(flow_info.source_table_ids(), Vec::::new()); + assert_eq!( + flow_info + .options() + .get(DEFER_ON_MISSING_SOURCE_KEY) + .map(String::as_str), + Some("true") + ); +} + +#[tokio::test] +async fn test_create_pending_flow_source_table_not_found_with_defer_false() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "my_table", + )]; + let sink_table_name = + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"); + let mut task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); + task.flow_options + .insert(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "false".to_string()); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::Unsupported { .. }); + assert!( + err.to_string() + .contains("requires WITH ('defer_on_missing_source'='true')") + ); +} + +#[tokio::test] +async fn test_create_pending_flow_records_partial_source_resolution() { + let existing_source = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "partial_existing_source_table", + ); + let missing_source = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "partial_missing_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "partial_pending_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let existing_table_id = 3026; + let create_table_task = + test_create_table_task("partial_existing_source_table", existing_table_id); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut task = test_create_flow_task( + "partial_pending_flow", + vec![existing_source.clone(), missing_source.clone()], + sink_table_name, + false, + ); + enable_defer_on_missing_source(&mut task); + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context.clone()); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true, .. }); + assert_eq!(procedure.data.source_table_ids, vec![existing_table_id]); + assert_eq!( + procedure.data.unresolved_source_table_names, + vec![missing_source.clone()] + ); + + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = *output.downcast_ref::().unwrap(); + let flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + + assert!(flow_info.is_pending()); + assert_eq!(flow_info.source_table_ids(), &[existing_table_id]); + let expected_all_sources = vec![existing_source, missing_source.clone()]; + assert_eq!( + flow_info.all_source_table_names(), + expected_all_sources.as_slice() + ); + assert_eq!(flow_info.unresolved_source_table_names(), &[missing_source]); + assert!(flow_info.flownode_ids().is_empty()); +} + +#[test] +fn test_defer_on_missing_source_defaults_false() { + let task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + + assert!(!defer_on_missing_source(&task).unwrap()); +} + +#[test] +fn test_defer_on_missing_source_true() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.flow_options + .insert(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string()); + + assert!(defer_on_missing_source(&task).unwrap()); +} + +#[test] +fn test_defer_on_missing_source_invalid_value() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.flow_options.insert( + DEFER_ON_MISSING_SOURCE_KEY.to_string(), + "invalid".to_string(), + ); + + let err = defer_on_missing_source(&task).unwrap_err(); + assert!( + err.to_string() + .contains("Invalid flow option 'defer_on_missing_source': invalid") + ); +} + +#[tokio::test] +async fn test_create_flow_rejects_unknown_option_in_meta_task() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + task.flow_options + .insert("unknown_option".to_string(), "value".to_string()); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); + + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::Unexpected { .. }); + assert!( + err.to_string() + .contains("Unknown flow option 'unknown_option'") + ); +} + +#[test] +fn test_create_request_strips_defer_on_missing_source_runtime_option() { + let mut task = test_create_flow_task( + "my_flow", + vec![], + TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_sink_table"), + false, + ); + enable_defer_on_missing_source(&mut task); + + let data = CreateFlowData { + state: CreateFlowState::CreateFlows, + task, + flow_id: Some(1024), + peers: vec![], + source_table_ids: vec![], + unresolved_source_table_names: vec![], + flow_context: FlowQueryContext { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + snapshot_seqs: HashMap::new(), + sst_min_sequences: HashMap::new(), + }, + prev_flow_info_value: None, + did_replace: false, + flow_type: Some(FlowType::Batching), + }; + + let request: CreateRequest = (&data).into(); + + assert!( + !request + .flow_options + .contains_key(DEFER_ON_MISSING_SOURCE_KEY) + ); + assert_eq!( + request + .flow_options + .get(FlowType::FLOW_TYPE_KEY) + .map(String::as_str), + Some(FlowType::BATCHING) + ); } pub(crate) async fn create_test_flow( @@ -101,6 +365,27 @@ pub(crate) async fn create_test_flow( *flow_id } +pub(crate) async fn create_test_pending_flow( + ddl_context: &DdlContext, + flow_name: &str, + source_table_names: Vec, + sink_table_name: TableName, +) -> FlowId { + let mut task = test_create_flow_task( + flow_name, + source_table_names.clone(), + sink_table_name.clone(), + false, + ); + enable_defer_on_missing_source(&mut task); + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let flow_id = output.downcast_ref::().unwrap(); + + *flow_id +} + #[tokio::test] async fn test_create_flow() { let table_id = 1024; @@ -154,6 +439,201 @@ async fn test_create_flow() { assert_matches!(err, error::Error::FlowAlreadyExists { .. }); } +#[tokio::test] +async fn test_replace_pending_flow_with_active_flow_is_unsupported() { + let source_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let pending_flow_id = create_test_pending_flow( + &ddl_context, + "replace_pending_flow", + vec![source_table_name.clone()], + sink_table_name.clone(), + ) + .await; + + let pending_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(pending_flow_id) + .await + .unwrap() + .unwrap(); + assert!(pending_flow.is_pending()); + assert!(pending_flow.flownode_ids().is_empty()); + + let create_table_task = test_create_table_task("replace_pending_source_table", 1026); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let mut replace_task = test_create_flow_task( + "replace_pending_flow", + vec![source_table_name], + sink_table_name, + false, + ); + replace_task.or_replace = true; + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone()); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::Unsupported { .. }); + assert!( + err.to_string() + .contains("Replacing between pending and active flow states") + ); +} + +#[tokio::test] +async fn test_replace_active_flow_with_pending_flow_is_unsupported() { + let existing_source_table = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_active_source_table", + ); + let missing_source_table = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_missing_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_active_sink_table", + ); + + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let create_table_task = test_create_table_task("replace_active_source_table", 2026); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let _flow_id = create_test_flow( + &ddl_context, + "replace_active_flow_to_pending", + vec![existing_source_table], + sink_table_name.clone(), + ) + .await; + + let mut replace_task = test_create_flow_task( + "replace_active_flow_to_pending", + vec![missing_source_table], + sink_table_name, + false, + ); + enable_defer_on_missing_source(&mut replace_task); + replace_task.or_replace = true; + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone()); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::Unsupported { .. }); + assert!( + err.to_string() + .contains("Replacing between pending and active flow states") + ); +} + +#[tokio::test] +async fn test_replace_pending_flow_with_pending_flow_updates_metadata() { + let first_missing_source = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_first_missing_source", + ); + let second_missing_source = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_second_missing_source", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_pending_to_pending_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let original_flow_id = create_test_pending_flow( + &ddl_context, + "replace_pending_to_pending_flow", + vec![first_missing_source.clone()], + sink_table_name.clone(), + ) + .await; + + let original_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(original_flow_id) + .await + .unwrap() + .unwrap(); + assert!(original_flow.is_pending()); + assert_eq!( + original_flow.unresolved_source_table_names(), + &[first_missing_source] + ); + assert!(original_flow.flownode_ids().is_empty()); + + let mut replace_task = test_create_flow_task( + "replace_pending_to_pending_flow", + vec![second_missing_source.clone()], + sink_table_name, + false, + ); + enable_defer_on_missing_source(&mut replace_task); + replace_task.or_replace = true; + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let replaced_flow_id = *output.downcast_ref::().unwrap(); + assert_eq!(replaced_flow_id, original_flow_id); + + let replaced_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(replaced_flow_id) + .await + .unwrap() + .unwrap(); + assert!(replaced_flow.is_pending()); + assert_eq!(replaced_flow.source_table_ids(), Vec::::new()); + assert_eq!( + replaced_flow.unresolved_source_table_names(), + std::slice::from_ref(&second_missing_source) + ); + assert_eq!( + replaced_flow.all_source_table_names(), + &[second_missing_source] + ); + assert!(replaced_flow.flownode_ids().is_empty()); +} + #[tokio::test] async fn test_create_flow_same_source_and_sink_table() { let table_id = 1024; @@ -228,6 +708,7 @@ fn test_create_flow_data_serialization_backward_compatibility() { "flow_id": null, "peers": [], "source_table_ids": [], + "unresolved_source_table_names": [], "query_context": { "current_catalog": "old_catalog", "current_schema": "old_schema", @@ -265,6 +746,7 @@ fn test_create_flow_data_new_format_serialization() { flow_id: None, peers: vec![], source_table_ids: vec![], + unresolved_source_table_names: vec![], flow_context, prev_flow_info_value: None, did_replace: false, @@ -327,6 +809,7 @@ fn test_flow_info_conversion_with_flow_context() { flow_id: Some(123), peers: vec![], source_table_ids: vec![456, 789], + unresolved_source_table_names: vec![], flow_context, prev_flow_info_value: None, did_replace: false, diff --git a/src/common/meta/src/ddl/tests/drop_flow.rs b/src/common/meta/src/ddl/tests/drop_flow.rs index af34da4809d3..400fd2e118bf 100644 --- a/src/common/meta/src/ddl/tests/drop_flow.rs +++ b/src/common/meta/src/ddl/tests/drop_flow.rs @@ -23,7 +23,7 @@ use table::table_name::TableName; use crate::ddl::drop_flow::DropFlowProcedure; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler; -use crate::ddl::tests::create_flow::create_test_flow; +use crate::ddl::tests::create_flow::{create_test_flow, create_test_pending_flow}; use crate::error; use crate::key::table_route::TableRouteValue; use crate::rpc::ddl::DropFlowTask; @@ -91,3 +91,45 @@ async fn test_drop_flow() { let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::FlowNotFound { .. }); } + +#[tokio::test] +async fn test_drop_pending_flow_without_routes() { + let source_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "drop_pending_missing_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "drop_pending_sink_table", + ); + let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); + let ddl_context = new_ddl_context(node_manager); + + let flow_id = create_test_pending_flow( + &ddl_context, + "drop_pending_flow", + vec![source_table_name], + sink_table_name, + ) + .await; + let flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(flow_info.is_pending()); + assert!(flow_info.flownode_ids().is_empty()); + + let task = test_drop_flow_task("drop_pending_flow", flow_id, false); + let mut procedure = DropFlowProcedure::new(task, ddl_context.clone()); + execute_procedure_until_done(&mut procedure).await; + + let task = test_drop_flow_task("drop_pending_flow", flow_id, false); + let mut procedure = DropFlowProcedure::new(task, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, error::Error::FlowNotFound { .. }); +} diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index d581b926853d..bc9aaaa6b370 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -459,6 +459,7 @@ mod tests { use super::*; use crate::FlownodeId; + use crate::key::flow::flow_info::FlowStatus; use crate::key::flow::table_flow::TableFlowKey; use crate::key::node_address::{NodeAddressKey, NodeAddressValue}; use crate::key::{FlowPartitionId, MetadataValue}; @@ -522,6 +523,8 @@ mod tests { query_context: None, flow_name: flow_name.to_string(), source_table_ids, + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name, flownode_ids, raw_sql: "raw".to_string(), @@ -529,6 +532,7 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), } @@ -774,6 +778,8 @@ mod tests { query_context: None, flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), @@ -781,6 +787,7 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }; @@ -1151,6 +1158,8 @@ mod tests { query_context: None, flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], + all_source_table_names: vec![], + unresolved_source_table_names: vec![], sink_table_name: another_sink_table_name, flownode_ids: [(0, 1u64)].into(), raw_sql: "raw".to_string(), @@ -1158,6 +1167,7 @@ mod tests { eval_interval_secs: None, comment: "hi".to_string(), options: Default::default(), + status: FlowStatus::Active, created_time: chrono::Utc::now(), updated_time: chrono::Utc::now(), }; diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index d501822c3c9e..522860e418c6 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -16,6 +16,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use chrono::{DateTime, Utc}; +use futures::TryStreamExt; +use futures::stream::BoxStream; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -27,12 +29,24 @@ use crate::FlownodeId; use crate::error::{self, Result}; use crate::key::flow::FlowScoped; use crate::key::txn_helper::TxnOpGetResponseSet; -use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue}; +use crate::key::{ + BytesAdapter, DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey, MetadataValue, +}; use crate::kv_backend::KvBackendRef; use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; +use crate::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream}; +use crate::rpc::KeyValue; +use crate::rpc::store::RangeRequest; pub const FLOW_INFO_KEY_PREFIX: &str = "info"; +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub enum FlowStatus { + PendingSources, + #[default] + Active, +} + lazy_static! { static ref FLOW_INFO_KEY_PATTERN: Regex = Regex::new(&format!("^{FLOW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); @@ -114,7 +128,12 @@ impl<'a> MetadataKey<'a, FlowInfoKeyInner> for FlowInfoKeyInner { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowInfoValue { /// The source tables used by the flow. + #[serde(default)] pub source_table_ids: Vec, + #[serde(default)] + pub all_source_table_names: Vec, + #[serde(default)] + pub unresolved_source_table_names: Vec, /// The sink table used by the flow. pub sink_table_name: TableName, /// Which flow nodes this flow is running on. @@ -145,6 +164,8 @@ pub struct FlowInfoValue { pub comment: String, /// The options. pub options: HashMap, + #[serde(default)] + pub status: FlowStatus, /// The created time #[serde(default)] pub created_time: DateTime, @@ -154,6 +175,14 @@ pub struct FlowInfoValue { } impl FlowInfoValue { + pub fn is_pending(&self) -> bool { + self.status == FlowStatus::PendingSources + } + + pub fn is_active(&self) -> bool { + self.status == FlowStatus::Active + } + /// Returns the `flownode_id`. pub fn flownode_ids(&self) -> &BTreeMap { &self.flownode_ids @@ -173,6 +202,14 @@ impl FlowInfoValue { &self.source_table_ids } + pub fn all_source_table_names(&self) -> &[TableName] { + &self.all_source_table_names + } + + pub fn unresolved_source_table_names(&self) -> &[TableName] { + &self.unresolved_source_table_names + } + pub fn catalog_name(&self) -> &String { &self.catalog_name } @@ -209,6 +246,10 @@ impl FlowInfoValue { &self.options } + pub fn status(&self) -> &FlowStatus { + &self.status + } + pub fn created_time(&self) -> &DateTime { &self.created_time } @@ -225,6 +266,12 @@ pub struct FlowInfoManager { kv_backend: KvBackendRef, } +pub fn flow_info_decoder(kv: KeyValue) -> Result<(FlowInfoKey, FlowInfoValue)> { + let key = FlowInfoKey::from_bytes(&kv.key)?; + let value = FlowInfoValue::try_from_raw_value(&kv.value)?; + Ok((key, value)) +} + impl FlowInfoManager { /// Returns a new [FlowInfoManager]. pub fn new(kv_backend: KvBackendRef) -> Self { @@ -254,6 +301,23 @@ impl FlowInfoManager { .transpose() } + pub fn flow_infos(&self) -> BoxStream<'static, Result<(FlowId, FlowInfoValue)>> { + let start_key = FlowScoped::new(BytesAdapter::from( + format!("{FLOW_INFO_KEY_PREFIX}/").into_bytes(), + )) + .to_bytes(); + let req = RangeRequest::new().with_prefix(start_key); + let stream = PaginationStream::new( + self.kv_backend.clone(), + req, + DEFAULT_PAGE_SIZE, + flow_info_decoder, + ) + .into_stream(); + + Box::pin(stream.map_ok(|(key, value)| (key.flow_id(), value))) + } + /// Builds a create flow transaction. /// It is expected that the `__flow/info/{flow_id}` wasn't occupied. /// Otherwise, the transaction will retrieve existing value. diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 6fd6e4adb431..0dab4148bb4b 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -34,7 +34,7 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, is_reado use common_catalog::{format_full_flow_name, format_full_table_name}; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; -use common_meta::ddl::create_flow::FlowType; +use common_meta::ddl::create_flow::{DEFER_ON_MISSING_SOURCE_KEY, FlowType}; use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::procedure_executor::ExecutorContext; @@ -113,7 +113,6 @@ struct DdlSubmitOptions { timeout: Duration, } -const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source"; const ALLOWED_FLOW_OPTIONS: [&str; 1] = [DEFER_ON_MISSING_SOURCE_KEY]; fn build_procedure_id_output(procedure_id: Vec) -> Result { @@ -204,6 +203,39 @@ fn validate_and_normalize_flow_options( .collect() } +fn determine_flow_type_for_source_state( + flow_name: &str, + flow_options: &HashMap, + has_missing_source_table: bool, + has_instant_ttl_source_table: bool, +) -> Result> { + if has_missing_source_table { + let defer_on_missing_source = flow_options + .get(DEFER_ON_MISSING_SOURCE_KEY) + .is_some_and(|value| value == "true"); + ensure!( + defer_on_missing_source, + InvalidSqlSnafu { + err_msg: format!( + "missing source tables for flow '{}'; use WITH ({DEFER_ON_MISSING_SOURCE_KEY} = true) to create a pending flow", + flow_name + ) + } + ); + info!( + "Flow `{}` is created as a pending batching flow because source tables are missing and defer_on_missing_source=true", + flow_name + ); + return Ok(Some(FlowType::Batching)); + } + + if has_instant_ttl_source_table { + return Ok(Some(FlowType::Streaming)); + } + + Ok(None) +} + impl StatementExecutor { pub fn catalog_manager(&self) -> CatalogManagerRef { self.catalog_manager.clone() @@ -714,7 +746,9 @@ impl StatementExecutor { expr: &CreateFlowExpr, query_ctx: QueryContextRef, ) -> Result { - // first check if source table's ttl is instant, if it is, force streaming mode + let mut has_missing_source_table = false; + let mut has_instant_ttl_source_table = false; + for src_table_name in &expr.source_table_names { let table = self .catalog_manager() @@ -726,16 +760,13 @@ impl StatementExecutor { ) .await .map_err(BoxedError::new) - .context(ExternalSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &src_table_name.catalog_name, - &src_table_name.schema_name, - &src_table_name.table_name, - ), - })?; + .context(ExternalSnafu)?; + + let Some(table) = table else { + has_missing_source_table = true; + continue; + }; - // instant source table can only be handled by streaming mode if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) { warn!( "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode", @@ -746,10 +777,19 @@ impl StatementExecutor { ), expr.flow_name ); - return Ok(FlowType::Streaming); + has_instant_ttl_source_table = true; } } + if let Some(flow_type) = determine_flow_type_for_source_state( + &expr.flow_name, + &expr.flow_options, + has_missing_source_table, + has_instant_ttl_source_table, + )? { + return Ok(flow_type); + } + let engine = &self.query_engine; let stmts = ParserContext::create_with_dialect( &expr.sql, @@ -2472,6 +2512,35 @@ SELECT max(c1), min(c2) FROM schema_2.table_2;"; )); } + #[test] + fn test_determine_flow_type_for_source_state_missing_sources_require_opt_in() { + let err = determine_flow_type_for_source_state("my_flow", &HashMap::new(), true, false) + .unwrap_err(); + + assert!(err.to_string().contains( + "missing source tables for flow 'my_flow'; use WITH (defer_on_missing_source = true) to create a pending flow" + )); + } + + #[test] + fn test_determine_flow_type_for_source_state_missing_sources_prefer_batching() { + let flow_options = + HashMap::from([(DEFER_ON_MISSING_SOURCE_KEY.to_string(), "true".to_string())]); + + assert_eq!( + determine_flow_type_for_source_state("my_flow", &flow_options, true, true).unwrap(), + Some(FlowType::Batching) + ); + } + + #[test] + fn test_determine_flow_type_for_source_state_instant_ttl_without_missing_sources() { + assert_eq!( + determine_flow_type_for_source_state("my_flow", &HashMap::new(), false, true).unwrap(), + Some(FlowType::Streaming) + ); + } + #[test] fn test_name_is_match() { assert!(!NAME_PATTERN_REG.is_match("/adaf")); diff --git a/tests/cases/standalone/common/flow/flow_pending.result b/tests/cases/standalone/common/flow/flow_pending.result new file mode 100644 index 000000000000..d6fe01b38a8f --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_pending.result @@ -0,0 +1,52 @@ +CREATE FLOW pending_without_defer +SINK TO pending_sink +AS SELECT val FROM pending_source; + +Error: 1004(InvalidArguments), Invalid SQL, error: missing source tables for flow 'pending_without_defer'; use WITH (defer_on_missing_source = true) to create a pending flow + +CREATE FLOW pending_with_defer +SINK TO pending_sink +WITH (defer_on_missing_source = true) +AS SELECT val FROM pending_source WHERE val > 10; + +Affected Rows: 0 + +SHOW CREATE FLOW pending_with_defer; + ++--------------------+--------------------------------------------------+ +| Flow | Create Flow | ++--------------------+--------------------------------------------------+ +| pending_with_defer | CREATE FLOW IF NOT EXISTS pending_with_defer | +| | SINK TO public.pending_sink | +| | WITH (defer_on_missing_source = 'true') | +| | AS SELECT val FROM pending_source WHERE val > 10 | ++--------------------+--------------------------------------------------+ + +SELECT + flow_definition, + source_table_ids, + source_table_names, + flownode_ids, + options LIKE '%"defer_on_missing_source":"true"%' AS has_defer_option, + options LIKE '%"flow_type":"batching"%' AS has_flow_type_option +FROM INFORMATION_SCHEMA.FLOWS +WHERE flow_name = 'pending_with_defer'; + ++--------------------------------------------------+------------------+--------------------+--------------+------------------+----------------------+ +| flow_definition | source_table_ids | source_table_names | flownode_ids | has_defer_option | has_flow_type_option | ++--------------------------------------------------+------------------+--------------------+--------------+------------------+----------------------+ +| CREATE FLOW IF NOT EXISTS pending_with_defer | [] | | {} | true | true | +| SINK TO public.pending_sink | | | | | | +| WITH (defer_on_missing_source = 'true') | | | | | | +| AS SELECT val FROM pending_source WHERE val > 10 | | | | | | ++--------------------------------------------------+------------------+--------------------+--------------+------------------+----------------------+ + +DROP FLOW pending_with_defer; + +Affected Rows: 0 + +SELECT flow_name FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'pending_with_defer'; + +++ +++ + diff --git a/tests/cases/standalone/common/flow/flow_pending.sql b/tests/cases/standalone/common/flow/flow_pending.sql new file mode 100644 index 000000000000..498f5b278250 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_pending.sql @@ -0,0 +1,24 @@ +CREATE FLOW pending_without_defer +SINK TO pending_sink +AS SELECT val FROM pending_source; + +CREATE FLOW pending_with_defer +SINK TO pending_sink +WITH (defer_on_missing_source = true) +AS SELECT val FROM pending_source WHERE val > 10; + +SHOW CREATE FLOW pending_with_defer; + +SELECT + flow_definition, + source_table_ids, + source_table_names, + flownode_ids, + options LIKE '%"defer_on_missing_source":"true"%' AS has_defer_option, + options LIKE '%"flow_type":"batching"%' AS has_flow_type_option +FROM INFORMATION_SCHEMA.FLOWS +WHERE flow_name = 'pending_with_defer'; + +DROP FLOW pending_with_defer; + +SELECT flow_name FROM INFORMATION_SCHEMA.FLOWS WHERE flow_name = 'pending_with_defer';