chore/bench-metrics: Add Metrics for Compaction and Flush Operations

• Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.
This commit is contained in:
Lei, HUANG
2024-12-12 21:37:41 +08:00
parent 36c82121fb
commit f3e0a31e5d
5 changed files with 14 additions and 16 deletions

View File

@@ -53,7 +53,7 @@ use crate::error::{
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
TimeoutSnafu,
};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
@@ -271,11 +271,11 @@ impl CompactionScheduler {
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
TimeToLive::default()
});
.await
.unwrap_or_else(|e| {
warn!(e; "Failed to get ttl for region: {}", region_id);
TimeToLive::default()
});
debug!(
"Pick compaction strategy {:?} for region: {}, ttl: {:?}",
@@ -340,6 +340,7 @@ impl CompactionScheduler {
"Scheduled remote compaction job {} for region {}",
job_id, region_id
);
INFLIGHT_COMPACTION_COUNT.inc();
return Ok(());
}
Err(e) => {
@@ -350,7 +351,7 @@ impl CompactionScheduler {
job_id: None,
reason: e.reason,
}
.fail();
.fail();
}
error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id);
@@ -384,7 +385,9 @@ impl CompactionScheduler {
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
INFLIGHT_COMPACTION_COUNT.inc();
local_compaction_task.run().await;
INFLIGHT_COMPACTION_COUNT.dec();
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);

View File

@@ -261,7 +261,9 @@ impl RegionFlushTask {
let version_data = version_control.current();
Box::pin(async move {
INFLIGHT_FLUSH_COUNT.inc();
self.do_flush(version_data).await;
INFLIGHT_FLUSH_COUNT.dec();
})
}
@@ -531,7 +533,6 @@ impl FlushScheduler {
return Err(e);
}
INFLIGHT_FLUSH_COUNT.inc();
flush_status.flushing = true;
Ok(())

View File

@@ -53,7 +53,7 @@ use crate::config::MitoConfig;
use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu};
use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::{INFLIGHT_COMPACTION_COUNT, REGION_COUNT, WRITE_STALL_TOTAL};
use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
@@ -1090,7 +1090,6 @@ impl WorkerListener {
}
pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) {
INFLIGHT_COMPACTION_COUNT.inc();
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_compaction_scheduled(_region_id);

View File

@@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest;
use store_api::storage::RegionId;
use crate::error::RegionNotFoundSnafu;
use crate::metrics::{COMPACTION_REQUEST_COUNT, INFLIGHT_COMPACTION_COUNT};
use crate::metrics::{COMPACTION_REQUEST_COUNT};
use crate::region::MitoRegionRef;
use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
@@ -87,14 +87,12 @@ impl<S> RegionWorkerLoop<S> {
self.schema_metadata_manager.clone(),
)
.await;
INFLIGHT_COMPACTION_COUNT.dec();
}
/// When compaction fails, we simply log the error.
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
error!(req.err; "Failed to compact region: {}", req.region_id);
INFLIGHT_COMPACTION_COUNT.dec();
self.compaction_scheduler
.on_compaction_failed(req.region_id, req.err);
}

View File

@@ -24,7 +24,6 @@ use store_api::storage::RegionId;
use crate::config::MitoConfig;
use crate::error::{RegionNotFoundSnafu, Result};
use crate::flush::{FlushReason, RegionFlushTask};
use crate::metrics::INFLIGHT_FLUSH_COUNT;
use crate::region::MitoRegionRef;
use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx};
use crate::worker::RegionWorkerLoop;
@@ -60,7 +59,6 @@ impl<S> RegionWorkerLoop<S> {
/// On region flush job failed.
pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) {
INFLIGHT_FLUSH_COUNT.dec();
self.flush_scheduler.on_flush_failed(region_id, request.err);
}
@@ -193,7 +191,6 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region_id: RegionId,
mut request: FlushFinished,
) {
INFLIGHT_FLUSH_COUNT.dec();
// Notifies other workers. Even the remaining steps of this method fail we still
// wake up other workers as we have released some memory by flush.
self.notify_group();