From 1bc1eae5e8bb2abf1124cfa80ebf412024c897ec Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 3 Jul 2025 10:51:35 +0100 Subject: [PATCH] fix redis credentials check (#12455) ## Problem `keep_connection` does not exit, so it was never setting `credentials_refreshed`. ## Summary of changes Set `credentials_refreshed` to true when we first establish a connection, and after we re-authenticate the connection. --- .../connection_with_credentials_provider.rs | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index 510701cb27..0465493799 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -1,3 +1,4 @@ +use std::convert::Infallible; use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering}; use std::time::Duration; @@ -5,7 +6,7 @@ use futures::FutureExt; use redis::aio::{ConnectionLike, MultiplexedConnection}; use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult}; use tokio::task::JoinHandle; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use super::elasticache::CredentialsProvider; @@ -31,7 +32,7 @@ pub struct ConnectionWithCredentialsProvider { credentials: Credentials, // TODO: with more load on the connection, we should consider using a connection pool con: Option, - refresh_token_task: Option>, + refresh_token_task: Option>, mutex: tokio::sync::Mutex<()>, credentials_refreshed: Arc, } @@ -121,15 +122,11 @@ impl ConnectionWithCredentialsProvider { let credentials_provider = credentials_provider.clone(); let con2 = con.clone(); let credentials_refreshed = self.credentials_refreshed.clone(); - let f = tokio::spawn(async move { - let result = Self::keep_connection(con2, credentials_provider).await; - if let Err(e) = result { - credentials_refreshed.store(false, Ordering::Release); - debug!("keep_connection failed: {e}"); - } else { - credentials_refreshed.store(true, Ordering::Release); - } - }); + let f = tokio::spawn(Self::keep_connection( + con2, + credentials_provider, + credentials_refreshed, + )); self.refresh_token_task = Some(f); } match Self::ping(&mut con).await { @@ -165,6 +162,7 @@ impl ConnectionWithCredentialsProvider { async fn get_client(&self) -> anyhow::Result { let client = redis::Client::open(self.get_connection_info().await?)?; + self.credentials_refreshed.store(true, Ordering::Relaxed); Ok(client) } @@ -180,16 +178,19 @@ impl ConnectionWithCredentialsProvider { async fn keep_connection( mut con: MultiplexedConnection, credentials_provider: Arc, - ) -> anyhow::Result<()> { + credentials_refreshed: Arc, + ) -> Infallible { loop { // The connection lives for 12h, for the sanity check we refresh it every hour. tokio::time::sleep(Duration::from_secs(60 * 60)).await; match Self::refresh_token(&mut con, credentials_provider.clone()).await { Ok(()) => { info!("Token refreshed"); + credentials_refreshed.store(true, Ordering::Relaxed); } Err(e) => { error!("Error during token refresh: {e:?}"); + credentials_refreshed.store(false, Ordering::Relaxed); } } }