mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 14:00:38 +00:00
pageserver: tweak interpreted ingest record metrics (#10291)
## Problem The filtered record metric doesn't make sense for interpreted ingest. ## Summary of changes While of dubious utility in the first place, this patch replaces them with records received and records observed metrics for interpreted ingest: * received records cause the pageserver to do _something_: write a key, value pair to storage, update some metadata or flush pending modifications * observed records are a shard 0 concept and contain only key metadata used in tracking relation sizes (received records include observed records)
This commit is contained in:
@@ -95,6 +95,14 @@ impl InterpretedWalRecord {
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
|
||||
/// Checks if the WAL record is observed (i.e. contains only metadata
|
||||
/// for observed values)
|
||||
pub fn is_observed(&self) -> bool {
|
||||
self.batch.is_observed()
|
||||
&& self.metadata_record.is_none()
|
||||
&& matches!(self.flush_uncommitted, FlushUncommittedRecords::No)
|
||||
}
|
||||
}
|
||||
|
||||
/// The interpreted part of the Postgres WAL record which requires metadata
|
||||
|
||||
@@ -501,6 +501,11 @@ impl SerializedValueBatch {
|
||||
!self.has_data() && self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains only observed values
|
||||
pub fn is_observed(&self) -> bool {
|
||||
!self.has_data() && !self.metadata.is_empty()
|
||||
}
|
||||
|
||||
/// Checks if the batch contains data
|
||||
///
|
||||
/// Note that if this returns false, it may still contain observed values or
|
||||
|
||||
@@ -2337,13 +2337,15 @@ macro_rules! redo_bytes_histogram_count_buckets {
|
||||
pub(crate) struct WalIngestMetrics {
|
||||
pub(crate) bytes_received: IntCounter,
|
||||
pub(crate) records_received: IntCounter,
|
||||
pub(crate) records_observed: IntCounter,
|
||||
pub(crate) records_committed: IntCounter,
|
||||
pub(crate) records_filtered: IntCounter,
|
||||
pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter,
|
||||
pub(crate) clear_vm_bits_unknown: IntCounterVec,
|
||||
}
|
||||
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
|
||||
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
|
||||
WalIngestMetrics {
|
||||
bytes_received: register_int_counter!(
|
||||
"pageserver_wal_ingest_bytes_received",
|
||||
"Bytes of WAL ingested from safekeepers",
|
||||
@@ -2354,6 +2356,11 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
||||
"Number of WAL records received from safekeepers"
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
records_observed: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_observed",
|
||||
"Number of WAL records observed from safekeepers. These are metadata only records for shard 0."
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
records_committed: register_int_counter!(
|
||||
"pageserver_wal_ingest_records_committed",
|
||||
"Number of WAL records which resulted in writes to pageserver storage"
|
||||
@@ -2375,6 +2382,7 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMet
|
||||
&["entity"],
|
||||
)
|
||||
.expect("failed to define a metric"),
|
||||
}
|
||||
});
|
||||
|
||||
pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
|
||||
@@ -319,27 +319,11 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let status_update = match replication_message {
|
||||
ReplicationMessage::RawInterpretedWalRecords(raw) => {
|
||||
WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
|
||||
|
||||
let mut uncommitted_records = 0;
|
||||
let mut filtered_records = 0;
|
||||
|
||||
// This is the end LSN of the raw WAL from which the records
|
||||
// were interpreted.
|
||||
@@ -380,31 +364,23 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
|
||||
&& uncommitted_records > 0
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
}
|
||||
|
||||
let local_next_record_lsn = interpreted.next_record_lsn;
|
||||
let ingested = walingest
|
||||
|
||||
if interpreted.is_observed() {
|
||||
WAL_INGEST.records_observed.inc();
|
||||
}
|
||||
|
||||
walingest
|
||||
.ingest_record(interpreted, &mut modification, &ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("could not ingest record at {local_next_record_lsn}")
|
||||
})?;
|
||||
|
||||
if !ingested {
|
||||
tracing::debug!(
|
||||
"ingest: filtered out record @ LSN {local_next_record_lsn}"
|
||||
);
|
||||
WAL_INGEST.records_filtered.inc();
|
||||
filtered_records += 1;
|
||||
}
|
||||
|
||||
uncommitted_records += 1;
|
||||
|
||||
// FIXME: this cannot be made pausable_failpoint without fixing the
|
||||
@@ -418,13 +394,8 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|| modification.approx_pending_bytes()
|
||||
> DatadirModification::MAX_PENDING_BYTES
|
||||
{
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
modification.commit(&ctx).await?;
|
||||
uncommitted_records = 0;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -442,13 +413,7 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
|
||||
if uncommitted_records > 0 || needs_last_record_lsn_advance {
|
||||
// Commit any uncommitted records
|
||||
commit(
|
||||
&mut modification,
|
||||
&mut uncommitted_records,
|
||||
&mut filtered_records,
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
modification.commit(&ctx).await?;
|
||||
}
|
||||
|
||||
if !caught_up && streaming_lsn >= end_of_wal {
|
||||
@@ -469,6 +434,21 @@ pub(super) async fn handle_walreceiver_connection(
|
||||
}
|
||||
|
||||
ReplicationMessage::XLogData(xlog_data) => {
|
||||
async fn commit(
|
||||
modification: &mut DatadirModification<'_>,
|
||||
uncommitted: &mut u64,
|
||||
filtered: &mut u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
WAL_INGEST
|
||||
.records_committed
|
||||
.inc_by(*uncommitted - *filtered);
|
||||
modification.commit(ctx).await?;
|
||||
*uncommitted = 0;
|
||||
*filtered = 0;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Pass the WAL data to the decoder, and see if we can decode
|
||||
// more records as a result.
|
||||
let data = xlog_data.data();
|
||||
|
||||
@@ -561,11 +561,17 @@ def test_sharding_split_smoke(
|
||||
workload.write_rows(256)
|
||||
|
||||
# Note which pageservers initially hold a shard after tenant creation
|
||||
pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)]
|
||||
log.info("Pre-split pageservers: {pre_split_pageserver_ids}")
|
||||
pre_split_pageserver_ids = dict()
|
||||
for loc in env.storage_controller.locate(tenant_id):
|
||||
shard_no = TenantShardId.parse(loc["shard_id"]).shard_number
|
||||
pre_split_pageserver_ids[loc["node_id"]] = shard_no
|
||||
log.info(f"Pre-split pageservers: {pre_split_pageserver_ids}")
|
||||
|
||||
# For pageservers holding a shard, validate their ingest statistics
|
||||
# reflect a proper splitting of the WAL.
|
||||
|
||||
observed_on_shard_zero = 0
|
||||
received_on_non_zero_shard = 0
|
||||
for pageserver in env.pageservers:
|
||||
if pageserver.id not in pre_split_pageserver_ids:
|
||||
continue
|
||||
@@ -573,28 +579,38 @@ def test_sharding_split_smoke(
|
||||
metrics = pageserver.http_client().get_metrics_values(
|
||||
[
|
||||
"pageserver_wal_ingest_records_received_total",
|
||||
"pageserver_wal_ingest_records_committed_total",
|
||||
"pageserver_wal_ingest_records_filtered_total",
|
||||
"pageserver_wal_ingest_records_observed_total",
|
||||
]
|
||||
)
|
||||
|
||||
log.info(f"Pageserver {pageserver.id} metrics: {metrics}")
|
||||
|
||||
# Not everything received was committed
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
> metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
)
|
||||
received = metrics["pageserver_wal_ingest_records_received_total"]
|
||||
observed = metrics["pageserver_wal_ingest_records_observed_total"]
|
||||
|
||||
# Something was committed
|
||||
assert metrics["pageserver_wal_ingest_records_committed_total"] > 0
|
||||
shard_number: int | None = pre_split_pageserver_ids.get(pageserver.id, None)
|
||||
if shard_number is None:
|
||||
assert received == 0
|
||||
assert observed == 0
|
||||
elif shard_number == 0:
|
||||
# Shard 0 receives its own records and observes records of other shards
|
||||
# for relation size tracking.
|
||||
assert observed > 0
|
||||
assert received > 0
|
||||
observed_on_shard_zero = int(observed)
|
||||
else:
|
||||
# Non zero shards do not observe any records, but only receive their own.
|
||||
assert observed == 0
|
||||
assert received > 0
|
||||
received_on_non_zero_shard += int(received)
|
||||
|
||||
# Counts are self consistent
|
||||
assert (
|
||||
metrics["pageserver_wal_ingest_records_received_total"]
|
||||
== metrics["pageserver_wal_ingest_records_committed_total"]
|
||||
+ metrics["pageserver_wal_ingest_records_filtered_total"]
|
||||
)
|
||||
# Some records are sent to multiple shards and some shard 0 records include both value observations
|
||||
# and other metadata. Hence, we do a sanity check below that shard 0 observes the majority of records
|
||||
# received by other shards.
|
||||
assert (
|
||||
observed_on_shard_zero <= received_on_non_zero_shard
|
||||
and observed_on_shard_zero >= received_on_non_zero_shard // 2
|
||||
)
|
||||
|
||||
# TODO: validate that shards have different sizes
|
||||
|
||||
@@ -633,7 +649,7 @@ def test_sharding_split_smoke(
|
||||
# We should have split into 8 shards, on the same 4 pageservers we started on.
|
||||
assert len(post_split_pageserver_ids) == split_shard_count
|
||||
assert len(set(post_split_pageserver_ids)) == shard_count
|
||||
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids)
|
||||
assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids.keys())
|
||||
|
||||
# The old parent shards should no longer exist on disk
|
||||
assert not shards_on_disk(old_shard_ids)
|
||||
@@ -739,7 +755,7 @@ def test_sharding_split_smoke(
|
||||
# all the pageservers that originally held an attached shard should still hold one, otherwise
|
||||
# it would indicate that we had done some unnecessary migration.
|
||||
log.info(f"attached: {attached}")
|
||||
for ps_id in pre_split_pageserver_ids:
|
||||
for ps_id in pre_split_pageserver_ids.keys():
|
||||
log.info("Pre-split pageserver {ps_id} should still hold an attached location")
|
||||
assert ps_id in attached
|
||||
|
||||
|
||||
Reference in New Issue
Block a user