diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs index 3ca322baa6..c50f90480d 100644 --- a/src/flow/src/batching_mode/frontend_client.rs +++ b/src/flow/src/batching_mode/frontend_client.rs @@ -286,7 +286,7 @@ impl FrontendClient { /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts` /// and is able to process query - async fn get_random_active_frontend( + pub(crate) async fn get_random_active_frontend( &self, catalog: &str, schema: &str, @@ -382,7 +382,7 @@ impl FrontendClient { }), catalog, schema, - &mut None, + None, task, ) .await @@ -394,16 +394,28 @@ impl FrontendClient { req: api::v1::greptime_request::Request, catalog: &str, schema: &str, - peer_desc: &mut Option, + use_peer: Option, task: Option<&BatchingTask>, ) -> Result { match self { - FrontendClient::Distributed { fe_stats, .. } => { - let db = self.get_random_active_frontend(catalog, schema).await?; - - *peer_desc = Some(PeerDesc::Dist { - peer: db.peer.clone(), - }); + FrontendClient::Distributed { + fe_stats, chnl_mgr, .. + } => { + let db = if let Some(peer) = use_peer { + DatabaseWithPeer::new( + Database::new( + catalog, + schema, + Client::with_manager_and_urls( + chnl_mgr.clone(), + vec![peer.addr.clone()], + ), + ), + peer, + ) + } else { + self.get_random_active_frontend(catalog, schema).await? + }; let flow_id = task.map(|t| t.config.flow_id).unwrap_or_default(); let _guard = fe_stats.observe(&db.peer.addr, flow_id); diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 1aa9e468be..e4bc15e2c0 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -53,6 +53,8 @@ pub struct TaskState { pub(crate) shutdown_rx: oneshot::Receiver<()>, /// Task handle pub(crate) task_handle: Option>, + /// Slow Query metrics update task handle + pub(crate) slow_query_metric_task: Option>, /// min run interval in seconds pub(crate) min_run_interval: Option, @@ -69,6 +71,7 @@ impl TaskState { exec_state: ExecState::Idle, shutdown_rx, task_handle: None, + slow_query_metric_task: None, min_run_interval: None, max_filter_num: None, } @@ -248,11 +251,17 @@ impl DirtyTimeWindows { }; METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT - .with_label_values(&[flow_id.to_string().as_str()]) + .with_label_values(&[ + flow_id.to_string().as_str(), + format!("{}", window_size).as_str(), + ]) .observe(first_nth.len() as f64); METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT - .with_label_values(&[flow_id.to_string().as_str()]) + .with_label_values(&[ + flow_id.to_string().as_str(), + format!("{}", window_size).as_str(), + ]) .observe(self.windows.len() as f64); let full_time_range = first_nth @@ -266,7 +275,10 @@ impl DirtyTimeWindows { }) .num_seconds() as f64; METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE - .with_label_values(&[flow_id.to_string().as_str()]) + .with_label_values(&[ + flow_id.to_string().as_str(), + format!("{}", window_size).as_str(), + ]) .observe(full_time_range); let mut expr_lst = vec![]; diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 2474409211..2ebfcfb2ce 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -61,7 +61,8 @@ use crate::error::{ SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, }; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, + METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT, + METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, }; use crate::{Error, FlowId}; @@ -81,6 +82,14 @@ pub struct TaskConfig { query_type: QueryType, } +impl TaskConfig { + pub fn time_window_size(&self) -> Option { + self.time_window_expr + .as_ref() + .and_then(|expr| *expr.time_window_size()) + } +} + fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result { let stmts = ParserContext::create_with_dialect(query, query_ctx.sql_dialect(), ParseOptions::default()) @@ -334,11 +343,53 @@ impl BatchingTask { })?; let plan = expanded_plan; - let mut peer_desc = None; + + let db = frontend_client + .get_random_active_frontend(catalog, schema) + .await?; + let peer_desc = db.peer.clone(); + + let (tx, mut rx) = oneshot::channel(); + let peer_inner = peer_desc.clone(); + let window_size_pretty = format!( + "{}s", + self.config.time_window_size().unwrap_or_default().as_secs() + ); + let inner_window_size_pretty = window_size_pretty.clone(); + let flow_id = self.config.flow_id; + let slow_query_metric_task = tokio::task::spawn(async move { + tokio::time::sleep(SLOW_QUERY_THRESHOLD).await; + METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT + .with_label_values(&[ + flow_id.to_string().as_str(), + &peer_inner.to_string(), + inner_window_size_pretty.as_str(), + ]) + .add(1.0); + while rx.try_recv() == Err(TryRecvError::Empty) { + // sleep for a while before next update + tokio::time::sleep(MIN_REFRESH_DURATION).await; + } + METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT + .with_label_values(&[ + flow_id.to_string().as_str(), + &peer_inner.to_string(), + inner_window_size_pretty.as_str(), + ]) + .sub(1.0); + }); + self.state.write().unwrap().slow_query_metric_task = Some(slow_query_metric_task); let res = { let _timer = METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME - .with_label_values(&[flow_id.to_string().as_str()]) + .with_label_values(&[ + flow_id.to_string().as_str(), + format!( + "{}s", + self.config.time_window_size().unwrap_or_default().as_secs() + ) + .as_str(), + ]) .start_timer(); // hack and special handling the insert logical plan @@ -367,10 +418,12 @@ impl BatchingTask { }; frontend_client - .handle(req, catalog, schema, &mut peer_desc, Some(self)) + .handle(req, catalog, schema, Some(db.peer), Some(self)) .await }; + // signaling the slow query metric task to stop + let _ = tx.send(()); let elapsed = instant.elapsed(); if let Ok(affected_rows) = &res { debug!( @@ -393,7 +446,12 @@ impl BatchingTask { METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY .with_label_values(&[ flow_id.to_string().as_str(), - &peer_desc.unwrap_or_default().to_string(), + &peer_desc.to_string(), + format!( + "{}s", + self.config.time_window_size().unwrap_or_default().as_secs() + ) + .as_str(), ]) .observe(elapsed.as_secs_f64()); } diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index e0ff17c8f4..24fe1a081d 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -31,22 +31,29 @@ lazy_static! { pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME: HistogramVec = register_histogram_vec!( "greptime_flow_batching_engine_query_time_secs", "flow batching engine query time(seconds)", - &["flow_id"], + &["flow_id", "time_window_granularity"], vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] ) .unwrap(); pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!( "greptime_flow_batching_engine_slow_query_secs", - "flow batching engine slow query(seconds)", - &["flow_id", "peer"], + "flow batching engine slow query(seconds), updated after query finished", + &["flow_id", "peer", "time_window_granularity"], vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_REAL_TIME_SLOW_QUERY_CNT: GaugeVec = + register_gauge_vec!( + "greptime_flow_batching_engine_real_time_slow_query_number", + "flow batching engine real time slow query number, updated in real time", + &["flow_id", "peer", "time_window_granularity"], + ) + .unwrap(); pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec = register_histogram_vec!( "greptime_flow_batching_engine_stalled_query_window_cnt", "flow batching engine stalled query time window count", - &["flow_id"], + &["flow_id", "time_window_granularity"], vec![0.0, 5., 10., 20., 40.] ) .unwrap(); @@ -54,7 +61,7 @@ lazy_static! { register_histogram_vec!( "greptime_flow_batching_engine_query_window_cnt", "flow batching engine query time window count", - &["flow_id"], + &["flow_id", "time_window_granularity"], vec![0.0, 5., 10., 20., 40.] ) .unwrap(); @@ -62,7 +69,7 @@ lazy_static! { register_histogram_vec!( "greptime_flow_batching_engine_query_time_range_secs", "flow batching engine query time range(seconds)", - &["flow_id"], + &["flow_id", "time_window_granularity"], vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap();