make noise when dropping an IoConcurrency with unfinished requests

Christian Schwarz
2025-01-20 19:12:00 +01:00
parent 3b1328423e
commit 82d20b52b3


@@ -107,42 +107,77 @@ pub(crate) struct VectoredValueReconstructState {
}
impl VectoredValueReconstructState {
/// # Cancel-Safety
///
/// Technically fine to stop polling this future, but, the IOs will still
/// be executed to completion by the sidecar task and hold on to / consume resources.
/// Better not do it to make reasoning about the system easier.
pub(crate) async fn collect_pending_ios(
self,
) -> Result<ValueReconstructState, PageReconstructError> {
use utils::bin_ser::BeSer;
-        let mut to = ValueReconstructState::default();
+        let mut res = Ok(ValueReconstructState::default());
+
+        // We should try hard not to bail early, so that by the time we return from this
+        // function, all IO for this value is done. It's not required -- we could totally
+        // stop polling the IO futures in the sidecar task, they need to support that,
+        // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
+        // to reason about the system if we just wait for all IO to complete, even if
+        // we're no longer interested in the result.
+        //
+        // Revisit this when IO futures are replaced with a more sophisticated IO system
+        // and an IO scheduler, where we know which IOs were submitted and which ones
+        // just queued. Cf the comment on IoConcurrency::spawn_io.
        for (lsn, fut) in self.on_disk_values {
-            // TODO: IO futures are not failable - we could expect
-            let res = fut
-                .await
-                .map_err(|err| PageReconstructError::Other(err.into()))?;
-            let on_disk_value = res.map_err(|err| PageReconstructError::Other(err.into()))?;
-            match on_disk_value {
-                OnDiskValue::WalRecordOrImage(buf) => {
-                    let value =
-                        Value::des(&buf).map_err(|err| PageReconstructError::Other(err.into()))?;
-                    match value {
-                        Value::WalRecord(rec) => {
-                            to.records.push((lsn, rec));
-                        }
-                        Value::Image(img) => {
-                            assert!(to.img.is_none());
-                            to.img = Some((lsn, img));
-                        }
-                    }
-                }
-                OnDiskValue::RawImage(img) => {
-                    assert!(to.img.is_none());
-                    to.img = Some((lsn, img));
-                }
-            }
+            let value_recv_res = fut
+                // we rely on the caller to poll us to completion, so this is not a bail point
+                .await;
+            // Force not bailing early by wrapping the code into a closure.
+            #[allow(clippy::redundant_closure_call)]
+            let _: () = (|| {
+                match (&mut res, value_recv_res) {
+                    (Err(_), _) => {
+                        // We've already failed, no need to process more.
+                    }
+                    (Ok(_), Err(recv_error)) => {
+                        // This shouldn't happen - likely the sidecar task panicked.
+                        let recv_error: tokio::sync::oneshot::error::RecvError = recv_error;
+                        res = Err(PageReconstructError::Other(recv_error.into()));
+                    }
+                    (Ok(_), Ok(Err(err))) => {
+                        let err: std::io::Error = err;
+                        // TODO: returning IO error here will fail a compute query.
+                        // Probably not what we want, we're not doing `maybe_fatal_err`
+                        // in the IO futures.
+                        // But it's been like that for a long time, not changing it
+                        // as part of concurrent IO.
+                        // => https://github.com/neondatabase/neon/issues/10454
+                        res = Err(PageReconstructError::Other(err.into()));
+                    }
+                    (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
+                        assert!(ok.img.is_none());
+                        ok.img = Some((lsn, img));
+                    }
+                    (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
+                        match Value::des(&buf) {
+                            Ok(Value::WalRecord(rec)) => {
+                                ok.records.push((lsn, rec));
+                            }
+                            Ok(Value::Image(img)) => {
+                                assert!(ok.img.is_none());
+                                ok.img = Some((lsn, img));
+                            }
+                            Err(err) => {
+                                res = Err(PageReconstructError::Other(err.into()));
+                            }
+                        }
+                    }
+                }
+            })();
        }
-        Ok(to)
+        res
}
}
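
For readers outside the pageserver code, the shape of the new `collect_pending_ios` is: await every pending IO before returning, and remember only the first error instead of bailing out with `?`, so no IO is left running behind the caller's back. Below is a minimal, self-contained sketch of that pattern over plain `tokio::sync::oneshot` receivers; the names and the `Vec<u8>` payload are illustrative, not the actual pageserver types.

use tokio::sync::oneshot;

/// Drain every receiver, even after the first failure, so that all of the
/// IO behind them has finished by the time we return. Only the first error
/// is remembered; later results are still awaited but ignored.
async fn collect_all(
    pending: Vec<oneshot::Receiver<std::io::Result<Vec<u8>>>>,
) -> anyhow::Result<Vec<Vec<u8>>> {
    let mut res: anyhow::Result<Vec<Vec<u8>>> = Ok(Vec::new());
    for rx in pending {
        // Not a bail point: keep awaiting the remaining IOs regardless of `res`.
        let recv = rx.await;
        match (&mut res, recv) {
            (Err(_), _) => {} // already failed; the IO above was still drained
            (Ok(_), Err(recv_err)) => res = Err(recv_err.into()), // sender dropped or panicked
            (Ok(_), Ok(Err(io_err))) => res = Err(io_err.into()), // the IO itself failed
            (Ok(acc), Ok(Ok(buf))) => acc.push(buf),
        }
    }
    res
}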
@@ -312,6 +347,24 @@ impl IoConcurrency {
mut futures,
} => {
trace!("shutting down");
+                    // Make rate-limited noise in case the IoConcurrency gets dropped while
+                    // we (the sidecar task) are still processing IOs.
+                    // Refer to `collect_pending_ios` for why we shouldn't be doing that.
+                    if !futures.is_empty() {
+                        use utils::rate_limit::RateLimit;
+                        use std::sync::Mutex;
+                        use once_cell::sync::Lazy;
+                        static LOGGED: Lazy<Mutex<RateLimit>> =
+                            Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
+                        let mut rate_limit = LOGGED.lock().unwrap();
+                        rate_limit.call(|| {
+                            tracing::warn!(
+                                "IoConcurrency dropped while sidecar task was still processing IOs",
+                            );
+                        });
+                    }
while let Some(()) = futures.next().await {
// drain
}
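
The `RateLimit` used above is a pageserver-internal helper (`utils::rate_limit::RateLimit`) that, judging by its use here, runs the closure at most once per configured interval so a hot drop path cannot flood the log. For context, the same throttling can be sketched with only the standard library; the one-second interval mirrors the code above, everything else (the function name, the `Option<Instant>` state) is illustrative.

use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Run `f` at most once per second across all callers; extra calls are dropped.
fn warn_rate_limited(f: impl FnOnce()) {
    static LAST: Mutex<Option<Instant>> = Mutex::new(None);
    let mut last = LAST.lock().unwrap();
    let now = Instant::now();
    let due = match *last {
        None => true,
        Some(prev) => now.duration_since(prev) >= Duration::from_secs(1),
    };
    if due {
        *last = Some(now);
        f();
    }
}

// Usage mirroring the sidecar shutdown path:
// warn_rate_limited(|| tracing::warn!("IoConcurrency dropped while sidecar task was still processing IOs"));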
@@ -389,6 +442,8 @@ impl IoConcurrency {
/// and are ready to retire the Serial mode which runs the futures in place.
/// Right now, while brittle, the opaque IO approach allows us to ship the feature
/// with minimal changes to the code and minimal changes to existing behavior in Serial mode.
+    ///
+    /// Also read the comment in `collect_pending_ios`.
pub(crate) async fn spawn_io<F>(&mut self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
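
Judging from the `oneshot::error::RecvError` handling in `collect_pending_ios`, the on-disk value futures are oneshot receivers whose senders live inside the IO futures run by the sidecar task; a `RecvError` therefore means the sending side went away without reporting, e.g. because it panicked. A self-contained sketch of that submission/collection split, with `tokio::spawn` standing in for the sidecar task and all names illustrative:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<std::io::Result<Vec<u8>>>();

    // Stand-in for the spawned IO: it runs to completion on its own task,
    // whether or not the original caller is still interested in the result.
    tokio::spawn(async move {
        let result = tokio::fs::read("some/layer/file").await;
        let _ = tx.send(result); // receiver may already be gone; ignore the error
    });

    // Stand-in for collect_pending_ios: a RecvError means the sending side
    // disappeared without reporting, e.g. it panicked.
    match rx.await {
        Ok(Ok(bytes)) => println!("read {} bytes", bytes.len()),
        Ok(Err(io_err)) => eprintln!("io error: {io_err}"),
        Err(recv_err) => eprintln!("io task went away: {recv_err}"),
    }
}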