mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-06 21:12:55 +00:00
fix redis credentials check (#12455)
## Problem `keep_connection` does not exit, so it was never setting `credentials_refreshed`. ## Summary of changes Set `credentials_refreshed` to true when we first establish a connection, and after we re-authenticate the connection.
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
|
use std::convert::Infallible;
|
||||||
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
|
use std::sync::{Arc, atomic::AtomicBool, atomic::Ordering};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@@ -5,7 +6,7 @@ use futures::FutureExt;
|
|||||||
use redis::aio::{ConnectionLike, MultiplexedConnection};
|
use redis::aio::{ConnectionLike, MultiplexedConnection};
|
||||||
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
|
use redis::{ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult};
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{error, info, warn};
|
||||||
|
|
||||||
use super::elasticache::CredentialsProvider;
|
use super::elasticache::CredentialsProvider;
|
||||||
|
|
||||||
@@ -31,7 +32,7 @@ pub struct ConnectionWithCredentialsProvider {
|
|||||||
credentials: Credentials,
|
credentials: Credentials,
|
||||||
// TODO: with more load on the connection, we should consider using a connection pool
|
// TODO: with more load on the connection, we should consider using a connection pool
|
||||||
con: Option<MultiplexedConnection>,
|
con: Option<MultiplexedConnection>,
|
||||||
refresh_token_task: Option<JoinHandle<()>>,
|
refresh_token_task: Option<JoinHandle<Infallible>>,
|
||||||
mutex: tokio::sync::Mutex<()>,
|
mutex: tokio::sync::Mutex<()>,
|
||||||
credentials_refreshed: Arc<AtomicBool>,
|
credentials_refreshed: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
@@ -121,15 +122,11 @@ impl ConnectionWithCredentialsProvider {
|
|||||||
let credentials_provider = credentials_provider.clone();
|
let credentials_provider = credentials_provider.clone();
|
||||||
let con2 = con.clone();
|
let con2 = con.clone();
|
||||||
let credentials_refreshed = self.credentials_refreshed.clone();
|
let credentials_refreshed = self.credentials_refreshed.clone();
|
||||||
let f = tokio::spawn(async move {
|
let f = tokio::spawn(Self::keep_connection(
|
||||||
let result = Self::keep_connection(con2, credentials_provider).await;
|
con2,
|
||||||
if let Err(e) = result {
|
credentials_provider,
|
||||||
credentials_refreshed.store(false, Ordering::Release);
|
credentials_refreshed,
|
||||||
debug!("keep_connection failed: {e}");
|
));
|
||||||
} else {
|
|
||||||
credentials_refreshed.store(true, Ordering::Release);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
self.refresh_token_task = Some(f);
|
self.refresh_token_task = Some(f);
|
||||||
}
|
}
|
||||||
match Self::ping(&mut con).await {
|
match Self::ping(&mut con).await {
|
||||||
@@ -165,6 +162,7 @@ impl ConnectionWithCredentialsProvider {
|
|||||||
|
|
||||||
async fn get_client(&self) -> anyhow::Result<redis::Client> {
|
async fn get_client(&self) -> anyhow::Result<redis::Client> {
|
||||||
let client = redis::Client::open(self.get_connection_info().await?)?;
|
let client = redis::Client::open(self.get_connection_info().await?)?;
|
||||||
|
self.credentials_refreshed.store(true, Ordering::Relaxed);
|
||||||
Ok(client)
|
Ok(client)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,16 +178,19 @@ impl ConnectionWithCredentialsProvider {
|
|||||||
async fn keep_connection(
|
async fn keep_connection(
|
||||||
mut con: MultiplexedConnection,
|
mut con: MultiplexedConnection,
|
||||||
credentials_provider: Arc<CredentialsProvider>,
|
credentials_provider: Arc<CredentialsProvider>,
|
||||||
) -> anyhow::Result<()> {
|
credentials_refreshed: Arc<AtomicBool>,
|
||||||
|
) -> Infallible {
|
||||||
loop {
|
loop {
|
||||||
// The connection lives for 12h, for the sanity check we refresh it every hour.
|
// The connection lives for 12h, for the sanity check we refresh it every hour.
|
||||||
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
|
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
|
||||||
match Self::refresh_token(&mut con, credentials_provider.clone()).await {
|
match Self::refresh_token(&mut con, credentials_provider.clone()).await {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
info!("Token refreshed");
|
info!("Token refreshed");
|
||||||
|
credentials_refreshed.store(true, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error during token refresh: {e:?}");
|
error!("Error during token refresh: {e:?}");
|
||||||
|
credentials_refreshed.store(false, Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user