From e6c04119c45f92547893fc88e6f069cb26d51098 Mon Sep 17 00:00:00 2001 From: Vlad Lazar Date: Thu, 8 Feb 2024 20:41:13 +0000 Subject: [PATCH] review: address round one of Christian's comments --- pageserver/src/tenant/storage_layer.rs | 30 ++++++++++------- .../src/tenant/storage_layer/delta_layer.rs | 8 +++-- .../tenant/storage_layer/inmemory_layer.rs | 9 +++-- pageserver/src/tenant/timeline.rs | 33 +++++++++++-------- 4 files changed, 49 insertions(+), 31 deletions(-) diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index f9223ee85c..6992fbf426 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -91,11 +91,15 @@ pub(crate) struct VectoredValueReconstructState { pub(crate) records: Vec<(Lsn, NeonWalRecord)>, pub(crate) img: Option<(Lsn, Bytes)>, - // TODO: can probably get rid of cached_lsn and use img.0 - cached_lsn: Option, situation: ValueReconstructSituation, } +impl VectoredValueReconstructState { + fn get_cached_lsn(&self) -> Option { + self.img.as_ref().map(|img| img.0) + } +} + impl From for ValueReconstructState { fn from(mut state: VectoredValueReconstructState) -> Self { // walredo expects the records to be descending in terms of Lsn @@ -111,7 +115,6 @@ impl From for ValueReconstructState { /// Bag of data accumulated during a vectored get pub(crate) struct ValuesReconstructState { pub(crate) keys: HashMap>, - pub(crate) total_keys_done: usize, keys_done: KeySpaceRandomAccum, } @@ -120,7 +123,6 @@ impl ValuesReconstructState { pub(crate) fn new() -> Self { Self { keys: HashMap::new(), - total_keys_done: 0, keys_done: KeySpaceRandomAccum::new(), } } @@ -130,7 +132,6 @@ impl ValuesReconstructState { let previous = self.keys.insert(key, Err(err)); if let Some(Ok(state)) = previous { if state.situation == ValueReconstructSituation::Continue { - self.total_keys_done += 1; self.keys_done.add_key(key); } } @@ -140,7 +141,12 @@ impl ValuesReconstructState { /// Returns 
true if this was the last value needed for the key and false otherwise. /// /// If the key is done after the update, mark it as such. - pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, value: Value) -> bool { + pub(crate) fn update_key( + &mut self, + key: &Key, + lsn: Lsn, + value: Value, + ) -> ValueReconstructSituation { let state = self .keys .entry(*key) @@ -148,14 +154,15 @@ impl ValuesReconstructState { if let Ok(state) = state { let key_done = match state.situation { - ValueReconstructSituation::Complete => true, + ValueReconstructSituation::Complete => unreachable!(), ValueReconstructSituation::Continue => match value { Value::Image(img) => { state.img = Some((lsn, img)); true } Value::WalRecord(rec) => { - let reached_cache = state.cached_lsn.map(|clsn| clsn + 1) == Some(lsn); + let reached_cache = + state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn); let will_init = rec.will_init(); state.records.push((lsn, rec)); will_init || reached_cache @@ -165,13 +172,12 @@ impl ValuesReconstructState { if key_done && state.situation == ValueReconstructSituation::Continue { state.situation = ValueReconstructSituation::Complete; - self.total_keys_done += 1; self.keys_done.add_key(*key); } - key_done + state.situation } else { - true + ValueReconstructSituation::Complete } } @@ -181,7 +187,7 @@ impl ValuesReconstructState { self.keys .get(key) .and_then(|k| k.as_ref().ok()) - .and_then(|state| state.cached_lsn) + .and_then(|state| state.get_cached_lsn()) } /// Returns the key space describing the keys that have diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 28a169db38..0866d05696 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -63,7 +63,8 @@ use utils::{ }; use super::{ - AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState, + AsLayerDesc, LayerAccessStats, 
PersistentLayerDesc, ResidentLayer, ValueReconstructSituation, + ValuesReconstructState, }; /// @@ -946,7 +947,10 @@ impl DeltaLayerInner { break; } - reconstruct_state.update_key(&key, lsn, value.unwrap()); + let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + break; + } } } diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c5b929a372..af3c73740a 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -27,7 +27,10 @@ use std::fmt::Write as _; use std::ops::Range; use tokio::sync::{RwLock, RwLockWriteGuard}; -use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructState, ValuesReconstructState}; +use super::{ + DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState, + ValuesReconstructState, +}; pub struct InMemoryLayer { conf: &'static PageServerConf, @@ -281,9 +284,9 @@ impl InMemoryLayer { continue; } - let key_done = + let key_situation = reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap()); - if key_done { + if key_situation == ValueReconstructSituation::Complete { completed_keys.insert(block_read.key); } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d97875dd5c..849eac9989 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -828,7 +828,7 @@ impl Timeline { Ok(results) } - #[allow(unused)] + #[cfg(debug_assertions)] pub(super) async fn validate_get_vectored_impl( &self, vectored_res: &Result>, GetVectoredError>, @@ -2713,19 +2713,14 @@ impl Timeline { /// Get the data needed to reconstruct all keys in the provided keyspace /// - /// Maintain a fringe (LayerFringe) which tracks all the layers that intersect - /// the current keyspace. 
At each iteration pop the top of the fringe (the layer - /// with the highest Lsn) and get all the required reconstruct data from the layer - /// in one go. - /// - /// More granulary, the algorithm is as follows: - // 1. While some keys are still not done and there's a timeline to visit: - // 2. Visit the timeline: - // 2.1: Build the fringe for the current keyspace - // 2.2 Visit the newest layer from the fringe to collect all values for the range it - // intersects - // 2.3. Pop the timeline from the fringe - // 2.4. If the fringe is empty, go back to 1 + /// The algorithm is as follows: + /// 1. While some keys are still not done and there's a timeline to visit: + /// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_inner`]): + /// 2.1. Build the fringe for the current keyspace + /// 2.2. Visit the newest layer from the fringe to collect all values for the range it + /// intersects + /// 2.3. Pop the timeline from the fringe + /// 2.4. If the fringe is empty, go back to 1 async fn get_vectored_reconstruct_data( &self, mut keyspace: KeySpace, @@ -2773,6 +2768,16 @@ impl Timeline { Ok(()) } + /// Collect the reconstruct data for a keyspace from the specified timeline. + /// + /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect + /// the current keyspace. The current keyspace of the search at any given timeline + /// is the original keyspace minus all the keys that have been completed minus + /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly, + /// but if you merge all the keyspaces in the fringe, you get the "current keyspace". + /// + /// At each iteration pop the top of the fringe (the layer with the highest Lsn) + /// and get all the required reconstruct data from the layer in one go. async fn get_vectored_reconstruct_data_inner( &self, timeline: &Timeline,