diff --git a/Cargo.lock b/Cargo.lock index bb27df7012..e0b6288f63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,30 +141,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" -[[package]] -name = "aversion" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41992ab8cfcc3026ef9abceffe0c2b0479c043183fc23825e30d22baab6df334" -dependencies = [ - "aversion-macros", - "byteorder", - "serde", - "serde_cbor", - "thiserror", -] - -[[package]] -name = "aversion-macros" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ba5785f953985aa0caca927ba4005880f3b4f53de87f134e810ae3549f744d2" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "aws-creds" version = "0.27.1" @@ -264,17 +240,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "bookfile" -version = "0.3.0" -source = "git+https://github.com/zenithdb/bookfile.git?rev=bf6e43825dfb6e749ae9b80e8372c8fea76cec2f#bf6e43825dfb6e749ae9b80e8372c8fea76cec2f" -dependencies = [ - "aversion", - "byteorder", - "serde", - "thiserror", -] - [[package]] name = "boxfnonce" version = "0.1.1" @@ -1524,7 +1489,6 @@ dependencies = [ "anyhow", "async-compression", "async-trait", - "bookfile", "byteorder", "bytes", "chrono", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 6a77af1691..a5283cb331 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] -bookfile = { git = "https://github.com/zenithdb/bookfile.git", rev="bf6e43825dfb6e749ae9b80e8372c8fea76cec2f" } chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" diff --git a/pageserver/src/bin/dump_layerfile.rs b/pageserver/src/bin/dump_layerfile.rs index 27d41d50d9..7cf39566ac 100644 --- a/pageserver/src/bin/dump_layerfile.rs +++ b/pageserver/src/bin/dump_layerfile.rs 
@@ -4,6 +4,7 @@ use anyhow::Result; use clap::{App, Arg}; use pageserver::layered_repository::dump_layerfile_from_path; +use pageserver::page_cache; use pageserver::virtual_file; use std::path::PathBuf; use zenith_utils::GIT_VERSION; @@ -24,6 +25,7 @@ fn main() -> Result<()> { // Basic initialization of things that don't change after startup virtual_file::init(10); + page_cache::init(100); dump_layerfile_from_path(&path, true)?; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 2d9b680624..5adf4a89ff 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -12,7 +12,6 @@ //! use anyhow::{anyhow, bail, ensure, Context, Result}; -use bookfile::Book; use bytes::Bytes; use fail::fail_point; use itertools::Itertools; @@ -56,6 +55,8 @@ use zenith_utils::crashsafe_dir; use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn}; use zenith_utils::seqwait::SeqWait; +mod blob_io; +pub mod block_io; mod delta_layer; pub(crate) mod ephemeral_file; mod filename; @@ -2054,16 +2055,17 @@ impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> { /// Dump contents of a layer file to stdout. pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> { - let file = File::open(path)?; - let book = Book::new(file)?; + use std::os::unix::fs::FileExt; - match book.magic() { - crate::DELTA_FILE_MAGIC => { - DeltaLayer::new_for_path(path, &book)?.dump(verbose)?; - } - crate::IMAGE_FILE_MAGIC => { - ImageLayer::new_for_path(path, &book)?.dump(verbose)?; - } + // All layer files start with a two-byte "magic" value, to identify the kind of + // file. 
+ let file = File::open(path)?; + let mut header_buf = [0u8; 2]; + file.read_exact_at(&mut header_buf, 0)?; + + match u16::from_be_bytes(header_buf) { + crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose)?, + crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose)?, magic => bail!("unrecognized magic identifier: {:?}", magic), } @@ -2274,7 +2276,6 @@ pub mod tests { lsn, Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )?; - println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; diff --git a/pageserver/src/layered_repository/blob_io.rs b/pageserver/src/layered_repository/blob_io.rs new file mode 100644 index 0000000000..10bfea934d --- /dev/null +++ b/pageserver/src/layered_repository/blob_io.rs @@ -0,0 +1,122 @@ +//! +//! Functions for reading and writing variable-sized "blobs". +//! +//! Each blob begins with a 4-byte length, followed by the actual data. +//! +use crate::layered_repository::block_io::{BlockCursor, BlockReader}; +use crate::page_cache::PAGE_SZ; +use std::cmp::min; +use std::io::Error; + +/// For reading +pub trait BlobCursor { + fn read_blob(&mut self, offset: u64) -> Result, std::io::Error> { + let mut buf = Vec::new(); + self.read_blob_into_buf(offset, &mut buf)?; + Ok(buf) + } + + fn read_blob_into_buf( + &mut self, + offset: u64, + dstbuf: &mut Vec, + ) -> Result<(), std::io::Error>; +} + +impl<'a, R> BlobCursor for BlockCursor +where + R: BlockReader, +{ + fn read_blob_into_buf( + &mut self, + offset: u64, + dstbuf: &mut Vec, + ) -> Result<(), std::io::Error> { + let mut blknum = (offset / PAGE_SZ as u64) as u32; + let mut off = (offset % PAGE_SZ as u64) as usize; + + let mut buf = self.read_blk(blknum)?; + + // read length + let mut len_buf = [0u8; 4]; + let thislen = PAGE_SZ - off; + if thislen < 4 { + // it is split across two pages + len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]); + blknum += 1; + buf = 
self.read_blk(blknum)?; + len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]); + off = 4 - thislen; + } else { + len_buf.copy_from_slice(&buf[off..off + 4]); + off += 4; + } + let len = u32::from_ne_bytes(len_buf) as usize; + + dstbuf.clear(); + + // Read the payload + let mut remain = len; + while remain > 0 { + let mut page_remain = PAGE_SZ - off; + if page_remain == 0 { + // continue on next page + blknum += 1; + buf = self.read_blk(blknum)?; + off = 0; + page_remain = PAGE_SZ; + } + let this_blk_len = min(remain, page_remain); + dstbuf.extend_from_slice(&buf[off..off + this_blk_len]); + remain -= this_blk_len; + off += this_blk_len; + } + Ok(()) + } +} + +pub trait BlobWriter { + fn write_blob(&mut self, srcbuf: &[u8]) -> Result; +} + +pub struct WriteBlobWriter +where + W: std::io::Write, +{ + inner: W, + offset: u64, +} + +impl WriteBlobWriter +where + W: std::io::Write, +{ + pub fn new(inner: W, start_offset: u64) -> Self { + WriteBlobWriter { + inner, + offset: start_offset, + } + } + + pub fn size(&self) -> u64 { + self.offset + } + + pub fn into_inner(self) -> W { + self.inner + } +} + +impl BlobWriter for WriteBlobWriter +where + W: std::io::Write, +{ + fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + let offset = self.offset; + self.inner + .write_all(&((srcbuf.len()) as u32).to_ne_bytes())?; + self.inner.write_all(srcbuf)?; + self.offset += 4 + srcbuf.len() as u64; + Ok(offset) + } +} diff --git a/pageserver/src/layered_repository/block_io.rs b/pageserver/src/layered_repository/block_io.rs new file mode 100644 index 0000000000..2b8e31e1ee --- /dev/null +++ b/pageserver/src/layered_repository/block_io.rs @@ -0,0 +1,176 @@ +//! +//! Low-level Block-oriented I/O functions +//! +//! +//! 
+ +use crate::page_cache; +use crate::page_cache::{ReadBufResult, PAGE_SZ}; +use lazy_static::lazy_static; +use std::ops::{Deref, DerefMut}; +use std::os::unix::fs::FileExt; +use std::sync::atomic::AtomicU64; + +/// This is implemented by anything that can read 8 kB (PAGE_SZ) +/// blocks, using the page cache +/// +/// There are currently two implementations: EphemeralFile, and FileBlockReader +/// below. +pub trait BlockReader { + type BlockLease: Deref + 'static; + + /// + /// Read a block. Returns a "lease" object that can be used to + /// access the contents of the page. (For the page cache, the + /// lease object represents a lock on the buffer.) + /// + fn read_blk(&self, blknum: u32) -> Result; + + /// + /// Create a new "cursor" for reading from this reader. + /// + /// A cursor caches the last accessed page, allowing for faster + /// access if the same block is accessed repeatedly. + fn block_cursor(&self) -> BlockCursor<&Self> + where + Self: Sized, + { + BlockCursor::new(self) + } +} + +impl BlockReader for &B +where + B: BlockReader, +{ + type BlockLease = B::BlockLease; + + fn read_blk(&self, blknum: u32) -> Result { + (*self).read_blk(blknum) + } +} + +/// +/// A "cursor" for efficiently reading multiple pages from a BlockReader +/// +/// A cursor caches the last accessed page, allowing for faster access if the +/// same block is accessed repeatedly. +/// +/// You can access the last page with `*cursor`. 'read_blk' returns 'self', so +/// that in many cases you can use a BlockCursor as a drop-in replacement for +/// the underlying BlockReader. 
For example: +/// +/// ```no_run +/// # use pageserver::layered_repository::block_io::{BlockReader, FileBlockReader}; +/// # let reader: FileBlockReader = todo!(); +/// let mut cursor = reader.block_cursor(); +/// let buf = cursor.read_blk(1); +/// // do stuff with 'buf' +/// let buf = cursor.read_blk(2); +/// // do stuff with 'buf' +/// ``` +/// +pub struct BlockCursor +where + R: BlockReader, +{ + reader: R, + /// last accessed page + cache: Option<(u32, R::BlockLease)>, +} + +impl BlockCursor +where + R: BlockReader, +{ + pub fn new(reader: R) -> Self { + BlockCursor { + reader, + cache: None, + } + } + + pub fn read_blk(&mut self, blknum: u32) -> Result<&Self, std::io::Error> { + // Fast return if this is the same block as before + if let Some((cached_blk, _buf)) = &self.cache { + if *cached_blk == blknum { + return Ok(self); + } + } + + // Read the block from the underlying reader, and cache it + self.cache = None; + let buf = self.reader.read_blk(blknum)?; + self.cache = Some((blknum, buf)); + + Ok(self) + } +} + +impl Deref for BlockCursor +where + R: BlockReader, +{ + type Target = [u8; PAGE_SZ]; + + fn deref(&self) -> &::Target { + &self.cache.as_ref().unwrap().1 + } +} + +lazy_static! { + static ref NEXT_ID: AtomicU64 = AtomicU64::new(1); +} + +/// An adapter for reading a (virtual) file using the page cache. +/// +/// The file is assumed to be immutable. This doesn't provide any functions +/// for modifying the file, nor for invalidating the cache if it is modified. +pub struct FileBlockReader { + pub file: F, + + /// Unique ID of this file, used as key in the page cache. + file_id: u64, +} + +impl FileBlockReader +where + F: FileExt, +{ + pub fn new(file: F) -> Self { + let file_id = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + FileBlockReader { file_id, file } + } + + /// Read a page from the underlying file into given buffer. 
+ fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + assert!(buf.len() == PAGE_SZ); + self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + } +} + +impl BlockReader for FileBlockReader +where + F: FileExt, +{ + type BlockLease = page_cache::PageReadGuard<'static>; + + fn read_blk(&self, blknum: u32) -> Result { + // Look up the right page + let cache = page_cache::get(); + loop { + match cache.read_immutable_buf(self.file_id, blknum) { + ReadBufResult::Found(guard) => break Ok(guard), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum)?; + write_guard.mark_valid(); + + // Swap for read lock + continue; + } + }; + } + } +} diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 7013c2417c..f8828b541f 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -23,21 +23,27 @@ //! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051 //! //! -//! A delta file is constructed using the 'bookfile' crate. Each file consists of three -//! parts: the 'index', the values, and a short summary header. They are stored as -//! separate chapters. +//! Every delta file consists of three parts: "summary", "index", and +//! "values". The summary is a fixed size header at the beginning of the file, +//! and it contains basic information about the layer, and offsets to the other +//! parts. The "index" is a serialized HashMap mapping from Key and LSN to an offset in the +//! "values" part. The actual page images and WAL records are stored in the +//! "values" part. //! 
use crate::config::PageServerConf; +use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::layered_repository::block_io::{BlockCursor, BlockReader, FileBlockReader}; use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ BlobRef, Layer, ValueReconstructResult, ValueReconstructState, }; +use crate::page_cache::{PageReadGuard, PAGE_SZ}; use crate::repository::{Key, Value}; use crate::virtual_file::VirtualFile; use crate::walrecord; -use crate::DELTA_FILE_MAGIC; use crate::{ZTenantId, ZTimelineId}; -use anyhow::{bail, ensure, Result}; +use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; +use anyhow::{bail, ensure, Context, Result}; use log::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -46,44 +52,43 @@ use zenith_utils::vec_map::VecMap; // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::fs; -use std::io::BufWriter; -use std::io::Write; +use std::io::{BufWriter, Write}; +use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError}; -use bookfile::{Book, BookWriter, ChapterWriter}; - use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; -/// Mapping from (key, lsn) -> page/WAL record -/// byte ranges in VALUES_CHAPTER -static INDEX_CHAPTER: u64 = 1; - -/// Page/WAL bytes - cannot be interpreted -/// without the page versions from the INDEX_CHAPTER -static VALUES_CHAPTER: u64 = 2; - -/// Contains the [`Summary`] struct -static SUMMARY_CHAPTER: u64 = 3; - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] struct Summary { + /// Magic value to identify this as a zenith delta file. Always DELTA_FILE_MAGIC. 
+ magic: u16, + format_version: u16, + tenantid: ZTenantId, timelineid: ZTimelineId, key_range: Range, lsn_range: Range, + + /// Block number where the 'index' part of the file begins. + index_start_blk: u32, } impl From<&DeltaLayer> for Summary { fn from(layer: &DeltaLayer) -> Self { Self { + magic: DELTA_FILE_MAGIC, + format_version: STORAGE_FORMAT_VERSION, + tenantid: layer.tenantid, timelineid: layer.timelineid, key_range: layer.key_range.clone(), lsn_range: layer.lsn_range.clone(), + + index_start_blk: 0, } } } @@ -118,7 +123,11 @@ pub struct DeltaLayerInner { /// index: HashMap>, - book: Option>, + // values copied from summary + index_start_blk: u32, + + /// Reader object for reading blocks from the file. (None if not loaded yet) + file: Option>, } impl Layer for DeltaLayer { @@ -155,45 +164,28 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory let inner = self.load()?; - let values_reader = inner - .book - .as_ref() - .expect("should be loaded in load call above") - .chapter_reader(VALUES_CHAPTER)?; // Scan the page versions backwards, starting from `lsn`. 
if let Some(vec_map) = inner.index.get(&key) { + let mut reader = inner.file.as_ref().unwrap().block_cursor(); let slice = vec_map.slice_range(lsn_range); - let mut size = 0usize; - let mut first_pos = 0u64; - for (_entry_lsn, blob_ref) in slice.iter().rev() { - size += blob_ref.size(); - first_pos = blob_ref.pos(); - if blob_ref.will_init() { - break; - } - } - if size != 0 { - let mut buf = vec![0u8; size]; - values_reader.read_exact_at(&mut buf, first_pos)?; - for (entry_lsn, blob_ref) in slice.iter().rev() { - let offs = (blob_ref.pos() - first_pos) as usize; - let val = Value::des(&buf[offs..offs + blob_ref.size()])?; - match val { - Value::Image(img) => { - reconstruct_state.img = Some((*entry_lsn, img)); + for (entry_lsn, blob_ref) in slice.iter().rev() { + let buf = reader.read_blob(blob_ref.pos())?; + let val = Value::des(&buf)?; + match val { + Value::Image(img) => { + reconstruct_state.img = Some((*entry_lsn, img)); + need_image = false; + break; + } + Value::WalRecord(rec) => { + let will_init = rec.will_init(); + reconstruct_state.records.push((*entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back need_image = false; break; } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((*entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back - need_image = false; - break; - } - } } } } @@ -210,7 +202,7 @@ impl Layer for DeltaLayer { } } - fn iter(&self) -> Box> + '_> { + fn iter<'a>(&'a self) -> Box> + 'a> { let inner = self.load().unwrap(); match DeltaValueIter::new(inner) { @@ -281,20 +273,16 @@ impl Layer for DeltaLayer { let inner = self.load()?; - let path = self.path(); - let file = std::fs::File::open(&path)?; - let book = Book::new(file)?; - let chapter = book.chapter_reader(VALUES_CHAPTER)?; - let mut values: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); values.sort_by_key(|k| k.0); + let mut 
reader = inner.file.as_ref().unwrap().block_cursor(); + for (key, versions) in values { for (lsn, blob_ref) in versions.as_slice() { let mut desc = String::new(); - let mut buf = vec![0u8; blob_ref.size()]; - match chapter.read_exact_at(&mut buf, blob_ref.pos()) { - Ok(()) => { + match reader.read_blob(blob_ref.pos()) { + Ok(buf) => { let val = Value::des(&buf); match val { @@ -378,19 +366,19 @@ impl DeltaLayer { let path = self.path(); // Open the file if it's not open already. - if inner.book.is_none() { - let file = VirtualFile::open(&path)?; - inner.book = Some(Book::new(file)?); + if inner.file.is_none() { + let file = VirtualFile::open(&path) + .with_context(|| format!("Failed to open file '{}'", path.display()))?; + inner.file = Some(FileBlockReader::new(file)); } - let book = inner.book.as_ref().unwrap(); + let file = inner.file.as_mut().unwrap(); + let summary_blk = file.read_blk(0)?; + let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; match &self.path_or_conf { PathOrConf::Conf(_) => { - let chapter = book.read_chapter(SUMMARY_CHAPTER)?; - let actual_summary = Summary::des(&chapter)?; - - let expected_summary = Summary::from(self); - + let mut expected_summary = Summary::from(self); + expected_summary.index_start_blk = actual_summary.index_start_blk; if actual_summary != expected_summary { bail!("in-file summary does not match expected summary. 
actual = {:?} expected = {:?}", actual_summary, expected_summary); } @@ -409,8 +397,13 @@ impl DeltaLayer { } } - let chapter = book.read_chapter(INDEX_CHAPTER)?; - let index = HashMap::des(&chapter)?; + file.file.seek(SeekFrom::Start( + actual_summary.index_start_blk as u64 * PAGE_SZ as u64, + ))?; + let mut buf_reader = std::io::BufReader::new(&mut file.file); + let index = HashMap::des_from(&mut buf_reader)?; + + inner.index_start_blk = actual_summary.index_start_blk; debug!("loaded from {}", &path.display()); @@ -434,8 +427,9 @@ impl DeltaLayer { lsn_range: filename.lsn_range.clone(), inner: RwLock::new(DeltaLayerInner { loaded: false, - book: None, index: HashMap::default(), + file: None, + index_start_blk: 0, }), } } @@ -443,12 +437,14 @@ impl DeltaLayer { /// Create a DeltaLayer struct representing an existing file on disk. /// /// This variant is only used for debugging purposes, by the 'dump_layerfile' binary. - pub fn new_for_path(path: &Path, book: &Book) -> Result + pub fn new_for_path(path: &Path, file: F) -> Result where F: FileExt, { - let chapter = book.read_chapter(SUMMARY_CHAPTER)?; - let summary = Summary::des(&chapter)?; + let mut summary_buf = Vec::new(); + summary_buf.resize(PAGE_SZ, 0); + file.read_exact_at(&mut summary_buf, 0)?; + let summary = Summary::des_prefix(&summary_buf)?; Ok(DeltaLayer { path_or_conf: PathOrConf::Path(path.to_path_buf()), @@ -458,8 +454,9 @@ impl DeltaLayer { lsn_range: summary.lsn_range, inner: RwLock::new(DeltaLayerInner { loaded: false, - book: None, + file: None, index: HashMap::default(), + index_start_blk: 0, }), }) } @@ -504,8 +501,7 @@ pub struct DeltaLayerWriter { index: HashMap>, - values_writer: ChapterWriter>, - end_offset: u64, + blob_writer: WriteBlobWriter>, } impl DeltaLayerWriter { @@ -531,13 +527,10 @@ impl DeltaLayerWriter { u64::from(lsn_range.start), u64::from(lsn_range.end) )); - let file = VirtualFile::create(&path)?; + let mut file = VirtualFile::create(&path)?; + 
file.seek(SeekFrom::Start(PAGE_SZ as u64))?; let buf_writer = BufWriter::new(file); - let book = BookWriter::new(buf_writer, DELTA_FILE_MAGIC)?; - - // Open the page-versions chapter for writing. The calls to - // `put_value` will use this to write the contents. - let values_writer = book.new_chapter(VALUES_CHAPTER); + let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64); Ok(DeltaLayerWriter { conf, @@ -547,8 +540,7 @@ impl DeltaLayerWriter { key_start, lsn_range, index: HashMap::new(), - values_writer, - end_offset: 0, + blob_writer, }) } @@ -558,17 +550,12 @@ impl DeltaLayerWriter { /// The values must be appended in key, lsn order. /// pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> { - //info!("DELTA: key {} at {} on {}", key, lsn, self.path.display()); assert!(self.lsn_range.start <= lsn); - // Remember the offset and size metadata. The metadata is written - // to a separate chapter, in `finish`. - let off = self.end_offset; - let buf = Value::ser(&val)?; - let len = buf.len(); - self.values_writer.write_all(&buf)?; - self.end_offset += len as u64; + + let off = self.blob_writer.write_blob(&Value::ser(&val)?)?; + let vec_map = self.index.entry(key).or_default(); - let blob_ref = BlobRef::new(off, len, val.will_init()); + let blob_ref = BlobRef::new(off, val.will_init()); let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -583,38 +570,40 @@ impl DeltaLayerWriter { } pub fn size(&self) -> u64 { - self.end_offset + self.blob_writer.size() } /// /// Finish writing the delta layer. 
/// pub fn finish(self, key_end: Key) -> anyhow::Result { - // Close the values chapter - let book = self.values_writer.close()?; + let index_start_blk = + ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; + + let buf_writer = self.blob_writer.into_inner(); + let mut file = buf_writer.into_inner()?; // Write out the index - let mut chapter = book.new_chapter(INDEX_CHAPTER); let buf = HashMap::ser(&self.index)?; - chapter.write_all(&buf)?; - let book = chapter.close()?; + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?; + file.write_all(&buf)?; - let mut chapter = book.new_chapter(SUMMARY_CHAPTER); + // Fill in the summary on blk 0 let summary = Summary { + magic: DELTA_FILE_MAGIC, + format_version: STORAGE_FORMAT_VERSION, tenantid: self.tenantid, timelineid: self.timelineid, key_range: self.key_start..key_end, lsn_range: self.lsn_range.clone(), + index_start_blk, }; - Summary::ser_into(&summary, &mut chapter)?; - let book = chapter.close()?; - - // This flushes the underlying 'buf_writer'. - book.close()?; + file.seek(SeekFrom::Start(0))?; + Summary::ser_into(&summary, &mut file)?; // Note: Because we opened the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't - // set inner.book here. The first read will have to re-open it. + // set inner.file here. The first read will have to re-open it. 
let layer = DeltaLayer { path_or_conf: PathOrConf::Conf(self.conf), tenantid: self.tenantid, @@ -624,7 +613,8 @@ impl DeltaLayerWriter { inner: RwLock::new(DeltaLayerInner { loaded: false, index: HashMap::new(), - book: None, + file: None, + index_start_blk, }), }; @@ -647,22 +637,6 @@ impl DeltaLayerWriter { Ok(layer) } - - pub fn abort(self) { - match self.values_writer.close() { - Ok(book) => { - if let Err(err) = book.close() { - error!("error while closing delta layer file: {}", err); - } - } - Err(err) => { - error!("error while closing chapter writer: {}", err); - } - } - if let Err(err) = std::fs::remove_file(self.path) { - error!("error removing unfinished delta layer file: {}", err); - } - } } /// @@ -672,13 +646,23 @@ impl DeltaLayerWriter { /// That takes up quite a lot of memory. Should do this in a more streaming /// fashion. /// -struct DeltaValueIter { +struct DeltaValueIter<'a> { all_offsets: Vec<(Key, Lsn, BlobRef)>, next_idx: usize, - data: Vec, + reader: BlockCursor>, } -impl Iterator for DeltaValueIter { +struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>); + +impl<'a> BlockReader for Adapter<'a> { + type BlockLease = PageReadGuard<'static>; + + fn read_blk(&self, blknum: u32) -> Result { + self.0.file.as_ref().unwrap().read_blk(blknum) + } +} + +impl<'a> Iterator for DeltaValueIter<'a> { type Item = Result<(Key, Lsn, Value)>; fn next(&mut self) -> Option { @@ -686,8 +670,8 @@ impl Iterator for DeltaValueIter { } } -impl DeltaValueIter { - fn new(inner: RwLockReadGuard) -> Result { +impl<'a> DeltaValueIter<'a> { + fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result { let mut index: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); index.sort_by_key(|x| x.0); @@ -698,30 +682,24 @@ impl DeltaValueIter { } } - let values_reader = inner - .book - .as_ref() - .expect("should be loaded in load call above") - .chapter_reader(VALUES_CHAPTER)?; - let file_size = values_reader.len() as usize; - let mut layer = DeltaValueIter { + let 
iter = DeltaValueIter { all_offsets, next_idx: 0, - data: vec![0u8; file_size], + reader: BlockCursor::new(Adapter(inner)), }; - values_reader.read_exact_at(&mut layer.data, 0)?; - Ok(layer) + Ok(iter) } fn next_res(&mut self) -> Result> { if self.next_idx < self.all_offsets.len() { - let (key, lsn, blob_ref) = self.all_offsets[self.next_idx]; - let offs = blob_ref.pos() as usize; - let size = blob_ref.size(); - let val = Value::des(&self.data[offs..offs + size])?; + let (key, lsn, off) = &self.all_offsets[self.next_idx]; + + //let mut reader = BlobReader::new(self.inner.file.as_ref().unwrap()); + let buf = self.reader.read_blob(off.pos())?; + let val = Value::des(&buf)?; self.next_idx += 1; - Ok(Some((key, lsn, val))) + Ok(Some((*key, *lsn, val))) } else { Ok(None) } diff --git a/pageserver/src/layered_repository/ephemeral_file.rs b/pageserver/src/layered_repository/ephemeral_file.rs index 79a72f4563..d509186e6f 100644 --- a/pageserver/src/layered_repository/ephemeral_file.rs +++ b/pageserver/src/layered_repository/ephemeral_file.rs @@ -2,6 +2,8 @@ //! used to keep in-memory layers spilled on disk. 
use crate::config::PageServerConf; +use crate::layered_repository::blob_io::BlobWriter; +use crate::layered_repository::block_io::BlockReader; use crate::page_cache; use crate::page_cache::PAGE_SZ; use crate::page_cache::{ReadBufResult, WriteBufResult}; @@ -10,7 +12,7 @@ use lazy_static::lazy_static; use std::cmp::min; use std::collections::HashMap; use std::fs::OpenOptions; -use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; +use std::io::{Error, ErrorKind}; use std::ops::DerefMut; use std::path::PathBuf; use std::sync::{Arc, RwLock}; @@ -41,7 +43,7 @@ pub struct EphemeralFile { _timelineid: ZTimelineId, file: Arc, - pos: u64, + size: u64, } impl EphemeralFile { @@ -70,11 +72,11 @@ impl EphemeralFile { _tenantid: tenantid, _timelineid: timelineid, file: file_rc, - pos: 0, + size: 0, }) } - pub fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> { + fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> { let mut off = 0; while off < PAGE_SZ { let n = self @@ -93,6 +95,26 @@ impl EphemeralFile { } Ok(()) } + + fn get_buf_for_write(&self, blkno: u32) -> Result { + // Look up the right page + let cache = page_cache::get(); + let mut write_guard = match cache.write_ephemeral_buf(self.file_id, blkno) { + WriteBufResult::Found(guard) => guard, + WriteBufResult::NotFound(mut guard) => { + // Read the page from disk into the buffer + // TODO: if we're overwriting the whole page, no need to read it in first + self.fill_buffer(guard.deref_mut(), blkno)?; + guard.mark_valid(); + + // And then fall through to modify it. + guard + } + }; + write_guard.mark_dirty(); + + Ok(write_guard) + } } /// Does the given filename look like an ephemeral file? 
@@ -167,48 +189,49 @@ impl FileExt for EphemeralFile { } } -impl Write for EphemeralFile { - fn write(&mut self, buf: &[u8]) -> Result { - let n = self.write_at(buf, self.pos)?; - self.pos += n as u64; - Ok(n) - } +impl BlobWriter for EphemeralFile { + fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + let pos = self.size; - fn flush(&mut self) -> Result<(), std::io::Error> { - // we don't need to flush data: - // * we either write input bytes or not, not keeping any intermediate data buffered - // * rust unix file `flush` impl does not flush things either, returning `Ok(())` - Ok(()) - } -} + let mut blknum = (self.size / PAGE_SZ as u64) as u32; + let mut off = (pos % PAGE_SZ as u64) as usize; -impl Seek for EphemeralFile { - fn seek(&mut self, pos: SeekFrom) -> Result { - match pos { - SeekFrom::Start(offset) => { - self.pos = offset; - } - SeekFrom::End(_offset) => { - return Err(Error::new( - ErrorKind::Other, - "SeekFrom::End not supported by EphemeralFile", - )); - } - SeekFrom::Current(offset) => { - let pos = self.pos as i128 + offset as i128; - if pos < 0 { - return Err(Error::new( - ErrorKind::InvalidInput, - "offset would be negative", - )); - } - if pos > u64::MAX as i128 { - return Err(Error::new(ErrorKind::InvalidInput, "offset overflow")); - } - self.pos = pos as u64; - } + let mut buf = self.get_buf_for_write(blknum)?; + + // Write the length field + let len_buf = u32::to_ne_bytes(srcbuf.len() as u32); + let thislen = PAGE_SZ - off; + if thislen < 4 { + // it needs to be split across pages + buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]); + blknum += 1; + buf = self.get_buf_for_write(blknum)?; + buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]); + off = 4 - thislen; + } else { + buf[off..off + 4].copy_from_slice(&len_buf); + off += 4; } - Ok(self.pos) + + // Write the payload + let mut buf_remain = srcbuf; + while !buf_remain.is_empty() { + let mut page_remain = PAGE_SZ - off; + if page_remain == 0 { + blknum += 1; + buf = 
self.get_buf_for_write(blknum)?; + off = 0; + page_remain = PAGE_SZ; + } + let this_blk_len = min(page_remain, buf_remain.len()); + buf[off..(off + this_blk_len)].copy_from_slice(&buf_remain[..this_blk_len]); + off += this_blk_len; + buf_remain = &buf_remain[this_blk_len..]; + } + drop(buf); + self.size += 4 + srcbuf.len() as u64; + + Ok(pos) } } @@ -239,11 +262,34 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er } } +impl BlockReader for EphemeralFile { + type BlockLease = page_cache::PageReadGuard<'static>; + + fn read_blk(&self, blknum: u32) -> Result { + // Look up the right page + let cache = page_cache::get(); + loop { + match cache.read_ephemeral_buf(self.file_id, blknum) { + ReadBufResult::Found(guard) => return Ok(guard), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum)?; + write_guard.mark_valid(); + + // Swap for read lock + continue; + } + }; + } + } +} + #[cfg(test)] mod tests { use super::*; - use rand::seq::SliceRandom; - use rand::thread_rng; + use crate::layered_repository::blob_io::{BlobCursor, BlobWriter}; + use crate::layered_repository::block_io::BlockCursor; + use rand::{seq::SliceRandom, thread_rng, RngCore}; use std::fs; use std::str::FromStr; @@ -281,19 +327,19 @@ mod tests { fn test_ephemeral_files() -> Result<(), Error> { let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?; - let mut file_a = EphemeralFile::create(conf, tenantid, timelineid)?; + let file_a = EphemeralFile::create(conf, tenantid, timelineid)?; - file_a.write_all(b"foo")?; + file_a.write_all_at(b"foo", 0)?; assert_eq!("foo", read_string(&file_a, 0, 20)?); - file_a.write_all(b"bar")?; + file_a.write_all_at(b"bar", 3)?; assert_eq!("foobar", read_string(&file_a, 0, 20)?); // Open a lot of files, enough to cause some page evictions. 
let mut efiles = Vec::new(); for fileno in 0..100 { - let mut efile = EphemeralFile::create(conf, tenantid, timelineid)?; - efile.write_all(format!("file {}", fileno).as_bytes())?; + let efile = EphemeralFile::create(conf, tenantid, timelineid)?; + efile.write_all_at(format!("file {}", fileno).as_bytes(), 0)?; assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?); efiles.push((fileno, efile)); } @@ -307,4 +353,41 @@ mod tests { Ok(()) } + + #[test] + fn test_ephemeral_blobs() -> Result<(), Error> { + let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?; + + let mut file = EphemeralFile::create(conf, tenantid, timelineid)?; + + let pos_foo = file.write_blob(b"foo")?; + assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); + let pos_bar = file.write_blob(b"bar")?; + assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice()); + assert_eq!(b"bar", file.block_cursor().read_blob(pos_bar)?.as_slice()); + + let mut blobs = Vec::new(); + for i in 0..10000 { + let data = Vec::from(format!("blob{}", i).as_bytes()); + let pos = file.write_blob(&data)?; + blobs.push((pos, data)); + } + + let mut cursor = BlockCursor::new(&file); + for (pos, expected) in blobs { + let actual = cursor.read_blob(pos)?; + assert_eq!(actual, expected); + } + drop(cursor); + + // Test a large blob that spans multiple pages + let mut large_data = Vec::new(); + large_data.resize(20000, 0); + thread_rng().fill_bytes(&mut large_data); + let pos_large = file.write_blob(&large_data)?; + let result = file.block_cursor().read_blob(pos_large)?; + assert_eq!(result, large_data); + + Ok(()) + } } diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index 68d1cd4a8a..a8e5de09f5 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -13,63 +13,70 @@ //! //! 
000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568 //! -//! An image file is constructed using the 'bookfile' crate. +//! Every image layer file consists of three parts: "summary", +//! "index", and "values". The summary is a fixed size header at the +//! beginning of the file, and it contains basic information about the +//! layer, and offsets to the other parts. The "index" is a serialized +//! HashMap, mapping from Key to an offset in the "values" part. The +//! actual page images are stored in the "values" part. //! -//! Only metadata is loaded into memory by the load function. +//! Only the "index" is loaded into memory by the load function. //! When images are needed, they are read directly from disk. //! use crate::config::PageServerConf; +use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter}; +use crate::layered_repository::block_io::{BlockReader, FileBlockReader}; use crate::layered_repository::filename::{ImageFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ BlobRef, Layer, ValueReconstructResult, ValueReconstructState, }; +use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value}; use crate::virtual_file::VirtualFile; -use crate::IMAGE_FILE_MAGIC; use crate::{ZTenantId, ZTimelineId}; +use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; use bytes::Bytes; use log::*; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs; -use std::io::{BufWriter, Write}; +use std::io::Write; +use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{RwLock, RwLockReadGuard, TryLockError}; -use bookfile::{Book, BookWriter, ChapterWriter}; - use zenith_utils::bin_ser::BeSer; use zenith_utils::lsn::Lsn; -/// Mapping from (key, lsn) -> page/WAL record -/// byte ranges in VALUES_CHAPTER -static INDEX_CHAPTER: u64 = 1; - -/// Contains each block in block # order 
-const VALUES_CHAPTER: u64 = 2; - -/// Contains the [`Summary`] struct -const SUMMARY_CHAPTER: u64 = 3; - #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] struct Summary { + /// Magic value to identify this as a zenith image file. Always IMAGE_FILE_MAGIC. + magic: u16, + format_version: u16, + tenantid: ZTenantId, timelineid: ZTimelineId, key_range: Range, - lsn: Lsn, + + /// Block number where the 'index' part of the file begins. + index_start_blk: u32, } impl From<&ImageLayer> for Summary { fn from(layer: &ImageLayer) -> Self { Self { + magic: IMAGE_FILE_MAGIC, + format_version: STORAGE_FORMAT_VERSION, tenantid: layer.tenantid, timelineid: layer.timelineid, key_range: layer.key_range.clone(), lsn: layer.lsn, + + index_start_blk: 0, } } } @@ -97,12 +104,14 @@ pub struct ImageLayerInner { /// If false, the 'index' has not been loaded into memory yet. loaded: bool, - /// The underlying (virtual) file handle. None if the layer hasn't been loaded - /// yet. - book: Option>, - /// offset of each value index: HashMap, + + // values copied from summary + index_start_blk: u32, + + /// Reader object for reading blocks from the file. 
(None if not loaded yet) + file: Option>, } impl Layer for ImageLayer { @@ -138,26 +147,21 @@ impl Layer for ImageLayer { assert!(lsn_range.end >= self.lsn); let inner = self.load()?; - if let Some(blob_ref) = inner.index.get(&key) { - let chapter = inner - .book + let buf = inner + .file .as_ref() .unwrap() - .chapter_reader(VALUES_CHAPTER)?; - - let mut blob = vec![0; blob_ref.size()]; - chapter - .read_exact_at(&mut blob, blob_ref.pos()) + .block_cursor() + .read_blob(blob_ref.pos()) .with_context(|| { format!( - "failed to read {} bytes from data file {} at offset {}", - blob_ref.size(), + "failed to read blob from data file {} at offset {}", self.filename().display(), blob_ref.pos() ) })?; - let value = Bytes::from(blob); + let value = Bytes::from(buf); reconstruct_state.img = Some((self.lsn, value)); Ok(ValueReconstructResult::Complete) @@ -228,12 +232,7 @@ impl Layer for ImageLayer { index_vec.sort_by_key(|x| x.1.pos()); for (key, blob_ref) in index_vec { - println!( - "key: {} size {} offset {}", - key, - blob_ref.size(), - blob_ref.pos() - ); + println!("key: {} offset {}", key, blob_ref.pos()); } Ok(()) @@ -291,21 +290,19 @@ impl ImageLayer { let path = self.path(); // Open the file if it's not open already. 
- if inner.book.is_none() { + if inner.file.is_none() { let file = VirtualFile::open(&path) .with_context(|| format!("Failed to open file '{}'", path.display()))?; - inner.book = Some(Book::new(file).with_context(|| { - format!("Failed to open file '{}' as a bookfile", path.display()) - })?); + inner.file = Some(FileBlockReader::new(file)); } - let book = inner.book.as_ref().unwrap(); + let file = inner.file.as_mut().unwrap(); + let summary_blk = file.read_blk(0)?; + let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; match &self.path_or_conf { PathOrConf::Conf(_) => { - let chapter = book.read_chapter(SUMMARY_CHAPTER)?; - let actual_summary = Summary::des(&chapter)?; - - let expected_summary = Summary::from(self); + let mut expected_summary = Summary::from(self); + expected_summary.index_start_blk = actual_summary.index_start_blk; if actual_summary != expected_summary { bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary); @@ -325,14 +322,18 @@ impl ImageLayer { } } - let chapter = book.read_chapter(INDEX_CHAPTER)?; - let index = HashMap::des(&chapter)?; + file.file.seek(SeekFrom::Start( + actual_summary.index_start_blk as u64 * PAGE_SZ as u64, + ))?; + let mut buf_reader = std::io::BufReader::new(&mut file.file); + let index = HashMap::des_from(&mut buf_reader)?; + + inner.index_start_blk = actual_summary.index_start_blk; info!("loaded from {}", &path.display()); inner.index = index; inner.loaded = true; - Ok(()) } @@ -350,9 +351,10 @@ impl ImageLayer { key_range: filename.key_range.clone(), lsn: filename.lsn, inner: RwLock::new(ImageLayerInner { - book: None, index: HashMap::new(), loaded: false, + file: None, + index_start_blk: 0, }), } } @@ -360,12 +362,14 @@ impl ImageLayer { /// Create an ImageLayer struct representing an existing file on disk. /// /// This variant is only used for debugging purposes, by the 'dump_layerfile' binary. 
- pub fn new_for_path(path: &Path, book: &Book) -> Result + pub fn new_for_path(path: &Path, file: F) -> Result where F: std::os::unix::prelude::FileExt, { - let chapter = book.read_chapter(SUMMARY_CHAPTER)?; - let summary = Summary::des(&chapter)?; + let mut summary_buf = Vec::new(); + summary_buf.resize(PAGE_SZ, 0); + file.read_exact_at(&mut summary_buf, 0)?; + let summary = Summary::des_prefix(&summary_buf)?; Ok(ImageLayer { path_or_conf: PathOrConf::Path(path.to_path_buf()), @@ -374,9 +378,10 @@ impl ImageLayer { key_range: summary.key_range, lsn: summary.lsn, inner: RwLock::new(ImageLayerInner { - book: None, + file: None, index: HashMap::new(), loaded: false, + index_start_blk: 0, }), }) } @@ -412,18 +417,15 @@ impl ImageLayer { /// pub struct ImageLayerWriter { conf: &'static PageServerConf, - path: PathBuf, + _path: PathBuf, timelineid: ZTimelineId, tenantid: ZTenantId, key_range: Range, lsn: Lsn, - values_writer: Option>>, - end_offset: u64, - index: HashMap, - finished: bool, + blob_writer: WriteBlobWriter, } impl ImageLayerWriter { @@ -449,24 +451,17 @@ impl ImageLayerWriter { ); info!("new image layer {}", path.display()); let file = VirtualFile::create(&path)?; - let buf_writer = BufWriter::new(file); - let book = BookWriter::new(buf_writer, IMAGE_FILE_MAGIC)?; - - // Open the page-images chapter for writing. The calls to - // `put_image` will use this to write the contents. 
- let chapter = book.new_chapter(VALUES_CHAPTER); + let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64); let writer = ImageLayerWriter { conf, - path, + _path: path, timelineid, tenantid, key_range: key_range.clone(), lsn, - values_writer: Some(chapter), index: HashMap::new(), - end_offset: 0, - finished: false, + blob_writer, }; Ok(writer) @@ -479,49 +474,41 @@ impl ImageLayerWriter { /// pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> { ensure!(self.key_range.contains(&key)); - let off = self.end_offset; + let off = self.blob_writer.write_blob(img)?; - if let Some(writer) = &mut self.values_writer { - let len = img.len(); - writer.write_all(img)?; - self.end_offset += len as u64; - - let old = self.index.insert(key, BlobRef::new(off, len, true)); - assert!(old.is_none()); - } else { - panic!() - } + let old = self.index.insert(key, BlobRef::new(off, true)); + assert!(old.is_none()); Ok(()) } - pub fn finish(&mut self) -> anyhow::Result { - // Close the values chapter - let book = self.values_writer.take().unwrap().close()?; + pub fn finish(self) -> anyhow::Result { + let index_start_blk = + ((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32; + + let mut file = self.blob_writer.into_inner(); // Write out the index - let mut chapter = book.new_chapter(INDEX_CHAPTER); let buf = HashMap::ser(&self.index)?; - chapter.write_all(&buf)?; - let book = chapter.close()?; + file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?; + file.write_all(&buf)?; - // Write out the summary chapter - let mut chapter = book.new_chapter(SUMMARY_CHAPTER); + // Fill in the summary on blk 0 let summary = Summary { + magic: IMAGE_FILE_MAGIC, + format_version: STORAGE_FORMAT_VERSION, tenantid: self.tenantid, timelineid: self.timelineid, key_range: self.key_range.clone(), lsn: self.lsn, + index_start_blk, }; - Summary::ser_into(&summary, &mut chapter)?; - let book = chapter.close()?; - - // This flushes the underlying 'buf_writer'. 
- book.close()?; + file.seek(SeekFrom::Start(0))?; + Summary::ser_into(&summary, &mut file)?; // Note: Because we open the file in write-only mode, we cannot // reuse the same VirtualFile for reading later. That's why we don't - // set inner.book here. The first read will have to re-open it. + // set inner.file here. The first read will have to re-open it. let layer = ImageLayer { path_or_conf: PathOrConf::Conf(self.conf), timelineid: self.timelineid, @@ -529,28 +516,14 @@ impl ImageLayerWriter { key_range: self.key_range.clone(), lsn: self.lsn, inner: RwLock::new(ImageLayerInner { - book: None, loaded: false, index: HashMap::new(), + file: None, + index_start_blk, }), }; trace!("created image layer {}", layer.path().display()); - self.finished = true; - Ok(layer) } } - -impl Drop for ImageLayerWriter { - fn drop(&mut self) { - if let Some(page_image_writer) = self.values_writer.take() { - if let Ok(book) = page_image_writer.close() { - let _ = book.close(); - } - } - if !self.finished { - let _ = fs::remove_file(&self.path); - } - } -} diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 8670442a2c..8a24528732 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -5,10 +5,12 @@ //! its position in the file, is kept in memory, though. //! 
use crate::config::PageServerConf; +use crate::layered_repository::blob_io::{BlobCursor, BlobWriter}; +use crate::layered_repository::block_io::BlockReader; use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter}; use crate::layered_repository::ephemeral_file::EphemeralFile; use crate::layered_repository::storage_layer::{ - BlobRef, Layer, ValueReconstructResult, ValueReconstructState, + Layer, ValueReconstructResult, ValueReconstructState, }; use crate::repository::{Key, Value}; use crate::walrecord; @@ -19,9 +21,7 @@ use std::collections::HashMap; // avoid binding to Write (conflicts with std::io::Write) // while being able to use std::fmt::Write's methods use std::fmt::Write as _; -use std::io::Write; use std::ops::Range; -use std::os::unix::fs::FileExt; use std::path::PathBuf; use std::sync::RwLock; use zenith_utils::bin_ser::BeSer; @@ -54,14 +54,12 @@ pub struct InMemoryLayerInner { /// by block number and LSN. The value is an offset into the /// ephemeral file where the page version is stored. /// - index: HashMap>, + index: HashMap>, /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. /// PerSeg::page_versions map stores offsets into this file. file: EphemeralFile, - - end_offset: u64, } impl InMemoryLayerInner { @@ -120,10 +118,12 @@ impl Layer for InMemoryLayer { let inner = self.inner.read().unwrap(); + let mut reader = inner.file.block_cursor(); + // Scan the page versions backwards, starting from `lsn`. 
if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, blob_ref) in slice.iter().rev() { + for (entry_lsn, pos) in slice.iter().rev() { match &reconstruct_state.img { Some((cached_lsn, _)) if entry_lsn <= cached_lsn => { return Ok(ValueReconstructResult::Complete) @@ -131,8 +131,7 @@ impl Layer for InMemoryLayer { _ => {} } - let mut buf = vec![0u8; blob_ref.size()]; - inner.file.read_exact_at(&mut buf, blob_ref.pos())?; + let buf = reader.read_blob(*pos)?; let value = Value::des(&buf)?; match value { Value::Image(img) => { @@ -208,12 +207,12 @@ impl Layer for InMemoryLayer { return Ok(()); } + let mut cursor = inner.file.block_cursor(); let mut buf = Vec::new(); for (key, vec_map) in inner.index.iter() { - for (lsn, blob_ref) in vec_map.as_slice() { + for (lsn, pos) in vec_map.as_slice() { let mut desc = String::new(); - buf.resize(blob_ref.size(), 0); - inner.file.read_exact_at(&mut buf, blob_ref.pos())?; + cursor.read_blob_into_buf(*pos, &mut buf)?; let val = Value::des(&buf); match val { Ok(Value::Image(img)) => { @@ -268,7 +267,6 @@ impl InMemoryLayer { end_lsn: None, index: HashMap::new(), file, - end_offset: 0, }), }) } @@ -283,15 +281,10 @@ impl InMemoryLayer { inner.assert_writeable(); - let off = inner.end_offset; - let buf = Value::ser(&val)?; - let len = buf.len(); - inner.file.write_all(&buf)?; - inner.end_offset += len as u64; + let off = inner.file.write_blob(&Value::ser(&val)?)?; let vec_map = inner.index.entry(key).or_default(); - let blob_ref = BlobRef::new(off, len, val.will_init()); - let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0; + let old = vec_map.append_or_update_last(lsn, off).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. 
warn!("Key {} at {} already exists", key, lsn); @@ -345,21 +338,21 @@ impl InMemoryLayer { self.start_lsn..inner.end_lsn.unwrap(), )?; - let mut do_steps = || -> Result<()> { - for (key, vec_map) in inner.index.iter() { - // Write all page versions - for (lsn, blob_ref) in vec_map.as_slice() { - let mut buf = vec![0u8; blob_ref.size()]; - inner.file.read_exact_at(&mut buf, blob_ref.pos())?; - let val = Value::des(&buf)?; - delta_layer_writer.put_value(*key, *lsn, val)?; - } + let mut buf = Vec::new(); + + let mut cursor = inner.file.block_cursor(); + + let mut keys: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); + keys.sort_by_key(|k| k.0); + + for (key, vec_map) in keys.iter() { + let key = **key; + // Write all page versions + for (lsn, pos) in vec_map.as_slice() { + cursor.read_blob_into_buf(*pos, &mut buf)?; + let val = Value::des(&buf)?; + delta_layer_writer.put_value(key, *lsn, val)?; } - Ok(()) - }; - if let Err(err) = do_steps() { - delta_layer_writer.abort(); - return Err(err); } let delta_layer = delta_layer_writer.finish(Key::MAX)?; diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 2711640736..b5366da223 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -150,9 +150,10 @@ pub trait Layer: Send + Sync { const WILL_INIT: u64 = 1; /// -/// Struct representing reference to BLOB in layers. Reference contains BLOB offset and size. -/// For WAL records (delta layer) it also contains `will_init` flag which helps to determine range of records -/// which needs to be applied without reading/deserializing records themselves. +/// Struct representing reference to BLOB in layers. Reference contains BLOB +/// offset, and for WAL records it also contains `will_init` flag. The flag +/// helps to determine the range of records that needs to be applied, without +/// reading/deserializing records themselves. 
/// #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub struct BlobRef(u64); @@ -163,15 +164,11 @@ impl BlobRef { } pub fn pos(&self) -> u64 { - self.0 >> 32 + self.0 >> 1 } - pub fn size(&self) -> usize { - ((self.0 & 0xFFFFFFFF) >> 1) as usize - } - - pub fn new(pos: u64, size: usize, will_init: bool) -> BlobRef { - let mut blob_ref = (pos << 32) | ((size as u64) << 1); + pub fn new(pos: u64, will_init: bool) -> BlobRef { + let mut blob_ref = pos << 1; if will_init { blob_ref |= WILL_INIT; } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 4790ab6652..6d2631b2b1 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -38,11 +38,11 @@ use pgdatadir_mapping::DatadirTimeline; /// This is embedded in the metadata file, and also in the header of all the /// layer files. If you make any backwards-incompatible changes to the storage /// format, bump this! -pub const STORAGE_FORMAT_VERSION: u16 = 1; +pub const STORAGE_FORMAT_VERSION: u16 = 2; // Magic constants used to identify different kinds of files -pub const IMAGE_FILE_MAGIC: u32 = 0x5A60_0000 | STORAGE_FORMAT_VERSION as u32; -pub const DELTA_FILE_MAGIC: u32 = 0x5A61_0000 | STORAGE_FORMAT_VERSION as u32; +pub const IMAGE_FILE_MAGIC: u16 = 0x5A60; +pub const DELTA_FILE_MAGIC: u16 = 0x5A61; lazy_static! { static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index c485e46f47..bd44384a44 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -56,7 +56,7 @@ use crate::layered_repository::writeback_ephemeral_file; use crate::repository::Key; static PAGE_CACHE: OnceCell = OnceCell::new(); -const TEST_PAGE_CACHE_SIZE: usize = 10; +const TEST_PAGE_CACHE_SIZE: usize = 50; /// /// Initialize the page cache. This must be called once at page server startup. @@ -90,6 +90,7 @@ const MAX_USAGE_COUNT: u8 = 5; /// CacheKey uniquely identifies a "thing" to cache in the page cache. 
/// #[derive(Debug, PartialEq, Eq, Clone)] +#[allow(clippy::enum_variant_names)] enum CacheKey { MaterializedPage { hash_key: MaterializedPageHashKey, @@ -99,6 +100,10 @@ enum CacheKey { file_id: u64, blkno: u32, }, + ImmutableFilePage { + file_id: u64, + blkno: u32, + }, } #[derive(Debug, PartialEq, Eq, Hash, Clone)] @@ -173,6 +178,8 @@ pub struct PageCache { ephemeral_page_map: RwLock>, + immutable_page_map: RwLock>, + /// The actual buffers with their metadata. slots: Box<[Slot]>, @@ -195,6 +202,12 @@ impl std::ops::Deref for PageReadGuard<'_> { } } +impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { + fn as_ref(&self) -> &[u8; PAGE_SZ] { + self.0.buf + } +} + /// /// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked /// until the guard is dropped. @@ -226,6 +239,12 @@ impl std::ops::Deref for PageWriteGuard<'_> { } } +impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> { + fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] { + self.inner.buf + } +} + impl PageWriteGuard<'_> { /// Mark that the buffer contents are now valid. pub fn mark_valid(&mut self) { @@ -381,6 +400,36 @@ impl PageCache { } } + // Section 1.3: Public interface functions for working with immutable file pages. 
+ + pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult { + let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; + + self.lock_for_read(&mut cache_key) + } + + /// Immediately drop all buffers belonging to given file, without writeback + pub fn drop_buffers_for_immutable(&self, drop_file_id: u64) { + for slot_idx in 0..self.slots.len() { + let slot = &self.slots[slot_idx]; + + let mut inner = slot.inner.write().unwrap(); + if let Some(key) = &inner.key { + match key { + CacheKey::ImmutableFilePage { file_id, blkno: _ } + if *file_id == drop_file_id => + { + // remove mapping for old buffer + self.remove_mapping(key); + inner.key = None; + inner.dirty = false; + } + _ => {} + } + } + } + } + // // Section 2: Internal interface functions for lookup/update. // @@ -578,6 +627,10 @@ impl PageCache { let map = self.ephemeral_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let map = self.immutable_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) + } } } @@ -601,6 +654,10 @@ impl PageCache { let map = self.ephemeral_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let map = self.immutable_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) 
+ } } } @@ -632,6 +689,11 @@ impl PageCache { map.remove(&(*file_id, *blkno)) .expect("could not find old key in mapping"); } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let mut map = self.immutable_page_map.write().unwrap(); + map.remove(&(*file_id, *blkno)) + .expect("could not find old key in mapping"); + } } } @@ -672,6 +734,16 @@ impl PageCache { } } } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let mut map = self.immutable_page_map.write().unwrap(); + match map.entry((*file_id, *blkno)) { + Entry::Occupied(entry) => Some(*entry.get()), + Entry::Vacant(entry) => { + entry.insert(slot_idx); + None + } + } + } } } @@ -749,6 +821,13 @@ impl PageCache { CacheKey::EphemeralPage { file_id, blkno } => { writeback_ephemeral_file(*file_id, *blkno, buf) } + CacheKey::ImmutableFilePage { + file_id: _, + blkno: _, + } => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "unexpected dirty immutable page", + )), } } @@ -779,6 +858,7 @@ impl PageCache { Self { materialized_page_map: Default::default(), ephemeral_page_map: Default::default(), + immutable_page_map: Default::default(), slots, next_evict_slot: AtomicUsize::new(0), } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 858cff29cb..64f9db2338 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -65,6 +65,7 @@ lazy_static! { /// currently open, the 'handle' can still point to the slot where it was last kept. The /// 'tag' field is used to detect whether the handle still is valid or not. /// +#[derive(Debug)] pub struct VirtualFile { /// Lazy handle to the global file descriptor cache. The slot that this points to /// might contain our File, or it may be empty, or it may contain a File that @@ -88,7 +89,7 @@ pub struct VirtualFile { timelineid: String, } -#[derive(PartialEq, Clone, Copy)] +#[derive(Debug, PartialEq, Clone, Copy)] struct SlotHandle { /// Index into OPEN_FILES.slots index: usize,