diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 0b438dc9dd..2b41363ff9 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,6 @@ pub mod rocksdb; -use crate::waldecoder::Oid; +use crate::waldecoder::{Oid, DecodedWALRecord}; use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -47,6 +47,13 @@ pub trait Timeline { src_tablespace_id: Oid, ) -> Result<()>; + fn save_decoded_record( + &self, + decoded: DecodedWALRecord, + recdata: Bytes, + lsn: Lsn + ) -> anyhow::Result<()>; + fn advance_last_valid_lsn(&self, lsn: Lsn); fn get_last_valid_lsn(&self) -> Lsn; fn init_valid_lsn(&self, lsn: Lsn); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 2127569b2d..4558b3e9eb 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -7,7 +7,7 @@ use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; use crate::restore_local_repo::restore_timeline; -use crate::waldecoder::Oid; +use crate::waldecoder::{Oid, DecodedWALRecord, XlSmgrTruncate, XlCreateDatabase}; use crate::walredo::WalRedoManager; use crate::ZTimelineId; use crate::{zenith_repo_dir, PageServerConf}; @@ -24,6 +24,8 @@ use std::thread; use std::time::{Duration, Instant}; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; +use postgres_ffi::pg_constants; + // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -794,6 +796,68 @@ impl Timeline for RocksTimeline { Ok(()) } + // Put the WAL record to the page cache. We make a separate copy of + // it for every block it modifies. + fn save_decoded_record( + &self, + decoded: DecodedWALRecord, + recdata: Bytes, + lsn: Lsn) -> anyhow::Result<()> + { + for blk in decoded.blocks.iter() { + let tag = BufferTag { + rel: RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, + }, + blknum: blk.blkno, + }; + + let rec = WALRecord { + lsn, + will_init: blk.will_init || blk.apply_image, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + + self.put_wal_record(tag, rec); + } + // include truncate wal record in all pages + if decoded.xl_rmid == pg_constants::RM_SMGR_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_SMGR_TRUNCATE + { + let truncate = XlSmgrTruncate::decode(&decoded); + if (truncate.flags & pg_constants::SMGR_TRUNCATE_HEAP) != 0 { + let rel = RelTag { + spcnode: truncate.rnode.spcnode, + dbnode: truncate.rnode.dbnode, + relnode: truncate.rnode.relnode, + forknum: pg_constants::MAIN_FORKNUM, + }; + self.put_truncation(rel, lsn, truncate.blkno)?; + } + } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID + && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == pg_constants::XLOG_DBASE_CREATE + { + let createdb = XlCreateDatabase::decode(&decoded); + self.create_database( + lsn, + createdb.db_id, + createdb.tablespace_id, + createdb.src_db_id, + createdb.src_tablespace_id, + )?; + } + // Now that this record has been handled, let the page cache know that + // it is up-to-date to this LSN + self.advance_last_record_lsn(lsn); + Ok(()) + } + /// Remember that WAL has been received and added to the timeline up to the given LSN fn advance_last_valid_lsn(&self, lsn: Lsn) { let old = self.last_valid_lsn.advance(lsn); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 55049b39cf..ce43c1efc5 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -26,7 +26,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use bytes::Bytes; -use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; +use crate::repository::{BufferTag, RelTag, Timeline}; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; @@ -353,32 +353,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn } if let Some((lsn, recdata)) = rec.unwrap() { let decoded = decode_wal_record(recdata.clone()); - // Put the WAL record to the page cache. We make a separate copy of - // it for every block it modifies. (The actual WAL record is kept in - // a Bytes, which uses a reference counter for the underlying buffer, - // so having multiple copies of it doesn't cost that much) - for blk in decoded.blocks.iter() { - let tag = BufferTag { - rel: RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }, - blknum: blk.blkno, - }; - let rec = WALRecord { - lsn, - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - - timeline.put_wal_record(tag, rec); - } - // Now that this record has been handled, let the page cache know that - // it is up-to-date to this LSN - timeline.advance_last_valid_lsn(lsn); + timeline.save_decoded_record(decoded, recdata, lsn)?; last_lsn = lsn; } else { break; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 23ad80c0a4..9a0ab3add0 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -309,8 +309,6 @@ pub struct DecodedWALRecord { pub type Oid = u32; pub type BlockNumber = u32; -pub const MAIN_FORKNUM: u8 = 0; -pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; #[repr(C)] #[derive(Debug, Clone, Copy)] diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 97002ceac4..3a371a8af4 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -7,14 +7,13 @@ //! use crate::page_cache; -use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; +use crate::repository::{Repository, Timeline}; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; use anyhow::Error; use lazy_static::lazy_static; use log::*; -use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; @@ -219,61 +218,7 @@ fn walreceiver_main( loop { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { let decoded = decode_wal_record(recdata.clone()); - // Put the WAL record to the page cache. We make a separate copy of - // it for every block it modifies. (The actual WAL record is kept in - // a Bytes, which uses a reference counter for the underlying buffer, - // so having multiple copies of it doesn't cost that much) - for blk in decoded.blocks.iter() { - let tag = BufferTag { - rel: RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }, - blknum: blk.blkno, - }; - - let rec = WALRecord { - lsn, - will_init: blk.will_init || blk.apply_image, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - - timeline.put_wal_record(tag, rec); - } - // include truncate wal record in all pages - if decoded.xl_rmid == pg_constants::RM_SMGR_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_SMGR_TRUNCATE - { - let truncate = XlSmgrTruncate::decode(&decoded); - if (truncate.flags & SMGR_TRUNCATE_HEAP) != 0 { - let rel = RelTag { - spcnode: truncate.rnode.spcnode, - dbnode: truncate.rnode.dbnode, - relnode: truncate.rnode.relnode, - forknum: MAIN_FORKNUM, - }; - timeline.put_truncation(rel, lsn, truncate.blkno)?; - } - } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID - && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_DBASE_CREATE - { - let createdb = XlCreateDatabase::decode(&decoded); - timeline.create_database( - lsn, - createdb.db_id, - createdb.tablespace_id, - createdb.src_db_id, - createdb.src_tablespace_id, - )?; - } - // Now that this record has been handled, let the page cache know that - // it is up-to-date to this LSN - timeline.advance_last_record_lsn(lsn); + timeline.save_decoded_record(decoded, recdata, lsn)?; } else { break; } diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index b9e559da12..392270bf41 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -98,3 +98,7 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ + + +pub const MAIN_FORKNUM: u8 = 0; +pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; diff --git a/vendor/postgres b/vendor/postgres index 4fa86c7de2..98cdb40a5e 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 4fa86c7de2f1ba6ba0085bdc9d3fb4836b65761a +Subproject commit 98cdb40a5ecfe6169be0695679efda83c2bd43c3