diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 6e9a4932d8..2f6b44cb4f 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,15 +8,19 @@ pub(crate) mod layer; mod layer_desc; use crate::context::{AccessStatsBehavior, RequestContext}; +use crate::repository::Value; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; use once_cell::sync::Lazy; +use pageserver_api::key::Key; +use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::models::{ LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; +use std::cmp::Reverse; use std::ops::Range; use std::sync::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -34,6 +38,8 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub(crate) use layer::{EvictionError, Layer, ResidentLayer}; +use super::PageReconstructError; + pub fn range_overlaps(a: &Range, b: &Range) -> bool where T: PartialOrd, @@ -67,6 +73,125 @@ pub struct ValueReconstructState { pub img: Option<(Lsn, Bytes)>, } +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub(crate) enum ValueReconstructSituation { + Complete, + #[default] + Continue, +} + +/// Reconstruct data accumulated for a single key during a vectored get +#[derive(Debug, Default, Clone)] +pub(crate) struct VectoredValueReconstructState { + pub(crate) records: Vec<(Lsn, NeonWalRecord)>, + pub(crate) img: Option<(Lsn, Bytes)>, + + // TODO: can probably get rid of cached_lsn and use img.0 + cached_lsn: Option, + situation: ValueReconstructSituation, +} + +impl From for ValueReconstructState { + fn from(mut state: VectoredValueReconstructState) -> Self { + // walredo expects the records to be descending in terms of Lsn + state.records.sort_by_key(|(lsn, _)| Reverse(*lsn)); + + ValueReconstructState { + records: state.records, + img: state.img, + } + } +} + +/// Bag of data accumulated during a vectored get +pub(crate) struct ValuesReconstructState { + pub(crate) keys: HashMap>, + pub(crate) total_keys_done: usize, + + keys_done: KeySpaceRandomAccum, +} + +impl ValuesReconstructState { + pub(crate) fn new() -> Self { + Self { + keys: HashMap::new(), + total_keys_done: 0, + keys_done: KeySpaceRandomAccum::new(), + } + } + + /// Associate a key with the error which it encountered and mark it as done + pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) { + let previous = self.keys.insert(key, Err(err)); + if let Some(Ok(state)) = previous { + if state.situation == ValueReconstructSituation::Continue { + self.total_keys_done += 1; + self.keys_done.add_key(key); + } + } + } + + /// Update the state collected for a given key. + /// Returns true if this was the last value needed for the key and false otherwise. + /// + /// If the key is done after the update, mark it as such. + pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, value: Value) -> bool { + let state = self + .keys + .entry(*key) + .or_insert(Ok(VectoredValueReconstructState::default())); + + if let Ok(state) = state { + let key_done = match state.situation { + ValueReconstructSituation::Complete => true, + ValueReconstructSituation::Continue => match value { + Value::Image(img) => { + state.img = Some((lsn, img)); + true + } + Value::WalRecord(rec) => { + let reached_cache = state.cached_lsn.map(|clsn| clsn + 1) == Some(lsn); + let will_init = rec.will_init(); + state.records.push((lsn, rec)); + will_init || reached_cache + } + }, + }; + + if key_done && state.situation == ValueReconstructSituation::Continue { + state.situation = ValueReconstructSituation::Complete; + self.total_keys_done += 1; + self.keys_done.add_key(*key); + } + + key_done + } else { + true + } + } + + /// Returns the Lsn at which this key is cached if one exists. + /// The read path should go no further than this Lsn for the given key. + pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option { + self.keys + .get(key) + .and_then(|k| k.as_ref().ok()) + .and_then(|state| state.cached_lsn) + } + + /// Returns the key space describing the keys that have + /// been marked as completed since the last call to this function. + pub(crate) fn consume_done_keys(&mut self) -> KeySpace { + self.keys_done.consume_keyspace() + } +} + +impl Default for ValuesReconstructState { + fn default() -> Self { + Self::new() + } +} + /// Return value from [`Layer::get_value_reconstruct_data`] #[derive(Clone, Copy, Debug)] pub enum ValueReconstructResult {