diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index ca3d78e710..90575e8804 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -192,6 +192,12 @@ impl DirtyTimeWindows { self.windows.insert(start, end); } + pub fn add_windows(&mut self, time_ranges: Vec<(Timestamp, Timestamp)>) { + for (start, end) in time_ranges { + self.windows.insert(start, Some(end)); + } + } + /// Clean all dirty time windows, useful when can't found time window expr pub fn clean(&mut self) { self.windows.clear(); @@ -242,7 +248,7 @@ impl DirtyTimeWindows { window_cnt: usize, flow_id: FlowId, task_ctx: Option<&BatchingTask>, - ) -> Result, Error> { + ) -> Result, Error> { ensure!( window_size.num_seconds() > 0, UnexpectedSnafu { @@ -372,7 +378,15 @@ impl DirtyTimeWindows { .with_label_values(&[flow_id.to_string().as_str()]) .observe(stalled_time_range.num_seconds() as f64); + let std_window_size = window_size.to_std().map_err(|e| { + InternalSnafu { + reason: e.to_string(), + } + .build() + })?; + let mut expr_lst = vec![]; + let mut time_ranges = vec![]; for (start, end) in to_be_query.into_iter() { // align using time window exprs let (start, end) = if let Some(ctx) = task_ctx { @@ -386,26 +400,31 @@ impl DirtyTimeWindows { } else { (start, end) }; + let end = end.unwrap_or(start.add_duration(std_window_size).context(TimeSnafu)?); + time_ranges.push((start, end)); + debug!( "Time window start: {:?}, end: {:?}", start.to_iso8601_string(), - end.map(|t| t.to_iso8601_string()) + end.to_iso8601_string() ); use datafusion_expr::{col, lit}; let lower = to_df_literal(start)?; - let upper = end.map(to_df_literal).transpose()?; - let expr = if let Some(upper) = upper { - col(col_name) - .gt_eq(lit(lower)) - .and(col(col_name).lt(lit(upper))) - } else { - col(col_name).gt_eq(lit(lower)) - }; + let upper = to_df_literal(end)?; + let expr = col(col_name) + .gt_eq(lit(lower)) + .and(col(col_name).lt(lit(upper))); expr_lst.push(expr); } let expr = expr_lst.into_iter().reduce(|a, b| a.or(b)); - Ok(expr) + let ret = expr.map(|expr| FilterExprInfo { + expr, + col_name: col_name.to_string(), + time_ranges, + window_size, + }); + Ok(ret) } fn align_time_window( @@ -519,6 +538,25 @@ enum ExecState { Executing, } +/// Filter Expression's information +#[derive(Debug, Clone)] +pub struct FilterExprInfo { + pub expr: datafusion_expr::Expr, + pub col_name: String, + pub time_ranges: Vec<(Timestamp, Timestamp)>, + pub window_size: chrono::Duration, +} + +impl FilterExprInfo { + pub fn total_window_length(&self) -> chrono::Duration { + self.time_ranges + .iter() + .fold(chrono::Duration::zero(), |acc, (start, end)| { + acc + end.sub(start).unwrap_or(chrono::Duration::zero()) + }) + } +} + #[cfg(test)] mod test { use pretty_assertions::assert_eq; @@ -702,7 +740,8 @@ mod test { 0, None, ) - .unwrap(); + .unwrap() + .map(|e| e.expr); let unparser = datafusion::sql::unparser::Unparser::default(); let to_sql = filter_expr diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 9467e089cd..ff97a8a410 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -46,7 +46,7 @@ use tokio::time::Instant; use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL}; use crate::batching_mode::frontend_client::FrontendClient; -use crate::batching_mode::state::TaskState; +use crate::batching_mode::state::{FilterExprInfo, TaskState}; use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::utils::{ get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter, @@ -130,6 +130,11 @@ pub struct TaskArgs<'a> { pub batch_opts: Arc, } +pub struct PlanInfo { + pub plan: LogicalPlan, + pub filter: Option, +} + impl BatchingTask { #[allow(clippy::too_many_arguments)] pub fn try_new( @@ -232,8 +237,9 @@ impl BatchingTask { max_window_cnt: Option, ) -> Result, Error> { if let Some(new_query) = self.gen_insert_plan(engine, max_window_cnt).await? { - debug!("Generate new query: {}", new_query); - self.execute_logical_plan(frontend_client, &new_query).await + debug!("Generate new query: {}", new_query.plan); + self.execute_logical_plan(frontend_client, &new_query.plan) + .await } else { debug!("Generate no query"); Ok(None) @@ -244,7 +250,7 @@ impl BatchingTask { &self, engine: &QueryEngineRef, max_window_cnt: Option, - ) -> Result, Error> { + ) -> Result, Error> { let (table, df_schema) = get_table_info_df_schema( self.config.catalog_manager.clone(), self.config.sink_table_name.clone(), @@ -259,7 +265,7 @@ impl BatchingTask { ) .await?; - let insert_into = if let Some((new_query, _column_cnt)) = new_query { + let insert_into_info = if let Some(new_query) = new_query { // first check if all columns in input query exists in sink table // since insert into ref to names in record batch generate by given query let table_columns = df_schema @@ -267,7 +273,7 @@ impl BatchingTask { .into_iter() .map(|c| c.name) .collect::>(); - for column in new_query.schema().columns() { + for column in new_query.plan.schema().columns() { ensure!( table_columns.contains(column.name()), InvalidQuerySnafu { @@ -283,7 +289,7 @@ impl BatchingTask { let table_source = Arc::new(DefaultTableSource::new(table_provider)); // update_at& time index placeholder (if exists) should have default value - LogicalPlan::Dml(DmlStatement::new( + let plan = LogicalPlan::Dml(DmlStatement::new( datafusion_common::TableReference::Full { catalog: self.config.sink_table_name[0].clone().into(), schema: self.config.sink_table_name[1].clone().into(), @@ -291,15 +297,26 @@ impl BatchingTask { }, table_source, WriteOp::Insert(datafusion_expr::dml::InsertOp::Append), - Arc::new(new_query), - )) + Arc::new(new_query.plan), + )); + PlanInfo { + plan, + filter: new_query.filter, + } } else { return Ok(None); }; - let insert_into = insert_into.recompute_schema().context(DatafusionSnafu { - context: "Failed to recompute schema", - })?; - Ok(Some(insert_into)) + let insert_into = insert_into_info + .plan + .recompute_schema() + .context(DatafusionSnafu { + context: "Failed to recompute schema", + })?; + + Ok(Some(PlanInfo { + plan: insert_into, + filter: insert_into_info.filter, + })) } pub async fn create_table( @@ -434,6 +451,7 @@ impl BatchingTask { frontend_client: Arc, ) { let flow_id_str = self.config.flow_id.to_string(); + let mut max_window_cnt = None; loop { // first check if shutdown signal is received // if so, break the loop @@ -457,7 +475,7 @@ impl BatchingTask { let min_refresh = self.config.batch_opts.experimental_min_refresh_duration; - let new_query = match self.gen_insert_plan(&engine, None).await { + let new_query = match self.gen_insert_plan(&engine, max_window_cnt).await { Ok(new_query) => new_query, Err(err) => { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id); @@ -468,7 +486,8 @@ impl BatchingTask { }; let res = if let Some(new_query) = &new_query { - self.execute_logical_plan(&frontend_client, new_query).await + self.execute_logical_plan(&frontend_client, &new_query.plan) + .await } else { Ok(None) }; @@ -476,6 +495,10 @@ impl BatchingTask { match res { // normal execute, sleep for some time before doing next query Ok(Some(_)) => { + // can increase max_window_cnt to query more windows next time + max_window_cnt = max_window_cnt.map(|cnt| { + (cnt + 1).min(self.config.batch_opts.experimental_max_filter_num_per_query) + }); let sleep_until = { let state = self.state.write().unwrap(); @@ -511,7 +534,16 @@ impl BatchingTask { .inc(); match new_query { Some(query) => { - common_telemetry::error!(err; "Failed to execute query for flow={} with query: {query}", self.config.flow_id) + common_telemetry::error!(err; "Failed to execute query for flow={} with query: {}", self.config.flow_id, query.plan); + // Re-add dirty windows back since query failed + self.state.write().unwrap().dirty_time_windows.add_windows( + query.filter.map(|f| f.time_ranges).unwrap_or_default(), + ); + // TODO(discord9): add some backoff here? half the query time window or what + // backoff meaning use smaller `max_window_cnt` for next query + + // since last query failed, we should not try to query too many windows + max_window_cnt = Some(1); } None => { common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id) @@ -546,7 +578,7 @@ impl BatchingTask { engine: QueryEngineRef, sink_table_schema: &Arc, max_window_cnt: Option, - ) -> Result, Error> { + ) -> Result, Error> { let query_ctx = self.state.read().unwrap().query_ctx.clone(); let start = SystemTime::now(); let since_the_epoch = start @@ -559,7 +591,6 @@ impl BatchingTask { .unwrap_or(u64::MIN); let low_bound = Timestamp::new_second(low_bound as i64); - let schema_len = self.config.output_schema.fields().len(); let expire_time_window_bound = self .config @@ -592,10 +623,9 @@ impl BatchingTask { context: format!("Failed to rewrite plan:\n {}\n", plan), })? .data; - let schema_len = plan.schema().fields().len(); // since no time window lower/upper bound is found, just return the original query(with auto columns) - return Ok(Some((plan, schema_len))); + return Ok(Some(PlanInfo { plan, filter: None })); }; debug!( @@ -636,9 +666,11 @@ impl BatchingTask { "Flow id={:?}, Generated filter expr: {:?}", self.config.flow_id, expr.as_ref() - .map(|expr| expr_to_sql(expr).with_context(|_| DatafusionSnafu { - context: format!("Failed to generate filter expr from {expr:?}"), - })) + .map( + |expr| expr_to_sql(&expr.expr).with_context(|_| DatafusionSnafu { + context: format!("Failed to generate filter expr from {expr:?}"), + }) + ) .transpose()? .map(|s| s.to_string()) ); @@ -649,7 +681,7 @@ impl BatchingTask { return Ok(None); }; - let mut add_filter = AddFilterRewriter::new(expr); + let mut add_filter = AddFilterRewriter::new(expr.expr.clone()); let mut add_auto_column = AddAutoColumnRewriter::new(sink_table_schema.clone()); let plan = @@ -665,7 +697,12 @@ impl BatchingTask { // only apply optimize after complex rewrite is done let new_plan = apply_df_optimizer(rewrite).await?; - Ok(Some((new_plan, schema_len))) + let info = PlanInfo { + plan: new_plan.clone(), + filter: Some(expr), + }; + + Ok(Some(info)) } }