diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 208c4c6479..c03fa5231f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -583,6 +583,8 @@ type RequestBuffer = Vec; #[derive(Default)] pub(crate) struct StalledRequests { /// Stalled requests. + /// Remember to use `StalledRequests::stalled_count()` to get the total number of stalled requests + /// instead of `StalledRequests::requests.len()`. /// /// Key: RegionId /// Value: (estimated size, stalled requests) @@ -617,6 +619,11 @@ impl StalledRequests { vec![] } } + + /// Returns the total number of all stalled requests. + pub(crate) fn stalled_count(&self) -> usize { + self.requests.values().map(|reqs| reqs.1.len()).sum() + } } /// Background worker loop to handle requests. diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index efc81df57b..b3e1aa949a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -147,7 +147,7 @@ impl RegionWorkerLoop { pub(crate) async fn handle_stalled_requests(&mut self) { // Handle stalled requests. let stalled = std::mem::take(&mut self.stalled_requests); - self.stalled_count.sub(stalled.requests.len() as i64); + self.stalled_count.sub(stalled.stalled_count() as i64); // We already stalled these requests, don't stall them again. for (_, (_, mut requests)) in stalled.requests { self.handle_write_requests(&mut requests, false).await; @@ -157,7 +157,7 @@ impl RegionWorkerLoop { /// Rejects all stalled requests. pub(crate) fn reject_stalled_requests(&mut self) { let stalled = std::mem::take(&mut self.stalled_requests); - self.stalled_count.sub(stalled.requests.len() as i64); + self.stalled_count.sub(stalled.stalled_count() as i64); for (_, (_, mut requests)) in stalled.requests { reject_write_requests(&mut requests); }