diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index febd2be45e..2a6d3de84e 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -644,7 +644,9 @@ impl EngineInner {
             .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
             .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
             .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
-            .with_start_time(query_start);
+            .with_start_time(query_start)
+            // TODO(yingwen): Enable it after flat format is supported.
+            .with_flat_format(false);

         #[cfg(feature = "enterprise")]
         let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
diff --git a/src/mito2/src/read/flat_dedup.rs b/src/mito2/src/read/flat_dedup.rs
index 5b5120e4a4..f872e738ec 100644
--- a/src/mito2/src/read/flat_dedup.rs
+++ b/src/mito2/src/read/flat_dedup.rs
@@ -84,13 +84,13 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Itera
 }

 /// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
-pub struct DedupReader<I, S> {
+pub struct FlatDedupReader<I, S> {
     stream: I,
     strategy: S,
     metrics: DedupMetrics,
 }

-impl<I, S> DedupReader<I, S> {
+impl<I, S> FlatDedupReader<I, S> {
     /// Creates a new dedup iterator.
     pub fn new(stream: I, strategy: S) -> Self {
         Self {
@@ -101,7 +101,9 @@ impl<I, S> DedupReader<I, S> {
     }
 }

-impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy> DedupReader<I, S> {
+impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
+    FlatDedupReader<I, S>
+{
     /// Returns the next deduplicated batch.
     async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
         while let Some(batch) = self.stream.try_next().await? {
diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs
index 109aef108e..fa7994121b 100644
--- a/src/mito2/src/read/flat_merge.rs
+++ b/src/mito2/src/read/flat_merge.rs
@@ -551,7 +551,7 @@ impl Iterator for FlatMergeIterator {
 /// Iterator to merge multiple sorted iterators into a single sorted iterator.
 ///
 /// All iterators must be sorted by primary key, time index, sequence desc.
-pub struct MergeReader {
+pub struct FlatMergeReader {
     /// The merge algorithm to maintain heaps.
     algo: MergeAlgo,
     /// Current buffered rows to output.
@@ -564,7 +564,7 @@ pub struct MergeReader {
     batch_size: usize,
 }

-impl MergeReader {
+impl FlatMergeReader {
     /// Creates a new iterator to merge sorted `iters`.
     pub async fn new(
         schema: SchemaRef,
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
index 0716a041d3..ce2e51f85a 100644
--- a/src/mito2/src/read/flat_projection.rs
+++ b/src/mito2/src/read/flat_projection.rs
@@ -16,10 +16,12 @@

 use std::sync::Arc;

+use api::v1::SemanticType;
 use common_error::ext::BoxedError;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::RecordBatch;
-use datatypes::prelude::ConcreteDataType;
+use datatypes::arrow::datatypes::Field;
+use datatypes::prelude::{ConcreteDataType, DataType};
 use datatypes::schema::{Schema, SchemaRef};
 use datatypes::vectors::Helper;
 use snafu::{OptionExt, ResultExt};
@@ -27,6 +29,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::storage::ColumnId;

 use crate::error::{InvalidRequestSnafu, Result};
+use crate::sst::internal_fields;
 use crate::sst::parquet::flat_format::sst_column_id_indices;
 use crate::sst::parquet::format::FormatProjection;

@@ -51,6 +54,8 @@ pub struct FlatProjectionMapper {
     is_empty_projection: bool,
     /// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
     batch_indices: Vec<usize>,
+    /// Precomputed Arrow schema for input batches.
+    input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
 }

 impl FlatProjectionMapper {
@@ -101,6 +106,9 @@ impl FlatProjectionMapper {

         let batch_schema = flat_projected_columns(metadata, &format_projection);

+        // Safety: We get the column id from the metadata.
+        let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
+
         if is_empty_projection {
             // If projection is empty, we don't output any column.
             return Ok(FlatProjectionMapper {
@@ -110,6 +118,7 @@ impl FlatProjectionMapper {
                 batch_schema: vec![],
                 is_empty_projection,
                 batch_indices: vec![],
+                input_arrow_schema,
             });
         }

@@ -135,6 +144,7 @@ impl FlatProjectionMapper {
             batch_schema,
             is_empty_projection,
             batch_indices,
+            input_arrow_schema,
         })
     }

@@ -154,12 +164,39 @@ impl FlatProjectionMapper {
         &self.column_ids
     }

+    /// Returns the field column start index in the output batch.
+    pub(crate) fn field_column_start(&self) -> usize {
+        for (idx, column_id) in self
+            .batch_schema
+            .iter()
+            .map(|(column_id, _)| column_id)
+            .enumerate()
+        {
+            // Safety: We get the column id from the metadata in new().
+            if self
+                .metadata
+                .column_by_id(*column_id)
+                .unwrap()
+                .semantic_type
+                == SemanticType::Field
+            {
+                return idx;
+            }
+        }
+
+        self.batch_schema.len()
+    }
+
     /// Returns ids of columns of the batch that the mapper expects to convert.
     #[allow(dead_code)]
     pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
         &self.batch_schema
     }

+    pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
+        self.input_arrow_schema.clone()
+    }
+
     /// Returns the schema of converted [RecordBatch].
     /// This is the schema that the stream will output. This schema may contain
     /// less columns than [FlatProjectionMapper::column_ids()].
@@ -219,3 +256,35 @@ pub(crate) fn flat_projected_columns(
     // Safety: FormatProjection ensures all indices can be unwrapped.
     schema.into_iter().map(|id_type| id_type.unwrap()).collect()
 }
+
+/// Computes the Arrow schema for input batches.
+///
+/// # Panics
+/// Panics if it can't find the column by the column id in the batch_schema.
+fn compute_input_arrow_schema(
+    metadata: &RegionMetadata,
+    batch_schema: &[(ColumnId, ConcreteDataType)],
+) -> datatypes::arrow::datatypes::SchemaRef {
+    let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
+    for (column_id, _) in batch_schema {
+        let column_metadata = metadata.column_by_id(*column_id).unwrap();
+        let field = if column_metadata.semantic_type == SemanticType::Tag {
+            Field::new_dictionary(
+                &column_metadata.column_schema.name,
+                datatypes::arrow::datatypes::DataType::UInt32,
+                column_metadata.column_schema.data_type.as_arrow_type(),
+                column_metadata.column_schema.is_nullable(),
+            )
+        } else {
+            Field::new(
+                &column_metadata.column_schema.name,
+                column_metadata.column_schema.data_type.as_arrow_type(),
+                column_metadata.column_schema.is_nullable(),
+            )
+        };
+        new_fields.push(Arc::new(field));
+    }
+    new_fields.extend_from_slice(&internal_fields());
+
+    Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
+}
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index be7854f0db..4b5fcdf1b3 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -29,6 +29,7 @@ use common_time::range::TimestampRange;
 use datafusion_common::Column;
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::Expr;
+use futures::StreamExt;
 use smallvec::SmallVec;
 use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
@@ -52,7 +53,7 @@ use crate::read::seq_scan::SeqScan;
 use crate::read::series_scan::SeriesScan;
 use crate::read::stream::ScanBatchStream;
 use crate::read::unordered_scan::UnorderedScan;
-use crate::read::{Batch, Source};
+use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
 use crate::region::options::MergeMode;
 use crate::region::version::VersionRef;
 use crate::sst::file::FileHandle;
@@ -65,6 +66,9 @@ use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBui
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
 use crate::sst::parquet::reader::ReaderMetrics;

+/// Parallel scan channel size for flat format.
+const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
+
 /// A scanner scans a region and returns a [SendableRecordBatchStream].
 pub(crate) enum Scanner {
     /// Sequential scan.
@@ -212,6 +216,8 @@ pub(crate) struct ScanRegion {
     /// Whether to filter out the deleted rows.
     /// Usually true for normal read, and false for scan for compaction.
     filter_deleted: bool,
+    /// Whether to use flat format.
+    flat_format: bool,
     #[cfg(feature = "enterprise")]
     extension_range_provider: Option<ExtensionRangeProviderRef>,
 }
@@ -236,6 +242,7 @@ impl ScanRegion {
             ignore_bloom_filter: false,
             start_time: None,
             filter_deleted: true,
+            flat_format: false,
             #[cfg(feature = "enterprise")]
             extension_range_provider: None,
         }
@@ -292,6 +299,13 @@ impl ScanRegion {
         self.filter_deleted = filter_deleted;
     }

+    /// Sets whether to use flat format.
+    #[must_use]
+    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
+        self.flat_format = flat_format;
+        self
+    }
+
     #[cfg(feature = "enterprise")]
     pub(crate) fn set_extension_range_provider(
         &mut self,
@@ -374,8 +388,10 @@ impl ScanRegion {

         // The mapper always computes projected column ids as the schema of SSTs may change.
         let mapper = match &self.request.projection {
-            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
-            None => ProjectionMapper::all(&self.version.metadata, false)?,
+            Some(p) => {
+                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
+            }
+            None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
         };

         let ssts = &self.version.ssts;
@@ -449,11 +465,11 @@ impl ScanRegion {
         let bloom_filter_applier = self.build_bloom_filter_applier();
         let fulltext_index_applier = self.build_fulltext_index_applier();
         let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
-        // The mapper always computes projected column ids as the schema of SSTs may change.
-        let mapper = match &self.request.projection {
-            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
-            None => ProjectionMapper::all(&self.version.metadata, false)?,
-        };
+
+        if self.flat_format {
+            // The batch is already large enough so we use a small channel size here.
+            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
+        }

         let input = ScanInput::new(self.access_layer, mapper)
             .with_time_range(Some(time_range))
@@ -471,7 +487,8 @@ impl ScanRegion {
             .with_filter_deleted(self.filter_deleted)
             .with_merge_mode(self.version.options.merge_mode())
             .with_series_row_selector(self.request.series_row_selector)
-            .with_distribution(self.request.distribution);
+            .with_distribution(self.request.distribution)
+            .with_flat_format(self.flat_format);

         #[cfg(feature = "enterprise")]
         let input = if let Some(provider) = self.extension_range_provider {
@@ -673,6 +690,8 @@ pub struct ScanInput {
     pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
     /// Hint for the required distribution of the scanner.
     pub(crate) distribution: Option<TimeSeriesDistribution>,
+    /// Whether to use flat format.
+    pub(crate) flat_format: bool,
     #[cfg(feature = "enterprise")]
     extension_ranges: Vec<BoxedExtensionRange>,
 }
@@ -701,6 +720,7 @@ impl ScanInput {
             merge_mode: MergeMode::default(),
             series_row_selector: None,
             distribution: None,
+            flat_format: false,
             #[cfg(feature = "enterprise")]
             extension_ranges: Vec::new(),
         }
@@ -845,6 +865,13 @@ impl ScanInput {
         self
     }

+    /// Sets whether to use flat format.
+    #[must_use]
+    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
+        self.flat_format = flat_format;
+        self
+    }
+
     /// Scans sources in parallel.
     ///
     /// # Panics if the input doesn't allow parallel scan.
@@ -894,6 +921,7 @@ impl ScanInput {
             .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
             .fulltext_index_applier(self.fulltext_index_applier.clone())
             .expected_metadata(Some(self.mapper.metadata().clone()))
+            .flat_format(self.flat_format)
             .build_reader_input(reader_metrics)
             .await;
         let (mut file_range_ctx, selection) = match res {
@@ -964,6 +992,61 @@ impl ScanInput {
         });
     }

+    /// Scans flat sources (RecordBatch streams) in parallel.
+    ///
+    /// # Panics if the input doesn't allow parallel scan.
+    pub(crate) fn create_parallel_flat_sources(
+        &self,
+        sources: Vec<BoxedRecordBatchStream>,
+        semaphore: Arc<Semaphore>,
+    ) -> Result<Vec<BoxedRecordBatchStream>> {
+        if sources.len() <= 1 {
+            return Ok(sources);
+        }
+
+        // Spawn a task for each source.
+        let sources = sources
+            .into_iter()
+            .map(|source| {
+                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
+                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
+                let stream = Box::pin(ReceiverStream::new(receiver));
+                Box::pin(stream) as _
+            })
+            .collect();
+        Ok(sources)
+    }
+
+    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
+    pub(crate) fn spawn_flat_scan_task(
+        &self,
+        mut input: BoxedRecordBatchStream,
+        semaphore: Arc<Semaphore>,
+        sender: mpsc::Sender<Result<RecordBatch>>,
+    ) {
+        common_runtime::spawn_global(async move {
+            loop {
+                // We release the permit before sending result to avoid the task waiting on
+                // the channel with the permit held.
+                let maybe_batch = {
+                    // Safety: We never close the semaphore.
+                    let _permit = semaphore.acquire().await.unwrap();
+                    input.next().await
+                };
+                match maybe_batch {
+                    Some(Ok(batch)) => {
+                        let _ = sender.send(Ok(batch)).await;
+                    }
+                    Some(Err(e)) => {
+                        let _ = sender.send(Err(e)).await;
+                        break;
+                    }
+                    None => break,
+                }
+            }
+        });
+    }
+
     pub(crate) fn total_rows(&self) -> usize {
         let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
         let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index bb6514992b..fd597b829a 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -26,7 +26,7 @@ use common_telemetry::tracing;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
 use snafu::{ensure, OptionExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
@@ -37,16 +37,22 @@ use tokio::sync::Semaphore;

 use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
 use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
+use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
+use crate::read::flat_merge::FlatMergeReader;
 use crate::read::last_row::LastRowReader;
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::range::{RangeBuilderList, RangeMeta};
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
-    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
+    scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
+    PartitionMetrics, PartitionMetricsList,
 };
 use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
-use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
+use crate::read::{
+    scan_util, Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source,
+};
 use crate::region::options::MergeMode;
+use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

 /// Scans a region and returns rows in a sorted sequence.
 ///
@@ -210,6 +216,56 @@ impl SeqScan {
         Ok(reader)
     }

+    /// Builds a flat reader to read sources that return RecordBatch. If `semaphore` is provided,
+    /// reads sources in parallel if possible.
+    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
+    pub(crate) async fn build_flat_reader_from_sources(
+        stream_ctx: &StreamContext,
+        mut sources: Vec<BoxedRecordBatchStream>,
+        semaphore: Option<Arc<Semaphore>>,
+    ) -> Result<BoxedRecordBatchStream> {
+        if let Some(semaphore) = semaphore.as_ref() {
+            // Read sources in parallel.
+            if sources.len() > 1 {
+                sources = stream_ctx
+                    .input
+                    .create_parallel_flat_sources(sources, semaphore.clone())?;
+            }
+        }
+
+        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
+        let schema = mapper.input_arrow_schema();
+
+        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
+
+        let dedup = !stream_ctx.input.append_mode;
+        let reader = if dedup {
+            match stream_ctx.input.merge_mode {
+                MergeMode::LastRow => Box::pin(
+                    FlatDedupReader::new(
+                        reader.into_stream().boxed(),
+                        FlatLastRow::new(stream_ctx.input.filter_deleted),
+                    )
+                    .into_stream(),
+                ) as _,
+                MergeMode::LastNonNull => Box::pin(
+                    FlatDedupReader::new(
+                        reader.into_stream().boxed(),
+                        FlatLastNonNull::new(
+                            mapper.field_column_start(),
+                            stream_ctx.input.filter_deleted,
+                        ),
+                    )
+                    .into_stream(),
+                ) as _,
+            }
+        } else {
+            Box::pin(reader.into_stream()) as _
+        };
+
+        Ok(reader)
+    }
+
     /// Scans the given partition when the part list is set properly.
     /// Otherwise the returned stream might not contains any data.
     fn scan_partition_impl(
@@ -227,10 +283,15 @@ impl SeqScan {
         }

         let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
-
-        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
-
         let input = &self.stream_ctx.input;
+
+        let batch_stream = if input.flat_format {
+            // Use flat scan for bulk memtables
+            self.scan_flat_batch_in_partition(partition, metrics.clone())?
+        } else {
+            // Use regular batch scan for normal memtables
+            self.scan_batch_in_partition(partition, metrics.clone())?
+        };
         let record_batch_stream = ConvertBatchStream::new(
             batch_stream,
             input.mapper.clone(),
@@ -342,6 +403,79 @@ impl SeqScan {
         Ok(Box::pin(stream))
     }

+    fn scan_flat_batch_in_partition(
+        &self,
+        partition: usize,
+        part_metrics: PartitionMetrics,
+    ) -> Result<ScanBatchStream> {
+        ensure!(
+            partition < self.properties.partitions.len(),
+            PartitionOutOfRangeSnafu {
+                given: partition,
+                all: self.properties.partitions.len(),
+            }
+        );
+
+        if self.properties.partitions[partition].is_empty() {
+            return Ok(Box::pin(futures::stream::empty()));
+        }
+
+        let stream_ctx = self.stream_ctx.clone();
+        let semaphore = self.new_semaphore();
+        let partition_ranges = self.properties.partitions[partition].clone();
+        let compaction = self.compaction;
+
+        let stream = try_stream! {
+            part_metrics.on_first_poll();
+
+            let range_builder_list = Arc::new(RangeBuilderList::new(
+                stream_ctx.input.num_memtables(),
+                stream_ctx.input.num_files(),
+            ));
+            // Scans each part.
+            for part_range in partition_ranges {
+                let mut sources = Vec::new();
+                build_flat_sources(
+                    &stream_ctx,
+                    &part_range,
+                    compaction,
+                    &part_metrics,
+                    range_builder_list.clone(),
+                    &mut sources,
+                ).await?;
+
+                let mut metrics = ScannerMetrics::default();
+                let mut fetch_start = Instant::now();
+                let mut reader =
+                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
+                        .await?;
+
+                while let Some(record_batch) = reader.try_next().await? {
+                    metrics.scan_cost += fetch_start.elapsed();
+                    metrics.num_batches += 1;
+                    metrics.num_rows += record_batch.num_rows();
+
+                    debug_assert!(record_batch.num_rows() > 0);
+                    if record_batch.num_rows() == 0 {
+                        continue;
+                    }
+
+                    let yield_start = Instant::now();
+                    yield ScanBatch::RecordBatch(record_batch);
+                    metrics.yield_cost += yield_start.elapsed();
+
+                    fetch_start = Instant::now();
+                }
+
+                metrics.scan_cost += fetch_start.elapsed();
+                part_metrics.merge_metrics(&metrics);
+            }
+
+            part_metrics.on_finish();
+        };
+        Ok(Box::pin(stream))
+    }
+
     fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
         if self.properties.target_partitions() > self.properties.num_partitions() {
             // We can use additional tasks to read the data if we have more target partitions than actual partitions.
@@ -543,6 +677,59 @@ pub(crate) async fn build_sources(
     Ok(())
 }

+/// Builds flat sources for the partition range and pushes them to the `sources` vector.
+pub(crate) async fn build_flat_sources(
+    stream_ctx: &Arc<StreamContext>,
+    part_range: &PartitionRange,
+    compaction: bool,
+    part_metrics: &PartitionMetrics,
+    range_builder_list: Arc<RangeBuilderList>,
+    sources: &mut Vec<BoxedRecordBatchStream>,
+) -> Result<()> {
+    // Gets range meta.
+    let range_meta = &stream_ctx.ranges[part_range.identifier];
+    #[cfg(debug_assertions)]
+    if compaction {
+        // Compaction expects that input sources have not been split.
+        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
+        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
+            // It should scan all row groups.
+            debug_assert_eq!(
+                -1, row_group_idx.row_group_index,
+                "Expect {} range scan all row groups, given: {}",
+                i, row_group_idx.row_group_index,
+            );
+        }
+    }
+
+    sources.reserve(range_meta.row_group_indices.len());
+    for index in &range_meta.row_group_indices {
+        let stream = if stream_ctx.is_mem_range_index(*index) {
+            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
+            Box::pin(stream) as _
+        } else if stream_ctx.is_file_range_index(*index) {
+            let read_type = if compaction {
+                "compaction"
+            } else {
+                "seq_scan_files"
+            };
+            let stream = scan_flat_file_ranges(
+                stream_ctx.clone(),
+                part_metrics.clone(),
+                *index,
+                read_type,
+                range_builder_list.clone(),
+            )
+            .await?;
+            Box::pin(stream) as _
+        } else {
+            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
+        };
+        sources.push(stream);
+    }
+    Ok(())
+}
+
 #[cfg(test)]
 impl SeqScan {
     /// Returns the input.
diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs
index 58befd0711..85a5dd4d9d 100644
--- a/src/mito2/src/read/stream.rs
+++ b/src/mito2/src/read/stream.rs
@@ -36,6 +36,7 @@ use crate::read::Batch;
 pub enum ScanBatch {
     Normal(Batch),
     Series(SeriesBatch),
+    RecordBatch(DfRecordBatch),
 }

 pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
@@ -99,6 +100,12 @@ impl ConvertBatchStream {
                 RecordBatch::try_from_df_record_batch(output_schema, record_batch)
             }
+            ScanBatch::RecordBatch(df_record_batch) => {
+                // Safety: Only flat format returns this batch.
+                let mapper = self.projection_mapper.as_flat().unwrap();
+
+                mapper.convert(&df_record_batch)
+            }
         }
     }
 }
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index c653328278..1d9004106d 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -23,6 +23,7 @@ use common_error::ext::BoxedError;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
+use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
 use snafu::ensure;
@@ -35,7 +36,8 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
 use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
-    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
+    scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
+    PartitionMetrics, PartitionMetricsList,
 };
 use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
 use crate::read::{scan_util, Batch, ScannerMetrics};
@@ -135,6 +137,51 @@ impl UnorderedScan {
         }
     }

+    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
+    fn scan_flat_partition_range(
+        stream_ctx: Arc<StreamContext>,
+        part_range_id: usize,
+        part_metrics: PartitionMetrics,
+        range_builder_list: Arc<RangeBuilderList>,
+    ) -> impl Stream<Item = Result<RecordBatch>> {
+        try_stream! {
+            // Gets range meta.
+            let range_meta = &stream_ctx.ranges[part_range_id];
+            for index in &range_meta.row_group_indices {
+                if stream_ctx.is_mem_range_index(*index) {
+                    let stream = scan_flat_mem_ranges(
+                        stream_ctx.clone(),
+                        part_metrics.clone(),
+                        *index,
+                    );
+                    for await record_batch in stream {
+                        yield record_batch?;
+                    }
+                } else if stream_ctx.is_file_range_index(*index) {
+                    let stream = scan_flat_file_ranges(
+                        stream_ctx.clone(),
+                        part_metrics.clone(),
+                        *index,
+                        "unordered_scan_files",
+                        range_builder_list.clone(),
+                    ).await?;
+                    for await record_batch in stream {
+                        yield record_batch?;
+                    }
+                } else {
+                    let stream = scan_util::maybe_scan_flat_other_ranges(
+                        &stream_ctx,
+                        *index,
+                        &part_metrics,
+                    ).await?;
+                    for await record_batch in stream {
+                        yield record_batch?;
+                    }
+                }
+            }
+        }
+    }
+
     /// Scan [`Batch`] in all partitions one by one.
     pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
         let metrics_set = ExecutionPlanMetricsSet::new();
@@ -182,10 +229,16 @@ impl UnorderedScan {
         }

         let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
-
-        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
-
         let input = &self.stream_ctx.input;
+
+        let batch_stream = if input.flat_format {
+            // Use flat scan for bulk memtables
+            self.scan_flat_batch_in_partition(partition, metrics.clone())?
+        } else {
+            // Use regular batch scan for normal memtables
+            self.scan_batch_in_partition(partition, metrics.clone())?
+        };
+
         let record_batch_stream = ConvertBatchStream::new(
             batch_stream,
             input.mapper.clone(),
@@ -282,6 +335,67 @@ impl UnorderedScan {
         };
         Ok(Box::pin(stream))
     }
+
+    fn scan_flat_batch_in_partition(
+        &self,
+        partition: usize,
+        part_metrics: PartitionMetrics,
+    ) -> Result<ScanBatchStream> {
+        ensure!(
+            partition < self.properties.partitions.len(),
+            PartitionOutOfRangeSnafu {
+                given: partition,
+                all: self.properties.partitions.len(),
+            }
+        );
+
+        let stream_ctx = self.stream_ctx.clone();
+        let part_ranges = self.properties.partitions[partition].clone();
+
+        let stream = try_stream! {
+            part_metrics.on_first_poll();
+
+            let range_builder_list = Arc::new(RangeBuilderList::new(
+                stream_ctx.input.num_memtables(),
+                stream_ctx.input.num_files(),
+            ));
+            // Scans each part.
+            for part_range in part_ranges {
+                let mut metrics = ScannerMetrics::default();
+                let mut fetch_start = Instant::now();
+
+                let stream = Self::scan_flat_partition_range(
+                    stream_ctx.clone(),
+                    part_range.identifier,
+                    part_metrics.clone(),
+                    range_builder_list.clone(),
+                );
+                for await record_batch in stream {
+                    let record_batch = record_batch?;
+                    metrics.scan_cost += fetch_start.elapsed();
+                    metrics.num_batches += 1;
+                    metrics.num_rows += record_batch.num_rows();
+
+                    debug_assert!(record_batch.num_rows() > 0);
+                    if record_batch.num_rows() == 0 {
+                        continue;
+                    }
+
+                    let yield_start = Instant::now();
+                    yield ScanBatch::RecordBatch(record_batch);
+                    metrics.yield_cost += yield_start.elapsed();
+
+                    fetch_start = Instant::now();
+                }
+
+                metrics.scan_cost += fetch_start.elapsed();
+                part_metrics.merge_metrics(&metrics);
+            }
+
+            part_metrics.on_finish();
+        };
+        Ok(Box::pin(stream))
+    }
 }

 impl RegionScanner for UnorderedScan {
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 211e47ea43..87377f6e6d 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -332,6 +332,14 @@ impl ReadFormat {
         Some(Arc::new(UInt64Array::from_iter(values)))
     }

+    /// Sets the sequence number to override.
+    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
+        match self {
+            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
+            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
+        }
+    }
+
     /// Creates a sequence array to override.
     pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
         match self {
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index edc3be2465..9798eda826 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -56,7 +56,7 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
 use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
 use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
 use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
-use crate::sst::parquet::format::{need_override_sequence, PrimaryKeyReadFormat, ReadFormat};
+use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
 use crate::sst::parquet::metadata::MetadataLoader;
 use crate::sst::parquet::row_group::InMemoryRowGroup;
 use crate::sst::parquet::row_selection::RowGroupSelection;
@@ -113,6 +113,8 @@ pub struct ParquetReaderBuilder {
     /// This is usually the latest metadata of the region. The reader use
     /// it get the correct column id of a column by name.
     expected_metadata: Option<RegionMetadataRef>,
+    /// Whether to use flat format for reading.
+    flat_format: bool,
 }

 impl ParquetReaderBuilder {
@@ -135,6 +137,7 @@ impl ParquetReaderBuilder {
             bloom_filter_index_applier: None,
             fulltext_index_applier: None,
             expected_metadata: None,
+            flat_format: false,
         }
     }

@@ -198,6 +201,13 @@ impl ParquetReaderBuilder {
         self
     }

+    /// Sets the flat format flag.
+    #[must_use]
+    pub fn flat_format(mut self, flat_format: bool) -> Self {
+        self.flat_format = flat_format;
+        self
+    }
+
     /// Builds a [ParquetReader].
     ///
     /// This needs to perform IO operation.
@@ -227,23 +237,27 @@ impl ParquetReaderBuilder {
         // Gets the metadata stored in the SST.
         let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
         let mut read_format = if let Some(column_ids) = &self.projection {
-            PrimaryKeyReadFormat::new(region_meta.clone(), column_ids.iter().copied())
+            ReadFormat::new(
+                region_meta.clone(),
+                column_ids.iter().copied(),
+                self.flat_format,
+            )
         } else {
             // Lists all column ids to read, we always use the expected metadata if possible.
             let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
-            PrimaryKeyReadFormat::new(
+            ReadFormat::new(
                 region_meta.clone(),
                 expected_meta
                     .column_metadatas
                     .iter()
                     .map(|col| col.column_id),
+                self.flat_format,
             )
         };
         if need_override_sequence(&parquet_meta) {
             read_format
                 .set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
         }
-        let read_format = ReadFormat::PrimaryKey(read_format);

         // Computes the projection mask.
         let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();
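A note on `compute_input_arrow_schema` above: tag columns are encoded as `Dictionary(UInt32, <native type>)` fields, and the internal columns from `internal_fields()` are appended after the projected columns (the `+ 3` in `with_capacity`). The sketch below shows the resulting schema shape with plain `arrow-schema` types; the column names and the exact internal field layout are illustrative assumptions, not taken from the patch.

```rust
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};

/// Sketch: the shape of a flat-format input schema for a table with one tag
/// (`host`), a time index (`ts`), and one field (`cpu`). The internal columns
/// are placeholders for whatever `internal_fields()` actually appends.
fn example_input_schema() -> SchemaRef {
    let fields = vec![
        // Tag columns arrive dictionary-encoded: UInt32 keys, native values.
        Arc::new(Field::new_dictionary(
            "host",
            DataType::UInt32,
            DataType::Utf8,
            true,
        )),
        // Time index and field columns keep their native types.
        Arc::new(Field::new(
            "ts",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )),
        Arc::new(Field::new("cpu", DataType::Float64, true)),
        // Hypothetical internal columns appended last.
        Arc::new(Field::new(
            "__primary_key",
            DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
            false,
        )),
        Arc::new(Field::new("__sequence", DataType::UInt64, false)),
        Arc::new(Field::new("__op_type", DataType::UInt8, false)),
    ];
    Arc::new(Schema::new(fields))
}
```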
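`build_flat_reader_from_sources` relies on one invariant: `FlatMergeReader` emits rows sorted by primary key and time index with sequence descending, so `FlatDedupReader` with `FlatLastRow` only has to keep the first row per key to implement last-write-wins. A toy illustration of that invariant on plain tuples (not the crate's API):

```rust
/// Sketch: merge sorted runs, then keep the first row per key. Rows are
/// `(key, sequence, value)`; `key` stands in for (primary key, timestamp).
fn merge_and_dedup_last_row(
    inputs: Vec<Vec<(u64, u64, &'static str)>>,
) -> Vec<(u64, u64, &'static str)> {
    // Merge step, reduced to a sort for brevity: key ascending, sequence
    // descending — the order FlatMergeReader maintains with its heaps.
    let mut merged: Vec<_> = inputs.into_iter().flatten().collect();
    merged.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));

    // Dedup step (LastRow strategy): the first row per key is the newest.
    let mut out: Vec<(u64, u64, &'static str)> = Vec::new();
    for row in merged {
        if out.last().map(|last| last.0) != Some(row.0) {
            out.push(row);
        }
    }
    out
}

fn main() {
    let merged = merge_and_dedup_last_row(vec![
        vec![(1, 5, "old"), (2, 7, "keep")],
        vec![(1, 9, "new")],
    ]);
    // Key 1 keeps the row with the highest sequence.
    assert_eq!(merged, vec![(1, 9, "new"), (2, 7, "keep")]);
}
```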
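`create_parallel_flat_sources` and `spawn_flat_scan_task` implement a general fan-out pattern: one task per source, a shared semaphore bounding how many sources poll concurrently, and a small bounded channel per source (`FLAT_SCAN_CHANNEL_SIZE = 2`, since flat batches are already large). A self-contained sketch of the same pattern with plain tokio/futures types; the `Item` alias and function names are stand-ins, not the crate's API:

```rust
use std::sync::Arc;

use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

// Stand-in for Result<RecordBatch>.
type Item = Result<u64, String>;

fn spawn_scan_task(
    mut input: BoxStream<'static, Item>,
    semaphore: Arc<Semaphore>,
    sender: mpsc::Sender<Item>,
) {
    tokio::spawn(async move {
        loop {
            // Hold the permit only while polling the source, not while the
            // bounded channel applies backpressure, so a slow consumer does
            // not pin down a read slot.
            let maybe_item = {
                let _permit = semaphore.acquire().await.unwrap();
                input.next().await
            };
            match maybe_item {
                Some(Ok(item)) => {
                    let _ = sender.send(Ok(item)).await;
                }
                Some(Err(e)) => {
                    let _ = sender.send(Err(e)).await;
                    break;
                }
                None => break,
            }
        }
    });
}

fn parallelize(
    sources: Vec<BoxStream<'static, Item>>,
    semaphore: Arc<Semaphore>,
    channel_size: usize,
) -> Vec<BoxStream<'static, Item>> {
    sources
        .into_iter()
        .map(|source| {
            let (sender, receiver) = mpsc::channel(channel_size);
            spawn_scan_task(source, semaphore.clone(), sender);
            ReceiverStream::new(receiver).boxed()
        })
        .collect()
}
```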
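Both `scan_flat_batch_in_partition` bodies use the same `try_stream!` accounting pattern: time spent awaiting the inner reader counts toward scan cost, time spent yielding downstream counts toward yield cost, and the fetch timer restarts after each yield. A generic sketch of that loop, assuming the `async-stream` crate; the `Metrics` struct here is hypothetical:

```rust
use std::time::{Duration, Instant};

use async_stream::try_stream;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};

#[derive(Debug, Default)]
struct Metrics {
    scan_cost: Duration,
    yield_cost: Duration,
    num_rows: usize,
}

/// Sketch: wraps a fallible batch stream and splits elapsed time into scan
/// cost (awaiting the inner stream) and yield cost (the downstream await).
fn instrumented<T: Send + 'static>(
    mut inner: BoxStream<'static, Result<Vec<T>, String>>,
) -> BoxStream<'static, Result<Vec<T>, String>> {
    try_stream! {
        let mut metrics = Metrics::default();
        let mut fetch_start = Instant::now();
        while let Some(batch) = inner.try_next().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_rows += batch.len();

            let yield_start = Instant::now();
            yield batch;
            metrics.yield_cost += yield_start.elapsed();

            // Restart the fetch timer after the consumer resumes us.
            fetch_start = Instant::now();
        }
        metrics.scan_cost += fetch_start.elapsed();
        // The real code merges `metrics` into the partition metrics here.
        println!("{metrics:?}");
    }
    .boxed()
}
```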