diff --git a/Cargo.lock b/Cargo.lock index afc2fbbc71..c8b69e0a7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,7 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 - [[package]] name = "ahash" version = "0.4.7" @@ -1232,24 +1230,6 @@ dependencies = [ "tokio-postgres 0.7.1", ] -[[package]] -name = "postgres-protocol" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" -dependencies = [ - "base64", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - [[package]] name = "postgres-protocol" version = "0.6.1" @@ -1269,14 +1249,21 @@ dependencies = [ ] [[package]] -name = "postgres-types" -version = "0.2.1" +name = "postgres-protocol" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" dependencies = [ + "base64", + "byteorder", "bytes", "fallible-iterator", - "postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", ] [[package]] @@ -1289,6 +1276,17 @@ dependencies = [ "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", ] +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "postgres_ffi" version = "0.1.0" diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index b6cdca5f41..4f814b7b04 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use std::time::SystemTime; use tar::{Builder, EntryType, Header}; -use crate::object_key::{DatabaseTag, ObjectTag}; +use crate::relish::*; use crate::repository::Timeline; use postgres_ffi::xlog_utils::*; use postgres_ffi::*; @@ -32,9 +32,6 @@ pub struct Basebackup<'a> { timeline: &'a Arc, lsn: Lsn, prev_record_lsn: Lsn, - slru_buf: [u8; pg_constants::SLRU_SEG_SIZE], - slru_segno: u32, - slru_path: &'static str, } impl<'a> Basebackup<'a> { @@ -49,9 +46,6 @@ impl<'a> Basebackup<'a> { timeline, lsn, prev_record_lsn, - slru_path: "", - slru_segno: u32::MAX, - slru_buf: [0u8; pg_constants::SLRU_SEG_SIZE], } } @@ -82,21 +76,19 @@ impl<'a> Basebackup<'a> { // It allows to easily construct SLRU segments. for obj in self.timeline.list_nonrels(self.lsn)? { match obj { - ObjectTag::Clog(slru) => self.add_slru_segment("pg_xact", &obj, slru.blknum)?, - ObjectTag::MultiXactMembers(slru) => { - self.add_slru_segment("pg_multixact/members", &obj, slru.blknum)? + RelishTag::Slru { slru, segno } => { + self.add_slru_segment(slru, segno)?; } - ObjectTag::MultiXactOffsets(slru) => { - self.add_slru_segment("pg_multixact/offsets", &obj, slru.blknum)? 
+ RelishTag::FileNodeMap { spcnode, dbnode } => { + self.add_relmap_file(spcnode, dbnode)?; + } + RelishTag::TwoPhase { xid } => { + self.add_twophase_file(xid)?; } - ObjectTag::FileNodeMap(db) => self.add_relmap_file(&obj, &db)?, - ObjectTag::TwoPhase(prepare) => self.add_twophase_file(&obj, prepare.xid)?, _ => {} } } - // write last non-completed SLRU segment (if any) - self.finish_slru_segment()?; // Generate pg_control and bootstrap WAL segment. self.add_pgcontrol_file()?; self.ar.finish()?; @@ -107,45 +99,33 @@ impl<'a> Basebackup<'a> { // // Generate SLRU segment files from repository. Path identifies SLRU kind (pg_xact, pg_multixact/members, ...). // - fn add_slru_segment( - &mut self, - path: &'static str, - tag: &ObjectTag, - blknum: u32, - ) -> anyhow::Result<()> { - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; - // Zero length image indicates truncated segment: just skip it - if !img.is_empty() { - assert!(img.len() == pg_constants::BLCKSZ as usize); - let segno = blknum / pg_constants::SLRU_PAGES_PER_SEGMENT; - if self.slru_path != "" && (self.slru_segno != segno || self.slru_path != path) { - // Switch to new segment: save old one - let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); - let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; - self.ar.append(&header, &self.slru_buf[..])?; - self.slru_buf = [0u8; pg_constants::SLRU_SEG_SIZE]; // reinitialize segment buffer - } - self.slru_segno = segno; - self.slru_path = path; - let offs_start = (blknum % pg_constants::SLRU_PAGES_PER_SEGMENT) as usize - * pg_constants::BLCKSZ as usize; - let offs_end = offs_start + pg_constants::BLCKSZ as usize; - self.slru_buf[offs_start..offs_end].copy_from_slice(&img); - } - Ok(()) - } + fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> { + let nblocks = self + .timeline + .get_rel_size(RelishTag::Slru { slru, segno }, self.lsn)?; - // - // We flush SLRU segments to the tarball once them are completed. - // This method is used to flush last (may be incompleted) segment. - // - fn finish_slru_segment(&mut self) -> anyhow::Result<()> { - if self.slru_path != "" { - // is there is some incompleted segment - let segname = format!("{}/{:>04X}", self.slru_path, self.slru_segno); - let header = new_tar_header(&segname, pg_constants::SLRU_SEG_SIZE as u64)?; - self.ar.append(&header, &self.slru_buf[..])?; + let mut slru_buf: Vec = + Vec::with_capacity(nblocks as usize * pg_constants::BLCKSZ as usize); + for blknum in 0..nblocks { + let img = self.timeline.get_page_at_lsn_nowait( + RelishTag::Slru { slru, segno }, + blknum, + self.lsn, + )?; + assert!(img.len() == pg_constants::BLCKSZ as usize); + + slru_buf.extend_from_slice(&img); } + + let dir = match slru { + SlruKind::Clog => "pg_xact", + SlruKind::MultiXactMembers => "pg_multixact/members", + SlruKind::MultiXactOffsets => "pg_multixact/offsets", + }; + + let segname = format!("{}/{:>04X}", dir, segno); + let header = new_tar_header(&segname, slru_buf.len() as u64)?; + self.ar.append(&header, slru_buf.as_slice())?; Ok(()) } @@ -153,10 +133,13 @@ impl<'a> Basebackup<'a> { // Extract pg_filenode.map files from repository // Along with them also send PG_VERSION for each database. 
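The rewritten add_slru_segment above reconstructs a whole segment file from individually stored blocks. The segment/page arithmetic it depends on is the same one get_tx_status uses later in this patch; here is a minimal self-contained sketch of it, assuming PostgreSQL's default constants (the patch itself reads them from pg_constants):

```rust
// Editor's sketch, not part of the patch. Constants mirror PostgreSQL's
// defaults (8192-byte pages, 4 CLOG xacts per byte); the real code uses
// pg_constants::CLOG_XACTS_PER_PAGE and SLRU_PAGES_PER_SEGMENT.
const CLOG_XACTS_PER_PAGE: u32 = 8192 * 4;
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

/// Locate a transaction id within the SLRU hierarchy: which segment
/// relish it lives in, and which block within that segment.
fn clog_addr(xid: u32) -> (u32, u32) {
    let pageno = xid / CLOG_XACTS_PER_PAGE;
    let segno = pageno / SLRU_PAGES_PER_SEGMENT; // -> RelishTag::Slru { slru, segno }
    let rpageno = pageno % SLRU_PAGES_PER_SEGMENT; // -> blknum argument
    (segno, rpageno)
}

fn main() {
    // xid 2_000_000 is on CLOG page 61, i.e. segment 0001, block 29.
    assert_eq!(clog_addr(2_000_000), (1, 29));
}
```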
// - fn add_relmap_file(&mut self, tag: &ObjectTag, db: &DatabaseTag) -> anyhow::Result<()> { - trace!("add_relmap_file {:?}", db); - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; - let path = if db.spcnode == pg_constants::GLOBALTABLESPACE_OID { + fn add_relmap_file(&mut self, spcnode: u32, dbnode: u32) -> anyhow::Result<()> { + let img = self.timeline.get_page_at_lsn_nowait( + RelishTag::FileNodeMap { spcnode, dbnode }, + 0, + self.lsn, + )?; + let path = if spcnode == pg_constants::GLOBALTABLESPACE_OID { let dst_path = "PG_VERSION"; let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes(); let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; @@ -169,19 +152,19 @@ impl<'a> Basebackup<'a> { String::from("global/pg_filenode.map") // filenode map for global tablespace } else { // User defined tablespaces are not supported - assert!(db.spcnode == pg_constants::DEFAULTTABLESPACE_OID); + assert!(spcnode == pg_constants::DEFAULTTABLESPACE_OID); // Append dir path for each database - let path = format!("base/{}", db.dbnode); + let path = format!("base/{}", dbnode); let header = new_tar_header_dir(&path)?; self.ar.append(&header, &mut io::empty())?; - let dst_path = format!("base/{}/PG_VERSION", db.dbnode); + let dst_path = format!("base/{}/PG_VERSION", dbnode); let version_bytes = pg_constants::PG_MAJORVERSION.as_bytes(); let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; self.ar.append(&header, &version_bytes[..])?; - format!("base/{}/pg_filenode.map", db.dbnode) + format!("base/{}/pg_filenode.map", dbnode) }; assert!(img.len() == 512); let header = new_tar_header(&path, img.len() as u64)?; @@ -192,12 +175,14 @@ impl<'a> Basebackup<'a> { // // Extract twophase state files // - fn add_twophase_file(&mut self, tag: &ObjectTag, xid: TransactionId) -> anyhow::Result<()> { + fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> { // Include in tarball two-phase files only of in-progress transactions if self.timeline.get_tx_status(xid, self.lsn)? == pg_constants::TRANSACTION_STATUS_IN_PROGRESS { - let img = self.timeline.get_page_at_lsn_nowait(*tag, self.lsn)?; + let img = + self.timeline + .get_page_at_lsn_nowait(RelishTag::TwoPhase { xid }, 0, self.lsn)?; let mut buf = BytesMut::new(); buf.extend_from_slice(&img[..]); let crc = crc32c::crc32c(&img[..]); @@ -214,12 +199,12 @@ impl<'a> Basebackup<'a> { // Also send zenith.signal file with extra bootstrap data. 
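The two-phase and pg_control code in this file illustrates the calling convention for non-blocky relishes: the whole file is fetched as a single "block 0", and get_page_at_lsn_nowait bails out for any other block number. A hedged sketch of the pattern, using the types from this patch (read_control_data is an illustrative helper, not part of the patch):

```rust
// Editor's sketch, assuming the types defined elsewhere in this patch.
use crate::relish::RelishTag;
use crate::repository::Timeline;
use anyhow::Result;
use bytes::Bytes;
use zenith_utils::lsn::Lsn;

// Non-blocky relishes hold the whole file as one blob, so they are always
// addressed as "block 0".
fn read_control_data(timeline: &dyn Timeline, lsn: Lsn) -> Result<(Bytes, Bytes)> {
    let pg_control = timeline.get_page_at_lsn_nowait(RelishTag::ControlFile, 0, lsn)?;
    let checkpoint = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, lsn)?;
    Ok((pg_control, checkpoint))
}
```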
// fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { - let checkpoint_bytes = self - .timeline - .get_page_at_lsn_nowait(ObjectTag::Checkpoint, self.lsn)?; - let pg_control_bytes = self - .timeline - .get_page_at_lsn_nowait(ObjectTag::ControlFile, self.lsn)?; + let checkpoint_bytes = + self.timeline + .get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, self.lsn)?; + let pg_control_bytes = + self.timeline + .get_page_at_lsn_nowait(RelishTag::ControlFile, 0, self.lsn)?; let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; @@ -262,6 +247,7 @@ impl<'a> Basebackup<'a> { let wal_file_path = format!("pg_wal/{}", wal_file_name); let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?; let wal_seg = generate_wal_segment(&pg_control); + assert!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE); self.ar.append(&header, &wal_seg[..])?; Ok(()) } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4bdc1b4e3d..0f8be62b4b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -15,6 +15,7 @@ pub mod object_repository; pub mod object_store; pub mod page_cache; pub mod page_service; +pub mod relish; pub mod repository; pub mod restore_local_repo; pub mod rocksdb_storage; diff --git a/pageserver/src/object_key.rs b/pageserver/src/object_key.rs index a087ca3a21..bcae9e335b 100644 --- a/pageserver/src/object_key.rs +++ b/pageserver/src/object_key.rs @@ -1,5 +1,8 @@ -use crate::repository::{BufferTag, RelTag}; -use crate::waldecoder::TransactionId; +//! +//! Common structs shared by object_repository.rs and object_store.rs. +//! + +use crate::relish::RelishTag; use crate::ZTimelineId; use serde::{Deserialize, Serialize}; @@ -8,6 +11,7 @@ use serde::{Deserialize, Serialize}; /// repository. It is shared between object_repository.rs and object_store.rs. /// It is mostly opaque to ObjectStore, it just stores and retrieves objects /// using the key given by the caller. +/// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ObjectKey { pub timeline: ZTimelineId, @@ -15,70 +19,31 @@ pub struct ObjectKey { } /// -/// Non-relation transaction status files (clog (a.k.a. pg_xact) and pg_multixact) -/// in Postgres are handled by SLRU (Simple LRU) buffer, hence the name. -/// -/// These files are global for a postgres instance. -/// -/// These files are divided into segments, which are divided into pages -/// of the same BLCKSZ as used for relation files. -/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub struct SlruBufferTag { - pub blknum: u32, -} - -/// -/// Special type of Postgres files: pg_filenode.map is needed to map -/// catalog table OIDs to filenode numbers, which define filename. -/// -/// Each database has a map file for its local mapped catalogs, -/// and there is a separate map file for shared catalogs. -/// -/// These files have untypical size of 512 bytes. -/// -/// See PostgreSQL relmapper.c for details. -/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub struct DatabaseTag { - pub spcnode: u32, - pub dbnode: u32, -} - -/// -/// Non-relation files that keep state for prepared transactions. -/// Unlike other files these are not divided into pages. -/// -/// See PostgreSQL twophase.c for details. 
-/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub struct PrepareTag { - pub xid: TransactionId, -} - /// ObjectTag is a part of ObjectKey that is specific to the type of /// the stored object. /// /// NB: the order of the enum values is significant! In particular, /// rocksdb_storage.rs assumes that TimelineMetadataTag is first /// +/// Buffer is the kind of object that is accessible by the public +/// get_page_at_lsn() / put_page_image() / put_wal_record() functions in +/// the repository.rs interface. The rest are internal objects stored in +/// the key-value store, to store various metadata. They're not directly +/// accessible outside object_repository.rs +/// #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub enum ObjectTag { // dummy tag preceeding all other keys FirstTag, + + // Metadata about a timeline. Not versioned. TimelineMetadataTag, - // Special entry that represents PostgreSQL checkpoint. - // We use it to track fields needed to restore controlfile checkpoint. - Checkpoint, - // Various types of non-relation files. - // We need them to bootstrap compute node. - ControlFile, - Clog(SlruBufferTag), - MultiXactMembers(SlruBufferTag), - MultiXactOffsets(SlruBufferTag), - FileNodeMap(DatabaseTag), - TwoPhase(PrepareTag), - // put relations at the end of enum to allow efficient iterations through non-rel objects - RelationMetadata(RelTag), - RelationBuffer(BufferTag), + + // These objects store metadata about one relish. Currently it's used + // just to track the relish's size. It's not used for non-blocky relishes + // at all. + RelationMetadata(RelishTag), + + // These are the pages exposed in the public Repository/Timeline interface. + Buffer(RelishTag, u32), } diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index d043157007..4000973d6d 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -14,6 +14,7 @@ //! key-value store for each timeline. use crate::object_store::ObjectStore; +use crate::relish::*; use crate::repository::*; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; @@ -162,18 +163,6 @@ impl Repository for ObjectRepository { &ObjectValue::ser(&val)?, )?; - // Copy non-rel objects - for tag in src_timeline.list_nonrels(at_lsn)? { - match tag { - ObjectTag::TimelineMetadataTag => {} // skip it - _ => { - let img = src_timeline.get_page_at_lsn_nowait(tag, at_lsn)?; - let val = ObjectValue::Page(PageEntry::Page(img)); - let key = ObjectKey { timeline: dst, tag }; - self.obj_store.put(&key, at_lsn, &ObjectValue::ser(&val)?)?; - } - } - } Ok(()) } } @@ -187,7 +176,7 @@ impl Repository for ObjectRepository { /// To prevent memory overflow metadata only of the most recent version of relation is cached. /// If page server needs to access some older version, then object storage has to be accessed. /// -struct RelMetadata { +struct RelishMetadata { size: u32, // size of relation last_updated: Lsn, // lsn of last metadata update (used to determine when cache value can be used) } @@ -227,7 +216,7 @@ pub struct ObjectTimeline { ancestor_timeline: Option, ancestor_lsn: Lsn, - rel_meta: RwLock>, + rel_meta: RwLock>, } impl ObjectTimeline { @@ -266,19 +255,28 @@ impl Timeline for ObjectTimeline { //------------------------------------------------------------------------------ /// Look up given page in the cache. 
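To make the new ObjectTag::Buffer variant concrete: the public interface addresses a page as a (RelishTag, block number) pair, and the object repository folds that pair into its internal key. A small sketch (buffer_key_for is a hypothetical helper shown only to spell out the mapping):

```rust
// Editor's sketch, assuming the types from object_key.rs and relish.rs.
use crate::object_key::ObjectTag;
use crate::relish::{RelTag, RelishTag};

// A relation page identified publicly as (RelishTag, blknum) is stored
// internally under ObjectTag::Buffer.
fn buffer_key_for(rel: RelTag, blknum: u32) -> ObjectTag {
    ObjectTag::Buffer(RelishTag::Relation(rel), blknum)
}
```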
- fn get_page_at_lsn(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { + fn get_page_at_lsn(&self, tag: RelishTag, blknum: u32, req_lsn: Lsn) -> Result { let lsn = self.wait_lsn(req_lsn)?; - self.get_page_at_lsn_nowait(tag, lsn) + self.get_page_at_lsn_nowait(tag, blknum, lsn) } - fn get_page_at_lsn_nowait(&self, tag: ObjectTag, req_lsn: Lsn) -> Result { + fn get_page_at_lsn_nowait(&self, rel: RelishTag, blknum: u32, req_lsn: Lsn) -> Result { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; // Look up the page entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. + let object_tag = ObjectTag::Buffer(rel, blknum); let searchkey = ObjectKey { timeline: self.timelineid, - tag, + tag: object_tag, }; let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?; @@ -291,14 +289,16 @@ impl Timeline for ObjectTimeline { } ObjectValue::Page(PageEntry::WALRecord(_rec)) => { // Request the WAL redo manager to apply the WAL records for us. - let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; - page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; + let (base_img, records) = self.collect_records_for_apply(rel, blknum, lsn)?; + page_img = self + .walredo_mgr + .request_redo(rel, blknum, lsn, base_img, records)?; // Garbage collection assumes that we remember the materialized page // version. Otherwise we could opt to not do it, with the downside that // the next GetPage@LSN call of the same page version would have to // redo the WAL again. - self.put_page_image(tag, lsn, page_img.clone(), false)?; + self.put_page_image(rel, blknum, lsn, page_img.clone(), false)?; } ObjectValue::SLRUTruncate => page_img = Bytes::from_static(&ZERO_PAGE), _ => bail!("Invalid object kind, expected a page entry or SLRU truncate"), @@ -310,19 +310,23 @@ impl Timeline for ObjectTimeline { "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})", page_lsn_hi, page_lsn_lo, - tag, + object_tag, lsn, req_lsn ); return Ok(page_img); } - trace!("page {:?} at {} not found", tag, req_lsn); + trace!("page {:?} at {} not found", object_tag, req_lsn); Ok(Bytes::from_static(&ZERO_PAGE)) /* return Err("could not find page image")?; */ } /// Get size of relation - fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { + fn get_rel_size(&self, rel: RelishTag, lsn: Lsn) -> Result { + if !rel.is_blocky() { + bail!("invalid get_rel_size request for non-blocky relish {}", rel); + } + let lsn = self.wait_lsn(lsn)?; match self.relsize_get_nowait(rel, lsn)? { @@ -332,7 +336,7 @@ impl Timeline for ObjectTimeline { } /// Does relation exist at given LSN? - fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result { + fn get_rel_exists(&self, rel: RelishTag, req_lsn: Lsn) -> Result { let lsn = self.wait_lsn(req_lsn)?; { let rel_meta = self.rel_meta.read().unwrap(); @@ -353,8 +357,34 @@ impl Timeline for ObjectTimeline { } /// Get a list of non-relational objects - fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result + 'a>> { - self.obj_store.list_objects(self.timelineid, true, lsn) + fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result> { + // List all non-relations in this timeline. + let mut all_rels = self.obj_store.list_nonrels(self.timelineid, lsn)?; + + // Also list all nonrelations in ancestor timelines. 
If a nonrelation hasn't been modified + // after the fork, there will be no trace of it in the object store with the current + // timeline id. + let mut prev_timeline: Option = self.ancestor_timeline; + let mut lsn = self.ancestor_lsn; + while let Some(timeline) = prev_timeline { + let this_rels = self.obj_store.list_nonrels(timeline, lsn)?; + + for rel in this_rels { + all_rels.insert(rel); + } + + // Load ancestor metadata. + let v = self + .obj_store + .get(&timeline_metadata_key(timeline), Lsn(0)) + .with_context(|| "timeline not found in repository")?; + let metadata = ObjectValue::des_timeline_metadata(&v)?; + + prev_timeline = metadata.ancestor_timeline; + lsn = metadata.ancestor_lsn; + } + + Ok(all_rels) } /// Get a list of all distinct relations in given tablespace and database. @@ -400,31 +430,39 @@ impl Timeline for ObjectTimeline { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> { + fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + let lsn = rec.lsn; - self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?; - debug!("put_wal_record {:?} at {}", tag, lsn); + self.put_page_entry(&rel, blknum, lsn, PageEntry::WALRecord(rec))?; + debug!("put_wal_record {} at {}", rel, lsn); - if let ObjectTag::RelationBuffer(tag) = tag { + if rel.is_blocky() { // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + let old_nblocks = self.relsize_get_nowait(rel, lsn)?.unwrap_or(0); - if tag.blknum >= old_nblocks { - let new_nblocks = tag.blknum + 1; + if blknum >= old_nblocks { + let new_nblocks = blknum + 1; trace!( - "Extended relation {} from {} to {} blocks at {}", - tag.rel, + "Extended {} from {} to {} blocks at {}", + rel, old_nblocks, new_nblocks, lsn ); - self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; + self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(new_nblocks))?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( - tag.rel, - RelMetadata { + rel, + RelishMetadata { size: new_nblocks, last_updated: lsn, }, @@ -435,23 +473,12 @@ impl Timeline for ObjectTimeline { } /// Unlink relation. This method is used for marking dropped relations. 
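The ancestor walk added to list_nonrels above terminates because each step follows the (ancestor_timeline, ancestor_lsn) pair from the parent's metadata until the root timeline. A condensed sketch of the loop's shape (collect_from_ancestors and load_ancestor are illustrative stand-ins, not functions in this patch):

```rust
// Editor's sketch of the ancestor walk in list_nonrels: relishes unchanged
// since a branch point only exist under the ancestor timeline's id, so
// every ancestor must be consulted. load_ancestor stands in for reading a
// timeline's metadata from the object store.
use crate::ZTimelineId;
use zenith_utils::lsn::Lsn;

fn collect_from_ancestors(
    mut next: Option<ZTimelineId>,
    mut lsn: Lsn,
    mut visit: impl FnMut(ZTimelineId, Lsn),
    load_ancestor: impl Fn(ZTimelineId) -> (Option<ZTimelineId>, Lsn),
) {
    while let Some(timeline) = next {
        visit(timeline, lsn); // obj_store.list_nonrels(timeline, lsn) in the real code
        let (ancestor, ancestor_lsn) = load_ancestor(timeline);
        next = ancestor;
        lsn = ancestor_lsn;
    }
}
```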
- fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> { + fn put_unlink(&self, rel_tag: RelishTag, lsn: Lsn) -> Result<()> { self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?; Ok(()) } - /// Truncate SLRU segment - fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()> { - let key = ObjectKey { - timeline: self.timelineid, - tag, - }; - let val = ObjectValue::SLRUTruncate; - self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; - Ok(()) - } - fn get_next_tag(&self, tag: ObjectTag) -> Result> { let key = ObjectKey { timeline: self.timelineid, @@ -478,38 +505,47 @@ impl Timeline for ObjectTimeline { /// fn put_page_image( &self, - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, img: Bytes, update_meta: bool, ) -> Result<()> { - self.put_page_entry(&tag, lsn, PageEntry::Page(img))?; + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } - debug!("put_page_image rel {:?} at {}", tag, lsn); + self.put_page_entry(&rel, blknum, lsn, PageEntry::Page(img))?; + + debug!("put_page_image {} at {}", rel, lsn); if !update_meta { return Ok(()); } - if let ObjectTag::RelationBuffer(tag) = tag { + if rel.is_blocky() { // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + let old_nblocks = self.relsize_get_nowait(rel, lsn)?.unwrap_or(0); - if tag.blknum >= old_nblocks { - let new_nblocks = tag.blknum + 1; + if blknum >= old_nblocks { + let new_nblocks = blknum + 1; trace!( - "Extended relation {} from {} to {} blocks at {}", - tag.rel, + "Extended {} from {} to {} blocks at {}", + rel, old_nblocks, new_nblocks, lsn ); - self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?; + self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(new_nblocks))?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( - tag.rel, - RelMetadata { + rel, + RelishMetadata { size: new_nblocks, last_updated: lsn, }, @@ -523,14 +559,18 @@ impl Timeline for ObjectTimeline { /// Adds a relation-wide WAL record (like truncate) to the repository, /// associating it with all pages started with specified block number /// - fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> { + fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()> { + if !rel.is_blocky() { + bail!("invalid truncation for non-blocky relish {}", rel); + } + info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(nblocks))?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( rel, - RelMetadata { + RelishMetadata { size: nblocks, last_updated: lsn, }, @@ -641,14 +681,11 @@ impl Timeline for ObjectTimeline { let now = Instant::now(); let mut prepared_horizon = Lsn(u64::MAX); // Iterate through all objects in timeline - for obj in self - .obj_store - .list_objects(self.timelineid, false, last_lsn)? - { + for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { result.inspected += 1; match obj { // Prepared transactions - ObjectTag::TwoPhase(prepare) => { + ObjectTag::Buffer(RelishTag::TwoPhase { xid }, _blknum) => { let key = ObjectKey { timeline: self.timelineid, tag: obj, @@ -656,7 +693,7 @@ impl Timeline for ObjectTimeline { for vers in self.obj_store.object_versions(&key, horizon)? 
{ let lsn = vers.0; prepared_horizon = Lsn::min(lsn, prepared_horizon); - if self.get_tx_status(prepare.xid, horizon)? + if self.get_tx_status(xid, horizon)? != pg_constants::TRANSACTION_STATUS_IN_PROGRESS { self.obj_store.unlink(&key, lsn)?; @@ -693,96 +730,62 @@ impl Timeline for ObjectTimeline { } } } - ObjectTag::RelationBuffer(tag) => { - // Reconstruct page at horizon unless relation was dropped - // and delete all older versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - result.truncated += 1; - last_version = false; - if let Some(rel_size) = - self.relsize_get_nowait(tag.rel, last_lsn)? - { - if rel_size > tag.blknum { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(obj, lsn)?; - continue; - } - debug!("Drop last block {} of relation {:?} at {} because it is beyond relation size {}", tag.blknum, tag.rel, lsn, rel_size); - } else { + ObjectTag::Buffer(rel, blknum) => { + if rel.is_blocky() { + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + result.truncated += 1; + last_version = false; if let Some(rel_size) = - self.relsize_get_nowait(tag.rel, last_lsn)? + self.relsize_get_nowait(rel, last_lsn)? { - debug!("Preserve block {} of relation {:?} at {} because relation has size {} at {}", tag.rel, tag, lsn, rel_size, last_lsn); - continue; + if rel_size > blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(rel, blknum, lsn)?; + continue; + } + debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {} was dropped at {}", rel, lsn); } - debug!("Relation {:?} was dropped at {}", tag.rel, lsn); + // relation was dropped or truncated so this block can be removed } - // relation was dropped or truncated so this block can be removed + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } - // SLRU-s - ObjectTag::Clog(_) - | ObjectTag::MultiXactOffsets(_) - | ObjectTag::MultiXactMembers(_) => { - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self - .obj_store - .object_versions(&key, Lsn::min(prepared_horizon, horizon))? - { - let lsn = vers.0; - if last_version { - let content = vers.1; - match ObjectValue::des(&content[..])? 
{ - ObjectValue::SLRUTruncate => { - self.obj_store.unlink(&key, lsn)?; - result.slru_deleted += 1; - } - ObjectValue::Page(PageEntry::WALRecord(_)) => { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(obj, lsn)?; - } - _ => {} // do nothing if already materialized + } else { + // versioned always materialized objects: no need to reconstruct pages + + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + // preserve last version + last_version = false; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + result.chkp_deleted += 1; } - last_version = false; - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - result.slru_deleted += 1; - } - } - } - // versioned always materialized objects: no need to reconstruct pages - ObjectTag::Checkpoint | ObjectTag::ControlFile => { - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - // preserve last version - last_version = false; - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - result.chkp_deleted += 1; } } } @@ -806,7 +809,7 @@ impl ObjectTimeline { /// /// The caller must ensure that WAL has been received up to 'lsn'. /// - fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result> { + fn relsize_get_nowait(&self, rel: RelishTag, lsn: Lsn) -> Result> { { let rel_meta = self.rel_meta.read().unwrap(); if let Some(meta) = rel_meta.get(&rel) { @@ -854,7 +857,8 @@ impl ObjectTimeline { /// fn collect_records_for_apply( &self, - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, ) -> Result<(Option, Vec)> { let mut base_img: Option = None; @@ -864,7 +868,7 @@ impl ObjectTimeline { // old page image. let searchkey = ObjectKey { timeline: self.timelineid, - tag, + tag: ObjectTag::Buffer(rel, blknum), }; let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; while let Some((_key, value)) = iter.next().transpose()? { @@ -966,17 +970,17 @@ impl ObjectTimeline { // // Helper functions to store different kinds of objects to the underlying ObjectStore // - fn put_page_entry(&self, tag: &ObjectTag, lsn: Lsn, val: PageEntry) -> Result<()> { + fn put_page_entry(&self, tag: &RelishTag, blknum: u32, lsn: Lsn, val: PageEntry) -> Result<()> { let key = ObjectKey { timeline: self.timelineid, - tag: *tag, + tag: ObjectTag::Buffer(*tag, blknum), }; let val = ObjectValue::Page(val); self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?) 
} - fn put_relsize_entry(&self, tag: &RelTag, lsn: Lsn, val: RelationSizeEntry) -> Result<()> { + fn put_relsize_entry(&self, tag: &RelishTag, lsn: Lsn, val: RelationSizeEntry) -> Result<()> { let key = relation_size_key(self.timelineid, *tag); let val = ObjectValue::RelationSize(val); @@ -1060,7 +1064,7 @@ pub enum RelationSizeEntry { Unlink, } -const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey { +const fn relation_size_key(timelineid: ZTimelineId, rel: RelishTag) -> ObjectKey { ObjectKey { timeline: timelineid, tag: ObjectTag::RelationMetadata(rel), diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index 50c3f95a4a..f8fcc07793 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -1,7 +1,7 @@ //! Low-level key-value storage abstraction. //! use crate::object_key::*; -use crate::repository::RelTag; +use crate::relish::*; use crate::ZTimelineId; use anyhow::Result; use std::collections::HashSet; @@ -69,6 +69,12 @@ pub trait ObjectStore: Send + Sync { lsn: Lsn, ) -> Result>; + /// Iterate through non-rel relishes + /// + /// This is used to prepare tarball for new node startup. + /// Returns objects in increasing key-version order. + fn list_nonrels<'a>(&'a self, timelineid: ZTimelineId, lsn: Lsn) -> Result>; + /// Iterate through objects tags. If nonrel_only, then only non-relationa data is iterated. /// /// This is used to implement GC and preparing tarball for new node startup @@ -76,7 +82,6 @@ pub trait ObjectStore: Send + Sync { fn list_objects<'a>( &'a self, timelineid: ZTimelineId, - nonrel_only: bool, lsn: Lsn, ) -> Result + 'a>>; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2855abbd44..98fe7e6838 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -28,9 +28,9 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn}; use crate::basebackup; use crate::branches; -use crate::object_key::ObjectTag; use crate::page_cache; -use crate::repository::{BufferTag, Modification, RelTag}; +use crate::relish::*; +use crate::repository::Modification; use crate::walreceiver; use crate::PageServerConf; use crate::ZTenantId; @@ -206,12 +206,13 @@ impl PageServerHandler { let response = match zenith_fe_msg { PagestreamFeMessage::Exists(req) => { - let tag = RelTag { + let rel = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, relnode: req.relnode, forknum: req.forknum, }; + let tag = RelishTag::Relation(rel); let exist = timeline.get_rel_exists(tag, req.lsn).unwrap_or(false); @@ -221,29 +222,28 @@ impl PageServerHandler { }) } PagestreamFeMessage::Nblocks(req) => { - let tag = RelTag { + let rel = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, relnode: req.relnode, forknum: req.forknum, }; + let tag = RelishTag::Relation(rel); let n_blocks = timeline.get_rel_size(tag, req.lsn).unwrap_or(0); PagestreamBeMessage::Nblocks(PagestreamStatusResponse { ok: true, n_blocks }) } PagestreamFeMessage::Read(req) => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: RelTag { - spcnode: req.spcnode, - dbnode: req.dbnode, - relnode: req.relnode, - forknum: req.forknum, - }, - blknum: req.blkno, - }); + let rel = RelTag { + spcnode: req.spcnode, + dbnode: req.dbnode, + relnode: req.relnode, + forknum: req.forknum, + }; + let tag = RelishTag::Relation(rel); - let read_response = match timeline.get_page_at_lsn(tag, req.lsn) { + let read_response = match timeline.get_page_at_lsn(tag, req.blkno, req.lsn) { Ok(p) => PagestreamReadResponse { ok: true, n_blocks: 0, @@ 
-431,11 +431,7 @@ impl postgres_backend::Handler for PageServerHandler {
                     let modification = Modification::des(&bytes)?;
                     last_lsn = modification.lsn;
-                    timeline.put_raw_data(
-                        modification.tag,
-                        last_lsn,
-                        &modification.data[..],
-                    )?;
+                    timeline.put_raw_data(modification.tag, modification.lsn, &modification.data)?;
                 }
                 FeMessage::CopyDone => {
                     timeline.advance_last_valid_lsn(last_lsn);
diff --git a/pageserver/src/relish.rs b/pageserver/src/relish.rs
new file mode 100644
index 0000000000..782388871b
--- /dev/null
+++ b/pageserver/src/relish.rs
@@ -0,0 +1,218 @@
+//!
+//! Zenith stores PostgreSQL relations, and some other files, in the
+//! repository. The relations (i.e. tables and indexes) take up most
+//! of the space in a typical installation, while the other files are
+//! small. We call each relation and other file that is stored in the
+//! repository a "relish". It comes from "rel"-ish, as in "kind of a
+//! rel", because it covers relations as well as other things that are
+//! not relations, but are treated similarly for the purposes of the
+//! storage layer.
+//!
+//! This source file contains the definition of the RelishTag type,
+//! which uniquely identifies a relish.
+//!
+//! Relishes come in two flavors: blocky and non-blocky. Relations and
+//! SLRUs are blocky, that is, they are divided into 8k blocks, and
+//! the repository tracks their size. Other relishes are non-blocky:
+//! the content of the whole relish is stored as one blob. The block
+//! number must be passed as 0 for all operations on a non-blocky
+//! relish. The one "block" that you store in a non-blocky relish can
+//! have arbitrary size, but it is expected to be small, or you
+//! will have performance issues.
+//!
+//! All relishes are versioned by LSN in the repository.
+//!
+
+use serde::{Deserialize, Serialize};
+use std::fmt;
+
+use postgres_ffi::relfile_utils::forknumber_to_name;
+use postgres_ffi::{Oid, TransactionId};
+
+///
+/// RelishTag identifies one relish.
+///
+#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub enum RelishTag {
+    // Relations correspond to PostgreSQL relation forks. Each
+    // PostgreSQL relation fork is considered a separate relish.
+    Relation(RelTag),
+
+    // SLRUs include pg_clog, pg_multixact/members, and
+    // pg_multixact/offsets. There are other SLRUs in PostgreSQL, but
+    // they don't need to be stored permanently (e.g. pg_subtrans),
+    // or we do not support them in zenith yet (pg_commit_ts).
+    //
+    // These are currently never requested directly by the compute
+    // nodes, although in principle that would be possible. However,
+    // when a new compute node is created, these are included in the
+    // tarball that we send to the compute node to initialize the
+    // PostgreSQL data directory.
+    //
+    // Each SLRU segment in PostgreSQL is considered a separate
+    // relish. For example, pg_clog/0000, pg_clog/0001, and so forth.
+    //
+    // SLRU segments are divided into blocks, like relations.
+    Slru {
+        slru: SlruKind,
+        segno: u32,
+    },
+
+    // Miscellaneous other files that need to be included in the
+    // tarball at compute node creation. These are non-blocky, and are
+    // expected to be small.
+
+    //
+    // FileNodeMap represents PostgreSQL's 'pg_filenode.map'
+    // files. They are needed to map catalog table OIDs to filenode
+    // numbers. Usually the mapping is done by looking up a relation's
+    // 'relfilenode' field in the 'pg_class' system table, but that
+    // doesn't work for 'pg_class' itself and a few other such system
+    // relations. See PostgreSQL relmapper.c for details.
+    //
+    // Each database has a map file for its local mapped catalogs,
+    // and there is a separate map file for shared catalogs.
+    //
+    // These files are always 512 bytes long (although we don't check
+    // or care about that in the page server).
+    //
+    FileNodeMap {
+        spcnode: Oid,
+        dbnode: Oid,
+    },
+
+    //
+    // State files for prepared transactions (e.g. pg_twophase/1234)
+    //
+    TwoPhase {
+        xid: TransactionId,
+    },
+
+    // The control file, stored in global/pg_control
+    ControlFile,
+
+    // Special entry that represents PostgreSQL checkpoint. It doesn't
+    // correspond to any physical file in PostgreSQL, but we use it
+    // to track fields needed to restore the checkpoint data in the
+    // control file, when a compute node is created.
+    Checkpoint,
+}
+
+impl RelishTag {
+    pub const fn is_blocky(&self) -> bool {
+        match self {
+            // These relishes work with blocks
+            RelishTag::Relation(_) | RelishTag::Slru { slru: _, segno: _ } => true,
+
+            // and these don't
+            RelishTag::FileNodeMap {
+                spcnode: _,
+                dbnode: _,
+            }
+            | RelishTag::TwoPhase { xid: _ }
+            | RelishTag::ControlFile
+            | RelishTag::Checkpoint => false,
+        }
+    }
+}
+
+///
+/// Relation data file segment id throughout the Postgres cluster.
+///
+/// Every data file in Postgres is uniquely identified by 4 numbers:
+/// - relation id / node (`relnode`)
+/// - database id (`dbnode`)
+/// - tablespace id (`spcnode`), in short this is a unique id of a separate
+///   directory to store data files.
+/// - forknumber (`forknum`) is used to split different kinds of data of the same relation
+///   between some set of files (`relnode`, `relnode_fsm`, `relnode_vm`).
+///
+/// In native Postgres code `RelFileNode` structure and individual `ForkNumber` value
+/// are used for the same purpose.
+/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/relfilenode.h#L57).
+///
+#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)]
+pub struct RelTag {
+    pub forknum: u8,
+    pub spcnode: Oid,
+    pub dbnode: Oid,
+    pub relnode: Oid,
+}
+
+/// Display RelTag in the same format that's used in most PostgreSQL debug messages:
+///
+/// <spcnode>/<dbnode>/<relnode>[_fsm|_vm|_init]
+///
+impl fmt::Display for RelTag {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        if let Some(forkname) = forknumber_to_name(self.forknum) {
+            write!(
+                f,
+                "{}/{}/{}_{}",
+                self.spcnode, self.dbnode, self.relnode, forkname
+            )
+        } else {
+            write!(f, "{}/{}/{}", self.spcnode, self.dbnode, self.relnode)
+        }
+    }
+}
+
+/// Display RelishTag in a human-readable format: relations use the RelTag
+/// format above, and the other relish kinds get short descriptive names.
+///
+impl fmt::Display for RelishTag {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            RelishTag::Relation(rel) => rel.fmt(f),
+            RelishTag::Slru { slru, segno } => {
+                // e.g. pg_clog/0001
+                write!(f, "{}/{:04X}", slru.to_str(), segno)
+            }
+            RelishTag::FileNodeMap { spcnode, dbnode } => {
+                write!(f, "relmapper file for spc {} db {}", spcnode, dbnode)
+            }
+            RelishTag::TwoPhase { xid } => {
+                write!(f, "pg_twophase/{:08X}", xid)
+            }
+            RelishTag::ControlFile => {
+                write!(f, "control file")
+            }
+            RelishTag::Checkpoint => {
+                write!(f, "checkpoint")
+            }
+        }
+    }
+}
+
+///
+/// Non-relation transaction status files (clog (a.k.a. pg_xact) and
+/// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,
+/// hence the name.
+///
+/// These files are global for a postgres instance.
+///
+/// These files are divided into segments, which are divided into
+/// pages of the same BLCKSZ as used for relation files.
+///
+#[derive(Debug, Clone, Copy, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
+pub enum SlruKind {
+    Clog,
+    MultiXactMembers,
+    MultiXactOffsets,
+}
+
+impl SlruKind {
+    fn to_str(&self) -> &'static str {
+        match self {
+            Self::Clog => "pg_xact",
+            Self::MultiXactMembers => "pg_multixact/members",
+            Self::MultiXactOffsets => "pg_multixact/offsets",
+        }
+    }
+}
+
+pub const FIRST_NONREL_RELISH_TAG: RelishTag = RelishTag::Slru {
+    slru: SlruKind::Clog,
+    segno: 0,
+};
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 2b809ef563..80dd2d5d8a 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -1,14 +1,13 @@
 use crate::object_key::*;
-use crate::waldecoder::TransactionId;
+use crate::relish::*;
 use crate::ZTimelineId;
 use anyhow::Result;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use postgres_ffi::nonrelfile_utils::transaction_id_get_status;
 use postgres_ffi::pg_constants;
-use postgres_ffi::relfile_utils::forknumber_to_name;
+use postgres_ffi::TransactionId;
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
-use std::fmt;
 use std::iter::Iterator;
 use std::sync::Arc;
 use std::time::Duration;
@@ -57,22 +56,22 @@ pub trait Timeline: Send + Sync {
     //------------------------------------------------------------------------------
 
     /// Look up given page in the cache.
-    fn get_page_at_lsn(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
+    fn get_page_at_lsn(&self, tag: RelishTag, blknum: u32, lsn: Lsn) -> Result<Bytes>;
 
     /// Look up given page in the cache.
-    fn get_page_at_lsn_nowait(&self, tag: ObjectTag, lsn: Lsn) -> Result<Bytes>;
+    fn get_page_at_lsn_nowait(&self, tag: RelishTag, blknum: u32, lsn: Lsn) -> Result<Bytes>;
 
     /// Get size of relation
-    fn get_rel_size(&self, tag: RelTag, lsn: Lsn) -> Result<u32>;
+    fn get_rel_size(&self, tag: RelishTag, lsn: Lsn) -> Result<u32>;
 
     /// Does relation exist?
-    fn get_rel_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>;
+    fn get_rel_exists(&self, tag: RelishTag, lsn: Lsn) -> Result<bool>;
 
     /// Get a list of all distinct relations in given tablespace and database.
     fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result<HashSet<RelTag>>;
 
     /// Get a list of non-relational objects
-    fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<Box<dyn Iterator<Item = ObjectTag> + 'a>>;
+    fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result<HashSet<RelishTag>>;
 
     //------------------------------------------------------------------------------
     // Public PUT functions, to update the repository with new page versions.
@@ -84,24 +83,27 @@
     ///
     /// This will implicitly extend the relation, if the page is beyond the
     /// current end-of-file.
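The implicit extension mentioned in the doc comment above is implemented in object_repository.rs by comparing the written block number against the current relish size. A condensed sketch of that rule (extended_size is an illustrative helper, not a function in this patch):

```rust
// Editor's sketch of the extension rule in put_wal_record/put_page_image:
// writing block N grows the relish to at least N + 1 blocks.
fn extended_size(old_nblocks: Option<u32>, blknum: u32) -> u32 {
    let old = old_nblocks.unwrap_or(0); // the relish may not exist yet
    if blknum >= old {
        blknum + 1 // write beyond EOF: extend through the written block
    } else {
        old // write within the file: size is unchanged
    }
}
```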
- fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()>; + fn put_wal_record(&self, tag: RelishTag, blknum: u32, rec: WALRecord) -> Result<()>; + + /// Like put_wal_record, but with ready-made image of the page. + fn put_page_image( + &self, + tag: RelishTag, + blknum: u32, + lsn: Lsn, + img: Bytes, + update_meta: bool, + ) -> Result<()>; + + /// Truncate relation + fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()>; + + /// Unlink relation. This method is used for marking dropped relations. + fn put_unlink(&self, tag: RelishTag, lsn: Lsn) -> Result<()>; /// Put raw data fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>; - /// Like put_wal_record, but with ready-made image of the page. - fn put_page_image(&self, tag: ObjectTag, lsn: Lsn, img: Bytes, update_meta: bool) - -> Result<()>; - - /// Truncate relation - fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; - - /// Unlink relation. This method is used for marking dropped relations. - fn put_unlink(&self, tag: RelTag, lsn: Lsn) -> Result<()>; - - /// Truncate SLRU segment - fn put_slru_truncate(&self, tag: ObjectTag, lsn: Lsn) -> Result<()>; - // Get object tag greater or equal than specified fn get_next_tag(&self, tag: ObjectTag) -> Result>; @@ -156,9 +158,18 @@ pub trait Timeline: Send + Sync { // Check transaction status fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { - let blknum = xid / pg_constants::CLOG_XACTS_PER_PAGE; - let tag = ObjectTag::Clog(SlruBufferTag { blknum }); - let clog_page = self.get_page_at_lsn(tag, lsn)?; + let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + + let clog_page = self.get_page_at_lsn( + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + lsn, + )?; let status = transaction_id_get_status(xid, &clog_page[..]); Ok(status) } @@ -198,76 +209,6 @@ pub struct RepositoryStats { pub num_getpage_requests: Lsn, } -/// -/// Relation data file segment id throughout the Postgres cluster. -/// -/// Every data file in Postgres is uniquely identified by 4 numbers: -/// - relation id / node (`relnode`) -/// - database id (`dbnode`) -/// - tablespace id (`spcnode`), in short this is a unique id of a separate -/// directory to store data files. -/// - forknumber (`forknum`) is used to split different kinds of data of the same relation -/// between some set of files (`relnode`, `relnode_fsm`, `relnode_vm`). -/// -/// In native Postgres code `RelFileNode` structure and individual `ForkNumber` value -/// are used for the same purpose. -/// [See more related comments here](https:///github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/relfilenode.h#L57). 
-/// -#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)] -pub struct RelTag { - pub forknum: u8, - pub spcnode: u32, - pub dbnode: u32, - pub relnode: u32, -} - -impl RelTag { - pub const ZEROED: Self = Self { - forknum: 0, - spcnode: 0, - dbnode: 0, - relnode: 0, - }; -} - -/// Display RelTag in the same format that's used in most PostgreSQL debug messages: -/// -/// //[_fsm|_vm|_init] -/// -impl fmt::Display for RelTag { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if let Some(forkname) = forknumber_to_name(self.forknum) { - write!( - f, - "{}/{}/{}_{}", - self.spcnode, self.dbnode, self.relnode, forkname - ) - } else { - write!(f, "{}/{}/{}", self.spcnode, self.dbnode, self.relnode) - } - } -} - -/// -/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. -/// This is used as a part of the key inside key-value storage (RocksDB currently). -/// -/// In Postgres `BufferTag` structure is used for exactly the same purpose. -/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91). -/// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)] -pub struct BufferTag { - pub rel: RelTag, - pub blknum: u32, -} - -impl BufferTag { - pub const ZEROED: Self = Self { - rel: RelTag::ZEROED, - blknum: 0, - }; -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record @@ -321,28 +262,18 @@ mod tests { use zenith_utils::bin_ser::BeSer; /// Arbitrary relation tag, for testing. - const TESTREL_A: RelTag = RelTag { + const TESTREL_A: RelishTag = RelishTag::Relation(RelTag { spcnode: 0, dbnode: 111, relnode: 1000, forknum: 0, - }; - const TESTREL_B: RelTag = RelTag { + }); + const TESTREL_B: RelishTag = RelishTag::Relation(RelTag { spcnode: 0, dbnode: 111, relnode: 1001, forknum: 0, - }; - - /// Convenience function to create a BufferTag for testing. - /// Helps to keeps the tests shorter. 
- #[allow(non_snake_case)] - fn TEST_BUF(blknum: u32) -> ObjectTag { - ObjectTag::RelationBuffer(BufferTag { - rel: TESTREL_A, - blknum, - }) - } + }); /// Convenience function to create a page image with given string as the only content #[allow(non_snake_case)] @@ -396,11 +327,11 @@ mod tests { let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; tline.init_valid_lsn(Lsn(1)); - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; - tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"), true)?; - tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"), true)?; - tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"), true)?; + tline.put_page_image(TESTREL_A, 1, Lsn(4), TEST_IMG("foo blk 1 at 4"), true)?; + tline.put_page_image(TESTREL_A, 2, Lsn(5), TEST_IMG("foo blk 2 at 5"), true)?; tline.advance_last_valid_lsn(Lsn(5)); @@ -414,34 +345,34 @@ mod tests { // Check page contents at each LSN assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(2))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(2))?, TEST_IMG("foo blk 0 at 2") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(3))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(3))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(1), Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(4))?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(5))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(1), Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(5))?, TEST_IMG("foo blk 1 at 4") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?, TEST_IMG("foo blk 2 at 5") ); @@ -452,18 +383,18 @@ mod tests { // Check reported size and contents after truncation assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(6))?, 2); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(6))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(6))?, TEST_IMG("foo blk 0 at 3") ); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(1), Lsn(6))?, + tline.get_page_at_lsn(TESTREL_A, 1, Lsn(6))?, TEST_IMG("foo blk 1 at 4") ); // should still see the truncated block with older LSN assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(5))?, 3); assert_eq!( - tline.get_page_at_lsn(TEST_BUF(2), Lsn(5))?, + tline.get_page_at_lsn(TESTREL_A, 2, Lsn(5))?, TEST_IMG("foo blk 2 at 5") ); @@ -484,10 +415,10 @@ mod tests { tline.init_valid_lsn(Lsn(1)); let mut lsn = 0; - for i in 0..pg_constants::RELSEG_SIZE + 1 { - let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn))); + for blknum in 0..pg_constants::RELSEG_SIZE + 1 { + let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); lsn += 1; - tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img, true)?; + tline.put_page_image(TESTREL_A, blknum as u32, Lsn(lsn), img, true)?; } tline.advance_last_valid_lsn(Lsn(lsn)); @@ -540,16 +471,12 @@ mod tests { // Create a relation on the timeline tline.init_valid_lsn(Lsn(1)); - 
tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; - tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"), true)?; - tline.put_page_image(TEST_BUF(0), Lsn(4), TEST_IMG("foo blk 0 at 4"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(3), TEST_IMG("foo blk 0 at 3"), true)?; + tline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("foo blk 0 at 4"), true)?; // Create another relation - let buftag2 = ObjectTag::RelationBuffer(BufferTag { - rel: TESTREL_B, - blknum: 0, - }); - tline.put_page_image(buftag2, Lsn(2), TEST_IMG("foobar blk 0 at 2"), true)?; + tline.put_page_image(TESTREL_B, 0, Lsn(2), TEST_IMG("foobar blk 0 at 2"), true)?; tline.advance_last_valid_lsn(Lsn(4)); @@ -558,22 +485,22 @@ mod tests { repo.branch_timeline(timelineid, newtimelineid, Lsn(3))?; let newtline = repo.get_timeline(newtimelineid)?; - newtline.put_page_image(TEST_BUF(0), Lsn(4), TEST_IMG("bar blk 0 at 4"), true)?; + newtline.put_page_image(TESTREL_A, 0, Lsn(4), TEST_IMG("bar blk 0 at 4"), true)?; newtline.advance_last_valid_lsn(Lsn(4)); // Check page contents on both branches assert_eq!( - tline.get_page_at_lsn(TEST_BUF(0), Lsn(4))?, + tline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, TEST_IMG("foo blk 0 at 4") ); assert_eq!( - newtline.get_page_at_lsn(TEST_BUF(0), Lsn(4))?, + newtline.get_page_at_lsn(TESTREL_A, 0, Lsn(4))?, TEST_IMG("bar blk 0 at 4") ); assert_eq!( - newtline.get_page_at_lsn(buftag2, Lsn(4))?, + newtline.get_page_at_lsn(TESTREL_B, 0, Lsn(4))?, TEST_IMG("foobar blk 0 at 2") ); @@ -595,13 +522,11 @@ mod tests { // add a page and advance the last valid LSN let rel = TESTREL_A; - let tag = TEST_BUF(1); - - tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?; + tline.put_page_image(rel, 1, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?; tline.advance_last_valid_lsn(Lsn(1)); let expected_page = Modification { - tag, + tag: ObjectTag::Buffer(rel, 1), lsn: Lsn(1), data: ObjectValue::ser(&ObjectValue::Page(PageEntry::Page(TEST_IMG( "blk 1 @ lsn 1", @@ -665,14 +590,16 @@ mod tests { impl WalRedoManager for TestRedoManager { fn request_redo( &self, - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, base_img: Option, records: Vec, ) -> Result { let s = format!( - "redo for {:?} to get to {}, with {} and {} records", - tag, + "redo for {} blk {} to get to {}, with {} and {} records", + rel, + blknum, lsn, if base_img.is_some() { "base image" diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 140172272a..ad9eee38db 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -14,11 +14,12 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use bytes::{Buf, Bytes}; -use crate::object_key::*; +use crate::relish::*; use crate::repository::*; use crate::waldecoder::*; use postgres_ffi::relfile_utils::*; use postgres_ffi::xlog_utils::*; +use postgres_ffi::Oid; use postgres_ffi::{pg_constants, CheckPoint, ControlFileData}; use zenith_utils::lsn::Lsn; @@ -43,21 +44,21 @@ pub fn import_timeline_from_postgres_datadir( // These special files appear in the snapshot, but are not needed by the page server Some("pg_control") => { - import_nonrel_file(timeline, lsn, ObjectTag::ControlFile, &direntry.path())?; + import_nonrel_file(timeline, lsn, RelishTag::ControlFile, &direntry.path())?; // Extract checkpoint record from pg_control and store is as separate object let pg_control_bytes = - 
timeline.get_page_at_lsn_nowait(ObjectTag::ControlFile, lsn)?; + timeline.get_page_at_lsn_nowait(RelishTag::ControlFile, 0, lsn)?; let pg_control = ControlFileData::decode(&pg_control_bytes)?; let checkpoint_bytes = pg_control.checkPointCopy.encode(); - timeline.put_page_image(ObjectTag::Checkpoint, lsn, checkpoint_bytes, false)?; + timeline.put_page_image(RelishTag::Checkpoint, 0, lsn, checkpoint_bytes, false)?; } Some("pg_filenode.map") => import_nonrel_file( timeline, lsn, - ObjectTag::FileNodeMap(DatabaseTag { + RelishTag::FileNodeMap { spcnode: pg_constants::GLOBALTABLESPACE_OID, dbnode: 0, - }), + }, &direntry.path(), )?, @@ -94,10 +95,10 @@ pub fn import_timeline_from_postgres_datadir( Some("pg_filenode.map") => import_nonrel_file( timeline, lsn, - ObjectTag::FileNodeMap(DatabaseTag { + RelishTag::FileNodeMap { spcnode: pg_constants::DEFAULTTABLESPACE_OID, dbnode: dboid, - }), + }, &direntry.path(), )?, @@ -114,40 +115,20 @@ pub fn import_timeline_from_postgres_datadir( } for entry in fs::read_dir(path.join("pg_xact"))? { let entry = entry?; - import_slru_file( - timeline, - lsn, - |blknum| ObjectTag::Clog(SlruBufferTag { blknum }), - &entry.path(), - )?; + import_slru_file(timeline, lsn, SlruKind::Clog, &entry.path())?; } for entry in fs::read_dir(path.join("pg_multixact").join("members"))? { let entry = entry?; - import_slru_file( - timeline, - lsn, - |blknum| ObjectTag::MultiXactMembers(SlruBufferTag { blknum }), - &entry.path(), - )?; + import_slru_file(timeline, lsn, SlruKind::MultiXactMembers, &entry.path())?; } for entry in fs::read_dir(path.join("pg_multixact").join("offsets"))? { let entry = entry?; - import_slru_file( - timeline, - lsn, - |blknum| ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }), - &entry.path(), - )?; + import_slru_file(timeline, lsn, SlruKind::MultiXactOffsets, &entry.path())?; } for entry in fs::read_dir(path.join("pg_twophase"))? { let entry = entry?; let xid = u32::from_str_radix(&entry.path().to_str().unwrap(), 16)?; - import_nonrel_file( - timeline, - lsn, - ObjectTag::TwoPhase(PrepareTag { xid }), - &entry.path(), - )?; + import_nonrel_file(timeline, lsn, RelishTag::TwoPhase { xid }, &entry.path())?; } // TODO: Scan pg_tblspc @@ -181,16 +162,14 @@ fn import_relfile( let r = file.read_exact(&mut buf); match r { Ok(_) => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: RelTag { - spcnode: spcoid, - dbnode: dboid, - relnode, - forknum, - }, - blknum, - }); - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf), true)?; + let rel = RelTag { + spcnode: spcoid, + dbnode: dboid, + relnode, + forknum, + }; + let tag = RelishTag::Relation(rel); + timeline.put_page_image(tag, blknum, lsn, Bytes::copy_from_slice(&buf), true)?; } // TODO: UnexpectedEof is expected @@ -212,10 +191,16 @@ fn import_relfile( Ok(()) } +/// +/// Import a "non-blocky" file into the repository +/// +/// This is used for small files like the control file, twophase files etc. that +/// are just slurped into the repository as one blob. 
+/// fn import_nonrel_file( timeline: &dyn Timeline, lsn: Lsn, - tag: ObjectTag, + tag: RelishTag, path: &Path, ) -> Result<()> { let mut file = File::open(path)?; @@ -223,31 +208,34 @@ fn import_nonrel_file( // read the whole file file.read_to_end(&mut buffer)?; - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buffer[..]), false)?; + info!("importing non-rel file {}", path.display()); + + timeline.put_page_image(tag, 0, lsn, Bytes::copy_from_slice(&buffer[..]), false)?; Ok(()) } -fn import_slru_file( - timeline: &dyn Timeline, - lsn: Lsn, - gen_tag: fn(blknum: u32) -> ObjectTag, - path: &Path, -) -> Result<()> { - // Does it look like a relation file? - +/// +/// Import an SLRU segment file +/// +fn import_slru_file(timeline: &dyn Timeline, lsn: Lsn, slru: SlruKind, path: &Path) -> Result<()> { + // Does it look like an SLRU file? let mut file = File::open(path)?; let mut buf: [u8; 8192] = [0u8; 8192]; let segno = u32::from_str_radix(path.file_name().unwrap().to_str().unwrap(), 16)?; + + info!("importing slru file {}", path.display()); + + let mut rpageno = 0; loop { let r = file.read_exact(&mut buf); match r { Ok(_) => { timeline.put_page_image( - gen_tag(blknum), + RelishTag::Slru { slru, segno }, + rpageno, lsn, Bytes::copy_from_slice(&buf), - false, + true, )?; } @@ -264,7 +252,9 @@ fn import_slru_file( } }, }; - blknum += 1; + rpageno += 1; + + // TODO: Check that the file isn't unexpectedly large, i.e. not larger than SLRU_PAGES_PER_SEGMENT pages } Ok(()) @@ -279,7 +269,7 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: let mut offset = startpoint.segment_offset(pg_constants::WAL_SEGMENT_SIZE); let mut last_lsn = startpoint; - let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; loop { @@ -343,11 +333,10 @@ pub fn import_timeline_wal(walpath: &Path, timeline: &dyn Timeline, startpoint: info!("reached end of WAL at {}", last_lsn); let checkpoint_bytes = checkpoint.encode(); - timeline.put_page_image(ObjectTag::Checkpoint, last_lsn, checkpoint_bytes, false)?; + timeline.put_page_image(RelishTag::Checkpoint, 0, last_lsn, checkpoint_bytes, false)?; timeline.advance_last_valid_lsn(last_lsn); timeline.checkpoint()?; - Ok(()) } @@ -367,14 +356,11 @@ pub fn save_decoded_record( // Iterate through all the blocks that the record modifies, and // "put" a separate copy of the record for each block.
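Every SLRU-touching hunk below repeats the same address arithmetic: a global SLRU page number is split into a segment number and a page within that segment with a divmod by SLRU_PAGES_PER_SEGMENT. A minimal sketch of that mapping as a hypothetical helper (the patch itself inlines the two divisions at each call site):

use postgres_ffi::pg_constants;

// Hypothetical helper, not part of this patch: map a global SLRU page
// number to (segment number, page index within that segment).
fn slru_page_to_segment(pageno: u32) -> (u32, u32) {
    let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
    let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
    (segno, rpageno)
}

With the usual 32 pages per segment, page 40 maps to segment 1, page 8.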
for blk in decoded.blocks.iter() { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: RelTag { - spcnode: blk.rnode_spcnode, - dbnode: blk.rnode_dbnode, - relnode: blk.rnode_relnode, - forknum: blk.forknum as u8, - }, - blknum: blk.blkno, + let tag = RelishTag::Relation(RelTag { + spcnode: blk.rnode_spcnode, + dbnode: blk.rnode_dbnode, + relnode: blk.rnode_relnode, + forknum: blk.forknum as u8, }); let rec = WALRecord { @@ -384,7 +370,7 @@ pub fn save_decoded_record( main_data_offset: decoded.main_data_offset as u32, }; - timeline.put_wal_record(tag, rec)?; + timeline.put_wal_record(tag, blk.blkno, rec)?; } let mut buf = decoded.record.clone(); @@ -407,37 +393,25 @@ pub fn save_decoded_record( } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { trace!("XLOG_TBLSPC_CREATE/DROP is not handled yet"); } else if decoded.xl_rmid == pg_constants::RM_CLOG_ID { - let blknum = buf.get_u32_le(); let info = decoded.xl_info & !pg_constants::XLR_INFO_MASK; - let tag = ObjectTag::Clog(SlruBufferTag { blknum }); if info == pg_constants::CLOG_ZEROPAGE { - let rec = WALRecord { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, lsn, - will_init: true, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - timeline.put_page_image(tag, lsn, ZERO_PAGE, false)?; + ZERO_PAGE, + true, + )?; } else { assert!(info == pg_constants::CLOG_TRUNCATE); - checkpoint.oldestXid = buf.get_u32_le(); - checkpoint.oldestXidDB = buf.get_u32_le(); - trace!( - "RM_CLOG_ID truncate blkno {} oldestXid {} oldestXidDB {}", - blknum, - checkpoint.oldestXid, - checkpoint.oldestXidDB - ); - if let Some(ObjectTag::Clog(first_slru_tag)) = - timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))? 
- { - for trunc_blknum in first_slru_tag.blknum..=blknum { - let tag = ObjectTag::Clog(SlruBufferTag { - blknum: trunc_blknum, - }); - timeline.put_slru_truncate(tag, lsn)?; - } - } + let xlrec = XlClogTruncate::decode(&mut buf); + save_clog_truncate_record(checkpoint, timeline, lsn, &xlrec)?; } } else if decoded.xl_rmid == pg_constants::RM_XACT_ID { let info = decoded.xl_info & pg_constants::XLOG_XACT_OPMASK; @@ -456,30 +430,44 @@ pub fn save_decoded_record( main_data_offset: decoded.main_data_offset as u32, }; timeline.put_wal_record( - ObjectTag::TwoPhase(PrepareTag { + RelishTag::TwoPhase { xid: decoded.xl_xid, - }), + }, + 0, rec, )?; } } else if decoded.xl_rmid == pg_constants::RM_MULTIXACT_ID { let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK; - if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE - || info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE - { - let blknum = buf.get_u32_le(); - let rec = WALRecord { + + if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + }, + rpageno, lsn, - will_init: true, - rec: recdata.clone(), - main_data_offset: decoded.main_data_offset as u32, - }; - let tag = if info == pg_constants::XLOG_MULTIXACT_ZERO_OFF_PAGE { - ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }) - } else { - ObjectTag::MultiXactMembers(SlruBufferTag { blknum }) - }; - timeline.put_page_image(tag, lsn, ZERO_PAGE, false)?; + ZERO_PAGE, + true, + )?; + } else if info == pg_constants::XLOG_MULTIXACT_ZERO_MEM_PAGE { + let pageno = buf.get_u32_le(); + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_page_image( + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + }, + rpageno, + lsn, + ZERO_PAGE, + true, + )?; } else if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); save_multixact_create_record(checkpoint, timeline, lsn, &xlrec, decoded)?; @@ -543,7 +531,7 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab assert_eq!(src_rel.spcnode, src_tablespace_id); assert_eq!(src_rel.dbnode, src_db_id); - let nblocks = timeline.get_rel_size(src_rel, req_lsn)?; + let nblocks = timeline.get_rel_size(RelishTag::Relation(src_rel), req_lsn)?; let dst_rel = RelTag { spcnode: tablespace_id, dbnode: db_id, @@ -553,26 +541,18 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab // Copy content for blknum in 0..nblocks { - let src_key = ObjectTag::RelationBuffer(BufferTag { - rel: src_rel, - blknum, - }); - let dst_key = ObjectTag::RelationBuffer(BufferTag { - rel: dst_rel, - blknum, - }); + let content = + timeline.get_page_at_lsn_nowait(RelishTag::Relation(src_rel), blknum, req_lsn)?; - let content = timeline.get_page_at_lsn_nowait(src_key, req_lsn)?; + debug!("copying block {} from {} to {}", blknum, src_rel, dst_rel); - debug!("copying block {:?} to {:?}", src_key, dst_key); - - timeline.put_page_image(dst_key, lsn, content, true)?; + timeline.put_page_image(RelishTag::Relation(dst_rel), blknum, lsn, content, true)?; num_blocks_copied += 1; } if nblocks == 0 { // make sure we have some trace of the relation, even if it's empty - timeline.put_truncation(dst_rel, lsn, 0)?; +
timeline.put_truncation(RelishTag::Relation(dst_rel), lsn, 0)?; } num_rels_copied += 1; @@ -580,14 +560,14 @@ fn save_xlog_dbase_create(timeline: &dyn Timeline, lsn: Lsn, rec: &XlCreateDatab // Copy relfilemap for tag in timeline.list_nonrels(req_lsn)? { match tag { - ObjectTag::FileNodeMap(db) => { - if db.spcnode == src_tablespace_id && db.dbnode == src_db_id { - let img = timeline.get_page_at_lsn_nowait(tag, req_lsn)?; - let new_tag = ObjectTag::FileNodeMap(DatabaseTag { + RelishTag::FileNodeMap { spcnode, dbnode } => { + if spcnode == src_tablespace_id && dbnode == src_db_id { + let img = timeline.get_page_at_lsn_nowait(tag, 0, req_lsn)?; + let new_tag = RelishTag::FileNodeMap { spcnode: tablespace_id, dbnode: db_id, - }); - timeline.put_page_image(new_tag, lsn, img, false)?; + }; + timeline.put_page_image(new_tag, 0, lsn, img, false)?; break; } } @@ -616,7 +596,7 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca relnode, forknum: pg_constants::MAIN_FORKNUM, }; - timeline.put_truncation(rel, lsn, rec.blkno)?; + timeline.put_truncation(RelishTag::Relation(rel), lsn, rec.blkno)?; } if (rec.flags & pg_constants::SMGR_TRUNCATE_FSM) != 0 { let rel = RelTag { @@ -639,7 +619,7 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca info!("Partial truncation of FSM is not supported"); } let num_fsm_blocks = 0; - timeline.put_truncation(rel, lsn, num_fsm_blocks)?; + timeline.put_truncation(RelishTag::Relation(rel), lsn, num_fsm_blocks)?; } if (rec.flags & pg_constants::SMGR_TRUNCATE_VM) != 0 { let rel = RelTag { @@ -658,7 +638,7 @@ fn save_xlog_smgr_truncate(timeline: &dyn Timeline, lsn: Lsn, rec: &XlSmgrTrunca info!("Partial truncation of VM is not supported"); } let num_vm_blocks = 0; - timeline.put_truncation(rel, lsn, num_vm_blocks)?; + timeline.put_truncation(RelishTag::Relation(rel), lsn, num_vm_blocks)?; } Ok(()) } @@ -673,38 +653,94 @@ fn save_xact_record( decoded: &DecodedWALRecord, ) -> Result<()> { // Record update of CLOG page - let mut blknum = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; - let tag = ObjectTag::Clog(SlruBufferTag { blknum }); + let mut pageno = parsed.xid / pg_constants::CLOG_XACTS_PER_PAGE; + + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; let rec = WALRecord { lsn, will_init: false, rec: decoded.record.clone(), main_data_offset: decoded.main_data_offset as u32, }; - timeline.put_wal_record(tag, rec.clone())?; + timeline.put_wal_record( + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + rec.clone(), + )?; for subxact in &parsed.subxacts { - let subxact_blknum = subxact / pg_constants::CLOG_XACTS_PER_PAGE; - if subxact_blknum != blknum { - blknum = subxact_blknum; - let tag = ObjectTag::Clog(SlruBufferTag { blknum }); - timeline.put_wal_record(tag, rec.clone())?; + let subxact_pageno = subxact / pg_constants::CLOG_XACTS_PER_PAGE; + if subxact_pageno != pageno { + pageno = subxact_pageno; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + }, + rpageno, + rec.clone(), + )?; } } for xnode in &parsed.xnodes { for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { - let rel_tag = RelTag { + let rel = RelTag { forknum, spcnode: xnode.spcnode, dbnode: xnode.dbnode, relnode: xnode.relnode, }; - timeline.put_unlink(rel_tag, lsn)?; + 
timeline.put_unlink(RelishTag::Relation(rel), lsn)?; } } Ok(()) } +fn save_clog_truncate_record( + checkpoint: &mut CheckPoint, + _timeline: &dyn Timeline, + _lsn: Lsn, + xlrec: &XlClogTruncate, +) -> Result<()> { + trace!( + "RM_CLOG_ID truncate pageno {} oldestXid {} oldestXidDB {}", + xlrec.pageno, + xlrec.oldest_xid, + xlrec.oldest_xid_db + ); + + checkpoint.oldestXid = xlrec.oldest_xid; + checkpoint.oldestXidDB = xlrec.oldest_xid_db; + + // FIXME: Handle XID wraparound! I just commented this out, + // because it was wrong in a dangerous way. But what this should + // now do is identify the CLOG segments in the repository that are + // older than the threshold in the WAL record - taking XID + // wraparound into account like the corresponding PostgreSQL code + // does! - and call put_unlink() for the segments that are no + // longer needed. + + /* + if let Some(ObjectTag::Clog(first_slru_tag)) = + timeline.get_next_tag(ObjectTag::Clog(SlruBufferTag { blknum: 0 }))? + { + for trunc_blknum in first_slru_tag.blknum..=pageno { + let tag = ObjectTag::Clog(SlruBufferTag { + blknum: trunc_blknum, + }); + timeline.put_slru_truncate(tag, lsn)?; + } + } + */ + Ok(()) +} + fn save_multixact_create_record( checkpoint: &mut CheckPoint, timeline: &dyn Timeline, @@ -718,31 +754,47 @@ fn save_multixact_create_record( rec: decoded.record.clone(), main_data_offset: decoded.main_data_offset as u32, }; - let blknum = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); - timeline.put_wal_record(tag, rec.clone())?; + let pageno = xlrec.mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + }, + rpageno, + rec.clone(), + )?; - let first_mbr_blkno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_blkno = + let first_mbr_pageno = xlrec.moff / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let last_mbr_pageno = (xlrec.moff + xlrec.nmembers - 1) / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; // The members SLRU can, in contrast to the offsets one, be filled to almost // the full range at once. So we need to handle wraparound.
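Since a single multixact can cover almost the entire members page range, the loop below steps with a wrapping increment instead of a plain += 1. A minimal sketch of that step, assuming MAX_MBR_BLKNO is the highest valid members page number, as in the surrounding code:

// Sketch only: advance to the next members page, wrapping past the
// last valid page back to zero (the loop below inlines this logic).
fn next_members_page(pageno: u32, max_page: u32) -> u32 {
    if pageno == max_page {
        0
    } else {
        pageno + 1
    }
}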
- let mut blknum = first_mbr_blkno; + let mut pageno = first_mbr_pageno; loop { // Update members page - let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); - timeline.put_wal_record(tag, rec.clone())?; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + timeline.put_wal_record( + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + }, + rpageno, + rec.clone(), + )?; - if blknum == last_mbr_blkno { + if pageno == last_mbr_pageno { // last block inclusive break; } // handle wraparound - if blknum == MAX_MBR_BLKNO { - blknum = 0; + if pageno == MAX_MBR_BLKNO { + pageno = 0; } else { - blknum += 1; + pageno += 1; } } if xlrec.mid >= checkpoint.nextMulti { @@ -762,6 +814,18 @@ fn save_multixact_create_record( Ok(()) } +#[allow(non_upper_case_globals)] +const MaxMultiXactOffset: u32 = 0xFFFFFFFF; + +#[allow(non_snake_case)] +const fn MXOffsetToMemberPage(xid: u32) -> u32 { + xid / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32 +} +#[allow(non_snake_case)] +const fn MXOffsetToMemberSegment(xid: u32) -> i32 { + (MXOffsetToMemberPage(xid) / pg_constants::SLRU_PAGES_PER_SEGMENT) as i32 +} + fn save_multixact_truncate_record( checkpoint: &mut CheckPoint, timeline: &dyn Timeline, @@ -770,31 +834,35 @@ fn save_multixact_truncate_record( ) -> Result<()> { checkpoint.oldestMulti = xlrec.end_trunc_off; checkpoint.oldestMultiDB = xlrec.oldest_multi_db; - let first_off_blkno = xlrec.start_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; - let last_off_blkno = xlrec.end_trunc_off / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32; + + // PerformMembersTruncation + let maxsegment: i32 = MXOffsetToMemberSegment(MaxMultiXactOffset); + let startsegment: i32 = MXOffsetToMemberSegment(xlrec.start_trunc_memb); + let endsegment: i32 = MXOffsetToMemberSegment(xlrec.end_trunc_memb); + let mut segment: i32 = startsegment; + // Delete all the segments except the last one. The last segment can still // contain, possibly partially, valid data. - for blknum in first_off_blkno..last_off_blkno { - let tag = ObjectTag::MultiXactOffsets(SlruBufferTag { blknum }); - timeline.put_slru_truncate(tag, lsn)?; - } - let first_mbr_blkno = xlrec.start_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - let last_mbr_blkno = xlrec.end_trunc_memb / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - // The members SLRU can, in contrast to the offsets one, be filled to almost - // the full range at once. So we need to handle wraparound. - let mut blknum = first_mbr_blkno; - // Delete all the segments but the last one. The last segment can still - // contain, possibly partially, valid data. 
- while blknum != last_mbr_blkno { - let tag = ObjectTag::MultiXactMembers(SlruBufferTag { blknum }); - timeline.put_slru_truncate(tag, lsn)?; - // handle wraparound - if blknum == MAX_MBR_BLKNO { - blknum = 0; + while segment != endsegment { + timeline.put_unlink( + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno: segment as u32, + }, + lsn, + )?; + + /* move to next segment, handling wraparound correctly */ + if segment == maxsegment { + segment = 0; } else { - blknum += 1; + segment += 1; } } + + // Truncate offsets + // FIXME: this did not handle wraparound correctly + Ok(()) } @@ -810,10 +878,10 @@ fn save_relmap_record( rec: decoded.record.clone(), main_data_offset: decoded.main_data_offset as u32, }; - let tag = ObjectTag::FileNodeMap(DatabaseTag { + let tag = RelishTag::FileNodeMap { spcnode: xlrec.tsid, dbnode: xlrec.dbid, - }); - timeline.put_wal_record(tag, rec)?; + }; + timeline.put_wal_record(tag, 0, rec)?; Ok(()) } diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index f13b478d05..4d2aaf029c 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -3,7 +3,7 @@ //! use crate::object_key::*; use crate::object_store::ObjectStore; -use crate::repository::RelTag; +use crate::relish::*; use crate::PageServerConf; use crate::ZTenantId; use crate::ZTimelineId; @@ -144,10 +144,9 @@ impl ObjectStore for RocksObjectStore { fn list_objects<'a>( &'a self, timeline: ZTimelineId, - nonrel_only: bool, lsn: Lsn, ) -> Result + 'a>> { - let iter = RocksObjectIter::new(&self.db, timeline, nonrel_only, lsn)?; + let iter = RocksObjectIter::new(&self.db, timeline, lsn)?; Ok(Box::new(iter)) } @@ -179,7 +178,7 @@ impl ObjectStore for RocksObjectStore { let search_key = StorageKey { obj_key: ObjectKey { timeline: timelineid, - tag: ObjectTag::RelationMetadata(search_rel_tag), + tag: ObjectTag::RelationMetadata(RelishTag::Relation(search_rel_tag)), }, lsn: Lsn(0), }; @@ -189,7 +188,7 @@ impl ObjectStore for RocksObjectStore { } let key = StorageKey::des(iter.key().unwrap())?; - if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag { + if let ObjectTag::RelationMetadata(RelishTag::Relation(rel_tag)) = key.obj_key.tag { if spcnode != 0 && rel_tag.spcnode != spcnode || dbnode != 0 && rel_tag.dbnode != dbnode { @@ -212,6 +211,48 @@ impl ObjectStore for RocksObjectStore { Ok(rels) } + /// Get a list of all distinct NON-relations in timeline + /// + /// TODO: This implementation is very inefficient, it scans + /// through all non-rel page versions in the system. In practice, this + /// is used when initializing a new compute node, and the non-rel files + /// are never very large nor change very frequently, so this will do for now. 
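A usage sketch for this listing, hypothetical but modeled on how save_xlog_dbase_create above consumes the Timeline-level list_nonrels(): iterate the returned tags and dispatch on the relish kind.

use crate::relish::*;
use crate::repository::Timeline;
use zenith_utils::lsn::Lsn;

// Hypothetical caller: walk all non-relation relishes visible at `lsn`.
fn walk_nonrels(timeline: &dyn Timeline, lsn: Lsn) -> anyhow::Result<()> {
    for tag in timeline.list_nonrels(lsn)? {
        match tag {
            RelishTag::Slru { slru, segno } => {
                // e.g. reconstruct this SLRU segment page by page
                let _ = (slru, segno);
            }
            RelishTag::FileNodeMap { spcnode, dbnode } => {
                // e.g. copy the relmapper file for this database
                let _ = (spcnode, dbnode);
            }
            _ => {}
        }
    }
    Ok(())
}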
+ fn list_nonrels(&self, timelineid: ZTimelineId, lsn: Lsn) -> Result> { + let mut rels: HashSet = HashSet::new(); + + let search_key = StorageKey { + obj_key: ObjectKey { + timeline: timelineid, + tag: ObjectTag::Buffer(FIRST_NONREL_RELISH_TAG, 0), + }, + lsn: Lsn(0), + }; + + let mut iter = self.db.raw_iterator(); + iter.seek(search_key.ser()?); + while iter.valid() { + let key = StorageKey::des(iter.key().unwrap())?; + + if key.obj_key.timeline != timelineid { + // reached end of this timeline in the store + break; + } + + if let ObjectTag::Buffer(rel_tag, _blknum) = key.obj_key.tag { + if key.lsn <= lsn { + // visible in this snapshot + rels.insert(rel_tag); + } + } + // TODO: we could skip to next relation here like we do in list_rels(), + // but hopefully there are not that many SLRU segments or other non-rel + // entries for it to matter. + iter.next(); + } + + Ok(rels) + } + /// Iterate through versions of all objects in a timeline. /// /// Returns objects in increasing key-version order. @@ -387,17 +428,11 @@ impl<'r> RocksObjects<'r> { struct RocksObjectIter<'a> { timeline: ZTimelineId, key: StorageKey, - nonrel_only: bool, lsn: Lsn, dbiter: rocksdb::DBRawIterator<'a>, } impl<'a> RocksObjectIter<'a> { - fn new( - db: &'a rocksdb::DB, - timeline: ZTimelineId, - nonrel_only: bool, - lsn: Lsn, - ) -> Result> { + fn new(db: &'a rocksdb::DB, timeline: ZTimelineId, lsn: Lsn) -> Result> { let key = StorageKey { obj_key: ObjectKey { timeline, @@ -409,7 +444,6 @@ impl<'a> RocksObjectIter<'a> { Ok(RocksObjectIter { key, timeline, - nonrel_only, lsn, dbiter, }) @@ -433,15 +467,7 @@ impl<'a> Iterator for RocksObjectIter<'a> { self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions if key.lsn <= self.lsn { // visible in this snapshot - if self.nonrel_only { - match key.obj_key.tag { - ObjectTag::RelationMetadata(_) => return None, - ObjectTag::RelationBuffer(_) => return None, - _ => return Some(key.obj_key.tag), - } - } else { - return Some(key.obj_key.tag); - } + return Some(key.obj_key.tag); } } } diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 1801a061c5..10e66dac63 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -9,12 +9,11 @@ use postgres_ffi::xlog_utils::*; use postgres_ffi::XLogLongPageHeaderData; use postgres_ffi::XLogPageHeaderData; use postgres_ffi::XLogRecord; +use postgres_ffi::{Oid, TransactionId}; use std::cmp::min; use thiserror::Error; use zenith_utils::lsn::Lsn; -pub type Oid = u32; -pub type TransactionId = u32; pub type BlockNumber = u32; pub type OffsetNumber = u16; pub type MultiXactId = TransactionId; @@ -496,6 +495,24 @@ impl XlXactParsedRecord { } } +#[repr(C)] +#[derive(Debug)] +pub struct XlClogTruncate { + pub pageno: u32, + pub oldest_xid: TransactionId, + pub oldest_xid_db: Oid, +} + +impl XlClogTruncate { + pub fn decode(buf: &mut Bytes) -> XlClogTruncate { + XlClogTruncate { + pageno: buf.get_u32_le(), + oldest_xid: buf.get_u32_le(), + oldest_xid_db: buf.get_u32_le(), + } + } +} + #[repr(C)] #[derive(Debug)] pub struct MultiXactMember { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index abd3218e86..f3f612bc2f 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -4,8 +4,8 @@ //! //! We keep one WAL receiver active per timeline. 
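Back in waldecoder.rs, the new XlClogTruncate is just three consecutive little-endian u32s read off the record's main data. A small sketch of driving the decoder on a hand-built buffer (the byte values here are made up for illustration):

use bytes::Bytes;
use crate::waldecoder::XlClogTruncate;

fn demo_clog_truncate_decode() {
    // pageno = 7, oldest_xid = 1000 (0x3e8), oldest_xid_db = 5,
    // laid out as consecutive little-endian u32s.
    let mut buf = Bytes::from_static(&[
        0x07, 0x00, 0x00, 0x00, // pageno
        0xe8, 0x03, 0x00, 0x00, // oldest_xid
        0x05, 0x00, 0x00, 0x00, // oldest_xid_db
    ]);
    let xlrec = XlClogTruncate::decode(&mut buf);
    assert_eq!(xlrec.pageno, 7);
    assert_eq!(xlrec.oldest_xid, 1000);
    assert_eq!(xlrec.oldest_xid_db, 5);
}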
-use crate::object_key::*; use crate::page_cache; +use crate::relish::*; use crate::restore_local_repo; use crate::waldecoder::*; use crate::PageServerConf; @@ -171,7 +171,7 @@ fn walreceiver_main( let mut waldecoder = WalStreamDecoder::new(startpoint); - let checkpoint_bytes = timeline.get_page_at_lsn_nowait(ObjectTag::Checkpoint, startpoint)?; + let checkpoint_bytes = timeline.get_page_at_lsn_nowait(RelishTag::Checkpoint, 0, startpoint)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value); @@ -215,7 +215,8 @@ fn walreceiver_main( // Check if checkpoint data was updated by save_decoded_record if new_checkpoint_bytes != old_checkpoint_bytes { timeline.put_page_image( - ObjectTag::Checkpoint, + RelishTag::Checkpoint, + 0, lsn, new_checkpoint_bytes, false, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 4a2820aeba..dd06266a12 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -18,6 +18,7 @@ use byteorder::{ByteOrder, LittleEndian}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; +use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::fs; use std::fs::OpenOptions; @@ -36,8 +37,7 @@ use tokio::time::timeout; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; -use crate::object_key::*; -use crate::repository::BufferTag; +use crate::relish::*; use crate::repository::WALRecord; use crate::waldecoder::XlXactParsedRecord; use crate::waldecoder::{MultiXactId, XlMultiXactCreate}; @@ -47,6 +47,19 @@ use postgres_ffi::nonrelfile_utils::transaction_id_set_status; use postgres_ffi::pg_constants; use postgres_ffi::XLogRecord; +/// +/// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster. +/// This is used as a part of the key inside key-value storage (RocksDB currently). +/// +/// In Postgres, the `BufferTag` structure is used for exactly the same purpose. +/// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91). +/// +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize, Deserialize)] +pub struct BufferTag { + pub rel: RelTag, + pub blknum: u32, +} + /// /// WAL Redo Manager is responsible for replaying WAL records. /// @@ -60,7 +73,8 @@ pub trait WalRedoManager: Send + Sync { /// the records.
fn request_redo( &self, - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, base_img: Option, records: Vec, @@ -76,7 +90,8 @@ pub struct DummyRedoManager {} impl crate::walredo::WalRedoManager for DummyRedoManager { fn request_redo( &self, - _tag: ObjectTag, + _rel: RelishTag, + _blknum: u32, _lsn: Lsn, _base_img: Option, _records: Vec, @@ -107,7 +122,8 @@ struct PostgresRedoManagerInternal { #[derive(Debug)] struct WalRedoRequest { - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, base_img: Option, @@ -173,7 +189,8 @@ impl WalRedoManager for PostgresRedoManager { /// fn request_redo( &self, - tag: ObjectTag, + rel: RelishTag, + blknum: u32, lsn: Lsn, base_img: Option, records: Vec, @@ -182,7 +199,8 @@ impl WalRedoManager for PostgresRedoManager { let (tx, rx) = mpsc::channel::>(); let request = WalRedoRequest { - tag, + rel, + blknum, lsn, base_img, records, @@ -274,7 +292,8 @@ impl PostgresRedoManagerInternal { process: &PostgresRedoProcess, request: &WalRedoRequest, ) -> Result { - let tag = request.tag; + let rel = request.rel; + let blknum = request.blknum; let lsn = request.lsn; let base_img = request.base_img.clone(); let records = &request.records; @@ -284,11 +303,11 @@ impl PostgresRedoManagerInternal { let start = Instant::now(); let apply_result: Result; - if let ObjectTag::RelationBuffer(buf_tag) = tag { + if let RelishTag::Relation(rel) = rel { // Relational WAL records are applied using wal-redo-postgres + let buf_tag = BufferTag { rel, blknum }; apply_result = process.apply_wal_records(buf_tag, base_img, records).await; } else { - // Non-relational WAL records we apply ourselves. const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; let mut page = BytesMut::new(); if let Some(fpi) = base_img { @@ -317,16 +336,21 @@ impl PostgresRedoManagerInternal { if xlogrec.xl_rmid == pg_constants::RM_XACT_ID { // Transaction manager stuff let info = xlogrec.xl_info & pg_constants::XLOG_XACT_OPMASK; - let tag_blknum = match tag { - ObjectTag::Clog(slru) => slru.blknum, - ObjectTag::TwoPhase(_) => { + let rec_segno = match rel { + RelishTag::Slru { slru, segno } => { + if slru != SlruKind::Clog { + panic!("Not valid XACT relish tag {:?}", rel); + } + segno + } + RelishTag::TwoPhase { xid: _ } => { assert!(info == pg_constants::XLOG_XACT_PREPARE); trace!("Apply prepare {} record", xlogrec.xl_xid); page.clear(); page.extend_from_slice(&buf[..]); continue; } - _ => panic!("Not valid XACT object tag {:?}", tag), + _ => panic!("Not valid XACT relish tag {:?}", rel), }; let parsed_xact = XlXactParsedRecord::decode(&mut buf, xlogrec.xl_xid, xlogrec.xl_info); @@ -339,9 +363,11 @@ impl PostgresRedoManagerInternal { &mut page, ); for subxact in &parsed_xact.subxacts { - let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; // only update xids on the requested page - if tag_blknum == blkno { + if rec_segno == segno && blknum == rpageno { transaction_id_set_status( *subxact, pg_constants::TRANSACTION_STATUS_SUB_COMMITTED, @@ -358,9 +384,11 @@ impl PostgresRedoManagerInternal { &mut page, ); for subxact in &parsed_xact.subxacts { - let blkno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let pageno = *subxact as u32 / pg_constants::CLOG_XACTS_PER_PAGE; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; // only 
update xids on the requested page - if tag_blknum == blkno { + if rec_segno == segno && blknum == rpageno { transaction_id_set_status( *subxact, pg_constants::TRANSACTION_STATUS_ABORTED, @@ -374,36 +402,49 @@ impl PostgresRedoManagerInternal { let info = xlogrec.xl_info & pg_constants::XLR_RMGR_INFO_MASK; if info == pg_constants::XLOG_MULTIXACT_CREATE_ID { let xlrec = XlMultiXactCreate::decode(&mut buf); - if let ObjectTag::MultiXactMembers(slru) = tag { - for i in 0..xlrec.nmembers { - let blkno = i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; - if blkno == slru.blknum { - // update only target block - let offset = xlrec.moff + i; - let memberoff = mx_offset_to_member_offset(offset); - let flagsoff = mx_offset_to_flags_offset(offset); - let bshift = mx_offset_to_flags_bitshift(offset); - let mut flagsval = - LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); - flagsval &= - !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) + if let RelishTag::Slru { + slru, + segno: rec_segno, + } = rel + { + if slru == SlruKind::MultiXactMembers { + for i in 0..xlrec.nmembers { + let pageno = + i / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32; + let segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT; + let rpageno = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT; + if segno == rec_segno && rpageno == blknum { + // update only target block + let offset = xlrec.moff + i; + let memberoff = mx_offset_to_member_offset(offset); + let flagsoff = mx_offset_to_flags_offset(offset); + let bshift = mx_offset_to_flags_bitshift(offset); + let mut flagsval = + LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]); + flagsval &= !(((1 + << pg_constants::MXACT_MEMBER_BITS_PER_XACT) + - 1) << bshift); - flagsval |= xlrec.members[i as usize].status << bshift; - LittleEndian::write_u32( - &mut page[flagsoff..flagsoff + 4], - flagsval, - ); - LittleEndian::write_u32( - &mut page[memberoff..memberoff + 4], - xlrec.members[i as usize].xid, - ); + flagsval |= xlrec.members[i as usize].status << bshift; + LittleEndian::write_u32( + &mut page[flagsoff..flagsoff + 4], + flagsval, + ); + LittleEndian::write_u32( + &mut page[memberoff..memberoff + 4], + xlrec.members[i as usize].xid, + ); + } } + } else { + // Multixact offsets SLRU + let offs = (xlrec.mid + % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 + * 4) as usize; + LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); } } else { - // Multixact offsets SLRU - let offs = (xlrec.mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32 - * 4) as usize; - LittleEndian::write_u32(&mut page[offs..offs + 4], xlrec.moff); + panic!(); } } else { panic!(); diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index 12076a8418..6bd70525b1 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -48,8 +48,8 @@ def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 30 - assert row['deleted'] == 3 + assert row['truncated'] == 31 + assert row['deleted'] == 4 # Insert two more rows and run GC. 
print("Inserting two more rows and running GC") @@ -61,8 +61,8 @@ def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 30 - assert row['deleted'] == 2 + assert row['truncated'] == 31 + assert row['deleted'] == 4 # Insert one more row. It creates one more page version, but doesn't affect the # relation size. @@ -74,8 +74,8 @@ def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 30 - assert row['deleted'] == 1 + assert row['truncated'] == 31 + assert row['deleted'] == 2 # Run GC again, with no changes in the database. Should not remove anything. pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") @@ -83,7 +83,7 @@ def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) assert row['n_relations'] == n_relations assert row['dropped'] == 0 - assert row['truncated'] == 30 + assert row['truncated'] == 31 assert row['deleted'] == 0 #