diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index f048a355a8..396d930f77 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -9,7 +9,7 @@ use camino::Utf8PathBuf;
 use num_traits::Num;
 use pageserver_api::shard::TenantShardId;
 use tokio_epoll_uring::{BoundedBuf, Slice};
-use tracing::error;
+use tracing::{error, info_span};
 use utils::id::TimelineId;
 
 use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
@@ -76,6 +76,7 @@ impl EphemeralFile {
                 || IoBufferMut::with_capacity(TAIL_SZ),
                 gate.enter()?,
                 ctx,
+                info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
             ),
             _gate_guard: gate.enter()?,
         })
diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs
index 92be2145ce..954ff0c1d6 100644
--- a/pageserver/src/tenant/remote_timeline_client/download.rs
+++ b/pageserver/src/tenant/remote_timeline_client/download.rs
@@ -18,7 +18,7 @@ use tokio::fs::{self, File, OpenOptions};
 use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 use tokio_util::io::StreamReader;
 use tokio_util::sync::CancellationToken;
-use tracing::warn;
+use tracing::{info_span, warn};
 use utils::crashsafe::path_with_suffix_extension;
 use utils::id::{TenantId, TimelineId};
 use utils::{backoff, pausable_failpoint};
@@ -229,6 +229,7 @@ async fn download_object(
                 || IoBufferMut::with_capacity(super::BUFFER_SIZE),
                 gate.enter().map_err(|_| DownloadError::Cancelled)?,
                 ctx,
+                info_span!(parent: None, "download_object_buffered_writer", %dst_path),
             );
 
             // TODO: use vectored write (writev) once supported by tokio-epoll-uring.
diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs
index 1da3130df0..cd3d897423 100644
--- a/pageserver/src/virtual_file.rs
+++ b/pageserver/src/virtual_file.rs
@@ -1299,9 +1299,8 @@ impl OwnedAsyncWriter for VirtualFile {
         buf: FullSlice<Buf>,
         offset: u64,
         ctx: &RequestContext,
-    ) -> std::io::Result<FullSlice<Buf>> {
-        let (buf, res) = VirtualFile::write_all_at(self, buf, offset, ctx).await;
-        res.map(|_| buf)
+    ) -> (FullSlice<Buf>, std::io::Result<()>) {
+        VirtualFile::write_all_at(self, buf, offset, ctx).await
     }
 }
 
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs
index 861ca3aa2a..a7e06c0a14 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs
@@ -31,7 +31,7 @@ pub trait OwnedAsyncWriter {
         buf: FullSlice<Buf>,
         offset: u64,
         ctx: &RequestContext,
-    ) -> impl std::future::Future<Output = std::io::Result<FullSlice<Buf>>> + Send;
+    ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
 }
 
 /// A wrapper aorund an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
@@ -66,6 +66,7 @@ where
         buf_new: impl Fn() -> B,
         gate_guard: utils::sync::gate::GateGuard,
         ctx: &RequestContext,
+        flush_task_span: tracing::Span,
     ) -> Self {
         Self {
             writer: writer.clone(),
@@ -75,6 +76,7 @@ where
                 buf_new(),
                 gate_guard,
                 ctx.attached_child(),
+                flush_task_span,
             ),
             bytes_submitted: 0,
         }
     }
@@ -269,12 +271,12 @@ mod tests {
             buf: FullSlice<Buf>,
             offset: u64,
             _: &RequestContext,
-        ) -> std::io::Result<FullSlice<Buf>> {
+        ) -> (FullSlice<Buf>, std::io::Result<()>) {
             self.writes
                 .lock()
                 .unwrap()
                 .push((Vec::from(&buf[..]), offset));
-            Ok(buf)
+            (buf, Ok(()))
         }
     }
 
@@ -293,6 +295,7 @@ mod tests {
             || IoBufferMut::with_capacity(2),
             gate.enter()?,
             ctx,
+            tracing::Span::none(),
         );
 
         writer.write_buffered_borrowed(b"abc", ctx).await?;
diff --git a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
index 46309d4011..e3cf9be438 100644
--- a/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
+++ b/pageserver/src/virtual_file/owned_buffers_io/write/flush.rs
@@ -1,9 +1,14 @@
+use std::ops::ControlFlow;
 use std::sync::Arc;
 
+use once_cell::sync::Lazy;
+use tokio_util::sync::CancellationToken;
+use tracing::{Instrument, info, info_span, warn};
 use utils::sync::duplex;
 
 use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
 use crate::context::RequestContext;
+use crate::virtual_file::MaybeFatalIo;
 use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
 use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
 
@@ -118,6 +123,7 @@ where
         buf: B,
         gate_guard: utils::sync::gate::GateGuard,
         ctx: RequestContext,
+        span: tracing::Span,
     ) -> Self
     where
         B: Buffer + Send + 'static,
@@ -125,11 +131,14 @@ where
         // It is fine to buffer up to only 1 message. We only 1 message in-flight at a time.
         let (front, back) = duplex::mpsc::channel(1);
 
-        let join_handle = tokio::spawn(async move {
-            FlushBackgroundTask::new(back, file, gate_guard, ctx)
-                .run(buf.flush())
-                .await
-        });
+        let join_handle = tokio::spawn(
+            async move {
+                FlushBackgroundTask::new(back, file, gate_guard, ctx)
+                    .run(buf.flush())
+                    .await
+            }
+            .instrument(span),
+        );
 
         FlushHandle {
             inner: Some(FlushHandleInner {
@@ -236,6 +245,7 @@ where
     /// The passed in slice is immediately sent back to the flush handle through the duplex channel.
     async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
         // Sends the extra buffer back to the handle.
+        // TODO: can this ever await and/or fail? I think not.
         self.channel.send(slice).await.map_err(|_| {
             std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
         })?;
@@ -251,10 +261,47 @@ where
             }
 
             // Write slice to disk at `offset`.
-            let slice = self
-                .writer
-                .write_all_at(request.slice, request.offset, &self.ctx)
-                .await?;
+            //
+            // Error handling happens according to the current policy of crashing
+            // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
+            // (The upper layers of the Pageserver write path are not equipped to retry write errors
+            // because they often deallocate the buffers that were already written).
+            //
+            // TODO: cancellation sensitivity.
+            // Without it, if we hit a bug where retrying is never successful,
+            // then we can't shut down the timeline/tenant/pageserver cleanly because
+            // layers of the Pageserver write path are holding the gate open for EphemeralFile.
+            //
+            // TODO: use utils::backoff::retry once async closures are actually usable
+            //
+            let mut slice_storage = Some(request.slice);
+            for attempt in 1.. {
+                let result = async {
+                    if attempt > 1 {
+                        info!("retrying flush");
+                    }
+                    let slice = slice_storage.take().expect(
+                        "likely previous invocation of this future didn't get polled to completion",
+                    );
+                    let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
+                    slice_storage = Some(slice);
+                    let res = res.maybe_fatal_err("owned_buffers_io flush");
+                    let Err(err) = res else {
+                        return ControlFlow::Break(());
+                    };
+                    warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
+                    static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
+                    utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
+                    ControlFlow::Continue(())
+                }
+                .instrument(info_span!("flush_attempt", %attempt))
+                .await;
+                match result {
+                    ControlFlow::Break(()) => break,
+                    ControlFlow::Continue(()) => continue,
+                }
+            }
+            let slice = slice_storage.expect("loop must have run at least once");
 
             #[cfg(test)]
             {