diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 1aa290399c..aef5c9383e 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -258,7 +258,7 @@ async fn ssl_handshake( "unexpected startup packet, rejecting connection" ); stream - .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User) + .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User, None) .await? } } diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 1156545f34..0f2c3def0d 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -222,7 +222,7 @@ pub(crate) async fn handle_client( { Ok(auth_result) => auth_result, Err(e) => { - return stream.throw_error(e).await?; + return stream.throw_error(e, Some(ctx)).await?; } }; @@ -238,7 +238,7 @@ pub(crate) async fn handle_client( config.wake_compute_retry_config, &config.connect_to_compute, ) - .or_else(|e| stream.throw_error(e)) + .or_else(|e| stream.throw_error(e, Some(ctx))) .await?; let cancellation_handler_clone = Arc::clone(&cancellation_handler); diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 7c1a6206c1..5f649d2b21 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -63,7 +63,7 @@ struct RequestContextInner { success: bool, pub(crate) cold_start_info: ColdStartInfo, pg_options: Option, - testodrome_query_id: Option, + testodrome_query_id: Option, // extra // This sender is here to keep the request monitoring channel open while requests are taking place. @@ -219,7 +219,7 @@ impl RequestContext { for option in options_str.split_whitespace() { if option.starts_with("neon_query_id:") { if let Some(value) = option.strip_prefix("neon_query_id:") { - this.set_testodrome_id(value.to_string()); + this.set_testodrome_id(value.into()); break; } } @@ -272,7 +272,7 @@ impl RequestContext { .set_user_agent(user_agent); } - pub(crate) fn set_testodrome_id(&self, query_id: String) { + pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) { self.0 .try_lock() .expect("should not deadlock") @@ -378,7 +378,7 @@ impl RequestContext { .accumulated() } - pub(crate) fn get_testodrome_id(&self) -> Option { + pub(crate) fn get_testodrome_id(&self) -> Option { self.0 .try_lock() .expect("should not deadlock") @@ -447,7 +447,7 @@ impl RequestContextInner { self.user = Some(user); } - fn set_testodrome_id(&mut self, query_id: String) { + fn set_testodrome_id(&mut self, query_id: SmolStr) { self.testodrome_query_id = Some(query_id); } diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 2582e4c069..c05031ad97 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -196,7 +196,11 @@ pub(crate) async fn handshake( // OR we didn't provide it at all (for dev purposes). if tls.is_some() { return stream - .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User) + .throw_error_str( + ERR_INSECURE_CONNECTION, + crate::error::ErrorKind::User, + None, + ) .await?; } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 2e7d332a8b..cf331b8bc0 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -329,7 +329,7 @@ pub(crate) async fn handle_client( let user_info = match result { Ok(user_info) => user_info, - Err(e) => stream.throw_error(e).await?, + Err(e) => stream.throw_error(e, Some(ctx)).await?, }; let user = user_info.get_user().to_owned(); @@ -349,7 +349,10 @@ pub(crate) async fn handle_client( let app = params.get("application_name"); let params_span = tracing::info_span!("", ?user, ?db, ?app); - return stream.throw_error(e).instrument(params_span).await?; + return stream + .throw_error(e, Some(ctx)) + .instrument(params_span) + .await?; } }; @@ -374,7 +377,7 @@ pub(crate) async fn handle_client( config.wake_compute_retry_config, &config.connect_to_compute, ) - .or_else(|e| stream.throw_error(e)) + .or_else(|e| stream.throw_error(e, Some(ctx))) .await?; let cancellation_handler_clone = Arc::clone(&cancellation_handler); diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 9c11f32083..6f24ad3dec 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -434,17 +434,6 @@ async fn request_handler( .map(Into::into), ); - let testodrome_id = request - .headers() - .get("X-Neon-Query-ID") - .and_then(|value| value.to_str().ok()) - .map(|s| s.to_string()); - - if let Some(query_id) = testodrome_id { - info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); - ctx.set_testodrome_id(query_id); - } - let span = ctx.span(); info!(parent: &span, "performing websocket upgrade"); @@ -491,7 +480,7 @@ async fn request_handler( if let Some(query_id) = testodrome_id { info!(parent: &ctx.span(), "testodrome query ID: {query_id}"); - ctx.set_testodrome_id(query_id); + ctx.set_testodrome_id(query_id.into()); } sql_over_http::handle(config, ctx, request, backend, http_cancellation_token) diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 01d37d0eec..7fc48105c5 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -157,7 +157,6 @@ pub(crate) async fn serve_websocket( match res { Err(e) => { - // todo: log and push to ctx the error kind ctx.set_error_kind(e.get_error_kind()); Err(e.into()) } diff --git a/proxy/src/stream.rs b/proxy/src/stream.rs index ace27a7284..360550b0ac 100644 --- a/proxy/src/stream.rs +++ b/proxy/src/stream.rs @@ -6,11 +6,13 @@ use bytes::BytesMut; use pq_proto::framed::{ConnectionError, Framed}; use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError}; use rustls::ServerConfig; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_rustls::server::TlsStream; use tracing::debug; +use crate::control_plane::messages::ColdStartInfo; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::Metrics; use crate::tls::TlsServerEndPoint; @@ -100,6 +102,44 @@ impl ReportableError for ReportedError { } } +#[derive(Serialize, Deserialize, Debug)] +enum ErrorTag { + #[serde(rename = "proxy")] + Proxy, + #[serde(rename = "compute")] + Compute, + #[serde(rename = "client")] + Client, + #[serde(rename = "controlplane")] + ControlPlane, + #[serde(rename = "other")] + Other, +} + +impl From for ErrorTag { + fn from(error_kind: ErrorKind) -> Self { + match error_kind { + ErrorKind::User => Self::Client, + ErrorKind::ClientDisconnect => Self::Client, + ErrorKind::RateLimit => Self::Proxy, + ErrorKind::ServiceRateLimit => Self::Proxy, // considering rate limit as proxy error for SLI + ErrorKind::Quota => Self::Proxy, + ErrorKind::Service => Self::Proxy, + ErrorKind::ControlPlane => Self::ControlPlane, + ErrorKind::Postgres => Self::Other, + ErrorKind::Compute => Self::Compute, + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +struct ProbeErrorData { + tag: ErrorTag, + msg: String, + cold_start_info: Option, +} + impl PqStream { /// Write the message into an internal buffer, but don't flush the underlying stream. pub(crate) fn write_message_noflush( @@ -125,26 +165,54 @@ impl PqStream { Ok(self) } - /// Write the error message using [`Self::write_message`], then re-throw it. + /// Writes message with the given error kind to the stream. + /// Used only for probe queries + async fn write_format_message( + &mut self, + msg: &str, + error_kind: ErrorKind, + ctx: Option<&crate::context::RequestContext>, + ) -> String { + let formatted_msg = match ctx { + Some(ctx) if ctx.get_testodrome_id().is_some() => { + serde_json::to_string(&ProbeErrorData { + tag: ErrorTag::from(error_kind), + msg: msg.to_string(), + cold_start_info: Some(ctx.cold_start_info()), + }) + .unwrap_or_default() + } + _ => msg.to_string(), + }; + + // already error case, ignore client IO error + self.write_message(&BeMessage::ErrorResponse(&formatted_msg, None)) + .await + .inspect_err(|e| debug!("write_message failed: {e}")) + .ok(); + + formatted_msg + } + + /// Write the error message using [`Self::write_format_message`], then re-throw it. /// Allowing string literals is safe under the assumption they might not contain any runtime info. /// This method exists due to `&str` not implementing `Into`. + /// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind. pub async fn throw_error_str( &mut self, msg: &'static str, error_kind: ErrorKind, + ctx: Option<&crate::context::RequestContext>, ) -> Result { - // TODO: only log this for actually interesting errors - tracing::info!( - kind = error_kind.to_metric_label(), - msg, - "forwarding error to user" - ); + self.write_format_message(msg, error_kind, ctx).await; - // already error case, ignore client IO error - self.write_message(&BeMessage::ErrorResponse(msg, None)) - .await - .inspect_err(|e| debug!("write_message failed: {e}")) - .ok(); + if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User { + tracing::info!( + kind = error_kind.to_metric_label(), + msg, + "forwarding error to user" + ); + } Err(ReportedError { source: anyhow::anyhow!(msg), @@ -152,26 +220,28 @@ impl PqStream { }) } - /// Write the error message using [`Self::write_message`], then re-throw it. + /// Write the error message using [`Self::write_format_message`], then re-throw it. /// Trait [`UserFacingError`] acts as an allowlist for error types. - pub(crate) async fn throw_error(&mut self, error: E) -> Result + /// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind. + pub(crate) async fn throw_error( + &mut self, + error: E, + ctx: Option<&crate::context::RequestContext>, + ) -> Result where E: UserFacingError + Into, { let error_kind = error.get_error_kind(); let msg = error.to_string_client(); - tracing::info!( - kind=error_kind.to_metric_label(), - error=%error, - msg, - "forwarding error to user" - ); - - // already error case, ignore client IO error - self.write_message(&BeMessage::ErrorResponse(&msg, None)) - .await - .inspect_err(|e| debug!("write_message failed: {e}")) - .ok(); + self.write_format_message(&msg, error_kind, ctx).await; + if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User { + tracing::info!( + kind=error_kind.to_metric_label(), + error=%error, + msg, + "forwarding error to user", + ); + } Err(ReportedError { source: anyhow::anyhow!(error),