Introduce common enum ObjectValue for all values stored in ObjectStore.

Based on Konstantin's original patch (PR #275), but I introduced helper
functions for serializing/deserializing the different kinds of
ObjectValue, which makes them more pleasant to use: the deserialization
checks are now performed in the helper functions.
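
To make the pattern concrete, here is a minimal, self-contained sketch of the idea: one tagged enum wrapping every value stored in the object store, plus a typed deserialization helper that performs the kind check in one place. The types, key strings, and the serde/bincode/anyhow crates below are illustrative stand-ins, not the pageserver's actual ser/des machinery or key types.

use anyhow::{bail, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

// Toy stand-ins for the real stored value types.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PageEntry(Vec<u8>);

#[derive(Debug, Clone, Serialize, Deserialize)]
enum RelationSizeEntry {
    Size(u32),
    Unlink,
}

// One common enum for everything stored in the (toy) object store.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ObjectValue {
    Page(PageEntry),
    RelationSize(RelationSizeEntry),
}

impl ObjectValue {
    // Typed deserialization helper: the caller states what kind of value it
    // expects, and the kind check lives here instead of at every call site.
    fn des_relsize(bytes: &[u8]) -> Result<RelationSizeEntry> {
        match bincode::deserialize::<ObjectValue>(bytes)? {
            ObjectValue::RelationSize(rs) => Ok(rs),
            _ => bail!("invalid object kind, expected a relation size entry"),
        }
    }
}

fn main() -> Result<()> {
    // Toy object store: key string -> serialized ObjectValue.
    let mut store: HashMap<String, Vec<u8>> = HashMap::new();

    // Storing wraps the entry in ObjectValue before serializing.
    let val = ObjectValue::RelationSize(RelationSizeEntry::Size(8));
    store.insert("rel_size/1663/13010/16384".into(), bincode::serialize(&val)?);

    // Reading back goes through the typed helper.
    let nblocks = match ObjectValue::des_relsize(&store["rel_size/1663/13010/16384"])? {
        RelationSizeEntry::Size(n) => n,
        RelationSizeEntry::Unlink => 0,
    };
    assert_eq!(nblocks, 8);
    Ok(())
}

Centralizing the check means a caller that asks for a relation-size entry gets either that entry or an error, never a silently misinterpreted value.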
Author: Heikki Linnakangas
Date:   2021-06-30 12:50:35 +03:00
parent 9c856ecf43
commit 3386ce6f35


@@ -106,17 +106,18 @@ impl Repository for ObjectRepository {
     ) -> Result<Arc<dyn Timeline>> {
         let mut timelines = self.timelines.lock().unwrap();
-        // Write metadata key
+        // Write initial metadata key.
         let metadata = MetadataEntry {
             last_valid_lsn: start_lsn,
             last_record_lsn: start_lsn,
             ancestor_timeline: None,
             ancestor_lsn: start_lsn,
         };
+        let val = ObjectValue::TimelineMetadata(metadata);
         self.obj_store.put(
             &timeline_metadata_key(timelineid),
             Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
+            &ObjectValue::ser(&val)?,
         )?;
         info!("Created empty timeline {}", timelineid);
@@ -149,10 +150,11 @@ impl Repository for ObjectRepository {
             ancestor_timeline: Some(src),
             ancestor_lsn: at_lsn,
         };
+        let val = ObjectValue::TimelineMetadata(metadata);
         self.obj_store.put(
             &timeline_metadata_key(dst),
             Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
+            &ObjectValue::ser(&val)?,
         )?;
         Ok(())
@@ -223,7 +225,7 @@ impl ObjectTimeline {
         let v = obj_store
             .get(&timeline_metadata_key(timelineid), Lsn(0))
             .with_context(|| "timeline not found in repository")?;
-        let metadata = MetadataEntry::des(&v)?;
+        let metadata = ObjectValue::des_timeline_metadata(&v)?;
         let timeline = ObjectTimeline {
             timelineid,
@@ -306,7 +308,7 @@ impl Timeline for ObjectTimeline {
                 .obj_store
                 .get(&timeline_metadata_key(timeline), Lsn(0))
                 .with_context(|| "timeline not found in repository")?;
-            let metadata = MetadataEntry::des(&v)?;
+            let metadata = ObjectValue::des_timeline_metadata(&v)?;
             prev_timeline = metadata.ancestor_timeline;
             lsn = metadata.ancestor_lsn;
@@ -327,13 +329,8 @@ impl Timeline for ObjectTimeline {
     /// current end-of-file.
     fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> {
         let lsn = rec.lsn;
-        let key = ObjectKey {
-            timeline: self.timelineid,
-            tag: ObjectTag::RelationBuffer(tag),
-        };
-        let val = PageEntry::WALRecord(rec);
-        self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
+        self.put_page_entry(&tag, lsn, PageEntry::WALRecord(rec))?;
         debug!(
             "put_wal_record rel {} blk {} at {}",
             tag.rel, tag.blknum, lsn
@@ -344,8 +341,6 @@ impl Timeline for ObjectTimeline {
         if tag.blknum >= old_nblocks {
             let new_nblocks = tag.blknum + 1;
-            let key = relation_size_key(self.timelineid, tag.rel);
-            let val = RelationSizeEntry::Size(new_nblocks);
             trace!(
                 "Extended relation {} from {} to {} blocks at {}",
@@ -355,8 +350,7 @@ impl Timeline for ObjectTimeline {
                 lsn
             );
-            self.obj_store
-                .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
+            self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?;
             let mut rel_meta = self.rel_meta.write().unwrap();
             rel_meta.insert(
                 tag.rel,
@@ -372,13 +366,7 @@ impl Timeline for ObjectTimeline {
     /// Unlink object. This method is used for marking dropped relations.
     fn put_unlink(&self, rel_tag: RelTag, lsn: Lsn) -> Result<()> {
-        let key = ObjectKey {
-            timeline: self.timelineid,
-            tag: ObjectTag::RelationMetadata(rel_tag),
-        };
-        let val = RelationSizeEntry::Unlink;
-        self.obj_store
-            .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
+        self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?;
         Ok(())
     }
@@ -387,13 +375,8 @@ impl Timeline for ObjectTimeline {
     /// Memorize a full image of a page version
     ///
     fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> {
-        let key = ObjectKey {
-            timeline: self.timelineid,
-            tag: ObjectTag::RelationBuffer(tag),
-        };
-        let val = PageEntry::Page(img);
-        self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?;
+        self.put_page_entry(&tag, lsn, PageEntry::Page(img))?;
         debug!(
             "put_page_image rel {} blk {} at {}",
@@ -405,8 +388,6 @@ impl Timeline for ObjectTimeline {
         if tag.blknum >= old_nblocks {
             let new_nblocks = tag.blknum + 1;
-            let key = relation_size_key(self.timelineid, tag.rel);
-            let val = RelationSizeEntry::Size(new_nblocks);
             trace!(
                 "Extended relation {} from {} to {} blocks at {}",
@@ -416,8 +397,7 @@ impl Timeline for ObjectTimeline {
                 lsn
             );
-            self.obj_store
-                .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
+            self.put_relsize_entry(&tag.rel, lsn, RelationSizeEntry::Size(new_nblocks))?;
             let mut rel_meta = self.rel_meta.write().unwrap();
             rel_meta.insert(
                 tag.rel,
@@ -436,13 +416,9 @@ impl Timeline for ObjectTimeline {
     /// associating it with all pages started with specified block number
     ///
     fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> {
-        let key = relation_size_key(self.timelineid, rel);
-        let val = RelationSizeEntry::Size(nblocks);
         info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
-        self.obj_store
-            .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?;
+        self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(nblocks))?;
         let mut rel_meta = self.rel_meta.write().unwrap();
         rel_meta.insert(
             rel,
@@ -525,14 +501,10 @@ impl Timeline for ObjectTimeline {
             ancestor_timeline: self.ancestor_timeline,
             ancestor_lsn: self.ancestor_lsn,
         };
-        self.obj_store.put(
-            &timeline_metadata_key(self.timelineid),
-            Lsn(0),
-            &MetadataEntry::ser(&metadata)?,
-        )?;
         trace!("checkpoint at {}", metadata.last_valid_lsn);
+        self.put_timeline_metadata_entry(metadata)?;
         Ok(())
     }
@@ -566,9 +538,8 @@ impl Timeline for ObjectTimeline {
             // Process relation metadata versions
             let mut latest_version = true;
-            for vers in self.obj_store.object_versions(&key, horizon)? {
-                let lsn = vers.0;
-                let rel_meta = RelationSizeEntry::des(&vers.1)?;
+            for (lsn, val) in self.obj_store.object_versions(&key, horizon)? {
+                let rel_meta = ObjectValue::des_relsize(&val)?;
                 // If relation is dropped at the horizon,
                 // we can remove all its versions including latest (Unlink)
                 match rel_meta {
@@ -641,7 +612,7 @@ impl ObjectTimeline {
         if let Some((version_lsn, value)) = iter.next().transpose()? {
             let page_img: Bytes;
-            match PageEntry::des(&value)? {
+            match ObjectValue::des_page(&value)? {
                 PageEntry::Page(img) => {
                     page_img = img;
                 }
@@ -695,7 +666,7 @@ impl ObjectTimeline {
         let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?;
         if let Some((version_lsn, value)) = iter.next().transpose()? {
-            match RelationSizeEntry::des(&value)? {
+            match ObjectValue::des_relsize(&value)? {
                 RelationSizeEntry::Size(nblocks) => {
                     trace!(
                         "relation {} has size {} at {} (request {})",
@@ -744,7 +715,7 @@ impl ObjectTimeline {
         };
         let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?;
         while let Some((_key, value)) = iter.next().transpose()? {
-            match PageEntry::des(&value)? {
+            match ObjectValue::des_page(&value)? {
                 PageEntry::Page(img) => {
                     // We have a base image. No need to dig deeper into the list of
                     // records
@@ -838,6 +809,33 @@ impl ObjectTimeline {
             ancestor_lsn: self.ancestor_lsn,
         })
     }
+    //
+    // Helper functions to store different kinds of objects to the underlying ObjectStore
+    //
+    fn put_page_entry(&self, tag: &BufferTag, lsn: Lsn, val: PageEntry) -> Result<()> {
+        let key = ObjectKey {
+            timeline: self.timelineid,
+            tag: ObjectTag::RelationBuffer(*tag),
+        };
+        let val = ObjectValue::Page(val);
+        self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)
+    }
+    fn put_relsize_entry(&self, tag: &RelTag, lsn: Lsn, val: RelationSizeEntry) -> Result<()> {
+        let key = relation_size_key(self.timelineid, *tag);
+        let val = ObjectValue::RelationSize(val);
+        self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)
+    }
+    fn put_timeline_metadata_entry(&self, val: MetadataEntry) -> Result<()> {
+        let key = timeline_metadata_key(self.timelineid);
+        let val = ObjectValue::TimelineMetadata(val);
+        self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?)
+    }
 }
 struct ObjectHistory<'a> {
@@ -900,15 +898,16 @@ impl<'a> ObjectHistory<'a> {
             let (rel_tag, update) = match object_tag {
                 ObjectTag::TimelineMetadataTag => continue,
                 ObjectTag::RelationMetadata(rel_tag) => {
-                    let entry = RelationSizeEntry::des(&value)?;
+                    let entry = ObjectValue::des_relsize(&value)?;
                     match self.handle_relation_size(rel_tag, entry) {
                         Some(relation_update) => (rel_tag, relation_update),
                         None => continue,
                     }
                 }
                 ObjectTag::RelationBuffer(buf_tag) => {
-                    let entry = PageEntry::des(&value)?;
+                    let entry = ObjectValue::des_page(&value)?;
                     let update = self.handle_page(buf_tag, entry);
                     (buf_tag.rel, update)
                 }
             };
@@ -925,10 +924,19 @@ impl<'a> ObjectHistory<'a> {
 }
 ///
-/// We store two kinds of page versions in the repository:
+/// We store several kinds of objects in the repository.
+/// We have per-page, per-relation and per-timeline entries.
 ///
-/// 1. Ready-made images of the block
-/// 2. WAL records, to be applied on top of the "previous" entry
 #[derive(Debug, Clone, Serialize, Deserialize)]
+enum ObjectValue {
+    Page(PageEntry),
+    RelationSize(RelationSizeEntry),
+    TimelineMetadata(MetadataEntry),
+}
+///
+/// This is what we store for each page in the object store. Use
+/// ObjectTag::RelationBuffer as key.
+///
+#[derive(Debug, Clone, Serialize, Deserialize)]
 enum PageEntry {
@@ -948,10 +956,14 @@ enum PageEntry {
 ///
 /// In addition to page versions, we store relation size as a separate, versioned,
-/// object.
+/// object. That way we can answer nblocks requests faster, and we also use it to
+/// support relation truncation without having to add a tombstone page version for
+/// each block that is truncated away.
 ///
+/// Use ObjectTag::RelationMetadata as the key.
+///
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum RelationSizeEntry {
+enum RelationSizeEntry {
     Size(u32),
     /// Tombstone for a dropped relation.
@@ -966,8 +978,9 @@ const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey {
 }
 ///
-/// In addition to those per-page and per-relation entries, we also
-/// store a little metadata blob for each timeline.
+/// In addition to the per-page and per-relation entries, we also store
+/// a little metadata blob for each timeline. This is not versioned, use
+/// ObjectTag::TimelineMetadataTag with constant Lsn(0) as the key.
 ///
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct MetadataEntry {
@@ -984,6 +997,45 @@ const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey {
     }
 }
+///
+/// Helper functions to deserialize ObjectValue, when the caller knows what kind of
+/// a value it should be.
+///
+/// There are no matching helper functions for serializing. Instead, there are
+/// `put_page_entry`, `put_relsize_entry`, and `put_timeline_metadata_entry` helper
+/// functions in ObjectTimeline that both construct the right kind of key and
+/// serialize the value in the same call.
+///
+impl ObjectValue {
+    fn des_page(v: &[u8]) -> Result<PageEntry> {
+        match ObjectValue::des(&v)? {
+            ObjectValue::Page(p) => Ok(p),
+            _ => {
+                bail!("Invalid object kind, expected a page entry");
+            }
+        }
+    }
+    fn des_relsize(v: &[u8]) -> Result<RelationSizeEntry> {
+        match ObjectValue::des(&v)? {
+            ObjectValue::RelationSize(rs) => Ok(rs),
+            _ => {
+                bail!("Invalid object kind, expected a relation size entry");
+            }
+        }
+    }
+    fn des_timeline_metadata(v: &[u8]) -> Result<MetadataEntry> {
+        match ObjectValue::des(&v)? {
+            ObjectValue::TimelineMetadata(t) => Ok(t),
+            _ => {
+                bail!("Invalid object kind, expected a timeline metadata entry");
+            }
+        }
+    }
+}
 ///
 /// Iterator for `object_versions`. Returns all page versions of a given block, in
 /// reverse LSN order. This implements the traversal of ancestor timelines. If
@@ -1043,8 +1095,7 @@ impl<'a> ObjectVersionIter<'a> {
             .obj_store
             .get(&timeline_metadata_key(ancestor_timeline), Lsn(0))
             .with_context(|| "timeline not found in repository")?;
-        let ancestor_metadata = MetadataEntry::des(&v)?;
+        let ancestor_metadata = ObjectValue::des_timeline_metadata(&v)?;
         self.ancestor_timeline = ancestor_metadata.ancestor_timeline;
         self.ancestor_lsn = ancestor_metadata.ancestor_lsn;
         self.current_iter = ancestor_iter;
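
The expanded doc comment for RelationSizeEntry above motivates keeping relation size as its own versioned object: an nblocks lookup only has to find the latest size entry at or below the requested LSN, and a truncation or drop is a single new version rather than one tombstone per truncated block. Below is a rough sketch of that lookup, using a plain BTreeMap keyed by (relation, LSN) as a stand-in for the versioned ObjectStore; the names and the bare u64 LSN are illustrative, not the pageserver API.

use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
enum RelationSizeEntry {
    Size(u32),
    Unlink,
}

// Stand-in for the versioned object store: (relation key, lsn) -> size entry.
struct RelSizeStore {
    versions: BTreeMap<(String, u64), RelationSizeEntry>,
}

impl RelSizeStore {
    fn put(&mut self, rel: &str, lsn: u64, entry: RelationSizeEntry) {
        self.versions.insert((rel.to_string(), lsn), entry);
    }

    // nblocks at a given LSN = the latest Size entry at or before that LSN.
    // Truncation is just another Size version; no per-block tombstones needed.
    fn nblocks_at(&self, rel: &str, lsn: u64) -> Option<u32> {
        self.versions
            .range((rel.to_string(), 0)..=(rel.to_string(), lsn))
            .next_back()
            .and_then(|(_, entry)| match entry {
                RelationSizeEntry::Size(n) => Some(*n),
                RelationSizeEntry::Unlink => None,
            })
    }
}

fn main() {
    let mut store = RelSizeStore { versions: BTreeMap::new() };
    store.put("rel A", 10, RelationSizeEntry::Size(100)); // extended to 100 blocks
    store.put("rel A", 20, RelationSizeEntry::Size(50));  // truncated to 50 blocks
    store.put("rel A", 30, RelationSizeEntry::Unlink);    // dropped

    assert_eq!(store.nblocks_at("rel A", 15), Some(100));
    assert_eq!(store.nblocks_at("rel A", 25), Some(50));
    assert_eq!(store.nblocks_at("rel A", 35), None);
}

The same reverse-range idea is what the real code does through object_versions(): it walks versions of the relation-size key from the requested LSN backwards and stops at the first Size or Unlink entry it finds.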