perf: optimize extrapolated rate op family (#7880)

* perf(promql): optimize extrapolated rate hot path

* more ut

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix gauge rate case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* adjust comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-04-13 12:11:56 -07:00
committed by GitHub
parent 32a2990802
commit 8ad77ce649
2 changed files with 441 additions and 101 deletions

View File

@@ -32,18 +32,19 @@
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::array::{
DictionaryArray, Float64Array, Float64Builder, TimestampMillisecondArray,
};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion_expr::create_udf;
use datatypes::arrow::array::{Array, Int64Array};
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::datatypes::{DataType, Int64Type};
use crate::extension_plan::Millisecond;
use crate::functions::extract_array;
use crate::range_array::RangeArray;
use crate::range_array::{RangeArray, unpack};
pub type Delta = ExtrapolatedRate<false, false>;
pub type Rate = ExtrapolatedRate<true, true>;
@@ -63,6 +64,17 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
Self { range_length }
}
fn func_name() -> &'static str {
match (IS_COUNTER, IS_RATE) {
(true, true) => "prom_rate",
(true, false) => "prom_increase",
(false, false) => "prom_delta",
(false, true) => {
unreachable!("gauge rate is not supported by ExtrapolatedRate")
}
}
}
fn scalar_udf_with_name(name: &str) -> ScalarUDF {
let input_types = vec![
// timestamp range vector
@@ -92,11 +104,23 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
}
let range_length_array = extract_array(&inputs[3])?;
let range_length = range_length_array
let range_length_array = range_length_array
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(0);
.ok_or_else(|| {
DataFusionError::Execution(format!(
"{}: expect Int64 as range length type, found {}",
Self::func_name(),
range_length_array.data_type()
))
})?;
if range_length_array.is_empty() || range_length_array.is_null(0) {
return Err(DataFusionError::Execution(format!(
"{}: range length must contain a non-null Int64 value",
Self::func_name()
)));
}
let range_length = range_length_array.value(0);
Ok(Self::new(range_length))
}
@@ -105,131 +129,209 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
/// * 0: timestamp range vector
/// * 1: value range vector
/// * 2: timestamp vector
/// * 3: range length. Range duration in millisecond. Not used here
/// * 3: range length. Range duration in milliseconds
fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
assert_eq!(input.len(), 4);
if input.len() != 4 {
return Err(DataFusionError::Plan(
"ExtrapolatedRate function should have 4 inputs".to_string(),
));
}
// construct matrix from input
let ts_array = extract_array(&input[0])?;
let ts_range = RangeArray::try_new(ts_array.to_data().into())?;
let value_array = extract_array(&input[1])?;
let value_range = RangeArray::try_new(value_array.to_data().into())?;
let ts = extract_array(&input[2])?;
let ts = ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let ts_dict = extract_range_dict(
&input[0],
Self::func_name(),
"timestamp range vector",
&DataType::Timestamp(TimeUnit::Millisecond, None),
)?;
let value_dict = extract_range_dict(
&input[1],
Self::func_name(),
"value range vector",
&DataType::Float64,
)?;
let eval_ts_array = extract_eval_timestamps(&input[2], Self::func_name())?;
// calculation
let mut result_array = Vec::with_capacity(ts_range.len());
let keys = ts_dict.keys().values();
let num_windows = keys.len();
if value_dict.keys().len() != num_windows {
return Err(DataFusionError::Execution(format!(
"{}: timestamp and value ranges should have the same number of windows, found {} and {}",
Self::func_name(),
num_windows,
value_dict.keys().len()
)));
}
if value_dict.keys().values() != keys {
return Err(DataFusionError::Execution(format!(
"{}: timestamp and value ranges should have the same window layout",
Self::func_name()
)));
}
if eval_ts_array.len() != num_windows {
return Err(DataFusionError::Execution(format!(
"{}: evaluation timestamp vector should have the same number of rows as range inputs, found {} and {}",
Self::func_name(),
eval_ts_array.len(),
num_windows
)));
}
let all_timestamps = ts_range
let all_timestamps = ts_dict
.values()
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.expect("validated by extract_range_dict")
.values();
let all_values = value_range
let all_values = value_dict
.values()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.expect("validated by extract_range_dict")
.values();
for index in 0..ts_range.len() {
// Safety: we are inside `ts_range`'s iterator which guarantees the index is valid.
let (offset, length) = ts_range.get_offset_length(index).unwrap();
let eval_ts = eval_ts_array.values();
let timestamps = &all_timestamps[offset..offset + length];
let end_ts = ts.value(index);
let values = &all_values[offset..offset + length];
let mut result_builder = Float64Builder::with_capacity(num_windows);
let range_length = self.range_length;
let range_length_secs = range_length as f64 / 1000.0;
if values.len() < 2 {
result_array.push(None);
let mut counter_correction = 0.0;
let mut prev_offset = usize::MAX;
let mut prev_length = 0usize;
for index in 0..num_windows {
let (raw_offset, raw_length) = unpack(keys[index]);
let offset = raw_offset as usize;
let length = raw_length as usize;
if length < 2 {
result_builder.append_null();
prev_offset = usize::MAX;
continue;
}
// refer to functions.go L83-L110
let mut result_value = values.last().unwrap() - values.first().unwrap();
if IS_COUNTER {
for window in values.windows(2) {
let prev = window[0];
let curr = window[1];
if curr < prev {
result_value += prev
let end = offset + length;
let first_value = all_values[offset];
let last_value = all_values[end - 1];
let result_value = if IS_COUNTER {
// Adjacent normalized windows usually slide forward by one sample. Reuse the
// previous window's accumulated reset correction and adjust only the dropped and
// newly added edges, falling back to a full scan when the layout changes.
if prev_offset != usize::MAX && offset == prev_offset + 1 && length == prev_length {
if all_values[prev_offset + 1] < all_values[prev_offset] {
counter_correction -= all_values[prev_offset];
}
if all_values[end - 1] < all_values[end - 2] {
counter_correction += all_values[end - 2];
}
} else {
counter_correction = 0.0;
for pair in all_values[offset..end].windows(2) {
if pair[1] < pair[0] {
counter_correction += pair[0];
}
}
}
last_value - first_value + counter_correction
} else {
last_value - first_value
};
prev_offset = offset;
prev_length = length;
let first_ts = all_timestamps[offset];
let last_ts = all_timestamps[end - 1];
let range_end = eval_ts[index];
let range_start = range_end - range_length;
let sampled_interval_ms = (last_ts - first_ts) as f64;
let average_interval_ms = sampled_interval_ms / (length - 1) as f64;
let mut duration_to_start_ms = (first_ts - range_start) as f64;
let duration_to_end_ms = (range_end - last_ts) as f64;
// Counters cannot be negative, so Prometheus allows the extrapolation window to snap
// back to the inferred zero point instead of extending into negative values.
if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
let duration_to_zero = sampled_interval_ms * (first_value / result_value);
if duration_to_zero < duration_to_start_ms {
duration_to_start_ms = duration_to_zero;
}
}
let mut factor = Self::extrapolate_factor(
timestamps,
end_ts,
self.range_length,
*values.first().unwrap(),
result_value,
);
let extrapolation_threshold = average_interval_ms * 1.1;
let mut extrapolated_interval_ms = sampled_interval_ms;
// Mirror Prometheus extrapolation: extend to the real range boundary when a sample is
// close enough, otherwise add half an average sampling interval on that side.
if duration_to_start_ms < extrapolation_threshold {
extrapolated_interval_ms += duration_to_start_ms;
} else {
extrapolated_interval_ms += average_interval_ms / 2.0;
}
if duration_to_end_ms < extrapolation_threshold {
extrapolated_interval_ms += duration_to_end_ms;
} else {
extrapolated_interval_ms += average_interval_ms / 2.0;
}
let mut factor = extrapolated_interval_ms / sampled_interval_ms;
if IS_RATE {
// safety: range_length is checked to be non-zero in the planner.
factor /= self.range_length as f64 / 1000.0;
factor /= range_length_secs;
}
result_array.push(Some(result_value * factor));
result_builder.append_value(result_value * factor);
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
Ok(result)
}
}
fn extrapolate_factor(
timestamps: &[Millisecond],
range_end: Millisecond,
range_length: Millisecond,
// the following two parameters are for counters.
// see functions.go L121 - L127
first_value: f64,
result_value: f64,
) -> f64 {
// result_value
// refer to functions.go extrapolatedRate fn
// assume offset is processed (and it should be processed in normalize plan)
let range_start = range_end - range_length;
let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0;
let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0;
let sampled_interval =
(timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0;
let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64;
fn extract_range_dict(
columnar_value: &ColumnarValue,
func_name: &str,
arg_name: &str,
expected_value_type: &DataType,
) -> DfResult<DictionaryArray<Int64Type>> {
let array = extract_array(columnar_value)?;
let dict = array
.as_any()
.downcast_ref::<DictionaryArray<Int64Type>>()
.ok_or_else(|| {
DataFusionError::Execution(format!(
"{func_name}: expect {arg_name} as DictionaryArray<Int64>, found {}",
array.data_type()
))
})?
.clone();
// functions.go L122 - L134. quote:
// Counters cannot be negative. If we have any slope at
// all (i.e. resultValue went up), we can extrapolate
// the zero point of the counter. If the duration to the
// zero point is shorter than the durationToStart, we
// take the zero point as the start of the series,
// thereby avoiding extrapolation to negative counter
// values.
if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
let duration_to_zero = sampled_interval * (first_value / result_value);
if duration_to_zero < duration_to_start {
duration_to_start = duration_to_zero;
}
}
let extrapolation_threshold = average_duration_between_samples * 1.1;
let mut extrapolate_to_interval = sampled_interval;
if duration_to_start < extrapolation_threshold {
extrapolate_to_interval += duration_to_start;
} else {
extrapolate_to_interval += average_duration_between_samples / 2.0;
}
if duration_to_end < extrapolation_threshold {
extrapolate_to_interval += duration_to_end;
} else {
extrapolate_to_interval += average_duration_between_samples / 2.0;
}
extrapolate_to_interval / sampled_interval
if &dict.value_type() != expected_value_type {
return Err(DataFusionError::Execution(format!(
"{func_name}: expect {arg_name} values of type {expected_value_type}, found {}",
dict.value_type()
)));
}
RangeArray::try_new(dict.clone()).map_err(DataFusionError::from)?;
Ok(dict)
}
fn extract_eval_timestamps(
columnar_value: &ColumnarValue,
func_name: &str,
) -> DfResult<TimestampMillisecondArray> {
let array = extract_array(columnar_value)?;
let timestamps = array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.ok_or_else(|| {
DataFusionError::Execution(format!(
"{func_name}: expect evaluation timestamp vector as Timestamp(Millisecond), found {}",
array.data_type()
))
})?;
Ok(timestamps.clone())
}
// delta
@@ -287,6 +389,7 @@ impl Display for ExtrapolatedRate<true, false> {
mod test {
use datafusion::arrow::array::ArrayRef;
use datafusion_common::ScalarValue;
use super::*;
@@ -317,6 +420,80 @@ mod test {
assert_eq!(output, expected);
}
fn sample_range_inputs() -> (ColumnarValue, ColumnarValue, ColumnarValue) {
let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3].into_iter().map(Some),
));
let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
let ranges = [(0, 2), (1, 2)];
let ts_range = RangeArray::from_ranges(ts_values, ranges).unwrap();
let value_range = RangeArray::from_ranges(value_values, ranges).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3].into_iter().map(Some),
)) as _;
(
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
)
}
#[test]
fn rate_rejects_wrong_input_arity() {
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[])
.unwrap_err();
assert!(err.to_string().contains("should have 4 inputs"));
}
#[test]
fn rate_rejects_non_int64_range_length() {
let (ts_range, value_range, eval_ts) = sample_range_inputs();
let err = ExtrapolatedRate::<true, true>::create_function(&[
ts_range,
value_range,
eval_ts,
ColumnarValue::Scalar(ScalarValue::Float64(Some(5.0))),
])
.unwrap_err();
assert!(err.to_string().contains("range length type"));
}
#[test]
fn rate_rejects_empty_range_length() {
let (ts_range, value_range, eval_ts) = sample_range_inputs();
let err = ExtrapolatedRate::<true, true>::create_function(&[
ts_range,
value_range,
eval_ts,
ColumnarValue::Array(Arc::new(Int64Array::from(Vec::<i64>::new()))),
])
.unwrap_err();
assert!(err.to_string().contains("range length must contain"));
}
#[test]
fn rate_rejects_null_range_length() {
let (ts_range, value_range, eval_ts) = sample_range_inputs();
let err = ExtrapolatedRate::<true, true>::create_function(&[
ts_range,
value_range,
eval_ts,
ColumnarValue::Array(Arc::new(Int64Array::from(vec![None]))),
])
.unwrap_err();
assert!(err.to_string().contains("range length must contain"));
}
#[test]
fn increase_abnormal_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
@@ -449,6 +626,169 @@ mod test {
);
}
#[test]
fn increase_counter_reset_wide_windows() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 2.0]));
let ranges = [(0, 4), (1, 4), (2, 4), (3, 4)];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[4, 5, 6, 7].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<true, false>(
ts_range,
value_range,
timestamps,
vec![4.0, 3.5, 3.5, 4.0],
);
}
#[test]
fn rate_rejects_non_array_timestamp_ranges() {
let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("timestamp range vector"));
}
#[test]
fn rate_rejects_non_timestamp_timestamp_range_values() {
let ts_values = Arc::new(Int64Array::from_iter([1, 2]));
let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("values of type Timestamp"));
}
#[test]
fn rate_rejects_non_float_value_range_values() {
let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2].into_iter().map(Some),
));
let value_values = Arc::new(Int64Array::from_iter([1, 2]));
let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(
err.to_string()
.contains("value range vector values of type Float64")
);
}
#[test]
fn rate_rejects_mismatched_range_counts() {
let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3].into_iter().map(Some),
));
let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3].into_iter().map(Some),
));
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("same number of windows"));
}
#[test]
fn rate_rejects_mismatched_range_layouts() {
let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4].into_iter().map(Some),
));
let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 4.0]));
let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
let value_range = RangeArray::from_ranges(value_values, [(0, 2), (2, 2)]).unwrap();
let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
[2, 4].into_iter().map(Some),
));
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(eval_ts),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("same window layout"));
}
#[test]
fn rate_rejects_non_timestamp_eval_vector() {
let (ts_range, value_range, _) = sample_range_inputs();
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ts_range,
value_range,
ColumnarValue::Array(Arc::new(Float64Array::from_iter([2.0, 3.0]))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("evaluation timestamp vector"));
}
#[test]
fn rate_rejects_mismatched_eval_timestamp_rows() {
let (ts_range, value_range, _) = sample_range_inputs();
let err = ExtrapolatedRate::<true, true>::new(5)
.calc(&[
ts_range,
value_range,
ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from_iter([Some(2)]))),
ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
])
.unwrap_err();
assert!(err.to_string().contains("same number of rows"));
}
#[test]
fn rate_counter_reset() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(

View File

@@ -248,7 +248,7 @@ fn pack(offset: u32, length: u32) -> i64 {
bytemuck::cast::<[u32; 2], i64>([offset, length])
}
fn unpack(compound: i64) -> (u32, u32) {
pub(crate) fn unpack(compound: i64) -> (u32, u32) {
let [offset, length] = bytemuck::cast::<i64, [u32; 2]>(compound);
(offset, length)
}