proxy: Shorten the initial TTL of cancel keys (#12647)

## Problem

A high rate of short-lived connections means that there a lot of cancel
keys in Redis with TTL=10min that could be avoided by having a much
shorter initial TTL.

## Summary of changes

* Introduce an initial TTL of 1min used with the SET command.
* Fix: don't delay repushing cancel data when expired.
* Prepare for exponentially increasing TTLs.

## Alternatives

A best-effort UNLINK command on connection termination would clean up
cancel keys right away. This needs a bigger refactor due to how batching
is handled.
This commit is contained in:
Folke Behrens
2025-07-17 23:52:20 +02:00
committed by GitHub
parent 53a05e8ccb
commit 64d0008389

View File

@@ -32,8 +32,11 @@ use crate::util::run_until;
type IpSubnetKey = IpNet;
const CANCEL_KEY_TTL: Duration = Duration::from_secs(600);
const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570);
/// Initial period and TTL is shorter to clear keys of short-lived connections faster.
const CANCEL_KEY_INITIAL_PERIOD: Duration = Duration::from_secs(60);
const CANCEL_KEY_REFRESH_PERIOD: Duration = Duration::from_secs(10 * 60);
/// `CANCEL_KEY_TTL_SLACK` is added to the periods to determine the actual TTL.
const CANCEL_KEY_TTL_SLACK: Duration = Duration::from_secs(30);
// Message types for sending through mpsc channel
pub enum CancelKeyOp {
@@ -54,6 +57,24 @@ pub enum CancelKeyOp {
},
}
impl CancelKeyOp {
const fn redis_msg_kind(&self) -> RedisMsgKind {
match self {
CancelKeyOp::Store { .. } => RedisMsgKind::Set,
CancelKeyOp::Refresh { .. } => RedisMsgKind::Expire,
CancelKeyOp::Get { .. } => RedisMsgKind::Get,
CancelKeyOp::GetOld { .. } => RedisMsgKind::HGet,
}
}
fn cancel_channel_metric_guard(&self) -> CancelChannelSizeGuard<'static> {
Metrics::get()
.proxy
.cancel_channel_size
.guard(self.redis_msg_kind())
}
}
#[derive(thiserror::Error, Debug, Clone)]
pub enum PipelineError {
#[error("could not send cmd to redis: {0}")]
@@ -483,50 +504,49 @@ impl Session {
let mut cancel = pin!(cancel);
enum State {
Set,
Init,
Refresh,
}
let mut state = State::Set;
let mut state = State::Init;
loop {
let guard_op = match state {
State::Set => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
};
let (op, mut wait_interval) = match state {
State::Init => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
(guard, op)
(
CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_INITIAL_PERIOD + CANCEL_KEY_TTL_SLACK,
},
CANCEL_KEY_INITIAL_PERIOD,
)
}
State::Refresh => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Expire);
let op = CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_TTL,
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshing cancellation key"
);
(guard, op)
(
CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_REFRESH_PERIOD + CANCEL_KEY_TTL_SLACK,
},
CANCEL_KEY_REFRESH_PERIOD,
)
}
};
match tx.call(guard_op, cancel.as_mut()).await {
match tx
.call((op.cancel_channel_metric_guard(), op), cancel.as_mut())
.await
{
// SET returns OK
Ok(Value::Okay) => {
tracing::debug!(
@@ -549,23 +569,23 @@ impl Session {
Ok(_) => {
// Any other response likely means the key expired.
tracing::warn!(src=%self.key, "refreshing cancellation key failed");
// Re-enter the SET loop to repush full data.
state = State::Set;
// Re-enter the SET loop quickly to repush full data.
state = State::Init;
wait_interval = Duration::ZERO;
}
// retry immediately.
Err(BatchQueueError::Result(error)) => {
tracing::warn!(?error, "error refreshing cancellation key");
// Small delay to prevent busy loop with high cpu and logging.
tokio::time::sleep(Duration::from_millis(10)).await;
continue;
wait_interval = Duration::from_millis(10);
}
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
}
// wait before continuing. break immediately if cancelled.
if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut())
if run_until(tokio::time::sleep(wait_interval), cancel.as_mut())
.await
.is_err()
{