diff --git a/Cargo.lock b/Cargo.lock index 1cd96d278e..9436b591d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2748,7 +2748,6 @@ dependencies = [ "strum_macros", "svg_fmt", "sync_wrapper", - "take_mut", "tempfile", "tenant_size_model", "thiserror", @@ -4198,12 +4197,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "take_mut" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" - [[package]] name = "tar" version = "0.4.38" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 99abfb2fb5..ea81544cbe 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -75,7 +75,6 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true -take_mut = "0.2.2" [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 7b6e10148a..8e5e6cbcb1 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1066,65 +1066,67 @@ impl RemoteTimelineClient { // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); - let mut res = None; - take_mut::take(&mut *guard, |queue| { - match queue { - UploadQueue::Uninitialized => { - res = Some(Err(StopError::QueueUninitialized)); - queue - } - UploadQueue::Stopped(_) => { - // nothing to do - info!("another concurrent task already shut down the queue"); - res = Some(Ok(())); - queue - } - UploadQueue::Initialized(qi) => { - let UploadQueueInitialized { - task_counter: _, - latest_files, - // XXX need to think about what it means if it's non-zero here - latest_files_changes_since_metadata_upload_scheduled: _, - latest_metadata, - last_uploaded_consistent_lsn, - num_inprogress_layer_uploads, - num_inprogress_metadata_uploads, - num_inprogress_deletions, - inprogress_tasks, - queued_operations, - } = qi; - - // consistency check - assert_eq!( - num_inprogress_layer_uploads - + num_inprogress_metadata_uploads - + num_inprogress_deletions, - inprogress_tasks.len() - ); - - // We don't need to do anything here for in-progress tasks. They will finish - // on their own, decrement the unfinished-task counter themselves, and observe - // that the queue is Stopped. - drop(inprogress_tasks); - - // Tear down queued ops - for op in queued_operations.into_iter() { - self.calls_unfinished_metric_end(&op); - // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() - // which is exactly what we want to happen. - drop(op); - } - res = Some(Ok(())); - UploadQueue::Stopped(UploadQueueStopped { - latest_files, - last_uploaded_consistent_lsn, - latest_metadata, - deleted_at: None, - }) - } + match &*guard { + UploadQueue::Uninitialized => Err(StopError::QueueUninitialized), + UploadQueue::Stopped(_) => { + // nothing to do + info!("another concurrent task already shut down the queue"); + Ok(()) } - }); - res.expect("the closure above always sets res") + UploadQueue::Initialized(qi) => { + info!("shutting down upload queue"); + + // Prepare index part to put into stopped state + let index_part = IndexPart::new( + qi.latest_files.clone(), + qi.last_uploaded_consistent_lsn, + qi.latest_metadata + .to_bytes() + .map_err(StopError::SerializeMetadata)?, + ); + + // Replace the queue with the Stopped state, taking ownership of the old + // Initialized queue. We will do some checks on it, and then drop it. + let qi = { + let upload_queue = std::mem::replace( + &mut *guard, + UploadQueue::Stopped(UploadQueueStopped { + last_uploaded_index_part: index_part, + }), + ); + if let UploadQueue::Initialized(qi) = upload_queue { + qi + } else { + unreachable!("we checked in the match above that it is Initialized"); + } + }; + + // consistency check + assert_eq!( + qi.num_inprogress_layer_uploads + + qi.num_inprogress_metadata_uploads + + qi.num_inprogress_deletions, + qi.inprogress_tasks.len() + ); + + // We don't need to do anything here for in-progress tasks. They will finish + // on their own, decrement the unfinished-task counter themselves, and observe + // that the queue is Stopped. + drop(qi.inprogress_tasks); + + // Tear down queued ops + for op in qi.queued_operations.into_iter() { + self.calls_unfinished_metric_end(&op); + // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() + // which is exactly what we want to happen. + drop(op); + } + + // We're done. + drop(guard); + Ok(()) + } + } } }