diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs index 5bd328ad9a..3cd96b7525 100644 --- a/src/flow/src/batching_mode/task.rs +++ b/src/flow/src/batching_mode/task.rs @@ -959,7 +959,7 @@ impl BatchingTask { let (expire_lower_bound, expire_upper_bound) = match (expire_time_window_bound, &self.config.query_type) { (Some((Some(l), Some(u))), QueryType::Sql) => (l, u), - (None, QueryType::Sql) => { + (None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => { // if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns) // use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason debug!( @@ -980,7 +980,8 @@ impl BatchingTask { } _ => { // Clean dirty windows for full-query/non-scoped paths, - // such as TQL, that cannot use a time-window filter. + // such as TQL or evaluation-interval SQL without a recognized + // time-window expression, that cannot use a time-window filter. let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal(); let plan_info = self diff --git a/src/flow/src/batching_mode/task/test.rs b/src/flow/src/batching_mode/task/test.rs index d64b4ef1b9..c42d564ce2 100644 --- a/src/flow/src/batching_mode/task/test.rs +++ b/src/flow/src/batching_mode/task/test.rs @@ -974,6 +974,38 @@ async fn test_non_scoped_path_generates_plan_with_empty_dirty_signal() { assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); } +#[tokio::test] +async fn test_no_time_window_sql_with_eval_interval_generates_plan_without_dirty_signal() { + let TestTaskParts { + mut task, + query_engine, + .. + } = new_test_task_engine_and_plan_with_query( + "SELECT number, ts FROM numbers_with_ts", + "missing_sink", + ) + .await; + Arc::get_mut(&mut task.config) + .expect("test task config should be uniquely owned") + .flow_eval_interval = Some(Duration::from_secs(60)); + task.state.write().unwrap().dirty_time_windows.clean(); + let sink_schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("number", CDT::uint32_datatype(), false), + ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true), + ])); + + let plan = task + .gen_query_with_time_window(query_engine, &sink_schema, &[], false, None) + .await + .unwrap() + .expect( + "eval-interval SQL without a time-window expr should run by interval, not dirty signal", + ); + + assert!(plan.can_advance_checkpoints); + assert!(task.state.read().unwrap().dirty_time_windows.is_empty()); +} + #[tokio::test] async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() { let (task, plan) = new_test_task_and_plan_with_missing_sink().await; diff --git a/src/sql/src/util.rs b/src/sql/src/util.rs index bbf5ce3277..f627c43e48 100644 --- a/src/sql/src/util.rs +++ b/src/sql/src/util.rs @@ -27,7 +27,7 @@ use serde::Serialize; use snafu::ensure; use sqlparser::ast::{ Array, Expr, Ident, ObjectName, ObjectNamePart, SetExpr, SqlOption, StructField, TableFactor, - Value, ValueWithSpan, + TableWithJoins, Value, ValueWithSpan, }; use sqlparser_derive::{Visit, VisitMut}; @@ -195,7 +195,7 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator { - extract_tables_from_set_expr(&query.inner.body, &mut names); + extract_tables_from_sql_query(&query.inner, &mut names); extract_tables_from_hybrid_cte_query(query, &mut names); } SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names), @@ -205,26 +205,34 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator) { - let mut tql_names = HashSet::new(); - let mut cte_names: HashSet = HashSet::new(); if let Some(hybrid_cte) = &query.hybrid_cte { + let mut cte_names: HashSet = hybrid_cte + .cte_tables + .iter() + .map(|cte| ParserContext::canonicalize_identifier(cte.name.clone()).value) + .collect(); + remove_cte_names(sql_names, &cte_names); + + cte_names.clear(); for cte in &hybrid_cte.cte_tables { - cte_names.insert(ParserContext::canonicalize_identifier(cte.name.clone()).value); - if let CteContent::Tql(tql) = &cte.content { - extract_tables_from_tql(tql, &mut tql_names); + let cte_name = ParserContext::canonicalize_identifier(cte.name.clone()).value; + let mut cte_query_names = HashSet::new(); + match &cte.content { + CteContent::Sql(cte_query) => { + extract_tables_from_sql_query(cte_query, &mut cte_query_names) + } + CteContent::Tql(tql) => extract_tables_from_tql(tql, &mut cte_query_names), + } + if hybrid_cte.recursive { + cte_names.insert(cte_name.clone()); + } + remove_cte_names(&mut cte_query_names, &cte_names); + sql_names.extend(cte_query_names); + if !hybrid_cte.recursive { + cte_names.insert(cte_name); } } } - - if let Some(with) = &query.inner.with { - for cte in &with.cte_tables { - cte_names.insert(ParserContext::canonicalize_identifier(cte.alias.name.clone()).value); - } - } - - remove_cte_names(sql_names, &cte_names); - - sql_names.extend(tql_names); } fn remove_cte_names(names: &mut HashSet, cte_names: &HashSet) { @@ -339,6 +347,33 @@ pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) - index - 1 } +/// Helper function for [extract_tables_from_query]. +/// +/// Handle [sqlparser::ast::Query]. +fn extract_tables_from_sql_query(query: &sqlparser::ast::Query, names: &mut HashSet) { + let mut cte_names = HashSet::new(); + if let Some(with) = &query.with { + for cte in &with.cte_tables { + let cte_name = ParserContext::canonicalize_identifier(cte.alias.name.clone()).value; + let mut cte_query_names = HashSet::new(); + extract_tables_from_sql_query(&cte.query, &mut cte_query_names); + if with.recursive { + cte_names.insert(cte_name.clone()); + } + remove_cte_names(&mut cte_query_names, &cte_names); + names.extend(cte_query_names); + if !with.recursive { + cte_names.insert(cte_name); + } + } + } + + let mut body_names = HashSet::new(); + extract_tables_from_set_expr(&query.body, &mut body_names); + remove_cte_names(&mut body_names, &cte_names); + names.extend(body_names); +} + /// Helper function for [extract_tables_from_query]. /// /// Handle [SetExpr]. @@ -346,14 +381,11 @@ fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet { for from in &select.from { - table_factor_to_object_name(&from.relation, names); - for join in &from.joins { - table_factor_to_object_name(&join.relation, names); - } + extract_tables_from_table_with_joins(from, names); } } SetExpr::Query(query) => { - extract_tables_from_set_expr(&query.body, names); + extract_tables_from_sql_query(query, names); } SetExpr::SetOperation { left, right, .. } => { extract_tables_from_set_expr(left, names); @@ -363,12 +395,47 @@ fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet, +) { + table_factor_to_object_name(&table_with_joins.relation, names); + for join in &table_with_joins.joins { + table_factor_to_object_name(&join.relation, names); + } +} + /// Helper function for [extract_tables_from_query]. /// /// Handle [TableFactor]. fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet) { - if let TableFactor::Table { name, .. } = table_factor { - names.insert(name.to_owned()); + match table_factor { + TableFactor::Table { name, .. } => { + names.insert(name.to_owned()); + } + TableFactor::Derived { subquery, .. } => { + extract_tables_from_sql_query(subquery, names); + } + TableFactor::NestedJoin { + table_with_joins, .. + } => { + extract_tables_from_table_with_joins(table_with_joins, names); + } + TableFactor::Pivot { table, .. } + | TableFactor::Unpivot { table, .. } + | TableFactor::MatchRecognize { table, .. } => { + table_factor_to_object_name(table, names); + } + TableFactor::TableFunction { .. } + | TableFactor::Function { .. } + | TableFactor::UNNEST { .. } + | TableFactor::JsonTable { .. } + | TableFactor::OpenJsonTable { .. } + | TableFactor::XmlTable { .. } + | TableFactor::SemanticView { .. } => {} } } @@ -458,6 +525,91 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", {__n } } + #[test] + fn test_extract_tables_from_sql_query_with_derived_join() { + let sql = r#" +CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink +EVAL INTERVAL '1m' AS +SELECT a.symbol, b.mark_price +FROM ( + SELECT inst_id AS symbol, max(ts) AS mark_iv_ts + FROM flow_batch_join_opt_summary + GROUP BY inst_id +) a +LEFT JOIN ( + SELECT symbol, max(mark_price) AS mark_price + FROM flow_batch_join_market_v5 + WHERE "type" = 'OPTION_MARK' + GROUP BY symbol +) b ON a.symbol = b.symbol; +"#; + let mut stmts = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + let Statement::CreateFlow(create_flow) = stmts.pop().unwrap() else { + unreachable!() + }; + + let mut tables = extract_tables_from_query(&create_flow.query) + .map(|table| format_raw_object_name(&table)) + .collect_vec(); + tables.sort(); + assert_eq!( + vec![ + "flow_batch_join_market_v5".to_string(), + "flow_batch_join_opt_summary".to_string(), + ], + tables + ); + } + + #[test] + fn test_extract_tables_from_sql_query_with_cte_scopes() { + let testcases = vec![ + ( + r#" +WITH source AS ( + SELECT * FROM source +) +SELECT * FROM source; +"#, + vec!["source".to_string()], + ), + ( + r#" +WITH first_cte AS ( + SELECT * FROM physical_source +), second_cte AS ( + SELECT * FROM first_cte +) +SELECT * FROM second_cte; +"#, + vec!["physical_source".to_string()], + ), + ]; + + for (sql, expected_tables) in testcases { + let mut stmts = ParserContext::create_with_dialect( + sql, + &GreptimeDbDialect {}, + ParseOptions::default(), + ) + .unwrap(); + let Statement::Query(query) = stmts.pop().unwrap() else { + unreachable!() + }; + + let mut tables = HashSet::new(); + extract_tables_from_sql_query(&query.inner, &mut tables); + let mut tables = tables + .into_iter() + .map(|table| format_raw_object_name(&table)) + .collect_vec(); + tables.sort(); + assert_eq!(expected_tables, tables); + } + } + #[test] fn test_extract_tables_from_tql_query_with_schema_matcher() { let sql = r#" diff --git a/tests/cases/standalone/common/flow/flow_batch_join_subquery.result b/tests/cases/standalone/common/flow/flow_batch_join_subquery.result new file mode 100644 index 0000000000..0d590cbdbc --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_batch_join_subquery.result @@ -0,0 +1,130 @@ +CREATE DATABASE flow_join_fixture; + +Affected Rows: 1 + +CREATE TABLE flow_join_fixture."left_samples" ( + source_id STRING, + left_value DOUBLE, + event_ts TIMESTAMP, + observed_at TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +CREATE TABLE flow_join_fixture."right_samples" ( + source_id STRING, + right_value DOUBLE, + sample_kind STRING, + event_ts TIMESTAMP, + observed_at TIMESTAMP TIME INDEX +); + +Affected Rows: 0 + +-- Verify batching flow creation accepts aggregate subqueries joined by LEFT JOIN. +CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink +EVAL INTERVAL '5m' AS +SELECT + l.source_id, + l.measure_name, + l.bucket_time, + l.left_event_ts, + l.left_value, + r.right_event_ts, + r.right_value +FROM ( + SELECT + source_id, + 'sample' AS measure_name, + date_trunc('minute', now()) AS bucket_time, + max(event_ts) AS left_event_ts, + last_value(left_value ORDER BY observed_at) AS left_value + FROM + flow_join_fixture."left_samples" + WHERE + observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes' + AND date_trunc('minute', now()) + GROUP BY + source_id +) l +LEFT JOIN ( + SELECT + source_id, + 'sample' AS measure_name, + date_trunc('minute', now()) AS bucket_time, + max(event_ts) AS right_event_ts, + last_value(right_value ORDER BY observed_at) AS right_value + FROM + flow_join_fixture."right_samples" + WHERE + observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes' + AND date_trunc('minute', now()) + AND sample_kind = 'primary' + GROUP BY + source_id +) r ON l.source_id = r.source_id AND l.bucket_time = r.bucket_time; + +Affected Rows: 0 + +SELECT + source_table_names LIKE '%left_samples%' AS has_left_source, + source_table_names LIKE '%right_samples%' AS has_right_source, + options LIKE '%"flow_type":"batching"%' AS is_batching_flow +FROM + INFORMATION_SCHEMA.FLOWS +WHERE + flow_name = 'flow_batch_join_subquery'; + ++-----------------+------------------+------------------+ +| has_left_source | has_right_source | is_batching_flow | ++-----------------+------------------+------------------+ +| true | true | true | ++-----------------+------------------+------------------+ + +INSERT INTO flow_join_fixture."left_samples" VALUES + ('source-a', 0.12, date_trunc('minute', now()), date_trunc('minute', now())); + +Affected Rows: 1 + +INSERT INTO flow_join_fixture."right_samples" VALUES + ('source-a', 100.5, 'primary', date_trunc('minute', now()), date_trunc('minute', now())); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_batch_join_subquery'); + ++----------------------------------------------+ +| ADMIN FLUSH_FLOW('flow_batch_join_subquery') | ++----------------------------------------------+ +| FLOW_FLUSHED | ++----------------------------------------------+ + +SELECT source_id, measure_name, left_value, right_value FROM flow_batch_join_sink ORDER BY source_id; + ++-----------+--------------+------------+-------------+ +| source_id | measure_name | left_value | right_value | ++-----------+--------------+------------+-------------+ +| source-a | sample | 0.12 | 100.5 | ++-----------+--------------+------------+-------------+ + +DROP FLOW flow_batch_join_subquery; + +Affected Rows: 0 + +DROP TABLE flow_batch_join_sink; + +Affected Rows: 0 + +DROP TABLE flow_join_fixture."left_samples"; + +Affected Rows: 0 + +DROP TABLE flow_join_fixture."right_samples"; + +Affected Rows: 0 + +DROP DATABASE flow_join_fixture; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_batch_join_subquery.sql b/tests/cases/standalone/common/flow/flow_batch_join_subquery.sql new file mode 100644 index 0000000000..f37aafdf4f --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_batch_join_subquery.sql @@ -0,0 +1,85 @@ +CREATE DATABASE flow_join_fixture; + +CREATE TABLE flow_join_fixture."left_samples" ( + source_id STRING, + left_value DOUBLE, + event_ts TIMESTAMP, + observed_at TIMESTAMP TIME INDEX +); + +CREATE TABLE flow_join_fixture."right_samples" ( + source_id STRING, + right_value DOUBLE, + sample_kind STRING, + event_ts TIMESTAMP, + observed_at TIMESTAMP TIME INDEX +); + +-- Verify batching flow creation accepts aggregate subqueries joined by LEFT JOIN. +CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink +EVAL INTERVAL '5m' AS +SELECT + l.source_id, + l.measure_name, + l.bucket_time, + l.left_event_ts, + l.left_value, + r.right_event_ts, + r.right_value +FROM ( + SELECT + source_id, + 'sample' AS measure_name, + date_trunc('minute', now()) AS bucket_time, + max(event_ts) AS left_event_ts, + last_value(left_value ORDER BY observed_at) AS left_value + FROM + flow_join_fixture."left_samples" + WHERE + observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes' + AND date_trunc('minute', now()) + GROUP BY + source_id +) l +LEFT JOIN ( + SELECT + source_id, + 'sample' AS measure_name, + date_trunc('minute', now()) AS bucket_time, + max(event_ts) AS right_event_ts, + last_value(right_value ORDER BY observed_at) AS right_value + FROM + flow_join_fixture."right_samples" + WHERE + observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes' + AND date_trunc('minute', now()) + AND sample_kind = 'primary' + GROUP BY + source_id +) r ON l.source_id = r.source_id AND l.bucket_time = r.bucket_time; + +SELECT + source_table_names LIKE '%left_samples%' AS has_left_source, + source_table_names LIKE '%right_samples%' AS has_right_source, + options LIKE '%"flow_type":"batching"%' AS is_batching_flow +FROM + INFORMATION_SCHEMA.FLOWS +WHERE + flow_name = 'flow_batch_join_subquery'; + +INSERT INTO flow_join_fixture."left_samples" VALUES + ('source-a', 0.12, date_trunc('minute', now()), date_trunc('minute', now())); + +INSERT INTO flow_join_fixture."right_samples" VALUES + ('source-a', 100.5, 'primary', date_trunc('minute', now()), date_trunc('minute', now())); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('flow_batch_join_subquery'); + +SELECT source_id, measure_name, left_value, right_value FROM flow_batch_join_sink ORDER BY source_id; + +DROP FLOW flow_batch_join_subquery; +DROP TABLE flow_batch_join_sink; +DROP TABLE flow_join_fixture."left_samples"; +DROP TABLE flow_join_fixture."right_samples"; +DROP DATABASE flow_join_fixture;