From 5a0da5b6bb1add61f8319bb5207da67e70df8efc Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 21 May 2025 21:21:50 +0800 Subject: [PATCH] fix: region worker stall metrics (#6149) fix/stall-metrics: Improve stalled request handling in `handle_write.rs` - Updated logic to account for both `write_requests` and `bulk_requests` when adjusting `stalled_count`. - Modified `reject_region_stalled_requests` and `handle_region_stalled_requests` to correctly subtract the combined length of `requests` and `bulk` from `stalled_count`. --- src/mito2/src/worker/handle_write.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 0f21820d64..d3389ccefa 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -57,7 +57,8 @@ impl RegionWorkerLoop { } if self.write_buffer_manager.should_stall() && allow_stall { - self.stalled_count.add(write_requests.len() as i64); + self.stalled_count + .add((write_requests.len() + bulk_requests.len()) as i64); self.stalled_requests.append(write_requests, bulk_requests); self.listener.on_write_stall(); return; @@ -181,7 +182,7 @@ impl RegionWorkerLoop { pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Rejects stalled requests for region {}", region_id); let (mut requests, mut bulk) = self.stalled_requests.remove(region_id); - self.stalled_count.sub(requests.len() as i64); + self.stalled_count.sub((requests.len() + bulk.len()) as i64); reject_write_requests(&mut requests, &mut bulk); } @@ -189,7 +190,7 @@ impl RegionWorkerLoop { pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Handles stalled requests for region {}", region_id); let (mut requests, mut bulk) = self.stalled_requests.remove(region_id); - self.stalled_count.sub(requests.len() as i64); + self.stalled_count.sub((requests.len() + bulk.len()) as i64); self.handle_write_requests(&mut requests, &mut bulk, true) .await; }