feat: remove hyper parameter from promql functions (#5955)

* quantile udaf

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* extrapolate rate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* predict_linear, round, holt_winters, quantile_overtime

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix quantile function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia
2025-04-24 15:17:10 +08:00
committed by GitHub
parent ff3a46b1d0
commit b476584f56
19 changed files with 383 additions and 245 deletions

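The change follows one pattern across all of these functions: a parameter that used to be baked into the UDF at plan time (quantile φ, range length, smoothing factors, round target) is now passed as an extra literal argument and recovered inside the UDF at execution time. A minimal sketch of the recovery step, assuming only the datafusion crate (the helper name is illustrative, not from this commit):

use datafusion::common::{DataFusionError, Result as DfResult, ScalarValue};
use datafusion::physical_plan::ColumnarValue;

/// Recovers a former constructor parameter from the trailing scalar argument.
fn param_from_args(inputs: &[ColumnarValue]) -> DfResult<f64> {
    match inputs.last() {
        Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(p)))) => Ok(*p),
        _ => Err(DataFusionError::Plan(
            "last input should be a scalar float64".to_string(),
        )),
    }
}

fn main() {
    let args = vec![ColumnarValue::Scalar(ScalarValue::Float64(Some(0.5)))];
    assert_eq!(param_from_args(&args).unwrap(), 0.5);
}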
View File

@@ -44,13 +44,13 @@ pub use quantile_aggr::quantile_udaf;
pub use resets::Resets;
pub use round::Round;
/// Extracts an array from a `ColumnarValue`.
///
/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
- if let ColumnarValue::Array(array) = columnar_value {
- Ok(array.clone())
- } else {
- Err(DataFusionError::Execution(
- "expect array as input, found scalar value".to_string(),
- ))
- }
+ match columnar_value {
+ ColumnarValue::Array(array) => Ok(array.clone()),
+ ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
+ }
}

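A minimal standalone sketch of what the new extract_array admits, assuming only the datafusion crate — scalar parameters now pass through as one-element arrays instead of erroring:

use std::sync::Arc;
use datafusion::arrow::array::{Array, ArrayRef, Int64Array};
use datafusion::common::{DataFusionError, ScalarValue};
use datafusion::physical_plan::ColumnarValue;

fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
    match columnar_value {
        ColumnarValue::Array(array) => Ok(array.clone()),
        // A scalar (e.g. a literal parameter) becomes an array of size 1.
        ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
    }
}

fn main() -> Result<(), DataFusionError> {
    let scalar = ColumnarValue::Scalar(ScalarValue::Int64(Some(300_000)));
    assert_eq!(extract_array(&scalar)?.len(), 1);
    let array = ColumnarValue::Array(Arc::new(Int64Array::from(vec![1, 2, 3])));
    assert_eq!(extract_array(&array)?.len(), 3);
    Ok(())
}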
View File

@@ -231,6 +231,7 @@ mod test {
AvgOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(49.9999995),
Some(45.8618844),
@@ -253,6 +254,7 @@ mod test {
MinOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(12.345678),
Some(12.345678),
@@ -275,6 +277,7 @@ mod test {
MaxOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(87.654321),
Some(87.654321),
@@ -297,6 +300,7 @@ mod test {
SumOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(99.999999),
Some(229.309422),
@@ -319,6 +323,7 @@ mod test {
CountOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(2.0),
Some(5.0),
@@ -341,6 +346,7 @@ mod test {
LastOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(87.654321),
Some(70.710678),
@@ -363,6 +369,7 @@ mod test {
AbsentOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
None,
None,
@@ -385,6 +392,7 @@ mod test {
PresentOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(1.0),
Some(1.0),
@@ -407,6 +415,7 @@ mod test {
StdvarOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(1417.8479276253622),
Some(808.999919713209),
@@ -442,6 +451,7 @@ mod test {
StdvarOverTime::scalar_udf(),
RangeArray::from_ranges(ts_array, ranges).unwrap(),
RangeArray::from_ranges(values_array, ranges).unwrap(),
vec![],
vec![Some(0.0), Some(10.559999999999999)],
);
}
@@ -453,6 +463,7 @@ mod test {
StddevOverTime::scalar_udf(),
ts_array,
value_array,
vec![],
vec![
Some(37.6543215),
Some(28.442923895289123),
@@ -488,6 +499,7 @@ mod test {
StddevOverTime::scalar_udf(),
RangeArray::from_ranges(ts_array, ranges).unwrap(),
RangeArray::from_ranges(values_array, ranges).unwrap(),
vec![],
vec![Some(0.0), Some(3.249615361854384)],
);
}

View File

@@ -90,6 +90,7 @@ mod test {
Changes::scalar_udf(),
ts_array_1,
value_array_1,
vec![],
vec![Some(0.0), Some(3.0), Some(5.0), Some(8.0), None],
);
@@ -101,6 +102,7 @@ mod test {
Changes::scalar_udf(),
ts_array_2,
value_array_2,
vec![],
vec![Some(0.0), Some(3.0), Some(5.0), Some(9.0), None],
);
@@ -111,6 +113,7 @@ mod test {
Changes::scalar_udf(),
ts_array_3,
value_array_3,
vec![],
vec![Some(0.0), Some(0.0), Some(1.0), Some(1.0), None],
);
}

View File

@@ -74,6 +74,7 @@ mod test {
Deriv::scalar_udf(),
ts_array,
value_array,
vec![],
vec![Some(10.606060606060607), None],
);
}
@@ -99,6 +100,7 @@ mod test {
Deriv::scalar_udf(),
ts_range_array,
value_range_array,
vec![],
vec![Some(0.0)],
);
}

View File

@@ -34,11 +34,11 @@ use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit;
- use datafusion::common::DataFusionError;
+ use datafusion::common::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion_expr::create_udf;
- use datatypes::arrow::array::Array;
+ use datatypes::arrow::array::{Array, Int64Array};
use datatypes::arrow::datatypes::DataType;
use crate::extension_plan::Millisecond;
@@ -53,7 +53,7 @@ pub type Increase = ExtrapolatedRate<true, false>;
/// from <https://github.com/prometheus/prometheus/blob/v0.40.1/promql/functions.go#L66>
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
- /// Range duration in millisecond
+ /// Range length in milliseconds.
range_length: i64,
}
@@ -63,7 +63,7 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
Self { range_length }
}
- fn scalar_udf_with_name(name: &str, range_length: i64) -> ScalarUDF {
+ fn scalar_udf_with_name(name: &str) -> ScalarUDF {
let input_types = vec![
// timestamp range vector
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
@@ -71,6 +71,8 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
RangeArray::convert_data_type(DataType::Float64),
// timestamp vector
DataType::Timestamp(TimeUnit::Millisecond, None),
// range length
DataType::Int64,
];
create_udf(
@@ -78,12 +80,34 @@ impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, I
input_types,
DataType::Float64,
Volatility::Volatile,
- Arc::new(move |input: &_| Self::new(range_length).calc(input)) as _,
+ Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
)
}
- fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
- assert_eq!(input.len(), 3);
+ fn create_function(inputs: &[ColumnarValue]) -> DfResult<Self> {
+ if inputs.len() != 4 {
+ return Err(DataFusionError::Plan(
+ "ExtrapolatedRate function should have 4 inputs".to_string(),
+ ));
+ }
+ let range_length_array = extract_array(&inputs[3])?;
+ let range_length = range_length_array
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap()
+ .value(0);
+ Ok(Self::new(range_length))
+ }
+ /// Input parameters:
+ /// * 0: timestamp range vector
+ /// * 1: value range vector
+ /// * 2: timestamp vector
+ /// * 3: range length, in milliseconds (consumed by `create_function`, not used here)
+ fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
+ assert_eq!(input.len(), 4);
// construct matrix from input
let ts_array = extract_array(&input[0])?;
@@ -208,8 +232,8 @@ impl ExtrapolatedRate<false, false> {
"prom_delta"
}
- pub fn scalar_udf(range_length: i64) -> ScalarUDF {
- Self::scalar_udf_with_name(Self::name(), range_length)
+ pub fn scalar_udf() -> ScalarUDF {
+ Self::scalar_udf_with_name(Self::name())
}
}
@@ -219,8 +243,8 @@ impl ExtrapolatedRate<true, true> {
"prom_rate"
}
- pub fn scalar_udf(range_length: i64) -> ScalarUDF {
- Self::scalar_udf_with_name(Self::name(), range_length)
+ pub fn scalar_udf() -> ScalarUDF {
+ Self::scalar_udf_with_name(Self::name())
}
}
@@ -230,8 +254,8 @@ impl ExtrapolatedRate<true, false> {
"prom_increase"
}
- pub fn scalar_udf(range_length: i64) -> ScalarUDF {
- Self::scalar_udf_with_name(Self::name(), range_length)
+ pub fn scalar_udf() -> ScalarUDF {
+ Self::scalar_udf_with_name(Self::name())
}
}
@@ -271,6 +295,7 @@ mod test {
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(timestamps),
ColumnarValue::Array(Arc::new(Int64Array::from(vec![5]))),
];
let output = extract_array(
&ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(5)

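create_function above trusts the planner to append the range length as a fourth Int64 argument. A standalone sketch of the same recovery that also tolerates a plain scalar (the helper name is illustrative, assuming only datafusion and its arrow re-exports):

use datafusion::arrow::array::{Array, Int64Array};
use datafusion::common::{DataFusionError, Result as DfResult, ScalarValue};
use datafusion::physical_plan::ColumnarValue;

/// Reads the trailing Int64 argument carrying the range length in milliseconds.
fn range_length_from_args(inputs: &[ColumnarValue]) -> DfResult<i64> {
    match inputs.last() {
        Some(ColumnarValue::Scalar(ScalarValue::Int64(Some(v)))) => Ok(*v),
        Some(ColumnarValue::Array(array)) => array
            .as_any()
            .downcast_ref::<Int64Array>()
            .filter(|a| !a.is_empty())
            .map(|a| a.value(0))
            .ok_or_else(|| DataFusionError::Plan("range length must be int64".to_string())),
        _ => Err(DataFusionError::Plan("missing range length argument".to_string())),
    }
}

fn main() {
    let args = vec![ColumnarValue::Scalar(ScalarValue::Int64(Some(300_000)))];
    assert_eq!(range_length_from_args(&args).unwrap(), 300_000);
}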
View File

@@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_expr::create_udf;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
@@ -62,6 +63,10 @@ impl HoltWinters {
vec![
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
RangeArray::convert_data_type(DataType::Float64),
// sf
DataType::Float64,
// tf
DataType::Float64,
]
}
@@ -69,20 +74,39 @@ impl HoltWinters {
DataType::Float64
}
- pub fn scalar_udf(level: f64, trend: f64) -> ScalarUDF {
+ pub fn scalar_udf() -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Volatile,
- Arc::new(move |input: &_| Self::new(level, trend).calc(input)) as _,
+ Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
)
}
fn create_function(inputs: &[ColumnarValue]) -> Result<Self, DataFusionError> {
if inputs.len() != 4 {
return Err(DataFusionError::Plan(
"HoltWinters function should have 4 inputs".to_string(),
));
}
let ColumnarValue::Scalar(ScalarValue::Float64(Some(sf))) = inputs[2] else {
return Err(DataFusionError::Plan(
"HoltWinters function's third input should be a scalar float64".to_string(),
));
};
let ColumnarValue::Scalar(ScalarValue::Float64(Some(tf))) = inputs[3] else {
return Err(DataFusionError::Plan(
"HoltWinters function's fourth input should be a scalar float64".to_string(),
));
};
Ok(Self::new(sf, tf))
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input.
+ // The third input is the level parameter and the fourth is the trend parameter; both are included in the fields.
- assert_eq!(input.len(), 2);
+ assert_eq!(input.len(), 4);
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
@@ -264,9 +288,13 @@ mod tests {
let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
simple_range_udf_runner(
- HoltWinters::scalar_udf(0.5, 0.1),
+ HoltWinters::scalar_udf(),
ts_range_array,
value_range_array,
vec![
ScalarValue::Float64(Some(0.5)),
ScalarValue::Float64(Some(0.1)),
],
vec![Some(5.0)],
);
}
@@ -287,9 +315,13 @@ mod tests {
let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
simple_range_udf_runner(
- HoltWinters::scalar_udf(0.5, 0.1),
+ HoltWinters::scalar_udf(),
ts_range_array,
value_range_array,
vec![
ScalarValue::Float64(Some(0.5)),
ScalarValue::Float64(Some(0.1)),
],
vec![Some(38.18119566835938)],
);
}
@@ -315,9 +347,13 @@ mod tests {
let (ts_range_array, value_range_array) =
create_ts_and_value_range_arrays(query, ranges.clone());
simple_range_udf_runner(
- HoltWinters::scalar_udf(0.01, 0.1),
+ HoltWinters::scalar_udf(),
ts_range_array,
value_range_array,
vec![
ScalarValue::Float64(Some(0.01)),
ScalarValue::Float64(Some(0.1)),
],
vec![Some(expected)],
);
}

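For reference, holt_winters here is PromQL's double exponential smoothing with smoothing factor sf and trend factor tf. A standalone sketch of the recurrence (mirroring Prometheus' funcHoltWinters, not this file's exact code):

/// Double exponential smoothing; returns the last smoothed value.
fn holt_winters(samples: &[f64], sf: f64, tf: f64) -> Option<f64> {
    if samples.len() < 2 {
        return None;
    }
    let mut level = samples[0];
    let mut trend = samples[1] - samples[0];
    for &x in &samples[1..] {
        let prev_level = level;
        level = sf * x + (1.0 - sf) * (prev_level + trend);
        trend = tf * (level - prev_level) + (1.0 - tf) * trend;
    }
    Some(level)
}

fn main() {
    // A constant series smooths to the constant, matching the vec![Some(5.0)] expectation above.
    assert_eq!(holt_winters(&[5.0, 5.0, 5.0, 5.0], 0.5, 0.1), Some(5.0));
}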
View File

@@ -190,6 +190,7 @@ mod test {
IDelta::<false>::scalar_udf(),
ts_range_array,
value_range_array,
vec![],
vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None],
);
@@ -200,6 +201,7 @@ mod test {
IDelta::<true>::scalar_udf(),
ts_range_array,
value_range_array,
vec![],
// the second point represents a counter reset
vec![Some(0.5), Some(0.0), None, Some(3.0), None, None],
);

View File

@@ -22,6 +22,7 @@ use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_expr::create_udf;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
@@ -44,25 +45,41 @@ impl PredictLinear {
"prom_predict_linear"
}
- pub fn scalar_udf(t: i64) -> ScalarUDF {
+ pub fn scalar_udf() -> ScalarUDF {
let input_types = vec![
// time index column
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
// value column
RangeArray::convert_data_type(DataType::Float64),
// t
DataType::Int64,
];
create_udf(
Self::name(),
input_types,
DataType::Float64,
Volatility::Volatile,
- Arc::new(move |input: &_| Self::new(t).predict_linear(input)) as _,
+ Arc::new(move |input: &_| Self::create_function(input)?.predict_linear(input)) as _,
)
}
fn create_function(inputs: &[ColumnarValue]) -> Result<Self, DataFusionError> {
if inputs.len() != 3 {
return Err(DataFusionError::Plan(
"PredictLinear function should have 3 inputs".to_string(),
));
}
let ColumnarValue::Scalar(ScalarValue::Int64(Some(t))) = inputs[2] else {
return Err(DataFusionError::Plan(
"PredictLinear function's third input should be a scalar int64".to_string(),
));
};
Ok(Self::new(t))
}
fn predict_linear(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input.
- assert_eq!(input.len(), 2);
+ assert_eq!(input.len(), 3);
let ts_array = extract_array(&input[0])?;
let value_array = extract_array(&input[1])?;
@@ -190,9 +207,10 @@ mod test {
let ts_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_array = RangeArray::from_ranges(values_array, ranges).unwrap();
simple_range_udf_runner(
- PredictLinear::scalar_udf(0),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(0))],
vec![None, None],
);
}
@@ -201,9 +219,10 @@ mod test {
fn calculate_predict_linear_test1() {
let (ts_array, value_array) = build_test_range_arrays();
simple_range_udf_runner(
- PredictLinear::scalar_udf(0),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(0))],
// value at t = 0
vec![Some(38.63636363636364)],
);
@@ -213,9 +232,10 @@ mod test {
fn calculate_predict_linear_test2() {
let (ts_array, value_array) = build_test_range_arrays();
simple_range_udf_runner(
- PredictLinear::scalar_udf(3000),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(3000))],
// value at t = 3000
vec![Some(31856.818181818187)],
);
@@ -225,9 +245,10 @@ mod test {
fn calculate_predict_linear_test3() {
let (ts_array, value_array) = build_test_range_arrays();
simple_range_udf_runner(
- PredictLinear::scalar_udf(4200),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(4200))],
// value at t = 4200
vec![Some(44584.09090909091)],
);
@@ -237,9 +258,10 @@ mod test {
fn calculate_predict_linear_test4() {
let (ts_array, value_array) = build_test_range_arrays();
simple_range_udf_runner(
- PredictLinear::scalar_udf(6600),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(6600))],
// value at t = 6600
vec![Some(70038.63636363638)],
);
@@ -249,9 +271,10 @@ mod test {
fn calculate_predict_linear_test5() {
let (ts_array, value_array) = build_test_range_arrays();
simple_range_udf_runner(
- PredictLinear::scalar_udf(7800),
+ PredictLinear::scalar_udf(),
ts_array,
value_array,
vec![ScalarValue::Int64(Some(7800))],
// value at t = 7800
vec![Some(82765.9090909091)],
);

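predict_linear is an ordinary least-squares fit over the (timestamp, value) samples, evaluated t seconds from the range's reference point. A standalone sketch of the math (consistent with the PromQL definition, not this file's exact code):

/// Least-squares slope/intercept over (x, y) samples, evaluated at `t`.
fn predict_linear(ts: &[f64], vals: &[f64], t: f64) -> f64 {
    let n = ts.len() as f64;
    let sum_x: f64 = ts.iter().sum();
    let sum_y: f64 = vals.iter().sum();
    let sum_xy: f64 = ts.iter().zip(vals).map(|(x, y)| x * y).sum();
    let sum_x2: f64 = ts.iter().map(|x| x * x).sum();
    let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
    let intercept = (sum_y - slope * sum_x) / n;
    intercept + slope * t
}

fn main() {
    // Perfectly linear samples y = 2x + 1: the prediction at x = 10 is 21.
    let ts = [0.0, 1.0, 2.0, 3.0];
    let vals = [1.0, 3.0, 5.0, 7.0];
    assert!((predict_linear(&ts, &vals, 10.0) - 21.0).abs() < 1e-9);
}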
View File

@@ -19,6 +19,7 @@ use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_expr::create_udf;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
@@ -40,22 +41,38 @@ impl QuantileOverTime {
"prom_quantile_over_time"
}
- pub fn scalar_udf(quantile: f64) -> ScalarUDF {
+ pub fn scalar_udf() -> ScalarUDF {
let input_types = vec![
// time index column
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
// value column
RangeArray::convert_data_type(DataType::Float64),
// quantile
DataType::Float64,
];
create_udf(
Self::name(),
input_types,
DataType::Float64,
Volatility::Volatile,
- Arc::new(move |input: &_| Self::new(quantile).quantile_over_time(input)) as _,
+ Arc::new(move |input: &_| Self::create_function(input)?.quantile_over_time(input)) as _,
)
}
fn create_function(inputs: &[ColumnarValue]) -> Result<Self, DataFusionError> {
if inputs.len() != 3 {
return Err(DataFusionError::Plan(
"QuantileOverTime function should have 3 inputs".to_string(),
));
}
let ColumnarValue::Scalar(ScalarValue::Float64(Some(quantile))) = inputs[2] else {
return Err(DataFusionError::Plan(
"QuantileOverTime function's third input should be a scalar float64".to_string(),
));
};
Ok(Self::new(quantile))
}
fn quantile_over_time(
&self,
input: &[ColumnarValue],

View File

@@ -16,10 +16,12 @@ use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, AsArray};
use datafusion::common::cast::{as_list_array, as_primitive_array, as_struct_array};
- use datafusion::error::Result as DfResult;
+ use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility};
use datafusion::physical_plan::expressions::Literal;
use datafusion::prelude::create_udaf;
use datafusion_common::ScalarValue;
use datafusion_expr::function::AccumulatorArgs;
use datatypes::arrow::array::{ListArray, StructArray};
use datatypes::arrow::datatypes::{DataType, Field, Float64Type};
@@ -38,16 +40,16 @@ pub struct QuantileAccumulator {
/// Create a quantile `AggregateUDF` for the PromQL quantile operator,
/// which calculates the φ-quantile (0 ≤ φ ≤ 1) over dimensions
- pub fn quantile_udaf(q: f64) -> Arc<AggregateUDF> {
+ pub fn quantile_udaf() -> Arc<AggregateUDF> {
Arc::new(create_udaf(
QUANTILE_NAME,
- // Input type: (values)
- vec![DataType::Float64],
+ // Input type: (φ, values)
+ vec![DataType::Float64, DataType::Float64],
// Output type: the φ-quantile
Arc::new(DataType::Float64),
Volatility::Volatile,
// Create the accumulator
- Arc::new(move |_| Ok(Box::new(QuantileAccumulator::new(q)))),
+ Arc::new(QuantileAccumulator::from_args),
// Intermediate state types
Arc::new(vec![DataType::Struct(
vec![Field::new(
@@ -65,17 +67,40 @@ pub fn quantile_udaf(q: f64) -> Arc<AggregateUDF> {
}
impl QuantileAccumulator {
- pub fn new(q: f64) -> Self {
+ fn new(q: f64) -> Self {
Self {
q,
..Default::default()
}
}
pub fn from_args(args: AccumulatorArgs) -> DfResult<Box<dyn DfAccumulator>> {
if args.exprs.len() != 2 {
return Err(DataFusionError::Plan(
"Quantile function should have 2 inputs".to_string(),
));
}
let q = match &args.exprs[0]
.as_any()
.downcast_ref::<Literal>()
.map(|lit| lit.value())
{
Some(ScalarValue::Float64(Some(q))) => *q,
_ => {
return Err(DataFusionError::Internal(
"Invalid quantile value".to_string(),
))
}
};
Ok(Box::new(Self::new(q)))
}
}
impl DfAccumulator for QuantileAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
- let f64_array = values[0].as_primitive::<Float64Type>();
+ let f64_array = values[1].as_primitive::<Float64Type>();
self.values.extend(f64_array);
@@ -162,9 +187,10 @@ mod tests {
#[test]
fn test_quantile_accumulator_single_value() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input = create_f64_array(vec![Some(10.0)]);
- accumulator.update_batch(&[input]).unwrap();
+ accumulator.update_batch(&[q, input]).unwrap();
let result = accumulator.evaluate().unwrap();
assert_eq!(result, ScalarValue::Float64(Some(10.0)));
@@ -173,9 +199,10 @@ mod tests {
#[test]
fn test_quantile_accumulator_multiple_values() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
- accumulator.update_batch(&[input]).unwrap();
+ accumulator.update_batch(&[q, input]).unwrap();
let result = accumulator.evaluate().unwrap();
assert_eq!(result, ScalarValue::Float64(Some(3.0)));
@@ -184,9 +211,10 @@ mod tests {
#[test]
fn test_quantile_accumulator_with_nulls() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input = create_f64_array(vec![Some(1.0), None, Some(3.0), Some(4.0), Some(5.0)]);
- accumulator.update_batch(&[input]).unwrap();
+ accumulator.update_batch(&[q, input]).unwrap();
let result = accumulator.evaluate().unwrap();
assert_eq!(result, ScalarValue::Float64(Some(3.0)));
@@ -195,11 +223,12 @@ mod tests {
#[test]
fn test_quantile_accumulator_multiple_batches() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]);
let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]);
- accumulator.update_batch(&[input1]).unwrap();
- accumulator.update_batch(&[input2]).unwrap();
+ accumulator.update_batch(&[q.clone(), input1]).unwrap();
+ accumulator.update_batch(&[q, input2]).unwrap();
let result = accumulator.evaluate().unwrap();
assert_eq!(result, ScalarValue::Float64(Some(3.0)));
@@ -208,29 +237,33 @@ mod tests {
#[test]
fn test_quantile_accumulator_different_quantiles() {
let mut min_accumulator = QuantileAccumulator::new(0.0);
+ let q = create_f64_array(vec![Some(0.0)]);
let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
- min_accumulator.update_batch(&[input.clone()]).unwrap();
+ min_accumulator.update_batch(&[q, input.clone()]).unwrap();
assert_eq!(
min_accumulator.evaluate().unwrap(),
ScalarValue::Float64(Some(1.0))
);
let mut q1_accumulator = QuantileAccumulator::new(0.25);
- q1_accumulator.update_batch(&[input.clone()]).unwrap();
+ let q = create_f64_array(vec![Some(0.25)]);
+ q1_accumulator.update_batch(&[q, input.clone()]).unwrap();
assert_eq!(
q1_accumulator.evaluate().unwrap(),
ScalarValue::Float64(Some(2.0))
);
let mut q3_accumulator = QuantileAccumulator::new(0.75);
- q3_accumulator.update_batch(&[input.clone()]).unwrap();
+ let q = create_f64_array(vec![Some(0.75)]);
+ q3_accumulator.update_batch(&[q, input.clone()]).unwrap();
assert_eq!(
q3_accumulator.evaluate().unwrap(),
ScalarValue::Float64(Some(4.0))
);
let mut max_accumulator = QuantileAccumulator::new(1.0);
- max_accumulator.update_batch(&[input]).unwrap();
+ let q = create_f64_array(vec![Some(1.0)]);
+ max_accumulator.update_batch(&[q, input]).unwrap();
assert_eq!(
max_accumulator.evaluate().unwrap(),
ScalarValue::Float64(Some(5.0))
@@ -240,10 +273,11 @@ mod tests {
#[test]
fn test_quantile_accumulator_size() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0)]);
let initial_size = accumulator.size();
- accumulator.update_batch(&[input]).unwrap();
+ accumulator.update_batch(&[q, input]).unwrap();
let after_update_size = accumulator.size();
assert!(after_update_size >= initial_size);
@@ -252,14 +286,16 @@ mod tests {
#[test]
fn test_quantile_accumulator_state_and_merge() -> DfResult<()> {
let mut acc1 = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]);
- acc1.update_batch(&[input1])?;
+ acc1.update_batch(&[q, input1])?;
let state1 = acc1.state()?;
let mut acc2 = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]);
- acc2.update_batch(&[input2])?;
+ acc2.update_batch(&[q, input2])?;
let mut struct_builders = vec![];
for scalar in &state1 {
@@ -280,16 +316,16 @@ mod tests {
#[test]
fn test_quantile_accumulator_with_extreme_values() {
let mut accumulator = QuantileAccumulator::new(0.5);
+ let q = create_f64_array(vec![Some(0.5)]);
let input = create_f64_array(vec![Some(f64::MAX), Some(f64::MIN), Some(0.0)]);
- accumulator.update_batch(&[input]).unwrap();
+ accumulator.update_batch(&[q, input]).unwrap();
let _result = accumulator.evaluate().unwrap();
}
#[test]
fn test_quantile_udaf_creation() {
- let q = 0.5;
- let udaf = quantile_udaf(q);
+ let udaf = quantile_udaf();
assert_eq!(udaf.name(), QUANTILE_NAME);
assert_eq!(udaf.return_type(&[]).unwrap(), DataType::Float64);

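Behind the accumulator, the φ-quantile itself is a sort plus linear interpolation. A standalone sketch (NaN-free input assumed) whose results line up with the test expectations above:

/// φ-quantile with linear interpolation, as PromQL's quantile operator defines it.
fn quantile(q: f64, mut values: Vec<f64>) -> Option<f64> {
    if values.is_empty() || !(0.0..=1.0).contains(&q) {
        return None;
    }
    // NaN-free input assumed; partial_cmp cannot fail here.
    values.sort_by(|a, b| a.partial_cmp(b).unwrap());
    let rank = q * (values.len() - 1) as f64;
    let lo = rank.floor() as usize;
    let hi = rank.ceil() as usize;
    Some(values[lo] + (values[hi] - values[lo]) * (rank - lo as f64))
}

fn main() {
    assert_eq!(quantile(0.5, vec![1.0, 2.0, 3.0, 4.0, 5.0]), Some(3.0));
    assert_eq!(quantile(0.25, vec![1.0, 2.0, 3.0, 4.0, 5.0]), Some(2.0));
    assert_eq!(quantile(0.75, vec![1.0, 2.0, 3.0, 4.0, 5.0]), Some(4.0));
}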
View File

@@ -90,6 +90,7 @@ mod test {
Resets::scalar_udf(),
ts_array_1,
value_array_1,
vec![],
vec![Some(0.0), Some(1.0), Some(2.0), Some(3.0), None],
);
@@ -101,6 +102,7 @@ mod test {
Resets::scalar_udf(),
ts_array_2,
value_array_2,
vec![],
vec![Some(0.0), Some(0.0), Some(1.0), Some(1.0), None],
);
@@ -111,6 +113,7 @@ mod test {
Resets::scalar_udf(),
ts_array_3,
value_array_3,
vec![],
vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), None],
);
}

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion_common::ScalarValue;
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
use datatypes::arrow::array::AsArray;
use datatypes::arrow::datatypes::{DataType, Float64Type};
@@ -36,25 +37,39 @@ impl Round {
}
fn input_type() -> Vec<DataType> {
- vec![DataType::Float64]
+ vec![DataType::Float64, DataType::Float64]
}
pub fn return_type() -> DataType {
DataType::Float64
}
- pub fn scalar_udf(nearest: f64) -> ScalarUDF {
+ pub fn scalar_udf() -> ScalarUDF {
create_udf(
Self::name(),
Self::input_type(),
Self::return_type(),
Volatility::Volatile,
- Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
+ Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
)
}
fn create_function(inputs: &[ColumnarValue]) -> Result<Self, DataFusionError> {
if inputs.len() != 2 {
return Err(DataFusionError::Plan(
"Round function should have 2 inputs".to_string(),
));
}
let ColumnarValue::Scalar(ScalarValue::Float64(Some(nearest))) = inputs[1] else {
return Err(DataFusionError::Plan(
"Round function's second input should be a scalar float64".to_string(),
));
};
Ok(Self::new(nearest))
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
- assert_eq!(input.len(), 1);
+ assert_eq!(input.len(), 2);
let value_array = extract_array(&input[0])?;
@@ -80,8 +95,11 @@ mod tests {
use super::*;
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
- let round_udf = Round::scalar_udf(nearest);
- let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
+ let round_udf = Round::scalar_udf();
+ let input = vec![
+ ColumnarValue::Array(Arc::new(Float64Array::from(value))),
+ ColumnarValue::Scalar(ScalarValue::Float64(Some(nearest))),
+ ];
let args = ScalarFunctionArgs {
args: input,
number_rows: 1,

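The semantics being tested are PromQL's round(v, to_nearest), which Prometheus implements by scaling with the inverse, rounding half-up, and scaling back. A standalone sketch of that arithmetic (not necessarily this UDF's exact float handling):

/// Rounds `v` to the nearest multiple of `to_nearest`, PromQL-style.
fn prom_round(v: f64, to_nearest: f64) -> f64 {
    // Invert first for better float precision, mirroring Prometheus' funcRound.
    let inv = 1.0 / to_nearest;
    (v * inv + 0.5).floor() / inv
}

fn main() {
    assert_eq!(prom_round(123.45, 1.0), 123.0);
    assert_eq!(prom_round(234.567, 1.0), 235.0);
    assert_eq!(prom_round(456.789, 0.01), 456.79);
}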
View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::logical_expr::ScalarUDF;
use datafusion::physical_plan::ColumnarValue;
use datafusion_common::ScalarValue;
use datafusion_expr::ScalarFunctionArgs;
use datatypes::arrow::datatypes::DataType;
@@ -28,13 +29,17 @@ pub fn simple_range_udf_runner(
range_fn: ScalarUDF,
input_ts: RangeArray,
input_value: RangeArray,
other_args: Vec<ScalarValue>,
expected: Vec<Option<f64>>,
) {
let num_rows = input_ts.len();
let input = vec![
let input = [
ColumnarValue::Array(Arc::new(input_ts.into_dict())),
ColumnarValue::Array(Arc::new(input_value.into_dict())),
];
]
.into_iter()
.chain(other_args.into_iter().map(ColumnarValue::Scalar))
.collect::<Vec<_>>();
let args = ScalarFunctionArgs {
args: input,
number_rows: num_rows,

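Callers now pass function parameters through other_args, which the runner splices after the two range columns. A minimal standalone sketch of that chaining with a dummy value column:

use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::common::ScalarValue;
use datafusion::physical_plan::ColumnarValue;

fn main() {
    // e.g. HoltWinters' sf and tf parameters.
    let other_args = vec![ScalarValue::Float64(Some(0.5)), ScalarValue::Float64(Some(0.1))];
    let input = [ColumnarValue::Array(Arc::new(Float64Array::from(vec![1.0, 2.0])))]
        .into_iter()
        .chain(other_args.into_iter().map(ColumnarValue::Scalar))
        .collect::<Vec<_>>();
    assert_eq!(input.len(), 3);
}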
View File

@@ -31,7 +31,7 @@ use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::functions_aggregate::variance::var_pop_udaf;
use datafusion::functions_window::row_number::RowNumber;
- use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, WindowFunction};
+ use datafusion::logical_expr::expr::{Alias, ScalarFunction, WindowFunction};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
@@ -1425,15 +1425,18 @@ impl PromPlanner {
let field_column_pos = 0;
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
let scalar_func = match func.name {
"increase" => ScalarFunc::ExtrapolateUdf(Arc::new(Increase::scalar_udf(
"increase" => ScalarFunc::ExtrapolateUdf(
Arc::new(Increase::scalar_udf()),
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
))),
"rate" => ScalarFunc::ExtrapolateUdf(Arc::new(Rate::scalar_udf(
),
"rate" => ScalarFunc::ExtrapolateUdf(
Arc::new(Rate::scalar_udf()),
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
))),
"delta" => ScalarFunc::ExtrapolateUdf(Arc::new(Delta::scalar_udf(
),
"delta" => ScalarFunc::ExtrapolateUdf(
Arc::new(Delta::scalar_udf()),
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
))),
),
"idelta" => ScalarFunc::Udf(Arc::new(IDelta::<false>::scalar_udf())),
"irate" => ScalarFunc::Udf(Arc::new(IDelta::<true>::scalar_udf())),
"resets" => ScalarFunc::Udf(Arc::new(Resets::scalar_udf())),
@@ -1449,50 +1452,9 @@ impl PromPlanner {
"present_over_time" => ScalarFunc::Udf(Arc::new(PresentOverTime::scalar_udf())),
"stddev_over_time" => ScalarFunc::Udf(Arc::new(StddevOverTime::scalar_udf())),
"stdvar_over_time" => ScalarFunc::Udf(Arc::new(StdvarOverTime::scalar_udf())),
"quantile_over_time" => {
let quantile_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(quantile)))) => quantile,
other => UnexpectedPlanExprSnafu {
desc: format!("expected f64 literal as quantile, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf(quantile_expr)))
}
"predict_linear" => {
let t_expr = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t as i64,
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t,
other => UnexpectedPlanExprSnafu {
desc: format!("expected i64 literal as t, but found {:?}", other),
}
.fail()?,
};
ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf(t_expr)))
}
"holt_winters" => {
let sf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(sf)))) => sf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expected f64 literal as smoothing factor, but found {:?}",
other
),
}
.fail()?,
};
let tf_exp = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Float64(Some(tf)))) => tf,
other => UnexpectedPlanExprSnafu {
desc: format!(
"expected f64 literal as trend factor, but found {:?}",
other
),
}
.fail()?,
};
ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf(sf_exp, tf_exp)))
}
"quantile_over_time" => ScalarFunc::Udf(Arc::new(QuantileOverTime::scalar_udf())),
"predict_linear" => ScalarFunc::Udf(Arc::new(PredictLinear::scalar_udf())),
"holt_winters" => ScalarFunc::Udf(Arc::new(HoltWinters::scalar_udf())),
"time" => {
exprs.push(build_special_time_expr(
self.ctx.time_index_column.as_ref().unwrap(),
@@ -1627,17 +1589,10 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"round" => {
- let nearest = match other_input_exprs.pop_front() {
- Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
- Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
- None => 0.0,
- other => UnexpectedPlanExprSnafu {
- desc: format!("expected f64 literal as t, but found {:?}", other),
- }
- .fail()?,
- };
- ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
+ if other_input_exprs.is_empty() {
+ other_input_exprs.push_front(DfExpr::Literal(ScalarValue::Float64(Some(0.0))));
+ }
+ ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf()))
}
_ => {
@@ -1695,7 +1650,7 @@ impl PromPlanner {
let _ = other_input_exprs.remove(field_column_pos + 1);
let _ = other_input_exprs.remove(field_column_pos);
}
- ScalarFunc::ExtrapolateUdf(func) => {
+ ScalarFunc::ExtrapolateUdf(func, range_length) => {
let ts_range_expr = DfExpr::Column(Column::from_name(
RangeManipulate::build_timestamp_range_name(
self.ctx.time_index_column.as_ref().unwrap(),
@@ -1705,11 +1660,13 @@ impl PromPlanner {
other_input_exprs.insert(field_column_pos + 1, col_expr);
other_input_exprs
.insert(field_column_pos + 2, self.create_time_index_column_expr()?);
other_input_exprs.push_back(lit(range_length));
let fn_expr = DfExpr::ScalarFunction(ScalarFunction {
func,
args: other_input_exprs.clone().into(),
});
exprs.push(fn_expr);
let _ = other_input_exprs.pop_back();
let _ = other_input_exprs.remove(field_column_pos + 2);
let _ = other_input_exprs.remove(field_column_pos + 1);
let _ = other_input_exprs.remove(field_column_pos);
@@ -1972,11 +1929,13 @@ impl PromPlanner {
param: &Option<Box<PromExpr>>,
input_plan: &LogicalPlan,
) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
let mut non_col_args = Vec::new();
let aggr = match op.id() {
token::T_SUM => sum_udaf(),
token::T_QUANTILE => {
let q = Self::get_param_value_as_f64(op, param)?;
- quantile_udaf(q)
+ non_col_args.push(lit(q));
+ quantile_udaf()
}
token::T_AVG => avg_udaf(),
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
@@ -1998,16 +1957,12 @@ impl PromPlanner {
.field_columns
.iter()
.map(|col| {
- Ok(DfExpr::AggregateFunction(AggregateFunction {
- func: aggr.clone(),
- args: vec![DfExpr::Column(Column::from_name(col))],
- distinct: false,
- filter: None,
- order_by: None,
- null_treatment: None,
- }))
+ non_col_args.push(DfExpr::Column(Column::from_name(col)));
+ let expr = aggr.call(non_col_args.clone());
+ non_col_args.pop();
+ expr
})
- .collect::<Result<Vec<_>>>()?;
+ .collect::<Vec<_>>();
// if the aggregator is `count_values`, it must be grouped by current fields.
let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
@@ -2941,7 +2896,8 @@ enum ScalarFunc {
Udf(Arc<ScalarUdfDef>),
// todo(ruihang): maybe merge with Udf later
/// UDF that requires extra information (like range length) to be evaluated.
- ExtrapolateUdf(Arc<ScalarUdfDef>),
+ /// The second argument is the range length.
+ ExtrapolateUdf(Arc<ScalarUdfDef>, i64),
/// Func that doesn't require input, like `time()`.
GeneratedExpr,
}
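The planner-side rewrite above replaces hand-built AggregateFunction structs with AggregateUDF::call, prepending non-column literals such as the quantile φ. A minimal sketch of the same shape, using datafusion's sum_udaf as a stand-in for the crate-local quantile UDAF (sum takes one argument, but call() only builds the expression):

use datafusion::functions_aggregate::sum::sum_udaf;
use datafusion::prelude::{col, lit};

fn main() {
    // Non-column arguments (e.g. φ = 0.5) lead; the field column follows.
    let mut non_col_args = vec![lit(0.5_f64)];
    non_col_args.push(col("greptime_value"));
    let expr = sum_udaf().call(non_col_args.clone());
    non_col_args.pop();
    println!("{expr}");
}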
@@ -3595,8 +3551,8 @@ mod test {
async fn increase_aggr() {
let query = "increase(some_metric[5m])";
let expected = String::from(
"Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\
"Filter: prom_increase(timestamp_range,field_0,timestamp,Int64(300000)) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp, Int64(300000)) AS prom_increase(timestamp_range,field_0,timestamp,Int64(300000)), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp,Int64(300000)):Float64;N, tag_0:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
@@ -4395,8 +4351,8 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
let expected = "Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(Float64(0.3), sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(Float64(0.3),sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]\
\n Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]\

View File

@@ -30,40 +30,40 @@ Affected Rows: 16
TQL EVAL (0, 15, '5s') quantile(0.5, test);
- +---------------------+--------------------+
- | ts | quantile(test.val) |
- +---------------------+--------------------+
- | 1970-01-01T00:00:00 | 2.5 |
- | 1970-01-01T00:00:05 | 6.5 |
- | 1970-01-01T00:00:10 | 10.5 |
- | 1970-01-01T00:00:15 | 14.5 |
- +---------------------+--------------------+
+ +---------------------+---------------------------------+
+ | ts | quantile(Float64(0.5),test.val) |
+ +---------------------+---------------------------------+
+ | 1970-01-01T00:00:00 | 2.5 |
+ | 1970-01-01T00:00:05 | 6.5 |
+ | 1970-01-01T00:00:10 | 10.5 |
+ | 1970-01-01T00:00:15 | 14.5 |
+ +---------------------+---------------------------------+
TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc);
- +------+---------------------+--------------------+
- | idc | ts | quantile(test.val) |
- +------+---------------------+--------------------+
- | idc1 | 1970-01-01T00:00:00 | 1.5 |
- | idc1 | 1970-01-01T00:00:05 | 5.5 |
- | idc1 | 1970-01-01T00:00:10 | 9.5 |
- | idc1 | 1970-01-01T00:00:15 | 13.5 |
- | idc2 | 1970-01-01T00:00:00 | 3.5 |
- | idc2 | 1970-01-01T00:00:05 | 7.5 |
- | idc2 | 1970-01-01T00:00:10 | 11.5 |
- | idc2 | 1970-01-01T00:00:15 | 15.5 |
- +------+---------------------+--------------------+
+ +------+---------------------+---------------------------------+
+ | idc | ts | quantile(Float64(0.5),test.val) |
+ +------+---------------------+---------------------------------+
+ | idc1 | 1970-01-01T00:00:00 | 1.5 |
+ | idc1 | 1970-01-01T00:00:05 | 5.5 |
+ | idc1 | 1970-01-01T00:00:10 | 9.5 |
+ | idc1 | 1970-01-01T00:00:15 | 13.5 |
+ | idc2 | 1970-01-01T00:00:00 | 3.5 |
+ | idc2 | 1970-01-01T00:00:05 | 7.5 |
+ | idc2 | 1970-01-01T00:00:10 | 11.5 |
+ | idc2 | 1970-01-01T00:00:15 | 15.5 |
+ +------+---------------------+---------------------------------+
TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc));
- +---------------------+-------------------------+
- | ts | quantile(sum(test.val)) |
- +---------------------+-------------------------+
- | 1970-01-01T00:00:00 | 5.0 |
- | 1970-01-01T00:00:05 | 13.0 |
- | 1970-01-01T00:00:10 | 21.0 |
- | 1970-01-01T00:00:15 | 29.0 |
- +---------------------+-------------------------+
+ +---------------------+--------------------------------------+
+ | ts | quantile(Float64(0.5),sum(test.val)) |
+ +---------------------+--------------------------------------+
+ | 1970-01-01T00:00:00 | 5.0 |
+ | 1970-01-01T00:00:05 | 13.0 |
+ | 1970-01-01T00:00:10 | 21.0 |
+ | 1970-01-01T00:00:15 | 29.0 |
+ +---------------------+--------------------------------------+
DROP TABLE test;

View File

@@ -18,62 +18,62 @@ Affected Rows: 4
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.01);
- +---------------------+----------------------------+-------+
- | ts | prom_round(greptime_value) | job |
- +---------------------+----------------------------+-------+
- | 1970-01-01T00:00:03 | 123.45 | read |
- | 1970-01-01T00:00:03 | 234.57 | write |
- | 1970-01-01T00:00:04 | 345.68 | read |
- | 1970-01-01T00:00:04 | 456.79 | write |
- +---------------------+----------------------------+-------+
+ +---------------------+------------------------------------------+-------+
+ | ts | prom_round(greptime_value,Float64(0.01)) | job |
+ +---------------------+------------------------------------------+-------+
+ | 1970-01-01T00:00:03 | 123.45 | read |
+ | 1970-01-01T00:00:03 | 234.57 | write |
+ | 1970-01-01T00:00:04 | 345.68 | read |
+ | 1970-01-01T00:00:04 | 456.79 | write |
+ +---------------------+------------------------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 0.1);
- +---------------------+----------------------------+-------+
- | ts | prom_round(greptime_value) | job |
- +---------------------+----------------------------+-------+
- | 1970-01-01T00:00:03 | 123.5 | read |
- | 1970-01-01T00:00:03 | 234.60000000000002 | write |
- | 1970-01-01T00:00:04 | 345.70000000000005 | read |
- | 1970-01-01T00:00:04 | 456.8 | write |
- +---------------------+----------------------------+-------+
+ +---------------------+-----------------------------------------+-------+
+ | ts | prom_round(greptime_value,Float64(0.1)) | job |
+ +---------------------+-----------------------------------------+-------+
+ | 1970-01-01T00:00:03 | 123.5 | read |
+ | 1970-01-01T00:00:03 | 234.60000000000002 | write |
+ | 1970-01-01T00:00:04 | 345.70000000000005 | read |
+ | 1970-01-01T00:00:04 | 456.8 | write |
+ +---------------------+-----------------------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 1.0);
- +---------------------+----------------------------+-------+
- | ts | prom_round(greptime_value) | job |
- +---------------------+----------------------------+-------+
- | 1970-01-01T00:00:03 | 123.0 | read |
- | 1970-01-01T00:00:03 | 235.0 | write |
- | 1970-01-01T00:00:04 | 346.0 | read |
- | 1970-01-01T00:00:04 | 457.0 | write |
- +---------------------+----------------------------+-------+
+ +---------------------+---------------------------------------+-------+
+ | ts | prom_round(greptime_value,Float64(1)) | job |
+ +---------------------+---------------------------------------+-------+
+ | 1970-01-01T00:00:03 | 123.0 | read |
+ | 1970-01-01T00:00:03 | 235.0 | write |
+ | 1970-01-01T00:00:04 | 346.0 | read |
+ | 1970-01-01T00:00:04 | 457.0 | write |
+ +---------------------+---------------------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit);
- +---------------------+----------------------------+-------+
- | ts | prom_round(greptime_value) | job |
- +---------------------+----------------------------+-------+
- | 1970-01-01T00:00:03 | 123.0 | read |
- | 1970-01-01T00:00:03 | 235.0 | write |
- | 1970-01-01T00:00:04 | 346.0 | read |
- | 1970-01-01T00:00:04 | 457.0 | write |
- +---------------------+----------------------------+-------+
+ +---------------------+---------------------------------------+-------+
+ | ts | prom_round(greptime_value,Float64(0)) | job |
+ +---------------------+---------------------------------------+-------+
+ | 1970-01-01T00:00:03 | 123.0 | read |
+ | 1970-01-01T00:00:03 | 235.0 | write |
+ | 1970-01-01T00:00:04 | 346.0 | read |
+ | 1970-01-01T00:00:04 | 457.0 | write |
+ +---------------------+---------------------------------------+-------+
-- SQLNESS SORT_RESULT 3 1
tql eval (3, 4, '1s') round(cache_hit, 10.0);
- +---------------------+----------------------------+-------+
- | ts | prom_round(greptime_value) | job |
- +---------------------+----------------------------+-------+
- | 1970-01-01T00:00:03 | 120.0 | read |
- | 1970-01-01T00:00:03 | 230.0 | write |
- | 1970-01-01T00:00:04 | 350.0 | read |
- | 1970-01-01T00:00:04 | 460.0 | write |
- +---------------------+----------------------------+-------+
+ +---------------------+----------------------------------------+-------+
+ | ts | prom_round(greptime_value,Float64(10)) | job |
+ +---------------------+----------------------------------------+-------+
+ | 1970-01-01T00:00:03 | 120.0 | read |
+ | 1970-01-01T00:00:03 | 230.0 | write |
+ | 1970-01-01T00:00:04 | 350.0 | read |
+ | 1970-01-01T00:00:04 | 460.0 | write |
+ +---------------------+----------------------------------------+-------+
drop table cache_hit;

View File

@@ -228,27 +228,27 @@ tql eval (420, 420, '1s') histogram_quantile(0.833, histogram2_bucket);
tql eval (2820, 2820, '1s') histogram_quantile(0.166, rate(histogram2_bucket[15m]));
- +---------------------+----------------------------+
- | ts | prom_rate(ts_range,val,ts) |
- +---------------------+----------------------------+
- | 1970-01-01T00:47:00 | 0.996 |
- +---------------------+----------------------------+
+ +---------------------+------------------------------------------+
+ | ts | prom_rate(ts_range,val,ts,Int64(900000)) |
+ +---------------------+------------------------------------------+
+ | 1970-01-01T00:47:00 | 0.996 |
+ +---------------------+------------------------------------------+
tql eval (2820, 2820, '1s') histogram_quantile(0.5, rate(histogram2_bucket[15m]));
- +---------------------+----------------------------+
- | ts | prom_rate(ts_range,val,ts) |
- +---------------------+----------------------------+
- | 1970-01-01T00:47:00 | 3.0 |
- +---------------------+----------------------------+
+ +---------------------+------------------------------------------+
+ | ts | prom_rate(ts_range,val,ts,Int64(900000)) |
+ +---------------------+------------------------------------------+
+ | 1970-01-01T00:47:00 | 3.0 |
+ +---------------------+------------------------------------------+
tql eval (2820, 2820, '1s') histogram_quantile(0.833, rate(histogram2_bucket[15m]));
- +---------------------+----------------------------+
- | ts | prom_rate(ts_range,val,ts) |
- +---------------------+----------------------------+
- | 1970-01-01T00:47:00 | 4.998 |
- +---------------------+----------------------------+
+ +---------------------+------------------------------------------+
+ | ts | prom_rate(ts_range,val,ts,Int64(900000)) |
+ +---------------------+------------------------------------------+
+ | 1970-01-01T00:47:00 | 4.998 |
+ +---------------------+------------------------------------------+
drop table histogram2_bucket;
@@ -284,12 +284,12 @@ Affected Rows: 12
tql eval (3000, 3005, '3s') histogram_quantile(0.5, sum by(le, s) (rate(histogram3_bucket[5m])));
- +---+---------------------+---------------------------------+
- | s | ts | sum(prom_rate(ts_range,val,ts)) |
- +---+---------------------+---------------------------------+
- | a | 1970-01-01T00:50:00 | 0.55 |
- | a | 1970-01-01T00:50:03 | 0.5500000000000002 |
- +---+---------------------+---------------------------------+
+ +---+---------------------+-----------------------------------------------+
+ | s | ts | sum(prom_rate(ts_range,val,ts,Int64(300000))) |
+ +---+---------------------+-----------------------------------------------+
+ | a | 1970-01-01T00:50:00 | 0.55 |
+ | a | 1970-01-01T00:50:03 | 0.5500000000000002 |
+ +---+---------------------+-----------------------------------------------+
drop table histogram3_bucket;

View File

@@ -45,19 +45,19 @@ tql eval (359, 359, '1s') sum_over_time(metric_total[60s:10s]);
tql eval (10, 10, '1s') rate(metric_total[20s:10s]);
- +---------------------+----------------------------+
- | ts | prom_rate(ts_range,val,ts) |
- +---------------------+----------------------------+
- | 1970-01-01T00:00:10 | 0.1 |
- +---------------------+----------------------------+
+ +---------------------+-----------------------------------------+
+ | ts | prom_rate(ts_range,val,ts,Int64(20000)) |
+ +---------------------+-----------------------------------------+
+ | 1970-01-01T00:00:10 | 0.1 |
+ +---------------------+-----------------------------------------+
tql eval (20, 20, '1s') rate(metric_total[20s:5s]);
- +---------------------+----------------------------+
- | ts | prom_rate(ts_range,val,ts) |
- +---------------------+----------------------------+
- | 1970-01-01T00:00:20 | 0.06666666666666667 |
- +---------------------+----------------------------+
+ +---------------------+-----------------------------------------+
+ | ts | prom_rate(ts_range,val,ts,Int64(20000)) |
+ +---------------------+-----------------------------------------+
+ | 1970-01-01T00:00:20 | 0.06666666666666667 |
+ +---------------------+-----------------------------------------+
drop table metric_total;

View File

@@ -137,8 +137,8 @@ TQL ANALYZE (0, 10, '5s') rate(test[10s]);
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
- |_|_|_FilterExec: prom_rate(j_range,i,j)@1 IS NOT NULL REDACTED
- |_|_|_ProjectionExec: expr=[j@1 as j, prom_rate(j_range@4, i@0, j@1) as prom_rate(j_range,i,j), k@2 as k, l@3 as l] REDACTED
+ |_|_|_FilterExec: prom_rate(j_range,i,j,Int64(10000))@1 IS NOT NULL REDACTED
+ |_|_|_ProjectionExec: expr=[j@1 as j, prom_rate(j_range@4, i@0, j@1, 10000) as prom_rate(j_range,i,j,Int64(10000)), k@2 as k, l@3 as l] REDACTED
|_|_|_PromRangeManipulateExec: req range=[0..10000], interval=[5000], eval range=[10000], time index=[j] REDACTED
|_|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [true] REDACTED
|_|_|_PromSeriesDivideExec: tags=["k", "l"] REDACTED