From 725aed694b2ec776f2f27a6e48dcec57bcb73461 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 27 Jun 2025 16:50:42 +0100 Subject: [PATCH] do not replace cancelkeydata --- proxy/src/console_redirect_proxy.rs | 54 ++++++++++++------------ proxy/src/proxy/mod.rs | 65 ++++++++++++++++------------- 2 files changed, 63 insertions(+), 56 deletions(-) diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index f947abebc0..fe799258a2 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -15,6 +15,7 @@ use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::pglb::ClientRequestError; use crate::pglb::handshake::{HandshakeData, handshake}; use crate::pglb::passthrough::ProxyPassthrough; +use crate::pqproto::CancelKeyData; use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; use crate::proxy::{ ErrorSource, connect_compute, forward_compute_params_to_client, send_client_greeting, @@ -207,7 +208,7 @@ pub(crate) async fn handle_client( ctx.set_db_options(params.clone()); - let (node_info, mut auth_info, user_info) = match backend + let (node_info, mut auth_info, _user_info) = match backend .authenticate(ctx, &config.authentication_config, &mut stream) .await { @@ -231,35 +232,34 @@ pub(crate) async fn handle_client( .await?; send_client_greeting(ctx, &config.greetings, &mut stream); - let session = cancellation_handler.get_key(); + // let session = cancellation_handler.get_key(); - let (process_id, secret_key) = - forward_compute_params_to_client(ctx, *session.key(), &mut stream, &mut node.stream) - .await?; + let (_process_id, _secret_key) = + forward_compute_params_to_client(ctx, None, &mut stream, &mut node.stream).await?; let stream = stream.flush_and_into_inner().await?; - let hostname = node.hostname.to_string(); + // let hostname = node.hostname.to_string(); - let session_id = ctx.session_id(); - let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel(); - tokio::spawn(async move { - session - .maintain_cancel_key( - session_id, - cancel, - &CancelClosure { - socket_addr: node.socket_addr, - cancel_token: RawCancelToken { - ssl_mode: node.ssl_mode, - process_id, - secret_key, - }, - hostname, - user_info, - }, - &config.connect_to_compute, - ) - .await; - }); + // let session_id = ctx.session_id(); + let (cancel_on_shutdown, _cancel) = tokio::sync::oneshot::channel(); + // tokio::spawn(async move { + // session + // .maintain_cancel_key( + // session_id, + // cancel, + // &CancelClosure { + // socket_addr: node.socket_addr, + // cancel_token: RawCancelToken { + // ssl_mode: node.ssl_mode, + // process_id, + // secret_key, + // }, + // hostname, + // user_info, + // }, + // &config.connect_to_compute, + // ) + // .await; + // }); Ok(Some(ProxyPassthrough { client: stream, diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 99374c390c..2d96b76479 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -42,7 +42,7 @@ pub(crate) async fn handle_client( config: &'static ProxyConfig, auth_backend: &'static auth::Backend<'static, ()>, ctx: &RequestContext, - cancellation_handler: Arc, + _cancellation_handler: Arc, client: &mut PqStream>, mode: &ClientMode, endpoint_rate_limiter: Arc, @@ -114,7 +114,7 @@ pub(crate) async fn handle_client( send_client_greeting(ctx, &config.greetings, client); - let auth::Backend::ControlPlane(_, user_info) = backend else { + let auth::Backend::ControlPlane(_, _user_info) = backend else { unreachable!("ensured above"); }; @@ -124,33 +124,33 @@ pub(crate) async fn handle_client( client.write_message(BeMessage::AuthenticationOk); } - let session = cancellation_handler.get_key(); + // let session = cancellation_handler.get_key(); - let (process_id, secret_key) = - forward_compute_params_to_client(ctx, *session.key(), client, &mut node.stream).await?; - let hostname = node.hostname.to_string(); + let (_process_id, _secret_key) = + forward_compute_params_to_client(ctx, None, client, &mut node.stream).await?; + // let hostname = node.hostname.to_string(); - let session_id = ctx.session_id(); - let (cancel_on_shutdown, cancel) = oneshot::channel(); - tokio::spawn(async move { - session - .maintain_cancel_key( - session_id, - cancel, - &CancelClosure { - socket_addr: node.socket_addr, - cancel_token: RawCancelToken { - ssl_mode: node.ssl_mode, - process_id, - secret_key, - }, - hostname, - user_info, - }, - &config.connect_to_compute, - ) - .await; - }); + // let session_id = ctx.session_id(); + let (cancel_on_shutdown, _cancel) = oneshot::channel(); + // tokio::spawn(async move { + // session + // .maintain_cancel_key( + // session_id, + // cancel, + // &CancelClosure { + // socket_addr: node.socket_addr, + // cancel_token: RawCancelToken { + // ssl_mode: node.ssl_mode, + // process_id, + // secret_key, + // }, + // hostname, + // user_info, + // }, + // &config.connect_to_compute, + // ) + // .await; + // }); Ok((node, cancel_on_shutdown)) } @@ -200,7 +200,7 @@ pub(crate) fn send_client_greeting( pub(crate) async fn forward_compute_params_to_client( ctx: &RequestContext, - cancel_key_data: CancelKeyData, + cancel_key_data: Option, client: &mut PqStream, compute: &mut StartupStream, ) -> Result<(i32, i32), ClientRequestError> { @@ -219,9 +219,16 @@ pub(crate) async fn forward_compute_params_to_client( match msg { // Send our cancellation key data instead. Some(Message::BackendKeyData(body)) => { - client.write_message(BeMessage::BackendKeyData(cancel_key_data)); process_id = body.process_id(); secret_key = body.secret_key(); + + let cancel_key_data = cancel_key_data.unwrap_or_else(|| { + let pid = process_id as u32; + let key = secret_key as u32; + CancelKeyData(((pid as u64) << 32 | (key as u64)).into()) + }); + + client.write_message(BeMessage::BackendKeyData(cancel_key_data)); } // Forward all postgres connection params to the client. Some(Message::ParameterStatus(body)) => {