diff --git a/Cargo.lock b/Cargo.lock index 42857f1e18..802f2567d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8756,8 +8756,7 @@ dependencies = [ [[package]] name = "promql-parser" version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fe99e6f80a79abccf1e8fb48dd63473a36057e600cc6ea36147c8318698ae6f" +source = "git+https://github.com/GreptimeTeam/promql-parser.git?rev=27abb8e16003a50c720f00d6c85f41f5fa2a2a8e#27abb8e16003a50c720f00d6c85f41f5fa2a2a8e" dependencies = [ "cfgrammar", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 5b9f893f19..c74e743ad9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -160,7 +160,9 @@ parquet = { version = "53.0.0", default-features = false, features = ["arrow", " paste = "1.0" pin-project = "1.0" prometheus = { version = "0.13.3", features = ["process"] } -promql-parser = { version = "0.4.3", features = ["ser"] } +promql-parser = { git = "https://github.com/GreptimeTeam/promql-parser.git", features = [ + "ser", +], rev = "27abb8e16003a50c720f00d6c85f41f5fa2a2a8e" } prost = "0.13" raft-engine = { version = "0.4.1", default-features = false } rand = "0.8" diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index f233f75080..42bf447e95 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -612,8 +612,8 @@ impl PromPlanner { // transform function arguments let args = self.create_function_args(&args.args)?; - let input = if let Some(prom_expr) = args.input { - self.prom_expr_to_plan(&prom_expr, session_state).await? + let input = if let Some(prom_expr) = &args.input { + self.prom_expr_to_plan(prom_expr, session_state).await? } else { self.ctx.time_index_column = Some(SPECIAL_TIME_FUNCTION.to_string()); self.ctx.reset_table_name_and_schema(); @@ -631,17 +631,43 @@ impl PromPlanner { ), }) }; - let mut func_exprs = self.create_function_expr(func, args.literals, session_state)?; + let mut func_exprs = + self.create_function_expr(func, args.literals.clone(), session_state)?; func_exprs.insert(0, self.create_time_index_column_expr()?); func_exprs.extend_from_slice(&self.create_tag_column_exprs()?); - LogicalPlanBuilder::from(input) + let builder = LogicalPlanBuilder::from(input) .project(func_exprs) .context(DataFusionPlanningSnafu)? .filter(self.create_empty_values_filter_expr()?) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu) + .context(DataFusionPlanningSnafu)?; + + let builder = match func.name { + "sort" => builder + .sort(self.create_field_columns_sort_exprs(true)) + .context(DataFusionPlanningSnafu)?, + "sort_desc" => builder + .sort(self.create_field_columns_sort_exprs(false)) + .context(DataFusionPlanningSnafu)?, + "sort_by_label" => builder + .sort(Self::create_sort_exprs_by_tags( + func.name, + args.literals, + true, + )?) + .context(DataFusionPlanningSnafu)?, + "sort_by_label_desc" => builder + .sort(Self::create_sort_exprs_by_tags( + func.name, + args.literals, + false, + )?) + .context(DataFusionPlanningSnafu)?, + + _ => builder, + }; + + builder.build().context(DataFusionPlanningSnafu) } async fn prom_ext_expr_to_plan( @@ -1432,6 +1458,16 @@ impl PromPlanner { ScalarFunc::GeneratedExpr } + "sort" | "sort_desc" | "sort_by_label" | "sort_by_label_desc" => { + // These functions are not expression but a part of plan, + // they are processed by `prom_call_expr_to_plan`. + for value in &self.ctx.field_columns { + let expr = DfExpr::Column(Column::from_name(value)); + exprs.push(expr); + } + + ScalarFunc::GeneratedExpr + } _ => { if let Some(f) = session_state.scalar_functions().get(func.name) { ScalarFunc::DataFusionBuiltin(f.clone()) @@ -1691,6 +1727,37 @@ impl PromPlanner { Ok(result) } + fn create_field_columns_sort_exprs(&self, asc: bool) -> Vec { + self.ctx + .field_columns + .iter() + .map(|col| DfExpr::Column(Column::from_name(col)).sort(asc, false)) + .collect::>() + } + + fn create_sort_exprs_by_tags( + func: &str, + tags: Vec, + asc: bool, + ) -> Result> { + ensure!( + !tags.is_empty(), + FunctionInvalidArgumentSnafu { fn_name: func } + ); + + tags.iter() + .map(|col| match col { + DfExpr::Literal(ScalarValue::Utf8(Some(label))) => { + Ok(DfExpr::Column(Column::from_name(label)).sort(asc, false)) + } + other => UnexpectedPlanExprSnafu { + desc: format!("expected label string literal, but found {:?}", other), + } + .fail(), + }) + .collect::>>() + } + fn create_empty_values_filter_expr(&self) -> Result { let mut exprs = Vec::with_capacity(self.ctx.field_columns.len()); for value in &self.ctx.field_columns { diff --git a/tests/cases/standalone/common/promql/sort.result b/tests/cases/standalone/common/promql/sort.result new file mode 100644 index 0000000000..ce878baa5a --- /dev/null +++ b/tests/cases/standalone/common/promql/sort.result @@ -0,0 +1,154 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +Affected Rows: 0 + +INSERT INTO TABLE test VALUES + (0, 'host1', 'idc1', 1), + (0, 'host2', 'idc1', 2), + (5000, 'host1', 'idc2', 3), + (5000, 'host2', 'idc2', 4), + (10000, 'host1', 'idc3', 5), + (10000, 'host2', 'idc3', 6), + (15000, 'host1', 'idc4', 7), + (15000, 'host2', 'idc4', 8); + +Affected Rows: 8 + +TQL EVAL (0, 15, '5s') sort(test{host="host1"}); + ++---------------------+-----+-------+------+ +| ts | val | host | idc | ++---------------------+-----+-------+------+ +| 1970-01-01T00:00:00 | 1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 3 | host1 | idc2 | +| 1970-01-01T00:00:10 | 3 | host1 | idc2 | +| 1970-01-01T00:00:15 | 3 | host1 | idc2 | +| 1970-01-01T00:00:10 | 5 | host1 | idc3 | +| 1970-01-01T00:00:15 | 5 | host1 | idc3 | +| 1970-01-01T00:00:15 | 7 | host1 | idc4 | ++---------------------+-----+-------+------+ + +TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"}); + ++---------------------+-----+-------+------+ +| ts | val | host | idc | ++---------------------+-----+-------+------+ +| 1970-01-01T00:00:15 | 7 | host1 | idc4 | +| 1970-01-01T00:00:10 | 5 | host1 | idc3 | +| 1970-01-01T00:00:15 | 5 | host1 | idc3 | +| 1970-01-01T00:00:05 | 3 | host1 | idc2 | +| 1970-01-01T00:00:10 | 3 | host1 | idc2 | +| 1970-01-01T00:00:15 | 3 | host1 | idc2 | +| 1970-01-01T00:00:00 | 1 | host1 | idc1 | +| 1970-01-01T00:00:05 | 1 | host1 | idc1 | +| 1970-01-01T00:00:10 | 1 | host1 | idc1 | +| 1970-01-01T00:00:15 | 1 | host1 | idc1 | ++---------------------+-----+-------+------+ + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc)); + ++---------------------+---------------+------+ +| ts | sum(test.val) | idc | ++---------------------+---------------+------+ +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | +|timestamp | 4 | idc2 | +|timestamp | 4 | idc2 | +|timestamp | 4 | idc2 | +|timestamp | 6 | idc3 | +|timestamp | 6 | idc3 | +|timestamp | 8 | idc4 | ++---------------------+---------------+------+ + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc)); + ++---------------------+---------------+------+ +| ts | sum(test.val) | idc | ++---------------------+---------------+------+ +|timestamp | 8 | idc4 | +|timestamp | 6 | idc3 | +|timestamp | 6 | idc3 | +|timestamp | 4 | idc2 | +|timestamp | 4 | idc2 | +|timestamp | 4 | idc2 | +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | +|timestamp | 2 | idc1 | ++---------------------+---------------+------+ + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +-- SQLNESS REPLACE (\s\d\s) val +TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host"); + ++---------------------+---------------+------+-------+ +| ts | sum(test.val) | idc | host | ++---------------------+---------------+------+-------+ +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc3 | host1 | +|timestamp |val | idc3 | host1 | +|timestamp |val | idc3 | host2 | +|timestamp |val | idc3 | host2 | +|timestamp |val | idc4 | host1 | +|timestamp |val | idc4 | host2 | ++---------------------+---------------+------+-------+ + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +-- SQLNESS REPLACE (\s\d\s) val +TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host"); + ++---------------------+---------------+------+-------+ +| ts | sum(test.val) | idc | host | ++---------------------+---------------+------+-------+ +|timestamp |val | idc4 | host2 | +|timestamp |val | idc4 | host1 | +|timestamp |val | idc3 | host2 | +|timestamp |val | idc3 | host2 | +|timestamp |val | idc3 | host1 | +|timestamp |val | idc3 | host1 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc2 | host2 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc2 | host1 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host2 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | +|timestamp |val | idc1 | host1 | ++---------------------+---------------+------+-------+ + +drop table test; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/sort.sql b/tests/cases/standalone/common/promql/sort.sql new file mode 100644 index 0000000000..83e34a2f5b --- /dev/null +++ b/tests/cases/standalone/common/promql/sort.sql @@ -0,0 +1,38 @@ +CREATE TABLE test ( + ts timestamp(3) time index, + host STRING, + idc STRING, + val BIGINT, + PRIMARY KEY(host, idc), +); + +INSERT INTO TABLE test VALUES + (0, 'host1', 'idc1', 1), + (0, 'host2', 'idc1', 2), + (5000, 'host1', 'idc2', 3), + (5000, 'host2', 'idc2', 4), + (10000, 'host1', 'idc3', 5), + (10000, 'host2', 'idc3', 6), + (15000, 'host1', 'idc4', 7), + (15000, 'host2', 'idc4', 8); + + +TQL EVAL (0, 15, '5s') sort(test{host="host1"}); + +TQL EVAL (0, 15, '5s') sort_desc(test{host="host1"}); + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +TQL EVAL (0, 15, '5s') sort(sum(test{host="host2"}) by (idc)); + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +TQL EVAL (0, 15, '5s') sort_desc(sum(test{host="host2"}) by (idc)); + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +-- SQLNESS REPLACE (\s\d\s) val +TQL EVAL (0, 15, '5s') sort_by_label(sum(test) by (idc, host), "idc", "host"); + +-- SQLNESS REPLACE (\s1970-01-01T\d\d:\d\d:\d\d) timestamp +-- SQLNESS REPLACE (\s\d\s) val +TQL EVAL (0, 15, '5s') sort_by_label_desc(sum(test) by (idc, host), "idc", "host"); + +drop table test;