diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 2c48295ad0..526d0c952f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -806,11 +806,11 @@ pub struct LayeredTimeline { /// Used to ensure that there is only one thread layer_flush_lock: Mutex<()>, - // Prevent concurrent checkpoints. - // Checkpoints are normally performed by one thread. But checkpoint can also be manually requested by admin - // (that's used in tests), and shutdown also forces a checkpoint. These forced checkpoints run in a different thread - // and could be triggered at the same time as a normal checkpoint. - checkpoint_cs: Mutex<()>, + // Prevent concurrent compactions. + // Compactions are normally performed by one thread. But compaction can also be manually + // requested by admin (that's used in tests). These forced compactions run in a different + // thread and could be triggered at the same time as a normal, timed compaction. + compaction_cs: Mutex<()>, // Needed to ensure that we can't create a branch at a point that was already garbage collected latest_gc_cutoff_lsn: RwLock, @@ -1021,7 +1021,7 @@ impl LayeredTimeline { write_lock: Mutex::new(()), layer_flush_lock: Mutex::new(()), - checkpoint_cs: Mutex::new(()), + compaction_cs: Mutex::new(()), gc_info: RwLock::new(GcInfo { retain_lsns: Vec::new(), @@ -1557,6 +1557,7 @@ impl LayeredTimeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. + let _compaction_cs = self.compaction_cs.lock().unwrap(); let target_file_size = self.conf.checkpoint_distance; @@ -1735,6 +1736,20 @@ impl LayeredTimeline { new_layers.push(writer.finish(prev_key.unwrap().next())?); } + // Sync layers + if !new_layers.is_empty() { + let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); + + // also sync the directory + layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid)); + + // Fsync all the layer files and directory using multiple threads to + // minimize latency. + par_fsync::par_fsync(&layer_paths)?; + + layer_paths.pop().unwrap(); + } + let mut layers = self.layers.lock().unwrap(); for l in new_layers { layers.insert_historic(Arc::new(l)); @@ -1783,7 +1798,8 @@ impl LayeredTimeline { let now = Instant::now(); let mut result: GcResult = Default::default(); let disk_consistent_lsn = self.get_disk_consistent_lsn(); - let _checkpoint_cs = self.checkpoint_cs.lock().unwrap(); + + let _compaction_cs = self.compaction_cs.lock().unwrap(); let gc_info = self.gc_info.read().unwrap(); let retain_lsns = &gc_info.retain_lsns;