introduce the Broken state, using take_mut::take_with_recover

This commit is contained in:
Christian Schwarz
2023-05-04 18:04:45 +02:00
parent 8425b6ab21
commit e1e627e897
3 changed files with 110 additions and 65 deletions

View File

@@ -1408,6 +1408,12 @@ impl Tenant {
match &res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueBroken => {
// This happens if there's a panic inside above stop() call,
// and we call stop() again after that.
// The calling again can happen because we won't poison any
// mutexes on the unwind path at the first panicking call.
}
remote_timeline_client::StopError::QueueUninitialized => {
// This could happen if the timeline is Broken, e.g., because it failed to fetch IndexPart when it was loaded.
}

View File

@@ -268,6 +268,8 @@ pub enum StopError {
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
#[error("queue is not initialized")]
QueueUninitialized,
#[error("queue is broken")]
QueueBroken,
}
/// A client for accessing a timeline's data in remote storage.
@@ -353,6 +355,7 @@ impl RemoteTimelineClient {
pub fn last_uploaded_consistent_lsn(&self) -> Option<Lsn> {
match &*self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Broken => None, // could we return something?
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn),
}
@@ -664,8 +667,12 @@ impl RemoteTimelineClient {
// we can have inprogress index part upload that can overwrite the file
// with missing is_deleted flag that we going to set below
let stopped = match &mut *locked {
UploadQueue::Uninitialized => anyhow::bail!("is not Stopped but Uninitialized"),
UploadQueue::Initialized(_) => anyhow::bail!("is not Stopped but Initialized"),
UploadQueue::Uninitialized | UploadQueue::Initialized(_) | UploadQueue::Broken => {
anyhow::bail!(
"upload queue must be in state Stopped, but is in state {}",
locked.as_str()
);
}
UploadQueue::Stopped(stopped) => stopped,
};
@@ -690,10 +697,12 @@ impl RemoteTimelineClient {
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
let mut locked = self_clone.upload_queue.lock().unwrap();
let stopped = match &mut *locked {
UploadQueue::Uninitialized | UploadQueue::Initialized(_) => unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
),
UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Initialized(_) => {
unreachable!(
"there's no way out of Stopping, and we checked it's Stopping above: {:?}",
locked.as_str(),
)
}
UploadQueue::Stopped(stopped) => stopped,
};
stopped.deleted_at = None;
@@ -846,9 +855,14 @@ impl RemoteTimelineClient {
// is cancellation safe, so we don't dare to do that. Hopefully, the
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
info!("upload task cancelled by shutdown request");
info!("upload task cancelled by shutdown request, stopping queue");
match self.stop() {
Ok(()) => {}
Err(StopError::QueueBroken) => {
warn!("stop() observed upload queue as broken");
// In this case, it's still ok to proceed with balancing out the metric and returning.
// (The metric has nothing to do with the queue state itself).
}
Err(StopError::QueueUninitialized) => {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
@@ -978,6 +992,10 @@ impl RemoteTimelineClient {
info!("another concurrent task already stopped the queue");
return;
}, // nothing to do
UploadQueue::Broken => {
warn!("the upload queue became broken while the task was running");
return;
}
UploadQueue::Initialized(qi) => { qi }
};
@@ -1067,63 +1085,82 @@ impl RemoteTimelineClient {
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
let mut res = None;
take_mut::take(&mut *guard, |queue| {
match queue {
UploadQueue::Uninitialized => {
res = Some(Err(StopError::QueueUninitialized));
queue
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Some(Ok(()));
queue
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
// If any of the asserts fail, we'll transition the queue into Broken state
// and then continue to panic.
// As part of that transition, `queued_operations` will get dropped.
// `wait_completion` operations waiting for these will observe an error.
// That's exactly what we want.
// We don't need to care about in-progress operations because that responsibility
// lies with the caller. There's no point for them to try anything funky, like,
// catching the panic and retrying the stop() call. We will return QueueBroken in that case.
take_mut::take_or_recover(
&mut *guard,
|| {
warn!("transitioning the upload queue into Broken state");
UploadQueue::Broken
},
|queue| {
match queue {
UploadQueue::Broken => {
res = Some(Err(StopError::QueueBroken));
queue
}
UploadQueue::Uninitialized => {
res = Some(Err(StopError::QueueUninitialized));
queue
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Some(Ok(()));
queue
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
res = Some(Ok(()));
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
res = Some(Ok(()));
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
}
});
},
);
res.expect("the closure above always sets res")
}
}

View File

@@ -23,6 +23,7 @@ pub(super) enum UploadQueue {
Uninitialized,
Initialized(UploadQueueInitialized),
Stopped(UploadQueueStopped),
Broken,
}
impl UploadQueue {
@@ -31,6 +32,7 @@ impl UploadQueue {
UploadQueue::Uninitialized => "Uninitialized",
UploadQueue::Initialized(_) => "Initialized",
UploadQueue::Stopped(_) => "Stopped",
UploadQueue::Broken => "Broken",
}
}
}
@@ -91,7 +93,7 @@ impl UploadQueue {
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => {
anyhow::bail!("already initialized, state {}", self.as_str())
}
}
@@ -125,7 +127,7 @@ impl UploadQueue {
) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized => (),
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
UploadQueue::Initialized(_) | UploadQueue::Stopped(_) | UploadQueue::Broken => {
anyhow::bail!("already initialized, state {}", self.as_str())
}
}
@@ -175,7 +177,7 @@ impl UploadQueue {
pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
match self {
UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
UploadQueue::Broken | UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Initialized(x) => Ok(x),