Object push (#276)

* Introducing common enum ObjectVal for all values

* Rewrite push mechanism to use raw object copy

* Fix history unit test

* Add skip_nonrel_objects function for history unit tests
This commit is contained in:
Konstantin Knizhnik
2021-07-21 00:41:57 +03:00
committed by GitHub
parent ad79ca05e9
commit 0723d49e0b
3 changed files with 90 additions and 139 deletions

View File

@@ -398,7 +398,6 @@ impl Timeline for ObjectTimeline {
/// current end-of-file.
fn put_wal_record(&self, tag: ObjectTag, rec: WALRecord) -> Result<()> {
let lsn = rec.lsn;
self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?;
debug!("put_wal_record {:?} at {}", tag, lsn);
@@ -625,11 +624,7 @@ impl Timeline for ObjectTimeline {
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
let lsn = self.last_valid_lsn.load();
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory {
lsn,
iter,
last_relation_size: None,
}))
Ok(Box::new(ObjectHistory { lsn, iter }))
}
fn gc_iteration(&self, horizon: u64, compact: bool) -> Result<GcResult> {
@@ -995,14 +990,15 @@ impl ObjectTimeline {
struct ObjectHistory<'a> {
iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
lsn: Lsn,
last_relation_size: Option<(RelTag, u32)>,
}
impl<'a> Iterator for ObjectHistory<'a> {
type Item = Result<RelationUpdate>;
type Item = Result<Modification>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
self.iter
.next()
.map(|result| result.map(|t| Modification::new(t)))
}
}
@@ -1012,77 +1008,12 @@ impl<'a> History for ObjectHistory<'a> {
}
}
/// Helpers that translate raw object-store entries into `RelationUpdate`
/// items for the history iterator, filtering out entries the consumer
/// does not care about.
impl<'a> ObjectHistory<'a> {
    /// Decide whether a relation-size entry should be reported.
    ///
    /// Only truncations and unlinks are emitted; growth is filtered out.
    /// `self.last_relation_size` remembers the previously seen
    /// (relation, size) pair so a shrink can be detected.
    fn handle_relation_size(
        &mut self,
        rel_tag: RelTag,
        entry: RelationSizeEntry,
    ) -> Option<Update> {
        match entry {
            RelationSizeEntry::Size(size) => {
                // we only want to output truncations, expansions are filtered out
                let last_relation_size = self.last_relation_size.replace((rel_tag, size));
                match last_relation_size {
                    // Emit a Truncate when the size shrank for the same
                    // relation, or when we switched to a different relation.
                    // NOTE(review): on a relation switch the first size is
                    // also reported as Truncate — presumably intentional,
                    // confirm against consumers of History.
                    Some((last_buf, last_size)) if last_buf != rel_tag || size < last_size => {
                        Some(Update::Truncate { n_blocks: size })
                    }
                    _ => None,
                }
            }
            // Tombstone for a dropped relation.
            RelationSizeEntry::Unlink => Some(Update::Unlink),
        }
    }

    /// Convert a page entry (ready-made image or WAL record) into an
    /// `Update` carrying the block number it applies to.
    fn handle_page(&mut self, buf_tag: BufferTag, entry: PageEntry) -> Update {
        match entry {
            PageEntry::Page(img) => Update::Page {
                blknum: buf_tag.blknum,
                img,
            },
            PageEntry::WALRecord(rec) => Update::WALRecord {
                blknum: buf_tag.blknum,
                rec,
            },
        }
    }

