review: address round one of Christian's comments

This commit is contained in:
Vlad Lazar
2024-02-08 20:41:13 +00:00
parent 02f1b7905f
commit e6c04119c4
4 changed files with 49 additions and 31 deletions

View File

@@ -91,11 +91,15 @@ pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
// TODO: can probably get rid of cached_lsn and use img.0
cached_lsn: Option<Lsn>,
situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
fn get_cached_lsn(&self) -> Option<Lsn> {
self.img.as_ref().map(|img| img.0)
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
@@ -111,7 +115,6 @@ impl From<VectoredValueReconstructState> for ValueReconstructState {
/// Bag of data accumulated during a vectored get
pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
pub(crate) total_keys_done: usize,
keys_done: KeySpaceRandomAccum,
}
@@ -120,7 +123,6 @@ impl ValuesReconstructState {
pub(crate) fn new() -> Self {
Self {
keys: HashMap::new(),
total_keys_done: 0,
keys_done: KeySpaceRandomAccum::new(),
}
}
@@ -130,7 +132,6 @@ impl ValuesReconstructState {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.total_keys_done += 1;
self.keys_done.add_key(key);
}
}
@@ -140,7 +141,12 @@ impl ValuesReconstructState {
/// Returns true if this was the last value needed for the key and false otherwise.
///
/// If the key is done after the update, mark it as such.
pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, value: Value) -> bool {
pub(crate) fn update_key(
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
) -> ValueReconstructSituation {
let state = self
.keys
.entry(*key)
@@ -148,14 +154,15 @@ impl ValuesReconstructState {
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => true,
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
let reached_cache = state.cached_lsn.map(|clsn| clsn + 1) == Some(lsn);
let reached_cache =
state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
@@ -165,13 +172,12 @@ impl ValuesReconstructState {
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.total_keys_done += 1;
self.keys_done.add_key(*key);
}
key_done
state.situation
} else {
true
ValueReconstructSituation::Complete
}
}
@@ -181,7 +187,7 @@ impl ValuesReconstructState {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.cached_lsn)
.and_then(|state| state.get_cached_lsn())
}
/// Returns the key space describing the keys that have

View File

@@ -63,7 +63,8 @@ use utils::{
};
use super::{
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValuesReconstructState,
AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation,
ValuesReconstructState,
};
///
@@ -946,7 +947,10 @@ impl DeltaLayerInner {
break;
}
reconstruct_state.update_key(&key, lsn, value.unwrap());
let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
break;
}
}
}

View File

@@ -27,7 +27,10 @@ use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::{RwLock, RwLockWriteGuard};
use super::{DeltaLayerWriter, ResidentLayer, ValueReconstructState, ValuesReconstructState};
use super::{
DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState,
ValuesReconstructState,
};
pub struct InMemoryLayer {
conf: &'static PageServerConf,
@@ -281,9 +284,9 @@ impl InMemoryLayer {
continue;
}
let key_done =
let key_situation =
reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap());
if key_done {
if key_situation == ValueReconstructSituation::Complete {
completed_keys.insert(block_read.key);
}
}

View File

@@ -828,7 +828,7 @@ impl Timeline {
Ok(results)
}
#[allow(unused)]
#[cfg(debug_assertions)]
pub(super) async fn validate_get_vectored_impl(
&self,
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
@@ -2713,19 +2713,14 @@ impl Timeline {
/// Get the data needed to reconstruct all keys in the provided keyspace
///
/// Maintain a fringe (LayerFringe) which tracks all the layers that intersect
/// the current keyspace. At each iteration pop the top of the fringe (the layer
/// with the highest Lsn) and get all the required reconstruct data from the layer
/// in one go.
///
/// More granulary, the algorithm is as follows:
// 1. While some keys are still not done and there's a timeline to visit:
// 2. Visit the timeline:
// 2.1: Build the fringe for the current keyspace
// 2.2 Visit the newest layer from the fringe to collect all values for the range it
// intersects
// 2.3. Pop the timeline from the fringe
// 2.4. If the fringe is empty, go back to 1
/// The algorithm is as follows:
/// 1. While some keys are still not done and there's a timeline to visit:
/// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_inner`]:
/// 2.1: Build the fringe for the current keyspace
/// 2.2 Visit the newest layer from the fringe to collect all values for the range it
/// intersects
/// 2.3. Pop the timeline from the fringe
/// 2.4. If the fringe is empty, go back to 1
async fn get_vectored_reconstruct_data(
&self,
mut keyspace: KeySpace,
@@ -2773,6 +2768,16 @@ impl Timeline {
Ok(())
}
/// Collect the reconstruct data for a ketspace from the specified timeline.
///
/// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect
/// the current keyspace. The current keyspace of the search at any given timeline
/// is the original keyspace minus all the keys that have been completed minus
/// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly,
/// but if you merge all the keyspaces in the fringe, you get the "current keyspace".
///
/// At each iteration pop the top of the fringe (the layer with the highest Lsn)
/// and get all the required reconstruct data from the layer in one go.
async fn get_vectored_reconstruct_data_inner(
&self,
timeline: &Timeline,