Mirror of https://github.com/GreptimeTeam/greptimedb.git

Compare commits: 1 commit on branch feat/impl-...flow/lb_fe

Commit 3d17d195a3
@@ -14,8 +14,9 @@
 //! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user

-use std::sync::{Arc, Weak};
-use std::time::SystemTime;
+use std::collections::HashMap;
+use std::sync::{Arc, Mutex, Weak};
+use std::time::{Duration, Instant, SystemTime};

 use api::v1::greptime_request::Request;
 use api::v1::CreateTableExpr;
@@ -26,20 +27,21 @@ use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
 use common_meta::peer::Peer;
 use common_meta::rpc::store::RangeRequest;
 use common_query::Output;
-use common_telemetry::warn;
+use common_telemetry::{debug, warn};
+use itertools::Itertools;
 use meta_client::client::MetaClient;
-use rand::rng;
-use rand::seq::SliceRandom;
 use servers::query_handler::grpc::GrpcQueryHandler;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{OptionExt, ResultExt};

+use crate::batching_mode::task::BatchingTask;
 use crate::batching_mode::{
     DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
     GRPC_MAX_RETRIES,
 };
 use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
-use crate::{Error, FlowAuthHeader};
+use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD;
+use crate::{Error, FlowAuthHeader, FlowId};

 /// Just like [`GrpcQueryHandler`] but use BoxedError
 ///
@@ -74,6 +76,105 @@ impl<

 type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;

+/// Statistics about queries running on this frontend, as seen from the flownode
+#[derive(Debug, Default, Clone)]
+struct FrontendStat {
+    /// The query for each flow id has been running since this timestamp
+    since: HashMap<FlowId, Instant>,
+    /// Completed-query count and total duration per flow id, used to compute
+    /// the average query time for each flow id
+    past_query_avg: HashMap<FlowId, (usize, Duration)>,
+}
+
+#[derive(Debug, Default, Clone)]
+pub struct FrontendStats {
+    /// The statistics for each frontend address
+    stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
+}
+
+impl FrontendStats {
+    pub fn observe(&self, frontend_addr: &str, flow_id: FlowId) -> FrontendStatsGuard {
+        let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
+        let stat = stats.entry(frontend_addr.to_string()).or_default();
+        stat.since.insert(flow_id, Instant::now());
+
+        FrontendStatsGuard {
+            stats: self.stats.clone(),
+            frontend_addr: frontend_addr.to_string(),
+            cur: flow_id,
+        }
+    }
+
+    /// Return frontend addrs sorted by load, from lightest to heaviest.
+    /// The load is estimated as the sum of the average past query time of each
+    /// currently running flow plus the elapsed running time of those queries.
+    pub fn sort_by_load(&self) -> Vec<String> {
+        let stats = self.stats.lock().expect("Failed to lock frontend stats");
+        let fe_load_factor = stats
+            .iter()
+            .map(|(node_addr, stat)| {
+                // total expected avg running time for all currently running queries
+                let total_expect_avg_run_time = stat
+                    .since
+                    .keys()
+                    .map(|f| {
+                        let (count, total_duration) =
+                            stat.past_query_avg.get(f).unwrap_or(&(0, Duration::ZERO));
+                        if *count == 0 {
+                            0.0
+                        } else {
+                            total_duration.as_secs_f64() / *count as f64
+                        }
+                    })
+                    .sum::<f64>();
+                let total_cur_running_time = stat
+                    .since
+                    .values()
+                    .map(|since| since.elapsed().as_secs_f64())
+                    .sum::<f64>();
+                (
+                    node_addr.to_string(),
+                    total_expect_avg_run_time + total_cur_running_time,
+                )
+            })
+            .sorted_by(|(_, load_a), (_, load_b)| {
+                load_a
+                    .partial_cmp(load_b)
+                    .unwrap_or(std::cmp::Ordering::Equal)
+            })
+            .collect::<Vec<_>>();
+        debug!("Frontend load factor: {:?}", fe_load_factor);
+        for (node_addr, load) in &fe_load_factor {
+            METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD
+                .with_label_values(&[&node_addr.to_string()])
+                .observe(*load);
+        }
+        fe_load_factor
+            .into_iter()
+            .map(|(addr, _)| addr)
+            .collect::<Vec<_>>()
+    }
+}
+
+pub struct FrontendStatsGuard {
+    stats: Arc<Mutex<HashMap<String, FrontendStat>>>,
+    frontend_addr: String,
+    cur: FlowId,
+}
+
+impl Drop for FrontendStatsGuard {
+    fn drop(&mut self) {
+        let mut stats = self.stats.lock().expect("Failed to lock frontend stats");
+        if let Some(stat) = stats.get_mut(&self.frontend_addr) {
+            if let Some(since) = stat.since.remove(&self.cur) {
+                let elapsed = since.elapsed();
+                let (count, total_duration) = stat.past_query_avg.entry(self.cur).or_default();
+                *count += 1;
+                *total_duration += elapsed;
+            }
+        }
+    }
+}
+
 /// A simple frontend client able to execute sql using grpc protocol
 ///
 /// This is for computation-heavy query which need to offload computation to frontend, lifting the load from flownode
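The additions above amount to an RAII timing guard: `observe` stamps a start time for a (frontend, flow) pair, and the guard's `Drop` folds the elapsed time into a running `(count, total_duration)` pair that `sort_by_load` later turns into an average. A minimal self-contained sketch of the same pattern, with made-up names rather than the crate's types:

```rust
// Self-contained sketch of the guard pattern above (illustrative names, not
// the crate's API): `observe` records a start time, and dropping the guard
// folds the elapsed time into a running (count, total) pair per flow.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

type FlowId = u32;

#[derive(Default)]
struct Stat {
    since: HashMap<FlowId, Instant>,
    past_query_avg: HashMap<FlowId, (usize, Duration)>,
}

#[derive(Default, Clone)]
struct Stats(Arc<Mutex<HashMap<String, Stat>>>);

struct Guard {
    stats: Stats,
    addr: String,
    flow: FlowId,
}

impl Stats {
    fn observe(&self, addr: &str, flow: FlowId) -> Guard {
        let mut map = self.0.lock().unwrap();
        // Mark the query as running for this (frontend, flow) pair.
        map.entry(addr.to_string())
            .or_default()
            .since
            .insert(flow, Instant::now());
        Guard {
            stats: self.clone(),
            addr: addr.to_string(),
            flow,
        }
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        let mut map = self.stats.0.lock().unwrap();
        if let Some(stat) = map.get_mut(&self.addr) {
            if let Some(since) = stat.since.remove(&self.flow) {
                // Accumulate (count, total) so the average can be derived later.
                let (count, total) = stat.past_query_avg.entry(self.flow).or_default();
                *count += 1;
                *total += since.elapsed();
            }
        }
    }
}

fn main() {
    let stats = Stats::default();
    {
        // Timing for flow 1 on this frontend stops when the guard drops.
        let _guard = stats.observe("fe-a:4001", 1);
        std::thread::sleep(Duration::from_millis(50));
    }
    let map = stats.0.lock().unwrap();
    let (count, total) = map["fe-a:4001"].past_query_avg[&1u32];
    println!("flow 1: {} run(s), avg {:?}", count, total / count as u32);
}
```

Storing the raw count and total instead of a precomputed mean keeps the drop path to two additions; the division happens only when a load estimate is actually needed.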
@@ -83,6 +184,7 @@ pub enum FrontendClient {
         meta_client: Arc<MetaClient>,
         chnl_mgr: ChannelManager,
         auth: Option<FlowAuthHeader>,
+        fe_stats: FrontendStats,
     },
     Standalone {
         /// for the sake of simplicity still use grpc even in standalone mode
@@ -114,6 +216,7 @@ impl FrontendClient {
                 ChannelManager::with_config(cfg)
             },
             auth,
+            fe_stats: Default::default(),
         }
     }

@@ -192,6 +295,7 @@ impl FrontendClient {
            meta_client: _,
            chnl_mgr,
            auth,
+           fe_stats,
        } = self
        else {
            return UnexpectedSnafu {
@@ -208,8 +312,21 @@ impl FrontendClient {
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64;
-        // shuffle the frontends to avoid always pick the same one
-        frontends.shuffle(&mut rng());
+        let node_addrs_by_load = fe_stats.sort_by_load();
+        // index+1 to load order asc, so that the lightest node has load 1 and non-existent node has load 0
+        let addr2load = node_addrs_by_load
+            .iter()
+            .enumerate()
+            .map(|(i, id)| (id.clone(), i + 1))
+            .collect::<HashMap<_, _>>();
+        // sort frontends by load, from lightest to heaviest
+        frontends.sort_by(|(_, a), (_, b)| {
+            // if not even in stats, treat as 0 load since never been queried
+            let load_a = addr2load.get(&a.peer.addr).unwrap_or(&0);
+            let load_b = addr2load.get(&b.peer.addr).unwrap_or(&0);
+            load_a.cmp(load_b)
+        });
+        debug!("Frontend nodes sorted by load: {:?}", frontends);

         // found node with maximum last_activity_ts
         for (_, node_info) in frontends
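`sort_by_load` only returns addresses that already have statistics, so the `index + 1` ranking gives known frontends loads starting at 1, while frontends absent from `addr2load` fall back to 0 and sort to the front of the candidate list. A small standalone sketch of that ordering, with illustrative addresses:

```rust
// Standalone sketch of the ranking above; the addresses are made up.
use std::collections::HashMap;

fn main() {
    // Pretend sort_by_load() returned these addresses, lightest to heaviest.
    let node_addrs_by_load = vec!["fe-b:4001".to_string(), "fe-a:4001".to_string()];

    // index + 1: the lightest known node gets load 1, the next 2, and so on.
    let addr2load: HashMap<String, usize> = node_addrs_by_load
        .iter()
        .enumerate()
        .map(|(i, addr)| (addr.clone(), i + 1))
        .collect();

    // Candidate frontends from metadata; fe-c has never served a flow query.
    let mut frontends = vec!["fe-a:4001", "fe-b:4001", "fe-c:4001"];
    frontends.sort_by_key(|addr| *addr2load.get(*addr).unwrap_or(&0));

    // Prints ["fe-c:4001", "fe-b:4001", "fe-a:4001"]: unknown node first, then lightest.
    println!("{:?}", frontends);
}
```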
@@ -257,6 +374,7 @@ impl FrontendClient {
         create: CreateTableExpr,
         catalog: &str,
         schema: &str,
+        task: Option<&BatchingTask>,
     ) -> Result<u32, Error> {
         self.handle(
             Request::Ddl(api::v1::DdlRequest {
@@ -265,6 +383,7 @@ impl FrontendClient {
             catalog,
             schema,
             &mut None,
+            task,
         )
         .await
     }
@@ -276,15 +395,19 @@ impl FrontendClient {
         catalog: &str,
         schema: &str,
         peer_desc: &mut Option<PeerDesc>,
+        task: Option<&BatchingTask>,
     ) -> Result<u32, Error> {
         match self {
-            FrontendClient::Distributed { .. } => {
+            FrontendClient::Distributed { fe_stats, .. } => {
                 let db = self.get_random_active_frontend(catalog, schema).await?;

                 *peer_desc = Some(PeerDesc::Dist {
                     peer: db.peer.clone(),
                 });

+                let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
+                let _guard = fe_stats.observe(&db.peer.addr, flow_id);
+
                 db.database
                     .handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
                     .await
@@ -280,7 +280,7 @@ impl BatchingTask {
         let catalog = &self.config.sink_table_name[0];
         let schema = &self.config.sink_table_name[1];
         frontend_client
-            .create(expr.clone(), catalog, schema)
+            .create(expr.clone(), catalog, schema, Some(self))
             .await?;
         Ok(())
     }
@@ -361,7 +361,7 @@ impl BatchingTask {
            };

            frontend_client
-                .handle(req, catalog, schema, &mut peer_desc)
+                .handle(req, catalog, schema, &mut peer_desc, Some(self))
                .await
        };

@@ -58,6 +58,14 @@ lazy_static! {
             vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
         )
         .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec =
+        register_histogram_vec!(
+            "greptime_flow_batching_engine_guess_fe_load",
+            "flow batching engine guessed frontend load",
+            &["fe_addr"],
+            vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
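The new histogram follows the pattern of the existing flow metrics: `sort_by_load` records one sample per known frontend, labeled by `fe_addr`, and the buckets are in seconds (1 minute up to 256 minutes) since the load estimate is a sum of query durations. A minimal sketch, assuming the `prometheus` crate and an illustrative metric name, of how such a `HistogramVec` records and exposes samples:

```rust
// Minimal sketch of a load histogram like the one registered above.
// The metric name and addresses are illustrative, not the crate's.
use prometheus::{register_histogram_vec, Encoder, TextEncoder};

fn main() {
    // Buckets are in seconds: 1, 4, 16, 64 and 256 minutes.
    let guess_fe_load = register_histogram_vec!(
        "example_guess_fe_load",
        "illustrative guessed frontend load",
        &["fe_addr"],
        vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
    )
    .unwrap();

    // One observation per frontend per sort_by_load() call.
    guess_fe_load.with_label_values(&["fe-a:4001"]).observe(75.0);
    guess_fe_load.with_label_values(&["fe-b:4001"]).observe(300.0);

    // Render everything in the default registry as Prometheus text format.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .unwrap();
    println!("{}", String::from_utf8(buf).unwrap());
}
```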