From 249f5ea17de148a83043dc5828bfce0c6d30258c Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 26 Sep 2024 13:44:40 +0100 Subject: [PATCH] cleaner local-proxy conn error code --- proxy/src/serverless/backend.rs | 94 ++++++++++++++++++--------- proxy/src/serverless/sql_over_http.rs | 14 +++- 2 files changed, 76 insertions(+), 32 deletions(-) diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index e29d3a0c76..89eeec3e6f 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -237,12 +237,10 @@ impl PoolingBackend { pub(crate) enum HttpConnError { #[error("pooled connection closed at inconsistent state")] ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError), - #[error("could not connection to compute")] - ConnectionError(#[from] tokio_postgres::Error), - #[error("could not connection to compute")] - IoConnectionError(#[from] std::io::Error), - #[error("could not establish h2 connection to compute")] - H2ConnectionError(#[from] hyper1::Error), + #[error("could not connection to postgres in compute")] + PostgresConnectionError(#[from] tokio_postgres::Error), + #[error("could not connection to local-proxy in compute")] + LocalProxyConnectionError(#[from] LocalProxyConnError), #[error("could not get auth info")] GetAuthInfo(#[from] GetAuthInfoError), @@ -254,13 +252,20 @@ pub(crate) enum HttpConnError { TooManyConnectionAttempts(#[from] ApiLockError), } +#[derive(Debug, thiserror::Error)] +pub(crate) enum LocalProxyConnError { + #[error("error with connection to local-proxy")] + Io(#[source] std::io::Error), + #[error("could not establish h2 connection")] + H2(#[from] hyper1::Error), +} + impl ReportableError for HttpConnError { fn get_error_kind(&self) -> ErrorKind { match self { HttpConnError::ConnectionClosedAbruptly(_) => ErrorKind::Compute, - HttpConnError::ConnectionError(p) => p.get_error_kind(), - HttpConnError::IoConnectionError(_) => ErrorKind::Compute, - HttpConnError::H2ConnectionError(_) => ErrorKind::Compute, + HttpConnError::PostgresConnectionError(p) => p.get_error_kind(), + HttpConnError::LocalProxyConnectionError(_) => ErrorKind::Compute, HttpConnError::GetAuthInfo(a) => a.get_error_kind(), HttpConnError::AuthError(a) => a.get_error_kind(), HttpConnError::WakeCompute(w) => w.get_error_kind(), @@ -273,9 +278,8 @@ impl UserFacingError for HttpConnError { fn to_string_client(&self) -> String { match self { HttpConnError::ConnectionClosedAbruptly(_) => self.to_string(), - HttpConnError::ConnectionError(p) => p.to_string(), - HttpConnError::IoConnectionError(p) => p.to_string(), - HttpConnError::H2ConnectionError(_) => "Could not establish HTTP connection to the database".to_string(), + HttpConnError::PostgresConnectionError(p) => p.to_string(), + HttpConnError::LocalProxyConnectionError(p) => p.to_string(), HttpConnError::GetAuthInfo(c) => c.to_string_client(), HttpConnError::AuthError(c) => c.to_string_client(), HttpConnError::WakeCompute(c) => c.to_string_client(), @@ -289,9 +293,8 @@ impl UserFacingError for HttpConnError { impl CouldRetry for HttpConnError { fn could_retry(&self) -> bool { match self { - HttpConnError::ConnectionError(e) => e.could_retry(), - HttpConnError::IoConnectionError(e) => e.could_retry(), - HttpConnError::H2ConnectionError(_) => false, + HttpConnError::PostgresConnectionError(e) => e.could_retry(), + HttpConnError::LocalProxyConnectionError(e) => e.could_retry(), HttpConnError::ConnectionClosedAbruptly(_) => false, HttpConnError::GetAuthInfo(_) => false, HttpConnError::AuthError(_) => false, @@ -303,7 +306,7 @@ impl CouldRetry for HttpConnError { impl ShouldRetryWakeCompute for HttpConnError { fn should_retry_wake_compute(&self) -> bool { match self { - HttpConnError::ConnectionError(e) => e.should_retry_wake_compute(), + HttpConnError::PostgresConnectionError(e) => e.should_retry_wake_compute(), // we never checked cache validity HttpConnError::TooManyConnectionAttempts(_) => false, _ => true, @@ -311,6 +314,38 @@ impl ShouldRetryWakeCompute for HttpConnError { } } +impl ReportableError for LocalProxyConnError { + fn get_error_kind(&self) -> ErrorKind { + match self { + LocalProxyConnError::Io(_) => ErrorKind::Compute, + LocalProxyConnError::H2(_) => ErrorKind::Compute, + } + } +} + +impl UserFacingError for LocalProxyConnError { + fn to_string_client(&self) -> String { + "Could not establish HTTP connection to the database".to_string() + } +} + +impl CouldRetry for LocalProxyConnError { + fn could_retry(&self) -> bool { + match self { + LocalProxyConnError::Io(_) => false, + LocalProxyConnError::H2(_) => false, + } + } +} +impl ShouldRetryWakeCompute for LocalProxyConnError { + fn should_retry_wake_compute(&self) -> bool { + match self { + LocalProxyConnError::Io(_) => false, + LocalProxyConnError::H2(_) => false, + } + } +} + struct TokioMechanism { pool: Arc>, conn_info: ConnInfo, @@ -410,39 +445,40 @@ async fn connect_http2( host: &str, port: u16, timeout: Duration, -) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), HttpConnError> { - let mut addrs = lookup_host((host, port)).await?; +) -> Result<(http_conn_pool::Send, http_conn_pool::Connect), LocalProxyConnError> { + // assumption: host is an ip address so this should not actually perform any requests. + // todo: add that assumption as a guarantee in the control-plane API. + let mut addrs = lookup_host((host, port)) + .await + .map_err(LocalProxyConnError::Io)?; let mut last_err = None; let stream = loop { let Some(addr) = addrs.next() else { return Err(last_err.unwrap_or_else(|| { - HttpConnError::IoConnectionError(io::Error::new( + LocalProxyConnError::Io(io::Error::new( io::ErrorKind::InvalidInput, "could not resolve any addresses", )) })); }; - let stream = match tokio::time::timeout(timeout, TcpStream::connect(addr)).await { - Ok(Ok(stream)) => stream, + match tokio::time::timeout(timeout, TcpStream::connect(addr)).await { + Ok(Ok(stream)) => { + stream.set_nodelay(true).map_err(LocalProxyConnError::Io)?; + break stream; + } Ok(Err(e)) => { - last_err = Some(HttpConnError::IoConnectionError(e)); - continue; + last_err = Some(LocalProxyConnError::Io(e)); } Err(e) => { - last_err = Some(HttpConnError::IoConnectionError(io::Error::new( + last_err = Some(LocalProxyConnError::Io(io::Error::new( io::ErrorKind::TimedOut, e, ))); - continue; } }; - - stream.set_nodelay(true)?; - - break stream; }; let (client, connection) = hyper1::client::conn::http2::Builder::new(TokioExecutor::new()) diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index ef8e475d25..4db3a09e9a 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -60,6 +60,7 @@ use crate::usage_metrics::MetricCounterRecorder; use crate::DbName; use crate::RoleName; +use super::backend::LocalProxyConnError; use super::backend::PoolingBackend; use super::conn_pool::AuthData; use super::conn_pool::Client; @@ -303,7 +304,7 @@ pub(crate) async fn handle( let mut message = e.to_string_client(); let db_error = match &e { - SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e)) + SqlOverHttpError::ConnectCompute(HttpConnError::PostgresConnectionError(e)) | SqlOverHttpError::Postgres(e) => e.as_db_error(), _ => None, }; @@ -758,7 +759,11 @@ async fn handle_auth_broker_inner( // always completes instantly in http2 mode // but good just in case - client.ready().await.map_err(HttpConnError::from)?; + client + .ready() + .await + .map_err(LocalProxyConnError::from) + .map_err(HttpConnError::from)?; let local_proxy_uri = ::http::Uri::from_static("http://proxy.local/sql"); @@ -783,6 +788,7 @@ async fn handle_auth_broker_inner( Ok(client .send_request(req) .await + .map_err(LocalProxyConnError::from) .map_err(HttpConnError::from)? .map(|b| b.boxed())) } @@ -836,7 +842,9 @@ impl QueryData { // query failed or was cancelled. Ok(Err(error)) => { let db_error = match &error { - SqlOverHttpError::ConnectCompute(HttpConnError::ConnectionError(e)) + SqlOverHttpError::ConnectCompute( + HttpConnError::PostgresConnectionError(e), + ) | SqlOverHttpError::Postgres(e) => e.as_db_error(), _ => None, };