perf: optimize promql range functions (#7878)

* bench(promql): add range-function benchmark suite

* perf(promql): use flat buffers in range function hot loops

* perf(promql): reuse quantile scratch buffers
This commit is contained in:
Ruihang Xia
2026-03-28 07:36:13 +08:00
committed by GitHub
parent 6f2ec12059
commit fe45ae446c
8 changed files with 653 additions and 118 deletions

9
Cargo.lock generated
View File

@@ -10215,6 +10215,7 @@ dependencies = [
"common-macro",
"common-recordbatch",
"common-telemetry",
"criterion 0.7.0",
"datafusion",
"datafusion-common",
"datafusion-expr",
@@ -11595,9 +11596,9 @@ dependencies = [
[[package]]
name = "rustls-webpki"
version = "0.103.10"
version = "0.103.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef"
checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435"
dependencies = [
"ring",
"rustls-pki-types",
@@ -13365,9 +13366,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "tar"
version = "0.4.45"
version = "0.4.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22692a6476a21fa75fdfc11d452fda482af402c008cdbaf3476414e122040973"
checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a"
dependencies = [
"filetime",
"libc",

View File

@@ -27,4 +27,12 @@ prost.workspace = true
snafu.workspace = true
[dev-dependencies]
criterion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
tokio.workspace = true
[[bench]]
name = "bench_main"
harness = false

View File

@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use criterion::criterion_main;

mod bench_range_fn;

// Benchmark entry point: expands to `main()` and runs every group
// registered by the range-function suite.
criterion_main! {
    bench_range_fn::benches
}

View File

@@ -0,0 +1,355 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Benchmarks for PromQL range functions.
use std::sync::Arc;

use criterion::measurement::WallTime;
use criterion::{BenchmarkGroup, BenchmarkId, Criterion, criterion_group};
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_common::config::ConfigOptions;
use datafusion_expr::ScalarFunctionArgs;
use datatypes::arrow::datatypes::{DataType, Field};
use promql::functions::{Delta, IDelta, Increase, PredictLinear, QuantileOverTime, Rate};
use promql::range_array::RangeArray;
/// Builds sliding-window inputs over a synthetic series with a fixed 1s step.
///
/// Returns the timestamp [RangeArray], the value [RangeArray], and one
/// evaluation timestamp per window (the window's last sample time shifted by
/// `eval_offset_ms`). Produces zero windows when `num_points < window_size`.
fn build_sliding_ranges(
    num_points: usize,
    window_size: u32,
    values: Vec<f64>,
    eval_offset_ms: i64,
) -> (RangeArray, RangeArray, Arc<TimestampMillisecondArray>) {
    const STEP_MS: i64 = 1000;
    // Timestamps start at STEP_MS and advance by one step per sample.
    let timestamps: Vec<i64> = (1..=num_points as i64).map(|i| i * STEP_MS).collect();

    let window = window_size as usize;
    // One window per position where a full window of samples is available.
    let num_windows = (num_points + 1).saturating_sub(window);

    // Evaluation timestamp of each window: its last sample time plus the offset.
    let eval_ts: Vec<i64> = (0..num_windows)
        .map(|start| timestamps[start + window - 1] + eval_offset_ms)
        .collect();
    let eval_ts_array = Arc::new(TimestampMillisecondArray::from(eval_ts));

    // Each range is an (offset, length) pair into the flat backing arrays.
    let windows: Vec<(u32, u32)> = (0..num_windows)
        .map(|start| (start as u32, window_size))
        .collect();
    let ts_array = Arc::new(TimestampMillisecondArray::from(timestamps));
    let val_array = Arc::new(Float64Array::from(values));
    let ts_range = RangeArray::from_ranges(ts_array, windows.clone()).unwrap();
    let val_range = RangeArray::from_ranges(val_array, windows).unwrap();

    (ts_range, val_range, eval_ts_array)
}
/// Strictly increasing counter series: each step adds at least 1.0 plus a
/// small periodic extra (`(i % 7) * 0.25`) so consecutive deltas vary.
fn build_monotonic_counter_values(num_points: usize) -> Vec<f64> {
    let mut series = Vec::with_capacity(num_points);
    let mut running = 0.0_f64;
    for i in 0..num_points {
        running += 1.0 + (i % 7) as f64 * 0.25;
        series.push(running);
    }
    series
}
/// Counter series that mostly increases but drops back to 1.0 on every 37th
/// sample, emulating process restarts that reset a Prometheus counter.
fn build_resetting_counter_values(num_points: usize) -> Vec<f64> {
    let mut series = Vec::with_capacity(num_points);
    let mut running = 0.0_f64;
    for i in 0..num_points {
        if i > 0 && i % 37 == 0 {
            running = 1.0;
        } else {
            running += 1.0 + (i % 5) as f64 * 0.5;
        }
        series.push(running);
    }
    series
}
/// Oscillating gauge-like series: a period-29 sawtooth centered near zero
/// plus a small period-3 ripple.
fn build_gauge_values(num_points: usize) -> Vec<f64> {
    let mut series = Vec::with_capacity(num_points);
    for i in 0..num_points {
        let sawtooth = ((i % 29) as f64 - 14.0) * 1.25;
        let ripple = (i % 3) as f64 * 0.1;
        series.push(sawtooth + ripple);
    }
    series
}
/// Plain increasing sequence (`i * 1.5 + 0.1`) used when a benchmark does not
/// care about counter or gauge semantics.
fn build_default_values(num_points: usize) -> Vec<f64> {
    let mut series = Vec::with_capacity(num_points);
    for i in 0..num_points {
        series.push(i as f64 * 1.5 + 0.1);
    }
    series
}
/// Assembles the four arguments (ts ranges, value ranges, eval timestamps,
/// range length in ms) expected by the extrapolated-rate family of UDFs.
fn make_extrapolated_rate_input(
    num_points: usize,
    window_size: u32,
    values: Vec<f64>,
    eval_offset_ms: i64,
) -> Vec<ColumnarValue> {
    let (ts_range, val_range, eval_ts) =
        build_sliding_ranges(num_points, window_size, values, eval_offset_ms);
    // Range length matches the window span at the fixed 1s step.
    let range_length_ms = i64::from(window_size) * 1000;
    let mut args = Vec::with_capacity(4);
    args.push(ColumnarValue::Array(Arc::new(ts_range.into_dict())));
    args.push(ColumnarValue::Array(Arc::new(val_range.into_dict())));
    args.push(ColumnarValue::Array(eval_ts));
    args.push(ColumnarValue::Scalar(ScalarValue::Int64(Some(range_length_ms))));
    args
}
/// Builds the two-argument input (ts ranges, value ranges) for idelta/irate.
fn make_idelta_input(num_points: usize, window_size: u32) -> Vec<ColumnarValue> {
    let values = build_default_values(num_points);
    let (ts_range, val_range, _) = build_sliding_ranges(num_points, window_size, values, 0);
    vec![
        ColumnarValue::Array(Arc::new(ts_range.into_dict())),
        ColumnarValue::Array(Arc::new(val_range.into_dict())),
    ]
}
/// Builds (ts ranges, value ranges, quantile scalar) arguments for
/// `quantile_over_time`, with a fixed 0.9 quantile.
fn make_quantile_input(num_points: usize, window_size: u32) -> Vec<ColumnarValue> {
    let values = build_default_values(num_points);
    let (ts_range, val_range, _) = build_sliding_ranges(num_points, window_size, values, 0);
    vec![
        ColumnarValue::Array(Arc::new(ts_range.into_dict())),
        ColumnarValue::Array(Arc::new(val_range.into_dict())),
        ColumnarValue::Scalar(ScalarValue::Float64(Some(0.9))),
    ]
}
/// Builds (ts ranges, value ranges, horizon scalar) arguments for
/// `predict_linear`.
fn make_predict_linear_input(num_points: usize, window_size: u32) -> Vec<ColumnarValue> {
    let values = build_default_values(num_points);
    let (ts_range, val_range, _) = build_sliding_ranges(num_points, window_size, values, 0);
    vec![
        ColumnarValue::Array(Arc::new(ts_range.into_dict())),
        ColumnarValue::Array(Arc::new(val_range.into_dict())),
        // predict 60s into the future
        ColumnarValue::Scalar(ScalarValue::Int64(Some(60))),
    ]
}
/// Arguments and metadata for a `ScalarUDF::invoke_with_args` call, computed
/// once up front so the benchmark loop only clones cheap `Arc`s.
struct PreparedUdfCall {
    args: Vec<ColumnarValue>,
    arg_fields: Vec<Arc<Field>>,
    number_rows: usize,
    return_field: Arc<Field>,
    config_options: Arc<ConfigOptions>,
}

impl PreparedUdfCall {
    /// Derives argument fields and the row count from `args`.
    fn new(args: Vec<ColumnarValue>) -> Self {
        let mut arg_fields = Vec::with_capacity(args.len());
        for (i, arg) in args.iter().enumerate() {
            arg_fields.push(Arc::new(Field::new(format!("c{i}"), arg.data_type(), true)));
        }
        // Row count comes from the first array argument; a scalar-only call
        // counts as a single row.
        let mut number_rows = 1;
        for arg in &args {
            if let ColumnarValue::Array(array) = arg {
                number_rows = array.len();
                break;
            }
        }
        Self {
            args,
            arg_fields,
            number_rows,
            return_field: Arc::new(Field::new("out", DataType::Float64, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }
}
/// Invokes `udf` with the prepared arguments; only `Arc` clones and one
/// vector clone happen inside the timed path. Panics on UDF error, which is
/// acceptable in a benchmark.
fn invoke_prepared(udf: &datafusion::logical_expr::ScalarUDF, prepared: &PreparedUdfCall) {
    let call_args = ScalarFunctionArgs {
        args: prepared.args.clone(),
        arg_fields: prepared.arg_fields.clone(),
        number_rows: prepared.number_rows,
        return_field: prepared.return_field.clone(),
        config_options: prepared.config_options.clone(),
    };
    udf.invoke_with_args(call_args).unwrap();
}
fn bench_range_functions(c: &mut Criterion) {
let mut group = c.benchmark_group("range_fn");
// Benchmark parameters: (total_points, window_size)
let params: &[(usize, u32)] = &[
(1_000, 10), // small series, small window
(10_000, 10), // large series, small window
(10_000, 60), // large series, typical 1-min window at 1s step
(10_000, 360), // large series, wide 6-min window
];
// --- rate (monotonic counter) ---
let rate_udf = Rate::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_extrapolated_rate_input(
n,
w,
build_monotonic_counter_values(n),
500,
));
group.bench_with_input(
BenchmarkId::new("rate_counter", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&rate_udf, &prepared)),
);
}
// --- rate (periodic resets) ---
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_extrapolated_rate_input(
n,
w,
build_resetting_counter_values(n),
500,
));
group.bench_with_input(
BenchmarkId::new("rate_counter_reset", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&rate_udf, &prepared)),
);
}
// --- increase (monotonic counter) ---
let increase_udf = Increase::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_extrapolated_rate_input(
n,
w,
build_monotonic_counter_values(n),
500,
));
group.bench_with_input(
BenchmarkId::new("increase_counter", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&increase_udf, &prepared)),
);
}
// --- increase (periodic resets) ---
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_extrapolated_rate_input(
n,
w,
build_resetting_counter_values(n),
500,
));
group.bench_with_input(
BenchmarkId::new("increase_counter_reset", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&increase_udf, &prepared)),
);
}
// --- delta (gauge) ---
let delta_udf = Delta::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_extrapolated_rate_input(
n,
w,
build_gauge_values(n),
500,
));
group.bench_with_input(
BenchmarkId::new("delta_gauge", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&delta_udf, &prepared)),
);
}
// --- idelta ---
let idelta_udf = IDelta::<false>::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_idelta_input(n, w));
group.bench_with_input(
BenchmarkId::new("idelta", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&idelta_udf, &prepared)),
);
}
// --- irate ---
let irate_udf = IDelta::<true>::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_idelta_input(n, w));
group.bench_with_input(
BenchmarkId::new("irate", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&irate_udf, &prepared)),
);
}
// --- quantile_over_time ---
let quantile_udf = QuantileOverTime::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_quantile_input(n, w));
group.bench_with_input(
BenchmarkId::new("quantile_over_time", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&quantile_udf, &prepared)),
);
}
// --- predict_linear ---
let predict_udf = PredictLinear::scalar_udf();
for &(n, w) in params {
let prepared = PreparedUdfCall::new(make_predict_linear_input(n, w));
group.bench_with_input(
BenchmarkId::new("predict_linear", format!("n{n}_w{w}")),
&(n, w),
|b, _| b.iter(|| invoke_prepared(&predict_udf, &prepared)),
);
}
// --- RangeArray: get vs get_offset_length micro-benchmark ---
// Isolates the overhead of array slicing vs offset/length lookup
for &(n, w) in params {
let step_ms = 1000i64;
let timestamps: Vec<i64> = (0..n as i64).map(|i| (i + 1) * step_ms).collect();
let ts_array = Arc::new(TimestampMillisecondArray::from(timestamps));
let num_windows = n - w as usize + 1;
let ranges: Vec<(u32, u32)> = (0..num_windows).map(|i| (i as u32, w)).collect();
let range_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
group.bench_with_input(
BenchmarkId::new("range_array_get", format!("n{n}_w{w}")),
&(),
|b, _| {
b.iter(|| {
for i in 0..range_array.len() {
std::hint::black_box(range_array.get(i));
}
})
},
);
group.bench_with_input(
BenchmarkId::new("range_array_get_offset_length", format!("n{n}_w{w}")),
&(),
|b, _| {
b.iter(|| {
for i in 0..range_array.len() {
std::hint::black_box(range_array.get_offset_length(i));
}
})
},
);
}
group.finish();
}
criterion_group!(benches, bench_range_functions);

