From a5fe67f3616b55135fa3a58c2db89bf30a9eb955 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Sun, 13 Jul 2025 19:27:39 +0200 Subject: [PATCH] proxy: cancel maintain_cancel_key task immediately (#12586) ## Problem When a connection terminates its maintain_cancel_key task keeps running until the CANCEL_KEY_REFRESH sleep finishes and then it triggers another cancel key TTL refresh before exiting. ## Summary of changes * Check for cancellation while sleeping and interrupt sleep. * If cancelled, break the loop, don't send a refresh cmd. --- proxy/src/cancellation.rs | 10 ++++++++-- proxy/src/util.rs | 14 +++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 4ea4c4ea54..03be9dd4cf 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -28,6 +28,7 @@ use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; use crate::redis::keys::KeyPrefix; use crate::redis::kv_ops::{RedisKVClient, RedisKVClientError}; +use crate::util::run_until; type IpSubnetKey = IpNet; @@ -498,8 +499,13 @@ impl Session { "registered cancellation key" ); - // wait before continuing. - tokio::time::sleep(CANCEL_KEY_REFRESH).await; + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } // retry immediately. Err(BatchQueueError::Result(error)) => { diff --git a/proxy/src/util.rs b/proxy/src/util.rs index 7fc2d9fbdb..0291216d94 100644 --- a/proxy/src/util.rs +++ b/proxy/src/util.rs @@ -7,8 +7,16 @@ pub async fn run_until_cancelled( f: F, cancellation_token: &CancellationToken, ) -> Option { - match select(pin!(f), pin!(cancellation_token.cancelled())).await { - Either::Left((f, _)) => Some(f), - Either::Right(((), _)) => None, + run_until(f, cancellation_token.cancelled()).await.ok() +} + +/// Runs the future `f` unless interrupted by future `condition`. +pub async fn run_until( + f: F1, + condition: F2, +) -> Result { + match select(pin!(f), pin!(condition)).await { + Either::Left((f1, _)) => Ok(f1), + Either::Right((f2, _)) => Err(f2), } }