From ced018fce0c28e3d85fc577031c1b0f326805b11 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Wed, 11 Jun 2025 07:41:13 +0000 Subject: [PATCH] feat/bulk-support-flow-batch: ### Update Metrics in Batching Mode Engine - **Modified Metrics**: Replaced `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE` with `METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW` to track the count of time windows instead of their range. - Files affected: `engine.rs`, `metrics.rs` - **New Method**: Added `len()` method to `DirtyTimeWindows` to return the number of dirty windows. - File affected: `state.rs` Signed-off-by: Lei, HUANG --- src/flow/src/batching_mode/engine.rs | 9 +++++---- src/flow/src/batching_mode/state.rs | 5 +++++ src/flow/src/metrics.rs | 9 ++++----- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 842ae100b4..e18e466867 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -42,7 +42,7 @@ use crate::error::{ ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; -use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE; +use crate::metrics::METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW; use crate::{CreateFlowArgs, Error, FlowId, TableName}; /// Batching mode Engine, responsible for driving all the batching mode tasks @@ -163,10 +163,11 @@ impl BatchingEngine { let flow_id_label = task.config.flow_id.to_string(); for timestamp in all_dirty_windows { state.dirty_time_windows.add_window(timestamp, None); - METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE - .with_label_values(&[&flow_id_label]) - .observe(e.sub(&s).unwrap_or_default().num_seconds() as f64); } + + METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW + .with_label_values(&[&flow_id_label]) + .set(state.dirty_time_windows.len() as f64); Ok(()) }); handles.push(handle); diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index f2e101a533..0ecf68b488 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -156,6 +156,11 @@ impl DirtyTimeWindows { self.windows.clear(); } + /// Number of dirty windows. + pub fn len(&self) -> usize { + self.windows.len() + } + /// Generate all filter expressions consuming all time windows /// /// there is two limits: diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 16c2b29de6..66ec7423e0 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,12 +58,11 @@ lazy_static! { vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); - pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW_RANGE: HistogramVec = - register_histogram_vec!( - "greptime_flow_batching_engine_bulk_mark_time_window_range_secs", - "flow batching engine query time window range marked by bulk memtable in seconds", + pub static ref METRIC_FLOW_BATCHING_ENGINE_BULK_MARK_TIME_WINDOW: GaugeVec = + register_gauge_vec!( + "greptime_flow_batching_engine_bulk_mark_time_window", + "flow batching engine query time window count marked by bulk inserts", &["flow_id"], - vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec =