Handle non-relational data in PUSH command

This commit is contained in:
Konstantin Knizhnik
2021-06-16 00:58:31 +03:00
parent 87fce3fcd5
commit 0d9805a505
3 changed files with 98 additions and 131 deletions

View File

@@ -419,8 +419,7 @@ impl Timeline for ObjectTimeline {
lsn
);
self.obj_store
.put(&key, lsn, &ObjectValue::ser(&val)?)?;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
@@ -446,6 +445,15 @@ impl Timeline for ObjectTimeline {
Ok(())
}
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()> {
let key = ObjectKey {
timeline: self.timelineid,
tag,
};
self.obj_store.put(&key, lsn, data)?;
Ok(())
}
///
/// Memorize a full image of a page version
///
@@ -468,17 +476,12 @@ impl Timeline for ObjectTimeline {
let new_nblocks = tag.blknum + 1;
let key = relation_size_key(self.timelineid, tag.rel);
let val = ObjectValue::RelationSize(new_nblocks);
trace!(
"Extended relation {} from {} to {} blocks at {}",
tag.rel,
old_nblocks,
new_nblocks,
lsn
tag.rel, old_nblocks, new_nblocks, lsn
);
self.obj_store
.put(&key, lsn, &ObjectValue::ser(&val)?)?;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
tag.rel,
@@ -502,8 +505,7 @@ impl Timeline for ObjectTimeline {
info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn);
self.obj_store
.put(&key, lsn, &ObjectValue::ser(&val)?)?;
self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?)?;
let mut rel_meta = self.rel_meta.write().unwrap();
rel_meta.insert(
rel,
@@ -601,11 +603,7 @@ impl Timeline for ObjectTimeline {
fn history<'a>(&'a self) -> Result<Box<dyn History + 'a>> {
let lsn = self.last_valid_lsn.load();
let iter = self.obj_store.objects(self.timelineid, lsn)?;
Ok(Box::new(ObjectHistory {
lsn,
iter,
last_relation_size: None,
}))
Ok(Box::new(ObjectHistory { lsn, iter }))
}
}
@@ -939,14 +937,15 @@ impl ObjectTimeline {
struct ObjectHistory<'a> {
iter: Box<dyn Iterator<Item = Result<(ObjectTag, Lsn, Vec<u8>)>> + 'a>,
lsn: Lsn,
last_relation_size: Option<(RelTag, u32)>,
}
impl<'a> Iterator for ObjectHistory<'a> {
type Item = Result<RelationUpdate>;
type Item = Result<Modification>;
fn next(&mut self) -> Option<Self::Item> {
self.next_result().transpose()
self.iter
.next()
.map(|result| result.map(|t| Modification::new(t)))
}
}
@@ -956,64 +955,12 @@ impl<'a> History for ObjectHistory<'a> {
}
}
impl<'a> ObjectHistory<'a> {
fn next_result(&mut self) -> Result<Option<RelationUpdate>> {
while let Some((tag, lsn, value)) = self.iter.next().transpose()? {
let entry = ObjectValue::des(&value)?;
let rel_tag: RelTag;
let update = match tag {
ObjectTag::RelationMetadata(rel) => {
rel_tag = rel;
match entry {
ObjectValue::RelationSize(size) => {
// we only want to output truncations, expansions are filtered out
let last_relation_size = self.last_relation_size.replace((rel, size));
match last_relation_size {
Some((last_rel, last_size))
if last_rel != rel || size < last_size =>
{
Update::Truncate { n_blocks: size }
}
_ => continue,
}
}
ObjectValue::Unlink => Update::Unlink,
_ => continue,
}
}
ObjectTag::RelationBuffer(buf_tag) => {
rel_tag = buf_tag.rel;
match entry {
ObjectValue::Page(img) => Update::Page {
blknum: buf_tag.blknum,
img,
},
ObjectValue::WALRecord(rec) => Update::WALRecord {
blknum: buf_tag.blknum,
rec,
},
_ => continue,
}
}
_ => continue,
};
return Ok(Some(RelationUpdate {
rel: rel_tag,
lsn,
update,
}));
}
Ok(None)
}
}
///
/// We store several kinds of objects in the repository.
/// We have per-page, per-relation(or non-rel file) and per-timeline entries.
///
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ObjectValue {
pub enum ObjectValue {
/// Ready-made images of the block
Page(Bytes),
/// WAL records, to be applied on top of the "previous" entry

View File

@@ -27,7 +27,7 @@ use zenith_utils::{bin_ser::BeSer, lsn::Lsn};
use crate::basebackup;
use crate::branches;
use crate::page_cache;
use crate::repository::{BufferTag, ObjectTag, RelTag, RelationUpdate, Update};
use crate::repository::{BufferTag, Modification, ObjectTag, RelTag};
use crate::restore_local_repo;
use crate::walreceiver;
use crate::PageServerConf;
@@ -410,38 +410,14 @@ impl postgres_backend::Handler for PageServerHandler {
while let Some(msg) = pgb.read_message()? {
match msg {
FeMessage::CopyData(bytes) => {
let relation_update = RelationUpdate::des(&bytes)?;
let modification = Modification::des(&bytes)?;
last_lsn = relation_update.lsn;
match relation_update.update {
Update::Page { blknum, img } => {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel,
blknum,
});
timeline.put_page_image(tag, relation_update.lsn, img)?;
}
Update::WALRecord { blknum, rec } => {
let tag = ObjectTag::RelationBuffer(BufferTag {
rel: relation_update.rel,
blknum,
});
timeline.put_wal_record(tag, rec)?;
}
Update::Truncate { n_blocks } => {
timeline.put_truncation(
relation_update.rel,
relation_update.lsn,
n_blocks,
)?;
}
Update::Unlink => {
todo!()
}
}
last_lsn = modification.lsn;
timeline.put_raw_data(
modification.tag,
last_lsn,
&modification.data[..],
)?;
}
FeMessage::CopyDone => {
timeline.advance_last_valid_lsn(last_lsn);

View File

@@ -61,6 +61,9 @@ pub trait Timeline: Send + Sync {
// These are called by the WAL receiver to digest WAL records.
//------------------------------------------------------------------------------
/// Put raw data
fn put_raw_data(&self, tag: ObjectTag, lsn: Lsn, data: &[u8]) -> Result<()>;
/// Put a new page version that can be constructed from a WAL record
///
/// This will implicitly extend the relation, if the page is beyond the
@@ -119,24 +122,26 @@ pub trait Timeline: Send + Sync {
}
}
pub trait History: Iterator<Item = Result<RelationUpdate>> {
pub trait History: Iterator<Item = Result<Modification>> {
/// The last_valid_lsn at the time of history() call.
fn lsn(&self) -> Lsn;
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RelationUpdate {
pub rel: RelTag,
pub struct Modification {
pub tag: ObjectTag,
pub lsn: Lsn,
pub update: Update,
pub data: Vec<u8>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Update {
Page { blknum: u32, img: Bytes },
WALRecord { blknum: u32, rec: WALRecord },
Truncate { n_blocks: u32 },
Unlink,
impl Modification {
pub fn new(entry: (ObjectTag, Lsn, Vec<u8>)) -> Modification {
Modification {
tag: entry.0,
lsn: entry.1,
data: entry.2,
}
}
}
#[derive(Clone)]
@@ -329,6 +334,7 @@ impl WALRecord {
mod tests {
use super::*;
use crate::object_repository::ObjectRepository;
use crate::object_repository::ObjectValue;
use crate::rocksdb_storage::RocksObjectStore;
use crate::walredo::{WalRedoError, WalRedoManager};
use crate::PageServerConf;
@@ -337,6 +343,7 @@ mod tests {
use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use zenith_utils::bin_ser::BeSer;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
@@ -533,45 +540,82 @@ mod tests {
let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap();
let tline = repo.create_empty_timeline(timelineid, Lsn(0))?;
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(0));
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
assert_eq!(None, snapshot.next().transpose()?);
// add a page and advance the last valid LSN
let rel = TESTREL_A;
let buf = TEST_BUF(1);
tline.put_page_image(buf, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
let tag = TEST_BUF(1);
tline.put_page_image(tag, Lsn(1), TEST_IMG("blk 1 @ lsn 1"))?;
tline.advance_last_valid_lsn(Lsn(1));
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let expected_page = RelationUpdate {
rel,
lsn: Lsn(1),
update: Update::Page {
blknum: 1,
img: TEST_IMG("blk 1 @ lsn 1"),
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
let expected_page = Modification {
tag,
lsn: Lsn(1),
data: ObjectValue::ser(&ObjectValue::Page(TEST_IMG("blk 1 @ lsn 1")))?,
};
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// truncate to zero, but don't advance the last valid LSN
tline.put_truncation(rel, Lsn(2), 0)?;
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(1));
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationBuffer(_) => false,
_ => true,
},
_ => true,
});
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);
// advance the last valid LSN and the truncation should be observable
tline.advance_last_valid_lsn(Lsn(2));
let mut snapshot = tline.history()?;
let snapshot = tline.history()?;
assert_eq!(snapshot.lsn(), Lsn(2));
let expected_truncate = RelationUpdate {
rel,
lsn: Lsn(2),
update: Update::Truncate { n_blocks: 0 },
let mut snapshot = snapshot.skip_while(|r| match r {
Ok(m) => match m.tag {
ObjectTag::RelationMetadata(_) => false,
_ => true,
},
_ => true,
});
let expected_truncate = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(1),
data: ObjectValue::ser(&ObjectValue::RelationSize(2))?,
};
assert_eq!(Some(expected_truncate), snapshot.next().transpose()?); // TODO ordering not guaranteed by API
assert_eq!(
Some(&expected_truncate),
snapshot.next().transpose()?.as_ref()
); // TODO ordering not guaranteed by API
let expected_truncate = Modification {
tag: ObjectTag::RelationMetadata(rel),
lsn: Lsn(2),
data: ObjectValue::ser(&ObjectValue::RelationSize(0))?,
};
assert_eq!(
Some(&expected_truncate),
snapshot.next().transpose()?.as_ref()
); // TODO ordering not guaranteed by API
assert_eq!(Some(&expected_page), snapshot.next().transpose()?.as_ref());
assert_eq!(None, snapshot.next().transpose()?);