diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 7999ab2f83..95de2a525c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -39,6 +39,7 @@ use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::{AtomicLsn, Lsn}; use zenith_utils::seqwait::SeqWait; +mod blob; mod delta_layer; mod filename; mod image_layer; diff --git a/pageserver/src/layered_repository/blob.rs b/pageserver/src/layered_repository/blob.rs new file mode 100644 index 0000000000..252920e33e --- /dev/null +++ b/pageserver/src/layered_repository/blob.rs @@ -0,0 +1,45 @@ +use std::{fs::File, io::Write}; + +use anyhow::Result; +use bookfile::{BookWriter, BoundedReader, ChapterId, ChapterWriter}; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct BlobRange { + offset: u64, + size: usize, +} + +pub fn read_blob(reader: &BoundedReader<&'_ File>, range: &BlobRange) -> Result> { + let mut buf = vec![0u8; range.size]; + reader.read_exact_at(&mut buf, range.offset)?; + Ok(buf) +} + +pub struct BlobWriter { + writer: ChapterWriter, + offset: u64, +} + +impl BlobWriter { + // This function takes a BookWriter and creates a new chapter to ensure offset is 0. + pub fn new(book_writer: BookWriter, chapter_id: impl Into) -> Self { + let writer = book_writer.new_chapter(chapter_id); + Self { writer, offset: 0 } + } + + pub fn write_blob(&mut self, blob: &[u8]) -> Result { + self.writer.write_all(blob)?; + + let range = BlobRange { + offset: self.offset, + size: blob.len(), + }; + self.offset += blob.len() as u64; + Ok(range) + } + + pub fn close(self) -> bookfile::Result> { + self.writer.close() + } +} diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 8a135492cc..94eca551bf 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -11,8 +11,11 @@ //! 
can happen when you create a new branch in the middle of a delta layer, and the WAL //! records on the new branch are put in a new delta layer. //! -//! When a delta file needs to be accessed, we slurp the whole file into memory, into -//! the DeltaLayer struct. See load() and unload() functions. +//! When a delta file needs to be accessed, we slurp the metadata and relsize chapters +//! into memory, into the DeltaLayerInner struct. See load() and unload() functions. +//! To access a page/WAL record, we search `page_version_metas` for the block # and LSN. +//! The byte ranges in the metadata can be used to find the page/WAL record in +//! PAGE_VERSIONS_CHAPTER. //! //! On disk, the delta files are stored in timelines/ directory. //! Currently, there are no subdirectories, and each delta file is named like this: @@ -34,14 +37,18 @@ //! A detlta file is constructed using the 'bookfile' crate. Each file consists of two //! parts: the page versions and the relation sizes. They are stored as separate chapters. //! 
+use crate::layered_repository::blob::BlobWriter; use crate::layered_repository::filename::DeltaFileName; use crate::layered_repository::storage_layer::{ Layer, PageReconstructData, PageVersion, SegmentTag, }; +use crate::repository::WALRecord; use crate::PageServerConf; use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, Result}; +use bytes::Bytes; use log::*; +use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::fs; use std::fs::File; @@ -55,11 +62,24 @@ use bookfile::{Book, BookWriter}; use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; +use super::blob::{read_blob, BlobRange}; + // Magic constant to identify a Zenith delta file static DELTA_FILE_MAGIC: u32 = 0x5A616E01; -static PAGE_VERSIONS_CHAPTER: u64 = 1; -static REL_SIZES_CHAPTER: u64 = 2; +/// Mapping from (block #, lsn) -> page/WAL record +/// byte ranges in PAGE_VERSIONS_CHAPTER +static PAGE_VERSION_METAS_CHAPTER: u64 = 1; +/// Page/WAL bytes - cannot be interpreted +/// without PAGE_VERSION_METAS_CHAPTER +static PAGE_VERSIONS_CHAPTER: u64 = 2; +static REL_SIZES_CHAPTER: u64 = 3; + +#[derive(Serialize, Deserialize)] +struct PageVersionMeta { + page_image_range: Option, + record_range: Option, +} /// /// DeltaLayer is the in-memory data structure associated with an @@ -91,13 +111,13 @@ pub struct DeltaLayer { } pub struct DeltaLayerInner { - /// If false, the 'page_versions' and 'relsizes' have not been + /// If false, the 'page_version_metas' and 'relsizes' have not been /// loaded into memory yet. loaded: bool, /// All versions of all pages in the file are are kept here. /// Indexed by block number and LSN. - page_versions: BTreeMap<(u32, Lsn), PageVersion>, + page_version_metas: BTreeMap<(u32, Lsn), PageVersionMeta>, /// `relsizes` tracks the size of the relation at different points in time. relsizes: BTreeMap, @@ -145,21 +165,28 @@ impl Layer for DeltaLayer { ) -> Result> { // Scan the BTreeMap backwards, starting from the given entry. 
let mut need_base_image_lsn: Option = Some(lsn); + + // TODO: avoid opening the snapshot file for each read + let (_path, book) = self.open_book()?; + let page_version_reader = book.chapter_reader(PAGE_VERSIONS_CHAPTER)?; { let inner = self.load()?; let minkey = (blknum, Lsn(0)); let maxkey = (blknum, lsn); let mut iter = inner - .page_versions + .page_version_metas .range((Included(&minkey), Included(&maxkey))); while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { - if let Some(img) = &entry.page_image { - reconstruct_data.page_img = Some(img.clone()); + if let Some(img_range) = &entry.page_image_range { + let img = Bytes::from(read_blob(&page_version_reader, img_range)?); + reconstruct_data.page_img = Some(img); need_base_image_lsn = None; break; - } else if let Some(rec) = &entry.record { - reconstruct_data.records.push(rec.clone()); - if rec.will_init { + } else if let Some(rec_range) = &entry.record_range { + let rec = WALRecord::des(&read_blob(&page_version_reader, rec_range)?)?; + let will_init = rec.will_init; + reconstruct_data.records.push(rec); + if will_init { // This WAL record initializes the page, so no need to go further back need_base_image_lsn = None; break; @@ -233,7 +260,7 @@ impl Layer for DeltaLayer { /// fn unload(&self) -> Result<()> { let mut inner = self.inner.lock().unwrap(); - inner.page_versions = BTreeMap::new(); + inner.page_version_metas = BTreeMap::new(); inner.relsizes = BTreeMap::new(); inner.loaded = false; Ok(()) @@ -303,12 +330,12 @@ impl DeltaLayer { dropped, inner: Mutex::new(DeltaLayerInner { loaded: true, - page_versions: page_versions, + page_version_metas: BTreeMap::new(), relsizes: relsizes, }), predecessor, }; - let inner = delta_layer.inner.lock().unwrap(); + let mut inner = delta_layer.inner.lock().unwrap(); // Write the in-memory btreemaps into a file let path = delta_layer.path(); @@ -318,9 +345,38 @@ impl DeltaLayer { let file = File::create(&path)?; let book = BookWriter::new(file, 
DELTA_FILE_MAGIC)?; - // Write out the other page versions - let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER); - let buf = BTreeMap::ser(&inner.page_versions)?; + let mut page_version_writer = BlobWriter::new(book, PAGE_VERSIONS_CHAPTER); + + for (key, page_version) in page_versions { + let page_image_range = page_version + .page_image + .map(|page_image| page_version_writer.write_blob(page_image.as_ref())) + .transpose()?; + + let record_range = page_version + .record + .map(|record| { + let buf = WALRecord::ser(&record)?; + page_version_writer.write_blob(&buf) + }) + .transpose()?; + + let old = inner.page_version_metas.insert( + key, + PageVersionMeta { + page_image_range, + record_range, + }, + ); + + assert!(old.is_none()); + } + + let book = page_version_writer.close()?; + + // Write out the page version metadata (byte ranges into PAGE_VERSIONS_CHAPTER) + let mut chapter = book.new_chapter(PAGE_VERSION_METAS_CHAPTER); + let buf = BTreeMap::ser(&inner.page_version_metas)?; chapter.write_all(&buf)?; let book = chapter.close()?; @@ -339,17 +395,7 @@ impl DeltaLayer { Ok(delta_layer) } - /// - /// Load the contents of the file into memory - /// - fn load(&self) -> Result> { - // quick exit if already loaded - let mut inner = self.inner.lock().unwrap(); - - if inner.loaded { - return Ok(inner); - } - + fn open_book(&self) -> Result<(PathBuf, Book)> { let path = Self::path_for( self.conf, self.timelineid, @@ -365,8 +411,24 @@ impl DeltaLayer { let file = File::open(&path)?; let book = Book::new(file)?; - let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?; - let page_versions = BTreeMap::des(&chapter)?; + Ok((path, book)) + } + + /// + /// Load the page version metadata and relation sizes of the file into memory + /// + fn load(&self) -> Result> { + // quick exit if already loaded + let mut inner = self.inner.lock().unwrap(); + + if inner.loaded { + return Ok(inner); + } + + let (path, book) = self.open_book()?; + + let chapter = book.read_chapter(PAGE_VERSION_METAS_CHAPTER)?; + let page_version_metas = BTreeMap::des(&chapter)?; let 
chapter = book.read_chapter(REL_SIZES_CHAPTER)?; let relsizes = BTreeMap::des(&chapter)?; @@ -375,7 +437,7 @@ impl DeltaLayer { *inner = DeltaLayerInner { loaded: true, - page_versions, + page_version_metas, relsizes, }; @@ -400,7 +462,7 @@ impl DeltaLayer { dropped: filename.dropped, inner: Mutex::new(DeltaLayerInner { loaded: false, - page_versions: BTreeMap::new(), + page_version_metas: BTreeMap::new(), relsizes: BTreeMap::new(), }), predecessor,