mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 00:42:54 +00:00
box the connect future and respect the redis retry methods on err
This commit is contained in:
@@ -1,3 +1,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use futures::FutureExt;
|
||||
use redis::aio::ConnectionLike;
|
||||
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
|
||||
|
||||
@@ -35,14 +38,11 @@ impl RedisKVClient {
|
||||
}
|
||||
|
||||
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
|
||||
match self.client.connect().await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
tracing::error!("failed to connect to redis: {e}");
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
self.client
|
||||
.connect()
|
||||
.boxed()
|
||||
.await
|
||||
.inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
|
||||
}
|
||||
|
||||
pub(crate) async fn query<T: FromRedisValue>(
|
||||
@@ -54,15 +54,25 @@ impl RedisKVClient {
|
||||
return Err(anyhow::anyhow!("Rate limit exceeded"));
|
||||
}
|
||||
|
||||
match q.query(&mut self.client).await {
|
||||
let e = match q.query(&mut self.client).await {
|
||||
Ok(t) => return Ok(t),
|
||||
Err(e) => {
|
||||
tracing::error!("failed to run query: {e}");
|
||||
Err(e) => e,
|
||||
};
|
||||
|
||||
tracing::error!("failed to run query: {e}");
|
||||
match e.retry_method() {
|
||||
redis::RetryMethod::Reconnect => {
|
||||
tracing::info!("Redis client is disconnected. Reconnecting...");
|
||||
self.try_connect().await?;
|
||||
}
|
||||
redis::RetryMethod::RetryImmediately => {}
|
||||
redis::RetryMethod::WaitAndRetry => {
|
||||
// somewhat arbitrary.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
_ => Err(e)?,
|
||||
}
|
||||
|
||||
tracing::info!("Redis client is disconnected. Reconnecting...");
|
||||
self.try_connect().await?;
|
||||
Ok(q.query(&mut self.client).await?)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user