Compare commits

...

1 Commits

Author SHA1 Message Date
discord9
0443042f14 feat: better metrics 2025-06-09 20:28:29 +08:00
4 changed files with 112 additions and 23 deletions

View File

@@ -286,7 +286,7 @@ impl FrontendClient {
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
async fn get_random_active_frontend(
pub(crate) async fn get_random_active_frontend(
&self,
catalog: &str,
schema: &str,
@@ -382,7 +382,7 @@ impl FrontendClient {
}),
catalog,
schema,
&mut None,
None,
task,
)
.await
@@ -394,16 +394,28 @@ impl FrontendClient {
req: api::v1::greptime_request::Request,
catalog: &str,
schema: &str,
peer_desc: &mut Option<PeerDesc>,
use_peer: Option<Peer>,
task: Option<&BatchingTask>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { fe_stats, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
peer: db.peer.clone(),
});
FrontendClient::Distributed {
fe_stats, chnl_mgr, ..
} => {
let db = if let Some(peer) = use_peer {
DatabaseWithPeer::new(
Database::new(
catalog,
schema,
Client::with_manager_and_urls(
chnl_mgr.clone(),
vec![peer.addr.clone()],
),
),
peer,
)
} else {
self.get_random_active_frontend(catalog, schema).await?
};
let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default();
let _guard = fe_stats.observe(&db.peer.addr, flow_id);

View File

@@ -53,6 +53,8 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// Slow Query metrics update task handle
pub(crate) slow_query_metric_task: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
@@ -69,6 +71,7 @@ impl TaskState {
exec_state: ExecState::Idle,
shutdown_rx,
task_handle: None,
slow_query_metric_task: None,
min_run_interval: None,
max_filter_num: None,
}
@@ -248,11 +251,17 @@ impl DirtyTimeWindows {
};
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(self.windows.len() as f64);
let full_time_range = first_nth
@@ -266,7 +275,10 @@ impl DirtyTimeWindows {
})
.num_seconds() as f64;
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!("{}", window_size).as_str(),
])
.observe(full_time_range);
let mut expr_lst = vec![];

View File

@@ -61,7 +61,8 @@ use crate::error::{
SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu,
};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT,
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY,
};
use crate::{Error, FlowId};
@@ -81,6 +82,14 @@ pub struct TaskConfig {
query_type: QueryType,
}
impl TaskConfig {
pub fn time_window_size(&self) -> Option<Duration> {
self.time_window_expr
.as_ref()
.and_then(|expr| *expr.time_window_size())
}
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
let stmts =
ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default())
@@ -334,11 +343,53 @@ impl BatchingTask {
})?;
let plan = expanded_plan;
let mut peer_desc = None;
let db = frontend_client
.get_random_active_frontend(catalog, schema)
.await?;
let peer_desc = db.peer.clone();
let (tx, mut rx) = oneshot::channel();
let peer_inner = peer_desc.clone();
let window_size_pretty = format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
);
let inner_window_size_pretty = window_size_pretty.clone();
let flow_id = self.config.flow_id;
let slow_query_metric_task = tokio::task::spawn(async move {
tokio::time::sleep(SLOW_QUERY_THRESHOLD).await;
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.add(1.0);
while rx.try_recv() == Err(TryRecvError::Empty) {
// sleep for a while before next update
tokio::time::sleep(MIN_REFRESH_DURATION).await;
}
METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_inner.to_string(),
inner_window_size_pretty.as_str(),
])
.sub(1.0);
});
self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task);
let res = {
let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.with_label_values(&[
flow_id.to_string().as_str(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.start_timer();
// hack and special handling the insert logical plan
@@ -367,10 +418,12 @@ impl BatchingTask {
};
frontend_client
.handle(req, catalog, schema, &mut peer_desc, Some(self))
.handle(req, catalog, schema, Some(db.peer), Some(self))
.await
};
// signaling the slow query metric task to stop
let _ = tx.send(());
let elapsed = instant.elapsed();
if let Ok(affected_rows) = &res {
debug!(
@@ -393,7 +446,12 @@ impl BatchingTask {
METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
.with_label_values(&[
flow_id.to_string().as_str(),
&peer_desc.unwrap_or_default().to_string(),
&peer_desc.to_string(),
format!(
"{}s",
self.config.time_window_size().unwrap_or_default().as_secs()
)
.as_str(),
])
.observe(elapsed.as_secs_f64());
}

View File

@@ -31,22 +31,29 @@ lazy_static! {
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_query_time_secs",
"flow batching engine query time(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_batching_engine_slow_query_secs",
"flow batching engine slow query(seconds)",
&["flow_id", "peer"],
"flow batching engine slow query(seconds), updated after query finished",
&["flow_id", "peer", "time_window_granularity"],
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec =
register_gauge_vec!(
"greptime_flow_batching_engine_real_time_slow_query_number",
"flow batching engine real time slow query number, updated in real time",
&["flow_id", "peer", "time_window_granularity"],
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -54,7 +61,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt",
"flow batching engine query time window count",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
@@ -62,7 +69,7 @@ lazy_static! {
register_histogram_vec!(
"greptime_flow_batching_engine_query_time_range_secs",
"flow batching engine query time range(seconds)",
&["flow_id"],
&["flow_id", "time_window_granularity"],
vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
)
.unwrap();