Add inmemory storage for verification of correctness of layered storage

This commit is contained in:
Konstantin Knizhnik
2021-09-06 15:50:15 +03:00
parent 1110a35b25
commit 060f6f73ee
3 changed files with 175 additions and 10 deletions

View File

@@ -18,13 +18,15 @@ use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{BTreeSet, HashSet};
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fs;
use std::fs::File;
use std::io::prelude::*;
use std::io::Write;
use std::ops::Bound::Included;
use std::path::Path;
use std::str::FromStr;
use std::sync::RwLock;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
@@ -435,6 +437,19 @@ impl LayeredRepository {
}
}
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
struct ObjectKey {
rel: RelishTag,
blknum: u32,
lsn: Lsn,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
enum ObjectValue {
Image(Bytes),
Delta(WALRecord),
}
/// Metadata stored on disk for each timeline
///
/// The fields correspond to the values we hold in memory, in LayeredTimeline.
@@ -497,6 +512,128 @@ pub struct LayeredTimeline {
// of the branch point.
ancestor_timeline: Option<Arc<LayeredTimeline>>,
ancestor_lsn: Lsn,
db: RwLock<BTreeMap<ObjectKey, ObjectValue>>,
}
impl LayeredTimeline {
fn inmem_save(&self) -> Result<()> {
let path = self
.conf
.timeline_path(&self.timelineid, &self.tenantid)
.join("memstore.dmp");
let mut f = File::create(path)?;
let db = self.db.read().unwrap();
f.write(&db.ser()?)?;
Ok(())
}
fn inmem_load(&self) -> Result<()> {
let path = self
.conf
.timeline_path(&self.timelineid, &self.tenantid)
.join("memstore.dmp");
if let Ok(mut f) = File::open(path) {
let mut buffer = Vec::new();
// read the whole file
f.read_to_end(&mut buffer)?;
let mut db = self.db.write().unwrap();
*db = BTreeMap::des(&buffer)?;
} else {
info!("Not found memstore for timeline {}", &self.timelineid);
}
Ok(())
}
fn inmem_put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) {
let mut db = self.db.write().unwrap();
db.insert(
ObjectKey {
rel,
blknum,
lsn: rec.lsn,
},
ObjectValue::Delta(rec),
);
}
fn inmem_put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) {
let mut db = self.db.write().unwrap();
db.insert(ObjectKey { rel, blknum, lsn }, ObjectValue::Image(img));
}
fn inmem_get_page_at(&self, rel: RelishTag, blknum: u32, lsn: Lsn) -> Result<Bytes> {
let from = ObjectKey {
rel: rel.clone(),
blknum,
lsn: Lsn(0),
};
let mut till = ObjectKey {
rel: rel.clone(),
blknum,
lsn,
};
let mut timeline = self;
while lsn < timeline.ancestor_lsn {
trace!("going into ancestor {} ", timeline.ancestor_lsn);
till.lsn = timeline.ancestor_lsn;
timeline = &timeline.ancestor_timeline.as_ref().unwrap();
}
let mut records: Vec<WALRecord> = Vec::new();
loop {
// loop through timeline
let db = timeline.db.read().unwrap();
let mut versions = db.range(&from..=&till);
loop {
// loop through versions
match versions.next_back() {
Some((key, value)) => match value {
ObjectValue::Image(page) => {
if records.is_empty() {
return Ok(page.clone());
} else {
records.reverse();
return Ok(self.walredo_mgr.request_redo(
rel,
blknum,
lsn,
Some(page.clone()),
records,
)?);
}
}
ObjectValue::Delta(record) => {
records.push(record.clone());
if record.will_init {
records.reverse();
return Ok(self
.walredo_mgr
.request_redo(rel, blknum, lsn, None, records)?);
}
}
},
None => {
if let Some(ancestor) = timeline.ancestor_timeline.as_ref() {
trace!("continue search in ancestor {} ", ancestor.timelineid);
till.lsn = timeline.ancestor_lsn;
timeline = ancestor;
break;
} else {
bail!(
"Block {} of relation {:?} not found at {} in tli={}, ancestor_lsn={}",
blknum,
rel,
lsn,
timeline.timelineid,
timeline.ancestor_lsn,
);
}
}
}
}
}
}
}
/// Public interface functions
@@ -519,9 +656,28 @@ impl Timeline for LayeredTimeline {
let seg = SegmentTag::from_blknum(rel, blknum);
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
RECONSTRUCT_TIME
.observe_closure_duration(|| self.materialize_page(seg, blknum, lsn, &*layer))
if let Some((layer, layer_lsn)) = self.get_layer_for_read(seg, lsn)? {
trace!(
"Find layer {} [{}..{}] for page {} of relation {:?} at LSN {} ({} requested)",
layer.get_timeline_id(),
layer.get_start_lsn(),
layer.get_end_lsn(),
blknum,
rel,
layer_lsn,
lsn
);
let page1 = RECONSTRUCT_TIME.observe_closure_duration(|| {
self.materialize_page(seg, blknum, layer_lsn, &*layer)
});
let page2 = self.inmem_get_page_at(rel, blknum, lsn);
assert!(
(page1.is_err() && page2.is_err())
|| (page1.is_ok()
&& page2.is_ok()
&& page1.as_ref().unwrap() == page2.as_ref().unwrap())
);
page1
} else {
bail!("relish {} not found at {}", rel, lsn);
}
@@ -641,6 +797,8 @@ impl Timeline for LayeredTimeline {
);
}
self.inmem_put_wal_record(rel, blknum, rec.clone());
let seg = SegmentTag::from_blknum(rel, blknum);
let layer = self.get_layer_for_write(seg, rec.lsn)?;
@@ -742,6 +900,7 @@ impl Timeline for LayeredTimeline {
rel
);
}
self.inmem_put_page_image(rel, blknum, lsn, img.clone());
let seg = SegmentTag::from_blknum(rel, blknum);
@@ -813,6 +972,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn,
db: RwLock::new(BTreeMap::new()),
};
Ok(timeline)
}
@@ -825,6 +985,7 @@ impl LayeredTimeline {
"loading layer map for timeline {} into memory",
self.timelineid
);
self.inmem_load()?;
let mut layers = self.layers.lock().unwrap();
let (imgfilenames, mut deltafilenames) =
filename::list_files(self.conf, self.timelineid, self.tenantid)?;
@@ -833,7 +994,7 @@ impl LayeredTimeline {
for filename in imgfilenames.iter() {
let layer = ImageLayer::new(self.conf, self.timelineid, self.tenantid, filename);
info!(
trace!(
"found layer {} {} on timeline {}",
layer.get_seg_tag(),
layer.get_start_lsn(),
@@ -864,7 +1025,7 @@ impl LayeredTimeline {
predecessor,
);
info!(
trace!(
"found layer {} on timeline {}, predecessor: {}",
layer.filename().display(),
self.timelineid,
@@ -1169,6 +1330,8 @@ impl LayeredTimeline {
// Also update the in-memory copy
self.disk_consistent_lsn.store(disk_consistent_lsn);
self.inmem_save()?;
Ok(())
}

View File

@@ -218,7 +218,7 @@ impl Layer for DeltaLayer {
if let Some(cont_lsn) = cont_lsn {
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
cont_lsn,
self.start_lsn, //cont_lsn,
Arc::clone(cont_layer),
))
} else {

View File

@@ -177,7 +177,7 @@ impl Layer for InMemoryLayer {
if let Some(cont_lsn) = cont_lsn {
if let Some(cont_layer) = &self.predecessor {
Ok(PageReconstructResult::Continue(
cont_lsn,
self.start_lsn, //cont_lsn,
Arc::clone(cont_layer),
))
} else {
@@ -455,9 +455,11 @@ impl InMemoryLayer {
// This is needed just to call materialize_page()
timeline: &LayeredTimeline,
) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
info!(
trace!(
"freezing in memory layer for {} on timeline {} at {}",
self.seg, self.timelineid, cutoff_lsn
self.seg,
self.timelineid,
cutoff_lsn
);
let inner = self.inner.lock().unwrap();