diff --git a/libs/wal_decoder/src/decoder.rs b/libs/wal_decoder/src/decoder.rs index 8b13789179..780fce3d69 100644 --- a/libs/wal_decoder/src/decoder.rs +++ b/libs/wal_decoder/src/decoder.rs @@ -1 +1,970 @@ +//! This module contains logic for decoding and interpreting +//! raw bytes which represent a raw Postgres WAL record. +use crate::models::*; +use bytes::{Buf, Bytes, BytesMut}; +use pageserver_api::key::rel_block_to_key; +use pageserver_api::record::NeonWalRecord; +use pageserver_api::reltag::{RelTag, SlruKind}; +use pageserver_api::shard::ShardIdentity; +use pageserver_api::value::Value; +use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; +use postgres_ffi::walrecord::*; +use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ}; +use utils::lsn::Lsn; + +impl InterpretedWalRecord { + /// Decode and interpret raw bytes which represent one Postgres WAL record. + /// Data blocks which do not match the provided shard identity are filtered out. + /// Shard 0 is a special case since it tracks all relation sizes. We only give it + /// the keys that are being written as that is enough for updating relation sizes. 
+ pub fn from_bytes_filtered( + buf: Bytes, + shard: &ShardIdentity, + lsn: Lsn, + pg_version: u32, + ) -> anyhow::Result { + let mut decoded = DecodedWALRecord::default(); + decode_wal_record(buf, &mut decoded, pg_version)?; + + let flush_uncommitted = if decoded.is_dbase_create_copy(pg_version) { + FlushUncommittedRecords::Yes + } else { + FlushUncommittedRecords::No + }; + + let metadata_record = MetadataRecord::from_decoded(&decoded, lsn, pg_version)?; + + let mut blocks = Vec::default(); + for blk in decoded.blocks.iter() { + let rel = RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum, + }; + + let key = rel_block_to_key(rel, blk.blkno); + + if !key.is_valid_key_on_write_path() { + anyhow::bail!("Unsupported key decoded at LSN {}: {}", lsn, key); + } + + let key_is_local = shard.is_key_local(&key); + + tracing::debug!( + lsn=%lsn, + key=%key, + "ingest: shard decision {}", + if !key_is_local { "drop" } else { "keep" }, + ); + + if !key_is_local { + if shard.is_shard_zero() { + // Shard 0 tracks relation sizes. Although we will not store this block, we will observe + // its blkno in case it implicitly extends a relation. + blocks.push((key.to_compact(), None)); + } + + continue; + } + + // Instead of storing the full-page-image WAL record, + // it is better to store the extracted image: we can skip wal-redo + // in this case. Also some FPI records may contain multiple (up to 32) pages, + // so they have to be copied multiple times. 
+ // + let value = if blk.apply_image + && blk.has_image + && decoded.xl_rmid == pg_constants::RM_XLOG_ID + && (decoded.xl_info == pg_constants::XLOG_FPI + || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) + // compression of WAL is not yet supported: fall back to storing the original WAL record + && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version) + // do not materialize null pages because they will most likely be replaced with real data soon + && blk.bimg_len != 0 + { + // Extract page image from FPI record + let img_len = blk.bimg_len as usize; + let img_offs = blk.bimg_offset as usize; + let mut image = BytesMut::with_capacity(BLCKSZ as usize); + // TODO(vlad): skip the copy + image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); + + if blk.hole_length != 0 { + let tail = image.split_off(blk.hole_offset as usize); + image.resize(image.len() + blk.hole_length as usize, 0u8); + image.unsplit(tail); + } + // + // Match the logic of XLogReadBufferForRedoExtended: + // The page may be uninitialized. If so, we can't set the LSN because + // that would corrupt the page. + // + if !page_is_new(&image) { + page_set_lsn(&mut image, lsn) + } + assert_eq!(image.len(), BLCKSZ as usize); + + Value::Image(image.freeze()) + } else { + Value::WalRecord(NeonWalRecord::Postgres { + will_init: blk.will_init || blk.apply_image, + rec: decoded.record.clone(), + }) + }; + + blocks.push((key.to_compact(), Some(value))); + } + + Ok(InterpretedWalRecord { + metadata_record, + blocks, + lsn, + flush_uncommitted, + xid: decoded.xl_xid, + }) + } +} + +impl MetadataRecord { + fn from_decoded( + decoded: &DecodedWALRecord, + lsn: Lsn, + pg_version: u32, + ) -> anyhow::Result> { + // Note: this doesn't actually copy the bytes since + // the [`Bytes`] type implements it via a level of indirection. 
+ let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + + match decoded.xl_rmid { + pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { + Self::decode_heapam_record(&mut buf, decoded, pg_version) + } + pg_constants::RM_NEON_ID => Self::decode_neonmgr_record(&mut buf, decoded, pg_version), + // Handle other special record types + pg_constants::RM_SMGR_ID => Self::decode_smgr_record(&mut buf, decoded), + pg_constants::RM_DBASE_ID => Self::decode_dbase_record(&mut buf, decoded, pg_version), + pg_constants::RM_TBLSPC_ID => { + tracing::trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + Ok(None) + } + pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version), + pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, lsn), + pg_constants::RM_MULTIXACT_ID => { + Self::decode_multixact_record(&mut buf, decoded, pg_version) + } + pg_constants::RM_RELMAP_ID => Self::decode_relmap_record(&mut buf, decoded), + // This is an odd duck. It needs to go to all shards. + // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY + // in WalIngest::new), we have to send the whole DecodedWalRecord::record to + // the pageserver and decode it there. + // + // Alternatively, one can make the checkpoint part of the subscription protocol + // to the pageserver. This should work fine, but can be done at a later point. 
+ pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, lsn), + pg_constants::RM_LOGICALMSG_ID => { + Self::decode_logical_message_record(&mut buf, decoded) + } + pg_constants::RM_STANDBY_ID => Self::decode_standby_record(&mut buf, decoded), + pg_constants::RM_REPLORIGIN_ID => Self::decode_replorigin_record(&mut buf, decoded), + _unexpected => { + // TODO: consider failing here instead of blindly doing something without + // understanding the protocol + Ok(None) + } + } + } + + fn decode_heapam_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // Handle VM bit updates that are implicitly part of heap records. + + // First, look at the record to determine which VM bits need + // to be cleared. If either of these variables is set, we + // need to clear the corresponding bits in the visibility map. + let mut new_heap_blkno: Option = None; + let mut old_heap_blkno: Option = None; + let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; + + match pg_version { + 14 => { + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = v14::XlHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = v14::XlHeapDelete::decode(buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = v14::XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. 
+ if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a + // non-HOT update where the new tuple goes to different page than + // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is + // set. + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v14::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = v14::XlHeapMultiInsert::decode(buf); + + let offset_array_len = + if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v14::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else { + anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); + } + } + 15 => { + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = v15::XlHeapInsert::decode(buf); + assert_eq!(0, 
buf.remaining()); + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = v15::XlHeapDelete::decode(buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = v15::XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a + // non-HOT update where the new tuple goes to different page than + // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is + // set. 
+ new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v15::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = v15::XlHeapMultiInsert::decode(buf); + + let offset_array_len = + if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v15::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else { + anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); + } + } + 16 => { + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = v16::XlHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = v16::XlHeapDelete::decode(buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == 
pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = v16::XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a + // non-HOT update where the new tuple goes to different page than + // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is + // set. + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v16::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = v16::XlHeapMultiInsert::decode(buf); + + let offset_array_len = + if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v16::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else { + anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); + } 
+ } + 17 => { + if decoded.xl_rmid == pg_constants::RM_HEAP_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + + if info == pg_constants::XLOG_HEAP_INSERT { + let xlrec = v17::XlHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_DELETE { + let xlrec = v17::XlHeapDelete::decode(buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_UPDATE + || info == pg_constants::XLOG_HEAP_HOT_UPDATE + { + let xlrec = v17::XlHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a + // non-HOT update where the new tuple goes to different page than + // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is + // set. 
+ new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP_LOCK { + let xlrec = v17::XlHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { + let xlrec = v17::XlHeapMultiInsert::decode(buf); + + let offset_array_len = + if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { + let xlrec = v17::XlHeapLockUpdated::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + } else { + anyhow::bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); + } + } + _ => {} + } + + if new_heap_blkno.is_some() || old_heap_blkno.is_some() { + let vm_rel = RelTag { + forknum: VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }; + + Ok(Some(MetadataRecord::Heapam(HeapamRecord::ClearVmBits( + ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }, + )))) + } else { + Ok(None) + } + } + + fn decode_neonmgr_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // Handle VM bit updates that are implicitly part of heap records. 
+ + // First, look at the record to determine which VM bits need + // to be cleared. If either of these variables is set, we + // need to clear the corresponding bits in the visibility map. + let mut new_heap_blkno: Option = None; + let mut old_heap_blkno: Option = None; + let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; + + assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); + + match pg_version { + 16 | 17 => { + let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; + + match info { + pg_constants::XLOG_NEON_HEAP_INSERT => { + let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf); + assert_eq!(0, buf.remaining()); + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } + pg_constants::XLOG_NEON_HEAP_DELETE => { + let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf); + if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } + pg_constants::XLOG_NEON_HEAP_UPDATE + | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => { + let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf); + // the size of tuple data is inferred from the size of the record. + // we can't validate the remaining number of bytes without parsing + // the tuple data. + if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); + } + if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { + // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a + // non-HOT update where the new tuple goes to different page than + // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is + // set. 
+ new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } + pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => { + let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf); + + let offset_array_len = + if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { + // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set + 0 + } else { + size_of::() * xlrec.ntuples as usize + }; + assert_eq!(offset_array_len, buf.remaining()); + + if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { + new_heap_blkno = Some(decoded.blocks[0].blkno); + } + } + pg_constants::XLOG_NEON_HEAP_LOCK => { + let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf); + if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { + old_heap_blkno = Some(decoded.blocks[0].blkno); + flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; + } + } + info => anyhow::bail!("Unknown WAL record type for Neon RMGR: {}", info), + } + } + _ => anyhow::bail!( + "Neon RMGR has no known compatibility with PostgreSQL version {}", + pg_version + ), + } + + if new_heap_blkno.is_some() || old_heap_blkno.is_some() { + let vm_rel = RelTag { + forknum: VISIBILITYMAP_FORKNUM, + spcnode: decoded.blocks[0].rnode_spcnode, + dbnode: decoded.blocks[0].rnode_dbnode, + relnode: decoded.blocks[0].rnode_relnode, + }; + + Ok(Some(MetadataRecord::Neonrmgr(NeonrmgrRecord::ClearVmBits( + ClearVmBits { + new_heap_blkno, + old_heap_blkno, + vm_rel, + flags, + }, + )))) + } else { + Ok(None) + } + } + + fn decode_smgr_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_SMGR_CREATE { + let create = XlSmgrCreate::decode(buf); + let rel = RelTag { + spcnode: create.rnode.spcnode, + dbnode: create.rnode.dbnode, + relnode: create.rnode.relnode, + forknum: create.forknum, + }; + + return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Create(SmgrCreate { + rel, + })))); + } else if info == 
pg_constants::XLOG_SMGR_TRUNCATE { + let truncate = XlSmgrTruncate::decode(buf); + return Ok(Some(MetadataRecord::Smgr(SmgrRecord::Truncate(truncate)))); + } + + Ok(None) + } + + fn decode_dbase_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + // TODO: Refactor this to avoid the duplication between postgres versions. + + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + tracing::debug!(%info, %pg_version, "handle RM_DBASE_ID"); + + if pg_version == 14 { + if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { + let createdb = XlCreateDatabase::decode(buf); + tracing::debug!("XLOG_DBASE_CREATE v14"); + + let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + })); + + return Ok(Some(record)); + } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + + let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + })); + + return Ok(Some(record)); + } + } else if pg_version == 15 { + if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { + tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. 
+ tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + })); + + return Ok(Some(record)); + } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + })); + + return Ok(Some(record)); + } + } else if pg_version == 16 { + if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { + tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + })); + + return Ok(Some(record)); + } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + })); + + return Ok(Some(record)); + } + } else if pg_version == 17 { + if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { + tracing::debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { + // The XLOG record was renamed between v14 and v15, + // but the record format is the same. 
+ // So we can reuse XlCreateDatabase here. + tracing::debug!("XLOG_DBASE_CREATE_FILE_COPY"); + + let createdb = XlCreateDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Create(DbaseCreate { + db_id: createdb.db_id, + tablespace_id: createdb.tablespace_id, + src_db_id: createdb.src_db_id, + src_tablespace_id: createdb.src_tablespace_id, + })); + + return Ok(Some(record)); + } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { + let dropdb = XlDropDatabase::decode(buf); + let record = MetadataRecord::Dbase(DbaseRecord::Drop(DbaseDrop { + db_id: dropdb.db_id, + tablespace_ids: dropdb.tablespace_ids, + })); + + return Ok(Some(record)); + } + } + + Ok(None) + } + + fn decode_clog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + + if info == pg_constants::CLOG_ZEROPAGE { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + Ok(Some(MetadataRecord::Clog(ClogRecord::ZeroPage( + ClogZeroPage { segno, rpageno }, + )))) + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + let xlrec = XlClogTruncate::decode(buf, pg_version); + + Ok(Some(MetadataRecord::Clog(ClogRecord::Truncate( + ClogTruncate { + pageno: xlrec.pageno, + oldest_xid: xlrec.oldest_xid, + oldest_xid_db: xlrec.oldest_xid_db, + }, + )))) + } + } + + fn decode_xact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; + let origin_id = decoded.origin_id; + let xl_xid = decoded.xl_xid; + + if info == pg_constants::XLOG_XACT_COMMIT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(MetadataRecord::Xact(XactRecord::Commit(XactCommon { + parsed, 
+ origin_id, + xl_xid, + lsn, + })))); + } else if info == pg_constants::XLOG_XACT_ABORT { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(MetadataRecord::Xact(XactRecord::Abort(XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + })))); + } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(MetadataRecord::Xact(XactRecord::CommitPrepared( + XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }, + )))); + } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED { + let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); + return Ok(Some(MetadataRecord::Xact(XactRecord::AbortPrepared( + XactCommon { + parsed, + origin_id, + xl_xid, + lsn, + }, + )))); + } else if info == pg_constants::XLOG_XACT_PREPARE { + return Ok(Some(MetadataRecord::Xact(XactRecord::Prepare( + XactPrepare { + xl_xid: decoded.xl_xid, + data: Bytes::copy_from_slice(&buf[..]), + }, + )))); + } + + Ok(None) + } + + fn decode_multixact_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + pg_version: u32, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + let pageno = if pg_version < 17 { + buf.get_u32_le() + } else { + buf.get_u64_le() as u32 + }; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + let slru_kind = match info { + pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets, + pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers, + _ => unreachable!(), + }; + + return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::ZeroPage( + MultiXactZeroPage { + slru_kind, + segno, + rpageno, + }, + )))); + } else if info == 
pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(buf); + return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Create( + xlrec, + )))); + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(buf); + return Ok(Some(MetadataRecord::MultiXact(MultiXactRecord::Truncate( + xlrec, + )))); + } + + Ok(None) + } + + fn decode_relmap_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + ) -> anyhow::Result> { + let update = XlRelmapUpdate::decode(buf); + + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + // skip xl_relmap_update + buf.advance(12); + + Ok(Some(MetadataRecord::Relmap(RelmapRecord::Update( + RelmapUpdate { + update, + buf: Bytes::copy_from_slice(&buf[..]), + }, + )))) + } + + fn decode_xlog_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + lsn: Lsn, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + Ok(Some(MetadataRecord::Xlog(XlogRecord::Raw(RawXlogRecord { + info, + lsn, + buf: buf.clone(), + })))) + } + + fn decode_logical_message_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_LOGICAL_MESSAGE { + let xlrec = XlLogicalMessage::decode(buf); + let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; + + #[cfg(feature = "testing")] + if prefix == "neon-test" { + return Ok(Some(MetadataRecord::LogicalMessage( + LogicalMessageRecord::Failpoint, + ))); + } + + if let Some(path) = prefix.strip_prefix("neon-file:") { + let buf_size = xlrec.prefix_size + xlrec.message_size; + let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); + return Ok(Some(MetadataRecord::LogicalMessage( + LogicalMessageRecord::Put(PutLogicalMessage { + path: path.to_string(), + buf, + }), + ))); + } + } + + Ok(None) + } + + fn decode_standby_record( + buf: &mut Bytes, + 
decoded: &DecodedWALRecord, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_RUNNING_XACTS { + let xlrec = XlRunningXacts::decode(buf); + return Ok(Some(MetadataRecord::Standby(StandbyRecord::RunningXacts( + StandbyRunningXacts { + oldest_running_xid: xlrec.oldest_running_xid, + }, + )))); + } + + Ok(None) + } + + fn decode_replorigin_record( + buf: &mut Bytes, + decoded: &DecodedWALRecord, + ) -> anyhow::Result> { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_REPLORIGIN_SET { + let xlrec = XlReploriginSet::decode(buf); + return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Set( + xlrec, + )))); + } else if info == pg_constants::XLOG_REPLORIGIN_DROP { + let xlrec = XlReploriginDrop::decode(buf); + return Ok(Some(MetadataRecord::Replorigin(ReploriginRecord::Drop( + xlrec, + )))); + } + + Ok(None) + } +} diff --git a/libs/wal_decoder/src/models.rs b/libs/wal_decoder/src/models.rs index 58f8e1b2da..92b66fcefd 100644 --- a/libs/wal_decoder/src/models.rs +++ b/libs/wal_decoder/src/models.rs @@ -25,7 +25,9 @@ //! |--> write to KV store within the pageserver use bytes::Bytes; +use pageserver_api::key::CompactKey; use pageserver_api::reltag::{RelTag, SlruKind}; +use pageserver_api::value::Value; use postgres_ffi::walrecord::{ XlMultiXactCreate, XlMultiXactTruncate, XlRelmapUpdate, XlReploriginDrop, XlReploriginSet, XlSmgrTruncate, XlXactParsedRecord, @@ -33,6 +35,48 @@ use postgres_ffi::walrecord::{ use postgres_ffi::{Oid, TransactionId}; use utils::lsn::Lsn; +pub enum FlushUncommittedRecords { + Yes, + No, +} + +/// An interpreted Postgres WAL record, ready to be handled by the pageserver +pub struct InterpretedWalRecord { + /// Optional metadata record - may cause writes to metadata keys + /// in the storage engine + pub metadata_record: Option, + /// Images or deltas for blocks modified in the original WAL record. 
+ /// The [`Value`] is optional to avoid sending superfluous data to + /// shard 0 for relation size tracking. + pub blocks: Vec<(CompactKey, Option)>, + /// Byte offset within WAL for the end of the original PG WAL record + pub lsn: Lsn, + /// Whether to flush all uncommitted modifications to the storage engine + /// before ingesting this record. This is currently only used for legacy PG + /// database creations which read pages from a template database. Such WAL + /// records require reading data blocks while ingesting, hence the need to flush. + pub flush_uncommitted: FlushUncommittedRecords, + /// Transaction id of the original PG WAL record + pub xid: TransactionId, +} + +/// The interpreted part of the Postgres WAL record which requires metadata +/// writes to the underlying storage engine. +pub enum MetadataRecord { + Heapam(HeapamRecord), + Neonrmgr(NeonrmgrRecord), + Smgr(SmgrRecord), + Dbase(DbaseRecord), + Clog(ClogRecord), + Xact(XactRecord), + MultiXact(MultiXactRecord), + Relmap(RelmapRecord), + Xlog(XlogRecord), + LogicalMessage(LogicalMessageRecord), + Standby(StandbyRecord), + Replorigin(ReploriginRecord), +} + pub enum HeapamRecord { ClearVmBits(ClearVmBits), } diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 530c91c4da..06c4553e1c 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -12,6 +12,7 @@ use pageserver_api::key::rel_block_to_key; use tokio::io::{AsyncRead, AsyncReadExt}; use tokio_tar::Archive; use tracing::*; +use wal_decoder::models::InterpretedWalRecord; use walkdir::WalkDir; use crate::context::RequestContext; @@ -23,7 +24,6 @@ use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::waldecoder::WalStreamDecoder; -use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord}; use postgres_ffi::ControlFileData; use postgres_ffi::DBState_DB_SHUTDOWNED; use postgres_ffi::Oid; @@ 
-312,11 +312,15 @@ async fn import_wal( let mut modification = tline.begin_modification(last_lsn); while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let mut decoded = DecodedWALRecord::default(); - decode_wal_record(recdata, &mut decoded, tline.pg_version)?; + let interpreted = InterpretedWalRecord::from_bytes_filtered( + recdata, + tline.get_shard_identity(), + lsn, + tline.pg_version, + )?; walingest - .ingest_record(decoded, lsn, &mut modification, ctx) + .ingest_record(interpreted, &mut modification, ctx) .await?; WAL_INGEST.records_committed.inc(); @@ -453,10 +457,15 @@ pub async fn import_wal_from_tar( let mut modification = tline.begin_modification(last_lsn); while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let mut decoded = DecodedWALRecord::default(); - decode_wal_record(recdata, &mut decoded, tline.pg_version)?; + let interpreted = InterpretedWalRecord::from_bytes_filtered( + recdata, + tline.get_shard_identity(), + lsn, + tline.pg_version, + )?; + walingest - .ingest_record(decoded, lsn, &mut modification, ctx) + .ingest_record(interpreted, &mut modification, ctx) .await?; modification.commit(ctx).await?; last_lsn = lsn; diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 739fadbc6b..eb19fb691f 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -22,6 +22,7 @@ use tokio::{select, sync::watch, time}; use tokio_postgres::{replication::ReplicationStream, Client}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, trace, warn, Instrument}; +use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord}; use super::TaskStateUpdate; use crate::{ @@ -35,7 +36,6 @@ use crate::{ use postgres_backend::is_expected_io_error; use 
postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; -use postgres_ffi::walrecord::{decode_wal_record, DecodedWALRecord}; use utils::{id::NodeId, lsn::Lsn}; use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError}; @@ -339,11 +339,15 @@ pub(super) async fn handle_walreceiver_connection( return Err(WalReceiverError::Other(anyhow!("LSN not aligned"))); } - // Deserialize WAL record - let mut decoded = DecodedWALRecord::default(); - decode_wal_record(recdata, &mut decoded, modification.tline.pg_version)?; + // Deserialize and interpret WAL record + let interpreted = InterpretedWalRecord::from_bytes_filtered( + recdata, + modification.tline.get_shard_identity(), + lsn, + modification.tline.pg_version, + )?; - if decoded.is_dbase_create_copy(timeline.pg_version) + if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) && uncommitted_records > 0 { // Special case: legacy PG database creations operate by reading pages from a 'template' database: @@ -360,7 +364,7 @@ pub(super) async fn handle_walreceiver_connection( // Ingest the records without immediately committing them. let ingested = walingest - .ingest_record(decoded, lsn, &mut modification, &ctx) + .ingest_record(interpreted, &mut modification, &ctx) .await .with_context(|| format!("could not ingest record at {lsn}"))?; if !ingested { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 27b3f93845..84353970b7 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -3,17 +3,17 @@ //! //! The pipeline for ingesting WAL looks like this: //! -//! WAL receiver -> WalIngest -> Repository +//! WAL receiver -> [`wal_decoder`] -> WalIngest -> Repository //! -//! The WAL receiver receives a stream of WAL from the WAL safekeepers, -//! and decodes it to individual WAL records. It feeds the WAL records -//! to WalIngest, which parses them and stores them in the Repository. +//! 
The WAL receiver receives a stream of WAL from the WAL safekeepers. +//! Records get decoded and interpreted in the [`wal_decoder`] module +//! and then stored to the Repository by WalIngest. //! //! The neon Repository can store page versions in two formats: as -//! page images, or a WAL records. WalIngest::ingest_record() extracts -//! page images out of some WAL records, but most it stores as WAL +//! page images, or a WAL records. [`wal_decoder::models::InterpretedWalRecord::from_bytes_filtered`] +//! extracts page images out of some WAL records, but mostly it's WAL //! records. If a WAL record modifies multiple pages, WalIngest -//! will call Repository::put_wal_record or put_page_image functions +//! will call Repository::put_rel_wal_record or put_rel_page_image functions //! separately for each modified page. //! //! To reconstruct a page using a WAL record, the Repository calls the @@ -28,14 +28,15 @@ use std::time::Duration; use std::time::Instant; use std::time::SystemTime; +use pageserver_api::key::Key; use pageserver_api::shard::ShardIdentity; +use postgres_ffi::fsm_logical_to_physical; use postgres_ffi::walrecord::*; use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz}; -use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; use wal_decoder::models::*; use anyhow::{bail, Context, Result}; -use bytes::{Buf, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use tracing::*; use utils::failpoint_support; use utils::rate_limit::RateLimit; @@ -50,10 +51,10 @@ use crate::ZERO_PAGE; use pageserver_api::key::rel_block_to_key; use pageserver_api::record::NeonWalRecord; use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind}; +use pageserver_api::value::Value; use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::TransactionId; -use postgres_ffi::BLCKSZ; use utils::bin_ser::SerializeError; use utils::lsn::Lsn; @@ 
-140,257 +141,161 @@ impl WalIngest { }) } - /// - /// Decode a PostgreSQL WAL record and store it in the repository, in the given timeline. + /// Ingest an interpreted PostgreSQL WAL record by doing writes to the underlying key value + /// storage of a given timeline. /// /// This function updates `lsn` field of `DatadirModification` /// - /// Helper function to parse a WAL record and call the Timeline's PUT functions for all the - /// relations/pages that the record affects. - /// /// This function returns `true` if the record was ingested, and `false` if it was filtered out pub async fn ingest_record( &mut self, - decoded: DecodedWALRecord, - lsn: Lsn, + interpreted: InterpretedWalRecord, modification: &mut DatadirModification<'_>, ctx: &RequestContext, ) -> anyhow::Result { WAL_INGEST.records_received.inc(); - let pg_version = modification.tline.pg_version; let prev_len = modification.len(); - modification.set_lsn(lsn)?; + modification.set_lsn(interpreted.lsn)?; - if decoded.is_dbase_create_copy(pg_version) { + if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) { // Records of this type should always be preceded by a commit(), as they // rely on reading data pages back from the Timeline. assert!(!modification.has_dirty_data_pages()); } - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - assert!(!self.checkpoint_modified); - if decoded.xl_xid != pg_constants::INVALID_TRANSACTION_ID - && self.checkpoint.update_next_xid(decoded.xl_xid) + if interpreted.xid != pg_constants::INVALID_TRANSACTION_ID + && self.checkpoint.update_next_xid(interpreted.xid) { self.checkpoint_modified = true; } failpoint_support::sleep_millis_async!("wal-ingest-record-sleep"); - match decoded.xl_rmid { - pg_constants::RM_HEAP_ID | pg_constants::RM_HEAP2_ID => { - // Heap AM records need some special handling, because they modify VM pages - // without registering them with the standard mechanism. 
- let maybe_heapam_record = - Self::decode_heapam_record(&mut buf, &decoded, pg_version)?; - if let Some(heapam_record) = maybe_heapam_record { - match heapam_record { - HeapamRecord::ClearVmBits(clear_vm_bits) => { - self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) - .await?; - } - } - } - } - pg_constants::RM_NEON_ID => { - let maybe_nenonrmgr_record = - Self::decode_neonmgr_record(&mut buf, &decoded, pg_version)?; - if let Some(neonrmgr_record) = maybe_nenonrmgr_record { - match neonrmgr_record { - NeonrmgrRecord::ClearVmBits(clear_vm_bits) => { - self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) - .await?; - } - } - } - } - // Handle other special record types - pg_constants::RM_SMGR_ID => { - let maybe_smgr_record = - Self::decode_smgr_record(&mut buf, &decoded, pg_version).unwrap(); - if let Some(smgr_record) = maybe_smgr_record { - match smgr_record { - SmgrRecord::Create(create) => { - self.ingest_xlog_smgr_create(create, modification, ctx) - .await?; - } - SmgrRecord::Truncate(truncate) => { - self.ingest_xlog_smgr_truncate(truncate, modification, ctx) - .await?; - } - } - } - } - pg_constants::RM_DBASE_ID => { - let maybe_dbase_record = - Self::decode_dbase_record(&mut buf, &decoded, pg_version).unwrap(); - - if let Some(dbase_record) = maybe_dbase_record { - match dbase_record { - DbaseRecord::Create(create) => { - self.ingest_xlog_dbase_create(create, modification, ctx) - .await?; - } - DbaseRecord::Drop(drop) => { - self.ingest_xlog_dbase_drop(drop, modification, ctx).await?; - } - } - } - } - pg_constants::RM_TBLSPC_ID => { - trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); - } - pg_constants::RM_CLOG_ID => { - // [`Self::decode_clog_record`] may never fail and always returns. - // It has this interface to match all the other decoding methods. 
- let clog_record = Self::decode_clog_record(&mut buf, &decoded, pg_version) - .unwrap() - .unwrap(); - - match clog_record { - ClogRecord::ZeroPage(zero_page) => { - self.ingest_clog_zero_page(zero_page, modification, ctx) - .await?; - } - ClogRecord::Truncate(truncate) => { - self.ingest_clog_truncate(truncate, modification, ctx) - .await?; - } - } - } - pg_constants::RM_XACT_ID => { - let maybe_xact_record = - Self::decode_xact_record(&mut buf, &decoded, lsn, pg_version).unwrap(); - if let Some(xact_record) = maybe_xact_record { - self.ingest_xact_record(xact_record, modification, ctx) + match interpreted.metadata_record { + Some(MetadataRecord::Heapam(rec)) => match rec { + HeapamRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) .await?; } - } - pg_constants::RM_MULTIXACT_ID => { - let maybe_multixact_record = - Self::decode_multixact_record(&mut buf, &decoded, pg_version).unwrap(); - if let Some(multixact_record) = maybe_multixact_record { - match multixact_record { - MultiXactRecord::ZeroPage(zero_page) => { - self.ingest_multixact_zero_page(zero_page, modification, ctx) - .await?; - } - MultiXactRecord::Create(create) => { - self.ingest_multixact_create(modification, &create)?; - } - MultiXactRecord::Truncate(truncate) => { - self.ingest_multixact_truncate(modification, &truncate, ctx) - .await?; - } - } - } - } - pg_constants::RM_RELMAP_ID => { - let relmap_record = Self::decode_relmap_record(&mut buf, &decoded, pg_version) - .unwrap() - .unwrap(); - match relmap_record { - RelmapRecord::Update(update) => { - self.ingest_relmap_update(update, modification, ctx).await?; - } - } - } - // This is an odd duck. It needs to go to all shards. - // Since it uses the checkpoint image (that's initialized from CHECKPOINT_KEY - // in WalIngest::new), we have to send the whole DecodedWalRecord::record to - // the pageserver and decode it there. 
- // - // Alternatively, one can make the checkpoint part of the subscription protocol - // to the pageserver. This should work fine, but can be done at a later point. - pg_constants::RM_XLOG_ID => { - let xlog_record = Self::decode_xlog_record(&mut buf, &decoded, lsn, pg_version) - .unwrap() - .unwrap(); - - match xlog_record { - XlogRecord::Raw(raw) => { - self.ingest_raw_xlog_record(raw, modification, ctx).await?; - } - } - } - pg_constants::RM_LOGICALMSG_ID => { - let maybe_logical_message_record = - Self::decode_logical_message_record(&mut buf, &decoded, pg_version).unwrap(); - if let Some(logical_message_record) = maybe_logical_message_record { - match logical_message_record { - LogicalMessageRecord::Put(put) => { - self.ingest_logical_message_put(put, modification, ctx) - .await?; - } - #[cfg(feature = "testing")] - LogicalMessageRecord::Failpoint => { - // This is a convenient way to make the WAL ingestion pause at - // particular point in the WAL. For more fine-grained control, - // we could peek into the message and only pause if it contains - // a particular string, for example, but this is enough for now. 
- failpoint_support::sleep_millis_async!( - "pageserver-wal-ingest-logical-message-sleep" - ); - } - } - } - } - pg_constants::RM_STANDBY_ID => { - let maybe_standby_record = - Self::decode_standby_record(&mut buf, &decoded, pg_version).unwrap(); - if let Some(standby_record) = maybe_standby_record { - self.ingest_standby_record(standby_record).unwrap(); - } - } - pg_constants::RM_REPLORIGIN_ID => { - let maybe_replorigin_record = - Self::decode_replorigin_record(&mut buf, &decoded, pg_version).unwrap(); - if let Some(replorigin_record) = maybe_replorigin_record { - self.ingest_replorigin_record(replorigin_record, modification) + }, + Some(MetadataRecord::Neonrmgr(rec)) => match rec { + NeonrmgrRecord::ClearVmBits(clear_vm_bits) => { + self.ingest_clear_vm_bits(clear_vm_bits, modification, ctx) .await?; } + }, + Some(MetadataRecord::Smgr(rec)) => match rec { + SmgrRecord::Create(create) => { + self.ingest_xlog_smgr_create(create, modification, ctx) + .await?; + } + SmgrRecord::Truncate(truncate) => { + self.ingest_xlog_smgr_truncate(truncate, modification, ctx) + .await?; + } + }, + Some(MetadataRecord::Dbase(rec)) => match rec { + DbaseRecord::Create(create) => { + self.ingest_xlog_dbase_create(create, modification, ctx) + .await?; + } + DbaseRecord::Drop(drop) => { + self.ingest_xlog_dbase_drop(drop, modification, ctx).await?; + } + }, + Some(MetadataRecord::Clog(rec)) => match rec { + ClogRecord::ZeroPage(zero_page) => { + self.ingest_clog_zero_page(zero_page, modification, ctx) + .await?; + } + ClogRecord::Truncate(truncate) => { + self.ingest_clog_truncate(truncate, modification, ctx) + .await?; + } + }, + Some(MetadataRecord::Xact(rec)) => { + self.ingest_xact_record(rec, modification, ctx).await?; } - _x => { - // TODO: should probably log & fail here instead of blindly - // doing something without understanding the protocol + Some(MetadataRecord::MultiXact(rec)) => match rec { + MultiXactRecord::ZeroPage(zero_page) => { + 
self.ingest_multixact_zero_page(zero_page, modification, ctx) + .await?; + } + MultiXactRecord::Create(create) => { + self.ingest_multixact_create(modification, &create)?; + } + MultiXactRecord::Truncate(truncate) => { + self.ingest_multixact_truncate(modification, &truncate, ctx) + .await?; + } + }, + Some(MetadataRecord::Relmap(rec)) => match rec { + RelmapRecord::Update(update) => { + self.ingest_relmap_update(update, modification, ctx).await?; + } + }, + Some(MetadataRecord::Xlog(rec)) => match rec { + XlogRecord::Raw(raw) => { + self.ingest_raw_xlog_record(raw, modification, ctx).await?; + } + }, + Some(MetadataRecord::LogicalMessage(rec)) => match rec { + LogicalMessageRecord::Put(put) => { + self.ingest_logical_message_put(put, modification, ctx) + .await?; + } + #[cfg(feature = "testing")] + LogicalMessageRecord::Failpoint => { + // This is a convenient way to make the WAL ingestion pause at + // particular point in the WAL. For more fine-grained control, + // we could peek into the message and only pause if it contains + // a particular string, for example, but this is enough for now. + failpoint_support::sleep_millis_async!( + "pageserver-wal-ingest-logical-message-sleep" + ); + } + }, + Some(MetadataRecord::Standby(rec)) => { + self.ingest_standby_record(rec).unwrap(); + } + Some(MetadataRecord::Replorigin(rec)) => { + self.ingest_replorigin_record(rec, modification).await?; + } + None => { + // There are two cases through which we end up here: + // 1. The resource manager for the original PG WAL record + // is [`pg_constants::RM_TBLSPC_ID`]. This is not a supported + // record type within Neon. + // 2. The resource manager id was unknown to + // [`wal_decoder::decoder::MetadataRecord::from_decoded`]. + // TODO(vlad): Tighten this up more once we build confidence + // that case (2) does not happen in the field. } } - // Iterate through all the blocks that the record modifies, and - // "put" a separate copy of the record for each block. 
- for blk in decoded.blocks.iter() { - let rel = RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum, - }; - - let key = rel_block_to_key(rel, blk.blkno); - let key_is_local = self.shard.is_key_local(&key); - - tracing::debug!( - lsn=%lsn, - key=%key, - "ingest: shard decision {} (checkpoint={})", - if !key_is_local { "drop" } else { "keep" }, - self.checkpoint_modified - ); - - if !key_is_local { - if self.shard.is_shard_zero() { - // Shard 0 tracks relation sizes. Although we will not store this block, we will observe - // its blkno in case it implicitly extends a relation. - self.observe_decoded_block(modification, blk, ctx).await?; + // Iterate through all the key value pairs provided in the interpreted block + // and update the modification currently in-flight to include them. + for (compact_key, maybe_value) in interpreted.blocks.into_iter() { + let (rel, blk) = Key::from_compact(compact_key).to_rel_block()?; + match maybe_value { + Some(Value::Image(img)) => { + self.put_rel_page_image(modification, rel, blk, img, ctx) + .await?; + } + Some(Value::WalRecord(rec)) => { + self.put_rel_wal_record(modification, rel, blk, rec, ctx) + .await?; + } + None => { + // Shard 0 tracks relation sizes. We will observe + // its blkno in case it implicitly extends a relation. 
+ assert!(self.shard.is_shard_zero()); + self.observe_decoded_block(modification, rel, blk, ctx) + .await?; } - - continue; } - self.ingest_decoded_block(modification, lsn, &decoded, blk, ctx) - .await?; } // If checkpoint data was updated, store the new version in the repository @@ -433,82 +338,11 @@ impl WalIngest { async fn observe_decoded_block( &mut self, modification: &mut DatadirModification<'_>, - blk: &DecodedBkpBlock, + rel: RelTag, + blkno: BlockNumber, ctx: &RequestContext, ) -> Result<(), PageReconstructError> { - let rel = RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum, - }; - self.handle_rel_extend(modification, rel, blk.blkno, ctx) - .await - } - - async fn ingest_decoded_block( - &mut self, - modification: &mut DatadirModification<'_>, - lsn: Lsn, - decoded: &DecodedWALRecord, - blk: &DecodedBkpBlock, - ctx: &RequestContext, - ) -> Result<(), PageReconstructError> { - let rel = RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum, - }; - - // - // Instead of storing full-page-image WAL record, - // it is better to store extracted image: we can skip wal-redo - // in this case. Also some FPI records may contain multiple (up to 32) pages, - // so them have to be copied multiple times. 
- // - if blk.apply_image - && blk.has_image - && decoded.xl_rmid == pg_constants::RM_XLOG_ID - && (decoded.xl_info == pg_constants::XLOG_FPI - || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) - // compression of WAL is not yet supported: fall back to storing the original WAL record - && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version) - // do not materialize null pages because them most likely be soon replaced with real data - && blk.bimg_len != 0 - { - // Extract page image from FPI record - let img_len = blk.bimg_len as usize; - let img_offs = blk.bimg_offset as usize; - let mut image = BytesMut::with_capacity(BLCKSZ as usize); - image.extend_from_slice(&decoded.record[img_offs..img_offs + img_len]); - - if blk.hole_length != 0 { - let tail = image.split_off(blk.hole_offset as usize); - image.resize(image.len() + blk.hole_length as usize, 0u8); - image.unsplit(tail); - } - // - // Match the logic of XLogReadBufferForRedoExtended: - // The page may be uninitialized. If so, we can't set the LSN because - // that would corrupt the page. - // - if !page_is_new(&image) { - page_set_lsn(&mut image, lsn) - } - assert_eq!(image.len(), BLCKSZ as usize); - - self.put_rel_page_image(modification, rel, blk.blkno, image.freeze(), ctx) - .await?; - } else { - let rec = NeonWalRecord::Postgres { - will_init: blk.will_init || blk.apply_image, - rec: decoded.record.clone(), - }; - self.put_rel_wal_record(modification, rel, blk.blkno, rec, ctx) - .await?; - } - Ok(()) + self.handle_rel_extend(modification, rel, blkno, ctx).await } async fn ingest_clear_vm_bits( @@ -599,413 +433,6 @@ impl WalIngest { Ok(()) } - fn decode_heapam_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - pg_version: u32, - ) -> anyhow::Result> { - // Handle VM bit updates that are implicitly part of heap records. - - // First, look at the record to determine which VM bits need - // to be cleared. 
If either of these variables is set, we - // need to clear the corresponding bits in the visibility map. - let mut new_heap_blkno: Option = None; - let mut old_heap_blkno: Option = None; - let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - - match pg_version { - 14 => { - if decoded.xl_rmid == pg_constants::RM_HEAP_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = v14::XlHeapInsert::decode(buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = v14::XlHeapDelete::decode(buf); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = v14::XlHeapUpdate::decode(buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a - // non-HOT update where the new tuple goes to different page than - // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is - // set. 
- new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_LOCK { - let xlrec = v14::XlHeapLock::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = v14::XlHeapMultiInsert::decode(buf); - - let offset_array_len = - if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { - let xlrec = v14::XlHeapLockUpdated::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else { - bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); - } - } - 15 => { - if decoded.xl_rmid == pg_constants::RM_HEAP_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = v15::XlHeapInsert::decode(buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = v15::XlHeapDelete::decode(buf); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == 
pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = v15::XlHeapUpdate::decode(buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a - // non-HOT update where the new tuple goes to different page than - // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is - // set. - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_LOCK { - let xlrec = v15::XlHeapLock::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = v15::XlHeapMultiInsert::decode(buf); - - let offset_array_len = - if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { - let xlrec = v15::XlHeapLockUpdated::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else { - bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); - } - } - 
16 => { - if decoded.xl_rmid == pg_constants::RM_HEAP_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = v16::XlHeapInsert::decode(buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = v16::XlHeapDelete::decode(buf); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = v16::XlHeapUpdate::decode(buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a - // non-HOT update where the new tuple goes to different page than - // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is - // set. 
- new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_LOCK { - let xlrec = v16::XlHeapLock::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = v16::XlHeapMultiInsert::decode(buf); - - let offset_array_len = - if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { - let xlrec = v16::XlHeapLockUpdated::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else { - bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); - } - } - 17 => { - if decoded.xl_rmid == pg_constants::RM_HEAP_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - - if info == pg_constants::XLOG_HEAP_INSERT { - let xlrec = v17::XlHeapInsert::decode(buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_DELETE { - let xlrec = v17::XlHeapDelete::decode(buf); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_UPDATE - || info == 
pg_constants::XLOG_HEAP_HOT_UPDATE - { - let xlrec = v17::XlHeapUpdate::decode(buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. - if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a - // non-HOT update where the new tuple goes to different page than - // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is - // set. - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP_LOCK { - let xlrec = v17::XlHeapLock::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else if decoded.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { - let xlrec = v17::XlHeapMultiInsert::decode(buf); - - let offset_array_len = - if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } else if info == pg_constants::XLOG_HEAP2_LOCK_UPDATED { - let xlrec = v17::XlHeapLockUpdated::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - } else { - bail!("Unknown RMGR {} for Heap decoding", decoded.xl_rmid); - } - } - _ 
=> {} - } - - if new_heap_blkno.is_some() || old_heap_blkno.is_some() { - let vm_rel = RelTag { - forknum: VISIBILITYMAP_FORKNUM, - spcnode: decoded.blocks[0].rnode_spcnode, - dbnode: decoded.blocks[0].rnode_dbnode, - relnode: decoded.blocks[0].rnode_relnode, - }; - - Ok(Some(HeapamRecord::ClearVmBits(ClearVmBits { - new_heap_blkno, - old_heap_blkno, - vm_rel, - flags, - }))) - } else { - Ok(None) - } - } - - fn decode_neonmgr_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - pg_version: u32, - ) -> anyhow::Result> { - // Handle VM bit updates that are implicitly part of heap records. - - // First, look at the record to determine which VM bits need - // to be cleared. If either of these variables is set, we - // need to clear the corresponding bits in the visibility map. - let mut new_heap_blkno: Option = None; - let mut old_heap_blkno: Option = None; - let mut flags = pg_constants::VISIBILITYMAP_VALID_BITS; - - assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); - - match pg_version { - 16 | 17 => { - let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; - - match info { - pg_constants::XLOG_NEON_HEAP_INSERT => { - let xlrec = v17::rm_neon::XlNeonHeapInsert::decode(buf); - assert_eq!(0, buf.remaining()); - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } - pg_constants::XLOG_NEON_HEAP_DELETE => { - let xlrec = v17::rm_neon::XlNeonHeapDelete::decode(buf); - if (xlrec.flags & pg_constants::XLH_DELETE_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } - pg_constants::XLOG_NEON_HEAP_UPDATE - | pg_constants::XLOG_NEON_HEAP_HOT_UPDATE => { - let xlrec = v17::rm_neon::XlNeonHeapUpdate::decode(buf); - // the size of tuple data is inferred from the size of the record. - // we can't validate the remaining number of bytes without parsing - // the tuple data. 
- if (xlrec.flags & pg_constants::XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks.last().unwrap().blkno); - } - if (xlrec.flags & pg_constants::XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED) != 0 { - // PostgreSQL only uses XLH_UPDATE_NEW_ALL_VISIBLE_CLEARED on a - // non-HOT update where the new tuple goes to different page than - // the old one. Otherwise, only XLH_UPDATE_OLD_ALL_VISIBLE_CLEARED is - // set. - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } - pg_constants::XLOG_NEON_HEAP_MULTI_INSERT => { - let xlrec = v17::rm_neon::XlNeonHeapMultiInsert::decode(buf); - - let offset_array_len = - if decoded.xl_info & pg_constants::XLOG_HEAP_INIT_PAGE > 0 { - // the offsets array is omitted if XLOG_HEAP_INIT_PAGE is set - 0 - } else { - size_of::() * xlrec.ntuples as usize - }; - assert_eq!(offset_array_len, buf.remaining()); - - if (xlrec.flags & pg_constants::XLH_INSERT_ALL_VISIBLE_CLEARED) != 0 { - new_heap_blkno = Some(decoded.blocks[0].blkno); - } - } - pg_constants::XLOG_NEON_HEAP_LOCK => { - let xlrec = v17::rm_neon::XlNeonHeapLock::decode(buf); - if (xlrec.flags & pg_constants::XLH_LOCK_ALL_FROZEN_CLEARED) != 0 { - old_heap_blkno = Some(decoded.blocks[0].blkno); - flags = pg_constants::VISIBILITYMAP_ALL_FROZEN; - } - } - info => bail!("Unknown WAL record type for Neon RMGR: {}", info), - } - } - _ => bail!( - "Neon RMGR has no known compatibility with PostgreSQL version {}", - pg_version - ), - } - - if new_heap_blkno.is_some() || old_heap_blkno.is_some() { - let vm_rel = RelTag { - forknum: VISIBILITYMAP_FORKNUM, - spcnode: decoded.blocks[0].rnode_spcnode, - dbnode: decoded.blocks[0].rnode_dbnode, - relnode: decoded.blocks[0].rnode_relnode, - }; - - Ok(Some(NeonrmgrRecord::ClearVmBits(ClearVmBits { - new_heap_blkno, - old_heap_blkno, - vm_rel, - flags, - }))) - } else { - Ok(None) - } - } - /// Subroutine of ingest_record(), to handle an XLOG_DBASE_CREATE record. 
async fn ingest_xlog_dbase_create( &mut self, @@ -1122,125 +549,6 @@ impl WalIngest { Ok(()) } - fn decode_dbase_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - pg_version: u32, - ) -> anyhow::Result> { - // TODO: Refactor this to avoid the duplication between postgres versions. - - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - debug!(%info, %pg_version, "handle RM_DBASE_ID"); - - if pg_version == 14 { - if info == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(buf); - debug!("XLOG_DBASE_CREATE v14"); - - let record = DbaseRecord::Create(DbaseCreate { - db_id: createdb.db_id, - tablespace_id: createdb.tablespace_id, - src_db_id: createdb.src_db_id, - src_tablespace_id: createdb.src_tablespace_id, - }); - - return Ok(Some(record)); - } else if info == postgres_ffi::v14::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(buf); - - let record = DbaseRecord::Drop(DbaseDrop { - db_id: dropdb.db_id, - tablespace_ids: dropdb.tablespace_ids, - }); - - return Ok(Some(record)); - } - } else if pg_version == 15 { - if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. 
- debug!("XLOG_DBASE_CREATE_FILE_COPY"); - - let createdb = XlCreateDatabase::decode(buf); - let record = DbaseRecord::Create(DbaseCreate { - db_id: createdb.db_id, - tablespace_id: createdb.tablespace_id, - src_db_id: createdb.src_db_id, - src_tablespace_id: createdb.src_tablespace_id, - }); - - return Ok(Some(record)); - } else if info == postgres_ffi::v15::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(buf); - let record = DbaseRecord::Drop(DbaseDrop { - db_id: dropdb.db_id, - tablespace_ids: dropdb.tablespace_ids, - }); - - return Ok(Some(record)); - } - } else if pg_version == 16 { - if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. - debug!("XLOG_DBASE_CREATE_FILE_COPY"); - - let createdb = XlCreateDatabase::decode(buf); - let record = DbaseRecord::Create(DbaseCreate { - db_id: createdb.db_id, - tablespace_id: createdb.tablespace_id, - src_db_id: createdb.src_db_id, - src_tablespace_id: createdb.src_tablespace_id, - }); - - return Ok(Some(record)); - } else if info == postgres_ffi::v16::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(buf); - let record = DbaseRecord::Drop(DbaseDrop { - db_id: dropdb.db_id, - tablespace_ids: dropdb.tablespace_ids, - }); - - return Ok(Some(record)); - } - } else if pg_version == 17 { - if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_WAL_LOG { - debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_CREATE_FILE_COPY { - // The XLOG record was renamed between v14 and v15, - // but the record format is the same. - // So we can reuse XlCreateDatabase here. 
- debug!("XLOG_DBASE_CREATE_FILE_COPY"); - - let createdb = XlCreateDatabase::decode(buf); - let record = DbaseRecord::Create(DbaseCreate { - db_id: createdb.db_id, - tablespace_id: createdb.tablespace_id, - src_db_id: createdb.src_db_id, - src_tablespace_id: createdb.src_tablespace_id, - }); - - return Ok(Some(record)); - } else if info == postgres_ffi::v17::bindings::XLOG_DBASE_DROP { - let dropdb = XlDropDatabase::decode(buf); - let record = DbaseRecord::Drop(DbaseDrop { - db_id: dropdb.db_id, - tablespace_ids: dropdb.tablespace_ids, - }); - - return Ok(Some(record)); - } - } - - Ok(None) - } - async fn ingest_xlog_smgr_create( &mut self, create: SmgrCreate, @@ -1252,30 +560,6 @@ impl WalIngest { Ok(()) } - fn decode_smgr_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_SMGR_CREATE { - let create = XlSmgrCreate::decode(buf); - let rel = RelTag { - spcnode: create.rnode.spcnode, - dbnode: create.rnode.dbnode, - relnode: create.rnode.relnode, - forknum: create.forknum, - }; - - return Ok(Some(SmgrRecord::Create(SmgrCreate { rel }))); - } else if info == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = XlSmgrTruncate::decode(buf); - return Ok(Some(SmgrRecord::Truncate(truncate))); - } - - Ok(None) - } - /// Subroutine of ingest_record(), to handle an XLOG_SMGR_TRUNCATE record. /// /// This is the same logic as in PostgreSQL's smgr_redo() function. 
@@ -1535,59 +819,6 @@ impl WalIngest { Ok(()) } - // TODO(vlad): Standardise interface for `decode_...` - fn decode_xact_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - lsn: Lsn, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - let origin_id = decoded.origin_id; - let xl_xid = decoded.xl_xid; - - if info == pg_constants::XLOG_XACT_COMMIT { - let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); - return Ok(Some(XactRecord::Commit(XactCommon { - parsed, - origin_id, - xl_xid, - lsn, - }))); - } else if info == pg_constants::XLOG_XACT_ABORT { - let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); - return Ok(Some(XactRecord::Abort(XactCommon { - parsed, - origin_id, - xl_xid, - lsn, - }))); - } else if info == pg_constants::XLOG_XACT_COMMIT_PREPARED { - let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); - return Ok(Some(XactRecord::CommitPrepared(XactCommon { - parsed, - origin_id, - xl_xid, - lsn, - }))); - } else if info == pg_constants::XLOG_XACT_ABORT_PREPARED { - let parsed = XlXactParsedRecord::decode(buf, decoded.xl_xid, decoded.xl_info); - return Ok(Some(XactRecord::AbortPrepared(XactCommon { - parsed, - origin_id, - xl_xid, - lsn, - }))); - } else if info == pg_constants::XLOG_XACT_PREPARE { - return Ok(Some(XactRecord::Prepare(XactPrepare { - xl_xid: decoded.xl_xid, - data: Bytes::copy_from_slice(&buf[..]), - }))); - } - - Ok(None) - } - async fn ingest_clog_truncate( &mut self, truncate: ClogTruncate, @@ -1681,35 +912,6 @@ impl WalIngest { .await } - fn decode_clog_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; - - if info == pg_constants::CLOG_ZEROPAGE { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / 
pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - Ok(Some(ClogRecord::ZeroPage(ClogZeroPage { segno, rpageno }))) - } else { - assert!(info == pg_constants::CLOG_TRUNCATE); - let xlrec = XlClogTruncate::decode(buf, pg_version); - - Ok(Some(ClogRecord::Truncate(ClogTruncate { - pageno: xlrec.pageno, - oldest_xid: xlrec.oldest_xid, - oldest_xid_db: xlrec.oldest_xid_db, - }))) - } - } - fn ingest_multixact_create( &mut self, modification: &mut DatadirModification, @@ -1880,46 +1082,6 @@ impl WalIngest { .await } - fn decode_multixact_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE - || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE - { - let pageno = if pg_version < 17 { - buf.get_u32_le() - } else { - buf.get_u64_le() as u32 - }; - let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - let slru_kind = match info { - pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE => SlruKind::MultiXactOffsets, - pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE => SlruKind::MultiXactMembers, - _ => unreachable!(), - }; - - return Ok(Some(MultiXactRecord::ZeroPage(MultiXactZeroPage { - slru_kind, - segno, - rpageno, - }))); - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(buf); - return Ok(Some(MultiXactRecord::Create(xlrec))); - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(buf); - return Ok(Some(MultiXactRecord::Truncate(xlrec))); - } - - Ok(None) - } - async fn ingest_relmap_update( &mut self, update: RelmapUpdate, @@ -1933,24 +1095,6 @@ impl WalIngest { .await } - fn decode_relmap_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - _pg_version: u32, - ) -> anyhow::Result> { - 
let update = XlRelmapUpdate::decode(buf); - - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - // skip xl_relmap_update - buf.advance(12); - - Ok(Some(RelmapRecord::Update(RelmapUpdate { - update, - buf: Bytes::copy_from_slice(&buf[..]), - }))) - } - async fn ingest_raw_xlog_record( &mut self, raw_record: RawXlogRecord, @@ -2051,20 +1195,6 @@ impl WalIngest { Ok(()) } - fn decode_xlog_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - lsn: Lsn, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - Ok(Some(XlogRecord::Raw(RawXlogRecord { - info, - lsn, - buf: buf.clone(), - }))) - } - async fn ingest_logical_message_put( &mut self, put: PutLogicalMessage, @@ -2075,50 +1205,6 @@ impl WalIngest { modification.put_file(path.as_str(), &buf, ctx).await } - fn decode_logical_message_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_LOGICAL_MESSAGE { - let xlrec = XlLogicalMessage::decode(buf); - let prefix = std::str::from_utf8(&buf[0..xlrec.prefix_size - 1])?; - - #[cfg(feature = "testing")] - if prefix == "neon-test" { - return Ok(Some(LogicalMessageRecord::Failpoint)); - } - - if let Some(path) = prefix.strip_prefix("neon-file:") { - let buf_size = xlrec.prefix_size + xlrec.message_size; - let buf = Bytes::copy_from_slice(&buf[xlrec.prefix_size..buf_size]); - return Ok(Some(LogicalMessageRecord::Put(PutLogicalMessage { - path: path.to_string(), - buf, - }))); - } - } - - Ok(None) - } - - fn decode_standby_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_RUNNING_XACTS { - let xlrec = XlRunningXacts::decode(buf); - return Ok(Some(StandbyRecord::RunningXacts(StandbyRunningXacts { - 
oldest_running_xid: xlrec.oldest_running_xid, - }))); - } - - Ok(None) - } - fn ingest_standby_record(&mut self, record: StandbyRecord) -> Result<()> { match record { StandbyRecord::RunningXacts(running_xacts) => { @@ -2133,23 +1219,6 @@ impl WalIngest { Ok(()) } - fn decode_replorigin_record( - buf: &mut Bytes, - decoded: &DecodedWALRecord, - _pg_version: u32, - ) -> anyhow::Result> { - let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_REPLORIGIN_SET { - let xlrec = XlReploriginSet::decode(buf); - return Ok(Some(ReploriginRecord::Set(xlrec))); - } else if info == pg_constants::XLOG_REPLORIGIN_DROP { - let xlrec = XlReploriginDrop::decode(buf); - return Ok(Some(ReploriginRecord::Drop(xlrec))); - } - - Ok(None) - } - async fn ingest_replorigin_record( &mut self, record: ReploriginRecord, @@ -3010,7 +2079,6 @@ mod tests { async fn test_ingest_real_wal() { use crate::tenant::harness::*; use postgres_ffi::waldecoder::WalStreamDecoder; - use postgres_ffi::walrecord::decode_wal_record; use postgres_ffi::WAL_SEGMENT_SIZE; // Define test data path and constants. @@ -3082,10 +2150,16 @@ mod tests { for chunk in bytes[xlogoff..].chunks(50) { decoder.feed_bytes(chunk); while let Some((lsn, recdata)) = decoder.poll_decode().unwrap() { - let mut decoded = DecodedWALRecord::default(); - decode_wal_record(recdata, &mut decoded, modification.tline.pg_version).unwrap(); + let interpreted = InterpretedWalRecord::from_bytes_filtered( + recdata, + modification.tline.get_shard_identity(), + lsn, + modification.tline.pg_version, + ) + .unwrap(); + walingest - .ingest_record(decoded, lsn, &mut modification, &ctx) + .ingest_record(interpreted, &mut modification, &ctx) .instrument(span.clone()) .await .unwrap();