Compare commits

...

7 Commits

Author SHA1 Message Date
evenyag
9d3dc2d311 chore: Merge branch 'main' into chore/bench-metrics 2024-12-19 16:07:43 +08:00
Lei, HUANG
0c302ba127 fix: handle stall metrics 2024-12-13 11:08:11 +08:00
Lei, HUANG
7139ba08c8 chore/bench-metrics: Add configurable slow threshold for region worker
• Introduced slow_threshold environment variable to set a custom threshold for slow operations, defaulting to 1000 milliseconds.
 • Updated RegionWorkerLoop to use slow_threshold for performance monitoring.
 • Adjusted logic to include select_cost in the slow operation check.
2024-12-13 10:52:18 +08:00
Lei, HUANG
f3e0a31e5d chore/bench-metrics: Add Metrics for Compaction and Flush Operations
• Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.
2024-12-12 21:38:13 +08:00
Lei, HUANG
36c82121fb chore/bench-metrics: Add INFLIGHT_FLUSH_COUNT Metric to Flush Process
• Introduced INFLIGHT_FLUSH_COUNT metric to track the number of ongoing flush operations.
 • Incremented INFLIGHT_FLUSH_COUNT in FlushScheduler to monitor active flushes.
 • Removed redundant increment of INFLIGHT_FLUSH_COUNT in RegionWorkerLoop to prevent double counting.
2024-12-12 21:38:13 +08:00
evenyag
716bb82d37 feat: add slow metrics for worker 2024-12-12 21:13:07 +08:00
Lei, HUANG
2bb450b09a add metrics 2024-12-12 20:23:01 +08:00
4 changed files with 108 additions and 8 deletions

View File

@@ -271,11 +271,11 @@ impl CompactionScheduler {
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
TimeToLive::default()
});
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
TimeToLive::default()
});
debug!(
"Pick compaction strategy {:?} for region: {}, ttl: {:?}",
@@ -351,7 +351,7 @@ impl CompactionScheduler {
job_id: None,
reason: e.reason,
}
.fail();
.fail();
}
error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);

View File

