From f72415e1fd952274f132a47baaddbf0a4ac912de Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 25 Mar 2024 18:42:18 +0100 Subject: [PATCH] refactor(remote_timeline_client): infallible stop() and shutdown() (#7234) preliminary refactoring for https://github.com/neondatabase/neon/pull/7233 part of #7062 --- pageserver/src/tenant.rs | 2 +- .../src/tenant/remote_timeline_client.rs | 77 ++++++++----------- pageserver/src/tenant/timeline.rs | 15 +--- pageserver/src/tenant/timeline/delete.rs | 18 +---- pageserver/src/tenant/upload_queue.rs | 14 +++- 5 files changed, 51 insertions(+), 75 deletions(-) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 7bd85b6fd5..b923e473ce 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2141,7 +2141,7 @@ impl Tenant { // Shut down the timeline's remote client: this means that the indices we write // for child shards will not be invalidated by the parent shard deleting layers. - tl_client.shutdown().await?; + tl_client.shutdown().await; // Download methods can still be used after shutdown, as they don't flow through the remote client's // queue. In principal the RemoteTimelineClient could provide this without downloading it, but this diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index c0a150eb0d..b4b3243d11 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -217,7 +217,7 @@ use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::download::download_retry; use crate::tenant::storage_layer::AsLayerDesc; -use crate::tenant::upload_queue::Delete; +use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable}; use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::{ config::PageServerConf, @@ -265,15 +265,6 @@ pub enum MaybeDeletedIndexPart { Deleted(IndexPart), } -/// Errors that can arise when calling [`RemoteTimelineClient::stop`]. -#[derive(Debug, thiserror::Error)] -pub enum StopError { - /// Returned if the upload queue was never initialized. - /// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`]. - #[error("queue is not initialized")] - QueueUninitialized, -} - #[derive(Debug, thiserror::Error)] pub enum PersistIndexPartWithDeletedFlagError { #[error("another task is already setting the deleted_flag, started at {0:?}")] @@ -390,15 +381,10 @@ impl RemoteTimelineClient { "bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted" ))?; - { - let mut upload_queue = self.upload_queue.lock().unwrap(); - upload_queue.initialize_with_current_remote_index_part(index_part)?; - self.update_remote_physical_size_gauge(Some(index_part)); - } - // also locks upload queue, without dropping the guard above it will be a deadlock - self.stop().expect("initialized line above"); - let mut upload_queue = self.upload_queue.lock().unwrap(); + upload_queue.initialize_with_current_remote_index_part(index_part)?; + self.update_remote_physical_size_gauge(Some(index_part)); + self.stop_impl(&mut upload_queue); upload_queue .stopped_mut() @@ -412,7 +398,8 @@ impl RemoteTimelineClient { match &mut *self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(), - UploadQueue::Stopped(q) => q + UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None, + UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q .upload_queue_for_deletion .get_last_remote_consistent_lsn_projected(), } @@ -422,7 +409,8 @@ impl RemoteTimelineClient { match &mut *self.upload_queue.lock().unwrap() { UploadQueue::Uninitialized => None, UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()), - UploadQueue::Stopped(q) => Some( + UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None, + UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some( q.upload_queue_for_deletion .get_last_remote_consistent_lsn_visible(), ), @@ -889,7 +877,7 @@ impl RemoteTimelineClient { /// Wait for all previously scheduled operations to complete, and then stop. /// /// Not cancellation safe - pub(crate) async fn shutdown(self: &Arc) -> Result<(), StopError> { + pub(crate) async fn shutdown(self: &Arc) { // On cancellation the queue is left in ackward state of refusing new operations but // proper stop is yet to be called. On cancel the original or some later task must call // `stop` or `shutdown`. @@ -900,8 +888,12 @@ impl RemoteTimelineClient { let fut = { let mut guard = self.upload_queue.lock().unwrap(); let upload_queue = match &mut *guard { - UploadQueue::Stopped(_) => return Ok(()), - UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized), + UploadQueue::Stopped(_) => return, + UploadQueue::Uninitialized => { + // transition into Stopped state + self.stop_impl(&mut guard); + return; + } UploadQueue::Initialized(ref mut init) => init, }; @@ -933,7 +925,7 @@ impl RemoteTimelineClient { } } - self.stop() + self.stop(); } /// Set the deleted_at field in the remote index file. @@ -1314,12 +1306,7 @@ impl RemoteTimelineClient { // upload finishes or times out soon enough. if cancel.is_cancelled() { info!("upload task cancelled by shutdown request"); - match self.stop() { - Ok(()) => {} - Err(StopError::QueueUninitialized) => { - unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back") - } - } + self.stop(); return; } @@ -1574,17 +1561,23 @@ impl RemoteTimelineClient { /// In-progress operations will still be running after this function returns. /// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))` /// to wait for them to complete, after calling this function. - pub(crate) fn stop(&self) -> Result<(), StopError> { + pub(crate) fn stop(&self) { // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue // into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet. // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. let mut guard = self.upload_queue.lock().unwrap(); - match &mut *guard { - UploadQueue::Uninitialized => Err(StopError::QueueUninitialized), + self.stop_impl(&mut guard); + } + + fn stop_impl(&self, guard: &mut std::sync::MutexGuard) { + match &mut **guard { + UploadQueue::Uninitialized => { + info!("UploadQueue is in state Uninitialized, nothing to do"); + **guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized); + } UploadQueue::Stopped(_) => { // nothing to do info!("another concurrent task already shut down the queue"); - Ok(()) } UploadQueue::Initialized(initialized) => { info!("shutting down upload queue"); @@ -1617,11 +1610,13 @@ impl RemoteTimelineClient { }; let upload_queue = std::mem::replace( - &mut *guard, - UploadQueue::Stopped(UploadQueueStopped { - upload_queue_for_deletion, - deleted_at: SetDeletedFlagProgress::NotRunning, - }), + &mut **guard, + UploadQueue::Stopped(UploadQueueStopped::Deletable( + UploadQueueStoppedDeletable { + upload_queue_for_deletion, + deleted_at: SetDeletedFlagProgress::NotRunning, + }, + )), ); if let UploadQueue::Initialized(qi) = upload_queue { qi @@ -1650,10 +1645,6 @@ impl RemoteTimelineClient { // which is exactly what we want to happen. drop(op); } - - // We're done. - drop(guard); - Ok(()) } } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 15ffa72aaa..6c6bb4b788 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -54,6 +54,7 @@ use std::{ ops::ControlFlow, }; +use crate::deletion_queue::DeletionQueueClient; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, @@ -64,7 +65,6 @@ use crate::{ disk_usage_eviction_task::DiskUsageEvictionInfo, pgdatadir_mapping::CollectKeySpaceError, }; -use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError}; use crate::{ disk_usage_eviction_task::finite_f32, tenant::storage_layer::{ @@ -1241,11 +1241,7 @@ impl Timeline { // what is problematic is the shutting down of RemoteTimelineClient, because // obviously it does not make sense to stop while we wait for it, but what // about corner cases like s3 suddenly hanging up? - if let Err(e) = client.shutdown().await { - // Non-fatal. Shutdown is infallible. Failures to flush just mean that - // we have some extra WAL replay to do next time the timeline starts. - warn!("failed to flush to remote storage: {e:#}"); - } + client.shutdown().await; } } Err(e) => { @@ -1282,12 +1278,7 @@ impl Timeline { // Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in // case our caller wants to use that for a deletion if let Some(remote_client) = self.remote_client.as_ref() { - match remote_client.stop() { - Ok(()) => {} - Err(StopError::QueueUninitialized) => { - // Shutting down during initialization is legal - } - } + remote_client.stop(); } tracing::debug!("Waiting for tasks..."); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index d2272fc75f..e9afbfd8ba 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -16,9 +16,7 @@ use crate::{ tenant::{ debug_assert_current_span_has_tenant_and_timeline_id, metadata::TimelineMetadata, - remote_timeline_client::{ - self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient, - }, + remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, CreateTimelineCause, DeleteTimelineError, Tenant, }, }; @@ -50,19 +48,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> { // Prevent new uploads from starting. if let Some(remote_client) = timeline.remote_client.as_ref() { - let res = remote_client.stop(); - match res { - Ok(()) => {} - Err(e) => match e { - remote_timeline_client::StopError::QueueUninitialized => { - // This case shouldn't happen currently because the - // load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart. - // That is, before we declare the Tenant as Active. - // But we only allow calls to delete_timeline on Active tenants. - return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs"))); - } - }, - } + remote_client.stop(); } // Stop & wait for the remaining timeline tasks, including upload tasks. diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index a5516bb9a9..0bf4d1e599 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -121,11 +121,16 @@ pub(super) enum SetDeletedFlagProgress { Successful(NaiveDateTime), } -pub(super) struct UploadQueueStopped { +pub(super) struct UploadQueueStoppedDeletable { pub(super) upload_queue_for_deletion: UploadQueueInitialized, pub(super) deleted_at: SetDeletedFlagProgress, } +pub(super) enum UploadQueueStopped { + Deletable(UploadQueueStoppedDeletable), + Uninitialized, +} + #[derive(thiserror::Error, Debug)] pub(crate) enum NotInitialized { #[error("queue is in state Uninitialized")] @@ -249,12 +254,15 @@ impl UploadQueue { } } - pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> { + pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> { match self { UploadQueue::Initialized(_) | UploadQueue::Uninitialized => { anyhow::bail!("queue is in state {}", self.as_str()) } - UploadQueue::Stopped(stopped) => Ok(stopped), + UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => { + anyhow::bail!("queue is in state Stopped(Uninitialized)") + } + UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable), } } }