proxy: parse proxy protocol header for health checks

This commit is contained in:
Conrad Ludgate
2024-03-07 12:35:20 +00:00
parent d03ec9d998
commit 84fbfc02ac
3 changed files with 79 additions and 8 deletions

View File

@@ -220,7 +220,7 @@ async fn main() -> anyhow::Result<()> {
// Check that we can bind to address before further initialization
let http_address: SocketAddr = args.http.parse()?;
info!("Starting http on {http_address}");
let http_listener = TcpListener::bind(http_address).await?.into_std()?;
let http_listener = TcpListener::bind(http_address).await?;
let mgmt_address: SocketAddr = args.mgmt.parse()?;
info!("Starting mgmt on {mgmt_address}");

View File

@@ -1,8 +1,21 @@
use anyhow::{anyhow, bail};
use hyper::{Body, Request, Response, StatusCode};
use std::{convert::Infallible, net::TcpListener};
use hyper::{
body::HttpBody,
server::conn::{AddrIncoming, AddrStream},
service::Service,
Body, Request, Response, StatusCode,
};
use routerify::{RequestService, RequestServiceBuilder, Router};
use std::{
convert::Infallible,
future::{ready, Ready},
task::{Context, Poll},
};
use tokio::net::TcpListener;
use tracing::info;
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder, RouterService};
use utils::http::{endpoint, error::ApiError, json::json_response, RouterBuilder};
use crate::protocol2::{ProxyProtocolAccept, WithClientIp, WithConnectionGuard};
async fn status_handler(_: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, "")
@@ -17,11 +30,56 @@ pub async fn task_main(http_listener: TcpListener) -> anyhow::Result<Infallible>
info!("http has shut down");
}
let service = || RouterService::new(make_router().build()?);
let service = || RouterService2::new(make_router().build()?);
hyper::Server::from_tcp(http_listener)?
let mut addr_incoming = AddrIncoming::from_listener(http_listener)?;
let _ = addr_incoming.set_nodelay(true);
let addr_incoming = ProxyProtocolAccept {
incoming: addr_incoming,
protocol: "health_checks",
};
hyper::Server::builder(addr_incoming)
.serve(service().map_err(|e| anyhow!(e))?)
.await?;
bail!("hyper server without shutdown handling cannot shutdown successfully");
}
#[derive(Debug)]
pub struct RouterService2<B, E> {
builder: RequestServiceBuilder<B, E>,
}
impl<
B: HttpBody + Send + Sync + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
> RouterService2<B, E>
{
/// Creates a new service with the provided router and it's ready to be used with the hyper [`serve`](https://docs.rs/hyper/0.14.4/hyper/server/struct.Builder.html#method.serve)
/// method.
pub fn new(router: Router<B, E>) -> routerify::Result<RouterService2<B, E>> {
let builder = RequestServiceBuilder::new(router)?;
Ok(RouterService2 { builder })
}
}
impl<
B: HttpBody + Send + Sync + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
> Service<&WithConnectionGuard<WithClientIp<AddrStream>>> for RouterService2<B, E>
{
type Response = RequestService<B, E>;
type Error = Infallible;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, conn: &WithConnectionGuard<WithClientIp<AddrStream>>) -> Self::Future {
let req_service = self.builder.build(conn.inner.inner.remote_addr());
ready(Ok(req_service))
}
}

View File

@@ -17,7 +17,7 @@ use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use uuid::Uuid;
use crate::{metrics::NUM_CLIENT_CONNECTION_GAUGE, serverless::tls_listener::AsyncAccept};
use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
pub struct ProxyProtocolAccept {
pub incoming: AddrIncoming,
@@ -331,7 +331,20 @@ impl<T: AsyncRead> AsyncRead for WithClientIp<T> {
}
}
impl AsyncAccept for ProxyProtocolAccept {
impl Accept for ProxyProtocolAccept {
type Conn = WithConnectionGuard<WithClientIp<AddrStream>>;
type Error = io::Error;
fn poll_accept(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
<Self as crate::serverless::tls_listener::AsyncAccept>::poll_accept(self, cx)
}
}
impl crate::serverless::tls_listener::AsyncAccept for ProxyProtocolAccept {
type Connection = WithConnectionGuard<WithClientIp<AddrStream>>;
type Error = io::Error;