From 38cac301f2ad3340c88c5f6514820ea7bb91bd30 Mon Sep 17 00:00:00 2001
From: discord9 <55937128+discord9@users.noreply.github.com>
Date: Tue, 3 Jun 2025 20:27:07 +0800
Subject: [PATCH] refactor(flow): limit the size of query (#6216)

* refactor: not wait for slow query

* chore: clippy

* chore: fmt

* WIP: time range lock

* WIP

* refactor: rm over-complicated query pool

* chore: add more metrics& rm sql from slow query metrics
---
 src/flow/src/batching_mode/state.rs | 74 +++++++++++++++++++++++++----
 src/flow/src/batching_mode/task.rs  | 27 +++++++----
 src/flow/src/metrics.rs             | 18 ++++++-
 3 files changed, 99 insertions(+), 20 deletions(-)

diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index 57d254a63b..f2e101a533 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -30,6 +30,9 @@ use crate::batching_mode::task::BatchingTask;
 use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
+use crate::metrics::{
+    METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+};
 use crate::{Error, FlowId};
 
 /// The state of the [`BatchingTask`].
@@ -127,10 +130,10 @@ impl DirtyTimeWindows {
     /// Time window merge distance
     ///
     /// TODO(discord9): make those configurable
-    const MERGE_DIST: i32 = 3;
+    pub const MERGE_DIST: i32 = 3;
 
     /// Maximum number of filters allowed in a single query
-    const MAX_FILTER_NUM: usize = 20;
+    pub const MAX_FILTER_NUM: usize = 20;
 
     /// Add lower bounds to the dirty time windows. Upper bounds are ignored.
     ///
@@ -154,11 +157,16 @@ impl DirtyTimeWindows {
         }
     }
     /// Generate all filter expressions consuming all time windows
+    ///
+    /// There are two limits:
+    /// - it shouldn't return too long a time range (<= `window_size * window_cnt`), so that the query can be executed in a reasonable time
+    /// - it shouldn't return too many time range exprs, so that the query can be parsed properly instead of causing the parser to overflow
     pub fn gen_filter_exprs(
         &mut self,
         col_name: &str,
         expire_lower_bound: Option<Timestamp>,
         window_size: chrono::Duration,
+        window_cnt: usize,
         flow_id: FlowId,
         task_ctx: Option<&BatchingTask>,
     ) -> Result<Option<datafusion_expr::Expr>, Error> {
@@ -196,12 +204,33 @@ impl DirtyTimeWindows {
             }
         }
 
-        // get the first `MAX_FILTER_NUM` time windows
-        let nth = self
-            .windows
-            .iter()
-            .nth(Self::MAX_FILTER_NUM)
-            .map(|(key, _)| *key);
+        // get the first `window_cnt` time windows
+        let max_time_range = window_size * window_cnt as i32;
+        let nth = {
+            let mut cur_time_range = chrono::Duration::zero();
+            let mut nth_key = None;
+            for (idx, (start, end)) in self.windows.iter().enumerate() {
+                // if time range is too long, stop
+                if cur_time_range > max_time_range {
+                    nth_key = Some(*start);
+                    break;
+                }
+
+                // if we have enough time windows, stop
+                if idx >= window_cnt {
+                    nth_key = Some(*start);
+                    break;
+                }
+
+                if let Some(end) = end {
+                    if let Some(x) = end.sub(start) {
+                        cur_time_range += x;
+                    }
+                }
+            }
+
+            nth_key
+        };
         let first_nth = {
             if let Some(nth) = nth {
                 let mut after = self.windows.split_off(&nth);
@@ -213,6 +242,24 @@ impl DirtyTimeWindows {
             }
         };
 
+        METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT
+            .with_label_values(&[flow_id.to_string().as_str()])
+            .observe(first_nth.len() as f64);
+
+        let full_time_range = first_nth
+            .iter()
+            .fold(chrono::Duration::zero(), |acc, (start, end)| {
+                if let Some(end) = end {
+                    acc + end.sub(start).unwrap_or(chrono::Duration::zero())
+                } else {
+                    acc
+                }
+            })
+            .num_seconds() as f64;
+        METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE
+            .with_label_values(&[flow_id.to_string().as_str()])
+            .observe(full_time_range);
+
         let mut expr_lst = vec![];
         for (start, end) in first_nth.into_iter() {
             // align using time window exprs
@@ -274,6 +321,8 @@ impl DirtyTimeWindows {
     }
 
     /// Merge time windows that overlaps or get too close
+    ///
+    /// TODO(discord9): not merge and prefer to send smaller time windows? how?
     pub fn merge_dirty_time_windows(
         &mut self,
         window_size: chrono::Duration,
@@ -472,7 +521,14 @@ mod test {
             .unwrap();
         assert_eq!(expected, dirty.windows);
         let filter_expr = dirty
-            .gen_filter_exprs("ts", expire_lower_bound, window_size, 0, None)
+            .gen_filter_exprs(
+                "ts",
+                expire_lower_bound,
+                window_size,
+                DirtyTimeWindows::MAX_FILTER_NUM,
+                0,
+                None,
+            )
             .unwrap();
         let unparser = datafusion::sql::unparser::Unparser::default();
 
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 7161a7fb49..f93755d4f8 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -46,7 +46,7 @@ use tokio::time::Instant;
 
 use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
 use crate::batching_mode::frontend_client::FrontendClient;
-use crate::batching_mode::state::TaskState;
+use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
 use crate::batching_mode::time_window::TimeWindowExpr;
 use crate::batching_mode::utils::{
     get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
@@ -387,7 +387,6 @@ impl BatchingTask {
                     METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY
                         .with_label_values(&[
                             flow_id.to_string().as_str(),
-                            &plan.to_string(),
                             &peer_desc.unwrap_or_default().to_string(),
                         ])
                         .observe(elapsed.as_secs_f64());
@@ -429,16 +428,23 @@ impl BatchingTask {
                 }
             }
 
-            let mut new_query = None;
-            let mut gen_and_exec = async || {
-                new_query = self.gen_insert_plan(&engine).await?;
-                if let Some(new_query) = &new_query {
-                    self.execute_logical_plan(&frontend_client, new_query).await
-                } else {
-                    Ok(None)
+            let new_query = match self.gen_insert_plan(&engine).await {
+                Ok(new_query) => new_query,
+                Err(err) => {
+                    common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
+                    // also sleep for a little while before trying again to prevent flooding logs
+                    tokio::time::sleep(MIN_REFRESH_DURATION).await;
+                    continue;
                 }
             };
-            match gen_and_exec().await {
+
+            let res = if let Some(new_query) = &new_query {
+                self.execute_logical_plan(&frontend_client, new_query).await
+            } else {
+                Ok(None)
+            };
+
+            match res {
                 // normal execute, sleep for some time before doing next query
                 Ok(Some(_)) => {
                     let sleep_until = {
@@ -583,6 +589,7 @@ impl BatchingTask {
                 &col_name,
                 Some(l),
                 window_size,
+                DirtyTimeWindows::MAX_FILTER_NUM,
                 self.config.flow_id,
                 Some(self),
             )?;
diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs
index b1540c0f56..2b93a4a0a0 100644
--- a/src/flow/src/metrics.rs
+++ b/src/flow/src/metrics.rs
@@ -38,10 +38,26 @@ lazy_static! {
     pub static ref METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
         "greptime_flow_batching_engine_slow_query_secs",
         "flow batching engine slow query(seconds)",
-        &["flow_id", "sql", "peer"],
+        &["flow_id", "peer"],
         vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
+        register_histogram_vec!(
+            "greptime_flow_batching_engine_query_window_cnt",
+            "flow batching engine query time window count",
+            &["flow_id"],
+            vec![0.0, 5., 10., 20., 40.]
+        )
+        .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE: HistogramVec =
+        register_histogram_vec!(
+            "greptime_flow_batching_engine_query_time_range_secs",
+            "flow batching engine query time range(seconds)",
+            &["flow_id"],
+            vec![60., 4. * 60., 16. * 60., 64. * 60., 256. * 60.]
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(