From dcb629532b075a68ba6a2aeeb3933e8ac73efbb9 Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 3 Dec 2024 17:22:49 +0000
Subject: [PATCH] pageserver: only store SLRUs & aux files on shard zero
 (#9786)

## Problem

Since https://github.com/neondatabase/neon/pull/9423, the non-zero shards no longer need SLRU content in order to do GC. This data is now redundant on shards > 0.

One release cycle after merging that PR, we may merge this one, which also stops writing those pages to shards > 0, reaping the efficiency benefit.

Closes: https://github.com/neondatabase/neon/issues/7512
Closes: https://github.com/neondatabase/neon/issues/9641

## Summary of changes

- Avoid storing SLRUs on non-zero shards
- Bonus: avoid storing aux files on non-zero shards

The sketch below illustrates the key-placement policy these changes converge on.
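This is a hedged, illustrative sketch rather than pageserver code: `KeyKind` and `owning_shards` are hypothetical stand-ins for `Key` and `ShardIdentity`, and the real relation-block mapping is a striped key hash, not a modulo.

```rust
// Hypothetical model of key placement after this PR (not real pageserver types).
#[derive(Clone, Copy)]
enum KeyKind {
    SlruBlock,       // pg_xact / pg_multixact pages
    SlruSegmentSize, // SLRU segment size metadata
    AuxFile,         // aux file content (e.g. logical replication state)
    RelBlock(u32),   // ordinary relation block, identified by block number
    RelSize,         // relation size key
    Other,           // keys the ingest code may not treat as shard-aware yet
}

/// Which shards of a `shard_count`-way split store a given key?
fn owning_shards(key: KeyKind, shard_count: u32) -> Vec<u32> {
    match key {
        // After this PR: SLRU and aux-file keys live only on shard zero.
        KeyKind::SlruBlock | KeyKind::SlruSegmentSize | KeyKind::AuxFile => vec![0],
        // Relation blocks remain distributed; the modulo stands in for the
        // real striped key-hash mapping.
        KeyKind::RelBlock(blkno) => vec![blkno % shard_count],
        // Rel size keys and unclassified keys are kept on every shard.
        KeyKind::RelSize | KeyKind::Other => (0..shard_count).collect(),
    }
}

fn main() {
    assert_eq!(owning_shards(KeyKind::SlruBlock, 4), vec![0]);
    assert_eq!(owning_shards(KeyKind::RelSize, 4), vec![0, 1, 2, 3]);
}
```

The same classification drives the new `is_key_global`/`is_key_disposable` pair below: a shard may drop a key only when it is neither global nor locally owned.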
---
 libs/pageserver_api/src/key.rs                |  5 ++
 libs/pageserver_api/src/shard.rs              | 34 +++++++++---
 libs/wal_decoder/src/decoder.rs               | 54 ++++++++++--------
 pageserver/src/import_datadir.rs              | 18 ++++--
 pageserver/src/pgdatadir_mapping.rs           | 55 ++++++++++++-------
 .../src/tenant/timeline/import_pgdata/flow.rs | 49 +++++++----------
 pageserver/src/walingest.rs                   |  4 ++
 7 files changed, 134 insertions(+), 85 deletions(-)

diff --git a/libs/pageserver_api/src/key.rs b/libs/pageserver_api/src/key.rs
index 523d143381..37dff6fe46 100644
--- a/libs/pageserver_api/src/key.rs
+++ b/libs/pageserver_api/src/key.rs
@@ -770,6 +770,11 @@ impl Key {
         && self.field6 == 1
     }
 
+    #[inline(always)]
+    pub fn is_aux_file_key(&self) -> bool {
+        self.field1 == AUX_KEY_PREFIX
+    }
+
     /// Guaranteed to return `Ok()` if [`Self::is_rel_block_key`] returns `true` for `key`.
     #[inline(always)]
     pub fn to_rel_block(self) -> anyhow::Result<(RelTag, BlockNumber)> {
diff --git a/libs/pageserver_api/src/shard.rs b/libs/pageserver_api/src/shard.rs
index e83cf4c855..a5c94a82c1 100644
--- a/libs/pageserver_api/src/shard.rs
+++ b/libs/pageserver_api/src/shard.rs
@@ -170,19 +170,37 @@ impl ShardIdentity {
         }
     }
 
+    /// Return true if the key should be stored on all shards, not just one.
+    fn is_key_global(&self, key: &Key) -> bool {
+        if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
+            // Special keys that are only stored on shard 0
+            false
+        } else if key.is_rel_block_key() {
+            // Ordinary relation blocks are distributed across shards
+            false
+        } else if key.is_rel_size_key() {
+            // All shards maintain rel size keys (although only shard 0 is responsible for
+            // keeping it strictly accurate, other shards just reflect the highest block they've ingested)
+            true
+        } else {
+            // For everything else, we assume it must be kept everywhere, because ingest code
+            // might assume this -- this covers functionality where the ingest code has
+            // not (yet) been made fully shard aware.
+            true
+        }
+    }
+
     /// Return true if the key should be discarded if found in this shard's
     /// data store, e.g. during compaction after a split.
     ///
     /// Shards _may_ drop keys which return false here, but are not obliged to.
     pub fn is_key_disposable(&self, key: &Key) -> bool {
-        if key_is_shard0(key) {
-            // Q: Why can't we dispose of shard0 content if we're not shard 0?
-            // A1: because the WAL ingestion logic currently ingests some shard 0
-            //     content on all shards, even though it's only read on shard 0. If we
-            //     dropped it, then subsequent WAL ingest to these keys would encounter
-            //     an error.
-            // A2: because key_is_shard0 also covers relation size keys, which are written
-            //     on all shards even though they're only maintained accurately on shard 0.
+        if self.count < ShardCount(2) {
+            // Fast path: unsharded tenant doesn't dispose of anything
+            return false;
+        }
+
+        if self.is_key_global(key) {
             false
         } else {
             !self.is_key_local(key)
diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs
index 36c4b19266..aa50c62911 100644
--- a/libs/wal_decoder/src/decoder.rs
+++ b/libs/wal_decoder/src/decoder.rs
@@ -112,30 +112,38 @@ impl MetadataRecord {
         };
 
         // Next, filter the metadata record by shard.
-
-        // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
-        // of the main relation. These are sharded and managed just like regular relation pages.
-        // See: https://github.com/neondatabase/neon/issues/9855
-        if let Some(
-            MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
-            | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
-        ) = metadata_record
-        {
-            let is_local_vm_page = |heap_blk| {
-                let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
-                shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
-            };
-            // Send the old and new VM page updates to their respective shards.
-            clear_vm_bits.old_heap_blkno = clear_vm_bits
-                .old_heap_blkno
-                .filter(|&blkno| is_local_vm_page(blkno));
-            clear_vm_bits.new_heap_blkno = clear_vm_bits
-                .new_heap_blkno
-                .filter(|&blkno| is_local_vm_page(blkno));
-            // If neither VM page belongs to this shard, discard the record.
-            if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none() {
-                metadata_record = None
+        match metadata_record {
+            Some(
+                MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits))
+                | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)),
+            ) => {
+                // Route VM page updates to the shards that own them. VM pages are stored in the VM fork
+                // of the main relation. These are sharded and managed just like regular relation pages.
+                // See: https://github.com/neondatabase/neon/issues/9855
+                let is_local_vm_page = |heap_blk| {
+                    let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk);
+                    shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk))
+                };
+                // Send the old and new VM page updates to their respective shards.
+                clear_vm_bits.old_heap_blkno = clear_vm_bits
+                    .old_heap_blkno
+                    .filter(|&blkno| is_local_vm_page(blkno));
+                clear_vm_bits.new_heap_blkno = clear_vm_bits
+                    .new_heap_blkno
+                    .filter(|&blkno| is_local_vm_page(blkno));
+                // If neither VM page belongs to this shard, discard the record.
+                if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none()
+                {
+                    metadata_record = None
+                }
             }
+            Some(MetadataRecord::LogicalMessage(LogicalMessageRecord::Put(_))) => {
+                // Filter LogicalMessage records (AUX files) to only be stored on shard zero
+                if !shard.is_shard_zero() {
+                    metadata_record = None;
+                }
+            }
+            _ => {}
         }
 
         Ok(metadata_record)
diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs
index 06c4553e1c..c061714010 100644
--- a/pageserver/src/import_datadir.rs
+++ b/pageserver/src/import_datadir.rs
@@ -575,18 +575,24 @@ async fn import_file(
     } else if file_path.starts_with("pg_xact") {
         let slru = SlruKind::Clog;
 
-        import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        debug!("imported clog slru");
+        if modification.tline.tenant_shard_id.is_shard_zero() {
+            import_slru(modification, slru, file_path, reader, len, ctx).await?;
+            debug!("imported clog slru");
+        }
     } else if file_path.starts_with("pg_multixact/offsets") {
         let slru = SlruKind::MultiXactOffsets;
 
-        import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        debug!("imported multixact offsets slru");
+        if modification.tline.tenant_shard_id.is_shard_zero() {
+            import_slru(modification, slru, file_path, reader, len, ctx).await?;
+            debug!("imported multixact offsets slru");
+        }
     } else if file_path.starts_with("pg_multixact/members") {
         let slru = SlruKind::MultiXactMembers;
 
-        import_slru(modification, slru, file_path, reader, len, ctx).await?;
-        debug!("imported multixact members slru");
+        if modification.tline.tenant_shard_id.is_shard_zero() {
+            import_slru(modification, slru, file_path, reader, len, ctx).await?;
+            debug!("imported multixact members slru");
+        }
     } else if file_path.starts_with("pg_twophase") {
         let bytes = read_all_bytes(reader).await?;
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index a00ec761e2..255bd01e25 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -530,6 +530,7 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
+        assert!(self.tenant_shard_id.is_shard_zero());
         let n_blocks = self
             .get_slru_segment_size(kind, segno, Version::Lsn(lsn), ctx)
             .await?;
@@ -552,6 +553,7 @@ impl Timeline {
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Result<Bytes, PageReconstructError> {
+        assert!(self.tenant_shard_id.is_shard_zero());
         let key = slru_block_to_key(kind, segno, blknum);
         self.get(key, lsn, ctx).await
     }
@@ -564,6 +566,7 @@ impl Timeline {
         version: Version<'_>,
         ctx: &RequestContext,
     ) -> Result<BlockNumber, PageReconstructError> {
+        assert!(self.tenant_shard_id.is_shard_zero());
         let key = slru_segment_size_to_key(kind, segno);
         let mut buf = version.get(self, key, ctx).await?;
         Ok(buf.get_u32_le())
@@ -577,6 +580,7 @@ impl Timeline {
         version: Version<'_>,
         ctx: &RequestContext,
     ) -> Result<bool, PageReconstructError> {
+        assert!(self.tenant_shard_id.is_shard_zero());
         // fetch directory listing
         let key = slru_dir_to_key(kind);
         let buf = version.get(self, key, ctx).await?;
@@ -1047,26 +1051,28 @@ impl Timeline {
         }
 
         // Iterate SLRUs next
-        for kind in [
-            SlruKind::Clog,
-            SlruKind::MultiXactMembers,
-            SlruKind::MultiXactOffsets,
-        ] {
-            let slrudir_key = slru_dir_to_key(kind);
-            result.add_key(slrudir_key);
-            let buf = self.get(slrudir_key, lsn, ctx).await?;
-            let dir = SlruSegmentDirectory::des(&buf)?;
-            let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
-            segments.sort_unstable();
-            for segno in segments {
-                let segsize_key = slru_segment_size_to_key(kind, segno);
-                let mut buf = self.get(segsize_key, lsn, ctx).await?;
-                let segsize = buf.get_u32_le();
+        if self.tenant_shard_id.is_shard_zero() {
+            for kind in [
+                SlruKind::Clog,
+                SlruKind::MultiXactMembers,
+                SlruKind::MultiXactOffsets,
+            ] {
+                let slrudir_key = slru_dir_to_key(kind);
+                result.add_key(slrudir_key);
+                let buf = self.get(slrudir_key, lsn, ctx).await?;
+                let dir = SlruSegmentDirectory::des(&buf)?;
+                let mut segments: Vec<u32> = dir.segments.iter().cloned().collect();
+                segments.sort_unstable();
+                for segno in segments {
+                    let segsize_key = slru_segment_size_to_key(kind, segno);
+                    let mut buf = self.get(segsize_key, lsn, ctx).await?;
+                    let segsize = buf.get_u32_le();
 
-            result.add_range(
-                slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
-            );
-            result.add_key(segsize_key);
+                    result.add_range(
+                        slru_block_to_key(kind, segno, 0)..slru_block_to_key(kind, segno, segsize),
+                    );
+                    result.add_key(segsize_key);
+                }
             }
         }
@@ -1468,6 +1474,10 @@ impl<'a> DatadirModification<'a> {
         blknum: BlockNumber,
         rec: NeonWalRecord,
     ) -> anyhow::Result<()> {
+        if !self.tline.tenant_shard_id.is_shard_zero() {
+            return Ok(());
+        }
+
         self.put(
             slru_block_to_key(kind, segno, blknum),
             Value::WalRecord(rec),
@@ -1501,6 +1511,8 @@ impl<'a> DatadirModification<'a> {
         blknum: BlockNumber,
         img: Bytes,
     ) -> anyhow::Result<()> {
+        assert!(self.tline.tenant_shard_id.is_shard_zero());
+
         let key = slru_block_to_key(kind, segno, blknum);
         if !key.is_valid_key_on_write_path() {
             anyhow::bail!(
@@ -1542,6 +1554,7 @@ impl<'a> DatadirModification<'a> {
         segno: u32,
         blknum: BlockNumber,
     ) -> anyhow::Result<()> {
+        assert!(self.tline.tenant_shard_id.is_shard_zero());
         let key = slru_block_to_key(kind, segno, blknum);
         if !key.is_valid_key_on_write_path() {
             anyhow::bail!(
@@ -1853,6 +1866,8 @@ impl<'a> DatadirModification<'a> {
         nblocks: BlockNumber,
         ctx: &RequestContext,
     ) -> anyhow::Result<()> {
+        assert!(self.tline.tenant_shard_id.is_shard_zero());
+
         // Add it to the directory entry
         let dir_key = slru_dir_to_key(kind);
         let buf = self.get(dir_key, ctx).await?;
@@ -1885,6 +1900,8 @@ impl<'a> DatadirModification<'a> {
         segno: u32,
         nblocks: BlockNumber,
     ) -> anyhow::Result<()> {
+        assert!(self.tline.tenant_shard_id.is_shard_zero());
+
         // Put size
         let size_key = slru_segment_size_to_key(kind, segno);
         let buf = nblocks.to_le_bytes();
diff --git a/pageserver/src/tenant/timeline/import_pgdata/flow.rs b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
index cbd4168c06..4388072606 100644
--- a/pageserver/src/tenant/timeline/import_pgdata/flow.rs
+++ b/pageserver/src/tenant/timeline/import_pgdata/flow.rs
@@ -129,22 +129,23 @@ impl Flow {
         }
 
         // Import SLRUs
-
-        // pg_xact (01:00 keyspace)
-        self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
+        if self.timeline.tenant_shard_id.is_shard_zero() {
+            // pg_xact (01:00 keyspace)
+            self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
+                .await?;
+            // pg_multixact/members (01:01 keyspace)
+            self.import_slru(
+                SlruKind::MultiXactMembers,
+                &self.storage.pgdata().join("pg_multixact/members"),
+            )
             .await?;
-        // pg_multixact/members (01:01 keyspace)
-        self.import_slru(
-            SlruKind::MultiXactMembers,
-            &self.storage.pgdata().join("pg_multixact/members"),
-        )
-        .await?;
-        // pg_multixact/offsets (01:02 keyspace)
-        self.import_slru(
-            SlruKind::MultiXactOffsets,
-            &self.storage.pgdata().join("pg_multixact/offsets"),
-        )
-        .await?;
+            // pg_multixact/offsets (01:02 keyspace)
+            self.import_slru(
+                SlruKind::MultiXactOffsets,
+                &self.storage.pgdata().join("pg_multixact/offsets"),
+            )
+            .await?;
+        }
 
         // Import pg_twophase.
         // TODO: as empty
@@ -302,6 +303,8 @@ impl Flow {
     }
 
     async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
+        assert!(self.timeline.tenant_shard_id.is_shard_zero());
+
         let segments = self.storage.listfilesindir(path).await?;
         let segments: Vec<(String, u32, usize)> = segments
             .into_iter()
@@ -337,7 +340,6 @@ impl Flow {
             debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
             self.tasks
                 .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
-                    *self.timeline.get_shard_identity(),
                     start_key..end_key,
                     &p,
                     self.storage.clone(),
@@ -631,21 +633,14 @@ impl ImportTask for ImportRelBlocksTask {
 }
 
 struct ImportSlruBlocksTask {
-    shard_identity: ShardIdentity,
     key_range: Range<Key>,
     path: RemotePath,
     storage: RemoteStorageWrapper,
 }
 
 impl ImportSlruBlocksTask {
-    fn new(
-        shard_identity: ShardIdentity,
-        key_range: Range<Key>,
-        path: &RemotePath,
-        storage: RemoteStorageWrapper,
-    ) -> Self {
+    fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
         ImportSlruBlocksTask {
-            shard_identity,
             key_range,
             path: path.clone(),
             storage,
@@ -673,17 +668,13 @@ impl ImportTask for ImportSlruBlocksTask {
         let mut file_offset = 0;
         while blknum < end_blk {
             let key = slru_block_to_key(kind, segno, blknum);
-            assert!(
-                !self.shard_identity.is_key_disposable(&key),
-                "SLRU keys need to go into every shard"
-            );
             let buf = &buf[file_offset..(file_offset + 8192)];
             file_offset += 8192;
             layer_writer
                 .put_image(key, Bytes::copy_from_slice(buf), ctx)
                 .await?;
-            blknum += 1;
             nimages += 1;
+            blknum += 1;
         }
         Ok(nimages)
     }
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index d568da596a..93ae88936f 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -1392,6 +1392,10 @@ impl WalIngest {
         img: Bytes,
         ctx: &RequestContext,
     ) -> Result<()> {
+        if !self.shard.is_shard_zero() {
+            return Ok(());
+        }
+
         self.handle_slru_extend(modification, kind, segno, blknum, ctx)
             .await?;
         modification.put_slru_page_image(kind, segno, blknum, img)?;
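
For reviewers: a hedged sketch of how the new fast path composes with `is_key_global` at compaction time. Types are simplified to plain integers; the real code uses the `ShardNumber`/`ShardCount` newtypes and derives ownership from the key hash.

```rust
// Hypothetical model of the post-PR disposal decision (not real pageserver types).
struct Shard {
    number: u32, // this shard's index
    count: u32,  // total shard count for the tenant
}

impl Shard {
    /// May this shard drop a key during compaction?
    /// `is_global` models `is_key_global`; `owner` is the owning shard otherwise.
    fn is_key_disposable(&self, is_global: bool, owner: u32) -> bool {
        if self.count < 2 {
            // Fast path added by this PR: an unsharded tenant never disposes of keys.
            return false;
        }
        // Keep global keys everywhere; otherwise drop keys owned by another shard.
        !is_global && owner != self.number
    }
}

fn main() {
    let shard1 = Shard { number: 1, count: 4 };
    // SLRU and aux-file keys are shard-0-only after this PR, so shard 1 may drop them.
    assert!(shard1.is_key_disposable(false, 0));
    // Rel size keys are global and must be kept on every shard.
    assert!(!shard1.is_key_disposable(true, 0));
}
```

If I read the change correctly, this is also what cleans up pre-existing SLRU and aux-file keys on shards > 0 after a split: they are no longer global there, so compaction may discard them.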