diff --git a/Cargo.lock b/Cargo.lock index 676eaf0822..0f3b58b373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10215,6 +10215,7 @@ dependencies = [ "common-macro", "common-recordbatch", "common-telemetry", + "criterion 0.7.0", "datafusion", "datafusion-common", "datafusion-expr", @@ -11595,9 +11596,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.10" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df33b2b81ac578cabaf06b89b0631153a3f416b0a886e8a7a1707fb51abbd1ef" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -13365,9 +13366,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tar" -version = "0.4.45" +version = "0.4.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22692a6476a21fa75fdfc11d452fda482af402c008cdbaf3476414e122040973" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" dependencies = [ "filetime", "libc", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index f93cb8beb9..306563d1ce 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -27,4 +27,12 @@ prost.workspace = true snafu.workspace = true [dev-dependencies] +criterion.workspace = true +datafusion-common.workspace = true +datafusion-expr.workspace = true +datatypes.workspace = true tokio.workspace = true + +[[bench]] +name = "bench_main" +harness = false diff --git a/src/promql/benches/bench_main.rs b/src/promql/benches/bench_main.rs new file mode 100644 index 0000000000..2d93887041 --- /dev/null +++ b/src/promql/benches/bench_main.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use criterion::criterion_main; + +mod bench_range_fn; + +criterion_main! { + bench_range_fn::benches +} diff --git a/src/promql/benches/bench_range_fn.rs b/src/promql/benches/bench_range_fn.rs new file mode 100644 index 0000000000..840956b942 --- /dev/null +++ b/src/promql/benches/bench_range_fn.rs @@ -0,0 +1,355 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Benchmarks for PromQL range functions. + +use std::sync::Arc; + +use criterion::{BenchmarkId, Criterion, criterion_group}; +use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::physical_plan::ColumnarValue; +use datafusion_common::ScalarValue; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::ScalarFunctionArgs; +use datatypes::arrow::datatypes::{DataType, Field}; +use promql::functions::{Delta, IDelta, Increase, PredictLinear, QuantileOverTime, Rate}; +use promql::range_array::RangeArray; + +fn build_sliding_ranges( + num_points: usize, + window_size: u32, + values: Vec, + eval_offset_ms: i64, +) -> (RangeArray, RangeArray, Arc) { + let step_ms = 1000i64; + let timestamps: Vec = (0..num_points as i64).map(|i| (i + 1) * step_ms).collect(); + + let ts_array = Arc::new(TimestampMillisecondArray::from(timestamps.clone())); + let val_array = Arc::new(Float64Array::from(values)); + + let num_windows = if num_points >= window_size as usize { + num_points - window_size as usize + 1 + } else { + 0 + }; + + let ranges: Vec<(u32, u32)> = (0..num_windows).map(|i| (i as u32, window_size)).collect(); + + let eval_ts: Vec = (0..num_windows) + .map(|i| timestamps[i + window_size as usize - 1] + eval_offset_ms) + .collect(); + let eval_ts_array = Arc::new(TimestampMillisecondArray::from(eval_ts)); + + let ts_range = RangeArray::from_ranges(ts_array, ranges.clone()).unwrap(); + let val_range = RangeArray::from_ranges(val_array, ranges).unwrap(); + + (ts_range, val_range, eval_ts_array) +} + +fn build_monotonic_counter_values(num_points: usize) -> Vec { + let mut current = 0.0; + (0..num_points) + .map(|i| { + current += 1.0 + (i % 7) as f64 * 0.25; + current + }) + .collect() +} + +fn build_resetting_counter_values(num_points: usize) -> Vec { + let mut current = 0.0; + (0..num_points) + .map(|i| { + if i > 0 && i % 37 == 0 { + current = 1.0; + } else { + current += 1.0 + (i % 5) as f64 * 0.5; + } + current + }) + .collect() +} + +fn build_gauge_values(num_points: usize) -> Vec { + (0..num_points) + .map(|i| ((i % 29) as f64 - 14.0) * 1.25 + (i % 3) as f64 * 0.1) + .collect() +} + +fn build_default_values(num_points: usize) -> Vec { + (0..num_points).map(|i| i as f64 * 1.5 + 0.1).collect() +} + +fn make_extrapolated_rate_input( + num_points: usize, + window_size: u32, + values: Vec, + eval_offset_ms: i64, +) -> Vec { + let (ts_range, val_range, eval_ts) = + build_sliding_ranges(num_points, window_size, values, eval_offset_ms); + let range_length = window_size as i64 * 1000; + vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(val_range.into_dict())), + ColumnarValue::Array(eval_ts), + ColumnarValue::Scalar(ScalarValue::Int64(Some(range_length))), + ] +} + +fn make_idelta_input(num_points: usize, window_size: u32) -> Vec { + let (ts_range, val_range, _) = + build_sliding_ranges(num_points, window_size, build_default_values(num_points), 0); + vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(val_range.into_dict())), + ] +} + +fn make_quantile_input(num_points: usize, window_size: u32) -> Vec { + let (ts_range, val_range, _) = + build_sliding_ranges(num_points, window_size, build_default_values(num_points), 0); + vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(val_range.into_dict())), + ColumnarValue::Scalar(ScalarValue::Float64(Some(0.9))), + ] +} + +fn make_predict_linear_input(num_points: usize, window_size: u32) -> Vec { + let (ts_range, val_range, _) = + build_sliding_ranges(num_points, window_size, build_default_values(num_points), 0); + vec![ + ColumnarValue::Array(Arc::new(ts_range.into_dict())), + ColumnarValue::Array(Arc::new(val_range.into_dict())), + // predict 60s into the future + ColumnarValue::Scalar(ScalarValue::Int64(Some(60))), + ] +} + +struct PreparedUdfCall { + args: Vec, + arg_fields: Vec>, + number_rows: usize, + return_field: Arc, + config_options: Arc, +} + +impl PreparedUdfCall { + fn new(args: Vec) -> Self { + let arg_fields = args + .iter() + .enumerate() + .map(|(i, c)| Arc::new(Field::new(format!("c{i}"), c.data_type(), true))) + .collect(); + let number_rows = args + .iter() + .find_map(|c| match c { + ColumnarValue::Array(a) => Some(a.len()), + _ => None, + }) + .unwrap_or(1); + Self { + args, + arg_fields, + number_rows, + return_field: Arc::new(Field::new("out", DataType::Float64, true)), + config_options: Arc::new(ConfigOptions::default()), + } + } +} + +fn invoke_prepared(udf: &datafusion::logical_expr::ScalarUDF, prepared: &PreparedUdfCall) { + udf.invoke_with_args(ScalarFunctionArgs { + args: prepared.args.clone(), + arg_fields: prepared.arg_fields.clone(), + number_rows: prepared.number_rows, + return_field: prepared.return_field.clone(), + config_options: prepared.config_options.clone(), + }) + .unwrap(); +} + +fn bench_range_functions(c: &mut Criterion) { + let mut group = c.benchmark_group("range_fn"); + + // Benchmark parameters: (total_points, window_size) + let params: &[(usize, u32)] = &[ + (1_000, 10), // small series, small window + (10_000, 10), // large series, small window + (10_000, 60), // large series, typical 1-min window at 1s step + (10_000, 360), // large series, wide 6-min window + ]; + + // --- rate (monotonic counter) --- + let rate_udf = Rate::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_extrapolated_rate_input( + n, + w, + build_monotonic_counter_values(n), + 500, + )); + group.bench_with_input( + BenchmarkId::new("rate_counter", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&rate_udf, &prepared)), + ); + } + + // --- rate (periodic resets) --- + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_extrapolated_rate_input( + n, + w, + build_resetting_counter_values(n), + 500, + )); + group.bench_with_input( + BenchmarkId::new("rate_counter_reset", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&rate_udf, &prepared)), + ); + } + + // --- increase (monotonic counter) --- + let increase_udf = Increase::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_extrapolated_rate_input( + n, + w, + build_monotonic_counter_values(n), + 500, + )); + group.bench_with_input( + BenchmarkId::new("increase_counter", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&increase_udf, &prepared)), + ); + } + + // --- increase (periodic resets) --- + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_extrapolated_rate_input( + n, + w, + build_resetting_counter_values(n), + 500, + )); + group.bench_with_input( + BenchmarkId::new("increase_counter_reset", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&increase_udf, &prepared)), + ); + } + + // --- delta (gauge) --- + let delta_udf = Delta::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_extrapolated_rate_input( + n, + w, + build_gauge_values(n), + 500, + )); + group.bench_with_input( + BenchmarkId::new("delta_gauge", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&delta_udf, &prepared)), + ); + } + + // --- idelta --- + let idelta_udf = IDelta::::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_idelta_input(n, w)); + group.bench_with_input( + BenchmarkId::new("idelta", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&idelta_udf, &prepared)), + ); + } + + // --- irate --- + let irate_udf = IDelta::::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_idelta_input(n, w)); + group.bench_with_input( + BenchmarkId::new("irate", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&irate_udf, &prepared)), + ); + } + + // --- quantile_over_time --- + let quantile_udf = QuantileOverTime::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_quantile_input(n, w)); + group.bench_with_input( + BenchmarkId::new("quantile_over_time", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&quantile_udf, &prepared)), + ); + } + + // --- predict_linear --- + let predict_udf = PredictLinear::scalar_udf(); + for &(n, w) in params { + let prepared = PreparedUdfCall::new(make_predict_linear_input(n, w)); + group.bench_with_input( + BenchmarkId::new("predict_linear", format!("n{n}_w{w}")), + &(n, w), + |b, _| b.iter(|| invoke_prepared(&predict_udf, &prepared)), + ); + } + + // --- RangeArray: get vs get_offset_length micro-benchmark --- + // Isolates the overhead of array slicing vs offset/length lookup + for &(n, w) in params { + let step_ms = 1000i64; + let timestamps: Vec = (0..n as i64).map(|i| (i + 1) * step_ms).collect(); + let ts_array = Arc::new(TimestampMillisecondArray::from(timestamps)); + let num_windows = n - w as usize + 1; + let ranges: Vec<(u32, u32)> = (0..num_windows).map(|i| (i as u32, w)).collect(); + let range_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); + + group.bench_with_input( + BenchmarkId::new("range_array_get", format!("n{n}_w{w}")), + &(), + |b, _| { + b.iter(|| { + for i in 0..range_array.len() { + std::hint::black_box(range_array.get(i)); + } + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("range_array_get_offset_length", format!("n{n}_w{w}")), + &(), + |b, _| { + b.iter(|| { + for i in 0..range_array.len() { + std::hint::black_box(range_array.get_offset_length(i)); + } + }) + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_range_functions); diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index e392d7dcf5..7c7452566a 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -31,9 +31,13 @@ pub use aggr_over_time::{ PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime, }; pub use changes::Changes; -use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{ + ArrayRef, DictionaryArray, Float64Array, TimestampMillisecondArray, +}; use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::Int64Type; pub use deriv::Deriv; pub use double_exponential_smoothing::DoubleExponentialSmoothing; pub use extrapolate_rate::{Delta, Increase, Rate}; @@ -44,6 +48,8 @@ pub use quantile_aggr::{QUANTILE_NAME, quantile_udaf}; pub use resets::Resets; pub use round::Round; +use crate::range_array::RangeArray; + /// Extracts an array from a `ColumnarValue`. /// /// If the `ColumnarValue` is a scalar, it converts it to an array of size 1. @@ -54,6 +60,24 @@ pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result Result { + let array = extract_array(columnar_value)?; + let dict = array + .as_any() + .downcast_ref::>() + .ok_or_else(|| { + DataFusionError::Execution(format!( + "expected DictionaryArray, found {}", + array.data_type() + )) + })? + .clone(); + RangeArray::try_new(dict).map_err(DataFusionError::from) +} + /// compensation(Kahan) summation algorithm - a technique for reducing the numerical error /// in floating-point arithmetic. The algorithm also includes the modification ("Neumaier improvement") /// that reduces the numerical error further in cases @@ -78,6 +102,29 @@ pub(crate) fn linear_regression( values: &Float64Array, intercept_time: i64, ) -> (Option, Option) { + linear_regression_slice(times.values(), values, 0, values.len(), intercept_time) +} + +pub(crate) fn linear_regression_slice( + times: &[i64], + values: &Float64Array, + offset: usize, + len: usize, + intercept_time: i64, +) -> (Option, Option) { + linear_regression_slices(times, offset, values, offset, len, intercept_time) +} + +pub(crate) fn linear_regression_slices( + times: &[i64], + time_offset: usize, + values: &Float64Array, + value_offset: usize, + len: usize, + intercept_time: i64, +) -> (Option, Option) { + let raw_values = values.values(); + let has_nulls = values.null_count() > 0; let mut count: f64 = 0.0; let mut sum_x: f64 = 0.0; let mut sum_y: f64 = 0.0; @@ -89,15 +136,18 @@ pub(crate) fn linear_regression( let mut comp_x2: f64 = 0.0; let mut const_y = true; - let init_y: f64 = values.value(0); + let mut init_y = None; - for (i, value) in values.iter().enumerate() { - let time = times.value(i) as f64; - if value.is_none() { + for i in 0..len { + let time_idx = time_offset + i; + let value_idx = value_offset + i; + if has_nulls && values.is_null(value_idx) { continue; } - let value = value.unwrap(); - if const_y && i > 0 && value != init_y { + let value = raw_values[value_idx]; + let time = times[time_idx] as f64; + let initial = init_y.get_or_insert(value); + if const_y && count > 0.0 && value != *initial { const_y = false; } count += 1.0; @@ -113,6 +163,7 @@ pub(crate) fn linear_regression( } if const_y { + let init_y = init_y.unwrap(); if !init_y.is_finite() { return (None, None); } @@ -135,7 +186,14 @@ pub(crate) fn linear_regression( #[cfg(test)] mod test { + use std::sync::Arc; + + use datafusion::physical_plan::ColumnarValue; + use datatypes::arrow::array::Int64Array; + use datatypes::arrow::datatypes::Int64Type; + use super::*; + use crate::range_array::RangeArray; #[test] fn calculate_linear_regression_none() { @@ -253,4 +311,26 @@ mod test { } assert_eq!(sum + c, 2.0) } + + #[test] + fn extract_range_array_rejects_external_dictionary_with_null_keys() { + let keys = Int64Array::from_iter([Some(0), None]); + let values = Arc::new(Float64Array::from_iter([1.0, 2.0])); + let dict = DictionaryArray::::try_new(keys, values).unwrap(); + + let err = extract_range_array(&ColumnarValue::Array(Arc::new(dict))).unwrap_err(); + assert!(err.to_string().contains("Empty range is not expected")); + } + + #[test] + fn extract_range_array_accepts_internal_packed_ranges() { + let values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0])); + let range_array = RangeArray::from_ranges(values, [(0, 2), (1, 2)]).unwrap(); + + let extracted = + extract_range_array(&ColumnarValue::Array(Arc::new(range_array.into_dict()))).unwrap(); + + assert_eq!(extracted.get_offset_length(0), Some((0, 2))); + assert_eq!(extracted.get_offset_length(1), Some((1, 2))); + } } diff --git a/src/promql/src/functions/idelta.rs b/src/promql/src/functions/idelta.rs index eeec9a4be9..0772b0bf1e 100644 --- a/src/promql/src/functions/idelta.rs +++ b/src/promql/src/functions/idelta.rs @@ -15,7 +15,7 @@ use std::fmt::Display; use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -94,49 +94,54 @@ impl IDelta { )), )?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let ts_values = ts_range.values(); + let ts_values = ts_values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + let value_values = value_range.values(); + let value_values = value_values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let timestamps = timestamps - .as_any() - .downcast_ref::() - .unwrap() - .values(); - - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let (ts_offset, len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + len == value_len, DataFusionError::Execution(format!( "{}: input arrays should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + len, + value_len )), )?; - - let len = timestamps.len(); if len < 2 { - result_array.push(None); + result_builder.append_null(); continue; } - // if is delta + let last_offset = ts_offset + len - 1; + let prev_offset = last_offset - 1; + let sampled_interval = + (ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0; + + let last_value_offset = value_offset + len - 1; + let prev_value_offset = last_value_offset - 1; + let last_value = value_values[last_value_offset]; + let prev_value = value_values[prev_value_offset]; + if !IS_RATE { - result_array.push(Some(values[len - 1] - values[len - 2])); + result_builder.append_value(last_value - prev_value); continue; } - // else is rate - let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0; - let last_value = values[len - 1]; - let prev_value = values[len - 2]; let result_value = if last_value < prev_value { // counter reset last_value @@ -144,10 +149,10 @@ impl IDelta { last_value - prev_value }; - result_array.push(Some(result_value / sampled_interval as f64)); + result_builder.append_value(result_value / sampled_interval); } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } diff --git a/src/promql/src/functions/predict_linear.rs b/src/promql/src/functions/predict_linear.rs index 09a46ed48f..dc49ec5d9f 100644 --- a/src/promql/src/functions/predict_linear.rs +++ b/src/promql/src/functions/predict_linear.rs @@ -17,7 +17,7 @@ use std::sync::Arc; -use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -28,7 +28,7 @@ use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; use crate::error; -use crate::functions::{extract_array, linear_regression}; +use crate::functions::{extract_range_array, linear_regression_slices}; use crate::range_array::RangeArray; pub struct PredictLinear; @@ -62,12 +62,10 @@ impl PredictLinear { DataFusionError::Plan("prom_predict_linear function should have 3 inputs".to_string()), )?; - let ts_array = extract_array(&input[0])?; - let value_array = extract_array(&input[1])?; let t_col = &input[2]; - let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?; - let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?; + let ts_range = extract_range_array(&input[0])?; + let value_range = extract_range_array(&input[1])?; error::ensure( ts_range.len() == value_range.len(), DataFusionError::Execution(format!( @@ -130,74 +128,85 @@ impl PredictLinear { Box::new(t_array.iter()) } }; - let mut result_array = Vec::with_capacity(ts_range.len()); + let all_timestamps = ts_range + .values() + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let all_values = value_range + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); for (index, t) in t_iter.enumerate() { - let (timestamps, values) = get_ts_values(&ts_range, &value_range, index, Self::name())?; - let ret = predict_linear_impl(×tamps, &values, t.unwrap()); - result_array.push(ret); + match predict_linear_impl( + &ts_range, + &value_range, + all_timestamps, + all_values, + index, + t.unwrap(), + Self::name(), + )? { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } -fn get_ts_values( +fn predict_linear_impl( ts_range: &RangeArray, value_range: &RangeArray, + all_timestamps: &[i64], + all_values: &Float64Array, index: usize, + t: i64, func_name: &str, -) -> Result<(TimestampMillisecondArray, Float64Array), DataFusionError> { - let timestamps = ts_range - .get(index) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .clone(); - let values = value_range - .get(index) - .unwrap() - .as_any() - .downcast_ref::() - .unwrap() - .clone(); +) -> Result, DataFusionError> { + let (ts_offset, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", - func_name, - timestamps.len(), - values.len() + func_name, ts_len, value_len )), )?; - Ok((timestamps, values)) -} - -fn predict_linear_impl( - timestamps: &TimestampMillisecondArray, - values: &Float64Array, - t: i64, -) -> Option { - if timestamps.len() < 2 { - return None; + if ts_len < 2 { + return Ok(None); } // last timestamp is evaluation timestamp - let evaluate_ts = timestamps.value(timestamps.len() - 1); - let (slope, intercept) = linear_regression(timestamps, values, evaluate_ts); + let evaluate_ts = all_timestamps[ts_offset + ts_len - 1]; + let (slope, intercept) = linear_regression_slices( + all_timestamps, + ts_offset, + all_values, + value_offset, + value_len, + evaluate_ts, + ); if slope.is_none() || intercept.is_none() { - return None; + return Ok(None); } - Some(slope.unwrap() * t as f64 + intercept.unwrap()) + Ok(Some(slope.unwrap() * t as f64 + intercept.unwrap())) } #[cfg(test)] mod test { use std::vec; + use datafusion::arrow::array::{DictionaryArray, Int64Array}; + use datatypes::arrow::datatypes::Int64Type; + use super::*; use crate::functions::test_util::simple_range_udf_runner; @@ -304,4 +313,44 @@ mod test { vec![Some(82765.9090909091)], ); } + + #[test] + fn calculate_predict_linear_with_misaligned_offsets() { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [0i64, 1000, 2000, 3000].into_iter().map(Some), + )); + let value_values = Arc::new(Float64Array::from_iter([10.0, 20.0, 30.0])); + let ts_array = RangeArray::from_ranges(ts_values, [(1, 3)]).unwrap(); + let value_array = RangeArray::from_ranges(value_values, [(0, 3)]).unwrap(); + + simple_range_udf_runner( + PredictLinear::scalar_udf(), + ts_array, + value_array, + vec![ScalarValue::Int64(Some(0))], + vec![Some(30.0)], + ); + } + + #[test] + fn predict_linear_rejects_external_dictionary_with_null_keys() { + let ts_values = Arc::new(TimestampMillisecondArray::from_iter( + [0i64, 1000].into_iter().map(Some), + )); + let ts_keys = Int64Array::from_iter([Some(0), None]); + let ts_dict = DictionaryArray::::try_new(ts_keys, ts_values).unwrap(); + + let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0])); + let value_keys = Int64Array::from_iter([Some(0), Some(1)]); + let value_dict = DictionaryArray::::try_new(value_keys, value_values).unwrap(); + + let err = PredictLinear::predict_linear(&[ + ColumnarValue::Array(Arc::new(ts_dict)), + ColumnarValue::Array(Arc::new(value_dict)), + ColumnarValue::Scalar(ScalarValue::Int64(Some(0))), + ]) + .unwrap_err(); + + assert!(err.to_string().contains("Empty range is not expected")); + } } diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs index f368d5908c..93fc632d68 100644 --- a/src/promql/src/functions/quantile.rs +++ b/src/promql/src/functions/quantile.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use datafusion::arrow::array::Float64Array; +use datafusion::arrow::array::{Float64Array, Float64Builder}; use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; @@ -93,8 +93,14 @@ impl QuantileOverTime { )), )?; - // calculation - let mut result_array = Vec::with_capacity(ts_range.len()); + let all_values = value_range + .values() + .as_any() + .downcast_ref::() + .unwrap() + .values(); + let mut result_builder = Float64Builder::with_capacity(ts_range.len()); + let mut scratch = Vec::new(); match quantile_col { ColumnarValue::Scalar(quantile_scalar) => { @@ -107,25 +113,26 @@ impl QuantileOverTime { }; for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let (_, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + ts_len, + value_len )), )?; - let result = quantile_impl(values, quantile); - result_array.push(result); + match quantile_with_scratch( + &all_values[value_offset..value_offset + value_len], + quantile, + &mut scratch, + ) { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } } ColumnarValue::Array(quantile_array) => { @@ -150,20 +157,15 @@ impl QuantileOverTime { )), )?; for index in 0..ts_range.len() { - let timestamps = ts_range.get(index).unwrap(); - let values = value_range.get(index).unwrap(); - let values = values - .as_any() - .downcast_ref::() - .unwrap() - .values(); + let (_, ts_len) = ts_range.get_offset_length(index).unwrap(); + let (value_offset, value_len) = value_range.get_offset_length(index).unwrap(); error::ensure( - timestamps.len() == values.len(), + ts_len == value_len, DataFusionError::Execution(format!( "{}: time and value arrays in a group should have the same length, found {} and {}", Self::name(), - timestamps.len(), - values.len() + ts_len, + value_len )), )?; let quantile = if quantile_array.is_null(index) { @@ -171,19 +173,32 @@ impl QuantileOverTime { } else { quantile_array.value(index) }; - let result = quantile_impl(values, quantile); - result_array.push(result); + match quantile_with_scratch( + &all_values[value_offset..value_offset + value_len], + quantile, + &mut scratch, + ) { + Some(value) => result_builder.append_value(value), + None => result_builder.append_null(), + } } } } - let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + let result = ColumnarValue::Array(Arc::new(result_builder.finish())); Ok(result) } } /// Refer to pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option { + let mut scratch = Vec::new(); + quantile_with_scratch(values, quantile, &mut scratch) +} + +/// Same as [quantile_impl] but reuses a caller-provided scratch buffer to avoid +/// per-call allocation. +fn quantile_with_scratch(values: &[f64], quantile: f64, scratch: &mut Vec) -> Option { if quantile.is_nan() || values.is_empty() { return Some(f64::NAN); } @@ -194,17 +209,18 @@ pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option { return Some(f64::INFINITY); } - let mut values = values.to_vec(); - values.sort_unstable_by(f64::total_cmp); + scratch.clear(); + scratch.extend_from_slice(values); + scratch.sort_unstable_by(f64::total_cmp); - let length = values.len(); + let length = scratch.len(); let rank = quantile * (length - 1) as f64; let lower_index = 0.max(rank.floor() as usize); let upper_index = (length - 1).min(lower_index + 1); let weight = rank - rank.floor(); - let result = values[lower_index] * (1.0 - weight) + values[upper_index] * weight; + let result = scratch[lower_index] * (1.0 - weight) + scratch[upper_index] * weight; Some(result) }