diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 77062d3bb4..f25121331f 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -32,8 +32,11 @@ use crate::util::run_until; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: Duration = Duration::from_secs(600); -const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570); +/// Initial period and TTL is shorter to clear keys of short-lived connections faster. +const CANCEL_KEY_INITIAL_PERIOD: Duration = Duration::from_secs(60); +const CANCEL_KEY_REFRESH_PERIOD: Duration = Duration::from_secs(10 * 60); +/// `CANCEL_KEY_TTL_SLACK` is added to the periods to determine the actual TTL. +const CANCEL_KEY_TTL_SLACK: Duration = Duration::from_secs(30); // Message types for sending through mpsc channel pub enum CancelKeyOp { @@ -54,6 +57,24 @@ pub enum CancelKeyOp { }, } +impl CancelKeyOp { + const fn redis_msg_kind(&self) -> RedisMsgKind { + match self { + CancelKeyOp::Store { .. } => RedisMsgKind::Set, + CancelKeyOp::Refresh { .. } => RedisMsgKind::Expire, + CancelKeyOp::Get { .. } => RedisMsgKind::Get, + CancelKeyOp::GetOld { .. } => RedisMsgKind::HGet, + } + } + + fn cancel_channel_metric_guard(&self) -> CancelChannelSizeGuard<'static> { + Metrics::get() + .proxy + .cancel_channel_size + .guard(self.redis_msg_kind()) + } +} + #[derive(thiserror::Error, Debug, Clone)] pub enum PipelineError { #[error("could not send cmd to redis: {0}")] @@ -483,50 +504,49 @@ impl Session { let mut cancel = pin!(cancel); enum State { - Set, + Init, Refresh, } - let mut state = State::Set; + let mut state = State::Init; loop { - let guard_op = match state { - State::Set => { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::Set); - let op = CancelKeyOp::Store { - key: self.key, - value: closure_json.clone(), - expire: CANCEL_KEY_TTL, - }; + let (op, mut wait_interval) = match state { + State::Init => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, "registering cancellation key" ); - (guard, op) + ( + CancelKeyOp::Store { + key: self.key, + value: closure_json.clone(), + expire: CANCEL_KEY_INITIAL_PERIOD + CANCEL_KEY_TTL_SLACK, + }, + CANCEL_KEY_INITIAL_PERIOD, + ) } State::Refresh => { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::Expire); - let op = CancelKeyOp::Refresh { - key: self.key, - expire: CANCEL_KEY_TTL, - }; tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, "refreshing cancellation key" ); - (guard, op) + ( + CancelKeyOp::Refresh { + key: self.key, + expire: CANCEL_KEY_REFRESH_PERIOD + CANCEL_KEY_TTL_SLACK, + }, + CANCEL_KEY_REFRESH_PERIOD, + ) } }; - match tx.call(guard_op, cancel.as_mut()).await { + match tx + .call((op.cancel_channel_metric_guard(), op), cancel.as_mut()) + .await + { // SET returns OK Ok(Value::Okay) => { tracing::debug!( @@ -549,23 +569,23 @@ impl Session { Ok(_) => { // Any other response likely means the key expired. tracing::warn!(src=%self.key, "refreshing cancellation key failed"); - // Re-enter the SET loop to repush full data. - state = State::Set; + // Re-enter the SET loop quickly to repush full data. + state = State::Init; + wait_interval = Duration::ZERO; } // retry immediately. Err(BatchQueueError::Result(error)) => { tracing::warn!(?error, "error refreshing cancellation key"); // Small delay to prevent busy loop with high cpu and logging. - tokio::time::sleep(Duration::from_millis(10)).await; - continue; + wait_interval = Duration::from_millis(10); } Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } // wait before continuing. break immediately if cancelled. - if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + if run_until(tokio::time::sleep(wait_interval), cancel.as_mut()) .await .is_err() {