Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::FlowMetadataManager;
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
Expand Down Expand Up @@ -242,11 +242,14 @@ mod tests {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
query_context: None,
flow_name: "my_flow".to_string(),
all_source_table_names: vec![],
unresolved_source_table_names: vec![],
raw_sql: "sql".to_string(),
expire_after: Some(300),
eval_interval_secs: None,
comment: "comment".to_string(),
options: Default::default(),
status: FlowStatus::Active,
created_time: chrono::Utc::now(),
updated_time: chrono::Utc::now(),
},
Expand Down
131 changes: 124 additions & 7 deletions src/common/meta/src/ddl/create_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

mod metadata;

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt;

use api::v1::ExpireAfter;
Expand All @@ -34,13 +34,14 @@ use serde::{Deserialize, Serialize};
use snafu::{ResultExt, ensure};
use strum::AsRefStr;
use table::metadata::TableId;
use table::table_name::TableName;

use crate::cache_invalidator::Context;
use crate::ddl::DdlContext;
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
use crate::error::{self, Result, UnexpectedSnafu};
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::flow_info::{FlowInfoValue, FlowStatus};
use crate::key::flow::flow_route::FlowRouteValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
Expand All @@ -67,6 +68,7 @@ impl CreateFlowProcedure {
flow_id: None,
peers: vec![],
source_table_ids: vec![],
unresolved_source_table_names: vec![],
flow_context: query_context.into(), // Convert to FlowQueryContext
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
Expand All @@ -89,6 +91,8 @@ impl CreateFlowProcedure {
let create_if_not_exists = self.data.task.create_if_not_exists;
let or_replace = self.data.task.or_replace;

validate_flow_options(&self.data.task)?;

let flow_name_value = self
.context
.flow_metadata_manager
Expand Down Expand Up @@ -167,6 +171,21 @@ impl CreateFlowProcedure {
}

self.collect_source_tables().await?;
ensure!(
self.data.unresolved_source_table_names.is_empty()
|| defer_on_missing_source(&self.data.task)?,
error::UnsupportedSnafu {
operation: format!(
"Create flow with missing source tables requires WITH ('{DEFER_ON_MISSING_SOURCE_KEY}'='true'): {}",
self.data
.unresolved_source_table_names
.iter()
.map(ToString::to_string)
.join(", ")
)
}
);
self.ensure_supported_replace_transition()?;

// Validate that source and sink tables are not the same
let sink_table_name = &self.data.task.sink_table_name;
Expand All @@ -189,13 +208,38 @@ impl CreateFlowProcedure {
if self.data.flow_id.is_none() {
self.allocate_flow_id().await?;
}
self.data.state = CreateFlowState::CreateFlows;
// determine flow type
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);

self.data.state = if self.data.is_pending() {
self.data.peers.clear();
CreateFlowState::CreateMetadata
} else {
CreateFlowState::CreateFlows
};

Ok(Status::executing(true))
}

fn ensure_supported_replace_transition(&self) -> Result<()> {
if !self.data.task.or_replace {
return Ok(());
}

let Some(prev_flow_info) = self.data.prev_flow_info_value.as_ref() else {
return Ok(());
};
let prev_pending = prev_flow_info.get_inner_ref().is_pending();
let new_pending = self.data.is_pending();
ensure!(
prev_pending == new_pending,
error::UnsupportedSnafu {
operation: "Replacing between pending and active flow states is not supported yet"
}
);

Ok(())
}

async fn on_flownode_create_flows(&mut self) -> Result<Status> {
// Safety: must be allocated.
let mut create_flow = Vec::with_capacity(self.data.peers.len());
Expand Down Expand Up @@ -365,6 +409,60 @@ pub fn get_flow_type_from_options(flow_task: &CreateFlowTask) -> Result<FlowType
}
}

pub const DEFER_ON_MISSING_SOURCE_KEY: &str = "defer_on_missing_source";

pub fn defer_on_missing_source(flow_task: &CreateFlowTask) -> Result<bool> {
flow_task
.flow_options
.get(DEFER_ON_MISSING_SOURCE_KEY)
.map(|value| {
value
.trim()
.to_ascii_lowercase()
.parse::<bool>()
.map_err(|_| {
error::UnexpectedSnafu {
err_msg: format!(
"Invalid flow option '{DEFER_ON_MISSING_SOURCE_KEY}': {value}"
),
}
.build()
})
})
.transpose()
.map(|value| value.unwrap_or(false))
}

pub fn validate_flow_options(flow_task: &CreateFlowTask) -> Result<()> {
for key in flow_task.flow_options.keys() {
match key.as_str() {
DEFER_ON_MISSING_SOURCE_KEY | FlowType::FLOW_TYPE_KEY => {}
unknown => {
return UnexpectedSnafu {
err_msg: format!(
"Unknown flow option '{unknown}', supported user options: {DEFER_ON_MISSING_SOURCE_KEY}"
),
}
.fail();
}
}
}

defer_on_missing_source(flow_task)?;
get_flow_type_from_options(flow_task)?;
Ok(())
}

