fix: avoid large vector allocation on large query span (#2006)

* avoid collect all timestamp at the begining

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* simplify branch logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-20 17:26:53 +08:00
committed by GitHub
parent 172febb1af
commit 6235441577

View File

@@ -333,7 +333,7 @@ impl InstantManipulateStream {
// refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = Vec::with_capacity(input.num_rows());
let mut take_indices = vec![];
// TODO(ruihang): maybe the input is not timestamp millisecond array
let ts_column = input
.column(self.time_index)
@@ -347,12 +347,12 @@ impl InstantManipulateStream {
.and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
let mut cursor = 0;
let aligned_ts = (self.start..=self.end)
.step_by(self.interval as usize)
.collect::<Vec<_>>();
let aligned_ts_iter = (self.start..=self.end).step_by(self.interval as usize);
let mut aligned_ts = vec![];
// calculate the offsets to take
'next: for expected_ts in aligned_ts.iter().copied() {
'next: for expected_ts in aligned_ts_iter {
// first, search toward end to see if there is matched timestamp
while cursor < ts_column.len() {
let curr = ts_column.value(cursor);
@@ -360,9 +360,9 @@ impl InstantManipulateStream {
Ordering::Equal => {
if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// ignore the NaN value
take_indices.push(None);
} else {
take_indices.push(Some(cursor as u64));
take_indices.push(cursor as u64);
aligned_ts.push(expected_ts);
}
continue 'next;
}
@@ -373,37 +373,39 @@ impl InstantManipulateStream {
}
if cursor == ts_column.len() {
cursor -= 1;
// short cut this loop
if ts_column.value(cursor) + self.lookback_delta < expected_ts {
break;
}
}
// then examine the value
let curr_ts = ts_column.value(cursor);
if curr_ts + self.lookback_delta < expected_ts {
take_indices.push(None);
continue;
}
if curr_ts > expected_ts {
// exceeds current expected timestamp, examine the previous value
if let Some(prev_cursor) = cursor.checked_sub(1) {
let prev_ts = ts_column.value(prev_cursor);
if prev_ts + self.lookback_delta < expected_ts {
// not found in lookback, leave this field blank.
take_indices.push(None);
} else if let Some(field_column) = &field_column && field_column.value(prev_cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
} else {
if prev_ts + self.lookback_delta >= expected_ts {
// only use the point in the time range
if let Some(field_column) = &field_column
&& field_column.value(prev_cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
continue;
}
// use this point
take_indices.push(Some(prev_cursor as u64));
take_indices.push(prev_cursor as u64);
aligned_ts.push(expected_ts);
}
} else {
take_indices.push(None);
}
} else if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() {
// if the newest value is NaN, it means the value is stale, so we should not use it
take_indices.push(None);
} else {
// use this point
take_indices.push(Some(cursor as u64));
take_indices.push(cursor as u64);
aligned_ts.push(expected_ts);
}
}
@@ -415,19 +417,10 @@ impl InstantManipulateStream {
fn take_record_batch_optional(
&self,
record_batch: RecordBatch,
take_indices: Vec<Option<u64>>,
take_indices: Vec<u64>,
aligned_ts: Vec<Millisecond>,
) -> DataFusionResult<RecordBatch> {
let aligned_ts = aligned_ts
.into_iter()
.zip(take_indices.iter())
.filter_map(|(ts, i)| i.map(|_| ts))
.collect::<Vec<_>>();
let take_indices = take_indices
.iter()
.filter(|i| i.is_some())
.copied()
.collect::<Vec<_>>();
assert_eq!(take_indices.len(), aligned_ts.len());
let indices_array = UInt64Array::from(take_indices);
let mut arrays = record_batch
@@ -801,4 +794,18 @@ mod test {
);
do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
}
#[tokio::test]
async fn ultra_large_range() {
let expected = String::from(
"+-------------------------+-------+\
\n| timestamp | value |\
\n+-------------------------+-------+\
\n| 1970-01-01T00:00:00.001 | 0.0 |\
\n| 1970-01-01T00:01:00.001 | 6.0 |\
\n| 1970-01-01T00:02:00.001 | 12.0 |\
\n+-------------------------+-------+",
);
do_normalize_test(1, 900_000_000_000_000, 10_000, 10_000, expected, true).await;
}
}