From 11c18b05aa67ecbe3ed3db2ae92e60da86d3bd31 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 27 Apr 2023 18:33:40 +0200 Subject: [PATCH] push metadata.to_bytes() out of stop() into persist_index_part_with_deleted_flag() This pushes the (unlikely) possibility of failure to serialize metadata out of stop(). That in turn leaves us with only one way in which stop() can fail. There are two callsites of stop(): 1. perform_upload_task: here, we can safely say "unreachable", and I think anyone doing a future refactoring that might violate that invariant would notice, because the unreachable!() is close to the code that would likely be refactored. The unreachable!() is desirable there because otherwise we'd need to think about how to handle the error. Maybe the previous code would have done the right thing, maybe not. 2. delete_timeline: this is the new one, and it's far away from the code that initializes the upload queue. Putting an unreachable!() there seems risky. So, bail out with an error. It will become a 500 status code, which the console shall retry according to the OpenAPI spec. We have test coverage that the retry can succeed. 
--- Cargo.lock | 7 + pageserver/Cargo.toml | 1 + .../src/tenant/remote_timeline_client.rs | 150 ++++++++++-------- pageserver/src/tenant/upload_queue.rs | 12 +- 4 files changed, 96 insertions(+), 74 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce24bbcee8..88ae3b57d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2677,6 +2677,7 @@ dependencies = [ "strum_macros", "svg_fmt", "sync_wrapper", + "take_mut", "tempfile", "tenant_size_model", "thiserror", @@ -4105,6 +4106,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + [[package]] name = "tar" version = "0.4.38" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 0bc7eba95e..62a8c377db 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -74,6 +74,7 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +take_mut = "0.2.2" [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 97bc002466..525725eb9e 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -204,11 +204,11 @@ mod download; pub mod index; mod upload; +use anyhow::Context; use chrono::Utc; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; -use utils::bin_ser::SerializeError; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -266,8 +266,6 @@ pub enum StopError { /// Callers are responsible for checking this before calling `stop()`. #[error("queue is not initialized")] QueueUninitialized, - #[error("serialize metadata: {0:#}")] - SerializeMetadata(SerializeError), } /// A client for accessing a timeline's data in remote storage. 
@@ -354,7 +352,7 @@ impl RemoteTimelineClient { match &*self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), - UploadQueue::Stopped(q) => Some(q.last_uploaded_index_part.disk_consistent_lsn), + UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn), } } @@ -643,7 +641,7 @@ impl RemoteTimelineClient { pub(crate) async fn persist_index_part_with_deleted_flag( self: &Arc, ) -> anyhow::Result<()> { - let index_part = { + let index_part_with_deleted_at = { let mut locked = self.upload_queue.lock().unwrap(); // We must be in stopped state because otherwise @@ -655,11 +653,22 @@ impl RemoteTimelineClient { UploadQueue::Stopped(stopped) => stopped, }; - if let Some(deleted_at) = stopped.last_uploaded_index_part.deleted_at.as_ref() { + if let Some(deleted_at) = stopped.deleted_at.as_ref() { anyhow::bail!("timeline is deleting, deleted_at: {:?}", deleted_at); } - stopped.last_uploaded_index_part.deleted_at = Some(Utc::now().naive_utc()); - stopped.last_uploaded_index_part.clone() + let deleted_at = Utc::now().naive_utc(); + stopped.deleted_at = Some(deleted_at); + + let mut index_part = IndexPart::new( + stopped.latest_files.clone(), + stopped.last_uploaded_consistent_lsn, + stopped + .latest_metadata + .to_bytes() + .context("serialize metadata")?, + ); + index_part.deleted_at = Some(deleted_at); + index_part }; let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| { @@ -671,7 +680,7 @@ impl RemoteTimelineClient { ), UploadQueue::Stopped(stopped) => stopped, }; - stopped.last_uploaded_index_part.deleted_at = None; + stopped.deleted_at = None; }); #[cfg(feature = "testing")] @@ -695,7 +704,7 @@ impl RemoteTimelineClient { &self.storage_impl, self.tenant_id, self.timeline_id, - &index_part, + &index_part_with_deleted_at, ) .await?; @@ -822,8 +831,11 @@ impl RemoteTimelineClient { // upload finishes or times out soon enough. 
if task_mgr::is_shutdown_requested() { info!("upload task cancelled by shutdown request"); - if let Err(e) = self.stop() { - error!("got an error when trying to stop remote client: {e}") + match self.stop() { + Ok(()) => {} + Err(StopError::QueueUninitialized) => { + unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") + } } self.calls_unfinished_metric_end(&task.op); return; @@ -1017,67 +1029,65 @@ impl RemoteTimelineClient { // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); - match &*guard { - UploadQueue::Uninitialized => Err(StopError::QueueUninitialized), - UploadQueue::Stopped(_) => { - // nothing to do - info!("another concurrent task already shut down the queue"); - Ok(()) - } - UploadQueue::Initialized(qi) => { - info!("shutting down upload queue"); - - // Prepare index part to put into stopped state - let index_part = IndexPart::new( - qi.latest_files.clone(), - qi.last_uploaded_consistent_lsn, - qi.latest_metadata - .to_bytes() - .map_err(StopError::SerializeMetadata)?, - ); - - // Replace the queue with the Stopped state, taking ownership of the old - // Initialized queue. We will do some checks on it, and then drop it. - let qi = { - let upload_queue = std::mem::replace( - &mut *guard, - UploadQueue::Stopped(UploadQueueStopped { - last_uploaded_index_part: index_part, - }), - ); - if let UploadQueue::Initialized(qi) = upload_queue { - qi - } else { - unreachable!("we checked in the match above that it is Initialized"); - } - }; - - // consistency check - assert_eq!( - qi.num_inprogress_layer_uploads - + qi.num_inprogress_metadata_uploads - + qi.num_inprogress_deletions, - qi.inprogress_tasks.len() - ); - - // We don't need to do anything here for in-progress tasks. 
They will finish - // on their own, decrement the unfinished-task counter themselves, and observe - // that the queue is Stopped. - drop(qi.inprogress_tasks); - - // Tear down queued ops - for op in qi.queued_operations.into_iter() { - self.calls_unfinished_metric_end(&op); - // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() - // which is exactly what we want to happen. - drop(op); + let mut res = None; + take_mut::take(&mut *guard, |queue| { + match queue { + UploadQueue::Uninitialized => { + res = Some(Err(StopError::QueueUninitialized)); + queue } + UploadQueue::Stopped(_) => { + // nothing to do + info!("another concurrent task already shut down the queue"); + res = Some(Ok(())); + queue + } + UploadQueue::Initialized(qi) => { + let UploadQueueInitialized { + task_counter: _, + latest_files, + // XXX need to think about what it means if it's non-zero here + latest_files_changes_since_metadata_upload_scheduled: _, + latest_metadata, + last_uploaded_consistent_lsn, + num_inprogress_layer_uploads, + num_inprogress_metadata_uploads, + num_inprogress_deletions, + inprogress_tasks, + queued_operations, + } = qi; - // We're done. - drop(guard); - Ok(()) + // consistency check + assert_eq!( + num_inprogress_layer_uploads + + num_inprogress_metadata_uploads + + num_inprogress_deletions, + inprogress_tasks.len() + ); + + // We don't need to do anything here for in-progress tasks. They will finish + // on their own, decrement the unfinished-task counter themselves, and observe + // that the queue is Stopped. + drop(inprogress_tasks); + + // Tear down queued ops + for op in queued_operations.into_iter() { + self.calls_unfinished_metric_end(&op); + // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() + // which is exactly what we want to happen. 
+ drop(op); + } + res = Some(Ok(())); + UploadQueue::Stopped(UploadQueueStopped { + latest_files, + last_uploaded_consistent_lsn, + latest_metadata, + deleted_at: None, + }) + } } - } + }); + res.expect("the closure above always sets res") } } diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 21c090e209..e1651d3fa7 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use chrono::NaiveDateTime; use std::sync::Arc; use tracing::info; @@ -18,7 +19,7 @@ use utils::lsn::Lsn; // that many upload queues in a running pageserver, and most of them are initialized // anyway. #[allow(clippy::large_enum_variant)] -pub(crate) enum UploadQueue { +pub(super) enum UploadQueue { Uninitialized, Initialized(UploadQueueInitialized), Stopped(UploadQueueStopped), @@ -75,9 +76,12 @@ pub(crate) struct UploadQueueInitialized { pub(crate) queued_operations: VecDeque, } -pub(crate) struct UploadQueueStopped { - /// Index part is needed here so timeline_delete can access it - pub(super) last_uploaded_index_part: IndexPart, +pub(super) struct UploadQueueStopped { + pub(super) latest_files: HashMap, + pub(super) last_uploaded_consistent_lsn: Lsn, + pub(super) latest_metadata: TimelineMetadata, + /// If Some(), a call to `persist_index_part_with_deleted_flag` is ongoing or finished. + pub(super) deleted_at: Option, } impl UploadQueue {