diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8edf3bf2cb..10714d88e5 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -415,7 +415,7 @@ impl StartCommand { table_metadata_manager.clone(), ) .with_kv_backend(kv_backend.clone()); - let flownode = Arc::new(flow_builder.build()); + let flownode = Arc::new(flow_builder.build().await); let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index e5fbf7713b..bbd4abcb54 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -27,6 +27,7 @@ use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::{TableNameKey, TableNameManager}; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_telemetry::info; use greptime_proto::v1; use hydroflow::scheduled::graph::Hydroflow; use itertools::Itertools; @@ -39,7 +40,7 @@ use smallvec::SmallVec; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use table::metadata::TableId; -use tokio::sync::{broadcast, mpsc, oneshot, Mutex}; +use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock}; use tokio::task::LocalSet; use crate::adapter::error::{ @@ -112,7 +113,7 @@ impl FlownodeBuilder { } /// TODO(discord9): error handling - pub fn build(self) -> FlownodeManager { + pub async fn build(self) -> FlownodeManager { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in flownode only translate plan with resolved table source. MemoryCatalogManager::with_default_setup(), @@ -131,10 +132,12 @@ impl FlownodeBuilder { let (flow_node_manager, mut worker) = FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone()); let _ = tx.send(flow_node_manager); + info!("Flow Worker started in new thread"); worker.run(); }); - - rx.blocking_recv().unwrap() + let man = rx.await.unwrap(); + info!("Flow Node Manager started"); + man } } @@ -170,7 +173,7 @@ pub struct FlownodeManager { query_engine: Arc, /// Getting table name and table schema from table info manager table_info_source: TableInfoSource, - frontend_invoker: Mutex>>, + frontend_invoker: RwLock>>, /// contains mapping from table name to global id, and table schema node_context: Mutex, tick_manager: FlowTickManager, @@ -193,7 +196,7 @@ impl FlownodeManager { /// set frontend invoker pub async fn set_frontend_invoker(&self, frontend: Box) { - *self.frontend_invoker.lock().await = Some(frontend); + *self.frontend_invoker.write().await = Some(frontend); } /// Create **without** setting `frontend_invoker` @@ -213,7 +216,7 @@ impl FlownodeManager { worker_handles, query_engine, table_info_source: srv_map, - frontend_invoker: Mutex::new(None), + frontend_invoker: RwLock::new(None), node_context: Mutex::new(node_context), tick_manager, node_id, @@ -445,7 +448,7 @@ impl FlownodeManager { }; req_cnt += 1; self.frontend_invoker - .lock() + .read() .await .as_ref() .with_context(|| UnexpectedSnafu { @@ -470,7 +473,7 @@ impl FlownodeManager { req_cnt += 1; self.frontend_invoker - .lock() + .read() .await .as_ref() .with_context(|| UnexpectedSnafu { diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 86da0de5a1..4fad9e4b2e 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -137,7 +137,7 @@ impl GreptimeDbStandaloneBuilder { table_metadata_manager.clone(), ) .with_kv_backend(kv_backend.clone()); - let flownode = Arc::new(flow_builder.build()); + let flownode = Arc::new(flow_builder.build().await); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());