feat: assign partition ranges by rows

This commit is contained in:
evenyag
2024-11-26 20:17:32 +08:00
parent ff4c153d4b
commit 55e8d3549c
5 changed files with 287 additions and 46 deletions

View File

@@ -32,7 +32,7 @@ use crate::sst::parquet::format::parquet_row_group_time_range;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
pub(crate) const ALL_ROW_GROUPS: i64 = -1;
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
@@ -52,7 +52,7 @@ pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
/// Indices to memtables or files.
indices: SmallVec<[usize; 2]>,
pub(crate) indices: SmallVec<[usize; 2]>,
/// Indices to memtable/file row groups that this range scans.
pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
/// Estimated number of rows in the range. This can be 0 if the statistics are not available.
@@ -87,7 +87,19 @@ impl RangeMeta {
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
maybe_split_ranges_for_seq_scan(ranges)
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ranges before split, num_ranges: {}",
// ranges.len(),
// );
// maybe_split_ranges_for_seq_scan(ranges)
// let ranges =
// maybe_split_ranges_for_seq_scan_groups(ranges, input.memtables.len(), &input.files);
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ranges after split, num_ranges: {}, ranges: {:?}",
// ranges.len(),
// ranges,
// );
ranges
}
/// Creates a list of ranges from the `input` for unordered scan.
@@ -132,7 +144,18 @@ impl RangeMeta {
/// still preserve the order for [SeqScan].
pub(crate) fn can_split_preserve_order(&self) -> bool {
// Only one source and multiple row groups.
self.indices.len() == 1 && self.row_group_indices.len() > 1
if self.indices.len() == 1 {
if self.row_group_indices.len() > 1 {
return true;
} else if self.row_group_indices.len() == 1
&& self.row_group_indices[0].row_group_index == ALL_ROW_GROUPS
{
return true;
}
}
false
// fixme(yingwen): Remove this.
// self.indices.len() == 1 && self.row_group_indices.len() > 1
}
/// Splits the range if it can preserve the order.
@@ -154,6 +177,28 @@ impl RangeMeta {
}
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split_groups(self, num_row_groups: u64, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() && num_row_groups > 0 {
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / num_row_groups as usize;
// Splits by row group.
for row_group_index in 0..num_row_groups {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
row_group_indices: smallvec![RowGroupIndex {
index: self.indices[0],
row_group_index: row_group_index as i64,
}],
num_rows,
});
}
} else {
output.push(self);
}
}
fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
// For append mode, we can parallelize reading memtables.
for (memtable_index, memtable) in memtables.iter().enumerate() {
@@ -263,31 +308,40 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// All row groups share the same time range.
let row_group_indices = (0..file.meta_ref().num_row_groups)
.map(|row_group_index| RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
})
.collect();
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices,
num_rows: file.meta_ref().num_rows as usize,
});
} else {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
// if file.meta_ref().num_row_groups > 0 {
// // All row groups share the same time range.
// let row_group_indices = (0..file.meta_ref().num_row_groups)
// .map(|row_group_index| RowGroupIndex {
// index: file_index,
// row_group_index: row_group_index as i64,
// })
// .collect();
// ranges.push(RangeMeta {
// time_range: file.time_range(),
// indices: smallvec![file_index],
// row_group_indices,
// num_rows: file.meta_ref().num_rows as usize,
// });
// } else {
// ranges.push(RangeMeta {
// time_range: file.time_range(),
// indices: smallvec![file_index],
// row_group_indices: smallvec![RowGroupIndex {
// index: file_index,
// row_group_index: ALL_ROW_GROUPS,
// }],
// num_rows: file.meta_ref().num_rows as usize,
// });
// }
}
}
}
@@ -341,6 +395,27 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
new_ranges
}
/// Splits the range into multiple smaller ranges.
/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
fn maybe_split_ranges_for_seq_scan_groups(
ranges: Vec<RangeMeta>,
num_memtables: usize,
files: &[FileHandle],
) -> Vec<RangeMeta> {
let mut new_ranges = Vec::with_capacity(ranges.len());
for range in ranges {
if range.indices[0] < num_memtables {
range.maybe_split(&mut new_ranges);
} else {
let index = range.indices[0] - num_memtables;
let file = &files[index];
range.maybe_split_groups(file.meta_ref().num_row_groups, &mut new_ranges);
}
}
new_ranges
}
/// Builder to create file ranges.
#[derive(Default)]
pub(crate) struct FileRangeBuilder {
@@ -457,6 +532,11 @@ impl RangeBuilderList {
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
// common_telemetry::info!(
// "DEBUG_SCAN: build file ranges, file_index: {}, build_cost: {:?}",
// file_index,
// reader_metrics.build_cost
// );
builder.build_ranges(index.row_group_index, &mut ranges);
self.set_file_builder(file_index, Arc::new(builder));
}

View File

@@ -635,6 +635,55 @@ impl ScanInput {
Ok(sources)
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
pub(crate) fn create_parallel_sources_no_semaphore(
&self,
sources: Vec<Source>,
) -> Result<Vec<Source>> {
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
self.spawn_scan_task_no_semaphore(source, sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
})
.collect();
Ok(sources)
}
/// Scans the input source in another task and sends batches to the sender.
pub(crate) fn spawn_scan_task_no_semaphore(
&self,
mut input: Source,
sender: mpsc::Sender<Result<Batch>>,
) {
common_runtime::spawn_global(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit held.
let maybe_batch = {
// // Safety: We never close the semaphore.
// let _permit = semaphore.acquire().await.unwrap();
input.next_batch().await
};
match maybe_batch {
Ok(Some(batch)) => {
let _ = sender.send(Ok(batch)).await;
}
Ok(None) => break,
Err(e) => {
let _ = sender.send(Err(e)).await;
break;
}
}
}
});
}
/// Prunes a memtable to scan and returns the builder to build readers.
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
@@ -764,6 +813,11 @@ impl StreamContext {
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
// common_telemetry::info!(
// "DEBUG_SCAN: seq_scan_ctx, num_ranges: {}, ranges: {:?}",
// ranges.len(),
// ranges
// );
READ_SST_COUNT.observe(input.num_files() as f64);
Self {

View File

@@ -172,6 +172,7 @@ pub(crate) fn scan_file_ranges(
let build_reader_start = Instant::now();
let reader = range.reader(stream_ctx.input.series_row_selector).await?;
let build_cost = build_reader_start.elapsed();
// common_telemetry::info!("DEBUG_SCAN: scan file ranges, build reader, file_id: {}, index: {:?}, build_cost: {:?}", range.file_handle().file_id(), index, build_cost);
part_metrics.inc_build_reader_cost(build_cost);
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);

View File

@@ -36,7 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::range::RangeBuilderList;
use crate::read::range::{RangeBuilderList, RowGroupIndex, ALL_ROW_GROUPS};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
@@ -160,7 +160,7 @@ impl SeqScan {
// by the semaphore.
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
.create_parallel_sources_no_semaphore(sources)?;
}
let mut builder = MergeReaderBuilder::from_sources(sources);
@@ -230,6 +230,7 @@ impl SeqScan {
stream_ctx.input.num_files(),
));
// Scans each part.
// common_telemetry::info!("DEBUG_SCAN: scan partition range, partition: {}, num_ranges: {}", partition, partition_ranges.len());
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(
@@ -241,6 +242,7 @@ impl SeqScan {
&mut sources,
);
// common_telemetry::info!("DEBUG_SCAN: scan part range, partition: {}, num_sources: {}, part_range: {:?}", partition, sources.len(), part_range);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await
@@ -330,6 +332,12 @@ impl RegionScanner for SeqScan {
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
let num_partitions = ranges.len();
let available_permits = self.semaphore.available_permits();
if available_permits < num_partitions {
self.semaphore
.add_permits(num_partitions - available_permits);
}
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
Ok(())

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BinaryHeap;
use std::sync::Arc;
use common_telemetry::debug;
@@ -114,16 +115,48 @@ impl ParallelizeScan {
/// if the number of ranges is smaller than `expected_partition_num`. But this will
/// return at least one partition.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
mut ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
let partition_idx = i % expected_partition_num;
// Sort ranges by number of rows in descending order.
ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
#[derive(Eq, PartialEq)]
struct HeapNode {
num_rows: usize,
partition_idx: usize,
}
impl Ord for HeapNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse for min-heap.
self.num_rows.cmp(&other.num_rows).reverse()
}
}
impl PartialOrd for HeapNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut part_heap =
BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
num_rows: 0,
partition_idx,
}));
// Assigns the range to the partition with the smallest number of rows.
for range in ranges {
// Safety: actual_partition_num always > 0.
let mut node = part_heap.pop().unwrap();
let partition_idx = node.partition_idx;
node.num_rows += range.num_rows;
partition_ranges[partition_idx].push(range);
part_heap.push(node);
}
partition_ranges
@@ -172,18 +205,18 @@ mod test {
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
vec![
PartitionRange {
@@ -193,10 +226,10 @@ mod test {
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
];
@@ -204,7 +237,8 @@ mod test {
// assign 4 ranges to 5 partitions. Only 4 partitions are returned.
let expected_partition_num = 5;
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let mut result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
result.sort_by_key(|ranges| ranges[0].identifier);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
@@ -237,4 +271,68 @@ mod test {
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
}
#[test]
fn test_assign_unbalance_partition_range() {
let ranges = vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
},
];
// assign to 2 partitions
let expected_partition_num = 2;
let result =
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);
}
}