Add missing fsyncs

This commit is contained in:
Heikki Linnakangas
2022-03-18 11:40:49 +02:00
parent 13ec0ce7b2
commit d383ed4e68

View File

@@ -1450,39 +1450,44 @@ impl LayeredTimeline {
Ok(())
}
/// Flush one frozen in-memory layer to disk, as a new delta layer.
fn flush_frozen_layer(&self, frozen_layer: Arc<InMemoryLayer>) -> Result<()> {
// Do we have a frozen in-memory layer that we need to write out?
let new_delta = frozen_layer.write_to_disk()?;
let new_delta_path = new_delta.path();
// Sync the new layer to disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// TODO: If we're running inside 'flush_frozen_layers' and there are multiple
// files to flush, it might be better to first write them all, and then fsync
// them all in parallel.
par_fsync::par_fsync(&[
new_delta_path.clone(),
self.conf.timeline_path(&self.timelineid, &self.tenantid),
])?;
// Finally, replace the frozen in-memory layer with the new on-disk layers
let mut layers = self.layers.lock().unwrap();
let l = layers.frozen_layers.pop_front();
{
let mut layers = self.layers.lock().unwrap();
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(Arc::ptr_eq(&l.unwrap(), &frozen_layer));
// Only one thread may call this function at a time (for this
// timeline). If two threads tried to flush the same frozen
// layer to disk at the same time, that would not work.
assert!(Arc::ptr_eq(&l.unwrap(), &frozen_layer));
// Add the new delta layer to the LayerMap
let mut layer_paths = vec![new_delta.path()];
layers.insert_historic(Arc::new(new_delta));
// Add the new delta layer to the LayerMap
layers.insert_historic(Arc::new(new_delta));
drop(layers);
// Sync layers
if !layer_paths.is_empty() {
// We must fsync the timeline dir to ensure the directory entries for
// new layer files are durable
layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid));
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
par_fsync::par_fsync(&layer_paths)?;
layer_paths.pop().unwrap();
// release lock on 'layers'
}
// Compute new 'disk_consistent_lsn'
// Update the metadata file, with new 'disk_consistent_lsn'
//
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
// *all* the layers, to avoid fsyncing the file multiple times.
let disk_consistent_lsn;
disk_consistent_lsn = Lsn(frozen_layer.get_lsn_range().end.0 - 1);
@@ -1538,7 +1543,7 @@ impl LayeredTimeline {
schedule_timeline_checkpoint_upload(
self.tenantid,
self.timelineid,
layer_paths,
vec![new_delta_path],
metadata,
);
}
@@ -1678,10 +1683,25 @@ impl LayeredTimeline {
}
let image_layer = image_layer_writer.finish()?;
// Sync the new layer to disk before adding it to the layer map, to make sure
// we don't garbage collect something based on the new layer, before it has
// reached the disk.
//
// We must also fsync the timeline dir to ensure the directory entries for
// new layer files are durable
//
// Compaction creates multiple image layers. It would be better to create them all
// and fsync them all in parallel.
par_fsync::par_fsync(&[
image_layer.path(),
self.conf.timeline_path(&self.timelineid, &self.tenantid),
])?;
// FIXME: Do we need to do something to upload it to remote storage here?
let mut layers = self.layers.lock().unwrap();
layers.insert_historic(Arc::new(image_layer));
drop(layers);
// FIXME: need to fsync?
Ok(())
}