refactor: rwlock for frontend invoker&async lock

This commit is contained in:
discord9
2024-05-07 16:31:23 +08:00
parent 878737f781
commit d5bdbedcd6
3 changed files with 14 additions and 11 deletions

View File

@@ -415,7 +415,7 @@ impl StartCommand {
table_metadata_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build());
let flownode = Arc::new(flow_builder.build().await);
let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());

View File

@@ -27,6 +27,7 @@ use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::{TableNameKey, TableNameManager};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_telemetry::info;
use greptime_proto::v1;
use hydroflow::scheduled::graph::Hydroflow;
use itertools::Itertools;
@@ -39,7 +40,7 @@ use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::metadata::TableId;
use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::sync::{broadcast, mpsc, oneshot, Mutex, RwLock};
use tokio::task::LocalSet;
use crate::adapter::error::{
@@ -112,7 +113,7 @@ impl FlownodeBuilder {
}
/// TODO(discord9): error handling
pub fn build(self) -> FlownodeManager {
pub async fn build(self) -> FlownodeManager {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in flownode only translate plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
@@ -131,10 +132,12 @@ impl FlownodeBuilder {
let (flow_node_manager, mut worker) =
FlownodeManager::new_with_worker(node_id, query_engine, self.table_meta.clone());
let _ = tx.send(flow_node_manager);
info!("Flow Worker started in new thread");
worker.run();
});
rx.blocking_recv().unwrap()
let man = rx.await.unwrap();
info!("Flow Node Manager started");
man
}
}
@@ -170,7 +173,7 @@ pub struct FlownodeManager {
query_engine: Arc<dyn QueryEngine>,
/// Getting table name and table schema from table info manager
table_info_source: TableInfoSource,
frontend_invoker: Mutex<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
frontend_invoker: RwLock<Option<Box<dyn FrontendInvoker + Send + Sync>>>,
/// contains mapping from table name to global id, and table schema
node_context: Mutex<FlowNodeContext>,
tick_manager: FlowTickManager,
@@ -193,7 +196,7 @@ impl FlownodeManager {
/// set frontend invoker
pub async fn set_frontend_invoker(&self, frontend: Box<dyn FrontendInvoker + Send + Sync>) {
*self.frontend_invoker.lock().await = Some(frontend);
*self.frontend_invoker.write().await = Some(frontend);
}
/// Create **without** setting `frontend_invoker`
@@ -213,7 +216,7 @@ impl FlownodeManager {
worker_handles,
query_engine,
table_info_source: srv_map,
frontend_invoker: Mutex::new(None),
frontend_invoker: RwLock::new(None),
node_context: Mutex::new(node_context),
tick_manager,
node_id,
@@ -445,7 +448,7 @@ impl FlownodeManager {
};
req_cnt += 1;
self.frontend_invoker
.lock()
.read()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {
@@ -470,7 +473,7 @@ impl FlownodeManager {
req_cnt += 1;
self.frontend_invoker
.lock()
.read()
.await
.as_ref()
.with_context(|| UnexpectedSnafu {

View File

@@ -137,7 +137,7 @@ impl GreptimeDbStandaloneBuilder {
table_metadata_manager.clone(),
)
.with_kv_backend(kv_backend.clone());
let flownode = Arc::new(flow_builder.build());
let flownode = Arc::new(flow_builder.build().await);
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());