feat: track flow source tables for TQL and info schema (#7697)

* feat: track flow source tables for TQL and info schema

* handle schema matcher

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* sqlness tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* cover __name__ case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-02-11 11:19:09 +08:00
committed by GitHub
parent 43afb7962a
commit db46849f40
8 changed files with 270 additions and 17 deletions

View File

@@ -36,6 +36,7 @@ iso8601 = "0.6.1"
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
promql-parser.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true

View File

@@ -16,17 +16,28 @@ use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use itertools::Itertools;
use promql_parser::label::{METRIC_NAME, MatchOp};
use promql_parser::parser::{
AggregateExpr as PromAggregateExpr, BinaryExpr as PromBinaryExpr, Call as PromCall,
Expr as PromExpr, MatrixSelector as PromMatrixSelector, ParenExpr as PromParenExpr,
SubqueryExpr as PromSubqueryExpr, UnaryExpr as PromUnaryExpr,
VectorSelector as PromVectorSelector,
};
use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
Array, Expr, Ident, ObjectName, SetExpr, SqlOption, StructField, TableFactor, Value,
ValueWithSpan,
Array, Expr, Ident, ObjectName, ObjectNamePart, SetExpr, SqlOption, StructField, TableFactor,
Value, ValueWithSpan,
};
use sqlparser_derive::{Visit, VisitMut};
use crate::ast::ObjectNamePartExt;
use crate::error::{InvalidExprAsOptionValueSnafu, InvalidSqlSnafu, Result};
use crate::statements::create::SqlOrTql;
use crate::statements::tql::Tql;
const SCHEMA_MATCHER: &str = "__schema__";
const DATABASE_MATCHER: &str = "__database__";
/// Format an [ObjectName] without any quote of its idents.
pub fn format_raw_object_name(name: &ObjectName) -> String {
@@ -181,15 +192,90 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = Objec
match query {
SqlOrTql::Sql(query, _) => extract_tables_from_set_expr(&query.body, &mut names),
SqlOrTql::Tql(_tql, _) => {
// since tql have sliding time window, so we don't need to extract tables from it
// (because we are going to eval it fully anyway)
}
SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names),
}
names.into_iter()
}
/// Collects the tables referenced by the PromQL query carried in a TQL statement.
///
/// The query text is taken from whichever TQL variant is given (`EVAL`, `EXPLAIN`
/// or `ANALYZE`) and parsed as PromQL. Extraction is best-effort: when the text
/// does not parse, `names` is left untouched and no error is reported.
fn extract_tables_from_tql(tql: &Tql, names: &mut HashSet<ObjectName>) {
    let query_text = match tql {
        Tql::Eval(stmt) => &stmt.query,
        Tql::Explain(stmt) => &stmt.query,
        Tql::Analyze(stmt) => &stmt.query,
    };

    // A query that is not valid PromQL simply contributes no tables.
    let Ok(parsed) = promql_parser::parser::parse(query_text) else {
        return;
    };
    extract_tables_from_prom_expr(&parsed, names);
}
/// Recursively walks a PromQL expression and records every table (metric) it
/// selects from into `names`.
///
/// Wrapper nodes (aggregate, unary, binary, paren, subquery, call) are descended
/// into; vector and matrix selectors are the leaves that actually name a table.
/// Literals and extension nodes contribute nothing.
fn extract_tables_from_prom_expr(expr: &PromExpr, names: &mut HashSet<ObjectName>) {
    match expr {
        PromExpr::Aggregate(PromAggregateExpr { expr, param, .. }) => {
            // The aggregation parameter is an expression too and may itself
            // select from a metric, e.g. `topk(scalar(limit), requests)`, so it
            // has to be walked in addition to the aggregated expression.
            if let Some(param) = param {
                extract_tables_from_prom_expr(param, names);
            }
            extract_tables_from_prom_expr(expr, names);
        }
        PromExpr::Unary(PromUnaryExpr { expr, .. }) => {
            extract_tables_from_prom_expr(expr, names);
        }
        PromExpr::Binary(PromBinaryExpr { lhs, rhs, .. }) => {
            extract_tables_from_prom_expr(lhs, names);
            extract_tables_from_prom_expr(rhs, names);
        }
        PromExpr::Paren(PromParenExpr { expr }) => {
            extract_tables_from_prom_expr(expr, names);
        }
        PromExpr::Subquery(PromSubqueryExpr { expr, .. }) => {
            extract_tables_from_prom_expr(expr, names);
        }
        PromExpr::VectorSelector(selector) => {
            extract_metric_name_from_vector_selector(selector, names);
        }
        // A matrix selector wraps a vector selector plus a range; only the
        // inner selector names a table.
        PromExpr::MatrixSelector(PromMatrixSelector { vs, .. }) => {
            extract_metric_name_from_vector_selector(vs, names);
        }
        PromExpr::Call(PromCall { args, .. }) => {
            for arg in &args.args {
                extract_tables_from_prom_expr(arg, names);
            }
        }
        // Leaves that cannot reference a table.
        PromExpr::NumberLiteral(_) | PromExpr::StringLiteral(_) | PromExpr::Extension(_) => {}
    }
}
fn extract_metric_name_from_vector_selector(
selector: &PromVectorSelector,
names: &mut HashSet<ObjectName>,
) {
let metric_name = selector.name.clone().or_else(|| {
let mut metric_name_matchers = selector.matchers.find_matchers(METRIC_NAME);
if metric_name_matchers.len() == 1 && metric_name_matchers[0].op == MatchOp::Equal {
metric_name_matchers.pop().map(|matcher| matcher.value)
} else {
None
}
});
let Some(metric_name) = metric_name else {
return;
};
let schema_matcher = selector.matchers.matchers.iter().rev().find(|matcher| {
matcher.op == MatchOp::Equal
&& (matcher.name == SCHEMA_MATCHER || matcher.name == DATABASE_MATCHER)
});
if let Some(schema) = schema_matcher {
names.insert(ObjectName(vec![
ObjectNamePart::Identifier(Ident::new(&schema.value)),
ObjectNamePart::Identifier(Ident::new(metric_name)),
]));
} else {
names.insert(ObjectName(vec![ObjectNamePart::Identifier(Ident::new(
metric_name,
))]));
}
}
/// translate the start location to the index in the sql string
pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) -> usize {
let mut index = 0;
@@ -245,7 +331,8 @@ mod tests {
use super::*;
use crate::dialect::GreptimeDbDialect;
use crate::parser::ParserContext;
use crate::parser::{ParseOptions, ParserContext};
use crate::statements::statement::Statement;
#[test]
fn test_location_to_index() {
@@ -287,4 +374,59 @@ WHERE a =
}
}
}
/// Source tables are extracted from a TQL flow query both when the metric is
/// written as a bare selector name and when it is given via a `__name__` matcher.
#[test]
fn test_extract_tables_from_tql_query() {
    // Each case pairs a `CREATE FLOW ... TQL EVAL` statement with the sorted
    // list of table names expected to be extracted from it.
    let testcases = [
        (
            r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests);"#,
            vec!["http_requests".to_string()],
        ),
        (
            r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", {__name__="http_requests"});"#,
            vec!["http_requests".to_string()],
        ),
    ];

    for (sql, expected_tables) in testcases {
        let statement = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        )
        .unwrap()
        .pop()
        .unwrap();
        let Statement::CreateFlow(create_flow) = statement else {
            unreachable!()
        };

        // Sort the extracted names so the comparison does not depend on set order.
        let tables = extract_tables_from_query(&create_flow.query)
            .map(|table| format_raw_object_name(&table))
            .sorted()
            .collect_vec();
        assert_eq!(expected_tables, tables);
    }
}
/// A `__schema__` equality matcher on the selector qualifies the extracted
/// table name with that schema.
#[test]
fn test_extract_tables_from_tql_query_with_schema_matcher() {
    let sql = r#"
CREATE FLOW calc_reqs SINK TO cnt_reqs AS
TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", http_requests{__schema__="greptime_private"});"#;

    let statement =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap()
            .pop()
            .unwrap();
    let Statement::CreateFlow(create_flow) = statement else {
        unreachable!()
    };

    // Sort the extracted names so the comparison does not depend on set order.
    let tables = extract_tables_from_query(&create_flow.query)
        .map(|table| format_raw_object_name(&table))
        .sorted()
        .collect_vec();
    assert_eq!(vec!["greptime_private.http_requests".to_string()], tables);
}
}