diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 1895f25bfc..36c4b19266 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -4,6 +4,7 @@ use crate::models::*; use crate::serialized_batch::SerializedValueBatch; use bytes::{Buf, Bytes}; +use pageserver_api::key::rel_block_to_key; use pageserver_api::reltag::{RelTag, SlruKind}; use pageserver_api::shard::ShardIdentity; use postgres_ffi::pg_constants; @@ -32,7 +33,8 @@ impl InterpretedWalRecord { FlushUncommittedRecords::No }; - let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?; + let metadata_record = + MetadataRecord::from_decoded_filtered(&decoded, shard, next_record_lsn, pg_version)?; let batch = SerializedValueBatch::from_decoded_filtered( decoded, shard, @@ -51,8 +53,13 @@ impl InterpretedWalRecord { } impl MetadataRecord { - fn from_decoded( + /// Builds a metadata record for this WAL record, if any. + /// + /// Only metadata records relevant for the given shard are emitted. Currently, most metadata + /// records are broadcast to all shards for simplicity, but this should be improved. + fn from_decoded_filtered( decoded: &DecodedWALRecord, + shard: &ShardIdentity, next_record_lsn: Lsn, pg_version: u32, ) -> anyhow::Result<Option<MetadataRecord>> { @@ -61,26 +68,27 @@ impl MetadataRecord { let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); - match decoded.xl_rmid { + // First, generate metadata records from the decoded WAL record. + let mut metadata_record = match decoded.xl_rmid { pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { - Self::decode_heapam_record(&mut buf, decoded, pg_version) + Self::decode_heapam_record(&mut buf, decoded, pg_version)? 
} - pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version), + pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version)?, // Handle other special record types - pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded), - pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version), + pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded)?, + pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version)?, pg_constants::RM_TBLSPC_ID => { tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); - Ok(None) + None } - pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), + pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version)?, pg_constants::RM_XACT_ID => { - Self::decode_xact_record(&mut buf, decoded, next_record_lsn) + Self::decode_xact_record(&mut buf, decoded, next_record_lsn)? } pg_constants::RM_MULTIXACT_ID => { - Self::decode_multixact_record(&mut buf, decoded, pg_version) + Self::decode_multixact_record(&mut buf, decoded, pg_version)? } - pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded), + pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded)?, // This is an odd duck. It needs to go to all shards. // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY // in WalIngest::new), we have to send the whole DecodedWalRecord::record to @@ -89,19 +97,48 @@ impl MetadataRecord { // Alternatively, one can make the checkpoint part of the subscription protocol // to the pageserver. This should work fine, but can be done at a later point. pg_constants::RM_XLOG_ID => { - Self::decode_xlog_record(&mut buf, decoded, next_record_lsn) + Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)? 
} pg_constants::RM_LOGICALMSG_ID => { - Self::decode_logical_message_record(&mut buf, decoded) + Self::decode_logical_message_record(&mut buf, decoded)? } - pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded), - pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded), + pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded)?, + pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded)?, _unexpected => { // TODO: consider failing here instead of blindly doing something without // understanding the protocol - Ok(None) + None + } + }; + + // Next, filter the metadata record by shard. + + // Route VM page updates to the shards that own them. VM pages are stored in the VM fork + // of the main relation. These are sharded and managed just like regular relation pages. + // See: https://github.com/neondatabase/neon/issues/9855 + if let Some( + MetadataRecord::Heapam(HeapamRecord::ClearVmBits(ref mut clear_vm_bits)) + | MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits(ref mut clear_vm_bits)), + ) = metadata_record + { + let is_local_vm_page = |heap_blk| { + let vm_blk = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blk); + shard.is_key_local(&rel_block_to_key(clear_vm_bits.vm_rel, vm_blk)) + }; + // Send the old and new VM page updates to their respective shards. + clear_vm_bits.old_heap_blkno = clear_vm_bits + .old_heap_blkno + .filter(|&blkno| is_local_vm_page(blkno)); + clear_vm_bits.new_heap_blkno = clear_vm_bits + .new_heap_blkno + .filter(|&blkno| is_local_vm_page(blkno)); + // If neither VM page belongs to this shard, discard the record. + if clear_vm_bits.old_heap_blkno.is_none() && clear_vm_bits.new_heap_blkno.is_none() { + metadata_record = None } } + + Ok(metadata_record) } fn decode_heapam_record(