From 916e1c2d9e0dc815835cd6b756814ea7ec825d54 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 8 Apr 2025 20:51:55 +0800 Subject: [PATCH] fix: address compiler errors --- src/mito2/src/read/series_scan.rs | 36 ++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index 98bb16d2eb..a5a89fbdd3 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -22,6 +22,7 @@ use async_stream::{stream, try_stream}; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::compute::concat_batches; use datatypes::schema::SchemaRef; @@ -81,6 +82,7 @@ impl SeriesScan { fn scan_partition_impl( &self, + metrics_set: &ExecutionPlanMetricsSet, partition: usize, ) -> Result { if partition >= self.properties.num_partitions() { @@ -93,9 +95,9 @@ impl SeriesScan { )); } - self.maybe_start_distributor(); + self.maybe_start_distributor(metrics_set); - let part_metrics = new_partition_metrics(&self.stream_ctx, partition); + let part_metrics = new_partition_metrics(&self.stream_ctx, metrics_set, partition); let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?; let stream_ctx = self.stream_ctx.clone(); @@ -158,7 +160,7 @@ impl SeriesScan { } /// Starts the distributor if the receiver list is empty. - fn maybe_start_distributor(&self) { + fn maybe_start_distributor(&self, metrics_set: &ExecutionPlanMetricsSet) { let mut rx_list = self.receivers.lock().unwrap(); if !rx_list.is_empty() { return; @@ -170,6 +172,7 @@ impl SeriesScan { semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))), partitions: self.properties.partitions.clone(), senders, + metrics_set: metrics_set.clone(), }; common_runtime::spawn_global(async move { distributor.execute().await; @@ -182,8 +185,9 @@ impl SeriesScan { /// Scans the region and returns a stream. pub(crate) async fn build_stream(&self) -> Result { let part_num = self.properties.num_partitions(); + let metrics_set = ExecutionPlanMetricsSet::default(); let streams = (0..part_num) - .map(|i| self.scan_partition(i)) + .map(|i| self.scan_partition(&metrics_set, i)) .collect::, BoxedError>>()?; let stream = stream! { for mut stream in streams { @@ -223,8 +227,12 @@ impl RegionScanner for SeriesScan { self.stream_ctx.input.mapper.metadata().clone() } - fn scan_partition(&self, partition: usize) -> Result { - self.scan_partition_impl(partition) + fn scan_partition( + &self, + metrics_set: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Result { + self.scan_partition_impl(metrics_set, partition) } fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> { @@ -282,6 +290,8 @@ struct SeriesDistributor { partitions: Vec>, /// Senders of all partitions. senders: SenderList, + /// Metrics set to report. + metrics_set: ExecutionPlanMetricsSet, } impl SeriesDistributor { @@ -294,7 +304,8 @@ impl SeriesDistributor { /// Scans all parts. async fn scan_partitions(&mut self) -> Result<()> { - let part_metrics = new_partition_metrics(&self.stream_ctx, self.partitions.len()); + let part_metrics = + new_partition_metrics(&self.stream_ctx, &self.metrics_set, self.partitions.len()); part_metrics.on_first_poll(); let range_builder_list = Arc::new(RangeBuilderList::new( @@ -466,15 +477,16 @@ impl SenderList { } } -fn new_partition_metrics(stream_ctx: &StreamContext, partition: usize) -> PartitionMetrics { +fn new_partition_metrics( + stream_ctx: &StreamContext, + metrics_set: &ExecutionPlanMetricsSet, + partition: usize, +) -> PartitionMetrics { PartitionMetrics::new( stream_ctx.input.mapper.metadata().region_id, partition, "SeriesScan", stream_ctx.query_start, - ScannerMetrics { - prepare_scan_cost: stream_ctx.query_start.elapsed(), - ..Default::default() - }, + metrics_set, ) }