mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-04 03:52:56 +00:00
[proxy] use the proxy protocol v2 command to silence some logs (#9620)
The PROXY Protocol V2 offers a "command" concept. It can be of two different values. "Local" and "Proxy". The spec suggests that "Local" be used for health-checks. We can thus use this to silence logging for such health checks such as those from NLB. This additionally refactors the flow to be a bit more type-safe, self documenting and using zerocopy deser.
This commit is contained in:
3
Cargo.lock
generated
3
Cargo.lock
generated
@@ -4365,6 +4365,7 @@ dependencies = [
|
||||
"walkdir",
|
||||
"workspace_hack",
|
||||
"x509-parser",
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7374,6 +7375,7 @@ dependencies = [
|
||||
"tracing",
|
||||
"tracing-core",
|
||||
"url",
|
||||
"zerocopy",
|
||||
"zeroize",
|
||||
"zstd",
|
||||
"zstd-safe",
|
||||
@@ -7446,6 +7448,7 @@ version = "0.7.31"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
|
||||
@@ -196,6 +196,7 @@ walkdir = "2.3.2"
|
||||
rustls-native-certs = "0.8"
|
||||
x509-parser = "0.16"
|
||||
whoami = "1.5.1"
|
||||
zerocopy = { version = "0.7", features = ["derive"] }
|
||||
|
||||
## TODO replace this with tracing
|
||||
env_logger = "0.10"
|
||||
|
||||
@@ -98,6 +98,7 @@ rustls-native-certs.workspace = true
|
||||
x509-parser.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
redis.workspace = true
|
||||
zerocopy.workspace = true
|
||||
|
||||
# jwt stuff
|
||||
jose-jwa = "0.1.2"
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||
use futures::TryFutureExt;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, Instrument};
|
||||
use tracing::{debug, error, info, Instrument};
|
||||
|
||||
use crate::auth::backend::ConsoleRedirectBackend;
|
||||
use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal};
|
||||
@@ -11,7 +11,7 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectionInfo};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo};
|
||||
use crate::proxy::connect_compute::{connect_to_compute, TcpMechanism};
|
||||
use crate::proxy::handshake::{handshake, HandshakeData};
|
||||
use crate::proxy::passthrough::ProxyPassthrough;
|
||||
@@ -49,7 +49,7 @@ pub async fn task_main(
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
|
||||
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
|
||||
connections.spawn(async move {
|
||||
let (socket, peer_addr) = match read_proxy_protocol(socket).await {
|
||||
@@ -57,16 +57,21 @@ pub async fn task_main(
|
||||
error!("per-client task finished with an error: {e:#}");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
// our load balancers will not send any more data. let's just exit immediately
|
||||
Ok((_socket, ConnectHeader::Local)) => {
|
||||
debug!("healthcheck received");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, ConnectHeader::Missing)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
error!("missing required proxy protocol header");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
Ok((_socket, ConnectHeader::Proxy(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
error!("proxy protocol header not supported");
|
||||
return;
|
||||
}
|
||||
Ok((socket, Some(info))) => (socket, info),
|
||||
Ok((socket, None)) => (socket, ConnectionInfo{ addr: peer_addr, extra: None }),
|
||||
Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
|
||||
Ok((socket, ConnectHeader::Missing)) => (socket, ConnectionInfo{ addr: peer_addr, extra: None }),
|
||||
};
|
||||
|
||||
match socket.inner.set_nodelay(true) {
|
||||
|
||||
@@ -11,6 +11,7 @@ use bytes::{Buf, Bytes, BytesMut};
|
||||
use pin_project_lite::pin_project;
|
||||
use strum_macros::FromRepr;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
|
||||
use zerocopy::{FromBytes, FromZeroes};
|
||||
|
||||
pin_project! {
|
||||
/// A chained [`AsyncRead`] with [`AsyncWrite`] passthrough
|
||||
@@ -57,16 +58,31 @@ impl<T: AsyncWrite> AsyncWrite for ChainRW<T> {
|
||||
}
|
||||
|
||||
/// Proxy Protocol Version 2 Header
|
||||
const HEADER: [u8; 12] = [
|
||||
const SIGNATURE: [u8; 12] = [
|
||||
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
|
||||
];
|
||||
|
||||
const LOCAL_V2: u8 = 0x20;
|
||||
const PROXY_V2: u8 = 0x21;
|
||||
|
||||
const TCP_OVER_IPV4: u8 = 0x11;
|
||||
const UDP_OVER_IPV4: u8 = 0x12;
|
||||
const TCP_OVER_IPV6: u8 = 0x21;
|
||||
const UDP_OVER_IPV6: u8 = 0x22;
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||
pub struct ConnectionInfo {
|
||||
pub addr: SocketAddr,
|
||||
pub extra: Option<ConnectionInfoExtra>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||
pub enum ConnectHeader {
|
||||
Missing,
|
||||
Local,
|
||||
Proxy(ConnectionInfo),
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionInfo {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match &self.extra {
|
||||
@@ -89,96 +105,31 @@ pub enum ConnectionInfoExtra {
|
||||
|
||||
pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
mut read: T,
|
||||
) -> std::io::Result<(ChainRW<T>, Option<ConnectionInfo>)> {
|
||||
) -> std::io::Result<(ChainRW<T>, ConnectHeader)> {
|
||||
let mut buf = BytesMut::with_capacity(128);
|
||||
while buf.len() < 16 {
|
||||
let header = loop {
|
||||
let bytes_read = read.read_buf(&mut buf).await?;
|
||||
|
||||
// exit for bad header
|
||||
let len = usize::min(buf.len(), HEADER.len());
|
||||
if buf[..len] != HEADER[..len] {
|
||||
return Ok((ChainRW { inner: read, buf }, None));
|
||||
// exit for bad header signature
|
||||
let len = usize::min(buf.len(), SIGNATURE.len());
|
||||
if buf[..len] != SIGNATURE[..len] {
|
||||
return Ok((ChainRW { inner: read, buf }, ConnectHeader::Missing));
|
||||
}
|
||||
|
||||
// if no more bytes available then exit
|
||||
if bytes_read == 0 {
|
||||
return Ok((ChainRW { inner: read, buf }, None));
|
||||
return Ok((ChainRW { inner: read, buf }, ConnectHeader::Missing));
|
||||
};
|
||||
}
|
||||
|
||||
let header = buf.split_to(16);
|
||||
|
||||
// The next byte (the 13th one) is the protocol version and command.
|
||||
// The highest four bits contains the version. As of this specification, it must
|
||||
// always be sent as \x2 and the receiver must only accept this value.
|
||||
let vc = header[12];
|
||||
let version = vc >> 4;
|
||||
let command = vc & 0b1111;
|
||||
if version != 2 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol version. expected version 2",
|
||||
));
|
||||
}
|
||||
match command {
|
||||
// the connection was established on purpose by the proxy
|
||||
// without being relayed. The connection endpoints are the sender and the
|
||||
// receiver. Such connections exist when the proxy sends health-checks to the
|
||||
// server. The receiver must accept this connection as valid and must use the
|
||||
// real connection endpoints and discard the protocol block including the
|
||||
// family which is ignored.
|
||||
0 => {}
|
||||
// the connection was established on behalf of another node,
|
||||
// and reflects the original connection endpoints. The receiver must then use
|
||||
// the information provided in the protocol block to get original the address.
|
||||
1 => {}
|
||||
// other values are unassigned and must not be emitted by senders. Receivers
|
||||
// must drop connections presenting unexpected values here.
|
||||
_ => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol command. expected local (0) or proxy (1)",
|
||||
))
|
||||
// check if we have enough bytes to continue
|
||||
if let Some(header) = buf.try_get::<ProxyProtocolV2Header>() {
|
||||
break header;
|
||||
}
|
||||
};
|
||||
|
||||
// The 14th byte contains the transport protocol and address family. The highest 4
|
||||
// bits contain the address family, the lowest 4 bits contain the protocol.
|
||||
let ft = header[13];
|
||||
let address_length = match ft {
|
||||
// - \x11 : TCP over IPv4 : the forwarded connection uses TCP over the AF_INET
|
||||
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
|
||||
// - \x12 : UDP over IPv4 : the forwarded connection uses UDP over the AF_INET
|
||||
// protocol family. Address length is 2*4 + 2*2 = 12 bytes.
|
||||
0x11 | 0x12 => 12,
|
||||
// - \x21 : TCP over IPv6 : the forwarded connection uses TCP over the AF_INET6
|
||||
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
||||
// - \x22 : UDP over IPv6 : the forwarded connection uses UDP over the AF_INET6
|
||||
// protocol family. Address length is 2*16 + 2*2 = 36 bytes.
|
||||
0x21 | 0x22 => 36,
|
||||
// unspecified or unix stream. ignore the addresses
|
||||
_ => 0,
|
||||
};
|
||||
let remaining_length = usize::from(header.len.get());
|
||||
|
||||
// The 15th and 16th bytes is the address length in bytes in network endian order.
|
||||
// It is used so that the receiver knows how many address bytes to skip even when
|
||||
// it does not implement the presented protocol. Thus the length of the protocol
|
||||
// header in bytes is always exactly 16 + this value. When a sender presents a
|
||||
// LOCAL connection, it should not present any address so it sets this field to
|
||||
// zero. Receivers MUST always consider this field to skip the appropriate number
|
||||
// of bytes and must not assume zero is presented for LOCAL connections. When a
|
||||
// receiver accepts an incoming connection showing an UNSPEC address family or
|
||||
// protocol, it may or may not decide to log the address information if present.
|
||||
let remaining_length = u16::from_be_bytes(header[14..16].try_into().unwrap());
|
||||
if remaining_length < address_length {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol length. not enough to fit requested IP addresses",
|
||||
));
|
||||
}
|
||||
drop(header);
|
||||
|
||||
while buf.len() < remaining_length as usize {
|
||||
while buf.len() < remaining_length {
|
||||
if read.read_buf(&mut buf).await? == 0 {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
@@ -186,36 +137,69 @@ pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
));
|
||||
}
|
||||
}
|
||||
let payload = buf.split_to(remaining_length);
|
||||
|
||||
// Starting from the 17th byte, addresses are presented in network byte order.
|
||||
// The address order is always the same :
|
||||
// - source layer 3 address in network byte order
|
||||
// - destination layer 3 address in network byte order
|
||||
// - source layer 4 address if any, in network byte order (port)
|
||||
// - destination layer 4 address if any, in network byte order (port)
|
||||
let mut header = buf.split_to(usize::from(remaining_length));
|
||||
let mut addr = header.split_to(usize::from(address_length));
|
||||
let socket = match addr.len() {
|
||||
12 => {
|
||||
let src_addr = Ipv4Addr::from_bits(addr.get_u32());
|
||||
let _dst_addr = Ipv4Addr::from_bits(addr.get_u32());
|
||||
let src_port = addr.get_u16();
|
||||
let _dst_port = addr.get_u16();
|
||||
Some(SocketAddr::from((src_addr, src_port)))
|
||||
let res = process_proxy_payload(header, payload)?;
|
||||
Ok((ChainRW { inner: read, buf }, res))
|
||||
}
|
||||
|
||||
fn process_proxy_payload(
|
||||
header: ProxyProtocolV2Header,
|
||||
mut payload: BytesMut,
|
||||
) -> std::io::Result<ConnectHeader> {
|
||||
match header.version_and_command {
|
||||
// the connection was established on purpose by the proxy
|
||||
// without being relayed. The connection endpoints are the sender and the
|
||||
// receiver. Such connections exist when the proxy sends health-checks to the
|
||||
// server. The receiver must accept this connection as valid and must use the
|
||||
// real connection endpoints and discard the protocol block including the
|
||||
// family which is ignored.
|
||||
LOCAL_V2 => return Ok(ConnectHeader::Local),
|
||||
// the connection was established on behalf of another node,
|
||||
// and reflects the original connection endpoints. The receiver must then use
|
||||
// the information provided in the protocol block to get original the address.
|
||||
PROXY_V2 => {}
|
||||
// other values are unassigned and must not be emitted by senders. Receivers
|
||||
// must drop connections presenting unexpected values here.
|
||||
#[rustfmt::skip] // https://github.com/rust-lang/rustfmt/issues/6384
|
||||
_ => return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"invalid proxy protocol command 0x{:02X}. expected local (0x20) or proxy (0x21)",
|
||||
header.version_and_command
|
||||
),
|
||||
)),
|
||||
};
|
||||
|
||||
let size_err =
|
||||
"invalid proxy protocol length. payload not large enough to fit requested IP addresses";
|
||||
let addr = match header.protocol_and_family {
|
||||
TCP_OVER_IPV4 | UDP_OVER_IPV4 => {
|
||||
let addr = payload
|
||||
.try_get::<ProxyProtocolV2HeaderV4>()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
|
||||
|
||||
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
|
||||
}
|
||||
36 => {
|
||||
let src_addr = Ipv6Addr::from_bits(addr.get_u128());
|
||||
let _dst_addr = Ipv6Addr::from_bits(addr.get_u128());
|
||||
let src_port = addr.get_u16();
|
||||
let _dst_port = addr.get_u16();
|
||||
Some(SocketAddr::from((src_addr, src_port)))
|
||||
TCP_OVER_IPV6 | UDP_OVER_IPV6 => {
|
||||
let addr = payload
|
||||
.try_get::<ProxyProtocolV2HeaderV6>()
|
||||
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, size_err))?;
|
||||
|
||||
SocketAddr::from((addr.src_addr.get(), addr.src_port.get()))
|
||||
}
|
||||
// unspecified or unix stream. ignore the addresses
|
||||
_ => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
"invalid proxy protocol address family/transport protocol.",
|
||||
))
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let mut extra = None;
|
||||
|
||||
while let Some(mut tlv) = read_tlv(&mut header) {
|
||||
while let Some(mut tlv) = read_tlv(&mut payload) {
|
||||
match Pp2Kind::from_repr(tlv.kind) {
|
||||
Some(Pp2Kind::Aws) => {
|
||||
if tlv.value.is_empty() {
|
||||
@@ -259,9 +243,7 @@ pub(crate) async fn read_proxy_protocol<T: AsyncRead + Unpin>(
|
||||
}
|
||||
}
|
||||
|
||||
let conn_info = socket.map(|addr| ConnectionInfo { addr, extra });
|
||||
|
||||
Ok((ChainRW { inner: read, buf }, conn_info))
|
||||
Ok(ConnectHeader::Proxy(ConnectionInfo { addr, extra }))
|
||||
}
|
||||
|
||||
#[derive(FromRepr, Debug, Copy, Clone)]
|
||||
@@ -337,27 +319,93 @@ struct Tlv {
|
||||
}
|
||||
|
||||
fn read_tlv(b: &mut BytesMut) -> Option<Tlv> {
|
||||
if b.len() < 3 {
|
||||
return None;
|
||||
}
|
||||
let kind = b.get_u8();
|
||||
let len = usize::from(b.get_u16());
|
||||
let tlv_header = b.try_get::<TlvHeader>()?;
|
||||
let len = usize::from(tlv_header.len.get());
|
||||
if b.len() < len {
|
||||
return None;
|
||||
}
|
||||
let value = b.split_to(len).freeze();
|
||||
Some(Tlv { kind, value })
|
||||
Some(Tlv {
|
||||
kind: tlv_header.kind,
|
||||
value: b.split_to(len).freeze(),
|
||||
})
|
||||
}
|
||||
|
||||
trait BufExt: Sized {
|
||||
fn try_get<T: FromBytes>(&mut self) -> Option<T>;
|
||||
}
|
||||
impl BufExt for BytesMut {
|
||||
fn try_get<T: FromBytes>(&mut self) -> Option<T> {
|
||||
let res = T::read_from_prefix(self)?;
|
||||
self.advance(size_of::<T>());
|
||||
Some(res)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
struct ProxyProtocolV2Header {
|
||||
signature: [u8; 12],
|
||||
version_and_command: u8,
|
||||
protocol_and_family: u8,
|
||||
len: zerocopy::byteorder::network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
struct ProxyProtocolV2HeaderV4 {
|
||||
src_addr: NetworkEndianIpv4,
|
||||
dst_addr: NetworkEndianIpv4,
|
||||
src_port: zerocopy::byteorder::network_endian::U16,
|
||||
dst_port: zerocopy::byteorder::network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
struct ProxyProtocolV2HeaderV6 {
|
||||
src_addr: NetworkEndianIpv6,
|
||||
dst_addr: NetworkEndianIpv6,
|
||||
src_port: zerocopy::byteorder::network_endian::U16,
|
||||
dst_port: zerocopy::byteorder::network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(C)]
|
||||
struct TlvHeader {
|
||||
kind: u8,
|
||||
len: zerocopy::byteorder::network_endian::U16,
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(transparent)]
|
||||
struct NetworkEndianIpv4(zerocopy::byteorder::network_endian::U32);
|
||||
impl NetworkEndianIpv4 {
|
||||
#[inline]
|
||||
fn get(self) -> Ipv4Addr {
|
||||
Ipv4Addr::from_bits(self.0.get())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(FromBytes, FromZeroes, Copy, Clone)]
|
||||
#[repr(transparent)]
|
||||
struct NetworkEndianIpv6(zerocopy::byteorder::network_endian::U128);
|
||||
impl NetworkEndianIpv6 {
|
||||
#[inline]
|
||||
fn get(self) -> Ipv6Addr {
|
||||
Ipv6Addr::from_bits(self.0.get())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use crate::protocol2::read_proxy_protocol;
|
||||
use crate::protocol2::{
|
||||
read_proxy_protocol, ConnectHeader, LOCAL_V2, PROXY_V2, TCP_OVER_IPV4, UDP_OVER_IPV6,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ipv4() {
|
||||
let header = super::HEADER
|
||||
let header = super::SIGNATURE
|
||||
// Proxy command, IPV4 | TCP
|
||||
.chain([(2 << 4) | 1, (1 << 4) | 1].as_slice())
|
||||
// 12 + 3 bytes
|
||||
@@ -384,15 +432,17 @@ mod tests {
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
let ConnectHeader::Proxy(info) = info else {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(info.addr, ([127, 0, 0, 1], 65535).into());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_ipv6() {
|
||||
let header = super::HEADER
|
||||
let header = super::SIGNATURE
|
||||
// Proxy command, IPV6 | UDP
|
||||
.chain([(2 << 4) | 1, (2 << 4) | 2].as_slice())
|
||||
.chain([PROXY_V2, UDP_OVER_IPV6].as_slice())
|
||||
// 36 + 3 bytes
|
||||
.chain([0, 39].as_slice())
|
||||
// src ip
|
||||
@@ -417,7 +467,9 @@ mod tests {
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
let ConnectHeader::Proxy(info) = info else {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(
|
||||
info.addr,
|
||||
([15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0], 257).into()
|
||||
@@ -433,7 +485,7 @@ mod tests {
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(info, None);
|
||||
assert_eq!(info, ConnectHeader::Missing);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -445,7 +497,7 @@ mod tests {
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
assert_eq!(bytes, data);
|
||||
assert_eq!(info, None);
|
||||
assert_eq!(info, ConnectHeader::Missing);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -454,9 +506,9 @@ mod tests {
|
||||
let tlv_len = (tlv.len() as u16).to_be_bytes();
|
||||
let len = (12 + 3 + tlv.len() as u16).to_be_bytes();
|
||||
|
||||
let header = super::HEADER
|
||||
let header = super::SIGNATURE
|
||||
// Proxy command, Inet << 4 | Stream
|
||||
.chain([(2 << 4) | 1, (1 << 4) | 1].as_slice())
|
||||
.chain([PROXY_V2, TCP_OVER_IPV4].as_slice())
|
||||
// 12 + 3 bytes
|
||||
.chain(len.as_slice())
|
||||
// src ip
|
||||
@@ -483,7 +535,30 @@ mod tests {
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let info = info.unwrap();
|
||||
let ConnectHeader::Proxy(info) = info else {
|
||||
panic!()
|
||||
};
|
||||
assert_eq!(info.addr, ([55, 56, 57, 58], 65535).into());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_local() {
|
||||
let len = 0u16.to_be_bytes();
|
||||
let header = super::SIGNATURE
|
||||
.chain([LOCAL_V2, 0x00].as_slice())
|
||||
.chain(len.as_slice());
|
||||
|
||||
let extra_data = [0xaa; 256];
|
||||
|
||||
let (mut read, info) = read_proxy_protocol(header.chain(extra_data.as_slice()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut bytes = vec![];
|
||||
read.read_to_end(&mut bytes).await.unwrap();
|
||||
|
||||
assert_eq!(bytes, extra_data);
|
||||
|
||||
let ConnectHeader::Local = info else { panic!() };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use smol_str::{format_smolstr, SmolStr};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{error, info, warn, Instrument};
|
||||
use tracing::{debug, error, info, warn, Instrument};
|
||||
|
||||
use self::connect_compute::{connect_to_compute, TcpMechanism};
|
||||
use self::passthrough::ProxyPassthrough;
|
||||
@@ -28,7 +28,7 @@ use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::error::ReportableError;
|
||||
use crate::metrics::{Metrics, NumClientConnectionsGuard};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectionInfo};
|
||||
use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo};
|
||||
use crate::proxy::handshake::{handshake, HandshakeData};
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::stream::{PqStream, Stream};
|
||||
@@ -83,7 +83,7 @@ pub async fn task_main(
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
let cancellation_handler = Arc::clone(&cancellation_handler);
|
||||
|
||||
tracing::info!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
debug!(protocol = "tcp", %session_id, "accepted new TCP connection");
|
||||
let endpoint_rate_limiter2 = endpoint_rate_limiter.clone();
|
||||
|
||||
connections.spawn(async move {
|
||||
@@ -92,16 +92,21 @@ pub async fn task_main(
|
||||
warn!("per-client task finished with an error: {e:#}");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, None)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
// our load balancers will not send any more data. let's just exit immediately
|
||||
Ok((_socket, ConnectHeader::Local)) => {
|
||||
debug!("healthcheck received");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, ConnectHeader::Missing)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
warn!("missing required proxy protocol header");
|
||||
return;
|
||||
}
|
||||
Ok((_socket, Some(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
Ok((_socket, ConnectHeader::Proxy(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
warn!("proxy protocol header not supported");
|
||||
return;
|
||||
}
|
||||
Ok((socket, Some(info))) => (socket, info),
|
||||
Ok((socket, None)) => (socket, ConnectionInfo { addr: peer_addr, extra: None }),
|
||||
Ok((socket, ConnectHeader::Proxy(info))) => (socket, info),
|
||||
Ok((socket, ConnectHeader::Missing)) => (socket, ConnectionInfo { addr: peer_addr, extra: None }),
|
||||
};
|
||||
|
||||
match socket.inner.set_nodelay(true) {
|
||||
|
||||
@@ -47,7 +47,7 @@ use crate::cancellation::CancellationHandlerMain;
|
||||
use crate::config::{ProxyConfig, ProxyProtocolV2};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::Metrics;
|
||||
use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectionInfo};
|
||||
use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo};
|
||||
use crate::proxy::run_until_cancelled;
|
||||
use crate::rate_limiter::EndpointRateLimiter;
|
||||
use crate::serverless::backend::PoolingBackend;
|
||||
@@ -251,16 +251,21 @@ async fn connection_startup(
|
||||
};
|
||||
|
||||
let conn_info = match peer {
|
||||
None if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
// our load balancers will not send any more data. let's just exit immediately
|
||||
ConnectHeader::Local => {
|
||||
tracing::debug!("healthcheck received");
|
||||
return None;
|
||||
}
|
||||
ConnectHeader::Missing if config.proxy_protocol_v2 == ProxyProtocolV2::Required => {
|
||||
tracing::warn!("missing required proxy protocol header");
|
||||
return None;
|
||||
}
|
||||
Some(_) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
ConnectHeader::Proxy(_) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => {
|
||||
tracing::warn!("proxy protocol header not supported");
|
||||
return None;
|
||||
}
|
||||
Some(info) => info,
|
||||
None => ConnectionInfo {
|
||||
ConnectHeader::Proxy(info) => info,
|
||||
ConnectHeader::Missing => ConnectionInfo {
|
||||
addr: peer_addr,
|
||||
extra: None,
|
||||
},
|
||||
|
||||
@@ -88,6 +88,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
tracing-core = { version = "0.1" }
|
||||
url = { version = "2", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zeroize = { version = "1", features = ["derive", "serde"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
@@ -126,6 +127,7 @@ serde = { version = "1", features = ["alloc", "derive"] }
|
||||
syn = { version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
|
||||
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
zerocopy = { version = "0.7", features = ["derive", "simd"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
|
||||
Reference in New Issue
Block a user