refactor: use LayerDesc in LayerMap (part 2)

Signed-off-by: Alex Chi <iskyzh@gmail.com>
This commit is contained in:
Alex Chi
2023-06-06 17:20:15 -04:00
parent 3693d1f431
commit faee3152f3
8 changed files with 486 additions and 500 deletions

View File

@@ -1,22 +1,23 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
let mut layer_map = LayerMap::<LayerDescriptor>::default();
fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::default();
let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0);
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone());
}
println!("min: {min_lsn}, max: {max_lsn}");
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
}
/// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
// Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer,
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 {
let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor {
key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
lsn: Lsn(i)..Lsn(i + 1),
is_incremental: false,
short_id: format!("Layer {}", i),
};
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
TenantId::generate(),
TimelineId::generate(),
zero.add(10 * i32)..zero.add(10 * i32 + 1),
Lsn(i),
false,
0,
));
updates.insert_historic(layer.layer_desc().clone());
}
updates.flush();
println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -562,6 +562,7 @@ impl Tenant {
.layers
.read()
.await
.0
.iter_historic_layers()
.next()
.is_some(),

View File

@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::ops::Range;
use std::sync::Arc;
use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement;
pub use historic_layer_coverage::{LayerKey, Replacement};
use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
///
/// LayerMap tracks what layers exist on a timeline.
///
pub struct LayerMap<L: ?Sized> {
#[derive(Default)]
pub struct LayerMap {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
}
/// The primary update API for the layer map.
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
/// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly.
#[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
pub struct BatchedUpdates<'a> {
// While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>,
layer_map: &'a mut LayerMap,
}
/// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
impl BatchedUpdates<'_> {
///
/// Insert an on-disk layer.
///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.insert_historic_noflush(layer_desc, layer)
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc)
}
///
@@ -145,31 +122,8 @@ where
///
/// This should be called when the corresponding file on disk has been deleted.
///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
self.layer_map.remove_historic_noflush(layer_desc, layer)
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc)
}
// We will flush on drop anyway, but this method makes it
@@ -185,25 +139,19 @@ where
// than panic later or read without flushing.
//
// TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L>
where
L: ?Sized + Layer,
{
/// Dropping a batch applies whatever updates are still buffered.
impl Drop for BatchedUpdates<'_> {
    fn drop(&mut self) {
        // Flush through the exclusive borrow we hold, so the layer map is
        // up to date before anyone else can read it again.
        let map = &mut *self.layer_map;
        map.flush_updates();
    }
}
/// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> {
pub layer: Arc<L>,
/// Result of a historic-layer lookup performed by `LayerMap::search`.
pub struct SearchResult {
/// Descriptor of the layer selected by the search (an image or a delta layer covering the key).
pub layer: Arc<PersistentLayerDesc>,
/// LSN floor returned alongside the layer; `search` computes it from the start LSNs of the
/// candidate layers' ranges.
/// NOTE(review): presumably the lowest LSN for which this layer should be consulted — confirm.
pub lsn_floor: Lsn,
}
impl<L> LayerMap<L>
where
L: ?Sized + Layer,
{
impl LayerMap {
///
/// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'.
@@ -235,7 +183,7 @@ where
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128());
@@ -244,7 +192,6 @@ where
(None, None) => None,
(None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor,
@@ -252,7 +199,6 @@ where
}
(Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -263,7 +209,6 @@ where
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult {
layer: image,
lsn_floor: img_lsn,
@@ -271,7 +216,6 @@ where
} else {
let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult {
layer: delta,
lsn_floor,
@@ -282,7 +226,7 @@ where
}
/// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
BatchedUpdates { layer_map: self }
}
@@ -292,48 +236,32 @@ where
/// Helper function for BatchedUpdates::insert_historic
///
/// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush(
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
// TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer) {
if Self::is_l0(&layer_desc) {
self.l0_delta_layers.push(layer_desc.clone().into());
}
self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer),
historic_layer_coverage::LayerKey::from(&layer_desc),
layer_desc.into(),
);
}
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
///
/// Remove an on-disk layer from the map.
///
/// Helper function for BatchedUpdates::remove_historic
///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer));
if Self::is_l0(&layer) {
.remove(historic_layer_coverage::LayerKey::from(&layer_desc));
let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| {
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
l0_delta_layers.retain(|other| other.key() != layer_key);
self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +272,6 @@ where
"failed to locate removed historic layer from l0_delta_layers"
);
}
self.mapping.remove(&layer_desc.key());
}
pub(self) fn replace_historic_noflush(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
"expected and new must have equal LayerKeys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
);
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
let pos = self.l0_delta_layers.iter().position(|slot| {
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
});
if pos.is_none() {
return Ok(Replacement::NotFound);
}
pos
} else {
None
};
let new_desc = Arc::new(new_desc);
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
**existing == expected_desc
});
if let Replacement::Replaced { .. } = &replaced {
self.mapping.remove(&expected_desc.key());
self.mapping.insert(new_desc.key(), new);
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new_desc;
}
}
let replaced = match replaced {
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
Replacement::NotFound => Replacement::NotFound,
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
Replacement::Unexpected(x) => {
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
}
};
Ok(replaced)
}
/// Helper function for BatchedUpdates::drop.
@@ -454,10 +319,8 @@ where
Ok(true)
}
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
self.historic
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic.iter()
}
///
@@ -472,7 +335,7 @@ where
&self,
key_range: &Range<Key>,
lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v,
None => return Ok(vec![]),
@@ -482,36 +345,26 @@ where
let end = key_range.end.to_i128();
// Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
let mut current_key = start;
let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
coverage.push((kr, current_val.take()));
current_key = change_key;
current_val = change_val.clone();
}
// Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push((
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
coverage.push((kr, current_val.take()));
Ok(coverage)
}
pub fn is_l0(layer: &L) -> bool {
pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
}
@@ -537,7 +390,7 @@ where
/// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function
/// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
// Case 1
if !Self::is_l0(layer) {
return true;
@@ -595,9 +448,7 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +471,7 @@ where
let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() {
let base_count =
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let base_count = Self::is_reimage_worthy(&val, key) as usize;
let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max(
@@ -772,12 +621,8 @@ where
}
/// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
Ok(self
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self.l0_delta_layers.to_vec())
}
/// debugging function to print out the contents of the layer map
@@ -802,72 +647,79 @@ where
println!("End dump LayerMap");
Ok(())
}
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
#[inline(always)]
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
/// Identity check for two `Arc`s: like `Arc::ptr_eq`, but only the object addresses are
/// compared and any vtable pointer is ignored.
///
/// Returns `true` if both `Arc`s point to the same layer, `false` otherwise.
///
/// When comparing persistent layers, ALWAYS compare the layer descriptor key instead.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
    // A pointer to a "dyn Trait" object is a "fat pointer" made of two parts:
    // - the address of the object
    // - the address of the vtable
    //
    // Rust gives no guarantee that vtables are unique, yet `Arc::ptr_eq` (at least up to
    // 1.67) only reports equality when both the object address and the vtable match.
    //
    // See: https://github.com/rust-lang/rust/issues/103763
    //
    // Casting each pointer to a thin pointer-to-unit drops the inaccessible vtable part, so
    // it cannot affect the comparison. A future version of Rust will most likely do the same.
    //
    // See: https://github.com/rust-lang/rust/pull/106450
    let lhs_addr: *const () = Arc::as_ptr(left).cast();
    let rhs_addr: *const () = Arc::as_ptr(right).cast();

    std::ptr::eq(lhs_addr, rhs_addr)
}
left == right
}
#[cfg(test)]
mod tests {
use super::{LayerMap, Replacement};
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
use super::LayerMap;
use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use std::str::FromStr;
use std::sync::Arc;
mod l0_delta_layers_updated {
use crate::tenant::{
storage_layer::{PersistentLayer, PersistentLayerDesc},
timeline::LayerMapping,
};
use super::*;
#[test]
fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true
)
}
#[test]
fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range
false
)
}
#[test]
fn for_image() {
l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images
false
)
}
#[test]
@@ -883,16 +735,16 @@ mod tests {
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
let mut map = LayerMap::default();
// After the immutable storage state refactor, the replace operation no longer
// uses the layer map. The map is kept here for consistency across test cases
// and can be removed in the future.
let _map = LayerMap::default();
let res = map.batch_update().replace_historic(
not_found.get_persistent_layer_desc(),
&not_found,
new_version.get_persistent_layer_desc(),
new_version,
);
let mut mapping = LayerMapping::new();
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
mapping
.replace_and_verify(not_found, new_version)
.unwrap_err();
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -903,49 +755,44 @@ mod tests {
let downloaded = Arc::new(skeleton);
let mut map = LayerMap::default();
let mut mapping = LayerMapping::new();
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update()
.insert_historic(remote.get_persistent_layer_desc(), remote.clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
let replaced = map
.batch_update()
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert!(
matches!(replaced, Replacement::Replaced { .. }),
"{replaced:?}"
.insert_historic(remote.layer_desc().clone());
mapping.insert(remote.clone());
assert_eq!(
count_layer_in(&map, remote.layer_desc()),
expected_in_counts
);
mapping
.replace_and_verify(remote, downloaded.clone())
.expect("name derived attributes are the same");
assert_eq!(
count_layer_in(&map, downloaded.layer_desc()),
expected_in_counts
);
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update()
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
.remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
}
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map
.iter_historic_layers()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.filter(|x| x.key() == layer.key())
.count();
let l0s = map
.get_level0_deltas()
.expect("why does this return a result");
let l0 = l0s
.iter()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
(historic, l0)
}

View File

@@ -3,6 +3,8 @@ use std::ops::Range;
use tracing::info;
use crate::tenant::storage_layer::PersistentLayerDesc;
use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data.
@@ -53,6 +55,18 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
}
}
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
}
}
}
/// Efficiently queryable layer coverage for each LSN.
///
/// Allows answering layer map queries very efficiently,
@@ -467,6 +481,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
///
/// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records
/// layer descriptors. Entries in the layer map are therefore only ever inserted or
/// removed, never replaced.
#[allow(dead_code)]
pub fn replace<F>(
&mut self,
layer_key: &LayerKey,

View File

@@ -176,13 +176,10 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event(
layer_map_lock_held_witness,
@@ -197,14 +194,11 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`.
///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>(
pub(crate) fn clone_for_residence_change(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
new_status: LayerResidenceStatus,
) -> LayerAccessStats
where
L: ?Sized + Layer,
{
) -> LayerAccessStats {
let clone = {
let inner = self.0.lock().unwrap();
inner.clone()
@@ -232,14 +226,12 @@ impl LayerAccessStats {
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
///
pub(crate) fn record_residence_event<L>(
pub(crate) fn record_residence_event(
&self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
_layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) where
L: ?Sized + Layer,
{
) {
let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| {
inner
@@ -473,94 +465,125 @@ pub fn downcast_remote_layer(
}
}
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
pub mod tests {
use super::*;
impl LayerDescriptor {
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
/// and the tenant / timeline id does not matter.
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
/// Holds metadata about a layer without any content. Used mostly for testing.
///
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
base: PersistentLayerDesc,
}
fn get_lsn_range(&self) -> Range<Lsn> {
self.lsn.clone()
}
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
let short_id = value.to_string();
LayerDescriptor {
key: value.key_range,
lsn: value.lsn_range,
is_incremental: true,
short_id,
impl From<PersistentLayerDesc> for LayerDescriptor {
fn from(base: PersistentLayerDesc) -> Self {
Self { base }
}
}
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
let short_id = value.to_string();
let lsn = value.lsn_as_range();
LayerDescriptor {
key: value.key_range,
lsn,
is_incremental: false,
short_id,
impl Layer for LayerDescriptor {
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
impl PersistentLayer for LayerDescriptor {
fn layer_desc(&self) -> &PersistentLayerDesc {
&self.base
}
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
),
}
}
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
),
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
}
}
}

View File

@@ -218,15 +218,12 @@ impl RemoteLayer {
}
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer<L>(
pub fn create_downloaded_layer(
&self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
layer_map_lock_held_witness: &BatchedUpdates<'_>,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer>
where
L: ?Sized + Layer,
{
) -> Arc<dyn PersistentLayer> {
if self.desc.is_delta {
let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new(

View File

@@ -3,7 +3,7 @@
mod eviction_task;
mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use futures::StreamExt;
@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{DownloadBehavior, RequestContext};
@@ -52,6 +52,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use crate::tenant::layer_map;
use pageserver_api::reltag::RelTag;
use postgres_connection::PgConnectionConfig;
@@ -81,7 +82,9 @@ use super::config::TenantConf;
use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset};
use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState {
@@ -114,6 +117,80 @@ impl PartialOrd for Hole {
}
}
/// Lookup table from a layer's [`PersistentLayerKey`] to the layer object itself.
///
/// Layers are registered under `layer.layer_desc().key()`, so a
/// [`PersistentLayerDesc`] can be resolved back to its `Arc<dyn PersistentLayer>`.
pub struct LayerMapping(HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>);

impl LayerMapping {
    /// Returns the layer registered under `desc.key()`.
    ///
    /// # Panics
    ///
    /// Panics with "not found" if no layer is registered under that key.
    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
        self.0.get(&desc.key()).expect("not found").clone()
    }

    /// Registers `layer` under its descriptor key, overwriting any previous entry
    /// stored under the same key.
    pub(crate) fn insert(&mut self, layer: Arc<dyn PersistentLayer>) {
        self.0.insert(layer.layer_desc().key(), layer);
    }

    /// Creates an empty mapping.
    pub(crate) fn new() -> Self {
        Self(HashMap::new())
    }

    /// Removes whatever entry is stored under `layer`'s descriptor key.
    ///
    /// Removing a key that is not present is a no-op.
    pub(crate) fn remove(&mut self, layer: Arc<dyn PersistentLayer>) {
        self.0.remove(&layer.layer_desc().key());
    }

    /// Replaces `expected` with `new` in the mapping after verifying the swap is sound.
    ///
    /// # Errors
    ///
    /// Fails, leaving the mapping untouched, when:
    /// - the `layermap-replace-notfound` failpoint is active,
    /// - `expected` and `new` have different `LayerKey`s,
    /// - exactly one of the two is an L0 layer according to `LayerMap::is_l0`,
    /// - the entry stored under `expected`'s key is not the same layer as `expected`
    ///   according to `layer_map::compare_arced_layers`,
    /// - no entry is stored under `expected`'s key.
    pub(crate) fn replace_and_verify(
        &mut self,
        expected: Arc<dyn PersistentLayer>,
        new: Arc<dyn PersistentLayer>,
    ) -> Result<()> {
        use super::layer_map::LayerKey;

        let key = LayerKey::from(&*expected);
        let other = LayerKey::from(&*new);

        let expected_l0 = LayerMap::is_l0(expected.layer_desc());
        let new_l0 = LayerMap::is_l0(new.layer_desc());

        fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
            "replacing downloaded layer into layermap failed because layer was not found"
        ));

        anyhow::ensure!(
            key == other,
            "replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
        );

        anyhow::ensure!(
            expected_l0 == new_l0,
            "replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
        );

        if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) {
            // The second pointer printed is the layer currently stored in the map,
            // not the replacement, so it is labelled `found` rather than `new`.
            anyhow::ensure!(
                layer_map::compare_arced_layers(&expected, layer),
                "replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, found={found:?}",
                expected = Arc::as_ptr(&expected),
                found = Arc::as_ptr(layer),
            );
            *layer = new;
            Ok(())
        } else {
            anyhow::bail!(
                "replacing downloaded layer into layermap failed because layer was not found"
            );
        }
    }
}
/// Temporary helper for the immutable storage state refactor.
///
/// Accepts only an `RwLock` read guard, so a call site cannot accidentally drop
/// something other than the guard. Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: RwLockReadGuard<'_, T>) {
    std::mem::drop(rlock);
}
/// Temporary helper for the immutable storage state refactor.
///
/// Accepts only an `RwLock` write guard, so a call site cannot accidentally drop
/// something other than the guard. Can be removed after all refactors are done.
fn drop_wlock<T>(wlock: RwLockWriteGuard<'_, T>) {
    drop(wlock)
}
pub struct Timeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -125,7 +202,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -598,7 +675,8 @@ impl Timeline {
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let guard = self.layers.read().await;
let (layer_map, _) = &*guard;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -908,7 +986,8 @@ impl Timeline {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, _) = &*guard;
let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(());
};
@@ -1039,7 +1118,8 @@ impl Timeline {
}
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let guard = self.layers.read().await;
let (layer_map, mapping) = &*guard;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1050,6 +1130,7 @@ impl Timeline {
let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = mapping.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset));
}
@@ -1159,7 +1240,8 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layer_map, mapping) = &mut *guard;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1168,14 +1250,19 @@ impl Timeline {
let res = if cancel.is_cancelled() {
None
} else {
Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates))
Some(self.evict_layer_batch_impl(
&layer_removal_guard,
l,
&mut batch_updates,
mapping,
))
};
results.push(res);
}
// commit the updates & release locks
batch_updates.flush();
drop(layer_map);
drop_wlock(guard);
drop(layer_removal_guard);
assert_eq!(results.len(), layers_to_evict.len());
@@ -1186,10 +1273,9 @@ impl Timeline {
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerMapping,
) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later
@@ -1237,13 +1323,10 @@ impl Timeline {
),
});
let replaced = match batch_updates.replace_historic(
local_layer.layer_desc().clone(),
local_layer,
new_remote_layer.layer_desc().clone(),
new_remote_layer,
)? {
Replacement::Replaced { .. } => {
assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) {
Ok(()) => {
if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}");
}
@@ -1276,24 +1359,17 @@ impl Timeline {
true
}
Replacement::NotFound => {
debug!(evicted=?local_layer, "layer was no longer in layer map");
false
}
Replacement::RemovalBuffered => {
unreachable!("not doing anything else in this batch")
}
Replacement::Unexpected(other) => {
error!(
local_layer.ptr=?Arc::as_ptr(local_layer),
other.ptr=?Arc::as_ptr(&other),
?other,
"failed to replace");
Err(err) => {
if cfg!(debug_assertions) {
error!(evicted=?local_layer, "failed to replace: {err}");
} else {
error!(evicted=?local_layer, "failed to replace: {err}");
}
false
}
};
Ok(replaced)
Ok(succeed)
}
}
@@ -1417,7 +1493,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: tokio::sync::RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1608,7 +1684,8 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
///
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1651,7 +1728,8 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(Arc::new(layer));
num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file.
@@ -1683,7 +1761,8 @@ impl Timeline {
trace!("found layer {}", layer.path().display());
total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(Arc::new(layer));
num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these
@@ -1737,7 +1816,8 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layer_map, mapping) = &mut *guard;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1782,7 +1862,8 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else {
self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer.layer_desc().clone(), local_layer);
updates.remove_historic(local_layer.layer_desc().clone());
mapping.remove(local_layer);
// fall-through to adding the remote layer
}
} else {
@@ -1821,7 +1902,8 @@ impl Timeline {
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone());
mapping.insert(remote_layer);
}
LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file.
@@ -1848,7 +1930,8 @@ impl Timeline {
),
);
let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone());
mapping.insert(remote_layer);
}
}
}
@@ -1887,13 +1970,14 @@ impl Timeline {
let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn();
let local_layers = self
.layers
.read()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
let local_layers = {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
layers
.iter_historic_layers()
.map(|l| (l.filename(), mapping.get_from_desc(&l)))
.collect::<HashMap<_, _>>()
};
// If no writes happen, new branches do not have any layers, only the metadata file.
let has_local_layers = !local_layers.is_empty();
@@ -2263,11 +2347,13 @@ impl Timeline {
}
}
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
for historic_layer in layers.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
return Some(mapping.get_from_desc(&historic_layer));
}
}
@@ -2280,9 +2366,11 @@ impl Timeline {
&self,
// we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
layer: Arc<PersistentLayerDesc>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerMapping,
) -> anyhow::Result<()> {
let layer = mapping.get_from_desc(&layer);
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size();
@@ -2296,7 +2384,8 @@ impl Timeline {
// won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index.
updates.remove_historic(layer.layer_desc().clone(), layer);
updates.remove_historic(layer.layer_desc().clone());
mapping.remove(layer);
Ok(())
}
@@ -2481,7 +2570,8 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().await;
let guard = timeline.layers.read().await;
let (layers, mapping) = &*guard;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2543,6 +2633,7 @@ impl Timeline {
}
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = mapping.get_from_desc(&layer);
// If it's a remote layer, download it and retry.
if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer)
@@ -2664,7 +2755,8 @@ impl Timeline {
/// Get a handle to the latest layer for appending.
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
ensure!(lsn.is_aligned());
@@ -2741,7 +2833,8 @@ impl Timeline {
} else {
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2755,7 +2848,7 @@ impl Timeline {
layers.next_open_layer_at = Some(end_lsn);
self.last_freeze_at.store(end_lsn);
}
drop(layers);
drop_wlock(guard);
}
/// Layer flusher task's main loop.
@@ -2779,7 +2872,8 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, _) = &*guard;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2905,12 +2999,12 @@ impl Timeline {
// in-memory layer from the map now.
{
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
let l = layers.0.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
assert!(layer_map::compare_arced_layers(&l.unwrap(), &frozen_layer));
// release lock on 'layers'
}
@@ -3041,14 +3135,16 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
batch_updates.insert_historic(l.layer_desc().clone(), l);
batch_updates.insert_historic(l.layer_desc().clone());
mapping.insert(l);
batch_updates.flush();
// update the timeline's physical size
@@ -3098,7 +3194,8 @@ impl Timeline {
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, _) = &*guard;
let mut max_deltas = 0;
{
@@ -3276,7 +3373,8 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -3298,10 +3396,11 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(l.layer_desc().clone(), l);
updates.insert_historic(l.layer_desc().clone());
mapping.insert(l);
}
updates.flush();
drop(layers);
drop_wlock(guard);
timer.stop_and_record();
Ok(layer_paths_to_upload)
@@ -3311,7 +3410,7 @@ impl Timeline {
#[derive(Default)]
struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
}
/// Top-level failure to compact.
@@ -3344,9 +3443,9 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
@@ -3393,6 +3492,7 @@ impl Timeline {
let remotes = deltas_to_compact
.iter()
.map(|l| mapping.get_from_desc(l))
.filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.map(|l| {
@@ -3402,6 +3502,13 @@ impl Timeline {
})
.collect::<Vec<_>>();
let deltas_to_compact_layers = deltas_to_compact
.iter()
.map(|l| mapping.get_from_desc(l))
.collect_vec();
drop_rlock(guard);
if !remotes.is_empty() {
// caller is holding the lock to layer_removal_cs, and we don't want to download while
// holding that; in future download_remote_layer might take it as well. this is
@@ -3428,7 +3535,7 @@ impl Timeline {
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.iter(ctx)),
deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
if let Ok((a_key, a_lsn, _)) = a {
@@ -3450,7 +3557,7 @@ impl Timeline {
// This iterator walks through all keys and is needed to calculate size used by each key
let mut all_keys_iter = itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
|iter_iter| {
iter_iter.kmerge_by(|a, b| {
let (a_key, a_lsn, _) = a;
@@ -3467,7 +3574,8 @@ impl Timeline {
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
let guard = self.layers.read().await; // Is'n it better to hold original layers lock till here?
let (layers, _) = &*guard;
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3475,7 +3583,7 @@ impl Timeline {
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
@@ -3500,7 +3608,7 @@ impl Timeline {
}
prev = Some(next_key.next());
}
drop(layers);
drop_rlock(guard);
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
@@ -3704,7 +3812,8 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3736,7 +3845,8 @@ impl Timeline {
LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate,
);
updates.insert_historic(x.layer_desc().clone(), x);
updates.insert_historic(x.layer_desc().clone());
mapping.insert(x);
}
// Now that we have reshuffled the data to set of new delta layers, we can
@@ -3744,10 +3854,10 @@ impl Timeline {
let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
for l in deltas_to_compact {
layer_names_to_delete.push(l.filename());
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?;
}
updates.flush();
drop(layers);
drop_wlock(guard);
// Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client {
@@ -3963,7 +4073,8 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too aggressive and avoidable
let mut layers = self.layers.write().await;
let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4042,7 +4153,7 @@ impl Timeline {
// delta layers. Image layers can form "stairs" preventing old images from being deleted.
// But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration.
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) {
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
wanted_image_layers.add_range(l.get_key_range());
}
result.layers_not_updated += 1;
@@ -4079,6 +4190,7 @@ impl Timeline {
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
mapping,
)?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1;
}
@@ -4263,42 +4375,15 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
let mut guard = self_clone.layers.write().await;
let (layers, mapping) = &mut *guard;
let updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone();
let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) {
Ok(Replacement::Replaced { .. }) => false,
Ok(Replacement::NotFound) => {
// TODO: the downloaded file should probably be removed, otherwise
// it will be added to the layermap on next load? we should
// probably restart any get_reconstruct_data search as well.
//
// See: https://github.com/neondatabase/neon/issues/3533
error!("replacing downloaded layer into layermap failed because layer was not found");
true
}
Ok(Replacement::RemovalBuffered) => {
unreachable!("current implementation does not remove anything")
}
Ok(Replacement::Unexpected(other)) => {
// if the other layer would have the same pointer value as
// expected, it means they differ only on vtables.
//
// otherwise there's no known reason for this to happen as
// compacted layers should have different covering rectangle
// leading to produce Replacement::NotFound.
error!(
expected.ptr = ?Arc::as_ptr(&l),
other.ptr = ?Arc::as_ptr(&other),
?other,
"replacing downloaded layer into layermap failed because another layer was found instead of expected"
);
true
}
let failure = match mapping.replace_and_verify(l, new_layer) {
Ok(()) => false,
Err(e) => {
// this is a precondition failure, the layer filename derived
// attributes didn't match up, which doesn't seem likely.
@@ -4326,7 +4411,7 @@ impl Timeline {
}
}
updates.flush();
drop(layers);
drop_wlock(guard);
info!("on-demand download successful");
@@ -4337,7 +4422,10 @@ impl Timeline {
remote_layer.ongoing_download.close();
} else {
// Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err());
error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
}
// Don't treat it as an error if the task that triggered the download
@@ -4351,7 +4439,8 @@ impl Timeline {
drop(permit);
Ok(())
}.in_current_span(),
}
.in_current_span(),
);
receiver.await.context("download task cancelled")?
@@ -4421,9 +4510,11 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
layers
.iter_historic_layers()
.map(|l| mapping.get_from_desc(&l))
.filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl))
@@ -4524,7 +4615,8 @@ impl LocalLayerInfoForDiskUsageEviction {
impl Timeline {
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4533,6 +4625,8 @@ impl Timeline {
let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let l = mapping.get_from_desc(&l);
if l.is_remote_layer() {
continue;
}

View File

@@ -197,9 +197,11 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().await;
let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
let hist_layer = mapping.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() {
continue;
}