From 6247de2d504be667be6677b2ad3f1e0fa54286c6 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 8 Nov 2024 23:57:18 +0800 Subject: [PATCH] chore: Revert "feat: prune in each partition" This reverts commit 3f9bf48161bcc26ace6787894f68c296a87760d7. --- src/mito2/src/read/scan_region.rs | 6 +- src/mito2/src/read/scan_util.rs | 142 +-------------------------- src/mito2/src/read/unordered_scan.rs | 9 +- 3 files changed, 6 insertions(+), 151 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d3d0de9409..1ef1e527a8 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -660,7 +660,7 @@ impl ScanInput { } /// Prunes a file to scan and returns the builder to build readers. - pub(crate) async fn prune_file( + async fn prune_file( &self, row_group_index: RowGroupIndex, file_index: usize, @@ -981,7 +981,7 @@ impl RangeBuilderList { /// Builder to create file ranges. #[derive(Default)] -pub(crate) struct FileRangeBuilder { +struct FileRangeBuilder { /// Context for the file. /// None indicates nothing to read. context: Option, @@ -993,7 +993,7 @@ pub(crate) struct FileRangeBuilder { impl FileRangeBuilder { /// Builds file ranges to read. /// Negative `row_group_index` indicates all row groups. - pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { + fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { let Some(context) = self.context.clone() else { return; }; diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 1814fccb25..385f6fa4b5 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -21,7 +21,6 @@ use async_stream::try_stream; use common_telemetry::debug; use futures::Stream; use prometheus::IntGauge; -use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use tokio::task::yield_now; @@ -29,10 +28,9 @@ use tokio::task::yield_now; use crate::error::{Result, TimeoutSnafu}; use crate::metrics::SCAN_PARTITION; use crate::read::range::RowGroupIndex; -use crate::read::scan_region::{FileRangeBuilder, ScanInput, StreamContext}; +use crate::read::scan_region::StreamContext; use crate::read::{Batch, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; -use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::ReaderMetrics; const BUILD_RANGES_TIMEOUT: Duration = Duration::from_secs(60 * 5); @@ -269,141 +267,3 @@ pub(crate) fn scan_file_ranges( part_metrics.merge_reader_metrics(&reader_metrics); } } - -#[derive(Default)] -pub(crate) struct RangeBuilder { - inner: Mutex)>>, -} - -impl RangeBuilder { - fn get_builder(&self, index: usize) -> Option> { - let inner = self.inner.lock().unwrap(); - let x = inner.as_ref()?; - if x.0 == index { - Some(x.1.clone()) - } else { - None - } - } - - fn set_builder(&self, index: usize, builder: Arc) { - let mut inner = self.inner.lock().unwrap(); - *inner = Some((index, builder)); - } - - pub(crate) async fn build_file_ranges( - &self, - input: &ScanInput, - index: RowGroupIndex, - reader_metrics: &mut ReaderMetrics, - ) -> Result> { - let mut ranges = SmallVec::new(); - let file_index = index.index - input.num_memtables(); - match self.get_builder(index.index) { - Some(builder) => { - builder.build_ranges(index.row_group_index, &mut ranges); - Ok(ranges) - } - None => { - // Init builder. - let builder = input.prune_file(index, file_index, reader_metrics).await?; - let builder = Arc::new(builder); - builder.build_ranges(index.row_group_index, &mut ranges); - self.set_builder(index.index, builder); - Ok(ranges) - } - } - } -} - -/// Scans file ranges at `index`. -pub(crate) fn scan_file_ranges_with_builder( - partition: usize, - stream_ctx: Arc, - part_metrics: PartitionMetrics, - index: RowGroupIndex, - read_type: &'static str, - range_builder: Arc, -) -> impl Stream> { - try_stream! { - let mut reader_metrics = ReaderMetrics::default(); - if read_type == "unordered_scan_files" { - common_telemetry::debug!( - "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges start, region_id: {}, partition: {}, index: {:?}", - std::thread::current().id(), - stream_ctx.input.mapper.metadata().region_id, - partition, - index, - ); - } - // let ranges = tokio::time::timeout( - // BUILD_RANGES_TIMEOUT, - // stream_ctx.build_file_ranges(index, read_type, &mut reader_metrics), - // ) - // .await - // .with_context(|_| TimeoutSnafu { - // msg: format!( - // "build file ranges for {}, partition: {}", - // stream_ctx.input.mapper.metadata().region_id, - // partition, - // ), - // }) - // .inspect_err(|e| { - // common_telemetry::error!( - // e; "Thread: {:?}, Scan file ranges build ranges timeout, region_id: {}, partition: {}, index: {:?}", - // std::thread::current().id(), - // stream_ctx.input.mapper.metadata().region_id, - // partition, - // index, - // ); - // })??; - let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?; - // let ranges = stream_ctx - // .build_file_ranges(index, read_type, &mut reader_metrics) - // .await?; - part_metrics.inc_num_file_ranges(ranges.len()); - - if read_type == "unordered_scan_files" { - common_telemetry::debug!( - "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges end, region_id: {}, partition: {}, index: {:?}, ranges: {}", - std::thread::current().id(), - stream_ctx.input.mapper.metadata().region_id, - partition, - index, - ranges.len(), - ); - } - for range in ranges { - let build_reader_start = Instant::now(); - let reader = range.reader(None).await?; - let build_cost = build_reader_start.elapsed(); - part_metrics.inc_build_reader_cost(build_cost); - if read_type == "unordered_scan_files" { - common_telemetry::debug!( - "[DEBUG_SCAN] Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}", - std::thread::current().id(), - stream_ctx.input.mapper.metadata().region_id, - partition, - range.file_handle().file_id(), - index, - build_cost - ); - } - let compat_batch = range.compat_batch(); - let mut source = Source::PruneReader(reader); - while let Some(mut batch) = source.next_batch().await? { - if let Some(compact_batch) = compat_batch { - batch = compact_batch.compat_batch(batch)?; - } - yield batch; - } - if let Source::PruneReader(mut reader) = source { - reader_metrics.merge_from(reader.metrics()); - } - } - - // Reports metrics. - reader_metrics.observe_rows(read_type); - part_metrics.merge_reader_metrics(&reader_metrics); - } -} diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index eea8cb6aaa..40d3fdfea8 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -31,9 +31,7 @@ use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties} use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::scan_region::{ScanInput, StreamContext}; -use crate::read::scan_util::{ - scan_file_ranges_with_builder, scan_mem_ranges, PartitionMetrics, RangeBuilder, -}; +use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{Batch, ScannerMetrics}; /// Scans a region without providing any output ordering guarantee. @@ -87,7 +85,6 @@ impl UnorderedScan { stream_ctx: Arc, part_range_id: usize, part_metrics: PartitionMetrics, - range_builder: Arc, ) -> impl Stream> { stream! { // Gets range meta. @@ -99,7 +96,7 @@ impl UnorderedScan { yield batch; } } else { - let stream = scan_file_ranges_with_builder(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files", range_builder.clone()); + let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); for await batch in stream { yield batch; } @@ -150,7 +147,6 @@ impl UnorderedScan { let cache = stream_ctx.input.cache_manager.as_deref(); let ranges_len = part_ranges.len(); - let range_builder = Arc::new(RangeBuilder::default()); // Scans each part. for (part_idx, part_range) in part_ranges.into_iter().enumerate() { common_telemetry::debug!( @@ -173,7 +169,6 @@ impl UnorderedScan { stream_ctx.clone(), part_range.identifier, part_metrics.clone(), - range_builder.clone(), ); let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now();