feat: impl increase and irate/idelta in PromQL (#880)

* feat: impl increase and irate/idelta in PromQL

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add license header

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix styles

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add counter reset test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-01-29 14:21:13 +08:00
committed by GitHub
parent 5e05c8f884
commit dc9b5339bf
6 changed files with 480 additions and 1 deletions

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::prelude::*;
use datafusion::error::DataFusionError;
use promql_parser::parser::{Expr as PromExpr, TokenType};
#[derive(Debug, Snafu)]
@@ -125,3 +126,20 @@ impl ErrorExt for Error {
}
/// Shorthand for a [`std::result::Result`] whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(err: Error) -> Self {
DataFusionError::External(Box::new(err))
}
}
/// Returns `Ok(())` when `predicate` holds, otherwise returns `Err(error)`.
///
/// Note that `error` is taken by value, so the caller always constructs it,
/// even when `predicate` is true.
pub(crate) fn ensure(
    predicate: bool,
    error: DataFusionError,
) -> std::result::Result<(), DataFusionError> {
    if !predicate {
        return Err(error);
    }
    Ok(())
}

View File

@@ -0,0 +1,32 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod idelta;
mod increase;
use datafusion::arrow::array::ArrayRef;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::ColumnarValue;
pub use idelta::IDelta;
pub use increase::Increase;
/// Returns a clone of the array held by `columnar_value`.
///
/// # Errors
///
/// Returns [`DataFusionError::Execution`] when `columnar_value` is not the
/// `Array` variant.
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
    match columnar_value {
        ColumnarValue::Array(inner) => Ok(inner.clone()),
        _ => Err(DataFusionError::Execution(
            "expect array as input, found scalar value".to_string(),
        )),
    }
}

View File

