diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 03be9dd4cf..77062d3bb4 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -32,20 +32,24 @@ use crate::util::run_until; type IpSubnetKey = IpNet; -const CANCEL_KEY_TTL: std::time::Duration = std::time::Duration::from_secs(600); -const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(570); +const CANCEL_KEY_TTL: Duration = Duration::from_secs(600); +const CANCEL_KEY_REFRESH: Duration = Duration::from_secs(570); // Message types for sending through mpsc channel pub enum CancelKeyOp { - StoreCancelKey { + Store { key: CancelKeyData, value: Box, - expire: std::time::Duration, + expire: Duration, }, - GetCancelData { + Refresh { + key: CancelKeyData, + expire: Duration, + }, + Get { key: CancelKeyData, }, - GetCancelDataOld { + GetOld { key: CancelKeyData, }, } @@ -108,7 +112,7 @@ impl Pipeline { impl CancelKeyOp { fn register(&self, pipe: &mut Pipeline) { match self { - CancelKeyOp::StoreCancelKey { key, value, expire } => { + CancelKeyOp::Store { key, value, expire } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::set_options( &key, @@ -116,11 +120,15 @@ impl CancelKeyOp { SetOptions::default().with_expiration(SetExpiry::EX(expire.as_secs())), )); } - CancelKeyOp::GetCancelDataOld { key } => { + CancelKeyOp::Refresh { key, expire } => { + let key = KeyPrefix::Cancel(*key).build_redis_key(); + pipe.add_command(Cmd::expire(&key, expire.as_secs() as i64)); + } + CancelKeyOp::GetOld { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::hget(key, "data")); } - CancelKeyOp::GetCancelData { key } => { + CancelKeyOp::Get { key } => { let key = KeyPrefix::Cancel(*key).build_redis_key(); pipe.add_command(Cmd::get(key)); } @@ -264,7 +272,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::Get); - let op = CancelKeyOp::GetCancelData { key }; + let op = CancelKeyOp::Get { key }; let result = timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -289,7 +297,7 @@ impl CancellationHandler { .proxy .cancel_channel_size .guard(RedisMsgKind::HGet); - let op = CancelKeyOp::GetCancelDataOld { key }; + let op = CancelKeyOp::GetOld { key }; timeout( TIMEOUT, tx.call((guard, op), std::future::pending::()), @@ -474,45 +482,95 @@ impl Session { let mut cancel = pin!(cancel); + enum State { + Set, + Refresh, + } + let mut state = State::Set; + loop { - let guard = Metrics::get() - .proxy - .cancel_channel_size - .guard(RedisMsgKind::Set); - let op = CancelKeyOp::StoreCancelKey { - key: self.key, - value: closure_json.clone(), - expire: CANCEL_KEY_TTL, + let guard_op = match state { + State::Set => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Set); + let op = CancelKeyOp::Store { + key: self.key, + value: closure_json.clone(), + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "registering cancellation key" + ); + (guard, op) + } + + State::Refresh => { + let guard = Metrics::get() + .proxy + .cancel_channel_size + .guard(RedisMsgKind::Expire); + let op = CancelKeyOp::Refresh { + key: self.key, + expire: CANCEL_KEY_TTL, + }; + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshing cancellation key" + ); + (guard, op) + } }; - tracing::debug!( - src=%self.key, - dest=?cancel_closure.cancel_token, - "registering cancellation key" - ); - - match tx.call((guard, op), cancel.as_mut()).await { - Ok(_) => { + match tx.call(guard_op, cancel.as_mut()).await { + // SET returns OK + Ok(Value::Okay) => { tracing::debug!( src=%self.key, dest=?cancel_closure.cancel_token, "registered cancellation key" ); - - // wait before continuing. break immediately if cancelled. - if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) - .await - .is_err() - { - break; - } + state = State::Refresh; } + + // EXPIRE returns 1 + Ok(Value::Int(1)) => { + tracing::debug!( + src=%self.key, + dest=?cancel_closure.cancel_token, + "refreshed cancellation key" + ); + } + + Ok(_) => { + // Any other response likely means the key expired. + tracing::warn!(src=%self.key, "refreshing cancellation key failed"); + // Re-enter the SET loop to repush full data. + state = State::Set; + } + // retry immediately. Err(BatchQueueError::Result(error)) => { - tracing::warn!(?error, "error registering cancellation key"); + tracing::warn!(?error, "error refreshing cancellation key"); + // Small delay to prevent busy loop with high cpu and logging. + tokio::time::sleep(Duration::from_millis(10)).await; + continue; } + Err(BatchQueueError::Cancelled(Err(_cancelled))) => break, } + + // wait before continuing. break immediately if cancelled. + if run_until(tokio::time::sleep(CANCEL_KEY_REFRESH), cancel.as_mut()) + .await + .is_err() + { + break; + } } if let Err(err) = cancel_closure diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 8439082498..bf4d5a11eb 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -376,6 +376,7 @@ pub enum Waiting { pub enum RedisMsgKind { Set, Get, + Expire, HGet, }