diff --git a/proxy/src/redis/kv_ops.rs b/proxy/src/redis/kv_ops.rs index f71730c533..f8d3b5cc66 100644 --- a/proxy/src/redis/kv_ops.rs +++ b/proxy/src/redis/kv_ops.rs @@ -1,3 +1,6 @@ +use std::time::Duration; + +use futures::FutureExt; use redis::aio::ConnectionLike; use redis::{Cmd, FromRedisValue, Pipeline, RedisResult}; @@ -35,14 +38,11 @@ impl RedisKVClient { } pub async fn try_connect(&mut self) -> anyhow::Result<()> { - match self.client.connect().await { - Ok(()) => {} - Err(e) => { - tracing::error!("failed to connect to redis: {e}"); - return Err(e); - } - } - Ok(()) + self.client + .connect() + .boxed() + .await + .inspect_err(|e| tracing::error!("failed to connect to redis: {e}")) } pub(crate) async fn query( @@ -54,15 +54,25 @@ impl RedisKVClient { return Err(anyhow::anyhow!("Rate limit exceeded")); } - match q.query(&mut self.client).await { + let e = match q.query(&mut self.client).await { Ok(t) => return Ok(t), - Err(e) => { - tracing::error!("failed to run query: {e}"); + Err(e) => e, + }; + + tracing::error!("failed to run query: {e}"); + match e.retry_method() { + redis::RetryMethod::Reconnect => { + tracing::info!("Redis client is disconnected. Reconnecting..."); + self.try_connect().await?; } + redis::RetryMethod::RetryImmediately => {} + redis::RetryMethod::WaitAndRetry => { + // somewhat arbitrary. + tokio::time::sleep(Duration::from_millis(100)).await; + } + _ => Err(e)?, } - tracing::info!("Redis client is disconnected. Reconnecting..."); - self.try_connect().await?; Ok(q.query(&mut self.client).await?) } }