From 530ff53422f0f23a6e04d8be60e2fbf244120016 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Mon, 10 Mar 2025 14:41:40 +0800 Subject: [PATCH] feat(promql): supports quantile and count_values (#5652) * feat(promql): supports quantile * fix: merge_batch * chore: sqlness test * test: unit tests * feat: implements count_values * fix: typo * refactor: planner * chore: apply review suggestions --------- Co-authored-by: Yingwen --- Cargo.lock | 1 + src/promql/Cargo.toml | 1 + src/promql/src/functions.rs | 2 + src/promql/src/functions/quantile.rs | 2 +- src/promql/src/functions/quantile_aggr.rs | 297 ++++++++++++++++++ src/query/src/promql/planner.rs | 281 +++++++++++++---- .../common/promql/count_values.result | 67 ++++ .../standalone/common/promql/count_values.sql | 31 ++ .../standalone/common/promql/quantile.result | 71 +++++ .../standalone/common/promql/quantile.sql | 33 ++ 10 files changed, 719 insertions(+), 67 deletions(-) create mode 100644 src/promql/src/functions/quantile_aggr.rs create mode 100644 tests/cases/standalone/common/promql/count_values.result create mode 100644 tests/cases/standalone/common/promql/count_values.sql create mode 100644 tests/cases/standalone/common/promql/quantile.result create mode 100644 tests/cases/standalone/common/promql/quantile.sql diff --git a/Cargo.lock b/Cargo.lock index b3b7b3b058..8ababdf21d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8761,6 +8761,7 @@ dependencies = [ "common-recordbatch", "common-telemetry", "datafusion", + "datafusion-common", "datafusion-expr", "datatypes", "futures", diff --git a/src/promql/Cargo.toml b/src/promql/Cargo.toml index 0970b0d38e..f93cb8beb9 100644 --- a/src/promql/Cargo.toml +++ b/src/promql/Cargo.toml @@ -16,6 +16,7 @@ common-macro.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true datafusion.workspace = true +datafusion-common.workspace = true datafusion-expr.workspace = true datatypes.workspace = true futures.workspace = true diff --git a/src/promql/src/functions.rs b/src/promql/src/functions.rs index 4209a517c6..dade00ea7b 100644 --- a/src/promql/src/functions.rs +++ b/src/promql/src/functions.rs @@ -20,6 +20,7 @@ mod holt_winters; mod idelta; mod predict_linear; mod quantile; +mod quantile_aggr; mod resets; mod round; #[cfg(test)] @@ -39,6 +40,7 @@ pub use holt_winters::HoltWinters; pub use idelta::IDelta; pub use predict_linear::PredictLinear; pub use quantile::QuantileOverTime; +pub use quantile_aggr::quantile_udaf; pub use resets::Resets; pub use round::Round; diff --git a/src/promql/src/functions/quantile.rs b/src/promql/src/functions/quantile.rs index 19f346ee18..4900ab4453 100644 --- a/src/promql/src/functions/quantile.rs +++ b/src/promql/src/functions/quantile.rs @@ -125,7 +125,7 @@ impl QuantileOverTime { } /// Refer to -fn quantile_impl(values: &[f64], quantile: f64) -> Option { +pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option { if quantile.is_nan() || values.is_empty() { return Some(f64::NAN); } diff --git a/src/promql/src/functions/quantile_aggr.rs b/src/promql/src/functions/quantile_aggr.rs new file mode 100644 index 0000000000..e19cfeb21a --- /dev/null +++ b/src/promql/src/functions/quantile_aggr.rs @@ -0,0 +1,297 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use datafusion::arrow::array::{ArrayRef, AsArray}; +use datafusion::common::cast::{as_list_array, as_primitive_array, as_struct_array}; +use datafusion::error::Result as DfResult; +use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility}; +use datafusion::prelude::create_udaf; +use datafusion_common::ScalarValue; +use datatypes::arrow::array::{ListArray, StructArray}; +use datatypes::arrow::datatypes::{DataType, Field, Float64Type}; + +use crate::functions::quantile::quantile_impl; + +const QUANTILE_NAME: &str = "quantile"; + +const VALUES_FIELD_NAME: &str = "values"; +const DEFAULT_LIST_FIELD_NAME: &str = "item"; + +#[derive(Debug, Default)] +pub struct QuantileAccumulator { + q: f64, + values: Vec>, +} + +/// Create a quantile `AggregateUDF` for PromQL quantile operator, +/// which calculates φ-quantile (0 ≤ φ ≤ 1) over dimensions +pub fn quantile_udaf(q: f64) -> Arc { + Arc::new(create_udaf( + QUANTILE_NAME, + // Input type: (values) + vec![DataType::Float64], + // Output type: the φ-quantile + Arc::new(DataType::Float64), + Volatility::Immutable, + // Create the accumulator + Arc::new(move |_| Ok(Box::new(QuantileAccumulator::new(q)))), + // Intermediate state types + Arc::new(vec![DataType::Struct( + vec![Field::new( + VALUES_FIELD_NAME, + DataType::List(Arc::new(Field::new( + DEFAULT_LIST_FIELD_NAME, + DataType::Float64, + true, + ))), + false, + )] + .into(), + )]), + )) +} + +impl QuantileAccumulator { + pub fn new(q: f64) -> Self { + Self { + q, + ..Default::default() + } + } +} + +impl DfAccumulator for QuantileAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> { + let f64_array = values[0].as_primitive::(); + + self.values.extend(f64_array); + + Ok(()) + } + + fn evaluate(&mut self) -> DfResult { + let values: Vec<_> = self.values.iter().map(|v| v.unwrap_or(0.0)).collect(); + + let result = quantile_impl(&values, self.q); + + ScalarValue::new_primitive::(result, &DataType::Float64) + } + + fn size(&self) -> usize { + std::mem::size_of::() + self.values.capacity() * std::mem::size_of::>() + } + + fn state(&mut self) -> DfResult> { + let values_array = Arc::new(ListArray::from_iter_primitive::(vec![ + Some(self.values.clone()), + ])); + + let state_struct = StructArray::new( + vec![Field::new( + VALUES_FIELD_NAME, + DataType::List(Arc::new(Field::new( + DEFAULT_LIST_FIELD_NAME, + DataType::Float64, + true, + ))), + false, + )] + .into(), + vec![values_array], + None, + ); + + Ok(vec![ScalarValue::Struct(Arc::new(state_struct))]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> { + if states.is_empty() { + return Ok(()); + } + + for state in states { + let state = as_struct_array(state)?; + + for list in as_list_array(state.column(0))?.iter().flatten() { + let f64_array = as_primitive_array::(&list)?.clone(); + self.values.extend(&f64_array); + } + } + + Ok(()) + } +} +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::arrow::array::{ArrayRef, Float64Array}; + use datafusion_common::ScalarValue; + + use super::*; + + fn create_f64_array(values: Vec>) -> ArrayRef { + Arc::new(Float64Array::from(values)) as ArrayRef + } + + #[test] + fn test_quantile_accumulator_empty() { + let mut accumulator = QuantileAccumulator::new(0.5); + + let result = accumulator.evaluate().unwrap(); + + match result { + ScalarValue::Float64(_) => (), + _ => panic!("Expected Float64 scalar value"), + } + } + + #[test] + fn test_quantile_accumulator_single_value() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input = create_f64_array(vec![Some(10.0)]); + + accumulator.update_batch(&[input]).unwrap(); + let result = accumulator.evaluate().unwrap(); + + assert_eq!(result, ScalarValue::Float64(Some(10.0))); + } + + #[test] + fn test_quantile_accumulator_multiple_values() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]); + + accumulator.update_batch(&[input]).unwrap(); + let result = accumulator.evaluate().unwrap(); + + assert_eq!(result, ScalarValue::Float64(Some(3.0))); + } + + #[test] + fn test_quantile_accumulator_with_nulls() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input = create_f64_array(vec![Some(1.0), None, Some(3.0), Some(4.0), Some(5.0)]); + + accumulator.update_batch(&[input]).unwrap(); + + let result = accumulator.evaluate().unwrap(); + assert_eq!(result, ScalarValue::Float64(Some(3.0))); + } + + #[test] + fn test_quantile_accumulator_multiple_batches() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]); + let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]); + + accumulator.update_batch(&[input1]).unwrap(); + accumulator.update_batch(&[input2]).unwrap(); + + let result = accumulator.evaluate().unwrap(); + assert_eq!(result, ScalarValue::Float64(Some(3.0))); + } + + #[test] + fn test_quantile_accumulator_different_quantiles() { + let mut min_accumulator = QuantileAccumulator::new(0.0); + let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]); + min_accumulator.update_batch(&[input.clone()]).unwrap(); + assert_eq!( + min_accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(1.0)) + ); + + let mut q1_accumulator = QuantileAccumulator::new(0.25); + q1_accumulator.update_batch(&[input.clone()]).unwrap(); + assert_eq!( + q1_accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(2.0)) + ); + + let mut q3_accumulator = QuantileAccumulator::new(0.75); + q3_accumulator.update_batch(&[input.clone()]).unwrap(); + assert_eq!( + q3_accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(4.0)) + ); + + let mut max_accumulator = QuantileAccumulator::new(1.0); + max_accumulator.update_batch(&[input]).unwrap(); + assert_eq!( + max_accumulator.evaluate().unwrap(), + ScalarValue::Float64(Some(5.0)) + ); + } + + #[test] + fn test_quantile_accumulator_size() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0)]); + + let initial_size = accumulator.size(); + accumulator.update_batch(&[input]).unwrap(); + let after_update_size = accumulator.size(); + + assert!(after_update_size >= initial_size); + } + + #[test] + fn test_quantile_accumulator_state_and_merge() -> DfResult<()> { + let mut acc1 = QuantileAccumulator::new(0.5); + let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]); + acc1.update_batch(&[input1])?; + + let state1 = acc1.state()?; + + let mut acc2 = QuantileAccumulator::new(0.5); + let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]); + acc2.update_batch(&[input2])?; + + let mut struct_builders = vec![]; + for scalar in &state1 { + if let ScalarValue::Struct(struct_array) = scalar { + struct_builders.push(struct_array.clone() as ArrayRef); + } + } + + acc2.merge_batch(&struct_builders)?; + + let result = acc2.evaluate()?; + + assert_eq!(result, ScalarValue::Float64(Some(3.0))); + + Ok(()) + } + + #[test] + fn test_quantile_accumulator_with_extreme_values() { + let mut accumulator = QuantileAccumulator::new(0.5); + let input = create_f64_array(vec![Some(f64::MAX), Some(f64::MIN), Some(0.0)]); + + accumulator.update_batch(&[input]).unwrap(); + let _result = accumulator.evaluate().unwrap(); + } + + #[test] + fn test_quantile_udaf_creation() { + let q = 0.5; + let udaf = quantile_udaf(q); + + assert_eq!(udaf.name(), QUANTILE_NAME); + assert_eq!(udaf.return_type(&[]).unwrap(), DataType::Float64); + } +} diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 22477d5049..d86fd1c769 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -51,8 +51,8 @@ use promql::extension_plan::{ RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn, }; use promql::functions::{ - AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta, - Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime, + quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, + IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime, QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime, }; use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME}; @@ -266,7 +266,10 @@ impl PromPlanner { aggr_expr: &AggregateExpr, ) -> Result { let AggregateExpr { - op, expr, modifier, .. + op, + expr, + modifier, + param, } = aggr_expr; let input = self.prom_expr_to_plan(expr, session_state).await?; @@ -277,19 +280,40 @@ impl PromPlanner { _ => { // calculate columns to group by // Need to append time index column into group by columns - let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; + let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?; // convert op and value columns to aggregate exprs - let aggr_exprs = self.create_aggregate_exprs(*op, &input)?; + let (aggr_exprs, prev_field_exprs) = + self.create_aggregate_exprs(*op, param, &input)?; // create plan - let group_sort_expr = group_exprs - .clone() - .into_iter() - .map(|expr| expr.sort(true, false)); - LogicalPlanBuilder::from(input) - .aggregate(group_exprs.clone(), aggr_exprs) - .context(DataFusionPlanningSnafu)? - .sort(group_sort_expr) + let builder = LogicalPlanBuilder::from(input); + let builder = if op.id() == token::T_COUNT_VALUES { + let label = Self::get_param_value_as_str(*op, param)?; + // `count_values` must be grouped by fields, + // and project the fields to the new label. + group_exprs.extend(prev_field_exprs.clone()); + let project_fields = self + .create_field_column_exprs()? + .into_iter() + .chain(self.create_tag_column_exprs()?) + .chain(Some(self.create_time_index_column_expr()?)) + .chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label))); + + builder + .aggregate(group_exprs.clone(), aggr_exprs) + .context(DataFusionPlanningSnafu)? + .project(project_fields) + .context(DataFusionPlanningSnafu)? + } else { + builder + .aggregate(group_exprs.clone(), aggr_exprs) + .context(DataFusionPlanningSnafu)? + }; + + let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false)); + + builder + .sort(sort_expr) .context(DataFusionPlanningSnafu)? .build() .context(DataFusionPlanningSnafu) @@ -312,18 +336,7 @@ impl PromPlanner { let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?; - let param = param - .as_deref() - .with_context(|| FunctionInvalidArgumentSnafu { - fn_name: (*op).to_string(), - })?; - - let PromExpr::NumberLiteral(NumberLiteral { val }) = param else { - return FunctionInvalidArgumentSnafu { - fn_name: (*op).to_string(), - } - .fail(); - }; + let val = Self::get_param_value_as_f64(*op, param)?; // convert op and value columns to window exprs. let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?; @@ -341,7 +354,7 @@ impl PromPlanner { let predicate = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(col(rank)), op: Operator::LtEq, - right: Box::new(lit(*val)), + right: Box::new(lit(val)), }); match expr { @@ -1931,32 +1944,44 @@ impl PromPlanner { }) } - /// Create [DfExpr::AggregateFunction] expr for each value column with given aggregate function. + /// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function. /// - /// # Side effect + /// # Side Effects + /// + /// This method modifies the value columns in the context by replacing them with the new columns + /// created by the aggregate function application. + /// + /// # Returns + /// + /// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where: + /// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields + /// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty + /// only when the operation is `count_values`, as this operation requires preserving the original + /// values for grouping. /// - /// This method will update value columns in context to the new value columns created by - /// aggregate function. fn create_aggregate_exprs( &mut self, op: TokenType, + param: &Option>, input_plan: &LogicalPlan, - ) -> Result> { + ) -> Result<(Vec, Vec)> { let aggr = match op.id() { token::T_SUM => sum_udaf(), + token::T_QUANTILE => { + let q = Self::get_param_value_as_f64(op, param)?; + quantile_udaf(q) + } token::T_AVG => avg_udaf(), - token::T_COUNT => count_udaf(), + token::T_COUNT_VALUES | token::T_COUNT => count_udaf(), token::T_MIN => min_udaf(), token::T_MAX => max_udaf(), token::T_GROUP => grouping_udaf(), token::T_STDDEV => stddev_pop_udaf(), token::T_STDVAR => var_pop_udaf(), - token::T_TOPK | token::T_BOTTOMK | token::T_COUNT_VALUES | token::T_QUANTILE => { - UnsupportedExprSnafu { - name: format!("{op:?}"), - } - .fail()? + token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu { + name: format!("{op:?}"), } + .fail()?, _ => UnexpectedTokenSnafu { token: op }.fail()?, }; @@ -1966,19 +1991,41 @@ impl PromPlanner { .field_columns .iter() .map(|col| { - DfExpr::AggregateFunction(AggregateFunction { + Ok(DfExpr::AggregateFunction(AggregateFunction { func: aggr.clone(), args: vec![DfExpr::Column(Column::from_name(col))], distinct: false, filter: None, order_by: None, null_treatment: None, - }) + })) }) - .collect(); + .collect::>>()?; - // update value column name according to the aggregators + // if the aggregator is `count_values`, it must be grouped by current fields. + let prev_field_exprs = if op.id() == token::T_COUNT_VALUES { + let prev_field_exprs: Vec<_> = self + .ctx + .field_columns + .iter() + .map(|col| DfExpr::Column(Column::from_name(col))) + .collect(); + + ensure!( + self.ctx.field_columns.len() == 1, + UnsupportedExprSnafu { + name: "count_values on multi-value input" + } + ); + + prev_field_exprs + } else { + vec![] + }; + + // update value column name according to the aggregators, let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len()); + let normalized_exprs = normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?; for expr in normalized_exprs { @@ -1986,7 +2033,39 @@ impl PromPlanner { } self.ctx.field_columns = new_field_columns; - Ok(exprs) + Ok((exprs, prev_field_exprs)) + } + + fn get_param_value_as_str(op: TokenType, param: &Option>) -> Result<&str> { + let param = param + .as_deref() + .with_context(|| FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + })?; + let PromExpr::StringLiteral(StringLiteral { val }) = param else { + return FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + } + .fail(); + }; + + Ok(val) + } + + fn get_param_value_as_f64(op: TokenType, param: &Option>) -> Result { + let param = param + .as_deref() + .with_context(|| FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + })?; + let PromExpr::NumberLiteral(NumberLiteral { val }) = param else { + return FunctionInvalidArgumentSnafu { + fn_name: op.to_string(), + } + .fail(); + }; + + Ok(*val) } /// Create [DfExpr::WindowFunction] expr for each value column with given window function. @@ -3342,30 +3421,6 @@ mod test { do_aggregate_expr_plan("stdvar", "var_pop").await; } - #[tokio::test] - #[should_panic] - async fn aggregate_top_k() { - do_aggregate_expr_plan("topk", "").await; - } - - #[tokio::test] - #[should_panic] - async fn aggregate_bottom_k() { - do_aggregate_expr_plan("bottomk", "").await; - } - - #[tokio::test] - #[should_panic] - async fn aggregate_count_values() { - do_aggregate_expr_plan("count_values", "").await; - } - - #[tokio::test] - #[should_panic] - async fn aggregate_quantile() { - do_aggregate_expr_plan("quantile", "").await; - } - // TODO(ruihang): add range fn tests once exprs are ready. // { @@ -4248,4 +4303,98 @@ mod test { assert_eq!(plan.display_indent_schema().to_string(), expected); } + + #[tokio::test] + async fn test_count_values_expr() { + let mut eval_stmt = EvalStmt { + expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }), + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip)"#; + + let prom_expr = parser::parse(case).unwrap(); + eval_stmt.expr = prom_expr; + let table_provider = build_test_table_provider_with_fields( + &[ + ( + DEFAULT_SCHEMA_NAME.to_string(), + "prometheus_tsdb_head_series".to_string(), + ), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "http_server_requests_seconds_count".to_string(), + ), + ], + &["ip"], + ) + .await; + + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); + let expected = r#"Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N] + Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N] + Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N] + Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64] + PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } + + #[tokio::test] + async fn test_quantile_expr() { + let mut eval_stmt = EvalStmt { + expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }), + start: UNIX_EPOCH, + end: UNIX_EPOCH + .checked_add(Duration::from_secs(100_000)) + .unwrap(), + interval: Duration::from_secs(5), + lookback_delta: Duration::from_secs(1), + }; + let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip))"#; + + let prom_expr = parser::parse(case).unwrap(); + eval_stmt.expr = prom_expr; + let table_provider = build_test_table_provider_with_fields( + &[ + ( + DEFAULT_SCHEMA_NAME.to_string(), + "prometheus_tsdb_head_series".to_string(), + ), + ( + DEFAULT_SCHEMA_NAME.to_string(), + "http_server_requests_seconds_count".to_string(), + ), + ], + &["ip"], + ) + .await; + + let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state()) + .await + .unwrap(); + let expected = r#"Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N] + Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N] + Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N] + Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N] + PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N] + TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#; + + assert_eq!(plan.display_indent_schema().to_string(), expected); + } } diff --git a/tests/cases/standalone/common/promql/count_values.result b/tests/cases/standalone/common/promql/count_values.result new file mode 100644 index 0000000000..e2f65eae34 --- /dev/null +++ b/tests/cases/standalone/common/promql/count_values.result @@ -0,0 +1,67 @@ +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +INSERT INTO TABLE http_requests VALUES + (0, 'host1', "idc1", 200), + (0, 'host2', "idc1", 200), + (0, 'host3', "idc2", 200), + (0, 'host4', "idc2", 401), + (5000, 'host1', "idc1", 404), + (5000, 'host2', "idc1", 401), + (5000, 'host3', "idc2", 404), + (5000, 'host4', "idc2", 500), + (10000, 'host1', "idc1", 200), + (10000, 'host2', "idc1", 200), + (10000, 'host3', "idc2", 201), + (10000, 'host4', "idc2", 201), + (15000, 'host1', "idc1", 500), + (15000, 'host2', "idc1", 500), + (15000, 'host3', "idc2", 500), + (15000, 'host4', "idc2", 500); + +Affected Rows: 16 + +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); + ++--------------------------+---------------------+-------------+ +| count(http_requests.val) | ts | status_code | ++--------------------------+---------------------+-------------+ +| 3 | 1970-01-01T00:00:00 | 200 | +| 1 | 1970-01-01T00:00:00 | 401 | +| 1 | 1970-01-01T00:00:05 | 401 | +| 2 | 1970-01-01T00:00:05 | 404 | +| 1 | 1970-01-01T00:00:05 | 500 | +| 2 | 1970-01-01T00:00:10 | 200 | +| 2 | 1970-01-01T00:00:10 | 201 | +| 4 | 1970-01-01T00:00:15 | 500 | ++--------------------------+---------------------+-------------+ + +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests) by (idc); + ++--------------------------+------+---------------------+-------------+ +| count(http_requests.val) | idc | ts | status_code | ++--------------------------+------+---------------------+-------------+ +| 2 | idc1 | 1970-01-01T00:00:00 | 200 | +| 1 | idc1 | 1970-01-01T00:00:05 | 401 | +| 1 | idc1 | 1970-01-01T00:00:05 | 404 | +| 2 | idc1 | 1970-01-01T00:00:10 | 200 | +| 2 | idc1 | 1970-01-01T00:00:15 | 500 | +| 1 | idc2 | 1970-01-01T00:00:00 | 200 | +| 1 | idc2 | 1970-01-01T00:00:00 | 401 | +| 1 | idc2 | 1970-01-01T00:00:05 | 404 | +| 1 | idc2 | 1970-01-01T00:00:05 | 500 | +| 2 | idc2 | 1970-01-01T00:00:10 | 201 | +| 2 | idc2 | 1970-01-01T00:00:15 | 500 | ++--------------------------+------+---------------------+-------------+ + +DROP TABLE http_requests; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/count_values.sql b/tests/cases/standalone/common/promql/count_values.sql new file mode 100644 index 0000000000..09c2b136b8 --- /dev/null +++ b/tests/cases/standalone/common/promql/count_values.sql @@ -0,0 +1,31 @@ +CREATE TABLE http_requests ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +INSERT INTO TABLE http_requests VALUES + (0, 'host1', "idc1", 200), + (0, 'host2', "idc1", 200), + (0, 'host3', "idc2", 200), + (0, 'host4', "idc2", 401), + (5000, 'host1', "idc1", 404), + (5000, 'host2', "idc1", 401), + (5000, 'host3', "idc2", 404), + (5000, 'host4', "idc2", 500), + (10000, 'host1', "idc1", 200), + (10000, 'host2', "idc1", 200), + (10000, 'host3', "idc2", 201), + (10000, 'host4', "idc2", 201), + (15000, 'host1', "idc1", 500), + (15000, 'host2', "idc1", 500), + (15000, 'host3', "idc2", 500), + (15000, 'host4', "idc2", 500); + +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests); + +TQL EVAL (0, 15, '5s') count_values("status_code", http_requests) by (idc); + +DROP TABLE http_requests; diff --git a/tests/cases/standalone/common/promql/quantile.result b/tests/cases/standalone/common/promql/quantile.result new file mode 100644 index 0000000000..8676bbbb77 --- /dev/null +++ b/tests/cases/standalone/common/promql/quantile.result @@ -0,0 +1,71 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +INSERT INTO TABLE test VALUES + (0, 'host1', "idc1", 1), + (0, 'host2', "idc1", 2), + (0, 'host3', "idc2", 3), + (0, 'host4', "idc2", 4), + (5000, 'host1', "idc1", 5), + (5000, 'host2', "idc1", 6), + (5000, 'host3', "idc2", 7), + (5000, 'host4', "idc2", 8), + (10000, 'host1', "idc1", 9), + (10000, 'host2', "idc1", 10), + (10000, 'host3', "idc2", 11), + (10000, 'host4', "idc2", 12), + (15000, 'host1', "idc1", 13), + (15000, 'host2', "idc1", 14), + (15000, 'host3', "idc2", 15), + (15000, 'host4', "idc2", 16); + +Affected Rows: 16 + +TQL EVAL (0, 15, '5s') quantile(0.5, test); + ++---------------------+--------------------+ +| ts | quantile(test.val) | ++---------------------+--------------------+ +| 1970-01-01T00:00:00 | 2.5 | +| 1970-01-01T00:00:05 | 6.5 | +| 1970-01-01T00:00:10 | 10.5 | +| 1970-01-01T00:00:15 | 14.5 | ++---------------------+--------------------+ + +TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc); + ++------+---------------------+--------------------+ +| idc | ts | quantile(test.val) | ++------+---------------------+--------------------+ +| idc1 | 1970-01-01T00:00:00 | 1.5 | +| idc1 | 1970-01-01T00:00:05 | 5.5 | +| idc1 | 1970-01-01T00:00:10 | 9.5 | +| idc1 | 1970-01-01T00:00:15 | 13.5 | +| idc2 | 1970-01-01T00:00:00 | 3.5 | +| idc2 | 1970-01-01T00:00:05 | 7.5 | +| idc2 | 1970-01-01T00:00:10 | 11.5 | +| idc2 | 1970-01-01T00:00:15 | 15.5 | ++------+---------------------+--------------------+ + +TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc)); + ++---------------------+-------------------------+ +| ts | quantile(sum(test.val)) | ++---------------------+-------------------------+ +| 1970-01-01T00:00:00 | 5.0 | +| 1970-01-01T00:00:05 | 13.0 | +| 1970-01-01T00:00:10 | 21.0 | +| 1970-01-01T00:00:15 | 29.0 | ++---------------------+-------------------------+ + +DROP TABLE test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/quantile.sql b/tests/cases/standalone/common/promql/quantile.sql new file mode 100644 index 0000000000..4bef39c757 --- /dev/null +++ b/tests/cases/standalone/common/promql/quantile.sql @@ -0,0 +1,33 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +INSERT INTO TABLE test VALUES + (0, 'host1', "idc1", 1), + (0, 'host2', "idc1", 2), + (0, 'host3', "idc2", 3), + (0, 'host4', "idc2", 4), + (5000, 'host1', "idc1", 5), + (5000, 'host2', "idc1", 6), + (5000, 'host3', "idc2", 7), + (5000, 'host4', "idc2", 8), + (10000, 'host1', "idc1", 9), + (10000, 'host2', "idc1", 10), + (10000, 'host3', "idc2", 11), + (10000, 'host4', "idc2", 12), + (15000, 'host1', "idc1", 13), + (15000, 'host2', "idc1", 14), + (15000, 'host3', "idc2", 15), + (15000, 'host4', "idc2", 16); + +TQL EVAL (0, 15, '5s') quantile(0.5, test); + +TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc); + +TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc)); + +DROP TABLE test;