fix: add locks for create/drop flow & docs: update docs

discord9
2025-04-18 21:04:56 +08:00
parent fa13d06fc6
commit 47a267e29c
4 changed files with 49 additions and 46 deletions

View File

@@ -524,9 +524,8 @@ impl StartCommand {
..Default::default()
};
// TODO(discord9): for standalone not use grpc, but just somehow get a handler to frontend grpc client without
// for standalone, don't use grpc, but get a handler to the frontend grpc client without
// actually making a connection
let fe_server_addr = fe_opts.grpc.bind_addr.clone();
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
let flow_builder = FlownodeBuilder::new(

View File

@@ -33,7 +33,7 @@ use itertools::Itertools;
use session::context::QueryContextBuilder;
use snafu::{IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::sync::Mutex;
use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, FlowWorkerManager};
use crate::batching_mode::engine::BatchingEngine;
@@ -57,7 +57,8 @@ pub struct FlowDualEngine {
streaming_engine: Arc<FlowWorkerManager>,
batching_engine: Arc<BatchingEngine>,
/// helper struct for faster query flow by table id or vice versa
src_table2flow: std::sync::RwLock<SrcTableToFlow>,
/// this also needs to serve as a lock, so use tokio's RwLock
src_table2flow: RwLock<SrcTableToFlow>,
flow_metadata_manager: Arc<FlowMetadataManager>,
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
@@ -73,7 +74,7 @@ impl FlowDualEngine {
Self {
streaming_engine,
batching_engine,
src_table2flow: std::sync::RwLock::new(SrcTableToFlow::default()),
src_table2flow: RwLock::new(SrcTableToFlow::default()),
flow_metadata_manager,
catalog_manager,
check_task: Mutex::new(None),
@@ -88,6 +89,36 @@ impl FlowDualEngine {
self.batching_engine.clone()
}
/// Try to sync with the check task. This is only used in drop flow & flush flow, so a flow id is required
///
/// The sync is needed to make sure flush flow actually gets called
async fn try_sync_with_check_task(
&self,
flow_id: FlowId,
allow_drop: bool,
) -> Result<(), Error> {
// this function rarely gets called, so adding some logging is helpful
info!("Try to sync with check task for flow {}", flow_id);
let mut retry = 0;
let max_retry = 10;
// keep trying to trigger consistent check
while retry < max_retry {
if let Some(task) = self.check_task.lock().await.as_ref() {
task.trigger(false, allow_drop).await?;
break;
}
retry += 1;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
if retry == max_retry {
return FlowNotFoundSnafu { id: flow_id }.fail();
}
info!("Successfully sync with check task for flow {}", flow_id);
Ok(())
}
/// Spawn a task that keeps checking whether all flow tasks in metasrv are created on the flownode,
/// so on startup this will create all missing flow tasks, and then keep checking at an interval
async fn check_flow_consistent(
@@ -427,21 +458,21 @@ impl FlowEngine for FlowDualEngine {
let flow_id = args.flow_id;
let src_table_ids = args.source_table_ids.clone();
let mut src_table2flow = self.src_table2flow.write().await;
let res = match flow_type {
FlowType::Batching => self.batching_engine.create_flow(args).await,
FlowType::Streaming => self.streaming_engine.create_flow(args).await,
}?;
self.src_table2flow
.write()
.unwrap()
.add_flow(flow_id, flow_type, src_table_ids);
src_table2flow.add_flow(flow_id, flow_type, src_table_ids);
Ok(res)
}
async fn remove_flow(&self, flow_id: FlowId) -> Result<(), Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
let mut src_table2flow = self.src_table2flow.write().await;
let flow_type = src_table2flow.get_flow_type(flow_id);
match flow_type {
Some(FlowType::Batching) => self.batching_engine.remove_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.remove_flow(flow_id).await,
@@ -453,56 +484,29 @@ impl FlowEngine for FlowDualEngine {
"Flow {} is not exist in the underlying engine, but exist in metadata",
flow_id
);
let mut retry = 0;
let max_retry = 10;
// keep trying to trigger consistent check
while retry < max_retry {
if let Some(task) = self.check_task.lock().await.as_ref() {
task.trigger(false, true).await?;
break;
}
retry += 1;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
if retry == max_retry {
error!(
"Failed to trigger consistent check after {} retries while dropping flow {}",
max_retry, flow_id
);
return FlowNotFoundSnafu { id: flow_id }.fail();
}
self.try_sync_with_check_task(flow_id, true).await?;
Ok(())
}
}?;
// remove mapping
self.src_table2flow.write().unwrap().remove_flow(flow_id);
src_table2flow.remove_flow(flow_id);
Ok(())
}
async fn flush_flow(&self, flow_id: FlowId) -> Result<usize, Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
// sync with check task
self.try_sync_with_check_task(flow_id, false).await?;
let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flush_flow(flow_id).await,
Some(FlowType::Streaming) => self.streaming_engine.flush_flow(flow_id).await,
None => {
// this might happen if flownode only just started
if self.flow_exist_in_metadata(flow_id).await? {
warn!(
"Flow {} is not exist in the underlying engine, but exist in metadata",
flow_id
);
Ok(0)
} else {
FlowNotFoundSnafu { id: flow_id }.fail()
}
}
None => Ok(0),
}
}
async fn flow_exist(&self, flow_id: FlowId) -> Result<bool, Error> {
let flow_type = self.src_table2flow.read().unwrap().get_flow_type(flow_id);
let flow_type = self.src_table2flow.read().await.get_flow_type(flow_id);
// not using `flow_type.is_some()`, to make sure the flow actually exists in the underlying engine
match flow_type {
Some(FlowType::Batching) => self.batching_engine.flow_exist(flow_id).await,
@@ -527,7 +531,7 @@ impl FlowEngine for FlowDualEngine {
let mut to_batch_engine = request.requests;
{
let src_table2flow = self.src_table2flow.read().unwrap();
let src_table2flow = self.src_table2flow.read().await;
to_batch_engine.retain(|req| {
let region_id = RegionId::from(req.region_id);
let table_id = region_id.table_id();

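The substance of this file's change is that create_flow and remove_flow now take the src_table2flow write lock up front and hold it across the call into the underlying engine, instead of re-acquiring a std::sync::RwLock only after the engine call returns; tokio's RwLock is needed because the guard now has to live across .await points. A minimal sketch of the pattern, using simplified stand-in types rather than the actual FlowDualEngine code:

use std::collections::HashMap;
use tokio::sync::RwLock;

type FlowId = u64;

// Simplified stand-in for SrcTableToFlow.
#[derive(Default)]
struct SrcTableToFlow {
    flows: HashMap<FlowId, &'static str>,
}

struct DualEngine {
    // tokio's RwLock, because the write guard must be held across
    // .await points; a std::sync::RwLock guard is not suited for that.
    src_table2flow: RwLock<SrcTableToFlow>,
}

impl DualEngine {
    async fn create_flow(&self, flow_id: FlowId) {
        // Take the write lock before calling into the underlying engine and
        // keep it until the mapping is updated, so a concurrent remove_flow
        // for the same flow cannot interleave with the creation.
        let mut mapping = self.src_table2flow.write().await;
        // ... call the batching/streaming engine here (elided) ...
        mapping.flows.insert(flow_id, "batching");
    }

    async fn remove_flow(&self, flow_id: FlowId) {
        let mut mapping = self.src_table2flow.write().await;
        // ... ask the underlying engine to drop the flow (elided) ...
        mapping.flows.remove(&flow_id);
    }
}

Holding the write guard for the whole create/drop path closes the window where one caller could see the flow registered in the engine but missing from the mapping, which is the race the commit title refers to.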
View File

@@ -85,6 +85,7 @@ pub enum FrontendClient {
}
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
(

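from_empty_grpc_handler returns the client together with the shared HandlerMutable slot, so the standalone start path in the first file can build the flownode before any frontend grpc connection exists and inject the handler later. A minimal sketch of that deferred-wiring shape, assuming HandlerMutable is the Arc<std::sync::Mutex<Option<...>>> created above; the GrpcHandler trait, the handler field, and FrontendInstance are illustrative stand-ins, not the real API:

use std::sync::{Arc, Mutex};

// Illustrative stand-in for the real grpc handler trait.
trait GrpcHandler: Send + Sync {}

// Matches the shape of the Arc::new(std::sync::Mutex::new(None)) above.
type HandlerMutable = Arc<Mutex<Option<Arc<dyn GrpcHandler>>>>;

struct FrontendClient {
    handler: HandlerMutable,
}

impl FrontendClient {
    // Build a client with no handler yet; the caller keeps the second
    // element of the tuple and fills it in once the frontend is up.
    fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
        let handler: HandlerMutable = Arc::new(Mutex::new(None));
        (Self { handler: handler.clone() }, handler)
    }
}

struct FrontendInstance;
impl GrpcHandler for FrontendInstance {}

fn wire_up() {
    let (_client, slot) = FrontendClient::from_empty_grpc_handler();
    // Later, once the frontend instance has been built:
    let instance: Arc<dyn GrpcHandler> = Arc::new(FrontendInstance);
    *slot.lock().unwrap() = Some(instance);
}

The client only clones the Arc, so whoever holds the returned slot (frontend_instance_handler in the standalone StartCommand above) can fill it in once the frontend instance is actually up, without the flownode ever opening a real grpc connection.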
View File

@@ -256,7 +256,6 @@ impl TreeNodeRewriter for AddAutoColumnRewriter {
node = projection;
}
// FIXME(discord9): just read plan.expr and do stuffs
let mut exprs = node.expressions();
let all_names = self
.schema