Optimize reading versions for delta_layer

Store blob size in layer metadata for all layers types

Heikki: This is a squashed version of PR #1369
This commit is contained in:
Konstantin Knizhnik
2022-03-16 18:01:24 +03:00
committed by Heikki Linnakangas
parent d756921220
commit a39de2997f
7 changed files with 157 additions and 140 deletions

View File

@@ -62,7 +62,6 @@ mod layer_map;
pub mod metadata;
mod par_fsync;
mod storage_layer;
mod utils;
use delta_layer::{DeltaLayer, DeltaLayerWriter};
use ephemeral_file::is_ephemeral_file;

View File

@@ -33,9 +33,8 @@
use crate::config::PageServerConf;
use crate::layered_repository::filename::{DeltaFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::layered_repository::utils;
use crate::repository::{Key, Value};
use crate::virtual_file::VirtualFile;
use crate::walrecord;
@@ -122,7 +121,7 @@ pub struct DeltaLayerInner {
/// Indexed by block number and LSN. The value is an offset into the
/// chapter where the page version is stored.
///
index: HashMap<Key, VecMap<Lsn, u64>>,
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
book: Option<Book<VirtualFile>>,
}
@@ -170,22 +169,36 @@ impl Layer for DeltaLayer {
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let val = Value::des(&utils::read_blob_from_chapter(&values_reader, *pos)?)?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
let mut size = 0usize;
let mut first_pos = 0u64;
for (_entry_lsn, blob_ref) in slice.iter().rev() {
size += blob_ref.size();
first_pos = blob_ref.pos();
if blob_ref.will_init() {
break;
}
}
if size != 0 {
let mut buf = vec![0u8; size];
values_reader.read_exact_at(&mut buf, first_pos)?;
for (entry_lsn, blob_ref) in slice.iter().rev() {
let offs = (blob_ref.pos() - first_pos) as usize;
let val = Value::des(&buf[offs..offs + blob_ref.size()])?;
match val {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
need_image = false;
break;
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
reconstruct_state.records.push((*entry_lsn, rec));
if will_init {
// This WAL record initializes the page, so no need to go further back
need_image = false;
break;
}
}
}
}
}
@@ -205,9 +218,6 @@ impl Layer for DeltaLayer {
fn iter(&self) -> Box<dyn Iterator<Item = Result<(Key, Lsn, Value)>> + '_> {
let inner = self.load().unwrap();
let mut pairs: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
pairs.sort_by_key(|x| x.0);
match DeltaValueIter::new(inner) {
Ok(iter) => Box::new(iter),
Err(err) => Box::new(std::iter::once(Err(err))),
@@ -274,14 +284,14 @@ impl Layer for DeltaLayer {
let book = Book::new(file)?;
let chapter = book.chapter_reader(VALUES_CHAPTER)?;
let mut values: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
let mut values: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
values.sort_by_key(|k| k.0);
for (key, versions) in values {
for (lsn, off) in versions.as_slice() {
for (lsn, blob_ref) in versions.as_slice() {
let mut desc = String::new();
let buf = utils::read_blob_from_chapter(&chapter, *off)?;
let mut buf = vec![0u8; blob_ref.size()];
chapter.read_exact_at(&mut buf, blob_ref.pos())?;
let val = Value::des(&buf);
match val {
@@ -468,7 +478,7 @@ pub struct DeltaLayerWriter {
key_start: Key,
lsn_range: Range<Lsn>,
index: HashMap<Key, VecMap<Lsn, u64>>,
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
values_writer: ChapterWriter<BufWriter<VirtualFile>>,
end_offset: u64,
@@ -529,10 +539,13 @@ impl DeltaLayerWriter {
// Remember the offset and size metadata. The metadata is written
// to a separate chapter, in `finish`.
let off = self.end_offset;
let len = utils::write_blob(&mut self.values_writer, &Value::ser(&val)?)?;
self.end_offset += len;
let buf = Value::ser(&val)?;
let len = buf.len();
self.values_writer.write_all(&buf)?;
self.end_offset += len as u64;
let vec_map = self.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
let blob_ref = BlobRef::new(off, len, val.will_init());
let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
bail!(
@@ -637,14 +650,13 @@ impl DeltaLayerWriter {
/// That takes up quite a lot of memory. Should do this in a more streaming
/// fashion.
///
struct DeltaValueIter<'a> {
all_offsets: Vec<(Key, Lsn, u64)>,
struct DeltaValueIter {
all_offsets: Vec<(Key, Lsn, BlobRef)>,
next_idx: usize,
inner: RwLockReadGuard<'a, DeltaLayerInner>,
data: Vec<u8>,
}
impl<'a> Iterator for DeltaValueIter<'a> {
impl Iterator for DeltaValueIter {
type Item = Result<(Key, Lsn, Value)>;
fn next(&mut self) -> Option<Self::Item> {
@@ -652,38 +664,40 @@ impl<'a> Iterator for DeltaValueIter<'a> {
}
}
impl<'a> DeltaValueIter<'a> {
fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, u64>)> = inner.index.iter().collect();
impl DeltaValueIter {
fn new(inner: RwLockReadGuard<DeltaLayerInner>) -> Result<Self> {
let mut index: Vec<(&Key, &VecMap<Lsn, BlobRef>)> = inner.index.iter().collect();
index.sort_by_key(|x| x.0);
let mut all_offsets: Vec<(Key, Lsn, u64)> = Vec::new();
let mut all_offsets: Vec<(Key, Lsn, BlobRef)> = Vec::new();
for (key, vec_map) in index.iter() {
for (lsn, off) in vec_map.as_slice().iter() {
all_offsets.push((**key, *lsn, *off));
for (lsn, blob_ref) in vec_map.as_slice().iter() {
all_offsets.push((**key, *lsn, *blob_ref));
}
}
Ok(DeltaValueIter {
let values_reader = inner
.book
.as_ref()
.expect("should be loaded in load call above")
.chapter_reader(VALUES_CHAPTER)?;
let file_size = values_reader.len() as usize;
let mut layer = DeltaValueIter {
all_offsets,
inner,
next_idx: 0,
})
data: vec![0u8; file_size],
};
values_reader.read_exact_at(&mut layer.data, 0)?;
Ok(layer)
}
fn next_res(&mut self) -> Result<Option<(Key, Lsn, Value)>> {
if self.next_idx < self.all_offsets.len() {
let (key, lsn, off) = self.all_offsets[self.next_idx];
let values_reader = self
.inner
.book
.as_ref()
.expect("should be loaded in load call above")
.chapter_reader(VALUES_CHAPTER)?;
let val = Value::des(&utils::read_blob_from_chapter(&values_reader, off)?)?;
let (key, lsn, blob_ref) = self.all_offsets[self.next_idx];
let offs = blob_ref.pos() as usize;
let size = blob_ref.size();
let val = Value::des(&self.data[offs..offs + size])?;
self.next_idx += 1;
Ok(Some((key, lsn, val)))
} else {

View File

@@ -21,9 +21,8 @@
use crate::config::PageServerConf;
use crate::layered_repository::filename::{ImageFileName, PathOrConf};
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::layered_repository::utils;
use crate::repository::{Key, Value};
use crate::virtual_file::VirtualFile;
use crate::{ZTenantId, ZTimelineId};
@@ -105,7 +104,7 @@ pub struct ImageLayerInner {
book: Option<Book<VirtualFile>>,
/// offset of each value
index: HashMap<Key, u64>,
index: HashMap<Key, BlobRef>,
}
impl Layer for ImageLayer {
@@ -142,20 +141,24 @@ impl Layer for ImageLayer {
let inner = self.load()?;
if let Some(offset) = inner.index.get(&key) {
if let Some(blob_ref) = inner.index.get(&key) {
let chapter = inner
.book
.as_ref()
.unwrap()
.chapter_reader(VALUES_CHAPTER)?;
let blob = utils::read_blob_from_chapter(&chapter, *offset).with_context(|| {
format!(
"failed to read value from data file {} at offset {}",
self.filename().display(),
offset
)
})?;
let mut blob = vec![0; blob_ref.size()];
chapter
.read_exact_at(&mut blob, blob_ref.pos())
.with_context(|| {
format!(
"failed to read {} bytes from data file {} at offset {}",
blob_ref.size(),
self.filename().display(),
blob_ref.pos()
)
})?;
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
@@ -215,11 +218,16 @@ impl Layer for ImageLayer {
let inner = self.load()?;
let mut index_vec: Vec<(&Key, &u64)> = inner.index.iter().collect();
index_vec.sort_by_key(|x| x.1);
let mut index_vec: Vec<(&Key, &BlobRef)> = inner.index.iter().collect();
index_vec.sort_by_key(|x| x.1.pos());
for (key, offset) in index_vec {
println!("key: {} offset {}", key, offset);
for (key, blob_ref) in index_vec {
println!(
"key: {} size {} offset {}",
key,
blob_ref.size(),
blob_ref.pos()
);
}
Ok(())
@@ -385,7 +393,7 @@ pub struct ImageLayerWriter {
values_writer: Option<ChapterWriter<BufWriter<VirtualFile>>>,
end_offset: u64,
index: HashMap<Key, u64>,
index: HashMap<Key, BlobRef>,
finished: bool,
}
@@ -446,10 +454,11 @@ impl ImageLayerWriter {
let off = self.end_offset;
if let Some(writer) = &mut self.values_writer {
let len = utils::write_blob(writer, img)?;
self.end_offset += len;
let len = img.len();
writer.write_all(img)?;
self.end_offset += len as u64;
let old = self.index.insert(key, off);
let old = self.index.insert(key, BlobRef::new(off, len, true));
assert!(old.is_none());
} else {
panic!()

View File

@@ -8,9 +8,8 @@ use crate::config::PageServerConf;
use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter};
use crate::layered_repository::ephemeral_file::EphemeralFile;
use crate::layered_repository::storage_layer::{
Layer, ValueReconstructResult, ValueReconstructState,
BlobRef, Layer, ValueReconstructResult, ValueReconstructState,
};
use crate::layered_repository::utils;
use crate::repository::{Key, Value};
use crate::walrecord;
use crate::{ZTenantId, ZTimelineId};
@@ -20,7 +19,9 @@ use std::collections::HashMap;
// avoid binding to Write (conflicts with std::io::Write)
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::io::Write;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::RwLock;
use zenith_utils::bin_ser::BeSer;
@@ -53,7 +54,7 @@ pub struct InMemoryLayerInner {
/// by block number and LSN. The value is an offset into the
/// ephemeral file where the page version is stored.
///
index: HashMap<Key, VecMap<Lsn, u64>>,
index: HashMap<Key, VecMap<Lsn, BlobRef>>,
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
@@ -122,7 +123,7 @@ impl Layer for InMemoryLayer {
// Scan the page versions backwards, starting from `lsn`.
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
for (entry_lsn, blob_ref) in slice.iter().rev() {
match &reconstruct_state.img {
Some((cached_lsn, _)) if entry_lsn <= cached_lsn => {
return Ok(ValueReconstructResult::Complete)
@@ -130,7 +131,9 @@ impl Layer for InMemoryLayer {
_ => {}
}
let value = Value::des(&utils::read_blob(&inner.file, *pos)?)?;
let mut buf = vec![0u8; blob_ref.size()];
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
@@ -203,10 +206,11 @@ impl Layer for InMemoryLayer {
let mut buf = Vec::new();
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
for (lsn, blob_ref) in vec_map.as_slice() {
let mut desc = String::new();
let len = utils::read_blob_buf(&inner.file, *pos, &mut buf)?;
let val = Value::des(&buf[0..len]);
buf.resize(blob_ref.size(), 0);
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
write!(&mut desc, " img {} bytes", img.len())?;
@@ -276,11 +280,14 @@ impl InMemoryLayer {
inner.assert_writeable();
let off = inner.end_offset;
let len = utils::write_blob(&mut inner.file, &Value::ser(&val)?)?;
inner.end_offset += len;
let buf = Value::ser(&val)?;
let len = buf.len();
inner.file.write_all(&buf)?;
inner.end_offset += len as u64;
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
let blob_ref = BlobRef::new(off, len, val.will_init());
let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
@@ -348,13 +355,13 @@ impl InMemoryLayer {
self.start_lsn..inner.end_lsn.unwrap(),
)?;
let mut buf = Vec::new();
let mut do_steps = || -> Result<()> {
for (key, vec_map) in inner.index.iter() {
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
let len = utils::read_blob_buf(&inner.file, *pos, &mut buf)?;
let val = Value::des(&buf[0..len])?;
for (lsn, blob_ref) in vec_map.as_slice() {
let mut buf = vec![0u8; blob_ref.size()];
inner.file.read_exact_at(&mut buf, blob_ref.pos())?;
let val = Value::des(&buf)?;
delta_layer_writer.put_value(*key, *lsn, val)?;
}
}

View File

@@ -7,6 +7,7 @@ use crate::walrecord::ZenithWalRecord;
use crate::{ZTenantId, ZTimelineId};
use anyhow::Result;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::ops::Range;
use std::path::PathBuf;
@@ -144,3 +145,36 @@ pub trait Layer: Send + Sync {
/// Dump summary of the contents of the layer to stdout
fn dump(&self) -> Result<()>;
}
// Flag indicating that this version initializes the page
const WILL_INIT: u64 = 1;

///
/// Struct representing a reference to a BLOB in a layer. The reference packs
/// the BLOB's offset, size and `will_init` flag into a single u64:
///
///   bits 32..63: offset of the blob within the chapter (max 4 GiB)
///   bits  1..31: size of the blob in bytes (max 2 GiB)
///   bit  0:      `will_init` flag
///
/// For WAL records (delta layer) the `will_init` flag helps to determine the
/// range of records which needs to be applied without reading/deserializing
/// the records themselves.
///
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct BlobRef(u64);

impl BlobRef {
    /// True if this blob fully initializes the page (a page image, or a WAL
    /// record with `will_init`), so no older versions need to be applied.
    pub fn will_init(&self) -> bool {
        (self.0 & WILL_INIT) != 0
    }

    /// Byte offset of the blob within the values chapter.
    pub fn pos(&self) -> u64 {
        self.0 >> 32
    }

    /// Size of the blob in bytes.
    pub fn size(&self) -> usize {
        ((self.0 & 0xFFFFFFFF) >> 1) as usize
    }

    /// Pack `pos`, `size` and `will_init` into a `BlobRef`.
    ///
    /// # Panics
    /// Panics if `pos` does not fit in 32 bits or `size` does not fit in
    /// 31 bits: the packed encoding would otherwise silently drop the high
    /// bits and produce a reference to the wrong data.
    pub fn new(pos: u64, size: usize, will_init: bool) -> BlobRef {
        assert!(pos < (1u64 << 32), "blob position {} too large for BlobRef", pos);
        assert!(
            (size as u64) < (1u64 << 31),
            "blob size {} too large for BlobRef",
            size
        );
        let mut blob_ref = (pos << 32) | ((size as u64) << 1);
        if will_init {
            blob_ref |= WILL_INIT;
        }
        BlobRef(blob_ref)
    }
}

View File

@@ -1,53 +0,0 @@
// Utilities for reading and writing Values
use std::io::{Error, Write};
use std::os::unix::fs::FileExt;
use bookfile::BoundedReader;
/// Read a length-prefixed blob from `file` at byte offset `off` into `buf`.
///
/// The on-disk format is a native-endian `u32` length header followed by that
/// many bytes of payload. `buf` is resized to exactly the payload length, so
/// any previous contents are discarded. Returns the payload length in bytes.
pub fn read_blob_buf<F: FileExt>(file: &F, off: u64, buf: &mut Vec<u8>) -> Result<usize, Error> {
    // Read the 4-byte length header first.
    let mut len_buf = [0u8; 4];
    file.read_exact_at(&mut len_buf, off)?;
    let len = u32::from_ne_bytes(len_buf) as usize;

    // Size the caller's buffer and read the payload that follows the header.
    // (Pass the slice directly instead of the needless `&mut buf.as_mut_slice()`.)
    buf.resize(len, 0);
    file.read_exact_at(buf.as_mut_slice(), off + 4)?;
    Ok(len)
}
/// Read a length-prefixed blob from `file` at byte offset `off`, returning
/// the payload in a freshly allocated `Vec`.
///
/// See `read_blob_buf` for the on-disk format.
pub fn read_blob<F: FileExt>(file: &F, off: u64) -> Result<Vec<u8>, Error> {
    let mut buf: Vec<u8> = Vec::new();
    // Propagate read errors. The previous `let _ = read_blob_buf(...)`
    // swallowed them and returned Ok with an empty or partial buffer.
    read_blob_buf(file, off, &mut buf)?;
    Ok(buf)
}
/// Read a length-prefixed blob from a bookfile chapter at offset `off`.
///
/// Layout: a native-endian `u32` length header immediately followed by the
/// payload bytes. Returns the payload in a newly allocated `Vec`.
pub fn read_blob_from_chapter<F: FileExt>(
    file: &BoundedReader<&F>,
    off: u64,
) -> Result<Vec<u8>, Error> {
    // Fetch the 4-byte length header.
    let mut header = [0u8; 4];
    file.read_exact_at(&mut header, off)?;
    let payload_len = u32::from_ne_bytes(header) as usize;

    // Allocate the payload buffer up front, then fill it from the bytes
    // that follow the header.
    let mut payload = vec![0u8; payload_len];
    file.read_exact_at(payload.as_mut_slice(), off + 4)?;
    Ok(payload)
}
/// Append `buf` to `writer` as a length-prefixed blob.
///
/// Writes a native-endian `u32` length header followed by the payload, and
/// returns the total number of bytes written (4 + payload length).
///
/// Returns an `InvalidInput` error if the payload is too large to represent
/// in the `u32` header (the previous `as u32` cast silently truncated it,
/// which would have produced an unreadable file).
pub fn write_blob<W: Write>(writer: &mut W, buf: &[u8]) -> Result<u64, Error> {
    let val_len = u32::try_from(buf.len()).map_err(|_| {
        Error::new(
            std::io::ErrorKind::InvalidInput,
            "blob too large for u32 length header",
        )
    })?;
    // Write the 'length' field, then the payload.
    writer.write_all(&u32::to_ne_bytes(val_len))?;
    writer.write_all(buf)?;
    Ok(4 + val_len as u64)
}

View File

@@ -154,6 +154,13 @@ impl Value {
/// Does this Value hold a full page image (as opposed to a WAL record)?
pub fn is_image(&self) -> bool {
matches!(self, Value::Image(_))
}
/// Does this Value initialize the page contents from scratch, so that no
/// older page versions are needed to reconstruct it?
pub fn will_init(&self) -> bool {
match self {
// A full page image always replaces the page contents entirely.
Value::Image(_) => true,
// A WAL record may or may not initialize the page; ask the record.
Value::WalRecord(rec) => rec.will_init(),
}
}
}
///