From 84fbfc02ac3bfc97e51ac4331e4c122ff4112215 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 7 Mar 2024 12:35:20 +0000 Subject: [PATCH] proxy: parse proxy protocol header for health checks --- proxy/src/bin/proxy.rs | 2 +- proxy/src/http/health_server.rs | 68 ++++++++++++++++++++++++++++++--- proxy/src/protocol2.rs | 17 ++++++++- 3 files changed, 79 insertions(+), 8 deletions(-) diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index b3d4fc0411..fc10970121 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -220,7 +220,7 @@ async fn main() -> anyhow::Result<()> { // Check that we can bind to address before further initialization let http_address: SocketAddr = args.http.parse()?; info!("Starting http on {http_address}"); - let http_listener = TcpListener::bind(http_address).await?.into_std()?; + let http_listener = TcpListener::bind(http_address).await?; let mgmt_address: SocketAddr = args.mgmt.parse()?; info!("Starting mgmt on {mgmt_address}"); diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs index 6186ddde0d..5e6604561d 100644 --- a/proxy/src/http/health_server.rs +++ b/proxy/src/http/health_server.rs @@ -1,8 +1,21 @@ use anyhow::{anyhow, bail}; -use hyper::{Body, Request, Response, StatusCode}; -use std::{convert::Infallible, net::TcpListener}; +use hyper::{ + body::HttpBody, + server::conn::{AddrIncoming, AddrStream}, + service::Service, + Body, Request, Response, StatusCode, +}; +use routerify::{RequestService, RequestServiceBuilder, Router}; +use std::{ + convert::Infallible, + future::{ready, Ready}, + task::{Context, Poll}, +}; +use tokio::net::TcpListener; use tracing::info; -use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService}; +use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder}; + +use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard}; async fn status_handler(_: Request) -> Result, ApiError> { json_response(StatusCode::OK, "") @@ -17,11 +30,56 @@ pub async fn task_main(http_listener: TcpListener) -> anyhow::Result info!("http has shut down"); } - let service = || RouterService::new(make_router().build()?); + let service = || RouterService2::new(make_router().build()?); - hyper::Server::from_tcp(http_listener)? + let mut addr_incoming = AddrIncoming::from_listener(http_listener)?; + let _ = addr_incoming.set_nodelay(true); + let addr_incoming = ProxyProtocolAccept { + incoming: addr_incoming, + protocol: "health_checks", + }; + + hyper::Server::builder(addr_incoming) .serve(service().map_err(|e| anyhow!(e))?) .await?; bail!("hyper server without shutdown handling cannot shutdown successfully"); } + +#[derive(Debug)] +pub struct RouterService2 { + builder: RequestServiceBuilder, +} + +impl< + B: HttpBody + Send + Sync + 'static, + E: Into> + 'static, + > RouterService2 +{ + /// Creates a new service with the provided router and it's ready to be used with the hyper [`serve`](https://docs.rs/hyper/0.14.4/hyper/server/struct.Builder.html#method.serve) + /// method. + pub fn new(router: Router) -> routerify::Result> { + let builder = RequestServiceBuilder::new(router)?; + Ok(RouterService2 { builder }) + } +} + +impl< + B: HttpBody + Send + Sync + 'static, + E: Into> + 'static, + > Service<&WithConnectionGuard>> for RouterService2 +{ + type Response = RequestService; + type Error = Infallible; + type Future = Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, conn: &WithConnectionGuard>) -> Self::Future { + let req_service = self.builder.build(conn.inner.inner.remote_addr()); + + ready(Ok(req_service)) + } +} diff --git a/proxy/src/protocol2.rs b/proxy/src/protocol2.rs index 3a7aabca32..1807820972 100644 --- a/proxy/src/protocol2.rs +++ b/proxy/src/protocol2.rs @@ -17,7 +17,7 @@ use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; use uuid::Uuid; -use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept}; +use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE; pub struct ProxyProtocolAccept { pub incoming: AddrIncoming, @@ -331,7 +331,20 @@ impl AsyncRead for WithClientIp { } } -impl AsyncAccept for ProxyProtocolAccept { +impl Accept for ProxyProtocolAccept { + type Conn = WithConnectionGuard>; + + type Error = io::Error; + + fn poll_accept( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + ::poll_accept(self, cx) + } +} + +impl crate::serverless::tls_listener::AsyncAccept for ProxyProtocolAccept { type Connection = WithConnectionGuard>; type Error = io::Error;