Compare commits

...

22 Commits

Author SHA1 Message Date
Alex Chi
19180e167f remove LayerKey usage
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-28 10:00:02 -04:00
Alex Chi
b2cd142836 fix tests
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-28 09:58:31 -04:00
Alex Chi
f2d7baf0ba rename DeleteGuard -> LayerDeletionGuard
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 16:57:11 -04:00
Alex Chi
113a4256d4 rename lcache to layer_cache
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 16:55:56 -04:00
Alex Chi
be4999713a add comments for LayerCache
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 16:54:51 -04:00
Alex Chi
7335f155c3 fmt
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:31:15 -04:00
Alex Chi
a1ca70ff35 Merge branch 'skyzh/layermap-ref-2' of https://github.com/neondatabase/neon into skyzh/layermap-as-cache 2023-06-27 10:26:13 -04:00
Alex Chi
ce1e57faea fix merge conflicts
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:25:06 -04:00
Alex Chi
6f50bec781 fix merge conflicts
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:24:48 -04:00
Alex Chi
b981702ecf Merge branch 'skyzh/layermap-ref-2' of https://github.com/neondatabase/neon into skyzh/layermap-as-cache 2023-06-27 10:23:08 -04:00
Alex Chi
21d30fc43f use layer_desc key for replace cmp
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:16:30 -04:00
Alex Chi
137ad83f37 fix merge conflicts
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:14:09 -04:00
Alex Chi
22da36bc02 fix merge conflicts
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-27 10:04:21 -04:00
Alex Chi
900ef3d92b Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/layermap-ref-2 2023-06-27 10:02:57 -04:00
Alex Chi
b7923fa0be use new errmsg
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-23 16:27:49 -04:00
Alex Chi
4c4a531d5e rename LayerMapping -> LayerFileManager
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-23 15:52:09 -04:00
Alex Chi
2b4f96345b resolve comments
Signed-off-by: Alex Chi <chi@neon.tech>
2023-06-23 15:32:57 -04:00
Alex Chi
b775ca8a58 resolve conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-20 09:52:28 -04:00
Alex Chi
ddb5862be2 Merge branch 'main' of https://github.com/neondatabase/neon into skyzh/layermap-ref-2 2023-06-20 09:12:15 -04:00
Alex Chi
a2056666ae pgserver: move mapping logic to layer cache
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 15:07:38 -04:00
Alex Chi
fc190a2a19 resolve merge conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:56:50 -04:00
Alex Chi
faee3152f3 refactor: use LayerDesc in LayerMap (part 2)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:54:59 -04:00
10 changed files with 569 additions and 540 deletions

View File

