diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs
index 7997f8946b..22477d5049 100644
--- a/src/query/src/promql/planner.rs
+++ b/src/query/src/promql/planner.rs
@@ -30,18 +30,19 @@ use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
 use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::functions_aggregate::variance::var_pop_udaf;
-use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction};
+use datafusion::functions_window::row_number::RowNumber;
+use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, WindowFunction};
 use datafusion::logical_expr::expr_rewriter::normalize_cols;
 use datafusion::logical_expr::{
     BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
-    ScalarUDF as ScalarUdfDef,
+    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
 };
 use datafusion::prelude as df_prelude;
 use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::TableReference;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::SortExpr;
+use datafusion_expr::{col, lit, SortExpr};
 use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
 use datatypes::data_type::ConcreteDataType;
 use itertools::Itertools;
@@ -265,32 +266,120 @@ impl PromPlanner {
         aggr_expr: &AggregateExpr,
     ) -> Result<LogicalPlan> {
         let AggregateExpr {
-            op,
-            expr,
-            // TODO(ruihang): support param
-            param: _param,
-            modifier,
+            op, expr, modifier, ..
         } = aggr_expr;
         let input = self.prom_expr_to_plan(expr, session_state).await?;
+        match (*op).id() {
+            token::T_TOPK | token::T_BOTTOMK => {
+                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
+            }
+            _ => {
+                // calculate columns to group by
+                // Need to append time index column into group by columns
+                let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
+                // convert op and value columns to aggregate exprs
+                let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
 
-        // calculate columns to group by
-        // Need to append time index column into group by columns
-        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
+                // create plan
+                let group_sort_expr = group_exprs
+                    .clone()
+                    .into_iter()
+                    .map(|expr| expr.sort(true, false));
+                LogicalPlanBuilder::from(input)
+                    .aggregate(group_exprs.clone(), aggr_exprs)
+                    .context(DataFusionPlanningSnafu)?
+                    .sort(group_sort_expr)
+                    .context(DataFusionPlanningSnafu)?
+                    .build()
+                    .context(DataFusionPlanningSnafu)
+            }
+        }
+    }
 
-        // convert op and value columns to aggregate exprs
-        let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
+    /// Create a logical plan for the PromQL `topk` and `bottomk` exprs.
+    async fn prom_topk_bottomk_to_plan(
+        &mut self,
+        aggr_expr: &AggregateExpr,
+        input: LogicalPlan,
+    ) -> Result<LogicalPlan> {
+        let AggregateExpr {
+            op,
+            param,
+            modifier,
+            ..
+        } = aggr_expr;
 
-        // create plan
-        let group_sort_expr = group_exprs
-            .clone()
+        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
+
+        let param = param
+            .as_deref()
+            .with_context(|| FunctionInvalidArgumentSnafu {
+                fn_name: (*op).to_string(),
+            })?;
+
+        let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
+            return FunctionInvalidArgumentSnafu {
+                fn_name: (*op).to_string(),
+            }
+            .fail();
+        };
+
+        // convert op and value columns to window exprs.
+        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
+
+        let rank_columns: Vec<_> = window_exprs
+            .iter()
+            .map(|expr| expr.schema_name().to_string())
+            .collect();
+
+        // Create the rank filter by chaining predicates with `Operator::Or`.
+        // Safety: there is at least one rank column.
+        let filter: DfExpr = rank_columns
+            .iter()
+            .fold(None, |expr, rank| {
+                let predicate = DfExpr::BinaryExpr(BinaryExpr {
+                    left: Box::new(col(rank)),
+                    op: Operator::LtEq,
+                    right: Box::new(lit(*val)),
+                });
+
+                match expr {
+                    None => Some(predicate),
+                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
+                        left: Box::new(expr),
+                        op: Operator::Or,
+                        right: Box::new(predicate),
+                    })),
+                }
+            })
+            .unwrap();
+
+        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
+
+        let mut new_group_exprs = group_exprs.clone();
+        // Order by ranks
+        new_group_exprs.extend(rank_columns);
+
+        let group_sort_expr = new_group_exprs
             .into_iter()
             .map(|expr| expr.sort(true, false));
+
+        let project_fields = self
+            .create_field_column_exprs()?
+            .into_iter()
+            .chain(self.create_tag_column_exprs()?)
+            .chain(Some(self.create_time_index_column_expr()?));
+
         LogicalPlanBuilder::from(input)
-            .aggregate(group_exprs, aggr_exprs)
+            .window(window_exprs)
+            .context(DataFusionPlanningSnafu)?
+            .filter(filter)
             .context(DataFusionPlanningSnafu)?
             .sort(group_sort_expr)
             .context(DataFusionPlanningSnafu)?
+            .project(project_fields)
+            .context(DataFusionPlanningSnafu)?
             .build()
             .context(DataFusionPlanningSnafu)
     }
@@ -959,15 +1048,18 @@ impl PromPlanner {
     ///
     /// # Side effect
     ///
-    /// This method will also change the tag columns in ctx.
+    /// This method will also change the tag columns in ctx if `update_ctx` is true.
     fn agg_modifier_to_col(
         &mut self,
         input_schema: &DFSchemaRef,
         modifier: &Option<LabelModifier>,
+        update_ctx: bool,
     ) -> Result<Vec<DfExpr>> {
         match modifier {
             None => {
-                self.ctx.tag_columns = vec![];
+                if update_ctx {
+                    self.ctx.tag_columns = vec![];
+                }
                 Ok(vec![self.create_time_index_column_expr()?])
             }
             Some(LabelModifier::Include(labels)) => {
@@ -979,8 +1071,10 @@ impl PromPlanner {
                     }
                 }
 
-                // change the tag columns in context
-                self.ctx.tag_columns.clone_from(&labels.labels);
+                if update_ctx {
+                    // change the tag columns in context
+                    self.ctx.tag_columns.clone_from(&labels.labels);
+                }
 
                 // add timestamp column
                 exprs.push(self.create_time_index_column_expr()?);
@@ -1008,8 +1102,10 @@ impl PromPlanner {
                     let _ = all_fields.remove(value);
                 }
 
-                // change the tag columns in context
-                self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
+                if update_ctx {
+                    // change the tag columns in context
+                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
+                }
 
                 // collect remaining fields and convert to col expr
                 let mut exprs = all_fields
@@ -1772,6 +1868,15 @@ impl PromPlanner {
         Ok(result)
     }
 
+    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
+        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
+        for field in &self.ctx.field_columns {
+            let expr = DfExpr::Column(Column::from_name(field));
+            result.push(expr);
+        }
+        Ok(result)
+    }
+
     fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
         let mut result = self
             .ctx
@@ -1884,6 +1989,59 @@ impl PromPlanner {
         Ok(exprs)
     }
 
+    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
+    fn create_window_exprs(
+        &mut self,
+        op: TokenType,
+        group_exprs: Vec<DfExpr>,
+        input_plan: &LogicalPlan,
+    ) -> Result<Vec<DfExpr>> {
+        ensure!(
+            self.ctx.field_columns.len() == 1,
+            UnsupportedExprSnafu {
+                name: "topk or bottomk on multi-value input"
+            }
+        );
+
+        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
+
+        let asc = matches!(op.id(), token::T_BOTTOMK);
+
+        let tag_sort_exprs = self
+            .create_tag_column_exprs()?
+            .into_iter()
+            .map(|expr| expr.sort(asc, false));
+
+        // perform the window operation on each value column
+        let exprs: Vec<DfExpr> = self
+            .ctx
+            .field_columns
+            .iter()
+            .map(|col| {
+                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
+                // Order by the value in the requested direction first,
+                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, false));
+                // then by the tags when values are equal,
+                // to keep the output relatively stable.
+                sort_exprs.extend(tag_sort_exprs.clone());
+
+                DfExpr::WindowFunction(WindowFunction {
+                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
+                    args: vec![],
+                    partition_by: group_exprs.clone(),
+                    order_by: sort_exprs,
+                    window_frame: WindowFrame::new(Some(true)),
+                    null_treatment: None,
+                })
+            })
+            .collect();
+
+        let normalized_exprs =
+            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
+        Ok(normalized_exprs)
+    }
+
     /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
     async fn create_histogram_plan(
         &mut self,
@@ -4041,4 +4199,53 @@ mod test {
             TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]"#;
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
+
+    #[tokio::test]
+    async fn test_topk_expr() {
+        let mut eval_stmt = EvalStmt {
+            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
+            start: UNIX_EPOCH,
+            end: UNIX_EPOCH
+                .checked_add(Duration::from_secs(100_000))
+                .unwrap(),
+            interval: Duration::from_secs(5),
+            lookback_delta: Duration::from_secs(1),
+        };
+        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip))"#;
+
+        let prom_expr = parser::parse(case).unwrap();
+        eval_stmt.expr = prom_expr;
+        let table_provider = build_test_table_provider_with_fields(
+            &[
+                (
+                    DEFAULT_SCHEMA_NAME.to_string(),
+                    "prometheus_tsdb_head_series".to_string(),
+                ),
+                (
+                    DEFAULT_SCHEMA_NAME.to_string(),
+                    "http_server_requests_seconds_count".to_string(),
+                ),
+            ],
+            &["ip"],
+        )
+        .await;
+
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
+            .await
+            .unwrap();
+        let expected = r#"Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
+  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
+          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
+            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+              PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                  Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                    Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                      TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
+
+        assert_eq!(plan.display_indent_schema().to_string(), expected);
+    }
 }
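
Reviewer note (not part of the patch): the core idea above is to plan `topk`/`bottomk` as a `row_number()` window partitioned by the group-by keys plus the time index, ordered by the value column (descending for `topk`, ascending for `bottomk`, with tags as tie-breakers), followed by a filter on `rank <= k`. A minimal SQL sketch of what the generated plan computes for `topk(2, sum(test) by (idc))`; the `aggregated` relation and the quoted column name are illustrative assumptions, not the planner's actual output:

    -- `aggregated` stands in for the result of the inner `sum(test) by (idc)`.
    SELECT "sum(test.val)", idc, ts
    FROM (
        SELECT
            *,
            row_number() OVER (
                PARTITION BY ts                -- rank separately at every sampled instant
                ORDER BY "sum(test.val)" DESC, -- topk => DESC; bottomk => ASC
                         idc DESC              -- tags break ties for stable output
            ) AS rank
        FROM aggregated
    ) AS ranked
    WHERE rank <= 2
    ORDER BY ts, rank;
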
diff --git a/tests/cases/standalone/common/promql/topk_bottomk.result b/tests/cases/standalone/common/promql/topk_bottomk.result
new file mode 100644
index 0000000000..978c2b1968
--- /dev/null
+++ b/tests/cases/standalone/common/promql/topk_bottomk.result
@@ -0,0 +1,223 @@
+-- test single value table --
+CREATE TABLE test (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    val BIGINT,
+    PRIMARY KEY(host, idc),
+);
+
+Affected Rows: 0
+
+INSERT INTO TABLE test VALUES
+    (0, 'host1', "idc1", 1),
+    (0, 'host2', "idc1", 2),
+    (0, 'host3', "idc2", 3),
+    (5000, 'host1', "idc1", 1),
+    (5000, 'host2', "idc1", 4),
+    (5000, 'host3', "idc2", 1),
+    (10000, 'host1', "idc1", 3),
+    (10000, 'host2', "idc1", 5),
+    (10000, 'host3', "idc2", 3),
+    (15000, 'host1', "idc1", 1),
+    (15000, 'host2', "idc1", 2),
+    (15000, 'host3', "idc2", 3);
+
+Affected Rows: 12
+
+TQL EVAL (0, 15, '5s') topk(1, test);
+
++-----+-------+------+---------------------+
+| val | host  | idc  | ts                  |
++-----+-------+------+---------------------+
+| 3   | host3 | idc2 | 1970-01-01T00:00:00 |
+| 4   | host2 | idc1 | 1970-01-01T00:00:05 |
+| 5   | host2 | idc1 | 1970-01-01T00:00:10 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:15 |
++-----+-------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') topk(3, test);
+
++-----+-------+------+---------------------+
+| val | host  | idc  | ts                  |
++-----+-------+------+---------------------+
+| 3   | host3 | idc2 | 1970-01-01T00:00:00 |
+| 2   | host2 | idc1 | 1970-01-01T00:00:00 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:00 |
+| 4   | host2 | idc1 | 1970-01-01T00:00:05 |
+| 1   | host3 | idc2 | 1970-01-01T00:00:05 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:05 |
+| 5   | host2 | idc1 | 1970-01-01T00:00:10 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:10 |
+| 3   | host1 | idc1 | 1970-01-01T00:00:10 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:15 |
+| 2   | host2 | idc1 | 1970-01-01T00:00:15 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:15 |
++-----+-------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test) by (idc));
+
++---------------+------+---------------------+
+| sum(test.val) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc2 | 1970-01-01T00:00:00 |
+| 5             | idc1 | 1970-01-01T00:00:05 |
+| 8             | idc1 | 1970-01-01T00:00:10 |
+| 3             | idc2 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') topk(2, sum(test) by (idc));
+
++---------------+------+---------------------+
+| sum(test.val) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc2 | 1970-01-01T00:00:00 |
+| 3             | idc1 | 1970-01-01T00:00:00 |
+| 5             | idc1 | 1970-01-01T00:00:05 |
+| 1             | idc2 | 1970-01-01T00:00:05 |
+| 8             | idc1 | 1970-01-01T00:00:10 |
+| 3             | idc2 | 1970-01-01T00:00:10 |
+| 3             | idc2 | 1970-01-01T00:00:15 |
+| 3             | idc1 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(1, test);
+
++-----+-------+------+---------------------+
+| val | host  | idc  | ts                  |
++-----+-------+------+---------------------+
+| 1   | host1 | idc1 | 1970-01-01T00:00:00 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:05 |
+| 3   | host1 | idc1 | 1970-01-01T00:00:10 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:15 |
++-----+-------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(3, test);
+
++-----+-------+------+---------------------+
+| val | host  | idc  | ts                  |
++-----+-------+------+---------------------+
+| 1   | host1 | idc1 | 1970-01-01T00:00:00 |
+| 2   | host2 | idc1 | 1970-01-01T00:00:00 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:00 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:05 |
+| 1   | host3 | idc2 | 1970-01-01T00:00:05 |
+| 4   | host2 | idc1 | 1970-01-01T00:00:05 |
+| 3   | host1 | idc1 | 1970-01-01T00:00:10 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:10 |
+| 5   | host2 | idc1 | 1970-01-01T00:00:10 |
+| 1   | host1 | idc1 | 1970-01-01T00:00:15 |
+| 2   | host2 | idc1 | 1970-01-01T00:00:15 |
+| 3   | host3 | idc2 | 1970-01-01T00:00:15 |
++-----+-------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test) by (idc));
+
++---------------+------+---------------------+
+| sum(test.val) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc1 | 1970-01-01T00:00:00 |
+| 1             | idc2 | 1970-01-01T00:00:05 |
+| 3             | idc2 | 1970-01-01T00:00:10 |
+| 3             | idc1 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(2, sum(test) by (idc));
+
++---------------+------+---------------------+
+| sum(test.val) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc1 | 1970-01-01T00:00:00 |
+| 3             | idc2 | 1970-01-01T00:00:00 |
+| 1             | idc2 | 1970-01-01T00:00:05 |
+| 5             | idc1 | 1970-01-01T00:00:05 |
+| 3             | idc2 | 1970-01-01T00:00:10 |
+| 8             | idc1 | 1970-01-01T00:00:10 |
+| 3             | idc1 | 1970-01-01T00:00:15 |
+| 3             | idc2 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+DROP table test;
+
+Affected Rows: 0
+
+-- test multi-values table --
+CREATE TABLE test (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    cpu BIGINT,
+    mem BIGINT,
+    PRIMARY KEY(host, idc),
+);
+
+Affected Rows: 0
+
+INSERT INTO TABLE test VALUES
+    (0, 'host1', "idc1", 1, 3),
+    (0, 'host2', "idc1", 2, 2),
+    (0, 'host3', "idc2", 3, 1),
+    (5000, 'host1', "idc1", 1, 1),
+    (5000, 'host2', "idc1", 4, 4),
+    (5000, 'host3', "idc2", 1, 1),
+    (10000, 'host1', "idc1", 3, 3),
+    (10000, 'host2', "idc1", 5, 5),
+    (10000, 'host3', "idc2", 3, 3),
+    (15000, 'host1', "idc1", 1, 3),
+    (15000, 'host2', "idc1", 2, 2),
+    (15000, 'host3', "idc2", 3, 1);
+
+Affected Rows: 12
+
+TQL EVAL (0, 15, '5s') topk(1, test);
+
+Error: 1004(InvalidArguments), Unsupported expr type: topk or bottomk on multi-value input
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='cpu'}) by (idc));
+
++---------------+------+---------------------+
+| sum(test.cpu) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc2 | 1970-01-01T00:00:00 |
+| 5             | idc1 | 1970-01-01T00:00:05 |
+| 8             | idc1 | 1970-01-01T00:00:10 |
+| 3             | idc2 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='mem'}) by (idc));
+
++---------------+------+---------------------+
+| sum(test.mem) | idc  | ts                  |
++---------------+------+---------------------+
+| 5             | idc1 | 1970-01-01T00:00:00 |
+| 5             | idc1 | 1970-01-01T00:00:05 |
+| 8             | idc1 | 1970-01-01T00:00:10 |
+| 5             | idc1 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='cpu'}) by (idc));
+
++---------------+------+---------------------+
+| sum(test.cpu) | idc  | ts                  |
++---------------+------+---------------------+
+| 3             | idc1 | 1970-01-01T00:00:00 |
+| 1             | idc2 | 1970-01-01T00:00:05 |
+| 3             | idc2 | 1970-01-01T00:00:10 |
+| 3             | idc1 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='mem'}) by (idc));
+
++---------------+------+---------------------+
+| sum(test.mem) | idc  | ts                  |
++---------------+------+---------------------+
+| 1             | idc2 | 1970-01-01T00:00:00 |
+| 1             | idc2 | 1970-01-01T00:00:05 |
+| 3             | idc2 | 1970-01-01T00:00:10 |
+| 1             | idc2 | 1970-01-01T00:00:15 |
++---------------+------+---------------------+
+
+DROP table test;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/topk_bottomk.sql b/tests/cases/standalone/common/promql/topk_bottomk.sql
new file mode 100644
index 0000000000..c82b196579
--- /dev/null
+++ b/tests/cases/standalone/common/promql/topk_bottomk.sql
@@ -0,0 +1,78 @@
+-- test single value table --
+CREATE TABLE test (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    val BIGINT,
+    PRIMARY KEY(host, idc),
+);
+
+INSERT INTO TABLE test VALUES
+    (0, 'host1', "idc1", 1),
+    (0, 'host2', "idc1", 2),
+    (0, 'host3', "idc2", 3),
+    (5000, 'host1', "idc1", 1),
+    (5000, 'host2', "idc1", 4),
+    (5000, 'host3', "idc2", 1),
+    (10000, 'host1', "idc1", 3),
+    (10000, 'host2', "idc1", 5),
+    (10000, 'host3', "idc2", 3),
+    (15000, 'host1', "idc1", 1),
+    (15000, 'host2', "idc1", 2),
+    (15000, 'host3', "idc2", 3);
+
+TQL EVAL (0, 15, '5s') topk(1, test);
+
+TQL EVAL (0, 15, '5s') topk(3, test);
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test) by (idc));
+
+TQL EVAL (0, 15, '5s') topk(2, sum(test) by (idc));
+
+TQL EVAL (0, 15, '5s') bottomk(1, test);
+
+TQL EVAL (0, 15, '5s') bottomk(3, test);
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test) by (idc));
+
+TQL EVAL (0, 15, '5s') bottomk(2, sum(test) by (idc));
+
+DROP table test;
+
+-- test multi-values table --
+
+CREATE TABLE test (
+    ts timestamp(3) time index,
+    host STRING,
+    idc STRING,
+    cpu BIGINT,
+    mem BIGINT,
+    PRIMARY KEY(host, idc),
+);
+
+INSERT INTO TABLE test VALUES
+    (0, 'host1', "idc1", 1, 3),
+    (0, 'host2', "idc1", 2, 2),
+    (0, 'host3', "idc2", 3, 1),
+    (5000, 'host1', "idc1", 1, 1),
+    (5000, 'host2', "idc1", 4, 4),
+    (5000, 'host3', "idc2", 1, 1),
+    (10000, 'host1', "idc1", 3, 3),
+    (10000, 'host2', "idc1", 5, 5),
+    (10000, 'host3', "idc2", 3, 3),
+    (15000, 'host1', "idc1", 1, 3),
+    (15000, 'host2', "idc1", 2, 2),
+    (15000, 'host3', "idc2", 3, 1);
+
+TQL EVAL (0, 15, '5s') topk(1, test);
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='cpu'}) by (idc));
+
+TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='mem'}) by (idc));
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='cpu'}) by (idc));
+
+TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='mem'}) by (idc));
+
+DROP table test;
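
Side note (not part of the patch): the logical plan these cases exercise can be inspected from the same dialect. A hedged example, assuming `TQL EXPLAIN` accepts the same `(start, end, step)` prefix as `TQL EVAL`:

    -- Hypothetical follow-up, not part of the test file above:
    TQL EXPLAIN (0, 15, '5s') topk(2, sum(test) by (idc));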