diff --git a/Cargo.lock b/Cargo.lock index ce24bbcee8..88ae3b57d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2677,6 +2677,7 @@ dependencies = [ "strum_macros", "svg_fmt", "sync_wrapper", + "take_mut", "tempfile", "tenant_size_model", "thiserror", @@ -4105,6 +4106,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tar" version = "0.4.38" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0bc7eba95e..62a8c377db 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -74,6 +74,7 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +take_mut = "0.2.2" [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 97bc002466..525725eb9e 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -204,11 +204,11 @@ mod download; pub mod index; mod upload; +use anyhow::Context; use chrono::Utc; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; -use utils::bin_ser::SerializeError; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -266,8 +266,6 @@ pub enum StopError { /// Callers are responsible for checking this before calling `stop()`. #[error("queue is not initialized")] QueueUninitialized, - #[error("serialize metadata: {0:#}")] - SerializeMetadata(SerializeError), } /// A client for accessing a timeline's data in remote storage. @@ -354,7 +352,7 @@ impl RemoteTimelineClient { match &*self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), - UploadQueue::Stopped(q) => Some(q.last_uploaded_index_part.disk_consistent_lsn), + UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn), } } @@ -643,7 +641,7 @@ impl RemoteTimelineClient { pub(crate) async fn persist_index_part_with_deleted_flag( self: &Arc, ) -> anyhow::Result<()> { - let index_part = { + let index_part_with_deleted_at = { let mut locked = self.upload_queue.lock().unwrap(); // We must be in stopped state because otherwise @@ -655,11 +653,22 @@ impl RemoteTimelineClient { UploadQueue::Stopped(stopped) => stopped, }; - if let Some(deleted_at) = stopped.last_uploaded_index_part.deleted_at.as_ref() { + if let Some(deleted_at) = stopped.deleted_at.as_ref() { anyhow::bail!("timeline is deleting, deleted_at: {:?}", deleted_at); } - stopped.last_uploaded_index_part.deleted_at = Some(Utc::now().naive_utc()); - stopped.last_uploaded_index_part.clone() + let deleted_at = Utc::now().naive_utc(); + stopped.deleted_at = Some(deleted_at); + + let mut index_part = IndexPart::new( + stopped.latest_files.clone(), + stopped.last_uploaded_consistent_lsn, + stopped + .latest_metadata + .to_bytes() + .context("serialize metadata")?, + ); + index_part.deleted_at = Some(deleted_at); + index_part }; let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| { @@ -671,7 +680,7 @@ impl RemoteTimelineClient { ), UploadQueue::Stopped(stopped) => stopped, }; - stopped.last_uploaded_index_part.deleted_at = None; + stopped.deleted_at = None; }); #[cfg(feature = "testing")] @@ -695,7 +704,7 @@ impl RemoteTimelineClient { &self.storage_impl, self.tenant_id, self.timeline_id, - &index_part, + &index_part_with_deleted_at, ) .await?; @@ -822,8 +831,11 @@ impl RemoteTimelineClient { // upload finishes or times out soon enough. if task_mgr::is_shutdown_requested() { info!("upload task cancelled by shutdown request"); - if let Err(e) = self.stop() { - error!("got an error when trying to stop remote client: {e}") + match self.stop() { + Ok(()) => {} + Err(StopError::QueueUninitialized) => { + unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") + } } self.calls_unfinished_metric_end(&task.op); return; @@ -1017,67 +1029,65 @@ impl RemoteTimelineClient { // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); - match &*guard { - UploadQueue::Uninitialized => Err(StopError::QueueUninitialized), - UploadQueue::Stopped(_) => { - // nothing to do - info!("another concurrent task already shut down the queue"); - Ok(()) - } - UploadQueue::Initialized(qi) => { - info!("shutting down upload queue"); - - // Prepare index part to put into stopped state - let index_part = IndexPart::new( - qi.latest_files.clone(), - qi.last_uploaded_consistent_lsn, - qi.latest_metadata - .to_bytes() - .map_err(StopError::SerializeMetadata)?, - ); - - // Replace the queue with the Stopped state, taking ownership of the old - // Initialized queue. We will do some checks on it, and then drop it. - let qi = { - let upload_queue = std::mem::replace( - &mut *guard, - UploadQueue::Stopped(UploadQueueStopped { - last_uploaded_index_part: index_part, - }), - ); - if let UploadQueue::Initialized(qi) = upload_queue { - qi - } else { - unreachable!("we checked in the match above that it is Initialized"); - } - }; - - // consistency check - assert_eq!( - qi.num_inprogress_layer_uploads - + qi.num_inprogress_metadata_uploads - + qi.num_inprogress_deletions, - qi.inprogress_tasks.len() - ); - - // We don't need to do anything here for in-progress tasks. They will finish - // on their own, decrement the unfinished-task counter themselves, and observe - // that the queue is Stopped. - drop(qi.inprogress_tasks); - - // Tear down queued ops - for op in qi.queued_operations.into_iter() { - self.calls_unfinished_metric_end(&op); - // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() - // which is exactly what we want to happen. - drop(op); + let mut res = None; + take_mut::take(&mut *guard, |queue| { + match queue { + UploadQueue::Uninitialized => { + res = Some(Err(StopError::QueueUninitialized)); + queue } + UploadQueue::Stopped(_) => { + // nothing to do + info!("another concurrent task already shut down the queue"); + res = Some(Ok(())); + queue + } + UploadQueue::Initialized(qi) => { + let UploadQueueInitialized { + task_counter: _, + latest_files, + // XXX need to think about what it means if it's non-zero here + latest_files_changes_since_metadata_upload_scheduled: _, + latest_metadata, + last_uploaded_consistent_lsn, + num_inprogress_layer_uploads, + num_inprogress_metadata_uploads, + num_inprogress_deletions, + inprogress_tasks, + queued_operations, + } = qi; - // We're done. - drop(guard); - Ok(()) + // consistency check + assert_eq!( + num_inprogress_layer_uploads + + num_inprogress_metadata_uploads + + num_inprogress_deletions, + inprogress_tasks.len() + ); + + // We don't need to do anything here for in-progress tasks. They will finish + // on their own, decrement the unfinished-task counter themselves, and observe + // that the queue is Stopped. + drop(inprogress_tasks); + + // Tear down queued ops + for op in queued_operations.into_iter() { + self.calls_unfinished_metric_end(&op); + // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() + // which is exactly what we want to happen. + drop(op); + } + res = Some(Ok(())); + UploadQueue::Stopped(UploadQueueStopped { + latest_files, + last_uploaded_consistent_lsn, + latest_metadata, + deleted_at: None, + }) + } } - } + }); + res.expect("the closure above always sets res") } } diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 21c090e209..e1651d3fa7 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use chrono::NaiveDateTime; use std::sync::Arc; use tracing::info; @@ -18,7 +19,7 @@ use utils::lsn::Lsn; // that many upload queues in a running pageserver, and most of them are initialized // anyway. #[allow(clippy::large_enum_variant)] -pub(crate) enum UploadQueue { +pub(super) enum UploadQueue { Uninitialized, Initialized(UploadQueueInitialized), Stopped(UploadQueueStopped), @@ -75,9 +76,12 @@ pub(crate) struct UploadQueueInitialized { pub(crate) queued_operations: VecDeque, } -pub(crate) struct UploadQueueStopped { - /// Index part is needed here so timeline_delete can access it - pub(super) last_uploaded_index_part: IndexPart, +pub(super) struct UploadQueueStopped { + pub(super) latest_files: HashMap, + pub(super) last_uploaded_consistent_lsn: Lsn, + pub(super) latest_metadata: TimelineMetadata, + /// If Some(), a call to `persist_index_part_with_deleted_flag` is ongoing or finished. + pub(super) deleted_at: Option, } impl UploadQueue {