From 73ce5914f20972b6ee4603520d6c060a0c6d9fe9 Mon Sep 17 00:00:00 2001 From: discord9 Date: Tue, 10 Jun 2025 23:11:00 +0800 Subject: [PATCH] feat: metrics --- src/flow/src/batching_mode/engine.rs | 5 +++++ src/flow/src/metrics.rs | 8 ++++++++ 2 files changed, 13 insertions(+) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 4cdf68e389..61d8633886 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -42,6 +42,7 @@ use crate::error::{ ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; +use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE; use crate::{CreateFlowArgs, Error, FlowId, TableName}; /// Batching mode Engine, responsible for driving all the batching mode tasks @@ -166,7 +167,11 @@ impl BatchingEngine { } } let mut state = task.state.write().unwrap(); + let flow_id_label = task.config.flow_id.to_string(); for (s, e) in all_dirty_windows { + METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE + .with_label_values(&[&flow_id_label]) + .observe(e.sub(&s).unwrap_or_default().num_seconds() as f64); state.dirty_time_windows.add_window(s, Some(e)); } Ok(()) diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 2b93a4a0a0..3939afcc44 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,6 +58,14 @@ lazy_static! { vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec = + register_histogram_vec!( + "greptime_flow_batching_engine_bulk_mark_time_window_range_secs", + "flow batching engine query time window range marked by bulk memtable in seconds", + &["flow_id"], + vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(