diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 2a1c79e437..1744616888 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -472,6 +472,7 @@ jobs: CHECK_ONDISK_DATA_COMPATIBILITY: nonempty BUILD_TAG: ${{ needs.tag.outputs.build-tag }} PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs + PAGESERVER_GET_VECTORED_IMPL: vectored # Temporary disable this step until we figure out why it's so flaky # Ref https://github.com/neondatabase/neon/issues/4540 diff --git a/Cargo.lock b/Cargo.lock index f25e3d1574..ac8cceb5f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3552,6 +3552,7 @@ dependencies = [ "enum-map", "hex", "humantime-serde", + "itertools", "postgres_ffi", "rand 0.8.5", "serde", diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml index 902af21965..938910caea 100644 --- a/libs/pageserver_api/Cargo.toml +++ b/libs/pageserver_api/Cargo.toml @@ -21,6 +21,7 @@ hex.workspace = true thiserror.workspace = true humantime-serde.workspace = true chrono.workspace = true +itertools.workspace = true workspace_hack.workspace = true diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 396c801606..443ffdcf03 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -2,6 +2,7 @@ use postgres_ffi::BLCKSZ; use std::ops::Range; use crate::key::Key; +use itertools::Itertools; /// /// Represents a set of Keys, in a compact form. @@ -63,9 +64,36 @@ impl KeySpace { KeyPartitioning { parts } } - /// Update the keyspace such that it doesn't contain any range - /// that is overlapping with `other`. This can involve splitting or - /// removing of existing ranges. + /// Merge another keyspace into the current one. + /// Note: the keyspaces must not overlap (enforced via assertions) + pub fn merge(&mut self, other: &KeySpace) { + let all_ranges = self + .ranges + .iter() + .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start); + + let mut accum = KeySpaceAccum::new(); + let mut prev: Option<&Range<Key>> = None; + for range in all_ranges { + if let Some(prev) = prev { + let overlap = + std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end); + assert!( + !overlap, + "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}", + prev, range + ); + } + + accum.add_range(range.clone()); + prev = Some(range); + } + + self.ranges = accum.to_keyspace().ranges; + }
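The merge above leans on itertools' `merge_by`, which interleaves two iterators that are each already sorted, while the assertion enforces that the two inputs never overlap. Below is a minimal, runnable sketch of the same pattern, with `Range<u32>` standing in for `Range<Key>` (a simplification; the real code also coalesces adjacent ranges via `KeySpaceAccum::add_range`):

```rust
use itertools::Itertools;
use std::ops::Range;

// Merge two lists of ranges, each sorted by start and internally
// non-overlapping, the way `KeySpace::merge` does.
fn merge_sorted_ranges(lhs: &[Range<u32>], rhs: &[Range<u32>]) -> Vec<Range<u32>> {
    let all_ranges = lhs.iter().merge_by(rhs.iter(), |a, b| a.start < b.start);

    let mut out: Vec<Range<u32>> = Vec::new();
    for range in all_ranges {
        if let Some(prev) = out.last() {
            // Same invariant as the assertion above: ranges may touch,
            // but the two inputs must never overlap.
            assert!(prev.end <= range.start, "overlap: {prev:?} vs {range:?}");
        }
        out.push(range.clone());
    }
    out
}

fn main() {
    let merged = merge_sorted_ranges(&[0..2, 5..7], &[2..4, 9..10]);
    assert_eq!(merged, vec![0..2, 2..4, 5..7, 9..10]);
}
```

+ + /// Remove all keys in `other` from `self`. + /// This can involve splitting or removing of existing ranges.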
pub fn remove_overlapping_with(&mut self, other: &KeySpace) { let (self_start, self_end) = match (self.start(), self.end()) { (Some(start), Some(end)) => (start, end), @@ -220,16 +248,7 @@ impl KeySpaceAccum { } pub fn consume_keyspace(&mut self) -> KeySpace { - if let Some(accum) = self.accum.take() { - self.ranges.push(accum); - } - - let mut prev_accum = KeySpaceAccum::new(); - std::mem::swap(self, &mut prev_accum); - - KeySpace { - ranges: prev_accum.ranges, - } + std::mem::take(self).to_keyspace() } pub fn size(&self) -> u64 { @@ -279,6 +298,13 @@ impl KeySpaceRandomAccum { } KeySpace { ranges } } + + pub fn consume_keyspace(&mut self) -> KeySpace { + let mut prev_accum = KeySpaceRandomAccum::new(); + std::mem::swap(self, &mut prev_accum); + + prev_accum.to_keyspace() + } } pub fn key_range_size(key_range: &Range<Key>) -> u32 { diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 7edfab75d4..c862816b80 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -261,10 +261,7 @@ where let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar); for part in slru_partitions.parts { - let blocks = self - .timeline - .get_vectored(&part.ranges, self.lsn, self.ctx) - .await?; + let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?; for (key, block) in blocks { slru_builder.add_block(&key, block?).await?; diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 6d71ff1dd4..6c00c55f39 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -33,6 +33,7 @@ use utils::{ use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig; use crate::tenant::config::TenantConf; use crate::tenant::config::TenantConfOpt; +use crate::tenant::timeline::GetVectoredImpl; use crate::tenant::{ TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME, }; @@ -84,6 +85,8 @@ pub mod defaults { pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs"; + pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential"; +
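The new knob is parsed through strum's kebab-case `EnumString`/`Display` derives on `GetVectoredImpl` (defined further down in this diff), so the TOML file accepts exactly the strings `'sequential'` and `'vectored'`. A runnable sketch of the round-trip, re-declaring the enum locally so it compiles on its own:

```rust
use strum_macros::{Display, EnumString};

// Stand-in for `pageserver::tenant::timeline::GetVectoredImpl`,
// re-declared here so the sketch is self-contained.
#[derive(Debug, PartialEq, EnumString, Display)]
#[strum(serialize_all = "kebab-case")]
enum GetVectoredImpl {
    Sequential,
    Vectored,
}

fn main() {
    // These are the two values `get_vectored_impl` accepts in pageserver.toml.
    let parsed: GetVectoredImpl = "vectored".parse().unwrap();
    assert_eq!(parsed, GetVectoredImpl::Vectored);
    assert_eq!(GetVectoredImpl::Sequential.to_string(), "sequential");
}
```

/// /// Default built-in configuration file. ///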
@@ -121,6 +124,8 @@ #virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}' +#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}' + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -256,6 +261,8 @@ pub struct PageServerConf { pub ingest_batch_size: u64, pub virtual_file_io_engine: virtual_file::IoEngineKind, + + pub get_vectored_impl: GetVectoredImpl, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -342,6 +349,8 @@ struct PageServerConfigBuilder { ingest_batch_size: BuilderValue<u64>, virtual_file_io_engine: BuilderValue<virtual_file::IoEngineKind>, + + get_vectored_impl: BuilderValue<GetVectoredImpl>, } impl Default for PageServerConfigBuilder { @@ -419,6 +428,8 @@ impl Default for PageServerConfigBuilder { ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()), + + get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()), } } } @@ -579,6 +590,10 @@ impl PageServerConfigBuilder { self.virtual_file_io_engine = BuilderValue::Set(value); } + pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) { + self.get_vectored_impl = BuilderValue::Set(value); + } + pub fn build(self) -> anyhow::Result<PageServerConf> { let concurrent_tenant_warmup = self .concurrent_tenant_warmup @@ -689,6 +704,9 @@ impl PageServerConfigBuilder { virtual_file_io_engine: self .virtual_file_io_engine .ok_or(anyhow!("missing virtual_file_io_engine"))?, + get_vectored_impl: self + .get_vectored_impl + .ok_or(anyhow!("missing get_vectored_impl"))?, }) } } @@ -943,6 +961,9 @@ impl PageServerConf { "virtual_file_io_engine" => { builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?) } + "get_vectored_impl" => { + builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?)
+ } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -1017,6 +1038,7 @@ impl PageServerConf { secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), } } } @@ -1250,6 +1272,7 @@ background_task_maximum_delay = '334 s' secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -1314,6 +1337,7 @@ background_task_maximum_delay = '334 s' secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: 100, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c646e5cf90..7021921b12 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3877,6 +3877,7 @@ mod tests { use bytes::BytesMut; use hex_literal::hex; use once_cell::sync::Lazy; + use pageserver_api::keyspace::KeySpace; use rand::{thread_rng, Rng}; use tokio_util::sync::CancellationToken; @@ -4514,6 +4515,61 @@ mod tests { Ok(()) } + async fn bulk_insert_compact_gc( + timeline: Arc<Timeline>, + ctx: &RequestContext, + mut lsn: Lsn, + repeat: usize, + key_count: usize, + ) -> anyhow::Result<()> { + let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); + let mut blknum = 0; + + // Enforce that key range is monotonically increasing + let mut keyspace = KeySpaceAccum::new(); + + for _ in 0..repeat { + for _ in 0..key_count { + test_key.field6 = blknum; + let mut writer = timeline.writer().await; + writer + .put( + test_key, + lsn, + &Value::Image(test_img(&format!("{} at {}", blknum, lsn))), + ctx, + ) + .await?; + writer.finish_write(lsn); + drop(writer); + + keyspace.add_key(test_key); + + lsn = Lsn(lsn.0 + 0x10); + blknum += 1; + } + + let cutoff = timeline.get_last_record_lsn(); + + timeline + .update_gc_info( + Vec::new(), + cutoff, + Duration::ZERO, + &CancellationToken::new(), + ctx, + ) + .await?; + timeline.freeze_and_flush().await?; + timeline + .compact(&CancellationToken::new(), EnumSet::empty(), ctx) + .await?; + timeline.gc().await?; + } + + Ok(()) + } + // // Insert 1000 key-value pairs with increasing keys, flush, compact, GC. // Repeat 50 times.
@@ -4526,49 +4582,98 @@ .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; - let mut lsn = Lsn(0x10); + let lsn = Lsn(0x10); + bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?; - let mut keyspace = KeySpaceAccum::new(); + Ok(()) + } - let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); - let mut blknum = 0; - for _ in 0..50 { - for _ in 0..10000 { - test_key.field6 = blknum; - let mut writer = tline.writer().await; - writer - .put( - test_key, - lsn, - &Value::Image(test_img(&format!("{} at {}", blknum, lsn))), - &ctx, - ) - .await?; - writer.finish_write(lsn); - drop(writer); + // Test the real vectored get implementation against a simple sequential implementation. + // + // The test generates a keyspace by repeatedly flushing the in-memory layer and compacting. + // Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys + // grow to the right on the X axis. + // [Delta] + // [Delta] + // [Delta] + // [Delta] + // ------------ Image --------------- + // + // After layer generation we pick the ranges to query as follows: + // 1. The beginning of each delta layer + // 2. At the seam between two adjacent delta layers + // + // There's one major downside to this test: delta layers only contain images, + // so the search can stop at the first delta layer and doesn't traverse any deeper. + #[tokio::test] + async fn test_get_vectored() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_get_vectored")?; + let (tenant, ctx) = harness.load().await; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) + .await?; - keyspace.add_key(test_key); + let lsn = Lsn(0x10); + bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?; - lsn = Lsn(lsn.0 + 0x10); - blknum += 1; + let guard = tline.layers.read().await; + guard.layer_map().dump(true, &ctx).await?; + + let mut reads = Vec::new(); + let mut prev = None; + guard.layer_map().iter_historic_layers().for_each(|desc| { + if !desc.is_delta() { + prev = Some(desc.clone()); + return; } - let cutoff = tline.get_last_record_lsn(); + let start = desc.key_range.start; + let end = desc + .key_range + .start + .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap()); + reads.push(KeySpace { + ranges: vec![start..end], + }); + if let Some(prev) = &prev { + if !prev.is_delta() { + return; + } + + let first_range = Key { + field6: prev.key_range.end.field6 - 4, + ..prev.key_range.end + }..prev.key_range.end; + + let second_range = desc.key_range.start..Key { + field6: desc.key_range.start.field6 + 4, + ..desc.key_range.start + }; + + reads.push(KeySpace { + ranges: vec![first_range, second_range], + }); + }; + + prev = Some(desc.clone()); + }); + + drop(guard); +
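The seam reads built above are the interesting case: one `KeySpace` holding two ranges that straddle the boundary between adjacent delta layers, so a single vectored call has to fan out across both. A runnable sketch of the shape of such a read, with `u32` ranges as hypothetical stand-ins for `Key` ranges:

```rust
use std::ops::Range;

// Simplified stand-in for pageserver_api::keyspace::KeySpace.
#[derive(Debug)]
struct KeySpace {
    ranges: Vec<Range<u32>>,
}

fn main() {
    // Two adjacent delta layers, in key space: [0, 100) and [100, 200).
    let (prev_end, next_start) = (100u32, 100u32);

    // A "seam" read: the last 4 keys of the previous layer plus the first
    // 4 keys of the next one, mirroring first_range/second_range above.
    let seam = KeySpace {
        ranges: vec![prev_end - 4..prev_end, next_start..next_start + 4],
    };

    assert_eq!(seam.ranges.len(), 2);
    println!("vectored read across the seam: {seam:?}");
}
```

+ // Pick a big LSN such that we query over all the changes. + // Technically, u64::MAX - 1 is the largest LSN supported by the read path, + // but there seems to be a bug on the non-vectored search path which surfaces + // in that case.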
+ let reads_lsn = Lsn(u64::MAX - 1000); + + for read in reads { + info!("Doing vectored read on {:?}", read); + + let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await; tline - .update_gc_info( - Vec::new(), - cutoff, - Duration::ZERO, - &CancellationToken::new(), - &ctx, - ) - .await?; - tline.freeze_and_flush().await?; - tline - .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) - .await?; - tline.gc().await?; + .validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx) + .await; } Ok(()) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index bb52e586d1..5f4814cc6b 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -52,8 +52,7 @@ use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use anyhow::Result; use pageserver_api::keyspace::KeySpaceAccum; -use std::cmp::Ordering; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; @@ -147,43 +146,28 @@ impl Drop for BatchedUpdates<'_> { } } /// Return value of LayerMap::search -#[derive(Eq, PartialEq, Debug)] +#[derive(Eq, PartialEq, Debug, Hash)] pub struct SearchResult { pub layer: Arc<PersistentLayerDesc>, pub lsn_floor: Lsn, } -pub struct OrderedSearchResult(SearchResult); - -impl Ord for OrderedSearchResult { - fn cmp(&self, other: &Self) -> Ordering { - self.0.lsn_floor.cmp(&other.0.lsn_floor) - } -} - -impl PartialOrd for OrderedSearchResult { - fn partial_cmp(&self, other: &Self) -> Option<Ordering> { - Some(self.cmp(other)) - } -} - -impl PartialEq for OrderedSearchResult { - fn eq(&self, other: &Self) -> bool { - self.0.lsn_floor == other.0.lsn_floor - } -} - -impl Eq for OrderedSearchResult {} - +/// Return value of [`LayerMap::range_search`] +/// +/// Contains a mapping from a layer description to a keyspace +/// accumulator that contains all the keys which intersect the layer +/// from the original search space. Keys that were not found are accumulated +/// in a separate key space accumulator. +#[derive(Debug)] pub struct RangeSearchResult { - pub found: BTreeMap<OrderedSearchResult, KeySpaceAccum>, + pub found: HashMap<SearchResult, KeySpaceAccum>, pub not_found: KeySpaceAccum, } impl RangeSearchResult { fn new() -> Self { Self { - found: BTreeMap::new(), + found: HashMap::new(), not_found: KeySpaceAccum::new(), } } @@ -314,7 +298,7 @@ where Some(search_result) => self .result .found - .entry(OrderedSearchResult(search_result)) + .entry(search_result) .or_default() .add_range(covered_range), None => self.pad_range(covered_range), @@ -362,6 +346,35 @@ where } } +#[derive(PartialEq, Eq, Hash, Debug, Clone)] +pub enum InMemoryLayerHandle { + Open { + lsn_floor: Lsn, + end_lsn: Lsn, + }, + Frozen { + idx: usize, + lsn_floor: Lsn, + end_lsn: Lsn, + }, +} + +impl InMemoryLayerHandle { + pub fn get_lsn_floor(&self) -> Lsn { + match self { + InMemoryLayerHandle::Open { lsn_floor, .. } => *lsn_floor, + InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor, + } + } + + pub fn get_end_lsn(&self) -> Lsn { + match self { + InMemoryLayerHandle::Open { end_lsn, .. } => *end_lsn, + InMemoryLayerHandle::Frozen { end_lsn, .. } => *end_lsn, + } + } +} + impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given @@ -556,6 +569,43 @@ impl LayerMap { self.historic.iter() } + /// Get a handle for the first in memory layer that matches the provided predicate. + /// The handle should be used with [`Self::get_in_memory_layer`] to retrieve the actual layer.
+ /// + /// Note: [`Self::find_in_memory_layer`] and [`Self::get_in_memory_layer`] should be called during + /// the same exclusive region established by holding the layer manager lock. + pub fn find_in_memory_layer<Pred>(&self, mut pred: Pred) -> Option<InMemoryLayerHandle> + where + Pred: FnMut(&Arc<InMemoryLayer>) -> bool, + { + if let Some(open) = &self.open_layer { + if pred(open) { + return Some(InMemoryLayerHandle::Open { + lsn_floor: open.get_lsn_range().start, + end_lsn: open.get_lsn_range().end, + }); + } + } + + let pos = self.frozen_layers.iter().rev().position(pred); + pos.map(|rev_idx| { + let idx = self.frozen_layers.len() - 1 - rev_idx; + InMemoryLayerHandle::Frozen { + idx, + lsn_floor: self.frozen_layers[idx].get_lsn_range().start, + end_lsn: self.frozen_layers[idx].get_lsn_range().end, + } + }) + } + + /// Get the layer pointed to by the provided handle. + pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option<Arc<InMemoryLayer>> { + match handle { + InMemoryLayerHandle::Open { .. } => self.open_layer.clone(), + InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(), + } + } + /// /// Divide the whole given range of keys into sub-ranges based on the latest /// image layer that covers each range at the specified lsn (inclusive). @@ -869,6 +919,8 @@ impl LayerMap { #[cfg(test)] mod tests { + use pageserver_api::keyspace::KeySpace; + use super::*; #[derive(Clone)] @@ -895,15 +947,15 @@ mod tests { fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) { assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace()); - let lhs: Vec<_> = lhs + let lhs: HashMap<SearchResult, KeySpace> = lhs .found .into_iter() - .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .map(|(search_result, accum)| (search_result, accum.to_keyspace())) .collect(); - let rhs: Vec<_> = rhs + let rhs: HashMap<SearchResult, KeySpace> = rhs .found .into_iter() - .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .map(|(search_result, accum)| (search_result, accum.to_keyspace())) .collect(); assert_eq!(lhs, rhs); @@ -923,7 +975,7 @@ mod tests { Some(res) => { range_search_result .found - .entry(OrderedSearchResult(res)) + .entry(res) .or_default() .add_key(key); } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2d92baccbe..73c018db31 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,15 +8,21 @@ pub(crate) mod layer; mod layer_desc; use crate::context::{AccessStatsBehavior, RequestContext}; +use crate::repository::Value; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; use once_cell::sync::Lazy; +use pageserver_api::key::Key; +use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::models::{ LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; +use std::cmp::{Ordering, Reverse}; +use std::collections::hash_map::Entry; +use std::collections::{BinaryHeap, HashMap}; use std::ops::Range; use std::sync::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -34,6 +40,11 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub(crate) use layer::{EvictionError, Layer, ResidentLayer}; +use super::layer_map::InMemoryLayerHandle; +use super::timeline::layer_manager::LayerManager; +use super::timeline::GetVectoredError; +use super::PageReconstructError; + pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool where T: PartialOrd,
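`find_in_memory_layer` hands back a cheap handle instead of the layer itself, and `get_in_memory_layer` resolves it later under the same layer-manager lock. The only subtlety is the frozen-layer index math: scanning the `VecDeque` from the back finds the newest match, and the reverse position is converted into a stable front-based index. A runnable sketch of that conversion, with `u64` end-LSNs standing in for layers:

```rust
use std::collections::VecDeque;

// Mirror of the frozen-layer index math in `find_in_memory_layer`:
// search from the back (newest first), then convert the reverse
// position into the front-based index stored in the handle.
fn find_newest_matching(frozen: &VecDeque<u64>, pred: impl Fn(u64) -> bool) -> Option<usize> {
    let rev_idx = frozen.iter().rev().position(|l| pred(*l))?;
    Some(frozen.len() - 1 - rev_idx)
}

fn main() {
    // Frozen layers ordered oldest to newest by their end LSN.
    let frozen: VecDeque<u64> = VecDeque::from([0x10, 0x20, 0x30]);

    // The newest layer ending at or below 0x25 is at index 1.
    assert_eq!(find_newest_matching(&frozen, |end| end <= 0x25), Some(1));
    assert_eq!(find_newest_matching(&frozen, |end| end <= 0x05), None);
}
```

@@ -67,6 +78,277 @@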
pub struct ValueReconstructState { pub img: Option<(Lsn, Bytes)>, } +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub(crate) enum ValueReconstructSituation { + Complete, + #[default] + Continue, +} + +/// Reconstruct data accumulated for a single key during a vectored get +#[derive(Debug, Default, Clone)] +pub(crate) struct VectoredValueReconstructState { + pub(crate) records: Vec<(Lsn, NeonWalRecord)>, + pub(crate) img: Option<(Lsn, Bytes)>, + + situation: ValueReconstructSituation, +} + +impl VectoredValueReconstructState { + fn get_cached_lsn(&self) -> Option<Lsn> { + self.img.as_ref().map(|img| img.0) + } +} + +impl From<VectoredValueReconstructState> for ValueReconstructState { + fn from(mut state: VectoredValueReconstructState) -> Self { + // walredo expects the records to be descending in terms of Lsn + state.records.sort_by_key(|(lsn, _)| Reverse(*lsn)); + + ValueReconstructState { + records: state.records, + img: state.img, + } + } +} + +/// Bag of data accumulated during a vectored get +pub(crate) struct ValuesReconstructState { + pub(crate) keys: HashMap<Key, Result<VectoredValueReconstructState, PageReconstructError>>, + + keys_done: KeySpaceRandomAccum, +} + +impl ValuesReconstructState { + pub(crate) fn new() -> Self { + Self { + keys: HashMap::new(), + keys_done: KeySpaceRandomAccum::new(), + } + } + + /// Associate a key with the error which it encountered and mark it as done + pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) { + let previous = self.keys.insert(key, Err(err)); + if let Some(Ok(state)) = previous { + if state.situation == ValueReconstructSituation::Continue { + self.keys_done.add_key(key); + } + } + } + + /// Update the state collected for a given key. + /// Returns `Complete` if this was the last value needed for the key and `Continue` otherwise. + /// + /// If the key is done after the update, mark it as such. + pub(crate) fn update_key( + &mut self, + key: &Key, + lsn: Lsn, + value: Value, + ) -> ValueReconstructSituation { + let state = self + .keys + .entry(*key) + .or_insert(Ok(VectoredValueReconstructState::default())); + + if let Ok(state) = state { + let key_done = match state.situation { + ValueReconstructSituation::Complete => unreachable!(), + ValueReconstructSituation::Continue => match value { + Value::Image(img) => { + state.img = Some((lsn, img)); + true + } + Value::WalRecord(rec) => { + let reached_cache = + state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn); + let will_init = rec.will_init(); + state.records.push((lsn, rec)); + will_init || reached_cache + } + }, + }; + + if key_done && state.situation == ValueReconstructSituation::Continue { + state.situation = ValueReconstructSituation::Complete; + self.keys_done.add_key(*key); + } + + state.situation + } else { + ValueReconstructSituation::Complete + } + } + + /// Returns the Lsn at which this key is cached if one exists. + /// The read path should go no further than this Lsn for the given key. + pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option<Lsn> { + self.keys + .get(key) + .and_then(|k| k.as_ref().ok()) + .and_then(|state| state.get_cached_lsn()) + } + + /// Returns the key space describing the keys that have + /// been marked as completed since the last call to this function. + pub(crate) fn consume_done_keys(&mut self) -> KeySpace { + self.keys_done.consume_keyspace() + } +} + +impl Default for ValuesReconstructState { + fn default() -> Self { + Self::new() + } +}
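A key stops accumulating data as soon as it can be reconstructed: an image always completes it, while a WAL record completes it only if it initializes the page (`will_init`) or lands exactly one LSN above a cached image. A runnable distillation of that predicate from `update_key`, using hypothetical `u64` LSNs:

```rust
// Distilled completion rule from `update_key`, for one key walked
// newest-to-oldest. `cached_lsn` is the LSN of a cached image, if any.
fn wal_record_completes(cached_lsn: Option<u64>, record_lsn: u64, will_init: bool) -> bool {
    let reached_cache = cached_lsn.map(|clsn| clsn + 1) == Some(record_lsn);
    will_init || reached_cache
}

fn main() {
    // Cached image at 0x10: a record at 0x12 is not enough on its own ...
    assert!(!wal_record_completes(Some(0x10), 0x12, false));
    // ... but the record at 0x11 joins up with the cached image.
    assert!(wal_record_completes(Some(0x10), 0x11, false));
    // A will_init record (e.g. a full-page write) always completes the key.
    assert!(wal_record_completes(None, 0x12, true));
}
```

+ +/// Description of layer to be read - the layer map can turn +/// this description into the actual layer.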
+#[derive(PartialEq, Eq, Hash, Debug, Clone)] +pub(crate) enum ReadableLayerDesc { + Persistent { + desc: PersistentLayerDesc, + lsn_floor: Lsn, + lsn_ceil: Lsn, + }, + InMemory { + handle: InMemoryLayerHandle, + lsn_ceil: Lsn, + }, +} + +/// Wrapper for 'ReadableLayerDesc' sorted by Lsn +#[derive(Debug)] +struct ReadableLayerDescOrdered(ReadableLayerDesc); + +/// Data structure which maintains a fringe of layers for the +/// read path. The fringe is the set of layers which intersects +/// the current keyspace that the search is descending on. +/// Each layer tracks the keyspace that intersects it. +/// +/// The fringe must appear sorted by Lsn. Hence, it uses +/// a two layer indexing scheme. +#[derive(Debug)] +pub(crate) struct LayerFringe { + layers_by_lsn: BinaryHeap<ReadableLayerDescOrdered>, + layers: HashMap<ReadableLayerDesc, KeySpace>, +} + +impl LayerFringe { + pub(crate) fn new() -> Self { + LayerFringe { + layers_by_lsn: BinaryHeap::new(), + layers: HashMap::new(), + } + } + + pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayerDesc, KeySpace)> { + let handle = match self.layers_by_lsn.pop() { + Some(h) => h, + None => return None, + }; + + let removed = self.layers.remove_entry(&handle.0); + match removed { + Some((layer, keyspace)) => Some((layer, keyspace)), + None => unreachable!("fringe internals are always consistent"), + } + } + + pub(crate) fn update(&mut self, layer: ReadableLayerDesc, keyspace: KeySpace) { + let entry = self.layers.entry(layer.clone()); + match entry { + Entry::Occupied(mut entry) => { + entry.get_mut().merge(&keyspace); + } + Entry::Vacant(entry) => { + self.layers_by_lsn + .push(ReadableLayerDescOrdered(entry.key().clone())); + entry.insert(keyspace); + } + } + } +} + +impl Default for LayerFringe { + fn default() -> Self { + Self::new() + } +} + +impl Ord for ReadableLayerDescOrdered { + fn cmp(&self, other: &Self) -> Ordering { + let ord = self.0.get_lsn_ceil().cmp(&other.0.get_lsn_ceil()); + if ord == std::cmp::Ordering::Equal { + self.0 + .get_lsn_floor() + .cmp(&other.0.get_lsn_floor()) + .reverse() + } else { + ord + } + } +} + +impl PartialOrd for ReadableLayerDescOrdered { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + +impl PartialEq for ReadableLayerDescOrdered { + fn eq(&self, other: &Self) -> bool { + self.0.get_lsn_floor() == other.0.get_lsn_floor() + && self.0.get_lsn_ceil() == other.0.get_lsn_ceil() + } +} + +impl Eq for ReadableLayerDescOrdered {} + +impl ReadableLayerDesc { + pub(crate) fn get_lsn_floor(&self) -> Lsn { + match self { + ReadableLayerDesc::Persistent { lsn_floor, .. } => *lsn_floor, + ReadableLayerDesc::InMemory { handle, .. } => handle.get_lsn_floor(), + } + } + + pub(crate) fn get_lsn_ceil(&self) -> Lsn { + match self { + ReadableLayerDesc::Persistent { lsn_ceil, .. } => *lsn_ceil, + ReadableLayerDesc::InMemory { lsn_ceil, .. } => *lsn_ceil, + } + } + + pub(crate) async fn get_values_reconstruct_data( + &self, + layer_manager: &LayerManager, + keyspace: KeySpace, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + match self { + ReadableLayerDesc::Persistent { desc, lsn_ceil, ..
} => { + let layer = layer_manager.get_from_desc(desc); + layer + .get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx) + .await + } + ReadableLayerDesc::InMemory { handle, lsn_ceil } => { + let layer = layer_manager + .layer_map() + .get_in_memory_layer(handle) + .unwrap(); + + layer + .get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx) + .await + } + } + } +} + /// Return value from [`Layer::get_value_reconstruct_data`] #[derive(Clone, Copy, Debug)] pub enum ValueReconstructResult { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 9a7bcbcebe..19eebf5531 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -35,16 +35,19 @@ use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; -use crate::tenant::Timeline; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; @@ -59,7 +62,10 @@ use utils::{ lsn::Lsn, }; -use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer}; +use super::{ + AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation, + ValuesReconstructState, +}; /// /// Header stored in the beginning of the file @@ -818,6 +824,133 @@ impl DeltaLayerInner { } } + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + // + // If the key is cached, go no further than the cached Lsn. + // + // Currently, the index is visited for each range, but this + // can be further optimised to visit the index only once. + pub(super) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let file = &self.file; + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( self.index_start_blk, self.index_root_blk, file, ); + + let mut offsets: BTreeMap<Key, Vec<(Lsn, u64)>> = BTreeMap::new(); + + for range in keyspace.ranges.iter() { + let mut ignore_key = None; + + // Scan the page versions backwards, starting from the last key in the range, + // to collect all the offsets that need to be read.
+ let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1)); + tree_reader + .visit( + &end_key.0, + VisitDirection::Backwards, + |raw_key, value| { + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key); + + if entry_lsn >= end_lsn { + return true; + } + + if key < range.start { + return false; + } + + if key >= range.end { + return true; + } + + if Some(key) == ignore_key { + return true; + } + + if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) { + if entry_lsn <= cached_lsn { + return key != range.start; + } + } + + let blob_ref = BlobRef(value); + let lsns_at = offsets.entry(key).or_default(); + lsns_at.push((entry_lsn, blob_ref.pos())); + + if blob_ref.will_init() { + if key == range.start { + return false; + } else { + ignore_key = Some(key); + return true; + } + } + + true + }, + &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerBtreeNode) + .build(), + ) + .await + .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; + } + + let ctx = &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerValue) + .build(); + + let cursor = file.block_cursor(); + let mut buf = Vec::new(); + for (key, lsns_at) in offsets { + for (lsn, block_offset) in lsns_at { + let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await; + + if let Err(e) = res { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to read blob from virtual file {}", + file.file.path + ))), + ); + + break; + } + + let value = Value::des(&buf); + if let Err(e) = value { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to deserialize file blob from virtual file {}", + file.file.path + ))), + ); + + break; + } + + let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + break; + } + } + } + + Ok(()) + } + pub(super) async fn load_keys<'a>( &'a self, ctx: &RequestContext, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 458131b572..b867cb0333 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -26,20 +26,22 @@ use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::page_cache::PAGE_SZ; -use crate::repository::{Key, KEY_SIZE}; +use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, }; -use crate::tenant::Timeline; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use hex; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; @@ -59,7 +61,7 @@ use utils::{ }; 
use super::filename::ImageFileName; -use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer}; +use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState}; /// /// Header stored in the beginning of the file @@ -438,6 +440,74 @@ impl ImageLayerInner { Ok(ValueReconstructResult::Missing) } } + + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + pub(super) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let file = &self.file; + let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file); + + let mut offsets = Vec::new(); + + for range in keyspace.ranges.iter() { + let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; + range.start.write_to_byte_slice(&mut search_key); + + tree_reader + .visit( + &search_key, + VisitDirection::Forwards, + |raw_key, value| { + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + assert!(key >= range.start); + + if !range.contains(&key) { + return false; + } + + offsets.push((key, value)); + + true + }, + &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::ImageLayerBtreeNode) + .build(), + ) + .await + .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; + } + + let ctx = &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::ImageLayerValue) + .build(); + + let cursor = file.block_cursor(); + let mut buf = Vec::new(); + for (key, offset) in offsets { + let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await; + if let Err(e) = res { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to read blob from virtual file {}", + file.file.path + ))), + ); + + continue; + } + + let blob = Bytes::copy_from_slice(buf.as_slice()); + reconstruct_state.update_key(&key, self.lsn, Value::Image(blob)); + } + + Ok(()) + } } /// A builder object for constructing a new image layer. 
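Unlike delta layers, an image layer holds exactly one value per key at `self.lsn`, so every hit above completes its key immediately and anything absent simply stays unresolved for older layers to fill in. The per-range index walk is a plain forward scan that stops at the first key past the range; a runnable sketch of that scan shape, using `BTreeMap` as a hypothetical stand-in for the on-disk B-tree index:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

// Sketch of the per-range scan an image layer performs: a forward walk
// from `range.start` that stops at the first key outside the range.
fn collect_offsets(index: &BTreeMap<u32, u64>, range: Range<u32>) -> Vec<(u32, u64)> {
    index
        .range(range) // forward visit starting at range.start
        .map(|(key, offset)| (*key, *offset))
        .collect()
}

fn main() {
    let index = BTreeMap::from([(1, 100u64), (5, 200), (9, 300)]);
    // Keys 5 and 9 are found; key 7 simply has no entry and stays
    // unresolved for deeper (older) layers to provide.
    assert_eq!(collect_offsets(&index, 4..10), vec![(5, 200), (9, 300)]);
}
```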
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 4b06a787ce..5f1db21d49 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -9,13 +9,15 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::repository::{Key, Value}; use crate::tenant::block_io::BlockReader; use crate::tenant::ephemeral_file::EphemeralFile; -use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; -use crate::tenant::Timeline; +use crate::tenant::storage_layer::ValueReconstructResult; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::walrecord; -use anyhow::{ensure, Result}; +use anyhow::{anyhow, ensure, Result}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; -use std::collections::HashMap; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::{Arc, OnceLock}; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; @@ -25,7 +27,10 @@ use std::fmt::Write as _; use std::ops::Range; use tokio::sync::{RwLock, RwLockWriteGuard}; -use super::{DeltaLayerWriter, ResidentLayer}; +use super::{ + DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState, + ValuesReconstructState, +}; pub struct InMemoryLayer { conf: &'static PageServerConf, @@ -202,6 +207,91 @@ impl InMemoryLayer { Ok(ValueReconstructResult::Complete) } } + + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + // + // If the key is cached, go no further than the cached Lsn. 
+ pub(crate) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let ctx = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::InMemoryLayer) + .build(); + + let inner = self.inner.read().await; + let reader = inner.file.block_cursor(); + + #[derive(Eq, PartialEq, Ord, PartialOrd)] + struct BlockRead { + key: Key, + lsn: Lsn, + block_offset: u64, + } + + let mut planned_block_reads = BinaryHeap::new(); + + for range in keyspace.ranges.iter() { + let mut key = range.start; + while key < range.end { + if let Some(vec_map) = inner.index.get(&key) { + let lsn_range = match reconstruct_state.get_cached_lsn(&key) { + Some(cached_lsn) => (cached_lsn + 1)..end_lsn, + None => self.start_lsn..end_lsn, + }; + + let slice = vec_map.slice_range(lsn_range); + for (entry_lsn, pos) in slice.iter().rev() { + planned_block_reads.push(BlockRead { + key, + lsn: *entry_lsn, + block_offset: *pos, + }); + } + } + + key = key.next(); + } + } + + let keyspace_size = keyspace.total_size(); + + let mut completed_keys = HashSet::new(); + while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() { + let block_read = planned_block_reads.pop().unwrap(); + if completed_keys.contains(&block_read.key) { + continue; + } + + let buf = reader.read_blob(block_read.block_offset, &ctx).await; + if let Err(e) = buf { + reconstruct_state + .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); + completed_keys.insert(block_read.key); + continue; + } + + let value = Value::des(&buf.unwrap()); + if let Err(e) = value { + reconstruct_state + .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); + completed_keys.insert(block_read.key); + continue; + } + + let key_situation = + reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + completed_keys.insert(block_read.key); + } + } + + Ok(()) + } } impl std::fmt::Display for InMemoryLayer { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index bfcc031863..cc5b7ade6a 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1,5 +1,6 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::{ HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus, }; @@ -16,13 +17,14 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; +use crate::tenant::timeline::GetVectoredError; use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline}; use super::delta_layer::{self, DeltaEntry}; use super::image_layer; use super::{ AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc, - ValueReconstructResult, ValueReconstructState, + ValueReconstructResult, ValueReconstructState, ValuesReconstructState, }; use utils::generation::Generation; @@ -262,6 +264,29 @@ impl Layer { .with_context(|| format!("get_value_reconstruct_data for layer {self}")) } + pub(crate) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_data: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), 
GetVectoredError> { + let layer = self + .0 + .get_or_maybe_download(true, Some(ctx)) + .await + .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?; + + self.0 + .access_stats + .record_access(LayerAccessKind::GetValueReconstructData, ctx); + + layer + .get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx) + .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)) + .await + } + /// Download the layer if evicted. /// /// Will not error when the layer is already downloaded. @@ -1177,7 +1202,7 @@ pub(crate) enum EvictionError { /// Error internal to the [`LayerInner::get_or_maybe_download`] #[derive(Debug, thiserror::Error)] -enum DownloadError { +pub(crate) enum DownloadError { #[error("timeline has already shutdown")] TimelineShutdown, #[error("no remote storage configured")] @@ -1337,6 +1362,28 @@ impl DownloadedLayer { } } + async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_data: &mut ValuesReconstructState, + owner: &Arc, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + use LayerKind::*; + + match self.get(owner, ctx).await.map_err(GetVectoredError::from)? { + Delta(d) => { + d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx) + .await + } + Image(i) => { + i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx) + .await + } + } + } + async fn dump(&self, owner: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { use LayerKind::*; match self.get(owner, ctx).await? { diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index fa78e9fdb2..c375923e81 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -15,7 +15,7 @@ use utils::id::TenantId; /// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the /// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides /// a unified way to generate layer information like file name. 
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)] pub struct PersistentLayerDesc { pub tenant_shard_id: TenantShardId, pub timeline_id: TimelineId, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 92e5b52c75..0f22284c55 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -16,7 +16,7 @@ use futures::stream::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; use pageserver_api::{ - keyspace::{key_range_size, KeySpaceAccum}, + keyspace::KeySpaceAccum, models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState, @@ -67,7 +67,7 @@ use crate::{ tenant::storage_layer::{ AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer, LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, - ValueReconstructState, + ValueReconstructState, ValuesReconstructState, }, }; use crate::{ @@ -111,11 +111,11 @@ use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; -use super::config::TenantConf; -use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; +use super::{config::TenantConf, storage_layer::ReadableLayerDesc}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; +use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -472,6 +472,15 @@ pub(crate) enum GetVectoredError { #[error("Requested at invalid LSN: {0}")] InvalidLsn(Lsn), + + #[error("Requested key {0} not found")] + MissingKey(Key), + + #[error(transparent)] + GetReadyAncestorError(GetReadyAncestorError), + + #[error(transparent)] + Other(#[from] anyhow::Error), } #[derive(thiserror::Error, Debug)] @@ -579,6 +588,23 @@ impl From for PageReconstructError { } } +#[derive( + Eq, + PartialEq, + Debug, + Copy, + Clone, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, +)] +#[strum(serialize_all = "kebab-case")] +pub enum GetVectoredImpl { + Sequential, + Vectored, +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -708,7 +734,7 @@ impl Timeline { /// which actually vectorizes the read path. 
pub(crate) async fn get_vectored( &self, - key_ranges: &[Range<Key>], + keyspace: KeySpace, lsn: Lsn, ctx: &RequestContext, ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { @@ -716,10 +742,7 @@ impl Timeline { return Err(GetVectoredError::InvalidLsn(lsn)); } - let key_count = key_ranges - .iter() - .map(|range| key_range_size(range) as u64) - .sum(); + let key_count = keyspace.total_size().try_into().unwrap(); if key_count > Timeline::MAX_GET_VECTORED_KEYS { return Err(GetVectoredError::Oversized(key_count)); } @@ -728,33 +751,163 @@ .throttle(ctx, key_count as usize) .await; - let _timer = crate::metrics::GET_VECTORED_LATENCY - .for_task_kind(ctx.task_kind()) - .map(|t| t.start_timer()); - - let mut values = BTreeMap::new(); - for range in key_ranges { + for range in &keyspace.ranges { let mut key = range.start; while key != range.end { assert!(!self.shard_identity.is_key_disposable(&key)); - - let block = self.get(key, lsn, ctx).await; - - if matches!( - block, - Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_)) - ) { - return Err(GetVectoredError::Cancelled); - } - - values.insert(key, block); key = key.next(); } } + trace!( + "get vectored request for {:?}@{} from task kind {:?} will use {} implementation", + keyspace, + lsn, + ctx.task_kind(), + self.conf.get_vectored_impl + ); + + let _timer = crate::metrics::GET_VECTORED_LATENCY + .for_task_kind(ctx.task_kind()) + .map(|t| t.start_timer()); + + match self.conf.get_vectored_impl { + GetVectoredImpl::Sequential => { + self.get_vectored_sequential_impl(keyspace, lsn, ctx).await + } + GetVectoredImpl::Vectored => { + let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await; + + self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx) + .await; + + vectored_res + } + } + } + + pub(super) async fn get_vectored_sequential_impl( + &self, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { + let mut values = BTreeMap::new(); + for range in keyspace.ranges { + let mut key = range.start; + while key != range.end { + let block = self.get(key, lsn, ctx).await; + + use PageReconstructError::*; + match block { + Err(Cancelled | AncestorStopping(_)) => { + return Err(GetVectoredError::Cancelled) + } + Err(Other(err)) if err.to_string().contains("could not find data for key") => { + return Err(GetVectoredError::MissingKey(key)) + } + _ => { + values.insert(key, block); + key = key.next(); + } + } + } + } + + Ok(values) + } + + pub(super) async fn get_vectored_impl( + &self, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> { + let mut reconstruct_state = ValuesReconstructState::new(); + + self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx) + .await?; + + let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new(); + for (key, res) in reconstruct_state.keys { + match res { + Err(err) => { + results.insert(key, Err(err)); + } + Ok(state) => { + let state = ValueReconstructState::from(state); + + let reconstruct_res = self.reconstruct_value(key, lsn, state).await; + results.insert(key, reconstruct_res); + } + } + } + + Ok(results) + } + + pub(super) async fn validate_get_vectored_impl( + &self, + vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) { + let sequential_res = self + .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx) + .await; + + fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool { + use GetVectoredError::*; + match
(lhs, rhs) { + (Cancelled, Cancelled) => true, + (_, Cancelled) => true, + (Oversized(l), Oversized(r)) => l == r, + (InvalidLsn(l), InvalidLsn(r)) => l == r, + (MissingKey(l), MissingKey(r)) => l == r, + (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true, + (Other(_), Other(_)) => true, + _ => false, + } + } + + match (&sequential_res, vectored_res) { + (Err(seq_err), Ok(_)) => { + panic!(concat!("Sequential get failed with {}, but vectored get did not", " - keyspace={:?} lsn={}"), seq_err, keyspace, lsn) }, + (Ok(_), Err(vec_err)) => { + panic!(concat!("Vectored get failed with {}, but sequential get did not", " - keyspace={:?} lsn={}"), vec_err, keyspace, lsn) }, + (Err(seq_err), Err(vec_err)) => { + assert!(errors_match(seq_err, vec_err), "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}")}, + (Ok(seq_values), Ok(vec_values)) => { + seq_values.iter().zip(vec_values.iter()).for_each(|((seq_key, seq_res), (vec_key, vec_res))| { + assert_eq!(seq_key, vec_key); + match (seq_res, vec_res) { + (Ok(seq_blob), Ok(vec_blob)) => { + assert_eq!(seq_blob, vec_blob, "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}"); + }, + (Err(err), Ok(_)) => { + panic!( concat!("Sequential get failed with {} for key {}, but vectored get did not", " - keyspace={:?} lsn={}"), err, seq_key, keyspace, lsn) }, + (Ok(_), Err(err)) => { + panic!( concat!("Vectored get failed with {} for key {}, but sequential get did not", " - keyspace={:?} lsn={}"), err, seq_key, keyspace, lsn) }, + (Err(_), Err(_)) => {} + } + }) + } + } + } + /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev. pub(crate) fn get_last_record_lsn(&self) -> Lsn { self.last_record_lsn.load().last @@ -2547,6 +2700,170 @@ impl Timeline { } } + /// Get the data needed to reconstruct all keys in the provided keyspace + /// + /// The algorithm is as follows: + /// 1. While some keys are still not done and there's a timeline to visit: + /// 2. Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]: + /// 2.1. Build the fringe for the current keyspace + /// 2.2. Visit the newest layer from the fringe to collect all values for the range it + /// intersects + /// 2.3. Pop the layer from the fringe + /// 2.4. If the fringe is empty, go back to 1 + async fn get_vectored_reconstruct_data( + &self, + mut keyspace: KeySpace, + request_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let mut timeline_owned: Arc<Timeline>; + let mut timeline = self; + + let mut cont_lsn = Lsn(request_lsn.0 + 1); + + loop { + if self.cancel.is_cancelled() { + return Err(GetVectoredError::Cancelled); + } + + let completed = Self::get_vectored_reconstruct_data_timeline( + timeline, + keyspace.clone(), + cont_lsn, + reconstruct_state, + &self.cancel, + ctx, + ) + .await?; + + keyspace.remove_overlapping_with(&completed); + if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() { + break; + } + + cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1); + timeline_owned = timeline + .get_ready_ancestor_timeline(ctx) + .await + .map_err(GetVectoredError::GetReadyAncestorError)?; + timeline = &*timeline_owned; + } + + if keyspace.total_size() != 0 { + return Err(GetVectoredError::MissingKey(keyspace.start().unwrap())); + } + + Ok(()) + } +
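The loop above walks the ancestry chain: each round searches one timeline below `cont_lsn`, strips out whatever completed, and caps the next round just above the branch point so only pre-branch history is visited on the parent. A runnable sketch of the LSN bookkeeping, with hypothetical `u64` values standing in for `Lsn`:

```rust
// Worked example of the LSN bookkeeping in the ancestor walk above,
// with u64 stand-ins for Lsn (hypothetical values).
fn main() {
    let request_lsn: u64 = 0x100;
    let ancestor_lsn: u64 = 0x60; // the child branched off the parent here

    // The first iteration searches the child below request_lsn + 1 ...
    let mut cont_lsn = request_lsn + 1;
    assert_eq!(cont_lsn, 0x101);

    // ... and keys not completed there continue on the parent, capped just
    // above the branch point so only pre-branch history is visited.
    cont_lsn = ancestor_lsn + 1;
    assert_eq!(cont_lsn, 0x61);

    // Anything still unresolved after the chain ends becomes
    // GetVectoredError::MissingKey.
}
```

+ /// Collect the reconstruct data for a keyspace from the specified timeline.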
+ /// + /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect + /// the current keyspace. The current keyspace of the search at any given timeline + /// is the original keyspace minus all the keys that have been completed minus + /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly, + /// but if you merge all the keyspaces in the fringe, you get the "current keyspace". + /// + /// This is basically a depth-first search visitor implementation where a vertex + /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack. + /// + /// At each iteration pop the top of the fringe (the layer with the highest Lsn) + /// and get all the required reconstruct data from the layer in one go. + async fn get_vectored_reconstruct_data_timeline( + timeline: &Timeline, + keyspace: KeySpace, + mut cont_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> Result<KeySpace, GetVectoredError> { + let mut unmapped_keyspace = keyspace.clone(); + let mut fringe = LayerFringe::new(); + + let mut completed_keyspace = KeySpace::default(); + + // Hold the layer map whilst visiting the timeline to prevent + // compaction, eviction and flushes from rendering the layers unreadable. + // + // TODO: Do we actually need to do this? In theory holding on + // to [`tenant::storage_layer::Layer`] should be enough. However, + // [`Timeline::get`] also holds the lock during IO, so more investigation + // is needed. + let guard = timeline.layers.read().await; + let layers = guard.layer_map(); + + 'outer: loop { + if cancel.is_cancelled() { + return Err(GetVectoredError::Cancelled); + } + + let keys_done_last_step = reconstruct_state.consume_done_keys(); + unmapped_keyspace.remove_overlapping_with(&keys_done_last_step); + completed_keyspace.merge(&keys_done_last_step); + + let in_memory_layer = layers.find_in_memory_layer(|l| { + let start_lsn = l.get_lsn_range().start; + cont_lsn > start_lsn + }); + + match in_memory_layer { + Some(l) => { + fringe.update( + ReadableLayerDesc::InMemory { + handle: l, + lsn_ceil: cont_lsn, + }, + unmapped_keyspace.clone(), + ); + } + None => { + for range in unmapped_keyspace.ranges.iter() { + let results = match layers.range_search(range.clone(), cont_lsn) { + Some(res) => res, + None => { + break 'outer; + } + }; + + results + .found + .into_iter() + .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { + ( + ReadableLayerDesc::Persistent { + desc: (*layer).clone(), + lsn_floor, + lsn_ceil: cont_lsn, + }, + keyspace_accum.to_keyspace(), + ) + }) + .for_each(|(layer, keyspace)| fringe.update(layer, keyspace)); + } + } + } + + if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() { + layer_to_read + .get_values_reconstruct_data( + &guard, + keyspace_to_read.clone(), + reconstruct_state, + ctx, + ) + .await?; + + unmapped_keyspace = keyspace_to_read; + cont_lsn = layer_to_read.get_lsn_floor(); + } else { + break; + } + } + + Ok(completed_keyspace) + } +
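The fringe pops layers strictly newest-first: the `BinaryHeap` is a max-heap keyed on `lsn_ceil`, with ties broken toward the lower `lsn_floor` by the reversed floor comparison in `Ord for ReadableLayerDescOrdered`. A runnable sketch of that ordering with bare `u64` LSNs:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Mirror of `Ord for ReadableLayerDescOrdered`: order by lsn_ceil,
// break ties by preferring the lower lsn_floor (reversed comparison).
#[derive(Eq, PartialEq, Debug)]
struct FringeEntry {
    lsn_floor: u64,
    lsn_ceil: u64,
}

impl Ord for FringeEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.lsn_ceil
            .cmp(&other.lsn_ceil)
            .then(self.lsn_floor.cmp(&other.lsn_floor).reverse())
    }
}

impl PartialOrd for FringeEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut fringe = BinaryHeap::new();
    fringe.push(FringeEntry { lsn_floor: 30, lsn_ceil: 40 });
    fringe.push(FringeEntry { lsn_floor: 10, lsn_ceil: 40 });
    fringe.push(FringeEntry { lsn_floor: 0, lsn_ceil: 20 });

    // Highest ceiling pops first; of the two with ceil=40, floor=10 wins.
    assert_eq!(fringe.pop().unwrap().lsn_floor, 10);
    assert_eq!(fringe.pop().unwrap().lsn_floor, 30);
    assert_eq!(fringe.pop().unwrap().lsn_ceil, 20);
}
```

/// # Cancel-safety /// /// This method is cancellation-safe.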
@@ -3263,7 +3580,7 @@ impl Timeline { || last_key_in_range { let results = self - .get_vectored(&key_request_accum.consume_keyspace().ranges, lsn, ctx) + .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx) .await?; for (img_key, img) in results { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 51b126b84b..ce5ef66d22 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -488,6 +488,11 @@ class NeonEnvBuilder: self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine + self.pageserver_get_vectored_impl: Optional[str] = None + if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored": + self.pageserver_get_vectored_impl = "vectored" + log.debug('Overriding pageserver get_vectored_impl config to "vectored"') + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1055,6 +1060,8 @@ class NeonEnv: } if self.pageserver_virtual_file_io_engine is not None: ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine + if config.pageserver_get_vectored_impl is not None: + ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl # Create a corresponding NeonPageserver object self.pageservers.append( diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py index 465101f64f..0ea76d447e 100644 --- a/test_runner/regress/test_compatibility.py +++ b/test_runner/regress/test_compatibility.py @@ -226,6 +226,10 @@ def test_forward_compatibility( ) try: + # TODO: remove this once the previous pageserver version understands + # the 'get_vectored_impl' config + neon_env_builder.pageserver_get_vectored_impl = None + neon_env_builder.num_safekeepers = 3 neon_local_binpath = neon_env_builder.neon_binpath env = neon_env_builder.from_repo_dir(