Compare commits

...

29 Commits

Author SHA1 Message Date
Joonas Koivunen
69e92372c8 refactor: inline None into MaybeDone::NotStarted 2023-05-22 11:09:27 +03:00
Joonas Koivunen
c8b35324f6 doc: clarify cancellation on spawned task 2023-05-22 10:57:26 +03:00
Joonas Koivunen
bc4f6db0ad doc: explain what task_mgr gives us 2023-05-22 10:57:04 +03:00
Dmitry Ivanov
d0b2befe02 Simplify low-hanging fruit 2023-05-17 14:42:34 +03:00
Dmitry Ivanov
0251c7c15c Changes 2023-05-17 14:42:34 +03:00
Joonas Koivunen
9ab4e5ae54 test: refactor: unified check gone style 2023-05-17 13:43:56 +03:00
Joonas Koivunen
70c2717239 test: add missing allowed_error 2023-05-17 11:15:10 +03:00
Joonas Koivunen
f0bcfd61e3 refactor: panic with actual error, add fallback
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-17 11:13:49 +03:00
Joonas Koivunen
cbe3e11923 refactor: prefer in_current_span 2023-05-16 15:56:54 +03:00
Joonas Koivunen
7b6ed51319 refactor: log errors with {:#}
even though it's a std::io::Error

Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-16 15:49:21 +03:00
Joonas Koivunen
a9720349bb doc: review suggestions
Co-authored-by: Christian Schwarz <christian@neon.tech>
2023-05-16 15:49:02 +03:00
Joonas Koivunen
83457a848b test: simplify delete_timeline client hangup
per review comments.
2023-05-16 15:45:36 +03:00
Joonas Koivunen
b4cf3b18fa test: assert that allowed error appears in log 2023-05-16 15:45:36 +03:00
Joonas Koivunen
6b1adf6e5e test: test_concurrent_timeline_delete_if_first_stuck_at_index_upload docs 2023-05-16 15:45:36 +03:00
Joonas Koivunen
4591698cd4 fixup: refactor: stop using todo! for a panic! 2023-05-16 15:45:36 +03:00
Joonas Koivunen
81e8c10069 fixup: move doc test as test case and hide _spawn variants 2023-05-16 15:45:36 +03:00
Joonas Koivunen
6bd3951141 fixup: nicer ascii art 2023-05-16 15:45:36 +03:00
Joonas Koivunen
511978be6f doc: remove FIXME simplify uploading
there's no need to simplify the uploading; the uploading is not on
within this one future which will be executed so it's not safe to just
remove it or revert it back.

also I am hesitant to add new changes at this point, the diff is large
enough.
2023-05-16 15:45:36 +03:00
Joonas Koivunen
1e94bbf249 test: less racy test_delete_timeline_client_hangup
with the SharedRetried the first request creates a task which will
continue to the pause point regardless of the first request, so we need
to accept a 404 as success.
2023-05-16 15:45:36 +03:00
Joonas Koivunen
a0f63b8264 test: fix test_concurrent_timeline_delete_if_first_stuck_at_index_upload
manually cherry-picked 245a2a0592
2023-05-16 15:45:36 +03:00
Joonas Koivunen
c948ee3975 fixup: one missed logging opportunity 2023-05-16 15:45:36 +03:00
Joonas Koivunen
3784166180 doc: more cleanup 2023-05-16 15:45:36 +03:00
Joonas Koivunen
8c1962d40c refactor: simplify, racy removes no longer intended 2023-05-16 15:45:36 +03:00
Joonas Koivunen
d3ce8eae4e fixup: doc: align strong usage, fix broken references 2023-05-16 15:45:36 +03:00
Joonas Koivunen
3b1141b344 fixup: doc: explain convoluted return type 2023-05-16 15:45:36 +03:00
Joonas Koivunen
17c92c885c fixup: doc: many futures instead of tasks
as in, you could run many of these futures with tokio::select within one
task or `runtime.block_on`; it does not matter.
2023-05-16 15:45:36 +03:00
Joonas Koivunen
43b397da10 fixup: clippy 2023-05-16 15:45:36 +03:00
Joonas Koivunen
00673cf900 fix: coalesce requests to delete_timeline 2023-05-16 15:45:36 +03:00
Joonas Koivunen
ba7c97b61c feat: utils::shared_retryable::SharedRetryable
documented in the module.
2023-05-16 15:45:34 +03:00
6 changed files with 965 additions and 189 deletions

View File

@@ -60,6 +60,10 @@ pub mod tracing_span_assert;
pub mod rate_limit;
/// Primitive for coalescing operations into a single task which will not be cancelled by, for
/// example, an external http client closing the connection.
pub mod shared_retryable;
mod failpoint_macro_helpers {
/// use with fail::cfg("$name", "return(2000)")
@@ -96,6 +100,7 @@ mod failpoint_macro_helpers {
tracing::info!("failpoint {:?}: sleep done", name);
}
}
pub use failpoint_macro_helpers::failpoint_sleep_helper;
/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages

View File

@@ -0,0 +1,657 @@
use std::future::Future;
use std::sync::Arc;
/// Container through which many request handlers can come together and join a single task to
/// completion instead of racing each other and their own cancellation.
///
/// In a picture:
///
/// ```text
/// SharedRetryable::try_restart        Spawned task completes with only one concurrent attempt
///          \                                     /
/// request handler 1 ---->|--X
/// request handler 2 ---->|-------|
/// request handler 3 ---->|-------|
///                        |       |
///                        v       |
///                   one spawned task \------>/
///
/// (X = cancelled during await)
/// ```
///
/// Implementation is cancel safe. The implementation and internal structure are complicated by
/// the inability to just spawn the task, but this is required for `pageserver` usage: within
/// `pageserver`, the `task_mgr` must be used to spawn the future so that shutdown awaits it.
///
/// Implementation exposes a fully decomposed [`SharedRetryable::try_restart`] which requires the
/// caller to do the spawning before awaiting the result. If the caller is dropped before the
/// spawning happens, a new attempt will be required, and all concurrent awaiters will see a
/// [`RetriedTaskPanicked`] error.
///
/// There is another family of APIs, [`SharedRetryable::attempt_spawn`], for infallible futures. It
/// is provided just for completeness and does not have a fully decomposed version like
/// `try_restart`.
///
/// For the `try_restart_*` family of APIs there are two levels of results. The inner level is
/// returned by the executed future and needs to be `Clone`. Most errors are not `Clone`, so the
/// advice is to log the error where it happens and propagate no more than a label as the
/// "inner error", from which each caller builds an outer error. The outer error also has to be
/// convertible from [`RetriedTaskPanicked`] to absorb that case.
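///
/// A minimal sketch of that shape (hypothetical names; the tests below use the same pattern):
///
/// ```text
/// #[derive(Clone, Debug)]
/// enum InnerLabel { Unlucky }                 // Clone-able label produced by the task
/// enum OuterError { Unlucky, TaskPanicked }   // rebuilt by each caller
/// // needed: impl From<InnerLabel> for OuterError and From<RetriedTaskPanicked> for OuterError
/// ```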
///
/// ## Example
///
/// A shared service value completes the infallible work once, even if called concurrently by
/// multiple cancellable tasks.
///
/// Example moved as a test `service_example`.
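///
/// A minimal usage sketch of the decomposed API (hypothetical `do_work`, `Value` and `ApiError`
/// names):
///
/// ```text
/// let (recv, maybe_fut) = shared.try_restart::<ApiError>(|| do_work()).await;
/// if let Some(fut) = maybe_fut {
///     // this caller won the race; the attempt must be spawned before `recv` is awaited
///     tokio::spawn(fut); // pageserver would use task_mgr::spawn instead
/// }
/// let res: Result<Value, ApiError> = recv.await;
/// ```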
#[derive(Clone)]
pub struct SharedRetryable<V> {
inner: Arc<tokio::sync::Mutex<MaybeDone<V>>>,
}
impl<V> Default for SharedRetryable<V> {
fn default() -> Self {
Self {
inner: Arc::new(tokio::sync::Mutex::new(MaybeDone::default())),
}
}
}
/// Determine if an error is transient or permanent.
pub trait Retryable {
fn is_permanent(&self) -> bool {
true
}
}
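// An illustrative impl sketch (hypothetical `FetchError`, not part of this module): mark only
// some error variants as worth retrying.
//
//     impl Retryable for FetchError {
//         fn is_permanent(&self) -> bool {
//             // only timeouts warrant a retry; everything else is terminal
//             !matches!(self, FetchError::Timeout)
//         }
//     }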
pub trait MakeFuture {
type Future: Future<Output = Self::Output> + Send + 'static;
type Output: Send + 'static;
fn make_future(self) -> Self::Future;
}
impl<Fun, Fut, R> MakeFuture for Fun
where
Fun: FnOnce() -> Fut,
Fut: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
type Future = Fut;
type Output = R;
fn make_future(self) -> Self::Future {
self()
}
}
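// Thanks to the blanket impl above, a plain closure returning a future can be passed directly,
// e.g. `shared.attempt_spawn(|| async { 42 }).await`, as the tests below do.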
/// Retried task panicked, was cancelled, or never spawned (see [`SharedRetryable::try_restart`]).
#[derive(Debug, PartialEq, Eq)]
pub struct RetriedTaskPanicked;
impl<T, E1> SharedRetryable<Result<T, E1>>
where
T: Clone + std::fmt::Debug + Send + 'static,
E1: Retryable + Clone + std::fmt::Debug + Send + 'static,
{
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt,
/// start a new attempt, or join an existing one.
///
/// Compared to `Self::try_restart`, this method also spawns the future to run, which would
/// otherwise have to be done manually.
#[cfg(test)]
pub async fn try_restart_spawn<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
let (recv, maybe_fut) = self.try_restart(retry_with).await;
if let Some(fut) = maybe_fut {
// top level function, we must spawn, pageserver cannot use this
tokio::spawn(fut);
}
recv.await
}
/// Restart a previously failed operation unless it already completed with a terminal result.
///
/// Many futures can call this function and get the terminal result from an earlier attempt,
/// start a new attempt, or join an existing one.
///
/// If a task calling this method is cancelled before spawning the returned future, the attempt
/// is immediately deemed to have panicked, even though no panic ever happened.
///
/// Returns one future for waiting for the result and possibly another which needs to be
/// spawned when `Some`. Spawning has to happen before waiting is started, otherwise the first
/// future will never make progress.
///
/// This complication exists because on `pageserver` we cannot use `tokio::spawn` directly
/// at this time.
pub async fn try_restart<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> (
impl Future<Output = Result<T, E2>> + Send + 'static,
Option<impl Future<Output = ()> + Send + 'static>,
)
where
E2: From<E1> + From<RetriedTaskPanicked> + Send + 'static,
{
use futures::future::Either;
match self.decide_to_retry_or_join(retry_with).await {
Ok(terminal) => (Either::Left(async move { terminal }), None),
Err((rx, maybe_fut)) => {
let recv = Self::make_oneshot_alike_receiver(rx);
(Either::Right(recv), maybe_fut)
}
}
}
/// Returns `Ok` if a previous attempt already produced a terminal result. `Err` is returned
/// when an attempt can be joined and possibly needs to be spawned.
async fn decide_to_retry_or_join<E2>(
&self,
retry_with: impl MakeFuture<Output = Result<T, E1>>,
) -> Result<
Result<T, E2>,
(
tokio::sync::broadcast::Receiver<Result<T, E1>>,
Option<impl Future<Output = ()> + Send + 'static>,
),
>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
let mut g = self.inner.lock().await;
let maybe_rx = match &*g {
MaybeDone::Done(Ok(t)) => return Ok(Ok(t.to_owned())),
MaybeDone::Done(Err(e)) if e.is_permanent() => return Ok(Err(E2::from(e.to_owned()))),
MaybeDone::Pending(weak) => {
// failure to upgrade can mean only one thing: there was an unexpected
// panic which we consider as a transient retryable error.
weak.upgrade()
}
MaybeDone::Done(Err(_retryable)) => None,
MaybeDone::NotStarted => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
// new attempt
// panic safety: invoke the factory before configuring the pending value
let fut = retry_with.make_future();
let (strong, fut) = self.make_run_and_complete(fut, &mut g);
(strong, Some(fut))
}
};
// important: the Arc<Receiver> is not held after unlocking
// important: we resubscribe before lock is released to be sure to get a message which
// is sent once receiver is dropped
let rx = strong.resubscribe();
drop(strong);
Err((rx, maybe_fut))
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Returns an `Arc<Receiver<V>>` which is valid until the attempt completes, and the future
/// which will need to run to completion outside the lifecycle of the caller.
fn make_run_and_complete(
&self,
fut: impl Future<Output = Result<T, E1>> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<Result<T, E1>>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<Result<T, E1>>>,
impl Future<Output = ()> + Send + 'static,
) {
#[cfg(debug_assertions)]
match &**g {
MaybeDone::Pending(weak) => {
assert!(
weak.upgrade().is_none(),
"when starting a restart, should no longer have an upgradeable channel"
);
}
MaybeDone::Done(Err(err)) => {
assert!(
!err.is_permanent(),
"when restarting, the err must be transient"
);
}
MaybeDone::Done(Ok(_)) => {
panic!("unexpected restart after a completion on MaybeDone");
}
MaybeDone::NotStarted => {}
}
self.make_run_and_complete_any(fut, g)
}
/// Oneshot-alike in that it returns a future which is consumed by a single `await`.
///
/// Otherwise the caller might think it is beneficial or reasonable to poll the channel multiple
/// times.
async fn make_oneshot_alike_receiver<E2>(
mut rx: tokio::sync::broadcast::Receiver<Result<T, E1>>,
) -> Result<T, E2>
where
E2: From<E1> + From<RetriedTaskPanicked>,
{
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(E2::from(e)),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with only one send, so handle it the same as closed;
// this case should only ever happen if a panic happened in `fut`.
Err(E2::from(RetriedTaskPanicked))
}
}
}
}
impl<V> SharedRetryable<V>
where
V: std::fmt::Debug + Clone + Send + 'static,
{
/// Attempt to run a spawned future to completion once.
///
/// Any previous attempt which panicked will be retried, but `RetriedTaskPanicked` is returned
/// when the most recent attempt panicked.
#[cfg(test)]
pub async fn attempt_spawn(
&self,
attempt_with: impl MakeFuture<Output = V>,
) -> Result<V, RetriedTaskPanicked> {
let (rx, maybe_fut) = {
let mut g = self.inner.lock().await;
let maybe_rx = match &*g {
MaybeDone::Done(v) => return Ok(v.to_owned()),
MaybeDone::Pending(weak) => {
// see comment in try_restart
weak.upgrade()
}
MaybeDone::NotStarted => None,
};
let (strong, maybe_fut) = match maybe_rx {
Some(strong) => (strong, None),
None => {
let fut = attempt_with.make_future();
let (strong, fut) = self.make_run_and_complete_any(fut, &mut g);
(strong, Some(fut))
}
};
// see decide_to_retry_or_join for important notes
let rx = strong.resubscribe();
drop(strong);
(rx, maybe_fut)
};
if let Some(fut) = maybe_fut {
// this is a top-level function, so we need to spawn directly;
// pageserver would not use this but the more piecewise functions instead
tokio::spawn(fut);
}
let recv = Self::make_oneshot_alike_receiver_any(rx);
recv.await
}
/// Configure a new attempt, but leave spawning it to the caller.
///
/// Forgetting the returned future is outside the scope of any correctness guarantees: all of
/// the waiters will then be deadlocked, and the MaybeDone will stay pending forever. Dropping
/// the future without running it will require a new attempt.
///
/// Also returns an `Arc<Receiver<V>>` which is valid until the attempt completes.
fn make_run_and_complete_any(
&self,
fut: impl Future<Output = V> + Send + 'static,
g: &mut tokio::sync::MutexGuard<'_, MaybeDone<V>>,
) -> (
Arc<tokio::sync::broadcast::Receiver<V>>,
impl Future<Output = ()> + Send + 'static,
) {
let (tx, rx) = tokio::sync::broadcast::channel(1);
let strong = Arc::new(rx);
**g = MaybeDone::Pending(Arc::downgrade(&strong));
let retry = {
let strong = strong.clone();
self.clone().run_and_complete(fut, tx, strong)
};
#[cfg(debug_assertions)]
match &**g {
MaybeDone::Pending(weak) => {
let rx = weak.upgrade().expect("holding the weak and strong locally");
assert!(Arc::ptr_eq(&strong, &rx));
}
_ => unreachable!("MaybeDone::Pending must be set after make_run_and_complete_any"),
}
(strong, retry)
}
/// Run the actual attempt, and communicate the response via both:
/// - setting the `MaybeDone::Done`
/// - the broadcast channel
async fn run_and_complete(
self,
fut: impl Future<Output = V>,
tx: tokio::sync::broadcast::Sender<V>,
strong: Arc<tokio::sync::broadcast::Receiver<V>>,
) {
let res = fut.await;
{
let mut g = self.inner.lock().await;
g.complete(&strong, res.clone());
// make the weak un-upgradeable by dropping the final alive
// reference to it. It is the final Arc because the Arc never escapes
// the critical section in `decide_to_retry_or_join` or `attempt_spawn`.
Arc::try_unwrap(strong).expect("expected this to be the only Arc<Receiver<V>>");
}
// now no one can get the Pending(weak) value to upgrade and they only see
// the Done(res).
//
// send the result value to listeners, if any
drop(tx.send(res));
}
#[cfg(test)]
async fn make_oneshot_alike_receiver_any(
mut rx: tokio::sync::broadcast::Receiver<V>,
) -> Result<V, RetriedTaskPanicked> {
use tokio::sync::broadcast::error::RecvError;
match rx.recv().await {
Ok(t) => Ok(t),
Err(RecvError::Closed | RecvError::Lagged(_)) => {
// lagged doesn't mean anything with only one send, so handle it the same as closed;
// this case should only ever happen if a panic happened in `fut`.
Err(RetriedTaskPanicked)
}
}
}
}
/// MaybeDone handles synchronization for multiple requests and the single actual task.
///
/// If request handlers witness `Pending` which they are able to upgrade, they are guaranteed a
/// useful `recv().await`, where useful means "value" or "disconnect" arrives. If upgrade fails,
/// this means that "disconnect" has happened in the past.
///
/// On successful execution the one executing task will set this to the `Done` variant with the
/// actual resulting value.
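///
/// Lifecycle: `NotStarted` -> `Pending(weak)` -> `Done(value)`. A `Pending` whose `Weak` no
/// longer upgrades means the previous attempt panicked or its future was dropped before
/// completion, so the next caller may start a fresh attempt.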
#[derive(Debug, Default)]
pub enum MaybeDone<V> {
Pending(std::sync::Weak<tokio::sync::broadcast::Receiver<V>>),
Done(V),
#[default]
NotStarted,
}
impl<V: std::fmt::Debug> MaybeDone<V> {
fn complete(&mut self, _strong: &Arc<tokio::sync::broadcast::Receiver<V>>, outcome: V) {
#[cfg(debug_assertions)]
match self {
MaybeDone::Pending(weak) => {
let same = weak
.upgrade()
// we don't yet have Receiver::same_channel
.map(|rx| Arc::ptr_eq(_strong, &rx))
.unwrap_or(false);
assert!(same, "different channel had been replaced or dropped");
}
other => panic!("unexpected MaybeDone: {other:?}"),
}
*self = MaybeDone::Done(outcome);
}
}
#[cfg(test)]
mod tests {
use super::{RetriedTaskPanicked, Retryable, SharedRetryable};
use std::sync::Arc;
#[derive(Debug)]
enum OuterError {
AttemptPanicked,
Unlucky,
}
#[derive(Clone, Debug)]
enum InnerError {
Unlucky,
}
impl Retryable for InnerError {
fn is_permanent(&self) -> bool {
false
}
}
impl From<InnerError> for OuterError {
fn from(_: InnerError) -> Self {
OuterError::Unlucky
}
}
impl From<RetriedTaskPanicked> for OuterError {
fn from(_: RetriedTaskPanicked) -> Self {
OuterError::AttemptPanicked
}
}
#[tokio::test]
async fn restartable_until_permanent() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let res = shr
.try_restart_spawn(|| async move { panic!("really unlucky") })
.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)));
let res = shr
.try_restart_spawn(|| async move { Err(InnerError::Unlucky) })
.await;
assert!(matches!(res, Err(OuterError::Unlucky)));
let res = shr.try_restart_spawn(|| async move { Ok(42) }).await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
let res = shr
.try_restart_spawn(|| async move { panic!("rerun should clone Ok(42)") })
.await;
assert!(matches!(res, Ok::<u8, OuterError>(42)));
}
/// Demonstration of `SharedRetryable::attempt_spawn`
#[tokio::test]
async fn attemptable_until_no_panic() {
let shr = SharedRetryable::<u8>::default();
let res = shr
.attempt_spawn(|| async move { panic!("should not interfere") })
.await;
assert!(matches!(res, Err(RetriedTaskPanicked)), "{res:?}");
let res = shr.attempt_spawn(|| async move { 42 }).await;
assert_eq!(res, Ok(42));
let res = shr
.attempt_spawn(|| async move { panic!("should not be called") })
.await;
assert_eq!(res, Ok(42));
}
#[tokio::test]
async fn cancelling_spawner_is_fine() {
let shr = SharedRetryable::<Result<u8, InnerError>>::default();
let (recv1, maybe_fut) = shr
.try_restart(|| async move { panic!("should not have been called") })
.await;
let should_be_spawned = maybe_fut.unwrap();
let (recv2, maybe_fut) = shr
.try_restart(|| async move {
panic!("should never be called because waiting on should_be_spawned")
})
.await;
assert!(
matches!(maybe_fut, None),
"only the first one should had created the future"
);
let mut recv1 = std::pin::pin!(recv1);
let mut recv2 = std::pin::pin!(recv2);
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {},
_ = &mut recv1 => unreachable!("should not have completed because should_be_spawned not spawned"),
_ = &mut recv2 => unreachable!("should not have completed because should_be_spawned not spawned"),
}
drop(should_be_spawned);
let res = recv1.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
let res = recv2.await;
assert!(matches!(res, Err(OuterError::AttemptPanicked)), "{res:?}");
// but we can still reach a terminal state if the api is not misused or the
// should_be_spawned winner is not cancelled
let recv1 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(42) });
let recv2 = shr.try_restart_spawn::<OuterError>(|| async move { Ok(43) });
assert_eq!(recv1.await.unwrap(), 42);
assert_eq!(recv2.await.unwrap(), 42, "43 should never be returned");
}
#[tokio::test]
async fn service_example() {
#[derive(Debug, Clone, Copy)]
enum OneLevelError {
TaskPanicked,
}
impl Retryable for OneLevelError {
fn is_permanent(&self) -> bool {
// for single-level errors, this wording is a bit odd
!matches!(self, OneLevelError::TaskPanicked)
}
}
impl From<RetriedTaskPanicked> for OneLevelError {
fn from(_: RetriedTaskPanicked) -> Self {
OneLevelError::TaskPanicked
}
}
#[derive(Clone, Default)]
struct Service(SharedRetryable<Result<u8, OneLevelError>>);
impl Service {
async fn work(
&self,
completions: Arc<std::sync::atomic::AtomicUsize>,
) -> Result<u8, OneLevelError> {
self.0
.try_restart_spawn(|| async move {
// give time to cancel some of the tasks
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
completions.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self::work_once().await
})
.await
}
async fn work_once() -> Result<u8, OneLevelError> {
Ok(42)
}
}
let svc = Service::default();
let mut js = tokio::task::JoinSet::new();
let barrier = Arc::new(tokio::sync::Barrier::new(10 + 1));
let completions = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let handles = (0..10)
.map(|_| {
js.spawn({
let svc = svc.clone();
let barrier = barrier.clone();
let completions = completions.clone();
async move {
// make sure all tasks are ready to start at the same time
barrier.wait().await;
// after successfully starting the work, any of the futures could get cancelled
svc.work(completions).await
}
})
})
.collect::<Vec<_>>();
barrier.wait().await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
handles[5].abort();
let mut cancellations = 0;
while let Some(res) = js.join_next().await {
// all complete with the same result
match res {
Ok(res) => assert_eq!(res.unwrap(), 42),
Err(je) => {
// except for the one task we cancelled; its cancellation
// does not interfere with the result
assert!(je.is_cancelled());
cancellations += 1;
assert_eq!(cancellations, 1, "only 6th task was aborted");
// however, we cannot assert that we manage to cancel the 6th task in time on every run
}
}
}
// there will be at most one terminal completion
assert_eq!(completions.load(std::sync::atomic::Ordering::Relaxed), 1);
}
}

View File

@@ -272,6 +272,9 @@ pub enum TaskKind {
#[cfg(test)]
UnitTest,
/// Task which is the only task to delete this particular timeline
DeleteTimeline,
}
#[derive(Default)]

View File

@@ -459,6 +459,53 @@ pub enum DeleteTimelineError {
Other(#[from] anyhow::Error),
}
#[derive(Debug, thiserror::Error, Clone, Copy)]
enum InnerDeleteTimelineError {
#[error("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")]
QueueUninitialized,
#[error("failpoint: {0}")]
Failpoint(&'static str),
#[error("failed to remove local timeline directory")]
FailedToRemoveLocalTimelineDirectory,
#[error("index_part.json upload failed")]
UploadFailed,
#[error("the deleted timeline grew branches while deleting it; tenant should now be broken")]
DeletedGrewChildren,
#[error("panicked while removing the timeline from Tenant::timelines")]
OtherPanic,
}
impl utils::shared_retryable::Retryable for InnerDeleteTimelineError {
fn is_permanent(&self) -> bool {
use InnerDeleteTimelineError::*;
match self {
QueueUninitialized => false,
Failpoint(_) => false,
FailedToRemoveLocalTimelineDirectory => false,
UploadFailed => false,
DeletedGrewChildren | OtherPanic => true,
}
}
}
impl From<InnerDeleteTimelineError> for DeleteTimelineError {
fn from(value: InnerDeleteTimelineError) -> Self {
DeleteTimelineError::Other(anyhow::Error::new(value))
}
}
impl From<utils::shared_retryable::RetriedTaskPanicked> for DeleteTimelineError {
fn from(_: utils::shared_retryable::RetriedTaskPanicked) -> Self {
DeleteTimelineError::Other(anyhow::anyhow!("deleting timeline failed, please retry"))
}
}
struct RemoteStartupData {
index_part: IndexPart,
remote_metadata: TimelineMetadata,
@@ -1426,7 +1473,7 @@ impl Tenant {
/// Removes timeline-related in-memory data
pub async fn delete_timeline(
&self,
self: &Arc<Tenant>,
timeline_id: TimelineId,
_ctx: &RequestContext,
) -> Result<(), DeleteTimelineError> {
@@ -1459,162 +1506,200 @@ impl Tenant {
timeline
};
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
// if we have concurrent requests, we will only execute one instance of the following future;
// initially it did not have any means to be cancelled.
//
// NB: If you call delete_timeline multiple times concurrently, they will
// all go through the motions here. Make sure the code here is idempotent,
// and don't error out if some of the shutdown tasks have already been
// completed!
// NOTE: Unlike "the usual" futures, this one will log any errors instead of just propagating
// them to the caller. This is because this future produces a value which needs to be
// cloneable to everyone interested, and normal `std::error::Error`s are not cloneable.
// Also, it wouldn't make sense to log the same failure multiple times; it would look like
// multiple failures to anyone reading the logs.
let factory = || {
let tenant = self.clone();
let tenant_id = self.tenant_id;
let timeline = timeline.clone();
let timeline_id = timeline.timeline_id;
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
async move {
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
// Stop the walreceiver first.
debug!("waiting for wal receiver to shutdown");
timeline.walreceiver.stop().await;
debug!("wal receiver shutdown confirmed");
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
return Err(DeleteTimelineError::Other(anyhow::anyhow!("upload queue is uninitialized, likely the timeline was in Broken state prior to this call because it failed to fetch IndexPart during load or attach, check the logs")));
// Prevent new uploads from starting.
if let Some(remote_client) = timeline.remote_client.as_ref() {
let res = remote_client.stop();
match res {
Ok(()) => {}
Err(e) => match e {
remote_timeline_client::StopError::QueueUninitialized => {
// This case shouldn't happen currently because the
// load and attach code bails out if _any_ of the timeline fails to fetch its IndexPart.
// That is, before we declare the Tenant as Active.
// But we only allow calls to delete_timeline on Active tenants.
warn!("failed to stop RemoteTimelineClient due to uninitialized queue");
return Err(InnerDeleteTimelineError::QueueUninitialized);
}
},
}
}
// Stop & wait for the remaining timeline tasks, including upload tasks.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
warn!("upload failed: {e}");
return Err(InnerDeleteTimelineError::UploadFailed);
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let _layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by us earlier.
let local_timeline_directory =
tenant.conf.timeline_path(&timeline_id, &tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-before-rm",
))
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = tenant.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
}
Err(e) => {
warn!(
"failed to remove local timeline directory {}: {e:#}",
local_timeline_directory.display()
);
return Err(
InnerDeleteTimelineError::FailedToRemoveLocalTimelineDirectory,
);
}
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(InnerDeleteTimelineError::Failpoint(
"timeline-delete-after-rm",
))
});
// Remove the timeline from the map or poison it if we've grown children.
// The poisoned map will make the child's GetPage accesses fail.
//
// Growing children can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
let removed_timeline =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut timelines = tenant.timelines.lock().unwrap();
let children_exist = timelines.iter().any(|(_, entry)| {
entry.get_ancestor_timeline_id() == Some(timeline_id)
});
if children_exist {
std::panic::panic_any(InnerDeleteTimelineError::DeletedGrewChildren);
}
timelines.remove(&timeline_id)
}));
match removed_timeline {
Ok(Some(_)) => Ok(()),
Ok(None) => {
// with SharedRetryable this should no longer happen
warn!("no other task should had dropped the Timeline");
Ok(())
}
Err(panic) => {
if let Some(err) = panic.downcast_ref::<InnerDeleteTimelineError>() {
Err(*err)
} else {
// the panic has already been formatted by hook, don't worry about it
Err(InnerDeleteTimelineError::OtherPanic)
}
},
}
}
// execute in the *winner's* span so we will capture the request id etc.
.in_current_span()
};
let (recv, maybe_fut) = timeline.delete_self.try_restart(factory).await;
if let Some(fut) = maybe_fut {
crate::task_mgr::spawn(
&tokio::runtime::Handle::current(),
TaskKind::DeleteTimeline,
Some(self.tenant_id()),
None,
&format!("delete_timeline {}", timeline.timeline_id),
false,
async move {
fut.await;
Ok(())
},
}
);
}
// Stop & wait for the remaining timeline tasks, including upload tasks.
// NB: This and other delete_timeline calls do not run as a task_mgr task,
// so, they are not affected by this shutdown_tasks() call.
info!("waiting for timeline tasks to shutdown");
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// Mark timeline as deleted in S3 so we won't pick it up next time
// during attach or pageserver restart.
// See comment in persist_index_part_with_deleted_flag.
if let Some(remote_client) = timeline.remote_client.as_ref() {
match remote_client.persist_index_part_with_deleted_flag().await {
// If we (now, or already) marked it successfully as deleted, we can proceed
Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
// Bail out otherwise
Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
| Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
}
}
}
{
// Grab the layer_removal_cs lock, and actually perform the deletion.
//
// This lock prevents multiple concurrent delete_timeline calls from
// stepping on each other's toes, while deleting the files. It also
// prevents GC or compaction from running at the same time.
//
// Note that there are still other race conditions between
// GC, compaction and timeline deletion. GC task doesn't
// register itself properly with the timeline it's
// operating on. See
// https://github.com/neondatabase/neon/issues/2671
//
// No timeout here, GC & Compaction should be responsive to the
// `TimelineState::Stopping` change.
info!("waiting for layer_removal_cs.lock()");
let layer_removal_guard = timeline.layer_removal_cs.lock().await;
info!("got layer_removal_cs.lock(), deleting layer files");
// NB: storage_sync upload tasks that reference these layers have been cancelled
// by the caller.
let local_timeline_directory = self.conf.timeline_path(&timeline_id, &self.tenant_id);
fail::fail_point!("timeline-delete-before-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
// NB: This need not be atomic because the deleted flag in the IndexPart
// will be observed during tenant/timeline load. The deletion will be resumed there.
//
// For configurations without remote storage, we tolerate that we're not crash-safe here.
// The timeline may come up Active but with missing layer files, in such setups.
// See https://github.com/neondatabase/neon/pull/3919#issuecomment-1531726720
match std::fs::remove_dir_all(&local_timeline_directory) {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
// This can happen if we're called a second time, e.g.,
// because of a previous failure/cancellation at/after
// failpoint timeline-delete-after-rm.
//
// It can also happen if we race with tenant detach, because,
// it doesn't grab the layer_removal_cs lock.
//
// For now, log and continue.
// warn! level is technically not appropriate for the
// first case because we should expect retries to happen.
// But the error is so rare, it seems better to get attention if it happens.
let tenant_state = self.current_state();
warn!(
timeline_dir=?local_timeline_directory,
?tenant_state,
"timeline directory not found, proceeding anyway"
);
// continue with the rest of the deletion
}
res => res.with_context(|| {
format!(
"Failed to remove local timeline directory '{}'",
local_timeline_directory.display()
)
})?,
}
info!("finished deleting layer files, releasing layer_removal_cs.lock()");
drop(layer_removal_guard);
}
fail::fail_point!("timeline-delete-after-rm", |_| {
Err(anyhow::anyhow!("failpoint: timeline-delete-after-rm"))?
});
// Remove the timeline from the map.
let mut timelines = self.timelines.lock().unwrap();
let children_exist = timelines
.iter()
.any(|(_, entry)| entry.get_ancestor_timeline_id() == Some(timeline_id));
// XXX this can happen because `branch_timeline` doesn't check `TimelineState::Stopping`.
// We already deleted the layer files, so it's probably best to panic.
// (Ideally, above remove_dir_all is atomic so we don't see this timeline after a restart)
if children_exist {
panic!("Timeline grew children while we removed layer files");
}
let removed_timeline = timelines.remove(&timeline_id);
if removed_timeline.is_none() {
// This can legitimately happen if there's a concurrent call to this function.
//  T1                                  T2
//   lock
//   unlock
//                                       lock
//                                       unlock
//                                       remove files
//                                       lock
//                                       remove from map
//                                       unlock
//                                       return
//   remove files
//   lock
//   remove from map observes empty map
//   unlock
//   return
debug!("concurrent call to this function won the race");
}
drop(timelines);
Ok(())
recv.await
}
pub fn current_state(&self) -> TenantState {

View File

@@ -227,6 +227,9 @@ pub struct Timeline {
state: watch::Sender<TimelineState>,
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
pub(super) delete_self:
utils::shared_retryable::SharedRetryable<Result<(), super::InnerDeleteTimelineError>>,
}
/// Internal structure to hold all data needed for logical size calculation.
@@ -1421,6 +1424,8 @@ impl Timeline {
eviction_task_timeline_state: tokio::sync::Mutex::new(
EvictionTaskTimelineState::default(),
),
delete_self: utils::shared_retryable::SharedRetryable::default(),
};
result.repartition_threshold = result.get_checkpoint_distance() / 10;
result

View File

@@ -324,11 +324,7 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
If we're stuck uploading the index file with the is_delete flag,
eventually console will hang up and retry.
If we're still stuck at the retry time, ensure that the retry
fails with status 500, signalling to console that it should retry
later.
Ideally, timeline_delete should return 202 Accepted and require
console to poll for completion, but, that would require changing
the API contract.
eventually completes with the same status.
"""
neon_env_builder.enable_remote_storage(
@@ -342,24 +338,34 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
ps_http = env.pageserver.http_client()
# make the first call sleep practically forever
failpoint_name = "persist_index_part_with_deleted_flag_after_set_before_upload_pause"
ps_http.configure_failpoints((failpoint_name, "pause"))
def first_call(result_queue):
def delete_timeline_call(name, result_queue, barrier):
if barrier:
barrier.wait()
try:
log.info("first call start")
log.info(f"{name} call start")
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=10)
log.info("first call success")
log.info(f"{name} call success")
result_queue.put("success")
except Exception:
log.exception("first call failed")
log.exception(f"{name} call failed")
result_queue.put("failure, see log for stack trace")
first_call_result: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(target=first_call, args=(first_call_result,))
delete_results: queue.Queue[str] = queue.Queue()
first_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"1st",
delete_results,
None,
),
)
first_call_thread.start()
second_call_thread = None
try:
def first_call_hit_failpoint():
@@ -369,38 +375,53 @@ def test_concurrent_timeline_delete_if_first_stuck_at_index_upload(
wait_until(50, 0.1, first_call_hit_failpoint)
# make the second call and assert behavior
log.info("second call start")
error_msg_re = "another task is already setting the deleted_flag, started at"
with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
assert second_call_err.value.status_code == 500
env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
# the second call will try to transition the timeline into Stopping state as well
barrier = threading.Barrier(2)
second_call_thread = threading.Thread(
target=delete_timeline_call,
args=(
"2nd",
delete_results,
barrier,
),
)
second_call_thread.start()
barrier.wait()
# release the pause
ps_http.configure_failpoints((failpoint_name, "off"))
# both should have succeeded: the second call coalesces with the already-ongoing first call
result = delete_results.get()
assert result == "success"
result = delete_results.get()
assert result == "success"
# the second call will try to transition the timeline into Stopping state, but it's already in that state
# (the transition to Stopping state is not part of the request coalescing, because Tenant and Timeline states are a mess already)
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
log.info("second call failed as expected")
# by now we know that the second call failed, let's ensure the first call will finish
ps_http.configure_failpoints((failpoint_name, "off"))
result = first_call_result.get()
assert result == "success"
def second_call_attempt():
assert env.pageserver.log_contains(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
wait_until(50, 0.1, second_call_attempt)
finally:
log.info("joining first call thread")
log.info("joining 1st thread")
# in any case, make sure the lifetime of the thread is bounded to this test
first_call_thread.join()
if second_call_thread:
log.info("joining 2nd thread")
second_call_thread.join()
def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
"""
If the client hangs up before we start the index part upload but after we mark it
deleted in local memory, a subsequent delete_timeline call should be able to do
another delete timeline operation.
This tests cancel safety up to the given failpoint.
Make sure the timeline_delete runs to completion even if the first request is cancelled because of a timeout.
"""
neon_env_builder.enable_remote_storage(
remote_storage_kind=RemoteStorageKind.MOCK_S3,
@@ -437,12 +458,12 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_until(50, 0.1, got_hangup_log_message)
# ok, retry without failpoint, it should succeed
# after disabling the failpoint pause, the original attempt should complete eventually
ps_http.configure_failpoints((failpoint_name, "off"))
# this should succeed
ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)
# the second call will try to transition the timeline into Stopping state, but it's already in that state
env.pageserver.allowed_errors.append(
f".*{child_timeline_id}.*Ignoring new state, equal to the existing one: Stopping"
)
def delete_timeline_completes():
assert [env.initial_timeline] == [
timeline_id for (_, timeline_id) in env.neon_cli.list_timelines()
]
wait_until(50, 0.5, delete_timeline_completes)