@@ -1,22 +1,23 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key; use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn; use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion}; use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> { fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::<LayerDescriptor>::default(); let mut layer_map = LayerMap::default();
let mut min_lsn = Lsn(u64::MAX); let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0); let mut max_lsn = Lsn(0);
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start); min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); updates.insert_historic(layer.layer_desc().clone());
} }
println!("min: {min_lsn}, max: {max_lsn}"); println!("min: {min_lsn}, max: {max_lsn}");
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
} }
/// Construct a layer map query pattern for benchmarks /// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> { fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right // For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform // before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have // coverage of both the lsn and key space because image layers have
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
// Construct a partitioning for testing get_difficulty map when we // Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with. // don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning { fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new(); let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer, // We add a partition boundary at the start of each image layer,
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 { for i in 0..100_000 {
let i32 = (i as u32) % 100; let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor { let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
key: zero.add(10 * i32)..zero.add(10 * i32 + 1), TenantId::generate(),
lsn: Lsn(i)..Lsn(i + 1), TimelineId::generate(),
is_incremental: false, zero.add(10 * i32)..zero.add(10 * i32 + 1),
short_id: format!("Layer {}", i), Lsn(i),
}; false,
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); 0,
));
updates.insert_historic(layer.layer_desc().clone());
} }
updates.flush(); updates.flush();
println!("Finished layer map init in {:?}", now.elapsed()); println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -85,6 +85,7 @@ pub mod blob_io;
pub mod block_io; pub mod block_io;
pub mod disk_btree; pub mod disk_btree;
pub(crate) mod ephemeral_file; pub(crate) mod ephemeral_file;
pub mod layer_cache;
pub mod layer_map; pub mod layer_map;
pub mod manifest; pub mod manifest;
@@ -590,6 +591,7 @@ impl Tenant {
.layers .layers
.read() .read()
.await .await
.0
.iter_historic_layers() .iter_historic_layers()
.next() .next()
.is_some(), .is_some(),
@@ -1616,7 +1618,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the // No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change. // `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()"); info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await; let layer_removal_guard = timeline.layer_cache.delete_guard().await;
info!("got layer_removal_cs.lock(), deleting layer files"); info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled // NB: storage_sync upload tasks that reference these layers have been cancelled

View File

@@ -0,0 +1,146 @@
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
use super::Timeline;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::timeline::compare_arced_layers;
use anyhow::Result;
use std::sync::{Mutex, Weak};
use std::{collections::HashMap, sync::Arc};
/// LayerCache is meant to facilitate mapping to/from whatever `PersistentLayerDesc` to an actual in-memory layer
/// object. In the future, operations that do not modify layer map (i.e., eviction and download) will be implemented
/// here.
pub struct LayerCache {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
/// Will be useful when we move evict / download to layer cache.
#[allow(unused)]
timeline: Weak<Timeline>,
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
}
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct LayerDeletionGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
let guard = self.mapping.lock().unwrap();
guard.get(&desc.key()).expect("not found").clone()
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard(&self) -> LayerDeletionGuard {
LayerDeletionGuard(Arc::new(
self.layers_removal_lock.clone().lock_owned().await,
))
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within read path.
pub fn replace_and_verify(
&self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
let mut guard = self.mapping.lock().unwrap();
let key = expected.layer_desc().key();
let other = new.layer_desc().key();
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"layermap-replace-notfound"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
/// Called within write path. When compaction and image layer creation we will create new layers.
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
/// Will move logic to delete files here later.
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
}

View File

@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key; use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer; use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use utils::lsn::Lsn; use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage; use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement; pub use historic_layer_coverage::{LayerKey, Replacement};
use super::storage_layer::range_eq; use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc; use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
/// ///
/// LayerMap tracks what layers exist on a timeline. /// LayerMap tracks what layers exist on a timeline.
/// ///
pub struct LayerMap<L: ?Sized> { #[derive(Default)]
pub struct LayerMap {
// //
// 'open_layer' holds the current InMemoryLayer that is accepting new // 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating // records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>, l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
} }
/// The primary update API for the layer map. /// The primary update API for the layer map.
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
/// Batching historic layer insertions and removals is good for /// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly. /// performance and this struct helps us do that correctly.
#[must_use] #[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> { pub struct BatchedUpdates<'a> {
// While we hold this exclusive reference to the layer map the type checker // While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates. // will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>, layer_map: &'a mut LayerMap,
} }
/// Provide ability to batch more updates while hiding the read /// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing. /// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L> impl BatchedUpdates<'_> {
where
L: ?Sized + Layer,
{
/// ///
/// Insert an on-disk layer. /// Insert an on-disk layer.
/// ///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc, layer) self.layer_map.insert_historic_noflush(layer_desc)
} }
/// ///
@@ -145,31 +122,8 @@ where
/// ///
/// This should be called when the corresponding file on disk has been deleted. /// This should be called when the corresponding file on disk has been deleted.
/// ///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc, layer) self.layer_map.remove_historic_noflush(layer_desc)
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
} }
// We will flush on drop anyway, but this method makes it // We will flush on drop anyway, but this method makes it
@@ -185,25 +139,19 @@ where
// than panic later or read without flushing. // than panic later or read without flushing.
// //
// TODO maybe warn if flush hasn't explicitly been called // TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L> impl Drop for BatchedUpdates<'_> {
where
L: ?Sized + Layer,
{
fn drop(&mut self) { fn drop(&mut self) {
self.layer_map.flush_updates(); self.layer_map.flush_updates();
} }
} }
/// Return value of LayerMap::search /// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> { pub struct SearchResult {
pub layer: Arc<L>, pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn, pub lsn_floor: Lsn,
} }
impl<L> LayerMap<L> impl LayerMap {
where
L: ?Sized + Layer,
{
/// ///
/// Find the latest layer (by lsn.end) that covers the given /// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'. /// 'key', with lsn.start < 'end_lsn'.
@@ -235,7 +183,7 @@ where
/// NOTE: This only searches the 'historic' layers, *not* the /// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers! /// 'open' and 'frozen' layers!
/// ///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> { pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128());
@@ -244,7 +192,6 @@ where
(None, None) => None, (None, None) => None,
(None, Some(image)) => { (None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start; let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: image, layer: image,
lsn_floor, lsn_floor,
@@ -252,7 +199,6 @@ where
} }
(Some(delta), None) => { (Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start; let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta,
lsn_floor, lsn_floor,
@@ -263,7 +209,6 @@ where
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn; let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match { if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: image, layer: image,
lsn_floor: img_lsn, lsn_floor: img_lsn,
@@ -271,7 +216,6 @@ where
} else { } else {
let lsn_floor = let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta,
lsn_floor, lsn_floor,
@@ -282,7 +226,7 @@ where
} }
/// Start a batch of updates, applied on drop /// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> { pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
BatchedUpdates { layer_map: self } BatchedUpdates { layer_map: self }
} }
@@ -292,48 +236,32 @@ where
/// Helper function for BatchedUpdates::insert_historic /// Helper function for BatchedUpdates::insert_historic
/// ///
/// TODO(chi): remove L generic so that we do not need to pass layer object. /// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush( pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
// TODO: See #3869, resulting #4088, attempted fix and repro #4094 // TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer) { if Self::is_l0(&layer_desc) {
self.l0_delta_layers.push(layer_desc.clone().into()); self.l0_delta_layers.push(layer_desc.clone().into());
} }
self.historic.insert( self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer), historic_layer_coverage::LayerKey::from(&layer_desc),
layer_desc.into(), layer_desc.into(),
); );
} }
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
/// ///
/// Remove an on-disk layer from the map. /// Remove an on-disk layer from the map.
/// ///
/// Helper function for BatchedUpdates::remove_historic /// Helper function for BatchedUpdates::remove_historic
/// ///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
self.historic self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer)); .remove(historic_layer_coverage::LayerKey::from(&layer_desc));
if Self::is_l0(&layer) { let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
let len_before = self.l0_delta_layers.len(); let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| { l0_delta_layers.retain(|other| other.key() != layer_key);
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
self.l0_delta_layers = l0_delta_layers; self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer, // there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +272,6 @@ where
"failed to locate removed historic layer from l0_delta_layers" "failed to locate removed historic layer from l0_delta_layers"
); );
} }
self.mapping.remove(&layer_desc.key());
}
pub(self) fn replace_historic_noflush(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
let key = historic_layer_coverage::LayerKey::from(&**expected);
let other = historic_layer_coverage::LayerKey::from(&*new);
let expected_l0 = Self::is_l0(expected);
let new_l0 = Self::is_l0(&new);
anyhow::ensure!(
key == other,
"expected and new must have equal LayerKeys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
);
let l0_index = if expected_l0 {
// find the index in case replace worked, we need to replace that as well
let pos = self.l0_delta_layers.iter().position(|slot| {
Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
});
if pos.is_none() {
return Ok(Replacement::NotFound);
}
pos
} else {
None
};
let new_desc = Arc::new(new_desc);
let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
**existing == expected_desc
});
if let Replacement::Replaced { .. } = &replaced {
self.mapping.remove(&expected_desc.key());
self.mapping.insert(new_desc.key(), new);
if let Some(index) = l0_index {
self.l0_delta_layers[index] = new_desc;
}
}
let replaced = match replaced {
Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
Replacement::NotFound => Replacement::NotFound,
Replacement::RemovalBuffered => Replacement::RemovalBuffered,
Replacement::Unexpected(x) => {
Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
}
};
Ok(replaced)
} }
/// Helper function for BatchedUpdates::drop. /// Helper function for BatchedUpdates::drop.
@@ -454,10 +319,8 @@ where
Ok(true) Ok(true)
} }
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> { pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic self.historic.iter()
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
} }
/// ///
@@ -472,7 +335,7 @@ where
&self, &self,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn: Lsn, lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> { ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) { let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v, Some(v) => v,
None => return Ok(vec![]), None => return Ok(vec![]),
@@ -482,36 +345,26 @@ where
let end = key_range.end.to_i128(); let end = key_range.end.to_i128();
// Initialize loop variables // Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![]; let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
let mut current_key = start; let mut current_key = start;
let mut current_val = version.image_coverage.query(start); let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals // Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) { for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push(( coverage.push((kr, current_val.take()));
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
current_key = change_key; current_key = change_key;
current_val = change_val.clone(); current_val = change_val.clone();
} }
// Add the final interval // Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end); let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push(( coverage.push((kr, current_val.take()));
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
Ok(coverage) Ok(coverage)
} }
pub fn is_l0(layer: &L) -> bool { pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX)) range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
} }
@@ -537,7 +390,7 @@ where
/// TODO The optimal number should probably be slightly higher than 1, but to /// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function /// implement that we need to plumb a lot more context into this function
/// than just the current partition_range. /// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool { pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
// Case 1 // Case 1
if !Self::is_l0(layer) { if !Self::is_l0(layer) {
return true; return true;
@@ -595,9 +448,7 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() { if !kr.is_empty() {
let base_count = let base_count = Self::is_reimage_worthy(&val, key) as usize;
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count); let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?; self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +471,7 @@ where
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() { if !kr.is_empty() {
let base_count = let base_count = Self::is_reimage_worthy(&val, key) as usize;
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count); let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max( max_stacked_deltas = std::cmp::max(
@@ -772,12 +621,8 @@ where
} }
/// Return all L0 delta layers /// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> { pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self Ok(self.l0_delta_layers.to_vec())
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
} }
/// debugging function to print out the contents of the layer map /// debugging function to print out the contents of the layer map
@@ -802,97 +647,48 @@ where
println!("End dump LayerMap"); println!("End dump LayerMap");
Ok(()) Ok(())
} }
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
#[inline(always)]
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{LayerMap, Replacement}; use super::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
mod l0_delta_layers_updated { mod l0_delta_layers_updated {
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use super::*; use super::*;
#[test] #[test]
fn for_full_range_delta() { fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates // l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true true
) )
} }
#[test] #[test]
fn for_non_full_range_delta() { fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range // because not full range
false false
) )
} }
#[test] #[test]
fn for_image() { fn for_image() {
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must // code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images // mean we should in practice never have full range images
false false
) )
}
#[test]
fn replacing_missing_l0_is_notfound() {
// original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
// however only happen for precondition failures.
let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
let layer = LayerFileName::from_str(layer).unwrap();
let layer = LayerDescriptor::from(layer);
// same skeletan construction; see scenario below
let not_found = Arc::new(layer.clone());
let new_version = Arc::new(layer);
let mut map = LayerMap::default();
let res = map.batch_update().replace_historic(
not_found.get_persistent_layer_desc(),
&not_found,
new_version.get_persistent_layer_desc(),
new_version,
);
assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
} }
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -906,46 +702,31 @@ mod tests {
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the // two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers. // same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded)); assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0)); let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update() map.batch_update()
.insert_historic(remote.get_persistent_layer_desc(), remote.clone()); .insert_historic(remote.layer_desc().clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts); assert_eq!(
count_layer_in(&map, remote.layer_desc()),
let replaced = map expected_in_counts
.batch_update()
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert!(
matches!(replaced, Replacement::Replaced { .. }),
"{replaced:?}"
); );
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update() map.batch_update()
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); .remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
} }
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) { fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map let historic = map
.iter_historic_layers() .iter_historic_layers()
.filter(|x| LayerMap::compare_arced_layers(x, layer)) .filter(|x| x.key() == layer.key())
.count(); .count();
let l0s = map let l0s = map
.get_level0_deltas() .get_level0_deltas()
.expect("why does this return a result"); .expect("why does this return a result");
let l0 = l0s let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
.iter()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
(historic, l0) (historic, l0)
} }

