From 2c544343e0e16eca4704f3477a470f0646a3481f Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 8 Dec 2023 10:12:37 +0000 Subject: [PATCH] pageserver: filtered WAL ingest for sharding (#6024) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem Currently, if one creates many shards they will all ingest all the data: not much use! We want them to ingest a proportional share of the data each. Closes: #6025 ## Summary of changes - WalIngest object gets a copy of the ShardIdentity for the Tenant it was created by. - While iterating the `blocks` part of a decoded record, blocks that do not match the current shard are ignored, apart from on shard zero where they are used to update relation sizes in `observe_decoded_block` (but not stored). - Before committing a `DataDirModificiation` from a WAL record, we check if it's empty, and drop the record if so. This check is necessary (rather than just looking at the `blocks` part) because certain record types may modify blocks in non-obvious ways (e.g. `ingest_heapam_record`). - Add WAL ingest metrics to record the total received, total committed, and total filtered out - Behaviour for unsharded tenants is unchanged: they will continue to ingest all blocks, and will take the fast path through `is_key_local` that doesn't bother calculating any hashes. After this change, shards store a subset of the tenant's total data, and accurate relation sizes are only maintained on shard zero. --------- Co-authored-by: Arpad Müller --- libs/pageserver_api/src/shard.rs | 6 +++ pageserver/src/metrics.rs | 24 ++++++++++ pageserver/src/pgdatadir_mapping.rs | 4 ++ pageserver/src/tenant/timeline.rs | 2 +- pageserver/src/walingest.rs | 69 ++++++++++++++++++++++++++++- 5 files changed, 102 insertions(+), 3 deletions(-) diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs index f8f9449d86..9e83e0eee2 100644 --- a/libs/pageserver_api/src/shard.rs +++ b/libs/pageserver_api/src/shard.rs @@ -420,6 +420,12 @@ impl ShardIdentity { String::new() } } + + /// Convenience for checking if this identity is the 0th shard in a tenant, + /// for special cases on shard 0 such as ingesting relation sizes. + pub fn is_zero(&self) -> bool { + self.number == ShardNumber(0) + } } impl Serialize for ShardIndex { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 3554a93ed9..80113220ae 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1168,6 +1168,30 @@ pub(crate) static DELETION_QUEUE: Lazy = Lazy::new(|| { } }); +pub(crate) struct WalIngestMetrics { + pub(crate) records_received: IntCounter, + pub(crate) records_committed: IntCounter, + pub(crate) records_filtered: IntCounter, +} + +pub(crate) static WAL_INGEST: Lazy = Lazy::new(|| WalIngestMetrics { + records_received: register_int_counter!( + "pageserver_wal_ingest_records_received", + "Number of WAL records received from safekeepers" + ) + .expect("failed to define a metric"), + records_committed: register_int_counter!( + "pageserver_wal_ingest_records_committed", + "Number of WAL records which resulted in writes to pageserver storage" + ) + .expect("failed to define a metric"), + records_filtered: register_int_counter!( + "pageserver_wal_ingest_records_filtered", + "Number of WAL records filtered out due to sharding" + ) + .expect("failed to define a metric"), +}); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteOpKind { Upload, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index d37d953696..c653f0b7ea 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1368,6 +1368,10 @@ impl<'a> DatadirModification<'a> { Ok(()) } + pub(crate) fn is_empty(&self) -> bool { + self.pending_updates.is_empty() && self.pending_deletions.is_empty() + } + // Internal helper functions to batch the modifications async fn get(&self, key: Key, ctx: &RequestContext) -> Result { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 882a5ef199..551b66b77d 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2524,7 +2524,7 @@ impl Timeline { Ok(()) } - fn finish_write(&self, new_lsn: Lsn) { + pub(crate) fn finish_write(&self, new_lsn: Lsn) { assert!(new_lsn.is_aligned()); self.metrics.last_record_gauge.set(new_lsn.0 as i64); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index cbb08f7ff1..75b29a2fed 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -21,6 +21,7 @@ //! redo Postgres process, but some records it can handle directly with //! bespoken Rust code. +use pageserver_api::shard::ShardIdentity; use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes; use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; @@ -30,6 +31,7 @@ use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use crate::context::RequestContext; +use crate::metrics::WAL_INGEST; use crate::pgdatadir_mapping::*; use crate::tenant::PageReconstructError; use crate::tenant::Timeline; @@ -46,6 +48,7 @@ use postgres_ffi::BLCKSZ; use utils::lsn::Lsn; pub struct WalIngest<'a> { + shard: ShardIdentity, timeline: &'a Timeline, checkpoint: CheckPoint, @@ -65,6 +68,7 @@ impl<'a> WalIngest<'a> { trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); Ok(WalIngest { + shard: *timeline.get_shard_identity(), timeline, checkpoint, checkpoint_modified: false, @@ -87,6 +91,8 @@ impl<'a> WalIngest<'a> { decoded: &mut DecodedWALRecord, ctx: &RequestContext, ) -> anyhow::Result<()> { + WAL_INGEST.records_received.inc(); + modification.lsn = lsn; decode_wal_record(recdata, decoded, self.timeline.pg_version)?; @@ -355,6 +361,33 @@ impl<'a> WalIngest<'a> { // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + let key_is_local = self.shard.is_key_local(&key); + + tracing::debug!( + lsn=%lsn, + key=%key, + "ingest: shard decision {} (checkpoint={})", + if !key_is_local { "drop" } else { "keep" }, + self.checkpoint_modified + ); + + if !key_is_local { + if self.shard.is_zero() { + // Shard 0 tracks relation sizes. Although we will not store this block, we will observe + // its blkno in case it implicitly extends a relation. + self.observe_decoded_block(modification, blk, ctx).await?; + } + + continue; + } self.ingest_decoded_block(modification, lsn, decoded, blk, ctx) .await?; } @@ -367,13 +400,38 @@ impl<'a> WalIngest<'a> { self.checkpoint_modified = false; } + if modification.is_empty() { + tracing::debug!("ingest: filtered out record @ LSN {lsn}"); + WAL_INGEST.records_filtered.inc(); + modification.tline.finish_write(lsn); + } else { + WAL_INGEST.records_committed.inc(); + modification.commit(ctx).await?; + } + // Now that this record has been fully handled, including updating the - // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit(ctx).await?; + // checkpoint data, let the repository know that it is up-to-date to this LSN. Ok(()) } + /// Do not store this block, but observe it for the purposes of updating our relation size state. + async fn observe_decoded_block( + &mut self, + modification: &mut DatadirModification<'_>, + blk: &DecodedBkpBlock, + ctx: &RequestContext, + ) -> Result<(), PageReconstructError> { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + self.handle_rel_extend(modification, rel, blk.blkno, ctx) + .await + } + async fn ingest_decoded_block( &mut self, modification: &mut DatadirModification<'_>, @@ -1465,8 +1523,15 @@ impl<'a> WalIngest<'a> { //info!("extending {} {} to {}", rel, old_nblocks, new_nblocks); modification.put_rel_extend(rel, new_nblocks, ctx).await?; + let mut key = rel_block_to_key(rel, blknum); // fill the gap with zeros for gap_blknum in old_nblocks..blknum { + key.field6 = gap_blknum; + + if self.shard.get_shard_number(&key) != self.shard.number { + continue; + } + modification.put_rel_page_image(rel, gap_blknum, ZERO_PAGE.clone())?; } }