diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 10f55df88e..6252fabd56 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -87,8 +87,8 @@ pub mod disk_btree; pub(crate) mod ephemeral_file; pub mod layer_cache; pub mod layer_map; -pub mod manifest; pub mod layer_map_mgr; +pub mod manifest; pub mod metadata; mod par_fsync; @@ -557,17 +557,10 @@ impl Tenant { .context("failed to reconcile with remote")? } + let layers = timeline.layer_mgr.read(); // Sanity check: a timeline should have some content. anyhow::ensure!( - ancestor.is_some() - || timeline - .layers - .read() - .await - .0 - .iter_historic_layers() - .next() - .is_some(), + ancestor.is_some() || layers.iter_historic_layers().next().is_some(), "Timeline has no ancestor and no layer files" ); diff --git a/pageserver/src/tenant/layer_map_mgr.rs b/pageserver/src/tenant/layer_map_mgr.rs index 1e165c55ab..b3a52fcf0f 100644 --- a/pageserver/src/tenant/layer_map_mgr.rs +++ b/pageserver/src/tenant/layer_map_mgr.rs @@ -63,6 +63,18 @@ impl LayerMapMgr { self.layer_map.store(Arc::new(new_state)); Ok(()) } + + /// Update the layer map. + pub fn update_sync(&self, operation: O) -> Result<()> + where + O: FnOnce(LayerMap) -> Result, + { + let state_lock = self.state_lock.blocking_lock(); + let state = self.clone_for_write(&state_lock); + let new_state = operation(state)?; + self.layer_map.store(Arc::new(new_state)); + Ok(()) + } } #[cfg(test)] diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 7bc513b3a1..431b9bc4ac 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -41,8 +41,6 @@ pub use inmemory_layer::InMemoryLayer; pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey}; pub use remote_layer::RemoteLayer; -use super::layer_map::BatchedUpdates; - pub fn range_overlaps(a: &Range, b: &Range) -> bool where T: PartialOrd, @@ -177,12 +175,10 @@ impl LayerAccessStats { /// /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. pub(crate) fn for_loading_layer( - layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, ) -> Self { let new = LayerAccessStats(Mutex::new(LayerAccessStatsLocked::default())); new.record_residence_event( - layer_map_lock_held_witness, status, LayerResidenceEventReason::LayerLoad, ); @@ -196,7 +192,6 @@ impl LayerAccessStats { /// See [`record_residence_event`] for why you need to do this while holding the layer map lock. pub(crate) fn clone_for_residence_change( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_>, new_status: LayerResidenceStatus, ) -> LayerAccessStats { let clone = { @@ -205,7 +200,6 @@ impl LayerAccessStats { }; let new = LayerAccessStats(Mutex::new(clone)); new.record_residence_event( - layer_map_lock_held_witness, new_status, LayerResidenceEventReason::ResidenceChange, ); @@ -228,7 +222,6 @@ impl LayerAccessStats { /// pub(crate) fn record_residence_event( &self, - _layer_map_lock_held_witness: &BatchedUpdates<'_>, status: LayerResidenceStatus, reason: LayerResidenceEventReason, ) { diff --git a/pageserver/src/tenant/storage_layer/remote_layer.rs b/pageserver/src/tenant/storage_layer/remote_layer.rs index 9d423ed815..d5fe220351 100644 --- a/pageserver/src/tenant/storage_layer/remote_layer.rs +++ b/pageserver/src/tenant/storage_layer/remote_layer.rs @@ -4,7 +4,6 @@ use crate::config::PageServerConf; use crate::context::RequestContext; use crate::repository::Key; -use crate::tenant::layer_map::BatchedUpdates; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use anyhow::{bail, Result}; @@ -220,7 +219,6 @@ impl RemoteLayer { /// Create a Layer struct representing this layer, after it has been downloaded. pub fn create_downloaded_layer( &self, - layer_map_lock_held_witness: &BatchedUpdates<'_>, conf: &'static PageServerConf, file_size: u64, ) -> Arc { @@ -232,10 +230,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } else { let fname = self.desc.image_file_name(); @@ -245,10 +241,8 @@ impl RemoteLayer { self.desc.tenant_id, &fname, file_size, - self.access_stats.clone_for_residence_change( - layer_map_lock_held_witness, - LayerResidenceStatus::Resident, - ), + self.access_stats + .clone_for_residence_change(LayerResidenceStatus::Resident), )) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7ba5d218c1..f46be51304 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -81,6 +81,7 @@ use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; use super::layer_cache::{DeleteGuard, LayerCache}; use super::layer_map::BatchedUpdates; +use super::layer_map_mgr::LayerMapMgr; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; use super::storage_layer::{ @@ -118,26 +119,6 @@ impl PartialOrd for Hole { } } -pub struct LayerMapping(()); - -impl LayerMapping { - pub(crate) fn new() -> Self { - Self(()) - } -} - -/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. -/// Can be removed after all refactors are done. -fn drop_rlock(rlock: tokio::sync::RwLockReadGuard<'_, T>) { - drop(rlock) -} - -/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things. -/// Can be removed after all refactors are done. -fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { - drop(rlock) -} - pub struct Timeline { pub(super) conf: &'static PageServerConf, tenant_conf: Arc>, @@ -149,8 +130,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: tokio::sync::RwLock<(LayerMap, LayerMapping)>, - + pub(super) layer_mgr: LayerMapMgr, pub(super) lcache: LayerCache, /// Set of key ranges which should be covered by image layers to @@ -617,8 +597,8 @@ impl Timeline { /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. pub async fn layer_size_sum(&self) -> u64 { - let guard = self.layers.read().await; - let (layer_map, _) = &*guard; + let _guard = self.lcache.layer_in_use_read(); + let layer_map = self.layer_mgr.read(); let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -928,8 +908,8 @@ impl Timeline { pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let open_layer_size = { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let _guard = self.lcache.layer_in_use_read(); + let layers = self.layer_mgr.read(); let Some(open_layer) = layers.open_layer.as_ref() else { return Ok(()); }; @@ -1060,8 +1040,8 @@ impl Timeline { } pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let guard = self.layers.read().await; - let (layer_map, _) = &*guard; + let _guard = self.lcache.layer_in_use_read(); + let layer_map = self.layer_mgr.read(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1181,25 +1161,34 @@ impl Timeline { ); } - // start the batch update - let mut guard = self.layers.write().await; - let (layer_map, _) = &mut *guard; - let mut batch_updates = layer_map.batch_update(); - let mut results = Vec::with_capacity(layers_to_evict.len()); - for l in layers_to_evict.iter() { - let res = if cancel.is_cancelled() { - None - } else { - Some(self.evict_layer_batch_impl(&layer_removal_guard, l, &mut batch_updates)) - }; - results.push(res); - } + // start the batch update + let guard = self.lcache.layer_in_use_write().await; + self.layer_mgr + .update(|mut layer_map| async { + let mut batch_updates = layer_map.batch_update(); - // commit the updates & release locks - batch_updates.flush(); - drop_wlock(guard); + for l in layers_to_evict.iter() { + let res = if cancel.is_cancelled() { + None + } else { + Some(self.evict_layer_batch_impl( + &layer_removal_guard, + l, + &mut batch_updates, + )) + }; + results.push(res); + } + + // commit the updates & release locks + batch_updates.flush(); + Ok(layer_map) + }) + .await?; + + drop(guard); drop(layer_removal_guard); assert_eq!(results.len(), layers_to_evict.len()); @@ -1246,7 +1235,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), LayerFileName::Delta(delta_name) => RemoteLayer::new_delta( self.tenant_id, @@ -1255,7 +1244,7 @@ impl Timeline { &layer_metadata, local_layer .access_stats() - .clone_for_residence_change(batch_updates, LayerResidenceStatus::Evicted), + .clone_for_residence_change(LayerResidenceStatus::Evicted), ), }); @@ -1432,7 +1421,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())), + layer_mgr: LayerMapMgr::new(LayerMap::default()), lcache: LayerCache::new(myself.clone()), wanted_image_layers: Mutex::new(None), @@ -1613,132 +1602,139 @@ impl Timeline { /// Initialize with an empty layer map. Used when creating a new timeline. /// pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) { - let mut layers = self.layers.try_write().expect( - "in the context where we call this function, no other task has access to the object", - ); - layers.0.next_open_layer_at = Some(Lsn(start_lsn.0)); + self.layer_mgr + .update_sync(|mut layers| { + layers.next_open_layer_at = Some(Lsn(start_lsn.0)); + Ok(layers) + }) + .unwrap(); } /// /// Scan the timeline directory to populate the layer map. /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - let mut updates = layers.batch_update(); - let mut num_layers = 0; + let guard = self.lcache.layer_in_use_write().await; + self.layer_mgr + .update(|mut layers| async move { + let mut updates = layers.batch_update(); + let mut num_layers = 0; - let timer = self.metrics.load_layer_map_histo.start_timer(); + let timer = self.metrics.load_layer_map_histo.start_timer(); - // Scan timeline directory and create ImageFileName and DeltaFilename - // structs representing all files on disk - let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); - // total size of layer files in the current timeline directory - let mut total_physical_size = 0; + // Scan timeline directory and create ImageFileName and DeltaFilename + // structs representing all files on disk + let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + // total size of layer files in the current timeline directory + let mut total_physical_size = 0; - for direntry in fs::read_dir(timeline_path)? { - let direntry = direntry?; - let direntry_path = direntry.path(); - let fname = direntry.file_name(); - let fname = fname.to_string_lossy(); + for direntry in fs::read_dir(timeline_path)? { + let direntry = direntry?; + let direntry_path = direntry.path(); + let fname = direntry.file_name(); + let fname = fname.to_string_lossy(); - if let Some(imgfilename) = ImageFileName::parse_str(&fname) { - // create an ImageLayer struct for each image file. - if imgfilename.lsn > disk_consistent_lsn { - warn!( + if let Some(imgfilename) = ImageFileName::parse_str(&fname) { + // create an ImageLayer struct for each image file. + if imgfilename.lsn > disk_consistent_lsn { + warn!( "found future image layer {} on timeline {} disk_consistent_lsn is {}", imgfilename, self.timeline_id, disk_consistent_lsn ); - rename_to_backup(&direntry_path)?; - continue; - } + rename_to_backup(&direntry_path)?; + continue; + } - let file_size = direntry_path.metadata()?.len(); + let file_size = direntry_path.metadata()?.len(); - let layer = ImageLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &imgfilename, - file_size, - LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), - ); + let layer = ImageLayer::new( + self.conf, + self.timeline_id, + self.tenant_id, + &imgfilename, + file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + ); - trace!("found layer {}", layer.path().display()); - total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone()); - self.lcache.populate_local_when_init(Arc::new(layer)); - num_layers += 1; - } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { - // Create a DeltaLayer struct for each delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if deltafilename.lsn_range.end > disk_consistent_lsn + 1 { - warn!( + trace!("found layer {}", layer.path().display()); + total_physical_size += file_size; + updates.insert_historic(layer.layer_desc().clone()); + self.lcache.populate_local_when_init(Arc::new(layer)); + num_layers += 1; + } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { + // Create a DeltaLayer struct for each delta file. + // The end-LSN is exclusive, while disk_consistent_lsn is + // inclusive. For example, if disk_consistent_lsn is 100, it is + // OK for a delta layer to have end LSN 101, but if the end LSN + // is 102, then it might not have been fully flushed to disk + // before crash. + if deltafilename.lsn_range.end > disk_consistent_lsn + 1 { + warn!( "found future delta layer {} on timeline {} disk_consistent_lsn is {}", deltafilename, self.timeline_id, disk_consistent_lsn ); - rename_to_backup(&direntry_path)?; - continue; - } + rename_to_backup(&direntry_path)?; + continue; + } - let file_size = direntry_path.metadata()?.len(); + let file_size = direntry_path.metadata()?.len(); - let layer = DeltaLayer::new( - self.conf, - self.timeline_id, - self.tenant_id, - &deltafilename, - file_size, - LayerAccessStats::for_loading_layer(&updates, LayerResidenceStatus::Resident), - ); + let layer = DeltaLayer::new( + self.conf, + self.timeline_id, + self.tenant_id, + &deltafilename, + file_size, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Resident), + ); - trace!("found layer {}", layer.path().display()); - total_physical_size += file_size; - updates.insert_historic(layer.layer_desc().clone()); - self.lcache.populate_local_when_init(Arc::new(layer)); - num_layers += 1; - } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { - // ignore these - } else if remote_timeline_client::is_temp_download_file(&direntry_path) { - info!( + trace!("found layer {}", layer.path().display()); + total_physical_size += file_size; + updates.insert_historic(layer.layer_desc().clone()); + self.lcache.populate_local_when_init(Arc::new(layer)); + num_layers += 1; + } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { + // ignore these + } else if remote_timeline_client::is_temp_download_file(&direntry_path) { + info!( "skipping temp download file, reconcile_with_remote will resume / clean up: {}", fname ); - } else if is_ephemeral_file(&fname) { - // Delete any old ephemeral files - trace!("deleting old ephemeral file in timeline dir: {}", fname); - fs::remove_file(&direntry_path)?; - } else if is_temporary(&direntry_path) { - info!("removing temp timeline file at {}", direntry_path.display()); - fs::remove_file(&direntry_path).with_context(|| { - format!( - "failed to remove temp download file at {}", - direntry_path.display() - ) - })?; - } else { - warn!("unrecognized filename in timeline dir: {}", fname); - } - } + } else if is_ephemeral_file(&fname) { + // Delete any old ephemeral files + trace!("deleting old ephemeral file in timeline dir: {}", fname); + fs::remove_file(&direntry_path)?; + } else if is_temporary(&direntry_path) { + info!("removing temp timeline file at {}", direntry_path.display()); + fs::remove_file(&direntry_path).with_context(|| { + format!( + "failed to remove temp download file at {}", + direntry_path.display() + ) + })?; + } else { + warn!("unrecognized filename in timeline dir: {}", fname); + } + } - updates.flush(); - layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1); + updates.flush(); + layers.next_open_layer_at = Some(Lsn(disk_consistent_lsn.0) + 1); - info!( - "loaded layer map with {} layers at {}, total physical size: {}", - num_layers, disk_consistent_lsn, total_physical_size - ); - self.metrics - .resident_physical_size_gauge - .set(total_physical_size); + info!( + "loaded layer map with {} layers at {}, total physical size: {}", + num_layers, disk_consistent_lsn, total_physical_size + ); + self.metrics + .resident_physical_size_gauge + .set(total_physical_size); - timer.stop_and_record(); + timer.stop_and_record(); + + Ok(layers) + }) + .await?; Ok(()) } @@ -1755,127 +1751,127 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut guard = self.layers.write().await; - let (layer_map, _) = &mut *guard; - let mut updates = layer_map.batch_update(); - for remote_layer_name in &index_part.timeline_layers { - let local_layer = local_only_layers.remove(remote_layer_name); + let guard = self.lcache.layer_in_use_write().await; - let remote_layer_metadata = index_part - .layer_metadata - .get(remote_layer_name) - .map(LayerFileMetadata::from) - .with_context(|| { - format!( - "No remote layer metadata found for layer {}", - remote_layer_name.file_name() - ) - })?; + let operation = |mut layer_map: LayerMap| { + let mut updates = layer_map.batch_update(); + for remote_layer_name in &index_part.timeline_layers { + let local_layer = local_only_layers.remove(remote_layer_name); - // Is the local layer's size different from the size stored in the - // remote index file? - // If so, rename_to_backup those files & replace their local layer with - // a RemoteLayer in the layer map so that we re-download them on-demand. - if let Some(local_layer) = local_layer { - let local_layer_path = local_layer - .local_path() - .expect("caller must ensure that local_layers only contains local layers"); - ensure!( - local_layer_path.exists(), - "every layer from local_layers must exist on disk: {}", - local_layer_path.display() - ); + let remote_layer_metadata = index_part + .layer_metadata + .get(remote_layer_name) + .map(LayerFileMetadata::from) + .with_context(|| { + format!( + "No remote layer metadata found for layer {}", + remote_layer_name.file_name() + ) + })?; - let remote_size = remote_layer_metadata.file_size(); - let metadata = local_layer_path.metadata().with_context(|| { - format!( - "get file size of local layer {}", + // Is the local layer's size different from the size stored in the + // remote index file? + // If so, rename_to_backup those files & replace their local layer with + // a RemoteLayer in the layer map so that we re-download them on-demand. + if let Some(local_layer) = local_layer { + let local_layer_path = local_layer + .local_path() + .expect("caller must ensure that local_layers only contains local layers"); + ensure!( + local_layer_path.exists(), + "every layer from local_layers must exist on disk: {}", local_layer_path.display() - ) - })?; - let local_size = metadata.len(); - if local_size != remote_size { - warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); - if let Err(err) = rename_to_backup(&local_layer_path) { - assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display()); - anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); + ); + + let remote_size = remote_layer_metadata.file_size(); + let metadata = local_layer_path.metadata().with_context(|| { + format!( + "get file size of local layer {}", + local_layer_path.display() + ) + })?; + let local_size = metadata.len(); + if local_size != remote_size { + warn!("removing local file {local_layer_path:?} because it has unexpected length {local_size}; length in remote index is {remote_size}"); + if let Err(err) = rename_to_backup(&local_layer_path) { + assert!(local_layer_path.exists(), "we would leave the local_layer without a file if this does not hold: {}", local_layer_path.display()); + anyhow::bail!("could not rename file {local_layer_path:?}: {err:?}"); + } else { + self.metrics.resident_physical_size_gauge.sub(local_size); + updates.remove_historic(local_layer.layer_desc().clone()); + self.lcache.remove_local_when_init(local_layer); + // fall-through to adding the remote layer + } } else { - self.metrics.resident_physical_size_gauge.sub(local_size); - updates.remove_historic(local_layer.layer_desc().clone()); - self.lcache.remove_local_when_init(local_layer); - // fall-through to adding the remote layer - } - } else { - debug!( - "layer is present locally and file size matches remote, using it: {}", - local_layer_path.display() - ); - continue; - } - } - - info!( - "remote layer does not exist locally, creating remote layer: {}", - remote_layer_name.file_name() - ); - - match remote_layer_name { - LayerFileName::Image(imgfilename) => { - if imgfilename.lsn > up_to_date_disk_consistent_lsn { - warn!( - "found future image layer {} on timeline {} remote_consistent_lsn is {}", - imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn - ); - continue; - } - - let remote_layer = RemoteLayer::new_img( - self.tenant_id, - self.timeline_id, - imgfilename, - &remote_layer_metadata, - LayerAccessStats::for_loading_layer( - &updates, - LayerResidenceStatus::Evicted, - ), - ); - let remote_layer = Arc::new(remote_layer); - - updates.insert_historic(remote_layer.layer_desc().clone()); - self.lcache.populate_remote_when_init(remote_layer); - } - LayerFileName::Delta(deltafilename) => { - // Create a RemoteLayer for the delta file. - // The end-LSN is exclusive, while disk_consistent_lsn is - // inclusive. For example, if disk_consistent_lsn is 100, it is - // OK for a delta layer to have end LSN 101, but if the end LSN - // is 102, then it might not have been fully flushed to disk - // before crash. - if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 { - warn!( - "found future delta layer {} on timeline {} remote_consistent_lsn is {}", - deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn + debug!( + "layer is present locally and file size matches remote, using it: {}", + local_layer_path.display() ); continue; } - let remote_layer = RemoteLayer::new_delta( - self.tenant_id, - self.timeline_id, - deltafilename, - &remote_layer_metadata, - LayerAccessStats::for_loading_layer( - &updates, - LayerResidenceStatus::Evicted, - ), - ); - let remote_layer = Arc::new(remote_layer); - updates.insert_historic(remote_layer.layer_desc().clone()); - self.lcache.populate_remote_when_init(remote_layer); + } + + info!( + "remote layer does not exist locally, creating remote layer: {}", + remote_layer_name.file_name() + ); + + match remote_layer_name { + LayerFileName::Image(imgfilename) => { + if imgfilename.lsn > up_to_date_disk_consistent_lsn { + warn!( + "found future image layer {} on timeline {} remote_consistent_lsn is {}", + imgfilename, self.timeline_id, up_to_date_disk_consistent_lsn + ); + continue; + } + + let remote_layer = RemoteLayer::new_img( + self.tenant_id, + self.timeline_id, + imgfilename, + &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), + ); + let remote_layer = Arc::new(remote_layer); + + updates.insert_historic(remote_layer.layer_desc().clone()); + self.lcache.populate_remote_when_init(remote_layer); + } + LayerFileName::Delta(deltafilename) => { + // Create a RemoteLayer for the delta file. + // The end-LSN is exclusive, while disk_consistent_lsn is + // inclusive. For example, if disk_consistent_lsn is 100, it is + // OK for a delta layer to have end LSN 101, but if the end LSN + // is 102, then it might not have been fully flushed to disk + // before crash. + if deltafilename.lsn_range.end > up_to_date_disk_consistent_lsn + 1 { + warn!( + "found future delta layer {} on timeline {} remote_consistent_lsn is {}", + deltafilename, self.timeline_id, up_to_date_disk_consistent_lsn + ); + continue; + } + let remote_layer = RemoteLayer::new_delta( + self.tenant_id, + self.timeline_id, + deltafilename, + &remote_layer_metadata, + LayerAccessStats::for_loading_layer(LayerResidenceStatus::Evicted), + ); + let remote_layer = Arc::new(remote_layer); + updates.insert_historic(remote_layer.layer_desc().clone()); + self.lcache.populate_remote_when_init(remote_layer); + } } } - } - updates.flush(); + updates.flush(); + Ok(layer_map) + }; + + self.layer_mgr.update_sync(operation)?; + Ok(local_only_layers) } @@ -1910,8 +1906,8 @@ impl Timeline { let disk_consistent_lsn = up_to_date_metadata.disk_consistent_lsn(); let local_layers = { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); layers .iter_historic_layers() .map(|l| (l.filename(), self.lcache.get_from_desc(&l))) @@ -2287,8 +2283,8 @@ impl Timeline { } async fn find_layer(&self, layer_file_name: &str) -> Option> { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); for historic_layer in layers.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { @@ -2508,8 +2504,8 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let guard = timeline.layers.read().await; - let (layers, _) = &*guard; + let guard = timeline.lcache.layer_in_use_read().await; + let layers = timeline.layer_mgr.read(); // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2693,8 +2689,8 @@ impl Timeline { /// Get a handle to the latest layer for appending. /// async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; + let guard = self.lcache.layer_in_use_write().await; + let layers = self.layer_mgr.read(); ensure!(lsn.is_aligned()); @@ -2735,8 +2731,11 @@ impl Timeline { InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?; let layer_rc = Arc::new(new_layer); - layers.open_layer = Some(Arc::clone(&layer_rc)); - layers.next_open_layer_at = None; + self.layer_mgr.update_sync(|mut layers| { + layers.open_layer = Some(Arc::clone(&layer_rc)); + layers.next_open_layer_at = None; + Ok(layers) + })?; layer = layer_rc; } @@ -2771,22 +2770,29 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; + let mut _guard = self.lcache.layer_in_use_write().await; + let layers = self.layer_mgr.read(); + if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? let end_lsn = Lsn(self.get_last_record_lsn().0 + 1); open_layer.freeze(end_lsn); - // The layer is no longer open, update the layer map to reflect this. - // We will replace it with on-disk historics below. - layers.frozen_layers.push_back(open_layer_rc); - layers.open_layer = None; - layers.next_open_layer_at = Some(end_lsn); + self.layer_mgr + .update(|mut layers| async move { + // The layer is no longer open, update the layer map to reflect this. + // We will replace it with on-disk historics below. + layers.frozen_layers.push_back(open_layer_rc); + layers.open_layer = None; + layers.next_open_layer_at = Some(end_lsn); + Ok(layers) + }) + .await + .unwrap(); + self.last_freeze_at.store(end_lsn); } - drop_wlock(guard); } /// Layer flusher task's main loop. @@ -2810,8 +2816,8 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2936,14 +2942,19 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().await; - let l = layers.0.frozen_layers.pop_front(); + let _guard = self.lcache.layer_in_use_write().await; + self.layer_mgr + .update_sync(|mut layers| { + let l = layers.frozen_layers.pop_front(); - // Only one thread may call this function at a time (for this - // timeline). If two threads tried to flush the same frozen - // layer to disk at the same time, that would not work. - assert!(layer_map::compare_arced_layers(&l.unwrap(), &frozen_layer)); + // Only one thread may call this function at a time (for this + // timeline). If two threads tried to flush the same frozen + // layer to disk at the same time, that would not work. + assert!(layer_map::compare_arced_layers(&l.unwrap(), &frozen_layer)); + Ok(layers) + }) + .unwrap(); // release lock on 'layers' } @@ -3073,17 +3084,18 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - let mut batch_updates = layers.batch_update(); - l.access_stats().record_residence_event( - &batch_updates, - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - batch_updates.insert_historic(l.layer_desc().clone()); - self.lcache.create_new_layer(l); - batch_updates.flush(); + let guard = self.lcache.layer_in_use_write().await; + self.layer_mgr.update_sync(|mut layers| { + let mut batch_updates = layers.batch_update(); + l.access_stats().record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + batch_updates.insert_historic(l.layer_desc().clone()); + self.lcache.create_new_layer(l); + batch_updates.flush(); + Ok(layers) + })?; // update the timeline's physical size self.metrics.resident_physical_size_gauge.add(sz); @@ -3132,8 +3144,8 @@ impl Timeline { ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); let mut max_deltas = 0; { @@ -3311,34 +3323,37 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - let mut updates = layers.batch_update(); - let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); + let guard = self.lcache.layer_in_use_write().await; + self.layer_mgr.update_sync(|mut layers| { + let mut updates = layers.batch_update(); + let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); - for l in image_layers { - let path = l.filename(); - let metadata = timeline_path - .join(path.file_name()) - .metadata() - .with_context(|| format!("reading metadata of layer file {}", path.file_name()))?; + for l in image_layers { + let path = l.filename(); + let metadata = timeline_path + .join(path.file_name()) + .metadata() + .with_context(|| { + format!("reading metadata of layer file {}", path.file_name()) + })?; - layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len())); + layer_paths_to_upload.insert(path, LayerFileMetadata::new(metadata.len())); - self.metrics - .resident_physical_size_gauge - .add(metadata.len()); - let l = Arc::new(l); - l.access_stats().record_residence_event( - &updates, - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - updates.insert_historic(l.layer_desc().clone()); - self.lcache.create_new_layer(l); - } - updates.flush(); - drop_wlock(guard); + self.metrics + .resident_physical_size_gauge + .add(metadata.len()); + let l = Arc::new(l); + l.access_stats().record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + updates.insert_historic(l.layer_desc().clone()); + self.lcache.create_new_layer(l); + } + updates.flush(); + Ok(layers) + })?; + drop(guard); timer.stop_and_record(); Ok(layer_paths_to_upload) @@ -3381,8 +3396,8 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); let mut level0_deltas = layers.get_level0_deltas()?; // Only compact if enough layers have accumulated. @@ -3445,7 +3460,7 @@ impl Timeline { .map(|l| self.lcache.get_from_desc(l)) .collect_vec(); - drop_rlock(guard); + drop(guard); if !remotes.is_empty() { // caller is holding the lock to layer_removal_cs, and we don't want to download while @@ -3512,8 +3527,8 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let guard = self.layers.read().await; // Is'n it better to hold original layers lock till here? - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; // Is'n it better to hold original layers lock till here? + let layers = self.layer_mgr.read(); let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3546,7 +3561,7 @@ impl Timeline { } prev = Some(next_key.next()); } - drop_rlock(guard); + drop(guard); let mut holes = heap.into_vec(); holes.sort_unstable_by_key(|hole| hole.key_range.start); let mut next_hole = 0; // index of next hole in holes vector @@ -3750,52 +3765,53 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; - let mut updates = layers.batch_update(); - let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); - for l in new_layers { - let new_delta_path = l.path(); + let guard = self.lcache.layer_in_use_write().await; + let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); + self.layer_mgr.update_sync(|mut layers| { + let mut updates = layers.batch_update(); + let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); + for l in new_layers { + let new_delta_path = l.path(); - let metadata = new_delta_path.metadata().with_context(|| { - format!( - "read file metadata for new created layer {}", - new_delta_path.display() - ) - })?; + let metadata = new_delta_path.metadata().with_context(|| { + format!( + "read file metadata for new created layer {}", + new_delta_path.display() + ) + })?; - if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_upload( - &l.filename(), - &LayerFileMetadata::new(metadata.len()), - )?; + if let Some(remote_client) = &self.remote_client { + remote_client.schedule_layer_file_upload( + &l.filename(), + &LayerFileMetadata::new(metadata.len()), + )?; + } + + // update the timeline's physical size + self.metrics + .resident_physical_size_gauge + .add(metadata.len()); + + new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); + let x: Arc = Arc::new(l); + x.access_stats().record_residence_event( + LayerResidenceStatus::Resident, + LayerResidenceEventReason::LayerCreate, + ); + updates.insert_historic(x.layer_desc().clone()); + self.lcache.create_new_layer(x); } - // update the timeline's physical size - self.metrics - .resident_physical_size_gauge - .add(metadata.len()); - - new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len())); - let x: Arc = Arc::new(l); - x.access_stats().record_residence_event( - &updates, - LayerResidenceStatus::Resident, - LayerResidenceEventReason::LayerCreate, - ); - updates.insert_historic(x.layer_desc().clone()); - self.lcache.create_new_layer(x); - } - - // Now that we have reshuffled the data to set of new delta layers, we can - // delete the old ones - let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len()); - for l in deltas_to_compact { - layer_names_to_delete.push(l.filename()); - self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; - } - updates.flush(); - drop_wlock(guard); + // Now that we have reshuffled the data to set of new delta layers, we can + // delete the old ones + for l in deltas_to_compact { + layer_names_to_delete.push(l.filename()); + self.delete_historic_layer(layer_removal_cs.clone(), l, &mut updates)?; + } + updates.flush(); + Ok(layers) + })?; + drop(guard); // Also schedule the deletions in remote storage if let Some(remote_client) = &self.remote_client { @@ -4011,8 +4027,8 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut guard = self.layers.write().await; - let (layers, _) = &mut *guard; + let guard = self.lcache.layer_in_use_write().await; + let layers = self.layer_mgr.read(); 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4111,37 +4127,40 @@ impl Timeline { .unwrap() .replace((new_gc_cutoff, wanted_image_layers.to_keyspace())); - let mut updates = layers.batch_update(); - if !layers_to_remove.is_empty() { - // Persist the new GC cutoff value in the metadata file, before - // we actually remove anything. - self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?; + self.layer_mgr.update_sync(|mut layers| { + let mut updates = layers.batch_update(); + if !layers_to_remove.is_empty() { + // Persist the new GC cutoff value in the metadata file, before + // we actually remove anything. + self.update_metadata_file(self.disk_consistent_lsn.load(), HashMap::new())?; - // Actually delete the layers from disk and remove them from the map. - // (couldn't do this in the loop above, because you cannot modify a collection - // while iterating it. BTreeMap::retain() would be another option) - let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len()); - { - for doomed_layer in layers_to_remove { - layer_names_to_delete.push(doomed_layer.filename()); - self.delete_historic_layer( - layer_removal_cs.clone(), - doomed_layer, - &mut updates, - )?; // FIXME: schedule succeeded deletions before returning? - result.layers_removed += 1; + // Actually delete the layers from disk and remove them from the map. + // (couldn't do this in the loop above, because you cannot modify a collection + // while iterating it. BTreeMap::retain() would be another option) + let mut layer_names_to_delete = Vec::with_capacity(layers_to_remove.len()); + { + for doomed_layer in layers_to_remove { + layer_names_to_delete.push(doomed_layer.filename()); + self.delete_historic_layer( + layer_removal_cs.clone(), + doomed_layer, + &mut updates, + )?; // FIXME: schedule succeeded deletions before returning? + result.layers_removed += 1; + } + } + + if result.layers_removed != 0 { + fail_point!("after-timeline-gc-removed-layers"); + } + + if let Some(remote_client) = &self.remote_client { + remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; } } - - if result.layers_removed != 0 { - fail_point!("after-timeline-gc-removed-layers"); - } - - if let Some(remote_client) = &self.remote_client { - remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?; - } - } - updates.flush(); + updates.flush(); + Ok(layers) + })?; info!( "GC completed removing {} layers, cutoff {}", @@ -4312,11 +4331,8 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut guard = self_clone.layers.write().await; - let (layers, _) = &mut *guard; - let updates = layers.batch_update(); - let new_layer = - remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); + let guard = self_clone.lcache.layer_in_use_write().await; + let new_layer = remote_layer.create_downloaded_layer(self_clone.conf, *size); { let l: Arc = remote_layer.clone(); let failure = match self_clone.lcache.replace_and_verify(l, new_layer) { @@ -4347,8 +4363,7 @@ impl Timeline { .store(true, Relaxed); } } - updates.flush(); - drop_wlock(guard); + drop(guard); info!("on-demand download successful"); @@ -4447,8 +4462,8 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); layers .iter_historic_layers() .map(|l| self.lcache.get_from_desc(&l)) @@ -4552,8 +4567,8 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 89231a31ed..62c00e1518 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -197,8 +197,8 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let guard = self.layers.read().await; - let (layers, _) = &*guard; + let guard = self.lcache.layer_in_use_read().await; + let layers = self.layer_mgr.read(); let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { let hist_layer = self.lcache.get_from_desc(&hist_layer);