feat: skip merge and dedup if there is only one source

This commit is contained in:
evenyag
2024-12-04 15:29:57 +08:00
parent 32afd91f01
commit 2fc7a54b11
4 changed files with 63 additions and 57 deletions

View File

@@ -56,7 +56,7 @@ use crate::metrics::COMPACTION_STAGE_ELAPSED;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::seq_scan::SeqScan;
use crate::read::BoxedBatchReader;
use crate::read::Source;
use crate::region::options::MergeMode;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::region::ManifestContextRef;
@@ -558,8 +558,8 @@ pub struct SerializedCompactionOutput {
output_time_range: Option<TimestampRange>,
}
/// Builders to create [BoxedBatchReader] for compaction.
struct CompactionSstReaderBuilder<'a> {
/// Builders to create [Source] for compaction.
struct CompactionSstSourceBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: CacheManagerRef,
@@ -570,9 +570,9 @@ struct CompactionSstReaderBuilder<'a> {
merge_mode: MergeMode,
}
impl<'a> CompactionSstReaderBuilder<'a> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
impl<'a> CompactionSstSourceBuilder<'a> {
/// Builds [Source] that reads all SST files and yields batches in primary key order.
async fn build_sst_source(self) -> Result<Source> {
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
@@ -589,7 +589,9 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input, true).build_reader().await
SeqScan::new(scan_input, true)
.build_source_for_compaction()
.await
}
}

View File

@@ -28,7 +28,7 @@ use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
use crate::compaction::{find_ttl, CompactionSstSourceBuilder};
use crate::config::MitoConfig;
use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
@@ -36,7 +36,6 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::MemtableBuilderProvider;
use crate::read::Source;
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionRef};
@@ -292,7 +291,7 @@ impl Compactor for DefaultCompactor {
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
let source = CompactionSstSourceBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
@@ -302,7 +301,7 @@ impl Compactor for DefaultCompactor {
time_range: output.output_time_range,
merge_mode,
}
.build_sst_reader()
.build_sst_source()
.await?;
let file_meta_opt = sst_layer
.write_sst(
@@ -310,7 +309,7 @@ impl Compactor for DefaultCompactor {
op_type: OperationType::Compact,
file_id,
metadata: region_metadata,
source: Source::Reader(reader),
source,
cache_manager,
storage,
index_options,

View File

@@ -24,7 +24,7 @@ use crate::cache::{
SelectorResultValue,
};
use crate::error::Result;
use crate::read::{Batch, BatchReader, BoxedBatchReader};
use crate::read::{Batch, BatchReader, Source};
use crate::sst::file::FileId;
use crate::sst::parquet::reader::RowGroupReader;
@@ -38,14 +38,14 @@ use crate::sst::parquet::reader::RowGroupReader;
it focuses on time series (the same key).
pub(crate) struct LastRowReader {
/// Inner reader.
reader: BoxedBatchReader,
reader: Source,
/// The last batch pending to return.
selector: LastRowSelector,
}
impl LastRowReader {
/// Creates a new `LastRowReader`.
pub(crate) fn new(reader: BoxedBatchReader) -> Self {
pub(crate) fn new(reader: Source) -> Self {
Self {
reader,
selector: LastRowSelector::default(),
@@ -284,7 +284,7 @@ mod tests {
&[21, 22],
)];
let reader = VecBatchReader::new(&input);
let mut reader = LastRowReader::new(Box::new(reader));
let mut reader = LastRowReader::new(Source::Reader(Box::new(reader)));
check_reader_result(
&mut reader,
&[new_batch(b"k1", &[2], &[11], &[OpType::Put], &[22])],
@@ -294,7 +294,7 @@ mod tests {
// Only one row.
let input = [new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])];
let reader = VecBatchReader::new(&input);
let mut reader = LastRowReader::new(Box::new(reader));
let mut reader = LastRowReader::new(Source::Reader(Box::new(reader)));
check_reader_result(
&mut reader,
&[new_batch(b"k1", &[1], &[11], &[OpType::Put], &[21])],
@@ -328,7 +328,7 @@ mod tests {
),
];
let reader = VecBatchReader::new(&input);
let mut reader = LastRowReader::new(Box::new(reader));
let mut reader = LastRowReader::new(Source::Reader(Box::new(reader)));
check_reader_result(
&mut reader,
&[

View File

@@ -39,7 +39,7 @@ use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::read::{ScannerMetrics, Source};
use crate::region::options::MergeMode;
/// Scans a region and returns rows in a sorted sequence.
@@ -85,8 +85,8 @@ impl SeqScan {
Ok(Box::pin(aggr_stream))
}
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
/// Builds a [Source] from sequential scan for compaction.
pub async fn build_source_for_compaction(&self) -> Result<Source> {
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
@@ -100,23 +100,22 @@ impl SeqScan {
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::build_all_merge_reader(
Self::build_source_merge_all(
&self.stream_ctx,
partition_ranges,
self.compaction,
&part_metrics,
)
.await?;
Ok(Box::new(reader))
.await
}
/// Builds a merge reader that reads all data.
async fn build_all_merge_reader(
/// Builds a source to merge all data.
async fn build_source_merge_all(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
compaction: bool,
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
) -> Result<Source> {
let mut sources = Vec::new();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
@@ -132,51 +131,57 @@ impl SeqScan {
&mut sources,
);
}
Self::build_reader_from_sources(stream_ctx, sources, None).await
Self::merge_sources(stream_ctx, sources, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// Builds a source to merge and read input sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
async fn merge_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<BoxedBatchReader> {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
) -> Result<Source> {
let output_source = if sources.len() > 1 {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
}
}
let mut builder = MergeReaderBuilder::from_sources(sources);
let reader = builder.build().await?;
let mut builder = MergeReaderBuilder::from_sources(sources);
let reader = builder.build().await?;
let dedup = !stream_ctx.input.append_mode;
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
)) as _,
MergeMode::LastNonNull => Box::new(DedupReader::new(
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
)) as _,
}
let dedup = !stream_ctx.input.append_mode;
let reader = if dedup {
match stream_ctx.input.merge_mode {
MergeMode::LastRow => Box::new(DedupReader::new(
reader,
LastRow::new(stream_ctx.input.filter_deleted),
)) as _,
MergeMode::LastNonNull => Box::new(DedupReader::new(
reader,
LastNonNull::new(stream_ctx.input.filter_deleted),
)) as _,
}
} else {
Box::new(reader) as _
};
Source::Reader(reader)
} else {
Box::new(reader) as _
// There is only one source. We can skip merge and dedup.
sources.pop().unwrap()
};
let reader = match &stream_ctx.input.series_row_selector {
Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
None => reader,
};
Ok(reader)
match &stream_ctx.input.series_row_selector {
Some(TimeSeriesRowSelector::LastRow) => {
let reader = Box::new(LastRowReader::new(output_source)) as _;
Ok(Source::Reader(reader))
}
None => Ok(output_source),
}
}
/// Scans the given partition when the part list is set properly.
@@ -242,7 +247,7 @@ impl SeqScan {
);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::merge_sources(&stream_ctx, sources, semaphore.clone())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;