diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 30e9486446..4470e2dfbf 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -256,7 +256,6 @@ impl PostgresNode { // new data directory pub fn init_from_page_server(&self) -> Result<()> { let pgdata = self.pgdata(); - println!( "Extracting base backup to create postgres instance: path={} port={}", pgdata.display(), @@ -348,7 +347,6 @@ impl PostgresNode { fs::create_dir_all(self.pgdata().join("pg_wal"))?; fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?; self.pg_resetwal(&["-f"])?; - Ok(()) } diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 8989cecdf2..a82883ea2c 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::SystemTime; use std::{thread, time}; -use control_plane::compute::ComputeControlPlane; +use control_plane::compute::{PostgresNode, ComputeControlPlane}; use integration_tests; use integration_tests::PostgresNodeExt; @@ -26,7 +26,6 @@ fn start_node_with_wal_proposer( } #[test] -//#[ignore] fn test_embedded_wal_proposer() { let local_env = integration_tests::create_test_env("test_embedded_wal_proposer"); diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 821cf049e6..1ec344a569 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -149,6 +149,7 @@ fn add_pgcontrol_file( let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; checkpoint.redo = lsn.0; + checkpoint.nextXid.value += 1; // TODO: When we restart master there are no active transaction and oldestXid is // equal to nextXid if there are no prepared transactions. // Let's ignore them for a while... diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 107b8dfceb..1dc49d2911 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -8,7 +8,7 @@ use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use fs::File; use fs_extra; -use postgres_ffi::xlog_utils; +use postgres_ffi::{pg_constants, xlog_utils}; use rand::Rng; use serde::{Deserialize, Serialize}; use std::env; @@ -99,9 +99,6 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> { // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; - force_crash_recovery(&tmppath)?; - println!("updated pg_control"); - let target = timelinedir.join("snapshots").join(&lsnstr); fs::rename(tmppath, &target)?; @@ -229,7 +226,7 @@ pub(crate) fn create_branch( &oldtimelinedir.join("wal"), &newtimelinedir.join("wal"), startpoint.lsn, - 16 * 1024 * 1024, // FIXME: assume default WAL segment size + pg_constants::WAL_SEGMENT_SIZE, )?; Ok(BranchInfo { @@ -310,31 +307,6 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } -// If control file says the cluster was shut down cleanly, modify it, to mark -// it as crashed. That forces crash recovery when you start the cluster. -// -// FIXME: -// We currently do this to the initial snapshot in "zenith init". It would -// be more natural to do this when the snapshot is restored instead, but we -// currently don't have any code to create new snapshots, so it doesn't matter -// Or better yet, use a less hacky way of putting the cluster into recovery. -// Perhaps create a backup label file in the data directory when it's restored. -fn force_crash_recovery(datadir: &Path) -> Result<()> { - // Read in the control file - let controlfilepath = datadir.to_path_buf().join("global").join("pg_control"); - let mut controlfile = - postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?; - - controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION; - - fs::write( - controlfilepath.as_path(), - postgres_ffi::encode_pg_control(controlfile), - )?; - - Ok(()) -} - fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { // Create initial timeline let mut tli_buf = [0u8; 16]; @@ -361,7 +333,7 @@ fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Resu /// If the given LSN is in the middle of a segment, the last segment containing it /// is written out as .partial, and padded with zeros. /// -fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> { +fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: usize) -> Result<()> { let last_segno = upto.segment_number(wal_seg_size); let last_segoff = upto.segment_offset(wal_seg_size); @@ -391,11 +363,11 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res let src_file = File::open(entry.path())?; let mut dst_file = File::create(dst_dir.join(&dst_fname))?; - std::io::copy(&mut src_file.take(copylen), &mut dst_file)?; + std::io::copy(&mut src_file.take(copylen as u64), &mut dst_file)?; if copylen < wal_seg_size { std::io::copy( - &mut std::io::repeat(0).take(wal_seg_size - copylen), + &mut std::io::repeat(0).take((wal_seg_size - copylen) as u64), &mut dst_file, )?; } @@ -407,7 +379,7 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res // Find the end of valid WAL in a wal directory pub fn find_end_of_wal(conf: &PageServerConf, timeline: ZTimelineId) -> Result { let waldir = conf.timeline_path(timeline).join("wal"); - let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true); + let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, pg_constants::WAL_SEGMENT_SIZE, true); Ok(Lsn(lsn)) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c46c7d3bf8..a06e6b2441 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -925,7 +925,7 @@ impl Connection { // find latest snapshot let snapshot_lsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); - let req_lsn = lsn.unwrap_or(snapshot_lsn); + let req_lsn = lsn.unwrap_or(timeline.get_last_valid_lsn()); basebackup::send_tarball_at_lsn( &mut CopyDataSink { stream }, timelineid, diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 0d49166efb..2550b46fd0 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -707,11 +707,11 @@ impl Timeline for RocksTimeline { /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations /// but can be also applied to normal relations. fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> { - let lsn = self.wait_lsn(lsn)?; + let _lsn = self.wait_lsn(lsn)?; let mut key = CacheKey { // minimal key to start with tag: BufferTag { rel, blknum: 0 }, - lsn, + lsn: Lsn(0), }; let mut iter = self.db.raw_iterator(); iter.seek(key.to_bytes()); // locate first entry @@ -817,12 +817,8 @@ impl Timeline for RocksTimeline { /// fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result> { let key = CacheKey { tag, lsn }; - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - if let Some(bytes) = self.db.get(&key_buf[..])? { - let mut buf = BytesMut::new(); - buf.extend_from_slice(&bytes); - let content = CacheEntryContent::unpack(&mut buf); + if let Some(bytes) = self.db.get(key.to_bytes())? { + let content = CacheEntryContent::from_slice(&bytes); if let CacheEntryContent::PageImage(img) = content { return Ok(Some(img)); } diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 6a77c080d3..47c3b77116 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -454,6 +454,8 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn if rec.is_err() { // Assume that an error means we've reached the end of // a partial WAL record. So that's ok. + trace!("WAL decoder error {:?}", rec); + waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64)); break; } if let Some((lsn, recdata)) = rec.unwrap() { @@ -466,7 +468,10 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn nrecords += 1; } - info!("restored {} records from WAL file {}", nrecords, filename); + info!( + "restored {} records from WAL file {} at {}", + nrecords, filename, last_lsn + ); segno += 1; offset = 0; diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index fb316275e6..8ab7c2e507 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -84,6 +84,13 @@ impl WalStreamDecoder { } } + pub fn set_position(&mut self, lsn: Lsn) { + self.lsn = lsn; + self.contlen = 0; + self.padlen = 0; + self.inputbuf.clear(); + } + pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } @@ -957,7 +964,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW trace!("XLOG_TBLSPC_DROP is not handled yet"); } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; if info == pg_constants::XLOG_HEAP_INSERT { let xlrec = XlHeapInsert::decode(&mut buf); @@ -1011,7 +1018,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { let xlrec = XlHeapMultiInsert::decode(&mut buf); if (xlrec.flags @@ -1030,7 +1037,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } } } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { let mut blk = DecodedBkpBlock::new(); blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; @@ -1117,6 +1124,14 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW blk.rnode_relnode = 0; blk.will_init = true; blocks.push(blk); + } else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID { + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if next_oid > checkpoint.nextOid { + checkpoint.nextOid = next_oid; + } + } } DecodedWALRecord { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e8992f944f..b0744afd18 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -196,6 +196,7 @@ fn walreceiver_main( while let Some((lsn, recdata)) = waldecoder.poll_decode()? { let old_checkpoint_bytes = encode_checkpoint(checkpoint); + //info!("Decode WAL record at LSN {}", lsn); let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); timeline.save_decoded_record(decoded, recdata, lsn)?; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f3a756ea7b..a287ddd6b1 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -293,6 +293,10 @@ impl PostgresRedoManagerInternal { let mut status = 0; if info == pg_constants::XLOG_XACT_COMMIT { status = pg_constants::TRANSACTION_STATUS_COMMITTED; + info!( + "Mark transaction {} as committed at LSN {}", + xlogrec.xl_xid, lsn + ); transaction_id_set_status(xlogrec.xl_xid, status, &mut page); //handle subtrans let _xact_time = buf.get_i64_le(); @@ -350,7 +354,7 @@ impl PostgresRedoManagerInternal { record.main_data_offset, record.rec.len()); } } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { page.copy_from_slice(&ZERO_PAGE); } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 2a8b5c4879..29d07b8c0c 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -83,6 +83,7 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; // From pg_control.h and rmgrlist.h +pub const XLOG_NEXTOID: u8 = 0x30; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;