diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 259c4a8d79..abb3725c1b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -36,8 +36,8 @@ use crate::context::{
 use crate::tenant::storage_layer::delta_layer::DeltaEntry;
 use crate::tenant::storage_layer::{
     AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
-    LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
-    ValueReconstructState,
+    LayerAccessStatsReset, LayerFileName, PersistentLayerKey, ResidentLayer,
+    ValueReconstructResult, ValueReconstructState,
 };
 use crate::tenant::timeline::logical_size::CurrentLogicalSize;
 use crate::tenant::{
@@ -2744,6 +2744,7 @@ impl Timeline {
 struct CompactLevel0Phase1Result {
     new_layers: Vec<ResidentLayer>,
     deltas_to_compact: Vec<Layer>,
+    possibly_overlapping_overwritten: HashMap<PersistentLayerKey, ResidentLayer>,
 }
 
 /// Top-level failure to compact.
@@ -2949,6 +2950,7 @@ impl Timeline {
             return Ok(CompactLevel0Phase1Result {
                 new_layers,
                 deltas_to_compact: level0_deltas,
+                possibly_overlapping_overwritten: HashMap::new(),
             });
         }
     }
@@ -3015,6 +3017,37 @@ impl Timeline {
             .read_lock_held_spawn_blocking_startup_micros
             .till_now();
 
+        // before we start, because we can produce duplicate L1 layers, we must first collect all
+        // existing L1s overlapping with the LSN range, and make sure they are resident for the
+        // duration of the compaction. any duplicate Layers will end up touching the same
+        // filesystem file on eviction, and we do not want that to happen.
+        let resident_overlapping_l1s = {
+            let existing_l1s = guard
+                .layer_map()
+                .iter_historic_layers()
+                .filter(|x| !LayerMap::is_l0(x))
+                .filter(|x| {
+                    x.lsn_range.contains(&lsn_range.start)
+                        || x.lsn_range.contains(&(lsn_range.end + 1))
+                })
+                .map(|x| guard.get_from_desc(&x));
+
+            let mut resident_overlapping_l1s = HashMap::new();
+
+            // download all matching and keep them resident so that they cannot be downloaded over
+            // after we've compacted a new, possibly different version on top.
+            for l1 in existing_l1s {
+                let guard = l1.download_and_keep_resident().await?;
+                let existing = resident_overlapping_l1s.insert(guard.layer_desc().key(), guard);
+                assert!(
+                    existing.is_none(),
+                    "cannot have duplicate layers in existing L1s"
+                );
+            }
+
+            resident_overlapping_l1s
+        };
+
         // Determine N largest holes where N is number of compacted layers.
         let max_holes = deltas_to_compact.len();
         let last_record_lsn = self.get_last_record_lsn();
@@ -3305,6 +3338,7 @@ impl Timeline {
                 .into_iter()
                 .map(|x| x.drop_eviction_guard())
                 .collect::<Vec<_>>(),
+            possibly_overlapping_overwritten: resident_overlapping_l1s,
         })
     }
 
@@ -3321,6 +3355,7 @@ impl Timeline {
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
+            mut possibly_overlapping_overwritten,
         } = {
             let phase1_span = info_span!("compact_level0_phase1");
             let ctx = ctx.attached_child();
@@ -3370,16 +3405,19 @@ impl Timeline {
         // We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
         // now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
         let mut duplicated_layers = HashSet::new();
-
+        let mut duplicates = Vec::new();
         let mut insert_layers = Vec::new();
         for l in new_layers {
-            if let Some(remote_client) = &self.remote_client {
-                // upload even if duplicated, because we may have changed the contents
-                remote_client.schedule_layer_file_upload(l.clone())?;
-            }
             if guard.contains(l.as_ref()) {
-                duplicated_layers.insert(l.layer_desc().key());
+                // here we should find a possibly_overlapping_overwritten entry
+                let key = l.layer_desc().key();
+                let old = possibly_overlapping_overwritten
+                    .remove(&key)
+                    .context("should not have duplicated a layer we had not made resident first")?;
+                duplicated_layers.insert(key);
+                l.keep_resident_while(&old);
+                duplicates.push((old, l));
             } else if LayerMap::is_l0(l.layer_desc()) {
                 return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
             } else {
                 insert_layers.push(l);
@@ -3396,14 +3434,46 @@ impl Timeline {
         }
 
         // deletion will happen later, the layer file manager sets wanted_garbage_collected
-        guard.finish_compact_l0(&layer_removal_cs, remove_layers, &insert_layers)?;
+        guard.finish_compact_l0(
+            &layer_removal_cs,
+            remove_layers,
+            &insert_layers,
+            &duplicates,
+        )?;
+
+        if !possibly_overlapping_overwritten.is_empty() {
+            use std::fmt::Write as _;
+            // will do a better version of selection soon after a test run
+            let mut s = String::new();
+            let mut first = true;
+            write!(s, "[").unwrap();
+            for value in possibly_overlapping_overwritten.values() {
+                if !first {
+                    write!(s, ", ").unwrap();
+                }
+                first = false;
+                write!(s, "{value}").unwrap();
+            }
+            write!(s, "]").unwrap();
+
+            tracing::warn!(
+                count = possibly_overlapping_overwritten.len(),
+                layers = s,
+                "kept layers resident for no reason, selection needs to be better (and faster)"
+            );
+        }
 
         drop_wlock(guard);
 
         if let Some(rtc) = self.remote_client.as_ref() {
+            let (new, overwritten) = (insert_layers.len(), duplicates.len());
             for needs_upload in insert_layers {
                 rtc.schedule_layer_file_upload(needs_upload)?;
             }
+            for (_, needs_upload) in duplicates {
+                rtc.schedule_layer_file_upload(needs_upload)?;
+            }
+            tracing::info!(new, overwritten, "done scheduling all compaction uploads");
         }
 
         Ok(())
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index 7ded804ea0..594cfd7ebd 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -191,6 +191,7 @@ impl LayerManager {
         layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
         compact_from: Vec<Layer>,
         compact_to: &[ResidentLayer],
+        duplicates: &[(ResidentLayer, ResidentLayer)],
     ) -> Result<()> {
         let mut updates = self.layer_map.batch_update();
         for l in compact_to {
@@ -202,6 +203,9 @@ impl LayerManager {
             // time, even though we dropped `Timeline::layers` inbetween.
             Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr)?;
         }
+        for (old, new) in duplicates {
+            self.layer_fmgr.replace(old.as_ref(), new.as_ref().clone());
+        }
         updates.flush();
         Ok(())
     }
@@ -264,7 +268,7 @@
 
 pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
 
-impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
+impl<T: AsLayerDesc + Clone + PartialEq + std::fmt::Debug> LayerFileManager<T> {
     fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
         // The assumption for the `expect()` is that all code maintains the following invariant:
         // A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
@@ -299,4 +303,14 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
             )
         }
     }
+
+    pub(crate) fn replace(&mut self, old: &T, new: T) {
+        let key = old.layer_desc().key();
+        assert_eq!(key, new.layer_desc().key());
+
+        if let Some(existing) = self.0.get_mut(&key) {
+            assert_eq!(existing, old);
+            *existing = new;
+        }
+    }
 }
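
For context on the new `LayerFileManager::replace` method: it is a guarded in-place swap in the `HashMap` keyed by `PersistentLayerKey`. Below is a minimal, self-contained sketch of the same semantics, using hypothetical `Key`/`FakeLayer` stand-ins rather than the pageserver's real `PersistentLayerKey`/`Layer` types:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for PersistentLayerKey and Layer, used only to
// illustrate the replace-on-duplicate bookkeeping from the diff above.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Key(&'static str);

#[derive(Clone, Debug, PartialEq)]
struct FakeLayer {
    key: Key,
    // stands in for "same file name, possibly different contents"
    generation: u32,
}

struct FileManager(HashMap<Key, FakeLayer>);

impl FileManager {
    // Mirrors LayerFileManager::replace: the new layer must map to the same
    // key (same layer file name) as the old one, and the map must still hold
    // exactly the layer that was kept resident; only then swap in place.
    // Like the diff's draft version, a missing key is silently ignored.
    fn replace(&mut self, old: &FakeLayer, new: FakeLayer) {
        assert_eq!(old.key, new.key);

        if let Some(existing) = self.0.get_mut(&old.key) {
            assert_eq!(existing, old);
            *existing = new;
        }
    }
}

fn main() {
    let old = FakeLayer { key: Key("000000-0FFFFF__10-20"), generation: 1 };
    let mut fmgr = FileManager(HashMap::from([(old.key.clone(), old.clone())]));

    // Compaction re-produced a layer under the same name; swap it in so the
    // layer map and the file manager agree on the new instance.
    let new = FakeLayer { key: old.key.clone(), generation: 2 };
    fmgr.replace(&old, new);

    assert_eq!(fmgr.0[&old.key].generation, 2);
}
```

The two `assert_eq!` calls encode the invariants the compaction path relies on: a duplicated L1 keeps its key (its file name), and the manager still holds exactly the instance that was made resident before phase 1 started. This is also why the overlapping L1s are downloaded and kept resident up front: the old and new layers share one filesystem path, so the old layer must not be evicted or re-downloaded while the rewritten duplicate is being swapped in and uploaded.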