diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index d95d500f7e..de6a7285d7 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_error::prelude::*; +use datafusion::error::DataFusionError; use promql_parser::parser::{Expr as PromExpr, TokenType}; #[derive(Debug, Snafu)] @@ -125,3 +126,20 @@ impl ErrorExt for Error { } pub type Result = std::result::Result; + +impl From for DataFusionError { + fn from(err: Error) -> Self { + DataFusionError::External(Box::new(err)) + } +} + +pub(crate) fn ensure( + predicate: bool, + error: DataFusionError, +) -> std::result::Result<(), DataFusionError> { + if predicate { + Ok(()) + } else { + Err(error) + } +} diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs new file mode 100644 index 0000000000..aaa6aa0258 --- /dev/null +++ b/src/promql/src/functions.rs @@ -0,0 +1,32 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod idelta; +mod increase; + +use datafusion::arrow::array::ArrayRef; +use datafusion::error::DataFusionError; +use datafusion::physical_plan::ColumnarValue; +pub use idelta::IDelta; +pub use increase::Increase; + +pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result { + if let ColumnarValue::Array(array) = columnar_value { + Ok(array.clone()) + } else { + Err(DataFusionError::Execution( + "expect array as input, found scalar value".to_string(), + )) + } +} diff --git a/src/promql/src/functions/idelta.rs b/src/promql/src/functions/idelta.rs new file mode 100644 index 0000000000..1a12cbff38 --- /dev/null +++ b/src/promql/src/functions/idelta.rs @@ -0,0 +1,231 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::sync::Arc; + +use datafusion::arrow::array::{Float64Array, Int64Array}; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::{Array, PrimitiveArray}; +use datatypes::arrow::datatypes::DataType; + +use crate::error; +use crate::functions::extract_array; +use crate::range_array::RangeArray; + +/// The `funcIdelta` in Promql, +/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L235 +#[derive(Debug)] +pub struct IDelta {} + +impl IDelta { + pub const fn name() -> &'static str { + if IS_RATE { + "prom_irate" + } else { + "prom_idelta" + } + } + + pub fn scalar_udf() -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(Self::input_type()), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(Self::calc), + } + } + + // time index column and value column + fn input_type() -> Vec { + vec![ + RangeArray::convert_data_type(DataType::Int64), + RangeArray::convert_data_type(DataType::Float64), + ] + } + + fn return_type() -> DataType { + DataType::Float64 + } + + fn calc(input: &[ColumnarValue]) -> Result { + // construct matrix from input + assert_eq!(input.len(), 2); + let ts_array = extract_array(&input[0])?; + let value_array = extract_array(&input[1])?; + + let ts_range: RangeArray = RangeArray::try_new(ts_array.data().clone().into())?; + let value_range: RangeArray = RangeArray::try_new(value_array.data().clone().into())?; + error::ensure( + ts_range.len() == value_range.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + ts_range.len(), + value_range.len() + )), + )?; + error::ensure( + ts_range.value_type() == DataType::Int64, + DataFusionError::Execution(format!( + "{}: expect Int64 as time index array's type, found {}", + Self::name(), + ts_range.value_type() + )), + )?; + error::ensure( + value_range.value_type() == DataType::Float64, + DataFusionError::Execution(format!( + "{}: expect Int64 as time index array's type, found {}", + Self::name(), + value_range.value_type() + )), + )?; + + // calculation + let mut result_array = Vec::with_capacity(ts_range.len()); + + for index in 0..ts_range.len() { + let timestamps = ts_range.get(index).unwrap(); + let timestamps = timestamps + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + let values = value_range.get(index).unwrap(); + let values = values + .as_any() + .downcast_ref::() + .unwrap() + .values(); + error::ensure( + timestamps.len() == values.len(), + DataFusionError::Execution(format!( + "{}: input arrays should have the same length, found {} and {}", + Self::name(), + timestamps.len(), + values.len() + )), + )?; + + let len = timestamps.len(); + if len < 2 { + result_array.push(0.0); + continue; + } + + // if is delta + if !IS_RATE { + result_array.push(values[len - 1] - values[len - 2]); + continue; + } + + // else is rate + // TODO(ruihang): "divide 1000" converts the timestamp from millisecond to second. + // it should consider other percisions. + let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) / 1000; + let last_value = values[len - 1]; + let prev_value = values[len - 2]; + let result_value = if last_value < prev_value { + // counter reset + last_value + } else { + last_value - prev_value + }; + + result_array.push(result_value / sampled_interval as f64); + } + + let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array))); + Ok(result) + } +} + +impl Display for IDelta { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "PromQL Idelta Function (is_rate: {IS_RATE})",) + } +} + +#[cfg(test)] +mod test { + + use super::*; + + fn idelta_runner(input_ts: RangeArray, input_value: RangeArray, expected: Vec) { + let input = vec![ + ColumnarValue::Array(Arc::new(input_ts.into_dict())), + ColumnarValue::Array(Arc::new(input_value.into_dict())), + ]; + let output = extract_array(&IDelta::::calc(&input).unwrap()) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert_eq!(output, expected); + } + + fn irate_runner(input_ts: RangeArray, input_value: RangeArray, expected: Vec) { + let input = vec![ + ColumnarValue::Array(Arc::new(input_ts.into_dict())), + ColumnarValue::Array(Arc::new(input_value.into_dict())), + ]; + let output = extract_array(&IDelta::::calc(&input).unwrap()) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert_eq!(output, expected); + } + + #[test] + fn basic_idelta_and_irate() { + let ts_array = Arc::new(Int64Array::from_iter([ + 1000, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, + ])); + let ts_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; + + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0, + ])); + let values_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; + + let ts_range_array = RangeArray::from_ranges(ts_array.clone(), ts_ranges).unwrap(); + let value_range_array = + RangeArray::from_ranges(values_array.clone(), values_ranges).unwrap(); + idelta_runner( + ts_range_array, + value_range_array, + vec![1.0, -5.0, 0.0, 6.0, 0.0, 0.0], + ); + + let ts_range_array = RangeArray::from_ranges(ts_array, ts_ranges).unwrap(); + let value_range_array = RangeArray::from_ranges(values_array, values_ranges).unwrap(); + irate_runner( + ts_range_array, + value_range_array, + vec![0.5, 0.0, 0.0, 3.0, 0.0, 0.0], + ); + } +} diff --git a/src/promql/src/functions/increase.rs b/src/promql/src/functions/increase.rs new file mode 100644 index 0000000000..1c3dede2ca --- /dev/null +++ b/src/promql/src/functions/increase.rs @@ -0,0 +1,188 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::sync::Arc; + +use datafusion::arrow::array::Float64Array; +use datafusion::common::DataFusionError; +use datafusion::logical_expr::{ScalarUDF, Signature, TypeSignature, Volatility}; +use datafusion::physical_plan::ColumnarValue; +use datatypes::arrow::array::{Array, PrimitiveArray}; +use datatypes::arrow::datatypes::DataType; + +use crate::functions::extract_array; +use crate::range_array::RangeArray; + +/// Part of the `extrapolatedRate` in Promql, +/// from https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L66 +#[derive(Debug)] +pub struct Increase {} + +impl Increase { + pub fn name() -> &'static str { + "prom_increase" + } + + fn input_type() -> DataType { + RangeArray::convert_data_type(DataType::Float64) + } + + fn return_type() -> DataType { + DataType::Float64 + } + + fn calc(input: &[ColumnarValue]) -> Result { + // construct matrix from input + assert_eq!(input.len(), 1); + let input_array = extract_array(input.first().unwrap())?; + let array_data = input_array.data().clone(); + let range_array: RangeArray = RangeArray::try_new(array_data.into())?; + + // calculation + let mut result_array = Vec::with_capacity(range_array.len()); + for index in 0..range_array.len() { + let range = range_array.get(index).unwrap(); + let range = range + .as_any() + .downcast_ref::() + .unwrap() + .values(); + + if range.len() < 2 { + result_array.push(0.0); + continue; + } + + // refer to functions.go L83-L110 + let mut result_value = range.last().unwrap() - range.first().unwrap(); + for window in range.windows(2) { + let prev = window[0]; + let curr = window[1]; + if curr < prev { + result_value += prev + } + } + + result_array.push(result_value); + } + + let result = ColumnarValue::Array(Arc::new(PrimitiveArray::from_iter(result_array))); + Ok(result) + } + + pub fn scalar_udf() -> ScalarUDF { + ScalarUDF { + name: Self::name().to_string(), + signature: Signature::new( + TypeSignature::Exact(vec![Self::input_type()]), + Volatility::Immutable, + ), + return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))), + fun: Arc::new(Self::calc), + } + } +} + +impl Display for Increase { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("PromQL Increase Function") + } +} + +#[cfg(test)] +mod test { + + use super::*; + + fn increase_runner(input: RangeArray, expected: Vec) { + let input = vec![ColumnarValue::Array(Arc::new(input.into_dict()))]; + let output = extract_array(&Increase::calc(&input).unwrap()) + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .values() + .to_vec(); + assert_eq!(output, expected); + } + + #[test] + fn abnormal_input() { + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)]; + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + increase_runner(range_array, vec![1.0, 4.0, 0.0, 2.0, 0.0, 0.0]); + } + + #[test] + fn normal_input() { + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); + } + + #[test] + fn short_input() { + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + ])); + let ranges = [ + (0, 1), + (1, 0), + (2, 1), + (3, 0), + (4, 3), + (5, 1), + (6, 0), + (7, 2), + ]; + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + increase_runner(range_array, vec![0.0, 0.0, 0.0, 0.0, 2.0, 0.0, 0.0, 1.0]); + } + + #[test] + fn counter_reset() { + // this series should be treated [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] + let values_array = Arc::new(Float64Array::from_iter([ + 1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0, + ])); + let ranges = [ + (0, 2), + (1, 2), + (2, 2), + (3, 2), + (4, 2), + (5, 2), + (6, 2), + (7, 2), + ]; + let range_array = RangeArray::from_ranges(values_array, ranges).unwrap(); + increase_runner(range_array, vec![1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]); + } +} diff --git a/src/promql/src/lib.rs b/src/promql/src/lib.rs index 40cf3bbf00..c4b6d796b1 100644 --- a/src/promql/src/lib.rs +++ b/src/promql/src/lib.rs @@ -15,5 +15,6 @@ pub mod engine; pub mod error; pub mod extension_plan; +pub mod functions; pub mod planner; pub mod range_array; diff --git a/src/promql/src/range_array.rs b/src/promql/src/range_array.rs index e5a4fe7d8f..a85a3fe647 100644 --- a/src/promql/src/range_array.rs +++ b/src/promql/src/range_array.rs @@ -71,6 +71,10 @@ impl RangeArray { DataType::Int64 } + pub fn value_type(&self) -> DataType { + self.array.value_type() + } + pub fn try_new(dict: DictionaryArray) -> Result { let ranges_iter = dict .keys() @@ -185,11 +189,16 @@ impl RangeArray { let value_type = Box::new(field.data_type().clone()); Field::new( field.name(), - DataType::Dictionary(Box::new(Self::key_type()), value_type), + Self::convert_data_type(*value_type), field.is_nullable(), ) } + /// Build datatype of wrappered [RangeArray] on given value type. + pub fn convert_data_type(value_type: DataType) -> DataType { + DataType::Dictionary(Box::new(Self::key_type()), Box::new(value_type)) + } + pub fn values(&self) -> &ArrayRef { self.array.values() }