diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index f7cf78ca70..6a1697a389 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -316,7 +316,7 @@ impl StreamingEngine { ); METRIC_FLOW_ROWS - .with_label_values(&["out"]) + .with_label_values(&["out-streaming"]) .inc_by(total_rows as u64); let now = self.tick_manager.tick(); diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index 25bbc1e5e0..eba72c26e9 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -47,7 +47,7 @@ use crate::error::{ IllegalCheckTaskStateSnafu, InsertIntoFlowSnafu, InternalSnafu, JoinTaskSnafu, ListFlowsSnafu, NoAvailableFrontendSnafu, SyncCheckTaskSnafu, UnexpectedSnafu, }; -use crate::metrics::METRIC_FLOW_TASK_COUNT; +use crate::metrics::{METRIC_FLOW_ROWS, METRIC_FLOW_TASK_COUNT}; use crate::repr::{self, DiffRow}; use crate::{Error, FlowId}; @@ -690,6 +690,9 @@ impl FlowEngine for FlowDualEngine { let mut to_stream_engine = Vec::with_capacity(request.requests.len()); let mut to_batch_engine = request.requests; + let mut batching_row_cnt = 0; + let mut streaming_row_cnt = 0; + { // not locking this, or recover flows will be starved when also handling flow inserts let src_table2flow = self.src_table2flow.read().await; @@ -699,9 +702,11 @@ impl FlowEngine for FlowDualEngine { let is_in_stream = src_table2flow.in_stream(table_id); let is_in_batch = src_table2flow.in_batch(table_id); if is_in_stream { + streaming_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); to_stream_engine.push(req.clone()); } if is_in_batch { + batching_row_cnt += req.rows.as_ref().map(|rs| rs.rows.len()).unwrap_or(0); return true; } if !is_in_batch && !is_in_stream { @@ -714,6 +719,14 @@ impl FlowEngine for FlowDualEngine { // can't use drop due to https://github.com/rust-lang/rust/pull/128846 } + METRIC_FLOW_ROWS + .with_label_values(&["in-streaming"]) + .inc_by(streaming_row_cnt as u64); 
+ + METRIC_FLOW_ROWS + .with_label_values(&["in-batching"]) + .inc_by(batching_row_cnt as u64); + let streaming_engine = self.streaming_engine.clone(); let stream_handler: JoinHandle> = common_runtime::spawn_global(async move { diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index f93755d4f8..aef9eea5e8 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -61,7 +61,9 @@ use crate::error::{ SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, }; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, + METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, + METRIC_FLOW_ROWS, }; use crate::{Error, FlowId}; @@ -371,6 +373,9 @@ impl BatchingTask { "Flow {flow_id} executed, affected_rows: {affected_rows:?}, elapsed: {:?}", elapsed ); + METRIC_FLOW_ROWS + .with_label_values(&["out-batching"]) + .inc_by(*affected_rows as _); } else if let Err(err) = &res { warn!( "Failed to execute Flow {flow_id} on frontend {:?}, result: {err:?}, elapsed: {:?} with query: {}", @@ -410,6 +415,7 @@ impl BatchingTask { engine: QueryEngineRef, frontend_client: Arc, ) { + let flow_id_str = self.config.flow_id.to_string(); loop { // first check if shutdown signal is received // if so, break the loop @@ -427,6 +433,9 @@ impl BatchingTask { Err(TryRecvError::Empty) => (), } } + METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT + .with_label_values(&[&flow_id_str]) + .inc(); let new_query = match self.gen_insert_plan(&engine).await { Ok(new_query) => new_query, @@ -473,6 +482,9 @@ impl BatchingTask { } // TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed Err(err) => { + METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT + .with_label_values(&[&flow_id_str]) + .inc(); match new_query {
Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 3939afcc44..16c2b29de6 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -66,11 +66,25 @@ lazy_static! { vec![0.0, 60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_start_query_count", + "flow batching engine started query count", + &["flow_id"], + ) + .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT: IntCounterVec = + register_int_counter_vec!( + "greptime_flow_batching_error_count", + "flow batching engine error count per flow id", + &["flow_id"], + ) + .unwrap(); pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge = register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap(); pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!( "greptime_flow_processed_rows", - "Count of rows flowing through the system", + "Count of rows flowing through the system.", &["direction"] ) .unwrap();