Prevent compaction from running at same time as GC.

For same reasons as we prohibited concurrent checkpointing and GC
previosly.
This commit is contained in:
Heikki Linnakangas
2022-03-14 14:22:04 +02:00
parent 09f2dff537
commit 89690d7349

View File

@@ -806,11 +806,11 @@ pub struct LayeredTimeline {
/// Used to ensure that there is only one thread
layer_flush_lock: Mutex<()>,
// Prevent concurrent checkpoints.
// Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin
// (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread
// and could be triggered at the same time as a normal checkpoint.
checkpoint_cs: Mutex<()>,
// Prevent concurrent compactions.
// Compactions are normally performed by one thread. But compaction can also be manually
// requested by admin (that's used in tests). These forced compactions run in a different
// thread and could be triggered at the same time as a normal, timed compaction.
compaction_cs: Mutex<()>,
// Needed to ensure that we can't create a branch at a point that was already garbage collected
latest_gc_cutoff_lsn: RwLock<Lsn>,
@@ -1021,7 +1021,7 @@ impl LayeredTimeline {
write_lock: Mutex::new(()),
layer_flush_lock: Mutex::new(()),
checkpoint_cs: Mutex::new(()),
compaction_cs: Mutex::new(()),
gc_info: RwLock::new(GcInfo {
retain_lsns: Vec::new(),
@@ -1557,6 +1557,7 @@ impl LayeredTimeline {
// Below are functions compact_level0() and create_image_layers()
// but they are a bit ad hoc and don't quite work like it's explained
// above. Rewrite it.
let _compaction_cs = self.compaction_cs.lock().unwrap();
let target_file_size = self.conf.checkpoint_distance;
@@ -1735,6 +1736,20 @@ impl LayeredTimeline {
new_layers.push(writer.finish(prev_key.unwrap().next())?);
}
// Sync layers
if !new_layers.is_empty() {
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// also sync the directory
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
}
let mut layers = self.layers.lock().unwrap();
for l in new_layers {
layers.insert_historic(Arc::new(l));
@@ -1783,7 +1798,8 @@ impl LayeredTimeline {
let now = Instant::now();
let mut result: GcResult = Default::default();
let disk_consistent_lsn = self.get_disk_consistent_lsn();
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
let _compaction_cs = self.compaction_cs.lock().unwrap();
let gc_info = self.gc_info.read().unwrap();
let retain_lsns = &gc_info.retain_lsns;