fn user_runtime_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
let mut options = options.clone();
options.remove(DEFER_ON_MISSING_SOURCE_KEY);
options
}

fn metadata_flow_options(options: &HashMap<String, String>) -> HashMap<String, String> {
options.clone()
}

/// The state of [CreateFlowProcedure].
#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)]
pub enum CreateFlowState {
Expand Down Expand Up @@ -411,6 +509,8 @@ pub struct CreateFlowData {
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
#[serde(default)]
pub(crate) unresolved_source_table_names: Vec<TableName>,
/// Use alias for backward compatibility with QueryContext serialized data
#[serde(alias = "query_context")]
pub(crate) flow_context: FlowQueryContext,
Expand All @@ -424,6 +524,16 @@ pub struct CreateFlowData {
pub(crate) flow_type: Option<FlowType>,
}

impl CreateFlowData {
pub(crate) fn is_pending(&self) -> bool {
!self.unresolved_source_table_names.is_empty()
}

pub(crate) fn is_active(&self) -> bool {
!self.is_pending()
}
}

impl From<&CreateFlowData> for CreateRequest {
fn from(value: &CreateFlowData) -> Self {
let flow_id = value.flow_id.unwrap();
Expand All @@ -446,7 +556,7 @@ impl From<&CreateFlowData> for CreateRequest {
.map(|seconds| api::v1::EvalInterval { seconds }),
comment: value.task.comment.clone(),
sql: value.task.sql.clone(),
flow_options: value.task.flow_options.clone(),
flow_options: user_runtime_flow_options(&value.task.flow_options),
};

let flow_type = value.flow_type.unwrap_or_default().to_string();
Expand All @@ -466,9 +576,9 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
eval_interval_secs: eval_interval,
comment,
sql,
flow_options: mut options,
..
} = value.task.clone();
let mut options = metadata_flow_options(&value.task.flow_options);

let flownode_ids = value
.peers
Expand All @@ -484,7 +594,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
.collect::<Vec<_>>();

let flow_type = value.flow_type.unwrap_or_default().to_string();
options.insert("flow_type".to_string(), flow_type);
options.insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type);

let mut create_time = chrono::Utc::now();
if let Some(prev_flow_value) = value.prev_flow_info_value.as_ref()
Expand All @@ -495,6 +605,8 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa

let flow_info: FlowInfoValue = FlowInfoValue {
source_table_ids: value.source_table_ids.clone(),
all_source_table_names: value.task.source_table_names.clone(),
unresolved_source_table_names: value.unresolved_source_table_names.clone(),
sink_table_name,
flownode_ids,
catalog_name,
Expand All @@ -506,6 +618,11 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
eval_interval_secs: eval_interval,
comment,
options,
status: if value.is_active() {
FlowStatus::Active
} else {
FlowStatus::PendingSources
},
created_time: create_time,
updated_time: chrono::Utc::now(),
};
Expand Down
29 changes: 14 additions & 15 deletions src/common/meta/src/ddl/create_flow/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use snafu::OptionExt;

use crate::ddl::create_flow::CreateFlowProcedure;
use crate::error::{self, Result};
use crate::error::Result;
use crate::key::table_name::TableNameKey;

impl CreateFlowProcedure {
Expand All @@ -34,9 +32,8 @@ impl CreateFlowProcedure {
Ok(())
}

/// Ensures all source tables exist and collects source table ids
/// Collects source table ids and keeps track of missing tables.
pub(crate) async fn collect_source_tables(&mut self) -> Result<()> {
// Ensures all source tables exist.
let keys = self
.data
.task
Expand All @@ -52,22 +49,24 @@ impl CreateFlowProcedure {
.batch_get(keys)
.await?;

let source_table_ids = self
let mut resolved = Vec::with_capacity(self.data.task.source_table_names.len());
let mut unresolved = Vec::new();

for (name, table_id) in self
.data
.task
.source_table_names
.iter()
.zip(source_table_ids)
.map(|(name, table_id)| {
Ok(table_id
.with_context(|| error::TableNotFoundSnafu {
table_name: name.to_string(),
})?
.table_id())
})
.collect::<Result<Vec<_>>>()?;
{
match table_id {
Some(table_id) => resolved.push(table_id.table_id()),
None => unresolved.push(name.clone()),
}
}

self.data.source_table_ids = source_table_ids;
self.data.source_table_ids = resolved;
self.data.unresolved_source_table_names = unresolved;
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/common/meta/src/ddl/drop_flow/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl DropFlowProcedure {
.map(|(_, value)| value)
.collect::<Vec<_>>();
ensure!(
!flow_route_values.is_empty(),
flow_info_value.is_pending() || !flow_route_values.is_empty(),
error::FlowRouteNotFoundSnafu {
flow_name: format_full_flow_name(catalog_name, flow_name),
}
Expand Down
Loading
Loading