fix: run eval-interval flow without time window (#8231)

* fix: run eval-interval flow without time window

Signed-off-by: discord9 <discord9@163.com>

* test: cover eval-interval flow join query

Signed-off-by: discord9 <discord9@163.com>

* fix: address eval interval flow review comments

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-03 21:06:22 +08:00
committed by GitHub
parent ca07a53deb
commit d304df6e75
5 changed files with 426 additions and 26 deletions

View File

@@ -959,7 +959,7 @@ impl BatchingTask {
let (expire_lower_bound, expire_upper_bound) =
match (expire_time_window_bound, &self.config.query_type) {
(Some((Some(l), Some(u))), QueryType::Sql) => (l, u),
(None, QueryType::Sql) => {
(None, QueryType::Sql) if self.config.flow_eval_interval.is_none() => {
// if it's sql query and no time window lower/upper bound is found, just return the original query(with auto columns)
// use sink_table_meta to add to query the `update_at` and `__ts_placeholder` column's value too for compatibility reason
debug!(
@@ -980,7 +980,8 @@ impl BatchingTask {
}
_ => {
// Clean dirty windows for full-query/non-scoped paths,
// such as TQL, that cannot use a time-window filter.
// such as TQL or evaluation-interval SQL without a recognized
// time-window expression, that cannot use a time-window filter.
let (_, dirty_windows_to_restore) = self.drain_dirty_windows_signal();
let plan_info = self

View File

@@ -974,6 +974,38 @@ async fn test_non_scoped_path_generates_plan_with_empty_dirty_signal() {
assert!(task.state.read().unwrap().dirty_time_windows.is_empty());
}
/// A SQL flow configured with an evaluation interval must still yield a plan
/// when its query has no time-window expression and no dirty window is pending.
#[tokio::test]
async fn test_no_time_window_sql_with_eval_interval_generates_plan_without_dirty_signal() {
    let TestTaskParts {
        task: mut batching_task,
        query_engine: engine,
        ..
    } = new_test_task_engine_and_plan_with_query(
        "SELECT number, ts FROM numbers_with_ts",
        "missing_sink",
    )
    .await;

    // Turn the task into an evaluation-interval flow (one run per minute).
    let config = Arc::get_mut(&mut batching_task.config)
        .expect("test task config should be uniquely owned");
    config.flow_eval_interval = Some(Duration::from_secs(60));

    // Start from a state with no dirty-window signal at all.
    batching_task.state.write().unwrap().dirty_time_windows.clean();

    let columns = vec![
        ColumnSchema::new("number", CDT::uint32_datatype(), false),
        ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false).with_time_index(true),
    ];
    let sink_schema = Arc::new(Schema::new(columns));

    let maybe_plan = batching_task
        .gen_query_with_time_window(engine, &sink_schema, &[], false, None)
        .await
        .unwrap();
    let plan = maybe_plan.expect(
        "eval-interval SQL without a time-window expr should run by interval, not dirty signal",
    );

    assert!(plan.can_advance_checkpoints);
    assert!(batching_task.state.read().unwrap().dirty_time_windows.is_empty());
}
#[tokio::test]
async fn test_executed_query_failure_restores_scoped_dirty_windows_for_flush_path() {
let (task, plan) = new_test_task_and_plan_with_missing_sink().await;

View File

@@ -27,7 +27,7 @@ use serde::Serialize;
use snafu::ensure;
use sqlparser::ast::{
Array, Expr, Ident, ObjectName, ObjectNamePart, SetExpr, SqlOption, StructField, TableFactor,
Value, ValueWithSpan,
TableWithJoins, Value, ValueWithSpan,
};
use sqlparser_derive::{Visit, VisitMut};
@@ -195,7 +195,7 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = Objec
match query {
SqlOrTql::Sql(query, _) => {
extract_tables_from_set_expr(&query.inner.body, &mut names);
extract_tables_from_sql_query(&query.inner, &mut names);
extract_tables_from_hybrid_cte_query(query, &mut names);
}
SqlOrTql::Tql(tql, _) => extract_tables_from_tql(tql, &mut names),
@@ -205,26 +205,34 @@ pub fn extract_tables_from_query(query: &SqlOrTql) -> impl Iterator<Item = Objec
}
fn extract_tables_from_hybrid_cte_query(query: &Query, sql_names: &mut HashSet<ObjectName>) {
let mut tql_names = HashSet::new();
let mut cte_names: HashSet<String> = HashSet::new();
if let Some(hybrid_cte) = &query.hybrid_cte {
let mut cte_names: HashSet<String> = hybrid_cte
.cte_tables
.iter()
.map(|cte| ParserContext::canonicalize_identifier(cte.name.clone()).value)
.collect();
remove_cte_names(sql_names, &cte_names);
cte_names.clear();
for cte in &hybrid_cte.cte_tables {
cte_names.insert(ParserContext::canonicalize_identifier(cte.name.clone()).value);
if let CteContent::Tql(tql) = &cte.content {
extract_tables_from_tql(tql, &mut tql_names);
let cte_name = ParserContext::canonicalize_identifier(cte.name.clone()).value;
let mut cte_query_names = HashSet::new();
match &cte.content {
CteContent::Sql(cte_query) => {
extract_tables_from_sql_query(cte_query, &mut cte_query_names)
}
CteContent::Tql(tql) => extract_tables_from_tql(tql, &mut cte_query_names),
}
if hybrid_cte.recursive {
cte_names.insert(cte_name.clone());
}
remove_cte_names(&mut cte_query_names, &cte_names);
sql_names.extend(cte_query_names);
if !hybrid_cte.recursive {
cte_names.insert(cte_name);
}
}
}
if let Some(with) = &query.inner.with {
for cte in &with.cte_tables {
cte_names.insert(ParserContext::canonicalize_identifier(cte.alias.name.clone()).value);
}
}
remove_cte_names(sql_names, &cte_names);
sql_names.extend(tql_names);
}
fn remove_cte_names(names: &mut HashSet<ObjectName>, cte_names: &HashSet<String>) {
@@ -339,6 +347,33 @@ pub fn location_to_index(sql: &str, location: &sqlparser::tokenizer::Location) -
index - 1
}
/// Helper function for [extract_tables_from_query].
///
/// Handle [sqlparser::ast::Query]: add to `names` every table referenced by the
/// query's CTE definitions and by its body, leaving out references that resolve
/// to a CTE alias visible at that point.
///
/// For a non-recursive `WITH`, an alias becomes visible only after its own
/// definition has been processed, so it hides names in later CTEs and in the
/// body but not inside itself. For a recursive `WITH`, the alias is registered
/// before its definition is filtered, so a self-reference is not reported.
fn extract_tables_from_sql_query(query: &sqlparser::ast::Query, names: &mut HashSet<ObjectName>) {
    // Aliases of the CTEs that are in scope at the current position.
    let mut visible_ctes = HashSet::new();

    if let Some(with) = &query.with {
        for cte in &with.cte_tables {
            let alias = ParserContext::canonicalize_identifier(cte.alias.name.clone()).value;

            // Collect into a scratch set so filtering never touches names that
            // were gathered from an outer scope.
            let mut referenced = HashSet::new();
            extract_tables_from_sql_query(&cte.query, &mut referenced);

            if with.recursive {
                visible_ctes.insert(alias);
                remove_cte_names(&mut referenced, &visible_ctes);
            } else {
                remove_cte_names(&mut referenced, &visible_ctes);
                visible_ctes.insert(alias);
            }
            names.extend(referenced);
        }
    }

    // The body sees every alias declared by this query's `WITH` clause.
    let mut body_tables = HashSet::new();
    extract_tables_from_set_expr(&query.body, &mut body_tables);
    remove_cte_names(&mut body_tables, &visible_ctes);
    names.extend(body_tables);
}
/// Helper function for [extract_tables_from_query].
///
/// Handle [SetExpr].
@@ -346,14 +381,11 @@ fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet<ObjectNa
match set_expr {
SetExpr::Select(select) => {
for from in &select.from {
table_factor_to_object_name(&from.relation, names);
for join in &from.joins {
table_factor_to_object_name(&join.relation, names);
}
extract_tables_from_table_with_joins(from, names);
}
}
SetExpr::Query(query) => {
extract_tables_from_set_expr(&query.body, names);
extract_tables_from_sql_query(query, names);
}
SetExpr::SetOperation { left, right, .. } => {
extract_tables_from_set_expr(left, names);
@@ -363,12 +395,47 @@ fn extract_tables_from_set_expr(set_expr: &SetExpr, names: &mut HashSet<ObjectNa
};
}
/// Helper function for [extract_tables_from_query].
///
/// Handle [TableWithJoins]: visit the leading relation first, then the relation
/// of each join in order, collecting their table names into `names`.
fn extract_tables_from_table_with_joins(
    table_with_joins: &TableWithJoins,
    names: &mut HashSet<ObjectName>,
) {
    std::iter::once(&table_with_joins.relation)
        .chain(table_with_joins.joins.iter().map(|join| &join.relation))
        .for_each(|factor| table_factor_to_object_name(factor, names));
}
/// Helper function for [extract_tables_from_query].
///
/// Handle [TableFactor]: insert into `names` every table name the factor
/// refers to, descending into subqueries, nested joins and wrapped factors.
fn table_factor_to_object_name(table_factor: &TableFactor, names: &mut HashSet<ObjectName>) {
if let TableFactor::Table { name, .. } = table_factor {
names.insert(name.to_owned());
match table_factor {
// Plain table reference: record its name.
TableFactor::Table { name, .. } => {
names.insert(name.to_owned());
}
// Subquery used as a relation: collect the tables that subquery reads.
TableFactor::Derived { subquery, .. } => {
extract_tables_from_sql_query(subquery, names);
}
// Nested join: walk the inner relation and each of its joins.
TableFactor::NestedJoin {
table_with_joins, ..
} => {
extract_tables_from_table_with_joins(table_with_joins, names);
}
// These variants wrap another table factor; recurse into it.
TableFactor::Pivot { table, .. }
| TableFactor::Unpivot { table, .. }
| TableFactor::MatchRecognize { table, .. } => {
table_factor_to_object_name(table, names);
}
// No table names are collected for these variants. They are listed
// explicitly rather than through a `_` arm.
TableFactor::TableFunction { .. }
| TableFactor::Function { .. }
| TableFactor::UNNEST { .. }
| TableFactor::JsonTable { .. }
| TableFactor::OpenJsonTable { .. }
| TableFactor::XmlTable { .. }
| TableFactor::SemanticView { .. } => {}
}
}
@@ -458,6 +525,91 @@ TQL EVAL (now() - '15s'::interval, now(), '5s') count_values("status_code", {__n
}
}
/// Tables that appear only inside the derived relations on either side of a
/// join must both be reported as sources of the flow query.
#[test]
fn test_extract_tables_from_sql_query_with_derived_join() {
    let sql = r#"
CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink
EVAL INTERVAL '1m' AS
SELECT a.symbol, b.mark_price
FROM (
SELECT inst_id AS symbol, max(ts) AS mark_iv_ts
FROM flow_batch_join_opt_summary
GROUP BY inst_id
) a
LEFT JOIN (
SELECT symbol, max(mark_price) AS mark_price
FROM flow_batch_join_market_v5
WHERE "type" = 'OPTION_MARK'
GROUP BY symbol
) b ON a.symbol = b.symbol;
"#;

    let mut parsed =
        ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default())
            .unwrap();
    let create_flow = match parsed.pop().unwrap() {
        Statement::CreateFlow(create_flow) => create_flow,
        _ => unreachable!(),
    };

    // The extractor yields names in set order, so sort before comparing.
    let mut source_tables: Vec<_> = extract_tables_from_query(&create_flow.query)
        .map(|name| format_raw_object_name(&name))
        .collect();
    source_tables.sort();

    let expected = vec![
        "flow_batch_join_market_v5".to_string(),
        "flow_batch_join_opt_summary".to_string(),
    ];
    assert_eq!(expected, source_tables);
}
/// CTE aliases hide table names only where the alias is in scope:
/// a CTE that selects from a table sharing its own name still reports that
/// table, and a chain of CTEs reports only the physical table at its root.
#[test]
fn test_extract_tables_from_sql_query_with_cte_scopes() {
    let cases = [
        // The CTE body refers to the table `source`, not to the alias itself.
        (
            r#"
WITH source AS (
SELECT * FROM source
)
SELECT * FROM source;
"#,
            vec!["source".to_string()],
        ),
        // `second_cte` reads `first_cte`, which is an alias and must be hidden.
        (
            r#"
WITH first_cte AS (
SELECT * FROM physical_source
), second_cte AS (
SELECT * FROM first_cte
)
SELECT * FROM second_cte;
"#,
            vec!["physical_source".to_string()],
        ),
    ];

    for (sql, expected_tables) in cases {
        let mut parsed = ParserContext::create_with_dialect(
            sql,
            &GreptimeDbDialect {},
            ParseOptions::default(),
        )
        .unwrap();
        let query = match parsed.pop().unwrap() {
            Statement::Query(query) => query,
            _ => unreachable!(),
        };

        let mut found = HashSet::new();
        extract_tables_from_sql_query(&query.inner, &mut found);

        // Set iteration order is unspecified, so sort before comparing.
        let mut actual: Vec<_> = found
            .into_iter()
            .map(|name| format_raw_object_name(&name))
            .collect();
        actual.sort();

        assert_eq!(expected_tables, actual);
    }
}
#[test]
fn test_extract_tables_from_tql_query_with_schema_matcher() {
let sql = r#"

View File

@@ -0,0 +1,130 @@
CREATE DATABASE flow_join_fixture;
Affected Rows: 1
CREATE TABLE flow_join_fixture."left_samples" (
source_id STRING,
left_value DOUBLE,
event_ts TIMESTAMP,
observed_at TIMESTAMP TIME INDEX
);
Affected Rows: 0
CREATE TABLE flow_join_fixture."right_samples" (
source_id STRING,
right_value DOUBLE,
sample_kind STRING,
event_ts TIMESTAMP,
observed_at TIMESTAMP TIME INDEX
);
Affected Rows: 0
-- Verify batching flow creation accepts aggregate subqueries joined by LEFT JOIN.
CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink
EVAL INTERVAL '5m' AS
SELECT
l.source_id,
l.measure_name,
l.bucket_time,
l.left_event_ts,
l.left_value,
r.right_event_ts,
r.right_value
FROM (
SELECT
source_id,
'sample' AS measure_name,
date_trunc('minute', now()) AS bucket_time,
max(event_ts) AS left_event_ts,
last_value(left_value ORDER BY observed_at) AS left_value
FROM
flow_join_fixture."left_samples"
WHERE
observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes'
AND date_trunc('minute', now())
GROUP BY
source_id
) l
LEFT JOIN (
SELECT
source_id,
'sample' AS measure_name,
date_trunc('minute', now()) AS bucket_time,
max(event_ts) AS right_event_ts,
last_value(right_value ORDER BY observed_at) AS right_value
FROM
flow_join_fixture."right_samples"
WHERE
observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes'
AND date_trunc('minute', now())
AND sample_kind = 'primary'
GROUP BY
source_id
) r ON l.source_id = r.source_id AND l.bucket_time = r.bucket_time;
Affected Rows: 0
SELECT
source_table_names LIKE '%left_samples%' AS has_left_source,
source_table_names LIKE '%right_samples%' AS has_right_source,
options LIKE '%"flow_type":"batching"%' AS is_batching_flow
FROM
INFORMATION_SCHEMA.FLOWS
WHERE
flow_name = 'flow_batch_join_subquery';
+-----------------+------------------+------------------+
| has_left_source | has_right_source | is_batching_flow |
+-----------------+------------------+------------------+
| true | true | true |
+-----------------+------------------+------------------+
INSERT INTO flow_join_fixture."left_samples" VALUES
('source-a', 0.12, date_trunc('minute', now()), date_trunc('minute', now()));
Affected Rows: 1
INSERT INTO flow_join_fixture."right_samples" VALUES
('source-a', 100.5, 'primary', date_trunc('minute', now()), date_trunc('minute', now()));
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_batch_join_subquery');
+----------------------------------------------+
| ADMIN FLUSH_FLOW('flow_batch_join_subquery') |
+----------------------------------------------+
| FLOW_FLUSHED |
+----------------------------------------------+
SELECT source_id, measure_name, left_value, right_value FROM flow_batch_join_sink ORDER BY source_id;
+-----------+--------------+------------+-------------+
| source_id | measure_name | left_value | right_value |
+-----------+--------------+------------+-------------+
| source-a | sample | 0.12 | 100.5 |
+-----------+--------------+------------+-------------+
DROP FLOW flow_batch_join_subquery;
Affected Rows: 0
DROP TABLE flow_batch_join_sink;
Affected Rows: 0
DROP TABLE flow_join_fixture."left_samples";
Affected Rows: 0
DROP TABLE flow_join_fixture."right_samples";
Affected Rows: 0
DROP DATABASE flow_join_fixture;
Affected Rows: 0

View File

@@ -0,0 +1,85 @@
CREATE DATABASE flow_join_fixture;
CREATE TABLE flow_join_fixture."left_samples" (
source_id STRING,
left_value DOUBLE,
event_ts TIMESTAMP,
observed_at TIMESTAMP TIME INDEX
);
CREATE TABLE flow_join_fixture."right_samples" (
source_id STRING,
right_value DOUBLE,
sample_kind STRING,
event_ts TIMESTAMP,
observed_at TIMESTAMP TIME INDEX
);
-- Verify batching flow creation accepts aggregate subqueries joined by LEFT JOIN.
CREATE FLOW flow_batch_join_subquery SINK TO flow_batch_join_sink
EVAL INTERVAL '5m' AS
SELECT
l.source_id,
l.measure_name,
l.bucket_time,
l.left_event_ts,
l.left_value,
r.right_event_ts,
r.right_value
FROM (
SELECT
source_id,
'sample' AS measure_name,
date_trunc('minute', now()) AS bucket_time,
max(event_ts) AS left_event_ts,
last_value(left_value ORDER BY observed_at) AS left_value
FROM
flow_join_fixture."left_samples"
WHERE
observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes'
AND date_trunc('minute', now())
GROUP BY
source_id
) l
LEFT JOIN (
SELECT
source_id,
'sample' AS measure_name,
date_trunc('minute', now()) AS bucket_time,
max(event_ts) AS right_event_ts,
last_value(right_value ORDER BY observed_at) AS right_value
FROM
flow_join_fixture."right_samples"
WHERE
observed_at BETWEEN date_trunc('minute', now()) - INTERVAL '5 minutes'
AND date_trunc('minute', now())
AND sample_kind = 'primary'
GROUP BY
source_id
) r ON l.source_id = r.source_id AND l.bucket_time = r.bucket_time;
SELECT
source_table_names LIKE '%left_samples%' AS has_left_source,
source_table_names LIKE '%right_samples%' AS has_right_source,
options LIKE '%"flow_type":"batching"%' AS is_batching_flow
FROM
INFORMATION_SCHEMA.FLOWS
WHERE
flow_name = 'flow_batch_join_subquery';
INSERT INTO flow_join_fixture."left_samples" VALUES
('source-a', 0.12, date_trunc('minute', now()), date_trunc('minute', now()));
INSERT INTO flow_join_fixture."right_samples" VALUES
('source-a', 100.5, 'primary', date_trunc('minute', now()), date_trunc('minute', now()));
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('flow_batch_join_subquery');
SELECT source_id, measure_name, left_value, right_value FROM flow_batch_join_sink ORDER BY source_id;
DROP FLOW flow_batch_join_subquery;
DROP TABLE flow_batch_join_sink;
DROP TABLE flow_join_fixture."left_samples";
DROP TABLE flow_join_fixture."right_samples";
DROP DATABASE flow_join_fixture;