diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index 01ebacec65..eeb0cd1e99 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat, Role}; +use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -387,10 +387,7 @@ pub async fn new_metasrv_client( .connect_timeout(meta_config.connect_timeout), ); - let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode) - .enable_heartbeat() - .enable_router() - .enable_store() + let mut meta_client = MetaClientBuilder::datanode_default_options(cluster_id, member_id) .channel_manager(channel_manager) .heartbeat_channel_manager(heartbeat_channel_manager) .build(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 096c3de247..5768d34e7c 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -24,7 +24,6 @@ pub mod standalone; use std::sync::Arc; -use api::v1::meta::Role; use api::v1::{RowDeleteRequests, RowInsertRequests}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -146,13 +145,7 @@ impl Instance { let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config); let cluster_id = 0; // It is currently a reserved field and has not been enabled. - let member_id = 0; // Frontend does not need a member id. - let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Frontend) - .enable_router() - .enable_store() - .enable_heartbeat() - .enable_procedure() - .enable_access_cluster_info() + let mut meta_client = MetaClientBuilder::frontend_default_options(cluster_id) .channel_manager(channel_manager) .ddl_channel_manager(ddl_channel_manager) .build(); diff --git a/src/meta-client/examples/meta_client.rs b/src/meta-client/examples/meta_client.rs index 304199812e..1e4043b5d6 100644 --- a/src/meta-client/examples/meta_client.rs +++ b/src/meta-client/examples/meta_client.rs @@ -14,7 +14,7 @@ use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, Peer, Role}; +use api::v1::meta::{HeartbeatRequest, Peer}; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest, @@ -37,10 +37,7 @@ async fn run() { .connect_timeout(Duration::from_secs(5)) .tcp_nodelay(true); let channel_manager = ChannelManager::with_config(config); - let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) - .enable_heartbeat() - .enable_router() - .enable_store() + let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1) .channel_manager(channel_manager) .build(); meta_client.start(&["127.0.0.1:3002"]).await.unwrap(); diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 2c609d4a9b..93b999635d 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -65,7 +65,6 @@ pub struct MetaClientBuilder { id: Id, role: Role, enable_heartbeat: bool, - enable_router: bool, enable_store: bool, enable_lock: bool, enable_procedure: bool, @@ -84,6 +83,23 @@ impl MetaClientBuilder { } } + /// Returns the role of Frontend's default options. + pub fn frontend_default_options(cluster_id: ClusterId) -> Self { + // Frontend does not need a member id. + Self::new(cluster_id, 0, Role::Frontend) + .enable_store() + .enable_heartbeat() + .enable_procedure() + .enable_access_cluster_info() + } + + /// Returns the role of Datanode's default options. + pub fn datanode_default_options(cluster_id: ClusterId, member_id: u64) -> Self { + Self::new(cluster_id, member_id, Role::Datanode) + .enable_store() + .enable_heartbeat() + } + pub fn enable_heartbeat(self) -> Self { Self { enable_heartbeat: true, @@ -91,13 +107,6 @@ impl MetaClientBuilder { } } - pub fn enable_router(self) -> Self { - Self { - enable_router: true, - ..self - } - } - pub fn enable_store(self) -> Self { Self { enable_store: true, @@ -154,10 +163,6 @@ impl MetaClientBuilder { MetaClient::new(self.id) }; - if !(self.enable_heartbeat || self.enable_router || self.enable_store || self.enable_lock) { - panic!("At least one client needs to be enabled.") - } - let mgr = client.channel_manager.clone(); if self.enable_heartbeat { @@ -169,12 +174,15 @@ impl MetaClientBuilder { DEFAULT_ASK_LEADER_MAX_RETRY, )); } + if self.enable_store { client.store = Some(StoreClient::new(self.id, self.role, mgr.clone())); } + if self.enable_lock { client.lock = Some(LockClient::new(self.id, self.role, mgr.clone())); } + if self.enable_procedure { let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone()); client.procedure = Some(ProcedureClient::new( @@ -184,6 +192,7 @@ impl MetaClientBuilder { DEFAULT_SUBMIT_DDL_MAX_RETRY, )); } + if self.enable_access_cluster_info { client.cluster = Some(ClusterClient::new( self.id, @@ -579,9 +588,7 @@ mod tests { assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); - let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) - .enable_router() - .build(); + let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode).build(); assert!(meta_client.heartbeat_client().is_err()); assert!(meta_client.store_client().is_err()); meta_client.start(urls).await.unwrap(); @@ -595,7 +602,6 @@ mod tests { let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode) .enable_heartbeat() - .enable_router() .enable_store() .build(); assert_eq!(1, meta_client.id().0); @@ -609,7 +615,6 @@ mod tests { async fn test_not_start_heartbeat_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) - .enable_router() .enable_store() .build(); meta_client.start(urls).await.unwrap(); @@ -622,7 +627,6 @@ mod tests { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode) .enable_heartbeat() - .enable_router() .build(); meta_client.start(urls).await.unwrap(); @@ -630,12 +634,6 @@ mod tests { assert!(matches!(res.err(), Some(error::Error::NotStarted { .. }))); } - #[should_panic] - #[test] - fn test_failed_when_start_nothing() { - let _ = MetaClientBuilder::new(0, 0, Role::Datanode).build(); - } - #[tokio::test] async fn test_ask_leader() { let tc = new_client("test_ask_leader").await; diff --git a/src/meta-client/src/mocks.rs b/src/meta-client/src/mocks.rs index 3054e84be3..cb102bc024 100644 --- a/src/meta-client/src/mocks.rs +++ b/src/meta-client/src/mocks.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::Role; use meta_srv::mocks as server_mock; use meta_srv::mocks::MockInfo; @@ -37,10 +36,7 @@ pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient { } = mock_info; let id = (1000u64, 2000u64); - let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode) - .enable_heartbeat() - .enable_router() - .enable_store() + let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1) .channel_manager(channel_manager) .build(); meta_client.start(&[&server_addr]).await.unwrap(); diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index cf8016a435..b75b6f11aa 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -17,7 +17,6 @@ use std::env; use std::sync::Arc; use std::time::Duration; -use api::v1::meta::Role; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; @@ -312,12 +311,10 @@ impl GreptimeDbClusterBuilder { } async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode { - let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode) - .enable_router() - .enable_store() - .enable_heartbeat() - .channel_manager(metasrv.channel_manager) - .build(); + let mut meta_client = + MetaClientBuilder::datanode_default_options(1000, opts.node_id.unwrap()) + .channel_manager(metasrv.channel_manager) + .build(); meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_backend = Arc::new(MetaKvBackend { @@ -341,13 +338,8 @@ impl GreptimeDbClusterBuilder { metasrv: MockInfo, datanode_clients: Arc, ) -> Arc { - let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend) - .enable_router() - .enable_store() - .enable_heartbeat() + let mut meta_client = MetaClientBuilder::frontend_default_options(1000) .channel_manager(metasrv.channel_manager) - .enable_procedure() - .enable_access_cluster_info() .build(); meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client);