mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 12:30:38 +00:00
@@ -165,6 +165,7 @@ impl CompactionTaskImpl {
|
||||
sender: expire_delete_sender,
|
||||
edit,
|
||||
result: Ok(()),
|
||||
started_at: std::time::Instant::now(),
|
||||
update_region_state: false,
|
||||
is_staging: false,
|
||||
}),
|
||||
|
||||
@@ -60,6 +60,24 @@ lazy_static! {
|
||||
exponential_buckets(0.01, 10.0, 7).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
/// Time spent by queued region edit requests before they are started by the region worker.
|
||||
pub static ref REGION_EDIT_QUEUE_WAIT_TIME: HistogramVec = register_histogram_vec!(
|
||||
"greptime_mito_region_edit_queue_wait_time",
|
||||
"mito region edit queue wait time",
|
||||
&[WORKER_LABEL],
|
||||
// 0.001 ~ 10000
|
||||
exponential_buckets(0.001, 10.0, 8).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
/// Time spent while a region is in the Editing state for region edit requests.
|
||||
pub static ref REGION_EDIT_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"greptime_mito_region_edit_elapsed",
|
||||
"mito region edit elapsed time",
|
||||
&[WORKER_LABEL],
|
||||
// 0.001 ~ 10000
|
||||
exponential_buckets(0.001, 10.0, 8).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// ------ Flush related metrics
|
||||
/// Counter of scheduled flush requests.
|
||||
|
||||
@@ -1115,6 +1115,8 @@ pub(crate) struct RegionEditResult {
|
||||
pub(crate) edit: RegionEdit,
|
||||
/// Result from the manifest manager.
|
||||
pub(crate) result: Result<()>,
|
||||
/// The instant when the region entered Editing state for this request.
|
||||
pub(crate) started_at: std::time::Instant,
|
||||
/// Whether region state need to be set to Writable after handling this request.
|
||||
pub(crate) update_region_state: bool,
|
||||
/// The region is in staging mode before handling this request.
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use common_telemetry::{info, warn};
|
||||
use parquet::file::metadata::PageIndexPolicy;
|
||||
@@ -34,7 +35,9 @@ use crate::manifest::action::{
|
||||
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
|
||||
};
|
||||
use crate::memtable::MemtableBuilderProvider;
|
||||
use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
|
||||
use crate::metrics::{
|
||||
REGION_EDIT_ELAPSED, REGION_EDIT_QUEUE_WAIT_TIME, WRITE_CACHE_INFLIGHT_DOWNLOAD,
|
||||
};
|
||||
use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
|
||||
use crate::region::options::RegionOptions;
|
||||
use crate::region::version::VersionControlRef;
|
||||
@@ -55,7 +58,12 @@ pub(crate) type RegionEditQueues = HashMap<RegionId, RegionEditQueue>;
|
||||
/// Everything is done in the region worker loop.
|
||||
pub(crate) struct RegionEditQueue {
|
||||
region_id: RegionId,
|
||||
requests: VecDeque<RegionEditRequest>,
|
||||
requests: VecDeque<QueuedRegionEditRequest>,
|
||||
}
|
||||
|
||||
struct QueuedRegionEditRequest {
|
||||
request: RegionEditRequest,
|
||||
queued_at: Instant,
|
||||
}
|
||||
|
||||
impl RegionEditQueue {
|
||||
@@ -78,10 +86,13 @@ impl RegionEditQueue {
|
||||
);
|
||||
return;
|
||||
};
|
||||
self.requests.push_back(request);
|
||||
self.requests.push_back(QueuedRegionEditRequest {
|
||||
request,
|
||||
queued_at: Instant::now(),
|
||||
});
|
||||
}
|
||||
|
||||
fn dequeue(&mut self) -> Option<RegionEditRequest> {
|
||||
fn dequeue(&mut self) -> Option<QueuedRegionEditRequest> {
|
||||
self.requests.pop_front()
|
||||
}
|
||||
}
|
||||
@@ -259,6 +270,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
let _ = sender.send(Err(e));
|
||||
return;
|
||||
}
|
||||
let started_at = Instant::now();
|
||||
|
||||
let request_sender = self.sender.clone();
|
||||
let cache_manager = self.cache_manager.clone();
|
||||
@@ -275,6 +287,7 @@ impl<S> RegionWorkerLoop<S> {
|
||||
sender,
|
||||
edit,
|
||||
result,
|
||||
started_at,
|
||||
// we always need to restore region state after region edit
|
||||
update_region_state: true,
|
||||
is_staging,
|
||||
@@ -335,13 +348,21 @@ impl<S> RegionWorkerLoop<S> {
|
||||
|
||||
need_compaction
|
||||
};
|
||||
if edit_result.update_region_state {
|
||||
REGION_EDIT_ELAPSED
|
||||
.with_label_values(&[&self.id.to_string()])
|
||||
.observe(edit_result.started_at.elapsed().as_secs_f64());
|
||||
}
|
||||
|
||||
let _ = edit_result.sender.send(edit_result.result);
|
||||
|
||||
if let Some(edit_queue) = self.region_edit_queues.get_mut(&edit_result.region_id)
|
||||
&& let Some(request) = edit_queue.dequeue()
|
||||
&& let Some(queued) = edit_queue.dequeue()
|
||||
{
|
||||
self.handle_region_edit(request);
|
||||
REGION_EDIT_QUEUE_WAIT_TIME
|
||||
.with_label_values(&[&self.id.to_string()])
|
||||
.observe(queued.queued_at.elapsed().as_secs_f64());
|
||||
self.handle_region_edit(queued.request);
|
||||
}
|
||||
|
||||
if need_compaction {
|
||||
|
||||
Reference in New Issue
Block a user