chore: gauge for flush compaction (#5156)

* add metrics

* chore/bench-metrics: Add INFLIGHT_FLUSH_COUNT Metric to Flush Process

 • Introduced INFLIGHT_FLUSH_COUNT metric to track the number of ongoing flush operations.
 • Incremented INFLIGHT_FLUSH_COUNT in FlushScheduler to monitor active flushes.
 • Removed redundant increment of INFLIGHT_FLUSH_COUNT in RegionWorkerLoop to prevent double counting.

* chore/bench-metrics: Add Metrics for Compaction and Flush Operations

 • Introduced INFLIGHT_COMPACTION_COUNT and INFLIGHT_FLUSH_COUNT metrics to track the number of ongoing compaction and flush operations.
 • Incremented INFLIGHT_COMPACTION_COUNT when scheduling remote and local compaction jobs, and decremented it upon completion.
 • Added INFLIGHT_FLUSH_COUNT increment and decrement logic around flush tasks to monitor active flush operations.
 • Removed redundant metric updates in worker.rs and handle_compaction.rs to streamline metric handling.

* chore: add metrics for remote compaction jobs

* chore: format

* chore: also add dashbaord
This commit is contained in:
Lei, HUANG
2024-12-16 15:08:07 +08:00
committed by GitHub
parent f82af15eba
commit 5ffda7e971
5 changed files with 774 additions and 752 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -53,7 +53,7 @@ use crate::error::{
RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
TimeoutSnafu,
};
use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
@@ -340,6 +340,7 @@ impl CompactionScheduler {
"Scheduled remote compaction job {} for region {}",
job_id, region_id
);
INFLIGHT_COMPACTION_COUNT.inc();
return Ok(());
}
Err(e) => {
@@ -384,7 +385,9 @@ impl CompactionScheduler {
// Submit the compaction task.
self.scheduler
.schedule(Box::pin(async move {
INFLIGHT_COMPACTION_COUNT.inc();
local_compaction_task.run().await;
INFLIGHT_COMPACTION_COUNT.dec();
}))
.map_err(|e| {
error!(e; "Failed to submit compaction request for region {}", region_id);

View File

@@ -32,7 +32,10 @@ use crate::error::{
Error, FlushRegionSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL};
use crate::metrics::{
FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL,
INFLIGHT_FLUSH_COUNT,
};
use crate::read::Source;
use crate::region::options::IndexOptions;
use crate::region::version::{VersionControlData, VersionControlRef};
@@ -261,7 +264,9 @@ impl RegionFlushTask {
let version_data = version_control.current();
Box::pin(async move {
INFLIGHT_FLUSH_COUNT.inc();
self.do_flush(version_data).await;
INFLIGHT_FLUSH_COUNT.dec();
})
}
@@ -530,6 +535,7 @@ impl FlushScheduler {
self.region_status.remove(&region_id);
return Err(e);
}
flush_status.flushing = true;
Ok(())

View File

@@ -75,6 +75,12 @@ lazy_static! {
/// Histogram of flushed bytes.
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
/// Gauge for inflight compaction tasks.
pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_flush_count",
"inflight flush count",
).unwrap();
// ------ End of flush related metrics
@@ -124,6 +130,13 @@ lazy_static! {
/// Counter of failed compaction task.
pub static ref COMPACTION_FAILURE_COUNT: IntCounter =
register_int_counter!("greptime_mito_compaction_failure_total", "mito compaction failure total").unwrap();
/// Gauge for inflight compaction tasks.
pub static ref INFLIGHT_COMPACTION_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
// ------- End of compaction metrics.
// Query metrics.

View File

@@ -27,7 +27,7 @@ use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::error::{CompactRegionSnafu, Error, ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::metrics::COMPACTION_FAILURE_COUNT;
use crate::metrics::{COMPACTION_FAILURE_COUNT, INFLIGHT_COMPACTION_COUNT};
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
};
@@ -145,6 +145,7 @@ impl DefaultNotifier {
#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) {
INFLIGHT_COMPACTION_COUNT.dec();
match result {
RemoteJobResult::CompactionJobResult(result) => {
let notify = {