Compare commits

...

7 Commits

Author SHA1 Message Date
Alex Chi
1863ae799d fix blocking read
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-21 10:17:42 -04:00
Alex Chi
20fe57d93b refactor: use immutable storage state in timeline
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 16:03:41 -04:00
Alex Chi
0fad5e21ce Merge remote-tracking branch 'origin/skyzh/layermap-imm' into skyzh/immutable-storage 2023-06-14 15:29:59 -04:00
Alex Chi
a2056666ae pgserver: move mapping logic to layer cache
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-14 15:07:38 -04:00
Alex Chi
a3909e03f8 pgserver: add immutable layer map manager
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 16:25:27 -04:00
Alex Chi
fc190a2a19 resolve merge conflicts
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:56:50 -04:00
Alex Chi
faee3152f3 refactor: use LayerDesc in LayerMap (part 2)
Signed-off-by: Alex Chi <iskyzh@gmail.com>
2023-06-13 13:54:59 -04:00
15 changed files with 1020 additions and 811 deletions

7
Cargo.lock generated
View File

@@ -110,6 +110,12 @@ dependencies = [
"backtrace", "backtrace",
] ]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]] [[package]]
name = "archery" name = "archery"
version = "0.5.0" version = "0.5.0"
@@ -2542,6 +2548,7 @@ name = "pageserver"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"arc-swap",
"async-stream", "async-stream",
"async-trait", "async-trait",
"byteorder", "byteorder",

View File

@@ -32,6 +32,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project ## All dependency versions, used in the project
[workspace.dependencies] [workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] } anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-stream = "0.3" async-stream = "0.3"
async-trait = "0.1" async-trait = "0.1"
atty = "0.2.14" atty = "0.2.14"

View File

@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies] [dependencies]
anyhow.workspace = true anyhow.workspace = true
arc-swap.workspace = true
async-stream.workspace = true async-stream.workspace = true
async-trait.workspace = true async-trait.workspace = true
byteorder.workspace = true byteorder.workspace = true

View File

