diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 424198439a..dfdb8a4aa9 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -656,7 +656,7 @@ mod tests { let file_metas: Vec<_> = data.version.ssts.levels()[0] .files .values() - .map(|file| file.meta()) + .map(|file| file.meta_ref().clone()) .collect(); // 5 files for next compaction and removes old files. diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 1869012f8d..a675ceafb3 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -110,7 +110,7 @@ impl CompactionTaskImpl { Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); for output in self.outputs.drain(..) { - compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); + compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone())); info!( "Compaction region {} output [{}]-> {}", @@ -229,7 +229,7 @@ impl CompactionTaskImpl { return Err(e); } }; - deleted.extend(self.expired_ssts.iter().map(FileHandle::meta)); + deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone())); let merge_time = merge_timer.stop_and_record(); info!( "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 09bfe2535a..bd8d70a6ac 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -126,12 +126,11 @@ impl MitoEngine { &self, region_id: RegionId, request: ScanRequest, - ) -> std::result::Result { + ) -> Result { self.scanner(region_id, request) .map_err(BoxedError::new)? .scan() .await - .map_err(BoxedError::new) } /// Returns a scanner to scan for `request`. diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index a222eb886c..ff16e72fd7 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -50,6 +50,7 @@ use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, }; use crate::memtable::BoxedBatchIterator; +use crate::sst::parquet::reader::RowGroupReader; /// Storage internal representation of a batch of rows for a primary key (time series). /// @@ -699,6 +700,8 @@ pub enum Source { Iter(BoxedBatchIterator), /// Source from a [BoxedBatchStream]. Stream(BoxedBatchStream), + /// Source from a [RowGroupReader]. + RowGroupReader(RowGroupReader), } impl Source { @@ -708,6 +711,7 @@ impl Source { Source::Reader(reader) => reader.next_batch().await, Source::Iter(iter) => iter.next().transpose(), Source::Stream(stream) => stream.try_next().await, + Source::RowGroupReader(reader) => reader.next_batch().await, } } } diff --git a/src/mito2/src/read/compat.rs b/src/mito2/src/read/compat.rs index c900600f33..11fb62e788 100644 --- a/src/mito2/src/read/compat.rs +++ b/src/mito2/src/read/compat.rs @@ -32,10 +32,8 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; pub struct CompatReader { /// Underlying reader. reader: R, - /// Optional primary key adapter. - compat_pk: Option, - /// Optional fields adapter. - compat_fields: Option, + /// Helper to compat batches. + compat: CompatBatch, } impl CompatReader { @@ -48,13 +46,9 @@ impl CompatReader { reader_meta: RegionMetadataRef, reader: R, ) -> Result> { - let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?; - let compat_fields = may_compat_fields(mapper, &reader_meta)?; - Ok(CompatReader { reader, - compat_pk, - compat_fields, + compat: CompatBatch::new(mapper, reader_meta)?, }) } } @@ -66,6 +60,36 @@ impl BatchReader for CompatReader { return Ok(None); }; + batch = self.compat.compat_batch(batch)?; + + Ok(Some(batch)) + } +} + +/// A helper struct to adapt schema of the batch to an expected schema. +pub(crate) struct CompatBatch { + /// Optional primary key adapter. + compat_pk: Option, + /// Optional fields adapter. + compat_fields: Option, +} + +impl CompatBatch { + /// Creates a new [CompatBatch]. + /// - `mapper` is built from the metadata users expect to see. + /// - `reader_meta` is the metadata of the input reader. + pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result { + let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?; + let compat_fields = may_compat_fields(mapper, &reader_meta)?; + + Ok(Self { + compat_pk, + compat_fields, + }) + } + + /// Adapts the `batch` to the expected schema. + pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result { if let Some(compat_pk) = &self.compat_pk { batch = compat_pk.compat(batch)?; } @@ -73,7 +97,7 @@ impl BatchReader for CompatReader { batch = compat_fields.compat(batch); } - Ok(Some(batch)) + Ok(batch) } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 08bc5bc30d..2e6325e1b7 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,9 +14,11 @@ //! Scans a region according to the scan request. +use std::fmt; use std::sync::Arc; use std::time::Instant; +use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; @@ -32,15 +34,16 @@ use crate::cache::CacheManagerRef; use crate::error::Result; use crate::memtable::MemtableRef; use crate::metrics::READ_SST_COUNT; -use crate::read::compat::CompatReader; +use crate::read::compat::{CompatBatch, CompatReader}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::read::unordered_scan::UnorderedScan; use crate::read::{compat, Batch, Source}; use crate::region::version::VersionRef; -use crate::sst::file::FileHandle; +use crate::sst::file::{FileHandle, FileMeta}; use crate::sst::index::applier::builder::SstIndexApplierBuilder; use crate::sst::index::applier::SstIndexApplierRef; +use crate::sst::parquet::file_range::FileRange; /// A scanner scans a region and returns a [SendableRecordBatchStream]. pub(crate) enum Scanner { @@ -51,20 +54,24 @@ pub(crate) enum Scanner { } impl Scanner { - /// Returns a [SendableRecordBatchStream] to retrieve scan results. - pub(crate) async fn scan(&self) -> Result { + /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions. + pub(crate) async fn scan(&self) -> Result { match self { - Scanner::Seq(seq_scan) => seq_scan.build_stream().await, + Scanner::Seq(seq_scan) => seq_scan.build_stream().await.map_err(BoxedError::new), Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await, } } /// Returns a [RegionScanner] to scan the region. - pub(crate) async fn region_scanner(&self) -> Result { - let stream = self.scan().await?; - let scanner = SinglePartitionScanner::new(stream); - - Ok(Arc::new(scanner)) + pub(crate) async fn region_scanner(self) -> Result { + match self { + Scanner::Seq(seq_scan) => { + let stream = seq_scan.build_stream().await?; + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + Ok(scanner) + } + Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)), + } } } @@ -222,9 +229,7 @@ impl ScanRegion { /// Unordered scan. pub(crate) fn unordered_scan(self) -> Result { let input = self.scan_input(true)?; - let scan = UnorderedScan::new(input); - - Ok(scan) + Ok(UnorderedScan::new(input)) } #[cfg(test)] @@ -386,7 +391,7 @@ pub(crate) struct ScanInput { /// Time range filter for time index. time_range: Option, /// Predicate to push down. - predicate: Option, + pub(crate) predicate: Option, /// Memtables to scan. pub(crate) memtables: Vec, /// Handles to SST files to scan. @@ -498,7 +503,6 @@ impl ScanInput { } /// Sets whether to remove deletion markers during scan. - #[allow(unused)] #[must_use] pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self { self.filter_deleted = filter_deleted; @@ -572,6 +576,61 @@ impl ScanInput { Ok(sources) } + /// Prunes file ranges to scan and adds them tothe `collector`. + pub(crate) async fn prune_file_ranges( + &self, + collector: &mut impl FileRangeCollector, + ) -> Result<()> { + for file in &self.files { + let res = self + .access_layer + .read_sst(file.clone()) + .predicate(self.predicate.clone()) + .time_range(self.time_range) + .projection(Some(self.mapper.column_ids().to_vec())) + .cache(self.cache_manager.clone()) + .index_applier(self.index_applier.clone()) + .expected_metadata(Some(self.mapper.metadata().clone())) + .build_reader_input() + .await; + let (mut file_range_ctx, row_groups) = match res { + Ok(x) => x, + Err(e) => { + if e.is_object_not_found() && self.ignore_file_not_found { + error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id()); + continue; + } else { + return Err(e); + } + } + }; + if !compat::has_same_columns( + self.mapper.metadata(), + file_range_ctx.read_format().metadata(), + ) { + // They have different schema. We need to adapt the batch first so the + // mapper can convert it. + let compat = CompatBatch::new( + &self.mapper, + file_range_ctx.read_format().metadata().clone(), + )?; + file_range_ctx.set_compat_batch(Some(compat)); + } + // Build ranges from row groups. + let file_range_ctx = Arc::new(file_range_ctx); + let file_ranges = row_groups + .into_iter() + .map(|(row_group_idx, row_selection)| { + FileRange::new(file_range_ctx.clone(), row_group_idx, row_selection) + }); + collector.append_file_ranges(file.meta_ref(), file_ranges); + } + + READ_SST_COUNT.observe(self.files.len() as f64); + + Ok(()) + } + /// Scans the input source in another task and sends batches to the sender. pub(crate) fn spawn_scan_task( &self, @@ -620,3 +679,35 @@ impl ScanInput { self.files.iter().map(|file| file.file_id()).collect() } } + +/// A partition of a scanner to read. +/// It contains memtables and file ranges to scan. +#[derive(Default)] +pub(crate) struct ScanPart { + /// Memtables to scan. + /// We scan the whole memtable now. We might scan a range of the memtable in the future. + pub(crate) memtables: Vec, + /// File ranges to scan. + pub(crate) file_ranges: Vec, +} + +impl fmt::Debug for ScanPart { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "ScanPart({} memtables, {} file ranges)", + self.memtables.len(), + self.file_ranges.len() + ) + } +} + +/// A trait to collect file ranges to scan. +pub(crate) trait FileRangeCollector { + /// Appends file ranges from the **same file** to the collector. + fn append_file_ranges( + &mut self, + file_meta: &FileMeta, + file_ranges: impl Iterator, + ); +} diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 5042764ede..48f682969d 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -14,148 +14,101 @@ //! Unordered scanner. +use std::fmt; use std::sync::Arc; use std::time::{Duration, Instant}; -use async_stream::try_stream; +use async_stream::{stream, try_stream}; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::debug; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; +use datatypes::schema::SchemaRef; +use futures::StreamExt; use snafu::ResultExt; -use tokio::sync::{mpsc, Semaphore}; -use tokio_stream::wrappers::ReceiverStream; +use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties}; +use tokio::sync::Mutex; use crate::cache::CacheManager; use crate::error::Result; +use crate::memtable::MemtableRef; use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; +use crate::read::compat::CompatBatch; use crate::read::projection::ProjectionMapper; -use crate::read::scan_region::ScanInput; +use crate::read::scan_region::{FileRangeCollector, ScanInput, ScanPart}; use crate::read::Source; +use crate::sst::file::FileMeta; +use crate::sst::parquet::file_range::FileRange; +use crate::sst::parquet::reader::ReaderMetrics; /// Scans a region without providing any output ordering guarantee. /// /// Only an append only table should use this scanner. pub struct UnorderedScan { - input: ScanInput, + /// Properties of the scanner. + properties: ScannerProperties, + /// Context of streams. + stream_ctx: Arc, } impl UnorderedScan { /// Creates a new [UnorderedScan]. pub(crate) fn new(input: ScanInput) -> Self { - Self { input } + let query_start = input.query_start.unwrap_or_else(Instant::now); + let prepare_scan_cost = query_start.elapsed(); + let properties = + ScannerProperties::new(ScannerPartitioning::Unknown(input.parallelism.parallelism)); + + // Observes metrics. + READ_STAGE_ELAPSED + .with_label_values(&["prepare_scan"]) + .observe(prepare_scan_cost.as_secs_f64()); + + let stream_ctx = Arc::new(StreamContext { + input, + parts: Mutex::new(ScanPartList::default()), + query_start, + prepare_scan_cost, + }); + + Self { + properties, + stream_ctx, + } } /// Scans the region and returns a stream. - pub async fn build_stream(&self) -> Result { - let enable_parallel = self.enable_parallel_scan(); - if enable_parallel { - self.scan_in_parallel().await - } else { - self.scan_sources().await - } - } - - /// Scans all sources one by one. - async fn scan_sources(&self) -> Result { - let mut metrics = Metrics::default(); - let build_start = Instant::now(); - let query_start = self.input.query_start.unwrap_or(build_start); - metrics.prepare_scan_cost = query_start.elapsed(); - - // Scans all memtables and SSTs. - let sources = self.input.build_sources().await?; - metrics.build_source_cost = build_start.elapsed(); - Self::observe_metrics_on_start(&metrics); - - let mapper = self.input.mapper.clone(); - let cache_manager = self.input.cache_manager.clone(); - let stream = try_stream! { - for mut source in sources { - let cache = cache_manager.as_deref(); - while let Some(batch) = Self::fetch_from_source(&mut source, &mapper, cache, &mut metrics).await? { - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - yield batch; + pub(crate) async fn build_stream(&self) -> Result { + let part_num = self.properties.partitioning().num_partitions(); + let streams = (0..part_num) + .map(|i| self.scan_partition(i)) + .collect::, BoxedError>>()?; + let stream = stream! { + for mut stream in streams { + while let Some(rb) = stream.next().await { + yield rb; } } - - metrics.total_cost = query_start.elapsed(); - Self::observe_metrics_on_finish(&metrics); - debug!("Unordered scan finished, region_id: {}, metrics: {:?}", mapper.metadata().region_id, metrics); }; let stream = Box::pin(RecordBatchStreamWrapper::new( - self.input.mapper.output_schema(), + self.schema(), Box::pin(stream), )); - Ok(stream) } - /// Scans all sources in parallel. - async fn scan_in_parallel(&self) -> Result { - debug_assert!(self.input.parallelism.allow_parallel_scan()); - - let mut metrics = Metrics::default(); - let build_start = Instant::now(); - let query_start = self.input.query_start.unwrap_or(build_start); - metrics.prepare_scan_cost = query_start.elapsed(); - - // Scans all memtables and SSTs. - let sources = self.input.build_sources().await?; - metrics.build_source_cost = build_start.elapsed(); - Self::observe_metrics_on_start(&metrics); - - let (sender, receiver) = mpsc::channel(self.input.parallelism.channel_size); - let semaphore = Arc::new(Semaphore::new(self.input.parallelism.parallelism)); - // Spawn a task for each source. - for source in sources { - self.input - .spawn_scan_task(source, semaphore.clone(), sender.clone()); - } - let stream = Box::pin(ReceiverStream::new(receiver)); - - let mapper = self.input.mapper.clone(); - let cache_manager = self.input.cache_manager.clone(); - // For simplicity, we wrap the receiver into a stream to reuse code. We can use the channel directly if it - // becomes a bottleneck. - let mut source = Source::Stream(stream); - let stream = try_stream! { - let cache = cache_manager.as_deref(); - while let Some(batch) = Self::fetch_from_source(&mut source, &mapper, cache, &mut metrics).await? { - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - yield batch; - } - - metrics.total_cost = query_start.elapsed(); - Self::observe_metrics_on_finish(&metrics); - debug!("Unordered scan in parallel finished, region_id: {}, metrics: {:?}", mapper.metadata().region_id, metrics); - }; - let stream = Box::pin(RecordBatchStreamWrapper::new( - self.input.mapper.output_schema(), - Box::pin(stream), - )); - - Ok(stream) - } - - /// Returns whether to scan in parallel. - fn enable_parallel_scan(&self) -> bool { - self.input.parallelism.allow_parallel_scan() - && (self.input.files.len() + self.input.memtables.len()) > 1 - } - /// Fetch a batch from the source and convert it into a record batch. async fn fetch_from_source( source: &mut Source, mapper: &ProjectionMapper, cache: Option<&CacheManager>, + compat_batch: Option<&CompatBatch>, metrics: &mut Metrics, ) -> common_recordbatch::error::Result> { let start = Instant::now(); - let Some(batch) = source + let Some(mut batch) = source .next_batch() .await .map_err(BoxedError::new) @@ -166,6 +119,13 @@ impl UnorderedScan { return Ok(None); }; + if let Some(compat) = compat_batch { + batch = compat + .compat_batch(batch) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + } + let convert_start = Instant::now(); let record_batch = mapper.convert(&batch, cache)?; metrics.convert_cost += convert_start.elapsed(); @@ -174,15 +134,6 @@ impl UnorderedScan { Ok(Some(record_batch)) } - fn observe_metrics_on_start(metrics: &Metrics) { - READ_STAGE_ELAPSED - .with_label_values(&["prepare_scan"]) - .observe(metrics.prepare_scan_cost.as_secs_f64()); - READ_STAGE_ELAPSED - .with_label_values(&["build_source"]) - .observe(metrics.build_source_cost.as_secs_f64()); - } - fn observe_metrics_on_finish(metrics: &Metrics) { READ_STAGE_ELAPSED .with_label_values(&["convert_rb"]) @@ -198,21 +149,168 @@ impl UnorderedScan { } } +impl RegionScanner for UnorderedScan { + fn properties(&self) -> &ScannerProperties { + &self.properties + } + + fn schema(&self) -> SchemaRef { + self.stream_ctx.input.mapper.output_schema() + } + + fn scan_partition(&self, partition: usize) -> Result { + let mut metrics = Metrics { + prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + ..Default::default() + }; + let stream_ctx = self.stream_ctx.clone(); + let stream = try_stream! { + let mut parts = stream_ctx.parts.lock().await; + parts + .maybe_init_parts(&stream_ctx.input, &mut metrics) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let Some(part) = parts.get_part(partition) else { + return; + }; + + let mapper = &stream_ctx.input.mapper; + let memtable_sources = part + .memtables + .iter() + .map(|mem| { + let iter = mem.iter( + Some(mapper.column_ids()), + stream_ctx.input.predicate.clone(), + )?; + Ok(Source::Iter(iter)) + }) + .collect::>>() + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let query_start = stream_ctx.query_start; + let cache = stream_ctx.input.cache_manager.as_deref(); + // Scans memtables first. + for mut source in memtable_sources { + while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, None, &mut metrics).await? { + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); + yield batch; + } + } + // Then scans file ranges. + let mut reader_metrics = ReaderMetrics::default(); + for file_range in &part.file_ranges { + let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?; + let compat_batch = file_range.compat_batch(); + let mut source = Source::RowGroupReader(reader); + while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? { + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); + yield batch; + } + if let Source::RowGroupReader(reader) = source { + reader_metrics.merge_from(reader.metrics()); + } + } + + metrics.total_cost = query_start.elapsed(); + Self::observe_metrics_on_finish(&metrics); + debug!( + "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}", + partition, mapper.metadata().region_id, metrics, reader_metrics + ); + }; + let stream = Box::pin(RecordBatchStreamWrapper::new( + self.stream_ctx.input.mapper.output_schema(), + Box::pin(stream), + )); + + Ok(stream) + } +} + +impl DisplayAs for UnorderedScan { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "UnorderedScan: [{:?}]", self.stream_ctx.parts) + } +} + +impl fmt::Debug for UnorderedScan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("UnorderedScan") + .field("parts", &self.stream_ctx.parts) + .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost) + .finish() + } +} + #[cfg(test)] impl UnorderedScan { /// Returns the input. pub(crate) fn input(&self) -> &ScanInput { - &self.input + &self.stream_ctx.input } } +/// List of [ScanPart]s. +#[derive(Debug, Default)] +struct ScanPartList(Option>); + +impl ScanPartList { + /// Initializes parts if they are not built yet. + async fn maybe_init_parts(&mut self, input: &ScanInput, metrics: &mut Metrics) -> Result<()> { + if self.0.is_none() { + let now = Instant::now(); + let mut distributor = UnorderedDistributor::default(); + input.prune_file_ranges(&mut distributor).await?; + self.0 = Some(distributor.build_parts(&input.memtables, input.parallelism.parallelism)); + + metrics.build_parts_cost = now.elapsed(); + READ_STAGE_ELAPSED + .with_label_values(&["build_parts"]) + .observe(metrics.build_parts_cost.as_secs_f64()); + } + Ok(()) + } + + /// Gets the part by index, returns None if the index is out of bound. + /// # Panics + /// Panics if parts are not initialized. + fn get_part(&mut self, index: usize) -> Option<&ScanPart> { + let parts = self.0.as_ref().unwrap(); + parts.get(index) + } +} + +/// Context shared by different streams. +/// It contains the input and distributes input to multiple parts +/// to scan. +struct StreamContext { + /// Input memtables and files. + input: ScanInput, + /// Parts to scan. + /// The scanner builds parts to scan from the input lazily. + /// The mutex is used to ensure the parts are only built once. + parts: Mutex, + + // Metrics: + /// The start time of the query. + query_start: Instant, + /// Time elapsed before creating the scanner. + prepare_scan_cost: Duration, +} + /// Metrics for [UnorderedScan]. +// We print all fields in logs so we disable the dead_code lint. +#[allow(dead_code)] #[derive(Debug, Default)] struct Metrics { /// Duration to prepare the scan task. prepare_scan_cost: Duration, - /// Duration to build sources. - build_source_cost: Duration, + /// Duration to build parts. + build_parts_cost: Duration, /// Duration to scan data. scan_cost: Duration, /// Duration to convert batches. @@ -224,3 +322,66 @@ struct Metrics { /// Number of rows returned. num_rows: usize, } + +/// Builds [ScanPart]s without preserving order. It distributes file ranges and memtables +/// across partitions. Each partition scans a subset of memtables and file ranges. There +/// is no output ordering guarantee of each partition. +#[derive(Default)] +struct UnorderedDistributor { + file_ranges: Vec, +} + +impl FileRangeCollector for UnorderedDistributor { + fn append_file_ranges( + &mut self, + _file_meta: &FileMeta, + file_ranges: impl Iterator, + ) { + self.file_ranges.extend(file_ranges); + } +} + +impl UnorderedDistributor { + /// Distributes file ranges and memtables across partitions according to the `parallelism`. + /// The output number of parts may be `<= parallelism`. + fn build_parts(self, memtables: &[MemtableRef], parallelism: usize) -> Vec { + if parallelism <= 1 { + // Returns a single part. + let part = ScanPart { + memtables: memtables.to_vec(), + file_ranges: self.file_ranges, + }; + return vec![part]; + } + + let mems_per_part = ((memtables.len() + parallelism - 1) / parallelism).max(1); + let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1); + common_telemetry::debug!( + "Parallel scan is enabled, parallelism: {}, {} memtables, {} file_ranges, mems_per_part: {}, ranges_per_part: {}", + parallelism, + memtables.len(), + self.file_ranges.len(), + mems_per_part, + ranges_per_part + ); + let mut scan_parts = memtables + .chunks(mems_per_part) + .map(|mems| ScanPart { + memtables: mems.to_vec(), + file_ranges: Vec::new(), + }) + .collect::>(); + for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() { + if i == scan_parts.len() { + scan_parts.push(ScanPart { + memtables: Vec::new(), + file_ranges: ranges.to_vec(), + }); + } else { + scan_parts[i].file_ranges = ranges.to_vec(); + } + } + + scan_parts + } +} diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index f86faa81ef..f3587dd24a 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -175,8 +175,9 @@ impl FileHandle { self.inner.compacting.store(compacting, Ordering::Relaxed); } - pub fn meta(&self) -> FileMeta { - self.inner.meta.clone() + /// Returns a reference to the [FileMeta]. + pub fn meta_ref(&self) -> &FileMeta { + &self.inner.meta } } diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index f385ea992e..5a3b074ded 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -25,6 +25,7 @@ use parquet::arrow::arrow_reader::RowSelection; use snafu::ResultExt; use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result}; +use crate::read::compat::CompatBatch; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec}; use crate::sst::parquet::format::ReadFormat; @@ -32,6 +33,7 @@ use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleF /// A range of a parquet SST. Now it is a row group. /// We can read different file ranges in parallel. +#[derive(Clone)] pub struct FileRange { /// Shared context. context: FileRangeContextRef, @@ -56,7 +58,6 @@ impl FileRange { } /// Returns a reader to read the [FileRange]. - #[allow(dead_code)] pub(crate) async fn reader(&self) -> Result { let parquet_reader = self .context @@ -66,6 +67,11 @@ impl FileRange { Ok(RowGroupReader::new(self.context.clone(), parquet_reader)) } + + /// Returns the helper to compat batches. + pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { + self.context.compat_batch() + } } /// Context shared by ranges of the same parquet SST. @@ -78,6 +84,8 @@ pub(crate) struct FileRangeContext { read_format: ReadFormat, /// Decoder for primary keys codec: McmpRowCodec, + /// Optional helper to compat batches. + compat_batch: Option, } pub(crate) type FileRangeContextRef = Arc; @@ -95,6 +103,7 @@ impl FileRangeContext { filters, read_format, codec, + compat_batch: None, } } @@ -118,6 +127,16 @@ impl FileRangeContext { &self.reader_builder } + /// Returns the helper to compat batches. + pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { + self.compat_batch.as_ref() + } + + /// Sets the `CompatBatch` to the context. + pub(crate) fn set_compat_batch(&mut self, compat: Option) { + self.compat_batch = compat; + } + /// TRY THE BEST to perform pushed down predicate precisely on the input batch. /// Return the filtered batch. If the entire batch is filtered out, return None. /// diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index fb41923677..59f68bf1b2 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -53,7 +53,7 @@ use crate::read::{Batch, BatchReader}; use crate::row_converter::{McmpRowCodec, SortField}; use crate::sst::file::FileHandle; use crate::sst::index::applier::SstIndexApplierRef; -use crate::sst::parquet::file_range::{FileRange, FileRangeContext, FileRangeContextRef}; +use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef}; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::metadata::MetadataLoader; use crate::sst::parquet::row_group::InMemoryRowGroup; @@ -155,29 +155,17 @@ impl ParquetReaderBuilder { /// This needs to perform IO operation. pub async fn build(&self) -> Result { let (context, row_groups) = self.build_reader_input().await?; - ParquetReader::new(context, row_groups).await - } - - /// Builds [FileRange]s to read and pushes them to `file_ranges`. - #[allow(dead_code)] - pub async fn build_file_ranges(&self, file_ranges: &mut Vec) -> Result<()> { - let (context, row_groups) = self.build_reader_input().await?; - file_ranges.reserve_exact(row_groups.len()); - for (row_group_idx, row_selection) in row_groups { - let file_range = FileRange::new(context.clone(), row_group_idx, row_selection); - file_ranges.push(file_range); - } - Ok(()) + ParquetReader::new(Arc::new(context), row_groups).await } /// Builds a [FileRangeContext] and collects row groups to read. /// /// This needs to perform IO operation. - async fn build_reader_input(&self) -> Result<(FileRangeContextRef, RowGroupMap)> { + pub(crate) async fn build_reader_input(&self) -> Result<(FileRangeContext, RowGroupMap)> { let start = Instant::now(); let file_path = self.file_handle.file_path(&self.file_dir); - let file_size = self.file_handle.meta().file_size; + let file_size = self.file_handle.meta_ref().file_size; // Loads parquet metadata of the file. let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. @@ -211,7 +199,7 @@ impl ParquetReaderBuilder { parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint) .context(ReadParquetSnafu { path: &file_path })?; - let mut metrics = Metrics::default(); + let mut metrics = ReaderMetrics::default(); let row_groups = self .row_groups_to_read(&read_format, &parquet_meta, &mut metrics) @@ -258,7 +246,7 @@ impl ParquetReaderBuilder { ); let context = FileRangeContext::new(reader_builder, filters, read_format, codec); - Ok((Arc::new(context), row_groups)) + Ok((context, row_groups)) } /// Decodes region metadata from key value. @@ -324,7 +312,7 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> BTreeMap> { let num_row_groups = parquet_meta.num_row_groups(); if num_row_groups == 0 { @@ -346,13 +334,13 @@ impl ParquetReaderBuilder { async fn prune_row_groups_by_inverted_index( &self, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> Option>> { let Some(index_applier) = &self.index_applier else { return None; }; - if !self.file_handle.meta().inverted_index_available() { + if !self.file_handle.meta_ref().inverted_index_available() { return None; } @@ -428,7 +416,7 @@ impl ParquetReaderBuilder { &self, read_format: &ReadFormat, parquet_meta: &ParquetMetaData, - metrics: &mut Metrics, + metrics: &mut ReaderMetrics, ) -> Option>> { let Some(predicate) = &self.predicate else { return None; @@ -513,7 +501,7 @@ fn time_range_to_predicate( /// Parquet reader metrics. #[derive(Debug, Default)] -struct Metrics { +pub(crate) struct ReaderMetrics { /// Number of row groups before filtering. num_row_groups_before_filtering: usize, /// Number of row groups filtered by inverted index. @@ -538,6 +526,24 @@ struct Metrics { num_rows: usize, } +impl ReaderMetrics { + /// Adds `other` metrics to this metrics. + pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) { + self.num_row_groups_before_filtering += other.num_row_groups_before_filtering; + self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered; + self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered; + self.num_rows_precise_filtered += other.num_rows_precise_filtered; + self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering; + self.num_rows_in_row_group_inverted_index_filtered += + other.num_rows_in_row_group_inverted_index_filtered; + self.build_cost += other.build_cost; + self.scan_cost += other.scan_cost; + self.num_record_batches += other.num_record_batches; + self.num_batches += other.num_batches; + self.num_rows += other.num_rows; + } +} + /// Builder to build a [ParquetRecordBatchReader] for a row group. pub(crate) struct RowGroupReaderBuilder { /// SST file to read. @@ -606,12 +612,12 @@ enum ReaderState { /// The reader is reading a row group. Readable(RowGroupReader), /// The reader is exhausted. - Exhausted(Metrics), + Exhausted(ReaderMetrics), } impl ReaderState { /// Returns the metrics of the reader. - fn metrics(&self) -> &Metrics { + fn metrics(&self) -> &ReaderMetrics { match self { ReaderState::Readable(reader) => &reader.metrics, ReaderState::Exhausted(m) => m, @@ -807,7 +813,7 @@ impl ParquetReader { .await?; ReaderState::Readable(RowGroupReader::new(context.clone(), parquet_reader)) } else { - ReaderState::Exhausted(Metrics::default()) + ReaderState::Exhausted(ReaderMetrics::default()) }; Ok(ParquetReader { @@ -829,7 +835,7 @@ impl ParquetReader { } /// Reader to read a row group of a parquet file. -pub(crate) struct RowGroupReader { +pub struct RowGroupReader { /// Context for file ranges. context: FileRangeContextRef, /// Inner parquet reader. @@ -837,7 +843,7 @@ pub(crate) struct RowGroupReader { /// Buffered batches to return. batches: VecDeque, /// Local scan metrics. - metrics: Metrics, + metrics: ReaderMetrics, } impl RowGroupReader { @@ -847,17 +853,22 @@ impl RowGroupReader { context, reader, batches: VecDeque::new(), - metrics: Metrics::default(), + metrics: ReaderMetrics::default(), } } + /// Gets the metrics. + pub(crate) fn metrics(&self) -> &ReaderMetrics { + &self.metrics + } + /// Resets the parquet reader. fn reset_reader(&mut self, reader: ParquetRecordBatchReader) { self.reader = reader; } /// Tries to fetch next [Batch] from the reader. - async fn next_batch(&mut self) -> Result> { + pub(crate) async fn next_batch(&mut self) -> Result> { if let Some(batch) = self.batches.pop_front() { self.metrics.num_rows += batch.num_rows(); return Ok(Some(batch)); diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs index dc388c742d..07d6bee9d9 100644 --- a/src/mito2/src/sst/version.rs +++ b/src/mito2/src/sst/version.rs @@ -93,7 +93,7 @@ impl SstVersion { .files .values() .map(|file_handle| { - let meta = file_handle.meta(); + let meta = file_handle.meta_ref(); meta.file_size + meta.index_file_size }) .sum::() diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index f4b5aec37e..91813c9129 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -170,6 +170,8 @@ pub trait RegionScanner: Debug + DisplayAs + Send + Sync { fn schema(&self) -> SchemaRef; /// Scans the partition and returns a stream of record batches. + /// # Panics + /// Panics if the `partition` is out of bound. fn scan_partition(&self, partition: usize) -> Result; } diff --git a/tests/cases/standalone/common/insert/append_mode.result b/tests/cases/standalone/common/insert/append_mode.result index 4fe1166ae6..1e715ec004 100644 --- a/tests/cases/standalone/common/insert/append_mode.result +++ b/tests/cases/standalone/common/insert/append_mode.result @@ -10,6 +10,11 @@ with('append_mode'='true'); Affected Rows: 0 +SELECT host, ts from append_mode_on ORDER BY host, ts; + +++ +++ + INSERT INTO append_mode_on VALUES ('host1',0, 0), ('host2', 1, 1,); Affected Rows: 2 diff --git a/tests/cases/standalone/common/insert/append_mode.sql b/tests/cases/standalone/common/insert/append_mode.sql index a27a66f4e0..2438f87789 100644 --- a/tests/cases/standalone/common/insert/append_mode.sql +++ b/tests/cases/standalone/common/insert/append_mode.sql @@ -8,6 +8,8 @@ create table if not exists append_mode_on( engine=mito with('append_mode'='true'); +SELECT host, ts from append_mode_on ORDER BY host, ts; + INSERT INTO append_mode_on VALUES ('host1',0, 0), ('host2', 1, 1,); INSERT INTO append_mode_on VALUES ('host1',0, 0), ('host2', 1, 1,);