From 63dd37dca3e74f0eec7f563980a5adaa26530880 Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Wed, 3 Dec 2025 16:56:15 +0800 Subject: [PATCH] fix: reset cached channel on errors with VIP (#7335) Signed-off-by: jeremyhi --- src/meta-client/src/client/ask_leader.rs | 33 +++++++++++++++++++++--- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/src/meta-client/src/client/ask_leader.rs b/src/meta-client/src/client/ask_leader.rs index 95d7851b95..e34d0dfedf 100644 --- a/src/meta-client/src/client/ask_leader.rs +++ b/src/meta-client/src/client/ask_leader.rs @@ -24,7 +24,7 @@ use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_telemetry::tracing_context::TracingContext; use common_telemetry::warn; use rand::seq::SliceRandom; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use tokio::time::timeout; use tonic::transport::Channel; @@ -101,12 +101,14 @@ impl AskLeader { }; let (tx, mut rx) = tokio::sync::mpsc::channel(peers.len()); + let channel_manager = self.channel_manager.clone(); for addr in &peers { let mut client = self.create_asker(addr)?; let tx_clone = tx.clone(); let req = req.clone(); let addr = addr.clone(); + let channel_manager = channel_manager.clone(); tokio::spawn(async move { match client.ask_leader(req).await { Ok(res) => { @@ -117,13 +119,19 @@ impl AskLeader { }; } Err(status) => { + // Reset cached channel even on generic errors: the VIP may keep us on a dead + // backend, so forcing a reconnect gives us a chance to hit a healthy peer. + Self::reset_channels_with_manager( + &channel_manager, + std::slice::from_ref(&addr), + ); warn!("Failed to ask leader from: {addr}, {status}"); } } }); } - let leader = timeout( + let leader = match timeout( self.channel_manager .config() .timeout @@ -131,8 +139,16 @@ impl AskLeader { rx.recv(), ) .await - .context(error::AskLeaderTimeoutSnafu)? - .context(error::NoLeaderSnafu)?; + { + Ok(Some(leader)) => leader, + Ok(None) => return error::NoLeaderSnafu.fail(), + Err(e) => { + // All peers timed out. Reset channels to force reconnection, + // which may help escape dead backends in VIP/LB scenarios. + Self::reset_channels_with_manager(&self.channel_manager, &peers); + return Err(e).context(error::AskLeaderTimeoutSnafu); + } + }; let mut leadership_group = self.leadership_group.write().unwrap(); leadership_group.leader = Some(leader.clone()); @@ -169,6 +185,15 @@ impl AskLeader { .context(error::CreateChannelSnafu)?, )) } + + /// Drop cached channels for the given peers so a fresh connection is used next time. + fn reset_channels_with_manager(channel_manager: &ChannelManager, peers: &[String]) { + if peers.is_empty() { + return; + } + + channel_manager.retain_channel(|addr, _| !peers.iter().any(|peer| peer == addr)); + } } #[async_trait]