From a39de2997ff159451b6da9f94dbb99ed0bf71a90 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 16 Mar 2022 18:01:24 +0300 Subject: [PATCH] Optimize reading versions for delta_layer Store blob size in layer metadata for all layers types Heikki: This is a squashed version of PR #1369 --- pageserver/src/layered_repository.rs | 1 - .../src/layered_repository/delta_layer.rs | 118 ++++++++++-------- .../src/layered_repository/image_layer.rs | 47 ++++--- .../src/layered_repository/inmemory_layer.rs | 37 +++--- .../src/layered_repository/storage_layer.rs | 34 +++++ pageserver/src/layered_repository/utils.rs | 53 -------- pageserver/src/repository.rs | 7 ++ 7 files changed, 157 insertions(+), 140 deletions(-) delete mode 100644 pageserver/src/layered_repository/utils.rs diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 53dfd371e1..ca4dc7d6fe 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -62,7 +62,6 @@ mod layer_map; pub mod metadata; mod par_fsync; mod storage_layer; -mod utils; use delta_layer::{DeltaLayer, DeltaLayerWriter}; use ephemeral_file::is_ephemeral_file; diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index fd4a21cc14..56fd86b4c0 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -33,9 +33,8 @@ use crate::config::PageServerConf; use crate::layered_repository::filename::{DeltaFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, + BlobRef, Layer, ValueReconstructResult, ValueReconstructState, }; -use crate::layered_repository::utils; use crate::repository::{Key, Value}; use crate::virtual_file::VirtualFile; use crate::walrecord; @@ -122,7 +121,7 @@ pub struct DeltaLayerInner { /// Indexed by block number and LSN. The value is an offset into the /// chapter where the page version is stored. /// - index: HashMap>, + index: HashMap>, book: Option>, } @@ -170,22 +169,36 @@ impl Layer for DeltaLayer { // Scan the page versions backwards, starting from `lsn`. if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { - let val = Value::des(&utils::read_blob_from_chapter(&values_reader, *pos)?)?; - match val { - Value::Image(img) => { - reconstruct_state.img = Some((*entry_lsn, img)); - need_image = false; - break; - } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((*entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back + let mut size = 0usize; + let mut first_pos = 0u64; + for (_entry_lsn, blob_ref) in slice.iter().rev() { + size += blob_ref.size(); + first_pos = blob_ref.pos(); + if blob_ref.will_init() { + break; + } + } + if size != 0 { + let mut buf = vec![0u8; size]; + values_reader.read_exact_at(&mut buf, first_pos)?; + for (entry_lsn, blob_ref) in slice.iter().rev() { + let offs = (blob_ref.pos() - first_pos) as usize; + let val = Value::des(&buf[offs..offs + blob_ref.size()])?; + match val { + Value::Image(img) => { + reconstruct_state.img = Some((*entry_lsn, img)); need_image = false; break; } + Value::WalRecord(rec) => { + let will_init = rec.will_init(); + reconstruct_state.records.push((*entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } + } } } } @@ -205,9 +218,6 @@ impl Layer for DeltaLayer { fn iter(&self) -> Box> + '_> { let inner = self.load().unwrap(); - let mut pairs: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); - pairs.sort_by_key(|x| x.0); - match DeltaValueIter::new(inner) { Ok(iter) => Box::new(iter), Err(err) => Box::new(std::iter::once(Err(err))), @@ -274,14 +284,14 @@ impl Layer for DeltaLayer { let book = Book::new(file)?; let chapter = book.chapter_reader(VALUES_CHAPTER)?; - let mut values: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); + let mut values: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); values.sort_by_key(|k| k.0); for (key, versions) in values { - for (lsn, off) in versions.as_slice() { + for (lsn, blob_ref) in versions.as_slice() { let mut desc = String::new(); - - let buf = utils::read_blob_from_chapter(&chapter, *off)?; + let mut buf = vec![0u8; blob_ref.size()]; + chapter.read_exact_at(&mut buf, blob_ref.pos())?; let val = Value::des(&buf); match val { @@ -468,7 +478,7 @@ pub struct DeltaLayerWriter { key_start: Key, lsn_range: Range, - index: HashMap>, + index: HashMap>, values_writer: ChapterWriter>, end_offset: u64, @@ -529,10 +539,13 @@ impl DeltaLayerWriter { // Remember the offset and size metadata. The metadata is written // to a separate chapter, in `finish`. let off = self.end_offset; - let len = utils::write_blob(&mut self.values_writer, &Value::ser(&val)?)?; - self.end_offset += len; + let buf = Value::ser(&val)?; + let len = buf.len(); + self.values_writer.write_all(&buf)?; + self.end_offset += len as u64; let vec_map = self.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + let blob_ref = BlobRef::new(off, len, val.will_init()); + let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. bail!( @@ -637,14 +650,13 @@ impl DeltaLayerWriter { /// That takes up quite a lot of memory. Should do this in a more streaming /// fashion. /// -struct DeltaValueIter<'a> { - all_offsets: Vec<(Key, Lsn, u64)>, +struct DeltaValueIter { + all_offsets: Vec<(Key, Lsn, BlobRef)>, next_idx: usize, - - inner: RwLockReadGuard<'a, DeltaLayerInner>, + data: Vec, } -impl<'a> Iterator for DeltaValueIter<'a> { +impl Iterator for DeltaValueIter { type Item = Result<(Key, Lsn, Value)>; fn next(&mut self) -> Option { @@ -652,38 +664,40 @@ impl<'a> Iterator for DeltaValueIter<'a> { } } -impl<'a> DeltaValueIter<'a> { - fn new(inner: RwLockReadGuard<'a, DeltaLayerInner>) -> Result { - let mut index: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); +impl DeltaValueIter { + fn new(inner: RwLockReadGuard) -> Result { + let mut index: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); index.sort_by_key(|x| x.0); - let mut all_offsets: Vec<(Key, Lsn, u64)> = Vec::new(); + let mut all_offsets: Vec<(Key, Lsn, BlobRef)> = Vec::new(); for (key, vec_map) in index.iter() { - for (lsn, off) in vec_map.as_slice().iter() { - all_offsets.push((**key, *lsn, *off)); + for (lsn, blob_ref) in vec_map.as_slice().iter() { + all_offsets.push((**key, *lsn, *blob_ref)); } } - Ok(DeltaValueIter { + let values_reader = inner + .book + .as_ref() + .expect("should be loaded in load call above") + .chapter_reader(VALUES_CHAPTER)?; + let file_size = values_reader.len() as usize; + let mut layer = DeltaValueIter { all_offsets, - inner, next_idx: 0, - }) + data: vec![0u8; file_size], + }; + values_reader.read_exact_at(&mut layer.data, 0)?; + + Ok(layer) } fn next_res(&mut self) -> Result> { if self.next_idx < self.all_offsets.len() { - let (key, lsn, off) = self.all_offsets[self.next_idx]; - - let values_reader = self - .inner - .book - .as_ref() - .expect("should be loaded in load call above") - .chapter_reader(VALUES_CHAPTER)?; - - let val = Value::des(&utils::read_blob_from_chapter(&values_reader, off)?)?; - + let (key, lsn, blob_ref) = self.all_offsets[self.next_idx]; + let offs = blob_ref.pos() as usize; + let size = blob_ref.size(); + let val = Value::des(&self.data[offs..offs + size])?; self.next_idx += 1; Ok(Some((key, lsn, val))) } else { diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs index ecea5b4fcf..948e5b1433 100644 --- a/pageserver/src/layered_repository/image_layer.rs +++ b/pageserver/src/layered_repository/image_layer.rs @@ -21,9 +21,8 @@ use crate::config::PageServerConf; use crate::layered_repository::filename::{ImageFileName, PathOrConf}; use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, + BlobRef, Layer, ValueReconstructResult, ValueReconstructState, }; -use crate::layered_repository::utils; use crate::repository::{Key, Value}; use crate::virtual_file::VirtualFile; use crate::{ZTenantId, ZTimelineId}; @@ -105,7 +104,7 @@ pub struct ImageLayerInner { book: Option>, /// offset of each value - index: HashMap, + index: HashMap, } impl Layer for ImageLayer { @@ -142,20 +141,24 @@ impl Layer for ImageLayer { let inner = self.load()?; - if let Some(offset) = inner.index.get(&key) { + if let Some(blob_ref) = inner.index.get(&key) { let chapter = inner .book .as_ref() .unwrap() .chapter_reader(VALUES_CHAPTER)?; - let blob = utils::read_blob_from_chapter(&chapter, *offset).with_context(|| { - format!( - "failed to read value from data file {} at offset {}", - self.filename().display(), - offset - ) - })?; + let mut blob = vec![0; blob_ref.size()]; + chapter + .read_exact_at(&mut blob, blob_ref.pos()) + .with_context(|| { + format!( + "failed to read {} bytes from data file {} at offset {}", + blob_ref.size(), + self.filename().display(), + blob_ref.pos() + ) + })?; let value = Bytes::from(blob); reconstruct_state.img = Some((self.lsn, value)); @@ -215,11 +218,16 @@ impl Layer for ImageLayer { let inner = self.load()?; - let mut index_vec: Vec<(&Key, &u64)> = inner.index.iter().collect(); - index_vec.sort_by_key(|x| x.1); + let mut index_vec: Vec<(&Key, &BlobRef)> = inner.index.iter().collect(); + index_vec.sort_by_key(|x| x.1.pos()); - for (key, offset) in index_vec { - println!("key: {} offset {}", key, offset); + for (key, blob_ref) in index_vec { + println!( + "key: {} size {} offset {}", + key, + blob_ref.size(), + blob_ref.pos() + ); } Ok(()) @@ -385,7 +393,7 @@ pub struct ImageLayerWriter { values_writer: Option>>, end_offset: u64, - index: HashMap, + index: HashMap, finished: bool, } @@ -446,10 +454,11 @@ impl ImageLayerWriter { let off = self.end_offset; if let Some(writer) = &mut self.values_writer { - let len = utils::write_blob(writer, img)?; - self.end_offset += len; + let len = img.len(); + writer.write_all(img)?; + self.end_offset += len as u64; - let old = self.index.insert(key, off); + let old = self.index.insert(key, BlobRef::new(off, len, true)); assert!(old.is_none()); } else { panic!() diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 145dbeecb1..1e2f4f52df 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -8,9 +8,8 @@ use crate::config::PageServerConf; use crate::layered_repository::delta_layer::{DeltaLayer, DeltaLayerWriter}; use crate::layered_repository::ephemeral_file::EphemeralFile; use crate::layered_repository::storage_layer::{ - Layer, ValueReconstructResult, ValueReconstructState, + BlobRef, Layer, ValueReconstructResult, ValueReconstructState, }; -use crate::layered_repository::utils; use crate::repository::{Key, Value}; use crate::walrecord; use crate::{ZTenantId, ZTimelineId}; @@ -20,7 +19,9 @@ use std::collections::HashMap; // avoid binding to Write (conflicts with std::io::Write) // while being able to use std::fmt::Write's methods use std::fmt::Write as _; +use std::io::Write; use std::ops::Range; +use std::os::unix::fs::FileExt; use std::path::PathBuf; use std::sync::RwLock; use zenith_utils::bin_ser::BeSer; @@ -53,7 +54,7 @@ pub struct InMemoryLayerInner { /// by block number and LSN. The value is an offset into the /// ephemeral file where the page version is stored. /// - index: HashMap>, + index: HashMap>, /// The values are stored in a serialized format in this file. /// Each serialized Value is preceded by a 'u32' length field. @@ -122,7 +123,7 @@ impl Layer for InMemoryLayer { // Scan the page versions backwards, starting from `lsn`. if let Some(vec_map) = inner.index.get(&key) { let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { + for (entry_lsn, blob_ref) in slice.iter().rev() { match &reconstruct_state.img { Some((cached_lsn, _)) if entry_lsn <= cached_lsn => { return Ok(ValueReconstructResult::Complete) @@ -130,7 +131,9 @@ impl Layer for InMemoryLayer { _ => {} } - let value = Value::des(&utils::read_blob(&inner.file, *pos)?)?; + let mut buf = vec![0u8; blob_ref.size()]; + inner.file.read_exact_at(&mut buf, blob_ref.pos())?; + let value = Value::des(&buf)?; match value { Value::Image(img) => { reconstruct_state.img = Some((*entry_lsn, img)); @@ -203,10 +206,11 @@ impl Layer for InMemoryLayer { let mut buf = Vec::new(); for (key, vec_map) in inner.index.iter() { - for (lsn, pos) in vec_map.as_slice() { + for (lsn, blob_ref) in vec_map.as_slice() { let mut desc = String::new(); - let len = utils::read_blob_buf(&inner.file, *pos, &mut buf)?; - let val = Value::des(&buf[0..len]); + buf.resize(blob_ref.size(), 0); + inner.file.read_exact_at(&mut buf, blob_ref.pos())?; + let val = Value::des(&buf); match val { Ok(Value::Image(img)) => { write!(&mut desc, " img {} bytes", img.len())?; @@ -276,11 +280,14 @@ impl InMemoryLayer { inner.assert_writeable(); let off = inner.end_offset; - let len = utils::write_blob(&mut inner.file, &Value::ser(&val)?)?; - inner.end_offset += len; + let buf = Value::ser(&val)?; + let len = buf.len(); + inner.file.write_all(&buf)?; + inner.end_offset += len as u64; let vec_map = inner.index.entry(key).or_default(); - let old = vec_map.append_or_update_last(lsn, off).unwrap().0; + let blob_ref = BlobRef::new(off, len, val.will_init()); + let old = vec_map.append_or_update_last(lsn, blob_ref).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. warn!("Key {} at {} already exists", key, lsn); @@ -348,13 +355,13 @@ impl InMemoryLayer { self.start_lsn..inner.end_lsn.unwrap(), )?; - let mut buf = Vec::new(); let mut do_steps = || -> Result<()> { for (key, vec_map) in inner.index.iter() { // Write all page versions - for (lsn, pos) in vec_map.as_slice() { - let len = utils::read_blob_buf(&inner.file, *pos, &mut buf)?; - let val = Value::des(&buf[0..len])?; + for (lsn, blob_ref) in vec_map.as_slice() { + let mut buf = vec![0u8; blob_ref.size()]; + inner.file.read_exact_at(&mut buf, blob_ref.pos())?; + let val = Value::des(&buf)?; delta_layer_writer.put_value(*key, *lsn, val)?; } } diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index c5314350c8..5847f9cb75 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -7,6 +7,7 @@ use crate::walrecord::ZenithWalRecord; use crate::{ZTenantId, ZTimelineId}; use anyhow::Result; use bytes::Bytes; +use serde::{Deserialize, Serialize}; use std::ops::Range; use std::path::PathBuf; @@ -144,3 +145,36 @@ pub trait Layer: Send + Sync { /// Dump summary of the contents of the layer to stdout fn dump(&self) -> Result<()>; } + +// Flag indicating that this version initialize the page +const WILL_INIT: u64 = 1; + +/// +/// Struct representing reference to BLOB in layers. Reference contains BLOB offset and size. +/// For WAL records (delta layer) it also contains `will_init` flag which helps to determine range of records +/// which needs to be applied without reading/deserializing records themselves. +/// +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +pub struct BlobRef(u64); + +impl BlobRef { + pub fn will_init(&self) -> bool { + (self.0 & WILL_INIT) != 0 + } + + pub fn pos(&self) -> u64 { + self.0 >> 32 + } + + pub fn size(&self) -> usize { + ((self.0 & 0xFFFFFFFF) >> 1) as usize + } + + pub fn new(pos: u64, size: usize, will_init: bool) -> BlobRef { + let mut blob_ref = (pos << 32) | ((size as u64) << 1); + if will_init { + blob_ref |= WILL_INIT; + } + BlobRef(blob_ref) + } +} diff --git a/pageserver/src/layered_repository/utils.rs b/pageserver/src/layered_repository/utils.rs deleted file mode 100644 index b3aa8c7ef4..0000000000 --- a/pageserver/src/layered_repository/utils.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Utilities for reading and writing Values -use std::io::{Error, Write}; -use std::os::unix::fs::FileExt; - -use bookfile::BoundedReader; - -pub fn read_blob_buf(file: &F, off: u64, buf: &mut Vec) -> Result { - // read length - let mut len_buf = [0u8; 4]; - file.read_exact_at(&mut len_buf, off)?; - - let len = u32::from_ne_bytes(len_buf) as usize; - - buf.resize(len, 0); - file.read_exact_at(&mut buf.as_mut_slice(), off + 4)?; - - Ok(len) -} - -pub fn read_blob(file: &F, off: u64) -> Result, Error> { - let mut buf: Vec = Vec::new(); - let _ = read_blob_buf(file, off, &mut buf); - Ok(buf) -} - -pub fn read_blob_from_chapter( - file: &BoundedReader<&F>, - off: u64, -) -> Result, Error> { - // read length - let mut len_buf = [0u8; 4]; - file.read_exact_at(&mut len_buf, off)?; - - let len = u32::from_ne_bytes(len_buf); - - let mut buf: Vec = Vec::new(); - buf.resize(len as usize, 0); - file.read_exact_at(&mut buf.as_mut_slice(), off + 4)?; - - Ok(buf) -} - -pub fn write_blob(writer: &mut W, buf: &[u8]) -> Result { - let val_len = buf.len() as u32; - - // write the 'length' field and kind byte. - let lenbuf = u32::to_ne_bytes(val_len); - - writer.write_all(&lenbuf)?; - writer.write_all(buf)?; - - Ok(4 + val_len as u64) -} diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 56bd5208ca..4eb352f68d 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -154,6 +154,13 @@ impl Value { pub fn is_image(&self) -> bool { matches!(self, Value::Image(_)) } + + pub fn will_init(&self) -> bool { + match self { + Value::Image(_) => true, + Value::WalRecord(rec) => rec.will_init(), + } + } } ///