diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index e7c1c7272e..5a5d76da4c 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -75,6 +75,12 @@ lazy_static! { /// Histogram of flushed bytes. pub static ref FLUSH_BYTES_TOTAL: IntCounter = register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap(); + /// Gauge for inflight flush tasks. + pub static ref INFLIGHT_FLUSH_COUNT: IntGauge = + register_int_gauge!( + "greptime_mito_inflight_flush_count", + "inflight flush count", + ).unwrap(); // ------ End of flush related metrics @@ -124,6 +130,13 @@ lazy_static! { /// Counter of failed compaction task. pub static ref COMPACTION_FAILURE_COUNT: IntCounter = register_int_counter!("greptime_mito_compaction_failure_total", "mito compaction failure total").unwrap(); + + /// Gauge for inflight compaction tasks. + pub static ref INFLIGHT_COMPACTION_COUNT: IntGauge = + register_int_gauge!( + "greptime_mito_inflight_compaction_count", + "inflight compaction count", + ).unwrap(); // ------- End of compaction metrics. // Query metrics. 
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 33d26c8196..bf0944a9f0 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -53,7 +53,7 @@ use crate::config::MitoConfig; use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; -use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL}; +use crate::metrics::{INFLIGHT_COMPACTION_COUNT, REGION_COUNT, WRITE_STALL_TOTAL}; use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -1019,6 +1019,7 @@ impl WorkerListener { } pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) { + INFLIGHT_COMPACTION_COUNT.inc(); #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_compaction_scheduled(_region_id); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 292eb23735..cfb53c947c 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; -use crate::metrics::COMPACTION_REQUEST_COUNT; +use crate::metrics::{COMPACTION_REQUEST_COUNT, INFLIGHT_COMPACTION_COUNT}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -87,12 +87,14 @@ impl RegionWorkerLoop { self.schema_metadata_manager.clone(), ) .await; + INFLIGHT_COMPACTION_COUNT.dec(); } /// When compaction fails, we simply log the error. 
pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); + INFLIGHT_COMPACTION_COUNT.dec(); self.compaction_scheduler .on_compaction_failed(req.region_id, req.err); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index b2bc5fd2e8..15822ba16c 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -24,6 +24,7 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RegionNotFoundSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; +use crate::metrics::INFLIGHT_FLUSH_COUNT; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -54,11 +55,14 @@ impl RegionWorkerLoop { .schedule_flush(region.region_id, &region.version_control, task) { error!(e; "Failed to schedule flush task for region {}", region.region_id); + } else { + INFLIGHT_FLUSH_COUNT.inc(); } } /// On region flush job failed. pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) { + INFLIGHT_FLUSH_COUNT.dec(); self.flush_scheduler.on_flush_failed(region_id, request.err); } @@ -191,6 +195,7 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: FlushFinished, ) { + INFLIGHT_FLUSH_COUNT.dec(); // Notifies other workers. Even the remaining steps of this method fail we still // wake up other workers as we have released some memory by flush. self.notify_group();