From 3370e8cb00ab13675b8cd940df63f88d6d27c6bb Mon Sep 17 00:00:00 2001
From: Conrad Ludgate
Date: Tue, 10 Jun 2025 06:16:25 -0700
Subject: [PATCH] optimise future sizes for cancel maintenance

---
 proxy/src/cancellation.rs           | 42 +++++++++++++----------------
 proxy/src/console_redirect_proxy.rs | 17 +++++++-----
 proxy/src/proxy/mod.rs              | 17 +++++++-----
 3 files changed, 41 insertions(+), 35 deletions(-)

diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs
index 17d33c9ccd..8ce0f1365b 100644
--- a/proxy/src/cancellation.rs
+++ b/proxy/src/cancellation.rs
@@ -35,12 +35,12 @@ const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(5
 // Message types for sending through mpsc channel
 pub enum CancelKeyOp {
     StoreCancelKey {
-        key: String,
-        value: String,
+        key: CancelKeyData,
+        value: Box<str>,
         expire: std::time::Duration,
     },
     GetCancelData {
-        key: String,
+        key: CancelKeyData,
     },
 }
 
@@ -99,10 +99,12 @@ impl CancelKeyOp {
         #[allow(clippy::used_underscore_binding)]
         match self {
             CancelKeyOp::StoreCancelKey { key, value, expire } => {
-                pipe.add_command_with_reply(Cmd::hset(key, "data", value));
-                pipe.add_command_no_reply(Cmd::expire(key, expire.as_secs() as i64));
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
+                pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value));
+                pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64));
             }
             CancelKeyOp::GetCancelData { key } => {
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
                 pipe.add_command_with_reply(Cmd::hget(key, "data"));
             }
         }
@@ -213,13 +215,9 @@ impl CancellationHandler {
 
         let key: CancelKeyData = rand::random();
 
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
-
         debug!("registered new query cancellation key {key}");
         Session {
             key,
-            redis_key,
             cancellation_handler: self,
         }
     }
@@ -228,14 +226,11 @@ impl CancellationHandler {
         &self,
         key: CancelKeyData,
     ) -> Result<Option<CancelClosure>, CancelError> {
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
-
         let guard = Metrics::get()
             .proxy
             .cancel_channel_size
             .guard(RedisMsgKind::HGet);
-        let op = CancelKeyOp::GetCancelData { key: redis_key };
+        let op = CancelKeyOp::GetCancelData { key };
 
         let Some(tx) = self.tx.get() else {
             tracing::warn!("cancellation handler is not available");
@@ -359,7 +354,7 @@ impl CancelClosure {
     }
     /// Cancels the query running on user's compute node.
     pub(crate) async fn try_cancel_query(
-        self,
+        &self,
         compute_config: &ComputeConfig,
     ) -> Result<(), CancelError> {
         let socket = TcpStream::connect(self.socket_addr).await?;
@@ -380,7 +375,6 @@ impl CancelClosure {
 pub(crate) struct Session {
     /// The user-facing key identifying this session.
     key: CancelKeyData,
-    redis_key: String,
     cancellation_handler: Arc<CancellationHandler>,
 }
 
@@ -392,16 +386,17 @@ impl Session {
     /// Ensure the cancel key is continously refreshed,
     /// but stop when the channel is dropped.
     pub(crate) async fn maintain_cancel_key(
-        self,
+        &self,
         session_id: uuid::Uuid,
         cancel: tokio::sync::oneshot::Receiver<()>,
-        cancel_closure: CancelClosure,
+        cancel_closure: &CancelClosure,
         compute_config: &ComputeConfig,
     ) {
-        tokio::select! {
-            _ = self.maintain_redis_cancel_key(&cancel_closure) => {}
-            _ = cancel => {}
-        };
+        futures::future::select(
+            std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)),
+            cancel,
+        )
+        .await;
 
         if let Err(err) = cancel_closure
             .try_cancel_query(compute_config)
@@ -425,7 +420,8 @@ impl Session {
         };
 
         let closure_json = serde_json::to_string(&cancel_closure)
-            .expect("serialising to json string should not fail");
+            .expect("serialising to json string should not fail")
+            .into_boxed_str();
 
         loop {
             let guard = Metrics::get()
@@ -433,7 +429,7 @@ impl Session {
                 .cancel_channel_size
                 .guard(RedisMsgKind::HSet);
             let op = CancelKeyOp::StoreCancelKey {
-                key: self.redis_key.clone(),
+                key: self.key,
                 value: closure_json.clone(),
                 expire: CANCEL_KEY_TTL,
             };
diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs
index 96ce090e78..89adfc9049 100644
--- a/proxy/src/console_redirect_proxy.rs
+++ b/proxy/src/console_redirect_proxy.rs
@@ -237,13 +237,18 @@ pub(crate) async fn handle_client(
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
     let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
-    tokio::spawn(session.maintain_cancel_key(
-        ctx.session_id(),
-        cancel,
-        node.cancel_closure,
-        &config.connect_to_compute,
-    ));
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
 
     Ok(Some(ProxyPassthrough {
         client: stream,
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 988a66cb76..7da1b8d8fa 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -377,13 +377,18 @@ pub(crate) async fn handle_client(
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;
 
+    let session_id = ctx.session_id();
     let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
-    tokio::spawn(session.maintain_cancel_key(
-        ctx.session_id(),
-        cancel,
-        node.cancel_closure,
-        &config.connect_to_compute,
-    ));
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });
 
     let private_link_id = match ctx.extra() {
         Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
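
Note on the technique (a standalone sketch, not part of the patch): every
owned value an async fn holds across an .await, including its by-value
arguments, is stored inline in the compiled future, and tokio::spawn
allocates that entire state for the task. That is why the patch takes
&self / &CancelClosure instead of owned values, stores the serialised
closure as Box<str>, and drives the refresh loop through
futures::future::select over a std::pin::pin!-ed borrow. A minimal
illustration, assuming tokio with the "full" feature set; the names here
are invented for the demo:

    // Compare the state size of a future that owns its data with one that
    // borrows it. `size_of_val` on an un-awaited future reports exactly
    // the state that tokio::spawn would move into the task allocation.
    async fn holds_owned(big: [u8; 1024]) {
        tokio::task::yield_now().await;
        // `big` is live across the await, so all 1024 bytes sit in the future.
        std::hint::black_box(&big);
    }

    async fn holds_borrowed(big: &[u8; 1024]) {
        tokio::task::yield_now().await;
        // Only the 8-byte reference is held across the await.
        std::hint::black_box(big);
    }

    #[tokio::main]
    async fn main() {
        let big = [0u8; 1024];
        println!("owned:    {} bytes", std::mem::size_of_val(&holds_owned(big)));
        println!("borrowed: {} bytes", std::mem::size_of_val(&holds_borrowed(&big)));
    }

The same reasoning applies to Box<str> versus String in CancelKeyOp: the
boxed form is a pointer plus length (two words), so the enum, and any
future or channel slot holding it, stays smaller than with a three-word
String embedded.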