From 43acabd4c2baf9bb8359fa3965cffedc7fd8d8b9 Mon Sep 17 00:00:00 2001
From: Ivan Efremov
Date: Thu, 12 Jun 2025 22:46:02 +0300
Subject: [PATCH] [proxy]: Improve backoff strategy for redis reconnection
 (#12218)

Sometimes, when a Redis connection attempt fails at the init stage, the
proxy pod can restart continuously. This, in turn, can aggravate the
problem if Redis is overloaded.

Solves #11114
---
 proxy/src/binary/proxy.rs | 55 +++++++++++++++++++++++++++------------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs
index 757c1e988b..6ab6df5610 100644
--- a/proxy/src/binary/proxy.rs
+++ b/proxy/src/binary/proxy.rs
@@ -11,11 +11,13 @@ use anyhow::Context;
 use anyhow::{bail, ensure};
 use arc_swap::ArcSwapOption;
 use futures::future::Either;
+use itertools::{Itertools, Position};
+use rand::{Rng, thread_rng};
 use remote_storage::RemoteStorageConfig;
 use tokio::net::TcpListener;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, info, warn};
+use tracing::{Instrument, error, info, warn};
 use utils::sentry_init::init_sentry;
 use utils::{project_build_tag, project_git_version};
@@ -314,7 +316,7 @@ pub async fn run() -> anyhow::Result<()> {
     let jemalloc = match crate::jemalloc::MetricRecorder::new() {
         Ok(t) => Some(t),
         Err(e) => {
-            tracing::error!(error = ?e, "could not start jemalloc metrics loop");
+            error!(error = ?e, "could not start jemalloc metrics loop");
             None
         }
     };
@@ -520,23 +522,44 @@ pub async fn run() -> anyhow::Result<()> {
         }
     }
 
+    // Try to connect to Redis 3 times with 1 + (0..0.1) second interval.
+    // This prevents immediate exit and pod restart,
+    // which can cause hammering of the redis in case of connection issues.
     if let Some(mut redis_kv_client) = redis_kv_client {
-        maintenance_tasks.spawn(async move {
-            redis_kv_client.try_connect().await?;
-            handle_cancel_messages(
-                &mut redis_kv_client,
-                rx_cancel,
-                args.cancellation_batch_size,
-            )
-            .await?;
+        for attempt in (0..3).with_position() {
+            match redis_kv_client.try_connect().await {
+                Ok(()) => {
+                    info!("Connected to Redis KV client");
+                    maintenance_tasks.spawn(async move {
+                        handle_cancel_messages(
+                            &mut redis_kv_client,
+                            rx_cancel,
+                            args.cancellation_batch_size,
+                        )
+                        .await?;
 
-            drop(redis_kv_client);
+                        drop(redis_kv_client);
 
-            // `handle_cancel_messages` was terminated due to the tx_cancel
-            // being dropped. this is not worthy of an error, and this task can only return `Err`,
-            // so let's wait forever instead.
-            std::future::pending().await
-        });
+                        // `handle_cancel_messages` was terminated due to the tx_cancel
+                        // being dropped. this is not worthy of an error, and this task can only return `Err`,
+                        // so let's wait forever instead.
+                        std::future::pending().await
+                    });
+                    break;
+                }
+                Err(e) => {
+                    error!("Failed to connect to Redis KV client: {e}");
+                    if matches!(attempt, Position::Last(_)) {
+                        bail!(
+                            "Failed to connect to Redis KV client after {} attempts",
+                            attempt.into_inner()
+                        );
+                    }
+                    let jitter = thread_rng().gen_range(0..100);
+                    tokio::time::sleep(Duration::from_millis(1000 + jitter)).await;
+                }
+            }
+        }
     }
 
     if let Some(regional_redis_client) = regional_redis_client {
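
Below is a minimal, self-contained sketch of the bounded-retry-with-jitter pattern this patch introduces, for readers who want to exercise it outside the proxy codebase. The names `connect_once`, `connect_with_retries`, `MAX_ATTEMPTS`, and `BASE_DELAY_MS` are illustrative stand-ins rather than identifiers from the PR, and a plain counter loop is used instead of itertools' `with_position` so the sketch does not depend on a particular itertools version. It assumes the anyhow, rand (0.8-style API), and tokio crates.

// Standalone sketch: retry a fallible async connect a fixed number of times,
// sleeping ~1s plus up to 100ms of random jitter between attempts.
use std::time::Duration;

use anyhow::bail;
use rand::{Rng, thread_rng};

const MAX_ATTEMPTS: u32 = 3;
const BASE_DELAY_MS: u64 = 1000;

// Stand-in for `redis_kv_client.try_connect()`; always fails here so that
// the retry path is exercised.
async fn connect_once() -> anyhow::Result<()> {
    bail!("connection refused")
}

async fn connect_with_retries() -> anyhow::Result<()> {
    for attempt in 1..=MAX_ATTEMPTS {
        match connect_once().await {
            Ok(()) => return Ok(()),
            Err(e) => {
                eprintln!("attempt {attempt}/{MAX_ATTEMPTS} failed: {e}");
                if attempt == MAX_ATTEMPTS {
                    bail!("giving up after {MAX_ATTEMPTS} attempts");
                }
                // Fixed 1s base delay plus up to 100ms of jitter, mirroring
                // the "1 + (0..0.1) second interval" described in the patch.
                let jitter = thread_rng().gen_range(0..100);
                tokio::time::sleep(Duration::from_millis(BASE_DELAY_MS + jitter)).await;
            }
        }
    }
    unreachable!("the loop either returns Ok or bails on the last attempt")
}

#[tokio::main]
async fn main() {
    if let Err(e) = connect_with_retries().await {
        eprintln!("startup would abort here: {e}");
    }
}

The jitter on top of the fixed delay spreads reconnection attempts from many proxy pods over time, so an already overloaded Redis instance is not hit by a synchronized wave of retries; capping the attempts still surfaces a persistent outage as a hard error instead of stalling startup indefinitely.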