From a2056666ae97a307ee2b41956eaf172b326818dd Mon Sep 17 00:00:00 2001 From: Alex Chi Date: Wed, 14 Jun 2023 14:58:50 -0400 Subject: [PATCH] pgserver: move mapping logic to layer cache Signed-off-by: Alex Chi --- pageserver/src/tenant.rs | 3 +- pageserver/src/tenant/layer_cache.rs | 143 ++++++++++++++ pageserver/src/tenant/layer_map.rs | 40 +--- pageserver/src/tenant/timeline.rs | 175 ++++++------------ .../src/tenant/timeline/eviction_task.rs | 4 +- 5 files changed, 204 insertions(+), 161 deletions(-) create mode 100644 pageserver/src/tenant/layer_cache.rs diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8effb1a238..a4c578c592 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -85,6 +85,7 @@ pub mod blob_io; pub mod block_io; pub mod disk_btree; pub(crate) mod ephemeral_file; +pub mod layer_cache; pub mod layer_map; pub mod manifest; @@ -1559,7 +1560,7 @@ impl Tenant { // No timeout here, GC & Compaction should be responsive to the // `TimelineState::Stopping` change. info!("waiting for layer_removal_cs.lock()"); - let layer_removal_guard = timeline.layer_removal_cs.lock().await; + let layer_removal_guard = timeline.lcache.delete_guard().await; info!("got layer_removal_cs.lock(), deleting layer files"); // NB: storage_sync upload tasks that reference these layers have been cancelled diff --git a/pageserver/src/tenant/layer_cache.rs b/pageserver/src/tenant/layer_cache.rs new file mode 100644 index 0000000000..060cfa09d8 --- /dev/null +++ b/pageserver/src/tenant/layer_cache.rs @@ -0,0 +1,143 @@ +use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer}; +use super::Timeline; +use crate::tenant::layer_map::{self, LayerMap}; +use anyhow::Result; +use std::sync::{Mutex, Weak}; +use std::{collections::HashMap, sync::Arc}; + +pub struct LayerCache { + /// Layer removal lock. + /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. + /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], + /// and [`Tenant::delete_timeline`]. This is an `Arc` lock because we need an owned + /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). + pub layers_removal_lock: Arc>, + + /// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use. + /// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock. + pub layers_operation_lock: Arc>, + + /// Will be useful when we move evict / download to layer cache. + #[allow(unused)] + timeline: Weak, + + mapping: Mutex>>, +} + +pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>); + +pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>); + +#[derive(Clone)] +pub struct DeleteGuard(Arc>); + +impl LayerCache { + pub fn new(timeline: Weak) -> Self { + Self { + layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())), + layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())), + mapping: Mutex::new(HashMap::new()), + timeline, + } + } + + pub fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { + let guard = self.mapping.lock().unwrap(); + guard.get(&desc.key()).expect("not found").clone() + } + + /// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure + /// we won't delete files that are being read. + pub async fn layer_in_use_write(&self) -> LayerInUseWrite { + LayerInUseWrite(self.layers_operation_lock.clone().write_owned().await) + } + + /// This function is to mock the original behavior of `layers` lock in `Timeline`. Can be removed after we ensure + /// we won't delete files that are being read. + pub async fn layer_in_use_read(&self) -> LayerInUseRead { + LayerInUseRead(self.layers_operation_lock.clone().read_owned().await) + } + + /// Ensures only one of compaction / gc can happen at a time. + pub async fn delete_guard(&self) -> DeleteGuard { + DeleteGuard(Arc::new( + self.layers_removal_lock.clone().lock_owned().await, + )) + } + + /// Should only be called when initializing the timeline. Bypass checks and layer operation lock. + pub fn remove_local_when_init(&self, layer: Arc) { + let mut guard = self.mapping.lock().unwrap(); + guard.remove(&layer.layer_desc().key()); + } + + /// Should only be called when initializing the timeline. Bypass checks and layer operation lock. + pub fn populate_remote_when_init(&self, layer: Arc) { + let mut guard = self.mapping.lock().unwrap(); + guard.insert(layer.layer_desc().key(), layer); + } + + /// Should only be called when initializing the timeline. Bypass checks and layer operation lock. + pub fn populate_local_when_init(&self, layer: Arc) { + let mut guard = self.mapping.lock().unwrap(); + guard.insert(layer.layer_desc().key(), layer); + } + + /// Called within read path. + pub fn replace_and_verify( + &self, + expected: Arc, + new: Arc, + ) -> Result<()> { + let mut guard = self.mapping.lock().unwrap(); + + use super::layer_map::LayerKey; + let key = LayerKey::from(&*expected); + let other = LayerKey::from(&*new); + + let expected_l0 = LayerMap::is_l0(expected.layer_desc()); + let new_l0 = LayerMap::is_l0(new.layer_desc()); + + fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!( + "replacing downloaded layer into layermap failed because layer was not found" + )); + + anyhow::ensure!( + key == other, + "replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}" + ); + + anyhow::ensure!( + expected_l0 == new_l0, + "replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}" + ); + + if let Some(layer) = guard.get_mut(&expected.layer_desc().key()) { + anyhow::ensure!( + layer_map::compare_arced_layers(&expected, layer), + "replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}", + expected = Arc::as_ptr(&expected), + new = Arc::as_ptr(layer), + ); + *layer = new; + Ok(()) + } else { + anyhow::bail!( + "replacing downloaded layer into layermap failed because layer was not found" + ); + } + } + + /// Called within write path. When compaction and image layer creation we will create new layers. + pub fn create_new_layer(&self, layer: Arc) { + let mut guard = self.mapping.lock().unwrap(); + guard.insert(layer.layer_desc().key(), layer); + } + + /// Called within write path. When GC and compaction we will remove layers and delete them on disk. + /// Will move logic to delete files here later. + pub fn delete_layer(&self, layer: Arc) { + let mut guard = self.mapping.lock().unwrap(); + guard.remove(&layer.layer_desc().key()); + } +} diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index 893e4d52a3..276a005976 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -686,10 +686,7 @@ mod tests { mod l0_delta_layers_updated { - use crate::tenant::{ - storage_layer::{PersistentLayer, PersistentLayerDesc}, - timeline::LayerMapping, - }; + use crate::tenant::storage_layer::{PersistentLayer, PersistentLayerDesc}; use super::*; @@ -722,31 +719,6 @@ mod tests { ) } - #[test] - fn replacing_missing_l0_is_notfound() { - // original impl had an oversight, and L0 was an anyhow::Error. anyhow::Error should - // however only happen for precondition failures. - - let layer = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000053423C21-0000000053424D69"; - let layer = LayerFileName::from_str(layer).unwrap(); - let layer = LayerDescriptor::from(layer); - - // same skeletan construction; see scenario below - let not_found = Arc::new(layer.clone()); - let new_version = Arc::new(layer); - - // after the immutable storage state refactor, the replace operation - // will not use layer map any more. We keep it here for consistency in test cases - // and can remove it in the future. - let _map = LayerMap::default(); - - let mut mapping = LayerMapping::new(); - - mapping - .replace_and_verify(not_found, new_version) - .unwrap_err(); - } - fn l0_delta_layers_updated_scenario(layer_name: &str, expected_l0: bool) { let name = LayerFileName::from_str(layer_name).unwrap(); let skeleton = LayerDescriptor::from(name); @@ -755,7 +727,6 @@ mod tests { let downloaded = Arc::new(skeleton); let mut map = LayerMap::default(); - let mut mapping = LayerMapping::new(); // two disjoint Arcs in different lifecycle phases. even if it seems they must be the // same layer, we use LayerMap::compare_arced_layers as the identity of layers. @@ -765,20 +736,11 @@ mod tests { map.batch_update() .insert_historic(remote.layer_desc().clone()); - mapping.insert(remote.clone()); assert_eq!( count_layer_in(&map, remote.layer_desc()), expected_in_counts ); - mapping - .replace_and_verify(remote, downloaded.clone()) - .expect("name derived attributes are the same"); - assert_eq!( - count_layer_in(&map, downloaded.layer_desc()), - expected_in_counts - ); - map.batch_update() .remove_historic(downloaded.layer_desc().clone()); assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0)); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d1848e0eb9..7ba5d218c1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -79,11 +79,12 @@ use self::eviction_task::EvictionTaskTimelineState; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; +use super::layer_cache::{DeleteGuard, LayerCache}; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::storage_layer::{ - DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, PersistentLayerKey, + DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, }; #[derive(Debug, PartialEq, Eq, Clone, Copy)] @@ -117,65 +118,11 @@ impl PartialOrd for Hole { } } -pub struct LayerMapping(HashMap>); +pub struct LayerMapping(()); impl LayerMapping { - fn get_from_desc(&self, desc: &PersistentLayerDesc) -> Arc { - self.0.get(&desc.key()).expect("not found").clone() - } - - pub(crate) fn insert(&mut self, layer: Arc) { - self.0.insert(layer.layer_desc().key(), layer); - } - pub(crate) fn new() -> Self { - Self(HashMap::new()) - } - - pub(crate) fn remove(&mut self, layer: Arc) { - self.0.remove(&layer.layer_desc().key()); - } - - pub(crate) fn replace_and_verify( - &mut self, - expected: Arc, - new: Arc, - ) -> Result<()> { - use super::layer_map::LayerKey; - let key = LayerKey::from(&*expected); - let other = LayerKey::from(&*new); - - let expected_l0 = LayerMap::is_l0(expected.layer_desc()); - let new_l0 = LayerMap::is_l0(new.layer_desc()); - - fail::fail_point!("layermap-replace-notfound", |_| anyhow::bail!( - "replacing downloaded layer into layermap failed because layer was not found" - )); - - anyhow::ensure!( - key == other, - "replacing downloaded layer into layermap failed because two layers have different keys: {key:?} != {other:?}" - ); - - anyhow::ensure!( - expected_l0 == new_l0, - "replacing downloaded layer into layermap failed because one layer is l0 while the other is not: {expected_l0} != {new_l0}" - ); - - if let Some(layer) = self.0.get_mut(&expected.layer_desc().key()) { - anyhow::ensure!( - layer_map::compare_arced_layers(&expected, layer), - "replacing downloaded layer into layermap failed because another layer was found instead of expected, expected={expected:?}, new={new:?}", - expected = Arc::as_ptr(&expected), - new = Arc::as_ptr(layer), - ); - *layer = new; - Ok(()) - } else { - anyhow::bail!( - "replacing downloaded layer into layermap failed because layer was not found" - ); - } + Self(()) } } @@ -192,7 +139,7 @@ fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { } pub struct Timeline { - conf: &'static PageServerConf, + pub(super) conf: &'static PageServerConf, tenant_conf: Arc>, myself: Weak, @@ -204,6 +151,8 @@ pub struct Timeline { pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>, + pub(super) lcache: LayerCache, + /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. /// It is used by compaction task when it checks if new image layer should be created. @@ -275,13 +224,6 @@ pub struct Timeline { /// to be notified when layer flushing has finished, subscribe to the layer_flush_done channel layer_flush_done_tx: tokio::sync::watch::Sender<(u64, anyhow::Result<()>)>, - /// Layer removal lock. - /// A lock to ensure that no layer of the timeline is removed concurrently by other tasks. - /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], - /// and [`Tenant::delete_timeline`]. This is an `Arc` lock because we need an owned - /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). - pub(super) layer_removal_cs: Arc>, - // Needed to ensure that we can't create a branch at a point that was already garbage collected pub latest_gc_cutoff_lsn: Rcu, @@ -896,7 +838,7 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); + let layer_removal_cs = self.lcache.delete_guard().await; // Is the timeline being deleted? if self.is_stopping() { return Err(anyhow::anyhow!("timeline is Stopping").into()); @@ -1119,7 +1061,7 @@ impl Timeline { pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { let guard = self.layers.read().await; - let (layer_map, mapping) = &*guard; + let (layer_map, _) = &*guard; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1130,7 +1072,7 @@ impl Timeline { let mut historic_layers = Vec::new(); for historic_layer in layer_map.iter_historic_layers() { - let historic_layer = mapping.get_from_desc(&historic_layer); + let historic_layer = self.lcache.get_from_desc(&historic_layer); historic_layers.push(historic_layer.info(reset)); } @@ -1228,7 +1170,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; // now lock out layer removal (compaction, gc, timeline deletion) - let layer_removal_guard = self.layer_removal_cs.lock().await; + let layer_removal_guard = self.lcache.delete_guard().await; { // to avoid racing with detach and delete_timeline @@ -1241,7 +1183,7 @@ impl Timeline { // start the batch update let mut guard = self.layers.write().await; - let (layer_map, mapping) = &mut *guard; + let (layer_map, _) = &mut *guard; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1250,12 +1192,7 @@ impl Timeline { let res = if cancel.is_cancelled() { None } else { - Some(self.evict_layer_batch_impl( - &layer_removal_guard, - l, - &mut batch_updates, - mapping, - )) + Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) }; results.push(res); } @@ -1271,10 +1208,9 @@ impl Timeline { fn evict_layer_batch_impl( &self, - _layer_removal_cs: &tokio::sync::MutexGuard<'_, ()>, + _layer_removal_cs: &DeleteGuard, local_layer: &Arc, batch_updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerMapping, ) -> anyhow::Result { if local_layer.is_remote_layer() { // TODO(issue #3851): consider returning an err here instead of false, @@ -1325,7 +1261,10 @@ impl Timeline { assert_eq!(local_layer.layer_desc(), new_remote_layer.layer_desc()); - let succeed = match mapping.replace_and_verify(local_layer.clone(), new_remote_layer) { + let succeed = match self + .lcache + .replace_and_verify(local_layer.clone(), new_remote_layer) + { Ok(()) => { if let Err(e) = local_layer.delete_resident_layer_file() { error!("failed to remove layer file on evict after replacement: {e:#?}"); @@ -1494,6 +1433,7 @@ impl Timeline { tenant_id, pg_version, layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())), + lcache: LayerCache::new(myself.clone()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1529,7 +1469,6 @@ impl Timeline { layer_flush_done_tx, write_lock: tokio::sync::Mutex::new(()), - layer_removal_cs: Default::default(), gc_info: std::sync::RwLock::new(GcInfo { retain_lsns: Vec::new(), @@ -1685,7 +1624,7 @@ impl Timeline { /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1729,7 +1668,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; updates.insert_historic(layer.layer_desc().clone()); - mapping.insert(Arc::new(layer)); + self.lcache.populate_local_when_init(Arc::new(layer)); num_layers += 1; } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. @@ -1762,7 +1701,7 @@ impl Timeline { trace!("found layer {}", layer.path().display()); total_physical_size += file_size; updates.insert_historic(layer.layer_desc().clone()); - mapping.insert(Arc::new(layer)); + self.lcache.populate_local_when_init(Arc::new(layer)); num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these @@ -1817,7 +1756,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. let mut guard = self.layers.write().await; - let (layer_map, mapping) = &mut *guard; + let (layer_map, _) = &mut *guard; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1863,7 +1802,7 @@ impl Timeline { } else { self.metrics.resident_physical_size_gauge.sub(local_size); updates.remove_historic(local_layer.layer_desc().clone()); - mapping.remove(local_layer); + self.lcache.remove_local_when_init(local_layer); // fall-through to adding the remote layer } } else { @@ -1903,7 +1842,7 @@ impl Timeline { let remote_layer = Arc::new(remote_layer); updates.insert_historic(remote_layer.layer_desc().clone()); - mapping.insert(remote_layer); + self.lcache.populate_remote_when_init(remote_layer); } LayerFileName::Delta(deltafilename) => { // Create a RemoteLayer for the delta file. @@ -1931,7 +1870,7 @@ impl Timeline { ); let remote_layer = Arc::new(remote_layer); updates.insert_historic(remote_layer.layer_desc().clone()); - mapping.insert(remote_layer); + self.lcache.populate_remote_when_init(remote_layer); } } } @@ -1972,10 +1911,10 @@ impl Timeline { let local_layers = { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; layers .iter_historic_layers() - .map(|l| (l.filename(), mapping.get_from_desc(&l))) + .map(|l| (l.filename(), self.lcache.get_from_desc(&l))) .collect::>() }; @@ -2349,11 +2288,11 @@ impl Timeline { async fn find_layer(&self, layer_file_name: &str) -> Option> { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; for historic_layer in layers.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { - return Some(mapping.get_from_desc(&historic_layer)); + return Some(self.lcache.get_from_desc(&historic_layer)); } } @@ -2365,12 +2304,11 @@ impl Timeline { fn delete_historic_layer( &self, // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: Arc>, + _layer_removal_cs: DeleteGuard, layer: Arc, updates: &mut BatchedUpdates<'_>, - mapping: &mut LayerMapping, ) -> anyhow::Result<()> { - let layer = mapping.get_from_desc(&layer); + let layer = self.lcache.get_from_desc(&layer); if !layer.is_remote_layer() { layer.delete_resident_layer_file()?; let layer_file_size = layer.file_size(); @@ -2385,7 +2323,7 @@ impl Timeline { // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. updates.remove_historic(layer.layer_desc().clone()); - mapping.remove(layer); + self.lcache.delete_layer(layer); Ok(()) } @@ -2571,7 +2509,7 @@ impl Timeline { 'layer_map_search: loop { let remote_layer = { let guard = timeline.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2633,7 +2571,7 @@ impl Timeline { } if let Some(SearchResult { lsn_floor, layer }) = layers.search(key, cont_lsn) { - let layer = mapping.get_from_desc(&layer); + let layer = timeline.lcache.get_from_desc(&layer); // If it's a remote layer, download it and retry. if let Some(remote_layer) = super::storage_layer::downcast_remote_layer(&layer) @@ -3136,7 +3074,7 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -3144,7 +3082,7 @@ impl Timeline { LayerResidenceEventReason::LayerCreate, ); batch_updates.insert_historic(l.layer_desc().clone()); - mapping.insert(l); + self.lcache.create_new_layer(l); batch_updates.flush(); // update the timeline's physical size @@ -3374,7 +3312,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -3397,7 +3335,7 @@ impl Timeline { LayerResidenceEventReason::LayerCreate, ); updates.insert_historic(l.layer_desc().clone()); - mapping.insert(l); + self.lcache.create_new_layer(l); } updates.flush(); drop_wlock(guard); @@ -3439,12 +3377,12 @@ impl Timeline { /// start of level0 files compaction, the on-demand download should be revisited as well. async fn compact_level0_phase1( &self, - _layer_removal_cs: Arc>, + _layer_removal_cs: DeleteGuard, target_file_size: u64, ctx: &RequestContext, ) -> Result { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; let mut level0_deltas = layers.get_level0_deltas()?; // Only compact if enough layers have accumulated. @@ -3492,7 +3430,7 @@ impl Timeline { let remotes = deltas_to_compact .iter() - .map(|l| mapping.get_from_desc(l)) + .map(|l| self.lcache.get_from_desc(l)) .filter(|l| l.is_remote_layer()) .inspect(|l| info!("compact requires download of {}", l.filename().file_name())) .map(|l| { @@ -3504,7 +3442,7 @@ impl Timeline { let deltas_to_compact_layers = deltas_to_compact .iter() - .map(|l| mapping.get_from_desc(l)) + .map(|l| self.lcache.get_from_desc(l)) .collect_vec(); drop_rlock(guard); @@ -3785,7 +3723,7 @@ impl Timeline { /// async fn compact_level0( self: &Arc, - layer_removal_cs: Arc>, + layer_removal_cs: DeleteGuard, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { @@ -3813,7 +3751,7 @@ impl Timeline { } let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3846,7 +3784,7 @@ impl Timeline { LayerResidenceEventReason::LayerCreate, ); updates.insert_historic(x.layer_desc().clone()); - mapping.insert(x); + self.lcache.create_new_layer(x); } // Now that we have reshuffled the data to set of new delta layers, we can @@ -3854,7 +3792,7 @@ impl Timeline { let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); for l in deltas_to_compact { layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates, mapping)?; + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; } updates.flush(); drop_wlock(guard); @@ -3974,7 +3912,7 @@ impl Timeline { fail_point!("before-timeline-gc"); - let layer_removal_cs = Arc::new(self.layer_removal_cs.clone().lock_owned().await); + let layer_removal_cs = self.lcache.delete_guard().await; // Is the timeline being deleted? if self.is_stopping() { anyhow::bail!("timeline is Stopping"); @@ -4012,7 +3950,7 @@ impl Timeline { async fn gc_timeline( &self, - layer_removal_cs: Arc>, + layer_removal_cs: DeleteGuard, horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec, @@ -4074,7 +4012,7 @@ impl Timeline { // // TODO holding a write lock is too agressive and avoidable let mut guard = self.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4190,7 +4128,6 @@ impl Timeline { layer_removal_cs.clone(), doomed_layer, &mut updates, - mapping, )?; // FIXME: schedule succeeded deletions before returning? result.layers_removed += 1; } @@ -4376,13 +4313,13 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. let mut guard = self_clone.layers.write().await; - let (layers, mapping) = &mut *guard; + let (layers, _) = &mut *guard; let updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { let l: Arc = remote_layer.clone(); - let failure = match mapping.replace_and_verify(l, new_layer) { + let failure = match self_clone.lcache.replace_and_verify(l, new_layer) { Ok(()) => false, Err(e) => { // this is a precondition failure, the layer filename derived @@ -4511,10 +4448,10 @@ impl Timeline { let mut downloads = Vec::new(); { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; layers .iter_historic_layers() - .map(|l| mapping.get_from_desc(&l)) + .map(|l| self.lcache.get_from_desc(&l)) .filter_map(|l| l.downcast_remote_layer()) .map(|l| self.download_remote_layer(l)) .for_each(|dl| downloads.push(dl)) @@ -4616,7 +4553,7 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4625,7 +4562,7 @@ impl Timeline { let file_size = l.file_size(); max_layer_size = max_layer_size.map_or(Some(file_size), |m| Some(m.max(file_size))); - let l = mapping.get_from_desc(&l); + let l = self.lcache.get_from_desc(&l); if l.is_remote_layer() { continue; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 03cf2d89ad..89231a31ed 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -198,10 +198,10 @@ impl Timeline { // So, we just need to deal with this. let candidates: Vec> = { let guard = self.layers.read().await; - let (layers, mapping) = &*guard; + let (layers, _) = &*guard; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { - let hist_layer = mapping.get_from_desc(&hist_layer); + let hist_layer = self.lcache.get_from_desc(&hist_layer); if hist_layer.is_remote_layer() { continue; }