From 4571db1750907c8268d7dfe3fb4038ee2f626791 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 2 Feb 2024 10:14:50 +0000 Subject: [PATCH] extract NeonWalRecord apply logic --- pageserver/src/walredo.rs | 255 ++------------------------- pageserver/src/walredo/apply_neon.rs | 251 ++++++++++++++++++++++++++ 2 files changed, 266 insertions(+), 240 deletions(-) create mode 100644 pageserver/src/walredo/apply_neon.rs diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 8c363ac34c..cfedec4b28 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -17,9 +17,15 @@ //! records. It achieves it by dropping privileges before replaying //! any WAL records, so that even if an attacker hijacks the Postgres //! process, he cannot escape out of it. -//! + +/// Process lifecycle and abstracction for the IPC protocol. +mod process; + +/// Code to apply [`NeonWalRecord`]s. +mod apply_neon; + use anyhow::Context; -use byteorder::{ByteOrder, LittleEndian}; + use bytes::{Bytes, BytesMut}; use pageserver_api::models::WalRedoManagerStatus; @@ -42,18 +48,7 @@ use crate::metrics::{ use crate::repository::Key; use crate::walrecord::NeonWalRecord; -use pageserver_api::key::{key_to_rel_block, key_to_slru_block}; -use pageserver_api::reltag::SlruKind; -use postgres_ffi::pg_constants; -use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; -use postgres_ffi::v14::nonrelfile_utils::{ - mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, - transaction_id_set_status, -}; -use postgres_ffi::BLCKSZ; - -mod process; -use process::WalRedoProcess; +use pageserver_api::key::key_to_rel_block; /// /// This is the real implementation that uses a Postgres process to @@ -66,22 +61,7 @@ pub struct PostgresRedoManager { tenant_shard_id: TenantShardId, conf: &'static PageServerConf, last_redo_at: std::sync::Mutex>, - redo_process: RwLock>>, -} - -/// Can this request be served by neon redo functions -/// or we need to pass it to wal-redo postgres process? -fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { - // Currently, we don't have bespoken Rust code to replay any - // Postgres WAL records. But everything else is handled in neon. - #[allow(clippy::match_like_matches_macro)] - match rec { - NeonWalRecord::Postgres { - will_init: _, - rec: _, - } => false, - _ => true, - } + redo_process: RwLock>>, } /// @@ -111,10 +91,10 @@ impl PostgresRedoManager { let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID); let mut img = base_img.map(|p| p.1); - let mut batch_neon = can_apply_in_neon(&records[0].1); + let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1); let mut batch_start = 0; for (i, record) in records.iter().enumerate().skip(1) { - let rec_neon = can_apply_in_neon(&record.1); + let rec_neon = apply_neon::can_apply_in_neon(&record.1); if rec_neon != batch_neon { let result = if batch_neon { @@ -220,7 +200,7 @@ impl PostgresRedoManager { let mut n_attempts = 0u32; loop { // launch the WAL redo process on first use - let proc: Arc = { + let proc: Arc = { let proc_guard = self.redo_process.read().unwrap(); match &*proc_guard { None => { @@ -231,7 +211,7 @@ impl PostgresRedoManager { None => { let start = Instant::now(); let proc = Arc::new( - WalRedoProcess::launch( + process::WalRedoProcess::launch( self.conf, self.tenant_shard_id, pg_version, @@ -387,212 +367,7 @@ impl PostgresRedoManager { _record_lsn: Lsn, record: &NeonWalRecord, ) -> anyhow::Result<()> { - match record { - NeonWalRecord::Postgres { - will_init: _, - rec: _, - } => { - anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); - } - NeonWalRecord::ClearVisibilityMapFlags { - new_heap_blkno, - old_heap_blkno, - flags, - } => { - // sanity check that this is modifying the correct relation - let (rel, blknum) = key_to_rel_block(key).context("invalid record")?; - assert!( - rel.forknum == VISIBILITYMAP_FORKNUM, - "ClearVisibilityMapFlags record on unexpected rel {}", - rel - ); - if let Some(heap_blkno) = *new_heap_blkno { - // Calculate the VM block and offset that corresponds to the heap block. - let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); - let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); - let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); - - // Check that we're modifying the correct VM block. - assert!(map_block == blknum); - - // equivalent to PageGetContents(page) - let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; - - map[map_byte as usize] &= !(flags << map_offset); - } - - // Repeat for 'old_heap_blkno', if any - if let Some(heap_blkno) = *old_heap_blkno { - let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); - let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); - let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); - - assert!(map_block == blknum); - - let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; - - map[map_byte as usize] &= !(flags << map_offset); - } - } - // Non-relational WAL records are handled here, with custom code that has the - // same effects as the corresponding Postgres WAL redo function. - NeonWalRecord::ClogSetCommitted { xids, timestamp } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::Clog, - "ClogSetCommitted record with unexpected key {}", - key - ); - for &xid in xids { - let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - // Check that we're modifying the correct CLOG block. - assert!( - segno == expected_segno, - "ClogSetCommitted record for XID {} with unexpected key {}", - xid, - key - ); - assert!( - blknum == expected_blknum, - "ClogSetCommitted record for XID {} with unexpected key {}", - xid, - key - ); - - transaction_id_set_status( - xid, - pg_constants::TRANSACTION_STATUS_COMMITTED, - page, - ); - } - - // Append the timestamp - if page.len() == BLCKSZ as usize + 8 { - page.truncate(BLCKSZ as usize); - } - if page.len() == BLCKSZ as usize { - page.extend_from_slice(×tamp.to_be_bytes()); - } else { - warn!( - "CLOG blk {} in seg {} has invalid size {}", - blknum, - segno, - page.len() - ); - } - } - NeonWalRecord::ClogSetAborted { xids } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::Clog, - "ClogSetAborted record with unexpected key {}", - key - ); - for &xid in xids { - let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - - // Check that we're modifying the correct CLOG block. - assert!( - segno == expected_segno, - "ClogSetAborted record for XID {} with unexpected key {}", - xid, - key - ); - assert!( - blknum == expected_blknum, - "ClogSetAborted record for XID {} with unexpected key {}", - xid, - key - ); - - transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); - } - } - NeonWalRecord::MultixactOffsetCreate { mid, moff } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::MultiXactOffsets, - "MultixactOffsetCreate record with unexpected key {}", - key - ); - // Compute the block and offset to modify. - // See RecordNewMultiXact in PostgreSQL sources. - let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let offset = (entryno * 4) as usize; - - // Check that we're modifying the correct multixact-offsets block. - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - assert!( - segno == expected_segno, - "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", - mid, - key - ); - assert!( - blknum == expected_blknum, - "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", - mid, - key - ); - - LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); - } - NeonWalRecord::MultixactMembersCreate { moff, members } => { - let (slru_kind, segno, blknum) = - key_to_slru_block(key).context("invalid record")?; - assert_eq!( - slru_kind, - SlruKind::MultiXactMembers, - "MultixactMembersCreate record with unexpected key {}", - key - ); - for (i, member) in members.iter().enumerate() { - let offset = moff + i as u32; - - // Compute the block and offset to modify. - // See RecordNewMultiXact in PostgreSQL sources. - let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - - // Check that we're modifying the correct multixact-members block. - let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; - let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; - assert!( - segno == expected_segno, - "MultiXactMembersCreate record for offset {} with unexpected key {}", - moff, - key - ); - assert!( - blknum == expected_blknum, - "MultiXactMembersCreate record for offset {} with unexpected key {}", - moff, - key - ); - - let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); - flagsval |= member.status << bshift; - LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); - LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); - } - } - } + apply_neon::apply_in_neon(record, key, page)?; Ok(()) } diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs new file mode 100644 index 0000000000..3b3cc5c084 --- /dev/null +++ b/pageserver/src/walredo/apply_neon.rs @@ -0,0 +1,251 @@ +use bytes::BytesMut; + +use crate::walrecord::NeonWalRecord; + +use anyhow::Context; +use byteorder::{ByteOrder, LittleEndian}; + + + + + + + +use tracing::*; + + +#[cfg(feature = "testing")] +use std::sync::atomic::{AtomicUsize, Ordering}; + + + + +use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key}; +use pageserver_api::reltag::SlruKind; +use postgres_ffi::pg_constants; +use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; +use postgres_ffi::v14::nonrelfile_utils::{ + mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, + transaction_id_set_status, +}; +use postgres_ffi::BLCKSZ; + +/// Can this request be served by neon redo functions +/// or we need to pass it to wal-redo postgres process? +pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool { + // Currently, we don't have bespoken Rust code to replay any + // Postgres WAL records. But everything else is handled in neon. + #[allow(clippy::match_like_matches_macro)] + match rec { + NeonWalRecord::Postgres { + will_init: _, + rec: _, + } => false, + _ => true, + } +} + +pub(crate) fn apply_in_neon( + record: &NeonWalRecord, + key: Key, + page: &mut BytesMut, +) -> Result<(), anyhow::Error> { + Ok(match record { + NeonWalRecord::Postgres { + will_init: _, + rec: _, + } => { + anyhow::bail!("tried to pass postgres wal record to neon WAL redo"); + } + NeonWalRecord::ClearVisibilityMapFlags { + new_heap_blkno, + old_heap_blkno, + flags, + } => { + // sanity check that this is modifying the correct relation + let (rel, blknum) = key_to_rel_block(key).context("invalid record")?; + assert!( + rel.forknum == VISIBILITYMAP_FORKNUM, + "ClearVisibilityMapFlags record on unexpected rel {}", + rel + ); + if let Some(heap_blkno) = *new_heap_blkno { + // Calculate the VM block and offset that corresponds to the heap block. + let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); + + // Check that we're modifying the correct VM block. + assert!(map_block == blknum); + + // equivalent to PageGetContents(page) + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + map[map_byte as usize] &= !(flags << map_offset); + } + + // Repeat for 'old_heap_blkno', if any + if let Some(heap_blkno) = *old_heap_blkno { + let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno); + let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno); + let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno); + + assert!(map_block == blknum); + + let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..]; + + map[map_byte as usize] &= !(flags << map_offset); + } + } + // Non-relational WAL records are handled here, with custom code that has the + // same effects as the corresponding Postgres WAL redo function. + NeonWalRecord::ClogSetCommitted { xids, timestamp } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::Clog, + "ClogSetCommitted record with unexpected key {}", + key + ); + for &xid in xids { + let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + segno == expected_segno, + "ClogSetCommitted record for XID {} with unexpected key {}", + xid, + key + ); + assert!( + blknum == expected_blknum, + "ClogSetCommitted record for XID {} with unexpected key {}", + xid, + key + ); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page); + } + + // Append the timestamp + if page.len() == BLCKSZ as usize + 8 { + page.truncate(BLCKSZ as usize); + } + if page.len() == BLCKSZ as usize { + page.extend_from_slice(×tamp.to_be_bytes()); + } else { + warn!( + "CLOG blk {} in seg {} has invalid size {}", + blknum, + segno, + page.len() + ); + } + } + NeonWalRecord::ClogSetAborted { xids } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::Clog, + "ClogSetAborted record with unexpected key {}", + key + ); + for &xid in xids { + let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + // Check that we're modifying the correct CLOG block. + assert!( + segno == expected_segno, + "ClogSetAborted record for XID {} with unexpected key {}", + xid, + key + ); + assert!( + blknum == expected_blknum, + "ClogSetAborted record for XID {} with unexpected key {}", + xid, + key + ); + + transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page); + } + } + NeonWalRecord::MultixactOffsetCreate { mid, moff } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::MultiXactOffsets, + "MultixactOffsetCreate record with unexpected key {}", + key + ); + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let offset = (entryno * 4) as usize; + + // Check that we're modifying the correct multixact-offsets block. + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + segno == expected_segno, + "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", + mid, + key + ); + assert!( + blknum == expected_blknum, + "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}", + mid, + key + ); + + LittleEndian::write_u32(&mut page[offset..offset + 4], *moff); + } + NeonWalRecord::MultixactMembersCreate { moff, members } => { + let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?; + assert_eq!( + slru_kind, + SlruKind::MultiXactMembers, + "MultixactMembersCreate record with unexpected key {}", + key + ); + for (i, member) in members.iter().enumerate() { + let offset = moff + i as u32; + + // Compute the block and offset to modify. + // See RecordNewMultiXact in PostgreSQL sources. + let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + + // Check that we're modifying the correct multixact-members block. + let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + assert!( + segno == expected_segno, + "MultiXactMembersCreate record for offset {} with unexpected key {}", + moff, + key + ); + assert!( + blknum == expected_blknum, + "MultiXactMembersCreate record for offset {} with unexpected key {}", + moff, + key + ); + + let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); + flagsval |= member.status << bshift; + LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval); + LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid); + } + } + }) +}