    /// Pull entries from the underlying object iterator until one produces
    /// a `RelationUpdate`; returns `Ok(None)` when the iterator is
    /// exhausted. Non-relation objects and filtered size entries are
    /// skipped.
    fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
        while let Some((object_tag, lsn, value)) = self.iter.next().transpose()? {
            let (rel_tag, update) = match object_tag {
                ObjectTag::RelationMetadata(rel_tag) => {
                    let entry = ObjectValue::des_relsize(&value)?;
                    match self.handle_relation_size(rel_tag, entry) {
                        Some(relation_update) => (rel_tag, relation_update),
                        // Expansion / repeated size: not reported.
                        None => continue,
                    }
                }
                ObjectTag::RelationBuffer(buf_tag) => {
                    let entry = ObjectValue::des_page(&value)?;
                    let update = self.handle_page(buf_tag, entry);
                    (buf_tag.rel, update)
                }
                // Objects that are not relation data are not part of the
                // history stream.
                _ => continue,
            };

            return Ok(Some(RelationUpdate {
                rel: rel_tag,
                lsn,
                update,
            }));
        }
        Ok(None)
    }
}
///
/// We store several kinds of objects in the repository.
/// We have per-page, per-relation and per-timeline entries.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ObjectValue {
pub enum ObjectValue {
Page(PageEntry),
RelationSize(RelationSizeEntry),
TimelineMetadata(MetadataEntry),
@@ -1094,7 +1025,7 @@ enum ObjectValue {
/// ObjectTag::RelationBuffer as key.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum PageEntry {
pub enum PageEntry {
/// Ready-made image of the block
Page(Bytes),
@@ -1118,7 +1049,7 @@ enum PageEntry {
/// Use ObjectTag::RelationMetadata as the key.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum RelationSizeEntry {
pub enum RelationSizeEntry {
Size(u32),
/// Tombstone for a dropped relation.
@@ -1250,6 +1181,7 @@ impl<'a> ObjectVersionIter<'a> {
.obj_store
.get(&timeline_metadata_key(ancestor_timeline), Lsn(0))
.with_context(|| "timeline not found in repository")?;
let ancestor_metadata = ObjectValue::des_timeline_metadata(&v)?;
self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
self.ancestor_lsn = ancestor_metadata.ancestor_lsn;

View File

@@ -30,7 +30,7 @@ use crate::basebackup;
use crate::branches;
use crate::object_key::ObjectTag;
use crate::page_cache;
use crate::repository::{BufferTag, RelTag, RelationUpdate, Update};
use crate::repository::{BufferTag, Modification, RelTag};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
@@ -414,38 +414,14 @@ impl postgres_backend::Handler for PageServerHandler {
while let Some(msg) = pgb.read_message()? {
match msg {
FeMessage::CopyData(bytes) => {
let relation_update = RelationUpdate::des(&bytes)?;
let modification = Modification::des(&bytes)?;
last_lsn = relation_update.lsn;
match relation_update.update {
Update::Page { blknum, img } => {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel,
blknum,
});
timeline.put_page_image(tag, relation_update.lsn, img, true)?;
}
Update::WALRecord { blknum, rec } => {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel,
blknum,
});
timeline.put_wal_record(tag, rec)?;
}
Update::Truncate { n_blocks } => {
timeline.put_truncation(
relation_update.rel,
relation_update.lsn,
n_blocks,
)?;
}
Update::Unlink => {
todo!()
}
}
last_lsn = modification.lsn;
timeline.put_raw_data(
modification.tag,
last_lsn,
&modification.data[..],
)?;
}
FeMessage::CopyDone => {
timeline.advance_last_valid_lsn(last_lsn);

View File

@@ -163,24 +163,30 @@ pub trait Timeline: Send + Sync {
}
}
pub trait History: Iterator<Item = Result<RelationUpdate>> {
pub trait History: Iterator<Item = Result<Modification>> {
/// The last_valid_lsn at the time of history() call.
fn lsn(&self) -> Lsn;
}
//
// Structure representing any update operation of object storage.
// It is used to copy object storage content in PUSH method.
//
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelationUpdate {
pub rel: RelTag,
pub struct Modification {
pub tag: ObjectTag,
pub lsn: Lsn,
pub update: Update,
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Update {
Page { blknum: u32, img: Bytes },
WALRecord { blknum: u32, rec: WALRecord },
Truncate { n_blocks: u32 },
Unlink,
impl Modification {
    /// Build a `Modification` from the `(tag, lsn, data)` triple yielded by
    /// the object-store iterator.
    pub fn new(entry: (ObjectTag, Lsn, Vec<u8>)) -> Modification {
        // Destructure once instead of indexing the tuple positionally.
        let (tag, lsn, data) = entry;
        Modification { tag, lsn, data }
    }
}
#[derive(Clone)]
@@ -302,6 +308,7 @@ impl WALRecord {
mod tests {
use super::*;
use crate::object_repository::ObjectRepository;
use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry};
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
@@ -310,6 +317,7 @@ mod tests {
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
@@ -505,6 +513,18 @@ mod tests {
Ok(())
}
/// Skip the leading non-relation objects in a history snapshot so tests
/// only see relation metadata/data entries. Panics on an iteration error.
fn skip_nonrel_objects<'a>(
    snapshot: Box<dyn History + 'a>,
) -> Result<impl Iterator<Item = <dyn History as Iterator>::Item> + 'a> {
    Ok(snapshot.skip_while(|r| match r {
        // Keep skipping until the first RelationMetadata entry appears.
        Ok(m) => !matches!(m.tag, ObjectTag::RelationMetadata(_)),
        _ => panic!("Iteration error"),
    }))
}
///
/// Test branch creation
///
@@ -564,48 +584,71 @@ mod tests {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(0));
let mut snapshot = skip_nonrel_objects(snapshot)?;
assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN
let rel = TESTREL_A;
let tag = TEST_BUF(1);
tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"), true)?;
tline.advance_last_valid_lsn(Lsn(1));
let mut snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let expected_page = RelationUpdate {
rel: rel,
let expected_page = Modification {
tag,
lsn: Lsn(1),
update: Update::Page {
blknum: 1,
img: TEST_IMG("blk 1 @ lsn 1"),
},
data: ObjectValue::ser(&ObjectValue::Page(PageEntry::Page(TEST_IMG(
"blk 1 @ lsn 1",
))))?,
};
let expected_init_size = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(1),
data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(2)))?,
};
let expected_trunc_size = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(2),
data: ObjectValue::ser(&ObjectValue::RelationSize(RelationSizeEntry::Size(0)))?,
};
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let mut snapshot = skip_nonrel_objects(snapshot)?;
assert_eq!(
Some(&expected_init_size),
snapshot.next().transpose()?.as_ref()
);
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// truncate to zero, but don't advance the last valid LSN
tline.put_truncation(rel, Lsn(2), 0)?;
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let mut snapshot = skip_nonrel_objects(snapshot)?;
assert_eq!(
Some(&expected_init_size),
snapshot.next().transpose()?.as_ref()
);
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// advance the last valid LSN and the truncation should be observable
tline.advance_last_valid_lsn(Lsn(2));
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(2));
// TODO ordering not guaranteed by API. But currently it returns the
// truncation entry before the block data.
let expected_truncate = RelationUpdate {
rel: rel,
lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 },
};
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?);
let mut snapshot = skip_nonrel_objects(snapshot)?;
assert_eq!(
Some(&expected_init_size),
snapshot.next().transpose()?.as_ref()
);
assert_eq!(
Some(&expected_trunc_size),
snapshot.next().transpose()?.as_ref()
);
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);