diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 8c363ac34c..cfedec4b28 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -17,9 +17,15 @@
//! records. It achieves it by dropping privileges before replaying
//! any WAL records, so that even if an attacker hijacks the Postgres
//! process, he cannot escape out of it.
-//!
+
+/// Process lifecycle and abstracction for the IPC protocol.
+mod process;
+
+/// Code to apply [`NeonWalRecord`]s.
+mod apply_neon;
+
use anyhow::Context;
-use byteorder::{ByteOrder, LittleEndian};
+
use bytes::{Bytes, BytesMut};
use pageserver_api::models::WalRedoManagerStatus;
@@ -42,18 +48,7 @@ use crate::metrics::{
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
-use pageserver_api::key::{key_to_rel_block, key_to_slru_block};
-use pageserver_api::reltag::SlruKind;
-use postgres_ffi::pg_constants;
-use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
-use postgres_ffi::v14::nonrelfile_utils::{
- mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
- transaction_id_set_status,
-};
-use postgres_ffi::BLCKSZ;
-
-mod process;
-use process::WalRedoProcess;
+use pageserver_api::key::key_to_rel_block;
///
/// This is the real implementation that uses a Postgres process to
@@ -66,22 +61,7 @@ pub struct PostgresRedoManager {
tenant_shard_id: TenantShardId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex>,
- redo_process: RwLock >>,
-}
-
-/// Can this request be served by neon redo functions
-/// or we need to pass it to wal-redo postgres process?
-fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
- // Currently, we don't have bespoken Rust code to replay any
- // Postgres WAL records. But everything else is handled in neon.
- #[allow(clippy::match_like_matches_macro)]
- match rec {
- NeonWalRecord::Postgres {
- will_init: _,
- rec: _,
- } => false,
- _ => true,
- }
+ redo_process: RwLock >>,
}
///
@@ -111,10 +91,10 @@ impl PostgresRedoManager {
let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
let mut img = base_img.map(|p| p.1);
- let mut batch_neon = can_apply_in_neon(&records[0].1);
+ let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
let mut batch_start = 0;
for (i, record) in records.iter().enumerate().skip(1) {
- let rec_neon = can_apply_in_neon(&record.1);
+ let rec_neon = apply_neon::can_apply_in_neon(&record.1);
if rec_neon != batch_neon {
let result = if batch_neon {
@@ -220,7 +200,7 @@ impl PostgresRedoManager {
let mut n_attempts = 0u32;
loop {
// launch the WAL redo process on first use
- let proc: Arc = {
+ let proc: Arc = {
let proc_guard = self.redo_process.read().unwrap();
match &*proc_guard {
None => {
@@ -231,7 +211,7 @@ impl PostgresRedoManager {
None => {
let start = Instant::now();
let proc = Arc::new(
- WalRedoProcess::launch(
+ process::WalRedoProcess::launch(
self.conf,
self.tenant_shard_id,
pg_version,
@@ -387,212 +367,7 @@ impl PostgresRedoManager {
_record_lsn: Lsn,
record: &NeonWalRecord,
) -> anyhow::Result<()> {
- match record {
- NeonWalRecord::Postgres {
- will_init: _,
- rec: _,
- } => {
- anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
- }
- NeonWalRecord::ClearVisibilityMapFlags {
- new_heap_blkno,
- old_heap_blkno,
- flags,
- } => {
- // sanity check that this is modifying the correct relation
- let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
- assert!(
- rel.forknum == VISIBILITYMAP_FORKNUM,
- "ClearVisibilityMapFlags record on unexpected rel {}",
- rel
- );
- if let Some(heap_blkno) = *new_heap_blkno {
- // Calculate the VM block and offset that corresponds to the heap block.
- let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
- let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
- let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
-
- // Check that we're modifying the correct VM block.
- assert!(map_block == blknum);
-
- // equivalent to PageGetContents(page)
- let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
-
- map[map_byte as usize] &= !(flags << map_offset);
- }
-
- // Repeat for 'old_heap_blkno', if any
- if let Some(heap_blkno) = *old_heap_blkno {
- let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
- let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
- let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
-
- assert!(map_block == blknum);
-
- let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
-
- map[map_byte as usize] &= !(flags << map_offset);
- }
- }
- // Non-relational WAL records are handled here, with custom code that has the
- // same effects as the corresponding Postgres WAL redo function.
- NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
- let (slru_kind, segno, blknum) =
- key_to_slru_block(key).context("invalid record")?;
- assert_eq!(
- slru_kind,
- SlruKind::Clog,
- "ClogSetCommitted record with unexpected key {}",
- key
- );
- for &xid in xids {
- let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
- let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
- let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
-
- // Check that we're modifying the correct CLOG block.
- assert!(
- segno == expected_segno,
- "ClogSetCommitted record for XID {} with unexpected key {}",
- xid,
- key
- );
- assert!(
- blknum == expected_blknum,
- "ClogSetCommitted record for XID {} with unexpected key {}",
- xid,
- key
- );
-
- transaction_id_set_status(
- xid,
- pg_constants::TRANSACTION_STATUS_COMMITTED,
- page,
- );
- }
-
- // Append the timestamp
- if page.len() == BLCKSZ as usize + 8 {
- page.truncate(BLCKSZ as usize);
- }
- if page.len() == BLCKSZ as usize {
- page.extend_from_slice(×tamp.to_be_bytes());
- } else {
- warn!(
- "CLOG blk {} in seg {} has invalid size {}",
- blknum,
- segno,
- page.len()
- );
- }
- }
- NeonWalRecord::ClogSetAborted { xids } => {
- let (slru_kind, segno, blknum) =
- key_to_slru_block(key).context("invalid record")?;
- assert_eq!(
- slru_kind,
- SlruKind::Clog,
- "ClogSetAborted record with unexpected key {}",
- key
- );
- for &xid in xids {
- let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
- let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
- let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
-
- // Check that we're modifying the correct CLOG block.
- assert!(
- segno == expected_segno,
- "ClogSetAborted record for XID {} with unexpected key {}",
- xid,
- key
- );
- assert!(
- blknum == expected_blknum,
- "ClogSetAborted record for XID {} with unexpected key {}",
- xid,
- key
- );
-
- transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
- }
- }
- NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
- let (slru_kind, segno, blknum) =
- key_to_slru_block(key).context("invalid record")?;
- assert_eq!(
- slru_kind,
- SlruKind::MultiXactOffsets,
- "MultixactOffsetCreate record with unexpected key {}",
- key
- );
- // Compute the block and offset to modify.
- // See RecordNewMultiXact in PostgreSQL sources.
- let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
- let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
- let offset = (entryno * 4) as usize;
-
- // Check that we're modifying the correct multixact-offsets block.
- let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
- let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
- assert!(
- segno == expected_segno,
- "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
- mid,
- key
- );
- assert!(
- blknum == expected_blknum,
- "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
- mid,
- key
- );
-
- LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
- }
- NeonWalRecord::MultixactMembersCreate { moff, members } => {
- let (slru_kind, segno, blknum) =
- key_to_slru_block(key).context("invalid record")?;
- assert_eq!(
- slru_kind,
- SlruKind::MultiXactMembers,
- "MultixactMembersCreate record with unexpected key {}",
- key
- );
- for (i, member) in members.iter().enumerate() {
- let offset = moff + i as u32;
-
- // Compute the block and offset to modify.
- // See RecordNewMultiXact in PostgreSQL sources.
- let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
- let memberoff = mx_offset_to_member_offset(offset);
- let flagsoff = mx_offset_to_flags_offset(offset);
- let bshift = mx_offset_to_flags_bitshift(offset);
-
- // Check that we're modifying the correct multixact-members block.
- let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
- let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
- assert!(
- segno == expected_segno,
- "MultiXactMembersCreate record for offset {} with unexpected key {}",
- moff,
- key
- );
- assert!(
- blknum == expected_blknum,
- "MultiXactMembersCreate record for offset {} with unexpected key {}",
- moff,
- key
- );
-
- let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
- flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
- flagsval |= member.status << bshift;
- LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
- LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
- }
- }
- }
+ apply_neon::apply_in_neon(record, key, page)?;
Ok(())
}
diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs
new file mode 100644
index 0000000000..3b3cc5c084
--- /dev/null
+++ b/pageserver/src/walredo/apply_neon.rs
@@ -0,0 +1,251 @@
+use bytes::BytesMut;
+
+use crate::walrecord::NeonWalRecord;
+
+use anyhow::Context;
+use byteorder::{ByteOrder, LittleEndian};
+
+
+
+
+
+
+
+use tracing::*;
+
+
+#[cfg(feature = "testing")]
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+
+
+
+use pageserver_api::key::{key_to_rel_block, key_to_slru_block, Key};
+use pageserver_api::reltag::SlruKind;
+use postgres_ffi::pg_constants;
+use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
+use postgres_ffi::v14::nonrelfile_utils::{
+ mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
+ transaction_id_set_status,
+};
+use postgres_ffi::BLCKSZ;
+
+/// Can this request be served by neon redo functions
+/// or we need to pass it to wal-redo postgres process?
+pub(crate) fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
+ // Currently, we don't have bespoken Rust code to replay any
+ // Postgres WAL records. But everything else is handled in neon.
+ #[allow(clippy::match_like_matches_macro)]
+ match rec {
+ NeonWalRecord::Postgres {
+ will_init: _,
+ rec: _,
+ } => false,
+ _ => true,
+ }
+}
+
+pub(crate) fn apply_in_neon(
+ record: &NeonWalRecord,
+ key: Key,
+ page: &mut BytesMut,
+) -> Result<(), anyhow::Error> {
+ Ok(match record {
+ NeonWalRecord::Postgres {
+ will_init: _,
+ rec: _,
+ } => {
+ anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
+ }
+ NeonWalRecord::ClearVisibilityMapFlags {
+ new_heap_blkno,
+ old_heap_blkno,
+ flags,
+ } => {
+ // sanity check that this is modifying the correct relation
+ let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
+ assert!(
+ rel.forknum == VISIBILITYMAP_FORKNUM,
+ "ClearVisibilityMapFlags record on unexpected rel {}",
+ rel
+ );
+ if let Some(heap_blkno) = *new_heap_blkno {
+ // Calculate the VM block and offset that corresponds to the heap block.
+ let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
+ let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
+ let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
+
+ // Check that we're modifying the correct VM block.
+ assert!(map_block == blknum);
+
+ // equivalent to PageGetContents(page)
+ let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
+
+ map[map_byte as usize] &= !(flags << map_offset);
+ }
+
+ // Repeat for 'old_heap_blkno', if any
+ if let Some(heap_blkno) = *old_heap_blkno {
+ let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
+ let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
+ let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
+
+ assert!(map_block == blknum);
+
+ let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
+
+ map[map_byte as usize] &= !(flags << map_offset);
+ }
+ }
+ // Non-relational WAL records are handled here, with custom code that has the
+ // same effects as the corresponding Postgres WAL redo function.
+ NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
+ let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
+ assert_eq!(
+ slru_kind,
+ SlruKind::Clog,
+ "ClogSetCommitted record with unexpected key {}",
+ key
+ );
+ for &xid in xids {
+ let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+ let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+
+ // Check that we're modifying the correct CLOG block.
+ assert!(
+ segno == expected_segno,
+ "ClogSetCommitted record for XID {} with unexpected key {}",
+ xid,
+ key
+ );
+ assert!(
+ blknum == expected_blknum,
+ "ClogSetCommitted record for XID {} with unexpected key {}",
+ xid,
+ key
+ );
+
+ transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_COMMITTED, page);
+ }
+
+ // Append the timestamp
+ if page.len() == BLCKSZ as usize + 8 {
+ page.truncate(BLCKSZ as usize);
+ }
+ if page.len() == BLCKSZ as usize {
+ page.extend_from_slice(×tamp.to_be_bytes());
+ } else {
+ warn!(
+ "CLOG blk {} in seg {} has invalid size {}",
+ blknum,
+ segno,
+ page.len()
+ );
+ }
+ }
+ NeonWalRecord::ClogSetAborted { xids } => {
+ let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
+ assert_eq!(
+ slru_kind,
+ SlruKind::Clog,
+ "ClogSetAborted record with unexpected key {}",
+ key
+ );
+ for &xid in xids {
+ let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+ let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+
+ // Check that we're modifying the correct CLOG block.
+ assert!(
+ segno == expected_segno,
+ "ClogSetAborted record for XID {} with unexpected key {}",
+ xid,
+ key
+ );
+ assert!(
+ blknum == expected_blknum,
+ "ClogSetAborted record for XID {} with unexpected key {}",
+ xid,
+ key
+ );
+
+ transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
+ }
+ }
+ NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
+ let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
+ assert_eq!(
+ slru_kind,
+ SlruKind::MultiXactOffsets,
+ "MultixactOffsetCreate record with unexpected key {}",
+ key
+ );
+ // Compute the block and offset to modify.
+ // See RecordNewMultiXact in PostgreSQL sources.
+ let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+ let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
+ let offset = (entryno * 4) as usize;
+
+ // Check that we're modifying the correct multixact-offsets block.
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+ let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+ assert!(
+ segno == expected_segno,
+ "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
+ mid,
+ key
+ );
+ assert!(
+ blknum == expected_blknum,
+ "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
+ mid,
+ key
+ );
+
+ LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
+ }
+ NeonWalRecord::MultixactMembersCreate { moff, members } => {
+ let (slru_kind, segno, blknum) = key_to_slru_block(key).context("invalid record")?;
+ assert_eq!(
+ slru_kind,
+ SlruKind::MultiXactMembers,
+ "MultixactMembersCreate record with unexpected key {}",
+ key
+ );
+ for (i, member) in members.iter().enumerate() {
+ let offset = moff + i as u32;
+
+ // Compute the block and offset to modify.
+ // See RecordNewMultiXact in PostgreSQL sources.
+ let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
+ let memberoff = mx_offset_to_member_offset(offset);
+ let flagsoff = mx_offset_to_flags_offset(offset);
+ let bshift = mx_offset_to_flags_bitshift(offset);
+
+ // Check that we're modifying the correct multixact-members block.
+ let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
+ let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
+ assert!(
+ segno == expected_segno,
+ "MultiXactMembersCreate record for offset {} with unexpected key {}",
+ moff,
+ key
+ );
+ assert!(
+ blknum == expected_blknum,
+ "MultiXactMembersCreate record for offset {} with unexpected key {}",
+ moff,
+ key
+ );
+
+ let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
+ flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
+ flagsval |= member.status << bshift;
+ LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
+ LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
+ }
+ }
+ })
+}