diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index e7439f20c7..b7494fd78d 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -59,6 +59,11 @@ pub struct TaskState { pub(crate) min_run_interval: Option, /// max filter number per query pub(crate) max_filter_num: Option, + /// Current filter count, will grow when query succeeds(capped by `max_filter_num`), + /// and reset to 1 when query fails. + /// + /// This is useful for controlling resource usage + pub(crate) cur_filter_cnt: usize, } impl TaskState { pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { @@ -72,6 +77,7 @@ impl TaskState { task_handle: None, min_run_interval: None, max_filter_num: None, + cur_filter_cnt: 1, } } @@ -140,6 +146,17 @@ pub struct DirtyTimeWindows { windows: BTreeMap>, } +/// Time windows that are being worked on, which are not dirty but are currently being processed +#[derive(Debug, Clone, Default)] +pub struct WorkingTimeWindows { + /// windows's `start -> end` and non-overlapping + /// `end` is exclusive(and optional) + pub windows: BTreeMap>, + /// Filter expression for the time windows + /// This is used to filter the data in the time windows. + pub filter: Option, +} + impl DirtyTimeWindows { /// Time window merge distance /// @@ -177,6 +194,12 @@ impl DirtyTimeWindows { self.windows.insert(start, end); } + pub fn add_windows(&mut self, windows: BTreeMap>) { + for (start, end) in windows { + self.windows.insert(start, end); + } + } + /// Clean all dirty time windows, useful when can't found time window expr pub fn clean(&mut self) { self.windows.clear(); @@ -195,7 +218,7 @@ impl DirtyTimeWindows { window_cnt: usize, flow_id: FlowId, task_ctx: Option<&BatchingTask>, - ) -> Result, Error> { + ) -> Result { debug!( "expire_lower_bound: {:?}, window_size: {:?}", expire_lower_bound.map(|t| t.to_iso8601_string()), @@ -318,7 +341,7 @@ impl DirtyTimeWindows { .observe(stalled_time_range.num_seconds() as f64); let mut expr_lst = vec![]; - for (start, end) in to_be_query.into_iter() { + for (start, end) in to_be_query.clone().into_iter() { // align using time window exprs let (start, end) = if let Some(ctx) = task_ctx { let Some(time_window_expr) = &ctx.config.time_window_expr else { @@ -350,7 +373,12 @@ impl DirtyTimeWindows { expr_lst.push(expr); } let expr = expr_lst.into_iter().reduce(|a, b| a.or(b)); - Ok(expr) + + let working = WorkingTimeWindows { + windows: to_be_query, + filter: expr, + }; + Ok(working) } fn align_time_window( @@ -646,7 +674,8 @@ mod test { 0, None, ) - .unwrap(); + .unwrap() + .filter; let unparser = datafusion::sql::unparser::Unparser::default(); let to_sql = filter_expr diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 942c7e0669..31d4f8a2df 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -46,7 +46,7 @@ use tokio::time::Instant; use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::state::{DirtyTimeWindows, TaskState}; +use crate::batching_mode::state::{DirtyTimeWindows, TaskState, WorkingTimeWindows}; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, @@ -61,9 +61,9 @@ use crate::error::{ SubstraitEncodeLogicalPlanSnafu, UnexpectedSnafu, }; use crate::metrics::{ - METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, - METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, - METRIC_FLOW_ROWS, + METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT, METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT, + METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME, METRIC_FLOW_BATCHING_ENGINE_SLOW_QUERY, + METRIC_FLOW_BATCHING_ENGINE_START_QUERY_CNT, METRIC_FLOW_ROWS, }; use crate::{Error, FlowId}; @@ -110,6 +110,13 @@ enum QueryType { Sql, } +/// A plan with working time windows, used to track the time windows that are currently being processed +#[derive(Debug, Clone)] +pub struct PlanWithWindow { + pub plan: Option, + pub working_windows: WorkingTimeWindows, +} + #[derive(Clone)] pub struct BatchingTask { pub config: Arc, @@ -218,30 +225,29 @@ impl BatchingTask { engine: &QueryEngineRef, frontend_client: &Arc, ) -> Result, Error> { - if let Some(new_query) = self.gen_insert_plan(engine).await? { + if let Some(new_query) = self.gen_insert_plan(engine).await?.plan { debug!("Generate new query: {}", new_query); - self.execute_logical_plan(frontend_client, &new_query).await + self.execute_logical_plan(frontend_client, &new_query) + .await + .map(Some) } else { debug!("Generate no query"); Ok(None) } } - pub async fn gen_insert_plan( - &self, - engine: &QueryEngineRef, - ) -> Result, Error> { + pub async fn gen_insert_plan(&self, engine: &QueryEngineRef) -> Result { let (table, df_schema) = get_table_info_df_schema( self.config.catalog_manager.clone(), self.config.sink_table_name.clone(), ) .await?; - let new_query = self + let new_query_info = self .gen_query_with_time_window(engine.clone(), &table.meta.schema) .await?; - let insert_into = if let Some((new_query, _column_cnt)) = new_query { + let insert_into = if let Some(new_query) = new_query_info.plan { // first check if all columns in input query exists in sink table // since insert into ref to names in record batch generate by given query let table_columns = df_schema @@ -272,12 +278,15 @@ impl BatchingTask { Arc::new(new_query), )) } else { - return Ok(None); + return Ok(new_query_info); }; let insert_into = insert_into.recompute_schema().context(DatafusionSnafu { context: "Failed to recompute schema", })?; - Ok(Some(insert_into)) + Ok(PlanWithWindow { + plan: Some(insert_into), + working_windows: new_query_info.working_windows, + }) } pub async fn create_table( @@ -297,7 +306,7 @@ impl BatchingTask { &self, frontend_client: &Arc, plan: &LogicalPlan, - ) -> Result, Error> { + ) -> Result<(u32, Duration), Error> { let instant = Instant::now(); let flow_id = self.config.flow_id; @@ -410,7 +419,7 @@ impl BatchingTask { let res = res?; - Ok(Some((res, elapsed))) + Ok((res, elapsed)) } /// start executing query in a loop, break when receive shutdown signal @@ -443,7 +452,7 @@ impl BatchingTask { .with_label_values(&[&flow_id_str]) .inc(); - let new_query = match self.gen_insert_plan(&engine).await { + let new_query_info = match self.gen_insert_plan(&engine).await { Ok(new_query) => new_query, Err(err) => { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id); @@ -453,8 +462,10 @@ impl BatchingTask { } }; - let res = if let Some(new_query) = &new_query { - self.execute_logical_plan(&frontend_client, new_query).await + let res = if let Some(new_query) = &new_query_info.plan { + self.execute_logical_plan(&frontend_client, new_query) + .await + .map(Some) } else { Ok(None) }; @@ -463,7 +474,17 @@ impl BatchingTask { // normal execute, sleep for some time before doing next query Ok(Some(_)) => { let sleep_until = { - let state = self.state.write().unwrap(); + let mut state = self.state.write().unwrap(); + + // double cur_filter_cnt + state.cur_filter_cnt = state.cur_filter_cnt.saturating_mul(2).min( + state + .max_filter_num + .unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM), + ); + METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT + .with_label_values(&[&flow_id_str]) + .set(state.cur_filter_cnt as i64); state.get_next_start_query_time( self.config.flow_id, @@ -491,7 +512,7 @@ impl BatchingTask { METRIC_FLOW_BATCHING_ENGINE_ERROR_CNT .with_label_values(&[&flow_id_str]) .inc(); - match new_query { + match new_query_info.plan { Some(query) => { common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) } @@ -499,6 +520,17 @@ impl BatchingTask { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id) } } + { + // return working windows to dirty windows, and reset current filter cnt so next time we generate query only generate a small query + let mut state = self.state.write().unwrap(); + state + .dirty_time_windows + .add_windows(new_query_info.working_windows.windows); + state.cur_filter_cnt = 1; + METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT + .with_label_values(&[&flow_id_str]) + .set(state.cur_filter_cnt as i64); + } // also sleep for a little while before try again to prevent flooding logs tokio::time::sleep(MIN_REFRESH_DURATION).await; } @@ -527,7 +559,7 @@ impl BatchingTask { &self, engine: QueryEngineRef, sink_table_schema: &Arc, - ) -> Result, Error> { + ) -> Result { let query_ctx = self.state.read().unwrap().query_ctx.clone(); let start = SystemTime::now(); let since_the_epoch = start @@ -540,7 +572,6 @@ impl BatchingTask { .unwrap_or(u64::MIN); let low_bound = Timestamp::new_second(low_bound as i64); - let schema_len = self.config.output_schema.fields().len(); let expire_time_window_bound = self .config @@ -573,10 +604,12 @@ impl BatchingTask { context: format!("Failed to rewrite plan:\n {}\n", plan), })? .data; - let schema_len = plan.schema().fields().len(); // since no time window lower/upper bound is found, just return the original query(with auto columns) - return Ok(Some((plan, schema_len))); + return Ok(PlanWithWindow { + plan: Some(plan), + working_windows: WorkingTimeWindows::default(), + }); }; debug!( @@ -598,16 +631,14 @@ impl BatchingTask { ), })?; - let expr = { + let working_windows = { let mut state = self.state.write().unwrap(); - let max_window_cnt = state - .max_filter_num - .unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM); + let cur_wnd_cnt = state.cur_filter_cnt; state.dirty_time_windows.gen_filter_exprs( &col_name, Some(l), window_size, - max_window_cnt, + cur_wnd_cnt, self.config.flow_id, Some(self), )? @@ -616,7 +647,9 @@ impl BatchingTask { debug!( "Flow id={:?}, Generated filter expr: {:?}", self.config.flow_id, - expr.as_ref() + working_windows + .filter + .as_ref() .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu { context: format!("Failed to generate filter expr from {expr:?}"), })) @@ -624,13 +657,16 @@ impl BatchingTask { .map(|s| s.to_string()) ); - let Some(expr) = expr else { + let Some(expr) = &working_windows.filter else { // no new data, hence no need to update debug!("Flow id={:?}, no new data, not update", self.config.flow_id); - return Ok(None); + return Ok(PlanWithWindow { + plan: None, + working_windows, + }); }; - let mut add_filter = AddFilterRewriter::new(expr); + let mut add_filter = AddFilterRewriter::new(expr.clone()); let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone()); let plan = @@ -646,7 +682,10 @@ impl BatchingTask { // only apply optimize after complex rewrite is done let new_plan = apply_df_optimizer(rewrite).await?; - Ok(Some((new_plan, schema_len))) + Ok(PlanWithWindow { + plan: Some(new_plan), + working_windows, + }) } } diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs index 6c792f8f35..cd6f6d96b0 100644 --- a/src/flow/src/metrics.rs +++ b/src/flow/src/metrics.rs @@ -96,6 +96,12 @@ lazy_static! { &["flow_id"] ) .unwrap(); + pub static ref METRIC_FLOW_BATCHING_ENGINE_CUR_WND_CNT: IntGaugeVec = register_int_gauge_vec!( + "greptime_flow_batching_current_window_count", + "flow batching engine current query window count per flow id", + &["flow_id"] + ) + .unwrap(); pub static ref METRIC_FLOW_BATCHING_ENGINE_GUESS_FE_LOAD: HistogramVec = register_histogram_vec!( "greptime_flow_batching_engine_guess_fe_load",