distinguished error types

This commit is contained in:
Christian Schwarz
2023-04-27 17:59:10 +02:00
parent d5280bf2dd
commit e41b2ed66a
3 changed files with 21 additions and 6 deletions

View File

@@ -143,6 +143,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
HasChildren => ApiError::BadRequest(anyhow::anyhow!(
"Cannot delete timeline which has child timelines"
)),
StopUploadQueue(e) => ApiError::InternalServerError(e.into()),
Other(e) => ApiError::InternalServerError(e),
}
}

View File

@@ -447,6 +447,8 @@ pub enum DeleteTimelineError {
NotFound,
#[error("HasChildren")]
HasChildren,
#[error("stop upload queue: {0:#}")]
StopUploadQueue(#[from] remote_timeline_client::StopError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}

View File

@@ -204,11 +204,11 @@ mod download;
pub mod index;
mod upload;
use anyhow::Context;
use chrono::Utc;
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use utils::bin_ser::SerializeError;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -260,6 +260,16 @@ pub enum MaybeDeletedIndexPart {
Deleted,
}
/// Errors that can arise when calling [`RemoteTimelineClient::stop`].
#[derive(Debug, thiserror::Error)]
pub enum StopError {
/// Callers are responsible for checking this before calling `stop()`.
#[error("queue is not initialized")]
QueueUninitialized,
#[error("serialize metadata: {0:#}")]
SerializeMetadata(SerializeError),
}
/// A client for accessing a timeline's data in remote storage.
///
/// This takes care of managing the number of connections, and balancing them
@@ -998,15 +1008,17 @@ impl RemoteTimelineClient {
self.metrics.call_end(&file_kind, &op_kind);
}
pub fn stop(&self) -> anyhow::Result<()> {
/// Close the upload queue for new operations and cancel queued operations.
/// In-progress operations will still be running after this function returns.
/// Use `task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id))`
/// to wait for them to complete, after calling this function.
pub fn stop(&self) -> Result<(), StopError> {
// Whichever *task* for this RemoteTimelineClient grabs the mutex first will transition the queue
// into stopped state, thereby dropping all off the queued *ops* which haven't become *tasks* yet.
// The other *tasks* will come here and observe an already shut down queue and hence simply wrap up their business.
let mut guard = self.upload_queue.lock().unwrap();
match &*guard {
UploadQueue::Uninitialized => anyhow::bail!(
"callers are responsible for ensuring this is only called on initialized queue"
),
UploadQueue::Uninitialized => Err(StopError::QueueUninitialized),
UploadQueue::Stopped(_) => {
// nothing to do
info!("another concurrent task already shut down the queue");
@@ -1021,7 +1033,7 @@ impl RemoteTimelineClient {
qi.last_uploaded_consistent_lsn,
qi.latest_metadata
.to_bytes()
.context("should be able to serialize metadata")?,
.map_err(StopError::SerializeMetadata)?,
);
// Replace the queue with the Stopped state, taking ownership of the old