From 158db414bf881fb358494e3215d192c8fa420a53 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 11 Mar 2025 21:40:23 +0100 Subject: [PATCH] buffered writer: handle write errors by retrying all write IO errors indefinitely (#10993) # Problem If the Pageserver ingest path (InMemoryLayer=>EphemeralFile=>BufferedWriter) encounters ENOSPC or any other write IO error when flushing the mutable buffer of the BufferedWriter, the buffered writer is left in a state where subsequent _reads_ from the InMemoryLayer it will cause a `must not use after we returned an error` panic. The reason is that 1. the flush background task bails on flush failure, 2. causing the `FlushHandle::flush` function to fail at channel.recv() and 3. causing the `FlushHandle::flush` function to bail with the flush error, 4. leaving its caller `BufferedWriter::flush` with `BufferedWriter::mutable = None`, 5. once the InMemoryLayer's RwLock::write guard is dropped, subsequent reads can enter, 6. those reads find `mutable = None` and cause the panic. # Context It has always been the contract that writes against the BufferedWriter API must not be retried because the writer/stream-style/append-only interface makes no atomicity guarantees ("On error, did nothing or a piece of the buffer get appended?"). The idea was that the error would bubble up to upper layers that can throw away the buffered writer and create a new one. (See our [internal error handling policy document on how to handle e.g. `ENOSPC`](https://github.com/neondatabase/docs/blob/c870a50bc099d82444947a353fb302c761949c94/src/storage/handling_io_and_logical_errors.md#L36-L43)). That _might_ be true for delta/image layer writers, I haven't checked. But it's certainly not true for the ingest path: there are no provisions to throw away an InMemoryLayer that encountered a write error an reingest the WAL already written to it. Adding such higher-level retries would involve either resetting last_record_lsn to a lower value and restarting walreceiver. The code isn't flexible enough to do that, and such complexity likely isn't worth it given that write errors are rare. # Solution The solution in this PR is to retry _any_ failing write operation _indefinitely_ inside the buffered writer flush task, except of course those that are fatal as per `maybe_fatal_err`. Retrying indefinitely ensures that `BufferedWriter::mutable` is never left `None` in the case of IO errors, thereby solving the problem described above. It's a clear improvement over the status quo. However, while we're retrying, we build up backpressure because the `flush` is only double-buffered, not infinitely buffered. Backpressure here is generally good to avoid resource exhaustion, **but blocks reads** and hence stalls GetPage requests because InMemoryLayer reads and writes are mutually exclusive. That's orthogonal to the problem that is solved here, though. ## Caveats Note that there are some remaining conditions in the flush background task where it can bail with an error. I have annotated one of them with a TODO comment. Hence the `FlushHandle::flush` is still fallible and hence the overall scenario of leaving `mutable = None` on the bail path is still possible. We can clean that up in a later commit. Note also that retrying indefinitely is great for temporary errors like ENOSPC but likely undesirable in case the `std::io::Error` we get is really due to higher-level logic bugs. For example, we could fail to flush because the timeline or tenant directory got deleted and VirtualFile's reopen fails with ENOENT. Note finally that cancellation is not respected while we're retrying. This means we will block timeline/tenant/pageserver shutdown. The reason is that the existing cancellation story for the buffered writer background task was to recv from flush op channel until the sending side (FlushHandle) is explicitly shut down or dropped. Failing to handle cancellation carries the operational risk that even if a single timeline gets stuck because of a logic bug such as the one laid out above, we must still restart the whole pageserver process. # Alternatives Considered As pointed out in the `Context` section, throwing away a InMemoryLayer that encountered an error and reingesting the WAL is a lot of complexity that IMO isn't justified for such an edge case. Also, it's wasteful. I think it's a local optimum. A more general and simpler solution for ENOSPC is to `abort()` the process and run eviction on startup before bringing up the rest of pageserver. I argued for it in the past, the pro arguments are still valid and complete: https://neondb.slack.com/archives/C033RQ5SPDH/p1716896265296329 The trouble at the time was implementing eviction on startup. However, maybe things are simpler now that we are fully storcon-managed and all tenants have secondaries. For example, if pageserver `abort()`s on ENOSPC and then simply don't respond to storcon heartbeats while we're running eviction on startup, storcon will fail tenants over to the secondary anyway, giving us all the time we need to clean up. The downside is that if there's a systemic space management bug, above proposal will just propagate the problem to other nodes. But I imagine that because of the delays involved with filling up disks, the system might reach a half-stable state, providing operators more time to react. # Demo Intermediary commit `a03f335121480afc0171b0f34606bdf929e962c5` is demoed in this (internal) screen recording: https://drive.google.com/file/d/1nBC6lFV2himQ8vRXDXrY30yfWmI2JL5J/view?usp=drive_link # Perf Testing Ran `bench_ingest` on tmpfs, no measurable difference. Spans are uniquely owned by the flush task, and the span stack isn't too deep, so, enter and exit should be cheap. Plus, each flush takes ~150us with direct IO enabled, so, not _that_ high frequency event anyways. # Refs - fixes https://github.com/neondatabase/neon/issues/10856 --- pageserver/src/tenant/ephemeral_file.rs | 3 +- .../tenant/remote_timeline_client/download.rs | 3 +- pageserver/src/virtual_file.rs | 5 +- .../virtual_file/owned_buffers_io/write.rs | 9 ++- .../owned_buffers_io/write/flush.rs | 65 ++++++++++++++++--- 5 files changed, 68 insertions(+), 17 deletions(-) diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index f048a355a8..396d930f77 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -9,7 +9,7 @@ use camino::Utf8PathBuf; use num_traits::Num; use pageserver_api::shard::TenantShardId; use tokio_epoll_uring::{BoundedBuf, Slice}; -use tracing::error; +use tracing::{error, info_span}; use utils::id::TimelineId; use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64}; @@ -76,6 +76,7 @@ impl EphemeralFile { || IoBufferMut::with_capacity(TAIL_SZ), gate.enter()?, ctx, + info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename), ), _gate_guard: gate.enter()?, }) diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 92be2145ce..954ff0c1d6 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions}; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio_util::io::StreamReader; use tokio_util::sync::CancellationToken; -use tracing::warn; +use tracing::{info_span, warn}; use utils::crashsafe::path_with_suffix_extension; use utils::id::{TenantId, TimelineId}; use utils::{backoff, pausable_failpoint}; @@ -229,6 +229,7 @@ async fn download_object( || IoBufferMut::with_capacity(super::BUFFER_SIZE), gate.enter().map_err(|_| DownloadError::Cancelled)?, ctx, + info_span!(parent: None, "download_object_buffered_writer", %dst_path), ); // TODO: use vectored write (writev) once supported by tokio-epoll-uring. diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 1da3130df0..cd3d897423 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -1299,9 +1299,8 @@ impl OwnedAsyncWriter for VirtualFile { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> std::io::Result> { - let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await; - res.map(|_| buf) + ) -> (FullSlice, std::io::Result<()>) { + VirtualFile::write_all_at(self, buf, offset, ctx).await } } diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs index 861ca3aa2a..a7e06c0a14 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter { buf: FullSlice, offset: u64, ctx: &RequestContext, - ) -> impl std::future::Future>> + Send; + ) -> impl std::future::Future, std::io::Result<()>)> + Send; } /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch @@ -66,6 +66,7 @@ where buf_new: impl Fn() -> B, gate_guard: utils::sync::gate::GateGuard, ctx: &RequestContext, + flush_task_span: tracing::Span, ) -> Self { Self { writer: writer.clone(), @@ -75,6 +76,7 @@ where buf_new(), gate_guard, ctx.attached_child(), + flush_task_span, ), bytes_submitted: 0, } @@ -269,12 +271,12 @@ mod tests { buf: FullSlice, offset: u64, _: &RequestContext, - ) -> std::io::Result> { + ) -> (FullSlice, std::io::Result<()>) { self.writes .lock() .unwrap() .push((Vec::from(&buf[..]), offset)); - Ok(buf) + (buf, Ok(())) } } @@ -293,6 +295,7 @@ mod tests { || IoBufferMut::with_capacity(2), gate.enter()?, ctx, + tracing::Span::none(), ); writer.write_buffered_borrowed(b"abc", ctx).await?; diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs index 46309d4011..e3cf9be438 100644 --- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs +++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs @@ -1,9 +1,14 @@ +use std::ops::ControlFlow; use std::sync::Arc; +use once_cell::sync::Lazy; +use tokio_util::sync::CancellationToken; +use tracing::{Instrument, info, info_span, warn}; use utils::sync::duplex; use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter}; use crate::context::RequestContext; +use crate::virtual_file::MaybeFatalIo; use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned; use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice; @@ -118,6 +123,7 @@ where buf: B, gate_guard: utils::sync::gate::GateGuard, ctx: RequestContext, + span: tracing::Span, ) -> Self where B: Buffer + Send + 'static, @@ -125,11 +131,14 @@ where // It is fine to buffer up to only 1 message. We only 1 message in-flight at a time. let (front, back) = duplex::mpsc::channel(1); - let join_handle = tokio::spawn(async move { - FlushBackgroundTask::new(back, file, gate_guard, ctx) - .run(buf.flush()) - .await - }); + let join_handle = tokio::spawn( + async move { + FlushBackgroundTask::new(back, file, gate_guard, ctx) + .run(buf.flush()) + .await + } + .instrument(span), + ); FlushHandle { inner: Some(FlushHandleInner { @@ -236,6 +245,7 @@ where /// The passed in slice is immediately sent back to the flush handle through the duplex channel. async fn run(mut self, slice: FullSlice) -> std::io::Result> { // Sends the extra buffer back to the handle. + // TODO: can this ever await and or fail? I think not. self.channel.send(slice).await.map_err(|_| { std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early") })?; @@ -251,10 +261,47 @@ where } // Write slice to disk at `offset`. - let slice = self - .writer - .write_all_at(request.slice, request.offset, &self.ctx) - .await?; + // + // Error handling happens according to the current policy of crashing + // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable). + // (The upper layers of the Pageserver write path are not equipped to retry write errors + // becasuse they often deallocate the buffers that were already written). + // + // TODO: cancellation sensitiity. + // Without it, if we hit a bug where retrying is never successful, + // then we can't shut down the timeline/tenant/pageserver cleanly because + // layers of the Pageserver write path are holding the gate open for EphemeralFile. + // + // TODO: use utils::backoff::retry once async closures are actually usable + // + let mut slice_storage = Some(request.slice); + for attempt in 1.. { + let result = async { + if attempt > 1 { + info!("retrying flush"); + } + let slice = slice_storage.take().expect( + "likely previous invocation of this future didn't get polled to completion", + ); + let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await; + slice_storage = Some(slice); + let res = res.maybe_fatal_err("owned_buffers_io flush"); + let Err(err) = res else { + return ControlFlow::Break(()); + }; + warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff"); + static NO_CANCELLATION: Lazy = Lazy::new(CancellationToken::new); + utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await; + ControlFlow::Continue(()) + } + .instrument(info_span!("flush_attempt", %attempt)) + .await; + match result { + ControlFlow::Break(()) => break, + ControlFlow::Continue(()) => continue, + } + } + let slice = slice_storage.expect("loop must have run at least once"); #[cfg(test)] {