diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index a11cff6081..aa89db6564 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -18,6 +18,7 @@ mod deriv; mod extrapolate_rate; mod holt_winters; mod idelta; +mod predict_linear; mod quantile; mod resets; #[cfg(test)] @@ -28,13 +29,14 @@ pub use aggr_over_time::{ PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime, }; pub use changes::Changes; -use datafusion::arrow::array::ArrayRef; +use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray}; use datafusion::error::DataFusionError; use datafusion::physical_plan::ColumnarValue; pub use deriv::Deriv; pub use extrapolate_rate::{Delta, Increase, Rate}; pub use holt_winters::HoltWinters; pub use idelta::IDelta; +pub use predict_linear::PredictLinear; pub use quantile::QuantileOverTime; pub use resets::Resets; @@ -63,3 +65,170 @@ pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, mut compensation: f64) -> } (new_sum, compensation) } + +/// linear_regression performs a least-square linear regression analysis on the +/// times and values. It return the slope and intercept based on times and values. +/// Prometheus's implementation: https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837 +pub(crate) fn linear_regression( + times: &TimestampMillisecondArray, + values: &Float64Array, + intercept_time: i64, +) -> (Option, Option) { + let mut count: f64 = 0.0; + let mut sum_x: f64 = 0.0; + let mut sum_y: f64 = 0.0; + let mut sum_xy: f64 = 0.0; + let mut sum_x2: f64 = 0.0; + let mut comp_x: f64 = 0.0; + let mut comp_y: f64 = 0.0; + let mut comp_xy: f64 = 0.0; + let mut comp_x2: f64 = 0.0; + + let mut const_y = true; + let init_y: f64 = values.value(0); + + for (i, value) in values.iter().enumerate() { + let time = times.value(i) as f64; + if value.is_none() { + continue; + } + let value = value.unwrap(); + if const_y && i > 0 && value != init_y { + const_y = false; + } + count += 1.0; + let x = time - intercept_time as f64 / 1e3; + (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x); + (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y); + (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy); + (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2); + } + + if count < 2.0 { + return (None, None); + } + + if const_y { + if !init_y.is_finite() { + return (None, None); + } + return (Some(0.0), Some(init_y)); + } + + sum_x += comp_x; + sum_y += comp_y; + sum_xy += comp_xy; + sum_x2 += comp_x2; + + let cov_xy = sum_xy - sum_x * sum_y / count; + let var_x = sum_x2 - sum_x * sum_x / count; + + let slope = cov_xy / var_x; + let intercept = sum_y / count - slope * sum_x / count; + + (Some(slope), Some(intercept)) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn calculate_linear_regression_none() { + let ts_array = TimestampMillisecondArray::from_iter( + [ + 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, + ] + .into_iter() + .map(Some), + ); + let values_array = Float64Array::from_iter([ + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + 1.0 / 0.0, + ]); + let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0)); + assert_eq!(slope, None); + assert_eq!(intercept, None); + } + + #[test] + fn calculate_linear_regression_value_is_const() { + let ts_array = TimestampMillisecondArray::from_iter( + [ + 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, + ] + .into_iter() + .map(Some), + ); + let values_array = + Float64Array::from_iter([10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]); + let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0)); + assert_eq!(slope, Some(0.0)); + assert_eq!(intercept, Some(10.0)); + } + + #[test] + fn calculate_linear_regression() { + let ts_array = TimestampMillisecondArray::from_iter( + [ + 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, + ] + .into_iter() + .map(Some), + ); + let values_array = Float64Array::from_iter([ + 0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0, + ]); + let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0)); + assert_eq!(slope, Some(0.010606060606060607)); + assert_eq!(intercept, Some(6.818181818181818)); + } + + #[test] + fn calculate_linear_regression_value_have_none() { + let ts_array = TimestampMillisecondArray::from_iter( + [ + 0i64, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000, + ] + .into_iter() + .map(Some), + ); + let values_array: Float64Array = [ + Some(0.0), + Some(10.0), + Some(20.0), + Some(30.0), + Some(40.0), + None, + Some(0.0), + Some(10.0), + Some(20.0), + Some(30.0), + None, + Some(40.0), + Some(50.0), + ] + .into_iter() + .collect(); + let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0)); + assert_eq!(slope, Some(0.010606060606060607)); + assert_eq!(intercept, Some(6.818181818181818)); + } + + #[test] + fn calculate_linear_regression_value_all_none() { + let ts_array = TimestampMillisecondArray::from_iter([0i64, 300, 600].into_iter().map(Some)); + let values_array: Float64Array = [None, None, None].into_iter().collect(); + let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0)); + assert_eq!(slope, None); + assert_eq!(intercept, None); + } +} diff --git a/src/promql/src/functions/deriv.rs b/src/promql/src/functions/deriv.rs index ae578aefe6..1f4b6597e4 100644 --- a/src/promql/src/functions/deriv.rs +++ b/src/promql/src/functions/deriv.rs @@ -26,7 +26,7 @@ use datafusion::physical_plan::ColumnarValue; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; -use crate::functions::{compensated_sum_inc, extract_array}; +use crate::functions::{extract_array, linear_regression}; use crate::range_array::RangeArray; #[range_fn(name = "Deriv", ret = "Float64Array", display_name = "prom_drive")] @@ -40,62 +40,6 @@ pub fn drive(times: &TimestampMillisecondArray, values: &Float64Array) -> Option } } -/// linear_regression performs a least-square linear regression analysis on the -/// times and values. It return the slope and intercept based on times and values. -/// Prometheus's implementation: https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837 -fn linear_regression( - times: &TimestampMillisecondArray, - values: &Float64Array, - intercept_time: i64, -) -> (Option, Option) { - let mut count: f64 = 0.0; - let mut sum_x: f64 = 0.0; - let mut sum_y: f64 = 0.0; - let mut sum_xy: f64 = 0.0; - let mut sum_x2: f64 = 0.0; - let mut comp_x: f64 = 0.0; - let mut comp_y: f64 = 0.0; - let mut comp_xy: f64 = 0.0; - let mut comp_x2: f64 = 0.0; - - let mut const_y = true; - let init_y: f64 = values.value(0); - - for (i, value) in values.iter().enumerate() { - let time = times.value(i) as f64; - let value = value.unwrap(); - if const_y && i > 0 && value != init_y { - const_y = false; - } - count += 1.0; - let x = time - intercept_time as f64 / 1e3; - (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x); - (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y); - (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy); - (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2); - } - - if const_y { - if init_y.is_finite() { - return (None, None); - } - return (Some(0.0), Some(init_y)); - } - - sum_x += comp_x; - sum_y += comp_y; - sum_xy += comp_xy; - sum_x2 += comp_x2; - - let cov_xy = sum_xy - sum_x * sum_y / count; - let var_x = sum_x2 - sum_x * sum_x / count; - - let slope = cov_xy / var_x; - let intercept = sum_y / count - slope * sum_x / count; - - (Some(slope), Some(intercept)) -} - #[cfg(test)] mod test { use super::*; diff --git a/src/promql/src/functions/predict_linear.rs b/src/promql/src/functions/predict_linear.rs new file mode 100644 index 0000000000..39be3225a5 --- /dev/null +++ b/src/promql/src/functions/predict_linear.rs @@ -0,0 +1,264 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Implementation of [`predict_linear`](https://prometheus.io/docs/prometheus/latest/querying/functions/#predict_linear) in PromQL. Refer to the [original +//! implementation](https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L859-L872). + +use std::sync::Arc; + +use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; +use datafusion::arrow::datatypes::TimeUnit; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::Array; +use datatypes::arrow::datatypes::DataType; + +use crate::error; +use crate::functions::{extract_array, linear_regression}; +use crate::range_array::RangeArray; + +pub struct PredictLinear { + t: i64, +} + +impl PredictLinear { + fn new(t: i64) -> Self { + Self { t } + } + + pub const fn name() -> &'static str { + "prom_predict_linear" + } + + pub fn scalar_udf(t: i64) -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(move |input| Self::new(t).calc(input)), + } + } + + // time index column and value column + fn input_type() -> Vec { + vec![ + RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), + RangeArray::convert_data_type(DataType::Float64), + ] + } + + fn return_type() -> DataType { + DataType::Float64 + } + + fn calc(&self, input: &[ColumnarValue]) -> Result { + // construct matrix from input. + assert_eq!(input.len(), 2); + let ts_array = extract_array(&input[0])?; + let value_array = extract_array(&input[1])?; + + let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?; + let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?; + error::ensure( + ts_range.len() == value_range.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + ts_range.len(), + value_range.len() + )), + )?; + error::ensure( + ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None), + DataFusionError::Execution(format!( + "{}: expect TimestampMillisecond as time index array's type, found {}", + Self::name(), + ts_range.value_type() + )), + )?; + error::ensure( + value_range.value_type() == DataType::Float64, + DataFusionError::Execution(format!( + "{}: expect Float64 as value array's type, found {}", + Self::name(), + value_range.value_type() + )), + )?; + + // calculation + let mut result_array = Vec::with_capacity(ts_range.len()); + + for index in 0..ts_range.len() { + let timestamps = ts_range + .get(index) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + let values = value_range + .get(index) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + error::ensure( + timestamps.len() == values.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + timestamps.len(), + values.len() + )), + )?; + + let ret = predict_linear_impl(×tamps, &values, self.t); + + result_array.push(ret); + } + + let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array))); + Ok(result) + } +} + +fn predict_linear_impl( + timestamps: &TimestampMillisecondArray, + values: &Float64Array, + t: i64, +) -> Option { + if timestamps.len() < 2 { + return None; + } + + let intercept_time = timestamps.value(0); + let (slope, intercept) = linear_regression(timestamps, values, intercept_time); + + if slope.is_none() || intercept.is_none() { + return None; + } + + Some(slope.unwrap() * t as f64 + intercept.unwrap()) +} + +#[cfg(test)] +mod test { + use std::vec; + + use super::*; + use crate::functions::test_util::simple_range_udf_runner; + + // build timestamp range and value range arrays for test + fn build_test_range_arrays() -> (RangeArray, RangeArray) { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [ + 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000, + ] + .into_iter() + .map(Some), + )); + let ranges = [(0, 11)]; + + let values_array = Arc::new(Float64Array::from_iter([ + 0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0, + ])); + + let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + + (ts_range_array, value_range_array) + } + + #[test] + fn calculate_predict_linear_none() { + let ts_array = Arc::new(TimestampMillisecondArray::from_iter( + [0i64].into_iter().map(Some), + )); + let ranges = [(0, 0), (0, 1)]; + let values_array = Arc::new(Float64Array::from_iter([0.0])); + let ts_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); + let value_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + simple_range_udf_runner( + PredictLinear::scalar_udf(0), + ts_array, + value_array, + vec![None, None], + ); + } + + #[test] + fn calculate_predict_linear_test1() { + let (ts_array, value_array) = build_test_range_arrays(); + simple_range_udf_runner( + PredictLinear::scalar_udf(0), + ts_array, + value_array, + // value at t = 0 + vec![Some(6.818181818181818)], + ); + } + + #[test] + fn calculate_predict_linear_test2() { + let (ts_array, value_array) = build_test_range_arrays(); + simple_range_udf_runner( + PredictLinear::scalar_udf(3000), + ts_array, + value_array, + // value at t = 3000 + vec![Some(38.63636363636364)], + ); + } + + #[test] + fn calculate_predict_linear_test3() { + let (ts_array, value_array) = build_test_range_arrays(); + simple_range_udf_runner( + PredictLinear::scalar_udf(4200), + ts_array, + value_array, + // value at t = 4200 + vec![Some(51.36363636363637)], + ); + } + + #[test] + fn calculate_predict_linear_test4() { + let (ts_array, value_array) = build_test_range_arrays(); + simple_range_udf_runner( + PredictLinear::scalar_udf(6600), + ts_array, + value_array, + // value at t = 6600 + vec![Some(76.81818181818181)], + ); + } + + #[test] + fn calculate_predict_linear_test5() { + let (ts_array, value_array) = build_test_range_arrays(); + simple_range_udf_runner( + PredictLinear::scalar_udf(7800), + ts_array, + value_array, + // value at t = 7800 + vec![Some(89.54545454545455)], + ); + } +} diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index b75fdad169..358bc54103 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -52,8 +52,8 @@ use crate::extension_plan::{ }; use crate::functions::{ AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, HoltWinters, IDelta, Increase, - LastOverTime, MaxOverTime, MinOverTime, PresentOverTime, QuantileOverTime, Rate, Resets, - StddevOverTime, StdvarOverTime, SumOverTime, + LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime, QuantileOverTime, Rate, + Resets, StddevOverTime, StdvarOverTime, SumOverTime, }; const LEFT_PLAN_JOIN_ALIAS: &str = "lhs"; @@ -796,6 +796,16 @@ impl PromPlanner { }; ScalarFunc::Udf(QuantileOverTime::scalar_udf(quantile_expr)) } + "predict_linear" => { + let t_expr = match other_input_exprs.get(0) { + Some(DfExpr::Literal(ScalarValue::Time64Microsecond(Some(t)))) => *t, + other => UnexpectedPlanExprSnafu { + desc: format!("expect i64 literal as t, but found {:?}", other), + } + .fail()?, + }; + ScalarFunc::Udf(PredictLinear::scalar_udf(t_expr)) + } "holt_winters" => { let sf_exp = match other_input_exprs.get(0) { Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => *sf,