diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index fb70f800bf..0b8f42cd46 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -164,10 +164,10 @@ pub async fn task_main( } }; - let (version, conn) =match conn.get_ref().1.alpn_protocol() { - Some(b"http/1.1") => (http_auto::Version::H1, Rewind::new(hyper_util::rt::TokioIo::new(conn))), - Some(b"h2") => (http_auto::Version::H2, Rewind::new(hyper_util::rt::TokioIo::new(conn))), - _ => match http_auto::read_version(hyper_util::rt::TokioIo::new(conn)).await { + let (version, conn) = match conn.get_ref().1.alpn_protocol() { + Some(b"http/1.1") => (http_auto::Version::H1, Rewind::new(conn)), + Some(b"h2") => (http_auto::Version::H2, Rewind::new(conn)), + _ => match http_auto::read_version(conn).await { Ok(v) => v, Err(e) => { tracing::warn!("HTTP connection error {e}"); diff --git a/proxy/src/serverless/http_auto.rs b/proxy/src/serverless/http_auto.rs index caf942f888..d0c4d14d34 100644 --- a/proxy/src/serverless/http_auto.rs +++ b/proxy/src/serverless/http_auto.rs @@ -5,7 +5,7 @@ use futures::ready; use hyper1::body::Body; use hyper1::rt::ReadBufCursor; use hyper1::service::HttpService; -use hyper_util::rt::{TokioExecutor, TokioTimer}; +use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use std::future::Future; use std::marker::PhantomPinned; use std::mem::MaybeUninit; @@ -68,7 +68,7 @@ impl Builder { S::Error: Into>, B: Body + 'static, B::Error: Into>, - I: Read + Write + Unpin + Send + 'static, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, TokioExecutor: Http2ServerConnExec, { match version { @@ -96,10 +96,10 @@ pub(crate) enum Version { pub(crate) fn read_version(io: I) -> ReadVersion where - I: Read + Unpin, + I: tokio::io::AsyncRead + Unpin, { ReadVersion { - io: Some(io), + io: Some(TokioIo::new(io)), buf: [MaybeUninit::uninit(); 24], filled: 0, version: Version::H2, @@ -109,7 +109,7 @@ where pin_project! { pub(crate) struct ReadVersion { - io: Option, + io: Option>, buf: [MaybeUninit; 24], // the amount of `buf` thats been filled filled: usize, @@ -122,7 +122,7 @@ pin_project! { impl Future for ReadVersion where - I: Read + Unpin, + I: tokio::io::AsyncRead + Unpin, { type Output = io::Result<(Version, Rewind)>; @@ -195,7 +195,7 @@ impl UpgradeableConnection where S: HttpService, S::Error: Into>, - I: Read + Write + Unpin, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, B: Body + 'static, B::Error: Into>, TokioExecutor: Http2ServerConnExec, @@ -223,7 +223,7 @@ where S::Error: Into>, B: Body + 'static, B::Error: Into>, - I: Read + Write + Unpin + Send + 'static, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, TokioExecutor: Http2ServerConnExec, { type Output = Result<()>; @@ -241,18 +241,18 @@ where #[derive(Debug)] pub(crate) struct Rewind { pre: Option, - inner: T, + inner: TokioIo, } impl Rewind { pub(crate) fn new(io: T) -> Self { Rewind { pre: None, - inner: io, + inner: TokioIo::new(io), } } - pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { + pub(crate) fn new_buffered(io: TokioIo, buf: Bytes) -> Self { Rewind { pre: Some(buf), inner: io, @@ -262,7 +262,7 @@ impl Rewind { impl Read for Rewind where - T: Read + Unpin, + T: tokio::io::AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, @@ -317,7 +317,7 @@ fn put_slice(cursor: &mut ReadBufCursor<'_>, slice: &[u8]) { impl Write for Rewind where - T: Write + Unpin, + T: tokio::io::AsyncWrite + Unpin, { fn poll_write( mut self: Pin<&mut Self>,