perf: faster range manipulate for promql (#5859)

* try 1

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* heuristically advance cursor

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* prevent underflow

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* postpone vec allocation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-09 18:08:41 +08:00
committed by GitHub
parent df362be012
commit dda7496265

View File

@@ -478,12 +478,13 @@ impl Stream for RangeManipulateStream {
type Item = DataFusionResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let timer = std::time::Instant::now();
let poll = loop {
match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
let timer = std::time::Instant::now();
let result = self.manipulate(batch);
if let Ok(None) = result {
self.metric.elapsed_compute().add_elapsed(timer);
continue;
} else {
self.num_series.add(1);
@@ -577,7 +578,10 @@ impl RangeManipulateStream {
)
})?;
let mut ranges = vec![];
let len = ts_column.len();
if len == 0 {
return Ok((vec![], (self.start, self.end)));
}
// shorten the range to calculate
let first_ts = ts_column.value(0);
@@ -589,17 +593,29 @@ impl RangeManipulateStream {
if start > end {
return Ok((vec![], (start, end)));
}
let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
// calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
let mut range_start_index = 0usize;
let mut last_range_start = 0;
let mut start_delta = 0;
for curr_ts in (start..=end).step_by(self.interval as _) {
// determine range start
let start_ts = curr_ts - self.range;
// advance cursor based on last range
let mut range_start = ts_column.len();
let mut range_end = 0;
let mut cursor = range_start_index;
let mut cursor = range_start_index + start_delta;
// search back to keep the result correct
while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
cursor -= 1;
}
while cursor < ts_column.len() {
let ts = ts_column.value(cursor);
if ts + self.range >= curr_ts {
range_start = range_start.min(cursor);
if range_start > cursor && ts >= start_ts {
range_start = cursor;
range_start_index = range_start;
}
if ts <= curr_ts {
@@ -612,8 +628,11 @@ impl RangeManipulateStream {
}
if range_start > range_end {
ranges.push((0, 0));
start_delta = 0;
} else {
ranges.push((range_start as _, (range_end + 1 - range_start) as _));
start_delta = range_start - last_range_start;
last_range_start = range_start;
}
}