Refactor the I/O functions.

This introduces two new abstraction layers for I/O:

- Block I/O, and
- Blob I/O.

The BlockReader trait abstracts a file or something else that can be read
in 8kB pages. It is implemented by EphemeralFiles, and by a new
FileBlockReader struct that allows reading arbitrary VirtualFiles in that
manner, utilizing the page cache.

There is also a new BlockCursor struct that works as a cursor over a
BlockReader. When you create a BlockCursor and read the first page using
it, it keeps a reference to that page. If you access the same page again,
it skips the page cache lookup and returns the cached page directly.
That can save a lot of lookups in the page cache if you perform multiple
reads.

The Blob-oriented API allows reading and writing "blobs" of arbitrary
length. It is a layer on top of the block-oriented API. When you write
a blob with the write_blob() function, it writes a length field
followed by the actual data to the underlying block storage, and
returns the offset where the blob was stored. The blob can be
retrieved later using the offset.

Finally, this replaces the I/O code in image-, delta-, and in-memory
layers to use the new abstractions. These replace the 'bookfile'
crate.

This is a backwards-incompatible change to the storage format.
This commit is contained in:
Heikki Linnakangas
2022-04-07 20:50:08 +03:00
parent 81ba23094e
commit 5d9851f5d1
14 changed files with 774 additions and 405 deletions

36
Cargo.lock generated
View File

@@ -141,30 +141,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aversion"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41992ab8cfcc3026ef9abceffe0c2b0479c043183fc23825e30d22baab6df334"
dependencies = [
"aversion-macros",
"byteorder",
"serde",
"serde_cbor",
"thiserror",
]
[[package]]
name = "aversion-macros"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ba5785f953985aa0caca927ba4005880f3b4f53de87f134e810ae3549f744d2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "aws-creds"
version = "0.27.1"
@@ -264,17 +240,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bookfile"
version = "0.3.0"
source = "git+https://github.com/zenithdb/bookfile.git?rev=bf6e43825dfb6e749ae9b80e8372c8fea76cec2f#bf6e43825dfb6e749ae9b80e8372c8fea76cec2f"
dependencies = [
"aversion",
"byteorder",
"serde",
"thiserror",
]
[[package]]
name = "boxfnonce"
version = "0.1.1"
@@ -1524,7 +1489,6 @@ dependencies = [
"anyhow",
"async-compression",
"async-trait",
"bookfile",
"byteorder",
"bytes",
"chrono",

View File

@@ -4,7 +4,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
bookfile = { git = "https://github.com/zenithdb/bookfile.git", rev="bf6e43825dfb6e749ae9b80e8372c8fea76cec2f" }
chrono = "0.4.19"
rand = "0.8.3"
regex = "1.4.5"

View File

@@ -4,6 +4,7 @@
use anyhow::Result;
use clap::{App, Arg};
use pageserver::layered_repository::dump_layerfile_from_path;
use pageserver::page_cache;
use pageserver::virtual_file;
use std::path::PathBuf;
use zenith_utils::GIT_VERSION;
@@ -24,6 +25,7 @@ fn main() -> Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);
dump_layerfile_from_path(&path, true)?;

View File

