diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index bf0944a9f0..451de62d81 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -29,7 +29,7 @@ use std::collections::HashMap;
 use std::path::Path;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, Instant};

 use common_base::Plugins;
 use common_meta::key::SchemaMetadataManagerRef;
@@ -469,6 +469,8 @@ impl WorkerStarter {
             region_count: REGION_COUNT.with_label_values(&[&id_string]),
             region_edit_queues: RegionEditQueues::default(),
             schema_metadata_manager: self.schema_metadata_manager,
+            metrics: WorkerMetrics::default(),
+            stall_start: None,
         };
         let handle = common_runtime::spawn_global(async move {
             worker_thread.run().await;
@@ -622,6 +624,46 @@ impl StalledRequests {
     }
 }

+/// Local metrics of the worker.
+#[derive(Debug, Default)]
+struct WorkerMetrics {
+    /// Elapsed time of the select block.
+    select_cost: Duration,
+    /// Number of times woken up by flush.
+    num_flush_wake: usize,
+    /// Number of times woken up by periodical tasks.
+    num_periodical_wake: usize,
+    /// Number of requests to handle.
+    num_requests: usize,
+    /// Number of stalled requests processed.
+    num_stalled_request_processed: usize,
+    /// Number of stalled requests added.
+    num_stalled_request_added: usize,
+    /// Number of write stalls.
+    num_stall: usize,
+
+    /// Total time of handling stalled requests.
+    handle_stall_cost: Duration,
+    /// Total time of handling requests.
+    handle_request_cost: Duration,
+    /// Total time of handling write requests.
+    handle_write_request_cost: Duration,
+    /// Total time of handling periodical tasks.
+    handle_periodical_task_cost: Duration,
+    /// Total time of handling requests after waking up.
+    handle_cost: Duration,
+
+    // Costs of handling writes.
+    /// Total time of flushing the worker.
+    flush_worker_cost: Duration,
+    /// Total time of writing to the WAL.
+    write_wal_cost: Duration,
+    /// Total time of writing memtables.
+    write_memtable_cost: Duration,
+    /// Total time spent in write stall.
+    stall_cost: Duration,
+}
+
 /// Background worker loop to handle requests.
 struct RegionWorkerLoop {
     /// Id of the worker.
@@ -680,6 +722,10 @@ struct RegionWorkerLoop {
     region_edit_queues: RegionEditQueues,
     /// Database level metadata manager.
     schema_metadata_manager: SchemaMetadataManagerRef,
+    /// Metrics of the worker in one loop.
+    metrics: WorkerMetrics,
+    /// Last stall start time.
+    stall_start: Option<Instant>,
 }

 impl RegionWorkerLoop {
@@ -703,6 +749,7 @@
             let sleep = tokio::time::sleep(max_wait_time);
             tokio::pin!(sleep);

+            let select_start = Instant::now();
             tokio::select! {
                 request_opt = self.receiver.recv() => {
                     match request_opt {
@@ -712,6 +759,7 @@
                     }
                 }
                 recv_res = self.flush_receiver.changed() => {
+                    self.metrics.num_flush_wake += 1;
                     if recv_res.is_err() {
                         // The channel is disconnected.
                         break;
@@ -727,17 +775,23 @@
                     }
                 }
                 _ = &mut sleep => {
+                    self.metrics.num_periodical_wake += 1;
                     // Timeout. Checks periodical tasks.
                     self.handle_periodical_tasks();
                     continue;
                 }
             }

+            self.metrics.select_cost = select_start.elapsed();
+            let handle_start = Instant::now();
+
             if self.flush_receiver.has_changed().unwrap_or(false) {
+                let start = Instant::now();
                 // Always checks whether we could process stalled requests to avoid a request
                 // hangs too long.
                 // If the channel is closed, do nothing.
                 self.handle_stalled_requests().await;
+                self.metrics.handle_stall_cost = start.elapsed();
             }

             // Try to recv more requests from the channel.
@@ -749,12 +803,27 @@
                     Err(_) => break,
                 }
             }
+            self.metrics.num_requests = buffer.len();
             self.listener.on_recv_requests(buffer.len());

+            let start = Instant::now();
             self.handle_requests(&mut buffer).await;
+            self.metrics.handle_request_cost = start.elapsed();

+            let start = Instant::now();
             self.handle_periodical_tasks();
+            self.metrics.handle_periodical_task_cost = start.elapsed();
+
+            self.metrics.handle_cost = handle_start.elapsed();
+            if self.metrics.handle_cost > Duration::from_secs(3) {
+                info!(
+                    "Region worker too slow, id: {}, metrics: {:?}",
+                    self.id, self.metrics
+                );
+            }
+            // Clear the metrics.
+            self.metrics = WorkerMetrics::default();
         }

         self.clean().await;
@@ -802,7 +871,9 @@

         // Handles all write requests first. So we can alter regions without
         // considering existing write requests.
+        let start = Instant::now();
         self.handle_write_requests(write_requests, true).await;
+        self.metrics.handle_write_request_cost = start.elapsed();

         self.handle_ddl_requests(ddl_requests).await;
     }
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 7dccb6952a..03455201c5 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -16,6 +16,7 @@

 use std::collections::{hash_map, HashMap};
 use std::sync::Arc;
+use std::time::Instant;

 use api::v1::OpType;
 use common_telemetry::debug;
@@ -43,7 +44,9 @@ impl RegionWorkerLoop {
         }

         // Flush this worker if the engine needs to flush.
+        let start = Instant::now();
         self.maybe_flush_worker();
+        self.metrics.flush_worker_cost += start.elapsed();

         if self.should_reject_write() {
             // The memory pressure is still too high, reject write requests.
@@ -55,6 +58,9 @@
         if self.write_buffer_manager.should_stall() && allow_stall {
             self.stalled_count.add(write_requests.len() as i64);
+            self.metrics.num_stalled_request_added += write_requests.len();
+            self.metrics.num_stall += 1;
+            self.stall_start = Some(Instant::now());
             self.stalled_requests.append(&mut write_requests);
             self.listener.on_write_stall();
             return;
         }
@@ -70,6 +76,7 @@

         // Write to WAL.
         {
+            let start = Instant::now();
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["write_wal"])
                 .start_timer();
@@ -87,12 +94,14 @@
                         let last_entry_id = response.last_entry_ids.get(region_id).unwrap();
                         region_ctx.set_next_entry_id(last_entry_id + 1);
                     }
+                    self.metrics.write_wal_cost += start.elapsed();
                 }
                 Err(e) => {
                     // Failed to write wal.
                     for mut region_ctx in region_ctxs.into_values() {
                         region_ctx.set_error(e.clone());
                     }
+                    self.metrics.write_wal_cost += start.elapsed();
                     return;
                 }
             }
@@ -101,6 +110,7 @@
         let (mut put_rows, mut delete_rows) = (0, 0);
         // Write to memtables.
         {
+            let start = Instant::now();
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["write_memtable"])
                 .start_timer();
@@ -109,6 +119,7 @@
                 put_rows += region_ctx.put_num;
                 delete_rows += region_ctx.delete_num;
             }
+            self.metrics.write_memtable_cost += start.elapsed();
         }
         WRITE_ROWS_TOTAL
             .with_label_values(&["put"])
@@ -120,11 +131,15 @@

     /// Handles all stalled write requests.
     pub(crate) async fn handle_stalled_requests(&mut self) {
+        if let Some(start) = self.stall_start.take() {
+            self.metrics.stall_cost += start.elapsed();
+        }
         // Handle stalled requests.
         let stalled = std::mem::take(&mut self.stalled_requests);
         self.stalled_count.sub(stalled.requests.len() as i64);
         // We already stalled these requests, don't stall them again.
         for (_, (_, requests)) in stalled.requests {
+            self.metrics.num_stalled_request_processed += requests.len();
             self.handle_write_requests(requests, false).await;
         }
     }
@@ -149,7 +164,11 @@
     /// Handles a specific region's stalled requests.
     pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
         debug!("Handles stalled requests for region {}", region_id);
+        if let Some(start) = self.stall_start.take() {
+            self.metrics.stall_cost += start.elapsed();
+        }
         let requests = self.stalled_requests.remove(region_id);
+        self.metrics.num_stalled_request_processed += requests.len();
         self.stalled_count.sub(requests.len() as i64);
         self.handle_write_requests(requests, true).await;
     }
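
The worker.rs half of this diff applies one general technique: accumulate per-iteration timings into plain fields on a struct owned by the single-threaded worker loop, log the whole struct (via its `Debug` derive) only when an iteration crosses a latency threshold, and reset it before the next iteration. Below is a minimal, self-contained sketch of that technique; `LoopMetrics`, `run_iteration`, and the placeholder request handling are hypothetical names for illustration, not the mito2 implementation.

use std::time::{Duration, Instant};

/// Hypothetical per-iteration metrics, playing the role of `WorkerMetrics`.
#[derive(Debug, Default)]
struct LoopMetrics {
    select_cost: Duration,
    handle_request_cost: Duration,
    handle_cost: Duration,
    num_requests: usize,
}

/// One iteration of a hypothetical worker loop: time each stage into plain
/// fields, log only when the iteration was abnormally slow, then reset.
fn run_iteration(metrics: &mut LoopMetrics, requests: &[&str]) {
    let select_start = Instant::now();
    // ... block here waiting for work, e.g. on a channel recv ...
    metrics.select_cost = select_start.elapsed();

    let handle_start = Instant::now();
    let start = Instant::now();
    for _request in requests {
        // ... handle one request ...
    }
    metrics.num_requests = requests.len();
    metrics.handle_request_cost = start.elapsed();
    metrics.handle_cost = handle_start.elapsed();

    // The formatting cost of the log line is only paid on slow iterations.
    if metrics.handle_cost > Duration::from_secs(3) {
        println!("worker too slow, metrics: {:?}", metrics);
    }
    // Start the next iteration from zero.
    *metrics = LoopMetrics::default();
}

fn main() {
    let mut metrics = LoopMetrics::default();
    run_iteration(&mut metrics, &["put", "delete"]);
}

Because the struct lives on one worker and is never shared, updating it is a plain field write with no atomics or label lookups, which is why the diff can keep the existing global histograms (e.g. `WRITE_STAGE_ELAPSED`) for dashboards while using the local fields purely for slow-iteration diagnostics.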
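
The stall accounting in handle_write.rs uses a second small technique: an `Option<Instant>` marks when the worker began stalling writes, and whichever path later drains the stalled requests calls `take()` on it, so the elapsed stall time is folded into `stall_cost` exactly once even though two resume paths (`handle_stalled_requests` and `handle_region_stalled_requests`) exist. A sketch, again with hypothetical names (`on_stall`, `on_resume`):

use std::time::{Duration, Instant};

#[derive(Debug, Default)]
struct StallMetrics {
    /// Accumulated time spent stalled.
    stall_cost: Duration,
}

/// Record when a stall begins, mirroring `self.stall_start = Some(Instant::now())`.
fn on_stall(stall_start: &mut Option<Instant>) {
    *stall_start = Some(Instant::now());
}

/// On resume, `take()` the start time so the same stall is never counted twice,
/// then fold the elapsed time into the accumulated cost.
fn on_resume(stall_start: &mut Option<Instant>, metrics: &mut StallMetrics) {
    if let Some(start) = stall_start.take() {
        metrics.stall_cost += start.elapsed();
    }
}

fn main() {
    let mut stall_start = None;
    let mut metrics = StallMetrics::default();

    on_stall(&mut stall_start);
    on_resume(&mut stall_start, &mut metrics);
    // A second resume without a new stall adds nothing: `take()` left `None`.
    on_resume(&mut stall_start, &mut metrics);

    println!("{:?}", metrics);
}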