diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 261a0995e1..01c7aaa264 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -31,8 +31,9 @@ use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::MIN_REFRESH_DURATION; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, + METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE, METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT, + METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE, }; use crate::{Error, FlowId}; @@ -280,10 +281,25 @@ impl DirtyTimeWindows { } }) .num_seconds() as f64; - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE + METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE .with_label_values(&[flow_id.to_string().as_str()]) .observe(full_time_range); + let stalled_time_range = + self.windows + .iter() + .fold(chrono::Duration::zero(), |acc, (start, end)| { + if let Some(end) = end { + acc + end.sub(start).unwrap_or(chrono::Duration::zero()) + } else { + acc + } + }); + + METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE + .with_label_values(&[flow_id.to_string().as_str()]) + .observe(stalled_time_range.num_seconds() as f64); + let mut expr_lst = vec![]; for (start, end) in to_be_query.into_iter() { // align using time window exprs diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 6a3b45d588..6c792f8f35 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,10 +58,18 @@ lazy_static! { vec![0.0, 5., 10., 20., 40.] ) .unwrap(); - pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec = + pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE: HistogramVec = register_histogram_vec!( - "greptime_flow_batching_engine_query_time_range_secs", - "flow batching engine query time range(seconds)", + "greptime_flow_batching_engine_query_window_size_secs", + "flow batching engine query window size(seconds)", + &["flow_id"], + vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_WINDOW_SIZE: HistogramVec = + register_histogram_vec!( + "greptime_flow_batching_engine_stalled_window_size_secs", + "flow batching engine stalled window size(seconds)", &["flow_id"], vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] )