@@ -12,7 +12,6 @@
//!
use anyhow::{anyhow, bail, ensure, Context, Result};
use bookfile::Book;
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
@@ -56,6 +55,8 @@ use zenith_utils::crashsafe_dir;
use zenith_utils::lsn::{AtomicLsn, Lsn, RecordLsn};
use zenith_utils::seqwait::SeqWait;
mod blob_io;
pub mod block_io;
mod delta_layer;
pub(crate) mod ephemeral_file;
mod filename;
@@ -2054,16 +2055,17 @@ impl<'a> TimelineWriter<'_> for LayeredTimelineWriter<'a> {
/// Dump contents of a layer file to stdout.
pub fn dump_layerfile_from_path(path: &Path, verbose: bool) -> Result<()> {
let file = File::open(path)?;
let book = Book::new(file)?;
use std::os::unix::fs::FileExt;
match book.magic() {
crate::DELTA_FILE_MAGIC => {
DeltaLayer::new_for_path(path, &book)?.dump(verbose)?;
}
crate::IMAGE_FILE_MAGIC => {
ImageLayer::new_for_path(path, &book)?.dump(verbose)?;
}
// All layer files start with a two-byte "magic" value, to identify the kind of
// file.
let file = File::open(path)?;
let mut header_buf = [0u8; 2];
file.read_exact_at(&mut header_buf, 0)?;
match u16::from_be_bytes(header_buf) {
crate::IMAGE_FILE_MAGIC => ImageLayer::new_for_path(path, file)?.dump(verbose)?,
crate::DELTA_FILE_MAGIC => DeltaLayer::new_for_path(path, file)?.dump(verbose)?,
magic => bail!("unrecognized magic identifier: {:?}", magic),
}
@@ -2274,7 +2276,6 @@ pub mod tests {
lsn,
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
println!("updating {} at {}", blknum, lsn);
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;

View File

@@ -0,0 +1,122 @@
//!
//! Functions for reading and writing variable-sized "blobs".
//!
//! Each blob begins with a 4-byte length (stored in native byte order), followed by the actual data.
//!
use crate::layered_repository::block_io::{BlockCursor, BlockReader};
use crate::page_cache::PAGE_SZ;
use std::cmp::min;
use std::io::Error;
/// Read-side interface for blobs: given the offset of a blob, fetch its
/// payload.
pub trait BlobCursor {
    /// Read the blob stored at `offset` and return its payload in a freshly
    /// allocated vector.
    ///
    /// This is a convenience wrapper around
    /// [`read_blob_into_buf`](BlobCursor::read_blob_into_buf); any error from
    /// that method is passed through unchanged.
    fn read_blob(&mut self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
        let mut blob = Vec::new();
        self.read_blob_into_buf(offset, &mut blob).map(|()| blob)
    }

    /// Read the blob stored at `offset` into the caller-supplied `dstbuf`,
    /// which lets the caller reuse one allocation across several reads.
    fn read_blob_into_buf(
        &mut self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
    ) -> Result<(), std::io::Error>;
}
/// Blob reading on top of a [`BlockCursor`].
///
/// A blob is laid out as a 4-byte length (native byte order) immediately
/// followed by that many payload bytes. Blobs are addressed by byte offset
/// and are not aligned to pages, so both the length field and the payload
/// may straddle page boundaries.
impl<R> BlobCursor for BlockCursor<R>
where
    R: BlockReader,
{
    /// Read the blob that starts at byte `offset` into `dstbuf`.
    ///
    /// `dstbuf` is cleared once the length has been read, so on success it
    /// holds exactly the payload of the blob.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying `read_blk` call if any page
    /// touched by the blob cannot be read. In that case `dstbuf` may already
    /// have been cleared and partially filled.
    fn read_blob_into_buf(
        &mut self,
        offset: u64,
        dstbuf: &mut Vec<u8>,
    ) -> Result<(), std::io::Error> {
        // Split the byte offset into (page number, offset within page).
        let mut blknum = (offset / PAGE_SZ as u64) as u32;
        let mut off = (offset % PAGE_SZ as u64) as usize;

        let mut buf = self.read_blk(blknum)?;

        // Read the 4-byte length field.
        let mut len_buf = [0u8; 4];
        let thislen = PAGE_SZ - off;
        if thislen < 4 {
            // The length field is split across two pages: take what is left
            // on this page, then the rest from the start of the next one.
            len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
            blknum += 1;
            buf = self.read_blk(blknum)?;
            len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
            off = 4 - thislen;
        } else {
            len_buf.copy_from_slice(&buf[off..off + 4]);
            // `off` may now equal PAGE_SZ; the payload loop handles that by
            // moving on to the next page before copying anything.
            off += 4;
        }
        let len = u32::from_ne_bytes(len_buf) as usize;

        dstbuf.clear();

        // Read the payload, one page-sized piece at a time.
        let mut remain = len;
        while remain > 0 {
            let mut page_remain = PAGE_SZ - off;
            if page_remain == 0 {
                // Current page is exhausted; continue on the next page.
                blknum += 1;
                buf = self.read_blk(blknum)?;
                off = 0;
                page_remain = PAGE_SZ;
            }
            let this_blk_len = min(remain, page_remain);
            dstbuf.extend_from_slice(&buf[off..off + this_blk_len]);
            remain -= this_blk_len;
            off += this_blk_len;
        }
        Ok(())
    }
}
/// Write-side interface for blobs.
pub trait BlobWriter {
    /// Append `srcbuf` as a new blob.
    ///
    /// Returns the offset at which the blob (i.e. its length field) starts.
    fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error>;
}

/// A [`BlobWriter`] that appends blobs to any [`std::io::Write`] sink.
///
/// The writer never seeks; it only keeps track of the offset at which the
/// next blob will land, starting from the `start_offset` given to
/// [`WriteBlobWriter::new`].
pub struct WriteBlobWriter<W>
where
    W: std::io::Write,
{
    inner: W,
    /// Offset of the next blob to be written.
    offset: u64,
}

impl<W> WriteBlobWriter<W>
where
    W: std::io::Write,
{
    /// Create a writer on top of `inner`.
    ///
    /// `start_offset` is the offset reported for the first blob; it is only
    /// used for bookkeeping, `inner` is not repositioned.
    pub fn new(inner: W, start_offset: u64) -> Self {
        WriteBlobWriter {
            inner,
            offset: start_offset,
        }
    }

    /// Offset just past the last blob written so far, i.e. `start_offset`
    /// plus the total number of bytes written through this writer.
    pub fn size(&self) -> u64 {
        self.offset
    }

    /// Consume the writer and hand back the underlying sink. Nothing is
    /// flushed here; that is left to the caller.
    pub fn into_inner(self) -> W {
        self.inner
    }
}

impl<W> BlobWriter for WriteBlobWriter<W>
where
    W: std::io::Write,
{
    /// Write a 4-byte length (native byte order) followed by `srcbuf`, and
    /// return the offset at which the length field was written.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidInput`, without writing anything, if
    /// `srcbuf` is too long for its length to fit in the 4-byte prefix.
    /// Errors from the underlying sink are passed through; the tracked
    /// offset is not advanced in that case.
    fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
        let len = srcbuf.len();
        // A plain `as u32` cast would silently truncate the length and
        // leave a corrupt blob behind, so refuse oversized input up front.
        if len > u32::MAX as usize {
            return Err(Error::new(
                std::io::ErrorKind::InvalidInput,
                format!(
                    "blob of {} bytes does not fit in the 4-byte length prefix",
                    len
                ),
            ));
        }

        let offset = self.offset;
        self.inner.write_all(&(len as u32).to_ne_bytes())?;
        self.inner.write_all(srcbuf)?;
        self.offset += 4 + len as u64;
        Ok(offset)
    }
}

View File

@@ -0,0 +1,176 @@
//!
//! Low-level Block-oriented I/O functions
//!
//!
//!
use crate::page_cache;
use crate::page_cache::{ReadBufResult, PAGE_SZ};
use lazy_static::lazy_static;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
use std::sync::atomic::AtomicU64;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
///
/// There are currently two implementations: EphemeralFile, and FileBlockReader
/// below.
pub trait BlockReader {
/// Handle to one page, as returned by `read_blk`. It dereferences to the
/// `PAGE_SZ` bytes of the page.
type BlockLease: Deref<Target = [u8; PAGE_SZ]> + 'static;
///
/// Read block number `blknum`. Returns a "lease" object that can be used
/// to access the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
///
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error>;
///
/// Create a new "cursor" for reading from this reader. The cursor
/// borrows the reader.
///
/// A cursor caches the last accessed page, allowing for faster
/// access if the same block is accessed repeatedly.
fn block_cursor(&self) -> BlockCursor<&Self>
where
Self: Sized,
{
BlockCursor::new(self)
}
}
/// A shared reference to a block reader is itself a block reader; every
/// call is forwarded to the referenced reader. This is what allows
/// `block_cursor()` to hand out a `BlockCursor<&Self>`.
impl<B> BlockReader for &B
where
    B: BlockReader,
{
    type BlockLease = B::BlockLease;

    fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
        // `self` is `&&B`; peel off one reference and call the
        // implementation of the underlying reader explicitly.
        B::read_blk(*self, blknum)
    }
}
///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///
/// A cursor caches the last accessed page, allowing for faster access if the
/// same block is accessed repeatedly.
///
/// You can access the last page with `*cursor`. 'read_blk' returns 'self', so
/// that in many cases you can use a BlockCursor as a drop-in replacement for
/// the underlying BlockReader. Note that `read_blk` takes `&mut self`, so the
/// cursor has to be bound mutably. For example:
///
/// ```no_run
/// # use pageserver::layered_repository::block_io::{BlockReader, FileBlockReader};
/// # let reader: FileBlockReader<std::fs::File> = todo!();
/// let mut cursor = reader.block_cursor();
/// let buf = cursor.read_blk(1)?;
/// // do stuff with 'buf'
/// let buf = cursor.read_blk(2)?;
/// // do stuff with 'buf'
/// # Ok::<(), std::io::Error>(())
/// ```
///
/// Dereferencing a cursor that holds no page (nothing has been read yet, or
/// the most recent `read_blk` call failed) panics.
///
pub struct BlockCursor<R>
where
R: BlockReader,
{
/// the reader that pages are fetched from
reader: R,
/// last accessed page: its block number and the lease on it
cache: Option<(u32, R::BlockLease)>,
}
impl<R> BlockCursor<R>
where
    R: BlockReader,
{
    /// Create a cursor over `reader`. No page is cached until the first
    /// call to [`read_blk`](BlockCursor::read_blk).
    pub fn new(reader: R) -> Self {
        Self {
            reader,
            cache: None,
        }
    }

    /// Make block `blknum` the cursor's current page and return the cursor
    /// itself, through which the page contents can be accessed.
    ///
    /// If `blknum` is the block that is already cached, the underlying
    /// reader is not consulted at all.
    ///
    /// # Errors
    ///
    /// Passes through the error of the underlying reader. After a failed
    /// read the cursor holds no page.
    pub fn read_blk(&mut self, blknum: u32) -> Result<&Self, std::io::Error> {
        let already_cached = self
            .cache
            .as_ref()
            .map_or(false, |(cached_blk, _lease)| *cached_blk == blknum);

        if !already_cached {
            // Let go of the old page before asking the reader for the new
            // one, then remember the new page.
            self.cache = None;
            let lease = self.reader.read_blk(blknum)?;
            self.cache = Some((blknum, lease));
        }
        Ok(self)
    }
}
/// Gives access to the bytes of the page the cursor currently holds.
impl<R> Deref for BlockCursor<R>
where
    R: BlockReader,
{
    type Target = [u8; PAGE_SZ];

    /// Borrow the contents of the cached page.
    ///
    /// # Panics
    ///
    /// Panics if the cursor holds no page, i.e. no block has been read
    /// through it yet or the most recent read failed.
    fn deref(&self) -> &<Self as Deref>::Target {
        let (_blknum, lease) = self
            .cache
            .as_ref()
            .expect("BlockCursor dereferenced before any block was successfully read");
        &**lease
    }
}
// Counter that hands out the `file_id` of each `FileBlockReader`. It starts
// at 1 and is incremented by `FileBlockReader::new`.
lazy_static! {
static ref NEXT_ID: AtomicU64 = AtomicU64::new(1);
}
/// An adapter for reading a (virtual) file using the page cache.
///
/// The file is assumed to be immutable. This doesn't provide any functions
/// for modifying the file, nor for invalidating the cache if it is modified.
pub struct FileBlockReader<F> {
/// The underlying file. It is public, so it can also be accessed directly.
pub file: F,
/// Unique ID of this file, used as key in the page cache. Taken from the
/// `NEXT_ID` counter when the reader is created.
file_id: u64,
}
impl<F> FileBlockReader<F>
where
    F: FileExt,
{
    /// Wrap `file` in a block reader, giving it a fresh ID from the
    /// `NEXT_ID` counter.
    pub fn new(file: F) -> Self {
        use std::sync::atomic::Ordering;

        Self {
            file,
            file_id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
        }
    }

    /// Read block `blkno` of the underlying file into `buf`.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is not exactly `PAGE_SZ` bytes long.
    ///
    /// # Errors
    ///
    /// Returns the error of `read_exact_at`, which includes the case where
    /// the file ends before the whole page could be read.
    fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
        assert!(buf.len() == PAGE_SZ);
        let start = u64::from(blkno) * PAGE_SZ as u64;
        self.file.read_exact_at(buf, start)
    }
}
impl<F> BlockReader for FileBlockReader<F>
where
    F: FileExt,
{
    type BlockLease = page_cache::PageReadGuard<'static>;

    /// Return block `blknum` of the file via the page cache, loading it
    /// from the file first if the cache does not have it.
    fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
        let cache = page_cache::get();
        loop {
            // Either the page is there and we are done, or we are handed a
            // buffer that we have to fill ourselves.
            let mut write_guard = match cache.read_immutable_buf(self.file_id, blknum) {
                ReadBufResult::Found(guard) => return Ok(guard),
                ReadBufResult::NotFound(write_guard) => write_guard,
            };

            // Read the page from disk into the buffer
            self.fill_buffer(write_guard.deref_mut(), blknum)?;
            write_guard.mark_valid();

            // The write guard goes out of scope here; go around again to
            // look the page up once more and swap it for a read guard.
        }
    }
}

