feat: implelemt rate, increase and delta in PromQL (#1258)

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix increase fn

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl rate and delta

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix IS_RATE condition

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more tests about rate and delta

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ensure range_length is not zero

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-03-28 15:21:06 +08:00
committed by GitHub
parent 47179a7812
commit f491a040f5
9 changed files with 638 additions and 251 deletions

View File

@@ -1,29 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use promql_parser::parser::{Expr, Value};
use crate::engine::Context;
use crate::error::Result;
/// An evaluator evaluates given expressions over given fixed timestamps.
pub struct Evaluator {}
impl Evaluator {
pub fn eval(_ctx: &Context, _expr: &Expr) -> Result<Arc<dyn Value>> {
unimplemented!();
}
}

View File

@@ -1,15 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! PromQL functions

View File

@@ -94,6 +94,12 @@ pub enum Error {
#[snafu(backtrace)]
source: catalog::error::Error,
},
#[snafu(display("Expect a range selector, but not found"))]
ExpectRangeSelector { backtrace: Backtrace },
#[snafu(display("Zero range in range selector"))]
ZeroRangeSelector { backtrace: Backtrace },
}
impl ErrorExt for Error {
@@ -105,7 +111,9 @@ impl ErrorExt for Error {
| UnsupportedExpr { .. }
| UnexpectedToken { .. }
| MultipleVector { .. }
| ExpectExpr { .. } => StatusCode::InvalidArguments,
| ExpectExpr { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. } => StatusCode::InvalidArguments,
UnknownTable { .. }
| DataFusionPlanning { .. }

View File

@@ -42,6 +42,11 @@ use crate::range_array::RangeArray;
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length with the "folded" [RangeArray] column.
///
/// To pass runtime information to the execution plan (or the range function), This plan
/// will add those extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
start: Millisecond,
@@ -79,14 +84,18 @@ impl RangeManipulate {
})
}
pub fn range_timestamp_name(&self) -> String {
Self::build_timestamp_range_name(&self.time_index)
}
pub fn build_timestamp_range_name(time_index: &str) -> String {
format!("{time_index}_range")
}
pub fn internal_range_end_col_name() -> String {
"__internal_range_end".to_string()
}
fn range_timestamp_name(&self) -> String {
Self::build_timestamp_range_name(&self.time_index)
}
fn calculate_output_schema(
input_schema: &DFSchemaRef,
time_index: &str,
@@ -96,10 +105,10 @@ impl RangeManipulate {
// process time index column
// the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
let Some(index) = input_schema.index_of_column_by_name(None, time_index)? else {
let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index)? else {
return Err(datafusion::common::field_not_found(None, time_index, input_schema.as_ref()))
};
let timestamp_range_field = columns[index]
let timestamp_range_field = columns[ts_col_index]
.field()
.clone()
.with_name(Self::build_timestamp_range_name(time_index));

View File

@@ -15,8 +15,8 @@
mod aggr_over_time;
mod changes;
mod deriv;
mod extrapolate_rate;
mod idelta;
mod increase;
mod resets;
#[cfg(test)]
mod test_util;
@@ -28,8 +28,8 @@ pub use aggr_over_time::{
use datafusion::arrow::array::ArrayRef;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
pub use extrapolate_rate::{Delta, Increase, Rate};
pub use idelta::IDelta;
pub use increase::Increase;
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
if let ColumnarValue::Array(array) = columnar_value {

View File

@@ -0,0 +1,565 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file also contains some code from prometheus project.
// Copyright 2015 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Implementations of `rate`, `increase` and `delta` functions in PromQL.
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
use datafusion::arrow::datatypes::TimeUnit;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
use crate::extension_plan::Millisecond;
use crate::functions::extract_array;
use crate::range_array::RangeArray;
pub type Delta = ExtrapolatedRate<false, false>;
pub type Rate = ExtrapolatedRate<true, true>;
pub type Increase = ExtrapolatedRate<true, false>;
/// Part of the `extrapolatedRate` in Promql,
/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
/// Range duration in millisecond
range_length: i64,
}
impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, IS_RATE> {
/// Constructor. Other public usage should use [`scalar_udf`] instead.
fn new(range_length: i64) -> Self {
Self { range_length }
}
fn input_type() -> Vec<DataType> {
vec![
// timestamp range vector
RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
// value range vector
RangeArray::convert_data_type(DataType::Float64),
// timestamp vector
DataType::Timestamp(TimeUnit::Millisecond, None),
]
}
fn return_type() -> DataType {
DataType::Float64
}
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
assert_eq!(input.len(), 3);
// construct matrix from input
let ts_array = extract_array(&input[0])?;
let ts_range = RangeArray::try_new(ts_array.data().clone().into())?;
let value_array = extract_array(&input[1])?;
let value_range = RangeArray::try_new(value_array.data().clone().into())?;
let ts = extract_array(&input[2])?;
let ts = ts
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
// calculation
let mut result_array = Vec::with_capacity(ts_range.len());
for index in 0..ts_range.len() {
let timestamps = ts_range.get(index).unwrap();
let timestamps = timestamps
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.values();
let end_ts = ts.value(index);
let values = value_range.get(index).unwrap();
let values = values
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
if values.len() < 2 {
result_array.push(None);
continue;
}
// refer to functions.go L83-L110
let mut result_value = values.last().unwrap() - values.first().unwrap();
if IS_COUNTER {
for window in values.windows(2) {
let prev = window[0];
let curr = window[1];
if curr < prev {
result_value += prev
}
}
}
let mut factor = Self::extrapolate_factor(
timestamps,
end_ts,
self.range_length,
*values.first().unwrap(),
result_value,
);
if IS_RATE {
// safety: range_length is checked to be non-zero in the planner.
factor /= self.range_length as f64 / 1000.0;
}
result_array.push(Some(result_value * factor));
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
Ok(result)
}
fn extrapolate_factor(
timestamps: &[Millisecond],
range_end: Millisecond,
range_length: Millisecond,
// the following two parameters are for counters.
// see functions.go L121 - L127
first_value: f64,
result_value: f64,
) -> f64 {
// result_value
// refer to functions.go extrapolatedRate fn
// assume offset is processed (and it should be processed in normalize plan)
let range_start = range_end - range_length;
let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0;
let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0;
let sampled_interval =
(timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0;
let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64;
// functions.go L122 - L134. quote:
// Counters cannot be negative. If we have any slope at
// all (i.e. resultValue went up), we can extrapolate
// the zero point of the counter. If the duration to the
// zero point is shorter than the durationToStart, we
// take the zero point as the start of the series,
// thereby avoiding extrapolation to negative counter
// values.
if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
let duration_to_zero = sampled_interval * (first_value / result_value);
if duration_to_zero < duration_to_start {
duration_to_start = duration_to_zero;
}
}
let extrapolation_threshold = average_duration_between_samples * 1.1;
let mut extrapolate_to_interval = sampled_interval;
if duration_to_start < extrapolation_threshold {
extrapolate_to_interval += duration_to_start;
} else {
extrapolate_to_interval += average_duration_between_samples / 2.0;
}
if duration_to_end < extrapolation_threshold {
extrapolate_to_interval += duration_to_end;
} else {
extrapolate_to_interval += average_duration_between_samples / 2.0;
}
extrapolate_to_interval / sampled_interval
}
}
// delta
impl ExtrapolatedRate<false, false> {
pub fn name() -> &'static str {
"prom_delta"
}
pub fn scalar_udf(range_length: i64) -> ScalarUDF {
ScalarUDF {
name: Self::name().to_string(),
signature: Signature::new(
TypeSignature::Exact(Self::input_type()),
Volatility::Immutable,
),
return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
fun: Arc::new(move |input| Self::new(range_length).calc(input)),
}
}
}
// rate
impl ExtrapolatedRate<true, true> {
pub fn name() -> &'static str {
"prom_rate"
}
pub fn scalar_udf(range_length: i64) -> ScalarUDF {
ScalarUDF {
name: Self::name().to_string(),
signature: Signature::new(
TypeSignature::Exact(Self::input_type()),
Volatility::Immutable,
),
return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
fun: Arc::new(move |input| Self::new(range_length).calc(input)),
}
}
}
// increase
impl ExtrapolatedRate<true, false> {
pub fn name() -> &'static str {
"prom_increase"
}
pub fn scalar_udf(range_length: i64) -> ScalarUDF {
ScalarUDF {
name: Self::name().to_string(),
signature: Signature::new(
TypeSignature::Exact(Self::input_type()),
Volatility::Immutable,
),
return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
fun: Arc::new(move |input| Self::new(range_length).calc(input)),
}
}
}
impl Display for ExtrapolatedRate<false, false> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("PromQL Delta Function")
}
}
impl Display for ExtrapolatedRate<true, true> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("PromQL Rate Function")
}
}
impl Display for ExtrapolatedRate<true, false> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("PromQL Increase Function")
}
}
#[cfg(test)]
mod test {
use datafusion::arrow::array::ArrayRef;
use super::*;
/// Range length is fixed to 5
fn extrapolated_rate_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
ts_range: RangeArray,
value_range: RangeArray,
timestamps: ArrayRef,
expected: Vec<f64>,
) {
let input = vec![
ColumnarValue::Array(Arc::new(ts_range.into_dict())),
ColumnarValue::Array(Arc::new(value_range.into_dict())),
ColumnarValue::Array(timestamps),
];
let output = extract_array(
&ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(5)
.calc(&input)
.unwrap(),
)
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values()
.to_vec();
assert_eq!(output, expected);
}
#[test]
fn increase_abnormal_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
Some(2),
Some(5),
Some(2),
Some(6),
Some(9),
None,
])) as _;
extrapolated_rate_runner::<true, false>(
ts_range,
value_range,
timestamps,
vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0],
);
}
#[test]
fn increase_normal_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<true, false>(
ts_range,
value_range,
timestamps,
// `2.0` is because that `duration_to_zero` less than `extrapolation_threshold`
vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
);
}
#[test]
fn increase_short_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 1),
(1, 0),
(2, 1),
(3, 0),
(4, 3),
(5, 1),
(6, 0),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
Some(1),
None,
Some(3),
None,
Some(7),
Some(6),
None,
Some(9),
])) as _;
extrapolated_rate_runner::<true, false>(
ts_range,
value_range,
timestamps,
vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5],
);
}
#[test]
fn increase_counter_reset() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
// this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<true, false>(
ts_range,
value_range,
timestamps,
// that two `2.0` is because `duration_to_start` are shrunk to to
// `duration_to_zero`, and causes `duration_to_zero` less than
// `extrapolation_threshold`.
vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5],
);
}
#[test]
fn rate_counter_reset() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
// this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<true, true>(
ts_range,
value_range,
timestamps,
vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0],
);
}
#[test]
fn rate_normal_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<true, true>(
ts_range,
value_range,
timestamps,
vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0],
);
}
#[test]
fn delta_counter_reset() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
// this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<false, false>(
ts_range,
value_range,
timestamps,
// delta doesn't handle counter reset, thus there is a negative value
vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5],
);
}
#[test]
fn delta_normal_input() {
let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
[1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
));
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
[2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
)) as _;
extrapolated_rate_runner::<false, false>(
ts_range,
value_range,
timestamps,
vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
);
}
}