@@ -0,0 +1,231 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::{Float64Array, Int64Array};
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::{Array, PrimitiveArray};
use datatypes::arrow::datatypes::DataType;
use crate::error;
use crate::functions::extract_array;
use crate::range_array::RangeArray;
/// The `funcIdelta` in PromQL, from
/// <https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L235>.
///
/// The `IS_RATE` const parameter selects which function this type stands for:
/// `true` is named `prom_irate`, `false` is named `prom_idelta`.
#[derive(Debug)]
pub struct IDelta<const IS_RATE: bool> {}
impl<const IS_RATE: bool> IDelta<IS_RATE> {
    /// Name of the function: `prom_irate` when `IS_RATE` is true,
    /// `prom_idelta` otherwise.
    pub const fn name() -> &'static str {
        if IS_RATE {
            "prom_irate"
        } else {
            "prom_idelta"
        }
    }

    /// Builds the [`ScalarUDF`] describing this function: its name, its exact
    /// two-argument signature, its `Float64` return type and [`Self::calc`]
    /// as the implementation.
    pub fn scalar_udf() -> ScalarUDF {
        ScalarUDF {
            name: Self::name().to_string(),
            signature: Signature::new(
                TypeSignature::Exact(Self::input_type()),
                Volatility::Immutable,
            ),
            return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
            fun: Arc::new(Self::calc),
        }
    }

    /// Argument types, in order: the time index column (ranges of `Int64`)
    /// and the value column (ranges of `Float64`).
    fn input_type() -> Vec<DataType> {
        vec![
            RangeArray::convert_data_type(DataType::Int64),
            RangeArray::convert_data_type(DataType::Float64),
        ]
    }

    /// Type of the produced column.
    fn return_type() -> DataType {
        DataType::Float64
    }

    /// Computes one output value per range from its last two samples.
    ///
    /// `input[0]` is the time index range array and `input[1]` the value range
    /// array. For every range:
    /// - fewer than two samples yields `0.0`;
    /// - for idelta the result is `last - previous`;
    /// - for irate the result is the increase between the last two samples
    ///   (just `last` if the value went down, which is treated as a counter
    ///   reset) divided by the elapsed time in seconds.
    ///
    /// # Errors
    ///
    /// Returns [`DataFusionError::Execution`] if an input is not an array, if
    /// the two range arrays (or any pair of ranges) differ in length, or if
    /// the element types are not `Int64` / `Float64` respectively.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not contain exactly two elements.
    fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        // construct matrix from input
        assert_eq!(input.len(), 2);

        let ts_array = extract_array(&input[0])?;
        let value_array = extract_array(&input[1])?;

        let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?;
        let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?;
        error::ensure(
            ts_range.len() == value_range.len(),
            DataFusionError::Execution(format!(
                "{}: input arrays should have the same length, found {} and {}",
                Self::name(),
                ts_range.len(),
                value_range.len()
            )),
        )?;
        error::ensure(
            ts_range.value_type() == DataType::Int64,
            DataFusionError::Execution(format!(
                "{}: expect Int64 as time index array's type, found {}",
                Self::name(),
                ts_range.value_type()
            )),
        )?;
        error::ensure(
            value_range.value_type() == DataType::Float64,
            DataFusionError::Execution(format!(
                "{}: expect Float64 as value array's type, found {}",
                Self::name(),
                value_range.value_type()
            )),
        )?;

        // calculation
        let mut result_array = Vec::with_capacity(ts_range.len());

        for index in 0..ts_range.len() {
            // The downcasts cannot fail: the element types were verified above.
            let timestamps = ts_range.get(index).unwrap();
            let timestamps = timestamps
                .as_any()
                .downcast_ref::<Int64Array>()
                .unwrap()
                .values();

            let values = value_range.get(index).unwrap();
            let values = values
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .values();

            // Checked inline so the error message is only formatted on failure,
            // not once per range.
            if timestamps.len() != values.len() {
                return Err(DataFusionError::Execution(format!(
                    "{}: input arrays should have the same length, found {} and {}",
                    Self::name(),
                    timestamps.len(),
                    values.len()
                )));
            }

            let len = timestamps.len();
            if len < 2 {
                result_array.push(0.0);
                continue;
            }

            let last_value = values[len - 1];
            let prev_value = values[len - 2];

            // if is delta
            if !IS_RATE {
                result_array.push(last_value - prev_value);
                continue;
            }

            // else is rate
            let interval_ms = timestamps[len - 1] - timestamps[len - 2];
            if interval_ms == 0 {
                // Avoid dividing by zero. Prometheus emits no sample in this
                // case; use the same placeholder as for too-short ranges.
                result_array.push(0.0);
                continue;
            }
            // TODO(ruihang): "divide 1000" converts the timestamp from millisecond to second.
            //   it should consider other precisions.
            // The division is done in floating point so sub-second intervals are
            // not truncated (an integer division would turn 1500ms into 1s and
            // anything below 1000ms into 0).
            let sampled_interval = interval_ms as f64 / 1000.0;
            let result_value = if last_value < prev_value {
                // counter reset
                last_value
            } else {
                last_value - prev_value
            };
            result_array.push(result_value / sampled_interval);
        }

        let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array)));
        Ok(result)
    }
}
/// Human-readable description that includes the value of `IS_RATE`.
impl<const IS_RATE: bool> Display for IDelta<IS_RATE> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "PromQL Idelta Function (is_rate: {})", IS_RATE)
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Feeds the two range arrays to `IDelta::<IS_RATE>::calc` and returns the
    /// resulting `Float64` column as a plain vector.
    fn evaluate<const IS_RATE: bool>(input_ts: RangeArray, input_value: RangeArray) -> Vec<f64> {
        let args = vec![
            ColumnarValue::Array(Arc::new(input_ts.into_dict())),
            ColumnarValue::Array(Arc::new(input_value.into_dict())),
        ];
        let computed = IDelta::<IS_RATE>::calc(&args).unwrap();
        let column = extract_array(&computed).unwrap();
        column
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values()
            .to_vec()
    }

    /// Asserts the idelta (`IS_RATE = false`) output for the given input.
    fn idelta_runner(input_ts: RangeArray, input_value: RangeArray, expected: Vec<f64>) {
        let output = evaluate::<false>(input_ts, input_value);
        assert_eq!(output, expected);
    }

    /// Asserts the irate (`IS_RATE = true`) output for the given input.
    fn irate_runner(input_ts: RangeArray, input_value: RangeArray, expected: Vec<f64>) {
        let output = evaluate::<true>(input_ts, input_value);
        assert_eq!(output, expected);
    }

    #[test]
    fn basic_idelta_and_irate() {
        // Samples are 2000ms apart; the value series drops to 0.0 once.
        let ts_array = Arc::new(Int64Array::from_iter([
            1000, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000,
        ]));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ts_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let values_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        // idelta: plain difference of the last two samples of each range
        let idelta_ts = RangeArray::from_ranges(ts_array.clone(), ts_ranges).unwrap();
        let idelta_values = RangeArray::from_ranges(values_array.clone(), values_ranges).unwrap();
        idelta_runner(
            idelta_ts,
            idelta_values,
            vec![1.0, -5.0, 0.0, 6.0, 0.0, 0.0],
        );

        // irate: per-second rate over the last two samples of each range
        let irate_ts = RangeArray::from_ranges(ts_array, ts_ranges).unwrap();
        let irate_values = RangeArray::from_ranges(values_array, values_ranges).unwrap();
        irate_runner(
            irate_ts,
            irate_values,
            vec![0.5, 0.0, 0.0, 3.0, 0.0, 0.0],
        );
    }
}

View File

