fix: fix ddl client can not update leader addr (#2205)

* fix: fix ddl client can not update leader addr

* chore: apply suggestions from CR

* feat: add message to context

* fix: only retry if unavailable or deadline exceeded

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2023-08-21 21:57:29 +08:00
committed by GitHub
parent 9352649f22
commit 5b7b2cf77d
5 changed files with 110 additions and 25 deletions

View File

@@ -45,6 +45,9 @@ use crate::error::{ConvertMetaResponseSnafu, Result};
pub type Id = (u64, u64);
const DEFAULT_ASK_LEADER_MAX_RETRY: usize = 3;
const DEFAULT_SUBMIT_DDL_MAX_RETRY: usize = 3;
#[derive(Clone, Debug, Default)]
pub struct MetaClientBuilder {
id: Id,
@@ -130,7 +133,12 @@ impl MetaClientBuilder {
let mgr = client.channel_manager.clone();
if self.enable_heartbeat {
client.heartbeat = Some(HeartbeatClient::new(self.id, self.role, mgr.clone()));
client.heartbeat = Some(HeartbeatClient::new(
self.id,
self.role,
mgr.clone(),
DEFAULT_ASK_LEADER_MAX_RETRY,
));
}
if self.enable_router {
client.router = Some(RouterClient::new(self.id, self.role, mgr.clone()));
@@ -143,7 +151,12 @@ impl MetaClientBuilder {
}
if self.enable_ddl {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(DdlClient::new(self.id, self.role, mgr));
client.ddl = Some(DdlClient::new(
self.id,
self.role,
mgr,
DEFAULT_SUBMIT_DDL_MAX_RETRY,
));
}
client

View File

@@ -38,6 +38,7 @@ pub struct AskLeader {
role: Role,
leadership_group: Arc<RwLock<LeadershipGroup>>,
channel_manager: ChannelManager,
max_retry: usize,
}
impl AskLeader {
@@ -46,6 +47,7 @@ impl AskLeader {
role: Role,
peers: impl Into<Vec<String>>,
channel_manager: ChannelManager,
max_retry: usize,
) -> Self {
let leadership_group = Arc::new(RwLock::new(LeadershipGroup {
leader: None,
@@ -56,6 +58,7 @@ impl AskLeader {
role,
leadership_group,
channel_manager,
max_retry,
}
}
@@ -63,7 +66,7 @@ impl AskLeader {
self.leadership_group.read().unwrap().leader.clone()
}
pub async fn ask_leader(&self) -> Result<String> {
async fn ask_leader_inner(&self) -> Result<String> {
let mut peers = {
let leadership_group = self.leadership_group.read().unwrap();
leadership_group.peers.clone()
@@ -99,6 +102,28 @@ impl AskLeader {
Ok(leader)
}
pub async fn ask_leader(&self) -> Result<String> {
let mut times = 0;
while times < self.max_retry {
match self.ask_leader_inner().await {
Ok(res) => {
return Ok(res);
}
Err(err) => {
warn!("Failed to ask leader, source: {err}, retry {times} times");
times += 1;
continue;
}
}
}
error::RetryTimesExceededSnafu {
msg: "Failed to ask leader",
times: self.max_retry,
}
.fail()
}
fn create_asker(&self, addr: impl AsRef<str>) -> Result<HeartbeatClient<Channel>> {
Ok(HeartbeatClient::new(
self.channel_manager

View File

@@ -15,11 +15,13 @@
use std::sync::Arc;
use api::v1::meta::ddl_task_client::DdlTaskClient;
use api::v1::meta::{ErrorCode, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use api::v1::meta::{ErrorCode, ResponseHeader, Role, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::{info, warn};
use snafu::{ensure, ResultExt};
use tokio::sync::RwLock;
use tonic::transport::Channel;
use tonic::{Code, Status};
use crate::client::ask_leader::AskLeader;
use crate::client::Id;
@@ -32,12 +34,13 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}));
Self { inner }
@@ -73,6 +76,7 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}
impl Inner {
@@ -98,6 +102,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
@@ -130,29 +135,59 @@ impl Inner {
req.set_header(self.id, self.role);
let ask_leader = self.ask_leader.as_ref().unwrap();
loop {
let mut times = 0;
while times < self.max_retry {
if let Some(leader) = &ask_leader.get_leader() {
let mut client = self.make_client(leader)?;
let res = client
.submit_ddl_task(req.clone())
.await
.map_err(error::Error::from)?;
let res = res.into_inner();
if let Some(header) = res.header.as_ref() {
if let Some(err) = header.error.as_ref() {
if err.code == ErrorCode::NotLeader as i32 {
let _ = ask_leader.ask_leader().await?;
match client.submit_ddl_task(req.clone()).await {
Ok(res) => {
let res = res.into_inner();
if is_not_leader(&res.header) {
warn!("Failed to submitting ddl to {leader}, not a leader");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
}
return Ok(res);
}
Err(status) => {
// The leader may be unreachable.
if is_unreachable(&status) {
warn!("Failed to submitting ddl to {leader}, source: {status}");
let leader = ask_leader.ask_leader().await?;
info!("DDL client updated to new leader addr: {leader}");
times += 1;
continue;
} else {
return Err(error::Error::from(status));
}
}
}
return Ok(res);
} else if let Err(err) = ask_leader.ask_leader().await {
return Err(err);
}
}
error::RetryTimesExceededSnafu {
msg: "Failed to submit DDL task",
times: self.max_retry,
}
.fail()
}
}
fn is_unreachable(status: &Status) -> bool {
status.code() == Code::Unavailable || status.code() == Code::DeadlineExceeded
}
fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
if let Some(header) = header {
if let Some(err) = header.error.as_ref() {
return err.code == ErrorCode::NotLeader as i32;
}
}
false
}

View File

@@ -93,8 +93,13 @@ pub struct Client {
}
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(id, role, channel_manager)));
pub fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
let inner = Arc::new(RwLock::new(Inner::new(
id,
role,
channel_manager,
max_retry,
)));
Self { inner }
}
@@ -130,15 +135,17 @@ struct Inner {
role: Role,
channel_manager: ChannelManager,
ask_leader: Option<AskLeader>,
max_retry: usize,
}
impl Inner {
fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
fn new(id: Id, role: Role, channel_manager: ChannelManager, max_retry: usize) -> Self {
Self {
id,
role,
channel_manager,
ask_leader: None,
max_retry,
}
}
@@ -164,6 +171,7 @@ impl Inner {
self.role,
peers,
self.channel_manager.clone(),
self.max_retry,
));
Ok(())
@@ -251,7 +259,7 @@ mod test {
#[tokio::test]
async fn test_start_client() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
assert!(!client.is_started().await);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
@@ -262,7 +270,7 @@ mod test {
#[tokio::test]
async fn test_already_start() {
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default());
let mut client = Client::new((0, 0), Role::Datanode, ChannelManager::default(), 3);
client
.start(&["127.0.0.1:1000", "127.0.0.1:1001"])
.await

View File

@@ -65,6 +65,9 @@ pub enum Error {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Retry exceeded max times({}), message: {}", times, msg))]
RetryTimesExceeded { times: usize, msg: String },
}
#[allow(dead_code)]
@@ -83,7 +86,8 @@ impl ErrorExt for Error {
| Error::NotStarted { .. }
| Error::SendHeartbeat { .. }
| Error::CreateHeartbeatStream { .. }
| Error::CreateChannel { .. } => StatusCode::Internal,
| Error::CreateChannel { .. }
| Error::RetryTimesExceeded { .. } => StatusCode::Internal,
Error::MetaServer { code, .. } => *code,