View File

@@ -23,21 +23,27 @@
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//!
//!
//! A delta file is constructed using the 'bookfile' crate. Each file consists of three
//! parts: the 'index', the values, and a short summary header. They are stored as
//! separate chapters.
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
//! and it contains basic information about the layer, and offsets to the other
//! parts. The "index" is a serialized HashMap mapping from Key and LSN to an offset in the
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockCursor, BlockReader, FileBlockReader};
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
use crate::DELTA_FILE_MAGIC;
use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@@ -46,44 +52,43 @@ use zenith_utils::vec_map::VecMap;
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::fs;
use std::io::BufWriter;
use std::io::Write;
use std::io::{BufWriter, Write};
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
use bookfile::{Book, BookWriter, ChapterWriter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
/// Mapping from (key, lsn) -> page/WAL record
/// byte ranges in VALUES_CHAPTER
static INDEX_CHAPTER: u64 = 1;
/// Page/WAL bytes - cannot be interpreted
/// without the page versions from the INDEX_CHAPTER
static VALUES_CHAPTER: u64 = 2;
/// Contains the [`Summary`] struct
static SUMMARY_CHAPTER: u64 = 3;
/// Header of a delta layer file. It is serialized at offset 0 of the file
/// and read back from block 0 when the layer is loaded.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct Summary {
/// Magic value to identify this as a zenith delta file. Always DELTA_FILE_MAGIC.
magic: u16,
/// Storage format version; filled in with STORAGE_FORMAT_VERSION wherever
/// a Summary is constructed in this file.
format_version: u16,
tenantid: ZTenantId,
timelineid: ZTimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
}
impl From<&DeltaLayer> for Summary {
fn from(layer: &DeltaLayer) -> Self {
Self {
magic: DELTA_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenantid: layer.tenantid,
timelineid: layer.timelineid,
key_range: layer.key_range.clone(),
lsn_range: layer.lsn_range.clone(),
index_start_blk: 0,
}
}
}
@@ -118,7 +123,11 @@ pub struct DeltaLayerInner {
///
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
book: Option<Book<VirtualFile>>,
// values copied from summary
index_start_blk: u32,
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
}
impl Layer for DeltaLayer {
@@ -155,45 +164,28 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load()?;
let values_reader = inner
.book
.as_ref()
.expect("should be loaded in load call above")
.chapter_reader(VALUES_CHAPTER)?;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let mut reader = inner.file.as_ref().unwrap().block_cursor();
let slice = vec_map.slice_range(lsn_range);
let mut size = 0usize;
let mut first_pos = 0u64;
for (_entry_lsn, blob_ref) in slice.iter().rev() {
size += blob_ref.size();
first_pos = blob_ref.pos();
if blob_ref.will_init() {
break;
}
}
if size != 0 {
let mut buf = vec![0u8; size];
values_reader.read_exact_at(&mut buf, first_pos)?;
for (entry_lsn, blob_ref) in slice.iter().rev() {
let offs = (blob_ref.pos() - first_pos) as usize;
let val = Value::des(&buf[offs..offs + blob_ref.size()])?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
for (entry_lsn, blob_ref) in slice.iter().rev() {
let buf = reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
@@ -210,7 +202,7 @@ impl Layer for DeltaLayer {
}
}
fn iter(&self) -> Box<dyn Iterator<Item = anyhow::Result<(Key, Lsn, Value)>> + '_> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = anyhow::Result<(Key, Lsn, Value)>> + 'a> {
let inner = self.load().unwrap();
match DeltaValueIter::new(inner) {
@@ -281,20 +273,16 @@ impl Layer for DeltaLayer {
let inner = self.load()?;
let path = self.path();
let file = std::fs::File::open(&path)?;
let book = Book::new(file)?;
let chapter = book.chapter_reader(VALUES_CHAPTER)?;
let mut values: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
values.sort_by_key(|k| k.0);
let mut reader = inner.file.as_ref().unwrap().block_cursor();
for (key, versions) in values {
for (lsn, blob_ref) in versions.as_slice() {
let mut desc = String::new();
let mut buf = vec![0u8; blob_ref.size()];
match chapter.read_exact_at(&mut buf, blob_ref.pos()) {
Ok(()) => {
match reader.read_blob(blob_ref.pos()) {
Ok(buf) => {
let val = Value::des(&buf);
match val {
@@ -378,19 +366,19 @@ impl DeltaLayer {
let path = self.path();
// Open the file if it's not open already.
if inner.book.is_none() {
let file = VirtualFile::open(&path)?;
inner.book = Some(Book::new(file)?);
if inner.file.is_none() {
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
inner.file = Some(FileBlockReader::new(file));
}
let book = inner.book.as_ref().unwrap();
let file = inner.file.as_mut().unwrap();
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
match &self.path_or_conf {
PathOrConf::Conf(_) => {
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let actual_summary = Summary::des(&chapter)?;
let expected_summary = Summary::from(self);
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
}
@@ -409,8 +397,13 @@ impl DeltaLayer {
}
}
let chapter = book.read_chapter(INDEX_CHAPTER)?;
let index = HashMap::des(&chapter)?;
file.file.seek(SeekFrom::Start(
actual_summary.index_start_blk as u64 * PAGE_SZ as u64,
))?;
let mut buf_reader = std::io::BufReader::new(&mut file.file);
let index = HashMap::des_from(&mut buf_reader)?;
inner.index_start_blk = actual_summary.index_start_blk;
debug!("loaded from {}", &path.display());
@@ -434,8 +427,9 @@ impl DeltaLayer {
lsn_range: filename.lsn_range.clone(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
book: None,
index: HashMap::default(),
file: None,
index_start_blk: 0,
}),
}
}
@@ -443,12 +437,14 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'dump_layerfile' binary.
pub fn new_for_path<F>(path: &Path, book: &Book<F>) -> Result<Self>
pub fn new_for_path<F>(path: &Path, file: F) -> Result<Self>
where
F: FileExt,
{
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let summary = Summary::des(&chapter)?;
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;
Ok(DeltaLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
@@ -458,8 +454,9 @@ impl DeltaLayer {
lsn_range: summary.lsn_range,
inner: RwLock::new(DeltaLayerInner {
loaded: false,
book: None,
file: None,
index: HashMap::default(),
index_start_blk: 0,
}),
})
}
@@ -504,8 +501,7 @@ pub struct DeltaLayerWriter {
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
values_writer: ChapterWriter<BufWriter<VirtualFile>>,
end_offset: u64,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
}
impl DeltaLayerWriter {
@@ -531,13 +527,10 @@ impl DeltaLayerWriter {
u64::from(lsn_range.start),
u64::from(lsn_range.end)
));
let file = VirtualFile::create(&path)?;
let mut file = VirtualFile::create(&path)?;
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
let buf_writer = BufWriter::new(file);
let book = BookWriter::new(buf_writer, DELTA_FILE_MAGIC)?;
// Open the page-versions chapter for writing. The calls to
// `put_value` will use this to write the contents.
let values_writer = book.new_chapter(VALUES_CHAPTER);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
Ok(DeltaLayerWriter {
conf,
@@ -547,8 +540,7 @@ impl DeltaLayerWriter {
key_start,
lsn_range,
index: HashMap::new(),
values_writer,
end_offset: 0,
blob_writer,
})
}
@@ -558,17 +550,12 @@ impl DeltaLayerWriter {
/// The values must be appended in key, lsn order.
///
pub fn put_value(&mut self, key: Key, lsn: Lsn, val: Value) -> Result<()> {
//info!("DELTA: key {} at {} on {}", key, lsn, self.path.display());
assert!(self.lsn_range.start <= lsn);
// Remember the offset and size metadata. The metadata is written
// to a separate chapter, in `finish`.
let off = self.end_offset;
let buf = Value::ser(&val)?;
let len = buf.len();
self.values_writer.write_all(&buf)?;
self.end_offset += len as u64;
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let vec_map = self.index.entry(key).or_default();
let blob_ref = BlobRef::new(off, len, val.will_init());
let blob_ref = BlobRef::new(off, val.will_init());
let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
@@ -583,38 +570,40 @@ impl DeltaLayerWriter {
}
pub fn size(&self) -> u64 {
self.end_offset
self.blob_writer.size()
}
///
/// Finish writing the delta layer.
///
pub fn finish(self, key_end: Key) -> anyhow::Result<DeltaLayer> {
// Close the values chapter
let book = self.values_writer.close()?;
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let buf_writer = self.blob_writer.into_inner();
let mut file = buf_writer.into_inner()?;
// Write out the index
let mut chapter = book.new_chapter(INDEX_CHAPTER);
let buf = HashMap::ser(&self.index)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
file.write_all(&buf)?;
let mut chapter = book.new_chapter(SUMMARY_CHAPTER);
// Fill in the summary on blk 0
let summary = Summary {
magic: DELTA_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenantid: self.tenantid,
timelineid: self.timelineid,
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
index_start_blk,
};
Summary::ser_into(&summary, &mut chapter)?;
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
book.close()?;
file.seek(SeekFrom::Start(0))?;
Summary::ser_into(&summary, &mut file)?;
// Note: Because we opened the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.book here. The first read will have to re-open it.
// set inner.file here. The first read will have to re-open it.
let layer = DeltaLayer {
path_or_conf: PathOrConf::Conf(self.conf),
tenantid: self.tenantid,
@@ -624,7 +613,8 @@ impl DeltaLayerWriter {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
index: HashMap::new(),
book: None,
file: None,
index_start_blk,
}),
};
@@ -647,22 +637,6 @@ impl DeltaLayerWriter {
Ok(layer)
}
pub fn abort(self) {
match self.values_writer.close() {
Ok(book) => {
if let Err(err) = book.close() {
error!("error while closing delta layer file: {}", err);
}
}
Err(err) => {
error!("error while closing chapter writer: {}", err);
}
}
if let Err(err) = std::fs::remove_file(self.path) {
error!("error removing unfinished delta layer file: {}", err);
}
}
}
///
@@ -672,13 +646,23 @@ impl DeltaLayerWriter {
/// That takes up quite a lot of memory. Should do this in a more streaming
/// fashion.
///
struct DeltaValueIter {
struct DeltaValueIter<'a> {
all_offsets: Vec<(Key, Lsn, BlobRef)>,
next_idx: usize,
data: Vec<u8>,
reader: BlockCursor<Adapter<'a>>,
}
impl Iterator for DeltaValueIter {
/// Wraps a read guard on `DeltaLayerInner` so that it can serve as the
/// `BlockReader` behind a `BlockCursor`.
struct Adapter<'a>(RwLockReadGuard<'a, DeltaLayerInner>);

impl<'a> BlockReader for Adapter<'a> {
    type BlockLease = PageReadGuard<'static>;

    /// Forward the read to the layer's file reader.
    ///
    /// Panics if the layer's `file` is `None`.
    fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
        let Adapter(inner) = self;
        let reader = inner.file.as_ref().unwrap();
        reader.read_blk(blknum)
    }
}
impl<'a> Iterator for DeltaValueIter<'a> {
type Item = Result<(Key, Lsn, Value)>;
fn next(&mut self) -> Option<Self::Item> {
@@ -686,8 +670,8 @@ impl Iterator for DeltaValueIter {
}
}
impl DeltaValueIter {
fn new(inner: RwLockReadGuard<DeltaLayerInner>) -> Result<Self> {
impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
index.sort_by_key(|x| x.0);
@@ -698,30 +682,24 @@ impl DeltaValueIter {
}
}
let values_reader = inner
.book
.as_ref()
.expect("should be loaded in load call above")
.chapter_reader(VALUES_CHAPTER)?;
let file_size = values_reader.len() as usize;
let mut layer = DeltaValueIter {
let iter = DeltaValueIter {
all_offsets,
next_idx: 0,
data: vec![0u8; file_size],
reader: BlockCursor::new(Adapter(inner)),
};
values_reader.read_exact_at(&mut layer.data, 0)?;
Ok(layer)
Ok(iter)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if self.next_idx < self.all_offsets.len() {
let (key, lsn, blob_ref) = self.all_offsets[self.next_idx];
let offs = blob_ref.pos() as usize;
let size = blob_ref.size();
let val = Value::des(&self.data[offs..offs + size])?;
let (key, lsn, off) = &self.all_offsets[self.next_idx];
//let mut reader = BlobReader::new(self.inner.file.as_ref().unwrap());
let buf = self.reader.read_blob(off.pos())?;
let val = Value::des(&buf)?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
Ok(Some((*key, *lsn, val)))
} else {
Ok(None)
}

View File

@@ -2,6 +2,8 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::BlobWriter;
use crate::layered_repository::block_io::BlockReader;
use crate::page_cache;
use crate::page_cache::PAGE_SZ;
use crate::page_cache::{ReadBufResult, WriteBufResult};
@@ -10,7 +12,7 @@ use lazy_static::lazy_static;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{Error, ErrorKind, Seek, SeekFrom, Write};
use std::io::{Error, ErrorKind};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
@@ -41,7 +43,7 @@ pub struct EphemeralFile {
_timelineid: ZTimelineId,
file: Arc<VirtualFile>,
pos: u64,
size: u64,
}
impl EphemeralFile {
@@ -70,11 +72,11 @@ impl EphemeralFile {
_tenantid: tenantid,
_timelineid: timelineid,
file: file_rc,
pos: 0,
size: 0,
})
}
pub fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> {
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> {
let mut off = 0;
while off < PAGE_SZ {
let n = self
@@ -93,6 +95,26 @@ impl EphemeralFile {
}
Ok(())
}
/// Get a write guard on the page-cache buffer of block `blkno` of this
/// file, already marked dirty. If the page is not in the cache yet, it is
/// first loaded with `fill_buffer`.
fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, Error> {
    let cache = page_cache::get();

    let mut page = match cache.write_ephemeral_buf(self.file_id, blkno) {
        // Already cached: modify it in place.
        WriteBufResult::Found(cached) => cached,
        // Not cached: bring the current contents in before handing it out.
        WriteBufResult::NotFound(mut fresh) => {
            // TODO: if we're overwriting the whole page, no need to read it in first
            self.fill_buffer(fresh.deref_mut(), blkno)?;
            fresh.mark_valid();
            fresh
        }
    };

    page.mark_dirty();
    Ok(page)
}
}
/// Does the given filename look like an ephemeral file?
@@ -167,48 +189,49 @@ impl FileExt for EphemeralFile {
}
}
impl Write for EphemeralFile {
fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
let n = self.write_at(buf, self.pos)?;
self.pos += n as u64;
Ok(n)
}
impl BlobWriter for EphemeralFile {
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
let pos = self.size;
fn flush(&mut self) -> Result<(), std::io::Error> {
// we don't need to flush data:
// * we either write input bytes or not, not keeping any intermediate data buffered
// * rust unix file `flush` impl does not flush things either, returning `Ok(())`
Ok(())
}
}
let mut blknum = (self.size / PAGE_SZ as u64) as u32;
let mut off = (pos % PAGE_SZ as u64) as usize;
impl Seek for EphemeralFile {
fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match pos {
SeekFrom::Start(offset) => {
self.pos = offset;
}
SeekFrom::End(_offset) => {
return Err(Error::new(
ErrorKind::Other,
"SeekFrom::End not supported by EphemeralFile",
));
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
let mut buf = self.get_buf_for_write(blknum)?;
// Write the length field
let len_buf = u32::to_ne_bytes(srcbuf.len() as u32);
let thislen = PAGE_SZ - off;
if thislen < 4 {
// it needs to be split across pages
buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]);
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]);
off = 4 - thislen;
} else {
buf[off..off + 4].copy_from_slice(&len_buf);
off += 4;
}
Ok(self.pos)
// Write the payload
let mut buf_remain = srcbuf;
while !buf_remain.is_empty() {
let mut page_remain = PAGE_SZ - off;
if page_remain == 0 {
blknum += 1;
buf = self.get_buf_for_write(blknum)?;
off = 0;
page_remain = PAGE_SZ;
}
let this_blk_len = min(page_remain, buf_remain.len());
buf[off..(off + this_blk_len)].copy_from_slice(&buf_remain[..this_blk_len]);
off += this_blk_len;
buf_remain = &buf_remain[this_blk_len..];
}
drop(buf);
self.size += 4 + srcbuf.len() as u64;
Ok(pos)
}
}
@@ -239,11 +262,34 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er
}
}
impl BlockReader for EphemeralFile {
    type BlockLease = page_cache::PageReadGuard<'static>;

    /// Return block `blknum` of this ephemeral file via the page cache,
    /// loading it with `fill_buffer` first if the cache does not have it.
    fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
        let cache = page_cache::get();
        loop {
            // Either the page is there and we are done, or we are handed a
            // buffer that we have to fill ourselves.
            let mut write_guard = match cache.read_ephemeral_buf(self.file_id, blknum) {
                ReadBufResult::Found(guard) => return Ok(guard),
                ReadBufResult::NotFound(write_guard) => write_guard,
            };

            // Read the page from disk into the buffer
            self.fill_buffer(write_guard.deref_mut(), blknum)?;
            write_guard.mark_valid();

            // The write guard goes out of scope here; go around again to
            // look the page up once more and swap it for a read guard.
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use rand::seq::SliceRandom;
use rand::thread_rng;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockCursor;
use rand::{seq::SliceRandom, thread_rng, RngCore};
use std::fs;
use std::str::FromStr;
@@ -281,19 +327,19 @@ mod tests {
fn test_ephemeral_files() -> Result<(), Error> {
let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?;
let mut file_a = EphemeralFile::create(conf, tenantid, timelineid)?;
let file_a = EphemeralFile::create(conf, tenantid, timelineid)?;
file_a.write_all(b"foo")?;
file_a.write_all_at(b"foo", 0)?;
assert_eq!("foo", read_string(&file_a, 0, 20)?);
file_a.write_all(b"bar")?;
file_a.write_all_at(b"bar", 3)?;
assert_eq!("foobar", read_string(&file_a, 0, 20)?);
// Open a lot of files, enough to cause some page evictions.
let mut efiles = Vec::new();
for fileno in 0..100 {
let mut efile = EphemeralFile::create(conf, tenantid, timelineid)?;
efile.write_all(format!("file {}", fileno).as_bytes())?;
let efile = EphemeralFile::create(conf, tenantid, timelineid)?;
efile.write_all_at(format!("file {}", fileno).as_bytes(), 0)?;
assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?);
efiles.push((fileno, efile));
}
@@ -307,4 +353,41 @@ mod tests {
Ok(())
}
/// Round-trips blobs through an `EphemeralFile`: two tiny blobs, ten thousand
/// small blobs read back through one shared cursor, and one large blob that
/// spans multiple pages.
#[test]
fn test_ephemeral_blobs() -> Result<(), Error> {
    let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?;

    let mut file = EphemeralFile::create(conf, tenantid, timelineid)?;

    // A freshly written blob is readable, and stays readable after the next write.
    let pos_foo = file.write_blob(b"foo")?;
    assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice());
    let pos_bar = file.write_blob(b"bar")?;
    assert_eq!(b"foo", file.block_cursor().read_blob(pos_foo)?.as_slice());
    assert_eq!(b"bar", file.block_cursor().read_blob(pos_bar)?.as_slice());

    // Write many small blobs, remembering where each one landed.
    let mut written = Vec::new();
    for i in 0..10000 {
        let payload = format!("blob{}", i).into_bytes();
        let pos = file.write_blob(&payload)?;
        written.push((pos, payload));
    }

    // Read them all back through a single cursor. The cursor lives in its own
    // scope so that it is gone before the file is written to again.
    {
        let mut cursor = BlockCursor::new(&file);
        for (pos, expected) in written {
            let actual = cursor.read_blob(pos)?;
            assert_eq!(actual, expected);
        }
    }

    // Test a large blob that spans multiple pages
    let mut large_data = vec![0u8; 20000];
    thread_rng().fill_bytes(&mut large_data);
    let pos_large = file.write_blob(&large_data)?;
    let result = file.block_cursor().read_blob(pos_large)?;
    assert_eq!(result, large_data);

    Ok(())
}
}

View File

@@ -13,63 +13,70 @@
//!
//! 000000067F000032BE0000400000000070B6-000000067F000032BE0000400000000080B6__00000000346BC568
//!
//! An image file is constructed using the 'bookfile' crate.
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a serialized
//! HashMap, mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
//!
//! Only metadata is loaded into memory by the load function.
//! Only the "index" is loaded into memory by the load function.
//! When images are needed, they are read directly from disk.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockReader, FileBlockReader};
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::virtual_file::VirtualFile;
use crate::IMAGE_FILE_MAGIC;
use crate::{ZTenantId, ZTimelineId};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::io::{BufWriter, Write};
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, TryLockError};
use bookfile::{Book, BookWriter, ChapterWriter};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
/// Mapping from (key, lsn) -> page/WAL record
/// byte ranges in VALUES_CHAPTER
static INDEX_CHAPTER: u64 = 1;
/// Contains each block in block # order
const VALUES_CHAPTER: u64 = 2;
/// Contains the [`Summary`] struct
const SUMMARY_CHAPTER: u64 = 3;
/// Header of an image layer file.
///
/// `ImageLayerWriter::finish` serializes it at offset 0 of the file, and the
/// load path deserializes it again from the first block with `des_prefix`.
// NOTE(review): the serialized layout presumably follows the field order
// below, so reordering fields would change the on-disk format — confirm.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct Summary {
/// Magic value to identify this as a zenith image file. Always IMAGE_FILE_MAGIC.
magic: u16,
/// Storage format version; filled in with STORAGE_FORMAT_VERSION when the summary is built.
format_version: u16,
/// Tenant id, copied from the layer.
tenantid: ZTenantId,
/// Timeline id, copied from the layer.
timelineid: ZTimelineId,
/// Key range, copied from the layer.
key_range: Range<Key>,
/// LSN, copied from the layer.
lsn: Lsn,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
}
impl From<&ImageLayer> for Summary {
fn from(layer: &ImageLayer) -> Self {
Self {
magic: IMAGE_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenantid: layer.tenantid,
timelineid: layer.timelineid,
key_range: layer.key_range.clone(),
lsn: layer.lsn,
index_start_blk: 0,
}
}
}
@@ -97,12 +104,14 @@ pub struct ImageLayerInner {
/// If false, the 'index' has not been loaded into memory yet.
loaded: bool,
/// The underlying (virtual) file handle. None if the layer hasn't been loaded
/// yet.
book: Option<Book<VirtualFile>>,
/// offset of each value
index: HashMap<Key, BlobRef>,
// values copied from summary
index_start_blk: u32,
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
}
impl Layer for ImageLayer {
@@ -138,26 +147,21 @@ impl Layer for ImageLayer {
assert!(lsn_range.end >= self.lsn);
let inner = self.load()?;
if let Some(blob_ref) = inner.index.get(&key) {
let chapter = inner
.book
let buf = inner
.file
.as_ref()
.unwrap()
.chapter_reader(VALUES_CHAPTER)?;
let mut blob = vec![0; blob_ref.size()];
chapter
.read_exact_at(&mut blob, blob_ref.pos())
.block_cursor()
.read_blob(blob_ref.pos())
.with_context(|| {
format!(
"failed to read {} bytes from data file {} at offset {}",
blob_ref.size(),
"failed to read blob from data file {} at offset {}",
self.filename().display(),
blob_ref.pos()
)
})?;
let value = Bytes::from(blob);
let value = Bytes::from(buf);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
@@ -228,12 +232,7 @@ impl Layer for ImageLayer {
index_vec.sort_by_key(|x| x.1.pos());
for (key, blob_ref) in index_vec {
println!(
"key: {} size {} offset {}",
key,
blob_ref.size(),
blob_ref.pos()
);
println!("key: {} offset {}", key, blob_ref.pos());
}
Ok(())
@@ -291,21 +290,19 @@ impl ImageLayer {
let path = self.path();
// Open the file if it's not open already.
if inner.book.is_none() {
if inner.file.is_none() {
let file = VirtualFile::open(&path)
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
inner.book = Some(Book::new(file).with_context(|| {
format!("Failed to open file '{}' as a bookfile", path.display())
})?);
inner.file = Some(FileBlockReader::new(file));
}
let book = inner.book.as_ref().unwrap();
let file = inner.file.as_mut().unwrap();
let summary_blk = file.read_blk(0)?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
match &self.path_or_conf {
PathOrConf::Conf(_) => {
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let actual_summary = Summary::des(&chapter)?;
let expected_summary = Summary::from(self);
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
@@ -325,14 +322,18 @@ impl ImageLayer {
}
}
let chapter = book.read_chapter(INDEX_CHAPTER)?;
let index = HashMap::des(&chapter)?;
file.file.seek(SeekFrom::Start(
actual_summary.index_start_blk as u64 * PAGE_SZ as u64,
))?;
let mut buf_reader = std::io::BufReader::new(&mut file.file);
let index = HashMap::des_from(&mut buf_reader)?;
inner.index_start_blk = actual_summary.index_start_blk;
info!("loaded from {}", &path.display());
inner.index = index;
inner.loaded = true;
Ok(())
}
@@ -350,9 +351,10 @@ impl ImageLayer {
key_range: filename.key_range.clone(),
lsn: filename.lsn,
inner: RwLock::new(ImageLayerInner {
book: None,
index: HashMap::new(),
loaded: false,
file: None,
index_start_blk: 0,
}),
}
}
@@ -360,12 +362,14 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'dump_layerfile' binary.
pub fn new_for_path<F>(path: &Path, book: &Book<F>) -> Result<ImageLayer>
pub fn new_for_path<F>(path: &Path, file: F) -> Result<ImageLayer>
where
F: std::os::unix::prelude::FileExt,
{
let chapter = book.read_chapter(SUMMARY_CHAPTER)?;
let summary = Summary::des(&chapter)?;
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
file.read_exact_at(&mut summary_buf, 0)?;
let summary = Summary::des_prefix(&summary_buf)?;
Ok(ImageLayer {
path_or_conf: PathOrConf::Path(path.to_path_buf()),
@@ -374,9 +378,10 @@ impl ImageLayer {
key_range: summary.key_range,
lsn: summary.lsn,
inner: RwLock::new(ImageLayerInner {
book: None,
file: None,
index: HashMap::new(),
loaded: false,
index_start_blk: 0,
}),
})
}
@@ -412,18 +417,15 @@ impl ImageLayer {
///
pub struct ImageLayerWriter {
conf: &'static PageServerConf,
path: PathBuf,
_path: PathBuf,
timelineid: ZTimelineId,
tenantid: ZTenantId,
key_range: Range<Key>,
lsn: Lsn,
values_writer: Option<ChapterWriter<BufWriter<VirtualFile>>>,
end_offset: u64,
index: HashMap<Key, BlobRef>,
finished: bool,
blob_writer: WriteBlobWriter<VirtualFile>,
}
impl ImageLayerWriter {
@@ -449,24 +451,17 @@ impl ImageLayerWriter {
);
info!("new image layer {}", path.display());
let file = VirtualFile::create(&path)?;
let buf_writer = BufWriter::new(file);
let book = BookWriter::new(buf_writer, IMAGE_FILE_MAGIC)?;
// Open the page-images chapter for writing. The calls to
// `put_image` will use this to write the contents.
let chapter = book.new_chapter(VALUES_CHAPTER);
let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64);
let writer = ImageLayerWriter {
conf,
path,
_path: path,
timelineid,
tenantid,
key_range: key_range.clone(),
lsn,
values_writer: Some(chapter),
index: HashMap::new(),
end_offset: 0,
finished: false,
blob_writer,
};
Ok(writer)
@@ -479,49 +474,41 @@ impl ImageLayerWriter {
///
pub fn put_image(&mut self, key: Key, img: &[u8]) -> Result<()> {
ensure!(self.key_range.contains(&key));
let off = self.end_offset;
let off = self.blob_writer.write_blob(img)?;
if let Some(writer) = &mut self.values_writer {
let len = img.len();
writer.write_all(img)?;
self.end_offset += len as u64;
let old = self.index.insert(key, BlobRef::new(off, len, true));
assert!(old.is_none());
} else {
panic!()
}
let old = self.index.insert(key, BlobRef::new(off, true));
assert!(old.is_none());
Ok(())
}
pub fn finish(&mut self) -> anyhow::Result<ImageLayer> {
// Close the values chapter
let book = self.values_writer.take().unwrap().close()?;
pub fn finish(self) -> anyhow::Result<ImageLayer> {
let index_start_blk =
((self.blob_writer.size() + PAGE_SZ as u64 - 1) / PAGE_SZ as u64) as u32;
let mut file = self.blob_writer.into_inner();
// Write out the index
let mut chapter = book.new_chapter(INDEX_CHAPTER);
let buf = HashMap::ser(&self.index)?;
chapter.write_all(&buf)?;
let book = chapter.close()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
file.write_all(&buf)?;
// Write out the summary chapter
let mut chapter = book.new_chapter(SUMMARY_CHAPTER);
// Fill in the summary on blk 0
let summary = Summary {
magic: IMAGE_FILE_MAGIC,
format_version: STORAGE_FORMAT_VERSION,
tenantid: self.tenantid,
timelineid: self.timelineid,
key_range: self.key_range.clone(),
lsn: self.lsn,
index_start_blk,
};
Summary::ser_into(&summary, &mut chapter)?;
let book = chapter.close()?;
// This flushes the underlying 'buf_writer'.
book.close()?;
file.seek(SeekFrom::Start(0))?;
Summary::ser_into(&summary, &mut file)?;
// Note: Because we open the file in write-only mode, we cannot
// reuse the same VirtualFile for reading later. That's why we don't
// set inner.book here. The first read will have to re-open it.
// set inner.file here. The first read will have to re-open it.
let layer = ImageLayer {
path_or_conf: PathOrConf::Conf(self.conf),
timelineid: self.timelineid,
@@ -529,28 +516,14 @@ impl ImageLayerWriter {
key_range: self.key_range.clone(),
lsn: self.lsn,
inner: RwLock::new(ImageLayerInner {
book: None,
loaded: false,
index: HashMap::new(),
file: None,
index_start_blk,
}),
};
trace!("created image layer {}", layer.path().display());
self.finished = true;
Ok(layer)
}
}
/// Best-effort cleanup for a writer that is dropped.
impl Drop for ImageLayerWriter {
    fn drop(&mut self) {
        // If the values chapter is still open, close it and then the book.
        // Errors are deliberately ignored: nothing can be reported from drop.
        let closed_book = self
            .values_writer
            .take()
            .and_then(|page_image_writer| page_image_writer.close().ok());
        if let Some(book) = closed_book {
            let _ = book.close();
        }

        // A writer that never finished leaves no file behind.
        if !self.finished {
            let _ = fs::remove_file(&self.path);
        }
    }
}

View File

@@ -5,10 +5,12 @@
//! its position in the file, is kept in memory, though.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter};
use crate::layered_repository::block_io::BlockReader;
use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::layered_repository::ephemeral_file::EphemeralFile;
use crate::layered_repository::storage_layer::{
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::repository::{Key, Value};
use crate::walrecord;
@@ -19,9 +21,7 @@ use std::collections::HashMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::io::Write;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::RwLock;
use zenith_utils::bin_ser::BeSer;
@@ -54,14 +54,12 @@ pub struct InMemoryLayerInner {
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
///
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
index: HashMap<Key, VecMap<Lsn, u64>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
end_offset: u64,
}
impl InMemoryLayerInner {
@@ -120,10 +118,12 @@ impl Layer for InMemoryLayer {
let inner = self.inner.read().unwrap();
let mut reader = inner.file.block_cursor();
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, blob_ref) in slice.iter().rev() {
for (entry_lsn, pos) in slice.iter().rev() {
match &reconstruct_state.img {
Some((cached_lsn, _)) if entry_lsn <= cached_lsn => {
return Ok(ValueReconstructResult::Complete)
@@ -131,8 +131,7 @@ impl Layer for InMemoryLayer {
_ => {}
}
let mut buf = vec![0u8; blob_ref.size()];
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
let buf = reader.read_blob(*pos)?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
@@ -208,12 +207,12 @@ impl Layer for InMemoryLayer {
return Ok(());
}
let mut cursor = inner.file.block_cursor();
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, blob_ref) in vec_map.as_slice() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
buf.resize(blob_ref.size(), 0);
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
cursor.read_blob_into_buf(*pos, &mut buf)?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
@@ -268,7 +267,6 @@ impl InMemoryLayer {
end_lsn: None,
index: HashMap::new(),
file,
end_offset: 0,
}),
})
}
@@ -283,15 +281,10 @@ impl InMemoryLayer {
inner.assert_writeable();
let off = inner.end_offset;
let buf = Value::ser(&val)?;
let len = buf.len();
inner.file.write_all(&buf)?;
inner.end_offset += len as u64;
let off = inner.file.write_blob(&Value::ser(&val)?)?;
let vec_map = inner.index.entry(key).or_default();
let blob_ref = BlobRef::new(off, len, val.will_init());
let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0;
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
@@ -345,21 +338,21 @@ impl InMemoryLayer {
self.start_lsn..inner.end_lsn.unwrap(),
)?;
let mut do_steps = || -> Result<()> {
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, blob_ref) in vec_map.as_slice() {
let mut buf = vec![0u8; blob_ref.size()];
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(*key, *lsn, val)?;
}
let mut buf = Vec::new();
let mut cursor = inner.file.block_cursor();
let mut keys: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
keys.sort_by_key(|k| k.0);
for (key, vec_map) in keys.iter() {
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf)?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(key, *lsn, val)?;
}
Ok(())
};
if let Err(err) = do_steps() {
delta_layer_writer.abort();
return Err(err);
}
let delta_layer = delta_layer_writer.finish(Key::MAX)?;

View File

@@ -150,9 +150,10 @@ pub trait Layer: Send + Sync {
const WILL_INIT: u64 = 1;
///
/// Struct representing reference to BLOB in layers. Reference contains BLOB offset and size.
/// For WAL records (delta layer) it also contains `will_init` flag which helps to determine range of records
/// which needs to be applied without reading/deserializing records themselves.
/// Struct representing reference to BLOB in layers. Reference contains BLOB
/// offset, and for WAL records it also contains `will_init` flag. The flag
/// helps to determine the range of records that needs to be applied, without
/// reading/deserializing records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(u64);
@@ -163,15 +164,11 @@ impl BlobRef {
}
pub fn pos(&self) -> u64 {
self.0 >> 32
self.0 >> 1
}
pub fn size(&self) -> usize {
((self.0 & 0xFFFFFFFF) >> 1) as usize
}
pub fn new(pos: u64, size: usize, will_init: bool) -> BlobRef {
let mut blob_ref = (pos << 32) | ((size as u64) << 1);
pub fn new(pos: u64, will_init: bool) -> BlobRef {
let mut blob_ref = pos << 1;
if will_init {
blob_ref |= WILL_INIT;
}

View File

@@ -38,11 +38,11 @@ use pgdatadir_mapping::DatadirTimeline;
/// This is embedded in the metadata file, and also in the header of all the
/// layer files. If you make any backwards-incompatible changes to the storage
/// format, bump this!
pub const STORAGE_FORMAT_VERSION: u16 = 1;
pub const STORAGE_FORMAT_VERSION: u16 = 2;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u32 = 0x5A60_0000 | STORAGE_FORMAT_VERSION as u32;
pub const DELTA_FILE_MAGIC: u32 = 0x5A61_0000 | STORAGE_FORMAT_VERSION as u32;
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;
lazy_static! {
static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!(

View File

@@ -56,7 +56,7 @@ use crate::layered_repository::writeback_ephemeral_file;
use crate::repository::Key;
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 10;
const TEST_PAGE_CACHE_SIZE: usize = 50;
///
/// Initialize the page cache. This must be called once at page server startup.
@@ -90,6 +90,7 @@ const MAX_USAGE_COUNT: u8 = 5;
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
MaterializedPage {
hash_key: MaterializedPageHashKey,
@@ -99,6 +100,10 @@ enum CacheKey {
file_id: u64,
blkno: u32,
},
ImmutableFilePage {
file_id: u64,
blkno: u32,
},
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
@@ -173,6 +178,8 @@ pub struct PageCache {
ephemeral_page_map: RwLock<HashMap<(u64, u32), usize>>,
immutable_page_map: RwLock<HashMap<(u64, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -195,6 +202,12 @@ impl std::ops::Deref for PageReadGuard<'_> {
}
}
/// Lets a read guard be borrowed as a fixed-size array of `PAGE_SZ` bytes.
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// Returns the `buf` held by the guard's inner value (field `0`).
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.0.buf
}
}
///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
@@ -226,6 +239,12 @@ impl std::ops::Deref for PageWriteGuard<'_> {
}
}
/// Lets a write guard be borrowed mutably as a fixed-size array of `PAGE_SZ` bytes.
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
/// Returns the `buf` held by the guard's `inner` field.
// NOTE(review): this does not call `mark_valid`; presumably the caller is
// still expected to do that after filling the buffer — confirm.
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
}
}
impl PageWriteGuard<'_> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
@@ -381,6 +400,36 @@ impl PageCache {
}
}
// Section 1.3: Public interface functions for working with immutable file pages.
/// Look up block `blkno` of the immutable file `file_id` in the page cache.
///
/// Builds an `ImmutableFilePage` cache key and returns whatever
/// `lock_for_read` yields for it.
pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult {
    // `lock_for_read` takes the key by mutable reference, so it needs a
    // mutable local.
    let mut key = CacheKey::ImmutableFilePage { file_id, blkno };
    self.lock_for_read(&mut key)
}
/// Immediately drop all buffers belonging to given file, without writeback
///
/// Visits every slot under its write lock. A slot whose key is an
/// `ImmutableFilePage` of `drop_file_id` has its mapping removed, its key
/// cleared and its dirty flag reset; all other slots are left alone.
pub fn drop_buffers_for_immutable(&self, drop_file_id: u64) {
    for slot in self.slots.iter() {
        let mut inner = slot.inner.write().unwrap();

        let cached_for_file = matches!(
            &inner.key,
            Some(CacheKey::ImmutableFilePage { file_id, .. }) if *file_id == drop_file_id
        );
        if !cached_for_file {
            continue;
        }

        // remove mapping for old buffer
        if let Some(key) = &inner.key {
            self.remove_mapping(key);
        }
        inner.key = None;
        inner.dirty = false;
    }
}
//
// Section 2: Internal interface functions for lookup/update.
//
@@ -578,6 +627,10 @@ impl PageCache {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -601,6 +654,10 @@ impl PageCache {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
@@ -632,6 +689,11 @@ impl PageCache {
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
}
}
}
@@ -672,6 +734,16 @@ impl PageCache {
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
None
}
}
}
}
}
@@ -749,6 +821,13 @@ impl PageCache {
CacheKey::EphemeralPage { file_id, blkno } => {
writeback_ephemeral_file(*file_id, *blkno, buf)
}
CacheKey::ImmutableFilePage {
file_id: _,
blkno: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty immutable page",
)),
}
}
@@ -779,6 +858,7 @@ impl PageCache {
Self {
materialized_page_map: Default::default(),
ephemeral_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
}

View File

@@ -65,6 +65,7 @@ lazy_static! {
/// currently open, the 'handle' can still point to the slot where it was last kept. The
/// 'tag' field is used to detect whether the handle still is valid or not.
///
#[derive(Debug)]
pub struct VirtualFile {
/// Lazy handle to the global file descriptor cache. The slot that this points to
/// might contain our File, or it may be empty, or it may contain a File that
@@ -88,7 +89,7 @@ pub struct VirtualFile {
timelineid: String,
}
#[derive(PartialEq, Clone, Copy)]
#[derive(Debug, PartialEq, Clone, Copy)]
struct SlotHandle {
/// Index into OPEN_FILES.slots
index: usize,