From cbd04f51406fa03060c65168c3daa1a4cd42d8ea Mon Sep 17 00:00:00 2001 From: Joonas Koivunen Date: Wed, 9 Aug 2023 14:35:56 +0300 Subject: [PATCH] remove_remote_layer: uninteresting refactorings (#4936) In the quest to solve #4745 by moving the download/evictedness to be internally mutable factor of a Layer and get rid of `trait PersistentLayer` at least for prod usage, `layer_removal_cs`, we present some misc cleanups. --------- Co-authored-by: Dmitry Rodionov --- pageserver/src/disk_usage_eviction_task.rs | 16 +-- pageserver/src/tenant/layer_map.rs | 11 +-- pageserver/src/tenant/storage_layer.rs | 18 ---- .../src/tenant/storage_layer/delta_layer.rs | 4 - .../src/tenant/storage_layer/image_layer.rs | 13 +-- pageserver/src/tenant/timeline.rs | 99 ++++++++++--------- .../src/tenant/timeline/layer_manager.rs | 11 +-- 7 files changed, 74 insertions(+), 98 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index b729b6a643..d5bdfc84b9 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -304,17 +304,18 @@ pub async fn disk_usage_eviction_task_iteration_impl( // Debug-log the list of candidates let now = SystemTime::now(); for (i, (partition, candidate)) in candidates.iter().enumerate() { + let desc = candidate.layer.layer_desc(); debug!( "cand {}/{}: size={}, no_access_for={}us, partition={:?}, {}/{}/{}", i + 1, candidates.len(), - candidate.layer.file_size(), + desc.file_size, now.duration_since(candidate.last_activity_ts) .unwrap() .as_micros(), partition, - candidate.layer.get_tenant_id(), - candidate.layer.get_timeline_id(), + desc.tenant_id, + desc.timeline_id, candidate.layer, ); } @@ -346,7 +347,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( warned = Some(usage_planned); } - usage_planned.add_available_bytes(candidate.layer.file_size()); + usage_planned.add_available_bytes(candidate.layer.layer_desc().file_size); batched .entry(TimelineKey(candidate.timeline)) @@ -389,15 +390,16 @@ pub async fn disk_usage_eviction_task_iteration_impl( Ok(results) => { assert_eq!(results.len(), batch.len()); for (result, layer) in results.into_iter().zip(batch.iter()) { + let file_size = layer.layer_desc().file_size; match result { Some(Ok(())) => { - usage_assumed.add_available_bytes(layer.file_size()); + usage_assumed.add_available_bytes(file_size); } Some(Err(EvictionError::CannotEvictRemoteLayer)) => { unreachable!("get_local_layers_for_disk_usage_eviction finds only local layers") } Some(Err(EvictionError::FileNotFound)) => { - evictions_failed.file_sizes += layer.file_size(); + evictions_failed.file_sizes += file_size; evictions_failed.count += 1; } Some(Err( @@ -406,7 +408,7 @@ pub async fn disk_usage_eviction_task_iteration_impl( )) => { let e = utils::error::report_compact_sources(&e); warn!(%layer, "failed to evict layer: {e}"); - evictions_failed.file_sizes += layer.file_size(); + evictions_failed.file_sizes += file_size; evictions_failed.count += 1; } None => { diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs index c4b894fcf5..59813b19d5 100644 --- a/pageserver/src/tenant/layer_map.rs +++ b/pageserver/src/tenant/layer_map.rs @@ -121,7 +121,7 @@ impl BatchedUpdates<'_> { /// /// This should be called when the corresponding file on disk has been deleted. 
/// - pub fn remove_historic(&mut self, layer_desc: PersistentLayerDesc) { + pub fn remove_historic(&mut self, layer_desc: &PersistentLayerDesc) { self.layer_map.remove_historic_noflush(layer_desc) } @@ -253,11 +253,11 @@ impl LayerMap { /// /// Helper function for BatchedUpdates::remove_historic /// - pub fn remove_historic_noflush(&mut self, layer_desc: PersistentLayerDesc) { + pub fn remove_historic_noflush(&mut self, layer_desc: &PersistentLayerDesc) { self.historic - .remove(historic_layer_coverage::LayerKey::from(&layer_desc)); + .remove(historic_layer_coverage::LayerKey::from(layer_desc)); let layer_key = layer_desc.key(); - if Self::is_l0(&layer_desc) { + if Self::is_l0(layer_desc) { let len_before = self.l0_delta_layers.len(); let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers); l0_delta_layers.retain(|other| other.key() != layer_key); @@ -766,8 +766,7 @@ mod tests { expected_in_counts ); - map.batch_update() - .remove_historic(downloaded.layer_desc().clone()); + map.batch_update().remove_historic(downloaded.layer_desc()); assert_eq!(count_layer_in(&map, downloaded.layer_desc()), (0, 0)); } diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index b4db7e2f08..951fa38d8d 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -401,16 +401,6 @@ pub trait AsLayerDesc { /// An image layer is a snapshot of all the data in a key-range, at a single /// LSN. pub trait PersistentLayer: Layer + AsLayerDesc { - /// Identify the tenant this layer belongs to - fn get_tenant_id(&self) -> TenantId { - self.layer_desc().tenant_id - } - - /// Identify the timeline this layer belongs to - fn get_timeline_id(&self) -> TimelineId { - self.layer_desc().timeline_id - } - /// File name used for this layer, both in the pageserver's local filesystem /// state as well as in the remote storage. fn filename(&self) -> LayerFileName { @@ -436,14 +426,6 @@ pub trait PersistentLayer: Layer + AsLayerDesc { false } - /// Returns None if the layer file size is not known. - /// - /// Should not change over the lifetime of the layer object because - /// current_physical_size is computed as the som of this value. - fn file_size(&self) -> u64 { - self.layer_desc().file_size - } - fn info(&self, reset: LayerAccessStatsReset) -> HistoricLayerInfo; fn access_stats(&self) -> &LayerAccessStats; diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index fd002ef398..7dc5cdd089 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -108,12 +108,10 @@ impl From<&DeltaLayer> for Summary { // Flag indicating that this version initialize the page const WILL_INIT: u64 = 1; -/// /// Struct representing reference to BLOB in layers. Reference contains BLOB /// offset, and for WAL records it also contains `will_init` flag. The flag /// helps to determine the range of records that needs to be applied, without /// reading/deserializing records themselves. -/// #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub struct BlobRef(pub u64); @@ -138,10 +136,8 @@ impl BlobRef { pub const DELTA_KEY_SIZE: usize = KEY_SIZE + 8; struct DeltaKey([u8; DELTA_KEY_SIZE]); -/// /// This is the key of the B-tree index stored in the delta layer. It consists /// of the serialized representation of a Key and LSN. 
-/// impl DeltaKey { fn from_slice(buf: &[u8]) -> Self { let mut bytes: [u8; DELTA_KEY_SIZE] = [0u8; DELTA_KEY_SIZE]; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 6c19d0a871..511af71210 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -323,15 +323,10 @@ impl ImageLayer { ) -> Result<&ImageLayerInner> { self.access_stats .record_access(access_kind, ctx.task_kind()); - loop { - if let Some(inner) = self.inner.get() { - return Ok(inner); - } - self.inner - .get_or_try_init(|| self.load_inner()) - .await - .with_context(|| format!("Failed to load image layer {}", self.path().display()))?; - } + self.inner + .get_or_try_init(|| self.load_inner()) + .await + .with_context(|| format!("Failed to load image layer {}", self.path().display())) } async fn load_inner(&self) -> Result { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0e715998ab..b2d29a1e6f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1160,7 +1160,7 @@ impl Timeline { return Err(EvictionError::CannotEvictRemoteLayer); } - let layer_file_size = local_layer.file_size(); + let layer_file_size = local_layer.layer_desc().file_size; let local_layer_mtime = local_layer .local_path() @@ -1590,7 +1590,6 @@ impl Timeline { /// pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { let mut guard = self.layers.write().await; - let mut num_layers = 0; let timer = self.metrics.load_layer_map_histo.start_timer(); @@ -1608,12 +1607,12 @@ impl Timeline { let fname = direntry.file_name(); let fname = fname.to_string_lossy(); - if let Some(imgfilename) = ImageFileName::parse_str(&fname) { + if let Some(filename) = ImageFileName::parse_str(&fname) { // create an ImageLayer struct for each image file. - if imgfilename.lsn > disk_consistent_lsn { + if filename.lsn > disk_consistent_lsn { info!( "found future image layer {} on timeline {} disk_consistent_lsn is {}", - imgfilename, self.timeline_id, disk_consistent_lsn + filename, self.timeline_id, disk_consistent_lsn ); rename_to_backup(&direntry_path)?; @@ -1621,31 +1620,31 @@ impl Timeline { } let file_size = direntry_path.metadata()?.len(); + let stats = + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); let layer = ImageLayer::new( self.conf, self.timeline_id, self.tenant_id, - &imgfilename, + &filename, file_size, - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident), + stats, ); - trace!("found layer {}", layer.path().display()); total_physical_size += file_size; loaded_layers.push(Arc::new(layer)); - num_layers += 1; - } else if let Some(deltafilename) = DeltaFileName::parse_str(&fname) { + } else if let Some(filename) = DeltaFileName::parse_str(&fname) { // Create a DeltaLayer struct for each delta file. // The end-LSN is exclusive, while disk_consistent_lsn is // inclusive. For example, if disk_consistent_lsn is 100, it is // OK for a delta layer to have end LSN 101, but if the end LSN // is 102, then it might not have been fully flushed to disk // before crash. 
- if deltafilename.lsn_range.end > disk_consistent_lsn + 1 { + if filename.lsn_range.end > disk_consistent_lsn + 1 { info!( "found future delta layer {} on timeline {} disk_consistent_lsn is {}", - deltafilename, self.timeline_id, disk_consistent_lsn + filename, self.timeline_id, disk_consistent_lsn ); rename_to_backup(&direntry_path)?; @@ -1653,20 +1652,20 @@ impl Timeline { } let file_size = direntry_path.metadata()?.len(); + let stats = + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident); let layer = DeltaLayer::new( self.conf, self.timeline_id, self.tenant_id, - &deltafilename, + &filename, file_size, - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Resident), + stats, ); - trace!("found layer {}", layer.path().display()); total_physical_size += file_size; loaded_layers.push(Arc::new(layer)); - num_layers += 1; } else if fname == METADATA_FILE_NAME || fname.ends_with(".old") { // ignore these } else if remote_timeline_client::is_temp_download_file(&direntry_path) { @@ -1691,6 +1690,7 @@ impl Timeline { } } + let num_layers = loaded_layers.len(); guard.initialize_local_layers(loaded_layers, Lsn(disk_consistent_lsn.0) + 1); info!( @@ -1791,13 +1791,15 @@ impl Timeline { ); continue; } + let stats = + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); let remote_layer = RemoteLayer::new_img( self.tenant_id, self.timeline_id, imgfilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted), + stats, ); let remote_layer = Arc::new(remote_layer); added_remote_layers.push(remote_layer); @@ -1816,12 +1818,15 @@ impl Timeline { ); continue; } + let stats = + LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted); + let remote_layer = RemoteLayer::new_delta( self.tenant_id, self.timeline_id, deltafilename, &remote_layer_metadata, - LayerAccessStats::for_loading_layer(&guard, LayerResidenceStatus::Evicted), + stats, ); let remote_layer = Arc::new(remote_layer); added_remote_layers.push(remote_layer); @@ -2269,15 +2274,16 @@ trait TraversalLayerExt { impl TraversalLayerExt for Arc { fn traversal_id(&self) -> TraversalId { + let timeline_id = self.layer_desc().timeline_id; match self.local_path() { Some(local_path) => { - debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", self.get_timeline_id())), + debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)), "need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary", ); format!("{}", local_path.display()) } None => { - format!("remote {}/{self}", self.get_timeline_id()) + format!("remote {}/{self}", timeline_id) } } } @@ -2813,7 +2819,10 @@ impl Timeline { // We will remove frozen layer and add delta layer in one atomic operation later. 
let layer = self.create_delta_layer(&frozen_layer).await?; ( - HashMap::from([(layer.filename(), LayerFileMetadata::new(layer.file_size()))]), + HashMap::from([( + layer.filename(), + LayerFileMetadata::new(layer.layer_desc().file_size), + )]), Some(layer), ) }; @@ -2833,7 +2842,7 @@ impl Timeline { ); // update metrics - let sz = l.file_size(); + let sz = l.layer_desc().file_size; self.metrics.resident_physical_size_gauge.add(sz); self.metrics.num_persistent_files_created.inc_by(1); self.metrics.persistent_bytes_written.inc_by(sz); @@ -3452,14 +3461,14 @@ impl Timeline { // "gaps" in the sequence of level 0 files should only happen in case // of a crash, partial download from cloud storage, or something like // that, so it's not a big deal in practice. - level0_deltas.sort_by_key(|l| l.get_lsn_range().start); + level0_deltas.sort_by_key(|l| l.layer_desc().lsn_range.start); let mut level0_deltas_iter = level0_deltas.iter(); let first_level0_delta = level0_deltas_iter.next().unwrap(); - let mut prev_lsn_end = first_level0_delta.get_lsn_range().end; + let mut prev_lsn_end = first_level0_delta.layer_desc().lsn_range.end; let mut deltas_to_compact = vec![Arc::clone(first_level0_delta)]; for l in level0_deltas_iter { - let lsn_range = l.get_lsn_range(); + let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; @@ -3468,8 +3477,13 @@ impl Timeline { prev_lsn_end = lsn_range.end; } let lsn_range = Range { - start: deltas_to_compact.first().unwrap().get_lsn_range().start, - end: deltas_to_compact.last().unwrap().get_lsn_range().end, + start: deltas_to_compact + .first() + .unwrap() + .layer_desc() + .lsn_range + .start, + end: deltas_to_compact.last().unwrap().layer_desc().lsn_range.end, }; let remotes = deltas_to_compact @@ -3521,33 +3535,22 @@ impl Timeline { let mut prev: Option = None; let mut all_value_refs = Vec::new(); + let mut all_keys = Vec::new(); + for l in deltas_to_compact.iter() { // TODO: replace this with an await once we fully go async - all_value_refs.extend( - Handle::current().block_on( - l.clone() - .downcast_delta_layer() - .expect("delta layer") - .load_val_refs(ctx), - )?, - ); + let delta = l.clone().downcast_delta_layer().expect("delta layer"); + Handle::current().block_on(async { + all_value_refs.extend(delta.load_val_refs(ctx).await?); + all_keys.extend(delta.load_keys(ctx).await?); + anyhow::Ok(()) + })?; } + // The current stdlib sorting implementation is designed in a way where it is // particularly fast where the slice is made up of sorted sub-ranges. all_value_refs.sort_by_key(|(key, lsn, _value_ref)| (*key, *lsn)); - let mut all_keys = Vec::new(); - for l in deltas_to_compact.iter() { - // TODO: replace this with an await once we fully go async - all_keys.extend( - Handle::current().block_on( - l.clone() - .downcast_delta_layer() - .expect("delta layer") - .load_keys(ctx), - )?, - ); - } // The current stdlib sorting implementation is designed in a way where it is // particularly fast where the slice is made up of sorted sub-ranges. 
all_keys.sort_by_key(|(key, lsn, _size)| (*key, *lsn)); @@ -4656,7 +4659,7 @@ impl std::fmt::Debug for LocalLayerInfoForDiskUsageEviction { impl LocalLayerInfoForDiskUsageEviction { pub fn file_size(&self) -> u64 { - self.layer.file_size() + self.layer.layer_desc().file_size } } diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index f6f0d533d1..824d869bec 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -120,10 +120,9 @@ impl LayerManager { ensure!( lsn > last_record_lsn, - "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})\n{}", + "cannot modify relation after advancing last_record_lsn (incoming_lsn={}, last_record_lsn={})", lsn, last_record_lsn, - std::backtrace::Backtrace::force_capture(), ); // Do we have a layer open for writing already? @@ -278,7 +277,7 @@ impl LayerManager { updates: &mut BatchedUpdates<'_>, mapping: &mut LayerFileManager, ) { - updates.remove_historic(layer.layer_desc().clone()); + updates.remove_historic(layer.layer_desc()); mapping.remove(layer); } @@ -292,10 +291,10 @@ impl LayerManager { metrics: &TimelineMetrics, mapping: &mut LayerFileManager, ) -> anyhow::Result<()> { + let desc = layer.layer_desc(); if !layer.is_remote_layer() { layer.delete_resident_layer_file()?; - let layer_file_size = layer.file_size(); - metrics.resident_physical_size_gauge.sub(layer_file_size); + metrics.resident_physical_size_gauge.sub(desc.file_size); } // TODO Removing from the bottom of the layer map is expensive. @@ -303,7 +302,7 @@ impl LayerManager { // won't be needed for page reconstruction for this timeline, // and mark what we can't delete yet as deleted from the layer // map index without actually rebuilding the index. - updates.remove_historic(layer.layer_desc().clone()); + updates.remove_historic(desc); mapping.remove(layer); Ok(())
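Taken together, the hunks above apply one pattern: call sites stop going through the `PersistentLayer` helper methods (`get_tenant_id`, `get_timeline_id`, `file_size`) and instead bind the `PersistentLayerDesc` returned by `layer_desc()` once, then read its fields directly. A minimal, self-contained sketch of that shape follows; the stand-in types and the `log_candidate` function are illustrative only, not the pageserver's real definitions.

// Simplified stand-ins; field names mirror the patch, the types are illustrative.
struct PersistentLayerDesc {
    tenant_id: u64,   // TenantId in the real code
    timeline_id: u64, // TimelineId in the real code
    file_size: u64,
}

// Mirrors the `AsLayerDesc` trait the patch relies on.
trait AsLayerDesc {
    fn layer_desc(&self) -> &PersistentLayerDesc;
}

struct DummyLayer {
    desc: PersistentLayerDesc,
}

impl AsLayerDesc for DummyLayer {
    fn layer_desc(&self) -> &PersistentLayerDesc {
        &self.desc
    }
}

// Before: layer.get_tenant_id(), layer.get_timeline_id(), layer.file_size().
// After: bind the descriptor once and read plain fields, as the eviction loop now does.
fn log_candidate(layer: &dyn AsLayerDesc) {
    let desc = layer.layer_desc();
    println!(
        "size={} tenant={} timeline={}",
        desc.file_size, desc.tenant_id, desc.timeline_id
    );
}

fn main() {
    let layer = DummyLayer {
        desc: PersistentLayerDesc {
            tenant_id: 1,
            timeline_id: 2,
            file_size: 4096,
        },
    };
    log_candidate(&layer);
}

Keeping tenant_id, timeline_id, and file_size on the shared descriptor means the eviction, flush, and compaction paths touched above no longer need per-trait accessors, which is what allows the patch to delete them from `trait PersistentLayer`.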