From 69528b7c30eeff18b6ef3ef973e3f107093e6694 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Tue, 1 Aug 2023 13:38:35 +0200 Subject: [PATCH] Prepare k-merge in compaction for async I/O (#4836) ## Problem The k-merge in pageserver compaction currently relies on iterators over the keys and also over the values. This approach does not support async code because we are using iterators and those don't support async in general. Also, the k-merge implementation we use doesn't support async either. Instead, as we already load all the keys into memory, the plan is to just do the sorting in-memory for now, switch to async, and then once we want to support workloads that don't have all keys stored in memory, we can look into switching to a k-merge implementation that supports async instead. ## Summary of changes The core of this PR is the move from functions on the `PersistentLayer` trait to return custom iterator types to inherent functions on `DeltaLayer` that return buffers with all keys or value references. Value references are a type we created in this PR, containing a `BlobRef` as well as an `Arc` pointer to the `DeltaLayerInner`, so that we can lazily load the values during compaction. This preserves the property of the current code. This PR does not switch us to doing the k-merge via sort on slices, but with this PR, doing such a switch is relatively easy and only requires changes of the compaction code itself. Part of https://github.com/neondatabase/neon/issues/4743 --- pageserver/ctl/src/layers.rs | 2 +- pageserver/src/tenant/blob_io.rs | 4 +- pageserver/src/tenant/block_io.rs | 2 +- pageserver/src/tenant/ephemeral_file.rs | 2 +- pageserver/src/tenant/storage_layer.rs | 19 +- .../src/tenant/storage_layer/delta_layer.rs | 227 ++++++------------ .../src/tenant/storage_layer/image_layer.rs | 8 +- .../tenant/storage_layer/inmemory_layer.rs | 6 +- .../src/tenant/storage_layer/remote_layer.rs | 12 +- pageserver/src/tenant/timeline.rs | 40 +-- 10 files changed, 116 insertions(+), 206 deletions(-) diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index d81303d533..74613eb5f4 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -68,7 +68,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { true }, )?; - let mut cursor = BlockCursor::new(&file); + let cursor = BlockCursor::new(&file); for (k, v) in all { let value = cursor.read_blob(v.pos())?; println!("key:{} value_len:{}", k, value.len()); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 4dcf7fe5fe..7dd53407e7 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -21,7 +21,7 @@ where R: BlockReader, { /// Read a blob into a new buffer. - pub fn read_blob(&mut self, offset: u64) -> Result, std::io::Error> { + pub fn read_blob(&self, offset: u64) -> Result, std::io::Error> { let mut buf = Vec::new(); self.read_blob_into_buf(offset, &mut buf)?; Ok(buf) @@ -29,7 +29,7 @@ where /// Read blob into the given buffer. Any previous contents in the buffer /// are overwritten. pub fn read_blob_into_buf( - &mut self, + &self, offset: u64, dstbuf: &mut Vec, ) -> Result<(), std::io::Error> { diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 10de34e3f6..e6ebebe594 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -80,7 +80,7 @@ where BlockCursor { reader } } - pub fn read_blk(&mut self, blknum: u32) -> Result { + pub fn read_blk(&self, blknum: u32) -> Result { self.reader.read_blk(blknum) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index f44534249c..8d42150824 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -420,7 +420,7 @@ mod tests { blobs.push((pos, data)); } - let mut cursor = BlockCursor::new(&file); + let cursor = BlockCursor::new(&file); for (pos, expected) in blobs { let actual = cursor.read_blob(pos)?; assert_eq!(actual, expected); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 05381cb56d..b4db7e2f08 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -9,7 +9,7 @@ mod remote_layer; use crate::config::PageServerConf; use crate::context::RequestContext; -use crate::repository::{Key, Value}; +use crate::repository::Key; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use anyhow::Result; @@ -34,7 +34,7 @@ use utils::{ lsn::Lsn, }; -pub use delta_layer::{DeltaLayer, DeltaLayerWriter}; +pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef}; pub use filename::{DeltaFileName, ImageFileName, LayerFileName}; pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; @@ -381,12 +381,6 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static { async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>; } -/// Returned by [`PersistentLayer::iter`] -pub type LayerIter<'i> = Box> + 'i + Send>; - -/// Returned by [`PersistentLayer::key_iter`] -pub type LayerKeyIter<'i> = Box + 'i + Send>; - /// Get a layer descriptor from a layer. pub trait AsLayerDesc { /// Get the layer descriptor. @@ -427,15 +421,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc { // `None` for `RemoteLayer`. fn local_path(&self) -> Option; - /// Iterate through all keys and values stored in the layer - fn iter(&self, ctx: &RequestContext) -> Result>; - - /// Iterate through all keys stored in the layer. Returns key, lsn and value size - /// It is used only for compaction and so is currently implemented only for DeltaLayer - fn key_iter(&self, _ctx: &RequestContext) -> Result> { - panic!("Not implemented") - } - /// Permanently remove this layer from disk. fn delete_resident_layer_file(&self) -> Result<()>; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 7574554b4e..98dc1230f5 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -61,8 +61,8 @@ use utils::{ }; use super::{ - AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter, - LayerKeyIter, PathOrConf, PersistentLayerDesc, + AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf, + PersistentLayerDesc, }; /// @@ -189,7 +189,7 @@ pub struct DeltaLayer { access_stats: LayerAccessStats, - inner: OnceCell, + inner: OnceCell>, } impl std::fmt::Debug for DeltaLayer { @@ -258,10 +258,10 @@ impl Layer for DeltaLayer { tree_reader.dump().await?; - let mut cursor = file.block_cursor(); + let cursor = file.block_cursor(); // A subroutine to dump a single blob - let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result { + let dump_blob = |blob_ref: BlobRef| -> anyhow::Result { let buf = cursor.read_blob(blob_ref.pos())?; let val = Value::des(&buf)?; let desc = match val { @@ -343,7 +343,7 @@ impl Layer for DeltaLayer { })?; // Ok, 'offsets' now contains the offsets of all the entries we need to read - let mut cursor = file.block_cursor(); + let cursor = file.block_cursor(); let mut buf = Vec::new(); for (entry_lsn, pos) in offsets { cursor.read_blob_into_buf(pos, &mut buf).with_context(|| { @@ -424,23 +424,6 @@ impl PersistentLayer for DeltaLayer { Some(self.path()) } - fn iter(&self, ctx: &RequestContext) -> Result> { - let inner = self - .load(LayerAccessKind::KeyIter, ctx) - .context("load delta layer")?; - Ok(match DeltaValueIter::new(inner) { - Ok(iter) => Box::new(iter), - Err(err) => Box::new(std::iter::once(Err(err))), - }) - } - - fn key_iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(LayerAccessKind::KeyIter, ctx)?; - Ok(Box::new( - DeltaKeyIter::new(inner).context("Layer index is corrupted")?, - )) - } - fn delete_resident_layer_file(&self) -> Result<()> { // delete underlying file fs::remove_file(self.path())?; @@ -510,7 +493,11 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result<&Arc> { self.access_stats .record_access(access_kind, ctx.task_kind()); // Quick exit if already loaded @@ -519,7 +506,7 @@ impl DeltaLayer { .with_context(|| format!("Failed to load delta layer {}", self.path().display())) } - fn load_inner(&self) -> Result { + fn load_inner(&self) -> Result> { let path = self.path(); let file = VirtualFile::open(&path) @@ -554,11 +541,11 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); - Ok(DeltaLayerInner { + Ok(Arc::new(DeltaLayerInner { file, index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, - }) + })) } /// Create a DeltaLayer struct representing an existing file on disk. @@ -623,6 +610,24 @@ impl DeltaLayer { &self.layer_name(), ) } + + /// Obtains all keys and value references stored in the layer + /// + /// The value can be obtained via the [`ValueRef::load`] function. + pub fn load_val_refs(&self, ctx: &RequestContext) -> Result> { + let inner = self + .load(LayerAccessKind::KeyIter, ctx) + .context("load delta layer")?; + DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted") + } + + /// Loads all keys stored in the layer. Returns key, lsn and value size. + pub fn load_keys(&self, ctx: &RequestContext) -> Result> { + let inner = self + .load(LayerAccessKind::KeyIter, ctx) + .context("load delta layer keys")?; + inner.load_keys().context("Layer index is corrupted") + } } /// A builder object for constructing a new delta layer. @@ -893,121 +898,41 @@ impl Drop for DeltaLayerWriter { } } -/// -/// Iterator over all key-value pairse stored in a delta layer -/// -/// FIXME: This creates a Vector to hold the offsets of all key value pairs. -/// That takes up quite a lot of memory. Should do this in a more streaming -/// fashion. -/// -struct DeltaValueIter<'a> { - all_offsets: Vec<(DeltaKey, BlobRef)>, - next_idx: usize, - reader: BlockCursor>, -} - -struct Adapter<'a>(&'a DeltaLayerInner); - -impl<'a> BlockReader for Adapter<'a> { - type BlockLease = PageReadGuard<'static>; - - fn read_blk(&self, blknum: u32) -> Result { - self.0.file.read_blk(blknum) - } -} - -impl<'a> Iterator for DeltaValueIter<'a> { - type Item = Result<(Key, Lsn, Value)>; - - fn next(&mut self) -> Option { - self.next_res().transpose() - } -} - -impl<'a> DeltaValueIter<'a> { - fn new(inner: &'a DeltaLayerInner) -> Result { - let file = &inner.file; +impl DeltaLayerInner { + fn load_val_refs(this: &Arc) -> Result> { + let file = &this.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, + this.index_start_blk, + this.index_root_blk, file, ); - let mut all_offsets: Vec<(DeltaKey, BlobRef)> = Vec::new(); + let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new(); tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, |key, value| { - all_offsets.push((DeltaKey::from_slice(key), BlobRef(value))); + let delta_key = DeltaKey::from_slice(key); + let val_ref = ValueRef { + blob_ref: BlobRef(value), + reader: BlockCursor::new(Adapter(this.clone())), + }; + all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref)); true }, )?; - let iter = DeltaValueIter { - all_offsets, - next_idx: 0, - reader: BlockCursor::new(Adapter(inner)), - }; - - Ok(iter) + Ok(all_offsets) } - - fn next_res(&mut self) -> Result> { - if self.next_idx < self.all_offsets.len() { - let (delta_key, blob_ref) = &self.all_offsets[self.next_idx]; - - let key = delta_key.key(); - let lsn = delta_key.lsn(); - - let buf = self.reader.read_blob(blob_ref.pos())?; - let val = Value::des(&buf)?; - self.next_idx += 1; - Ok(Some((key, lsn, val))) - } else { - Ok(None) - } - } -} -/// -/// Iterator over all keys stored in a delta layer -/// -/// FIXME: This creates a Vector to hold all keys. -/// That takes up quite a lot of memory. Should do this in a more streaming -/// fashion. -/// -struct DeltaKeyIter { - all_keys: Vec<(DeltaKey, u64)>, - next_idx: usize, -} - -impl Iterator for DeltaKeyIter { - type Item = (Key, Lsn, u64); - - fn next(&mut self) -> Option { - if self.next_idx < self.all_keys.len() { - let (delta_key, size) = &self.all_keys[self.next_idx]; - - let key = delta_key.key(); - let lsn = delta_key.lsn(); - - self.next_idx += 1; - Some((key, lsn, *size)) - } else { - None - } - } -} - -impl<'a> DeltaKeyIter { - fn new(inner: &'a DeltaLayerInner) -> Result { - let file = &inner.file; + fn load_keys(&self) -> Result> { + let file = &self.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, + self.index_start_blk, + self.index_root_blk, file, ); - let mut all_keys: Vec<(DeltaKey, u64)> = Vec::new(); + let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new(); tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, @@ -1015,46 +940,48 @@ impl<'a> DeltaKeyIter { let delta_key = DeltaKey::from_slice(key); let pos = BlobRef(value).pos(); if let Some(last) = all_keys.last_mut() { - if last.0.key() == delta_key.key() { + if last.0 == delta_key.key() { return true; } else { // subtract offset of new key BLOB and first blob of this key // to get total size if values associated with this key - let first_pos = last.1; - last.1 = pos - first_pos; + let first_pos = last.2; + last.2 = pos - first_pos; } } - all_keys.push((delta_key, pos)); + all_keys.push((delta_key.key(), delta_key.lsn(), pos)); true }, )?; if let Some(last) = all_keys.last_mut() { // Last key occupies all space till end of layer - last.1 = std::fs::metadata(&file.file.path)?.len() - last.1; + last.2 = std::fs::metadata(&file.file.path)?.len() - last.2; } - let iter = DeltaKeyIter { - all_keys, - next_idx: 0, - }; - - Ok(iter) + Ok(all_keys) } } -#[cfg(test)] -mod test { - use super::DeltaKeyIter; - use super::DeltaLayer; - use super::DeltaValueIter; +/// Reference to an on-disk value +pub struct ValueRef { + blob_ref: BlobRef, + reader: BlockCursor, +} - // We will soon need the iters to be send in the compaction code. - // Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883 - // Cf https://github.com/neondatabase/neon/issues/4471 - #[test] - fn is_send() { - fn assert_send() {} - assert_send::(); - assert_send::(); - assert_send::(); +impl ValueRef { + /// Loads the value from disk + pub fn load(&self) -> Result { + let buf = self.reader.read_blob(self.blob_ref.pos())?; + let val = Value::des(&buf)?; + Ok(val) + } +} + +struct Adapter(Arc); + +impl BlockReader for Adapter { + type BlockLease = PageReadGuard<'static>; + + fn read_blk(&self, blknum: u32) -> Result { + self.0.file.read_blk(blknum) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a5b9d8834e..f3aaed61b9 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -57,9 +57,7 @@ use utils::{ }; use super::filename::ImageFileName; -use super::{ - AsLayerDesc, Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc, -}; +use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc}; /// /// Header stored in the beginning of the file @@ -255,10 +253,6 @@ impl PersistentLayer for ImageLayer { Some(self.path()) } - fn iter(&self, _ctx: &RequestContext) -> Result> { - unimplemented!(); - } - fn delete_resident_layer_file(&self) -> Result<()> { // delete underlying file fs::remove_file(self.path())?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 31d0b5997a..3d222fcb1e 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -151,7 +151,7 @@ impl Layer for InMemoryLayer { return Ok(()); } - let mut cursor = inner.file.block_cursor(); + let cursor = inner.file.block_cursor(); let mut buf = Vec::new(); for (key, vec_map) in inner.index.iter() { for (lsn, pos) in vec_map.as_slice() { @@ -196,7 +196,7 @@ impl Layer for InMemoryLayer { let inner = self.inner.read().unwrap(); - let mut reader = inner.file.block_cursor(); + let reader = inner.file.block_cursor(); // Scan the page versions backwards, starting from `lsn`. if let Some(vec_map) = inner.index.get(&key) { @@ -354,7 +354,7 @@ impl InMemoryLayer { let mut buf = Vec::new(); - let mut cursor = inner.file.block_cursor(); + let cursor = inner.file.block_cursor(); let mut keys: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); keys.sort_by_key(|k| k.0); diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index e5511d6051..36a6593779 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -20,8 +20,8 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName}; use super::{ - AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, - LayerKeyIter, LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, + AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, + LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, }; /// RemoteLayer is a not yet downloaded [`ImageLayer`] or @@ -129,14 +129,6 @@ impl PersistentLayer for RemoteLayer { None } - fn iter(&self, _ctx: &RequestContext) -> Result> { - bail!("cannot iterate a remote layer"); - } - - fn key_iter(&self, _ctx: &RequestContext) -> Result> { - bail!("cannot iterate a remote layer"); - } - fn delete_resident_layer_file(&self) -> Result<()> { bail!("remote layer has no layer file"); } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 34211fb714..628865ea2b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3513,7 +3513,13 @@ impl Timeline { let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); let mut prev: Option = None; for (next_key, _next_lsn, _size) in itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_keys(ctx)? + .into_iter()) + }), |iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0), )? { if let Some(prev_key) = prev { @@ -3549,25 +3555,31 @@ impl Timeline { // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_val_refs(ctx)? + .into_iter()) + }), |iter_iter| { iter_iter.kmerge_by(|a, b| { - if let Ok((a_key, a_lsn, _)) = a { - if let Ok((b_key, b_lsn, _)) = b { - (a_key, a_lsn) < (b_key, b_lsn) - } else { - false - } - } else { - true - } + let (a_key, a_lsn, _) = a; + let (b_key, b_lsn, _) = b; + (a_key, a_lsn) < (b_key, b_lsn) }) }, )?; // This iterator walks through all keys and is needed to calculate size used by each key let mut all_keys_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_keys(ctx)? + .into_iter()) + }), |iter_iter| { iter_iter.kmerge_by(|a, b| { let (a_key, a_lsn, _) = a; @@ -3629,8 +3641,8 @@ impl Timeline { let mut key_values_total_size = 0u64; let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key - for x in all_values_iter { - let (key, lsn, value) = x?; + for (key, lsn, value_ref) in all_values_iter { + let value = value_ref.load()?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn {