diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 99600c9733..006b2a2bb6 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -41,7 +41,7 @@ use crate::error::{ }; use crate::read::range::RangeBuilderList; use crate::read::scan_region::{ScanInput, StreamContext}; -use crate::read::scan_util::PartitionMetrics; +use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList}; use crate::read::seq_scan::{build_sources, SeqScan}; use crate::read::{Batch, ScannerMetrics}; @@ -63,6 +63,9 @@ pub struct SeriesScan { stream_ctx: Arc, /// Receivers of each partition. receivers: Mutex, + /// Metrics for each partition. + /// The scanner only sets in query and keeps it empty during compaction. + metrics_list: Arc, } impl SeriesScan { @@ -78,6 +81,7 @@ impl SeriesScan { properties, stream_ctx, receivers: Mutex::new(Vec::new()), + metrics_list: Arc::new(PartitionMetricsList::default()), } } @@ -96,9 +100,10 @@ impl SeriesScan { )); } - self.maybe_start_distributor(metrics_set); + self.maybe_start_distributor(metrics_set, &self.metrics_list); - let part_metrics = new_partition_metrics(&self.stream_ctx, metrics_set, partition); + let part_metrics = + new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list); let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?; let stream_ctx = self.stream_ctx.clone(); @@ -161,7 +166,11 @@ impl SeriesScan { } /// Starts the distributor if the receiver list is empty. - fn maybe_start_distributor(&self, metrics_set: &ExecutionPlanMetricsSet) { + fn maybe_start_distributor( + &self, + metrics_set: &ExecutionPlanMetricsSet, + metrics_list: &Arc, + ) { let mut rx_list = self.receivers.lock().unwrap(); if !rx_list.is_empty() { return; @@ -174,6 +183,7 @@ impl SeriesScan { partitions: self.properties.partitions.clone(), senders, metrics_set: metrics_set.clone(), + metrics_list: metrics_list.clone(), }; common_runtime::spawn_global(async move { distributor.execute().await; @@ -297,6 +307,7 @@ struct SeriesDistributor { /// get per-partition metrics in verbose mode to see the metrics of the /// distributor. metrics_set: ExecutionPlanMetricsSet, + metrics_list: Arc, } impl SeriesDistributor { @@ -309,8 +320,12 @@ impl SeriesDistributor { /// Scans all parts. async fn scan_partitions(&mut self) -> Result<()> { - let part_metrics = - new_partition_metrics(&self.stream_ctx, &self.metrics_set, self.partitions.len()); + let part_metrics = new_partition_metrics( + &self.stream_ctx, + &self.metrics_set, + self.partitions.len(), + &self.metrics_list, + ); part_metrics.on_first_poll(); let range_builder_list = Arc::new(RangeBuilderList::new( @@ -525,12 +540,16 @@ fn new_partition_metrics( stream_ctx: &StreamContext, metrics_set: &ExecutionPlanMetricsSet, partition: usize, + metrics_list: &PartitionMetricsList, ) -> PartitionMetrics { - PartitionMetrics::new( + let metrics = PartitionMetrics::new( stream_ctx.input.mapper.metadata().region_id, partition, "SeriesScan", stream_ctx.query_start, metrics_set, - ) + ); + + metrics_list.set(partition, metrics.clone()); + metrics }