diff --git a/proxy/src/bin/pglb.rs b/proxy/src/bin/pglb.rs index 175e34e370..a5e56564b1 100644 --- a/proxy/src/bin/pglb.rs +++ b/proxy/src/bin/pglb.rs @@ -1,4 +1,5 @@ use std::{ + convert::Infallible, net::SocketAddr, sync::{Arc, Mutex}, time::Duration, @@ -7,7 +8,11 @@ use std::{ use anyhow::Context; use indexmap::IndexMap; use quinn::{Connection, Endpoint}; -use tokio::time::timeout; +use tokio::{ + net::{TcpListener, TcpStream}, + time::timeout, +}; +use tracing::error; type AuthConnId = usize; struct AuthConnState { @@ -19,8 +24,13 @@ struct AuthConn { // latency info... } +#[global_allocator] +static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; + #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { + let _logging_guard = proxy::logging::init().await?; + let auth_endpoint: Endpoint = endpoint_config("0.0.0.0:5634".parse().unwrap()) .await .unwrap(); @@ -34,9 +44,11 @@ async fn main() { auth_connections.clone(), )); - // tcp listener goes here + let _frontend_handle = tokio::spawn(start_frontend("127.0.0.1:0")); quinn_handle.await.unwrap(); + + Ok(()) } async fn endpoint_config(addr: SocketAddr) -> anyhow::Result { @@ -93,3 +105,34 @@ async fn quinn_server(ep: Endpoint, state: Arc) { }); } } + +async fn start_frontend(addr: &str) -> anyhow::Result { + let addr: SocketAddr = addr.parse()?; + let listener = TcpListener::bind(addr).await?; + socket2::SockRef::from(&listener).set_keepalive(true)?; + + let connections = tokio_util::task::task_tracker::TaskTracker::new(); + + loop { + match listener.accept().await { + Ok((socket, peer_addr)) => { + connections.spawn(handle_frontend_connection(socket, peer_addr)); + } + Err(e) => {} + } + } +} + +async fn handle_frontend_connection(socket: TcpStream, _peer_addr: SocketAddr) { + match socket.set_nodelay(true) { + Ok(()) => {} + Err(e) => { + error!("per-client task finished with an error: failed to set socket option: {e:#}"); + return; + } + }; + + // TODO: HAProxy protocol? +} + +// TODO: client state machine