feat: better metrics

This commit is contained in:
discord9
2025-06-09 20:25:53 +08:00
parent e962076207
commit e4328380b2
4 changed files with 112 additions and 23 deletions

View File

@@ -286,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts` /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query /// and is able to process query
async fn get_random_active_frontend( pub(crate) async fn get_random_active_frontend(
&self, &self,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
@@ -382,7 +382,7 @@ impl FrontendClient {
}), }),
catalog, catalog,
schema, schema,
&mut None, None,
task, task,
) )
.await .await
@@ -394,16 +394,28 @@ impl FrontendClient {
req: api::v1::greptime_request::Request, req: api::v1::greptime_request::Request,
catalog: &str, catalog: &str,
schema: &str, schema: &str,
peer_desc: &mut Option<PeerDesc>, use_peer: Option<Peer>,
task: Option<&BatchingTask>, task: Option<&BatchingTask>,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
match self { match self {
FrontendClient::Distributed { fe_stats, .. } => { FrontendClient::Distributed {
let db = self.get_random_active_frontend(catalog, schema).await?; fe_stats, chnl_mgr, ..
} => {
*peer_desc = Some(PeerDesc::Dist { let db = if let Some(peer) = use_peer {
peer: db.peer.clone(), DatabaseWithPeer::new(
}); Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default(); let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id); let _guard = fe_stats.observe(&db.peer.addr, flow_id);

View File

@@ -53,6 +53,8 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle /// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>, pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds /// min run interval in seconds
pub(crate) min_run_interval: Option<u64>, pub(crate) min_run_interval: Option<u64>,
@@ -69,6 +71,7 @@ impl TaskState {
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None, task_handle: None,
slow_query_metric_task: None,
min_run_interval: None, min_run_interval: None,
max_filter_num: None, max_filter_num: None,
} }
@@ -248,11 +251,17 @@ impl DirtyTimeWindows {
}; };
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(first_nth.len() as f64); .observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64); .observe(self.windows.len() as f64);
let full_time_range = first_nth let full_time_range = first_nth
@@ -266,7 +275,10 @@ impl DirtyTimeWindows {
}) })
.num_seconds() as f64; .num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range); .observe(full_time_range);
let mut expr_lst = vec![]; let mut expr_lst = vec![];

View File

@@ -61,7 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
}; };
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -81,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType, query_type: QueryType,
} }
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> { fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts = let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default()) ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -334,11 +343,53 @@ impl BatchingTask {
})?; })?;
let plan = expanded_plan; let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = { let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer(); .start_timer();
// hack and special handling the insert logical plan // hack and special handling the insert logical plan
@@ -367,10 +418,12 @@ impl BatchingTask {
}; };
frontend_client frontend_client
.handle(req, catalog, schema, &mut peer_desc, Some(self)) .handle(req, catalog, schema, Some(db.peer), Some(self))
.await .await
}; };
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed(); let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res { if let Ok(affected_rows) = &res {
debug!( debug!(
@@ -393,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[ .with_label_values(&[
flow_id.to_string().as_str(), flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(), &peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
]) ])
.observe(elapsed.as_secs_f64()); .observe(elapsed.as_secs_f64());
} }

View File

@@ -31,22 +31,29 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs", "greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)", "flow batching engine query time(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!( pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs", "greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)", "flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer"], &["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt", "greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count", "flow batching engine stalled query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
@@ -54,7 +61,7 @@ lazy_static! {
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt", "greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count", "flow batching engine query time window count",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.] vec![0.0, 5., 10., 20., 40.]
) )
.unwrap(); .unwrap();
@@ -62,7 +69,7 @@ lazy_static! {
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs", "greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)", "flow batching engine query time range(seconds)",
&["flow_id"], &["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
) )
.unwrap(); .unwrap();