From 0723d49e0b8b05aed8555c13e4b7f4189af710cc Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 21 Jul 2021 00:41:57 +0300 Subject: [PATCH] Object push (#276) * Introducing common enum ObjectVal for all values * Rewrite push mechanism to use raw object copy * Fix history unit test * Add skip_nonrel_objects functions for history unit tests --- pageserver/src/object_repository.rs | 86 +++-------------------- pageserver/src/page_service.rs | 40 +++-------- pageserver/src/repository.rs | 103 ++++++++++++++++++++-------- 3 files changed, 90 insertions(+), 139 deletions(-) diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index dd1c0d0eba..366055ce6d 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -398,7 +398,6 @@ impl Timeline for ObjectTimeline { /// current end-of-file. fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> { let lsn = rec.lsn; - self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?; debug!("put_wal_record {:?} at {}", tag, lsn); @@ -625,11 +624,7 @@ impl Timeline for ObjectTimeline { fn history<'a>(&'a self) -> Result> { let lsn = self.last_valid_lsn.load(); let iter = self.obj_store.objects(self.timelineid, lsn)?; - Ok(Box::new(ObjectHistory { - lsn, - iter, - last_relation_size: None, - })) + Ok(Box::new(ObjectHistory { lsn, iter })) } fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { @@ -995,14 +990,15 @@ impl ObjectTimeline { struct ObjectHistory<'a> { iter: Box)>> + 'a>, lsn: Lsn, - last_relation_size: Option<(RelTag, u32)>, } impl<'a> Iterator for ObjectHistory<'a> { - type Item = Result; + type Item = Result; fn next(&mut self) -> Option { - self.next_result().transpose() + self.iter + .next() + .map(|result| result.map(|t| Modification::new(t))) } } @@ -1012,77 +1008,12 @@ impl<'a> History for ObjectHistory<'a> { } } -impl<'a> ObjectHistory<'a> { - fn handle_relation_size( - &mut self, - rel_tag: RelTag, - entry: RelationSizeEntry, - ) -> Option { - match entry { - RelationSizeEntry::Size(size) => { - // we only want to output truncations, expansions are filtered out - let last_relation_size = self.last_relation_size.replace((rel_tag, size)); - - match last_relation_size { - Some((last_buf, last_size)) if last_buf != rel_tag || size < last_size => { - Some(Update::Truncate { n_blocks: size }) - } - _ => None, - } - } - RelationSizeEntry::Unlink => Some(Update::Unlink), - } - } - - fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update { - match entry { - PageEntry::Page(img) => Update::Page { - blknum: buf_tag.blknum, - img, - }, - PageEntry::WALRecord(rec) => Update::WALRecord { - blknum: buf_tag.blknum, - rec, - }, - } - } - - fn next_result(&mut self) -> Result> { - while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? { - let (rel_tag, update) = match object_tag { - ObjectTag::RelationMetadata(rel_tag) => { - let entry = ObjectValue::des_relsize(&value)?; - match self.handle_relation_size(rel_tag, entry) { - Some(relation_update) => (rel_tag, relation_update), - None => continue, - } - } - ObjectTag::RelationBuffer(buf_tag) => { - let entry = ObjectValue::des_page(&value)?; - let update = self.handle_page(buf_tag, entry); - - (buf_tag.rel, update) - } - _ => continue, - }; - - return Ok(Some(RelationUpdate { - rel: rel_tag, - lsn, - update, - })); - } - - Ok(None) - } -} - /// /// We store several kinds of objects in the repository. /// We have per-page, per-relation and per-timeline entries. /// #[derive(Debug, Clone, Serialize, Deserialize)] -enum ObjectValue { +pub enum ObjectValue { Page(PageEntry), RelationSize(RelationSizeEntry), TimelineMetadata(MetadataEntry), @@ -1094,7 +1025,7 @@ enum ObjectValue { /// ObjectTag::RelationBuffer as key. /// #[derive(Debug, Clone, Serialize, Deserialize)] -enum PageEntry { +pub enum PageEntry { /// Ready-made image of the block Page(Bytes), @@ -1118,7 +1049,7 @@ enum PageEntry { /// Use ObjectTag::RelationMetadata as the key. /// #[derive(Debug, Clone, Serialize, Deserialize)] -enum RelationSizeEntry { +pub enum RelationSizeEntry { Size(u32), /// Tombstone for a dropped relation. @@ -1250,6 +1181,7 @@ impl<'a> ObjectVersionIter<'a> { .obj_store .get(&timeline_metadata_key(ancestor_timeline), Lsn(0)) .with_context(|| "timeline not found in repository")?; + let ancestor_metadata = ObjectValue::des_timeline_metadata(&v)?; self.ancestor_timeline = ancestor_metadata.ancestor_timeline; self.ancestor_lsn = ancestor_metadata.ancestor_lsn; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 8110133115..8b905a1eaf 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -30,7 +30,7 @@ use crate::basebackup; use crate::branches; use crate::object_key::ObjectTag; use crate::page_cache; -use crate::repository::{BufferTag, RelTag, RelationUpdate, Update}; +use crate::repository::{BufferTag, Modification, RelTag}; use crate::restore_local_repo; use crate::walreceiver; use crate::PageServerConf; @@ -414,38 +414,14 @@ impl postgres_backend::Handler for PageServerHandler { while let Some(msg) = pgb.read_message()? { match msg { FeMessage::CopyData(bytes) => { - let relation_update = RelationUpdate::des(&bytes)?; + let modification = Modification::des(&bytes)?; - last_lsn = relation_update.lsn; - - match relation_update.update { - Update::Page { blknum, img } => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: relation_update.rel, - blknum, - }); - - timeline.put_page_image(tag, relation_update.lsn, img, true)?; - } - Update::WALRecord { blknum, rec } => { - let tag = ObjectTag::RelationBuffer(BufferTag { - rel: relation_update.rel, - blknum, - }); - - timeline.put_wal_record(tag, rec)?; - } - Update::Truncate { n_blocks } => { - timeline.put_truncation( - relation_update.rel, - relation_update.lsn, - n_blocks, - )?; - } - Update::Unlink => { - todo!() - } - } + last_lsn = modification.lsn; + timeline.put_raw_data( + modification.tag, + last_lsn, + &modification.data[..], + )?; } FeMessage::CopyDone => { timeline.advance_last_valid_lsn(last_lsn); diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 968db48f0a..fd47149327 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -163,24 +163,30 @@ pub trait Timeline: Send + Sync { } } -pub trait History: Iterator> { +pub trait History: Iterator> { /// The last_valid_lsn at the time of history() call. fn lsn(&self) -> Lsn; } +// +// Structure representing any update operation of object storage. +// It is used to copy object storage content in PUSH method. +// #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct RelationUpdate { - pub rel: RelTag, +pub struct Modification { + pub tag: ObjectTag, pub lsn: Lsn, - pub update: Update, + pub data: Vec, } -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -pub enum Update { - Page { blknum: u32, img: Bytes }, - WALRecord { blknum: u32, rec: WALRecord }, - Truncate { n_blocks: u32 }, - Unlink, +impl Modification { + pub fn new(entry: (ObjectTag, Lsn, Vec)) -> Modification { + Modification { + tag: entry.0, + lsn: entry.1, + data: entry.2, + } + } } #[derive(Clone)] @@ -302,6 +308,7 @@ impl WALRecord { mod tests { use super::*; use crate::object_repository::ObjectRepository; + use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry}; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::PageServerConf; @@ -310,6 +317,7 @@ mod tests { use std::path::PathBuf; use std::str::FromStr; use std::time::Duration; + use zenith_utils::bin_ser::BeSer; /// Arbitrary relation tag, for testing. const TESTREL_A: RelTag = RelTag { @@ -505,6 +513,18 @@ mod tests { Ok(()) } + fn skip_nonrel_objects<'a>( + snapshot: Box, + ) -> Result::Item> + 'a> { + Ok(snapshot.skip_while(|r| match r { + Ok(m) => match m.tag { + ObjectTag::RelationMetadata(_) => false, + _ => true, + }, + _ => panic!("Iteration error"), + })) + } + /// /// Test branch creation /// @@ -564,48 +584,71 @@ mod tests { let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(0)); + let mut snapshot = skip_nonrel_objects(snapshot)?; assert_eq!(None, snapshot.next().transpose()?); // add a page and advance the last valid LSN let rel = TESTREL_A; let tag = TEST_BUF(1); + tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?; tline.advance_last_valid_lsn(Lsn(1)); - let mut snapshot = tline.history()?; - assert_eq!(snapshot.lsn(), Lsn(1)); - let expected_page = RelationUpdate { - rel: rel, + + let expected_page = Modification { + tag, lsn: Lsn(1), - update: Update::Page { - blknum: 1, - img: TEST_IMG("blk 1 @ lsn 1"), - }, + data: ObjectValue::ser(&ObjectValue::Page(PageEntry::Page(TEST_IMG( + "blk 1 @ lsn 1", + ))))?, }; + let expected_init_size = Modification { + tag: ObjectTag::RelationMetadata(rel), + lsn: Lsn(1), + data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(2)))?, + }; + let expected_trunc_size = Modification { + tag: ObjectTag::RelationMetadata(rel), + lsn: Lsn(2), + data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(0)))?, + }; + + let snapshot = tline.history()?; + assert_eq!(snapshot.lsn(), Lsn(1)); + let mut snapshot = skip_nonrel_objects(snapshot)?; + assert_eq!( + Some(&expected_init_size), + snapshot.next().transpose()?.as_ref() + ); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); // truncate to zero, but don't advance the last valid LSN tline.put_truncation(rel, Lsn(2), 0)?; - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(1)); + let mut snapshot = skip_nonrel_objects(snapshot)?; + assert_eq!( + Some(&expected_init_size), + snapshot.next().transpose()?.as_ref() + ); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?); // advance the last valid LSN and the truncation should be observable tline.advance_last_valid_lsn(Lsn(2)); - let mut snapshot = tline.history()?; + let snapshot = tline.history()?; assert_eq!(snapshot.lsn(), Lsn(2)); - - // TODO ordering not guaranteed by API. But currently it returns the - // truncation entry before the block data. - let expected_truncate = RelationUpdate { - rel: rel, - lsn: Lsn(2), - update: Update::Truncate { n_blocks: 0 }, - }; - assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); + let mut snapshot = skip_nonrel_objects(snapshot)?; + assert_eq!( + Some(&expected_init_size), + snapshot.next().transpose()?.as_ref() + ); + assert_eq!( + Some(&expected_trunc_size), + snapshot.next().transpose()?.as_ref() + ); assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref()); assert_eq!(None, snapshot.next().transpose()?);