diff --git a/Cargo.lock b/Cargo.lock index 6856b9e3ac..1169f45467 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "backtrace", ] +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "archery" version = "0.5.0" @@ -2542,6 +2548,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "arc-swap", "async-stream", "async-trait", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index dc34705f8d..466e9e7a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ license = "Apache-2.0" ## All dependency versions, used in the project [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } +arc-swap = "1.6" async-stream = "0.3" async-trait = "0.1" atty = "0.2.14" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index ea81544cbe..ce510b4e5b 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -12,6 +12,7 @@ testing = ["fail/failpoints"] [dependencies] anyhow.workspace = true +arc-swap.workspace = true async-stream.workspace = true async-trait.workspace = true byteorder.workspace = true diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8effb1a238..6454400c72 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -87,6 +87,7 @@ pub mod disk_btree; pub(crate) mod ephemeral_file; pub mod layer_map; pub mod manifest; +pub mod layer_map_mgr; pub mod metadata; mod par_fsync; diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 893e4d52a3..4fb459c07c 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -66,7 +66,7 @@ use super::storage_layer::PersistentLayerDesc; /// /// LayerMap tracks what layers exist on a timeline. 
/// -#[derive(Default)] +#[derive(Default, Clone)] pub struct LayerMap { // // 'open_layer' holds the current InMemoryLayer that is accepting new diff --git a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs index ad463b4f23..e68d457e91 100644 --- a/pageserver/src/tenant/layer_map/historic_layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/historic_layer_coverage.rs @@ -72,6 +72,7 @@ impl From<&PersistentLayerDesc> for LayerKey { /// Allows answering layer map queries very efficiently, /// but doesn't allow retroactive insertion, which is /// sometimes necessary. See BufferedHistoricLayerCoverage. +#[derive(Clone)] pub struct HistoricLayerCoverage<Value> { /// The latest state head: LayerCoverageTuple<Value>, @@ -425,6 +426,7 @@ fn test_persistent_overlapping() { /// /// See this for more on persistent and retroactive techniques: /// https://www.youtube.com/watch?v=WqCWghETNDc&t=581s +#[derive(Clone)] pub struct BufferedHistoricLayerCoverage<Value> { /// A persistent layer map that we rebuild when we need to retroactively update historic_coverage: HistoricLayerCoverage<Value>, diff --git a/pageserver/src/tenant/layer_map/layer_coverage.rs b/pageserver/src/tenant/layer_map/layer_coverage.rs index 47aace97a5..5dc2f8e5cc 100644 --- a/pageserver/src/tenant/layer_map/layer_coverage.rs +++ b/pageserver/src/tenant/layer_map/layer_coverage.rs @@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync; /// /// NOTE The struct is parameterized over Value for easier /// testing, but in practice it's some sort of layer. +#[derive(Clone)] pub struct LayerCoverage<Value> { /// For every change in coverage (as we sweep the key space) /// we store (lsn.end, value). @@ -139,6 +140,7 @@ impl LayerCoverage { } /// Image and delta coverage at a specific LSN. 
+#[derive(Clone)] pub struct LayerCoverageTuple<Value> { pub image_coverage: LayerCoverage<Value>, pub delta_coverage: LayerCoverage<Value>, diff --git a/pageserver/src/tenant/layer_map_mgr.rs b/pageserver/src/tenant/layer_map_mgr.rs new file mode 100644 index 0000000000..1e165c55ab --- /dev/null +++ b/pageserver/src/tenant/layer_map_mgr.rs @@ -0,0 +1,146 @@ +//! This module implements `LayerMapMgr`, which manages a layer map object and provides lock-free access to the state. +//! +//! A common usage pattern is as follows: +//! +//! ```ignore +//! async fn compaction(&self) { +//! // Get the current state. +//! let state = self.layer_map_mgr.read(); +//! // No lock held at this point. Do compaction based on the state. This part usually incurs I/O operations and may +//! // take a long time. +//! let compaction_result = self.do_compaction(&state).await?; +//! // Update the state. +//! self.layer_map_mgr.update(|mut state| async move { +//! // do updates to the state, return it. +//! Ok(state) +//! }).await?; +//! } +//! ``` +use anyhow::Result; +use arc_swap::ArcSwap; +use futures::Future; +use std::sync::Arc; + +use super::layer_map::LayerMap; + +/// Manages the storage state. Provides utility functions to modify the layer map and get an immutable reference to the +/// layer map. +pub struct LayerMapMgr { + layer_map: ArcSwap<LayerMap>, + state_lock: tokio::sync::Mutex<()>, +} + +impl LayerMapMgr { + /// Get the current state of the layer map. + pub fn read(&self) -> Arc<LayerMap> { + // TODO: it is possible to use `load` to reduce the overhead of cloning the Arc, but read path usually involves + // disk reads and layer mapping fetching, and therefore it's not a big deal to skip the more optimized version + // here. + self.layer_map.load_full() + } + + /// Clone the layer map for modification. 
+ fn clone_for_write(&self, _state_lock_witness: &tokio::sync::MutexGuard<'_, ()>) -> LayerMap { + (**self.layer_map.load()).clone() + } + + pub fn new(layer_map: LayerMap) -> Self { + Self { + layer_map: ArcSwap::new(Arc::new(layer_map)), + state_lock: tokio::sync::Mutex::new(()), + } + } + + /// Update the layer map: clone it while holding `state_lock`, apply `operation`, then publish the result. + pub async fn update<O, F>(&self, operation: O) -> Result<()> + where + O: FnOnce(LayerMap) -> F, + F: Future<Output = Result<LayerMap>>, + { + let state_lock = self.state_lock.lock().await; + let state = self.clone_for_write(&state_lock); + let new_state = operation(state).await?; + self.layer_map.store(Arc::new(new_state)); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use utils::{ + id::{TenantId, TimelineId}, + lsn::Lsn, + }; + + use crate::{repository::Key, tenant::storage_layer::PersistentLayerDesc}; + + use super::*; + + #[tokio::test] + async fn test_layer_map_manage() -> Result<()> { + let mgr = LayerMapMgr::new(Default::default()); + mgr.update(|mut map| async move { + let mut updates = map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_img( + TenantId::generate(), + TimelineId::generate(), + Key::from_i128(0)..Key::from_i128(1), + Lsn(0), + false, + 0, + )); + updates.flush(); + Ok(map) + }) + .await?; + + let ref_1 = mgr.read(); + + mgr.update(|mut map| async move { + let mut updates = map.batch_update(); + updates.insert_historic(PersistentLayerDesc::new_img( + TenantId::generate(), + TimelineId::generate(), + Key::from_i128(1)..Key::from_i128(2), + Lsn(0), + false, + 0, + )); + updates.flush(); + Ok(map) + }) + .await?; + + let ref_2 = mgr.read(); + + // Modification should not be visible to the old reference. + assert_eq!( + ref_1 + .search(Key::from_i128(0), Lsn(1)) + .unwrap() + .layer + .key_range, + Key::from_i128(0)..Key::from_i128(1) + ); + assert!(ref_1.search(Key::from_i128(1), Lsn(1)).is_none()); + + // Modification should be visible to the new reference. 
+ assert_eq!( + ref_2 + .search(Key::from_i128(0), Lsn(1)) + .unwrap() + .layer + .key_range, + Key::from_i128(0)..Key::from_i128(1) + ); + assert_eq!( + ref_2 + .search(Key::from_i128(1), Lsn(1)) + .unwrap() + .layer + .key_range, + Key::from_i128(1)..Key::from_i128(2) + ); + Ok(()) + } +}