From d088dc27981ec305d9e9e9c88deaf609fcf17d60 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 3 Mar 2025 18:23:02 +0800 Subject: [PATCH] refactor: even finer&limit time window num --- src/flow/src/recording_rules/engine.rs | 117 ++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 1 deletion(-) diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index 48c7b9fe82..37944c5b55 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -507,6 +507,10 @@ fn to_df_literal(value: Timestamp) -> Result Self::MAX_FILTER_NUM { + warn!( + "Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong", + self.windows.len(), + Self::MAX_FILTER_NUM + ); + } + + // get the first `MAX_FILTER_NUM` time windows + let nth = self + .windows + .iter() + .nth(Self::MAX_FILTER_NUM) + .map(|(key, _)| *key); + let first_nth = { + if let Some(nth) = nth { + let mut after = self.windows.split_off(&nth); + std::mem::swap(&mut self.windows, &mut after); + + after + } else { + std::mem::take(&mut self.windows) + } + }; + let mut expr_lst = vec![]; - for (start, end) in std::mem::take(&mut self.windows).into_iter() { + for (start, end) in first_nth.into_iter() { debug!( "Time window start: {:?}, end: {:?}", start.to_iso8601_string(), @@ -590,6 +620,7 @@ impl DirtyTimeWindows { let prev_upper = prev_tw .1 .unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?); + prev_tw.1 = Some(prev_upper); let cur_upper = upper_bound.unwrap_or( lower_bound @@ -655,3 +686,87 @@ enum ExecState { Idle, Executing, } + +#[cfg(test)] +mod test { + use pretty_assertions::assert_eq; + + use super::*; + + #[test] + fn test_merge_dirty_time_windows() { + let mut dirty = DirtyTimeWindows::default(); + dirty.add_lower_bounds( + vec![ + Timestamp::new_second(0), + Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + ] + .into_iter(), + ); + dirty + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .unwrap(); + // just enough to merge + assert_eq!( + dirty.windows, + BTreeMap::from([( + Timestamp::new_second(0), + Some(Timestamp::new_second( + (2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60 + )) + )]) + ); + + // separate time window + let mut dirty = DirtyTimeWindows::default(); + dirty.add_lower_bounds( + vec![ + Timestamp::new_second(0), + Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + ] + .into_iter(), + ); + dirty + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .unwrap(); + // just enough to merge + assert_eq!( + BTreeMap::from([ + ( + Timestamp::new_second(0), + Some(Timestamp::new_second(5 * 60)) + ), + ( + Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + Some(Timestamp::new_second( + (3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60 + )) + ) + ]), + dirty.windows + ); + + // overlapping + let mut dirty = DirtyTimeWindows::default(); + dirty.add_lower_bounds( + vec![ + Timestamp::new_second(0), + Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60), + ] + .into_iter(), + ); + dirty + .merge_dirty_time_windows(chrono::Duration::seconds(5 * 60)) + .unwrap(); + // just enough to merge + assert_eq!( + BTreeMap::from([( + Timestamp::new_second(0), + Some(Timestamp::new_second( + (1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60 + )) + ),]), + dirty.windows + ); + } +}