Use B-tree for the index in image and delta layers.

We now access the index through the page cache, instead of slurping the whole
index into memory.

Fixes https://github.com/zenithdb/zenith/issues/1356

This is a backwards-incompatible change to the storage format, so
bump STORAGE_FORMAT_VERSION.
Author: Heikki Linnakangas
Date: 2022-04-07 20:50:16 +03:00
Parent: c4b57e4b8f
Commit: 214567bf8f
12 changed files with 3287 additions and 225 deletions

Cargo.lock generated
View File

@@ -1499,6 +1499,7 @@ dependencies = [
"daemonize",
"fail",
"futures",
"hex",
"hex-literal",
"humantime",
"hyper",

View File

@@ -10,6 +10,7 @@ regex = "1.4.5"
bytes = { version = "1.0.1", features = ['serde'] }
byteorder = "1.4.3"
futures = "0.3.13"
hex = "0.4.3"
hyper = "0.14"
itertools = "0.10.3"
lazy_static = "1.4.0"

View File

@@ -58,6 +58,7 @@ use zenith_utils::seqwait::SeqWait;
mod blob_io;
pub mod block_io;
mod delta_layer;
mod disk_btree;
pub(crate) mod ephemeral_file;
mod filename;
mod image_layer;
@@ -1602,15 +1603,6 @@ impl LayeredTimeline {
debug!("Could not compact because no partitioning specified yet");
}
// Call unload() on all frozen layers, to release memory.
// This shouldn't be much memory, as only metadata is slurped
// into memory.
let layers = self.layers.lock().unwrap();
for layer in layers.iter_historic_layers() {
layer.unload()?;
}
drop(layers);
Ok(())
}

View File

@@ -4,6 +4,7 @@
use crate::page_cache;
use crate::page_cache::{ReadBufResult, PAGE_SZ};
use bytes::Bytes;
use lazy_static::lazy_static;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
@@ -172,3 +173,47 @@ where
}
}
}
///
/// Trait for block-oriented output
///
pub trait BlockWriter {
///
/// Write a page to the underlying storage.
///
/// 'buf' must be of size PAGE_SZ. Returns the block number the page was
/// written to.
///
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error>;
}
///
/// A simple in-memory buffer of blocks.
///
pub struct BlockBuf {
pub blocks: Vec<Bytes>,
}
impl BlockWriter for BlockBuf {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
let blknum = self.blocks.len();
self.blocks.push(buf);
tracing::info!("buffered block {}", blknum);
Ok(blknum as u32)
}
}
impl BlockBuf {
pub fn new() -> Self {
BlockBuf { blocks: Vec::new() }
}
pub fn size(&self) -> u64 {
(self.blocks.len() * PAGE_SZ) as u64
}
}
impl Default for BlockBuf {
fn default() -> Self {
Self::new()
}
}
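
Editor's sketch (not part of the commit) of how BlockBuf is meant to be used: the delta and image layer writers later in this diff feed a DiskBtreeBuilder with a BlockBuf, and finish() hands back the root block number plus the buffered pages, which the writer appends to the layer file and records in the Summary header. The key width 26 below stands in for DELTA_KEY_SIZE, and the paths assume the code lives under the layered_repository module, as in this commit.

use crate::layered_repository::block_io::BlockBuf;
use crate::layered_repository::disk_btree::DiskBtreeBuilder;
use anyhow::Result;

/// Build a B-tree index over (key, blob offset) pairs into an in-memory
/// BlockBuf, the same way DeltaLayerWriter/ImageLayerWriter do below.
fn build_index_sketch(entries: &[([u8; 26], u64)]) -> Result<(u32, BlockBuf)> {
    let mut tree = DiskBtreeBuilder::<BlockBuf, 26>::new(BlockBuf::new());
    for (key, blob_offset) in entries {
        // Keys must be appended in increasing order; the builder asserts this.
        tree.append(key, *blob_offset)?;
    }
    // finish() flushes all levels and returns the root block number together
    // with the buffered pages. The caller writes the pages to the file
    // starting at index_start_blk and stores (index_start_blk, index_root_blk)
    // in the Summary on block 0.
    let (index_root_blk, block_buf) = tree.finish()?;
    Ok((index_root_blk, block_buf))
}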

View File

@@ -7,14 +7,8 @@
//! must be page images or WAL records with the 'will_init' flag set, so that
//! they can be replayed without referring to an older page version.
//!
//! When a delta file needs to be accessed, we slurp the 'index' metadata
//! into memory, into the DeltaLayerInner struct. See load() and unload() functions.
//! To access a particular value, we search `index` for the given key.
//! The byte offset in the index can be used to find the value in
//! VALUES_CHAPTER.
//!
//! On disk, the delta files are stored in timelines/<timelineid> directory.
//! Currently, there are no subdirectories, and each delta file is named like this:
//! The delta files are stored in timelines/<timelineid> directory. Currently,
//! there are no subdirectories, and each delta file is named like this:
//!
//! <key start>-<key end>__<start LSN>-<end LSN>
//!
@@ -22,23 +16,23 @@
//!
//! 000000067F000032BE0000400000000020B6-000000067F000032BE0000400000000030B6__000000578C6B29-0000000057A50051
//!
//!
//! Every delta file consists of three parts: "summary", "index", and
//! "values". The summary is a fixed size header at the beginning of the file,
//! and it contains basic information about the layer, and offsets to the other
//! parts. The "index" is a serialized HashMap mapping from Key and LSN to an offset in the
//! parts. The "index" is a B-tree, mapping from Key and LSN to an offset in the
//! "values" part. The actual page images and WAL records are stored in the
//! "values" part.
//!
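// (Editor's sketch, not part of the commit: the resulting on-disk layout of a
// delta file, as implied by the Summary header and the writer code below.)
//
//   block 0                  Summary header: magic, format version, tenant /
//                            timeline ids, key and LSN ranges,
//                            index_start_blk, index_root_blk
//   blocks 1 ..              "values" part: blob-encoded page images and WAL
//                            records, written by WriteBlobWriter
//   blocks index_start_blk.. "index" part: the B-tree pages produced by
//                            DiskBtreeBuilder; the root page sits at block
//                            index_start_blk + index_root_blk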
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockCursor, BlockReader, FileBlockReader};
use crate::layered_repository::block_io::{BlockBuf, BlockCursor, BlockReader, FileBlockReader};
use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::{PageReadGuard, PAGE_SZ};
use crate::repository::{Key, Value};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
use crate::{ZTenantId, ZTimelineId};
@@ -46,8 +40,6 @@ use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use zenith_utils::vec_map::VecMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
@@ -57,11 +49,17 @@ use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
///
/// Header stored in the beginning of the file
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct Summary {
/// Magic value to identify this as a zenith delta file. Always DELTA_FILE_MAGIC.
@@ -75,6 +73,8 @@ struct Summary {
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
}
impl From<&DeltaLayer> for Summary {
@@ -89,6 +89,7 @@ impl From<&DeltaLayer> for Summary {
lsn_range: layer.lsn_range.clone(),
index_start_blk: 0,
index_root_blk: 0,
}
}
}
@@ -123,6 +124,46 @@ impl BlobRef {
}
}
const DELTA_KEY_SIZE: usize = KEY_SIZE + 8;
struct DeltaKey([u8; DELTA_KEY_SIZE]);
///
/// This is the key of the B-tree index stored in the delta layer. It consists
/// of the serialized representation of a Key and LSN.
///
impl DeltaKey {
fn from_slice(buf: &[u8]) -> Self {
let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
bytes.copy_from_slice(buf);
DeltaKey(bytes)
}
fn from_key_lsn(key: &Key, lsn: Lsn) -> Self {
let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE];
key.write_to_byte_slice(&mut bytes[0..KEY_SIZE]);
bytes[KEY_SIZE..].copy_from_slice(&u64::to_be_bytes(lsn.0));
DeltaKey(bytes)
}
fn key(&self) -> Key {
Key::from_slice(&self.0)
}
fn lsn(&self) -> Lsn {
Lsn(u64::from_be_bytes(self.0[KEY_SIZE..].try_into().unwrap()))
}
fn extract_key_from_buf(buf: &[u8]) -> Key {
Key::from_slice(&buf[..KEY_SIZE])
}
fn extract_lsn_from_buf(buf: &[u8]) -> Lsn {
let mut lsn_buf = [0u8; 8];
lsn_buf.copy_from_slice(&buf[KEY_SIZE..]);
Lsn(u64::from_be_bytes(lsn_buf))
}
}
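// (Editor's note, not part of the commit.) Because the serialized Key comes
// first and the LSN is appended big-endian, all versions of one Key end up
// adjacent in the B-tree, ordered by LSN. get_value_reconstruct_data() below
// relies on this: it starts a Backwards visit at
// DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1)) and stops as soon as
// the first KEY_SIZE bytes of the visited key no longer match the search key,
// so only the versions of that one key are examined.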
///
/// DeltaLayer is the in-memory data structure associated with an
/// on-disk delta file. We keep a DeltaLayer in memory for each
@@ -143,18 +184,12 @@ pub struct DeltaLayer {
}
pub struct DeltaLayerInner {
/// If false, the 'index' has not been loaded into memory yet.
/// If false, the fields below have not been loaded into memory yet.
loaded: bool,
///
/// All versions of all pages in the layer are kept here.
/// Indexed by block number and LSN. The value is an offset into the
/// chapter where the page version is stored.
///
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
@@ -196,27 +231,46 @@ impl Layer for DeltaLayer {
let inner = self.load()?;
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let mut reader = inner.file.as_ref().unwrap().block_cursor();
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, blob_ref) in slice.iter().rev() {
let buf = reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1));
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
})?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let mut cursor = file.block_cursor();
for (entry_lsn, pos) in offsets {
let buf = cursor.read_blob(pos)?;
let val = Value::des(&buf)?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
@@ -241,36 +295,6 @@ impl Layer for DeltaLayer {
}
}
///
/// Release most of the memory used by this layer. If it's accessed again later,
/// it will need to be loaded back.
///
fn unload(&self) -> Result<()> {
// FIXME: In debug mode, loading and unloading the index slows
// things down so much that you get timeout errors. At least
// with the test_parallel_copy test. So as an even more ad hoc
// stopgap fix for that, only unload every on average 10
// checkpoint cycles.
use rand::RngCore;
if rand::thread_rng().next_u32() > (u32::MAX / 10) {
return Ok(());
}
let mut inner = match self.inner.try_write() {
Ok(inner) => inner,
Err(TryLockError::WouldBlock) => return Ok(()),
Err(TryLockError::Poisoned(_)) => panic!("DeltaLayer lock was poisoned"),
};
inner.index = HashMap::default();
inner.loaded = false;
// Note: we keep the Book open. Is that a good idea? The virtual file
// machinery has its own rules for closing the file descriptor if it's not
// needed, but the Book struct uses up some memory, too.
Ok(())
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
@@ -303,21 +327,36 @@ impl Layer for DeltaLayer {
let inner = self.load()?;
let mut values: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
values.sort_by_key(|k| k.0);
println!(
"index_start_blk: {}, root {}",
inner.index_start_blk, inner.index_root_blk
);
let mut reader = inner.file.as_ref().unwrap().block_cursor();
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
tree_reader.dump()?;
let mut cursor = file.block_cursor();
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|delta_key, val| {
let blob_ref = BlobRef(val);
let key = DeltaKey::extract_key_from_buf(delta_key);
let lsn = DeltaKey::extract_lsn_from_buf(delta_key);
for (key, versions) in values {
for (lsn, blob_ref) in versions.as_slice() {
let mut desc = String::new();
match reader.read_blob(blob_ref.pos()) {
match cursor.read_blob(blob_ref.pos()) {
Ok(buf) => {
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
write!(&mut desc, " img {} bytes", img.len()).unwrap();
}
Ok(Value::WalRecord(rec)) => {
let wal_desc = walrecord::describe_wal_record(&rec);
@@ -327,20 +366,22 @@ impl Layer for DeltaLayer {
buf.len(),
rec.will_init(),
wal_desc
)?;
)
.unwrap();
}
Err(err) => {
write!(&mut desc, " DESERIALIZATION ERROR: {}", err)?;
write!(&mut desc, " DESERIALIZATION ERROR: {}", err).unwrap();
}
}
}
Err(err) => {
write!(&mut desc, " READ ERROR: {}", err)?;
write!(&mut desc, " READ ERROR: {}", err).unwrap();
}
}
println!(" key {} at {}: {}", key, lsn, desc);
}
}
true
},
)?;
Ok(())
}
@@ -409,6 +450,7 @@ impl DeltaLayer {
PathOrConf::Conf(_) => {
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
}
@@ -427,17 +469,11 @@ impl DeltaLayer {
}
}
file.file.seek(SeekFrom::Start(
actual_summary.index_start_blk as u64 * PAGE_SZ as u64,
))?;
let mut buf_reader = std::io::BufReader::new(&mut file.file);
let index = HashMap::des_from(&mut buf_reader)?;
inner.index_start_blk = actual_summary.index_start_blk;
inner.index_root_blk = actual_summary.index_root_blk;
debug!("loaded from {}", &path.display());
inner.index = index;
inner.loaded = true;
Ok(())
}
@@ -457,9 +493,9 @@ impl DeltaLayer {
lsn_range: filename.lsn_range.clone(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
index: HashMap::default(),
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
}
}
@@ -485,8 +521,8 @@ impl DeltaLayer {
inner: RwLock::new(DeltaLayerInner {
loaded: false,
file: None,
index: HashMap::default(),
index_start_blk: 0,
index_root_blk: 0,
}),
})
}
@@ -529,7 +565,7 @@ pub struct DeltaLayerWriter {
key_start: Key,
lsn_range: Range<Lsn>,
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
tree: DiskBtreeBuilder<BlockBuf, DELTA_KEY_SIZE>,
blob_writer: WriteBlobWriter<BufWriter<VirtualFile>>,
}
@@ -558,10 +594,15 @@ impl DeltaLayerWriter {
u64::from(lsn_range.end)
));
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
let buf_writer = BufWriter::new(file);
let blob_writer = WriteBlobWriter::new(buf_writer, PAGE_SZ as u64);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
Ok(DeltaLayerWriter {
conf,
path,
@@ -569,7 +610,7 @@ impl DeltaLayerWriter {
tenantid,
key_start,
lsn_range,
index: HashMap::new(),
tree: tree_builder,
blob_writer,
})
}
@@ -584,23 +625,16 @@ impl DeltaLayerWriter {
let off = self.blob_writer.write_blob(&Value::ser(&val)?)?;
let vec_map = self.index.entry(key).or_default();
let blob_ref = BlobRef::new(off, val.will_init());
let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
bail!(
"Value for {} at {} already exists in delta layer being built",
key,
lsn
);
}
let delta_key = DeltaKey::from_key_lsn(&key, lsn);
self.tree.append(&delta_key.0, blob_ref.0)?;
Ok(())
}
pub fn size(&self) -> u64 {
self.blob_writer.size()
self.blob_writer.size() + self.tree.borrow_writer().size()
}
///
@@ -614,9 +648,11 @@ impl DeltaLayerWriter {
let mut file = buf_writer.into_inner()?;
// Write out the index
let buf = HashMap::ser(&self.index)?;
let (index_root_blk, block_buf) = self.tree.finish()?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
file.write_all(&buf)?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref())?;
}
// Fill in the summary on blk 0
let summary = Summary {
@@ -627,6 +663,7 @@ impl DeltaLayerWriter {
key_range: self.key_start..key_end,
lsn_range: self.lsn_range.clone(),
index_start_blk,
index_root_blk,
};
file.seek(SeekFrom::Start(0))?;
Summary::ser_into(&summary, &mut file)?;
@@ -642,9 +679,9 @@ impl DeltaLayerWriter {
lsn_range: self.lsn_range.clone(),
inner: RwLock::new(DeltaLayerInner {
loaded: false,
index: HashMap::new(),
file: None,
index_start_blk,
index_root_blk,
}),
};
@@ -677,7 +714,7 @@ impl DeltaLayerWriter {
/// fashion.
///
struct DeltaValueIter<'a> {
all_offsets: Vec<(Key, Lsn, BlobRef)>,
all_offsets: Vec<(DeltaKey, BlobRef)>,
next_idx: usize,
reader: BlockCursor<Adapter<'a>>,
}
@@ -702,15 +739,22 @@ impl<'a> Iterator for DeltaValueIter<'a> {
impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
index.sort_by_key(|x| x.0);
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
inner.index_start_blk,
inner.index_root_blk,
file,
);
let mut all_offsets: Vec<(Key, Lsn, BlobRef)> = Vec::new();
for (key, vec_map) in index.iter() {
for (lsn, blob_ref) in vec_map.as_slice().iter() {
all_offsets.push((**key, *lsn, *blob_ref));
}
}
let mut all_offsets: Vec<(DeltaKey, BlobRef)> = Vec::new();
tree_reader.visit(
&[0u8; DELTA_KEY_SIZE],
VisitDirection::Forwards,
|key, value| {
all_offsets.push((DeltaKey::from_slice(key), BlobRef(value)));
true
},
)?;
let iter = DeltaValueIter {
all_offsets,
@@ -723,13 +767,15 @@ impl<'a> DeltaValueIter<'a> {
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if self.next_idx < self.all_offsets.len() {
let (key, lsn, off) = &self.all_offsets[self.next_idx];
let (delta_key, blob_ref) = &self.all_offsets[self.next_idx];
//let mut reader = BlobReader::new(self.inner.file.as_ref().unwrap());
let buf = self.reader.read_blob(off.pos())?;
let key = delta_key.key();
let lsn = delta_key.lsn();
let buf = self.reader.read_blob(blob_ref.pos())?;
let val = Value::des(&buf)?;
self.next_idx += 1;
Ok(Some((*key, *lsn, val)))
Ok(Some((key, lsn, val)))
} else {
Ok(None)
}

View File

@@ -0,0 +1,979 @@
//!
//! Simple on-disk B-tree implementation
//!
//! This is used as the index structure within image and delta layers
//!
//! Features:
//! - Fixed-width keys
//! - Fixed-width values (VALUE_SZ)
//! - The tree is created in a bulk operation. Insert/deletion after creation
//! is not supported
//! - page-oriented
//!
//! TODO:
//! - better errors (e.g. with thiserror?)
//! - maybe something like an Adaptive Radix Tree would be more efficient?
//! - the values stored by image and delta layers are offsets into the file,
//! and they are in monotonically increasing order. Prefix compression would
//! be very useful for them, too.
//! - An Iterator interface would be more convenient for the callers than the
//! 'visit' function
//!
use anyhow;
use byteorder::{ReadBytesExt, BE};
use bytes::{BufMut, Bytes, BytesMut};
use hex;
use std::cmp::Ordering;
use crate::layered_repository::block_io::{BlockReader, BlockWriter};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
pub const MAX_VALUE: u64 = 0x007f_ffff_ffff;
#[allow(dead_code)]
pub const PAGE_SZ: usize = 8192;
#[derive(Clone, Copy, Debug)]
struct Value([u8; VALUE_SZ]);
impl Value {
fn from_slice(slice: &[u8]) -> Value {
let mut b = [0u8; VALUE_SZ];
b.copy_from_slice(slice);
Value(b)
}
fn from_u64(x: u64) -> Value {
assert!(x <= 0x007f_ffff_ffff);
Value([
(x >> 32) as u8,
(x >> 24) as u8,
(x >> 16) as u8,
(x >> 8) as u8,
x as u8,
])
}
fn from_blknum(x: u32) -> Value {
Value([
0x80,
(x >> 24) as u8,
(x >> 16) as u8,
(x >> 8) as u8,
x as u8,
])
}
#[allow(dead_code)]
fn is_offset(self) -> bool {
self.0[0] & 0x80 != 0
}
fn to_u64(self) -> u64 {
let b = &self.0;
(b[0] as u64) << 32
| (b[1] as u64) << 24
| (b[2] as u64) << 16
| (b[3] as u64) << 8
| b[4] as u64
}
fn to_blknum(self) -> u32 {
let b = &self.0;
assert!(b[0] == 0x80);
(b[1] as u32) << 24 | (b[2] as u32) << 16 | (b[3] as u32) << 8 | b[4] as u32
}
}
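// (Editor's sketch, not part of the commit: the two payload kinds that the
// 5-byte Value packs, and their round trips.)
#[cfg(test)]
#[test]
fn value_encoding_sketch() {
    // A leaf value holds a file offset of at most MAX_VALUE; the high bit of
    // the first byte stays clear.
    let offset = Value::from_u64(0x0012_3456_789a);
    assert_eq!(offset.to_u64(), 0x0012_3456_789a);
    // An internal-node downlink holds a child block number, tagged with the
    // 0x80 high bit in the first byte.
    let downlink = Value::from_blknum(42);
    assert_eq!(downlink.to_blknum(), 42);
}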
/// This is the on-disk representation.
struct OnDiskNode<'a, const L: usize> {
// Fixed-width fields
num_children: u16,
level: u8,
prefix_len: u8,
suffix_len: u8,
// Variable-length fields. These are stored on-disk after the fixed-width
// fields, in this order. In the in-memory representation, these point to
// the right parts in the page buffer.
prefix: &'a [u8],
keys: &'a [u8],
values: &'a [u8],
}
impl<'a, const L: usize> OnDiskNode<'a, L> {
///
/// Interpret a PAGE_SZ page as a node.
///
fn deparse(buf: &[u8]) -> OnDiskNode<L> {
let mut cursor = std::io::Cursor::new(buf);
let num_children = cursor.read_u16::<BE>().unwrap();
let level = cursor.read_u8().unwrap();
let prefix_len = cursor.read_u8().unwrap();
let suffix_len = cursor.read_u8().unwrap();
let mut off = cursor.position();
let prefix_off = off as usize;
off += prefix_len as u64;
let keys_off = off as usize;
let keys_len = num_children as usize * suffix_len as usize;
off += keys_len as u64;
let values_off = off as usize;
let values_len = num_children as usize * VALUE_SZ as usize;
//off += values_len as u64;
let prefix = &buf[prefix_off..prefix_off + prefix_len as usize];
let keys = &buf[keys_off..keys_off + keys_len];
let values = &buf[values_off..values_off + values_len];
OnDiskNode {
num_children,
level,
prefix_len,
suffix_len,
prefix,
keys,
values,
}
}
///
/// Read a value at 'idx'
///
fn value(&self, idx: usize) -> Value {
let value_off = idx * VALUE_SZ;
let value_slice = &self.values[value_off..value_off + VALUE_SZ];
Value::from_slice(value_slice)
}
fn binary_search(&self, search_key: &[u8; L], keybuf: &mut [u8]) -> Result<usize, usize> {
let mut size = self.num_children as usize;
let mut low = 0;
let mut high = size;
while low < high {
let mid = low + size / 2;
let key_off = mid as usize * self.suffix_len as usize;
let suffix = &self.keys[key_off..key_off + self.suffix_len as usize];
// Does this match?
keybuf[self.prefix_len as usize..].copy_from_slice(suffix);
let cmp = keybuf[..].cmp(search_key);
if cmp == Ordering::Less {
low = mid + 1;
} else if cmp == Ordering::Greater {
high = mid;
} else {
return Ok(mid);
}
size = high - low;
}
Err(low)
}
}
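// (Editor's sketch, not part of the commit: the page layout that deparse()
// above and BuildNode::pack() below agree on.)
//
//   bytes 0..2   num_children  (u16, big-endian)
//   byte  2      level         (0 = leaf, >0 = internal node)
//   byte  3      prefix_len
//   byte  4      suffix_len
//   then         prefix        (prefix_len bytes shared by every key on the page)
//   then         keys          (num_children * suffix_len bytes of key suffixes)
//   then         values        (num_children * VALUE_SZ bytes)
//   the rest of the PAGE_SZ page is zero padding.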
///
/// Public reader object, to search the tree.
///
pub struct DiskBtreeReader<R, const L: usize>
where
R: BlockReader,
{
start_blk: u32,
root_blk: u32,
reader: R,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum VisitDirection {
Forwards,
Backwards,
}
impl<R, const L: usize> DiskBtreeReader<R, L>
where
R: BlockReader,
{
pub fn new(start_blk: u32, root_blk: u32, reader: R) -> Self {
DiskBtreeReader {
start_blk,
root_blk,
reader,
}
}
///
/// Read the value for given key. Returns the value, or None if it doesn't exist.
///
pub fn get(&self, search_key: &[u8; L]) -> anyhow::Result<Option<u64>> {
let mut result: Option<u64> = None;
self.visit(search_key, VisitDirection::Forwards, |key, value| {
if key == search_key {
result = Some(value);
}
false
})?;
Ok(result)
}
///
/// Scan the tree, starting from 'search_key', in the given direction. 'visitor'
/// will be called for every key >= 'search_key' (or <= 'search_key', if scanning
/// backwards)
///
pub fn visit<V>(
&self,
search_key: &[u8; L],
dir: VisitDirection,
mut visitor: V,
) -> anyhow::Result<bool>
where
V: FnMut(&[u8], u64) -> bool,
{
self.search_recurse(self.root_blk, search_key, dir, &mut visitor)
}
fn search_recurse<V>(
&self,
node_blknum: u32,
search_key: &[u8; L],
dir: VisitDirection,
visitor: &mut V,
) -> anyhow::Result<bool>
where
V: FnMut(&[u8], u64) -> bool,
{
// Locate the node.
let blk = self.reader.read_blk(self.start_blk + node_blknum)?;
// Search all entries on this node
self.search_node(blk.as_ref(), search_key, dir, visitor)
}
fn search_node<V>(
&self,
node_buf: &[u8],
search_key: &[u8; L],
dir: VisitDirection,
visitor: &mut V,
) -> anyhow::Result<bool>
where
V: FnMut(&[u8], u64) -> bool,
{
let node = OnDiskNode::deparse(node_buf);
let prefix_len = node.prefix_len as usize;
let suffix_len = node.suffix_len as usize;
assert!(node.num_children > 0);
let mut keybuf = Vec::new();
keybuf.extend(node.prefix);
keybuf.resize(prefix_len + suffix_len, 0);
if dir == VisitDirection::Forwards {
// Locate the first match
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => idx,
Err(idx) => {
if node.level == 0 {
// Imagine that the node contains the following keys:
//
// 1
// 3 <-- idx
// 5
//
// If the search key is '2' and there is no exact match,
// the binary search would return the index of key
// '3'. That's cool, '3' is the first key to return.
idx
} else {
// This is an internal page, so each key represents a lower
// bound for what's in the child page. If there is no exact
// match, we have to return the *previous* entry.
//
// 1 <-- return this
// 3 <-- idx
// 5
idx.saturating_sub(1)
}
}
};
// idx points to the first match now. Keep going from there
let mut key_off = idx * suffix_len;
while idx < node.num_children as usize {
let suffix = &node.keys[key_off..key_off + suffix_len];
keybuf[prefix_len..].copy_from_slice(suffix);
let value = node.value(idx as usize);
#[allow(clippy::collapsible_if)]
if node.level == 0 {
// leaf
if !visitor(&keybuf, value.to_u64()) {
return Ok(false);
}
} else {
#[allow(clippy::collapsible_if)]
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
return Ok(false);
}
}
idx += 1;
key_off += suffix_len;
}
} else {
let mut idx = match node.binary_search(search_key, keybuf.as_mut_slice()) {
Ok(idx) => {
// Exact match. That's the first entry to return, and walk
// backwards from there. (The loop below starts from 'idx -
// 1', so add one here to compensate.)
idx + 1
}
Err(idx) => {
// No exact match. The binary search returned the index of the
// first key that's > search_key. Back off by one, and walk
// backwards from there. (The loop below starts from idx - 1,
// so we don't need to subtract one here)
idx
}
};
// idx points to the first match + 1 now. Keep going from there.
let mut key_off = idx * suffix_len;
while idx > 0 {
idx -= 1;
key_off -= suffix_len;
let suffix = &node.keys[key_off..key_off + suffix_len];
keybuf[prefix_len..].copy_from_slice(suffix);
let value = node.value(idx as usize);
#[allow(clippy::collapsible_if)]
if node.level == 0 {
// leaf
if !visitor(&keybuf, value.to_u64()) {
return Ok(false);
}
} else {
#[allow(clippy::collapsible_if)]
if !self.search_recurse(value.to_blknum(), search_key, dir, visitor)? {
return Ok(false);
}
}
if idx == 0 {
break;
}
}
}
Ok(true)
}
#[allow(dead_code)]
pub fn dump(&self) -> anyhow::Result<()> {
self.dump_recurse(self.root_blk, &[], 0)
}
fn dump_recurse(&self, blknum: u32, path: &[u8], depth: usize) -> anyhow::Result<()> {
let blk = self.reader.read_blk(self.start_blk + blknum)?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf);
print!("{:indent$}", "", indent = depth * 2);
println!(
"blk #{}: path {}: prefix {}, suffix_len {}",
blknum,
hex::encode(path),
hex::encode(node.prefix),
node.suffix_len
);
let mut idx = 0;
let mut key_off = 0;
while idx < node.num_children {
let key = &node.keys[key_off..key_off + node.suffix_len as usize];
let val = node.value(idx as usize);
print!("{:indent$}", "", indent = depth * 2 + 2);
println!("{}: {}", hex::encode(key), hex::encode(val.0));
if node.level > 0 {
let child_path = [path, node.prefix].concat();
self.dump_recurse(val.to_blknum(), &child_path, depth + 1)?;
}
idx += 1;
key_off += node.suffix_len as usize;
}
Ok(())
}
}
///
/// Public builder object, for creating a new tree.
///
/// Usage: Create a builder object by calling 'new', load all the data into the
/// tree by calling 'append' for each key-value pair, and then call 'finish'
///
/// 'L' is the key length in bytes
pub struct DiskBtreeBuilder<W, const L: usize>
where
W: BlockWriter,
{
writer: W,
///
/// stack[0] is the current root page, stack.last() is the leaf.
///
stack: Vec<BuildNode<L>>,
/// Last key that was appended to the tree. Used to sanity check that append
/// is called in increasing key order.
last_key: Option<[u8; L]>,
}
impl<W, const L: usize> DiskBtreeBuilder<W, L>
where
W: BlockWriter,
{
pub fn new(writer: W) -> Self {
DiskBtreeBuilder {
writer,
last_key: None,
stack: vec![BuildNode::new(0)],
}
}
pub fn append(&mut self, key: &[u8; L], value: u64) -> Result<(), anyhow::Error> {
assert!(value <= MAX_VALUE);
if let Some(last_key) = &self.last_key {
assert!(key > last_key, "unsorted input");
}
self.last_key = Some(*key);
Ok(self.append_internal(key, Value::from_u64(value))?)
}
fn append_internal(&mut self, key: &[u8; L], value: Value) -> Result<(), std::io::Error> {
// Try to append to the current leaf buffer
let last = self.stack.last_mut().unwrap();
let level = last.level;
if last.push(key, value) {
return Ok(());
}
// It did not fit. Try to compress, and if it succeeds in making some room
// on the node, try appending to it again.
#[allow(clippy::collapsible_if)]
if last.compress() {
if last.push(key, value) {
return Ok(());
}
}
// Could not append to the current leaf. Flush it and create a new one.
self.flush_node()?;
// Replace the node we flushed with an empty one and append the new
// key to it.
let mut last = BuildNode::new(level);
if !last.push(key, value) {
panic!("could not push to new leaf node");
}
self.stack.push(last);
Ok(())
}
fn flush_node(&mut self) -> Result<(), std::io::Error> {
let last = self.stack.pop().unwrap();
let buf = last.pack();
let downlink_key = last.first_key();
let downlink_ptr = self.writer.write_blk(buf)?;
// Append the downlink to the parent
if self.stack.is_empty() {
self.stack.push(BuildNode::new(last.level + 1));
}
self.append_internal(&downlink_key, Value::from_blknum(downlink_ptr))?;
Ok(())
}
///
/// Flushes everything to disk, and returns the block number of the root page.
/// The caller must store the root block number "out-of-band", and pass it
/// to the DiskBtreeReader::new() when you want to read the tree again.
/// (In the image and delta layers, it is stored in the beginning of the file,
/// in the summary header)
///
pub fn finish(mut self) -> Result<(u32, W), std::io::Error> {
// flush all levels, except the root.
while self.stack.len() > 1 {
self.flush_node()?;
}
let root = self.stack.first().unwrap();
let buf = root.pack();
let root_blknum = self.writer.write_blk(buf)?;
Ok((root_blknum, self.writer))
}
pub fn borrow_writer(&self) -> &W {
&self.writer
}
}
///
/// BuildNode represents an incomplete page that we are appending to.
///
#[derive(Clone, Debug)]
struct BuildNode<const L: usize> {
num_children: u16,
level: u8,
prefix: Vec<u8>,
suffix_len: usize,
keys: Vec<u8>,
values: Vec<u8>,
size: usize, // physical size of this node, if it was written to disk like this
}
const NODE_SIZE: usize = PAGE_SZ;
const NODE_HDR_SIZE: usize = 2 + 1 + 1 + 1;
impl<const L: usize> BuildNode<L> {
fn new(level: u8) -> Self {
BuildNode {
num_children: 0,
level,
prefix: Vec::new(),
suffix_len: 0,
keys: Vec::new(),
values: Vec::new(),
size: NODE_HDR_SIZE,
}
}
/// Try to append a key-value pair to this node. Returns 'true' on
/// success, 'false' if the page was full or the key was
/// incompatible with the prefix of the existing keys.
fn push(&mut self, key: &[u8; L], value: Value) -> bool {
// If we have already performed prefix-compression on the page,
// check that the incoming key has the same prefix.
if self.num_children > 0 {
// does the prefix allow it?
if !key.starts_with(&self.prefix) {
return false;
}
} else {
self.suffix_len = key.len();
}
// Is the node too full?
if self.size + self.suffix_len + VALUE_SZ >= NODE_SIZE {
return false;
}
// All clear
self.num_children += 1;
self.keys.extend(&key[self.prefix.len()..]);
self.values.extend(value.0);
assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
self.size += self.suffix_len + VALUE_SZ;
true
}
///
/// Perform prefix-compression.
///
/// Returns 'true' on success, 'false' if no compression was possible.
///
fn compress(&mut self) -> bool {
let first_suffix = self.first_suffix();
let last_suffix = self.last_suffix();
// Find the common prefix among all keys
let mut prefix_len = 0;
while prefix_len < self.suffix_len {
if first_suffix[prefix_len] != last_suffix[prefix_len] {
break;
}
prefix_len += 1;
}
if prefix_len == 0 {
return false;
}
// Can compress. Rewrite the keys without the common prefix.
self.prefix.extend(&self.keys[..prefix_len]);
let mut new_keys = Vec::new();
let mut key_off = 0;
while key_off < self.keys.len() {
let next_key_off = key_off + self.suffix_len;
new_keys.extend(&self.keys[key_off + prefix_len..next_key_off]);
key_off = next_key_off;
}
self.keys = new_keys;
self.suffix_len -= prefix_len;
self.size -= prefix_len * self.num_children as usize;
self.size += prefix_len;
assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
true
}
///
/// Serialize the node to on-disk format.
///
fn pack(&self) -> Bytes {
assert!(self.keys.len() == self.num_children as usize * self.suffix_len as usize);
assert!(self.values.len() == self.num_children as usize * VALUE_SZ);
assert!(self.num_children > 0);
let mut buf = BytesMut::new();
buf.put_u16(self.num_children);
buf.put_u8(self.level);
buf.put_u8(self.prefix.len() as u8);
buf.put_u8(self.suffix_len as u8);
buf.put(&self.prefix[..]);
buf.put(&self.keys[..]);
buf.put(&self.values[..]);
assert!(buf.len() == self.size);
assert!(buf.len() <= PAGE_SZ);
buf.resize(PAGE_SZ, 0);
buf.freeze()
}
fn first_suffix(&self) -> &[u8] {
&self.keys[..self.suffix_len]
}
fn last_suffix(&self) -> &[u8] {
&self.keys[self.keys.len() - self.suffix_len..]
}
/// Return the full first key of the page, including the prefix
fn first_key(&self) -> [u8; L] {
let mut key = [0u8; L];
key[..self.prefix.len()].copy_from_slice(&self.prefix);
key[self.prefix.len()..].copy_from_slice(self.first_suffix());
key
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::Rng;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Clone, Default)]
struct TestDisk {
blocks: Vec<Bytes>,
}
impl TestDisk {
fn new() -> Self {
Self::default()
}
}
impl BlockReader for TestDisk {
type BlockLease = std::rc::Rc<[u8; PAGE_SZ]>;
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
let mut buf = [0u8; PAGE_SZ];
buf.copy_from_slice(&self.blocks[blknum as usize]);
Ok(std::rc::Rc::new(buf))
}
}
impl BlockWriter for &mut TestDisk {
fn write_blk(&mut self, buf: Bytes) -> Result<u32, std::io::Error> {
let blknum = self.blocks.len();
self.blocks.push(buf);
Ok(blknum as u32)
}
}
#[test]
fn basic() -> anyhow::Result<()> {
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
let all_keys: Vec<&[u8; 6]> = vec![
b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
];
let all_data: Vec<(&[u8; 6], u64)> = all_keys
.iter()
.enumerate()
.map(|(idx, key)| (*key, idx as u64))
.collect();
for (key, val) in all_data.iter() {
writer.append(key, *val)?;
}
let (root_offset, _writer) = writer.finish()?;
let reader = DiskBtreeReader::new(0, root_offset, disk);
reader.dump()?;
// Test the `get` function on all the keys.
for (key, val) in all_data.iter() {
assert_eq!(reader.get(key)?, Some(*val));
}
// And on some keys that don't exist
assert_eq!(reader.get(b"aaaaaa")?, None);
assert_eq!(reader.get(b"zzzzzz")?, None);
assert_eq!(reader.get(b"xaaabx")?, None);
// Test search with `visit` function
let search_key = b"xabaaa";
let expected: Vec<(Vec<u8>, u64)> = all_data
.iter()
.filter(|(key, _value)| key[..] >= search_key[..])
.map(|(key, value)| (key.to_vec(), *value))
.collect();
let mut data = Vec::new();
reader.visit(search_key, VisitDirection::Forwards, |key, value| {
data.push((key.to_vec(), value));
true
})?;
assert_eq!(data, expected);
// Test a backwards scan
let mut expected: Vec<(Vec<u8>, u64)> = all_data
.iter()
.filter(|(key, _value)| key[..] <= search_key[..])
.map(|(key, value)| (key.to_vec(), *value))
.collect();
expected.reverse();
let mut data = Vec::new();
reader.visit(search_key, VisitDirection::Backwards, |key, value| {
data.push((key.to_vec(), value));
true
})?;
assert_eq!(data, expected);
// Backward scan where nothing matches
reader.visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
panic!("found unexpected key {}: {}", hex::encode(key), value);
})?;
// Full scan
let expected: Vec<(Vec<u8>, u64)> = all_data
.iter()
.map(|(key, value)| (key.to_vec(), *value))
.collect();
let mut data = Vec::new();
reader.visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
data.push((key.to_vec(), value));
true
})?;
assert_eq!(data, expected);
Ok(())
}
#[test]
fn lots_of_keys() -> anyhow::Result<()> {
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
const NUM_KEYS: u64 = 1000;
let mut all_data: BTreeMap<u64, u64> = BTreeMap::new();
for idx in 0..NUM_KEYS {
let key_int: u64 = 1 + idx * 2;
let key = u64::to_be_bytes(key_int);
writer.append(&key, idx)?;
all_data.insert(key_int, idx);
}
let (root_offset, _writer) = writer.finish()?;
let reader = DiskBtreeReader::new(0, root_offset, disk);
reader.dump()?;
use std::sync::Mutex;
let result = Mutex::new(Vec::new());
let limit: AtomicUsize = AtomicUsize::new(10);
let take_ten = |key: &[u8], value: u64| {
let mut keybuf = [0u8; 8];
keybuf.copy_from_slice(key);
let key_int = u64::from_be_bytes(keybuf);
let mut result = result.lock().unwrap();
result.push((key_int, value));
// keep going until we have 10 matches
result.len() < limit.load(Ordering::Relaxed)
};
for search_key_int in 0..(NUM_KEYS * 2 + 10) {
let search_key = u64::to_be_bytes(search_key_int);
assert_eq!(
reader.get(&search_key)?,
all_data.get(&search_key_int).cloned()
);
// Test a forward scan starting with this key
result.lock().unwrap().clear();
reader.visit(&search_key, VisitDirection::Forwards, take_ten)?;
let expected = all_data
.range(search_key_int..)
.take(10)
.map(|(&key, &val)| (key, val))
.collect::<Vec<(u64, u64)>>();
assert_eq!(*result.lock().unwrap(), expected);
// And a backwards scan
result.lock().unwrap().clear();
reader.visit(&search_key, VisitDirection::Backwards, take_ten)?;
let expected = all_data
.range(..=search_key_int)
.rev()
.take(10)
.map(|(&key, &val)| (key, val))
.collect::<Vec<(u64, u64)>>();
assert_eq!(*result.lock().unwrap(), expected);
}
// full scan
let search_key = u64::to_be_bytes(0);
limit.store(usize::MAX, Ordering::Relaxed);
result.lock().unwrap().clear();
reader.visit(&search_key, VisitDirection::Forwards, take_ten)?;
let expected = all_data
.iter()
.map(|(&key, &val)| (key, val))
.collect::<Vec<(u64, u64)>>();
assert_eq!(*result.lock().unwrap(), expected);
// full scan
let search_key = u64::to_be_bytes(u64::MAX);
limit.store(usize::MAX, Ordering::Relaxed);
result.lock().unwrap().clear();
reader.visit(&search_key, VisitDirection::Backwards, take_ten)?;
let expected = all_data
.iter()
.rev()
.map(|(&key, &val)| (key, val))
.collect::<Vec<(u64, u64)>>();
assert_eq!(*result.lock().unwrap(), expected);
Ok(())
}
#[test]
fn random_data() -> anyhow::Result<()> {
// Generate random keys with exponential distribution, to
// exercise the prefix compression
const NUM_KEYS: usize = 100000;
let mut all_data: BTreeMap<u128, u64> = BTreeMap::new();
for idx in 0..NUM_KEYS {
let u: f64 = rand::thread_rng().gen_range(0.0..1.0);
let t = -(f64::ln(u));
let key_int = (t * 1000000.0) as u128;
all_data.insert(key_int as u128, idx as u64);
}
// Build a tree from it
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 16>::new(&mut disk);
for (&key, &val) in all_data.iter() {
writer.append(&u128::to_be_bytes(key), val)?;
}
let (root_offset, _writer) = writer.finish()?;
let reader = DiskBtreeReader::new(0, root_offset, disk);
// Test get() operation on all the keys
for (&key, &val) in all_data.iter() {
let search_key = u128::to_be_bytes(key);
assert_eq!(reader.get(&search_key)?, Some(val));
}
// Test get() operations on random keys, most of which will not exist
for _ in 0..100000 {
let key_int = rand::thread_rng().gen::<u128>();
let search_key = u128::to_be_bytes(key_int);
assert!(reader.get(&search_key)? == all_data.get(&key_int).cloned());
}
// Test boundary cases
assert!(reader.get(&u128::to_be_bytes(u128::MIN))? == all_data.get(&u128::MIN).cloned());
assert!(reader.get(&u128::to_be_bytes(u128::MAX))? == all_data.get(&u128::MAX).cloned());
Ok(())
}
#[test]
#[should_panic(expected = "unsorted input")]
fn unsorted_input() {
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 2>::new(&mut disk);
let _ = writer.append(b"ba", 1);
let _ = writer.append(b"bb", 2);
let _ = writer.append(b"aa", 3);
}
///
/// This test contains a particular data set, see disk_btree_test_data.rs
///
#[test]
fn particular_data() -> anyhow::Result<()> {
// Build a tree from it
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
for (key, val) in disk_btree_test_data::TEST_DATA {
writer.append(&key, val)?;
}
let (root_offset, writer) = writer.finish()?;
println!("SIZE: {} blocks", writer.blocks.len());
let reader = DiskBtreeReader::new(0, root_offset, disk);
// Test get() operation on all the keys
for (key, val) in disk_btree_test_data::TEST_DATA {
assert_eq!(reader.get(&key)?, Some(val));
}
// Test full scan
let mut count = 0;
reader.visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
count += 1;
true
})?;
assert_eq!(count, disk_btree_test_data::TEST_DATA.len());
reader.dump()?;
Ok(())
}
}
#[cfg(test)]
#[path = "disk_btree_test_data.rs"]
mod disk_btree_test_data;

File diff suppressed because it is too large

View File

@@ -16,40 +16,43 @@
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a serialized
//! HashMap, mapping from Key to an offset in the "values" part. The
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
//!
//! Only the "index" is loaded into memory by the load function.
//! When images are needed, they are read directly from disk.
//!
use crate::config::PageServerConf;
use crate::layered_repository::blob_io::{BlobCursor, BlobWriter, WriteBlobWriter};
use crate::layered_repository::block_io::{BlockReader, FileBlockReader};
use crate::layered_repository::block_io::{BlockBuf, BlockReader, FileBlockReader};
use crate::layered_repository::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection};
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::page_cache::PAGE_SZ;
use crate::repository::{Key, Value};
use crate::repository::{Key, Value, KEY_SIZE};
use crate::virtual_file::VirtualFile;
use crate::{ZTenantId, ZTimelineId};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use hex;
use log::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::io::{Seek, SeekFrom};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{RwLock, RwLockReadGuard, TryLockError};
use std::sync::{RwLock, RwLockReadGuard};
use zenith_utils::bin_ser::BeSer;
use zenith_utils::lsn::Lsn;
///
/// Header stored in the beginning of the file
///
/// After this comes the 'values' part, starting on block 1. After that,
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
struct Summary {
/// Magic value to identify this as a zenith image file. Always IMAGE_FILE_MAGIC.
@@ -63,6 +66,9 @@ struct Summary {
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
impl From<&ImageLayer> for Summary {
@@ -73,10 +79,10 @@ impl From<&ImageLayer> for Summary {
tenantid: layer.tenantid,
timelineid: layer.timelineid,
key_range: layer.key_range.clone(),
lsn: layer.lsn,
index_start_blk: 0,
index_root_blk: 0,
}
}
}
@@ -104,11 +110,9 @@ pub struct ImageLayerInner {
/// If false, the 'index' has not been loaded into memory yet.
loaded: bool,
/// offset of each value
index: HashMap<Key, u64>,
// values copied from summary
index_start_blk: u32,
index_root_blk: u32,
/// Reader object for reading blocks from the file. (None if not loaded yet)
file: Option<FileBlockReader<VirtualFile>>,
@@ -147,21 +151,21 @@ impl Layer for ImageLayer {
assert!(lsn_range.end >= self.lsn);
let inner = self.load()?;
if let Some(&offset) = inner.index.get(&key) {
let buf = inner
.file
.as_ref()
.unwrap()
.block_cursor()
.read_blob(offset)
.with_context(|| {
format!(
"failed to read blob from data file {} at offset {}",
self.filename().display(),
offset
)
})?;
let value = Bytes::from(buf);
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf)? {
let blob = file.block_cursor().read_blob(offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.filename().display(),
offset
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
@@ -174,33 +178,6 @@ impl Layer for ImageLayer {
todo!();
}
fn unload(&self) -> Result<()> {
// Unload the index.
//
// TODO: we should access the index directly from pages on the disk,
// using the buffer cache. This load/unload mechanism is really ad hoc.
// FIXME: In debug mode, loading and unloading the index slows
// things down so much that you get timeout errors. At least
// with the test_parallel_copy test. So as an even more ad hoc
// stopgap fix for that, only unload every on average 10
// checkpoint cycles.
use rand::RngCore;
if rand::thread_rng().next_u32() > (u32::MAX / 10) {
return Ok(());
}
let mut inner = match self.inner.try_write() {
Ok(inner) => inner,
Err(TryLockError::WouldBlock) => return Ok(()),
Err(TryLockError::Poisoned(_)) => panic!("ImageLayer lock was poisoned"),
};
inner.index = HashMap::default();
inner.loaded = false;
Ok(())
}
fn delete(&self) -> Result<()> {
// delete underlying file
fs::remove_file(self.path())?;
@@ -227,10 +204,16 @@ impl Layer for ImageLayer {
}
let inner = self.load()?;
let file = inner.file.as_ref().unwrap();
let tree_reader =
DiskBtreeReader::<_, KEY_SIZE>::new(inner.index_start_blk, inner.index_root_blk, file);
for (key, offset) in inner.index.iter() {
println!("key: {} offset {}", key, offset);
}
tree_reader.dump()?;
tree_reader.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
println!("key: {} offset {}", hex::encode(key), value);
true
})?;
Ok(())
}
@@ -300,6 +283,7 @@ impl ImageLayer {
PathOrConf::Conf(_) => {
let mut expected_summary = Summary::from(self);
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!("in-file summary does not match expected summary. actual = {:?} expected = {:?}", actual_summary, expected_summary);
@@ -319,17 +303,8 @@ impl ImageLayer {
}
}
file.file.seek(SeekFrom::Start(
actual_summary.index_start_blk as u64 * PAGE_SZ as u64,
))?;
let mut buf_reader = std::io::BufReader::new(&mut file.file);
let index = HashMap::des_from(&mut buf_reader)?;
inner.index_start_blk = actual_summary.index_start_blk;
info!("loaded from {}", &path.display());
inner.index = index;
inner.index_root_blk = actual_summary.index_root_blk;
inner.loaded = true;
Ok(())
}
@@ -348,10 +323,10 @@ impl ImageLayer {
key_range: filename.key_range.clone(),
lsn: filename.lsn,
inner: RwLock::new(ImageLayerInner {
index: HashMap::new(),
loaded: false,
file: None,
index_start_blk: 0,
index_root_blk: 0,
}),
}
}
@@ -376,9 +351,9 @@ impl ImageLayer {
lsn: summary.lsn,
inner: RwLock::new(ImageLayerInner {
file: None,
index: HashMap::new(),
loaded: false,
index_start_blk: 0,
index_root_blk: 0,
}),
})
}
@@ -420,9 +395,8 @@ pub struct ImageLayerWriter {
key_range: Range<Key>,
lsn: Lsn,
index: HashMap<Key, u64>,
blob_writer: WriteBlobWriter<VirtualFile>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
impl ImageLayerWriter {
@@ -447,9 +421,15 @@ impl ImageLayerWriter {
},
);
info!("new image layer {}", path.display());
let file = VirtualFile::create(&path)?;
let mut file = VirtualFile::create(&path)?;
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64))?;
let blob_writer = WriteBlobWriter::new(file, PAGE_SZ as u64);
// Initialize the b-tree index builder
let block_buf = BlockBuf::new();
let tree_builder = DiskBtreeBuilder::new(block_buf);
let writer = ImageLayerWriter {
conf,
_path: path,
@@ -457,7 +437,7 @@ impl ImageLayerWriter {
tenantid,
key_range: key_range.clone(),
lsn,
index: HashMap::new(),
tree: tree_builder,
blob_writer,
};
@@ -473,8 +453,9 @@ impl ImageLayerWriter {
ensure!(self.key_range.contains(&key));
let off = self.blob_writer.write_blob(img)?;
let old = self.index.insert(key, off);
assert!(old.is_none());
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
self.tree.append(&keybuf, off)?;
Ok(())
}
@@ -486,9 +467,11 @@ impl ImageLayerWriter {
let mut file = self.blob_writer.into_inner();
// Write out the index
let buf = HashMap::ser(&self.index)?;
file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64))?;
file.write_all(&buf)?;
let (index_root_blk, block_buf) = self.tree.finish()?;
for buf in block_buf.blocks {
file.write_all(buf.as_ref())?;
}
// Fill in the summary on blk 0
let summary = Summary {
@@ -499,6 +482,7 @@ impl ImageLayerWriter {
key_range: self.key_range.clone(),
lsn: self.lsn,
index_start_blk,
index_root_blk,
};
file.seek(SeekFrom::Start(0))?;
Summary::ser_into(&summary, &mut file)?;
@@ -514,9 +498,9 @@ impl ImageLayerWriter {
lsn: self.lsn,
inner: RwLock::new(ImageLayerInner {
loaded: false,
index: HashMap::new(),
file: None,
index_start_blk,
index_root_blk,
}),
};
trace!("created image layer {}", layer.path().display());

View File

@@ -166,13 +166,6 @@ impl Layer for InMemoryLayer {
todo!();
}
/// Cannot unload anything in an in-memory layer, since there's no backing
/// store. To release memory used by an in-memory layer, use 'freeze' to turn
/// it into an on-disk layer.
fn unload(&self) -> Result<()> {
Ok(())
}
/// Nothing to do here. When you drop the last reference to the layer, it will
/// be deallocated.
fn delete(&self) -> Result<()> {

View File

@@ -134,10 +134,6 @@ pub trait Layer: Send + Sync {
/// Iterate through all keys and values stored in the layer
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_>;
/// Release memory used by this layer. There is no corresponding 'load'
/// function, that's done implicitly when you call one of the get-functions.
fn unload(&self) -> Result<()>;
/// Permanently remove this layer from disk.
fn delete(&self) -> Result<()>;

View File

@@ -38,7 +38,7 @@ use pgdatadir_mapping::DatadirTimeline;
/// This is embedded in the metadata file, and also in the header of all the
/// layer files. If you make any backwards-incompatible changes to the storage
/// format, bump this!
pub const STORAGE_FORMAT_VERSION: u16 = 2;
pub const STORAGE_FORMAT_VERSION: u16 = 3;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;

View File

@@ -3,6 +3,7 @@ use crate::remote_storage::RemoteIndex;
use crate::walrecord::ZenithWalRecord;
use crate::CheckpointConfig;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt;
@@ -27,6 +28,8 @@ pub struct Key {
pub field6: u32,
}
pub const KEY_SIZE: usize = 18;
impl Key {
pub fn next(&self) -> Key {
self.add(1)
@@ -61,7 +64,7 @@ impl Key {
key
}
pub fn from_array(b: [u8; 18]) -> Self {
pub fn from_slice(b: &[u8]) -> Self {
Key {
field1: b[0],
field2: u32::from_be_bytes(b[1..5].try_into().unwrap()),
@@ -71,6 +74,15 @@ impl Key {
field6: u32::from_be_bytes(b[14..18].try_into().unwrap()),
}
}
pub fn write_to_byte_slice(&self, buf: &mut [u8]) {
buf[0] = self.field1;
BE::write_u32(&mut buf[1..5], self.field2);
BE::write_u32(&mut buf[5..9], self.field3);
BE::write_u32(&mut buf[9..13], self.field4);
buf[13] = self.field5;
BE::write_u32(&mut buf[14..18], self.field6);
}
}
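// (Editor's sketch, not part of the commit.) write_to_byte_slice() and
// from_slice() are inverses over the 18-byte (KEY_SIZE) representation that
// the new B-tree indexes use as their fixed-width key.
#[cfg(test)]
#[test]
fn key_serialization_sketch() {
    // Same bytes as the TEST_KEY used in the tests below.
    let original: [u8; KEY_SIZE] = [
        0x11, 0x22, 0x22, 0x22, 0x22, 0x33, 0x33, 0x33, 0x33, 0x44, 0x44, 0x44,
        0x44, 0x55, 0x00, 0x00, 0x00, 0x01,
    ];
    let key = Key::from_slice(&original);
    let mut roundtrip = [0u8; KEY_SIZE];
    key.write_to_byte_slice(&mut roundtrip);
    assert_eq!(original, roundtrip);
}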
pub fn key_range_size(key_range: &Range<Key>) -> u32 {
@@ -569,7 +581,7 @@ mod tests {
use lazy_static::lazy_static;
lazy_static! {
static ref TEST_KEY: Key = Key::from_array(hex!("112222222233333333444444445500000001"));
static ref TEST_KEY: Key = Key::from_slice(&hex!("112222222233333333444444445500000001"));
}
#[test]