View File

@@ -3,6 +3,8 @@ use std::ops::Range;
use tracing::info; use tracing::info;
use crate::tenant::storage_layer::PersistentLayerDesc;
use super::layer_coverage::LayerCoverageTuple; use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data. /// Layers in this module are identified and indexed by this data.
@@ -53,6 +55,18 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
} }
} }
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
}
}
}
/// Efficiently queryable layer coverage for each LSN. /// Efficiently queryable layer coverage for each LSN.
/// ///
/// Allows answering layer map queries very efficiently, /// Allows answering layer map queries very efficiently,
@@ -467,6 +481,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
/// ///
/// Returns a `Replacement` value describing the outcome; only the case of /// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild. /// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records the
/// layer descriptors. Therefore, anything added to the layer map will only be removed or
/// added, and never replaced.
#[cfg(test)]
pub fn replace<F>( pub fn replace<F>(
&mut self, &mut self,
layer_key: &LayerKey, layer_key: &LayerKey,

View File

@@ -176,13 +176,10 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
/// ///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock. /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>( pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &BatchedUpdates<'_, L>, layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus, status: LayerResidenceStatus,
) -> Self ) -> Self {
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event( new.record_residence_event(
layer_map_lock_held_witness, layer_map_lock_held_witness,
@@ -197,14 +194,11 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`. /// The `new_status` is not recorded in `self`.
/// ///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock. /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>( pub(crate) fn clone_for_residence_change(
&self, &self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>, layer_map_lock_held_witness: &BatchedUpdates<'_>,
new_status: LayerResidenceStatus, new_status: LayerResidenceStatus,
) -> LayerAccessStats ) -> LayerAccessStats {
where
L: ?Sized + Layer,
{
let clone = { let clone = {
let inner = self.0.lock().unwrap(); let inner = self.0.lock().unwrap();
inner.clone() inner.clone()
@@ -232,14 +226,12 @@ impl LayerAccessStats {
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
/// ///
pub(crate) fn record_residence_event<L>( pub(crate) fn record_residence_event(
&self, &self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>, _layer_map_lock_held_witness: &BatchedUpdates<'_>,
status: LayerResidenceStatus, status: LayerResidenceStatus,
reason: LayerResidenceEventReason, reason: LayerResidenceEventReason,
) where ) {
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap(); let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| { locked.iter_mut().for_each(|inner| {
inner inner
@@ -473,94 +465,125 @@ pub fn downcast_remote_layer(
} }
} }
/// Holds metadata about a layer without any content. Used mostly for testing. pub mod tests {
/// use super::*;
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
impl LayerDescriptor { /// Holds metadata about a layer without any content. Used mostly for testing.
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, ///
/// and the tenant / timeline id does not matter. /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { /// LayerDescriptor.
PersistentLayerDesc::new_delta( #[derive(Clone, Debug)]
TenantId::from_array([0; 16]), pub struct LayerDescriptor {
TimelineId::from_array([0; 16]), base: PersistentLayerDesc,
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
} }
fn get_lsn_range(&self) -> Range<Lsn> { impl From<PersistentLayerDesc> for LayerDescriptor {
self.lsn.clone() fn from(base: PersistentLayerDesc) -> Self {
} Self { base }
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
let short_id = value.to_string();
LayerDescriptor {
key: value.key_range,
lsn: value.lsn_range,
is_incremental: true,
short_id,
} }
} }
}
impl From<ImageFileName> for LayerDescriptor { impl Layer for LayerDescriptor {
fn from(value: ImageFileName) -> Self { fn get_value_reconstruct_data(
let short_id = value.to_string(); &self,
let lsn = value.lsn_as_range(); _key: Key,
LayerDescriptor { _lsn_range: Range<Lsn>,
key: value.key_range, _reconstruct_data: &mut ValueReconstructState,
lsn, _ctx: &RequestContext,
is_incremental: false, ) -> Result<ValueReconstructResult> {
short_id, todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
} }
} }
}
impl From<LayerFileName> for LayerDescriptor { impl PersistentLayer for LayerDescriptor {
fn from(value: LayerFileName) -> Self { fn layer_desc(&self) -> &PersistentLayerDesc {
match value { &self.base
LayerFileName::Delta(d) => Self::from(d), }
LayerFileName::Image(i) => Self::from(i),
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_delta(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn_range,
233,
),
}
}
}
impl From<ImageFileName> for LayerDescriptor {
fn from(value: ImageFileName) -> Self {
LayerDescriptor {
base: PersistentLayerDesc::new_img(
TenantId::from_array([0; 16]),
TimelineId::from_array([0; 16]),
value.key_range,
value.lsn,
false,
233,
),
}
}
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
} }
} }
} }

View File

@@ -218,15 +218,12 @@ impl RemoteLayer {
} }
/// Create a Layer struct representing this layer, after it has been downloaded. /// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer<L>( pub fn create_downloaded_layer(
&self, &self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>, layer_map_lock_held_witness: &BatchedUpdates<'_>,
conf: &'static PageServerConf, conf: &'static PageServerConf,
file_size: u64, file_size: u64,
) -> Arc<dyn PersistentLayer> ) -> Arc<dyn PersistentLayer> {
where
L: ?Sized + Layer,
{
if self.desc.is_delta { if self.desc.is_delta {
let fname = self.desc.delta_file_name(); let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new( Arc::new(DeltaLayer::new(

View File

@@ -3,7 +3,7 @@
mod eviction_task; mod eviction_task;
mod walreceiver; mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context}; use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use fail::fail_point; use fail::fail_point;
use futures::StreamExt; use futures::StreamExt;
@@ -82,10 +82,13 @@ use self::eviction_task::EvictionTaskTimelineState;
use self::walreceiver::{WalReceiver, WalReceiverConf}; use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::config::TenantConf; use super::config::TenantConf;
use super::layer_cache::{LayerCache, LayerDeletionGuard};
use super::layer_map::BatchedUpdates; use super::layer_map::BatchedUpdates;
use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient; use super::remote_timeline_client::RemoteTimelineClient;
use super::storage_layer::{DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset}; use super::storage_layer::{
DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc,
};
#[derive(Debug, PartialEq, Eq, Clone, Copy)] #[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(super) enum FlushLoopState { pub(super) enum FlushLoopState {
@@ -118,8 +121,28 @@ impl PartialOrd for Hole {
} }
} }
pub struct LayerFileManager(());
impl LayerFileManager {
pub(crate) fn new() -> Self {
Self(())
}
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
drop(rlock)
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
pub struct Timeline { pub struct Timeline {
conf: &'static PageServerConf, pub(super) conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>, tenant_conf: Arc<RwLock<TenantConfOpt>>,
myself: Weak<Self>, myself: Weak<Self>,
@@ -129,7 +152,9 @@ pub struct Timeline {
pub pg_version: u32, pub pg_version: u32,
pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>, pub(crate) layers: Arc<tokio::sync::RwLock<(LayerMap, LayerFileManager)>>,
pub(super) layer_cache: LayerCache,
/// Set of key ranges which should be covered by image layers to /// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -202,13 +227,6 @@ pub struct Timeline {
/// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel
layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>, layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>,
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub(super) layer_removal_cs: Arc<tokio::sync::Mutex<()>>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected // Needed to ensure that we can't create a branch at a point that was already garbage collected
pub latest_gc_cutoff_lsn: Rcu<Lsn>, pub latest_gc_cutoff_lsn: Rcu<Lsn>,
@@ -599,7 +617,8 @@ impl Timeline {
/// This method makes no distinction between local and remote layers. /// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**. /// Hence, the result **does not represent local filesystem usage**.
pub async fn layer_size_sum(&self) -> u64 { pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await; let guard = self.layers.read().await;
let (layer_map, _) = &*guard;
let mut size = 0; let mut size = 0;
for l in layer_map.iter_historic_layers() { for l in layer_map.iter_historic_layers() {
size += l.file_size(); size += l.file_size();
@@ -819,7 +838,7 @@ impl Timeline {
// Below are functions compact_level0() and create_image_layers() // Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained // but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it. // above. Rewrite it.
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); let layer_removal_cs = self.layer_cache.delete_guard().await;
// Is the timeline being deleted? // Is the timeline being deleted?
if self.is_stopping() { if self.is_stopping() {
return Err(anyhow::anyhow!("timeline is Stopping").into()); return Err(anyhow::anyhow!("timeline is Stopping").into());
@@ -909,7 +928,8 @@ impl Timeline {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> { pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn(); let last_lsn = self.get_last_record_lsn();
let open_layer_size = { let open_layer_size = {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
let Some(open_layer) = layers.open_layer.as_ref() else { let Some(open_layer) = layers.open_layer.as_ref() else {
return Ok(()); return Ok(());
}; };
@@ -1040,7 +1060,8 @@ impl Timeline {
} }
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await; let guard = self.layers.read().await;
let (layer_map, _) = &*guard;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer { if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info()); in_memory_layers.push(open_layer.info());
@@ -1051,6 +1072,7 @@ impl Timeline {
let mut historic_layers = Vec::new(); let mut historic_layers = Vec::new();
for historic_layer in layer_map.iter_historic_layers() { for historic_layer in layer_map.iter_historic_layers() {
let historic_layer = self.layer_cache.get_from_desc(&historic_layer);
historic_layers.push(historic_layer.info(reset)); historic_layers.push(historic_layer.info(reset));
} }
@@ -1148,7 +1170,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?; .context("wait for layer upload ops to complete")?;
// now lock out layer removal (compaction, gc, timeline deletion) // now lock out layer removal (compaction, gc, timeline deletion)
let layer_removal_guard = self.layer_removal_cs.lock().await; let layer_removal_guard = self.layer_cache.delete_guard().await;
{ {
// to avoid racing with detach and delete_timeline // to avoid racing with detach and delete_timeline
@@ -1160,7 +1182,8 @@ impl Timeline {
} }
// start the batch update // start the batch update
let mut layer_map = self.layers.write().await; let mut guard = self.layers.write().await;
let (layer_map, _) = &mut *guard;
let mut batch_updates = layer_map.batch_update(); let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len()); let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1176,7 +1199,7 @@ impl Timeline {
// commit the updates & release locks // commit the updates & release locks
batch_updates.flush(); batch_updates.flush();
drop(layer_map); drop_wlock(guard);
drop(layer_removal_guard); drop(layer_removal_guard);
assert_eq!(results.len(), layers_to_evict.len()); assert_eq!(results.len(), layers_to_evict.len());
@@ -1185,12 +1208,10 @@ impl Timeline {
fn evict_layer_batch_impl( fn evict_layer_batch_impl(
&self, &self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, _layer_removal_cs: &LayerDeletionGuard,
local_layer: &Arc<dyn PersistentLayer>, local_layer: &Arc<dyn PersistentLayer>,
batch_updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, batch_updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
use super::layer_map::Replacement;
if local_layer.is_remote_layer() { if local_layer.is_remote_layer() {
// TODO(issue #3851): consider returning an err here instead of false, // TODO(issue #3851): consider returning an err here instead of false,
// which is the same out the match later // which is the same out the match later
@@ -1238,13 +1259,13 @@ impl Timeline {
), ),
}); });
let replaced = match batch_updates.replace_historic( assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc());
local_layer.layer_desc().clone(),
local_layer, let succeed = match self
new_remote_layer.layer_desc().clone(), .layer_cache
new_remote_layer, .replace_and_verify(local_layer.clone(), new_remote_layer)
)? { {
Replacement::Replaced { .. } => { Ok(()) => {
if let Err(e) = local_layer.delete_resident_layer_file() { if let Err(e) = local_layer.delete_resident_layer_file() {
error!("failed to remove layer file on evict after replacement: {e:#?}"); error!("failed to remove layer file on evict after replacement: {e:#?}");
} }
@@ -1277,24 +1298,17 @@ impl Timeline {
true true
} }
Replacement::NotFound => { Err(err) => {
debug!(evicted=?local_layer, "layer was no longer in layer map"); if cfg!(debug_assertions) {
false panic!("failed to replace: {err}, evicted: {local_layer:?}");
} } else {
Replacement::RemovalBuffered => { error!(evicted=?local_layer, "failed to replace: {err}");
unreachable!("not doing anything else in this batch") }
}
Replacement::Unexpected(other) => {
error!(
local_layer.ptr=?Arc::as_ptr(local_layer),
other.ptr=?Arc::as_ptr(&other),
?other,
"failed to replace");
false false
} }
}; };
Ok(replaced) Ok(succeed)
} }
} }
@@ -1418,7 +1432,11 @@ impl Timeline {
timeline_id, timeline_id,
tenant_id, tenant_id,
pg_version, pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())), layers: Arc::new(tokio::sync::RwLock::new((
LayerMap::default(),
LayerFileManager::new(),
))),
layer_cache: LayerCache::new(myself.clone()),
wanted_image_layers: Mutex::new(None), wanted_image_layers: Mutex::new(None),
walredo_mgr, walredo_mgr,
@@ -1454,7 +1472,6 @@ impl Timeline {
layer_flush_done_tx, layer_flush_done_tx,
write_lock: tokio::sync::Mutex::new(()), write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo { gc_info: std::sync::RwLock::new(GcInfo {
retain_lsns: Vec::new(), retain_lsns: Vec::new(),
@@ -1602,14 +1619,15 @@ impl Timeline {
let mut layers = self.layers.try_write().expect( let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object", "in the context where we call this function, no other task has access to the object",
); );
layers.next_open_layer_at = Some(Lsn(start_lsn.0)); layers.0.next_open_layer_at = Some(Lsn(start_lsn.0));
} }
/// ///
/// Scan the timeline directory to populate the layer map. /// Scan the timeline directory to populate the layer map.
/// ///
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update(); let mut updates = layers.batch_update();
let mut num_layers = 0; let mut num_layers = 0;
@@ -1652,7 +1670,8 @@ impl Timeline {
trace!("found layer {}", layer.path().display()); trace!("found layer {}", layer.path().display());
total_physical_size += file_size; total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); updates.insert_historic(layer.layer_desc().clone());
self.layer_cache.populate_local_when_init(Arc::new(layer));
num_layers += 1; num_layers += 1;
} else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) {
// Create a DeltaLayer struct for each delta file. // Create a DeltaLayer struct for each delta file.
@@ -1684,7 +1703,8 @@ impl Timeline {
trace!("found layer {}", layer.path().display()); trace!("found layer {}", layer.path().display());
total_physical_size += file_size; total_physical_size += file_size;
updates.insert_historic(layer.layer_desc().clone(), Arc::new(layer)); updates.insert_historic(layer.layer_desc().clone());
self.layer_cache.populate_local_when_init(Arc::new(layer));
num_layers += 1; num_layers += 1;
} else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") {
// ignore these // ignore these
@@ -1738,7 +1758,8 @@ impl Timeline {
// We're holding a layer map lock for a while but this // We're holding a layer map lock for a while but this
// method is only called during init so it's fine. // method is only called during init so it's fine.
let mut layer_map = self.layers.write().await; let mut guard = self.layers.write().await;
let (layer_map, _) = &mut *guard;
let mut updates = layer_map.batch_update(); let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers { for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name); let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1783,7 +1804,8 @@ impl Timeline {
anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}");
} else { } else {
self.metrics.resident_physical_size_gauge.sub(local_size); self.metrics.resident_physical_size_gauge.sub(local_size);
updates.remove_historic(local_layer.layer_desc().clone(), local_layer); updates.remove_historic(local_layer.layer_desc().clone());
self.layer_cache.remove_local_when_init(local_layer);
// fall-through to adding the remote layer // fall-through to adding the remote layer
} }
} else { } else {
@@ -1822,7 +1844,8 @@ impl Timeline {
); );
let remote_layer = Arc::new(remote_layer); let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); updates.insert_historic(remote_layer.layer_desc().clone());
self.layer_cache.populate_remote_when_init(remote_layer);
} }
LayerFileName::Delta(deltafilename) => { LayerFileName::Delta(deltafilename) => {
// Create a RemoteLayer for the delta file. // Create a RemoteLayer for the delta file.
@@ -1849,7 +1872,8 @@ impl Timeline {
), ),
); );
let remote_layer = Arc::new(remote_layer); let remote_layer = Arc::new(remote_layer);
updates.insert_historic(remote_layer.layer_desc().clone(), remote_layer); updates.insert_historic(remote_layer.layer_desc().clone());
self.layer_cache.populate_remote_when_init(remote_layer);
} }
} }
} }
@@ -1888,13 +1912,14 @@ impl Timeline {
let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn();
let local_layers = self let local_layers = {
.layers let guard = self.layers.read().await;
.read() let (layers, _) = &*guard;
.await layers
.iter_historic_layers() .iter_historic_layers()
.map(|l| (l.filename(), l)) .map(|l| (l.filename(), self.layer_cache.get_from_desc(&l)))
.collect::<HashMap<_, _>>(); .collect::<HashMap<_, _>>()
};
// If no writes happen, new branches do not have any layers, only the metadata file. // If no writes happen, new branches do not have any layers, only the metadata file.
let has_local_layers = !local_layers.is_empty(); let has_local_layers = !local_layers.is_empty();
@@ -2265,10 +2290,12 @@ impl Timeline {
} }
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> { async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() { let guard = self.layers.read().await;
let (layers, _) = &*guard;
for historic_layer in layers.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name(); let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name { if layer_file_name == historic_layer_name {
return Some(historic_layer); return Some(self.layer_cache.get_from_desc(&historic_layer));
} }
} }
@@ -2280,10 +2307,11 @@ impl Timeline {
fn delete_historic_layer( fn delete_historic_layer(
&self, &self,
// we cannot remove layers otherwise, since gc and compaction will race // we cannot remove layers otherwise, since gc and compaction will race
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, _layer_removal_cs: LayerDeletionGuard,
layer: Arc<dyn PersistentLayer>, layer: Arc<PersistentLayerDesc>,
updates: &mut BatchedUpdates<'_, dyn PersistentLayer>, updates: &mut BatchedUpdates<'_>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let layer = self.layer_cache.get_from_desc(&layer);
if !layer.is_remote_layer() { if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?; layer.delete_resident_layer_file()?;
let layer_file_size = layer.file_size(); let layer_file_size = layer.file_size();
@@ -2297,7 +2325,8 @@ impl Timeline {
// won't be needed for page reconstruction for this timeline, // won't be needed for page reconstruction for this timeline,
// and mark what we can't delete yet as deleted from the layer // and mark what we can't delete yet as deleted from the layer
// map index without actually rebuilding the index. // map index without actually rebuilding the index.
updates.remove_historic(layer.layer_desc().clone(), layer); updates.remove_historic(layer.layer_desc().clone());
self.layer_cache.delete_layer(layer);
Ok(()) Ok(())
} }
@@ -2482,7 +2511,8 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop #[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop { 'layer_map_search: loop {
let remote_layer = { let remote_layer = {
let layers = timeline.layers.read().await; let guard = timeline.layers.read().await;
let (layers, _) = &*guard;
// Check the open and frozen in-memory layers first, in order from newest // Check the open and frozen in-memory layers first, in order from newest
// to oldest. // to oldest.
@@ -2544,6 +2574,7 @@ impl Timeline {
} }
if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) {
let layer = timeline.layer_cache.get_from_desc(&layer);
// If it's a remote layer, download it and retry. // If it's a remote layer, download it and retry.
if let Some(remote_layer) = if let Some(remote_layer) =
super::storage_layer::downcast_remote_layer(&layer) super::storage_layer::downcast_remote_layer(&layer)
@@ -2665,7 +2696,8 @@ impl Timeline {
/// Get a handle to the latest layer for appending. /// Get a handle to the latest layer for appending.
/// ///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> { async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
ensure!(lsn.is_aligned()); ensure!(lsn.is_aligned());
@@ -2742,7 +2774,8 @@ impl Timeline {
} else { } else {
Some(self.write_lock.lock().await) Some(self.write_lock.lock().await)
}; };
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
if let Some(open_layer) = &layers.open_layer { if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer); let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing? // Does this layer need freezing?
@@ -2756,7 +2789,7 @@ impl Timeline {
layers.next_open_layer_at = Some(end_lsn); layers.next_open_layer_at = Some(end_lsn);
self.last_freeze_at.store(end_lsn); self.last_freeze_at.store(end_lsn);
} }
drop(layers); drop_wlock(guard);
} }
/// Layer flusher task's main loop. /// Layer flusher task's main loop.
@@ -2780,7 +2813,8 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow(); let flush_counter = *layer_flush_start_rx.borrow();
let result = loop { let result = loop {
let layer_to_flush = { let layer_to_flush = {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
layers.frozen_layers.front().cloned() layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes // drop 'layers' lock to allow concurrent reads and writes
}; };
@@ -2903,15 +2937,17 @@ impl Timeline {
fail_point!("flush-frozen-before-sync"); fail_point!("flush-frozen-before-sync");
// The new on-disk layers are now in the layer map. We can remove the // The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now. // in-memory layer from the map now. We do not modify `LayerFileManager` because
// it only contains persistent layers. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{ {
let mut layers = self.layers.write().await; let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front(); let l = layers.0.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this // Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen // timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work. // layer to disk at the same time, that would not work.
assert!(LayerMap::compare_arced_layers(&l.unwrap(), &frozen_layer)); assert!(compare_arced_layers(&l.unwrap(), &frozen_layer));
// release lock on 'layers' // release lock on 'layers'
} }
@@ -3044,14 +3080,16 @@ impl Timeline {
// Add it to the layer map // Add it to the layer map
let l = Arc::new(new_delta); let l = Arc::new(new_delta);
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut batch_updates = layers.batch_update(); let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event( l.access_stats().record_residence_event(
&batch_updates, &batch_updates,
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
batch_updates.insert_historic(l.layer_desc().clone(), l); batch_updates.insert_historic(l.layer_desc().clone());
self.layer_cache.create_new_layer(l);
batch_updates.flush(); batch_updates.flush();
// update metrics // update metrics
@@ -3100,7 +3138,8 @@ impl Timeline {
) -> anyhow::Result<bool> { ) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold(); let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
let mut max_deltas = 0; let mut max_deltas = 0;
{ {
@@ -3278,7 +3317,8 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update(); let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -3300,10 +3340,11 @@ impl Timeline {
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
updates.insert_historic(l.layer_desc().clone(), l); updates.insert_historic(l.layer_desc().clone());
self.layer_cache.create_new_layer(l);
} }
updates.flush(); updates.flush();
drop(layers); drop_wlock(guard);
timer.stop_and_record(); timer.stop_and_record();
Ok(layer_paths_to_upload) Ok(layer_paths_to_upload)
@@ -3313,7 +3354,7 @@ impl Timeline {
#[derive(Default)] #[derive(Default)]
struct CompactLevel0Phase1Result { struct CompactLevel0Phase1Result {
new_layers: Vec<DeltaLayer>, new_layers: Vec<DeltaLayer>,
deltas_to_compact: Vec<Arc<dyn PersistentLayer>>, deltas_to_compact: Vec<Arc<PersistentLayerDesc>>,
} }
/// Top-level failure to compact. /// Top-level failure to compact.
@@ -3456,22 +3497,22 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
} }
impl Timeline { impl Timeline {
/// Level0 files first phase of compaction, explained in the [`compact_inner`] comment.
///
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
fn compact_level0_phase1( fn compact_level0_phase1(
self: Arc<Self>, self: Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, _layer_removal_cs: LayerDeletionGuard,
layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>, guard: tokio::sync::OwnedRwLockReadGuard<(LayerMap, LayerFileManager)>,
mut stats: CompactLevel0Phase1StatsBuilder, mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64, target_file_size: u64,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> { ) -> Result<CompactLevel0Phase1Result, CompactionError> {
stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller stats.read_lock_acquisition_micros.till_now(); // set by caller
let mut level0_deltas = layers.get_level0_deltas()?; let (layers, _) = &*guard;
let level0_deltas = layers.get_level0_deltas()?;
let mut level0_deltas = level0_deltas
.into_iter()
.map(|x| self.layer_cache.get_from_desc(&x))
.collect_vec();
stats.level0_deltas_count = Some(level0_deltas.len()); stats.level0_deltas_count = Some(level0_deltas.len());
// Only compact if enough layers have accumulated. // Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold(); let threshold = self.get_compaction_threshold();
@@ -3591,7 +3632,7 @@ impl Timeline {
} }
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_compute_holes_micros =
stats.read_lock_held_prerequisites_micros.till_now(); stats.read_lock_held_prerequisites_micros.till_now();
drop(layers); drop_rlock(guard);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now(); stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec(); let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start); holes.sort_unstable_by_key(|hole| hole.key_range.start);
@@ -3818,7 +3859,10 @@ impl Timeline {
Ok(CompactLevel0Phase1Result { Ok(CompactLevel0Phase1Result {
new_layers, new_layers,
deltas_to_compact, deltas_to_compact: deltas_to_compact
.into_iter()
.map(|x| Arc::new(x.layer_desc().clone()))
.collect(),
}) })
} }
@@ -3828,7 +3872,7 @@ impl Timeline {
/// ///
async fn compact_level0( async fn compact_level0(
self: &Arc<Self>, self: &Arc<Self>,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, layer_removal_cs: LayerDeletionGuard,
target_file_size: u64, target_file_size: u64,
ctx: &RequestContext, ctx: &RequestContext,
) -> Result<(), CompactionError> { ) -> Result<(), CompactionError> {
@@ -3882,7 +3926,8 @@ impl Timeline {
.context("wait for layer upload ops to complete")?; .context("wait for layer upload ops to complete")?;
} }
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
let mut updates = layers.batch_update(); let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers { for l in new_layers {
@@ -3914,7 +3959,8 @@ impl Timeline {
LayerResidenceStatus::Resident, LayerResidenceStatus::Resident,
LayerResidenceEventReason::LayerCreate, LayerResidenceEventReason::LayerCreate,
); );
updates.insert_historic(x.layer_desc().clone(), x); updates.insert_historic(x.layer_desc().clone());
self.layer_cache.create_new_layer(x);
} }
// Now that we have reshuffled the data to set of new delta layers, we can // Now that we have reshuffled the data to set of new delta layers, we can
@@ -3925,7 +3971,7 @@ impl Timeline {
self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?;
} }
updates.flush(); updates.flush();
drop(layers); drop_wlock(guard);
// Also schedule the deletions in remote storage // Also schedule the deletions in remote storage
if let Some(remote_client) = &self.remote_client { if let Some(remote_client) = &self.remote_client {
@@ -4043,7 +4089,7 @@ impl Timeline {
fail_point!("before-timeline-gc"); fail_point!("before-timeline-gc");
let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); let layer_removal_cs = self.layer_cache.delete_guard().await;
// Is the timeline being deleted? // Is the timeline being deleted?
if self.is_stopping() { if self.is_stopping() {
anyhow::bail!("timeline is Stopping"); anyhow::bail!("timeline is Stopping");
@@ -4081,7 +4127,7 @@ impl Timeline {
async fn gc_timeline( async fn gc_timeline(
&self, &self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>, layer_removal_cs: LayerDeletionGuard,
horizon_cutoff: Lsn, horizon_cutoff: Lsn,
pitr_cutoff: Lsn, pitr_cutoff: Lsn,
retain_lsns: Vec<Lsn>, retain_lsns: Vec<Lsn>,
@@ -4142,7 +4188,8 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range // 4. newer on-disk image layers cover the layer's whole key range
// //
// TODO holding a write lock is too agressive and avoidable // TODO holding a write lock is too agressive and avoidable
let mut layers = self.layers.write().await; let mut guard = self.layers.write().await;
let (layers, _) = &mut *guard;
'outer: for l in layers.iter_historic_layers() { 'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1; result.layers_total += 1;
@@ -4221,7 +4268,7 @@ impl Timeline {
// delta layers. Image layers can form "stairs" preventing old image from been deleted. // delta layers. Image layers can form "stairs" preventing old image from been deleted.
// But image layers are in any case less sparse than delta layers. Also we need some // But image layers are in any case less sparse than delta layers. Also we need some
// protection from replacing recent image layers with new one after each GC iteration. // protection from replacing recent image layers with new one after each GC iteration.
if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&*l) { if self.get_gc_feedback() && l.is_incremental() && !LayerMap::is_l0(&l) {
wanted_image_layers.add_range(l.get_key_range()); wanted_image_layers.add_range(l.get_key_range());
} }
result.layers_not_updated += 1; result.layers_not_updated += 1;
@@ -4442,42 +4489,16 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding // Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map. // Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().await; let mut guard = self_clone.layers.write().await;
let mut updates = layers.batch_update(); let (layers, _) = &mut *guard;
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); let updates = layers.batch_update();
let new_layer =
remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{ {
use crate::tenant::layer_map::Replacement;
let l: Arc<dyn PersistentLayer> = remote_layer.clone(); let l: Arc<dyn PersistentLayer> = remote_layer.clone();
let failure = match updates.replace_historic(l.layer_desc().clone(), &l, new_layer.layer_desc().clone(), new_layer) { let failure = match self_clone.layer_cache.replace_and_verify(l, new_layer)
Ok(Replacement::Replaced { .. }) => false, {
Ok(Replacement::NotFound) => { Ok(()) => false,
// TODO: the downloaded file should probably be removed, otherwise
// it will be added to the layermap on next load? we should
// probably restart any get_reconstruct_data search as well.
//
// See: https://github.com/neondatabase/neon/issues/3533
error!("replacing downloaded layer into layermap failed because layer was not found");
true
}
Ok(Replacement::RemovalBuffered) => {
unreachable!("current implementation does not remove anything")
}
Ok(Replacement::Unexpected(other)) => {
// if the other layer would have the same pointer value as
// expected, it means they differ only on vtables.
//
// otherwise there's no known reason for this to happen as
// compacted layers should have different covering rectangle
// leading to produce Replacement::NotFound.
error!(
expected.ptr = ?Arc::as_ptr(&l),
other.ptr = ?Arc::as_ptr(&other),
?other,
"replacing downloaded layer into layermap failed because another layer was found instead of expected"
);
true
}
Err(e) => { Err(e) => {
// this is a precondition failure, the layer filename derived // this is a precondition failure, the layer filename derived
// attributes didn't match up, which doesn't seem likely. // attributes didn't match up, which doesn't seem likely.
@@ -4505,7 +4526,7 @@ impl Timeline {
} }
} }
updates.flush(); updates.flush();
drop(layers); drop_wlock(guard);
info!("on-demand download successful"); info!("on-demand download successful");
@@ -4516,7 +4537,10 @@ impl Timeline {
remote_layer.ongoing_download.close(); remote_layer.ongoing_download.close();
} else { } else {
// Keep semaphore open. We'll drop the permit at the end of the function. // Keep semaphore open. We'll drop the permit at the end of the function.
error!("layer file download failed: {:?}", result.as_ref().unwrap_err()); error!(
"layer file download failed: {:?}",
result.as_ref().unwrap_err()
);
} }
// Don't treat it as an error if the task that triggered the download // Don't treat it as an error if the task that triggered the download
@@ -4530,7 +4554,8 @@ impl Timeline {
drop(permit); drop(permit);
Ok(()) Ok(())
}.in_current_span(), }
.in_current_span(),
); );
receiver.await.context("download task cancelled")? receiver.await.context("download task cancelled")?
@@ -4600,9 +4625,11 @@ impl Timeline {
) { ) {
let mut downloads = Vec::new(); let mut downloads = Vec::new();
{ {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
layers layers
.iter_historic_layers() .iter_historic_layers()
.map(|l| self.layer_cache.get_from_desc(&l))
.filter_map(|l| l.downcast_remote_layer()) .filter_map(|l| l.downcast_remote_layer())
.map(|l| self.download_remote_layer(l)) .map(|l| self.download_remote_layer(l))
.for_each(|dl| downloads.push(dl)) .for_each(|dl| downloads.push(dl))
@@ -4703,7 +4730,8 @@ impl LocalLayerInfoForDiskUsageEviction {
impl Timeline { impl Timeline {
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
let mut max_layer_size: Option<u64> = None; let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new(); let mut resident_layers = Vec::new();
@@ -4712,6 +4740,8 @@ impl Timeline {
let file_size = l.file_size(); let file_size = l.file_size();
max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size)));
let l = self.layer_cache.get_from_desc(&l);
if l.is_remote_layer() { if l.is_remote_layer() {
continue; continue;
} }
@@ -4870,3 +4900,31 @@ pub(crate) fn debug_assert_current_span_has_tenant_and_timeline_id() {
), ),
} }
} }
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise.
///
/// If comparing persistent layers, ALWAYS compare the layer descriptor key.
#[inline(always)]
pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
// "dyn Trait" objects are "fat pointers" in that they have two components:
// - pointer to the object
// - pointer to the vtable
//
// rust does not provide a guarantee that these vtables are unique, but however
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// pointer and the vtable need to be equal.
//
// See: https://github.com/rust-lang/rust/issues/103763
//
// A future version of rust will most likely use this form below, where we cast each
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// not affect the comparison.
//
// See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right
}

View File

@@ -197,9 +197,11 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction. // We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this. // So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = { let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().await; let guard = self.layers.read().await;
let (layers, _) = &*guard;
let mut candidates = Vec::new(); let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() { for hist_layer in layers.iter_historic_layers() {
let hist_layer = self.layer_cache.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() { if hist_layer.is_remote_layer() {
continue; continue;
} }

View File

@@ -713,9 +713,7 @@ def test_ondemand_download_failure_to_replace(
# error message is not useful # error message is not useful
pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2) pageserver_http.timeline_detail(tenant_id, timeline_id, True, timeout=2)
actual_message = ( actual_message = ".* ERROR .*layermap-replace-notfound"
".* ERROR .*replacing downloaded layer into layermap failed because layer was not found"
)
assert env.pageserver.log_contains(actual_message) is not None assert env.pageserver.log_contains(actual_message) is not None
env.pageserver.allowed_errors.append(actual_message) env.pageserver.allowed_errors.append(actual_message)