diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index dac6b2f893..d4b2ead5b1 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -287,7 +287,7 @@ pub(crate) enum ReadableLayer { /// A partial description of a read to be done. #[derive(Debug, Clone)] -struct ReadDesc { +struct VisitLocation { /// An id used to resolve the readable layer within the fringe layer_id: LayerId, /// Lsn range for the read, used for selecting the next read @@ -303,46 +303,442 @@ struct ReadDesc { /// a two layer indexing scheme. #[derive(Debug)] pub(crate) struct LayerFringe { - planned_reads_by_lsn: BinaryHeap, - layers: HashMap, + visits_by_lsn_index: BinaryHeap, + layer_visits: HashMap, } #[derive(Debug)] -struct LayerKeyspace { - layer: ReadableLayer, - target_keyspace: KeySpaceRandomAccum, +pub(crate) enum LayerVisitBuilder { + InMemoryLayer(InMemoryLayerVisitBuilder), + PersistentLayer(PersistentLayerVisitBuilder), +} + +#[derive(Debug)] +pub(crate) enum LayerVisit { + InMemoryLayer(InMemoryLayerVisit), + PersistentLayer(PersistentLayerVisit), +} + +#[derive(Debug)] +pub(crate) enum PersistentLayerVisitBuilder { + DeltaLayer(DeltaLayerVisitBuilder), + ImageLayer(ImageLayerVisitBuilder), +} + +#[derive(Debug)] +pub(crate) enum PersistentLayerVisit { + DeltaLayer(DeltaLayerVisit), + ImageLayer(ImageLayerVisit), +} + +#[derive(Debug)] +pub(crate) struct InMemoryLayerVisitBuilder { + /// Key space accumulator which will define which keys we are + /// interested in for this layer visit. + keyspace_accum: KeySpaceRandomAccum, + /// Ignore any keys with an LSN greater or equal + /// than the specified one. + lsn_ceil: Lsn, + /// Only consider keys at LSN greater or equal than the specified one + lsn_floor: Lsn, + // The in-memory layer to visit + layer: Arc, +} + +#[derive(Debug)] +pub(crate) struct InMemoryLayerVisit { + /// Key space of keys considered by the visit + keyspace: KeySpace, + /// Ignore any keys with an LSN greater or equal + /// than the specified one. + lsn_ceil: Lsn, + /// Only consider keys at LSN greater or equal than the specified one + lsn_floor: Lsn, +} + +#[derive(Debug)] +pub(crate) struct DeltaLayerVisitBuilder { + /// List of key spaces accumulators which will define what deltas are read. + /// Each accumulator has an associated LSN which specifies + /// the LSN floor for it (i.e. do not read below this LSN). + keyspace_accums: HashMap, + /// Ignore any keys with an LSN greater or equal + /// than the specified one. + lsn_ceil: Lsn, + /// Handle for the layer to visit (guaranteed to be a delta layer) + layer: Layer, +} + +#[derive(Debug)] +pub(crate) struct DeltaLayerVisit { + /// List of key spaces considered during the visit. + /// Each keyspace has an associated LSN which specifies + /// the LSN floor for it (i.e. do not read below this LSN). + keyspaces: Vec<(Lsn, KeySpace)>, + /// Ignore any keys with an LSN greater or equal + /// than the specified one. + lsn_ceil: Lsn, +} + +#[derive(Debug)] +pub(crate) struct ImageLayerVisitBuilder { + /// Key space which defines which keys we are + /// interested in for this layer visit. + keyspace_accum: KeySpaceRandomAccum, + /// Handle for the layer to visit (guaranteed to be an image layer) + layer: Layer, + /// Only consider keys at LSN greater or equal than the specified one + lsn_floor: Lsn, +} + +#[derive(Debug)] +pub(crate) struct ImageLayerVisit { + /// Key space which defines which keys we are + /// interested in for this layer visit. + keyspace: KeySpace, + /// Only consider keys at LSN greater or equal than the specified one + lsn_floor: Lsn, +} + +impl LayerVisitBuilder { + pub(crate) fn new(layer: ReadableLayer, keyspace: KeySpace, lsn_range: Range) -> Self { + match layer { + ReadableLayer::InMemoryLayer(in_mem) => { + Self::InMemoryLayer(InMemoryLayerVisitBuilder::new(in_mem, keyspace, lsn_range)) + } + ReadableLayer::PersistentLayer(persistent) => Self::PersistentLayer( + PersistentLayerVisitBuilder::new(persistent, keyspace, lsn_range), + ), + } + } +} + +impl InMemoryLayerVisitBuilder { + fn new(layer: Arc, keyspace: KeySpace, lsn_range: Range) -> Self { + assert_eq!(lsn_range.start, layer.get_lsn_range().start); + + let mut keyspace_accum = KeySpaceRandomAccum::new(); + keyspace_accum.add_keyspace(keyspace); + + Self { + keyspace_accum, + lsn_floor: lsn_range.start, + lsn_ceil: lsn_range.end, + layer, + } + } +} + +impl PersistentLayerVisitBuilder { + fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range) -> Self { + let is_delta = layer.layer_desc().is_delta; + if is_delta { + Self::DeltaLayer(DeltaLayerVisitBuilder::new(layer, keyspace, lsn_range)) + } else { + Self::ImageLayer(ImageLayerVisitBuilder::new(layer, keyspace, lsn_range)) + } + } +} + +impl DeltaLayerVisitBuilder { + fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range) -> Self { + assert!(layer.layer_desc().is_delta); + + let mut keyspace_accum = KeySpaceRandomAccum::new(); + keyspace_accum.add_keyspace(keyspace); + + Self { + keyspace_accums: HashMap::from([(lsn_range.start, keyspace_accum)]), + lsn_ceil: lsn_range.end, + layer, + } + } +} + +impl ImageLayerVisitBuilder { + fn new(layer: Layer, keyspace: KeySpace, lsn_range: Range) -> Self { + assert!(!layer.layer_desc().is_delta); + assert_eq!(lsn_range.start, layer.layer_desc().lsn_range.start); + + let mut keyspace_accum = KeySpaceRandomAccum::new(); + keyspace_accum.add_keyspace(keyspace); + + Self { + keyspace_accum, + lsn_floor: lsn_range.start, + layer, + } + } +} + +pub(crate) trait LayerVisitBuilderUpdate { + type L; + type LV; + + /// Extend an already planned layer visit to also include the keys + /// in the provided keyspace and LSN range. + fn update(&mut self, keyspace: KeySpace, lsn_range: Range); + + /// Build the visit! + fn build(self) -> (Self::L, Self::LV); +} + +impl LayerVisitBuilderUpdate for LayerVisitBuilder { + type L = ReadableLayer; + type LV = LayerVisit; + + fn update(&mut self, keyspace: KeySpace, lsn_range: Range) { + match self { + LayerVisitBuilder::InMemoryLayer(v) => v.update(keyspace, lsn_range), + LayerVisitBuilder::PersistentLayer(v) => v.update(keyspace, lsn_range), + } + } + + fn build(self) -> (Self::L, Self::LV) { + match self { + LayerVisitBuilder::InMemoryLayer(in_mem) => ( + ReadableLayer::InMemoryLayer(in_mem.layer), + LayerVisit::InMemoryLayer(InMemoryLayerVisit { + keyspace: in_mem.keyspace_accum.to_keyspace(), + lsn_ceil: in_mem.lsn_ceil, + lsn_floor: in_mem.lsn_floor, + }), + ), + LayerVisitBuilder::PersistentLayer(pers) => { + let (layer, visit) = pers.build(); + ( + ReadableLayer::PersistentLayer(layer), + LayerVisit::PersistentLayer(visit), + ) + } + } + } +} + +impl LayerVisitBuilderUpdate for PersistentLayerVisitBuilder { + type L = Layer; + type LV = PersistentLayerVisit; + + fn update(&mut self, keyspace: KeySpace, lsn_range: Range) { + match self { + PersistentLayerVisitBuilder::DeltaLayer(v) => v.update(keyspace, lsn_range), + PersistentLayerVisitBuilder::ImageLayer(v) => v.update(keyspace, lsn_range), + } + } + + fn build(self) -> (Self::L, Self::LV) { + match self { + PersistentLayerVisitBuilder::DeltaLayer(delta) => { + let (layer, visit) = delta.build(); + (layer, PersistentLayerVisit::DeltaLayer(visit)) + } + PersistentLayerVisitBuilder::ImageLayer(img) => { + let (layer, visit) = img.build(); + (layer, PersistentLayerVisit::ImageLayer(visit)) + } + } + } +} + +impl LayerVisitBuilderUpdate for InMemoryLayerVisitBuilder { + type L = Arc; + type LV = InMemoryLayerVisit; + + fn update(&mut self, keyspace: KeySpace, lsn_range: Range) { + // Note: I cannot think of any cases when this update should happen, + // since in memory layers span the entire key range. + assert_eq!(lsn_range.end, self.lsn_ceil); + assert_eq!(lsn_range.start, self.lsn_floor); + self.keyspace_accum.add_keyspace(keyspace); + } + + fn build(self) -> (Self::L, Self::LV) { + ( + self.layer, + InMemoryLayerVisit { + keyspace: self.keyspace_accum.to_keyspace(), + lsn_floor: self.lsn_floor, + lsn_ceil: self.lsn_ceil, + }, + ) + } +} + +impl LayerVisitBuilderUpdate for DeltaLayerVisitBuilder { + type L = Layer; + type LV = DeltaLayerVisit; + + fn update(&mut self, keyspace: KeySpace, lsn_range: Range) { + assert_eq!(lsn_range.end, self.lsn_ceil); + self.keyspace_accums + .entry(lsn_range.start) + .or_default() + .add_keyspace(keyspace); + } + + fn build(self) -> (Self::L, Self::LV) { + use itertools::Itertools; + + let keyspaces = self + .keyspace_accums + .into_iter() + .filter_map(|(lsn_floor, accum)| { + let keyspace = accum.to_keyspace(); + if keyspace.is_empty() { + None + } else { + Some((lsn_floor, keyspace)) + } + }) + .sorted_by_key(|(_lsn_floor, keyspace)| keyspace.start().unwrap()) + .collect::>(); + + if cfg!(debug_assertions) { + // Check that the keyspaces we are going to read from + // a layer are non-overlapping. + // + // The keyspaces provided to vectored read initially are non-overlapping. + // We may split keyspaces at each step and keyspaces resulting from a split + // are non-overlapping as well. One can prove that the property holds by + // induction. + let mut prev_end: Option = None; + for (_lsn_floor, crnt) in keyspaces.iter() { + if let Some(prev_end) = prev_end { + debug_assert!(prev_end <= crnt.start().unwrap()) + } + + prev_end = Some(crnt.end().unwrap()); + } + } + + ( + self.layer, + DeltaLayerVisit { + keyspaces, + lsn_ceil: self.lsn_ceil, + }, + ) + } +} + +impl LayerVisitBuilderUpdate for ImageLayerVisitBuilder { + type L = Layer; + type LV = ImageLayerVisit; + + fn update(&mut self, keyspace: KeySpace, lsn_range: Range) { + assert_eq!(lsn_range.start, self.lsn_floor); + self.keyspace_accum.add_keyspace(keyspace); + } + + fn build(self) -> (Self::L, Self::LV) { + ( + self.layer, + ImageLayerVisit { + keyspace: self.keyspace_accum.to_keyspace(), + lsn_floor: self.lsn_floor, + }, + ) + } +} + +pub(crate) trait LayerVisitDetails { + /// Returns the key spaces planned for the visit + /// and their associated floor LSNs. + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)>; +} + +impl LayerVisitDetails for LayerVisit { + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> { + match self { + LayerVisit::PersistentLayer(pers) => pers.keyspaces(), + LayerVisit::InMemoryLayer(in_mem) => in_mem.keyspaces(), + } + } +} + +impl LayerVisitDetails for PersistentLayerVisit { + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> { + match self { + PersistentLayerVisit::DeltaLayer(delta) => delta.keyspaces(), + PersistentLayerVisit::ImageLayer(image) => image.keyspaces(), + } + } +} + +impl LayerVisitDetails for DeltaLayerVisit { + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> { + self.keyspaces.clone() + } +} + +impl LayerVisitDetails for ImageLayerVisit { + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> { + vec![(self.lsn_floor, self.keyspace.clone())] + } +} + +impl LayerVisitDetails for InMemoryLayerVisit { + fn keyspaces(&self) -> Vec<(Lsn, KeySpace)> { + vec![(self.lsn_floor, self.keyspace.clone())] + } +} + +impl LayerVisit { + fn into_persistent_layer_visit(self) -> PersistentLayerVisit { + match self { + LayerVisit::PersistentLayer(visit) => visit, + LayerVisit::InMemoryLayer(visit) => { + panic!("Invalid attempt to cast to PersistentLayerVisit: {visit:?}"); + } + } + } + + fn into_in_memory_layer_visit(self) -> InMemoryLayerVisit { + match self { + LayerVisit::InMemoryLayer(visit) => visit, + LayerVisit::PersistentLayer(visit) => { + panic!("Invalid attempt to cast to InMemoryLayerVisit: {visit:?}"); + } + } + } +} + +impl PersistentLayerVisit { + fn into_delta_layer_visit(self) -> DeltaLayerVisit { + match self { + PersistentLayerVisit::DeltaLayer(visit) => visit, + PersistentLayerVisit::ImageLayer(visit) => { + panic!("Invalid attempt to cast to DeltaLayerVisit: {visit:?}"); + } + } + } + + fn into_image_layer_visit(self) -> ImageLayerVisit { + match self { + PersistentLayerVisit::ImageLayer(visit) => visit, + PersistentLayerVisit::DeltaLayer(visit) => { + panic!("Invalid attempt to cast to ImageLayerVisit: {visit:?}"); + } + } + } } impl LayerFringe { pub(crate) fn new() -> Self { LayerFringe { - planned_reads_by_lsn: BinaryHeap::new(), - layers: HashMap::new(), + visits_by_lsn_index: BinaryHeap::new(), + layer_visits: HashMap::new(), } } - pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range)> { - let read_desc = match self.planned_reads_by_lsn.pop() { + pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, LayerVisit)> { + let read_desc = match self.visits_by_lsn_index.pop() { Some(desc) => desc, None => return None, }; - let removed = self.layers.remove_entry(&read_desc.layer_id); - - match removed { - Some(( - _, - LayerKeyspace { - layer, - mut target_keyspace, - }, - )) => Some(( - layer, - target_keyspace.consume_keyspace(), - read_desc.lsn_range, - )), - None => unreachable!("fringe internals are always consistent"), - } + let removed = self.layer_visits.remove(&read_desc.layer_id)?; + Some(removed.build()) } pub(crate) fn update( @@ -352,22 +748,18 @@ impl LayerFringe { lsn_range: Range, ) { let layer_id = layer.id(); - let entry = self.layers.entry(layer_id.clone()); + let entry = self.layer_visits.entry(layer_id.clone()); match entry { Entry::Occupied(mut entry) => { - entry.get_mut().target_keyspace.add_keyspace(keyspace); + entry.get_mut().update(keyspace, lsn_range); } Entry::Vacant(entry) => { - self.planned_reads_by_lsn.push(ReadDesc { - lsn_range, + self.visits_by_lsn_index.push(VisitLocation { + lsn_range: lsn_range.clone(), layer_id: layer_id.clone(), }); - let mut accum = KeySpaceRandomAccum::new(); - accum.add_keyspace(keyspace); - entry.insert(LayerKeyspace { - layer, - target_keyspace: accum, - }); + + entry.insert(LayerVisitBuilder::new(layer, keyspace, lsn_range)); } } } @@ -379,7 +771,7 @@ impl Default for LayerFringe { } } -impl Ord for ReadDesc { +impl Ord for VisitLocation { fn cmp(&self, other: &Self) -> Ordering { let ord = self.lsn_range.end.cmp(&other.lsn_range.end); if ord == std::cmp::Ordering::Equal { @@ -390,19 +782,19 @@ impl Ord for ReadDesc { } } -impl PartialOrd for ReadDesc { +impl PartialOrd for VisitLocation { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl PartialEq for ReadDesc { +impl PartialEq for VisitLocation { fn eq(&self, other: &Self) -> bool { self.lsn_range == other.lsn_range } } -impl Eq for ReadDesc {} +impl Eq for VisitLocation {} impl ReadableLayer { pub(crate) fn id(&self) -> LayerId { @@ -414,20 +806,27 @@ impl ReadableLayer { pub(crate) async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, - lsn_range: Range, + visit: LayerVisit, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { match self { ReadableLayer::PersistentLayer(layer) => { layer - .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, ctx) + .get_values_reconstruct_data( + visit.into_persistent_layer_visit(), + reconstruct_state, + ctx, + ) .await } ReadableLayer::InMemoryLayer(layer) => { layer - .get_values_reconstruct_data(keyspace, lsn_range.end, reconstruct_state, ctx) + .get_values_reconstruct_data( + visit.into_in_memory_layer_visit(), + reconstruct_state, + ctx, + ) .await } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 34f1b15138..8403ad10db 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -75,7 +75,7 @@ use utils::{ lsn::Lsn, }; -use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState}; +use super::{AsLayerDesc, DeltaLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState}; /// /// Header stored in the beginning of the file @@ -841,8 +841,7 @@ impl DeltaLayerInner { // can be further optimised to visit the index only once. pub(super) async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, - lsn_range: Range, + visit: DeltaLayerVisit, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { @@ -863,8 +862,8 @@ impl DeltaLayerInner { let data_end_offset = self.index_start_offset(); let reads = Self::plan_reads( - &keyspace, - lsn_range.clone(), + &visit.keyspaces, + visit.lsn_ceil, data_end_offset, index_reader, planner, @@ -877,14 +876,16 @@ impl DeltaLayerInner { self.do_reads_and_update_state(reads, reconstruct_state, ctx) .await; - reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start); + for (lsn_floor, keyspace) in visit.keyspaces { + reconstruct_state.on_lsn_advanced(&keyspace, lsn_floor); + } Ok(()) } async fn plan_reads( - keyspace: &KeySpace, - lsn_range: Range, + keyspaces: &Vec<(Lsn, KeySpace)>, + lsn_ceil: Lsn, data_end_offset: u64, index_reader: DiskBtreeReader, mut planner: VectoredReadPlanner, @@ -898,48 +899,52 @@ impl DeltaLayerInner { .page_content_kind(PageContentKind::DeltaLayerBtreeNode) .build(); - for range in keyspace.ranges.iter() { - let mut range_end_handled = false; + for (lsn_floor, keyspace) in keyspaces { + let lsn_range = *lsn_floor..lsn_ceil; - let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); - let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx); - let mut index_stream = std::pin::pin!(index_stream); + for range in keyspace.ranges.iter() { + let mut range_end_handled = false; - while let Some(index_entry) = index_stream.next().await { - let (raw_key, value) = index_entry?; - let key = Key::from_slice(&raw_key[..KEY_SIZE]); - let lsn = DeltaKey::extract_lsn_from_buf(&raw_key); - let blob_ref = BlobRef(value); + let start_key = DeltaKey::from_key_lsn(&range.start, lsn_range.start); + let index_stream = index_reader.clone().into_stream(&start_key.0, &ctx); + let mut index_stream = std::pin::pin!(index_stream); - // Lsns are not monotonically increasing across keys, so we don't assert on them. - assert!(key >= range.start); + while let Some(index_entry) = index_stream.next().await { + let (raw_key, value) = index_entry?; + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + let lsn = DeltaKey::extract_lsn_from_buf(&raw_key); + let blob_ref = BlobRef(value); - let outside_lsn_range = !lsn_range.contains(&lsn); - let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn); + // Lsns are not monotonically increasing across keys, so we don't assert on them. + assert!(key >= range.start); - let flag = { - if outside_lsn_range || below_cached_lsn { - BlobFlag::Ignore - } else if blob_ref.will_init() { - BlobFlag::ReplaceAll + let outside_lsn_range = !lsn_range.contains(&lsn); + let below_cached_lsn = reconstruct_state.get_cached_lsn(&key) >= Some(lsn); + + let flag = { + if outside_lsn_range || below_cached_lsn { + BlobFlag::Ignore + } else if blob_ref.will_init() { + BlobFlag::ReplaceAll + } else { + // Usual path: add blob to the read + BlobFlag::None + } + }; + + if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) { + planner.handle_range_end(blob_ref.pos()); + range_end_handled = true; + break; } else { - // Usual path: add blob to the read - BlobFlag::None + planner.handle(key, lsn, blob_ref.pos(), flag); } - }; - - if key >= range.end || (key.next() == range.end && lsn >= lsn_range.end) { - planner.handle_range_end(blob_ref.pos()); - range_end_handled = true; - break; - } else { - planner.handle(key, lsn, blob_ref.pos(), flag); } - } - if !range_end_handled { - tracing::debug!("Handling range end fallback at {}", data_end_offset); - planner.handle_range_end(data_end_offset); + if !range_end_handled { + tracing::debug!("Handling range end fallback at {}", data_end_offset); + planner.handle_range_end(data_end_offset); + } } } @@ -1641,8 +1646,8 @@ pub(crate) mod test { // Plan and validate let vectored_reads = DeltaLayerInner::plan_reads( - &keyspace, - lsn_range.clone(), + &vec![(lsn_range.start, keyspace.clone())], + lsn_range.end, disk_offset, reader, planner, @@ -1895,8 +1900,8 @@ pub(crate) mod test { let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64; let vectored_reads = DeltaLayerInner::plan_reads( - &keyspace, - entries_meta.lsn_range.clone(), + &vec![(entries_meta.lsn_range.start, keyspace)], + entries_meta.lsn_range.end, data_end_offset, index_reader, planner, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 5de2582ab7..d61eeca1c3 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -69,7 +69,7 @@ use utils::{ }; use super::layer_name::ImageLayerName; -use super::{AsLayerDesc, LayerName, PersistentLayerDesc, ValuesReconstructState}; +use super::{AsLayerDesc, ImageLayerVisit, LayerName, PersistentLayerDesc, ValuesReconstructState}; /// /// Header stored in the beginning of the file @@ -435,12 +435,12 @@ impl ImageLayerInner { // the reconstruct state with whatever is found. pub(super) async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, + visit: ImageLayerVisit, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { let reads = self - .plan_reads(keyspace, None, ctx) + .plan_reads(visit.keyspace, None, ctx) .await .map_err(GetVectoredError::Other)?; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index e487bee1f2..dd9971a845 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -17,7 +17,6 @@ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; use camino::Utf8PathBuf; use pageserver_api::key::CompactKey; -use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; use std::collections::{BTreeMap, HashMap}; @@ -36,7 +35,8 @@ use std::sync::atomic::{AtomicU64, AtomicUsize}; use tokio::sync::RwLock; use super::{ - DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState, + DeltaLayerWriter, InMemoryLayerVisit, PersistentLayerDesc, ValueReconstructSituation, + ValuesReconstructState, }; pub(crate) mod vectored_dio_read; @@ -416,11 +416,14 @@ impl InMemoryLayer { // If the key is cached, go no further than the cached Lsn. pub(crate) async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, - end_lsn: Lsn, + visit: InMemoryLayerVisit, reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { + let InMemoryLayerVisit { + keyspace, lsn_ceil, .. + } = visit; + let ctx = RequestContextBuilder::extend(ctx) .page_content_kind(PageContentKind::InMemoryLayer) .build(); @@ -440,8 +443,8 @@ impl InMemoryLayer { { let key = Key::from_compact(*key); let lsn_range = match reconstruct_state.get_cached_lsn(&key) { - Some(cached_lsn) => (cached_lsn + 1)..end_lsn, - None => self.start_lsn..end_lsn, + Some(cached_lsn) => (cached_lsn + 1)..lsn_ceil, + None => self.start_lsn..lsn_ceil, }; let slice = vec_map.slice_range(lsn_range); diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index b15cd4da39..bb5db8bc60 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1,9 +1,7 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; -use pageserver_api::keyspace::KeySpace; use pageserver_api::models::HistoricLayerInfo; use pageserver_api::shard::{ShardIdentity, ShardIndex, TenantShardId}; -use std::ops::Range; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Weak}; use std::time::{Duration, SystemTime}; @@ -23,7 +21,7 @@ use super::delta_layer::{self, DeltaEntry}; use super::image_layer::{self}; use super::{ AsLayerDesc, ImageLayerWriter, LayerAccessStats, LayerAccessStatsReset, LayerName, - LayerVisibilityHint, PersistentLayerDesc, ValuesReconstructState, + LayerVisibilityHint, PersistentLayerDesc, PersistentLayerVisit, ValuesReconstructState, }; use utils::generation::Generation; @@ -303,8 +301,7 @@ impl Layer { pub(crate) async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, - lsn_range: Range, + visit: PersistentLayerVisit, reconstruct_data: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { @@ -322,7 +319,7 @@ impl Layer { self.record_access(ctx); layer - .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, &self.0, ctx) + .get_values_reconstruct_data(visit, reconstruct_data, &self.0, ctx) .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)) .await .map_err(|err| match err { @@ -1741,8 +1738,7 @@ impl DownloadedLayer { async fn get_values_reconstruct_data( &self, - keyspace: KeySpace, - lsn_range: Range, + visit: PersistentLayerVisit, reconstruct_data: &mut ValuesReconstructState, owner: &Arc, ctx: &RequestContext, @@ -1755,11 +1751,11 @@ impl DownloadedLayer { .map_err(GetVectoredError::Other)? { Delta(d) => { - d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx) + d.get_values_reconstruct_data(visit.into_delta_layer_visit(), reconstruct_data, ctx) .await } Image(i) => { - i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx) + i.get_values_reconstruct_data(visit.into_image_layer_visit(), reconstruct_data, ctx) .await } } diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 0b9bde4f57..b191304402 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -1,6 +1,6 @@ use std::time::UNIX_EPOCH; -use pageserver_api::key::CONTROLFILE_KEY; +use pageserver_api::{key::CONTROLFILE_KEY, keyspace::KeySpace}; use tokio::task::JoinSet; use utils::{ completion::{self, Completion}, @@ -9,7 +9,10 @@ use utils::{ use super::failpoints::{Failpoint, FailpointKind}; use super::*; -use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint}; +use crate::{ + context::DownloadBehavior, + tenant::storage_layer::{ImageLayerVisit, LayerVisibilityHint}, +}; use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness}; /// Used in tests to advance a future to wanted await point, and not futher. @@ -56,13 +59,14 @@ async fn smoke_test() { let img_before = { let mut data = ValuesReconstructState::default(); + let visit = ImageLayerVisit { + keyspace: controlfile_keyspace.clone(), + lsn_floor: Lsn(0x10), + }; + let visit = PersistentLayerVisit::ImageLayer(visit); + layer - .get_values_reconstruct_data( - controlfile_keyspace.clone(), - Lsn(0x10)..Lsn(0x11), - &mut data, - &ctx, - ) + .get_values_reconstruct_data(visit, &mut data, &ctx) .await .unwrap(); data.keys @@ -88,13 +92,14 @@ async fn smoke_test() { // on accesses when the layer is evicted, it will automatically be downloaded. let img_after = { let mut data = ValuesReconstructState::default(); + let visit = ImageLayerVisit { + keyspace: controlfile_keyspace.clone(), + lsn_floor: Lsn(0x10), + }; + let visit = PersistentLayerVisit::ImageLayer(visit); + layer - .get_values_reconstruct_data( - controlfile_keyspace.clone(), - Lsn(0x10)..Lsn(0x11), - &mut data, - &ctx, - ) + .get_values_reconstruct_data(visit, &mut data, &ctx) .instrument(download_span.clone()) .await .unwrap(); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f66491d962..fc6fa31d37 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -136,7 +136,8 @@ use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::{ - config::TenantConf, storage_layer::inmemory_layer, storage_layer::LayerVisibilityHint, + config::TenantConf, + storage_layer::{inmemory_layer, LayerVisibilityHint, LayerVisitDetails}, upload_queue::NotInitialized, }; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; @@ -3153,12 +3154,12 @@ impl Timeline { async fn get_vectored_reconstruct_data_timeline( timeline: &Timeline, keyspace: KeySpace, - mut cont_lsn: Lsn, + cont_lsn: Lsn, reconstruct_state: &mut ValuesReconstructState, cancel: &CancellationToken, ctx: &RequestContext, ) -> Result { - let mut unmapped_keyspace = keyspace.clone(); + let mut unmapped_keyspaces = vec![(cont_lsn, keyspace.clone())]; let mut fringe = LayerFringe::new(); let mut completed_keyspace = KeySpace::default(); @@ -3169,86 +3170,83 @@ impl Timeline { return Err(GetVectoredError::Cancelled); } - let (keys_done_last_step, keys_with_image_coverage) = - reconstruct_state.consume_done_keys(); - unmapped_keyspace.remove_overlapping_with(&keys_done_last_step); - completed_keyspace.merge(&keys_done_last_step); - if let Some(keys_with_image_coverage) = keys_with_image_coverage { - unmapped_keyspace - .remove_overlapping_with(&KeySpace::single(keys_with_image_coverage.clone())); - image_covered_keyspace.add_range(keys_with_image_coverage); - } - - // Do not descent any further if the last layer we visited - // completed all keys in the keyspace it inspected. This is not - // required for correctness, but avoids visiting extra layers - // which turns out to be a perf bottleneck in some cases. - if !unmapped_keyspace.is_empty() { - let guard = timeline.layers.read().await; - let layers = guard.layer_map()?; - - let in_memory_layer = layers.find_in_memory_layer(|l| { - let start_lsn = l.get_lsn_range().start; - cont_lsn > start_lsn - }); - - match in_memory_layer { - Some(l) => { - let lsn_range = l.get_lsn_range().start..cont_lsn; - fringe.update( - ReadableLayer::InMemoryLayer(l), - unmapped_keyspace.clone(), - lsn_range, - ); - } - None => { - for range in unmapped_keyspace.ranges.iter() { - let results = layers.range_search(range.clone(), cont_lsn); - - results - .found - .into_iter() - .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { - ( - ReadableLayer::PersistentLayer(guard.get_from_desc(&layer)), - keyspace_accum.to_keyspace(), - lsn_floor..cont_lsn, - ) - }) - .for_each(|(layer, keyspace, lsn_range)| { - fringe.update(layer, keyspace, lsn_range) - }); - } - } + for (cont_lsn, unmapped_keyspace) in unmapped_keyspaces.iter_mut() { + let (keys_done_last_step, keys_with_image_coverage) = + reconstruct_state.consume_done_keys(); + unmapped_keyspace.remove_overlapping_with(&keys_done_last_step); + completed_keyspace.merge(&keys_done_last_step); + if let Some(keys_with_image_coverage) = keys_with_image_coverage { + unmapped_keyspace.remove_overlapping_with(&KeySpace::single( + keys_with_image_coverage.clone(), + )); + image_covered_keyspace.add_range(keys_with_image_coverage); } - // It's safe to drop the layer map lock after planning the next round of reads. - // The fringe keeps readable handles for the layers which are safe to read even - // if layers were compacted or flushed. - // - // The more interesting consideration is: "Why is the read algorithm still correct - // if the layer map changes while it is operating?". Doing a vectored read on a - // timeline boils down to pushing an imaginary lsn boundary downwards for each range - // covered by the read. The layer map tells us how to move the lsn downwards for a - // range at *a particular point in time*. It is fine for the answer to be different - // at two different time points. - drop(guard); + // Do not descent any further if the last layer we visited + // completed all keys in the keyspace it inspected. This is not + // required for correctness, but avoids visiting extra layers + // which turns out to be a perf bottleneck in some cases. + if !unmapped_keyspace.is_empty() { + let guard = timeline.layers.read().await; + let layers = guard.layer_map()?; + + let in_memory_layer = layers.find_in_memory_layer(|l| { + let start_lsn = l.get_lsn_range().start; + *cont_lsn > start_lsn + }); + + match in_memory_layer { + Some(l) => { + let lsn_range = l.get_lsn_range().start..*cont_lsn; + fringe.update( + ReadableLayer::InMemoryLayer(l), + unmapped_keyspace.clone(), + lsn_range, + ); + } + None => { + for range in unmapped_keyspace.ranges.iter() { + let results = layers.range_search(range.clone(), *cont_lsn); + + results + .found + .into_iter() + .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { + ( + ReadableLayer::PersistentLayer( + guard.get_from_desc(&layer), + ), + keyspace_accum.to_keyspace(), + lsn_floor..*cont_lsn, + ) + }) + .for_each(|(layer, keyspace, lsn_range)| { + fringe.update(layer, keyspace, lsn_range) + }); + } + } + } + + // It's safe to drop the layer map lock after planning the next round of reads. + // The fringe keeps readable handles for the layers which are safe to read even + // if layers were compacted or flushed. + // + // The more interesting consideration is: "Why is the read algorithm still correct + // if the layer map changes while it is operating?". Doing a vectored read on a + // timeline boils down to pushing an imaginary lsn boundary downwards for each range + // covered by the read. The layer map tells us how to move the lsn downwards for a + // range at *a particular point in time*. It is fine for the answer to be different + // at two different time points. + drop(guard); + } } - if let Some((layer_to_read, keyspace_to_read, lsn_range)) = fringe.next_layer() { - let next_cont_lsn = lsn_range.start; + if let Some((layer_to_read, layer_visit)) = fringe.next_layer() { + unmapped_keyspaces = layer_visit.keyspaces(); layer_to_read - .get_values_reconstruct_data( - keyspace_to_read.clone(), - lsn_range, - reconstruct_state, - ctx, - ) + .get_values_reconstruct_data(layer_visit, reconstruct_state, ctx) .await?; - unmapped_keyspace = keyspace_to_read; - cont_lsn = next_cont_lsn; - reconstruct_state.on_layer_visited(&layer_to_read); } else { break; @@ -5502,19 +5500,24 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> anyhow::Result> { + use super::storage_layer::{LayerVisitBuilder, LayerVisitBuilderUpdate}; + let mut all_data = Vec::new(); let guard = self.layers.read().await; for layer in guard.layer_map()?.iter_historic_layers() { if !layer.is_delta() && layer.image_layer_lsn() == lsn { let layer = guard.get_from_desc(&layer); let mut reconstruct_data = ValuesReconstructState::default(); + + let (layer, visit) = LayerVisitBuilder::new( + ReadableLayer::PersistentLayer(layer), + KeySpace::single(Key::MIN..Key::MAX), + lsn..Lsn(lsn.0 + 1), + ) + .build(); + layer - .get_values_reconstruct_data( - KeySpace::single(Key::MIN..Key::MAX), - lsn..Lsn(lsn.0 + 1), - &mut reconstruct_data, - ctx, - ) + .get_values_reconstruct_data(visit, &mut reconstruct_data, ctx) .await?; for (k, v) in reconstruct_data.keys { all_data.push((k, v?.img.unwrap().1));