mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 23:12:54 +00:00
load_layer_map: schedule deletions for any future layers (#5103)
Unrelated fixes noticed while integrating #4938. - Stop leaking future layers in remote storage - We schedule extra index_part uploads if layer name to be removed was not actually present
This commit is contained in:
@@ -651,8 +651,9 @@ impl RemoteTimelineClient {
|
||||
// to syntactically forbid ? or bail! calls here.
|
||||
let no_bail_here = || {
|
||||
for name in names {
|
||||
upload_queue.latest_files.remove(name);
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
if upload_queue.latest_files.remove(name).is_some() {
|
||||
upload_queue.latest_files_changes_since_metadata_upload_scheduled += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if upload_queue.latest_files_changes_since_metadata_upload_scheduled > 0 {
|
||||
|
||||
@@ -1614,7 +1614,7 @@ impl Timeline {
|
||||
let (conf, tenant_id, timeline_id) = (self.conf, self.tenant_id, self.timeline_id);
|
||||
let span = tracing::Span::current();
|
||||
|
||||
let (loaded_layers, needs_upload, total_physical_size) = tokio::task::spawn_blocking({
|
||||
let (loaded_layers, to_sync, total_physical_size) = tokio::task::spawn_blocking({
|
||||
move || {
|
||||
let _g = span.entered();
|
||||
let discovered = init::scan_timeline_dir(&timeline_path)?;
|
||||
@@ -1660,6 +1660,7 @@ impl Timeline {
|
||||
|
||||
let mut loaded_layers = Vec::new();
|
||||
let mut needs_upload = Vec::new();
|
||||
let mut needs_cleanup = Vec::new();
|
||||
let mut total_physical_size = 0;
|
||||
|
||||
for (name, decision) in decided {
|
||||
@@ -1675,14 +1676,10 @@ impl Timeline {
|
||||
Err(FutureLayer { local }) => {
|
||||
if local.is_some() {
|
||||
path.push(name.file_name());
|
||||
init::cleanup_future_layer(&path, name, disk_consistent_lsn)?;
|
||||
init::cleanup_future_layer(&path, &name, disk_consistent_lsn)?;
|
||||
path.pop();
|
||||
} else {
|
||||
// we cannot do anything for remote layers, but not continuing to
|
||||
// process it will leave it out index_part.json as well.
|
||||
}
|
||||
//
|
||||
// we do not currently schedule deletions for these.
|
||||
needs_cleanup.push(name);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -1736,7 +1733,11 @@ impl Timeline {
|
||||
|
||||
loaded_layers.push(layer);
|
||||
}
|
||||
Ok((loaded_layers, needs_upload, total_physical_size))
|
||||
Ok((
|
||||
loaded_layers,
|
||||
(needs_upload, needs_cleanup),
|
||||
total_physical_size,
|
||||
))
|
||||
}
|
||||
})
|
||||
.await
|
||||
@@ -1748,9 +1749,11 @@ impl Timeline {
|
||||
guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
|
||||
|
||||
if let Some(rtc) = self.remote_client.as_ref() {
|
||||
let (needs_upload, needs_cleanup) = to_sync;
|
||||
for (layer, m) in needs_upload {
|
||||
rtc.schedule_layer_file_upload(&layer.layer_desc().filename(), &m)?;
|
||||
}
|
||||
rtc.schedule_layer_file_deletion(&needs_cleanup)?;
|
||||
rtc.schedule_index_upload_for_file_changes()?;
|
||||
// Tenant::create_timeline will wait for these uploads to happen before returning, or
|
||||
// on retry.
|
||||
|
||||
@@ -183,7 +183,7 @@ pub(super) fn cleanup_local_file_for_remote(
|
||||
|
||||
pub(super) fn cleanup_future_layer(
|
||||
path: &Path,
|
||||
name: LayerFileName,
|
||||
name: &LayerFileName,
|
||||
disk_consistent_lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
use LayerFileName::*;
|
||||
|
||||
Reference in New Issue
Block a user