From 76a7d37f7e266a946a0de91dae89f7ded66ef09f Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Mon, 19 May 2025 13:10:55 +0300 Subject: [PATCH] proxy: Drop cancellation ops if they don't fit into the queue (#11950) Add a redis ops batch size argument for proxy and remove timeouts by using try_send() --- proxy/src/binary/proxy.rs | 12 ++++++++++-- proxy/src/cancellation.rs | 20 +++++++++----------- proxy/src/console_redirect_proxy.rs | 4 +--- proxy/src/proxy/mod.rs | 4 +--- proxy/src/proxy/passthrough.rs | 2 +- 5 files changed, 22 insertions(+), 20 deletions(-) diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 51713902bc..f40d5041c1 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -161,8 +161,11 @@ struct ProxyCliArgs { #[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)] redis_rps_limit: Vec, /// Cancellation channel size (max queue size for redis kv client) - #[clap(long, default_value = "1024")] + #[clap(long, default_value_t = 1024)] cancellation_ch_size: usize, + /// Cancellation ops batch size for redis + #[clap(long, default_value_t = 8)] + cancellation_batch_size: usize, /// cache for `allowed_ips` (use `size=0` to disable) #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)] allowed_ips_cache: String, @@ -542,7 +545,12 @@ pub async fn run() -> anyhow::Result<()> { if let Some(mut redis_kv_client) = redis_kv_client { maintenance_tasks.spawn(async move { redis_kv_client.try_connect().await?; - handle_cancel_messages(&mut redis_kv_client, rx_cancel).await?; + handle_cancel_messages( + &mut redis_kv_client, + rx_cancel, + args.cancellation_batch_size, + ) + .await?; drop(redis_kv_client); diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index f34fb747ca..a6e7bf85a0 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -30,8 +30,6 @@ use crate::tls::postgres_rustls::MakeRustlsConnect; type IpSubnetKey = IpNet; const CANCEL_KEY_TTL: i64 = 1_209_600; // 2 weeks cancellation key expire time -const REDIS_SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(10); -const BATCH_SIZE: usize = 8; // Message types for sending through mpsc channel pub enum CancelKeyOp { @@ -231,12 +229,13 @@ impl CancelReplyOp { pub async fn handle_cancel_messages( client: &mut RedisKVClient, mut rx: mpsc::Receiver, + batch_size: usize, ) -> anyhow::Result<()> { - let mut batch = Vec::with_capacity(BATCH_SIZE); - let mut pipeline = Pipeline::with_capacity(BATCH_SIZE); + let mut batch = Vec::with_capacity(batch_size); + let mut pipeline = Pipeline::with_capacity(batch_size); loop { - if rx.recv_many(&mut batch, BATCH_SIZE).await == 0 { + if rx.recv_many(&mut batch, batch_size).await == 0 { warn!("shutting down cancellation queue"); break Ok(()); } @@ -367,8 +366,7 @@ impl CancellationHandler { return Err(CancelError::InternalError); }; - tx.send_timeout(op, REDIS_SEND_TIMEOUT) - .await + tx.try_send(op) .map_err(|e| { tracing::warn!("failed to send GetCancelData for {key}: {e}"); }) @@ -570,7 +568,7 @@ impl Session { } // Send the store key op to the cancellation handler and set TTL for the key - pub(crate) async fn write_cancel_key( + pub(crate) fn write_cancel_key( &self, cancel_closure: CancelClosure, ) -> Result<(), CancelError> { @@ -596,14 +594,14 @@ impl Session { expire: CANCEL_KEY_TTL, }; - let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| { + let _ = tx.try_send(op).map_err(|e| { let key = self.key; tracing::warn!("failed to send StoreCancelKey for {key}: {e}"); }); Ok(()) } - pub(crate) async fn remove_cancel_key(&self) -> Result<(), CancelError> { + pub(crate) fn remove_cancel_key(&self) -> Result<(), CancelError> { let Some(tx) = &self.cancellation_handler.tx else { tracing::warn!("cancellation handler is not available"); return Err(CancelError::InternalError); @@ -619,7 +617,7 @@ impl Session { .guard(RedisMsgKind::HDel), }; - let _ = tx.send_timeout(op, REDIS_SEND_TIMEOUT).await.map_err(|e| { + let _ = tx.try_send(op).map_err(|e| { let key = self.key; tracing::warn!("failed to send RemoveCancelKey for {key}: {e}"); }); diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 0f2c3def0d..e3184e20d1 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -244,9 +244,7 @@ pub(crate) async fn handle_client( let cancellation_handler_clone = Arc::clone(&cancellation_handler); let session = cancellation_handler_clone.get_key(); - session - .write_cancel_key(node.cancel_closure.clone()) - .await?; + session.write_cancel_key(node.cancel_closure.clone())?; prepare_client_connection(&node, *session.key(), &mut stream).await?; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index cf331b8bc0..0a86022e78 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -383,9 +383,7 @@ pub(crate) async fn handle_client( let cancellation_handler_clone = Arc::clone(&cancellation_handler); let session = cancellation_handler_clone.get_key(); - session - .write_cancel_key(node.cancel_closure.clone()) - .await?; + session.write_cancel_key(node.cancel_closure.clone())?; prepare_client_connection(&node, *session.key(), &mut stream).await?; diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index c100b8d716..8f9bd2de2d 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -94,7 +94,7 @@ impl ProxyPassthrough { tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database"); } - drop(self.cancel.remove_cancel_key().await); // we don't need a result. If the queue is full, we just log the error + drop(self.cancel.remove_cancel_key()); // we don't need a result. If the queue is full, we just log the error res }