From 908ffc5fef60a60e6f2cf2a095f700516cbdeb85 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 10 May 2023 19:53:02 +0200 Subject: [PATCH] switch Timeline::layers to parking_lot::RwLock --- pageserver/src/tenant.rs | 4 +- pageserver/src/tenant/timeline.rs | 49 +++++++++---------- .../src/tenant/timeline/eviction_task.rs | 2 +- 3 files changed, 26 insertions(+), 29 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c04d91fe10..885f4ca17a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -560,7 +560,6 @@ impl Tenant { || timeline .layers .read() - .unwrap() .iter_historic_layers() .next() .is_some(), @@ -2502,8 +2501,7 @@ impl Tenant { ) { Ok(new_timeline) => { if init_layers { - new_timeline.layers.write().unwrap().next_open_layer_at = - Some(new_timeline.initdb_lsn); + new_timeline.layers.write().next_open_layer_at = Some(new_timeline.initdb_lsn); } debug!( "Successfully created initial files for timeline {tenant_id}/{new_timeline_id}" diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 90f2951aef..ae8acc6c97 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -121,7 +121,7 @@ pub struct Timeline { pub pg_version: u32, - pub(super) layers: RwLock>, + pub(super) layers: parking_lot::RwLock>, last_freeze_at: AtomicLsn, // Atomic would be more appropriate here. @@ -547,7 +547,7 @@ impl Timeline { /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + let layer_map = self.layers.read(); let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -847,7 +847,7 @@ impl Timeline { /// safekeepers to regard pageserver as caught up and suspend activity. 
pub fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); if let Some(open_layer) = &layers.open_layer { let open_layer_size = open_layer.size()?; drop(layers); @@ -927,7 +927,7 @@ impl Timeline { } pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + let layer_map = self.layers.read(); let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1047,7 +1047,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write(); let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1311,7 +1311,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: parking_lot::RwLock::new(LayerMap::default()), walredo_mgr, walreceiver, @@ -1452,7 +1452,7 @@ impl Timeline { /// Returns all timeline-related files that were found and loaded. /// pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1581,7 +1581,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. 
- let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write(); let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1734,7 +1734,6 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2060,7 +2059,7 @@ impl Timeline { } fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + for historic_layer in self.layers.read().iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2227,7 +2226,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read(); // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2407,7 +2406,7 @@ impl Timeline { /// Get a handle to the latest layer for appending. /// fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); ensure!(lsn.is_aligned()); @@ -2480,7 +2479,7 @@ impl Timeline { } else { Some(self.write_lock.lock().unwrap()) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? 
@@ -2518,7 +2517,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2619,7 +2618,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -2737,7 +2736,7 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -2792,7 +2791,7 @@ impl Timeline { fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); let mut max_deltas = 0; @@ -2935,7 +2934,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for l in image_layers { @@ -3002,7 +3001,7 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); let mut level0_deltas = layers.get_level0_deltas()?; drop(layers); @@ -3125,7 +3124,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. 
let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here? + let layers = self.layers.read(); // Isn't it better to hold original layers lock till here? let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? @@ -3362,7 +3361,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3621,7 +3620,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write(); 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -3904,7 +3903,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. 
- let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write(); let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4062,7 +4061,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4165,7 +4164,7 @@ impl LocalLayerInfoForDiskUsageEviction { impl Timeline { pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 523a5f8fa7..2f081c4f58 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -178,7 +178,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read(); let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() {