pageserver: filter WAL by ShardIdentity

author John Spray
date   2023-11-05 14:02:05 +00:00
parent 61fe9d360d
commit f7795ee2a5
4 changed files with 97 additions and 0 deletions


@@ -411,6 +411,12 @@ impl ShardIdentity {
String::new()
}
}
/// Convenience for checking if this identity is the 0th shard in a tenant,
/// for special cases on shard 0 such as ingesting relation sizes.
pub fn is_zero(&self) -> bool {
self.number == ShardNumber(0)
}
}
impl Serialize for ShardIndex {
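The new `is_zero` helper is a thin predicate over the shard number. A minimal standalone sketch of the semantics, with `ShardNumber` and `ShardIdentity` reduced to stand-ins for the real `pageserver_api::shard` types:

#[derive(Clone, Copy, PartialEq, Eq)]
struct ShardNumber(u8);

struct ShardIdentity {
    number: ShardNumber,
    count: u8, // total shards in the tenant (simplified)
}

impl ShardIdentity {
    // Shard 0 carries tenant-wide state such as relation sizes,
    // so ingest special-cases it.
    fn is_zero(&self) -> bool {
        self.number == ShardNumber(0)
    }
}

fn main() {
    let shard = ShardIdentity { number: ShardNumber(0), count: 4 };
    assert!(shard.count > 0 && shard.is_zero());
}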


@@ -1167,6 +1167,30 @@ pub(crate) static DELETION_QUEUE: Lazy<DeletionQueueMetrics> = Lazy::new(|| {
}
});
pub(crate) struct WalIngestMetrics {
pub(crate) records_received: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
}
pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| WalIngestMetrics {
records_received: register_int_counter!(
"pageserver_wal_ingest_records_received",
"Number of WAL records received from safekeeper"
)
.expect("failed to define a metric"),
records_committed: register_int_counter!(
"pageserver_wal_ingest_records_committed",
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RemoteOpKind {
Upload,
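These three counters let an operator check the filtering invariant: every received record ends up either committed or filtered, so, modulo records in flight, received should equal committed plus filtered. A sketch of that relationship using the plain `prometheus` crate (the pageserver registers through its own `metrics` wrappers; the `demo_` metric names here are made up):

use prometheus::{register_int_counter, IntCounter};

fn main() -> prometheus::Result<()> {
    let received: IntCounter =
        register_int_counter!("demo_records_received", "records received")?;
    let filtered: IntCounter =
        register_int_counter!("demo_records_filtered", "records filtered")?;
    let committed: IntCounter =
        register_int_counter!("demo_records_committed", "records committed")?;

    received.inc();
    filtered.inc(); // this record carried no blocks for our shard

    // Invariant: every received record is either committed or filtered.
    assert_eq!(received.get(), committed.get() + filtered.get());
    Ok(())
}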


@@ -1368,6 +1368,10 @@ impl<'a> DatadirModification<'a> {
Ok(())
}
pub(crate) fn is_empty(&self) -> bool {
self.pending_updates.is_empty() && self.pending_deletions.is_empty()
}
// Internal helper functions to batch the modifications
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
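`is_empty` is what lets the ingest path notice that sharding filtered every block out of a record. A simplified illustration of the check, with the pending state reduced to plain collections rather than the real `DatadirModification` internals:

use std::collections::HashMap;

struct Modification {
    pending_updates: HashMap<u64, Vec<u8>>,
    pending_deletions: Vec<u64>,
}

impl Modification {
    fn is_empty(&self) -> bool {
        self.pending_updates.is_empty() && self.pending_deletions.is_empty()
    }
}

fn main() {
    let m = Modification {
        pending_updates: HashMap::new(),
        pending_deletions: Vec::new(),
    };
    // A record whose blocks all belong to other shards leaves the
    // modification empty, so there is nothing to commit.
    assert!(m.is_empty());
}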


@@ -21,6 +21,7 @@
//! redo Postgres process, but some records it can handle directly with
//! bespoke Rust code.
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
@@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ;
use utils::lsn::Lsn;
pub struct WalIngest<'a> {
shard: ShardIdentity,
timeline: &'a Timeline,
checkpoint: CheckPoint,
@@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> {
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
timeline,
checkpoint,
checkpoint_modified: false,
@@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
) -> anyhow::Result<()> {
WAL_INGEST.records_received.inc();
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
@@ -355,6 +361,32 @@ impl<'a> WalIngest<'a> {
// Iterate through all the blocks that the record modifies, and
// "put" a separate copy of the record for each block.
for blk in decoded.blocks.iter() {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
let key = rel_block_to_key(rel, blk.blkno);
let key_is_local = self.shard.is_key_local(&key);
tracing::debug!(
"ingest: shard decision {} (checkpoint={}) for key {}",
if !key_is_local { "drop" } else { "keep" },
self.checkpoint_modified,
key
);
if !key_is_local {
if self.shard.is_zero() {
// Shard 0 tracks relation sizes. Although we will not store this block, we will observe
// its blkno in case it implicitly extends a relation.
self.observe_decoded_block(modification, blk, ctx).await?;
}
continue;
}
self.ingest_decoded_block(modification, lsn, decoded, blk, ctx)
.await?;
}
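The keep/drop decision above delegates to `ShardIdentity::is_key_local`, which maps a page key to a shard. As a rough illustration of key-to-shard assignment only (the hash-modulo scheme below is an assumption, not the actual stripe layout, and the real mapping pins some keys to shard 0):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: assign a (relation, block) pair to one of
// `shard_count` shards by hashing.
fn shard_for_block(rel_id: u32, blkno: u32, shard_count: u8) -> u8 {
    let mut h = DefaultHasher::new();
    (rel_id, blkno).hash(&mut h);
    (h.finish() % shard_count as u64) as u8
}

fn main() {
    let my_shard = 1u8;
    let local = shard_for_block(16384, 7, 4) == my_shard;
    println!("block 7: {}", if local { "keep" } else { "drop" });
}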
@@ -367,13 +399,37 @@ impl<'a> WalIngest<'a> {
self.checkpoint_modified = false;
}
if modification.is_empty() {
tracing::debug!("ingest: filtered out record @ LSN {lsn}");
WAL_INGEST.records_filtered.inc();
return Ok(());
}
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
WAL_INGEST.records_committed.inc();
modification.commit(ctx).await?;
Ok(())
}
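The tail of `ingest_record` now commits or skips as a unit: a record whose blocks were all dropped is counted as filtered and returns early, while everything else is counted as committed. A condensed stand-in for that control flow (counters as plain integers, async commit elided):

fn finish(is_empty: bool, filtered: &mut u64, committed: &mut u64) {
    if is_empty {
        *filtered += 1; // record carried nothing for this shard
        return;         // skip the commit entirely
    }
    *committed += 1;    // modification.commit(ctx).await? in the real code
}

fn main() {
    let (mut filtered, mut committed) = (0u64, 0u64);
    finish(true, &mut filtered, &mut committed);
    assert_eq!((filtered, committed), (1, 0));
}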
/// Do not store this block, but observe it for the purposes of updating our relation size state.
async fn observe_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
blk: &DecodedBkpBlock,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
let rel = RelTag {
spcnode: blk.rnode_spcnode,
dbnode: blk.rnode_dbnode,
relnode: blk.rnode_relnode,
forknum: blk.forknum,
};
self.handle_rel_extend(modification, rel, blk.blkno, ctx)
.await
}
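The relation-size bookkeeping behind `observe_decoded_block` amounts to taking a running maximum over observed block numbers: touching block N implies the relation is at least N + 1 blocks long. A simplified sketch of that invariant (the real state lives in the datadir mapping, not a HashMap):

use std::collections::HashMap;

// Illustrative relation-size tracker keyed by relation id.
fn observe_block(sizes: &mut HashMap<u32, u32>, rel_id: u32, blkno: u32) {
    let nblocks = sizes.entry(rel_id).or_insert(0);
    *nblocks = (*nblocks).max(blkno + 1);
}

fn main() {
    let mut sizes = HashMap::new();
    observe_block(&mut sizes, 16384, 41); // block 41 implies >= 42 blocks
    assert_eq!(sizes[&16384], 42);
}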
async fn ingest_decoded_block(
&mut self,
modification: &mut DatadirModification<'_>,
@@ -1465,8 +1521,15 @@ impl<'a> WalIngest<'a> {
//info!("extending {} {} to {}", rel, old_nblocks, new_nblocks);
modification.put_rel_extend(rel, new_nblocks, ctx).await?;
let mut key = rel_block_to_key(rel, blknum);
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
key.field6 = gap_blknum;
if self.shard.get_shard_number(&key) != self.shard.number {
continue;
}
modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?;
}
}
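With the shard check inside the gap-fill loop, each shard zero-fills only the gap blocks it owns; the other shards fill their own slices of the same gap. A standalone sketch of that partition, with the shard mapping simplified to a modulo (an assumption standing in for `get_shard_number`):

// When a relation jumps from old_nblocks straight to blknum, the
// blocks in between are zero-filled, split across shards.
fn gap_blocks_for_shard(old_nblocks: u32, blknum: u32, shard: u32, count: u32) -> Vec<u32> {
    (old_nblocks..blknum)
        .filter(|b| b % count == shard) // stand-in for the real key mapping
        .collect()
}

fn main() {
    // A relation extended from 4 blocks directly to block 10 leaves
    // blocks 4..=9 as a gap; shard 1 of 4 fills blocks 5 and 9.
    assert_eq!(gap_blocks_for_shard(4, 10, 1, 4), vec![5, 9]);
}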