diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index b43d6f1fcc..4dac62baa1 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -303,6 +303,8 @@ impl PostgresNode { ), )?; + fs::create_dir_all(self.pgdata().join("pg_wal"))?; + fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?; Ok(()) } diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index e467609ff9..26cba88d2b 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -4,181 +4,261 @@ //! TODO: this module has nothing to do with PostgreSQL pg_basebackup. //! It could use a better name. //! +//! A stateless Postgres compute node is launched by sending it a tarball which contains non-relational data (multixacts, clog, filenodemaps, twophase files) +//! together with a generated pg_control and a dummy WAL segment. This module is responsible for creating such a tarball from the snapshot directory and +//! the data stored in object storage. +//! use crate::ZTimelineId; +use bytes::{BufMut, BytesMut}; use log::*; use std::io::Write; use std::sync::Arc; -use tar::Builder; +use std::time::SystemTime; +use tar::{Builder, Header}; use walkdir::WalkDir; +use crate::object_key::*; use crate::repository::Timeline; use postgres_ffi::relfile_utils::*; +use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use zenith_utils::lsn::Lsn; -/// -/// Generate tarball with non-relational files from repository -/// -pub fn send_tarball_at_lsn( - write: &mut dyn Write, - timelineid: ZTimelineId, - _timeline: &Arc<dyn Timeline>, - _lsn: Lsn, - snapshot_lsn: Lsn, -) -> anyhow::Result<()> { - let mut ar = Builder::new(write); - - let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0); - let walpath = format!("timelines/{}/wal", timelineid); - - debug!("sending tarball of snapshot in {}", snappath); - for entry in WalkDir::new(&snappath) { - let entry = entry?; - let fullpath = entry.path(); - let relpath = entry.path().strip_prefix(&snappath).unwrap(); - - if relpath.to_str().unwrap() == "" { - continue; - } - - if entry.file_type().is_dir() { - trace!( - "sending dir {} as {}", - fullpath.display(), - relpath.display() - ); - ar.append_dir(relpath, fullpath)?; - } else if entry.file_type().is_symlink() { - error!("ignoring symlink in snapshot dir"); - } else if entry.file_type().is_file() { - // Shared catalogs are exempt - if relpath.starts_with("global/") { - trace!("sending shared catalog {}", relpath.display()); - ar.append_path_with_name(fullpath, relpath)?; - } else if !is_rel_file_path(relpath.to_str().unwrap()) { - trace!("sending {}", relpath.display()); - ar.append_path_with_name(fullpath, relpath)?; - } else { - trace!("not sending {}", relpath.display()); - } - } else { - error!("unknown file type: {}", fullpath.display()); - } - } - - // FIXME: Also send all the WAL. The compute node would only need - // the WAL that applies to non-relation files, because the page - // server handles all the relation files. But we don't have a - // mechanism for separating relation and non-relation WAL at the - // moment. - for entry in std::fs::read_dir(&walpath)?
{ - let entry = entry?; - let fullpath = &entry.path(); - let relpath = fullpath.strip_prefix(&walpath).unwrap(); - - if !entry.path().is_file() { - continue; - } - - let archive_fname = relpath.to_str().unwrap(); - let archive_fname = archive_fname - .strip_suffix(".partial") - .unwrap_or(&archive_fname); - let archive_path = "pg_wal/".to_owned() + archive_fname; - ar.append_path_with_name(fullpath, archive_path)?; - } - ar.finish()?; - debug!("all tarred up!"); - Ok(()) +/// This is short-living object only for the time of tarball creation, +/// created mostly to avoid passing a lot of parameters between various functions +/// used for constructing tarball. +pub struct Basebackup<'a> { + ar: Builder<&'a mut dyn Write>, + timeline: &'a Arc, + lsn: Lsn, + snappath: String, + slru_buf: [u8; pg_constants::SLRU_SEG_SIZE], + slru_segno: u32, + slru_path: &'static str, } -/// -/// Send a tarball containing a snapshot of all non-relation files in the -/// PostgreSQL data directory, at given LSN -/// -/// There must be a snapshot at the given LSN in the snapshots directory, we cannot -/// reconstruct the state at an arbitrary LSN at the moment. -/// -pub fn send_snapshot_tarball( - write: &mut dyn Write, - timelineid: ZTimelineId, - snapshotlsn: Lsn, -) -> Result<(), std::io::Error> { - let mut ar = Builder::new(write); - - let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn.0); - let walpath = format!("timelines/{}/wal", timelineid); - - debug!("sending tarball of snapshot in {}", snappath); - //ar.append_dir_all("", &snappath)?; - - for entry in WalkDir::new(&snappath) { - let entry = entry?; - let fullpath = entry.path(); - let relpath = entry.path().strip_prefix(&snappath).unwrap(); - - if relpath.to_str().unwrap() == "" { - continue; +impl<'a> Basebackup<'a> { + pub fn new( + write: &'a mut dyn Write, + timelineid: ZTimelineId, + timeline: &'a Arc, + lsn: Lsn, + snapshot_lsn: Lsn, + ) -> Basebackup<'a> { + Basebackup { + ar: Builder::new(write), + timeline, + lsn, + snappath: format!("timelines/{}/snapshots/{:016X}", timelineid, snapshot_lsn.0), + slru_path: "", + slru_segno: u32::MAX, + slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE], } + } - if entry.file_type().is_dir() { - trace!( - "sending dir {} as {}", - fullpath.display(), - relpath.display() - ); - ar.append_dir(relpath, fullpath)?; - } else if entry.file_type().is_symlink() { - error!("ignoring symlink in snapshot dir"); - } else if entry.file_type().is_file() { - // Shared catalogs are exempt - if relpath.starts_with("global/") { - trace!("sending shared catalog {}", relpath.display()); - ar.append_path_with_name(fullpath, relpath)?; - } else if !is_rel_file_path(relpath.to_str().unwrap()) { - trace!("sending {}", relpath.display()); - ar.append_path_with_name(fullpath, relpath)?; - } else { - trace!("not sending {}", relpath.display()); + pub fn send_tarball(&mut self) -> anyhow::Result<()> { + debug!("sending tarball of snapshot in {}", self.snappath); + for entry in WalkDir::new(&self.snappath) { + let entry = entry?; + let fullpath = entry.path(); + let relpath = entry.path().strip_prefix(&self.snappath).unwrap(); - // FIXME: For now, also send all the relation files. - // This really shouldn't be necessary, and kind of - // defeats the point of having a page server in the - // first place. 
But it is useful at least when - debugging with the DEBUG_COMPARE_LOCAL option (see - vendor/postgres/src/backend/storage/smgr/pagestore_smgr.c) - - ar.append_path_with_name(fullpath, relpath)?; + if relpath.to_str().unwrap() == "" { + continue; } + + if entry.file_type().is_dir() { + trace!( + "sending dir {} as {}", + fullpath.display(), + relpath.display() + ); + self.ar.append_dir(relpath, fullpath)?; + } else if entry.file_type().is_symlink() { + error!("ignoring symlink in snapshot dir"); + } else if entry.file_type().is_file() { + if !is_rel_file_path(relpath.to_str().unwrap()) { + if entry.file_name() != "pg_filenode.map" // these files will be generated from object storage + && !relpath.starts_with("pg_xact/") + && !relpath.starts_with("pg_multixact/") + { + trace!("sending {}", relpath.display()); + self.ar.append_path_with_name(fullpath, relpath)?; + } + } else { + // relation pages are loaded on demand and should not be included in the tarball + trace!("not sending {}", relpath.display()); + } + } else { + error!("unknown file type: {}", fullpath.display()); + } + } + + // Generate non-relational files. + // Iteration is in sorted order: all objects of the same type are grouped and traversed + // in key ascending order. For example, all pg_xact records precede pg_multixact records and are sorted by block number. + // This makes it easy to construct SLRU segments (32 blocks each). + for obj in self.timeline.list_nonrels(self.lsn)? { + match obj { + ObjectTag::Clog(slru) => self.add_slru_segment("pg_xact", &obj, slru.blknum)?, + ObjectTag::MultiXactMembers(slru) => { + self.add_slru_segment("pg_multixact/members", &obj, slru.blknum)? + } + ObjectTag::MultiXactOffsets(slru) => { + self.add_slru_segment("pg_multixact/offsets", &obj, slru.blknum)? + } + ObjectTag::FileNodeMap(db) => self.add_relmap_file(&obj, &db)?, + ObjectTag::TwoPhase(prepare) => self.add_twophase_file(&obj, prepare.xid)?, + _ => {} + } + } + self.finish_slru_segment()?; // write the last incomplete SLRU segment (if any) + self.add_pgcontrol_file()?; + self.ar.finish()?; + debug!("all tarred up!"); + Ok(()) + } + + // + // Generate SLRU segment files from the repository. The path identifies the SLRU kind (pg_xact, pg_multixact/members, ...). + // Initially the path is an empty string. + // + fn add_slru_segment( + &mut self, + path: &'static str, + tag: &ObjectTag, + page: u32, + ) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + // A zero-length image indicates a truncated segment: just skip it + if !img.is_empty() { + assert!(img.len() == pg_constants::BLCKSZ as usize); + let segno = page / pg_constants::SLRU_PAGES_PER_SEGMENT; + if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) { + // Switching to a new segment: save the old one + let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); + let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; + self.ar.append(&header, &self.slru_buf[..])?; + self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer + } + self.slru_segno = segno; + self.slru_path = path; + let offs_start = (page % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize * pg_constants::BLCKSZ as usize; + let offs_end = offs_start + pg_constants::BLCKSZ as usize; + self.slru_buf[offs_start..offs_end].copy_from_slice(&img); + } + Ok(()) + } +
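As a rough illustration of the packing that add_slru_segment() performs, here is a standalone sketch of the block-to-segment arithmetic. It is not part of this patch: the constants assume the usual Postgres layout (BLCKSZ = 8192, SLRU_PAGES_PER_SEGMENT = 32), and slru_page_location is a made-up helper name.

    const BLCKSZ: usize = 8192;
    const SLRU_PAGES_PER_SEGMENT: u32 = 32;
    const SLRU_SEG_SIZE: usize = BLCKSZ * SLRU_PAGES_PER_SEGMENT as usize; // 262144 bytes per segment file

    /// Map an SLRU block number to (segment file name, byte offset inside that segment).
    fn slru_page_location(kind: &str, blknum: u32) -> (String, usize) {
        let segno = blknum / SLRU_PAGES_PER_SEGMENT; // which 32-page segment the block belongs to
        let offset = (blknum % SLRU_PAGES_PER_SEGMENT) as usize * BLCKSZ; // where the 8 KB page lands in the buffer
        (format!("{}/{:>04X}", kind, segno), offset)
    }

    // Example: pg_xact block 33 is written into segment "pg_xact/0001" at offset 8192, so the
    // consecutive blocks returned by list_nonrels() fill one segment buffer before it is flushed.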
+ // + // We flush SLRU segments to the tarball once they are completed. + // This method is used to flush the last (possibly incomplete) segment. + // + fn finish_slru_segment(&mut self) -> anyhow::Result<()> { + if self.slru_path != "" { + // there is an incomplete segment to flush + let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); + let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; + self.ar.append(&header, &self.slru_buf[..])?; + } + Ok(()) + } + + // + // Extract pg_filenode.map files from the repository + // + fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + info!("add_relmap_file {:?}", db); + let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { + String::from("global/pg_filenode.map") // filenode map for global tablespace } else { - error!("unknown file type: {}", fullpath.display()); - } + // User-defined tablespaces are not supported + assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); + let src_path = format!("{}/base/1/PG_VERSION", self.snappath); + let dst_path = format!("base/{}/PG_VERSION", db.dbnode); + self.ar.append_path_with_name(&src_path, &dst_path)?; + format!("base/{}/pg_filenode.map", db.dbnode) + }; + assert!(img.len() == 512); + let header = new_tar_header(&path, img.len() as u64)?; + self.ar.append(&header, &img[..])?; + Ok(()) } - // FIXME: Also send all the WAL. The compute node would only need - // the WAL that applies to non-relation files, because the page - // server handles all the relation files. But we don't have a - // mechanism for separating relation and non-relation WAL at the - // moment. - for entry in std::fs::read_dir(&walpath)? { - let entry = entry?; - let fullpath = &entry.path(); - let relpath = fullpath.strip_prefix(&walpath).unwrap(); - - if !entry.path().is_file() { - continue; + // + // Extract two-phase state files + // + fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> { + // Include two-phase files in the tarball only for in-progress transactions + if self.timeline.get_tx_status(xid, self.lsn)?
+ == pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { + let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + let mut buf = BytesMut::new(); + buf.extend_from_slice(&img[..]); + let crc = crc32c::crc32c(&img[..]); + buf.put_u32_le(crc); + let path = format!("pg_twophase/{:>08X}", xid); + let header = new_tar_header(&path, buf.len() as u64)?; + self.ar.append(&header, &buf[..])?; } - - let archive_fname = relpath.to_str().unwrap(); - let archive_fname = archive_fname - .strip_suffix(".partial") - .unwrap_or(&archive_fname); - let archive_path = "pg_wal/".to_owned() + archive_fname; - ar.append_path_with_name(fullpath, archive_path)?; + Ok(()) } - ar.finish()?; - debug!("all tarred up!"); - Ok(()) + // + // Add generated pg_control file + // + fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { + let checkpoint_bytes = self + .timeline + .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?; + let pg_control_bytes = self + .timeline + .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?; + let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + + // Generate new pg_control and WAL needed for bootstrap + let checkpoint_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); + let checkpoint_lsn = XLogSegNoOffsetToRecPtr( + checkpoint_segno, + XLOG_SIZE_OF_XLOG_LONG_PHD as u32, + pg_constants::WAL_SEGMENT_SIZE, + ); + checkpoint.redo = self.lsn.0 + self.lsn.calc_padding(8u32); + + //reset some fields we don't want to preserve + checkpoint.oldestActiveXid = 0; + + //save new values in pg_control + pg_control.checkPoint = checkpoint_lsn; + pg_control.checkPointCopy = checkpoint; + info!("pg_control.state = {}", pg_control.state); + pg_control.state = pg_constants::DB_SHUTDOWNED; + + // add zenith.signal file + self.ar + .append(&new_tar_header("zenith.signal", 0)?, &b""[..])?; + + //send pg_control + let pg_control_bytes = pg_control.encode(); + let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; + self.ar.append(&header, &pg_control_bytes[..])?; + + //send wal segment + let wal_file_name = XLogFileName( + 1, // FIXME: always use Postgres timeline 1 + checkpoint_segno, + pg_constants::WAL_SEGMENT_SIZE, + ); + let wal_file_path = format!("pg_wal/{}", wal_file_name); + let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?; + let wal_seg = generate_wal_segment(&pg_control); + self.ar.append(&header, &wal_seg[..])?; + Ok(()) + } } /// @@ -228,6 +308,28 @@ fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { } } +// +// Check if it is relational file +// fn is_rel_file_path(path: &str) -> bool { parse_rel_file_path(path).is_ok() } + +// +// Create new tarball entry header +// +fn new_tar_header(path: &str, size: u64) -> anyhow::Result
{ + let mut header = Header::new_gnu(); + header.set_size(size); + header.set_path(path)?; + header.set_mode(0b110000000); // -rw------- + header.set_mtime( + // use currenttime as last modified time + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + ); + header.set_cksum(); + Ok(header) +} diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 6efca05a0c..c92c711b21 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -127,9 +127,6 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { // Remove pg_wal fs::remove_dir_all(tmppath.join("pg_wal"))?; - force_crash_recovery(&tmppath)?; - println!("updated pg_control"); - // Move the data directory as an initial base backup. // FIXME: It would be enough to only copy the non-relation files here, the relation // data was already loaded into the repository. @@ -345,27 +342,6 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { bail!("could not parse point-in-time {}", s); } -// If control file says the cluster was shut down cleanly, modify it, to mark -// it as crashed. That forces crash recovery when you start the cluster. -// -// FIXME: -// We currently do this to the initial snapshot in "zenith init". It would -// be more natural to do this when the snapshot is restored instead, but we -// currently don't have any code to create new snapshots, so it doesn't matter -// Or better yet, use a less hacky way of putting the cluster into recovery. -// Perhaps create a backup label file in the data directory when it's restored. -fn force_crash_recovery(datadir: &Path) -> Result<()> { - // Read in the control file - let controlfilepath = datadir.to_path_buf().join("global").join("pg_control"); - let mut controlfile = ControlFileData::decode(&fs::read(controlfilepath.as_path())?)?; - - controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION; - - fs::write(controlfilepath.as_path(), controlfile.encode())?; - - Ok(()) -} - fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Result { // Create initial timeline let mut tli_buf = [0u8; 16]; diff --git a/pageserver/src/object_key.rs b/pageserver/src/object_key.rs index 0ae8291825..a087ca3a21 100644 --- a/pageserver/src/object_key.rs +++ b/pageserver/src/object_key.rs @@ -1,4 +1,5 @@ use crate::repository::{BufferTag, RelTag}; +use crate::waldecoder::TransactionId; use crate::ZTimelineId; use serde::{Deserialize, Serialize}; @@ -13,6 +14,48 @@ pub struct ObjectKey { pub tag: ObjectTag, } +/// +/// Non-relation transaction status files (clog (a.k.a. pg_xact) and pg_multixact) +/// in Postgres are handled by SLRU (Simple LRU) buffer, hence the name. +/// +/// These files are global for a postgres instance. +/// +/// These files are divided into segments, which are divided into pages +/// of the same BLCKSZ as used for relation files. +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct SlruBufferTag { + pub blknum: u32, +} + +/// +/// Special type of Postgres files: pg_filenode.map is needed to map +/// catalog table OIDs to filenode numbers, which define filename. +/// +/// Each database has a map file for its local mapped catalogs, +/// and there is a separate map file for shared catalogs. +/// +/// These files have untypical size of 512 bytes. +/// +/// See PostgreSQL relmapper.c for details. 
+/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct DatabaseTag { + pub spcnode: u32, + pub dbnode: u32, +} + +/// +/// Non-relation files that keep state for prepared transactions. +/// Unlike other files these are not divided into pages. +/// +/// See PostgreSQL twophase.c for details. +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct PrepareTag { + pub xid: TransactionId, +} + /// ObjectTag is a part of ObjectKey that is specific to the type of /// the stored object. /// @@ -21,7 +64,21 @@ pub struct ObjectKey { /// #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum ObjectTag { + // dummy tag preceeding all other keys + FirstTag, TimelineMetadataTag, + // Special entry that represents PostgreSQL checkpoint. + // We use it to track fields needed to restore controlfile checkpoint. + Checkpoint, + // Various types of non-relation files. + // We need them to bootstrap compute node. + ControlFile, + Clog(SlruBufferTag), + MultiXactMembers(SlruBufferTag), + MultiXactOffsets(SlruBufferTag), + FileNodeMap(DatabaseTag), + TwoPhase(PrepareTag), + // put relations at the end of enum to allow efficient iterations through non-rel objects RelationMetadata(RelTag), RelationBuffer(BufferTag), } diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index e3bb4ceba9..78bf60878a 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -22,8 +22,8 @@ use crate::{PageServerConf, ZTimelineId}; use anyhow::{bail, Context, Result}; use bytes::Bytes; use log::*; +use postgres_ffi::pg_constants; use serde::{Deserialize, Serialize}; -use std::cmp::max; use std::collections::{BTreeMap, HashMap, HashSet}; use std::convert::TryInto; use std::sync::{Arc, Mutex, RwLock}; @@ -139,8 +139,7 @@ impl Repository for ObjectRepository { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { - // just to check the source timeline exists - let _ = self.get_timeline(src)?; + let src_timeline = self.get_timeline(src)?; // Write a metadata key, noting the ancestor of th new timeline. There is initially // no data in it, but all the read-calls know to look into the ancestor. @@ -157,6 +156,18 @@ impl Repository for ObjectRepository { &ObjectValue::ser(&val)?, )?; + // Copy non-rel objects + for tag in src_timeline.list_nonrels(at_lsn)? { + match tag { + ObjectTag::TimelineMetadataTag => {} // skip it + _ => { + let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?; + let val = ObjectValue::Page(PageEntry::Page(img)); + let key = ObjectKey { timeline: dst, tag }; + self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?; + } + } + } Ok(()) } } @@ -247,12 +258,61 @@ impl Timeline for ObjectTimeline { //------------------------------------------------------------------------------ /// Look up given page in the cache. - fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result { + fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { let lsn = self.wait_lsn(req_lsn)?; self.get_page_at_lsn_nowait(tag, lsn) } + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + // Look up the page entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. 
+ let searchkey = ObjectKey { + timeline: self.timelineid, + tag, + }; + let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?; + + if let Some((lsn, value)) = iter.next().transpose()? { + let page_img: Bytes; + + match ObjectValue::des(&value)? { + ObjectValue::Page(PageEntry::Page(img)) => { + page_img = img; + } + ObjectValue::Page(PageEntry::WALRecord(_rec)) => { + // Request the WAL redo manager to apply the WAL records for us. + let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; + page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; + + // Garbage collection assumes that we remember the materialized page + // version. Otherwise we could opt to not do it, with the downside that + // the next GetPage@LSN call of the same page version would have to + // redo the WAL again. + self.put_page_image(tag, lsn, page_img.clone())?; + } + ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE), + _ => bail!("Invalid object kind, expected a page entry or SRLU truncate"), + } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + trace!( + "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})", + page_lsn_hi, + page_lsn_lo, + tag, + version_lsn, + lsn + ); + return Ok(page_img); + } + trace!("page {:?} at {} not found", tag, req_lsn); + Ok(Bytes::from_static(&ZERO_PAGE)) + /* return Err("could not find page image")?; */ + } + /// Get size of relation fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { let lsn = self.wait_lsn(lsn)?; @@ -284,6 +344,11 @@ impl Timeline for ObjectTimeline { Ok(false) } + /// Get a list of non-relational objects + fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result + 'a>> { + self.obj_store.list_objects(self.timelineid, true, lsn) + } + /// Get a list of all distinct relations in given tablespace and database. fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result> { // List all relations in this timeline. @@ -327,86 +392,114 @@ impl Timeline for ObjectTimeline { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. 
- fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> { + fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> { let lsn = rec.lsn; self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?; - debug!( - "put_wal_record rel {} blk {} at {}", - tag.rel, tag.blknum, lsn - ); + debug!("put_wal_record {:?} at {}", tag, lsn); - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + if let ObjectTag::RelationBuffer(tag) = tag { + // Also check if this created or extended the file + let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); - if tag.blknum >= old_nblocks { - let new_nblocks = tag.blknum + 1; + if tag.blknum >= old_nblocks { + let new_nblocks = tag.blknum + 1; - trace!( - "Extended relation {} from {} to {} blocks at {}", - tag.rel, - old_nblocks, - new_nblocks, - lsn - ); + trace!( + "Extended relation {} from {} to {} blocks at {}", + tag.rel, + old_nblocks, + new_nblocks, + lsn + ); - self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - tag.rel, - RelMetadata { - size: new_nblocks, - last_updated: lsn, - }, - ); + self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; + let mut rel_meta = self.rel_meta.write().unwrap(); + rel_meta.insert( + tag.rel, + RelMetadata { + size: new_nblocks, + last_updated: lsn, + }, + ); + } } - Ok(()) } - /// Unlink object. This method is used for marking dropped relations. + /// Unlink relation. This method is used for marking dropped relations. fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> { self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?; Ok(()) } + /// Truncate SRLU segment + fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag, + }; + let val = ObjectValue::SLRUTruncate; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; + Ok(()) + } + + fn get_next_tag(&self, tag: ObjectTag) -> Result> { + let key = ObjectKey { + timeline: self.timelineid, + tag, + }; + if let Some(key) = self.obj_store.get_next_key(&key)? 
{ + Ok(Some(key.tag)) + } else { + Ok(None) + } + } + + fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag, + }; + self.obj_store.put(&key, lsn, data)?; + Ok(()) + } + /// /// Memorize a full image of a page version /// - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { + fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()> { self.put_page_entry(&tag, lsn, PageEntry::Page(img))?; - debug!( - "put_page_image rel {} blk {} at {}", - tag.rel, tag.blknum, lsn - ); + debug!("put_page_image rel {:?} at {}", tag, lsn); - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + if let ObjectTag::RelationBuffer(tag) = tag { + // Also check if this created or extended the file + let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); - if tag.blknum >= old_nblocks { - let new_nblocks = tag.blknum + 1; + if tag.blknum >= old_nblocks { + let new_nblocks = tag.blknum + 1; - trace!( - "Extended relation {} from {} to {} blocks at {}", - tag.rel, - old_nblocks, - new_nblocks, - lsn - ); + trace!( + "Extended relation {} from {} to {} blocks at {}", + tag.rel, + old_nblocks, + new_nblocks, + lsn + ); - self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - tag.rel, - RelMetadata { - size: new_nblocks, - last_updated: lsn, - }, - ); + self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; + let mut rel_meta = self.rel_meta.write().unwrap(); + rel_meta.insert( + tag.rel, + RelMetadata { + size: new_nblocks, + last_updated: lsn, + }, + ); + } } - Ok(()) } @@ -526,72 +619,152 @@ impl Timeline for ObjectTimeline { // WAL is large enough to perform GC let now = Instant::now(); - // Iterate through all relations - for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? { - let rel = *rels; - let key = relation_size_key(self.timelineid, rel); - let mut max_size = 0u32; - let mut relation_dropped = false; - - result.n_relations += 1; - - // Process relation metadata versions - let mut latest_version = true; - for (lsn, val) in self.obj_store.object_versions(&key, horizon)? { - let rel_meta = ObjectValue::des_relsize(&val)?; - // If relation is dropped at the horizon, - // we can remove all its versions including latest (Unlink) - match rel_meta { - RelationSizeEntry::Size(size) => max_size = max(max_size, size), - RelationSizeEntry::Unlink => { - if latest_version { - relation_dropped = true; - info!("Relation {:?} dropped", rels); - result.dropped += 1; + // Iterate through all objects in timeline + for obj in self + .obj_store + .list_objects(self.timelineid, false, last_lsn)? + { + result.inspected += 1; + match obj { + // Prepared transactions + ObjectTag::TwoPhase(prepare) => { + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + if self.get_tx_status(prepare.xid, horizon)? + != pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { + let lsn = vers.0; + self.obj_store.unlink(&key, lsn)?; + result.prep_deleted += 1; } } } - // preserve latest version, unless the relation was dropped completely. 
- if latest_version { - latest_version = false; - if !relation_dropped { - continue; - } - } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - // Now process all relation blocks - for blknum in 0..max_size { - let buf_tag = BufferTag { rel, blknum }; - let key = ObjectKey { - timeline: self.timelineid, - tag: ObjectTag::RelationBuffer(buf_tag), - }; - let mut latest_version = true; - let mut deleted_page_versions = 0; - for (lsn, _val) in self.obj_store.object_versions(&key, horizon)? { - // Preserve and materialize latest version before deleting all preceding versions. - // We let get_page_at_lsn_nowait() do the materialization. - if latest_version { - latest_version = false; - if !relation_dropped { - self.get_page_at_lsn_nowait(buf_tag, lsn)?; - continue; + ObjectTag::RelationMetadata(_) => { + // Do not need to reconstruct page images, + // just delete all old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + result.dropped += 1; + } + _ => (), // preserve last version + } + last_version = false; + result.truncated += 1; + result.n_relations += 1; + } else { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; } } - self.obj_store.unlink(&key, lsn)?; - deleted_page_versions += 1; } - if deleted_page_versions > 0 && !relation_dropped { - result.truncated += 1; + ObjectTag::RelationBuffer(tag) => { + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + result.truncated += 1; + last_version = false; + if let Some(rel_size) = self.relsize_get_nowait(tag.rel, lsn)? { + if rel_size > tag.blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(obj, lsn)?; + continue; + } + debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(tag.rel, last_lsn)? + { + debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.rel, tag, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {:?} was dropped at {}", tag.rel, lsn); + } + // relation was dropped or truncated so this block can be removed + } + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } } - result.deleted += deleted_page_versions; + // SLRU-s + ObjectTag::Clog(_) + | ObjectTag::MultiXactOffsets(_) + | ObjectTag::MultiXactMembers(_) => { + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? 
{ + ObjectValue::SLRUTruncate => { + self.obj_store.unlink(&key, lsn)?; + result.slru_deleted += 1; + } + ObjectValue::Page(PageEntry::WALRecord(_)) => { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(obj, lsn)?; + } + _ => {} // do nothing if already materialized + } + last_version = false; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + result.slru_deleted += 1; + } + } + } + // versioned always materialized objects: no need to reconstruct pages + ObjectTag::Checkpoint | ObjectTag::ControlFile => { + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + // preserve last version + last_version = false; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + result.chkp_deleted += 1; + } + } + } + _ => (), // do nothing } } result.elapsed = now.elapsed(); - info!("Garbage collection completed in {:?}: {} relations inspected, {} version histories truncated, {} versions deleted, {} relations dropped", - result.elapsed, &result.n_relations, &result.truncated, &result.deleted, &result.dropped); + info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", + result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); self.obj_store.compact(); } Ok(result) @@ -599,54 +772,6 @@ impl Timeline for ObjectTimeline { } impl ObjectTimeline { - fn get_page_at_lsn_nowait(&self, tag: BufferTag, req_lsn: Lsn) -> Result { - // Look up the page entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let searchkey = ObjectKey { - timeline: self.timelineid, - tag: ObjectTag::RelationBuffer(tag), - }; - let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?; - - if let Some((lsn, value)) = iter.next().transpose()? { - let page_img: Bytes; - - match ObjectValue::des_page(&value)? { - PageEntry::Page(img) => { - page_img = img; - } - PageEntry::WALRecord(_rec) => { - // Request the WAL redo manager to apply the WAL records for us. - let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; - page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - - // Garbage collection assumes that we remember the materialized page - // version. Otherwise we could opt to not do it, with the downside that - // the next GetPage@LSN call of the same page version would have to - // redo the WAL again. - self.put_page_image(tag, lsn, page_img.clone())?; - } - } - // FIXME: assumes little-endian. 
Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - trace!( - "Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})", - page_lsn_hi, - page_lsn_lo, - tag.rel, - tag.blknum, - lsn, - req_lsn - ); - return Ok(page_img); - } - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, req_lsn); - Ok(Bytes::from_static(&ZERO_PAGE)) - /* return Err("could not find page image")?; */ - } - /// /// Internal function to get relation size at given LSN. /// @@ -700,7 +825,7 @@ impl ObjectTimeline { /// fn collect_records_for_apply( &self, - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, ) -> Result<(Option, Vec)> { let mut base_img: Option = None; @@ -710,7 +835,7 @@ impl ObjectTimeline { // old page image. let searchkey = ObjectKey { timeline: self.timelineid, - tag: ObjectTag::RelationBuffer(tag), + tag, }; let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; while let Some((_key, value)) = iter.next().transpose()? { @@ -812,10 +937,10 @@ impl ObjectTimeline { // // Helper functions to store different kinds of objects to the underlying ObjectStore // - fn put_page_entry(&self, tag: &BufferTag, lsn: Lsn, val: PageEntry) -> Result<()> { + fn put_page_entry(&self, tag: &ObjectTag, lsn: Lsn, val: PageEntry) -> Result<()> { let key = ObjectKey { timeline: self.timelineid, - tag: ObjectTag::RelationBuffer(*tag), + tag: *tag, }; let val = ObjectValue::Page(val); @@ -895,7 +1020,6 @@ impl<'a> ObjectHistory<'a> { fn next_result(&mut self) -> Result> { while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? { let (rel_tag, update) = match object_tag { - ObjectTag::TimelineMetadataTag => continue, ObjectTag::RelationMetadata(rel_tag) => { let entry = ObjectValue::des_relsize(&value)?; match self.handle_relation_size(rel_tag, entry) { @@ -909,6 +1033,7 @@ impl<'a> ObjectHistory<'a> { (buf_tag.rel, update) } + _ => continue, }; return Ok(Some(RelationUpdate { @@ -931,6 +1056,7 @@ enum ObjectValue { Page(PageEntry), RelationSize(RelationSizeEntry), TimelineMetadata(MetadataEntry), + SLRUTruncate, } /// diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index d1c2b5a09d..50c3f95a4a 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -34,6 +34,9 @@ pub trait ObjectStore: Send + Sync { /// correspond to any real relation. fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result>; + /// Read key greater or equal than specified + fn get_next_key(&self, key: &ObjectKey) -> Result>; + /// Iterate through all page versions of one object. /// /// Returns all page versions in descending LSN order, along with the LSN @@ -66,6 +69,17 @@ pub trait ObjectStore: Send + Sync { lsn: Lsn, ) -> Result>; + /// Iterate through objects tags. If nonrel_only, then only non-relationa data is iterated. + /// + /// This is used to implement GC and preparing tarball for new node startup + /// Returns objects in increasing key-version order. + fn list_objects<'a>( + &'a self, + timelineid: ZTimelineId, + nonrel_only: bool, + lsn: Lsn, + ) -> Result + 'a>>; + /// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion. 
fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 59b7a8bcab..d210b25468 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -28,6 +28,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn}; use crate::basebackup; use crate::branches; +use crate::object_key::ObjectTag; use crate::page_cache; use crate::repository::{BufferTag, RelTag, RelationUpdate, Update}; use crate::restore_local_repo; @@ -231,7 +232,7 @@ impl PageServerHandler { PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } PagestreamFeMessage::Read(req) => { - let buf_tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: RelTag { spcnode: req.spcnode, dbnode: req.dbnode, @@ -239,9 +240,9 @@ impl PageServerHandler { forknum: req.forknum, }, blknum: req.blkno, - }; + }); - let read_response = match timeline.get_page_at_lsn(buf_tag, req.lsn) { + let read_response = match timeline.get_page_at_lsn(tag, req.lsn) { Ok(p) => PagestreamReadResponse { ok: true, n_blocks: 0, @@ -292,14 +293,20 @@ impl PageServerHandler { // find latest snapshot let snapshot_lsn = restore_local_repo::find_latest_snapshot(&self.conf, timelineid).unwrap(); - let req_lsn = lsn.unwrap_or(snapshot_lsn); - basebackup::send_tarball_at_lsn( - &mut CopyDataSink { pgb }, - timelineid, - &timeline, - req_lsn, - snapshot_lsn, - )?; + + let req_lsn = lsn.unwrap_or_else(|| timeline.get_last_valid_lsn()); + + { + let mut writer = CopyDataSink { pgb }; + let mut basebackup = basebackup::Basebackup::new( + &mut writer, + timelineid, + &timeline, + req_lsn, + snapshot_lsn, + ); + basebackup.send_tarball()?; + } pgb.write_message(&BeMessage::CopyDone)?; debug!("CopyDone sent!"); @@ -412,18 +419,18 @@ impl postgres_backend::Handler for PageServerHandler { match relation_update.update { Update::Page { blknum, img } => { - let tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: relation_update.rel, blknum, - }; + }); timeline.put_page_image(tag, relation_update.lsn, img)?; } Update::WALRecord { blknum, rec } => { - let tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: relation_update.rel, blknum, - }; + }); timeline.put_wal_record(tag, rec)?; } @@ -545,6 +552,24 @@ impl postgres_backend::Handler for PageServerHandler { typlen: 8, ..Default::default() }, + RowDescriptor { + name: b"prep_deleted", + typoid: 20, + typlen: 8, + ..Default::default() + }, + RowDescriptor { + name: b"slru_deleted", + typoid: 20, + typlen: 8, + ..Default::default() + }, + RowDescriptor { + name: b"chkp_deleted", + typoid: 20, + typlen: 8, + ..Default::default() + }, RowDescriptor { name: b"dropped", typoid: 20, @@ -562,6 +587,9 @@ impl postgres_backend::Handler for PageServerHandler { Some(&result.n_relations.to_string().as_bytes()), Some(&result.truncated.to_string().as_bytes()), Some(&result.deleted.to_string().as_bytes()), + Some(&result.prep_deleted.to_string().as_bytes()), + Some(&result.slru_deleted.to_string().as_bytes()), + Some(&result.chkp_deleted.to_string().as_bytes()), Some(&result.dropped.to_string().as_bytes()), Some(&result.elapsed.as_millis().to_string().as_bytes()), ]))? 
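The repository.rs changes below add a get_tx_status() helper that looks up a transaction's commit status in the CLOG pages stored under ObjectTag::Clog. As a standalone sketch of the arithmetic involved (not part of this patch; it assumes the standard Postgres CLOG layout of 2 status bits per transaction and BLCKSZ = 8192, i.e. 32768 transactions per page, and the function name is made up):

    const BLCKSZ: u32 = 8192;
    const CLOG_BITS_PER_XACT: u32 = 2;
    const CLOG_XACTS_PER_BYTE: u32 = 8 / CLOG_BITS_PER_XACT; // 4
    const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ * CLOG_XACTS_PER_BYTE; // 32768

    /// Locate the CLOG page, byte, and bit shift that hold the 2-bit status of `xid`.
    fn clog_location(xid: u32) -> (u32, usize, u32) {
        let blknum = xid / CLOG_XACTS_PER_PAGE; // selects the ObjectTag::Clog(SlruBufferTag { blknum }) page
        let byte = (xid % CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_BYTE) as usize; // byte within the 8 KB page
        let shift = (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT; // bit offset within that byte
        (blknum, byte, shift)
    }

    // The status itself is then (page[byte] >> shift) & 0b11, which is what
    // transaction_id_get_status() from postgres_ffi computes for get_tx_status().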
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 9d62402ce1..1f3de164c1 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,10 +1,15 @@ +use crate::object_key::*; +use crate::waldecoder::TransactionId; use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use postgres_ffi::nonrelfile_utils::transaction_id_get_status; +use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::forknumber_to_name; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::fmt; +use std::iter::Iterator; use std::sync::Arc; use std::time::Duration; use zenith_utils::lsn::Lsn; @@ -35,8 +40,12 @@ pub trait Repository: Send + Sync { #[derive(Default)] pub struct GcResult { pub n_relations: u64, + pub inspected: u64, pub truncated: u64, pub deleted: u64, + pub prep_deleted: u64, // 2PC prepare + pub slru_deleted: u64, // SLRU (clog, multixact) + pub chkp_deleted: u64, // Checkpoints pub dropped: u64, pub elapsed: Duration, } @@ -47,7 +56,10 @@ pub trait Timeline: Send + Sync { //------------------------------------------------------------------------------ /// Look up given page in the cache. - fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result; + fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result; + + /// Look up given page in the cache. + fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result; /// Get size of relation fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result; @@ -58,6 +70,9 @@ pub trait Timeline: Send + Sync { /// Get a list of all distinct relations in given tablespace and database. fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result>; + /// Get a list of non-relational objects + fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result + 'a>>; + //------------------------------------------------------------------------------ // Public PUT functions, to update the repository with new page versions. // @@ -68,17 +83,26 @@ pub trait Timeline: Send + Sync { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>; + fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()>; + + /// Put raw data + fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>; + fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes) -> Result<()>; /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; - /// Unlink object. This method is used for marking dropped relations. + /// Unlink relation. This method is used for marking dropped relations. fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>; + /// Truncate SRLU segment + fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>; + + // Get object tag greater or equal than specified + fn get_next_tag(&self, tag: ObjectTag) -> Result>; + /// Remember the all WAL before the given LSN has been processed. /// /// The WAL receiver calls this after the put_* functions, to indicate that @@ -117,6 +141,15 @@ pub trait Timeline: Send + Sync { /// /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). 
fn gc_iteration(&self, horizon: u64) -> Result; + + // Check transaction status + fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { + let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + let clog_page = self.get_page_at_lsn(tag, lsn)?; + let status = transaction_id_get_status(xid, &clog_page[..]); + Ok(status) + } } pub trait History: Iterator> { @@ -278,11 +311,11 @@ mod tests { /// Convenience function to create a BufferTag for testing. /// Helps to keeps the tests shorter. #[allow(non_snake_case)] - fn TEST_BUF(blknum: u32) -> BufferTag { - BufferTag { + fn TEST_BUF(blknum: u32) -> ObjectTag { + ObjectTag::RelationBuffer(BufferTag { rel: TESTREL_A, blknum, - } + }) } /// Convenience function to create a page image with given string as the only content @@ -467,16 +500,17 @@ mod tests { assert_eq!(None, snapshot.next().transpose()?); // add a page and advance the last valid LSN - let buf = TEST_BUF(1); - tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; + let rel = TESTREL_A; + let tag = TEST_BUF(1); + tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; tline.advance_last_valid_lsn(Lsn(1)); let mut snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(1)); let expected_page = RelationUpdate { - rel: buf.rel, + rel: rel, lsn: Lsn(1), update: Update::Page { - blknum: buf.blknum, + blknum: 1, img: TEST_IMG("blk 1 @ lsn 1"), }, }; @@ -484,7 +518,7 @@ mod tests { assert_eq!(None, snapshot.next().transpose()?); // truncate to zero, but don't advance the last valid LSN - tline.put_truncation(buf.rel, Lsn(2), 0)?; + tline.put_truncation(rel, Lsn(2), 0)?; let mut snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(1)); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); @@ -498,7 +532,7 @@ mod tests { // TODO ordering not guaranteed by API. But currently it returns the // truncation entry before the block data. 
let expected_truncate = RelationUpdate { - rel: buf.rel, + rel: rel, lsn: Lsn(2), update: Update::Truncate { n_blocks: 0 }, }; @@ -515,15 +549,14 @@ mod tests { impl WalRedoManager for TestRedoManager { fn request_redo( &self, - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, base_img: Option, records: Vec, ) -> Result { let s = format!( - "redo for rel {} blk {} to get to {}, with {} and {} records", - tag.rel, - tag.blknum, + "redo for {:?} to get to {}, with {} and {} records", + tag, lsn, if base_img.is_some() { "base image" diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 4999a6b934..cae423c77a 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -12,18 +12,21 @@ use std::io::SeekFrom; use std::path::{Path, PathBuf}; use anyhow::Result; -use bytes::Bytes; +use bytes::{Buf, Bytes}; -use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; -use crate::waldecoder::{decode_wal_record, DecodedWALRecord, Oid, WalStreamDecoder}; -use crate::waldecoder::{XlCreateDatabase, XlSmgrTruncate, XlXactParsedRecord}; +use crate::object_key::*; +use crate::repository::*; +use crate::waldecoder::*; use crate::PageServerConf; use crate::ZTimelineId; -use postgres_ffi::pg_constants; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::{pg_constants, CheckPoint, ControlFileData}; use zenith_utils::lsn::Lsn; +const MAX_MBR_BLKNO: u32 = + pg_constants::MAX_MULTIXACT_ID / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + /// /// Find latest snapshot in a timeline's 'snapshots' directory /// @@ -63,8 +66,18 @@ pub fn import_timeline_from_postgres_datadir( None => continue, // These special files appear in the snapshot, but are not needed by the page server - Some("pg_control") => continue, - Some("pg_filenode.map") => continue, + Some("pg_control") => { + import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())? + } + Some("pg_filenode.map") => import_nonrel_file( + timeline, + lsn, + ObjectTag::FileNodeMap(DatabaseTag { + spcnode: pg_constants::GLOBALTABLESPACE_OID, + dbnode: 0, + }), + &direntry.path(), + )?, // Load any relation files into the page server _ => import_relfile( @@ -91,7 +104,15 @@ pub fn import_timeline_from_postgres_datadir( // These special files appear in the snapshot, but are not needed by the page server Some("PG_VERSION") => continue, - Some("pg_filenode.map") => continue, + Some("pg_filenode.map") => import_nonrel_file( + timeline, + lsn, + ObjectTag::FileNodeMap(DatabaseTag { + spcnode: pg_constants::DEFAULTTABLESPACE_OID, + dbnode: dboid, + }), + &direntry.path(), + )?, // Load any relation files into the page server _ => import_relfile( @@ -104,6 +125,43 @@ pub fn import_timeline_from_postgres_datadir( } } } + for entry in fs::read_dir(path.join("pg_xact"))? { + let entry = entry?; + import_slru_file( + timeline, + lsn, + |blknum| ObjectTag::Clog(SlruBufferTag { blknum }), + &entry.path(), + )?; + } + for entry in fs::read_dir(path.join("pg_multixact").join("members"))? { + let entry = entry?; + import_slru_file( + timeline, + lsn, + |blknum| ObjectTag::MultiXactMembers(SlruBufferTag { blknum }), + &entry.path(), + )?; + } + for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? { + let entry = entry?; + import_slru_file( + timeline, + lsn, + |blknum| ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }), + &entry.path(), + )?; + } + for entry in fs::read_dir(path.join("pg_twophase"))? 
{ + let entry = entry?; + let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?; + import_nonrel_file( + timeline, + lsn, + ObjectTag::TwoPhase(PrepareTag { xid }), + &entry.path(), + )?; + } // TODO: Scan pg_tblspc timeline.checkpoint()?; @@ -136,7 +194,7 @@ fn import_relfile( let r = file.read_exact(&mut buf); match r { Ok(_) => { - let tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: RelTag { spcnode: spcoid, dbnode: dboid, @@ -144,13 +202,61 @@ fn import_relfile( forknum, }, blknum, - }; + }); timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?; - /* - if oldest_lsn == 0 || p.lsn < oldest_lsn { - oldest_lsn = p.lsn; + } + + // TODO: UnexpectedEof is expected + Err(e) => match e.kind() { + std::io::ErrorKind::UnexpectedEof => { + // reached EOF. That's expected. + // FIXME: maybe check that we read the full length of the file? + break; } - */ + _ => { + error!("error reading file: {:?} ({})", path, e); + break; + } + }, + }; + blknum += 1; + } + + Ok(()) +} + +fn import_nonrel_file( + timeline: &dyn Timeline, + lsn: Lsn, + tag: ObjectTag, + path: &Path, +) -> Result<()> { + let mut file = File::open(path)?; + let mut buffer = Vec::new(); + // read the whole file + file.read_to_end(&mut buffer)?; + + timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]))?; + Ok(()) +} + +fn import_slru_file( + timeline: &dyn Timeline, + lsn: Lsn, + gen_tag: fn(blknum: u32) -> ObjectTag, + path: &Path, +) -> Result<()> { + // Does it look like a relation file? + + let mut file = File::open(path)?; + let mut buf: [u8; 8192] = [0u8; 8192]; + let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; + let mut blknum: u32 = segno * pg_constants::SLRU_PAGES_PER_SEGMENT; + loop { + let r = file.read_exact(&mut buf); + match r { + Ok(_) => { + timeline.put_page_image(gen_tag(blknum), lsn, Bytes::copy_from_slice(&buf))?; } // TODO: UnexpectedEof is expected @@ -180,6 +286,16 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut segno = startpoint.segment_number(pg_constants::WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; + + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + if checkpoint.nextXid.value == 0 { + let pg_control_bytes = + timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, startpoint)?; + let pg_control = ControlFileData::decode(&pg_control_bytes)?; + checkpoint = pg_control.checkPointCopy; + } + loop { // FIXME: assume postgresql tli 1 for now let filename = XLogFileName(1, segno, pg_constants::WAL_SEGMENT_SIZE); @@ -217,11 +333,12 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: if rec.is_err() { // Assume that an error means we've reached the end of // a partial WAL record. So that's ok. 
+ trace!("WAL decoder error {:?}", rec); break; } if let Some((lsn, recdata)) = rec.unwrap() { let decoded = decode_wal_record(recdata.clone()); - save_decoded_record(timeline, &decoded, recdata, lsn)?; + save_decoded_record(&mut checkpoint, timeline, &decoded, recdata, lsn)?; last_lsn = lsn; } else { break; @@ -240,6 +357,8 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: offset = 0; } info!("reached end of WAL at {}", last_lsn); + let checkpoint_bytes = checkpoint.encode(); + timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes)?; Ok(()) } @@ -248,15 +367,18 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: /// relations/pages that the record affects. /// pub fn save_decoded_record( + checkpoint: &mut CheckPoint, timeline: &dyn Timeline, decoded: &DecodedWALRecord, recdata: Bytes, lsn: Lsn, ) -> Result<()> { + checkpoint.update_next_xid(decoded.xl_xid); + // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block. for blk in decoded.blocks.iter() { - let tag = BufferTag { + let tag = ObjectTag::RelationBuffer(BufferTag { rel: RelTag { spcnode: blk.rnode_spcnode, dbnode: blk.rnode_dbnode, @@ -264,7 +386,7 @@ pub fn save_decoded_record( forknum: blk.forknum as u8, }, blknum: blk.blkno, - }; + }); let rec = WALRecord { lsn, @@ -276,15 +398,18 @@ pub fn save_decoded_record( timeline.put_wal_record(tag, rec)?; } + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + // Handle a few special record types if decoded.xl_rmid == pg_constants::RM_SMGR_ID && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_SMGR_TRUNCATE { - let truncate = XlSmgrTruncate::decode(&decoded); + let truncate = XlSmgrTruncate::decode(&mut buf); save_xlog_smgr_truncate(timeline, lsn, &truncate)?; } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) == pg_constants::XLOG_DBASE_CREATE { - let createdb = XlCreateDatabase::decode(&decoded); + let createdb = XlCreateDatabase::decode(&mut buf); save_xlog_dbase_create(timeline, lsn, &createdb)?; } else { // TODO @@ -292,6 +417,39 @@ pub fn save_decoded_record( } } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); + } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID { + let blknum = buf.get_u32_le(); + let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; + let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + if info == pg_constants::CLOG_ZEROPAGE { + let rec = WALRecord { + lsn, + will_init: true, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record(tag, rec)?; + } else { + assert!(info == pg_constants::CLOG_TRUNCATE); + checkpoint.oldestXid = buf.get_u32_le(); + checkpoint.oldestXidDB = buf.get_u32_le(); + trace!( + "RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}", + blknum, + checkpoint.oldestXid, + checkpoint.oldestXidDB + ); + if let Some(ObjectTag::Clog(first_slru_tag)) = + timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))? 
+ { + for trunc_blknum in first_slru_tag.blknum..=blknum { + let tag = ObjectTag::Clog(SlruBufferTag { + blknum: trunc_blknum, + }); + timeline.put_slru_truncate(tag, lsn)?; + } + } + } } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; if info == pg_constants::XLOG_XACT_COMMIT @@ -299,11 +457,75 @@ pub fn save_decoded_record( || info == pg_constants::XLOG_XACT_ABORT || info == pg_constants::XLOG_XACT_ABORT_PREPARED { - let parsed_xact = XlXactParsedRecord::decode(&decoded); - save_xact_record(timeline, lsn, &parsed_xact)?; + let parsed_xact = XlXactParsedRecord::decode(&mut buf, decoded.xl_xid, decoded.xl_info); + save_xact_record(timeline, lsn, &parsed_xact, decoded)?; + } else if info == pg_constants::XLOG_XACT_PREPARE { + let rec = WALRecord { + lsn, + will_init: true, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record( + ObjectTag::TwoPhase(PrepareTag { + xid: decoded.xl_xid, + }), + rec, + )?; + } + } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + let blknum = buf.get_u32_le(); + let rec = WALRecord { + lsn, + will_init: true, + rec: recdata.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + let tag = if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }) + } else { + ObjectTag::MultiXactMembers(SlruBufferTag { blknum }) + }; + timeline.put_wal_record(tag, rec)?; + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + save_multixact_create_record(checkpoint, timeline, lsn, &xlrec, decoded)?; + } else if info == pg_constants::XLOG_MULTIXACT_TRUNCATE_ID { + let xlrec = XlMultiXactTruncate::decode(&mut buf); + save_multixact_truncate_record(checkpoint, timeline, lsn, &xlrec)?; + } + } else if decoded.xl_rmid == pg_constants::RM_RELMAP_ID { + let xlrec = XlRelmapUpdate::decode(&mut buf); + save_relmap_record(timeline, lsn, &xlrec, decoded)?; + } else if decoded.xl_rmid == pg_constants::RM_XLOG_ID { + let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_NEXTOID { + let next_oid = buf.get_u32_le(); + if next_oid > checkpoint.nextOid { + checkpoint.nextOid = next_oid; + } + } else if info == pg_constants::XLOG_CHECKPOINT_ONLINE + || info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN + { + let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT]; + let mut buf = decoded.record.clone(); + buf.advance(decoded.main_data_offset); + buf.copy_to_slice(&mut checkpoint_bytes); + let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes).unwrap(); + trace!( + "xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}", + xlog_checkpoint.oldestXid, + checkpoint.oldestXid + ); + if (checkpoint.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 { + checkpoint.oldestXid = xlog_checkpoint.oldestXid; + } } } - // Now that this record has been handled, let the repository know that // it is up-to-date to this LSN timeline.advance_last_record_lsn(lsn); @@ -344,18 +566,18 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab // Copy content for blknum in 0..nblocks { - let src_key = BufferTag { + let src_key = ObjectTag::RelationBuffer(BufferTag { rel: src_rel, blknum, - }; - let dst_key = 
BufferTag { + }); + let dst_key = ObjectTag::RelationBuffer(BufferTag { rel: dst_rel, blknum, - }; + }); - let content = timeline.get_page_at_lsn(src_key, req_lsn)?; + let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?; - info!("copying block {:?} to {:?}", src_key, dst_key); + debug!("copying block {:?} to {:?}", src_key, dst_key); timeline.put_page_image(dst_key, lsn, content)?; num_blocks_copied += 1; @@ -368,6 +590,23 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab num_rels_copied += 1; } + // Copy relfilemap + for tag in timeline.list_nonrels(req_lsn)? { + match tag { + ObjectTag::FileNodeMap(db) => { + if db.spcnode == src_tablespace_id && db.dbnode == src_db_id { + let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?; + let new_tag = ObjectTag::FileNodeMap(DatabaseTag { + spcnode: tablespace_id, + dbnode: db_id, + }); + timeline.put_page_image(new_tag, lsn, img)?; + break; + } + } + _ => {} // do nothing + } + } info!( "Created database {}/{}, copied {} blocks in {} rels at {}", tablespace_id, db_id, num_blocks_copied, num_rels_copied, lsn @@ -440,8 +679,32 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca /// Subroutine of save_decoded_record(), to handle an XLOG_XACT_* records. /// /// We are currently only interested in the dropped relations. -fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord) -> Result<()> { - for xnode in &rec.xnodes { +fn save_xact_record( + timeline: &dyn Timeline, + lsn: Lsn, + parsed: &XlXactParsedRecord, + decoded: &DecodedWALRecord, +) -> Result<()> { + // Record update of CLOG page + let mut blknum = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; + let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + let rec = WALRecord { + lsn, + will_init: false, + rec: decoded.record.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + timeline.put_wal_record(tag, rec.clone())?; + + for subxact in &parsed.subxacts { + let subxact_blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if subxact_blknum != blknum { + blknum = subxact_blknum; + let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + timeline.put_wal_record(tag, rec.clone())?; + } + } + for xnode in &parsed.xnodes { for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { let rel_tag = RelTag { forknum, @@ -454,3 +717,113 @@ fn save_xact_record(timeline: &dyn Timeline, lsn: Lsn, rec: &XlXactParsedRecord) } Ok(()) } + +fn save_multixact_create_record( + checkpoint: &mut CheckPoint, + timeline: &dyn Timeline, + lsn: Lsn, + xlrec: &XlMultiXactCreate, + decoded: &DecodedWALRecord, +) -> Result<()> { + let rec = WALRecord { + lsn, + will_init: false, + rec: decoded.record.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + let blknum = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); + timeline.put_wal_record(tag, rec.clone())?; + + let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = + (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + // The members SLRU can, in contrast to the offsets one, be filled to almost + // the full range at once. So we need to handle wraparound. 
+ let mut blknum = first_mbr_blkno; + loop { + // Update members page + let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); + timeline.put_wal_record(tag, rec.clone())?; + + if blknum == last_mbr_blkno { + // last block inclusive + break; + } + + // handle wraparound + if blknum == MAX_MBR_BLKNO { + blknum = 0; + } else { + blknum += 1; + } + } + if xlrec.mid >= checkpoint.nextMulti { + checkpoint.nextMulti = xlrec.mid + 1; + } + if xlrec.moff + xlrec.nmembers > checkpoint.nextMultiOffset { + checkpoint.nextMultiOffset = xlrec.moff + xlrec.nmembers; + } + let max_mbr_xid = xlrec + .members + .iter() + .fold(0u32, |acc, mbr| if mbr.xid > acc { mbr.xid } else { acc }); + checkpoint.update_next_xid(max_mbr_xid); + Ok(()) +} + +fn save_multixact_truncate_record( + checkpoint: &mut CheckPoint, + timeline: &dyn Timeline, + lsn: Lsn, + xlrec: &XlMultiXactTruncate, +) -> Result<()> { + checkpoint.oldestMulti = xlrec.end_trunc_off; + checkpoint.oldestMultiDB = xlrec.oldest_multi_db; + let first_off_blkno = xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let last_off_blkno = xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + // Delete all the segments except the last one. The last segment can still + // contain, possibly partially, valid data. + for blknum in first_off_blkno..last_off_blkno { + let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); + timeline.put_slru_truncate(tag, lsn)?; + } + let first_mbr_blkno = xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_blkno = xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + // The members SLRU can, in contrast to the offsets one, be filled to almost + // the full range at once. So we need to handle wraparound. + let mut blknum = first_mbr_blkno; + // Delete all the segments but the last one. The last segment can still + // contain, possibly partially, valid data. 
+ while blknum != last_mbr_blkno { + let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); + timeline.put_slru_truncate(tag, lsn)?; + // handle wraparound + if blknum == MAX_MBR_BLKNO { + blknum = 0; + } else { + blknum += 1; + } + } + Ok(()) +} + +fn save_relmap_record( + timeline: &dyn Timeline, + lsn: Lsn, + xlrec: &XlRelmapUpdate, + decoded: &DecodedWALRecord, +) -> Result<()> { + let rec = WALRecord { + lsn, + will_init: true, + rec: decoded.record.clone(), + main_data_offset: decoded.main_data_offset as u32, + }; + let tag = ObjectTag::FileNodeMap(DatabaseTag { + spcnode: xlrec.tsid, + dbnode: xlrec.dbid, + }); + timeline.put_wal_record(tag, rec)?; + Ok(()) +} diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index bf016a3849..3164def5a9 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -94,6 +94,21 @@ impl ObjectStore for RocksObjectStore { } } + fn get_next_key(&self, key: &ObjectKey) -> Result> { + let mut iter = self.db.raw_iterator(); + let search_key = StorageKey { + obj_key: key.clone(), + lsn: Lsn(0), + }; + iter.seek(search_key.ser()?); + if !iter.valid() { + Ok(None) + } else { + let key = StorageKey::des(iter.key().unwrap())?; + Ok(Some(key.obj_key.clone())) + } + } + fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> { self.db.put( StorageKey::ser(&StorageKey { @@ -124,6 +139,17 @@ impl ObjectStore for RocksObjectStore { Ok(Box::new(iter)) } + /// Iterate through all timeline objects + fn list_objects<'a>( + &'a self, + timeline: ZTimelineId, + nonrel_only: bool, + lsn: Lsn, + ) -> Result + 'a>> { + let iter = RocksObjectIter::new(&self.db, timeline, nonrel_only, lsn)?; + Ok(Box::new(iter)) + } + /// Get a list of all distinct relations in given tablespace and database. /// /// TODO: This implementation is very inefficient, it scans @@ -349,3 +375,69 @@ impl<'r> RocksObjects<'r> { Ok(None) } } + +/// +/// Iterator for `list_objects`. 
Returns all objects preceeding specified LSN +/// +struct RocksObjectIter<'a> { + timeline: ZTimelineId, + key: StorageKey, + nonrel_only: bool, + lsn: Lsn, + dbiter: rocksdb::DBRawIterator<'a>, +} +impl<'a> RocksObjectIter<'a> { + fn new( + db: &'a rocksdb::DB, + timeline: ZTimelineId, + nonrel_only: bool, + lsn: Lsn, + ) -> Result> { + let key = StorageKey { + obj_key: ObjectKey { + timeline, + tag: ObjectTag::FirstTag, + }, + lsn: Lsn(0), + }; + let dbiter = db.raw_iterator(); + Ok(RocksObjectIter { + key, + timeline, + nonrel_only, + lsn, + dbiter, + }) + } +} +impl<'a> Iterator for RocksObjectIter<'a> { + type Item = ObjectTag; + + fn next(&mut self) -> std::option::Option { + loop { + self.dbiter.seek(StorageKey::ser(&self.key).unwrap()); + if !self.dbiter.valid() { + return None; + } + let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap(); + if key.obj_key.timeline != self.timeline { + // End of this timeline + return None; + } + self.key = key.clone(); + self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions + if key.lsn <= self.lsn { + // visible in this snapshot + if self.nonrel_only { + match key.obj_key.tag { + ObjectTag::RelationMetadata(_) => return None, + ObjectTag::RelationBuffer(_) => return None, + _ => return Some(key.obj_key.tag), + } + } else { + return Some(key.obj_key.tag); + } + } + } + } +} diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index e432459cc9..52c1770b0f 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -9,7 +9,6 @@ use postgres_ffi::xlog_utils::*; use postgres_ffi::XLogLongPageHeaderData; use postgres_ffi::XLogPageHeaderData; use postgres_ffi::XLogRecord; - use std::cmp::min; use thiserror::Error; use zenith_utils::lsn::Lsn; @@ -18,7 +17,9 @@ pub type Oid = u32; pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; -pub type TimestampTz = i64; +pub type MultiXactId = TransactionId; +pub type MultiXactOffset = u32; +pub type MultiXactStatus = u32; #[allow(dead_code)] pub struct WalStreamDecoder { @@ -245,6 +246,7 @@ impl DecodedBkpBlock { } pub struct DecodedWALRecord { + pub xl_xid: TransactionId, pub xl_info: u8, pub xl_rmid: u8, pub record: Bytes, // raw XLogRecord @@ -261,6 +263,24 @@ pub struct RelFileNode { pub relnode: Oid, /* relation */ } +#[repr(C)] +#[derive(Debug)] +pub struct XlRelmapUpdate { + pub dbid: Oid, /* database ID, or 0 for shared map */ + pub tsid: Oid, /* database's tablespace, or pg_global */ + pub nbytes: i32, /* size of relmap data */ +} + +impl XlRelmapUpdate { + pub fn decode(buf: &mut Bytes) -> XlRelmapUpdate { + XlRelmapUpdate { + dbid: buf.get_u32_le(), + tsid: buf.get_u32_le(), + nbytes: buf.get_i32_le(), + } + } +} + #[repr(C)] #[derive(Debug)] pub struct XlSmgrTruncate { @@ -270,9 +290,7 @@ pub struct XlSmgrTruncate { } impl XlSmgrTruncate { - pub fn decode(decoded: &DecodedWALRecord) -> XlSmgrTruncate { - let mut buf = decoded.record.clone(); - buf.advance((XLOG_SIZE_OF_XLOG_RECORD + 2) as usize); + pub fn decode(buf: &mut Bytes) -> XlSmgrTruncate { XlSmgrTruncate { blkno: buf.get_u32_le(), rnode: RelFileNode { @@ -295,9 +313,7 @@ pub struct XlCreateDatabase { } impl XlCreateDatabase { - pub fn decode(decoded: &DecodedWALRecord) -> XlCreateDatabase { - let mut buf = decoded.record.clone(); - buf.advance((XLOG_SIZE_OF_XLOG_RECORD + 2) as usize); + pub fn decode(buf: &mut Bytes) -> XlCreateDatabase { XlCreateDatabase { db_id: buf.get_u32_le(), tablespace_id: buf.get_u32_le(), @@ -392,6 +408,7 @@ impl 
XlHeapUpdate { /// #[derive(Debug)] pub struct XlXactParsedRecord { + pub xid: TransactionId, pub info: u8, pub xact_time: TimestampTz, pub xinfo: u32, @@ -408,15 +425,12 @@ impl XlXactParsedRecord { /// Decode a XLOG_XACT_COMMIT/ABORT/COMMIT_PREPARED/ABORT_PREPARED /// record. This should agree with the ParseCommitRecord and ParseAbortRecord /// functions in PostgreSQL (in src/backend/access/rmgr/xactdesc.c) - pub fn decode(decoded: &DecodedWALRecord) -> XlXactParsedRecord { - let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; - let mut buf = decoded.record.clone(); - buf.advance(decoded.main_data_offset); - + pub fn decode(buf: &mut Bytes, mut xid: TransactionId, xl_info: u8) -> XlXactParsedRecord { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; // The record starts with time of commit/abort - let xact_time = buf.get_i64_le(); + let xact_time = buf.get_u64_le(); let xinfo; - if decoded.xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { + if xl_info & pg_constants::XLOG_XACT_HAS_INFO != 0 { xinfo = buf.get_u32_le(); } else { xinfo = 0; @@ -466,10 +480,11 @@ impl XlXactParsedRecord { } } if xinfo & pg_constants::XACT_XINFO_HAS_TWOPHASE != 0 { - let _xid = buf.get_u32_le(); + xid = buf.get_u32_le(); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); } XlXactParsedRecord { + xid, info, xact_time, xinfo, @@ -481,6 +496,74 @@ impl XlXactParsedRecord { } } +#[repr(C)] +#[derive(Debug)] +pub struct MultiXactMember { + pub xid: TransactionId, + pub status: MultiXactStatus, +} + +impl MultiXactMember { + pub fn decode(buf: &mut Bytes) -> MultiXactMember { + MultiXactMember { + xid: buf.get_u32_le(), + status: buf.get_u32_le(), + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactCreate { + pub mid: MultiXactId, /* new MultiXact's ID */ + pub moff: MultiXactOffset, /* its starting offset in members file */ + pub nmembers: u32, /* number of member XIDs */ + pub members: Vec, +} + +impl XlMultiXactCreate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactCreate { + let mid = buf.get_u32_le(); + let moff = buf.get_u32_le(); + let nmembers = buf.get_u32_le(); + let mut members = Vec::new(); + for _ in 0..nmembers { + members.push(MultiXactMember::decode(buf)); + } + XlMultiXactCreate { + mid, + moff, + nmembers, + members, + } + } +} + +#[repr(C)] +#[derive(Debug)] +pub struct XlMultiXactTruncate { + pub oldest_multi_db: Oid, + /* to-be-truncated range of multixact offsets */ + pub start_trunc_off: MultiXactId, /* just for completeness' sake */ + pub end_trunc_off: MultiXactId, + + /* to-be-truncated range of multixact members */ + pub start_trunc_memb: MultiXactOffset, + pub end_trunc_memb: MultiXactOffset, +} + +impl XlMultiXactTruncate { + pub fn decode(buf: &mut Bytes) -> XlMultiXactTruncate { + XlMultiXactTruncate { + oldest_multi_db: buf.get_u32_le(), + start_trunc_off: buf.get_u32_le(), + end_trunc_off: buf.get_u32_le(), + start_trunc_memb: buf.get_u32_le(), + end_trunc_memb: buf.get_u32_le(), + } + } +} + /// Main routine to decode a WAL record and figure out which blocks are modified // // See xlogrecord.h for details @@ -806,6 +889,7 @@ pub fn decode_wal_record(record: Bytes) -> DecodedWALRecord { } DecodedWALRecord { + xl_xid: xlogrec.xl_xid, xl_info: xlogrec.xl_info, xl_rmid: xlogrec.xl_rmid, record, diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index e1c80932a1..a4f0029674 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -4,6 +4,7 @@ //! //! We keep one WAL receiver active per timeline. 
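For illustration (not part of the patch), the XlMultiXactCreate decoder added above reads plain little-endian fields followed by (xid, status) pairs, so a synthetic record can be round-tripped in a small test. The byte values below are invented, and the sketch assumes XlMultiXactCreate from pageserver/src/waldecoder.rs is in scope:

#[test]
fn decode_synthetic_multixact_create() {
    use bytes::BufMut;
    let mut b = bytes::BytesMut::new();
    b.put_u32_le(17); // mid: the newly created MultiXactId
    b.put_u32_le(100); // moff: its starting offset in the members SLRU
    b.put_u32_le(2); // nmembers
    b.put_u32_le(1000); // member 0: xid
    b.put_u32_le(0); // member 0: status
    b.put_u32_le(1001); // member 1: xid
    b.put_u32_le(0); // member 1: status
    let mut raw = b.freeze();
    let rec = XlMultiXactCreate::decode(&mut raw);
    assert_eq!((rec.mid, rec.moff, rec.nmembers), (17, 100, 2));
    assert_eq!(rec.members[1].xid, 1001);
}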
+use crate::object_key::*; use crate::page_cache; use crate::restore_local_repo; use crate::waldecoder::*; @@ -15,8 +16,8 @@ use log::*; use postgres::fallible_iterator::FallibleIterator; use postgres::replication::ReplicationIter; use postgres::{Client, NoTls, SimpleQueryMessage, SimpleQueryRow}; -use postgres_ffi::pg_constants; use postgres_ffi::xlog_utils::*; +use postgres_ffi::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; use std::cmp::{max, min}; @@ -168,6 +169,10 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; + trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); + while let Some(replication_message) = physical_stream.next()? { let status_update = match replication_message { ReplicationMessage::XLogData(xlog_data) => { @@ -185,9 +190,25 @@ fn walreceiver_main( waldecoder.feed_bytes(data); while let Some((lsn, recdata)) = waldecoder.poll_decode()? { + let old_checkpoint_bytes = checkpoint.encode(); let decoded = decode_wal_record(recdata.clone()); - restore_local_repo::save_decoded_record(&*timeline, &decoded, recdata, lsn)?; + restore_local_repo::save_decoded_record( + &mut checkpoint, + &*timeline, + &decoded, + recdata, + lsn, + )?; last_rec_lsn = lsn; + + let new_checkpoint_bytes = checkpoint.encode(); + if new_checkpoint_bytes != old_checkpoint_bytes { + timeline.put_page_image( + ObjectTag::Checkpoint, + lsn, + new_checkpoint_bytes, + )?; + } } // Update the last_valid LSN value in the page cache one more time. We updated diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f410693178..e5cdf10ff6 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -15,7 +15,8 @@ //! TODO: Even though the postgres code runs in a separate process, //! it's not a secure sandbox. //! -use bytes::{BufMut, Bytes, BytesMut}; +use byteorder::{ByteOrder, LittleEndian}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; use std::cell::RefCell; use std::fs; @@ -35,9 +36,15 @@ use tokio::time::timeout; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; +use crate::object_key::*; use crate::repository::BufferTag; use crate::repository::WALRecord; +use crate::waldecoder::XlXactParsedRecord; +use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; use crate::PageServerConf; +use postgres_ffi::nonrelfile_utils::transaction_id_set_status; +use postgres_ffi::pg_constants; +use postgres_ffi::XLogRecord; /// /// WAL Redo Manager is responsible for replaying WAL records. @@ -52,7 +59,7 @@ pub trait WalRedoManager: Send + Sync { /// the reords. 
fn request_redo( &self, - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, base_img: Option, records: Vec, @@ -68,7 +75,7 @@ pub struct DummyRedoManager {} impl crate::walredo::WalRedoManager for DummyRedoManager { fn request_redo( &self, - _tag: BufferTag, + _tag: ObjectTag, _lsn: Lsn, _base_img: Option, _records: Vec, @@ -97,7 +104,7 @@ struct PostgresRedoManagerInternal { #[derive(Debug)] struct WalRedoRequest { - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, base_img: Option, @@ -159,7 +166,7 @@ impl WalRedoManager for PostgresRedoManager { /// fn request_redo( &self, - tag: BufferTag, + tag: ObjectTag, lsn: Lsn, base_img: Option, records: Vec, @@ -186,6 +193,24 @@ impl WalRedoManager for PostgresRedoManager { } } +fn mx_offset_to_flags_offset(xid: MultiXactId) -> usize { + ((xid / pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP as u32) as u16 + % pg_constants::MULTIXACT_MEMBERGROUPS_PER_PAGE + * pg_constants::MULTIXACT_MEMBERGROUP_SIZE) as usize +} + +fn mx_offset_to_flags_bitshift(xid: MultiXactId) -> u16 { + (xid as u16) % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP + * pg_constants::MXACT_MEMBER_BITS_PER_XACT +} + +/* Location (byte offset within page) of TransactionId of given member */ +fn mx_offset_to_member_offset(xid: MultiXactId) -> usize { + mx_offset_to_flags_offset(xid) + + (pg_constants::MULTIXACT_FLAGBYTES_PER_GROUP + + (xid as u16 % pg_constants::MULTIXACT_MEMBERS_PER_MEMBERGROUP) * 4) as usize +} + /// /// WAL redo thread /// @@ -249,7 +274,151 @@ impl PostgresRedoManagerInternal { let start = Instant::now(); let apply_result: Result; - apply_result = process.apply_wal_records(tag, base_img, records).await; + if let ObjectTag::RelationBuffer(buf_tag) = tag { + // Relational WAL records are applied using wal-redo-postgres + apply_result = process.apply_wal_records(buf_tag, base_img, records).await; + } else { + // Non-relational WAL records we apply ourselves. + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let mut page = BytesMut::new(); + if let Some(fpi) = base_img { + // If full-page image is provided, then use it... + page.extend_from_slice(&fpi[..]); + } else { + // otherwise initialize page with zeros + page.extend_from_slice(&ZERO_PAGE); + } + // Apply all collected WAL records + for record in records { + let mut buf = record.rec.clone(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication.
+ let xlogrec = XLogRecord::from_bytes(&mut buf); + + //move to main data + // TODO probably, we should store some records in our special format + // to avoid this weird parsing on replay + let skip = (record.main_data_offset - pg_constants::SIZEOF_XLOGRECORD) as usize; + if buf.remaining() > skip { + buf.advance(skip); + } + + if xlogrec.xl_rmid == pg_constants::RM_CLOG_ID { + let info = xlogrec.xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + // The only operation we need to implement is CLOG_ZEROPAGE + page.copy_from_slice(&ZERO_PAGE); + } + } else if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { + // Transaction manager stuff + let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; + let tag_blknum = match tag { + ObjectTag::Clog(slru) => slru.blknum, + ObjectTag::TwoPhase(_) => { + assert!(info == pg_constants::XLOG_XACT_PREPARE); + trace!("Apply prepare {} record", xlogrec.xl_xid); + page.clear(); + page.extend_from_slice(&buf[..]); + continue; + } + _ => panic!("Not valid XACT object tag {:?}", tag), + }; + let parsed_xact = + XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); + if parsed_xact.info == pg_constants::XLOG_XACT_COMMIT + || parsed_xact.info == pg_constants::XLOG_XACT_COMMIT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_COMMITTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // only update xids on the requested page + if tag_blknum == blkno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_SUB_COMMITTED, + &mut page, + ); + } + } + } else if parsed_xact.info == pg_constants::XLOG_XACT_ABORT + || parsed_xact.info == pg_constants::XLOG_XACT_ABORT_PREPARED + { + transaction_id_set_status( + parsed_xact.xid, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + for subxact in &parsed_xact.subxacts { + let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + // only update xids on the requested page + if tag_blknum == blkno { + transaction_id_set_status( + *subxact, + pg_constants::TRANSACTION_STATUS_ABORTED, + &mut page, + ); + } + } + } + } else if xlogrec.xl_rmid == pg_constants::RM_MULTIXACT_ID { + // Multiexact operations + let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE + || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE + { + // Just need to ero page + page.copy_from_slice(&ZERO_PAGE); + } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { + let xlrec = XlMultiXactCreate::decode(&mut buf); + if let ObjectTag::MultiXactMembers(slru) = tag { + for i in 0..xlrec.nmembers { + let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + if blkno == slru.blknum { + // update only target block + let offset = xlrec.moff + i; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + let mut flagsval = + LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= + !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) + << bshift); + flagsval |= xlrec.members[i as usize].status << bshift; + LittleEndian::write_u32( + &mut page[flagsoff..flagsoff + 4], + flagsval, + ); + LittleEndian::write_u32( + &mut page[memberoff..memberoff + 4], + xlrec.members[i as usize].xid, + ); + } + } + } else { + // Multixact offsets SLRU + let offs = (xlrec.mid % 
pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 + * 4) as usize; + LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); + } + } else { + panic!(); + } + } else if xlogrec.xl_rmid == pg_constants::RM_RELMAP_ID { + // Relation map file has size 512 bytes + page.clear(); + page.extend_from_slice(&buf[12..]); // skip xl_relmap_update + assert!(page.len() == 512); // size of pg_filenode.map + } + } + + apply_result = Ok::(page.freeze()); + } let duration = start.elapsed(); diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index 56dd1f1003..e7553721b2 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -4,6 +4,7 @@ include!(concat!(env!("OUT_DIR"), "/bindings.rs")); pub mod controlfile_utils; +pub mod nonrelfile_utils; pub mod pg_constants; pub mod relfile_utils; pub mod xlog_utils; diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index d0d70210ac..337d94a2a4 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -26,6 +26,19 @@ pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; pub const SMGR_TRUNCATE_VM: u32 = 0x0002; pub const SMGR_TRUNCATE_FSM: u32 = 0x0004; +// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and +// --with-segsize=SEGSIZE, but assume the defaults for now. +pub const BLCKSZ: u16 = 8192; +pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); + +// +// constants from clog.h +// +pub const CLOG_XACTS_PER_BYTE: u32 = 4; +pub const CLOG_XACTS_PER_PAGE: u32 = BLCKSZ as u32 * CLOG_XACTS_PER_BYTE; +pub const CLOG_BITS_PER_XACT: u8 = 2; +pub const CLOG_XACT_BITMASK: u8 = (1 << CLOG_BITS_PER_XACT) - 1; + // // Constants from visbilitymap.h // @@ -33,6 +46,14 @@ pub const SIZE_OF_PAGE_HEADER: u16 = 24; pub const BITS_PER_HEAPBLOCK: u16 = 2; pub const HEAPBLOCKS_PER_PAGE: u16 = (BLCKSZ - SIZE_OF_PAGE_HEADER) * 8 / BITS_PER_HEAPBLOCK; +pub const TRANSACTION_STATUS_IN_PROGRESS: u8 = 0x00; +pub const TRANSACTION_STATUS_COMMITTED: u8 = 0x01; +pub const TRANSACTION_STATUS_ABORTED: u8 = 0x02; +pub const TRANSACTION_STATUS_SUB_COMMITTED: u8 = 0x03; + +pub const CLOG_ZEROPAGE: u8 = 0x00; +pub const CLOG_TRUNCATE: u8 = 0x10; + // From xact.h pub const XLOG_XACT_COMMIT: u8 = 0x00; pub const XLOG_XACT_PREPARE: u8 = 0x10; @@ -40,6 +61,10 @@ pub const XLOG_XACT_ABORT: u8 = 0x20; pub const XLOG_XACT_COMMIT_PREPARED: u8 = 0x30; pub const XLOG_XACT_ABORT_PREPARED: u8 = 0x40; +// From slru.h +pub const SLRU_PAGES_PER_SEGMENT: u32 = 32; +pub const SLRU_SEG_SIZE: usize = BLCKSZ as usize * SLRU_PAGES_PER_SEGMENT as usize; + /* mask for filtering opcodes out of xl_info */ pub const XLOG_XACT_OPMASK: u8 = 0x70; pub const XLOG_HEAP_OPMASK: u8 = 0x70; @@ -60,8 +85,32 @@ pub const XACT_XINFO_HAS_TWOPHASE: u32 = 1u32 << 4; // pub const XACT_XINFO_HAS_GID: u32 = 1u32 << 7; // From pg_control.h and rmgrlist.h +pub const XLOG_NEXTOID: u8 = 0x30; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; +pub const DB_SHUTDOWNED: u32 = 1; + +// From multixact.h +pub const FIRST_MULTIXACT_ID: u32 = 1; +pub const MAX_MULTIXACT_ID: u32 = 0xFFFFFFFF; + +pub const XLOG_MULTIXACT_ZERO_OFF_PAGE: u8 = 0x00; +pub const XLOG_MULTIXACT_ZERO_MEM_PAGE: u8 = 0x10; +pub const XLOG_MULTIXACT_CREATE_ID: u8 = 0x20; +pub const XLOG_MULTIXACT_TRUNCATE_ID: u8 = 0x30; + +pub const MULTIXACT_OFFSETS_PER_PAGE: u16 = BLCKSZ / 4; +pub const MXACT_MEMBER_BITS_PER_XACT: u16 = 8; +pub const MXACT_MEMBER_FLAGS_PER_BYTE: u16 = 1; +pub const MULTIXACT_FLAGBYTES_PER_GROUP: u16 = 4;
+pub const MULTIXACT_MEMBERS_PER_MEMBERGROUP: u16 = + MULTIXACT_FLAGBYTES_PER_GROUP * MXACT_MEMBER_FLAGS_PER_BYTE; +/* size in bytes of a complete group */ +pub const MULTIXACT_MEMBERGROUP_SIZE: u16 = + 4 * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP; +pub const MULTIXACT_MEMBERGROUPS_PER_PAGE: u16 = BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE; +pub const MULTIXACT_MEMBERS_PER_PAGE: u16 = + MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP; // From heapam_xlog.h pub const XLOG_HEAP_INSERT: u8 = 0x00; @@ -101,11 +150,6 @@ pub const XLOG_TBLSPC_DROP: u8 = 0x10; pub const SIZEOF_XLOGRECORD: u32 = 24; -// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and -// --with-segsize=SEGSIZE, but assume the defaults for now. -pub const BLCKSZ: u16 = 8192; -pub const RELSEG_SIZE: u32 = 1024 * 1024 * 1024 / (BLCKSZ as u32); - // // from xlogrecord.h // @@ -136,3 +180,8 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; /* FIXME: pageserver should request wal_seg_size from compute node */ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; + +pub const XLOG_BLCKSZ: usize = 8192; +pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; +pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; +pub const XLP_LONG_HEADER: u16 = 0x0002; diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index aabbf8628b..bc4a30a423 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -9,14 +9,16 @@ use crate::pg_constants; use crate::CheckPoint; +use crate::ControlFileData; use crate::FullTransactionId; use crate::XLogLongPageHeaderData; use crate::XLogPageHeaderData; use crate::XLogRecord; - use crate::XLOG_PAGE_MAGIC; + use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, Bytes}; +use bytes::{BufMut, BytesMut}; use crc32c::*; use log::*; use std::cmp::min; @@ -35,12 +37,15 @@ pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; pub const XLOG_SIZE_OF_XLOG_SHORT_PHD: usize = std::mem::size_of::(); pub const XLOG_SIZE_OF_XLOG_LONG_PHD: usize = std::mem::size_of::(); pub const XLOG_SIZE_OF_XLOG_RECORD: usize = std::mem::size_of::(); +pub const SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT: usize = 1 * 2; pub type XLogRecPtr = u64; pub type TimeLineID = u32; pub type TimestampTz = u64; pub type XLogSegNo = u64; +const XID_CHECKPOINT_INTERVAL: u32 = 1024; + #[allow(non_snake_case)] pub fn XLogSegmentsPerXLogId(wal_segsz_bytes: usize) -> XLogSegNo { (0x100000000u64 / wal_segsz_bytes as u64) as XLogSegNo @@ -368,6 +373,7 @@ impl CheckPoint { // Next XID should be greater than new_xid. // Also take in account 32-bit wrap-around. 
pub fn update_next_xid(&mut self, xid: u32) { + let xid = xid.wrapping_add(XID_CHECKPOINT_INTERVAL - 1) & !(XID_CHECKPOINT_INTERVAL - 1); let full_xid = self.nextXid.value; let new_xid = std::cmp::max(xid + 1, pg_constants::FIRST_NORMAL_TRANSACTION_ID); let old_xid = full_xid as u32; @@ -383,3 +389,58 @@ impl CheckPoint { } } } + +pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { + let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize); + + let hdr = XLogLongPageHeaderData { + std: { + XLogPageHeaderData { + xlp_magic: XLOG_PAGE_MAGIC as u16, + xlp_info: pg_constants::XLP_LONG_HEADER, + xlp_tli: 1, // FIXME: always use Postgres timeline 1 + xlp_pageaddr: pg_control.checkPoint - XLOG_SIZE_OF_XLOG_LONG_PHD as u64, + xlp_rem_len: 0, + } + }, + xlp_sysid: pg_control.system_identifier, + xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, + xlp_xlog_blcksz: XLOG_BLCKSZ as u32, + }; + + let hdr_bytes = hdr.encode(); + seg_buf.extend_from_slice(&hdr_bytes); + + let rec_hdr = XLogRecord { + xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD + + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT + + SIZEOF_CHECKPOINT) as u32, + xl_xid: 0, //0 is for InvalidTransactionId + xl_prev: 0, + xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN, + xl_rmid: pg_constants::RM_XLOG_ID, + xl_crc: 0, + }; + + let mut rec_shord_hdr_bytes = BytesMut::new(); + rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT); + rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8); + + let rec_bytes = rec_hdr.encode(); + let checkpoint_bytes = pg_control.checkPointCopy.encode(); + + //calculate record checksum + let mut crc = 0; + crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]); + crc = crc32c_append(crc, &checkpoint_bytes[..]); + crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]); + + seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]); + seg_buf.put_u32_le(crc); + seg_buf.extend_from_slice(&rec_shord_hdr_bytes); + seg_buf.extend_from_slice(&checkpoint_bytes); + + //zero out remaining file + seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0); + seg_buf.freeze() +} diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index 773fcb5949..116e628c8f 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -47,8 +47,8 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 1 - assert row['deleted'] == 1 + assert row['truncated'] == 30 + assert row['deleted'] == 3 # Insert two more rows and run GC. print("Inserting two more rows and running GC") @@ -60,7 +60,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 1 + assert row['truncated'] == 30 assert row['deleted'] == 2 # Insert one more row.
It creates one more page version, but doesn't affect the @@ -73,7 +73,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 1 + assert row['truncated'] == 30 assert row['deleted'] == 1 # Run GC again, with no changes in the database. Should not remove anything. @@ -82,7 +82,7 @@ def test_gc(zenith_cli, pageserver, postgres, pg_bin): print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 0 + assert row['truncated'] == 30 assert row['deleted'] == 0 # diff --git a/test_runner/batch_others/test_restart_compute.py b/test_runner/batch_others/test_restart_compute.py index d11842e35f..dc3be94bfe 100644 --- a/test_runner/batch_others/test_restart_compute.py +++ b/test_runner/batch_others/test_restart_compute.py @@ -19,7 +19,7 @@ def test_restart_compute(zenith_cli, pageserver, postgres, pg_bin): cur.execute("INSERT INTO foo VALUES ('bar')") # Stop and restart the Postgres instance - pg.stop().start() + pg.stop_and_destroy().create_start('test_restart_compute') with closing(pg.connect()) as conn: with conn.cursor() as cur:
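One note on the rounding added to CheckPoint::update_next_xid: the incoming xid is rounded up to the next XID_CHECKPOINT_INTERVAL (1024) boundary before nextXid is advanced, so this path moves nextXid at most once per 1024 xids, which keeps the WAL receiver from re-writing the checkpoint image on every record. A minimal sketch of that arithmetic, with the sample xids invented for illustration:

fn xid_rounding_example() {
    // Mirrors the rounding in CheckPoint::update_next_xid() (postgres_ffi/src/xlog_utils.rs).
    let interval: u32 = 1024; // XID_CHECKPOINT_INTERVAL
    let round_up = |xid: u32| xid.wrapping_add(interval - 1) & !(interval - 1);
    assert_eq!(round_up(1), 1024); // update_next_xid(1) would then store nextXid = 1025
    assert_eq!(round_up(1024), 1024); // already on a boundary
    assert_eq!(round_up(1025), 2048);
}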