From 7139ba08c89c4de93c4da451c7ccde68aaeade8e Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Fri, 13 Dec 2024 10:52:18 +0800 Subject: [PATCH] chore/bench-metrics: Add configurable slow threshold for region worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Introduced slow_threshold environment variable to set a custom threshold for slow operations, defaulting to 1000 milliseconds. • Updated RegionWorkerLoop to use slow_threshold for performance monitoring. • Adjusted logic to include select_cost in the slow operation check. --- src/mito2/src/worker.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index ded1735ddc..ab04521d16 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -27,6 +27,7 @@ mod handle_truncate; mod handle_write; use std::collections::HashMap; use std::path::Path; +use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -430,6 +431,12 @@ impl WorkerStarter { let running = Arc::new(AtomicBool::new(true)); let now = self.time_provider.current_time_millis(); let id_string = self.id.to_string(); + + let slow_threshold = std::env::var("slow_threshold") + .ok() + .and_then(|v| u64::from_str(&v).ok()) + .unwrap_or(1000); + let mut worker_thread = RegionWorkerLoop { id: self.id, config: self.config.clone(), @@ -471,6 +478,7 @@ impl WorkerStarter { schema_metadata_manager: self.schema_metadata_manager, metrics: WorkerMetrics::default(), stall_start: None, + slow_threshold: Duration::from_millis(slow_threshold), }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -726,6 +734,7 @@ struct RegionWorkerLoop { metrics: WorkerMetrics, /// Last stall start time. stall_start: Option, + slow_threshold: Duration, } impl RegionWorkerLoop { @@ -741,6 +750,7 @@ impl RegionWorkerLoop { // Buffer to retrieve requests from receiver. let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size); + let mut select_start = Instant::now(); while self.running.load(Ordering::Relaxed) { // Clear the buffer before handling next batch of requests. buffer.clear(); @@ -749,7 +759,6 @@ impl RegionWorkerLoop { let sleep = tokio::time::sleep(max_wait_time); tokio::pin!(sleep); - let select_start = Instant::now(); tokio::select! { request_opt = self.receiver.recv() => { match request_opt { @@ -816,7 +825,7 @@ impl RegionWorkerLoop { self.metrics.handle_periodical_task_cost = start.elapsed(); self.metrics.handle_cost = handle_start.elapsed(); - if self.metrics.handle_cost > Duration::from_secs(3) { + if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold { info!( "Region worker too slow, id: {}, metrics: {:?}", self.id, self.metrics @@ -824,6 +833,7 @@ impl RegionWorkerLoop { } // Clear the metrics. self.metrics = WorkerMetrics::default(); + select_start = Instant::now(); } self.clean().await;