mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
[proxy]: Fix redis IRSA expiration failure errors (#12430)
Relates to the [#30688](https://github.com/neondatabase/cloud/issues/30688)
This commit is contained in:
committed by
Conrad Ludgate
parent
c82d646598
commit
fcf8127900
@@ -64,6 +64,13 @@ impl Pipeline {
|
||||
let responses = self.replies;
|
||||
let batch_size = self.inner.len();
|
||||
|
||||
if !client.credentials_refreshed() {
|
||||
tracing::debug!(
|
||||
"Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..."
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
match client.query(&self.inner).await {
|
||||
// for each reply, we expect that many values.
|
||||
Ok(Value::Array(values)) if values.len() == responses => {
|
||||
@@ -127,6 +134,14 @@ impl QueueProcessing for CancellationProcessor {
|
||||
}
|
||||
|
||||
async fn apply(&mut self, batch: Vec<Self::Req>) -> Vec<Self::Res> {
|
||||
if !self.client.credentials_refreshed() {
|
||||
// this will cause a timeout for cancellation operations
|
||||
tracing::debug!(
|
||||
"Redis credentials are not refreshed. Sleeping for 5 seconds before retrying..."
|
||||
);
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
let mut pipeline = Pipeline::with_capacity(batch.len());
|
||||
|
||||
let batch_size = batch.len();
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
@@ -33,6 +33,7 @@ pub struct ConnectionWithCredentialsProvider {
|
||||
con: Option<MultiplexedConnection>,
|
||||
refresh_token_task: Option<JoinHandle<()>>,
|
||||
mutex: tokio::sync::Mutex<()>,
|
||||
credentials_refreshed: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl Clone for ConnectionWithCredentialsProvider {
|
||||
@@ -42,6 +43,7 @@ impl Clone for ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -65,6 +67,7 @@ impl ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +81,7 @@ impl ConnectionWithCredentialsProvider {
|
||||
con: None,
|
||||
refresh_token_task: None,
|
||||
mutex: tokio::sync::Mutex::new(()),
|
||||
credentials_refreshed: Arc::new(AtomicBool::new(true)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,6 +89,10 @@ impl ConnectionWithCredentialsProvider {
|
||||
redis::cmd("PING").query_async(con).await
|
||||
}
|
||||
|
||||
pub(crate) fn credentials_refreshed(&self) -> bool {
|
||||
self.credentials_refreshed.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
pub(crate) async fn connect(&mut self) -> anyhow::Result<()> {
|
||||
let _guard = self.mutex.lock().await;
|
||||
if let Some(con) = self.con.as_mut() {
|
||||
@@ -112,11 +120,15 @@ impl ConnectionWithCredentialsProvider {
|
||||
if let Credentials::Dynamic(credentials_provider, _) = &self.credentials {
|
||||
let credentials_provider = credentials_provider.clone();
|
||||
let con2 = con.clone();
|
||||
let credentials_refreshed = self.credentials_refreshed.clone();
|
||||
let f = tokio::spawn(async move {
|
||||
Self::keep_connection(con2, credentials_provider)
|
||||
.await
|
||||
.inspect_err(|e| debug!("keep_connection failed: {e}"))
|
||||
.ok();
|
||||
let result = Self::keep_connection(con2, credentials_provider).await;
|
||||
if let Err(e) = result {
|
||||
credentials_refreshed.store(false, Ordering::Release);
|
||||
debug!("keep_connection failed: {e}");
|
||||
} else {
|
||||
credentials_refreshed.store(true, Ordering::Release);
|
||||
}
|
||||
});
|
||||
self.refresh_token_task = Some(f);
|
||||
}
|
||||
|
||||
@@ -40,6 +40,10 @@ impl RedisKVClient {
|
||||
.inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
|
||||
}
|
||||
|
||||
pub(crate) fn credentials_refreshed(&self) -> bool {
|
||||
self.client.credentials_refreshed()
|
||||
}
|
||||
|
||||
pub(crate) async fn query<T: FromRedisValue>(
|
||||
&mut self,
|
||||
q: &impl Queryable,
|
||||
@@ -49,7 +53,7 @@ impl RedisKVClient {
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
tracing::error!("failed to run query: {e}");
|
||||
tracing::debug!("failed to run query: {e}");
|
||||
match e.retry_method() {
|
||||
redis::RetryMethod::Reconnect => {
|
||||
tracing::info!("Redis client is disconnected. Reconnecting...");
|
||||
|
||||
Reference in New Issue
Block a user