test: add align time window test

This commit is contained in:
discord9
2025-04-21 12:59:27 +08:00
parent 6d0470c3fb
commit 82cee11eea
2 changed files with 86 additions and 11 deletions

View File

@@ -27,6 +27,7 @@ use tokio::sync::oneshot;
use tokio::time::Instant;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::{Error, FlowId};
@@ -192,16 +193,7 @@ impl DirtyTimeWindows {
}
.fail()?
};
let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
reason: format!(
"Failed to align start time {:?} with time window expr {:?}",
start, time_window_expr
),
})?;
let align_end = end
.and_then(|end| time_window_expr.eval(end).map(|r| r.1).transpose())
.transpose()?;
(align_start, align_end)
self.align_time_window(start, end, time_window_expr)?
} else {
(start, end)
};
@@ -227,6 +219,30 @@ impl DirtyTimeWindows {
Ok(expr)
}
fn align_time_window(
&self,
start: Timestamp,
end: Option<Timestamp>,
time_window_expr: &TimeWindowExpr,
) -> Result<(Timestamp, Option<Timestamp>), Error> {
let align_start = time_window_expr.eval(start)?.0.context(UnexpectedSnafu {
reason: format!(
"Failed to align start time {:?} with time window expr {:?}",
start, time_window_expr
),
})?;
let align_end = end
.and_then(|end| {
time_window_expr
.eval(end)
// if after aligned, end is the same, then use end(because it's already aligned) else use aligned end
.map(|r| if r.0 == Some(end) { r.0 } else { r.1 })
.transpose()
})
.transpose()?;
Ok((align_start, align_end))
}
/// Merge time windows that overlaps or get too close
pub fn merge_dirty_time_windows(
&mut self,
@@ -315,8 +331,12 @@ enum ExecState {
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use session::context::QueryContext;
use super::*;
use crate::batching_mode::time_window::find_time_window_expr;
use crate::batching_mode::utils::sql_to_df_plan;
use crate::test_utils::create_test_query_engine;
#[test]
fn test_merge_dirty_time_windows() {
@@ -432,4 +452,59 @@ mod test {
assert_eq!(expected_filter_expr, to_sql.as_deref());
}
}
#[tokio::test]
async fn test_align_time_window() {
type TimeWindow = (Timestamp, Option<Timestamp>);
struct TestCase {
sql: String,
aligns: Vec<(TimeWindow, TimeWindow)>,
}
let testcases: Vec<TestCase> = vec![TestCase{
sql: "SELECT date_bin(INTERVAL '5 second', ts) AS time_window FROM numbers_with_ts GROUP BY time_window;".to_string(),
aligns: vec![
((Timestamp::new_second(3), None), (Timestamp::new_second(0), None)),
((Timestamp::new_second(8), None), (Timestamp::new_second(5), None)),
((Timestamp::new_second(8), Some(Timestamp::new_second(10))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
((Timestamp::new_second(8), Some(Timestamp::new_second(9))), (Timestamp::new_second(5), Some(Timestamp::new_second(10)))),
],
}];
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
for TestCase { sql, aligns } in testcases {
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), &sql, true)
.await
.unwrap();
let (column_name, time_window_expr, _, df_schema) = find_time_window_expr(
&plan,
query_engine.engine_state().catalog_manager().clone(),
ctx.clone(),
)
.await
.unwrap();
let time_window_expr = time_window_expr
.map(|expr| {
TimeWindowExpr::from_expr(
&expr,
&column_name,
&df_schema,
&query_engine.engine_state().session_state(),
)
})
.transpose()
.unwrap()
.unwrap();
let dirty = DirtyTimeWindows::default();
for (before_align, expected_after_align) in aligns {
let after_align = dirty
.align_time_window(before_align.0, before_align.1, &time_window_expr)
.unwrap();
assert_eq!(expected_after_align, after_align);
}
}
}
}

View File

@@ -267,7 +267,7 @@ fn columnar_to_ts_vector(columnar: &ColumnarValue) -> Result<Vec<Option<Timestam
Ok(val)
}
/// Return (the column name of time index column, the time window expr, the expected time unit of time index column, the expr's schema for evaluating the time window)
/// Return (`the column name of time index column`, `the time window expr`, `the expected time unit of time index column`, `the expr's schema for evaluating the time window`)
///
/// The time window expr is expected to have one input column with Timestamp type, and also return Timestamp type, the time window expr is expected
/// to be monotonic increasing and appears in the innermost GROUP BY clause