feat(promql): supports quantile and count_values (#5652)

* feat(promql): supports quantile

* fix: merge_batch

* chore: sqlness test

* test: unit tests

* feat: implements count_values

* fix: typo

* refactor: planner

* chore: apply review suggestions

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2025-03-10 14:41:40 +08:00
committed by GitHub
parent 73ca39f37e
commit 530ff53422
10 changed files with 719 additions and 67 deletions

1
Cargo.lock generated
View File

@@ -8761,6 +8761,7 @@ dependencies = [
"common-recordbatch",
"common-telemetry",
"datafusion",
"datafusion-common",
"datafusion-expr",
"datatypes",
"futures",

View File

@@ -16,6 +16,7 @@ common-macro.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
futures.workspace = true

View File

@@ -20,6 +20,7 @@ mod holt_winters;
mod idelta;
mod predict_linear;
mod quantile;
mod quantile_aggr;
mod resets;
mod round;
#[cfg(test)]
@@ -39,6 +40,7 @@ pub use holt_winters::HoltWinters;
pub use idelta::IDelta;
pub use predict_linear::PredictLinear;
pub use quantile::QuantileOverTime;
pub use quantile_aggr::quantile_udaf;
pub use resets::Resets;
pub use round::Round;

View File

@@ -125,7 +125,7 @@ impl QuantileOverTime {
}
/// Refer to <https://github.com/prometheus/prometheus/blob/6e2905a4d4ff9b47b1f6d201333f5bd53633f921/promql/quantile.go#L357-L386>
fn quantile_impl(values: &[f64], quantile: f64) -> Option<f64> {
pub(crate) fn quantile_impl(values: &[f64], quantile: f64) -> Option<f64> {
if quantile.is_nan() || values.is_empty() {
return Some(f64::NAN);
}

View File

@@ -0,0 +1,297 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, AsArray};
use datafusion::common::cast::{as_list_array, as_primitive_array, as_struct_array};
use datafusion::error::Result as DfResult;
use datafusion::logical_expr::{Accumulator as DfAccumulator, AggregateUDF, Volatility};
use datafusion::prelude::create_udaf;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ListArray, StructArray};
use datatypes::arrow::datatypes::{DataType, Field, Float64Type};
use crate::functions::quantile::quantile_impl;
/// Name under which the quantile aggregate function is created.
const QUANTILE_NAME: &str = "quantile";
/// Name of the single field inside the intermediate state struct.
const VALUES_FIELD_NAME: &str = "values";
/// Name of the item field of the list that carries the buffered values in the state.
const DEFAULT_LIST_FIELD_NAME: &str = "item";
/// Accumulator behind the `quantile` aggregate function.
///
/// It buffers every input value and computes the quantile over the whole
/// buffer when `evaluate` is called.
#[derive(Debug, Default)]
pub struct QuantileAccumulator {
// The quantile to compute; handed to `quantile_impl` on evaluation.
q: f64,
// Every value seen so far, nulls included.
values: Vec<Option<f64>>,
}
/// Builds the `quantile` [`AggregateUDF`] backing the PromQL `quantile` operator,
/// which calculates the φ-quantile (0 ≤ φ ≤ 1) over dimensions.
///
/// The function accepts one `Float64` argument (the values) and returns a
/// `Float64`. `q` is captured by the accumulator factory, so every accumulator
/// created by the returned function computes the same quantile.
pub fn quantile_udaf(q: f64) -> Arc<AggregateUDF> {
    // Intermediate state: a struct with one non-nullable field holding a list
    // of nullable `Float64` items.
    let item_field = Field::new(DEFAULT_LIST_FIELD_NAME, DataType::Float64, true);
    let values_field = Field::new(
        VALUES_FIELD_NAME,
        DataType::List(Arc::new(item_field)),
        false,
    );
    let state_type = DataType::Struct(vec![values_field].into());

    let udaf = create_udaf(
        QUANTILE_NAME,
        // Input type: (values)
        vec![DataType::Float64],
        // Output type: the φ-quantile
        Arc::new(DataType::Float64),
        Volatility::Immutable,
        // Accumulator factory
        Arc::new(move |_| Ok(Box::new(QuantileAccumulator::new(q)))),
        // Intermediate state types
        Arc::new(vec![state_type]),
    );
    Arc::new(udaf)
}
impl QuantileAccumulator {
    /// Creates an accumulator for quantile `q` with an empty value buffer.
    pub fn new(q: f64) -> Self {
        Self {
            q,
            values: Vec::new(),
        }
    }
}
impl DfAccumulator for QuantileAccumulator {
    /// Appends every element of the first input column (nulls included) to the buffer.
    fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
        let f64_array = values[0].as_primitive::<Float64Type>();

        self.values.extend(f64_array);

        Ok(())
    }

    /// Computes the configured quantile over all buffered values.
    fn evaluate(&mut self) -> DfResult<ScalarValue> {
        // NOTE(review): null inputs are counted as 0.0 rather than skipped;
        // `test_quantile_accumulator_with_nulls` pins this behavior. Confirm
        // this is the intended semantics.
        let values: Vec<_> = self.values.iter().map(|v| v.unwrap_or(0.0)).collect();

        let result = quantile_impl(&values, self.q);

        ScalarValue::new_primitive::<Float64Type>(result, &DataType::Float64)
    }

    /// Size of the accumulator itself plus the allocated capacity of the value buffer.
    fn size(&self) -> usize {
        std::mem::size_of::<Self>() + self.values.capacity() * std::mem::size_of::<Option<f64>>()
    }

    /// Serializes the buffered values as a one-row struct whose single field is
    /// a list of nullable `Float64` items.
    fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
        // Build the single-row list straight from a borrowing iterator instead
        // of cloning the whole buffer first.
        let values_array = Arc::new(ListArray::from_iter_primitive::<Float64Type, _, _>(
            std::iter::once(Some(self.values.iter().copied())),
        ));

        let state_struct = StructArray::new(
            vec![Field::new(
                VALUES_FIELD_NAME,
                DataType::List(Arc::new(Field::new(
                    DEFAULT_LIST_FIELD_NAME,
                    DataType::Float64,
                    true,
                ))),
                false,
            )]
            .into(),
            vec![values_array],
            None,
        );

        Ok(vec![ScalarValue::Struct(Arc::new(state_struct))])
    }

    /// Appends the values carried by each state struct produced by [`Self::state`].
    ///
    /// An empty `states` slice is a no-op. Null list entries are skipped, while
    /// null items inside a list are kept.
    fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> {
        for state in states {
            let state = as_struct_array(state)?;

            for list in as_list_array(state.column(0))?.iter().flatten() {
                // Extend directly from the borrowed array; no clone needed.
                let f64_array = as_primitive_array::<Float64Type>(&list)?;
                self.values.extend(f64_array);
            }
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::array::{ArrayRef, Float64Array};
    use datafusion_common::ScalarValue;

    use super::*;

    /// Wraps optional values into a `Float64` Arrow array.
    fn create_f64_array(values: Vec<Option<f64>>) -> ArrayRef {
        Arc::new(Float64Array::from(values)) as ArrayRef
    }

    #[test]
    fn test_quantile_accumulator_empty() {
        let mut accumulator = QuantileAccumulator::new(0.5);

        let result = accumulator.evaluate().unwrap();

        // `quantile_impl` returns NaN for empty input, so check the value and
        // not just the variant.
        match result {
            ScalarValue::Float64(Some(v)) => assert!(v.is_nan(), "expected NaN, got {v}"),
            other => panic!("Expected Float64 scalar value, got {other:?}"),
        }
    }

    #[test]
    fn test_quantile_accumulator_single_value() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        let input = create_f64_array(vec![Some(10.0)]);

        accumulator.update_batch(&[input]).unwrap();
        let result = accumulator.evaluate().unwrap();

        assert_eq!(result, ScalarValue::Float64(Some(10.0)));
    }

    #[test]
    fn test_quantile_accumulator_multiple_values() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);

        accumulator.update_batch(&[input]).unwrap();
        let result = accumulator.evaluate().unwrap();

        assert_eq!(result, ScalarValue::Float64(Some(3.0)));
    }

    #[test]
    fn test_quantile_accumulator_with_nulls() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        // The null is counted as 0.0 by `evaluate`, giving [0, 1, 3, 4, 5].
        let input = create_f64_array(vec![Some(1.0), None, Some(3.0), Some(4.0), Some(5.0)]);

        accumulator.update_batch(&[input]).unwrap();
        let result = accumulator.evaluate().unwrap();

        assert_eq!(result, ScalarValue::Float64(Some(3.0)));
    }

    #[test]
    fn test_quantile_accumulator_multiple_batches() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]);
        let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]);

        accumulator.update_batch(&[input1]).unwrap();
        accumulator.update_batch(&[input2]).unwrap();
        let result = accumulator.evaluate().unwrap();

        assert_eq!(result, ScalarValue::Float64(Some(3.0)));
    }

    #[test]
    fn test_quantile_accumulator_different_quantiles() {
        let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);

        let mut min_accumulator = QuantileAccumulator::new(0.0);
        min_accumulator.update_batch(&[input.clone()]).unwrap();
        assert_eq!(
            min_accumulator.evaluate().unwrap(),
            ScalarValue::Float64(Some(1.0))
        );

        let mut q1_accumulator = QuantileAccumulator::new(0.25);
        q1_accumulator.update_batch(&[input.clone()]).unwrap();
        assert_eq!(
            q1_accumulator.evaluate().unwrap(),
            ScalarValue::Float64(Some(2.0))
        );

        let mut q3_accumulator = QuantileAccumulator::new(0.75);
        q3_accumulator.update_batch(&[input.clone()]).unwrap();
        assert_eq!(
            q3_accumulator.evaluate().unwrap(),
            ScalarValue::Float64(Some(4.0))
        );

        let mut max_accumulator = QuantileAccumulator::new(1.0);
        max_accumulator.update_batch(&[input]).unwrap();
        assert_eq!(
            max_accumulator.evaluate().unwrap(),
            ScalarValue::Float64(Some(5.0))
        );
    }

    #[test]
    fn test_quantile_accumulator_size() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        let input = create_f64_array(vec![Some(1.0), Some(2.0), Some(3.0)]);

        let initial_size = accumulator.size();
        accumulator.update_batch(&[input]).unwrap();
        let after_update_size = accumulator.size();

        // The buffer starts with zero capacity, so storing values must grow it.
        assert!(after_update_size > initial_size);
    }

    #[test]
    fn test_quantile_accumulator_state_and_merge() -> DfResult<()> {
        let mut acc1 = QuantileAccumulator::new(0.5);
        let input1 = create_f64_array(vec![Some(1.0), Some(2.0)]);
        acc1.update_batch(&[input1])?;

        let state1 = acc1.state()?;

        let mut acc2 = QuantileAccumulator::new(0.5);
        let input2 = create_f64_array(vec![Some(3.0), Some(4.0), Some(5.0)]);
        acc2.update_batch(&[input2])?;

        // Turn the state scalars back into arrays, as `merge_batch` expects.
        let states: Vec<ArrayRef> = state1
            .iter()
            .filter_map(|scalar| match scalar {
                ScalarValue::Struct(struct_array) => Some(struct_array.clone() as ArrayRef),
                _ => None,
            })
            .collect();
        // `state` emits exactly one struct scalar; fail loudly if that changes.
        assert_eq!(states.len(), 1);

        acc2.merge_batch(&states)?;

        let result = acc2.evaluate()?;

        assert_eq!(result, ScalarValue::Float64(Some(3.0)));

        Ok(())
    }

    #[test]
    fn test_quantile_accumulator_with_extreme_values() {
        let mut accumulator = QuantileAccumulator::new(0.5);
        let input = create_f64_array(vec![Some(f64::MAX), Some(f64::MIN), Some(0.0)]);

        accumulator.update_batch(&[input]).unwrap();
        let result = accumulator.evaluate().unwrap();

        // The median of [MIN, 0, MAX] is the middle element; the interpolation
        // must not overflow into infinity or NaN.
        match result {
            ScalarValue::Float64(Some(v)) => assert_eq!(v, 0.0),
            other => panic!("Expected non-null Float64 scalar value, got {other:?}"),
        }
    }

    #[test]
    fn test_quantile_udaf_creation() {
        let q = 0.5;
        let udaf = quantile_udaf(q);

        assert_eq!(udaf.name(), QUANTILE_NAME);
        assert_eq!(udaf.return_type(&[]).unwrap(), DataType::Float64);
    }
}

