mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-05 20:42:54 +00:00
delta_layer - read page versions from disk
split the page versions into two chapters: PAGE_VERSION_METAS - a rust BTreeMap from (block #, lsn) -> page & WAL byte ranges in PAGE_VERSIONS_CHAPTER PAGE_VERSIONS_CHAPTER - raw page images and serialized WAL records
This commit is contained in:
committed by
Patrick Insinger
parent
78963ad104
commit
98f49671c1
@@ -39,6 +39,7 @@ use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::{AtomicLsn, Lsn};
|
||||
use zenith_utils::seqwait::SeqWait;
|
||||
|
||||
mod blob;
|
||||
mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
|
||||
45
pageserver/src/layered_repository/blob.rs
Normal file
45
pageserver/src/layered_repository/blob.rs
Normal file
@@ -0,0 +1,45 @@
|
||||
use std::{fs::File, io::Write};
|
||||
|
||||
use anyhow::Result;
|
||||
use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct BlobRange {
|
||||
offset: u64,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result<Vec<u8>> {
|
||||
let mut buf = vec![0u8; range.size];
|
||||
reader.read_exact_at(&mut buf, range.offset)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
pub struct BlobWriter {
|
||||
writer: ChapterWriter<File>,
|
||||
offset: u64,
|
||||
}
|
||||
|
||||
impl BlobWriter {
|
||||
// This function takes a BookWriter and creates a new chapter to ensure offset is 0.
|
||||
pub fn new(book_writer: BookWriter<File>, chapter_id: impl Into<ChapterId>) -> Self {
|
||||
let writer = book_writer.new_chapter(chapter_id);
|
||||
Self { writer, offset: 0 }
|
||||
}
|
||||
|
||||
pub fn write_blob(&mut self, blob: &[u8]) -> Result<BlobRange> {
|
||||
self.writer.write_all(blob)?;
|
||||
|
||||
let range = BlobRange {
|
||||
offset: self.offset,
|
||||
size: blob.len(),
|
||||
};
|
||||
self.offset += blob.len() as u64;
|
||||
Ok(range)
|
||||
}
|
||||
|
||||
pub fn close(self) -> bookfile::Result<BookWriter<File>> {
|
||||
self.writer.close()
|
||||
}
|
||||
}
|
||||
@@ -11,8 +11,11 @@
|
||||
//! can happen when you create a new branch in the middle of a delta layer, and the WAL
|
||||
//! records on the new branch are put in a new delta layer.
|
||||
//!
|
||||
//! When a delta file needs to be accessed, we slurp the whole file into memory, into
|
||||
//! the DeltaLayer struct. See load() and unload() functions.
|
||||
//! When a delta file needs to be accessed, we slurp the metadata and relsize chapters
|
||||
//! into memory, into the DeltaLayerInner struct. See load() and unload() functions.
|
||||
//! To access a page/WAL record, we search `page_version_metas` for the block # and LSN.
|
||||
//! The byte ranges in the metadata can be used to find the page/WAL record in
|
||||
//! PAGE_VERSIONS_CHAPTER.
|
||||
//!
|
||||
//! On disk, the delta files are stored in timelines/<timelineid> directory.
|
||||
//! Currently, there are no subdirectories, and each delta file is named like this:
|
||||
@@ -34,14 +37,18 @@
|
||||
//! A detlta file is constructed using the 'bookfile' crate. Each file consists of two
|
||||
//! parts: the page versions and the relation sizes. They are stored as separate chapters.
|
||||
//!
|
||||
use crate::layered_repository::blob::BlobWriter;
|
||||
use crate::layered_repository::filename::DeltaFileName;
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
};
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
@@ -55,11 +62,24 @@ use bookfile::{Book, BookWriter};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
use super::blob::{read_blob, BlobRange};
|
||||
|
||||
// Magic constant to identify a Zenith delta file
|
||||
static DELTA_FILE_MAGIC: u32 = 0x5A616E01;
|
||||
|
||||
static PAGE_VERSIONS_CHAPTER: u64 = 1;
|
||||
static REL_SIZES_CHAPTER: u64 = 2;
|
||||
/// Mapping from (block #, lsn) -> page/WAL record
|
||||
/// byte ranges in PAGE_VERSIONS_CHAPTER
|
||||
static PAGE_VERSION_METAS_CHAPTER: u64 = 1;
|
||||
/// Page/WAL bytes - cannot be interpreted
|
||||
/// without PAGE_VERSION_METAS_CHAPTER
|
||||
static PAGE_VERSIONS_CHAPTER: u64 = 2;
|
||||
static REL_SIZES_CHAPTER: u64 = 3;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct PageVersionMeta {
|
||||
page_image_range: Option<BlobRange>,
|
||||
record_range: Option<BlobRange>,
|
||||
}
|
||||
|
||||
///
|
||||
/// DeltaLayer is the in-memory data structure associated with an
|
||||
@@ -91,13 +111,13 @@ pub struct DeltaLayer {
|
||||
}
|
||||
|
||||
pub struct DeltaLayerInner {
|
||||
/// If false, the 'page_versions' and 'relsizes' have not been
|
||||
/// If false, the 'page_version_metas' and 'relsizes' have not been
|
||||
/// loaded into memory yet.
|
||||
loaded: bool,
|
||||
|
||||
/// All versions of all pages in the file are are kept here.
|
||||
/// Indexed by block number and LSN.
|
||||
page_versions: BTreeMap<(u32, Lsn), PageVersion>,
|
||||
page_version_metas: BTreeMap<(u32, Lsn), PageVersionMeta>,
|
||||
|
||||
/// `relsizes` tracks the size of the relation at different points in time.
|
||||
relsizes: BTreeMap<Lsn, u32>,
|
||||
@@ -145,21 +165,28 @@ impl Layer for DeltaLayer {
|
||||
) -> Result<Option<Lsn>> {
|
||||
// Scan the BTreeMap backwards, starting from the given entry.
|
||||
let mut need_base_image_lsn: Option<Lsn> = Some(lsn);
|
||||
|
||||
// TODO: avoid opening the snapshot file for each read
|
||||
let (_path, book) = self.open_book()?;
|
||||
let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?;
|
||||
{
|
||||
let inner = self.load()?;
|
||||
let minkey = (blknum, Lsn(0));
|
||||
let maxkey = (blknum, lsn);
|
||||
let mut iter = inner
|
||||
.page_versions
|
||||
.page_version_metas
|
||||
.range((Included(&minkey), Included(&maxkey)));
|
||||
while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() {
|
||||
if let Some(img) = &entry.page_image {
|
||||
reconstruct_data.page_img = Some(img.clone());
|
||||
if let Some(img_range) = &entry.page_image_range {
|
||||
let img = Bytes::from(read_blob(&page_version_reader, img_range)?);
|
||||
reconstruct_data.page_img = Some(img);
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
} else if let Some(rec) = &entry.record {
|
||||
reconstruct_data.records.push(rec.clone());
|
||||
if rec.will_init {
|
||||
} else if let Some(rec_range) = &entry.record_range {
|
||||
let rec = WALRecord::des(&read_blob(&page_version_reader, rec_range)?)?;
|
||||
let will_init = rec.will_init;
|
||||
reconstruct_data.records.push(rec);
|
||||
if will_init {
|
||||
// This WAL record initializes the page, so no need to go further back
|
||||
need_base_image_lsn = None;
|
||||
break;
|
||||
@@ -233,7 +260,7 @@ impl Layer for DeltaLayer {
|
||||
///
|
||||
fn unload(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_versions = BTreeMap::new();
|
||||
inner.page_version_metas = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
@@ -303,12 +330,12 @@ impl DeltaLayer {
|
||||
dropped,
|
||||
inner: Mutex::new(DeltaLayerInner {
|
||||
loaded: true,
|
||||
page_versions: page_versions,
|
||||
page_version_metas: BTreeMap::new(),
|
||||
relsizes: relsizes,
|
||||
}),
|
||||
predecessor,
|
||||
};
|
||||
let inner = delta_layer.inner.lock().unwrap();
|
||||
let mut inner = delta_layer.inner.lock().unwrap();
|
||||
|
||||
// Write the in-memory btreemaps into a file
|
||||
let path = delta_layer.path();
|
||||
@@ -318,9 +345,38 @@ impl DeltaLayer {
|
||||
let file = File::create(&path)?;
|
||||
let book = BookWriter::new(file, DELTA_FILE_MAGIC)?;
|
||||
|
||||
// Write out the other page versions
|
||||
let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER);
|
||||
let buf = BTreeMap::ser(&inner.page_versions)?;
|
||||
let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER);
|
||||
|
||||
for (key, page_version) in page_versions {
|
||||
let page_image_range = page_version
|
||||
.page_image
|
||||
.map(|page_image| page_version_writer.write_blob(page_image.as_ref()))
|
||||
.transpose()?;
|
||||
|
||||
let record_range = page_version
|
||||
.record
|
||||
.map(|record| {
|
||||
let buf = WALRecord::ser(&record)?;
|
||||
page_version_writer.write_blob(&buf)
|
||||
})
|
||||
.transpose()?;
|
||||
|
||||
let old = inner.page_version_metas.insert(
|
||||
key,
|
||||
PageVersionMeta {
|
||||
page_image_range,
|
||||
record_range,
|
||||
},
|
||||
);
|
||||
|
||||
assert!(old.is_none());
|
||||
}
|
||||
|
||||
let book = page_version_writer.close()?;
|
||||
|
||||
// Write out page versions
|
||||
let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER);
|
||||
let buf = BTreeMap::ser(&inner.page_version_metas)?;
|
||||
chapter.write_all(&buf)?;
|
||||
let book = chapter.close()?;
|
||||
|
||||
@@ -339,17 +395,7 @@ impl DeltaLayer {
|
||||
Ok(delta_layer)
|
||||
}
|
||||
|
||||
///
|
||||
/// Load the contents of the file into memory
|
||||
///
|
||||
fn load(&self) -> Result<MutexGuard<DeltaLayerInner>> {
|
||||
// quick exit if already loaded
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
if inner.loaded {
|
||||
return Ok(inner);
|
||||
}
|
||||
|
||||
fn open_book(&self) -> Result<(PathBuf, Book<File>)> {
|
||||
let path = Self::path_for(
|
||||
self.conf,
|
||||
self.timelineid,
|
||||
@@ -365,8 +411,24 @@ impl DeltaLayer {
|
||||
let file = File::open(&path)?;
|
||||
let book = Book::new(file)?;
|
||||
|
||||
let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?;
|
||||
let page_versions = BTreeMap::des(&chapter)?;
|
||||
Ok((path, book))
|
||||
}
|
||||
|
||||
///
|
||||
/// Load the contents of the file into memory
|
||||
///
|
||||
fn load(&self) -> Result<MutexGuard<DeltaLayerInner>> {
|
||||
// quick exit if already loaded
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
if inner.loaded {
|
||||
return Ok(inner);
|
||||
}
|
||||
|
||||
let (path, book) = self.open_book()?;
|
||||
|
||||
let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?;
|
||||
let page_version_metas = BTreeMap::des(&chapter)?;
|
||||
|
||||
let chapter = book.read_chapter(REL_SIZES_CHAPTER)?;
|
||||
let relsizes = BTreeMap::des(&chapter)?;
|
||||
@@ -375,7 +437,7 @@ impl DeltaLayer {
|
||||
|
||||
*inner = DeltaLayerInner {
|
||||
loaded: true,
|
||||
page_versions,
|
||||
page_version_metas,
|
||||
relsizes,
|
||||
};
|
||||
|
||||
@@ -400,7 +462,7 @@ impl DeltaLayer {
|
||||
dropped: filename.dropped,
|
||||
inner: Mutex::new(DeltaLayerInner {
|
||||
loaded: false,
|
||||
page_versions: BTreeMap::new(),
|
||||
page_version_metas: BTreeMap::new(),
|
||||
relsizes: BTreeMap::new(),
|
||||
}),
|
||||
predecessor,
|
||||
|
||||
Reference in New Issue
Block a user