From 71fda96c21456b0f3576186d9247e76f4bce8de4 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Sun, 10 Mar 2024 08:13:33 +0000 Subject: [PATCH] remove dead code add support for pre-determined http version --- proxy/src/serverless.rs | 49 ++-- proxy/src/serverless/http_auto.rs | 408 ++++-------------------------- 2 files changed, 80 insertions(+), 377 deletions(-) diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index 046f81d1e4..71386e7233 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -163,31 +163,34 @@ pub async fn task_main( } }; - let conn = server.serve_connection_with_upgrades( - hyper_util::rt::TokioIo::new(conn), - hyper1::service::service_fn(move |req: hyper1::Request| { - let backend = backend.clone(); - let ws_connections = ws_connections.clone(); - let endpoint_rate_limiter = endpoint_rate_limiter.clone(); - let cancellation_handler = cancellation_handler.clone(); + let service = hyper1::service::service_fn(move |req: hyper1::Request| { + let backend = backend.clone(); + let ws_connections = ws_connections.clone(); + let endpoint_rate_limiter = endpoint_rate_limiter.clone(); + let cancellation_handler = cancellation_handler.clone(); - async move { - Ok::<_, Infallible>( - request_handler( - req, - config, - backend, - ws_connections, - cancellation_handler, - peer_addr, - endpoint_rate_limiter, - ) - .await - .map_or_else(api_error_into_response, |r| r), + async move { + Ok::<_, Infallible>( + request_handler( + req, + config, + backend, + ws_connections, + cancellation_handler, + peer_addr, + endpoint_rate_limiter, ) - } - }), - ); + .await + .map_or_else(api_error_into_response, |r| r), + ) + } + }); + + let conn = match conn.get_ref().1.alpn_protocol() { + Some(b"http/1.1") => server.serve_http1_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service), + Some(b"h2") => server.serve_http2_connection(hyper_util::rt::TokioIo::new(conn), service), + _ => server.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service) + }; let cancel = pin!(cancellation_token.cancelled()); let conn = pin!(conn); diff --git a/proxy/src/serverless/http_auto.rs b/proxy/src/serverless/http_auto.rs index 831effe4ac..0a5a5b8547 100644 --- a/proxy/src/serverless/http_auto.rs +++ b/proxy/src/serverless/http_auto.rs @@ -5,19 +5,19 @@ use futures::ready; use hyper1::body::Body; use hyper1::rt::ReadBufCursor; use hyper1::service::HttpService; -use hyper_util::rt::TokioExecutor; +use hyper_util::rt::{TokioExecutor, TokioTimer}; use std::future::Future; use std::marker::PhantomPinned; use std::mem::MaybeUninit; use std::pin::Pin; use std::task::{Context, Poll}; -use std::{error::Error as StdError, io, marker::Unpin, time::Duration}; +use std::{error::Error as StdError, io, marker::Unpin}; use ::http1::{Request, Response}; use bytes::{Buf, Bytes}; use hyper1::{ body::Incoming, - rt::{Read, ReadBuf, Timer, Write}, + rt::{Read, ReadBuf, Write}, service::Service, }; @@ -61,40 +61,15 @@ impl Builder { /// auto::Builder::new(TokioExecutor::new()); /// ``` pub fn new() -> Self { - Self { + let mut builder = Self { http1: http1::Builder::new(), http2: http2::Builder::new(TokioExecutor::new()), - } - } + }; - /// Http1 configuration. - pub fn http1(&mut self) -> Http1Builder<'_> { - Http1Builder { inner: self } - } + builder.http1.timer(TokioTimer::new()); + builder.http2.timer(TokioTimer::new()); - /// Http2 configuration. - pub fn http2(&mut self) -> Http2Builder<'_> { - Http2Builder { inner: self } - } - - /// Bind a connection together with a [`Service`]. - pub fn serve_connection(&self, io: I, service: S) -> Connection<'_, I, S> - where - S: Service, Response = Response>, - S::Future: 'static, - S::Error: Into>, - B: Body + 'static, - B::Error: Into>, - I: Read + Write + Unpin + 'static, - TokioExecutor: HttpServerConnExec, - { - Connection { - state: ConnState::ReadVersion { - read_version: read_version(io), - builder: self, - service: Some(service), - }, - } + builder } /// Bind a connection together with a [`Service`], with the ability to @@ -122,6 +97,51 @@ impl Builder { }, } } + + /// Bind a HTTP2 connection together with a [`Service`]. This requires that the IO object implements `Send`. + pub fn serve_http2_connection( + &self, + io: I, + service: S, + ) -> UpgradeableConnection<'_, I, S> + where + S: Service, Response = Response>, + S::Future: 'static, + S::Error: Into>, + B: Body + 'static, + B::Error: Into>, + I: Read + Write + Unpin + Send + 'static, + TokioExecutor: HttpServerConnExec, + { + let conn = self.http2.serve_connection(Rewind::new(io), service); + UpgradeableConnection { + state: UpgradeableConnState::H2 { conn }, + } + } + + /// Bind a HTTP2 connection together with a [`Service`]. This requires that the IO object implements `Send`. + pub fn serve_http1_connection_with_upgrades( + &self, + io: I, + service: S, + ) -> UpgradeableConnection<'_, I, S> + where + S: Service, Response = Response>, + S::Future: 'static, + S::Error: Into>, + B: Body + 'static, + B::Error: Into>, + I: Read + Write + Unpin + Send + 'static, + TokioExecutor: HttpServerConnExec, + { + let conn = self + .http1 + .serve_connection(Rewind::new(io), service) + .with_upgrades(); + UpgradeableConnection { + state: UpgradeableConnState::H1 { conn }, + } + } } #[derive(Copy, Clone)] @@ -180,7 +200,7 @@ where // We starts as H2 and switch to H1 when we don't get the preface. if buf.filled().len() == len - || &buf.filled()[len..] != &H2_PREFACE[len..buf.filled().len()] + || buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()] { *this.version = Version::H1; break; @@ -233,32 +253,6 @@ pin_project! { } } -impl Connection<'_, I, S> -where - S: HttpService, - S::Error: Into>, - I: Read + Write + Unpin, - B: Body + 'static, - B::Error: Into>, - TokioExecutor: HttpServerConnExec, -{ - /// Start a graceful shutdown process for this connection. - /// - /// This `Connection` should continue to be polled until shutdown can finish. - /// - /// # Note - /// - /// This should only be called while the `Connection` future is still pending. If called after - /// `Connection::poll` has resolved, this does nothing. - pub fn graceful_shutdown(self: Pin<&mut Self>) { - match self.project().state.project() { - ConnStateProj::ReadVersion { .. } => {} - ConnStateProj::H1 { conn } => conn.graceful_shutdown(), - ConnStateProj::H2 { conn } => conn.graceful_shutdown(), - } - } -} - impl Future for Connection<'_, I, S> where S: Service, Response = Response>, @@ -413,284 +407,6 @@ where } } -/// Http1 part of builder. -pub struct Http1Builder<'a> { - inner: &'a mut Builder, -} - -impl Http1Builder<'_> { - /// Http2 configuration. - pub fn http2(&mut self) -> Http2Builder<'_> { - Http2Builder { inner: self.inner } - } - - /// Set whether HTTP/1 connections should support half-closures. - /// - /// Clients can chose to shutdown their write-side while waiting - /// for the server to respond. Setting this to `true` will - /// prevent closing the connection immediately if `read` - /// detects an EOF in the middle of a request. - /// - /// Default is `false`. - pub fn half_close(&mut self, val: bool) -> &mut Self { - self.inner.http1.half_close(val); - self - } - - /// Enables or disables HTTP/1 keep-alive. - /// - /// Default is true. - pub fn keep_alive(&mut self, val: bool) -> &mut Self { - self.inner.http1.keep_alive(val); - self - } - - /// Set whether HTTP/1 connections will write header names as title case at - /// the socket level. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.title_case_headers(enabled); - self - } - - /// Set whether to support preserving original header cases. - /// - /// Currently, this will record the original cases received, and store them - /// in a private extension on the `Request`. It will also look for and use - /// such an extension in any provided `Response`. - /// - /// Since the relevant extension is still private, there is no way to - /// interact with the original cases. The only effect this can have now is - /// to forward the cases in a proxy-like fashion. - /// - /// Note that this setting does not affect HTTP/2. - /// - /// Default is false. - pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.preserve_header_case(enabled); - self - } - - /// Set a timeout for reading client request headers. If a client does not - /// transmit the entire header within this time, the connection is closed. - /// - /// Default is None. - pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.inner.http1.header_read_timeout(read_timeout); - self - } - - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn writev(&mut self, val: bool) -> &mut Self { - self.inner.http1.writev(val); - self - } - - /// Set the maximum buffer size for the connection. - /// - /// Default is ~400kb. - /// - /// # Panics - /// - /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - pub fn max_buf_size(&mut self, max: usize) -> &mut Self { - self.inner.http1.max_buf_size(max); - self - } - - /// Aggregates flushes to better support pipelined responses. - /// - /// Experimental, may have bugs. - /// - /// Default is false. - pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.pipeline_flush(enabled); - self - } - - /// Set the timer used in background tasks. - pub fn timer(&mut self, timer: M) -> &mut Self - where - M: Timer + Send + Sync + 'static, - { - self.inner.http1.timer(timer); - self - } - - /// Bind a connection together with a [`Service`]. - pub async fn serve_connection(&self, io: I, service: S) -> Result<()> - where - S: Service, Response = Response>, - S::Future: 'static, - S::Error: Into>, - B: Body + 'static, - B::Error: Into>, - I: Read + Write + Unpin + 'static, - TokioExecutor: HttpServerConnExec, - { - self.inner.serve_connection(io, service).await - } -} - -/// Http2 part of builder. -pub struct Http2Builder<'a> { - inner: &'a mut Builder, -} - -impl Http2Builder<'_> { - /// Http1 configuration. - pub fn http1(&mut self) -> Http1Builder<'_> { - Http1Builder { inner: self.inner } - } - - /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 - /// stream-level flow control. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - pub fn initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.initial_stream_window_size(sz); - self - } - - /// Sets the max connection-level flow control for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn initial_connection_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.initial_connection_window_size(sz); - self - } - - /// Sets whether to use an adaptive flow control. - /// - /// Enabling this will override the limits set in - /// `http2_initial_stream_window_size` and - /// `http2_initial_connection_window_size`. - pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { - self.inner.http2.adaptive_window(enabled); - self - } - - /// Sets the maximum frame size to use for HTTP2. - /// - /// Passing `None` will do nothing. - /// - /// If not set, hyper will use a default. - pub fn max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.max_frame_size(sz); - self - } - - /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 - /// connections. - /// - /// Default is 200. Passing `None` will remove any limit. - /// - /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - pub fn max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - self.inner.http2.max_concurrent_streams(max); - self - } - - /// Sets an interval for HTTP2 Ping frames should be sent to keep a - /// connection alive. - /// - /// Pass `None` to disable HTTP2 keep-alive. - /// - /// Default is currently disabled. - /// - /// # Cargo Feature - /// - pub fn keep_alive_interval(&mut self, interval: impl Into>) -> &mut Self { - self.inner.http2.keep_alive_interval(interval); - self - } - - /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. - /// - /// If the ping is not acknowledged within the timeout, the connection will - /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. - /// - /// Default is 20 seconds. - /// - /// # Cargo Feature - /// - pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.inner.http2.keep_alive_timeout(timeout); - self - } - - /// Set the maximum write buffer size for each HTTP/2 stream. - /// - /// Default is currently ~400KB, but may change. - /// - /// # Panics - /// - /// The value must be no larger than `u32::MAX`. - pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { - self.inner.http2.max_send_buf_size(max); - self - } - - /// Enables the [extended CONNECT protocol]. - /// - /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - pub fn enable_connect_protocol(&mut self) -> &mut Self { - self.inner.http2.enable_connect_protocol(); - self - } - - /// Sets the max size of received header frames. - /// - /// Default is currently ~16MB, but may change. - pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { - self.inner.http2.max_header_list_size(max); - self - } - - /// Set the timer used in background tasks. - pub fn timer(&mut self, timer: M) -> &mut Self - where - M: Timer + Send + Sync + 'static, - { - self.inner.http2.timer(timer); - self - } - - /// Bind a connection together with a [`Service`]. - pub async fn serve_connection(&self, io: I, service: S) -> Result<()> - where - S: Service, Response = Response>, - S::Future: 'static, - S::Error: Into>, - B: Body + 'static, - B::Error: Into>, - I: Read + Write + Unpin + 'static, - TokioExecutor: HttpServerConnExec, - { - self.inner.serve_connection(io, service).await - } -} - /// Combine a buffer with an IO, rewinding reads to use the buffer. #[derive(Debug)] pub(crate) struct Rewind { @@ -699,7 +415,6 @@ pub(crate) struct Rewind { } impl Rewind { - #[cfg(test)] pub(crate) fn new(io: T) -> Self { Rewind { pre: None, @@ -707,27 +422,12 @@ impl Rewind { } } - #[allow(dead_code)] pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { Rewind { pre: Some(buf), inner: io, } } - - #[cfg(test)] - pub(crate) fn rewind(&mut self, bs: Bytes) { - debug_assert!(self.pre.is_none()); - self.pre = Some(bs); - } - - // pub(crate) fn into_inner(self) -> (T, Bytes) { - // (self.inner, self.pre.unwrap_or_else(Bytes::new)) - // } - - // pub(crate) fn get_mut(&mut self) -> &mut T { - // &mut self.inner - // } } impl Read for Rewind