pageserver: add data structure to track vectored reconstruct state

This commit is contained in:
Vlad Lazar
2024-02-02 13:33:01 +00:00
committed by Vlad Lazar
parent dc69d5f37e
commit d985d21a7c

View File

@@ -8,15 +8,19 @@ pub(crate) mod layer;
mod layer_desc;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
use pageserver_api::key::Key;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use pageserver_api::models::{
LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::cmp::Reverse;
use std::ops::Range;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -34,6 +38,8 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
use super::PageReconstructError;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
T: PartialOrd<T>,
@@ -67,6 +73,125 @@ pub struct ValueReconstructState {
pub img: Option<(Lsn, Bytes)>,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ValueReconstructSituation {
Complete,
#[default]
Continue,
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
// TODO: can probably get rid of cached_lsn and use img.0
cached_lsn: Option<Lsn>,
situation: ValueReconstructSituation,
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
ValueReconstructState {
records: state.records,
img: state.img,
}
}
}
/// Bag of data accumulated during a vectored get
pub(crate) struct ValuesReconstructState {
pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>,
pub(crate) total_keys_done: usize,
keys_done: KeySpaceRandomAccum,
}
impl ValuesReconstructState {
pub(crate) fn new() -> Self {
Self {
keys: HashMap::new(),
total_keys_done: 0,
keys_done: KeySpaceRandomAccum::new(),
}
}
/// Associate a key with the error which it encountered and mark it as done
pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) {
let previous = self.keys.insert(key, Err(err));
if let Some(Ok(state)) = previous {
if state.situation == ValueReconstructSituation::Continue {
self.total_keys_done += 1;
self.keys_done.add_key(key);
}
}
}
/// Update the state collected for a given key.
/// Returns true if this was the last value needed for the key and false otherwise.
///
/// If the key is done after the update, mark it as such.
pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, value: Value) -> bool {
let state = self
.keys
.entry(*key)
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
ValueReconstructSituation::Complete => true,
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
let reached_cache = state.cached_lsn.map(|clsn| clsn + 1) == Some(lsn);
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init || reached_cache
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.total_keys_done += 1;
self.keys_done.add_key(*key);
}
key_done
} else {
true
}
}
/// Returns the Lsn at which this key is cached if one exists.
/// The read path should go no further than this Lsn for the given key.
pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> {
self.keys
.get(key)
.and_then(|k| k.as_ref().ok())
.and_then(|state| state.cached_lsn)
}
/// Returns the key space describing the keys that have
/// been marked as completed since the last call to this function.
pub(crate) fn consume_done_keys(&mut self) -> KeySpace {
self.keys_done.consume_keyspace()
}
}
impl Default for ValuesReconstructState {
fn default() -> Self {
Self::new()
}
}
/// Return value from [`Layer::get_value_reconstruct_data`]
#[derive(Clone, Copy, Debug)]
pub enum ValueReconstructResult {