From e045a0dbdfee40fc2728044c9409dbdc0c6b86cb Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 10 Jul 2025 00:00:20 +0800 Subject: [PATCH] refactor: faster window expr Signed-off-by: discord9 --- src/flow/src/batching_mode/engine.rs | 6 +-- src/flow/src/batching_mode/time_window.rs | 49 +++++++++++++++++++++-- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs index 83e2f90b27..e6ff6c54b3 100644 --- a/src/flow/src/batching_mode/engine.rs +++ b/src/flow/src/batching_mode/engine.rs @@ -14,7 +14,7 @@ //! Batching mode engine -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::sync::Arc; use api::v1::flow::{DirtyWindowRequests, FlowResponse}; @@ -142,7 +142,7 @@ impl BatchingEngine { let handle: JoinHandle> = tokio::spawn(async move { let src_table_names = &task.config.source_table_names; - let mut all_dirty_windows = vec![]; + let mut all_dirty_windows = HashSet::new(); for src_table_name in src_table_names { if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) { let Some(expr) = &task.config.time_window_expr else { @@ -155,7 +155,7 @@ impl BatchingEngine { .context(UnexpectedSnafu { reason: "Failed to eval start value", })?; - all_dirty_windows.push(align_start); + all_dirty_windows.insert(align_start); } } } diff --git a/src/flow/src/batching_mode/time_window.rs b/src/flow/src/batching_mode/time_window.rs index 54ccf7a49d..795de2cd8c 100644 --- a/src/flow/src/batching_mode/time_window.rs +++ b/src/flow/src/batching_mode/time_window.rs @@ -50,7 +50,8 @@ use snafu::{ensure, OptionExt, ResultExt}; use crate::adapter::util::from_proto_to_data_type; use crate::error::{ - ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu, + ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu, + UnexpectedSnafu, }; use crate::expr::error::DataTypeSnafu; use crate::Error; @@ -74,6 +75,7 @@ pub struct TimeWindowExpr { logical_expr: Expr, df_schema: DFSchema, eval_time_window_size: Option, + eval_time_original: Option, } impl std::fmt::Display for TimeWindowExpr { @@ -106,10 +108,11 @@ impl TimeWindowExpr { logical_expr: expr.clone(), df_schema: df_schema.clone(), eval_time_window_size: None, + eval_time_original: None, }; let test_ts = DEFAULT_TEST_TIMESTAMP; - let (l, u) = zelf.eval(test_ts)?; - let time_window_size = match (l, u) { + let (lower, upper) = zelf.eval(test_ts)?; + let time_window_size = match (lower, upper) { (Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| { UnexpectedSnafu { reason: format!( @@ -121,13 +124,53 @@ impl TimeWindowExpr { _ => None, }; zelf.eval_time_window_size = time_window_size; + zelf.eval_time_original = lower; + Ok(zelf) } + /// TODO(discord9): add `eval_batch` too pub fn eval( &self, current: Timestamp, ) -> Result<(Option, Option), Error> { + fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 { + if stride_ns == 0 { + return time_diff_ns; + } + // a - (a % n) impl ceil to nearest n * stride + let time_delta = time_diff_ns - (time_diff_ns % stride_ns); + + if time_diff_ns < 0 && time_delta != time_diff_ns { + // The origin is later than the source timestamp, round down to the previous bin + + time_delta - stride_ns + } else { + time_delta + } + } + + if let (Some(original), Some(window_size)) = + (self.eval_time_original, self.eval_time_window_size) + { + // date_bin align current to lower bound + let time_diff_ns = current.sub(&original).unwrap().num_nanoseconds().unwrap(); + + let window_size_ns = window_size.as_nanos() as i64; + + let distance_ns = compute_distance(time_diff_ns, window_size_ns); + + let lower_bound = if distance_ns >= 0 { + original.add_duration(std::time::Duration::from_nanos(distance_ns as u64)) + } else { + original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64)) + } + .context(TimeSnafu)?; + let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?; + + return Ok((Some(lower_bound), Some(upper_bound))); + } + let lower_bound = calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?; let upper_bound =