@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::sync::Arc;
use datafusion::arrow::array::Float64Array;
use datafusion::common::DataFusionError;
use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion::physical_plan::ColumnarValue;
use datatypes::arrow::array::{Array, PrimitiveArray};
use datatypes::arrow::datatypes::DataType;
use crate::functions::extract_array;
use crate::range_array::RangeArray;
/// Part of the `extrapolatedRate` in PromQL, from
/// <https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66>.
///
/// Named `prom_increase`; takes a range array of `Float64` values and produces
/// one `Float64` per range.
#[derive(Debug)]
pub struct Increase {}
impl Increase {
    /// Name of the function.
    pub fn name() -> &'static str {
        "prom_increase"
    }

    /// Builds the [`ScalarUDF`] describing this function: its name, its exact
    /// single-argument signature, its `Float64` return type and
    /// [`Self::calc`] as the implementation.
    pub fn scalar_udf() -> ScalarUDF {
        let signature = Signature::new(
            TypeSignature::Exact(vec![Self::input_type()]),
            Volatility::Immutable,
        );
        ScalarUDF {
            name: Self::name().to_string(),
            signature,
            return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
            fun: Arc::new(Self::calc),
        }
    }

    /// Type of the only argument: ranges of `Float64`.
    fn input_type() -> DataType {
        RangeArray::convert_data_type(DataType::Float64)
    }

    /// Type of the produced column.
    fn return_type() -> DataType {
        DataType::Float64
    }

    /// Computes one increase per range.
    ///
    /// A range with fewer than two samples yields `0.0`. Otherwise the result
    /// is `last - first`, plus the previous value for every adjacent pair
    /// where the value goes down.
    ///
    /// # Panics
    ///
    /// Panics if `input` does not contain exactly one element, or if the
    /// ranges do not hold `Float64` values.
    fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
        // construct matrix from input
        assert_eq!(input.len(), 1);
        let source = extract_array(&input[0])?;
        let range_array: RangeArray = RangeArray::try_new(source.data().clone().into())?;

        // calculation
        let mut increases = Vec::with_capacity(range_array.len());
        for row in 0..range_array.len() {
            let samples = range_array.get(row).unwrap();
            let samples = samples
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .values();

            let increase = if samples.len() < 2 {
                0.0
            } else {
                // refer to functions.go L83-L110
                let mut total = samples.last().unwrap() - samples.first().unwrap();
                for pair in samples.windows(2) {
                    // a drop between neighbours adds the value seen before it
                    if pair[1] < pair[0] {
                        total += pair[0];
                    }
                }
                total
            };
            increases.push(increase);
        }

        Ok(ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(
            increases,
        ))))
    }
}
/// Human-readable description of the function.
impl Display for Increase {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(formatter, "PromQL Increase Function")
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Strictly increasing series `1.0..=9.0` shared by most cases.
    fn one_to_nine() -> Arc<Float64Array> {
        Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]))
    }

    /// Runs `Increase::calc` on `input` and compares the output with `expected`.
    fn increase_runner(input: RangeArray, expected: Vec<f64>) {
        let args = vec![ColumnarValue::Array(Arc::new(input.into_dict()))];
        let computed = Increase::calc(&args).unwrap();
        let column = extract_array(&computed).unwrap();
        let output = column
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values()
            .to_vec();
        assert_eq!(output, expected);
    }

    #[test]
    fn abnormal_input() {
        // ranges of mixed lengths, including single-element and empty ones
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let range_array = RangeArray::from_ranges(one_to_nine(), ranges).unwrap();
        increase_runner(range_array, vec![1.0, 4.0, 0.0, 2.0, 0.0, 0.0]);
    }

    #[test]
    fn normal_input() {
        // every range covers exactly two neighbouring samples
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let range_array = RangeArray::from_ranges(one_to_nine(), ranges).unwrap();
        increase_runner(range_array, vec![1.0; 8]);
    }

    #[test]
    fn short_input() {
        // mostly ranges with fewer than two samples, which yield 0.0
        let ranges = [
            (0, 1),
            (1, 0),
            (2, 1),
            (3, 0),
            (4, 3),
            (5, 1),
            (6, 0),
            (7, 2),
        ];
        let range_array = RangeArray::from_ranges(one_to_nine(), ranges).unwrap();
        increase_runner(range_array, vec![0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]);
    }

    #[test]
    fn counter_reset() {
        // this series should be treated [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let range_array = RangeArray::from_ranges(values_array, ranges).unwrap();
        increase_runner(range_array, vec![1.0; 8]);
    }
}

View File

@@ -15,5 +15,6 @@
pub mod engine;
pub mod error;
pub mod extension_plan;
pub mod functions;
pub mod planner;
pub mod range_array;

View File

@@ -71,6 +71,10 @@ impl RangeArray {
DataType::Int64
}
/// Returns the value type reported by the wrapped array
/// (delegates to `self.array.value_type()`).
pub fn value_type(&self) -> DataType {
self.array.value_type()
}
pub fn try_new(dict: DictionaryArray<Int64Type>) -> Result<Self> {
let ranges_iter = dict
.keys()
@@ -185,11 +189,16 @@ impl RangeArray {
let value_type = Box::new(field.data_type().clone());
Field::new(
field.name(),
DataType::Dictionary(Box::new(Self::key_type()), value_type),
Self::convert_data_type(*value_type),
field.is_nullable(),
)
}
/// Build the data type of a [RangeArray] wrapping the given value type:
/// a dictionary whose key type is [`Self::key_type`] and whose value type
/// is `value_type`.
pub fn convert_data_type(value_type: DataType) -> DataType {
    let key = Box::new(Self::key_type());
    let value = Box::new(value_type);
    DataType::Dictionary(key, value)
}
/// Returns a reference to the values of the wrapped array
/// (delegates to `self.array.values()`).
pub fn values(&self) -> &ArrayRef {
self.array.values()
}