mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-22 15:41:15 +00:00
buffered writer: handle write errors by retrying all write IO errors indefinitely (#10993)
# Problem
If the Pageserver ingest path
(InMemoryLayer=>EphemeralFile=>BufferedWriter)
encounters ENOSPC or any other write IO error when flushing the mutable
buffer
of the BufferedWriter, the buffered writer is left in a state where
subsequent _reads_
from the InMemoryLayer it will cause a `must not use after we returned
an error` panic.
The reason is that
1. the flush background task bails on flush failure,
2. causing the `FlushHandle::flush` function to fail at channel.recv()
and
3. causing the `FlushHandle::flush` function to bail with the flush
error,
4. leaving its caller `BufferedWriter::flush` with
`BufferedWriter::mutable = None`,
5. once the InMemoryLayer's RwLock::write guard is dropped, subsequent
reads can enter,
6. those reads find `mutable = None` and cause the panic.
# Context
It has always been the contract that writes against the BufferedWriter
API
must not be retried because the writer/stream-style/append-only
interface makes no atomicity guarantees ("On error, did nothing or a
piece of the buffer get appended?").
The idea was that the error would bubble up to upper layers that can
throw away the buffered writer and create a new one. (See our [internal
error handling policy document on how to handle e.g.
`ENOSPC`](c870a50bc0/src/storage/handling_io_and_logical_errors.md (L36-L43))).
That _might_ be true for delta/image layer writers, I haven't checked.
But it's certainly not true for the ingest path: there are no provisions
to throw away an InMemoryLayer that encountered a write error an
reingest the WAL already written to it.
Adding such higher-level retries would involve either resetting
last_record_lsn to a lower value and restarting walreceiver. The code
isn't flexible enough to do that, and such complexity likely isn't worth
it given that write errors are rare.
# Solution
The solution in this PR is to retry _any_ failing write operation
_indefinitely_ inside the buffered writer flush task, except of course
those that are fatal as per `maybe_fatal_err`.
Retrying indefinitely ensures that `BufferedWriter::mutable` is never
left `None` in the case of IO errors, thereby solving the problem
described above.
It's a clear improvement over the status quo.
However, while we're retrying, we build up backpressure because the
`flush` is only double-buffered, not infinitely buffered.
Backpressure here is generally good to avoid resource exhaustion, **but
blocks reads** and hence stalls GetPage requests because InMemoryLayer
reads and writes are mutually exclusive.
That's orthogonal to the problem that is solved here, though.
## Caveats
Note that there are some remaining conditions in the flush background
task where it can bail with an error. I have annotated one of them with
a TODO comment.
Hence the `FlushHandle::flush` is still fallible and hence the overall
scenario of leaving `mutable = None` on the bail path is still possible.
We can clean that up in a later commit.
Note also that retrying indefinitely is great for temporary errors like
ENOSPC but likely undesirable in case the `std::io::Error` we get is
really due to higher-level logic bugs.
For example, we could fail to flush because the timeline or tenant
directory got deleted and VirtualFile's reopen fails with ENOENT.
Note finally that cancellation is not respected while we're retrying.
This means we will block timeline/tenant/pageserver shutdown.
The reason is that the existing cancellation story for the buffered
writer background task was to recv from flush op channel until the
sending side (FlushHandle) is explicitly shut down or dropped.
Failing to handle cancellation carries the operational risk that even if
a single timeline gets stuck because of a logic bug such as the one laid
out above, we must still restart the whole pageserver process.
# Alternatives Considered
As pointed out in the `Context` section, throwing away a InMemoryLayer
that encountered an error and reingesting the WAL is a lot of complexity
that IMO isn't justified for such an edge case.
Also, it's wasteful.
I think it's a local optimum.
A more general and simpler solution for ENOSPC is to `abort()` the
process and run eviction on startup before bringing up the rest of
pageserver.
I argued for it in the past, the pro arguments are still valid and
complete:
https://neondb.slack.com/archives/C033RQ5SPDH/p1716896265296329
The trouble at the time was implementing eviction on startup.
However, maybe things are simpler now that we are fully storcon-managed
and all tenants have secondaries.
For example, if pageserver `abort()`s on ENOSPC and then simply don't
respond to storcon heartbeats while we're running eviction on startup,
storcon will fail tenants over to the secondary anyway, giving us all
the time we need to clean up.
The downside is that if there's a systemic space management bug, above
proposal will just propagate the problem to other nodes. But I imagine
that because of the delays involved with filling up disks, the system
might reach a half-stable state, providing operators more time to react.
# Demo
Intermediary commit `a03f335121480afc0171b0f34606bdf929e962c5` is demoed
in this (internal) screen recording:
https://drive.google.com/file/d/1nBC6lFV2himQ8vRXDXrY30yfWmI2JL5J/view?usp=drive_link
# Perf Testing
Ran `bench_ingest` on tmpfs, no measurable difference.
Spans are uniquely owned by the flush task, and the span stack isn't too
deep, so, enter and exit should be cheap.
Plus, each flush takes ~150us with direct IO enabled, so, not _that_
high frequency event anyways.
# Refs
- fixes https://github.com/neondatabase/neon/issues/10856
This commit is contained in:
committed by
GitHub
parent
083a30b1e2
commit
158db414bf
@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
|
||||
use num_traits::Num;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use tokio_epoll_uring::{BoundedBuf, Slice};
|
||||
use tracing::error;
|
||||
use tracing::{error, info_span};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
|
||||
@@ -76,6 +76,7 @@ impl EphemeralFile {
|
||||
|| IoBufferMut::with_capacity(TAIL_SZ),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
|
||||
),
|
||||
_gate_guard: gate.enter()?,
|
||||
})
|
||||
|
||||
@@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions};
|
||||
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
|
||||
use tokio_util::io::StreamReader;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
use tracing::{info_span, warn};
|
||||
use utils::crashsafe::path_with_suffix_extension;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
use utils::{backoff, pausable_failpoint};
|
||||
@@ -229,6 +229,7 @@ async fn download_object(
|
||||
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
|
||||
gate.enter().map_err(|_| DownloadError::Cancelled)?,
|
||||
ctx,
|
||||
info_span!(parent: None, "download_object_buffered_writer", %dst_path),
|
||||
);
|
||||
|
||||
// TODO: use vectored write (writev) once supported by tokio-epoll-uring.
|
||||
|
||||
@@ -1299,9 +1299,8 @@ impl OwnedAsyncWriter for VirtualFile {
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
|
||||
res.map(|_| buf)
|
||||
) -> (FullSlice<Buf>, std::io::Result<()>) {
|
||||
VirtualFile::write_all_at(self, buf, offset, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter {
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
|
||||
) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
|
||||
}
|
||||
|
||||
/// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
|
||||
@@ -66,6 +66,7 @@ where
|
||||
buf_new: impl Fn() -> B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: &RequestContext,
|
||||
flush_task_span: tracing::Span,
|
||||
) -> Self {
|
||||
Self {
|
||||
writer: writer.clone(),
|
||||
@@ -75,6 +76,7 @@ where
|
||||
buf_new(),
|
||||
gate_guard,
|
||||
ctx.attached_child(),
|
||||
flush_task_span,
|
||||
),
|
||||
bytes_submitted: 0,
|
||||
}
|
||||
@@ -269,12 +271,12 @@ mod tests {
|
||||
buf: FullSlice<Buf>,
|
||||
offset: u64,
|
||||
_: &RequestContext,
|
||||
) -> std::io::Result<FullSlice<Buf>> {
|
||||
) -> (FullSlice<Buf>, std::io::Result<()>) {
|
||||
self.writes
|
||||
.lock()
|
||||
.unwrap()
|
||||
.push((Vec::from(&buf[..]), offset));
|
||||
Ok(buf)
|
||||
(buf, Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -293,6 +295,7 @@ mod tests {
|
||||
|| IoBufferMut::with_capacity(2),
|
||||
gate.enter()?,
|
||||
ctx,
|
||||
tracing::Span::none(),
|
||||
);
|
||||
|
||||
writer.write_buffered_borrowed(b"abc", ctx).await?;
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
use std::ops::ControlFlow;
|
||||
use std::sync::Arc;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, info, info_span, warn};
|
||||
use utils::sync::duplex;
|
||||
|
||||
use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
|
||||
use crate::context::RequestContext;
|
||||
use crate::virtual_file::MaybeFatalIo;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
|
||||
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
|
||||
|
||||
@@ -118,6 +123,7 @@ where
|
||||
buf: B,
|
||||
gate_guard: utils::sync::gate::GateGuard,
|
||||
ctx: RequestContext,
|
||||
span: tracing::Span,
|
||||
) -> Self
|
||||
where
|
||||
B: Buffer<IoBuf = Buf> + Send + 'static,
|
||||
@@ -125,11 +131,14 @@ where
|
||||
// It is fine to buffer up to only 1 message. We only 1 message in-flight at a time.
|
||||
let (front, back) = duplex::mpsc::channel(1);
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
FlushBackgroundTask::new(back, file, gate_guard, ctx)
|
||||
.run(buf.flush())
|
||||
.await
|
||||
});
|
||||
let join_handle = tokio::spawn(
|
||||
async move {
|
||||
FlushBackgroundTask::new(back, file, gate_guard, ctx)
|
||||
.run(buf.flush())
|
||||
.await
|
||||
}
|
||||
.instrument(span),
|
||||
);
|
||||
|
||||
FlushHandle {
|
||||
inner: Some(FlushHandleInner {
|
||||
@@ -236,6 +245,7 @@ where
|
||||
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
|
||||
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
|
||||
// Sends the extra buffer back to the handle.
|
||||
// TODO: can this ever await and or fail? I think not.
|
||||
self.channel.send(slice).await.map_err(|_| {
|
||||
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
|
||||
})?;
|
||||
@@ -251,10 +261,47 @@ where
|
||||
}
|
||||
|
||||
// Write slice to disk at `offset`.
|
||||
let slice = self
|
||||
.writer
|
||||
.write_all_at(request.slice, request.offset, &self.ctx)
|
||||
.await?;
|
||||
//
|
||||
// Error handling happens according to the current policy of crashing
|
||||
// on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
|
||||
// (The upper layers of the Pageserver write path are not equipped to retry write errors
|
||||
// becasuse they often deallocate the buffers that were already written).
|
||||
//
|
||||
// TODO: cancellation sensitiity.
|
||||
// Without it, if we hit a bug where retrying is never successful,
|
||||
// then we can't shut down the timeline/tenant/pageserver cleanly because
|
||||
// layers of the Pageserver write path are holding the gate open for EphemeralFile.
|
||||
//
|
||||
// TODO: use utils::backoff::retry once async closures are actually usable
|
||||
//
|
||||
let mut slice_storage = Some(request.slice);
|
||||
for attempt in 1.. {
|
||||
let result = async {
|
||||
if attempt > 1 {
|
||||
info!("retrying flush");
|
||||
}
|
||||
let slice = slice_storage.take().expect(
|
||||
"likely previous invocation of this future didn't get polled to completion",
|
||||
);
|
||||
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
|
||||
slice_storage = Some(slice);
|
||||
let res = res.maybe_fatal_err("owned_buffers_io flush");
|
||||
let Err(err) = res else {
|
||||
return ControlFlow::Break(());
|
||||
};
|
||||
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
|
||||
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
|
||||
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
|
||||
ControlFlow::Continue(())
|
||||
}
|
||||
.instrument(info_span!("flush_attempt", %attempt))
|
||||
.await;
|
||||
match result {
|
||||
ControlFlow::Break(()) => break,
|
||||
ControlFlow::Continue(()) => continue,
|
||||
}
|
||||
}
|
||||
let slice = slice_storage.expect("loop must have run at least once");
|
||||
|
||||
#[cfg(test)]
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user