View File

@@ -31,9 +31,13 @@ pub use aggr_over_time::{
PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
};
pub use changes::Changes;
use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
use datafusion::arrow::array::{
ArrayRef, DictionaryArray, Float64Array, TimestampMillisecondArray,
};
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::Int64Type;
pub use deriv::Deriv;
pub use double_exponential_smoothing::DoubleExponentialSmoothing;
pub use extrapolate_rate::{Delta, Increase, Rate};
@@ -44,6 +48,8 @@ pub use quantile_aggr::{QUANTILE_NAME, quantile_udaf};
pub use resets::Resets;
pub use round::Round;
use crate::range_array::RangeArray;
/// Extracts an array from a `ColumnarValue`.
///
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
@@ -54,6 +60,24 @@ pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef,
}
}
/// Extracts a validated [RangeArray] from a [ColumnarValue].
///
/// The value must hold a `DictionaryArray<Int64>` (the physical encoding of a
/// [RangeArray]); anything else is an execution error, as is a dictionary
/// that fails [RangeArray] validation.
pub(crate) fn extract_range_array(
    columnar_value: &ColumnarValue,
) -> Result<RangeArray, DataFusionError> {
    let array = extract_array(columnar_value)?;
    match array.as_any().downcast_ref::<DictionaryArray<Int64Type>>() {
        Some(dict) => RangeArray::try_new(dict.clone()).map_err(DataFusionError::from),
        None => Err(DataFusionError::Execution(format!(
            "expected DictionaryArray<Int64>, found {}",
            array.data_type()
        ))),
    }
}
/// compensation(Kahan) summation algorithm - a technique for reducing the numerical error
/// in floating-point arithmetic. The algorithm also includes the modification ("Neumaier improvement")
/// that reduces the numerical error further in cases
@@ -78,6 +102,29 @@ pub(crate) fn linear_regression(
values: &Float64Array,
intercept_time: i64,
) -> (Option<f64>, Option<f64>) {
linear_regression_slice(times.values(), values, 0, values.len(), intercept_time)
}
/// Least-squares linear regression over the window `[offset, offset + len)`
/// applied to both `times` and `values`; the intercept is taken at
/// `intercept_time`. Thin wrapper around [linear_regression_slices] for the
/// common case where both arrays share the same offset.
pub(crate) fn linear_regression_slice(
    times: &[i64],
    values: &Float64Array,
    offset: usize,
    len: usize,
    intercept_time: i64,
) -> (Option<f64>, Option<f64>) {
    // Same offset for times and values: the arrays are aligned here.
    linear_regression_slices(times, offset, values, offset, len, intercept_time)
}
pub(crate) fn linear_regression_slices(
times: &[i64],
time_offset: usize,
values: &Float64Array,
value_offset: usize,
len: usize,
intercept_time: i64,
) -> (Option<f64>, Option<f64>) {
let raw_values = values.values();
let has_nulls = values.null_count() > 0;
let mut count: f64 = 0.0;
let mut sum_x: f64 = 0.0;
let mut sum_y: f64 = 0.0;
@@ -89,15 +136,18 @@ pub(crate) fn linear_regression(
let mut comp_x2: f64 = 0.0;
let mut const_y = true;
let init_y: f64 = values.value(0);
let mut init_y = None;
for (i, value) in values.iter().enumerate() {
let time = times.value(i) as f64;
if value.is_none() {
for i in 0..len {
let time_idx = time_offset + i;
let value_idx = value_offset + i;
if has_nulls && values.is_null(value_idx) {
continue;
}
let value = value.unwrap();
if const_y && i > 0 && value != init_y {
let value = raw_values[value_idx];
let time = times[time_idx] as f64;
let initial = init_y.get_or_insert(value);
if const_y && count > 0.0 && value != *initial {
const_y = false;
}
count += 1.0;
@@ -113,6 +163,7 @@ pub(crate) fn linear_regression(
}
if const_y {
let init_y = init_y.unwrap();
if !init_y.is_finite() {
return (None, None);
}
@@ -135,7 +186,14 @@ pub(crate) fn linear_regression(
#[cfg(test)]
mod test {
use std::sync::Arc;
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::Int64Array;
use datatypes::arrow::datatypes::Int64Type;
use super::*;
use crate::range_array::RangeArray;
#[test]
fn calculate_linear_regression_none() {
@@ -253,4 +311,26 @@ mod test {
}
assert_eq!(sum + c, 2.0)
}
#[test]
fn extract_range_array_rejects_external_dictionary_with_null_keys() {
    // A dictionary with a null key does not encode a valid (offset, length)
    // range, so RangeArray validation must reject it with an error.
    let keys = Int64Array::from_iter([Some(0), None]);
    let values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
    let dict = DictionaryArray::<Int64Type>::try_new(keys, values).unwrap();
    let err = extract_range_array(&ColumnarValue::Array(Arc::new(dict))).unwrap_err();
    assert!(err.to_string().contains("Empty range is not expected"));
}
#[test]
fn extract_range_array_accepts_internal_packed_ranges() {
    // Round-trip: ranges packed by RangeArray::from_ranges survive into_dict
    // and come back with identical (offset, length) pairs.
    let values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
    let range_array = RangeArray::from_ranges(values, [(0, 2), (1, 2)]).unwrap();
    let extracted =
        extract_range_array(&ColumnarValue::Array(Arc::new(range_array.into_dict()))).unwrap();
    assert_eq!(extracted.get_offset_length(0), Some((0, 2)));
    assert_eq!(extracted.get_offset_length(1), Some((1, 2)));
}
}

View File

@@ -15,7 +15,7 @@
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
@@ -94,49 +94,54 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
)),
)?;
// calculation
let mut result_array = Vec::with_capacity(ts_range.len());
let ts_values = ts_range.values();
let ts_values = ts_values
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values();
let value_values = value_range.values();
let value_values = value_values
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
for index in 0..ts_range.len() {
let timestamps = ts_range.get(index).unwrap();
let timestamps = timestamps
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values();
let values = value_range.get(index).unwrap();
let values = values
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
let (ts_offset, len) = ts_range.get_offset_length(index).unwrap();
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
error::ensure(
timestamps.len() == values.len(),
len == value_len,
DataFusionError::Execution(format!(
"{}: input arrays should have the same length, found {} and {}",
Self::name(),
timestamps.len(),
values.len()
len,
value_len
)),
)?;
let len = timestamps.len();
if len < 2 {
result_array.push(None);
result_builder.append_null();
continue;
}
// if is delta
let last_offset = ts_offset + len - 1;
let prev_offset = last_offset - 1;
let sampled_interval =
(ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0;
let last_value_offset = value_offset + len - 1;
let prev_value_offset = last_value_offset - 1;
let last_value = value_values[last_value_offset];
let prev_value = value_values[prev_value_offset];
if !IS_RATE {
result_array.push(Some(values[len - 1] - values[len - 2]));
result_builder.append_value(last_value - prev_value);
continue;
}
// else is rate
let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0;
let last_value = values[len - 1];
let prev_value = values[len - 2];
let result_value = if last_value < prev_value {
// counter reset
last_value
@@ -144,10 +149,10 @@ impl<const IS_RATE: bool> IDelta<IS_RATE> {
last_value - prev_value
};
result_array.push(Some(result_value / sampled_interval as f64));
result_builder.append_value(result_value / sampled_interval);
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
Ok(result)
}
}

View File

@@ -17,7 +17,7 @@
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
@@ -28,7 +28,7 @@ use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
use crate::error;
use crate::functions::{extract_array, linear_regression};
use crate::functions::{extract_range_array, linear_regression_slices};
use crate::range_array::RangeArray;
pub struct PredictLinear;
@@ -62,12 +62,10 @@ impl PredictLinear {
DataFusionError::Plan("prom_predict_linear function should have 3 inputs".to_string()),
)?;
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
let t_col = &input[2];
let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
let ts_range = extract_range_array(&input[0])?;
let value_range = extract_range_array(&input[1])?;
error::ensure(
ts_range.len() == value_range.len(),
DataFusionError::Execution(format!(
@@ -130,74 +128,85 @@ impl PredictLinear {
Box::new(t_array.iter())
}
};
let mut result_array = Vec::with_capacity(ts_range.len());
let all_timestamps = ts_range
.values()
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values();
let all_values = value_range
.values()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
for (index, t) in t_iter.enumerate() {
let (timestamps, values) = get_ts_values(&ts_range, &value_range, index, Self::name())?;
let ret = predict_linear_impl(&timestamps, &values, t.unwrap());
result_array.push(ret);
match predict_linear_impl(
&ts_range,
&value_range,
all_timestamps,
all_values,
index,
t.unwrap(),
Self::name(),
)? {
Some(value) => result_builder.append_value(value),
None => result_builder.append_null(),
}
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
Ok(result)
}
}
fn get_ts_values(
fn predict_linear_impl(
ts_range: &RangeArray,
value_range: &RangeArray,
all_timestamps: &[i64],
all_values: &Float64Array,
index: usize,
t: i64,
func_name: &str,
) -> Result<(TimestampMillisecondArray, Float64Array), DataFusionError> {
let timestamps = ts_range
.get(index)
.unwrap()
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.clone();
let values = value_range
.get(index)
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.clone();
) -> Result<Option<f64>, DataFusionError> {
let (ts_offset, ts_len) = ts_range.get_offset_length(index).unwrap();
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
error::ensure(
timestamps.len() == values.len(),
ts_len == value_len,
DataFusionError::Execution(format!(
"{}: time and value arrays in a group should have the same length, found {} and {}",
func_name,
timestamps.len(),
values.len()
func_name, ts_len, value_len
)),
)?;
Ok((timestamps, values))
}
fn predict_linear_impl(
timestamps: &TimestampMillisecondArray,
values: &Float64Array,
t: i64,
) -> Option<f64> {
if timestamps.len() < 2 {
return None;
if ts_len < 2 {
return Ok(None);
}
// last timestamp is evaluation timestamp
let evaluate_ts = timestamps.value(timestamps.len() - 1);
let (slope, intercept) = linear_regression(timestamps, values, evaluate_ts);
let evaluate_ts = all_timestamps[ts_offset + ts_len - 1];
let (slope, intercept) = linear_regression_slices(
all_timestamps,
ts_offset,
all_values,
value_offset,
value_len,
evaluate_ts,
);
if slope.is_none() || intercept.is_none() {
return None;
return Ok(None);
}
Some(slope.unwrap() * t as f64 + intercept.unwrap())
Ok(Some(slope.unwrap() * t as f64 + intercept.unwrap()))
}
#[cfg(test)]
mod test {
use std::vec;
use datafusion::arrow::array::{DictionaryArray, Int64Array};
use datatypes::arrow::datatypes::Int64Type;
use super::*;
use crate::functions::test_util::simple_range_udf_runner;
@@ -304,4 +313,44 @@ mod test {
vec![Some(82765.9090909091)],
);
}
#[test]
fn calculate_predict_linear_with_misaligned_offsets() {
    // Timestamp and value ranges start at different offsets into their
    // backing arrays; the slice-based regression must honor each offset
    // instead of assuming they are aligned.
    let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
        [0i64, 1000, 2000, 3000].into_iter().map(Some),
    ));
    let value_values = Arc::new(Float64Array::from_iter([10.0, 20.0, 30.0]));
    let ts_array = RangeArray::from_ranges(ts_values, [(1, 3)]).unwrap();
    let value_array = RangeArray::from_ranges(value_values, [(0, 3)]).unwrap();
    // With t = 0 the prediction is the fitted value at the last timestamp,
    // i.e. the intercept: 30.0.
    simple_range_udf_runner(
        PredictLinear::scalar_udf(),
        ts_array,
        value_array,
        vec![ScalarValue::Int64(Some(0))],
        vec![Some(30.0)],
    );
}
#[test]
fn predict_linear_rejects_external_dictionary_with_null_keys() {
    // A null dictionary key does not encode a valid range; the RangeArray
    // validation performed by extract_range_array must reject the input.
    let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
        [0i64, 1000].into_iter().map(Some),
    ));
    let ts_keys = Int64Array::from_iter([Some(0), None]);
    let ts_dict = DictionaryArray::<Int64Type>::try_new(ts_keys, ts_values).unwrap();
    let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
    let value_keys = Int64Array::from_iter([Some(0), Some(1)]);
    let value_dict = DictionaryArray::<Int64Type>::try_new(value_keys, value_values).unwrap();
    let err = PredictLinear::predict_linear(&[
        ColumnarValue::Array(Arc::new(ts_dict)),
        ColumnarValue::Array(Arc::new(value_dict)),
        ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
    ])
    .unwrap_err();
    assert!(err.to_string().contains("Empty range is not expected"));
}
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::arrow::array::{Float64Array, Float64Builder};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
@@ -93,8 +93,14 @@ impl QuantileOverTime {
)),
)?;
// calculation
let mut result_array = Vec::with_capacity(ts_range.len());
let all_values = value_range
.values()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
let mut result_builder = Float64Builder::with_capacity(ts_range.len());
let mut scratch = Vec::new();
match quantile_col {
ColumnarValue::Scalar(quantile_scalar) => {
@@ -107,25 +113,26 @@ impl QuantileOverTime {
};
for index in 0..ts_range.len() {
let timestamps = ts_range.get(index).unwrap();
let values = value_range.get(index).unwrap();
let values = values
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
let (_, ts_len) = ts_range.get_offset_length(index).unwrap();
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
error::ensure(
timestamps.len() == values.len(),
ts_len == value_len,
DataFusionError::Execution(format!(
"{}: time and value arrays in a group should have the same length, found {} and {}",
Self::name(),
timestamps.len(),
values.len()
ts_len,
value_len
)),
)?;
let result = quantile_impl(values, quantile);
result_array.push(result);
match quantile_with_scratch(
&all_values[value_offset..value_offset + value_len],
quantile,
&mut scratch,
) {
Some(value) => result_builder.append_value(value),
None => result_builder.append_null(),
}
}
}
ColumnarValue::Array(quantile_array) => {
@@ -150,20 +157,15 @@ impl QuantileOverTime {
)),
)?;
for index in 0..ts_range.len() {
let timestamps = ts_range.get(index).unwrap();
let values = value_range.get(index).unwrap();
let values = values
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
let (_, ts_len) = ts_range.get_offset_length(index).unwrap();
let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
error::ensure(
timestamps.len() == values.len(),
ts_len == value_len,
DataFusionError::Execution(format!(
"{}: time and value arrays in a group should have the same length, found {} and {}",
Self::name(),
timestamps.len(),
values.len()
ts_len,
value_len
)),
)?;
let quantile = if quantile_array.is_null(index) {
@@ -171,19 +173,32 @@ impl QuantileOverTime {
} else {
quantile_array.value(index)
};
let result = quantile_impl(values, quantile);
result_array.push(result);
match quantile_with_scratch(
&all_values[value_offset..value_offset + value_len],
quantile,
&mut scratch,
) {
Some(value) => result_builder.append_value(value),
None => result_builder.append_null(),
}
}
}
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
Ok(result)
}
}
/// Refer to <https://github.com/prometheus/prometheus/blob/6e2905a4d4ff9b47b1f6d201333f5bd53633f921/promql/quantile.go#L357-L386>
///
/// One-shot convenience wrapper that allocates a fresh scratch buffer per
/// call; hot loops should call [quantile_with_scratch] with a reused buffer.
pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option<f64> {
    quantile_with_scratch(values, quantile, &mut Vec::new())
}
/// Same as [quantile_impl] but reuses a caller-provided scratch buffer to avoid
/// per-call allocation.
fn quantile_with_scratch(values: &[f64], quantile: f64, scratch: &mut Vec<f64>) -> Option<f64> {
if quantile.is_nan() || values.is_empty() {
return Some(f64::NAN);
}
@@ -194,17 +209,18 @@ pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option<f64> {
return Some(f64::INFINITY);
}
let mut values = values.to_vec();
values.sort_unstable_by(f64::total_cmp);
scratch.clear();
scratch.extend_from_slice(values);
scratch.sort_unstable_by(f64::total_cmp);
let length = values.len();
let length = scratch.len();
let rank = quantile * (length - 1) as f64;
let lower_index = 0.max(rank.floor() as usize);
let upper_index = (length - 1).min(lower_index + 1);
let weight = rank - rank.floor();
let result = values[lower_index] * (1.0 - weight) + values[upper_index] * weight;
let result = scratch[lower_index] * (1.0 - weight) + scratch[upper_index] * weight;
Some(result)
}