diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index e4bc15e2c0..5be892a7e1 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -214,48 +214,60 @@ impl DirtyTimeWindows { // get the first `window_cnt` time windows let max_time_range = window_size * window_cnt as i32; - let nth = { - let mut cur_time_range = chrono::Duration::zero(); - let mut nth_key = None; - for (idx, (start, end)) in self.windows.iter().enumerate() { - // if time range is too long, stop - if cur_time_range > max_time_range { - nth_key = Some(*start); - break; - } - // if we have enough time windows, stop - if idx >= window_cnt { - nth_key = Some(*start); - break; - } + let mut to_be_query = BTreeMap::new(); + let mut new_windows = self.windows.clone(); + let mut cur_time_range = chrono::Duration::zero(); + for (idx, (start, end)) in self.windows.iter().enumerate() { + let first_end = start + .add_duration(window_size.to_std().unwrap()) + .context(TimeSnafu)?; + let end = end.unwrap_or(first_end); - if let Some(end) = end { - if let Some(x) = end.sub(start) { - cur_time_range += x; - } - } + // if time range is too long, stop + if cur_time_range >= max_time_range { + break; } - nth_key - }; - let first_nth = { - if let Some(nth) = nth { - let mut after = self.windows.split_off(&nth); - std::mem::swap(&mut self.windows, &mut after); - - after - } else { - std::mem::take(&mut self.windows) + // if we have enough time windows, stop + if idx >= window_cnt { + break; } - }; + + if let Some(x) = end.sub(start) { + if cur_time_range + x <= max_time_range { + to_be_query.insert(*start, Some(end)); + new_windows.remove(start); + cur_time_range += x; + } else { + // too large a window, split it + // split at window_size * times + let surplus = max_time_range - cur_time_range; + let times = surplus.num_seconds() / window_size.num_seconds(); + + let split_offset = window_size * times as i32; + let split_at = start + .add_duration(split_offset.to_std().unwrap()) + .context(TimeSnafu)?; + to_be_query.insert(*start, Some(split_at)); + + // remove the original window + new_windows.remove(start); + new_windows.insert(split_at, Some(end)); + cur_time_range += split_offset; + break; + } + } + } + + self.windows = new_windows; METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT .with_label_values(&[ flow_id.to_string().as_str(), format!("{}", window_size).as_str(), ]) - .observe(first_nth.len() as f64); + .observe(to_be_query.len() as f64); METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT .with_label_values(&[ @@ -264,7 +276,7 @@ impl DirtyTimeWindows { ]) .observe(self.windows.len() as f64); - let full_time_range = first_nth + let full_time_range = to_be_query .iter() .fold(chrono::Duration::zero(), |acc, (start, end)| { if let Some(end) = end { @@ -282,7 +294,7 @@ impl DirtyTimeWindows { .observe(full_time_range); let mut expr_lst = vec![]; - for (start, end) in first_nth.into_iter() { + for (start, end) in to_be_query.into_iter() { // align using time window exprs let (start, end) = if let Some(ctx) = task_ctx { let Some(time_window_expr) = &ctx.config.time_window_expr else { @@ -516,6 +528,64 @@ mod test { "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:21' AS TIMESTAMP)))", ) ), + // split range + ( + Vec::from_iter((0..20).map(|i|Timestamp::new_second(i*3)).chain(std::iter::once( + Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)), + ))), + (chrono::Duration::seconds(3), None), + BTreeMap::from([ + ( + Timestamp::new_second(0), + Some(Timestamp::new_second( + 60 + )), + ), + ( + Timestamp::new_second(60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1)), + Some(Timestamp::new_second( + 60 + 3 * (DirtyTimeWindows::MERGE_DIST as i64 + 1) + 3 + )), + )]), + Some( + "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))", + ) + ), + // split 2 min into 1 min + ( + Vec::from_iter((0..40).map(|i|Timestamp::new_second(i*3))), + (chrono::Duration::seconds(3), None), + BTreeMap::from([ + ( + Timestamp::new_second(0), + Some(Timestamp::new_second( + 40 * 3 + )), + )]), + Some( + "((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:00' AS TIMESTAMP)))", + ) + ), + // split 3s + 1min into 3s + 57s + ( + Vec::from_iter(std::iter::once(Timestamp::new_second(0)).chain((0..40).map(|i|Timestamp::new_second(20+i*3)))), + (chrono::Duration::seconds(3), None), + BTreeMap::from([ + ( + Timestamp::new_second(0), + Some(Timestamp::new_second( + 3 + )), + ),( + Timestamp::new_second(20), + Some(Timestamp::new_second( + 140 + )), + )]), + Some( + "(((ts >= CAST('1970-01-01 00:00:00' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:00:03' AS TIMESTAMP))) OR ((ts >= CAST('1970-01-01 00:00:20' AS TIMESTAMP)) AND (ts < CAST('1970-01-01 00:01:17' AS TIMESTAMP))))", + ) + ), // expired ( vec![ @@ -532,6 +602,8 @@ mod test { None ), ]; + // let len = testcases.len(); + // let testcases = testcases[(len - 2)..(len - 1)].to_vec(); for (lower_bounds, (window_size, expire_lower_bound), expected, expected_filter_expr) in testcases {