mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
move up ws handling even more
This commit is contained in:
@@ -177,31 +177,49 @@ pub async fn task_main(
|
||||
.client_connections
|
||||
.guard(crate::metrics::Protocol::Http);
|
||||
|
||||
let startup_result = Box::pin(connection_startup(
|
||||
config,
|
||||
tls_acceptor,
|
||||
session_id,
|
||||
conn,
|
||||
peer_addr,
|
||||
))
|
||||
.await;
|
||||
let Some((conn, peer_addr, alpn)) = startup_result else {
|
||||
let startup_result =
|
||||
connection_startup(config, tls_acceptor, session_id, conn, peer_addr)
|
||||
.boxed()
|
||||
.await;
|
||||
let Some(conn) = startup_result else {
|
||||
return;
|
||||
};
|
||||
let (_, peer_addr, _) = conn;
|
||||
|
||||
Box::pin(connection_handler(
|
||||
let ws_upgrade = http_connection_handler(
|
||||
config,
|
||||
backend,
|
||||
connections2,
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
conn_token,
|
||||
conn,
|
||||
peer_addr,
|
||||
alpn,
|
||||
session_id,
|
||||
))
|
||||
)
|
||||
.boxed()
|
||||
.await;
|
||||
|
||||
if let Some((session_id, host, websocket)) = ws_upgrade {
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Ws,
|
||||
&config.region,
|
||||
);
|
||||
|
||||
let ws = websocket::serve_websocket(
|
||||
config,
|
||||
auth_backend,
|
||||
ctx,
|
||||
websocket,
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
host,
|
||||
)
|
||||
.boxed();
|
||||
|
||||
if let Err(e) = ws.await {
|
||||
warn!("error in websocket connection: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(http_conn_span),
|
||||
);
|
||||
@@ -244,6 +262,7 @@ impl MaybeTlsAcceptor for NoTls {
|
||||
}
|
||||
|
||||
type Alpn = SmallVec<[u8; 8]>;
|
||||
type ConnWithInfo = (AsyncRW, IpAddr, Alpn);
|
||||
|
||||
/// Handles the TCP startup lifecycle.
|
||||
/// 1. Parses PROXY protocol V2
|
||||
@@ -254,7 +273,7 @@ async fn connection_startup(
|
||||
session_id: uuid::Uuid,
|
||||
conn: TcpStream,
|
||||
peer_addr: SocketAddr,
|
||||
) -> Option<(AsyncRW, IpAddr, Alpn)> {
|
||||
) -> Option<ConnWithInfo> {
|
||||
// handle PROXY protocol
|
||||
let (conn, peer) = match read_proxy_protocol(conn).await {
|
||||
Ok(c) => c,
|
||||
@@ -328,19 +347,15 @@ impl GracefulShutdown
|
||||
/// 1. With graceful shutdowns
|
||||
/// 2. With graceful request cancellation with connection failure
|
||||
/// 3. With websocket upgrade support.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn connection_handler(
|
||||
async fn http_connection_handler(
|
||||
config: &'static ProxyConfig,
|
||||
backend: Arc<PoolingBackend>,
|
||||
connections: TaskTracker,
|
||||
cancellation_handler: Arc<CancellationHandlerMain>,
|
||||
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
|
||||
cancellation_token: CancellationToken,
|
||||
conn: AsyncRW,
|
||||
peer_addr: IpAddr,
|
||||
alpn: Alpn,
|
||||
conn: ConnWithInfo,
|
||||
session_id: uuid::Uuid,
|
||||
) {
|
||||
) -> Option<WsUpgrade> {
|
||||
let (conn, peer_addr, alpn) = conn;
|
||||
let session_id = AtomicTake::new(session_id);
|
||||
|
||||
// Cancel all current inflight HTTP requests if the HTTP connection is closed.
|
||||
@@ -349,7 +364,6 @@ async fn connection_handler(
|
||||
|
||||
let (ws_tx, ws_rx) = oneshot::channel();
|
||||
let ws_tx = Arc::new(AtomicTake::new(ws_tx));
|
||||
let auth_backend = backend.auth_backend;
|
||||
|
||||
let http2 = match &*alpn {
|
||||
b"h2" => true,
|
||||
@@ -403,35 +417,17 @@ async fn connection_handler(
|
||||
|
||||
match res {
|
||||
Ok(()) => {
|
||||
if let Ok((session_id, host, websocket)) = ws_rx.await {
|
||||
if let Ok(ws_upgrade) = ws_rx.await {
|
||||
tracing::info!(%peer_addr, "connection upgraded to websockets");
|
||||
|
||||
let ctx = RequestMonitoring::new(
|
||||
session_id,
|
||||
peer_addr,
|
||||
crate::metrics::Protocol::Ws,
|
||||
&config.region,
|
||||
);
|
||||
|
||||
if let Err(e) = websocket::serve_websocket(
|
||||
config,
|
||||
auth_backend,
|
||||
ctx,
|
||||
websocket,
|
||||
cancellation_handler,
|
||||
endpoint_rate_limiter,
|
||||
host,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("error in websocket connection: {e:#}");
|
||||
}
|
||||
} else {
|
||||
tracing::info!(%peer_addr, "HTTP connection closed");
|
||||
return Some(ws_upgrade);
|
||||
}
|
||||
tracing::info!(%peer_addr, "HTTP connection closed");
|
||||
}
|
||||
Err(e) => tracing::warn!(%peer_addr, "HTTP connection error {e}"),
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
struct ProxyService {
|
||||
|
||||
Reference in New Issue
Block a user