make noise from IoConcurrency::drop instead of the task, for more context

Christian Schwarz
2025-01-21 14:51:54 +01:00
parent 4014c390e2
commit 4e72b22b41


@@ -207,6 +207,8 @@ pub(crate) struct ValuesReconstructState {
pub(crate) enum IoConcurrency {
Sequential,
SidecarTask {
+ task_id: usize,
+ num_active_ios: Arc<AtomicUsize>,
ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
},
}
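The two new fields let the handle, its clones, and the new Drop impl at the bottom of this diff share state. As orientation, here is an annotated sketch of the variant as it looks after this change; the comments and the stubbed IoFuture alias are inferences from the rest of the diff, not code from the commit (the real IoFuture type referenced above is not shown here).

use std::future::Future;
use std::pin::Pin;
use std::sync::{atomic::AtomicUsize, Arc};

// Stub so the sketch stands alone; the IOs resolve to () and are sent to another task.
type IoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

pub(crate) enum IoConcurrency {
    // run IOs inline, one at a time
    Sequential,
    // hand IOs off to a dedicated sidecar task
    SidecarTask {
        // stable id used to correlate the "IoConcurrency_sidecar" span and drop-time logs
        task_id: usize,
        // submitted-but-not-yet-finished IOs; shared between all clones of the handle,
        // the wrapped IO futures, and the Drop impl added below
        num_active_ios: Arc<AtomicUsize>,
        // submission side of the unbounded channel the sidecar task drains
        ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
    },
}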
@@ -267,9 +269,10 @@ impl IoConcurrency {
let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
static TASK_ID: AtomicUsize = AtomicUsize::new(0);
let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
  let span =
-     tracing::trace_span!(parent: None, "futures_unordered_io", task_id = task_id);
- trace!(task_id, "spawning");
+     tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
+ trace!(task_id, "spawning sidecar task");
tokio::spawn(async move {
trace!("start");
scopeguard::defer!{ trace!("end") };
@@ -338,23 +341,6 @@ impl IoConcurrency {
} => {
trace!("shutting down");
- // Make rate-limited noise in case the IoConcurrency gets dropped while
- // we (the sidecar task) are still processing IOs.
- // Refer to `collect_pending_ios` for why we shouldn't be doing that.
- if !futures.is_empty() {
-     use utils::rate_limit::RateLimit;
-     use std::sync::Mutex;
-     use once_cell::sync::Lazy;
-     static LOGGED: Lazy<Mutex<RateLimit>> =
-         Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
-     let mut rate_limit = LOGGED.lock().unwrap();
-     rate_limit.call(|| {
-         tracing::warn!(
-             "IoConcurrency dropped while sidecar task was still processing IOs",
-         );
-     });
- }
while let Some(()) = futures.next().await {
// drain
}
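Taken together, the hunks above describe the sidecar task's lifecycle: it is spawned under its own detached span, receives IOs over an unbounded channel, drives them with a FuturesUnordered, and, once every sender is gone, drains whatever is still in flight before releasing its gate guard. The in-task warning is deleted here because the commit moves that noise into the Drop impl added at the bottom of this diff. Below is a minimal, self-contained sketch of that spawn/submit/drain shape, assuming only the tokio, futures, and tracing crates; the function name spawn_sidecar and the sleep-based example are illustrative, not the pageserver API (which also threads a gate guard through the task, as the context lines show).

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use tracing::{info_span, trace, Instrument};

type IoFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

fn spawn_sidecar(task_id: usize) -> tokio::sync::mpsc::UnboundedSender<IoFuture> {
    let (ios_tx, mut ios_rx) = tokio::sync::mpsc::unbounded_channel::<IoFuture>();
    // `parent: None` detaches the span from whichever request spawns the sidecar,
    // since the task can outlive any individual request span.
    let span = info_span!(parent: None, "IoConcurrency_sidecar", task_id);
    tokio::spawn(
        async move {
            let mut futures = FuturesUnordered::new();
            loop {
                tokio::select! {
                    msg = ios_rx.recv() => match msg {
                        Some(fut) => futures.push(fut), // newly submitted IO
                        None => break,                  // all senders dropped: shut down
                    },
                    Some(()) = futures.next(), if !futures.is_empty() => {
                        // one in-flight IO completed
                    }
                }
            }
            // shutdown path: drain what is still in flight before exiting
            while let Some(()) = futures.next().await {}
            trace!("sidecar task exiting");
        }
        .instrument(span),
    );
    ios_tx
}

#[tokio::main]
async fn main() {
    let ios_tx = spawn_sidecar(0);
    ios_tx
        .send(Box::pin(async {
            tokio::time::sleep(Duration::from_millis(10)).await; // stand-in for a real IO
        }))
        .unwrap();
    drop(ios_tx); // last sender gone: the sidecar drains and exits
    tokio::time::sleep(Duration::from_millis(50)).await; // crude wait; the real code relies on a gate for shutdown
}

Because shutdown is keyed off the channel's senders disappearing, the natural place to warn about "dropped while IOs are still running" is the drop of the last sender-holding handle, which is exactly what the new Drop impl below does.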
@@ -364,7 +350,11 @@ impl IoConcurrency {
}
drop(gate_guard); // drop it right before we exit
}.instrument(span));
- IoConcurrency::SidecarTask { ios_tx }
+ IoConcurrency::SidecarTask {
+     task_id,
+     ios_tx,
+     num_active_ios: Arc::new(AtomicUsize::new(0)),
+ }
}
}
}
@@ -372,8 +362,14 @@ impl IoConcurrency {
pub(crate) fn clone(&self) -> Self {
match self {
IoConcurrency::Sequential => IoConcurrency::Sequential,
- IoConcurrency::SidecarTask { ios_tx } => IoConcurrency::SidecarTask {
+ IoConcurrency::SidecarTask {
+     task_id,
+     ios_tx,
+     num_active_ios,
+ } => IoConcurrency::SidecarTask {
+     task_id: *task_id,
      ios_tx: ios_tx.clone(),
+     num_active_ios: num_active_ios.clone(),
  },
}
}
@@ -433,8 +429,21 @@ impl IoConcurrency {
{
match self {
IoConcurrency::Sequential => fut.await,
- IoConcurrency::SidecarTask { ios_tx, .. } => {
-     let fut = Box::pin(fut);
+ IoConcurrency::SidecarTask {
+     ios_tx,
+     num_active_ios,
+     ..
+ } => {
+     let fut = Box::pin({
+         let num_active_ios = Arc::clone(num_active_ios);
+         async move {
+             num_active_ios.fetch_add(1, std::sync::atomic::Ordering::Release);
+             scopeguard::defer! {
+                 num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
+             }
+             fut.await
+         }
+     });
// NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
// while insignificant for latency.
// It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
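The Box::pin wrapper added above is a small in-flight accounting pattern: the shared counter is incremented when the submitted IO actually starts running in the sidecar task, and scopeguard::defer! guarantees the matching decrement whether the IO completes normally or its future is dropped early. Here is a self-contained sketch of just that pattern, assuming the tokio and scopeguard crates; track_in_flight and the surrounding names are illustrative, not pageserver functions.

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Wrap `fut` so that `counter` reflects how many wrapped futures are currently live:
/// +1 when the wrapped future starts executing, -1 when it finishes or is dropped
/// early (scopeguard::defer! runs its body in both cases).
fn track_in_flight<F>(counter: Arc<AtomicUsize>, fut: F) -> impl Future<Output = F::Output>
where
    F: Future,
{
    async move {
        counter.fetch_add(1, Ordering::Release);
        scopeguard::defer! {
            counter.fetch_sub(1, Ordering::Release);
        }
        fut.await
    }
}

#[tokio::main]
async fn main() {
    let in_flight = Arc::new(AtomicUsize::new(0));
    let io = track_in_flight(Arc::clone(&in_flight), async {
        tokio::time::sleep(Duration::from_millis(10)).await; // stand-in for a real IO
    });
    let handle = tokio::spawn(io);
    // Elsewhere (in the diff: the Drop impl below), the counter can be sampled.
    // The value is inherently racy; it is only used for a best-effort warning.
    println!("IOs in flight right now: {}", in_flight.load(Ordering::Acquire));
    handle.await.unwrap();
}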
@@ -483,6 +492,46 @@ impl IoConcurrency {
}
}
+ // Make rate-limited noise in case the root IoConcurrency gets dropped while
+ // there are still IOs queued; we'll be leaking the sidecar task in that case.
+ // The sidecar task holds a gate, so it's technically fine, but hard to debug.
+ //
+ // Refer to `collect_pending_ios` for why we shouldn't be doing that.
+ impl Drop for IoConcurrency {
+     fn drop(&mut self) {
+         match self {
+             IoConcurrency::Sequential => (),
+             IoConcurrency::SidecarTask {
+                 task_id,
+                 ios_tx,
+                 num_active_ios,
+             } => {
+                 let _entered =
+                     tracing::info_span!("IoConcurrency_drop", task_id = *task_id).entered();
+                 assert_eq!(ios_tx.weak_count(), 0, "we don't use downgrade()");
+                 if ios_tx.strong_count() == 1 {
+                     trace!("dropping last IoConcurrency clone, this will trigger shutdown of sidecar task");
+                     let num_active_ios = num_active_ios.load(std::sync::atomic::Ordering::Acquire);
+                     if num_active_ios > 0 {
+                         use once_cell::sync::Lazy;
+                         use std::sync::Mutex;
+                         use utils::rate_limit::RateLimit;
+                         static LOGGED: Lazy<Mutex<RateLimit>> =
+                             Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));
+                         let mut rate_limit = LOGGED.lock().unwrap();
+                         rate_limit.call(|| {
+                             tracing::warn!(
+                                 num_active_ios,
+                                 "IoConcurrency dropped while sidecar task was still processing IOs"
+                             );
+                         });
+                     }
+                 }
+             }
+         }
+     }
+ }
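Two details of this Drop impl are worth spelling out. First, the warning only fires from the clone whose drop actually ends the sidecar's input: strong_count() == 1 means this handle is the last one, so the channel closes when it is gone and the task enters its drain path; intermediate clones being dropped while IOs are running stay quiet. Second, utils::rate_limit::RateLimit is a pageserver-internal helper not shown in this diff; its role here is just "run this closure at most once per interval". Below is a rough, self-contained stand-in that makes the static LOGGED: Lazy<Mutex<RateLimit>> pattern concrete; it approximates the interface used above, not the actual implementation.

use std::sync::Mutex;
use std::time::{Duration, Instant};

use once_cell::sync::Lazy;

// Rough stand-in for `utils::rate_limit::RateLimit`: run a closure at most once
// per `period`, silently skipping calls in between.
struct RateLimit {
    period: Duration,
    last: Option<Instant>,
}

impl RateLimit {
    fn new(period: Duration) -> Self {
        RateLimit { period, last: None }
    }

    fn call(&mut self, f: impl FnOnce()) {
        let now = Instant::now();
        if self.last.map_or(true, |last| now.duration_since(last) >= self.period) {
            self.last = Some(now);
            f();
        }
    }
}

fn main() {
    // Same shape as in the Drop impl above: one process-wide limiter behind a Mutex,
    // lazily initialized on first use, so a burst of drops produces one warning per second.
    static LOGGED: Lazy<Mutex<RateLimit>> =
        Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1))));

    for i in 0..1_000 {
        LOGGED.lock().unwrap().call(|| {
            eprintln!("would have warned here (call {i}); further warnings suppressed for 1s");
        });
    }
}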
impl ValuesReconstructState {
pub(crate) fn new(io_concurrency: IoConcurrency) -> Self {
Self {