From b0ad3f0bb44e4175b172404be269140058543875 Mon Sep 17 00:00:00 2001 From: discord9 <55937128+discord9@users.noreply.github.com> Date: Thu, 1 May 2025 16:43:06 +0800 Subject: [PATCH] fix: force streaming mode for instant source table (#6031) * fix: force streaming mode for instant source table * tests: sqlness test&refactor: get table * refactor: per review --- src/operator/src/statement/ddl.rs | 48 ++++- .../common/flow/flow_advance_ttl.result | 193 ++++++++++++++---- .../common/flow/flow_advance_ttl.sql | 66 +++++- 3 files changed, 259 insertions(+), 48 deletions(-) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index ad2f5bcf71..c995999672 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -37,7 +37,7 @@ use common_meta::rpc::ddl::{ }; use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_query::Output; -use common_telemetry::{debug, info, tracing}; +use common_telemetry::{debug, info, tracing, warn}; use common_time::Timezone; use datafusion_common::tree_node::TreeNodeVisitor; use datafusion_expr::LogicalPlan; @@ -369,7 +369,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let flow_type = self - .determine_flow_type(&expr.sql, query_context.clone()) + .determine_flow_type(&expr, query_context.clone()) .await?; info!("determined flow={} type: {:#?}", expr.flow_name, flow_type); @@ -398,9 +398,49 @@ impl StatementExecutor { /// Determine the flow type based on the SQL query /// /// If it contains aggregation or distinct, then it is a batch flow, otherwise it is a streaming flow - async fn determine_flow_type(&self, sql: &str, query_ctx: QueryContextRef) -> Result { + async fn determine_flow_type( + &self, + expr: &CreateFlowExpr, + query_ctx: QueryContextRef, + ) -> Result { + // first check if source table's ttl is instant, if it is, force streaming mode + for src_table_name in &expr.source_table_names { + let table = self + .catalog_manager() + .table( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name, + Some(&query_ctx), + ) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name, + ), + })?; + + // instant source table can only be handled by streaming mode + if table.table_info().meta.options.ttl == Some(common_time::TimeToLive::Instant) { + warn!( + "Source table `{}` for flow `{}`'s ttl=instant, fallback to streaming mode", + format_full_table_name( + &src_table_name.catalog_name, + &src_table_name.schema_name, + &src_table_name.table_name + ), + expr.flow_name + ); + return Ok(FlowType::Streaming); + } + } + let engine = &self.query_engine; - let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx) + let stmt = QueryLanguageParser::parse_sql(&expr.sql, &query_ctx) .map_err(BoxedError::new) .context(ExternalSnafu)?; let plan = engine diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.result b/tests/cases/standalone/common/flow/flow_advance_ttl.result index 6addda09bd..05ae665be8 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.result +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.result @@ -8,7 +8,7 @@ CREATE TABLE distinct_basic ( Affected Rows: 0 --- should fail +-- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -16,9 +16,151 @@ SELECT FROM distinct_basic; -Error: 3001(EngineExecuteQuery), Unsupported: Source table `greptime.public.distinct_basic`(id=REDACTED) has instant TTL, Instant TTL is not supported under batching mode. Consider using a TTL longer than flush interval +Affected Rows: 0 -ALTER TABLE distinct_basic SET 'ttl' = '5s'; +-- flow_options should have a flow_type:streaming +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + ++---------------------+---------------------------+ +| flow_name | options | ++---------------------+---------------------------+ +| test_distinct_basic | {"flow_type":"streaming"} | ++---------------------+---------------------------+ + +SHOW CREATE TABLE distinct_basic; + ++----------------+-----------------------------------------------------------+ +| Table | Create Table | ++----------------+-----------------------------------------------------------+ +| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( | +| | "number" INT NULL, | +| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | +| | TIME INDEX ("ts"), | +| | PRIMARY KEY ("number") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | ttl = 'instant' | +| | ) | ++----------------+-----------------------------------------------------------+ + +SHOW CREATE TABLE out_distinct_basic; + ++--------------------+---------------------------------------------------+ +| Table | Create Table | ++--------------------+---------------------------------------------------+ +| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | +| | "dis" INT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("dis") | +| | ) | +| | | +| | ENGINE=mito | +| | | ++--------------------+---------------------------------------------------+ + +-- SQLNESS SLEEP 3s +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_distinct_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | ++-----+ + +SELECT number FROM distinct_basic; + +++ +++ + +-- SQLNESS SLEEP 6s +ADMIN FLUSH_TABLE('distinct_basic'); + ++-------------------------------------+ +| ADMIN FLUSH_TABLE('distinct_basic') | ++-------------------------------------+ +| 0 | ++-------------------------------------+ + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.600"); + +Affected Rows: 0 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + ++-----------------------------------------+ +| ADMIN FLUSH_FLOW('test_distinct_basic') | ++-----------------------------------------+ +| FLOW_FLUSHED | ++-----------------------------------------+ + +SELECT + dis +FROM + out_distinct_basic; + ++-----+ +| dis | ++-----+ +| 20 | +| 22 | +| 23 | ++-----+ + +SELECT number FROM distinct_basic; + +++ +++ + +DROP FLOW test_distinct_basic; + +Affected Rows: 0 + +DROP TABLE distinct_basic; + +Affected Rows: 0 + +DROP TABLE out_distinct_basic; + +Affected Rows: 0 + +-- test ttl = 5s +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = '5s'); Affected Rows: 0 @@ -30,6 +172,16 @@ FROM Affected Rows: 0 +-- flow_options should have a flow_type:batching +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + ++---------------------+--------------------------+ +| flow_name | options | ++---------------------+--------------------------+ +| test_distinct_basic | {"flow_type":"batching"} | ++---------------------+--------------------------+ + -- SQLNESS ARG restart=true SELECT 1; @@ -139,41 +291,6 @@ ADMIN FLUSH_FLOW('test_distinct_basic'); | FLOW_FLUSHED | +-----------------------------------------+ -SHOW CREATE TABLE distinct_basic; - -+----------------+-----------------------------------------------------------+ -| Table | Create Table | -+----------------+-----------------------------------------------------------+ -| distinct_basic | CREATE TABLE IF NOT EXISTS "distinct_basic" ( | -| | "number" INT NULL, | -| | "ts" TIMESTAMP(3) NOT NULL DEFAULT current_timestamp(), | -| | TIME INDEX ("ts"), | -| | PRIMARY KEY ("number") | -| | ) | -| | | -| | ENGINE=mito | -| | WITH( | -| | ttl = '5s' | -| | ) | -+----------------+-----------------------------------------------------------+ - -SHOW CREATE TABLE out_distinct_basic; - -+--------------------+---------------------------------------------------+ -| Table | Create Table | -+--------------------+---------------------------------------------------+ -| out_distinct_basic | CREATE TABLE IF NOT EXISTS "out_distinct_basic" ( | -| | "dis" INT NULL, | -| | "update_at" TIMESTAMP(3) NULL, | -| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | -| | TIME INDEX ("__ts_placeholder"), | -| | PRIMARY KEY ("dis") | -| | ) | -| | | -| | ENGINE=mito | -| | | -+--------------------+---------------------------------------------------+ - SELECT dis FROM diff --git a/tests/cases/standalone/common/flow/flow_advance_ttl.sql b/tests/cases/standalone/common/flow/flow_advance_ttl.sql index 3b9b46b613..141c595e89 100644 --- a/tests/cases/standalone/common/flow/flow_advance_ttl.sql +++ b/tests/cases/standalone/common/flow/flow_advance_ttl.sql @@ -6,7 +6,7 @@ CREATE TABLE distinct_basic ( TIME INDEX(ts) )WITH ('ttl' = 'instant'); --- should fail +-- should fallback to streaming mode -- SQLNESS REPLACE id=\d+ id=REDACTED CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -14,7 +14,61 @@ SELECT FROM distinct_basic; -ALTER TABLE distinct_basic SET 'ttl' = '5s'; +-- flow_options should have a flow_type:streaming +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + +SHOW CREATE TABLE distinct_basic; + +SHOW CREATE TABLE out_distinct_basic; + +-- SQLNESS SLEEP 3s +INSERT INTO + distinct_basic +VALUES + (20, "2021-07-01 00:00:00.200"), + (20, "2021-07-01 00:00:00.200"), + (22, "2021-07-01 00:00:00.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + +SELECT + dis +FROM + out_distinct_basic; + +SELECT number FROM distinct_basic; + +-- SQLNESS SLEEP 6s +ADMIN FLUSH_TABLE('distinct_basic'); + +INSERT INTO + distinct_basic +VALUES + (23, "2021-07-01 00:00:01.600"); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('test_distinct_basic'); + +SELECT + dis +FROM + out_distinct_basic; + +SELECT number FROM distinct_basic; + +DROP FLOW test_distinct_basic; +DROP TABLE distinct_basic; +DROP TABLE out_distinct_basic; + +-- test ttl = 5s +CREATE TABLE distinct_basic ( + number INT, + ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(number), + TIME INDEX(ts) +)WITH ('ttl' = '5s'); CREATE FLOW test_distinct_basic SINK TO out_distinct_basic AS SELECT @@ -22,6 +76,10 @@ SELECT FROM distinct_basic; +-- flow_options should have a flow_type:batching +-- since source table's ttl=instant +SELECT flow_name, options FROM INFORMATION_SCHEMA.FLOWS; + -- SQLNESS ARG restart=true SELECT 1; @@ -58,10 +116,6 @@ VALUES -- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | ADMIN FLUSH_FLOW('test_distinct_basic'); -SHOW CREATE TABLE distinct_basic; - -SHOW CREATE TABLE out_distinct_basic; - SELECT dis FROM