diff --git a/Cargo.lock b/Cargo.lock index 1f65f1289c..4099c94a52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12578,6 +12578,7 @@ dependencies = [ "tokio", "tokio-postgres", "tokio-stream", + "toml 0.8.23", ] [[package]] diff --git a/src/common/meta/src/ddl/activate_flow.rs b/src/common/meta/src/ddl/activate_flow.rs index 25498bdfbd..e39bc72418 100644 --- a/src/common/meta/src/ddl/activate_flow.rs +++ b/src/common/meta/src/ddl/activate_flow.rs @@ -44,6 +44,11 @@ use crate::lock_key::{CatalogLock, FlowLock, FlowNameLock}; use crate::metrics; use crate::peer::Peer; +/// Activates a pending flow after all source tables become available. +/// +/// The procedure resolves missing source tables, validates the final activation +/// mode, creates the runtime flow on flownodes, updates metadata to active, and +/// invalidates related caches. pub struct ActivatePendingFlowProcedure { pub context: DdlContext, pub data: ActivatePendingFlowData, @@ -104,6 +109,10 @@ impl ActivatePendingFlowProcedure { .get_raw(self.data.flow_id) .await? else { + common_telemetry::debug!( + "Pending flow {} no longer exists during activation", + self.data.flow_id + ); return Ok(Status::done()); }; @@ -123,6 +132,12 @@ impl ActivatePendingFlowProcedure { let resolution = resolve_pending_flow_sources(&self.context, current_flow_info.get_inner_ref()).await?; + common_telemetry::debug!( + "Resolved pending flow {} source tables: {} resolved, {} unresolved", + self.data.flow_id, + resolution.resolved_table_ids.len(), + resolution.unresolved_source_table_names.len() + ); if !resolution.unresolved_source_table_names.is_empty() { update_pending_flow_metadata( &self.context, @@ -136,19 +151,21 @@ impl ActivatePendingFlowProcedure { return Ok(Status::done()); } - if let Some(reason) = - validate_batching_activation(&self.context, &resolution.resolved_table_ids).await? 
- { - update_pending_flow_metadata( - &self.context, - self.data.flow_id, - &current_flow_info, - resolution.resolved_table_ids, - vec![], - Some(reason), - ) - .await?; - return Ok(Status::done()); + if get_flow_type(current_flow_info.get_inner_ref()) == FlowType::Batching { + if let Some(reason) = + validate_batching_activation(&self.context, &resolution.resolved_table_ids).await? + { + update_pending_flow_metadata( + &self.context, + self.data.flow_id, + &current_flow_info, + resolution.resolved_table_ids, + vec![], + Some(reason), + ) + .await?; + return Ok(Status::done()); + } } self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?; @@ -172,6 +189,11 @@ impl ActivatePendingFlowProcedure { flow_info.get_inner_ref(), &self.data.resolved_table_ids, ); + debug_assert_eq!( + self.data.resolved_table_ids.len(), + flow_info.get_inner_ref().all_source_table_names().len(), + "All source tables must be resolved before pending flow activation" + ); create_flow_on_peers( &self.context, &self.data.peers, @@ -258,12 +280,12 @@ impl Procedure for ActivatePendingFlowProcedure { // we should lock the flow name to avoid concurrent modification with create/replace flow procedure LockKey::new(vec![ CatalogLock::Read(&self.data.catalog_name).into(), - FlowLock::Write(self.data.flow_id).into(), FlowNameLock::new( flow_info.get_inner_ref().catalog_name(), flow_info.get_inner_ref().flow_name(), ) .into(), + FlowLock::Write(self.data.flow_id).into(), ]) } else { LockKey::new(vec![ @@ -368,7 +390,7 @@ fn build_create_request( let mut flow_options = flow_info.options.clone(); flow_options.insert( FlowType::FLOW_TYPE_KEY.to_string(), - FlowType::Batching.to_string(), + get_flow_type(flow_info).to_string(), ); CreateRequest { flow_id: Some(api::v1::FlowId { id: flow_id }), @@ -425,6 +447,7 @@ fn build_active_flow_info( peers: &[Peer], resolved_table_ids: &[TableId], ) -> (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteValue)>) { + let flow_type = 
get_flow_type(current_flow_info); let flownode_ids = peers .iter() .enumerate() @@ -442,15 +465,25 @@ fn build_active_flow_info( new_flow_info.flownode_ids = flownode_ids; new_flow_info.status = FlowStatus::Active; new_flow_info.last_activation_error = None; - new_flow_info.options.insert( - FlowType::FLOW_TYPE_KEY.to_string(), - FlowType::Batching.to_string(), - ); + new_flow_info + .options + .insert(FlowType::FLOW_TYPE_KEY.to_string(), flow_type.to_string()); new_flow_info.updated_time = chrono::Utc::now(); (new_flow_info, flow_routes) } +fn get_flow_type(flow_info: &FlowInfoValue) -> FlowType { + match flow_info + .options() + .get(FlowType::FLOW_TYPE_KEY) + .map(String::as_str) + { + Some(FlowType::STREAMING) => FlowType::Streaming, + _ => FlowType::Batching, + } +} + async fn invalidate_flow_cache( context: &DdlContext, flow_id: FlowId, diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 658dc1cd04..242cad0ceb 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -438,7 +438,8 @@ pub enum CreateFlowState { Prepare, /// Creates flows on the flownode. CreateFlows, - /// (Optional, only when replacing a old flow with a pending flow)Drops old flows on the flownode. + /// Optional state used when replacing an old flow with a pending flow. + /// Drops old flows on the flownode. DropOldFlows, /// Invalidate flow cache. 
InvalidateFlowCache, diff --git a/src/common/meta/src/ddl/tests/activate_flow.rs b/src/common/meta/src/ddl/tests/activate_flow.rs index 4b30936ede..14744c9122 100644 --- a/src/common/meta/src/ddl/tests/activate_flow.rs +++ b/src/common/meta/src/ddl/tests/activate_flow.rs @@ -22,6 +22,7 @@ use common_time::TimeToLive; use table::table_name::TableName; use crate::ddl::activate_flow::ActivatePendingFlowProcedure; +use crate::ddl::create_flow::FlowType; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::tests::create_flow::create_test_flow; use crate::key::table_route::TableRouteValue; @@ -256,3 +257,94 @@ async fn test_activate_pending_flow_uses_replace_semantics() { assert!(req.or_replace); } } + +#[tokio::test] +async fn test_activate_pending_flow_preserves_streaming_type() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_streaming_source_table", + )]; + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_streaming_sink_table", + ); + + let handler = RecordingFlownodeHandler::default(); + let node_manager = Arc::new(MockFlownodeManager::new(handler.clone())); + let ddl_context = new_ddl_context(node_manager); + + let flow_id = create_test_flow( + &ddl_context, + "activate_streaming_pending_flow", + source_table_names, + sink_table_name, + ) + .await; + + let pending_flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get_raw(flow_id) + .await + .unwrap() + .unwrap(); + let mut updated_flow_info = pending_flow_info.get_inner_ref().clone(); + updated_flow_info.options.insert( + FlowType::FLOW_TYPE_KEY.to_string(), + FlowType::Streaming.to_string(), + ); + ddl_context + .flow_metadata_manager + .update_flow_metadata(flow_id, &pending_flow_info, &updated_flow_info, vec![]) + .await + .unwrap(); + + let create_table_task = test_create_table_task("activate_streaming_source_table", 1028); + ddl_context + 
.table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + Default::default(), + ) + .await + .unwrap(); + + let mut procedure = ActivatePendingFlowProcedure::new( + flow_id, + DEFAULT_CATALOG_NAME.to_string(), + ddl_context.clone(), + ); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let activated_flow_id = output.downcast_ref::<FlowId>().unwrap(); + assert_eq!(*activated_flow_id, flow_id); + + let activated_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(activated_flow.is_active()); + assert_eq!( + activated_flow + .options() + .get(FlowType::FLOW_TYPE_KEY) + .map(String::as_str), + Some(FlowType::STREAMING) + ); + + let create_requests = handler.create_requests.lock().unwrap(); + assert!(!create_requests.is_empty()); + for req in create_requests.iter() { + assert_eq!( + req.flow_options + .get(FlowType::FLOW_TYPE_KEY) + .map(String::as_str), + Some(FlowType::STREAMING) + ); + } +} diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index cd4777494b..3c507806d7 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -124,6 +124,10 @@ async fn test_create_flow_source_table_not_found() { .unwrap(); assert!(flow_info.is_pending()); assert_eq!(flow_info.unresolved_source_table_names().len(), 1); + assert_eq!( + flow_info.unresolved_source_table_names()[0].table_name, + "my_table" + ); } pub(crate) async fn create_test_flow( diff --git a/src/meta-srv/src/flow.rs b/src/meta-srv/src/flow.rs index ea8f9af01c..8f7c22b9b1 100644 --- a/src/meta-srv/src/flow.rs +++ b/src/meta-srv/src/flow.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use common_meta::ddl_manager::DdlManagerRef; +use common_meta::key::table_name::TableNameKey; use common_telemetry::{error, info}; use 
futures::TryStreamExt; use snafu::ResultExt; @@ -79,41 +80,84 @@ impl PendingFlowReconcileManager { async fn handle_tick(&self) -> Result<()> { let ddl_context = self.ddl_manager.create_context(); - let flow_infos = ddl_context + let ddl_manager = self.ddl_manager.clone(); + ddl_context .flow_metadata_manager .flow_info_manager() .flow_infos() - .try_collect::<Vec<_>>() + .try_for_each(move |(flow_id, flow_info)| { + let ddl_context = ddl_context.clone(); + let ddl_manager = ddl_manager.clone(); + async move { + if !flow_info.is_pending() { + return Ok(()); + } + + let current_flow_info = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get_raw(flow_id) + .await; + let current_flow_info = match current_flow_info { + Ok(current_flow_info) => current_flow_info, + Err(e) => { + error!(e; "Failed to load flow metadata for pending flow {}", flow_id); + return Ok(()); + } + }; + let Some(current_flow_info) = current_flow_info else { + return Ok(()); + }; + if !current_flow_info.get_inner_ref().is_pending() { + return Ok(()); + } + + let unresolved_source_table_names = current_flow_info + .get_inner_ref() + .unresolved_source_table_names(); + if !unresolved_source_table_names.is_empty() { + let unresolved_table_keys = unresolved_source_table_names + .iter() + .map(|name| { + TableNameKey::new( + &name.catalog_name, + &name.schema_name, + &name.table_name, + ) + }) + .collect::<Vec<_>>(); + let resolved_tables = ddl_context + .table_metadata_manager + .table_name_manager() + .batch_get(unresolved_table_keys) + .await; + let resolved_tables = match resolved_tables { + Ok(resolved_tables) => resolved_tables, + Err(e) => { + error!(e; "Failed to resolve source tables for pending flow {}", flow_id); + return Ok(()); + } + }; + if resolved_tables.iter().all(|table_id| table_id.is_none()) { + return Ok(()); + } + } + + if let Err(e) = ddl_manager + .submit_activate_pending_flow_task( + flow_id, + current_flow_info.get_inner_ref().catalog_name().clone(), + ) + .await + { + 
error!(e; "Failed to reconcile pending flow {}", flow_id); + } + + Ok(()) + } + }) .await .context(PendingFlowReconcileSnafu)?; - let pending_flows = flow_infos - .into_iter() - .filter_map(|(flow_id, flow_info)| { - flow_info - .is_pending() - .then_some((flow_id, flow_info.catalog_name().clone())) - }) - .collect::<Vec<_>>(); - - for (flow_id, catalog_name) in pending_flows { - let current_flow_info = ddl_context - .flow_metadata_manager - .flow_info_manager() - .get_raw(flow_id) - .await - .context(PendingFlowReconcileSnafu)?; - let Some(current_flow_info) = current_flow_info else { - continue; - }; - if !current_flow_info.get_inner_ref().is_pending() { - continue; - } - let _ = self - .ddl_manager - .submit_activate_pending_flow_task(flow_id, catalog_name) - .await - .context(PendingFlowReconcileSnafu)?; - } Ok(()) } diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 37e2f9f93b..24e041dfa4 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -657,8 +657,10 @@ impl StatementExecutor { } - /// Determine the flow type based on the SQL query + /// Determines the flow type from source-table state and SQL shape. /// - /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow + /// Missing source tables keep the flow pending and default it to batching. + /// Otherwise, aggregation, distinct, and TQL queries are batching; + /// the rest are streaming. 
async fn determine_flow_type( &self, expr: &CreateFlowExpr, @@ -700,6 +702,10 @@ impl StatementExecutor { } if has_missing_source_table { + info!( + "Flow `{}` defaults to batching because some source tables are not available yet", + expr.flow_name + ); return Ok(FlowType::Batching); } diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index db1d1402aa..6d01aed5ab 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -32,3 +32,4 @@ tinytemplate = "1.2" tokio.workspace = true tokio-postgres = { workspace = true } tokio-stream.workspace = true +toml.workspace = true diff --git a/tests/runner/src/util.rs b/tests/runner/src/util.rs index 6b1c25398f..1eb75e6466 100644 --- a/tests/runner/src/util.rs +++ b/tests/runner/src/util.rs @@ -441,27 +441,24 @@ pub fn get_workspace_root() -> String { } fn get_target_dir_from_cargo_config() -> Option<PathBuf> { - let workspace_root = get_workspace_root(); - let output = Command::new("cargo") - .current_dir(&workspace_root) - .args([ - "config", - "get", - "build.target-dir", - "-Z", - "unstable-options", - "--format", - "json-value", - ]) - .output() - .ok()?; - - if !output.status.success() { - return None; + if let Ok(target_dir) = std::env::var("CARGO_TARGET_DIR") { + let target_dir = PathBuf::from(target_dir); + return if target_dir.is_absolute() { + Some(target_dir) + } else { + Some(PathBuf::from(get_workspace_root()).join(target_dir)) + }; } - let target_dir = std::str::from_utf8(&output.stdout).ok()?.trim(); - let target_dir: String = serde_json::from_str(target_dir).ok()?; + let workspace_root = get_workspace_root(); + let cargo_config = PathBuf::from(&workspace_root).join(".cargo/config.toml"); + let cargo_config = std::fs::read_to_string(cargo_config).ok()?; + let cargo_config: toml::Value = toml::from_str(&cargo_config).ok()?; + let target_dir = cargo_config + .get("build")? + .get("target-dir")? + .as_str()? + .to_string(); let target_dir = PathBuf::from(target_dir); if target_dir.is_absolute() {