feat: ask leader (#1957)

* feat: ask leader

* fix: license header

* chore: by comment
This commit is contained in:
JeremyHi
2023-07-14 11:32:47 +08:00
committed by GitHub
parent 5bfd0d9857
commit c9cce0225d
4 changed files with 178 additions and 74 deletions

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod ask_leader;
mod ddl;
mod heartbeat;
mod load_balance;
@@ -201,7 +202,7 @@ impl MetaClient {
}
if let Some(client) = &mut self.ddl {
client.start(urls).await?;
info!("Ddl client started");
info!("DDL client started");
}
Ok(())

View File

@@ -0,0 +1,109 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::{Arc, RwLock};
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;
use crate::client::Id;
use crate::error;
use crate::error::Result;
#[derive(Debug)]
struct LeadershipGroup {
leader: Option<String>,
peers: Vec<String>,
}
#[derive(Clone, Debug)]
pub struct AskLeader {
id: Id,
role: Role,
leadership_group: Arc<RwLock<LeadershipGroup>>,
channel_manager: ChannelManager,
}
impl AskLeader {
pub fn new(
id: Id,
role: Role,
peers: impl Into<Vec<String>>,
channel_manager: ChannelManager,
) -> Self {
let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
leader: None,
peers: peers.into(),
}));
Self {
id,
role,
leadership_group,
channel_manager,
}
}
pub fn get_leader(&self) -> Option<String> {
self.leadership_group.read().unwrap().leader.clone()
}
pub async fn ask_leader(&self) -> Result<String> {
let mut peers = {
let leadership_group = self.leadership_group.read().unwrap();
leadership_group.peers.clone()
};
peers.shuffle(&mut rand::thread_rng());
let header = RequestHeader::new(self.id, self.role);
let mut leader = None;
for addr in &peers {
let req = AskLeaderRequest {
header: Some(header.clone()),
};
let mut client = self.create_asker(addr)?;
match client.ask_leader(req).await {
Ok(res) => {
let Some(endpoint) = res.into_inner().leader else {
warn!("No leader from: {addr}");
continue;
};
leader = Some(endpoint.addr);
break;
}
Err(status) => {
warn!("Failed to ask leader from: {addr}, {status}");
}
}
}
let leader = leader.context(error::NoLeaderSnafu)?;
let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());
Ok(leader)
}
fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
Ok(HeartbeatClient::new(
self.channel_manager
.get(addr)
.context(error::CreateChannelSnafu)?,
))
}
}

View File

@@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use crate::client::heartbeat::Inner as HeartbeatInner;
use crate::client::ask_leader::AskLeader;
use crate::client::Id;
use crate::error;
use crate::error::Result;
@@ -36,8 +36,8 @@ impl Client {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager: channel_manager.clone(),
heartbeat_inner: HeartbeatInner::new(id, role, channel_manager),
channel_manager,
ask_leader: None,
}));
Self { inner }
@@ -61,7 +61,7 @@ impl Client {
&self,
req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let mut inner = self.inner.write().await;
let inner = self.inner.read().await;
inner.submit_ddl_task(req).await
}
}
@@ -72,7 +72,7 @@ struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
heartbeat_inner: HeartbeatInner,
ask_leader: Option<AskLeader>,
}
impl Inner {
@@ -84,11 +84,22 @@ impl Inner {
ensure!(
!self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Router client already started",
err_msg: "DDL client already started",
}
);
self.heartbeat_inner.start(urls).await?;
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
));
Ok(())
}
@@ -103,17 +114,24 @@ impl Inner {
#[inline]
fn is_started(&self) -> bool {
self.heartbeat_inner.is_started()
self.ask_leader.is_some()
}
pub async fn submit_ddl_task(
&mut self,
&self,
mut req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
req.set_header(self.id, self.role);
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "DDL client not start"
}
);
req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
loop {
if let Some(leader) = &self.heartbeat_inner.get_leader() {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
@@ -125,14 +143,14 @@ impl Inner {
if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
self.heartbeat_inner.ask_leader().await?;
let _ = ask_leader.ask_leader().await?;
continue;
}
}
}
return Ok(res);
} else if let Err(err) = self.heartbeat_inner.ask_leader().await {
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}

View File

