From d91ff747bb8909b436068cf6806d7c9d0e2ca3a2 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 8 Mar 2024 19:16:39 +0000 Subject: [PATCH] remove tls listener file --- proxy/src/serverless.rs | 28 ++- proxy/src/serverless/tls_listener.rs | 283 --------------------------- 2 files changed, 22 insertions(+), 289 deletions(-) delete mode 100644 proxy/src/serverless/tls_listener.rs diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index d5387c172c..fada8c0701 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -21,10 +21,11 @@ use rand::SeedableRng; pub use reqwest_middleware::{ClientWithMiddleware, Error}; pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use serde::Serialize; +use tokio::time::timeout; use tokio_util::task::TaskTracker; use crate::context::RequestMonitoring; -use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE; +use crate::metrics::{NUM_CLIENT_CONNECTION_GAUGE, TLS_HANDSHAKE_FAILURES}; use crate::protocol2::WithClientIp; use crate::proxy::run_until_cancelled; use crate::rate_limiter::EndpointRateLimiter; @@ -35,6 +36,7 @@ use std::convert::Infallible; use std::net::IpAddr; use std::pin::pin; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn, Instrument}; @@ -128,7 +130,9 @@ pub async fn task_main( let peer = match conn.wait_for_addr().await { Ok(peer) => peer, Err(e) => { - tracing::error!("could not parse PROXY protocol header: {e}"); + tracing::error!( + "failed to accept TCP connection: invalid PROXY protocol V2 header: {e:#}" + ); return; } }; @@ -137,10 +141,22 @@ pub async fn task_main( peer_addr = peer; } - let conn = match tls.accept(conn).await { - Ok(conn) => conn, - Err(e) => { - tracing::error!(protocol = "http", "TLS failure: {e}"); + let accept = tls.accept(conn); + let conn = match timeout(Duration::from_secs(10), accept).await { + Ok(Ok(conn)) => { + info!(%peer_addr, protocol = "http", "accepted new TLS connection"); + conn + } + // The handshake failed, try getting another connection from the queue + Ok(Err(e)) => { + TLS_HANDSHAKE_FAILURES.inc(); + warn!(%peer_addr, protocol = "http", "failed to accept TLS connection: {e:?}"); + return; + } + // The handshake timed out, try getting another connection from the queue + Err(_) => { + TLS_HANDSHAKE_FAILURES.inc(); + warn!(%peer_addr, protocol = "http", "failed to accept TLS connection: timeout"); return; } }; diff --git a/proxy/src/serverless/tls_listener.rs b/proxy/src/serverless/tls_listener.rs deleted file mode 100644 index 6196ff393c..0000000000 --- a/proxy/src/serverless/tls_listener.rs +++ /dev/null @@ -1,283 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; - -use futures::{Future, Stream, StreamExt}; -use pin_project_lite::pin_project; -use thiserror::Error; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - task::JoinSet, - time::timeout, -}; - -/// Default timeout for the TLS handshake. -pub const DEFAULT_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); - -/// Trait for TLS implementation. -/// -/// Implementations are provided by the rustls and native-tls features. -pub trait AsyncTls: Clone { - /// The type of the TLS stream created from the underlying stream. - type Stream: Send + 'static; - /// Error type for completing the TLS handshake - type Error: std::error::Error + Send + 'static; - /// Type of the Future for the TLS stream that is accepted. - type AcceptFuture: Future> + Send + 'static; - - /// Accept a TLS connection on an underlying stream - fn accept(&self, stream: C) -> Self::AcceptFuture; -} - -/// Asynchronously accept connections. -pub trait AsyncAccept { - /// The type of the connection that is accepted. - type Connection: AsyncRead + AsyncWrite; - /// The type of error that may be returned. - type Error; - - /// Poll to accept the next connection. - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>>; - - /// Return a new `AsyncAccept` that stops accepting connections after - /// `ender` completes. - /// - /// Useful for graceful shutdown. - /// - /// See [examples/echo.rs](https://github.com/tmccombs/tls-listener/blob/main/examples/echo.rs) - /// for example of how to use. - fn until(self, ender: F) -> Until - where - Self: Sized, - { - Until { - acceptor: self, - ender, - } - } -} - -pin_project! { - /// - /// Wraps a `Stream` of connections (such as a TCP listener) so that each connection is itself - /// encrypted using TLS. - /// - /// It is similar to: - /// - /// ```ignore - /// tcpListener.and_then(|s| tlsAcceptor.accept(s)) - /// ``` - /// - /// except that it has the ability to accept multiple transport-level connections - /// simultaneously while the TLS handshake is pending for other connections. - /// - /// By default, if a client fails the TLS handshake, that is treated as an error, and the - /// `TlsListener` will return an `Err`. If the `TlsListener` is passed directly to a hyper - /// [`Server`][1], then an invalid handshake can cause the server to stop accepting connections. - /// See [`http-stream.rs`][2] or [`http-low-level`][3] examples, for examples of how to avoid this. - /// - /// Note that if the maximum number of pending connections is greater than 1, the resulting - /// [`T::Stream`][4] connections may come in a different order than the connections produced by the - /// underlying listener. - /// - /// [1]: https://docs.rs/hyper/latest/hyper/server/struct.Server.html - /// [2]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-stream.rs - /// [3]: https://github.com/tmccombs/tls-listener/blob/main/examples/http-low-level.rs - /// [4]: AsyncTls::Stream - /// - #[allow(clippy::type_complexity)] - pub struct TlsListener> { - #[pin] - listener: A, - tls: T, - waiting: JoinSet, tokio::time::error::Elapsed>>, - timeout: Duration, - } -} - -/// Builder for `TlsListener`. -#[derive(Clone)] -pub struct Builder { - tls: T, - handshake_timeout: Duration, -} - -/// Wraps errors from either the listener or the TLS Acceptor -#[derive(Debug, Error)] -pub enum Error { - /// An error that arose from the listener ([AsyncAccept::Error]) - #[error("{0}")] - ListenerError(#[source] LE), - /// An error that occurred during the TLS accept handshake - #[error("{0}")] - TlsAcceptError(#[source] TE), -} - -impl TlsListener -where - T: AsyncTls, -{ - /// Create a `TlsListener` with default options. - pub fn new(tls: T, listener: A) -> Self { - builder(tls).listen(listener) - } -} - -impl TlsListener -where - A: AsyncAccept, - A::Error: std::error::Error, - T: AsyncTls, -{ - /// Accept the next connection - /// - /// This is essentially an alias to `self.next()` with a more domain-appropriate name. - pub async fn accept(&mut self) -> Option<::Item> - where - Self: Unpin, - { - self.next().await - } - - /// Replaces the Tls Acceptor configuration, which will be used for new connections. - /// - /// This can be used to change the certificate used at runtime. - pub fn replace_acceptor(&mut self, acceptor: T) { - self.tls = acceptor; - } - - /// Replaces the Tls Acceptor configuration from a pinned reference to `Self`. - /// - /// This is useful if your listener is `!Unpin`. - /// - /// This can be used to change the certificate used at runtime. - pub fn replace_acceptor_pin(self: Pin<&mut Self>, acceptor: T) { - *self.project().tls = acceptor; - } -} - -impl Stream for TlsListener -where - A: AsyncAccept, - A::Error: std::error::Error, - T: AsyncTls, -{ - type Item = Result>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - match this.listener.as_mut().poll_accept(cx) { - Poll::Pending => break, - Poll::Ready(Some(Ok(conn))) => { - this.waiting - .spawn(timeout(*this.timeout, this.tls.accept(conn))); - } - Poll::Ready(Some(Err(e))) => { - return Poll::Ready(Some(Err(Error::ListenerError(e)))); - } - Poll::Ready(None) => return Poll::Ready(None), - } - } - - loop { - return match this.waiting.poll_join_next(cx) { - Poll::Ready(Some(Ok(Ok(conn)))) => { - Poll::Ready(Some(conn.map_err(Error::TlsAcceptError))) - } - // The handshake timed out, try getting another connection from the queue - Poll::Ready(Some(Ok(Err(_)))) => continue, - // The handshake panicked - Poll::Ready(Some(Err(e))) if e.is_panic() => { - std::panic::resume_unwind(e.into_panic()) - } - // The handshake was externally aborted - Poll::Ready(Some(Err(_))) => unreachable!("handshake tasks are never aborted"), - _ => Poll::Pending, - }; - } - } -} - -impl AsyncTls for tokio_rustls::TlsAcceptor { - type Stream = tokio_rustls::server::TlsStream; - type Error = std::io::Error; - type AcceptFuture = tokio_rustls::Accept; - - fn accept(&self, conn: C) -> Self::AcceptFuture { - tokio_rustls::TlsAcceptor::accept(self, conn) - } -} - -impl Builder { - /// Set the timeout for handshakes. - /// - /// If a timeout takes longer than `timeout`, then the handshake will be - /// aborted and the underlying connection will be dropped. - /// - /// Defaults to `DEFAULT_HANDSHAKE_TIMEOUT`. - pub fn handshake_timeout(&mut self, timeout: Duration) -> &mut Self { - self.handshake_timeout = timeout; - self - } - - /// Create a `TlsListener` from the builder - /// - /// Actually build the `TlsListener`. The `listener` argument should be - /// an implementation of the `AsyncAccept` trait that accepts new connections - /// that the `TlsListener` will encrypt using TLS. - pub fn listen(&self, listener: A) -> TlsListener - where - T: AsyncTls, - { - TlsListener { - listener, - tls: self.tls.clone(), - waiting: JoinSet::new(), - timeout: self.handshake_timeout, - } - } -} - -/// Create a new Builder for a TlsListener -/// -/// `server_config` will be used to configure the TLS sessions. -pub fn builder(tls: T) -> Builder { - Builder { - tls, - handshake_timeout: DEFAULT_HANDSHAKE_TIMEOUT, - } -} - -pin_project! { - /// See [`AsyncAccept::until`] - pub struct Until { - #[pin] - acceptor: A, - #[pin] - ender: E, - } -} - -impl AsyncAccept for Until { - type Connection = A::Connection; - type Error = A::Error; - - fn poll_accept( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let this = self.project(); - - match this.ender.poll(cx) { - Poll::Pending => this.acceptor.poll_accept(cx), - Poll::Ready(_) => Poll::Ready(None), - } - } -}