refactor: split node manager trait (#6743)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-08-14 16:33:57 +08:00
committed by GitHub
parent d03f85287e
commit 92fd34ba22
5 changed files with 45 additions and 17 deletions

View File

@@ -17,7 +17,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use common_meta::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
use common_meta::peer::Peer;
use moka::future::{Cache, CacheBuilder};
@@ -45,7 +45,7 @@ impl Debug for NodeClients {
}
#[async_trait::async_trait]
impl NodeManager for NodeClients {
impl DatanodeManager for NodeClients {
async fn datanode(&self, datanode: &Peer) -> DatanodeRef {
let client = self.get_client(datanode).await;
@@ -60,7 +60,10 @@ impl NodeManager for NodeClients {
*accept_compression,
))
}
}
#[async_trait::async_trait]
impl FlownodeManager for NodeClients {
async fn flownode(&self, flownode: &Peer) -> FlownodeRef {
let client = self.get_client(flownode).await;

View File

@@ -911,7 +911,7 @@ mod tests {
use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::node_manager::{DatanodeRef, FlownodeRef, NodeManager};
use crate::node_manager::{DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef};
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::region_registry::LeaderRegionRegistry;
@@ -923,11 +923,14 @@ mod tests {
pub struct DummyDatanodeManager;
#[async_trait::async_trait]
impl NodeManager for DummyDatanodeManager {
impl DatanodeManager for DummyDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
unimplemented!()
}
}
#[async_trait::async_trait]
impl FlownodeManager for DummyDatanodeManager {
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
unimplemented!()
}

View File

@@ -51,12 +51,23 @@ pub type FlownodeRef = Arc<dyn Flownode>;
/// Datanode manager
#[async_trait::async_trait]
pub trait NodeManager: Send + Sync {
/// Retrieves a target `datanode`.
pub trait DatanodeManager: Send + Sync {
async fn datanode(&self, node: &Peer) -> DatanodeRef;
}
/// Retrieves a target `flownode`.
pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;
/// Flownode manager
#[async_trait::async_trait]
pub trait FlownodeManager: Send + Sync {
async fn flownode(&self, node: &Peer) -> FlownodeRef;
}
pub type FlownodeManagerRef = Arc<dyn FlownodeManager>;
/// Node manager
pub trait NodeManager: DatanodeManager + FlownodeManager {}
impl<T: DatanodeManager + FlownodeManager> NodeManager for T {}
pub type NodeManagerRef = Arc<dyn NodeManager>;

View File

@@ -33,7 +33,7 @@ use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, NodeManagerRef,
};
use crate::peer::{Peer, PeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
@@ -120,15 +120,18 @@ impl<T: MockDatanodeHandler> Datanode for MockNode<T> {
}
#[async_trait::async_trait]
impl<T: MockDatanodeHandler + 'static> NodeManager for MockDatanodeManager<T> {
impl<T: MockDatanodeHandler + 'static> DatanodeManager for MockDatanodeManager<T> {
async fn datanode(&self, peer: &Peer) -> DatanodeRef {
Arc::new(MockNode {
peer: peer.clone(),
handler: self.handler.clone(),
})
}
}
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
#[async_trait::async_trait]
impl<T: 'static + Send + Sync> FlownodeManager for MockDatanodeManager<T> {
async fn flownode(&self, _peer: &Peer) -> FlownodeRef {
unimplemented!()
}
}
@@ -149,11 +152,7 @@ impl<T: MockFlownodeHandler> Flownode for MockNode<T> {
}
#[async_trait::async_trait]
impl<T: MockFlownodeHandler + 'static> NodeManager for MockFlownodeManager<T> {
async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
unimplemented!()
}
impl<T: MockFlownodeHandler + 'static> FlownodeManager for MockFlownodeManager<T> {
async fn flownode(&self, peer: &Peer) -> FlownodeRef {
Arc::new(MockNode {
peer: peer.clone(),
@@ -162,6 +161,13 @@ impl<T: MockFlownodeHandler + 'static> NodeManager for MockFlownodeManager<T> {
}
}
#[async_trait::async_trait]
impl<T: 'static + Send + Sync> DatanodeManager for MockFlownodeManager<T> {
async fn datanode(&self, _peer: &Peer) -> DatanodeRef {
unimplemented!()
}
}
/// Returns a test purpose [DdlContext].
pub fn new_ddl_context(node_manager: NodeManagerRef) -> DdlContext {
let kv_backend = Arc::new(MemoryKvBackend::new());

View File

@@ -20,7 +20,9 @@ use async_trait::async_trait;
use client::region::check_response_header;
use common_error::ext::BoxedError;
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager};
use common_meta::node_manager::{
Datanode, DatanodeManager, DatanodeRef, FlownodeManager, FlownodeRef,
};
use common_meta::peer::Peer;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
@@ -38,11 +40,14 @@ pub struct StandaloneDatanodeManager {
}
#[async_trait]
impl NodeManager for StandaloneDatanodeManager {
impl DatanodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
RegionInvoker::arc(self.region_server.clone())
}
}
#[async_trait]
impl FlownodeManager for StandaloneDatanodeManager {
async fn flownode(&self, _node: &Peer) -> FlownodeRef {
self.flow_server.clone()
}