diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs
index 8669b8e5a9..b047a6db52 100644
--- a/pageserver/src/tenant/storage_layer.rs
+++ b/pageserver/src/tenant/storage_layer.rs
@@ -207,6 +207,8 @@ pub(crate) struct ValuesReconstructState {
 pub(crate) enum IoConcurrency {
     Sequential,
     SidecarTask {
+        task_id: usize,
+        num_active_ios: Arc<AtomicUsize>,
         ios_tx: tokio::sync::mpsc::UnboundedSender,
     },
 }
@@ -267,9 +269,10 @@ impl IoConcurrency {
                 let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
                 static TASK_ID: AtomicUsize = AtomicUsize::new(0);
                 let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+                // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
                 let span =
-                    tracing::trace_span!(parent: None, "futures_unordered_io", task_id = task_id);
-                trace!(task_id, "spawning");
+                    tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
+                trace!(task_id, "spawning sidecar task");
                 tokio::spawn(async move {
                     trace!("start");
                     scopeguard::defer!{ trace!("end") };
@@ -338,23 +341,6 @@ impl IoConcurrency {
                         } => {
                             trace!("shutting down");
-                            // Make rate-limited noise in case the IoConcurrency gets dropped while
-                            // we (the sidecar task) are still processing IOs.
-                            // Refer to `collect_pending_ios` for why we shouldn't be doing that.
-                            if !futures.is_empty(){
-                                use utils::rate_limit::RateLimit;
-                                use std::sync::Mutex;
-                                use once_cell::sync::Lazy;
-                                static LOGGED: Lazy<Mutex<RateLimit>> =
-                                    Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
-                                let mut rate_limit = LOGGED.lock().unwrap();
-                                rate_limit.call(|| {
-                                    tracing::warn!(
-                                        "IoConcurrency dropped while sidecar task was still processing IOs",
-                                    );
-                                });
-                            }
-
                             while let Some(()) = futures.next().await {
                                 // drain
                             }
@@ -364,7 +350,11 @@ impl IoConcurrency {
                     }
                     drop(gate_guard); // drop it right before we exit
                 }.instrument(span));
-                IoConcurrency::SidecarTask { ios_tx }
+                IoConcurrency::SidecarTask {
+                    task_id,
+                    ios_tx,
+                    num_active_ios: Arc::new(AtomicUsize::new(0)),
+                }
             }
         }
     }
@@ -372,8 +362,14 @@ impl IoConcurrency {
     pub(crate) fn clone(&self) -> Self {
         match self {
             IoConcurrency::Sequential => IoConcurrency::Sequential,
-            IoConcurrency::SidecarTask { ios_tx } => IoConcurrency::SidecarTask {
+            IoConcurrency::SidecarTask {
+                task_id,
+                ios_tx,
+                num_active_ios,
+            } => IoConcurrency::SidecarTask {
+                task_id: *task_id,
                 ios_tx: ios_tx.clone(),
+                num_active_ios: num_active_ios.clone(),
             },
         }
     }
@@ -433,8 +429,21 @@ impl IoConcurrency {
     {
         match self {
             IoConcurrency::Sequential => fut.await,
-            IoConcurrency::SidecarTask { ios_tx, .. } => {
-                let fut = Box::pin(fut);
+            IoConcurrency::SidecarTask {
+                ios_tx,
+                num_active_ios,
+                ..
+            } => {
+                let fut = Box::pin({
+                    let num_active_ios = Arc::clone(num_active_ios);
+                    async move {
+                        num_active_ios.fetch_add(1, std::sync::atomic::Ordering::Release);
+                        scopeguard::defer! {
+                            num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
+                        }
+                        fut.await
+                    }
+                });
                 // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
                 // while insignificant for latency.
                 // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
@@ -483,6 +492,46 @@ impl IoConcurrency {
     }
 }
 
+// Make rate-limited noise in case the root IoConcurrency gets dropped while
+// there are still IOs queued; we'll be leaking the sidecar task in that case.
+// The sidecar task holds a gate, so it's technically fine, but it's hard to debug.
+//
+// Refer to `collect_pending_ios` for why we shouldn't be doing that.
+impl Drop for IoConcurrency {
+    fn drop(&mut self) {
+        match self {
+            IoConcurrency::Sequential => (),
+            IoConcurrency::SidecarTask {
+                task_id,
+                ios_tx,
+                num_active_ios,
+            } => {
+                let _entered =
+                    tracing::info_span!("IoConcurrency_drop", task_id = *task_id).entered();
+                assert_eq!(ios_tx.weak_count(), 0, "we don't use downgrade()");
+                if ios_tx.strong_count() == 1 {
+                    trace!("dropping last IoConcurrency clone, this will trigger shutdown of sidecar task");
+                    let num_active_ios = num_active_ios.load(std::sync::atomic::Ordering::Acquire);
+                    if num_active_ios > 0 {
+                        use once_cell::sync::Lazy;
+                        use std::sync::Mutex;
+                        use utils::rate_limit::RateLimit;
+                        static LOGGED: Lazy<Mutex<RateLimit>> =
+                            Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
+                        let mut rate_limit = LOGGED.lock().unwrap();
+                        rate_limit.call(|| {
+                            tracing::warn!(
+                                num_active_ios,
+                                "IoConcurrency dropped while sidecar task was still processing IOs"
+                            );
+                        });
+                    }
+                }
+            }
+        }
+    }
+}
+
 impl ValuesReconstructState {
     pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
         Self {