From 61244afb59597b0dc30d01f83528c213bad9013d Mon Sep 17 00:00:00 2001 From: John Spray Date: Sun, 5 Nov 2023 14:02:05 +0000 Subject: [PATCH] pageserver: filter WAL by ShardIdentity --- pageserver/src/pgdatadir_mapping.rs | 2 +- pageserver/src/walingest.rs | 59 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 9e8a6b02cc..12aff7674f 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1514,7 +1514,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key { } } -fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key { +pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key { Key { field1: 0x00, field2: rel.spcnode, diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 23367928d3..19804da34f 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -21,6 +21,7 @@ //! redo Postgres process, but some records it can handle directly with //! bespoken Rust code. +use pageserver_api::shard::ShardIdentity; use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; @@ -46,6 +47,7 @@ use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; pub struct WalIngest<'a> { + shard: ShardIdentity, timeline: &'a Timeline, checkpoint: CheckPoint, @@ -65,6 +67,7 @@ impl<'a> WalIngest<'a> { trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); Ok(WalIngest { + shard: timeline.get_shard(), timeline, checkpoint, checkpoint_modified: false, @@ -90,6 +93,29 @@ impl<'a> WalIngest<'a> { modification.lsn = lsn; decode_wal_record(recdata, decoded, self.timeline.pg_version)?; + // Fast path: we may skip the entire record if it only references blocks on another shard. + // Otherwise we proceed, and filter blocks later. + let any_local_blocks = decoded.blocks.iter().any(|blk| { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + self.shard.is_key_local(&key) + }); + // - We need at least one block to skip: otherwise we assume the record's + // payload is all in its other fields, which are metadata-ish things that + // we broadcast to all shards + // - ...and obviously, we can only skip a WAL record if it doesn't need to + // write to any pages in this shard. + let skip_record = decoded.blocks.len() > 0 && !any_local_blocks; + // TODO: actually skip (and update LSN at the time). Currently we just + // check later in the function that if we set skip_record==true, then we + // really would not have done any local IO. + let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -358,6 +384,26 @@ impl<'a> WalIngest<'a> { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + let key_is_local = self.shard.is_key_local(&key); + + tracing::info!( + "ingest: shard decision {} (checkpoint={}) for key {}", + if !key_is_local { "drop" } else { "keep" }, + self.checkpoint_modified, + key + ); + + if !key_is_local { + continue; + } self.ingest_decoded_block(modification, lsn, decoded, blk, ctx) .await?; } @@ -370,6 +416,12 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = false; } + if skip_record && !modification.is_no_op() { + tracing::error!( + "WAL record @ {lsn} would have been dropped, but we actually did modifications!" + ); + } + // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN modification.commit(ctx).await?; @@ -1459,8 +1511,15 @@ impl<'a> WalIngest<'a> { //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks); modification.put_rel_extend(rel, new_nblocks, ctx).await?; + let mut key = rel_block_to_key(rel, blknum); // fill the gap with zeros for gap_blknum in old_nblocks..blknum { + key.field6 = gap_blknum; + + if self.shard.get_shard_number(&key) != self.shard.number { + continue; + } + modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?; } }