remove dead code

add support for pre-determined http version
This commit is contained in:
Conrad Ludgate
2024-03-10 08:13:33 +00:00
parent 7afa5b3f35
commit 71fda96c21
2 changed files with 80 additions and 377 deletions

View File

@@ -163,31 +163,34 @@ pub async fn task_main(
}
};
let conn = server.serve_connection_with_upgrades(
hyper_util::rt::TokioIo::new(conn),
hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellation_handler = cancellation_handler.clone();
let service = hyper1::service::service_fn(move |req: hyper1::Request<Incoming>| {
let backend = backend.clone();
let ws_connections = ws_connections.clone();
let endpoint_rate_limiter = endpoint_rate_limiter.clone();
let cancellation_handler = cancellation_handler.clone();
async move {
Ok::<_, Infallible>(
request_handler(
req,
config,
backend,
ws_connections,
cancellation_handler,
peer_addr,
endpoint_rate_limiter,
)
.await
.map_or_else(api_error_into_response, |r| r),
async move {
Ok::<_, Infallible>(
request_handler(
req,
config,
backend,
ws_connections,
cancellation_handler,
peer_addr,
endpoint_rate_limiter,
)
}
}),
);
.await
.map_or_else(api_error_into_response, |r| r),
)
}
});
let conn = match conn.get_ref().1.alpn_protocol() {
Some(b"http/1.1") => server.serve_http1_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service),
Some(b"h2") => server.serve_http2_connection(hyper_util::rt::TokioIo::new(conn), service),
_ => server.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service)
};
let cancel = pin!(cancellation_token.cancelled());
let conn = pin!(conn);

View File

