diff --git a/proxy/src/http/conn_pool.rs b/proxy/src/http/conn_pool.rs index a7ef15d342..7258f1a2cf 100644 --- a/proxy/src/http/conn_pool.rs +++ b/proxy/src/http/conn_pool.rs @@ -20,6 +20,7 @@ use tokio_postgres::AsyncMessage; use crate::{ auth, console, metrics::{Ids, MetricCounter, USAGE_METRICS}, + proxy::{NUM_DB_CONNECTIONS_CLOSED_COUNTER, NUM_DB_CONNECTIONS_OPENED_COUNTER}, }; use crate::{compute, config}; @@ -418,36 +419,42 @@ async fn connect_to_compute_once( }; tokio::spawn( - poll_fn(move |cx| { - if matches!(rx.has_changed(), Ok(true)) { - session = *rx.borrow_and_update(); - info!(%session, "changed session"); + async move { + NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); } + poll_fn(move |cx| { + if matches!(rx.has_changed(), Ok(true)) { + session = *rx.borrow_and_update(); + info!(%session, "changed session"); + } - loop { - let message = ready!(connection.poll_message(cx)); + loop { + let message = ready!(connection.poll_message(cx)); - match message { - Some(Ok(AsyncMessage::Notice(notice))) => { - info!(%session, "notice: {}", notice); - } - Some(Ok(AsyncMessage::Notification(notif))) => { - warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); - } - Some(Ok(_)) => { - warn!(%session, "unknown message"); - } - Some(Err(e)) => { - error!(%session, "connection error: {}", e); - return Poll::Ready(()) - } - None => { - info!("connection closed"); - return Poll::Ready(()) + match message { + Some(Ok(AsyncMessage::Notice(notice))) => { + info!(%session, "notice: {}", notice); + } + Some(Ok(AsyncMessage::Notification(notif))) => { + warn!(%session, pid = notif.process_id(), channel = notif.channel(), "notification received"); + } + Some(Ok(_)) => { + warn!(%session, "unknown message"); + } + Some(Err(e)) => { + error!(%session, "connection error: {}", e); + return Poll::Ready(()) + } + None => { + info!("connection closed"); + return Poll::Ready(()) + } } } - } - }) + }).await + } .instrument(span) ); diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index ffc788e873..fbf19bcb50 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -24,6 +24,8 @@ use url::Url; use utils::http::error::ApiError; use utils::http::json::json_response; +use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER}; + use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; @@ -245,6 +247,13 @@ async fn handle_inner( conn_pool: Arc, session_id: uuid::Uuid, ) -> anyhow::Result> { + NUM_CONNECTIONS_ACCEPTED_COUNTER + .with_label_values(&["http"]) + .inc(); + scopeguard::defer! { + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); + } + // // Determine the destination and connection params // diff --git a/proxy/src/http/websocket.rs b/proxy/src/http/websocket.rs index 994a7de764..ddebbccabc 100644 --- a/proxy/src/http/websocket.rs +++ b/proxy/src/http/websocket.rs @@ -3,7 +3,10 @@ use crate::{ config::ProxyConfig, error::io_error, protocol2::{ProxyProtocolAccept, WithClientIp}, - proxy::{handle_client, ClientMode}, + proxy::{ + handle_client, ClientMode, NUM_CLIENT_CONNECTION_CLOSED_COUNTER, + NUM_CLIENT_CONNECTION_OPENED_COUNTER, + }, }; use bytes::{Buf, Bytes}; use futures::{Sink, Stream, StreamExt}; @@ -275,23 +278,25 @@ pub async fn task_main( let conn_pool = conn_pool.clone(); async move { - Ok::<_, Infallible>(hyper::service::service_fn(move |req: Request| { - let sni_name = sni_name.clone(); - let conn_pool = conn_pool.clone(); + Ok::<_, Infallible>(MetricService::new(hyper::service::service_fn( + move |req: Request| { + let sni_name = sni_name.clone(); + let conn_pool = conn_pool.clone(); - async move { - let cancel_map = Arc::new(CancelMap::default()); - let session_id = uuid::Uuid::new_v4(); + async move { + let cancel_map = Arc::new(CancelMap::default()); + let session_id = uuid::Uuid::new_v4(); - ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) - .instrument(info_span!( - "ws-client", - session = %session_id, - %peer_addr, - )) - .await - } - })) + ws_handler(req, config, conn_pool, cancel_map, session_id, sni_name) + .instrument(info_span!( + "ws-client", + session = %session_id, + %peer_addr, + )) + .await + } + }, + ))) } }, ); @@ -303,3 +308,41 @@ pub async fn task_main( Ok(()) } + +struct MetricService { + inner: S, +} + +impl MetricService { + fn new(inner: S) -> MetricService { + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&["http"]) + .inc(); + MetricService { inner } + } +} + +impl Drop for MetricService { + fn drop(&mut self) { + NUM_CLIENT_CONNECTION_CLOSED_COUNTER + .with_label_values(&["http"]) + .inc(); + } +} + +impl hyper::service::Service> for MetricService +where + S: hyper::service::Service>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.inner.call(req) + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index f1343ae014..9cfe98347b 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -39,19 +39,55 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( - "proxy_accepted_connections_total", - "Number of TCP client connections accepted.", + "proxy_opened_db_connections_total", + "Number of opened connections to a database.", &["protocol"], ) .unwrap() }); -static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { +pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_db_connections_total", + "Number of closed connections to a database.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_opened_client_connections_total", + "Number of opened connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_closed_client_connections_total", + "Number of closed connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_accepted_connections_total", + "Number of client connections accepted.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { register_int_counter_vec!( "proxy_closed_connections_total", - "Number of TCP client connections closed.", + "Number of client connections closed.", &["protocol"], ) .unwrap() @@ -218,12 +254,16 @@ pub async fn handle_client( "handling interactive connection from client" ); - // The `closed` counter will increase when this future is destroyed. + let proto = mode.protocol_label(); + NUM_CLIENT_CONNECTION_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); NUM_CONNECTIONS_ACCEPTED_COUNTER - .with_label_values(&[mode.protocol_label()]) + .with_label_values(&[proto]) .inc(); scopeguard::defer! { - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[mode.protocol_label()]).inc(); + NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); } let tls = config.tls_config.as_ref(); @@ -258,7 +298,7 @@ pub async fn handle_client( mode.allow_self_signed_compute(config), ); cancel_map - .with_session(|session| client.connect_to_db(session, mode.allow_cleartext())) + .with_session(|session| client.connect_to_db(session, mode)) .await } @@ -734,7 +774,7 @@ impl Client<'_, S> { async fn connect_to_db( self, session: cancellation::Session<'_>, - allow_cleartext: bool, + mode: ClientMode, ) -> anyhow::Result<()> { let Self { mut stream, @@ -750,7 +790,7 @@ impl Client<'_, S> { }; let auth_result = match creds - .authenticate(&extra, &mut stream, allow_cleartext) + .authenticate(&extra, &mut stream, mode.allow_cleartext()) .await { Ok(auth_result) => auth_result, @@ -776,6 +816,14 @@ impl Client<'_, S> { .or_else(|e| stream.throw_error(e)) .await?; + let proto = mode.protocol_label(); + NUM_DB_CONNECTIONS_OPENED_COUNTER + .with_label_values(&[proto]) + .inc(); + scopeguard::defer! { + NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); + } + prepare_client_connection(&node, reported_auth_ok, session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the // PqStream input buffer. Normally there is none, but our serverless npm