View File

@@ -1,188 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::DataType;
use crate::functions::extract_array;
use crate::range_array::RangeArray;
/// Part of the `extrapolatedRate` in Promql,
/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66
#[derive(Debug)]
pub struct Increase {}
impl Increase {
pub fn name() -> &'static str {
"prom_increase"
}
fn input_type() -> DataType {
RangeArray::convert_data_type(DataType::Float64)
}
fn return_type() -> DataType {
DataType::Float64
}
fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
// construct matrix from input
assert_eq!(input.len(), 1);
let input_array = extract_array(input.first().unwrap())?;
let array_data = input_array.data().clone();
let range_array: RangeArray = RangeArray::try_new(array_data.into())?;
// calculation
let mut result_array = Vec::with_capacity(range_array.len());
for index in 0..range_array.len() {
let range = range_array.get(index).unwrap();
let range = range
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values();
if range.len() < 2 {
result_array.push(None);
continue;
}
// refer to functions.go L83-L110
let mut result_value = range.last().unwrap() - range.first().unwrap();
for window in range.windows(2) {
let prev = window[0];
let curr = window[1];
if curr < prev {
result_value += prev
}
}
result_array.push(Some(result_value));
}
let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
Ok(result)
}
pub fn scalar_udf() -> ScalarUDF {
ScalarUDF {
name: Self::name().to_string(),
signature: Signature::new(
TypeSignature::Exact(vec![Self::input_type()]),
Volatility::Immutable,
),
return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
fun: Arc::new(Self::calc),
}
}
}
impl Display for Increase {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("PromQL Increase Function")
}
}
#[cfg(test)]
mod test {
use super::*;
fn increase_runner(input: RangeArray, expected: Vec<f64>) {
let input = vec![ColumnarValue::Array(Arc::new(input.into_dict()))];
let output = extract_array(&Increase::calc(&input).unwrap())
.unwrap()
.as_any()
.downcast_ref::<Float64Array>()
.unwrap()
.values()
.to_vec();
assert_eq!(output, expected);
}
#[test]
fn abnormal_input() {
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
increase_runner(range_array, vec![1.0, 4.0, 0.0, 2.0, 0.0, 0.0]);
}
#[test]
fn normal_input() {
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]);
}
#[test]
fn short_input() {
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
]));
let ranges = [
(0, 1),
(1, 0),
(2, 1),
(3, 0),
(4, 3),
(5, 1),
(6, 0),
(7, 2),
];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
increase_runner(range_array, vec![0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]);
}
#[test]
fn counter_reset() {
// this series should be treated [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
let values_array = Arc::new(Float64Array::from_iter([
1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
]));
let ranges = [
(0, 2),
(1, 2),
(2, 2),
(3, 2),
(4, 2),
(5, 2),
(6, 2),
(7, 2),
];
let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]);
}
}

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod engine;
pub mod error;
pub mod extension_plan;
pub mod functions;

