From 8ad77ce649101b8cf10773882595cd45c9e10b4b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 13 Apr 2026 12:11:56 -0700 Subject: [PATCH] perf: optimize extrapolated rate op family (#7880) * perf(promql): optimize extrapolated rate hot path * more ut Signed-off-by: Ruihang Xia * fix gauge rate case Signed-off-by: Ruihang Xia * adjust comments Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/functions/extrapolate_rate.rs | 540 +++++++++++++++---- src/promql/src/range_array.rs | 2 +- 2 files changed, 441 insertions(+), 101 deletions(-) diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 8c3ab88776..726218de0f 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -32,18 +32,19 @@ use std::fmt::Display; use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{ + DictionaryArray, Float64Array, Float64Builder, TimestampMillisecondArray, +}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::{DataFusionError, Result as DfResult}; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datafusion_expr::create_udf; use datatypes::arrow::array::{Array, Int64Array}; -use datatypes::arrow::datatypes::DataType; +use datatypes::arrow::datatypes::{DataType, Int64Type}; -use crate::extension_plan::Millisecond; use crate::functions::extract_array; -use crate::range_array::RangeArray; +use crate::range_array::{RangeArray, unpack}; pub type Delta = ExtrapolatedRate; pub type Rate = ExtrapolatedRate; @@ -63,6 +64,17 @@ impl ExtrapolatedRate &'static str { + match (IS_COUNTER, IS_RATE) { + (true, true) => "prom_rate", + (true, false) => "prom_increase", + (false, false) => "prom_delta", + (false, true) => { + unreachable!("gauge rate is not supported by ExtrapolatedRate") + } + } + } + fn scalar_udf_with_name(name: &str) -> ScalarUDF { let input_types = vec![ // timestamp range vector @@ -92,11 +104,23 @@ impl ExtrapolatedRate() - .unwrap() - .value(0); + .ok_or_else(|| { + DataFusionError::Execution(format!( + "{}: expect Int64 as range length type, found {}", + Self::func_name(), + range_length_array.data_type() + )) + })?; + if range_length_array.is_empty() || range_length_array.is_null(0) { + return Err(DataFusionError::Execution(format!( + "{}: range length must contain a non-null Int64 value", + Self::func_name() + ))); + } + let range_length = range_length_array.value(0); Ok(Self::new(range_length)) } @@ -105,131 +129,209 @@ impl ExtrapolatedRate DfResult { - assert_eq!(input.len(), 4); + if input.len() != 4 { + return Err(DataFusionError::Plan( + "ExtrapolatedRate function should have 4 inputs".to_string(), + )); + } - // construct matrix from input - let ts_array = extract_array(&input[0])?; - let ts_range = RangeArray::try_new(ts_array.to_data().into())?; - let value_array = extract_array(&input[1])?; - let value_range = RangeArray::try_new(value_array.to_data().into())?; - let ts = extract_array(&input[2])?; - let ts = ts - .as_any() - .downcast_ref::() - .unwrap(); + let ts_dict = extract_range_dict( + &input[0], + Self::func_name(), + "timestamp range vector", + &DataType::Timestamp(TimeUnit::Millisecond, None), + )?; + let value_dict = extract_range_dict( + &input[1], + Self::func_name(), + "value range vector", + &DataType::Float64, + )?; + let eval_ts_array = extract_eval_timestamps(&input[2], Self::func_name())?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let keys = ts_dict.keys().values(); + let num_windows = keys.len(); + if value_dict.keys().len() != num_windows { + return Err(DataFusionError::Execution(format!( + "{}: timestamp and value ranges should have the same number of windows, found {} and {}", + Self::func_name(), + num_windows, + value_dict.keys().len() + ))); + } + if value_dict.keys().values() != keys { + return Err(DataFusionError::Execution(format!( + "{}: timestamp and value ranges should have the same window layout", + Self::func_name() + ))); + } + if eval_ts_array.len() != num_windows { + return Err(DataFusionError::Execution(format!( + "{}: evaluation timestamp vector should have the same number of rows as range inputs, found {} and {}", + Self::func_name(), + eval_ts_array.len(), + num_windows + ))); + } - let all_timestamps = ts_range + let all_timestamps = ts_dict .values() .as_any() .downcast_ref::() - .unwrap() + .expect("validated by extract_range_dict") .values(); - let all_values = value_range + let all_values = value_dict .values() .as_any() .downcast_ref::() - .unwrap() + .expect("validated by extract_range_dict") .values(); - for index in 0..ts_range.len() { - // Safety: we are inside `ts_range`'s iterator which guarantees the index is valid. - let (offset, length) = ts_range.get_offset_length(index).unwrap(); + let eval_ts = eval_ts_array.values(); - let timestamps = &all_timestamps[offset..offset + length]; - let end_ts = ts.value(index); - let values = &all_values[offset..offset + length]; + let mut result_builder = Float64Builder::with_capacity(num_windows); + let range_length = self.range_length; + let range_length_secs = range_length as f64 / 1000.0; - if values.len() < 2 { - result_array.push(None); + let mut counter_correction = 0.0; + let mut prev_offset = usize::MAX; + let mut prev_length = 0usize; + + for index in 0..num_windows { + let (raw_offset, raw_length) = unpack(keys[index]); + let offset = raw_offset as usize; + let length = raw_length as usize; + + if length < 2 { + result_builder.append_null(); + prev_offset = usize::MAX; continue; } - // refer to functions.go L83-L110 - let mut result_value = values.last().unwrap() - values.first().unwrap(); - if IS_COUNTER { - for window in values.windows(2) { - let prev = window[0]; - let curr = window[1]; - if curr < prev { - result_value += prev + let end = offset + length; + let first_value = all_values[offset]; + let last_value = all_values[end - 1]; + + let result_value = if IS_COUNTER { + // Adjacent normalized windows usually slide forward by one sample. Reuse the + // previous window's accumulated reset correction and adjust only the dropped and + // newly added edges, falling back to a full scan when the layout changes. + if prev_offset != usize::MAX && offset == prev_offset + 1 && length == prev_length { + if all_values[prev_offset + 1] < all_values[prev_offset] { + counter_correction -= all_values[prev_offset]; } + if all_values[end - 1] < all_values[end - 2] { + counter_correction += all_values[end - 2]; + } + } else { + counter_correction = 0.0; + for pair in all_values[offset..end].windows(2) { + if pair[1] < pair[0] { + counter_correction += pair[0]; + } + } + } + last_value - first_value + counter_correction + } else { + last_value - first_value + }; + + prev_offset = offset; + prev_length = length; + + let first_ts = all_timestamps[offset]; + let last_ts = all_timestamps[end - 1]; + let range_end = eval_ts[index]; + let range_start = range_end - range_length; + let sampled_interval_ms = (last_ts - first_ts) as f64; + let average_interval_ms = sampled_interval_ms / (length - 1) as f64; + let mut duration_to_start_ms = (first_ts - range_start) as f64; + let duration_to_end_ms = (range_end - last_ts) as f64; + + // Counters cannot be negative, so Prometheus allows the extrapolation window to snap + // back to the inferred zero point instead of extending into negative values. + if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 { + let duration_to_zero = sampled_interval_ms * (first_value / result_value); + if duration_to_zero < duration_to_start_ms { + duration_to_start_ms = duration_to_zero; } } - let mut factor = Self::extrapolate_factor( - timestamps, - end_ts, - self.range_length, - *values.first().unwrap(), - result_value, - ); + let extrapolation_threshold = average_interval_ms * 1.1; + let mut extrapolated_interval_ms = sampled_interval_ms; + + // Mirror Prometheus extrapolation: extend to the real range boundary when a sample is + // close enough, otherwise add half an average sampling interval on that side. + if duration_to_start_ms < extrapolation_threshold { + extrapolated_interval_ms += duration_to_start_ms; + } else { + extrapolated_interval_ms += average_interval_ms / 2.0; + } + if duration_to_end_ms < extrapolation_threshold { + extrapolated_interval_ms += duration_to_end_ms; + } else { + extrapolated_interval_ms += average_interval_ms / 2.0; + } + + let mut factor = extrapolated_interval_ms / sampled_interval_ms; if IS_RATE { - // safety: range_length is checked to be non-zero in the planner. - factor /= self.range_length as f64 / 1000.0; + factor /= range_length_secs; } - result_array.push(Some(result_value * factor)); + result_builder.append_value(result_value * factor); } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } +} - fn extrapolate_factor( - timestamps: &[Millisecond], - range_end: Millisecond, - range_length: Millisecond, - // the following two parameters are for counters. - // see functions.go L121 - L127 - first_value: f64, - result_value: f64, - ) -> f64 { - // result_value - // refer to functions.go extrapolatedRate fn - // assume offset is processed (and it should be processed in normalize plan) - let range_start = range_end - range_length; - let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0; - let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0; - let sampled_interval = - (timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0; - let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64; +fn extract_range_dict( + columnar_value: &ColumnarValue, + func_name: &str, + arg_name: &str, + expected_value_type: &DataType, +) -> DfResult> { + let array = extract_array(columnar_value)?; + let dict = array + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Execution(format!( + "{func_name}: expect {arg_name} as DictionaryArray, found {}", + array.data_type() + )) + })? + .clone(); - // functions.go L122 - L134. quote: - // Counters cannot be negative. If we have any slope at - // all (i.e. resultValue went up), we can extrapolate - // the zero point of the counter. If the duration to the - // zero point is shorter than the durationToStart, we - // take the zero point as the start of the series, - // thereby avoiding extrapolation to negative counter - // values. - if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 { - let duration_to_zero = sampled_interval * (first_value / result_value); - if duration_to_zero < duration_to_start { - duration_to_start = duration_to_zero; - } - } - - let extrapolation_threshold = average_duration_between_samples * 1.1; - let mut extrapolate_to_interval = sampled_interval; - - if duration_to_start < extrapolation_threshold { - extrapolate_to_interval += duration_to_start; - } else { - extrapolate_to_interval += average_duration_between_samples / 2.0; - } - if duration_to_end < extrapolation_threshold { - extrapolate_to_interval += duration_to_end; - } else { - extrapolate_to_interval += average_duration_between_samples / 2.0; - } - - extrapolate_to_interval / sampled_interval + if &dict.value_type() != expected_value_type { + return Err(DataFusionError::Execution(format!( + "{func_name}: expect {arg_name} values of type {expected_value_type}, found {}", + dict.value_type() + ))); } + + RangeArray::try_new(dict.clone()).map_err(DataFusionError::from)?; + Ok(dict) +} + +fn extract_eval_timestamps( + columnar_value: &ColumnarValue, + func_name: &str, +) -> DfResult { + let array = extract_array(columnar_value)?; + let timestamps = array + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Execution(format!( + "{func_name}: expect evaluation timestamp vector as Timestamp(Millisecond), found {}", + array.data_type() + )) + })?; + Ok(timestamps.clone()) } // delta @@ -287,6 +389,7 @@ impl Display for ExtrapolatedRate { mod test { use datafusion::arrow::array::ArrayRef; + use datafusion_common::ScalarValue; use super::*; @@ -317,6 +420,80 @@ mod test { assert_eq!(output, expected); } + fn sample_range_inputs() -> (ColumnarValue, ColumnarValue, ColumnarValue) { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3].into_iter().map(Some), + )); + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0])); + let ranges = [(0, 2), (1, 2)]; + + let ts_range = RangeArray::from_ranges(ts_values, ranges).unwrap(); + let value_range = RangeArray::from_ranges(value_values, ranges).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3].into_iter().map(Some), + )) as _; + + ( + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ) + } + + #[test] + fn rate_rejects_wrong_input_arity() { + let err = ExtrapolatedRate::::new(5) + .calc(&[]) + .unwrap_err(); + + assert!(err.to_string().contains("should have 4 inputs")); + } + + #[test] + fn rate_rejects_non_int64_range_length() { + let (ts_range, value_range, eval_ts) = sample_range_inputs(); + + let err = ExtrapolatedRate::::create_function(&[ + ts_range, + value_range, + eval_ts, + ColumnarValue::Scalar(ScalarValue::Float64(Some(5.0))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("range length type")); + } + + #[test] + fn rate_rejects_empty_range_length() { + let (ts_range, value_range, eval_ts) = sample_range_inputs(); + + let err = ExtrapolatedRate::::create_function(&[ + ts_range, + value_range, + eval_ts, + ColumnarValue::Array(Arc::new(Int64Array::from(Vec::::new()))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("range length must contain")); + } + + #[test] + fn rate_rejects_null_range_length() { + let (ts_range, value_range, eval_ts) = sample_range_inputs(); + + let err = ExtrapolatedRate::::create_function(&[ + ts_range, + value_range, + eval_ts, + ColumnarValue::Array(Arc::new(Int64Array::from(vec![None]))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("range length must contain")); + } + #[test] fn increase_abnormal_input() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( @@ -449,6 +626,169 @@ mod test { ); } + #[test] + fn increase_counter_reset_wide_windows() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4, 5, 6, 7].into_iter().map(Some), + )); + let values_array = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 2.0])); + let ranges = [(0, 4), (1, 4), (2, 4), (3, 4)]; + let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range = RangeArray::from_ranges(values_array, ranges).unwrap(); + let timestamps = Arc::new(TimestampMillisecondArray::from_iter( + [4, 5, 6, 7].into_iter().map(Some), + )) as _; + extrapolated_rate_runner::( + ts_range, + value_range, + timestamps, + vec![4.0, 3.5, 3.5, 4.0], + ); + } + + #[test] + fn rate_rejects_non_array_timestamp_ranges() { + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0])); + let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)])); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ColumnarValue::Scalar(ScalarValue::Int64(Some(0))), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("timestamp range vector")); + } + + #[test] + fn rate_rejects_non_timestamp_timestamp_range_values() { + let ts_values = Arc::new(Int64Array::from_iter([1, 2])); + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0])); + let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap(); + let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)])); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("values of type Timestamp")); + } + + #[test] + fn rate_rejects_non_float_value_range_values() { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2].into_iter().map(Some), + )); + let value_values = Arc::new(Int64Array::from_iter([1, 2])); + let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap(); + let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)])); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!( + err.to_string() + .contains("value range vector values of type Float64") + ); + } + + #[test] + fn rate_rejects_mismatched_range_counts() { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3].into_iter().map(Some), + )); + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0])); + let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap(); + let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter( + [2, 3].into_iter().map(Some), + )); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("same number of windows")); + } + + #[test] + fn rate_rejects_mismatched_range_layouts() { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [1, 2, 3, 4].into_iter().map(Some), + )); + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 4.0])); + let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap(); + let value_range = RangeArray::from_ranges(value_values, [(0, 2), (2, 2)]).unwrap(); + let eval_ts = Arc::new(TimestampMillisecondArray::from_iter( + [2, 4].into_iter().map(Some), + )); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(value_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("same window layout")); + } + + #[test] + fn rate_rejects_non_timestamp_eval_vector() { + let (ts_range, value_range, _) = sample_range_inputs(); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ts_range, + value_range, + ColumnarValue::Array(Arc::new(Float64Array::from_iter([2.0, 3.0]))), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("evaluation timestamp vector")); + } + + #[test] + fn rate_rejects_mismatched_eval_timestamp_rows() { + let (ts_range, value_range, _) = sample_range_inputs(); + + let err = ExtrapolatedRate::::new(5) + .calc(&[ + ts_range, + value_range, + ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from_iter([Some(2)]))), + ColumnarValue::Scalar(ScalarValue::Int64(Some(5))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("same number of rows")); + } + #[test] fn rate_counter_reset() { let ts_array = Arc::new(TimestampMillisecondArray::from_iter( diff --git a/src/promql/src/range_array.rs b/src/promql/src/range_array.rs index 4af5217a76..dccc199181 100644 --- a/src/promql/src/range_array.rs +++ b/src/promql/src/range_array.rs @@ -248,7 +248,7 @@ fn pack(offset: u32, length: u32) -> i64 { bytemuck::cast::<[u32; 2], i64>([offset, length]) } -fn unpack(compound: i64) -> (u32, u32) { +pub(crate) fn unpack(compound: i64) -> (u32, u32) { let [offset, length] = bytemuck::cast::(compound); (offset, length) }