diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index faa600101d..7192e9117a 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -252,6 +252,7 @@ impl PostgresNode {
     // new data directory
     pub fn init_from_page_server(&self) -> Result<()> {
         let pgdata = self.pgdata();
+
         println!(
            "Extracting base backup to create postgres instance: path={} port={}",
            pgdata.display(),
@@ -340,9 +341,6 @@ impl PostgresNode {
             ),
         )?;
 
-        fs::create_dir_all(self.pgdata().join("pg_wal"))?;
-        fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
 
-        self.pg_resetwal(&["-f"])?;
         Ok(())
     }
@@ -398,19 +396,6 @@ impl PostgresNode {
         Ok(())
     }
 
-    fn pg_resetwal(&self, args: &[&str]) -> Result<()> {
-        let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
-
-        let pg_ctl = Command::new(pg_resetwal_path)
-            .args([&["-D", self.pgdata().to_str().unwrap()], args].concat())
-            .status()
-            .with_context(|| "pg_resetwal failed")?;
-        if !pg_ctl.success() {
-            anyhow::bail!("pg_resetwal failed");
-        }
-        Ok(())
-    }
-
     pub fn start(&self) -> Result<()> {
         println!("Starting postgres node at '{}'", self.connstr());
         self.pg_ctl(&["start"])
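The removed pg_resetwal step and the manual pg_wal/ creation are unnecessary now because the base backup tarball itself carries pg_wal and a usable pg_control (see the basebackup.rs and branches.rs changes below). A minimal sketch of the extraction side, assuming the `tar` crate; `extract_base_backup` and `reader` are illustrative names, not the actual control_plane code:

    use std::io::Read;
    use std::path::Path;

    // Unpack a base backup tar stream into the new data directory. `reader`
    // is assumed to be the CopyData byte stream coming from the page server.
    fn extract_base_backup(reader: impl Read, pgdata: &Path) -> anyhow::Result<()> {
        let mut ar = tar::Archive::new(reader);
        // The archive now carries pg_wal/ and global/pg_control, so no
        // pg_resetwal call or manual directory creation is needed afterwards.
        ar.unpack(pgdata)?;
        Ok(())
    }
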
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 1096f0ed38..7c1a8de4dc 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -2,185 +2,27 @@ use crate::ZTimelineId;
 use log::*;
 use std::io::Write;
 use std::sync::Arc;
-use std::time::SystemTime;
-use tar::{Builder, Header};
+use tar::Builder;
 use walkdir::WalkDir;
 
-use crate::repository::{BufferTag, RelTag, Timeline};
+use crate::repository::Timeline;
 use postgres_ffi::relfile_utils::*;
-use postgres_ffi::*;
 use zenith_utils::lsn::Lsn;
 
-fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
-    let mut header = Header::new_gnu();
-    header.set_size(size);
-    header.set_path(path)?;
-    header.set_mode(0b110000000);
-    header.set_mtime(
-        SystemTime::now()
-            .duration_since(SystemTime::UNIX_EPOCH)
-            .unwrap()
-            .as_secs(),
-    );
-    header.set_cksum();
-    Ok(header)
-}
-
-//
-// Generate SLRU segment files from repository
-//
-fn add_slru_segments(
-    ar: &mut Builder<&mut dyn Write>,
-    timeline: &Arc<dyn Timeline>,
-    path: &str,
-    forknum: u8,
-    lsn: Lsn,
-) -> anyhow::Result<()> {
-    let rel = RelTag {
-        spcnode: 0,
-        dbnode: 0,
-        relnode: 0,
-        forknum,
-    };
-    let (first, last) = timeline.get_range(rel, lsn)?;
-    const SEG_SIZE: usize =
-        pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize;
-    let mut seg_buf = [0u8; SEG_SIZE];
-    let mut curr_segno: Option<u32> = None;
-    for page in first..last {
-        let tag = BufferTag { rel, blknum: page };
-        let img = timeline.get_page_at_lsn(tag, lsn)?;
-        // Zero length image indicates truncated segment: just skip it
-        if !img.is_empty() {
-            assert!(img.len() == pg_constants::BLCKSZ as usize);
-
-            let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT;
-            if curr_segno.is_some() && curr_segno.unwrap() != segno {
-                let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
-                let header = new_tar_header(&segname, SEG_SIZE as u64)?;
-                ar.append(&header, &seg_buf[..])?;
-                seg_buf = [0u8; SEG_SIZE];
-            }
-            curr_segno = Some(segno);
-            let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize
-                * pg_constants::BLCKSZ as usize;
-            let offs_end = offs_start + pg_constants::BLCKSZ as usize;
-            seg_buf[offs_start..offs_end].copy_from_slice(&img);
-        }
-    }
-    if curr_segno.is_some() {
-        let segname = format!("{}/{:>04X}", path, curr_segno.unwrap());
-        let header = new_tar_header(&segname, SEG_SIZE as u64)?;
-        ar.append(&header, &seg_buf[..])?;
-    }
-    Ok(())
-}
-
-//
-// Extract pg_filenode.map files from repository
-//
-fn add_relmap_files(
-    ar: &mut Builder<&mut dyn Write>,
-    timeline: &Arc<dyn Timeline>,
-    lsn: Lsn,
-    snappath: &str,
-) -> anyhow::Result<()> {
-    for db in timeline.get_databases(lsn)?.iter() {
-        let tag = BufferTag {
-            rel: *db,
-            blknum: 0,
-        };
-        let img = timeline.get_page_at_lsn(tag, lsn)?;
-        let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID {
-            String::from("global/pg_filenode.map")
-        } else {
-            // User defined tablespaces are not supported
-            assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID);
-            let src_path = format!("{}/base/1/PG_VERSION", snappath);
-            let dst_path = format!("base/{}/PG_VERSION", db.dbnode);
-            ar.append_path_with_name(&src_path, &dst_path)?;
-            format!("base/{}/pg_filenode.map", db.dbnode)
-        };
-        info!("Deliver {}", path);
-        assert!(img.len() == 512);
-        let header = new_tar_header(&path, img.len() as u64)?;
-        ar.append(&header, &img[..])?;
-    }
-    Ok(())
-}
-
-//
-// Extract twophase state files
-//
-fn add_twophase_files(
-    ar: &mut Builder<&mut dyn Write>,
-    timeline: &Arc<dyn Timeline>,
-    lsn: Lsn,
-) -> anyhow::Result<()> {
-    for xid in timeline.get_twophase(lsn)?.iter() {
-        let tag = BufferTag {
-            rel: RelTag {
-                spcnode: 0,
-                dbnode: 0,
-                relnode: 0,
-                forknum: pg_constants::PG_TWOPHASE_FORKNUM,
-            },
-            blknum: *xid,
-        };
-        let img = timeline.get_page_at_lsn(tag, lsn)?;
-        let path = format!("pg_twophase/{:>08X}", xid);
-        let header = new_tar_header(&path, img.len() as u64)?;
-        ar.append(&header, &img[..])?;
-    }
-    Ok(())
-}
-
-//
-// Add generated pg_control file
-//
-fn add_pgcontrol_file(
-    ar: &mut Builder<&mut dyn Write>,
-    timeline: &Arc<dyn Timeline>,
-    lsn: Lsn,
-) -> anyhow::Result<()> {
-    if let Some(checkpoint_bytes) =
-        timeline.get_page_image(BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM), Lsn(0))?
-    {
-        if let Some(pg_control_bytes) = timeline.get_page_image(
-            BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM),
-            Lsn(0),
-        )? {
-            let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?;
-            let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?;
-
-            checkpoint.redo = lsn.0;
-            checkpoint.nextXid.value += 1;
-            // TODO: When we restart the master there are no active transactions, and oldestXid is
-            // equal to nextXid if there are no prepared transactions.
-            // Let's ignore them for a while...
-            checkpoint.oldestXid = checkpoint.nextXid.value as u32;
-            pg_control.checkPointCopy = checkpoint;
-            let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control);
-            let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
-            ar.append(&header, &pg_control_bytes[..])?;
-        }
-    }
-    Ok(())
-}
-
 ///
 /// Generate tarball with non-relational files from repository
 ///
 pub fn send_tarball_at_lsn(
     write: &mut dyn Write,
     timelineid: ZTimelineId,
-    timeline: &Arc<dyn Timeline>,
-    lsn: Lsn,
+    _timeline: &Arc<dyn Timeline>,
+    _lsn: Lsn,
     snapshot_lsn: Lsn,
 ) -> anyhow::Result<()> {
     let mut ar = Builder::new(write);
 
     let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0);
+    let walpath = format!("timelines/{}/wal", timelineid);
 
     debug!("sending tarball of snapshot in {}", snappath);
     for entry in WalkDir::new(&snappath) {
@@ -207,14 +49,8 @@ pub fn send_tarball_at_lsn(
                 trace!("sending shared catalog {}", relpath.display());
                 ar.append_path_with_name(fullpath, relpath)?;
             } else if !is_rel_file_path(relpath.to_str().unwrap()) {
-                if entry.file_name() != "pg_filenode.map"
-                    && entry.file_name() != "pg_control"
-                    && !relpath.starts_with("pg_xact/")
-                    && !relpath.starts_with("pg_multixact/")
-                {
-                    trace!("sending {}", relpath.display());
-                    ar.append_path_with_name(fullpath, relpath)?;
-                }
+                trace!("sending {}", relpath.display());
+                ar.append_path_with_name(fullpath, relpath)?;
             } else {
                 trace!("not sending {}", relpath.display());
             }
@@ -223,31 +59,27 @@ pub fn send_tarball_at_lsn(
             }
         }
     }
 
-    add_slru_segments(
-        &mut ar,
-        timeline,
-        "pg_xact",
-        pg_constants::PG_XACT_FORKNUM,
-        lsn,
-    )?;
-    add_slru_segments(
-        &mut ar,
-        timeline,
-        "pg_multixact/members",
-        pg_constants::PG_MXACT_MEMBERS_FORKNUM,
-        lsn,
-    )?;
-    add_slru_segments(
-        &mut ar,
-        timeline,
-        "pg_multixact/offsets",
-        pg_constants::PG_MXACT_OFFSETS_FORKNUM,
-        lsn,
-    )?;
-    add_relmap_files(&mut ar, timeline, lsn, &snappath)?;
-    add_twophase_files(&mut ar, timeline, lsn)?;
-    add_pgcontrol_file(&mut ar, timeline, lsn)?;
+    // FIXME: Also send all the WAL. The compute node would only need
+    // the WAL that applies to non-relation files, because the page
+    // server handles all the relation files. But we don't have a
+    // mechanism for separating relation and non-relation WAL at the
+    // moment.
+    for entry in std::fs::read_dir(&walpath)? {
+        let entry = entry?;
+        let fullpath = &entry.path();
+        let relpath = fullpath.strip_prefix(&walpath).unwrap();
+
+        if !entry.path().is_file() {
+            continue;
+        }
+
+        let archive_fname = relpath.to_str().unwrap();
+        let archive_fname = archive_fname
+            .strip_suffix(".partial")
+            .unwrap_or(&archive_fname);
+        let archive_path = "pg_wal/".to_owned() + archive_fname;
+        ar.append_path_with_name(fullpath, archive_path)?;
+    }
 
     ar.finish()?;
     debug!("all tarred up!");
     Ok(())
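The `.partial` handling above exists because the page server's WAL directory can contain the segment that is still being streamed in; it is stored with a `.partial` suffix but has to appear in the tarball under its final segment name so the compute node can keep appending WAL to it after recovery. A standalone illustration of the renaming rule (hypothetical helper, not the pageserver code):

    // Map an on-disk WAL file name to its name inside the tarball's pg_wal/
    // directory, dropping a trailing ".partial" if present.
    fn archive_name(fname: &str) -> String {
        let fname = fname.strip_suffix(".partial").unwrap_or(fname);
        format!("pg_wal/{}", fname)
    }

    fn main() {
        // Completed segments keep their name.
        assert_eq!(
            archive_name("000000010000000000000001"),
            "pg_wal/000000010000000000000001"
        );
        // The segment still being written is shipped under its final name.
        assert_eq!(
            archive_name("000000010000000000000002.partial"),
            "pg_wal/000000010000000000000002"
        );
    }
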
diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs
index 8543d7c8dd..e5fc490038 100644
--- a/pageserver/src/branches.rs
+++ b/pageserver/src/branches.rs
@@ -101,6 +101,9 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> {
     // Remove pg_wal
     fs::remove_dir_all(tmppath.join("pg_wal"))?;
 
+    force_crash_recovery(&tmppath)?;
+    println!("updated pg_control");
+
     let target = timelinedir.join("snapshots").join(&lsnstr);
     fs::rename(tmppath, &target)?;
@@ -309,6 +312,31 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
     bail!("could not parse point-in-time {}", s);
 }
 
+// If the control file says the cluster was shut down cleanly, modify it to mark
+// it as crashed. That forces crash recovery when you start the cluster.
+//
+// FIXME:
+// We currently do this to the initial snapshot in "zenith init". It would
+// be more natural to do this when the snapshot is restored instead, but we
+// currently don't have any code to create new snapshots, so it doesn't matter.
+// Or better yet, use a less hacky way of putting the cluster into recovery.
+// Perhaps create a backup label file in the data directory when it's restored.
+fn force_crash_recovery(datadir: &Path) -> Result<()> {
+    // Read in the control file
+    let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
+    let mut controlfile =
+        postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
+
+    controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
+
+    fs::write(
+        controlfilepath.as_path(),
+        postgres_ffi::encode_pg_control(controlfile),
+    )?;
+
+    Ok(())
+}
+
 fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
     // Create initial timeline
     let mut tli_buf = [0u8; 16];
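force_crash_recovery relies on how postgres interprets the `state` field of pg_control at startup: DB_SHUTDOWNED records a clean shutdown, while finding DB_IN_PRODUCTION means the server died mid-flight, so WAL is replayed from the checkpoint's redo pointer. A hedged sketch of verifying the effect, assuming the bindgen-generated DBState constants whitelisted in postgres_ffi/build.rs; `assert_forces_recovery` is an illustrative helper, not repository code:

    use std::fs;
    use std::path::Path;

    use anyhow::Result;
    use bytes::Bytes;

    // Hypothetical check: after init, the snapshot's control file should
    // force WAL replay on the next startup.
    fn assert_forces_recovery(datadir: &Path) -> Result<()> {
        let controlfilepath = datadir.join("global").join("pg_control");
        let controlfile =
            postgres_ffi::decode_pg_control(Bytes::from(fs::read(&controlfilepath)?))?;

        // DB_IN_PRODUCTION in a control file read at startup means the server
        // was not shut down cleanly, so postgres runs crash recovery.
        assert_eq!(controlfile.state, postgres_ffi::DBState_DB_IN_PRODUCTION);
        Ok(())
    }
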
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index 3376a7b7e1..b699df3023 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -921,7 +921,7 @@ impl Connection {
         // find latest snapshot
         let snapshot_lsn =
             restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap();
-        let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn());
+        let req_lsn = lsn.unwrap_or(snapshot_lsn);
         basebackup::send_tarball_at_lsn(
             &mut CopyDataSink { stream },
             timelineid,
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 1595ccb0eb..9fd114293e 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,6 +1,6 @@
 pub mod rocksdb;
 
-use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId, XlCreateDatabase, XlSmgrTruncate};
+use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate};
 use crate::ZTimelineId;
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
@@ -48,9 +48,6 @@ pub trait Timeline {
     /// Does relation exist?
     fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
 
-    /// Get page image at the particular LSN
-    fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>>;
-
     //------------------------------------------------------------------------------
     // Public PUT functions, to update the repository with new page versions.
     //
@@ -166,16 +163,6 @@ pub trait Timeline {
     /// valid LSN, so that the WAL receiver knows where to restart streaming.
     fn advance_last_record_lsn(&self, lsn: Lsn);
     fn get_last_record_lsn(&self) -> Lsn;
-
-    /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
-    /// but can also be applied to normal relations.
-    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>;
-
-    /// Get vector of databases (represented using RelTag; only the dbnode and spcnode fields are used)
-    fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>>;
-
-    /// Get vector of prepared twophase transactions
-    fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>>;
 }
 
 #[derive(Clone)]
@@ -236,18 +223,6 @@ pub struct BufferTag {
 }
 
 impl BufferTag {
-    pub fn fork(forknum: u8) -> BufferTag {
-        BufferTag {
-            rel: RelTag {
-                forknum,
-                spcnode: 0,
-                dbnode: 0,
-                relnode: 0,
-            },
-            blknum: 0,
-        }
-    }
-
     pub fn pack(&self, buf: &mut BytesMut) {
         self.rel.pack(buf);
         buf.put_u32(self.blknum);
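A note on why the seek-based scans in the rocksdb backend (next file) work at all: `BufferTag::pack`, kept above, serializes keys with the `bytes` crate's `put_u32`, which writes big-endian, so the lexicographic byte order RocksDB sorts by matches the numeric order of the fields. A self-contained illustration of that property, with a simplified two-field key:

    use bytes::{BufMut, BytesMut};

    // Encode a two-field key the way BufferTag::pack does: big-endian,
    // most significant field first.
    fn key(relnode: u32, blknum: u32) -> Vec<u8> {
        let mut buf = BytesMut::new();
        buf.put_u32(relnode);
        buf.put_u32(blknum);
        buf.to_vec()
    }

    fn main() {
        // Byte-wise comparison agrees with numeric comparison, so an iterator
        // seek()ed to (rel, 0) visits a relation's blocks in ascending order.
        assert!(key(1, 2) < key(1, 10));
        assert!(key(1, u32::MAX) < key(2, 0));
    }
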
diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs
index d7bccd2112..09fe1ad63b 100644
--- a/pageserver/src/repository/rocksdb.rs
+++ b/pageserver/src/repository/rocksdb.rs
@@ -7,7 +7,7 @@
 use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
 use crate::restore_local_repo::restore_timeline;
-use crate::waldecoder::{Oid, TransactionId};
+use crate::waldecoder::Oid;
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::ZTimelineId;
@@ -16,8 +16,6 @@
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
-use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
-use postgres_ffi::*;
 use std::cmp::min;
 use std::collections::HashMap;
 use std::convert::TryInto;
@@ -417,19 +415,6 @@ impl RocksTimeline {
                     let mut minkey = maxkey.clone();
                     minkey.lsn = Lsn(0); // first version
 
-                    // Special handling of deletion of the PREPARE WAL record
-                    if last_lsn < horizon
-                        && key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM
-                    {
-                        if (v[0] & UNUSED_VERSION_FLAG) == 0 {
-                            let mut v = v.to_owned();
-                            v[0] |= UNUSED_VERSION_FLAG;
-                            self.db.put(key.to_bytes(), &v[..])?;
-                            deleted += 1;
-                        }
-                        maxkey = minkey;
-                        continue;
-                    }
                     // reconstruct most recent page version
                     if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD {
                         // force reconstruction of most recent page version
@@ -623,116 +608,6 @@ impl Timeline for RocksTimeline {
         self.relsize_get_nowait(rel, lsn)
     }
 
-    /// Get vector of prepared twophase transactions
-    fn get_twophase(&self, lsn: Lsn) -> Result<Vec<TransactionId>> {
-        let key = CacheKey {
-            // minimal key
-            tag: BufferTag {
-                rel: RelTag {
-                    forknum: pg_constants::PG_TWOPHASE_FORKNUM,
-                    spcnode: 0,
-                    dbnode: 0,
-                    relnode: 0,
-                },
-                blknum: 0,
-            },
-            lsn: Lsn(0),
-        };
-        let mut gxacts = Vec::new();
-
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes());
-        while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
-            if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM {
-                break; // we are done with this fork
-            }
-            if key.lsn <= lsn {
-                let xid = key.tag.blknum;
-                let tag = BufferTag {
-                    rel: RelTag {
-                        forknum: pg_constants::PG_XACT_FORKNUM,
-                        spcnode: 0,
-                        dbnode: 0,
-                        relnode: 0,
-                    },
-                    blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE,
-                };
-                let clog_page = self.get_page_at_lsn(tag, lsn)?;
-                let status = transaction_id_get_status(xid, &clog_page[..]);
-                if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS {
-                    gxacts.push(xid);
-                }
-            }
-            iter.next();
-        }
-        Ok(gxacts)
-    }
-
-    /// Get databases. This function is used to generate pg_filenode.map files
-    fn get_databases(&self, lsn: Lsn) -> Result<Vec<RelTag>> {
-        let key = CacheKey {
-            // minimal key
-            tag: BufferTag {
-                rel: RelTag {
-                    forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
-                    spcnode: 0,
-                    dbnode: 0,
-                    relnode: 0,
-                },
-                blknum: 0,
-            },
-            lsn: Lsn(0),
-        };
-        let mut dbs = Vec::new();
-
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes());
-        let mut prev_tag = key.tag.rel;
-        while iter.valid() {
-            let key = CacheKey::from_slice(iter.key().unwrap());
-            if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM {
-                break; // we are done with this fork
-            }
-            if key.tag.rel != prev_tag && key.lsn <= lsn {
-                prev_tag = key.tag.rel;
-                dbs.push(prev_tag); // collect unique tags
-            }
-            iter.next();
-        }
-        Ok(dbs)
-    }
-
-    /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations
-    /// but can also be applied to normal relations.
-    fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> {
-        let _lsn = self.wait_lsn(lsn)?;
-        let mut key = CacheKey {
-            // minimal key to start with
-            tag: BufferTag { rel, blknum: 0 },
-            lsn: Lsn(0),
-        };
-        let mut iter = self.db.raw_iterator();
-        iter.seek(key.to_bytes()); // locate first entry
-        if iter.valid() {
-            let thiskey = CacheKey::from_slice(iter.key().unwrap());
-            let tag = thiskey.tag;
-            if tag.rel == rel {
-                // still traversing this relation
-                let first_blknum = tag.blknum;
-                key.tag.blknum = u32::MAX; // maximal key
-                let mut iter = self.db.raw_iterator();
-                iter.seek_for_prev(key.to_bytes()); // locate last entry
-                if iter.valid() {
-                    let thiskey = CacheKey::from_slice(iter.key().unwrap());
-                    let last_blknum = thiskey.tag.blknum;
-                    return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive
-                }
-            }
-        }
-        Ok((0, 0)) // empty range
-    }
-
     ///
     /// Does relation exist at given LSN?
     ///
@@ -811,20 +686,6 @@ impl Timeline for RocksTimeline {
         Ok(())
     }
 
-    ///
-    /// Get page image at particular LSN
-    ///
-    fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result<Option<Bytes>> {
-        let key = CacheKey { tag, lsn };
-        if let Some(bytes) = self.db.get(key.to_bytes())? {
-            let content = CacheEntryContent::from_slice(&bytes);
-            if let CacheEntryContent::PageImage(img) = content {
-                return Ok(Some(img));
-            }
-        }
-        Ok(None)
-    }
-
     ///
     /// Memorize a full image of a page version
     ///
@@ -866,42 +727,35 @@ impl Timeline for RocksTimeline {
         src_db_id: Oid,
         src_tablespace_id: Oid,
     ) -> Result<()> {
-        let mut n = 0;
-        for forknum in &[
-            pg_constants::MAIN_FORKNUM,
-            pg_constants::FSM_FORKNUM,
-            pg_constants::VISIBILITYMAP_FORKNUM,
-            pg_constants::INIT_FORKNUM,
-            pg_constants::PG_FILENODEMAP_FORKNUM,
-        ] {
-            let key = CacheKey {
-                tag: BufferTag {
-                    rel: RelTag {
-                        spcnode: src_tablespace_id,
-                        dbnode: src_db_id,
-                        relnode: 0,
-                        forknum: *forknum,
-                    },
-                    blknum: 0,
+        let key = CacheKey {
+            tag: BufferTag {
+                rel: RelTag {
+                    spcnode: src_tablespace_id,
+                    dbnode: src_db_id,
+                    relnode: 0,
+                    forknum: 0u8,
                 },
-                lsn: Lsn(0),
-            };
-            let mut iter = self.db.raw_iterator();
-            iter.seek(key.to_bytes());
-            while iter.valid() {
-                let mut key = CacheKey::from_slice(iter.key().unwrap());
-                if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
-                    break;
-                }
-                key.tag.rel.spcnode = tablespace_id;
-                key.tag.rel.dbnode = db_id;
-                key.lsn = lsn;
-
-                let v = iter.value().unwrap();
-                self.db.put(key.to_bytes(), v)?;
-                n += 1;
-                iter.next();
+                blknum: 0,
+            },
+            lsn: Lsn(0),
+        };
+        let mut iter = self.db.raw_iterator();
+        iter.seek(key.to_bytes());
+        let mut n = 0;
+        while iter.valid() {
+            let mut key = CacheKey::from_slice(iter.key().unwrap());
+            if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id {
+                break;
             }
+
+            key.tag.rel.spcnode = tablespace_id;
+            key.tag.rel.dbnode = db_id;
+            key.lsn = lsn;
+
+            let v = iter.value().unwrap();
+            self.db.put(key.to_bytes(), v)?;
+            n += 1;
+            iter.next();
         }
         info!(
             "Create database {}/{}, copy {} entries",
@@ -912,7 +766,6 @@ impl Timeline for RocksTimeline {
 
     /// Remember that WAL has been received and added to the timeline up to the given LSN
     fn advance_last_valid_lsn(&self, lsn: Lsn) {
-        let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes
         let old = self.last_valid_lsn.advance(lsn);
 
         // Can't move backwards.
@@ -930,8 +783,7 @@ impl Timeline for RocksTimeline {
     /// NOTE: this updates last_valid_lsn as well.
     ///
     fn advance_last_record_lsn(&self, lsn: Lsn) {
-        let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes
-        // Can't move backwards.
+        // Can't move backwards.
         let old = self.last_record_lsn.fetch_max(lsn);
         assert!(old <= lsn);
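With the 8-byte alignment gone, advancing the watermark is a plain monotonic update. A minimal sketch of the `fetch_max` pattern used above, on a bare AtomicU64 (the pageserver's atomic LSN wrapper is analogous; `Watermark` is an illustrative name):

    use std::sync::atomic::{AtomicU64, Ordering};

    // A monotonically advancing LSN watermark: stale updates are ignored.
    struct Watermark(AtomicU64);

    impl Watermark {
        // Advance to `lsn` unless we are already past it; returns the old value.
        fn advance(&self, lsn: u64) -> u64 {
            self.0.fetch_max(lsn, Ordering::SeqCst)
        }
    }

    fn main() {
        let w = Watermark(AtomicU64::new(0));
        w.advance(100);
        w.advance(50); // out-of-order update: the watermark stays at 100
        assert_eq!(w.0.load(Ordering::SeqCst), 100);
    }
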
diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs
index 47c3b77116..d6d822a1bf 100644
--- a/pageserver/src/restore_local_repo.rs
+++ b/pageserver/src/restore_local_repo.rs
@@ -26,9 +26,9 @@
 use crate::repository::{BufferTag, RelTag, Timeline};
 use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder};
 use crate::PageServerConf;
 use crate::ZTimelineId;
+use postgres_ffi::pg_constants;
 use postgres_ffi::relfile_utils::*;
 use postgres_ffi::xlog_utils::*;
-use postgres_ffi::*;
 use zenith_utils::lsn::Lsn;
 
 ///
@@ -103,7 +103,7 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
 }
 
 fn restore_snapshot(
-    conf: &PageServerConf,
+    _conf: &PageServerConf,
     timeline: &dyn Timeline,
     timelineid: ZTimelineId,
     snapshot: &str,
@@ -120,28 +120,8 @@ fn restore_snapshot(
             None => continue,
 
             // These special files appear in the snapshot, but are not needed by the page server
-            Some("pg_control") => restore_nonrel_file(
-                conf,
-                timeline,
-                timelineid,
-                "0",
-                0,
-                0,
-                pg_constants::PG_CONTROLFILE_FORKNUM,
-                0,
-                &direntry.path(),
-            )?,
-            Some("pg_filenode.map") => restore_nonrel_file(
-                conf,
-                timeline,
-                timelineid,
-                snapshot,
-                pg_constants::GLOBALTABLESPACE_OID,
-                0,
-                pg_constants::PG_FILENODEMAP_FORKNUM,
-                0,
-                &direntry.path(),
-            )?,
+            Some("pg_control") => continue,
+            Some("pg_filenode.map") => continue,
 
             // Load any relation files into the page server
             _ => restore_relfile(
@@ -168,17 +148,7 @@ fn restore_snapshot(
 
             // These special files appear in the snapshot, but are not needed by the page server
             Some("PG_VERSION") => continue,
-            Some("pg_filenode.map") => restore_nonrel_file(
-                conf,
-                timeline,
-                timelineid,
-                snapshot,
-                pg_constants::DEFAULTTABLESPACE_OID,
-                dboid,
-                pg_constants::PG_FILENODEMAP_FORKNUM,
-                0,
-                &direntry.path(),
-            )?,
+            Some("pg_filenode.map") => continue,
 
             // Load any relation files into the page server
             _ => restore_relfile(
@@ -191,54 +161,6 @@ fn restore_snapshot(
             }
         }
     }
-    for entry in fs::read_dir(snapshotpath.join("pg_xact"))? {
-        let entry = entry?;
-        restore_slru_file(
-            conf,
-            timeline,
-            timelineid,
-            snapshot,
-            pg_constants::PG_XACT_FORKNUM,
-            &entry.path(),
-        )?;
-    }
-    for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? {
-        let entry = entry?;
-        restore_slru_file(
-            conf,
-            timeline,
-            timelineid,
-            snapshot,
-            pg_constants::PG_MXACT_MEMBERS_FORKNUM,
-            &entry.path(),
-        )?;
-    }
-    for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? {
-        let entry = entry?;
-        restore_slru_file(
-            conf,
-            timeline,
-            timelineid,
-            snapshot,
-            pg_constants::PG_MXACT_OFFSETS_FORKNUM,
-            &entry.path(),
-        )?;
-    }
-    for entry in fs::read_dir(snapshotpath.join("pg_twophase"))? {
-        let entry = entry?;
-        let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?;
-        restore_nonrel_file(
-            conf,
-            timeline,
-            timelineid,
-            snapshot,
-            0,
-            0,
-            pg_constants::PG_TWOPHASE_FORKNUM,
-            xid,
-            &entry.path(),
-        )?;
-    }
 
     // TODO: Scan pg_tblspc
     Ok(())
@@ -306,96 +228,6 @@ fn restore_relfile(
     Ok(())
 }
 
-fn restore_nonrel_file(
-    _conf: &PageServerConf,
-    timeline: &dyn Timeline,
-    _timelineid: ZTimelineId,
-    snapshot: &str,
-    spcoid: Oid,
-    dboid: Oid,
-    forknum: u8,
-    blknum: u32,
-    path: &Path,
-) -> Result<()> {
-    let lsn = Lsn::from_hex(snapshot)?;
-
-    // Does it look like a relation file?
-
-    let mut file = File::open(path)?;
-    let mut buffer = Vec::new();
-    // read the whole file
-    file.read_to_end(&mut buffer)?;
-
-    let tag = BufferTag {
-        rel: RelTag {
-            spcnode: spcoid,
-            dbnode: dboid,
-            relnode: 0,
-            forknum,
-        },
-        blknum,
-    };
-    timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]));
-    Ok(())
-}
-
-fn restore_slru_file(
-    _conf: &PageServerConf,
-    timeline: &dyn Timeline,
-    _timelineid: ZTimelineId,
-    snapshot: &str,
-    forknum: u8,
-    path: &Path,
-) -> Result<()> {
-    let lsn = Lsn::from_hex(snapshot)?;
-
-    // Does it look like a relation file?
-
-    let mut file = File::open(path)?;
-    let mut buf: [u8; 8192] = [0u8; 8192];
-    let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?;
-
-    let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
-    loop {
-        let r = file.read_exact(&mut buf);
-        match r {
-            Ok(_) => {
-                let tag = BufferTag {
-                    rel: RelTag {
-                        spcnode: 0,
-                        dbnode: 0,
-                        relnode: 0,
-                        forknum,
-                    },
-                    blknum,
-                };
-                timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
-                /*
-                if oldest_lsn == 0 || p.lsn < oldest_lsn {
-                    oldest_lsn = p.lsn;
-                }
-                */
-            }
-
-            // TODO: UnexpectedEof is expected
-            Err(e) => match e.kind() {
-                std::io::ErrorKind::UnexpectedEof => {
-                    // reached EOF. That's expected.
-                    // FIXME: maybe check that we read the full length of the file?
-                    break;
-                }
-                _ => {
-                    error!("error reading file: {:?} ({})", path, e);
-                    break;
-                }
-            },
-        };
-        blknum += 1;
-    }
-
-    Ok(())
-}
-
 // Scan WAL on a timeline, starting from given LSN, and load all the records
 // into the page cache.
 fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
@@ -406,17 +238,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
     let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE);
     let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE);
     let mut last_lsn = Lsn(0);
-
-    let mut checkpoint = CheckPoint::new(startpoint.0, 1);
-    let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
-    let pg_control_tag = BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM);
-    if let Some(pg_control_bytes) = timeline.get_page_image(pg_control_tag, Lsn(0))? {
-        let pg_control = decode_pg_control(pg_control_bytes)?;
-        checkpoint = pg_control.checkPointCopy.clone();
-    } else {
-        error!("No control file is found in repository");
-    }
-
     loop {
         // FIXME: assume postgresql tli 1 for now
         let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE);
@@ -454,12 +275,10 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
             if rec.is_err() {
                 // Assume that an error means we've reached the end of
                 // a partial WAL record. So that's ok.
-                trace!("WAL decoder error {:?}", rec);
-                waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64));
                 break;
             }
             if let Some((lsn, recdata)) = rec.unwrap() {
-                let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
+                let decoded = decode_wal_record(recdata.clone());
                 timeline.save_decoded_record(decoded, recdata, lsn)?;
                 last_lsn = lsn;
             } else {
@@ -468,16 +287,12 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn
             nrecords += 1;
         }
 
-        info!(
-            "restored {} records from WAL file {} at {}",
-            nrecords, filename, last_lsn
-        );
+        info!("restored {} records from WAL file {}", nrecords, filename);
 
         segno += 1;
         offset = 0;
     }
     info!("reached end of WAL at {}", last_lsn);
-    let checkpoint_bytes = encode_checkpoint(checkpoint);
-    timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes);
+
     Ok(())
 }
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 375bd6470a..b726bf5a0d 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -1,7 +1,7 @@
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
+use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils::XLogRecord;
-use postgres_ffi::*;
 use std::cmp::min;
 use std::str;
 use thiserror::Error;
@@ -14,19 +14,17 @@ pub type OffsetNumber = u16;
 pub type MultiXactId = TransactionId;
 pub type MultiXactOffset = u32;
 pub type MultiXactStatus = u32;
-pub type TimeLineID = u32;
-pub type PgTime = i64;
 
 // From PostgreSQL headers
 #[repr(C)]
 #[derive(Debug)]
 pub struct XLogPageHeaderData {
-    xlp_magic: u16,      /* magic value for correctness checks */
-    xlp_info: u16,       /* flag bits, see below */
-    xlp_tli: TimeLineID, /* TimeLineID of first record on page */
-    xlp_pageaddr: u64,   /* XLOG address of this page */
-    xlp_rem_len: u32,    /* total len of remaining data for record */
+    xlp_magic: u16,    /* magic value for correctness checks */
+    xlp_info: u16,     /* flag bits, see below */
+    xlp_tli: u32,      /* TimeLineID of first record on page */
+    xlp_pageaddr: u64, /* XLOG address of this page */
+    xlp_rem_len: u32,  /* total len of remaining data for record */
 }
 
 // FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end
@@ -84,13 +82,6 @@ impl WalStreamDecoder {
         }
     }
 
-    pub fn set_position(&mut self, lsn: Lsn) {
-        self.lsn = lsn;
-        self.contlen = 0;
-        self.padlen = 0;
-        self.inputbuf.clear();
-    }
-
     pub fn feed_bytes(&mut self, buf: &[u8]) {
         self.inputbuf.extend_from_slice(buf);
    }
@@ -556,7 +547,7 @@ impl XlMultiXactTruncate {
 // block data
 // ...
 // main data
-pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord {
+pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord {
     let mut rnode_spcnode: u32 = 0;
     let mut rnode_dbnode: u32 = 0;
     let mut rnode_relnode: u32 = 0;
@@ -574,12 +565,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
         xlogrec.xl_rmid,
         xlogrec.xl_info
     );
-    if xlogrec.xl_xid > checkpoint.nextXid.value as u32 {
-        // TODO: handle XID wraparound
-        checkpoint.nextXid = FullTransactionId {
-            value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64,
-        };
-    }
+
     let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord;
 
     if buf.remaining() != remaining as usize {
@@ -790,30 +776,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
         assert_eq!(buf.remaining(), main_data_len as usize);
     }
 
-    //5. Handle special CLOG and XACT records
-    if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
-        let mut blk = DecodedBkpBlock::new();
-        blk.forknum = pg_constants::PG_XACT_FORKNUM;
-        blk.blkno = buf.get_i32_le() as u32;
-        blk.will_init = true;
-        trace!("RM_CLOG_ID updates block {}", blk.blkno);
-        blocks.push(blk);
-    } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
+    //5. Handle special XACT records
+    if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
         let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
         if info == pg_constants::XLOG_XACT_COMMIT {
-            let mut blk = DecodedBkpBlock::new();
-            blk.forknum = pg_constants::PG_XACT_FORKNUM;
-            blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
-            trace!(
-                "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
-                xlogrec.xl_info, (xlogrec.xl_prev >> 32),
-                xlogrec.xl_prev & 0xffffffff,
-                xlogrec.xl_xid,
-                blk.blkno,
-                main_data_len
-            );
-            blocks.push(blk);
-
             //parse commit record to extract subtrans entries
             // xl_xact_commit starts with time of commit
             let _xact_time = buf.get_i64_le();
@@ -828,17 +794,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
                 let nsubxacts = buf.get_i32_le();
-                let mut prev_blkno = u32::MAX;
                 for _i in 0..nsubxacts {
-                    let subxact = buf.get_u32_le();
-                    let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
-                    if prev_blkno != blkno {
-                        prev_blkno = blkno;
-                        let mut blk = DecodedBkpBlock::new();
-                        blk.forknum = pg_constants::PG_XACT_FORKNUM;
-                        blk.blkno = blkno;
-                        blocks.push(blk);
-                    }
+                    let _subxact = buf.get_u32_le();
                 }
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -869,18 +826,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 //TODO handle this to be able to restore pg_twophase on node start
             }
         } else if info == pg_constants::XLOG_XACT_ABORT {
-            let mut blk = DecodedBkpBlock::new();
-            blk.forknum = pg_constants::PG_XACT_FORKNUM;
-            blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE;
-            trace!(
-                "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}",
-                xlogrec.xl_info, (xlogrec.xl_prev >> 32),
-                xlogrec.xl_prev & 0xffffffff,
-                xlogrec.xl_xid,
-                blk.blkno,
-                main_data_len
-            );
-            blocks.push(blk);
             //parse abort record to extract subtrans entries
             // xl_xact_abort starts with time of commit
             let _xact_time = buf.get_i64_le();
@@ -895,17 +840,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
                 let nsubxacts = buf.get_i32_le();
-                let mut prev_blkno = u32::MAX;
                 for _i in 0..nsubxacts {
-                    let subxact = buf.get_u32_le();
-                    let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE;
-                    if prev_blkno != blkno {
-                        prev_blkno = blkno;
-                        let mut blk = DecodedBkpBlock::new();
-                        blk.forknum = pg_constants::PG_XACT_FORKNUM;
-                        blk.blkno = blkno;
-                        blocks.push(blk);
-                    }
+                    let _subxact = buf.get_u32_le();
                 }
             }
             if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 {
@@ -927,11 +863,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 let _xid = buf.get_u32_le();
                 trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE");
             }
-        } else if info == pg_constants::XLOG_XACT_PREPARE {
-            let mut blk = DecodedBkpBlock::new();
-            blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM;
-            blk.blkno = xlogrec.xl_xid;
-            blk.will_init = true;
         }
     } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID {
         let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
@@ -964,7 +895,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             trace!("XLOG_TBLSPC_DROP is not handled yet");
         }
     } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID {
-        let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
+        let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
         let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32;
         if info == pg_constants::XLOG_HEAP_INSERT {
             let xlrec = XlHeapInsert::decode(&mut buf);
@@ -1018,7 +949,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
             }
         }
     } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID {
-        let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
+        let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
         if info == pg_constants::XLOG_HEAP2_MULTI_INSERT {
             let xlrec = XlHeapMultiInsert::decode(&mut buf);
             if (xlrec.flags
@@ -1036,104 +967,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW
                 blocks.push(blk);
             }
         }
-    } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
-        let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
-        if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE {
-            let mut blk = DecodedBkpBlock::new();
-            blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
-            blk.blkno = buf.get_u32_le();
-            blk.will_init = true;
-            blocks.push(blk);
-        } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE {
-            let mut blk = DecodedBkpBlock::new();
-            blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
-            blk.blkno = buf.get_u32_le();
-            blk.will_init = true;
-            blocks.push(blk);
-        } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
-            let xlrec = XlMultiXactCreate::decode(&mut buf);
-            // Update offset page
-            let mut blk = DecodedBkpBlock::new();
-            blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
-            blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
-            blocks.push(blk);
-            let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            let last_mbr_blkno =
-                (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            for blkno in first_mbr_blkno..=last_mbr_blkno {
-                // Update members page
-                let mut blk = DecodedBkpBlock::new();
-                blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
-                blk.blkno = blkno;
-                blocks.push(blk);
-            }
-            if xlrec.mid > checkpoint.nextMulti {
-                // See MultiXactAdvanceNextMXact in postgres code
-                checkpoint.nextMulti = xlrec.mid + 1;
-            }
-            if xlrec.moff > checkpoint.nextMultiOffset {
-                // See MultiXactAdvanceNextMXact in postgres code
-                checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers;
-            }
-            let max_xid = xlrec
-                .members
-                .iter()
-                .fold(checkpoint.nextXid.value as u32, |acc, mbr| {
-                    if mbr.xid > acc {
-                        mbr.xid
-                    } else {
-                        acc
-                    }
-                });
-            checkpoint.nextXid = FullTransactionId {
-                value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | max_xid as u64,
-            };
-        } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
-            let xlrec = XlMultiXactTruncate::decode(&mut buf);
-            checkpoint.oldestXid = xlrec.end_trunc_off;
-            checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
-            let first_off_blkno =
-                xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
-            let last_off_blkno =
-                xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
-            for blkno in first_off_blkno..last_off_blkno {
-                let mut blk = DecodedBkpBlock::new();
-                blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM;
-                blk.blkno = blkno;
-                blk.will_init = true;
-                blocks.push(blk);
-            }
-            let first_mbr_blkno =
-                xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            let last_mbr_blkno =
-                xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-            for blkno in first_mbr_blkno..last_mbr_blkno {
-                let mut blk = DecodedBkpBlock::new();
-                blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM;
-                blk.blkno = blkno;
-                blk.will_init = true;
-                blocks.push(blk);
-            }
-        } else {
-            panic!()
-        }
-    } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
-        let xlrec = XlRelmapUpdate::decode(&mut buf);
-        let mut blk = DecodedBkpBlock::new();
-        blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM;
-        blk.rnode_spcnode = xlrec.tsid;
-        blk.rnode_dbnode = xlrec.dbid;
-        blk.rnode_relnode = 0;
-        blk.will_init = true;
-        blocks.push(blk);
-    } else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID {
-        let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
-        if info == pg_constants::XLOG_NEXTOID {
-            let next_oid = buf.get_u32_le();
-            if next_oid > checkpoint.nextOid {
-                checkpoint.nextOid = next_oid;
-            }
-        }
     }
 
     DecodedWALRecord {
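After this change the WAL decoder carries no checkpoint state: callers feed raw bytes, poll for complete records, and hand each record to decode_wal_record. A sketch of that calling pattern as restore_wal and the WAL receiver now use it (crate-internal paths assumed; error handling and the Timeline are elided):

    use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
    use zenith_utils::lsn::Lsn;

    // Feed arbitrary chunks of a WAL stream and decode complete records.
    fn decode_chunks(startpoint: Lsn, chunks: &[&[u8]]) -> anyhow::Result<()> {
        let mut waldecoder = WalStreamDecoder::new(startpoint);
        for chunk in chunks {
            waldecoder.feed_bytes(chunk);
            // poll_decode yields one complete record at a time along with the
            // LSN at its end; decode_wal_record no longer needs a CheckPoint.
            while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                let _decoded = decode_wal_record(recdata.clone());
                // ...normally handed to timeline.save_decoded_record(...)
                println!("decoded record ending at {}", lsn);
            }
        }
        Ok(())
    }
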
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index e8992f944f..fbcf67eb39 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -7,7 +7,6 @@
 //!
 use crate::page_cache;
-use crate::repository::*;
 use crate::waldecoder::*;
 use crate::PageServerConf;
 use crate::ZTimelineId;
@@ -17,8 +16,8 @@
 use log::*;
 use postgres::fallible_iterator::FallibleIterator;
 use postgres::replication::ReplicationIter;
 use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow};
+use postgres_ffi::pg_constants;
 use postgres_ffi::xlog_utils::*;
-use postgres_ffi::*;
 use postgres_protocol::message::backend::ReplicationMessage;
 use postgres_types::PgLsn;
 use std::collections::HashMap;
@@ -147,11 +146,6 @@ fn walreceiver_main(
         error!("No previous WAL position");
     }
 
-    startpoint = Lsn::max(
-        startpoint,
-        Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)),
-    );
-
     // There might be some padding after the last full record, skip it.
     //
     // FIXME: It probably would be better to always start streaming from the beginning
@@ -171,14 +165,6 @@ fn walreceiver_main(
 
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
-    let mut checkpoint = CheckPoint::new(startpoint.0, identify.timeline);
-    let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM);
-    if let Some(checkpoint_bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? {
-        checkpoint = decode_checkpoint(checkpoint_bytes)?;
-        trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
-    } else {
-        error!("No checkpoint record was found in repository");
-    }
-
     while let Some(replication_message) = physical_stream.next()? {
         match replication_message {
             ReplicationMessage::XLogData(xlog_data) => {
@@ -195,14 +181,9 @@ fn walreceiver_main(
                     waldecoder.feed_bytes(data);
 
                     while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
-                        let old_checkpoint_bytes = encode_checkpoint(checkpoint);
-                        let decoded = decode_wal_record(&mut checkpoint, recdata.clone());
+                        let decoded = decode_wal_record(recdata.clone());
                         timeline.save_decoded_record(decoded, recdata, lsn)?;
 
-                        let new_checkpoint_bytes = encode_checkpoint(checkpoint);
-                        if new_checkpoint_bytes != old_checkpoint_bytes {
-                            timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes);
-                        }
                         // Now that this record has been handled, let the page cache know that
                         // it is up-to-date to this LSN
                         timeline.advance_last_record_lsn(lsn);
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index ab18ad9205..97ef72ad2d 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -14,8 +14,7 @@
 //! TODO: Even though the postgres code runs in a separate process,
 //! it's not a secure sandbox.
 //!
-use byteorder::{ByteOrder, LittleEndian};
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
 use log::*;
 use std::assert;
 use std::cell::RefCell;
@@ -37,11 +36,7 @@
 use zenith_utils::lsn::Lsn;
 
 use crate::repository::BufferTag;
 use crate::repository::WALRecord;
-use crate::waldecoder::{MultiXactId, XlMultiXactCreate};
 use crate::PageServerConf;
-use postgres_ffi::nonrelfile_utils::transaction_id_set_status;
-use postgres_ffi::pg_constants;
-use postgres_ffi::xlog_utils::XLogRecord;
 
 ///
 /// WAL Redo Manager is responsible for replaying WAL records.
@@ -169,24 +164,6 @@ impl WalRedoManager for PostgresRedoManager {
     }
 }
 
-fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize {
-    ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16
-        % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE
-        * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize
-}
-
-fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 {
-    (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP
-        * pg_constants::MXACT_MEMBER_BITS_PER_XACT
-}
-
-/* Location (byte offset within page) of TransactionId of given member */
-fn mx_offset_to_member_offset(xid: MultiXactId) -> usize {
-    mx_offset_to_flags_offset(xid)
-        + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP
-            + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize
-}
-
 ///
 /// WAL redo thread
 ///
@@ -255,151 +232,7 @@ impl PostgresRedoManagerInternal {
 
         let start = Instant::now();
 
         let apply_result: Result<Bytes, Error>;
-        if tag.rel.forknum > pg_constants::INIT_FORKNUM {
-            const ZERO_PAGE: [u8; 8192] = [0u8; 8192];
-            let mut page = BytesMut::new();
-            if let Some(fpi) = base_img {
-                page.extend_from_slice(&fpi[..]);
-            } else {
-                page.extend_from_slice(&ZERO_PAGE);
-            }
-            for record in records {
-                let mut buf = record.rec.clone();
-
-                // 1. Parse XLogRecord struct
-                // FIXME: refactor to avoid code duplication.
-                let xlogrec = XLogRecord::from_bytes(&mut buf);
-
-                //move to main data
-                // TODO probably, we should store some records in our special format
-                // to avoid this weird parsing on replay
-                let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize;
-                if buf.remaining() > skip {
-                    buf.advance(skip);
-                }
-
-                if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID {
-                    let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK;
-                    if info == pg_constants::CLOG_ZEROPAGE {
-                        page.copy_from_slice(&ZERO_PAGE);
-                    }
-                } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID {
-                    let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK;
-                    let mut status = 0;
-                    if info == pg_constants::XLOG_XACT_COMMIT {
-                        status = pg_constants::TRANSACTION_STATUS_COMMITTED;
-                        transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
-                        //handle subtrans
-                        let _xact_time = buf.get_i64_le();
-                        let mut xinfo = 0;
-                        if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
-                            xinfo = buf.get_u32_le();
-                            if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
-                                let _dbid = buf.get_u32_le();
-                                let _tsid = buf.get_u32_le();
-                            }
-                        }
-
-                        if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
-                            let nsubxacts = buf.get_i32_le();
-                            for _i in 0..nsubxacts {
-                                let subxact = buf.get_u32_le();
-                                let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
-                                // only update xids on the requested page
-                                if tag.blknum == blkno {
-                                    status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED;
-                                    transaction_id_set_status(subxact, status, &mut page);
-                                }
-                            }
-                        }
-                    } else if info == pg_constants::XLOG_XACT_ABORT {
-                        status = pg_constants::TRANSACTION_STATUS_ABORTED;
-                        transaction_id_set_status(xlogrec.xl_xid, status, &mut page);
-                        //handle subtrans
-                        let _xact_time = buf.get_i64_le();
-                        let mut xinfo = 0;
-                        if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 {
-                            xinfo = buf.get_u32_le();
-                            if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 {
-                                let _dbid = buf.get_u32_le();
-                                let _tsid = buf.get_u32_le();
-                            }
-                        }
-
-                        if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 {
-                            let nsubxacts = buf.get_i32_le();
-                            for _i in 0..nsubxacts {
-                                let subxact = buf.get_u32_le();
-                                let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
-                                // only update xids on the requested page
-                                if tag.blknum == blkno {
-                                    status = pg_constants::TRANSACTION_STATUS_ABORTED;
-                                    transaction_id_set_status(subxact, status, &mut page);
-                                }
-                            }
-                        }
-                    } else if info != pg_constants::XLOG_XACT_PREPARE {
-                        trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}",
-                            status,
-                            record.lsn,
-                            record.main_data_offset, record.rec.len());
-                    }
-                } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID {
-                    let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
-                    if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE
-                        || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE
-                    {
-                        page.copy_from_slice(&ZERO_PAGE);
-                    } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID {
-                        let xlrec = XlMultiXactCreate::decode(&mut buf);
-                        if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM {
-                            let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32
-                                * 4) as usize;
-                            LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff);
-                        } else {
-                            assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM);
-                            for i in 0..xlrec.nmembers {
-                                let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
-                                if blkno == tag.blknum {
-                                    // update only target block
-                                    let offset = xlrec.moff + i;
-                                    let memberoff = mx_offset_to_member_offset(offset);
-                                    let flagsoff = mx_offset_to_flags_offset(offset);
-                                    let bshift = mx_offset_to_flags_bitshift(offset);
-                                    let mut flagsval =
-                                        LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
-                                    flagsval &=
-                                        !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1)
-                                            << bshift);
-                                    flagsval |= xlrec.members[i as usize].status << bshift;
-                                    LittleEndian::write_u32(
-                                        &mut page[flagsoff..flagsoff + 4],
-                                        flagsval,
-                                    );
-                                    LittleEndian::write_u32(
-                                        &mut page[memberoff..memberoff + 4],
-                                        xlrec.members[i as usize].xid,
-                                    );
-                                }
-                            }
-                        }
-                    } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID {
-                        // empty page image indicates that this SLRU page is truncated and can be removed by GC
-                        page.clear();
-                    } else {
-                        panic!();
-                    }
-                } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID {
-                    page.clear();
-                    page.extend_from_slice(&buf[12..]); // skip xl_relmap_update
-                    assert!(page.len() == 512); // size of pg_filenode.map
-                }
-            }
-
-            apply_result = Ok::<Bytes, Error>(page.freeze());
-        } else {
-            apply_result = process.apply_wal_records(tag, base_img, records).await;
-        }
+        apply_result = process.apply_wal_records(tag, base_img, records).await;
 
         let duration = start.elapsed();
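For reference, the removed Rust-side redo special-cased SLRU pages; its CLOG updates boiled down to bit arithmetic: each transaction status occupies CLOG_BITS_PER_XACT = 2 bits, so four statuses fit in a byte. A standalone sketch of the offset calculation that transaction_id_set_status performed (constants inlined, since this patch also drops them from pg_constants):

    const BLCKSZ: u32 = 8192;
    const CLOG_BITS_PER_XACT: u32 = 2;
    const CLOG_XACTS_PER_BYTE: u32 = 4;
    const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE;

    // Locate the CLOG page, byte within the page, and bit shift within the
    // byte that hold a given xid's two status bits.
    fn clog_position(xid: u32) -> (u32, usize, u32) {
        let page = xid / CLOG_XACTS_PER_PAGE;
        let byte_in_page = ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE) as usize;
        let shift = (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;
        (page, byte_in_page, shift)
    }

    fn main() {
        // xid 3 (FirstNormalTransactionId) lives on page 0, byte 0, bits 6-7.
        assert_eq!(clog_position(3), (0, 0, 6));
    }
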
diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs
index c15f19188f..b834bd99db 100644
--- a/postgres_ffi/build.rs
+++ b/postgres_ffi/build.rs
@@ -18,8 +18,6 @@ fn main() {
         // included header files changed.
         .parse_callbacks(Box::new(bindgen::CargoCallbacks))
         .whitelist_type("ControlFileData")
-        .whitelist_type("CheckPoint")
-        .whitelist_type("FullTransactionId")
         .whitelist_var("PG_CONTROL_FILE_SIZE")
         .whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
         .whitelist_type("DBState")
diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs
index 29a01c3445..a1a12a901d 100644
--- a/postgres_ffi/src/lib.rs
+++ b/postgres_ffi/src/lib.rs
@@ -3,7 +3,6 @@
 #![allow(non_snake_case)]
 
 include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
 
-pub mod nonrelfile_utils;
 pub mod pg_constants;
 pub mod relfile_utils;
 pub mod xlog_utils;
@@ -12,7 +11,6 @@
 use bytes::{Buf, Bytes, BytesMut};
 
 // sizeof(ControlFileData)
 const SIZEOF_CONTROLDATA: usize = std::mem::size_of::<ControlFileData>();
-const SIZEOF_CHECKPOINT: usize = std::mem::size_of::<CheckPoint>();
 const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize;
 
 impl ControlFileData {
@@ -71,42 +69,3 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes {
 
     buf.into()
 }
-
-pub fn encode_checkpoint(checkpoint: CheckPoint) -> Bytes {
-    let b: [u8; SIZEOF_CHECKPOINT];
-    b = unsafe { std::mem::transmute::<CheckPoint, [u8; SIZEOF_CHECKPOINT]>(checkpoint) };
-    Bytes::copy_from_slice(&b[..])
-}
-
-pub fn decode_checkpoint(mut buf: Bytes) -> Result<CheckPoint, anyhow::Error> {
-    let mut b = [0u8; SIZEOF_CHECKPOINT];
-    buf.copy_to_slice(&mut b);
-    let checkpoint: CheckPoint;
-    checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) };
-    Ok(checkpoint)
-}
-
-impl CheckPoint {
-    pub fn new(lsn: u64, timeline: u32) -> CheckPoint {
-        CheckPoint {
-            redo: lsn,
-            ThisTimeLineID: timeline,
-            PrevTimeLineID: timeline,
-            fullPageWrites: true, // TODO: get actual value of full_page_writes
-            nextXid: FullTransactionId {
-                value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64,
-            }, // TODO: handle epoch?
-            nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID,
-            nextMulti: 1,
-            nextMultiOffset: 0,
-            oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID,
-            oldestXidDB: 0,
-            oldestMulti: 1,
-            oldestMultiDB: 0,
-            time: 0,
-            oldestCommitTsXid: 0,
-            newestCommitTsXid: 0,
-            oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID,
-        }
-    }
-}
diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs
index 29d07b8c0c..19abe5f741 100644
--- a/postgres_ffi/src/pg_constants.rs
+++ b/postgres_ffi/src/pg_constants.rs
@@ -15,32 +15,10 @@ pub const MAIN_FORKNUM: u8 = 0;
 pub const FSM_FORKNUM: u8 = 1;
 pub const VISIBILITYMAP_FORKNUM: u8 = 2;
 pub const INIT_FORKNUM: u8 = 3;
-// Special values for non-rel files' tags (Zenith-specific)
-pub const PG_CONTROLFILE_FORKNUM: u8 = 42;
-pub const PG_FILENODEMAP_FORKNUM: u8 = 43;
-pub const PG_XACT_FORKNUM: u8 = 44;
-pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45;
-pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46;
-pub const PG_TWOPHASE_FORKNUM: u8 = 47;
-pub const PG_CHECKPOINT_FORKNUM: u8 = 48;
 
 // From storage_xlog.h
 pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001;
 
-// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
-// --with-segsize=SEGSIZE, but assume the defaults for now.
-pub const BLCKSZ: u16 = 8192;
-pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
-
-//
-// constants from clog.h
-//
-pub const CLOG_XACTS_PER_BYTE: u32 = 4;
-pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE;
-pub const CLOG_BITS_PER_XACT: u8 = 2;
-pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1;
-
 //
 // Constants from visibilitymap.h
 //
@@ -48,22 +26,15 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24;
 pub const BITS_PER_HEAPBLOCK: u16 = 2;
 pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK;
 
-pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00;
 pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01;
 pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02;
 pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03;
 
-pub const CLOG_ZEROPAGE: u8 = 0x00;
-pub const CLOG_TRUNCATE: u8 = 0x10;
-
 // From xact.h
 pub const XLOG_XACT_COMMIT: u8 = 0x00;
 pub const XLOG_XACT_PREPARE: u8 = 0x10;
 pub const XLOG_XACT_ABORT: u8 = 0x20;
 
-// From slru.h
-pub const SLRU_PAGES_PER_SEGMENT: u32 = 32;
-
 /* mask for filtering opcodes out of xl_info */
 pub const XLOG_XACT_OPMASK: u8 = 0x70;
 /* does this record have a 'xinfo' field or not */
@@ -83,29 +54,9 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4;
 // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7;
 
 // From pg_control.h and rmgrlist.h
-pub const XLOG_NEXTOID: u8 = 0x30;
 pub const XLOG_SWITCH: u8 = 0x40;
 pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
 
-// From multixact.h
-pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00;
-pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10;
-pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20;
-pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30;
-
-pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4;
-pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8;
-pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1;
-pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
-pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 =
-    MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE;
-/* size in bytes of a complete group */
-pub const MULTIXACT_MEMBERGROUP_SIZE: u16 =
-    4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP;
-pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE;
-pub const MULTIXACT_MEMBERS_PER_PAGE: u16 =
-    MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP;
-
 // From heapam_xlog.h
 pub const XLOG_HEAP_INSERT: u8 = 0x00;
 pub const XLOG_HEAP_DELETE: u8 = 0x10;
@@ -144,6 +95,11 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10;
 
 pub const SIZEOF_XLOGRECORD: u32 = 24;
 
+// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
+// --with-segsize=SEGSIZE, but assume the defaults for now.
+pub const BLCKSZ: u16 = 8192;
+pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32);
+
 //
 // from xlogrecord.h
 //
@@ -166,11 +122,5 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
 pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
 pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
 
-/* From transam.h */
-pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;
-pub const INVALID_TRANSACTION_ID: u32 = 0;
-pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000;
-pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
-
 /* FIXME: pageserver should request wal_seg_size from compute node */
 pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
diff --git a/postgres_ffi/src/relfile_utils.rs b/postgres_ffi/src/relfile_utils.rs
index c292ff9582..97c8f0afea 100644
--- a/postgres_ffi/src/relfile_utils.rs
+++ b/postgres_ffi/src/relfile_utils.rs
@@ -38,16 +38,6 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
         pg_constants::FSM_FORKNUM => Some("fsm"),
         pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"),
         pg_constants::INIT_FORKNUM => Some("init"),
-
-        // These should not appear in WAL records, but we use them internally,
-        // and need to be prepared to print them out in log messages and such
-        pg_constants::PG_CONTROLFILE_FORKNUM => Some("controlfile"),
-        pg_constants::PG_FILENODEMAP_FORKNUM => Some("filenodemap"),
-        pg_constants::PG_XACT_FORKNUM => Some("xact"),
-        pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"),
-        pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"),
-        pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"),
-
         _ => Some("UNKNOWN FORKNUM"),
     }
 }
diff --git a/vendor/postgres b/vendor/postgres
index fd9de093b0..62852ad64e 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit fd9de093b0e115459c8816895d2ac37da2599aca
+Subproject commit 62852ad64e0a12d806aa3888f213ddcce5279c0f
diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs
index f110a1e293..70cf0d5bf0 100644
--- a/walkeeper/src/timeline.rs
+++ b/walkeeper/src/timeline.rs
@@ -233,9 +233,7 @@ impl TimelineTools for Option<Arc<Timeline>> {
     fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) {
         let seg_size = self.get().get_info().server.wal_seg_size as usize;
         let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise);
-        let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal
-        let lsn = Lsn::max(Lsn(lsn), wal_start);
-        (lsn, timeline)
+        (Lsn(lsn), timeline)
     }
 }