optimise future sizes for cancel maintenance

Conrad Ludgate
2025-06-10 06:16:25 -07:00
parent f37a558280
commit 3370e8cb00
3 changed files with 41 additions and 35 deletions

View File

@@ -35,12 +35,12 @@ const CANCEL_KEY_REFRESH: std::time::Duration = std::time::Duration::from_secs(5
 // Message types for sending through mpsc channel
 pub enum CancelKeyOp {
     StoreCancelKey {
-        key: String,
-        value: String,
+        key: CancelKeyData,
+        value: Box<str>,
         expire: std::time::Duration,
     },
     GetCancelData {
-        key: String,
+        key: CancelKeyData,
     },
 }
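This change shrinks `CancelKeyOp` itself: a `String` spends three words (pointer, length, capacity) while `Box<str>` needs only two, and `CancelKeyData` is a small `Copy` value (the diff copies it with `*key` and `self.key`) instead of a heap-allocated, pre-rendered redis key. Since an enum is as large as its largest variant, and any future holding a `CancelKeyOp` across an `.await` embeds it inline, the saving compounds. A minimal sketch of the size difference, assuming a 64-bit target:

```rust
fn main() {
    // String = (ptr, len, capacity); Box<str> = (ptr, len).
    assert_eq!(std::mem::size_of::<String>(), 24);
    assert_eq!(std::mem::size_of::<Box<str>>(), 16);
    // An enum's size tracks its largest variant (plus discriminant),
    // so shrinking StoreCancelKey's fields shrinks every CancelKeyOp.
}
```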
@@ -99,10 +99,12 @@ impl CancelKeyOp {
         #[allow(clippy::used_underscore_binding)]
         match self {
             CancelKeyOp::StoreCancelKey { key, value, expire } => {
-                pipe.add_command_with_reply(Cmd::hset(key, "data", value));
-                pipe.add_command_no_reply(Cmd::expire(key, expire.as_secs() as i64));
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
+                pipe.add_command_with_reply(Cmd::hset(&key, "data", &**value));
+                pipe.add_command_no_reply(Cmd::expire(&key, expire.as_secs() as i64));
             }
             CancelKeyOp::GetCancelData { key } => {
+                let key = KeyPrefix::Cancel(*key).build_redis_key();
                 pipe.add_command_with_reply(Cmd::hget(key, "data"));
             }
         }
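Instead of storing the rendered key `String` in the op, the op now carries the compact `CancelKeyData` and renders the key only while the pipeline is being assembled, so the allocation is short-lived. A hypothetical sketch of such a builder; the real `KeyPrefix` and `CancelKeyData` definitions live elsewhere in this codebase and the field names and key format here are assumptions:

```rust
// Assumed shapes, for illustration only.
#[derive(Clone, Copy)]
struct CancelKeyData {
    backend_pid: i32,
    cancel_key: i32,
}

enum KeyPrefix {
    Cancel(CancelKeyData),
}

impl KeyPrefix {
    fn build_redis_key(&self) -> String {
        match self {
            // Rendered on demand; the String lives only as long as the
            // pipeline commands that use it.
            KeyPrefix::Cancel(d) => {
                format!("cancel:{:x}:{:x}", d.backend_pid, d.cancel_key)
            }
        }
    }
}
```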
@@ -213,13 +215,9 @@ impl CancellationHandler {
         let key: CancelKeyData = rand::random();
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
         debug!("registered new query cancellation key {key}");
         Session {
             key,
-            redis_key,
             cancellation_handler: self,
         }
     }
@@ -228,14 +226,11 @@ impl CancellationHandler {
         &self,
         key: CancelKeyData,
     ) -> Result<Option<CancelClosure>, CancelError> {
-        let prefix_key: KeyPrefix = KeyPrefix::Cancel(key);
-        let redis_key = prefix_key.build_redis_key();
         let guard = Metrics::get()
             .proxy
             .cancel_channel_size
             .guard(RedisMsgKind::HGet);
-        let op = CancelKeyOp::GetCancelData { key: redis_key };
+        let op = CancelKeyOp::GetCancelData { key };

         let Some(tx) = self.tx.get() else {
             tracing::warn!("cancellation handler is not available");
@@ -359,7 +354,7 @@ impl CancelClosure {
     }

     /// Cancels the query running on user's compute node.
     pub(crate) async fn try_cancel_query(
-        self,
+        &self,
        compute_config: &ComputeConfig,
     ) -> Result<(), CancelError> {
         let socket = TcpStream::connect(self.socket_addr).await?;
@@ -380,7 +375,6 @@ impl CancelClosure {
 pub(crate) struct Session {
     /// The user-facing key identifying this session.
     key: CancelKeyData,
-    redis_key: String,
     cancellation_handler: Arc<CancellationHandler>,
 }
@@ -392,16 +386,17 @@ impl Session {
     /// Ensure the cancel key is continuously refreshed,
     /// but stop when the channel is dropped.
     pub(crate) async fn maintain_cancel_key(
-        self,
+        &self,
         session_id: uuid::Uuid,
         cancel: tokio::sync::oneshot::Receiver<Infallible>,
-        cancel_closure: CancelClosure,
+        cancel_closure: &CancelClosure,
         compute_config: &ComputeConfig,
     ) {
-        tokio::select! {
-            _ = self.maintain_redis_cancel_key(&cancel_closure) => {}
-            _ = cancel => {}
-        };
+        futures::future::select(
+            std::pin::pin!(self.maintain_redis_cancel_key(cancel_closure)),
+            cancel,
+        )
+        .await;

         if let Err(err) = cancel_closure
             .try_cancel_query(compute_config)
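Here `tokio::select!` is swapped for `futures::future::select` over a stack-pinned future, in line with the commit's future-size goal: `std::pin::pin!` pins the refresh future in place without a `Box::pin` allocation, and the oneshot receiver completes with an error as soon as its sender is dropped. A self-contained sketch of the same pattern, with placeholder tasks standing in for the redis refresh:

```rust
use std::convert::Infallible;
use std::pin::pin;

async fn refresh_loop() {
    loop {
        // Stand-in for the periodic redis key refresh.
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel::<Infallible>();
    // Dropping the sender resolves `rx` with a RecvError, which wins
    // the race and ends the maintenance loop.
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        drop(tx);
    });
    // `select` requires Unpin futures; `pin!` yields a Pin<&mut _> on
    // the stack, avoiding the heap allocation Box::pin would make.
    futures::future::select(pin!(refresh_loop()), rx).await;
}
```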
@@ -425,7 +420,8 @@ impl Session {
         };

         let closure_json = serde_json::to_string(&cancel_closure)
-            .expect("serialising to json string should not fail");
+            .expect("serialising to json string should not fail")
+            .into_boxed_str();

         loop {
             let guard = Metrics::get()
@@ -433,7 +429,7 @@ impl Session {
                 .cancel_channel_size
                 .guard(RedisMsgKind::HSet);
             let op = CancelKeyOp::StoreCancelKey {
-                key: self.redis_key.clone(),
+                key: self.key,
                 value: closure_json.clone(),
                 expire: CANCEL_KEY_TTL,
             };
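The payoff of these changes can be observed directly: an `async fn`'s future stores its arguments and all state held across `.await` points inline, so `std::mem::size_of_val` on the unawaited future shows the effect of swapping owned fields for compact ones. A small sketch with hypothetical functions, assuming a 64-bit target:

```rust
async fn holds_string(v: String) {
    std::future::pending::<()>().await;
    drop(v);
}

async fn holds_boxed_str(v: Box<str>) {
    std::future::pending::<()>().await;
    drop(v);
}

fn main() {
    let a = holds_string(String::new());
    let b = holds_boxed_str(String::new().into_boxed_str());
    // The first future is 8 bytes larger, mirroring the
    // String -> Box<str> change in CancelKeyOp::StoreCancelKey.
    println!("{} vs {}", std::mem::size_of_val(&a), std::mem::size_of_val(&b));
}
```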

View File

@@ -237,13 +237,18 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;

+    let session_id = ctx.session_id();
     let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
-    tokio::spawn(session.maintain_cancel_key(
-        ctx.session_id(),
-        cancel,
-        node.cancel_closure,
-        &config.connect_to_compute,
-    ));
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });

     Ok(Some(ProxyPassthrough {
         client: stream,
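At the call sites, the future handed to `tokio::spawn` must be `'static`, so the work moves into an `async move` block that takes ownership of `session`, `node`, and the copied `session_id`; capturing `&config.connect_to_compute` still compiles only because `config` is presumably itself a `'static` reference in this proxy (e.g. leaked at startup). A stripped-down sketch of the pattern, using placeholder types rather than this codebase's real `Config` and `Session`:

```rust
// Placeholder types for illustration only.
struct Config {
    connect_to_compute: u32,
}
struct Session;

impl Session {
    async fn maintain_cancel_key(&self, _session_id: u64, _cfg: &u32) {}
}

fn spawn_maintenance(config: &'static Config, session: Session, session_id: u64) {
    // `async move` takes ownership of `session` and copies the
    // &'static reference, so the spawned future satisfies the
    // 'static bound required by tokio::spawn.
    tokio::spawn(async move {
        session
            .maintain_cancel_key(session_id, &config.connect_to_compute)
            .await;
    });
}
```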

View File

@@ -377,13 +377,18 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
     prepare_client_connection(&node, *session.key(), &mut stream);
     let stream = stream.flush_and_into_inner().await?;

+    let session_id = ctx.session_id();
     let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel();
-    tokio::spawn(session.maintain_cancel_key(
-        ctx.session_id(),
-        cancel,
-        node.cancel_closure,
-        &config.connect_to_compute,
-    ));
+    tokio::spawn(async move {
+        session
+            .maintain_cancel_key(
+                session_id,
+                cancel,
+                &node.cancel_closure,
+                &config.connect_to_compute,
+            )
+            .await;
+    });

     let private_link_id = match ctx.extra() {
         Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),