From 6a9c036ac165de37fcb449c467ed2a2985d37511 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 24 May 2021 14:57:23 +0300 Subject: [PATCH] Revert all changes related to storing and restoring non-rel data in page server This includes the following commits: 35a1c3d521729709a7bafe47fb4a3a7bdf52ad65 Specify right LSN in test_createdb.py d95e1da7425674bec41348066a5950cfc761f043 Fix issue with propagation of CREATE DATABASE to the branch 8465738aa5cd360ae7b71c95e435bd70527f572c [refer #167] Fix handling of pg_filenode.map files in page server 86056abd0e071b86c447a6523226b37160dd5387 Fix merge conflict: set initial WAL position to second segment because of pg_resetwal 2bf2dd1d8871261de4c6a104de5664d32f208260 Add nonrelfile_utils.rs file 20b6279beba10b1f92606e0b4450758eae217c79 Fix restoring non-relational data during compute node startup 06f96f96001ddb0033b23fc6cb7beaa84ce9fa8f Do not transfer WAL to computation nodes: use pg_resetwal for node startup As well as some older changes related to storing CLOG and MultiXact data as "pseudorelation" in the page server. With this revert, we go back to the situation where, when you create a new compute node, we ship *all* the WAL from the beginning of time to the compute node. Obviously we need a better solution, like the code that this reverts. But per discussion with Konstantin and Stas, this stuff was still half-baked, and it's better for it to live in a branch for now, until it's more complete and has gone through some review. 
--- control_plane/src/compute.rs | 17 +- pageserver/src/basebackup.rs | 222 ++++----------------------- pageserver/src/branches.rs | 28 ++++ pageserver/src/page_service.rs | 2 +- pageserver/src/repository.rs | 27 +--- pageserver/src/repository/rocksdb.rs | 206 ++++--------------------- pageserver/src/restore_local_repo.rs | 201 +----------------------- pageserver/src/waldecoder.rs | 195 ++--------------------- pageserver/src/walreceiver.rs | 23 +-- pageserver/src/walredo.rs | 171 +-------------------- postgres_ffi/build.rs | 2 - postgres_ffi/src/lib.rs | 41 ----- postgres_ffi/src/pg_constants.rs | 60 +------- postgres_ffi/src/relfile_utils.rs | 10 -- vendor/postgres | 2 +- walkeeper/src/timeline.rs | 4 +- 16 files changed, 120 insertions(+), 1091 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index faa600101d..7192e9117a 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -252,6 +252,7 @@ impl PostgresNode { // new data directory pub fn init_from_page_server(&self) -> Result<()> { let pgdata = self.pgdata(); + println!( "Extracting base backup to create postgres instance: path={} port={}", pgdata.display(), @@ -340,9 +341,6 @@ impl PostgresNode { ), )?; - fs::create_dir_all(self.pgdata().join("pg_wal"))?; - fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?; - self.pg_resetwal(&["-f"])?; Ok(()) } @@ -398,19 +396,6 @@ impl PostgresNode { Ok(()) } - fn pg_resetwal(&self, args: &[&str]) -> Result<()> { - let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal"); - - let pg_ctl = Command::new(pg_resetwal_path) - .args([&["-D", self.pgdata().to_str().unwrap()], args].concat()) - .status() - .with_context(|| "pg_resetwal failed")?; - if !pg_ctl.success() { - anyhow::bail!("pg_resetwal failed"); - } - Ok(()) - } - pub fn start(&self) -> Result<()> { println!("Starting postgres node at '{}'", self.connstr()); self.pg_ctl(&["start"]) diff --git a/pageserver/src/basebackup.rs 
b/pageserver/src/basebackup.rs index 1096f0ed38..7c1a8de4dc 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -2,185 +2,27 @@ use crate::ZTimelineId; use log::*; use std::io::Write; use std::sync::Arc; -use std::time::SystemTime; -use tar::{Builder, Header}; +use tar::Builder; use walkdir::WalkDir; -use crate::repository::{BufferTag, RelTag, Timeline}; +use crate::repository::Timeline; use postgres_ffi::relfile_utils::*; -use postgres_ffi::*; use zenith_utils::lsn::Lsn; -fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{ - let mut header = Header::new_gnu(); - header.set_size(size); - header.set_path(path)?; - header.set_mode(0b110000000); - header.set_mtime( - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(), - ); - header.set_cksum(); - Ok(header) -} - -// -// Generate SRLU segment files from repository -// -fn add_slru_segments( - ar: &mut Builder<&mut dyn Write>, - timeline: &Arc, - path: &str, - forknum: u8, - lsn: Lsn, -) -> anyhow::Result<()> { - let rel = RelTag { - spcnode: 0, - dbnode: 0, - relnode: 0, - forknum, - }; - let (first, last) = timeline.get_range(rel, lsn)?; - const SEG_SIZE: usize = - pg_constants::BLCKSZ as usize * pg_constants::SLRU_PAGES_PER_SEGMENT as usize; - let mut seg_buf = [0u8; SEG_SIZE]; - let mut curr_segno: Option = None; - for page in first..last { - let tag = BufferTag { rel, blknum: page }; - let img = timeline.get_page_at_lsn(tag, lsn)?; - // Zero length image indicates truncated segment: just skip it - if !img.is_empty() { - assert!(img.len() == pg_constants::BLCKSZ as usize); - - let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; - if curr_segno.is_some() && curr_segno.unwrap() != segno { - let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); - let header = new_tar_header(&segname, SEG_SIZE as u64)?; - ar.append(&header, &seg_buf[..])?; - seg_buf = [0u8; SEG_SIZE]; - } - curr_segno = Some(segno); - let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize - * pg_constants::BLCKSZ as usize; - let offs_end = offs_start + pg_constants::BLCKSZ as usize; - seg_buf[offs_start..offs_end].copy_from_slice(&img); - } - } - if curr_segno.is_some() { - let segname = format!("{}/{:>04X}", path, curr_segno.unwrap()); - let header = new_tar_header(&segname, SEG_SIZE as u64)?; - ar.append(&header, &seg_buf[..])?; - } - Ok(()) -} - -// -// Extract pg_filenode.map files from repository -// -fn add_relmap_files( - ar: &mut Builder<&mut dyn Write>, - timeline: &Arc, - lsn: Lsn, - snappath: 
&str, -) -> anyhow::Result<()> { - for db in timeline.get_databases(lsn)?.iter() { - let tag = BufferTag { - rel: *db, - blknum: 0, - }; - let img = timeline.get_page_at_lsn(tag, lsn)?; - let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { - String::from("global/pg_filenode.map") - } else { - // User defined tablespaces are not supported - assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); - let src_path = format!("{}/base/1/PG_VERSION", snappath); - let dst_path = format!("base/{}/PG_VERSION", db.dbnode); - ar.append_path_with_name(&src_path, &dst_path)?; - format!("base/{}/pg_filenode.map", db.dbnode) - }; - info!("Deliver {}", path); - assert!(img.len() == 512); - let header = new_tar_header(&path, img.len() as u64)?; - ar.append(&header, &img[..])?; - } - Ok(()) -} - -// -// Extract twophase state files -// -fn add_twophase_files( - ar: &mut Builder<&mut dyn Write>, - timeline: &Arc, - lsn: Lsn, -) -> anyhow::Result<()> { - for xid in timeline.get_twophase(lsn)?.iter() { - let tag = BufferTag { - rel: RelTag { - spcnode: 0, - dbnode: 0, - relnode: 0, - forknum: pg_constants::PG_TWOPHASE_FORKNUM, - }, - blknum: *xid, - }; - let img = timeline.get_page_at_lsn(tag, lsn)?; - let path = format!("pg_twophase/{:>08X}", xid); - let header = new_tar_header(&path, img.len() as u64)?; - ar.append(&header, &img[..])?; - } - Ok(()) -} - -// -// Add generated pg_control file -// -fn add_pgcontrol_file( - ar: &mut Builder<&mut dyn Write>, - timeline: &Arc, - lsn: Lsn, -) -> anyhow::Result<()> { - if let Some(checkpoint_bytes) = - timeline.get_page_image(BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM), Lsn(0))? - { - if let Some(pg_control_bytes) = timeline.get_page_image( - BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM), - Lsn(0), - )? 
{ - let mut pg_control = postgres_ffi::decode_pg_control(pg_control_bytes)?; - let mut checkpoint = postgres_ffi::decode_checkpoint(checkpoint_bytes)?; - - checkpoint.redo = lsn.0; - checkpoint.nextXid.value += 1; - // TODO: When we restart master there are no active transaction and oldestXid is - // equal to nextXid if there are no prepared transactions. - // Let's ignore them for a while... - checkpoint.oldestXid = checkpoint.nextXid.value as u32; - pg_control.checkPointCopy = checkpoint; - let pg_control_bytes = postgres_ffi::encode_pg_control(pg_control); - let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; - ar.append(&header, &pg_control_bytes[..])?; - } - } - Ok(()) -} - /// /// Generate tarball with non-relational files from repository /// pub fn send_tarball_at_lsn( write: &mut dyn Write, timelineid: ZTimelineId, - timeline: &Arc, - lsn: Lsn, + _timeline: &Arc, + _lsn: Lsn, snapshot_lsn: Lsn, ) -> anyhow::Result<()> { let mut ar = Builder::new(write); let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0); + let walpath = format!("timelines/{}/wal", timelineid); debug!("sending tarball of snapshot in {}", snappath); for entry in WalkDir::new(&snappath) { @@ -207,14 +49,8 @@ pub fn send_tarball_at_lsn( trace!("sending shared catalog {}", relpath.display()); ar.append_path_with_name(fullpath, relpath)?; } else if !is_rel_file_path(relpath.to_str().unwrap()) { - if entry.file_name() != "pg_filenode.map" - && entry.file_name() != "pg_control" - && !relpath.starts_with("pg_xact/") - && !relpath.starts_with("pg_multixact/") - { - trace!("sending {}", relpath.display()); - ar.append_path_with_name(fullpath, relpath)?; - } + trace!("sending {}", relpath.display()); + ar.append_path_with_name(fullpath, relpath)?; } else { trace!("not sending {}", relpath.display()); } @@ -223,31 +59,27 @@ pub fn send_tarball_at_lsn( } } - add_slru_segments( - &mut ar, - timeline, - "pg_xact", - 
pg_constants::PG_XACT_FORKNUM, - lsn, - )?; - add_slru_segments( - &mut ar, - timeline, - "pg_multixact/members", - pg_constants::PG_MXACT_MEMBERS_FORKNUM, - lsn, - )?; - add_slru_segments( - &mut ar, - timeline, - "pg_multixact/offsets", - pg_constants::PG_MXACT_OFFSETS_FORKNUM, - lsn, - )?; - add_relmap_files(&mut ar, timeline, lsn, &snappath)?; - add_twophase_files(&mut ar, timeline, lsn)?; - add_pgcontrol_file(&mut ar, timeline, lsn)?; + // FIXME: Also send all the WAL. The compute node would only need + // the WAL that applies to non-relation files, because the page + // server handles all the relation files. But we don't have a + // mechanism for separating relation and non-relation WAL at the + // moment. + for entry in std::fs::read_dir(&walpath)? { + let entry = entry?; + let fullpath = &entry.path(); + let relpath = fullpath.strip_prefix(&walpath).unwrap(); + if !entry.path().is_file() { + continue; + } + + let archive_fname = relpath.to_str().unwrap(); + let archive_fname = archive_fname + .strip_suffix(".partial") + .unwrap_or(&archive_fname); + let archive_path = "pg_wal/".to_owned() + archive_fname; + ar.append_path_with_name(fullpath, archive_path)?; + } ar.finish()?; debug!("all tarred up!"); Ok(()) diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 8543d7c8dd..e5fc490038 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -101,6 +101,9 @@ pub fn init_repo(conf: &PageServerConf, repo_dir: &Path) -> Result<()> { // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; + force_crash_recovery(&tmppath)?; + println!("updated pg_control"); + let target = timelinedir.join("snapshots").join(&lsnstr); fs::rename(tmppath, &target)?; @@ -309,6 +312,31 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } +// If control file says the cluster was shut down cleanly, modify it, to mark +// it as crashed. 
That forces crash recovery when you start the cluster. +// +// FIXME: +// We currently do this to the initial snapshot in "zenith init". It would +// be more natural to do this when the snapshot is restored instead, but we +// currently don't have any code to create new snapshots, so it doesn't matter +// Or better yet, use a less hacky way of putting the cluster into recovery. +// Perhaps create a backup label file in the data directory when it's restored. +fn force_crash_recovery(datadir: &Path) -> Result<()> { + // Read in the control file + let controlfilepath = datadir.to_path_buf().join("global").join("pg_control"); + let mut controlfile = + postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?; + + controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION; + + fs::write( + controlfilepath.as_path(), + postgres_ffi::encode_pg_control(controlfile), + )?; + + Ok(()) +} + fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { // Create initial timeline let mut tli_buf = [0u8; 16]; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 3376a7b7e1..b699df3023 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -921,7 +921,7 @@ impl Connection { // find latest snapshot let snapshot_lsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); - let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); + let req_lsn = lsn.unwrap_or(snapshot_lsn); basebackup::send_tarball_at_lsn( &mut CopyDataSink { stream }, timelineid, diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 1595ccb0eb..9fd114293e 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,6 @@ pub mod rocksdb; -use crate::waldecoder::{DecodedWALRecord, Oid, TransactionId, XlCreateDatabase, XlSmgrTruncate}; +use crate::waldecoder::{DecodedWALRecord, Oid, XlCreateDatabase, XlSmgrTruncate}; use crate::ZTimelineId; 
use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -48,9 +48,6 @@ pub trait Timeline { /// Does relation exist? fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result; - /// Get page image at the particular LSN - fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result>; - //------------------------------------------------------------------------------ // Public PUT functions, to update the repository with new page versions. // @@ -166,16 +163,6 @@ pub trait Timeline { /// valid LSN, so that the WAL receiver knows where to restart streaming. fn advance_last_record_lsn(&self, lsn: Lsn); fn get_last_record_lsn(&self) -> Lsn; - - /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations - /// but can be also applied to normal relations. - fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)>; - - /// Get vector of databases (represented using RelTag only dbnode and spcnode fields are used) - fn get_databases(&self, lsn: Lsn) -> Result>; - - /// Get vector of prepared twophase transactions - fn get_twophase(&self, lsn: Lsn) -> Result>; } #[derive(Clone)] @@ -236,18 +223,6 @@ pub struct BufferTag { } impl BufferTag { - pub fn fork(forknum: u8) -> BufferTag { - BufferTag { - rel: RelTag { - forknum, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - } - } - pub fn pack(&self, buf: &mut BytesMut) { self.rel.pack(buf); buf.put_u32(self.blknum); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index d7bccd2112..09fe1ad63b 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -7,7 +7,7 @@ use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; use crate::restore_local_repo::restore_timeline; -use crate::waldecoder::{Oid, TransactionId}; +use crate::waldecoder::Oid; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::ZTimelineId; @@ -16,8 +16,6 @@ use crate::ZTimelineId; use 
anyhow::{bail, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; -use postgres_ffi::nonrelfile_utils::transaction_id_get_status; -use postgres_ffi::*; use std::cmp::min; use std::collections::HashMap; use std::convert::TryInto; @@ -417,19 +415,6 @@ impl RocksTimeline { let mut minkey = maxkey.clone(); minkey.lsn = Lsn(0); // first version - // Special handling of delete of PREPARE WAL record - if last_lsn < horizon - && key.tag.rel.forknum == pg_constants::PG_TWOPHASE_FORKNUM - { - if (v[0] & UNUSED_VERSION_FLAG) == 0 { - let mut v = v.to_owned(); - v[0] |= UNUSED_VERSION_FLAG; - self.db.put(key.to_bytes(), &v[..])?; - deleted += 1; - } - maxkey = minkey; - continue; - } // reconstruct most recent page version if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { // force reconstruction of most recent page version @@ -623,116 +608,6 @@ impl Timeline for RocksTimeline { self.relsize_get_nowait(rel, lsn) } - /// Get vector of prepared twophase transactions - fn get_twophase(&self, lsn: Lsn) -> Result> { - let key = CacheKey { - // minimal key - tag: BufferTag { - rel: RelTag { - forknum: pg_constants::PG_TWOPHASE_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut gxacts = Vec::new(); - - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.forknum != pg_constants::PG_TWOPHASE_FORKNUM { - break; // we are done with this fork - } - if key.lsn <= lsn { - let xid = key.tag.blknum; - let tag = BufferTag { - rel: RelTag { - forknum: pg_constants::PG_XACT_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: xid / pg_constants::CLOG_XACTS_PER_PAGE, - }; - let clog_page = self.get_page_at_lsn(tag, lsn)?; - let status = transaction_id_get_status(xid, &clog_page[..]); - if status == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { - gxacts.push(xid); - } - } - iter.next(); - } - 
Ok(gxacts) - } - - /// Get databases. This function is used to local pg_filenode.map files - fn get_databases(&self, lsn: Lsn) -> Result> { - let key = CacheKey { - // minimal key - tag: BufferTag { - rel: RelTag { - forknum: pg_constants::PG_FILENODEMAP_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut dbs = Vec::new(); - - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - let mut prev_tag = key.tag.rel; - while iter.valid() { - let key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.forknum != pg_constants::PG_FILENODEMAP_FORKNUM { - break; // we are done with this fork - } - if key.tag.rel != prev_tag && key.lsn <= lsn { - prev_tag = key.tag.rel; - dbs.push(prev_tag); // collect unique tags - } - iter.next(); - } - Ok(dbs) - } - - /// Get range [begin,end) of stored blocks. Used mostly for SMGR pseudorelations - /// but can be also applied to normal relations. - fn get_range(&self, rel: RelTag, lsn: Lsn) -> Result<(u32, u32)> { - let _lsn = self.wait_lsn(lsn)?; - let mut key = CacheKey { - // minimal key to start with - tag: BufferTag { rel, blknum: 0 }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); // locate first entry - if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); - let tag = thiskey.tag; - if tag.rel == rel { - // still trversing this relation - let first_blknum = tag.blknum; - key.tag.blknum = u32::MAX; // maximal key - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.to_bytes()); // localte last entry - if iter.valid() { - let thiskey = CacheKey::from_slice(iter.key().unwrap()); - let last_blknum = thiskey.tag.blknum; - return Ok((first_blknum, last_blknum + 1)); // upper boundary is exclusive - } - } - } - Ok((0, 0)) // empty range - } - /// /// Does relation exist at given LSN? 
/// @@ -811,20 +686,6 @@ impl Timeline for RocksTimeline { Ok(()) } - /// - /// Get page image at particular LSN - /// - fn get_page_image(&self, tag: BufferTag, lsn: Lsn) -> Result> { - let key = CacheKey { tag, lsn }; - if let Some(bytes) = self.db.get(key.to_bytes())? { - let content = CacheEntryContent::from_slice(&bytes); - if let CacheEntryContent::PageImage(img) = content { - return Ok(Some(img)); - } - } - Ok(None) - } - /// /// Memorize a full image of a page version /// @@ -866,42 +727,35 @@ impl Timeline for RocksTimeline { src_db_id: Oid, src_tablespace_id: Oid, ) -> Result<()> { - let mut n = 0; - for forknum in &[ - pg_constants::MAIN_FORKNUM, - pg_constants::FSM_FORKNUM, - pg_constants::VISIBILITYMAP_FORKNUM, - pg_constants::INIT_FORKNUM, - pg_constants::PG_FILENODEMAP_FORKNUM, - ] { - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: *forknum, - }, - blknum: 0, + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(key.to_bytes()); - while iter.valid() { - let mut key = CacheKey::from_slice(iter.key().unwrap()); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - - let v = iter.value().unwrap(); - self.db.put(key.to_bytes(), v)?; - n += 1; - iter.next(); + blknum: 0, + }, + lsn: Lsn(0), + }; + let mut iter = self.db.raw_iterator(); + iter.seek(key.to_bytes()); + let mut n = 0; + while iter.valid() { + let mut key = CacheKey::from_slice(iter.key().unwrap()); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; } + + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + + let v = iter.value().unwrap(); + 
self.db.put(key.to_bytes(), v)?; + n += 1; + iter.next(); } info!( "Create database {}/{}, copy {} entries", @@ -912,7 +766,6 @@ impl Timeline for RocksTimeline { /// Remember that WAL has been received and added to the timeline up to the given LSN fn advance_last_valid_lsn(&self, lsn: Lsn) { - let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes let old = self.last_valid_lsn.advance(lsn); // Can't move backwards. @@ -930,8 +783,7 @@ impl Timeline for RocksTimeline { /// NOTE: this updates last_valid_lsn as well. /// fn advance_last_record_lsn(&self, lsn: Lsn) { - let lsn = Lsn((lsn.0 + 7) & !7); // align position on 8 bytes - // Can't move backwards. + // Can't move backwards. let old = self.last_record_lsn.fetch_max(lsn); assert!(old <= lsn); diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 47c3b77116..d6d822a1bf 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -26,9 +26,9 @@ use crate::repository::{BufferTag, RelTag, Timeline}; use crate::waldecoder::{decode_wal_record, Oid, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; -use postgres_ffi::*; use zenith_utils::lsn::Lsn; /// @@ -103,7 +103,7 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re } fn restore_snapshot( - conf: &PageServerConf, + _conf: &PageServerConf, timeline: &dyn Timeline, timelineid: ZTimelineId, snapshot: &str, @@ -120,28 +120,8 @@ fn restore_snapshot( None => continue, // These special files appear in the snapshot, but are not needed by the page server - Some("pg_control") => restore_nonrel_file( - conf, - timeline, - timelineid, - "0", - 0, - 0, - pg_constants::PG_CONTROLFILE_FORKNUM, - 0, - &direntry.path(), - )?, - Some("pg_filenode.map") => restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, - 
pg_constants::GLOBALTABLESPACE_OID, - 0, - pg_constants::PG_FILENODEMAP_FORKNUM, - 0, - &direntry.path(), - )?, + Some("pg_control") => continue, + Some("pg_filenode.map") => continue, // Load any relation files into the page server _ => restore_relfile( @@ -168,17 +148,7 @@ fn restore_snapshot( // These special files appear in the snapshot, but are not needed by the page server Some("PG_VERSION") => continue, - Some("pg_filenode.map") => restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::DEFAULTTABLESPACE_OID, - dboid, - pg_constants::PG_FILENODEMAP_FORKNUM, - 0, - &direntry.path(), - )?, + Some("pg_filenode.map") => continue, // Load any relation files into the page server _ => restore_relfile( @@ -191,54 +161,6 @@ fn restore_snapshot( } } } - for entry in fs::read_dir(snapshotpath.join("pg_xact"))? { - let entry = entry?; - restore_slru_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::PG_XACT_FORKNUM, - &entry.path(), - )?; - } - for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("members"))? { - let entry = entry?; - restore_slru_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::PG_MXACT_MEMBERS_FORKNUM, - &entry.path(), - )?; - } - for entry in fs::read_dir(snapshotpath.join("pg_multixact").join("offsets"))? { - let entry = entry?; - restore_slru_file( - conf, - timeline, - timelineid, - snapshot, - pg_constants::PG_MXACT_OFFSETS_FORKNUM, - &entry.path(), - )?; - } - for entry in fs::read_dir(snapshotpath.join("pg_twophase"))? 
{ - let entry = entry?; - let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?; - restore_nonrel_file( - conf, - timeline, - timelineid, - snapshot, - 0, - 0, - pg_constants::PG_TWOPHASE_FORKNUM, - xid, - &entry.path(), - )?; - } // TODO: Scan pg_tblspc Ok(()) @@ -306,96 +228,6 @@ fn restore_relfile( Ok(()) } -fn restore_nonrel_file( - _conf: &PageServerConf, - timeline: &dyn Timeline, - _timelineid: ZTimelineId, - snapshot: &str, - spcoid: Oid, - dboid: Oid, - forknum: u8, - blknum: u32, - path: &Path, -) -> Result<()> { - let lsn = Lsn::from_hex(snapshot)?; - - // Does it look like a relation file? - - let mut file = File::open(path)?; - let mut buffer = Vec::new(); - // read the whole file - file.read_to_end(&mut buffer)?; - - let tag = BufferTag { - rel: RelTag { - spcnode: spcoid, - dbnode: dboid, - relnode: 0, - forknum, - }, - blknum, - }; - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..])); - Ok(()) -} - -fn restore_slru_file( - _conf: &PageServerConf, - timeline: &dyn Timeline, - _timelineid: ZTimelineId, - snapshot: &str, - forknum: u8, - path: &Path, -) -> Result<()> { - let lsn = Lsn::from_hex(snapshot)?; - - // Does it look like a relation file? - - let mut file = File::open(path)?; - let mut buf: [u8; 8192] = [0u8; 8192]; - let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; - - let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; - loop { - let r = file.read_exact(&mut buf); - match r { - Ok(_) => { - let tag = BufferTag { - rel: RelTag { - spcnode: 0, - dbnode: 0, - relnode: 0, - forknum, - }, - blknum, - }; - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); - /* - if oldest_lsn == 0 || p.lsn < oldest_lsn { - oldest_lsn = p.lsn; - } - */ - } - - // TODO: UnexpectedEof is expected - Err(e) => match e.kind() { - std::io::ErrorKind::UnexpectedEof => { - // reached EOF. That's expected. - // FIXME: maybe check that we read the full length of the file? 
- break; - } - _ => { - error!("error reading file: {:?} ({})", path, e); - break; - } - }, - }; - blknum += 1; - } - - Ok(()) -} - // Scan WAL on a timeline, starting from given LSN, and load all the records // into the page cache. fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> { @@ -406,17 +238,6 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = Lsn(0); - - let mut checkpoint = CheckPoint::new(startpoint.0, 1); - let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM); - let pg_control_tag = BufferTag::fork(pg_constants::PG_CONTROLFILE_FORKNUM); - if let Some(pg_control_bytes) = timeline.get_page_image(pg_control_tag, Lsn(0))? { - let pg_control = decode_pg_control(pg_control_bytes)?; - checkpoint = pg_control.checkPointCopy.clone(); - } else { - error!("No control file is found in reposistory"); - } - loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); @@ -454,12 +275,10 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn if rec.is_err() { // Assume that an error means we've reached the end of // a partial WAL record. So that's ok. 
- trace!("WAL decoder error {:?}", rec); - waldecoder.set_position(Lsn((segno + 1) * pg_constants::WAL_SEGMENT_SIZE as u64)); break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); + let decoded = decode_wal_record(recdata.clone()); timeline.save_decoded_record(decoded, recdata, lsn)?; last_lsn = lsn; } else { @@ -468,16 +287,12 @@ fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn nrecords += 1; } - info!( - "restored {} records from WAL file {} at {}", - nrecords, filename, last_lsn - ); + info!("restored {} records from WAL file {}", nrecords, filename); segno += 1; offset = 0; } info!("reached end of WAL at {}", last_lsn); - let checkpoint_bytes = encode_checkpoint(checkpoint); - timeline.put_page_image(checkpoint_tag, Lsn(0), checkpoint_bytes); + Ok(()) } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 375bd6470a..b726bf5a0d 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -1,7 +1,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; +use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::XLogRecord; -use postgres_ffi::*; use std::cmp::min; use std::str; use thiserror::Error; @@ -14,19 +14,17 @@ pub type OffsetNumber = u16; pub type MultiXactId = TransactionId; pub type MultiXactOffset = u32; pub type MultiXactStatus = u32; -pub type TimeLineID = u32; -pub type PgTime = i64; // From PostgreSQL headers #[repr(C)] #[derive(Debug)] pub struct XLogPageHeaderData { - xlp_magic: u16, /* magic value for correctness checks */ - xlp_info: u16, /* flag bits, see below */ - xlp_tli: TimeLineID, /* TimeLineID of first record on page */ - xlp_pageaddr: u64, /* XLOG address of this page */ - xlp_rem_len: u32, /* total len of remaining data for record */ + xlp_magic: u16, /* magic value for correctness checks */ + xlp_info: u16, /* flag bits, see below */ + xlp_tli: u32, /* TimeLineID of first record on 
page */ + xlp_pageaddr: u64, /* XLOG address of this page */ + xlp_rem_len: u32, /* total len of remaining data for record */ } // FIXME: this assumes MAXIMUM_ALIGNOF 8. There are 4 padding bytes at end @@ -84,13 +82,6 @@ impl WalStreamDecoder { } } - pub fn set_position(&mut self, lsn: Lsn) { - self.lsn = lsn; - self.contlen = 0; - self.padlen = 0; - self.inputbuf.clear(); - } - pub fn feed_bytes(&mut self, buf: &[u8]) { self.inputbuf.extend_from_slice(buf); } @@ -556,7 +547,7 @@ impl XlMultiXactTruncate { // block data // ... // main data -pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedWALRecord { +pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { let mut rnode_spcnode: u32 = 0; let mut rnode_dbnode: u32 = 0; let mut rnode_relnode: u32 = 0; @@ -574,12 +565,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW xlogrec.xl_rmid, xlogrec.xl_info ); - if xlogrec.xl_xid > checkpoint.nextXid.value as u32 { - // TODO: handle XID wraparound - checkpoint.nextXid = FullTransactionId { - value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | xlogrec.xl_xid as u64, - }; - } + let remaining = xlogrec.xl_tot_len - SizeOfXLogRecord; if buf.remaining() != remaining as usize { @@ -790,30 +776,10 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW assert_eq!(buf.remaining(), main_data_len as usize); } - //5. Handle special CLOG and XACT records - if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = buf.get_i32_le() as u32; - blk.will_init = true; - trace!("RM_CLOG_ID updates block {}", blk.blkno); - blocks.push(blk); - } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + //5. 
Handle special XACT records + if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; - trace!( - "XLOG_XACT_COMMIT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xlogrec.xl_info, (xlogrec.xl_prev >> 32), - xlogrec.xl_prev & 0xffffffff, - xlogrec.xl_xid, - blk.blkno, - main_data_len - ); - blocks.push(blk); - //parse commit record to extract subtrans entries // xl_xact_commit starts with time of commit let _xact_time = buf.get_i64_le(); @@ -828,17 +794,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); - let mut prev_blkno = u32::MAX; for _i in 0..nsubxacts { - let subxact = buf.get_u32_le(); - let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; - if prev_blkno != blkno { - prev_blkno = blkno; - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = blkno; - blocks.push(blk); - } + let _subxact = buf.get_u32_le(); } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { @@ -869,18 +826,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW //TODO handle this to be able to restore pg_twophase on node start } } else if info == pg_constants::XLOG_XACT_ABORT { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = xlogrec.xl_xid / pg_constants::CLOG_XACTS_PER_PAGE; - trace!( - "XLOG_XACT_ABORT xl_info {} xl_prev {:X}/{:X} xid {} updates block {} main_data_len {}", - xlogrec.xl_info, (xlogrec.xl_prev >> 32), - xlogrec.xl_prev & 0xffffffff, - xlogrec.xl_xid, - blk.blkno, - main_data_len - ); - blocks.push(blk); //parse abort record to extract 
subtrans entries // xl_xact_abort starts with time of commit let _xact_time = buf.get_i64_le(); @@ -895,17 +840,8 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { let nsubxacts = buf.get_i32_le(); - let mut prev_blkno = u32::MAX; for _i in 0..nsubxacts { - let subxact = buf.get_u32_le(); - let blkno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; - if prev_blkno != blkno { - prev_blkno = blkno; - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_XACT_FORKNUM; - blk.blkno = blkno; - blocks.push(blk); - } + let _subxact = buf.get_u32_le(); } } if xinfo & pg_constants::XACT_XINFO_HAS_RELFILENODES != 0 { @@ -927,11 +863,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW let _xid = buf.get_u32_le(); trace!("XLOG_XACT_ABORT-XACT_XINFO_HAS_TWOPHASE"); } - } else if info == pg_constants::XLOG_XACT_PREPARE { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_TWOPHASE_FORKNUM; - blk.blkno = xlogrec.xl_xid; - blk.will_init = true; } } else if xlogrec.xl_rmid == pg_constants::RM_DBASE_ID { let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; @@ -964,7 +895,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW trace!("XLOG_TBLSPC_DROP is not handled yet"); } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP_ID { - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; let blkno = blocks[0].blkno / pg_constants::HEAPBLOCKS_PER_PAGE as u32; if info == pg_constants::XLOG_HEAP_INSERT { let xlrec = XlHeapInsert::decode(&mut buf); @@ -1018,7 +949,7 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW } } } else if xlogrec.xl_rmid == pg_constants::RM_HEAP2_ID { - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + let info = xlogrec.xl_info & 
pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_HEAP2_MULTI_INSERT { let xlrec = XlHeapMultiInsert::decode(&mut buf); if (xlrec.flags @@ -1036,104 +967,6 @@ pub fn decode_wal_record(checkpoint: &mut CheckPoint, record: Bytes) -> DecodedW blocks.push(blk); } } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; - blk.blkno = buf.get_u32_le(); - blk.will_init = true; - blocks.push(blk); - } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; - blk.blkno = buf.get_u32_le(); - blk.will_init = true; - blocks.push(blk); - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - // Update offset page - let mut blk = DecodedBkpBlock::new(); - blk.blkno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; - blocks.push(blk); - let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_blkno = - (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - for blkno in first_mbr_blkno..=last_mbr_blkno { - // Update members page - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; - blk.blkno = blkno; - blocks.push(blk); - } - if xlrec.mid > checkpoint.nextMulti { - // See MultiXactAdvanceNextMXact in postgres code - checkpoint.nextMulti = xlrec.mid + 1; - } - if xlrec.moff > checkpoint.nextMultiOffset { - // See MultiXactAdvanceNextMXact in postgres code - checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; - } - let max_xid = xlrec - .members - .iter() - .fold(checkpoint.nextXid.value as u32, |acc, 
mbr| { - if mbr.xid > acc { - mbr.xid - } else { - acc - } - }); - checkpoint.nextXid = FullTransactionId { - value: (checkpoint.nextXid.value & 0xFFFFFFFF00000000) | max_xid as u64, - }; - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - let xlrec = XlMultiXactTruncate::decode(&mut buf); - checkpoint.oldestXid = xlrec.end_trunc_off; - checkpoint.oldestMultiDB = xlrec.oldest_multi_db; - let first_off_blkno = - xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let last_off_blkno = - xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - for blkno in first_off_blkno..last_off_blkno { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_MXACT_OFFSETS_FORKNUM; - blk.blkno = blkno; - blk.will_init = true; - blocks.push(blk); - } - let first_mbr_blkno = - xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_blkno = - xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - for blkno in first_mbr_blkno..last_mbr_blkno { - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_MXACT_MEMBERS_FORKNUM; - blk.blkno = blkno; - blk.will_init = true; - blocks.push(blk); - } - } else { - panic!() - } - } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { - let xlrec = XlRelmapUpdate::decode(&mut buf); - let mut blk = DecodedBkpBlock::new(); - blk.forknum = pg_constants::PG_FILENODEMAP_FORKNUM; - blk.rnode_spcnode = xlrec.tsid; - blk.rnode_dbnode = xlrec.dbid; - blk.rnode_relnode = 0; - blk.will_init = true; - blocks.push(blk); - } else if xlogrec.xl_rmid == pg_constants::RM_XLOG_ID { - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_NEXTOID { - let next_oid = buf.get_u32_le(); - if next_oid > checkpoint.nextOid { - checkpoint.nextOid = next_oid; - } - } } DecodedWALRecord { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e8992f944f..fbcf67eb39 100644 --- 
a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -7,7 +7,6 @@ //! use crate::page_cache; -use crate::repository::*; use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; @@ -17,8 +16,8 @@ use log::*; use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; +use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; -use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::collections::HashMap; @@ -147,11 +146,6 @@ fn walreceiver_main( error!("No previous WAL position"); } - startpoint = Lsn::max( - startpoint, - Lsn(end_of_wal.0 & !(pg_constants::WAL_SEGMENT_SIZE as u64 - 1)), - ); - // There might be some padding after the last full record, skip it. // // FIXME: It probably would be better to always start streaming from the beginning @@ -171,14 +165,6 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); - let mut checkpoint = CheckPoint::new(startpoint.0, identify.timeline); - let checkpoint_tag = BufferTag::fork(pg_constants::PG_CHECKPOINT_FORKNUM); - if let Some(checkpoint_bytes) = timeline.get_page_image(checkpoint_tag, Lsn(0))? { - checkpoint = decode_checkpoint(checkpoint_bytes)?; - trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); - } else { - error!("No checkpoint record was found in reposistory"); - } while let Some(replication_message) = physical_stream.next()? { match replication_message { ReplicationMessage::XLogData(xlog_data) => { @@ -195,14 +181,9 @@ fn walreceiver_main( waldecoder.feed_bytes(data); while let Some((lsn, recdata)) = waldecoder.poll_decode()? 
{ - let old_checkpoint_bytes = encode_checkpoint(checkpoint); - let decoded = decode_wal_record(&mut checkpoint, recdata.clone()); + let decoded = decode_wal_record(recdata.clone()); timeline.save_decoded_record(decoded, recdata, lsn)?; - let new_checkpoint_bytes = encode_checkpoint(checkpoint); - if new_checkpoint_bytes != old_checkpoint_bytes { - timeline.put_page_image(checkpoint_tag, Lsn(0), new_checkpoint_bytes); - } // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN timeline.advance_last_record_lsn(lsn); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index ab18ad9205..97ef72ad2d 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -14,8 +14,7 @@ //! TODO: Even though the postgres code runs in a separate process, //! it's not a secure sandbox. //! -use byteorder::{ByteOrder, LittleEndian}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use log::*; use std::assert; use std::cell::RefCell; @@ -37,11 +36,7 @@ use zenith_utils::lsn::Lsn; use crate::repository::BufferTag; use crate::repository::WALRecord; -use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; -use postgres_ffi::nonrelfile_utils::transaction_id_set_status; -use postgres_ffi::pg_constants; -use postgres_ffi::xlog_utils::XLogRecord; /// /// WAL Redo Manager is responsible for replaying WAL records. 
@@ -169,24 +164,6 @@ impl WalRedoManager for PostgresRedoManager { } } -fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize { - ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16 - % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE - * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize -} - -fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 { - (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP - * pg_constants::MXACT_MEMBER_BITS_PER_XACT -} - -/* Location (byte offset within page) of TransactionId of given member */ -fn mx_offset_to_member_offset(xid: MultiXactId) -> usize { - mx_offset_to_flags_offset(xid) - + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP - + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize -} - /// /// WAL redo thread /// @@ -255,151 +232,7 @@ impl PostgresRedoManagerInternal { let start = Instant::now(); let apply_result: Result; - if tag.rel.forknum > pg_constants::INIT_FORKNUM { - const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - let mut page = BytesMut::new(); - if let Some(fpi) = base_img { - page.extend_from_slice(&fpi[..]); - } else { - page.extend_from_slice(&ZERO_PAGE); - } - for record in records { - let mut buf = record.rec.clone(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. 
- let xlogrec = XLogRecord::from_bytes(&mut buf); - - //move to main data - // TODO probably, we should store some records in our special format - // to avoid this weird parsing on replay - let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; - if buf.remaining() > skip { - buf.advance(skip); - } - - if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { - let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::CLOG_ZEROPAGE { - page.copy_from_slice(&ZERO_PAGE); - } - } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { - let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - let mut status = 0; - if info == pg_constants::XLOG_XACT_COMMIT { - status = pg_constants::TRANSACTION_STATUS_COMMITTED; - transaction_id_set_status(xlogrec.xl_xid, status, &mut page); - //handle subtrans - let _xact_time = buf.get_i64_le(); - let mut xinfo = 0; - if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { - xinfo = buf.get_u32_le(); - if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { - let _dbid = buf.get_u32_le(); - let _tsid = buf.get_u32_le(); - } - } - - if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { - let nsubxacts = buf.get_i32_le(); - for _i in 0..nsubxacts { - let subxact = buf.get_u32_le(); - let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - // only update xids on the requested page - if tag.blknum == blkno { - status = pg_constants::TRANSACTION_STATUS_SUB_COMMITTED; - transaction_id_set_status(subxact, status, &mut page); - } - } - } - } else if info == pg_constants::XLOG_XACT_ABORT { - status = pg_constants::TRANSACTION_STATUS_ABORTED; - transaction_id_set_status(xlogrec.xl_xid, status, &mut page); - //handle subtrans - let _xact_time = buf.get_i64_le(); - let mut xinfo = 0; - if xlogrec.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { - xinfo = buf.get_u32_le(); - if xinfo & pg_constants::XACT_XINFO_HAS_DBINFO != 0 { - let _dbid = buf.get_u32_le(); - let _tsid = 
buf.get_u32_le(); - } - } - - if xinfo & pg_constants::XACT_XINFO_HAS_SUBXACTS != 0 { - let nsubxacts = buf.get_i32_le(); - for _i in 0..nsubxacts { - let subxact = buf.get_u32_le(); - let blkno = subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; - // only update xids on the requested page - if tag.blknum == blkno { - status = pg_constants::TRANSACTION_STATUS_ABORTED; - transaction_id_set_status(subxact, status, &mut page); - } - } - } - } else if info != pg_constants::XLOG_XACT_PREPARE { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {} main_data_offset {}, rec.len {}", - status, - record.lsn, - record.main_data_offset, record.rec.len()); - } - } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { - let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE - || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE - { - page.copy_from_slice(&ZERO_PAGE); - } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { - let xlrec = XlMultiXactCreate::decode(&mut buf); - if tag.rel.forknum == pg_constants::PG_MXACT_OFFSETS_FORKNUM { - let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); - } else { - assert!(tag.rel.forknum == pg_constants::PG_MXACT_MEMBERS_FORKNUM); - for i in 0..xlrec.nmembers { - let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - if blkno == tag.blknum { - // update only target block - let offset = xlrec.moff + i; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= - !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) - << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut 
page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); - } - } - } - } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { - // empty page image indicates that this SLRU page is truncated and can be removed by GC - page.clear(); - } else { - panic!(); - } - } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { - page.clear(); - page.extend_from_slice(&buf[12..]); // skip xl_relmap_update - assert!(page.len() == 512); // size of pg_filenode.map - } - } - - apply_result = Ok::(page.freeze()); - } else { - apply_result = process.apply_wal_records(tag, base_img, records).await; - } + apply_result = process.apply_wal_records(tag, base_img, records).await; let duration = start.elapsed(); diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index c15f19188f..b834bd99db 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -18,8 +18,6 @@ fn main() { // included header files changed. 
.parse_callbacks(Box::new(bindgen::CargoCallbacks)) .whitelist_type("ControlFileData") - .whitelist_type("CheckPoint") - .whitelist_type("FullTransactionId") .whitelist_var("PG_CONTROL_FILE_SIZE") .whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") .whitelist_type("DBState") diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index 29a01c3445..a1a12a901d 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -3,7 +3,6 @@ #![allow(non_snake_case)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); -pub mod nonrelfile_utils; pub mod pg_constants; pub mod relfile_utils; pub mod xlog_utils; @@ -12,7 +11,6 @@ use bytes::{Buf, Bytes, BytesMut}; // sizeof(ControlFileData) const SIZEOF_CONTROLDATA: usize = std::mem::size_of::(); -const SIZEOF_CHECKPOINT: usize = std::mem::size_of::(); const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize; impl ControlFileData { @@ -71,42 +69,3 @@ pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes { buf.into() } - -pub fn encode_checkpoint(checkpoint: CheckPoint) -> Bytes { - let b: [u8; SIZEOF_CHECKPOINT]; - b = unsafe { std::mem::transmute::(checkpoint) }; - Bytes::copy_from_slice(&b[..]) -} - -pub fn decode_checkpoint(mut buf: Bytes) -> Result { - let mut b = [0u8; SIZEOF_CHECKPOINT]; - buf.copy_to_slice(&mut b); - let checkpoint: CheckPoint; - checkpoint = unsafe { std::mem::transmute::<[u8; SIZEOF_CHECKPOINT], CheckPoint>(b) }; - Ok(checkpoint) -} - -impl CheckPoint { - pub fn new(lsn: u64, timeline: u32) -> CheckPoint { - CheckPoint { - redo: lsn, - ThisTimeLineID: timeline, - PrevTimeLineID: timeline, - fullPageWrites: true, // TODO: get actual value of full_page_writes - nextXid: FullTransactionId { - value: pg_constants::FIRST_NORMAL_TRANSACTION_ID as u64, - }, // TODO: handle epoch? 
- nextOid: pg_constants::FIRST_BOOTSTRAP_OBJECT_ID, - nextMulti: 1, - nextMultiOffset: 0, - oldestXid: pg_constants::FIRST_NORMAL_TRANSACTION_ID, - oldestXidDB: 0, - oldestMulti: 1, - oldestMultiDB: 0, - time: 0, - oldestCommitTsXid: 0, - newestCommitTsXid: 0, - oldestActiveXid: pg_constants::INVALID_TRANSACTION_ID, - } - } -} diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index 29d07b8c0c..19abe5f741 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -15,32 +15,10 @@ pub const MAIN_FORKNUM: u8 = 0; pub const FSM_FORKNUM: u8 = 1; pub const VISIBILITYMAP_FORKNUM: u8 = 2; pub const INIT_FORKNUM: u8 = 3; -// Special values for non-rel files' tags (Zenith-specific) -//Special values for non-rel files' tags -pub const PG_CONTROLFILE_FORKNUM: u8 = 42; -pub const PG_FILENODEMAP_FORKNUM: u8 = 43; -pub const PG_XACT_FORKNUM: u8 = 44; -pub const PG_MXACT_OFFSETS_FORKNUM: u8 = 45; -pub const PG_MXACT_MEMBERS_FORKNUM: u8 = 46; -pub const PG_TWOPHASE_FORKNUM: u8 = 47; -pub const PG_CHECKPOINT_FORKNUM: u8 = 48; // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; -// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and -// --with-segsize=SEGSIZE, but assume the defaults for now. 
-pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - -// -// constants from clog.h -// -pub const CLOG_XACTS_PER_BYTE: u32 = 4; -pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE; -pub const CLOG_BITS_PER_XACT: u8 = 2; -pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; - // // Constants from visbilitymap.h // @@ -48,22 +26,15 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; -pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00; pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; -pub const CLOG_ZEROPAGE: u8 = 0x00; -pub const CLOG_TRUNCATE: u8 = 0x10; - // From xact.h pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_PREPARE: u8 = 0x10; pub const XLOG_XACT_ABORT: u8 = 0x20; -// From srlu.h -pub const SLRU_PAGES_PER_SEGMENT: u32 = 32; - /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; /* does this record have a 'xinfo' field or not */ @@ -83,29 +54,9 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; // From pg_control.h and rmgrlist.h -pub const XLOG_NEXTOID: u8 = 0x30; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; -// From multixact.h -pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00; -pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10; -pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20; -pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30; - -pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4; -pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8; -pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1; -pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4; -pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 = - MULTIXACT_FLAGBYTES_PER_GROUP * 
MXACT_MEMBER_FLAGS_PER_BYTE; -/* size in bytes of a complete group */ -pub const MULTIXACT_MEMBERGROUP_SIZE: u16 = - 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP; -pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE; -pub const MULTIXACT_MEMBERS_PER_PAGE: u16 = - MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP; - // From heapam_xlog.h pub const XLOG_HEAP_INSERT: u8 = 0x00; pub const XLOG_HEAP_DELETE: u8 = 0x10; @@ -144,6 +95,11 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10; pub const SIZEOF_XLOGRECORD: u32 = 24; +// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and +// --with-segsize=SEGSIZE, but assume the defaults for now. +pub const BLCKSZ: u16 = 8192; +pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); + // // from xlogrecord.h // @@ -166,11 +122,5 @@ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ -/* From transam.h */ -pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; -pub const INVALID_TRANSACTION_ID: u32 = 0; -pub const FIRST_BOOTSTRAP_OBJECT_ID: u32 = 12000; -pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; - /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; diff --git a/postgres_ffi/src/relfile_utils.rs b/postgres_ffi/src/relfile_utils.rs index c292ff9582..97c8f0afea 100644 --- a/postgres_ffi/src/relfile_utils.rs +++ b/postgres_ffi/src/relfile_utils.rs @@ -38,16 +38,6 @@ pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { pg_constants::FSM_FORKNUM => Some("fsm"), pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"), pg_constants::INIT_FORKNUM => Some("init"), - - // These should not appear in WAL records, but we use them internally, - // and need to be prepared to 
print them out in log messages and such - pg_constants::PG_CONTROLFILE_FORKNUM => Some("controlfile"), - pg_constants::PG_FILENODEMAP_FORKNUM => Some("filenodemap"), - pg_constants::PG_XACT_FORKNUM => Some("xact"), - pg_constants::PG_MXACT_OFFSETS_FORKNUM => Some("mxact_offsets"), - pg_constants::PG_MXACT_MEMBERS_FORKNUM => Some("mxact_members"), - pg_constants::PG_TWOPHASE_FORKNUM => Some("twophase"), - _ => Some("UNKNOWN FORKNUM"), } } diff --git a/vendor/postgres b/vendor/postgres index fd9de093b0..62852ad64e 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit fd9de093b0e115459c8816895d2ac37da2599aca +Subproject commit 62852ad64e0a12d806aa3888f213ddcce5279c0f diff --git a/walkeeper/src/timeline.rs b/walkeeper/src/timeline.rs index f110a1e293..70cf0d5bf0 100644 --- a/walkeeper/src/timeline.rs +++ b/walkeeper/src/timeline.rs @@ -233,9 +233,7 @@ impl TimelineTools for Option> { fn find_end_of_wal(&self, data_dir: &Path, precise: bool) -> (Lsn, TimeLineID) { let seg_size = self.get().get_info().server.wal_seg_size as usize; let (lsn, timeline) = find_end_of_wal(data_dir, seg_size, precise); - let wal_start = Lsn((seg_size * 2) as u64); // FIXME: handle pg_resetwal - let lsn = Lsn::max(Lsn(lsn), wal_start); - (lsn, timeline) + (Lsn(lsn), timeline) } }