Compare commits

...

19 Commits

Author SHA1 Message Date
Dmitry Ivanov
389dfb6809 Simplify low-hanging fruit 2023-05-16 18:00:11 +03:00
Dmitry Ivanov
b2bdd1b117 Changes 2023-05-16 17:09:33 +03:00
Joonas Koivunen
0b42ba9615 refactor: log errors with {:#}
even though it's a std::io::Error

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-11 20:01:24 +03:00
Joonas Koivunen
6d3be4cf7d doc: review suggestions
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-11 20:00:33 +03:00
Joonas Koivunen
7605aed5aa fixup: refactor: stop using todo! for a panic! 2023-05-10 18:21:19 +03:00
Joonas Koivunen
e13ae811bf fixup: move doc test as test case and hide _spawn variants 2023-05-10 17:55:45 +03:00
Joonas Koivunen
36e86e370b fixup: nicer ascii art 2023-05-10 17:55:05 +03:00
Joonas Koivunen
8e51d5d8f3 doc: remove FIXME simplify uploading
there's no need to simplify the uploading; the uploading is now within
this one future which will be executed, so it's not safe to just
remove it or revert it back.

also, I am hesitant to add new changes at this point; the diff is large
enough.
2023-05-10 17:35:26 +03:00
Joonas Koivunen
72131e15ff test: less racy test_delete_timeline_client_hangup
with the SharedRetryable the first request creates a task which will
continue to the pause point regardless of the first request being
cancelled, so we need to accept a 404 as success.
2023-05-10 17:31:19 +03:00
Joonas Koivunen
0495043d30 test: fix test_concurrent_timeline_delete_if_first_stuck_at_index_upload
manually cherry-picked 245a2a0592
2023-05-10 17:30:51 +03:00
Joonas Koivunen
c87a742bbd fixup: one missed logging opportunity 2023-05-10 16:53:14 +03:00
Joonas Koivunen
c7c514aae4 doc: more cleanup 2023-05-10 16:53:03 +03:00
Joonas Koivunen
f7b0beb1cf refactor: simplify, racy removes no longer intended 2023-05-10 16:27:08 +03:00
Joonas Koivunen
b5b0a2e872 fixup: doc: align strong usage, fix broken references 2023-05-10 16:25:43 +03:00
Joonas Koivunen
cd787070f6 fixup: doc: explain convoluted return type 2023-05-10 16:25:07 +03:00
Joonas Koivunen
05d4a85c2f fixup: doc: many futures instead of tasks
as in, you could run many of these futures with tokio::select within one
task or `runtime.block_on`; it does not matter.
2023-05-10 16:24:18 +03:00
Joonas Koivunen
9a35ae40b5 fixup: clippy 2023-05-10 15:54:14 +03:00
Joonas Koivunen
3d33ea465d fix: coalesce requests to delete_timeline 2023-05-10 15:46:55 +03:00
Joonas Koivunen
fd50c95b9d feat: utils::shared_retryable::SharedRetryable
documented in the module.
2023-05-10 15:08:42 +03:00
6 changed files with 958 additions and 179 deletions

View File

@@ -60,6 +60,10 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Primitive for coalescing operations into a single task which will not be cancelled by, for
/// example, an external http client closing the connection.
pub mod shared_retryable;
/// use with fail::cfg("$name", "return(2000)")
#[macro_export]
macro_rules! failpoint_sleep_millis_async {

View File

@@ -0,0 +1,659 @@
use std::future::Future;
use std::sync::Arc;
/// Container through which many request handlers can come together and join a single task to
/// completion, instead of racing each other and their own cancellation.
///
/// In a picture:
///
/// ```text
/// SharedRetryable::try_restart Spawned task completes with only one concurrent attempt
/// \ /
/// request handler 1 ---->|--X
/// request handler 2 ---->|-------|
/// request handler 3 ---->|-------|
/// | |
/// v |
/// one spawned task \------>/
///
/// (X = cancelled during await)
/// ```
///
/// The implementation is cancel-safe. The implementation and internal structure are hurt by the
/// inability to just spawn the task, but this is needed for pageserver usage.
///
/// The implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires
/// the caller to do the spawning before awaiting the result. If the caller is dropped while this
/// happens, a new attempt will be required, and all concurrent awaiters will see a
/// [`RetriedTaskPanicked`] error.
///
/// There is another "family of APIs" [`SharedRetryable::attempt_spawn`] for infallible futures. It is
/// just provided for completeness, and it does not have a fully decomposed version like
/// `try_restart`.
///
/// For the `try_restart_*` family of APIs, there is a concept of two-level results. The inner level
/// is returned by the executed future and needs to be `Clone`. Most errors are not `Clone`, so the
/// recommended approach is to log the error where it happens and propagate no more than a label as
/// the "inner error", which is then used to build an outer error. The outer error also has to be
/// convertible from [`RetriedTaskPanicked`] to absorb that case as well.
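///
/// A minimal sketch of that error-type pattern, modelled on the `InnerError`/`OuterError` pair in
/// the tests below (the names are illustrative only, not part of this module's API):
///
/// ```ignore
/// #[derive(Clone, Debug)]
/// enum InnerError {
///     // cloneable label, cheap to send through the broadcast channel
///     Unlucky,
/// }
///
/// impl Retryable for InnerError {
///     fn is_permanent(&self) -> bool {
///         false
///     }
/// }
///
/// #[derive(Debug)]
/// enum OuterError {
///     Unlucky,
///     AttemptPanicked,
/// }
///
/// impl From<InnerError> for OuterError {
///     fn from(_: InnerError) -> Self {
///         OuterError::Unlucky
///     }
/// }
///
/// impl From<RetriedTaskPanicked> for OuterError {
///     fn from(_: RetriedTaskPanicked) -> Self {
///         OuterError::AttemptPanicked
///     }
/// }
/// ```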
///
/// ## Example
///
/// A shared service value completes the infallible work once, even if called concurrently by
/// multiple cancellable tasks.
///
/// Example moved as a test `service_example`.
#[derive(Clone)]
pub struct SharedRetryable<V> {
inner: Arc<tokio::sync::Mutex<Option<MaybeDone<V>>>>,
}
impl<V> Default for SharedRetryable<V> {
fn default() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(None)),
}
}
}
/// Determine if an error is transient or permanent.
pub trait Retryable {
fn is_permanent(&self) -> bool {
true
}
}
pub trait MakeFuture {
type Future: Future<Output = Self::Output> + Send + 'static;
type Output: Send + 'static;
fn make_future(self) -> Self::Future;
}
impl<Fun, Fut, R> MakeFuture for Fun
where
Fun: FnOnce() -> Fut,
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
type Future = Fut;
type Output = R;
fn make_future(self) -> Self::Future {
self()
}
}
/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]).
#[derive(Debug, PartialEq, Eq)]
pub struct RetriedTaskPanicked;
impl<T, E1> SharedRetryable<Result<T, E1>>
where
T: Clone + std::fmt::Debug + Send + 'static,
E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
{
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt,
/// start a new attempt, or join an existing one.
///
/// Compared to `Self::try_restart`, this method also spawns the future to run, which would
/// otherwise have to be done manually.
#[cfg(test)]
pub async fn try_restart_spawn<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
let (recv, maybe_fut) = self.try_restart(retry_with).await;
if let Some(fut) = maybe_fut {
// top-level function: we must spawn here; pageserver cannot use this variant
tokio::spawn(fut);
}
recv.await
}
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt,
/// start a new attempt, or join an existing one.
///
/// If a task calling this method is cancelled then, at worst, a new attempt which is immediately
/// deemed as having panicked will occur, but without a panic ever happening.
///
/// Returns one future for waiting for the result and possibly another which needs to be
/// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first
/// future will never make progress.
///
/// This complication exists because on pageserver we cannot use `tokio::spawn` directly
/// at this time.
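///
/// A minimal calling sketch; `shared` and `factory` are placeholder names, and `tokio::spawn`
/// stands in for whatever spawning facility the caller actually has:
///
/// ```ignore
/// let (recv, maybe_fut) = shared.try_restart(factory).await;
/// if let Some(fut) = maybe_fut {
///     // the attempt must run to completion independently of this caller
///     tokio::spawn(fut);
/// }
/// let res: Result<T, E2> = recv.await;
/// ```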
pub async fn try_restart<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> (
impl Future<Output = Result<T, E2>> + Send + 'static,
Option<impl Future<Output = ()> + Send + 'static>,
)
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
use futures::future::Either;
match self.decide_to_retry_or_join(retry_with).await {
Ok(terminal) => (Either::Left(async move { terminal }), None),
Err((rx, maybe_fut)) => {
let recv = Self::make_oneshot_alike_receiver(rx);
(Either::Right(recv), maybe_fut)
}
}
}
/// Returns `Ok` if a previous attempt resulted in a terminal result. `Err` is returned
/// when an attempt can be joined and possibly needs to be spawned.
async fn decide_to_retry_or_join<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<
Result<T, E2>,
(
tokio::sync::broadcast::Receiver<Result<T, E1>>,
Option<impl Future<Output = ()> + Send + 'static>,
),
>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(Ok(t))) => return Ok(Ok(t.to_owned())),
Some(MaybeDone::Done(Err(e))) if e.is_permanent() => {
return Ok(Err(E2::from(e.to_owned())))
}
Some(MaybeDone::Pending(weak)) => {
// failure to upgrade can mean only one thing: there was an unexpected
// panic which we consider as a transient retryable error.
weak.upgrade()
}
Some(MaybeDone::Done(Err(_retryable))) => None,
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
// new attempt
// panic safety: invoke the factory before configuring the pending value
let fut = retry_with.make_future();
let (strong, fut) = self.make_run_and_complete(fut, &mut g);
(strong, Some(fut))
}
};
// important: the Arc<Receiver> is not held after unlocking
// important: we resubscribe before the lock is released to be sure to get the message which
// is sent once the receiver is dropped
let rx = strong.resubscribe();
drop(strong);
Err((rx, maybe_fut))
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Returns an `Arc<Receiver<V>>` which is valid until the attempt completes, and the future
/// which will need to run to completion outside the lifecycle of the caller.
fn make_run_and_complete(
&self,
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<Result<T, E1>>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
impl Future<Output = ()> + Send + 'static,
) {
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
assert!(
weak.upgrade().is_none(),
"when starting a restart, should no longer have an upgradeable channel"
);
}
Some(MaybeDone::Done(Err(err))) => {
assert!(
!err.is_permanent(),
"when restarting, the err must be transient"
);
}
Some(MaybeDone::Done(Ok(_))) => {
panic!("unexpected restart after a completion on MaybeDone");
}
None => {}
}
self.make_run_and_complete_any(fut, g)
}
/// "Oneshot alike" in the sense that it is a future which will be consumed by a single `await`.
///
/// Otherwise the caller might think it's beneficial or reasonable to poll the channel multiple
/// times.
async fn make_oneshot_alike_receiver<E2>(
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with a single send, but handle it the same way.
// this case should only ever happen if a panic happened in the `fut`.
Err(E2::from(RetriedTaskPanicked))
}
}
}
}
impl<V> SharedRetryable<V>
where
V: std::fmt::Debug + Clone + Send + 'static,
{
/// Attempt to run a spawned future to completion once.
///
/// Any previous attempt which panicked will be retried, but `RetriedTaskPanicked` will be
/// returned when the most recent attempt panicked.
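///
/// A minimal sketch (see the `attemptable_until_no_panic` test below; `shared` is a placeholder):
///
/// ```ignore
/// let shared = SharedRetryable::<u8>::default();
/// // the first caller runs the future; later callers receive the cached value
/// assert_eq!(shared.attempt_spawn(|| async { 42 }).await, Ok(42));
/// assert_eq!(shared.attempt_spawn(|| async { panic!("not called") }).await, Ok(42));
/// ```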
#[cfg(test)]
pub async fn attempt_spawn(
&self,
attempt_with: impl MakeFuture<Output = V>,
) -> Result<V, RetriedTaskPanicked> {
let (rx, maybe_fut) = {
let mut g = self.inner.lock().await;
let maybe_rx = match g.as_ref() {
Some(MaybeDone::Done(v)) => return Ok(v.to_owned()),
Some(MaybeDone::Pending(weak)) => {
// see comment in try_restart
weak.upgrade()
}
None => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
let fut = attempt_with.make_future();
let (strong, fut) = self.make_run_and_complete_any(fut, &mut g);
(strong, Some(fut))
}
};
// see decide_to_retry_or_join for important notes
let rx = strong.resubscribe();
drop(strong);
(rx, maybe_fut)
};
if let Some(fut) = maybe_fut {
// this is a top-level function, so we need to spawn directly;
// from the pageserver one would use the more piecewise functions instead
tokio::spawn(fut);
}
let recv = Self::make_oneshot_alike_receiver_any(rx);
recv.await
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Forgetting the returned future is outside the scope of any correctness guarantees: all of
/// the waiters will then be deadlocked, and the `MaybeDone` will stay pending forever. Dropping
/// the future without running it will then require a new attempt.
///
/// Also returns an `Arc<Receiver<V>>` which is valid until the attempt completes.
fn make_run_and_complete_any(
&self,
fut: impl Future<Output = V> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, Option<MaybeDone<V>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<V>>,
impl Future<Output = ()> + Send + 'static,
) {
let (tx, rx) = tokio::sync::broadcast::channel(1);
let strong = Arc::new(rx);
**g = Some(MaybeDone::Pending(Arc::downgrade(&strong)));
let retry = {
let strong = strong.clone();
self.clone().run_and_complete(fut, tx, strong)
};
#[cfg(debug_assertions)]
match &**g {
Some(MaybeDone::Pending(weak)) => {
let rx = weak.upgrade().expect("holding the weak and strong locally");
assert!(Arc::ptr_eq(&strong, &rx));
}
_ => unreachable!("MaybeDone::pending must be set after spawn_and_run_complete_any"),
}
(strong, retry)
}
/// Run the actual attempt, and communicate the response via both:
/// - setting the `MaybeDone::Done`
/// - the broadcast channel
async fn run_and_complete(
self,
fut: impl Future<Output = V>,
tx: tokio::sync::broadcast::Sender<V>,
strong: Arc<tokio::sync::broadcast::Receiver<V>>,
) {
let res = fut.await;
{
let mut g = self.inner.lock().await;
MaybeDone::complete(&mut *g, &strong, res.clone());
// make the weak un-upgradeable by dropping the final live
// reference to it. this is the final Arc because the Arc never escapes
// the critical section in `decide_to_retry_or_join` or `attempt_spawn`.
Arc::try_unwrap(strong).expect("expected this to be the only Arc<Receiver<V>>");
}
// now no one can get the Pending(weak) value to upgrade and they only see
// the Done(res).
//
// send the result value to listeners, if any
drop(tx.send(res));
}
#[cfg(test)]
async fn make_oneshot_alike_receiver_any(
mut rx: tokio::sync::broadcast::Receiver<V>,
) -> Result<V, RetriedTaskPanicked> {
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with a single send, but handle it the same way.
// this case should only ever happen if a panic happened in the `fut`.
Err(RetriedTaskPanicked)
}
}
}
}
/// MaybeDone handles synchronization for multiple requests and the single actual task.
///
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
/// useful `recv().await`, where useful means a "value" or a "disconnect" arrives. If the upgrade
/// fails, it means that the "disconnect" has already happened.
///
/// On successful execution, the one executing task will set this to the `Done` variant with the
/// actual resulting value.
#[derive(Debug)]
pub enum MaybeDone<V> {
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
Done(V),
}
impl<V: std::fmt::Debug> MaybeDone<V> {
fn complete(
this: &mut Option<MaybeDone<V>>,
_strong: &Arc<tokio::sync::broadcast::Receiver<V>>,
outcome: V,
) {
#[cfg(debug_assertions)]
match this {
Some(MaybeDone::Pending(weak)) => {
let same = weak
.upgrade()
// we don't yet have Receiver::same_channel
.map(|rx| Arc::ptr_eq(_strong, &rx))
.unwrap_or(false);
assert!(same, "different channel had been replaced or dropped");
}
other => panic!("unexpected MaybeDone: {other:?}"),
}
*this = Some(MaybeDone::Done(outcome));
}
}
#[cfg(test)]
mod tests {
use super::{RetriedTaskPanicked, Retryable, SharedRetryable};
use std::sync::Arc;
#[derive(Debug)]
enum OuterError {
AttemptPanicked,
Unlucky,
}
#[derive(Clone, Debug)]
enum InnerError {
Unlucky,
}
impl Retryable for InnerError {
fn is_permanent(&self) -> bool {
false
}
}
impl From<InnerError> for OuterError {
fn from(_: InnerError) -> Self {
OuterError::Unlucky
}
}
impl From<RetriedTaskPanicked> for OuterError {
fn from(_: RetriedTaskPanicked) -> Self {
OuterError::AttemptPanicked
}
}
#[tokio::test]
async fn restartable_until_permanent() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let res = shr
.try_restart_spawn(|| async move { panic!("really unlucky") })
.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)));
let res = shr
.try_restart_spawn(|| async move { Err(InnerError::Unlucky) })
.await;
assert!(matches!(res, Err(OuterError::Unlucky)));
let res = shr.try_restart_spawn(|| async move { Ok(42) }).await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
let res = shr
.try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") })
.await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
}
/// Demonstration of `SharedRetryable::attempt_spawn`
#[tokio::test]
async fn attemptable_until_no_panic() {
let shr = SharedRetryable::<u8>::default();
let res = shr
.attempt_spawn(|| async move { panic!("should not interfere") })
.await;
assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}");
let res = shr.attempt_spawn(|| async move { 42 }).await;
assert_eq!(res, Ok(42));
let res = shr
.attempt_spawn(|| async move { panic!("should not be called") })
.await;
assert_eq!(res, Ok(42));
}
#[tokio::test]
async fn cancelling_spawner_is_fine() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let (recv1, maybe_fut) = shr
.try_restart(|| async move { panic!("should not have been called") })
.await;
let should_be_spawned = maybe_fut.unwrap();
let (recv2, maybe_fut) = shr
.try_restart(|| async move {
panic!("should never be called because waiting on should_be_spawned")
})
.await;
assert!(
matches!(maybe_fut, None),
"only the first one should had created the future"
);
let mut recv1 = std::pin::pin!(recv1);
let mut recv2 = std::pin::pin!(recv2);
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {},
_ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"),
_ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"),
}
drop(should_be_spawned);
let res = recv1.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
let res = recv2.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
// but we can still reach a terminal state if the API is not misused and the
// should_be_spawned winner is not cancelled
let recv1 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(42) });
let recv2 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(43) });
assert_eq!(recv1.await.unwrap(), 42);
assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned");
}
#[tokio::test]
async fn service_example() {
#[derive(Debug, Clone, Copy)]
enum OneLevelError {
TaskPanicked,
}
impl Retryable for OneLevelError {
fn is_permanent(&self) -> bool {
// for single-level errors, this wording is a bit odd
!matches!(self, OneLevelError::TaskPanicked)
}
}
impl From<RetriedTaskPanicked> for OneLevelError {
fn from(_: RetriedTaskPanicked) -> Self {
OneLevelError::TaskPanicked
}
}
#[derive(Clone, Default)]
struct Service(SharedRetryable<Result<u8, OneLevelError>>);
impl Service {
async fn work(
&self,
completions: Arc<std::sync::atomic::AtomicUsize>,
) -> Result<u8, OneLevelError> {
self.0
.try_restart_spawn(|| async move {
// give time to cancel some of the tasks
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self::work_once().await
})
.await
}
async fn work_once() -> Result<u8, OneLevelError> {
Ok(42)
}
}
let svc = Service::default();
let mut js = tokio::task::JoinSet::new();
let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1));
let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let handles = (0..10)
.map(|_| {
js.spawn({
let svc = svc.clone();
let barrier = barrier.clone();
let completions = completions.clone();
async move {
// make sure all tasks are ready to start at the same time
barrier.wait().await;
// after successfully starting the work, any of the futures could get cancelled
svc.work(completions).await
}
})
})
.collect::<Vec<_>>();
barrier.wait().await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
handles[5].abort();
let mut cancellations = 0;
while let Some(res) = js.join_next().await {
// all complete with the same result
match res {
Ok(res) => assert_eq!(res.unwrap(), 42),
Err(je) => {
// except for the one task we cancelled; cancelling it
// does not interfere with the result
assert!(je.is_cancelled());
cancellations += 1;
assert_eq!(cancellations, 1, "only the 6th task was aborted");
// however, we cannot guarantee that we always manage to cancel the 6th task in time
}
}
}
// there will be exactly one completion of the shared future
assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}

View File

@@ -272,6 +272,9 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
/// Task which is the only task to delete this particular timeline
DeleteTimeline,
}
#[derive(Default)]

View File

@@ -456,6 +456,50 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error, Clone)]
enum InnerDeleteTimelineError {
#[error("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")]
QueueUninitialized,
#[error("failpoint: {0}")]
Failpoint(&'static str),
#[error("failed to remove local timeline directory")]
FailedToRemoveLocalTimelineDirectory,
#[error("index_part.json upload failed")]
UploadFailed,
#[error("the deleted timeline grew branches while deleting it; tenant should now be broken")]
DeletedGrewChildren,
}
impl utils::shared_retryable::Retryable for InnerDeleteTimelineError {
fn is_permanent(&self) -> bool {
use InnerDeleteTimelineError::*;
match self {
QueueUninitialized => false,
Failpoint(_) => false,
FailedToRemoveLocalTimelineDirectory => false,
UploadFailed => false,
DeletedGrewChildren => true,
}
}
}
impl From<InnerDeleteTimelineError> for DeleteTimelineError {
fn from(value: InnerDeleteTimelineError) -> Self {
DeleteTimelineError::Other(anyhow::Error::new(value))
}
}
impl From<utils::shared_retryable::RetriedTaskPanicked> for DeleteTimelineError {
fn from(_: utils::shared_retryable::RetriedTaskPanicked) -> Self {
DeleteTimelineError::Other(anyhow::anyhow!("deleting timeline failed, please retry"))
}
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -1361,7 +1405,7 @@ impl Tenant {
/// Removes timeline-related in-memory data
pub async fn delete_timeline(
&self,
self: &Arc<Tenant>,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
@@ -1394,162 +1438,196 @@ impl Tenant {
timeline
};
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
let span = tracing::Span::current();
// if we have concurrent requests, we will only execute one version of the following future;
// initially it did not have any means to be cancelled.
//
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
// NOTE: Unlike "the usual" futures, this one will log any errors instead of just propagating
// them to the caller. This is because this one future produces a value, which will need to
// be cloneable to everyone interested, and normal `std::error::Error` are not clonable.
// Also, it wouldn't make sense to log the same failure multiple times, it would look like
// multiple failures to anyone reading the logs.
let factory = || {
let tenant = self.clone();
let tenant_id = self.tenant_id;
let timeline = timeline.clone();
let timeline_id = timeline.timeline_id;
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
async move {
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
warn!("failed to stop RemoteTimelineClient due to uninitialized queue");
return Err(InnerDeleteTimelineError::QueueUninitialized);
}
},
}
}
// Stop & wait for the remaining timeline tasks, including upload tasks.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
warn!("upload failed: {e}");
return Err(InnerDeleteTimelineError::UploadFailed);
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let _layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by us earlier.
let local_timeline_directory =
tenant.conf.timeline_path(&timeline_id, &tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"failpoint: timeline-delete-before-rm",
))
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = tenant.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
}
Err(e) => {
warn!(
"failed to remove local timeline directory {}: {e:#}",
local_timeline_directory.display()
);
return Err(
InnerDeleteTimelineError::FailedToRemoveLocalTimelineDirectory,
);
}
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-after-rm",
))
});
// Remove the timeline from the map or poison it if we've grown children.
let removed_timeline =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines.iter().any(|(_, entry)| {
entry.get_ancestor_timeline_id() == Some(timeline_id)
});
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
timelines.remove(&timeline_id)
}));
match removed_timeline {
Ok(Some(_)) => {}
Ok(None) => {
// with SharedRetryable this should no longer happen
warn!("no other task should had dropped the Timeline");
}
Err(_panic) => return Err(InnerDeleteTimelineError::DeletedGrewChildren),
}
Ok(())
}
// execute in the *winner's* span so we will capture the request id etc.
.instrument(span)
};
let (recv, maybe_fut) = timeline.delete_self.try_restart(factory).await;
if let Some(fut) = maybe_fut {
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::DeleteTimeline,
Some(self.tenant_id()),
None,
&format!("delete_timeline {}", timeline.timeline_id),
false,
async move {
fut.await;
Ok(())
},
}
);
}
// Stop & wait for the remaining timeline tasks, including upload tasks.
// NB: This and other delete_timeline calls do not run as a task_mgr task,
// so, they are not affected by this shutdown_tasks() call.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = self.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
// continue with the rest of the deletion
}
res => res.with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?,
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
// T1 T2
// lock
// unlock
// lock
// unlock
// remove files
// lock
// remove from map
// unlock
// return
// remove files
// lock
// remove from map observes empty map
// unlock
// return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
recv.await
}
pub fn current_state(&self) -> TenantState {

View File

@@ -227,6 +227,9 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
pub(super) delete_self:
utils::shared_retryable::SharedRetryable<Result<(), super::InnerDeleteTimelineError>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -1379,6 +1382,8 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_self: utils::shared_retryable::SharedRetryable::default(),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result

View File

@@ -346,20 +346,31 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
def first_call(result_queue):
def delete_timeline_call(name, result_queue, barrier):
if barrier:
barrier.wait()
try:
log.info("first call start")
log.info(f"{name} call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
log.info("first call success")
log.info(f"{name} call success")
result_queue.put("success")
except Exception:
log.exception("first call failed")
log.exception(f"{name} call failed")
result_queue.put("failure, see log for stack trace")
first_call_result: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
delete_results: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"1st",
delete_results,
None,
),
)
first_call_thread.start()
second_call_thread = None
try:
def first_call_hit_failpoint():
@@ -369,30 +380,42 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
wait_until(50, 0.1, first_call_hit_failpoint)
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
barrier = threading.Barrier(2)
second_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"2nd",
delete_results,
barrier,
),
)
second_call_thread.start()
barrier.wait()
# release the pause
ps_http.configure_failpoints((failpoint_name, "off"))
# both should have succeeded: the second call will coalesce with the already-ongoing first call
result = delete_results.get()
assert result == "success"
result = delete_results.get()
assert result == "success"
# the second call will try to transition the timeline into Stopping state, but it's already in that state
# (the transition to Stopping state is not part of the request coalescing, because Tenant and Timeline states are a mess already)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
log.info("second call failed as expected")
# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))
result = first_call_result.get()
assert result == "success"
finally:
log.info("joining first call thread")
log.info("joining 1st thread")
# in any case, make sure the lifetime of the thread is bounded to this test
first_call_thread.join()
if second_call_thread:
log.info("joining 2nd thread")
second_call_thread.join()
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
@@ -440,9 +463,16 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
# ok, retry without failpoint, it should succeed
ps_http.configure_failpoints((failpoint_name, "off"))
# this should succeed
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
try:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
except PageserverApiException as e:
if e.status_code != 404:
raise e
else:
# mock_s3 was fast enough to delete before we got the request in
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Error processing HTTP request: NotFound: timeline not found"
)