From 47a267e29c7c852e19261b1c0f7f8ed43b047d11 Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 18 Apr 2025 21:04:56 +0800 Subject: [PATCH] fix: add locks for create/drop flow&docs: update docs --- src/cmd/src/standalone.rs | 3 +- src/flow/src/adapter/flownode_impl.rs | 90 ++++++++++--------- src/flow/src/batching_mode/frontend_client.rs | 1 + src/flow/src/batching_mode/utils.rs | 1 - 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5d15e36e1a..b16a57a575 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -524,9 +524,8 @@ impl StartCommand { ..Default::default() }; - // TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without + // for standalone not use grpc, but get a handler to frontend grpc client without // actually make a connection - let fe_server_addr = fe_opts.grpc.bind_addr.clone(); let (frontend_client, frontend_instance_handler) = FrontendClient::from_empty_grpc_handler(); let flow_builder = FlownodeBuilder::new( diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 90f33ddbfb..af557c8404 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -33,7 +33,7 @@ use itertools::Itertools; use session::context::QueryContextBuilder; use snafu::{IntoError, OptionExt, ResultExt}; use store_api::storage::{RegionId, TableId}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, RwLock}; use crate::adapter::{CreateFlowArgs, FlowWorkerManager}; use crate::batching_mode::engine::BatchingEngine; @@ -57,7 +57,8 @@ pub struct FlowDualEngine { streaming_engine: Arc, batching_engine: Arc, /// helper struct for faster query flow by table id or vice versa - src_table2flow: std::sync::RwLock, + /// need to also use as a lock so use tokio RwLock + src_table2flow: RwLock, flow_metadata_manager: Arc, catalog_manager: Arc, check_task: tokio::sync::Mutex>, @@ -73,7 +74,7 @@ impl FlowDualEngine { Self { streaming_engine, batching_engine, - src_table2flow: std::sync::RwLock::new(SrcTableToFlow::default()), + src_table2flow: RwLock::new(SrcTableToFlow::default()), flow_metadata_manager, catalog_manager, check_task: Mutex::new(None), @@ -88,6 +89,36 @@ impl FlowDualEngine { self.batching_engine.clone() } + /// Try to sync with check task, this is only used in drop flow&flush flow, so a flow id is required + /// + /// the need to sync is to make sure flush flow actually get called + async fn try_sync_with_check_task( + &self, + flow_id: FlowId, + allow_drop: bool, + ) -> Result<(), Error> { + // this function rarely get called so adding some log is helpful + info!("Try to sync with check task for flow {}", flow_id); + let mut retry = 0; + let max_retry = 10; + // keep trying to trigger consistent check + while retry < max_retry { + if let Some(task) = self.check_task.lock().await.as_ref() { + task.trigger(false, allow_drop).await?; + break; + } + retry += 1; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + if retry == max_retry { + return FlowNotFoundSnafu { id: flow_id }.fail(); + } + info!("Successfully sync with check task for flow {}", flow_id); + + Ok(()) + } + /// Spawn a task to consistently check if all flow tasks in metasrv is created on flownode, /// so on startup, this will create all missing flow tasks, and constantly check at a interval async fn check_flow_consistent( @@ -427,21 +458,21 @@ impl FlowEngine for FlowDualEngine { let flow_id = args.flow_id; let src_table_ids = args.source_table_ids.clone(); + let mut src_table2flow = self.src_table2flow.write().await; let res = match flow_type { FlowType::Batching => self.batching_engine.create_flow(args).await, FlowType::Streaming => self.streaming_engine.create_flow(args).await, }?; - self.src_table2flow - .write() - .unwrap() - .add_flow(flow_id, flow_type, src_table_ids); + src_table2flow.add_flow(flow_id, flow_type, src_table_ids); Ok(res) } async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> { - let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); + let mut src_table2flow = self.src_table2flow.write().await; + let flow_type = src_table2flow.get_flow_type(flow_id); + match flow_type { Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await, @@ -453,56 +484,29 @@ impl FlowEngine for FlowDualEngine { "Flow {} is not exist in the underlying engine, but exist in metadata", flow_id ); - let mut retry = 0; - let max_retry = 10; - // keep trying to trigger consistent check - while retry < max_retry { - if let Some(task) = self.check_task.lock().await.as_ref() { - task.trigger(false, true).await?; - break; - } - retry += 1; - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - - if retry == max_retry { - error!( - "Failed to trigger consistent check after {} retries while dropping flow {}", - max_retry, flow_id - ); - return FlowNotFoundSnafu { id: flow_id }.fail(); - } + self.try_sync_with_check_task(flow_id, true).await?; Ok(()) } }?; // remove mapping - self.src_table2flow.write().unwrap().remove_flow(flow_id); + src_table2flow.remove_flow(flow_id); Ok(()) } async fn flush_flow(&self, flow_id: FlowId) -> Result { - let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); + // sync with check task + self.try_sync_with_check_task(flow_id, false).await?; + let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id); match flow_type { Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await, Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await, - None => { - // this might happen if flownode only just started - if self.flow_exist_in_metadata(flow_id).await? { - warn!( - "Flow {} is not exist in the underlying engine, but exist in metadata", - flow_id - ); - Ok(0) - } else { - FlowNotFoundSnafu { id: flow_id }.fail() - } - } + None => Ok(0), } } async fn flow_exist(&self, flow_id: FlowId) -> Result { - let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id); + let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id); // not using `flow_type.is_some()` to make sure the flow is actually exist in the underlying engine match flow_type { Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await, @@ -527,7 +531,7 @@ impl FlowEngine for FlowDualEngine { let mut to_batch_engine = request.requests; { - let src_table2flow = self.src_table2flow.read().unwrap(); + let src_table2flow = self.src_table2flow.read().await; to_batch_engine.retain(|req| { let region_id = RegionId::from(req.region_id); let table_id = region_id.table_id(); diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index e7fe37ba05..4824539056 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -85,6 +85,7 @@ pub enum FrontendClient { } impl FrontendClient { + /// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) { let handler = Arc::new(std::sync::Mutex::new(None)); ( diff --git a/src/flow/src/batching_mode/utils.rs b/src/flow/src/batching_mode/utils.rs index ca3f0b5d1d..eba790bd65 100644 --- a/src/flow/src/batching_mode/utils.rs +++ b/src/flow/src/batching_mode/utils.rs @@ -256,7 +256,6 @@ impl TreeNodeRewriter for AddAutoColumnRewriter { node = projection; } - // FIXME(discord9): just read plan.expr and do stuffs let mut exprs = node.expressions(); let all_names = self .schema