diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs
index b3d4fc0411..fc10970121 100644
--- a/proxy/src/bin/proxy.rs
+++ b/proxy/src/bin/proxy.rs
@@ -220,7 +220,7 @@ async fn main() -> anyhow::Result<()> {
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.http.parse()?;
info!("Starting http on {http_address}");
- let http_listener = TcpListener::bind(http_address).await?.into_std()?;
+ let http_listener = TcpListener::bind(http_address).await?;
let mgmt_address: SocketAddr = args.mgmt.parse()?;
info!("Starting mgmt on {mgmt_address}");
diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs
index 6186ddde0d..5e6604561d 100644
--- a/proxy/src/http/health_server.rs
+++ b/proxy/src/http/health_server.rs
@@ -1,8 +1,21 @@
use anyhow::{anyhow, bail};
-use hyper::{Body, Request, Response, StatusCode};
-use std::{convert::Infallible, net::TcpListener};
+use hyper::{
+ body::HttpBody,
+ server::conn::{AddrIncoming, AddrStream},
+ service::Service,
+ Body, Request, Response, StatusCode,
+};
+use routerify::{RequestService, RequestServiceBuilder, Router};
+use std::{
+ convert::Infallible,
+ future::{ready, Ready},
+ task::{Context, Poll},
+};
+use tokio::net::TcpListener;
use tracing::info;
-use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
+use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder};
+
+use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
async fn status_handler(_: Request
) -> Result, ApiError> {
json_response(StatusCode::OK, "")
@@ -17,11 +30,56 @@ pub async fn task_main(http_listener: TcpListener) -> anyhow::Result
info!("http has shut down");
}
- let service = || RouterService::new(make_router().build()?);
+ let service = || RouterService2::new(make_router().build()?);
- hyper::Server::from_tcp(http_listener)?
+ let mut addr_incoming = AddrIncoming::from_listener(http_listener)?;
+ let _ = addr_incoming.set_nodelay(true);
+ let addr_incoming = ProxyProtocolAccept {
+ incoming: addr_incoming,
+ protocol: "health_checks",
+ };
+
+ hyper::Server::builder(addr_incoming)
.serve(service().map_err(|e| anyhow!(e))?)
.await?;
bail!("hyper server without shutdown handling cannot shutdown successfully");
}
+
+#[derive(Debug)]
+pub struct RouterService2 {
+ builder: RequestServiceBuilder,
+}
+
+impl<
+ B: HttpBody + Send + Sync + 'static,
+ E: Into> + 'static,
+ > RouterService2
+{
+ /// Creates a new service with the provided router and it's ready to be used with the hyper [`serve`](https://docs.rs/hyper/0.14.4/hyper/server/struct.Builder.html#method.serve)
+ /// method.
+ pub fn new(router: Router) -> routerify::Result> {
+ let builder = RequestServiceBuilder::new(router)?;
+ Ok(RouterService2 { builder })
+ }
+}
+
+impl<
+ B: HttpBody + Send + Sync + 'static,
+ E: Into> + 'static,
+ > Service<&WithConnectionGuard>> for RouterService2
+{
+ type Response = RequestService;
+ type Error = Infallible;
+ type Future = Ready>;
+
+ fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> {
+ Poll::Ready(Ok(()))
+ }
+
+ fn call(&mut self, conn: &WithConnectionGuard>) -> Self::Future {
+ let req_service = self.builder.build(conn.inner.inner.remote_addr());
+
+ ready(Ok(req_service))
+ }
+}
diff --git a/proxy/src/protocol2.rs b/proxy/src/protocol2.rs
index 3a7aabca32..1807820972 100644
--- a/proxy/src/protocol2.rs
+++ b/proxy/src/protocol2.rs
@@ -17,7 +17,7 @@ use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use uuid::Uuid;
-use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept};
+use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
pub struct ProxyProtocolAccept {
pub incoming: AddrIncoming,
@@ -331,7 +331,20 @@ impl AsyncRead for WithClientIp {
}
}
-impl AsyncAccept for ProxyProtocolAccept {
+impl Accept for ProxyProtocolAccept {
+ type Conn = WithConnectionGuard>;
+
+ type Error = io::Error;
+
+ fn poll_accept(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll