mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-05 06:20:37 +00:00
proxy: more connection metrics (#5464)
## Problem Hard to tell 1. How many clients are connected to proxy 2. How many requests clients are making 3. How many connections are made to a database 1 and 2 are different because of the properties of HTTP. We have 2 already tracked through `proxy_accepted_connections_total` and `proxy_closed_connections_total`, but nothing for 1 and 3 ## Summary of changes Adds 2 new counter gauges. * `proxy_opened_client_connections_total`,`proxy_closed_client_connections_total` - how many client connections are open to proxy * `proxy_opened_db_connections_total`,`proxy_closed_db_connections_total` - how many active connections are made through to a database. For TCP and Websockets, we expect all 3 of these quantities to be roughly the same, barring users connecting but with invalid details. For HTTP: * client_connections/connections can differ because the client connections can be reused. * connections/db_connections can differ because of connection pooling.
This commit is contained in:
@@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage;
|
||||
use crate::{
|
||||
auth, console,
|
||||
metrics::{Ids, MetricCounter, USAGE_METRICS},
|
||||
proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER},
|
||||
};
|
||||
use crate::{compute, config};
|
||||
|
||||
@@ -418,36 +419,42 @@ async fn connect_to_compute_once(
|
||||
};
|
||||
|
||||
tokio::spawn(
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
async move {
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
}
|
||||
poll_fn(move |cx| {
|
||||
if matches!(rx.has_changed(), Ok(true)) {
|
||||
session = *rx.borrow_and_update();
|
||||
info!(%session, "changed session");
|
||||
}
|
||||
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
loop {
|
||||
let message = ready!(connection.poll_message(cx));
|
||||
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
match message {
|
||||
Some(Ok(AsyncMessage::Notice(notice))) => {
|
||||
info!(%session, "notice: {}", notice);
|
||||
}
|
||||
Some(Ok(AsyncMessage::Notification(notif))) => {
|
||||
warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received");
|
||||
}
|
||||
Some(Ok(_)) => {
|
||||
warn!(%session, "unknown message");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!(%session, "connection error: {}", e);
|
||||
return Poll::Ready(())
|
||||
}
|
||||
None => {
|
||||
info!("connection closed");
|
||||
return Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}).await
|
||||
}
|
||||
.instrument(span)
|
||||
);
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ use url::Url;
|
||||
use utils::http::error::ApiError;
|
||||
use utils::http::json::json_response;
|
||||
|
||||
use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER};
|
||||
|
||||
use super::conn_pool::ConnInfo;
|
||||
use super::conn_pool::GlobalConnPool;
|
||||
|
||||
@@ -245,6 +247,13 @@ async fn handle_inner(
|
||||
conn_pool: Arc<GlobalConnPool>,
|
||||
session_id: uuid::Uuid,
|
||||
) -> anyhow::Result<Response<Body>> {
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc();
|
||||
}
|
||||
|
||||
//
|
||||
// Determine the destination and connection params
|
||||
//
|
||||
|
||||
@@ -3,7 +3,10 @@ use crate::{
|
||||
config::ProxyConfig,
|
||||
error::io_error,
|
||||
protocol2::{ProxyProtocolAccept, WithClientIp},
|
||||
proxy::{handle_client, ClientMode},
|
||||
proxy::{
|
||||
handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER,
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER,
|
||||
},
|
||||
};
|
||||
use bytes::{Buf, Bytes};
|
||||
use futures::{Sink, Stream, StreamExt};
|
||||
@@ -275,23 +278,25 @@ pub async fn task_main(
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn(
|
||||
move |req: Request<Body>| {
|
||||
let sni_name = sni_name.clone();
|
||||
let conn_pool = conn_pool.clone();
|
||||
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
async move {
|
||||
let cancel_map = Arc::new(CancelMap::default());
|
||||
let session_id = uuid::Uuid::new_v4();
|
||||
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
}))
|
||||
ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name)
|
||||
.instrument(info_span!(
|
||||
"ws-client",
|
||||
session = %session_id,
|
||||
%peer_addr,
|
||||
))
|
||||
.await
|
||||
}
|
||||
},
|
||||
)))
|
||||
}
|
||||
},
|
||||
);
|
||||
@@ -303,3 +308,41 @@ pub async fn task_main(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct MetricService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> MetricService<S> {
|
||||
fn new(inner: S) -> MetricService<S> {
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
MetricService { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Drop for MetricService<S> {
|
||||
fn drop(&mut self) {
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER
|
||||
.with_label_values(&["http"])
|
||||
.inc();
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, ReqBody> hyper::service::Service<Request<ReqBody>> for MetricService<S>
|
||||
where
|
||||
S: hyper::service::Service<Request<ReqBody>>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.inner.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Request<ReqBody>) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
|
||||
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
|
||||
const ERR_PROTO_VIOLATION: &str = "protocol violation";
|
||||
|
||||
static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_accepted_connections_total",
|
||||
"Number of TCP client connections accepted.",
|
||||
"proxy_opened_db_connections_total",
|
||||
"Number of opened connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_db_connections_total",
|
||||
"Number of closed connections to a database.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_opened_client_connections_total",
|
||||
"Number of opened connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_client_connections_total",
|
||||
"Number of closed connections from a client.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_accepted_connections_total",
|
||||
"Number of client connections accepted.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
});
|
||||
|
||||
pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"proxy_closed_connections_total",
|
||||
"Number of TCP client connections closed.",
|
||||
"Number of client connections closed.",
|
||||
&["protocol"],
|
||||
)
|
||||
.unwrap()
|
||||
@@ -218,12 +254,16 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
"handling interactive connection from client"
|
||||
);
|
||||
|
||||
// The `closed` counter will increase when this future is destroyed.
|
||||
let proto = mode.protocol_label();
|
||||
NUM_CLIENT_CONNECTION_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
NUM_CONNECTIONS_ACCEPTED_COUNTER
|
||||
.with_label_values(&[mode.protocol_label()])
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc();
|
||||
NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
}
|
||||
|
||||
let tls = config.tls_config.as_ref();
|
||||
@@ -258,7 +298,7 @@ pub async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
mode.allow_self_signed_compute(config),
|
||||
);
|
||||
cancel_map
|
||||
.with_session(|session| client.connect_to_db(session, mode.allow_cleartext()))
|
||||
.with_session(|session| client.connect_to_db(session, mode))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -734,7 +774,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
async fn connect_to_db(
|
||||
self,
|
||||
session: cancellation::Session<'_>,
|
||||
allow_cleartext: bool,
|
||||
mode: ClientMode,
|
||||
) -> anyhow::Result<()> {
|
||||
let Self {
|
||||
mut stream,
|
||||
@@ -750,7 +790,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
};
|
||||
|
||||
let auth_result = match creds
|
||||
.authenticate(&extra, &mut stream, allow_cleartext)
|
||||
.authenticate(&extra, &mut stream, mode.allow_cleartext())
|
||||
.await
|
||||
{
|
||||
Ok(auth_result) => auth_result,
|
||||
@@ -776,6 +816,14 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
|
||||
.or_else(|e| stream.throw_error(e))
|
||||
.await?;
|
||||
|
||||
let proto = mode.protocol_label();
|
||||
NUM_DB_CONNECTIONS_OPENED_COUNTER
|
||||
.with_label_values(&[proto])
|
||||
.inc();
|
||||
scopeguard::defer! {
|
||||
NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc();
|
||||
}
|
||||
|
||||
prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?;
|
||||
// Before proxy passing, forward to compute whatever data is left in the
|
||||
// PqStream input buffer. Normally there is none, but our serverless npm
|
||||
|
||||
Reference in New Issue
Block a user