diff --git a/Cargo.lock b/Cargo.lock index bce2d11188..9436b591d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3674,6 +3674,7 @@ dependencies = [ "remote_storage", "reqwest", "safekeeper_api", + "scopeguard", "serde", "serde_json", "serde_with", diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b5d7eb0132..ec2f49c85a 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1,9 +1,9 @@ use metrics::core::{AtomicU64, GenericCounter}; use metrics::{ register_counter_vec, register_histogram, register_histogram_vec, register_int_counter, - register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec, - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, - UIntGaugeVec, + register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec, + Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, + UIntGauge, UIntGaugeVec, }; use once_cell::sync::Lazy; use pageserver_api::models::TenantState; @@ -478,6 +478,56 @@ pub static TENANT_TASK_EVENTS: Lazy = Lazy::new(|| { .expect("Failed to register tenant_task_events metric") }); +// walreceiver metrics + +pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_walreceiver_started_connections_total", + "Number of started walreceiver connections" + ) + .expect("failed to define a metric") +}); + +pub static WALRECEIVER_ACTIVE_MANAGERS: Lazy = Lazy::new(|| { + register_int_gauge!( + "pageserver_walreceiver_active_managers", + "Number of active walreceiver managers" + ) + .expect("failed to define a metric") +}); + +pub static WALRECEIVER_SWITCHES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_walreceiver_switches_total", + "Number of walreceiver manager change_connection calls", + &["reason"] + ) + .expect("failed to define a metric") +}); + +pub static WALRECEIVER_BROKER_UPDATES: Lazy = Lazy::new(|| { + register_int_counter!( + "pageserver_walreceiver_broker_updates_total", + "Number of received broker updates in walreceiver" + ) + .expect("failed to define a metric") +}); + +pub static WALRECEIVER_CANDIDATES_EVENTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_walreceiver_candidates_events_total", + "Number of walreceiver candidate events", + &["event"] + ) + .expect("failed to define a metric") +}); + +pub static WALRECEIVER_CANDIDATES_ADDED: Lazy = + Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["add"])); + +pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy = + Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"])); + // Metrics collected on WAL redo operations // // We collect the time spent in actual WAL redo ('redo'), and time waiting diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 17c66238f2..9cb17ea799 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -13,6 +13,10 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, ti use super::{TaskStateUpdate, WalReceiverConf}; use crate::context::{DownloadBehavior, RequestContext}; +use crate::metrics::{ + WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED, + WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, +}; use crate::task_mgr::TaskKind; use crate::tenant::Timeline; use anyhow::Context; @@ -58,6 +62,11 @@ pub(super) async fn connection_manager_loop_step( } } + WALRECEIVER_ACTIVE_MANAGERS.inc(); + scopeguard::defer! { + WALRECEIVER_ACTIVE_MANAGERS.dec(); + } + let id = TenantTimelineId { tenant_id: connection_manager_state.timeline.tenant_id, timeline_id: connection_manager_state.timeline.timeline_id, @@ -400,6 +409,10 @@ impl ConnectionManagerState { /// Shuts down the current connection (if any) and immediately starts another one with the given connection string. async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) { + WALRECEIVER_SWITCHES + .with_label_values(&[new_sk.reason.name()]) + .inc(); + self.drop_old_connection(true).await; let id = self.id; @@ -515,6 +528,8 @@ impl ConnectionManagerState { /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key. fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) { + WALRECEIVER_BROKER_UPDATES.inc(); + let new_safekeeper_id = NodeId(timeline_update.safekeeper_id); let old_entry = self.wal_stream_candidates.insert( new_safekeeper_id, @@ -526,6 +541,7 @@ impl ConnectionManagerState { if old_entry.is_none() { info!("New SK node was added: {new_safekeeper_id}"); + WALRECEIVER_CANDIDATES_ADDED.inc(); } } @@ -794,6 +810,7 @@ impl ConnectionManagerState { for node_id in node_ids_to_remove { info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"); self.wal_connection_retries.remove(&node_id); + WALRECEIVER_CANDIDATES_REMOVED.inc(); } } } @@ -817,8 +834,6 @@ struct NewWalConnectionCandidate { safekeeper_id: NodeId, wal_source_connconf: PgConnectionConfig, availability_zone: Option, - // This field is used in `derive(Debug)` only. - #[allow(dead_code)] reason: ReconnectReason, } @@ -847,6 +862,18 @@ enum ReconnectReason { }, } +impl ReconnectReason { + fn name(&self) -> &str { + match self { + ReconnectReason::NoExistingConnection => "NoExistingConnection", + ReconnectReason::LaggingWal { .. } => "LaggingWal", + ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone", + ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout", + ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives", + } + } +} + fn wal_stream_connection_config( TenantTimelineId { tenant_id, diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 801641a534..1cbed3416c 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -24,8 +24,8 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn}; use super::TaskStateUpdate; -use crate::context::RequestContext; use crate::metrics::LIVE_CONNECTIONS_COUNT; +use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS}; use crate::{ task_mgr, task_mgr::TaskKind, @@ -71,6 +71,8 @@ pub(super) async fn handle_walreceiver_connection( ctx: RequestContext, node: NodeId, ) -> anyhow::Result<()> { + WALRECEIVER_STARTED_CONNECTIONS.inc(); + // Connect to the database in replication mode. info!("connecting to {wal_source_connconf:?}"); diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index b6e8497809..393570df6a 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -25,6 +25,7 @@ parking_lot.workspace = true postgres.workspace = true postgres-protocol.workspace = true regex.workspace = true +scopeguard.workspace = true reqwest = { workspace = true, features = ["json"] } serde.workspace = true serde_json.workspace = true diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 6a98d8fd84..5e25d22ec1 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -14,10 +14,13 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest; use storage_broker::Request; use std::time::Duration; +use std::time::Instant; use tokio::task::JoinHandle; use tokio::{runtime, time::sleep}; use tracing::*; +use crate::metrics::BROKER_PULLED_UPDATES; +use crate::metrics::BROKER_PUSHED_UPDATES; use crate::GlobalTimelines; use crate::SafeKeeperConf; @@ -49,12 +52,17 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // is under plain mutex. That's ok, all this code is not performance // sensitive and there is no risk of deadlock as we don't await while // lock is held. + let now = Instant::now(); let mut active_tlis = GlobalTimelines::get_all(); active_tlis.retain(|tli| tli.is_active()); for tli in &active_tlis { let sk_info = tli.get_safekeeper_info(&conf); yield sk_info; + BROKER_PUSHED_UPDATES.inc(); } + let elapsed = now.elapsed(); + // Log duration every second. Should be about 10MB of logs per day. + info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed); sleep(push_interval).await; } }; @@ -79,6 +87,10 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { .context("subscribe_safekeper_info request failed")? .into_inner(); + let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]); + let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]); + let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]); + while let Some(msg) = stream.message().await? { let proto_ttid = msg .tenant_timeline_id @@ -91,7 +103,15 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { // connection to the broker. // note: there are blocking operations below, but it's considered fine for now - tli.record_safekeeper_info(msg).await? + let res = tli.record_safekeeper_info(msg).await; + if res.is_ok() { + ok_counter.inc(); + } else { + err_counter.inc(); + } + res?; + } else { + not_found.inc(); } } bail!("end of stream"); diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 2c3d1cea0e..7d25ced449 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -10,7 +10,7 @@ use tracing::{info, info_span, Instrument}; use crate::auth::check_permission; use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage}; -use crate::metrics::TrafficMetrics; +use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED}; use crate::wal_service::ConnectionId; use crate::{GlobalTimelines, SafeKeeperConf}; use postgres_backend::QueryError; @@ -72,6 +72,15 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { } } +fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str { + match cmd { + SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH", + SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION", + SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM", + SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL", + } +} + #[async_trait::async_trait] impl postgres_backend::Handler for SafekeeperPostgresHandler @@ -168,6 +177,12 @@ impl postgres_backend::Handler } let cmd = parse_cmd(query_string)?; + let cmd_str = cmd_to_string(&cmd); + + PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc(); + scopeguard::defer! { + PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc(); + } info!( "got query {:?} in timeline {:?}", diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index eafee557d7..189af2b044 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -10,7 +10,7 @@ use anyhow::Result; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, proto::MetricFamily, - register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec, + register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec, }; use once_cell::sync::Lazy; @@ -73,6 +73,58 @@ pub static PG_IO_BYTES: Lazy = Lazy::new(|| { ) .expect("Failed to register safekeeper_pg_io_bytes gauge") }); +pub static BROKER_PUSHED_UPDATES: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_broker_pushed_updates_total", + "Number of timeline updates pushed to the broker" + ) + .expect("Failed to register safekeeper_broker_pushed_updates_total counter") +}); +pub static BROKER_PULLED_UPDATES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "safekeeper_broker_pulled_updates_total", + "Number of timeline updates pulled and processed from the broker", + &["result"] + ) + .expect("Failed to register safekeeper_broker_pulled_updates_total counter") +}); +pub static PG_QUERIES_RECEIVED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "safekeeper_pg_queries_received_total", + "Number of queries received through pg protocol", + &["query"] + ) + .expect("Failed to register safekeeper_pg_queries_received_total counter") +}); +pub static PG_QUERIES_FINISHED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "safekeeper_pg_queries_finished_total", + "Number of queries finished through pg protocol", + &["query"] + ) + .expect("Failed to register safekeeper_pg_queries_finished_total counter") +}); +pub static REMOVED_WAL_SEGMENTS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_removed_wal_segments_total", + "Number of WAL segments removed from the disk" + ) + .expect("Failed to register safekeeper_removed_wal_segments_total counter") +}); +pub static BACKED_UP_SEGMENTS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_backed_up_segments_total", + "Number of WAL segments backed up to the broker" + ) + .expect("Failed to register safekeeper_backed_up_segments_total counter") +}); +pub static BACKUP_ERRORS: Lazy = Lazy::new(|| { + register_int_counter!( + "safekeeper_backup_errors_total", + "Number of errors during backup" + ) + .expect("Failed to register safekeeper_backup_errors_total counter") +}); pub const LABEL_UNKNOWN: &str = "unknown"; diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 163ac99be8..953c7d0022 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -25,6 +25,7 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; +use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS}; use crate::timeline::{PeerInfo, Timeline}; use crate::{GlobalTimelines, SafeKeeperConf}; @@ -394,7 +395,13 @@ async fn backup_single_segment( ) })?; - backup_object(&segment_file_path, &remote_segment_path, seg.size()).await?; + let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await; + if res.is_ok() { + BACKED_UP_SEGMENTS.inc(); + } else { + BACKUP_ERRORS.inc(); + } + res?; debug!("Backup of {} done", segment_file_path.display()); Ok(()) diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 5ef22b2f6a..1b82bd754e 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -27,7 +27,7 @@ use tracing::*; use utils::{id::TenantTimelineId, lsn::Lsn}; -use crate::metrics::{time_io_closure, WalStorageMetrics}; +use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS}; use crate::safekeeper::SafeKeeperState; use crate::wal_backup::read_object; @@ -455,6 +455,7 @@ fn remove_segments_from_disk( n_removed += 1; min_removed = min(min_removed, segno); max_removed = max(max_removed, segno); + REMOVED_WAL_SEGMENTS.inc(); } } }