Mirror of https://github.com/neondatabase/neon.git, synced 2025-12-23 06:09:59 +00:00
chore(proxy): demote more logs during successful connection attempts (#9828)
Follow-up to #9803. See https://github.com/neondatabase/cloud/issues/14378.

In collaboration with @cloneable and @awarus, we sifted through the logs and simply demoted some of them to debug. This is not at all finished; there are more logs to review, but we ran out of time in the session we organised. In the slightly more nuanced cases, we didn't touch the log and instead left a TODO comment.

I've also slightly refactored the sql-over-http body read/length-reject code. I can split that into a separate PR; it just felt natural after I switched to `read_body_with_limit`, as we discussed during the meet.
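For context, demoting a log only changes the macro (`info!` to `debug!`); whether the line is actually emitted is decided by the subscriber's level filter at runtime. A minimal sketch, assuming a stock `tracing_subscriber` setup rather than the proxy's actual configuration:

    use tracing::{debug, info};

    fn main() {
        // Only INFO and above reach the output; the demoted lines
        // disappear unless debug logging is explicitly enabled.
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::INFO)
            .init();

        info!("handling interactive connection from client"); // emitted
        debug!("pool: looking for an existing connection");   // filtered out
    }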
@@ -184,6 +184,7 @@ pub struct CancelKeyData {
 impl fmt::Display for CancelKeyData {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // TODO: this is producing strange results, with 0xffffffff........ always in the logs.
         let hi = (self.backend_pid as u64) << 32;
         let lo = self.cancel_key as u64;
         let id = hi | lo;
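A likely cause of the `0xffffffff........` artifact noted in the TODO, assuming both fields are `i32` as on the wire: `as u64` sign-extends, so a negative `cancel_key` fills the upper 32 bits with ones before the `|`, clobbering `backend_pid`. A sketch of the effect:

    let backend_pid: i32 = 1234;
    let cancel_key: i32 = -0x1234; // random keys are negative half the time

    let hi = (backend_pid as u64) << 32;
    let lo = cancel_key as u64; // sign-extends to 0xffff_ffff_ffff_edcc
    assert_eq!(hi | lo, 0xffff_ffff_ffff_edcc); // backend_pid is gone

    // Masking the low half first would keep both fields visible:
    let lo = cancel_key as u32 as u64;
    assert_eq!(hi | lo, 0x0000_04d2_ffff_edcc);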
@@ -111,7 +111,7 @@ struct SqlOverHttpArgs {
     sql_over_http_cancel_set_shards: usize,

     #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_request_size_bytes: u64,
+    sql_over_http_max_request_size_bytes: usize,

     #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
     sql_over_http_max_response_size_bytes: usize,
@@ -276,7 +276,7 @@ struct SqlOverHttpArgs {
     sql_over_http_cancel_set_shards: usize,

     #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_request_size_bytes: u64,
+    sql_over_http_max_request_size_bytes: usize,

     #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
     sql_over_http_max_response_size_bytes: usize,
@@ -64,7 +64,7 @@ pub struct HttpConfig {
     pub pool_options: GlobalConnPoolOptions,
     pub cancel_set: CancelSet,
     pub client_conn_threshold: u64,
-    pub max_request_size_bytes: u64,
+    pub max_request_size_bytes: usize,
     pub max_response_size_bytes: usize,
 }
@@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
         // after getting back a permit - it's possible the cache was filled
         // double check
         if permit.should_check_cache() {
+            // TODO: if there is something in the cache, mark the permit as success.
            check_cache!();
        }
@@ -122,18 +122,18 @@ impl Endpoint {
 }

 #[derive(Error, Debug)]
-pub(crate) enum ReadBodyError {
+pub(crate) enum ReadBodyError<E> {
     #[error("Content length exceeds limit of {limit} bytes")]
     BodyTooLarge { limit: usize },

     #[error(transparent)]
-    Read(#[from] reqwest::Error),
+    Read(#[from] E),
 }

-pub(crate) async fn read_body_with_limit(
-    mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
+pub(crate) async fn read_body_with_limit<E>(
+    mut b: impl Body<Data = Bytes, Error = E> + Unpin,
     limit: usize,
-) -> Result<Vec<u8>, ReadBodyError> {
+) -> Result<Vec<u8>, ReadBodyError<E>> {
     // We could use `b.limited().collect().await.to_bytes()` here
     // but this ends up being slightly more efficient as far as I can tell.
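The hunk above shows only the signature change; a plausible body for the generic version, given its comment about avoiding `b.limited().collect()` (a sketch against the `http_body`/`http_body_util` frame API, not the actual implementation):

    use bytes::Bytes;
    use http_body::Body;
    use http_body_util::BodyExt;

    pub(crate) async fn read_body_with_limit<E>(
        mut b: impl Body<Data = Bytes, Error = E> + Unpin,
        limit: usize,
    ) -> Result<Vec<u8>, ReadBodyError<E>> {
        let mut body = Vec::new();
        // Pull one frame at a time so we can reject as soon as the running
        // total crosses `limit`, instead of buffering everything first.
        while let Some(frame) = b.frame().await {
            let frame = frame.map_err(ReadBodyError::Read)?;
            if let Some(data) = frame.data_ref() {
                if body.len() + data.len() > limit {
                    return Err(ReadBodyError::BodyTooLarge { limit });
                }
                body.extend_from_slice(data);
            }
        }
        Ok(body)
    }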
@@ -117,7 +117,6 @@ where
     node_info.set_keys(user_info.get_keys());
     node_info.allow_self_signed_compute = allow_self_signed_compute;
     mechanism.update_connect_config(&mut node_info.config);
-    let retry_type = RetryType::ConnectToCompute;

     // try once
     let err = match mechanism
@@ -129,7 +128,7 @@ where
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Success,
-            retry_type,
+            retry_type: RetryType::ConnectToCompute,
         },
         num_retries.into(),
     );
@@ -147,7 +146,7 @@ where
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Failed,
-            retry_type,
+            retry_type: RetryType::ConnectToCompute,
         },
         num_retries.into(),
     );
@@ -156,8 +155,9 @@ where
         node_info
     } else {
         // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
-        info!("compute node's state has likely changed; requesting a wake-up");
+        debug!("compute node's state has likely changed; requesting a wake-up");
         let old_node_info = invalidate_cache(node_info);
+        // TODO: increment num_retries?
         let mut node_info =
             wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
         node_info.reuse_settings(old_node_info);
@@ -169,7 +169,7 @@ where
     // now that we have a new node, try connect to it repeatedly.
     // this can error for a few reasons, for instance:
     // * DNS connection settings haven't quite propagated yet
-    info!("wake_compute success. attempting to connect");
+    debug!("wake_compute success. attempting to connect");
     num_retries = 1;
     loop {
         match mechanism
@@ -181,10 +181,11 @@ where
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Success,
-            retry_type,
+            retry_type: RetryType::ConnectToCompute,
         },
         num_retries.into(),
     );
+    // TODO: is this necessary? We have a metric.
     info!(?num_retries, "connected to compute node after");
     return Ok(res);
 }
@@ -194,7 +195,7 @@ where
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Failed,
-            retry_type,
+            retry_type: RetryType::ConnectToCompute,
         },
         num_retries.into(),
     );
@@ -87,6 +87,8 @@ where
     transfer_one_direction(cx, &mut compute_to_client, compute, client)
         .map_err(ErrorSource::from_compute)?;

+    // TODO: 1 info log, with a enum label for close direction.
+
     // Early termination checks from compute to client.
     if let TransferState::Done(_) = compute_to_client {
         if let TransferState::Running(buf) = &client_to_compute {
@@ -5,7 +5,7 @@ use pq_proto::{
 };
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncWrite};
-use tracing::{info, warn};
+use tracing::{debug, info, warn};

 use crate::auth::endpoint_sni;
 use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
@@ -199,6 +199,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
                 .await?;
         }

+        // This log highlights the start of the connection.
+        // This contains useful information for debugging, not logged elsewhere, like role name and endpoint id.
         info!(
             ?version,
             ?params,
@@ -211,7 +213,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
         FeStartupPacket::StartupMessage { params, version }
             if version.major() == 3 && version > PG_PROTOCOL_LATEST =>
         {
-            warn!(?version, "unsupported minor version");
+            debug!(?version, "unsupported minor version");

             // no protocol extensions are supported.
             // <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/backend/tcop/backend_startup.c#L744-L753>
@@ -233,14 +235,16 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(

             info!(
                 ?version,
                 ?params,
                 session_type = "normal",
                 "successful handshake; unsupported minor version requested"
             );
             break Ok(HandshakeData::Startup(stream, params));
         }
-        FeStartupPacket::StartupMessage { version, .. } => {
+        FeStartupPacket::StartupMessage { version, params } => {
             warn!(
                 ?version,
+                ?params,
                 session_type = "normal",
                 "unsuccessful handshake; unsupported version"
             );
@@ -254,7 +254,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
     endpoint_rate_limiter: Arc<EndpointRateLimiter>,
     conn_gauge: NumClientConnectionsGuard<'static>,
 ) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
-    info!(
+    debug!(
         protocol = %ctx.protocol(),
         "handling interactive connection from client"
     );
@@ -1,5 +1,5 @@
 use tokio::io::{AsyncRead, AsyncWrite};
-use tracing::info;
+use tracing::debug;
 use utils::measured_stream::MeasuredStream;

 use super::copy_bidirectional::ErrorSource;
@@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass(
     );

     // Starting from here we only proxy the client's traffic.
-    info!("performing the proxy pass...");
+    debug!("performing the proxy pass...");
     let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
         &mut client,
         &mut compute,
@@ -17,7 +17,6 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
     api: &B,
     config: RetryConfig,
 ) -> Result<CachedNodeInfo, WakeComputeError> {
-    let retry_type = RetryType::WakeCompute;
     loop {
         match api.wake_compute(ctx).await {
             Err(e) if !should_retry(&e, *num_retries, config) => {
@@ -26,7 +25,7 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Failed,
-            retry_type,
+            retry_type: RetryType::WakeCompute,
         },
         (*num_retries).into(),
     );
@@ -40,10 +39,12 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
     Metrics::get().proxy.retries_metric.observe(
         RetriesMetricGroup {
             outcome: ConnectOutcome::Success,
-            retry_type,
+            retry_type: RetryType::WakeCompute,
         },
         (*num_retries).into(),
     );
+    // TODO: is this necessary? We have a metric.
+    // TODO: this log line is misleading as "wake_compute" might return cached (and stale) info.
     info!(?num_retries, "compute node woken up after");
     return Ok(n);
 }
@@ -195,7 +195,11 @@ impl DynamicLimiter {
     ///
     /// Set the outcome to `None` to ignore the job.
     fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
-        tracing::info!("outcome is {:?}", outcome);
+        if outcome.is_none() {
+            tracing::warn!("outcome is {:?}", outcome);
+        } else {
+            tracing::debug!("outcome is {:?}", outcome);
+        }
         if self.config.initial_limit == 0 {
             return;
         }
@@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd {

                 if utilisation > self.utilisation {
                     let limit = old_limit + self.inc;
-                    let increased_limit = limit.clamp(self.min, self.max);
-                    if increased_limit > old_limit {
-                        tracing::info!(increased_limit, "limit increased");
+                    let new_limit = limit.clamp(self.min, self.max);
+                    if new_limit > old_limit {
+                        tracing::info!(old_limit, new_limit, "limit increased");
+                    } else {
+                        tracing::debug!(old_limit, new_limit, "limit clamped at max");
                     }

-                    increased_limit
+                    new_limit
                 } else {
                     old_limit
                 }
             }
             Outcome::Overload => {
-                let limit = old_limit as f32 * self.dec;
+                let new_limit = old_limit as f32 * self.dec;

                 // Floor instead of round, so the limit reduces even with small numbers.
                 // E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
-                let limit = limit.floor() as usize;
+                let new_limit = new_limit.floor() as usize;

-                let limit = limit.clamp(self.min, self.max);
-                tracing::info!(limit, "limit decreased");
-                limit
+                let new_limit = new_limit.clamp(self.min, self.max);
+                if new_limit < old_limit {
+                    tracing::info!(old_limit, new_limit, "limit decreased");
+                } else {
+                    tracing::debug!(old_limit, new_limit, "limit clamped at min");
+                }
+                new_limit
             }
         }
     }
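The floor-vs-round comment is the load-bearing detail here: with multiplicative decrease, rounding would prevent small limits from ever shrinking. A worked example with assumed parameters (`dec = 0.9`, `min = 1`, `max = 100`):

    fn decreased(old_limit: usize, dec: f32, min: usize, max: usize) -> usize {
        ((old_limit as f32 * dec).floor() as usize).clamp(min, max)
    }

    #[test]
    fn overload_shrinks_small_limits() {
        assert_eq!(decreased(2, 0.9, 1, 100), 1);        // floor(1.8) = 1
        assert_eq!((2.0_f32 * 0.9).round() as usize, 2); // round(1.8) = 2: stuck
    }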
@@ -121,6 +121,7 @@ impl RedisPublisherClient {
         cancel_key_data: CancelKeyData,
         session_id: Uuid,
     ) -> anyhow::Result<()> {
+        // TODO: review redundant error duplication logs.
         if !self.limiter.check() {
             tracing::info!("Rate limit exceeded. Skipping cancellation message");
             return Err(anyhow::anyhow!("Rate limit exceeded"));
@@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient {
         tracing::info!("publishing cancellation key to Redis");
         match self.try_publish_internal(cancel_key_data, session_id).await {
             Ok(()) => {
-                tracing::info!("cancellation key successfuly published to Redis");
+                tracing::debug!("cancellation key successfuly published to Redis");
                 Ok(())
             }
             Err(e) => {
@@ -167,10 +167,10 @@ impl PoolingBackend {
         force_new: bool,
     ) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
         let maybe_client = if force_new {
-            info!("pool: pool is disabled");
+            debug!("pool: pool is disabled");
             None
         } else {
-            info!("pool: looking for an existing connection");
+            debug!("pool: looking for an existing connection");
             self.pool.get(ctx, &conn_info)?
         };
@@ -204,14 +204,14 @@ impl PoolingBackend {
         ctx: &RequestContext,
         conn_info: ConnInfo,
     ) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
-        info!("pool: looking for an existing connection");
+        debug!("pool: looking for an existing connection");
         if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
             return Ok(client);
         }

         let conn_id = uuid::Uuid::new_v4();
         tracing::Span::current().record("conn_id", display(conn_id));
-        info!(%conn_id, "pool: opening a new connection '{conn_info}'");
+        debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
         let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
             info: ComputeUserInfo {
                 user: conn_info.user_info.user.clone(),
@@ -243,7 +243,7 @@ mod tests {
         },
         cancel_set: CancelSet::new(0),
         client_conn_threshold: u64::MAX,
-        max_request_size_bytes: u64::MAX,
+        max_request_size_bytes: usize::MAX,
         max_response_size_bytes: usize::MAX,
     }));
     let pool = GlobalConnPool::new(config);
@@ -232,7 +232,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {

     // do logging outside of the mutex
     if returned {
-        info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
+        debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
     } else {
         info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
     }
@@ -409,7 +409,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
         "pid",
         tracing::field::display(client.inner.get_process_id()),
     );
-    info!(
+    debug!(
         cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
         "pool: reusing connection '{conn_info}'"
     );
@@ -227,7 +227,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
     };

     tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
-    info!(
+    debug!(
         cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
         "pool: reusing connection '{conn_info}'"
     );
@@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream;
 use tokio_postgres::types::ToSql;
 use tokio_postgres::{AsyncMessage, Socket};
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info, info_span, warn, Instrument};
+use tracing::{debug, error, info, info_span, warn, Instrument};

 use super::backend::HttpConnError;
 use super::conn_pool_lib::{
@@ -110,7 +110,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
         "pid",
         tracing::field::display(client.inner.get_process_id()),
     );
-    info!(
+    debug!(
         cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
         "local_pool: reusing connection '{conn_info}'"
     );
@@ -8,7 +8,7 @@ use http::header::AUTHORIZATION;
 use http::Method;
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Full};
-use hyper::body::{Body, Incoming};
+use hyper::body::Incoming;
 use hyper::http::{HeaderName, HeaderValue};
 use hyper::{header, HeaderMap, Request, Response, StatusCode};
 use pq_proto::StartupMessageParamsBuilder;
@@ -18,7 +18,7 @@ use tokio::time;
 use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
 use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info};
+use tracing::{debug, error, info};
 use typed_json::json;
 use url::Url;
 use urlencoding;
@@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
 use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
 use crate::context::RequestContext;
 use crate::error::{ErrorKind, ReportableError, UserFacingError};
+use crate::http::{read_body_with_limit, ReadBodyError};
 use crate::metrics::{HttpDirection, Metrics};
 use crate::proxy::{run_until_cancelled, NeonOptions};
 use crate::serverless::backend::HttpConnError;
@@ -357,8 +358,6 @@ pub(crate) enum SqlOverHttpError {
     ConnectCompute(#[from] HttpConnError),
     #[error("{0}")]
     ConnInfo(#[from] ConnInfoError),
-    #[error("request is too large (max is {0} bytes)")]
-    RequestTooLarge(u64),
     #[error("response is too large (max is {0} bytes)")]
     ResponseTooLarge(usize),
     #[error("invalid isolation level")]
@@ -377,7 +376,6 @@ impl ReportableError for SqlOverHttpError {
             SqlOverHttpError::ReadPayload(e) => e.get_error_kind(),
             SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(),
             SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
-            SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User,
             SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
             SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
             SqlOverHttpError::Postgres(p) => p.get_error_kind(),
@@ -393,7 +391,6 @@ impl UserFacingError for SqlOverHttpError {
             SqlOverHttpError::ReadPayload(p) => p.to_string(),
             SqlOverHttpError::ConnectCompute(c) => c.to_string_client(),
             SqlOverHttpError::ConnInfo(c) => c.to_string_client(),
-            SqlOverHttpError::RequestTooLarge(_) => self.to_string(),
             SqlOverHttpError::ResponseTooLarge(_) => self.to_string(),
             SqlOverHttpError::InvalidIsolationLevel => self.to_string(),
             SqlOverHttpError::Postgres(p) => p.to_string(),
@@ -406,13 +403,12 @@ impl UserFacingError for SqlOverHttpError {
 impl HttpCodeError for SqlOverHttpError {
     fn get_http_status_code(&self) -> StatusCode {
         match self {
-            SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST,
+            SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(),
             SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() {
                 ErrorKind::User => StatusCode::BAD_REQUEST,
                 _ => StatusCode::INTERNAL_SERVER_ERROR,
             },
             SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST,
-            SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE,
             SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE,
             SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST,
             SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST,
@@ -426,19 +422,41 @@ impl HttpCodeError for SqlOverHttpError {
 pub(crate) enum ReadPayloadError {
     #[error("could not read the HTTP request body: {0}")]
     Read(#[from] hyper::Error),
+    #[error("request is too large (max is {limit} bytes)")]
+    BodyTooLarge { limit: usize },
     #[error("could not parse the HTTP request body: {0}")]
     Parse(#[from] serde_json::Error),
 }

+impl From<ReadBodyError<hyper::Error>> for ReadPayloadError {
+    fn from(value: ReadBodyError<hyper::Error>) -> Self {
+        match value {
+            ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit },
+            ReadBodyError::Read(e) => Self::Read(e),
+        }
+    }
+}
+
 impl ReportableError for ReadPayloadError {
     fn get_error_kind(&self) -> ErrorKind {
         match self {
             ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect,
+            ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User,
             ReadPayloadError::Parse(_) => ErrorKind::User,
         }
     }
 }

+impl HttpCodeError for ReadPayloadError {
+    fn get_http_status_code(&self) -> StatusCode {
+        match self {
+            ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST,
+            ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE,
+            ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST,
+        }
+    }
+}
+
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum SqlOverHttpCancel {
     #[error("query was cancelled")]
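Taken together with the `SqlOverHttpError::ReadPayload(e) => e.get_http_status_code()` arm earlier, an oversized body should still surface as HTTP 413 even though the `RequestTooLarge` variant is gone. A quick sketch of the expected mapping (test name assumed, types from the hunks above):

    #[test]
    fn body_too_large_maps_to_413() {
        let err = SqlOverHttpError::ReadPayload(ReadPayloadError::BodyTooLarge {
            limit: 10 * 1024 * 1024,
        });
        assert_eq!(err.get_http_status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }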
@@ -580,28 +598,20 @@ async fn handle_db_inner(
     let parsed_headers = HttpHeaders::try_parse(headers)?;

-    let request_content_length = match request.body().size_hint().upper() {
-        Some(v) => v,
-        None => config.http_config.max_request_size_bytes + 1,
-    };
-    info!(request_content_length, "request size in bytes");
-    Metrics::get()
-        .proxy
-        .http_conn_content_length_bytes
-        .observe(HttpDirection::Request, request_content_length as f64);
-
-    // we don't have a streaming request support yet so this is to prevent OOM
-    // from a malicious user sending an extremely large request body
-    if request_content_length > config.http_config.max_request_size_bytes {
-        return Err(SqlOverHttpError::RequestTooLarge(
-            config.http_config.max_request_size_bytes,
-        ));
-    }
-
     let fetch_and_process_request = Box::pin(
         async {
-            let body = request.into_body().collect().await?.to_bytes();
-            info!(length = body.len(), "request payload read");
+            let body = read_body_with_limit(
+                request.into_body(),
+                config.http_config.max_request_size_bytes,
+            )
+            .await?;
+
+            Metrics::get()
+                .proxy
+                .http_conn_content_length_bytes
+                .observe(HttpDirection::Request, body.len() as f64);
+
+            debug!(length = body.len(), "request payload read");
             let payload: Payload = serde_json::from_slice(&body)?;
             Ok::<Payload, ReadPayloadError>(payload) // Adjust error type accordingly
         }
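One behavioural consequence of this hunk, as far as I can tell: the old guard rejected on the declared Content-Length up front, treating a missing hint as over-limit, while `read_body_with_limit` enforces the cap on bytes actually received. Roughly, as a sketch using the pre-change names with `limit` standing in for the configured maximum:

    // Before: reject on the declared size; no Content-Length meant rejection.
    let declared = request.body().size_hint().upper();
    if declared.map_or(true, |v| v > limit as u64) {
        return Err(SqlOverHttpError::RequestTooLarge(limit as u64));
    }

    // After: admit the request and cut it off only if the body really
    // exceeds `limit`; the metric now records the actual body size too.
    let body = read_body_with_limit(request.into_body(), limit).await?;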