diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 0f21820d64..d3389ccefa 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -57,7 +57,8 @@ impl RegionWorkerLoop { } if self.write_buffer_manager.should_stall() && allow_stall { - self.stalled_count.add(write_requests.len() as i64); + self.stalled_count + .add((write_requests.len() + bulk_requests.len()) as i64); self.stalled_requests.append(write_requests, bulk_requests); self.listener.on_write_stall(); return; @@ -181,7 +182,7 @@ impl RegionWorkerLoop { pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Rejects stalled requests for region {}", region_id); let (mut requests, mut bulk) = self.stalled_requests.remove(region_id); - self.stalled_count.sub(requests.len() as i64); + self.stalled_count.sub((requests.len() + bulk.len()) as i64); reject_write_requests(&mut requests, &mut bulk); } @@ -189,7 +190,7 @@ impl RegionWorkerLoop { pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) { debug!("Handles stalled requests for region {}", region_id); let (mut requests, mut bulk) = self.stalled_requests.remove(region_id); - self.stalled_count.sub(requests.len() as i64); + self.stalled_count.sub((requests.len() + bulk.len()) as i64); self.handle_write_requests(&mut requests, &mut bulk, true) .await; }