diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1ef1e527a8..d3d0de9409 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -660,7 +660,7 @@ impl ScanInput { } /// Prunes a file to scan and returns the builder to build readers. - async fn prune_file( + pub(crate) async fn prune_file( &self, row_group_index: RowGroupIndex, file_index: usize, @@ -981,7 +981,7 @@ impl RangeBuilderList { /// Builder to create file ranges. #[derive(Default)] -struct FileRangeBuilder { +pub(crate) struct FileRangeBuilder { /// Context for the file. /// None indicates nothing to read. context: Option, @@ -993,7 +993,7 @@ struct FileRangeBuilder { impl FileRangeBuilder { /// Builds file ranges to read. /// Negative `row_group_index` indicates all row groups. - fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { + pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { let Some(context) = self.context.clone() else { return; }; diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 385f6fa4b5..1814fccb25 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -21,6 +21,7 @@ use async_stream::try_stream; use common_telemetry::debug; use futures::Stream; use prometheus::IntGauge; +use smallvec::SmallVec; use snafu::ResultExt; use store_api::storage::RegionId; use tokio::task::yield_now; @@ -28,9 +29,10 @@ use tokio::task::yield_now; use crate::error::{Result, TimeoutSnafu}; use crate::metrics::SCAN_PARTITION; use crate::read::range::RowGroupIndex; -use crate::read::scan_region::StreamContext; +use crate::read::scan_region::{FileRangeBuilder, ScanInput, StreamContext}; use crate::read::{Batch, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; +use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::ReaderMetrics; const BUILD_RANGES_TIMEOUT: Duration = Duration::from_secs(60 * 5); @@ -267,3 +269,141 @@ pub(crate) fn scan_file_ranges( part_metrics.merge_reader_metrics(&reader_metrics); } } + +#[derive(Default)] +pub(crate) struct RangeBuilder { + inner: Mutex)>>, +} + +impl RangeBuilder { + fn get_builder(&self, index: usize) -> Option> { + let inner = self.inner.lock().unwrap(); + let x = inner.as_ref()?; + if x.0 == index { + Some(x.1.clone()) + } else { + None + } + } + + fn set_builder(&self, index: usize, builder: Arc) { + let mut inner = self.inner.lock().unwrap(); + *inner = Some((index, builder)); + } + + pub(crate) async fn build_file_ranges( + &self, + input: &ScanInput, + index: RowGroupIndex, + reader_metrics: &mut ReaderMetrics, + ) -> Result> { + let mut ranges = SmallVec::new(); + let file_index = index.index - input.num_memtables(); + match self.get_builder(index.index) { + Some(builder) => { + builder.build_ranges(index.row_group_index, &mut ranges); + Ok(ranges) + } + None => { + // Init builder. + let builder = input.prune_file(index, file_index, reader_metrics).await?; + let builder = Arc::new(builder); + builder.build_ranges(index.row_group_index, &mut ranges); + self.set_builder(index.index, builder); + Ok(ranges) + } + } + } +} + +/// Scans file ranges at `index`. +pub(crate) fn scan_file_ranges_with_builder( + partition: usize, + stream_ctx: Arc, + part_metrics: PartitionMetrics, + index: RowGroupIndex, + read_type: &'static str, + range_builder: Arc, +) -> impl Stream> { + try_stream! { + let mut reader_metrics = ReaderMetrics::default(); + if read_type == "unordered_scan_files" { + common_telemetry::debug!( + "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges start, region_id: {}, partition: {}, index: {:?}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + partition, + index, + ); + } + // let ranges = tokio::time::timeout( + // BUILD_RANGES_TIMEOUT, + // stream_ctx.build_file_ranges(index, read_type, &mut reader_metrics), + // ) + // .await + // .with_context(|_| TimeoutSnafu { + // msg: format!( + // "build file ranges for {}, partition: {}", + // stream_ctx.input.mapper.metadata().region_id, + // partition, + // ), + // }) + // .inspect_err(|e| { + // common_telemetry::error!( + // e; "Thread: {:?}, Scan file ranges build ranges timeout, region_id: {}, partition: {}, index: {:?}", + // std::thread::current().id(), + // stream_ctx.input.mapper.metadata().region_id, + // partition, + // index, + // ); + // })??; + let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?; + // let ranges = stream_ctx + // .build_file_ranges(index, read_type, &mut reader_metrics) + // .await?; + part_metrics.inc_num_file_ranges(ranges.len()); + + if read_type == "unordered_scan_files" { + common_telemetry::debug!( + "[DEBUG_SCAN] Thread: {:?}, Scan file ranges build ranges end, region_id: {}, partition: {}, index: {:?}, ranges: {}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + partition, + index, + ranges.len(), + ); + } + for range in ranges { + let build_reader_start = Instant::now(); + let reader = range.reader(None).await?; + let build_cost = build_reader_start.elapsed(); + part_metrics.inc_build_reader_cost(build_cost); + if read_type == "unordered_scan_files" { + common_telemetry::debug!( + "[DEBUG_SCAN] Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + partition, + range.file_handle().file_id(), + index, + build_cost + ); + } + let compat_batch = range.compat_batch(); + let mut source = Source::PruneReader(reader); + while let Some(mut batch) = source.next_batch().await? { + if let Some(compact_batch) = compat_batch { + batch = compact_batch.compat_batch(batch)?; + } + yield batch; + } + if let Source::PruneReader(mut reader) = source { + reader_metrics.merge_from(reader.metrics()); + } + } + + // Reports metrics. + reader_metrics.observe_rows(read_type); + part_metrics.merge_reader_metrics(&reader_metrics); + } +} diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 40d3fdfea8..eea8cb6aaa 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -31,7 +31,9 @@ use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties} use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::scan_region::{ScanInput, StreamContext}; -use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; +use crate::read::scan_util::{ + scan_file_ranges_with_builder, scan_mem_ranges, PartitionMetrics, RangeBuilder, +}; use crate::read::{Batch, ScannerMetrics}; /// Scans a region without providing any output ordering guarantee. @@ -85,6 +87,7 @@ impl UnorderedScan { stream_ctx: Arc, part_range_id: usize, part_metrics: PartitionMetrics, + range_builder: Arc, ) -> impl Stream> { stream! { // Gets range meta. @@ -96,7 +99,7 @@ impl UnorderedScan { yield batch; } } else { - let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); + let stream = scan_file_ranges_with_builder(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files", range_builder.clone()); for await batch in stream { yield batch; } @@ -147,6 +150,7 @@ impl UnorderedScan { let cache = stream_ctx.input.cache_manager.as_deref(); let ranges_len = part_ranges.len(); + let range_builder = Arc::new(RangeBuilder::default()); // Scans each part. for (part_idx, part_range) in part_ranges.into_iter().enumerate() { common_telemetry::debug!( @@ -169,6 +173,7 @@ impl UnorderedScan { stream_ctx.clone(), part_range.identifier, part_metrics.clone(), + range_builder.clone(), ); let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now();