feat(promql): supports sort, sort_desc etc. functions (#5542)

* feat(promql): supports sort, sort_desc etc. functions

* chore: fix toml format and tests

* chore: update deps

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: remove fixme

* fix: cargo lock

* chore: style

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
dennis zhuang
2025-02-19 21:13:49 +08:00
committed by GitHub
parent c8bdeaaa6a
commit 62a8b8b9dc
5 changed files with 270 additions and 10 deletions

3
Cargo.lock generated
View File

@@ -8756,8 +8756,7 @@ dependencies = [
[[package]]
name = "promql-parser"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe99e6f80a79abccf1e8fb48dd63473a36057e600cc6ea36147c8318698ae6f"
source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=27abb8e16003a50c720f00d6c85f41f5fa2a2a8e#27abb8e16003a50c720f00d6c85f41f5fa2a2a8e"
dependencies = [
"cfgrammar",
"chrono",

View File

@@ -160,7 +160,9 @@ parquet = { version = "53.0.0", default-features = false, features = ["arrow", "
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
promql-parser = { version = "0.4.3", features = ["ser"] }
promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", features = [
"ser",
], rev = "27abb8e16003a50c720f00d6c85f41f5fa2a2a8e" }
prost = "0.13"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"

View File

@@ -612,8 +612,8 @@ impl PromPlanner {
// transform function arguments
let args = self.create_function_args(&args.args)?;
let input = if let Some(prom_expr) = args.input {
self.prom_expr_to_plan(&prom_expr, session_state).await?
let input = if let Some(prom_expr) = &args.input {
self.prom_expr_to_plan(prom_expr, session_state).await?
} else {
self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string());
self.ctx.reset_table_name_and_schema();
@@ -631,17 +631,43 @@ impl PromPlanner {
),
})
};
let mut func_exprs = self.create_function_expr(func, args.literals, session_state)?;
let mut func_exprs =
self.create_function_expr(func, args.literals.clone(), session_state)?;
func_exprs.insert(0, self.create_time_index_column_expr()?);
func_exprs.extend_from_slice(&self.create_tag_column_exprs()?);
LogicalPlanBuilder::from(input)
let builder = LogicalPlanBuilder::from(input)
.project(func_exprs)
.context(DataFusionPlanningSnafu)?
.filter(self.create_empty_values_filter_expr()?)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)
.context(DataFusionPlanningSnafu)?;
let builder = match func.name {
"sort" => builder
.sort(self.create_field_columns_sort_exprs(true))
.context(DataFusionPlanningSnafu)?,
"sort_desc" => builder
.sort(self.create_field_columns_sort_exprs(false))
.context(DataFusionPlanningSnafu)?,
"sort_by_label" => builder
.sort(Self::create_sort_exprs_by_tags(
func.name,
args.literals,
true,
)?)
.context(DataFusionPlanningSnafu)?,
"sort_by_label_desc" => builder
.sort(Self::create_sort_exprs_by_tags(
func.name,
args.literals,
false,
)?)
.context(DataFusionPlanningSnafu)?,
_ => builder,
};
builder.build().context(DataFusionPlanningSnafu)
}
async fn prom_ext_expr_to_plan(
@@ -1432,6 +1458,16 @@ impl PromPlanner {
ScalarFunc::GeneratedExpr
}
"sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => {
// These functions are not expressions but part of the plan;
// they are handled by `prom_call_expr_to_plan`.
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}
ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1691,6 +1727,37 @@ impl PromPlanner {
Ok(result)
}
/// Builds one sort expression for every field column tracked in the planner context.
///
/// The expressions are returned in the same order as `self.ctx.field_columns`.
/// `asc` selects ascending (`true`) or descending (`false`) ordering; the
/// `nulls_first` flag passed to `sort` is always `false`.
fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec<SortExpr> {
    let field_columns = &self.ctx.field_columns;
    let mut sort_exprs = Vec::with_capacity(field_columns.len());
    for name in field_columns {
        let column = DfExpr::Column(Column::from_name(name));
        sort_exprs.push(column.sort(asc, false));
    }
    sort_exprs
}
/// Converts label-name literals into sort expressions over the columns they name.
///
/// Each entry of `tags` must be a non-null UTF-8 string literal; its value is
/// used as the column name to sort by. The resulting expressions keep the order
/// of `tags`, use `asc` as the direction, and pass `nulls_first = false`.
///
/// # Errors
///
/// * Fails with the error built from `FunctionInvalidArgumentSnafu` (carrying
///   `func` as the function name) when `tags` is empty.
/// * Fails with the error built from `UnexpectedPlanExprSnafu` on the first
///   entry that is not a `Utf8(Some(_))` literal.
fn create_sort_exprs_by_tags(
    func: &str,
    tags: Vec<DfExpr>,
    asc: bool,
) -> Result<Vec<SortExpr>> {
    ensure!(
        !tags.is_empty(),
        FunctionInvalidArgumentSnafu { fn_name: func }
    );

    let mut sort_exprs = Vec::with_capacity(tags.len());
    for tag in &tags {
        if let DfExpr::Literal(ScalarValue::Utf8(Some(label))) = tag {
            let column = DfExpr::Column(Column::from_name(label));
            sort_exprs.push(column.sort(asc, false));
        } else {
            // Stop at the first argument that is not a plain string literal.
            return UnexpectedPlanExprSnafu {
                desc: format!("expected label string literal, but found {:?}", tag),
            }
            .fail();
        }
    }
    Ok(sort_exprs)
}
fn create_empty_values_filter_expr(&self) -> Result<DfExpr> {
let mut exprs = Vec::with_capacity(self.ctx.field_columns.len());
for value in &self.ctx.field_columns {

View File

@@ -0,0 +1,154 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
Affected Rows: 0
INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2', 3),
(5000, 'host2', 'idc2', 4),
(10000, 'host1', 'idc3', 5),
(10000, 'host2', 'idc3', 6),
(15000, 'host1', 'idc4', 7),
(15000, 'host2', 'idc4', 8);
Affected Rows: 8
TQL EVAL (0, 15, '5s') sort(test{host="host1"});
+---------------------+-----+-------+------+
| ts | val | host | idc |
+---------------------+-----+-------+------+
| 1970-01-01T00:00:00 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 3 | host1 | idc2 |
| 1970-01-01T00:00:15 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 7 | host1 | idc4 |
+---------------------+-----+-------+------+
TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"});
+---------------------+-----+-------+------+
| ts | val | host | idc |
+---------------------+-----+-------+------+
| 1970-01-01T00:00:15 | 7 | host1 | idc4 |
| 1970-01-01T00:00:10 | 5 | host1 | idc3 |
| 1970-01-01T00:00:15 | 5 | host1 | idc3 |
| 1970-01-01T00:00:05 | 3 | host1 | idc2 |
| 1970-01-01T00:00:10 | 3 | host1 | idc2 |
| 1970-01-01T00:00:15 | 3 | host1 | idc2 |
| 1970-01-01T00:00:00 | 1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 1 | host1 | idc1 |
+---------------------+-----+-------+------+
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc));
+---------------------+---------------+------+
| ts | sum(test.val) | idc |
+---------------------+---------------+------+
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 6 | idc3 |
|timestamp | 6 | idc3 |
|timestamp | 8 | idc4 |
+---------------------+---------------+------+
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc));
+---------------------+---------------+------+
| ts | sum(test.val) | idc |
+---------------------+---------------+------+
|timestamp | 8 | idc4 |
|timestamp | 6 | idc3 |
|timestamp | 6 | idc3 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 4 | idc2 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
|timestamp | 2 | idc1 |
+---------------------+---------------+------+
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host");
+---------------------+---------------+------+-------+
| ts | sum(test.val) | idc | host |
+---------------------+---------------+------+-------+
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc4 | host1 |
|timestamp |val | idc4 | host2 |
+---------------------+---------------+------+-------+
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host");
+---------------------+---------------+------+-------+
| ts | sum(test.val) | idc | host |
+---------------------+---------------+------+-------+
|timestamp |val | idc4 | host2 |
|timestamp |val | idc4 | host1 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host2 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc3 | host1 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host2 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc2 | host1 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host2 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
|timestamp |val | idc1 | host1 |
+---------------------+---------------+------+-------+
drop table test;
Affected Rows: 0

View File

@@ -0,0 +1,38 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);
INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2', 3),
(5000, 'host2', 'idc2', 4),
(10000, 'host1', 'idc3', 5),
(10000, 'host2', 'idc3', 6),
(15000, 'host1', 'idc4', 7),
(15000, 'host2', 'idc4', 8);
TQL EVAL (0, 15, '5s') sort(test{host="host1"});
TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"});
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc));
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc));
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host");
-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp
-- SQLNESS REPLACE (\s\d\s) val
TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host");
drop table test;