diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 992e7cf22f..0bfc199cd4 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -18,13 +18,15 @@ use log::*;
 use serde::{Deserialize, Serialize};
 
 use std::collections::HashMap;
-use std::collections::{BTreeSet, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashSet};
 use std::fs;
 use std::fs::File;
+use std::io::prelude::*;
 use std::io::Write;
 use std::ops::Bound::Included;
 use std::path::Path;
 use std::str::FromStr;
+use std::sync::RwLock;
 use std::sync::{Arc, Mutex};
 use std::time::{Duration, Instant};
 
@@ -435,6 +437,24 @@ impl LayeredRepository {
     }
 }
 
+/// Key of one page version in the in-memory store.
+///
+/// The derived `Ord` compares `rel`, then `blknum`, then `lsn`, so all versions
+/// of one block are adjacent in a `BTreeMap` and ordered by LSN.
+#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize)]
+struct ObjectKey {
+    rel: RelishTag,
+    blknum: u32,
+    lsn: Lsn,
+}
+
+/// One stored page version: a full page image or a WAL record.
+#[derive(Debug, Clone, Deserialize, Serialize)]
+enum ObjectValue {
+    Image(Bytes),
+    Delta(WALRecord),
+}
+
 /// Metadata stored on disk for each timeline
 ///
 /// The fields correspond to the values we hold in memory, in LayeredTimeline.
@@ -497,6 +517,151 @@ pub struct LayeredTimeline {
     // of the branch point.
     ancestor_timeline: Option<Arc<LayeredTimeline>>,
     ancestor_lsn: Lsn,
+
+    /// Shadow store of the page versions recorded by `inmem_put_wal_record` and
+    /// `inmem_put_page_image`.
+    db: RwLock<BTreeMap<ObjectKey, ObjectValue>>,
+}
+
+impl LayeredTimeline {
+    /// Serializes the whole in-memory store into `memstore.dmp` in the timeline
+    /// directory, replacing any previous dump.
+    fn inmem_save(&self) -> Result<()> {
+        let path = self
+            .conf
+            .timeline_path(&self.timelineid, &self.tenantid)
+            .join("memstore.dmp");
+        let mut f = File::create(path)?;
+        let db = self.db.read().unwrap();
+        // `write` may return after a partial write; `write_all` writes every byte.
+        f.write_all(&db.ser()?)?;
+        Ok(())
+    }
+
+    /// Replaces the in-memory store with the contents of `memstore.dmp`.
+    ///
+    /// A missing dump file is not an error: the store is left as it is.
+    fn inmem_load(&self) -> Result<()> {
+        let path = self
+            .conf
+            .timeline_path(&self.timelineid, &self.tenantid)
+            .join("memstore.dmp");
+        let mut f = match File::open(path) {
+            Ok(f) => f,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                info!("Not found memstore for timeline {}", &self.timelineid);
+                return Ok(());
+            }
+            // Any other failure must not be mistaken for "no dump yet".
+            Err(e) => return Err(e.into()),
+        };
+        let mut buffer = Vec::new();
+        // read the whole file
+        f.read_to_end(&mut buffer)?;
+        let mut db = self.db.write().unwrap();
+        *db = BTreeMap::des(&buffer)?;
+        Ok(())
+    }
+
+    /// Stores `rec` as the version of block `blknum` of `rel` at `rec.lsn`.
+    fn inmem_put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) {
+        let mut db = self.db.write().unwrap();
+        db.insert(
+            ObjectKey {
+                rel,
+                blknum,
+                lsn: rec.lsn,
+            },
+            ObjectValue::Delta(rec),
+        );
+    }
+
+    /// Stores `img` as the full image of block `blknum` of `rel` at `lsn`.
+    fn inmem_put_page_image(&self, rel: RelishTag, blknum: u32, lsn: Lsn, img: Bytes) {
+        let mut db = self.db.write().unwrap();
+        db.insert(ObjectKey { rel, blknum, lsn }, ObjectValue::Image(img));
+    }
+
+    /// Reconstructs block `blknum` of `rel` as of `lsn` from the in-memory store.
+    ///
+    /// Versions are scanned from newest to oldest, collecting WAL records until
+    /// a page image or a `will_init` record is reached; the collected records are
+    /// then handed to WAL redo. When a timeline holds no base for the page, the
+    /// search continues in its ancestor, bounded by `ancestor_lsn`.
+    ///
+    /// Fails if no base version is found in this timeline or any ancestor.
+    fn inmem_get_page_at(&self, rel: RelishTag, blknum: u32, lsn: Lsn) -> Result<Bytes> {
+        let from = ObjectKey {
+            rel: rel.clone(),
+            blknum,
+            lsn: Lsn(0),
+        };
+        let mut till = ObjectKey {
+            rel: rel.clone(),
+            blknum,
+            lsn,
+        };
+        let mut timeline = self;
+        while lsn < timeline.ancestor_lsn {
+            trace!("going into ancestor {} ", timeline.ancestor_lsn);
+            till.lsn = timeline.ancestor_lsn;
+            timeline = &timeline.ancestor_timeline.as_ref().unwrap();
+        }
+
+        let mut records: Vec<WALRecord> = Vec::new();
+        loop {
+            // loop through timeline
+            let db = timeline.db.read().unwrap();
+            let mut versions = db.range(&from..=&till);
+            loop {
+                // loop through versions
+                match versions.next_back() {
+                    Some((_, value)) => match value {
+                        ObjectValue::Image(page) => {
+                            if records.is_empty() {
+                                return Ok(page.clone());
+                            } else {
+                                records.reverse();
+                                return Ok(self.walredo_mgr.request_redo(
+                                    rel,
+                                    blknum,
+                                    lsn,
+                                    Some(page.clone()),
+                                    records,
+                                )?);
+                            }
+                        }
+                        ObjectValue::Delta(record) => {
+                            records.push(record.clone());
+                            if record.will_init {
+                                records.reverse();
+                                return Ok(self
+                                    .walredo_mgr
+                                    .request_redo(rel, blknum, lsn, None, records)?);
+                            }
+                        }
+                    },
+                    None => {
+                        if let Some(ancestor) = timeline.ancestor_timeline.as_ref() {
+                            trace!("continue search in ancestor {} ", ancestor.timelineid);
+                            till.lsn = timeline.ancestor_lsn;
+                            timeline = ancestor;
+                            break;
+                        } else {
+                            bail!(
+                                "Block {} of relation {:?} not found at {} in tli={}, ancestor_lsn={}",
+                                blknum,
+                                rel,
+                                lsn,
+                                timeline.timelineid,
+                                timeline.ancestor_lsn,
+                            );
+                        }
+                    }
+                }
+            }
+        }
+    }
 }
 
 /// Public interface functions