@@ -1,22 +1,23 @@
use pageserver::keyspace::{KeyPartitioning, KeySpace}; use pageserver::keyspace::{KeyPartitioning, KeySpace};
use pageserver::repository::Key; use pageserver::repository::Key;
use pageserver::tenant::layer_map::LayerMap; use pageserver::tenant::layer_map::LayerMap;
use pageserver::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use pageserver::tenant::storage_layer::{tests::LayerDescriptor, Layer, LayerFileName};
use pageserver::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use rand::prelude::{SeedableRng, SliceRandom, StdRng}; use rand::prelude::{SeedableRng, SliceRandom, StdRng};
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::fs::File; use std::fs::File;
use std::io::{BufRead, BufReader}; use std::io::{BufRead, BufReader};
use std::path::PathBuf; use std::path::PathBuf;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn; use utils::lsn::Lsn;
use criterion::{black_box, criterion_group, criterion_main, Criterion}; use criterion::{black_box, criterion_group, criterion_main, Criterion};
fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> { fn build_layer_map(filename_dump: PathBuf) -> LayerMap {
let mut layer_map = LayerMap::<LayerDescriptor>::default(); let mut layer_map = LayerMap::default();
let mut min_lsn = Lsn(u64::MAX); let mut min_lsn = Lsn(u64::MAX);
let mut max_lsn = Lsn(0); let mut max_lsn = Lsn(0);
@@ -33,7 +34,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
min_lsn = min(min_lsn, lsn_range.start); min_lsn = min(min_lsn, lsn_range.start);
max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1)); max_lsn = max(max_lsn, Lsn(lsn_range.end.0 - 1));
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); updates.insert_historic(layer.layer_desc().clone());
} }
println!("min: {min_lsn}, max: {max_lsn}"); println!("min: {min_lsn}, max: {max_lsn}");
@@ -43,7 +44,7 @@ fn build_layer_map(filename_dump: PathBuf) -> LayerMap<LayerDescriptor> {
} }
/// Construct a layer map query pattern for benchmarks /// Construct a layer map query pattern for benchmarks
fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn)> { fn uniform_query_pattern(layer_map: &LayerMap) -> Vec<(Key, Lsn)> {
// For each image layer we query one of the pages contained, at LSN right // For each image layer we query one of the pages contained, at LSN right
// before the image layer was created. This gives us a somewhat uniform // before the image layer was created. This gives us a somewhat uniform
// coverage of both the lsn and key space because image layers have // coverage of both the lsn and key space because image layers have
@@ -69,7 +70,7 @@ fn uniform_query_pattern(layer_map: &LayerMap<LayerDescriptor>) -> Vec<(Key, Lsn
// Construct a partitioning for testing get_difficulty map when we // Construct a partitioning for testing get_difficulty map when we
// don't have an exact result of `collect_keyspace` to work with. // don't have an exact result of `collect_keyspace` to work with.
fn uniform_key_partitioning(layer_map: &LayerMap<LayerDescriptor>, _lsn: Lsn) -> KeyPartitioning { fn uniform_key_partitioning(layer_map: &LayerMap, _lsn: Lsn) -> KeyPartitioning {
let mut parts = Vec::new(); let mut parts = Vec::new();
// We add a partition boundary at the start of each image layer, // We add a partition boundary at the start of each image layer,
@@ -209,13 +210,15 @@ fn bench_sequential(c: &mut Criterion) {
for i in 0..100_000 { for i in 0..100_000 {
let i32 = (i as u32) % 100; let i32 = (i as u32) % 100;
let zero = Key::from_hex("000000000000000000000000000000000000").unwrap(); let zero = Key::from_hex("000000000000000000000000000000000000").unwrap();
let layer = LayerDescriptor { let layer = LayerDescriptor::from(PersistentLayerDesc::new_img(
key: zero.add(10 * i32)..zero.add(10 * i32 + 1), TenantId::generate(),
lsn: Lsn(i)..Lsn(i + 1), TimelineId::generate(),
is_incremental: false, zero.add(10 * i32)..zero.add(10 * i32 + 1),
short_id: format!("Layer {}", i), Lsn(i),
}; false,
updates.insert_historic(layer.get_persistent_layer_desc(), Arc::new(layer)); 0,
));
updates.insert_historic(layer.layer_desc().clone());
} }
updates.flush(); updates.flush();
println!("Finished layer map init in {:?}", now.elapsed()); println!("Finished layer map init in {:?}", now.elapsed());

View File

@@ -489,7 +489,9 @@ impl PageServerHandler {
// Create empty timeline // Create empty timeline
info!("creating new timeline"); info!("creating new timeline");
let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?; let tenant = get_active_tenant_with_timeout(tenant_id, &ctx).await?;
let timeline = tenant.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)?; let timeline = tenant
.create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx)
.await?;
// TODO mark timeline as not ready until it reaches end_lsn. // TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute // We might have some wal to import as well, and we should prevent compute

View File

@@ -85,7 +85,9 @@ pub mod blob_io;
pub mod block_io; pub mod block_io;
pub mod disk_btree; pub mod disk_btree;
pub(crate) mod ephemeral_file; pub(crate) mod ephemeral_file;
pub mod layer_cache;
pub mod layer_map; pub mod layer_map;
pub mod layer_map_mgr;
pub mod manifest; pub mod manifest;
pub mod metadata; pub mod metadata;
@@ -555,16 +557,10 @@ impl Tenant {
.context("failed to reconcile with remote")? .context("failed to reconcile with remote")?
} }
let layers = timeline.layer_mgr.read();
// Sanity check: a timeline should have some content. // Sanity check: a timeline should have some content.
anyhow::ensure!( anyhow::ensure!(
ancestor.is_some() ancestor.is_some() || layers.iter_historic_layers().next().is_some(),
|| timeline
.layers
.read()
.await
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files" "Timeline has no ancestor and no layer files"
); );
@@ -1245,7 +1241,7 @@ impl Tenant {
/// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the /// For tests, use `DatadirModification::init_empty_test_timeline` + `commit` to setup the
/// minimum amount of keys required to get a writable timeline. /// minimum amount of keys required to get a writable timeline.
/// (Without it, `put` might fail due to `repartition` failing.) /// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline( pub async fn create_empty_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
@@ -1257,9 +1253,11 @@ impl Tenant {
"Cannot create empty timelines on inactive tenant" "Cannot create empty timelines on inactive tenant"
); );
let timelines = self.timelines.lock().unwrap(); let timeline_uninit_mark = {
let timeline_uninit_mark = self.create_timeline_uninit_mark(new_timeline_id, &timelines)?; let timelines: MutexGuard<'_, HashMap<TimelineId, Arc<Timeline>>> =
drop(timelines); self.timelines.lock().unwrap();
self.create_timeline_uninit_mark(new_timeline_id, &timelines)?
};
let new_metadata = TimelineMetadata::new( let new_metadata = TimelineMetadata::new(
// Initialize disk_consistent LSN to 0, The caller must import some data to // Initialize disk_consistent LSN to 0, The caller must import some data to
@@ -1279,6 +1277,7 @@ impl Tenant {
initdb_lsn, initdb_lsn,
None, None,
) )
.await
} }
/// Helper for unit tests to create an emtpy timeline. /// Helper for unit tests to create an emtpy timeline.
@@ -1294,7 +1293,9 @@ impl Tenant {
pg_version: u32, pg_version: u32,
ctx: &RequestContext, ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> { ) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; let uninit_tl = self
.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
.await?;
let tline = uninit_tl.raw_timeline().expect("we just created it"); let tline = uninit_tl.raw_timeline().expect("we just created it");
assert_eq!(tline.get_last_record_lsn(), Lsn(0)); assert_eq!(tline.get_last_record_lsn(), Lsn(0));
@@ -1558,7 +1559,7 @@ impl Tenant {
// No timeout here, GC & Compaction should be responsive to the // No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change. // `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()"); info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await; let layer_removal_guard = timeline.lcache.delete_guard().await;
info!("got layer_removal_cs.lock(), deleting layer files"); info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled // NB: storage_sync upload tasks that reference these layers have been cancelled
@@ -2754,13 +2755,15 @@ impl Tenant {
src_timeline.pg_version, src_timeline.pg_version,
); );
let uninitialized_timeline = self.prepare_new_timeline( let uninitialized_timeline = self
dst_id, .prepare_new_timeline(
&metadata, dst_id,
timeline_uninit_mark, &metadata,
start_lsn + 1, timeline_uninit_mark,
Some(Arc::clone(src_timeline)), start_lsn + 1,
)?; Some(Arc::clone(src_timeline)),
)
.await?;
let new_timeline = uninitialized_timeline.finish_creation()?; let new_timeline = uninitialized_timeline.finish_creation()?;
@@ -2838,13 +2841,15 @@ impl Tenant {
pgdata_lsn, pgdata_lsn,
pg_version, pg_version,
); );
let raw_timeline = self.prepare_new_timeline( let raw_timeline = self
timeline_id, .prepare_new_timeline(
&new_metadata, timeline_id,
timeline_uninit_mark, &new_metadata,
pgdata_lsn, timeline_uninit_mark,
None, pgdata_lsn,
)?; None,
)
.await?;
let tenant_id = raw_timeline.owning_tenant.tenant_id; let tenant_id = raw_timeline.owning_tenant.tenant_id;
let unfinished_timeline = raw_timeline.raw_timeline()?; let unfinished_timeline = raw_timeline.raw_timeline()?;
@@ -2897,7 +2902,7 @@ impl Tenant {
/// at 'disk_consistent_lsn'. After any initial data has been imported, call /// at 'disk_consistent_lsn'. After any initial data has been imported, call
/// `finish_creation` to insert the Timeline into the timelines map and to remove the /// `finish_creation` to insert the Timeline into the timelines map and to remove the
/// uninit mark file. /// uninit mark file.
fn prepare_new_timeline( async fn prepare_new_timeline(
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
new_metadata: &TimelineMetadata, new_metadata: &TimelineMetadata,
@@ -2924,7 +2929,7 @@ impl Tenant {
.create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None) .create_timeline_struct(new_timeline_id, new_metadata, ancestor, remote_client, None)
.context("Failed to create timeline data structure")?; .context("Failed to create timeline data structure")?;
timeline_struct.init_empty_layer_map(start_lsn); timeline_struct.init_empty_layer_map(start_lsn).await?;
if let Err(e) = if let Err(e) =
self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata) self.create_timeline_files(&uninit_mark.timeline_path, new_timeline_id, new_metadata)
@@ -3621,7 +3626,10 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?; .await?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) { match tenant
.create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await
{
Ok(_) => panic!("duplicate timeline creation should fail"), Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!( Err(e) => assert_eq!(
e.to_string(), e.to_string(),
@@ -4421,8 +4429,9 @@ mod tests {
.await; .await;
let initdb_lsn = Lsn(0x20); let initdb_lsn = Lsn(0x20);
let utline = let utline = tenant
tenant.create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)?; .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx)
.await?;
let tline = utline.raw_timeline().unwrap(); let tline = utline.raw_timeline().unwrap();
// Spawn flush loop now so that we can set the `expect_initdb_optimization` // Spawn flush loop now so that we can set the `expect_initdb_optimization`

View File

@@ -0,0 +1,143 @@
use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
use super::Timeline;
use crate::tenant::layer_map::{self, LayerMap};
use anyhow::Result;
use std::sync::{Mutex, Weak};
use std::{collections::HashMap, sync::Arc};
pub struct LayerCache {
/// Layer removal lock.
/// A lock to ensure that no layer of the timeline is removed concurrently by other tasks.
/// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`],
/// and [`Tenant::delete_timeline`]. This is an `Arc<Mutex>` lock because we need an owned
/// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`).
pub layers_removal_lock: Arc<tokio::sync::Mutex<()>>,
/// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use.
/// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock.
pub layers_operation_lock: Arc<tokio::sync::RwLock<()>>,
/// Will be useful when we move evict / download to layer cache.
#[allow(unused)]
timeline: Weak<Timeline>,
mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
}
pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>);
pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>);
#[derive(Clone)]
pub struct DeleteGuard(Arc<tokio::sync::OwnedMutexGuard<()>>);
impl LayerCache {
pub fn new(timeline: Weak<Timeline>) -> Self {
Self {
layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
mapping: Mutex::new(HashMap::new()),
timeline,
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
let guard = self.mapping.lock().unwrap();
guard.get(&desc.key()).expect("not found").clone()
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_write(&self) -> LayerInUseWrite {
LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await)
}
/// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure
/// we won't delete files that are being read.
pub async fn layer_in_use_read(&self) -> LayerInUseRead {
LayerInUseRead(self.layers_operation_lock.clone().read_owned().await)
}
/// Ensures only one of compaction / gc can happen at a time.
pub async fn delete_guard(&self) -> DeleteGuard {
DeleteGuard(Arc::new(
self.layers_removal_lock.clone().lock_owned().await,
))
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Should only be called when initializing the timeline. Bypass checks and layer operation lock.
pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within read path.
pub fn replace_and_verify(
&self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
let mut guard = self.mapping.lock().unwrap();
use super::layer_map::LayerKey;
let key = LayerKey::from(&*expected);
let other = LayerKey::from(&*new);
let expected_l0 = LayerMap::is_l0(expected.layer_desc());
let new_l0 = LayerMap::is_l0(new.layer_desc());
fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
));
anyhow::ensure!(
key == other,
"replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}"
);
anyhow::ensure!(
expected_l0 == new_l0,
"replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) {
anyhow::ensure!(
layer_map::compare_arced_layers(&expected, layer),
"replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
);
*layer = new;
Ok(())
} else {
anyhow::bail!(
"replacing downloaded layer into layermap failed because layer was not found"
);
}
}
/// Called within write path. When compaction and image layer creation we will create new layers.
pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.insert(layer.layer_desc().key(), layer);
}
/// Called within write path. When GC and compaction we will remove layers and delete them on disk.
/// Will move logic to delete files here later.
pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
let mut guard = self.mapping.lock().unwrap();
guard.remove(&layer.layer_desc().key());
}
}

View File

@@ -51,25 +51,23 @@ use crate::keyspace::KeyPartitioning;
use crate::repository::Key; use crate::repository::Key;
use crate::tenant::storage_layer::InMemoryLayer; use crate::tenant::storage_layer::InMemoryLayer;
use crate::tenant::storage_layer::Layer; use crate::tenant::storage_layer::Layer;
use anyhow::Context;
use anyhow::Result; use anyhow::Result;
use std::collections::HashMap;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::ops::Range; use std::ops::Range;
use std::sync::Arc; use std::sync::Arc;
use utils::lsn::Lsn; use utils::lsn::Lsn;
use historic_layer_coverage::BufferedHistoricLayerCoverage; use historic_layer_coverage::BufferedHistoricLayerCoverage;
pub use historic_layer_coverage::Replacement; pub use historic_layer_coverage::{LayerKey, Replacement};
use super::storage_layer::range_eq; use super::storage_layer::range_eq;
use super::storage_layer::PersistentLayerDesc; use super::storage_layer::PersistentLayerDesc;
use super::storage_layer::PersistentLayerKey;
/// ///
/// LayerMap tracks what layers exist on a timeline. /// LayerMap tracks what layers exist on a timeline.
/// ///
pub struct LayerMap<L: ?Sized> { #[derive(Default, Clone)]
pub struct LayerMap {
// //
// 'open_layer' holds the current InMemoryLayer that is accepting new // 'open_layer' holds the current InMemoryLayer that is accepting new
// records. If it is None, 'next_open_layer_at' will be set instead, indicating // records. If it is None, 'next_open_layer_at' will be set instead, indicating
@@ -95,24 +93,6 @@ pub struct LayerMap<L: ?Sized> {
/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient. /// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree. /// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>, l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
/// Mapping from persistent layer key to the actual layer object. Currently, it stores delta, image, and
/// remote layers. In future refactors, this will be eventually moved out of LayerMap into Timeline, and
/// RemoteLayer will be removed.
mapping: HashMap<PersistentLayerKey, Arc<L>>,
}
impl<L: ?Sized> Default for LayerMap<L> {
fn default() -> Self {
Self {
open_layer: None,
next_open_layer_at: None,
frozen_layers: VecDeque::default(),
l0_delta_layers: Vec::default(),
historic: BufferedHistoricLayerCoverage::default(),
mapping: HashMap::default(),
}
}
} }
/// The primary update API for the layer map. /// The primary update API for the layer map.
@@ -120,24 +100,21 @@ impl<L: ?Sized> Default for LayerMap<L> {
/// Batching historic layer insertions and removals is good for /// Batching historic layer insertions and removals is good for
/// performance and this struct helps us do that correctly. /// performance and this struct helps us do that correctly.
#[must_use] #[must_use]
pub struct BatchedUpdates<'a, L: ?Sized + Layer> { pub struct BatchedUpdates<'a> {
// While we hold this exclusive reference to the layer map the type checker // While we hold this exclusive reference to the layer map the type checker
// will prevent us from accidentally reading any unflushed updates. // will prevent us from accidentally reading any unflushed updates.
layer_map: &'a mut LayerMap<L>, layer_map: &'a mut LayerMap,
} }
/// Provide ability to batch more updates while hiding the read /// Provide ability to batch more updates while hiding the read
/// API so we don't accidentally read without flushing. /// API so we don't accidentally read without flushing.
impl<L> BatchedUpdates<'_, L> impl BatchedUpdates<'_> {
where
L: ?Sized + Layer,
{
/// ///
/// Insert an on-disk layer. /// Insert an on-disk layer.
/// ///
// TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap` // TODO remove the `layer` argument when `mapping` is refactored out of `LayerMap`
pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn insert_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.insert_historic_noflush(layer_desc, layer) self.layer_map.insert_historic_noflush(layer_desc)
} }
/// ///
@@ -145,31 +122,8 @@ where
/// ///
/// This should be called when the corresponding file on disk has been deleted. /// This should be called when the corresponding file on disk has been deleted.
/// ///
pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) {
self.layer_map.remove_historic_noflush(layer_desc, layer) self.layer_map.remove_historic_noflush(layer_desc)
}
/// Replaces existing layer iff it is the `expected`.
///
/// If the expected layer has been removed it will not be inserted by this function.
///
/// Returned `Replacement` describes succeeding in replacement or the reason why it could not
/// be done.
///
/// TODO replacement can be done without buffering and rebuilding layer map updates.
/// One way to do that is to add a layer of indirection for returned values, so
/// that we can replace values only by updating a hashmap.
pub fn replace_historic(
&mut self,
expected_desc: PersistentLayerDesc,
expected: &Arc<L>,
new_desc: PersistentLayerDesc,
new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
fail::fail_point!("layermap-replace-notfound", |_| Ok(Replacement::NotFound));
self.layer_map
.replace_historic_noflush(expected_desc, expected, new_desc, new)
} }
// We will flush on drop anyway, but this method makes it // We will flush on drop anyway, but this method makes it
@@ -185,25 +139,19 @@ where
// than panic later or read without flushing. // than panic later or read without flushing.
// //
// TODO maybe warn if flush hasn't explicitly been called // TODO maybe warn if flush hasn't explicitly been called
impl<L> Drop for BatchedUpdates<'_, L> impl Drop for BatchedUpdates<'_> {
where
L: ?Sized + Layer,
{
fn drop(&mut self) { fn drop(&mut self) {
self.layer_map.flush_updates(); self.layer_map.flush_updates();
} }
} }
/// Return value of LayerMap::search /// Return value of LayerMap::search
pub struct SearchResult<L: ?Sized> { pub struct SearchResult {
pub layer: Arc<L>, pub layer: Arc<PersistentLayerDesc>,
pub lsn_floor: Lsn, pub lsn_floor: Lsn,
} }
impl<L> LayerMap<L> impl LayerMap {
where
L: ?Sized + Layer,
{
/// ///
/// Find the latest layer (by lsn.end) that covers the given /// Find the latest layer (by lsn.end) that covers the given
/// 'key', with lsn.start < 'end_lsn'. /// 'key', with lsn.start < 'end_lsn'.
@@ -235,7 +183,7 @@ where
/// NOTE: This only searches the 'historic' layers, *not* the /// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers! /// 'open' and 'frozen' layers!
/// ///
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult<L>> { pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?; let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128()); let latest_delta = version.delta_coverage.query(key.to_i128());
let latest_image = version.image_coverage.query(key.to_i128()); let latest_image = version.image_coverage.query(key.to_i128());
@@ -244,7 +192,6 @@ where
(None, None) => None, (None, None) => None,
(None, Some(image)) => { (None, Some(image)) => {
let lsn_floor = image.get_lsn_range().start; let lsn_floor = image.get_lsn_range().start;
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: image, layer: image,
lsn_floor, lsn_floor,
@@ -252,7 +199,6 @@ where
} }
(Some(delta), None) => { (Some(delta), None) => {
let lsn_floor = delta.get_lsn_range().start; let lsn_floor = delta.get_lsn_range().start;
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta,
lsn_floor, lsn_floor,
@@ -263,7 +209,6 @@ where
let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end; let image_is_newer = image.get_lsn_range().end >= delta.get_lsn_range().end;
let image_exact_match = img_lsn + 1 == end_lsn; let image_exact_match = img_lsn + 1 == end_lsn;
if image_is_newer || image_exact_match { if image_is_newer || image_exact_match {
let image = self.get_layer_from_mapping(&image.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: image, layer: image,
lsn_floor: img_lsn, lsn_floor: img_lsn,
@@ -271,7 +216,6 @@ where
} else { } else {
let lsn_floor = let lsn_floor =
std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1); std::cmp::max(delta.get_lsn_range().start, image.get_lsn_range().start + 1);
let delta = self.get_layer_from_mapping(&delta.key()).clone();
Some(SearchResult { Some(SearchResult {
layer: delta, layer: delta,
lsn_floor, lsn_floor,
@@ -282,7 +226,7 @@ where
} }
/// Start a batch of updates, applied on drop /// Start a batch of updates, applied on drop
pub fn batch_update(&mut self) -> BatchedUpdates<'_, L> { pub fn batch_update(&mut self) -> BatchedUpdates<'_> {
BatchedUpdates { layer_map: self } BatchedUpdates { layer_map: self }
} }
@@ -292,48 +236,32 @@ where
/// Helper function for BatchedUpdates::insert_historic /// Helper function for BatchedUpdates::insert_historic
/// ///
/// TODO(chi): remove L generic so that we do not need to pass layer object. /// TODO(chi): remove L generic so that we do not need to pass layer object.
pub(self) fn insert_historic_noflush( pub(self) fn insert_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
&mut self,
layer_desc: PersistentLayerDesc,
layer: Arc<L>,
) {
self.mapping.insert(layer_desc.key(), layer.clone());
// TODO: See #3869, resulting #4088, attempted fix and repro #4094 // TODO: See #3869, resulting #4088, attempted fix and repro #4094
if Self::is_l0(&layer) { if Self::is_l0(&layer_desc) {
self.l0_delta_layers.push(layer_desc.clone().into()); self.l0_delta_layers.push(layer_desc.clone().into());
} }
self.historic.insert( self.historic.insert(
historic_layer_coverage::LayerKey::from(&*layer), historic_layer_coverage::LayerKey::from(&layer_desc),
layer_desc.into(), layer_desc.into(),
); );
} }
fn get_layer_from_mapping(&self, key: &PersistentLayerKey) -> &Arc<L> {
let layer = self
.mapping
.get(key)
.with_context(|| format!("{key:?}"))
.expect("inconsistent layer mapping");
layer
}
/// ///
/// Remove an on-disk layer from the map. /// Remove an on-disk layer from the map.
/// ///
/// Helper function for BatchedUpdates::remove_historic /// Helper function for BatchedUpdates::remove_historic
/// ///
pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc, layer: Arc<L>) { pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) {
self.historic self.historic
.remove(historic_layer_coverage::LayerKey::from(&*layer)); .remove(historic_layer_coverage::LayerKey::from(&layer_desc));
if Self::is_l0(&layer) { let layer_key = layer_desc.key();
if Self::is_l0(&layer_desc) {
let len_before = self.l0_delta_layers.len(); let len_before = self.l0_delta_layers.len();
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| { l0_delta_layers.retain(|other| other.key() != layer_key);
!Self::compare_arced_layers(self.get_layer_from_mapping(&other.key()), &layer)
});
self.l0_delta_layers = l0_delta_layers; self.l0_delta_layers = l0_delta_layers;
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers, // this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer, // there's a chance that the comparison fails at runtime due to it comparing (pointer,
@@ -344,69 +272,6 @@ where
"failed to locate removed historic layer from l0_delta_layers" "failed to locate removed historic layer from l0_delta_layers"
); );
} }
self.mapping.remove(&layer_desc.key());
}
/// Replace `expected` with `new` in the historic layer coverage, without
/// flushing buffered updates.
///
/// Preconditions (checked, returning `Err` on violation): both layers must map
/// to the same coverage `LayerKey`, and both must agree on whether they are L0
/// deltas. On success returns the same `Replacement` outcome as the underlying
/// coverage structure; only `Replacement::Replaced` actually mutates state
/// (the `mapping` table and, for L0 layers, `l0_delta_layers`).
pub(self) fn replace_historic_noflush(
    &mut self,
    expected_desc: PersistentLayerDesc,
    expected: &Arc<L>,
    new_desc: PersistentLayerDesc,
    new: Arc<L>,
) -> anyhow::Result<Replacement<Arc<L>>> {
    let key = historic_layer_coverage::LayerKey::from(&**expected);
    let other = historic_layer_coverage::LayerKey::from(&*new);
    let expected_l0 = Self::is_l0(expected);
    let new_l0 = Self::is_l0(&new);

    anyhow::ensure!(
        key == other,
        "expected and new must have equal LayerKeys: {key:?} != {other:?}"
    );

    anyhow::ensure!(
        expected_l0 == new_l0,
        "expected and new must both be l0 deltas or neither should be: {expected_l0} != {new_l0}"
    );

    // Locate the slot in l0_delta_layers up-front: if the replacement succeeds
    // below, that slot must be updated as well, and if the layer is not present
    // there we can report NotFound without touching the coverage at all.
    let l0_index = if expected_l0 {
        // find the index in case replace worked, we need to replace that as well
        let pos = self.l0_delta_layers.iter().position(|slot| {
            Self::compare_arced_layers(self.get_layer_from_mapping(&slot.key()), expected)
        });

        if pos.is_none() {
            return Ok(Replacement::NotFound);
        }
        pos
    } else {
        None
    };

    let new_desc = Arc::new(new_desc);

    // The closure is the identity check: only swap if the stored descriptor is
    // the one the caller expected, otherwise report Unexpected below.
    let replaced = self.historic.replace(&key, new_desc.clone(), |existing| {
        **existing == expected_desc
    });

    // Keep the side tables (mapping, l0_delta_layers) in sync only when the
    // coverage structure actually performed the swap.
    if let Replacement::Replaced { .. } = &replaced {
        self.mapping.remove(&expected_desc.key());
        self.mapping.insert(new_desc.key(), new);
        if let Some(index) = l0_index {
            self.l0_delta_layers[index] = new_desc;
        }
    }

    // Convert Replacement<descriptor> into Replacement<layer object> for the
    // caller; only the Unexpected variant carries a payload to translate.
    let replaced = match replaced {
        Replacement::Replaced { in_buffered } => Replacement::Replaced { in_buffered },
        Replacement::NotFound => Replacement::NotFound,
        Replacement::RemovalBuffered => Replacement::RemovalBuffered,
        Replacement::Unexpected(x) => {
            Replacement::Unexpected(self.get_layer_from_mapping(&x.key()).clone())
        }
    };

    Ok(replaced)
}
/// Helper function for BatchedUpdates::drop. /// Helper function for BatchedUpdates::drop.
@@ -454,10 +319,8 @@ where
Ok(true) Ok(true)
} }
pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<L>> { pub fn iter_historic_layers(&self) -> impl '_ + Iterator<Item = Arc<PersistentLayerDesc>> {
self.historic self.historic.iter()
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
} }
/// ///
@@ -472,7 +335,7 @@ where
&self, &self,
key_range: &Range<Key>, key_range: &Range<Key>,
lsn: Lsn, lsn: Lsn,
) -> Result<Vec<(Range<Key>, Option<Arc<L>>)>> { ) -> Result<Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)>> {
let version = match self.historic.get().unwrap().get_version(lsn.0) { let version = match self.historic.get().unwrap().get_version(lsn.0) {
Some(v) => v, Some(v) => v,
None => return Ok(vec![]), None => return Ok(vec![]),
@@ -482,36 +345,26 @@ where
let end = key_range.end.to_i128(); let end = key_range.end.to_i128();
// Initialize loop variables // Initialize loop variables
let mut coverage: Vec<(Range<Key>, Option<Arc<L>>)> = vec![]; let mut coverage: Vec<(Range<Key>, Option<Arc<PersistentLayerDesc>>)> = vec![];
let mut current_key = start; let mut current_key = start;
let mut current_val = version.image_coverage.query(start); let mut current_val = version.image_coverage.query(start);
// Loop through the change events and push intervals // Loop through the change events and push intervals
for (change_key, change_val) in version.image_coverage.range(start..end) { for (change_key, change_val) in version.image_coverage.range(start..end) {
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
coverage.push(( coverage.push((kr, current_val.take()));
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
current_key = change_key; current_key = change_key;
current_val = change_val.clone(); current_val = change_val.clone();
} }
// Add the final interval // Add the final interval
let kr = Key::from_i128(current_key)..Key::from_i128(end); let kr = Key::from_i128(current_key)..Key::from_i128(end);
coverage.push(( coverage.push((kr, current_val.take()));
kr,
current_val
.take()
.map(|l| self.get_layer_from_mapping(&l.key()).clone()),
));
Ok(coverage) Ok(coverage)
} }
pub fn is_l0(layer: &L) -> bool { pub fn is_l0(layer: &PersistentLayerDesc) -> bool {
range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX)) range_eq(&layer.get_key_range(), &(Key::MIN..Key::MAX))
} }
@@ -537,7 +390,7 @@ where
/// TODO The optimal number should probably be slightly higher than 1, but to /// TODO The optimal number should probably be slightly higher than 1, but to
/// implement that we need to plumb a lot more context into this function /// implement that we need to plumb a lot more context into this function
/// than just the current partition_range. /// than just the current partition_range.
pub fn is_reimage_worthy(layer: &L, partition_range: &Range<Key>) -> bool { pub fn is_reimage_worthy(layer: &PersistentLayerDesc, partition_range: &Range<Key>) -> bool {
// Case 1 // Case 1
if !Self::is_l0(layer) { if !Self::is_l0(layer) {
return true; return true;
@@ -595,9 +448,7 @@ where
let kr = Key::from_i128(current_key)..Key::from_i128(change_key); let kr = Key::from_i128(current_key)..Key::from_i128(change_key);
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() { if !kr.is_empty() {
let base_count = let base_count = Self::is_reimage_worthy(&val, key) as usize;
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count); let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = let max_stacked_deltas_underneath =
self.count_deltas(&kr, &lr, new_limit)?; self.count_deltas(&kr, &lr, new_limit)?;
@@ -620,9 +471,7 @@ where
let lr = lsn.start..val.get_lsn_range().start; let lr = lsn.start..val.get_lsn_range().start;
if !kr.is_empty() { if !kr.is_empty() {
let base_count = let base_count = Self::is_reimage_worthy(&val, key) as usize;
Self::is_reimage_worthy(self.get_layer_from_mapping(&val.key()), key)
as usize;
let new_limit = limit.map(|l| l - base_count); let new_limit = limit.map(|l| l - base_count);
let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?; let max_stacked_deltas_underneath = self.count_deltas(&kr, &lr, new_limit)?;
max_stacked_deltas = std::cmp::max( max_stacked_deltas = std::cmp::max(
@@ -772,12 +621,8 @@ where
} }
/// Return all L0 delta layers /// Return all L0 delta layers
pub fn get_level0_deltas(&self) -> Result<Vec<Arc<L>>> { pub fn get_level0_deltas(&self) -> Result<Vec<Arc<PersistentLayerDesc>>> {
Ok(self Ok(self.l0_delta_layers.to_vec())
.l0_delta_layers
.iter()
.map(|x| self.get_layer_from_mapping(&x.key()).clone())
.collect())
} }
/// debugging function to print out the contents of the layer map /// debugging function to print out the contents of the layer map
@@ -802,97 +647,76 @@ where
println!("End dump LayerMap"); println!("End dump LayerMap");
Ok(()) Ok(())
} }
}
/// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables. /// Similar to `Arc::ptr_eq`, but only compares the object pointers, not vtables.
/// ///
/// Returns `true` if the two `Arc` point to the same layer, false otherwise. /// Returns `true` if the two `Arc` point to the same layer, false otherwise.
#[inline(always)] ///
pub fn compare_arced_layers(left: &Arc<L>, right: &Arc<L>) -> bool { /// If comparing persistent layers, ALWAYS compare the layer descriptor key.
// "dyn Trait" objects are "fat pointers" in that they have two components: #[inline(always)]
// - pointer to the object pub fn compare_arced_layers<L: ?Sized>(left: &Arc<L>, right: &Arc<L>) -> bool {
// - pointer to the vtable // "dyn Trait" objects are "fat pointers" in that they have two components:
// // - pointer to the object
// rust does not provide a guarantee that these vtables are unique, but however // - pointer to the vtable
// `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the //
// pointer and the vtable need to be equal. // rust does not provide a guarantee that these vtables are unique, but however
// // `Arc::ptr_eq` as of writing (at least up to 1.67) uses a comparison where both the
// See: https://github.com/rust-lang/rust/issues/103763 // pointer and the vtable need to be equal.
// //
// A future version of rust will most likely use this form below, where we cast each // See: https://github.com/rust-lang/rust/issues/103763
// pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it //
// not affect the comparison. // A future version of rust will most likely use this form below, where we cast each
// // pointer into a pointer to unit, which drops the inaccessible vtable pointer, making it
// See: https://github.com/rust-lang/rust/pull/106450 // not affect the comparison.
let left = Arc::as_ptr(left) as *const (); //
let right = Arc::as_ptr(right) as *const (); // See: https://github.com/rust-lang/rust/pull/106450
let left = Arc::as_ptr(left) as *const ();
let right = Arc::as_ptr(right) as *const ();
left == right left == right
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{LayerMap, Replacement}; use super::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerDescriptor, LayerFileName}; use crate::tenant::storage_layer::{tests::LayerDescriptor, LayerFileName};
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
mod l0_delta_layers_updated { mod l0_delta_layers_updated {
use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc};
use super::*; use super::*;
#[test] #[test]
fn for_full_range_delta() { fn for_full_range_delta() {
// l0_delta_layers are used by compaction, and should observe all buffered updates // l0_delta_layers are used by compaction, and should observe all buffered updates
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69", "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69",
true true
) )
} }
#[test] #[test]
fn for_non_full_range_delta() { fn for_non_full_range_delta() {
// has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta // has minimal uncovered areas compared to l0_delta_layers_updated_on_insert_replace_remove_for_full_range_delta
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69", "000000000000000000000000000000000001-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFE__0000000053423C21-0000000053424D69",
// because not full range // because not full range
false false
) )
} }
#[test] #[test]
fn for_image() { fn for_image() {
l0_delta_layers_updated_scenario( l0_delta_layers_updated_scenario(
"000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69", "000000000000000000000000000000000000-000000000000000000000000000000010000__0000000053424D69",
// code only checks if it is a full range layer, doesn't care about images, which must // code only checks if it is a full range layer, doesn't care about images, which must
// mean we should in practice never have full range images // mean we should in practice never have full range images
false false
) )
}
#[test]
fn replacing_missing_l0_is_notfound() {
    // Replacing a full-range (L0) delta layer that was never inserted must
    // report Replacement::NotFound, not an error.
    // original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should
    // however only happen for precondition failures.
    let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69";
    let layer = LayerFileName::from_str(layer).unwrap();
    let layer = LayerDescriptor::from(layer);

    // same skeleton construction; see scenario below
    let not_found = Arc::new(layer.clone());
    let new_version = Arc::new(layer);

    // empty map: the layer to replace is deliberately absent
    let mut map = LayerMap::default();

    let res = map.batch_update().replace_historic(
        not_found.get_persistent_layer_desc(),
        &not_found,
        new_version.get_persistent_layer_desc(),
        new_version,
    );

    assert!(matches!(res, Ok(Replacement::NotFound)), "{res:?}");
}
fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) {
@@ -906,46 +730,31 @@ mod tests {
// two disjoint Arcs in different lifecycle phases. even if it seems they must be the // two disjoint Arcs in different lifecycle phases. even if it seems they must be the
// same layer, we use LayerMap::compare_arced_layers as the identity of layers. // same layer, we use LayerMap::compare_arced_layers as the identity of layers.
assert!(!LayerMap::compare_arced_layers(&remote, &downloaded)); assert_eq!(remote.layer_desc(), downloaded.layer_desc());
let expected_in_counts = (1, usize::from(expected_l0)); let expected_in_counts = (1, usize::from(expected_l0));
map.batch_update() map.batch_update()
.insert_historic(remote.get_persistent_layer_desc(), remote.clone()); .insert_historic(remote.layer_desc().clone());
assert_eq!(count_layer_in(&map, &remote), expected_in_counts); assert_eq!(
count_layer_in(&map, remote.layer_desc()),
let replaced = map expected_in_counts
.batch_update()
.replace_historic(
remote.get_persistent_layer_desc(),
&remote,
downloaded.get_persistent_layer_desc(),
downloaded.clone(),
)
.expect("name derived attributes are the same");
assert!(
matches!(replaced, Replacement::Replaced { .. }),
"{replaced:?}"
); );
assert_eq!(count_layer_in(&map, &downloaded), expected_in_counts);
map.batch_update() map.batch_update()
.remove_historic(downloaded.get_persistent_layer_desc(), downloaded.clone()); .remove_historic(downloaded.layer_desc().clone());
assert_eq!(count_layer_in(&map, &downloaded), (0, 0)); assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0));
} }
fn count_layer_in<L: Layer + ?Sized>(map: &LayerMap<L>, layer: &Arc<L>) -> (usize, usize) { fn count_layer_in(map: &LayerMap, layer: &PersistentLayerDesc) -> (usize, usize) {
let historic = map let historic = map
.iter_historic_layers() .iter_historic_layers()
.filter(|x| LayerMap::compare_arced_layers(x, layer)) .filter(|x| x.key() == layer.key())
.count(); .count();
let l0s = map let l0s = map
.get_level0_deltas() .get_level0_deltas()
.expect("why does this return a result"); .expect("why does this return a result");
let l0 = l0s let l0 = l0s.iter().filter(|x| x.key() == layer.key()).count();
.iter()
.filter(|x| LayerMap::compare_arced_layers(x, layer))
.count();
(historic, l0) (historic, l0)
} }

