diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index d30d6c5c6e..a17855b940 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -22,6 +22,7 @@ use pageserver_api::models::{ }; use std::ops::Range; use std::path::PathBuf; +use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::warn; @@ -335,6 +336,13 @@ impl LayerAccessStats { } } +pub(crate) type GetValueReconstructFuture = Pin< + Box< + dyn Send + + std::future::Future<Output = Result<(ValueReconstructState, ValueReconstructResult)>>, + >, +>; + /// Supertrait of the [`Layer`] trait that captures the bare minimum interface /// required by [`LayerMap`]. /// @@ -372,12 +380,12 @@ pub trait Layer: std::fmt::Debug + Send + Sync { /// the predecessor layer and call again with the same 'reconstruct_data' to /// collect more data. fn get_value_reconstruct_data( - &self, + self: Arc<Self>, key: Key, lsn_range: Range<Lsn>, - reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> Result<ValueReconstructResult>; + reconstruct_data: ValueReconstructState, + ctx: RequestContext, + ) -> GetValueReconstructFuture; /// A short ID string that uniquely identifies the given layer within a [`LayerMap`]. 
fn short_id(&self) -> String; @@ -486,12 +494,12 @@ impl Layer for LayerDescriptor { } fn get_value_reconstruct_data( - &self, + self: Arc<Self>, _key: Key, _lsn_range: Range<Lsn>, - _reconstruct_data: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result<ValueReconstructResult> { + _reconstruct_data: ValueReconstructState, + _ctx: RequestContext, + ) -> GetValueReconstructFuture { todo!("This method shouldn't be part of the Layer trait") } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index ba3ab6dd4c..04c622e31c 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -46,7 +46,7 @@ use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; -use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use tracing::*; use utils::{ @@ -56,8 +56,8 @@ use utils::{ }; use super::{ - DeltaFileName, Layer, LayerAccessStats, LayerAccessStatsReset, LayerFileName, LayerIter, - LayerKeyIter, PathOrConf, + DeltaFileName, GetValueReconstructFuture, Layer, LayerAccessStats, LayerAccessStatsReset, + LayerFileName, LayerIter, LayerKeyIter, PathOrConf, }; /// @@ -318,89 +318,91 @@ impl Layer for DeltaLayer { } fn get_value_reconstruct_data( - &self, + self: Arc<Self>, key: Key, lsn_range: Range<Lsn>, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result<ValueReconstructResult> { - ensure!(lsn_range.start >= self.lsn_range.start); - let mut need_image = true; + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> GetValueReconstructFuture { + Box::pin(async move { + ensure!(lsn_range.start >= self.lsn_range.start); + let mut need_image = true; - ensure!(self.key_range.contains(&key)); + ensure!(self.key_range.contains(&key)); - { - // Open the file and lock the metadata in memory - let inner = 
self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + { + // Open the file and lock the metadata in memory + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; - // Scan the page versions backwards, starting from `lsn`. - let file = inner.file.as_ref().unwrap(); - let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( - inner.index_start_blk, - inner.index_root_blk, - file, - ); - let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1)); + // Scan the page versions backwards, starting from `lsn`. + let file = inner.file.as_ref().unwrap(); + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + inner.index_start_blk, + inner.index_root_blk, + file, + ); + let search_key = DeltaKey::from_key_lsn(&key, Lsn(lsn_range.end.0 - 1)); - let mut offsets: Vec<(Lsn, u64)> = Vec::new(); + let mut offsets: Vec<(Lsn, u64)> = Vec::new(); - tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| { - let blob_ref = BlobRef(value); - if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { - return false; - } - let entry_lsn = DeltaKey::extract_lsn_from_buf(key); - if entry_lsn < lsn_range.start { - return false; - } - offsets.push((entry_lsn, blob_ref.pos())); - - !blob_ref.will_init() - })?; - - // Ok, 'offsets' now contains the offsets of all the entries we need to read - let mut cursor = file.block_cursor(); - let mut buf = Vec::new(); - for (entry_lsn, pos) in offsets { - cursor.read_blob_into_buf(pos, &mut buf).with_context(|| { - format!( - "Failed to read blob from virtual file {}", - file.file.path.display() - ) - })?; - let val = Value::des(&buf).with_context(|| { - format!( - "Failed to deserialize file blob from virtual file {}", - file.file.path.display() - ) - })?; - match val { - Value::Image(img) => { - reconstruct_state.img = Some((entry_lsn, img)); - need_image = false; - break; + tree_reader.visit(&search_key.0, VisitDirection::Backwards, |key, value| { + let blob_ref = BlobRef(value); + if 
key[..KEY_SIZE] != search_key.0[..KEY_SIZE] { + return false; } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back + let entry_lsn = DeltaKey::extract_lsn_from_buf(key); + if entry_lsn < lsn_range.start { + return false; + } + offsets.push((entry_lsn, blob_ref.pos())); + + !blob_ref.will_init() + })?; + + // Ok, 'offsets' now contains the offsets of all the entries we need to read + let mut cursor = file.block_cursor(); + let mut buf = Vec::new(); + for (entry_lsn, pos) in offsets { + cursor.read_blob_into_buf(pos, &mut buf).with_context(|| { + format!( + "Failed to read blob from virtual file {}", + file.file.path.display() + ) + })?; + let val = Value::des(&buf).with_context(|| { + format!( + "Failed to deserialize file blob from virtual file {}", + file.file.path.display() + ) + })?; + match val { + Value::Image(img) => { + reconstruct_state.img = Some((entry_lsn, img)); need_image = false; break; } + Value::WalRecord(rec) => { + let will_init = rec.will_init(); + reconstruct_state.records.push((entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } + } } } + // release metadata lock and close the file } - // release metadata lock and close the file - } - // If an older page image is needed to reconstruct the page, let the - // caller know. - if need_image { - Ok(ValueReconstructResult::Continue) - } else { - Ok(ValueReconstructResult::Complete) - } + // If an older page image is needed to reconstruct the page, let the + // caller know. 
+ if need_image { + Ok((reconstruct_state, ValueReconstructResult::Continue)) + } else { + Ok((reconstruct_state, ValueReconstructResult::Complete)) + } + }) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index d298b3e852..ce47523ffe 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -43,7 +43,7 @@ use std::io::{Seek, SeekFrom}; use std::ops::Range; use std::os::unix::prelude::FileExt; use std::path::{Path, PathBuf}; -use std::sync::{RwLock, RwLockReadGuard}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use tracing::*; use utils::{ @@ -53,7 +53,7 @@ use utils::{ }; use super::filename::{ImageFileName, LayerFileName}; -use super::{Layer, LayerAccessStatsReset, LayerIter, PathOrConf}; +use super::{GetValueReconstructFuture, Layer, LayerAccessStatsReset, LayerIter, PathOrConf}; /// /// Header stored in the beginning of the file @@ -197,38 +197,41 @@ impl Layer for ImageLayer { /// Look up given page in the file fn get_value_reconstruct_data( - &self, + self: Arc<Self>, key: Key, lsn_range: Range<Lsn>, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result<ValueReconstructResult> { - assert!(self.key_range.contains(&key)); - assert!(lsn_range.start >= self.lsn); - assert!(lsn_range.end >= self.lsn); + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> GetValueReconstructFuture { + Box::pin(async move { + assert!(self.key_range.contains(&key)); + assert!(lsn_range.start >= self.lsn); + assert!(lsn_range.end >= self.lsn); - let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; - let file = inner.file.as_ref().unwrap(); - let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); + let file = inner.file.as_ref().unwrap(); + let tree_reader = + 
DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); - let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; - key.write_to_byte_slice(&mut keybuf); - if let Some(offset) = tree_reader.get(&keybuf)? { - let blob = file.block_cursor().read_blob(offset).with_context(|| { - format!( - "failed to read value from data file {} at offset {}", - self.path().display(), - offset - ) - })?; - let value = Bytes::from(blob); + let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; + key.write_to_byte_slice(&mut keybuf); + if let Some(offset) = tree_reader.get(&keybuf)? { + let blob = file.block_cursor().read_blob(offset).with_context(|| { + format!( + "failed to read value from data file {} at offset {}", + self.path().display(), + offset + ) + })?; + let value = Bytes::from(blob); - reconstruct_state.img = Some((self.lsn, value)); - Ok(ValueReconstructResult::Complete) - } else { - Ok(ValueReconstructResult::Missing) - } + reconstruct_state.img = Some((self.lsn, value)); + Ok((reconstruct_state, ValueReconstructResult::Complete)) + } else { + Ok((reconstruct_state, ValueReconstructResult::Missing)) + } + }) } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..7aefb6b313 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -27,9 +27,9 @@ use utils::{ // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::ops::Range; -use std::sync::RwLock; +use std::sync::{Arc, RwLock}; -use super::{DeltaLayer, DeltaLayerWriter, Layer}; +use super::{DeltaLayer, DeltaLayerWriter, GetValueReconstructFuture, Layer}; thread_local! { /// A buffer for serializing object during [`InMemoryLayer::put_value`]. @@ -191,52 +191,54 @@ impl Layer for InMemoryLayer { /// Look up given value in the layer. 
fn get_value_reconstruct_data( - &self, + self: Arc<Self>, key: Key, lsn_range: Range<Lsn>, - reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> anyhow::Result<ValueReconstructResult> { - ensure!(lsn_range.start >= self.start_lsn); - let mut need_image = true; + mut reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> GetValueReconstructFuture { + Box::pin(async move { + ensure!(lsn_range.start >= self.start_lsn); + let mut need_image = true; - let inner = self.inner.read().unwrap(); + let inner = self.inner.read().unwrap(); - let mut reader = inner.file.block_cursor(); + let mut reader = inner.file.block_cursor(); - // Scan the page versions backwards, starting from `lsn`. - if let Some(vec_map) = inner.index.get(&key) { - let slice = vec_map.slice_range(lsn_range); - for (entry_lsn, pos) in slice.iter().rev() { - let buf = reader.read_blob(*pos)?; - let value = Value::des(&buf)?; - match value { - Value::Image(img) => { - reconstruct_state.img = Some((*entry_lsn, img)); - return Ok(ValueReconstructResult::Complete); - } - Value::WalRecord(rec) => { - let will_init = rec.will_init(); - reconstruct_state.records.push((*entry_lsn, rec)); - if will_init { - // This WAL record initializes the page, so no need to go further back - need_image = false; - break; + // Scan the page versions backwards, starting from `lsn`. 
+ if let Some(vec_map) = inner.index.get(&key) { + let slice = vec_map.slice_range(lsn_range); + for (entry_lsn, pos) in slice.iter().rev() { + let buf = reader.read_blob(*pos)?; + let value = Value::des(&buf)?; + match value { + Value::Image(img) => { + reconstruct_state.img = Some((*entry_lsn, img)); + return Ok((reconstruct_state, ValueReconstructResult::Complete)); + } + Value::WalRecord(rec) => { + let will_init = rec.will_init(); + reconstruct_state.records.push((*entry_lsn, rec)); + if will_init { + // This WAL record initializes the page, so no need to go further back + need_image = false; + break; + } } } } } - } - // release lock on 'inner' + // release lock on 'inner' - // If an older page image is needed to reconstruct the page, let the - // caller know. - if need_image { - Ok(ValueReconstructResult::Continue) - } else { - Ok(ValueReconstructResult::Complete) - } + // If an older page image is needed to reconstruct the page, let the + // caller know. + if need_image { + Ok((reconstruct_state, ValueReconstructResult::Continue)) + } else { + Ok((reconstruct_state, ValueReconstructResult::Complete)) + } + }) } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 2106587ab2..a5d1c173e5 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -6,7 +6,7 @@ use crate::context::RequestContext; use crate::repository::Key; use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{Layer, ValueReconstructState}; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -21,8 +21,8 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName, LayerFileName}; use super::image_layer::ImageLayer; use 
super::{ - DeltaLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, - LayerResidenceStatus, PersistentLayer, + DeltaLayer, GetValueReconstructFuture, LayerAccessStats, LayerAccessStatsReset, LayerIter, + LayerKeyIter, LayerResidenceStatus, PersistentLayer, }; /// RemoteLayer is a not yet downloaded [`ImageLayer`] or @@ -83,16 +83,18 @@ impl Layer for RemoteLayer { } fn get_value_reconstruct_data( - &self, + self: Arc<Self>, _key: Key, _lsn_range: Range<Lsn>, - _reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result<ValueReconstructResult> { - bail!( - "layer {} needs to be downloaded", - self.filename().file_name() - ); + _reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> GetValueReconstructFuture { + Box::pin(async move { + bail!( + "layer {} needs to be downloaded", + self.filename().file_name() + ); + }) } fn is_incremental(&self) -> bool { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ae8acc6c97..62b0cd4d44 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -504,12 +504,13 @@ impl Timeline { None => None, }; - let mut reconstruct_state = ValueReconstructState { + let reconstruct_state = ValueReconstructState { records: Vec::new(), img: cached_page_img, }; - self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx) + let reconstruct_state = self + .get_reconstruct_data(key, lsn, reconstruct_state, ctx) .await?; self.metrics @@ -2146,9 +2147,9 @@ impl Timeline { &self, key: Key, request_lsn: Lsn, - reconstruct_state: &mut ValueReconstructState, + mut reconstruct_state: ValueReconstructState, ctx: &RequestContext, - ) -> Result<(), PageReconstructError> { + ) -> Result<ValueReconstructState, PageReconstructError> { // Start from the current timeline. 
let mut timeline_owned; let mut timeline = self; @@ -2175,12 +2176,12 @@ impl Timeline { // The function should have updated 'state' //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn); match result { - ValueReconstructResult::Complete => return Ok(()), + ValueReconstructResult::Complete => return Ok(reconstruct_state), ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { self.metrics.materialized_page_cache_hit_counter.inc_by(1); - return Ok(()); + return Ok(reconstruct_state); } if prev_lsn <= cont_lsn { // Didn't make any progress in last iteration. Error out to avoid @@ -2237,13 +2238,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match open_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(open_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2263,13 +2270,19 @@ impl Timeline { if cont_lsn > start_lsn { //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match frozen_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(frozen_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + 
Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2297,13 +2310,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, lsn_floor); - result = match layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(&layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor;