From 9fb77d6cdd0894ec4e93b4fe3a576655cfad3b2e Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Tue, 18 Mar 2025 19:48:43 +0100
Subject: [PATCH] buffered writer: add cancellation sensitivity (#11052)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

In

- https://github.com/neondatabase/neon/pull/10993#issuecomment-2690428336

I added infinite retries for buffered writer flush IOs, primarily to
gracefully handle ENOSPC, but more generally so that the buffered
writer is not left in a state where reads from the surrounding
InMemoryLayer cause panics.

However, I didn't add cancellation sensitivity, which is concerning:
without it, there is no way to detach a timeline/tenant that is
encountering write IO errors. That is a legitimate scenario if we hit
some edge-case bug. See the #10993 description for details.

This PR

- first makes the flush loop infallible, enabled by the infinite
  retries,
- then adds sensitivity to `Timeline::cancel` to the flush loop,
  thereby making it fallible in exactly one way again, and
- finally fixes the InMemoryLayer/EphemeralFile/BufferedWriter amalgam
  so that it remains read-available after the flush loop is cancelled.

Read-availability after cancellation is necessary so that reads from
the InMemoryLayer that are already queued up behind the RwLock wrapping
the BufferedWriter won't panic because of the `mutable=None` that we
leave behind when the flush loop gets cancelled.

# Alternatives

One might think we could ship only the read-availability change for the
case where flush encounters an error, without the complexity of the
infinite retrying and/or cancellation sensitivity. The problem with
that is that read-availability sounds good but is quite useless in
practice, because we cannot ingest new WAL without a writable
InMemoryLayer. Very soon after we transition to read-only mode, reads
from compute are going to wait anyway, but on `wait_lsn` instead of the
RwLock, because ingest isn't progressing. Keeping the infinite flush
retries therefore makes more sense: they surface as mere "slowness" to
the user, whereas `wait_lsn` failures are hard errors.
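To make the resulting control flow concrete before diving into the diff, here is a minimal, self-contained sketch of the pattern this PR converges on: retry the flush IO forever, but poll a cancellation token between attempts. This is illustrative only; `flush_with_retries` and `do_write` are invented names, and the real implementation lives in `FlushBackgroundTask::run` in flush.rs below.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

#[derive(Debug, PartialEq)]
enum FlushTaskError {
    Cancelled,
}

/// Retry an IO closure forever, but bail out promptly once `cancel` fires.
async fn flush_with_retries<F, Fut>(
    mut do_write: F,
    cancel: &CancellationToken,
) -> Result<(), FlushTaskError>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = std::io::Result<()>>,
{
    for attempt in 1u64.. {
        // Cancellation is only checked *between* attempts: an in-flight IO
        // is never torn down mid-operation (cf. the io_uring note in flush.rs).
        if cancel.is_cancelled() {
            return Err(FlushTaskError::Cancelled);
        }
        match do_write().await {
            Ok(()) => return Ok(()),
            Err(err) => {
                eprintln!("flush attempt {attempt} failed, retrying: {err}");
                // Stand-in for utils::backoff::exponential_backoff, which
                // likewise wakes early when `cancel` fires.
                tokio::select! {
                    _ = cancel.cancelled() => return Err(FlushTaskError::Cancelled),
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {}
                }
            }
        }
    }
    unreachable!("the range 1.. never ends")
}
```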
---
 libs/utils/src/sync/duplex/mpsc.rs            |  4 +
 pageserver/benches/bench_ingest.rs            | 13 ++-
 pageserver/src/tenant/ephemeral_file.rs       | 74 ++++++++++-------
 pageserver/src/tenant/layer_map.rs            |  3 +
 .../tenant/remote_timeline_client/download.rs | 18 +++-
 .../tenant/storage_layer/inmemory_layer.rs    |  5 +-
 pageserver/src/tenant/timeline.rs             |  3 +
 .../src/tenant/timeline/layer_manager.rs      | 16 +++-
 .../virtual_file/owned_buffers_io/write.rs    | 81 +++++++++++++-----
 .../owned_buffers_io/write/flush.rs           | 82 +++++++++----------
 10 files changed, 200 insertions(+), 99 deletions(-)

diff --git a/libs/utils/src/sync/duplex/mpsc.rs b/libs/utils/src/sync/duplex/mpsc.rs
index 56b4e6d2b3..7c0183e615 100644
--- a/libs/utils/src/sync/duplex/mpsc.rs
+++ b/libs/utils/src/sync/duplex/mpsc.rs
@@ -26,6 +26,10 @@ impl Duplex {
         self.tx.send(x).await
     }
 
+    pub fn try_send(&self, x: S) -> Result<(), mpsc::error::TrySendError<S>> {
+        self.tx.try_send(x)
+    }
+
     /// Receives the next value for this receiver.
     ///
     /// This method returns `None` if the channel has been closed and there are
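The new non-async `try_send` is what lets flush.rs (further down) seed the capacity-1 duplex channel with the initial clean buffer at spawn time; on a freshly created channel this cannot fail. A self-contained illustration of that property using a plain tokio mpsc channel (a stand-in for the `Duplex` pair, which wraps one sender and one receiver):

```rust
use tokio::sync::mpsc;

fn main() {
    // Capacity 1: at most one message queued in this direction.
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(1);

    // Seeding the freshly created channel cannot fail: one slot is free.
    tx.try_send(Vec::with_capacity(8192))
        .expect("we just created it with capacity 1");

    // A second try_send would return TrySendError::Full.
    assert!(tx.try_send(Vec::new()).is_err());

    // The receiving side (the flush handle, in flush.rs terms) immediately
    // finds a recycled buffer available, without awaiting.
    let buf = rx.try_recv().expect("seeded above");
    assert!(buf.capacity() >= 8192);
}
```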
diff --git a/pageserver/benches/bench_ingest.rs b/pageserver/benches/bench_ingest.rs
index 272c3e2338..000938b189 100644
--- a/pageserver/benches/bench_ingest.rs
+++ b/pageserver/benches/bench_ingest.rs
@@ -13,6 +13,7 @@
 use pageserver::{page_cache, virtual_file};
 use pageserver_api::key::Key;
 use pageserver_api::shard::TenantShardId;
 use pageserver_api::value::Value;
+use tokio_util::sync::CancellationToken;
 use utils::bin_ser::BeSer;
 use utils::id::{TenantId, TimelineId};
 use wal_decoder::serialized_batch::SerializedValueBatch;
@@ -61,8 +62,18 @@ async fn ingest(
         RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
 
     let gate = utils::sync::gate::Gate::default();
+    let cancel = CancellationToken::new();
 
-    let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;
+    let layer = InMemoryLayer::create(
+        conf,
+        timeline_id,
+        tenant_shard_id,
+        lsn,
+        &gate,
+        &cancel,
+        &ctx,
+    )
+    .await?;
 
     let data = Value::Image(Bytes::from(vec![0u8; put_size]));
     let data_ser_size = data.serialized_size().unwrap() as usize;
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index 396d930f77..19215bb918 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -9,6 +9,7 @@
 use camino::Utf8PathBuf;
 use num_traits::Num;
 use pageserver_api::shard::TenantShardId;
 use tokio_epoll_uring::{BoundedBuf, Slice};
+use tokio_util::sync::CancellationToken;
 use tracing::{error, info_span};
 use utils::id::TimelineId;
@@ -19,7 +20,7 @@
 use crate::page_cache;
 use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
 use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
 use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
-use crate::virtual_file::owned_buffers_io::write::Buffer;
+use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
 use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
 
 pub struct EphemeralFile {
@@ -40,6 +41,7 @@ impl EphemeralFile {
         tenant_shard_id: TenantShardId,
         timeline_id: TimelineId,
         gate: &utils::sync::gate::Gate,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<EphemeralFile> {
         static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -75,6 +77,7 @@
                 file,
                 || IoBufferMut::with_capacity(TAIL_SZ),
                 gate.enter()?,
+                cancel.child_token(),
                 ctx,
                 info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
             ),
@@ -101,6 +104,14 @@ impl Drop for EphemeralFile {
     }
 }
 
+#[derive(Debug, thiserror::Error)]
+pub(crate) enum EphemeralFileWriteError {
+    #[error("{0}")]
+    TooLong(String),
+    #[error("cancelled")]
+    Cancelled,
+}
+
 impl EphemeralFile {
     pub(crate) fn len(&self) -> u64 {
         self.bytes_written
@@ -133,7 +144,7 @@
         &mut self,
         srcbuf: &[u8],
         ctx: &RequestContext,
-    ) -> std::io::Result<u64> {
+    ) -> Result<u64, EphemeralFileWriteError> {
         let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
         if let Some(control) = control {
             control.release().await;
@@ -145,24 +156,24 @@
         &mut self,
         srcbuf: &[u8],
         ctx: &RequestContext,
-    ) -> std::io::Result<(u64, Option<FlushControl>)> {
+    ) -> Result<(u64, Option<FlushControl>), EphemeralFileWriteError> {
         let pos = self.bytes_written;
 
         let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
-            std::io::Error::new(
-                std::io::ErrorKind::Other,
-                format!(
-                    "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
-                    srcbuf_len = srcbuf.len(),
-                ),
-            )
+            EphemeralFileWriteError::TooLong(format!(
+                "write would grow EphemeralFile beyond u64::MAX: len={pos} written={srcbuf_len}",
+                srcbuf_len = srcbuf.len(),
+            ))
         })?;
 
         // Write the payload
         let (nwritten, control) = self
             .buffered_writer
             .write_buffered_borrowed_controlled(srcbuf, ctx)
-            .await?;
+            .await
+            .map_err(|e| match e {
+                FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
+            })?;
         assert_eq!(
             nwritten,
             srcbuf.len(),
@@ -184,8 +195,14 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
     ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
         let submitted_offset = self.buffered_writer.bytes_submitted();
 
-        let mutable = self.buffered_writer.inspect_mutable();
-        let mutable = &mutable[0..mutable.pending()];
+        let mutable = match self.buffered_writer.inspect_mutable() {
+            Some(mutable) => &mutable[0..mutable.pending()],
+            None => {
+                // Timeline::cancel and hence buffered writer flush was cancelled.
+                // Remain read-available while timeline is shutting down.
+                &[]
+            }
+        };
 
         let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
 
@@ -216,7 +233,6 @@
         let (written_range, maybe_flushed_range) = {
             if maybe_flushed.is_some() {
                 // [       written       ][ maybe_flushed ][    mutable    ]
-                //                        <-   TAIL_SZ   -><-   TAIL_SZ   ->
                 //                        ^
                 //                        `submitted_offset`
                 // <++++++ on disk +++++++????????????????>
@@ -232,7 +248,6 @@
             )
         } else {
             // [       written       ][    mutable    ]
-            //                        <-   TAIL_SZ   ->
             //                        ^
             //                        `submitted_offset`
             // <++++++ on disk +++++++++++++++++++++++>
@@ -366,8 +381,9 @@ mod tests {
             harness("ephemeral_file_holds_gate_open").unwrap();
 
         let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
 
-        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
+        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
@@ -397,12 +413,13 @@
         let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
 
         let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
+        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
-        let mutable = file.buffered_writer.inspect_mutable();
+        let mutable = file.buffered_writer.mutable();
         let cap = mutable.capacity();
         let align = mutable.align();
@@ -445,7 +462,7 @@
         let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
         assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
 
-        let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
+        let mutable_buffer_contents = file.buffered_writer.mutable();
         assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
     }
 
@@ -454,13 +471,13 @@
         let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
 
         let gate = utils::sync::gate::Gate::default();
-
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
+        let cancel = CancellationToken::new();
+        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();
 
        // mutable buffer and maybe_flushed buffer each has `cap` bytes.
-       let cap = file.buffered_writer.inspect_mutable().capacity();
+       let cap = file.buffered_writer.mutable().capacity();
 
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
@@ -470,10 +487,8 @@
         file.write_raw(&content, &ctx).await.unwrap();
 
         // assert the state is as this test expects it to be
-        assert_eq!(
-            &file.load_to_io_buf(&ctx).await.unwrap(),
-            &content[0..cap * 2 + cap / 2]
-        );
+        let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
+        assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
         let md = file.buffered_writer.as_inner().path().metadata().unwrap();
         assert_eq!(
             md.len(),
@@ -485,7 +500,7 @@
             &content[cap..cap * 2]
         );
         assert_eq!(
-            &file.buffered_writer.inspect_mutable()[0..cap / 2],
+            &file.buffered_writer.mutable()[0..cap / 2],
             &content[cap * 2..cap * 2 + cap / 2]
         );
     }
@@ -501,12 +516,13 @@
             harness("test_read_split_across_file_and_buffer").unwrap();
 
         let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
 
-        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
+        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
             .await
             .unwrap();
 
-        let mutable = file.buffered_writer.inspect_mutable();
+        let mutable = file.buffered_writer.mutable();
         let cap = mutable.capacity();
         let align = mutable.align();
         let content: Vec<u8> = rand::thread_rng()
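As an aside, the `#[error]` attributes on the new enum determine how it renders when surfaced to logs or callers; a quick self-contained check of that behavior (assumes the `thiserror` crate; the enum shape is copied from the hunk above):

```rust
// Same shape as the patch's EphemeralFileWriteError; `thiserror` derives Display.
#[derive(Debug, thiserror::Error)]
enum EphemeralFileWriteError {
    #[error("{0}")]
    TooLong(String),
    #[error("cancelled")]
    Cancelled,
}

fn main() {
    let e = EphemeralFileWriteError::TooLong("len=42 written=7".into());
    // #[error("{0}")] forwards the wrapped message verbatim.
    assert_eq!(e.to_string(), "len=42 written=7");
    assert_eq!(EphemeralFileWriteError::Cancelled.to_string(), "cancelled");
}
```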
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index 2b04e53f10..96cee922ff 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -1112,6 +1112,7 @@ mod tests {
     };
     use pageserver_api::key::DBDIR_KEY;
     use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
+    use tokio_util::sync::CancellationToken;
     use utils::id::{TenantId, TimelineId};
     use utils::shard::TenantShardId;
@@ -1193,6 +1194,7 @@
     async fn ranged_search() {
         let harness = TenantHarness::create("ranged_search").await.unwrap();
         let (tenant, ctx) = harness.load().await;
+        let cancel = CancellationToken::new();
         let timeline_id = TimelineId::generate();
 
         // Create the timeline such that the in-memory layers can be written
         // to the timeline directory.
@@ -1209,6 +1211,7 @@
             harness.tenant_shard_id,
             lsn_range.start,
             &gate,
+            &cancel,
             &ctx,
         )
         .await
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 0001f67c99..8b399996d5 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -205,6 +205,7 @@ async fn download_object(
         }
         #[cfg(target_os = "linux")]
         crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
+            use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
             use std::sync::Arc;
 
             use crate::virtual_file::{IoBufferMut, owned_buffers_io};
@@ -228,6 +229,7 @@
                     destination_file,
                     || IoBufferMut::with_capacity(super::BUFFER_SIZE),
                     gate.enter().map_err(|_| DownloadError::Cancelled)?,
+                    cancel.child_token(),
                     ctx,
                     tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
                 );
@@ -240,11 +242,21 @@
                 {
                     let chunk = match res {
                         Ok(chunk) => chunk,
-                        Err(e) => return Err(e),
+                        Err(e) => return Err(DownloadError::from(e)),
                     };
-                    buffered.write_buffered_borrowed(&chunk, ctx).await?;
+                    buffered
+                        .write_buffered_borrowed(&chunk, ctx)
+                        .await
+                        .map_err(|e| match e {
+                            FlushTaskError::Cancelled => DownloadError::Cancelled,
+                        })?;
                 }
-                let inner = buffered.flush_and_into_inner(ctx).await?;
+                let inner = buffered
+                    .flush_and_into_inner(ctx)
+                    .await
+                    .map_err(|e| match e {
+                        FlushTaskError::Cancelled => DownloadError::Cancelled,
+                    })?;
                 Ok(inner)
             }
             .await?;
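Note the shape of the error mapping above: an exhaustive `match` rather than a catch-all arm or a `From` impl. If `FlushTaskError` ever grows a second variant, every such call site stops compiling until the new case is handled explicitly. A minimal sketch of that pattern with toy copies of the two enums:

```rust
#[derive(Debug)]
enum FlushTaskError {
    Cancelled,
    // Adding e.g. `Fatal(std::io::Error)` here breaks the match below,
    // forcing each call site to decide how the new case surfaces.
}

#[derive(Debug)]
enum DownloadError {
    Cancelled,
}

fn map_flush_err(e: FlushTaskError) -> DownloadError {
    match e {
        FlushTaskError::Cancelled => DownloadError::Cancelled,
    }
}

fn main() {
    // Prints "Cancelled": the only mapping the compiler lets us forget nothing about.
    println!("{:?}", map_flush_err(FlushTaskError::Cancelled));
}
```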
diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
index 46135b5330..bb4ae38ad1 100644
--- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs
+++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -19,6 +19,7 @@
 use pageserver_api::keyspace::KeySpace;
 use pageserver_api::models::InMemoryLayerInfo;
 use pageserver_api::shard::TenantShardId;
 use tokio::sync::RwLock;
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TimelineId;
 use utils::lsn::Lsn;
@@ -552,13 +553,15 @@ impl InMemoryLayer {
         tenant_shard_id: TenantShardId,
         start_lsn: Lsn,
         gate: &utils::sync::gate::Gate,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> Result<InMemoryLayer> {
         trace!(
             "initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"
         );
 
-        let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?;
+        let file =
+            EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, cancel, ctx).await?;
         let key = InMemoryLayerFileId(file.page_cache_file_id());
 
         Ok(InMemoryLayer {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 3007d1e58a..ce2b8c4f1a 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -4186,6 +4186,7 @@ impl Timeline {
             self.timeline_id,
             self.tenant_shard_id,
             &self.gate,
+            &self.cancel,
             ctx,
         )
         .await?;
@@ -6742,6 +6743,8 @@ impl Timeline {
             self.tenant_shard_id,
             in_memory.lsn_range.start,
             &self.gate,
+            // TODO: if we ever use this function in production code, we need to pass the real cancellation token
+            &CancellationToken::new(),
             ctx,
         )
         .await
diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs
index 1b489028dc..ed92ea28ce 100644
--- a/pageserver/src/tenant/timeline/layer_manager.rs
+++ b/pageserver/src/tenant/timeline/layer_manager.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
 
 use anyhow::{Context, bail, ensure};
 use itertools::Itertools;
 use pageserver_api::shard::TenantShardId;
+use tokio_util::sync::CancellationToken;
 use tracing::trace;
 use utils::id::TimelineId;
 use utils::lsn::{AtomicLsn, Lsn};
@@ -193,6 +194,7 @@ impl OpenLayerManager {
     /// Open a new writable layer to append data if there is no open layer, otherwise return the
     /// current open layer, called within `get_layer_for_write`.
+    #[allow(clippy::too_many_arguments)]
     pub(crate) async fn get_layer_for_write(
         &mut self,
         lsn: Lsn,
@@ -200,6 +202,7 @@
         timeline_id: TimelineId,
         tenant_shard_id: TenantShardId,
         gate: &utils::sync::gate::Gate,
+        cancel: &CancellationToken,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<InMemoryLayer>> {
         ensure!(lsn.is_aligned());
@@ -227,9 +230,16 @@
             timeline_id, start_lsn, lsn
         );
 
-        let new_layer =
-            InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx)
-                .await?;
+        let new_layer = InMemoryLayer::create(
+            conf,
+            timeline_id,
+            tenant_shard_id,
+            start_lsn,
+            gate,
+            cancel,
+            ctx,
+        )
+        .await?;
         let layer = Arc::new(new_layer);
 
         self.layer_map.open_layer = Some(layer.clone());
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs
index a7e06c0a14..f3ab2c285a 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs
@@ -3,7 +3,9 @@ use std::sync::Arc;
 
 pub(crate) use flush::FlushControl;
 use flush::FlushHandle;
+pub(crate) use flush::FlushTaskError;
 use tokio_epoll_uring::IoBuf;
+use tokio_util::sync::CancellationToken;
 
 use super::io_buf_aligned::IoBufAligned;
 use super::io_buf_ext::{FullSlice, IoBufExt};
@@ -40,11 +42,19 @@ pub trait OwnedAsyncWriter {
 // since we would avoid copying majority of the data into the internal buffer.
 pub struct BufferedWriter {
     writer: Arc<W>,
-    /// invariant: always remains Some(buf) except
-    /// - while IO is ongoing => goes back to Some() once the IO completed successfully
-    /// - after an IO error => stays `None` forever
-    ///
-    /// In these exceptional cases, it's `None`.
+    /// Clone of the buffer that was last submitted to the flush loop.
+    /// `None` if no flush request has been submitted, `Some` forever after.
+    pub(super) maybe_flushed: Option<FullSlice<Buf>>,
+    /// New writes are accumulated here.
+    /// `None` only during submission while we wait for the flush loop to accept
+    /// the full dirty buffer in exchange for a clean buffer.
+    /// If that exchange fails with a [`FlushTaskError`], the write path
+    /// bails and leaves this as `None`.
+    /// Subsequent writes will panic if attempted.
+    /// The read path continues to work without error because [`Self::maybe_flushed`]
+    /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
+    /// so they will never try to read from [`Self::mutable`] anyway, because it's past
+    /// the [`Self::maybe_flushed`] point.
     mutable: Option<B>,
     /// A handle to the background flush task for writting data to disk.
     flush_handle: FlushHandle<Buf, W>,
@@ -65,16 +75,19 @@
         writer: Arc<W>,
         buf_new: impl Fn() -> B,
         gate_guard: utils::sync::gate::GateGuard,
+        cancel: CancellationToken,
         ctx: &RequestContext,
         flush_task_span: tracing::Span,
     ) -> Self {
         Self {
             writer: writer.clone(),
             mutable: Some(buf_new()),
+            maybe_flushed: None,
             flush_handle: FlushHandle::spawn_new(
                 writer,
                 buf_new(),
                 gate_guard,
+                cancel,
                 ctx.attached_child(),
                 flush_task_span,
             ),
@@ -92,25 +105,26 @@
     }
 
     /// Panics if used after any of the write paths returned an error
-    pub fn inspect_mutable(&self) -> &B {
-        self.mutable()
+    pub fn inspect_mutable(&self) -> Option<&B> {
+        self.mutable.as_ref()
     }
 
     /// Gets a reference to the maybe flushed read-only buffer.
     /// Returns `None` if the writer has not submitted any flush request.
     pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
-        self.flush_handle.maybe_flushed.as_ref()
+        self.maybe_flushed.as_ref()
     }
 
     #[cfg_attr(target_os = "macos", allow(dead_code))]
     pub async fn flush_and_into_inner(
         mut self,
         ctx: &RequestContext,
-    ) -> std::io::Result<(u64, Arc<W>)> {
+    ) -> Result<(u64, Arc<W>), FlushTaskError> {
         self.flush(ctx).await?;
 
         let Self {
             mutable: buf,
+            maybe_flushed: _,
             writer,
             mut flush_handle,
             bytes_submitted: bytes_amount,
@@ -120,12 +134,9 @@
         Ok((bytes_amount, writer))
     }
 
-    /// Gets a reference to the mutable in-memory buffer.
-    #[inline(always)]
-    fn mutable(&self) -> &B {
-        self.mutable
-            .as_ref()
-            .expect("must not use after we returned an error")
+    #[cfg(test)]
+    pub(crate) fn mutable(&self) -> &B {
+        self.mutable.as_ref().expect("must not use after an error")
     }
 
     #[cfg_attr(target_os = "macos", allow(dead_code))]
@@ -133,7 +144,7 @@
         &mut self,
         chunk: &[u8],
         ctx: &RequestContext,
-    ) -> std::io::Result<usize> {
+    ) -> Result<usize, FlushTaskError> {
         let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
         if let Some(control) = control {
             control.release().await;
@@ -146,7 +157,7 @@
         &mut self,
         mut chunk: &[u8],
         ctx: &RequestContext,
-    ) -> std::io::Result<(usize, Option<FlushControl>)> {
+    ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
         let chunk_len = chunk.len();
         let mut control: Option<FlushControl> = None;
         while !chunk.is_empty() {
@@ -167,17 +178,47 @@
         Ok((chunk_len, control))
     }
 
+    /// This function can only error if the flush task got cancelled.
+    /// In that case, we intentionally leave [`Self::mutable`] as `None`.
+    ///
+    /// The read path continues to function correctly; it can read up to the
+    /// point where it could read before, i.e., including what was in [`Self::mutable`]
+    /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
+    ///
+    /// The write path becomes unavailable and will panic if used.
+    /// The only correct way to retry writes is to discard the entire [`BufferedWriter`],
+    /// which upper layers of the pageserver write path currently do not support.
+    /// It is in fact quite hard to reason about what exactly happens in today's code.
+    /// Best case we accumulate junk in the EphemeralFile, worst case is data corruption.
     #[must_use = "caller must explcitly check the flush control"]
-    async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
+    async fn flush(
+        &mut self,
+        _ctx: &RequestContext,
+    ) -> Result<Option<FlushControl>, FlushTaskError> {
         let buf = self.mutable.take().expect("must not use after an error");
         let buf_len = buf.pending();
         if buf_len == 0 {
             self.mutable = Some(buf);
             return Ok(None);
         }
-        let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
+        // Prepare the buffer for read while flushing.
+        let slice = buf.flush();
+        // NB: this assignment also drops the reference to the old buffer, allowing us to re-own & make it mutable below.
+        self.maybe_flushed = Some(slice.cheap_clone());
+        let offset = self.bytes_submitted;
         self.bytes_submitted += u64::try_from(buf_len).unwrap();
+
+        // If we return/panic here or later, we'll leave mutable = None, breaking further
+        // writers, but the read path should still work.
+        let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
+
+        // The only other place that could hold a reference to the recycled buffer
+        // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
+        let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
+
+        // We got back some recycled buffer, can open up for more writes again.
         self.mutable = Some(recycled);
+
         Ok(Some(flush_control))
     }
 }
@@ -290,10 +331,12 @@ mod tests {
         let ctx = &ctx;
         let recorder = Arc::new(RecorderWriter::default());
         let gate = utils::sync::gate::Gate::default();
+        let cancel = CancellationToken::new();
         let mut writer = BufferedWriter::<_, RecorderWriter>::new(
             recorder,
             || IoBufferMut::with_capacity(2),
             gate.enter()?,
+            cancel,
             ctx,
             tracing::Span::none(),
         );
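The interplay between `mutable`, `maybe_flushed`, and cancellation above can be modeled in a few lines. The following toy model (not pageserver code; all names invented) mimics the invariant this patch establishes: publish the dirty buffer for reads *before* the fallible exchange with the flush task, so a cancelled flush kills the write path but leaves the read path serviceable.

```rust
/// Toy model of the BufferedWriter state machine: a cancelled flush
/// exchange leaves `mutable = None`, while `maybe_flushed` keeps
/// serving tail reads.
struct ToyWriter {
    mutable: Option<Vec<u8>>,
    maybe_flushed: Option<Vec<u8>>,
    cancelled: bool,
}

#[derive(Debug, PartialEq)]
enum ToyError {
    Cancelled,
}

impl ToyWriter {
    fn flush(&mut self) -> Result<(), ToyError> {
        let dirty = self.mutable.take().expect("must not use after an error");
        // Publish for reads before the fallible exchange, like
        // `self.maybe_flushed = Some(slice.cheap_clone())` in write.rs.
        self.maybe_flushed = Some(dirty);
        if self.cancelled {
            // Exchange with the flush task failed: mutable stays None.
            return Err(ToyError::Cancelled);
        }
        self.mutable = Some(Vec::new()); // recycled clean buffer
        Ok(())
    }

    fn read_tail(&self) -> &[u8] {
        // Read path tolerates mutable == None, as the EphemeralFile
        // vectored_dio_read::File impl now does.
        match &self.mutable {
            Some(m) => m,
            None => self.maybe_flushed.as_deref().unwrap_or(&[]),
        }
    }
}

fn main() {
    let mut w = ToyWriter {
        mutable: Some(b"tail".to_vec()),
        maybe_flushed: None,
        cancelled: true,
    };
    assert_eq!(w.flush(), Err(ToyError::Cancelled));
    assert!(w.mutable.is_none()); // further writes would panic
    assert_eq!(w.read_tail(), b"tail"); // reads still served
}
```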
#[must_use = "caller must explcitly check the flush control"] - async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result> { + async fn flush( + &mut self, + _ctx: &RequestContext, + ) -> Result, FlushTaskError> { let buf = self.mutable.take().expect("must not use after an error"); let buf_len = buf.pending(); if buf_len == 0 { self.mutable = Some(buf); return Ok(None); } - let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?; + // Prepare the buffer for read while flushing. + let slice = buf.flush(); + // NB: this assignment also drops thereference to the old buffer, allowing us to re-own & make it mutable below. + self.maybe_flushed = Some(slice.cheap_clone()); + let offset = self.bytes_submitted; self.bytes_submitted += u64::try_from(buf_len).unwrap(); + + // If we return/panic here or later, we'll leave mutable = None, breaking further + // writers, but the read path should still work. + let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?; + + // The only other place that could hold a reference to the recycled buffer + // is in `Self::maybe_flushed`, but we have already replace it with the new buffer. + let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner()); + + // We got back some recycled buffer, can open up for more writes again. self.mutable = Some(recycled); + Ok(Some(flush_control)) } } @@ -290,10 +331,12 @@ mod tests { let ctx = &ctx; let recorder = Arc::new(RecorderWriter::default()); let gate = utils::sync::gate::Gate::default(); + let cancel = CancellationToken::new(); let mut writer = BufferedWriter::<_, RecorderWriter>::new( recorder, || IoBufferMut::with_capacity(2), gate.enter()?, + cancel, ctx, tracing::Span::none(), ); diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index e3cf9be438..c076ba0eca 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,7 +1,6 @@ use std::ops::ControlFlow; use std::sync::Arc; -use once_cell::sync::Lazy; use tokio_util::sync::CancellationToken; use tracing::{Instrument, info, info_span, warn}; use utils::sync::duplex; @@ -15,9 +14,6 @@ use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; /// A handle to the flush task. pub struct FlushHandle { inner: Option>, - /// Immutable buffer for serving tail reads. - /// `None` if no flush request has been submitted. - pub(super) maybe_flushed: Option>, } pub struct FlushHandleInner { @@ -25,7 +21,7 @@ pub struct FlushHandleInner { /// and receives recyled buffer. channel: duplex::mpsc::Duplex, FullSlice>, /// Join handle for the background flush task. - join_handle: tokio::task::JoinHandle>>, + join_handle: tokio::task::JoinHandle, FlushTaskError>>, } struct FlushRequest { @@ -117,27 +113,31 @@ where { /// Spawns a new background flush task and obtains a handle. /// - /// Note: The background task so we do not need to explicitly maintain a queue of buffers. + /// Handle and background task are connected through a duplex channel. + /// Dirty buffers are sent to the background task for flushing. + /// Clean buffers are sent back to the handle for reuse. + /// + /// The queue depth is 1, and the passed-in `buf` seeds the queue depth. + /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer. 
     pub fn spawn_new(
         file: Arc<W>,
         buf: B,
         gate_guard: utils::sync::gate::GateGuard,
+        cancel: CancellationToken,
         ctx: RequestContext,
         span: tracing::Span,
     ) -> Self
     where
         B: Buffer + Send + 'static,
     {
-        // It is fine to buffer up to only 1 message. We only 1 message in-flight at a time.
         let (front, back) = duplex::mpsc::channel(1);
+        back.try_send(buf.flush())
+            .expect("we just created it with capacity 1");
 
         let join_handle = tokio::spawn(
-            async move {
-                FlushBackgroundTask::new(back, file, gate_guard, ctx)
-                    .run(buf.flush())
-                    .await
-            }
-            .instrument(span),
+            FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
+                .run()
+                .instrument(span),
         );
 
         FlushHandle {
@@ -145,7 +145,6 @@
                 channel: front,
                 join_handle,
             }),
-            maybe_flushed: None,
         }
     }
@@ -153,15 +152,11 @@
     /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
     /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
     /// clear `maybe_flushed`.
-    pub async fn flush(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
-    where
-        B: Buffer + Send + 'static,
-    {
-        let slice = buf.flush();
-
-        // Saves a buffer for read while flushing. This also removes reference to the old buffer.
-        self.maybe_flushed = Some(slice.cheap_clone());
-
+    pub async fn flush(
+        &mut self,
+        slice: FullSlice<Buf>,
+        offset: u64,
+    ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
         let (request, flush_control) = new_flush_op(slice, offset);
 
         // Submits the buffer to the background task.
@@ -177,13 +172,10 @@
             return self.handle_error().await;
         };
 
-        // The only other place that could hold a reference to the recycled buffer
-        // is in `Self::maybe_flushed`, but we have already replace it with the new buffer.
-        let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
         Ok((recycled, flush_control))
     }
 
-    async fn handle_error<T>(&mut self) -> std::io::Result<T> {
+    async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
         Err(self
             .shutdown()
             .await
@@ -191,7 +183,7 @@
     }
 
     /// Cleans up the channel, join the flush task.
-    pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
+    pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
         let handle = self
             .inner
             .take()
@@ -217,10 +209,17 @@
 pub struct FlushBackgroundTask {
     /// A writter for persisting data to disk.
     writer: Arc<W>,
 
     ctx: RequestContext,
+    cancel: CancellationToken,
     /// Prevent timeline from shuting down until the flush background task finishes flushing all remaining buffers to disk.
     _gate_guard: utils::sync::gate::GateGuard,
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum FlushTaskError {
+    #[error("flush task cancelled")]
+    Cancelled,
+}
+
 impl FlushBackgroundTask
 where
     Buf: IoBufAligned + Send + Sync,
@@ -231,25 +230,20 @@
         channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
         file: Arc<W>,
         gate_guard: utils::sync::gate::GateGuard,
+        cancel: CancellationToken,
         ctx: RequestContext,
     ) -> Self {
         FlushBackgroundTask {
             channel,
             writer: file,
             _gate_guard: gate_guard,
+            cancel,
             ctx,
         }
     }
 
     /// Runs the background flush task.
-    /// The passed in slice is immediately sent back to the flush handle through the duplex channel.
-    async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
-        // Sends the extra buffer back to the handle.
-        // TODO: can this ever await and or fail? I think not.
-        self.channel.send(slice).await.map_err(|_| {
-            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
-        })?;
-
+    async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
         // Exit condition: channel is closed and there is no remaining buffer to be flushed
         while let Some(request) = self.channel.recv().await {
             #[cfg(test)]
@@ -267,15 +261,13 @@
             // (The upper layers of the Pageserver write path are not equipped to retry write errors
             // becasuse they often deallocate the buffers that were already written).
             //
-            // TODO: cancellation sensitiity.
-            // Without it, if we hit a bug where retrying is never successful,
-            // then we can't shut down the timeline/tenant/pageserver cleanly because
-            // layers of the Pageserver write path are holding the gate open for EphemeralFile.
-            //
             // TODO: use utils::backoff::retry once async closures are actually usable
             //
             let mut slice_storage = Some(request.slice);
             for attempt in 1.. {
+                if self.cancel.is_cancelled() {
+                    return Err(FlushTaskError::Cancelled);
+                }
                 let result = async {
                     if attempt > 1 {
                         info!("retrying flush");
                     }
                     let slice = slice_storage.take().expect(
                         "likely previous invocation of this future didn't get polled to completion",
                     );
+                    // Don't cancel this write by doing tokio::select with self.cancel.cancelled().
+                    // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
+                    // If we retry indefinitely, we'll deplete those resources.
+                    // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
+                    // wait for cancelled ops to complete and discard their error.
                     let (slice, res) =
                         self.writer.write_all_at(slice, request.offset, &self.ctx).await;
                     slice_storage = Some(slice);
                     let res = res.maybe_fatal_err("owned_buffers_io flush");
                     let Err(err) = res else {
                         return ControlFlow::Break(());
                     };
                     warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
-                    static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
-                    utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
+                    utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
                     ControlFlow::Continue(())
                 }
                 .instrument(info_span!("flush_attempt", %attempt))