diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index af22de5d95..6576dd0eba 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -95,6 +95,14 @@ impl InterpretedWalRecord { && self.metadata_record.is_none() && matches!(self.flush_uncommitted, FlushUncommittedRecords::No) } + + /// Checks if the WAL record is observed (i.e. contains only metadata + /// for observed values) + pub fn is_observed(&self) -> bool { + self.batch.is_observed() + && self.metadata_record.is_none() + && matches!(self.flush_uncommitted, FlushUncommittedRecords::No) + } } /// The interpreted part of the Postgres WAL record which requires metadata diff --git a/libs/wal_decoder/src/serialized_batch.rs b/libs/wal_decoder/src/serialized_batch.rs index 41294da7a0..af2b179e05 100644 --- a/libs/wal_decoder/src/serialized_batch.rs +++ b/libs/wal_decoder/src/serialized_batch.rs @@ -501,6 +501,11 @@ impl SerializedValueBatch { !self.has_data() && self.metadata.is_empty() } + /// Checks if the batch contains only observed values + pub fn is_observed(&self) -> bool { + !self.has_data() && !self.metadata.is_empty() + } + /// Checks if the batch contains data /// /// Note that if this returns false, it may still contain observed values or diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b4e20cb8b9..e825d538a2 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -2337,13 +2337,15 @@ macro_rules! redo_bytes_histogram_count_buckets { pub(crate) struct WalIngestMetrics { pub(crate) bytes_received: IntCounter, pub(crate) records_received: IntCounter, + pub(crate) records_observed: IntCounter, pub(crate) records_committed: IntCounter, pub(crate) records_filtered: IntCounter, pub(crate) gap_blocks_zeroed_on_rel_extend: IntCounter, pub(crate) clear_vm_bits_unknown: IntCounterVec, } -pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMetrics { +pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| { + WalIngestMetrics { bytes_received: register_int_counter!( "pageserver_wal_ingest_bytes_received", "Bytes of WAL ingested from safekeepers", @@ -2354,6 +2356,11 @@ pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMet "Number of WAL records received from safekeepers" ) .expect("failed to define a metric"), + records_observed: register_int_counter!( + "pageserver_wal_ingest_records_observed", + "Number of WAL records observed from safekeepers. These are metadata only records for shard 0." + ) + .expect("failed to define a metric"), records_committed: register_int_counter!( "pageserver_wal_ingest_records_committed", "Number of WAL records which resulted in writes to pageserver storage" @@ -2375,6 +2382,7 @@ pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMet &["entity"], ) .expect("failed to define a metric"), +} }); pub(crate) static PAGESERVER_TIMELINE_WAL_RECORDS_RECEIVED: Lazy = Lazy::new(|| { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 3f10eeda60..d74faa1af5 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -319,27 +319,11 @@ pub(super) async fn handle_walreceiver_connection( return Ok(()); } - async fn commit( - modification: &mut DatadirModification<'_>, - uncommitted: &mut u64, - filtered: &mut u64, - ctx: &RequestContext, - ) -> anyhow::Result<()> { - WAL_INGEST - .records_committed - .inc_by(*uncommitted - *filtered); - modification.commit(ctx).await?; - *uncommitted = 0; - *filtered = 0; - Ok(()) - } - let status_update = match replication_message { ReplicationMessage::RawInterpretedWalRecords(raw) => { WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64); let mut uncommitted_records = 0; - let mut filtered_records = 0; // This is the end LSN of the raw WAL from which the records // were interpreted. @@ -380,31 +364,23 @@ pub(super) async fn handle_walreceiver_connection( if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) && uncommitted_records > 0 { - commit( - &mut modification, - &mut uncommitted_records, - &mut filtered_records, - &ctx, - ) - .await?; + modification.commit(&ctx).await?; + uncommitted_records = 0; } let local_next_record_lsn = interpreted.next_record_lsn; - let ingested = walingest + + if interpreted.is_observed() { + WAL_INGEST.records_observed.inc(); + } + + walingest .ingest_record(interpreted, &mut modification, &ctx) .await .with_context(|| { format!("could not ingest record at {local_next_record_lsn}") })?; - if !ingested { - tracing::debug!( - "ingest: filtered out record @ LSN {local_next_record_lsn}" - ); - WAL_INGEST.records_filtered.inc(); - filtered_records += 1; - } - uncommitted_records += 1; // FIXME: this cannot be made pausable_failpoint without fixing the @@ -418,13 +394,8 @@ pub(super) async fn handle_walreceiver_connection( || modification.approx_pending_bytes() > DatadirModification::MAX_PENDING_BYTES { - commit( - &mut modification, - &mut uncommitted_records, - &mut filtered_records, - &ctx, - ) - .await?; + modification.commit(&ctx).await?; + uncommitted_records = 0; } } @@ -442,13 +413,7 @@ pub(super) async fn handle_walreceiver_connection( if uncommitted_records > 0 || needs_last_record_lsn_advance { // Commit any uncommitted records - commit( - &mut modification, - &mut uncommitted_records, - &mut filtered_records, - &ctx, - ) - .await?; + modification.commit(&ctx).await?; } if !caught_up && streaming_lsn >= end_of_wal { @@ -469,6 +434,21 @@ pub(super) async fn handle_walreceiver_connection( } ReplicationMessage::XLogData(xlog_data) => { + async fn commit( + modification: &mut DatadirModification<'_>, + uncommitted: &mut u64, + filtered: &mut u64, + ctx: &RequestContext, + ) -> anyhow::Result<()> { + WAL_INGEST + .records_committed + .inc_by(*uncommitted - *filtered); + modification.commit(ctx).await?; + *uncommitted = 0; + *filtered = 0; + Ok(()) + } + // Pass the WAL data to the decoder, and see if we can decode // more records as a result. let data = xlog_data.data(); diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 4c381b563f..673904a1cd 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -561,11 +561,17 @@ def test_sharding_split_smoke( workload.write_rows(256) # Note which pageservers initially hold a shard after tenant creation - pre_split_pageserver_ids = [loc["node_id"] for loc in env.storage_controller.locate(tenant_id)] - log.info("Pre-split pageservers: {pre_split_pageserver_ids}") + pre_split_pageserver_ids = dict() + for loc in env.storage_controller.locate(tenant_id): + shard_no = TenantShardId.parse(loc["shard_id"]).shard_number + pre_split_pageserver_ids[loc["node_id"]] = shard_no + log.info(f"Pre-split pageservers: {pre_split_pageserver_ids}") # For pageservers holding a shard, validate their ingest statistics # reflect a proper splitting of the WAL. + + observed_on_shard_zero = 0 + received_on_non_zero_shard = 0 for pageserver in env.pageservers: if pageserver.id not in pre_split_pageserver_ids: continue @@ -573,28 +579,38 @@ def test_sharding_split_smoke( metrics = pageserver.http_client().get_metrics_values( [ "pageserver_wal_ingest_records_received_total", - "pageserver_wal_ingest_records_committed_total", - "pageserver_wal_ingest_records_filtered_total", + "pageserver_wal_ingest_records_observed_total", ] ) log.info(f"Pageserver {pageserver.id} metrics: {metrics}") - # Not everything received was committed - assert ( - metrics["pageserver_wal_ingest_records_received_total"] - > metrics["pageserver_wal_ingest_records_committed_total"] - ) + received = metrics["pageserver_wal_ingest_records_received_total"] + observed = metrics["pageserver_wal_ingest_records_observed_total"] - # Something was committed - assert metrics["pageserver_wal_ingest_records_committed_total"] > 0 + shard_number: int | None = pre_split_pageserver_ids.get(pageserver.id, None) + if shard_number is None: + assert received == 0 + assert observed == 0 + elif shard_number == 0: + # Shard 0 receives its own records and observes records of other shards + # for relation size tracking. + assert observed > 0 + assert received > 0 + observed_on_shard_zero = int(observed) + else: + # Non zero shards do not observe any records, but only receive their own. + assert observed == 0 + assert received > 0 + received_on_non_zero_shard += int(received) - # Counts are self consistent - assert ( - metrics["pageserver_wal_ingest_records_received_total"] - == metrics["pageserver_wal_ingest_records_committed_total"] - + metrics["pageserver_wal_ingest_records_filtered_total"] - ) + # Some records are sent to multiple shards and some shard 0 records include both value observations + # and other metadata. Hence, we do a sanity check below that shard 0 observes the majority of records + # received by other shards. + assert ( + observed_on_shard_zero <= received_on_non_zero_shard + and observed_on_shard_zero >= received_on_non_zero_shard // 2 + ) # TODO: validate that shards have different sizes @@ -633,7 +649,7 @@ def test_sharding_split_smoke( # We should have split into 8 shards, on the same 4 pageservers we started on. assert len(post_split_pageserver_ids) == split_shard_count assert len(set(post_split_pageserver_ids)) == shard_count - assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids) + assert set(post_split_pageserver_ids) == set(pre_split_pageserver_ids.keys()) # The old parent shards should no longer exist on disk assert not shards_on_disk(old_shard_ids) @@ -739,7 +755,7 @@ def test_sharding_split_smoke( # all the pageservers that originally held an attached shard should still hold one, otherwise # it would indicate that we had done some unnecessary migration. log.info(f"attached: {attached}") - for ps_id in pre_split_pageserver_ids: + for ps_id in pre_split_pageserver_ids.keys(): log.info("Pre-split pageserver {ps_id} should still hold an attached location") assert ps_id in attached