diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 464cba8735..1e1cac3555 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -478,12 +478,13 @@ impl Stream for RangeManipulateStream { type Item = DataFusionResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let timer = std::time::Instant::now(); let poll = loop { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { + let timer = std::time::Instant::now(); let result = self.manipulate(batch); if let Ok(None) = result { + self.metric.elapsed_compute().add_elapsed(timer); continue; } else { self.num_series.add(1); @@ -577,7 +578,10 @@ impl RangeManipulateStream { ) })?; - let mut ranges = vec![]; + let len = ts_column.len(); + if len == 0 { + return Ok((vec![], (self.start, self.end))); + } // shorten the range to calculate let first_ts = ts_column.value(0); @@ -589,17 +593,29 @@ impl RangeManipulateStream { if start > end { return Ok((vec![], (start, end))); } + let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize); // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered. let mut range_start_index = 0usize; + let mut last_range_start = 0; + let mut start_delta = 0; for curr_ts in (start..=end).step_by(self.interval as _) { + // determine range start + let start_ts = curr_ts - self.range; + + // advance cursor based on last range let mut range_start = ts_column.len(); let mut range_end = 0; - let mut cursor = range_start_index; + let mut cursor = range_start_index + start_delta; + // search back to keep the result correct + while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 { + cursor -= 1; + } + while cursor < ts_column.len() { let ts = ts_column.value(cursor); - if ts + self.range >= curr_ts { - range_start = range_start.min(cursor); + if range_start > cursor && ts >= start_ts { + range_start = cursor; range_start_index = range_start; } if ts <= curr_ts { @@ -612,8 +628,11 @@ impl RangeManipulateStream { } if range_start > range_end { ranges.push((0, 0)); + start_delta = 0; } else { ranges.push((range_start as _, (range_end + 1 - range_start) as _)); + start_delta = range_start - last_range_start; + last_range_start = range_start; } }