From 7e20b49da473276479340d2e2efd42dc2b336236 Mon Sep 17 00:00:00 2001 From: Alex Chi Z Date: Thu, 29 Jun 2023 15:06:07 -0400 Subject: [PATCH] refactor: use LayerDesc in LayerMap (part 2) (#4437) ## Problem Part of https://github.com/neondatabase/neon/issues/4392, continuation of https://github.com/neondatabase/neon/pull/4408 ## Summary of changes This PR removes all layer objects from LayerMap and moves them to the timeline struct. In the timeline struct, LayerFileManager maps a layer descriptor to a layer object, and it is stored in the same RwLock as LayerMap to avoid behavior differences. Key changes: * LayerMap is no longer generic and only stores descriptors. * In Timeline, we add a new struct called the layer mapping (LayerFileManager). * Currently, the layer mapping is stored under the same lock as the layer map. Every time we retrieve data from the layer map, we need to map the descriptor to the actual object. * `replace_historic` is moved to the layer mapping's replace operation, and the return value behavior is different from before. I'm a little bit unsure about this part and it would be good to have some comments on that. * Some test cases are rewritten to adapt to the new interface, and we can decide whether to remove them in the future because they do not make much sense now. * LayerDescriptor is moved to the `tests` module and is intended only for unit tests / benchmarks. * Because we now have a usage pattern like "take the lock guard, then get references to two fields", we want to avoid dropping the wrong object when we intend to release the lock guard. Therefore, a new set of helper functions, `drop_r/wlock`, is added. These can be removed in the future when we finish the refactor. TODOs after this PR: fully remove RemoteLayer, and move LayerMapping to a separate LayerCache. 
all refactor PRs: ``` #4437 --- #4479 ------------ #4510 (refactor done at this point) \-- #4455 -- #4502 --/ ``` --------- Signed-off-by: Alex Chi Z --- pageserver/benches/bench_layer_map.rs | 31 +- pageserver/src/tenant.rs | 1 + pageserver/src/tenant/layer_map.rs | 347 ++++----------- .../layer_map/historic_layer_coverage.rs | 211 +-------- pageserver/src/tenant/storage_layer.rs | 215 +++++---- .../src/tenant/storage_layer/remote_layer.rs | 9 +- pageserver/src/tenant/timeline.rs | 409 +++++++++++++----- .../src/tenant/timeline/eviction_task.rs | 4 +- test_runner/regress/test_ondemand_download.py | 4 +- 9 files changed, 522 insertions(+), 709 deletions(-) diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 45dc9fad4a..03bb7a5bfd 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -1,22 +1,23 @@ use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::repository::Key; use pageserver::tenant::layer_map::LayerMap; -use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; +use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName}; +use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc}; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use std::cmp::{max, min}; use std::fs::File; use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; use std::time::Instant; +use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -fn build_layer_map(filename_dump: PathBuf) -> LayerMap { - let mut layer_map = LayerMap::::default(); +fn build_layer_map(filename_dump: PathBuf) -> LayerMap { + let mut layer_map = LayerMap::default(); let mut min_lsn = Lsn(u64::MAX); let mut max_lsn = Lsn(0); @@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = 
min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone()); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { } /// Construct a layer map query pattern for benchmarks -fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { +fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { // For each image layer we query one of the pages contained, at LSN right // before the image layer was created. This gives us a somewhat uniform // coverage of both the lsn and key space because image layers have @@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn // Construct a partitioning for testing get_difficulty map when we // don't have an exact result of `collect_keyspace` to work with. -fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { +fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { let mut parts = Vec::new(); // We add a partition boundary at the start of each image layer, @@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) { for i in 0..100_000 { let i32 = (i as u32) % 100; let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); - let layer = LayerDescriptor { - key: zero.add(10 * i32)..zero.add(10 * i32 + 1), - lsn: Lsn(i)..Lsn(i + 1), - is_incremental: false, - short_id: format!("Layer {}", i), - }; - updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); + let layer = LayerDescriptor::from(PersistentLayerDesc::new_img( + TenantId::generate(), + TimelineId::generate(), + zero.add(10 * i32)..zero.add(10 * i32 + 1), + Lsn(i), + false, + 0, + )); + updates.insert_historic(layer.layer_desc().clone()); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git 
a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ce1e5bf51f..47e9b5b4ec 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -590,6 +590,7 @@ impl Tenant { .layers .read() .await + .0 .iter_historic_layers() .next() .is_some(), diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index ca1a71b623..dee02ac433 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; -use anyhow::Context; use anyhow::Result; -use std::collections::HashMap; use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; use utils::lsn::Lsn; use historic_layer_coverage::BufferedHistoricLayerCoverage; -pub use historic_layer_coverage::Replacement; +pub use historic_layer_coverage::LayerKey; use super::storage_layer::range_eq; use super::storage_layer::PersistentLayerDesc; -use super::storage_layer::PersistentLayerKey; /// /// LayerMap tracks what layers exist on a timeline. /// -pub struct LayerMap { +#[derive(Default)] +pub struct LayerMap { // // 'open_layer' holds the current InMemoryLayer that is accepting new // records. If it is None, 'next_open_layer_at' will be set instead, indicating @@ -95,24 +93,6 @@ pub struct LayerMap { /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. l0_delta_layers: Vec>, - - /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and - /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and - /// RemoteLayer will be removed. 
- mapping: HashMap>, -} - -impl Default for LayerMap { - fn default() -> Self { - Self { - open_layer: None, - next_open_layer_at: None, - frozen_layers: VecDeque::default(), - l0_delta_layers: Vec::default(), - historic: BufferedHistoricLayerCoverage::default(), - mapping: HashMap::default(), - } - } } /// The primary update API for the layer map. @@ -120,24 +100,21 @@ impl Default for LayerMap { /// Batching historic layer insertions and removals is good for /// performance and this struct helps us do that correctly. #[must_use] -pub struct BatchedUpdates<'a, L: ?Sized + Layer> { +pub struct BatchedUpdates<'a> { // While we hold this exclusive reference to the layer map the type checker // will prevent us from accidentally reading any unflushed updates. - layer_map: &'a mut LayerMap, + layer_map: &'a mut LayerMap, } /// Provide ability to batch more updates while hiding the read /// API so we don't accidentally read without flushing. -impl BatchedUpdates<'_, L> -where - L: ?Sized + Layer, -{ +impl BatchedUpdates<'_> { /// /// Insert an on-disk layer. /// // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` - pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { - self.layer_map.insert_historic_noflush(layer_desc, layer) + pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) { + self.layer_map.insert_historic_noflush(layer_desc) } /// @@ -145,31 +122,8 @@ where /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { - self.layer_map.remove_historic_noflush(layer_desc, layer) - } - - /// Replaces existing layer iff it is the `expected`. - /// - /// If the expected layer has been removed it will not be inserted by this function. - /// - /// Returned `Replacement` describes succeeding in replacement or the reason why it could not - /// be done. 
- /// - /// TODO replacement can be done without buffering and rebuilding layer map updates. - /// One way to do that is to add a layer of indirection for returned values, so - /// that we can replace values only by updating a hashmap. - pub fn replace_historic( - &mut self, - expected_desc: PersistentLayerDesc, - expected: &Arc, - new_desc: PersistentLayerDesc, - new: Arc, - ) -> anyhow::Result>> { - fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound)); - - self.layer_map - .replace_historic_noflush(expected_desc, expected, new_desc, new) + pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) { + self.layer_map.remove_historic_noflush(layer_desc) } // We will flush on drop anyway, but this method makes it @@ -185,25 +139,19 @@ where // than panic later or read without flushing. // // TODO maybe warn if flush hasn't explicitly been called -impl Drop for BatchedUpdates<'_, L> -where - L: ?Sized + Layer, -{ +impl Drop for BatchedUpdates<'_> { fn drop(&mut self) { self.layer_map.flush_updates(); } } /// Return value of LayerMap::search -pub struct SearchResult { - pub layer: Arc, +pub struct SearchResult { + pub layer: Arc, pub lsn_floor: Lsn, } -impl LayerMap -where - L: ?Sized + Layer, -{ +impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given /// 'key', with lsn.start < 'end_lsn'. @@ -235,7 +183,7 @@ where /// NOTE: This only searches the 'historic' layers, *not* the /// 'open' and 'frozen' layers! 
/// - pub fn search(&self, key: Key, end_lsn: Lsn) -> Option> { + pub fn search(&self, key: Key, end_lsn: Lsn) -> Option { let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128()); @@ -244,7 +192,6 @@ where (None, None) => None, (None, Some(image)) => { let lsn_floor = image.get_lsn_range().start; - let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor, @@ -252,7 +199,6 @@ where } (Some(delta), None) => { let lsn_floor = delta.get_lsn_range().start; - let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -263,7 +209,6 @@ where let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_exact_match = img_lsn + 1 == end_lsn; if image_is_newer || image_exact_match { - let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor: img_lsn, @@ -271,7 +216,6 @@ where } else { let lsn_floor = std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); - let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -282,7 +226,7 @@ where } /// Start a batch of updates, applied on drop - pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> { + pub fn batch_update(&mut self) -> BatchedUpdates<'_> { BatchedUpdates { layer_map: self } } @@ -292,48 +236,32 @@ where /// Helper function for BatchedUpdates::insert_historic /// /// TODO(chi): remove L generic so that we do not need to pass layer object. 
- pub(self) fn insert_historic_noflush( - &mut self, - layer_desc: PersistentLayerDesc, - layer: Arc, - ) { - self.mapping.insert(layer_desc.key(), layer.clone()); - + pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) { // TODO: See #3869, resulting #4088, attempted fix and repro #4094 - if Self::is_l0(&layer) { + if Self::is_l0(&layer_desc) { self.l0_delta_layers.push(layer_desc.clone().into()); } self.historic.insert( - historic_layer_coverage::LayerKey::from(&*layer), + historic_layer_coverage::LayerKey::from(&layer_desc), layer_desc.into(), ); } - fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc { - let layer = self - .mapping - .get(key) - .with_context(|| format!("{key:?}")) - .expect("inconsistent layer mapping"); - layer - } - /// /// Remove an on-disk layer from the map. /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) { self.historic - .remove(historic_layer_coverage::LayerKey::from(&*layer)); - if Self::is_l0(&layer) { + .remove(historic_layer_coverage::LayerKey::from(&layer_desc)); + let layer_key = layer_desc.key(); + if Self::is_l0(&layer_desc) { let len_before = self.l0_delta_layers.len(); let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); - l0_delta_layers.retain(|other| { - !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer) - }); + l0_delta_layers.retain(|other| other.key() != layer_key); self.l0_delta_layers = l0_delta_layers; // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // there's a chance that the comparison fails at runtime due to it comparing (pointer, @@ -344,69 +272,6 @@ where "failed to locate removed historic layer from l0_delta_layers" ); } - self.mapping.remove(&layer_desc.key()); - } - - pub(self) fn replace_historic_noflush( - 
&mut self, - expected_desc: PersistentLayerDesc, - expected: &Arc, - new_desc: PersistentLayerDesc, - new: Arc, - ) -> anyhow::Result>> { - let key = historic_layer_coverage::LayerKey::from(&**expected); - let other = historic_layer_coverage::LayerKey::from(&*new); - - let expected_l0 = Self::is_l0(expected); - let new_l0 = Self::is_l0(&new); - - anyhow::ensure!( - key == other, - "expected and new must have equal LayerKeys: {key:?} != {other:?}" - ); - - anyhow::ensure!( - expected_l0 == new_l0, - "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}" - ); - - let l0_index = if expected_l0 { - // find the index in case replace worked, we need to replace that as well - let pos = self.l0_delta_layers.iter().position(|slot| { - Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected) - }); - - if pos.is_none() { - return Ok(Replacement::NotFound); - } - pos - } else { - None - }; - - let new_desc = Arc::new(new_desc); - let replaced = self.historic.replace(&key, new_desc.clone(), |existing| { - **existing == expected_desc - }); - - if let Replacement::Replaced { .. } = &replaced { - self.mapping.remove(&expected_desc.key()); - self.mapping.insert(new_desc.key(), new); - if let Some(index) = l0_index { - self.l0_delta_layers[index] = new_desc; - } - } - - let replaced = match replaced { - Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered }, - Replacement::NotFound => Replacement::NotFound, - Replacement::RemovalBuffered => Replacement::RemovalBuffered, - Replacement::Unexpected(x) => { - Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone()) - } - }; - - Ok(replaced) } /// Helper function for BatchedUpdates::drop. 
@@ -454,10 +319,8 @@ where Ok(true) } - pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic - .iter() - .map(|x| self.get_layer_from_mapping(&x.key()).clone()) + pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { + self.historic.iter() } /// @@ -472,7 +335,7 @@ where &self, key_range: &Range, lsn: Lsn, - ) -> Result, Option>)>> { + ) -> Result, Option>)>> { let version = match self.historic.get().unwrap().get_version(lsn.0) { Some(v) => v, None => return Ok(vec![]), @@ -482,36 +345,26 @@ where let end = key_range.end.to_i128(); // Initialize loop variables - let mut coverage: Vec<(Range, Option>)> = vec![]; + let mut coverage: Vec<(Range, Option>)> = vec![]; let mut current_key = start; let mut current_val = version.image_coverage.query(start); // Loop through the change events and push intervals for (change_key, change_val) in version.image_coverage.range(start..end) { let kr = Key::from_i128(current_key)..Key::from_i128(change_key); - coverage.push(( - kr, - current_val - .take() - .map(|l| self.get_layer_from_mapping(&l.key()).clone()), - )); + coverage.push((kr, current_val.take())); current_key = change_key; current_val = change_val.clone(); } // Add the final interval let kr = Key::from_i128(current_key)..Key::from_i128(end); - coverage.push(( - kr, - current_val - .take() - .map(|l| self.get_layer_from_mapping(&l.key()).clone()), - )); + coverage.push((kr, current_val.take())); Ok(coverage) } - pub fn is_l0(layer: &L) -> bool { + pub fn is_l0(layer: &PersistentLayerDesc) -> bool { range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX)) } @@ -537,7 +390,7 @@ where /// TODO The optimal number should probably be slightly higher than 1, but to /// implement that we need to plumb a lot more context into this function /// than just the current partition_range. 
- pub fn is_reimage_worthy(layer: &L, partition_range: &Range) -> bool { + pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range) -> bool { // Case 1 if !Self::is_l0(layer) { return true; @@ -595,9 +448,7 @@ where let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = - Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) - as usize; + let base_count = Self::is_reimage_worthy(&val, key) as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; @@ -620,9 +471,7 @@ where let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = - Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) - as usize; + let base_count = Self::is_reimage_worthy(&val, key) as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; max_stacked_deltas = std::cmp::max( @@ -772,12 +621,8 @@ where } /// Return all L0 delta layers - pub fn get_level0_deltas(&self) -> Result>> { - Ok(self - .l0_delta_layers - .iter() - .map(|x| self.get_layer_from_mapping(&x.key()).clone()) - .collect()) + pub fn get_level0_deltas(&self) -> Result>> { + Ok(self.l0_delta_layers.to_vec()) } /// debugging function to print out the contents of the layer map @@ -802,72 +647,51 @@ where println!("End dump LayerMap"); Ok(()) } - - /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. - /// - /// Returns `true` if the two `Arc` point to the same layer, false otherwise. 
- #[inline(always)] - pub fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { - // "dyn Trait" objects are "fat pointers" in that they have two components: - // - pointer to the object - // - pointer to the vtable - // - // rust does not provide a guarantee that these vtables are unique, but however - // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the - // pointer and the vtable need to be equal. - // - // See: https://github.com/rust-lang/rust/issues/103763 - // - // A future version of rust will most likely use this form below, where we cast each - // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it - // not affect the comparison. - // - // See: https://github.com/rust-lang/rust/pull/106450 - let left = Arc::as_ptr(left) as *const (); - let right = Arc::as_ptr(right) as *const (); - - left == right - } } #[cfg(test)] mod tests { - use super::{LayerMap, Replacement}; - use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; + use super::LayerMap; + use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName}; use std::str::FromStr; use std::sync::Arc; mod l0_delta_layers_updated { + use crate::tenant::{ + storage_layer::{PersistentLayer, PersistentLayerDesc}, + timeline::LayerFileManager, + }; + use super::*; #[test] fn for_full_range_delta() { // l0_delta_layers are used by compaction, and should observe all buffered updates l0_delta_layers_updated_scenario( - "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", - true - ) + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", + true + ) } #[test] fn for_non_full_range_delta() { // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta l0_delta_layers_updated_scenario( - 
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", - // because not full range - false - ) + "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", + // because not full range + false + ) } #[test] fn for_image() { l0_delta_layers_updated_scenario( - "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", - // code only checks if it is a full range layer, doesn't care about images, which must - // mean we should in practice never have full range images - false - ) + "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", + // code only checks if it is a full range layer, doesn't care about images, which must + // mean we should in practice never have full range images + false + ) } #[test] @@ -883,16 +707,16 @@ mod tests { let not_found = Arc::new(layer.clone()); let new_version = Arc::new(layer); - let mut map = LayerMap::default(); + // after the immutable storage state refactor, the replace operation + // will not use layer map any more. We keep it here for consistency in test cases + // and can remove it in the future. + let _map = LayerMap::default(); - let res = map.batch_update().replace_historic( - not_found.get_persistent_layer_desc(), - ¬_found, - new_version.get_persistent_layer_desc(), - new_version, - ); + let mut mapping = LayerFileManager::new(); - assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}"); + mapping + .replace_and_verify(not_found, new_version) + .unwrap_err(); } fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { @@ -903,49 +727,44 @@ mod tests { let downloaded = Arc::new(skeleton); let mut map = LayerMap::default(); + let mut mapping = LayerFileManager::new(); // two disjoint Arcs in different lifecycle phases. 
even if it seems they must be the // same layer, we use LayerMap::compare_arced_layers as the identity of layers. - assert!(!LayerMap::compare_arced_layers(&remote, &downloaded)); + assert_eq!(remote.layer_desc(), downloaded.layer_desc()); let expected_in_counts = (1, usize::from(expected_l0)); map.batch_update() - .insert_historic(remote.get_persistent_layer_desc(), remote.clone()); - assert_eq!(count_layer_in(&map, &remote), expected_in_counts); - - let replaced = map - .batch_update() - .replace_historic( - remote.get_persistent_layer_desc(), - &remote, - downloaded.get_persistent_layer_desc(), - downloaded.clone(), - ) - .expect("name derived attributes are the same"); - assert!( - matches!(replaced, Replacement::Replaced { .. }), - "{replaced:?}" + .insert_historic(remote.layer_desc().clone()); + mapping.insert(remote.clone()); + assert_eq!( + count_layer_in(&map, remote.layer_desc()), + expected_in_counts + ); + + mapping + .replace_and_verify(remote, downloaded.clone()) + .expect("name derived attributes are the same"); + assert_eq!( + count_layer_in(&map, downloaded.layer_desc()), + expected_in_counts ); - assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts); map.batch_update() - .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); - assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); + .remove_historic(downloaded.layer_desc().clone()); + assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0)); } - fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { + fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) { let historic = map .iter_historic_layers() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) + .filter(|x| x.key() == layer.key()) .count(); let l0s = map .get_level0_deltas() .expect("why does this return a result"); - let l0 = l0s - .iter() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) - .count(); + let l0 = l0s.iter().filter(|x| x.key() == 
layer.key()).count(); (historic, l0) } diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index 49dcbc63c2..0f51597027 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -3,6 +3,8 @@ use std::ops::Range; use tracing::info; +use crate::tenant::storage_layer::PersistentLayerDesc; + use super::layer_coverage::LayerCoverageTuple; /// Layers in this module are identified and indexed by this data. @@ -41,8 +43,8 @@ impl Ord for LayerKey { } } -impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerKey { - fn from(layer: &'a L) -> Self { +impl From<&PersistentLayerDesc> for LayerKey { + fn from(layer: &PersistentLayerDesc) -> Self { let kr = layer.get_key_range(); let lr = layer.get_lsn_range(); LayerKey { @@ -454,59 +456,6 @@ impl BufferedHistoricLayerCoverage { self.buffer.insert(layer_key, None); } - /// Replaces a previous layer with a new layer value. - /// - /// The replacement is conditional on: - /// - there is an existing `LayerKey` record - /// - there is no buffered removal for the given `LayerKey` - /// - the given closure returns true for the current `Value` - /// - /// The closure is used to compare the latest value (buffered insert, or existing layer) - /// against some expectation. This allows to use `Arc::ptr_eq` or similar which would be - /// inaccessible via `PartialEq` trait. - /// - /// Returns a `Replacement` value describing the outcome; only the case of - /// `Replacement::Replaced` modifies the map and requires a rebuild. 
- pub fn replace( - &mut self, - layer_key: &LayerKey, - new: Value, - check_expected: F, - ) -> Replacement - where - F: FnOnce(&Value) -> bool, - { - let (slot, in_buffered) = match self.buffer.get(layer_key) { - Some(inner @ Some(_)) => { - // we compare against the buffered version, because there will be a later - // rebuild before querying - (inner.as_ref(), true) - } - Some(None) => { - // buffer has removal for this key; it will not be equivalent by any check_expected. - return Replacement::RemovalBuffered; - } - None => { - // no pending modification for the key, check layers - (self.layers.get(layer_key), false) - } - }; - - match slot { - Some(existing) if !check_expected(existing) => { - // unfortunate clone here, but otherwise the nll borrowck grows the region of - // 'a to cover the whole function, and we could not mutate in the other - // Some(existing) branch - Replacement::Unexpected(existing.clone()) - } - None => Replacement::NotFound, - Some(_existing) => { - self.insert(layer_key.to_owned(), new); - Replacement::Replaced { in_buffered } - } - } - } - pub fn rebuild(&mut self) { // Find the first LSN that needs to be rebuilt let rebuild_since: u64 = match self.buffer.iter().next() { @@ -575,22 +524,6 @@ impl BufferedHistoricLayerCoverage { } } -/// Outcome of the replace operation. -#[derive(Debug)] -pub enum Replacement { - /// Previous value was replaced with the new value. - Replaced { - /// Replacement happened for a scheduled insert. - in_buffered: bool, - }, - /// Key was not found buffered updates or existing layers. - NotFound, - /// Key has been scheduled for removal, it was not replaced. - RemovalBuffered, - /// Previous value was rejected by the closure. 
- Unexpected(Value), -} - #[test] fn test_retroactive_regression_1() { let mut map = BufferedHistoricLayerCoverage::new(); @@ -699,139 +632,3 @@ fn test_retroactive_simple() { assert_eq!(version.image_coverage.query(8), Some("Image 4".to_string())); } } - -#[test] -fn test_retroactive_replacement() { - let mut map = BufferedHistoricLayerCoverage::new(); - - let keys = [ - LayerKey { - key: 0..5, - lsn: 100..101, - is_image: true, - }, - LayerKey { - key: 3..9, - lsn: 110..111, - is_image: true, - }, - LayerKey { - key: 4..6, - lsn: 120..121, - is_image: true, - }, - ]; - - let layers = [ - "Image 1".to_string(), - "Image 2".to_string(), - "Image 3".to_string(), - ]; - - for (key, layer) in keys.iter().zip(layers.iter()) { - map.insert(key.to_owned(), layer.to_owned()); - } - - // rebuild is not necessary here, because replace works for both buffered updates and existing - // layers. - - for (key, orig_layer) in keys.iter().zip(layers.iter()) { - let replacement = format!("Remote {orig_layer}"); - - // evict - let ret = map.replace(key, replacement.clone(), |l| l == orig_layer); - assert!( - matches!(ret, Replacement::Replaced { .. }), - "replace {orig_layer}: {ret:?}" - ); - map.rebuild(); - - let at = key.lsn.end + 1; - - let version = map.get().expect("rebuilt").get_version(at).unwrap(); - assert_eq!( - version.image_coverage.query(4).as_deref(), - Some(replacement.as_str()), - "query for 4 at version {at} after eviction", - ); - - // download - let ret = map.replace(key, orig_layer.clone(), |l| l == &replacement); - assert!( - matches!(ret, Replacement::Replaced { .. 
}), - "replace {orig_layer} back: {ret:?}" - ); - map.rebuild(); - let version = map.get().expect("rebuilt").get_version(at).unwrap(); - assert_eq!( - version.image_coverage.query(4).as_deref(), - Some(orig_layer.as_str()), - "query for 4 at version {at} after download", - ); - } -} - -#[test] -fn missing_key_is_not_inserted_with_replace() { - let mut map = BufferedHistoricLayerCoverage::new(); - let key = LayerKey { - key: 0..5, - lsn: 100..101, - is_image: true, - }; - - let ret = map.replace(&key, "should not replace", |_| true); - assert!(matches!(ret, Replacement::NotFound), "{ret:?}"); - map.rebuild(); - assert!(map - .get() - .expect("no changes to rebuild") - .get_version(102) - .is_none()); -} - -#[test] -fn replacing_buffered_insert_and_remove() { - let mut map = BufferedHistoricLayerCoverage::new(); - let key = LayerKey { - key: 0..5, - lsn: 100..101, - is_image: true, - }; - - map.insert(key.clone(), "Image 1"); - let ret = map.replace(&key, "Remote Image 1", |&l| l == "Image 1"); - assert!( - matches!(ret, Replacement::Replaced { in_buffered: true }), - "{ret:?}" - ); - map.rebuild(); - - assert_eq!( - map.get() - .expect("rebuilt") - .get_version(102) - .unwrap() - .image_coverage - .query(4), - Some("Remote Image 1") - ); - - map.remove(key.clone()); - let ret = map.replace(&key, "should not replace", |_| true); - assert!( - matches!(ret, Replacement::RemovalBuffered), - "cannot replace after scheduled remove: {ret:?}" - ); - - map.rebuild(); - - let ret = map.replace(&key, "should not replace", |_| true); - assert!( - matches!(ret, Replacement::NotFound), - "cannot replace after remove + rebuild: {ret:?}" - ); - - let at_version = map.get().expect("rebuilt").get_version(102); - assert!(at_version.is_none()); -} diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0af3d4ce39..7bc513b3a1 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -176,13 +176,10 @@ 
impl LayerAccessStats { /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. - pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + pub(crate) fn for_loading_layer( + layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, - ) -> Self - where - L: ?Sized + Layer, - { + ) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); new.record_residence_event( layer_map_lock_held_witness, @@ -197,14 +194,11 @@ impl LayerAccessStats { /// The `new_status` is not recorded in `self`. /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. - pub(crate) fn clone_for_residence_change( + pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + layer_map_lock_held_witness: &BatchedUpdates<'_>, new_status: LayerResidenceStatus, - ) -> LayerAccessStats - where - L: ?Sized + Layer, - { + ) -> LayerAccessStats { let clone = { let inner = self.0.lock().unwrap(); inner.clone() @@ -232,14 +226,12 @@ impl LayerAccessStats { /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. /// - pub(crate) fn record_residence_event( + pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + _layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, reason: LayerResidenceEventReason, - ) where - L: ?Sized + Layer, - { + ) { let mut locked = self.0.lock().unwrap(); locked.iter_mut().for_each(|inner| { inner @@ -473,94 +465,125 @@ pub fn downcast_remote_layer( } } -/// Holds metadata about a layer without any content. Used mostly for testing. 
-/// -/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a -/// LayerDescriptor. -#[derive(Clone, Debug)] -pub struct LayerDescriptor { - pub key: Range, - pub lsn: Range, - pub is_incremental: bool, - pub short_id: String, -} +pub mod tests { + use super::*; -impl LayerDescriptor { - /// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, - /// and the tenant / timeline id does not matter. - pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { - PersistentLayerDesc::new_delta( - TenantId::from_array([0; 16]), - TimelineId::from_array([0; 16]), - self.key.clone(), - self.lsn.clone(), - 233, - ) - } -} - -impl Layer for LayerDescriptor { - fn get_key_range(&self) -> Range { - self.key.clone() + /// Holds metadata about a layer without any content. Used mostly for testing. + /// + /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a + /// LayerDescriptor. 
+ #[derive(Clone, Debug)] + pub struct LayerDescriptor { + base: PersistentLayerDesc, } - fn get_lsn_range(&self) -> Range { - self.lsn.clone() - } - - fn is_incremental(&self) -> bool { - self.is_incremental - } - - fn get_value_reconstruct_data( - &self, - _key: Key, - _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { - todo!("This method shouldn't be part of the Layer trait") - } - - fn short_id(&self) -> String { - self.short_id.clone() - } - - fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { - todo!() - } -} - -impl From for LayerDescriptor { - fn from(value: DeltaFileName) -> Self { - let short_id = value.to_string(); - LayerDescriptor { - key: value.key_range, - lsn: value.lsn_range, - is_incremental: true, - short_id, + impl From for LayerDescriptor { + fn from(base: PersistentLayerDesc) -> Self { + Self { base } } } -} -impl From for LayerDescriptor { - fn from(value: ImageFileName) -> Self { - let short_id = value.to_string(); - let lsn = value.lsn_as_range(); - LayerDescriptor { - key: value.key_range, - lsn, - is_incremental: false, - short_id, + impl Layer for LayerDescriptor { + fn get_value_reconstruct_data( + &self, + _key: Key, + _lsn_range: Range, + _reconstruct_data: &mut ValueReconstructState, + _ctx: &RequestContext, + ) -> Result { + todo!("This method shouldn't be part of the Layer trait") + } + + fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { + todo!() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn get_key_range(&self) -> Range { + self.layer_desc().key_range.clone() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn get_lsn_range(&self) -> Range { + self.layer_desc().lsn_range.clone() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. 
+ fn is_incremental(&self) -> bool { + self.layer_desc().is_incremental + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn short_id(&self) -> String { + self.layer_desc().short_id() } } -} -impl From for LayerDescriptor { - fn from(value: LayerFileName) -> Self { - match value { - LayerFileName::Delta(d) => Self::from(d), - LayerFileName::Image(i) => Self::from(i), + impl PersistentLayer for LayerDescriptor { + fn layer_desc(&self) -> &PersistentLayerDesc { + &self.base + } + + fn local_path(&self) -> Option { + unimplemented!() + } + + fn iter(&self, _: &RequestContext) -> Result> { + unimplemented!() + } + + fn key_iter(&self, _: &RequestContext) -> Result> { + unimplemented!() + } + + fn delete_resident_layer_file(&self) -> Result<()> { + unimplemented!() + } + + fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo { + unimplemented!() + } + + fn access_stats(&self) -> &LayerAccessStats { + unimplemented!() + } + } + + impl From for LayerDescriptor { + fn from(value: DeltaFileName) -> Self { + LayerDescriptor { + base: PersistentLayerDesc::new_delta( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + value.key_range, + value.lsn_range, + 233, + ), + } + } + } + + impl From for LayerDescriptor { + fn from(value: ImageFileName) -> Self { + LayerDescriptor { + base: PersistentLayerDesc::new_img( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + value.key_range, + value.lsn, + false, + 233, + ), + } + } + } + + impl From for LayerDescriptor { + fn from(value: LayerFileName) -> Self { + match value { + LayerFileName::Delta(d) => Self::from(d), + LayerFileName::Image(i) => Self::from(i), + } } } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 387bae5b1f..9d423ed815 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ 
-218,15 +218,12 @@ impl RemoteLayer { } /// Create a Layer struct representing this layer, after it has been downloaded. - pub fn create_downloaded_layer( + pub fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + layer_map_lock_held_witness: &BatchedUpdates<'_>, conf: &'static PageServerConf, file_size: u64, - ) -> Arc - where - L: ?Sized + Layer, - { + ) -> Arc { if self.desc.is_delta { let fname = self.desc.delta_file_name(); Arc::new(DeltaLayer::new( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 192aa6e22d..39c72a7e47 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3,7 +3,7 @@ mod eviction_task; mod walreceiver; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use fail::fail_point; use futures::StreamExt; @@ -85,7 +85,9 @@ use super::config::TenantConf; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; -use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; +use super::storage_layer::{ + DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey, +}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -118,6 +120,92 @@ impl PartialOrd for Hole { } } +pub struct LayerFileManager(HashMap>); + +impl LayerFileManager { + fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { + // The assumption for the `expect()` is that all code maintains the following invariant: + // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. 
+ self.0 + .get(&desc.key()) + .with_context(|| format!("get layer from desc: {}", desc.filename().file_name())) + .expect("not found") + .clone() + } + + pub(crate) fn insert(&mut self, layer: Arc) { + let present = self.0.insert(layer.layer_desc().key(), layer.clone()); + if present.is_some() && cfg!(debug_assertions) { + panic!("overwriting a layer: {:?}", layer.layer_desc()) + } + } + + pub(crate) fn new() -> Self { + Self(HashMap::new()) + } + + pub(crate) fn remove(&mut self, layer: Arc) { + let present = self.0.remove(&layer.layer_desc().key()); + if present.is_none() && cfg!(debug_assertions) { + panic!( + "removing layer that is not present in layer mapping: {:?}", + layer.layer_desc() + ) + } + } + + pub(crate) fn replace_and_verify( + &mut self, + expected: Arc, + new: Arc, + ) -> Result<()> { + let key = expected.layer_desc().key(); + let other = new.layer_desc().key(); + + let expected_l0 = LayerMap::is_l0(expected.layer_desc()); + let new_l0 = LayerMap::is_l0(new.layer_desc()); + + fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!( + "layermap-replace-notfound" + )); + + anyhow::ensure!( + key == other, + "expected and new layer have different keys: {key:?} != {other:?}" + ); + + anyhow::ensure!( + expected_l0 == new_l0, + "one layer is l0 while the other is not: {expected_l0} != {new_l0}" + ); + + if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) { + anyhow::ensure!( + compare_arced_layers(&expected, layer), + "another layer was found instead of expected, expected={expected:?}, new={new:?}", + expected = Arc::as_ptr(&expected), + new = Arc::as_ptr(layer), + ); + *layer = new; + Ok(()) + } else { + anyhow::bail!("layer was not found"); + } + } +} + +/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. +/// Can be removed after all refactors are done. 
+fn drop_rlock(rlock: tokio::sync::OwnedRwLockReadGuard) { + drop(rlock) +} + +/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. +/// Can be removed after all refactors are done. +fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { + drop(rlock) +} + pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -129,7 +217,24 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: Arc>>, + /// The tuple has two elements. + /// 1. `LayerFileManager` keeps track of the various physical representations of the layer files (inmem, local, remote). + /// 2. `LayerMap`, the acceleration data structure for `get_reconstruct_data`. + /// + /// `LayerMap` maps out the `(PAGE,LSN) / (KEY,LSN)` space, which is composed of `(KeyRange, LsnRange)` rectangles. + /// We describe these rectangles through the `PersistentLayerDesc` struct. + /// + /// When we want to reconstruct a page, we first find the `PersistentLayerDesc`'s that we need for page reconstruction, + /// using `LayerMap`. Then, we use `LayerFileManager` to get the `PersistentLayer`'s that correspond to the + /// `PersistentLayerDesc`'s. + /// + /// Hence, it's important to keep things coherent. The `LayerFileManager` must always have an entry for all + /// `PersistentLayerDesc`'s in the `LayerMap`. If it doesn't, `LayerFileManager::get_from_desc` will panic at + /// runtime, e.g., during page reconstruction. + /// + /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`, + /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`. + pub(crate) layers: Arc>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -599,7 +704,8 @@ impl Timeline { /// This method makes no distinction between local and remote layers. 
/// Hence, the result **does not represent local filesystem usage**. pub async fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().await; + let guard = self.layers.read().await; + let (layer_map, _) = &*guard; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -909,7 +1015,8 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1040,7 +1147,8 @@ impl Timeline { } pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().await; + let guard = self.layers.read().await; + let (layer_map, mapping) = &*guard; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1051,6 +1159,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { + let historic_layer = mapping.get_from_desc(&historic_layer); historic_layers.push(historic_layer.info(reset)); } @@ -1160,7 +1269,8 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layer_map, mapping) = &mut *guard; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1169,14 +1279,19 @@ impl Timeline { let res = if cancel.is_cancelled() { None } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) + Some(self.evict_layer_batch_impl( + &layer_removal_guard, + l, + &mut batch_updates, + mapping, + )) }; results.push(res); } // commit the updates & release locks 
batch_updates.flush(); - drop(layer_map); + drop_wlock(guard); drop(layer_removal_guard); assert_eq!(results.len(), layers_to_evict.len()); @@ -1187,10 +1302,9 @@ impl Timeline { &self, _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, local_layer: &Arc, - batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + batch_updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerFileManager, ) -> anyhow::Result { - use super::layer_map::Replacement; - if local_layer.is_remote_layer() { // TODO(issue #3851): consider returning an err here instead of false, // which is the same out the match later @@ -1238,13 +1352,10 @@ impl Timeline { ), }); - let replaced = match batch_updates.replace_historic( - local_layer.layer_desc().clone(), - local_layer, - new_remote_layer.layer_desc().clone(), - new_remote_layer, - )? { - Replacement::Replaced { .. } => { + assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); + + let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) { + Ok(()) => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); } @@ -1277,24 +1388,17 @@ impl Timeline { true } - Replacement::NotFound => { - debug!(evicted=?local_layer, "layer was no longer in layer map"); - false - } - Replacement::RemovalBuffered => { - unreachable!("not doing anything else in this batch") - } - Replacement::Unexpected(other) => { - error!( - local_layer.ptr=?Arc::as_ptr(local_layer), - other.ptr=?Arc::as_ptr(&other), - ?other, - "failed to replace"); + Err(err) => { + if cfg!(debug_assertions) { + panic!("failed to replace: {err}, evicted: {local_layer:?}"); + } else { + error!(evicted=?local_layer, "failed to replace: {err}"); + } false } }; - Ok(replaced) + Ok(succeed) } } @@ -1421,7 +1525,10 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())), + layers: 
Arc::new(tokio::sync::RwLock::new(( + LayerMap::default(), + LayerFileManager::new(), + ))), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1606,14 +1713,15 @@ impl Timeline { let mut layers = self.layers.try_write().expect( "in the context where we call this function, no other task has access to the object", ); - layers.next_open_layer_at = Some(Lsn(start_lsn.0)); + layers.0.next_open_layer_at = Some(Lsn(start_lsn.0)); } /// /// Scan the timeline directory to populate the layer map. /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1656,7 +1764,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); + self.insert_historic_layer(Arc::new(layer), &mut updates, mapping); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1688,7 +1796,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); + self.insert_historic_layer(Arc::new(layer), &mut updates, mapping); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1742,7 +1850,8 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. 
- let mut layer_map = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layer_map, mapping) = &mut *guard; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1787,7 +1896,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer.layer_desc().clone(), local_layer); + self.remove_historic_layer(local_layer, &mut updates, mapping); // fall-through to adding the remote layer } } else { @@ -1826,7 +1935,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); + self.insert_historic_layer(remote_layer, &mut updates, mapping); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1853,7 +1962,7 @@ impl Timeline { ), ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); + self.insert_historic_layer(remote_layer, &mut updates, mapping); } } } @@ -1892,13 +2001,14 @@ impl Timeline { let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); - let local_layers = self - .layers - .read() - .await - .iter_historic_layers() - .map(|l| (l.filename(), l)) - .collect::>(); + let local_layers = { + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; + layers + .iter_historic_layers() + .map(|l| (l.filename(), mapping.get_from_desc(&l))) + .collect::>() + }; // If no writes happen, new branches do not have any layers, only the metadata file. 
let has_local_layers = !local_layers.is_empty(); @@ -2269,25 +2379,53 @@ impl Timeline { } async fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().await.iter_historic_layers() { + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; + for historic_layer in layers.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { - return Some(historic_layer); + return Some(mapping.get_from_desc(&historic_layer)); } } None } + /// Helper function to insert a layer into both layer map and layer file manager. Will be removed in the future + /// after we introduce `LayerMapManager`. + fn insert_historic_layer( + &self, + layer: Arc, + updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerFileManager, + ) { + updates.insert_historic(layer.layer_desc().clone()); + mapping.insert(layer); + } + + /// Helper function to remove a layer from both layer map and layer file manager. Will be removed in the future + /// after we introduce `LayerMapManager`. + fn remove_historic_layer( + &self, + layer: Arc, + updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerFileManager, + ) { + updates.remove_historic(layer.layer_desc().clone()); + mapping.remove(layer); + } + /// Removes the layer from local FS (if present) and from memory. /// Remote storage is not affected by this operation. 
fn delete_historic_layer( &self, // we cannot remove layers otherwise, since gc and compaction will race _layer_removal_cs: Arc>, - layer: Arc, - updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + layer: Arc, + updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerFileManager, ) -> anyhow::Result<()> { + let layer = mapping.get_from_desc(&layer); if !layer.is_remote_layer() { layer.delete_resident_layer_file()?; let layer_file_size = layer.file_size(); @@ -2301,7 +2439,8 @@ impl Timeline { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer.layer_desc().clone(), layer); + updates.remove_historic(layer.layer_desc().clone()); + mapping.remove(layer); Ok(()) } @@ -2486,7 +2625,8 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().await; + let guard = timeline.layers.read().await; + let (layers, mapping) = &*guard; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2548,6 +2688,7 @@ impl Timeline { } if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { + let layer = mapping.get_from_desc(&layer); // If it's a remote layer, download it and retry. if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) @@ -2669,7 +2810,8 @@ impl Timeline { /// Get a handle to the latest layer for appending. 
/// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, _) = &mut *guard; ensure!(lsn.is_aligned()); @@ -2746,7 +2888,8 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, _) = &mut *guard; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2760,7 +2903,7 @@ impl Timeline { layers.next_open_layer_at = Some(end_lsn); self.last_freeze_at.store(end_lsn); } - drop(layers); + drop_wlock(guard); } /// Layer flusher task's main loop. @@ -2784,7 +2927,8 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2907,15 +3051,17 @@ impl Timeline { fail_point!("flush-frozen-before-sync"); // The new on-disk layers are now in the layer map. We can remove the - // in-memory layer from the map now. + // in-memory layer from the map now. We do not modify `LayerFileManager` because + // it only contains persistent layers. The flushed layer is stored in + // the mapping in `create_delta_layer`. { let mut layers = self.layers.write().await; - let l = layers.frozen_layers.pop_front(); + let l = layers.0.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this // timeline). If two threads tried to flush the same frozen // layer to disk at the same time, that would not work. 
- assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer)); + assert!(compare_arced_layers(&l.unwrap(), &frozen_layer)); // release lock on 'layers' } @@ -3048,14 +3194,15 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - batch_updates.insert_historic(l.layer_desc().clone(), l); + self.insert_historic_layer(l, &mut batch_updates, mapping); batch_updates.flush(); // update metrics @@ -3104,7 +3251,8 @@ impl Timeline { ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; let mut max_deltas = 0; { @@ -3282,7 +3430,8 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3304,10 +3453,10 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(l.layer_desc().clone(), l); + self.insert_historic_layer(l, &mut updates, mapping); } updates.flush(); - drop(layers); + drop_wlock(guard); timer.stop_and_record(); Ok(layer_paths_to_upload) @@ -3317,7 +3466,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { new_layers: Vec, - deltas_to_compact: Vec>, + deltas_to_compact: Vec>, } /// Top-level failure to compact. 
@@ -3468,14 +3617,19 @@ impl Timeline { fn compact_level0_phase1( self: Arc, _layer_removal_cs: Arc>, - layers: tokio::sync::OwnedRwLockReadGuard>, + guard: tokio::sync::OwnedRwLockReadGuard<(LayerMap, LayerFileManager)>, mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, ctx: &RequestContext, ) -> Result { stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_acquisition_micros.till_now(); // set by caller - let mut level0_deltas = layers.get_level0_deltas()?; + let (layers, mapping) = &*guard; + let level0_deltas = layers.get_level0_deltas()?; + let mut level0_deltas = level0_deltas + .into_iter() + .map(|x| mapping.get_from_desc(&x)) + .collect_vec(); stats.level0_deltas_count = Some(level0_deltas.len()); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3595,7 +3749,7 @@ impl Timeline { } stats.read_lock_held_compute_holes_micros = stats.read_lock_held_prerequisites_micros.till_now(); - drop(layers); + drop_rlock(guard); stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); @@ -3822,7 +3976,10 @@ impl Timeline { Ok(CompactLevel0Phase1Result { new_layers, - deltas_to_compact, + deltas_to_compact: deltas_to_compact + .into_iter() + .map(|x| Arc::new(x.layer_desc().clone())) + .collect(), }) } @@ -3886,7 +4043,8 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3918,7 +4076,7 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(x.layer_desc().clone(), x); + self.insert_historic_layer(x, &mut updates, mapping); } // 
Now that we have reshuffled the data to set of new delta layers, we can @@ -3926,10 +4084,13 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; + // NB: the layer file identified by descriptor `l` is guaranteed to be present + // in the LayerFileManager because we kept holding `layer_removal_cs` the entire + // time, even though we dropped `Timeline::layers` in between. + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?; } updates.flush(); - drop(layers); + drop_wlock(guard); // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { @@ -4146,7 +4307,8 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4225,7 +4387,7 @@ impl Timeline { // delta layers. Image layers can form "stairs" preventing old image from been deleted. // But image layers are in any case less sparse than delta layers. Also we need some // protection from replacing recent image layers with new one after each GC iteration. - if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) { + if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) { wanted_image_layers.add_range(l.get_key_range()); } result.layers_not_updated += 1; @@ -4262,6 +4424,7 @@ impl Timeline { layer_removal_cs.clone(), doomed_layer, &mut updates, + mapping, )?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } @@ -4446,42 +4609,15 @@ impl Timeline { // Download complete. 
Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().await; - let mut updates = layers.batch_update(); - let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); + let mut guard = self_clone.layers.write().await; + let (layers, mapping) = &mut *guard; + let updates = layers.batch_update(); + let new_layer = + remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { - use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); - let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) { - Ok(Replacement::Replaced { .. }) => false, - Ok(Replacement::NotFound) => { - // TODO: the downloaded file should probably be removed, otherwise - // it will be added to the layermap on next load? we should - // probably restart any get_reconstruct_data search as well. - // - // See: https://github.com/neondatabase/neon/issues/3533 - error!("replacing downloaded layer into layermap failed because layer was not found"); - true - } - Ok(Replacement::RemovalBuffered) => { - unreachable!("current implementation does not remove anything") - } - Ok(Replacement::Unexpected(other)) => { - // if the other layer would have the same pointer value as - // expected, it means they differ only on vtables. - // - // otherwise there's no known reason for this to happen as - // compacted layers should have different covering rectangle - // leading to produce Replacement::NotFound. 
- - error!( - expected.ptr = ?Arc::as_ptr(&l), - other.ptr = ?Arc::as_ptr(&other), - ?other, - "replacing downloaded layer into layermap failed because another layer was found instead of expected" - ); - true - } + let failure = match mapping.replace_and_verify(l, new_layer) { + Ok(()) => false, Err(e) => { // this is a precondition failure, the layer filename derived // attributes didn't match up, which doesn't seem likely. @@ -4509,7 +4645,7 @@ impl Timeline { } } updates.flush(); - drop(layers); + drop_wlock(guard); info!("on-demand download successful"); @@ -4520,7 +4656,10 @@ impl Timeline { remote_layer.ongoing_download.close(); } else { // Keep semaphore open. We'll drop the permit at the end of the function. - error!("layer file download failed: {:?}", result.as_ref().unwrap_err()); + error!( + "layer file download failed: {:?}", + result.as_ref().unwrap_err() + ); } // Don't treat it as an error if the task that triggered the download @@ -4534,7 +4673,8 @@ impl Timeline { drop(permit); Ok(()) - }.in_current_span(), + } + .in_current_span(), ); receiver.await.context("download task cancelled")? 
@@ -4604,9 +4744,11 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; layers .iter_historic_layers() + .map(|l| mapping.get_from_desc(&l)) .filter_map(|l| l.downcast_remote_layer()) .map(|l| self.download_remote_layer(l)) .for_each(|dl| downloads.push(dl)) @@ -4707,7 +4849,8 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4716,6 +4859,8 @@ impl Timeline { let file_size = l.file_size(); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); + let l = mapping.get_from_desc(&l); + if l.is_remote_layer() { continue; } @@ -4874,3 +5019,31 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() { ), } } + +/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. +/// +/// Returns `true` if the two `Arc` point to the same layer, false otherwise. +/// +/// If comparing persistent layers, ALWAYS compare the layer descriptor key. +#[inline(always)] +pub fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { + // "dyn Trait" objects are "fat pointers" in that they have two components: + // - pointer to the object + // - pointer to the vtable + // + // rust does not provide a guarantee that these vtables are unique; however, + // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the + // pointer and the vtable need to be equal. 
+ // + // See: https://github.com/rust-lang/rust/issues/103763 + // + // A future version of rust will most likely use this form below, where we cast each + // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it + // not affect the comparison. + // + // See: https://github.com/rust-lang/rust/pull/106450 + let left = Arc::as_ptr(left) as *const (); + let right = Arc::as_ptr(right) as *const (); + + left == right +} diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 80c5210211..03cf2d89ad 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,9 +197,11 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { + let hist_layer = mapping.get_from_desc(&hist_layer); if hist_layer.is_remote_layer() { continue; } diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py index c26ec76172..a30f0f02e1 100644 --- a/test_runner/regress/test_ondemand_download.py +++ b/test_runner/regress/test_ondemand_download.py @@ -713,9 +713,7 @@ def test_ondemand_download_failure_to_replace( # error message is not useful pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2) - actual_message = ( - ".* ERROR .*replacing downloaded layer into layermap failed because layer was not found" - ) + actual_message = ".* ERROR .*layermap-replace-notfound" assert env.pageserver.log_contains(actual_message) is not None env.pageserver.allowed_errors.append(actual_message)