diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 2b70f455d8..93ccf986ce 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -53,7 +53,7 @@ use crate::error::{ RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; -use crate::metrics::COMPACTION_STAGE_ELAPSED; +use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::projection::ProjectionMapper; use crate::read::scan_region::ScanInput; use crate::read::seq_scan::SeqScan; @@ -271,11 +271,11 @@ impl CompactionScheduler { current_version.options.ttl, &schema_metadata_manager, ) - .await - .unwrap_or_else(|e| { - warn!(e; "Failed to get ttl for region: {}", region_id); - TimeToLive::default() - }); + .await + .unwrap_or_else(|e| { + warn!(e; "Failed to get ttl for region: {}", region_id); + TimeToLive::default() + }); debug!( "Pick compaction strategy {:?} for region: {}, ttl: {:?}", @@ -340,6 +340,7 @@ impl CompactionScheduler { "Scheduled remote compaction job {} for region {}", job_id, region_id ); + INFLIGHT_COMPACTION_COUNT.inc(); return Ok(()); } Err(e) => { @@ -350,7 +351,7 @@ impl CompactionScheduler { job_id: None, reason: e.reason, } - .fail(); + .fail(); } error!(e; "Failed to schedule remote compaction job for region {}, fallback to local compaction", region_id); @@ -384,7 +385,9 @@ impl CompactionScheduler { // Submit the compaction task. self.scheduler .schedule(Box::pin(async move { + INFLIGHT_COMPACTION_COUNT.inc(); local_compaction_task.run().await; + INFLIGHT_COMPACTION_COUNT.dec(); })) .map_err(|e| { error!(e; "Failed to submit compaction request for region {}", region_id); diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index f0ec59e93f..81e9efd90a 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -261,7 +261,9 @@ impl RegionFlushTask { let version_data = version_control.current(); Box::pin(async move { + INFLIGHT_FLUSH_COUNT.inc(); self.do_flush(version_data).await; + INFLIGHT_FLUSH_COUNT.dec(); }) } @@ -531,7 +533,6 @@ impl FlushScheduler { return Err(e); } - INFLIGHT_FLUSH_COUNT.inc(); flush_status.flushing = true; Ok(()) diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 451de62d81..ded1735ddc 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -53,7 +53,7 @@ use crate::config::MitoConfig; use crate::error::{CreateDirSnafu, JoinSnafu, Result, WorkerStoppedSnafu}; use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef}; use crate::memtable::MemtableBuilderProvider; -use crate::metrics::{INFLIGHT_COMPACTION_COUNT, REGION_COUNT, WRITE_STALL_TOTAL}; +use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL}; use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -1090,7 +1090,6 @@ impl WorkerListener { } pub(crate) fn on_compaction_scheduled(&self, _region_id: RegionId) { - INFLIGHT_COMPACTION_COUNT.inc(); #[cfg(any(test, feature = "test"))] if let Some(listener) = &self.listener { listener.on_compaction_scheduled(_region_id); diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index cfb53c947c..c456407557 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -18,7 +18,7 @@ use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; use crate::error::RegionNotFoundSnafu; -use crate::metrics::{COMPACTION_REQUEST_COUNT, INFLIGHT_COMPACTION_COUNT}; +use crate::metrics::{COMPACTION_REQUEST_COUNT}; use crate::region::MitoRegionRef; use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -87,14 +87,12 @@ impl RegionWorkerLoop { self.schema_metadata_manager.clone(), ) .await; - INFLIGHT_COMPACTION_COUNT.dec(); } /// When compaction fails, we simply log the error. pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); - INFLIGHT_COMPACTION_COUNT.dec(); self.compaction_scheduler .on_compaction_failed(req.region_id, req.err); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index da518dd32d..b2bc5fd2e8 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -24,7 +24,6 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RegionNotFoundSnafu, Result}; use crate::flush::{FlushReason, RegionFlushTask}; -use crate::metrics::INFLIGHT_FLUSH_COUNT; use crate::region::MitoRegionRef; use crate::request::{FlushFailed, FlushFinished, OnFailure, OptionOutputTx}; use crate::worker::RegionWorkerLoop; @@ -60,7 +59,6 @@ impl RegionWorkerLoop { /// On region flush job failed. pub(crate) async fn handle_flush_failed(&mut self, region_id: RegionId, request: FlushFailed) { - INFLIGHT_FLUSH_COUNT.dec(); self.flush_scheduler.on_flush_failed(region_id, request.err); } @@ -193,7 +191,6 @@ impl RegionWorkerLoop { region_id: RegionId, mut request: FlushFinished, ) { - INFLIGHT_FLUSH_COUNT.dec(); // Notifies other workers. Even the remaining steps of this method fail we still // wake up other workers as we have released some memory by flush. self.notify_group();