diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index ded1735ddc..ab04521d16 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -27,6 +27,7 @@ mod handle_truncate; mod handle_write; use std::collections::HashMap; use std::path::Path; +use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -430,6 +431,12 @@ impl WorkerStarter { let running = Arc::new(AtomicBool::new(true)); let now = self.time_provider.current_time_millis(); let id_string = self.id.to_string(); + + let slow_threshold = std::env::var("slow_threshold") + .ok() + .and_then(|v| u64::from_str(&v).ok()) + .unwrap_or(1000); + let mut worker_thread = RegionWorkerLoop { id: self.id, config: self.config.clone(), @@ -471,6 +478,7 @@ impl WorkerStarter { schema_metadata_manager: self.schema_metadata_manager, metrics: WorkerMetrics::default(), stall_start: None, + slow_threshold: Duration::from_millis(slow_threshold), }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -726,6 +734,7 @@ struct RegionWorkerLoop { metrics: WorkerMetrics, /// Last stall start time. stall_start: Option, + slow_threshold: Duration, } impl RegionWorkerLoop { @@ -741,6 +750,7 @@ impl RegionWorkerLoop { // Buffer to retrieve requests from receiver. let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); + let mut select_start = Instant::now(); while self.running.load(Ordering::Relaxed) { // Clear the buffer before handling next batch of requests. buffer.clear(); @@ -749,7 +759,6 @@ impl RegionWorkerLoop { let sleep = tokio::time::sleep(max_wait_time); tokio::pin!(sleep); - let select_start = Instant::now(); tokio::select! { request_opt = self.receiver.recv() => { match request_opt { @@ -816,7 +825,7 @@ impl RegionWorkerLoop { self.metrics.handle_periodical_task_cost = start.elapsed(); self.metrics.handle_cost = handle_start.elapsed(); - if self.metrics.handle_cost > Duration::from_secs(3) { + if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold { info!( "Region worker too slow, id: {}, metrics: {:?}", self.id, self.metrics @@ -824,6 +833,7 @@ impl RegionWorkerLoop { } // Clear the metrics. self.metrics = WorkerMetrics::default(); + select_start = Instant::now(); } self.clean().await;