From f04f16cec3efae81b3d375aef0b1e9d2a33dedd4 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 13 Mar 2025 14:15:29 +0800 Subject: [PATCH] fix: quote&more info when time window too many chore: even more warning fix: filter first warn later --- src/flow/src/recording_rules.rs | 71 +++++++++++++---- src/flow/src/recording_rules/engine.rs | 106 +++++++++++++++++-------- src/flow/src/test_utils.rs | 31 ++++++++ 3 files changed, 161 insertions(+), 47 deletions(-) diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs index 00c67cf96c..228c30c61e 100644 --- a/src/flow/src/recording_rules.rs +++ b/src/flow/src/recording_rules.rs @@ -60,15 +60,13 @@ use crate::Error; #[derive(Debug, Clone)] pub struct TimeWindowExpr { - expr: PhysicalExprRef, + phy_expr: PhysicalExprRef, column_name: String, + logical_expr: Expr, + df_schema: DFSchema, } impl TimeWindowExpr { - pub fn new(expr: PhysicalExprRef, column_name: String) -> Self { - Self { expr, column_name } - } - pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result { let phy_planner = DefaultPhysicalPlanner::default(); @@ -80,11 +78,24 @@ impl TimeWindowExpr { ), })?; Ok(Self { - expr: phy_expr, + phy_expr, column_name: column_name.to_string(), + logical_expr: expr.clone(), + df_schema: df_schema.clone(), }) } + pub fn eval( + &self, + current: Timestamp, + ) -> Result<(Option, Option), Error> { + let lower_bound = + find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?; + let upper_bound = + find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?; + Ok((lower_bound, upper_bound)) + } + /// Find timestamps from rows using time window expr pub async fn handle_rows( &self, @@ -131,12 +142,15 @@ impl TimeWindowExpr { ), })?; - let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu { - context: format!( - "Failed to evaluate physical expression {:?} on {rb:?}", - self.expr - ), - })?; + let eval_res = self + .phy_expr + .evaluate(&rb) + .with_context(|_| DatafusionSnafu { + context: format!( + "Failed to evaluate physical expression {:?} on {rb:?}", + self.phy_expr + ), + })?; let res = columnar_to_ts_vector(&eval_res)?; @@ -447,6 +461,7 @@ pub async fn find_plan_time_window_bound( /// /// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`, /// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound +/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound /// of current time window given the current timestamp /// /// if return None, meaning this time window have no lower bound @@ -656,7 +671,19 @@ impl TreeNodeRewriter for AddFilterRewriter { } fn df_plan_to_sql(plan: &LogicalPlan) -> Result { - let unparser = Unparser::default(); + /// A dialect that forces all identifiers to be quoted + struct ForceQuoteIdentifiers; + impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers { + fn identifier_quote_style(&self, identifier: &str) -> Option { + if identifier.to_lowercase() != identifier { + Some('"') + } else { + None + } + } + } + let unparser = Unparser::new(&ForceQuoteIdentifiers); + // first make all column qualified let sql = unparser .plan_to_sql(plan) .with_context(|_e| DatafusionSnafu { @@ -675,6 +702,22 @@ mod test { use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter}; use crate::test_utils::create_test_query_engine; + #[tokio::test] + async fn test_sql_plan_convert() { + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#; + let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false) + .await + .unwrap(); + let new_sql = df_plan_to_sql(&new).unwrap(); + + assert_eq!( + r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#, + new_sql + ); + } + #[tokio::test] async fn test_add_filter() { let testcases = vec![ @@ -723,7 +766,7 @@ mod test { Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)), Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)), ), - "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts" + r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"# ), // complex time window index ( diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index a9f8c791af..3c5a3d12f4 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -45,7 +45,7 @@ use crate::error::{ TimeSnafu, UnexpectedSnafu, }; use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY}; -use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan}; +use crate::recording_rules::{find_time_window_expr, sql_to_df_plan}; use crate::Error; /// TODO(discord9): make those constants configurable @@ -214,6 +214,8 @@ impl RecordingRuleEngine { .map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema)) .transpose()?; + info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr); + let task = RecordingRuleTask::new( flow_id, &sql, @@ -268,11 +270,11 @@ impl RecordingRuleEngine { #[derive(Debug, Clone)] pub struct RecordingRuleTask { - flow_id: FlowId, + pub flow_id: FlowId, query: String, - time_window_expr: Option, + pub time_window_expr: Option, /// in seconds - expire_after: Option, + pub expire_after: Option, sink_table_name: [String; 3], source_table_names: HashSet<[String; 3]>, state: Arc>, @@ -413,30 +415,40 @@ impl RecordingRuleTask { let low_bound = Timestamp::new_second(low_bound as i64); - let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?; - - // TODO(discord9): find more time window! - let (col_name, lower, upper) = - find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone()) - .await?; + // TODO(discord9): use time window expr to get the precise expire lower bound + let expire_time_window_bound = self + .time_window_expr + .as_ref() + .map(|expr| expr.eval(low_bound)) + .transpose()?; let new_sql = { let expr = { - match (lower, upper) { - (Some(l), Some(u)) => { + match expire_time_window_bound { + Some((Some(l), Some(u))) => { let window_size = u.sub(&l).with_context(|| UnexpectedSnafu { reason: format!("Can't get window size from {u:?} - {l:?}"), })?; + let col_name = self + .time_window_expr + .as_ref() + .map(|expr| expr.column_name.clone()) + .with_context(|| UnexpectedSnafu { + reason: format!( + "Flow id={:?}, Failed to get column name from time window expr", + self.flow_id + ), + })?; self.state .write() .await .dirty_time_windows - .gen_filter_exprs(&col_name, lower, window_size)? + .gen_filter_exprs(&col_name, Some(l), window_size, self)? } _ => { warn!( - "Flow id = {:?}, can't get window size: lower={lower:?}, upper={upper:?}, using the same query", self.flow_id + "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id ); // since no time window lower/upper bound is found, just return the original query return Ok(Some(self.query.clone())); @@ -533,19 +545,28 @@ impl DirtyTimeWindows { col_name: &str, expire_lower_bound: Option, window_size: chrono::Duration, + task_ctx: &RecordingRuleTask, ) -> Result, Error> { debug!( "expire_lower_bound: {:?}, window_size: {:?}", expire_lower_bound.map(|t| t.to_iso8601_string()), window_size ); - self.merge_dirty_time_windows(window_size)?; + self.merge_dirty_time_windows(window_size, expire_lower_bound)?; if self.windows.len() > Self::MAX_FILTER_NUM { + let first_time_window = self.windows.first_key_value(); + let last_time_window = self.windows.last_key_value(); warn!( - "Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong", + "Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}", + task_ctx.flow_id, self.windows.len(), - Self::MAX_FILTER_NUM + Self::MAX_FILTER_NUM, + task_ctx.time_window_expr, + task_ctx.expire_after, + first_time_window, + last_time_window, + task_ctx.query ); } @@ -573,18 +594,6 @@ impl DirtyTimeWindows { start.to_iso8601_string(), end.map(|t| t.to_iso8601_string()) ); - let start = if let Some(expire) = expire_lower_bound { - start.max(expire) - } else { - start - }; - - // filter out expired time window - if let Some(end) = end { - if end <= start { - continue; - } - } use datafusion_expr::{col, lit}; let lower = to_df_literal(start)?; @@ -603,11 +612,22 @@ impl DirtyTimeWindows { } /// Merge time windows that overlaps or get too close - pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> { + pub fn merge_dirty_time_windows( + &mut self, + window_size: chrono::Duration, + expire_lower_bound: Option, + ) -> Result<(), Error> { let mut new_windows = BTreeMap::new(); let mut prev_tw = None; for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) { + // filter out expired time window + if let Some(expire_lower_bound) = expire_lower_bound { + if lower_bound <= expire_lower_bound { + continue; + } + } + let Some(prev_tw) = &mut prev_tw else { prev_tw = Some((lower_bound, upper_bound)); continue; @@ -708,7 +728,7 @@ mod test { .into_iter(), ); dirty - .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None) .unwrap(); // just enough to merge assert_eq!( @@ -731,7 +751,7 @@ mod test { .into_iter(), ); dirty - .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None) .unwrap(); // just enough to merge assert_eq!( @@ -760,7 +780,7 @@ mod test { .into_iter(), ); dirty - .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None) .unwrap(); // just enough to merge assert_eq!( @@ -772,5 +792,25 @@ mod test { ),]), dirty.windows ); + + // expired + let mut dirty = DirtyTimeWindows::default(); + dirty.add_lower_bounds( + vec![ + Timestamp::new_second(0), + Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + ] + .into_iter(), + ); + dirty + .merge_dirty_time_windows( + chrono::Duration::seconds(5 * 60), + Some(Timestamp::new_second( + (DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60, + )), + ) + .unwrap(); + // just enough to merge + assert_eq!(BTreeMap::from([]), dirty.windows); } } diff --git a/src/flow/src/test_utils.rs b/src/flow/src/test_utils.rs index 3c4a614951..4d269a80c0 100644 --- a/src/flow/src/test_utils.rs +++ b/src/flow/src/test_utils.rs @@ -115,6 +115,37 @@ pub fn create_test_query_engine() -> Arc { }; catalog_list.register_table_sync(req_with_ts).unwrap(); + let schema = vec![ + datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false), + datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false) + .with_time_index(true), + ]; + let mut columns = vec![]; + let numbers = (1..=10).collect_vec(); + let column: VectorRef = Arc::new(::VectorType::from_vec(numbers)); + columns.push(column); + + let ts = (1..=10).collect_vec(); + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10); + ts.into_iter() + .map(|v| builder.push(Some(TimestampMillisecond::new(v)))) + .count(); + let column: VectorRef = builder.to_vector_cloned(); + columns.push(column); + + let schema = Arc::new(Schema::new(schema)); + let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap(); + let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch); + + let req_with_ts = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(), + table_id: 1025, + table, + }; + catalog_list.register_table_sync(req_with_ts).unwrap(); + let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false); let engine = factory.query_engine();