feat: only split in non-compaction seq scan

This commit is contained in:
evenyag
2024-12-03 18:57:04 +08:00
parent 8a357d93a6
commit adc374c9a3
5 changed files with 122 additions and 224 deletions

View File

@@ -589,10 +589,7 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
.await
SeqScan::new(scan_input, true).build_reader().await
}
}

View File

@@ -32,7 +32,7 @@ use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
pub(crate) const ALL_ROW_GROUPS: i64 = -1;
const ALL_ROW_GROUPS: i64 = -1;
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -81,25 +81,18 @@ impl RangeMeta {
}
/// Creates a list of ranges from the `input` for seq scan.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
/// If `compaction` is true, it doesn't split the ranges.
pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ranges before split, num_ranges: {}",
// ranges.len(),
// );
// maybe_split_ranges_for_seq_scan(ranges)
// let ranges =
// maybe_split_ranges_for_seq_scan_groups(ranges, input.memtables.len(), &input.files);
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ranges after split, num_ranges: {}, ranges: {:?}",
// ranges.len(),
// ranges,
// );
ranges
if compaction {
// We don't split ranges in compaction.
return ranges;
}
maybe_split_ranges_for_seq_scan(ranges, input.memtables.len(), &input.files)
}
/// Creates a list of ranges from the `input` for unordered scan.
@@ -117,13 +110,13 @@ impl RangeMeta {
}
/// Returns true if the time range of given `meta` overlaps with the time range of this meta.
pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
fn overlaps(&self, meta: &RangeMeta) -> bool {
overlaps(&self.time_range, &meta.time_range)
}
/// Merges given `meta` to this meta.
/// It assumes that the time ranges overlap and they don't have the same file or memtable index.
pub(crate) fn merge(&mut self, mut other: RangeMeta) {
fn merge(&mut self, mut other: RangeMeta) {
debug_assert!(self.overlaps(&other));
debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
debug_assert!(self
@@ -142,44 +135,16 @@ impl RangeMeta {
/// Returns true if we can split the range into multiple smaller ranges and
/// still preserve the order for [SeqScan].
pub(crate) fn can_split_preserve_order(&self) -> bool {
// Only one source and multiple row groups.
if self.indices.len() == 1 {
if self.row_group_indices.len() > 1 {
return true;
} else if self.row_group_indices.len() == 1
&& self.row_group_indices[0].row_group_index == ALL_ROW_GROUPS
{
return true;
}
}
false
// fixme(yingwen): Remove this.
// self.indices.len() == 1 && self.row_group_indices.len() > 1
fn can_split_preserve_order(&self) -> bool {
self.indices.len() == 1
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() {
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / self.row_group_indices.len();
// Splits by row group.
for index in self.row_group_indices {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
row_group_indices: smallvec![index],
num_rows,
});
}
} else {
output.push(self);
}
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split_groups(self, num_row_groups: u64, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() && num_row_groups > 0 {
/// The range should only have one file or memtable. `num_row_groups` should be
/// the number of row groups in the file or memtable.
fn maybe_split(self, num_row_groups: u64, output: &mut Vec<RangeMeta>) {
debug_assert!(self.can_split_preserve_order());
if self.can_split_preserve_order() && num_row_groups > 1 {
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / num_row_groups as usize;
// Splits by row group.
@@ -317,31 +282,6 @@ impl RangeMeta {
}],
num_rows: file.meta_ref().num_rows as usize,
});
// if file.meta_ref().num_row_groups > 0 {
// // All row groups share the same time range.
// let row_group_indices = (0..file.meta_ref().num_row_groups)
// .map(|row_group_index| RowGroupIndex {
// index: file_index,
// row_group_index: row_group_index as i64,
// })
// .collect();
// ranges.push(RangeMeta {
// time_range: file.time_range(),
// indices: smallvec![file_index],
// row_group_indices,
// num_rows: file.meta_ref().num_rows as usize,
// });
// } else {
// ranges.push(RangeMeta {
// time_range: file.time_range(),
// indices: smallvec![file_index],
// row_group_indices: smallvec![RowGroupIndex {
// index: file_index,
// row_group_index: ALL_ROW_GROUPS,
// }],
// num_rows: file.meta_ref().num_rows as usize,
// });
// }
}
}
}
@@ -386,18 +326,7 @@ fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
/// Splits the range into multiple smaller ranges.
/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
let mut new_ranges = Vec::with_capacity(ranges.len());
for range in ranges {
range.maybe_split(&mut new_ranges);
}
new_ranges
}
/// Splits the range into multiple smaller ranges.
/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
fn maybe_split_ranges_for_seq_scan_groups(
fn maybe_split_ranges_for_seq_scan(
ranges: Vec<RangeMeta>,
num_memtables: usize,
files: &[FileHandle],
@@ -405,11 +334,13 @@ fn maybe_split_ranges_for_seq_scan_groups(
let mut new_ranges = Vec::with_capacity(ranges.len());
for range in ranges {
if range.indices[0] < num_memtables {
range.maybe_split(&mut new_ranges);
// We don't split memtables.
new_ranges.push(range);
} else {
// Get meta for this file.
let index = range.indices[0] - num_memtables;
let file = &files[index];
range.maybe_split_groups(file.meta_ref().num_row_groups, &mut new_ranges);
range.maybe_split(file.meta_ref().num_row_groups, &mut new_ranges);
}
}
@@ -532,11 +463,6 @@ impl RangeBuilderList {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
// common_telemetry::info!(
// "DEBUG_SCAN: build file ranges, file_index: {}, build_cost: {:?}",
// file_index,
// reader_metrics.build_cost
// );
builder.build_ranges(index.row_group_index, &mut ranges);
self.set_file_builder(file_index, Arc::new(builder));
}
@@ -581,6 +507,8 @@ mod tests {
use common_time::Timestamp;
use super::*;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::new_noop_file_purger;
type Output = (Vec<usize>, i64, i64);
@@ -736,7 +664,7 @@ mod tests {
assert!(range.can_split_preserve_order());
let mut output = Vec::new();
range.maybe_split(&mut output);
range.maybe_split(2, &mut output);
assert_eq!(
output,
@@ -783,26 +711,60 @@ mod tests {
assert!(!range.can_split_preserve_order());
let mut output = Vec::new();
range.maybe_split(&mut output);
range.maybe_split(2, &mut output);
assert_eq!(1, output.len());
}
/// Builds a [FileHandle] for tests from a [FileMeta] covering
/// `[start_ts, end_ts]` (second precision) with `num_row_groups` row groups.
///
/// Sizes, row count and level are all zero, and the handle uses the purger
/// returned by `new_noop_file_purger()`.
fn new_file_handle(
    file_id: FileId,
    start_ts: i64,
    end_ts: i64,
    num_row_groups: u64,
) -> FileHandle {
    let purger = new_noop_file_purger();
    let time_range = (
        Timestamp::new_second(start_ts),
        Timestamp::new_second(end_ts),
    );
    let meta = FileMeta {
        region_id: 0.into(),
        file_id,
        time_range,
        level: 0,
        file_size: 0,
        available_indexes: Default::default(),
        index_file_size: 0,
        num_rows: 0,
        num_row_groups,
    };

    FileHandle::new(meta, purger)
}
#[test]
fn test_maybe_split_ranges() {
let files = vec![
new_file_handle(FileId::random(), 1000, 2000, 2),
new_file_handle(FileId::random(), 3000, 4000, 1),
new_file_handle(FileId::random(), 3000, 4000, 1),
];
let ranges = vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![0],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 0
},
RowGroupIndex {
index: 1,
row_group_index: 1
}
],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: ALL_ROW_GROUPS,
},],
num_rows: 4,
},
RangeMeta {
@@ -811,20 +773,29 @@ mod tests {
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,
},
];
let output = maybe_split_ranges_for_seq_scan(ranges);
let output = maybe_split_ranges_for_seq_scan(ranges, 1, &files);
assert_eq!(
output,
vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![0],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],

View File

@@ -236,7 +236,7 @@ impl ScanRegion {
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Unordered scan.
@@ -248,7 +248,7 @@ impl ScanRegion {
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Creates a scan input.
@@ -621,6 +621,10 @@ impl ScanInput {
sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<Source>> {
if sources.len() > 1 {
return Ok(sources);
}
debug_assert!(self.parallelism.parallelism > 1);
// Spawn a task for each source.
let sources = sources
@@ -635,55 +639,6 @@ impl ScanInput {
Ok(sources)
}
/// Scans sources in parallel.
///
/// For every source this creates a bounded channel with capacity
/// `self.parallelism.channel_size`, passes the source and the sender to
/// `spawn_scan_task_no_semaphore`, and replaces the source with a
/// `Source::Stream` that reads from the receiver. No semaphore is used and
/// the function always returns `Ok`.
///
/// NOTE(review): the earlier doc said this panics if the input doesn't allow
/// parallel scan. Nothing in this body panics explicitly; presumably that refers
/// to creating a channel with a zero `channel_size` — confirm.
pub(crate) fn create_parallel_sources_no_semaphore(
&self,
sources: Vec<Source>,
) -> Result<Vec<Source>> {
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
// One channel per source: the spawned task owns the sender and the
// returned stream owns the receiver.
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
self.spawn_scan_task_no_semaphore(source, sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
})
.collect();
Ok(sources)
}
/// Scans the input source in another task and sends batches to the sender.
///
/// The spawned task loops on `input.next_batch()`: every batch is forwarded as
/// `Ok(batch)`, the loop stops when the source returns `None`, and an error is
/// forwarded once before the loop stops. Results of `sender.send` are discarded.
pub(crate) fn spawn_scan_task_no_semaphore(
&self,
mut input: Source,
sender: mpsc::Sender<Result<Batch>>,
) {
common_runtime::spawn_global(async move {
loop {
// The semaphore acquisition below is commented out, so this variant reads
// the next batch without holding any permit.
let maybe_batch = {
// // Safety: We never close the semaphore.
// let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
// The send result is ignored; the loop keeps reading either way.
let _ = sender.send(Ok(batch)).await;
}
// The source is exhausted.
Ok(None) => break,
Err(e) => {
// Forward the error (ignoring a failed send) and stop scanning.
let _ = sender.send(Err(e)).await;
break;
}
}
}
});
}
/// Prunes a memtable to scan and returns the builder to build readers.
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
@@ -810,14 +765,9 @@ pub(crate) struct StreamContext {
impl StreamContext {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ctx, num_ranges: {}, ranges: {:?}",
// ranges.len(),
// ranges
// );
let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
READ_SST_COUNT.observe(input.num_files() as f64);
Self {

View File

@@ -36,7 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::{RangeBuilderList, RowGroupIndex, ALL_ROW_GROUPS};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
@@ -51,39 +51,27 @@ pub struct SeqScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}
impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
// TODO(yingwen): Set permits according to partition num. But we need to support file
// level parallelism.
let parallelism = input.parallelism.parallelism.max(1);
/// Creates a new [SeqScan] with the given input and compaction flag.
/// If `compaction` is true, the scanner will not attempt to split ranges.
pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
}
}
/// Sets the scanner to be used for compaction.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}
/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contain all the data. If want
@@ -115,7 +103,6 @@ impl SeqScan {
let reader = Self::build_all_merge_reader(
&self.stream_ctx,
partition_ranges,
self.semaphore.clone(),
self.compaction,
&part_metrics,
)
@@ -127,7 +114,6 @@ impl SeqScan {
async fn build_all_merge_reader(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
semaphore: Arc<Semaphore>,
compaction: bool,
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
@@ -146,21 +132,21 @@ impl SeqScan {
&mut sources,
);
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
Self::build_reader_from_sources(stream_ctx, sources, None).await
}
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Arc<Semaphore>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<BoxedBatchReader> {
if stream_ctx.input.parallelism.parallelism > 1 {
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
sources = stream_ctx
.input
.create_parallel_sources_no_semaphore(sources)?;
.create_parallel_sources(sources, semaphore.clone())?;
}
let mut builder = MergeReaderBuilder::from_sources(sources);
@@ -207,7 +193,8 @@ impl SeqScan {
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
// FIXME(yingwen): Get target partition from prepare.
let semaphore = Arc::new(Semaphore::new(self.properties.partitions.len()));
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range();
@@ -230,7 +217,6 @@ impl SeqScan {
stream_ctx.input.num_files(),
));
// Scans each part.
// common_telemetry::info!("DEBUG_SCAN: scan partition range, partition: {}, num_ranges: {}", partition, partition_ranges.len());
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(
@@ -242,9 +228,8 @@ impl SeqScan {
&mut sources,
);
// common_telemetry::info!("DEBUG_SCAN: scan part range, partition: {}, num_sources: {}, part_range: {:?}", partition, sources.len(), part_range);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
Self::build_reader_from_sources(&stream_ctx, sources, Some(semaphore.clone()))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -332,12 +317,6 @@ impl RegionScanner for SeqScan {
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
let num_partitions = ranges.len();
let available_permits = self.semaphore.available_permits();
if available_permits < num_partitions {
self.semaphore
.add_permits(num_partitions - available_permits);
}
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
Ok(())

View File

@@ -251,14 +251,13 @@ mod test {
// assign 4 ranges to 5 partitions. Only 4 partitions are returned.
let expected_partition_num = 5;
let mut result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
result.sort_by_key(|ranges| ranges[0].identifier);
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
@@ -266,18 +265,20 @@ mod test {
num_rows: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);