diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 5cbc541d88..e89ca996fb 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -27,6 +27,7 @@ use tokio::sync::oneshot; use tokio::time::Instant; use crate::batching_mode::task::BatchingTask; +use crate::batching_mode::time_window::TimeWindowExpr; use crate::batching_mode::MIN_REFRESH_DURATION; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::{Error, FlowId}; @@ -192,16 +193,7 @@ impl DirtyTimeWindows { } .fail()? }; - let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu { - reason: format!( - "Failed to align start time {:?} with time window expr {:?}", - start, time_window_expr - ), - })?; - let align_end = end - .and_then(|end| time_window_expr.eval(end).map(|r| r.1).transpose()) - .transpose()?; - (align_start, align_end) + self.align_time_window(start, end, time_window_expr)? } else { (start, end) }; @@ -227,6 +219,30 @@ impl DirtyTimeWindows { Ok(expr) } + fn align_time_window( + &self, + start: Timestamp, + end: Option, + time_window_expr: &TimeWindowExpr, + ) -> Result<(Timestamp, Option), Error> { + let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu { + reason: format!( + "Failed to align start time {:?} with time window expr {:?}", + start, time_window_expr + ), + })?; + let align_end = end + .and_then(|end| { + time_window_expr + .eval(end) + // if after aligned, end is the same, then use end(because it's already aligned) else use aligned end + .map(|r| if r.0 == Some(end) { r.0 } else { r.1 }) + .transpose() + }) + .transpose()?; + Ok((align_start, align_end)) + } + /// Merge time windows that overlaps or get too close pub fn merge_dirty_time_windows( &mut self, @@ -315,8 +331,12 @@ enum ExecState { #[cfg(test)] mod test { use pretty_assertions::assert_eq; + use session::context::QueryContext; use super::*; + use crate::batching_mode::time_window::find_time_window_expr; + use crate::batching_mode::utils::sql_to_df_plan; + use crate::test_utils::create_test_query_engine; #[test] fn test_merge_dirty_time_windows() { @@ -432,4 +452,59 @@ mod test { assert_eq!(expected_filter_expr, to_sql.as_deref()); } } + + #[tokio::test] + async fn test_align_time_window() { + type TimeWindow = (Timestamp, Option); + struct TestCase { + sql: String, + aligns: Vec<(TimeWindow, TimeWindow)>, + } + let testcases: Vec = vec![TestCase{ + sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(), + aligns: vec![ + ((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)), + ((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)), + ((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))), + ((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))), + ], + }]; + + let query_engine = create_test_query_engine(); + let ctx = QueryContext::arc(); + for TestCase { sql, aligns } in testcases { + let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true) + .await + .unwrap(); + + let (column_name, time_window_expr, _, df_schema) = find_time_window_expr( + &plan, + query_engine.engine_state().catalog_manager().clone(), + ctx.clone(), + ) + .await + .unwrap(); + + let time_window_expr = time_window_expr + .map(|expr| { + TimeWindowExpr::from_expr( + &expr, + &column_name, + &df_schema, + &query_engine.engine_state().session_state(), + ) + }) + .transpose() + .unwrap() + .unwrap(); + + let dirty = DirtyTimeWindows::default(); + for (before_align, expected_after_align) in aligns { + let after_align = dirty + .align_time_window(before_align.0, before_align.1, &time_window_expr) + .unwrap(); + assert_eq!(expected_after_align, after_align); + } + } + } } diff --git a/src/flow/src/batching_mode/time_window.rs b/src/flow/src/batching_mode/time_window.rs index 1b47c205ed..e6a0d6ad8c 100644 --- a/src/flow/src/batching_mode/time_window.rs +++ b/src/flow/src/batching_mode/time_window.rs @@ -267,7 +267,7 @@ fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result