From b9b25e13a06cb1caa42a0e183dfd03d9109d579a Mon Sep 17 00:00:00 2001 From: Ivan Efremov Date: Wed, 16 Apr 2025 22:03:23 +0300 Subject: [PATCH] feat(proxy): Return prefixed errors to testodrome (#11561) Testodrome measures uptime based on the failed requests and errors. In case of testodrome request we send back error based on the service. This will help us distinguish error types in testodrome and rely on the uptime SLI. --- proxy/src/binary/pg_sni_router.rs | 2 +- proxy/src/console_redirect_proxy.rs | 4 +- proxy/src/context/mod.rs | 10 +-- proxy/src/proxy/handshake.rs | 6 +- proxy/src/proxy/mod.rs | 9 +- proxy/src/serverless/mod.rs | 13 +-- proxy/src/serverless/websocket.rs | 1 - proxy/src/stream.rs | 122 ++++++++++++++++++++++------ 8 files changed, 116 insertions(+), 51 deletions(-) diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 1aa290399c..aef5c9383e 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -258,7 +258,7 @@ async fn ssl_handshake( "unexpected startup packet, rejecting connection" ); stream - .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User) + .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User, None) .await? } } diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 1156545f34..0f2c3def0d 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -222,7 +222,7 @@ pub(crate) async fn handle_client( { Ok(auth_result) => auth_result, Err(e) => { - return stream.throw_error(e).await?; + return stream.throw_error(e, Some(ctx)).await?; } }; @@ -238,7 +238,7 @@ pub(crate) async fn handle_client( config.wake_compute_retry_config, &config.connect_to_compute, ) - .or_else(|e| stream.throw_error(e)) + .or_else(|e| stream.throw_error(e, Some(ctx))) .await?; let cancellation_handler_clone = Arc::clone(&cancellation_handler); diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 7c1a6206c1..5f649d2b21 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -63,7 +63,7 @@ struct RequestContextInner { success: bool, pub(crate) cold_start_info: ColdStartInfo, pg_options: Option, - testodrome_query_id: Option, + testodrome_query_id: Option, // extra // This sender is here to keep the request monitoring channel open while requests are taking place. @@ -219,7 +219,7 @@ impl RequestContext { for option in options_str.split_whitespace() { if option.starts_with("neon_query_id:") { if let Some(value) = option.strip_prefix("neon_query_id:") { - this.set_testodrome_id(value.to_string()); + this.set_testodrome_id(value.into()); break; } } @@ -272,7 +272,7 @@ impl RequestContext { .set_user_agent(user_agent); } - pub(crate) fn set_testodrome_id(&self, query_id: String) { + pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) { self.0 .try_lock() .expect("should not deadlock") @@ -378,7 +378,7 @@ impl RequestContext { .accumulated() } - pub(crate) fn get_testodrome_id(&self) -> Option { + pub(crate) fn get_testodrome_id(&self) -> Option { self.0 .try_lock() .expect("should not deadlock") @@ -447,7 +447,7 @@ impl RequestContextInner { self.user = Some(user); } - fn set_testodrome_id(&mut self, query_id: String) { + fn set_testodrome_id(&mut self, query_id: SmolStr) { self.testodrome_query_id = Some(query_id); } diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 2582e4c069..c05031ad97 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -196,7 +196,11 @@ pub(crate) async fn handshake( // OR we didn't provide it at all (for dev purposes). if tls.is_some() { return stream - .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User) + .throw_error_str( + ERR_INSECURE_CONNECTION, + crate::error::ErrorKind::User, + None, + ) .await?; } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 2e7d332a8b..cf331b8bc0 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -329,7 +329,7 @@ pub(crate) async fn handle_client( let user_info = match result { Ok(user_info) => user_info, - Err(e) => stream.throw_error(e).await?, + Err(e) => stream.throw_error(e, Some(ctx)).await?, }; let user = user_info.get_user().to_owned(); @@ -349,7 +349,10 @@ pub(crate) async fn handle_client( let app = params.get("application_name"); let params_span = tracing::info_span!("", ?user, ?db, ?app); - return stream.throw_error(e).instrument(params_span).await?; + return stream + .throw_error(e, Some(ctx)) + .instrument(params_span) + .await?; } }; @@ -374,7 +377,7 @@ pub(crate) async fn handle_client( config.wake_compute_retry_config, &config.connect_to_compute, ) - .or_else(|e| stream.throw_error(e)) + .or_else(|e| stream.throw_error(e, Some(ctx))) .await?; let cancellation_handler_clone = Arc::clone(&cancellation_handler); diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 9c11f32083..6f24ad3dec 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -434,17 +434,6 @@ async fn request_handler( .map(Into::into), ); - let testodrome_id = request - .headers() - .get("X-Neon-Query-ID") - .and_then(|value| value.to_str().ok()) - .map(|s| s.to_string()); - - if let Some(query_id) = testodrome_id { - info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); - ctx.set_testodrome_id(query_id); - } - let span = ctx.span(); info!(parent: &span, "performing websocket upgrade"); @@ -491,7 +480,7 @@ async fn request_handler( if let Some(query_id) = testodrome_id { info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); - ctx.set_testodrome_id(query_id); + ctx.set_testodrome_id(query_id.into()); } sql_over_http::handle(config, ctx, request, backend, http_cancellation_token) diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 01d37d0eec..7fc48105c5 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -157,7 +157,6 @@ pub(crate) async fn serve_websocket( match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); Err(e.into()) } diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index ace27a7284..360550b0ac 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -6,11 +6,13 @@ use bytes::BytesMut; use pq_proto::framed::{ConnectionError, Framed}; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; use rustls::ServerConfig; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_rustls::server::TlsStream; use tracing::debug; +use crate::control_plane::messages::ColdStartInfo; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::Metrics; use crate::tls::TlsServerEndPoint; @@ -100,6 +102,44 @@ impl ReportableError for ReportedError { } } +#[derive(Serialize, Deserialize, Debug)] +enum ErrorTag { + #[serde(rename = "proxy")] + Proxy, + #[serde(rename = "compute")] + Compute, + #[serde(rename = "client")] + Client, + #[serde(rename = "controlplane")] + ControlPlane, + #[serde(rename = "other")] + Other, +} + +impl From for ErrorTag { + fn from(error_kind: ErrorKind) -> Self { + match error_kind { + ErrorKind::User => Self::Client, + ErrorKind::ClientDisconnect => Self::Client, + ErrorKind::RateLimit => Self::Proxy, + ErrorKind::ServiceRateLimit => Self::Proxy, // considering rate limit as proxy error for SLI + ErrorKind::Quota => Self::Proxy, + ErrorKind::Service => Self::Proxy, + ErrorKind::ControlPlane => Self::ControlPlane, + ErrorKind::Postgres => Self::Other, + ErrorKind::Compute => Self::Compute, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +struct ProbeErrorData { + tag: ErrorTag, + msg: String, + cold_start_info: Option, +} + impl PqStream { /// Write the message into an internal buffer, but don't flush the underlying stream. pub(crate) fn write_message_noflush( @@ -125,26 +165,54 @@ impl PqStream { Ok(self) } - /// Write the error message using [`Self::write_message`], then re-throw it. + /// Writes message with the given error kind to the stream. + /// Used only for probe queries + async fn write_format_message( + &mut self, + msg: &str, + error_kind: ErrorKind, + ctx: Option<&crate::context::RequestContext>, + ) -> String { + let formatted_msg = match ctx { + Some(ctx) if ctx.get_testodrome_id().is_some() => { + serde_json::to_string(&ProbeErrorData { + tag: ErrorTag::from(error_kind), + msg: msg.to_string(), + cold_start_info: Some(ctx.cold_start_info()), + }) + .unwrap_or_default() + } + _ => msg.to_string(), + }; + + // already error case, ignore client IO error + self.write_message(&BeMessage::ErrorResponse(&formatted_msg, None)) + .await + .inspect_err(|e| debug!("write_message failed: {e}")) + .ok(); + + formatted_msg + } + + /// Write the error message using [`Self::write_format_message`], then re-throw it. /// Allowing string literals is safe under the assumption they might not contain any runtime info. /// This method exists due to `&str` not implementing `Into`. + /// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind. pub async fn throw_error_str( &mut self, msg: &'static str, error_kind: ErrorKind, + ctx: Option<&crate::context::RequestContext>, ) -> Result { - // TODO: only log this for actually interesting errors - tracing::info!( - kind = error_kind.to_metric_label(), - msg, - "forwarding error to user" - ); + self.write_format_message(msg, error_kind, ctx).await; - // already error case, ignore client IO error - self.write_message(&BeMessage::ErrorResponse(msg, None)) - .await - .inspect_err(|e| debug!("write_message failed: {e}")) - .ok(); + if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User { + tracing::info!( + kind = error_kind.to_metric_label(), + msg, + "forwarding error to user" + ); + } Err(ReportedError { source: anyhow::anyhow!(msg), @@ -152,26 +220,28 @@ impl PqStream { }) } - /// Write the error message using [`Self::write_message`], then re-throw it. + /// Write the error message using [`Self::write_format_message`], then re-throw it. /// Trait [`UserFacingError`] acts as an allowlist for error types. - pub(crate) async fn throw_error(&mut self, error: E) -> Result + /// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind. + pub(crate) async fn throw_error( + &mut self, + error: E, + ctx: Option<&crate::context::RequestContext>, + ) -> Result where E: UserFacingError + Into, { let error_kind = error.get_error_kind(); let msg = error.to_string_client(); - tracing::info!( - kind=error_kind.to_metric_label(), - error=%error, - msg, - "forwarding error to user" - ); - - // already error case, ignore client IO error - self.write_message(&BeMessage::ErrorResponse(&msg, None)) - .await - .inspect_err(|e| debug!("write_message failed: {e}")) - .ok(); + self.write_format_message(&msg, error_kind, ctx).await; + if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User { + tracing::info!( + kind=error_kind.to_metric_label(), + error=%error, + msg, + "forwarding error to user", + ); + } Err(ReportedError { source: anyhow::anyhow!(error),