buffered writer: add cancellation sensitivity (#11052)

In
https://github.com/neondatabase/neon/pull/10993#issuecomment-2690428336

I added infinite retries for buffered writer flush IOs, primarily to
gracefully handle ENOSPC but more generally so that the buffered writer
is not left in a state where reads from the surrounding InMemoryLayer
cause panics.

However, I didn't add cancellation sensitivity, which is concerning
because it leaves no way to detach a timeline/tenant that keeps
encountering write IO errors.
That is a legitimate scenario in the case of some edge-case bug.
See the #10993 description for details.


This PR
- first makes the flush loop infallible, enabled by the infinite retries
- then adds sensitivity to `Timeline::cancel` to the flush loop, thereby
making it fallible in one specific way again (sketched below)
- finally fixes the InMemoryLayer/EphemeralFile/BufferedWriter amalgam
so that it remains read-available after the flush loop is cancelled.
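
For orientation, here is a minimal, self-contained sketch of the control flow the flush loop ends up with: infinite retries on IO errors, fallible only through the cancellation token, with cancellation checked between attempts rather than racing an in-flight write. Names like `Writer` and `flush_with_infinite_retries` are simplified stand-ins for illustration; the real loop is `FlushBackgroundTask::run` in the diff below and uses `utils::backoff::exponential_backoff`.

```rust
// Sketch only: simplified stand-ins, not the actual pageserver types.
use std::time::Duration;

use tokio_util::sync::CancellationToken;

#[derive(Debug, thiserror::Error)]
enum FlushTaskError {
    #[error("flush task cancelled")]
    Cancelled,
}

struct Writer;

impl Writer {
    // Stand-in for the underlying write_all_at; may fail transiently (e.g. ENOSPC).
    async fn write_all_at(&self, _buf: &[u8], _offset: u64) -> std::io::Result<()> {
        Ok(())
    }
}

async fn flush_with_infinite_retries(
    writer: &Writer,
    buf: &[u8],
    offset: u64,
    cancel: &CancellationToken,
) -> Result<(), FlushTaskError> {
    for attempt in 1u32.. {
        // Cancellation is only checked *between* attempts: we never select! an
        // in-flight write against the token, because the underlying io_uring
        // slot keeps its resources until the operation completes anyway.
        if cancel.is_cancelled() {
            return Err(FlushTaskError::Cancelled);
        }
        match writer.write_all_at(buf, offset).await {
            Ok(()) => return Ok(()),
            Err(err) => {
                eprintln!("flush attempt {attempt} failed: {err}, retrying after backoff");
                // Simplified exponential backoff, capped like the real loop's 10s maximum.
                let delay = Duration::from_secs_f64(f64::min(1.5f64.powi(attempt as i32), 10.0));
                tokio::select! {
                    _ = tokio::time::sleep(delay) => {}
                    _ = cancel.cancelled() => {} // wake early so the next iteration can bail
                }
            }
        }
    }
    unreachable!("the loop only exits via return")
}
```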

Read-availability after cancellation is necessary so that reads from the
InMemoryLayer that are already queued up behind the RwLock wrapping the
BufferedWriter don't panic on the `mutable=None` that we leave behind
when the flush loop gets cancelled.
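
As a rough illustration of that property (simplified types, not the actual `EphemeralFile`/`BufferedWriter` code): a cancelled flush loop leaves `mutable = None` behind, and the read path now maps that to an empty slice instead of panicking on an `expect`.

```rust
// Sketch only: hypothetical BufferedWriterView stands in for the real BufferedWriter state.
struct BufferedWriterView {
    /// `None` after the flush loop was cancelled mid buffer exchange.
    mutable: Option<Vec<u8>>,
    /// Clone of the buffer last submitted to the flush loop, if any.
    maybe_flushed: Option<Vec<u8>>,
}

impl BufferedWriterView {
    /// Returns (maybe_flushed, mutable) slices for the read path.
    fn readable_tail(&self) -> (&[u8], &[u8]) {
        let mutable: &[u8] = match self.mutable.as_deref() {
            Some(buf) => buf,
            // Timeline::cancel fired and the flush loop bailed: instead of the
            // pre-PR panic, serve an empty mutable region so queued-up reads
            // behind the RwLock still succeed while the timeline shuts down.
            None => &[],
        };
        let maybe_flushed: &[u8] = self.maybe_flushed.as_deref().unwrap_or(&[]);
        (maybe_flushed, mutable)
    }
}

fn main() {
    let after_cancel = BufferedWriterView {
        mutable: None,
        maybe_flushed: Some(vec![1, 2, 3]),
    };
    let (flushed, mutable) = after_cancel.readable_tail();
    assert_eq!(flushed, &[1, 2, 3]);
    assert!(mutable.is_empty()); // read-available, just with an empty mutable tail
}
```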

# Alternatives

One might think we could ship only the read-availability-on-flush-error
change, without the added complexity of infinite retrying and/or
cancellation sensitivity.

The problem with that is that read-availability sounds good but is
really quite useless, because we cannot ingest new WAL without a
writable InMemoryLayer. Thus, very soon after we transition to read-only
mode, reads from compute are going to wait anyway, but on `wait_lsn`
instead of the RwLock, because ingest isn't progressing.

Thus, keeping the infinite flush retries still makes more sense: to the
user they manifest as mere slowness, whereas `wait_lsn` failures are hard
errors.
Christian Schwarz, 2025-03-18 19:48:43 +01:00, committed by GitHub
parent 99639c26b4, commit 9fb77d6cdd
10 changed files with 200 additions and 99 deletions

View File

@@ -26,6 +26,10 @@ impl<S: Send, R: Send> Duplex<S, R> {
self.tx.send(x).await
}
pub fn try_send(&self, x: S) -> Result<(), mpsc::error::TrySendError<S>> {
self.tx.try_send(x)
}
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are

View File

@@ -13,6 +13,7 @@ use pageserver::{page_cache, virtual_file};
use pageserver_api::key::Key;
use pageserver_api::shard::TenantShardId;
use pageserver_api::value::Value;
use tokio_util::sync::CancellationToken;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use wal_decoder::serialized_batch::SerializedValueBatch;
@@ -61,8 +62,18 @@ async fn ingest(
RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error).with_scope_debug_tools();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let layer = InMemoryLayer::create(conf, timeline_id, tenant_shard_id, lsn, &gate, &ctx).await?;
let layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
lsn,
&gate,
&cancel,
&ctx,
)
.await?;
let data = Value::Image(Bytes::from(vec![0u8; put_size]));
let data_ser_size = data.serialized_size().unwrap() as usize;

View File

@@ -9,6 +9,7 @@ use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;
@@ -19,7 +20,7 @@ use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::Buffer;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};
pub struct EphemeralFile {
@@ -40,6 +41,7 @@ impl EphemeralFile {
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<EphemeralFile> {
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
@@ -75,6 +77,7 @@ impl EphemeralFile {
file,
|| IoBufferMut::with_capacity(TAIL_SZ),
gate.enter()?,
cancel.child_token(),
ctx,
info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
),
@@ -101,6 +104,14 @@ impl Drop for EphemeralFile {
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
#[error("{0}")]
TooLong(String),
#[error("cancelled")]
Cancelled,
}
impl EphemeralFile {
pub(crate) fn len(&self) -> u64 {
self.bytes_written
@@ -133,7 +144,7 @@ impl EphemeralFile {
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<u64> {
) -> Result<u64, EphemeralFileWriteError> {
let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
if let Some(control) = control {
control.release().await;
@@ -145,24 +156,24 @@ impl EphemeralFile {
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> std::io::Result<(u64, Option<owned_buffers_io::write::FlushControl>)> {
) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
let pos = self.bytes_written;
let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
),
)
EphemeralFileWriteError::TooLong(format!(
"write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
srcbuf_len = srcbuf.len(),
))
})?;
// Write the payload
let (nwritten, control) = self
.buffered_writer
.write_buffered_borrowed_controlled(srcbuf, ctx)
.await?;
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
})?;
assert_eq!(
nwritten,
srcbuf.len(),
@@ -184,8 +195,14 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
let submitted_offset = self.buffered_writer.bytes_submitted();
let mutable = self.buffered_writer.inspect_mutable();
let mutable = &mutable[0..mutable.pending()];
let mutable = match self.buffered_writer.inspect_mutable() {
Some(mutable) => &mutable[0..mutable.pending()],
None => {
// Timeline::cancel and hence buffered writer flush was cancelled.
// Remain read-available while timeline is shutting down.
&[]
}
};
let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
@@ -216,7 +233,6 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
let (written_range, maybe_flushed_range) = {
if maybe_flushed.is_some() {
// [ written ][ maybe_flushed ][ mutable ]
// <- TAIL_SZ -><- TAIL_SZ ->
// ^
// `submitted_offset`
// <++++++ on disk +++++++????????????????>
@@ -232,7 +248,6 @@ impl super::storage_layer::inmemory_layer::vectored_dio_read::File for Ephemeral
)
} else {
// [ written ][ mutable ]
// <- TAIL_SZ ->
// ^
// `submitted_offset`
// <++++++ on disk +++++++++++++++++++++++>
@@ -366,8 +381,9 @@ mod tests {
harness("ephemeral_file_holds_gate_open").unwrap();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
@@ -397,12 +413,13 @@ mod tests {
let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.inspect_mutable();
let mutable = file.buffered_writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
@@ -445,7 +462,7 @@ mod tests {
let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
let mutable_buffer_contents = file.buffered_writer.inspect_mutable();
let mutable_buffer_contents = file.buffered_writer.mutable();
assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
}
@@ -454,13 +471,13 @@ mod tests {
let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
let gate = utils::sync::gate::Gate::default();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
// mutable buffer and maybe_flushed buffer each has `cap` bytes.
let cap = file.buffered_writer.inspect_mutable().capacity();
let cap = file.buffered_writer.mutable().capacity();
let content: Vec<u8> = rand::thread_rng()
.sample_iter(rand::distributions::Standard)
@@ -470,10 +487,8 @@ mod tests {
file.write_raw(&content, &ctx).await.unwrap();
// assert the state is as this test expects it to be
assert_eq!(
&file.load_to_io_buf(&ctx).await.unwrap(),
&content[0..cap * 2 + cap / 2]
);
let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
let md = file.buffered_writer.as_inner().path().metadata().unwrap();
assert_eq!(
md.len(),
@@ -485,7 +500,7 @@ mod tests {
&content[cap..cap * 2]
);
assert_eq!(
&file.buffered_writer.inspect_mutable()[0..cap / 2],
&file.buffered_writer.mutable()[0..cap / 2],
&content[cap * 2..cap * 2 + cap / 2]
);
}
@@ -501,12 +516,13 @@ mod tests {
harness("test_read_split_across_file_and_buffer").unwrap();
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &ctx)
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
.await
.unwrap();
let mutable = file.buffered_writer.inspect_mutable();
let mutable = file.buffered_writer.mutable();
let cap = mutable.capacity();
let align = mutable.align();
let content: Vec<u8> = rand::thread_rng()

View File

@@ -1112,6 +1112,7 @@ mod tests {
};
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use tokio_util::sync::CancellationToken;
use utils::id::{TenantId, TimelineId};
use utils::shard::TenantShardId;
@@ -1193,6 +1194,7 @@ mod tests {
async fn ranged_search() {
let harness = TenantHarness::create("ranged_search").await.unwrap();
let (tenant, ctx) = harness.load().await;
let cancel = CancellationToken::new();
let timeline_id = TimelineId::generate();
// Create the timeline such that the in-memory layers can be written
// to the timeline directory.
@@ -1209,6 +1211,7 @@ mod tests {
harness.tenant_shard_id,
lsn_range.start,
&gate,
&cancel,
&ctx,
)
.await

View File

@@ -205,6 +205,7 @@ async fn download_object(
}
#[cfg(target_os = "linux")]
crate::virtual_file::io_engine::IoEngine::TokioEpollUring => {
use crate::virtual_file::owned_buffers_io::write::FlushTaskError;
use std::sync::Arc;
use crate::virtual_file::{IoBufferMut, owned_buffers_io};
@@ -228,6 +229,7 @@ async fn download_object(
destination_file,
|| IoBufferMut::with_capacity(super::BUFFER_SIZE),
gate.enter().map_err(|_| DownloadError::Cancelled)?,
cancel.child_token(),
ctx,
tracing::info_span!(parent: None, "download_object_buffered_writer", %dst_path),
);
@@ -240,11 +242,21 @@ async fn download_object(
{
let chunk = match res {
Ok(chunk) => chunk,
Err(e) => return Err(e),
Err(e) => return Err(DownloadError::from(e)),
};
buffered.write_buffered_borrowed(&chunk, ctx).await?;
buffered
.write_buffered_borrowed(&chunk, ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
}
let inner = buffered.flush_and_into_inner(ctx).await?;
let inner = buffered
.flush_and_into_inner(ctx)
.await
.map_err(|e| match e {
FlushTaskError::Cancelled => DownloadError::Cancelled,
})?;
Ok(inner)
}
.await?;

View File

@@ -19,6 +19,7 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::InMemoryLayerInfo;
use pageserver_api::shard::TenantShardId;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TimelineId;
use utils::lsn::Lsn;
@@ -552,13 +553,15 @@ impl InMemoryLayer {
tenant_shard_id: TenantShardId,
start_lsn: Lsn,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<InMemoryLayer> {
trace!(
"initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}"
);
let file = EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, ctx).await?;
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate, cancel, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
Ok(InMemoryLayer {

View File

@@ -4186,6 +4186,7 @@ impl Timeline {
self.timeline_id,
self.tenant_shard_id,
&self.gate,
&self.cancel,
ctx,
)
.await?;
@@ -6742,6 +6743,8 @@ impl Timeline {
self.tenant_shard_id,
in_memory.lsn_range.start,
&self.gate,
// TODO: if we ever use this function in production code, we need to pass the real cancellation token
&CancellationToken::new(),
ctx,
)
.await

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
use pageserver_api::shard::TenantShardId;
use tokio_util::sync::CancellationToken;
use tracing::trace;
use utils::id::TimelineId;
use utils::lsn::{AtomicLsn, Lsn};
@@ -193,6 +194,7 @@ impl OpenLayerManager {
/// Open a new writable layer to append data if there is no open layer, otherwise return the
/// current open layer, called within `get_layer_for_write`.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn get_layer_for_write(
&mut self,
lsn: Lsn,
@@ -200,6 +202,7 @@ impl OpenLayerManager {
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
gate: &utils::sync::gate::Gate,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
ensure!(lsn.is_aligned());
@@ -227,9 +230,16 @@ impl OpenLayerManager {
timeline_id, start_lsn, lsn
);
let new_layer =
InMemoryLayer::create(conf, timeline_id, tenant_shard_id, start_lsn, gate, ctx)
.await?;
let new_layer = InMemoryLayer::create(
conf,
timeline_id,
tenant_shard_id,
start_lsn,
gate,
cancel,
ctx,
)
.await?;
let layer = Arc::new(new_layer);
self.layer_map.open_layer = Some(layer.clone());

View File

@@ -3,7 +3,9 @@ use std::sync::Arc;
pub(crate) use flush::FlushControl;
use flush::FlushHandle;
pub(crate) use flush::FlushTaskError;
use tokio_epoll_uring::IoBuf;
use tokio_util::sync::CancellationToken;
use super::io_buf_aligned::IoBufAligned;
use super::io_buf_ext::{FullSlice, IoBufExt};
@@ -40,11 +42,19 @@ pub trait OwnedAsyncWriter {
// since we would avoid copying majority of the data into the internal buffer.
pub struct BufferedWriter<B: Buffer, W> {
writer: Arc<W>,
/// invariant: always remains Some(buf) except
/// - while IO is ongoing => goes back to Some() once the IO completed successfully
/// - after an IO error => stays `None` forever
///
/// In these exceptional cases, it's `None`.
/// Clone of the buffer that was last submitted to the flush loop.
/// `None` if no flush request has been submitted, Some forever after.
pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
/// New writes are accumulated here.
/// `None` only during submission while we wait for flush loop to accept
/// the full dirty buffer in exchange for a clean buffer.
/// If that exchange fails with an [`FlushTaskError`], the write path
/// bails and leaves this as `None`.
/// Subsequent writes will panic if attempted.
/// The read path continues to work without error because [`Self::maybe_flushed`]
/// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
/// so, they will never try to read from [`Self::mutable`] anyway, because it's past
/// the [`Self::maybe_flushed`] point.
mutable: Option<B>,
/// A handle to the background flush task for writing data to disk.
flush_handle: FlushHandle<B::IoBuf, W>,
@@ -65,16 +75,19 @@ where
writer: Arc<W>,
buf_new: impl Fn() -> B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: &RequestContext,
flush_task_span: tracing::Span,
) -> Self {
Self {
writer: writer.clone(),
mutable: Some(buf_new()),
maybe_flushed: None,
flush_handle: FlushHandle::spawn_new(
writer,
buf_new(),
gate_guard,
cancel,
ctx.attached_child(),
flush_task_span,
),
@@ -92,25 +105,26 @@ where
}
/// Panics if used after any of the write paths returned an error
pub fn inspect_mutable(&self) -> &B {
self.mutable()
pub fn inspect_mutable(&self) -> Option<&B> {
self.mutable.as_ref()
}
/// Gets a reference to the maybe flushed read-only buffer.
/// Returns `None` if the writer has not submitted any flush request.
pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
self.flush_handle.maybe_flushed.as_ref()
self.maybe_flushed.as_ref()
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
pub async fn flush_and_into_inner(
mut self,
ctx: &RequestContext,
) -> std::io::Result<(u64, Arc<W>)> {
) -> Result<(u64, Arc<W>), FlushTaskError> {
self.flush(ctx).await?;
let Self {
mutable: buf,
maybe_flushed: _,
writer,
mut flush_handle,
bytes_submitted: bytes_amount,
@@ -120,12 +134,9 @@ where
Ok((bytes_amount, writer))
}
/// Gets a reference to the mutable in-memory buffer.
#[inline(always)]
fn mutable(&self) -> &B {
self.mutable
.as_ref()
.expect("must not use after we returned an error")
#[cfg(test)]
pub(crate) fn mutable(&self) -> &B {
self.mutable.as_ref().expect("must not use after an error")
}
#[cfg_attr(target_os = "macos", allow(dead_code))]
@@ -133,7 +144,7 @@ where
&mut self,
chunk: &[u8],
ctx: &RequestContext,
) -> std::io::Result<usize> {
) -> Result<usize, FlushTaskError> {
let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
if let Some(control) = control {
control.release().await;
@@ -146,7 +157,7 @@ where
&mut self,
mut chunk: &[u8],
ctx: &RequestContext,
) -> std::io::Result<(usize, Option<FlushControl>)> {
) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
let chunk_len = chunk.len();
let mut control: Option<FlushControl> = None;
while !chunk.is_empty() {
@@ -167,17 +178,47 @@ where
Ok((chunk_len, control))
}
/// This function can only error if the flush task got cancelled.
/// In that case, we leave [`Self::mutable`] intentionally as `None`.
///
/// The read path continues to function correctly; it can read up to the
/// point where it could read before, i.e., including what was in [`Self::mutable`]
/// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
///
/// The write path becomes unavailable and will panic if used.
/// The only correct solution to retry writes is to discard the entire [`BufferedWriter`],
/// which upper layers of pageserver write path currently do not support.
/// It is in fact quite hard to reason about what exactly happens in today's code.
/// Best case we accumulate junk in the EphemeralFile, worst case is data corruption.
#[must_use = "caller must explcitly check the flush control"]
async fn flush(&mut self, _ctx: &RequestContext) -> std::io::Result<Option<FlushControl>> {
async fn flush(
&mut self,
_ctx: &RequestContext,
) -> Result<Option<FlushControl>, FlushTaskError> {
let buf = self.mutable.take().expect("must not use after an error");
let buf_len = buf.pending();
if buf_len == 0 {
self.mutable = Some(buf);
return Ok(None);
}
let (recycled, flush_control) = self.flush_handle.flush(buf, self.bytes_submitted).await?;
// Prepare the buffer for read while flushing.
let slice = buf.flush();
// NB: this assignment also drops the reference to the old buffer, allowing us to re-own & make it mutable below.
self.maybe_flushed = Some(slice.cheap_clone());
let offset = self.bytes_submitted;
self.bytes_submitted += u64::try_from(buf_len).unwrap();
// If we return/panic here or later, we'll leave mutable = None, breaking further
// writers, but the read path should still work.
let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
// We got back some recycled buffer, can open up for more writes again.
self.mutable = Some(recycled);
Ok(Some(flush_control))
}
}
@@ -290,10 +331,12 @@ mod tests {
let ctx = &ctx;
let recorder = Arc::new(RecorderWriter::default());
let gate = utils::sync::gate::Gate::default();
let cancel = CancellationToken::new();
let mut writer = BufferedWriter::<_, RecorderWriter>::new(
recorder,
|| IoBufferMut::with_capacity(2),
gate.enter()?,
cancel,
ctx,
tracing::Span::none(),
);

View File

@@ -1,7 +1,6 @@
use std::ops::ControlFlow;
use std::sync::Arc;
use once_cell::sync::Lazy;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info, info_span, warn};
use utils::sync::duplex;
@@ -15,9 +14,6 @@ use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
/// A handle to the flush task.
pub struct FlushHandle<Buf, W> {
inner: Option<FlushHandleInner<Buf, W>>,
/// Immutable buffer for serving tail reads.
/// `None` if no flush request has been submitted.
pub(super) maybe_flushed: Option<FullSlice<Buf>>,
}
pub struct FlushHandleInner<Buf, W> {
@@ -25,7 +21,7 @@ pub struct FlushHandleInner<Buf, W> {
/// and receives recyled buffer.
channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
/// Join handle for the background flush task.
join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
}
struct FlushRequest<Buf> {
@@ -117,27 +113,31 @@ where
{
/// Spawns a new background flush task and obtains a handle.
///
/// Note: The background task exists so we do not need to explicitly maintain a queue of buffers.
/// Handle and background task are connected through a duplex channel.
/// Dirty buffers are sent to the background task for flushing.
/// Clean buffers are sent back to the handle for reuse.
///
/// The queue depth is 1, and the passed-in `buf` seeds the queue depth.
/// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
pub fn spawn_new<B>(
file: Arc<W>,
buf: B,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: RequestContext,
span: tracing::Span,
) -> Self
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
// It is fine to buffer up to only 1 message. We only have 1 message in-flight at a time.
let (front, back) = duplex::mpsc::channel(1);
back.try_send(buf.flush())
.expect("we just created it with capacity 1");
let join_handle = tokio::spawn(
async move {
FlushBackgroundTask::new(back, file, gate_guard, ctx)
.run(buf.flush())
.await
}
.instrument(span),
FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
.run()
.instrument(span),
);
FlushHandle {
@@ -145,7 +145,6 @@ where
channel: front,
join_handle,
}),
maybe_flushed: None,
}
}
@@ -153,15 +152,11 @@ where
/// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
/// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
/// clear `maybe_flushed`.
pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
where
B: Buffer<IoBuf = Buf> + Send + 'static,
{
let slice = buf.flush();
// Saves a buffer for read while flushing. This also removes reference to the old buffer.
self.maybe_flushed = Some(slice.cheap_clone());
pub async fn flush(
&mut self,
slice: FullSlice<Buf>,
offset: u64,
) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
let (request, flush_control) = new_flush_op(slice, offset);
// Submits the buffer to the background task.
@@ -177,13 +172,10 @@ where
return self.handle_error().await;
};
// The only other place that could hold a reference to the recycled buffer
// is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
Ok((recycled, flush_control))
}
async fn handle_error<T>(&mut self) -> std::io::Result<T> {
async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
Err(self
.shutdown()
.await
@@ -191,7 +183,7 @@ where
}
/// Cleans up the channel, join the flush task.
pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
let handle = self
.inner
.take()
@@ -217,10 +209,17 @@ pub struct FlushBackgroundTask<Buf, W> {
/// A writer for persisting data to disk.
writer: Arc<W>,
ctx: RequestContext,
cancel: CancellationToken,
/// Prevent timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
_gate_guard: utils::sync::gate::GateGuard,
}
#[derive(Debug, thiserror::Error)]
pub enum FlushTaskError {
#[error("flush task cancelled")]
Cancelled,
}
impl<Buf, W> FlushBackgroundTask<Buf, W>
where
Buf: IoBufAligned + Send + Sync,
@@ -231,25 +230,20 @@ where
channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
file: Arc<W>,
gate_guard: utils::sync::gate::GateGuard,
cancel: CancellationToken,
ctx: RequestContext,
) -> Self {
FlushBackgroundTask {
channel,
writer: file,
_gate_guard: gate_guard,
cancel,
ctx,
}
}
/// Runs the background flush task.
/// The passed in slice is immediately sent back to the flush handle through the duplex channel.
async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
// Sends the extra buffer back to the handle.
// TODO: can this ever await and or fail? I think not.
self.channel.send(slice).await.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
})?;
async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
// Exit condition: channel is closed and there is no remaining buffer to be flushed
while let Some(request) = self.channel.recv().await {
#[cfg(test)]
@@ -267,15 +261,13 @@ where
// (The upper layers of the Pageserver write path are not equipped to retry write errors
// because they often deallocate the buffers that were already written).
//
// TODO: cancellation sensitiity.
// Without it, if we hit a bug where retrying is never successful,
// then we can't shut down the timeline/tenant/pageserver cleanly because
// layers of the Pageserver write path are holding the gate open for EphemeralFile.
//
// TODO: use utils::backoff::retry once async closures are actually usable
//
let mut slice_storage = Some(request.slice);
for attempt in 1.. {
if self.cancel.is_cancelled() {
return Err(FlushTaskError::Cancelled);
}
let result = async {
if attempt > 1 {
info!("retrying flush");
@@ -283,6 +275,11 @@ where
let slice = slice_storage.take().expect(
"likely previous invocation of this future didn't get polled to completion",
);
// Don't cancel this write by doing tokio::select with self.cancel.cancelled().
// The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
// If we retry indefinitely, we'll deplete those resources.
// Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
// wait for cancelled ops to complete and discard their error.
let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
slice_storage = Some(slice);
let res = res.maybe_fatal_err("owned_buffers_io flush");
@@ -290,8 +287,7 @@ where
return ControlFlow::Break(());
};
warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
ControlFlow::Continue(())
}
.instrument(info_span!("flush_attempt", %attempt))