chore/bench-metrics: Add configurable slow threshold for region worker

• Introduced slow_threshold environment variable to set a custom threshold for slow operations, defaulting to 1000 milliseconds.
 • Updated RegionWorkerLoop to use slow_threshold for performance monitoring.
 • Adjusted logic to include select_cost in the slow operation check.
This commit is contained in:
Lei, HUANG
2024-12-13 10:52:18 +08:00
parent f3e0a31e5d
commit 7139ba08c8

View File

@@ -27,6 +27,7 @@ mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -430,6 +431,12 @@ impl<S: LogStore> WorkerStarter<S> {
let running = Arc::new(AtomicBool::new(true));
let now = self.time_provider.current_time_millis();
let id_string = self.id.to_string();
let slow_threshold = std::env::var("slow_threshold")
.ok()
.and_then(|v| u64::from_str(&v).ok())
.unwrap_or(1000);
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config.clone(),
@@ -471,6 +478,7 @@ impl<S: LogStore> WorkerStarter<S> {
schema_metadata_manager: self.schema_metadata_manager,
metrics: WorkerMetrics::default(),
stall_start: None,
slow_threshold: Duration::from_millis(slow_threshold),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -726,6 +734,7 @@ struct RegionWorkerLoop<S> {
metrics: WorkerMetrics,
/// Last stall start time.
stall_start: Option<Instant>,
slow_threshold: Duration,
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -741,6 +750,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Buffer to retrieve requests from receiver.
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
let mut select_start = Instant::now();
while self.running.load(Ordering::Relaxed) {
// Clear the buffer before handling next batch of requests.
buffer.clear();
@@ -749,7 +759,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let sleep = tokio::time::sleep(max_wait_time);
tokio::pin!(sleep);
let select_start = Instant::now();
tokio::select! {
request_opt = self.receiver.recv() => {
match request_opt {
@@ -816,7 +825,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.metrics.handle_periodical_task_cost = start.elapsed();
self.metrics.handle_cost = handle_start.elapsed();
if self.metrics.handle_cost > Duration::from_secs(3) {
if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold {
info!(
"Region worker too slow, id: {}, metrics: {:?}",
self.id, self.metrics
@@ -824,6 +833,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Clear the metrics.
self.metrics = WorkerMetrics::default();
select_start = Instant::now();
}
self.clean().await;