diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index ffc0cf43f1..74413f1a7d 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -64,6 +64,13 @@ impl Pipeline { let responses = self.replies; let batch_size = self.inner.len(); + if !client.credentials_refreshed() { + tracing::debug!( + "Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..." + ); + tokio::time::sleep(Duration::from_secs(5)).await; + } + match client.query(&self.inner).await { // for each reply, we expect that many values. Ok(Value::Array(values)) if values.len() == responses => { @@ -127,6 +134,14 @@ impl QueueProcessing for CancellationProcessor { } async fn apply(&mut self, batch: Vec) -> Vec { + if !self.client.credentials_refreshed() { + // this will cause a timeout for cancellation operations + tracing::debug!( + "Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..." + ); + tokio::time::sleep(Duration::from_secs(5)).await; + } + let mut pipeline = Pipeline::with_capacity(batch.len()); let batch_size = batch.len(); diff --git a/proxy/src/redis/connection_with_credentials_provider.rs b/proxy/src/redis/connection_with_credentials_provider.rs index fe656557ac..510701cb27 100644 --- a/proxy/src/redis/connection_with_credentials_provider.rs +++ b/proxy/src/redis/connection_with_credentials_provider.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering}; use std::time::Duration; use futures::FutureExt; @@ -33,6 +33,7 @@ pub struct ConnectionWithCredentialsProvider { con: Option, refresh_token_task: Option>, mutex: tokio::sync::Mutex<()>, + credentials_refreshed: Arc, } impl Clone for ConnectionWithCredentialsProvider { @@ -42,6 +43,7 @@ impl Clone for ConnectionWithCredentialsProvider { con: None, refresh_token_task: None, mutex: tokio::sync::Mutex::new(()), + credentials_refreshed: Arc::new(AtomicBool::new(false)), } } } @@ -65,6 +67,7 @@ impl ConnectionWithCredentialsProvider { con: None, refresh_token_task: None, mutex: tokio::sync::Mutex::new(()), + credentials_refreshed: Arc::new(AtomicBool::new(false)), } } @@ -78,6 +81,7 @@ impl ConnectionWithCredentialsProvider { con: None, refresh_token_task: None, mutex: tokio::sync::Mutex::new(()), + credentials_refreshed: Arc::new(AtomicBool::new(true)), } } @@ -85,6 +89,10 @@ impl ConnectionWithCredentialsProvider { redis::cmd("PING").query_async(con).await } + pub(crate) fn credentials_refreshed(&self) -> bool { + self.credentials_refreshed.load(Ordering::Relaxed) + } + pub(crate) async fn connect(&mut self) -> anyhow::Result<()> { let _guard = self.mutex.lock().await; if let Some(con) = self.con.as_mut() { @@ -112,11 +120,15 @@ impl ConnectionWithCredentialsProvider { if let Credentials::Dynamic(credentials_provider, _) = &self.credentials { let credentials_provider = credentials_provider.clone(); let con2 = con.clone(); + let credentials_refreshed = self.credentials_refreshed.clone(); let f = tokio::spawn(async move { - Self::keep_connection(con2, credentials_provider) - .await - .inspect_err(|e| debug!("keep_connection failed: {e}")) - .ok(); + let result = Self::keep_connection(con2, credentials_provider).await; + if let Err(e) = result { + credentials_refreshed.store(false, Ordering::Release); + debug!("keep_connection failed: {e}"); + } else { + credentials_refreshed.store(true, Ordering::Release); + } }); self.refresh_token_task = Some(f); } diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index 671fe09b0b..cfdbc21839 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -40,6 +40,10 @@ impl RedisKVClient { .inspect_err(|e| tracing::error!("failed to connect to redis: {e}")) } + pub(crate) fn credentials_refreshed(&self) -> bool { + self.client.credentials_refreshed() + } + pub(crate) async fn query( &mut self, q: &impl Queryable, @@ -49,7 +53,7 @@ impl RedisKVClient { Err(e) => e, }; - tracing::error!("failed to run query: {e}"); + tracing::debug!("failed to run query: {e}"); match e.retry_method() { redis::RetryMethod::Reconnect => { tracing::info!("Redis client is disconnected. Reconnecting...");