diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index d81303d533..74613eb5f4 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -68,7 +68,7 @@ async fn read_delta_file(path: impl AsRef) -> Result<()> { true }, )?; - let mut cursor = BlockCursor::new(&file); + let cursor = BlockCursor::new(&file); for (k, v) in all { let value = cursor.read_blob(v.pos())?; println!("key:{} value_len:{}", k, value.len()); diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 4dcf7fe5fe..7dd53407e7 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -21,7 +21,7 @@ where R: BlockReader, { /// Read a blob into a new buffer. - pub fn read_blob(&mut self, offset: u64) -> Result, std::io::Error> { + pub fn read_blob(&self, offset: u64) -> Result, std::io::Error> { let mut buf = Vec::new(); self.read_blob_into_buf(offset, &mut buf)?; Ok(buf) @@ -29,7 +29,7 @@ where /// Read blob into the given buffer. Any previous contents in the buffer /// are overwritten. pub fn read_blob_into_buf( - &mut self, + &self, offset: u64, dstbuf: &mut Vec, ) -> Result<(), std::io::Error> { diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 10de34e3f6..e6ebebe594 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -80,7 +80,7 @@ where BlockCursor { reader } } - pub fn read_blk(&mut self, blknum: u32) -> Result { + pub fn read_blk(&self, blknum: u32) -> Result { self.reader.read_blk(blknum) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index f44534249c..8d42150824 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -420,7 +420,7 @@ mod tests { blobs.push((pos, data)); } - let mut cursor = BlockCursor::new(&file); + let cursor = BlockCursor::new(&file); for (pos, expected) in blobs { let actual = cursor.read_blob(pos)?; assert_eq!(actual, expected); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 05381cb56d..b4db7e2f08 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -9,7 +9,7 @@ mod remote_layer; use crate::config::PageServerConf; use crate::context::RequestContext; -use crate::repository::{Key, Value}; +use crate::repository::Key; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use anyhow::Result; @@ -34,7 +34,7 @@ use utils::{ lsn::Lsn, }; -pub use delta_layer::{DeltaLayer, DeltaLayerWriter}; +pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef}; pub use filename::{DeltaFileName, ImageFileName, LayerFileName}; pub use image_layer::{ImageLayer, ImageLayerWriter}; pub use inmemory_layer::InMemoryLayer; @@ -381,12 +381,6 @@ pub trait Layer: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static { async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()>; } -/// Returned by [`PersistentLayer::iter`] -pub type LayerIter<'i> = Box> + 'i + Send>; - -/// Returned by [`PersistentLayer::key_iter`] -pub type LayerKeyIter<'i> = Box + 'i + Send>; - /// Get a layer descriptor from a layer. pub trait AsLayerDesc { /// Get the layer descriptor. @@ -427,15 +421,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc { // `None` for `RemoteLayer`. fn local_path(&self) -> Option; - /// Iterate through all keys and values stored in the layer - fn iter(&self, ctx: &RequestContext) -> Result>; - - /// Iterate through all keys stored in the layer. Returns key, lsn and value size - /// It is used only for compaction and so is currently implemented only for DeltaLayer - fn key_iter(&self, _ctx: &RequestContext) -> Result> { - panic!("Not implemented") - } - /// Permanently remove this layer from disk. fn delete_resident_layer_file(&self) -> Result<()>; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 7574554b4e..98dc1230f5 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -61,8 +61,8 @@ use utils::{ }; use super::{ - AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerIter, - LayerKeyIter, PathOrConf, PersistentLayerDesc, + AsLayerDesc, DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, PathOrConf, + PersistentLayerDesc, }; /// @@ -189,7 +189,7 @@ pub struct DeltaLayer { access_stats: LayerAccessStats, - inner: OnceCell, + inner: OnceCell>, } impl std::fmt::Debug for DeltaLayer { @@ -258,10 +258,10 @@ impl Layer for DeltaLayer { tree_reader.dump().await?; - let mut cursor = file.block_cursor(); + let cursor = file.block_cursor(); // A subroutine to dump a single blob - let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result { + let dump_blob = |blob_ref: BlobRef| -> anyhow::Result { let buf = cursor.read_blob(blob_ref.pos())?; let val = Value::des(&buf)?; let desc = match val { @@ -343,7 +343,7 @@ impl Layer for DeltaLayer { })?; // Ok, 'offsets' now contains the offsets of all the entries we need to read - let mut cursor = file.block_cursor(); + let cursor = file.block_cursor(); let mut buf = Vec::new(); for (entry_lsn, pos) in offsets { cursor.read_blob_into_buf(pos, &mut buf).with_context(|| { @@ -424,23 +424,6 @@ impl PersistentLayer for DeltaLayer { Some(self.path()) } - fn iter(&self, ctx: &RequestContext) -> Result> { - let inner = self - .load(LayerAccessKind::KeyIter, ctx) - .context("load delta layer")?; - Ok(match DeltaValueIter::new(inner) { - Ok(iter) => Box::new(iter), - Err(err) => Box::new(std::iter::once(Err(err))), - }) - } - - fn key_iter(&self, ctx: &RequestContext) -> Result> { - let inner = self.load(LayerAccessKind::KeyIter, ctx)?; - Ok(Box::new( - DeltaKeyIter::new(inner).context("Layer index is corrupted")?, - )) - } - fn delete_resident_layer_file(&self) -> Result<()> { // delete underlying file fs::remove_file(self.path())?; @@ -510,7 +493,11 @@ impl DeltaLayer { /// Open the underlying file and read the metadata into memory, if it's /// not loaded already. /// - fn load(&self, access_kind: LayerAccessKind, ctx: &RequestContext) -> Result<&DeltaLayerInner> { + fn load( + &self, + access_kind: LayerAccessKind, + ctx: &RequestContext, + ) -> Result<&Arc> { self.access_stats .record_access(access_kind, ctx.task_kind()); // Quick exit if already loaded @@ -519,7 +506,7 @@ impl DeltaLayer { .with_context(|| format!("Failed to load delta layer {}", self.path().display())) } - fn load_inner(&self) -> Result { + fn load_inner(&self) -> Result> { let path = self.path(); let file = VirtualFile::open(&path) @@ -554,11 +541,11 @@ impl DeltaLayer { debug!("loaded from {}", &path.display()); - Ok(DeltaLayerInner { + Ok(Arc::new(DeltaLayerInner { file, index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, - }) + })) } /// Create a DeltaLayer struct representing an existing file on disk. @@ -623,6 +610,24 @@ impl DeltaLayer { &self.layer_name(), ) } + + /// Obtains all keys and value references stored in the layer + /// + /// The value can be obtained via the [`ValueRef::load`] function. + pub fn load_val_refs(&self, ctx: &RequestContext) -> Result> { + let inner = self + .load(LayerAccessKind::KeyIter, ctx) + .context("load delta layer")?; + DeltaLayerInner::load_val_refs(inner).context("Layer index is corrupted") + } + + /// Loads all keys stored in the layer. Returns key, lsn and value size. + pub fn load_keys(&self, ctx: &RequestContext) -> Result> { + let inner = self + .load(LayerAccessKind::KeyIter, ctx) + .context("load delta layer keys")?; + inner.load_keys().context("Layer index is corrupted") + } } /// A builder object for constructing a new delta layer. @@ -893,121 +898,41 @@ impl Drop for DeltaLayerWriter { } } -/// -/// Iterator over all key-value pairse stored in a delta layer -/// -/// FIXME: This creates a Vector to hold the offsets of all key value pairs. -/// That takes up quite a lot of memory. Should do this in a more streaming -/// fashion. -/// -struct DeltaValueIter<'a> { - all_offsets: Vec<(DeltaKey, BlobRef)>, - next_idx: usize, - reader: BlockCursor>, -} - -struct Adapter<'a>(&'a DeltaLayerInner); - -impl<'a> BlockReader for Adapter<'a> { - type BlockLease = PageReadGuard<'static>; - - fn read_blk(&self, blknum: u32) -> Result { - self.0.file.read_blk(blknum) - } -} - -impl<'a> Iterator for DeltaValueIter<'a> { - type Item = Result<(Key, Lsn, Value)>; - - fn next(&mut self) -> Option { - self.next_res().transpose() - } -} - -impl<'a> DeltaValueIter<'a> { - fn new(inner: &'a DeltaLayerInner) -> Result { - let file = &inner.file; +impl DeltaLayerInner { + fn load_val_refs(this: &Arc) -> Result> { + let file = &this.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, + this.index_start_blk, + this.index_root_blk, file, ); - let mut all_offsets: Vec<(DeltaKey, BlobRef)> = Vec::new(); + let mut all_offsets = Vec::<(Key, Lsn, ValueRef)>::new(); tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, |key, value| { - all_offsets.push((DeltaKey::from_slice(key), BlobRef(value))); + let delta_key = DeltaKey::from_slice(key); + let val_ref = ValueRef { + blob_ref: BlobRef(value), + reader: BlockCursor::new(Adapter(this.clone())), + }; + all_offsets.push((delta_key.key(), delta_key.lsn(), val_ref)); true }, )?; - let iter = DeltaValueIter { - all_offsets, - next_idx: 0, - reader: BlockCursor::new(Adapter(inner)), - }; - - Ok(iter) + Ok(all_offsets) } - - fn next_res(&mut self) -> Result> { - if self.next_idx < self.all_offsets.len() { - let (delta_key, blob_ref) = &self.all_offsets[self.next_idx]; - - let key = delta_key.key(); - let lsn = delta_key.lsn(); - - let buf = self.reader.read_blob(blob_ref.pos())?; - let val = Value::des(&buf)?; - self.next_idx += 1; - Ok(Some((key, lsn, val))) - } else { - Ok(None) - } - } -} -/// -/// Iterator over all keys stored in a delta layer -/// -/// FIXME: This creates a Vector to hold all keys. -/// That takes up quite a lot of memory. Should do this in a more streaming -/// fashion. -/// -struct DeltaKeyIter { - all_keys: Vec<(DeltaKey, u64)>, - next_idx: usize, -} - -impl Iterator for DeltaKeyIter { - type Item = (Key, Lsn, u64); - - fn next(&mut self) -> Option { - if self.next_idx < self.all_keys.len() { - let (delta_key, size) = &self.all_keys[self.next_idx]; - - let key = delta_key.key(); - let lsn = delta_key.lsn(); - - self.next_idx += 1; - Some((key, lsn, *size)) - } else { - None - } - } -} - -impl<'a> DeltaKeyIter { - fn new(inner: &'a DeltaLayerInner) -> Result { - let file = &inner.file; + fn load_keys(&self) -> Result> { + let file = &self.file; let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, + self.index_start_blk, + self.index_root_blk, file, ); - let mut all_keys: Vec<(DeltaKey, u64)> = Vec::new(); + let mut all_keys: Vec<(Key, Lsn, u64)> = Vec::new(); tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, @@ -1015,46 +940,48 @@ impl<'a> DeltaKeyIter { let delta_key = DeltaKey::from_slice(key); let pos = BlobRef(value).pos(); if let Some(last) = all_keys.last_mut() { - if last.0.key() == delta_key.key() { + if last.0 == delta_key.key() { return true; } else { // subtract offset of new key BLOB and first blob of this key // to get total size if values associated with this key - let first_pos = last.1; - last.1 = pos - first_pos; + let first_pos = last.2; + last.2 = pos - first_pos; } } - all_keys.push((delta_key, pos)); + all_keys.push((delta_key.key(), delta_key.lsn(), pos)); true }, )?; if let Some(last) = all_keys.last_mut() { // Last key occupies all space till end of layer - last.1 = std::fs::metadata(&file.file.path)?.len() - last.1; + last.2 = std::fs::metadata(&file.file.path)?.len() - last.2; } - let iter = DeltaKeyIter { - all_keys, - next_idx: 0, - }; - - Ok(iter) + Ok(all_keys) } } -#[cfg(test)] -mod test { - use super::DeltaKeyIter; - use super::DeltaLayer; - use super::DeltaValueIter; +/// Reference to an on-disk value +pub struct ValueRef { + blob_ref: BlobRef, + reader: BlockCursor, +} - // We will soon need the iters to be send in the compaction code. - // Cf https://github.com/neondatabase/neon/pull/4462#issuecomment-1587398883 - // Cf https://github.com/neondatabase/neon/issues/4471 - #[test] - fn is_send() { - fn assert_send() {} - assert_send::(); - assert_send::(); - assert_send::(); +impl ValueRef { + /// Loads the value from disk + pub fn load(&self) -> Result { + let buf = self.reader.read_blob(self.blob_ref.pos())?; + let val = Value::des(&buf)?; + Ok(val) + } +} + +struct Adapter(Arc); + +impl BlockReader for Adapter { + type BlockLease = PageReadGuard<'static>; + + fn read_blk(&self, blknum: u32) -> Result { + self.0.file.read_blk(blknum) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index a5b9d8834e..f3aaed61b9 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -57,9 +57,7 @@ use utils::{ }; use super::filename::ImageFileName; -use super::{ - AsLayerDesc, Layer, LayerAccessStatsReset, LayerIter, PathOrConf, PersistentLayerDesc, -}; +use super::{AsLayerDesc, Layer, LayerAccessStatsReset, PathOrConf, PersistentLayerDesc}; /// /// Header stored in the beginning of the file @@ -255,10 +253,6 @@ impl PersistentLayer for ImageLayer { Some(self.path()) } - fn iter(&self, _ctx: &RequestContext) -> Result> { - unimplemented!(); - } - fn delete_resident_layer_file(&self) -> Result<()> { // delete underlying file fs::remove_file(self.path())?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 31d0b5997a..3d222fcb1e 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -151,7 +151,7 @@ impl Layer for InMemoryLayer { return Ok(()); } - let mut cursor = inner.file.block_cursor(); + let cursor = inner.file.block_cursor(); let mut buf = Vec::new(); for (key, vec_map) in inner.index.iter() { for (lsn, pos) in vec_map.as_slice() { @@ -196,7 +196,7 @@ impl Layer for InMemoryLayer { let inner = self.inner.read().unwrap(); - let mut reader = inner.file.block_cursor(); + let reader = inner.file.block_cursor(); // Scan the page versions backwards, starting from `lsn`. if let Some(vec_map) = inner.index.get(&key) { @@ -354,7 +354,7 @@ impl InMemoryLayer { let mut buf = Vec::new(); - let mut cursor = inner.file.block_cursor(); + let cursor = inner.file.block_cursor(); let mut keys: Vec<(&Key, &VecMap)> = inner.index.iter().collect(); keys.sort_by_key(|k| k.0); diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index e5511d6051..36a6593779 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -20,8 +20,8 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName}; use super::{ - AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, - LayerKeyIter, LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, + AsLayerDesc, DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, + LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, }; /// RemoteLayer is a not yet downloaded [`ImageLayer`] or @@ -129,14 +129,6 @@ impl PersistentLayer for RemoteLayer { None } - fn iter(&self, _ctx: &RequestContext) -> Result> { - bail!("cannot iterate a remote layer"); - } - - fn key_iter(&self, _ctx: &RequestContext) -> Result> { - bail!("cannot iterate a remote layer"); - } - fn delete_resident_layer_file(&self) -> Result<()> { bail!("remote layer has no layer file"); } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 34211fb714..628865ea2b 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3513,7 +3513,13 @@ impl Timeline { let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); let mut prev: Option = None; for (next_key, _next_lsn, _size) in itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_keys(ctx)? + .into_iter()) + }), |iter_iter| iter_iter.kmerge_by(|a, b| a.0 < b.0), )? { if let Some(prev_key) = prev { @@ -3549,25 +3555,31 @@ impl Timeline { // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_val_refs(ctx)? + .into_iter()) + }), |iter_iter| { iter_iter.kmerge_by(|a, b| { - if let Ok((a_key, a_lsn, _)) = a { - if let Ok((b_key, b_lsn, _)) = b { - (a_key, a_lsn) < (b_key, b_lsn) - } else { - false - } - } else { - true - } + let (a_key, a_lsn, _) = a; + let (b_key, b_lsn, _) = b; + (a_key, a_lsn) < (b_key, b_lsn) }) }, )?; // This iterator walks through all keys and is needed to calculate size used by each key let mut all_keys_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact.iter().map(|l| -> Result<_> { + Ok(l.clone() + .downcast_delta_layer() + .expect("delta layer") + .load_keys(ctx)? + .into_iter()) + }), |iter_iter| { iter_iter.kmerge_by(|a, b| { let (a_key, a_lsn, _) = a; @@ -3629,8 +3641,8 @@ impl Timeline { let mut key_values_total_size = 0u64; let mut dup_start_lsn: Lsn = Lsn::INVALID; // start LSN of layer containing values of the single key let mut dup_end_lsn: Lsn = Lsn::INVALID; // end LSN of layer containing values of the single key - for x in all_values_iter { - let (key, lsn, value) = x?; + for (key, lsn, value_ref) in all_values_iter { + let value = value_ref.load()?; let same_key = prev_key.map_or(false, |prev_key| prev_key == key); // We need to check key boundaries once we reach next key or end of layer with the same key if !same_key || lsn == dup_end_lsn {