diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs index 0d2cc56fa4..2498f389a1 100644 --- a/src/flow/src/batching_mode/state.rs +++ b/src/flow/src/batching_mode/state.rs @@ -91,17 +91,14 @@ impl TaskState { time_window_size: &Option, max_timeout: Option, ) -> Instant { - let last_duration = max_timeout - .unwrap_or(self.last_query_duration) - .min(self.last_query_duration) - .max(MIN_REFRESH_DURATION); - - let next_duration = time_window_size - .map(|t| { - let half = t / 2; - half.max(last_duration) - }) - .unwrap_or(last_duration); + // = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout` + let lower = MIN_REFRESH_DURATION.max(time_window_size.unwrap_or_default()); + let next_duration = self.last_query_duration.max(lower); + let next_duration = if let Some(max_timeout) = max_timeout { + next_duration.min(max_timeout) + } else { + next_duration + }; METRIC_FLOW_BATCHING_ENGINE_WAIT_TIME .with_label_values(&[