From 1155945a3eeb54e31cae8afc04c80dfd10605587 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Apr 2025 18:39:33 +0200 Subject: [PATCH] Revert "revert the cancellation sensitivity change for the flush task, doesn't work because flush task can't disambiguate orderly shutdown" This reverts commit 42922cebe0d58d1aa89f4bedae00ec4d9863b6d3. --- .../src/virtual_file/owned_buffers_io/write/flush.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index c076ba0eca..282afd4f9f 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -20,6 +20,11 @@ pub struct FlushHandleInner { /// A bi-directional channel that sends (buffer, offset) for writes, /// and receives recyled buffer. channel: duplex::mpsc::Duplex, FullSlice>, + /// The flush task is sometimes sensitive to channel disconnection + /// (i.e. when we drop [`Self::channel`]), other times sensitive to + /// [`FlushBackgroundTask::cancel`], but never both. + /// So, also store this drop guard. + set_flush_task_cancelled: tokio_util::sync::DropGuard, /// Join handle for the background flush task. join_handle: tokio::task::JoinHandle, FlushTaskError>>, } @@ -134,8 +139,10 @@ where back.try_send(buf.flush()) .expect("we just created it with capacity 1"); + let cancel = cancel.child_token(); + let join_handle = tokio::spawn( - FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx) + FlushBackgroundTask::new(back, file, gate_guard, cancel.clone(), ctx) .run() .instrument(span), ); @@ -143,6 +150,7 @@ where FlushHandle { inner: Some(FlushHandleInner { channel: front, + set_flush_task_cancelled: cancel.drop_guard(), join_handle, }), } @@ -189,6 +197,7 @@ where .take() .expect("must not use after we returned an error"); drop(handle.channel.tx); + drop(handle.set_flush_task_cancelled); handle.join_handle.await.unwrap() }