diff --git a/Cargo.lock b/Cargo.lock index ae69448c6c..123bdaadec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12675,6 +12675,7 @@ dependencies = [ "jsonb", "lazy_static", "pretty_assertions", + "promql-parser", "regex", "serde", "serde_json", diff --git a/src/catalog/src/system_schema/information_schema/flows.rs b/src/catalog/src/system_schema/information_schema/flows.rs index 531fb0a5ce..5a6c3c755e 100644 --- a/src/catalog/src/system_schema/information_schema/flows.rs +++ b/src/catalog/src/system_schema/information_schema/flows.rs @@ -327,19 +327,14 @@ impl InformationSchemaFlowsBuilder { })); let mut source_table_names = vec![]; - let catalog_name = self.catalog_name.clone(); let catalog_manager = self .catalog_manager .upgrade() .context(UpgradeWeakCatalogManagerRefSnafu)?; - for schema_name in catalog_manager.schema_names(&catalog_name, None).await? { - source_table_names.extend( - catalog_manager - .tables_by_ids(&catalog_name, &schema_name, flow_info.source_table_ids()) - .await? - .into_iter() - .map(|table| table.table_info().full_table_name()), - ); + for table_id in flow_info.source_table_ids() { + if let Some(table_info) = catalog_manager.table_info_by_id(*table_id).await? { + source_table_names.push(table_info.full_table_name()); + } } let source_table_names = source_table_names.join(","); diff --git a/src/operator/src/expr_helper.rs b/src/operator/src/expr_helper.rs index 17e6522894..968ec74b6f 100644 --- a/src/operator/src/expr_helper.rs +++ b/src/operator/src/expr_helper.rs @@ -1116,11 +1116,51 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http "greptime.public.cnt_reqs", expr.sink_table_name.map(to_dot_sep).unwrap() ); - assert!(expr.source_table_names.is_empty()); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.public.http_requests", + to_dot_sep(expr.source_table_names[0].clone()) + ); assert_eq!( r#"TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests)"#, expr.sql ); + + let sql = r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.greptime_private.http_requests", + to_dot_sep(expr.source_table_names[0].clone()) + ); + + let sql = r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__database__="greptime_private"});"#; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + let Statement::CreateFlow(create_flow) = stmt else { + unreachable!() + }; + let expr = to_create_flow_task_expr(create_flow, &QueryContext::arc()).unwrap(); + assert_eq!(1, expr.source_table_names.len()); + assert_eq!( + "greptime.greptime_private.http_requests", + to_dot_sep(expr.source_table_names[0].clone()) + ); } #[test] diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 7d7e0bf1da..1ab966fc7e 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -36,6 +36,7 @@ iso8601 = "0.6.1" itertools.workspace = true jsonb.workspace = true lazy_static.workspace = true +promql-parser.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index 4134c8bbf6..1f533468e0 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -16,17 +16,28 @@ use std::collections::HashSet; use std::fmt::{Display, Formatter}; use itertools::Itertools; +use promql_parser::label::{METRIC_NAME, MatchOp}; +use promql_parser::parser::{ + AggregateExpr as PromAggregateExpr, BinaryExpr as PromBinaryExpr, Call as PromCall, + Expr as PromExpr, MatrixSelector as PromMatrixSelector, ParenExpr as PromParenExpr, + SubqueryExpr as PromSubqueryExpr, UnaryExpr as PromUnaryExpr, + VectorSelector as PromVectorSelector, +}; use serde::Serialize; use snafu::ensure; use sqlparser::ast::{ - Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value, - ValueWithSpan, + Array, Expr, Ident, ObjectName, ObjectNamePart, SetExpr, SqlOption, StructField, TableFactor, + Value, ValueWithSpan, }; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::ObjectNamePartExt; use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result}; use crate::statements::create::SqlOrTql; +use crate::statements::tql::Tql; + +const SCHEMA_MATCHER: &str = "__schema__"; +const DATABASE_MATCHER: &str = "__database__"; /// Format an [ObjectName] without any quote of its idents. pub fn format_raw_object_name(name: &ObjectName) -> String { @@ -181,15 +192,90 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator extract_tables_from_set_expr(&query.body, &mut names), - SqlOrTql::Tql(_tql, _) => { - // since tql have sliding time window, so we don't need to extract tables from it - // (because we are going to eval it fully anyway) - } + SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names), } names.into_iter() } +fn extract_tables_from_tql(tql: &Tql, names: &mut HashSet) { + let promql = match tql { + Tql::Eval(eval) => &eval.query, + Tql::Explain(explain) => &explain.query, + Tql::Analyze(analyze) => &analyze.query, + }; + + if let Ok(expr) = promql_parser::parser::parse(promql) { + extract_tables_from_prom_expr(&expr, names); + } +} + +fn extract_tables_from_prom_expr(expr: &PromExpr, names: &mut HashSet) { + match expr { + PromExpr::Aggregate(PromAggregateExpr { expr, .. }) => { + extract_tables_from_prom_expr(expr, names); + } + PromExpr::Unary(PromUnaryExpr { expr, .. }) => { + extract_tables_from_prom_expr(expr, names); + } + PromExpr::Binary(PromBinaryExpr { lhs, rhs, .. }) => { + extract_tables_from_prom_expr(lhs, names); + extract_tables_from_prom_expr(rhs, names); + } + PromExpr::Paren(PromParenExpr { expr }) => { + extract_tables_from_prom_expr(expr, names); + } + PromExpr::Subquery(PromSubqueryExpr { expr, .. }) => { + extract_tables_from_prom_expr(expr, names); + } + PromExpr::VectorSelector(selector) => { + extract_metric_name_from_vector_selector(selector, names); + } + PromExpr::MatrixSelector(PromMatrixSelector { vs, .. }) => { + extract_metric_name_from_vector_selector(vs, names); + } + PromExpr::Call(PromCall { args, .. }) => { + for arg in &args.args { + extract_tables_from_prom_expr(arg, names); + } + } + PromExpr::NumberLiteral(_) | PromExpr::StringLiteral(_) | PromExpr::Extension(_) => {} + } +} + +fn extract_metric_name_from_vector_selector( + selector: &PromVectorSelector, + names: &mut HashSet, +) { + let metric_name = selector.name.clone().or_else(|| { + let mut metric_name_matchers = selector.matchers.find_matchers(METRIC_NAME); + if metric_name_matchers.len() == 1 && metric_name_matchers[0].op == MatchOp::Equal { + metric_name_matchers.pop().map(|matcher| matcher.value) + } else { + None + } + }); + let Some(metric_name) = metric_name else { + return; + }; + + let schema_matcher = selector.matchers.matchers.iter().rev().find(|matcher| { + matcher.op == MatchOp::Equal + && (matcher.name == SCHEMA_MATCHER || matcher.name == DATABASE_MATCHER) + }); + + if let Some(schema) = schema_matcher { + names.insert(ObjectName(vec![ + ObjectNamePart::Identifier(Ident::new(&schema.value)), + ObjectNamePart::Identifier(Ident::new(metric_name)), + ])); + } else { + names.insert(ObjectName(vec![ObjectNamePart::Identifier(Ident::new( + metric_name, + ))])); + } +} + /// translate the start location to the index in the sql string pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) -> usize { let mut index = 0; @@ -245,7 +331,8 @@ mod tests { use super::*; use crate::dialect::GreptimeDbDialect; - use crate::parser::ParserContext; + use crate::parser::{ParseOptions, ParserContext}; + use crate::statements::statement::Statement; #[test] fn test_location_to_index() { @@ -287,4 +374,59 @@ WHERE a = } } } + + #[test] + fn test_extract_tables_from_tql_query() { + let testcases = vec![ + ( + r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#, + vec!["http_requests".to_string()], + ), + ( + r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", {__name__="http_requests"});"#, + vec!["http_requests".to_string()], + ), + ]; + + for (sql, expected_tables) in testcases { + let mut stmts = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap(); + let Statement::CreateFlow(create_flow) = stmts.pop().unwrap() else { + unreachable!() + }; + + let mut tables = extract_tables_from_query(&create_flow.query) + .map(|table| format_raw_object_name(&table)) + .collect_vec(); + tables.sort(); + assert_eq!(expected_tables, tables); + } + } + + #[test] + fn test_extract_tables_from_tql_query_with_schema_matcher() { + let sql = r#" +CREATE FLOW calc_reqs SINK TO cnt_reqs AS +TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + let Statement::CreateFlow(create_flow) = stmts.pop().unwrap() else { + unreachable!() + }; + + let mut tables = extract_tables_from_query(&create_flow.query) + .map(|table| format_raw_object_name(&table)) + .collect_vec(); + tables.sort(); + assert_eq!(vec!["greptime_private.http_requests".to_string()], tables); + } } diff --git a/tests/cases/distributed/flow-tql/flow_tql.result b/tests/cases/distributed/flow-tql/flow_tql.result index 7e56009552..040ca34dbc 100644 --- a/tests/cases/distributed/flow-tql/flow_tql.result +++ b/tests/cases/distributed/flow-tql/flow_tql.result @@ -13,6 +13,14 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_ Affected Rows: 0 +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_reqs'; + ++-------------------------------+ +| source_table_names | ++-------------------------------+ +| greptime.public.http_requests | ++-------------------------------+ + SHOW CREATE TABLE cnt_reqs; +----------+---------------------------------------------------+ @@ -380,6 +388,29 @@ SELECT count(*)>0 FROM rate_reqs; | true | +---------------------+ +CREATE FLOW calc_rate_by_matcher +SINK TO rate_reqs_by_matcher +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate({__name__="http_requests_total",job="my_service"}[1m]); + +Affected Rows: 0 + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_rate_by_matcher'; + ++-------------------------------------+ +| source_table_names | ++-------------------------------------+ +| greptime.public.http_requests_total | ++-------------------------------------+ + +DROP FLOW calc_rate_by_matcher; + +Affected Rows: 0 + +DROP TABLE rate_reqs_by_matcher; + +Affected Rows: 0 + DROP FLOW calc_rate; Affected Rows: 0 diff --git a/tests/cases/standalone/flow-tql/flow_tql.result b/tests/cases/standalone/flow-tql/flow_tql.result index 72fe7759ae..50e1d000bc 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.result +++ b/tests/cases/standalone/flow-tql/flow_tql.result @@ -13,6 +13,14 @@ TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_ Affected Rows: 0 +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_reqs'; + ++-------------------------------+ +| source_table_names | ++-------------------------------+ +| greptime.public.http_requests | ++-------------------------------+ + SHOW CREATE TABLE cnt_reqs; +----------+---------------------------------------------------+ @@ -380,6 +388,29 @@ SELECT count(*)>0 FROM rate_reqs; | true | +---------------------+ +CREATE FLOW calc_rate_by_matcher +SINK TO rate_reqs_by_matcher +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate({__name__="http_requests_total",job="my_service"}[1m]); + +Affected Rows: 0 + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_rate_by_matcher'; + ++-------------------------------------+ +| source_table_names | ++-------------------------------------+ +| greptime.public.http_requests_total | ++-------------------------------------+ + +DROP FLOW calc_rate_by_matcher; + +Affected Rows: 0 + +DROP TABLE rate_reqs_by_matcher; + +Affected Rows: 0 + DROP FLOW calc_rate; Affected Rows: 0 diff --git a/tests/cases/standalone/flow-tql/flow_tql.sql b/tests/cases/standalone/flow-tql/flow_tql.sql index c32c10ff3a..1eb016011c 100644 --- a/tests/cases/standalone/flow-tql/flow_tql.sql +++ b/tests/cases/standalone/flow-tql/flow_tql.sql @@ -9,6 +9,8 @@ CREATE TABLE http_requests ( CREATE FLOW calc_reqs SINK TO cnt_reqs EVAL INTERVAL '1m' AS TQL EVAL (now() - '1m'::interval, now(), '5s') count_values("status_code", http_requests); +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_reqs'; + SHOW CREATE TABLE cnt_reqs; -- test if sink table is tql queryable @@ -177,6 +179,16 @@ ADMIN FLUSH_FLOW('calc_rate'); SELECT count(*)>0 FROM rate_reqs; +CREATE FLOW calc_rate_by_matcher +SINK TO rate_reqs_by_matcher +EVAL INTERVAL '1m' AS +TQL EVAL (now() - '1m'::interval, now(), '30s') rate({__name__="http_requests_total",job="my_service"}[1m]); + +SELECT source_table_names FROM information_schema.flows WHERE flow_name = 'calc_rate_by_matcher'; + +DROP FLOW calc_rate_by_matcher; +DROP TABLE rate_reqs_by_matcher; + DROP FLOW calc_rate; DROP TABLE http_requests_total; DROP TABLE rate_reqs;