refactor(remote_timeline_client): infallible stop() and shutdown() (#7234)

preliminary refactoring for
https://github.com/neondatabase/neon/pull/7233

part of #7062
This commit is contained in:
Christian Schwarz
2024-03-25 18:42:18 +01:00
committed by GitHub
parent d837ce0686
commit f72415e1fd
5 changed files with 51 additions and 75 deletions

View File

@@ -2141,7 +2141,7 @@ impl Tenant {
// Shut down the timeline's remote client: this means that the indices we write
// for child shards will not be invalidated by the parent shard deleting layers.
tl_client.shutdown().await?;
tl_client.shutdown().await;
// Download methods can still be used after shutdown, as they don't flow through the remote client's
// queue. In principal the RemoteTimelineClient could provide this without downloading it, but this

View File

@@ -217,7 +217,7 @@ use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::download::download_retry;
use crate::tenant::storage_layer::AsLayerDesc;
use crate::tenant::upload_queue::Delete;
use crate::tenant::upload_queue::{Delete, UploadQueueStoppedDeletable};
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::{
config::PageServerConf,
@@ -265,15 +265,6 @@ pub enum MaybeDeletedIndexPart {
Deleted(IndexPart),
}
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
#[derive(Debug, thiserror::Error)]
pub enum StopError {
/// Returned if the upload queue was never initialized.
/// See [`RemoteTimelineClient::init_upload_queue`] and [`RemoteTimelineClient::init_upload_queue_for_empty_remote`].
#[error("queue is not initialized")]
QueueUninitialized,
}
#[derive(Debug, thiserror::Error)]
pub enum PersistIndexPartWithDeletedFlagError {
#[error("another task is already setting the deleted_flag, started at {0:?}")]
@@ -390,15 +381,10 @@ impl RemoteTimelineClient {
"bug: it is responsibility of the caller to provide index part from MaybeDeletedIndexPart::Deleted"
))?;
{
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
}
// also locks upload queue, without dropping the guard above it will be a deadlock
self.stop().expect("initialized line above");
let mut upload_queue = self.upload_queue.lock().unwrap();
upload_queue.initialize_with_current_remote_index_part(index_part)?;
self.update_remote_physical_size_gauge(Some(index_part));
self.stop_impl(&mut upload_queue);
upload_queue
.stopped_mut()
@@ -412,7 +398,8 @@ impl RemoteTimelineClient {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => q.get_last_remote_consistent_lsn_projected(),
UploadQueue::Stopped(q) => q
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => q
.upload_queue_for_deletion
.get_last_remote_consistent_lsn_projected(),
}
@@ -422,7 +409,8 @@ impl RemoteTimelineClient {
match &mut *self.upload_queue.lock().unwrap() {
UploadQueue::Uninitialized => None,
UploadQueue::Initialized(q) => Some(q.get_last_remote_consistent_lsn_visible()),
UploadQueue::Stopped(q) => Some(
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => None,
UploadQueue::Stopped(UploadQueueStopped::Deletable(q)) => Some(
q.upload_queue_for_deletion
.get_last_remote_consistent_lsn_visible(),
),
@@ -889,7 +877,7 @@ impl RemoteTimelineClient {
/// Wait for all previously scheduled operations to complete, and then stop.
///
/// Not cancellation safe
pub(crate) async fn shutdown(self: &Arc<Self>) -> Result<(), StopError> {
pub(crate) async fn shutdown(self: &Arc<Self>) {
// On cancellation the queue is left in ackward state of refusing new operations but
// proper stop is yet to be called. On cancel the original or some later task must call
// `stop` or `shutdown`.
@@ -900,8 +888,12 @@ impl RemoteTimelineClient {
let fut = {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = match &mut *guard {
UploadQueue::Stopped(_) => return Ok(()),
UploadQueue::Uninitialized => return Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => return,
UploadQueue::Uninitialized => {
// transition into Stopped state
self.stop_impl(&mut guard);
return;
}
UploadQueue::Initialized(ref mut init) => init,
};
@@ -933,7 +925,7 @@ impl RemoteTimelineClient {
}
}
self.stop()
self.stop();
}
/// Set the deleted_at field in the remote index file.
@@ -1314,12 +1306,7 @@ impl RemoteTimelineClient {
// upload finishes or times out soon enough.
if cancel.is_cancelled() {
info!("upload task cancelled by shutdown request");
match self.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
unreachable!("we never launch an upload task if the queue is uninitialized, and once it is initialized, we never go back")
}
}
self.stop();
return;
}
@@ -1574,17 +1561,23 @@ impl RemoteTimelineClient {
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub(crate) fn stop(&self) -> Result<(), StopError> {
pub(crate) fn stop(&self) {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &mut *guard {
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
self.stop_impl(&mut guard);
}
fn stop_impl(&self, guard: &mut std::sync::MutexGuard<UploadQueue>) {
match &mut **guard {
UploadQueue::Uninitialized => {
info!("UploadQueue is in state Uninitialized, nothing to do");
**guard = UploadQueue::Stopped(UploadQueueStopped::Uninitialized);
}
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
Ok(())
}
UploadQueue::Initialized(initialized) => {
info!("shutting down upload queue");
@@ -1617,11 +1610,13 @@ impl RemoteTimelineClient {
};
let upload_queue = std::mem::replace(
&mut *guard,
UploadQueue::Stopped(UploadQueueStopped {
upload_queue_for_deletion,
deleted_at: SetDeletedFlagProgress::NotRunning,
}),
&mut **guard,
UploadQueue::Stopped(UploadQueueStopped::Deletable(
UploadQueueStoppedDeletable {
upload_queue_for_deletion,
deleted_at: SetDeletedFlagProgress::NotRunning,
},
)),
);
if let UploadQueue::Initialized(qi) = upload_queue {
qi
@@ -1650,10 +1645,6 @@ impl RemoteTimelineClient {
// which is exactly what we want to happen.
drop(op);
}
// We're done.
drop(guard);
Ok(())
}
}
}

View File

@@ -54,6 +54,7 @@ use std::{
ops::ControlFlow,
};
use crate::deletion_queue::DeletionQueueClient;
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
layer_map::{LayerMap, SearchResult},
@@ -64,7 +65,6 @@ use crate::{
disk_usage_eviction_task::DiskUsageEvictionInfo,
pgdatadir_mapping::CollectKeySpaceError,
};
use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError};
use crate::{
disk_usage_eviction_task::finite_f32,
tenant::storage_layer::{
@@ -1241,11 +1241,7 @@ impl Timeline {
// what is problematic is the shutting down of RemoteTimelineClient, because
// obviously it does not make sense to stop while we wait for it, but what
// about corner cases like s3 suddenly hanging up?
if let Err(e) = client.shutdown().await {
// Non-fatal. Shutdown is infallible. Failures to flush just mean that
// we have some extra WAL replay to do next time the timeline starts.
warn!("failed to flush to remote storage: {e:#}");
}
client.shutdown().await;
}
}
Err(e) => {
@@ -1282,12 +1278,7 @@ impl Timeline {
// Shut down remote timeline client: this gracefully moves its metadata into its Stopping state in
// case our caller wants to use that for a deletion
if let Some(remote_client) = self.remote_client.as_ref() {
match remote_client.stop() {
Ok(()) => {}
Err(StopError::QueueUninitialized) => {
// Shutting down during initialization is legal
}
}
remote_client.stop();
}
tracing::debug!("Waiting for tasks...");

View File

@@ -16,9 +16,7 @@ use crate::{
tenant::{
debug_assert_current_span_has_tenant_and_timeline_id,
metadata::TimelineMetadata,
remote_timeline_client::{
self, PersistIndexPartWithDeletedFlagError, RemoteTimelineClient,
},
remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient},
CreateTimelineCause, DeleteTimelineError, Tenant,
},
};
@@ -50,19 +48,7 @@ async fn stop_tasks(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
}
},
}
remote_client.stop();
}
// Stop & wait for the remaining timeline tasks, including upload tasks.

View File

@@ -121,11 +121,16 @@ pub(super) enum SetDeletedFlagProgress {
Successful(NaiveDateTime),
}
pub(super) struct UploadQueueStopped {
pub(super) struct UploadQueueStoppedDeletable {
pub(super) upload_queue_for_deletion: UploadQueueInitialized,
pub(super) deleted_at: SetDeletedFlagProgress,
}
pub(super) enum UploadQueueStopped {
Deletable(UploadQueueStoppedDeletable),
Uninitialized,
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotInitialized {
#[error("queue is in state Uninitialized")]
@@ -249,12 +254,15 @@ impl UploadQueue {
}
}
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
match self {
UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
anyhow::bail!("queue is in state {}", self.as_str())
}
UploadQueue::Stopped(stopped) => Ok(stopped),
UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
anyhow::bail!("queue is in state Stopped(Uninitialized)")
}
UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
}
}
}