diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 93563a0f7b..1666a65984 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -8,6 +8,7 @@ use std::time::Duration; pub mod basebackup; pub mod branches; +pub mod object_key; pub mod object_repository; pub mod object_store; pub mod page_cache; diff --git a/pageserver/src/object_key.rs b/pageserver/src/object_key.rs new file mode 100644 index 0000000000..0ae8291825 --- /dev/null +++ b/pageserver/src/object_key.rs @@ -0,0 +1,27 @@ +use crate::repository::{BufferTag, RelTag}; +use crate::ZTimelineId; +use serde::{Deserialize, Serialize}; + +/// +/// ObjectKey is the key type used to identify objects stored in an object +/// repository. It is shared between object_repository.rs and object_store.rs. +/// It is mostly opaque to ObjectStore, it just stores and retrieves objects +/// using the key given by the caller. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObjectKey { + pub timeline: ZTimelineId, + pub tag: ObjectTag, +} + +/// ObjectTag is a part of ObjectKey that is specific to the type of +/// the stored object. +/// +/// NB: the order of the enum values is significant! In particular, +/// rocksdb_storage.rs assumes that TimelineMetadataTag is first +/// +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub enum ObjectTag { + TimelineMetadataTag, + RelationMetadata(RelTag), + RelationBuffer(BufferTag), +} diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 4b7ec67e4e..727040d5cf 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -13,7 +13,8 @@ //! until we find the page we're looking for, making a separate lookup into the //! key-value store for each timeline. -use crate::object_store::{ObjectKey, ObjectStore}; +use crate::object_key::*; +use crate::object_store::ObjectStore; use crate::repository::*; use crate::restore_local_repo::import_timeline_wal; use crate::walredo::WalRedoManager; @@ -21,7 +22,6 @@ use crate::{PageServerConf, ZTimelineId}; use anyhow::{bail, Context, Result}; use bytes::Bytes; use log::*; -use postgres_ffi::pg_constants; use serde::{Deserialize, Serialize}; use std::cmp::max; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -329,7 +329,7 @@ impl Timeline for ObjectTimeline { let lsn = rec.lsn; let key = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag: ObjectTag::RelationBuffer(tag), }; let val = PageEntry::WALRecord(rec); @@ -376,7 +376,7 @@ impl Timeline for ObjectTimeline { fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { let key = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag: ObjectTag::RelationBuffer(tag), }; let val = PageEntry::Page(img); @@ -540,7 +540,7 @@ impl ObjectTimeline { // ask the WAL redo service to reconstruct the page image from the WAL records. let searchkey = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag: ObjectTag::RelationBuffer(tag), }; let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; @@ -642,7 +642,7 @@ impl ObjectTimeline { // old page image. let searchkey = ObjectKey { timeline: self.timelineid, - buf_tag: tag, + tag: ObjectTag::RelationBuffer(tag), }; let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; while let Some((_key, value)) = iter.next().transpose()? { @@ -690,8 +690,9 @@ impl ObjectTimeline { // Iterate through all relations for rels in &self.obj_store.list_rels(self.timelineid, 0, 0, last_lsn)? { + let rel = *rels; let mut last_version = true; - let mut key = relation_size_key(self.timelineid, *rels); + let key = relation_size_key(self.timelineid, rel); let mut max_size = 0u32; let mut relation_dropped = false; @@ -722,7 +723,14 @@ impl ObjectTimeline { } // Now process all relation blocks for blknum in 0..max_size { - key.buf_tag.blknum = blknum; + let buf_tag = BufferTag { + rel, + blknum, + }; + let key = ObjectKey { + timeline: self.timelineid, + tag: ObjectTag::RelationBuffer(buf_tag), + }; last_version = true; for vers in self.obj_store.object_versions(&key, horizon)? { let lsn = vers.0; @@ -731,7 +739,7 @@ impl ObjectTimeline { truncated += 1; if !relation_dropped { // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(key.buf_tag, lsn)?; + self.get_page_at_lsn_nowait(buf_tag, lsn)?; continue; } } @@ -797,7 +805,7 @@ impl ObjectTimeline { Ok(ObjectVersionIter { obj_store, - buf_tag: key.buf_tag, + object_tag: key.tag, current_iter, ancestor_timeline: self.ancestor_timeline, ancestor_lsn: self.ancestor_lsn, @@ -806,9 +814,9 @@ impl ObjectTimeline { } struct ObjectHistory<'a> { - iter: Box)>> + 'a>, + iter: Box)>> + 'a>, lsn: Lsn, - last_relation_size: Option<(BufferTag, u32)>, + last_relation_size: Option<(RelTag, u32)>, } impl<'a> Iterator for ObjectHistory<'a> { @@ -828,16 +836,16 @@ impl<'a> History for ObjectHistory<'a> { impl<'a> ObjectHistory<'a> { fn handle_relation_size( &mut self, - buf_tag: BufferTag, + rel_tag: RelTag, entry: RelationSizeEntry, ) -> Option { match entry { RelationSizeEntry::Size(size) => { // we only want to output truncations, expansions are filtered out - let last_relation_size = self.last_relation_size.replace((buf_tag, size)); + let last_relation_size = self.last_relation_size.replace((rel_tag, size)); match last_relation_size { - Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => { + Some((last_buf, last_size)) if last_buf != rel_tag || size < last_size => { Some(Update::Truncate { n_blocks: size }) } _ => None, @@ -861,24 +869,26 @@ impl<'a> ObjectHistory<'a> { } fn next_result(&mut self) -> Result> { - while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? { - if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM { - continue; - } + while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? { - let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM { - let entry = RelationSizeEntry::des(&value)?; - match self.handle_relation_size(buf_tag, entry) { - Some(relation_update) => relation_update, - None => continue, + let (rel_tag, update) = match object_tag { + ObjectTag::TimelineMetadataTag => continue, + ObjectTag::RelationMetadata(rel_tag) => { + let entry = RelationSizeEntry::des(&value)?; + match self.handle_relation_size(rel_tag, entry) { + Some(relation_update) => (rel_tag, relation_update), + None => continue, + } + }, + ObjectTag::RelationBuffer(buf_tag) => { + let entry = PageEntry::des(&value)?; + let update = self.handle_page(buf_tag, entry); + (buf_tag.rel, update) } - } else { - let entry = PageEntry::des(&value)?; - self.handle_page(buf_tag, entry) }; return Ok(Some(RelationUpdate { - rel: buf_tag.rel, + rel: rel_tag, lsn, update, })); @@ -925,24 +935,16 @@ pub enum RelationSizeEntry { Unlink, } -// No real block in PostgreSQL will have block number u32::MAX -// See vendor/postgres/src/include/storage/block.h -const RELATION_SIZE_BLKNUM: u32 = u32::MAX; - const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey { ObjectKey { timeline: timelineid, - buf_tag: BufferTag { - rel, - blknum: RELATION_SIZE_BLKNUM, - }, + tag: ObjectTag::RelationMetadata(rel), } } /// /// In addition to those per-page and per-relation entries, we also -/// store a little metadata blob for each timeline. It is stored using -/// STORAGE_SPECIAL_FORKNUM. +/// store a little metadata blob for each timeline. /// #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MetadataEntry { @@ -955,15 +957,7 @@ pub struct MetadataEntry { const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey { ObjectKey { timeline: timelineid, - buf_tag: BufferTag { - rel: RelTag { - forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: 0, - }, + tag: ObjectTag::TimelineMetadataTag, } } @@ -976,7 +970,7 @@ const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey { struct ObjectVersionIter<'a> { obj_store: &'a dyn ObjectStore, - buf_tag: BufferTag, + object_tag: ObjectTag, /// Iterator on the current timeline. current_iter: Box)> + 'a>, @@ -1013,7 +1007,7 @@ impl<'a> ObjectVersionIter<'a> { if let Some(ancestor_timeline) = self.ancestor_timeline { let searchkey = ObjectKey { timeline: ancestor_timeline, - buf_tag: self.buf_tag, + tag: self.object_tag, }; let ancestor_iter = self .obj_store diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index 6bc6c86a24..56d1392ab8 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -1,19 +1,13 @@ //! Low-level key-value storage abstraction. //! -use crate::repository::{BufferTag, RelTag}; +use crate::object_key::*; +use crate::repository::RelTag; use crate::ZTimelineId; use anyhow::Result; -use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::iter::Iterator; use zenith_utils::lsn::Lsn; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ObjectKey { - pub timeline: ZTimelineId, - pub buf_tag: BufferTag, -} - /// /// Low-level storage abstraction. /// @@ -58,7 +52,7 @@ pub trait ObjectStore: Send + Sync { &'a self, timeline: ZTimelineId, lsn: Lsn, - ) -> Result)>> + 'a>>; + ) -> Result)>> + 'a>>; /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'. /// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 6254dbf607..b1a1c91277 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -139,10 +139,6 @@ pub struct RepositoryStats { /// are used for the same purpose. /// [See more related comments here](https:///github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/relfilenode.h#L57). /// -/// We use additional fork numbers to logically separate relational and -/// non-relational data inside pageserver key-value storage. -/// See, e.g., `ROCKSDB_SPECIAL_FORKNUM`. -/// #[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy, Serialize, Deserialize)] pub struct RelTag { pub forknum: u8, @@ -475,13 +471,16 @@ mod tests { tline.advance_last_valid_lsn(Lsn(2)); let mut snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(2)); - assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); + + // TODO ordering not guaranteed by API. But currently it returns the + // truncation entry before the block data. let expected_truncate = RelationUpdate { rel: buf.rel, lsn: Lsn(2), update: Update::Truncate { n_blocks: 0 }, }; - assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API + assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); + assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); Ok(()) diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index b442053f6c..96723ef2c7 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -1,8 +1,9 @@ //! //! An implementation of the ObjectStore interface, backed by RocksDB //! -use crate::object_store::{ObjectKey, ObjectStore}; -use crate::repository::{BufferTag, RelTag}; +use crate::object_key::*; +use crate::object_store::ObjectStore; +use crate::repository::RelTag; use crate::PageServerConf; use crate::ZTimelineId; use anyhow::{bail, Result}; @@ -24,7 +25,7 @@ impl StorageKey { Self { obj_key: ObjectKey { timeline, - buf_tag: BufferTag::ZEROED, + tag: ObjectTag::TimelineMetadataTag, }, lsn: Lsn(0), } @@ -140,42 +141,46 @@ impl ObjectStore for RocksObjectStore { let mut rels: HashSet = HashSet::new(); - let mut search_key = StorageKey { - obj_key: ObjectKey { - timeline: timelineid, - buf_tag: BufferTag { - rel: RelTag { - spcnode, - dbnode, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - }, - lsn: Lsn(0), + let mut search_rel_tag = RelTag { + spcnode, + dbnode, + relnode: 0, + forknum: 0u8, }; let mut iter = self.db.raw_iterator(); loop { + let search_key = StorageKey { + obj_key: ObjectKey { + timeline: timelineid, + tag: ObjectTag::RelationMetadata(search_rel_tag), + }, + lsn: Lsn(0), + }; iter.seek(search_key.ser()?); if !iter.valid() { break; } let key = StorageKey::des(iter.key().unwrap())?; - if (spcnode != 0 && key.obj_key.buf_tag.rel.spcnode != spcnode) - || (dbnode != 0 && key.obj_key.buf_tag.rel.dbnode != dbnode) - { + + if let ObjectTag::RelationMetadata(rel_tag) = key.obj_key.tag { + if spcnode != 0 && rel_tag.spcnode != spcnode + || dbnode != 0 && rel_tag.dbnode != dbnode + { + break; + } + if key.lsn < lsn + { + // visible in this snapshot + rels.insert(rel_tag); + } + search_rel_tag = rel_tag; + // skip to next relation + // FIXME: What if relnode is u32::MAX ? + search_rel_tag.relnode += 1; + } else { + // no more relation metadata entries break; } - - if key.obj_key.buf_tag.rel.relnode != 0 // skip non-relational records (like timeline metadata) - && key.lsn < lsn - // visible in this snapshot - { - rels.insert(key.obj_key.buf_tag.rel); - } - search_key = key.clone(); - search_key.obj_key.buf_tag.rel.relnode += 1; // skip to next relation } Ok(rels) @@ -189,7 +194,7 @@ impl ObjectStore for RocksObjectStore { &'a self, timeline: ZTimelineId, lsn: Lsn, - ) -> Result)>> + 'a>> { + ) -> Result)>> + 'a>> { let start_key = StorageKey::timeline_start(timeline); let start_key_bytes = StorageKey::ser(&start_key)?; let iter = self.db.iterator(rocksdb::IteratorMode::From( @@ -296,7 +301,7 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> { return None; } let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap(); - if key.obj_key.buf_tag != self.obj_key.buf_tag { + if key.obj_key.tag != self.obj_key.tag { return None; } let val = self.dbiter.value().unwrap(); @@ -314,7 +319,7 @@ struct RocksObjects<'r> { impl<'r> Iterator for RocksObjects<'r> { // TODO consider returning Box<[u8]> - type Item = Result<(BufferTag, Lsn, Vec)>; + type Item = Result<(ObjectTag, Lsn, Vec)>; fn next(&mut self) -> Option { self.next_result().transpose() @@ -322,7 +327,7 @@ impl<'r> Iterator for RocksObjects<'r> { } impl<'r> RocksObjects<'r> { - fn next_result(&mut self) -> Result)>> { + fn next_result(&mut self) -> Result)>> { for (key_bytes, v) in &mut self.iter { let key = StorageKey::des(&key_bytes)?; @@ -335,7 +340,7 @@ impl<'r> RocksObjects<'r> { continue; } - return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec()))); + return Ok(Some((key.obj_key.tag, key.lsn, v.to_vec()))); } Ok(None) diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs index d8223d352f..94262bc438 100644 --- a/postgres_ffi/src/pg_constants.rs +++ b/postgres_ffi/src/pg_constants.rs @@ -21,8 +21,6 @@ pub const FSM_FORKNUM: u8 = 1; pub const VISIBILITYMAP_FORKNUM: u8 = 2; pub const INIT_FORKNUM: u8 = 3; -pub const ROCKSDB_SPECIAL_FORKNUM: u8 = 50; - // From storage_xlog.h pub const SMGR_TRUNCATE_HEAP: u32 = 0x0001; pub const SMGR_TRUNCATE_VM: u32 = 0x0002;