diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 62a6fe8012..f409822e0d 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = "reason"; pub const FILE_TYPE_LABEL: &str = "file_type"; /// Region worker id label. pub const WORKER_LABEL: &str = "worker"; +/// Partition label. +pub const PARTITION_LABEL: &str = "partition"; lazy_static! { /// Global write buffer size in bytes. @@ -135,6 +137,13 @@ lazy_static! { .unwrap(); pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]); pub static ref READ_STAGE_BUILD_PAGE_READER: Histogram = READ_STAGE_ELAPSED.with_label_values(&["build_page_reader"]); + /// In progress scan for each partition. + pub static ref SCAN_PARTITION: IntGaugeVec = register_int_gauge_vec!( + "greptime_mito_scan_partition", + "mito partitions scanning", + &[TYPE_LABEL, PARTITION_LABEL] + ) + .unwrap(); /// Counter of rows read from different source. pub static ref READ_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap(); diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 583796b9e1..b820943192 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -20,9 +20,11 @@ use std::time::{Duration, Instant}; use async_stream::try_stream; use common_telemetry::debug; use futures::Stream; +use prometheus::IntGauge; use store_api::storage::RegionId; use crate::error::Result; +use crate::metrics::SCAN_PARTITION; use crate::read::range::RowGroupIndex; use crate::read::scan_region::StreamContext; use crate::read::{Batch, ScannerMetrics, Source}; @@ -41,6 +43,7 @@ struct PartitionMetricsInner { first_poll: Duration, metrics: ScannerMetrics, reader_metrics: ReaderMetrics, + scan_partition_gauge: IntGauge, } impl PartitionMetricsInner { @@ -56,6 +59,7 @@ impl Drop for PartitionMetricsInner { fn drop(&mut self) { self.on_finish(); self.metrics.observe_metrics(); + self.scan_partition_gauge.dec(); debug!( "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}", @@ -76,6 +80,10 @@ impl PartitionMetrics { query_start: Instant, metrics: ScannerMetrics, ) -> Self { + let partition_str = partition.to_string(); + let scan_partition_gauge = + SCAN_PARTITION.with_label_values(&[scanner_type, &partition_str]); + scan_partition_gauge.inc(); let inner = PartitionMetricsInner { region_id, partition, @@ -84,6 +92,7 @@ impl PartitionMetrics { first_poll: Duration::default(), metrics, reader_metrics: ReaderMetrics::default(), + scan_partition_gauge, }; Self(Arc::new(Mutex::new(inner))) }