From cc633585dca4c98e028ba37acf578f7c8cd17c99 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Thu, 14 Dec 2023 17:21:39 +0000 Subject: [PATCH] gauge guards (#6138) ## Problem The websockets gauge for active db connections seems to be growing more than the gauge for client connections over websockets, which does not make sense. ## Summary of changes refactor how our counter-pair gauges are represented. not sure if this will improve the problem, but it should be harder to mess-up the counters. The API is much nicer though now and doesn't require scopeguard::defer hacks --- libs/metrics/src/lib.rs | 139 +++++++++++++++++++++++++- pageserver/src/metrics.rs | 36 +++---- pageserver/src/tenant/tasks.rs | 8 +- proxy/src/compute.rs | 12 ++- proxy/src/proxy.rs | 65 ++++-------- proxy/src/serverless.rs | 22 ++-- proxy/src/serverless/conn_pool.rs | 15 ++- proxy/src/serverless/sql_over_http.rs | 9 +- safekeeper/src/handler.rs | 7 +- safekeeper/src/metrics.rs | 13 +-- 10 files changed, 209 insertions(+), 117 deletions(-) diff --git a/libs/metrics/src/lib.rs b/libs/metrics/src/lib.rs index ed375a152f..d09ba11344 100644 --- a/libs/metrics/src/lib.rs +++ b/libs/metrics/src/lib.rs @@ -3,8 +3,11 @@ //! Otherwise, we might not see all metrics registered via //! a default registry. #![deny(clippy::undocumented_unsafe_blocks)] + use once_cell::sync::Lazy; -use prometheus::core::{AtomicU64, Collector, GenericGauge, GenericGaugeVec}; +use prometheus::core::{ + Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec, +}; pub use prometheus::opts; pub use prometheus::register; pub use prometheus::Error; @@ -132,3 +135,137 @@ fn get_rusage_stats() -> libc::rusage { rusage.assume_init() } } + +/// Create an [`IntCounterPairVec`] and registers to default registry. +#[macro_export(local_inner_macros)] +macro_rules! register_int_counter_pair_vec { + ($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr, $LABELS_NAMES:expr $(,)?) => {{ + match ( + $crate::register_int_counter_vec!($NAME1, $HELP1, $LABELS_NAMES), + $crate::register_int_counter_vec!($NAME2, $HELP2, $LABELS_NAMES), + ) { + (Ok(inc), Ok(dec)) => Ok($crate::IntCounterPairVec::new(inc, dec)), + (Err(e), _) | (_, Err(e)) => Err(e), + } + }}; +} +/// Create an [`IntCounterPair`] and registers to default registry. +#[macro_export(local_inner_macros)] +macro_rules! register_int_counter_pair { + ($NAME1:expr, $HELP1:expr, $NAME2:expr, $HELP2:expr $(,)?) => {{ + match ( + $crate::register_int_counter!($NAME1, $HELP1), + $crate::register_int_counter!($NAME2, $HELP2), + ) { + (Ok(inc), Ok(dec)) => Ok($crate::IntCounterPair::new(inc, dec)), + (Err(e), _) | (_, Err(e)) => Err(e), + } + }}; +} + +/// A Pair of [`GenericCounterVec`]s. Like an [`GenericGaugeVec`] but will always observe changes +pub struct GenericCounterPairVec { + inc: GenericCounterVec

, + dec: GenericCounterVec

, +} + +/// A Pair of [`GenericCounter`]s. Like an [`GenericGauge`] but will always observe changes +pub struct GenericCounterPair { + inc: GenericCounter

, + dec: GenericCounter

, +} + +impl GenericCounterPairVec

