Compare commits

...

2 Commits

Author SHA1 Message Date
Folke Behrens
dd99ad6dc7 proxy: queue UNLINK command when conn terminates 2025-07-15 17:43:48 +02:00
Folke Behrens
11a804a3ac Split enqueuing and driving state machine, reduce noise 2025-07-15 17:42:37 +02:00
3 changed files with 79 additions and 37 deletions

View File

@@ -63,16 +63,19 @@ impl<P: QueueProcessing> BatchQueue<P> {
}
}
/// Registers `req` as a new job on the inner queue state.
///
/// Returns the `u64` id produced by `register_job` together with the oneshot
/// receiver typed for the job's `ProcResult`. Nothing is awaited here: this
/// only registers the job.
pub fn enqueue(&self, req: P::Req) -> (u64, oneshot::Receiver<ProcResult<P>>) {
    // The guard is a temporary, so it is released as soon as the job is registered.
    let (job_id, result_rx) = self.inner.lock_propagate_poison().register_job(req);
    (job_id, result_rx)
}
/// Perform a single request-response process, this may be batched internally.
///
/// This function is not cancel safe.
pub async fn call<R>(
&self,
req: P::Req,
id: u64,
mut rx: oneshot::Receiver<ProcResult<P>>,
cancelled: impl Future<Output = R>,
) -> Result<P::Res, BatchQueueError<P::Err, R>> {
let (id, mut rx) = self.inner.lock_propagate_poison().register_job(req);
let mut cancelled = pin!(cancelled);
let resp: Option<Result<P::Res, P::Err>> = loop {
// try become the leader, or try wait for success.

View File

@@ -6,6 +6,7 @@ use std::time::Duration;
use futures::FutureExt;
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use metrics::MeasuredCounterPairGuard;
use postgres_client::RawCancelToken;
use postgres_client::tls::MakeTlsConnect;
use redis::{Cmd, FromRedisValue, SetExpiry, SetOptions, Value};
@@ -23,7 +24,9 @@ use crate::context::RequestContext;
use crate::control_plane::ControlPlaneApi;
use crate::error::ReportableError;
use crate::ext::LockExt;
use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind};
use crate::metrics::{
CancelChannelSizeGauge, CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind,
};
use crate::pqproto::CancelKeyData;
use crate::rate_limiter::LeakyBucketRateLimiter;
use crate::redis::keys::KeyPrefix;
@@ -52,6 +55,28 @@ pub enum CancelKeyOp {
GetOld {
key: CancelKeyData,
},
Delete {
key: CancelKeyData,
},
}
impl CancelKeyOp {
    /// Maps each operation variant to the `RedisMsgKind` label used for metrics.
    fn redis_msg_kind(&self) -> RedisMsgKind {
        match self {
            Self::Store { .. } => RedisMsgKind::Set,
            Self::Refresh { .. } => RedisMsgKind::Expire,
            Self::Get { .. } => RedisMsgKind::Get,
            Self::GetOld { .. } => RedisMsgKind::HGet,
            Self::Delete { .. } => RedisMsgKind::Unlink,
        }
    }

    /// Creates a guard on the proxy's `cancel_channel_size` metric, labelled
    /// with this operation's `RedisMsgKind`.
    fn metric_guard(&self) -> MeasuredCounterPairGuard<'static, CancelChannelSizeGauge> {
        let kind = self.redis_msg_kind();
        Metrics::get().proxy.cancel_channel_size.guard(kind)
    }
}
#[derive(thiserror::Error, Debug, Clone)]
@@ -107,6 +132,10 @@ impl Pipeline {
self.inner.add_command(cmd);
self.replies += 1;
}
/// Appends `cmd` to the inner pipeline and calls `ignore()` on it.
///
/// `self.replies` is left untouched here.
fn add_command_ignore_reply(&mut self, cmd: Cmd) {
    let queued = self.inner.add_command(cmd);
    queued.ignore();
}
}
impl CancelKeyOp {
@@ -132,6 +161,10 @@ impl CancelKeyOp {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command(Cmd::get(key));
}
CancelKeyOp::Delete { key } => {
let key = KeyPrefix::Cancel(*key).build_redis_key();
pipe.add_command_ignore_reply(Cmd::unlink(key));
}
}
}
}
@@ -268,14 +301,11 @@ impl CancellationHandler {
return Err(CancelError::InternalError);
};
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Get);
let op = CancelKeyOp::Get { key };
let (id, rx) = tx.enqueue((op.metric_guard(), op));
let result = timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
tx.call(id, rx, std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
@@ -293,14 +323,11 @@ impl CancellationHandler {
&& let Some(errcode) = err.code()
&& errcode == "WRONGTYPE"
{
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::HGet);
let op = CancelKeyOp::GetOld { key };
let (id, rx) = tx.enqueue((op.metric_guard(), op));
timeout(
TIMEOUT,
tx.call((guard, op), std::future::pending::<Infallible>()),
tx.call(id, rx, std::future::pending::<Infallible>()),
)
.await
.map_err(|_| {
@@ -482,51 +509,59 @@ impl Session {
let mut cancel = pin!(cancel);
#[derive(Copy, Clone, PartialEq, Eq)]
enum State {
Set,
Refresh,
Delete,
}
let mut state = State::Set;
loop {
let guard_op = match state {
let op = match state {
State::Set => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Set);
let op = CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"registering cancellation key"
);
(guard, op)
CancelKeyOp::Store {
key: self.key,
value: closure_json.clone(),
expire: CANCEL_KEY_TTL,
}
}
State::Refresh => {
let guard = Metrics::get()
.proxy
.cancel_channel_size
.guard(RedisMsgKind::Expire);
let op = CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_TTL,
};
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"refreshing cancellation key"
);
(guard, op)
CancelKeyOp::Refresh {
key: self.key,
expire: CANCEL_KEY_TTL,
}
}
State::Delete => {
tracing::debug!(
src=%self.key,
dest=?cancel_closure.cancel_token,
"deleting cancellation key"
);
CancelKeyOp::Delete { key: self.key }
}
};
match tx.call(guard_op, cancel.as_mut()).await {
let (id, rx) = tx.enqueue((op.metric_guard(), op));
if state == State::Delete {
// The key deletion is just best effort. We enqueue the command,
// but don't drive the queue and wait for a response.
break;
}
match tx.call(id, rx, cancel.as_mut()).await {
// SET returns OK
Ok(Value::Okay) => {
tracing::debug!(
@@ -561,7 +596,10 @@ impl Session {
continue;
}
Err(BatchQueueError::Cancelled(Err(_cancelled))) => break,
Err(BatchQueueError::Cancelled(Err(_cancelled))) => {
state = State::Delete;
continue;
}
}
// wait before continuing. break immediately if cancelled.
@@ -569,7 +607,7 @@ impl Session {
.await
.is_err()
{
break;
state = State::Delete;
}
}

View File

@@ -378,6 +378,7 @@ pub enum RedisMsgKind {
Get,
Expire,
HGet,
Unlink,
}
#[derive(Default, Clone)]