mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-14 17:02:56 +00:00
Make schedule_layer_file_deletion atomic.
Make sure that we don't mutate the upload queue until we're sure that we will complete all the changes. Otherwise, we could bail out after removing some files from upload_queue.latest_files, but not all. This is purely theoretical: there were only two ? calls in the function, and neither of them should actually return an error: 1. `RelativePath::from_local_path` only returns an error if the argument path is not in the base directory. I.e. in this case, if the argument path was outside the timeline directory. Shouldn't happen. 2. latest_metadata.to_bytes() only returns an error if the serde serialization fails. It really shouldn't fail. I considered turning those into panics instead, but as long as the function as a whole can return an Err, the callers need to be prepared for that anyway, so there's little difference. Nevertheless, I refactored the code a little to make it atomic even if one of those can't-happen errors happens after all. And I used a closure to make it harder to introduce new dangerous ?-operators in the future.
This commit is contained in:
committed by
Heikki Linnakangas
parent
d36fd4f141
commit
2dad3a6445
@@ -684,38 +684,53 @@ impl RemoteTimelineClient {
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
// Update the remote index file, removing the to-be-deleted files from the index,
|
||||
// before deleting the actual files.
|
||||
// NB: deleting layers doesn't affect the values stored in TimelineMetadata,
|
||||
// so, we don't need update it.
|
||||
|
||||
// Convert the paths into RelativePaths, and gather other information we need.
|
||||
let mut relative_paths = Vec::with_capacity(paths.len());
|
||||
for path in paths {
|
||||
let relative_path = RelativePath::from_local_path(
|
||||
relative_paths.push(RelativePath::from_local_path(
|
||||
&self.conf.timeline_path(&self.timeline_id, &self.tenant_id),
|
||||
path,
|
||||
)?;
|
||||
upload_queue.latest_files.remove(&relative_path);
|
||||
)?);
|
||||
}
|
||||
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
|
||||
let index_part = IndexPart::new(
|
||||
upload_queue.latest_files.clone(),
|
||||
disk_consistent_lsn,
|
||||
upload_queue.latest_metadata.to_bytes()?,
|
||||
);
|
||||
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
|
||||
self.update_upload_queue_unfinished_metric(1, &op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
|
||||
// schedule the actual deletions
|
||||
for path in paths {
|
||||
let op = UploadOp::Delete(RemoteOpFileKind::Layer, PathBuf::from(path));
|
||||
// Deleting layers doesn't affect the values stored in TimelineMetadata,
|
||||
// so we don't need update it. Just serialize it.
|
||||
let metadata_bytes = upload_queue.latest_metadata.to_bytes()?;
|
||||
let disk_consistent_lsn = upload_queue.latest_metadata.disk_consistent_lsn();
|
||||
|
||||
// Update the remote index file, removing the to-be-deleted files from the index,
|
||||
// before deleting the actual files.
|
||||
//
|
||||
// Once we start removing files from upload_queue.latest_files, there's
|
||||
// no going back! Otherwise, some of the files would already be removed
|
||||
// from latest_files, but not yet scheduled for deletion. Use a closure
|
||||
// to syntactically forbid ? or bail! calls here.
|
||||
let no_bail_here = || {
|
||||
for relative_path in relative_paths {
|
||||
upload_queue.latest_files.remove(&relative_path);
|
||||
}
|
||||
|
||||
let index_part = IndexPart::new(
|
||||
upload_queue.latest_files.clone(),
|
||||
disk_consistent_lsn,
|
||||
metadata_bytes,
|
||||
);
|
||||
let op = UploadOp::UploadMetadata(index_part, disk_consistent_lsn);
|
||||
self.update_upload_queue_unfinished_metric(1, &op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
info!("scheduled layer file deletion {}", path.display());
|
||||
}
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
// schedule the actual deletions
|
||||
for path in paths {
|
||||
let op = UploadOp::Delete(RemoteOpFileKind::Layer, PathBuf::from(path));
|
||||
self.update_upload_queue_unfinished_metric(1, &op);
|
||||
upload_queue.queued_operations.push_back(op);
|
||||
info!("scheduled layer file deletion {}", path.display());
|
||||
}
|
||||
|
||||
// Launch the tasks immediately, if possible
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
};
|
||||
no_bail_here();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -164,7 +164,10 @@ def test_tenants_attached_after_download(
|
||||
detail_before = client.timeline_detail(
|
||||
tenant_id, timeline_id, include_non_incremental_physical_size=True
|
||||
)
|
||||
assert detail_before["current_physical_size_non_incremental"] == detail_before["current_physical_size"]
|
||||
assert (
|
||||
detail_before["current_physical_size_non_incremental"]
|
||||
== detail_before["current_physical_size"]
|
||||
)
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user