diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 234288def4..e394aa2a09 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod ask_leader; mod ddl; mod heartbeat; mod load_balance; @@ -201,7 +202,7 @@ impl MetaClient { } if let Some(client) = &mut self.ddl { client.start(urls).await?; - info!("Ddl client started"); + info!("DDL client started"); } Ok(()) diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs new file mode 100644 index 0000000000..529c8f94a0 --- /dev/null +++ b/src/meta-client/src/client/ask_leader.rs @@ -0,0 +1,109 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::{Arc, RwLock}; + +use api::v1::meta::heartbeat_client::HeartbeatClient; +use api::v1::meta::{AskLeaderRequest, RequestHeader, Role}; +use common_grpc::channel_manager::ChannelManager; +use common_telemetry::warn; +use rand::seq::SliceRandom; +use snafu::{OptionExt, ResultExt}; +use tonic::transport::Channel; + +use crate::client::Id; +use crate::error; +use crate::error::Result; + +#[derive(Debug)] +struct LeadershipGroup { + leader: Option, + peers: Vec, +} + +#[derive(Clone, Debug)] +pub struct AskLeader { + id: Id, + role: Role, + leadership_group: Arc>, + channel_manager: ChannelManager, +} + +impl AskLeader { + pub fn new( + id: Id, + role: Role, + peers: impl Into>, + channel_manager: ChannelManager, + ) -> Self { + let leadership_group = Arc::new(RwLock::new(LeadershipGroup { + leader: None, + peers: peers.into(), + })); + Self { + id, + role, + leadership_group, + channel_manager, + } + } + + pub fn get_leader(&self) -> Option { + self.leadership_group.read().unwrap().leader.clone() + } + + pub async fn ask_leader(&self) -> Result { + let mut peers = { + let leadership_group = self.leadership_group.read().unwrap(); + leadership_group.peers.clone() + }; + peers.shuffle(&mut rand::thread_rng()); + + let header = RequestHeader::new(self.id, self.role); + let mut leader = None; + for addr in &peers { + let req = AskLeaderRequest { + header: Some(header.clone()), + }; + let mut client = self.create_asker(addr)?; + match client.ask_leader(req).await { + Ok(res) => { + let Some(endpoint) = res.into_inner().leader else { + warn!("No leader from: {addr}"); + continue; + }; + leader = Some(endpoint.addr); + break; + } + Err(status) => { + warn!("Failed to ask leader from: {addr}, {status}"); + } + } + } + + let leader = leader.context(error::NoLeaderSnafu)?; + let mut leadership_group = self.leadership_group.write().unwrap(); + leadership_group.leader = Some(leader.clone()); + + Ok(leader) + } + + fn create_asker(&self, addr: impl AsRef) -> Result> { + Ok(HeartbeatClient::new( + self.channel_manager + .get(addr) + .context(error::CreateChannelSnafu)?, + )) + } +} diff --git a/src/meta-client/src/client/ddl.rs b/src/meta-client/src/client/ddl.rs index 8e5fda721d..e006480183 100644 --- a/src/meta-client/src/client/ddl.rs +++ b/src/meta-client/src/client/ddl.rs @@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt}; use tokio::sync::RwLock; use tonic::transport::Channel; -use crate::client::heartbeat::Inner as HeartbeatInner; +use crate::client::ask_leader::AskLeader; use crate::client::Id; use crate::error; use crate::error::Result; @@ -36,8 +36,8 @@ impl Client { let inner = Arc::new(RwLock::new(Inner { id, role, - channel_manager: channel_manager.clone(), - heartbeat_inner: HeartbeatInner::new(id, role, channel_manager), + channel_manager, + ask_leader: None, })); Self { inner } @@ -61,7 +61,7 @@ impl Client { &self, req: SubmitDdlTaskRequest, ) -> Result { - let mut inner = self.inner.write().await; + let inner = self.inner.read().await; inner.submit_ddl_task(req).await } } @@ -72,7 +72,7 @@ struct Inner { id: Id, role: Role, channel_manager: ChannelManager, - heartbeat_inner: HeartbeatInner, + ask_leader: Option, } impl Inner { @@ -84,11 +84,22 @@ impl Inner { ensure!( !self.is_started(), error::IllegalGrpcClientStateSnafu { - err_msg: "Router client already started", + err_msg: "DDL client already started", } ); - self.heartbeat_inner.start(urls).await?; + let peers = urls + .as_ref() + .iter() + .map(|url| url.as_ref().to_string()) + .collect::>(); + self.ask_leader = Some(AskLeader::new( + self.id, + self.role, + peers, + self.channel_manager.clone(), + )); + Ok(()) } @@ -103,17 +114,24 @@ impl Inner { #[inline] fn is_started(&self) -> bool { - self.heartbeat_inner.is_started() + self.ask_leader.is_some() } pub async fn submit_ddl_task( - &mut self, + &self, mut req: SubmitDdlTaskRequest, ) -> Result { - req.set_header(self.id, self.role); + ensure!( + self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "DDL client not start" + } + ); + req.set_header(self.id, self.role); + let ask_leader = self.ask_leader.as_ref().unwrap(); loop { - if let Some(leader) = &self.heartbeat_inner.get_leader() { + if let Some(leader) = &ask_leader.get_leader() { let mut client = self.make_client(leader)?; let res = client .submit_ddl_task(req.clone()) @@ -125,14 +143,14 @@ impl Inner { if let Some(header) = res.header.as_ref() { if let Some(err) = header.error.as_ref() { if err.code == ErrorCode::NotLeader as i32 { - self.heartbeat_inner.ask_leader().await?; + let _ = ask_leader.ask_leader().await?; continue; } } } return Ok(res); - } else if let Err(err) = self.heartbeat_inner.ask_leader().await { + } else if let Err(err) = ask_leader.ask_leader().await { return Err(err); } } diff --git a/src/meta-client/src/client/heartbeat.rs b/src/meta-client/src/client/heartbeat.rs index 108ca85de1..0ec04d0537 100644 --- a/src/meta-client/src/client/heartbeat.rs +++ b/src/meta-client/src/client/heartbeat.rs @@ -11,21 +11,20 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - -use std::collections::HashSet; use std::sync::Arc; use api::v1::meta::heartbeat_client::HeartbeatClient; -use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; +use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role}; use common_grpc::channel_manager::ChannelManager; use common_meta::rpc::util; -use common_telemetry::{debug, info}; +use common_telemetry::info; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::{mpsc, RwLock}; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::Streaming; +use crate::client::ask_leader::AskLeader; use crate::client::Id; use crate::error; use crate::error::{InvalidResponseHeaderSnafu, Result}; @@ -95,14 +94,7 @@ pub struct Client { impl Client { pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { - let inner = Arc::new(RwLock::new(Inner { - id, - role, - channel_manager, - peers: HashSet::default(), - leader: None, - })); - + let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager))); Self { inner } } @@ -116,12 +108,12 @@ impl Client { } pub async fn ask_leader(&mut self) -> Result<()> { - let mut inner = self.inner.write().await; + let inner = self.inner.read().await; inner.ask_leader().await } pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> { - let mut inner = self.inner.write().await; + let inner = self.inner.read().await; inner.ask_leader().await?; inner.heartbeat().await } @@ -133,25 +125,24 @@ impl Client { } #[derive(Debug)] -pub(crate) struct Inner { +struct Inner { id: Id, role: Role, channel_manager: ChannelManager, - peers: HashSet, - leader: Option, + ask_leader: Option, } impl Inner { - pub(crate) fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { + fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self { Self { id, role, channel_manager, - peers: HashSet::new(), - leader: None, + ask_leader: None, } } - pub(crate) async fn start(&mut self, urls: A) -> Result<()> + + async fn start(&mut self, urls: A) -> Result<()> where U: AsRef, A: AsRef<[U]>, @@ -163,20 +154,22 @@ impl Inner { } ); - self.peers = urls + let peers = urls .as_ref() .iter() .map(|url| url.as_ref().to_string()) - .collect(); + .collect::>(); + self.ask_leader = Some(AskLeader::new( + self.id, + self.role, + peers, + self.channel_manager.clone(), + )); Ok(()) } - pub(crate) fn get_leader(&self) -> Option { - self.leader.clone() - } - - pub(crate) async fn ask_leader(&mut self) -> Result<()> { + async fn ask_leader(&self) -> Result<()> { ensure!( self.is_started(), error::IllegalGrpcClientStateSnafu { @@ -184,31 +177,24 @@ impl Inner { } ); - let header = RequestHeader::new(self.id, self.role); - let mut leader = None; - for addr in &self.peers { - let req = AskLeaderRequest { - header: Some(header.clone()), - }; - let mut client = self.make_client(addr)?; - match client.ask_leader(req).await { - Ok(res) => { - if let Some(endpoint) = res.into_inner().leader { - leader = Some(endpoint.addr); - break; - } - } - Err(status) => { - debug!("Failed to ask leader from: {}, {}", addr, status); - } - } - } - self.leader = Some(leader.context(error::AskLeaderSnafu)?); + let _ = self.ask_leader.as_ref().unwrap().ask_leader().await; Ok(()) } async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> { - let leader = self.leader.as_ref().context(error::NoLeaderSnafu)?; + ensure!( + self.is_started(), + error::IllegalGrpcClientStateSnafu { + err_msg: "Heartbeat client not start" + } + ); + + let leader = self + .ask_leader + .as_ref() + .unwrap() + .get_leader() + .context(error::NoLeaderSnafu)?; let mut leader = self.make_client(leader)?; let (sender, receiver) = mpsc::channel::(128); @@ -256,7 +242,7 @@ impl Inner { #[inline] pub(crate) fn is_started(&self) -> bool { - !self.peers.is_empty() + self.ask_leader.is_some() } } @@ -291,16 +277,6 @@ mod test { )); } - #[tokio::test] - async fn test_start_with_duplicate_peers() { - let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default()); - client - .start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"]) - .await - .unwrap(); - assert_eq!(1, client.inner.write().await.peers.len()); - } - #[tokio::test] async fn test_heartbeat_stream() { let (sender, mut receiver) = mpsc::channel::(100);