proxy: expose session_id to clients and proxy latency to probes (#12656)

Implements #8728
This commit is contained in:
Ivan Efremov
2025-07-21 23:27:15 +03:00
committed by GitHub
parent 0dbe551802
commit 050c9f704f
7 changed files with 82 additions and 10 deletions

View File

@@ -180,8 +180,6 @@ async fn authenticate(
return Err(auth::AuthError::NetworkNotAllowed);
}
client.write_message(BeMessage::NoticeResponse("Connecting to database."));
// Backwards compatibility. pg_sni_proxy uses "--" in domain names
// while direct connections do not. Once we migrate to pg_sni_proxy
// everywhere, we can remove this.

View File

@@ -1,3 +1,4 @@
use std::env;
use std::net::SocketAddr;
use std::pin::pin;
use std::sync::Arc;
@@ -264,6 +265,14 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
timeout: Duration::from_secs(2),
};
let greetings = env::var_os("NEON_MOTD").map_or(String::new(), |s| match s.into_string() {
Ok(s) => s,
Err(_) => {
debug!("NEON_MOTD environment variable is not valid UTF-8");
String::new()
}
});
Ok(Box::leak(Box::new(ProxyConfig {
tls_config: ArcSwapOption::from(None),
metric_collection: None,
@@ -290,6 +299,7 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
connect_compute_locks,
connect_to_compute: compute_config,
greetings,
#[cfg(feature = "testing")]
disable_pg_session_jwt: args.disable_pg_session_jwt,
})))

View File

@@ -1,4 +1,3 @@
#[cfg(any(test, feature = "testing"))]
use std::env;
use std::net::SocketAddr;
use std::path::PathBuf;
@@ -21,7 +20,7 @@ use tokio::net::TcpListener;
use tokio::sync::Notify;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use tracing::{debug, error, info, warn};
use utils::sentry_init::init_sentry;
use utils::{project_build_tag, project_git_version};
@@ -730,6 +729,25 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
}
};
let mut greetings = env::var_os("NEON_MOTD").map_or(String::new(), |s| match s.into_string() {
Ok(s) => s,
Err(_) => {
debug!("NEON_MOTD environment variable is not valid UTF-8");
String::new()
}
});
match &args.auth_backend {
AuthBackendType::ControlPlane => {}
#[cfg(any(test, feature = "testing"))]
AuthBackendType::Postgres => {}
#[cfg(any(test, feature = "testing"))]
AuthBackendType::Local => {}
AuthBackendType::ConsoleRedirect => {
greetings = "Connected to database".to_string();
}
}
let config = ProxyConfig {
tls_config,
metric_collection,
@@ -740,6 +758,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
connect_compute_locks,
connect_to_compute: compute_config,
greetings,
#[cfg(feature = "testing")]
disable_pg_session_jwt: false,
#[cfg(feature = "rest_broker")]

View File

@@ -39,6 +39,7 @@ pub struct ProxyConfig {
pub wake_compute_retry_config: RetryConfig,
pub connect_compute_locks: ApiLocks<Host>,
pub connect_to_compute: ComputeConfig,
pub greetings: String, // Greeting message sent to the client after connection establishment and contains session_id.
#[cfg(feature = "testing")]
pub disable_pg_session_jwt: bool,
}

View File

@@ -233,7 +233,13 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let session = cancellation_handler.get_key();
finish_client_init(&pg_settings, *session.key(), &mut stream);
finish_client_init(
ctx,
&pg_settings,
*session.key(),
&mut stream,
&config.greetings,
);
let stream = stream.flush_and_into_inner().await?;
let session_id = ctx.session_id();

View File

@@ -385,10 +385,10 @@ pub enum RedisMsgKind {
#[derive(Default, Clone)]
pub struct LatencyAccumulated {
cplane: time::Duration,
client: time::Duration,
compute: time::Duration,
retry: time::Duration,
pub cplane: time::Duration,
pub client: time::Duration,
pub compute: time::Duration,
pub retry: time::Duration,
}
impl std::fmt::Display for LatencyAccumulated {

View File

@@ -145,7 +145,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let session = cancellation_handler.get_key();
finish_client_init(&pg_settings, *session.key(), client);
finish_client_init(ctx, &pg_settings, *session.key(), client, &config.greetings);
let session_id = ctx.session_id();
let (cancel_on_shutdown, cancel) = oneshot::channel();
@@ -165,9 +165,11 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
/// Finish client connection initialization: confirm auth success, send params, etc.
pub(crate) fn finish_client_init(
ctx: &RequestContext,
settings: &compute::PostgresSettings,
cancel_key_data: CancelKeyData,
client: &mut PqStream<impl AsyncRead + AsyncWrite + Unpin>,
greetings: &String,
) {
// Forward all deferred notices to the client.
for notice in &settings.delayed_notice {
@@ -176,6 +178,12 @@ pub(crate) fn finish_client_init(
});
}
// Expose session_id to clients if we have a greeting message.
if !greetings.is_empty() {
let session_msg = format!("{}, session_id: {}", greetings, ctx.session_id());
client.write_message(BeMessage::NoticeResponse(session_msg.as_str()));
}
// Forward all postgres connection params to the client.
for (name, value) in &settings.params {
client.write_message(BeMessage::ParameterStatus {
@@ -184,6 +192,36 @@ pub(crate) fn finish_client_init(
});
}
// Forward recorded latencies for probing requests
if let Some(testodrome_id) = ctx.get_testodrome_id() {
client.write_message(BeMessage::ParameterStatus {
name: "neon.testodrome_id".as_bytes(),
value: testodrome_id.as_bytes(),
});
let latency_measured = ctx.get_proxy_latency();
client.write_message(BeMessage::ParameterStatus {
name: "neon.cplane_latency".as_bytes(),
value: latency_measured.cplane.as_micros().to_string().as_bytes(),
});
client.write_message(BeMessage::ParameterStatus {
name: "neon.client_latency".as_bytes(),
value: latency_measured.client.as_micros().to_string().as_bytes(),
});
client.write_message(BeMessage::ParameterStatus {
name: "neon.compute_latency".as_bytes(),
value: latency_measured.compute.as_micros().to_string().as_bytes(),
});
client.write_message(BeMessage::ParameterStatus {
name: "neon.retry_latency".as_bytes(),
value: latency_measured.retry.as_micros().to_string().as_bytes(),
});
}
client.write_message(BeMessage::BackendKeyData(cancel_key_data));
client.write_message(BeMessage::ReadyForQuery);
}