diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0af3d4ce39..06b90decd2 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -12,7 +12,7 @@ use crate::context::RequestContext; use crate::repository::{Key, Value}; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; -use anyhow::Result; +use anyhow::{Context, Result}; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; @@ -343,7 +343,8 @@ impl LayerAccessStats { /// All layers should implement a minimal `std::fmt::Debug` without tenant or /// timeline names, because those are known in the context of which the layers /// are used in (timeline). -pub trait Layer: std::fmt::Debug + Send + Sync { +#[async_trait::async_trait] +pub trait Layer: std::fmt::Debug + Send + Sync + 'static { /// Range of keys that this layer covers fn get_key_range(&self) -> Range; @@ -373,13 +374,42 @@ pub trait Layer: std::fmt::Debug + Send + Sync { /// is available. If this returns ValueReconstructResult::Continue, look up /// the predecessor layer and call again with the same 'reconstruct_data' to /// collect more data. - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_data: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> Result; + reconstruct_data: ValueReconstructState, + ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)>; + + /// CANCEL SAFETY: if the returned future is dropped, + /// the wrapped closure still run to completion and the return value discarded. + /// For the case of get_value_reconstruct_data, we expect the closure to not + /// have any side effects, as it only attempts to read a layer (and stuff like + /// page cache isn't considered a real side effect). + /// But, ... + /// TRACING: + /// If the returned future is cancelled, the spawn_blocking span can outlive + /// the caller's span. + /// So, technically, we should be using `parent: None` and `follows_from: current` + /// instead. However, in practice, the advantage of maintaining the span stack + /// in logs outweighs the disadvantage of having a dangling span in a case that + /// is not expected to happen because in pageserver we generally don't drop pending futures. + async fn get_value_reconstruct_data( + self: Arc, + key: Key, + lsn_range: Range, + reconstruct_data: ValueReconstructState, + ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { + let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking"); + tokio::task::spawn_blocking(move || { + let _enter = span.enter(); + self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx) + }) + .await + .context("spawn_blocking")? + } /// A short ID string that uniquely identifies the given layer within a [`LayerMap`]. fn short_id(&self) -> String; @@ -499,6 +529,7 @@ impl LayerDescriptor { } } +#[async_trait::async_trait] impl Layer for LayerDescriptor { fn get_key_range(&self) -> Range { self.key.clone() @@ -512,13 +543,13 @@ impl Layer for LayerDescriptor { self.is_incremental } - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, _key: Key, _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { + _reconstruct_data: ValueReconstructState, + _ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { todo!("This method shouldn't be part of the Layer trait") } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 6e14663121..cec7a09eff 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner { } } +#[async_trait::async_trait] impl Layer for DeltaLayer { /// debugging function to print out the contents of the layer fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { @@ -294,13 +295,13 @@ impl Layer for DeltaLayer { Ok(()) } - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { ensure!(lsn_range.start >= self.desc.lsn_range.start); let mut need_image = true; @@ -308,7 +309,7 @@ impl Layer for DeltaLayer { { // Open the file and lock the metadata in memory - let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; // Scan the page versions backwards, starting from `lsn`. let file = &inner.file; @@ -374,9 +375,9 @@ impl Layer for DeltaLayer { // If an older page image is needed to reconstruct the page, let the // caller know. if need_image { - Ok(ValueReconstructResult::Continue) + Ok((reconstruct_state, ValueReconstructResult::Continue)) } else { - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 07a16a7de2..6019590db0 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner { } } +#[async_trait::async_trait] impl Layer for ImageLayer { /// debugging function to print out the contents of the layer fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> { @@ -181,18 +182,18 @@ impl Layer for ImageLayer { } /// Look up given page in the file - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { assert!(self.desc.key_range.contains(&key)); assert!(lsn_range.start >= self.lsn); assert!(lsn_range.end >= self.lsn); - let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?; + let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?; let file = inner.file.as_ref().unwrap(); let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file); @@ -210,9 +211,9 @@ impl Layer for ImageLayer { let value = Bytes::from(blob); reconstruct_state.img = Some((self.lsn, value)); - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } else { - Ok(ValueReconstructResult::Missing) + Ok((reconstruct_state, ValueReconstructResult::Missing)) } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 78bcfdafc0..4efd032ba9 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -110,6 +110,7 @@ impl InMemoryLayer { } } +#[async_trait::async_trait] impl Layer for InMemoryLayer { fn get_key_range(&self) -> Range { Key::MIN..Key::MAX @@ -190,13 +191,13 @@ impl Layer for InMemoryLayer { } /// Look up given value in the layer. - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, key: Key, lsn_range: Range, - reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> anyhow::Result { + mut reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> { ensure!(lsn_range.start >= self.start_lsn); let mut need_image = true; @@ -213,7 +214,7 @@ impl Layer for InMemoryLayer { match value { Value::Image(img) => { reconstruct_state.img = Some((*entry_lsn, img)); - return Ok(ValueReconstructResult::Complete); + return Ok((reconstruct_state, ValueReconstructResult::Complete)); } Value::WalRecord(rec) => { let will_init = rec.will_init(); @@ -233,9 +234,9 @@ impl Layer for InMemoryLayer { // If an older page image is needed to reconstruct the page, let the // caller know. if need_image { - Ok(ValueReconstructResult::Continue) + Ok((reconstruct_state, ValueReconstructResult::Continue)) } else { - Ok(ValueReconstructResult::Complete) + Ok((reconstruct_state, ValueReconstructResult::Complete)) } } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 387bae5b1f..a62334689a 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -6,7 +6,7 @@ use crate::context::RequestContext; use crate::repository::Key; use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; -use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::storage_layer::{Layer, ValueReconstructState}; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -21,7 +21,7 @@ use utils::{ use super::filename::{DeltaFileName, ImageFileName}; use super::{ DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter, - LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, + LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult, }; /// RemoteLayer is a not yet downloaded [`ImageLayer`] or @@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer { } } +#[async_trait::async_trait] impl Layer for RemoteLayer { - fn get_value_reconstruct_data( + fn get_value_reconstruct_data_blocking( &self, _key: Key, _lsn_range: Range, - _reconstruct_state: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { + _reconstruct_state: ValueReconstructState, + _ctx: RequestContext, + ) -> Result<(ValueReconstructState, ValueReconstructResult)> { bail!( "layer {} needs to be downloaded", self.filename().file_name() diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 060000a01a..447c09db76 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -555,13 +555,14 @@ impl Timeline { None => None, }; - let mut reconstruct_state = ValueReconstructState { + let reconstruct_state = ValueReconstructState { records: Vec::new(), img: cached_page_img, }; let timer = self.metrics.get_reconstruct_data_time_histo.start_timer(); - self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx) + let reconstruct_state = self + .get_reconstruct_data(key, lsn, reconstruct_state, ctx) .await?; timer.stop_and_record(); @@ -2352,9 +2353,9 @@ impl Timeline { &self, key: Key, request_lsn: Lsn, - reconstruct_state: &mut ValueReconstructState, + mut reconstruct_state: ValueReconstructState, ctx: &RequestContext, - ) -> Result<(), PageReconstructError> { + ) -> Result { // Start from the current timeline. let mut timeline_owned; let mut timeline = self; @@ -2384,12 +2385,12 @@ impl Timeline { // The function should have updated 'state' //info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn); match result { - ValueReconstructResult::Complete => return Ok(()), + ValueReconstructResult::Complete => return Ok(reconstruct_state), ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { MATERIALIZED_PAGE_CACHE_HIT.inc_by(1); - return Ok(()); + return Ok(reconstruct_state); } if prev_lsn <= cont_lsn { // Didn't make any progress in last iteration. Error out to avoid @@ -2493,13 +2494,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match open_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(open_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2520,13 +2527,19 @@ impl Timeline { if cont_lsn > start_lsn { //info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display()); let lsn_floor = max(cached_lsn + 1, start_lsn); - result = match frozen_layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(frozen_layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor; @@ -2555,13 +2568,19 @@ impl Timeline { // Get all the data needed to reconstruct the page version from this layer. // But if we have an older cached page image, no need to go past that. let lsn_floor = max(cached_lsn + 1, lsn_floor); - result = match layer.get_value_reconstruct_data( - key, - lsn_floor..cont_lsn, - reconstruct_state, - ctx, - ) { - Ok(result) => result, + result = match Arc::clone(&layer) + .get_value_reconstruct_data( + key, + lsn_floor..cont_lsn, + reconstruct_state, + ctx.attached_child(), + ) + .await + { + Ok((new_reconstruct_state, result)) => { + reconstruct_state = new_reconstruct_state; + result + } Err(e) => return Err(PageReconstructError::from(e)), }; cont_lsn = lsn_floor;