@@ -11,21 +11,20 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::meta::heartbeat_client::HeartbeatClient;
use api::v1::meta::{AskLeaderRequest, HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse, RequestHeader, Role};
use common_grpc::channel_manager::ChannelManager;
use common_meta::rpc::util;
use common_telemetry::{debug, info};
use common_telemetry::info;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Channel;
use tonic::Streaming;
use crate::client::ask_leader::AskLeader;
use crate::client::Id;
use crate::error;
use crate::error::{InvalidResponseHeaderSnafu, Result};
@@ -95,14 +94,7 @@ pub struct Client {
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
peers: HashSet::default(),
leader: None,
}));
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
Self { inner }
}
@@ -116,12 +108,12 @@ impl Client {
}
pub async fn ask_leader(&mut self) -> Result<()> {
let mut inner = self.inner.write().await;
let inner = self.inner.read().await;
inner.ask_leader().await
}
pub async fn heartbeat(&mut self) -> Result<(HeartbeatSender, HeartbeatStream)> {
let mut inner = self.inner.write().await;
let inner = self.inner.read().await;
inner.ask_leader().await?;
inner.heartbeat().await
}
@@ -133,25 +125,24 @@ impl Client {
}
#[derive(Debug)]
pub(crate) struct Inner {
struct Inner {
id: Id,
role: Role,
channel_manager: ChannelManager,
peers: HashSet<String>,
leader: Option<String>,
ask_leader: Option<AskLeader>,
}
impl Inner {
pub(crate) fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
Self {
id,
role,
channel_manager,
peers: HashSet::new(),
leader: None,
ask_leader: None,
}
}
pub(crate) async fn start<U, A>(&mut self, urls: A) -> Result<()>
async fn start<U, A>(&mut self, urls: A) -> Result<()>
where
U: AsRef<str>,
A: AsRef<[U]>,
@@ -163,20 +154,22 @@ impl Inner {
}
);
self.peers = urls
let peers = urls
.as_ref()
.iter()
.map(|url| url.as_ref().to_string())
.collect();
.collect::<Vec<_>>();
self.ask_leader = Some(AskLeader::new(
self.id,
self.role,
peers,
self.channel_manager.clone(),
));
Ok(())
}
pub(crate) fn get_leader(&self) -> Option<String> {
self.leader.clone()
}
pub(crate) async fn ask_leader(&mut self) -> Result<()> {
async fn ask_leader(&self) -> Result<()> {
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
@@ -184,31 +177,24 @@ impl Inner {
}
);
let header = RequestHeader::new(self.id, self.role);
let mut leader = None;
for addr in &self.peers {
let req = AskLeaderRequest {
header: Some(header.clone()),
};
let mut client = self.make_client(addr)?;
match client.ask_leader(req).await {
Ok(res) => {
if let Some(endpoint) = res.into_inner().leader {
leader = Some(endpoint.addr);
break;
}
}
Err(status) => {
debug!("Failed to ask leader from: {}, {}", addr, status);
}
}
}
self.leader = Some(leader.context(error::AskLeaderSnafu)?);
let _ = self.ask_leader.as_ref().unwrap().ask_leader().await;
Ok(())
}
async fn heartbeat(&self) -> Result<(HeartbeatSender, HeartbeatStream)> {
let leader = self.leader.as_ref().context(error::NoLeaderSnafu)?;
ensure!(
self.is_started(),
error::IllegalGrpcClientStateSnafu {
err_msg: "Heartbeat client not start"
}
);
let leader = self
.ask_leader
.as_ref()
.unwrap()
.get_leader()
.context(error::NoLeaderSnafu)?;
let mut leader = self.make_client(leader)?;
let (sender, receiver) = mpsc::channel::<HeartbeatRequest>(128);
@@ -256,7 +242,7 @@ impl Inner {
#[inline]
pub(crate) fn is_started(&self) -> bool {
!self.peers.is_empty()
self.ask_leader.is_some()
}
}
@@ -291,16 +277,6 @@ mod test {
));
}
#[tokio::test]
async fn test_start_with_duplicate_peers() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
client
.start(&["127.0.0.1:1000", "127.0.0.1:1000", "127.0.0.1:1000"])
.await
.unwrap();
assert_eq!(1, client.inner.write().await.peers.len());
}
#[tokio::test]
async fn test_heartbeat_stream() {
let (sender, mut receiver) = mpsc::channel::<HeartbeatRequest>(100);