more refactoring

This commit is contained in:
Conrad Ludgate
2024-10-30 10:19:52 +00:00
parent 2acc621c4c
commit 7c57234de1

View File

@@ -14,6 +14,7 @@ mod local_conn_pool;
mod sql_over_http;
mod websocket;
use std::future::Future;
use std::net::{IpAddr, SocketAddr};
use std::pin::{pin, Pin};
use std::sync::Arc;
@@ -30,8 +31,7 @@ use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Empty};
use hyper::body::Incoming;
use hyper::upgrade::OnUpgrade;
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use hyper_util::rt::{TokioExecutor, TokioIo};
use rand::rngs::StdRng;
use rand::SeedableRng;
use smallvec::SmallVec;
@@ -43,7 +43,7 @@ use tokio::time::timeout;
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
use tracing::{info, warn, Instrument};
use tracing::{debug, info, warn, Instrument};
use utils::http::error::ApiError;
use crate::cancellation::CancellationHandlerMain;
@@ -296,6 +296,57 @@ async fn connection_startup(
Some((conn, peer_addr, alpn))
}
trait GracefulShutdown: Future<Output = Result<(), hyper::Error>> + Send {
fn graceful_shutdown(self: Pin<&mut Self>);
}
impl<B, S> GracefulShutdown
for hyper::server::conn::http1::UpgradeableConnection<TokioIo<AsyncRW>, S>
where
S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B> + Send,
S::Future: Send + 'static,
B: hyper::body::Body + Send + 'static,
B::Data: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown();
}
}
impl<B, S> GracefulShutdown for hyper::server::conn::http1::Connection<TokioIo<AsyncRW>, S>
where
S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B> + Send,
S::Future: Send + 'static,
B: hyper::body::Body + Send + 'static,
B::Data: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
fn graceful_shutdown(self: Pin<&mut Self>) {
self.graceful_shutdown();
}
}
impl<B, S> GracefulShutdown
for hyper::server::conn::http2::Connection<TokioIo<AsyncRW>, S, TokioExecutor>
where
S: hyper::service::HttpService<hyper::body::Incoming, ResBody = B> + Send,
S::Future: Send + 'static,
B: hyper::body::Body + Send + 'static,
B::Data: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
fn graceful_shutdown(self: Pin<&mut Self>) {
hyper::server::conn::http2::Connection::graceful_shutdown(self);
}
}
/// Handles HTTP connection
/// 1. With graceful shutdowns
/// 2. With graceful request cancellation with connection failure
@@ -310,7 +361,7 @@ async fn connection_handler(
cancellation_token: CancellationToken,
conn: AsyncRW,
peer_addr: IpAddr,
_alpn: Alpn,
alpn: Alpn,
session_id: uuid::Uuid,
) {
let session_id = AtomicTake::new(session_id);
@@ -322,6 +373,15 @@ async fn connection_handler(
let (ws_tx, mut ws_rx) = mpsc::channel(1);
let auth_backend = backend.auth_backend;
let http2 = match &*alpn {
b"h2" => true,
b"http/1.1" => false,
_ => {
debug!("no alpn negotiated");
false
}
};
let service = hyper::service::service_fn(move |req: hyper::Request<Incoming>| {
// First HTTP request shares the same session ID
let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
@@ -372,12 +432,24 @@ async fn connection_handler(
}
});
let server = Builder::new(TokioExecutor::new());
let io = hyper_util::rt::TokioIo::new(conn);
let conn = if http2 {
let conn = hyper::server::conn::http2::Builder::new(TokioExecutor::new())
.serve_connection(io, service);
let conn = server.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service);
Box::pin(conn) as Pin<Box<dyn GracefulShutdown>>
} else {
let serve = hyper::server::conn::http1::Builder::new().serve_connection(io, service);
if config.http_config.accept_websockets {
Box::pin(serve.with_upgrades()) as Pin<Box<dyn GracefulShutdown>>
} else {
Box::pin(serve) as Pin<Box<dyn GracefulShutdown>>
}
};
// On cancellation, trigger the HTTP connection handler to shut down.
let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await {
let res = match select(pin!(cancellation_token.cancelled()), conn).await {
Either::Left((_cancelled, mut conn)) => {
tracing::debug!(%peer_addr, "cancelling connection");
conn.as_mut().graceful_shutdown();