diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index a8182353aa..4bc98fff8a 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -165,6 +165,7 @@ impl CompactionTaskImpl { sender: expire_delete_sender, edit, result: Ok(()), + started_at: std::time::Instant::now(), update_region_state: false, is_staging: false, }), diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 30a7ac765c..470e9b7357 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -60,6 +60,24 @@ lazy_static! { exponential_buckets(0.01, 10.0, 7).unwrap(), ) .unwrap(); + /// Time spent by queued region edit requests before they are started by the region worker. + pub static ref REGION_EDIT_QUEUE_WAIT_TIME: HistogramVec = register_histogram_vec!( + "greptime_mito_region_edit_queue_wait_time", + "mito region edit queue wait time", + &[WORKER_LABEL], + // 0.001 ~ 10000 + exponential_buckets(0.001, 10.0, 8).unwrap(), + ) + .unwrap(); + /// Time spent while a region is in the Editing state for region edit requests. + pub static ref REGION_EDIT_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_mito_region_edit_elapsed", + "mito region edit elapsed time", + &[WORKER_LABEL], + // 0.001 ~ 10000 + exponential_buckets(0.001, 10.0, 8).unwrap(), + ) + .unwrap(); // ------ Flush related metrics /// Counter of scheduled flush requests. diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index cc965f48df..78348a21f8 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -1115,6 +1115,8 @@ pub(crate) struct RegionEditResult { pub(crate) edit: RegionEdit, /// Result from the manifest manager. pub(crate) result: Result<()>, + /// The instant when the region entered Editing state for this request. + pub(crate) started_at: std::time::Instant, /// Whether region state need to be set to Writable after handling this request. 
pub(crate) update_region_state: bool, /// The region is in staging mode before handling this request. diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 7eb9f958b9..828b0897cb 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -19,6 +19,7 @@ use std::collections::{HashMap, VecDeque}; use std::num::NonZeroU64; use std::sync::Arc; +use std::time::Instant; use common_telemetry::{info, warn}; use parquet::file::metadata::PageIndexPolicy; @@ -34,7 +35,9 @@ use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; use crate::memtable::MemtableBuilderProvider; -use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD; +use crate::metrics::{ + REGION_EDIT_ELAPSED, REGION_EDIT_QUEUE_WAIT_TIME, WRITE_CACHE_INFLIGHT_DOWNLOAD, +}; use crate::region::opener::{sanitize_region_options, version_builder_from_manifest}; use crate::region::options::RegionOptions; use crate::region::version::VersionControlRef; @@ -55,7 +58,12 @@ pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>; /// Everything is done in the region worker loop. 
pub(crate) struct RegionEditQueue { region_id: RegionId, - requests: VecDeque<RegionEditRequest>, + requests: VecDeque<QueuedRegionEditRequest>, +} + +struct QueuedRegionEditRequest { + request: RegionEditRequest, + queued_at: Instant, } impl RegionEditQueue { @@ -78,10 +86,13 @@ impl RegionEditQueue { ); return; }; - self.requests.push_back(request); + self.requests.push_back(QueuedRegionEditRequest { + request, + queued_at: Instant::now(), + }); } - fn dequeue(&mut self) -> Option<RegionEditRequest> { + fn dequeue(&mut self) -> Option<QueuedRegionEditRequest> { self.requests.pop_front() } } @@ -259,6 +270,7 @@ impl RegionWorkerLoop { let _ = sender.send(Err(e)); return; } + let started_at = Instant::now(); let request_sender = self.sender.clone(); let cache_manager = self.cache_manager.clone(); @@ -275,6 +287,7 @@ impl RegionWorkerLoop { sender, edit, result, + started_at, // we always need to restore region state after region edit update_region_state: true, is_staging, @@ -335,13 +348,21 @@ impl RegionWorkerLoop { need_compaction }; + if edit_result.update_region_state { + REGION_EDIT_ELAPSED + .with_label_values(&[&self.id.to_string()]) + .observe(edit_result.started_at.elapsed().as_secs_f64()); + } let _ = edit_result.sender.send(edit_result.result); if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id) - && let Some(request) = edit_queue.dequeue() + && let Some(queued) = edit_queue.dequeue() { - self.handle_region_edit(request); + REGION_EDIT_QUEUE_WAIT_TIME + .with_label_values(&[&self.id.to_string()]) + .observe(queued.queued_at.elapsed().as_secs_f64()); + self.handle_region_edit(queued.request); } if need_compaction {