diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index d8331f7dfb..d53c85be92 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -160,9 +160,7 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> { // Move the initial WAL file fs::rename( - tmppath - .join("pg_wal") - .join("000000010000000000000001"), + tmppath.join("pg_wal").join("000000010000000000000001"), timelinedir .join("wal") .join("000000010000000000000001.partial"), diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 98b120b554..25c2ca54c5 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -25,7 +25,10 @@ fn test_embedded_wal_proposer() { // start postgres let maintli = storage_cplane.get_branch_timeline("main"); let node = compute_cplane.new_test_master_node(maintli); - node.append_conf("postgresql.conf", &format!("wal_acceptors='{}'\n", wal_acceptors)); + node.append_conf( + "postgresql.conf", + &format!("wal_acceptors='{}'\n", wal_acceptors), + ); node.start().unwrap(); // check basic work with table diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index aa1799d6fe..2882c671d3 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -33,6 +33,7 @@ use crate::page_cache::RelTag; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; +use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; use zenith_utils::lsn::Lsn; @@ -170,7 +171,17 @@ fn restore_snapshot( } } } - + for entry in fs::read_dir(snapshotpath.join("pg_xact"))? { + let entry = entry?; + restore_nonrelfile( + conf, + pcache, + timeline, + snapshot, + pg_constants::PG_XACT_FORKNUM, + &entry.path(), + )?; + } // TODO: Scan pg_tblspc Ok(()) @@ -241,6 +252,64 @@ fn restore_relfile( Ok(()) } +fn restore_nonrelfile( + _conf: &PageServerConf, + pcache: &PageCache, + _timeline: ZTimelineId, + snapshot: &str, + forknum: u32, + path: &Path, +) -> Result<()> { + let lsn = Lsn::from_hex(snapshot)?; + + // Does it look like a relation file? + + let mut file = File::open(path)?; + let mut buf: [u8; 8192] = [0u8; 8192]; + let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; + + // FIXME: use constants (BLCKSZ) + let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; + loop { + let r = file.read_exact(&mut buf); + match r { + Ok(_) => { + let tag = BufferTag { + rel: RelTag { + spcnode: 0, + dbnode: 0, + relnode: 0, + forknum: forknum as u8, + }, + blknum, + }; + pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); + /* + if oldest_lsn == 0 || p.lsn < oldest_lsn { + oldest_lsn = p.lsn; + } + */ + } + + // TODO: UnexpectedEof is expected + Err(e) => match e.kind() { + std::io::ErrorKind::UnexpectedEof => { + // reached EOF. That's expected. + // FIXME: maybe check that we read the full length of the file? + break; + } + _ => { + error!("error reading file: {:?} ({})", path, e); + break; + } + }, + }; + blknum += 1; + } + + Ok(()) +} + // Scan WAL on a timeline, starting from gien LSN, and load all the records // into the page cache. fn restore_wal( @@ -312,7 +381,6 @@ fn restore_wal( }, blknum: blk.blkno, }; - let rec = page_cache::WALRecord { lsn, will_init: blk.will_init || blk.apply_image, diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 054721db4d..23ad80c0a4 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -619,6 +619,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_XACT_FORKNUM as u8; blk.blkno = buf.get_i32_le() as u32; + blk.will_init = true; trace!("RM_CLOG_ID updates block {}", blk.blkno); blocks.push(blk); } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c21629850a..dbb23d6547 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -242,11 +242,13 @@ impl WalRedoManagerInternal { let apply_result: Result; if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { - //TODO use base image if any - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - let zero_page_bytes: &[u8] = &ZERO_PAGE; - let mut page = BytesMut::from(zero_page_bytes); - + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let mut page = BytesMut::new(); + if let Some(fpi) = base_img { + page.extend_from_slice(&fpi[..]); + } else { + page.extend_from_slice(&ZERO_PAGE); + } for record in records { let mut buf = record.rec.clone(); @@ -265,7 +267,7 @@ impl WalRedoManagerInternal { if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; if info == pg_constants::CLOG_ZEROPAGE { - page.clone_from_slice(zero_page_bytes); + page.clone_from_slice(&ZERO_PAGE); } } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 4acfa7a7ce..b9e559da12 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -29,6 +29,9 @@ pub const CLOG_TRUNCATE: u8 = 0x10; pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_ABORT: u8 = 0x20; +// From srlu.h +pub const SLRU_PAGES_PER_SEGMENT: u32 = 32; + /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ diff --git a/vendor/postgres b/vendor/postgres index 82703aaa29..eb52f7f49f 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 82703aaa2924566d8ee336fcc5e2069ea6e2a4ce +Subproject commit eb52f7f49f7ea0b7abc39ba27bce309bef3e3a86