From ddec6061be61e10071aab363a702e69c6a76a75a Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 12 Mar 2026 20:39:05 +0800 Subject: [PATCH] fix: drop flow when replace with pending one Signed-off-by: discord9 --- src/common/meta/src/ddl/create_flow.rs | 54 +++++++++- src/common/meta/src/ddl/tests/create_flow.rs | 102 ++++++++++++++++++- 2 files changed, 152 insertions(+), 4 deletions(-) diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 69b869faa9..10dcb6562b 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -19,9 +19,11 @@ use std::fmt; use api::v1::ExpireAfter; use api::v1::flow::flow_request::Body as PbFlowRequest; -use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader}; +use api::v1::flow::{CreateRequest, DropRequest, FlowRequest, FlowRequestHeader}; use async_trait::async_trait; use common_catalog::format_full_flow_name; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_procedure::error::{FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, @@ -195,8 +197,20 @@ impl CreateFlowProcedure { self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?); if self.data.is_pending() { - self.data.peers.clear(); - self.data.state = CreateFlowState::CreateMetadata; + if self.data.task.or_replace + && self + .data + .prev_flow_info_value + .as_ref() + .is_some_and(|flow| flow.get_inner_ref().is_active()) + && !self.data.peers.is_empty() + { + // if old flow is active and have peers, need to drop old flow on flownodes before create new flow metadata. + self.data.state = CreateFlowState::DropOldFlows; + } else { + self.data.peers.clear(); + self.data.state = CreateFlowState::CreateMetadata; + } } else { if self.data.peers.is_empty() { self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?; @@ -249,6 +263,37 @@ impl CreateFlowProcedure { Ok(Status::executing(true)) } + async fn on_flownode_drop_flows(&mut self) -> Result { + let flow_id = self.data.flow_id.unwrap(); + let mut drop_flow_tasks = Vec::with_capacity(self.data.peers.len()); + for peer in &self.data.peers { + let requester = self.context.node_manager.flownode(peer).await; + let request = FlowRequest { + body: Some(PbFlowRequest::Drop(DropRequest { + flow_id: Some(api::v1::FlowId { id: flow_id }), + })), + ..Default::default() + }; + + drop_flow_tasks.push(async move { + if let Err(err) = requester.handle(request).await + && err.status_code() != StatusCode::FlowNotFound + { + return Err(add_peer_context_if_needed(peer.clone())(err)); + } + Ok(()) + }); + } + + join_all(drop_flow_tasks) + .await + .into_iter() + .collect::>>()?; + self.data.peers.clear(); + self.data.state = CreateFlowState::CreateMetadata; + Ok(Status::executing(true)) + } + /// Creates flow metadata. /// /// Abort(not-retry): @@ -341,6 +386,7 @@ impl Procedure for CreateFlowProcedure { match state { CreateFlowState::Prepare => self.on_prepare().await, CreateFlowState::CreateFlows => self.on_flownode_create_flows().await, + CreateFlowState::DropOldFlows => self.on_flownode_drop_flows().await, CreateFlowState::CreateMetadata => self.on_create_metadata().await, CreateFlowState::InvalidateFlowCache => self.on_broadcast().await, } @@ -392,6 +438,8 @@ pub enum CreateFlowState { Prepare, /// Creates flows on the flownode. CreateFlows, + /// Drops old flows on the flownode when replacing a old flow with a pending flow. + DropOldFlows, /// Invalidate flow cache. InvalidateFlowCache, /// Create metadata. diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index 0e5b40c888..cd4777494b 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -14,8 +14,10 @@ use std::assert_matches::assert_matches; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{DropRequest, FlowRequest, FlowResponse}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure::Status; use common_procedure_test::execute_procedure_until_done; @@ -31,6 +33,36 @@ use crate::key::table_route::TableRouteValue; use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext}; use crate::test_util::{MockFlownodeManager, new_ddl_context}; +#[derive(Clone, Default)] +struct RecordingDropFlownodeHandler { + drop_requests: Arc>>, +} + +#[async_trait::async_trait] +impl crate::test_util::MockFlownodeHandler for RecordingDropFlownodeHandler { + async fn handle( + &self, + _peer: &crate::peer::Peer, + request: FlowRequest, + ) -> crate::error::Result { + if let Some(PbFlowRequest::Drop(drop_req)) = request.body { + self.drop_requests.lock().unwrap().push(drop_req); + } + Ok(FlowResponse { + affected_rows: 0, + ..Default::default() + }) + } + + async fn handle_inserts( + &self, + _peer: &crate::peer::Peer, + _requests: api::v1::region::InsertRequests, + ) -> crate::error::Result { + unreachable!() + } +} + fn test_query_context() -> QueryContext { QueryContext { current_catalog: DEFAULT_CATALOG_NAME.to_string(), @@ -244,6 +276,74 @@ async fn test_replace_pending_flow_activates_with_allocated_peers() { assert!(!routes.is_empty()); } +#[tokio::test] +async fn test_replace_active_flow_to_pending_drops_old_flows() { + let existing_source_table = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_active_source_table", + ); + let missing_source_table = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_missing_source_table", + ); + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "replace_active_sink_table", + ); + + let handler = RecordingDropFlownodeHandler::default(); + let node_manager = Arc::new(MockFlownodeManager::new(handler.clone())); + let ddl_context = new_ddl_context(node_manager); + + let create_table_task = test_create_table_task("replace_active_source_table", 2026); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + + let flow_id = create_test_flow( + &ddl_context, + "replace_active_flow_to_pending", + vec![existing_source_table], + sink_table_name.clone(), + ) + .await; + + let mut replace_task = test_create_flow_task( + "replace_active_flow_to_pending", + vec![missing_source_table], + sink_table_name, + false, + ); + replace_task.or_replace = true; + let query_ctx = test_query_context(); + let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone()); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let replaced_flow_id = *output.downcast_ref::().unwrap(); + assert_eq!(replaced_flow_id, flow_id); + + let replaced_flow = ddl_context + .flow_metadata_manager + .flow_info_manager() + .get(flow_id) + .await + .unwrap() + .unwrap(); + assert!(replaced_flow.is_pending()); + + let drop_requests = handler.drop_requests.lock().unwrap(); + assert!(!drop_requests.is_empty()); + assert_eq!(drop_requests[0].flow_id.as_ref().unwrap().id, flow_id); +} + #[tokio::test] async fn test_create_flow_same_source_and_sink_table() { let table_id = 1024;