fix: drop flow when replace with pending one

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-12 20:39:05 +08:00
parent 4a5cbe244e
commit ddec6061be
2 changed files with 152 additions and 4 deletions

View File

@@ -19,9 +19,11 @@ use std::fmt;
use api::v1::ExpireAfter;
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
use api::v1::flow::{CreateRequest, DropRequest, FlowRequest, FlowRequestHeader};
use async_trait::async_trait;
use common_catalog::format_full_flow_name;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
@@ -195,8 +197,20 @@ impl CreateFlowProcedure {
self.data.flow_type = Some(get_flow_type_from_options(&self.data.task)?);
if self.data.is_pending() {
self.data.peers.clear();
self.data.state = CreateFlowState::CreateMetadata;
if self.data.task.or_replace
&& self
.data
.prev_flow_info_value
.as_ref()
.is_some_and(|flow| flow.get_inner_ref().is_active())
&& !self.data.peers.is_empty()
{
// if old flow is active and have peers, need to drop old flow on flownodes before create new flow metadata.
self.data.state = CreateFlowState::DropOldFlows;
} else {
self.data.peers.clear();
self.data.state = CreateFlowState::CreateMetadata;
}
} else {
if self.data.peers.is_empty() {
self.data.peers = self.context.flow_metadata_allocator.alloc_peers(1).await?;
@@ -249,6 +263,37 @@ impl CreateFlowProcedure {
Ok(Status::executing(true))
}
async fn on_flownode_drop_flows(&mut self) -> Result<Status> {
let flow_id = self.data.flow_id.unwrap();
let mut drop_flow_tasks = Vec::with_capacity(self.data.peers.len());
for peer in &self.data.peers {
let requester = self.context.node_manager.flownode(peer).await;
let request = FlowRequest {
body: Some(PbFlowRequest::Drop(DropRequest {
flow_id: Some(api::v1::FlowId { id: flow_id }),
})),
..Default::default()
};
drop_flow_tasks.push(async move {
if let Err(err) = requester.handle(request).await
&& err.status_code() != StatusCode::FlowNotFound
{
return Err(add_peer_context_if_needed(peer.clone())(err));
}
Ok(())
});
}
join_all(drop_flow_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
self.data.peers.clear();
self.data.state = CreateFlowState::CreateMetadata;
Ok(Status::executing(true))
}
/// Creates flow metadata.
///
/// Abort(not-retry):
@@ -341,6 +386,7 @@ impl Procedure for CreateFlowProcedure {
match state {
CreateFlowState::Prepare => self.on_prepare().await,
CreateFlowState::CreateFlows => self.on_flownode_create_flows().await,
CreateFlowState::DropOldFlows => self.on_flownode_drop_flows().await,
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
}
@@ -392,6 +438,8 @@ pub enum CreateFlowState {
Prepare,
/// Creates flows on the flownode.
CreateFlows,
/// Drops old flows on the flownode when replacing a old flow with a pending flow.
DropOldFlows,
/// Invalidate flow cache.
InvalidateFlowCache,
/// Create metadata.

View File

@@ -14,8 +14,10 @@
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{DropRequest, FlowRequest, FlowResponse};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::Status;
use common_procedure_test::execute_procedure_until_done;
@@ -31,6 +33,36 @@ use crate::key::table_route::TableRouteValue;
use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
use crate::test_util::{MockFlownodeManager, new_ddl_context};
#[derive(Clone, Default)]
struct RecordingDropFlownodeHandler {
drop_requests: Arc<Mutex<Vec<DropRequest>>>,
}
#[async_trait::async_trait]
impl crate::test_util::MockFlownodeHandler for RecordingDropFlownodeHandler {
async fn handle(
&self,
_peer: &crate::peer::Peer,
request: FlowRequest,
) -> crate::error::Result<FlowResponse> {
if let Some(PbFlowRequest::Drop(drop_req)) = request.body {
self.drop_requests.lock().unwrap().push(drop_req);
}
Ok(FlowResponse {
affected_rows: 0,
..Default::default()
})
}
async fn handle_inserts(
&self,
_peer: &crate::peer::Peer,
_requests: api::v1::region::InsertRequests,
) -> crate::error::Result<FlowResponse> {
unreachable!()
}
}
fn test_query_context() -> QueryContext {
QueryContext {
current_catalog: DEFAULT_CATALOG_NAME.to_string(),
@@ -244,6 +276,74 @@ async fn test_replace_pending_flow_activates_with_allocated_peers() {
assert!(!routes.is_empty());
}
#[tokio::test]
async fn test_replace_active_flow_to_pending_drops_old_flows() {
let existing_source_table = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"replace_active_source_table",
);
let missing_source_table = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"replace_missing_source_table",
);
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"replace_active_sink_table",
);
let handler = RecordingDropFlownodeHandler::default();
let node_manager = Arc::new(MockFlownodeManager::new(handler.clone()));
let ddl_context = new_ddl_context(node_manager);
let create_table_task = test_create_table_task("replace_active_source_table", 2026);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let flow_id = create_test_flow(
&ddl_context,
"replace_active_flow_to_pending",
vec![existing_source_table],
sink_table_name.clone(),
)
.await;
let mut replace_task = test_create_flow_task(
"replace_active_flow_to_pending",
vec![missing_source_table],
sink_table_name,
false,
);
replace_task.or_replace = true;
let query_ctx = test_query_context();
let mut procedure = CreateFlowProcedure::new(replace_task, query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let replaced_flow_id = *output.downcast_ref::<FlowId>().unwrap();
assert_eq!(replaced_flow_id, flow_id);
let replaced_flow = ddl_context
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.unwrap()
.unwrap();
assert!(replaced_flow.is_pending());
let drop_requests = handler.drop_requests.lock().unwrap();
assert!(!drop_requests.is_empty());
assert_eq!(drop_requests[0].flow_id.as_ref().unwrap().id, flow_id);
}
#[tokio::test]
async fn test_create_flow_same_source_and_sink_table() {
let table_id = 1024;