push metadata.to_bytes() out of stop() into persist_index_part_with_deleted_flag()

This pushes the (unlikely) possibility of failure to serialize metadata
out of stop().
That in turn leaves us with only one case of how stop() can fail.
There are two callsites of stop():
1. perform_upload_task: here, we can safely say "unreachable", and I
   think anyone making a future refactoring that might violate that
   invariant would notice, because the unreachable!() sits right next to
   the code that would likely be refactored.
   The unreachable!() is desirable there because otherwise we'd need to
   think about how to handle the error. Maybe the previous code would
   have done the right thing, maybe not.

2. delete_timeline: this is the new one, and, it's far away from the
   code that initializes the upload queue. Putting an unreachable!()
   there seems risky. So, bail out with an error. It will become a 500
   status code, which console shall retry according to the openapi spec.
   We have test coverage demonstrating that the retry can succeed.
This commit is contained in:
Christian Schwarz
2023-04-27 18:33:40 +02:00
parent 48112bdf53
commit 11c18b05aa
4 changed files with 96 additions and 74 deletions

7
Cargo.lock generated
View File

@@ -2677,6 +2677,7 @@ dependencies = [
"strum_macros",
"svg_fmt",
"sync_wrapper",
"take_mut",
"tempfile",
"tenant_size_model",
"thiserror",
@@ -4105,6 +4106,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "take_mut"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tar"
version = "0.4.38"

View File

@@ -74,6 +74,7 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
take_mut = "0.2.2"
[dev-dependencies]
criterion.workspace = true

View File

@@ -204,11 +204,11 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
use chrono::Utc;
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use utils::bin_ser::SerializeError;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -266,8 +266,6 @@ pub enum StopError {
/// Callers are responsible for checking this before calling `stop()`.
#[error("queue is not initialized")]
QueueUninitialized,
#[error("serialize metadata: {0:#}")]
SerializeMetadata(SerializeError),
}
/// A client for accessing a timeline's data in remote storage.
@@ -354,7 +352,7 @@ impl RemoteTimelineClient {
match &*self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => Some(q.last_uploaded_consistent_lsn),
UploadQueue::Stopped(q) => Some(q.last_uploaded_index_part.disk_consistent_lsn),
UploadQueue::Stopped(q) => Some(q.last_uploaded_consistent_lsn),
}
}
@@ -643,7 +641,7 @@ impl RemoteTimelineClient {
pub(crate) async fn persist_index_part_with_deleted_flag(
self: &Arc<Self>,
) -> anyhow::Result<()> {
let index_part = {
let index_part_with_deleted_at = {
let mut locked = self.upload_queue.lock().unwrap();
// We must be in stopped state because otherwise
@@ -655,11 +653,22 @@ impl RemoteTimelineClient {
UploadQueue::Stopped(stopped) => stopped,
};
if let Some(deleted_at) = stopped.last_uploaded_index_part.deleted_at.as_ref() {
if let Some(deleted_at) = stopped.deleted_at.as_ref() {
anyhow::bail!("timeline is deleting, deleted_at: {:?}", deleted_at);
}
stopped.last_uploaded_index_part.deleted_at = Some(Utc::now().naive_utc());
stopped.last_uploaded_index_part.clone()
let deleted_at = Utc::now().naive_utc();
stopped.deleted_at = Some(deleted_at);
let mut index_part = IndexPart::new(
stopped.latest_files.clone(),
stopped.last_uploaded_consistent_lsn,
stopped
.latest_metadata
.to_bytes()
.context("serialize metadata")?,
);
index_part.deleted_at = Some(deleted_at);
index_part
};
let undo_deleted_at = scopeguard::guard(Arc::clone(self), |self_clone| {
@@ -671,7 +680,7 @@ impl RemoteTimelineClient {
),
UploadQueue::Stopped(stopped) => stopped,
};
stopped.last_uploaded_index_part.deleted_at = None;
stopped.deleted_at = None;
});
#[cfg(feature = "testing")]
@@ -695,7 +704,7 @@ impl RemoteTimelineClient {
&self.storage_impl,
self.tenant_id,
self.timeline_id,
&index_part,
&index_part_with_deleted_at,
)
.await?;
@@ -822,8 +831,11 @@ impl RemoteTimelineClient {
// upload finishes or times out soon enough.
if task_mgr::is_shutdown_requested() {
info!("upload task cancelled by shutdown request");
if let Err(e) = self.stop() {
error!("got an error when trying to stop remote client: {e}")
match self.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
}
self.calls_unfinished_metric_end(&task.op);
return;
@@ -1017,67 +1029,65 @@ impl RemoteTimelineClient {
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &*guard {
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(qi) => {
info!("shutting down upload queue");
// Prepare index part to put into stopped state
let index_part = IndexPart::new(
qi.latest_files.clone(),
qi.last_uploaded_consistent_lsn,
qi.latest_metadata
.to_bytes()
.map_err(StopError::SerializeMetadata)?,
);
// Replace the queue with the Stopped state, taking ownership of the old
// Initialized queue. We will do some checks on it, and then drop it.
let qi = {
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
last_uploaded_index_part: index_part,
}),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
} else {
unreachable!("we checked in the match above that it is Initialized");
}
};
// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(qi.inprogress_tasks);
// Tear down queued ops
for op in qi.queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
let mut res = None;
take_mut::take(&mut *guard, |queue| {
match queue {
UploadQueue::Uninitialized => {
res = Some(Err(StopError::QueueUninitialized));
queue
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
res = Some(Ok(()));
queue
}
UploadQueue::Initialized(qi) => {
let UploadQueueInitialized {
task_counter: _,
latest_files,
// XXX need to think about what it means if it's non-zero here
latest_files_changes_since_metadata_upload_scheduled: _,
latest_metadata,
last_uploaded_consistent_lsn,
num_inprogress_layer_uploads,
num_inprogress_metadata_uploads,
num_inprogress_deletions,
inprogress_tasks,
queued_operations,
} = qi;
// We're done.
drop(guard);
Ok(())
// consistency check
assert_eq!(
num_inprogress_layer_uploads
+ num_inprogress_metadata_uploads
+ num_inprogress_deletions,
inprogress_tasks.len()
);
// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
drop(inprogress_tasks);
// Tear down queued ops
for op in queued_operations.into_iter() {
self.calls_unfinished_metric_end(&op);
// Dropping UploadOp::Barrier() here will make wait_completion() return with an Err()
// which is exactly what we want to happen.
drop(op);
}
res = Some(Ok(()));
UploadQueue::Stopped(UploadQueueStopped {
latest_files,
last_uploaded_consistent_lsn,
latest_metadata,
deleted_at: None,
})
}
}
}
});
res.expect("the closure above always sets res")
}
}

View File

@@ -7,6 +7,7 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use chrono::NaiveDateTime;
use std::sync::Arc;
use tracing::info;
@@ -18,7 +19,7 @@ use utils::lsn::Lsn;
// that many upload queues in a running pageserver, and most of them are initialized
// anyway.
#[allow(clippy::large_enum_variant)]
pub(crate) enum UploadQueue {
pub(super) enum UploadQueue {
Uninitialized,
Initialized(UploadQueueInitialized),
Stopped(UploadQueueStopped),
@@ -75,9 +76,12 @@ pub(crate) struct UploadQueueInitialized {
pub(crate) queued_operations: VecDeque<UploadOp>,
}
pub(crate) struct UploadQueueStopped {
/// Index part is needed here so timeline_delete can access it
pub(super) last_uploaded_index_part: IndexPart,
pub(super) struct UploadQueueStopped {
pub(super) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
pub(super) last_uploaded_consistent_lsn: Lsn,
pub(super) latest_metadata: TimelineMetadata,
/// If Some(), a call to `persist_index_part_with_deleted_flag` is ongoing or finished.
pub(super) deleted_at: Option<NaiveDateTime>,
}
impl UploadQueue {