diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index f2e101a533..3a4dcd4382 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::metrics::{ METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, + METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME, }; use crate::{Error, FlowId}; @@ -102,8 +103,19 @@ impl TaskState { }) .unwrap_or(last_duration); + METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME + .with_label_values(&[ + flow_id.to_string().as_str(), + time_window_size + .unwrap_or_default() + .as_secs_f64() + .to_string() + .as_str(), + ]) + .observe(next_duration.as_secs_f64()); + // if have dirty time window, execute immediately to clean dirty time window - if self.dirty_time_windows.windows.is_empty() { + /*if self.dirty_time_windows.windows.is_empty() { self.last_update_time + next_duration } else { debug!( @@ -113,7 +125,9 @@ impl TaskState { self.dirty_time_windows.windows ); Instant::now() - } + }*/ + + self.last_update_time + next_duration } } diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 4fc6ccc433..ff2114b257 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -35,6 +35,13 @@ lazy_static! { vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME: HistogramVec = register_histogram_vec!( + "greptime_flow_batching_engine_wait_time_secs", + "flow batching engine wait time between query(seconds)", + &["flow_id", "time_window_size"], + vec![0.0, 5., 10., 20., 40., 80., 160., 320., 640.,] + ) + .unwrap(); pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!( "greptime_flow_batching_engine_slow_query_secs", "flow batching engine slow query(seconds)",