switch to immutable layer map implementation

Signed-off-by: Alex Chi Z <chi@neon.tech>
Alex Chi Z
2023-07-20 16:35:26 -04:00
parent 3fc3666df7
commit 91c7bbb230
10 changed files with 393 additions and 177 deletions
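
The heart of the change: the layer map becomes an immutable value published through `arc_swap::ArcSwap<LayerMap>`. Readers load a consistent snapshot without taking a lock, and writers clone the current map, apply their changes, and atomically publish the new version. A minimal sketch of that copy-on-write pattern, using a stand-in `Map` type (the name and fields are illustrative, not the pageserver's API):

use std::sync::Arc;
use arc_swap::ArcSwap;

// Stand-in for LayerMap: any cheaply clonable value works.
#[derive(Clone, Default, Debug)]
struct Map {
    layers: Vec<&'static str>,
}

fn main() {
    let published = ArcSwap::from(Arc::new(Map::default()));

    // A reader takes a snapshot; it stays valid no matter what writers do later.
    let reader_view: Arc<Map> = published.load_full();

    // A writer clones the current map, modifies the clone, and publishes it.
    let mut next = Map::clone(&published.load_full());
    next.layers.push("000000000000-000000000010__delta");
    published.store(Arc::new(next));

    println!("reader sees {:?}, latest is {:?}", reader_view.layers, published.load_full().layers);
}

The `pseudo_lock` RwLock and the `DashMap`-backed `LayerFileManager` that appear in the diffs below preserve the old guard-based call sites and track the layer files themselves.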

Cargo.lock (generated)

@@ -110,6 +110,12 @@ dependencies = [
"backtrace",
]
[[package]]
name = "arc-swap"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6"
[[package]]
name = "archery"
version = "0.5.0"
@@ -1185,15 +1191,15 @@ dependencies = [
[[package]]
name = "dashmap"
version = "5.4.0"
version = "5.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
checksum = "6943ae99c34386c84a470c499d3414f66502a41340aa895406e0d2e4a207b91d"
dependencies = [
"cfg-if",
"hashbrown 0.12.3",
"hashbrown 0.14.0",
"lock_api",
"once_cell",
"parking_lot_core 0.9.7",
"parking_lot_core 0.9.8",
]
[[package]]
@@ -1642,6 +1648,12 @@ dependencies = [
"ahash",
]
[[package]]
name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
[[package]]
name = "hashlink"
version = "0.8.2"
@@ -2057,9 +2069,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lock_api"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
dependencies = [
"autocfg",
"scopeguard",
@@ -2323,9 +2335,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.17.1"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "oorandom"
@@ -2515,6 +2527,7 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"arc-swap",
"async-compression",
"async-stream",
"async-trait",
@@ -2528,6 +2541,7 @@ dependencies = [
"crc32c",
"criterion",
"crossbeam-utils",
"dashmap",
"either",
"enum-map",
"enumset",
@@ -2622,7 +2636,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.7",
"parking_lot_core 0.9.8",
]
[[package]]
@@ -2641,15 +2655,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.7"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.2.16",
"redox_syscall 0.3.5",
"smallvec",
"windows-sys 0.45.0",
"windows-targets 0.48.0",
]
[[package]]


@@ -32,6 +32,7 @@ license = "Apache-2.0"
## All dependency versions, used in the project
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip"] }
flate2 = "1.0.26"
async-stream = "0.3"
@@ -54,6 +55,7 @@ comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-utils = "0.8.5"
dashmap = "5.5"
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"


@@ -12,6 +12,7 @@ testing = ["fail/failpoints"]
[dependencies]
anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
@@ -24,6 +25,7 @@ const_format.workspace = true
consumption_metrics.workspace = true
crc32c.workspace = true
crossbeam-utils.workspace = true
dashmap.workspace = true
either.workspace = true
flate2.workspace = true
fail.workspace = true


@@ -65,7 +65,7 @@ use super::storage_layer::PersistentLayerDesc;
///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
#[derive(Default, Clone)]
pub struct LayerMap {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new

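`LayerMap` now derives `Clone` because every write publishes a new map built from the previous one. The clone is cheap: the coverage structures inside are persistent `rpds` trees that share nodes on clone. A standard-library-only sketch of the same cheap-clone property, with illustrative field names:

use std::collections::BTreeMap;
use std::sync::Arc;

// Illustrative only: the real LayerMap holds persistent rpds trees, which share
// structure on clone. Arc-wrapped fields give the same cheap-clone property.
#[derive(Default, Clone)]
struct CoverageSketch {
    image_coverage: Arc<BTreeMap<u64, &'static str>>,
    delta_coverage: Arc<BTreeMap<u64, &'static str>>,
}

fn main() {
    let mut a = CoverageSketch::default();
    Arc::make_mut(&mut a.image_coverage).insert(0x10, "image layer up to 0x10");
    let b = a.clone(); // cheap: only reference counts are bumped
    assert!(Arc::ptr_eq(&a.image_coverage, &b.image_coverage));
    println!("clone shares {} image entries", b.image_coverage.len());
}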

@@ -60,6 +60,7 @@ impl From<&PersistentLayerDesc> for LayerKey {
/// Allows answering layer map queries very efficiently,
/// but doesn't allow retroactive insertion, which is
/// sometimes necessary. See BufferedHistoricLayerCoverage.
#[derive(Clone)]
pub struct HistoricLayerCoverage<Value> {
/// The latest state
head: LayerCoverageTuple<Value>,
@@ -412,6 +413,7 @@ fn test_persistent_overlapping() {
///
/// See this for more on persistent and retroactive techniques:
/// <https://www.youtube.com/watch?v=WqCWghETNDc&t=581s>
#[derive(Clone)]
pub struct BufferedHistoricLayerCoverage<Value> {
/// A persistent layer map that we rebuild when we need to retroactively update
historic_coverage: HistoricLayerCoverage<Value>,

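`HistoricLayerCoverage` and `BufferedHistoricLayerCoverage` get `#[derive(Clone)]` for the same reason: they are persistent structures, so producing a new version leaves older versions intact and shares their nodes rather than copying them. A toy persistent list, only to illustrate that sharing property (it is not the actual coverage type):

use std::sync::Arc;

// A toy persistent list: "pushing" builds a new version that shares the old
// version's nodes, so old versions stay valid and cloning is cheap.
enum List {
    Nil,
    Cons(u64, Arc<List>),
}

fn push(list: &Arc<List>, lsn: u64) -> Arc<List> {
    Arc::new(List::Cons(lsn, Arc::clone(list)))
}

fn len(list: &List) -> usize {
    match list {
        List::Nil => 0,
        List::Cons(_, rest) => 1 + len(rest),
    }
}

fn main() {
    let v1 = Arc::new(List::Nil);
    let v2 = push(&push(&v1, 0x10), 0x20);
    // v1 is unchanged; v2 reuses v1's nodes instead of copying them.
    assert_eq!(len(&v1), 0);
    assert_eq!(len(&v2), 2);
    println!("v1 has {} entries, v2 has {}", len(&v1), len(&v2));
}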

@@ -15,6 +15,7 @@ use rpds::RedBlackTreeMapSync;
///
/// NOTE The struct is parameterized over Value for easier
/// testing, but in practice it's some sort of layer.
#[derive(Clone)]
pub struct LayerCoverage<Value> {
/// For every change in coverage (as we sweep the key space)
/// we store (lsn.end, value).
@@ -138,6 +139,7 @@ impl<Value: Clone> LayerCoverage<Value> {
}
/// Image and delta coverage at a specific LSN.
#[derive(Clone)]
pub struct LayerCoverageTuple<Value> {
pub image_coverage: LayerCoverage<Value>,
pub delta_coverage: LayerCoverage<Value>,


@@ -41,7 +41,7 @@ pub use inmemory_layer::InMemoryLayer;
pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
pub use remote_layer::RemoteLayer;
use super::timeline::layer_manager::LayerManager;
use super::timeline::layer_manager::LayerManagerWriteGuard;
pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
where
@@ -176,7 +176,7 @@ impl LayerAccessStats {
/// [`LayerLoad`]: LayerResidenceEventReason::LayerLoad
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn for_loading_layer(
layer_map_lock_held_witness: &LayerManager,
layer_map_lock_held_witness: &LayerManagerWriteGuard,
status: LayerResidenceStatus,
) -> Self {
let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default()));
@@ -197,7 +197,7 @@ impl LayerAccessStats {
/// [`record_residence_event`]: Self::record_residence_event
pub(crate) fn clone_for_residence_change(
&self,
layer_map_lock_held_witness: &LayerManager,
layer_map_lock_held_witness: &LayerManagerWriteGuard,
new_status: LayerResidenceStatus,
) -> LayerAccessStats {
let clone = {
@@ -229,7 +229,7 @@ impl LayerAccessStats {
///
pub(crate) fn record_residence_event(
&self,
_layer_map_lock_held_witness: &LayerManager,
_layer_map_lock_held_witness: &LayerManagerWriteGuard,
status: LayerResidenceStatus,
reason: LayerResidenceEventReason,
) {

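The signature changes here keep the "lock-held witness" pattern: `record_residence_event` and friends take a reference to `LayerManagerWriteGuard` instead of `LayerManager`, so they can only be called while the caller actually holds the write guard. A minimal sketch of that idea with made-up types (`Stats` and `WriteGuard` are illustrative, not the pageserver's types):

// The API takes a reference to the guard type itself, so it only type-checks
// when the caller can prove it holds the lock.
struct WriteGuard<'a> {
    _inner: std::sync::MutexGuard<'a, Vec<String>>,
}

struct Stats {
    residence_events: std::sync::Mutex<Vec<&'static str>>,
}

impl Stats {
    // Compiles only if the caller can produce a &WriteGuard, i.e. holds the lock.
    fn record_residence_event(&self, _witness: &WriteGuard<'_>, event: &'static str) {
        self.residence_events.lock().unwrap().push(event);
    }
}

fn main() {
    let layers = std::sync::Mutex::new(Vec::<String>::new());
    let stats = Stats { residence_events: std::sync::Mutex::new(Vec::new()) };
    let guard = WriteGuard { _inner: layers.lock().unwrap() };
    stats.record_residence_event(&guard, "resident");
    println!("{} events recorded", stats.residence_events.lock().unwrap().len());
}

The same substitution is applied to `RemoteLayer::create_downloaded_layer` in the next file.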

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::layer_manager::LayerManager;
use crate::tenant::timeline::layer_manager::LayerManagerWriteGuard;
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -226,7 +226,7 @@ impl RemoteLayer {
/// Create a Layer struct representing this layer, after it has been downloaded.
pub fn create_downloaded_layer(
&self,
layer_map_lock_held_witness: &LayerManager,
layer_map_lock_held_witness: &LayerManagerWriteGuard,
conf: &'static PageServerConf,
file_size: u64,
) -> Arc<dyn PersistentLayer> {


@@ -81,7 +81,7 @@ use crate::{is_temporary, task_mgr};
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::layer_manager::{LayerManager, LayerManagerReadGuard, LayerManagerWriteGuard};
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
@@ -125,13 +125,13 @@ impl PartialOrd for Hole {
/// Temporary function for the immutable storage state refactor; ensures we are dropping a lock guard rather than something else.
/// Can be removed after all refactors are done.
fn drop_rlock<T>(rlock: tokio::sync::OwnedRwLockReadGuard<T>) {
fn drop_rlock(rlock: LayerManagerReadGuard) {
drop(rlock)
}
/// Temporary function for the immutable storage state refactor; ensures we are dropping a lock guard rather than something else.
/// Can be removed after all refactors are done.
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
fn drop_wlock(rlock: LayerManagerWriteGuard) {
drop(rlock)
}
pub struct Timeline {
@@ -162,7 +162,7 @@ pub struct Timeline {
///
/// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: Arc<tokio::sync::RwLock<LayerManager>>,
pub(crate) layers: Arc<LayerManager>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -1109,7 +1109,7 @@ impl Timeline {
&self,
_layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>,
local_layer: &Arc<dyn PersistentLayer>,
layer_mgr: &mut LayerManager,
layer_mgr: &mut LayerManagerWriteGuard,
) -> Result<(), EvictionError> {
if local_layer.is_remote_layer() {
return Err(EvictionError::CannotEvictRemoteLayer);
@@ -1349,7 +1349,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())),
layers: Arc::new(LayerManager::create()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -2561,13 +2561,15 @@ impl Timeline {
///
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self.layers.write().await;
let layer = guard.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)?;
let layer = guard
.get_layer_for_write(
lsn,
self.get_last_record_lsn(),
self.conf,
self.timeline_id,
self.tenant_id,
)
.await?;
Ok(layer)
}
@@ -2600,7 +2602,7 @@ impl Timeline {
Some(self.write_lock.lock().await)
};
let mut guard = self.layers.write().await;
guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at);
guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at).await;
}
/// Layer flusher task's main loop.
@@ -2772,7 +2774,7 @@ impl Timeline {
self.metrics.persistent_bytes_written.inc_by(sz);
}
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer).await;
// release lock on 'layers'
}
@@ -3151,7 +3153,7 @@ impl Timeline {
LayerResidenceEventReason::LayerCreate,
);
}
guard.track_new_image_layers(image_layers);
guard.track_new_image_layers(image_layers).await;
drop_wlock(guard);
timer.stop_and_record();
@@ -3315,7 +3317,7 @@ impl Timeline {
fn compact_level0_phase1(
self: Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
guard: tokio::sync::OwnedRwLockReadGuard<LayerManager>,
guard: LayerManagerReadGuard,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
ctx: &RequestContext,
@@ -3739,7 +3741,7 @@ impl Timeline {
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
let phase1_layers_locked = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
@@ -3837,12 +3839,14 @@ impl Timeline {
remove_layers.push(guard.get_from_desc(&ldesc));
}
guard.finish_compact_l0(
layer_removal_cs,
remove_layers,
insert_layers,
&self.metrics,
)?;
guard
.finish_compact_l0(
layer_removal_cs,
remove_layers,
insert_layers,
&self.metrics,
)
.await?;
drop_wlock(guard);
@@ -4175,7 +4179,9 @@ impl Timeline {
layer_names_to_delete.push(doomed_layer.filename());
result.layers_removed += 1;
}
let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?;
guard
.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)
.await?;
if result.layers_removed != 0 {
fail_point!("after-timeline-gc-removed-layers");
@@ -4184,8 +4190,6 @@ impl Timeline {
if let Some(remote_client) = &self.remote_client {
remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
}
apply.flush();
}
info!(

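On the `Timeline` side the field becomes a plain `Arc<LayerManager>`: readers call `self.layers.read().await` for a snapshot-style guard (dropped via the `drop_rlock`/`drop_wlock` helpers, which now take the concrete guard types), and the semantic mutation methods such as `try_freeze_in_memory_layer` and `track_new_image_layers` become `async` because they serialize on the manager's internal lock. A rough sketch of that call-site shape, with simplified stand-in types and tokio assumed:

use std::sync::Arc;
use tokio::sync::Mutex;

// Stand-in for the new LayerManager: the Timeline just holds an Arc to it and
// awaits the semantic methods, which serialize on an internal state lock.
struct LayerManagerSketch {
    state_lock: Mutex<()>,
    frozen: Mutex<Vec<&'static str>>, // stand-in for layer map contents
}

impl LayerManagerSketch {
    async fn try_freeze_in_memory_layer(&self, name: &'static str) {
        let _guard = self.state_lock.lock().await; // only one writer at a time
        self.frozen.lock().await.push(name);
    }
}

struct TimelineSketch {
    layers: Arc<LayerManagerSketch>, // was Arc<tokio::sync::RwLock<LayerManager>>
}

#[tokio::main]
async fn main() {
    let timeline = TimelineSketch {
        layers: Arc::new(LayerManagerSketch {
            state_lock: Mutex::new(()),
            frozen: Mutex::new(Vec::new()),
        }),
    };
    timeline.layers.try_freeze_in_memory_layer("inmem-000-010").await;
    println!("frozen: {:?}", *timeline.layers.frozen.lock().await);
}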

@@ -1,5 +1,7 @@
use anyhow::{bail, ensure, Context, Result};
use std::{collections::HashMap, sync::Arc};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use std::sync::Arc;
use tracing::trace;
use utils::{
id::{TenantId, TimelineId},
@@ -21,94 +23,229 @@ use crate::{
/// Provides semantic APIs to manipulate the layer map.
pub struct LayerManager {
layer_map: LayerMap,
layer_fmgr: LayerFileManager,
/// The layer map that tracks all layers in the timeline at the current time.
layer_map: ArcSwap<LayerMap>,
/// Ensure there is only one thread that can modify the layer map at a time.
state_lock: Arc<tokio::sync::Mutex<()>>,
/// Layer file manager that manages the layer files in the local / remote file system.
layer_fmgr: Arc<LayerFileManager>,
/// The lock to mock the original behavior of `RwLock<LayerMap>`. Can be removed if
/// #4509 is implemented.
pseudo_lock: Arc<tokio::sync::RwLock<()>>,
}
/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
/// scheduling deletes in remote client.
pub struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
struct LayerSnapshot {
/// The current snapshot of the layer map. Immutable.
layer_map: Arc<LayerMap>,
/// Reference to the file manager. This is mutable, and its contents might change while
/// the snapshot is held.
layer_fmgr: Arc<LayerFileManager>,
}
impl ApplyGcResultGuard<'_> {
pub fn flush(self) {
self.0.flush();
}
pub struct LayerManagerReadGuard {
snapshot: LayerSnapshot,
/// Mock the behavior of the layer map lock.
#[allow(dead_code)]
pseudo_lock: tokio::sync::OwnedRwLockReadGuard<()>,
}
pub struct LayerManagerWriteGuard {
snapshot: LayerSnapshot,
/// Semantic layer operations will need to modify the layer content.
layer_manager: Arc<LayerManager>,
/// Mock the behavior of the layer map lock.
#[allow(dead_code)]
pseudo_lock: tokio::sync::OwnedRwLockWriteGuard<()>,
}
impl LayerManager {
pub fn create() -> Self {
Self {
layer_map: LayerMap::default(),
layer_fmgr: LayerFileManager::new(),
layer_map: ArcSwap::from(Arc::new(LayerMap::default())),
state_lock: Arc::new(tokio::sync::Mutex::new(())),
layer_fmgr: Arc::new(LayerFileManager::new()),
pseudo_lock: Arc::new(tokio::sync::RwLock::new(())),
}
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
pub async fn read(&self) -> LayerManagerReadGuard {
LayerManagerReadGuard {
snapshot: LayerSnapshot {
layer_map: self.layer_map.load_full(),
layer_fmgr: Arc::clone(&self.layer_fmgr),
},
pseudo_lock: self.pseudo_lock.clone().read_owned().await,
}
}
pub async fn write(self: &Arc<Self>) -> LayerManagerWriteGuard {
LayerManagerWriteGuard {
snapshot: LayerSnapshot {
layer_map: self.layer_map.load_full(),
layer_fmgr: Arc::clone(&self.layer_fmgr),
},
pseudo_lock: self.pseudo_lock.clone().write_owned().await,
layer_manager: self.clone(),
}
}
pub fn try_write(
self: &Arc<Self>,
) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
self.pseudo_lock
.clone()
.try_write_owned()
.map(|pseudo_lock| LayerManagerWriteGuard {
snapshot: LayerSnapshot {
layer_map: self.layer_map.load_full(),
layer_fmgr: Arc::clone(&self.layer_fmgr),
},
pseudo_lock,
layer_manager: self.clone(),
})
}
/// Make an update to the layer map. This function will NOT take the state lock and should ONLY
/// be used when there is no concurrent access, i.e., while initializing the layer map. An error is
/// returned only if the update function fails.
fn initialize_update(&self, f: impl FnOnce(LayerMap) -> Result<LayerMap>) -> Result<()> {
let snapshot = self.layer_map.load_full();
let new_layer_map = f(LayerMap::clone(&*snapshot))?;
let old_layer_map = self.layer_map.swap(Arc::new(new_layer_map));
debug_assert_eq!(
Arc::as_ptr(&snapshot) as usize,
Arc::as_ptr(&old_layer_map) as usize,
"race detected when modifying layer map, use `update` instead of `initialize_update`."
);
Ok(())
}
/// Make an update to the layer map. An error is returned only if the update function fails.
async fn update<T>(&self, f: impl FnOnce(LayerMap) -> Result<(LayerMap, T)>) -> Result<T> {
let _guard = self.state_lock.lock().await;
let snapshot = self.layer_map.load_full();
let (new_layer_map, data) = f(LayerMap::clone(&*snapshot))?;
let old_layer_map = self.layer_map.swap(Arc::new(new_layer_map));
debug_assert_eq!(
Arc::as_ptr(&snapshot) as usize,
Arc::as_ptr(&old_layer_map) as usize,
"race detected when modifying layer map, please check if `initialize_update` is used on the `update` code path."
);
Ok(data)
}
}
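
The `update`/`initialize_update` pair above is the only place the published map is replaced. `update` serializes writers on `state_lock`, clones the snapshot, applies the closure, and swaps the result in; the pointer comparison is a debug-only check that nothing else published a map in between, which `initialize_update` (used only while loading a timeline, before there can be concurrency) deliberately skips. A minimal sketch of that contract, with stand-in names `MapSketch` and `ManagerSketch`:

use std::sync::Arc;
use anyhow::Result;
use arc_swap::ArcSwap;

#[derive(Clone, Default, Debug)]
struct MapSketch {
    next_open_layer_at: Option<u64>,
}

struct ManagerSketch {
    map: ArcSwap<MapSketch>,
    state_lock: tokio::sync::Mutex<()>,
}

impl ManagerSketch {
    // Writers serialize on the mutex, clone the published map, apply the closure,
    // and publish the new version; the assertion catches out-of-band publishers.
    async fn update<T>(&self, f: impl FnOnce(MapSketch) -> Result<(MapSketch, T)>) -> Result<T> {
        let _guard = self.state_lock.lock().await;
        let snapshot = self.map.load_full();
        let (new_map, data) = f(MapSketch::clone(&*snapshot))?;
        let old = self.map.swap(Arc::new(new_map));
        debug_assert!(Arc::ptr_eq(&snapshot, &old), "layer map changed under the state lock");
        Ok(data)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let mgr = ManagerSketch {
        map: ArcSwap::from(Arc::new(MapSketch::default())),
        state_lock: tokio::sync::Mutex::new(()),
    };
    mgr.update(|mut m| {
        m.next_open_layer_at = Some(0x10);
        Ok((m, ()))
    })
    .await?;
    println!("{:?}", mgr.map.load_full());
    Ok(())
}

The real method compares `Arc::as_ptr` values and has the closure return a fresh `LayerMap` plus a result value, but the contract is the same: every publisher except timeline initialization goes through `state_lock`.
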
impl LayerSnapshot {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.layer_fmgr.get_from_desc(desc)
}
/// Get an immutable reference to the layer map.
///
/// We expect users only to be able to get an immutable layer map. If users want to make modifications,
/// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
pub fn layer_map(&self) -> &LayerMap {
/// If a user needs to modify the layer map, they should get a write guard and use the semantic
/// functions.
fn layer_map(&self) -> &LayerMap {
&self.layer_map
}
/// Get a mutable reference to the layer map. This function will be removed once `flush_frozen_layer`
/// gets a refactor.
pub fn layer_map_mut(&mut self) -> &mut LayerMap {
&mut self.layer_map
fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
self.layer_fmgr.contains(layer)
}
}
impl LayerManagerReadGuard {
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.snapshot.get_from_desc(desc)
}
/// Get an immutable reference to the layer map.
pub fn layer_map(&self) -> &LayerMap {
self.snapshot.layer_map()
}
}
impl LayerManagerWriteGuard {
/// Check if the layer file manager contains a layer. This should ONLY be used in the compaction
/// code path where we need to check if a layer already exists on disk. With the immutable layer
/// map design, it is possible that the layer map snapshot does not contain the layer, but the layer file
/// manager does. Therefore, use this function with caution.
pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
self.snapshot.contains(layer)
}
pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc<dyn PersistentLayer> {
self.snapshot.get_from_desc(desc)
}
/// Get an immutable reference to the layer map.
pub fn layer_map(&self) -> &LayerMap {
self.snapshot.layer_map()
}
/// Replace layers in the layer file manager, used in evictions and layer downloads.
pub fn replace_and_verify(
pub(crate) fn replace_and_verify(
&mut self,
expected: Arc<dyn PersistentLayer>,
new: Arc<dyn PersistentLayer>,
) -> Result<()> {
self.layer_fmgr.replace_and_verify(expected, new)
self.snapshot.layer_fmgr.replace_and_verify(expected, new)
}
/// Called from `load_layer_map`. Initialize the layer manager with:
/// 1. all on-disk layers
/// 2. the next open layer (at the disk_consistent_lsn LSN)
pub fn initialize_local_layers(
pub(crate) fn initialize_local_layers(
&mut self,
on_disk_layers: Vec<Arc<dyn PersistentLayer>>,
next_open_layer_at: Lsn,
) {
let mut updates = self.layer_map.batch_update();
for layer in on_disk_layers {
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
self.layer_manager
.initialize_update(|mut layer_map| {
let mut updates = layer_map.batch_update();
for layer in on_disk_layers {
Self::insert_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr);
}
updates.flush();
layer_map.next_open_layer_at = Some(next_open_layer_at);
Ok(layer_map)
})
.unwrap();
}
/// Initialize when creating a new timeline, called in `init_empty_layer_map`.
pub fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
self.layer_map.next_open_layer_at = Some(next_open_layer_at);
pub(crate) fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
self.layer_manager
.initialize_update(|mut layer_map| {
layer_map.next_open_layer_at = Some(next_open_layer_at);
Ok(layer_map)
})
.unwrap();
}
pub fn initialize_remote_layers(
pub(crate) fn initialize_remote_layers(
&mut self,
corrupted_local_layers: Vec<Arc<dyn PersistentLayer>>,
remote_layers: Vec<Arc<RemoteLayer>>,
) {
let mut updates = self.layer_map.batch_update();
for layer in corrupted_local_layers {
Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
}
for layer in remote_layers {
Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
}
updates.flush();
self.layer_manager
.initialize_update(|mut layer_map| {
let mut updates = layer_map.batch_update();
for layer in corrupted_local_layers {
Self::remove_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr);
}
for layer in remote_layers {
Self::insert_historic_layer(layer, &mut updates, &self.snapshot.layer_fmgr);
}
updates.flush();
Ok(layer_map)
})
.unwrap();
}
/// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
/// called within `get_layer_for_write`.
pub fn get_layer_for_write(
/// called within `get_layer_for_write`. Only ONE thread can call this function, which is guaranteed by `write_lock`
/// in `Timeline`.
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
last_record_lsn: Lsn,
@@ -127,7 +264,7 @@ impl LayerManager {
);
// Do we have a layer open for writing already?
let layer = if let Some(open_layer) = &self.layer_map.open_layer {
let layer = if let Some(open_layer) = &self.snapshot.layer_map.open_layer {
if open_layer.get_lsn_range().start > lsn {
bail!(
"unexpected open layer in the future: open layers starts at {}, write lsn {}",
@@ -138,135 +275,188 @@ impl LayerManager {
Arc::clone(open_layer)
} else {
// No writeable layer yet. Create one.
let start_lsn = self
.layer_map
.next_open_layer_at
.context("No next open layer found")?;
self.layer_manager
.update(|mut layer_map| {
// No writeable layer yet. Create one.
let start_lsn = layer_map
.next_open_layer_at
.context("No next open layer found")?;
trace!(
"creating in-memory layer at {}/{} for record at {}",
timeline_id,
start_lsn,
lsn
);
trace!(
"creating in-memory layer at {}/{} for record at {}",
timeline_id,
start_lsn,
lsn
);
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
let layer = Arc::new(new_layer);
let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());
self.layer_map.next_open_layer_at = None;
layer_map.open_layer = Some(layer.clone());
layer_map.next_open_layer_at = None;
layer
Ok((layer_map, layer))
})
.await?
};
Ok(layer)
}
/// Called from `freeze_inmem_layer`, returns true if successfully frozen.
pub fn try_freeze_in_memory_layer(
pub(crate) async fn try_freeze_in_memory_layer(
&mut self,
Lsn(last_record_lsn): Lsn,
last_freeze_at: &AtomicLsn,
) {
let end_lsn = Lsn(last_record_lsn + 1);
if let Some(open_layer) = &self.layer_map.open_layer {
if let Some(open_layer) = &self.snapshot.layer_map.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
open_layer.freeze(end_lsn);
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
self.layer_map.frozen_layers.push_back(open_layer_rc);
self.layer_map.open_layer = None;
self.layer_map.next_open_layer_at = Some(end_lsn);
self.layer_manager
.update(|mut layer_map| {
layer_map.frozen_layers.push_back(open_layer_rc);
layer_map.open_layer = None;
layer_map.next_open_layer_at = Some(end_lsn);
Ok((layer_map, ()))
})
.await
.unwrap();
last_freeze_at.store(end_lsn);
}
}
/// Add image layers to the layer map, called from `create_image_layers`.
pub fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
let mut updates = self.layer_map.batch_update();
for layer in image_layers {
Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
pub(crate) async fn track_new_image_layers(&mut self, image_layers: Vec<ImageLayer>) {
self.layer_manager
.update(|mut layer_map| {
let mut updates: BatchedUpdates<'_> = layer_map.batch_update();
for layer in image_layers {
Self::insert_historic_layer(
Arc::new(layer),
&mut updates,
&self.snapshot.layer_fmgr,
);
}
updates.flush();
Ok((layer_map, ()))
})
.await
.unwrap();
}
/// Flush a frozen layer and add the written delta layer to the layer map.
pub fn finish_flush_l0_layer(
pub(crate) async fn finish_flush_l0_layer(
&mut self,
delta_layer: Option<DeltaLayer>,
frozen_layer_for_check: &Arc<InMemoryLayer>,
) {
let l = self.layer_map.frozen_layers.pop_front();
let mut updates = self.layer_map.batch_update();
self.layer_manager
.update(|mut layer_map| {
let l = layer_map.frozen_layers.pop_front();
let mut updates = layer_map.batch_update();
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(compare_arced_layers(&l.unwrap(), frozen_layer_for_check));
if let Some(delta_layer) = delta_layer {
Self::insert_historic_layer(Arc::new(delta_layer), &mut updates, &mut self.layer_fmgr);
}
updates.flush();
if let Some(delta_layer) = delta_layer {
Self::insert_historic_layer(
Arc::new(delta_layer),
&mut updates,
&self.snapshot.layer_fmgr,
);
}
updates.flush();
Ok((layer_map, ()))
})
.await
.unwrap();
}
/// Called when compaction is completed.
pub fn finish_compact_l0(
pub(crate) async fn finish_compact_l0(
&mut self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Arc<dyn PersistentLayer>>,
compact_to: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) -> Result<()> {
let mut updates = self.layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
}
for l in compact_from {
// NB: the layer file identified by descriptor `l` is guaranteed to be present
// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
// time, even though we dropped `Timeline::layers` inbetween.
Self::delete_historic_layer(
layer_removal_cs.clone(),
l,
&mut updates,
metrics,
&mut self.layer_fmgr,
)?;
}
updates.flush();
Ok(())
self.layer_manager
.update(|mut layer_map| {
let mut updates = layer_map.batch_update();
for l in compact_to {
Self::insert_historic_layer(l, &mut updates, &self.snapshot.layer_fmgr);
}
for l in compact_from {
// NB: the layer file identified by descriptor `l` is guaranteed to be present
// in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
// time, even though we dropped `Timeline::layers` inbetween.
if let Err(e) = Self::delete_historic_layer(
layer_removal_cs.clone(),
l,
&mut updates,
metrics,
&self.snapshot.layer_fmgr,
) {
// If this fails, we will need to return the "partially" modified layer map
// now. Eventually, we should decouple file deletion and layer map updates, so
// that this part can be moved out of the `update` section.
updates.flush();
return Ok((layer_map, Err(e)));
}
}
updates.flush();
Ok((layer_map, Ok(())))
})
.await
.unwrap() // unwrap the first level error, which is always Ok.
}
/// Called when garbage collecting the timeline. Returns a guard that will apply the updates to the layer map.
pub fn finish_gc_timeline(
pub(crate) async fn finish_gc_timeline(
&mut self,
layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
gc_layers: Vec<Arc<dyn PersistentLayer>>,
metrics: &TimelineMetrics,
) -> Result<ApplyGcResultGuard> {
let mut updates = self.layer_map.batch_update();
for doomed_layer in gc_layers {
Self::delete_historic_layer(
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
metrics,
&mut self.layer_fmgr,
)?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
}
Ok(ApplyGcResultGuard(updates))
) -> Result<()> {
self.layer_manager
.update(|mut layer_map| {
let mut updates = layer_map.batch_update();
for doomed_layer in gc_layers {
// TODO: decouple deletion and layer map modification
if let Err(e) = Self::delete_historic_layer(
layer_removal_cs.clone(),
doomed_layer,
&mut updates,
metrics,
&self.snapshot.layer_fmgr,
)
// FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
{
updates.flush();
return Ok((layer_map, Err(e)));
}
}
updates.flush();
Ok((layer_map, Ok(())))
})
.await
.unwrap() // unwrap first level error
}
/// Helper function to insert a layer into the layer map and file manager.
fn insert_historic_layer(
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager,
mapping: &LayerFileManager,
) {
updates.insert_historic(layer.layer_desc().clone());
mapping.insert(layer);
@@ -276,7 +466,7 @@ impl LayerManager {
fn remove_historic_layer(
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_>,
mapping: &mut LayerFileManager,
mapping: &LayerFileManager,
) {
updates.remove_historic(layer.layer_desc().clone());
mapping.remove(layer);
@@ -290,7 +480,7 @@ impl LayerManager {
layer: Arc<dyn PersistentLayer>,
updates: &mut BatchedUpdates<'_>,
metrics: &TimelineMetrics,
mapping: &mut LayerFileManager,
mapping: &LayerFileManager,
) -> anyhow::Result<()> {
if !layer.is_remote_layer() {
layer.delete_resident_layer_file()?;
@@ -308,14 +498,14 @@ impl LayerManager {
Ok(())
}
pub(crate) fn contains(&self, layer: &Arc<dyn PersistentLayer>) -> bool {
self.layer_fmgr.contains(layer)
}
}
/// Manages the layer files in the local / remote file system. This is a wrapper around `DashMap`.
///
/// Developer note: DashMap can deadlock in some cases. Ensure that each function holds at most
/// one reference into the map at a time.
pub struct LayerFileManager<T: AsLayerDesc + ?Sized = dyn PersistentLayer>(
HashMap<PersistentLayerKey, Arc<T>>,
DashMap<PersistentLayerKey, Arc<T>>,
);
impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
@@ -329,22 +519,22 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
.clone()
}
pub(crate) fn insert(&mut self, layer: Arc<T>) {
fn insert(&self, layer: Arc<T>) {
let present = self.0.insert(layer.layer_desc().key(), layer.clone());
if present.is_some() && cfg!(debug_assertions) {
panic!("overwriting a layer: {:?}", layer.layer_desc())
}
}
pub(crate) fn contains(&self, layer: &Arc<T>) -> bool {
fn contains(&self, layer: &Arc<T>) -> bool {
self.0.contains_key(&layer.layer_desc().key())
}
pub(crate) fn new() -> Self {
Self(HashMap::new())
fn new() -> Self {
Self(DashMap::new())
}
pub(crate) fn remove(&mut self, layer: Arc<T>) {
fn remove(&self, layer: Arc<T>) {
let present = self.0.remove(&layer.layer_desc().key());
if present.is_none() && cfg!(debug_assertions) {
panic!(
@@ -354,7 +544,7 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
}
}
pub(crate) fn replace_and_verify(&mut self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
fn replace_and_verify(&self, expected: Arc<T>, new: Arc<T>) -> Result<()> {
let key = expected.layer_desc().key();
let other = new.layer_desc().key();
@@ -375,12 +565,12 @@ impl<T: AsLayerDesc + ?Sized> LayerFileManager<T> {
"one layer is l0 while the other is not: {expected_l0} != {new_l0}"
);
if let Some(layer) = self.0.get_mut(&key) {
if let Some(mut layer) = self.0.get_mut(&key) {
anyhow::ensure!(
compare_arced_layers(&expected, layer),
compare_arced_layers(&expected, &*layer),
"another layer was found instead of expected, expected={expected:?}, new={new:?}",
expected = Arc::as_ptr(&expected),
new = Arc::as_ptr(layer),
new = Arc::as_ptr(&*layer),
);
*layer = new;
Ok(())
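
`LayerFileManager` moves from a `HashMap` behind `&mut self` to a `DashMap` with `&self` methods, which is what lets the write guard mutate the file mapping without rebuilding the layer map, as `replace_and_verify` now does above with `get_mut`. The developer note above warns about deadlocks: DashMap shards its entries behind read-write locks, so holding a `Ref`/`RefMut` into the map while performing another operation that needs the same shard can block forever. A small sketch of the safe shape (copy the value out, or drop the guard, before touching the map again); the keys and values here are illustrative:

use dashmap::DashMap;

fn main() {
    let layers: DashMap<&'static str, u64> = DashMap::new();
    layers.insert("000-010__delta", 1);
    layers.insert("010-020__delta", 2);

    // Fine: the RefMut from get_mut is dropped before the map is touched again.
    if let Some(mut entry) = layers.get_mut("000-010__delta") {
        *entry = 10;
    }

    // Copy the value out and drop the guard immediately, instead of holding a
    // reference into the map while doing more map operations (which risks
    // blocking on the same shard's lock).
    let value = layers.get("010-020__delta").map(|v| *v);
    println!("{:?} {:?}", layers.get("000-010__delta").map(|v| *v), value);
}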