mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-30 19:40:39 +00:00
Add batching to the redis queue, which allows us to clear it out quicker should it slow down temporarily.
69 lines
2.0 KiB
Rust
69 lines
2.0 KiB
Rust
use redis::aio::ConnectionLike;
|
|
use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
|
|
|
|
use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
|
|
use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
|
|
|
|
pub struct RedisKVClient {
|
|
client: ConnectionWithCredentialsProvider,
|
|
limiter: GlobalRateLimiter,
|
|
}
|
|
|
|
#[allow(async_fn_in_trait)]
|
|
pub trait Queryable {
|
|
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
|
|
}
|
|
|
|
impl Queryable for Pipeline {
|
|
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
|
|
self.query_async(conn).await
|
|
}
|
|
}
|
|
|
|
impl Queryable for Cmd {
|
|
async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
|
|
self.query_async(conn).await
|
|
}
|
|
}
|
|
|
|
impl RedisKVClient {
|
|
pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
|
|
Self {
|
|
client,
|
|
limiter: GlobalRateLimiter::new(info.into()),
|
|
}
|
|
}
|
|
|
|
pub async fn try_connect(&mut self) -> anyhow::Result<()> {
|
|
match self.client.connect().await {
|
|
Ok(()) => {}
|
|
Err(e) => {
|
|
tracing::error!("failed to connect to redis: {e}");
|
|
return Err(e);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn query<T: FromRedisValue>(
|
|
&mut self,
|
|
q: impl Queryable,
|
|
) -> anyhow::Result<T> {
|
|
if !self.limiter.check() {
|
|
tracing::info!("Rate limit exceeded. Skipping query");
|
|
return Err(anyhow::anyhow!("Rate limit exceeded"));
|
|
}
|
|
|
|
match q.query(&mut self.client).await {
|
|
Ok(t) => return Ok(t),
|
|
Err(e) => {
|
|
tracing::error!("failed to run query: {e}");
|
|
}
|
|
}
|
|
|
|
tracing::info!("Redis client is disconnected. Reconnecting...");
|
|
self.try_connect().await?;
|
|
Ok(q.query(&mut self.client).await?)
|
|
}
|
|
}
|