{ + pub fn new(inc: GenericCounterVec

, dec: GenericCounterVec

) -> Self { + Self { inc, dec } + } + + /// `get_metric_with_label_values` returns the [`GenericCounterPair

`] for the given slice + /// of label values (same order as the VariableLabels in Desc). If that combination of + /// label values is accessed for the first time, a new [`GenericCounterPair

`] is created. + /// + /// An error is returned if the number of label values is not the same as the + /// number of VariableLabels in Desc. + pub fn get_metric_with_label_values(&self, vals: &[&str]) -> Result> { + Ok(GenericCounterPair { + inc: self.inc.get_metric_with_label_values(vals)?, + dec: self.dec.get_metric_with_label_values(vals)?, + }) + } + + /// `with_label_values` works as `get_metric_with_label_values`, but panics if an error + /// occurs. + pub fn with_label_values(&self, vals: &[&str]) -> GenericCounterPair

{ + self.get_metric_with_label_values(vals).unwrap() + } +} + +impl GenericCounterPair

{ + pub fn new(inc: GenericCounter

, dec: GenericCounter

) -> Self { + Self { inc, dec } + } + + /// Increment the gauge by 1, returning a guard that decrements by 1 on drop. + pub fn guard(&self) -> GenericCounterPairGuard

{ + self.inc.inc(); + GenericCounterPairGuard(self.dec.clone()) + } + + /// Increment the gauge by n, returning a guard that decrements by n on drop. + pub fn guard_by(&self, n: P::T) -> GenericCounterPairGuardBy

{ + self.inc.inc_by(n); + GenericCounterPairGuardBy(self.dec.clone(), n) + } + + /// Increase the gauge by 1. + #[inline] + pub fn inc(&self) { + self.inc.inc(); + } + + /// Decrease the gauge by 1. + #[inline] + pub fn dec(&self) { + self.dec.inc(); + } + + /// Add the given value to the gauge. (The value can be + /// negative, resulting in a decrement of the gauge.) + #[inline] + pub fn inc_by(&self, v: P::T) { + self.inc.inc_by(v); + } + + /// Subtract the given value from the gauge. (The value can be + /// negative, resulting in an increment of the gauge.) + #[inline] + pub fn dec_by(&self, v: P::T) { + self.dec.inc_by(v); + } +} + +/// Guard returned by [`GenericCounterPair::guard`] +pub struct GenericCounterPairGuard(GenericCounter

); + +impl Drop for GenericCounterPairGuard

{ + fn drop(&mut self) { + self.0.inc(); + } +} +/// Guard returned by [`GenericCounterPair::guard_by`] +pub struct GenericCounterPairGuardBy(GenericCounter

, P::T); + +impl Drop for GenericCounterPairGuardBy

{ + fn drop(&mut self) { + self.0.inc_by(self.1); + } +} + +/// A Pair of [`IntCounterVec`]s. Like an [`IntGaugeVec`] but will always observe changes +pub type IntCounterPairVec = GenericCounterPairVec; + +/// A Pair of [`IntCounter`]s. Like an [`IntGauge`] but will always observe changes +pub type IntCounterPair = GenericCounterPair; + +/// A guard for [`IntCounterPair`] that will decrement the gauge on drop +pub type IntCounterPairGuard = GenericCounterPairGuard; diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index eefe295f94..ba6fd00bd1 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2,9 +2,10 @@ use enum_map::EnumMap; use metrics::metric_vec_duration::DurationResultObserver; use metrics::{ register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec, - register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, - register_uint_gauge, register_uint_gauge_vec, Counter, CounterVec, GaugeVec, Histogram, - HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, + register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, + register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec, + Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPairVec, + IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec, }; use once_cell::sync::Lazy; use pageserver_api::shard::TenantShardId; @@ -1343,25 +1344,16 @@ pub(crate) static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { .expect("Failed to register tenant_task_events metric") }); -pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT: Lazy = - Lazy::new(|| { - register_int_counter_vec!( - "pageserver_background_loop_semaphore_wait_start_count", - "Counter for background loop concurrency-limiting semaphore acquire calls started", - &["task"], - ) - .unwrap() - }); - -pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT: Lazy = - Lazy::new(|| { - register_int_counter_vec!( - "pageserver_background_loop_semaphore_wait_finish_count", - "Counter for background loop concurrency-limiting semaphore acquire calls finished", - &["task"], - ) - .unwrap() - }); +pub(crate) static BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "pageserver_background_loop_semaphore_wait_start_count", + "Counter for background loop concurrency-limiting semaphore acquire calls started", + "pageserver_background_loop_semaphore_wait_finish_count", + "Counter for background loop concurrency-limiting semaphore acquire calls finished", + &["task"], + ) + .unwrap() +}); pub(crate) static BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index dc23030218..4b118442f4 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -63,12 +63,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit( _ctx: &RequestContext, cancel: &CancellationToken, ) -> Result { - crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_START_COUNT + let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_GAUGE .with_label_values(&[loop_kind.as_static_str()]) - .inc(); - scopeguard::defer!( - crate::metrics::BACKGROUND_LOOP_SEMAPHORE_WAIT_FINISH_COUNT.with_label_values(&[loop_kind.as_static_str()]).inc(); - ); + .guard(); + tokio::select! { permit = CONCURRENT_BACKGROUND_TASKS.acquire() => { match permit { diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 78c56300a5..f5f7270bf4 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,9 +1,13 @@ use crate::{ - auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError, - error::UserFacingError, proxy::neon_option, + auth::parse_endpoint_param, + cancellation::CancelClosure, + console::errors::WakeComputeError, + error::UserFacingError, + proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE}, }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; +use metrics::IntCounterPairGuard; use pq_proto::StartupMessageParams; use std::{io, net::SocketAddr, time::Duration}; use thiserror::Error; @@ -223,6 +227,8 @@ pub struct PostgresConnection { pub params: std::collections::HashMap, /// Query cancellation token. pub cancel_closure: CancelClosure, + + _guage: IntCounterPairGuard, } impl ConnCfg { @@ -231,6 +237,7 @@ impl ConnCfg { &self, allow_self_signed_compute: bool, timeout: Duration, + proto: &'static str, ) -> Result { let (socket_addr, stream, host) = self.connect_raw(timeout).await?; @@ -264,6 +271,7 @@ impl ConnCfg { stream, params, cancel_closure, + _guage: NUM_DB_CONNECTIONS_GAUGE.with_label_values(&[proto]).guard(), }; Ok(connection) diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 396db8f96a..da65065179 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -17,7 +17,10 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use futures::TryFutureExt; use itertools::Itertools; -use metrics::{exponential_buckets, register_int_counter_vec, IntCounterVec}; +use metrics::{ + exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, + IntCounterPairVec, IntCounterVec, +}; use once_cell::sync::{Lazy, OnceCell}; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; use prometheus::{ @@ -44,17 +47,10 @@ const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -pub static NUM_DB_CONNECTIONS_OPENED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_opened_db_connections_total", "Number of opened connections to a database.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_db_connections_total", "Number of closed connections to a database.", &["protocol"], @@ -62,17 +58,10 @@ pub static NUM_DB_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| .unwrap() }); -pub static NUM_CLIENT_CONNECTION_OPENED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_opened_client_connections_total", "Number of opened connections from a client.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_client_connections_total", "Number of closed connections from a client.", &["protocol"], @@ -80,17 +69,10 @@ pub static NUM_CLIENT_CONNECTION_CLOSED_COUNTER: Lazy = Lazy::new .unwrap() }); -pub static NUM_CONNECTIONS_ACCEPTED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "proxy_accepted_connections_total", "Number of client connections accepted.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CONNECTIONS_CLOSED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( "proxy_closed_connections_total", "Number of client connections closed.", &["protocol"], @@ -428,16 +410,12 @@ pub async fn handle_client( ); let proto = mode.protocol_label(); - NUM_CLIENT_CONNECTION_OPENED_COUNTER + let _client_gauge = NUM_CLIENT_CONNECTION_GAUGE .with_label_values(&[proto]) - .inc(); - NUM_CONNECTIONS_ACCEPTED_COUNTER + .guard(); + let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE .with_label_values(&[proto]) - .inc(); - scopeguard::defer! { - NUM_CLIENT_CONNECTION_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - } + .guard(); let tls = config.tls_config.as_ref(); @@ -584,12 +562,13 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg async fn connect_to_compute_once( node_info: &console::CachedNodeInfo, timeout: time::Duration, + proto: &'static str, ) -> Result { let allow_self_signed_compute = node_info.allow_self_signed_compute; node_info .config - .connect(allow_self_signed_compute, timeout) + .connect(allow_self_signed_compute, timeout, proto) .await } @@ -610,6 +589,7 @@ pub trait ConnectMechanism { pub struct TcpMechanism<'a> { /// KV-dictionary with PostgreSQL connection params. pub params: &'a StartupMessageParams, + pub proto: &'static str, } #[async_trait] @@ -623,7 +603,7 @@ impl ConnectMechanism for TcpMechanism<'_> { node_info: &console::CachedNodeInfo, timeout: time::Duration, ) -> Result { - connect_to_compute_once(node_info, timeout).await + connect_to_compute_once(node_info, timeout, self.proto).await } fn update_connect_config(&self, config: &mut compute::ConnCfg) { @@ -1028,7 +1008,7 @@ impl Client<'_, S> { let aux = node_info.aux.clone(); let mut node = connect_to_compute( - &TcpMechanism { params }, + &TcpMechanism { params, proto }, node_info, &extra, &creds, @@ -1037,13 +1017,6 @@ impl Client<'_, S> { .or_else(|e| stream.throw_error(e)) .await?; - NUM_DB_CONNECTIONS_OPENED_COUNTER - .with_label_values(&[proto]) - .inc(); - scopeguard::defer! { - NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&[proto]).inc(); - } - prepare_client_connection(&node, session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the // PqStream input buffer. Normally there is none, but our serverless npm diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index cdff42b529..870e9c1103 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -8,12 +8,13 @@ mod websocket; use anyhow::bail; use hyper::StatusCode; +use metrics::IntCounterPairGuard; pub use reqwest_middleware::{ClientWithMiddleware, Error}; pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio_util::task::TaskTracker; use crate::protocol2::{ProxyProtocolAccept, WithClientIp}; -use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER}; +use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE; use crate::rate_limiter::EndpointRateLimiter; use crate::{cancellation::CancelMap, config::ProxyConfig}; use futures::StreamExt; @@ -149,22 +150,17 @@ pub async fn task_main( struct MetricService { inner: S, + _gauge: IntCounterPairGuard, } impl MetricService { fn new(inner: S) -> MetricService { - NUM_CLIENT_CONNECTION_OPENED_COUNTER - .with_label_values(&["http"]) - .inc(); - MetricService { inner } - } -} - -impl Drop for MetricService { - fn drop(&mut self) { - NUM_CLIENT_CONNECTION_CLOSED_COUNTER - .with_label_values(&["http"]) - .inc(); + MetricService { + inner, + _gauge: NUM_CLIENT_CONNECTION_GAUGE + .with_label_values(&["http"]) + .guard(), + } } } diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 4f3b31b9be..69198d79d3 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -24,10 +24,7 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus}; use crate::{ auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list}, console, - proxy::{ - neon_options, LatencyTimer, NUM_DB_CONNECTIONS_CLOSED_COUNTER, - NUM_DB_CONNECTIONS_OPENED_COUNTER, - }, + proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, usage_metrics::{Ids, MetricCounter, USAGE_METRICS}, }; use crate::{compute, config}; @@ -477,6 +474,11 @@ async fn connect_to_compute_once( .connect_timeout(timeout) .connect(tokio_postgres::NoTls) .await?; + + let conn_gauge = NUM_DB_CONNECTIONS_GAUGE + .with_label_values(&["http"]) + .guard(); + tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id())); let (tx, mut rx) = tokio::sync::watch::channel(session); @@ -492,10 +494,7 @@ async fn connect_to_compute_once( tokio::spawn( async move { - NUM_DB_CONNECTIONS_OPENED_COUNTER.with_label_values(&["http"]).inc(); - scopeguard::defer! { - NUM_DB_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); - } + let _conn_gauge = conn_gauge; poll_fn(move |cx| { if matches!(rx.has_changed(), Ok(true)) { session = *rx.borrow_and_update(); diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 6e80260193..795ba819c1 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -29,7 +29,7 @@ use utils::http::error::ApiError; use utils::http::json::json_response; use crate::config::HttpConfig; -use crate::proxy::{NUM_CONNECTIONS_ACCEPTED_COUNTER, NUM_CONNECTIONS_CLOSED_COUNTER}; +use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE; use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; @@ -303,12 +303,9 @@ async fn handle_inner( session_id: uuid::Uuid, peer_addr: IpAddr, ) -> anyhow::Result> { - NUM_CONNECTIONS_ACCEPTED_COUNTER + let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE .with_label_values(&["http"]) - .inc(); - scopeguard::defer! { - NUM_CONNECTIONS_CLOSED_COUNTER.with_label_values(&["http"]).inc(); - } + .guard(); // // Determine the destination and connection params diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index d5333abae6..761541168c 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -11,7 +11,7 @@ use tracing::{debug, info, info_span, Instrument}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; -use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED}; +use crate::metrics::{TrafficMetrics, PG_QUERIES_GAUGE}; use crate::safekeeper::Term; use crate::timeline::TimelineError; use crate::wal_service::ConnectionId; @@ -210,10 +210,7 @@ impl postgres_backend::Handler let cmd = parse_cmd(query_string)?; let cmd_str = cmd_to_string(&cmd); - PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc(); - scopeguard::defer! { - PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc(); - } + let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard(); info!("got query {:?}", query_string); diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 0711beb290..11a3f48922 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -11,7 +11,8 @@ use futures::Future; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, proto::MetricFamily, - register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec, + register_int_counter, register_int_counter_pair_vec, register_int_counter_vec, Gauge, + IntCounter, IntCounterPairVec, IntCounterVec, IntGaugeVec, }; use once_cell::sync::Lazy; @@ -89,16 +90,10 @@ pub static BROKER_PULLED_UPDATES: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_broker_pulled_updates_total counter") }); -pub static PG_QUERIES_RECEIVED: Lazy = Lazy::new(|| { - register_int_counter_vec!( +pub static PG_QUERIES_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( "safekeeper_pg_queries_received_total", "Number of queries received through pg protocol", - &["query"] - ) - .expect("Failed to register safekeeper_pg_queries_received_total counter") -}); -pub static PG_QUERIES_FINISHED: Lazy = Lazy::new(|| { - register_int_counter_vec!( "safekeeper_pg_queries_finished_total", "Number of queries finished through pg protocol", &["query"]