View File

@@ -41,16 +41,17 @@ use snafu::{ensure, OptionExt, ResultExt};
use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, MultipleVectorSnafu, Result,
TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, ValueNotFoundSnafu,
CatalogSnafu, DataFusionPlanningSnafu, ExpectExprSnafu, ExpectRangeSelectorSnafu,
MultipleVectorSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, ValueNotFoundSnafu,
ZeroRangeSelectorSnafu,
};
use crate::extension_plan::{
EmptyMetric, InstantManipulate, Millisecond, RangeManipulate, SeriesDivide, SeriesNormalize,
};
use crate::functions::{
AbsentOverTime, AvgOverTime, CountOverTime, IDelta, Increase, LastOverTime, MaxOverTime,
MinOverTime, PresentOverTime, SumOverTime,
AbsentOverTime, AvgOverTime, CountOverTime, Delta, IDelta, Increase, LastOverTime, MaxOverTime,
MinOverTime, PresentOverTime, Rate, SumOverTime,
};
const LEFT_PLAN_JOIN_ALIAS: &str = "lhs";
@@ -74,6 +75,8 @@ struct PromPlannerContext {
time_index_column: Option<String>,
value_columns: Vec<String>,
tag_columns: Vec<String>,
/// The range in millisecond of range selector. None if there is no range selector.
range: Option<Millisecond>,
}
impl PromPlannerContext {
@@ -317,6 +320,11 @@ impl PromPlanner {
} = vector_selector;
let matchers = self.preprocess_label_matchers(matchers)?;
self.setup_context().await?;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
let range_ms = range.as_millis() as _;
self.ctx.range = Some(range_ms);
let normalize = self
.selector_to_series_normalize_plan(offset, matchers)
.await?;
@@ -325,7 +333,7 @@ impl PromPlanner {
self.ctx.end,
self.ctx.interval,
// TODO(ruihang): convert via Timestamp datatypes to support different time units
range.as_millis() as _,
range_ms,
self.ctx
.time_index_column
.clone()
@@ -668,7 +676,15 @@ impl PromPlanner {
// TODO(ruihang): set this according to in-param list
let value_column_pos = 0;
let scalar_func = match func.name {
"increase" => ScalarFunc::Udf(Increase::scalar_udf()),
"increase" => ScalarFunc::ExtrapolateUdf(Increase::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"rate" => ScalarFunc::ExtrapolateUdf(Rate::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"delta" => ScalarFunc::ExtrapolateUdf(Delta::scalar_udf(
self.ctx.range.context(ExpectRangeSelectorSnafu)?,
)),
"idelta" => ScalarFunc::Udf(IDelta::<false>::scalar_udf()),
"irate" => ScalarFunc::Udf(IDelta::<true>::scalar_udf()),
"avg_over_time" => ScalarFunc::Udf(AvgOverTime::scalar_udf()),
@@ -720,6 +736,25 @@ impl PromPlanner {
other_input_exprs.remove(value_column_pos + 1);
other_input_exprs.remove(value_column_pos);
}
ScalarFunc::ExtrapolateUdf(fun) => {
let ts_range_expr = DfExpr::Column(Column::from_name(
RangeManipulate::build_timestamp_range_name(
self.ctx.time_index_column.as_ref().unwrap(),
),
));
other_input_exprs.insert(value_column_pos, ts_range_expr);
other_input_exprs.insert(value_column_pos + 1, col_expr);
other_input_exprs
.insert(value_column_pos + 2, self.create_time_index_column_expr()?);
let fn_expr = DfExpr::ScalarUDF {
fun: Arc::new(fun),
args: other_input_exprs.clone(),
};
exprs.push(fn_expr);
other_input_exprs.remove(value_column_pos + 2);
other_input_exprs.remove(value_column_pos + 1);
other_input_exprs.remove(value_column_pos);
}
}
}
@@ -1032,6 +1067,9 @@ struct FunctionArgs {
enum ScalarFunc {
DataFusionBuiltin(BuiltinScalarFunction),
Udf(ScalarUDF),
// todo(ruihang): maybe merge with Udf later
/// UDF that require extra information like range length to be evaluated.
ExtrapolateUdf(ScalarUDF),
}
#[cfg(test)]
@@ -1600,8 +1638,8 @@ mod test {
async fn increase_aggr() {
let query = "increase(some_metric[5m])";
let expected = String::from(
"Filter: prom_increase(timestamp_range,field_0) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0) AS prom_increase(timestamp_range,field_0), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0):Float64;N, tag_0:Utf8]\
"Filter: prom_increase(timestamp_range,field_0,timestamp) IS NOT NULL [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\
\n Projection: some_metric.timestamp, prom_increase(timestamp_range, field_0, some_metric.timestamp) AS prom_increase(timestamp_range,field_0,timestamp), some_metric.tag_0 [timestamp:Timestamp(Millisecond, None), prom_increase(timestamp_range,field_0,timestamp):Float64;N, tag_0:Utf8]\
\n PromRangeManipulate: req range=[0..100000000], interval=[5000], eval range=[300000], time index=[timestamp], values=[\"field_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Dictionary(Int64, Float64);N, timestamp_range:Dictionary(Int64, Timestamp(Millisecond, None))]\
\n PromSeriesNormalize: offset=[0], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\