This partially reverts commit 11c18b05aa
for the take_mut parts.
This commit is contained in:
Joonas Koivunen
2023-05-04 21:50:17 +03:00
parent 8425b6ab21
commit fd6d81b044
3 changed files with 60 additions and 66 deletions

7
Cargo.lock generated
View File

@@ -2748,7 +2748,6 @@ dependencies = [
"strum_macros",
"svg_fmt",
"sync_wrapper",
"take_mut",
"tempfile",
"tenant_size_model",
"thiserror",
@@ -4198,12 +4197,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tar"
version = "0.4.38"

View File

@@ -75,7 +75,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
take_mut = "0.2.2"
[dev-dependencies]
criterion.workspace = true

View File

@@ -1066,65 +1066,67 @@ impl RemoteTimelineClient {
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
let mut res = None;
take_mut::take(&mut *guard, |queue| {
match queue {
UploadQueue::Uninitialized => {
res = Some(Err(StopError::QueueUninitialized));
queue
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Some(Ok(()));
queue
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
res = Some(Ok(()));
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
match &*guard {
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
});
res.expect("the closure above always sets res")
UploadQueue::Initialized(qi) => {
info!("shutting down upload queue");
// Prepare index part to put into stopped state
let index_part = IndexPart::new(
qi.latest_files.clone(),
qi.last_uploaded_consistent_lsn,
qi.latest_metadata
.to_bytes()
.map_err(StopError::SerializeMetadata)?,
);
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
last_uploaded_index_part: index_part,
}),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
} else {
unreachable!("we checked in the match above that it is Initialized");
}
};
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(qi.inprogress_tasks);
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
// We're done.
drop(guard);
Ok(())
}
}
}
}