From f36f0068b83bd536d33c49b238d964dcd96c9479 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 20 Nov 2024 17:50:39 +0000 Subject: [PATCH] chore(proxy): demote more logs during successful connection attempts (#9828) Follow up to #9803 See https://github.com/neondatabase/cloud/issues/14378 In collaboration with @cloneable and @awarus, we sifted through logs and simply demoted some logs to debug. This is not at all finished and there are more logs to review, but we ran out of time in the session we organised. In any slightly more nuanced cases, we didn't touch the log, instead leaving a TODO comment. I've also slightly refactored the sql-over-http body read/length reject code. I can split that into a separate PR. It just felt natural after I switched to `read_body_with_limit` as we discussed during the meet. --- libs/pq_proto/src/lib.rs | 1 + proxy/src/bin/local_proxy.rs | 2 +- proxy/src/bin/proxy.rs | 2 +- proxy/src/config.rs | 2 +- proxy/src/control_plane/client/neon.rs | 1 + proxy/src/http/mod.rs | 10 +-- proxy/src/proxy/connect_compute.rs | 15 +++-- proxy/src/proxy/copy_bidirectional.rs | 2 + proxy/src/proxy/handshake.rs | 10 ++- proxy/src/proxy/mod.rs | 2 +- proxy/src/proxy/passthrough.rs | 4 +- proxy/src/proxy/wake_compute.rs | 7 +- proxy/src/rate_limiter/limit_algorithm.rs | 6 +- .../src/rate_limiter/limit_algorithm/aimd.rs | 24 ++++--- proxy/src/redis/cancellation_publisher.rs | 3 +- proxy/src/serverless/backend.rs | 8 +-- proxy/src/serverless/conn_pool.rs | 2 +- proxy/src/serverless/conn_pool_lib.rs | 4 +- proxy/src/serverless/http_conn_pool.rs | 2 +- proxy/src/serverless/local_conn_pool.rs | 4 +- proxy/src/serverless/sql_over_http.rs | 66 +++++++++++-------- 21 files changed, 104 insertions(+), 73 deletions(-) diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 9ffaaba584..b9e5387d86 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -184,6 +184,7 @@ pub struct CancelKeyData { impl fmt::Display for CancelKeyData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: this is producing strange results, with 0xffffffff........ always in the logs. let hi = (self.backend_pid as u64) << 32; let lo = self.cancel_key as u64; let id = hi | lo; diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index c4ec1300f2..968682cf0f 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -111,7 +111,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 232721338d..45fbe4a398 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -276,7 +276,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/config.rs b/proxy/src/config.rs index b048c9d389..8bc8e3f96f 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -64,7 +64,7 @@ pub struct HttpConfig { pub pool_options: GlobalConnPoolOptions, pub cancel_set: CancelSet, pub client_conn_threshold: u64, - pub max_request_size_bytes: u64, + pub max_request_size_bytes: usize, pub max_response_size_bytes: usize, } diff --git a/proxy/src/control_plane/client/neon.rs b/proxy/src/control_plane/client/neon.rs index 53f9234926..757ea6720a 100644 --- a/proxy/src/control_plane/client/neon.rs +++ b/proxy/src/control_plane/client/neon.rs @@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { // after getting back a permit - it's possible the cache was filled // double check if permit.should_check_cache() { + // TODO: if there is something in the cache, mark the permit as success. check_cache!(); } diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index b1642cedb3..ed88c77256 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -122,18 +122,18 @@ impl Endpoint { } #[derive(Error, Debug)] -pub(crate) enum ReadBodyError { +pub(crate) enum ReadBodyError { #[error("Content length exceeds limit of {limit} bytes")] BodyTooLarge { limit: usize }, #[error(transparent)] - Read(#[from] reqwest::Error), + Read(#[from] E), } -pub(crate) async fn read_body_with_limit( - mut b: impl Body + Unpin, +pub(crate) async fn read_body_with_limit( + mut b: impl Body + Unpin, limit: usize, -) -> Result, ReadBodyError> { +) -> Result, ReadBodyError> { // We could use `b.limited().collect().await.to_bytes()` here // but this ends up being slightly more efficient as far as I can tell. diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index b30aec09c1..2e759b0894 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -117,7 +117,6 @@ where node_info.set_keys(user_info.get_keys()); node_info.allow_self_signed_compute = allow_self_signed_compute; mechanism.update_connect_config(&mut node_info.config); - let retry_type = RetryType::ConnectToCompute; // try once let err = match mechanism @@ -129,7 +128,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -147,7 +146,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -156,8 +155,9 @@ where node_info } else { // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node - info!("compute node's state has likely changed; requesting a wake-up"); + debug!("compute node's state has likely changed; requesting a wake-up"); let old_node_info = invalidate_cache(node_info); + // TODO: increment num_retries? let mut node_info = wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.reuse_settings(old_node_info); @@ -169,7 +169,7 @@ where // now that we have a new node, try connect to it repeatedly. // this can error for a few reasons, for instance: // * DNS connection settings haven't quite propagated yet - info!("wake_compute success. attempting to connect"); + debug!("wake_compute success. attempting to connect"); num_retries = 1; loop { match mechanism @@ -181,10 +181,11 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); + // TODO: is this necessary? We have a metric. info!(?num_retries, "connected to compute node after"); return Ok(res); } @@ -194,7 +195,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 91a3ceff75..4e4af88634 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -87,6 +87,8 @@ where transfer_one_direction(cx, &mut compute_to_client, compute, client) .map_err(ErrorSource::from_compute)?; + // TODO: 1 info log, with a enum label for close direction. + // Early termination checks from compute to client. if let TransferState::Done(_) = compute_to_client { if let TransferState::Running(buf) = &client_to_compute { diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 3ada3a9995..e27c211932 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -5,7 +5,7 @@ use pq_proto::{ }; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::auth::endpoint_sni; use crate::config::{TlsConfig, PG_ALPN_PROTOCOL}; @@ -199,6 +199,8 @@ pub(crate) async fn handshake( .await?; } + // This log highlights the start of the connection. + // This contains useful information for debugging, not logged elsewhere, like role name and endpoint id. info!( ?version, ?params, @@ -211,7 +213,7 @@ pub(crate) async fn handshake( FeStartupPacket::StartupMessage { params, version } if version.major() == 3 && version > PG_PROTOCOL_LATEST => { - warn!(?version, "unsupported minor version"); + debug!(?version, "unsupported minor version"); // no protocol extensions are supported. // @@ -233,14 +235,16 @@ pub(crate) async fn handshake( info!( ?version, + ?params, session_type = "normal", "successful handshake; unsupported minor version requested" ); break Ok(HandshakeData::Startup(stream, params)); } - FeStartupPacket::StartupMessage { version, .. } => { + FeStartupPacket::StartupMessage { version, params } => { warn!( ?version, + ?params, session_type = "normal", "unsuccessful handshake; unsupported version" ); diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 4be4006d15..9415b54a4a 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -254,7 +254,7 @@ pub(crate) async fn handle_client( endpoint_rate_limiter: Arc, conn_gauge: NumClientConnectionsGuard<'static>, ) -> Result>, ClientRequestError> { - info!( + debug!( protocol = %ctx.protocol(), "handling interactive connection from client" ); diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index e3b4730982..5e07c8eeae 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,5 +1,5 @@ use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::info; +use tracing::debug; use utils::measured_stream::MeasuredStream; use super::copy_bidirectional::ErrorSource; @@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass( ); // Starting from here we only proxy the client's traffic. - info!("performing the proxy pass..."); + debug!("performing the proxy pass..."); let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute( &mut client, &mut compute, diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index d09e0b1f41..8a672d48dc 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -17,7 +17,6 @@ pub(crate) async fn wake_compute( api: &B, config: RetryConfig, ) -> Result { - let retry_type = RetryType::WakeCompute; loop { match api.wake_compute(ctx).await { Err(e) if !should_retry(&e, *num_retries, config) => { @@ -26,7 +25,7 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); @@ -40,10 +39,12 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); + // TODO: is this necessary? We have a metric. + // TODO: this log line is misleading as "wake_compute" might return cached (and stale) info. info!(?num_retries, "compute node woken up after"); return Ok(n); } diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 16c398f303..b74a9ab17e 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -195,7 +195,11 @@ impl DynamicLimiter { /// /// Set the outcome to `None` to ignore the job. fn release_inner(&self, start: Instant, outcome: Option) { - tracing::info!("outcome is {:?}", outcome); + if outcome.is_none() { + tracing::warn!("outcome is {:?}", outcome); + } else { + tracing::debug!("outcome is {:?}", outcome); + } if self.config.initial_limit == 0 { return; } diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index 5332a5184f..3000cc4c2a 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd { if utilisation > self.utilisation { let limit = old_limit + self.inc; - let increased_limit = limit.clamp(self.min, self.max); - if increased_limit > old_limit { - tracing::info!(increased_limit, "limit increased"); + let new_limit = limit.clamp(self.min, self.max); + if new_limit > old_limit { + tracing::info!(old_limit, new_limit, "limit increased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at max"); } - increased_limit + new_limit } else { old_limit } } Outcome::Overload => { - let limit = old_limit as f32 * self.dec; + let new_limit = old_limit as f32 * self.dec; // Floor instead of round, so the limit reduces even with small numbers. // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1 - let limit = limit.floor() as usize; + let new_limit = new_limit.floor() as usize; - let limit = limit.clamp(self.min, self.max); - tracing::info!(limit, "limit decreased"); - limit + let new_limit = new_limit.clamp(self.min, self.max); + if new_limit < old_limit { + tracing::info!(old_limit, new_limit, "limit decreased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at min"); + } + new_limit } } } diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 0000246971..7392b0d316 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -121,6 +121,7 @@ impl RedisPublisherClient { cancel_key_data: CancelKeyData, session_id: Uuid, ) -> anyhow::Result<()> { + // TODO: review redundant error duplication logs. if !self.limiter.check() { tracing::info!("Rate limit exceeded. Skipping cancellation message"); return Err(anyhow::anyhow!("Rate limit exceeded")); @@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient { tracing::info!("publishing cancellation key to Redis"); match self.try_publish_internal(cancel_key_data, session_id).await { Ok(()) => { - tracing::info!("cancellation key successfuly published to Redis"); + tracing::debug!("cancellation key successfuly published to Redis"); Ok(()) } Err(e) => { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index d9dcf6fbb7..7df978f84c 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -167,10 +167,10 @@ impl PoolingBackend { force_new: bool, ) -> Result, HttpConnError> { let maybe_client = if force_new { - info!("pool: pool is disabled"); + debug!("pool: pool is disabled"); None } else { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); self.pool.get(ctx, &conn_info)? }; @@ -204,14 +204,14 @@ impl PoolingBackend { ctx: &RequestContext, conn_info: ConnInfo, ) -> Result, HttpConnError> { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) { return Ok(client); } let conn_id = uuid::Uuid::new_v4(); tracing::Span::current().record("conn_id", display(conn_id)); - info!(%conn_id, "pool: opening a new connection '{conn_info}'"); + debug!(%conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials { info: ComputeUserInfo { user: conn_info.user_info.user.clone(), diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 07ba1ae9af..f716326a68 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -243,7 +243,7 @@ mod tests { }, cancel_set: CancelSet::new(0), client_conn_threshold: u64::MAX, - max_request_size_bytes: u64::MAX, + max_request_size_bytes: usize::MAX, max_response_size_bytes: usize::MAX, })); let pool = GlobalConnPool::new(config); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index fe3c422c3b..c5db025870 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -232,7 +232,7 @@ impl EndpointConnPool { // do logging outside of the mutex if returned { - info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); } else { info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); } @@ -409,7 +409,7 @@ impl GlobalConnPool { "pid", tracing::field::display(client.inner.get_process_id()), ); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index bc86c4b1cd..e9455420c0 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -227,7 +227,7 @@ impl GlobalConnPool { }; tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index cadcbd7530..310af08221 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream; use tokio_postgres::types::ToSql; use tokio_postgres::{AsyncMessage, Socket}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, warn, Instrument}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -110,7 +110,7 @@ impl LocalConnPool { "pid", tracing::field::display(client.inner.get_process_id()), ); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "local_pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 36d8595902..ab75086884 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -8,7 +8,7 @@ use http::header::AUTHORIZATION; use http::Method; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; -use hyper::body::{Body, Incoming}; +use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; use hyper::{header, HeaderMap, Request, Response, StatusCode}; use pq_proto::StartupMessageParamsBuilder; @@ -18,7 +18,7 @@ use tokio::time; use tokio_postgres::error::{DbError, ErrorPosition, SqlState}; use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{debug, error, info}; use typed_json::json; use url::Url; use urlencoding; @@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; +use crate::http::{read_body_with_limit, ReadBodyError}; use crate::metrics::{HttpDirection, Metrics}; use crate::proxy::{run_until_cancelled, NeonOptions}; use crate::serverless::backend::HttpConnError; @@ -357,8 +358,6 @@ pub(crate) enum SqlOverHttpError { ConnectCompute(#[from] HttpConnError), #[error("{0}")] ConnInfo(#[from] ConnInfoError), - #[error("request is too large (max is {0} bytes)")] - RequestTooLarge(u64), #[error("response is too large (max is {0} bytes)")] ResponseTooLarge(usize), #[error("invalid isolation level")] @@ -377,7 +376,6 @@ impl ReportableError for SqlOverHttpError { SqlOverHttpError::ReadPayload(e) => e.get_error_kind(), SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(), SqlOverHttpError::ConnInfo(e) => e.get_error_kind(), - SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User, SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User, SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User, SqlOverHttpError::Postgres(p) => p.get_error_kind(), @@ -393,7 +391,6 @@ impl UserFacingError for SqlOverHttpError { SqlOverHttpError::ReadPayload(p) => p.to_string(), SqlOverHttpError::ConnectCompute(c) => c.to_string_client(), SqlOverHttpError::ConnInfo(c) => c.to_string_client(), - SqlOverHttpError::RequestTooLarge(_) => self.to_string(), SqlOverHttpError::ResponseTooLarge(_) => self.to_string(), SqlOverHttpError::InvalidIsolationLevel => self.to_string(), SqlOverHttpError::Postgres(p) => p.to_string(), @@ -406,13 +403,12 @@ impl UserFacingError for SqlOverHttpError { impl HttpCodeError for SqlOverHttpError { fn get_http_status_code(&self) -> StatusCode { match self { - SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST, + SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(), SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() { ErrorKind::User => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }, SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST, - SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE, SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST, @@ -426,19 +422,41 @@ impl HttpCodeError for SqlOverHttpError { pub(crate) enum ReadPayloadError { #[error("could not read the HTTP request body: {0}")] Read(#[from] hyper::Error), + #[error("request is too large (max is {limit} bytes)")] + BodyTooLarge { limit: usize }, #[error("could not parse the HTTP request body: {0}")] Parse(#[from] serde_json::Error), } +impl From> for ReadPayloadError { + fn from(value: ReadBodyError) -> Self { + match value { + ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit }, + ReadBodyError::Read(e) => Self::Read(e), + } + } +} + impl ReportableError for ReadPayloadError { fn get_error_kind(&self) -> ErrorKind { match self { ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect, + ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User, ReadPayloadError::Parse(_) => ErrorKind::User, } } } +impl HttpCodeError for ReadPayloadError { + fn get_http_status_code(&self) -> StatusCode { + match self { + ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST, + ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE, + ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST, + } + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum SqlOverHttpCancel { #[error("query was cancelled")] @@ -580,28 +598,20 @@ async fn handle_db_inner( let parsed_headers = HttpHeaders::try_parse(headers)?; - let request_content_length = match request.body().size_hint().upper() { - Some(v) => v, - None => config.http_config.max_request_size_bytes + 1, - }; - info!(request_content_length, "request size in bytes"); - Metrics::get() - .proxy - .http_conn_content_length_bytes - .observe(HttpDirection::Request, request_content_length as f64); - - // we don't have a streaming request support yet so this is to prevent OOM - // from a malicious user sending an extremely large request body - if request_content_length > config.http_config.max_request_size_bytes { - return Err(SqlOverHttpError::RequestTooLarge( - config.http_config.max_request_size_bytes, - )); - } - let fetch_and_process_request = Box::pin( async { - let body = request.into_body().collect().await?.to_bytes(); - info!(length = body.len(), "request payload read"); + let body = read_body_with_limit( + request.into_body(), + config.http_config.max_request_size_bytes, + ) + .await?; + + Metrics::get() + .proxy + .http_conn_content_length_bytes + .observe(HttpDirection::Request, body.len() as f64); + + debug!(length = body.len(), "request payload read"); let payload: Payload = serde_json::from_slice(&body)?; Ok::(payload) // Adjust error type accordingly }