diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index f5c37f5181..e9b804a499 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; +use common_meta::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; @@ -45,7 +45,7 @@ impl Debug for NodeClients { } #[async_trait::async_trait] -impl NodeManager for NodeClients { +impl DatanodeManager for NodeClients { async fn datanode(&self, datanode: &Peer) -> DatanodeRef { let client = self.get_client(datanode).await; @@ -60,7 +60,10 @@ impl NodeManager for NodeClients { *accept_compression, )) } +} +#[async_trait::async_trait] +impl FlownodeManager for NodeClients { async fn flownode(&self, flownode: &Peer) -> FlownodeRef { let client = self.get_client(flownode).await; diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 372903975f..2126f7dc78 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -911,7 +911,7 @@ mod tests { use crate::key::flow::FlowMetadataManager; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; - use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager}; + use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef}; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; use crate::region_registry::LeaderRegionRegistry; @@ -923,11 +923,14 @@ mod tests { pub struct DummyDatanodeManager; #[async_trait::async_trait] - impl NodeManager for DummyDatanodeManager { + impl DatanodeManager for DummyDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { unimplemented!() } + } + #[async_trait::async_trait] + impl FlownodeManager for DummyDatanodeManager { async fn flownode(&self, _node: &Peer) -> FlownodeRef { unimplemented!() } diff --git a/src/common/meta/src/node_manager.rs b/src/common/meta/src/node_manager.rs index b640cf307c..0410a0fa28 100644 --- a/src/common/meta/src/node_manager.rs +++ b/src/common/meta/src/node_manager.rs @@ -51,12 +51,23 @@ pub type FlownodeRef = Arc; /// Datanode manager #[async_trait::async_trait] -pub trait NodeManager: Send + Sync { - /// Retrieves a target `datanode`. +pub trait DatanodeManager: Send + Sync { async fn datanode(&self, node: &Peer) -> DatanodeRef; +} - /// Retrieves a target `flownode`. +pub type DatanodeManagerRef = Arc; + +/// Flownode manager +#[async_trait::async_trait] +pub trait FlownodeManager: Send + Sync { async fn flownode(&self, node: &Peer) -> FlownodeRef; } +pub type FlownodeManagerRef = Arc; + +/// Node manager +pub trait NodeManager: DatanodeManager + FlownodeManager {} + +impl NodeManager for T {} + pub type NodeManagerRef = Arc; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index ac93e13c94..9d8e9a6822 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -33,7 +33,7 @@ use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::kv_backend::KvBackendRef; use crate::node_manager::{ - Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef, + Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef, }; use crate::peer::{Peer, PeerLookupService}; use crate::region_keeper::MemoryRegionKeeper; @@ -120,15 +120,18 @@ impl Datanode for MockNode { } #[async_trait::async_trait] -impl NodeManager for MockDatanodeManager { +impl DatanodeManager for MockDatanodeManager { async fn datanode(&self, peer: &Peer) -> DatanodeRef { Arc::new(MockNode { peer: peer.clone(), handler: self.handler.clone(), }) } +} - async fn flownode(&self, _node: &Peer) -> FlownodeRef { +#[async_trait::async_trait] +impl FlownodeManager for MockDatanodeManager { + async fn flownode(&self, _peer: &Peer) -> FlownodeRef { unimplemented!() } } @@ -149,11 +152,7 @@ impl Flownode for MockNode { } #[async_trait::async_trait] -impl NodeManager for MockFlownodeManager { - async fn datanode(&self, _peer: &Peer) -> DatanodeRef { - unimplemented!() - } - +impl FlownodeManager for MockFlownodeManager { async fn flownode(&self, peer: &Peer) -> FlownodeRef { Arc::new(MockNode { peer: peer.clone(), @@ -162,6 +161,13 @@ impl NodeManager for MockFlownodeManager { } } +#[async_trait::async_trait] +impl DatanodeManager for MockFlownodeManager { + async fn datanode(&self, _peer: &Peer) -> DatanodeRef { + unimplemented!() + } +} + /// Returns a test purpose [DdlContext]. pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext { let kv_backend = Arc::new(MemoryKvBackend::new()); diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 55a11cf127..f7fbc43f63 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -20,7 +20,9 @@ use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager}; +use common_meta::node_manager::{ + Datanode, DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef, +}; use common_meta::peer::Peer; use common_query::request::QueryRequest; use common_recordbatch::SendableRecordBatchStream; @@ -38,11 +40,14 @@ pub struct StandaloneDatanodeManager { } #[async_trait] -impl NodeManager for StandaloneDatanodeManager { +impl DatanodeManager for StandaloneDatanodeManager { async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { RegionInvoker::arc(self.region_server.clone()) } +} +#[async_trait] +impl FlownodeManager for StandaloneDatanodeManager { async fn flownode(&self, _node: &Peer) -> FlownodeRef { self.flow_server.clone() }