View File

@@ -3,6 +3,8 @@ use std::ops::Range;
use tracing::info; use tracing::info;
use crate::tenant::storage_layer::PersistentLayerDesc;
use super::layer_coverage::LayerCoverageTuple; use super::layer_coverage::LayerCoverageTuple;
/// Layers in this module are identified and indexed by this data. /// Layers in this module are identified and indexed by this data.
@@ -53,11 +55,24 @@ impl<'a, L: crate::tenant::storage_layer::Layer + ?Sized> From<&'a L> for LayerK
} }
} }
impl From<&PersistentLayerDesc> for LayerKey {
fn from(layer: &PersistentLayerDesc) -> Self {
let kr = layer.get_key_range();
let lr = layer.get_lsn_range();
LayerKey {
key: kr.start.to_i128()..kr.end.to_i128(),
lsn: lr.start.0..lr.end.0,
is_image: !layer.is_incremental(),
}
}
}
/// Efficiently queryable layer coverage for each LSN. /// Efficiently queryable layer coverage for each LSN.
/// ///
/// Allows answering layer map queries very efficiently, /// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is /// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage. /// sometimes necessary. See BufferedHistoricLayerCoverage.
#[derive(Clone)]
pub struct HistoricLayerCoverage<Value> { pub struct HistoricLayerCoverage<Value> {
/// The latest state /// The latest state
head: LayerCoverageTuple<Value>, head: LayerCoverageTuple<Value>,
@@ -411,6 +426,7 @@ fn test_persistent_overlapping() {
/// ///
/// See this for more on persistent and retroactive techniques: /// See this for more on persistent and retroactive techniques:
/// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s /// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s
#[derive(Clone)]
pub struct BufferedHistoricLayerCoverage<Value> { pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update /// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>, historic_coverage: HistoricLayerCoverage<Value>,
@@ -467,6 +483,11 @@ impl<Value: Clone> BufferedHistoricLayerCoverage<Value> {
/// ///
/// Returns a `Replacement` value describing the outcome; only the case of /// Returns a `Replacement` value describing the outcome; only the case of
/// `Replacement::Replaced` modifies the map and requires a rebuild. /// `Replacement::Replaced` modifies the map and requires a rebuild.
///
/// This function is unlikely to be used in the future because LayerMap now only records the
/// layer descriptors. Therefore, anything added to the layer map will only be removed or
/// added, and never replaced.
#[allow(dead_code)]
pub fn replace<F>( pub fn replace<F>(
&mut self, &mut self,
layer_key: &LayerKey, layer_key: &LayerKey,

View File

@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
/// ///
/// NOTE The struct is parameterized over Value for easier /// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer. /// testing, but in practice it's some sort of layer.
#[derive(Clone)]
pub struct LayerCoverage<Value> { pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space) /// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value). /// we store (lsn.end, value).
@@ -139,6 +140,7 @@ impl<Value: Clone> LayerCoverage<Value> {
} }
/// Image and delta coverage at a specific LSN. /// Image and delta coverage at a specific LSN.
#[derive(Clone)]
pub struct LayerCoverageTuple<Value> { pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>, pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>, pub delta_coverage: LayerCoverage<Value>,

View File

@@ -0,0 +1,146 @@
//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state.
//!
//! A common usage pattern is as follows:
//!
//! ```ignore
//! async fn compaction(&self) {
//! // Get the current state.
//! let state = self.layer_map_mgr.read();
//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may
//! // take a long time.
//! let compaction_result = self.do_compaction(&state).await?;
//! // Update the state.
//! self.layer_map_mgr.update(|mut state| async move {
//! // do updates to the state, return it.
//! Ok(state)
//! }).await?;
//! }
//! ```
use anyhow::Result;
use arc_swap::ArcSwap;
use futures::Future;
use std::sync::Arc;
use super::layer_map::LayerMap;
/// Manages the storage state. Provide utility functions to modify the layer map and get an immutable reference to the
/// layer map.
pub struct LayerMapMgr {
    // The current immutable snapshot. Readers load it lock-free; writers
    // publish a whole new map by swapping the Arc.
    layer_map: ArcSwap<LayerMap>,
    // Serializes writers: the copy-modify-swap sequence in `update` must not
    // interleave with another update, or one of them would be lost.
    state_lock: tokio::sync::Mutex<()>,
}
impl LayerMapMgr {
/// Get the current state of the layer map.
pub fn read(&self) -> Arc<LayerMap> {
// TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but read path usually involves
// disk reads and layer mapping fetching, and therefore it's not a big deal to use a more optimized version
// here.
self.layer_map.load_full()
}
/// Clone the layer map for modification.
fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap {
(**self.layer_map.load()).clone()
}
pub fn new(layer_map: LayerMap) -> Self {
Self {
layer_map: ArcSwap::new(Arc::new(layer_map)),
state_lock: tokio::sync::Mutex::new(()),
}
}
/// Update the layer map.
pub async fn update<O, F>(&self, operation: O) -> Result<()>
where
O: FnOnce(LayerMap) -> F,
F: Future<Output = Result<LayerMap>>,
{
let state_lock = self.state_lock.lock().await;
let state = self.clone_for_write(&state_lock);
let new_state = operation(state).await?;
self.layer_map.store(Arc::new(new_state));
Ok(())
}
}
#[cfg(test)]
mod tests {
    use utils::{
        id::{TenantId, TimelineId},
        lsn::Lsn,
    };

    use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc};

    use super::*;

    /// Exercises the copy-on-write snapshot semantics of `LayerMapMgr`: a
    /// reference obtained before an update must keep seeing the old state,
    /// while a reference obtained afterwards sees the new state.
    #[tokio::test]
    async fn test_layer_map_manage() -> Result<()> {
        let mgr = LayerMapMgr::new(Default::default());
        // First update: insert an image layer covering key [0, 1).
        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(0)..Key::from_i128(1),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;
        // Snapshot taken between the two updates.
        let ref_1 = mgr.read();
        // Second update: insert another image layer covering key [1, 2).
        mgr.update(|mut map| async move {
            let mut updates = map.batch_update();
            updates.insert_historic(PersistentLayerDesc::new_img(
                TenantId::generate(),
                TimelineId::generate(),
                Key::from_i128(1)..Key::from_i128(2),
                Lsn(0),
                false,
                0,
            ));
            updates.flush();
            Ok(map)
        })
        .await?;
        // Snapshot taken after both updates.
        let ref_2 = mgr.read();
        // Modification should not be visible to the old reference.
        assert_eq!(
            ref_1
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none());
        // Modification should be visible to the new reference.
        assert_eq!(
            ref_2
                .search(Key::from_i128(0), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(0)..Key::from_i128(1)
        );
        assert_eq!(
            ref_2
                .search(Key::from_i128(1), Lsn(1))
                .unwrap()
                .layer
                .key_range,
            Key::from_i128(1)..Key::from_i128(2)
        );
        Ok(())
    }
}

View File

@@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer; pub use remote_layer::RemoteLayer;
use super::layer_map::BatchedUpdates;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where where
T: PartialOrd<T>, T: PartialOrd<T>,
@@ -176,19 +174,9 @@ impl LayerAccessStats {
/// Create an empty stats object and record a [`LayerLoad`] event with the given residence status. /// Create an empty stats object and record a [`LayerLoad`] event with the given residence status.
/// ///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock. /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn for_loading_layer<L>( pub(crate) fn for_loading_layer(status: LayerResidenceStatus) -> Self {
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus,
) -> Self
where
L: ?Sized + Layer,
{
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
new.record_residence_event( new.record_residence_event(status, LayerResidenceEventReason::LayerLoad);
layer_map_lock_held_witness,
status,
LayerResidenceEventReason::LayerLoad,
);
new new
} }
@@ -197,24 +185,16 @@ impl LayerAccessStats {
/// The `new_status` is not recorded in `self`. /// The `new_status` is not recorded in `self`.
/// ///
/// See [`record_residence_event`] for why you need to do this while holding the layer map lock. /// See [`record_residence_event`] for why you need to do this while holding the layer map lock.
pub(crate) fn clone_for_residence_change<L>( pub(crate) fn clone_for_residence_change(
&self, &self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
new_status: LayerResidenceStatus, new_status: LayerResidenceStatus,
) -> LayerAccessStats ) -> LayerAccessStats {
where
L: ?Sized + Layer,
{
let clone = { let clone = {
let inner = self.0.lock().unwrap(); let inner = self.0.lock().unwrap();
inner.clone() inner.clone()
}; };
let new = LayerAccessStats(Mutex::new(clone)); let new = LayerAccessStats(Mutex::new(clone));
new.record_residence_event( new.record_residence_event(new_status, LayerResidenceEventReason::ResidenceChange);
layer_map_lock_held_witness,
new_status,
LayerResidenceEventReason::ResidenceChange,
);
new new
} }
@@ -232,14 +212,11 @@ impl LayerAccessStats {
/// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock. /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
/// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event. /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
/// ///
pub(crate) fn record_residence_event<L>( pub(crate) fn record_residence_event(
&self, &self,
_layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
status: LayerResidenceStatus, status: LayerResidenceStatus,
reason: LayerResidenceEventReason, reason: LayerResidenceEventReason,
) where ) {
L: ?Sized + Layer,
{
let mut locked = self.0.lock().unwrap(); let mut locked = self.0.lock().unwrap();
locked.iter_mut().for_each(|inner| { locked.iter_mut().for_each(|inner| {
inner inner
@@ -473,94 +450,125 @@ pub fn downcast_remote_layer(
} }
} }
/// Holds metadata about a layer without any content. Used mostly for testing. pub mod tests {
/// use super::*;
/// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
/// LayerDescriptor.
#[derive(Clone, Debug)]
pub struct LayerDescriptor {
pub key: Range<Key>,
pub lsn: Range<Lsn>,
pub is_incremental: bool,
pub short_id: String,
}
impl LayerDescriptor { /// Holds metadata about a layer without any content. Used mostly for testing.
/// `LayerDescriptor` is only used for testing purpose so it does not matter whether it is image / delta, ///
/// and the tenant / timeline id does not matter. /// To use filenames as fixtures, parse them as [`LayerFileName`] then convert from that to a
pub fn get_persistent_layer_desc(&self) -> PersistentLayerDesc { /// LayerDescriptor.
PersistentLayerDesc::new_delta( #[derive(Clone, Debug)]
TenantId::from_array([0; 16]), pub struct LayerDescriptor {
TimelineId::from_array([0; 16]), base: PersistentLayerDesc,
self.key.clone(),
self.lsn.clone(),
233,
)
}
}
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
} }
fn get_lsn_range(&self) -> Range<Lsn> { impl From<PersistentLayerDesc> for LayerDescriptor {
self.lsn.clone() fn from(base: PersistentLayerDesc) -> Self {
} Self { base }
fn is_incremental(&self) -> bool {
self.is_incremental
}
fn get_value_reconstruct_data(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
todo!("This method shouldn't be part of the Layer trait")
}
fn short_id(&self) -> String {
self.short_id.clone()
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
fn from(value: DeltaFileName) -> Self {
let short_id = value.to_string();
LayerDescriptor {
key: value.key_range,
lsn: value.lsn_range,
is_incremental: true,
short_id,
} }
} }
}
impl From<ImageFileName> for LayerDescriptor { impl Layer for LayerDescriptor {
fn from(value: ImageFileName) -> Self { fn get_value_reconstruct_data(
let short_id = value.to_string(); &self,
let lsn = value.lsn_as_range(); _key: Key,
LayerDescriptor { _lsn_range: Range<Lsn>,
key: value.key_range, _reconstruct_data: &mut ValueReconstructState,
lsn, _ctx: &RequestContext,
is_incremental: false, ) -> Result<ValueReconstructResult> {
short_id, todo!("This method shouldn't be part of the Layer trait")
}
fn dump(&self, _verbose: bool, _ctx: &RequestContext) -> Result<()> {
todo!()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_key_range(&self) -> Range<Key> {
self.layer_desc().key_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn get_lsn_range(&self) -> Range<Lsn> {
self.layer_desc().lsn_range.clone()
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn is_incremental(&self) -> bool {
self.layer_desc().is_incremental
}
/// Boilerplate to implement the Layer trait, always use layer_desc for persistent layers.
fn short_id(&self) -> String {
self.layer_desc().short_id()
} }
} }
}
impl From<LayerFileName> for LayerDescriptor { impl PersistentLayer for LayerDescriptor {
fn from(value: LayerFileName) -> Self { fn layer_desc(&self) -> &PersistentLayerDesc {
match value { &self.base
LayerFileName::Delta(d) => Self::from(d), }
LayerFileName::Image(i) => Self::from(i),
fn local_path(&self) -> Option<PathBuf> {
unimplemented!()
}
fn iter(&self, _: &RequestContext) -> Result<LayerIter<'_>> {
unimplemented!()
}
fn key_iter(&self, _: &RequestContext) -> Result<LayerKeyIter<'_>> {
unimplemented!()
}
fn delete_resident_layer_file(&self) -> Result<()> {
unimplemented!()
}
fn info(&self, _: LayerAccessStatsReset) -> HistoricLayerInfo {
unimplemented!()
}
fn access_stats(&self) -> &LayerAccessStats {
unimplemented!()
}
}
impl From<DeltaFileName> for LayerDescriptor {
    /// Build a test descriptor from a delta file name. The tenant/timeline ids
    /// and file size are dummies: tests only care about key and LSN ranges.
    fn from(name: DeltaFileName) -> Self {
        let base = PersistentLayerDesc::new_delta(
            TenantId::from_array([0; 16]),
            TimelineId::from_array([0; 16]),
            name.key_range,
            name.lsn_range,
            233,
        );
        Self { base }
    }
}
impl From<ImageFileName> for LayerDescriptor {
    /// Build a test descriptor from an image file name. The tenant/timeline
    /// ids and file size are dummies: tests only care about key range and LSN.
    fn from(name: ImageFileName) -> Self {
        let base = PersistentLayerDesc::new_img(
            TenantId::from_array([0; 16]),
            TimelineId::from_array([0; 16]),
            name.key_range,
            name.lsn,
            false,
            233,
        );
        Self { base }
    }
}
impl From<LayerFileName> for LayerDescriptor {
fn from(value: LayerFileName) -> Self {
match value {
LayerFileName::Delta(d) => Self::from(d),
LayerFileName::Image(i) => Self::from(i),
}
} }
} }
} }

View File

@@ -4,7 +4,6 @@
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::context::RequestContext; use crate::context::RequestContext;
use crate::repository::Key; use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use anyhow::{bail, Result}; use anyhow::{bail, Result};
@@ -218,15 +217,11 @@ impl RemoteLayer {
} }
/// Create a Layer struct representing this layer, after it has been downloaded. /// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer<L>( pub fn create_downloaded_layer(
&self, &self,
layer_map_lock_held_witness: &BatchedUpdates<'_, L>,
conf: &'static PageServerConf, conf: &'static PageServerConf,
file_size: u64, file_size: u64,
) -> Arc<dyn PersistentLayer> ) -> Arc<dyn PersistentLayer> {
where
L: ?Sized + Layer,
{
if self.desc.is_delta { if self.desc.is_delta {
let fname = self.desc.delta_file_name(); let fname = self.desc.delta_file_name();
Arc::new(DeltaLayer::new( Arc::new(DeltaLayer::new(
@@ -235,10 +230,8 @@ impl RemoteLayer {
self.desc.tenant_id, self.desc.tenant_id,
&fname, &fname,
file_size, file_size,
self.access_stats.clone_for_residence_change( self.access_stats
layer_map_lock_held_witness, .clone_for_residence_change(LayerResidenceStatus::Resident),
LayerResidenceStatus::Resident,
),
)) ))
} else { } else {
let fname = self.desc.image_file_name(); let fname = self.desc.image_file_name();
@@ -248,10 +241,8 @@ impl RemoteLayer {
self.desc.tenant_id, self.desc.tenant_id,
&fname, &fname,
file_size, file_size,
self.access_stats.clone_for_residence_change( self.access_stats
layer_map_lock_held_witness, .clone_for_residence_change(LayerResidenceStatus::Resident),
LayerResidenceStatus::Resident,
),
)) ))
} }
} }

File diff suppressed because it is too large Load Diff

View File

@@ -197,9 +197,11 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction. // We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this. // So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = { let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().await; let guard = self.lcache.layer_in_use_read().await;
let layers = self.layer_mgr.read();
let mut candidates = Vec::new(); let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() { for hist_layer in layers.iter_historic_layers() {
let hist_layer = self.lcache.get_from_desc(&hist_layer);
if hist_layer.is_remote_layer() { if hist_layer.is_remote_layer() {
continue; continue;
} }