@@ -27,9 +27,10 @@ mod handle_truncate;
mod handle_write;
use std::collections::HashMap;
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
@@ -432,6 +433,12 @@ impl<S: LogStore> WorkerStarter<S> {
let running = Arc::new(AtomicBool::new(true));
let now = self.time_provider.current_time_millis();
let id_string = self.id.to_string();
let slow_threshold = std::env::var("slow_threshold")
.ok()
.and_then(|v| u64::from_str(&v).ok())
.unwrap_or(1000);
let mut worker_thread = RegionWorkerLoop {
id: self.id,
config: self.config.clone(),
@@ -471,6 +478,9 @@ impl<S: LogStore> WorkerStarter<S> {
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
schema_metadata_manager: self.schema_metadata_manager,
metrics: WorkerMetrics::default(),
stall_start: None,
slow_threshold: Duration::from_millis(slow_threshold),
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -624,6 +634,46 @@ impl StalledRequests {
}
}
/// Local metrics of the worker.
#[derive(Debug, Default)]
struct WorkerMetrics {
/// Elapsed time of the select block.
select_cost: Duration,
/// Number of times waking up by flush.
num_flush_wake: usize,
/// Number of times waking up by periodical tasks.
num_periodical_wake: usize,
/// Number of requests to handle.
num_requests: usize,
/// Number of stalled requests to process.
num_stalled_request_processed: usize,
/// Number of stalled requests to add.
num_stalled_request_added: usize,
/// Number of write stall.
num_stall: usize,
/// Total time of handling stall requests.
handle_stall_cost: Duration,
/// Total time of handling requests.
handle_request_cost: Duration,
/// Total time of handling write requests.
handle_write_request_cost: Duration,
/// Total time of handling periodical tasks.
handle_periodical_task_cost: Duration,
/// Total time of handling requests after waking up.
handle_cost: Duration,
// Cost of handle write
/// Total time of flushing the worker.
flush_worker_cost: Duration,
/// Total time of writing WAL.
write_wal_cost: Duration,
/// Total time of writing memtables.
write_memtable_cost: Duration,
/// Total time of stall.
stall_cost: Duration,
}
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
/// Id of the worker.
@@ -682,6 +732,11 @@ struct RegionWorkerLoop<S> {
region_edit_queues: RegionEditQueues,
/// Database level metadata manager.
schema_metadata_manager: SchemaMetadataManagerRef,
/// Metrics of the worker in one loop.
metrics: WorkerMetrics,
/// Last stall start time.
stall_start: Option<Instant>,
slow_threshold: Duration,
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -697,6 +752,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Buffer to retrieve requests from receiver.
let mut buffer = RequestBuffer::with_capacity(self.config.worker_request_batch_size);
let mut select_start = Instant::now();
while self.running.load(Ordering::Relaxed) {
// Clear the buffer before handling next batch of requests.
buffer.clear();
@@ -714,6 +770,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
recv_res = self.flush_receiver.changed() => {
self.metrics.num_flush_wake += 1;
if recv_res.is_err() {
// The channel is disconnected.
break;
@@ -729,12 +786,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
_ = &mut sleep => {
self.metrics.num_periodical_wake += 1;
// Timeout. Checks periodical tasks.
self.handle_periodical_tasks();
continue;
}
}
self.metrics.select_cost = select_start.elapsed();
let handle_start = Instant::now();
if self.flush_receiver.has_changed().unwrap_or(false) {
// Always checks whether we could process stalled requests to avoid a request
// hangs too long.
@@ -751,12 +812,28 @@ impl<S: LogStore> RegionWorkerLoop<S> {
Err(_) => break,
}
}
self.metrics.num_requests = buffer.len();
self.listener.on_recv_requests(buffer.len());
let start = Instant::now();
self.handle_requests(&mut buffer).await;
self.metrics.handle_request_cost = start.elapsed();
let start = Instant::now();
self.handle_periodical_tasks();
self.metrics.handle_periodical_task_cost = start.elapsed();
self.metrics.handle_cost = handle_start.elapsed();
if self.metrics.handle_cost + self.metrics.select_cost > self.slow_threshold {
info!(
"Region worker too slow, id: {}, metrics: {:?}",
self.id, self.metrics
);
}
// Clear the metrics.
self.metrics = WorkerMetrics::default();
select_start = Instant::now();
}
self.clean().await;
@@ -804,7 +881,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Handles all write requests first. So we can alter regions without
// considering existing write requests.
let start = Instant::now();
self.handle_write_requests(write_requests, true).await;
self.metrics.handle_write_request_cost = start.elapsed();
self.handle_ddl_requests(ddl_requests).await;
}

View File

@@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::metrics::COMPACTION_REQUEST_COUNT;
use crate::metrics::{COMPACTION_REQUEST_COUNT};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;

View File

@@ -16,6 +16,7 @@
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use std::time::Instant;
use api::v1::OpType;
use common_telemetry::debug;
@@ -43,7 +44,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Flush this worker if the engine needs to flush.
let start = Instant::now();
self.maybe_flush_worker();
self.metrics.flush_worker_cost += start.elapsed();
if self.should_reject_write() {
// The memory pressure is still too high, reject write requests.
@@ -55,6 +58,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.write_buffer_manager.should_stall() && allow_stall {
self.stalled_count.add(write_requests.len() as i64);
self.metrics.num_stalled_request_added += write_requests.len();
self.metrics.num_stall += 1;
self.stall_start = Some(Instant::now());
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
@@ -70,6 +76,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Write to WAL.
{
let start = Instant::now();
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["write_wal"])
.start_timer();
@@ -87,12 +94,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
region_ctx.set_next_entry_id(last_entry_id + 1);
}
self.metrics.write_wal_cost += start.elapsed();
}
Err(e) => {
// Failed to write wal.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
self.metrics.write_wal_cost += start.elapsed();
return;
}
}
@@ -101,6 +110,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let (mut put_rows, mut delete_rows) = (0, 0);
// Write to memtables.
{
let start = Instant::now();
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["write_memtable"])
.start_timer();
@@ -109,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
put_rows += region_ctx.put_num;
delete_rows += region_ctx.delete_num;
}
self.metrics.write_memtable_cost += start.elapsed();
}
WRITE_ROWS_TOTAL
.with_label_values(&["put"])
@@ -120,13 +131,19 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles all stalled write requests.
pub(crate) async fn handle_stalled_requests(&mut self) {
let handle_stall_start = Instant::now();
if let Some(start) = self.stall_start.take() {
self.metrics.stall_cost += start.elapsed();
}
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
for (_, (_, requests)) in stalled.requests {
self.metrics.num_stalled_request_processed += requests.len();
self.handle_write_requests(requests, false).await;
}
self.metrics.handle_stall_cost += handle_stall_start.elapsed();
}
/// Rejects all stalled requests.
@@ -149,7 +166,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles a specific region's stalled requests.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
if let Some(start) = self.stall_start.take() {
self.metrics.stall_cost += start.elapsed();
}
let requests = self.stalled_requests.remove(region_id);
self.metrics.num_stalled_request_processed += requests.len();
self.stalled_count.sub(requests.len() as i64);
self.handle_write_requests(requests, true).await;
}