mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-22 21:59:59 +00:00
proxy: Use EXPIRE command to refresh cancel entries (#12580)
## Problem When refreshing cancellation data we resend the entire value again just to reset the TTL, which causes unnecessary load in proxy, on network and possibly on redis side. ## Summary of changes * Switch from using SET with full value to using EXPIRE to reset TTL. * Add a tiny delay between retries to prevent busy loop. * Shorten CancelKeyOp variants: drop redundant suffix. * Retry SET when EXPIRE failed.
This commit is contained in:
@@ -32,20 +32,24 @@ use crate::util::run_until;
|
||||
|
||||
type IpSubnetKey = IpNet;
|
||||
|
||||
const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600);
|
||||
const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570);
|
||||
const CANCEL_KEY_TTL: Duration = Duration::from_secs(600);
|
||||
const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570);
|
||||
|
||||
// Message types for sending through mpsc channel
|
||||
pub enum CancelKeyOp {
|
||||
StoreCancelKey {
|
||||
Store {
|
||||
key: CancelKeyData,
|
||||
value: Box<str>,
|
||||
expire: std::time::Duration,
|
||||
expire: Duration,
|
||||
},
|
||||
GetCancelData {
|
||||
Refresh {
|
||||
key: CancelKeyData,
|
||||
expire: Duration,
|
||||
},
|
||||
Get {
|
||||
key: CancelKeyData,
|
||||
},
|
||||
GetCancelDataOld {
|
||||
GetOld {
|
||||
key: CancelKeyData,
|
||||
},
|
||||
}
|
||||
@@ -108,7 +112,7 @@ impl Pipeline {
|
||||
impl CancelKeyOp {
|
||||
fn register(&self, pipe: &mut Pipeline) {
|
||||
match self {
|
||||
CancelKeyOp::StoreCancelKey { key, value, expire } => {
|
||||
CancelKeyOp::Store { key, value, expire } => {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command(Cmd::set_options(
|
||||
&key,
|
||||
@@ -116,11 +120,15 @@ impl CancelKeyOp {
|
||||
SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())),
|
||||
));
|
||||
}
|
||||
CancelKeyOp::GetCancelDataOld { key } => {
|
||||
CancelKeyOp::Refresh { key, expire } => {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64));
|
||||
}
|
||||
CancelKeyOp::GetOld { key } => {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command(Cmd::hget(key, "data"));
|
||||
}
|
||||
CancelKeyOp::GetCancelData { key } => {
|
||||
CancelKeyOp::Get { key } => {
|
||||
let key = KeyPrefix::Cancel(*key).build_redis_key();
|
||||
pipe.add_command(Cmd::get(key));
|
||||
}
|
||||
@@ -264,7 +272,7 @@ impl CancellationHandler {
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::Get);
|
||||
let op = CancelKeyOp::GetCancelData { key };
|
||||
let op = CancelKeyOp::Get { key };
|
||||
let result = timeout(
|
||||
TIMEOUT,
|
||||
tx.call((guard, op), std::future::pending::<Infallible>()),
|
||||
@@ -289,7 +297,7 @@ impl CancellationHandler {
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::HGet);
|
||||
let op = CancelKeyOp::GetCancelDataOld { key };
|
||||
let op = CancelKeyOp::GetOld { key };
|
||||
timeout(
|
||||
TIMEOUT,
|
||||
tx.call((guard, op), std::future::pending::<Infallible>()),
|
||||
@@ -474,45 +482,95 @@ impl Session {
|
||||
|
||||
let mut cancel = pin!(cancel);
|
||||
|
||||
enum State {
|
||||
Set,
|
||||
Refresh,
|
||||
}
|
||||
let mut state = State::Set;
|
||||
|
||||
loop {
|
||||
let guard = Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::Set);
|
||||
let op = CancelKeyOp::StoreCancelKey {
|
||||
key: self.key,
|
||||
value: closure_json.clone(),
|
||||
expire: CANCEL_KEY_TTL,
|
||||
let guard_op = match state {
|
||||
State::Set => {
|
||||
let guard = Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::Set);
|
||||
let op = CancelKeyOp::Store {
|
||||
key: self.key,
|
||||
value: closure_json.clone(),
|
||||
expire: CANCEL_KEY_TTL,
|
||||
};
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"registering cancellation key"
|
||||
);
|
||||
(guard, op)
|
||||
}
|
||||
|
||||
State::Refresh => {
|
||||
let guard = Metrics::get()
|
||||
.proxy
|
||||
.cancel_channel_size
|
||||
.guard(RedisMsgKind::Expire);
|
||||
let op = CancelKeyOp::Refresh {
|
||||
key: self.key,
|
||||
expire: CANCEL_KEY_TTL,
|
||||
};
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"refreshing cancellation key"
|
||||
);
|
||||
(guard, op)
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"registering cancellation key"
|
||||
);
|
||||
|
||||
match tx.call((guard, op), cancel.as_mut()).await {
|
||||
Ok(_) => {
|
||||
match tx.call(guard_op, cancel.as_mut()).await {
|
||||
// SET returns OK
|
||||
Ok(Value::Okay) => {
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"registered cancellation key"
|
||||
);
|
||||
|
||||
// wait before continuing. break immediately if cancelled.
|
||||
if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
state = State::Refresh;
|
||||
}
|
||||
|
||||
// EXPIRE returns 1
|
||||
Ok(Value::Int(1)) => {
|
||||
tracing::debug!(
|
||||
src=%self.key,
|
||||
dest=?cancel_closure.cancel_token,
|
||||
"refreshed cancellation key"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(_) => {
|
||||
// Any other response likely means the key expired.
|
||||
tracing::warn!(src=%self.key, "refreshing cancellation key failed");
|
||||
// Re-enter the SET loop to repush full data.
|
||||
state = State::Set;
|
||||
}
|
||||
|
||||
// retry immediately.
|
||||
Err(BatchQueueError::Result(error)) => {
|
||||
tracing::warn!(?error, "error registering cancellation key");
|
||||
tracing::warn!(?error, "error refreshing cancellation key");
|
||||
// Small delay to prevent busy loop with high cpu and logging.
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
|
||||
}
|
||||
|
||||
// wait before continuing. break immediately if cancelled.
|
||||
if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut())
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if let Err(err) = cancel_closure
|
||||
|
||||
@@ -376,6 +376,7 @@ pub enum Waiting {
|
||||
pub enum RedisMsgKind {
|
||||
Set,
|
||||
Get,
|
||||
Expire,
|
||||
HGet,
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user