diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index e1affe8391..0d8d7fa654 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -505,13 +505,6 @@ async fn main() -> anyhow::Result<()> { } } - if let Some(mut redis_kv_client) = redis_kv_client { - maintenance_tasks.spawn(async move { - redis_kv_client.try_connect().await?; - handle_cancel_messages(&mut redis_kv_client, rx_cancel).await - }); - } - if let Some(regional_redis_client) = regional_redis_client { let cache = api.caches.endpoints_cache.clone(); let con = regional_redis_client; @@ -524,6 +517,15 @@ async fn main() -> anyhow::Result<()> { } } + if let Some(mut redis_kv_client) = redis_kv_client { + maintenance_tasks.spawn(async move { + if let Err(err) = redis_kv_client.try_connect().await { + tracing::error!(?err, "could not connect to redis") + } + handle_cancel_messages(&mut redis_kv_client, rx_cancel).await + }); + } + let maintenance = loop { // get one complete task match futures::future::select( diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs deleted file mode 100644 index 30d8b83e60..0000000000 --- a/proxy/src/redis/cancellation_publisher.rs +++ /dev/null @@ -1,114 +0,0 @@ -use core::net::IpAddr; -use std::sync::Arc; - -use pq_proto::CancelKeyData; -use tokio::sync::Mutex; -use uuid::Uuid; - -use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; -use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo}; - -pub trait CancellationPublisherMut: Send + Sync + 'static { - #[allow(async_fn_in_trait)] - async fn try_publish( - &mut self, - cancel_key_data: CancelKeyData, - session_id: Uuid, - peer_addr: IpAddr, - ) -> anyhow::Result<()>; -} - -pub trait CancellationPublisher: Send + Sync + 'static { - #[allow(async_fn_in_trait)] - async fn try_publish( - &self, - cancel_key_data: CancelKeyData, - session_id: Uuid, - peer_addr: IpAddr, - ) -> anyhow::Result<()>; -} - -impl CancellationPublisher for () { - async fn try_publish( - &self, - _cancel_key_data: CancelKeyData, - _session_id: Uuid, - _peer_addr: IpAddr, - ) -> anyhow::Result<()> { - Ok(()) - } -} - -impl CancellationPublisherMut for P { - async fn try_publish( - &mut self, - cancel_key_data: CancelKeyData, - session_id: Uuid, - peer_addr: IpAddr, - ) -> anyhow::Result<()> { -

::try_publish(self, cancel_key_data, session_id, peer_addr) - .await - } -} - -impl CancellationPublisher for Option

{ - async fn try_publish( - &self, - cancel_key_data: CancelKeyData, - session_id: Uuid, - peer_addr: IpAddr, - ) -> anyhow::Result<()> { - if let Some(p) = self { - p.try_publish(cancel_key_data, session_id, peer_addr).await - } else { - Ok(()) - } - } -} - -impl CancellationPublisher for Arc> { - async fn try_publish( - &self, - cancel_key_data: CancelKeyData, - session_id: Uuid, - peer_addr: IpAddr, - ) -> anyhow::Result<()> { - self.lock() - .await - .try_publish(cancel_key_data, session_id, peer_addr) - .await - } -} - -pub struct RedisPublisherClient { - #[allow(dead_code)] - client: ConnectionWithCredentialsProvider, - _region_id: String, - _limiter: GlobalRateLimiter, -} - -impl RedisPublisherClient { - pub fn new( - client: ConnectionWithCredentialsProvider, - region_id: String, - info: &'static [RateBucketInfo], - ) -> anyhow::Result { - Ok(Self { - client, - _region_id: region_id, - _limiter: GlobalRateLimiter::new(info.into()), - }) - } - - #[allow(dead_code)] - pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> { - match self.client.connect().await { - Ok(()) => {} - Err(e) => { - tracing::error!("failed to connect to redis: {e}"); - return Err(e); - } - } - Ok(()) - } -} diff --git a/proxy/src/redis/mod.rs b/proxy/src/redis/mod.rs index 8b46a8e6ca..4f5e24ab5f 100644 --- a/proxy/src/redis/mod.rs +++ b/proxy/src/redis/mod.rs @@ -1,4 +1,3 @@ -pub mod cancellation_publisher; pub mod connection_with_credentials_provider; pub mod elasticache; pub mod keys;