feat: support flat format in SeqScan and UnorderedScan (#6905)

* feat: support flat format in SeqScan

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support flat format in unordered scan

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support parallel read for flat format in SeqScan

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename flat DedupReader to FlatDedupReader

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comments

It also precomputes the input arrow schema.

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2025-09-04 21:12:24 +08:00
Committed by: GitHub
Parent: 8e7f2e92cc
Commit: 8bbb396506
10 changed files with 516 additions and 30 deletions
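
For orientation, a minimal sketch of how the new flag is threaded through the scan path in this change. `ScanRegion`, `ScanInput`, and `ParquetReaderBuilder` are the crate-internal builders touched in the hunks below; the surrounding variables are hypothetical placeholders, not real call sites.

```rust
// Sketch only: illustrates the flag flow, not a runnable call site.

// 1. The engine decides per query whether to use the flat format
//    (hard-coded to false in this commit, see the first hunk).
let scan_region = scan_region.with_flat_format(false);

// 2. ScanRegion forwards the flag to the ScanInput it builds.
let input = ScanInput::new(access_layer, mapper).with_flat_format(flat_format);

// 3. ScanInput passes it down to the parquet reader builder for each file range.
let reader_input = reader_builder
    .flat_format(flat_format)
    .build_reader_input(reader_metrics)
    .await;
```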


@@ -644,7 +644,9 @@ impl EngineInner {
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start);
.with_start_time(query_start)
// TODO(yingwen): Enable it after flat format is supported.
.with_flat_format(false);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);


@@ -84,13 +84,13 @@ impl<I: Iterator<Item = Result<RecordBatch>>, S: RecordBatchDedupStrategy> Itera
}
/// An async reader to dedup sorted record batches from a stream based on the dedup strategy.
pub struct DedupReader<I, S> {
pub struct FlatDedupReader<I, S> {
stream: I,
strategy: S,
metrics: DedupMetrics,
}
impl<I, S> DedupReader<I, S> {
impl<I, S> FlatDedupReader<I, S> {
/// Creates a new dedup iterator.
pub fn new(stream: I, strategy: S) -> Self {
Self {
@@ -101,7 +101,9 @@ impl<I, S> DedupReader<I, S> {
}
}
impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy> DedupReader<I, S> {
impl<I: Stream<Item = Result<RecordBatch>> + Unpin, S: RecordBatchDedupStrategy>
FlatDedupReader<I, S>
{
/// Returns the next deduplicated batch.
async fn fetch_next_batch(&mut self) -> Result<Option<RecordBatch>> {
while let Some(batch) = self.stream.try_next().await? {
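
A usage sketch of the renamed reader, assuming the crate-internal `FlatDedupReader`, `FlatLastRow`, and the `FlatMergeReader` renamed in the next hunk are in scope; `schema`, `sources`, and the filter flag are placeholders. This mirrors how `build_flat_reader_from_sources` in the seq scan wires them up later in this commit.

```rust
use futures::{StreamExt, TryStreamExt};

// Sketch: merge sorted RecordBatch streams, then drop duplicate rows, keeping
// the last row per (primary key, time index).
let merged = FlatMergeReader::new(schema, sources, batch_size).await?;
let mut deduped = FlatDedupReader::new(
    merged.into_stream().boxed(),
    FlatLastRow::new(/* filter_deleted */ true),
)
.into_stream()
.boxed();
while let Some(batch) = deduped.try_next().await? {
    // Each batch now contains at most one row per key and timestamp.
}
```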


@@ -551,7 +551,7 @@ impl Iterator for FlatMergeIterator {
/// Iterator to merge multiple sorted iterators into a single sorted iterator.
///
/// All iterators must be sorted by primary key, time index, sequence desc.
pub struct MergeReader {
pub struct FlatMergeReader {
/// The merge algorithm to maintain heaps.
algo: MergeAlgo<StreamNode>,
/// Current buffered rows to output.
@@ -564,7 +564,7 @@ pub struct MergeReader {
batch_size: usize,
}
impl MergeReader {
impl FlatMergeReader {
/// Creates a new iterator to merge sorted `iters`.
pub async fn new(
schema: SchemaRef,


@@ -16,10 +16,12 @@
use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::arrow::datatypes::Field;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
@@ -27,6 +29,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::error::{InvalidRequestSnafu, Result};
use crate::sst::internal_fields;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
@@ -51,6 +54,8 @@ pub struct FlatProjectionMapper {
is_empty_projection: bool,
/// The index in flat format [RecordBatch] for each column in the output [RecordBatch].
batch_indices: Vec<usize>,
/// Precomputed Arrow schema for input batches.
input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
impl FlatProjectionMapper {
@@ -101,6 +106,9 @@ impl FlatProjectionMapper {
let batch_schema = flat_projected_columns(metadata, &format_projection);
// Safety: We get the column id from the metadata.
let input_arrow_schema = compute_input_arrow_schema(metadata, &batch_schema);
if is_empty_projection {
// If projection is empty, we don't output any column.
return Ok(FlatProjectionMapper {
@@ -110,6 +118,7 @@ impl FlatProjectionMapper {
batch_schema: vec![],
is_empty_projection,
batch_indices: vec![],
input_arrow_schema,
});
}
@@ -135,6 +144,7 @@ impl FlatProjectionMapper {
batch_schema,
is_empty_projection,
batch_indices,
input_arrow_schema,
})
}
@@ -154,12 +164,39 @@ impl FlatProjectionMapper {
&self.column_ids
}
/// Returns the field column start index in output batch.
pub(crate) fn field_column_start(&self) -> usize {
for (idx, column_id) in self
.batch_schema
.iter()
.map(|(column_id, _)| column_id)
.enumerate()
{
// Safety: We get the column id from the metadata in new().
if self
.metadata
.column_by_id(*column_id)
.unwrap()
.semantic_type
== SemanticType::Field
{
return idx;
}
}
self.batch_schema.len()
}
/// Returns ids of columns of the batch that the mapper expects to convert.
#[allow(dead_code)]
pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
&self.batch_schema
}
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
}
/// Returns the schema of converted [RecordBatch].
/// This is the schema that the stream will output. This schema may contain
/// fewer columns than [FlatProjectionMapper::column_ids()].
@@ -219,3 +256,35 @@ pub(crate) fn flat_projected_columns(
// Safety: FormatProjection ensures all indices can be unwrapped.
schema.into_iter().map(|id_type| id_type.unwrap()).collect()
}
/// Computes the Arrow schema for input batches.
///
/// # Panics
/// Panics if it can't find the column by the column id in the batch_schema.
fn compute_input_arrow_schema(
metadata: &RegionMetadata,
batch_schema: &[(ColumnId, ConcreteDataType)],
) -> datatypes::arrow::datatypes::SchemaRef {
let mut new_fields = Vec::with_capacity(batch_schema.len() + 3);
for (column_id, _) in batch_schema {
let column_metadata = metadata.column_by_id(*column_id).unwrap();
let field = if column_metadata.semantic_type == SemanticType::Tag {
Field::new_dictionary(
&column_metadata.column_schema.name,
datatypes::arrow::datatypes::DataType::UInt32,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
)
} else {
Field::new(
&column_metadata.column_schema.name,
column_metadata.column_schema.data_type.as_arrow_type(),
column_metadata.column_schema.is_nullable(),
)
};
new_fields.push(Arc::new(field));
}
new_fields.extend_from_slice(&internal_fields());
Arc::new(datatypes::arrow::datatypes::Schema::new(new_fields))
}
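
For illustration, a small standalone sketch of the arrow schema this function builds for a tag column; column names are made up, and the internal fields appended by `internal_fields()` are omitted. Only the dictionary encoding of tags is the point here.

```rust
use std::sync::Arc;

use datatypes::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

// Tag columns are dictionary-encoded with u32 keys; other columns keep their
// native arrow type.
let host = Field::new_dictionary("host", DataType::UInt32, DataType::Utf8, true);
let cpu = Field::new("cpu", DataType::Float64, true);
let ts = Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false);
let input_schema = Arc::new(Schema::new(vec![host, cpu, ts]));
assert!(matches!(
    input_schema.field(0).data_type(),
    DataType::Dictionary(_, _)
));
```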


@@ -29,6 +29,7 @@ use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
use futures::StreamExt;
use smallvec::SmallVec;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
@@ -52,7 +53,7 @@ use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
@@ -65,6 +66,9 @@ use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBui
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::reader::ReaderMetrics;
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
/// Sequential scan.
@@ -212,6 +216,8 @@ pub(crate) struct ScanRegion {
/// Whether to filter out the deleted rows.
/// Usually true for normal read, and false for scan for compaction.
filter_deleted: bool,
/// Whether to use flat format.
flat_format: bool,
#[cfg(feature = "enterprise")]
extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
@@ -236,6 +242,7 @@ impl ScanRegion {
ignore_bloom_filter: false,
start_time: None,
filter_deleted: true,
flat_format: false,
#[cfg(feature = "enterprise")]
extension_range_provider: None,
}
@@ -292,6 +299,13 @@ impl ScanRegion {
self.filter_deleted = filter_deleted;
}
/// Sets whether to use flat format.
#[must_use]
pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
#[cfg(feature = "enterprise")]
pub(crate) fn set_extension_range_provider(
&mut self,
@@ -374,8 +388,10 @@ impl ScanRegion {
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
None => ProjectionMapper::all(&self.version.metadata, false)?,
Some(p) => {
ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
}
None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
};
let ssts = &self.version.ssts;
@@ -449,11 +465,11 @@ impl ScanRegion {
let bloom_filter_applier = self.build_bloom_filter_applier();
let fulltext_index_applier = self.build_fulltext_index_applier();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied(), false)?,
None => ProjectionMapper::all(&self.version.metadata, false)?,
};
if self.flat_format {
// The batch is already large enough so we use a small channel size here.
self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
}
let input = ScanInput::new(self.access_layer, mapper)
.with_time_range(Some(time_range))
@@ -471,7 +487,8 @@ impl ScanRegion {
.with_filter_deleted(self.filter_deleted)
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution);
.with_distribution(self.request.distribution)
.with_flat_format(self.flat_format);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {
@@ -673,6 +690,8 @@ pub struct ScanInput {
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
/// Hint for the required distribution of the scanner.
pub(crate) distribution: Option<TimeSeriesDistribution>,
/// Whether to use flat format.
pub(crate) flat_format: bool,
#[cfg(feature = "enterprise")]
extension_ranges: Vec<BoxedExtensionRange>,
}
@@ -701,6 +720,7 @@ impl ScanInput {
merge_mode: MergeMode::default(),
series_row_selector: None,
distribution: None,
flat_format: false,
#[cfg(feature = "enterprise")]
extension_ranges: Vec::new(),
}
@@ -845,6 +865,13 @@ impl ScanInput {
self
}
/// Sets whether to use flat format.
#[must_use]
pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
@@ -894,6 +921,7 @@ impl ScanInput {
.bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.flat_format(self.flat_format)
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, selection) = match res {
@@ -964,6 +992,61 @@ impl ScanInput {
});
}
/// Scans flat sources (RecordBatch streams) in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
pub(crate) fn create_parallel_flat_sources(
&self,
sources: Vec<BoxedRecordBatchStream>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<BoxedRecordBatchStream>> {
if sources.len() <= 1 {
return Ok(sources);
}
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
self.spawn_flat_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Box::pin(stream) as _
})
.collect();
Ok(sources)
}
/// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
pub(crate) fn spawn_flat_scan_task(
&self,
mut input: BoxedRecordBatchStream,
semaphore: Arc<Semaphore>,
sender: mpsc::Sender<Result<RecordBatch>>,
) {
common_runtime::spawn_global(async move {
loop {
// We release the permit before sending the result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
input.next().await
};
match maybe_batch {
Some(Ok(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Some(Err(e)) => {
let _ = sender.send(Err(e)).await;
break;
}
None => break,
}
}
});
}
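
The permit scoping in `spawn_flat_scan_task` above follows the usual tokio pattern of bounding concurrent readers without holding a read permit across a potentially blocking send. A self-contained sketch of just that pattern follows; the function and its names are illustrative, not crate APIs.

```rust
use std::sync::Arc;

use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, Semaphore};

// Forward items from a stream into a bounded channel, holding a semaphore
// permit only while pulling the next item, never while sending it.
async fn forward<S: Stream + Unpin>(
    mut input: S,
    semaphore: Arc<Semaphore>,
    sender: mpsc::Sender<S::Item>,
) {
    loop {
        let next = {
            // The permit is dropped at the end of this block, before the send,
            // so a full channel never blocks a task that still holds a permit.
            let _permit = semaphore.acquire().await.unwrap();
            input.next().await
        };
        match next {
            Some(item) => {
                if sender.send(item).await.is_err() {
                    break; // receiver dropped
                }
            }
            None => break, // stream exhausted
        }
    }
}
```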
pub(crate) fn total_rows(&self) -> usize {
let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();


@@ -26,7 +26,7 @@ use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
@@ -37,16 +37,22 @@ use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RangeMeta};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::read::{
scan_util, Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source,
};
use crate::region::options::MergeMode;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
/// Scans a region and returns rows in a sorted sequence.
///
@@ -210,6 +216,56 @@ impl SeqScan {
Ok(reader)
}
/// Builds a flat reader to read sources that return RecordBatch. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build_flat_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
sources = stream_ctx
.input
.create_parallel_flat_sources(sources, semaphore.clone())?;
}
}
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.input_arrow_schema();
let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
let dedup = !stream_ctx.input.append_mode;
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::pin(
FlatDedupReader::new(
reader.into_stream().boxed(),
FlatLastRow::new(stream_ctx.input.filter_deleted),
)
.into_stream(),
) as _,
MergeMode::LastNonNull => Box::pin(
FlatDedupReader::new(
reader.into_stream().boxed(),
FlatLastNonNull::new(
mapper.field_column_start(),
stream_ctx.input.filter_deleted,
),
)
.into_stream(),
) as _,
}
} else {
Box::pin(reader.into_stream()) as _
};
Ok(reader)
}
/// Scans the given partition when the part list is set properly.
/// Otherwise the returned stream might not contain any data.
fn scan_partition_impl(
@@ -227,10 +283,15 @@ impl SeqScan {
}
let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
let input = &self.stream_ctx.input;
let batch_stream = if input.flat_format {
// Use flat scan for bulk memtables
self.scan_flat_batch_in_partition(partition, metrics.clone())?
} else {
// Use regular batch scan for normal memtables
self.scan_batch_in_partition(partition, metrics.clone())?
};
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
@@ -342,6 +403,79 @@ impl SeqScan {
Ok(Box::pin(stream))
}
fn scan_flat_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
if self.properties.partitions[partition].is_empty() {
return Ok(Box::pin(futures::stream::empty()));
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_flat_sources(
&stream_ctx,
&part_range,
compaction,
&part_metrics,
range_builder_list.clone(),
&mut sources,
).await?;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await?;
while let Some(record_batch) = reader.try_next().await? {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += record_batch.num_rows();
debug_assert!(record_batch.num_rows() > 0);
if record_batch.num_rows() == 0 {
continue;
}
let yield_start = Instant::now();
yield ScanBatch::RecordBatch(record_batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
if self.properties.target_partitions() > self.properties.num_partitions() {
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
@@ -543,6 +677,59 @@ pub(crate) async fn build_sources(
Ok(())
}
/// Builds flat sources for the partition range and pushes them to the `sources` vector.
pub(crate) async fn build_flat_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
sources: &mut Vec<BoxedRecordBatchStream>,
) -> Result<()> {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction {
// Compaction expects that input sources have not been split.
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.
debug_assert_eq!(
-1, row_group_idx.row_group_index,
"Expect {} range scan all row groups, given: {}",
i, row_group_idx.row_group_index,
);
}
}
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
Box::pin(stream) as _
} else if stream_ctx.is_file_range_index(*index) {
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
let stream = scan_flat_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
range_builder_list.clone(),
)
.await?;
Box::pin(stream) as _
} else {
scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
};
sources.push(stream);
}
Ok(())
}
#[cfg(test)]
impl SeqScan {
/// Returns the input.


@@ -36,6 +36,7 @@ use crate::read::Batch;
pub enum ScanBatch {
Normal(Batch),
Series(SeriesBatch),
RecordBatch(DfRecordBatch),
}
pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
@@ -99,6 +100,12 @@ impl ConvertBatchStream {
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
}
ScanBatch::RecordBatch(df_record_batch) => {
// Safety: Only flat format returns this batch.
let mapper = self.projection_mapper.as_flat().unwrap();
mapper.convert(&df_record_batch)
}
}
}
}


@@ -23,6 +23,7 @@ use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ensure;
@@ -35,7 +36,8 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, ScannerMetrics};
@@ -135,6 +137,51 @@ impl UnorderedScan {
}
}
/// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
fn scan_flat_partition_range(
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
range_builder_list: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<RecordBatch>> {
try_stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_flat_mem_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
);
for await record_batch in stream {
yield record_batch?;
}
} else if stream_ctx.is_file_range_index(*index) {
let stream = scan_flat_file_ranges(
stream_ctx.clone(),
part_metrics.clone(),
*index,
"unordered_scan_files",
range_builder_list.clone(),
).await?;
for await record_batch in stream {
yield record_batch?;
}
} else {
let stream = scan_util::maybe_scan_flat_other_ranges(
&stream_ctx,
*index,
&part_metrics,
).await?;
for await record_batch in stream {
yield record_batch?;
}
}
}
}
}
/// Scan [`Batch`] in all partitions one by one.
pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -182,10 +229,16 @@ impl UnorderedScan {
}
let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
let input = &self.stream_ctx.input;
let batch_stream = if input.flat_format {
// Use flat scan for bulk memtables
self.scan_flat_batch_in_partition(partition, metrics.clone())?
} else {
// Use regular batch scan for normal memtables
self.scan_batch_in_partition(partition, metrics.clone())?
};
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
@@ -282,6 +335,67 @@ impl UnorderedScan {
};
Ok(Box::pin(stream))
}
fn scan_flat_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let stream = Self::scan_flat_partition_range(
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),
range_builder_list.clone(),
);
for await record_batch in stream {
let record_batch = record_batch?;
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += record_batch.num_rows();
debug_assert!(record_batch.num_rows() > 0);
if record_batch.num_rows() == 0 {
continue;
}
let yield_start = Instant::now();
yield ScanBatch::RecordBatch(record_batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
Ok(Box::pin(stream))
}
}
impl RegionScanner for UnorderedScan {


@@ -332,6 +332,14 @@ impl ReadFormat {
Some(Arc::new(UInt64Array::from_iter(values)))
}
/// Sets the sequence number to override.
pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
match self {
ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
ReadFormat::Flat(format) => format.set_override_sequence(sequence),
}
}
/// Creates a sequence array to override.
pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
match self {


@@ -56,7 +56,7 @@ use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::{need_override_sequence, PrimaryKeyReadFormat, ReadFormat};
use crate::sst::parquet::format::{need_override_sequence, ReadFormat};
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::RowGroupSelection;
@@ -113,6 +113,8 @@ pub struct ParquetReaderBuilder {
/// This is usually the latest metadata of the region. The reader uses
/// it to get the correct column id of a column by name.
expected_metadata: Option<RegionMetadataRef>,
/// Whether to use flat format for reading.
flat_format: bool,
}
impl ParquetReaderBuilder {
@@ -135,6 +137,7 @@ impl ParquetReaderBuilder {
bloom_filter_index_applier: None,
fulltext_index_applier: None,
expected_metadata: None,
flat_format: false,
}
}
@@ -198,6 +201,13 @@ impl ParquetReaderBuilder {
self
}
/// Sets the flat format flag.
#[must_use]
pub fn flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
/// Builds a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -227,23 +237,27 @@ impl ParquetReaderBuilder {
// Gets the metadata stored in the SST.
let region_meta = Arc::new(Self::get_region_metadata(&file_path, key_value_meta)?);
let mut read_format = if let Some(column_ids) = &self.projection {
PrimaryKeyReadFormat::new(region_meta.clone(), column_ids.iter().copied())
ReadFormat::new(
region_meta.clone(),
column_ids.iter().copied(),
self.flat_format,
)
} else {
// Lists all column ids to read, we always use the expected metadata if possible.
let expected_meta = self.expected_metadata.as_ref().unwrap_or(&region_meta);
PrimaryKeyReadFormat::new(
ReadFormat::new(
region_meta.clone(),
expected_meta
.column_metadatas
.iter()
.map(|col| col.column_id),
self.flat_format,
)
};
if need_override_sequence(&parquet_meta) {
read_format
.set_override_sequence(self.file_handle.meta_ref().sequence.map(|x| x.get()));
}
let read_format = ReadFormat::PrimaryKey(read_format);
// Computes the projection mask.
let parquet_schema_desc = parquet_meta.file_metadata().schema_descr();