diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 4c504527a0..be88fd8676 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -429,7 +429,7 @@ impl Tenant { .layers .read() .await - .0 + .layer_map() .iter_historic_layers() .next() .is_some(), diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 13fa7ccc7b..9dd3212a5b 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -659,7 +659,7 @@ mod tests { use crate::tenant::{ storage_layer::{AsLayerDesc, PersistentLayerDesc}, - timeline::LayerFileManager, + timeline::layer_manager::LayerFileManager, }; use super::*; diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 72b2bfb1de..7996c00215 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -41,7 +41,7 @@ pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; -use super::layer_map::BatchedUpdates; +use super::timeline::layer_manager::LayerManager; pub fn range_overlaps(a: &Range, b: &Range) -> bool where @@ -170,7 +170,7 @@ impl LayerAccessStats { /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &BatchedUpdates<'_>, + layer_map_lock_held_witness: &LayerManager, status: LayerResidenceStatus, ) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); @@ -189,7 +189,7 @@ impl LayerAccessStats { /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. 
pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_>, + layer_map_lock_held_witness: &LayerManager, new_status: LayerResidenceStatus, ) -> LayerAccessStats { let clone = { @@ -221,7 +221,7 @@ impl LayerAccessStats { /// pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &BatchedUpdates<'_>, + _layer_map_lock_held_witness: &LayerManager, status: LayerResidenceStatus, reason: LayerResidenceEventReason, ) { diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 54954fdb80..4ab11a6c3e 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -4,9 +4,9 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; -use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; +use crate::tenant::timeline::layer_manager::LayerManager; use anyhow::{bail, Result}; use pageserver_api::models::HistoricLayerInfo; use std::ops::Range; @@ -224,7 +224,7 @@ impl RemoteLayer { /// Create a Layer struct representing this layer, after it has been downloaded. pub fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_>, + layer_map_lock_held_witness: &LayerManager, conf: &'static PageServerConf, file_size: u64, ) -> Arc { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d589f10570..3db1347c37 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1,6 +1,5 @@ -//! 
- mod eviction_task; +pub mod layer_manager; mod logical_size; pub mod span; pub mod uninit; @@ -82,16 +81,15 @@ use crate::{is_temporary, task_mgr}; pub(super) use self::eviction_task::EvictionTaskTenantState; use self::eviction_task::EvictionTaskTimelineState; +use self::layer_manager::LayerManager; use self::logical_size::LogicalSize; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; -use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::storage_layer::{ AsLayerDesc, DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, - PersistentLayerKey, }; #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -125,78 +123,6 @@ impl PartialOrd for Hole { } } -pub struct LayerFileManager( - HashMap>, -); - -impl LayerFileManager { - fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { - // The assumption for the `expect()` is that all code maintains the following invariant: - // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor. 
- self.0 - .get(&desc.key()) - .with_context(|| format!("get layer from desc: {}", desc.filename())) - .expect("not found") - .clone() - } - - pub(crate) fn insert(&mut self, layer: Arc) { - let present = self.0.insert(layer.layer_desc().key(), layer.clone()); - if present.is_some() && cfg!(debug_assertions) { - panic!("overwriting a layer: {:?}", layer.layer_desc()) - } - } - - pub(crate) fn new() -> Self { - Self(HashMap::new()) - } - - pub(crate) fn remove(&mut self, layer: Arc) { - let present = self.0.remove(&layer.layer_desc().key()); - if present.is_none() && cfg!(debug_assertions) { - panic!( - "removing layer that is not present in layer mapping: {:?}", - layer.layer_desc() - ) - } - } - - pub(crate) fn replace_and_verify(&mut self, expected: Arc, new: Arc) -> Result<()> { - let key = expected.layer_desc().key(); - let other = new.layer_desc().key(); - - let expected_l0 = LayerMap::is_l0(expected.layer_desc()); - let new_l0 = LayerMap::is_l0(new.layer_desc()); - - fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!( - "layermap-replace-notfound" - )); - - anyhow::ensure!( - key == other, - "expected and new layer have different keys: {key:?} != {other:?}" - ); - - anyhow::ensure!( - expected_l0 == new_l0, - "one layer is l0 while the other is not: {expected_l0} != {new_l0}" - ); - - if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) { - anyhow::ensure!( - compare_arced_layers(&expected, layer), - "another layer was found instead of expected, expected={expected:?}, new={new:?}", - expected = Arc::as_ptr(&expected), - new = Arc::as_ptr(layer), - ); - *layer = new; - Ok(()) - } else { - anyhow::bail!("layer was not found"); - } - } -} - /// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. /// Can be removed after all refactors are done. 
fn drop_rlock(rlock: tokio::sync::OwnedRwLockReadGuard) { @@ -236,7 +162,7 @@ pub struct Timeline { /// /// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`, /// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`. - pub(crate) layers: Arc>, + pub(crate) layers: Arc>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -587,7 +513,7 @@ impl Timeline { /// Hence, the result **does not represent local filesystem usage**. pub async fn layer_size_sum(&self) -> u64 { let guard = self.layers.read().await; - let (layer_map, _) = &*guard; + let layer_map = guard.layer_map(); let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -898,7 +824,7 @@ impl Timeline { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { let guard = self.layers.read().await; - let (layers, _) = &*guard; + let layers = guard.layer_map(); let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1030,7 +956,7 @@ impl Timeline { pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let guard = self.layers.read().await; - let (layer_map, mapping) = &*guard; + let layer_map = guard.layer_map(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1041,7 +967,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { - let historic_layer = mapping.get_from_desc(&historic_layer); + let historic_layer = guard.get_from_desc(&historic_layer); historic_layers.push(historic_layer.info(reset)); } @@ -1152,27 +1078,18 @@ impl Timeline { // start the batch update let mut guard = self.layers.write().await; - let (layer_map, mapping) = &mut 
*guard; - let mut batch_updates = layer_map.batch_update(); - let mut results = Vec::with_capacity(layers_to_evict.len()); for l in layers_to_evict.iter() { let res = if cancel.is_cancelled() { None } else { - Some(self.evict_layer_batch_impl( - &layer_removal_guard, - l, - &mut batch_updates, - mapping, - )) + Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut guard)) }; results.push(res); } // commit the updates & release locks - batch_updates.flush(); drop_wlock(guard); drop(layer_removal_guard); @@ -1184,8 +1101,7 @@ impl Timeline { &self, _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, local_layer: &Arc, - batch_updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, + layer_mgr: &mut LayerManager, ) -> anyhow::Result { if local_layer.is_remote_layer() { // TODO(issue #3851): consider returning an err here instead of false, @@ -1221,7 +1137,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), + .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, @@ -1230,13 +1146,13 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), + .clone_for_residence_change(layer_mgr, LayerResidenceStatus::Evicted), ), }); assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); - let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) { + let succeed = match layer_mgr.replace_and_verify(local_layer.clone(), new_remote_layer) { Ok(()) => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); @@ -1407,10 +1323,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: Arc::new(tokio::sync::RwLock::new(( - LayerMap::default(), - LayerFileManager::new(), - ))), + 
layers: Arc::new(tokio::sync::RwLock::new(LayerManager::create())), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1595,7 +1508,7 @@ impl Timeline { let mut layers = self.layers.try_write().expect( "in the context where we call this function, no other task has access to the object", ); - layers.0.next_open_layer_at = Some(Lsn(start_lsn.0)); + layers.initialize_empty(Lsn(start_lsn.0)); } /// @@ -1603,8 +1516,6 @@ impl Timeline { /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; - let mut updates = layers.batch_update(); let mut num_layers = 0; let timer = self.metrics.load_layer_map_histo.start_timer(); @@ -1615,6 +1526,8 @@ impl Timeline { // total size of layer files in the current timeline directory let mut total_physical_size = 0; + let mut loaded_layers = Vec::>::new(); + for direntry in fs::read_dir(timeline_path)? { let direntry = direntry?; let direntry_path = direntry.path(); @@ -1641,12 +1554,12 @@ impl Timeline { self.tenant_id, &imgfilename, file_size, - LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - self.insert_historic_layer(Arc::new(layer), &mut updates, mapping); + loaded_layers.push(Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. 
@@ -1673,12 +1586,12 @@ impl Timeline { self.tenant_id, &deltafilename, file_size, - LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident), ); trace!("found layer {}", layer.path().display()); total_physical_size += file_size; - self.insert_historic_layer(Arc::new(layer), &mut updates, mapping); + loaded_layers.push(Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1704,8 +1617,7 @@ impl Timeline { } } - updates.flush(); - layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1); + guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1); info!( "loaded layer map with {} layers at {}, total physical size: {}", @@ -1733,8 +1645,9 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. let mut guard = self.layers.write().await; - let (layer_map, mapping) = &mut *guard; - let mut updates = layer_map.batch_update(); + + let mut corrupted_local_layers = Vec::new(); + let mut added_remote_layers = Vec::new(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1778,7 +1691,7 @@ impl Timeline { anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); } else { self.metrics.resident_physical_size_gauge.sub(local_size); - self.remove_historic_layer(local_layer, &mut updates, mapping); + corrupted_local_layers.push(local_layer); // fall-through to adding the remote layer } } else { @@ -1810,14 +1723,10 @@ impl Timeline { self.timeline_id, imgfilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer( - &updates, - LayerResidenceStatus::Evicted, - ), + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); - - self.insert_historic_layer(remote_layer, 
&mut updates, mapping); + added_remote_layers.push(remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1838,18 +1747,14 @@ impl Timeline { self.timeline_id, deltafilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer( - &updates, - LayerResidenceStatus::Evicted, - ), + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted), ); let remote_layer = Arc::new(remote_layer); - self.insert_historic_layer(remote_layer, &mut updates, mapping); + added_remote_layers.push(remote_layer); } } } - - updates.flush(); + guard.initialize_remote_layers(corrupted_local_layers, added_remote_layers); Ok(local_only_layers) } @@ -1885,10 +1790,10 @@ impl Timeline { let local_layers = { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); layers .iter_historic_layers() - .map(|l| (l.filename(), mapping.get_from_desc(&l))) + .map(|l| (l.filename(), guard.get_from_desc(&l))) .collect::>() }; @@ -2262,70 +2167,15 @@ impl Timeline { async fn find_layer(&self, layer_file_name: &str) -> Option> { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; - for historic_layer in layers.iter_historic_layers() { + for historic_layer in guard.layer_map().iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { - return Some(mapping.get_from_desc(&historic_layer)); + return Some(guard.get_from_desc(&historic_layer)); } } None } - - /// Helper function to insert a layer from both layer map and layer file manager. Will be removed in the future - /// after we introduce `LayerMapManager`. 
- fn insert_historic_layer( - &self, - layer: Arc, - updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, - ) { - updates.insert_historic(layer.layer_desc().clone()); - mapping.insert(layer); - } - - /// Helper function to remove a layer from both layer map and layer file manager. Will be removed in the future - /// after we introduce `LayerMapManager`. - fn remove_historic_layer( - &self, - layer: Arc, - updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, - ) { - updates.remove_historic(layer.layer_desc().clone()); - mapping.remove(layer); - } - - /// Removes the layer from local FS (if present) and from memory. - /// Remote storage is not affected by this operation. - fn delete_historic_layer( - &self, - // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: Arc>, - layer: Arc, - updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerFileManager, - ) -> anyhow::Result<()> { - let layer = mapping.get_from_desc(&layer); - if !layer.is_remote_layer() { - layer.delete_resident_layer_file()?; - let layer_file_size = layer.file_size(); - self.metrics - .resident_physical_size_gauge - .sub(layer_file_size); - } - - // TODO Removing from the bottom of the layer map is expensive. - // Maybe instead discard all layer map historic versions that - // won't be needed for page reconstruction for this timeline, - // and mark what we can't delete yet as deleted from the layer - // map index without actually rebuilding the index. - updates.remove_historic(layer.layer_desc().clone()); - mapping.remove(layer); - - Ok(()) - } } type TraversalId = String; @@ -2500,7 +2350,7 @@ impl Timeline { 'layer_map_search: loop { let remote_layer = { let guard = timeline.layers.read().await; - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); // Check the open and frozen in-memory layers first, in order from newest // to oldest. 
@@ -2562,7 +2412,7 @@ impl Timeline { } if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { - let layer = mapping.get_from_desc(&layer); + let layer = guard.get_from_desc(&layer); // If it's a remote layer, download it and retry. if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) @@ -2685,52 +2535,13 @@ impl Timeline { /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - - ensure!(lsn.is_aligned()); - - let last_record_lsn = self.get_last_record_lsn(); - ensure!( - lsn > last_record_lsn, - "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}", + let layer = guard.get_layer_for_write( lsn, - last_record_lsn, - std::backtrace::Backtrace::force_capture(), - ); - - // Do we have a layer open for writing already? - let layer; - if let Some(open_layer) = &layers.open_layer { - if open_layer.get_lsn_range().start > lsn { - bail!( - "unexpected open layer in the future: open layers starts at {}, write lsn {}", - open_layer.get_lsn_range().start, - lsn - ); - } - - layer = Arc::clone(open_layer); - } else { - // No writeable layer yet. Create one. 
- let start_lsn = layers - .next_open_layer_at - .context("No next open layer found")?; - - trace!( - "creating layer for write at {}/{} for record at {}", - self.timeline_id, - start_lsn, - lsn - ); - let new_layer = - InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?; - let layer_rc = Arc::new(new_layer); - - layers.open_layer = Some(Arc::clone(&layer_rc)); - layers.next_open_layer_at = None; - - layer = layer_rc; - } + self.get_last_record_lsn(), + self.conf, + self.timeline_id, + self.tenant_id, + )?; Ok(layer) } @@ -2763,21 +2574,7 @@ impl Timeline { Some(self.write_lock.lock().await) }; let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - if let Some(open_layer) = &layers.open_layer { - let open_layer_rc = Arc::clone(open_layer); - // Does this layer need freezing? - let end_lsn = Lsn(self.get_last_record_lsn().0 + 1); - open_layer.freeze(end_lsn); - - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.frozen_layers.push_back(open_layer_rc); - layers.open_layer = None; - layers.next_open_layer_at = Some(end_lsn); - self.last_freeze_at.store(end_lsn); - } - drop_wlock(guard); + guard.try_freeze_in_memory_layer(self.get_last_record_lsn(), &self.last_freeze_at); } /// Layer flusher task's main loop. @@ -2802,8 +2599,7 @@ impl Timeline { let result = loop { let layer_to_flush = { let guard = self.layers.read().await; - let (layers, _) = &*guard; - layers.frozen_layers.front().cloned() + guard.layer_map().frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; let Some(layer_to_flush) = layer_to_flush else { break Ok(()) }; @@ -2921,12 +2717,11 @@ impl Timeline { pausable_failpoint!("flush-frozen-before-sync"); // The new on-disk layers are now in the layer map. We can remove the - // in-memory layer from the map now. 
We do not modify `LayerFileManager` because - // it only contains persistent layers. The flushed layer is stored in + // in-memory layer from the map now. The flushed layer is stored in // the mapping in `create_delta_layer`. { - let mut layers = self.layers.write().await; - let l = layers.0.frozen_layers.pop_front(); + let mut guard = self.layers.write().await; + let l = guard.layer_map_mut().frozen_layers.pop_front(); // Only one thread may call this function at a time (for this // timeline). If two threads tried to flush the same frozen @@ -3065,15 +2860,12 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; - let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( - &batch_updates, + &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - self.insert_historic_layer(l, &mut batch_updates, mapping); - batch_updates.flush(); + guard.track_new_l0_delta_layer(l); // update metrics self.metrics.resident_physical_size_gauge.add(sz); @@ -3122,7 +2914,7 @@ impl Timeline { let threshold = self.get_image_creation_threshold(); let guard = self.layers.read().await; - let (layers, _) = &*guard; + let layers = guard.layer_map(); let mut max_deltas = 0; { @@ -3301,11 +3093,9 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; - let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.tenant_id, &self.timeline_id); - for l in image_layers { + for l in &image_layers { let path = l.filename(); let metadata = timeline_path .join(path.file_name()) @@ -3319,13 +3109,12 @@ impl Timeline { .add(metadata.len()); let l = Arc::new(l); l.access_stats().record_residence_event( - &updates, + &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, 
); - self.insert_historic_layer(l, &mut updates, mapping); } - updates.flush(); + guard.track_new_image_layers(image_layers); drop_wlock(guard); timer.stop_and_record(); @@ -3487,18 +3276,18 @@ impl Timeline { fn compact_level0_phase1( self: Arc, _layer_removal_cs: Arc>, - guard: tokio::sync::OwnedRwLockReadGuard<(LayerMap, LayerFileManager)>, + guard: tokio::sync::OwnedRwLockReadGuard, mut stats: CompactLevel0Phase1StatsBuilder, target_file_size: u64, ctx: &RequestContext, ) -> Result { stats.read_lock_held_spawn_blocking_startup_micros = stats.read_lock_acquisition_micros.till_now(); // set by caller - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); let level0_deltas = layers.get_level0_deltas()?; let mut level0_deltas = level0_deltas .into_iter() - .map(|x| mapping.get_from_desc(&x)) + .map(|x| guard.get_from_desc(&x)) .collect_vec(); stats.level0_deltas_count = Some(level0_deltas.len()); // Only compact if enough layers have accumulated. @@ -3914,9 +3703,11 @@ impl Timeline { } let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; - let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + + let mut insert_layers = Vec::new(); + let mut remove_layers = Vec::new(); + for l in new_layers { let new_delta_path = l.path(); @@ -3942,11 +3733,11 @@ impl Timeline { new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); let x: Arc = Arc::new(l); x.access_stats().record_residence_event( - &updates, + &guard, LayerResidenceStatus::Resident, LayerResidenceEventReason::LayerCreate, ); - self.insert_historic_layer(x, &mut updates, mapping); + insert_layers.push(x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -3954,12 +3745,16 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - // NB: the layer file 
identified by descriptor `l` is guaranteed to be present - // in the LayerFileManager because we kept holding `layer_removal_cs` the entire - // time, even though we dropped `Timeline::layers` inbetween. - self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?; + remove_layers.push(guard.get_from_desc(&l)); } - updates.flush(); + + guard.finish_compact_l0( + layer_removal_cs, + remove_layers, + insert_layers, + &self.metrics, + )?; + drop_wlock(guard); // Also schedule the deletions in remote storage @@ -4178,7 +3973,7 @@ impl Timeline { // // TODO holding a write lock is too agressive and avoidable let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let layers = guard.layer_map(); 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4274,7 +4069,6 @@ impl Timeline { .unwrap() .replace((new_gc_cutoff, wanted_image_layers.to_keyspace())); - let mut updates = layers.batch_update(); if !layers_to_remove.is_empty() { // Persist the new GC cutoff value in the metadata file, before // we actually remove anything. @@ -4284,18 +4078,15 @@ impl Timeline { // (couldn't do this in the loop above, because you cannot modify a collection // while iterating it. BTreeMap::retain() would be another option) let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len()); - { - for doomed_layer in layers_to_remove { - layer_names_to_delete.push(doomed_layer.filename()); - self.delete_historic_layer( - layer_removal_cs.clone(), - doomed_layer, - &mut updates, - mapping, - )?; // FIXME: schedule succeeded deletions before returning? 
- result.layers_removed += 1; - } + let gc_layers = layers_to_remove + .iter() + .map(|x| guard.get_from_desc(x)) + .collect(); + for doomed_layer in layers_to_remove { + layer_names_to_delete.push(doomed_layer.filename()); + result.layers_removed += 1; } + let apply = guard.finish_gc_timeline(layer_removal_cs, gc_layers, &self.metrics)?; if result.layers_removed != 0 { fail_point!("after-timeline-gc-removed-layers"); @@ -4304,8 +4095,9 @@ impl Timeline { if let Some(remote_client) = &self.remote_client { remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; } + + apply.flush(); } - updates.flush(); info!( "GC completed removing {} layers, cutoff {}", @@ -4477,13 +4269,11 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. let mut guard = self_clone.layers.write().await; - let (layers, mapping) = &mut *guard; - let updates = layers.batch_update(); let new_layer = - remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); + remote_layer.create_downloaded_layer(&guard, self_clone.conf, *size); { let l: Arc = remote_layer.clone(); - let failure = match mapping.replace_and_verify(l, new_layer) { + let failure = match guard.replace_and_verify(l, new_layer) { Ok(()) => false, Err(e) => { // this is a precondition failure, the layer filename derived @@ -4511,7 +4301,6 @@ impl Timeline { .store(true, Relaxed); } } - updates.flush(); drop_wlock(guard); info!("on-demand download successful"); @@ -4612,10 +4401,10 @@ impl Timeline { let mut downloads = Vec::new(); { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); layers .iter_historic_layers() - .map(|l| mapping.get_from_desc(&l)) + .map(|l| guard.get_from_desc(&l)) .filter_map(|l| l.downcast_remote_layer()) .map(|l| self.download_remote_layer(l)) .for_each(|dl| downloads.push(dl)) @@ -4717,7 +4506,7 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { 
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4726,7 +4515,7 @@ impl Timeline { let file_size = l.file_size(); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); - let l = mapping.get_from_desc(&l); + let l = guard.get_from_desc(&l); if l.is_remote_layer() { continue; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index cd6a2d10cc..80146419df 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -198,10 +198,10 @@ impl Timeline { // So, we just need to deal with this. let candidates: Vec> = { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let layers = guard.layer_map(); let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { - let hist_layer = mapping.get_from_desc(&hist_layer); + let hist_layer = guard.get_from_desc(&hist_layer); if hist_layer.is_remote_layer() { continue; } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs new file mode 100644 index 0000000000..979155f8d2 --- /dev/null +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -0,0 +1,370 @@ +use anyhow::{bail, ensure, Context, Result}; +use std::{collections::HashMap, sync::Arc}; +use tracing::trace; +use utils::{ + id::{TenantId, TimelineId}, + lsn::{AtomicLsn, Lsn}, +}; + +use crate::{ + config::PageServerConf, + metrics::TimelineMetrics, + tenant::{ + layer_map::{BatchedUpdates, LayerMap}, + storage_layer::{ + AsLayerDesc, DeltaLayer, ImageLayer, InMemoryLayer, Layer, PersistentLayer, + PersistentLayerDesc, PersistentLayerKey, RemoteLayer, + }, + timeline::compare_arced_layers, + }, +}; + 
+/// Provides semantic APIs to manipulate the layer map.
+///
+/// Bundles the [`LayerMap`] with the [`LayerFileManager`] so that the helpers below can
+/// update both of them in the same call.
+pub struct LayerManager {
+    // Holds the historic layer descriptors plus the open / frozen in-memory layers and
+    // `next_open_layer_at` (all of which are accessed by the methods below).
+    layer_map: LayerMap,
+    // Maps a layer descriptor's key to the layer object registered for it.
+    layer_fmgr: LayerFileManager,
+}
+
+/// After GC, the layer map changes will not be applied immediately. Users should manually apply the changes after
+/// scheduling deletes in remote client.
+///
+/// Returned by [`LayerManager::finish_gc_timeline`]; wraps the not-yet-flushed `BatchedUpdates`.
+pub struct ApplyGcResultGuard<'a>(BatchedUpdates<'a>);
+
+impl ApplyGcResultGuard<'_> {
+    /// Consumes the guard and flushes the batched updates it wraps.
+    pub fn flush(self) {
+        self.0.flush();
+    }
+}
+
+impl LayerManager {
+    /// Creates a manager with a default [`LayerMap`] and an empty [`LayerFileManager`].
+    pub fn create() -> Self {
+        Self {
+            layer_map: LayerMap::default(),
+            layer_fmgr: LayerFileManager::new(),
+        }
+    }
+
+    /// Returns the layer object registered for `desc` in the layer file manager.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no layer is registered for `desc` (see `LayerFileManager::get_from_desc`).
+    pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc {
+        self.layer_fmgr.get_from_desc(desc)
+    }
+
+    /// Get an immutable reference to the layer map.
+    ///
+    /// We expect users only to be able to get an immutable layer map. If users want to make modifications,
+    /// they should use the below semantic APIs. This design makes us step closer to immutable storage state.
+    pub fn layer_map(&self) -> &LayerMap {
+        &self.layer_map
+    }
+
+    /// Get a mutable reference to the layer map. This function will be removed once `flush_frozen_layer`
+    /// gets a refactor.
+    pub fn layer_map_mut(&mut self) -> &mut LayerMap {
+        &mut self.layer_map
+    }
+
+    /// Replace layers in the layer file manager, used in evictions and layer downloads.
+    ///
+    /// Only the layer file manager is touched; the layer map is not modified here.
+    ///
+    /// # Errors
+    ///
+    /// Forwards any error from `LayerFileManager::replace_and_verify`.
+    pub fn replace_and_verify(
+        &mut self,
+        expected: Arc,
+        new: Arc,
+    ) -> Result<()> {
+        self.layer_fmgr.replace_and_verify(expected, new)
+    }
+
+    /// Called from `load_layer_map`. Initialize the layer manager with:
+    /// 1. all on-disk layers
+    /// 2. the LSN of the next open layer (per the original note, the on-disk `disk_consistent_lsn`;
+    ///    that is the caller's choice and cannot be confirmed from this file)
+    ///
+    /// All layers are inserted in one batched layer map update, which is flushed before
+    /// `next_open_layer_at` is set.
+    pub fn initialize_local_layers(
+        &mut self,
+        on_disk_layers: Vec>,
+        next_open_layer_at: Lsn,
+    ) {
+        let mut updates = self.layer_map.batch_update();
+        for layer in on_disk_layers {
+            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
+        }
+        updates.flush();
+        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
+    }
+
+    /// Initialize when creating a new timeline, called in `init_empty_layer_map`.
+    ///
+    /// Only records where the next open layer starts; no layers are inserted.
+    pub fn initialize_empty(&mut self, next_open_layer_at: Lsn) {
+        self.layer_map.next_open_layer_at = Some(next_open_layer_at);
+    }
+
+    /// Removes `corrupted_local_layers` and then inserts `remote_layers`, all within a single
+    /// batched layer map update that is flushed at the end.
+    pub fn initialize_remote_layers(
+        &mut self,
+        corrupted_local_layers: Vec>,
+        remote_layers: Vec>,
+    ) {
+        let mut updates = self.layer_map.batch_update();
+        for layer in corrupted_local_layers {
+            Self::remove_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
+        }
+        for layer in remote_layers {
+            Self::insert_historic_layer(layer, &mut updates, &mut self.layer_fmgr);
+        }
+        updates.flush();
+    }
+
+    /// Open a new writable layer to append data if there is no open layer, otherwise return the current open layer,
+    /// called within `get_layer_for_write` (presumably the caller-side method of the same name).
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    /// - `lsn` is not aligned,
+    /// - `lsn` is not strictly greater than `last_record_lsn`,
+    /// - the existing open layer starts after `lsn`,
+    /// - there is no open layer and `next_open_layer_at` is unset,
+    /// - `InMemoryLayer::create` fails.
+    pub fn get_layer_for_write(
+        &mut self,
+        lsn: Lsn,
+        last_record_lsn: Lsn,
+        conf: &'static PageServerConf,
+        timeline_id: TimelineId,
+        tenant_id: TenantId,
+    ) -> Result> {
+        ensure!(lsn.is_aligned());
+
+        // A backtrace is captured into the error message to help locate the offending writer.
+        ensure!(
+            lsn > last_record_lsn,
+            "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}",
+            lsn,
+            last_record_lsn,
+            std::backtrace::Backtrace::force_capture(),
+        );
+
+        // Do we have a layer open for writing already?
+        let layer = if let Some(open_layer) = &self.layer_map.open_layer {
+            if open_layer.get_lsn_range().start > lsn {
+                bail!(
+                    "unexpected open layer in the future: open layers starts at {}, write lsn {}",
+                    open_layer.get_lsn_range().start,
+                    lsn
+                );
+            }
+
+            Arc::clone(open_layer)
+        } else {
+            // No writeable layer yet. Create one.
+            let start_lsn = self
+                .layer_map
+                .next_open_layer_at
+                .context("No next open layer found")?;
+
+            trace!(
+                "creating in-memory layer at {}/{} for record at {}",
+                timeline_id,
+                start_lsn,
+                lsn
+            );
+
+            let new_layer = InMemoryLayer::create(conf, timeline_id, tenant_id, start_lsn)?;
+            let layer = Arc::new(new_layer);
+
+            // The new layer becomes the open layer; the pending start LSN has been consumed.
+            self.layer_map.open_layer = Some(layer.clone());
+            self.layer_map.next_open_layer_at = None;
+
+            layer
+        };
+
+        Ok(layer)
+    }
+
+    /// Called from `freeze_inmem_layer`.
+    ///
+    /// If a layer is currently open, freezes it at `last_record_lsn + 1`, appends it to the
+    /// frozen layers queue, clears the open layer, sets `next_open_layer_at` to the freeze LSN
+    /// and stores that LSN into `last_freeze_at`. Does nothing when no layer is open.
+    ///
+    /// NOTE: this function returns no value, so callers cannot tell from the return whether a
+    /// layer was actually frozen.
+    pub fn try_freeze_in_memory_layer(
+        &mut self,
+        Lsn(last_record_lsn): Lsn,
+        last_freeze_at: &AtomicLsn,
+    ) {
+        let end_lsn = Lsn(last_record_lsn + 1);
+
+        if let Some(open_layer) = &self.layer_map.open_layer {
+            let open_layer_rc = Arc::clone(open_layer);
+            // Any open layer is frozen unconditionally; there is no "needs freezing" check here.
+            open_layer.freeze(end_lsn);
+
+            // The layer is no longer open, update the layer map to reflect this.
+            // NOTE(review): replacing the frozen layer with on-disk historic layers does not
+            // happen in this function; presumably the flush path does it later.
+            self.layer_map.frozen_layers.push_back(open_layer_rc);
+            self.layer_map.open_layer = None;
+            self.layer_map.next_open_layer_at = Some(end_lsn);
+            last_freeze_at.store(end_lsn);
+        }
+    }
+
+    /// Add image layers to the layer map, called from `create_image_layers`.
+    ///
+    /// Each layer is wrapped in an `Arc` and registered in both the layer map and the layer
+    /// file manager; the batch is flushed once at the end.
+    pub fn track_new_image_layers(&mut self, image_layers: Vec) {
+        let mut updates = self.layer_map.batch_update();
+        for layer in image_layers {
+            Self::insert_historic_layer(Arc::new(layer), &mut updates, &mut self.layer_fmgr);
+        }
+        updates.flush();
+    }
+
+    /// Insert into the layer map when a new delta layer is created, called from `create_delta_layer`.
+    pub fn track_new_l0_delta_layer(&mut self, delta_layer: Arc) {
+        let mut updates = self.layer_map.batch_update();
+        Self::insert_historic_layer(delta_layer, &mut updates, &mut self.layer_fmgr);
+        updates.flush();
+    }
+
+    /// Called when compaction is completed.
+    ///
+    /// Inserts every layer of `compact_to`, then deletes every layer of `compact_from`
+    /// (via `delete_historic_layer`), and finally flushes the batched update.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first error from `delete_historic_layer`. In that case the function returns
+    /// before the explicit `updates.flush()`; what happens to the pending batch then depends on
+    /// how `BatchedUpdates` behaves when dropped, which is not visible here.
+    pub fn finish_compact_l0(
+        &mut self,
+        layer_removal_cs: Arc>,
+        compact_from: Vec>,
+        compact_to: Vec>,
+        metrics: &TimelineMetrics,
+    ) -> Result<()> {
+        let mut updates = self.layer_map.batch_update();
+        for l in compact_to {
+            Self::insert_historic_layer(l, &mut updates, &mut self.layer_fmgr);
+        }
+        for l in compact_from {
+            // NB: the layer file identified by descriptor `l` is guaranteed to be present
+            // in the LayerFileManager because compaction kept holding `layer_removal_cs` the entire
+            // time, even though we dropped `Timeline::layers` inbetween.
+            Self::delete_historic_layer(
+                layer_removal_cs.clone(),
+                l,
+                &mut updates,
+                metrics,
+                &mut self.layer_fmgr,
+            )?;
+        }
+        updates.flush();
+        Ok(())
+    }
+
+    /// Called when garbage collect the timeline. Returns a guard that will apply the updates to the layer map.
+    ///
+    /// The batched update is NOT flushed here: the caller must call `flush` on the returned
+    /// [`ApplyGcResultGuard`] to apply it.
+    ///
+    /// # Errors
+    ///
+    /// Returns the first error from `delete_historic_layer`; no guard is returned in that case.
+    pub fn finish_gc_timeline(
+        &mut self,
+        layer_removal_cs: Arc>,
+        gc_layers: Vec>,
+        metrics: &TimelineMetrics,
+    ) -> Result {
+        let mut updates = self.layer_map.batch_update();
+        for doomed_layer in gc_layers {
+            Self::delete_historic_layer(
+                layer_removal_cs.clone(),
+                doomed_layer,
+                &mut updates,
+                metrics,
+                &mut self.layer_fmgr,
+            )?; // FIXME: schedule succeeded deletions in timeline.rs `gc_timeline` instead of in batch?
+        }
+        Ok(ApplyGcResultGuard(updates))
+    }
+
+    /// Helper function to insert a layer into the layer map and file manager.
+    ///
+    /// Takes `updates` and `mapping` as separate arguments (rather than `&mut self`) so it can
+    /// be called while a batched update borrowed from `self.layer_map` is still alive.
+    fn insert_historic_layer(
+        layer: Arc,
+        updates: &mut BatchedUpdates<'_>,
+        mapping: &mut LayerFileManager,
+    ) {
+        updates.insert_historic(layer.layer_desc().clone());
+        mapping.insert(layer);
+    }
+
+    /// Helper function to remove a layer from the layer map and file manager.
+    ///
+    /// Unlike `delete_historic_layer`, this does not touch the layer file or any metrics.
+    fn remove_historic_layer(
+        layer: Arc,
+        updates: &mut BatchedUpdates<'_>,
+        mapping: &mut LayerFileManager,
+    ) {
+        updates.remove_historic(layer.layer_desc().clone());
+        mapping.remove(layer);
+    }
+
+    /// Removes the layer from local FS (if present) and from memory.
+    /// Remote storage is not affected by this operation.
+    ///
+    /// For layers that are not remote layers, the resident file is deleted and the resident
+    /// physical size gauge is decreased by the layer's file size. In all cases the layer is then
+    /// removed from the batched layer map update and from the layer file manager.
+    ///
+    /// # Errors
+    ///
+    /// Returns the error from `delete_resident_layer_file`; in that case the layer is left in
+    /// both the layer map batch and the layer file manager.
+    fn delete_historic_layer(
+        // we cannot remove layers otherwise, since gc and compaction will race
+        // (the argument is unused in the body; it only proves the caller holds the guard)
+        _layer_removal_cs: Arc>,
+        layer: Arc,
+        updates: &mut BatchedUpdates<'_>,
+        metrics: &TimelineMetrics,
+        mapping: &mut LayerFileManager,
+    ) -> anyhow::Result<()> {
+        if !layer.is_remote_layer() {
+            layer.delete_resident_layer_file()?;
+            // Read after the file is deleted; presumably `file_size` comes from in-memory
+            // metadata rather than the filesystem — TODO confirm.
+            let layer_file_size = layer.file_size();
+            metrics.resident_physical_size_gauge.sub(layer_file_size);
+        }
+
+        // TODO Removing from the bottom of the layer map is expensive.
+        // Maybe instead discard all layer map historic versions that
+        // won't be needed for page reconstruction for this timeline,
+        // and mark what we can't delete yet as deleted from the layer
+        // map index without actually rebuilding the index.
+        updates.remove_historic(layer.layer_desc().clone());
+        mapping.remove(layer);
+
+        Ok(())
+    }
+}
+
+/// Registry of layer objects, keyed by the key of each layer's descriptor
+/// (`layer.layer_desc().key()`).
+pub struct LayerFileManager(
+    HashMap>,
+);
+
+impl LayerFileManager {
+    /// Returns a clone of the `Arc` registered under `desc.key()`.
+    ///
+    /// # Panics
+    ///
+    /// Panics if no layer is registered for `desc`.
+    fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc {
+        // The assumption for the `expect()` is that all code maintains the following invariant:
+        // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
+        // The `with_context` message (including the layer file name) is attached to the error
+        // that `expect` panics with.
+        self.0
+            .get(&desc.key())
+            .with_context(|| format!("get layer from desc: {}", desc.filename()))
+            .expect("not found")
+            .clone()
+    }
+
+    /// Registers `layer` under its descriptor key.
+    ///
+    /// The insertion happens first; only afterwards, and only in builds with debug assertions,
+    /// does this panic if a layer was already registered under the same key. Without debug
+    /// assertions the previous entry is silently replaced.
+    pub(crate) fn insert(&mut self, layer: Arc) {
+        let present = self.0.insert(layer.layer_desc().key(), layer.clone());
+        if present.is_some() && cfg!(debug_assertions) {
+            panic!("overwriting a layer: {:?}", layer.layer_desc())
+        }
+    }
+
+    /// Creates an empty manager.
+    pub(crate) fn new() -> Self {
+        Self(HashMap::new())
+    }
+
+    /// Unregisters the entry stored under `layer`'s descriptor key.
+    ///
+    /// In builds with debug assertions this panics if no entry was present; otherwise a missing
+    /// entry is silently ignored. Only the key is compared, not the identity of the stored layer.
+    pub(crate) fn remove(&mut self, layer: Arc) {
+        let present = self.0.remove(&layer.layer_desc().key());
+        if present.is_none() && cfg!(debug_assertions) {
+            panic!(
+                "removing layer that is not present in layer mapping: {:?}",
+                layer.layer_desc()
+            )
+        }
+    }
+
+    /// Replaces the layer stored under `expected`'s key with `new`, after verifying that the
+    /// swap is consistent.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error (and leaves the map unchanged) if:
+    /// - the `layermap-replace-notfound` fail point is active,
+    /// - `expected` and `new` have different descriptor keys,
+    /// - exactly one of them is an L0 layer according to `LayerMap::is_l0`,
+    /// - the layer currently stored under the key is not `expected` according to
+    ///   `compare_arced_layers`,
+    /// - no layer is stored under the key.
+    pub(crate) fn replace_and_verify(&mut self, expected: Arc, new: Arc) -> Result<()> {
+        let key = expected.layer_desc().key();
+        let other = new.layer_desc().key();
+
+        let expected_l0 = LayerMap::is_l0(expected.layer_desc());
+        let new_l0 = LayerMap::is_l0(new.layer_desc());
+
+        // Test hook: lets tests force the failure path.
+        fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!(
+            "layermap-replace-notfound"
+        ));
+
+        anyhow::ensure!(
+            key == other,
+            "expected and new layer have different keys: {key:?} != {other:?}"
+        );
+
+        anyhow::ensure!(
+            expected_l0 == new_l0,
+            "one layer is l0 while the other is not: {expected_l0} != {new_l0}"
+        );
+
+        if let Some(layer) = self.0.get_mut(&key) {
+            // NOTE: in the message below, `new=` prints the pointer of the layer currently
+            // stored in the map (`layer`), not of the `new` argument.
+            anyhow::ensure!(
+                compare_arced_layers(&expected, layer),
+                "another layer was found instead of expected, expected={expected:?}, new={new:?}",
+                expected = Arc::as_ptr(&expected),
+                new = Arc::as_ptr(layer),
+            );
+            *layer = new;
+            Ok(())
+        } else {
+            anyhow::bail!("layer was not found");
+        }
+    }
+}