replace take_mut usage with std::mem::replace + assignment

This commit is contained in:
Christian Schwarz
2023-05-04 18:12:12 +02:00
parent e1e627e897
commit ab89f164d8
3 changed files with 63 additions and 79 deletions

7
Cargo.lock generated
View File

@@ -2748,7 +2748,6 @@ dependencies = [
"strum_macros",
"svg_fmt",
"sync_wrapper",
"take_mut",
"tempfile",
"tenant_size_model",
"thiserror",
@@ -4198,12 +4197,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tar"
version = "0.4.38"

View File

@@ -75,7 +75,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
take_mut = "0.2.2"
[dev-dependencies]
criterion.workspace = true

View File

@@ -1084,84 +1084,76 @@ impl RemoteTimelineClient {
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
let mut res = None;
// If any of the asserts fail, we'll transition the queue into Broken state
// and then continue to panic.
// As part of that transition, `queued_operations` will get dropped.
// `wait_completion` operations waiting for these will observe an error.
// That's exactly what we want.
// If any of the code below panics, the queue remains in Broken state.
// If we're coming from Initialized state, `queued_operations` will get dropped
// as part of the panic, because it sits in the local variable named `owned`.
// Any `wait_completion` operations against those queued operations
// will observe an error. That's exactly what we want.
// We don't need to care about in-progress operations because that responsibility
// lies with the caller. There's no point for them to try anything funky, like,
// catching the panic and retrying the stop() call. We will return QueueBroken in that case.
take_mut::take_or_recover(
&mut *guard,
|| {
warn!("transitioning the upload queue into Broken state");
UploadQueue::Broken
},
|queue| {
match queue {
UploadQueue::Broken => {
res = Some(Err(StopError::QueueBroken));
queue
}
UploadQueue::Uninitialized => {
res = Some(Err(StopError::QueueUninitialized));
queue
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Some(Ok(()));
queue
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
let owned = std::mem::replace(&mut *guard, UploadQueue::Broken);
let res;
*guard = match owned {
UploadQueue::Broken => {
res = Err(StopError::QueueBroken);
owned
}
UploadQueue::Uninitialized => {
res = Err(StopError::QueueUninitialized);
owned
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Ok(());
owned
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
res = Some(Ok(()));
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
},
);
res.expect("the closure above always sets res")
res = Ok(());
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
};
res
}
}