feat: add metrics for request wait time and adjust stall metrics (#6540)

* feat: add metric greptime_mito_request_wait_time to observe wait time

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add worker to wait time metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename stall gauge to greptime_mito_write_stalling_count

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: change greptime_mito_write_stall_total to count total stalled requests

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: merge lazy static blocks

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-07-18 01:17:51 +08:00
Committed by: GitHub
Parent: 90ababca97
Commit: c23b26461c
10 changed files with 132 additions and 55 deletions
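
For orientation before the per-file diffs: the commit threads a creation timestamp through every worker channel so the worker can measure how long each request waited. The sketch below shows that pattern in isolation, under hypothetical names (`TimedRequest` stands in for the real `WorkerRequestWithTime`); it is illustrative, not the actual mito code.

```rust
use std::time::Instant;

/// Hypothetical stand-in for `WorkerRequestWithTime`: a request is wrapped
/// with its creation time when sent, and the worker observes the elapsed
/// wait when it picks the request up.
struct TimedRequest<T> {
    request: T,
    created_at: Instant,
}

impl<T> TimedRequest<T> {
    fn new(request: T) -> Self {
        Self {
            request,
            created_at: Instant::now(),
        }
    }
}

fn main() {
    let timed = TimedRequest::new("write");
    // ... the request sits in the worker channel here ...
    let wait_secs = timed.created_at.elapsed().as_secs_f64();
    // In the real code this value feeds the per-worker histogram
    // `greptime_mito_request_wait_time`.
    println!("{:?} waited {wait_secs}s", timed.request);
}
```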

View File

@@ -62,7 +62,7 @@ use crate::read::BoxedBatchReader;
use crate::region::options::MergeMode;
use crate::region::version::VersionControlRef;
use crate::region::ManifestContextRef;
use crate::request::{OptionOutputTx, OutputTx, WorkerRequest};
use crate::request::{OptionOutputTx, OutputTx, WorkerRequestWithTime};
use crate::schedule::remote_job_scheduler::{
CompactionJob, DefaultNotifier, RemoteJob, RemoteJobSchedulerRef,
};
@@ -77,7 +77,7 @@ pub struct CompactionRequest {
pub(crate) current_version: CompactionVersion,
pub(crate) access_layer: AccessLayerRef,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
/// Waiters of the compaction request.
pub(crate) waiters: Vec<OutputTx>,
/// Start time of compaction task.
@@ -101,7 +101,7 @@ pub(crate) struct CompactionScheduler {
/// Compacting regions.
region_status: HashMap<RegionId, CompactionStatus>,
/// Request sender of the worker that this scheduler belongs to.
request_sender: Sender<WorkerRequest>,
request_sender: Sender<WorkerRequestWithTime>,
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
@@ -112,7 +112,7 @@ pub(crate) struct CompactionScheduler {
impl CompactionScheduler {
pub(crate) fn new(
scheduler: SchedulerRef,
request_sender: Sender<WorkerRequest>,
request_sender: Sender<WorkerRequestWithTime>,
cache_manager: CacheManagerRef,
engine_config: Arc<MitoConfig>,
listener: WorkerListener,
@@ -560,7 +560,7 @@ impl CompactionStatus {
#[allow(clippy::too_many_arguments)]
fn new_compaction_request(
&mut self,
request_sender: Sender<WorkerRequest>,
request_sender: Sender<WorkerRequestWithTime>,
mut waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
cache_manager: CacheManagerRef,

View File

@@ -27,6 +27,7 @@ use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
WorkerRequestWithTime,
};
use crate::worker::WorkerListener;
use crate::{error, metrics};
@@ -37,7 +38,7 @@ pub const MAX_PARALLEL_COMPACTION: usize = 1;
pub(crate) struct CompactionTaskImpl {
pub compaction_region: CompactionRegion,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
/// Senders that are used to notify waiters waiting for pending compaction tasks.
pub waiters: Vec<OutputTx>,
/// Start time of compaction task
@@ -135,7 +136,11 @@ impl CompactionTaskImpl {
/// Notifies region worker to handle post-compaction tasks.
async fn send_to_worker(&self, request: WorkerRequest) {
if let Err(e) = self.request_sender.send(request).await {
if let Err(e) = self
.request_sender
.send(WorkerRequestWithTime::new(request))
.await
{
error!(
"Failed to notify compaction job status for region {}, request: {:?}",
self.compaction_region.region_id, e.0

View File

@@ -42,7 +42,7 @@ use crate::region::version::{VersionControlData, VersionControlRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
SenderDdlRequest, SenderWriteRequest, WorkerRequest,
SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::FileMeta;
@@ -223,7 +223,7 @@ pub(crate) struct RegionFlushTask {
/// Flush result senders.
pub(crate) senders: Vec<OutputTx>,
/// Request sender to notify the worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
pub(crate) access_layer: AccessLayerRef,
pub(crate) listener: WorkerListener,
@@ -441,7 +441,11 @@ impl RegionFlushTask {
/// Notify flush job status.
async fn send_worker_request(&self, request: WorkerRequest) {
if let Err(e) = self.request_sender.send(request).await {
if let Err(e) = self
.request_sender
.send(WorkerRequestWithTime::new(request))
.await
{
error!(
"Failed to notify flush job status for region {}, request: {:?}",
self.region_id, e.0

View File

@@ -94,12 +94,7 @@ lazy_static! {
// ------ Write related metrics
/// Number of stalled write requests in each worker.
pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stall_total",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
//
/// Counter of rejected write requests.
pub static ref WRITE_REJECT_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
@@ -402,6 +397,7 @@ lazy_static! {
}
// Use another block to avoid reaching the recursion limit.
lazy_static! {
/// Counter for compaction input file size.
pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
@@ -426,6 +422,27 @@ lazy_static! {
"greptime_mito_memtable_field_builder_count",
"active field builder count in TimeSeriesMemtable",
).unwrap();
/// Number of stalling write requests in each worker.
pub static ref WRITE_STALLING: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stalling_count",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
/// Total number of stalled write requests.
pub static ref WRITE_STALL_TOTAL: IntCounter = register_int_counter!(
"greptime_mito_write_stall_total",
"Total number of stalled write requests"
).unwrap();
/// Time waiting for requests to be handled by the region worker.
pub static ref REQUEST_WAIT_TIME: HistogramVec = register_histogram_vec!(
"greptime_mito_request_wait_time",
"mito request wait time before being handled by region worker",
&[WORKER_LABEL],
// 0.001 ~ 10000
exponential_buckets(0.001, 10.0, 8).unwrap(),
)
.unwrap();
}
/// Stager notifier to collect metrics.
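
The `0.001 ~ 10000` comment above summarizes what `exponential_buckets(0.001, 10.0, 8)` produces: the prometheus crate generates `count` bucket upper bounds, each `factor` times the previous, starting at `start`. A small standalone check of the layout (illustrative only):

```rust
// Check of the bucket layout behind the `0.001 ~ 10000` comment:
// eight bounds, each 10x the previous, in seconds.
fn main() {
    let (start, factor, count) = (0.001_f64, 10.0_f64, 8);
    let mut bound = start;
    for i in 0..count {
        println!("bucket {i}: <= ~{bound} s");
        bound *= factor;
    }
    // Prints approximately: 0.001, 0.01, 0.1, 1, 10, 100, 1000, 10000.
}
```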

View File

@@ -545,6 +545,22 @@ pub(crate) struct SenderBulkRequest {
pub(crate) region_metadata: RegionMetadataRef,
}
/// Request sent to a worker with timestamp
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
pub(crate) request: WorkerRequest,
pub(crate) created_at: Instant,
}
impl WorkerRequestWithTime {
pub(crate) fn new(request: WorkerRequest) -> Self {
Self {
request,
created_at: Instant::now(),
}
}
}
/// Request sent to a worker
#[derive(Debug)]
pub(crate) enum WorkerRequest {
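
A brief aside on the choice of `Instant` for `created_at` (an observation, not stated in the commit): `Instant` is monotonic, so the measured wait can never go negative even if the wall clock is adjusted while the request sits in the channel, which a `SystemTime` difference could not guarantee. A minimal illustration:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Illustrative only: `elapsed()` on a monotonic `Instant` is guaranteed
// non-negative and at least the real queueing delay, regardless of any
// wall-clock adjustments in between.
fn main() {
    let created_at = Instant::now();
    thread::sleep(Duration::from_millis(5));
    let wait = created_at.elapsed();
    assert!(wait >= Duration::from_millis(5));
    println!("waited {wait:?}");
}
```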

View File

@@ -31,6 +31,7 @@ use crate::manifest::action::RegionEdit;
use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
WorkerRequestWithTime,
};
pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
@@ -132,7 +133,7 @@ pub struct CompactionJobResult {
/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
pub(crate) struct DefaultNotifier {
/// The sender to send WorkerRequest to the mito engine. This is used to notify the mito engine when a remote job is completed.
pub(crate) request_sender: Sender<WorkerRequest>,
pub(crate) request_sender: Sender<WorkerRequestWithTime>,
}
impl DefaultNotifier {
@@ -175,10 +176,10 @@ impl Notifier for DefaultNotifier {
if let Err(e) = self
.request_sender
.send(WorkerRequest::Background {
.send(WorkerRequestWithTime::new(WorkerRequest::Background {
region_id: result.region_id,
notify,
})
}))
.await
{
error!(

View File

@@ -32,7 +32,7 @@ use crate::error::Result;
use crate::flush::FlushScheduler;
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequest;
use crate::request::{WorkerRequest, WorkerRequestWithTime};
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -85,7 +85,7 @@ impl SchedulerEnv {
/// Creates a new compaction scheduler.
pub(crate) fn mock_compaction_scheduler(
&self,
request_sender: Sender<WorkerRequest>,
request_sender: Sender<WorkerRequestWithTime>,
) -> CompactionScheduler {
let scheduler = self.get_scheduler();

View File

@@ -39,7 +39,7 @@ use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use futures::future::try_join_all;
use object_store::manager::ObjectStoreManagerRef;
use prometheus::IntGauge;
use prometheus::{Histogram, IntGauge};
use rand::{rng, Rng};
use snafu::{ensure, ResultExt};
use store_api::logstore::LogStore;
@@ -58,11 +58,11 @@ use crate::error;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::metrics::{REGION_COUNT, REQUEST_WAIT_TIME, WRITE_STALLING};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderBulkRequest, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file::FileId;
@@ -469,8 +469,9 @@ impl<S: LogStore> WorkerStarter<S> {
last_periodical_check_millis: now,
flush_sender: self.flush_sender,
flush_receiver: self.flush_receiver,
stalled_count: WRITE_STALL_TOTAL.with_label_values(&[&id_string]),
stalling_count: WRITE_STALLING.with_label_values(&[&id_string]),
region_count: REGION_COUNT.with_label_values(&[&id_string]),
request_wait_time: REQUEST_WAIT_TIME.with_label_values(&[&id_string]),
region_edit_queues: RegionEditQueues::default(),
schema_metadata_manager: self.schema_metadata_manager,
};
@@ -498,7 +499,7 @@ pub(crate) struct RegionWorker {
/// The opening regions.
opening_regions: OpeningRegionsRef,
/// Request sender.
sender: Sender<WorkerRequest>,
sender: Sender<WorkerRequestWithTime>,
/// Handle to the worker thread.
handle: Mutex<Option<JoinHandle<()>>>,
/// Whether to run the worker thread.
@@ -509,7 +510,8 @@ impl RegionWorker {
/// Submits request to background worker thread.
async fn submit_request(&self, request: WorkerRequest) -> Result<()> {
ensure!(self.is_running(), WorkerStoppedSnafu { id: self.id });
if self.sender.send(request).await.is_err() {
let request_with_time = WorkerRequestWithTime::new(request);
if self.sender.send(request_with_time).await.is_err() {
warn!(
"Worker {} is already exited but the running flag is still true",
self.id
@@ -531,7 +533,12 @@ impl RegionWorker {
info!("Stop region worker {}", self.id);
self.set_running(false);
if self.sender.send(WorkerRequest::Stop).await.is_err() {
if self
.sender
.send(WorkerRequestWithTime::new(WorkerRequest::Stop))
.await
.is_err()
{
warn!("Worker {} is already exited before stop", self.id);
}
@@ -669,9 +676,9 @@ struct RegionWorkerLoop<S> {
/// Regions that are opening.
opening_regions: OpeningRegionsRef,
/// Request sender.
sender: Sender<WorkerRequest>,
sender: Sender<WorkerRequestWithTime>,
/// Request receiver.
receiver: Receiver<WorkerRequest>,
receiver: Receiver<WorkerRequestWithTime>,
/// WAL of the engine.
wal: Wal<S>,
/// Manages object stores for manifest and SSTs.
@@ -706,10 +713,12 @@ struct RegionWorkerLoop<S> {
flush_sender: watch::Sender<()>,
/// Watch channel receiver to wait for background flush job.
flush_receiver: watch::Receiver<()>,
/// Gauge of stalled request count.
stalled_count: IntGauge,
/// Gauge of stalling request count.
stalling_count: IntGauge,
/// Gauge of regions in the worker.
region_count: IntGauge,
/// Histogram of request wait time for this worker.
request_wait_time: Histogram,
/// Queues for region edit requests.
region_edit_queues: RegionEditQueues,
/// Database level metadata manager.
@@ -749,10 +758,16 @@ impl<S: LogStore> RegionWorkerLoop<S> {
tokio::select! {
request_opt = self.receiver.recv() => {
match request_opt {
Some(request) => match request {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
_ => general_req_buffer.push(request),
Some(request_with_time) => {
// Observe the wait time
let wait_time = request_with_time.created_at.elapsed();
self.request_wait_time.observe(wait_time.as_secs_f64());
match request_with_time.request {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
req => general_req_buffer.push(req),
}
},
// The channel is disconnected.
None => break,
@@ -791,11 +806,17 @@ impl<S: LogStore> RegionWorkerLoop<S> {
for _ in 1..self.config.worker_request_batch_size {
// We have received one request so we start from 1.
match self.receiver.try_recv() {
Ok(req) => match req {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
_ => general_req_buffer.push(req),
},
Ok(request_with_time) => {
// Observe the wait time
let wait_time = request_with_time.created_at.elapsed();
self.request_wait_time.observe(wait_time.as_secs_f64());
match request_with_time.request {
WorkerRequest::Write(sender_req) => write_req_buffer.push(sender_req),
WorkerRequest::Ddl(sender_req) => ddl_req_buffer.push(sender_req),
req => general_req_buffer.push(req),
}
}
// We still need to handle remaining requests.
Err(_) => break,
}
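
The two hunks above share one batching pattern: block on `recv()` for the first request, then drain up to `worker_request_batch_size - 1` more with `try_recv()` without blocking, observing each request's queue wait along the way. A condensed sketch with hypothetical names (`observe` stands in for the per-worker histogram):

```rust
use std::time::Instant;
use tokio::sync::mpsc::Receiver;

struct Timed<T> {
    request: T,
    created_at: Instant,
}

// Await one request, then opportunistically pull more without blocking.
async fn recv_batch<T>(
    rx: &mut Receiver<Timed<T>>,
    batch: usize,
    observe: impl Fn(f64),
) -> Vec<T> {
    let mut out = Vec::with_capacity(batch);
    let Some(first) = rx.recv().await else {
        return out; // channel disconnected
    };
    observe(first.created_at.elapsed().as_secs_f64());
    out.push(first.request);
    // We already received one request, so start from 1.
    for _ in 1..batch {
        match rx.try_recv() {
            Ok(timed) => {
                observe(timed.created_at.elapsed().as_secs_f64());
                out.push(timed.request);
            }
            // No more buffered requests; handle what we have.
            Err(_) => break,
        }
    }
    out
}
```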

View File

@@ -34,7 +34,7 @@ use crate::region::version::VersionBuilder;
use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::request::{
BackgroundNotify, OptionOutputTx, RegionChangeResult, RegionEditRequest, RegionEditResult,
RegionSyncRequest, TruncateResult, WorkerRequest,
RegionSyncRequest, TruncateResult, WorkerRequest, WorkerRequestWithTime,
};
use crate::sst::location;
use crate::worker::{RegionWorkerLoop, WorkerListener};
@@ -230,7 +230,10 @@ impl<S> RegionWorkerLoop<S> {
}),
};
// We don't set state back as the worker loop is already exited.
if let Err(res) = request_sender.send(notify).await {
if let Err(res) = request_sender
.send(WorkerRequestWithTime::new(notify))
.await
{
warn!(
"Failed to send region edit result back to the worker, region_id: {}, res: {:?}",
region_id, res
@@ -318,10 +321,10 @@ impl<S> RegionWorkerLoop<S> {
truncated_sequence: truncate.truncated_sequence,
};
let _ = request_sender
.send(WorkerRequest::Background {
.send(WorkerRequestWithTime::new(WorkerRequest::Background {
region_id: truncate.region_id,
notify: BackgroundNotify::Truncate(truncate_result),
})
}))
.await
.inspect_err(|_| warn!("failed to send truncate result"));
});
@@ -364,7 +367,10 @@ impl<S> RegionWorkerLoop<S> {
.on_notify_region_change_result_begin(region.region_id)
.await;
if let Err(res) = request_sender.send(notify).await {
if let Err(res) = request_sender
.send(WorkerRequestWithTime::new(notify))
.await
{
warn!(
"Failed to send region change result back to the worker, region_id: {}, res: {:?}",
region.region_id, res

View File

@@ -27,7 +27,9 @@ use store_api::storage::RegionId;
use crate::error::{InvalidRequestSnafu, RegionStateSnafu, RejectWriteSnafu, Result};
use crate::metrics;
use crate::metrics::{WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED};
use crate::metrics::{
WRITE_REJECT_TOTAL, WRITE_ROWS_TOTAL, WRITE_STAGE_ELAPSED, WRITE_STALL_TOTAL,
};
use crate::region::{RegionLeaderState, RegionRoleState};
use crate::region_write_ctx::RegionWriteCtx;
use crate::request::{SenderBulkRequest, SenderWriteRequest, WriteRequest};
@@ -57,8 +59,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
if self.write_buffer_manager.should_stall() && allow_stall {
self.stalled_count
.add((write_requests.len() + bulk_requests.len()) as i64);
let stalled_count = (write_requests.len() + bulk_requests.len()) as i64;
self.stalling_count.add(stalled_count);
WRITE_STALL_TOTAL.inc_by(stalled_count as u64);
self.stalled_requests.append(write_requests, bulk_requests);
self.listener.on_write_stall();
return;
@@ -161,7 +164,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_stalled_requests(&mut self) {
// Handle stalled requests.
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.stalled_count() as i64);
self.stalling_count.sub(stalled.stalled_count() as i64);
// We already stalled these requests, don't stall them again.
for (_, (_, mut requests, mut bulk)) in stalled.requests {
self.handle_write_requests(&mut requests, &mut bulk, false)
@@ -172,7 +175,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
/// Rejects all stalled requests.
pub(crate) fn reject_stalled_requests(&mut self) {
let stalled = std::mem::take(&mut self.stalled_requests);
self.stalled_count.sub(stalled.stalled_count() as i64);
self.stalling_count.sub(stalled.stalled_count() as i64);
for (_, (_, mut requests, mut bulk)) in stalled.requests {
reject_write_requests(&mut requests, &mut bulk);
}
@@ -182,7 +185,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) fn reject_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Rejects stalled requests for region {}", region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
self.stalling_count
.sub((requests.len() + bulk.len()) as i64);
reject_write_requests(&mut requests, &mut bulk);
}
@@ -190,7 +194,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_region_stalled_requests(&mut self, region_id: &RegionId) {
debug!("Handles stalled requests for region {}", region_id);
let (mut requests, mut bulk) = self.stalled_requests.remove(region_id);
self.stalled_count.sub((requests.len() + bulk.len()) as i64);
self.stalling_count
.sub((requests.len() + bulk.len()) as i64);
self.handle_write_requests(&mut requests, &mut bulk, true)
.await;
}
@@ -251,7 +256,8 @@ impl<S> RegionWorkerLoop<S> {
"Region {} is altering, add request to pending writes",
region.region_id
);
self.stalled_count.add(1);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
self.stalled_requests.push(sender_req);
continue;
}
@@ -353,7 +359,8 @@ impl<S> RegionWorkerLoop<S> {
"Region {} is altering, add request to pending writes",
region.region_id
);
self.stalled_count.add(1);
self.stalling_count.add(1);
WRITE_STALL_TOTAL.inc();
self.stalled_requests.push_bulk(bulk_req);
continue;
}
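
To summarize the stall-metric split this last file establishes: the renamed gauge tracks requests currently stalled (incremented on stall, decremented on replay or rejection), while the reused name `greptime_mito_write_stall_total` is now a counter that only grows, recording every stall event. A small standalone sketch with hypothetical metric names, not the real registrations:

```rust
use prometheus::{register_int_counter, register_int_gauge, IntCounter, IntGauge};

fn main() {
    // Hypothetical stand-ins for WRITE_STALLING (gauge) and
    // WRITE_STALL_TOTAL (counter).
    let stalling: IntGauge =
        register_int_gauge!("write_stalling_count", "requests currently stalled").unwrap();
    let stall_total: IntCounter =
        register_int_counter!("write_stall_total", "total stalled requests").unwrap();

    // Three requests stall: both metrics move.
    stalling.add(3);
    stall_total.inc_by(3);

    // The stall clears and the requests are replayed: only the gauge drops.
    stalling.sub(3);
    assert_eq!(stalling.get(), 0);
    assert_eq!(stall_total.get(), 3);
}
```

The rename also brings the names in line with Prometheus conventions: the `_total` suffix signals a monotonic counter, so it no longer fits a gauge that can go down.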