From 050c9f704f94b3434fa3dc0a602f3597acf7ac0d Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Mon, 21 Jul 2025 23:27:15 +0300 Subject: [PATCH] proxy: expose session_id to clients and proxy latency to probes (#12656) Implements #8728 --- proxy/src/auth/backend/console_redirect.rs | 2 -- proxy/src/binary/local_proxy.rs | 10 ++++++ proxy/src/binary/proxy.rs | 23 +++++++++++-- proxy/src/config.rs | 1 + proxy/src/console_redirect_proxy.rs | 8 ++++- proxy/src/metrics.rs | 8 ++--- proxy/src/proxy/mod.rs | 40 +++++++++++++++++++++- 7 files changed, 82 insertions(+), 10 deletions(-) diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index f561df9202..b06ed3a0ae 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -180,8 +180,6 @@ async fn authenticate( return Err(auth::AuthError::NetworkNotAllowed); } - client.write_message(BeMessage::NoticeResponse("Connecting to database.")); - // Backwards compatibility. pg_sni_proxy uses "--" in domain names // while direct connections do not. Once we migrate to pg_sni_proxy // everywhere, we can remove this. diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index e3f7ba4c15..7b9012dc69 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -1,3 +1,4 @@ +use std::env; use std::net::SocketAddr; use std::pin::pin; use std::sync::Arc; @@ -264,6 +265,14 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig timeout: Duration::from_secs(2), }; + let greetings = env::var_os("NEON_MOTD").map_or(String::new(), |s| match s.into_string() { + Ok(s) => s, + Err(_) => { + debug!("NEON_MOTD environment variable is not valid UTF-8"); + String::new() + } + }); + Ok(Box::leak(Box::new(ProxyConfig { tls_config: ArcSwapOption::from(None), metric_collection: None, @@ -290,6 +299,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?, connect_compute_locks, connect_to_compute: compute_config, + greetings, #[cfg(feature = "testing")] disable_pg_session_jwt: args.disable_pg_session_jwt, }))) diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 194a1ed34c..d1dd2fef2a 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -1,4 +1,3 @@ -#[cfg(any(test, feature = "testing"))] use std::env; use std::net::SocketAddr; use std::path::PathBuf; @@ -21,7 +20,7 @@ use tokio::net::TcpListener; use tokio::sync::Notify; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use utils::sentry_init::init_sentry; use utils::{project_build_tag, project_git_version}; @@ -730,6 +729,25 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { } }; + let mut greetings = env::var_os("NEON_MOTD").map_or(String::new(), |s| match s.into_string() { + Ok(s) => s, + Err(_) => { + debug!("NEON_MOTD environment variable is not valid UTF-8"); + String::new() + } + }); + + match &args.auth_backend { + AuthBackendType::ControlPlane => {} + #[cfg(any(test, feature = "testing"))] + AuthBackendType::Postgres => {} + #[cfg(any(test, feature = "testing"))] + AuthBackendType::Local => {} + AuthBackendType::ConsoleRedirect => { + greetings = "Connected to database".to_string(); + } + } + let config = ProxyConfig { tls_config, metric_collection, @@ -740,6 +758,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?, connect_compute_locks, connect_to_compute: compute_config, + greetings, #[cfg(feature = "testing")] disable_pg_session_jwt: false, #[cfg(feature = "rest_broker")] diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 20bbfd77d8..16b1dff5f4 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -39,6 +39,7 @@ pub struct ProxyConfig { pub wake_compute_retry_config: RetryConfig, pub connect_compute_locks: ApiLocks, pub connect_to_compute: ComputeConfig, + pub greetings: String, // Greeting message sent to the client after connection establishment and contains session_id. #[cfg(feature = "testing")] pub disable_pg_session_jwt: bool, } diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 041a56e032..014317d823 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -233,7 +233,13 @@ pub(crate) async fn handle_client( let session = cancellation_handler.get_key(); - finish_client_init(&pg_settings, *session.key(), &mut stream); + finish_client_init( + ctx, + &pg_settings, + *session.key(), + &mut stream, + &config.greetings, + ); let stream = stream.flush_and_into_inner().await?; let session_id = ctx.session_id(); diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 916604e2ec..7524133093 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -385,10 +385,10 @@ pub enum RedisMsgKind { #[derive(Default, Clone)] pub struct LatencyAccumulated { - cplane: time::Duration, - client: time::Duration, - compute: time::Duration, - retry: time::Duration, + pub cplane: time::Duration, + pub client: time::Duration, + pub compute: time::Duration, + pub retry: time::Duration, } impl std::fmt::Display for LatencyAccumulated { diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 02651109e0..8b7c4ff55d 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -145,7 +145,7 @@ pub(crate) async fn handle_client( let session = cancellation_handler.get_key(); - finish_client_init(&pg_settings, *session.key(), client); + finish_client_init(ctx, &pg_settings, *session.key(), client, &config.greetings); let session_id = ctx.session_id(); let (cancel_on_shutdown, cancel) = oneshot::channel(); @@ -165,9 +165,11 @@ pub(crate) async fn handle_client( /// Finish client connection initialization: confirm auth success, send params, etc. pub(crate) fn finish_client_init( + ctx: &RequestContext, settings: &compute::PostgresSettings, cancel_key_data: CancelKeyData, client: &mut PqStream, + greetings: &String, ) { // Forward all deferred notices to the client. for notice in &settings.delayed_notice { @@ -176,6 +178,12 @@ pub(crate) fn finish_client_init( }); } + // Expose session_id to clients if we have a greeting message. + if !greetings.is_empty() { + let session_msg = format!("{}, session_id: {}", greetings, ctx.session_id()); + client.write_message(BeMessage::NoticeResponse(session_msg.as_str())); + } + // Forward all postgres connection params to the client. for (name, value) in &settings.params { client.write_message(BeMessage::ParameterStatus { @@ -184,6 +192,36 @@ pub(crate) fn finish_client_init( }); } + // Forward recorded latencies for probing requests + if let Some(testodrome_id) = ctx.get_testodrome_id() { + client.write_message(BeMessage::ParameterStatus { + name: "neon.testodrome_id".as_bytes(), + value: testodrome_id.as_bytes(), + }); + + let latency_measured = ctx.get_proxy_latency(); + + client.write_message(BeMessage::ParameterStatus { + name: "neon.cplane_latency".as_bytes(), + value: latency_measured.cplane.as_micros().to_string().as_bytes(), + }); + + client.write_message(BeMessage::ParameterStatus { + name: "neon.client_latency".as_bytes(), + value: latency_measured.client.as_micros().to_string().as_bytes(), + }); + + client.write_message(BeMessage::ParameterStatus { + name: "neon.compute_latency".as_bytes(), + value: latency_measured.compute.as_micros().to_string().as_bytes(), + }); + + client.write_message(BeMessage::ParameterStatus { + name: "neon.retry_latency".as_bytes(), + value: latency_measured.retry.as_micros().to_string().as_bytes(), + }); + } + client.write_message(BeMessage::BackendKeyData(cancel_key_data)); client.write_message(BeMessage::ReadyForQuery); }