From faee3152f355eed75eaff9f57f234aabcdb2e8bf Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Tue, 6 Jun 2023 17:20:15 -0400 Subject: [PATCH] refactor: use LayerDesc in LayerMap (part 2) Signed-off-by: Alex Chi --- pageserver/benches/bench_layer_map.rs | 31 +- pageserver/src/tenant.rs | 1 + pageserver/src/tenant/layer_map.rs | 371 +++++------------- .../layer_map/historic_layer_coverage.rs | 19 + pageserver/src/tenant/storage_layer.rs | 215 +++++----- .../src/tenant/storage_layer/remote_layer.rs | 9 +- pageserver/src/tenant/timeline.rs | 336 ++++++++++------ .../src/tenant/timeline/eviction_task.rs | 4 +- 8 files changed, 486 insertions(+), 500 deletions(-) diff --git a/pageserver/benches/bench_layer_map.rs b/pageserver/benches/bench_layer_map.rs index 45dc9fad4a..03bb7a5bfd 100644 --- a/pageserver/benches/bench_layer_map.rs +++ b/pageserver/benches/bench_layer_map.rs @@ -1,22 +1,23 @@ use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::repository::Key; use pageserver::tenant::layer_map::LayerMap; -use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; +use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName}; +use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc}; use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use std::cmp::{max, min}; use std::fs::File; use std::io::{BufRead, BufReader}; use std::path::PathBuf; use std::str::FromStr; -use std::sync::Arc; use std::time::Instant; +use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -fn build_layer_map(filename_dump: PathBuf) -> LayerMap { - let mut layer_map = LayerMap::::default(); +fn build_layer_map(filename_dump: PathBuf) -> LayerMap { + let mut layer_map = LayerMap::default(); let mut min_lsn = Lsn(u64::MAX); let mut max_lsn = Lsn(0); @@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { min_lsn = min(min_lsn, lsn_range.start); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); - updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone()); } println!("min: {min_lsn}, max: {max_lsn}"); @@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap { } /// Construct a layer map query pattern for benchmarks -fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { +fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> { // For each image layer we query one of the pages contained, at LSN right // before the image layer was created. This gives us a somewhat uniform // coverage of both the lsn and key space because image layers have @@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn // Construct a partitioning for testing get_difficulty map when we // don't have an exact result of `collect_keyspace` to work with. -fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { +fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning { let mut parts = Vec::new(); // We add a partition boundary at the start of each image layer, @@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) { for i in 0..100_000 { let i32 = (i as u32) % 100; let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); - let layer = LayerDescriptor { - key: zero.add(10 * i32)..zero.add(10 * i32 + 1), - lsn: Lsn(i)..Lsn(i + 1), - is_incremental: false, - short_id: format!("Layer {}", i), - }; - updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); + let layer = LayerDescriptor::from(PersistentLayerDesc::new_img( + TenantId::generate(), + TimelineId::generate(), + zero.add(10 * i32)..zero.add(10 * i32 + 1), + Lsn(i), + false, + 0, + )); + updates.insert_historic(layer.layer_desc().clone()); } updates.flush(); println!("Finished layer map init in {:?}", now.elapsed()); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 5603bcef84..8effb1a238 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -562,6 +562,7 @@ impl Tenant { .layers .read() .await + .0 .iter_historic_layers() .next() .is_some(), diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index ca1a71b623..893e4d52a3 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning; use crate::repository::Key; use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::Layer; -use anyhow::Context; use anyhow::Result; -use std::collections::HashMap; use std::collections::VecDeque; use std::ops::Range; use std::sync::Arc; use utils::lsn::Lsn; use historic_layer_coverage::BufferedHistoricLayerCoverage; -pub use historic_layer_coverage::Replacement; +pub use historic_layer_coverage::{LayerKey, Replacement}; use super::storage_layer::range_eq; use super::storage_layer::PersistentLayerDesc; -use super::storage_layer::PersistentLayerKey; /// /// LayerMap tracks what layers exist on a timeline. /// -pub struct LayerMap { +#[derive(Default)] +pub struct LayerMap { // // 'open_layer' holds the current InMemoryLayer that is accepting new // records. If it is None, 'next_open_layer_at' will be set instead, indicating @@ -95,24 +93,6 @@ pub struct LayerMap { /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. l0_delta_layers: Vec>, - - /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and - /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and - /// RemoteLayer will be removed. - mapping: HashMap>, -} - -impl Default for LayerMap { - fn default() -> Self { - Self { - open_layer: None, - next_open_layer_at: None, - frozen_layers: VecDeque::default(), - l0_delta_layers: Vec::default(), - historic: BufferedHistoricLayerCoverage::default(), - mapping: HashMap::default(), - } - } } /// The primary update API for the layer map. @@ -120,24 +100,21 @@ impl Default for LayerMap { /// Batching historic layer insertions and removals is good for /// performance and this struct helps us do that correctly. #[must_use] -pub struct BatchedUpdates<'a, L: ?Sized + Layer> { +pub struct BatchedUpdates<'a> { // While we hold this exclusive reference to the layer map the type checker // will prevent us from accidentally reading any unflushed updates. - layer_map: &'a mut LayerMap, + layer_map: &'a mut LayerMap, } /// Provide ability to batch more updates while hiding the read /// API so we don't accidentally read without flushing. -impl BatchedUpdates<'_, L> -where - L: ?Sized + Layer, -{ +impl BatchedUpdates<'_> { /// /// Insert an on-disk layer. /// // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` - pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { - self.layer_map.insert_historic_noflush(layer_desc, layer) + pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) { + self.layer_map.insert_historic_noflush(layer_desc) } /// @@ -145,31 +122,8 @@ where /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { - self.layer_map.remove_historic_noflush(layer_desc, layer) - } - - /// Replaces existing layer iff it is the `expected`. - /// - /// If the expected layer has been removed it will not be inserted by this function. - /// - /// Returned `Replacement` describes succeeding in replacement or the reason why it could not - /// be done. - /// - /// TODO replacement can be done without buffering and rebuilding layer map updates. - /// One way to do that is to add a layer of indirection for returned values, so - /// that we can replace values only by updating a hashmap. - pub fn replace_historic( - &mut self, - expected_desc: PersistentLayerDesc, - expected: &Arc, - new_desc: PersistentLayerDesc, - new: Arc, - ) -> anyhow::Result>> { - fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound)); - - self.layer_map - .replace_historic_noflush(expected_desc, expected, new_desc, new) + pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) { + self.layer_map.remove_historic_noflush(layer_desc) } // We will flush on drop anyway, but this method makes it @@ -185,25 +139,19 @@ where // than panic later or read without flushing. // // TODO maybe warn if flush hasn't explicitly been called -impl Drop for BatchedUpdates<'_, L> -where - L: ?Sized + Layer, -{ +impl Drop for BatchedUpdates<'_> { fn drop(&mut self) { self.layer_map.flush_updates(); } } /// Return value of LayerMap::search -pub struct SearchResult { - pub layer: Arc, +pub struct SearchResult { + pub layer: Arc, pub lsn_floor: Lsn, } -impl LayerMap -where - L: ?Sized + Layer, -{ +impl LayerMap { /// /// Find the latest layer (by lsn.end) that covers the given /// 'key', with lsn.start < 'end_lsn'. @@ -235,7 +183,7 @@ where /// NOTE: This only searches the 'historic' layers, *not* the /// 'open' and 'frozen' layers! /// - pub fn search(&self, key: Key, end_lsn: Lsn) -> Option> { + pub fn search(&self, key: Key, end_lsn: Lsn) -> Option { let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128()); @@ -244,7 +192,6 @@ where (None, None) => None, (None, Some(image)) => { let lsn_floor = image.get_lsn_range().start; - let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor, @@ -252,7 +199,6 @@ where } (Some(delta), None) => { let lsn_floor = delta.get_lsn_range().start; - let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -263,7 +209,6 @@ where let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_exact_match = img_lsn + 1 == end_lsn; if image_is_newer || image_exact_match { - let image = self.get_layer_from_mapping(&image.key()).clone(); Some(SearchResult { layer: image, lsn_floor: img_lsn, @@ -271,7 +216,6 @@ where } else { let lsn_floor = std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); - let delta = self.get_layer_from_mapping(&delta.key()).clone(); Some(SearchResult { layer: delta, lsn_floor, @@ -282,7 +226,7 @@ where } /// Start a batch of updates, applied on drop - pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> { + pub fn batch_update(&mut self) -> BatchedUpdates<'_> { BatchedUpdates { layer_map: self } } @@ -292,48 +236,32 @@ where /// Helper function for BatchedUpdates::insert_historic /// /// TODO(chi): remove L generic so that we do not need to pass layer object. - pub(self) fn insert_historic_noflush( - &mut self, - layer_desc: PersistentLayerDesc, - layer: Arc, - ) { - self.mapping.insert(layer_desc.key(), layer.clone()); - + pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) { // TODO: See #3869, resulting #4088, attempted fix and repro #4094 - if Self::is_l0(&layer) { + if Self::is_l0(&layer_desc) { self.l0_delta_layers.push(layer_desc.clone().into()); } self.historic.insert( - historic_layer_coverage::LayerKey::from(&*layer), + historic_layer_coverage::LayerKey::from(&layer_desc), layer_desc.into(), ); } - fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc { - let layer = self - .mapping - .get(key) - .with_context(|| format!("{key:?}")) - .expect("inconsistent layer mapping"); - layer - } - /// /// Remove an on-disk layer from the map. /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc) { + pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) { self.historic - .remove(historic_layer_coverage::LayerKey::from(&*layer)); - if Self::is_l0(&layer) { + .remove(historic_layer_coverage::LayerKey::from(&layer_desc)); + let layer_key = layer_desc.key(); + if Self::is_l0(&layer_desc) { let len_before = self.l0_delta_layers.len(); let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); - l0_delta_layers.retain(|other| { - !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer) - }); + l0_delta_layers.retain(|other| other.key() != layer_key); self.l0_delta_layers = l0_delta_layers; // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // there's a chance that the comparison fails at runtime due to it comparing (pointer, @@ -344,69 +272,6 @@ where "failed to locate removed historic layer from l0_delta_layers" ); } - self.mapping.remove(&layer_desc.key()); - } - - pub(self) fn replace_historic_noflush( - &mut self, - expected_desc: PersistentLayerDesc, - expected: &Arc, - new_desc: PersistentLayerDesc, - new: Arc, - ) -> anyhow::Result>> { - let key = historic_layer_coverage::LayerKey::from(&**expected); - let other = historic_layer_coverage::LayerKey::from(&*new); - - let expected_l0 = Self::is_l0(expected); - let new_l0 = Self::is_l0(&new); - - anyhow::ensure!( - key == other, - "expected and new must have equal LayerKeys: {key:?} != {other:?}" - ); - - anyhow::ensure!( - expected_l0 == new_l0, - "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}" - ); - - let l0_index = if expected_l0 { - // find the index in case replace worked, we need to replace that as well - let pos = self.l0_delta_layers.iter().position(|slot| { - Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected) - }); - - if pos.is_none() { - return Ok(Replacement::NotFound); - } - pos - } else { - None - }; - - let new_desc = Arc::new(new_desc); - let replaced = self.historic.replace(&key, new_desc.clone(), |existing| { - **existing == expected_desc - }); - - if let Replacement::Replaced { .. } = &replaced { - self.mapping.remove(&expected_desc.key()); - self.mapping.insert(new_desc.key(), new); - if let Some(index) = l0_index { - self.l0_delta_layers[index] = new_desc; - } - } - - let replaced = match replaced { - Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered }, - Replacement::NotFound => Replacement::NotFound, - Replacement::RemovalBuffered => Replacement::RemovalBuffered, - Replacement::Unexpected(x) => { - Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone()) - } - }; - - Ok(replaced) } /// Helper function for BatchedUpdates::drop. @@ -454,10 +319,8 @@ where Ok(true) } - pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { - self.historic - .iter() - .map(|x| self.get_layer_from_mapping(&x.key()).clone()) + pub fn iter_historic_layers(&self) -> impl '_ + Iterator> { + self.historic.iter() } /// @@ -472,7 +335,7 @@ where &self, key_range: &Range, lsn: Lsn, - ) -> Result, Option>)>> { + ) -> Result, Option>)>> { let version = match self.historic.get().unwrap().get_version(lsn.0) { Some(v) => v, None => return Ok(vec![]), @@ -482,36 +345,26 @@ where let end = key_range.end.to_i128(); // Initialize loop variables - let mut coverage: Vec<(Range, Option>)> = vec![]; + let mut coverage: Vec<(Range, Option>)> = vec![]; let mut current_key = start; let mut current_val = version.image_coverage.query(start); // Loop through the change events and push intervals for (change_key, change_val) in version.image_coverage.range(start..end) { let kr = Key::from_i128(current_key)..Key::from_i128(change_key); - coverage.push(( - kr, - current_val - .take() - .map(|l| self.get_layer_from_mapping(&l.key()).clone()), - )); + coverage.push((kr, current_val.take())); current_key = change_key; current_val = change_val.clone(); } // Add the final interval let kr = Key::from_i128(current_key)..Key::from_i128(end); - coverage.push(( - kr, - current_val - .take() - .map(|l| self.get_layer_from_mapping(&l.key()).clone()), - )); + coverage.push((kr, current_val.take())); Ok(coverage) } - pub fn is_l0(layer: &L) -> bool { + pub fn is_l0(layer: &PersistentLayerDesc) -> bool { range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX)) } @@ -537,7 +390,7 @@ where /// TODO The optimal number should probably be slightly higher than 1, but to /// implement that we need to plumb a lot more context into this function /// than just the current partition_range. - pub fn is_reimage_worthy(layer: &L, partition_range: &Range) -> bool { + pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range) -> bool { // Case 1 if !Self::is_l0(layer) { return true; @@ -595,9 +448,7 @@ where let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = - Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) - as usize; + let base_count = Self::is_reimage_worthy(&val, key) as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; @@ -620,9 +471,7 @@ where let lr = lsn.start..val.get_lsn_range().start; if !kr.is_empty() { - let base_count = - Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key) - as usize; + let base_count = Self::is_reimage_worthy(&val, key) as usize; let new_limit = limit.map(|l| l - base_count); let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; max_stacked_deltas = std::cmp::max( @@ -772,12 +621,8 @@ where } /// Return all L0 delta layers - pub fn get_level0_deltas(&self) -> Result>> { - Ok(self - .l0_delta_layers - .iter() - .map(|x| self.get_layer_from_mapping(&x.key()).clone()) - .collect()) + pub fn get_level0_deltas(&self) -> Result>> { + Ok(self.l0_delta_layers.to_vec()) } /// debugging function to print out the contents of the layer map @@ -802,72 +647,79 @@ where println!("End dump LayerMap"); Ok(()) } +} - /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. - /// - /// Returns `true` if the two `Arc` point to the same layer, false otherwise. - #[inline(always)] - pub fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { - // "dyn Trait" objects are "fat pointers" in that they have two components: - // - pointer to the object - // - pointer to the vtable - // - // rust does not provide a guarantee that these vtables are unique, but however - // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the - // pointer and the vtable need to be equal. - // - // See: https://github.com/rust-lang/rust/issues/103763 - // - // A future version of rust will most likely use this form below, where we cast each - // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it - // not affect the comparison. - // - // See: https://github.com/rust-lang/rust/pull/106450 - let left = Arc::as_ptr(left) as *const (); - let right = Arc::as_ptr(right) as *const (); +/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. +/// +/// Returns `true` if the two `Arc` point to the same layer, false otherwise. +/// +/// If comparing persistent layers, ALWAYS compare the layer descriptor key. +#[inline(always)] +pub fn compare_arced_layers(left: &Arc, right: &Arc) -> bool { + // "dyn Trait" objects are "fat pointers" in that they have two components: + // - pointer to the object + // - pointer to the vtable + // + // rust does not provide a guarantee that these vtables are unique, but however + // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the + // pointer and the vtable need to be equal. + // + // See: https://github.com/rust-lang/rust/issues/103763 + // + // A future version of rust will most likely use this form below, where we cast each + // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it + // not affect the comparison. + // + // See: https://github.com/rust-lang/rust/pull/106450 + let left = Arc::as_ptr(left) as *const (); + let right = Arc::as_ptr(right) as *const (); - left == right - } + left == right } #[cfg(test)] mod tests { - use super::{LayerMap, Replacement}; - use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; + use super::LayerMap; + use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName}; use std::str::FromStr; use std::sync::Arc; mod l0_delta_layers_updated { + use crate::tenant::{ + storage_layer::{PersistentLayer, PersistentLayerDesc}, + timeline::LayerMapping, + }; + use super::*; #[test] fn for_full_range_delta() { // l0_delta_layers are used by compaction, and should observe all buffered updates l0_delta_layers_updated_scenario( - "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", - true - ) + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", + true + ) } #[test] fn for_non_full_range_delta() { // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta l0_delta_layers_updated_scenario( - "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", - // because not full range - false - ) + "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", + // because not full range + false + ) } #[test] fn for_image() { l0_delta_layers_updated_scenario( - "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", - // code only checks if it is a full range layer, doesn't care about images, which must - // mean we should in practice never have full range images - false - ) + "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", + // code only checks if it is a full range layer, doesn't care about images, which must + // mean we should in practice never have full range images + false + ) } #[test] @@ -883,16 +735,16 @@ mod tests { let not_found = Arc::new(layer.clone()); let new_version = Arc::new(layer); - let mut map = LayerMap::default(); + // after the immutable storage state refactor, the replace operation + // will not use layer map any more. We keep it here for consistency in test cases + // and can remove it in the future. + let _map = LayerMap::default(); - let res = map.batch_update().replace_historic( - not_found.get_persistent_layer_desc(), - ¬_found, - new_version.get_persistent_layer_desc(), - new_version, - ); + let mut mapping = LayerMapping::new(); - assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}"); + mapping + .replace_and_verify(not_found, new_version) + .unwrap_err(); } fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { @@ -903,49 +755,44 @@ mod tests { let downloaded = Arc::new(skeleton); let mut map = LayerMap::default(); + let mut mapping = LayerMapping::new(); // two disjoint Arcs in different lifecycle phases. even if it seems they must be the // same layer, we use LayerMap::compare_arced_layers as the identity of layers. - assert!(!LayerMap::compare_arced_layers(&remote, &downloaded)); + assert_eq!(remote.layer_desc(), downloaded.layer_desc()); let expected_in_counts = (1, usize::from(expected_l0)); map.batch_update() - .insert_historic(remote.get_persistent_layer_desc(), remote.clone()); - assert_eq!(count_layer_in(&map, &remote), expected_in_counts); - - let replaced = map - .batch_update() - .replace_historic( - remote.get_persistent_layer_desc(), - &remote, - downloaded.get_persistent_layer_desc(), - downloaded.clone(), - ) - .expect("name derived attributes are the same"); - assert!( - matches!(replaced, Replacement::Replaced { .. }), - "{replaced:?}" + .insert_historic(remote.layer_desc().clone()); + mapping.insert(remote.clone()); + assert_eq!( + count_layer_in(&map, remote.layer_desc()), + expected_in_counts + ); + + mapping + .replace_and_verify(remote, downloaded.clone()) + .expect("name derived attributes are the same"); + assert_eq!( + count_layer_in(&map, downloaded.layer_desc()), + expected_in_counts ); - assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts); map.batch_update() - .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); - assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); + .remove_historic(downloaded.layer_desc().clone()); + assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0)); } - fn count_layer_in(map: &LayerMap, layer: &Arc) -> (usize, usize) { + fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) { let historic = map .iter_historic_layers() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) + .filter(|x| x.key() == layer.key()) .count(); let l0s = map .get_level0_deltas() .expect("why does this return a result"); - let l0 = l0s - .iter() - .filter(|x| LayerMap::compare_arced_layers(x, layer)) - .count(); + let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count(); (historic, l0) } diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index 49dcbc63c2..ad463b4f23 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -3,6 +3,8 @@ use std::ops::Range; use tracing::info; +use crate::tenant::storage_layer::PersistentLayerDesc; + use super::layer_coverage::LayerCoverageTuple; /// Layers in this module are identified and indexed by this data. @@ -53,6 +55,18 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK } } +impl From<&PersistentLayerDesc> for LayerKey { + fn from(layer: &PersistentLayerDesc) -> Self { + let kr = layer.get_key_range(); + let lr = layer.get_lsn_range(); + LayerKey { + key: kr.start.to_i128()..kr.end.to_i128(), + lsn: lr.start.0..lr.end.0, + is_image: !layer.is_incremental(), + } + } +} + /// Efficiently queryable layer coverage for each LSN. /// /// Allows answering layer map queries very efficiently, @@ -467,6 +481,11 @@ impl BufferedHistoricLayerCoverage { /// /// Returns a `Replacement` value describing the outcome; only the case of /// `Replacement::Replaced` modifies the map and requires a rebuild. + /// + /// This function is unlikely to be used in the future because LayerMap now only records the + /// layer descriptors. Therefore, anything added to the layer map will only be removed or + /// added, and never replaced. + #[allow(dead_code)] pub fn replace( &mut self, layer_key: &LayerKey, diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0af3d4ce39..7bc513b3a1 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -176,13 +176,10 @@ impl LayerAccessStats { /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. - pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + pub(crate) fn for_loading_layer( + layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, - ) -> Self - where - L: ?Sized + Layer, - { + ) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); new.record_residence_event( layer_map_lock_held_witness, @@ -197,14 +194,11 @@ impl LayerAccessStats { /// The `new_status` is not recorded in `self`. /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. - pub(crate) fn clone_for_residence_change( + pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + layer_map_lock_held_witness: &BatchedUpdates<'_>, new_status: LayerResidenceStatus, - ) -> LayerAccessStats - where - L: ?Sized + Layer, - { + ) -> LayerAccessStats { let clone = { let inner = self.0.lock().unwrap(); inner.clone() @@ -232,14 +226,12 @@ impl LayerAccessStats { /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. /// - pub(crate) fn record_residence_event( + pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + _layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, reason: LayerResidenceEventReason, - ) where - L: ?Sized + Layer, - { + ) { let mut locked = self.0.lock().unwrap(); locked.iter_mut().for_each(|inner| { inner @@ -473,94 +465,125 @@ pub fn downcast_remote_layer( } } -/// Holds metadata about a layer without any content. Used mostly for testing. -/// -/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a -/// LayerDescriptor. -#[derive(Clone, Debug)] -pub struct LayerDescriptor { - pub key: Range, - pub lsn: Range, - pub is_incremental: bool, - pub short_id: String, -} +pub mod tests { + use super::*; -impl LayerDescriptor { - /// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, - /// and the tenant / timeline id does not matter. - pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { - PersistentLayerDesc::new_delta( - TenantId::from_array([0; 16]), - TimelineId::from_array([0; 16]), - self.key.clone(), - self.lsn.clone(), - 233, - ) - } -} - -impl Layer for LayerDescriptor { - fn get_key_range(&self) -> Range { - self.key.clone() + /// Holds metadata about a layer without any content. Used mostly for testing. + /// + /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a + /// LayerDescriptor. + #[derive(Clone, Debug)] + pub struct LayerDescriptor { + base: PersistentLayerDesc, } - fn get_lsn_range(&self) -> Range { - self.lsn.clone() - } - - fn is_incremental(&self) -> bool { - self.is_incremental - } - - fn get_value_reconstruct_data( - &self, - _key: Key, - _lsn_range: Range, - _reconstruct_data: &mut ValueReconstructState, - _ctx: &RequestContext, - ) -> Result { - todo!("This method shouldn't be part of the Layer trait") - } - - fn short_id(&self) -> String { - self.short_id.clone() - } - - fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { - todo!() - } -} - -impl From for LayerDescriptor { - fn from(value: DeltaFileName) -> Self { - let short_id = value.to_string(); - LayerDescriptor { - key: value.key_range, - lsn: value.lsn_range, - is_incremental: true, - short_id, + impl From for LayerDescriptor { + fn from(base: PersistentLayerDesc) -> Self { + Self { base } } } -} -impl From for LayerDescriptor { - fn from(value: ImageFileName) -> Self { - let short_id = value.to_string(); - let lsn = value.lsn_as_range(); - LayerDescriptor { - key: value.key_range, - lsn, - is_incremental: false, - short_id, + impl Layer for LayerDescriptor { + fn get_value_reconstruct_data( + &self, + _key: Key, + _lsn_range: Range, + _reconstruct_data: &mut ValueReconstructState, + _ctx: &RequestContext, + ) -> Result { + todo!("This method shouldn't be part of the Layer trait") + } + + fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> { + todo!() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn get_key_range(&self) -> Range { + self.layer_desc().key_range.clone() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn get_lsn_range(&self) -> Range { + self.layer_desc().lsn_range.clone() + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn is_incremental(&self) -> bool { + self.layer_desc().is_incremental + } + + /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers. + fn short_id(&self) -> String { + self.layer_desc().short_id() } } -} -impl From for LayerDescriptor { - fn from(value: LayerFileName) -> Self { - match value { - LayerFileName::Delta(d) => Self::from(d), - LayerFileName::Image(i) => Self::from(i), + impl PersistentLayer for LayerDescriptor { + fn layer_desc(&self) -> &PersistentLayerDesc { + &self.base + } + + fn local_path(&self) -> Option { + unimplemented!() + } + + fn iter(&self, _: &RequestContext) -> Result> { + unimplemented!() + } + + fn key_iter(&self, _: &RequestContext) -> Result> { + unimplemented!() + } + + fn delete_resident_layer_file(&self) -> Result<()> { + unimplemented!() + } + + fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo { + unimplemented!() + } + + fn access_stats(&self) -> &LayerAccessStats { + unimplemented!() + } + } + + impl From for LayerDescriptor { + fn from(value: DeltaFileName) -> Self { + LayerDescriptor { + base: PersistentLayerDesc::new_delta( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + value.key_range, + value.lsn_range, + 233, + ), + } + } + } + + impl From for LayerDescriptor { + fn from(value: ImageFileName) -> Self { + LayerDescriptor { + base: PersistentLayerDesc::new_img( + TenantId::from_array([0; 16]), + TimelineId::from_array([0; 16]), + value.key_range, + value.lsn, + false, + 233, + ), + } + } + } + + impl From for LayerDescriptor { + fn from(value: LayerFileName) -> Self { + match value { + LayerFileName::Delta(d) => Self::from(d), + LayerFileName::Image(i) => Self::from(i), + } } } } diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 387bae5b1f..9d423ed815 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -218,15 +218,12 @@ impl RemoteLayer { } /// Create a Layer struct representing this layer, after it has been downloaded. - pub fn create_downloaded_layer( + pub fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_, L>, + layer_map_lock_held_witness: &BatchedUpdates<'_>, conf: &'static PageServerConf, file_size: u64, - ) -> Arc - where - L: ?Sized + Layer, - { + ) -> Arc { if self.desc.is_delta { let fname = self.desc.delta_file_name(); Arc::new(DeltaLayer::new( diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d642090996..2b223a18d1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -3,7 +3,7 @@ mod eviction_task; mod walreceiver; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use fail::fail_point; use futures::StreamExt; @@ -28,7 +28,7 @@ use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, RwLock, Weak}; +use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::context::{DownloadBehavior, RequestContext}; @@ -52,6 +52,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; use crate::tenant::config::{EvictionPolicy, TenantConfOpt}; +use crate::tenant::layer_map; use pageserver_api::reltag::RelTag; use postgres_connection::PgConnectionConfig; @@ -81,7 +82,9 @@ use super::config::TenantConf; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; -use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; +use super::storage_layer::{ + DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey, +}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -114,6 +117,80 @@ impl PartialOrd for Hole { } } +pub struct LayerMapping(HashMap>); + +impl LayerMapping { + fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { + self.0.get(&desc.key()).expect("not found").clone() + } + + pub(crate) fn insert(&mut self, layer: Arc) { + self.0.insert(layer.layer_desc().key(), layer); + } + + pub(crate) fn new() -> Self { + Self(HashMap::new()) + } + + pub(crate) fn remove(&mut self, layer: Arc) { + self.0.remove(&layer.layer_desc().key()); + } + + pub(crate) fn replace_and_verify( + &mut self, + expected: Arc, + new: Arc, + ) -> Result<()> { + use super::layer_map::LayerKey; + let key = LayerKey::from(&*expected); + let other = LayerKey::from(&*new); + + let expected_l0 = LayerMap::is_l0(expected.layer_desc()); + let new_l0 = LayerMap::is_l0(new.layer_desc()); + + fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!( + "replacing downloaded layer into layermap failed because layer was not found" + )); + + anyhow::ensure!( + key == other, + "replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}" + ); + + anyhow::ensure!( + expected_l0 == new_l0, + "replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}" + ); + + if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) { + anyhow::ensure!( + layer_map::compare_arced_layers(&expected, layer), + "replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}", + expected = Arc::as_ptr(&expected), + new = Arc::as_ptr(layer), + ); + *layer = new; + Ok(()) + } else { + anyhow::bail!( + "replacing downloaded layer into layermap failed because layer was not found" + ); + } + } +} + +/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. +/// Can be removed after all refactors are done. +fn drop_rlock(rlock: RwLockReadGuard<'_, T>) { + drop(rlock) +} + +/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. +/// Can be removed after all refactors are done. +fn drop_wlock(rlock: RwLockWriteGuard<'_, T>) { + drop(rlock) +} + pub struct Timeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -125,7 +202,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: tokio::sync::RwLock>, + pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -598,7 +675,8 @@ impl Timeline { /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. pub async fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().await; + let guard = self.layers.read().await; + let (layer_map, _) = &*guard; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -908,7 +986,8 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1039,7 +1118,8 @@ impl Timeline { } pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().await; + let guard = self.layers.read().await; + let (layer_map, mapping) = &*guard; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1050,6 +1130,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { + let historic_layer = mapping.get_from_desc(&historic_layer); historic_layers.push(historic_layer.info(reset)); } @@ -1159,7 +1240,8 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layer_map, mapping) = &mut *guard; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1168,14 +1250,19 @@ impl Timeline { let res = if cancel.is_cancelled() { None } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) + Some(self.evict_layer_batch_impl( + &layer_removal_guard, + l, + &mut batch_updates, + mapping, + )) }; results.push(res); } // commit the updates & release locks batch_updates.flush(); - drop(layer_map); + drop_wlock(guard); drop(layer_removal_guard); assert_eq!(results.len(), layers_to_evict.len()); @@ -1186,10 +1273,9 @@ impl Timeline { &self, _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, local_layer: &Arc, - batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + batch_updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerMapping, ) -> anyhow::Result { - use super::layer_map::Replacement; - if local_layer.is_remote_layer() { // TODO(issue #3851): consider returning an err here instead of false, // which is the same out the match later @@ -1237,13 +1323,10 @@ impl Timeline { ), }); - let replaced = match batch_updates.replace_historic( - local_layer.layer_desc().clone(), - local_layer, - new_remote_layer.layer_desc().clone(), - new_remote_layer, - )? { - Replacement::Replaced { .. } => { + assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); + + let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) { + Ok(()) => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); } @@ -1276,24 +1359,17 @@ impl Timeline { true } - Replacement::NotFound => { - debug!(evicted=?local_layer, "layer was no longer in layer map"); - false - } - Replacement::RemovalBuffered => { - unreachable!("not doing anything else in this batch") - } - Replacement::Unexpected(other) => { - error!( - local_layer.ptr=?Arc::as_ptr(local_layer), - other.ptr=?Arc::as_ptr(&other), - ?other, - "failed to replace"); + Err(err) => { + if cfg!(debug_assertions) { + error!(evicted=?local_layer, "failed to replace: {err}"); + } else { + error!(evicted=?local_layer, "failed to replace: {err}"); + } false } }; - Ok(replaced) + Ok(succeed) } } @@ -1417,7 +1493,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: tokio::sync::RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1608,7 +1684,8 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1651,7 +1728,8 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone()); + mapping.insert(Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1683,7 +1761,8 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); + updates.insert_historic(layer.layer_desc().clone()); + mapping.insert(Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1737,7 +1816,8 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layer_map, mapping) = &mut *guard; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1782,7 +1862,8 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer.layer_desc().clone(), local_layer); + updates.remove_historic(local_layer.layer_desc().clone()); + mapping.remove(local_layer); // fall-through to adding the remote layer } } else { @@ -1821,7 +1902,8 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone()); + mapping.insert(remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1848,7 +1930,8 @@ impl Timeline { ), ); let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone()); + mapping.insert(remote_layer); } } } @@ -1887,13 +1970,14 @@ impl Timeline { let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); - let local_layers = self - .layers - .read() - .await - .iter_historic_layers() - .map(|l| (l.filename(), l)) - .collect::>(); + let local_layers = { + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; + layers + .iter_historic_layers() + .map(|l| (l.filename(), mapping.get_from_desc(&l))) + .collect::>() + }; // If no writes happen, new branches do not have any layers, only the metadata file. let has_local_layers = !local_layers.is_empty(); @@ -2263,11 +2347,13 @@ impl Timeline { } } - async fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().await.iter_historic_layers() { + fn find_layer(&self, layer_file_name: &str) -> Option> { + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; + for historic_layer in layers.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { - return Some(historic_layer); + return Some(mapping.get_from_desc(&historic_layer)); } } @@ -2280,9 +2366,11 @@ impl Timeline { &self, // we cannot remove layers otherwise, since gc and compaction will race _layer_removal_cs: Arc>, - layer: Arc, - updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, + layer: Arc, + updates: &mut BatchedUpdates<'_>, + mapping: &mut LayerMapping, ) -> anyhow::Result<()> { + let layer = mapping.get_from_desc(&layer); if !layer.is_remote_layer() { layer.delete_resident_layer_file()?; let layer_file_size = layer.file_size(); @@ -2296,7 +2384,8 @@ impl Timeline { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer.layer_desc().clone(), layer); + updates.remove_historic(layer.layer_desc().clone()); + mapping.remove(layer); Ok(()) } @@ -2481,7 +2570,8 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().await; + let guard = timeline.layers.read().await; + let (layers, mapping) = &*guard; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2543,6 +2633,7 @@ impl Timeline { } if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { + let layer = mapping.get_from_desc(&layer); // If it's a remote layer, download it and retry. if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) @@ -2664,7 +2755,8 @@ impl Timeline { /// Get a handle to the latest layer for appending. /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, _) = &mut *guard; ensure!(lsn.is_aligned()); @@ -2741,7 +2833,8 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, _) = &mut *guard; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2755,7 +2848,7 @@ impl Timeline { layers.next_open_layer_at = Some(end_lsn); self.last_freeze_at.store(end_lsn); } - drop(layers); + drop_wlock(guard); } /// Layer flusher task's main loop. @@ -2779,7 +2872,8 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2905,12 +2999,12 @@ impl Timeline { // in-memory layer from the map now. { let mut layers = self.layers.write().await; - let l = layers.frozen_layers.pop_front(); + let l = layers.0.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this // timeline). If two threads tried to flush the same frozen // layer to disk at the same time, that would not work. - assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer)); + assert!(layer_map::compare_arced_layers(&l.unwrap(), &frozen_layer)); // release lock on 'layers' } @@ -3041,14 +3135,16 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - batch_updates.insert_historic(l.layer_desc().clone(), l); + batch_updates.insert_historic(l.layer_desc().clone()); + mapping.insert(l); batch_updates.flush(); // update the timeline's physical size @@ -3098,7 +3194,8 @@ impl Timeline { ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, _) = &*guard; let mut max_deltas = 0; { @@ -3276,7 +3373,8 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3298,10 +3396,11 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(l.layer_desc().clone(), l); + updates.insert_historic(l.layer_desc().clone()); + mapping.insert(l); } updates.flush(); - drop(layers); + drop_wlock(guard); timer.stop_and_record(); Ok(layer_paths_to_upload) @@ -3311,7 +3410,7 @@ impl Timeline { #[derive(Default)] struct CompactLevel0Phase1Result { new_layers: Vec, - deltas_to_compact: Vec>, + deltas_to_compact: Vec>, } /// Top-level failure to compact. @@ -3344,9 +3443,9 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; let mut level0_deltas = layers.get_level0_deltas()?; - drop(layers); // Only compact if enough layers have accumulated. let threshold = self.get_compaction_threshold(); @@ -3393,6 +3492,7 @@ impl Timeline { let remotes = deltas_to_compact .iter() + .map(|l| mapping.get_from_desc(l)) .filter(|l| l.is_remote_layer()) .inspect(|l| info!("compact requires download of {}", l.filename().file_name())) .map(|l| { @@ -3402,6 +3502,13 @@ impl Timeline { }) .collect::>(); + let deltas_to_compact_layers = deltas_to_compact + .iter() + .map(|l| mapping.get_from_desc(l)) + .collect_vec(); + + drop_rlock(guard); + if !remotes.is_empty() { // caller is holding the lock to layer_removal_cs, and we don't want to download while // holding that; in future download_remote_layer might take it as well. this is @@ -3428,7 +3535,7 @@ impl Timeline { // This iterator walks through all key-value pairs from all the layers // we're compacting, in key, LSN order. let all_values_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.iter(ctx)), + deltas_to_compact_layers.iter().map(|l| l.iter(ctx)), |iter_iter| { iter_iter.kmerge_by(|a, b| { if let Ok((a_key, a_lsn, _)) = a { @@ -3450,7 +3557,7 @@ impl Timeline { // This iterator walks through all keys and is needed to calculate size used by each key let mut all_keys_iter = itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)), |iter_iter| { iter_iter.kmerge_by(|a, b| { let (a_key, a_lsn, _) = a; @@ -3467,7 +3574,8 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? + let guard = self.layers.read().await; // Is'n it better to hold original layers lock till here? + let (layers, _) = &*guard; let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3475,7 +3583,7 @@ impl Timeline { let mut heap: BinaryHeap = BinaryHeap::with_capacity(max_holes + 1); let mut prev: Option = None; for (next_key, _next_lsn, _size) in itertools::process_results( - deltas_to_compact.iter().map(|l| l.key_iter(ctx)), + deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)), |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), )? { if let Some(prev_key) = prev { @@ -3500,7 +3608,7 @@ impl Timeline { } prev = Some(next_key.next()); } - drop(layers); + drop_rlock(guard); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector @@ -3704,7 +3812,8 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3736,7 +3845,8 @@ impl Timeline { LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - updates.insert_historic(x.layer_desc().clone(), x); + updates.insert_historic(x.layer_desc().clone()); + mapping.insert(x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -3744,10 +3854,10 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?; } updates.flush(); - drop(layers); + drop_wlock(guard); // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { @@ -3963,7 +4073,8 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().await; + let mut guard = self.layers.write().await; + let (layers, mapping) = &mut *guard; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4042,7 +4153,7 @@ impl Timeline { // delta layers. Image layers can form "stairs" preventing old image from been deleted. // But image layers are in any case less sparse than delta layers. Also we need some // protection from replacing recent image layers with new one after each GC iteration. - if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) { + if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) { wanted_image_layers.add_range(l.get_key_range()); } result.layers_not_updated += 1; @@ -4079,6 +4190,7 @@ impl Timeline { layer_removal_cs.clone(), doomed_layer, &mut updates, + mapping, )?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } @@ -4263,42 +4375,15 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().await; - let mut updates = layers.batch_update(); - let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); + let mut guard = self_clone.layers.write().await; + let (layers, mapping) = &mut *guard; + let updates = layers.batch_update(); + let new_layer = + remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { - use crate::tenant::layer_map::Replacement; let l: Arc = remote_layer.clone(); - let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) { - Ok(Replacement::Replaced { .. }) => false, - Ok(Replacement::NotFound) => { - // TODO: the downloaded file should probably be removed, otherwise - // it will be added to the layermap on next load? we should - // probably restart any get_reconstruct_data search as well. - // - // See: https://github.com/neondatabase/neon/issues/3533 - error!("replacing downloaded layer into layermap failed because layer was not found"); - true - } - Ok(Replacement::RemovalBuffered) => { - unreachable!("current implementation does not remove anything") - } - Ok(Replacement::Unexpected(other)) => { - // if the other layer would have the same pointer value as - // expected, it means they differ only on vtables. - // - // otherwise there's no known reason for this to happen as - // compacted layers should have different covering rectangle - // leading to produce Replacement::NotFound. - - error!( - expected.ptr = ?Arc::as_ptr(&l), - other.ptr = ?Arc::as_ptr(&other), - ?other, - "replacing downloaded layer into layermap failed because another layer was found instead of expected" - ); - true - } + let failure = match mapping.replace_and_verify(l, new_layer) { + Ok(()) => false, Err(e) => { // this is a precondition failure, the layer filename derived // attributes didn't match up, which doesn't seem likely. @@ -4326,7 +4411,7 @@ impl Timeline { } } updates.flush(); - drop(layers); + drop_wlock(guard); info!("on-demand download successful"); @@ -4337,7 +4422,10 @@ impl Timeline { remote_layer.ongoing_download.close(); } else { // Keep semaphore open. We'll drop the permit at the end of the function. - error!("layer file download failed: {:?}", result.as_ref().unwrap_err()); + error!( + "layer file download failed: {:?}", + result.as_ref().unwrap_err() + ); } // Don't treat it as an error if the task that triggered the download @@ -4351,7 +4439,8 @@ impl Timeline { drop(permit); Ok(()) - }.in_current_span(), + } + .in_current_span(), ); receiver.await.context("download task cancelled")? @@ -4421,9 +4510,11 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; layers .iter_historic_layers() + .map(|l| mapping.get_from_desc(&l)) .filter_map(|l| l.downcast_remote_layer()) .map(|l| self.download_remote_layer(l)) .for_each(|dl| downloads.push(dl)) @@ -4524,7 +4615,8 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4533,6 +4625,8 @@ impl Timeline { let file_size = l.file_size(); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); + let l = mapping.get_from_desc(&l); + if l.is_remote_layer() { continue; } diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 80c5210211..03cf2d89ad 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,9 +197,11 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().await; + let guard = self.layers.read().await; + let (layers, mapping) = &*guard; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { + let hist_layer = mapping.get_from_desc(&hist_layer); if hist_layer.is_remote_layer() { continue; }