@@ -5,19 +5,19 @@ use futures::ready;
use hyper1::body::Body;
use hyper1::rt::ReadBufCursor;
use hyper1::service::HttpService;
use hyper_util::rt::TokioExecutor;
use hyper_util::rt::{TokioExecutor, TokioTimer};
use std::future::Future;
use std::marker::PhantomPinned;
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{error::Error as StdError, io, marker::Unpin, time::Duration};
use std::{error::Error as StdError, io, marker::Unpin};
use ::http1::{Request, Response};
use bytes::{Buf, Bytes};
use hyper1::{
body::Incoming,
rt::{Read, ReadBuf, Timer, Write},
rt::{Read, ReadBuf, Write},
service::Service,
};
@@ -61,40 +61,15 @@ impl Builder {
/// auto::Builder::new(TokioExecutor::new());
/// ```
pub fn new() -> Self {
Self {
let mut builder = Self {
http1: http1::Builder::new(),
http2: http2::Builder::new(TokioExecutor::new()),
}
}
};
/// Http1 configuration.
pub fn http1(&mut self) -> Http1Builder<'_> {
Http1Builder { inner: self }
}
builder.http1.timer(TokioTimer::new());
builder.http2.timer(TokioTimer::new());
/// Http2 configuration.
pub fn http2(&mut self) -> Http2Builder<'_> {
Http2Builder { inner: self }
}
/// Bind a connection together with a [`Service`].
pub fn serve_connection<I, S, B>(&self, io: I, service: S) -> Connection<'_, I, S>
where
S: Service<Request<Incoming>, Response = Response<B>>,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + 'static,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
Connection {
state: ConnState::ReadVersion {
read_version: read_version(io),
builder: self,
service: Some(service),
},
}
builder
}
/// Bind a connection together with a [`Service`], with the ability to
@@ -122,6 +97,51 @@ impl Builder {
},
}
}
/// Bind a HTTP2 connection together with a [`Service`]. This requires that the IO object implements `Send`.
pub fn serve_http2_connection<I, S, B>(
&self,
io: I,
service: S,
) -> UpgradeableConnection<'_, I, S>
where
S: Service<Request<Incoming>, Response = Response<B>>,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + Send + 'static,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
let conn = self.http2.serve_connection(Rewind::new(io), service);
UpgradeableConnection {
state: UpgradeableConnState::H2 { conn },
}
}
/// Bind a HTTP2 connection together with a [`Service`]. This requires that the IO object implements `Send`.
pub fn serve_http1_connection_with_upgrades<I, S, B>(
&self,
io: I,
service: S,
) -> UpgradeableConnection<'_, I, S>
where
S: Service<Request<Incoming>, Response = Response<B>>,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + Send + 'static,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
let conn = self
.http1
.serve_connection(Rewind::new(io), service)
.with_upgrades();
UpgradeableConnection {
state: UpgradeableConnState::H1 { conn },
}
}
}
#[derive(Copy, Clone)]
@@ -180,7 +200,7 @@ where
// We starts as H2 and switch to H1 when we don't get the preface.
if buf.filled().len() == len
|| &buf.filled()[len..] != &H2_PREFACE[len..buf.filled().len()]
|| buf.filled()[len..] != H2_PREFACE[len..buf.filled().len()]
{
*this.version = Version::H1;
break;
@@ -233,32 +253,6 @@ pin_project! {
}
}
impl<I, S, B> Connection<'_, I, S>
where
S: HttpService<Incoming, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
/// Start a graceful shutdown process for this connection.
///
/// This `Connection` should continue to be polled until shutdown can finish.
///
/// # Note
///
/// This should only be called while the `Connection` future is still pending. If called after
/// `Connection::poll` has resolved, this does nothing.
pub fn graceful_shutdown(self: Pin<&mut Self>) {
match self.project().state.project() {
ConnStateProj::ReadVersion { .. } => {}
ConnStateProj::H1 { conn } => conn.graceful_shutdown(),
ConnStateProj::H2 { conn } => conn.graceful_shutdown(),
}
}
}
impl<I, S, B> Future for Connection<'_, I, S>
where
S: Service<Request<Incoming>, Response = Response<B>>,
@@ -413,284 +407,6 @@ where
}
}
/// Http1 part of builder.
pub struct Http1Builder<'a> {
inner: &'a mut Builder,
}
impl Http1Builder<'_> {
/// Http2 configuration.
pub fn http2(&mut self) -> Http2Builder<'_> {
Http2Builder { inner: self.inner }
}
/// Set whether HTTP/1 connections should support half-closures.
///
/// Clients can chose to shutdown their write-side while waiting
/// for the server to respond. Setting this to `true` will
/// prevent closing the connection immediately if `read`
/// detects an EOF in the middle of a request.
///
/// Default is `false`.
pub fn half_close(&mut self, val: bool) -> &mut Self {
self.inner.http1.half_close(val);
self
}
/// Enables or disables HTTP/1 keep-alive.
///
/// Default is true.
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.inner.http1.keep_alive(val);
self
}
/// Set whether HTTP/1 connections will write header names as title case at
/// the socket level.
///
/// Note that this setting does not affect HTTP/2.
///
/// Default is false.
pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self {
self.inner.http1.title_case_headers(enabled);
self
}
/// Set whether to support preserving original header cases.
///
/// Currently, this will record the original cases received, and store them
/// in a private extension on the `Request`. It will also look for and use
/// such an extension in any provided `Response`.
///
/// Since the relevant extension is still private, there is no way to
/// interact with the original cases. The only effect this can have now is
/// to forward the cases in a proxy-like fashion.
///
/// Note that this setting does not affect HTTP/2.
///
/// Default is false.
pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self {
self.inner.http1.preserve_header_case(enabled);
self
}
/// Set a timeout for reading client request headers. If a client does not
/// transmit the entire header within this time, the connection is closed.
///
/// Default is None.
pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
self.inner.http1.header_read_timeout(read_timeout);
self
}
/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
/// Note that setting this to false may mean more copies of body data,
/// but may also improve performance when an IO transport doesn't
/// support vectored writes well, such as most TLS implementations.
///
/// Setting this to true will force hyper to use queued strategy
/// which may eliminate unnecessary cloning on some TLS backends
///
/// Default is `auto`. In this mode hyper will try to guess which
/// mode to use
pub fn writev(&mut self, val: bool) -> &mut Self {
self.inner.http1.writev(val);
self
}
/// Set the maximum buffer size for the connection.
///
/// Default is ~400kb.
///
/// # Panics
///
/// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
self.inner.http1.max_buf_size(max);
self
}
/// Aggregates flushes to better support pipelined responses.
///
/// Experimental, may have bugs.
///
/// Default is false.
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
self.inner.http1.pipeline_flush(enabled);
self
}
/// Set the timer used in background tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Send + Sync + 'static,
{
self.inner.http1.timer(timer);
self
}
/// Bind a connection together with a [`Service`].
pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
where
S: Service<Request<Incoming>, Response = Response<B>>,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + 'static,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
self.inner.serve_connection(io, service).await
}
}
/// Http2 part of builder.
pub struct Http2Builder<'a> {
inner: &'a mut Builder,
}
impl Http2Builder<'_> {
/// Http1 configuration.
pub fn http1(&mut self) -> Http1Builder<'_> {
Http1Builder { inner: self.inner }
}
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
/// stream-level flow control.
///
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.inner.http2.initial_stream_window_size(sz);
self
}
/// Sets the max connection-level flow control for HTTP2.
///
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.inner.http2.initial_connection_window_size(sz);
self
}
/// Sets whether to use an adaptive flow control.
///
/// Enabling this will override the limits set in
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
self.inner.http2.adaptive_window(enabled);
self
}
/// Sets the maximum frame size to use for HTTP2.
///
/// Passing `None` will do nothing.
///
/// If not set, hyper will use a default.
pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
self.inner.http2.max_frame_size(sz);
self
}
/// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
/// connections.
///
/// Default is 200. Passing `None` will remove any limit.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
self.inner.http2.max_concurrent_streams(max);
self
}
/// Sets an interval for HTTP2 Ping frames should be sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Default is currently disabled.
///
/// # Cargo Feature
///
pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
self.inner.http2.keep_alive_interval(interval);
self
}
/// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
///
/// If the ping is not acknowledged within the timeout, the connection will
/// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
self.inner.http2.keep_alive_timeout(timeout);
self
}
/// Set the maximum write buffer size for each HTTP/2 stream.
///
/// Default is currently ~400KB, but may change.
///
/// # Panics
///
/// The value must be no larger than `u32::MAX`.
pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
self.inner.http2.max_send_buf_size(max);
self
}
/// Enables the [extended CONNECT protocol].
///
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
pub fn enable_connect_protocol(&mut self) -> &mut Self {
self.inner.http2.enable_connect_protocol();
self
}
/// Sets the max size of received header frames.
///
/// Default is currently ~16MB, but may change.
pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
self.inner.http2.max_header_list_size(max);
self
}
/// Set the timer used in background tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Send + Sync + 'static,
{
self.inner.http2.timer(timer);
self
}
/// Bind a connection together with a [`Service`].
pub async fn serve_connection<I, S, B>(&self, io: I, service: S) -> Result<()>
where
S: Service<Request<Incoming>, Response = Response<B>>,
S::Future: 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
I: Read + Write + Unpin + 'static,
TokioExecutor: HttpServerConnExec<S::Future, B>,
{
self.inner.serve_connection(io, service).await
}
}
/// Combine a buffer with an IO, rewinding reads to use the buffer.
#[derive(Debug)]
pub(crate) struct Rewind<T> {
@@ -699,7 +415,6 @@ pub(crate) struct Rewind<T> {
}
impl<T> Rewind<T> {
#[cfg(test)]
pub(crate) fn new(io: T) -> Self {
Rewind {
pre: None,
@@ -707,27 +422,12 @@ impl<T> Rewind<T> {
}
}
#[allow(dead_code)]
pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self {
Rewind {
pre: Some(buf),
inner: io,
}
}
#[cfg(test)]
pub(crate) fn rewind(&mut self, bs: Bytes) {
debug_assert!(self.pre.is_none());
self.pre = Some(bs);
}
// pub(crate) fn into_inner(self) -> (T, Bytes) {
// (self.inner, self.pre.unwrap_or_else(Bytes::new))
// }
// pub(crate) fn get_mut(&mut self) -> &mut T {
// &mut self.inner
// }
}
impl<T> Read for Rewind<T>