Handle duplicate L1s almost safely

There is still the possibility of the old layer not really being
readable, or of it returning bogus values if the internal representation
has shifted. We cannot do anything about that — unless we rename the old
layers out of the way, or something along those lines.
This commit is contained in:
Joonas Koivunen
2023-08-30 10:22:04 +03:00
parent 544136f13f
commit cd869e5737
2 changed files with 94 additions and 10 deletions

View File

@@ -36,8 +36,8 @@ use crate::context::{
use crate::tenant::storage_layer::delta_layer::DeltaEntry;
use crate::tenant::storage_layer::{
AsLayerDesc, DeltaLayerWriter, EvictionError, ImageLayerWriter, InMemoryLayer, Layer,
LayerAccessStatsReset, LayerFileName, ResidentLayer, ValueReconstructResult,
ValueReconstructState,
LayerAccessStatsReset, LayerFileName, PersistentLayerKey, ResidentLayer,
ValueReconstructResult, ValueReconstructState,
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
@@ -2744,6 +2744,7 @@ impl Timeline {
// Result of the first phase of L0 compaction: the freshly written L1 layers,
// the L0 deltas they replace, and any pre-existing L1s that were pinned
// resident for the duration of the compaction.
struct CompactLevel0Phase1Result {
// L1 layers produced by this compaction pass.
new_layers: Vec<ResidentLayer>,
// The L0 delta layers consumed as input; removed from the layer map in phase 2.
deltas_to_compact: Vec<Layer>,
// Pre-existing L1s overlapping the compacted LSN range, kept resident so a
// duplicate output layer cannot race with an eviction/re-download of the old
// file. Entries matched against a duplicated output are removed in phase 2;
// any leftovers were kept resident unnecessarily (see the warn! in phase 2).
possibly_overlapping_overwritten: HashMap<PersistentLayerKey, ResidentLayer>,
}
/// Top-level failure to compact.
@@ -2949,6 +2950,7 @@ impl Timeline {
return Ok(CompactLevel0Phase1Result {
new_layers,
deltas_to_compact: level0_deltas,
possibly_overlapping_overwritten: HashMap::new(),
});
}
}
@@ -3015,6 +3017,37 @@ impl Timeline {
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// before we start, because we can produce duplicate L1 layers, we must first collect all
// existing L1's overlapping with the LSN range, and make sure they are resident for the
// duration of the compaction. any duplicate Layers will end up touching the same
// filesystem file on eviction, and we do not want that to happen.
let resident_overlapping_l1s = {
let existing_l1s = guard
.layer_map()
.iter_historic_layers()
.filter(|x| !LayerMap::is_l0(x))
.filter(|x| {
x.lsn_range.contains(&lsn_range.start)
|| x.lsn_range.contains(&(lsn_range.end + 1))
})
.map(|x| guard.get_from_desc(&x));
let mut resident_overlapping_l1s = HashMap::new();
// download all matching and keep them resident so that they cannot be downloaded over
// after we've compacted a new possibly different version on top.
for l1 in existing_l1s {
let guard = l1.download_and_keep_resident().await?;
let existing = resident_overlapping_l1s.insert(guard.layer_desc().key(), guard);
assert!(
existing.is_none(),
"cannot have duplicate layers in existing L1s"
);
}
resident_overlapping_l1s
};
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
@@ -3305,6 +3338,7 @@ impl Timeline {
.into_iter()
.map(|x| x.drop_eviction_guard())
.collect::<Vec<_>>(),
possibly_overlapping_overwritten: resident_overlapping_l1s,
})
}
@@ -3321,6 +3355,7 @@ impl Timeline {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
mut possibly_overlapping_overwritten,
} = {
let phase1_span = info_span!("compact_level0_phase1");
let ctx = ctx.attached_child();
@@ -3370,16 +3405,19 @@ impl Timeline {
// We should move to numbering the layer files instead of naming them using key range / LSN some day. But for
// now, we just skip the file to avoid unintentional modification to files on the disk and in the layer map.
let mut duplicated_layers = HashSet::new();
let mut duplicates = Vec::new();
let mut insert_layers = Vec::new();
for l in new_layers {
if let Some(remote_client) = &self.remote_client {
// upload even if duplicated, because we may have changed the contents
remote_client.schedule_layer_file_upload(l.clone())?;
}
if guard.contains(l.as_ref()) {
duplicated_layers.insert(l.layer_desc().key());
// here we should find a possibly_overlapping_overwritten
let key = l.layer_desc().key();
let old = possibly_overlapping_overwritten
.remove(&key)
.context("should not have duplicated a layer we had not made resident first")?;
duplicated_layers.insert(key);
l.keep_resident_while(&old);
duplicates.push((old, l));
} else if LayerMap::is_l0(l.layer_desc()) {
return Err(CompactionError::Other(anyhow!("compaction generates a L0 layer file as output, which will cause infinite compaction.")));
} else {
@@ -3396,14 +3434,46 @@ impl Timeline {
// deletion will happen later, the layer file manager sets wanted_garbage_collected
guard.finish_compact_l0(&layer_removal_cs, remove_layers, &insert_layers)?;
guard.finish_compact_l0(
&layer_removal_cs,
remove_layers,
&insert_layers,
&duplicates,
)?;
if !possibly_overlapping_overwritten.is_empty() {
use std::fmt::Write as _;
// will do a better version of selection soon after a test run
let mut s = String::new();
let mut first = true;
write!(s, "[").unwrap();
for value in possibly_overlapping_overwritten.values() {
if !first {
write!(s, ", ").unwrap();
}
first = false;
write!(s, "{value}").unwrap();
}
write!(s, "]").unwrap();
tracing::warn!(
count = possibly_overlapping_overwritten.len(),
layers = s,
"kept layers resident for no reason, selection needs to be better (and faster)"
);
}
drop_wlock(guard);
if let Some(rtc) = self.remote_client.as_ref() {
let (new, overwritten) = (insert_layers.len(), duplicates.len());
for needs_upload in insert_layers {
rtc.schedule_layer_file_upload(needs_upload)?;
}
for (_, needs_upload) in duplicates {
rtc.schedule_layer_file_upload(needs_upload)?;
}
tracing::info!(new, overwritten, "done scheduling all compaction uploads");
}
Ok(())

View File

@@ -191,6 +191,7 @@ impl LayerManager {
layer_removal_cs: &Arc<tokio::sync::OwnedMutexGuard<()>>,
compact_from: Vec<Layer>,
compact_to: &[ResidentLayer],
duplicates: &[(ResidentLayer, ResidentLayer)],
) -> Result<()> {
let mut updates = self.layer_map.batch_update();
for l in compact_to {
@@ -202,6 +203,9 @@ impl LayerManager {
// time, even though we dropped `Timeline::layers` inbetween.
Self::delete_historic_layer(layer_removal_cs, l, &mut updates, &mut self.layer_fmgr)?;
}
for (old, new) in duplicates {
self.layer_fmgr.replace(old.as_ref(), new.as_ref().clone());
}
updates.flush();
Ok(())
}
@@ -264,7 +268,7 @@ impl LayerManager {
pub(crate) struct LayerFileManager<T>(HashMap<PersistentLayerKey, T>);
impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
impl<T: AsLayerDesc + Clone + PartialEq + std::fmt::Debug> LayerFileManager<T> {
fn get_from_desc(&self, desc: &PersistentLayerDesc) -> T {
// The assumption for the `expect()` is that all code maintains the following invariant:
// A layer's descriptor is present in the LayerMap => the LayerFileManager contains a layer for the descriptor.
@@ -299,4 +303,14 @@ impl<T: AsLayerDesc + Clone> LayerFileManager<T> {
)
}
}
/// Swap the entry stored under `old`'s key for `new`.
///
/// Panics if `old` and `new` do not share the same key, or if the stored
/// entry under that key is not `old`. If no entry exists for the key, this
/// is a silent no-op — NOTE(review): callers appear to assume the entry is
/// present; confirm whether the absent-key case should assert instead.
pub(crate) fn replace(&mut self, old: &T, new: T) {
let old_key = old.layer_desc().key();
let new_key = new.layer_desc().key();
assert_eq!(old_key, new_key);
let Some(slot) = self.0.get_mut(&old_key) else {
return;
};
assert_eq!(slot, old);
*slot = new;
}
}