diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index ebb84a97b8..7d64904d85 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -473,6 +473,16 @@ impl Timeline for ObjectTimeline { Ok(()) } + + fn history<'a>(&'a self) -> Result> { + let lsn = self.last_valid_lsn.load(); + let iter = self.obj_store.objects(self.timelineid, lsn)?; + Ok(Box::new(ObjectHistory { + lsn, + iter, + last_relation_size: None, + })) + } } impl ObjectTimeline { @@ -809,6 +819,89 @@ impl ObjectTimeline { } } +struct ObjectHistory<'a> { + iter: Box)>> + 'a>, + lsn: Lsn, + last_relation_size: Option<(BufferTag, u32)>, +} + +impl<'a> Iterator for ObjectHistory<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_result().transpose() + } +} + +impl<'a> History for ObjectHistory<'a> { + fn lsn(&self) -> Lsn { + self.lsn + } +} + +impl<'a> ObjectHistory<'a> { + fn handle_relation_size( + &mut self, + buf_tag: BufferTag, + entry: RelationSizeEntry, + ) -> Option { + match entry { + RelationSizeEntry::Size(size) => { + // we only want to output truncations, expansions are filtered out + let last_relation_size = self.last_relation_size.replace((buf_tag, size)); + + match last_relation_size { + Some((last_buf, last_size)) if last_buf != buf_tag || size < last_size => { + Some(Update::Truncate { n_blocks: size }) + } + _ => None, + } + } + RelationSizeEntry::Unlink => Some(Update::Unlink), + } + } + + fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update { + match entry { + PageEntry::Page(img) => Update::Page { + blknum: buf_tag.blknum, + img, + }, + PageEntry::WALRecord(rec) => Update::WALRecord { + blknum: buf_tag.blknum, + rec, + }, + } + } + + fn next_result(&mut self) -> Result> { + while let Some((buf_tag, lsn, value)) = self.iter.next().transpose()? { + if buf_tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM { + continue; + } + + let update = if buf_tag.blknum == RELATION_SIZE_BLKNUM { + let entry = RelationSizeEntry::des(&value)?; + match self.handle_relation_size(buf_tag, entry) { + Some(relation_update) => relation_update, + None => continue, + } + } else { + let entry = PageEntry::des(&value)?; + self.handle_page(buf_tag, entry) + }; + + return Ok(Some(RelationUpdate { + rel: buf_tag.rel, + lsn, + update, + })); + } + + Ok(None) + } +} + /// /// We store two kinds of page versions in the repository: /// @@ -846,12 +939,16 @@ pub enum RelationSizeEntry { Unlink, } +// No real block in PostgreSQL will have block number u32::MAX +// See vendor/postgres/src/include/storage/block.h +const RELATION_SIZE_BLKNUM: u32 = u32::MAX; + const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey { ObjectKey { timeline: timelineid, buf_tag: BufferTag { rel, - blknum: u32::MAX, + blknum: RELATION_SIZE_BLKNUM, }, } } diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs index dbda9e0d8f..3ef1d9d4f7 100644 --- a/pageserver/src/object_store.rs +++ b/pageserver/src/object_store.rs @@ -1,5 +1,5 @@ //! Low-level key-value storage abstraction. -//! +//! use crate::repository::{BufferTag, RelTag}; use crate::ZTimelineId; use anyhow::Result; @@ -50,6 +50,16 @@ pub trait ObjectStore: Send + Sync { lsn: Lsn, ) -> Result)> + 'a>>; + /// Iterate through versions of all objects in a timeline. + /// + /// Returns objects in increasing key-version order. + /// Returns all versions up to and including the specified LSN. + fn objects<'a>( + &'a self, + timeline: ZTimelineId, + lsn: Lsn, + ) -> Result)>> + 'a>>; + /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'. /// /// This is used to implement 'create database' diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 0113f98a00..a420ff9783 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -86,6 +86,34 @@ pub trait Timeline: Send + Sync { /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. fn checkpoint(&self) -> Result<()>; + + /// Events for all relations in the timeline. + /// Contains updates from start up to the last valid LSN + /// at time of history() call. This lsn can be read via the lsn() function. + /// + /// Relation size is increased implicitly and decreased with Truncate updates. + // TODO ordering guarantee? + fn history<'a>(&'a self) -> Result>; +} + +pub trait History: Iterator> { + /// The last_valid_lsn at the time of history() call. + fn lsn(&self) -> Lsn; +} + +#[derive(Debug, PartialEq, Eq)] +pub struct RelationUpdate { + pub rel: RelTag, + pub lsn: Lsn, + pub update: Update, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum Update { + Page { blknum: u32, img: Bytes }, + WALRecord { blknum: u32, rec: WALRecord }, + Truncate { n_blocks: u32 }, + Unlink, } #[derive(Clone)] @@ -123,6 +151,15 @@ pub struct RelTag { pub relnode: u32, } +impl RelTag { + pub const ZEROED: Self = Self { + forknum: 0, + spcnode: 0, + dbnode: 0, + relnode: 0, + }; +} + /// Display RelTag in the same format that's used in most PostgreSQL debug messages: /// /// //[_fsm|_vm|_init] @@ -154,7 +191,14 @@ pub struct BufferTag { pub blknum: u32, } -#[derive(Debug, Clone, Serialize, Deserialize)] +impl BufferTag { + pub const ZEROED: Self = Self { + rel: RelTag::ZEROED, + blknum: 0, + }; +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, @@ -392,6 +436,57 @@ mod tests { Ok(()) } + #[test] + fn test_history() -> Result<()> { + let repo = get_test_repo("test_snapshot")?; + + let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); + let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; + + let mut snapshot = tline.history()?; + assert_eq!(snapshot.lsn(), Lsn(0)); + assert_eq!(None, snapshot.next().transpose()?); + + // add a page and advance the last valid LSN + let buf = TEST_BUF(1); + tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; + tline.advance_last_valid_lsn(Lsn(1)); + let mut snapshot = tline.history()?; + assert_eq!(snapshot.lsn(), Lsn(1)); + let expected_page = RelationUpdate { + rel: buf.rel, + lsn: Lsn(1), + update: Update::Page { + blknum: buf.blknum, + img: TEST_IMG("blk 1 @ lsn 1"), + }, + }; + assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); + assert_eq!(None, snapshot.next().transpose()?); + + // truncate to zero, but don't advance the last valid LSN + tline.put_truncation(buf.rel, Lsn(2), 0)?; + let mut snapshot = tline.history()?; + assert_eq!(snapshot.lsn(), Lsn(1)); + assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); + assert_eq!(None, snapshot.next().transpose()?); + + // advance the last valid LSN and the truncation should be observable + tline.advance_last_valid_lsn(Lsn(2)); + let mut snapshot = tline.history()?; + assert_eq!(snapshot.lsn(), Lsn(2)); + assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); + let expected_truncate = RelationUpdate { + rel: buf.rel, + lsn: Lsn(2), + update: Update::Truncate { n_blocks: 0 }, + }; + assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API + assert_eq!(None, snapshot.next().transpose()?); + + Ok(()) + } + // Mock WAL redo manager that doesn't do much struct TestRedoManager {} diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs index c5e461031b..9da9b04d7f 100644 --- a/pageserver/src/rocksdb_storage.rs +++ b/pageserver/src/rocksdb_storage.rs @@ -17,6 +17,19 @@ struct StorageKey { lsn: Lsn, } +impl StorageKey { + /// The first key for a given timeline + fn timeline_start(timeline: ZTimelineId) -> Self { + Self { + obj_key: ObjectKey { + timeline, + buf_tag: BufferTag::ZEROED, + }, + lsn: Lsn(0), + } + } +} + pub struct RocksObjectStore { _conf: &'static PageServerConf, @@ -109,6 +122,29 @@ impl ObjectStore for RocksObjectStore { Ok(rels) } + + /// Iterate through versions of all objects in a timeline. + /// + /// Returns objects in increasing key-version order. + /// Returns all versions up to and including the specified LSN. + fn objects<'a>( + &'a self, + timeline: ZTimelineId, + lsn: Lsn, + ) -> Result)>> + 'a>> { + let start_key = StorageKey::timeline_start(timeline); + let start_key_bytes = StorageKey::ser(&start_key)?; + let iter = self.db.iterator(rocksdb::IteratorMode::From( + &start_key_bytes, + rocksdb::Direction::Forward, + )); + + Ok(Box::new(RocksObjects { + timeline, + lsn, + iter, + })) + } } impl RocksObjectStore { @@ -206,3 +242,39 @@ impl<'a> Iterator for RocksObjectVersionIter<'a> { Some((key.lsn, result)) } } + +struct RocksObjects<'r> { + iter: rocksdb::DBIterator<'r>, + timeline: ZTimelineId, + lsn: Lsn, +} + +impl<'r> Iterator for RocksObjects<'r> { + // TODO consider returning Box<[u8]> + type Item = Result<(BufferTag, Lsn, Vec)>; + + fn next(&mut self) -> Option { + self.next_result().transpose() + } +} + +impl<'r> RocksObjects<'r> { + fn next_result(&mut self) -> Result)>> { + for (key_bytes, v) in &mut self.iter { + let key = StorageKey::des(&key_bytes)?; + + if key.obj_key.timeline != self.timeline { + return Ok(None); + } + + if key.lsn > self.lsn { + // TODO can speed up by seeking iterator + continue; + } + + return Ok(Some((key.obj_key.buf_tag, key.lsn, v.to_vec()))); + } + + Ok(None) + } +}