mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-13 08:22:55 +00:00
wip: adapt layered_repository to snapfile
This commit is contained in:
@@ -41,6 +41,7 @@ use zenith_utils::seqwait::SeqWait;
|
||||
|
||||
mod inmemory_layer;
|
||||
mod layer_map;
|
||||
mod page_history;
|
||||
mod snapshot_layer;
|
||||
mod storage_layer;
|
||||
|
||||
|
||||
@@ -2,15 +2,15 @@
|
||||
//! An in-memory layer stores recently received page versions in memory. The page versions
|
||||
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
|
||||
//!
|
||||
use crate::layered_repository::page_history::PageHistory;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::SnapshotLayer;
|
||||
use crate::layered_repository::{LayeredTimeline, SnapshotLayer};
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, Result};
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use std::collections::BTreeMap;
|
||||
@@ -43,9 +43,9 @@ pub struct InMemoryLayerInner {
|
||||
|
||||
///
|
||||
/// All versions of all pages in the layer are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
/// Indexed by block number.
|
||||
///
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
|
||||
///
|
||||
/// `segsizes` tracks the size of the segment at different points in time.
|
||||
@@ -90,29 +90,32 @@ impl Layer for InMemoryLayer {
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Scan the BTreeMap backwards, starting from reconstruct_data.lsn.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
|
||||
assert!(self.seg.blknum_in_seg(blknum));
|
||||
|
||||
{
|
||||
let inner = self.inner.lock().unwrap();
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
.page_versions
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
||||
let pages = &inner.pages;
|
||||
|
||||
// FIXME: this assumes the latest page version is always the right answer.
|
||||
// How should this work if the requested lsn is in the past? in the future?
|
||||
|
||||
let latest_version = pages
|
||||
.get(&blknum)
|
||||
.and_then(PageHistory::latest)
|
||||
.ok_or_else(|| anyhow!("page not found"))?;
|
||||
|
||||
let (entry_lsn, entry) = latest_version;
|
||||
if true {
|
||||
if let Some(img) = &entry.page_image {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
} else if let Some(rec) = &entry.record {
|
||||
reconstruct_data.records.push(rec.clone());
|
||||
if rec.will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
} else {
|
||||
need_base_image_lsn = Some(*entry_lsn);
|
||||
need_base_image_lsn = Some(entry_lsn);
|
||||
}
|
||||
} else {
|
||||
// No base image, and no WAL record. Huh?
|
||||
@@ -120,7 +123,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
// release lock on 'page_versions'
|
||||
// release lock on self.pages
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
@@ -184,7 +187,7 @@ impl InMemoryLayer {
|
||||
start_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
pages: BTreeMap::new(),
|
||||
segsizes: BTreeMap::new(),
|
||||
}),
|
||||
})
|
||||
@@ -230,15 +233,11 @@ impl InMemoryLayer {
|
||||
);
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
let old = inner.page_versions.insert((blknum, lsn), pv);
|
||||
|
||||
if old.is_some() {
|
||||
// We already had an entry for this LSN. That's odd..
|
||||
warn!(
|
||||
"Page version of rel {} blk {} at {} already exists",
|
||||
self.seg.rel, blknum, lsn
|
||||
);
|
||||
}
|
||||
let page_history = inner
|
||||
.pages
|
||||
.entry(blknum)
|
||||
.or_insert_with(PageHistory::default);
|
||||
page_history.push(lsn, pv);
|
||||
|
||||
// Also update the relation size, if this extended the relation.
|
||||
if self.seg.rel.is_blocky() {
|
||||
@@ -311,7 +310,7 @@ impl InMemoryLayer {
|
||||
timelineid,
|
||||
lsn
|
||||
);
|
||||
let mut page_versions = BTreeMap::new();
|
||||
let mut pages = BTreeMap::new();
|
||||
let mut segsizes = BTreeMap::new();
|
||||
|
||||
let seg = src.get_seg_tag();
|
||||
@@ -333,7 +332,8 @@ impl InMemoryLayer {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
};
|
||||
page_versions.insert((blknum, lsn), pv);
|
||||
let page_history = PageHistory::from_image(lsn, pv);
|
||||
pages.insert(blknum, page_history);
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
@@ -344,8 +344,8 @@ impl InMemoryLayer {
|
||||
start_lsn: lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: page_versions,
|
||||
segsizes: segsizes,
|
||||
pages,
|
||||
segsizes,
|
||||
}),
|
||||
})
|
||||
}
|
||||
@@ -388,10 +388,11 @@ impl InMemoryLayer {
|
||||
};
|
||||
|
||||
// Divide all the page versions into old and new at the 'end_lsn' cutoff point.
|
||||
let mut before_page_versions;
|
||||
let mut before_pages = BTreeMap::new();
|
||||
let mut before_segsizes;
|
||||
let mut after_page_versions;
|
||||
let mut after_segsizes;
|
||||
let mut after_pages = BTreeMap::new();
|
||||
|
||||
if !dropped {
|
||||
before_segsizes = BTreeMap::new();
|
||||
after_segsizes = BTreeMap::new();
|
||||
@@ -403,20 +404,16 @@ impl InMemoryLayer {
|
||||
}
|
||||
}
|
||||
|
||||
before_page_versions = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
for ((blknum, lsn), pv) in inner.page_versions.iter() {
|
||||
if *lsn > end_lsn {
|
||||
after_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
} else {
|
||||
before_page_versions.insert((*blknum, *lsn), pv.clone());
|
||||
}
|
||||
for (blknum, page_history) in inner.pages.iter() {
|
||||
let (old, new) = page_history.clone().split_at(end_lsn);
|
||||
before_pages.insert(*blknum, old);
|
||||
after_pages.insert(*blknum, new);
|
||||
}
|
||||
} else {
|
||||
before_page_versions = inner.page_versions.clone();
|
||||
before_pages = inner.pages.clone();
|
||||
before_segsizes = inner.segsizes.clone();
|
||||
after_segsizes = BTreeMap::new();
|
||||
after_page_versions = BTreeMap::new();
|
||||
after_pages = BTreeMap::new();
|
||||
}
|
||||
|
||||
// we can release the lock now.
|
||||
@@ -431,13 +428,13 @@ impl InMemoryLayer {
|
||||
self.start_lsn,
|
||||
end_lsn,
|
||||
dropped,
|
||||
before_page_versions,
|
||||
before_pages,
|
||||
before_segsizes,
|
||||
)?;
|
||||
|
||||
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
||||
// them
|
||||
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
let new_open = if !after_segsizes.is_empty() || !after_pages.is_empty() {
|
||||
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
||||
|
||||
let new_open = Self::copy_snapshot(
|
||||
@@ -449,7 +446,7 @@ impl InMemoryLayer {
|
||||
end_lsn,
|
||||
)?;
|
||||
let mut new_inner = new_open.inner.lock().unwrap();
|
||||
new_inner.page_versions.append(&mut after_page_versions);
|
||||
new_inner.pages.append(&mut after_pages);
|
||||
new_inner.segsizes.append(&mut after_segsizes);
|
||||
drop(new_inner);
|
||||
|
||||
@@ -476,14 +473,16 @@ impl InMemoryLayer {
|
||||
for (k, v) in inner.segsizes.iter() {
|
||||
result += &format!("{}: {}\n", k, v);
|
||||
}
|
||||
for (k, v) in inner.page_versions.iter() {
|
||||
result += &format!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
k.0,
|
||||
k.1,
|
||||
v.page_image.is_some(),
|
||||
v.record.is_some()
|
||||
);
|
||||
for (page_num, page_history) in inner.pages.iter() {
|
||||
for (lsn, image) in page_history.iter() {
|
||||
result += &format!(
|
||||
"blk {} at {}: {}/{}\n",
|
||||
page_num,
|
||||
lsn,
|
||||
image.page_image.is_some(),
|
||||
image.record.is_some()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
|
||||
94
pageserver/src/layered_repository/page_history.rs
Normal file
94
pageserver/src/layered_repository/page_history.rs
Normal file
@@ -0,0 +1,94 @@
|
||||
use super::storage_layer::PageVersion;
|
||||
use std::collections::VecDeque;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
/// A data structure that holds one or more versions of a particular page number.
|
||||
//
|
||||
#[derive(Default, Clone)]
|
||||
pub struct PageHistory {
|
||||
/// Pages stored in order, from oldest to newest.
|
||||
pages: VecDeque<(Lsn, PageVersion)>,
|
||||
}
|
||||
|
||||
impl PageHistory {
|
||||
/// Create a new PageHistory containing a single image.
|
||||
pub fn from_image(lsn: Lsn, image: PageVersion) -> Self {
|
||||
let mut pages = VecDeque::new();
|
||||
pages.push_back((lsn, image));
|
||||
PageHistory { pages }
|
||||
}
|
||||
|
||||
/// Push a newer page image.
|
||||
pub fn push(&mut self, lsn: Lsn, page: PageVersion) {
|
||||
if let Some((back_lsn, _)) = self.pages.back() {
|
||||
debug_assert_ne!(
|
||||
back_lsn, &lsn,
|
||||
"push page at lsn {:?} but one already exists",
|
||||
lsn
|
||||
);
|
||||
debug_assert!(back_lsn < &lsn, "pushed page is older than latest lsn");
|
||||
}
|
||||
self.pages.push_back((lsn, page));
|
||||
}
|
||||
|
||||
pub fn latest(&self) -> Option<(Lsn, &PageVersion)> {
|
||||
self.pages.back().map(|(lsn, page)| (*lsn, page))
|
||||
}
|
||||
|
||||
/// Split a page history at a particular LSN.
|
||||
///
|
||||
/// This consumes this PageHistory and returns two new ones.
|
||||
/// Any changes exactly matching the split LSN will be in the
|
||||
/// "old" history.
|
||||
//
|
||||
// FIXME: Is this necessary? There is some debate whether "splitting"
|
||||
// layers is the best design.
|
||||
//
|
||||
pub fn split_at(self, split_lsn: Lsn) -> (PageHistory, PageHistory) {
|
||||
let mut old = PageHistory::default();
|
||||
let mut new = PageHistory::default();
|
||||
for (lsn, page) in self.pages {
|
||||
if lsn > split_lsn {
|
||||
new.push(lsn, page)
|
||||
} else {
|
||||
old.push(lsn, page);
|
||||
}
|
||||
}
|
||||
(old, new)
|
||||
}
|
||||
|
||||
pub fn iter(&self) -> impl Iterator<Item = &(Lsn, PageVersion)> {
|
||||
self.pages.iter()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn page_history() {
|
||||
fn make_page(b: u8) -> PageVersion {
|
||||
let image = vec![b; 8192].into();
|
||||
PageVersion {
|
||||
page_image: Some(image),
|
||||
record: None,
|
||||
}
|
||||
}
|
||||
|
||||
let mut ph = PageHistory::default();
|
||||
ph.push(10.into(), make_page(1));
|
||||
ph.push(20.into(), make_page(2));
|
||||
ph.push(30.into(), make_page(3));
|
||||
|
||||
let (latest_lsn, latest_image) = ph.latest().unwrap();
|
||||
assert_eq!(latest_lsn, 30.into());
|
||||
assert!(matches!(latest_image, PageVersion { page_image: Some(im), .. } if im[0] == 3));
|
||||
|
||||
let mut it = ph.iter();
|
||||
assert_eq!(it.next().unwrap().0, 10.into());
|
||||
assert_eq!(it.next().unwrap().0, 20.into());
|
||||
assert_eq!(it.next().unwrap().0, 30.into());
|
||||
assert!(it.next().is_none());
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,7 @@
|
||||
//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two
|
||||
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
||||
//!
|
||||
use crate::layered_repository::page_history::PageHistory;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
};
|
||||
@@ -236,7 +237,7 @@ pub struct SnapshotLayerInner {
|
||||
|
||||
/// All versions of all pages in the file are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
|
||||
/// `relsizes` tracks the size of the relation at different points in time.
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
@@ -270,6 +271,7 @@ impl Layer for SnapshotLayer {
|
||||
lsn: Lsn,
|
||||
reconstruct_data: &mut PageReconstructData,
|
||||
) -> Result<Option<Lsn>> {
|
||||
/*
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
{
|
||||
@@ -303,6 +305,9 @@ impl Layer for SnapshotLayer {
|
||||
}
|
||||
|
||||
Ok(need_base_image_lsn)
|
||||
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Get size of the relation at given LSN
|
||||
@@ -380,7 +385,7 @@ impl SnapshotLayer {
|
||||
start_lsn: Lsn,
|
||||
end_lsn: Lsn,
|
||||
dropped: bool,
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
pages: BTreeMap<u32, PageHistory>,
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
) -> Result<SnapshotLayer> {
|
||||
let snapfile = SnapshotLayer {
|
||||
@@ -393,10 +398,12 @@ impl SnapshotLayer {
|
||||
dropped,
|
||||
inner: Mutex::new(SnapshotLayerInner {
|
||||
loaded: true,
|
||||
page_versions: page_versions,
|
||||
relsizes: relsizes,
|
||||
pages,
|
||||
relsizes,
|
||||
}),
|
||||
};
|
||||
|
||||
/*
|
||||
let inner = snapfile.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
@@ -426,12 +433,16 @@ impl SnapshotLayer {
|
||||
drop(inner);
|
||||
|
||||
Ok(snapfile)
|
||||
*/
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
///
|
||||
/// Load the contents of the file into memory
|
||||
///
|
||||
fn load(&self) -> Result<MutexGuard<SnapshotLayerInner>> {
|
||||
/*
|
||||
// quick exit if already loaded
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
@@ -469,6 +480,9 @@ impl SnapshotLayer {
|
||||
};
|
||||
|
||||
Ok(inner)
|
||||
*/
|
||||
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create SnapshotLayers representing all files on disk
|
||||
@@ -479,6 +493,7 @@ impl SnapshotLayer {
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<Vec<Arc<SnapshotLayer>>> {
|
||||
/*
|
||||
let path = conf.timeline_path(&timelineid, &tenantid);
|
||||
|
||||
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||
@@ -506,6 +521,8 @@ impl SnapshotLayer {
|
||||
}
|
||||
}
|
||||
return Ok(snapfiles);
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
pub fn delete(&self) -> Result<()> {
|
||||
@@ -519,11 +536,14 @@ impl SnapshotLayer {
|
||||
/// it will need to be loaded back.
|
||||
///
|
||||
pub fn unload(&self) -> Result<()> {
|
||||
/*
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_versions = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
|
||||
Reference in New Issue
Block a user