proxy: Drop cancellation ops if they don't fit into the queue (#11950)

Add a redis ops batch size argument for proxy and remove timeouts by
using try_send()
This commit is contained in:
Ivan Efremov
2025-05-19 13:10:55 +03:00
committed by GitHub
parent cdb6479c8a
commit 76a7d37f7e
5 changed files with 22 additions and 20 deletions

View File

@@ -161,8 +161,11 @@ struct ProxyCliArgs {
#[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
redis_rps_limit: Vec<RateBucketInfo>,
/// Cancellation channel size (max queue size for redis kv client)
#[clap(long, default_value = "1024")]
#[clap(long, default_value_t = 1024)]
cancellation_ch_size: usize,
/// Cancellation ops batch size for redis
#[clap(long, default_value_t = 8)]
cancellation_batch_size: usize,
/// cache for `allowed_ips` (use `size=0` to disable)
#[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
allowed_ips_cache: String,
@@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> {
if let Some(mut redis_kv_client) = redis_kv_client {
maintenance_tasks.spawn(async move {
redis_kv_client.try_connect().await?;
handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?;
handle_cancel_messages(
&mut redis_kv_client,
rx_cancel,
args.cancellation_batch_size,
)
.await?;
drop(redis_kv_client);

View File

@@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time
const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10);
const BATCH_SIZE: usize = 8;
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -231,12 +229,13 @@ impl CancelReplyOp {
pub async fn handle_cancel_messages(
client: &mut RedisKVClient,
mut rx: mpsc::Receiver<CancelKeyOp>,
batch_size: usize,
) -> anyhow::Result<()> {
let mut batch = Vec::with_capacity(BATCH_SIZE);
let mut pipeline = Pipeline::with_capacity(BATCH_SIZE);
let mut batch = Vec::with_capacity(batch_size);
let mut pipeline = Pipeline::with_capacity(batch_size);
loop {
if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 {
if rx.recv_many(&mut batch, batch_size).await == 0 {
warn!("shutting down cancellation queue");
break Ok(());
}
@@ -367,8 +366,7 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
tx.send_timeout(op, REDIS_SEND_TIMEOUT)
.await
tx.try_send(op)
.map_err(|e| {
tracing::warn!("failed to send GetCancelData for {key}: {e}");
})
@@ -570,7 +568,7 @@ impl Session {
}
// Send the store key op to the cancellation handler and set TTL for the key
pub(crate) async fn write_cancel_key(
pub(crate) fn write_cancel_key(
&self,
cancel_closure: CancelClosure,
) -> Result<(), CancelError> {
@@ -596,14 +594,14 @@ impl Session {
expire: CANCEL_KEY_TTL,
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send StoreCancelKey for {key}: {e}");
});
Ok(())
}
pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> {
pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> {
let Some(tx) = &self.cancellation_handler.tx else {
tracing::warn!("cancellation handler is not available");
return Err(CancelError::InternalError);
@@ -619,7 +617,7 @@ impl Session {
.guard(RedisMsgKind::HDel),
};
let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| {
let _ = tx.try_send(op).map_err(|e| {
let key = self.key;
tracing::warn!("failed to send RemoveCancelKey for {key}: {e}");
});

View File

@@ -244,9 +244,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -383,9 +383,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let cancellation_handler_clone = Arc::clone(&cancellation_handler);
let session = cancellation_handler_clone.get_key();
session
.write_cancel_key(node.cancel_closure.clone())
.await?;
session.write_cancel_key(node.cancel_closure.clone())?;
prepare_client_connection(&node, *session.key(), &mut stream).await?;

View File

@@ -94,7 +94,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<S> {
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
}
drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error
drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error
res
}