diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 09832905e7..30e9486446 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -345,6 +345,10 @@ impl PostgresNode { ), ); + fs::create_dir_all(self.pgdata().join("pg_wal"))?; + fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?; + self.pg_resetwal(&["-f"])?; + Ok(()) } @@ -401,6 +405,19 @@ impl PostgresNode { Ok(()) } + fn pg_resetwal(&self, args: &[&str]) -> Result<()> { + let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal"); + + let pg_ctl = Command::new(pg_resetwal_path) + .args([&["-D", self.pgdata().to_str().unwrap()], args].concat()) + .status() + .with_context(|| "pg_resetwal failed")?; + if !pg_ctl.success() { + anyhow::bail!("pg_resetwal failed"); + } + Ok(()) + } + pub fn start(&self) -> Result<()> { println!("Starting postgres node at '{}'", self.connstr()); self.pg_ctl(&["start"]) diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 469cd9e11c..8989cecdf2 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -11,6 +11,20 @@ use integration_tests::TestStorageControlPlane; const DOWNTIME: u64 = 2; +fn start_node_with_wal_proposer( + timeline: &str, + compute_cplane: &mut ComputeControlPlane, + wal_acceptors: &String, +) -> Arc { + let node = compute_cplane.new_test_master_node(timeline); + node.append_conf( + "postgresql.conf", + &format!("wal_acceptors='{}'\n", wal_acceptors), + ); + node.start().unwrap(); + node +} + #[test] //#[ignore] fn test_embedded_wal_proposer() { @@ -22,12 +36,7 @@ fn test_embedded_wal_proposer() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_test_master_node("main"); - node.append_conf( - "postgresql.conf", - &format!("wal_acceptors='{}'\n", wal_acceptors), - ); - node.start().unwrap(); + let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); // check basic work with table node.safe_psql( @@ -58,11 +67,7 @@ fn test_acceptors_normal_work() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_test_master_node("main"); - node.start().unwrap(); - - // start proxy - let _proxy = node.start_proxy(&wal_acceptors); + let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); // check basic work with table node.safe_psql( @@ -111,10 +116,8 @@ fn test_many_timelines() { // start postgres on each timeline let mut nodes = Vec::new(); for tli_name in timelines { - let node = compute_cplane.new_test_node(&tli_name); + let node = start_node_with_wal_proposer(&tli_name, &mut compute_cplane, &wal_acceptors); nodes.push(node.clone()); - node.start().unwrap(); - node.start_proxy(&wal_acceptors); } // create schema @@ -160,11 +163,8 @@ fn test_acceptors_restarts() { let mut rng = rand::thread_rng(); // start postgres - let node = compute_cplane.new_test_master_node("main"); - node.start().unwrap(); + let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - // start proxy - let _proxy = node.start_proxy(&wal_acceptors); let mut failed_node: Option = None; // check basic work with table @@ -220,11 +220,7 @@ fn test_acceptors_unavailability() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_test_master_node("main"); - node.start().unwrap(); - - // start proxy - let _proxy = node.start_proxy(&wal_acceptors); + let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); // check basic work with table node.safe_psql( @@ -306,11 +302,7 @@ fn test_race_conditions() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_test_master_node("main"); - node.start().unwrap(); - - // start proxy - let _proxy = node.start_proxy(&wal_acceptors); + let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); // check basic work with table node.safe_psql( diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index cacd5c515f..821cf049e6 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -7,8 +7,8 @@ use tar::{Builder, Header}; use walkdir::WalkDir; use crate::repository::{BufferTag, RelTag, Timeline}; -use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; +use postgres_ffi::*; use zenith_utils::lsn::Lsn; fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{ @@ -130,6 +130,38 @@ fn add_twophase_files( Ok(()) } +// +// Add generated pg_control file +// +fn add_pgcontrol_file( + ar: &mut Builder<&mut dyn Write>, + timeline: &Arc, + lsn: Lsn, +) -> anyhow::Result<()> { + if let Some(checkpoint_bytes) = + timeline.get_page_image(BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM), Lsn(0))? + { + if let Some(pg_control_bytes) = timeline.get_page_image( + BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM), + Lsn(0), + )? { + let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?; + let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; + + checkpoint.redo = lsn.0; + // TODO: When we restart master there are no active transaction and oldestXid is + // equal to nextXid if there are no prepared transactions. + // Let's ignore them for a while... + checkpoint.oldestXid = checkpoint.nextXid.value as u32; + pg_control.checkPointCopy = checkpoint; + let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control); + let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; + ar.append(&header, &pg_control_bytes[..])?; + } + } + Ok(()) +} + /// /// Generate tarball with non-relational files from repository /// @@ -143,7 +175,6 @@ pub fn send_tarball_at_lsn( let mut ar = Builder::new(write); let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0); - let walpath = format!("timelines/{}/wal", timelineid); debug!("sending tarball of snapshot in {}", snappath); for entry in WalkDir::new(&snappath) { @@ -171,6 +202,7 @@ pub fn send_tarball_at_lsn( ar.append_path_with_name(fullpath, relpath)?; } else if !is_rel_file_path(relpath.to_str().unwrap()) { if entry.file_name() != "pg_filenode.map" + && entry.file_name() != "pg_control" && !relpath.starts_with("pg_xact/") && !relpath.starts_with("pg_multixact/") { @@ -208,28 +240,8 @@ pub fn send_tarball_at_lsn( )?; add_relmap_files(&mut ar, timeline, lsn)?; add_twophase_files(&mut ar, timeline, lsn)?; + add_pgcontrol_file(&mut ar, timeline, lsn)?; - // FIXME: Also send all the WAL. The compute node would only need - // the WAL that applies to non-relation files, because the page - // server handles all the relation files. But we don't have a - // mechanism for separating relation and non-relation WAL at the - // moment. - for entry in std::fs::read_dir(&walpath)? { - let entry = entry?; - let fullpath = &entry.path(); - let relpath = fullpath.strip_prefix(&walpath).unwrap(); - - if !entry.path().is_file() { - continue; - } - - let archive_fname = relpath.to_str().unwrap(); - let archive_fname = archive_fname - .strip_suffix(".partial") - .unwrap_or(&archive_fname); - let archive_path = "pg_wal/".to_owned() + archive_fname; - ar.append_path_with_name(fullpath, archive_path)?; - } ar.finish()?; debug!("all tarred up!"); Ok(()) diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index c24ae17d72..d9e8d2b393 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -48,6 +48,9 @@ pub trait Timeline { /// Does relation exist? fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result; + /// Get page image at the particular LSN + fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result>; + //------------------------------------------------------------------------------ // Public PUT functions, to update the repository with new page versions. // @@ -64,7 +67,7 @@ pub trait Timeline { fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); /// Truncate relation - fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()>; + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; /// Create a new database from a template database /// @@ -89,7 +92,7 @@ pub trait Timeline { decoded: DecodedWALRecord, recdata: Bytes, lsn: Lsn, - ) -> anyhow::Result<()> { + ) -> Result<()> { // Figure out which blocks the record applies to, and "put" a separate copy // of the record for each block. for blk in decoded.blocks.iter() { @@ -233,6 +236,18 @@ pub struct BufferTag { } impl BufferTag { + pub fn fork(forknum: u8) -> BufferTag { + BufferTag { + rel: RelTag { + forknum, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + } + } + pub fn pack(&self, buf: &mut BytesMut) { self.rel.pack(buf); buf.put_u32(self.blknum); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 821a25351d..0d49166efb 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -16,7 +16,8 @@ use crate::ZTimelineId; use anyhow::{bail, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; -use postgres_ffi::pg_constants; +use postgres_ffi::nonrelfile_utils::transaction_id_get_status; +use postgres_ffi::*; use std::cmp::min; use std::collections::HashMap; use std::convert::TryInto; @@ -340,7 +341,7 @@ impl RocksTimeline { // // The caller must ensure that WAL has been received up to 'lsn'. // - fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> anyhow::Result { + fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result { assert!(lsn <= self.last_valid_lsn.load()); let mut key = CacheKey { @@ -375,7 +376,8 @@ impl RocksTimeline { Ok(0) } - fn do_gc(&self, conf: &'static PageServerConf) -> anyhow::Result { + + fn do_gc(&self, conf: &'static PageServerConf) -> Result { loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); @@ -529,7 +531,7 @@ impl RocksTimeline { // // Wait until WAL has been received up to the given LSN. // - fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + fn wait_lsn(&self, mut lsn: Lsn) -> Result { // When invalid LSN is requested, it means "don't wait, return latest version of the page" // This is necessary for bootstrap. if lsn == Lsn(0) { @@ -541,7 +543,7 @@ impl RocksTimeline { ); lsn = last_valid_lsn; } - + //trace!("Start waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); self.last_valid_lsn .wait_for_timeout(lsn, TIMEOUT) .with_context(|| { @@ -550,6 +552,7 @@ impl RocksTimeline { lsn ) })?; + //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); Ok(lsn) } @@ -646,7 +649,21 @@ impl Timeline for RocksTimeline { break; // we are done with this fork } if key.lsn <= lsn { - gxacts.push(key.tag.blknum); // XID + let xid = key.tag.blknum; + let tag = BufferTag { + rel: RelTag { + forknum: pg_constants::PG_XACT_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE, + }; + let clog_page = self.get_page_at_lsn(tag, lsn)?; + let status = transaction_id_get_status(xid, &clog_page[..]); + if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { + gxacts.push(xid); + } } iter.next(); } @@ -772,7 +789,7 @@ impl Timeline for RocksTimeline { /// Adds a relation-wide WAL record (like truncate) to the repository, /// associating it with all pages started with specified block number /// - fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> anyhow::Result<()> { + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> { // What was the size of the relation before this record? let last_lsn = self.last_valid_lsn.load(); let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?; @@ -795,6 +812,24 @@ impl Timeline for RocksTimeline { Ok(()) } + /// + /// Get page image at particular LSN + /// + fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result> { + let key = CacheKey { tag, lsn }; + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + if let Some(bytes) = self.db.get(&key_buf[..])? { + let mut buf = BytesMut::new(); + buf.extend_from_slice(&bytes); + let content = CacheEntryContent::unpack(&mut buf); + if let CacheEntryContent::PageImage(img) = content { + return Ok(Some(img)); + } + } + return Ok(None); + } + /// /// Memorize a full image of a page version /// @@ -835,7 +870,7 @@ impl Timeline for RocksTimeline { tablespace_id: Oid, src_db_id: Oid, src_tablespace_id: Oid, - ) -> anyhow::Result<()> { + ) -> Result<()> { let key = CacheKey { tag: BufferTag { rel: RelTag { @@ -874,6 +909,7 @@ impl Timeline for RocksTimeline { /// Remember that WAL has been received and added to the timeline up to the given LSN fn advance_last_valid_lsn(&self, lsn: Lsn) { + let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes let old = self.last_valid_lsn.advance(lsn); // Can't move backwards. @@ -891,7 +927,8 @@ impl Timeline for RocksTimeline { /// NOTE: this updates last_valid_lsn as well. /// fn advance_last_record_lsn(&self, lsn: Lsn) { - // Can't move backwards. + let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes + // Can't move backwards. let old = self.last_record_lsn.fetch_max(lsn); assert!(old <= lsn); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 0824245451..6a77c080d3 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -26,9 +26,9 @@ use crate::repository::{BufferTag, RelTag, Timeline}; use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; -use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use zenith_utils::lsn::Lsn; /// @@ -124,8 +124,8 @@ fn restore_snapshot( conf, timeline, timelineid, - snapshot, - pg_constants::GLOBALTABLESPACE_OID, + "0", + 0, 0, pg_constants::PG_CONTROLFILE_FORKNUM, 0, @@ -403,13 +403,23 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn let mut waldecoder = WalStreamDecoder::new(startpoint); - const SEG_SIZE: u64 = 16 * 1024 * 1024; - let mut segno = startpoint.segment_number(SEG_SIZE); - let mut offset = startpoint.segment_offset(SEG_SIZE); + let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = Lsn(0); + + let mut checkpoint = CheckPoint::new(startpoint.0, 1); + let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM); + let pg_control_tag = BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM); + if let Some(pg_control_bytes) = timeline.get_page_image(pg_control_tag, Lsn(0))? { + let pg_control = decode_pg_control(pg_control_bytes)?; + checkpoint = pg_control.checkPointCopy.clone(); + } else { + error!("No control file is found in reposistory"); + } + loop { // FIXME: assume postgresql tli 1 for now - let filename = XLogFileName(1, segno, 16 * 1024 * 1024); + let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); let mut path = walpath.clone() + "/" + &filename; // It could be as .partial @@ -432,7 +442,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn let mut buf = Vec::new(); let nread = file.read_to_end(&mut buf)?; - if nread != 16 * 1024 * 1024 - offset as usize { + if nread != pg_constants::WAL_SEGMENT_SIZE - offset as usize { // Maybe allow this for .partial files? error!("read only {} bytes from WAL file", nread); } @@ -447,7 +457,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = decode_wal_record(recdata.clone()); + let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); timeline.save_decoded_record(decoded, recdata, lsn)?; last_lsn = lsn; } else { @@ -462,6 +472,7 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn offset = 0; } info!("reached end of WAL at {}", last_lsn); - + let checkpoint_bytes = encode_checkpoint(checkpoint); + timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes); Ok(()) } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 352de3451f..fb316275e6 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,25 +1,32 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; -use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::XLogRecord; +use postgres_ffi::*; use std::cmp::min; use std::str; use thiserror::Error; use zenith_utils::lsn::Lsn; -// FIXME: this is configurable in PostgreSQL, 16 MB is the default -const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; +pub type Oid = u32; +pub type TransactionId = u32; +pub type BlockNumber = u32; +pub type OffsetNumber = u16; +pub type MultiXactId = TransactionId; +pub type MultiXactOffset = u32; +pub type MultiXactStatus = u32; +pub type TimeLineID = u32; +pub type PgTime = i64; // From PostgreSQL headers #[repr(C)] #[derive(Debug)] pub struct XLogPageHeaderData { - xlp_magic: u16, /* magic value for correctness checks */ - xlp_info: u16, /* flag bits, see below */ - xlp_tli: u32, /* TimeLineID of first record on page */ - xlp_pageaddr: u64, /* XLOG address of this page */ - xlp_rem_len: u32, /* total len of remaining data for record */ + xlp_magic: u16, /* magic value for correctness checks */ + xlp_info: u16, /* flag bits, see below */ + xlp_tli: TimeLineID, /* TimeLineID of first record on page */ + xlp_pageaddr: u64, /* XLOG address of this page */ + xlp_rem_len: u32, /* total len of remaining data for record */ } // FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end @@ -92,7 +99,7 @@ impl WalStreamDecoder { pub fn poll_decode(&mut self) -> Result, WalDecodeError> { loop { // parse and verify page boundaries as we go - if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 { + if self.lsn.segment_offset(pg_constants::WAL_SEGMENT_SIZE) == 0 { // parse long header if self.inputbuf.remaining() < SizeOfXLogLongPHD { @@ -185,7 +192,8 @@ impl WalStreamDecoder { let xlogrec = XLogRecord::from_bytes(&mut buf); if xlogrec.is_xlog_switch_record() { trace!("saw xlog switch record at {}", self.lsn); - self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32; + self.padlen = + self.lsn.calc_padding(pg_constants::WAL_SEGMENT_SIZE as u64) as u32; } else { // Pad to an 8-byte boundary self.padlen = self.lsn.calc_padding(8u32) as u32; @@ -306,14 +314,6 @@ pub struct DecodedWALRecord { pub main_data_offset: usize, } -pub type Oid = u32; -pub type TransactionId = u32; -pub type BlockNumber = u32; -pub type OffsetNumber = u16; -pub type MultiXactId = TransactionId; -pub type MultiXactOffset = u32; -pub type MultiXactStatus = u32; - #[repr(C)] #[derive(Debug, Clone, Copy)] pub struct RelFileNode { @@ -549,7 +549,7 @@ impl XlMultiXactTruncate { // block data // ... // main data -pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { +pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; @@ -567,7 +567,12 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { xlogrec.xl_rmid, xlogrec.xl_info ); - + if xlogrec.xl_xid > checkpoint.nextXid.value as u32 { + // TODO: handle XID wraparound + checkpoint.nextXid = FullTransactionId { + value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64, + }; + } let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { @@ -1055,8 +1060,29 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { blk.blkno = blkno; blocks.push(blk); } + if xlrec.mid > checkpoint.nextMulti { + checkpoint.nextMulti = xlrec.mid; + } + if xlrec.moff > checkpoint.nextMultiOffset { + checkpoint.nextMultiOffset = xlrec.moff; + } + let max_xid = xlrec + .members + .iter() + .fold(checkpoint.nextXid.value as u32, |acc, mbr| { + if mbr.xid > acc { + mbr.xid + } else { + acc + } + }); + checkpoint.nextXid = FullTransactionId { + value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | max_xid as u64, + }; } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { let xlrec = XlMultiXactTruncate::decode(&mut buf); + checkpoint.oldestXid = xlrec.end_trunc_off; + checkpoint.oldestMultiDB = xlrec.oldest_multi_db; let first_off_blkno = xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; let last_off_blkno = diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 237bc190d7..e8992f944f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -7,6 +7,7 @@ //! use crate::page_cache; +use crate::repository::*; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; @@ -17,6 +18,7 @@ use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::collections::HashMap; @@ -145,6 +147,11 @@ fn walreceiver_main( error!("No previous WAL position"); } + startpoint = Lsn::max( + startpoint, + Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)), + ); + // There might be some padding after the last full record, skip it. // // FIXME: It probably would be better to always start streaming from the beginning @@ -164,6 +171,14 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut checkpoint = CheckPoint::new(startpoint.0, identify.timeline); + let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM); + if let Some(checkpoint_bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? { + checkpoint = decode_checkpoint(checkpoint_bytes)?; + trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); + } else { + error!("No checkpoint record was found in reposistory"); + } while let Some(replication_message) = physical_stream.next()? { match replication_message { ReplicationMessage::XLogData(xlog_data) => { @@ -173,21 +188,21 @@ fn walreceiver_main( let startlsn = Lsn::from(xlog_data.wal_start()); let endlsn = startlsn + data.len() as u64; - write_wal_file( - startlsn, - timelineid, - 16 * 1024 * 1024, // FIXME - data, - )?; + write_wal_file(startlsn, timelineid, pg_constants::WAL_SEGMENT_SIZE, data)?; trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { - let decoded = decode_wal_record(recdata.clone()); + let old_checkpoint_bytes = encode_checkpoint(checkpoint); + let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); timeline.save_decoded_record(decoded, recdata, lsn)?; + let new_checkpoint_bytes = encode_checkpoint(checkpoint); + if new_checkpoint_bytes != old_checkpoint_bytes { + timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes); + } // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN timeline.advance_last_record_lsn(lsn); @@ -299,7 +314,7 @@ fn write_wal_file( let wal_dir = PathBuf::from(format!("timelines/{}/wal", timeline)); /* Extract WAL location for this block */ - let mut xlogoff = start_pos.segment_offset(wal_seg_size as u64) as usize; + let mut xlogoff = start_pos.segment_offset(wal_seg_size); while bytes_left != 0 { let bytes_to_write; @@ -315,7 +330,7 @@ fn write_wal_file( } /* Open file */ - let segno = start_pos.segment_number(wal_seg_size as u64); + let segno = start_pos.segment_number(wal_seg_size); let wal_file_name = XLogFileName( 1, // FIXME: always use Postgres timeline 1 segno, @@ -367,7 +382,7 @@ fn write_wal_file( xlogoff += bytes_to_write; /* Did we reach the end of a WAL segment? */ - if start_pos.segment_offset(wal_seg_size as u64) == 0 { + if start_pos.segment_offset(wal_seg_size) == 0 { xlogoff = 0; if partial { fs::rename(&wal_file_partial_path, &wal_file_path)?; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index b9d4bfc944..f3a756ea7b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -39,6 +39,7 @@ use crate::repository::BufferTag; use crate::repository::WALRecord; use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; +use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::XLogRecord; @@ -241,22 +242,6 @@ impl PostgresRedoManagerInternal { } } - fn transaction_id_set_status_bit(&self, xid: u32, status: u8, page: &mut BytesMut) { - trace!( - "handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort, 3-sub_commit)", - status - ); - - let byteno: usize = ((xid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) - / pg_constants::CLOG_XACTS_PER_BYTE) as usize; - - let bshift: u8 = ((xid % pg_constants::CLOG_XACTS_PER_BYTE) - * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; - - page[byteno] = - (page[byteno] & !(pg_constants::CLOG_XACT_BITMASK << bshift)) | (status << bshift); - } - /// /// Process one request for WAL redo. /// @@ -308,7 +293,7 @@ impl PostgresRedoManagerInternal { let mut status = 0; if info == pg_constants::XLOG_XACT_COMMIT { status = pg_constants::TRANSACTION_STATUS_COMMITTED; - self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page); + transaction_id_set_status(xlogrec.xl_xid, status, &mut page); //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; @@ -328,13 +313,13 @@ impl PostgresRedoManagerInternal { // only update xids on the requested page if tag.blknum == blkno { status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED; - self.transaction_id_set_status_bit(subxact, status, &mut page); + transaction_id_set_status(subxact, status, &mut page); } } } } else if info == pg_constants::XLOG_XACT_ABORT { status = pg_constants::TRANSACTION_STATUS_ABORTED; - self.transaction_id_set_status_bit(xlogrec.xl_xid, status, &mut page); + transaction_id_set_status(xlogrec.xl_xid, status, &mut page); //handle subtrans let _xact_time = buf.get_i64_le(); let mut xinfo = 0; @@ -354,7 +339,7 @@ impl PostgresRedoManagerInternal { // only update xids on the requested page if tag.blknum == blkno { status = pg_constants::TRANSACTION_STATUS_ABORTED; - self.transaction_id_set_status_bit(subxact, status, &mut page); + transaction_id_set_status(subxact, status, &mut page); } } } diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index b834bd99db..c15f19188f 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -18,6 +18,8 @@ fn main() { // included header files changed. .parse_callbacks(Box::new(bindgen::CargoCallbacks)) .whitelist_type("ControlFileData") + .whitelist_type("CheckPoint") + .whitelist_type("FullTransactionId") .whitelist_var("PG_CONTROL_FILE_SIZE") .whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") .whitelist_type("DBState") diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index a1a12a901d..4dcad7a081 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -3,6 +3,7 @@ #![allow(non_snake_case)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); +pub mod nonrelfile_utils; pub mod pg_constants; pub mod relfile_utils; pub mod xlog_utils; @@ -11,6 +12,7 @@ use bytes::{Buf, Bytes, BytesMut}; // sizeof(ControlFileData) const SIZEOF_CONTROLDATA: usize = std::mem::size_of::(); +const SIZEOF_CHECKPOINT: usize = std::mem::size_of::(); const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize; impl ControlFileData { @@ -69,3 +71,42 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes { buf.into() } + +pub fn encode_checkpoint(checkpoint: CheckPoint) -> Bytes { + let b: [u8; SIZEOF_CHECKPOINT]; + b = unsafe { std::mem::transmute::(checkpoint) }; + return Bytes::copy_from_slice(&b[..]); +} + +pub fn decode_checkpoint(mut buf: Bytes) -> Result { + let mut b = [0u8; SIZEOF_CHECKPOINT]; + buf.copy_to_slice(&mut b); + let checkpoint: CheckPoint; + checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) }; + Ok(checkpoint) +} + +impl CheckPoint { + pub fn new(lsn: u64, timeline: u32) -> CheckPoint { + CheckPoint { + redo: lsn, + ThisTimeLineID: timeline, + PrevTimeLineID: timeline, + fullPageWrites: true, // TODO: get actual value of full_page_writes + nextXid: FullTransactionId { + value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64, + }, // TODO: handle epoch? + nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID, + nextMulti: 1, + nextMultiOffset: 0, + oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID, + oldestXidDB: 0, + oldestMulti: 1, + oldestMultiDB: 0, + time: 0, + oldestCommitTsXid: 0, + newestCommitTsXid: 0, + oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID, + } + } +} diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index f301dcfc49..2a8b5c4879 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -23,15 +23,21 @@ pub const PG_XACT_FORKNUM: u8 = 44; pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45; pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46; pub const PG_TWOPHASE_FORKNUM: u8 = 47; +pub const PG_CHECKPOINT_FORKNUM: u8 = 48; // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; +// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and +// --with-segsize=SEGSIZE, but assume the defaults for now. +pub const BLCKSZ: u16 = 8192; +pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); + // // constants from clog.h // pub const CLOG_XACTS_PER_BYTE: u32 = 4; -pub const CLOG_XACTS_PER_PAGE: u32 = 8192 * CLOG_XACTS_PER_BYTE; +pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE; pub const CLOG_BITS_PER_XACT: u8 = 2; pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; @@ -42,6 +48,7 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; +pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00; pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; @@ -136,11 +143,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10; pub const SIZEOF_XLOGRECORD: u32 = 24; -// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and -// --with-segsize=SEGSIZE, but assume the defaults for now. -pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - // // from xlogrecord.h // @@ -162,3 +164,12 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ + +/* From transam.h */ +pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; +pub const INVALID_TRANSACTION_ID: u32 = 0; +pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000; +pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; + +/* FIXME: pageserver should request wal_seg_size from compute node */ +pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; diff --git a/vendor/postgres b/vendor/postgres index ed314e6001..3705405983 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit ed314e6001bc122f0c920563a1af218fba4bc8d6 +Subproject commit 370540598383a14ba2d5225be3f5168823f6e511 diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index 766cd8e65c..04c50bfb01 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -51,13 +51,13 @@ impl Lsn { } /// Compute the offset into a segment - pub fn segment_offset(self, seg_sz: u64) -> u64 { - self.0 % seg_sz + pub fn segment_offset(self, seg_sz: usize) -> usize { + (self.0 % seg_sz as u64) as usize } /// Compute the segment number - pub fn segment_number(self, seg_sz: u64) -> u64 { - self.0 / seg_sz + pub fn segment_number(self, seg_sz: usize) -> u64 { + self.0 / seg_sz as u64 } /// Compute the offset into a block @@ -230,7 +230,7 @@ mod tests { assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1))); assert_eq!(Lsn(1234).checked_sub(1235u64), None); - let seg_sz = 16u64 * 1024 * 1024; + let seg_sz: usize = 16 * 1024 * 1024; assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7u64); assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64);