diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 481bd8501c..5b0984cc4f 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -26,9 +26,10 @@ use utils::sentry_init::init_sentry; use crate::context::RequestContext; use crate::metrics::{Metrics, ThreadPoolMetrics}; +use crate::pglb::TlsRequired; use crate::pqproto::FeStartupPacket; use crate::protocol2::ConnectionInfo; -use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute}; +use crate::proxy::{ErrorSource, copy_bidirectional_client_compute}; use crate::stream::{PqStream, Stream}; use crate::util::run_until_cancelled; diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index 757c1e988b..b40871113d 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -410,7 +410,7 @@ pub async fn run() -> anyhow::Result<()> { match auth_backend { Either::Left(auth_backend) => { if let Some(proxy_listener) = proxy_listener { - client_tasks.spawn(crate::proxy::task_main( + client_tasks.spawn(crate::pglb::task_main( config, auth_backend, proxy_listener, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 5331ea41fd..12c64d2be2 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -13,9 +13,10 @@ use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::pglb::handshake::{HandshakeData, handshake}; use crate::pglb::passthrough::ProxyPassthrough; +use crate::pglb::{ClientRequestError, ErrorSource}; use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute}; -use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection}; +use crate::proxy::prepare_client_connection; use crate::util::run_until_cancelled; pub async fn task_main( diff --git a/proxy/src/pglb/handshake.rs b/proxy/src/pglb/handshake.rs index 6970ab8714..25a2d01b4a 100644 --- a/proxy/src/pglb/handshake.rs +++ b/proxy/src/pglb/handshake.rs @@ -8,10 +8,10 @@ use crate::config::TlsConfig; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::Metrics; +use crate::pglb::TlsRequired; use crate::pqproto::{ BeMessage, CancelKeyData, FeStartupPacket, ProtocolVersion, StartupMessageParams, }; -use crate::proxy::TlsRequired; use crate::stream::{PqStream, Stream, StreamUpgradeError}; use crate::tls::PG_ALPN_PROTOCOL; diff --git a/proxy/src/pglb/mod.rs b/proxy/src/pglb/mod.rs index cb82524cf6..9967562d85 100644 --- a/proxy/src/pglb/mod.rs +++ b/proxy/src/pglb/mod.rs @@ -2,3 +2,390 @@ pub mod copy_bidirectional; pub mod handshake; pub mod inprocess; pub mod passthrough; + +use std::sync::Arc; + +use futures::FutureExt; +use smol_str::ToSmolStr; +use thiserror::Error; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, debug, error, info, warn}; + +use crate::cancellation::{self, CancellationHandler}; +use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; +use crate::context::RequestContext; +use crate::error::{ReportableError, UserFacingError}; +use crate::metrics::{Metrics, NumClientConnectionsGuard}; +pub use crate::pglb::copy_bidirectional::ErrorSource; +use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake}; +use crate::pglb::passthrough::ProxyPassthrough; +use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, 
read_proxy_protocol};
+use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
+use crate::proxy::{NeonOptions, prepare_client_connection};
+use crate::rate_limiter::EndpointRateLimiter;
+use crate::stream::Stream;
+use crate::util::run_until_cancelled;
+use crate::{auth, compute};
+
+pub const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
+
+#[derive(Error, Debug)]
+#[error("{ERR_INSECURE_CONNECTION}")]
+pub struct TlsRequired;
+
+impl ReportableError for TlsRequired {
+    fn get_error_kind(&self) -> crate::error::ErrorKind {
+        crate::error::ErrorKind::User
+    }
+}
+
+impl UserFacingError for TlsRequired {}
+
+pub async fn task_main(
+    config: &'static ProxyConfig,
+    auth_backend: &'static auth::Backend<'static, ()>,
+    listener: tokio::net::TcpListener,
+    cancellation_token: CancellationToken,
+    cancellation_handler: Arc<CancellationHandler>,
+    endpoint_rate_limiter: Arc<EndpointRateLimiter>,
+) -> anyhow::Result<()> {
+    scopeguard::defer! {
+        info!("proxy has shut down");
+    }
+
+    // When set for the server socket, the keepalive setting
+    // will be inherited by all accepted client sockets.
+    socket2::SockRef::from(&listener).set_keepalive(true)?;
+
+    let connections = tokio_util::task::task_tracker::TaskTracker::new();
+    let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
+
+    while let Some(accept_result) =
+        run_until_cancelled(listener.accept(), &cancellation_token).await
+    {
+        let (socket, peer_addr) = accept_result?;
+
+        let conn_gauge = Metrics::get()
+            .proxy
+            .client_connections
+            .guard(crate::metrics::Protocol::Tcp);
+
+        let session_id = uuid::Uuid::new_v4();
+        let cancellation_handler = Arc::clone(&cancellation_handler);
+        let cancellations = cancellations.clone();
+
+        debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
+        let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
+
+        connections.spawn(async move {
+            let (socket, conn_info) = match config.proxy_protocol_v2 {
+                ProxyProtocolV2::Required => {
+                    match read_proxy_protocol(socket).await {
+                        Err(e) => {
+                            warn!("per-client task finished with an error: {e:#}");
+                            return;
+                        }
+                        // our load balancers will not send any more data. let's just exit immediately
+                        Ok((_socket, ConnectHeader::Local)) => {
+                            debug!("healthcheck received");
+                            return;
+                        }
+                        Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
+                    }
+                }
+                // ignore the header - it cannot be confused for a postgres or http connection so will
+                // error later.
+                ProxyProtocolV2::Rejected => (
+                    socket,
+                    ConnectionInfo {
+                        addr: peer_addr,
+                        extra: None,
+                    },
+                ),
+            };
+
+            match socket.set_nodelay(true) {
+                Ok(()) => {}
+                Err(e) => {
+                    error!(
+                        "per-client task finished with an error: failed to set socket option: {e:#}"
+                    );
+                    return;
+                }
+            }
+
+            let ctx = RequestContext::new(
+                session_id,
+                conn_info,
+                crate::metrics::Protocol::Tcp,
+                &config.region,
+            );
+
+            let res = handle_client(
+                config,
+                auth_backend,
+                &ctx,
+                cancellation_handler,
+                socket,
+                ClientMode::Tcp,
+                endpoint_rate_limiter2,
+                conn_gauge,
+                cancellations,
+            )
+            .instrument(ctx.span())
+            .boxed()
+            .await;
+
+            match res {
+                Err(e) => {
+                    ctx.set_error_kind(e.get_error_kind());
+                    warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
+                }
+                Ok(None) => {
+                    ctx.set_success();
+                }
+                Ok(Some(p)) => {
+                    ctx.set_success();
+                    let _disconnect = ctx.log_connect();
+                    match p.proxy_pass(&config.connect_to_compute).await {
+                        Ok(()) => {}
+                        Err(ErrorSource::Client(e)) => {
+                            warn!(
+                                ?session_id,
+                                "per-client task finished with an IO error from the client: {e:#}"
+                            );
+                        }
+                        Err(ErrorSource::Compute(e)) => {
+                            error!(
+                                ?session_id,
+                                "per-client task finished with an IO error from the compute: {e:#}"
+                            );
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    connections.close();
+    cancellations.close();
+    drop(listener);
+
+    // Drain connections
+    connections.wait().await;
+    cancellations.wait().await;
+
+    Ok(())
+}
+
+pub(crate) enum ClientMode {
+    Tcp,
+    Websockets { hostname: Option<String> },
+}
+
+/// Abstracts the logic of handling TCP vs WS clients
+impl ClientMode {
+    pub(crate) fn allow_cleartext(&self) -> bool {
+        match self {
+            ClientMode::Tcp => false,
+            ClientMode::Websockets { .. } => true,
+        }
+    }
+
+    fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
+        match self {
+            ClientMode::Tcp => s.sni_hostname(),
+            ClientMode::Websockets { hostname } => hostname.as_deref(),
+        }
+    }
+
+    fn handshake_tls<'a>(&self, tls: Option<&'a TlsConfig>) -> Option<&'a TlsConfig> {
+        match self {
+            ClientMode::Tcp => tls,
+            // TLS is None here if using websockets, because the connection is already encrypted.
+            ClientMode::Websockets { .. } => None,
+        }
+    }
+}
+
+#[derive(Debug, Error)]
+// almost all errors should be reported to the user, but there's a few cases where we cannot
+// 1. Cancellation: we are not allowed to tell the client any cancellation statuses for security reasons
+// 2. Handshake: handshake reports errors if it can, otherwise if the handshake fails due to protocol violation,
+// we cannot be sure the client even understands our error message
+// 3. PrepareClient: The client disconnected, so we can't tell them anyway...
+pub(crate) enum ClientRequestError {
+    #[error("{0}")]
+    Cancellation(#[from] cancellation::CancelError),
+    #[error("{0}")]
+    Handshake(#[from] HandshakeError),
+    #[error("{0}")]
+    HandshakeTimeout(#[from] tokio::time::error::Elapsed),
+    #[error("{0}")]
+    PrepareClient(#[from] std::io::Error),
+    #[error("{0}")]
+    ReportedError(#[from] crate::stream::ReportedError),
+}
+
+impl ReportableError for ClientRequestError {
+    fn get_error_kind(&self) -> crate::error::ErrorKind {
+        match self {
+            ClientRequestError::Cancellation(e) => e.get_error_kind(),
+            ClientRequestError::Handshake(e) => e.get_error_kind(),
+            ClientRequestError::HandshakeTimeout(_) => crate::error::ErrorKind::RateLimit,
+            ClientRequestError::ReportedError(e) => e.get_error_kind(),
+            ClientRequestError::PrepareClient(_) => crate::error::ErrorKind::ClientDisconnect,
+        }
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
+    config: &'static ProxyConfig,
+    auth_backend: &'static auth::Backend<'static, ()>,
+    ctx: &RequestContext,
+    cancellation_handler: Arc<CancellationHandler>,
+    stream: S,
+    mode: ClientMode,
+    endpoint_rate_limiter: Arc<EndpointRateLimiter>,
+    conn_gauge: NumClientConnectionsGuard<'static>,
+    cancellations: tokio_util::task::task_tracker::TaskTracker,
+) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
+    debug!(
+        protocol = %ctx.protocol(),
+        "handling interactive connection from client"
+    );
+
+    let metrics = &Metrics::get().proxy;
+    let proto = ctx.protocol();
+    let request_gauge = metrics.connection_requests.guard(proto);
+
+    let tls = config.tls_config.load();
+    let tls = tls.as_deref();
+
+    let record_handshake_error = !ctx.has_private_peer_addr();
+    let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
+    let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
+
+    let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
+        .await??
+    {
+        HandshakeData::Startup(stream, params) => (stream, params),
+        HandshakeData::Cancel(cancel_key_data) => {
+            // spawn a task to cancel the session, but don't wait for it
+            cancellations.spawn({
+                let cancellation_handler_clone = Arc::clone(&cancellation_handler);
+                let ctx = ctx.clone();
+                let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
+                cancel_span.follows_from(tracing::Span::current());
+                async move {
+                    cancellation_handler_clone
+                        .cancel_session(
+                            cancel_key_data,
+                            ctx,
+                            config.authentication_config.ip_allowlist_check_enabled,
+                            config.authentication_config.is_vpc_acccess_proxy,
+                            auth_backend.get_api(),
+                        )
+                        .await
+                        .inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
+                }.instrument(cancel_span)
+            });
+
+            return Ok(None);
+        }
+    };
+    drop(pause);
+
+    ctx.set_db_options(params.clone());
+
+    let hostname = mode.hostname(stream.get_ref());
+
+    let common_names = tls.map(|tls| &tls.common_names);
+
+    // Extract credentials which we're going to use for auth.
+    let result = auth_backend
+        .as_ref()
+        .map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, &params, hostname, common_names))
+        .transpose();
+
+    let user_info = match result {
+        Ok(user_info) => user_info,
+        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
+    };
+
+    let user = user_info.get_user().to_owned();
+    let user_info = match user_info
+        .authenticate(
+            ctx,
+            &mut stream,
+            mode.allow_cleartext(),
+            &config.authentication_config,
+            endpoint_rate_limiter,
+        )
+        .await
+    {
+        Ok(auth_result) => auth_result,
+        Err(e) => {
+            let db = params.get("database");
+            let app = params.get("application_name");
+            let params_span = tracing::info_span!("", ?user, ?db, ?app);
+
+            return Err(stream
+                .throw_error(e, Some(ctx))
+                .instrument(params_span)
+                .await)?;
+        }
+    };
+
+    let (cplane, creds) = match user_info {
+        auth::Backend::ControlPlane(cplane, creds) => (cplane, creds),
+        auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
+    };
+    let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
+    let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
+    auth_info.set_startup_params(&params, params_compat);
+
+    let res = connect_to_compute(
+        ctx,
+        &TcpMechanism {
+            user_info: creds.info.clone(),
+            auth: auth_info,
+            locks: &config.connect_compute_locks,
+        },
+        &auth::Backend::ControlPlane(cplane, creds.info),
+        config.wake_compute_retry_config,
+        &config.connect_to_compute,
+    )
+    .await;
+
+    let node = match res {
+        Ok(node) => node,
+        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
+    };
+
+    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
+    let session = cancellation_handler_clone.get_key();
+
+    session.write_cancel_key(node.cancel_closure.clone())?;
+    prepare_client_connection(&node, *session.key(), &mut stream);
+    let stream = stream.flush_and_into_inner().await?;
+
+    let private_link_id = match ctx.extra() {
+        Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
+        Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
+        None => None,
+    };
+
+    Ok(Some(ProxyPassthrough {
+        client: stream,
+        aux: node.aux.clone(),
+        private_link_id,
+        compute: node,
+        session_id: ctx.session_id(),
+        cancel: session,
+        _req: request_gauge,
+        _conn: conn_gauge,
+    }))
+}
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 4211406f6c..24846124c2 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -5,397 +5,18 @@ pub(crate) mod connect_compute;
 pub(crate) mod retry;
 pub(crate) mod wake_compute;
 
-use std::sync::Arc;
-
-use futures::FutureExt;
 use itertools::Itertools;
 use once_cell::sync::OnceCell;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
-use smol_str::{SmolStr, ToSmolStr, format_smolstr};
-use thiserror::Error;
+use smol_str::{SmolStr, format_smolstr};
 use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_util::sync::CancellationToken;
-use tracing::{Instrument, debug, error, info, warn};
 
-use crate::cancellation::{self, CancellationHandler};
-use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
-use crate::context::RequestContext;
-use crate::error::{ReportableError, UserFacingError};
-use crate::metrics::{Metrics, NumClientConnectionsGuard};
+use crate::compute;
 pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
-use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
-use crate::pglb::passthrough::ProxyPassthrough;
 use crate::pqproto::{BeMessage, CancelKeyData, 
StartupMessageParams};
-use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
-use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
-use crate::rate_limiter::EndpointRateLimiter;
-use crate::stream::{PqStream, Stream};
+use crate::stream::PqStream;
 use crate::types::EndpointCacheKey;
-use crate::util::run_until_cancelled;
-use crate::{auth, compute};
-
-const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
-
-#[derive(Error, Debug)]
-#[error("{ERR_INSECURE_CONNECTION}")]
-pub struct TlsRequired;
-
-impl ReportableError for TlsRequired {
-    fn get_error_kind(&self) -> crate::error::ErrorKind {
-        crate::error::ErrorKind::User
-    }
-}
-
-impl UserFacingError for TlsRequired {}
-
-pub async fn task_main(
-    config: &'static ProxyConfig,
-    auth_backend: &'static auth::Backend<'static, ()>,
-    listener: tokio::net::TcpListener,
-    cancellation_token: CancellationToken,
-    cancellation_handler: Arc<CancellationHandler>,
-    endpoint_rate_limiter: Arc<EndpointRateLimiter>,
-) -> anyhow::Result<()> {
-    scopeguard::defer! {
-        info!("proxy has shut down");
-    }
-
-    // When set for the server socket, the keepalive setting
-    // will be inherited by all accepted client sockets.
-    socket2::SockRef::from(&listener).set_keepalive(true)?;
-
-    let connections = tokio_util::task::task_tracker::TaskTracker::new();
-    let cancellations = tokio_util::task::task_tracker::TaskTracker::new();
-
-    while let Some(accept_result) =
-        run_until_cancelled(listener.accept(), &cancellation_token).await
-    {
-        let (socket, peer_addr) = accept_result?;
-
-        let conn_gauge = Metrics::get()
-            .proxy
-            .client_connections
-            .guard(crate::metrics::Protocol::Tcp);
-
-        let session_id = uuid::Uuid::new_v4();
-        let cancellation_handler = Arc::clone(&cancellation_handler);
-        let cancellations = cancellations.clone();
-
-        debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
-        let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
-
-        connections.spawn(async move {
-            let (socket, conn_info) = match config.proxy_protocol_v2 {
-                ProxyProtocolV2::Required => {
-                    match read_proxy_protocol(socket).await {
-                        Err(e) => {
-                            warn!("per-client task finished with an error: {e:#}");
-                            return;
-                        }
-                        // our load balancers will not send any more data. let's just exit immediately
-                        Ok((_socket, ConnectHeader::Local)) => {
-                            debug!("healthcheck received");
-                            return;
-                        }
-                        Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
-                    }
-                }
-                // ignore the header - it cannot be confused for a postgres or http connection so will
-                // error later.
-                ProxyProtocolV2::Rejected => (
-                    socket,
-                    ConnectionInfo {
-                        addr: peer_addr,
-                        extra: None,
-                    },
-                ),
-            };
-
-            match socket.set_nodelay(true) {
-                Ok(()) => {}
-                Err(e) => {
-                    error!(
-                        "per-client task finished with an error: failed to set socket option: {e:#}"
-                    );
-                    return;
-                }
-            }
-
-            let ctx = RequestContext::new(
-                session_id,
-                conn_info,
-                crate::metrics::Protocol::Tcp,
-                &config.region,
-            );
-
-            let res = handle_client(
-                config,
-                auth_backend,
-                &ctx,
-                cancellation_handler,
-                socket,
-                ClientMode::Tcp,
-                endpoint_rate_limiter2,
-                conn_gauge,
-                cancellations,
-            )
-            .instrument(ctx.span())
-            .boxed()
-            .await;
-
-            match res {
-                Err(e) => {
-                    ctx.set_error_kind(e.get_error_kind());
-                    warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
-                }
-                Ok(None) => {
-                    ctx.set_success();
-                }
-                Ok(Some(p)) => {
-                    ctx.set_success();
-                    let _disconnect = ctx.log_connect();
-                    match p.proxy_pass(&config.connect_to_compute).await {
-                        Ok(()) => {}
-                        Err(ErrorSource::Client(e)) => {
-                            warn!(
-                                ?session_id,
-                                "per-client task finished with an IO error from the client: {e:#}"
-                            );
-                        }
-                        Err(ErrorSource::Compute(e)) => {
-                            error!(
-                                ?session_id,
-                                "per-client task finished with an IO error from the compute: {e:#}"
-                            );
-                        }
-                    }
-                }
-            }
-        });
-    }
-
-    connections.close();
-    cancellations.close();
-    drop(listener);
-
-    // Drain connections
-    connections.wait().await;
-    cancellations.wait().await;
-
-    Ok(())
-}
-
-pub(crate) enum ClientMode {
-    Tcp,
-    Websockets { hostname: Option<String> },
-}
-
-/// Abstracts the logic of handling TCP vs WS clients
-impl ClientMode {
-    pub(crate) fn allow_cleartext(&self) -> bool {
-        match self {
-            ClientMode::Tcp => false,
-            ClientMode::Websockets { .. } => true,
-        }
-    }
-
-    fn hostname<'a, S>(&'a self, s: &'a Stream<S>) -> Option<&'a str> {
-        match self {
-            ClientMode::Tcp => s.sni_hostname(),
-            ClientMode::Websockets { hostname } => hostname.as_deref(),
-        }
-    }
-
-    fn handshake_tls<'a>(&self, tls: Option<&'a TlsConfig>) -> Option<&'a TlsConfig> {
-        match self {
-            ClientMode::Tcp => tls,
-            // TLS is None here if using websockets, because the connection is already encrypted.
-            ClientMode::Websockets { .. } => None,
-        }
-    }
-}
-
-#[derive(Debug, Error)]
-// almost all errors should be reported to the user, but there's a few cases where we cannot
-// 1. Cancellation: we are not allowed to tell the client any cancellation statuses for security reasons
-// 2. Handshake: handshake reports errors if it can, otherwise if the handshake fails due to protocol violation,
-// we cannot be sure the client even understands our error message
-// 3. PrepareClient: The client disconnected, so we can't tell them anyway...
-pub(crate) enum ClientRequestError {
-    #[error("{0}")]
-    Cancellation(#[from] cancellation::CancelError),
-    #[error("{0}")]
-    Handshake(#[from] HandshakeError),
-    #[error("{0}")]
-    HandshakeTimeout(#[from] tokio::time::error::Elapsed),
-    #[error("{0}")]
-    PrepareClient(#[from] std::io::Error),
-    #[error("{0}")]
-    ReportedError(#[from] crate::stream::ReportedError),
-}
-
-impl ReportableError for ClientRequestError {
-    fn get_error_kind(&self) -> crate::error::ErrorKind {
-        match self {
-            ClientRequestError::Cancellation(e) => e.get_error_kind(),
-            ClientRequestError::Handshake(e) => e.get_error_kind(),
-            ClientRequestError::HandshakeTimeout(_) => crate::error::ErrorKind::RateLimit,
-            ClientRequestError::ReportedError(e) => e.get_error_kind(),
-            ClientRequestError::PrepareClient(_) => crate::error::ErrorKind::ClientDisconnect,
-        }
-    }
-}
-
-#[allow(clippy::too_many_arguments)]
-pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
-    config: &'static ProxyConfig,
-    auth_backend: &'static auth::Backend<'static, ()>,
-    ctx: &RequestContext,
-    cancellation_handler: Arc<CancellationHandler>,
-    stream: S,
-    mode: ClientMode,
-    endpoint_rate_limiter: Arc<EndpointRateLimiter>,
-    conn_gauge: NumClientConnectionsGuard<'static>,
-    cancellations: tokio_util::task::task_tracker::TaskTracker,
-) -> Result<Option<ProxyPassthrough<S>>, ClientRequestError> {
-    debug!(
-        protocol = %ctx.protocol(),
-        "handling interactive connection from client"
-    );
-
-    let metrics = &Metrics::get().proxy;
-    let proto = ctx.protocol();
-    let request_gauge = metrics.connection_requests.guard(proto);
-
-    let tls = config.tls_config.load();
-    let tls = tls.as_deref();
-
-    let record_handshake_error = !ctx.has_private_peer_addr();
-    let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Client);
-    let do_handshake = handshake(ctx, stream, mode.handshake_tls(tls), record_handshake_error);
-
-    let (mut stream, params) = match tokio::time::timeout(config.handshake_timeout, do_handshake)
-        .await??
-    {
-        HandshakeData::Startup(stream, params) => (stream, params),
-        HandshakeData::Cancel(cancel_key_data) => {
-            // spawn a task to cancel the session, but don't wait for it
-            cancellations.spawn({
-                let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-                let ctx = ctx.clone();
-                let cancel_span = tracing::span!(parent: None, tracing::Level::INFO, "cancel_session", session_id = ?ctx.session_id());
-                cancel_span.follows_from(tracing::Span::current());
-                async move {
-                    cancellation_handler_clone
-                        .cancel_session(
-                            cancel_key_data,
-                            ctx,
-                            config.authentication_config.ip_allowlist_check_enabled,
-                            config.authentication_config.is_vpc_acccess_proxy,
-                            auth_backend.get_api(),
-                        )
-                        .await
-                        .inspect_err(|e | debug!(error = ?e, "cancel_session failed")).ok();
-                }.instrument(cancel_span)
-            });
-
-            return Ok(None);
-        }
-    };
-    drop(pause);
-
-    ctx.set_db_options(params.clone());
-
-    let hostname = mode.hostname(stream.get_ref());
-
-    let common_names = tls.map(|tls| &tls.common_names);
-
-    // Extract credentials which we're going to use for auth.
-    let result = auth_backend
-        .as_ref()
-        .map(|()| auth::ComputeUserInfoMaybeEndpoint::parse(ctx, &params, hostname, common_names))
-        .transpose();
-
-    let user_info = match result {
-        Ok(user_info) => user_info,
-        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
-    };
-
-    let user = user_info.get_user().to_owned();
-    let user_info = match user_info
-        .authenticate(
-            ctx,
-            &mut stream,
-            mode.allow_cleartext(),
-            &config.authentication_config,
-            endpoint_rate_limiter,
-        )
-        .await
-    {
-        Ok(auth_result) => auth_result,
-        Err(e) => {
-            let db = params.get("database");
-            let app = params.get("application_name");
-            let params_span = tracing::info_span!("", ?user, ?db, ?app);
-
-            return Err(stream
-                .throw_error(e, Some(ctx))
-                .instrument(params_span)
-                .await)?;
-        }
-    };
-
-    let (cplane, creds) = match user_info {
-        auth::Backend::ControlPlane(cplane, creds) => (cplane, creds),
-        auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"),
-    };
-    let params_compat = creds.info.options.get(NeonOptions::PARAMS_COMPAT).is_some();
-    let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
-    auth_info.set_startup_params(&params, params_compat);
-
-    let res = connect_to_compute(
-        ctx,
-        &TcpMechanism {
-            user_info: creds.info.clone(),
-            auth: auth_info,
-            locks: &config.connect_compute_locks,
-        },
-        &auth::Backend::ControlPlane(cplane, creds.info),
-        config.wake_compute_retry_config,
-        &config.connect_to_compute,
-    )
-    .await;
-
-    let node = match res {
-        Ok(node) => node,
-        Err(e) => Err(stream.throw_error(e, Some(ctx)).await)?,
-    };
-
-    let cancellation_handler_clone = Arc::clone(&cancellation_handler);
-    let session = cancellation_handler_clone.get_key();
-
-    session.write_cancel_key(node.cancel_closure.clone())?;
-    prepare_client_connection(&node, *session.key(), &mut stream);
-    let stream = stream.flush_and_into_inner().await?;
-
-    let private_link_id = match ctx.extra() {
-        Some(ConnectionInfoExtra::Aws { vpce_id }) => Some(vpce_id.clone()),
-        Some(ConnectionInfoExtra::Azure { link_id }) => Some(link_id.to_smolstr()),
-        None => None,
-    };
-
-    Ok(Some(ProxyPassthrough {
-        client: stream,
-        aux: node.aux.clone(),
-        private_link_id,
-        compute: node,
-        session_id: ctx.session_id(),
-        cancel: session,
-        _req: request_gauge,
-        _conn: conn_gauge,
-    }))
-}
 /// Finish client connection initialization: confirm auth success, send params, etc.
 pub(crate) fn prepare_client_connection(
@@ -429,7 +50,7 @@ impl NeonOptions {
     // proxy options:
     /// `PARAMS_COMPAT` allows opting in to forwarding all startup parameters from client to compute.
- const PARAMS_COMPAT: &str = "proxy_params_compat"; + pub const PARAMS_COMPAT: &str = "proxy_params_compat"; // cplane options: diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index 12de5cbc09..a62406ea83 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -3,6 +3,7 @@ mod mitm; +use std::sync::Arc; use std::time::Duration; use anyhow::{Context, bail}; @@ -20,16 +21,20 @@ use tracing_test::traced_test; use super::retry::CouldRetry; use super::*; use crate::auth::backend::{ComputeUserInfo, MaybeOwned}; -use crate::config::{ComputeConfig, RetryConfig}; +use crate::config::{ComputeConfig, RetryConfig, TlsConfig}; +use crate::context::RequestContext; use crate::control_plane::client::{ControlPlaneClient, TestControlPlaneClient}; use crate::control_plane::messages::{ControlPlaneErrorMessage, Details, MetricsAuxInfo, Status}; use crate::control_plane::{self, CachedNodeInfo, NodeInfo, NodeInfoCache}; -use crate::error::ErrorKind; -use crate::proxy::connect_compute::ConnectMechanism; +use crate::error::{ErrorKind, ReportableError}; +use crate::pglb::ERR_INSECURE_CONNECTION; +use crate::pglb::handshake::{HandshakeData, handshake}; +use crate::proxy::connect_compute::{ConnectMechanism, connect_to_compute}; +use crate::stream::Stream; use crate::tls::client_config::compute_client_config_with_certs; use crate::tls::server_config::CertResolver; use crate::types::{BranchId, EndpointId, ProjectId}; -use crate::{sasl, scram}; +use crate::{auth, sasl, scram}; /// Generate a set of TLS certificates: CA + server. fn generate_certs( diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 8648a94869..ceb43e02e3 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -17,7 +17,8 @@ use crate::config::ProxyConfig; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::Metrics; -use crate::proxy::{ClientMode, ErrorSource, handle_client}; +use crate::pglb::{ClientMode, handle_client}; +use crate::proxy::ErrorSource; use crate::rate_limiter::EndpointRateLimiter; pin_project! {
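Note on the relocated entry point: after this refactor the TCP accept loop is started via crate::pglb::task_main rather than crate::proxy::task_main, as shown in the proxy/src/binary/proxy.rs hunk above. The sketch below only illustrates the new call path and the argument order taken from the task_main signature in proxy/src/pglb/mod.rs; the bindings (client_tasks, config, auth_backend, proxy_listener, cancellation_token, cancellation_handler, endpoint_rate_limiter) are assumed to be built by the binary's startup code, and the explicit clone() calls are illustrative rather than part of this patch.

// Illustrative wiring sketch (not part of the diff):
client_tasks.spawn(crate::pglb::task_main(
    config,                        // &'static ProxyConfig
    auth_backend,                  // &'static auth::Backend<'static, ()>
    proxy_listener,                // tokio::net::TcpListener
    cancellation_token.clone(),    // tokio_util::sync::CancellationToken
    cancellation_handler.clone(),  // Arc<CancellationHandler>
    endpoint_rate_limiter.clone(), // Arc<EndpointRateLimiter>
));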