From b476584f56df1a3d18ace00835451755a2b3f6d7 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 24 Apr 2025 15:17:10 +0800 Subject: [PATCH] feat: remove hyper parameter from promql functions (#5955) * quantile udaf Signed-off-by: Ruihang Xia * extrapolate rate Signed-off-by: Ruihang Xia * predict_linear, round, holt_winters, quantile_overtime Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * fix quantile function Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/promql/src/functions.rs | 12 +- src/promql/src/functions/aggr_over_time.rs | 12 ++ src/promql/src/functions/changes.rs | 3 + src/promql/src/functions/deriv.rs | 2 + src/promql/src/functions/extrapolate_rate.rs | 51 ++++++-- src/promql/src/functions/holt_winters.rs | 48 +++++++- src/promql/src/functions/idelta.rs | 2 + src/promql/src/functions/predict_linear.rs | 41 +++++-- src/promql/src/functions/quantile.rs | 21 +++- src/promql/src/functions/quantile_aggr.rs | 80 +++++++++---- src/promql/src/functions/resets.rs | 3 + src/promql/src/functions/round.rs | 30 ++++- src/promql/src/functions/test_util.rs | 9 +- src/query/src/promql/planner.rs | 112 ++++++------------ .../standalone/common/promql/quantile.result | 56 ++++----- .../standalone/common/promql/round_fn.result | 80 ++++++------- .../common/promql/simple_histogram.result | 42 +++---- .../standalone/common/promql/subquery.result | 20 ++-- .../common/tql-explain-analyze/analyze.result | 4 +- 19 files changed, 383 insertions(+), 245 deletions(-) diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index dade00ea7b..81a9c9cedb 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -44,13 +44,13 @@ pub use quantile_aggr::quantile_udaf; pub use resets::Resets; pub use round::Round; +/// Extracts an array from a `ColumnarValue`. +/// +/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1. pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result { - if let ColumnarValue::Array(array) = columnar_value { - Ok(array.clone()) - } else { - Err(DataFusionError::Execution( - "expect array as input, found scalar value".to_string(), - )) + match columnar_value { + ColumnarValue::Array(array) => Ok(array.clone()), + ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?), } } diff --git a/src/promql/src/functions/aggr_over_time.rs b/src/promql/src/functions/aggr_over_time.rs index 298959ef35..841f28e0df 100644 --- a/src/promql/src/functions/aggr_over_time.rs +++ b/src/promql/src/functions/aggr_over_time.rs @@ -231,6 +231,7 @@ mod test { AvgOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(49.9999995), Some(45.8618844), @@ -253,6 +254,7 @@ mod test { MinOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(12.345678), Some(12.345678), @@ -275,6 +277,7 @@ mod test { MaxOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(87.654321), Some(87.654321), @@ -297,6 +300,7 @@ mod test { SumOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(99.999999), Some(229.309422), @@ -319,6 +323,7 @@ mod test { CountOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(2.0), Some(5.0), @@ -341,6 +346,7 @@ mod test { LastOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(87.654321), Some(70.710678), @@ -363,6 +369,7 @@ mod test { AbsentOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ None, None, @@ -385,6 +392,7 @@ mod test { PresentOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(1.0), Some(1.0), @@ -407,6 +415,7 @@ mod test { StdvarOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(1417.8479276253622), Some(808.999919713209), @@ -442,6 +451,7 @@ mod test { StdvarOverTime::scalar_udf(), RangeArray::from_ranges(ts_array, ranges).unwrap(), RangeArray::from_ranges(values_array, ranges).unwrap(), + vec![], vec![Some(0.0), Some(10.559999999999999)], ); } @@ -453,6 +463,7 @@ mod test { StddevOverTime::scalar_udf(), ts_array, value_array, + vec![], vec![ Some(37.6543215), Some(28.442923895289123), @@ -488,6 +499,7 @@ mod test { StddevOverTime::scalar_udf(), RangeArray::from_ranges(ts_array, ranges).unwrap(), RangeArray::from_ranges(values_array, ranges).unwrap(), + vec![], vec![Some(0.0), Some(3.249615361854384)], ); } diff --git a/src/promql/src/functions/changes.rs b/src/promql/src/functions/changes.rs index 743f941652..21819436e6 100644 --- a/src/promql/src/functions/changes.rs +++ b/src/promql/src/functions/changes.rs @@ -90,6 +90,7 @@ mod test { Changes::scalar_udf(), ts_array_1, value_array_1, + vec![], vec![Some(0.0), Some(3.0), Some(5.0), Some(8.0), None], ); @@ -101,6 +102,7 @@ mod test { Changes::scalar_udf(), ts_array_2, value_array_2, + vec![], vec![Some(0.0), Some(3.0), Some(5.0), Some(9.0), None], ); @@ -111,6 +113,7 @@ mod test { Changes::scalar_udf(), ts_array_3, value_array_3, + vec![], vec![Some(0.0), Some(0.0), Some(1.0), Some(1.0), None], ); } diff --git a/src/promql/src/functions/deriv.rs b/src/promql/src/functions/deriv.rs index 90b09f0d40..49e6718911 100644 --- a/src/promql/src/functions/deriv.rs +++ b/src/promql/src/functions/deriv.rs @@ -74,6 +74,7 @@ mod test { Deriv::scalar_udf(), ts_array, value_array, + vec![], vec![Some(10.606060606060607), None], ); } @@ -99,6 +100,7 @@ mod test { Deriv::scalar_udf(), ts_range_array, value_range_array, + vec![], vec![Some(0.0)], ); } diff --git a/src/promql/src/functions/extrapolate_rate.rs b/src/promql/src/functions/extrapolate_rate.rs index 8977eaf083..bdcd6a2b1b 100644 --- a/src/promql/src/functions/extrapolate_rate.rs +++ b/src/promql/src/functions/extrapolate_rate.rs @@ -34,11 +34,11 @@ use std::sync::Arc; use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray}; use datafusion::arrow::datatypes::TimeUnit; -use datafusion::common::DataFusionError; +use datafusion::common::{DataFusionError, Result as DfResult}; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; use datafusion_expr::create_udf; -use datatypes::arrow::array::Array; +use datatypes::arrow::array::{Array, Int64Array}; use datatypes::arrow::datatypes::DataType; use crate::extension_plan::Millisecond; @@ -53,7 +53,7 @@ pub type Increase = ExtrapolatedRate; /// from #[derive(Debug)] pub struct ExtrapolatedRate { - /// Range duration in millisecond + /// Range length in milliseconds. range_length: i64, } @@ -63,7 +63,7 @@ impl ExtrapolatedRate ScalarUDF { + fn scalar_udf_with_name(name: &str) -> ScalarUDF { let input_types = vec![ // timestamp range vector RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), @@ -71,6 +71,8 @@ impl ExtrapolatedRate ExtrapolatedRate Result { - assert_eq!(input.len(), 3); + fn create_function(inputs: &[ColumnarValue]) -> DfResult { + if inputs.len() != 4 { + return Err(DataFusionError::Plan( + "ExtrapolatedRate function should have 4 inputs".to_string(), + )); + } + + let range_length_array = extract_array(&inputs[3])?; + let range_length = range_length_array + .as_any() + .downcast_ref::() + .unwrap() + .value(0) as i64; + + Ok(Self::new(range_length)) + } + + /// Input parameters: + /// * 0: timestamp range vector + /// * 1: value range vector + /// * 2: timestamp vector + /// * 3: range length. Range duration in millisecond. Not used here + fn calc(&self, input: &[ColumnarValue]) -> DfResult { + assert_eq!(input.len(), 4); // construct matrix from input let ts_array = extract_array(&input[0])?; @@ -208,8 +232,8 @@ impl ExtrapolatedRate { "prom_delta" } - pub fn scalar_udf(range_length: i64) -> ScalarUDF { - Self::scalar_udf_with_name(Self::name(), range_length) + pub fn scalar_udf() -> ScalarUDF { + Self::scalar_udf_with_name(Self::name()) } } @@ -219,8 +243,8 @@ impl ExtrapolatedRate { "prom_rate" } - pub fn scalar_udf(range_length: i64) -> ScalarUDF { - Self::scalar_udf_with_name(Self::name(), range_length) + pub fn scalar_udf() -> ScalarUDF { + Self::scalar_udf_with_name(Self::name()) } } @@ -230,8 +254,8 @@ impl ExtrapolatedRate { "prom_increase" } - pub fn scalar_udf(range_length: i64) -> ScalarUDF { - Self::scalar_udf_with_name(Self::name(), range_length) + pub fn scalar_udf() -> ScalarUDF { + Self::scalar_udf_with_name(Self::name()) } } @@ -271,6 +295,7 @@ mod test { ColumnarValue::Array(Arc::new(ts_range.into_dict())), ColumnarValue::Array(Arc::new(value_range.into_dict())), ColumnarValue::Array(timestamps), + ColumnarValue::Array(Arc::new(Int64Array::from(vec![5]))), ]; let output = extract_array( &ExtrapolatedRate::::new(5) diff --git a/src/promql/src/functions/holt_winters.rs b/src/promql/src/functions/holt_winters.rs index 3f26abffb1..8e722c8651 100644 --- a/src/promql/src/functions/holt_winters.rs +++ b/src/promql/src/functions/holt_winters.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_common::ScalarValue; use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -62,6 +63,10 @@ impl HoltWinters { vec![ RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), RangeArray::convert_data_type(DataType::Float64), + // sf + DataType::Float64, + // tf + DataType::Float64, ] } @@ -69,20 +74,39 @@ impl HoltWinters { DataType::Float64 } - pub fn scalar_udf(level: f64, trend: f64) -> ScalarUDF { + pub fn scalar_udf() -> ScalarUDF { create_udf( Self::name(), Self::input_type(), Self::return_type(), Volatility::Volatile, - Arc::new(move |input: &_| Self::new(level, trend).calc(input)) as _, + Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _, ) } + fn create_function(inputs: &[ColumnarValue]) -> Result { + if inputs.len() != 4 { + return Err(DataFusionError::Plan( + "HoltWinters function should have 4 inputs".to_string(), + )); + } + let ColumnarValue::Scalar(ScalarValue::Float64(Some(sf))) = inputs[2] else { + return Err(DataFusionError::Plan( + "HoltWinters function's third input should be a scalar float64".to_string(), + )); + }; + let ColumnarValue::Scalar(ScalarValue::Float64(Some(tf))) = inputs[3] else { + return Err(DataFusionError::Plan( + "HoltWinters function's fourth input should be a scalar float64".to_string(), + )); + }; + Ok(Self::new(sf, tf)) + } + fn calc(&self, input: &[ColumnarValue]) -> Result { // construct matrix from input. // The third one is level param, the fourth - trend param which are included in fields. - assert_eq!(input.len(), 2); + assert_eq!(input.len(), 4); let ts_array = extract_array(&input[0])?; let value_array = extract_array(&input[1])?; @@ -264,9 +288,13 @@ mod tests { let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); simple_range_udf_runner( - HoltWinters::scalar_udf(0.5, 0.1), + HoltWinters::scalar_udf(), ts_range_array, value_range_array, + vec![ + ScalarValue::Float64(Some(0.5)), + ScalarValue::Float64(Some(0.1)), + ], vec![Some(5.0)], ); } @@ -287,9 +315,13 @@ mod tests { let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); simple_range_udf_runner( - HoltWinters::scalar_udf(0.5, 0.1), + HoltWinters::scalar_udf(), ts_range_array, value_range_array, + vec![ + ScalarValue::Float64(Some(0.5)), + ScalarValue::Float64(Some(0.1)), + ], vec![Some(38.18119566835938)], ); } @@ -315,9 +347,13 @@ mod tests { let (ts_range_array, value_range_array) = create_ts_and_value_range_arrays(query, ranges.clone()); simple_range_udf_runner( - HoltWinters::scalar_udf(0.01, 0.1), + HoltWinters::scalar_udf(), ts_range_array, value_range_array, + vec![ + ScalarValue::Float64(Some(0.01)), + ScalarValue::Float64(Some(0.1)), + ], vec![Some(expected)], ); } diff --git a/src/promql/src/functions/idelta.rs b/src/promql/src/functions/idelta.rs index c5d1897a3e..a70a1dee3c 100644 --- a/src/promql/src/functions/idelta.rs +++ b/src/promql/src/functions/idelta.rs @@ -190,6 +190,7 @@ mod test { IDelta::::scalar_udf(), ts_range_array, value_range_array, + vec![], vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None], ); @@ -200,6 +201,7 @@ mod test { IDelta::::scalar_udf(), ts_range_array, value_range_array, + vec![], // the second point represent counter reset vec![Some(0.5), Some(0.0), None, Some(3.0), None, None], ); diff --git a/src/promql/src/functions/predict_linear.rs b/src/promql/src/functions/predict_linear.rs index 4b945cabbb..d3c1e8214c 100644 --- a/src/promql/src/functions/predict_linear.rs +++ b/src/promql/src/functions/predict_linear.rs @@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_common::ScalarValue; use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -44,25 +45,41 @@ impl PredictLinear { "prom_predict_linear" } - pub fn scalar_udf(t: i64) -> ScalarUDF { + pub fn scalar_udf() -> ScalarUDF { let input_types = vec![ // time index column RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), // value column RangeArray::convert_data_type(DataType::Float64), + // t + DataType::Int64, ]; create_udf( Self::name(), input_types, DataType::Float64, Volatility::Volatile, - Arc::new(move |input: &_| Self::new(t).predict_linear(input)) as _, + Arc::new(move |input: &_| Self::create_function(input)?.predict_linear(input)) as _, ) } + fn create_function(inputs: &[ColumnarValue]) -> Result { + if inputs.len() != 3 { + return Err(DataFusionError::Plan( + "PredictLinear function should have 3 inputs".to_string(), + )); + } + let ColumnarValue::Scalar(ScalarValue::Int64(Some(t))) = inputs[2] else { + return Err(DataFusionError::Plan( + "PredictLinear function's third input should be a scalar int64".to_string(), + )); + }; + Ok(Self::new(t)) + } + fn predict_linear(&self, input: &[ColumnarValue]) -> Result { // construct matrix from input. - assert_eq!(input.len(), 2); + assert_eq!(input.len(), 3); let ts_array = extract_array(&input[0])?; let value_array = extract_array(&input[1])?; @@ -190,9 +207,10 @@ mod test { let ts_array = RangeArray::from_ranges(ts_array, ranges).unwrap(); let value_array = RangeArray::from_ranges(values_array, ranges).unwrap(); simple_range_udf_runner( - PredictLinear::scalar_udf(0), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(0))], vec![None, None], ); } @@ -201,9 +219,10 @@ mod test { fn calculate_predict_linear_test1() { let (ts_array, value_array) = build_test_range_arrays(); simple_range_udf_runner( - PredictLinear::scalar_udf(0), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(0))], // value at t = 0 vec![Some(38.63636363636364)], ); @@ -213,9 +232,10 @@ mod test { fn calculate_predict_linear_test2() { let (ts_array, value_array) = build_test_range_arrays(); simple_range_udf_runner( - PredictLinear::scalar_udf(3000), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(3000))], // value at t = 3000 vec![Some(31856.818181818187)], ); @@ -225,9 +245,10 @@ mod test { fn calculate_predict_linear_test3() { let (ts_array, value_array) = build_test_range_arrays(); simple_range_udf_runner( - PredictLinear::scalar_udf(4200), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(4200))], // value at t = 4200 vec![Some(44584.09090909091)], ); @@ -237,9 +258,10 @@ mod test { fn calculate_predict_linear_test4() { let (ts_array, value_array) = build_test_range_arrays(); simple_range_udf_runner( - PredictLinear::scalar_udf(6600), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(6600))], // value at t = 6600 vec![Some(70038.63636363638)], ); @@ -249,9 +271,10 @@ mod test { fn calculate_predict_linear_test5() { let (ts_array, value_array) = build_test_range_arrays(); simple_range_udf_runner( - PredictLinear::scalar_udf(7800), + PredictLinear::scalar_udf(), ts_array, value_array, + vec![ScalarValue::Int64(Some(7800))], // value at t = 7800 vec![Some(82765.9090909091)], ); diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs index f975a76cf4..7fd553287d 100644 --- a/src/promql/src/functions/quantile.rs +++ b/src/promql/src/functions/quantile.rs @@ -19,6 +19,7 @@ use datafusion::arrow::datatypes::TimeUnit; use datafusion::common::DataFusionError; use datafusion::logical_expr::{ScalarUDF, Volatility}; use datafusion::physical_plan::ColumnarValue; +use datafusion_common::ScalarValue; use datafusion_expr::create_udf; use datatypes::arrow::array::Array; use datatypes::arrow::datatypes::DataType; @@ -40,22 +41,38 @@ impl QuantileOverTime { "prom_quantile_over_time" } - pub fn scalar_udf(quantile: f64) -> ScalarUDF { + pub fn scalar_udf() -> ScalarUDF { let input_types = vec![ // time index column RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)), // value column RangeArray::convert_data_type(DataType::Float64), + // quantile + DataType::Float64, ]; create_udf( Self::name(), input_types, DataType::Float64, Volatility::Volatile, - Arc::new(move |input: &_| Self::new(quantile).quantile_over_time(input)) as _, + Arc::new(move |input: &_| Self::create_function(input)?.quantile_over_time(input)) as _, ) } + fn create_function(inputs: &[ColumnarValue]) -> Result { + if inputs.len() != 3 { + return Err(DataFusionError::Plan( + "QuantileOverTime function should have 3 inputs".to_string(), + )); + } + let ColumnarValue::Scalar(ScalarValue::Float64(Some(quantile))) = inputs[2] else { + return Err(DataFusionError::Plan( + "QuantileOverTime function's third input should be a scalar float64".to_string(), + )); + }; + Ok(Self::new(quantile)) + } + fn quantile_over_time( &self, input: &[ColumnarValue], diff --git a/src/promql/src/functions/quantile_aggr.rs b/src/promql/src/functions/quantile_aggr.rs index 2f8d9edd9d..08d18c8c4f 100644 --- a/src/promql/src/functions/quantile_aggr.rs +++ b/src/promql/src/functions/quantile_aggr.rs @@ -16,10 +16,12 @@ use std::sync::Arc; use datafusion::arrow::array::{ArrayRef, AsArray}; use datafusion::common::cast::{as_list_array, as_primitive_array, as_struct_array}; -use datafusion::error::Result as DfResult; +use datafusion::error::{DataFusionError, Result as DfResult}; use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility}; +use datafusion::physical_plan::expressions::Literal; use datafusion::prelude::create_udaf; use datafusion_common::ScalarValue; +use datafusion_expr::function::AccumulatorArgs; use datatypes::arrow::array::{ListArray, StructArray}; use datatypes::arrow::datatypes::{DataType, Field, Float64Type}; @@ -38,16 +40,16 @@ pub struct QuantileAccumulator { /// Create a quantile `AggregateUDF` for PromQL quantile operator, /// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions -pub fn quantile_udaf(q: f64) -> Arc { +pub fn quantile_udaf() -> Arc { Arc::new(create_udaf( QUANTILE_NAME, - // Input type: (values) - vec![DataType::Float64], + // Input type: (φ, values) + vec![DataType::Float64, DataType::Float64], // Output type: the φ-quantile Arc::new(DataType::Float64), Volatility::Volatile, // Create the accumulator - Arc::new(move |_| Ok(Box::new(QuantileAccumulator::new(q)))), + Arc::new(QuantileAccumulator::from_args), // Intermediate state types Arc::new(vec![DataType::Struct( vec![Field::new( @@ -65,17 +67,40 @@ pub fn quantile_udaf(q: f64) -> Arc { } impl QuantileAccumulator { - pub fn new(q: f64) -> Self { + fn new(q: f64) -> Self { Self { q, ..Default::default() } } + + pub fn from_args(args: AccumulatorArgs) -> DfResult> { + if args.exprs.len() != 2 { + return Err(DataFusionError::Plan( + "Quantile function should have 2 inputs".to_string(), + )); + } + + let q = match &args.exprs[0] + .as_any() + .downcast_ref::() + .map(|lit| lit.value()) + { + Some(ScalarValue::Float64(Some(q))) => *q, + _ => { + return Err(DataFusionError::Internal( + "Invalid quantile value".to_string(), + )) + } + }; + + Ok(Box::new(Self::new(q))) + } } impl DfAccumulator for QuantileAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> { - let f64_array = values[0].as_primitive::(); + let f64_array = values[1].as_primitive::(); self.values.extend(f64_array); @@ -162,9 +187,10 @@ mod tests { #[test] fn test_quantile_accumulator_single_value() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input = create_f64_array(vec![Some(10.0)]); - accumulator.update_batch(&[input]).unwrap(); + accumulator.update_batch(&[q, input]).unwrap(); let result = accumulator.evaluate().unwrap(); assert_eq!(result, ScalarValue::Float64(Some(10.0))); @@ -173,9 +199,10 @@ mod tests { #[test] fn test_quantile_accumulator_multiple_values() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]); - accumulator.update_batch(&[input]).unwrap(); + accumulator.update_batch(&[q, input]).unwrap(); let result = accumulator.evaluate().unwrap(); assert_eq!(result, ScalarValue::Float64(Some(3.0))); @@ -184,9 +211,10 @@ mod tests { #[test] fn test_quantile_accumulator_with_nulls() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input = create_f64_array(vec![Some(1.0), None, Some(3.0), Some(4.0), Some(5.0)]); - accumulator.update_batch(&[input]).unwrap(); + accumulator.update_batch(&[q, input]).unwrap(); let result = accumulator.evaluate().unwrap(); assert_eq!(result, ScalarValue::Float64(Some(3.0))); @@ -195,11 +223,12 @@ mod tests { #[test] fn test_quantile_accumulator_multiple_batches() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]); let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]); - accumulator.update_batch(&[input1]).unwrap(); - accumulator.update_batch(&[input2]).unwrap(); + accumulator.update_batch(&[q.clone(), input1]).unwrap(); + accumulator.update_batch(&[q, input2]).unwrap(); let result = accumulator.evaluate().unwrap(); assert_eq!(result, ScalarValue::Float64(Some(3.0))); @@ -208,29 +237,33 @@ mod tests { #[test] fn test_quantile_accumulator_different_quantiles() { let mut min_accumulator = QuantileAccumulator::new(0.0); + let q = create_f64_array(vec![Some(0.0)]); let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]); - min_accumulator.update_batch(&[input.clone()]).unwrap(); + min_accumulator.update_batch(&[q, input.clone()]).unwrap(); assert_eq!( min_accumulator.evaluate().unwrap(), ScalarValue::Float64(Some(1.0)) ); let mut q1_accumulator = QuantileAccumulator::new(0.25); - q1_accumulator.update_batch(&[input.clone()]).unwrap(); + let q = create_f64_array(vec![Some(0.25)]); + q1_accumulator.update_batch(&[q, input.clone()]).unwrap(); assert_eq!( q1_accumulator.evaluate().unwrap(), ScalarValue::Float64(Some(2.0)) ); let mut q3_accumulator = QuantileAccumulator::new(0.75); - q3_accumulator.update_batch(&[input.clone()]).unwrap(); + let q = create_f64_array(vec![Some(0.75)]); + q3_accumulator.update_batch(&[q, input.clone()]).unwrap(); assert_eq!( q3_accumulator.evaluate().unwrap(), ScalarValue::Float64(Some(4.0)) ); let mut max_accumulator = QuantileAccumulator::new(1.0); - max_accumulator.update_batch(&[input]).unwrap(); + let q = create_f64_array(vec![Some(1.0)]); + max_accumulator.update_batch(&[q, input]).unwrap(); assert_eq!( max_accumulator.evaluate().unwrap(), ScalarValue::Float64(Some(5.0)) @@ -240,10 +273,11 @@ mod tests { #[test] fn test_quantile_accumulator_size() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0)]); let initial_size = accumulator.size(); - accumulator.update_batch(&[input]).unwrap(); + accumulator.update_batch(&[q, input]).unwrap(); let after_update_size = accumulator.size(); assert!(after_update_size >= initial_size); @@ -252,14 +286,16 @@ mod tests { #[test] fn test_quantile_accumulator_state_and_merge() -> DfResult<()> { let mut acc1 = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]); - acc1.update_batch(&[input1])?; + acc1.update_batch(&[q, input1])?; let state1 = acc1.state()?; let mut acc2 = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]); - acc2.update_batch(&[input2])?; + acc2.update_batch(&[q, input2])?; let mut struct_builders = vec![]; for scalar in &state1 { @@ -280,16 +316,16 @@ mod tests { #[test] fn test_quantile_accumulator_with_extreme_values() { let mut accumulator = QuantileAccumulator::new(0.5); + let q = create_f64_array(vec![Some(0.5)]); let input = create_f64_array(vec![Some(f64::MAX), Some(f64::MIN), Some(0.0)]); - accumulator.update_batch(&[input]).unwrap(); + accumulator.update_batch(&[q, input]).unwrap(); let _result = accumulator.evaluate().unwrap(); } #[test] fn test_quantile_udaf_creation() { - let q = 0.5; - let udaf = quantile_udaf(q); + let udaf = quantile_udaf(); assert_eq!(udaf.name(), QUANTILE_NAME); assert_eq!(udaf.return_type(&[]).unwrap(), DataType::Float64); diff --git a/src/promql/src/functions/resets.rs b/src/promql/src/functions/resets.rs index 7df44b5e76..05d091db0d 100644 --- a/src/promql/src/functions/resets.rs +++ b/src/promql/src/functions/resets.rs @@ -90,6 +90,7 @@ mod test { Resets::scalar_udf(), ts_array_1, value_array_1, + vec![], vec![Some(0.0), Some(1.0), Some(2.0), Some(3.0), None], ); @@ -101,6 +102,7 @@ mod test { Resets::scalar_udf(), ts_array_2, value_array_2, + vec![], vec![Some(0.0), Some(0.0), Some(1.0), Some(1.0), None], ); @@ -111,6 +113,7 @@ mod test { Resets::scalar_udf(), ts_array_3, value_array_3, + vec![], vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), None], ); } diff --git a/src/promql/src/functions/round.rs b/src/promql/src/functions/round.rs index d1c9d318d8..c3931c5424 100644 --- a/src/promql/src/functions/round.rs +++ b/src/promql/src/functions/round.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use datafusion::error::DataFusionError; +use datafusion_common::ScalarValue; use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility}; use datatypes::arrow::array::AsArray; use datatypes::arrow::datatypes::{DataType, Float64Type}; @@ -36,25 +37,39 @@ impl Round { } fn input_type() -> Vec { - vec![DataType::Float64] + vec![DataType::Float64, DataType::Float64] } pub fn return_type() -> DataType { DataType::Float64 } - pub fn scalar_udf(nearest: f64) -> ScalarUDF { + pub fn scalar_udf() -> ScalarUDF { create_udf( Self::name(), Self::input_type(), Self::return_type(), Volatility::Volatile, - Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _, + Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _, ) } + fn create_function(inputs: &[ColumnarValue]) -> Result { + if inputs.len() != 2 { + return Err(DataFusionError::Plan( + "Round function should have 2 inputs".to_string(), + )); + } + let ColumnarValue::Scalar(ScalarValue::Float64(Some(nearest))) = inputs[1] else { + return Err(DataFusionError::Plan( + "Round function's second input should be a scalar float64".to_string(), + )); + }; + Ok(Self::new(nearest)) + } + fn calc(&self, input: &[ColumnarValue]) -> Result { - assert_eq!(input.len(), 1); + assert_eq!(input.len(), 2); let value_array = extract_array(&input[0])?; @@ -80,8 +95,11 @@ mod tests { use super::*; fn test_round_f64(value: Vec, nearest: f64, expected: Vec) { - let round_udf = Round::scalar_udf(nearest); - let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))]; + let round_udf = Round::scalar_udf(); + let input = vec![ + ColumnarValue::Array(Arc::new(Float64Array::from(value))), + ColumnarValue::Scalar(ScalarValue::Float64(Some(nearest))), + ]; let args = ScalarFunctionArgs { args: input, number_rows: 1, diff --git a/src/promql/src/functions/test_util.rs b/src/promql/src/functions/test_util.rs index 46ad6ec1a8..fb76ca52b5 100644 --- a/src/promql/src/functions/test_util.rs +++ b/src/promql/src/functions/test_util.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use datafusion::arrow::array::Float64Array; use datafusion::logical_expr::ScalarUDF; use datafusion::physical_plan::ColumnarValue; +use datafusion_common::ScalarValue; use datafusion_expr::ScalarFunctionArgs; use datatypes::arrow::datatypes::DataType; @@ -28,13 +29,17 @@ pub fn simple_range_udf_runner( range_fn: ScalarUDF, input_ts: RangeArray, input_value: RangeArray, + other_args: Vec, expected: Vec>, ) { let num_rows = input_ts.len(); - let input = vec![ + let input = [ ColumnarValue::Array(Arc::new(input_ts.into_dict())), ColumnarValue::Array(Arc::new(input_value.into_dict())), - ]; + ] + .into_iter() + .chain(other_args.into_iter().map(ColumnarValue::Scalar)) + .collect::>(); let args = ScalarFunctionArgs { args: input, number_rows: num_rows, diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index f81d2052dc..9f5d67a578 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -31,7 +31,7 @@ use datafusion::functions_aggregate::stddev::stddev_pop_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::functions_aggregate::variance::var_pop_udaf; use datafusion::functions_window::row_number::RowNumber; -use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, WindowFunction}; +use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction}; use datafusion::logical_expr::expr_rewriter::normalize_cols; use datafusion::logical_expr::{ BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator, @@ -1425,15 +1425,18 @@ impl PromPlanner { let field_column_pos = 0; let mut exprs = Vec::with_capacity(self.ctx.field_columns.len()); let scalar_func = match func.name { - "increase" => ScalarFunc::ExtrapolateUdf(Arc::new(Increase::scalar_udf( + "increase" => ScalarFunc::ExtrapolateUdf( + Arc::new(Increase::scalar_udf()), self.ctx.range.context(ExpectRangeSelectorSnafu)?, - ))), - "rate" => ScalarFunc::ExtrapolateUdf(Arc::new(Rate::scalar_udf( + ), + "rate" => ScalarFunc::ExtrapolateUdf( + Arc::new(Rate::scalar_udf()), self.ctx.range.context(ExpectRangeSelectorSnafu)?, - ))), - "delta" => ScalarFunc::ExtrapolateUdf(Arc::new(Delta::scalar_udf( + ), + "delta" => ScalarFunc::ExtrapolateUdf( + Arc::new(Delta::scalar_udf()), self.ctx.range.context(ExpectRangeSelectorSnafu)?, - ))), + ), "idelta" => ScalarFunc::Udf(Arc::new(IDelta::::scalar_udf())), "irate" => ScalarFunc::Udf(Arc::new(IDelta::::scalar_udf())), "resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())), @@ -1449,50 +1452,9 @@ impl PromPlanner { "present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())), "stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())), "stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())), - "quantile_over_time" => { - let quantile_expr = match other_input_exprs.pop_front() { - Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile, - other => UnexpectedPlanExprSnafu { - desc: format!("expected f64 literal as quantile, but found {:?}", other), - } - .fail()?, - }; - ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf(quantile_expr))) - } - "predict_linear" => { - let t_expr = match other_input_exprs.pop_front() { - Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64, - Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t, - other => UnexpectedPlanExprSnafu { - desc: format!("expected i64 literal as t, but found {:?}", other), - } - .fail()?, - }; - ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf(t_expr))) - } - "holt_winters" => { - let sf_exp = match other_input_exprs.pop_front() { - Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf, - other => UnexpectedPlanExprSnafu { - desc: format!( - "expected f64 literal as smoothing factor, but found {:?}", - other - ), - } - .fail()?, - }; - let tf_exp = match other_input_exprs.pop_front() { - Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf, - other => UnexpectedPlanExprSnafu { - desc: format!( - "expected f64 literal as trend factor, but found {:?}", - other - ), - } - .fail()?, - }; - ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf(sf_exp, tf_exp))) - } + "quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())), + "predict_linear" => ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf())), + "holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())), "time" => { exprs.push(build_special_time_expr( self.ctx.time_index_column.as_ref().unwrap(), @@ -1627,17 +1589,10 @@ impl PromPlanner { ScalarFunc::GeneratedExpr } "round" => { - let nearest = match other_input_exprs.pop_front() { - Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t, - Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64, - None => 0.0, - other => UnexpectedPlanExprSnafu { - desc: format!("expected f64 literal as t, but found {:?}", other), - } - .fail()?, - }; - - ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest))) + if other_input_exprs.is_empty() { + other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0)))); + } + ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf())) } _ => { @@ -1695,7 +1650,7 @@ impl PromPlanner { let _ = other_input_exprs.remove(field_column_pos + 1); let _ = other_input_exprs.remove(field_column_pos); } - ScalarFunc::ExtrapolateUdf(func) => { + ScalarFunc::ExtrapolateUdf(func, range_length) => { let ts_range_expr = DfExpr::Column(Column::from_name( RangeManipulate::build_timestamp_range_name( self.ctx.time_index_column.as_ref().unwrap(), @@ -1705,11 +1660,13 @@ impl PromPlanner { other_input_exprs.insert(field_column_pos + 1, col_expr); other_input_exprs .insert(field_column_pos + 2, self.create_time_index_column_expr()?); + other_input_exprs.push_back(lit(range_length)); let fn_expr = DfExpr::ScalarFunction(ScalarFunction { func, args: other_input_exprs.clone().into(), }); exprs.push(fn_expr); + let _ = other_input_exprs.pop_back(); let _ = other_input_exprs.remove(field_column_pos + 2); let _ = other_input_exprs.remove(field_column_pos + 1); let _ = other_input_exprs.remove(field_column_pos); @@ -1972,11 +1929,13 @@ impl PromPlanner { param: &Option>, input_plan: &LogicalPlan, ) -> Result<(Vec, Vec)> { + let mut non_col_args = Vec::new(); let aggr = match op.id() { token::T_SUM => sum_udaf(), token::T_QUANTILE => { let q = Self::get_param_value_as_f64(op, param)?; - quantile_udaf(q) + non_col_args.push(lit(q)); + quantile_udaf() } token::T_AVG => avg_udaf(), token::T_COUNT_VALUES | token::T_COUNT => count_udaf(), @@ -1998,16 +1957,12 @@ impl PromPlanner { .field_columns .iter() .map(|col| { - Ok(DfExpr::AggregateFunction(AggregateFunction { - func: aggr.clone(), - args: vec![DfExpr::Column(Column::from_name(col))], - distinct: false, - filter: None, - order_by: None, - null_treatment: None, - })) + non_col_args.push(DfExpr::Column(Column::from_name(col))); + let expr = aggr.call(non_col_args.clone()); + non_col_args.pop(); + expr }) - .collect::>>()?; + .collect::>(); // if the aggregator is `count_values`, it must be grouped by current fields. let prev_field_exprs = if op.id() == token::T_COUNT_VALUES { @@ -2941,7 +2896,8 @@ enum ScalarFunc { Udf(Arc), // todo(ruihang): maybe merge with Udf later /// UDF that require extra information like range length to be evaluated. - ExtrapolateUdf(Arc), + /// The second argument is range length. + ExtrapolateUdf(Arc, i64), /// Func that doesn't require input, like `time()`. GeneratedExpr, } @@ -3595,8 +3551,8 @@ mod test { async fn increase_aggr() { let query = "increase(some_metric[5m])"; let expected = String::from( - "Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ - \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\ + "Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\ + \n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\ \n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\ \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ \n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\ @@ -4395,8 +4351,8 @@ mod test { let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) .await .unwrap(); - let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\ - \n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\ + let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\ + \n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\ \n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\ \n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\ \n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\ diff --git a/tests/cases/standalone/common/promql/quantile.result b/tests/cases/standalone/common/promql/quantile.result index 8676bbbb77..c3aa1ef1ec 100644 --- a/tests/cases/standalone/common/promql/quantile.result +++ b/tests/cases/standalone/common/promql/quantile.result @@ -30,40 +30,40 @@ Affected Rows: 16 TQL EVAL (0, 15, '5s') quantile(0.5, test); -+---------------------+--------------------+ -| ts | quantile(test.val) | -+---------------------+--------------------+ -| 1970-01-01T00:00:00 | 2.5 | -| 1970-01-01T00:00:05 | 6.5 | -| 1970-01-01T00:00:10 | 10.5 | -| 1970-01-01T00:00:15 | 14.5 | -+---------------------+--------------------+ ++---------------------+---------------------------------+ +| ts | quantile(Float64(0.5),test.val) | ++---------------------+---------------------------------+ +| 1970-01-01T00:00:00 | 2.5 | +| 1970-01-01T00:00:05 | 6.5 | +| 1970-01-01T00:00:10 | 10.5 | +| 1970-01-01T00:00:15 | 14.5 | ++---------------------+---------------------------------+ TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc); -+------+---------------------+--------------------+ -| idc | ts | quantile(test.val) | -+------+---------------------+--------------------+ -| idc1 | 1970-01-01T00:00:00 | 1.5 | -| idc1 | 1970-01-01T00:00:05 | 5.5 | -| idc1 | 1970-01-01T00:00:10 | 9.5 | -| idc1 | 1970-01-01T00:00:15 | 13.5 | -| idc2 | 1970-01-01T00:00:00 | 3.5 | -| idc2 | 1970-01-01T00:00:05 | 7.5 | -| idc2 | 1970-01-01T00:00:10 | 11.5 | -| idc2 | 1970-01-01T00:00:15 | 15.5 | -+------+---------------------+--------------------+ ++------+---------------------+---------------------------------+ +| idc | ts | quantile(Float64(0.5),test.val) | ++------+---------------------+---------------------------------+ +| idc1 | 1970-01-01T00:00:00 | 1.5 | +| idc1 | 1970-01-01T00:00:05 | 5.5 | +| idc1 | 1970-01-01T00:00:10 | 9.5 | +| idc1 | 1970-01-01T00:00:15 | 13.5 | +| idc2 | 1970-01-01T00:00:00 | 3.5 | +| idc2 | 1970-01-01T00:00:05 | 7.5 | +| idc2 | 1970-01-01T00:00:10 | 11.5 | +| idc2 | 1970-01-01T00:00:15 | 15.5 | ++------+---------------------+---------------------------------+ TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc)); -+---------------------+-------------------------+ -| ts | quantile(sum(test.val)) | -+---------------------+-------------------------+ -| 1970-01-01T00:00:00 | 5.0 | -| 1970-01-01T00:00:05 | 13.0 | -| 1970-01-01T00:00:10 | 21.0 | -| 1970-01-01T00:00:15 | 29.0 | -+---------------------+-------------------------+ ++---------------------+--------------------------------------+ +| ts | quantile(Float64(0.5),sum(test.val)) | ++---------------------+--------------------------------------+ +| 1970-01-01T00:00:00 | 5.0 | +| 1970-01-01T00:00:05 | 13.0 | +| 1970-01-01T00:00:10 | 21.0 | +| 1970-01-01T00:00:15 | 29.0 | ++---------------------+--------------------------------------+ DROP TABLE test; diff --git a/tests/cases/standalone/common/promql/round_fn.result b/tests/cases/standalone/common/promql/round_fn.result index fe12ca6f67..5fe7e2beb0 100644 --- a/tests/cases/standalone/common/promql/round_fn.result +++ b/tests/cases/standalone/common/promql/round_fn.result @@ -18,62 +18,62 @@ Affected Rows: 4 -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') round(cache_hit, 0.01); -+---------------------+----------------------------+-------+ -| ts | prom_round(greptime_value) | job | -+---------------------+----------------------------+-------+ -| 1970-01-01T00:00:03 | 123.45 | read | -| 1970-01-01T00:00:03 | 234.57 | write | -| 1970-01-01T00:00:04 | 345.68 | read | -| 1970-01-01T00:00:04 | 456.79 | write | -+---------------------+----------------------------+-------+ ++---------------------+------------------------------------------+-------+ +| ts | prom_round(greptime_value,Float64(0.01)) | job | ++---------------------+------------------------------------------+-------+ +| 1970-01-01T00:00:03 | 123.45 | read | +| 1970-01-01T00:00:03 | 234.57 | write | +| 1970-01-01T00:00:04 | 345.68 | read | +| 1970-01-01T00:00:04 | 456.79 | write | ++---------------------+------------------------------------------+-------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') round(cache_hit, 0.1); -+---------------------+----------------------------+-------+ -| ts | prom_round(greptime_value) | job | -+---------------------+----------------------------+-------+ -| 1970-01-01T00:00:03 | 123.5 | read | -| 1970-01-01T00:00:03 | 234.60000000000002 | write | -| 1970-01-01T00:00:04 | 345.70000000000005 | read | -| 1970-01-01T00:00:04 | 456.8 | write | -+---------------------+----------------------------+-------+ ++---------------------+-----------------------------------------+-------+ +| ts | prom_round(greptime_value,Float64(0.1)) | job | ++---------------------+-----------------------------------------+-------+ +| 1970-01-01T00:00:03 | 123.5 | read | +| 1970-01-01T00:00:03 | 234.60000000000002 | write | +| 1970-01-01T00:00:04 | 345.70000000000005 | read | +| 1970-01-01T00:00:04 | 456.8 | write | ++---------------------+-----------------------------------------+-------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') round(cache_hit, 1.0); -+---------------------+----------------------------+-------+ -| ts | prom_round(greptime_value) | job | -+---------------------+----------------------------+-------+ -| 1970-01-01T00:00:03 | 123.0 | read | -| 1970-01-01T00:00:03 | 235.0 | write | -| 1970-01-01T00:00:04 | 346.0 | read | -| 1970-01-01T00:00:04 | 457.0 | write | -+---------------------+----------------------------+-------+ ++---------------------+---------------------------------------+-------+ +| ts | prom_round(greptime_value,Float64(1)) | job | ++---------------------+---------------------------------------+-------+ +| 1970-01-01T00:00:03 | 123.0 | read | +| 1970-01-01T00:00:03 | 235.0 | write | +| 1970-01-01T00:00:04 | 346.0 | read | +| 1970-01-01T00:00:04 | 457.0 | write | ++---------------------+---------------------------------------+-------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') round(cache_hit); -+---------------------+----------------------------+-------+ -| ts | prom_round(greptime_value) | job | -+---------------------+----------------------------+-------+ -| 1970-01-01T00:00:03 | 123.0 | read | -| 1970-01-01T00:00:03 | 235.0 | write | -| 1970-01-01T00:00:04 | 346.0 | read | -| 1970-01-01T00:00:04 | 457.0 | write | -+---------------------+----------------------------+-------+ ++---------------------+---------------------------------------+-------+ +| ts | prom_round(greptime_value,Float64(0)) | job | ++---------------------+---------------------------------------+-------+ +| 1970-01-01T00:00:03 | 123.0 | read | +| 1970-01-01T00:00:03 | 235.0 | write | +| 1970-01-01T00:00:04 | 346.0 | read | +| 1970-01-01T00:00:04 | 457.0 | write | ++---------------------+---------------------------------------+-------+ -- SQLNESS SORT_RESULT 3 1 tql eval (3, 4, '1s') round(cache_hit, 10.0); -+---------------------+----------------------------+-------+ -| ts | prom_round(greptime_value) | job | -+---------------------+----------------------------+-------+ -| 1970-01-01T00:00:03 | 120.0 | read | -| 1970-01-01T00:00:03 | 230.0 | write | -| 1970-01-01T00:00:04 | 350.0 | read | -| 1970-01-01T00:00:04 | 460.0 | write | -+---------------------+----------------------------+-------+ ++---------------------+----------------------------------------+-------+ +| ts | prom_round(greptime_value,Float64(10)) | job | ++---------------------+----------------------------------------+-------+ +| 1970-01-01T00:00:03 | 120.0 | read | +| 1970-01-01T00:00:03 | 230.0 | write | +| 1970-01-01T00:00:04 | 350.0 | read | +| 1970-01-01T00:00:04 | 460.0 | write | ++---------------------+----------------------------------------+-------+ drop table cache_hit; diff --git a/tests/cases/standalone/common/promql/simple_histogram.result b/tests/cases/standalone/common/promql/simple_histogram.result index 1b4e35e934..0ca95f02c3 100644 --- a/tests/cases/standalone/common/promql/simple_histogram.result +++ b/tests/cases/standalone/common/promql/simple_histogram.result @@ -228,27 +228,27 @@ tql eval (420, 420, '1s') histogram_quantile(0.833, histogram2_bucket); tql eval (2820, 2820, '1s') histogram_quantile(0.166, rate(histogram2_bucket[15m])); -+---------------------+----------------------------+ -| ts | prom_rate(ts_range,val,ts) | -+---------------------+----------------------------+ -| 1970-01-01T00:47:00 | 0.996 | -+---------------------+----------------------------+ ++---------------------+------------------------------------------+ +| ts | prom_rate(ts_range,val,ts,Int64(900000)) | ++---------------------+------------------------------------------+ +| 1970-01-01T00:47:00 | 0.996 | ++---------------------+------------------------------------------+ tql eval (2820, 2820, '1s') histogram_quantile(0.5, rate(histogram2_bucket[15m])); -+---------------------+----------------------------+ -| ts | prom_rate(ts_range,val,ts) | -+---------------------+----------------------------+ -| 1970-01-01T00:47:00 | 3.0 | -+---------------------+----------------------------+ ++---------------------+------------------------------------------+ +| ts | prom_rate(ts_range,val,ts,Int64(900000)) | ++---------------------+------------------------------------------+ +| 1970-01-01T00:47:00 | 3.0 | ++---------------------+------------------------------------------+ tql eval (2820, 2820, '1s') histogram_quantile(0.833, rate(histogram2_bucket[15m])); -+---------------------+----------------------------+ -| ts | prom_rate(ts_range,val,ts) | -+---------------------+----------------------------+ -| 1970-01-01T00:47:00 | 4.998 | -+---------------------+----------------------------+ ++---------------------+------------------------------------------+ +| ts | prom_rate(ts_range,val,ts,Int64(900000)) | ++---------------------+------------------------------------------+ +| 1970-01-01T00:47:00 | 4.998 | ++---------------------+------------------------------------------+ drop table histogram2_bucket; @@ -284,12 +284,12 @@ Affected Rows: 12 tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m]))); -+---+---------------------+---------------------------------+ -| s | ts | sum(prom_rate(ts_range,val,ts)) | -+---+---------------------+---------------------------------+ -| a | 1970-01-01T00:50:00 | 0.55 | -| a | 1970-01-01T00:50:03 | 0.5500000000000002 | -+---+---------------------+---------------------------------+ ++---+---------------------+-----------------------------------------------+ +| s | ts | sum(prom_rate(ts_range,val,ts,Int64(300000))) | ++---+---------------------+-----------------------------------------------+ +| a | 1970-01-01T00:50:00 | 0.55 | +| a | 1970-01-01T00:50:03 | 0.5500000000000002 | ++---+---------------------+-----------------------------------------------+ drop table histogram3_bucket; diff --git a/tests/cases/standalone/common/promql/subquery.result b/tests/cases/standalone/common/promql/subquery.result index d088468b17..12e65c4310 100644 --- a/tests/cases/standalone/common/promql/subquery.result +++ b/tests/cases/standalone/common/promql/subquery.result @@ -45,19 +45,19 @@ tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]); tql eval (10, 10, '1s') rate(metric_total[20s:10s]); -+---------------------+----------------------------+ -| ts | prom_rate(ts_range,val,ts) | -+---------------------+----------------------------+ -| 1970-01-01T00:00:10 | 0.1 | -+---------------------+----------------------------+ ++---------------------+-----------------------------------------+ +| ts | prom_rate(ts_range,val,ts,Int64(20000)) | ++---------------------+-----------------------------------------+ +| 1970-01-01T00:00:10 | 0.1 | ++---------------------+-----------------------------------------+ tql eval (20, 20, '1s') rate(metric_total[20s:5s]); -+---------------------+----------------------------+ -| ts | prom_rate(ts_range,val,ts) | -+---------------------+----------------------------+ -| 1970-01-01T00:00:20 | 0.06666666666666667 | -+---------------------+----------------------------+ ++---------------------+-----------------------------------------+ +| ts | prom_rate(ts_range,val,ts,Int64(20000)) | ++---------------------+-----------------------------------------+ +| 1970-01-01T00:00:20 | 0.06666666666666667 | ++---------------------+-----------------------------------------+ drop table metric_total; diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index af1a3d4fce..63491df951 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -137,8 +137,8 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]); | stage | node | plan_| +-+-+-+ | 0_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED -|_|_|_FilterExec: prom_rate(j_range,i,j)@1 IS NOT NULL REDACTED -|_|_|_ProjectionExec: expr=[j@1 as j, prom_rate(j_range@4, i@0, j@1) as prom_rate(j_range,i,j), k@2 as k, l@3 as l] REDACTED +|_|_|_FilterExec: prom_rate(j_range,i,j,Int64(10000))@1 IS NOT NULL REDACTED +|_|_|_ProjectionExec: expr=[j@1 as j, prom_rate(j_range@4, i@0, j@1, 10000) as prom_rate(j_range,i,j,Int64(10000)), k@2 as k, l@3 as l] REDACTED |_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED |_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED |_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED