From aece52036572ad5d2aff2b95a5338591c2a52ca3 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 16 May 2025 06:41:16 +0200 Subject: [PATCH] remove replies for store/remove ops --- proxy/src/cancellation.rs | 64 +++------------------------------------ 1 file changed, 4 insertions(+), 60 deletions(-) diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index d8bccc9c65..987b8545c0 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -34,7 +34,6 @@ pub enum CancelKeyOp { StoreCancelKey { key: String, value: String, - resp_tx: Option>>, _guard: CancelChannelSizeGuard<'static>, expire: i64, // TTL for key }, @@ -46,7 +45,6 @@ pub enum CancelKeyOp { RemoveCancelKey { key: String, field: String, - resp_tx: Option>>, _guard: CancelChannelSizeGuard<'static>, }, } @@ -104,13 +102,6 @@ impl Pipeline { fn add_command_no_reply(&mut self, cmd: Cmd) { self.inner.add_command(cmd).ignore(); } - - fn add_command(&mut self, cmd: Cmd, reply: Option) { - match reply { - Some(reply) => self.add_command_with_reply(cmd, reply), - None => self.add_command_no_reply(cmd), - } - } } impl CancelKeyOp { @@ -120,13 +111,10 @@ impl CancelKeyOp { CancelKeyOp::StoreCancelKey { key, value, - resp_tx, _guard, expire, } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::StoreCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hset(&key, "data", value), reply); + pipe.add_command_no_reply(Cmd::hset(&key, "data", value)); pipe.add_command_no_reply(Cmd::expire(key, expire)); } CancelKeyOp::GetCancelData { @@ -137,15 +125,8 @@ impl CancelKeyOp { let reply = CancelReplyOp::GetCancelData { resp_tx, _guard }; pipe.add_command_with_reply(Cmd::hget(key, "data"), reply); } - CancelKeyOp::RemoveCancelKey { - key, - field, - resp_tx, - _guard, - } => { - let reply = - resp_tx.map(|resp_tx| CancelReplyOp::RemoveCancelKey { resp_tx, _guard }); - pipe.add_command(Cmd::hdel(key, field), reply); + CancelKeyOp::RemoveCancelKey { key, field, _guard } => { + pipe.add_command_no_reply(Cmd::hdel(key, field)); } } } @@ -153,54 +134,26 @@ impl CancelKeyOp { // Message types for sending through mpsc channel pub enum CancelReplyOp { - StoreCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, GetCancelData { resp_tx: oneshot::Sender>, _guard: CancelChannelSizeGuard<'static>, }, - RemoveCancelKey { - resp_tx: oneshot::Sender>, - _guard: CancelChannelSizeGuard<'static>, - }, } impl CancelReplyOp { fn send_err(self, e: anyhow::Error) { match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } CancelReplyOp::GetCancelData { resp_tx, _guard } => { resp_tx .send(Err(e)) .inspect_err(|_| tracing::debug!("could not send reply")) .ok(); } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - resp_tx - .send(Err(e)) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } } } fn send_value(self, v: redis::Value) { match self { - CancelReplyOp::StoreCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } CancelReplyOp::GetCancelData { resp_tx, _guard } => { let send = FromRedisValue::from_owned_redis_value(v).context("could not parse value"); @@ -209,14 +162,6 @@ impl CancelReplyOp { .inspect_err(|_| tracing::debug!("could not send reply")) .ok(); } - CancelReplyOp::RemoveCancelKey { resp_tx, _guard } => { - let send = - FromRedisValue::from_owned_redis_value(v).context("could not parse value"); - resp_tx - .send(send) - .inspect_err(|_| tracing::debug!("could not send reply")) - .ok(); - } } } } @@ -380,6 +325,7 @@ impl CancellationHandler { Ok(Some(cancel_closure)) } + /// Try to cancel a running query for the corresponding connection. /// If the cancellation key is not found, it will be published to Redis. /// check_allowed - if true, check if the IP is allowed to cancel the query. @@ -526,7 +472,6 @@ impl Session { let op = CancelKeyOp::StoreCancelKey { key: self.redis_key.clone(), value: closure_json, - resp_tx: None, _guard: Metrics::get() .proxy .cancel_channel_size @@ -550,7 +495,6 @@ impl Session { let op = CancelKeyOp::RemoveCancelKey { key: self.redis_key.clone(), field: "data".to_string(), - resp_tx: None, _guard: Metrics::get() .proxy .cancel_channel_size