feat: impl topk and bottomk (#5602)

* feat: impl topk and bottomk

* chore: test and project fields

* refactor: prom_topk_bottomk_to_plan

* fix: order

* chore: adds topk plan test

* chore: comment

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
dennis zhuang
2025-03-03 15:32:24 +08:00
committed by GitHub
parent dee76f0a73
commit 87b1408d76
3 changed files with 531 additions and 23 deletions

View File

@@ -30,18 +30,19 @@ use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
 use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
 use datafusion::functions_aggregate::sum::sum_udaf;
 use datafusion::functions_aggregate::variance::var_pop_udaf;
-use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction};
+use datafusion::functions_window::row_number::RowNumber;
+use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, WindowFunction};
 use datafusion::logical_expr::expr_rewriter::normalize_cols;
 use datafusion::logical_expr::{
     BinaryExpr, Cast, Extension, LogicalPlan, LogicalPlanBuilder, Operator,
-    ScalarUDF as ScalarUdfDef,
+    ScalarUDF as ScalarUdfDef, WindowFrame, WindowFunctionDefinition,
 };
 use datafusion::prelude as df_prelude;
 use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
 use datafusion::scalar::ScalarValue;
 use datafusion::sql::TableReference;
 use datafusion_expr::utils::conjunction;
-use datafusion_expr::SortExpr;
+use datafusion_expr::{col, lit, SortExpr};
 use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
 use datatypes::data_type::ConcreteDataType;
 use itertools::Itertools;
@@ -265,32 +266,120 @@ impl PromPlanner {
         aggr_expr: &AggregateExpr,
     ) -> Result<LogicalPlan> {
         let AggregateExpr {
-            op,
-            expr,
-            // TODO(ruihang): support param
-            param: _param,
-            modifier,
+            op, expr, modifier, ..
         } = aggr_expr;
         let input = self.prom_expr_to_plan(expr, session_state).await?;

-        // calculate columns to group by
-        // Need to append time index column into group by columns
-        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
-
-        // convert op and value columns to aggregate exprs
-        let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
-
-        // create plan
-        let group_sort_expr = group_exprs
-            .clone()
-            .into_iter()
-            .map(|expr| expr.sort(true, false));
-
-        LogicalPlanBuilder::from(input)
-            .aggregate(group_exprs, aggr_exprs)
-            .context(DataFusionPlanningSnafu)?
-            .sort(group_sort_expr)
-            .context(DataFusionPlanningSnafu)?
-            .build()
-            .context(DataFusionPlanningSnafu)
+        match (*op).id() {
+            token::T_TOPK | token::T_BOTTOMK => {
+                self.prom_topk_bottomk_to_plan(aggr_expr, input).await
+            }
+            _ => {
+                // calculate columns to group by
+                // Need to append time index column into group by columns
+                let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, true)?;
+                // convert op and value columns to aggregate exprs
+                let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
+                // create plan
+                let group_sort_expr = group_exprs
+                    .clone()
+                    .into_iter()
+                    .map(|expr| expr.sort(true, false));
+                LogicalPlanBuilder::from(input)
+                    .aggregate(group_exprs.clone(), aggr_exprs)
+                    .context(DataFusionPlanningSnafu)?
+                    .sort(group_sort_expr)
+                    .context(DataFusionPlanningSnafu)?
+                    .build()
+                    .context(DataFusionPlanningSnafu)
+            }
+        }
     }
+
+    /// Create logical plan for PromQL topk and bottomk expr.
+    async fn prom_topk_bottomk_to_plan(
+        &mut self,
+        aggr_expr: &AggregateExpr,
+        input: LogicalPlan,
+    ) -> Result<LogicalPlan> {
+        let AggregateExpr {
+            op,
+            param,
+            modifier,
+            ..
+        } = aggr_expr;
+
+        let group_exprs = self.agg_modifier_to_col(input.schema(), modifier, false)?;
+
+        let param = param
+            .as_deref()
+            .with_context(|| FunctionInvalidArgumentSnafu {
+                fn_name: (*op).to_string(),
+            })?;
+        let PromExpr::NumberLiteral(NumberLiteral { val }) = param else {
+            return FunctionInvalidArgumentSnafu {
+                fn_name: (*op).to_string(),
+            }
+            .fail();
+        };
+
+        // convert op and value columns to window exprs.
+        let window_exprs = self.create_window_exprs(*op, group_exprs.clone(), &input)?;
+        let rank_columns: Vec<_> = window_exprs
+            .iter()
+            .map(|expr| expr.schema_name().to_string())
+            .collect();
+
+        // Create ranks filter with `Operator::Or`.
+        // Safety: at least one rank column
+        let filter: DfExpr = rank_columns
+            .iter()
+            .fold(None, |expr, rank| {
+                let predicate = DfExpr::BinaryExpr(BinaryExpr {
+                    left: Box::new(col(rank)),
+                    op: Operator::LtEq,
+                    right: Box::new(lit(*val)),
+                });
+                match expr {
+                    None => Some(predicate),
+                    Some(expr) => Some(DfExpr::BinaryExpr(BinaryExpr {
+                        left: Box::new(expr),
+                        op: Operator::Or,
+                        right: Box::new(predicate),
+                    })),
+                }
+            })
+            .unwrap();
+
+        let rank_columns: Vec<_> = rank_columns.into_iter().map(col).collect();
+        let mut new_group_exprs = group_exprs.clone();
+        // Order by ranks
+        new_group_exprs.extend(rank_columns);
+
+        let group_sort_expr = new_group_exprs
+            .into_iter()
+            .map(|expr| expr.sort(true, false));
+
+        let project_fields = self
+            .create_field_column_exprs()?
+            .into_iter()
+            .chain(self.create_tag_column_exprs()?)
+            .chain(Some(self.create_time_index_column_expr()?));
+
+        LogicalPlanBuilder::from(input)
+            .window(window_exprs)
+            .context(DataFusionPlanningSnafu)?
+            .filter(filter)
+            .context(DataFusionPlanningSnafu)?
+            .sort(group_sort_expr)
+            .context(DataFusionPlanningSnafu)?
+            .project(project_fields)
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)
+    }
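
An aside on the rank filter built above: the fold ORs together one `rank <= k` predicate per rank column. A minimal standalone sketch of the same predicate, assuming only the `datafusion_expr` builders this patch already imports (`ranks_filter` is a hypothetical helper, not part of the patch):

    use datafusion_expr::{col, lit, Expr};

    /// Hypothetical helper: builds `rank_1 <= k OR rank_2 <= k OR ...`.
    /// Returns `None` for an empty slice, which is why the planner can
    /// safely `unwrap()`: there is always at least one field column.
    fn ranks_filter(rank_columns: &[String], k: f64) -> Option<Expr> {
        rank_columns
            .iter()
            .map(|rank| col(rank.as_str()).lt_eq(lit(k)))
            .reduce(|acc, predicate| acc.or(predicate))
    }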
@@ -959,15 +1048,18 @@ impl PromPlanner {
     ///
     /// # Side effect
     ///
-    /// This method will also change the tag columns in ctx.
+    /// This method will also change the tag columns in ctx if `update_ctx` is true.
     fn agg_modifier_to_col(
         &mut self,
         input_schema: &DFSchemaRef,
         modifier: &Option<LabelModifier>,
+        update_ctx: bool,
     ) -> Result<Vec<DfExpr>> {
         match modifier {
             None => {
-                self.ctx.tag_columns = vec![];
+                if update_ctx {
+                    self.ctx.tag_columns = vec![];
+                }
                 Ok(vec![self.create_time_index_column_expr()?])
             }
             Some(LabelModifier::Include(labels)) => {
@@ -979,8 +1071,10 @@ impl PromPlanner {
                     }
                 }

-                // change the tag columns in context
-                self.ctx.tag_columns.clone_from(&labels.labels);
+                if update_ctx {
+                    // change the tag columns in context
+                    self.ctx.tag_columns.clone_from(&labels.labels);
+                }

                 // add timestamp column
                 exprs.push(self.create_time_index_column_expr()?);
@@ -1008,8 +1102,10 @@ impl PromPlanner {
                     let _ = all_fields.remove(value);
                 }

-                // change the tag columns in context
-                self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
+                if update_ctx {
+                    // change the tag columns in context
+                    self.ctx.tag_columns = all_fields.iter().map(|col| (*col).clone()).collect();
+                }

                 // collect remaining fields and convert to col expr
                 let mut exprs = all_fields
@@ -1772,6 +1868,15 @@ impl PromPlanner {
         Ok(result)
     }

+    fn create_field_column_exprs(&self) -> Result<Vec<DfExpr>> {
+        let mut result = Vec::with_capacity(self.ctx.field_columns.len());
+        for field in &self.ctx.field_columns {
+            let expr = DfExpr::Column(Column::from_name(field));
+            result.push(expr);
+        }
+        Ok(result)
+    }
+
     fn create_tag_and_time_index_column_sort_exprs(&self) -> Result<Vec<SortExpr>> {
         let mut result = self
             .ctx
@@ -1884,6 +1989,59 @@ impl PromPlanner {
         Ok(exprs)
     }

+    /// Create a [DfExpr::WindowFunction] expr for each value column with the given window function.
+    fn create_window_exprs(
+        &mut self,
+        op: TokenType,
+        group_exprs: Vec<DfExpr>,
+        input_plan: &LogicalPlan,
+    ) -> Result<Vec<DfExpr>> {
+        ensure!(
+            self.ctx.field_columns.len() == 1,
+            UnsupportedExprSnafu {
+                name: "topk or bottomk on multi-value input"
+            }
+        );
+
+        assert!(matches!(op.id(), token::T_TOPK | token::T_BOTTOMK));
+
+        let asc = matches!(op.id(), token::T_BOTTOMK);
+
+        let tag_sort_exprs = self
+            .create_tag_column_exprs()?
+            .into_iter()
+            .map(|expr| expr.sort(asc, false));
+
+        // perform the window operation on each value column
+        let exprs: Vec<DfExpr> = self
+            .ctx
+            .field_columns
+            .iter()
+            .map(|col| {
+                let mut sort_exprs = Vec::with_capacity(self.ctx.tag_columns.len() + 1);
+                // Order by value in the specific order,
+                sort_exprs.push(DfExpr::Column(Column::from(col)).sort(asc, false));
+                // then by tags if the values are equal,
+                // to try to keep the output relatively stable.
+                sort_exprs.extend(tag_sort_exprs.clone());
+
+                DfExpr::WindowFunction(WindowFunction {
+                    fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
+                    args: vec![],
+                    partition_by: group_exprs.clone(),
+                    order_by: sort_exprs,
+                    window_frame: WindowFrame::new(Some(true)),
+                    null_treatment: None,
+                })
+            })
+            .collect();
+
+        let normalized_exprs =
+            normalize_cols(exprs.iter().cloned(), input_plan).context(DataFusionPlanningSnafu)?;
+        Ok(normalized_exprs)
+    }
+
     /// Create a [SPECIAL_HISTOGRAM_QUANTILE] plan.
     async fn create_histogram_plan(
         &mut self,
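
For a single value column, the window expression built by `create_window_exprs` amounts to a `row_number()` ranking per output timestamp. A rough standalone sketch with hypothetical column names (`greptime_timestamp`, `greptime_value`, tag `ip`), using the same DataFusion types the patch imports:

    use std::sync::Arc;

    use datafusion::functions_window::row_number::RowNumber;
    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{WindowFrame, WindowFunctionDefinition};
    use datafusion_expr::{col, Expr};

    /// Hypothetical example: the rank expression `topk` builds when the time
    /// index is the only group-by column. `bottomk` differs only in sorting
    /// ascending instead of descending.
    fn topk_rank_expr() -> Expr {
        Expr::WindowFunction(WindowFunction {
            fun: WindowFunctionDefinition::WindowUDF(Arc::new(RowNumber::new().into())),
            args: vec![],
            // one independent ranking per output timestamp
            partition_by: vec![col("greptime_timestamp")],
            // value first, then tags as a tie-breaker for stable output
            order_by: vec![
                col("greptime_value").sort(false, false),
                col("ip").sort(false, false),
            ],
            window_frame: WindowFrame::new(Some(true)),
            null_treatment: None,
        })
    }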
@@ -4041,4 +4199,53 @@ mod test {
 TableScan: prometheus_tsdb_head_series [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N, field_2:Float64;N]"#;
         assert_eq!(plan.display_indent_schema().to_string(), expected);
     }
+
+    #[tokio::test]
+    async fn test_topk_expr() {
+        let mut eval_stmt = EvalStmt {
+            expr: PromExpr::NumberLiteral(NumberLiteral { val: 1.0 }),
+            start: UNIX_EPOCH,
+            end: UNIX_EPOCH
+                .checked_add(Duration::from_secs(100_000))
+                .unwrap(),
+            interval: Duration::from_secs(5),
+            lookback_delta: Duration::from_secs(1),
+        };
+
+        let case = r#"topk(10, sum(prometheus_tsdb_head_series{ip=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237:9090)"}) by (ip))"#;
+
+        let prom_expr = parser::parse(case).unwrap();
+        eval_stmt.expr = prom_expr;
+        let table_provider = build_test_table_provider_with_fields(
+            &[
+                (
+                    DEFAULT_SCHEMA_NAME.to_string(),
+                    "prometheus_tsdb_head_series".to_string(),
+                ),
+                (
+                    DEFAULT_SCHEMA_NAME.to_string(),
+                    "http_server_requests_seconds_count".to_string(),
+                ),
+            ],
+            &["ip"],
+        )
+        .await;
+
+        let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
+            .await
+            .unwrap();
+        let expected = r#"Projection: sum(prometheus_tsdb_head_series.greptime_value), prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp [sum(prometheus_tsdb_head_series.greptime_value):Float64;N, ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None)]
+  Sort: prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+    Filter: row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW <= Float64(10) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+      WindowAggr: windowExpr=[[row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N, row_number() PARTITION BY [prometheus_tsdb_head_series.greptime_timestamp] ORDER BY [sum(prometheus_tsdb_head_series.greptime_value) DESC NULLS LAST, prometheus_tsdb_head_series.ip DESC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:UInt64]
+        Sort: prometheus_tsdb_head_series.ip ASC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp ASC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
+          Aggregate: groupBy=[[prometheus_tsdb_head_series.ip, prometheus_tsdb_head_series.greptime_timestamp]], aggr=[[sum(prometheus_tsdb_head_series.greptime_value)]] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), sum(prometheus_tsdb_head_series.greptime_value):Float64;N]
+            PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[greptime_timestamp] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+              PromSeriesNormalize: offset=[0], time index=[greptime_timestamp], filter NaN: [false] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                PromSeriesDivide: tags=["ip"] [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                  Sort: prometheus_tsdb_head_series.ip DESC NULLS LAST, prometheus_tsdb_head_series.greptime_timestamp DESC NULLS LAST [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                    Filter: prometheus_tsdb_head_series.ip ~ Utf8("(10\.0\.160\.237:8080|10\.0\.160\.237:9090)") AND prometheus_tsdb_head_series.greptime_timestamp >= TimestampMillisecond(-1000, None) AND prometheus_tsdb_head_series.greptime_timestamp <= TimestampMillisecond(100001000, None) [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]
+                      TableScan: prometheus_tsdb_head_series [ip:Utf8, greptime_timestamp:Timestamp(Millisecond, None), greptime_value:Float64;N]"#;
+        assert_eq!(plan.display_indent_schema().to_string(), expected);
+    }
 }
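
Taken together, the plan emitted for `topk`/`bottomk` has a fixed four-step shape, visible in the expected output above: `WindowAggr` computes per-timestamp ranks, `Filter` keeps rows with rank <= k, `Sort` orders the output by group columns and rank, and a final `Projection` drops the rank column. A condensed sketch of that chain (hypothetical free function and column names; the real planner derives the columns from its context):

    use datafusion::error::Result;
    use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
    use datafusion_expr::{col, lit, Expr};

    /// Hypothetical condensed form of the topk plan:
    /// window -> filter -> sort -> project.
    fn topk_plan_shape(input: LogicalPlan, rank: Expr, k: f64) -> Result<LogicalPlan> {
        // the window expr's schema name doubles as its output column name
        let rank_name = rank.schema_name().to_string();
        LogicalPlanBuilder::from(input)
            .window(vec![rank])?
            .filter(col(rank_name.as_str()).lt_eq(lit(k)))?
            .sort(vec![
                col("greptime_timestamp").sort(true, false),
                col(rank_name.as_str()).sort(true, false),
            ])?
            .project(vec![
                col("greptime_value"),
                col("ip"),
                col("greptime_timestamp"),
            ])?
            .build()
    }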

View File

@@ -0,0 +1,223 @@
-- test single value table --
CREATE TABLE test (
    ts timestamp(3) time index,
    host STRING,
    idc STRING,
    val BIGINT,
    PRIMARY KEY(host, idc),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
    (0, 'host1', "idc1", 1),
    (0, 'host2', "idc1", 2),
    (0, 'host3', "idc2", 3),
    (5000, 'host1', "idc1", 1),
    (5000, 'host2', "idc1", 4),
    (5000, 'host3', "idc2", 1),
    (10000, 'host1', "idc1", 3),
    (10000, 'host2', "idc1", 5),
    (10000, 'host3', "idc2", 3),
    (15000, 'host1', "idc1", 1),
    (15000, 'host2', "idc1", 2),
    (15000, 'host3', "idc2", 3);

Affected Rows: 12

TQL EVAL (0, 15, '5s') topk(1, test);

+-----+-------+------+---------------------+
| val | host | idc | ts |
+-----+-------+------+---------------------+
| 3 | host3 | idc2 | 1970-01-01T00:00:00 |
| 4 | host2 | idc1 | 1970-01-01T00:00:05 |
| 5 | host2 | idc1 | 1970-01-01T00:00:10 |
| 3 | host3 | idc2 | 1970-01-01T00:00:15 |
+-----+-------+------+---------------------+

TQL EVAL (0, 15, '5s') topk(3, test);

+-----+-------+------+---------------------+
| val | host | idc | ts |
+-----+-------+------+---------------------+
| 3 | host3 | idc2 | 1970-01-01T00:00:00 |
| 2 | host2 | idc1 | 1970-01-01T00:00:00 |
| 1 | host1 | idc1 | 1970-01-01T00:00:00 |
| 4 | host2 | idc1 | 1970-01-01T00:00:05 |
| 1 | host3 | idc2 | 1970-01-01T00:00:05 |
| 1 | host1 | idc1 | 1970-01-01T00:00:05 |
| 5 | host2 | idc1 | 1970-01-01T00:00:10 |
| 3 | host3 | idc2 | 1970-01-01T00:00:10 |
| 3 | host1 | idc1 | 1970-01-01T00:00:10 |
| 3 | host3 | idc2 | 1970-01-01T00:00:15 |
| 2 | host2 | idc1 | 1970-01-01T00:00:15 |
| 1 | host1 | idc1 | 1970-01-01T00:00:15 |
+-----+-------+------+---------------------+

TQL EVAL (0, 15, '5s') topk(1, sum(test) by (idc));

+---------------+------+---------------------+
| sum(test.val) | idc | ts |
+---------------+------+---------------------+
| 3 | idc2 | 1970-01-01T00:00:00 |
| 5 | idc1 | 1970-01-01T00:00:05 |
| 8 | idc1 | 1970-01-01T00:00:10 |
| 3 | idc2 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') topk(2, sum(test) by (idc));

+---------------+------+---------------------+
| sum(test.val) | idc | ts |
+---------------+------+---------------------+
| 3 | idc2 | 1970-01-01T00:00:00 |
| 3 | idc1 | 1970-01-01T00:00:00 |
| 5 | idc1 | 1970-01-01T00:00:05 |
| 1 | idc2 | 1970-01-01T00:00:05 |
| 8 | idc1 | 1970-01-01T00:00:10 |
| 3 | idc2 | 1970-01-01T00:00:10 |
| 3 | idc2 | 1970-01-01T00:00:15 |
| 3 | idc1 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(1, test);

+-----+-------+------+---------------------+
| val | host | idc | ts |
+-----+-------+------+---------------------+
| 1 | host1 | idc1 | 1970-01-01T00:00:00 |
| 1 | host1 | idc1 | 1970-01-01T00:00:05 |
| 3 | host1 | idc1 | 1970-01-01T00:00:10 |
| 1 | host1 | idc1 | 1970-01-01T00:00:15 |
+-----+-------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(3, test);

+-----+-------+------+---------------------+
| val | host | idc | ts |
+-----+-------+------+---------------------+
| 1 | host1 | idc1 | 1970-01-01T00:00:00 |
| 2 | host2 | idc1 | 1970-01-01T00:00:00 |
| 3 | host3 | idc2 | 1970-01-01T00:00:00 |
| 1 | host1 | idc1 | 1970-01-01T00:00:05 |
| 1 | host3 | idc2 | 1970-01-01T00:00:05 |
| 4 | host2 | idc1 | 1970-01-01T00:00:05 |
| 3 | host1 | idc1 | 1970-01-01T00:00:10 |
| 3 | host3 | idc2 | 1970-01-01T00:00:10 |
| 5 | host2 | idc1 | 1970-01-01T00:00:10 |
| 1 | host1 | idc1 | 1970-01-01T00:00:15 |
| 2 | host2 | idc1 | 1970-01-01T00:00:15 |
| 3 | host3 | idc2 | 1970-01-01T00:00:15 |
+-----+-------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(1, sum(test) by (idc));

+---------------+------+---------------------+
| sum(test.val) | idc | ts |
+---------------+------+---------------------+
| 3 | idc1 | 1970-01-01T00:00:00 |
| 1 | idc2 | 1970-01-01T00:00:05 |
| 3 | idc2 | 1970-01-01T00:00:10 |
| 3 | idc1 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(2, sum(test) by (idc));

+---------------+------+---------------------+
| sum(test.val) | idc | ts |
+---------------+------+---------------------+
| 3 | idc1 | 1970-01-01T00:00:00 |
| 3 | idc2 | 1970-01-01T00:00:00 |
| 1 | idc2 | 1970-01-01T00:00:05 |
| 5 | idc1 | 1970-01-01T00:00:05 |
| 3 | idc2 | 1970-01-01T00:00:10 |
| 8 | idc1 | 1970-01-01T00:00:10 |
| 3 | idc1 | 1970-01-01T00:00:15 |
| 3 | idc2 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

DROP table test;

Affected Rows: 0

-- test multi-values table --
CREATE TABLE test (
    ts timestamp(3) time index,
    host STRING,
    idc STRING,
    cpu BIGINT,
    mem BIGINT,
    PRIMARY KEY(host, idc),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
    (0, 'host1', "idc1", 1, 3),
    (0, 'host2', "idc1", 2, 2),
    (0, 'host3', "idc2", 3, 1),
    (5000, 'host1', "idc1", 1, 1),
    (5000, 'host2', "idc1", 4, 4),
    (5000, 'host3', "idc2", 1, 1),
    (10000, 'host1', "idc1", 3, 3),
    (10000, 'host2', "idc1", 5, 5),
    (10000, 'host3', "idc2", 3, 3),
    (15000, 'host1', "idc1", 1, 3),
    (15000, 'host2', "idc1", 2, 2),
    (15000, 'host3', "idc2", 3, 1);

Affected Rows: 12

TQL EVAL (0, 15, '5s') topk(1, test);

Error: 1004(InvalidArguments), Unsupported expr type: topk or bottomk on multi-value input

TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='cpu'}) by (idc));

+---------------+------+---------------------+
| sum(test.cpu) | idc | ts |
+---------------+------+---------------------+
| 3 | idc2 | 1970-01-01T00:00:00 |
| 5 | idc1 | 1970-01-01T00:00:05 |
| 8 | idc1 | 1970-01-01T00:00:10 |
| 3 | idc2 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='mem'}) by (idc));

+---------------+------+---------------------+
| sum(test.mem) | idc | ts |
+---------------+------+---------------------+
| 5 | idc1 | 1970-01-01T00:00:00 |
| 5 | idc1 | 1970-01-01T00:00:05 |
| 8 | idc1 | 1970-01-01T00:00:10 |
| 5 | idc1 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='cpu'}) by (idc));

+---------------+------+---------------------+
| sum(test.cpu) | idc | ts |
+---------------+------+---------------------+
| 3 | idc1 | 1970-01-01T00:00:00 |
| 1 | idc2 | 1970-01-01T00:00:05 |
| 3 | idc2 | 1970-01-01T00:00:10 |
| 3 | idc1 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='mem'}) by (idc));

+---------------+------+---------------------+
| sum(test.mem) | idc | ts |
+---------------+------+---------------------+
| 1 | idc2 | 1970-01-01T00:00:00 |
| 1 | idc2 | 1970-01-01T00:00:05 |
| 3 | idc2 | 1970-01-01T00:00:10 |
| 1 | idc2 | 1970-01-01T00:00:15 |
+---------------+------+---------------------+

DROP table test;

Affected Rows: 0

View File

@@ -0,0 +1,78 @@
-- test single value table --
CREATE TABLE test (
    ts timestamp(3) time index,
    host STRING,
    idc STRING,
    val BIGINT,
    PRIMARY KEY(host, idc),
);

INSERT INTO TABLE test VALUES
    (0, 'host1', "idc1", 1),
    (0, 'host2', "idc1", 2),
    (0, 'host3', "idc2", 3),
    (5000, 'host1', "idc1", 1),
    (5000, 'host2', "idc1", 4),
    (5000, 'host3', "idc2", 1),
    (10000, 'host1', "idc1", 3),
    (10000, 'host2', "idc1", 5),
    (10000, 'host3', "idc2", 3),
    (15000, 'host1', "idc1", 1),
    (15000, 'host2', "idc1", 2),
    (15000, 'host3', "idc2", 3);

TQL EVAL (0, 15, '5s') topk(1, test);

TQL EVAL (0, 15, '5s') topk(3, test);

TQL EVAL (0, 15, '5s') topk(1, sum(test) by (idc));

TQL EVAL (0, 15, '5s') topk(2, sum(test) by (idc));

TQL EVAL (0, 15, '5s') bottomk(1, test);

TQL EVAL (0, 15, '5s') bottomk(3, test);

TQL EVAL (0, 15, '5s') bottomk(1, sum(test) by (idc));

TQL EVAL (0, 15, '5s') bottomk(2, sum(test) by (idc));

DROP table test;

-- test multi-values table --
CREATE TABLE test (
    ts timestamp(3) time index,
    host STRING,
    idc STRING,
    cpu BIGINT,
    mem BIGINT,
    PRIMARY KEY(host, idc),
);

INSERT INTO TABLE test VALUES
    (0, 'host1', "idc1", 1, 3),
    (0, 'host2', "idc1", 2, 2),
    (0, 'host3', "idc2", 3, 1),
    (5000, 'host1', "idc1", 1, 1),
    (5000, 'host2', "idc1", 4, 4),
    (5000, 'host3', "idc2", 1, 1),
    (10000, 'host1', "idc1", 3, 3),
    (10000, 'host2', "idc1", 5, 5),
    (10000, 'host3', "idc2", 3, 3),
    (15000, 'host1', "idc1", 1, 3),
    (15000, 'host2', "idc1", 2, 2),
    (15000, 'host3', "idc2", 3, 1);

TQL EVAL (0, 15, '5s') topk(1, test);

TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='cpu'}) by (idc));

TQL EVAL (0, 15, '5s') topk(1, sum(test{__field__='mem'}) by (idc));

TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='cpu'}) by (idc));

TQL EVAL (0, 15, '5s') bottomk(1, sum(test{__field__='mem'}) by (idc));

DROP table test;