fix: flownode chose fe randomly&not starve lock (#6077)

* fix: choose frontend randomly

* docs: update comment

* chore: more logs

* fix: ignore inserts until recovering flow is done

* chore: resolve TODO

* fix: rm unused code&set done in correct location

* refactor: speed up create flow
This commit is contained in:
discord9
2025-05-12 20:11:28 +08:00
committed by GitHub
parent 2ce5631d3c
commit 265b144ca2
8 changed files with 89 additions and 141 deletions

1
Cargo.lock generated
View File

@@ -4345,6 +4345,7 @@ dependencies = [
"prometheus",
"prost 0.13.5",
"query",
"rand 0.9.0",
"serde",
"serde_json",
"servers",

View File

@@ -59,6 +59,7 @@ partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
rand.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true

View File

@@ -14,6 +14,7 @@
//! impl `FlowNode` trait for FlowNodeManager so standalone can call them
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use api::v1::flow::{
@@ -41,9 +42,9 @@ use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, IllegalCheckTaskStateSnafu,
InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu,
SyncCheckTaskSnafu, UnexpectedSnafu,
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu,
NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu,
};
use crate::metrics::METRIC_FLOW_TASK_COUNT;
use crate::repr::{self, DiffRow};
@@ -65,6 +66,7 @@ pub struct FlowDualEngine {
catalog_manager: Arc<dyn CatalogManager>,
check_task: tokio::sync::Mutex<Option<ConsistentCheckTask>>,
plugins: Plugins,
done_recovering: AtomicBool,
}
impl FlowDualEngine {
@@ -83,9 +85,55 @@ impl FlowDualEngine {
catalog_manager,
check_task: Mutex::new(None),
plugins,
done_recovering: AtomicBool::new(false),
}
}
/// Set `done_recovering` to true
/// indicate that we are ready to handle requests
pub fn set_done_recovering(&self) {
info!("FlowDualEngine done recovering");
self.done_recovering
.store(true, std::sync::atomic::Ordering::Release);
}
/// Check if `done_recovering` is true
pub fn is_recover_done(&self) -> bool {
self.done_recovering
.load(std::sync::atomic::Ordering::Acquire)
}
/// wait for recovering to be done, this will only happen when flownode just started
async fn wait_for_all_flow_recover(&self, waiting_req_cnt: usize) -> Result<(), Error> {
if self.is_recover_done() {
return Ok(());
}
warn!(
"FlowDualEngine is not done recovering, {} insert request waiting for recovery",
waiting_req_cnt
);
// wait 3 seconds, check every 1 second
// TODO(discord9): make this configurable
let mut retry = 0;
let max_retry = 3;
while retry < max_retry && !self.is_recover_done() {
warn!(
"FlowDualEngine is not done recovering, retry {} in 1s",
retry
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
retry += 1;
}
if retry == max_retry {
return FlowNotRecoveredSnafu.fail();
} else {
info!("FlowDualEngine is done recovering");
}
// TODO(discord9): also put to centralized logging for flow once it implemented
Ok(())
}
pub fn plugins(&self) -> &Plugins {
&self.plugins
}
@@ -243,7 +291,7 @@ impl FlowDualEngine {
to_be_created
);
let mut errors = vec![];
for flow_id in to_be_created {
for flow_id in to_be_created.clone() {
let flow_id = *flow_id;
let info = self
.flow_metadata_manager
@@ -302,6 +350,10 @@ impl FlowDualEngine {
errors.push((flow_id, err));
}
}
if errors.is_empty() {
info!("Recover flows successfully, flows: {:?}", to_be_created);
}
for (flow_id, err) in errors {
warn!("Failed to recreate flow {}, err={:#?}", flow_id, err);
}
@@ -410,6 +462,8 @@ impl ConsistentCheckTask {
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
engine.set_done_recovering();
// then do check flows, with configurable allow_create and allow_drop
let (mut allow_create, mut allow_drop) = (false, false);
let mut ret_signal: Option<tokio::sync::oneshot::Sender<()>> = None;
@@ -629,11 +683,14 @@ impl FlowEngine for FlowDualEngine {
&self,
request: api::v1::region::InsertRequests,
) -> Result<(), Error> {
self.wait_for_all_flow_recover(request.requests.len())
.await?;
// TODO(discord9): make as little clone as possible
let mut to_stream_engine = Vec::with_capacity(request.requests.len());
let mut to_batch_engine = request.requests;
{
// not locking this, or recover flows will be starved when also handling flow inserts
let src_table2flow = self.src_table2flow.read().await;
to_batch_engine.retain(|req| {
let region_id = RegionId::from(req.region_id);

View File

@@ -330,7 +330,7 @@ impl BatchingEngine {
let frontend = self.frontend_client.clone();
// check execute once first to detect any error early
task.check_execute(&engine, &frontend).await?;
task.check_or_create_sink_table(&engine, &frontend).await?;
// TODO(discord9): use time wheel or what for better
let handle = common_runtime::spawn_global(async move {

View File

@@ -27,8 +27,9 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use itertools::Itertools;
use meta_client::client::MetaClient;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
@@ -180,8 +181,9 @@ impl FrontendClient {
Ok(res)
}
/// Get the database with maximum `last_activity_ts`& is able to process query
async fn get_latest_active_frontend(
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -201,17 +203,17 @@ impl FrontendClient {
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
let frontends = self.scan_for_frontend().await?;
let mut frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
.iter()
.sorted_by_key(|(_, node_info)| node_info.last_activity_ts)
.rev()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
@@ -277,7 +279,7 @@ impl FrontendClient {
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
let db = self.get_latest_active_frontend(catalog, schema).await?;
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),

View File

@@ -142,26 +142,12 @@ impl BatchingTask {
Ok(())
}
/// Test execute, for check syntax or such
pub async fn check_execute(
/// Create sink table if not exists
pub async fn check_or_create_sink_table(
&self,
engine: &QueryEngineRef,
frontend_client: &Arc<FrontendClient>,
) -> Result<Option<(u32, Duration)>, Error> {
// use current time to test get a dirty time window, which should be safe
let start = SystemTime::now();
let ts = Timestamp::new_second(
start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs() as _,
);
self.state
.write()
.unwrap()
.dirty_time_windows
.add_lower_bounds(vec![ts].into_iter());
if !self.is_table_exist(&self.config.sink_table_name).await? {
let create_table = self.gen_create_table_expr(engine.clone()).await?;
info!(
@@ -174,7 +160,8 @@ impl BatchingTask {
self.config.sink_table_name.join(".")
);
}
self.gen_exec_once(engine, frontend_client).await
Ok(None)
}
async fn is_table_exist(&self, table_name: &[String; 3]) -> Result<bool, Error> {

View File

@@ -46,6 +46,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Flow engine is still recovering"))]
FlowNotRecovered {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Error encountered while creating flow: {sql}"))]
CreateFlow {
sql: String,
@@ -310,7 +316,8 @@ impl ErrorExt for Error {
| Self::JoinTask { .. }
| Self::Datafusion { .. }
| Self::InsertIntoFlow { .. }
| Self::NoAvailableFrontend { .. } => StatusCode::Internal,
| Self::NoAvailableFrontend { .. }
| Self::FlowNotRecovered { .. } => StatusCode::Internal,
Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists,
Self::TableNotFound { .. }
| Self::TableNotFoundMeta { .. }

View File

@@ -43,7 +43,7 @@ use servers::error::{StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use session::context::{QueryContextBuilder, QueryContextRef};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, oneshot, Mutex};
@@ -54,19 +54,15 @@ use tonic::{Request, Response, Status};
use crate::adapter::flownode_impl::{FlowDualEngine, FlowDualEngineRef};
use crate::adapter::{create_worker, FlowStreamingEngineRef};
use crate::batching_mode::engine::BatchingEngine;
use crate::engine::FlowEngine;
use crate::error::{
to_status_with_last_err, CacheRequiredSnafu, CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu,
IllegalAuthConfigSnafu, ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu,
UnexpectedSnafu,
to_status_with_last_err, CacheRequiredSnafu, ExternalSnafu, IllegalAuthConfigSnafu,
ListFlowsSnafu, ParseAddrSnafu, ShutdownServerSnafu, StartServerSnafu, UnexpectedSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::metrics::{METRIC_FLOW_PROCESSING_TIME, METRIC_FLOW_ROWS};
use crate::transform::register_function_to_query_engine;
use crate::utils::{SizeReportSender, StateReportHandler};
use crate::{
CreateFlowArgs, Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine,
};
use crate::{Error, FlowAuthHeader, FlownodeOptions, FrontendClient, StreamingEngine};
pub const FLOW_NODE_SERVER_NAME: &str = "FLOW_NODE_SERVER";
/// wrapping flow node manager to avoid orphan rule with Arc<...>
@@ -416,109 +412,6 @@ impl FlownodeBuilder {
Ok(instance)
}
/// recover all flow tasks in this flownode in distributed mode(nodeid is Some(<num>))
///
/// or recover all existing flow tasks if in standalone mode(nodeid is None)
///
/// TODO(discord9): persistent flow tasks with internal state
async fn recover_flows(&self, manager: &FlowDualEngine) -> Result<usize, Error> {
let nodeid = self.opts.node_id;
let to_be_recovered: Vec<_> = if let Some(nodeid) = nodeid {
let to_be_recover = self
.flow_metadata_manager
.flownode_flow_manager()
.flows(nodeid)
.try_collect::<Vec<_>>()
.await
.context(ListFlowsSnafu { id: Some(nodeid) })?;
to_be_recover.into_iter().map(|(id, _)| id).collect()
} else {
let all_catalogs = self
.catalog_manager
.catalog_names()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut all_flow_ids = vec![];
for catalog in all_catalogs {
let flows = self
.flow_metadata_manager
.flow_name_manager()
.flow_names(&catalog)
.await
.try_collect::<Vec<_>>()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
all_flow_ids.extend(flows.into_iter().map(|(_, id)| id.flow_id()));
}
all_flow_ids
};
let cnt = to_be_recovered.len();
// TODO(discord9): recover in parallel
info!("Recovering {} flows: {:?}", cnt, to_be_recovered);
for flow_id in to_be_recovered {
let info = self
.flow_metadata_manager
.flow_info_manager()
.get(flow_id)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
.context(FlowNotFoundSnafu { id: flow_id })?;
let sink_table_name = [
info.sink_table_name().catalog_name.clone(),
info.sink_table_name().schema_name.clone(),
info.sink_table_name().table_name.clone(),
];
let args = CreateFlowArgs {
flow_id: flow_id as _,
sink_table_name,
source_table_ids: info.source_table_ids().to_vec(),
// because recover should only happen on restart the `create_if_not_exists` and `or_replace` can be arbitrary value(since flow doesn't exist)
// but for the sake of consistency and to make sure recover of flow actually happen, we set both to true
// (which is also fine since checks for not allow both to be true is on metasrv and we already pass that)
create_if_not_exists: true,
or_replace: true,
expire_after: info.expire_after(),
comment: Some(info.comment().clone()),
sql: info.raw_sql().clone(),
flow_options: info.options().clone(),
query_ctx: info
.query_context()
.clone()
.map(|ctx| {
ctx.try_into()
.map_err(BoxedError::new)
.context(ExternalSnafu)
})
.transpose()?
// or use default QueryContext with catalog_name from info
// to keep compatibility with old version
.or_else(|| {
Some(
QueryContextBuilder::default()
.current_catalog(info.catalog_name().to_string())
.build(),
)
}),
};
manager
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu {
sql: info.raw_sql().clone(),
})?;
}
Ok(cnt)
}
/// build [`FlowWorkerManager`], note this doesn't take ownership of `self`,
/// nor does it actually start running the worker.
async fn build_manager(