From 705b2007cf53d283d628c1902d0a8fba5809fcda Mon Sep 17 00:00:00 2001 From: discord9 Date: Fri, 6 Jun 2025 16:08:08 +0800 Subject: [PATCH] feat: flownode to frontend load balance with guess --- src/flow/src/batching_mode/frontend_client.rs | 141 ++++++++++++++++-- src/flow/src/batching_mode/task.rs | 4 +- src/flow/src/metrics.rs | 8 + 3 files changed, 142 insertions(+), 11 deletions(-) diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 0b7a1cbcc3..3ca322baa6 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -14,8 +14,9 @@ //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user -use std::sync::{Arc, Weak}; -use std::time::SystemTime; +use std::collections::HashMap; +use std::sync::{Arc, Mutex, Weak}; +use std::time::{Duration, Instant, SystemTime}; use api::v1::greptime_request::Request; use api::v1::CreateTableExpr; @@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role}; use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; +use itertools::Itertools; use meta_client::client::MetaClient; -use rand::rng; -use rand::seq::SliceRandom; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; +use crate::batching_mode::task::BatchingTask; use crate::batching_mode::{ DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT, GRPC_MAX_RETRIES, }; use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu}; -use crate::{Error, FlowAuthHeader}; +use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD; +use crate::{Error, FlowAuthHeader, FlowId}; /// Just like [`GrpcQueryHandler`] but use BoxedError /// @@ -74,6 +76,105 @@ impl< type HandlerMutable = Arc>>>; +/// Statistics about running query on this frontend from flownode +#[derive(Debug, Default, Clone)] +struct FrontendStat { + /// The query for flow id has been running since this timestamp + since: HashMap, + /// The average query time for each flow id + /// This is used to calculate the average query time for each flow id + past_query_avg: HashMap, +} + +#[derive(Debug, Default, Clone)] +pub struct FrontendStats { + /// The statistics for each flow id + stats: Arc>>, +} + +impl FrontendStats { + pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard { + let mut stats = self.stats.lock().expect("Failed to lock frontend stats"); + let stat = stats.entry(frontend_addr.to_string()).or_default(); + stat.since.insert(flow_id, Instant::now()); + + FrontendStatsGuard { + stats: self.stats.clone(), + frontend_addr: frontend_addr.to_string(), + cur: flow_id, + } + } + + /// return frontend addrs sorted by load, from lightest to heaviest + /// The load is calculated as the total average query time for each flow id plus running query's total running time elapsed + pub fn sort_by_load(&self) -> Vec { + let stats = self.stats.lock().expect("Failed to lock frontend stats"); + let fe_load_factor = stats + .iter() + .map(|(node_addr, stat)| { + // total expected avg running time for all currently running queries + let total_expect_avg_run_time = stat + .since + .keys() + .map(|f| { + let (count, total_duration) = + stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO)); + if *count == 0 { + 0.0 + } else { + total_duration.as_secs_f64() / *count as f64 + } + }) + .sum::(); + let total_cur_running_time = stat + .since + .values() + .map(|since| since.elapsed().as_secs_f64()) + .sum::(); + ( + node_addr.to_string(), + total_expect_avg_run_time + total_cur_running_time, + ) + }) + .sorted_by(|(_, load_a), (_, load_b)| { + load_a + .partial_cmp(load_b) + .unwrap_or(std::cmp::Ordering::Equal) + }) + .collect::>(); + debug!("Frontend load factor: {:?}", fe_load_factor); + for (node_addr, load) in &fe_load_factor { + METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD + .with_label_values(&[&node_addr.to_string()]) + .observe(*load); + } + fe_load_factor + .into_iter() + .map(|(addr, _)| addr) + .collect::>() + } +} + +pub struct FrontendStatsGuard { + stats: Arc>>, + frontend_addr: String, + cur: FlowId, +} + +impl Drop for FrontendStatsGuard { + fn drop(&mut self) { + let mut stats = self.stats.lock().expect("Failed to lock frontend stats"); + if let Some(stat) = stats.get_mut(&self.frontend_addr) { + if let Some(since) = stat.since.remove(&self.cur) { + let elapsed = since.elapsed(); + let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default(); + *count += 1; + *total_duration += elapsed; + } + } + } +} + /// A simple frontend client able to execute sql using grpc protocol /// /// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode @@ -83,6 +184,7 @@ pub enum FrontendClient { meta_client: Arc, chnl_mgr: ChannelManager, auth: Option, + fe_stats: FrontendStats, }, Standalone { /// for the sake of simplicity still use grpc even in standalone mode @@ -114,6 +216,7 @@ impl FrontendClient { ChannelManager::with_config(cfg) }, auth, + fe_stats: Default::default(), } } @@ -192,6 +295,7 @@ impl FrontendClient { meta_client: _, chnl_mgr, auth, + fe_stats, } = self else { return UnexpectedSnafu { @@ -208,8 +312,21 @@ impl FrontendClient { .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_millis() as i64; - // shuffle the frontends to avoid always pick the same one - frontends.shuffle(&mut rng()); + let node_addrs_by_load = fe_stats.sort_by_load(); + // index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0 + let addr2load = node_addrs_by_load + .iter() + .enumerate() + .map(|(i, id)| (id.clone(), i + 1)) + .collect::>(); + // sort frontends by load, from lightest to heaviest + frontends.sort_by(|(_, a), (_, b)| { + // if not even in stats, treat as 0 load since never been queried + let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0); + let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0); + load_a.cmp(load_b) + }); + debug!("Frontend nodes sorted by load: {:?}", frontends); // found node with maximum last_activity_ts for (_, node_info) in frontends @@ -257,6 +374,7 @@ impl FrontendClient { create: CreateTableExpr, catalog: &str, schema: &str, + task: Option<&BatchingTask>, ) -> Result { self.handle( Request::Ddl(api::v1::DdlRequest { @@ -265,6 +383,7 @@ impl FrontendClient { catalog, schema, &mut None, + task, ) .await } @@ -276,15 +395,19 @@ impl FrontendClient { catalog: &str, schema: &str, peer_desc: &mut Option, + task: Option<&BatchingTask>, ) -> Result { match self { - FrontendClient::Distributed { .. } => { + FrontendClient::Distributed { fe_stats, .. } => { let db = self.get_random_active_frontend(catalog, schema).await?; *peer_desc = Some(PeerDesc::Dist { peer: db.peer.clone(), }); + let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default(); + let _guard = fe_stats.observe(&db.peer.addr, flow_id); + db.database .handle_with_retry(req.clone(), GRPC_MAX_RETRIES) .await diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index f93755d4f8..92700394ae 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -280,7 +280,7 @@ impl BatchingTask { let catalog = &self.config.sink_table_name[0]; let schema = &self.config.sink_table_name[1]; frontend_client - .create(expr.clone(), catalog, schema) + .create(expr.clone(), catalog, schema, Some(self)) .await?; Ok(()) } @@ -361,7 +361,7 @@ impl BatchingTask { }; frontend_client - .handle(req, catalog, schema, &mut peer_desc) + .handle(req, catalog, schema, &mut peer_desc, Some(self)) .await }; diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 2b93a4a0a0..9dfec94a90 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,6 +58,14 @@ lazy_static! { vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec = + register_histogram_vec!( + "greptime_flow_batching_engine_guess_fe_load", + "flow batching engine guessed frontend load", + &["fe_addr"], + vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(