Abort uploads if the tenant/timeline is requested to shut down

- Introduce another UploadQueue::Stopped enum variant to indicate the state
  where the UploadQueue is shut down.
- In perform_upload_task, wait concurrently for tenant/timeline
  shutdown. If we are requested to shut down, the first in-progress tasks
  that notices the shutdown request transitions the queue from
  UploadQueue::Initialized to UploadQueue::Stopped state.
  This involves dropping all the queued ops that are not yet
  in progress, which conveniently unblocks wait_completion() calls
  that are waiting for their barrier to be executed.
  They will receive an Err(), and do something sensible.

Right now, wait_completion() is only used by tests, but I
suspect that we should be using it in wherever we
delete layer files, e.g., GC and compaction, as explained
in the storage_sync.rs block comment section "Consistency".

This change also fixes
test_timeline_deletion_with_files_stuck_in_upload_queue
which I added in the previous commit.
Before, timeline delete would wait until all in-progress
tasks and queued tasks were done.
If, like in the test, a task was stuck due to upload error,
timeline deletion would wait forever. Now it gets an error.

Co-authored-by: Heikki Linnakangas <heikki@neon.tech>
This commit is contained in:
Christian Schwarz
2022-11-24 11:29:29 -05:00
committed by Heikki Linnakangas
parent ef95637c65
commit dd2a77c2ef

View File

