From 91f373e66ea27db27efbde20d117e1d50043ff66 Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 11 Jun 2025 01:54:29 +0800 Subject: [PATCH] metrics: more generic metrics batching mode --- src/flow/src/adapter.rs | 2 +- src/flow/src/adapter/flownode_impl.rs | 15 ++++++++++++++- src/flow/src/batching_mode/engine.rs | 5 +++++ src/flow/src/batching_mode/task.rs | 14 +++++++++++++- src/flow/src/metrics.rs | 14 ++++++++++++++ 5 files changed, 47 insertions(+), 3 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index f7cf78ca70..6a1697a389 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -316,7 +316,7 @@ impl StreamingEngine { ); METRIC_FLOW_ROWS - .with_label_values(&["out"]) + .with_label_values(&["out-streaming"]) .inc_by(total_rows as u64); let now = self.tick_manager.tick(); diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index b42ae2c29e..4f22a6637b 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -46,7 +46,7 @@ use crate::error::{ IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, }; -use crate::metrics::METRIC_FLOW_TASK_COUNT; +use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT}; use crate::repr::{self, DiffRow}; use crate::{Error, FlowId}; @@ -689,6 +689,9 @@ impl FlowEngine for FlowDualEngine { let mut to_stream_engine = Vec::with_capacity(request.requests.len()); let mut to_batch_engine = request.requests; + let mut batching_row_cnt = 0; + let mut streaming_row_cnt = 0; + { // not locking this, or recover flows will be starved when also handling flow inserts let src_table2flow = self.src_table2flow.read().await; @@ -698,9 +701,11 @@ impl FlowEngine for FlowDualEngine { let is_in_stream = src_table2flow.in_stream(table_id); let is_in_batch = src_table2flow.in_batch(table_id); if is_in_stream { + streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); to_stream_engine.push(req.clone()); } if is_in_batch { + batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); return true; } if !is_in_batch && !is_in_stream { @@ -713,6 +718,14 @@ impl FlowEngine for FlowDualEngine { // can't use drop due to https://github.com/rust-lang/rust/pull/128846 } + METRIC_FLOW_ROWS + .with_label_values(&["in-streaming"]) + .inc_by(streaming_row_cnt as u64); + + METRIC_FLOW_ROWS + .with_label_values(&["in-batching"]) + .inc_by(batching_row_cnt as u64); + let streaming_engine = self.streaming_engine.clone(); let stream_handler: JoinHandle> = common_runtime::spawn_global(async move { diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 06bd4a4d3c..b80fe76d87 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -42,6 +42,7 @@ use crate::error::{ ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu, UnexpectedSnafu, UnsupportedSnafu, }; +use crate::metrics::METRIC_FLOW_ROWS; use crate::{CreateFlowArgs, Error, FlowId, TableName}; /// Batching mode Engine, responsible for driving all the batching mode tasks @@ -155,6 +156,10 @@ impl BatchingEngine { let Some(expr) = &task.config.time_window_expr else { continue; }; + let row_cnt: usize = entry.iter().map(|rows| rows.rows.len()).sum(); + METRIC_FLOW_ROWS + .with_label_values(&[&format!("{}-batching-in", task.config.flow_id)]) + .inc_by(row_cnt as u64); let involved_time_windows = expr.handle_rows(entry.clone()).await?; let mut state = task.state.write().unwrap(); state diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index f93755d4f8..aef9eea5e8 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -61,7 +61,9 @@ use crate::error::{ SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, }; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, + METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, + METRIC_FLOW_ROWS, }; use crate::{Error, FlowId}; @@ -371,6 +373,9 @@ impl BatchingTask { "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", elapsed ); + METRIC_FLOW_ROWS + .with_label_values(&[format!("{}-out-batching", flow_id).as_str()]) + .inc_by(*affected_rows as _); } else if let Err(err) = &res { warn!( "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", @@ -410,6 +415,7 @@ impl BatchingTask { engine: QueryEngineRef, frontend_client: Arc, ) { + let flow_id_str = self.config.flow_id.to_string(); loop { // first check if shutdown signal is received // if so, break the loop @@ -427,6 +433,9 @@ impl BatchingTask { Err(TryRecvError::Empty) => (), } } + METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT + .with_label_values(&[&flow_id_str]) + .inc(); let new_query = match self.gen_insert_plan(&engine).await { Ok(new_query) => new_query, @@ -473,6 +482,9 @@ impl BatchingTask { } // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed Err(err) => { + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT + .with_label_values(&[&flow_id_str]) + .inc(); match new_query { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 2b93a4a0a0..4fc6ccc433 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -58,6 +58,20 @@ lazy_static! { vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_start_query_count", + "flow batching engine started query count", + &["flow_id"], + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_error_count", + "flow batching engine error count per flow id", + &["flow_id"], + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(