mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-24 00:20:37 +00:00
pageserver: schedule frozen layer uploads inside the layers lock (#5639)
## Problem Compaction's source of truth for what layers exist is the LayerManager. `flush_frozen_layer` updates LayerManager before it has scheduled upload of the frozen layer. Compaction can then "see" the new layer, decide to delete it, schedule uploads of replacement layers, all before `flush_frozen_layer` wakes up again and schedules the upload. When the upload is scheduled, the local layer file may be gone, in which case we end up with no such layer in remote storage, but an entry still added to IndexPart pointing to the missing layer. ## Summary of changes Schedule layer uploads inside the `self.layers` lock, so that whenever a frozen layer is present in LayerManager, it is also present in RemoteTimelineClient's metadata. Closes: #5635
This commit is contained in:
@@ -60,6 +60,8 @@ pub(super) async fn upload_timeline_layer<'a>(
|
||||
bail!("failpoint before-upload-layer")
|
||||
});
|
||||
|
||||
pausable_failpoint!("before-upload-layer-pausable");
|
||||
|
||||
let storage_path = remote_path(conf, source_path, generation)?;
|
||||
let source_file_res = fs::File::open(&source_path).await;
|
||||
let source_file = match source_file_res {
|
||||
|
||||
@@ -2793,10 +2793,13 @@ impl Timeline {
|
||||
)
|
||||
};
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now. The flushed layer is stored in
|
||||
// the mapping in `create_delta_layer`.
|
||||
{
|
||||
let metadata = {
|
||||
let mut guard = self.layers.write().await;
|
||||
|
||||
if let Some(ref l) = delta_layer_to_add {
|
||||
@@ -2812,8 +2815,17 @@ impl Timeline {
|
||||
}
|
||||
|
||||
guard.finish_flush_l0_layer(delta_layer_to_add, &frozen_layer);
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
|
||||
// Schedule remote uploads that will reflect our new disk_consistent_lsn
|
||||
Some(self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
// release lock on 'layers'
|
||||
}
|
||||
};
|
||||
|
||||
// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
|
||||
// a compaction can delete the file and then it won't be available for uploads any more.
|
||||
@@ -2829,28 +2841,22 @@ impl Timeline {
|
||||
//
|
||||
// TODO: This perhaps should be done in 'flush_frozen_layers', after flushing
|
||||
// *all* the layers, to avoid fsyncing the file multiple times.
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
let old_disk_consistent_lsn = self.disk_consistent_lsn.load();
|
||||
|
||||
// If we were able to advance 'disk_consistent_lsn', save it the metadata file.
|
||||
// After crash, we will restart WAL streaming and processing from that point.
|
||||
if disk_consistent_lsn != old_disk_consistent_lsn {
|
||||
assert!(disk_consistent_lsn > old_disk_consistent_lsn);
|
||||
self.update_metadata_file(disk_consistent_lsn, layer_paths_to_upload)
|
||||
// If we updated our disk_consistent_lsn, persist the updated metadata to local disk.
|
||||
if let Some(metadata) = metadata {
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
.await
|
||||
.context("update_metadata_file")?;
|
||||
// Also update the in-memory copy
|
||||
self.disk_consistent_lsn.store(disk_consistent_lsn);
|
||||
.context("save_metadata")?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update metadata file
|
||||
async fn update_metadata_file(
|
||||
fn schedule_uploads(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> anyhow::Result<TimelineMetadata> {
|
||||
// We can only save a valid 'prev_record_lsn' value on disk if we
|
||||
// flushed *all* in-memory changes to disk. We only track
|
||||
// 'prev_record_lsn' in memory for the latest processed record, so we
|
||||
@@ -2887,10 +2893,6 @@ impl Timeline {
|
||||
x.unwrap()
|
||||
));
|
||||
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
|
||||
if let Some(remote_client) = &self.remote_client {
|
||||
for (path, layer_metadata) in layer_paths_to_upload {
|
||||
remote_client.schedule_layer_file_upload(&path, &layer_metadata)?;
|
||||
@@ -2898,6 +2900,20 @@ impl Timeline {
|
||||
remote_client.schedule_index_upload_for_metadata_update(&metadata)?;
|
||||
}
|
||||
|
||||
Ok(metadata)
|
||||
}
|
||||
|
||||
async fn update_metadata_file(
|
||||
&self,
|
||||
disk_consistent_lsn: Lsn,
|
||||
layer_paths_to_upload: HashMap<LayerFileName, LayerFileMetadata>,
|
||||
) -> anyhow::Result<()> {
|
||||
let metadata = self.schedule_uploads(disk_consistent_lsn, layer_paths_to_upload)?;
|
||||
|
||||
save_metadata(self.conf, &self.tenant_id, &self.timeline_id, &metadata)
|
||||
.await
|
||||
.context("save_metadata")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user