From da914ee6adf974516083545676c39cb684748dfd Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 16 Mar 2026 14:45:25 +0800 Subject: [PATCH] fix: always create flow reconcile ticker Signed-off-by: discord9 --- src/cmd/src/standalone.rs | 39 ++++ src/common/meta/src/ddl/activate_flow.rs | 19 +- .../meta/src/ddl/tests/activate_flow.rs | 87 +++++++- src/meta-srv/src/metasrv.rs | 4 + .../common/flow/flow_pending.result | 191 ++++++++++++++++++ .../standalone/common/flow/flow_pending.sql | 88 ++++++++ 6 files changed, 422 insertions(+), 6 deletions(-) create mode 100644 tests/cases/standalone/common/flow/flow_pending.result create mode 100644 tests/cases/standalone/common/flow/flow_pending.sql diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 215bea0ec5..3a644c86df 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -59,6 +59,7 @@ use frontend::frontend::Frontend; use frontend::instance::StandaloneDatanodeManager; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; +use meta_srv::flow::{PendingFlowReconcileManager, PendingFlowReconcileTicker}; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use plugins::PluginOptions; use plugins::frontend::context::{ @@ -122,12 +123,44 @@ pub struct Instance { datanode: Datanode, frontend: Frontend, flownode: FlownodeInstance, + pending_flow_reconcile_runtime: PendingFlowReconcileRuntime, procedure_manager: ProcedureManagerRef, wal_provider: WalProviderRef, // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } +struct PendingFlowReconcileRuntime { + manager: Option, + ticker: PendingFlowReconcileTicker, +} + +impl PendingFlowReconcileRuntime { + fn new(ddl_manager: Arc) -> Self { + let (manager, ticker) = PendingFlowReconcileManager::new(ddl_manager); + Self { + manager: Some(manager), + ticker, + } + } + + fn start(&mut self) -> Result<()> { + if let Some(manager) = self.manager.take() { + manager + .try_start() + .map_err(BoxedError::new) + .context(OtherSnafu)?; + self.ticker.start(); + } + + Ok(()) + } + + fn stop(&self) { + self.ticker.stop(); + } +} + impl Instance { /// Find the socket addr of a server by its `name`. pub fn server_addr(&self, name: &str) -> Option { @@ -176,11 +209,14 @@ impl App for Instance { .context(error::StartFrontendSnafu)?; self.flownode.start().await.context(StartFlownodeSnafu)?; + self.pending_flow_reconcile_runtime.start()?; Ok(()) } async fn stop(&mut self) -> Result<()> { + self.pending_flow_reconcile_runtime.stop(); + self.frontend .shutdown() .await @@ -559,6 +595,8 @@ impl StartCommand { .procedure_executor_creator .create(Arc::new(ddl_manager), procedure_manager.clone()) .await?; + let ddl_manager = Arc::new(ddl_manager); + let pending_flow_reconcile_runtime = PendingFlowReconcileRuntime::new(ddl_manager.clone()); let fe_instance = FrontendBuilder::new( fe_opts.clone(), @@ -611,6 +649,7 @@ impl StartCommand { datanode, frontend, flownode, + pending_flow_reconcile_runtime, procedure_manager, wal_provider, _guard: vec![], diff --git a/src/common/meta/src/ddl/activate_flow.rs b/src/common/meta/src/ddl/activate_flow.rs index 2f82f50f77..41f1cffdea 100644 --- a/src/common/meta/src/ddl/activate_flow.rs +++ b/src/common/meta/src/ddl/activate_flow.rs @@ -108,7 +108,17 @@ impl ActivatePendingFlowProcedure { }; if current_flow_info.get_inner_ref().is_active() { - return Ok(Status::done()); + let routes = self + .context + .flow_metadata_manager + .flow_route_manager() + .routes(self.data.flow_id) + .await?; + self.data.peers = routes.into_iter().map(|(_, value)| value.peer).collect(); + self.data.resolved_table_ids = + current_flow_info.get_inner_ref().source_table_ids.clone(); + self.data.state = ActivatePendingFlowState::InvalidateFlowCache; + return Ok(Status::executing(true)); } let resolution = @@ -206,7 +216,8 @@ impl ActivatePendingFlowProcedure { &self.data.resolved_table_ids, &self.data.peers, ) - .await?; + .await + .map_err(error::Error::retry_later)?; Ok(Status::done_with_output(self.data.flow_id)) } } @@ -344,8 +355,8 @@ fn build_create_request( .map(|table_id| api::v1::TableId { id: *table_id }) .collect_vec(), sink_table_name: Some(flow_info.sink_table_name.clone().into()), - create_if_not_exists: true, - or_replace: false, + create_if_not_exists: false, + or_replace: true, expire_after: flow_info.expire_after.map(|value| ExpireAfter { value }), eval_interval: flow_info .eval_interval_secs diff --git a/src/common/meta/src/ddl/tests/activate_flow.rs b/src/common/meta/src/ddl/tests/activate_flow.rs index 9d0a514f26..fa9a4848b5 100644 --- a/src/common/meta/src/ddl/tests/activate_flow.rs +++ b/src/common/meta/src/ddl/tests/activate_flow.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use api::v1::flow::flow_request::Body as PbFlowRequest; +use api::v1::flow::{CreateRequest, FlowRequest, FlowResponse}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure_test::execute_procedure_until_done; use common_time::TimeToLive; @@ -23,7 +25,38 @@ use crate::ddl::activate_flow::ActivatePendingFlowProcedure; use crate::ddl::test_util::create_table::test_create_table_task; use crate::ddl::tests::create_flow::create_test_flow; use crate::key::table_route::TableRouteValue; -use crate::test_util::{MockFlownodeManager, new_ddl_context}; +use crate::test_util::{MockFlownodeHandler, MockFlownodeManager, new_ddl_context}; + +#[derive(Clone, Default)] +struct RecordingFlownodeHandler { + create_requests: Arc>>, +} + +#[async_trait::async_trait] +impl MockFlownodeHandler for RecordingFlownodeHandler { + async fn handle( + &self, + _peer: &crate::peer::Peer, + request: FlowRequest, + ) -> crate::error::Result { + if let Some(PbFlowRequest::Create(create_req)) = request.body { + self.create_requests.lock().unwrap().push(create_req); + } + + Ok(FlowResponse { + affected_rows: 0, + ..Default::default() + }) + } + + async fn handle_inserts( + &self, + _peer: &crate::peer::Peer, + _requests: api::v1::region::InsertRequests, + ) -> crate::error::Result { + unreachable!() + } +} #[tokio::test] async fn test_activate_pending_flow() { @@ -156,3 +189,53 @@ async fn test_activate_pending_flow_require_streaming_keeps_pending() { .contains("requires streaming activation") ); } + +#[tokio::test] +async fn test_activate_pending_flow_uses_replace_semantics() { + let source_table_names = vec![TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_replace_source_table", + )]; + let sink_table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "activate_replace_sink_table", + ); + + let handler = RecordingFlownodeHandler::default(); + let node_manager = Arc::new(MockFlownodeManager::new(handler.clone())); + let ddl_context = new_ddl_context(node_manager); + + let flow_id = create_test_flow( + &ddl_context, + "activate_pending_flow_replace_semantics", + source_table_names, + sink_table_name, + ) + .await; + + let create_table_task = test_create_table_task("activate_replace_source_table", 1027); + ddl_context + .table_metadata_manager + .create_table_metadata( + create_table_task.table_info.clone(), + TableRouteValue::physical(vec![]), + Default::default(), + ) + .await + .unwrap(); + + let mut procedure = + ActivatePendingFlowProcedure::new(flow_id, DEFAULT_CATALOG_NAME.to_string(), ddl_context); + let output = execute_procedure_until_done(&mut procedure).await.unwrap(); + let activated_flow_id = output.downcast_ref::().unwrap(); + assert_eq!(*activated_flow_id, flow_id); + + let create_requests = handler.create_requests.lock().unwrap(); + assert!(!create_requests.is_empty()); + for req in create_requests.iter() { + assert!(!req.create_if_not_exists); + assert!(req.or_replace); + } +} diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 1c5fd3fbfd..344792193c 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -799,6 +799,10 @@ impl Metasrv { .start() .await .context(StartProcedureManagerSnafu)?; + + if let Some(pending_flow_reconcile_ticker) = &self.pending_flow_reconcile_ticker { + pending_flow_reconcile_ticker.start(); + } } info!("Metasrv started"); diff --git a/tests/cases/standalone/common/flow/flow_pending.result b/tests/cases/standalone/common/flow/flow_pending.result new file mode 100644 index 0000000000..a0546a4d2f --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_pending.result @@ -0,0 +1,191 @@ +CREATE FLOW pending_user_cnt SINK TO pending_user_cnt_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_user_cnt_source +GROUP BY host; + +Affected Rows: 0 + +SELECT source_table_ids, source_table_names, flownode_ids +FROM information_schema.flows +WHERE flow_name = 'pending_user_cnt'; + ++------------------+--------------------+--------------+ +| source_table_ids | source_table_names | flownode_ids | ++------------------+--------------------+--------------+ +| [] | | {} | ++------------------+--------------------+--------------+ + +SHOW CREATE TABLE pending_user_cnt_sink; + +Error: 4001(TableNotFound), Table not found: pending_user_cnt_sink + +CREATE TABLE pending_user_cnt_source ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +Affected Rows: 0 + +-- SQLNESS SLEEP 15s +SHOW CREATE TABLE pending_user_cnt_sink; + ++-----------------------+------------------------------------------------------+ +| Table | Create Table | ++-----------------------+------------------------------------------------------+ +| pending_user_cnt_sink | CREATE TABLE IF NOT EXISTS "pending_user_cnt_sink" ( | +| | "host" STRING NULL, | +| | "total_val" BIGINT NULL, | +| | "update_at" TIMESTAMP(3) NULL, | +| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, | +| | TIME INDEX ("__ts_placeholder"), | +| | PRIMARY KEY ("host") | +| | ) | +| | | +| | ENGINE=mito | +| | WITH( | +| | 'comment' = 'Auto created table by flow engine' | +| | ) | ++-----------------------+------------------------------------------------------+ + +INSERT INTO pending_user_cnt_source VALUES + ('2026-03-12T00:00:00Z', 'host1', 1.0), + ('2026-03-12T00:00:01Z', 'host1', 2.0), + ('2026-03-12T00:00:02Z', 'host2', 3.0); + +Affected Rows: 3 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_user_cnt'); + ++--------------------------------------+ +| ADMIN FLUSH_FLOW('pending_user_cnt') | ++--------------------------------------+ +| FLOW_FLUSHED | ++--------------------------------------+ + +SELECT host, total_val FROM pending_user_cnt_sink ORDER BY host; + ++-------+-----------+ +| host | total_val | ++-------+-----------+ +| host1 | 2 | +| host2 | 1 | ++-------+-----------+ + +DROP FLOW pending_user_cnt; + +Affected Rows: 0 + +DROP TABLE pending_user_cnt_source; + +Affected Rows: 0 + +DROP TABLE pending_user_cnt_sink; + +Affected Rows: 0 + +CREATE TABLE pending_replace_src_a ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +Affected Rows: 0 + +CREATE FLOW pending_replace_flow SINK TO pending_replace_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_replace_src_a +GROUP BY host; + +Affected Rows: 0 + +INSERT INTO pending_replace_src_a VALUES + ('2026-03-12T00:10:00Z', 'host_old', 1.0), + ('2026-03-12T00:10:01Z', 'host_old', 2.0); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_replace_flow'); + ++------------------------------------------+ +| ADMIN FLUSH_FLOW('pending_replace_flow') | ++------------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------------+ + +SELECT host, total_val FROM pending_replace_sink WHERE host = 'host_old'; + ++----------+-----------+ +| host | total_val | ++----------+-----------+ +| host_old | 2 | ++----------+-----------+ + +CREATE OR REPLACE FLOW pending_replace_flow SINK TO pending_replace_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_replace_src_b +GROUP BY host; + +Affected Rows: 0 + +CREATE TABLE pending_replace_src_b ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +Affected Rows: 0 + +-- SQLNESS SLEEP 15s +INSERT INTO pending_replace_src_a VALUES + ('2026-03-12T00:11:00Z', 'host_old_after_replace', 9.0); + +Affected Rows: 1 + +INSERT INTO pending_replace_src_b VALUES + ('2026-03-12T00:11:00Z', 'host_new_after_replace', 3.0), + ('2026-03-12T00:11:01Z', 'host_new_after_replace', 4.0); + +Affected Rows: 2 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_replace_flow'); + ++------------------------------------------+ +| ADMIN FLUSH_FLOW('pending_replace_flow') | ++------------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------------+ + +SELECT host, total_val +FROM pending_replace_sink +WHERE host IN ('host_old_after_replace', 'host_new_after_replace') +ORDER BY host; + ++------------------------+-----------+ +| host | total_val | ++------------------------+-----------+ +| host_new_after_replace | 2 | ++------------------------+-----------+ + +DROP FLOW pending_replace_flow; + +Affected Rows: 0 + +DROP TABLE pending_replace_src_a; + +Affected Rows: 0 + +DROP TABLE pending_replace_src_b; + +Affected Rows: 0 + +DROP TABLE pending_replace_sink; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_pending.sql b/tests/cases/standalone/common/flow/flow_pending.sql new file mode 100644 index 0000000000..e4a780b7b3 --- /dev/null +++ b/tests/cases/standalone/common/flow/flow_pending.sql @@ -0,0 +1,88 @@ +CREATE FLOW pending_user_cnt SINK TO pending_user_cnt_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_user_cnt_source +GROUP BY host; + +SELECT source_table_ids, source_table_names, flownode_ids +FROM information_schema.flows +WHERE flow_name = 'pending_user_cnt'; + +SHOW CREATE TABLE pending_user_cnt_sink; + +CREATE TABLE pending_user_cnt_source ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +-- SQLNESS SLEEP 15s +SHOW CREATE TABLE pending_user_cnt_sink; + +INSERT INTO pending_user_cnt_source VALUES + ('2026-03-12T00:00:00Z', 'host1', 1.0), + ('2026-03-12T00:00:01Z', 'host1', 2.0), + ('2026-03-12T00:00:02Z', 'host2', 3.0); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_user_cnt'); + +SELECT host, total_val FROM pending_user_cnt_sink ORDER BY host; + +DROP FLOW pending_user_cnt; +DROP TABLE pending_user_cnt_source; +DROP TABLE pending_user_cnt_sink; + +CREATE TABLE pending_replace_src_a ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +CREATE FLOW pending_replace_flow SINK TO pending_replace_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_replace_src_a +GROUP BY host; + +INSERT INTO pending_replace_src_a VALUES + ('2026-03-12T00:10:00Z', 'host_old', 1.0), + ('2026-03-12T00:10:01Z', 'host_old', 2.0); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_replace_flow'); + +SELECT host, total_val FROM pending_replace_sink WHERE host = 'host_old'; + +CREATE OR REPLACE FLOW pending_replace_flow SINK TO pending_replace_sink AS +SELECT host, COUNT(val) AS total_val +FROM pending_replace_src_b +GROUP BY host; + +CREATE TABLE pending_replace_src_b ( + ts TIMESTAMP TIME INDEX, + host STRING, + val DOUBLE, + PRIMARY KEY(host) +); + +-- SQLNESS SLEEP 15s +INSERT INTO pending_replace_src_a VALUES + ('2026-03-12T00:11:00Z', 'host_old_after_replace', 9.0); + +INSERT INTO pending_replace_src_b VALUES + ('2026-03-12T00:11:00Z', 'host_new_after_replace', 3.0), + ('2026-03-12T00:11:01Z', 'host_new_after_replace', 4.0); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('pending_replace_flow'); + +SELECT host, total_val +FROM pending_replace_sink +WHERE host IN ('host_old_after_replace', 'host_new_after_replace') +ORDER BY host; + +DROP FLOW pending_replace_flow; +DROP TABLE pending_replace_src_a; +DROP TABLE pending_replace_src_b; +DROP TABLE pending_replace_sink;