refactor: even finer&limit time window num

This commit is contained in:
discord9
2025-03-03 18:23:02 +08:00
parent 5d14a1afc2
commit 0374c332d2

View File

@@ -507,6 +507,10 @@ fn to_df_literal(value: Timestamp) -> Result<datafusion_common::ScalarValue, Err
impl DirtyTimeWindows {
/// Time window merge distance
const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
/// # Arguments
@@ -532,8 +536,34 @@ impl DirtyTimeWindows {
window_size
);
self.merge_dirty_time_windows(window_size)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
warn!(
"Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong",
self.windows.len(),
Self::MAX_FILTER_NUM
);
}
// get the first `MAX_FILTER_NUM` time windows
let nth = self
.windows
.iter()
.nth(Self::MAX_FILTER_NUM)
.map(|(key, _)| *key);
let first_nth = {
if let Some(nth) = nth {
let mut after = self.windows.split_off(&nth);
std::mem::swap(&mut self.windows, &mut after);
after
} else {
std::mem::take(&mut self.windows)
}
};
let mut expr_lst = vec![];
for (start, end) in std::mem::take(&mut self.windows).into_iter() {
for (start, end) in first_nth.into_iter() {
debug!(
"Time window start: {:?}, end: {:?}",
start.to_iso8601_string(),
@@ -590,6 +620,7 @@ impl DirtyTimeWindows {
let prev_upper = prev_tw
.1
.unwrap_or(prev_tw.0.add_duration(std_window_size).context(TimeSnafu)?);
prev_tw.1 = Some(prev_upper);
let cur_upper = upper_bound.unwrap_or(
lower_bound
@@ -655,3 +686,87 @@ enum ExecState {
Idle,
Executing,
}
#[cfg(test)]
mod test {
use pretty_assertions::assert_eq;
use super::*;
#[test]
fn test_merge_dirty_time_windows() {
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.unwrap();
// just enough to merge
assert_eq!(
dirty.windows,
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)])
);
// separate time window
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.unwrap();
// just enough to merge
assert_eq!(
BTreeMap::from([
(
Timestamp::new_second(0),
Some(Timestamp::new_second(5 * 60))
),
(
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Some(Timestamp::new_second(
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
)
]),
dirty.windows
);
// overlapping
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.unwrap();
// just enough to merge
assert_eq!(
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60
))
),]),
dirty.windows
);
}
}