fix: reset cached channel on errors with VIP (#7335)

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2025-12-03 16:56:15 +08:00
committed by GitHub
parent 68fff3b1aa
commit 63dd37dca3

View File

@@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use rand::seq::SliceRandom;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use tokio::time::timeout;
use tonic::transport::Channel;
@@ -101,12 +101,14 @@ impl AskLeader {
};
let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len());
let channel_manager = self.channel_manager.clone();
for addr in &peers {
let mut client = self.create_asker(addr)?;
let tx_clone = tx.clone();
let req = req.clone();
let addr = addr.clone();
let channel_manager = channel_manager.clone();
tokio::spawn(async move {
match client.ask_leader(req).await {
Ok(res) => {
@@ -117,13 +119,19 @@ impl AskLeader {
};
}
Err(status) => {
// Reset cached channel even on generic errors: the VIP may keep us on a dead
// backend, so forcing a reconnect gives us a chance to hit a healthy peer.
Self::reset_channels_with_manager(
&channel_manager,
std::slice::from_ref(&addr),
);
warn!("Failed to ask leader from: {addr}, {status}");
}
}
});
}
let leader = timeout(
let leader = match timeout(
self.channel_manager
.config()
.timeout
@@ -131,8 +139,16 @@ impl AskLeader {
rx.recv(),
)
.await
.context(error::AskLeaderTimeoutSnafu)?
.context(error::NoLeaderSnafu)?;
{
Ok(Some(leader)) => leader,
Ok(None) => return error::NoLeaderSnafu.fail(),
Err(e) => {
// All peers timed out. Reset channels to force reconnection,
// which may help escape dead backends in VIP/LB scenarios.
Self::reset_channels_with_manager(&self.channel_manager, &peers);
return Err(e).context(error::AskLeaderTimeoutSnafu);
}
};
let mut leadership_group = self.leadership_group.write().unwrap();
leadership_group.leader = Some(leader.clone());
@@ -169,6 +185,15 @@ impl AskLeader {
.context(error::CreateChannelSnafu)?,
))
}
/// Drop cached channels for the given peers so a fresh connection is used next time.
fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) {
if peers.is_empty() {
return;
}
channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr));
}
}
#[async_trait]