mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-28 10:30:40 +00:00
fix: schedule_compaction_update must only unlink (#5675)
#5649 added the concept of dangling layers which #4938 uses but only partially. I forgot to change `schedule_compaction_update` to not schedule deletions to uphold the "have a layer, you can read it". With the now remembered fix, I don't think these checks should ever fail except for a mistake I already did. These changes might be useful for protecting future changes, even though the Layer carrying the generation AND the `schedule_(gc|compaction)_update` require strong arcs. Rationale for keeping the `#[cfg(feature = "testing")]` is worsening any leak situation which might come up.
This commit is contained in:
@@ -721,6 +721,17 @@ impl RemoteTimelineClient {
|
||||
})
|
||||
.collect();
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, gen) in &with_generations {
|
||||
if let Some(unexpected) = upload_queue.dangling_files.insert(name.to_owned(), *gen) {
|
||||
if &unexpected == gen {
|
||||
tracing::error!("{name} was unlinked twice with same generation");
|
||||
} else {
|
||||
tracing::error!("{name} was unlinked twice with different generations {gen:?} and {unexpected:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// after unlinking files from the upload_queue.latest_files we must always schedule an
|
||||
// index_part update, because that needs to be uploaded before we can actually delete the
|
||||
// files.
|
||||
@@ -754,6 +765,19 @@ impl RemoteTimelineClient {
|
||||
info!("scheduling deletion of layer {}{}", name, gen.get_suffix());
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
for (name, gen) in &with_generations {
|
||||
match upload_queue.dangling_files.remove(name) {
|
||||
Some(same) if &same == gen => { /* expected */ }
|
||||
Some(other) => {
|
||||
tracing::error!("{name} was unlinked with {other:?} but deleted with {gen:?}");
|
||||
}
|
||||
None => {
|
||||
tracing::error!("{name} was unlinked but was not dangling");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// schedule the actual deletions
|
||||
let op = UploadOp::Delete(Delete {
|
||||
layers: with_generations,
|
||||
@@ -779,9 +803,7 @@ impl RemoteTimelineClient {
|
||||
|
||||
let names = compacted_from.iter().map(|x| x.layer_desc().filename());
|
||||
|
||||
let with_generations =
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
|
||||
self.schedule_deletion_of_unlinked0(upload_queue, with_generations);
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
|
||||
self.launch_queued_tasks(upload_queue);
|
||||
|
||||
Ok(())
|
||||
@@ -1434,6 +1456,8 @@ impl RemoteTimelineClient {
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::default(),
|
||||
queued_operations: VecDeque::default(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::default(),
|
||||
};
|
||||
|
||||
let upload_queue = std::mem::replace(
|
||||
|
||||
@@ -80,6 +80,14 @@ pub(crate) struct UploadQueueInitialized {
|
||||
/// tasks to finish. For example, metadata upload cannot be performed before all
|
||||
/// preceding layer file uploads have completed.
|
||||
pub(crate) queued_operations: VecDeque<UploadOp>,
|
||||
|
||||
/// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
|
||||
/// for error logging.
|
||||
///
|
||||
/// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
|
||||
/// bug causing leaks, then it's better to not leave this enabled for production builds.
|
||||
#[cfg(feature = "testing")]
|
||||
pub(crate) dangling_files: HashMap<LayerFileName, Generation>,
|
||||
}
|
||||
|
||||
impl UploadQueueInitialized {
|
||||
@@ -136,6 +144,8 @@ impl UploadQueue {
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
};
|
||||
|
||||
*self = UploadQueue::Initialized(state);
|
||||
@@ -181,6 +191,8 @@ impl UploadQueue {
|
||||
num_inprogress_deletions: 0,
|
||||
inprogress_tasks: HashMap::new(),
|
||||
queued_operations: VecDeque::new(),
|
||||
#[cfg(feature = "testing")]
|
||||
dangling_files: HashMap::new(),
|
||||
};
|
||||
|
||||
*self = UploadQueue::Initialized(state);
|
||||
|
||||
Reference in New Issue
Block a user