mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 12:02:55 +00:00
proxy: queue UNLINK command when conn terminates
This commit is contained in:
@@ -55,6 +55,9 @@ pub enum CancelKeyOp {
|
||||
GetOld {
|
||||
key: CancelKeyData,
|
||||
},
|
||||
Delete {
|
||||
key: CancelKeyData,
|
||||
},
|
||||
}
|
||||
|
||||
impl CancelKeyOp {
|
||||
@@ -64,6 +67,7 @@ impl CancelKeyOp {
|
||||
CancelKeyOp::Refresh { .. } => RedisMsgKind::Expire,
|
||||
CancelKeyOp::Get { .. } => RedisMsgKind::Get,
|
||||
CancelKeyOp::GetOld { .. } => RedisMsgKind::HGet,
|
||||
CancelKeyOp::Delete { .. } => RedisMsgKind::Unlink,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,6 +132,10 @@ impl Pipeline {
|
||||
self.inner.add_command(cmd);
|
||||
self.replies += 1;
|
||||
}
|
||||
|
||||
fn add_command_ignore_reply(&mut self, cmd: Cmd) {
|
||||
self.inner.add_command(cmd).ignore();
|
||||
}
|
||||
}
|
||||
|
||||
impl CancelKeyOp {
|
||||
@@ -153,6 +161,10 @@ impl CancelKeyOp {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command(Cmd::get(key));
|
||||
}
|
||||
CancelKeyOp::Delete { key } => {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command_ignore_reply(Cmd::unlink(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -497,9 +509,11 @@ impl Session {
|
||||
|
||||
let mut cancel = pin!(cancel);
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq)]
|
||||
enum State {
|
||||
Set,
|
||||
Refresh,
|
||||
Delete,
|
||||
}
|
||||
let mut state = State::Set;
|
||||
|
||||
@@ -529,9 +543,23 @@ impl Session {
|
||||
expire: CANCEL_KEY_TTL,
|
||||
}
|
||||
}
|
||||
|
||||
State::Delete => {
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"deleting cancellation key"
|
||||
);
|
||||
CancelKeyOp::Delete { key: self.key }
|
||||
}
|
||||
};
|
||||
|
||||
let (id, rx) = tx.enqueue((op.metric_guard(), op));
|
||||
if state == State::Delete {
|
||||
// The key deletion is just best effort. We enqueue the command,
|
||||
// but don't drive the queue and wait for a response.
|
||||
break;
|
||||
}
|
||||
|
||||
match tx.call(id, rx, cancel.as_mut()).await {
|
||||
// SET returns OK
|
||||
@@ -568,7 +596,10 @@ impl Session {
|
||||
continue;
|
||||
}
|
||||
|
||||
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
|
||||
Err(BatchQueueError::Cancelled(Err(_cancelled))) => {
|
||||
state = State::Delete;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// wait before continuing. break immediately if cancelled.
|
||||
@@ -576,7 +607,7 @@ impl Session {
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
state = State::Delete;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -378,6 +378,7 @@ pub enum RedisMsgKind {
|
||||
Get,
|
||||
Expire,
|
||||
HGet,
|
||||
Unlink,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
|
||||
Reference in New Issue
Block a user