View File

@@ -51,8 +51,8 @@ use promql::extension_plan::{
RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
};
use promql::functions::{
AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters, IDelta,
Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
quantile_udaf, AbsentOverTime, AvgOverTime, Changes, CountOverTime, Delta, Deriv, HoltWinters,
IDelta, Increase, LastOverTime, MaxOverTime, MinOverTime, PredictLinear, PresentOverTime,
QuantileOverTime, Rate, Resets, Round, StddevOverTime, StdvarOverTime, SumOverTime,
};
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
@@ -266,7 +266,10 @@ impl PromPlanner {
aggr_expr: &AggregateExpr,
) -> Result<LogicalPlan> {
let AggregateExpr {
op, expr, modifier, ..
op,
expr,
modifier,
param,
} = aggr_expr;
let input = self.prom_expr_to_plan(expr, session_state).await?;
@@ -277,19 +280,40 @@ impl PromPlanner {
_ => {
// calculate columns to group by
// Need to append time index column into group by columns
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
let mut group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
// convert op and value columns to aggregate exprs
let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
let (aggr_exprs, prev_field_exprs) =
self.create_aggregate_exprs(*op, param, &input)?;
// create plan
let group_sort_expr = group_exprs
.clone()
.into_iter()
.map(|expr| expr.sort(true, false));
LogicalPlanBuilder::from(input)
.aggregate(group_exprs.clone(), aggr_exprs)
.context(DataFusionPlanningSnafu)?
.sort(group_sort_expr)
let builder = LogicalPlanBuilder::from(input);
let builder = if op.id() == token::T_COUNT_VALUES {
let label = Self::get_param_value_as_str(*op, param)?;
// `count_values` must be grouped by fields,
// and project the fields to the new label.
group_exprs.extend(prev_field_exprs.clone());
let project_fields = self
.create_field_column_exprs()?
.into_iter()
.chain(self.create_tag_column_exprs()?)
.chain(Some(self.create_time_index_column_expr()?))
.chain(prev_field_exprs.into_iter().map(|expr| expr.alias(label)));
builder
.aggregate(group_exprs.clone(), aggr_exprs)
.context(DataFusionPlanningSnafu)?
.project(project_fields)
.context(DataFusionPlanningSnafu)?
} else {
builder
.aggregate(group_exprs.clone(), aggr_exprs)
.context(DataFusionPlanningSnafu)?
};
let sort_expr = group_exprs.into_iter().map(|expr| expr.sort(true, false));
builder
.sort(sort_expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)
@@ -312,18 +336,7 @@ impl PromPlanner {
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
let param = param
.as_deref()
.with_context(|| FunctionInvalidArgumentSnafu {
fn_name: (*op).to_string(),
})?;
let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
return FunctionInvalidArgumentSnafu {
fn_name: (*op).to_string(),
}
.fail();
};
let val = Self::get_param_value_as_f64(*op, param)?;
// convert op and value columns to window exprs.
let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
@@ -341,7 +354,7 @@ impl PromPlanner {
let predicate = DfExpr::BinaryExpr(BinaryExpr {
left: Box::new(col(rank)),
op: Operator::LtEq,
right: Box::new(lit(*val)),
right: Box::new(lit(val)),
});
match expr {
@@ -1931,32 +1944,44 @@ impl PromPlanner {
})
}
/// Create [DfExpr::AggregateFunction] expr for each value column with given aggregate function.
/// Creates a set of DataFusion `DfExpr::AggregateFunction` expressions for each value column using the specified aggregate function.
///
/// # Side effect
/// # Side Effects
///
/// This method modifies the value columns in the context by replacing them with the new columns
/// created by the aggregate function application.
///
/// # Returns
///
/// Returns a tuple of `(aggregate_expressions, previous_field_expressions)` where:
/// - `aggregate_expressions`: Expressions that apply the aggregate function to the original fields
/// - `previous_field_expressions`: Original field expressions before aggregation. This is non-empty
/// only when the operation is `count_values`, as this operation requires preserving the original
/// values for grouping.
///
/// This method will update value columns in context to the new value columns created by
/// aggregate function.
fn create_aggregate_exprs(
&mut self,
op: TokenType,
param: &Option<Box<PromExpr>>,
input_plan: &LogicalPlan,
) -> Result<Vec<DfExpr>> {
) -> Result<(Vec<DfExpr>, Vec<DfExpr>)> {
let aggr = match op.id() {
token::T_SUM => sum_udaf(),
token::T_QUANTILE => {
let q = Self::get_param_value_as_f64(op, param)?;
quantile_udaf(q)
}
token::T_AVG => avg_udaf(),
token::T_COUNT => count_udaf(),
token::T_COUNT_VALUES | token::T_COUNT => count_udaf(),
token::T_MIN => min_udaf(),
token::T_MAX => max_udaf(),
token::T_GROUP => grouping_udaf(),
token::T_STDDEV => stddev_pop_udaf(),
token::T_STDVAR => var_pop_udaf(),
token::T_TOPK | token::T_BOTTOMK | token::T_COUNT_VALUES | token::T_QUANTILE => {
UnsupportedExprSnafu {
name: format!("{op:?}"),
}
.fail()?
token::T_TOPK | token::T_BOTTOMK => UnsupportedExprSnafu {
name: format!("{op:?}"),
}
.fail()?,
_ => UnexpectedTokenSnafu { token: op }.fail()?,
};
@@ -1966,19 +1991,41 @@ impl PromPlanner {
.field_columns
.iter()
.map(|col| {
DfExpr::AggregateFunction(AggregateFunction {
Ok(DfExpr::AggregateFunction(AggregateFunction {
func: aggr.clone(),
args: vec![DfExpr::Column(Column::from_name(col))],
distinct: false,
filter: None,
order_by: None,
null_treatment: None,
})
}))
})
.collect();
.collect::<Result<Vec<_>>>()?;
// update value column name according to the aggregators
// if the aggregator is `count_values`, it must be grouped by current fields.
let prev_field_exprs = if op.id() == token::T_COUNT_VALUES {
let prev_field_exprs: Vec<_> = self
.ctx
.field_columns
.iter()
.map(|col| DfExpr::Column(Column::from_name(col)))
.collect();
ensure!(
self.ctx.field_columns.len() == 1,
UnsupportedExprSnafu {
name: "count_values on multi-value input"
}
);
prev_field_exprs
} else {
vec![]
};
// update value column name according to the aggregators,
let mut new_field_columns = Vec::with_capacity(self.ctx.field_columns.len());
let normalized_exprs =
normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
for expr in normalized_exprs {
@@ -1986,7 +2033,39 @@ impl PromPlanner {
}
self.ctx.field_columns = new_field_columns;
Ok(exprs)
Ok((exprs, prev_field_exprs))
}
/// Extracts the aggregation parameter as a string slice.
///
/// Fails with a `FunctionInvalidArgument` error naming `op` when the parameter
/// is absent or is not a string literal.
fn get_param_value_as_str(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<&str> {
    match param.as_deref() {
        Some(PromExpr::StringLiteral(StringLiteral { val })) => Ok(val),
        // Missing parameter and wrong literal kind are reported the same way.
        _ => FunctionInvalidArgumentSnafu {
            fn_name: op.to_string(),
        }
        .fail(),
    }
}
/// Extracts the aggregation parameter as an `f64`.
///
/// Fails with a `FunctionInvalidArgument` error naming `op` when the parameter
/// is absent or is not a number literal.
fn get_param_value_as_f64(op: TokenType, param: &Option<Box<PromExpr>>) -> Result<f64> {
    match param.as_deref() {
        Some(PromExpr::NumberLiteral(NumberLiteral { val })) => Ok(*val),
        // Missing parameter and wrong literal kind are reported the same way.
        _ => FunctionInvalidArgumentSnafu {
            fn_name: op.to_string(),
        }
        .fail(),
    }
}
/// Create [DfExpr::WindowFunction] expr for each value column with given window function.
@@ -3342,30 +3421,6 @@ mod test {
do_aggregate_expr_plan("stdvar", "var_pop").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_top_k() {
do_aggregate_expr_plan("topk", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_bottom_k() {
do_aggregate_expr_plan("bottomk", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_count_values() {
do_aggregate_expr_plan("count_values", "").await;
}
#[tokio::test]
#[should_panic]
async fn aggregate_quantile() {
do_aggregate_expr_plan("quantile", "").await;
}
// TODO(ruihang): add range fn tests once exprs are ready.
// {
@@ -4248,4 +4303,98 @@ mod test {
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
/// Plans `count_values('series', ...) by (ip)` and compares the logical plan
/// against a golden string: a `count` aggregate grouped by the tag, the time
/// index and the value column, with the value column projected as `series`.
#[tokio::test]
async fn test_count_values_expr() {
// The literal expression is a placeholder; it is replaced by the parsed query below.
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"count_values('series', prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip)"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
// Two tables are registered, each with the single tag `ip`; the query only touches the first.
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"prometheus_tsdb_head_series".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["ip"],
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
// Golden plan; any change in planning output must be reflected here.
let expected = r#"Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, series [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N]
Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, prometheus_tsdb_head_series.greptime_value ASC NULLS LAST [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
Projection: count(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value AS series, prometheus_tsdb_head_series.greptime_value [count(prometheus_tsdb_head_series.greptime_value):Int64, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), series:Float64;N, greptime_value:Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp, prometheus_tsdb_head_series.greptime_value]], aggr=[[count(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N, count(prometheus_tsdb_head_series.greptime_value):Int64]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
/// Plans `quantile(0.3, sum(...) by (ip))` and compares the logical plan
/// against a golden string: an outer `quantile` aggregate grouped by the time
/// index, stacked on an inner `sum` aggregate grouped by `ip` and the time index.
#[tokio::test]
async fn test_quantile_expr() {
// The literal expression is a placeholder; it is replaced by the parsed query below.
let mut eval_stmt = EvalStmt {
expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};
let case = r#"quantile(0.3, sum(prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip))"#;
let prom_expr = parser::parse(case).unwrap();
eval_stmt.expr = prom_expr;
// Two tables are registered, each with the single tag `ip`; the query only touches the first.
let table_provider = build_test_table_provider_with_fields(
&[
(
DEFAULT_SCHEMA_NAME.to_string(),
"prometheus_tsdb_head_series".to_string(),
),
(
DEFAULT_SCHEMA_NAME.to_string(),
"http_server_requests_seconds_count".to_string(),
),
],
&["ip"],
)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();
// Golden plan; any change in planning output must be reflected here.
let expected = r#"Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[quantile(sum(prometheus_tsdb_head_series.greptime_value))]] [greptime_timestamp:Timestamp(Millisecond, None), quantile(sum(prometheus_tsdb_head_series.greptime_value)):Float64;N]
Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
assert_eq!(plan.display_indent_schema().to_string(), expected);
}
}

View File

@@ -0,0 +1,67 @@
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
INSERT INTO TABLE http_requests VALUES
(0, 'host1', "idc1", 200),
(0, 'host2', "idc1", 200),
(0, 'host3', "idc2", 200),
(0, 'host4', "idc2", 401),
(5000, 'host1', "idc1", 404),
(5000, 'host2', "idc1", 401),
(5000, 'host3', "idc2", 404),
(5000, 'host4', "idc2", 500),
(10000, 'host1', "idc1", 200),
(10000, 'host2', "idc1", 200),
(10000, 'host3', "idc2", 201),
(10000, 'host4', "idc2", 201),
(15000, 'host1', "idc1", 500),
(15000, 'host2', "idc1", 500),
(15000, 'host3', "idc2", 500),
(15000, 'host4', "idc2", 500);
Affected Rows: 16
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
+--------------------------+---------------------+-------------+
| count(http_requests.val) | ts | status_code |
+--------------------------+---------------------+-------------+
| 3 | 1970-01-01T00:00:00 | 200 |
| 1 | 1970-01-01T00:00:00 | 401 |
| 1 | 1970-01-01T00:00:05 | 401 |
| 2 | 1970-01-01T00:00:05 | 404 |
| 1 | 1970-01-01T00:00:05 | 500 |
| 2 | 1970-01-01T00:00:10 | 200 |
| 2 | 1970-01-01T00:00:10 | 201 |
| 4 | 1970-01-01T00:00:15 | 500 |
+--------------------------+---------------------+-------------+
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests) by (idc);
+--------------------------+------+---------------------+-------------+
| count(http_requests.val) | idc | ts | status_code |
+--------------------------+------+---------------------+-------------+
| 2 | idc1 | 1970-01-01T00:00:00 | 200 |
| 1 | idc1 | 1970-01-01T00:00:05 | 401 |
| 1 | idc1 | 1970-01-01T00:00:05 | 404 |
| 2 | idc1 | 1970-01-01T00:00:10 | 200 |
| 2 | idc1 | 1970-01-01T00:00:15 | 500 |
| 1 | idc2 | 1970-01-01T00:00:00 | 200 |
| 1 | idc2 | 1970-01-01T00:00:00 | 401 |
| 1 | idc2 | 1970-01-01T00:00:05 | 404 |
| 1 | idc2 | 1970-01-01T00:00:05 | 500 |
| 2 | idc2 | 1970-01-01T00:00:10 | 201 |
| 2 | idc2 | 1970-01-01T00:00:15 | 500 |
+--------------------------+------+---------------------+-------------+
DROP TABLE http_requests;
Affected Rows: 0

View File

@@ -0,0 +1,31 @@
CREATE TABLE http_requests (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
INSERT INTO TABLE http_requests VALUES
(0, 'host1', "idc1", 200),
(0, 'host2', "idc1", 200),
(0, 'host3', "idc2", 200),
(0, 'host4', "idc2", 401),
(5000, 'host1', "idc1", 404),
(5000, 'host2', "idc1", 401),
(5000, 'host3', "idc2", 404),
(5000, 'host4', "idc2", 500),
(10000, 'host1', "idc1", 200),
(10000, 'host2', "idc1", 200),
(10000, 'host3', "idc2", 201),
(10000, 'host4', "idc2", 201),
(15000, 'host1', "idc1", 500),
(15000, 'host2', "idc1", 500),
(15000, 'host3', "idc2", 500),
(15000, 'host4', "idc2", 500);
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests);
TQL EVAL (0, 15, '5s') count_values("status_code", http_requests) by (idc);
DROP TABLE http_requests;

View File

@@ -0,0 +1,71 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
INSERT INTO TABLE test VALUES
(0, 'host1', "idc1", 1),
(0, 'host2', "idc1", 2),
(0, 'host3', "idc2", 3),
(0, 'host4', "idc2", 4),
(5000, 'host1', "idc1", 5),
(5000, 'host2', "idc1", 6),
(5000, 'host3', "idc2", 7),
(5000, 'host4', "idc2", 8),
(10000, 'host1', "idc1", 9),
(10000, 'host2', "idc1", 10),
(10000, 'host3', "idc2", 11),
(10000, 'host4', "idc2", 12),
(15000, 'host1', "idc1", 13),
(15000, 'host2', "idc1", 14),
(15000, 'host3', "idc2", 15),
(15000, 'host4', "idc2", 16);
Affected Rows: 16
TQL EVAL (0, 15, '5s') quantile(0.5, test);
+---------------------+--------------------+
| ts | quantile(test.val) |
+---------------------+--------------------+
| 1970-01-01T00:00:00 | 2.5 |
| 1970-01-01T00:00:05 | 6.5 |
| 1970-01-01T00:00:10 | 10.5 |
| 1970-01-01T00:00:15 | 14.5 |
+---------------------+--------------------+
TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc);
+------+---------------------+--------------------+
| idc | ts | quantile(test.val) |
+------+---------------------+--------------------+
| idc1 | 1970-01-01T00:00:00 | 1.5 |
| idc1 | 1970-01-01T00:00:05 | 5.5 |
| idc1 | 1970-01-01T00:00:10 | 9.5 |
| idc1 | 1970-01-01T00:00:15 | 13.5 |
| idc2 | 1970-01-01T00:00:00 | 3.5 |
| idc2 | 1970-01-01T00:00:05 | 7.5 |
| idc2 | 1970-01-01T00:00:10 | 11.5 |
| idc2 | 1970-01-01T00:00:15 | 15.5 |
+------+---------------------+--------------------+
TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc));
+---------------------+-------------------------+
| ts | quantile(sum(test.val)) |
+---------------------+-------------------------+
| 1970-01-01T00:00:00 | 5.0 |
| 1970-01-01T00:00:05 | 13.0 |
| 1970-01-01T00:00:10 | 21.0 |
| 1970-01-01T00:00:15 | 29.0 |
+---------------------+-------------------------+
DROP TABLE test;
Affected Rows: 0

View File

@@ -0,0 +1,33 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
INSERT INTO TABLE test VALUES
(0, 'host1', "idc1", 1),
(0, 'host2', "idc1", 2),
(0, 'host3', "idc2", 3),
(0, 'host4', "idc2", 4),
(5000, 'host1', "idc1", 5),
(5000, 'host2', "idc1", 6),
(5000, 'host3', "idc2", 7),
(5000, 'host4', "idc2", 8),
(10000, 'host1', "idc1", 9),
(10000, 'host2', "idc1", 10),
(10000, 'host3', "idc2", 11),
(10000, 'host4', "idc2", 12),
(15000, 'host1', "idc1", 13),
(15000, 'host2', "idc1", 14),
(15000, 'host3', "idc2", 15),
(15000, 'host4', "idc2", 16);
TQL EVAL (0, 15, '5s') quantile(0.5, test);
TQL EVAL (0, 15, '5s') quantile(0.5, test) by (idc);
TQL EVAL (0, 15, '5s') quantile(0.5, sum(test) by (idc));
DROP TABLE test;