@@ -103,7 +103,7 @@
//! the client's tenant and timeline.
//! Dropping the client will drop queued operations but not executing operations.
//! These will complete unless the `task_mgr` tasks are cancelled using `task_mgr`
//! APIs, e.g., during pageserver shutdown or tenant detach.
//! APIs, e.g., during pageserver shutdown, timeline delete, or tenant detach.
//!
//! # Completion
//!
@@ -234,13 +234,14 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
// re-export this
pub use download::is_temp_download_file;
pub use download::list_remote_timelines;
use tracing::{info_span, Instrument};
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::ops::DerefMut;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -310,7 +311,19 @@ pub struct RemoteTimelineClient {
enum UploadQueue {
Uninitialized,
Initialized(UploadQueueInitialized),
Stopped(UploadQueueStopped),
}
impl UploadQueue {
fn as_str(&self) -> &'static str {
match self {
UploadQueue::Uninitialized => "Uninitialized",
UploadQueue::Initialized(_) => "Initialized",
UploadQueue::Stopped(_) => "Stopped",
}
}
}
/// This keeps track of queued and in-progress tasks.
struct UploadQueueInitialized {
/// Counter to assign task IDs
@@ -348,6 +361,10 @@ struct UploadQueueInitialized {
queued_operations: VecDeque<UploadOp>,
}
struct UploadQueueStopped {
last_uploaded_consistent_lsn: Lsn,
}
impl UploadQueue {
fn initialize_empty_remote(
&mut self,
@@ -355,7 +372,9 @@ impl UploadQueue {
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) => anyhow::bail!("already initialized"),
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
anyhow::bail!("already initialized, state {}", self.as_str())
}
}
info!("initializing upload queue for empty remote");
@@ -386,7 +405,9 @@ impl UploadQueue {
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) => anyhow::bail!("already initialized"),
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
anyhow::bail!("already initialized, state {}", self.as_str())
}
}
let mut files = HashMap::new();
@@ -422,10 +443,12 @@ impl UploadQueue {
Ok(self.initialized_mut().expect("we just set it"))
}
fn initialized_mut(&mut self) -> Option<&mut UploadQueueInitialized> {
fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(x) => Some(x),
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(x) => Ok(x),
}
}
}
@@ -493,11 +516,11 @@ impl RemoteTimelineClient {
}
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
self.upload_queue
.lock()
.unwrap()
.initialized_mut()
.map(|q| q.last_uploaded_consistent_lsn)
match &*self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn),
}
}
//
@@ -542,9 +565,7 @@ impl RemoteTimelineClient {
if layer_metadata.file_size().is_none() {
let new_metadata = LayerFileMetadata::new(downloaded_size);
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
let upload_queue = guard.initialized_mut()?;
if let Some(upgraded) = upload_queue.latest_files.get_mut(path) {
upgraded.merge(&new_metadata);
} else {
@@ -575,9 +596,7 @@ impl RemoteTimelineClient {
metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
let upload_queue = guard.initialized_mut()?;
// As documented in the struct definition, it's ok for latest_metadata to be
// ahead of what's _actually_ on the remote during index upload.
@@ -614,9 +633,7 @@ impl RemoteTimelineClient {
layer_metadata: &LayerFileMetadata,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
let upload_queue = guard.initialized_mut()?;
// The file size can be missing for files that were created before we tracked that
// in the metadata, but it should be present for any new files we create.
@@ -652,9 +669,7 @@ impl RemoteTimelineClient {
/// upload operations have completed succesfully.
pub fn schedule_layer_file_deletion(self: &Arc<Self>, paths: &[PathBuf]) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
let upload_queue = guard.initialized_mut()?;
// Update the remote index file, removing the to-be-deleted files from the index,
// before deleting the actual files.
@@ -700,9 +715,7 @@ impl RemoteTimelineClient {
{
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard
.initialized_mut()
.context("upload queue is not initialized")?;
let upload_queue = guard.initialized_mut()?;
upload_queue.queued_operations.push_back(barrier_op);
// Don't count this kind of operation!
@@ -710,7 +723,9 @@ impl RemoteTimelineClient {
self.launch_queued_tasks(upload_queue);
}
receiver.changed().await?;
if receiver.changed().await.is_err() {
anyhow::bail!("wait_completion aborted because upload queue was stopped");
}
Ok(())
}
@@ -796,11 +811,10 @@ impl RemoteTimelineClient {
"remote upload",
false,
async move {
let task_clone = Arc::clone(&task);
self_rc.perform_upload_task(task).await;
self_rc.update_upload_queue_unfinished_metric(-1, &task_clone.op);
Ok(())
},
}
.instrument(info_span!(parent: None, "remote_upload", tenant = %self.tenant_id, timeline = %self.timeline_id, upload_task_id = %task_id)),
);
// Loop back to process next task
@@ -815,9 +829,28 @@ impl RemoteTimelineClient {
/// removed it from the `inprogress_tasks` list, and any next task(s) in the
/// queue that were waiting by the completion are launched.
///
/// The task can be shut down, however. That leads to stopping the whole
/// queue.
///
async fn perform_upload_task(self: &Arc<Self>, task: Arc<UploadTask>) {
// Loop to retry until it completes.
loop {
// If we're requested to shut down, close up shop and exit.
//
// Note: We only check for the shutdown requests between retries, so
// if a shutdown request arrives while we're busy uploading, in the
// upload::upload:*() call below, we will wait not exit until it has
// finisheed. We probably could cancel the upload by simply dropping
// the Future, but we're not 100% sure if the remote storage library
// is cancellation safe, so we don't dare to do that. Hopefully, the
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
info!("upload task cancelled by shutdown request");
self.update_upload_queue_unfinished_metric(-1, &task.op);
self.stop();
return;
}
let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref path, ref layer_metadata) => {
upload::upload_timeline_layer(&self.storage_impl, path, layer_metadata)
@@ -875,12 +908,15 @@ impl RemoteTimelineClient {
task.op, retries, e
);
exponential_backoff(
retries,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
)
.await;
// sleep until it's time to retry, or we're cancelled
tokio::select! {
_ = task_mgr::shutdown_watcher() => { },
_ = exponential_backoff(
retries,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
) => { },
};
}
}
}
@@ -897,10 +933,16 @@ impl RemoteTimelineClient {
// The task has completed succesfully. Remove it from the in-progress list.
{
let mut upload_queue = self.upload_queue.lock().unwrap();
let mut upload_queue = upload_queue.initialized_mut().expect(
"callers are responsible for ensuring this is only called on initialized queue",
);
let mut upload_queue_guard = self.upload_queue.lock().unwrap();
let upload_queue = match upload_queue_guard.deref_mut() {
UploadQueue::Uninitialized => panic!("callers are responsible for ensuring this is only called on an initialized queue"),
UploadQueue::Stopped(_) => {
info!("another concurrent task already stopped the queue");
return;
}, // nothing to do
UploadQueue::Initialized(qi) => { qi }
};
upload_queue.inprogress_tasks.remove(&task.task_id);
match task.op {
@@ -920,6 +962,7 @@ impl RemoteTimelineClient {
// Launch any queued tasks that were unblocked by this one.
self.launch_queued_tasks(upload_queue);
}
self.update_upload_queue_unfinished_metric(-1, &task.op);
}
fn update_upload_queue_unfinished_metric(&self, delta: i64, op: &UploadOp) {
@@ -941,6 +984,66 @@ impl RemoteTimelineClient {
.unwrap()
.add(delta)
}
fn stop(&self) {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &*guard {
UploadQueue::Uninitialized => panic!(
"callers are responsible for ensuring this is only called on initialized queue"
),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
}
UploadQueue::Initialized(qi) => {
info!("shutting down upload queue");
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
let last_uploaded_consistent_lsn = qi.last_uploaded_consistent_lsn;
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
last_uploaded_consistent_lsn,
}),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
} else {
unreachable!("we checked in the match above that it is Initialized");
}
};
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(qi.inprogress_tasks);
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.update_upload_queue_unfinished_metric(-1, &op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
// We're done.
drop(guard);
}
}
}
}
///