Revert "revert the cancellation sensitivity change for the flush task, doesn't work because flush task can't disambiguate orderly shutdown"

This reverts commit 42922cebe0.
This commit is contained in:
Christian Schwarz
2025-04-11 18:39:33 +02:00
parent 42922cebe0
commit 1155945a3e

View File

@@ -20,6 +20,11 @@ pub struct FlushHandleInner<Buf, W> {
/// A bi-directional channel that sends (buffer, offset) for writes,
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// The flush task is sometimes sensitive to channel disconnection
/// (i.e. when we drop [`Self::channel`]), other times sensitive to
/// [`FlushBackgroundTask::cancel`], but never both.
/// So, also store this drop guard.
set_flush_task_cancelled: tokio_util::sync::DropGuard,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
}
@@ -134,8 +139,10 @@ where
back.try_send(buf.flush())
.expect("we just created it with capacity 1");
let cancel = cancel.child_token();
let join_handle = tokio::spawn(
FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
FlushBackgroundTask::new(back, file, gate_guard, cancel.clone(), ctx)
.run()
.instrument(span),
);
@@ -143,6 +150,7 @@ where
FlushHandle {
inner: Some(FlushHandleInner {
channel: front,
set_flush_task_cancelled: cancel.drop_guard(),
join_handle,
}),
}
@@ -189,6 +197,7 @@ where
.take()
.expect("must not use after we returned an error");
drop(handle.channel.tx);
drop(handle.set_flush_task_cancelled);
handle.join_handle.await.unwrap()
}