From e1e627e8977a595af45306f4bac9027b5e078850 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 4 May 2023 18:04:45 +0200 Subject: [PATCH] introduce the Broken state, using take_mut::take_with_recover --- pageserver/src/tenant.rs | 6 + .../src/tenant/remote_timeline_client.rs | 161 +++++++++++------- pageserver/src/tenant/upload_queue.rs | 8 +- 3 files changed, 110 insertions(+), 65 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 0061e4facc..7d5b0f2fd9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1408,6 +1408,12 @@ impl Tenant { match &res { Ok(()) => {} Err(e) => match e { + remote_timeline_client::StopError::QueueBroken => { + // This happens if there's a panic inside above stop() call, + // and we call stop() again after that. + // The calling again can happen because we won't poison any + // mutexes on the unwind path at the first panicking call. + } remote_timeline_client::StopError::QueueUninitialized => { // This could happen if the timeline is Broken, e.g., because it failed to fetch IndexPart when it was loaded. } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 7b6e10148a..fe34698061 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -268,6 +268,8 @@ pub enum StopError { /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`]. #[error("queue is not initialized")] QueueUninitialized, + #[error("queue is broken")] + QueueBroken, } /// A client for accessing a timeline's data in remote storage. @@ -353,6 +355,7 @@ impl RemoteTimelineClient { pub fn last_uploaded_consistent_lsn(&self) -> Option { match &*self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, + UploadQueue::Broken => None, // could we return something? UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn), } @@ -664,8 +667,12 @@ impl RemoteTimelineClient { // we can have inprogress index part upload that can overwrite the file // with missing is_deleted flag that we going to set below let stopped = match &mut *locked { - UploadQueue::Uninitialized => anyhow::bail!("is not Stopped but Uninitialized"), - UploadQueue::Initialized(_) => anyhow::bail!("is not Stopped but Initialized"), + UploadQueue::Uninitialized | UploadQueue::Initialized(_) | UploadQueue::Broken => { + anyhow::bail!( + "upload queue must be in state Stopped, but is in state {}", + locked.as_str() + ); + } UploadQueue::Stopped(stopped) => stopped, }; @@ -690,10 +697,12 @@ impl RemoteTimelineClient { let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| { let mut locked = self_clone.upload_queue.lock().unwrap(); let stopped = match &mut *locked { - UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!( - "there's no way out of Stopping, and we checked it's Stopping above: {:?}", - locked.as_str(), - ), + UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Initialized(_) => { + unreachable!( + "there's no way out of Stopping, and we checked it's Stopping above: {:?}", + locked.as_str(), + ) + } UploadQueue::Stopped(stopped) => stopped, }; stopped.deleted_at = None; @@ -846,9 +855,14 @@ impl RemoteTimelineClient { // is cancellation safe, so we don't dare to do that. Hopefully, the // upload finishes or times out soon enough. if task_mgr::is_shutdown_requested() { - info!("upload task cancelled by shutdown request"); + info!("upload task cancelled by shutdown request, stopping queue"); match self.stop() { Ok(()) => {} + Err(StopError::QueueBroken) => { + warn!("stop() observed upload queue as broken"); + // In this case, it's still ok to proceed with balancing out the metric and returning. + // (The metric has nothing to do with the queue state itself). + } Err(StopError::QueueUninitialized) => { unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") } @@ -978,6 +992,10 @@ impl RemoteTimelineClient { info!("another concurrent task already stopped the queue"); return; }, // nothing to do + UploadQueue::Broken => { + warn!("the upload queue became broken while the task was running"); + return; + } UploadQueue::Initialized(qi) => { qi } }; @@ -1067,63 +1085,82 @@ impl RemoteTimelineClient { // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); let mut res = None; - take_mut::take(&mut *guard, |queue| { - match queue { - UploadQueue::Uninitialized => { - res = Some(Err(StopError::QueueUninitialized)); - queue - } - UploadQueue::Stopped(_) => { - // nothing to do - info!("another concurrent task already shut down the queue"); - res = Some(Ok(())); - queue - } - UploadQueue::Initialized(qi) => { - let UploadQueueInitialized { - task_counter: _, - latest_files, - // XXX need to think about what it means if it's non-zero here - latest_files_changes_since_metadata_upload_scheduled: _, - latest_metadata, - last_uploaded_consistent_lsn, - num_inprogress_layer_uploads, - num_inprogress_metadata_uploads, - num_inprogress_deletions, - inprogress_tasks, - queued_operations, - } = qi; - - // consistency check - assert_eq!( - num_inprogress_layer_uploads - + num_inprogress_metadata_uploads - + num_inprogress_deletions, - inprogress_tasks.len() - ); - - // We don't need to do anything here for in-progress tasks. They will finish - // on their own, decrement the unfinished-task counter themselves, and observe - // that the queue is Stopped. - drop(inprogress_tasks); - - // Tear down queued ops - for op in queued_operations.into_iter() { - self.calls_unfinished_metric_end(&op); - // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() - // which is exactly what we want to happen. - drop(op); + // If any of the asserts fail, we'll transition the queue into Broken state + // and then continue to panic. + // As part of that transition, `queued_operations` will get dropped. + // `wait_completion` operations waiting for these will observe an error. + // That's exactly what we want. + // We don't need to care about in-progress operations because that responsibility + // lies with the caller. There's no point for them to try anything funky, like, + // catching the panic and retrying the stop() call. We will return QueueBroken in that case. + take_mut::take_or_recover( + &mut *guard, + || { + warn!("transitioning the upload queue into Broken state"); + UploadQueue::Broken + }, + |queue| { + match queue { + UploadQueue::Broken => { + res = Some(Err(StopError::QueueBroken)); + queue + } + UploadQueue::Uninitialized => { + res = Some(Err(StopError::QueueUninitialized)); + queue + } + UploadQueue::Stopped(_) => { + // nothing to do + info!("another concurrent task already shut down the queue"); + res = Some(Ok(())); + queue + } + UploadQueue::Initialized(qi) => { + let UploadQueueInitialized { + task_counter: _, + latest_files, + // XXX need to think about what it means if it's non-zero here + latest_files_changes_since_metadata_upload_scheduled: _, + latest_metadata, + last_uploaded_consistent_lsn, + num_inprogress_layer_uploads, + num_inprogress_metadata_uploads, + num_inprogress_deletions, + inprogress_tasks, + queued_operations, + } = qi; + + // consistency check + assert_eq!( + num_inprogress_layer_uploads + + num_inprogress_metadata_uploads + + num_inprogress_deletions, + inprogress_tasks.len() + ); + + // We don't need to do anything here for in-progress tasks. They will finish + // on their own, decrement the unfinished-task counter themselves, and observe + // that the queue is Stopped. + drop(inprogress_tasks); + + // Tear down queued ops + for op in queued_operations.into_iter() { + self.calls_unfinished_metric_end(&op); + // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() + // which is exactly what we want to happen. + drop(op); + } + res = Some(Ok(())); + UploadQueue::Stopped(UploadQueueStopped { + latest_files, + last_uploaded_consistent_lsn, + latest_metadata, + deleted_at: None, + }) } - res = Some(Ok(())); - UploadQueue::Stopped(UploadQueueStopped { - latest_files, - last_uploaded_consistent_lsn, - latest_metadata, - deleted_at: None, - }) } - } - }); + }, + ); res.expect("the closure above always sets res") } } diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index e1651d3fa7..5fc67e9bc1 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -23,6 +23,7 @@ pub(super) enum UploadQueue { Uninitialized, Initialized(UploadQueueInitialized), Stopped(UploadQueueStopped), + Broken, } impl UploadQueue { @@ -31,6 +32,7 @@ impl UploadQueue { UploadQueue::Uninitialized => "Uninitialized", UploadQueue::Initialized(_) => "Initialized", UploadQueue::Stopped(_) => "Stopped", + UploadQueue::Broken => "Broken", } } } @@ -91,7 +93,7 @@ impl UploadQueue { ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), - UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => { + UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => { anyhow::bail!("already initialized, state {}", self.as_str()) } } @@ -125,7 +127,7 @@ impl UploadQueue { ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), - UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => { + UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => { anyhow::bail!("already initialized, state {}", self.as_str()) } } @@ -175,7 +177,7 @@ impl UploadQueue { pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { match self { - UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { + UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { anyhow::bail!("queue is in state {}", self.as_str()) } UploadQueue::Initialized(x) => Ok(x),