feat(proxy): Return prefixed errors to testodrome (#11561)

Testodrome measures uptime based on the failed requests and errors. In
case of testodrome request we send back error based on the service. This
will help us distinguish error types in testodrome and rely on the
uptime SLI.
This commit is contained in:
Ivan Efremov
2025-04-16 22:03:23 +03:00
committed by GitHub
parent cf2e695f49
commit b9b25e13a0
8 changed files with 116 additions and 51 deletions

View File

@@ -258,7 +258,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
"unexpected startup packet, rejecting connection"
);
stream
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User)
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User, None)
.await?
}
}

View File

@@ -222,7 +222,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
{
Ok(auth_result) => auth_result,
Err(e) => {
return stream.throw_error(e).await?;
return stream.throw_error(e, Some(ctx)).await?;
}
};
@@ -238,7 +238,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.or_else(|e| stream.throw_error(e, Some(ctx)))
.await?;
let cancellation_handler_clone = Arc::clone(&cancellation_handler);

View File

@@ -63,7 +63,7 @@ struct RequestContextInner {
success: bool,
pub(crate) cold_start_info: ColdStartInfo,
pg_options: Option<StartupMessageParams>,
testodrome_query_id: Option<String>,
testodrome_query_id: Option<SmolStr>,
// extra
// This sender is here to keep the request monitoring channel open while requests are taking place.
@@ -219,7 +219,7 @@ impl RequestContext {
for option in options_str.split_whitespace() {
if option.starts_with("neon_query_id:") {
if let Some(value) = option.strip_prefix("neon_query_id:") {
this.set_testodrome_id(value.to_string());
this.set_testodrome_id(value.into());
break;
}
}
@@ -272,7 +272,7 @@ impl RequestContext {
.set_user_agent(user_agent);
}
pub(crate) fn set_testodrome_id(&self, query_id: String) {
pub(crate) fn set_testodrome_id(&self, query_id: SmolStr) {
self.0
.try_lock()
.expect("should not deadlock")
@@ -378,7 +378,7 @@ impl RequestContext {
.accumulated()
}
pub(crate) fn get_testodrome_id(&self) -> Option<String> {
pub(crate) fn get_testodrome_id(&self) -> Option<SmolStr> {
self.0
.try_lock()
.expect("should not deadlock")
@@ -447,7 +447,7 @@ impl RequestContextInner {
self.user = Some(user);
}
fn set_testodrome_id(&mut self, query_id: String) {
fn set_testodrome_id(&mut self, query_id: SmolStr) {
self.testodrome_query_id = Some(query_id);
}

View File

@@ -196,7 +196,11 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
// OR we didn't provide it at all (for dev purposes).
if tls.is_some() {
return stream
.throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User)
.throw_error_str(
ERR_INSECURE_CONNECTION,
crate::error::ErrorKind::User,
None,
)
.await?;
}

View File

@@ -329,7 +329,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let user_info = match result {
Ok(user_info) => user_info,
Err(e) => stream.throw_error(e).await?,
Err(e) => stream.throw_error(e, Some(ctx)).await?,
};
let user = user_info.get_user().to_owned();
@@ -349,7 +349,10 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
let app = params.get("application_name");
let params_span = tracing::info_span!("", ?user, ?db, ?app);
return stream.throw_error(e).instrument(params_span).await?;
return stream
.throw_error(e, Some(ctx))
.instrument(params_span)
.await?;
}
};
@@ -374,7 +377,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
config.wake_compute_retry_config,
&config.connect_to_compute,
)
.or_else(|e| stream.throw_error(e))
.or_else(|e| stream.throw_error(e, Some(ctx)))
.await?;
let cancellation_handler_clone = Arc::clone(&cancellation_handler);

View File

@@ -434,17 +434,6 @@ async fn request_handler(
.map(Into::into),
);
let testodrome_id = request
.headers()
.get("X-Neon-Query-ID")
.and_then(|value| value.to_str().ok())
.map(|s| s.to_string());
if let Some(query_id) = testodrome_id {
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
ctx.set_testodrome_id(query_id);
}
let span = ctx.span();
info!(parent: &span, "performing websocket upgrade");
@@ -491,7 +480,7 @@ async fn request_handler(
if let Some(query_id) = testodrome_id {
info!(parent: &ctx.span(), "testodrome query ID: {query_id}");
ctx.set_testodrome_id(query_id);
ctx.set_testodrome_id(query_id.into());
}
sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)

View File

@@ -157,7 +157,6 @@ pub(crate) async fn serve_websocket(
match res {
Err(e) => {
// todo: log and push to ctx the error kind
ctx.set_error_kind(e.get_error_kind());
Err(e.into())
}

View File

@@ -6,11 +6,13 @@ use bytes::BytesMut;
use pq_proto::framed::{ConnectionError, Framed};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};
use rustls::ServerConfig;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
use crate::control_plane::messages::ColdStartInfo;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::metrics::Metrics;
use crate::tls::TlsServerEndPoint;
@@ -100,6 +102,44 @@ impl ReportableError for ReportedError {
}
}
#[derive(Serialize, Deserialize, Debug)]
enum ErrorTag {
#[serde(rename = "proxy")]
Proxy,
#[serde(rename = "compute")]
Compute,
#[serde(rename = "client")]
Client,
#[serde(rename = "controlplane")]
ControlPlane,
#[serde(rename = "other")]
Other,
}
impl From<ErrorKind> for ErrorTag {
fn from(error_kind: ErrorKind) -> Self {
match error_kind {
ErrorKind::User => Self::Client,
ErrorKind::ClientDisconnect => Self::Client,
ErrorKind::RateLimit => Self::Proxy,
ErrorKind::ServiceRateLimit => Self::Proxy, // considering rate limit as proxy error for SLI
ErrorKind::Quota => Self::Proxy,
ErrorKind::Service => Self::Proxy,
ErrorKind::ControlPlane => Self::ControlPlane,
ErrorKind::Postgres => Self::Other,
ErrorKind::Compute => Self::Compute,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
struct ProbeErrorData {
tag: ErrorTag,
msg: String,
cold_start_info: Option<ColdStartInfo>,
}
impl<S: AsyncWrite + Unpin> PqStream<S> {
/// Write the message into an internal buffer, but don't flush the underlying stream.
pub(crate) fn write_message_noflush(
@@ -125,26 +165,54 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
Ok(self)
}
/// Write the error message using [`Self::write_message`], then re-throw it.
/// Writes message with the given error kind to the stream.
/// Used only for probe queries
async fn write_format_message(
&mut self,
msg: &str,
error_kind: ErrorKind,
ctx: Option<&crate::context::RequestContext>,
) -> String {
let formatted_msg = match ctx {
Some(ctx) if ctx.get_testodrome_id().is_some() => {
serde_json::to_string(&ProbeErrorData {
tag: ErrorTag::from(error_kind),
msg: msg.to_string(),
cold_start_info: Some(ctx.cold_start_info()),
})
.unwrap_or_default()
}
_ => msg.to_string(),
};
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(&formatted_msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
formatted_msg
}
/// Write the error message using [`Self::write_format_message`], then re-throw it.
/// Allowing string literals is safe under the assumption they might not contain any runtime info.
/// This method exists due to `&str` not implementing `Into<anyhow::Error>`.
/// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind.
pub async fn throw_error_str<T>(
&mut self,
msg: &'static str,
error_kind: ErrorKind,
ctx: Option<&crate::context::RequestContext>,
) -> Result<T, ReportedError> {
// TODO: only log this for actually interesting errors
tracing::info!(
kind = error_kind.to_metric_label(),
msg,
"forwarding error to user"
);
self.write_format_message(msg, error_kind, ctx).await;
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User {
tracing::info!(
kind = error_kind.to_metric_label(),
msg,
"forwarding error to user"
);
}
Err(ReportedError {
source: anyhow::anyhow!(msg),
@@ -152,26 +220,28 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
})
}
/// Write the error message using [`Self::write_message`], then re-throw it.
/// Write the error message using [`Self::write_format_message`], then re-throw it.
/// Trait [`UserFacingError`] acts as an allowlist for error types.
pub(crate) async fn throw_error<T, E>(&mut self, error: E) -> Result<T, ReportedError>
/// If `ctx` is provided and has testodrome_id set, error messages will be prefixed according to error kind.
pub(crate) async fn throw_error<T, E>(
&mut self,
error: E,
ctx: Option<&crate::context::RequestContext>,
) -> Result<T, ReportedError>
where
E: UserFacingError + Into<anyhow::Error>,
{
let error_kind = error.get_error_kind();
let msg = error.to_string_client();
tracing::info!(
kind=error_kind.to_metric_label(),
error=%error,
msg,
"forwarding error to user"
);
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(&msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
self.write_format_message(&msg, error_kind, ctx).await;
if error_kind != ErrorKind::RateLimit && error_kind != ErrorKind::User {
tracing::info!(
kind=error_kind.to_metric_label(),
error=%error,
msg,
"forwarding error to user",
);
}
Err(ReportedError {
source: anyhow::anyhow!(error),