From dd99ad6dc7a38bec911d7ae24b44db7143eb9bc7 Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Tue, 15 Jul 2025 17:23:06 +0200 Subject: [PATCH] proxy: queue UNLINK command when conn terminates --- proxy/src/cancellation.rs | 35 +++++++++++++++++++++++++++++++++-- proxy/src/metrics.rs | 1 + 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 617cf21b2c..2322e5091f 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -55,6 +55,9 @@ pub enum CancelKeyOp { GetOld { key: CancelKeyData, }, + Delete { + key: CancelKeyData, + }, } impl CancelKeyOp { @@ -64,6 +67,7 @@ impl CancelKeyOp { CancelKeyOp::Refresh { .. } => RedisMsgKind::Expire, CancelKeyOp::Get { .. } => RedisMsgKind::Get, CancelKeyOp::GetOld { .. } => RedisMsgKind::HGet, + CancelKeyOp::Delete { .. } => RedisMsgKind::Unlink, } } @@ -128,6 +132,10 @@ impl Pipeline { self.inner.add_command(cmd); self.replies += 1; } + + fn add_command_ignore_reply(&mut self, cmd: Cmd) { + self.inner.add_command(cmd).ignore(); + } } impl CancelKeyOp { @@ -153,6 +161,10 @@ impl CancelKeyOp { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::get(key)); } + CancelKeyOp::Delete { key } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command_ignore_reply(Cmd::unlink(key)); + } } } } @@ -497,9 +509,11 @@ impl Session { let mut cancel = pin!(cancel); + #[derive(Copy, Clone, PartialEq, Eq)] enum State { Set, Refresh, + Delete, } let mut state = State::Set; @@ -529,9 +543,23 @@ impl Session { expire: CANCEL_KEY_TTL, } } + + State::Delete => { + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "deleting cancellation key" + ); + CancelKeyOp::Delete { key: self.key } + } }; let (id, rx) = tx.enqueue((op.metric_guard(), op)); + if state == State::Delete { + // The key deletion is just best effort. We enqueue the command, + // but don't drive the queue and wait for a response. + break; + } match tx.call(id, rx, cancel.as_mut()).await { // SET returns OK @@ -568,7 +596,10 @@ impl Session { continue; } - Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, + Err(BatchQueueError::Cancelled(Err(_cancelled))) => { + state = State::Delete; + continue; + } } // wait before continuing. break immediately if cancelled. @@ -576,7 +607,7 @@ impl Session { .await .is_err() { - break; + state = State::Delete; } } diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index bf4d5a11eb..e2eba13b57 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -378,6 +378,7 @@ pub enum RedisMsgKind { Get, Expire, HGet, + Unlink, } #[derive(Default, Clone)]