From dd2a77c2ef3687f5c36377c67f5c179faca56dab Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 24 Nov 2022 11:29:29 -0500 Subject: [PATCH] Abort uploads if the tenant/timeline is requested to shut down - Introduce another UploadQueue::Stopped enum variant to indicate the state where the UploadQueue is shut down. - In perform_upload_task, wait concurrently for tenant/timeline shutdown. If we are requested to shut down, the first in-progress task that notices the shutdown request transitions the queue from UploadQueue::Initialized to UploadQueue::Stopped state. This involves dropping all the queued ops that are not yet in progress, which conveniently unblocks wait_completion() calls that are waiting for their barrier to be executed. They will receive an Err(), and do something sensible. Right now, wait_completion() is only used by tests, but I suspect that we should be using it wherever we delete layer files, e.g., GC and compaction, as explained in the storage_sync.rs block comment section "Consistency". This change also fixes test_timeline_deletion_with_files_stuck_in_upload_queue which I added in the previous commit. Before, timeline delete would wait until all in-progress tasks and queued tasks were done. If, like in the test, a task was stuck due to upload error, timeline deletion would wait forever. Now it gets an error. Co-authored-by: Heikki Linnakangas --- pageserver/src/storage_sync.rs | 185 +++++++++++++++++++++++++-------- 1 file changed, 144 insertions(+), 41 deletions(-) diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 5db910f016..7d90f11ac4 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -103,7 +103,7 @@ //! the client's tenant and timeline. //! Dropping the client will drop queued operations but not executing operations. //! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr` -//! APIs, e.g., during pageserver shutdown or tenant detach. +//! 
APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach. //! //! # Completion //! @@ -234,13 +234,14 @@ mod download; pub mod index; mod upload; -use anyhow::Context; // re-export this pub use download::is_temp_download_file; pub use download::list_remote_timelines; +use tracing::{info_span, Instrument}; use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; +use std::ops::DerefMut; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; @@ -310,7 +311,19 @@ pub struct RemoteTimelineClient { enum UploadQueue { Uninitialized, Initialized(UploadQueueInitialized), + Stopped(UploadQueueStopped), } + +impl UploadQueue { + fn as_str(&self) -> &'static str { + match self { + UploadQueue::Uninitialized => "Uninitialized", + UploadQueue::Initialized(_) => "Initialized", + UploadQueue::Stopped(_) => "Stopped", + } + } +} + /// This keeps track of queued and in-progress tasks. struct UploadQueueInitialized { /// Counter to assign task IDs @@ -348,6 +361,10 @@ struct UploadQueueInitialized { queued_operations: VecDeque, } +struct UploadQueueStopped { + last_uploaded_consistent_lsn: Lsn, +} + impl UploadQueue { fn initialize_empty_remote( &mut self, @@ -355,7 +372,9 @@ impl UploadQueue { ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), - UploadQueue::Initialized(_) => anyhow::bail!("already initialized"), + UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => { + anyhow::bail!("already initialized, state {}", self.as_str()) + } } info!("initializing upload queue for empty remote"); @@ -386,7 +405,9 @@ impl UploadQueue { ) -> anyhow::Result<&mut UploadQueueInitialized> { match self { UploadQueue::Uninitialized => (), - UploadQueue::Initialized(_) => anyhow::bail!("already initialized"), + UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => { + anyhow::bail!("already initialized, state {}", self.as_str()) + } } let mut files = HashMap::new(); @@ 
-422,10 +443,12 @@ impl UploadQueue { Ok(self.initialized_mut().expect("we just set it")) } - fn initialized_mut(&mut self) -> Option<&mut UploadQueueInitialized> { + fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> { match self { - UploadQueue::Uninitialized => None, - UploadQueue::Initialized(x) => Some(x), + UploadQueue::Uninitialized | UploadQueue::Stopped(_) => { + anyhow::bail!("queue is in state {}", self.as_str()) + } + UploadQueue::Initialized(x) => Ok(x), } } } @@ -493,11 +516,11 @@ impl RemoteTimelineClient { } pub fn last_uploaded_consistent_lsn(&self) -> Option { - self.upload_queue - .lock() - .unwrap() - .initialized_mut() - .map(|q| q.last_uploaded_consistent_lsn) + match &*self.upload_queue.lock().unwrap() { + UploadQueue::Uninitialized => None, + UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn), + UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn), + } } // @@ -542,9 +565,7 @@ impl RemoteTimelineClient { if layer_metadata.file_size().is_none() { let new_metadata = LayerFileMetadata::new(downloaded_size); let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard - .initialized_mut() - .context("upload queue is not initialized")?; + let upload_queue = guard.initialized_mut()?; if let Some(upgraded) = upload_queue.latest_files.get_mut(path) { upgraded.merge(&new_metadata); } else { @@ -575,9 +596,7 @@ impl RemoteTimelineClient { metadata: &TimelineMetadata, ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard - .initialized_mut() - .context("upload queue is not initialized")?; + let upload_queue = guard.initialized_mut()?; // As documented in the struct definition, it's ok for latest_metadata to be // ahead of what's _actually_ on the remote during index upload. 
@@ -614,9 +633,7 @@ impl RemoteTimelineClient { layer_metadata: &LayerFileMetadata, ) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard - .initialized_mut() - .context("upload queue is not initialized")?; + let upload_queue = guard.initialized_mut()?; // The file size can be missing for files that were created before we tracked that // in the metadata, but it should be present for any new files we create. @@ -652,9 +669,7 @@ impl RemoteTimelineClient { /// upload operations have completed succesfully. pub fn schedule_layer_file_deletion(self: &Arc, paths: &[PathBuf]) -> anyhow::Result<()> { let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard - .initialized_mut() - .context("upload queue is not initialized")?; + let upload_queue = guard.initialized_mut()?; // Update the remote index file, removing the to-be-deleted files from the index, // before deleting the actual files. @@ -700,9 +715,7 @@ impl RemoteTimelineClient { { let mut guard = self.upload_queue.lock().unwrap(); - let upload_queue = guard - .initialized_mut() - .context("upload queue is not initialized")?; + let upload_queue = guard.initialized_mut()?; upload_queue.queued_operations.push_back(barrier_op); // Don't count this kind of operation! 
@@ -710,7 +723,9 @@ impl RemoteTimelineClient { self.launch_queued_tasks(upload_queue); } - receiver.changed().await?; + if receiver.changed().await.is_err() { + anyhow::bail!("wait_completion aborted because upload queue was stopped"); + } Ok(()) } @@ -796,11 +811,10 @@ impl RemoteTimelineClient { "remote upload", false, async move { - let task_clone = Arc::clone(&task); self_rc.perform_upload_task(task).await; - self_rc.update_upload_queue_unfinished_metric(-1, &task_clone.op); Ok(()) - }, + } + .instrument(info_span!(parent: None, "remote_upload", tenant = %self.tenant_id, timeline = %self.timeline_id, upload_task_id = %task_id)), ); // Loop back to process next task @@ -815,9 +829,28 @@ impl RemoteTimelineClient { /// removed it from the `inprogress_tasks` list, and any next task(s) in the /// queue that were waiting by the completion are launched. /// + /// The task can be shut down, however. That leads to stopping the whole + /// queue. + /// async fn perform_upload_task(self: &Arc, task: Arc) { // Loop to retry until it completes. loop { + // If we're requested to shut down, close up shop and exit. + // + // Note: We only check for the shutdown requests between retries, so + // if a shutdown request arrives while we're busy uploading, in the + // upload::upload_*() call below, we will not exit until it has + // finished. We probably could cancel the upload by simply dropping + // the Future, but we're not 100% sure if the remote storage library + // is cancellation safe, so we don't dare to do that. Hopefully, the + // upload finishes or times out soon enough. 
+ if task_mgr::is_shutdown_requested() { + info!("upload task cancelled by shutdown request"); + self.update_upload_queue_unfinished_metric(-1, &task.op); + self.stop(); + return; + } + let upload_result: anyhow::Result<()> = match &task.op { UploadOp::UploadLayer(ref path, ref layer_metadata) => { upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata) @@ -875,12 +908,15 @@ impl RemoteTimelineClient { task.op, retries, e ); - exponential_backoff( - retries, - DEFAULT_BASE_BACKOFF_SECONDS, - DEFAULT_MAX_BACKOFF_SECONDS, - ) - .await; + // sleep until it's time to retry, or we're cancelled + tokio::select! { + _ = task_mgr::shutdown_watcher() => { }, + _ = exponential_backoff( + retries, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + ) => { }, + }; } } } @@ -897,10 +933,16 @@ impl RemoteTimelineClient { // The task has completed succesfully. Remove it from the in-progress list. { - let mut upload_queue = self.upload_queue.lock().unwrap(); - let mut upload_queue = upload_queue.initialized_mut().expect( - "callers are responsible for ensuring this is only called on initialized queue", - ); + let mut upload_queue_guard = self.upload_queue.lock().unwrap(); + let upload_queue = match upload_queue_guard.deref_mut() { + UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"), + UploadQueue::Stopped(_) => { + info!("another concurrent task already stopped the queue"); + return; + }, // nothing to do + UploadQueue::Initialized(qi) => { qi } + }; + upload_queue.inprogress_tasks.remove(&task.task_id); match task.op { @@ -920,6 +962,7 @@ impl RemoteTimelineClient { // Launch any queued tasks that were unblocked by this one. 
self.launch_queued_tasks(upload_queue); } + self.update_upload_queue_unfinished_metric(-1, &task.op); } fn update_upload_queue_unfinished_metric(&self, delta: i64, op: &UploadOp) { @@ -941,6 +984,66 @@ impl RemoteTimelineClient { .unwrap() .add(delta) } + + fn stop(&self) { + // Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue + // into stopped state, thereby dropping all of the queued *ops* which haven't become *tasks* yet. + // The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business. + let mut guard = self.upload_queue.lock().unwrap(); + match &*guard { + UploadQueue::Uninitialized => panic!( + "callers are responsible for ensuring this is only called on initialized queue" + ), + UploadQueue::Stopped(_) => { + // nothing to do + info!("another concurrent task already shut down the queue"); + } + UploadQueue::Initialized(qi) => { + info!("shutting down upload queue"); + + // Replace the queue with the Stopped state, taking ownership of the old + // Initialized queue. We will do some checks on it, and then drop it. + let qi = { + let last_uploaded_consistent_lsn = qi.last_uploaded_consistent_lsn; + let upload_queue = std::mem::replace( + &mut *guard, + UploadQueue::Stopped(UploadQueueStopped { + last_uploaded_consistent_lsn, + }), + ); + if let UploadQueue::Initialized(qi) = upload_queue { + qi + } else { + unreachable!("we checked in the match above that it is Initialized"); + } + }; + + // consistency check + assert_eq!( + qi.num_inprogress_layer_uploads + + qi.num_inprogress_metadata_uploads + + qi.num_inprogress_deletions, + qi.inprogress_tasks.len() + ); + + // We don't need to do anything here for in-progress tasks. They will finish + // on their own, decrement the unfinished-task counter themselves, and observe + // that the queue is Stopped. 
+ drop(qi.inprogress_tasks); + + // Tear down queued ops + for op in qi.queued_operations.into_iter() { + self.update_upload_queue_unfinished_metric(-1, &op); + // Dropping UploadOp::Barrier() here will make wait_completion() return with an Err() + // which is exactly what we want to happen. + drop(op); + } + + // We're done. + drop(guard); + } + } + } } ///