pageserver: filter WAL by ShardIdentity

This commit is contained in:
John Spray
2023-11-05 14:02:05 +00:00
parent e20732fdcb
commit 61244afb59
2 changed files with 60 additions and 1 deletions

View File

@@ -1514,7 +1514,7 @@ fn rel_dir_to_key(spcnode: Oid, dbnode: Oid) -> Key {
}
}
fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
pub fn rel_block_to_key(rel: RelTag, blknum: BlockNumber) -> Key {
Key {
field1: 0x00,
field2: rel.spcnode,

View File

@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -46,6 +47,7 @@ use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
@@ -65,6 +67,7 @@ impl<'a> WalIngest<'a> {
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: timeline.get_shard(),
timeline,
checkpoint,
checkpoint_modified: false,
@@ -90,6 +93,29 @@ impl<'a> WalIngest<'a> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
// Fast path: we may skip the entire record if it only references blocks on another shard.
// Otherwise we proceed, and filter blocks later.
let any_local_blocks = decoded.blocks.iter().any(|blk| {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
self.shard.is_key_local(&key)
});
// - We need at least one block to skip: otherwise we assume the record's
// payload is all in its other fields, which are metadata-ish things that
// we broadcast to all shards
// - ...and obviously, we can only skip a WAL record if it doesn't need to
// write to any pages in this shard.
let skip_record = decoded.blocks.len() > 0 && !any_local_blocks;
// TODO: actually skip the record (and update the LSN at that time). Currently
// we only check, later in this function, that whenever skip_record == true we
// really did not do any local IO.
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -358,6 +384,26 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
let key_is_local = self.shard.is_key_local(&key);
tracing::info!(
"ingest: shard decision {} (checkpoint={}) for key {}",
if !key_is_local { "drop" } else { "keep" },
self.checkpoint_modified,
key
);
if !key_is_local {
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
@@ -370,6 +416,12 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = false;
}
if skip_record && !modification.is_no_op() {
tracing::error!(
"WAL record @ {lsn} would have been dropped, but we actually did modifications!"
);
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit(ctx).await?;
@@ -1459,8 +1511,15 @@ impl<'a> WalIngest<'a> {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
let mut key = rel_block_to_key(rel, blknum);
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
key.field6 = gap_blknum;
if self.shard.get_shard_number(&key) != self.shard.number {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}