From 6235441577ddd46e3a3d5f451d5e63aa1e1771b1 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 20 Jul 2023 17:26:53 +0800 Subject: [PATCH] fix: avoid large vector allocation on large query span (#2006) * avoid collect all timestamp at the begining Signed-off-by: Ruihang Xia * simplify branch logic Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- .../src/extension_plan/instant_manipulate.rs | 69 ++++++++++--------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index b73a968d97..b09bfe7bf7 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -333,7 +333,7 @@ impl InstantManipulateStream { // refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571 // and the function `vectorSelectorSingle` pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult { - let mut take_indices = Vec::with_capacity(input.num_rows()); + let mut take_indices = vec![]; // TODO(ruihang): maybe the input is not timestamp millisecond array let ts_column = input .column(self.time_index) @@ -347,12 +347,12 @@ impl InstantManipulateStream { .and_then(|index| input.column(index).as_any().downcast_ref::()); let mut cursor = 0; - let aligned_ts = (self.start..=self.end) - .step_by(self.interval as usize) - .collect::>(); + + let aligned_ts_iter = (self.start..=self.end).step_by(self.interval as usize); + let mut aligned_ts = vec![]; // calculate the offsets to take - 'next: for expected_ts in aligned_ts.iter().copied() { + 'next: for expected_ts in aligned_ts_iter { // first, search toward end to see if there is matched timestamp while cursor < ts_column.len() { let curr = ts_column.value(cursor); @@ -360,9 +360,9 @@ impl InstantManipulateStream { Ordering::Equal => { if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() { // ignore the NaN value - take_indices.push(None); } else { - take_indices.push(Some(cursor as u64)); + take_indices.push(cursor as u64); + aligned_ts.push(expected_ts); } continue 'next; } @@ -373,37 +373,39 @@ impl InstantManipulateStream { } if cursor == ts_column.len() { cursor -= 1; + // short cut this loop + if ts_column.value(cursor) + self.lookback_delta < expected_ts { + break; + } } // then examine the value let curr_ts = ts_column.value(cursor); if curr_ts + self.lookback_delta < expected_ts { - take_indices.push(None); continue; } if curr_ts > expected_ts { // exceeds current expected timestamp, examine the previous value if let Some(prev_cursor) = cursor.checked_sub(1) { let prev_ts = ts_column.value(prev_cursor); - if prev_ts + self.lookback_delta < expected_ts { - // not found in lookback, leave this field blank. - take_indices.push(None); - } else if let Some(field_column) = &field_column && field_column.value(prev_cursor).is_nan() { - // if the newest value is NaN, it means the value is stale, so we should not use it - take_indices.push(None); - } else { + if prev_ts + self.lookback_delta >= expected_ts { + // only use the point in the time range + if let Some(field_column) = &field_column + && field_column.value(prev_cursor).is_nan() { + // if the newest value is NaN, it means the value is stale, so we should not use it + continue; + } // use this point - take_indices.push(Some(prev_cursor as u64)); + take_indices.push(prev_cursor as u64); + aligned_ts.push(expected_ts); } - } else { - take_indices.push(None); } } else if let Some(field_column) = &field_column && field_column.value(cursor).is_nan() { // if the newest value is NaN, it means the value is stale, so we should not use it - take_indices.push(None); } else { // use this point - take_indices.push(Some(cursor as u64)); + take_indices.push(cursor as u64); + aligned_ts.push(expected_ts); } } @@ -415,19 +417,10 @@ impl InstantManipulateStream { fn take_record_batch_optional( &self, record_batch: RecordBatch, - take_indices: Vec>, + take_indices: Vec, aligned_ts: Vec, ) -> DataFusionResult { - let aligned_ts = aligned_ts - .into_iter() - .zip(take_indices.iter()) - .filter_map(|(ts, i)| i.map(|_| ts)) - .collect::>(); - let take_indices = take_indices - .iter() - .filter(|i| i.is_some()) - .copied() - .collect::>(); + assert_eq!(take_indices.len(), aligned_ts.len()); let indices_array = UInt64Array::from(take_indices); let mut arrays = record_batch @@ -801,4 +794,18 @@ mod test { ); do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await; } + + #[tokio::test] + async fn ultra_large_range() { + let expected = String::from( + "+-------------------------+-------+\ + \n| timestamp | value |\ + \n+-------------------------+-------+\ + \n| 1970-01-01T00:00:00.001 | 0.0 |\ + \n| 1970-01-01T00:01:00.001 | 6.0 |\ + \n| 1970-01-01T00:02:00.001 | 12.0 |\ + \n+-------------------------+-------+", + ); + do_normalize_test(1, 900_000_000_000_000, 10_000, 10_000, expected, true).await; + } }