From 5d6083bfc61701877be2ae8b9d9d726a4d0e773b Mon Sep 17 00:00:00 2001
From: Vlad Lazar
Date: Wed, 21 Feb 2024 09:49:46 +0000
Subject: [PATCH] pageserver: add vectored get implementation (#6576)

This PR introduces a new vectored implementation of the read path.

If you squint at it long enough, the search is basically a DFS: vertices
are (layer, keyspace, lsn range) tuples, and the fringe (`LayerFringe`)
of layers still to visit acts as the stack. The search continuously pops
the top of the fringe (the layer with the highest LSN) and gets all the
data required from that layer in one go.

The search is done on one timeline at a time. If data is still required
for some keys, the ancestor timeline is searched next.

Apart from the high-level layer traversal, vectored variants have been
introduced for grabbing data from each layer type. They still suffer
from read amplification issues, which will be addressed in a different
PR.

You might notice that in some places we duplicate the code of the
existing read path. All of that code will be removed when we switch the
non-vectored read path to proxy into the vectored read path. In the
meantime, we'll have to contend with the extra cruft for the sake of
testing and a gentle release.
---
 .github/workflows/build_and_test.yml          |   1 +
 Cargo.lock                                    |   1 +
 libs/pageserver_api/Cargo.toml                |   1 +
 libs/pageserver_api/src/keyspace.rs           |  52 ++-
 pageserver/src/basebackup.rs                  |   5 +-
 pageserver/src/config.rs                      |  24 ++
 pageserver/src/tenant.rs                      | 175 +++++++--
 pageserver/src/tenant/layer_map.rs            | 118 ++++--
 pageserver/src/tenant/storage_layer.rs        | 282 +++++++++++++
 .../src/tenant/storage_layer/delta_layer.rs   | 139 ++++++-
 .../src/tenant/storage_layer/image_layer.rs   |  78 +++-
 .../tenant/storage_layer/inmemory_layer.rs    | 100 ++++-
 pageserver/src/tenant/storage_layer/layer.rs  |  51 ++-
 .../src/tenant/storage_layer/layer_desc.rs    |   2 +-
 pageserver/src/tenant/timeline.rs             | 371 ++++++++++++++++--
 test_runner/fixtures/neon_fixtures.py         |   7 +
 test_runner/regress/test_compatibility.py     |   4 +
 17 files changed, 1284 insertions(+), 127 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 2a1c79e437..1744616888 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -472,6 +472,7 @@ jobs:
           CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
           BUILD_TAG: ${{ needs.tag.outputs.build-tag }}
           PAGESERVER_VIRTUAL_FILE_IO_ENGINE: std-fs
+          PAGESERVER_GET_VECTORED_IMPL: vectored
 
       # Temporary disable this step until we figure out why it's so flaky
       # Ref https://github.com/neondatabase/neon/issues/4540
diff --git a/Cargo.lock b/Cargo.lock
index f25e3d1574..ac8cceb5f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3552,6 +3552,7 @@ dependencies = [
 "enum-map",
 "hex",
 "humantime-serde",
+ "itertools",
 "postgres_ffi",
 "rand 0.8.5",
 "serde",
diff --git a/libs/pageserver_api/Cargo.toml b/libs/pageserver_api/Cargo.toml
index 902af21965..938910caea 100644
--- a/libs/pageserver_api/Cargo.toml
+++ b/libs/pageserver_api/Cargo.toml
@@ -21,6 +21,7 @@ hex.workspace = true
 thiserror.workspace = true
 humantime-serde.workspace = true
 chrono.workspace = true
+itertools.workspace = true
 
 workspace_hack.workspace = true
 
diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs
index 396c801606..443ffdcf03 100644
--- a/libs/pageserver_api/src/keyspace.rs
+++ b/libs/pageserver_api/src/keyspace.rs
@@ -2,6 +2,7 @@ use postgres_ffi::BLCKSZ;
 use std::ops::Range;
 
 use crate::key::Key;
+use itertools::Itertools;
 
 ///
 /// Represents a set of Keys, in a compact form.
@@ -63,9 +64,36 @@ impl KeySpace {
         KeyPartitioning { parts }
     }
 
-    /// Update the keyspace such that it doesn't contain any range
-    /// that is overlapping with `other`. This can involve splitting or
-    /// removing of existing ranges.
+    /// Merge another keyspace into the current one.
+    /// Note: the keyspaces must not overlap (enforced via assertions)
+    pub fn merge(&mut self, other: &KeySpace) {
+        let all_ranges = self
+            .ranges
+            .iter()
+            .merge_by(other.ranges.iter(), |lhs, rhs| lhs.start < rhs.start);
+
+        let mut accum = KeySpaceAccum::new();
+        let mut prev: Option<&Range<Key>> = None;
+        for range in all_ranges {
+            if let Some(prev) = prev {
+                let overlap =
+                    std::cmp::max(range.start, prev.start) < std::cmp::min(range.end, prev.end);
+                assert!(
+                    !overlap,
+                    "Attempt to merge overlapping keyspaces: {:?} overlaps {:?}",
+                    prev, range
+                );
+            }
+
+            accum.add_range(range.clone());
+            prev = Some(range);
+        }
+
+        self.ranges = accum.to_keyspace().ranges;
+    }
+
+    /// Remove all keys in `other` from `self`.
+    /// This can involve splitting or removing of existing ranges.
     pub fn remove_overlapping_with(&mut self, other: &KeySpace) {
         let (self_start, self_end) = match (self.start(), self.end()) {
             (Some(start), Some(end)) => (start, end),
@@ -220,16 +248,7 @@ impl KeySpaceAccum {
     }
 
     pub fn consume_keyspace(&mut self) -> KeySpace {
-        if let Some(accum) = self.accum.take() {
-            self.ranges.push(accum);
-        }
-
-        let mut prev_accum = KeySpaceAccum::new();
-        std::mem::swap(self, &mut prev_accum);
-
-        KeySpace {
-            ranges: prev_accum.ranges,
-        }
+        std::mem::take(self).to_keyspace()
     }
 
     pub fn size(&self) -> u64 {
@@ -279,6 +298,13 @@ impl KeySpaceRandomAccum {
         }
         KeySpace { ranges }
     }
+
+    pub fn consume_keyspace(&mut self) -> KeySpace {
+        let mut prev_accum = KeySpaceRandomAccum::new();
+        std::mem::swap(self, &mut prev_accum);
+
+        prev_accum.to_keyspace()
+    }
 }
 
 pub fn key_range_size(key_range: &Range<Key>) -> u32 {
diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs
index 7edfab75d4..c862816b80 100644
--- a/pageserver/src/basebackup.rs
+++ b/pageserver/src/basebackup.rs
@@ -261,10 +261,7 @@ where
         let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
 
         for part in slru_partitions.parts {
-            let blocks = self
-                .timeline
-                .get_vectored(&part.ranges, self.lsn, self.ctx)
-                .await?;
+            let blocks = self.timeline.get_vectored(part, self.lsn, self.ctx).await?;
 
             for (key, block) in blocks {
                 slru_builder.add_block(&key, block?).await?;
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index 6d71ff1dd4..6c00c55f39 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -33,6 +33,7 @@ use utils::{
 use crate::disk_usage_eviction_task::DiskUsageEvictionTaskConfig;
 use crate::tenant::config::TenantConf;
 use crate::tenant::config::TenantConfOpt;
+use crate::tenant::timeline::GetVectoredImpl;
 use crate::tenant::{
     TENANTS_SEGMENT_NAME, TENANT_DELETED_MARKER_FILE_NAME, TIMELINES_SEGMENT_NAME,
 };
@@ -84,6 +85,8 @@ pub mod defaults {
 
     pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
 
+    pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
+
     ///
     /// Default built-in configuration file.
/// @@ -121,6 +124,8 @@ pub mod defaults { #virtual_file_io_engine = '{DEFAULT_VIRTUAL_FILE_IO_ENGINE}' +#get_vectored_impl = '{DEFAULT_GET_VECTORED_IMPL}' + [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -256,6 +261,8 @@ pub struct PageServerConf { pub ingest_batch_size: u64, pub virtual_file_io_engine: virtual_file::IoEngineKind, + + pub get_vectored_impl: GetVectoredImpl, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -342,6 +349,8 @@ struct PageServerConfigBuilder { ingest_batch_size: BuilderValue, virtual_file_io_engine: BuilderValue, + + get_vectored_impl: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -419,6 +428,8 @@ impl Default for PageServerConfigBuilder { ingest_batch_size: Set(DEFAULT_INGEST_BATCH_SIZE), virtual_file_io_engine: Set(DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap()), + + get_vectored_impl: Set(DEFAULT_GET_VECTORED_IMPL.parse().unwrap()), } } } @@ -579,6 +590,10 @@ impl PageServerConfigBuilder { self.virtual_file_io_engine = BuilderValue::Set(value); } + pub fn get_vectored_impl(&mut self, value: GetVectoredImpl) { + self.get_vectored_impl = BuilderValue::Set(value); + } + pub fn build(self) -> anyhow::Result { let concurrent_tenant_warmup = self .concurrent_tenant_warmup @@ -689,6 +704,9 @@ impl PageServerConfigBuilder { virtual_file_io_engine: self .virtual_file_io_engine .ok_or(anyhow!("missing virtual_file_io_engine"))?, + get_vectored_impl: self + .get_vectored_impl + .ok_or(anyhow!("missing get_vectored_impl"))?, }) } } @@ -943,6 +961,9 @@ impl PageServerConf { "virtual_file_io_engine" => { builder.virtual_file_io_engine(parse_toml_from_str("virtual_file_io_engine", item)?) } + "get_vectored_impl" => { + builder.get_vectored_impl(parse_toml_from_str("get_vectored_impl", item)?) 
+ } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -1017,6 +1038,7 @@ impl PageServerConf { secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), } } } @@ -1250,6 +1272,7 @@ background_task_maximum_delay = '334 s' secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: defaults::DEFAULT_INGEST_BATCH_SIZE, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), }, "Correct defaults should be used when no config values are provided" ); @@ -1314,6 +1337,7 @@ background_task_maximum_delay = '334 s' secondary_download_concurrency: defaults::DEFAULT_SECONDARY_DOWNLOAD_CONCURRENCY, ingest_batch_size: 100, virtual_file_io_engine: DEFAULT_VIRTUAL_FILE_IO_ENGINE.parse().unwrap(), + get_vectored_impl: defaults::DEFAULT_GET_VECTORED_IMPL.parse().unwrap(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c646e5cf90..7021921b12 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3877,6 +3877,7 @@ mod tests { use bytes::BytesMut; use hex_literal::hex; use once_cell::sync::Lazy; + use pageserver_api::keyspace::KeySpace; use rand::{thread_rng, Rng}; use tokio_util::sync::CancellationToken; @@ -4514,6 +4515,61 @@ mod tests { Ok(()) } + async fn bulk_insert_compact_gc( + timeline: Arc, + ctx: &RequestContext, + mut lsn: Lsn, + repeat: usize, + key_count: usize, + ) -> anyhow::Result<()> { + let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); + let mut blknum = 0; + + // Enforce that key range is monotonously increasing + let mut keyspace = KeySpaceAccum::new(); + + for _ in 0..repeat { + for _ in 0..key_count { + test_key.field6 = blknum; + let mut writer = timeline.writer().await; + writer + .put( + test_key, + lsn, + &Value::Image(test_img(&format!("{} at {}", blknum, lsn))), + ctx, + ) + .await?; + writer.finish_write(lsn); + drop(writer); + + keyspace.add_key(test_key); + + lsn = Lsn(lsn.0 + 0x10); + blknum += 1; + } + + let cutoff = timeline.get_last_record_lsn(); + + timeline + .update_gc_info( + Vec::new(), + cutoff, + Duration::ZERO, + &CancellationToken::new(), + ctx, + ) + .await?; + timeline.freeze_and_flush().await?; + timeline + .compact(&CancellationToken::new(), EnumSet::empty(), ctx) + .await?; + timeline.gc().await?; + } + + Ok(()) + } + // // Insert 1000 key-value pairs with increasing keys, flush, compact, GC. // Repeat 50 times. 
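The helper above leans on the new `KeySpace` machinery from the keyspace.rs hunk earlier in this patch: the read path merges completed keys into an accumulated keyspace and subtracts them from the keyspace still being searched. Below is a minimal, self-contained sketch of those two operations over plain integer ranges; `ToyKeySpace` is an illustrative stand-in, not the real `pageserver_api` type, which operates on `Key` and asserts on overlap in the same way.

use std::ops::Range;

// Simplified stand-in for pageserver_api's KeySpace: sorted, disjoint ranges.
#[derive(Debug, Default, PartialEq)]
struct ToyKeySpace {
    ranges: Vec<Range<u32>>,
}

impl ToyKeySpace {
    // Mirrors KeySpace::merge: interleave two disjoint sets of ranges,
    // keeping them sorted by start and asserting that nothing overlaps.
    fn merge(&mut self, other: &ToyKeySpace) {
        let mut all: Vec<Range<u32>> =
            self.ranges.iter().chain(other.ranges.iter()).cloned().collect();
        all.sort_by_key(|r| r.start);
        for pair in all.windows(2) {
            assert!(pair[0].end <= pair[1].start, "overlapping keyspaces");
        }
        self.ranges = all;
    }

    // Mirrors KeySpace::remove_overlapping_with: subtract `other`,
    // splitting ranges where necessary.
    fn remove_overlapping_with(&mut self, other: &ToyKeySpace) {
        let mut out: Vec<Range<u32>> = Vec::new();
        for r in &self.ranges {
            let mut pending = vec![r.clone()];
            for o in &other.ranges {
                let mut next = Vec::new();
                for p in pending {
                    if p.end <= o.start || o.end <= p.start {
                        next.push(p); // no overlap: keep as-is
                    } else {
                        if p.start < o.start {
                            next.push(p.start..o.start); // left remainder
                        }
                        if o.end < p.end {
                            next.push(o.end..p.end); // right remainder
                        }
                    }
                }
                pending = next;
            }
            out.extend(pending);
        }
        self.ranges = out;
    }
}

fn main() {
    // The read-path invariant: keys completed at one timeline are merged
    // into `completed` and removed from the keyspace searched next.
    let mut pending = ToyKeySpace { ranges: vec![0..10, 20..30] };
    let mut completed = ToyKeySpace::default();

    let done_this_step = ToyKeySpace { ranges: vec![4..6, 20..30] };
    completed.merge(&done_this_step);
    pending.remove_overlapping_with(&done_this_step);

    assert_eq!(pending.ranges, vec![0..4, 6..10]);
    println!("still pending: {:?}", pending.ranges);
}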
@@ -4526,49 +4582,98 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) .await?; - let mut lsn = Lsn(0x10); + let lsn = Lsn(0x10); + bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?; - let mut keyspace = KeySpaceAccum::new(); + Ok(()) + } - let mut test_key = Key::from_hex("010000000033333333444444445500000000").unwrap(); - let mut blknum = 0; - for _ in 0..50 { - for _ in 0..10000 { - test_key.field6 = blknum; - let mut writer = tline.writer().await; - writer - .put( - test_key, - lsn, - &Value::Image(test_img(&format!("{} at {}", blknum, lsn))), - &ctx, - ) - .await?; - writer.finish_write(lsn); - drop(writer); + // Test the vectored get real implementation against a simple sequential implementation. + // + // The test generates a keyspace by repeatedly flushing the in-memory layer and compacting. + // Projected to 2D the key space looks like below. Lsn grows upwards on the Y axis and keys + // grow to the right on the X axis. + // [Delta] + // [Delta] + // [Delta] + // [Delta] + // ------------ Image --------------- + // + // After layer generation we pick the ranges to query as follows: + // 1. The beginning of each delta layer + // 2. At the seam between two adjacent delta layers + // + // There's one major downside to this test: delta layers only contains images, + // so the search can stop at the first delta layer and doesn't traverse any deeper. + #[tokio::test] + async fn test_get_vectored() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_get_vectored")?; + let (tenant, ctx) = harness.load().await; + let tline = tenant + .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx) + .await?; - keyspace.add_key(test_key); + let lsn = Lsn(0x10); + bulk_insert_compact_gc(tline.clone(), &ctx, lsn, 50, 10000).await?; - lsn = Lsn(lsn.0 + 0x10); - blknum += 1; + let guard = tline.layers.read().await; + guard.layer_map().dump(true, &ctx).await?; + + let mut reads = Vec::new(); + let mut prev = None; + guard.layer_map().iter_historic_layers().for_each(|desc| { + if !desc.is_delta() { + prev = Some(desc.clone()); + return; } - let cutoff = tline.get_last_record_lsn(); + let start = desc.key_range.start; + let end = desc + .key_range + .start + .add(Timeline::MAX_GET_VECTORED_KEYS.try_into().unwrap()); + reads.push(KeySpace { + ranges: vec![start..end], + }); + if let Some(prev) = &prev { + if !prev.is_delta() { + return; + } + + let first_range = Key { + field6: prev.key_range.end.field6 - 4, + ..prev.key_range.end + }..prev.key_range.end; + + let second_range = desc.key_range.start..Key { + field6: desc.key_range.start.field6 + 4, + ..desc.key_range.start + }; + + reads.push(KeySpace { + ranges: vec![first_range, second_range], + }); + }; + + prev = Some(desc.clone()); + }); + + drop(guard); + + // Pick a big LSN such that we query over all the changes. + // Technically, u64::MAX - 1 is the largest LSN supported by the read path, + // but there seems to be a bug on the non-vectored search path which surfaces + // in that case. 
+ let reads_lsn = Lsn(u64::MAX - 1000); + + for read in reads { + info!("Doing vectored read on {:?}", read); + + let vectored_res = tline.get_vectored_impl(read.clone(), reads_lsn, &ctx).await; tline - .update_gc_info( - Vec::new(), - cutoff, - Duration::ZERO, - &CancellationToken::new(), - &ctx, - ) - .await?; - tline.freeze_and_flush().await?; - tline - .compact(&CancellationToken::new(), EnumSet::empty(), &ctx) - .await?; - tline.gc().await?; + .validate_get_vectored_impl(&vectored_res, read, reads_lsn, &ctx) + .await; } Ok(()) diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index bb52e586d1..5f4814cc6b 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -52,8 +52,7 @@ use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use anyhow::Result; use pageserver_api::keyspace::KeySpaceAccum; -use std::cmp::Ordering; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::iter::Peekable; use std::ops::Range; use std::sync::Arc; @@ -147,43 +146,28 @@ impl Drop for BatchedUpdates<'_> { } /// Return value of LayerMap::search -#[derive(Eq, PartialEq, Debug)] +#[derive(Eq, PartialEq, Debug, Hash)] pub struct SearchResult { pub layer: Arc, pub lsn_floor: Lsn, } -pub struct OrderedSearchResult(SearchResult); - -impl Ord for OrderedSearchResult { - fn cmp(&self, other: &Self) -> Ordering { - self.0.lsn_floor.cmp(&other.0.lsn_floor) - } -} - -impl PartialOrd for OrderedSearchResult { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for OrderedSearchResult { - fn eq(&self, other: &Self) -> bool { - self.0.lsn_floor == other.0.lsn_floor - } -} - -impl Eq for OrderedSearchResult {} - +/// Return value of [`LayerMap::range_search`] +/// +/// Contains a mapping from a layer description to a keyspace +/// accumulator that contains all the keys which intersect the layer +/// from the original search space. Keys that were not found are accumulated +/// in a separate key space accumulator. +#[derive(Debug)] pub struct RangeSearchResult { - pub found: BTreeMap, + pub found: HashMap, pub not_found: KeySpaceAccum, } impl RangeSearchResult { fn new() -> Self { Self { - found: BTreeMap::new(), + found: HashMap::new(), not_found: KeySpaceAccum::new(), } } @@ -314,7 +298,7 @@ where Some(search_result) => self .result .found - .entry(OrderedSearchResult(search_result)) + .entry(search_result) .or_default() .add_range(covered_range), None => self.pad_range(covered_range), @@ -362,6 +346,35 @@ where } } +#[derive(PartialEq, Eq, Hash, Debug, Clone)] +pub enum InMemoryLayerHandle { + Open { + lsn_floor: Lsn, + end_lsn: Lsn, + }, + Frozen { + idx: usize, + lsn_floor: Lsn, + end_lsn: Lsn, + }, +} + +impl InMemoryLayerHandle { + pub fn get_lsn_floor(&self) -> Lsn { + match self { + InMemoryLayerHandle::Open { lsn_floor, .. } => *lsn_floor, + InMemoryLayerHandle::Frozen { lsn_floor, .. } => *lsn_floor, + } + } + + pub fn get_end_lsn(&self) -> Lsn { + match self { + InMemoryLayerHandle::Open { end_lsn, .. } => *end_lsn, + InMemoryLayerHandle::Frozen { end_lsn, .. } => *end_lsn, + } + } +} + impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given @@ -556,6 +569,43 @@ impl LayerMap { self.historic.iter() } + /// Get a handle for the first in memory layer that matches the provided predicate. + /// The handle should be used with [`Self::get_in_memory_layer`] to retrieve the actual layer. 
+ /// + /// Note: [`Self::find_in_memory_layer`] and [`Self::get_in_memory_layer`] should be called during + /// the same exclusive region established by holding the layer manager lock. + pub fn find_in_memory_layer(&self, mut pred: Pred) -> Option + where + Pred: FnMut(&Arc) -> bool, + { + if let Some(open) = &self.open_layer { + if pred(open) { + return Some(InMemoryLayerHandle::Open { + lsn_floor: open.get_lsn_range().start, + end_lsn: open.get_lsn_range().end, + }); + } + } + + let pos = self.frozen_layers.iter().rev().position(pred); + pos.map(|rev_idx| { + let idx = self.frozen_layers.len() - 1 - rev_idx; + InMemoryLayerHandle::Frozen { + idx, + lsn_floor: self.frozen_layers[idx].get_lsn_range().start, + end_lsn: self.frozen_layers[idx].get_lsn_range().end, + } + }) + } + + /// Get the layer pointed to by the provided handle. + pub fn get_in_memory_layer(&self, handle: &InMemoryLayerHandle) -> Option> { + match handle { + InMemoryLayerHandle::Open { .. } => self.open_layer.clone(), + InMemoryLayerHandle::Frozen { idx, .. } => self.frozen_layers.get(*idx).cloned(), + } + } + /// /// Divide the whole given range of keys into sub-ranges based on the latest /// image layer that covers each range at the specified lsn (inclusive). @@ -869,6 +919,8 @@ impl LayerMap { #[cfg(test)] mod tests { + use pageserver_api::keyspace::KeySpace; + use super::*; #[derive(Clone)] @@ -895,15 +947,15 @@ mod tests { fn assert_range_search_result_eq(lhs: RangeSearchResult, rhs: RangeSearchResult) { assert_eq!(lhs.not_found.to_keyspace(), rhs.not_found.to_keyspace()); - let lhs: Vec<_> = lhs + let lhs: HashMap = lhs .found .into_iter() - .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .map(|(search_result, accum)| (search_result, accum.to_keyspace())) .collect(); - let rhs: Vec<_> = rhs + let rhs: HashMap = rhs .found .into_iter() - .map(|(search_result, accum)| (search_result.0, accum.to_keyspace())) + .map(|(search_result, accum)| (search_result, accum.to_keyspace())) .collect(); assert_eq!(lhs, rhs); @@ -923,7 +975,7 @@ mod tests { Some(res) => { range_search_result .found - .entry(OrderedSearchResult(res)) + .entry(res) .or_default() .add_key(key); } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 2d92baccbe..73c018db31 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -8,15 +8,21 @@ pub(crate) mod layer; mod layer_desc; use crate::context::{AccessStatsBehavior, RequestContext}; +use crate::repository::Value; use crate::task_mgr::TaskKind; use crate::walrecord::NeonWalRecord; use bytes::Bytes; use enum_map::EnumMap; use enumset::EnumSet; use once_cell::sync::Lazy; +use pageserver_api::key::Key; +use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::models::{ LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; +use std::cmp::{Ordering, Reverse}; +use std::collections::hash_map::Entry; +use std::collections::{BinaryHeap, HashMap}; use std::ops::Range; use std::sync::Mutex; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -34,6 +40,11 @@ pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub(crate) use layer::{EvictionError, Layer, ResidentLayer}; +use super::layer_map::InMemoryLayerHandle; +use super::timeline::layer_manager::LayerManager; +use super::timeline::GetVectoredError; +use super::PageReconstructError; + pub fn range_overlaps(a: &Range, b: &Range) -> bool where T: PartialOrd, @@ -67,6 +78,277 
@@ pub struct ValueReconstructState { pub img: Option<(Lsn, Bytes)>, } +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub(crate) enum ValueReconstructSituation { + Complete, + #[default] + Continue, +} + +/// Reconstruct data accumulated for a single key during a vectored get +#[derive(Debug, Default, Clone)] +pub(crate) struct VectoredValueReconstructState { + pub(crate) records: Vec<(Lsn, NeonWalRecord)>, + pub(crate) img: Option<(Lsn, Bytes)>, + + situation: ValueReconstructSituation, +} + +impl VectoredValueReconstructState { + fn get_cached_lsn(&self) -> Option { + self.img.as_ref().map(|img| img.0) + } +} + +impl From for ValueReconstructState { + fn from(mut state: VectoredValueReconstructState) -> Self { + // walredo expects the records to be descending in terms of Lsn + state.records.sort_by_key(|(lsn, _)| Reverse(*lsn)); + + ValueReconstructState { + records: state.records, + img: state.img, + } + } +} + +/// Bag of data accumulated during a vectored get +pub(crate) struct ValuesReconstructState { + pub(crate) keys: HashMap>, + + keys_done: KeySpaceRandomAccum, +} + +impl ValuesReconstructState { + pub(crate) fn new() -> Self { + Self { + keys: HashMap::new(), + keys_done: KeySpaceRandomAccum::new(), + } + } + + /// Associate a key with the error which it encountered and mark it as done + pub(crate) fn on_key_error(&mut self, key: Key, err: PageReconstructError) { + let previous = self.keys.insert(key, Err(err)); + if let Some(Ok(state)) = previous { + if state.situation == ValueReconstructSituation::Continue { + self.keys_done.add_key(key); + } + } + } + + /// Update the state collected for a given key. + /// Returns true if this was the last value needed for the key and false otherwise. + /// + /// If the key is done after the update, mark it as such. + pub(crate) fn update_key( + &mut self, + key: &Key, + lsn: Lsn, + value: Value, + ) -> ValueReconstructSituation { + let state = self + .keys + .entry(*key) + .or_insert(Ok(VectoredValueReconstructState::default())); + + if let Ok(state) = state { + let key_done = match state.situation { + ValueReconstructSituation::Complete => unreachable!(), + ValueReconstructSituation::Continue => match value { + Value::Image(img) => { + state.img = Some((lsn, img)); + true + } + Value::WalRecord(rec) => { + let reached_cache = + state.get_cached_lsn().map(|clsn| clsn + 1) == Some(lsn); + let will_init = rec.will_init(); + state.records.push((lsn, rec)); + will_init || reached_cache + } + }, + }; + + if key_done && state.situation == ValueReconstructSituation::Continue { + state.situation = ValueReconstructSituation::Complete; + self.keys_done.add_key(*key); + } + + state.situation + } else { + ValueReconstructSituation::Complete + } + } + + /// Returns the Lsn at which this key is cached if one exists. + /// The read path should go no further than this Lsn for the given key. + pub(crate) fn get_cached_lsn(&self, key: &Key) -> Option { + self.keys + .get(key) + .and_then(|k| k.as_ref().ok()) + .and_then(|state| state.get_cached_lsn()) + } + + /// Returns the key space describing the keys that have + /// been marked as completed since the last call to this function. + pub(crate) fn consume_done_keys(&mut self) -> KeySpace { + self.keys_done.consume_keyspace() + } +} + +impl Default for ValuesReconstructState { + fn default() -> Self { + Self::new() + } +} + +/// Description of layer to be read - the layer map can turn +/// this description into the actual layer. 
+#[derive(PartialEq, Eq, Hash, Debug, Clone)]
+pub(crate) enum ReadableLayerDesc {
+    Persistent {
+        desc: PersistentLayerDesc,
+        lsn_floor: Lsn,
+        lsn_ceil: Lsn,
+    },
+    InMemory {
+        handle: InMemoryLayerHandle,
+        lsn_ceil: Lsn,
+    },
+}
+
+/// Wrapper for 'ReadableLayerDesc' sorted by Lsn
+#[derive(Debug)]
+struct ReadableLayerDescOrdered(ReadableLayerDesc);
+
+/// Data structure which maintains a fringe of layers for the
+/// read path. The fringe is the set of layers which intersects
+/// the current keyspace that the search is descending on.
+/// Each layer tracks the keyspace that intersects it.
+///
+/// The fringe must appear sorted by Lsn. Hence, it uses
+/// a two layer indexing scheme.
+#[derive(Debug)]
+pub(crate) struct LayerFringe {
+    layers_by_lsn: BinaryHeap<ReadableLayerDescOrdered>,
+    layers: HashMap<ReadableLayerDesc, KeySpace>,
+}
+
+impl LayerFringe {
+    pub(crate) fn new() -> Self {
+        LayerFringe {
+            layers_by_lsn: BinaryHeap::new(),
+            layers: HashMap::new(),
+        }
+    }
+
+    pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayerDesc, KeySpace)> {
+        let handle = match self.layers_by_lsn.pop() {
+            Some(h) => h,
+            None => return None,
+        };
+
+        let removed = self.layers.remove_entry(&handle.0);
+        match removed {
+            Some((layer, keyspace)) => Some((layer, keyspace)),
+            None => unreachable!("fringe internals are always consistent"),
+        }
+    }
+
+    pub(crate) fn update(&mut self, layer: ReadableLayerDesc, keyspace: KeySpace) {
+        let entry = self.layers.entry(layer.clone());
+        match entry {
+            Entry::Occupied(mut entry) => {
+                entry.get_mut().merge(&keyspace);
+            }
+            Entry::Vacant(entry) => {
+                self.layers_by_lsn
+                    .push(ReadableLayerDescOrdered(entry.key().clone()));
+                entry.insert(keyspace);
+            }
+        }
+    }
+}
+
+impl Default for LayerFringe {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Ord for ReadableLayerDescOrdered {
+    fn cmp(&self, other: &Self) -> Ordering {
+        let ord = self.0.get_lsn_ceil().cmp(&other.0.get_lsn_ceil());
+        if ord == std::cmp::Ordering::Equal {
+            self.0
+                .get_lsn_floor()
+                .cmp(&other.0.get_lsn_floor())
+                .reverse()
+        } else {
+            ord
+        }
+    }
+}
+
+impl PartialOrd for ReadableLayerDescOrdered {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl PartialEq for ReadableLayerDescOrdered {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.get_lsn_floor() == other.0.get_lsn_floor()
+            && self.0.get_lsn_ceil() == other.0.get_lsn_ceil()
+    }
+}
+
+impl Eq for ReadableLayerDescOrdered {}
+
+impl ReadableLayerDesc {
+    pub(crate) fn get_lsn_floor(&self) -> Lsn {
+        match self {
+            ReadableLayerDesc::Persistent { lsn_floor, .. } => *lsn_floor,
+            ReadableLayerDesc::InMemory { handle, .. } => handle.get_lsn_floor(),
+        }
+    }
+
+    pub(crate) fn get_lsn_ceil(&self) -> Lsn {
+        match self {
+            ReadableLayerDesc::Persistent { lsn_ceil, .. } => *lsn_ceil,
+            ReadableLayerDesc::InMemory { lsn_ceil, .. } => *lsn_ceil,
+        }
+    }
+
+    pub(crate) async fn get_values_reconstruct_data(
+        &self,
+        layer_manager: &LayerManager,
+        keyspace: KeySpace,
+        reconstruct_state: &mut ValuesReconstructState,
+        ctx: &RequestContext,
+    ) -> Result<(), GetVectoredError> {
+        match self {
+            ReadableLayerDesc::Persistent { desc, lsn_ceil, ..
} => { + let layer = layer_manager.get_from_desc(desc); + layer + .get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx) + .await + } + ReadableLayerDesc::InMemory { handle, lsn_ceil } => { + let layer = layer_manager + .layer_map() + .get_in_memory_layer(handle) + .unwrap(); + + layer + .get_values_reconstruct_data(keyspace, *lsn_ceil, reconstruct_state, ctx) + .await + } + } + } +} + /// Return value from [`Layer::get_value_reconstruct_data`] #[derive(Clone, Copy, Debug)] pub enum ValueReconstructResult { diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 9a7bcbcebe..19eebf5531 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -35,16 +35,19 @@ use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; -use crate::tenant::Timeline; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{walrecord, TEMP_FILE_SUFFIX}; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use camino::{Utf8Path, Utf8PathBuf}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; use std::fs::File; use std::io::SeekFrom; use std::ops::Range; @@ -59,7 +62,10 @@ use utils::{ lsn::Lsn, }; -use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer}; +use super::{ + AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer, ValueReconstructSituation, + ValuesReconstructState, +}; /// /// Header stored in the beginning of the file @@ -818,6 +824,133 @@ impl DeltaLayerInner { } } + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + // + // If the key is cached, go no further than the cached Lsn. + // + // Currently, the index is visited for each range, but this + // can be further optimised to visit the index only once. + pub(super) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let file = &self.file; + let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new( + self.index_start_blk, + self.index_root_blk, + file, + ); + + let mut offsets: BTreeMap> = BTreeMap::new(); + + for range in keyspace.ranges.iter() { + let mut ignore_key = None; + + // Scan the page versions backwards, starting from the last key in the range. + // to collect all the offsets at which need to be read. 
+ let end_key = DeltaKey::from_key_lsn(&range.end, Lsn(end_lsn.0 - 1)); + tree_reader + .visit( + &end_key.0, + VisitDirection::Backwards, + |raw_key, value| { + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + let entry_lsn = DeltaKey::extract_lsn_from_buf(raw_key); + + if entry_lsn >= end_lsn { + return true; + } + + if key < range.start { + return false; + } + + if key >= range.end { + return true; + } + + if Some(key) == ignore_key { + return true; + } + + if let Some(cached_lsn) = reconstruct_state.get_cached_lsn(&key) { + if entry_lsn <= cached_lsn { + return key != range.start; + } + } + + let blob_ref = BlobRef(value); + let lsns_at = offsets.entry(key).or_default(); + lsns_at.push((entry_lsn, blob_ref.pos())); + + if blob_ref.will_init() { + if key == range.start { + return false; + } else { + ignore_key = Some(key); + return true; + } + } + + true + }, + &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerBtreeNode) + .build(), + ) + .await + .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; + } + + let ctx = &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::DeltaLayerValue) + .build(); + + let cursor = file.block_cursor(); + let mut buf = Vec::new(); + for (key, lsns_at) in offsets { + for (lsn, block_offset) in lsns_at { + let res = cursor.read_blob_into_buf(block_offset, &mut buf, ctx).await; + + if let Err(e) = res { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to read blob from virtual file {}", + file.file.path + ))), + ); + + break; + } + + let value = Value::des(&buf); + if let Err(e) = value { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to deserialize file blob from virtual file {}", + file.file.path + ))), + ); + + break; + } + + let key_situation = reconstruct_state.update_key(&key, lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + break; + } + } + } + + Ok(()) + } + pub(super) async fn load_keys<'a>( &'a self, ctx: &RequestContext, diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 458131b572..b867cb0333 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -26,20 +26,22 @@ use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::page_cache::PAGE_SZ; -use crate::repository::{Key, KEY_SIZE}; +use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, }; -use crate::tenant::Timeline; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::virtual_file::{self, VirtualFile}; use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX}; -use anyhow::{bail, ensure, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use hex; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::LayerAccessKind; use pageserver_api::shard::TenantShardId; use rand::{distributions::Alphanumeric, Rng}; @@ -59,7 +61,7 @@ use utils::{ }; 
use super::filename::ImageFileName; -use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer}; +use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer, ValuesReconstructState}; /// /// Header stored in the beginning of the file @@ -438,6 +440,74 @@ impl ImageLayerInner { Ok(ValueReconstructResult::Missing) } } + + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + pub(super) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let file = &self.file; + let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file); + + let mut offsets = Vec::new(); + + for range in keyspace.ranges.iter() { + let mut search_key: [u8; KEY_SIZE] = [0u8; KEY_SIZE]; + range.start.write_to_byte_slice(&mut search_key); + + tree_reader + .visit( + &search_key, + VisitDirection::Forwards, + |raw_key, value| { + let key = Key::from_slice(&raw_key[..KEY_SIZE]); + assert!(key >= range.start); + + if !range.contains(&key) { + return false; + } + + offsets.push((key, value)); + + true + }, + &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::ImageLayerBtreeNode) + .build(), + ) + .await + .map_err(|err| GetVectoredError::Other(anyhow!(err)))?; + } + + let ctx = &RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::ImageLayerValue) + .build(); + + let cursor = file.block_cursor(); + let mut buf = Vec::new(); + for (key, offset) in offsets { + let res = cursor.read_blob_into_buf(offset, &mut buf, ctx).await; + if let Err(e) = res { + reconstruct_state.on_key_error( + key, + PageReconstructError::from(anyhow!(e).context(format!( + "Failed to read blob from virtual file {}", + file.file.path + ))), + ); + + continue; + } + + let blob = Bytes::copy_from_slice(buf.as_slice()); + reconstruct_state.update_key(&key, self.lsn, Value::Image(blob)); + } + + Ok(()) + } } /// A builder object for constructing a new image layer. 
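Both the delta- and image-layer vectored reads above funnel into `ValuesReconstructState::update_key` (storage_layer.rs, earlier in this patch), which decides when a key needs no further layer visits: an image always completes the key, while a WAL record completes it only if it can init the page on its own or if it reaches the record just above a cached image. A minimal model of that completion rule follows; `ToyValue` and `Situation` are illustrative stand-ins for `Value` and `ValueReconstructSituation`, with LSNs simplified to u64.

// Minimal model of ValuesReconstructState::update_key's completion rule.

#[derive(Clone, Copy, PartialEq, Debug)]
enum Situation {
    Complete,
    Continue,
}

enum ToyValue {
    // A full page image: always completes the key.
    Image(&'static str),
    // A WAL record; `will_init` records can be replayed without older state.
    WalRecord { will_init: bool },
}

// Decide whether a key is done after absorbing one value, walking from
// newest to oldest LSN, optionally stopping at a cached image LSN.
fn update_key(value: ToyValue, lsn: u64, cached_lsn: Option<u64>) -> Situation {
    match value {
        ToyValue::Image(_) => Situation::Complete,
        ToyValue::WalRecord { will_init } => {
            // Reaching the record just above the cached image also completes
            // the key: everything older is already materialized in the cache.
            let reached_cache = cached_lsn.map(|c| c + 1) == Some(lsn);
            if will_init || reached_cache {
                Situation::Complete
            } else {
                Situation::Continue
            }
        }
    }
}

fn main() {
    assert_eq!(update_key(ToyValue::Image("page"), 30, None), Situation::Complete);
    assert_eq!(
        update_key(ToyValue::WalRecord { will_init: false }, 25, Some(24)),
        Situation::Complete // the cached image at LSN 24 covers the rest
    );
    assert_eq!(
        update_key(ToyValue::WalRecord { will_init: false }, 25, None),
        Situation::Continue // still need older records or an image
    );
}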
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 4b06a787ce..5f1db21d49 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -9,13 +9,15 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; use crate::repository::{Key, Value}; use crate::tenant::block_io::BlockReader; use crate::tenant::ephemeral_file::EphemeralFile; -use crate::tenant::storage_layer::{ValueReconstructResult, ValueReconstructState}; -use crate::tenant::Timeline; +use crate::tenant::storage_layer::ValueReconstructResult; +use crate::tenant::timeline::GetVectoredError; +use crate::tenant::{PageReconstructError, Timeline}; use crate::walrecord; -use anyhow::{ensure, Result}; +use anyhow::{anyhow, ensure, Result}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; use pageserver_api::shard::TenantShardId; -use std::collections::HashMap; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::sync::{Arc, OnceLock}; use tracing::*; use utils::{bin_ser::BeSer, id::TimelineId, lsn::Lsn, vec_map::VecMap}; @@ -25,7 +27,10 @@ use std::fmt::Write as _; use std::ops::Range; use tokio::sync::{RwLock, RwLockWriteGuard}; -use super::{DeltaLayerWriter, ResidentLayer}; +use super::{ + DeltaLayerWriter, ResidentLayer, ValueReconstructSituation, ValueReconstructState, + ValuesReconstructState, +}; pub struct InMemoryLayer { conf: &'static PageServerConf, @@ -202,6 +207,91 @@ impl InMemoryLayer { Ok(ValueReconstructResult::Complete) } } + + // Look up the keys in the provided keyspace and update + // the reconstruct state with whatever is found. + // + // If the key is cached, go no further than the cached Lsn. 
+ pub(crate) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + let ctx = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::InMemoryLayer) + .build(); + + let inner = self.inner.read().await; + let reader = inner.file.block_cursor(); + + #[derive(Eq, PartialEq, Ord, PartialOrd)] + struct BlockRead { + key: Key, + lsn: Lsn, + block_offset: u64, + } + + let mut planned_block_reads = BinaryHeap::new(); + + for range in keyspace.ranges.iter() { + let mut key = range.start; + while key < range.end { + if let Some(vec_map) = inner.index.get(&key) { + let lsn_range = match reconstruct_state.get_cached_lsn(&key) { + Some(cached_lsn) => (cached_lsn + 1)..end_lsn, + None => self.start_lsn..end_lsn, + }; + + let slice = vec_map.slice_range(lsn_range); + for (entry_lsn, pos) in slice.iter().rev() { + planned_block_reads.push(BlockRead { + key, + lsn: *entry_lsn, + block_offset: *pos, + }); + } + } + + key = key.next(); + } + } + + let keyspace_size = keyspace.total_size(); + + let mut completed_keys = HashSet::new(); + while completed_keys.len() < keyspace_size && !planned_block_reads.is_empty() { + let block_read = planned_block_reads.pop().unwrap(); + if completed_keys.contains(&block_read.key) { + continue; + } + + let buf = reader.read_blob(block_read.block_offset, &ctx).await; + if let Err(e) = buf { + reconstruct_state + .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); + completed_keys.insert(block_read.key); + continue; + } + + let value = Value::des(&buf.unwrap()); + if let Err(e) = value { + reconstruct_state + .on_key_error(block_read.key, PageReconstructError::from(anyhow!(e))); + completed_keys.insert(block_read.key); + continue; + } + + let key_situation = + reconstruct_state.update_key(&block_read.key, block_read.lsn, value.unwrap()); + if key_situation == ValueReconstructSituation::Complete { + completed_keys.insert(block_read.key); + } + } + + Ok(()) + } } impl std::fmt::Display for InMemoryLayer { diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index bfcc031863..cc5b7ade6a 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -1,5 +1,6 @@ use anyhow::Context; use camino::{Utf8Path, Utf8PathBuf}; +use pageserver_api::keyspace::KeySpace; use pageserver_api::models::{ HistoricLayerInfo, LayerAccessKind, LayerResidenceEventReason, LayerResidenceStatus, }; @@ -16,13 +17,14 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; +use crate::tenant::timeline::GetVectoredError; use crate::tenant::{remote_timeline_client::LayerFileMetadata, Timeline}; use super::delta_layer::{self, DeltaEntry}; use super::image_layer; use super::{ AsLayerDesc, LayerAccessStats, LayerAccessStatsReset, LayerFileName, PersistentLayerDesc, - ValueReconstructResult, ValueReconstructState, + ValueReconstructResult, ValueReconstructState, ValuesReconstructState, }; use utils::generation::Generation; @@ -262,6 +264,29 @@ impl Layer { .with_context(|| format!("get_value_reconstruct_data for layer {self}")) } + pub(crate) async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_data: &mut ValuesReconstructState, + ctx: &RequestContext, + ) -> Result<(), 
GetVectoredError> { + let layer = self + .0 + .get_or_maybe_download(true, Some(ctx)) + .await + .map_err(|err| GetVectoredError::Other(anyhow::anyhow!(err)))?; + + self.0 + .access_stats + .record_access(LayerAccessKind::GetValueReconstructData, ctx); + + layer + .get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, &self.0, ctx) + .instrument(tracing::debug_span!("get_values_reconstruct_data", layer=%self)) + .await + } + /// Download the layer if evicted. /// /// Will not error when the layer is already downloaded. @@ -1177,7 +1202,7 @@ pub(crate) enum EvictionError { /// Error internal to the [`LayerInner::get_or_maybe_download`] #[derive(Debug, thiserror::Error)] -enum DownloadError { +pub(crate) enum DownloadError { #[error("timeline has already shutdown")] TimelineShutdown, #[error("no remote storage configured")] @@ -1337,6 +1362,28 @@ impl DownloadedLayer { } } + async fn get_values_reconstruct_data( + &self, + keyspace: KeySpace, + end_lsn: Lsn, + reconstruct_data: &mut ValuesReconstructState, + owner: &Arc, + ctx: &RequestContext, + ) -> Result<(), GetVectoredError> { + use LayerKind::*; + + match self.get(owner, ctx).await.map_err(GetVectoredError::from)? { + Delta(d) => { + d.get_values_reconstruct_data(keyspace, end_lsn, reconstruct_data, ctx) + .await + } + Image(i) => { + i.get_values_reconstruct_data(keyspace, reconstruct_data, ctx) + .await + } + } + } + async fn dump(&self, owner: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { use LayerKind::*; match self.get(owner, ctx).await? { diff --git a/pageserver/src/tenant/storage_layer/layer_desc.rs b/pageserver/src/tenant/storage_layer/layer_desc.rs index fa78e9fdb2..c375923e81 100644 --- a/pageserver/src/tenant/storage_layer/layer_desc.rs +++ b/pageserver/src/tenant/storage_layer/layer_desc.rs @@ -15,7 +15,7 @@ use utils::id::TenantId; /// A unique identifier of a persistent layer. This is different from `LayerDescriptor`, which is only used in the /// benchmarks. This struct contains all necessary information to find the image / delta layer. It also provides /// a unified way to generate layer information like file name. 
-#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Hash)] pub struct PersistentLayerDesc { pub tenant_shard_id: TenantShardId, pub timeline_id: TimelineId, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 92e5b52c75..0f22284c55 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -16,7 +16,7 @@ use futures::stream::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; use pageserver_api::{ - keyspace::{key_range_size, KeySpaceAccum}, + keyspace::KeySpaceAccum, models::{ DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskSpawnRequest, EvictionPolicy, LayerMapInfo, TimelineState, @@ -67,7 +67,7 @@ use crate::{ tenant::storage_layer::{ AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer, LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult, - ValueReconstructState, + ValueReconstructState, ValuesReconstructState, }, }; use crate::{ @@ -111,11 +111,11 @@ use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; -use super::config::TenantConf; -use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline}; +use super::{config::TenantConf, storage_layer::ReadableLayerDesc}; use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf}; +use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -472,6 +472,15 @@ pub(crate) enum GetVectoredError { #[error("Requested at invalid LSN: {0}")] InvalidLsn(Lsn), + + #[error("Requested key {0} not found")] + MissingKey(Key), + + #[error(transparent)] + GetReadyAncestorError(GetReadyAncestorError), + + #[error(transparent)] + Other(#[from] anyhow::Error), } #[derive(thiserror::Error, Debug)] @@ -579,6 +588,23 @@ impl From for PageReconstructError { } } +#[derive( + Eq, + PartialEq, + Debug, + Copy, + Clone, + strum_macros::EnumString, + strum_macros::Display, + serde_with::DeserializeFromStr, + serde_with::SerializeDisplay, +)] +#[strum(serialize_all = "kebab-case")] +pub enum GetVectoredImpl { + Sequential, + Vectored, +} + /// Public interface functions impl Timeline { /// Get the LSN where this branch was created @@ -708,7 +734,7 @@ impl Timeline { /// which actually vectorizes the read path. 
pub(crate) async fn get_vectored( &self, - key_ranges: &[Range], + keyspace: KeySpace, lsn: Lsn, ctx: &RequestContext, ) -> Result>, GetVectoredError> { @@ -716,10 +742,7 @@ impl Timeline { return Err(GetVectoredError::InvalidLsn(lsn)); } - let key_count = key_ranges - .iter() - .map(|range| key_range_size(range) as u64) - .sum(); + let key_count = keyspace.total_size().try_into().unwrap(); if key_count > Timeline::MAX_GET_VECTORED_KEYS { return Err(GetVectoredError::Oversized(key_count)); } @@ -728,33 +751,163 @@ impl Timeline { .throttle(ctx, key_count as usize) .await; - let _timer = crate::metrics::GET_VECTORED_LATENCY - .for_task_kind(ctx.task_kind()) - .map(|t| t.start_timer()); - - let mut values = BTreeMap::new(); - for range in key_ranges { + for range in &keyspace.ranges { let mut key = range.start; while key != range.end { assert!(!self.shard_identity.is_key_disposable(&key)); - - let block = self.get(key, lsn, ctx).await; - - if matches!( - block, - Err(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_)) - ) { - return Err(GetVectoredError::Cancelled); - } - - values.insert(key, block); key = key.next(); } } + trace!( + "get vectored request for {:?}@{} from task kind {:?} will use {} implementation", + keyspace, + lsn, + ctx.task_kind(), + self.conf.get_vectored_impl + ); + + let _timer = crate::metrics::GET_VECTORED_LATENCY + .for_task_kind(ctx.task_kind()) + .map(|t| t.start_timer()); + + match self.conf.get_vectored_impl { + GetVectoredImpl::Sequential => { + self.get_vectored_sequential_impl(keyspace, lsn, ctx).await + } + GetVectoredImpl::Vectored => { + let vectored_res = self.get_vectored_impl(keyspace.clone(), lsn, ctx).await; + + self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx) + .await; + + vectored_res + } + } + } + + pub(super) async fn get_vectored_sequential_impl( + &self, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + let mut values = BTreeMap::new(); + for range in keyspace.ranges { + let mut key = range.start; + while key != range.end { + let block = self.get(key, lsn, ctx).await; + + use PageReconstructError::*; + match block { + Err(Cancelled | AncestorStopping(_)) => { + return Err(GetVectoredError::Cancelled) + } + Err(Other(err)) if err.to_string().contains("could not find data for key") => { + return Err(GetVectoredError::MissingKey(key)) + } + _ => { + values.insert(key, block); + key = key.next(); + } + } + } + } + Ok(values) } + pub(super) async fn get_vectored_impl( + &self, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) -> Result>, GetVectoredError> { + let mut reconstruct_state = ValuesReconstructState::new(); + + self.get_vectored_reconstruct_data(keyspace, lsn, &mut reconstruct_state, ctx) + .await?; + + let mut results: BTreeMap> = BTreeMap::new(); + for (key, res) in reconstruct_state.keys { + match res { + Err(err) => { + results.insert(key, Err(err)); + } + Ok(state) => { + let state = ValueReconstructState::from(state); + + let reconstruct_res = self.reconstruct_value(key, lsn, state).await; + results.insert(key, reconstruct_res); + } + } + } + + Ok(results) + } + + pub(super) async fn validate_get_vectored_impl( + &self, + vectored_res: &Result>, GetVectoredError>, + keyspace: KeySpace, + lsn: Lsn, + ctx: &RequestContext, + ) { + let sequential_res = self + .get_vectored_sequential_impl(keyspace.clone(), lsn, ctx) + .await; + + fn errors_match(lhs: &GetVectoredError, rhs: &GetVectoredError) -> bool { + use GetVectoredError::*; + match 
(lhs, rhs) {
+                (Cancelled, Cancelled) => true,
+                (_, Cancelled) => true,
+                (Oversized(l), Oversized(r)) => l == r,
+                (InvalidLsn(l), InvalidLsn(r)) => l == r,
+                (MissingKey(l), MissingKey(r)) => l == r,
+                (GetReadyAncestorError(_), GetReadyAncestorError(_)) => true,
+                (Other(_), Other(_)) => true,
+                _ => false,
+            }
+        }
+
+        match (&sequential_res, vectored_res) {
+            (Err(seq_err), Ok(_)) => {
+                panic!(
+                    concat!(
+                        "Sequential get failed with {}, but vectored get did not",
+                        " - keyspace={:?} lsn={}"
+                    ),
+                    seq_err, keyspace, lsn
+                )
+            }
+            (Ok(_), Err(vec_err)) => {
+                panic!(
+                    concat!(
+                        "Vectored get failed with {}, but sequential get did not",
+                        " - keyspace={:?} lsn={}"
+                    ),
+                    vec_err, keyspace, lsn
+                )
+            }
+            (Err(seq_err), Err(vec_err)) => {
+                assert!(
+                    errors_match(seq_err, vec_err),
+                    "Mismatched errors: {seq_err} != {vec_err} - keyspace={keyspace:?} lsn={lsn}"
+                )
+            }
+            (Ok(seq_values), Ok(vec_values)) => {
+                seq_values.iter().zip(vec_values.iter()).for_each(
+                    |((seq_key, seq_res), (vec_key, vec_res))| {
+                        assert_eq!(seq_key, vec_key);
+                        match (seq_res, vec_res) {
+                            (Ok(seq_blob), Ok(vec_blob)) => {
+                                assert_eq!(
+                                    seq_blob, vec_blob,
+                                    "Image mismatch for key {seq_key} - keyspace={keyspace:?} lsn={lsn}"
+                                );
+                            }
+                            (Err(err), Ok(_)) => {
+                                panic!(
+                                    concat!(
+                                        "Sequential get failed with {} for key {}, but vectored get did not",
+                                        " - keyspace={:?} lsn={}"
+                                    ),
+                                    err, seq_key, keyspace, lsn
+                                )
+                            }
+                            (Ok(_), Err(err)) => {
+                                panic!(
+                                    concat!(
+                                        "Vectored get failed with {} for key {}, but sequential get did not",
+                                        " - keyspace={:?} lsn={}"
+                                    ),
+                                    err, seq_key, keyspace, lsn
+                                )
+                            }
+                            (Err(_), Err(_)) => {}
+                        }
+                    },
+                )
+            }
+        }
+    }
+
     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
     pub(crate) fn get_last_record_lsn(&self) -> Lsn {
         self.last_record_lsn.load().last
@@ -2547,6 +2700,170 @@ impl Timeline {
         }
     }
 
+    /// Get the data needed to reconstruct all keys in the provided keyspace
+    ///
+    /// The algorithm is as follows:
+    /// 1.   While some keys are still not done and there's a timeline to visit:
+    /// 2.   Visit the timeline (see [`Timeline::get_vectored_reconstruct_data_timeline`]:
+    /// 2.1: Build the fringe for the current keyspace
+    /// 2.2  Visit the newest layer from the fringe to collect all values for the range it
+    ///      intersects
+    /// 2.3. Pop the layer from the fringe
+    /// 2.4. If the fringe is empty, go back to 1
+    async fn get_vectored_reconstruct_data(
+        &self,
+        mut keyspace: KeySpace,
+        request_lsn: Lsn,
+        reconstruct_state: &mut ValuesReconstructState,
+        ctx: &RequestContext,
+    ) -> Result<(), GetVectoredError> {
+        let mut timeline_owned: Arc<Timeline>;
+        let mut timeline = self;
+
+        let mut cont_lsn = Lsn(request_lsn.0 + 1);
+
+        loop {
+            if self.cancel.is_cancelled() {
+                return Err(GetVectoredError::Cancelled);
+            }
+
+            let completed = Self::get_vectored_reconstruct_data_timeline(
+                timeline,
+                keyspace.clone(),
+                cont_lsn,
+                reconstruct_state,
+                &self.cancel,
+                ctx,
+            )
+            .await?;
+
+            keyspace.remove_overlapping_with(&completed);
+            if keyspace.total_size() == 0 || timeline.ancestor_timeline.is_none() {
+                break;
+            }
+
+            cont_lsn = Lsn(timeline.ancestor_lsn.0 + 1);
+            timeline_owned = timeline
+                .get_ready_ancestor_timeline(ctx)
+                .await
+                .map_err(GetVectoredError::GetReadyAncestorError)?;
+            timeline = &*timeline_owned;
+        }
+
+        if keyspace.total_size() != 0 {
+            return Err(GetVectoredError::MissingKey(keyspace.start().unwrap()));
+        }
+
+        Ok(())
+    }
+
+    /// Collect the reconstruct data for a keyspace from the specified timeline.
+ /// + /// Maintain a fringe [`LayerFringe`] which tracks all the layers that intersect + /// the current keyspace. The current keyspace of the search at any given timeline + /// is the original keyspace minus all the keys that have been completed minus + /// any keys for which we couldn't find an intersecting layer. It's not tracked explicitly, + /// but if you merge all the keyspaces in the fringe, you get the "current keyspace". + /// + /// This is basically a depth-first search visitor implementation where a vertex + /// is the (layer, lsn range, key space) tuple. The fringe acts as the stack. + /// + /// At each iteration pop the top of the fringe (the layer with the highest Lsn) + /// and get all the required reconstruct data from the layer in one go. + async fn get_vectored_reconstruct_data_timeline( + timeline: &Timeline, + keyspace: KeySpace, + mut cont_lsn: Lsn, + reconstruct_state: &mut ValuesReconstructState, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> Result { + let mut unmapped_keyspace = keyspace.clone(); + let mut fringe = LayerFringe::new(); + + let mut completed_keyspace = KeySpace::default(); + + // Hold the layer map whilst visiting the timeline to prevent + // compaction, eviction and flushes from rendering the layers unreadable. + // + // TODO: Do we actually need to do this? In theory holding on + // to [`tenant::storage_layer::Layer`] should be enough. However, + // [`Timeline::get`] also holds the lock during IO, so more investigation + // is needed. + let guard = timeline.layers.read().await; + let layers = guard.layer_map(); + + 'outer: loop { + if cancel.is_cancelled() { + return Err(GetVectoredError::Cancelled); + } + + let keys_done_last_step = reconstruct_state.consume_done_keys(); + unmapped_keyspace.remove_overlapping_with(&keys_done_last_step); + completed_keyspace.merge(&keys_done_last_step); + + let in_memory_layer = layers.find_in_memory_layer(|l| { + let start_lsn = l.get_lsn_range().start; + cont_lsn > start_lsn + }); + + match in_memory_layer { + Some(l) => { + fringe.update( + ReadableLayerDesc::InMemory { + handle: l, + lsn_ceil: cont_lsn, + }, + unmapped_keyspace.clone(), + ); + } + None => { + for range in unmapped_keyspace.ranges.iter() { + let results = match layers.range_search(range.clone(), cont_lsn) { + Some(res) => res, + None => { + break 'outer; + } + }; + + results + .found + .into_iter() + .map(|(SearchResult { layer, lsn_floor }, keyspace_accum)| { + ( + ReadableLayerDesc::Persistent { + desc: (*layer).clone(), + lsn_floor, + lsn_ceil: cont_lsn, + }, + keyspace_accum.to_keyspace(), + ) + }) + .for_each(|(layer, keyspace)| fringe.update(layer, keyspace)); + } + } + } + + if let Some((layer_to_read, keyspace_to_read)) = fringe.next_layer() { + layer_to_read + .get_values_reconstruct_data( + &guard, + keyspace_to_read.clone(), + reconstruct_state, + ctx, + ) + .await?; + + unmapped_keyspace = keyspace_to_read; + cont_lsn = layer_to_read.get_lsn_floor(); + } else { + break; + } + } + + Ok(completed_keyspace) + } + /// # Cancel-safety /// /// This method is cancellation-safe. 
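The traversal above is easier to see with the layer-map details stripped away. Here is a minimal sketch of the fringe ordering and visit loop, assuming toy layers keyed by plain u64 LSNs; the real fringe stores `ReadableLayerDesc` entries together with the keyspace that intersects each layer.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Toy layer: in the real code this would be a ReadableLayerDesc plus the
// keyspace it intersects.
#[derive(Debug, Eq, PartialEq)]
struct ToyLayer {
    name: &'static str,
    lsn_floor: u64,
    lsn_ceil: u64,
}

impl Ord for ToyLayer {
    fn cmp(&self, other: &Self) -> Ordering {
        // Max-heap by lsn_ceil, so the newest layer is visited first;
        // ties prefer the layer with the lower floor, as in LayerFringe.
        self.lsn_ceil
            .cmp(&other.lsn_ceil)
            .then(self.lsn_floor.cmp(&other.lsn_floor).reverse())
    }
}

impl PartialOrd for ToyLayer {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut fringe = BinaryHeap::new();
    fringe.push(ToyLayer { name: "open ephemeral", lsn_floor: 40, lsn_ceil: 60 });
    fringe.push(ToyLayer { name: "delta 20-40", lsn_floor: 20, lsn_ceil: 40 });
    fringe.push(ToyLayer { name: "image @10", lsn_floor: 10, lsn_ceil: 10 });

    // Pop the layer with the highest LSN first and descend: the same order
    // the vectored read path uses.
    while let Some(layer) = fringe.pop() {
        println!("visit {} (lsn {}..{})", layer.name, layer.lsn_floor, layer.lsn_ceil);
        // Real code: layer.get_values_reconstruct_data(...), then push any
        // older layers that still intersect the unmapped keyspace.
    }
}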
@@ -3263,7 +3580,7 @@ impl Timeline {
                         || last_key_in_range
                     {
                         let results = self
-                            .get_vectored(&key_request_accum.consume_keyspace().ranges, lsn, ctx)
+                            .get_vectored(key_request_accum.consume_keyspace(), lsn, ctx)
                             .await?;
 
                         for (img_key, img) in results {
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 51b126b84b..ce5ef66d22 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -488,6 +488,11 @@ class NeonEnvBuilder:
 
         self.pageserver_virtual_file_io_engine: Optional[str] = pageserver_virtual_file_io_engine
 
+        self.pageserver_get_vectored_impl: Optional[str] = None
+        if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
+            self.pageserver_get_vectored_impl = "vectored"
+            log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
+
         assert test_name.startswith(
             "test_"
         ), "Unexpectedly instantiated from outside a test function"
@@ -1055,6 +1060,8 @@ class NeonEnv:
         }
         if self.pageserver_virtual_file_io_engine is not None:
             ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
+        if config.pageserver_get_vectored_impl is not None:
+            ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
 
         # Create a corresponding NeonPageserver object
         self.pageservers.append(
diff --git a/test_runner/regress/test_compatibility.py b/test_runner/regress/test_compatibility.py
index 465101f64f..0ea76d447e 100644
--- a/test_runner/regress/test_compatibility.py
+++ b/test_runner/regress/test_compatibility.py
@@ -226,6 +226,10 @@ def test_forward_compatibility(
     )
 
     try:
+        # TODO: remove this once the previous pageserver version understands
+        # the 'get_vectored_impl' config
+        neon_env_builder.pageserver_get_vectored_impl = None
+
         neon_env_builder.num_safekeepers = 3
         neon_local_binpath = neon_env_builder.neon_binpath
         env = neon_env_builder.from_repo_dir(