diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 7d7809b78c..431f5a3ed4 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -419,8 +419,7 @@ impl Timeline for ObjectTimeline { lsn ); - self.obj_store - .put(&key, lsn, &ObjectValue::ser(&val)?)?; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( tag.rel, @@ -446,6 +445,15 @@ impl Timeline for ObjectTimeline { Ok(()) } + fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + tag, + }; + self.obj_store.put(&key, lsn, data)?; + Ok(()) + } + /// /// Memorize a full image of a page version /// @@ -468,17 +476,12 @@ impl Timeline for ObjectTimeline { let new_nblocks = tag.blknum + 1; let key = relation_size_key(self.timelineid, tag.rel); let val = ObjectValue::RelationSize(new_nblocks); - trace!( "Extended relation {} from {} to {} blocks at {}", - tag.rel, - old_nblocks, - new_nblocks, - lsn + tag.rel, old_nblocks, new_nblocks, lsn ); - self.obj_store - .put(&key, lsn, &ObjectValue::ser(&val)?)?; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( tag.rel, @@ -502,8 +505,7 @@ impl Timeline for ObjectTimeline { info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); - self.obj_store - .put(&key, lsn, &ObjectValue::ser(&val)?)?; + self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?; let mut rel_meta = self.rel_meta.write().unwrap(); rel_meta.insert( rel, @@ -601,11 +603,7 @@ impl Timeline for ObjectTimeline { fn history<'a>(&'a self) -> Result> { let lsn = self.last_valid_lsn.load(); let iter = self.obj_store.objects(self.timelineid, lsn)?; - Ok(Box::new(ObjectHistory { - lsn, - iter, - last_relation_size: None, - })) + Ok(Box::new(ObjectHistory { lsn, iter })) } } @@ -939,14 +937,15 @@ impl ObjectTimeline { struct ObjectHistory<'a> { iter: Box)>> + 'a>, lsn: Lsn, - last_relation_size: Option<(RelTag, u32)>, } impl<'a> Iterator for ObjectHistory<'a> { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { - self.next_result().transpose() + self.iter + .next() + .map(|result| result.map(|t| Modification::new(t))) } } @@ -956,64 +955,12 @@ impl<'a> History for ObjectHistory<'a> { } } -impl<'a> ObjectHistory<'a> { - fn next_result(&mut self) -> Result> { - while let Some((tag, lsn, value)) = self.iter.next().transpose()? { - let entry = ObjectValue::des(&value)?; - let rel_tag: RelTag; - let update = match tag { - ObjectTag::RelationMetadata(rel) => { - rel_tag = rel; - match entry { - ObjectValue::RelationSize(size) => { - // we only want to output truncations, expansions are filtered out - let last_relation_size = self.last_relation_size.replace((rel, size)); - - match last_relation_size { - Some((last_rel, last_size)) - if last_rel != rel || size < last_size => - { - Update::Truncate { n_blocks: size } - } - _ => continue, - } - } - ObjectValue::Unlink => Update::Unlink, - _ => continue, - } - } - ObjectTag::RelationBuffer(buf_tag) => { - rel_tag = buf_tag.rel; - match entry { - ObjectValue::Page(img) => Update::Page { - blknum: buf_tag.blknum, - img, - }, - ObjectValue::WALRecord(rec) => Update::WALRecord { - blknum: buf_tag.blknum, - rec, - }, - _ => continue, - } - } - _ => continue, - }; - return Ok(Some(RelationUpdate { - rel: rel_tag, - lsn, - update, - })); - } - Ok(None) - } -} - /// /// We store several kinds of objects in the repository. /// We have per-page, per-relation(or non-rel file) and per-timeline entries. /// #[derive(Debug, Clone, Serialize, Deserialize)] -enum ObjectValue { +pub enum ObjectValue { /// Ready-made images of the block Page(Bytes), /// WAL records, to be applied on top of the "previous" entry diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 5271e31a25..815d209223 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -27,7 +27,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn}; use crate::basebackup; use crate::branches; use crate::page_cache; -use crate::repository::{BufferTag, ObjectTag, RelTag, RelationUpdate, Update}; +use crate::repository::{BufferTag, Modification, ObjectTag, RelTag}; use crate::restore_local_repo; use crate::walreceiver; use crate::PageServerConf; @@ -410,38 +410,14 @@ impl postgres_backend::Handler for PageServerHandler { while let Some(msg) = pgb.read_message()? { match msg { FeMessage::CopyData(bytes) => { - let relation_update = RelationUpdate::des(&bytes)?; + let modification = Modification::des(&bytes)?; - last_lsn = relation_update.lsn; - - match relation_update.update { - Update::Page { blknum, img } => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: relation_update.rel, - blknum, - }); - - timeline.put_page_image(tag, relation_update.lsn, img)?; - } - Update::WALRecord { blknum, rec } => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: relation_update.rel, - blknum, - }); - - timeline.put_wal_record(tag, rec)?; - } - Update::Truncate { n_blocks } => { - timeline.put_truncation( - relation_update.rel, - relation_update.lsn, - n_blocks, - )?; - } - Update::Unlink => { - todo!() - } - } + last_lsn = modification.lsn; + timeline.put_raw_data( + modification.tag, + last_lsn, + &modification.data[..], + )?; } FeMessage::CopyDone => { timeline.advance_last_valid_lsn(last_lsn); diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3d95dce9a0..f419224dcd 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -61,6 +61,9 @@ pub trait Timeline: Send + Sync { // These are called by the WAL receiver to digest WAL records. //------------------------------------------------------------------------------ + /// Put raw data + fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>; + /// Put a new page version that can be constructed from a WAL record /// /// This will implicitly extend the relation, if the page is beyond the @@ -119,24 +122,26 @@ pub trait Timeline: Send + Sync { } } -pub trait History: Iterator> { +pub trait History: Iterator> { /// The last_valid_lsn at the time of history() call. fn lsn(&self) -> Lsn; } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct RelationUpdate { - pub rel: RelTag, +pub struct Modification { + pub tag: ObjectTag, pub lsn: Lsn, - pub update: Update, + pub data: Vec, } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum Update { - Page { blknum: u32, img: Bytes }, - WALRecord { blknum: u32, rec: WALRecord }, - Truncate { n_blocks: u32 }, - Unlink, +impl Modification { + pub fn new(entry: (ObjectTag, Lsn, Vec)) -> Modification { + Modification { + tag: entry.0, + lsn: entry.1, + data: entry.2, + } + } } #[derive(Clone)] @@ -329,6 +334,7 @@ impl WALRecord { mod tests { use super::*; use crate::object_repository::ObjectRepository; + use crate::object_repository::ObjectValue; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::PageServerConf; @@ -337,6 +343,7 @@ mod tests { use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; + use zenith_utils::bin_ser::BeSer; /// Arbitrary relation tag, for testing. const TESTREL_A: RelTag = RelTag { @@ -533,45 +540,82 @@ mod tests { let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(0)); + let mut snapshot = snapshot.skip_while(|r| match r { + Ok(m) => match m.tag { + ObjectTag::RelationBuffer(_) => false, + _ => true, + }, + _ => true, + }); assert_eq!(None, snapshot.next().transpose()?); // add a page and advance the last valid LSN let rel = TESTREL_A; - let buf = TEST_BUF(1); - tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; + let tag = TEST_BUF(1); + tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?; tline.advance_last_valid_lsn(Lsn(1)); - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(1)); - let expected_page = RelationUpdate { - rel, - lsn: Lsn(1), - update: Update::Page { - blknum: 1, - img: TEST_IMG("blk 1 @ lsn 1"), + let mut snapshot = snapshot.skip_while(|r| match r { + Ok(m) => match m.tag { + ObjectTag::RelationBuffer(_) => false, + _ => true, }, + _ => true, + }); + let expected_page = Modification { + tag, + lsn: Lsn(1), + data: ObjectValue::ser(&ObjectValue::Page(TEST_IMG("blk 1 @ lsn 1")))?, }; assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); // truncate to zero, but don't advance the last valid LSN tline.put_truncation(rel, Lsn(2), 0)?; - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(1)); + let mut snapshot = snapshot.skip_while(|r| match r { + Ok(m) => match m.tag { + ObjectTag::RelationBuffer(_) => false, + _ => true, + }, + _ => true, + }); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); // advance the last valid LSN and the truncation should be observable tline.advance_last_valid_lsn(Lsn(2)); - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(2)); - let expected_truncate = RelationUpdate { - rel, - lsn: Lsn(2), - update: Update::Truncate { n_blocks: 0 }, + let mut snapshot = snapshot.skip_while(|r| match r { + Ok(m) => match m.tag { + ObjectTag::RelationMetadata(_) => false, + _ => true, + }, + _ => true, + }); + let expected_truncate = Modification { + tag: ObjectTag::RelationMetadata(rel), + lsn: Lsn(1), + data: ObjectValue::ser(&ObjectValue::RelationSize(2))?, }; - assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API + assert_eq!( + Some(&expected_truncate), + snapshot.next().transpose()?.as_ref() + ); // TODO ordering not guaranteed by API + let expected_truncate = Modification { + tag: ObjectTag::RelationMetadata(rel), + lsn: Lsn(2), + data: ObjectValue::ser(&ObjectValue::RelationSize(0))?, + }; + assert_eq!( + Some(&expected_truncate), + snapshot.next().transpose()?.as_ref() + ); // TODO ordering not guaranteed by API assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?);