fix: always create flow reconcile ticker

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-03-16 14:45:25 +08:00
parent e9fec341ec
commit da914ee6ad
6 changed files with 422 additions and 6 deletions

View File

@@ -59,6 +59,7 @@ use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::flow::{PendingFlowReconcileManager, PendingFlowReconcileTicker};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::PluginOptions;
use plugins::frontend::context::{
@@ -122,12 +123,44 @@ pub struct Instance {
datanode: Datanode,
frontend: Frontend,
flownode: FlownodeInstance,
pending_flow_reconcile_runtime: PendingFlowReconcileRuntime,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
/// Bundles the pending-flow reconcile manager with its ticker for the
/// standalone instance, so `Instance::start`/`stop` can drive both together.
struct PendingFlowReconcileRuntime {
// Consumed by the first `start()` call (hence `Option`); `None` afterwards,
// which makes repeated `start()` calls no-ops.
manager: Option<PendingFlowReconcileManager>,
// Periodic tick driver; started only after the manager starts, stopped in `stop()`.
ticker: PendingFlowReconcileTicker,
}
impl PendingFlowReconcileRuntime {
/// Creates the manager/ticker pair from the shared DDL manager.
fn new(ddl_manager: Arc<DdlManager>) -> Self {
let (manager, ticker) = PendingFlowReconcileManager::new(ddl_manager);
Self {
manager: Some(manager),
ticker,
}
}
/// Starts the reconcile manager (first call only), then the ticker.
///
/// The manager is taken out of the `Option`, so a second call does nothing.
/// Errors from `try_start` are boxed and surfaced via `OtherSnafu`.
fn start(&mut self) -> Result<()> {
if let Some(manager) = self.manager.take() {
manager
.try_start()
.map_err(BoxedError::new)
.context(OtherSnafu)?;
// Only begin ticking once the manager has started successfully.
self.ticker.start();
}
Ok(())
}
/// Stops the ticker; the manager itself is not shut down here.
fn stop(&self) {
self.ticker.stop();
}
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
@@ -176,11 +209,14 @@ impl App for Instance {
.context(error::StartFrontendSnafu)?;
self.flownode.start().await.context(StartFlownodeSnafu)?;
self.pending_flow_reconcile_runtime.start()?;
Ok(())
}
async fn stop(&mut self) -> Result<()> {
self.pending_flow_reconcile_runtime.stop();
self.frontend
.shutdown()
.await
@@ -559,6 +595,8 @@ impl StartCommand {
.procedure_executor_creator
.create(Arc::new(ddl_manager), procedure_manager.clone())
.await?;
let ddl_manager = Arc::new(ddl_manager);
let pending_flow_reconcile_runtime = PendingFlowReconcileRuntime::new(ddl_manager.clone());
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
@@ -611,6 +649,7 @@ impl StartCommand {
datanode,
frontend,
flownode,
pending_flow_reconcile_runtime,
procedure_manager,
wal_provider,
_guard: vec![],

View File

@@ -108,7 +108,17 @@ impl ActivatePendingFlowProcedure {
};
if current_flow_info.get_inner_ref().is_active() {
return Ok(Status::done());
let routes = self
.context
.flow_metadata_manager
.flow_route_manager()
.routes(self.data.flow_id)
.await?;
self.data.peers = routes.into_iter().map(|(_, value)| value.peer).collect();
self.data.resolved_table_ids =
current_flow_info.get_inner_ref().source_table_ids.clone();
self.data.state = ActivatePendingFlowState::InvalidateFlowCache;
return Ok(Status::executing(true));
}
let resolution =
@@ -206,7 +216,8 @@ impl ActivatePendingFlowProcedure {
&self.data.resolved_table_ids,
&self.data.peers,
)
.await?;
.await
.map_err(error::Error::retry_later)?;
Ok(Status::done_with_output(self.data.flow_id))
}
}
@@ -344,8 +355,8 @@ fn build_create_request(
.map(|table_id| api::v1::TableId { id: *table_id })
.collect_vec(),
sink_table_name: Some(flow_info.sink_table_name.clone().into()),
create_if_not_exists: true,
or_replace: false,
create_if_not_exists: false,
or_replace: true,
expire_after: flow_info.expire_after.map(|value| ExpireAfter { value }),
eval_interval: flow_info
.eval_interval_secs

View File

@@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use api::v1::flow::flow_request::Body as PbFlowRequest;
use api::v1::flow::{CreateRequest, FlowRequest, FlowResponse};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure_test::execute_procedure_until_done;
use common_time::TimeToLive;
@@ -23,7 +25,38 @@ use crate::ddl::activate_flow::ActivatePendingFlowProcedure;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::tests::create_flow::create_test_flow;
use crate::key::table_route::TableRouteValue;
use crate::test_util::{MockFlownodeManager, new_ddl_context};
use crate::test_util::{MockFlownodeHandler, MockFlownodeManager, new_ddl_context};
/// Test double for a flownode that records every flow `CreateRequest` it
/// receives, letting tests assert on request flags such as
/// `create_if_not_exists` and `or_replace`.
#[derive(Clone, Default)]
struct RecordingFlownodeHandler {
// Shared behind `Arc<Mutex<..>>` so clones handed to the mock manager and
// the copy kept by the test observe the same recorded requests.
create_requests: Arc<Mutex<Vec<CreateRequest>>>,
}
#[async_trait::async_trait]
impl MockFlownodeHandler for RecordingFlownodeHandler {
/// Records the body of any `Create` request and replies with an empty
/// success response; all other request bodies are ignored.
async fn handle(
&self,
_peer: &crate::peer::Peer,
request: FlowRequest,
) -> crate::error::Result<FlowResponse> {
if let Some(PbFlowRequest::Create(create_req)) = request.body {
self.create_requests.lock().unwrap().push(create_req);
}
Ok(FlowResponse {
affected_rows: 0,
..Default::default()
})
}
/// Inserts are never exercised by these tests, so reaching here is a bug.
async fn handle_inserts(
&self,
_peer: &crate::peer::Peer,
_requests: api::v1::region::InsertRequests,
) -> crate::error::Result<FlowResponse> {
unreachable!()
}
}
#[tokio::test]
async fn test_activate_pending_flow() {
@@ -156,3 +189,53 @@ async fn test_activate_pending_flow_require_streaming_keeps_pending() {
.contains("requires streaming activation")
);
}
/// Verifies that activating a pending flow sends `CreateRequest`s with
/// replace semantics: `create_if_not_exists = false` and `or_replace = true`
/// (matching the flipped flags in `build_create_request`).
#[tokio::test]
async fn test_activate_pending_flow_uses_replace_semantics() {
// One source table plus a sink; both resolvable in the default schema.
let source_table_names = vec![TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_replace_source_table",
)];
let sink_table_name = TableName::new(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
"activate_replace_sink_table",
);
// Recording handler captures every CreateRequest sent to the flownode.
let handler = RecordingFlownodeHandler::default();
let node_manager = Arc::new(MockFlownodeManager::new(handler.clone()));
let ddl_context = new_ddl_context(node_manager);
// Register the (pending) flow metadata.
let flow_id = create_test_flow(
&ddl_context,
"activate_pending_flow_replace_semantics",
source_table_names,
sink_table_name,
)
.await;
// Create the source table so activation can resolve its table id.
let create_table_task = test_create_table_task("activate_replace_source_table", 1027);
ddl_context
.table_metadata_manager
.create_table_metadata(
create_table_task.table_info.clone(),
TableRouteValue::physical(vec![]),
Default::default(),
)
.await
.unwrap();
// Drive the activation procedure to completion; its output is the flow id.
let mut procedure =
ActivatePendingFlowProcedure::new(flow_id, DEFAULT_CATALOG_NAME.to_string(), ddl_context);
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let activated_flow_id = output.downcast_ref::<u32>().unwrap();
assert_eq!(*activated_flow_id, flow_id);
// Every recorded create request must use replace semantics.
let create_requests = handler.create_requests.lock().unwrap();
assert!(!create_requests.is_empty());
for req in create_requests.iter() {
assert!(!req.create_if_not_exists);
assert!(req.or_replace);
}
}

View File

@@ -799,6 +799,10 @@ impl Metasrv {
.start()
.await
.context(StartProcedureManagerSnafu)?;
if let Some(pending_flow_reconcile_ticker) = &self.pending_flow_reconcile_ticker {
pending_flow_reconcile_ticker.start();
}
}
info!("Metasrv started");

View File

@@ -0,0 +1,191 @@
CREATE FLOW pending_user_cnt SINK TO pending_user_cnt_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_user_cnt_source
GROUP BY host;
Affected Rows: 0
SELECT source_table_ids, source_table_names, flownode_ids
FROM information_schema.flows
WHERE flow_name = 'pending_user_cnt';
+------------------+--------------------+--------------+
| source_table_ids | source_table_names | flownode_ids |
+------------------+--------------------+--------------+
| [] | | {} |
+------------------+--------------------+--------------+
SHOW CREATE TABLE pending_user_cnt_sink;
Error: 4001(TableNotFound), Table not found: pending_user_cnt_sink
CREATE TABLE pending_user_cnt_source (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
Affected Rows: 0
-- SQLNESS SLEEP 15s
SHOW CREATE TABLE pending_user_cnt_sink;
+-----------------------+------------------------------------------------------+
| Table | Create Table |
+-----------------------+------------------------------------------------------+
| pending_user_cnt_sink | CREATE TABLE IF NOT EXISTS "pending_user_cnt_sink" ( |
| | "host" STRING NULL, |
| | "total_val" BIGINT NULL, |
| | "update_at" TIMESTAMP(3) NULL, |
| | "__ts_placeholder" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("__ts_placeholder"), |
| | PRIMARY KEY ("host") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | 'comment' = 'Auto created table by flow engine' |
| | ) |
+-----------------------+------------------------------------------------------+
INSERT INTO pending_user_cnt_source VALUES
('2026-03-12T00:00:00Z', 'host1', 1.0),
('2026-03-12T00:00:01Z', 'host1', 2.0),
('2026-03-12T00:00:02Z', 'host2', 3.0);
Affected Rows: 3
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_user_cnt');
+--------------------------------------+
| ADMIN FLUSH_FLOW('pending_user_cnt') |
+--------------------------------------+
| FLOW_FLUSHED |
+--------------------------------------+
SELECT host, total_val FROM pending_user_cnt_sink ORDER BY host;
+-------+-----------+
| host | total_val |
+-------+-----------+
| host1 | 2 |
| host2 | 1 |
+-------+-----------+
DROP FLOW pending_user_cnt;
Affected Rows: 0
DROP TABLE pending_user_cnt_source;
Affected Rows: 0
DROP TABLE pending_user_cnt_sink;
Affected Rows: 0
CREATE TABLE pending_replace_src_a (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
Affected Rows: 0
CREATE FLOW pending_replace_flow SINK TO pending_replace_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_replace_src_a
GROUP BY host;
Affected Rows: 0
INSERT INTO pending_replace_src_a VALUES
('2026-03-12T00:10:00Z', 'host_old', 1.0),
('2026-03-12T00:10:01Z', 'host_old', 2.0);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_replace_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('pending_replace_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
SELECT host, total_val FROM pending_replace_sink WHERE host = 'host_old';
+----------+-----------+
| host | total_val |
+----------+-----------+
| host_old | 2 |
+----------+-----------+
CREATE OR REPLACE FLOW pending_replace_flow SINK TO pending_replace_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_replace_src_b
GROUP BY host;
Affected Rows: 0
CREATE TABLE pending_replace_src_b (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
Affected Rows: 0
-- SQLNESS SLEEP 15s
INSERT INTO pending_replace_src_a VALUES
('2026-03-12T00:11:00Z', 'host_old_after_replace', 9.0);
Affected Rows: 1
INSERT INTO pending_replace_src_b VALUES
('2026-03-12T00:11:00Z', 'host_new_after_replace', 3.0),
('2026-03-12T00:11:01Z', 'host_new_after_replace', 4.0);
Affected Rows: 2
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_replace_flow');
+------------------------------------------+
| ADMIN FLUSH_FLOW('pending_replace_flow') |
+------------------------------------------+
| FLOW_FLUSHED |
+------------------------------------------+
SELECT host, total_val
FROM pending_replace_sink
WHERE host IN ('host_old_after_replace', 'host_new_after_replace')
ORDER BY host;
+------------------------+-----------+
| host | total_val |
+------------------------+-----------+
| host_new_after_replace | 2 |
+------------------------+-----------+
DROP FLOW pending_replace_flow;
Affected Rows: 0
DROP TABLE pending_replace_src_a;
Affected Rows: 0
DROP TABLE pending_replace_src_b;
Affected Rows: 0
DROP TABLE pending_replace_sink;
Affected Rows: 0

View File

@@ -0,0 +1,88 @@
CREATE FLOW pending_user_cnt SINK TO pending_user_cnt_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_user_cnt_source
GROUP BY host;
SELECT source_table_ids, source_table_names, flownode_ids
FROM information_schema.flows
WHERE flow_name = 'pending_user_cnt';
SHOW CREATE TABLE pending_user_cnt_sink;
CREATE TABLE pending_user_cnt_source (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
-- SQLNESS SLEEP 15s
SHOW CREATE TABLE pending_user_cnt_sink;
INSERT INTO pending_user_cnt_source VALUES
('2026-03-12T00:00:00Z', 'host1', 1.0),
('2026-03-12T00:00:01Z', 'host1', 2.0),
('2026-03-12T00:00:02Z', 'host2', 3.0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_user_cnt');
SELECT host, total_val FROM pending_user_cnt_sink ORDER BY host;
DROP FLOW pending_user_cnt;
DROP TABLE pending_user_cnt_source;
DROP TABLE pending_user_cnt_sink;
CREATE TABLE pending_replace_src_a (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
CREATE FLOW pending_replace_flow SINK TO pending_replace_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_replace_src_a
GROUP BY host;
INSERT INTO pending_replace_src_a VALUES
('2026-03-12T00:10:00Z', 'host_old', 1.0),
('2026-03-12T00:10:01Z', 'host_old', 2.0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_replace_flow');
SELECT host, total_val FROM pending_replace_sink WHERE host = 'host_old';
CREATE OR REPLACE FLOW pending_replace_flow SINK TO pending_replace_sink AS
SELECT host, COUNT(val) AS total_val
FROM pending_replace_src_b
GROUP BY host;
CREATE TABLE pending_replace_src_b (
ts TIMESTAMP TIME INDEX,
host STRING,
val DOUBLE,
PRIMARY KEY(host)
);
-- SQLNESS SLEEP 15s
INSERT INTO pending_replace_src_a VALUES
('2026-03-12T00:11:00Z', 'host_old_after_replace', 9.0);
INSERT INTO pending_replace_src_b VALUES
('2026-03-12T00:11:00Z', 'host_new_after_replace', 3.0),
('2026-03-12T00:11:01Z', 'host_new_after_replace', 4.0);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('pending_replace_flow');
SELECT host, total_val
FROM pending_replace_sink
WHERE host IN ('host_old_after_replace', 'host_new_after_replace')
ORDER BY host;
DROP FLOW pending_replace_flow;
DROP TABLE pending_replace_src_a;
DROP TABLE pending_replace_src_b;
DROP TABLE pending_replace_sink;