mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 12:32:54 +00:00
pageserver: add pageserver_wal_ingest_values_committed metric (#10653)
## Problem We don't have visibility into the ratio of image vs. delta pages ingested in Pageservers. This might be useful to determine whether we should compress WAL records before storing them, which in turn might make compaction more efficient. ## Summary of changes Add `pageserver_wal_ingest_values_committed` metric with dimensions `class=metadata|data` and `kind=image|delta`.
This commit is contained in:
@@ -32,6 +32,7 @@ use utils::id::TimelineId;
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::context::{PageContentKind, RequestContext};
|
||||
use crate::pgdatadir_mapping::DatadirModificationStats;
|
||||
use crate::task_mgr::TaskKind;
|
||||
use crate::tenant::layer_map::LayerMap;
|
||||
use crate::tenant::mgr::TenantSlot;
|
||||
@@ -2378,10 +2379,40 @@ pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) records_observed: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) values_committed_metadata_images: IntCounter,
|
||||
pub(crate) values_committed_metadata_deltas: IntCounter,
|
||||
pub(crate) values_committed_data_images: IntCounter,
|
||||
pub(crate) values_committed_data_deltas: IntCounter,
|
||||
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
|
||||
}
|
||||
|
||||
impl WalIngestMetrics {
|
||||
pub(crate) fn inc_values_committed(&self, stats: &DatadirModificationStats) {
|
||||
if stats.metadata_images > 0 {
|
||||
self.values_committed_metadata_images
|
||||
.inc_by(stats.metadata_images);
|
||||
}
|
||||
if stats.metadata_deltas > 0 {
|
||||
self.values_committed_metadata_deltas
|
||||
.inc_by(stats.metadata_deltas);
|
||||
}
|
||||
if stats.data_images > 0 {
|
||||
self.values_committed_data_images.inc_by(stats.data_images);
|
||||
}
|
||||
if stats.data_deltas > 0 {
|
||||
self.values_committed_data_deltas.inc_by(stats.data_deltas);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
let values_committed = register_int_counter_vec!(
|
||||
"pageserver_wal_ingest_values_committed",
|
||||
"Number of values committed to pageserver storage from WAL records",
|
||||
&["class", "kind"],
|
||||
)
|
||||
.expect("failed to define a metric");
|
||||
|
||||
WalIngestMetrics {
|
||||
bytes_received: register_int_counter!(
|
||||
"pageserver_wal_ingest_bytes_received",
|
||||
@@ -2408,6 +2439,10 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
"Number of WAL records filtered out due to sharding"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
|
||||
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
|
||||
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
|
||||
values_committed_data_deltas: values_committed.with_label_values(&["data", "delta"]),
|
||||
gap_blocks_zeroed_on_rel_extend: register_int_counter!(
|
||||
"pageserver_gap_blocks_zeroed_on_rel_extend",
|
||||
"Total number of zero gap blocks written on relation extends"
|
||||
|
||||
@@ -48,7 +48,7 @@ use tracing::{debug, trace, warn};
|
||||
use utils::bin_ser::DeserializeError;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn};
|
||||
use wal_decoder::serialized_batch::SerializedValueBatch;
|
||||
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
|
||||
|
||||
/// Max delta records appended to the AUX_FILES_KEY (for aux v1). The write path will write a full image once this threshold is reached.
|
||||
pub const MAX_AUX_FILE_DELTAS: usize = 1024;
|
||||
@@ -1297,6 +1297,26 @@ impl DatadirModification<'_> {
|
||||
.is_some_and(|b| b.has_data())
|
||||
}
|
||||
|
||||
/// Returns statistics about the currently pending modifications.
|
||||
pub(crate) fn stats(&self) -> DatadirModificationStats {
|
||||
let mut stats = DatadirModificationStats::default();
|
||||
for (_, _, value) in self.pending_metadata_pages.values().flatten() {
|
||||
match value {
|
||||
Value::Image(_) => stats.metadata_images += 1,
|
||||
Value::WalRecord(r) if r.will_init() => stats.metadata_images += 1,
|
||||
Value::WalRecord(_) => stats.metadata_deltas += 1,
|
||||
}
|
||||
}
|
||||
for valuemeta in self.pending_data_batch.iter().flat_map(|b| &b.metadata) {
|
||||
match valuemeta {
|
||||
ValueMeta::Serialized(s) if s.will_init => stats.data_images += 1,
|
||||
ValueMeta::Serialized(_) => stats.data_deltas += 1,
|
||||
ValueMeta::Observed(_) => {}
|
||||
}
|
||||
}
|
||||
stats
|
||||
}
|
||||
|
||||
/// Set the current lsn
|
||||
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
ensure!(
|
||||
@@ -2317,6 +2337,15 @@ impl DatadirModification<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics for a DatadirModification.
|
||||
#[derive(Default)]
|
||||
pub struct DatadirModificationStats {
|
||||
pub metadata_images: u64,
|
||||
pub metadata_deltas: u64,
|
||||
pub data_images: u64,
|
||||
pub data_deltas: u64,
|
||||
}
|
||||
|
||||
/// This struct facilitates accessing either a committed key from the timeline at a
|
||||
/// specific LSN, or the latest uncommitted key from a pending modification.
|
||||
///
|
||||
|
||||
@@ -355,6 +355,19 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
// advances it to its end LSN. 0 is just an initialization placeholder.
|
||||
let mut modification = timeline.begin_modification(Lsn(0));
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
ctx: &RequestContext,
|
||||
uncommitted: &mut u64,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.records_committed.inc_by(*uncommitted);
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
if !records.is_empty() {
|
||||
timeline
|
||||
.metrics
|
||||
@@ -366,8 +379,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
@@ -396,8 +408,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -415,7 +426,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
modification.commit(&ctx).await?;
|
||||
commit(&mut modification, &ctx, &mut uncommitted_records).await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
@@ -442,10 +453,12 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = modification.stats();
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
WAL_INGEST.inc_values_committed(&stats);
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user