feat: metaclient builder options with role (#3909)

* feat: metaclient builder options with default role

* chore: remove unnecessary ut
This commit is contained in:
Jeremyhi
2024-05-11 14:17:14 +08:00
committed by GitHub
parent 9aa2182cb2
commit fa6c371380
6 changed files with 33 additions and 60 deletions

View File

@@ -16,7 +16,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat, Role};
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -387,10 +387,7 @@ pub async fn new_metasrv_client(
.connect_timeout(meta_config.connect_timeout),
);
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
let mut meta_client = MetaClientBuilder::datanode_default_options(cluster_id, member_id)
.channel_manager(channel_manager)
.heartbeat_channel_manager(heartbeat_channel_manager)
.build();

View File

@@ -24,7 +24,6 @@ pub mod standalone;
use std::sync::Arc;
use api::v1::meta::Role;
use api::v1::{RowDeleteRequests, RowInsertRequests};
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -146,13 +145,7 @@ impl Instance {
let ddl_channel_manager = ChannelManager::with_config(ddl_channel_config);
let cluster_id = 0; // It is currently a reserved field and has not been enabled.
let member_id = 0; // Frontend does not need a member id.
let mut meta_client = MetaClientBuilder::new(cluster_id, member_id, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
let mut meta_client = MetaClientBuilder::frontend_default_options(cluster_id)
.channel_manager(channel_manager)
.ddl_channel_manager(ddl_channel_manager)
.build();

View File

@@ -14,7 +14,7 @@
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use api::v1::meta::{HeartbeatRequest, Peer};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
@@ -37,10 +37,7 @@ async fn run() {
.connect_timeout(Duration::from_secs(5))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config);
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1)
.channel_manager(channel_manager)
.build();
meta_client.start(&["127.0.0.1:3002"]).await.unwrap();

View File

@@ -65,7 +65,6 @@ pub struct MetaClientBuilder {
id: Id,
role: Role,
enable_heartbeat: bool,
enable_router: bool,
enable_store: bool,
enable_lock: bool,
enable_procedure: bool,
@@ -84,6 +83,23 @@ impl MetaClientBuilder {
}
}
/// Returns the role of Frontend's default options.
pub fn frontend_default_options(cluster_id: ClusterId) -> Self {
// Frontend does not need a member id.
Self::new(cluster_id, 0, Role::Frontend)
.enable_store()
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
}
/// Returns the role of Datanode's default options.
pub fn datanode_default_options(cluster_id: ClusterId, member_id: u64) -> Self {
Self::new(cluster_id, member_id, Role::Datanode)
.enable_store()
.enable_heartbeat()
}
pub fn enable_heartbeat(self) -> Self {
Self {
enable_heartbeat: true,
@@ -91,13 +107,6 @@ impl MetaClientBuilder {
}
}
pub fn enable_router(self) -> Self {
Self {
enable_router: true,
..self
}
}
pub fn enable_store(self) -> Self {
Self {
enable_store: true,
@@ -154,10 +163,6 @@ impl MetaClientBuilder {
MetaClient::new(self.id)
};
if !(self.enable_heartbeat || self.enable_router || self.enable_store || self.enable_lock) {
panic!("At least one client needs to be enabled.")
}
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
@@ -169,12 +174,15 @@ impl MetaClientBuilder {
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_store {
client.store = Some(StoreClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_procedure {
let mgr = self.ddl_channel_manager.unwrap_or(mgr.clone());
client.procedure = Some(ProcedureClient::new(
@@ -184,6 +192,7 @@ impl MetaClientBuilder {
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}
if self.enable_access_cluster_info {
client.cluster = Some(ClusterClient::new(
self.id,
@@ -579,9 +588,7 @@ mod tests {
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_router()
.build();
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode).build();
assert!(meta_client.heartbeat_client().is_err());
assert!(meta_client.store_client().is_err());
meta_client.start(urls).await.unwrap();
@@ -595,7 +602,6 @@ mod tests {
let mut meta_client = MetaClientBuilder::new(1, 2, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
.build();
assert_eq!(1, meta_client.id().0);
@@ -609,7 +615,6 @@ mod tests {
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_router()
.enable_store()
.build();
meta_client.start(urls).await.unwrap();
@@ -622,7 +627,6 @@ mod tests {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];
let mut meta_client = MetaClientBuilder::new(0, 0, Role::Datanode)
.enable_heartbeat()
.enable_router()
.build();
meta_client.start(urls).await.unwrap();
@@ -630,12 +634,6 @@ mod tests {
assert!(matches!(res.err(), Some(error::Error::NotStarted { .. })));
}
#[should_panic]
#[test]
fn test_failed_when_start_nothing() {
let _ = MetaClientBuilder::new(0, 0, Role::Datanode).build();
}
#[tokio::test]
async fn test_ask_leader() {
let tc = new_client("test_ask_leader").await;

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::Role;
use meta_srv::mocks as server_mock;
use meta_srv::mocks::MockInfo;
@@ -37,10 +36,7 @@ pub async fn mock_client_by(mock_info: MockInfo) -> MetaClient {
} = mock_info;
let id = (1000u64, 2000u64);
let mut meta_client = MetaClientBuilder::new(id.0, id.1, Role::Datanode)
.enable_heartbeat()
.enable_router()
.enable_store()
let mut meta_client = MetaClientBuilder::datanode_default_options(id.0, id.1)
.channel_manager(channel_manager)
.build();
meta_client.start(&[&server_addr]).await.unwrap();

View File

@@ -17,7 +17,6 @@ use std::env;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::Role;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
@@ -312,12 +311,10 @@ impl GreptimeDbClusterBuilder {
}
async fn create_datanode(&self, opts: DatanodeOptions, metasrv: MockInfo) -> Datanode {
let mut meta_client = MetaClientBuilder::new(1000, opts.node_id.unwrap(), Role::Datanode)
.enable_router()
.enable_store()
.enable_heartbeat()
.channel_manager(metasrv.channel_manager)
.build();
let mut meta_client =
MetaClientBuilder::datanode_default_options(1000, opts.node_id.unwrap())
.channel_manager(metasrv.channel_manager)
.build();
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_backend = Arc::new(MetaKvBackend {
@@ -341,13 +338,8 @@ impl GreptimeDbClusterBuilder {
metasrv: MockInfo,
datanode_clients: Arc<DatanodeClients>,
) -> Arc<FeInstance> {
let mut meta_client = MetaClientBuilder::new(1000, 0, Role::Frontend)
.enable_router()
.enable_store()
.enable_heartbeat()
let mut meta_client = MetaClientBuilder::frontend_default_options(1000)
.channel_manager(metasrv.channel_manager)
.enable_procedure()
.enable_access_cluster_info()
.build();
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);