feat: add slow metrics for worker

Author: evenyag
Date: 2024-12-12 21:13:07 +08:00
parent 2bb450b09a
commit 716bb82d37
2 changed files with 91 additions and 1 deletion

View File

@@ -29,7 +29,7 @@ use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
@@ -469,6 +469,8 @@ impl<S: LogStore> WorkerStarter<S> {
region_count: REGION_COUNT.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
schema_metadata_manager: self.schema_metadata_manager,
metrics: WorkerMetrics::default(),
stall_start: None,
};
let handle = common_runtime::spawn_global(async move {
worker_thread.run().await;
@@ -622,6 +624,46 @@ impl StalledRequests {
}
}
/// Local metrics of the worker.
#[derive(Debug, Default)]
struct WorkerMetrics {
/// Elapsed time spent waiting in the select block.
select_cost: Duration,
/// Number of times the worker was woken up by a flush notification.
num_flush_wake: usize,
/// Number of times the worker was woken up by the periodical task timer.
num_periodical_wake: usize,
/// Number of requests handled in this iteration.
num_requests: usize,
/// Number of stalled requests processed.
num_stalled_request_processed: usize,
/// Number of stalled requests added.
num_stalled_request_added: usize,
/// Number of write stalls.
num_stall: usize,
/// Total time of handling stall requests.
handle_stall_cost: Duration,
/// Total time of handling requests.
handle_request_cost: Duration,
/// Total time of handling write requests.
handle_write_request_cost: Duration,
/// Total time of handling periodical tasks.
handle_periodical_task_cost: Duration,
/// Total time of handling requests after waking up.
handle_cost: Duration,
// Sub-costs measured while handling write requests:
/// Total time of flushing the worker.
flush_worker_cost: Duration,
/// Total time of writing WAL.
write_wal_cost: Duration,
/// Total time of writing memtables.
write_memtable_cost: Duration,
/// Total time of stall.
stall_cost: Duration,
}
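For context, the sketch below is a minimal, standalone illustration of how a per-loop metrics struct like this is typically used: `Duration` fields default to zero, each stage accumulates its elapsed time, and the `Debug` derive gives a one-line dump. The `LoopMetrics` name and `do_work` helper are hypothetical stand-ins, not part of this change.

use std::time::{Duration, Instant};

/// Illustrative per-iteration metrics (hypothetical, mirrors the shape of `WorkerMetrics`).
#[derive(Debug, Default)]
struct LoopMetrics {
    handle_cost: Duration,
    write_wal_cost: Duration,
    num_requests: usize,
}

fn do_work() {
    // Stand-in for real work.
    std::thread::sleep(Duration::from_millis(5));
}

fn main() {
    let mut metrics = LoopMetrics::default();

    let start = Instant::now();
    do_work();
    // Accumulate with `+=` so repeated calls within one iteration add up.
    metrics.write_wal_cost += start.elapsed();
    metrics.num_requests += 1;
    metrics.handle_cost = start.elapsed();

    // The `Debug` derive gives a one-line dump of every counter and cost.
    println!("{:?}", metrics);
}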
/// Background worker loop to handle requests.
struct RegionWorkerLoop<S> {
/// Id of the worker.
@@ -680,6 +722,10 @@ struct RegionWorkerLoop<S> {
region_edit_queues: RegionEditQueues,
/// Database level metadata manager.
schema_metadata_manager: SchemaMetadataManagerRef,
/// Metrics of the worker in one loop.
metrics: WorkerMetrics,
/// Last stall start time.
stall_start: Option<Instant>,
}
impl<S: LogStore> RegionWorkerLoop<S> {
@@ -703,6 +749,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let sleep = tokio::time::sleep(max_wait_time);
tokio::pin!(sleep);
let select_start = Instant::now();
tokio::select! {
request_opt = self.receiver.recv() => {
match request_opt {
@@ -712,6 +759,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
recv_res = self.flush_receiver.changed() => {
self.metrics.num_flush_wake += 1;
if recv_res.is_err() {
// The channel is disconnected.
break;
@@ -727,17 +775,23 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
}
_ = &mut sleep => {
self.metrics.num_periodical_wake += 1;
// Timeout. Checks periodical tasks.
self.handle_periodical_tasks();
continue;
}
}
self.metrics.select_cost = select_start.elapsed();
let handle_start = Instant::now();
if self.flush_receiver.has_changed().unwrap_or(false) {
let start = Instant::now();
// Always checks whether we can process stalled requests to avoid a request
// hanging for too long.
// If the channel is closed, do nothing.
self.handle_stalled_requests().await;
self.metrics.handle_stall_cost = start.elapsed();
}
// Try to recv more requests from the channel.
@@ -749,12 +803,27 @@ impl<S: LogStore> RegionWorkerLoop<S> {
Err(_) => break,
}
}
self.metrics.num_requests = buffer.len();
self.listener.on_recv_requests(buffer.len());
let start = Instant::now();
self.handle_requests(&mut buffer).await;
self.metrics.handle_request_cost = start.elapsed();
let start = Instant::now();
self.handle_periodical_tasks();
self.metrics.handle_periodical_task_cost = start.elapsed();
self.metrics.handle_cost = handle_start.elapsed();
if self.metrics.handle_cost > Duration::from_secs(3) {
info!(
"Region worker too slow, id: {}, metrics: {:?}",
self.id, self.metrics
);
}
// Clear the metrics.
self.metrics = WorkerMetrics::default();
}
self.clean().await;
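A rough, self-contained sketch of the loop instrumentation introduced above, assuming the tokio crate with its full feature set: measure how long the worker waits in `tokio::select!`, count which branch woke it up, then time the handling phase and log the snapshot only when it exceeds the 3-second threshold used in this change. The toy channel and payload are illustrative.

use std::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut receiver) = tokio::sync::mpsc::channel::<u32>(8);
    tokio::spawn(async move {
        let _ = tx.send(42).await;
    });

    let mut num_periodical_wake = 0usize;

    let sleep = tokio::time::sleep(Duration::from_millis(100));
    tokio::pin!(sleep);

    // How long did the worker sit idle in select! before something woke it up?
    let select_start = Instant::now();
    tokio::select! {
        request_opt = receiver.recv() => {
            println!("woken by request: {:?}", request_opt);
        }
        _ = &mut sleep => {
            // Timer fired first: a periodical wake.
            num_periodical_wake += 1;
        }
    }
    let select_cost = select_start.elapsed();

    // Time the handling phase and only log slow iterations.
    let handle_start = Instant::now();
    // ... handle buffered requests here ...
    let handle_cost = handle_start.elapsed();
    if handle_cost > Duration::from_secs(3) {
        println!(
            "worker too slow, select: {:?}, handle: {:?}, periodical wakes: {}",
            select_cost, handle_cost, num_periodical_wake
        );
    }
}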
@@ -802,7 +871,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Handles all write requests first so we can alter regions without
// considering existing write requests.
let start = Instant::now();
self.handle_write_requests(write_requests, true).await;
self.metrics.handle_write_request_cost = start.elapsed();
self.handle_ddl_requests(ddl_requests).await;
}

View File

@@ -16,6 +16,7 @@
use std::collections::{hash_map, HashMap};
use std::sync::Arc;
use std::time::Instant;
use api::v1::OpType;
use common_telemetry::debug;
@@ -43,7 +44,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
// Flush this worker if the engine needs to flush.
let start = Instant::now();
self.maybe_flush_worker();
self.metrics.flush_worker_cost += start.elapsed();
if self.should_reject_write() {
// The memory pressure is still too high, reject write requests.
@@ -55,6 +58,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
if self.write_buffer_manager.should_stall() && allow_stall {
self.stalled_count.add(write_requests.len() as i64);
self.metrics.num_stalled_request_added += write_requests.len();
self.metrics.num_stall += 1;
self.stall_start = Some(Instant::now());
self.stalled_requests.append(&mut write_requests);
self.listener.on_write_stall();
return;
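A simplified sketch of the stall bookkeeping added above: when writes must stall, the worker counts how many requests it parked, bumps the stall counter, and remembers when the stall began in an `Option<Instant>` so the matching `take()` in `handle_stalled_requests` can measure the stall duration later. The `Worker` type and `should_stall` flag below are hypothetical.

use std::time::Instant;

/// Illustrative stall bookkeeping (hypothetical; mirrors the fields added to the worker).
#[derive(Default)]
struct Worker {
    stalled_requests: Vec<String>,
    num_stalled_request_added: usize,
    num_stall: usize,
    stall_start: Option<Instant>,
}

impl Worker {
    fn maybe_stall(&mut self, mut write_requests: Vec<String>, should_stall: bool) -> bool {
        if should_stall {
            // Count how many requests were parked and how many times we stalled.
            self.num_stalled_request_added += write_requests.len();
            self.num_stall += 1;
            // Remember when the stall started so its duration can be measured later.
            self.stall_start = Some(Instant::now());
            self.stalled_requests.append(&mut write_requests);
            return true;
        }
        false
    }
}

fn main() {
    let mut worker = Worker::default();
    let stalled = worker.maybe_stall(vec!["put".to_string()], true);
    println!("stalled: {}, parked: {}", stalled, worker.stalled_requests.len());
}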
@@ -70,6 +76,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Write to WAL.
{
let start = Instant::now();
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["write_wal"])
.start_timer();
@@ -87,12 +94,14 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
region_ctx.set_next_entry_id(last_entry_id + 1);
}
self.metrics.write_wal_cost += start.elapsed();
}
Err(e) => {
// Failed to write the WAL.
for mut region_ctx in region_ctxs.into_values() {
region_ctx.set_error(e.clone());
}
self.metrics.write_wal_cost += start.elapsed();
return;
}
}
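Note that the WAL timing records `write_wal_cost` on both the success and the error branch, so the measurement is not lost when the function bails out early. A small sketch of that pattern with a made-up `write_wal` function:

use std::time::{Duration, Instant};

// Hypothetical stand-in for the real WAL write.
fn write_wal(fail: bool) -> Result<u64, String> {
    if fail { Err("wal unavailable".to_string()) } else { Ok(7) }
}

fn main() {
    let mut write_wal_cost = Duration::ZERO;

    let start = Instant::now();
    match write_wal(false) {
        Ok(last_entry_id) => {
            println!("wrote wal up to entry {}", last_entry_id);
            // Record the cost on the happy path.
            write_wal_cost += start.elapsed();
        }
        Err(e) => {
            // Record the cost even though we bail out early.
            write_wal_cost += start.elapsed();
            eprintln!("failed to write wal: {}", e);
            return;
        }
    }

    println!("write_wal_cost: {:?}", write_wal_cost);
}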
@@ -101,6 +110,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
let (mut put_rows, mut delete_rows) = (0, 0);
// Write to memtables.
{
let start = Instant::now();
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["write_memtable"])
.start_timer();
@@ -109,6 +119,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
put_rows += region_ctx.put_num;
delete_rows += region_ctx.delete_num;
}
self.metrics.write_memtable_cost += start.elapsed();
}
WRITE_ROWS_TOTAL
.with_label_values(&["put"])
@@ -120,11 +131,15 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles all stalled write requests.
pub(crate) async fn handle_stalled_requests(&mut self) {
if let Some(start) = self.stall_start.take() {
self.metrics.stall_cost += start.elapsed();
}
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.requests.len() as i64);
// We already stalled these requests, don't stall them again.
for (_, (_, requests)) in stalled.requests {
self.metrics.num_stalled_request_processed += requests.len();
self.handle_write_requests(requests, false).await;
}
}
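To close the loop on the stall timing, a minimal sketch of the drain side: `take()` the recorded stall start (if any) and add the elapsed duration to `stall_cost`, then count and replay everything that was parked. All names here are illustrative.

use std::collections::HashMap;
use std::time::{Duration, Instant};

fn main() {
    // Illustrative state: parked requests per region and the stall start time.
    let mut stalled_requests: HashMap<u64, Vec<String>> = HashMap::new();
    stalled_requests.insert(1, vec!["put".to_string(), "delete".to_string()]);
    let mut stall_start: Option<Instant> = Some(Instant::now());

    let mut stall_cost = Duration::ZERO;
    let mut num_stalled_request_processed = 0usize;

    // The stall ends when draining begins: take() clears the marker and
    // adds the elapsed stall duration.
    if let Some(start) = stall_start.take() {
        stall_cost += start.elapsed();
    }

    // Drain everything that was parked and count how much was replayed.
    let stalled = std::mem::take(&mut stalled_requests);
    for (_region_id, requests) in stalled {
        num_stalled_request_processed += requests.len();
        // ... replay `requests` through the normal write path ...
    }

    println!(
        "stall_cost: {:?}, processed: {}",
        stall_cost, num_stalled_request_processed
    );
}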
@@ -149,7 +164,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Handles a specific region's stalled requests.
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
if let Some(start) = self.stall_start.take() {
self.metrics.stall_cost += start.elapsed();
}
let requests = self.stalled_requests.remove(region_id);
self.metrics.num_stalled_request_processed += requests.len();
self.stalled_count.sub(requests.len() as i64);
self.handle_write_requests(requests, true).await;
}