Compare commits

...

3 Commits

Author SHA1 Message Date
Alex Chi
a3909e03f8 pgserver: add immutable layer map manager
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 16:25:27 -04:00
Alex Chi
fc190a2a19 resolve merge conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:56:50 -04:00
Alex Chi
faee3152f3 refactor: use LayerDesc in LayerMap (part 2)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:54:59 -04:00
13 changed files with 645 additions and 499 deletions

Cargo.lock generated
View File

@@ -110,6 +110,12 @@ dependencies = [
  "backtrace",
 ]

+[[package]]
+name = "arc-swap"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
+
 [[package]]
 name = "archery"
 version = "0.5.0"
@@ -2542,6 +2548,7 @@ name = "pageserver"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "async-stream",
  "async-trait",
  "byteorder",

View File

@@ -32,6 +32,7 @@ license = "Apache-2.0"

 ## All dependency versions, used in the project
 [workspace.dependencies]
 anyhow = { version = "1.0", features = ["backtrace"] }
+arc-swap = "1.6"
 async-stream = "0.3"
 async-trait = "0.1"
 atty = "0.2.14"

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]

 [dependencies]
 anyhow.workspace = true
+arc-swap.workspace = true
 async-stream.workspace = true
 async-trait.workspace = true
 byteorder.workspace = true

View File

@@ -1,22 +1,23 @@
 use pageserver::keyspace::{KeyPartitioning, KeySpace};
 use pageserver::repository::Key;
 use pageserver::tenant::layer_map::LayerMap;
-use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
+use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
+use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
 use rand::prelude::{SeedableRng, SliceRandom, StdRng};
 use std::cmp::{max, min};
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::path::PathBuf;
 use std::str::FromStr;
-use std::sync::Arc;
 use std::time::Instant;
+use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;

 use criterion::{black_box, criterion_group, criterion_main, Criterion};

-fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
-    let mut layer_map = LayerMap::<LayerDescriptor>::default();
+fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
+    let mut layer_map = LayerMap::default();

     let mut min_lsn = Lsn(u64::MAX);
     let mut max_lsn = Lsn(0);
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
         min_lsn = min(min_lsn, lsn_range.start);
         max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));

-        updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
+        updates.insert_historic(layer.layer_desc().clone());
     }

     println!("min: {min_lsn}, max: {max_lsn}");
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
 }

 /// Construct a layer map query pattern for benchmarks
-fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> {
+fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
     // For each image layer we query one of the pages contained, at LSN right
     // before the image layer was created. This gives us a somewhat uniform
     // coverage of both the lsn and key space because image layers have
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
 // Construct a partitioning for testing get_difficulty map when we
 // don't have an exact result of `collect_keyspace` to work with.
-fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning {
+fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
     let mut parts = Vec::new();

     // We add a partition boundary at the start of each image layer,
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
     for i in 0..100_000 {
         let i32 = (i as u32) % 100;
         let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
-        let layer = LayerDescriptor {
-            key: zero.add(10 * i32)..zero.add(10 * i32 + 1),
-            lsn: Lsn(i)..Lsn(i + 1),
-            is_incremental: false,
-            short_id: format!("Layer {}", i),
-        };
-        updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer));
+        let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
+            TenantId::generate(),
+            TimelineId::generate(),
+            zero.add(10 * i32)..zero.add(10 * i32 + 1),
+            Lsn(i),
+            false,
+            0,
+        ));
+        updates.insert_historic(layer.layer_desc().clone());
     }
     updates.flush();
     println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -87,6 +87,7 @@ pub mod disk_btree;
 pub(crate) mod ephemeral_file;
 pub mod layer_map;
 pub mod manifest;
+pub mod layer_map_mgr;
 pub mod metadata;
 mod par_fsync;
@@ -562,6 +563,7 @@ impl Tenant {
                 .layers
                 .read()
                 .await
+                .0
                 .iter_historic_layers()
                 .next()
                 .is_some(),

View File

@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
 use crate::repository::Key;
 use crate::tenant::storage_layer::InMemoryLayer;
 use crate::tenant::storage_layer::Layer;
-use anyhow::Context;
 use anyhow::Result;
-use std::collections::HashMap;
 use std::collections::VecDeque;
 use std::ops::Range;
 use std::sync::Arc;
 use utils::lsn::Lsn;

 use historic_layer_coverage::BufferedHistoricLayerCoverage;
-pub use historic_layer_coverage::Replacement;
+pub use historic_layer_coverage::{LayerKey, Replacement};

 use super::storage_layer::range_eq;
 use super::storage_layer::PersistentLayerDesc;
-use super::storage_layer::PersistentLayerKey;

 ///
 /// LayerMap tracks what layers exist on a timeline.
 ///
-pub struct LayerMap<L: ?Sized> {
+#[derive(Default, Clone)]
+pub struct LayerMap {
     //
     // 'open_layer' holds the current InMemoryLayer that is accepting new
     // records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
     /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
     /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
-
-    /// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
-    /// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
-    /// RemoteLayer will be removed.
-    mapping: HashMap<PersistentLayerKey, Arc<L>>,
-}
-
-impl<L: ?Sized> Default for LayerMap<L> {
-    fn default() -> Self {
-        Self {
-            open_layer: None,
-            next_open_layer_at: None,
-            frozen_layers: VecDeque::default(),
-            l0_delta_layers: Vec::default(),
-            historic: BufferedHistoricLayerCoverage::default(),
-            mapping: HashMap::default(),
-        }
-    }
 }

 /// The primary update API for the layer map.
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
 /// Batching historic layer insertions and removals is good for
 /// performance and this struct helps us do that correctly.
 #[must_use]
-pub struct BatchedUpdates<'a, L: ?Sized + Layer> {
+pub struct BatchedUpdates<'a> {
     // While we hold this exclusive reference to the layer map the type checker
     // will prevent us from accidentally reading any unflushed updates.
-    layer_map: &'a mut LayerMap<L>,
+    layer_map: &'a mut LayerMap,
 }

 /// Provide ability to batch more updates while hiding the read
 /// API so we don't accidentally read without flushing.
-impl<L> BatchedUpdates<'_, L>
-where
-    L: ?Sized + Layer,
-{
+impl BatchedUpdates<'_> {
     ///
     /// Insert an on-disk layer.
     ///
-    // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
-    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
-        self.layer_map.insert_historic_noflush(layer_desc, layer)
+    pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
+        self.layer_map.insert_historic_noflush(layer_desc)
     }

     ///
@@ -145,31 +122,8 @@ where
     ///
     /// This should be called when the corresponding file on disk has been deleted.
     ///
-    pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
-        self.layer_map.remove_historic_noflush(layer_desc, layer)
-    }
-
-    /// Replaces existing layer iff it is the `expected`.
-    ///
-    /// If the expected layer has been removed it will not be inserted by this function.
-    ///
-    /// Returned `Replacement` describes succeeding in replacement or the reason why it could not
-    /// be done.
-    ///
-    /// TODO replacement can be done without buffering and rebuilding layer map updates.
-    /// One way to do that is to add a layer of indirection for returned values, so
-    /// that we can replace values only by updating a hashmap.
-    pub fn replace_historic(
-        &mut self,
-        expected_desc: PersistentLayerDesc,
-        expected: &Arc<L>,
-        new_desc: PersistentLayerDesc,
-        new: Arc<L>,
-    ) -> anyhow::Result<Replacement<Arc<L>>> {
-        fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
-        self.layer_map
-            .replace_historic_noflush(expected_desc, expected, new_desc, new)
+    pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
+        self.layer_map.remove_historic_noflush(layer_desc)
     }

     // We will flush on drop anyway, but this method makes it
@@ -185,25 +139,19 @@ where
 // than panic later or read without flushing.
 //
 // TODO maybe warn if flush hasn't explicitly been called
-impl<L> Drop for BatchedUpdates<'_, L>
-where
-    L: ?Sized + Layer,
-{
+impl Drop for BatchedUpdates<'_> {
     fn drop(&mut self) {
         self.layer_map.flush_updates();
     }
 }

 /// Return value of LayerMap::search
-pub struct SearchResult<L: ?Sized> {
-    pub layer: Arc<L>,
+pub struct SearchResult {
+    pub layer: Arc<PersistentLayerDesc>,
     pub lsn_floor: Lsn,
 }

-impl<L> LayerMap<L>
-where
-    L: ?Sized + Layer,
-{
+impl LayerMap {
     ///
     /// Find the latest layer (by lsn.end) that covers the given
     /// 'key', with lsn.start < 'end_lsn'.
@@ -235,7 +183,7 @@ where
     /// NOTE: This only searches the 'historic' layers, *not* the
     /// 'open' and 'frozen' layers!
     ///
-    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> {
+    pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
         let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
         let latest_delta = version.delta_coverage.query(key.to_i128());
         let latest_image = version.image_coverage.query(key.to_i128());
@@ -244,7 +192,6 @@ where
             (None, None) => None,
             (None, Some(image)) => {
                 let lsn_floor = image.get_lsn_range().start;
-                let image = self.get_layer_from_mapping(&image.key()).clone();
                 Some(SearchResult {
                     layer: image,
                     lsn_floor,
@@ -252,7 +199,6 @@ where
             }
             (Some(delta), None) => {
                 let lsn_floor = delta.get_lsn_range().start;
-                let delta = self.get_layer_from_mapping(&delta.key()).clone();
                 Some(SearchResult {
                     layer: delta,
                     lsn_floor,
@@ -263,7 +209,6 @@ where
                 let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
                 let image_exact_match = img_lsn + 1 == end_lsn;
                 if image_is_newer || image_exact_match {
-                    let image = self.get_layer_from_mapping(&image.key()).clone();
                     Some(SearchResult {
                         layer: image,
                         lsn_floor: img_lsn,
@@ -271,7 +216,6 @@ where
                 } else {
                     let lsn_floor =
                         std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
-                    let delta = self.get_layer_from_mapping(&delta.key()).clone();
                     Some(SearchResult {
                         layer: delta,
                         lsn_floor,
@@ -282,7 +226,7 @@ where
     }

     /// Start a batch of updates, applied on drop
-    pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> {
+    pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
         BatchedUpdates { layer_map: self }
     }

@@ -292,48 +236,32 @@ where
     /// Helper function for BatchedUpdates::insert_historic
     ///
     /// TODO(chi): remove L generic so that we do not need to pass layer object.
-    pub(self) fn insert_historic_noflush(
-        &mut self,
-        layer_desc: PersistentLayerDesc,
-        layer: Arc<L>,
-    ) {
-        self.mapping.insert(layer_desc.key(), layer.clone());
+    pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
         // TODO: See #3869, resulting #4088, attempted fix and repro #4094
-        if Self::is_l0(&layer) {
+        if Self::is_l0(&layer_desc) {
             self.l0_delta_layers.push(layer_desc.clone().into());
         }
         self.historic.insert(
-            historic_layer_coverage::LayerKey::from(&*layer),
+            historic_layer_coverage::LayerKey::from(&layer_desc),
             layer_desc.into(),
         );
     }

-    fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
-        let layer = self
-            .mapping
-            .get(key)
-            .with_context(|| format!("{key:?}"))
-            .expect("inconsistent layer mapping");
-        layer
-    }
-
     ///
     /// Remove an on-disk layer from the map.
     ///
     /// Helper function for BatchedUpdates::remove_historic
     ///
-    pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) {
+    pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
         self.historic
-            .remove(historic_layer_coverage::LayerKey::from(&*layer));
-        if Self::is_l0(&layer) {
+            .remove(historic_layer_coverage::LayerKey::from(&layer_desc));
+        let layer_key = layer_desc.key();
+        if Self::is_l0(&layer_desc) {
             let len_before = self.l0_delta_layers.len();
             let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
-            l0_delta_layers.retain(|other| {
-                !Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
-            });
+            l0_delta_layers.retain(|other| other.key() != layer_key);
             self.l0_delta_layers = l0_delta_layers;
             // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
             // there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +272,6 @@ where
                 "failed to locate removed historic layer from l0_delta_layers"
             );
         }
-        self.mapping.remove(&layer_desc.key());
-    }
-
-    pub(self) fn replace_historic_noflush(
-        &mut self,
-        expected_desc: PersistentLayerDesc,
-        expected: &Arc<L>,
-        new_desc: PersistentLayerDesc,
-        new: Arc<L>,
-    ) -> anyhow::Result<Replacement<Arc<L>>> {
-        let key = historic_layer_coverage::LayerKey::from(&**expected);
-        let other = historic_layer_coverage::LayerKey::from(&*new);
-        let expected_l0 = Self::is_l0(expected);
-        let new_l0 = Self::is_l0(&new);
-
-        anyhow::ensure!(
-            key == other,
-            "expected and new must have equal LayerKeys: {key:?} != {other:?}"
-        );
-
-        anyhow::ensure!(
-            expected_l0 == new_l0,
-            "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
-        );
-
-        let l0_index = if expected_l0 {
-            // find the index in case replace worked, we need to replace that as well
-            let pos = self.l0_delta_layers.iter().position(|slot| {
-                Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
-            });
-            if pos.is_none() {
-                return Ok(Replacement::NotFound);
-            }
-            pos
-        } else {
-            None
-        };
-
-        let new_desc = Arc::new(new_desc);
-
-        let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
-            **existing == expected_desc
-        });
-
-        if let Replacement::Replaced { .. } = &replaced {
-            self.mapping.remove(&expected_desc.key());
-            self.mapping.insert(new_desc.key(), new);
-            if let Some(index) = l0_index {
-                self.l0_delta_layers[index] = new_desc;
-            }
-        }
-
-        let replaced = match replaced {
-            Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
-            Replacement::NotFound => Replacement::NotFound,
-            Replacement::RemovalBuffered => Replacement::RemovalBuffered,
-            Replacement::Unexpected(x) => {
-                Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
-            }
-        };
-
-        Ok(replaced)
     }

     /// Helper function for BatchedUpdates::drop.
@@ -454,10 +319,8 @@ where
         Ok(true)
     }

-    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> {
-        self.historic
-            .iter()
-            .map(|x| self.get_layer_from_mapping(&x.key()).clone())
+    pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
+        self.historic.iter()
     }

     ///
@@ -472,7 +335,7 @@ where
         &self,
         key_range: &Range<Key>,
         lsn: Lsn,
-    ) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> {
+    ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
         let version = match self.historic.get().unwrap().get_version(lsn.0) {
             Some(v) => v,
             None => return Ok(vec![]),
@@ -482,36 +345,26 @@ where
         let end = key_range.end.to_i128();

         // Initialize loop variables
-        let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![];
+        let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
         let mut current_key = start;
         let mut current_val = version.image_coverage.query(start);

         // Loop through the change events and push intervals
         for (change_key, change_val) in version.image_coverage.range(start..end) {
             let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
-            coverage.push((
-                kr,
-                current_val
-                    .take()
-                    .map(|l| self.get_layer_from_mapping(&l.key()).clone()),
-            ));
+            coverage.push((kr, current_val.take()));
             current_key = change_key;
             current_val = change_val.clone();
         }

         // Add the final interval
         let kr = Key::from_i128(current_key)..Key::from_i128(end);
-        coverage.push((
-            kr,
-            current_val
-                .take()
-                .map(|l| self.get_layer_from_mapping(&l.key()).clone()),
-        ));
+        coverage.push((kr, current_val.take()));

         Ok(coverage)
     }

-    pub fn is_l0(layer: &L) -> bool {
+    pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
         range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
     }

@@ -537,7 +390,7 @@ where
     /// TODO The optimal number should probably be slightly higher than 1, but to
     /// implement that we need to plumb a lot more context into this function
     /// than just the current partition_range.
-    pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool {
+    pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
         // Case 1
         if !Self::is_l0(layer) {
             return true;
@@ -595,9 +448,7 @@ where
                     let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
                     let lr = lsn.start..val.get_lsn_range().start;
                     if !kr.is_empty() {
-                        let base_count =
-                            Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
-                                as usize;
+                        let base_count = Self::is_reimage_worthy(&val, key) as usize;
                         let new_limit = limit.map(|l| l - base_count);
                         let max_stacked_deltas_underneath =
                             self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +471,7 @@ where
                 let lr = lsn.start..val.get_lsn_range().start;

                 if !kr.is_empty() {
-                    let base_count =
-                        Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
-                            as usize;
+                    let base_count = Self::is_reimage_worthy(&val, key) as usize;
                     let new_limit = limit.map(|l| l - base_count);
                     let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
                     max_stacked_deltas = std::cmp::max(
@@ -772,12 +621,8 @@ where
     }

     /// Return all L0 delta layers
-    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> {
-        Ok(self
-            .l0_delta_layers
-            .iter()
-            .map(|x| self.get_layer_from_mapping(&x.key()).clone())
-            .collect())
+    pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
+        Ok(self.l0_delta_layers.to_vec())
     }

     /// debugging function to print out the contents of the layer map
@@ -802,72 +647,79 @@ where
         println!("End dump LayerMap");
         Ok(())
     }
+}

-    /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
-    ///
-    /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
-    #[inline(always)]
-    pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
-        // "dyn Trait" objects are "fat pointers" in that they have two components:
-        // - pointer to the object
-        // - pointer to the vtable
-        //
-        // rust does not provide a guarantee that these vtables are unique, but however
-        // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
-        // pointer and the vtable need to be equal.
-        //
-        // See: https://github.com/rust-lang/rust/issues/103763
-        //
-        // A future version of rust will most likely use this form below, where we cast each
-        // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
-        // not affect the comparison.
-        //
-        // See: https://github.com/rust-lang/rust/pull/106450
-        let left = Arc::as_ptr(left) as *const ();
-        let right = Arc::as_ptr(right) as *const ();
-
-        left == right
-    }
-}
+/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
+///
+/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
+///
+/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
+#[inline(always)]
+pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
+    // "dyn Trait" objects are "fat pointers" in that they have two components:
+    // - pointer to the object
+    // - pointer to the vtable
+    //
+    // rust does not provide a guarantee that these vtables are unique, but however
+    // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
+    // pointer and the vtable need to be equal.
+    //
+    // See: https://github.com/rust-lang/rust/issues/103763
+    //
+    // A future version of rust will most likely use this form below, where we cast each
+    // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
+    // not affect the comparison.
+    //
+    // See: https://github.com/rust-lang/rust/pull/106450
+    let left = Arc::as_ptr(left) as *const ();
+    let right = Arc::as_ptr(right) as *const ();
+
+    left == right
+}

 #[cfg(test)]
 mod tests {
-    use super::{LayerMap, Replacement};
-    use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName};
+    use super::LayerMap;
+    use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
     use std::str::FromStr;
     use std::sync::Arc;

     mod l0_delta_layers_updated {
+        use crate::tenant::{
+            storage_layer::{PersistentLayer, PersistentLayerDesc},
+            timeline::LayerMapping,
+        };
+
         use super::*;

         #[test]
         fn for_full_range_delta() {
             // l0_delta_layers are used by compaction, and should observe all buffered updates
             l0_delta_layers_updated_scenario(
                 "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
                 true
             )
         }

         #[test]
         fn for_non_full_range_delta() {
             // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
             l0_delta_layers_updated_scenario(
                 "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
                 // because not full range
                 false
             )
         }

         #[test]
         fn for_image() {
             l0_delta_layers_updated_scenario(
                 "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
                 // code only checks if it is a full range layer, doesn't care about images, which must
                 // mean we should in practice never have full range images
                 false
             )
         }

         #[test]
@@ -883,16 +735,16 @@ mod tests {
         let not_found = Arc::new(layer.clone());
         let new_version = Arc::new(layer);

-        let mut map = LayerMap::default();
-
-        let res = map.batch_update().replace_historic(
-            not_found.get_persistent_layer_desc(),
-            &not_found,
-            new_version.get_persistent_layer_desc(),
-            new_version,
-        );
-
-        assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
+        // after the immutable storage state refactor, the replace operation
+        // will not use layer map any more. We keep it here for consistency in test cases
+        // and can remove it in the future.
+        let _map = LayerMap::default();
+        let mut mapping = LayerMapping::new();
+
+        mapping
+            .replace_and_verify(not_found, new_version)
+            .unwrap_err();
     }

@@ -903,49 +755,44 @@ mod tests {
         let downloaded = Arc::new(skeleton);

         let mut map = LayerMap::default();
+        let mut mapping = LayerMapping::new();

         // two disjoint Arcs in different lifecycle phases. even if it seems they must be the
         // same layer, we use LayerMap::compare_arced_layers as the identity of layers.
-        assert!(!LayerMap::compare_arced_layers(&remote, &downloaded));
+        assert_eq!(remote.layer_desc(), downloaded.layer_desc());

         let expected_in_counts = (1, usize::from(expected_l0));

         map.batch_update()
-            .insert_historic(remote.get_persistent_layer_desc(), remote.clone());
-        assert_eq!(count_layer_in(&map, &remote), expected_in_counts);
-
-        let replaced = map
-            .batch_update()
-            .replace_historic(
-                remote.get_persistent_layer_desc(),
-                &remote,
-                downloaded.get_persistent_layer_desc(),
-                downloaded.clone(),
-            )
-            .expect("name derived attributes are the same");
-        assert!(
-            matches!(replaced, Replacement::Replaced { .. }),
-            "{replaced:?}"
-        );
-        assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
+            .insert_historic(remote.layer_desc().clone());
+        mapping.insert(remote.clone());
+        assert_eq!(
+            count_layer_in(&map, remote.layer_desc()),
+            expected_in_counts
+        );
+
+        mapping
+            .replace_and_verify(remote, downloaded.clone())
+            .expect("name derived attributes are the same");
+        assert_eq!(
+            count_layer_in(&map, downloaded.layer_desc()),
+            expected_in_counts
+        );

         map.batch_update()
-            .remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone());
-        assert_eq!(count_layer_in(&map, &downloaded), (0, 0));
+            .remove_historic(downloaded.layer_desc().clone());
+        assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
     }

-    fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) {
+    fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
         let historic = map
             .iter_historic_layers()
-            .filter(|x| LayerMap::compare_arced_layers(x, layer))
+            .filter(|x| x.key() == layer.key())
             .count();
         let l0s = map
             .get_level0_deltas()
             .expect("why does this return a result");
-        let l0 = l0s
-            .iter()
-            .filter(|x| LayerMap::compare_arced_layers(x, layer))
-            .count();
+        let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();

         (historic, l0)
     }
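
With the mapping hash map gone, LayerMap::search now resolves a (key, LSN) pair to an Arc<PersistentLayerDesc>, and resolving that descriptor to an I/O-capable layer object becomes the caller's job via the LayerMapping type introduced later in this diff. A sketch of the two-step read (hypothetical free function; get_from_desc is private to the timeline module in this patch and is shown only to illustrate the hand-off):

fn read_path_sketch(map: &LayerMap, mapping: &LayerMapping, key: Key, end_lsn: Lsn) {
    if let Some(result) = map.search(key, end_lsn) {
        // Step 1: the map returns an Arc<PersistentLayerDesc>, not a layer object.
        let desc = result.layer;
        // Step 2: the descriptor's key looks up the concrete layer for I/O.
        let layer = mapping.get_from_desc(&desc);
        let _ = (layer, result.lsn_floor);
    }
}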

View File

@@ -3,6 +3,8 @@ use std::ops::Range;

 use tracing::info;

+use crate::tenant::storage_layer::PersistentLayerDesc;
+
 use super::layer_coverage::LayerCoverageTuple;

 /// Layers in this module are identified and indexed by this data.
@@ -53,11 +55,24 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
     }
 }

+impl From<&PersistentLayerDesc> for LayerKey {
+    fn from(layer: &PersistentLayerDesc) -> Self {
+        let kr = layer.get_key_range();
+        let lr = layer.get_lsn_range();
+        LayerKey {
+            key: kr.start.to_i128()..kr.end.to_i128(),
+            lsn: lr.start.0..lr.end.0,
+            is_image: !layer.is_incremental(),
+        }
+    }
+}
+
 /// Efficiently queryable layer coverage for each LSN.
 ///
 /// Allows answering layer map queries very efficiently,
 /// but doesn't allow retroactive insertion, which is
 /// sometimes necessary. See BufferedHistoricLayerCoverage.
+#[derive(Clone)]
 pub struct HistoricLayerCoverage<Value> {
     /// The latest state
     head: LayerCoverageTuple<Value>,
@@ -411,6 +426,7 @@ fn test_persistent_overlapping() {
 ///
 /// See this for more on persistent and retroactive techniques:
 /// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
+#[derive(Clone)]
 pub struct BufferedHistoricLayerCoverage<Value> {
     /// A persistent layer map that we rebuild when we need to retroactively update
     historic_coverage: HistoricLayerCoverage<Value>,
@@ -467,6 +483,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
     ///
     /// Returns a `Replacement` value describing the outcome; only the case of
     /// `Replacement::Replaced` modifies the map and requires a rebuild.
+    ///
+    /// This function is unlikely to be used in the future because LayerMap now only records the
+    /// layer descriptors. Therefore, anything added to the layer map will only be removed or
+    /// added, and never replaced.
+    #[allow(dead_code)]
     pub fn replace<F>(
         &mut self,
         layer_key: &LayerKey,
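
The new From<&PersistentLayerDesc> impl makes coverage-index identity purely value-based: two descriptors with the same key range, LSN range, and kind produce equal LayerKeys no matter which Arc or layer object they came from, which is what lets replace_and_verify later in this diff match layers by key instead of by pointer. A sketch of that equivalence (hypothetical function, assuming the derived equality that the `key == other` check in this diff relies on):

fn key_is_value_based(a: &PersistentLayerDesc, b: &PersistentLayerDesc) {
    let same_identity = a.get_key_range() == b.get_key_range()
        && a.get_lsn_range() == b.get_lsn_range()
        && a.is_incremental() == b.is_incremental();
    // LayerKey equality tracks descriptor values, not Arc pointers.
    assert_eq!(LayerKey::from(a) == LayerKey::from(b), same_identity);
}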

View File

@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
 ///
 /// NOTE The struct is parameterized over Value for easier
 /// testing, but in practice it's some sort of layer.
+#[derive(Clone)]
 pub struct LayerCoverage<Value> {
     /// For every change in coverage (as we sweep the key space)
     /// we store (lsn.end, value).
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
 }

 /// Image and delta coverage at a specific LSN.
+#[derive(Clone)]
 pub struct LayerCoverageTuple<Value> {
     pub image_coverage: LayerCoverage<Value>,
     pub delta_coverage: LayerCoverage<Value>,
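
These Clone derives are what make a whole LayerMap cheap to duplicate for the copy-on-write manager added in this patch: LayerCoverage wraps the rpds persistent map imported above, and persistent structures share nodes on clone instead of copying. A stand-alone illustration of that property (not part of the patch):

use rpds::RedBlackTreeMapSync;

fn persistent_map_demo() {
    let mut v1: RedBlackTreeMapSync<u64, &str> = RedBlackTreeMapSync::new_sync();
    v1.insert_mut(10, "layer-a");

    let v2 = v1.clone(); // O(1): both maps share the same tree nodes
    let v3 = v2.insert(20, "layer-b"); // persistent insert: new version, old intact

    assert!(v1.contains_key(&10));
    assert!(!v2.contains_key(&20)); // v2 is unchanged by v3's insert
    assert!(v3.contains_key(&20));
}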

View File

@@ -0,0 +1,146 @@
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
//!
//! A common usage pattern is as follows:
//!
//! ```ignore
//! async fn compaction(&self) {
//!     // Get the current state.
//!     let state = self.layer_map_mgr.read();
//!     // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
//!     // take a long time.
//!     let compaction_result = self.do_compaction(&state).await?;
//!     // Update the state.
//!     self.layer_map_mgr.update(|mut state| async move {
//!         // do updates to the state, return it.
//!         Ok(state)
//!     }).await?;
//! }
//! ```

use anyhow::Result;
use arc_swap::ArcSwap;
use futures::Future;
use std::sync::Arc;

use super::layer_map::LayerMap;

/// Manages the storage state. Provides utility functions to modify the layer map and to get an
/// immutable reference to the layer map.
pub struct LayerMapMgr {
    layer_map: ArcSwap<LayerMap>,
    state_lock: tokio::sync::Mutex<()>,
}

impl LayerMapMgr {
    /// Get the current state of the layer map.
    pub fn read(&self) -> Arc<LayerMap> {
        // TODO: using `load` here instead of `load_full` would avoid cloning the Arc, but the
        // read path usually involves disk reads and layer mapping fetches anyway, so the cost
        // of the extra clone is negligible.
        self.layer_map.load_full()
    }

    /// Clone the layer map for modification.
    fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
        (**self.layer_map.load()).clone()
    }

    pub fn new(layer_map: LayerMap) -> Self {
        Self {
            layer_map: ArcSwap::new(Arc::new(layer_map)),
            state_lock: tokio::sync::Mutex::new(()),
        }
    }

    /// Update the layer map.
    pub async fn update<O, F>(&self, operation: O) -> Result<()>
    where
        O: FnOnce(LayerMap) -> F,
        F: Future<Output = Result<LayerMap>>,
    {
        let state_lock = self.state_lock.lock().await;
        let state = self.clone_for_write(&state_lock);
        let new_state = operation(state).await?;
        self.layer_map.store(Arc::new(new_state));
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use utils::{
        id::{TenantId, TimelineId},
        lsn::Lsn,
    };

    use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};

    use super::*;

    #[tokio::test]
    async fn test_layer_map_manage() -> Result<()> {
        let mgr = LayerMapMgr::new(Default::default());
        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(0)..Key::from_i128(1),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;
        let ref_1 = mgr.read();
        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(1)..Key::from_i128(2),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;
        let ref_2 = mgr.read();
        // Modification should not be visible to the old reference.
        assert_eq!(
            ref_1
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
        // Modification should be visible to the new reference.
        assert_eq!(
            ref_2
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert_eq!(
            ref_2
                .search(Key::from_i128(1), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(1)..Key::from_i128(2)
        );
        Ok(())
    }
}
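
The manager above pairs an ArcSwap snapshot for lock-free reads with a mutex that serializes writers around a clone-mutate-publish cycle. The same shape reduced to a u64 payload and a std mutex, so it runs stand-alone (an illustration under those assumptions, not the pageserver code):

use arc_swap::ArcSwap;
use std::sync::{Arc, Mutex};

struct Mgr {
    state: ArcSwap<u64>,
    write_lock: Mutex<()>,
}

impl Mgr {
    fn read(&self) -> Arc<u64> {
        self.state.load_full() // lock-free snapshot for readers
    }

    fn update(&self, f: impl FnOnce(u64) -> u64) {
        let _guard = self.write_lock.lock().unwrap(); // one writer at a time
        let next = f(**self.state.load()); // copy the current state
        self.state.store(Arc::new(next)); // publish atomically; readers never block
    }
}

fn main() {
    let mgr = Mgr {
        state: ArcSwap::new(Arc::new(0)),
        write_lock: Mutex::new(()),
    };
    let before = mgr.read(); // old snapshot
    mgr.update(|v| v + 1);
    assert_eq!(*before, 0); // existing readers keep the old state
    assert_eq!(*mgr.read(), 1); // new readers see the update
}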

View File

@@ -176,13 +176,10 @@ impl LayerAccessStats {
     /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
     ///
     /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
-    pub(crate) fn for_loading_layer<L>(
-        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+    pub(crate) fn for_loading_layer(
+        layer_map_lock_held_witness: &BatchedUpdates<'_>,
         status: LayerResidenceStatus,
-    ) -> Self
-    where
-        L: ?Sized + Layer,
-    {
+    ) -> Self {
         let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
         new.record_residence_event(
             layer_map_lock_held_witness,
@@ -197,14 +194,11 @@ impl LayerAccessStats {
     /// The `new_status` is not recorded in `self`.
     ///
     /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
-    pub(crate) fn clone_for_residence_change<L>(
+    pub(crate) fn clone_for_residence_change(
         &self,
-        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        layer_map_lock_held_witness: &BatchedUpdates<'_>,
         new_status: LayerResidenceStatus,
-    ) -> LayerAccessStats
-    where
-        L: ?Sized + Layer,
-    {
+    ) -> LayerAccessStats {
         let clone = {
             let inner = self.0.lock().unwrap();
             inner.clone()
@@ -232,14 +226,12 @@ impl LayerAccessStats {
     /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
     /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
     ///
-    pub(crate) fn record_residence_event<L>(
+    pub(crate) fn record_residence_event(
         &self,
-        _layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        _layer_map_lock_held_witness: &BatchedUpdates<'_>,
         status: LayerResidenceStatus,
         reason: LayerResidenceEventReason,
-    ) where
-        L: ?Sized + Layer,
-    {
+    ) {
         let mut locked = self.0.lock().unwrap();
         locked.iter_mut().for_each(|inner| {
             inner
@@ -473,94 +465,125 @@ pub fn downcast_remote_layer(
     }
 }

-/// Holds metadata about a layer without any content. Used mostly for testing.
-///
-/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
-/// LayerDescriptor.
-#[derive(Clone, Debug)]
-pub struct LayerDescriptor {
-    pub key: Range<Key>,
-    pub lsn: Range<Lsn>,
-    pub is_incremental: bool,
-    pub short_id: String,
-}
-
-impl LayerDescriptor {
-    /// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta,
-    /// and the tenant / timeline id does not matter.
-    pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc {
-        PersistentLayerDesc::new_delta(
-            TenantId::from_array([0; 16]),
-            TimelineId::from_array([0; 16]),
-            self.key.clone(),
-            self.lsn.clone(),
-            233,
-        )
-    }
-}
-
-impl Layer for LayerDescriptor {
-    fn get_key_range(&self) -> Range<Key> {
-        self.key.clone()
-    }
-
-    fn get_lsn_range(&self) -> Range<Lsn> {
-        self.lsn.clone()
-    }
-
-    fn is_incremental(&self) -> bool {
-        self.is_incremental
-    }
-
-    fn get_value_reconstruct_data(
-        &self,
-        _key: Key,
-        _lsn_range: Range<Lsn>,
-        _reconstruct_data: &mut ValueReconstructState,
-        _ctx: &RequestContext,
-    ) -> Result<ValueReconstructResult> {
-        todo!("This method shouldn't be part of the Layer trait")
-    }
-
-    fn short_id(&self) -> String {
-        self.short_id.clone()
-    }
-
-    fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
-        todo!()
-    }
-}
-
-impl From<DeltaFileName> for LayerDescriptor {
-    fn from(value: DeltaFileName) -> Self {
-        let short_id = value.to_string();
-        LayerDescriptor {
-            key: value.key_range,
-            lsn: value.lsn_range,
-            is_incremental: true,
-            short_id,
-        }
-    }
-}
-
-impl From<ImageFileName> for LayerDescriptor {
-    fn from(value: ImageFileName) -> Self {
-        let short_id = value.to_string();
-        let lsn = value.lsn_as_range();
-        LayerDescriptor {
-            key: value.key_range,
-            lsn,
-            is_incremental: false,
-            short_id,
-        }
-    }
-}
-
-impl From<LayerFileName> for LayerDescriptor {
-    fn from(value: LayerFileName) -> Self {
-        match value {
-            LayerFileName::Delta(d) => Self::from(d),
-            LayerFileName::Image(i) => Self::from(i),
-        }
-    }
-}
+pub mod tests {
+    use super::*;
+
+    /// Holds metadata about a layer without any content. Used mostly for testing.
+    ///
+    /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
+    /// LayerDescriptor.
+    #[derive(Clone, Debug)]
+    pub struct LayerDescriptor {
+        base: PersistentLayerDesc,
+    }
+
+    impl From<PersistentLayerDesc> for LayerDescriptor {
+        fn from(base: PersistentLayerDesc) -> Self {
+            Self { base }
+        }
+    }
+
+    impl Layer for LayerDescriptor {
+        fn get_value_reconstruct_data(
+            &self,
+            _key: Key,
+            _lsn_range: Range<Lsn>,
+            _reconstruct_data: &mut ValueReconstructState,
+            _ctx: &RequestContext,
+        ) -> Result<ValueReconstructResult> {
+            todo!("This method shouldn't be part of the Layer trait")
+        }
+
+        fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
+            todo!()
+        }
+
+        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
+        fn get_key_range(&self) -> Range<Key> {
+            self.layer_desc().key_range.clone()
+        }
+
+        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
+        fn get_lsn_range(&self) -> Range<Lsn> {
+            self.layer_desc().lsn_range.clone()
+        }
+
+        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
+        fn is_incremental(&self) -> bool {
+            self.layer_desc().is_incremental
+        }
+
+        /// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
+        fn short_id(&self) -> String {
+            self.layer_desc().short_id()
+        }
+    }
+
+    impl PersistentLayer for LayerDescriptor {
+        fn layer_desc(&self) -> &PersistentLayerDesc {
+            &self.base
+        }
+
+        fn local_path(&self) -> Option<PathBuf> {
+            unimplemented!()
+        }
+
+        fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
+            unimplemented!()
+        }
+
+        fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
+            unimplemented!()
+        }
+
+        fn delete_resident_layer_file(&self) -> Result<()> {
+            unimplemented!()
+        }
+
+        fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
+            unimplemented!()
+        }
+
+        fn access_stats(&self) -> &LayerAccessStats {
+            unimplemented!()
+        }
+    }
+
+    impl From<DeltaFileName> for LayerDescriptor {
+        fn from(value: DeltaFileName) -> Self {
+            LayerDescriptor {
+                base: PersistentLayerDesc::new_delta(
+                    TenantId::from_array([0; 16]),
+                    TimelineId::from_array([0; 16]),
+                    value.key_range,
+                    value.lsn_range,
+                    233,
+                ),
+            }
+        }
+    }
+
+    impl From<ImageFileName> for LayerDescriptor {
+        fn from(value: ImageFileName) -> Self {
+            LayerDescriptor {
+                base: PersistentLayerDesc::new_img(
+                    TenantId::from_array([0; 16]),
+                    TimelineId::from_array([0; 16]),
+                    value.key_range,
+                    value.lsn,
+                    false,
+                    233,
+                ),
+            }
+        }
+    }
+
+    impl From<LayerFileName> for LayerDescriptor {
+        fn from(value: LayerFileName) -> Self {
+            match value {
+                LayerFileName::Delta(d) => Self::from(d),
+                LayerFileName::Image(i) => Self::from(i),
+            }
+        }
+    }
+}
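
The relocated test module keeps the fixture flow its doc comment describes: parse a LayerFileName, convert it into the descriptor-backed LayerDescriptor, and let every Layer accessor delegate to layer_desc(). A sketch of that flow (assuming LayerFileName implements FromStr, as the `use std::str::FromStr;` in the tests suggests):

use std::str::FromStr;

fn layer_from_fixture_name() {
    // Same delta-layer fixture format used by the layer map tests earlier in this diff.
    let name = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
    let parsed = LayerFileName::from_str(name).expect("valid layer file name");
    let layer = LayerDescriptor::from(parsed);
    // Accessors are boilerplate over the embedded PersistentLayerDesc.
    assert!(layer.is_incremental()); // delta file names become incremental layers
}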

View File

@@ -218,15 +218,12 @@ impl RemoteLayer {
     }

     /// Create a Layer struct representing this layer, after it has been downloaded.
-    pub fn create_downloaded_layer<L>(
+    pub fn create_downloaded_layer(
         &self,
-        layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
+        layer_map_lock_held_witness: &BatchedUpdates<'_>,
         conf: &'static PageServerConf,
         file_size: u64,
-    ) -> Arc<dyn PersistentLayer>
-    where
-        L: ?Sized + Layer,
-    {
+    ) -> Arc<dyn PersistentLayer> {
         if self.desc.is_delta {
             let fname = self.desc.delta_file_name();
             Arc::new(DeltaLayer::new(

View File

@@ -3,7 +3,7 @@
mod eviction_task; mod eviction_task;
mod walreceiver; mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context}; use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use fail::fail_point; use fail::fail_point;
use futures::StreamExt; use futures::StreamExt;
@@ -52,6 +52,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key}; use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError}; use crate::pgdatadir_mapping::{BlockNumber, CalculateLogicalSizeError};
use crate::tenant::config::{EvictionPolicy, TenantConfOpt}; use crate::tenant::config::{EvictionPolicy, TenantConfOpt};
use crate::tenant::layer_map;
use pageserver_api::reltag::RelTag; use pageserver_api::reltag::RelTag;
use postgres_connection::PgConnectionConfig; use postgres_connection::PgConnectionConfig;
@@ -81,7 +82,9 @@ use super::config::TenantConf;
use super::layer_map::BatchedUpdates; use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient; use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState { pub(super) enum FlushLoopState {
@@ -114,6 +117,80 @@ impl PartialOrd for Hole {
} }
} }
pub struct LayerMapping(HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>);
impl LayerMapping {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.0.get(&desc.key()).expect("not found").clone()
}
pub(crate) fn insert(&mut self, layer: Arc<dyn PersistentLayer>) {
self.0.insert(layer.layer_desc().key(), layer);
}
pub(crate) fn new() -> Self {
Self(HashMap::new())
}
pub(crate) fn remove(&mut self, layer: Arc<dyn PersistentLayer>) {
self.0.remove(&layer.layer_desc().key());
}
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
use super::layer_map::LayerKey;
let key = LayerKey::from(&*expected);
let other = LayerKey::from(&*new);
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<'_, T>) {
drop(rlock)
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
pub struct Timeline { pub struct Timeline {
conf: &'static PageServerConf, conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>, tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -125,7 +202,7 @@ pub struct Timeline {
pub pg_version: u32, pub pg_version: u32,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>, pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>,
/// Set of key ranges which should be covered by image layers to /// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -598,7 +675,8 @@ impl Timeline {
/// This method makes no distinction between local and remote layers. /// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**. /// Hence, the result **does not represent local filesystem usage**.
pub async fn layer_size_sum(&self) -> u64 { pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await; let guard = self.layers.read().await;
let (layer_map, _) = &*guard;
let mut size = 0; let mut size = 0;
for l in layer_map.iter_historic_layers() { for l in layer_map.iter_historic_layers() {
size += l.file_size(); size += l.file_size();
@@ -908,7 +986,8 @@ impl Timeline {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> { pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn(); let last_lsn = self.get_last_record_lsn();
let open_layer_size = { let open_layer_size = {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
let Some(open_layer) = layers.open_layer.as_ref() else { let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(()); return Ok(());
}; };
@@ -1039,7 +1118,8 @@ impl Timeline {
} }
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await; let guard = self.layers.read().await;
let (layer_map, mapping) = &*guard;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer { if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info()); in_memory_layers.push(open_layer.info());
@@ -1050,6 +1130,7 @@ impl Timeline {
let mut historic_layers = Vec::new(); let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() { for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = mapping.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset)); historic_layers.push(historic_layer.info(reset));
} }
@@ -1159,7 +1240,8 @@ impl Timeline {
         }

         // start the batch update
-        let mut layer_map = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layer_map, mapping) = &mut *guard;
         let mut batch_updates = layer_map.batch_update();

         let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1168,14 +1250,19 @@ impl Timeline {
             let res = if cancel.is_cancelled() {
                 None
             } else {
-                Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates))
+                Some(self.evict_layer_batch_impl(
+                    &layer_removal_guard,
+                    l,
+                    &mut batch_updates,
+                    mapping,
+                ))
             };
             results.push(res);
         }

         // commit the updates & release locks
         batch_updates.flush();
-        drop(layer_map);
+        drop_wlock(guard);
         drop(layer_removal_guard);

         assert_eq!(results.len(), layers_to_evict.len());
@@ -1186,10 +1273,9 @@ impl Timeline {
         &self,
         _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
         local_layer: &Arc<dyn PersistentLayer>,
-        batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
+        batch_updates: &mut BatchedUpdates<'_>,
+        mapping: &mut LayerMapping,
     ) -> anyhow::Result<bool> {
-        use super::layer_map::Replacement;
-
         if local_layer.is_remote_layer() {
             // TODO(issue #3851): consider returning an err here instead of false,
             // which is the same outcome as the match below
@@ -1237,13 +1323,10 @@ impl Timeline {
             ),
         });

-        let replaced = match batch_updates.replace_historic(
-            local_layer.layer_desc().clone(),
-            local_layer,
-            new_remote_layer.layer_desc().clone(),
-            new_remote_layer,
-        )? {
-            Replacement::Replaced { .. } => {
+        assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
+
+        let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) {
+            Ok(()) => {
                 if let Err(e) = local_layer.delete_resident_layer_file() {
                     error!("failed to remove layer file on evict after replacement: {e:#?}");
                 }
@@ -1276,24 +1359,17 @@ impl Timeline {
                 true
             }
-            Replacement::NotFound => {
-                debug!(evicted=?local_layer, "layer was no longer in layer map");
-                false
-            }
-            Replacement::RemovalBuffered => {
-                unreachable!("not doing anything else in this batch")
-            }
-            Replacement::Unexpected(other) => {
-                error!(
-                    local_layer.ptr=?Arc::as_ptr(local_layer),
-                    other.ptr=?Arc::as_ptr(&other),
-                    ?other,
-                    "failed to replace");
+            Err(err) => {
+                if cfg!(debug_assertions) {
+                    error!(evicted=?local_layer, "failed to replace: {err}");
+                } else {
+                    error!(evicted=?local_layer, "failed to replace: {err}");
+                }
                 false
             }
         };
-        Ok(replaced)
+        Ok(succeed)
     }
 }
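`replace_and_verify` collapses the old four-variant `Replacement` enum into a `Result`: the caller either gets a confirmed swap or an error saying why the precondition failed. A plausible core of such a check (a sketch under assumed names, using `anyhow` for errors as the crate does, not the crate's implementation) verifies identity via the `Arc` pointer before swapping:

```rust
use std::collections::HashMap;
use std::sync::Arc;

fn replace_and_verify<K: Eq + std::hash::Hash>(
    inner: &mut HashMap<K, Arc<str>>,
    key: K,
    expected: &Arc<str>,
    new: Arc<str>,
) -> anyhow::Result<()> {
    match inner.get(&key) {
        // Pointer identity: the entry must be the exact Arc the caller is
        // evicting or replacing, not merely an equal-looking layer.
        Some(current) if Arc::ptr_eq(current, expected) => {
            inner.insert(key, new);
            Ok(())
        }
        Some(_) => anyhow::bail!("another layer found instead of the expected one"),
        None => anyhow::bail!("expected layer not found in mapping"),
    }
}
```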
@@ -1417,7 +1493,7 @@ impl Timeline {
             timeline_id,
             tenant_id,
             pg_version,
-            layers: tokio::sync::RwLock::new(LayerMap::default()),
+            layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())),
             wanted_image_layers: Mutex::new(None),
             walredo_mgr,
@@ -1601,14 +1677,15 @@ impl Timeline {
         let mut layers = self.layers.try_write().expect(
             "in the context where we call this function, no other task has access to the object",
         );
-        layers.next_open_layer_at = Some(Lsn(start_lsn.0));
+        layers.0.next_open_layer_at = Some(Lsn(start_lsn.0));
     }

     ///
     /// Scan the timeline directory to populate the layer map.
     ///
     pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, mapping) = &mut *guard;
         let mut updates = layers.batch_update();
         let mut num_layers = 0;
@@ -1651,7 +1728,8 @@ impl Timeline {
                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
-                updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
+                updates.insert_historic(layer.layer_desc().clone());
+                mapping.insert(Arc::new(layer));
                 num_layers += 1;
             } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
                 // Create a DeltaLayer struct for each delta file.
@@ -1683,7 +1761,8 @@ impl Timeline {
                 trace!("found layer {}", layer.path().display());
                 total_physical_size += file_size;
-                updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer));
+                updates.insert_historic(layer.layer_desc().clone());
+                mapping.insert(Arc::new(layer));
                 num_layers += 1;
             } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
                 // ignore these
@@ -1737,7 +1816,8 @@ impl Timeline {
         // We're holding a layer map lock for a while but this
         // method is only called during init so it's fine.
-        let mut layer_map = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layer_map, mapping) = &mut *guard;
         let mut updates = layer_map.batch_update();
         for remote_layer_name in &index_part.timeline_layers {
             let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1782,7 +1862,8 @@ impl Timeline {
                     anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
                 } else {
                     self.metrics.resident_physical_size_gauge.sub(local_size);
-                    updates.remove_historic(local_layer.layer_desc().clone(), local_layer);
+                    updates.remove_historic(local_layer.layer_desc().clone());
+                    mapping.remove(local_layer);
                     // fall-through to adding the remote layer
                 }
             } else {
@@ -1821,7 +1902,8 @@ impl Timeline {
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
+                    updates.insert_historic(remote_layer.layer_desc().clone());
+                    mapping.insert(remote_layer);
                 }
                 LayerFileName::Delta(deltafilename) => {
                     // Create a RemoteLayer for the delta file.
@@ -1848,7 +1930,8 @@ impl Timeline {
                         ),
                     );
                     let remote_layer = Arc::new(remote_layer);
-                    updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer);
+                    updates.insert_historic(remote_layer.layer_desc().clone());
+                    mapping.insert(remote_layer);
                 }
             }
         }
@@ -1887,13 +1970,14 @@ impl Timeline {
         let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn();

-        let local_layers = self
-            .layers
-            .read()
-            .await
-            .iter_historic_layers()
-            .map(|l| (l.filename(), l))
-            .collect::<HashMap<_, _>>();
+        let local_layers = {
+            let guard = self.layers.read().await;
+            let (layers, mapping) = &*guard;
+            layers
+                .iter_historic_layers()
+                .map(|l| (l.filename(), mapping.get_from_desc(&l)))
+                .collect::<HashMap<_, _>>()
+        };

         // If no writes happen, new branches do not have any layers, only the metadata file.
         let has_local_layers = !local_layers.is_empty();
@@ -2264,10 +2348,12 @@ impl Timeline {
     }

     async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
-        for historic_layer in self.layers.read().await.iter_historic_layers() {
+        let guard = self.layers.read().await;
+        let (layers, mapping) = &*guard;
+        for historic_layer in layers.iter_historic_layers() {
             let historic_layer_name = historic_layer.filename().file_name();
             if layer_file_name == historic_layer_name {
-                return Some(historic_layer);
+                return Some(mapping.get_from_desc(&historic_layer));
             }
         }
@@ -2280,9 +2366,11 @@ impl Timeline {
         &self,
         // we cannot remove layers otherwise, since gc and compaction will race
         _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
-        layer: Arc<dyn PersistentLayer>,
-        updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
+        layer: Arc<PersistentLayerDesc>,
+        updates: &mut BatchedUpdates<'_>,
+        mapping: &mut LayerMapping,
     ) -> anyhow::Result<()> {
+        let layer = mapping.get_from_desc(&layer);
         if !layer.is_remote_layer() {
             layer.delete_resident_layer_file()?;
             let layer_file_size = layer.file_size();
@@ -2296,7 +2384,8 @@ impl Timeline {
         // won't be needed for page reconstruction for this timeline,
         // and mark what we can't delete yet as deleted from the layer
         // map index without actually rebuilding the index.
-        updates.remove_historic(layer.layer_desc().clone(), layer);
+        updates.remove_historic(layer.layer_desc().clone());
+        mapping.remove(layer);
         Ok(())
     }
@@ -2481,7 +2570,8 @@ impl Timeline {
         #[allow(clippy::never_loop)] // see comment at bottom of this loop
         'layer_map_search: loop {
             let remote_layer = {
-                let layers = timeline.layers.read().await;
+                let guard = timeline.layers.read().await;
+                let (layers, mapping) = &*guard;

                 // Check the open and frozen in-memory layers first, in order from newest
                 // to oldest.
@@ -2543,6 +2633,7 @@ impl Timeline {
                 }

                 if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
+                    let layer = mapping.get_from_desc(&layer);
                     // If it's a remote layer, download it and retry.
                     if let Some(remote_layer) =
                         super::storage_layer::downcast_remote_layer(&layer)
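The read path now searches the map for a descriptor, upgrades it through the mapping, and, if the result turns out to be a remote stub, downloads outside the lock and retries the whole search. A toy model of that control flow (hypothetical types, no locking, the download reduced to an in-place swap):

```rust
use std::collections::HashMap;

#[derive(Clone)]
enum Layer {
    Local(String),
    Remote(String),
}

/// Toy search/download/retry loop: the slow "download" step runs with no
/// lock analog held, after which the search is simply repeated.
fn read_key(map: &mut HashMap<u32, Layer>, key: u32) -> Option<String> {
    loop {
        let remote_name = match map.get(&key)? {
            Layer::Local(data) => return Some(data.clone()),
            Layer::Remote(name) => name.clone(),
        };
        // "Download": swap the remote stub for a local layer, then retry.
        map.insert(key, Layer::Local(format!("downloaded:{remote_name}")));
    }
}

fn main() {
    let mut map = HashMap::from([(7, Layer::Remote("delta_7".into()))]);
    assert_eq!(read_key(&mut map, 7).as_deref(), Some("downloaded:delta_7"));
}
```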
@@ -2664,7 +2755,8 @@ impl Timeline {
     /// Get a handle to the latest layer for appending.
     ///
     async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, _) = &mut *guard;

         ensure!(lsn.is_aligned());
@@ -2741,7 +2833,8 @@ impl Timeline {
         } else {
             Some(self.write_lock.lock().await)
         };
-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, _) = &mut *guard;
         if let Some(open_layer) = &layers.open_layer {
             let open_layer_rc = Arc::clone(open_layer);
             // Does this layer need freezing?
@@ -2755,7 +2848,7 @@ impl Timeline {
             layers.next_open_layer_at = Some(end_lsn);
             self.last_freeze_at.store(end_lsn);
         }
-        drop(layers);
+        drop_wlock(guard);
     }
     /// Layer flusher task's main loop.
@@ -2779,7 +2872,8 @@ impl Timeline {
             let flush_counter = *layer_flush_start_rx.borrow();
             let result = loop {
                 let layer_to_flush = {
-                    let layers = self.layers.read().await;
+                    let guard = self.layers.read().await;
+                    let (layers, _) = &*guard;
                     layers.frozen_layers.front().cloned()
                     // drop 'layers' lock to allow concurrent reads and writes
                 };
@@ -2905,12 +2999,12 @@ impl Timeline {
         // in-memory layer from the map now.
         {
             let mut layers = self.layers.write().await;
-            let l = layers.frozen_layers.pop_front();
+            let l = layers.0.frozen_layers.pop_front();

             // Only one thread may call this function at a time (for this
             // timeline). If two threads tried to flush the same frozen
             // layer to disk at the same time, that would not work.
-            assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer));
+            assert!(layer_map::compare_arced_layers(&l.unwrap(), &frozen_layer));

             // release lock on 'layers'
         }
@@ -3041,14 +3135,16 @@ impl Timeline {

         // Add it to the layer map
         let l = Arc::new(new_delta);
-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, mapping) = &mut *guard;
         let mut batch_updates = layers.batch_update();
         l.access_stats().record_residence_event(
             &batch_updates,
             LayerResidenceStatus::Resident,
             LayerResidenceEventReason::LayerCreate,
         );
-        batch_updates.insert_historic(l.layer_desc().clone(), l);
+        batch_updates.insert_historic(l.layer_desc().clone());
+        mapping.insert(l);
         batch_updates.flush();

         // update the timeline's physical size
@@ -3098,7 +3194,8 @@ impl Timeline {
     ) -> anyhow::Result<bool> {
         let threshold = self.get_image_creation_threshold();

-        let layers = self.layers.read().await;
+        let guard = self.layers.read().await;
+        let (layers, _) = &*guard;

         let mut max_deltas = 0;
         {
@@ -3276,7 +3373,8 @@ impl Timeline {

         let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());

-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, mapping) = &mut *guard;
         let mut updates = layers.batch_update();
         let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -3298,10 +3396,11 @@ impl Timeline {
                 LayerResidenceStatus::Resident,
                 LayerResidenceEventReason::LayerCreate,
             );
-            updates.insert_historic(l.layer_desc().clone(), l);
+            updates.insert_historic(l.layer_desc().clone());
+            mapping.insert(l);
         }
         updates.flush();
-        drop(layers);
+        drop_wlock(guard);

         timer.stop_and_record();

         Ok(layer_paths_to_upload)
@@ -3311,7 +3410,7 @@ impl Timeline {

 #[derive(Default)]
 struct CompactLevel0Phase1Result {
     new_layers: Vec<DeltaLayer>,
-    deltas_to_compact: Vec<Arc<dyn PersistentLayer>>,
+    deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
 }

 /// Top-level failure to compact.
/// Top-level failure to compact. /// Top-level failure to compact.
@@ -3344,9 +3443,9 @@ impl Timeline {
target_file_size: u64, target_file_size: u64,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> { ) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, mapping) = &*guard;
let mut level0_deltas = layers.get_level0_deltas()?; let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
// Only compact if enough layers have accumulated. // Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold(); let threshold = self.get_compaction_threshold();
@@ -3393,6 +3492,7 @@ impl Timeline {
let remotes = deltas_to_compact let remotes = deltas_to_compact
.iter() .iter()
.map(|l| mapping.get_from_desc(l))
.filter(|l| l.is_remote_layer()) .filter(|l| l.is_remote_layer())
.inspect(|l| info!("compact requires download of {}", l.filename().file_name())) .inspect(|l| info!("compact requires download of {}", l.filename().file_name()))
.map(|l| { .map(|l| {
@@ -3402,6 +3502,13 @@ impl Timeline {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let deltas_to_compact_layers = deltas_to_compact
.iter()
.map(|l| mapping.get_from_desc(l))
.collect_vec();
drop_rlock(guard);
if !remotes.is_empty() { if !remotes.is_empty() {
// caller is holding the lock to layer_removal_cs, and we don't want to download while // caller is holding the lock to layer_removal_cs, and we don't want to download while
// holding that; in future download_remote_layer might take it as well. this is // holding that; in future download_remote_layer might take it as well. this is
@@ -3428,7 +3535,7 @@ impl Timeline {
         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
         let all_values_iter = itertools::process_results(
-            deltas_to_compact.iter().map(|l| l.iter(ctx)),
+            deltas_to_compact_layers.iter().map(|l| l.iter(ctx)),
             |iter_iter| {
                 iter_iter.kmerge_by(|a, b| {
                     if let Ok((a_key, a_lsn, _)) = a {
@@ -3450,7 +3557,7 @@ impl Timeline {
         // This iterator walks through all keys and is needed to calculate size used by each key
         let mut all_keys_iter = itertools::process_results(
-            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
+            deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
             |iter_iter| {
                 iter_iter.kmerge_by(|a, b| {
                     let (a_key, a_lsn, _) = a;
// Determine N largest holes where N is number of compacted layers. // Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len(); let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn(); let last_record_lsn = self.get_last_record_lsn();
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here? let guard = self.layers.read().await; // Is'n it better to hold original layers lock till here?
let (layers, _) = &*guard;
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible? let min_hole_coverage_size = 3; // TODO: something more flexible?
@@ -3475,7 +3583,7 @@ impl Timeline {
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1); let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None; let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results( for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)), deltas_to_compact_layers.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0), |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? { )? {
if let Some(prev_key) = prev { if let Some(prev_key) = prev {
@@ -3500,7 +3608,7 @@ impl Timeline {
} }
prev = Some(next_key.next()); prev = Some(next_key.next());
} }
drop(layers); drop_rlock(guard);
let mut holes = heap.into_vec(); let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start); holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector let mut next_hole = 0; // index of next hole in holes vector
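The hole detection keeps only the `max_holes` largest gaps: each candidate is pushed into a bounded heap and the smallest element is popped whenever the heap overflows. The generic top-N pattern, sketched with plain integers (the real code orders `Hole` values by coverage size):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the `n` largest values from a stream using a min-heap of size n.
fn n_largest(values: impl IntoIterator<Item = i128>, n: usize) -> Vec<i128> {
    let mut heap = BinaryHeap::with_capacity(n + 1);
    for v in values {
        heap.push(Reverse(v)); // Reverse turns the max-heap into a min-heap
        if heap.len() > n {
            heap.pop(); // evict the smallest candidate seen so far
        }
    }
    let mut out: Vec<_> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable();
    out
}

fn main() {
    assert_eq!(n_largest([5, 1, 9, 3, 7], 3), vec![5, 7, 9]);
}
```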
@@ -3704,7 +3812,8 @@ impl Timeline {
                 .context("wait for layer upload ops to complete")?;
         }

-        let mut layers = self.layers.write().await;
+        let mut guard = self.layers.write().await;
+        let (layers, mapping) = &mut *guard;
         let mut updates = layers.batch_update();
         let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
         for l in new_layers {
@@ -3736,7 +3845,8 @@ impl Timeline {
                 LayerResidenceStatus::Resident,
                 LayerResidenceEventReason::LayerCreate,
             );
-            updates.insert_historic(x.layer_desc().clone(), x);
+            updates.insert_historic(x.layer_desc().clone());
+            mapping.insert(x);
         }

         // Now that we have reshuffled the data to a set of new delta layers, we can
@@ -3744,10 +3854,10 @@ impl Timeline {
         let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
         for l in deltas_to_compact {
             layer_names_to_delete.push(l.filename());
-            self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
+            self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?;
         }
         updates.flush();
-        drop(layers);
+        drop_wlock(guard);

         // Also schedule the deletions in remote storage
         if let Some(remote_client) = &self.remote_client {
@@ -3963,7 +4073,8 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range // 4. newer on-disk image layers cover the layer's whole key range
// //
// TODO holding a write lock is too agressive and avoidable // TODO holding a write lock is too agressive and avoidable
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, mapping) = &mut *guard;
'outer: for l in layers.iter_historic_layers() { 'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1; result.layers_total += 1;
@@ -4042,7 +4153,7 @@ impl Timeline {
// delta layers. Image layers can form "stairs" preventing old image from been deleted. // delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some // But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration. // protection from replacing recent image layers with new one after each GC iteration.
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) { if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
wanted_image_layers.add_range(l.get_key_range()); wanted_image_layers.add_range(l.get_key_range());
} }
result.layers_not_updated += 1; result.layers_not_updated += 1;
@@ -4079,6 +4190,7 @@ impl Timeline {
layer_removal_cs.clone(), layer_removal_cs.clone(),
doomed_layer, doomed_layer,
&mut updates, &mut updates,
mapping,
)?; // FIXME: schedule succeeded deletions before returning? )?; // FIXME: schedule succeeded deletions before returning?
result.layers_removed += 1; result.layers_removed += 1;
} }
@@ -4263,42 +4375,15 @@ impl Timeline {

                 // Download complete. Replace the RemoteLayer with the corresponding
                 // Delta- or ImageLayer in the layer map.
-                let mut layers = self_clone.layers.write().await;
-                let mut updates = layers.batch_update();
-                let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
+                let mut guard = self_clone.layers.write().await;
+                let (layers, mapping) = &mut *guard;
+                let updates = layers.batch_update();
+                let new_layer =
+                    remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
                 {
-                    use crate::tenant::layer_map::Replacement;
                     let l: Arc<dyn PersistentLayer> = remote_layer.clone();
-                    let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) {
-                        Ok(Replacement::Replaced { .. }) => false,
-                        Ok(Replacement::NotFound) => {
-                            // TODO: the downloaded file should probably be removed, otherwise
-                            // it will be added to the layermap on next load? we should
-                            // probably restart any get_reconstruct_data search as well.
-                            //
-                            // See: https://github.com/neondatabase/neon/issues/3533
-                            error!("replacing downloaded layer into layermap failed because layer was not found");
-                            true
-                        }
-                        Ok(Replacement::RemovalBuffered) => {
-                            unreachable!("current implementation does not remove anything")
-                        }
-                        Ok(Replacement::Unexpected(other)) => {
-                            // if the other layer would have the same pointer value as
-                            // expected, it means they differ only on vtables.
-                            //
-                            // otherwise there's no known reason for this to happen as
-                            // compacted layers should have different covering rectangles
-                            // leading to produce Replacement::NotFound.
-                            error!(
-                                expected.ptr = ?Arc::as_ptr(&l),
-                                other.ptr = ?Arc::as_ptr(&other),
-                                ?other,
-                                "replacing downloaded layer into layermap failed because another layer was found instead of expected"
-                            );
-                            true
-                        }
+                    let failure = match mapping.replace_and_verify(l, new_layer) {
+                        Ok(()) => false,
                         Err(e) => {
                             // this is a precondition failure, the layer filename derived
                             // attributes didn't match up, which doesn't seem likely.
@@ -4326,7 +4411,7 @@ impl Timeline {
                     }
                 }
                 updates.flush();
-                drop(layers);
+                drop_wlock(guard);
info!("on-demand download successful"); info!("on-demand download successful");
@@ -4337,7 +4422,10 @@ impl Timeline {
remote_layer.ongoing_download.close(); remote_layer.ongoing_download.close();
} else { } else {
// Keep semaphore open. We'll drop the permit at the end of the function. // Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err()); error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
} }
// Don't treat it as an error if the task that triggered the download // Don't treat it as an error if the task that triggered the download
@@ -4351,7 +4439,8 @@ impl Timeline {
drop(permit); drop(permit);
Ok(()) Ok(())
}.in_current_span(), }
.in_current_span(),
); );
receiver.await.context("download task cancelled")? receiver.await.context("download task cancelled")?
@@ -4421,9 +4510,11 @@ impl Timeline {
     ) {
         let mut downloads = Vec::new();
         {
-            let layers = self.layers.read().await;
+            let guard = self.layers.read().await;
+            let (layers, mapping) = &*guard;
             layers
                 .iter_historic_layers()
+                .map(|l| mapping.get_from_desc(&l))
                 .filter_map(|l| l.downcast_remote_layer())
                 .map(|l| self.download_remote_layer(l))
                 .for_each(|dl| downloads.push(dl))
@@ -4524,7 +4615,8 @@ impl LocalLayerInfoForDiskUsageEviction {

 impl Timeline {
     pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
-        let layers = self.layers.read().await;
+        let guard = self.layers.read().await;
+        let (layers, mapping) = &*guard;

         let mut max_layer_size: Option<u64> = None;
         let mut resident_layers = Vec::new();
@@ -4533,6 +4625,8 @@ impl Timeline {
             let file_size = l.file_size();
             max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));

+            let l = mapping.get_from_desc(&l);
+
             if l.is_remote_layer() {
                 continue;
             }

View File

@@ -197,9 +197,11 @@ impl Timeline {
         // We don't want to hold the layer map lock during eviction.
        // So, we just need to deal with this.
         let candidates: Vec<Arc<dyn PersistentLayer>> = {
-            let layers = self.layers.read().await;
+            let guard = self.layers.read().await;
+            let (layers, mapping) = &*guard;
             let mut candidates = Vec::new();
             for hist_layer in layers.iter_historic_layers() {
+                let hist_layer = mapping.get_from_desc(&hist_layer);
                 if hist_layer.is_remote_layer() {
                     continue;
                 }
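As the comment above notes, the eviction path clones `Arc` handles out of the map under a short read lock and then works on the owned handles with the lock released. The pattern in isolation (a sketch with a plain `tokio::sync::RwLock` standing in for the layer map pair):

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

/// Gather owned handles under a short read lock so the slow per-item
/// work afterwards never holds the lock.
async fn collect_candidates(lock: &RwLock<Vec<Arc<String>>>) -> Vec<Arc<String>> {
    let guard = lock.read().await;
    guard.iter().cloned().collect()
    // guard dropped here; the caller evicts using the owned Arcs
}

#[tokio::main]
async fn main() {
    let lock = RwLock::new(vec![Arc::new("layer_1".to_string())]);
    let candidates = collect_candidates(&lock).await;
    assert_eq!(candidates.len(), 1);
}
```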