@@ -519,9 +684,29 @@ impl Timeline for LayeredTimeline {
 
         let seg = SegmentTag::from_blknum(rel, blknum);
 
-        if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
-            RECONSTRUCT_TIME
-                .observe_closure_duration(|| self.materialize_page(seg, blknum, lsn, &*layer))
+        if let Some((layer, layer_lsn)) = self.get_layer_for_read(seg, lsn)? {
+            trace!(
+                "Find layer {} [{}..{}] for page {} of relation {:?} at LSN {} ({} requested)",
+                layer.get_timeline_id(),
+                layer.get_start_lsn(),
+                layer.get_end_lsn(),
+                blknum,
+                rel,
+                layer_lsn,
+                lsn
+            );
+            let page1 = RECONSTRUCT_TIME.observe_closure_duration(|| {
+                self.materialize_page(seg, blknum, layer_lsn, &*layer)
+            });
+            // Cross-check: the in-memory store must agree with the layered storage.
+            let page2 = self.inmem_get_page_at(rel, blknum, lsn);
+            assert!(
+                (page1.is_err() && page2.is_err())
+                    || (page1.is_ok()
+                        && page2.is_ok()
+                        && page1.as_ref().unwrap() == page2.as_ref().unwrap())
+            );
+            page1
         } else {
             bail!("relish {} not found at {}", rel, lsn);
         }
@@ -641,6 +826,8 @@ impl Timeline for LayeredTimeline {
             );
         }
 
+        self.inmem_put_wal_record(rel, blknum, rec.clone());
+
         let seg = SegmentTag::from_blknum(rel, blknum);
 
         let layer = self.get_layer_for_write(seg, rec.lsn)?;
@@ -742,6 +929,7 @@ impl Timeline for LayeredTimeline {
                 rel
             );
         }
+        self.inmem_put_page_image(rel, blknum, lsn, img.clone());
 
         let seg = SegmentTag::from_blknum(rel, blknum);
 
@@ -813,6 +1001,7 @@ impl LayeredTimeline {
 
             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn,
+            db: RwLock::new(BTreeMap::new()),
         };
         Ok(timeline)
     }
@@ -825,6 +1014,7 @@ impl LayeredTimeline {
             "loading layer map for timeline {} into memory",
             self.timelineid
         );
+        self.inmem_load()?;
         let mut layers = self.layers.lock().unwrap();
         let (imgfilenames, mut deltafilenames) =
             filename::list_files(self.conf, self.timelineid, self.tenantid)?;
@@ -833,7 +1023,7 @@ impl LayeredTimeline {
         for filename in imgfilenames.iter() {
             let layer = ImageLayer::new(self.conf, self.timelineid, self.tenantid, filename);
 
-            info!(
+            trace!(
                 "found layer {} {} on timeline {}",
                 layer.get_seg_tag(),
                 layer.get_start_lsn(),
@@ -864,7 +1054,7 @@ impl LayeredTimeline {
                 predecessor,
             );
 
-            info!(
+            trace!(
                 "found layer {} on timeline {}, predecessor: {}",
                 layer.filename().display(),
                 self.timelineid,
@@ -1169,6 +1359,8 @@ impl LayeredTimeline {
         // Also update the in-memory copy
         self.disk_consistent_lsn.store(disk_consistent_lsn);
 
+        self.inmem_save()?;
+
         Ok(())
     }
 
diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index f1ed95ab0f..6971cbcd4a 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -218,7 +218,7 @@ impl Layer for DeltaLayer {
         if let Some(cont_lsn) = cont_lsn {
             if let Some(cont_layer) = &self.predecessor {
                 Ok(PageReconstructResult::Continue(
-                    cont_lsn,
+                    self.start_lsn, //cont_lsn,
                     Arc::clone(cont_layer),
                 ))
             } else {
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 7a7ff3c762..b893eaecc6 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -177,7 +177,7 @@ impl Layer for InMemoryLayer {
         if let Some(cont_lsn) = cont_lsn {
             if let Some(cont_layer) = &self.predecessor {
                 Ok(PageReconstructResult::Continue(
-                    cont_lsn,
+                    self.start_lsn, //cont_lsn,
                     Arc::clone(cont_layer),
                 ))
             } else {
@@ -455,9 +455,11 @@ impl InMemoryLayer {
         // This is needed just to call materialize_page()
         timeline: &LayeredTimeline,
     ) -> Result<(Vec<Arc<dyn Layer>>, Option<Arc<InMemoryLayer>>)> {
-        info!(
+        trace!(
             "freezing in memory layer for {} on timeline {} at {}",
-            self.seg, self.timelineid, cutoff_lsn
+            self.seg,
+            self.timelineid,
+            cutoff_lsn
         );
 
         let inner = self.inner.lock().unwrap();