diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index 7bcc0ee4c6..7b4187b9dd 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -411,6 +411,12 @@ impl ShardIdentity {
             String::new()
         }
     }
+
+    /// Convenience for checking if this identity is the 0th shard in a tenant,
+    /// for special cases on shard 0 such as ingesting relation sizes.
+    pub fn is_zero(&self) -> bool {
+        self.number == ShardNumber(0)
+    }
 }
 
 impl Serialize for ShardIndex {
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index 6e311041ba..e79bfc9be0 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1167,6 +1167,30 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
     }
 });
 
+pub(crate) struct WalIngestMetrics {
+    pub(crate) records_received: IntCounter,
+    pub(crate) records_committed: IntCounter,
+    pub(crate) records_filtered: IntCounter,
+}
+
+pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
+    records_received: register_int_counter!(
+        "pageserver_wal_ingest_records_received",
+        "Number of WAL records received from safekeeper"
+    )
+    .expect("failed to define a metric"),
+    records_committed: register_int_counter!(
+        "pageserver_wal_ingest_records_committed",
+        "Number of WAL records which resulted in writes to pageserver storage"
+    )
+    .expect("failed to define a metric"),
+    records_filtered: register_int_counter!(
+        "pageserver_wal_ingest_records_filtered",
+        "Number of WAL records filtered out due to sharding"
+    )
+    .expect("failed to define a metric"),
+});
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum RemoteOpKind {
     Upload,
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index d37d953696..c653f0b7ea 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -1368,6 +1368,10 @@ impl<'a> DatadirModification<'a> {
         Ok(())
     }
 
+    pub(crate) fn is_empty(&self) -> bool {
+        self.pending_updates.is_empty() && self.pending_deletions.is_empty()
+    }
+
     // Internal helper functions to batch the modifications
 
     async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index cbb08f7ff1..7be78ab3a1 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -21,6 +21,7 @@
 //! redo Postgres process, but some records it can handle directly with
 //! bespoken Rust code.
 
+use pageserver_api::shard::ShardIdentity;
 use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
 use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
 use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut};
 use tracing::*;
 
 use crate::context::RequestContext;
+use crate::metrics::WAL_INGEST;
 use crate::pgdatadir_mapping::*;
 use crate::tenant::PageReconstructError;
 use crate::tenant::Timeline;
@@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ;
 use utils::lsn::Lsn;
 
 pub struct WalIngest<'a> {
+    shard: ShardIdentity,
     timeline: &'a Timeline,
 
     checkpoint: CheckPoint,
@@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> {
         trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
 
         Ok(WalIngest {
+            shard: *timeline.get_shard_identity(),
             timeline,
             checkpoint,
             checkpoint_modified: false,
@@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> {
         decoded: &mut DecodedWALRecord,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
+        WAL_INGEST.records_received.inc();
+
         modification.lsn = lsn;
         decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
 
@@ -355,6 +361,32 @@ impl<'a> WalIngest<'a> {
         // Iterate through all the blocks that the record modifies, and
         // "put" a separate copy of the record for each block.
         for blk in decoded.blocks.iter() {
+            let rel = RelTag {
+                spcnode: blk.rnode_spcnode,
+                dbnode: blk.rnode_dbnode,
+                relnode: blk.rnode_relnode,
+                forknum: blk.forknum,
+            };
+
+            let key = rel_block_to_key(rel, blk.blkno);
+            let key_is_local = self.shard.is_key_local(&key);
+
+            tracing::debug!(
+                "ingest: shard decision {} (checkpoint={}) for key {}",
+                if !key_is_local { "drop" } else { "keep" },
+                self.checkpoint_modified,
+                key
+            );
+
+            if !key_is_local {
+                if self.shard.is_zero() {
+                    // Shard 0 tracks relation sizes. Although we will not store this block, we will observe
+                    // its blkno in case it implicitly extends a relation.
+                    self.observe_decoded_block(modification, blk, ctx).await?;
+                }
+
+                continue;
+            }
             self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
                 .await?;
         }
@@ -367,13 +399,37 @@ impl<'a> WalIngest<'a> {
             self.checkpoint_modified = false;
         }
 
+        if modification.is_empty() {
+            tracing::debug!("ingest: filtered out record @ LSN {lsn}");
+            WAL_INGEST.records_filtered.inc();
+            return Ok(());
+        }
+
         // Now that this record has been fully handled, including updating the
         // checkpoint data, let the repository know that it is up-to-date to this LSN
+        WAL_INGEST.records_committed.inc();
         modification.commit(ctx).await?;
 
         Ok(())
     }
 
+    /// Do not store this block, but observe it for the purposes of updating our relation size state.
+    async fn observe_decoded_block(
+        &mut self,
+        modification: &mut DatadirModification<'_>,
+        blk: &DecodedBkpBlock,
+        ctx: &RequestContext,
+    ) -> Result<(), PageReconstructError> {
+        let rel = RelTag {
+            spcnode: blk.rnode_spcnode,
+            dbnode: blk.rnode_dbnode,
+            relnode: blk.rnode_relnode,
+            forknum: blk.forknum,
+        };
+        self.handle_rel_extend(modification, rel, blk.blkno, ctx)
+            .await
+    }
+
     async fn ingest_decoded_block(
         &mut self,
         modification: &mut DatadirModification<'_>,
@@ -1465,8 +1521,15 @@ impl<'a> WalIngest<'a> {
             //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
             modification.put_rel_extend(rel, new_nblocks, ctx).await?;
 
+            let mut key = rel_block_to_key(rel, blknum);
             // fill the gap with zeros
             for gap_blknum in old_nblocks..blknum {
+                key.field6 = gap_blknum;
+
+                if self.shard.get_shard_number(&key) != self.shard.number {
+                    continue;
+                }
+
                 modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
             }
         }
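
The per-block branch this diff adds to `ingest_record` reduces to three outcomes: ingest the block, observe it only (shard 0), or drop it. The following is a minimal, self-contained sketch of that decision; `ShardIdentity`, `ShardNumber`, and `is_key_local` here are simplified stand-ins, with key-to-shard placement modelled as a plain round-robin over block numbers purely for illustration, not the real `pageserver_api` types, which map keys to shards by hashing.

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct ShardNumber(u8);

#[derive(Clone, Copy)]
struct ShardIdentity {
    number: ShardNumber,
    count: u8,
}

impl ShardIdentity {
    // Mirrors the is_zero() helper added to shard.rs.
    fn is_zero(&self) -> bool {
        self.number == ShardNumber(0)
    }

    // Stand-in for is_key_local(): here a "key" is just a block number and
    // placement is round-robin; the real mapping hashes the key.
    fn is_key_local(&self, blkno: u32) -> bool {
        blkno % u32::from(self.count) == u32::from(self.number.0)
    }
}

// The three outcomes of the per-block branch added to ingest_record().
#[derive(Debug, PartialEq)]
enum Decision {
    Ingest,      // key is local: ingest the block as before
    ObserveOnly, // shard 0, non-local key: update relation size state only
    Drop,        // non-local key on shards > 0: skip entirely
}

fn decide(shard: &ShardIdentity, blkno: u32) -> Decision {
    if shard.is_key_local(blkno) {
        Decision::Ingest
    } else if shard.is_zero() {
        Decision::ObserveOnly
    } else {
        Decision::Drop
    }
}

fn main() {
    let shard0 = ShardIdentity { number: ShardNumber(0), count: 4 };
    let shard1 = ShardIdentity { number: ShardNumber(1), count: 4 };

    assert_eq!(decide(&shard0, 0), Decision::Ingest); // local to shard 0
    assert_eq!(decide(&shard0, 1), Decision::ObserveOnly); // shard 0 still tracks sizes
    assert_eq!(decide(&shard1, 1), Decision::Ingest); // local to shard 1
    assert_eq!(decide(&shard1, 2), Decision::Drop); // filtered out entirely
}
```

A consequence visible in the diff: every received record ends in either the filtered branch or the committed branch, so `pageserver_wal_ingest_records_received` should track the sum of `pageserver_wal_ingest_records_committed` and `pageserver_wal_ingest_records_filtered`, modulo records that error out mid-ingest.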
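The other shard-aware change is the gap fill in `handle_rel_extend`: when a record implicitly extends a relation past a gap, each shard writes zero page images only for the gap blocks whose keys it owns, so across all shards each gap block is filled exactly once. A sketch under the same illustrative round-robin placement as above (the real placement, again, hashes the key):

```rust
// Which gap blocks a given shard zero-fills when a relation jumps from
// old_nblocks to a write at blknum. The half-open range matches the diff's
// `for gap_blknum in old_nblocks..blknum`: the written block itself is not
// part of the gap. Round-robin placement is illustrative only.
fn gap_pages_for_shard(old_nblocks: u32, blknum: u32, shard: u32, count: u32) -> Vec<u32> {
    (old_nblocks..blknum)
        .filter(|&b| b % count == shard)
        .collect()
}

fn main() {
    // A relation grows from 4 blocks to a write at block 10, with 4 shards.
    let gaps: Vec<Vec<u32>> = (0..4).map(|s| gap_pages_for_shard(4, 10, s, 4)).collect();
    assert_eq!(gaps, vec![vec![4, 8], vec![5, 9], vec![6], vec![7]]);
    // Every gap block in 4..10 is filled by exactly one shard.
}
```

Note that shard 0's observe-only path feeds into the same `handle_rel_extend`, which is why it remains authoritative for relation sizes even for blocks it never stores.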