diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index fae984c27a..0657858609 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -158,6 +158,7 @@ pub struct FlowWorkerManager { flow_err_collectors: RwLock>, src_send_buf_lens: RwLock>>, tick_manager: FlowTickManager, + /// This node id is only available in distributed mode, on standalone mode this is guaranteed to be `None` pub node_id: Option, /// Lock for flushing, will be `read` by `handle_inserts` and `write` by `flush_flow` /// diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index af557c8404..87510f18ed 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -126,8 +126,11 @@ impl FlowDualEngine { allow_create: bool, allow_drop: bool, ) -> Result<(), Error> { + // use nodeid to determine if this is standalone/distributed mode, and retrieve all flows in this node(in distributed mode)/or all flows(in standalone mode) let nodeid = self.streaming_engine.node_id; let should_exists: Vec<_> = if let Some(nodeid) = nodeid { + // nodeid is available, so we only need to check flows on this node + // which also means we are in distributed mode let to_be_recover = self .flow_metadata_manager .flownode_flow_manager() @@ -139,6 +142,8 @@ impl FlowDualEngine { })?; to_be_recover.into_iter().map(|(id, _)| id).collect() } else { + // nodeid is not available, so we need to check all flows + // which also means we are in standalone mode let all_catalogs = self .catalog_manager .catalog_names()