mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-16 18:22:55 +00:00
feat: introduce prom_round fn (#5604)
* feat: introduce `prom_round` fn * test: add sqlness tests
This commit is contained in:
@@ -32,5 +32,5 @@ pub mod types;
|
||||
pub mod value;
|
||||
pub mod vectors;
|
||||
|
||||
pub use arrow;
|
||||
pub use arrow::{self, compute};
|
||||
pub use error::{Error, Result};
|
||||
|
||||
@@ -21,6 +21,7 @@ mod idelta;
|
||||
mod predict_linear;
|
||||
mod quantile;
|
||||
mod resets;
|
||||
mod round;
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
||||
@@ -39,6 +40,7 @@ pub use idelta::IDelta;
|
||||
pub use predict_linear::PredictLinear;
|
||||
pub use quantile::QuantileOverTime;
|
||||
pub use resets::Resets;
|
||||
pub use round::Round;
|
||||
|
||||
pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
|
||||
if let ColumnarValue::Array(array) = columnar_value {
|
||||
|
||||
105
src/promql/src/functions/round.rs
Normal file
105
src/promql/src/functions/round.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use datafusion::error::DataFusionError;
|
||||
use datafusion_expr::{create_udf, ColumnarValue, ScalarUDF, Volatility};
|
||||
use datatypes::arrow::array::AsArray;
|
||||
use datatypes::arrow::datatypes::{DataType, Float64Type};
|
||||
use datatypes::compute;
|
||||
|
||||
use crate::functions::extract_array;
|
||||
|
||||
pub struct Round {
|
||||
nearest: f64,
|
||||
}
|
||||
|
||||
impl Round {
|
||||
fn new(nearest: f64) -> Self {
|
||||
Self { nearest }
|
||||
}
|
||||
|
||||
pub const fn name() -> &'static str {
|
||||
"prom_round"
|
||||
}
|
||||
|
||||
fn input_type() -> Vec<DataType> {
|
||||
vec![DataType::Float64]
|
||||
}
|
||||
|
||||
pub fn return_type() -> DataType {
|
||||
DataType::Float64
|
||||
}
|
||||
|
||||
pub fn scalar_udf(nearest: f64) -> ScalarUDF {
|
||||
create_udf(
|
||||
Self::name(),
|
||||
Self::input_type(),
|
||||
Self::return_type(),
|
||||
Volatility::Immutable,
|
||||
Arc::new(move |input: &_| Self::new(nearest).calc(input)) as _,
|
||||
)
|
||||
}
|
||||
|
||||
fn calc(&self, input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
|
||||
assert_eq!(input.len(), 1);
|
||||
|
||||
let value_array = extract_array(&input[0])?;
|
||||
|
||||
if self.nearest == 0.0 {
|
||||
let values = value_array.as_primitive::<Float64Type>();
|
||||
let result = compute::unary::<_, _, Float64Type>(values, |a| a.round());
|
||||
Ok(ColumnarValue::Array(Arc::new(result) as _))
|
||||
} else {
|
||||
let values = value_array.as_primitive::<Float64Type>();
|
||||
let nearest = self.nearest;
|
||||
let result =
|
||||
compute::unary::<_, _, Float64Type>(values, |a| ((a / nearest).round() * nearest));
|
||||
Ok(ColumnarValue::Array(Arc::new(result) as _))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datatypes::arrow::array::Float64Array;
|
||||
|
||||
use super::*;
|
||||
|
||||
fn test_round_f64(value: Vec<f64>, nearest: f64, expected: Vec<f64>) {
|
||||
let round_udf = Round::scalar_udf(nearest);
|
||||
let input = vec![ColumnarValue::Array(Arc::new(Float64Array::from(value)))];
|
||||
let result = round_udf.invoke_batch(&input, 1).unwrap();
|
||||
let result_array = extract_array(&result).unwrap();
|
||||
assert_eq!(result_array.len(), 1);
|
||||
assert_eq!(
|
||||
result_array.as_primitive::<Float64Type>().values(),
|
||||
&expected
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round() {
|
||||
test_round_f64(vec![123.456], 0.001, vec![123.456]);
|
||||
test_round_f64(vec![123.456], 0.01, vec![123.46000000000001]);
|
||||
test_round_f64(vec![123.456], 0.1, vec![123.5]);
|
||||
test_round_f64(vec![123.456], 0.0, vec![123.0]);
|
||||
test_round_f64(vec![123.456], 1.0, vec![123.0]);
|
||||
test_round_f64(vec![123.456], 10.0, vec![120.0]);
|
||||
test_round_f64(vec![123.456], 100.0, vec![100.0]);
|
||||
test_round_f64(vec![123.456], 105.0, vec![105.0]);
|
||||
test_round_f64(vec![123.456], 1000.0, vec![0.0]);
|
||||
}
|
||||
}
|
||||
@@ -52,7 +52,7 @@ use promql::extension_plan::{
|
||||
use promql::functions::{
|
||||
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
|
||||
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
|
||||
QuantileOverTime, Rate, Resets, StddevOverTime, StdvarOverTime, SumOverTime,
|
||||
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
|
||||
};
|
||||
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
|
||||
use promql_parser::parser::token::TokenType;
|
||||
@@ -1509,6 +1509,20 @@ impl PromPlanner {
|
||||
|
||||
ScalarFunc::GeneratedExpr
|
||||
}
|
||||
"round" => {
|
||||
let nearest = match other_input_exprs.pop_front() {
|
||||
Some(DfExpr::Literal(ScalarValue::Float64(Some(t)))) => t,
|
||||
Some(DfExpr::Literal(ScalarValue::Int64(Some(t)))) => t as f64,
|
||||
None => 0.0,
|
||||
other => UnexpectedPlanExprSnafu {
|
||||
desc: format!("expected f64 literal as t, but found {:?}", other),
|
||||
}
|
||||
.fail()?,
|
||||
};
|
||||
|
||||
ScalarFunc::DataFusionUdf(Arc::new(Round::scalar_udf(nearest)))
|
||||
}
|
||||
|
||||
_ => {
|
||||
if let Some(f) = session_state.scalar_functions().get(func.name) {
|
||||
ScalarFunc::DataFusionBuiltin(f.clone())
|
||||
|
||||
81
tests/cases/standalone/common/promql/round_fn.result
Normal file
81
tests/cases/standalone/common/promql/round_fn.result
Normal file
@@ -0,0 +1,81 @@
|
||||
create table cache_hit (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
greptime_value double,
|
||||
primary key (job)
|
||||
);
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
insert into cache_hit values
|
||||
(3000, "read", 123.45),
|
||||
(3000, "write", 234.567),
|
||||
(4000, "read", 345.678),
|
||||
(4000, "write", 456.789);
|
||||
|
||||
Affected Rows: 4
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 0.01);
|
||||
|
||||
+---------------------+----------------------------+-------+
|
||||
| ts | prom_round(greptime_value) | job |
|
||||
+---------------------+----------------------------+-------+
|
||||
| 1970-01-01T00:00:03 | 123.45 | read |
|
||||
| 1970-01-01T00:00:03 | 234.57 | write |
|
||||
| 1970-01-01T00:00:04 | 345.68 | read |
|
||||
| 1970-01-01T00:00:04 | 456.79 | write |
|
||||
+---------------------+----------------------------+-------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 0.1);
|
||||
|
||||
+---------------------+----------------------------+-------+
|
||||
| ts | prom_round(greptime_value) | job |
|
||||
+---------------------+----------------------------+-------+
|
||||
| 1970-01-01T00:00:03 | 123.5 | read |
|
||||
| 1970-01-01T00:00:03 | 234.60000000000002 | write |
|
||||
| 1970-01-01T00:00:04 | 345.70000000000005 | read |
|
||||
| 1970-01-01T00:00:04 | 456.8 | write |
|
||||
+---------------------+----------------------------+-------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 1.0);
|
||||
|
||||
+---------------------+----------------------------+-------+
|
||||
| ts | prom_round(greptime_value) | job |
|
||||
+---------------------+----------------------------+-------+
|
||||
| 1970-01-01T00:00:03 | 123.0 | read |
|
||||
| 1970-01-01T00:00:03 | 235.0 | write |
|
||||
| 1970-01-01T00:00:04 | 346.0 | read |
|
||||
| 1970-01-01T00:00:04 | 457.0 | write |
|
||||
+---------------------+----------------------------+-------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit);
|
||||
|
||||
+---------------------+----------------------------+-------+
|
||||
| ts | prom_round(greptime_value) | job |
|
||||
+---------------------+----------------------------+-------+
|
||||
| 1970-01-01T00:00:03 | 123.0 | read |
|
||||
| 1970-01-01T00:00:03 | 235.0 | write |
|
||||
| 1970-01-01T00:00:04 | 346.0 | read |
|
||||
| 1970-01-01T00:00:04 | 457.0 | write |
|
||||
+---------------------+----------------------------+-------+
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 10.0);
|
||||
|
||||
+---------------------+----------------------------+-------+
|
||||
| ts | prom_round(greptime_value) | job |
|
||||
+---------------------+----------------------------+-------+
|
||||
| 1970-01-01T00:00:03 | 120.0 | read |
|
||||
| 1970-01-01T00:00:03 | 230.0 | write |
|
||||
| 1970-01-01T00:00:04 | 350.0 | read |
|
||||
| 1970-01-01T00:00:04 | 460.0 | write |
|
||||
+---------------------+----------------------------+-------+
|
||||
|
||||
drop table cache_hit;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
30
tests/cases/standalone/common/promql/round_fn.sql
Normal file
30
tests/cases/standalone/common/promql/round_fn.sql
Normal file
@@ -0,0 +1,30 @@
|
||||
|
||||
create table cache_hit (
|
||||
ts timestamp time index,
|
||||
job string,
|
||||
greptime_value double,
|
||||
primary key (job)
|
||||
);
|
||||
|
||||
insert into cache_hit values
|
||||
(3000, "read", 123.45),
|
||||
(3000, "write", 234.567),
|
||||
(4000, "read", 345.678),
|
||||
(4000, "write", 456.789);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 0.01);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 0.1);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 1.0);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit);
|
||||
|
||||
-- SQLNESS SORT_RESULT 3 1
|
||||
tql eval (3, 4, '1s') round(cache_hit, 10.0);
|
||||
|
||||
drop table cache_hit;
|
||||
Reference in New Issue
Block a user