From 82d20b52b373ea6d2d1fa53302a881043df7dbdc Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Mon, 20 Jan 2025 19:12:00 +0100
Subject: [PATCH] make noise when dropping an IoConcurrency with unfinished
 requests

---
 pageserver/src/tenant/storage_layer.rs | 103 +++++++++++++++++++------
 1 file changed, 79 insertions(+), 24 deletions(-)

diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 390cff7347..810dfddbb5 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -107,42 +107,77 @@ pub(crate) struct VectoredValueReconstructState {
 }
 
 impl VectoredValueReconstructState {
+    /// # Cancel-Safety
+    ///
+    /// Technically fine to stop polling this future, but the IOs will still
+    /// be executed to completion by the sidecar task and hold on to / consume resources.
+    /// Better not do it, to make reasoning about the system easier.
     pub(crate) async fn collect_pending_ios(
         self,
     ) -> Result<ValueReconstructState, PageReconstructError> {
         use utils::bin_ser::BeSer;
 
-        let mut to = ValueReconstructState::default();
+        let mut res = Ok(ValueReconstructState::default());
 
+        // We should try hard not to bail early, so that by the time we return from this
+        // function, all IO for this value is done. It's not required -- we could totally
+        // stop polling the IO futures in the sidecar task (they need to support that),
+        // but merely ceasing to poll doesn't reduce the IO load on the disk. It's easier
+        // to reason about the system if we just wait for all IO to complete, even if
+        // we're no longer interested in the result.
+        //
+        // Revisit this when IO futures are replaced with a more sophisticated IO system
+        // and an IO scheduler, where we know which IOs were submitted and which ones
+        // just queued. Cf. the comment on IoConcurrency::spawn_io.
         for (lsn, fut) in self.on_disk_values {
-            // TODO: IO futures are not failable - we could expect
-            let res = fut
-                .await
-                .map_err(|err| PageReconstructError::Other(err.into()))?;
-            let on_disk_value = res.map_err(|err| PageReconstructError::Other(err.into()))?;
-
-            match on_disk_value {
-                OnDiskValue::WalRecordOrImage(buf) => {
-                    let value =
-                        Value::des(&buf).map_err(|err| PageReconstructError::Other(err.into()))?;
-                    match value {
-                        Value::WalRecord(rec) => {
-                            to.records.push((lsn, rec));
-                        }
-                        Value::Image(img) => {
-                            assert!(to.img.is_none());
-                            to.img = Some((lsn, img));
+            let value_recv_res = fut
+                // we rely on the caller to poll us to completion, so this is not a bail point
+                .await;
+            // Force not bailing early by wrapping the code into a closure.
+            #[allow(clippy::redundant_closure_call)]
+            let _: () = (|| {
+                match (&mut res, value_recv_res) {
+                    (Err(_), _) => {
+                        // We've already failed, no need to process more.
+                    }
+                    (Ok(_), Err(recv_error)) => {
+                        // This shouldn't happen - likely the sidecar task panicked.
+                        let recv_error: tokio::sync::oneshot::error::RecvError = recv_error;
+                        res = Err(PageReconstructError::Other(recv_error.into()));
+                    }
+                    (Ok(_), Ok(Err(err))) => {
+                        let err: std::io::Error = err;
+                        // TODO: returning IO error here will fail a compute query.
+                        // Probably not what we want, we're not doing `maybe_fatal_err`
+                        // in the IO futures.
+                        // But it's been like that for a long time, not changing it
+                        // as part of concurrent IO.
+                        // => https://github.com/neondatabase/neon/issues/10454
+                        res = Err(PageReconstructError::Other(err.into()));
+                    }
+                    (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
+                        assert!(ok.img.is_none());
+                        ok.img = Some((lsn, img));
+                    }
+                    (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
+                        match Value::des(&buf) {
+                            Ok(Value::WalRecord(rec)) => {
+                                ok.records.push((lsn, rec));
+                            }
+                            Ok(Value::Image(img)) => {
+                                assert!(ok.img.is_none());
+                                ok.img = Some((lsn, img));
+                            }
+                            Err(err) => {
+                                res = Err(PageReconstructError::Other(err.into()));
+                            }
                         }
                     }
                 }
-                OnDiskValue::RawImage(img) => {
-                    assert!(to.img.is_none());
-                    to.img = Some((lsn, img));
-                }
-            }
+            })();
         }
 
-        Ok(to)
+        res
     }
 }
 
@@ -312,6 +347,24 @@ impl IoConcurrency {
                 mut futures,
             } => {
                 trace!("shutting down");
+
+                // Make rate-limited noise in case the IoConcurrency gets dropped while
+                // we (the sidecar task) are still processing IOs.
+                // Refer to `collect_pending_ios` for why we shouldn't be doing that.
+                if !futures.is_empty() {
+                    use utils::rate_limit::RateLimit;
+                    use std::sync::Mutex;
+                    use once_cell::sync::Lazy;
+                    static LOGGED: Lazy<Mutex<RateLimit>> =
+                        Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
+                    let mut rate_limit = LOGGED.lock().unwrap();
+                    rate_limit.call(|| {
+                        tracing::warn!(
+                            "IoConcurrency dropped while sidecar task was still processing IOs",
+                        );
+                    });
+                }
+
                 while let Some(()) = futures.next().await {
                     // drain
                 }
@@ -389,6 +442,8 @@ impl IoConcurrency {
     /// and are ready to retire the Serial mode which runs the futures in place.
     /// Right now, while brittle, the opaque IO approach allows us to ship the feature
     /// with minimal changes to the code and minimal changes to existing behavior in Serial mode.
+    ///
+    /// Also read the comment in `collect_pending_ios`.
     pub(crate) async fn spawn_io<F>(&mut self, fut: F)
     where
         F: std::future::Future<Output = ()> + Send + 'static,
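Note on the rewritten collect_pending_ios: it follows a "drain everything, keep the first error" shape. Every pending IO result is awaited even after a failure, and only the first error is retained. The sketch below is a self-contained illustration of that control flow, with plain Results standing in for the oneshot receivers and OnDiskValue; collect_all and the String error type are illustrative placeholders, not names from the patch.

    /// Drains every input, keeping the first error but never returning early,
    /// so all inputs are consumed even after a failure.
    fn collect_all<T>(pending: Vec<Result<T, String>>) -> Result<Vec<T>, String> {
        let mut res: Result<Vec<T>, String> = Ok(Vec::new());
        for item in pending {
            // Same trick as the patch: the closure makes `?`-style early returns impossible here.
            (|| match (&mut res, item) {
                (Err(_), _) => {}                // already failed, just keep draining
                (Ok(_), Err(e)) => res = Err(e), // record the first error
                (Ok(acc), Ok(v)) => acc.push(v), // still healthy, accumulate
            })();
        }
        res
    }

    fn main() {
        // Later successes are still consumed, but the first error wins.
        let out = collect_all(vec![Ok(1), Err("io error".to_string()), Ok(2)]);
        assert_eq!(out, Err("io error".to_string()));
    }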
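Note on the shutdown-time warning: it leans on a process-wide, once-per-second rate limiter so that many dropped IoConcurrency instances cannot flood the log. The sketch below reproduces that pattern without the pageserver's utils::rate_limit::RateLimit helper; the local RateLimit type only mirrors the new(period) / call(closure) surface that the hunk above uses, and warn_dropped_while_busy is an illustrative stand-in for the sidecar shutdown path (once_cell is assumed available, as in the patch).

    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    use once_cell::sync::Lazy;

    /// Minimal stand-in for `utils::rate_limit::RateLimit`: runs the closure
    /// at most once per `period`, silently dropping calls in between.
    struct RateLimit {
        period: Duration,
        last: Option<Instant>,
    }

    impl RateLimit {
        fn new(period: Duration) -> Self {
            Self { period, last: None }
        }

        fn call(&mut self, f: impl FnOnce()) {
            let now = Instant::now();
            match self.last {
                Some(prev) if now.duration_since(prev) < self.period => {} // suppressed
                _ => {
                    self.last = Some(now);
                    f();
                }
            }
        }
    }

    fn warn_dropped_while_busy() {
        // One limiter shared by all call sites, like the static in the patch.
        static LOGGED: Lazy<Mutex<RateLimit>> =
            Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
        let mut rate_limit = LOGGED.lock().unwrap();
        rate_limit.call(|| {
            eprintln!("IoConcurrency dropped while sidecar task was still processing IOs");
        });
    }

    fn main() {
        warn_dropped_while_busy();
        warn_dropped_while_busy(); // suppressed: within the 1s window
    }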