mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Add more safekeeper and walreceiver metrics (#4142)
Add essential safekeeper and pageserver::walreceiver metrics. Mostly counters, such as the number of received queries, broker messages, removed WAL segments, or connection switches events in walreceiver. Also logs broker push loop duration.
This commit is contained in:
committed by
GitHub
parent
586e6e55f8
commit
3ceef7b17a
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3674,6 +3674,7 @@ dependencies = [
|
||||
"remote_storage",
|
||||
"reqwest",
|
||||
"safekeeper_api",
|
||||
"scopeguard",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use metrics::core::{AtomicU64, GenericCounter};
|
||||
use metrics::{
|
||||
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
|
||||
register_int_counter_vec, register_int_gauge_vec, register_uint_gauge_vec, Counter, CounterVec,
|
||||
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge,
|
||||
UIntGaugeVec,
|
||||
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
|
||||
Counter, CounterVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
|
||||
UIntGauge, UIntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::models::TenantState;
|
||||
@@ -478,6 +478,56 @@ pub static TENANT_TASK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
.expect("Failed to register tenant_task_events metric")
|
||||
});
|
||||
|
||||
// walreceiver metrics
|
||||
|
||||
pub static WALRECEIVER_STARTED_CONNECTIONS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_walreceiver_started_connections_total",
|
||||
"Number of started walreceiver connections"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_ACTIVE_MANAGERS: Lazy<IntGauge> = Lazy::new(|| {
|
||||
register_int_gauge!(
|
||||
"pageserver_walreceiver_active_managers",
|
||||
"Number of active walreceiver managers"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_SWITCHES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_walreceiver_switches_total",
|
||||
"Number of walreceiver manager change_connection calls",
|
||||
&["reason"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_BROKER_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_walreceiver_broker_updates_total",
|
||||
"Number of received broker updates in walreceiver"
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_CANDIDATES_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"pageserver_walreceiver_candidates_events_total",
|
||||
"Number of walreceiver candidate events",
|
||||
&["event"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub static WALRECEIVER_CANDIDATES_ADDED: Lazy<IntCounter> =
|
||||
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["add"]));
|
||||
|
||||
pub static WALRECEIVER_CANDIDATES_REMOVED: Lazy<IntCounter> =
|
||||
Lazy::new(|| WALRECEIVER_CANDIDATES_EVENTS.with_label_values(&["remove"]));
|
||||
|
||||
// Metrics collected on WAL redo operations
|
||||
//
|
||||
// We collect the time spent in actual WAL redo ('redo'), and time waiting
|
||||
|
||||
@@ -13,6 +13,10 @@ use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, ti
|
||||
|
||||
use super::{TaskStateUpdate, WalReceiverConf};
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::metrics::{
|
||||
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
|
||||
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
|
||||
};
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::Timeline;
|
||||
use anyhow::Context;
|
||||
@@ -58,6 +62,11 @@ pub(super) async fn connection_manager_loop_step(
|
||||
}
|
||||
}
|
||||
|
||||
WALRECEIVER_ACTIVE_MANAGERS.inc();
|
||||
scopeguard::defer! {
|
||||
WALRECEIVER_ACTIVE_MANAGERS.dec();
|
||||
}
|
||||
|
||||
let id = TenantTimelineId {
|
||||
tenant_id: connection_manager_state.timeline.tenant_id,
|
||||
timeline_id: connection_manager_state.timeline.timeline_id,
|
||||
@@ -400,6 +409,10 @@ impl ConnectionManagerState {
|
||||
|
||||
/// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
|
||||
async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
|
||||
WALRECEIVER_SWITCHES
|
||||
.with_label_values(&[new_sk.reason.name()])
|
||||
.inc();
|
||||
|
||||
self.drop_old_connection(true).await;
|
||||
|
||||
let id = self.id;
|
||||
@@ -515,6 +528,8 @@ impl ConnectionManagerState {
|
||||
|
||||
/// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
|
||||
fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
|
||||
WALRECEIVER_BROKER_UPDATES.inc();
|
||||
|
||||
let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
|
||||
let old_entry = self.wal_stream_candidates.insert(
|
||||
new_safekeeper_id,
|
||||
@@ -526,6 +541,7 @@ impl ConnectionManagerState {
|
||||
|
||||
if old_entry.is_none() {
|
||||
info!("New SK node was added: {new_safekeeper_id}");
|
||||
WALRECEIVER_CANDIDATES_ADDED.inc();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -794,6 +810,7 @@ impl ConnectionManagerState {
|
||||
for node_id in node_ids_to_remove {
|
||||
info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
|
||||
self.wal_connection_retries.remove(&node_id);
|
||||
WALRECEIVER_CANDIDATES_REMOVED.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -817,8 +834,6 @@ struct NewWalConnectionCandidate {
|
||||
safekeeper_id: NodeId,
|
||||
wal_source_connconf: PgConnectionConfig,
|
||||
availability_zone: Option<String>,
|
||||
// This field is used in `derive(Debug)` only.
|
||||
#[allow(dead_code)]
|
||||
reason: ReconnectReason,
|
||||
}
|
||||
|
||||
@@ -847,6 +862,18 @@ enum ReconnectReason {
|
||||
},
|
||||
}
|
||||
|
||||
impl ReconnectReason {
|
||||
fn name(&self) -> &str {
|
||||
match self {
|
||||
ReconnectReason::NoExistingConnection => "NoExistingConnection",
|
||||
ReconnectReason::LaggingWal { .. } => "LaggingWal",
|
||||
ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
|
||||
ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
|
||||
ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn wal_stream_connection_config(
|
||||
TenantTimelineId {
|
||||
tenant_id,
|
||||
|
||||
@@ -24,8 +24,8 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, trace, warn};
|
||||
|
||||
use super::TaskStateUpdate;
|
||||
use crate::context::RequestContext;
|
||||
use crate::metrics::LIVE_CONNECTIONS_COUNT;
|
||||
use crate::{context::RequestContext, metrics::WALRECEIVER_STARTED_CONNECTIONS};
|
||||
use crate::{
|
||||
task_mgr,
|
||||
task_mgr::TaskKind,
|
||||
@@ -71,6 +71,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
ctx: RequestContext,
|
||||
node: NodeId,
|
||||
) -> anyhow::Result<()> {
|
||||
WALRECEIVER_STARTED_CONNECTIONS.inc();
|
||||
|
||||
// Connect to the database in replication mode.
|
||||
info!("connecting to {wal_source_connconf:?}");
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ parking_lot.workspace = true
|
||||
postgres.workspace = true
|
||||
postgres-protocol.workspace = true
|
||||
regex.workspace = true
|
||||
scopeguard.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
||||
@@ -14,10 +14,13 @@ use storage_broker::proto::SubscribeSafekeeperInfoRequest;
|
||||
use storage_broker::Request;
|
||||
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::{runtime, time::sleep};
|
||||
use tracing::*;
|
||||
|
||||
use crate::metrics::BROKER_PULLED_UPDATES;
|
||||
use crate::metrics::BROKER_PUSHED_UPDATES;
|
||||
use crate::GlobalTimelines;
|
||||
use crate::SafeKeeperConf;
|
||||
|
||||
@@ -49,12 +52,17 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> {
|
||||
// is under plain mutex. That's ok, all this code is not performance
|
||||
// sensitive and there is no risk of deadlock as we don't await while
|
||||
// lock is held.
|
||||
let now = Instant::now();
|
||||
let mut active_tlis = GlobalTimelines::get_all();
|
||||
active_tlis.retain(|tli| tli.is_active());
|
||||
for tli in &active_tlis {
|
||||
let sk_info = tli.get_safekeeper_info(&conf);
|
||||
yield sk_info;
|
||||
BROKER_PUSHED_UPDATES.inc();
|
||||
}
|
||||
let elapsed = now.elapsed();
|
||||
// Log duration every second. Should be about 10MB of logs per day.
|
||||
info!("pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed);
|
||||
sleep(push_interval).await;
|
||||
}
|
||||
};
|
||||
@@ -79,6 +87,10 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
.context("subscribe_safekeper_info request failed")?
|
||||
.into_inner();
|
||||
|
||||
let ok_counter = BROKER_PULLED_UPDATES.with_label_values(&["ok"]);
|
||||
let not_found = BROKER_PULLED_UPDATES.with_label_values(&["not_found"]);
|
||||
let err_counter = BROKER_PULLED_UPDATES.with_label_values(&["error"]);
|
||||
|
||||
while let Some(msg) = stream.message().await? {
|
||||
let proto_ttid = msg
|
||||
.tenant_timeline_id
|
||||
@@ -91,7 +103,15 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> {
|
||||
// connection to the broker.
|
||||
|
||||
// note: there are blocking operations below, but it's considered fine for now
|
||||
tli.record_safekeeper_info(msg).await?
|
||||
let res = tli.record_safekeeper_info(msg).await;
|
||||
if res.is_ok() {
|
||||
ok_counter.inc();
|
||||
} else {
|
||||
err_counter.inc();
|
||||
}
|
||||
res?;
|
||||
} else {
|
||||
not_found.inc();
|
||||
}
|
||||
}
|
||||
bail!("end of stream");
|
||||
|
||||
@@ -10,7 +10,7 @@ use tracing::{info, info_span, Instrument};
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
|
||||
|
||||
use crate::metrics::TrafficMetrics;
|
||||
use crate::metrics::{TrafficMetrics, PG_QUERIES_FINISHED, PG_QUERIES_RECEIVED};
|
||||
use crate::wal_service::ConnectionId;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
use postgres_backend::QueryError;
|
||||
@@ -72,6 +72,15 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
}
|
||||
}
|
||||
|
||||
fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
match cmd {
|
||||
SafekeeperPostgresCommand::StartWalPush => "START_WAL_PUSH",
|
||||
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
|
||||
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
|
||||
SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
for SafekeeperPostgresHandler
|
||||
@@ -168,6 +177,12 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
}
|
||||
|
||||
let cmd = parse_cmd(query_string)?;
|
||||
let cmd_str = cmd_to_string(&cmd);
|
||||
|
||||
PG_QUERIES_RECEIVED.with_label_values(&[cmd_str]).inc();
|
||||
scopeguard::defer! {
|
||||
PG_QUERIES_FINISHED.with_label_values(&[cmd_str]).inc();
|
||||
}
|
||||
|
||||
info!(
|
||||
"got query {:?} in timeline {:?}",
|
||||
|
||||
@@ -10,7 +10,7 @@ use anyhow::Result;
|
||||
use metrics::{
|
||||
core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts},
|
||||
proto::MetricFamily,
|
||||
register_int_counter_vec, Gauge, IntCounterVec, IntGaugeVec,
|
||||
register_int_counter, register_int_counter_vec, Gauge, IntCounter, IntCounterVec, IntGaugeVec,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
@@ -73,6 +73,58 @@ pub static PG_IO_BYTES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
)
|
||||
.expect("Failed to register safekeeper_pg_io_bytes gauge")
|
||||
});
|
||||
pub static BROKER_PUSHED_UPDATES: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_broker_pushed_updates_total",
|
||||
"Number of timeline updates pushed to the broker"
|
||||
)
|
||||
.expect("Failed to register safekeeper_broker_pushed_updates_total counter")
|
||||
});
|
||||
pub static BROKER_PULLED_UPDATES: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_broker_pulled_updates_total",
|
||||
"Number of timeline updates pulled and processed from the broker",
|
||||
&["result"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_broker_pulled_updates_total counter")
|
||||
});
|
||||
pub static PG_QUERIES_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_pg_queries_received_total",
|
||||
"Number of queries received through pg protocol",
|
||||
&["query"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_pg_queries_received_total counter")
|
||||
});
|
||||
pub static PG_QUERIES_FINISHED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"safekeeper_pg_queries_finished_total",
|
||||
"Number of queries finished through pg protocol",
|
||||
&["query"]
|
||||
)
|
||||
.expect("Failed to register safekeeper_pg_queries_finished_total counter")
|
||||
});
|
||||
pub static REMOVED_WAL_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_removed_wal_segments_total",
|
||||
"Number of WAL segments removed from the disk"
|
||||
)
|
||||
.expect("Failed to register safekeeper_removed_wal_segments_total counter")
|
||||
});
|
||||
pub static BACKED_UP_SEGMENTS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_backed_up_segments_total",
|
||||
"Number of WAL segments backed up to the broker"
|
||||
)
|
||||
.expect("Failed to register safekeeper_backed_up_segments_total counter")
|
||||
});
|
||||
pub static BACKUP_ERRORS: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"safekeeper_backup_errors_total",
|
||||
"Number of errors during backup"
|
||||
)
|
||||
.expect("Failed to register safekeeper_backup_errors_total counter")
|
||||
});
|
||||
|
||||
pub const LABEL_UNKNOWN: &str = "unknown";
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use tracing::*;
|
||||
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::metrics::{BACKED_UP_SEGMENTS, BACKUP_ERRORS};
|
||||
use crate::timeline::{PeerInfo, Timeline};
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
|
||||
@@ -394,7 +395,13 @@ async fn backup_single_segment(
|
||||
)
|
||||
})?;
|
||||
|
||||
backup_object(&segment_file_path, &remote_segment_path, seg.size()).await?;
|
||||
let res = backup_object(&segment_file_path, &remote_segment_path, seg.size()).await;
|
||||
if res.is_ok() {
|
||||
BACKED_UP_SEGMENTS.inc();
|
||||
} else {
|
||||
BACKUP_ERRORS.inc();
|
||||
}
|
||||
res?;
|
||||
debug!("Backup of {} done", segment_file_path.display());
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -27,7 +27,7 @@ use tracing::*;
|
||||
|
||||
use utils::{id::TenantTimelineId, lsn::Lsn};
|
||||
|
||||
use crate::metrics::{time_io_closure, WalStorageMetrics};
|
||||
use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
|
||||
use crate::safekeeper::SafeKeeperState;
|
||||
|
||||
use crate::wal_backup::read_object;
|
||||
@@ -455,6 +455,7 @@ fn remove_segments_from_disk(
|
||||
n_removed += 1;
|
||||
min_removed = min(min_removed, segno);
|
||||
max_removed = max(max_removed, segno);
|
||||
REMOVED_WAL_SEGMENTS.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user