fix!: fix regression caused by unbalanced partitions and splitting ranges (#5090)

* feat: assign partition ranges by rows

* feat: balance partition rows

* feat: get uppoer bound for part nums

* feat: only split in non-compaction seq scan

* fix: parallel scan on multiple sources

* fix: can split check

* feat: scanner prepare by request

* feat: remove scan_parallelism

* docs: upate docs

* chore: update comment

* style: fix clippy

* feat: skip merge and dedup if there is only one source

* chore: Revert "feat: skip merge and dedup if there is only one source"

Since memtable won't do dedup jobs

This reverts commit 2fc7a54b11.

* test: avoid compaction in sqlness window sort test

* chore: do not create semaphore if num partitions is enough

* chore: more assertions

* chore: fix typo

* fix: compaction flag not set

* chore: address review comments
This commit is contained in:
Yingwen
2024-12-09 20:50:57 +08:00
committed by GitHub
parent 1b642ea6a9
commit 2fcb95f50a
19 changed files with 560 additions and 309 deletions

View File

@@ -136,7 +136,6 @@
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
@@ -464,7 +463,6 @@
| `region_engine.mito.experimental_write_cache_size` | String | `1GiB` | Capacity for write cache. If your disk space is sufficient, it is recommended to set it larger. |
| `region_engine.mito.experimental_write_cache_ttl` | String | Unset | TTL for write cache. |
| `region_engine.mito.sst_write_buffer_size` | String | `8MB` | Buffer size for SST writing. |
| `region_engine.mito.scan_parallelism` | Integer | `0` | Parallelism to scan a region (default: 1/4 of cpu cores).<br/>- `0`: using the default value (1/4 of cpu cores).<br/>- `1`: scan in current thread.<br/>- `n`: scan in parallelism n. |
| `region_engine.mito.parallel_scan_channel_size` | Integer | `32` | Capacity of the channel to send data from parallel scan tasks to the main task. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |

View File

@@ -492,12 +492,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -530,12 +530,6 @@ experimental_write_cache_ttl = "8h"
## Buffer size for SST writing.
sst_write_buffer_size = "8MB"
## Parallelism to scan a region (default: 1/4 of cpu cores).
## - `0`: using the default value (1/4 of cpu cores).
## - `1`: scan in current thread.
## - `n`: scan in parallelism n.
scan_parallelism = 0
## Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size = 32

View File

@@ -69,7 +69,6 @@ fn test_load_datanode_example_config() {
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
..Default::default()
}),
@@ -205,7 +204,6 @@ fn test_load_standalone_example_config() {
RegionEngineConfig::Mito(MitoConfig {
auto_flush_interval: Duration::from_secs(3600),
experimental_write_cache_ttl: Some(Duration::from_secs(60 * 60 * 8)),
scan_parallelism: 0,
..Default::default()
}),
RegionEngineConfig::File(EngineConfig {}),

View File

@@ -597,9 +597,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input)
.with_compaction()
.build_reader()
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
.await
}
}

View File

@@ -30,7 +30,7 @@ use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
const MULTIPART_UPLOAD_MINIMUM_SIZE: ReadableSize = ReadableSize::mb(5);
/// Default channel size for parallel scan task.
const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
pub(crate) const DEFAULT_SCAN_CHANNEL_SIZE: usize = 32;
// Use `1/GLOBAL_WRITE_BUFFER_SIZE_FACTOR` of OS memory as global write buffer size in default mode
const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
@@ -107,11 +107,6 @@ pub struct MitoConfig {
// Other configs:
/// Buffer size for SST writing.
pub sst_write_buffer_size: ReadableSize,
/// Parallelism to scan a region (default: 1/4 of cpu cores).
/// - 0: using the default value (1/4 of cpu cores).
/// - 1: scan in current thread.
/// - n: scan in parallelism n.
pub scan_parallelism: usize,
/// Capacity of the channel to send data from parallel scan tasks to the main task (default 32).
pub parallel_scan_channel_size: usize,
/// Whether to allow stale entries read during replay.
@@ -156,7 +151,6 @@ impl Default for MitoConfig {
experimental_write_cache_size: ReadableSize::gb(1),
experimental_write_cache_ttl: None,
sst_write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
index: IndexConfig::default(),
@@ -229,11 +223,6 @@ impl MitoConfig {
);
}
// Use default value if `scan_parallelism` is 0.
if self.scan_parallelism == 0 {
self.scan_parallelism = divide_num_cpus(4);
}
if self.parallel_scan_channel_size < 1 {
self.parallel_scan_channel_size = DEFAULT_SCAN_CHANNEL_SIZE;
warn!(

View File

@@ -90,7 +90,7 @@ use crate::error::{
};
use crate::manifest::action::RegionEdit;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanParallelism, ScanRegion, Scanner};
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::wal::entry_distributor::{
build_wal_entry_distributor_and_receivers, DEFAULT_ENTRY_RECEIVER_BUFFER_SIZE,
@@ -171,19 +171,9 @@ impl MitoEngine {
self.scan_region(region_id, request)?.scanner()
}
/// Returns a region scanner to scan the region for `request`.
fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner()
}
/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
self.inner.scan_region(region_id, request)
}
/// Edit region's metadata by [RegionEdit] directly. Use with care.
@@ -423,7 +413,7 @@ impl EngineInner {
}
/// Handles the scan `request` and returns a [ScanRegion].
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
@@ -433,14 +423,10 @@ impl EngineInner {
let version = region.version();
// Get cache.
let cache_manager = self.workers.cache_manager();
let scan_parallelism = ScanParallelism {
parallelism: self.config.scan_parallelism,
channel_size: self.config.parallel_scan_channel_size,
};
let scan_region =
ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
.with_parallelism(scan_parallelism)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
@@ -538,7 +524,9 @@ impl RegionEngine for MitoEngine {
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
self.scan_region(region_id, request)
.map_err(BoxedError::new)?
.region_scanner()
.map_err(BoxedError::new)
}

View File

@@ -92,7 +92,6 @@ async fn test_append_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -176,19 +175,19 @@ async fn test_append_mode_compaction() {
| b | 1.0 | 1970-01-01T00:00:01 |
+-------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(2, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)

View File

@@ -92,7 +92,6 @@ async fn test_merge_mode_compaction() {
let mut env = TestEnv::new();
let engine = env
.create_engine(MitoConfig {
scan_parallelism: 2,
..Default::default()
})
.await;
@@ -190,19 +189,19 @@ async fn test_merge_mode_compaction() {
| a | | 13.0 | 1970-01-01T00:00:03 |
+-------+---------+---------+---------------------+";
// Scans in parallel.
let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
let mut scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
assert_eq!(1, scanner.num_files());
assert_eq!(1, scanner.num_memtables());
scanner.set_target_partitions(2);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopens engine with parallelism 1.
// Reopens engine.
let engine = env
.reopen_engine(
engine,
MitoConfig {
scan_parallelism: 1,
..Default::default()
},
)

View File

@@ -37,7 +37,6 @@ async fn scan_in_parallel(
) {
let engine = env
.open_engine(MitoConfig {
scan_parallelism: parallelism,
parallel_scan_channel_size: channel_size,
..Default::default()
})
@@ -57,7 +56,9 @@ async fn scan_in_parallel(
.unwrap();
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let mut scanner = engine.scanner(region_id, request).unwrap();
scanner.set_target_partitions(parallelism);
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+

View File

@@ -34,6 +34,16 @@ use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
/// Index of the memtable and file.
pub(crate) index: usize,
/// Total number of row groups in this source. 0 if the metadata
/// is unavailable. We use this to split files.
pub(crate) num_row_groups: u64,
}
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
@@ -52,7 +62,7 @@ pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
/// Indices to memtables or files.
indices: SmallVec<[usize; 2]>,
pub(crate) indices: SmallVec<[SourceIndex; 2]>,
/// Indices to memtable/file row groups that this range scans.
pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
/// Estimated number of rows in the range. This can be 0 if the statistics are not available.
@@ -81,12 +91,17 @@ impl RangeMeta {
}
/// Creates a list of ranges from the `input` for seq scan.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
/// If `compaction` is true, it doesn't split the ranges.
pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
if compaction {
// We don't split ranges in compaction.
return ranges;
}
maybe_split_ranges_for_seq_scan(ranges)
}
@@ -105,13 +120,13 @@ impl RangeMeta {
}
/// Returns true if the time range of given `meta` overlaps with the time range of this meta.
pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
fn overlaps(&self, meta: &RangeMeta) -> bool {
overlaps(&self.time_range, &meta.time_range)
}
/// Merges given `meta` to this meta.
/// It assumes that the time ranges overlap and they don't have the same file or memtable index.
pub(crate) fn merge(&mut self, mut other: RangeMeta) {
fn merge(&mut self, mut other: RangeMeta) {
debug_assert!(self.overlaps(&other));
debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
debug_assert!(self
@@ -130,22 +145,28 @@ impl RangeMeta {
/// Returns true if we can split the range into multiple smaller ranges and
/// still preserve the order for [SeqScan].
pub(crate) fn can_split_preserve_order(&self) -> bool {
// Only one source and multiple row groups.
self.indices.len() == 1 && self.row_group_indices.len() > 1
fn can_split_preserve_order(&self) -> bool {
self.indices.len() == 1 && self.indices[0].num_row_groups > 1
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
fn maybe_split(self, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() {
let num_row_groups = self.indices[0].num_row_groups;
debug_assert_eq!(1, self.row_group_indices.len());
debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / self.row_group_indices.len();
let num_rows = self.num_rows / num_row_groups as usize;
// Splits by row group.
for index in self.row_group_indices {
for row_group_index in 0..num_row_groups {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
row_group_indices: smallvec![index],
row_group_indices: smallvec![RowGroupIndex {
index: self.indices[0].index,
row_group_index: row_group_index as i64,
}],
num_rows,
});
}
@@ -165,7 +186,10 @@ impl RangeMeta {
let num_rows = stats.num_rows() / stats.num_ranges();
ranges.push(RangeMeta {
time_range,
indices: smallvec![memtable_index],
indices: smallvec![SourceIndex {
index: memtable_index,
num_row_groups: stats.num_ranges() as u64,
}],
row_group_indices: smallvec![RowGroupIndex {
index: memtable_index,
row_group_index: row_group_index as i64,
@@ -199,7 +223,10 @@ impl RangeMeta {
let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
ranges.push(RangeMeta {
time_range: time_range.unwrap_or_else(|| file.time_range()),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -212,7 +239,10 @@ impl RangeMeta {
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
@@ -224,7 +254,10 @@ impl RangeMeta {
// If we don't known the number of row groups in advance, scan all row groups.
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: 0,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
@@ -245,7 +278,10 @@ impl RangeMeta {
};
ranges.push(RangeMeta {
time_range,
indices: smallvec![i],
indices: smallvec![SourceIndex {
index: i,
num_row_groups: stats.num_ranges() as u64,
}],
row_group_indices: smallvec![RowGroupIndex {
index: i,
row_group_index: ALL_ROW_GROUPS,
@@ -263,31 +299,18 @@ impl RangeMeta {
// For non append-only mode, each range only contains one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// All row groups share the same time range.
let row_group_indices = (0..file.meta_ref().num_row_groups)
.map(|row_group_index| RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
})
.collect();
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices,
num_rows: file.meta_ref().num_rows as usize,
});
} else {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![SourceIndex {
index: file_index,
num_row_groups: file.meta_ref().num_row_groups,
}],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
@@ -514,7 +537,10 @@ mod tests {
);
RangeMeta {
time_range,
indices: smallvec![*idx],
indices: smallvec![SourceIndex {
index: *idx,
num_row_groups: 0,
}],
row_group_indices: smallvec![RowGroupIndex {
index: *idx,
row_group_index: 0
@@ -527,7 +553,7 @@ mod tests {
let actual: Vec<_> = output
.iter()
.map(|range| {
let indices = range.indices.to_vec();
let indices = range.indices.iter().map(|index| index.index).collect();
let group_indices: Vec<_> = range
.row_group_indices
.iter()
@@ -578,7 +604,10 @@ mod tests {
fn test_merge_range() {
let mut left = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -593,7 +622,10 @@ mod tests {
};
let right = RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
indices: smallvec![2],
indices: smallvec![SourceIndex {
index: 2,
num_row_groups: 2,
}],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
@@ -612,7 +644,16 @@ mod tests {
left,
RangeMeta {
time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
indices: smallvec![
SourceIndex {
index: 1,
num_row_groups: 2
},
SourceIndex {
index: 2,
num_row_groups: 2
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -640,17 +681,14 @@ mod tests {
fn test_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 1
},
RowGroupIndex {
index: 1,
row_group_index: 2
}
],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: 5,
};
@@ -663,19 +701,25 @@ mod tests {
&[
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
row_group_index: 0
},],
num_rows: 2,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 2
row_group_index: 1
}],
num_rows: 2,
}
@@ -687,7 +731,16 @@ mod tests {
fn test_not_split_range() {
let range = RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1, 2],
indices: smallvec![
SourceIndex {
index: 1,
num_row_groups: 1,
},
SourceIndex {
index: 2,
num_row_groups: 1,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
@@ -710,32 +763,50 @@ mod tests {
#[test]
fn test_maybe_split_ranges() {
let ranges = vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![SourceIndex {
index: 0,
num_row_groups: 1,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
row_group_indices: smallvec![
RowGroupIndex {
index: 1,
row_group_index: 0
},
RowGroupIndex {
index: 1,
row_group_index: 1
}
],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: ALL_ROW_GROUPS,
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
indices: smallvec![
SourceIndex {
index: 2,
num_row_groups: 2,
},
SourceIndex {
index: 3,
num_row_groups: 0,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,
@@ -745,9 +816,24 @@ mod tests {
assert_eq!(
output,
vec![
RangeMeta {
time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
indices: smallvec![SourceIndex {
index: 0,
num_row_groups: 1,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0
},],
num_rows: 4,
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 0
@@ -756,7 +842,10 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
indices: smallvec![1],
indices: smallvec![SourceIndex {
index: 1,
num_row_groups: 2,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 1,
row_group_index: 1
@@ -765,15 +854,24 @@ mod tests {
},
RangeMeta {
time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
indices: smallvec![2, 3],
indices: smallvec![
SourceIndex {
index: 2,
num_row_groups: 2
},
SourceIndex {
index: 3,
num_row_groups: 0,
}
],
row_group_indices: smallvec![
RowGroupIndex {
index: 2,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
},
RowGroupIndex {
index: 3,
row_group_index: 0
row_group_index: ALL_ROW_GROUPS,
}
],
num_rows: 5,

View File

@@ -33,6 +33,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::FileCacheRef;
use crate::cache::CacheManagerRef;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
@@ -68,15 +69,6 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)),
}
}
}
#[cfg(test)]
@@ -104,6 +96,17 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
}
}
/// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
use store_api::region_engine::{PrepareRequest, RegionScanner};
let request = PrepareRequest::default().with_target_partitions(target_partitions);
match self {
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
}
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -165,8 +168,8 @@ pub(crate) struct ScanRegion {
request: ScanRequest,
/// Cache.
cache_manager: CacheManagerRef,
/// Parallelism to scan.
parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
@@ -188,17 +191,20 @@ impl ScanRegion {
access_layer,
request,
cache_manager,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
ignore_inverted_index: false,
ignore_fulltext_index: false,
start_time: None,
}
}
/// Sets parallelism.
/// Sets parallel scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -224,7 +230,7 @@ impl ScanRegion {
/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
if self.version.options.append_mode && self.request.series_row_selector.is_none() {
if self.use_unordered_scan() {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.unordered_scan().map(Scanner::Unordered)
@@ -233,10 +239,20 @@ impl ScanRegion {
}
}
/// Returns a [RegionScanner] to scan the region.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
if self.use_unordered_scan() {
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
} else {
self.seq_scan().map(|scanner| Box::new(scanner) as _)
}
}
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Unordered scan.
@@ -248,7 +264,14 @@ impl ScanRegion {
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
Ok(SeqScan::new(input))
Ok(SeqScan::new(input, false))
}
/// Returns true if the region can use unordered scan for current request.
fn use_unordered_scan(&self) -> bool {
// If table is append only and there is no series row selector, we use unordered scan in query.
// We still use seq scan in compaction.
self.version.options.append_mode && self.request.series_row_selector.is_none()
}
/// Creates a scan input.
@@ -314,7 +337,7 @@ impl ScanRegion {
.with_cache(self.cache_manager)
.with_inverted_index_applier(inverted_index_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallelism(self.parallelism)
.with_parallel_scan_channel_size(self.parallel_scan_channel_size)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
.with_filter_deleted(filter_deleted)
@@ -428,15 +451,6 @@ impl ScanRegion {
}
}
/// Config for parallel scan.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct ScanParallelism {
/// Number of tasks expect to spawn to read data.
pub(crate) parallelism: usize,
/// Channel size to send batches. Only takes effect when the parallelism > 1.
pub(crate) channel_size: usize,
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -466,8 +480,8 @@ pub(crate) struct ScanInput {
pub(crate) cache_manager: CacheManagerRef,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Parallelism to scan data.
pub(crate) parallelism: ScanParallelism,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
pub(crate) parallel_scan_channel_size: usize,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
@@ -496,7 +510,7 @@ impl ScanInput {
files: Vec::new(),
cache_manager: CacheManagerRef::default(),
ignore_file_not_found: false,
parallelism: ScanParallelism::default(),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
fulltext_index_applier: None,
query_start: None,
@@ -549,10 +563,13 @@ impl ScanInput {
self
}
/// Sets scan parallelism.
/// Sets scan task channel size.
#[must_use]
pub(crate) fn with_parallelism(mut self, parallelism: ScanParallelism) -> Self {
self.parallelism = parallelism;
pub(crate) fn with_parallel_scan_channel_size(
mut self,
parallel_scan_channel_size: usize,
) -> Self {
self.parallel_scan_channel_size = parallel_scan_channel_size;
self
}
@@ -621,12 +638,15 @@ impl ScanInput {
sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<Source>> {
debug_assert!(self.parallelism.parallelism > 1);
if sources.len() <= 1 {
return Ok(sources);
}
// Spawn a task for each source.
let sources = sources
.into_iter()
.map(|source| {
let (sender, receiver) = mpsc::channel(self.parallelism.channel_size);
let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
self.spawn_scan_task(source, semaphore.clone(), sender);
let stream = Box::pin(ReceiverStream::new(receiver));
Source::Stream(stream)
@@ -761,9 +781,9 @@ pub(crate) struct StreamContext {
impl StreamContext {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
READ_SST_COUNT.observe(input.num_files() as f64);
Self {

View File

@@ -28,7 +28,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
@@ -51,39 +51,27 @@ pub struct SeqScan {
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
/// The scanner is used for compaction.
compaction: bool,
}
impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
// TODO(yingwen): Set permits according to partition num. But we need to support file
// level parallelism.
let parallelism = input.parallelism.parallelism.max(1);
/// Creates a new [SeqScan] with the given input and compaction flag.
/// If `compaction` is true, the scanner will not attempt to split ranges.
pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
compaction: false,
compaction,
}
}
/// Sets the scanner to be used for compaction.
pub(crate) fn with_compaction(mut self) -> Self {
self.compaction = true;
self
}
/// Builds a stream for the query.
///
/// The returned stream is not partitioned and will contains all the data. If want
@@ -98,7 +86,12 @@ impl SeqScan {
}
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
///
/// # Panics
/// Panics if the compaction flag is not set.
pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
assert!(self.compaction);
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
@@ -112,23 +105,20 @@ impl SeqScan {
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::build_all_merge_reader(
let reader = Self::merge_all_ranges_for_compaction(
&self.stream_ctx,
partition_ranges,
self.semaphore.clone(),
self.compaction,
&part_metrics,
)
.await?;
Ok(Box::new(reader))
}
/// Builds a merge reader that reads all data.
async fn build_all_merge_reader(
/// Builds a merge reader that reads all ranges.
/// Callers MUST not split ranges before calling this method.
async fn merge_all_ranges_for_compaction(
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
semaphore: Arc<Semaphore>,
compaction: bool,
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
@@ -140,27 +130,37 @@ impl SeqScan {
build_sources(
stream_ctx,
part_range,
compaction,
true,
part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
common_telemetry::debug!(
"Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
stream_ctx.input.mapper.metadata().region_id,
partition_ranges.len(),
sources.len()
);
Self::build_reader_from_sources(stream_ctx, sources, None).await
}
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
/// if possible.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Arc<Semaphore>,
semaphore: Option<Arc<Semaphore>>,
) -> Result<BoxedBatchReader> {
if stream_ctx.input.parallelism.parallelism > 1 {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
if let Some(semaphore) = semaphore.as_ref() {
// Read sources in parallel.
if sources.len() > 1 {
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
}
}
let mut builder = MergeReaderBuilder::from_sources(sources);
@@ -207,10 +207,21 @@ impl SeqScan {
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let semaphore = if self.properties.target_partitions() > self.properties.num_partitions() {
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
// This semaphore is partition level.
// We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
// of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
// files in a part range.
Some(Arc::new(Semaphore::new(
self.properties.target_partitions() - self.properties.num_partitions() + 1,
)))
} else {
None
};
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range();
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
@@ -325,13 +336,8 @@ impl RegionScanner for SeqScan {
self.scan_partition_impl(partition)
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}
@@ -375,6 +381,20 @@ fn build_sources(
) {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction {
// Compaction expects input sources are not been split.
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.
debug_assert_eq!(
-1, row_group_idx.row_group_index,
"Expect {} range scan all row groups, given: {}",
i, row_group_idx.row_group_index,
);
}
}
sources.reserve(range_meta.row_group_indices.len());
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {

View File

@@ -27,7 +27,7 @@ use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
@@ -144,7 +144,7 @@ impl UnorderedScan {
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range();
let distinguish_range = self.properties.distinguish_partition_range;
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -231,13 +231,8 @@ impl RegionScanner for UnorderedScan {
self.stream_ctx.input.mapper.output_schema()
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BinaryHeap;
use std::sync::Arc;
use common_telemetry::debug;
@@ -93,7 +94,7 @@ impl ParallelizeScan {
// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
.with_new_partitions(partition_ranges, expected_partition_num)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_exec)));
}
@@ -109,21 +110,71 @@ impl ParallelizeScan {
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
/// Currently we assign ranges to partitions according to their rows so each partition
/// has similar number of rows.
/// This method may return partitions with smaller number than `expected_partition_num`
/// if the number of ranges is smaller than `expected_partition_num`. But this will
/// return at least one partition.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
mut ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
if ranges.is_empty() {
// Returns a single partition with no range.
return vec![vec![]];
}
if ranges.len() == 1 {
return vec![ranges];
}
// Sort ranges by number of rows in descending order.
ranges.sort_by(|a, b| b.num_rows.cmp(&a.num_rows));
// Get the max row number of the ranges. Note that the number of rows may be 0 if statistics are not available.
let max_rows = ranges[0].num_rows;
let total_rows = ranges.iter().map(|range| range.num_rows).sum::<usize>();
// Computes the partition num by the max row number. This eliminates the unbalance of the partitions.
let balanced_partition_num = if max_rows > 0 {
total_rows.div_ceil(max_rows)
} else {
ranges.len()
};
let actual_partition_num = expected_partition_num.min(balanced_partition_num).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];
// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
let partition_idx = i % expected_partition_num;
#[derive(Eq, PartialEq)]
struct HeapNode {
num_rows: usize,
partition_idx: usize,
}
impl Ord for HeapNode {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
// Reverse for min-heap.
self.num_rows.cmp(&other.num_rows).reverse()
}
}
impl PartialOrd for HeapNode {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut part_heap =
BinaryHeap::from_iter((0..actual_partition_num).map(|partition_idx| HeapNode {
num_rows: 0,
partition_idx,
}));
// Assigns the range to the partition with the smallest number of rows.
for range in ranges {
// Safety: actual_partition_num always > 0.
let mut node = part_heap.pop().unwrap();
let partition_idx = node.partition_idx;
node.num_rows += range.num_rows;
partition_ranges[partition_idx].push(range);
part_heap.push(node);
}
partition_ranges
@@ -172,18 +223,18 @@ mod test {
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
vec![
PartitionRange {
@@ -193,10 +244,10 @@ mod test {
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
],
];
@@ -207,10 +258,10 @@ mod test {
let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
@@ -218,18 +269,20 @@ mod test {
num_rows: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 250,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);
@@ -237,4 +290,68 @@ mod test {
let result = ParallelizeScan::assign_partition_range(vec![], 5);
assert_eq!(result.len(), 1);
}
#[test]
fn test_assign_unbalance_partition_range() {
let ranges = vec![
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
},
];
// assign to 2 partitions
let expected_partition_num = 2;
let result =
ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num);
let expected = vec![
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
num_rows: 2500,
identifier: 4,
}],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
num_rows: 100,
identifier: 1,
},
],
];
assert_eq!(result, expected);
}
}

View File

@@ -206,16 +206,13 @@ pub struct ScannerProperties {
/// Whether to yield an empty batch to distinguish partition ranges.
pub distinguish_partition_range: bool,
/// The target partitions of the scanner. 0 indicates using the number of partitions as target partitions.
target_partitions: usize,
}
impl ScannerProperties {
/// Initialize partitions with given parallelism for scanner.
pub fn with_parallelism(mut self, parallelism: usize) -> Self {
self.partitions = vec![vec![]; parallelism];
self
}
/// Set append mode for scanner.
/// Sets append mode for scanner.
pub fn with_append_mode(mut self, append_mode: bool) -> Self {
self.append_mode = append_mode;
self
@@ -234,9 +231,24 @@ impl ScannerProperties {
append_mode,
total_rows,
distinguish_partition_range: false,
target_partitions: 0,
}
}
/// Updates the properties with the given [PrepareRequest].
pub fn prepare(&mut self, request: PrepareRequest) {
if let Some(ranges) = request.ranges {
self.partitions = ranges;
}
if let Some(distinguish_partition_range) = request.distinguish_partition_range {
self.distinguish_partition_range = distinguish_partition_range;
}
if let Some(target_partitions) = request.target_partitions {
self.target_partitions = target_partitions;
}
}
/// Returns the number of actual partitions.
pub fn num_partitions(&self) -> usize {
self.partitions.len()
}
@@ -249,8 +261,44 @@ impl ScannerProperties {
self.total_rows
}
pub fn distinguish_partition_range(&self) -> bool {
self.distinguish_partition_range
/// Returns the target partitions of the scanner. If it is not set, returns the number of partitions.
pub fn target_partitions(&self) -> usize {
if self.target_partitions == 0 {
self.num_partitions()
} else {
self.target_partitions
}
}
}
/// Request to override the scanner properties.
#[derive(Default)]
pub struct PrepareRequest {
/// Assigned partition ranges.
pub ranges: Option<Vec<Vec<PartitionRange>>>,
/// Distringuishes partition range by empty batches.
pub distinguish_partition_range: Option<bool>,
/// The expected number of target partitions.
pub target_partitions: Option<usize>,
}
impl PrepareRequest {
/// Sets the ranges.
pub fn with_ranges(mut self, ranges: Vec<Vec<PartitionRange>>) -> Self {
self.ranges = Some(ranges);
self
}
/// Sets the distinguish partition range flag.
pub fn with_distinguish_partition_range(mut self, distinguish_partition_range: bool) -> Self {
self.distinguish_partition_range = Some(distinguish_partition_range);
self
}
/// Sets the target partitions.
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.target_partitions = Some(target_partitions);
self
}
}
@@ -271,11 +319,7 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Prepares the scanner with the given partition ranges.
///
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError>;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
/// Scans the partition and returns a stream of record batches.
///
@@ -431,9 +475,7 @@ impl SinglePartitionScanner {
Self {
stream: Mutex::new(Some(stream)),
schema,
properties: ScannerProperties::default()
.with_parallelism(1)
.with_append_mode(append_mode),
properties: ScannerProperties::default().with_append_mode(append_mode),
metadata,
}
}
@@ -454,13 +496,8 @@ impl RegionScanner for SinglePartitionScanner {
self.schema.clone()
}
fn prepare(
&mut self,
ranges: Vec<Vec<PartitionRange>>,
distinguish_partition_range: bool,
) -> Result<(), BoxedError> {
self.properties.partitions = ranges;
self.properties.distinguish_partition_range = distinguish_partition_range;
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
self.properties.prepare(request);
Ok(())
}

View File

@@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use futures::{Stream, StreamExt};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScannerRef};
use crate::table::metrics::StreamMetrics;
@@ -112,6 +112,7 @@ impl RegionScanExec {
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
target_partitions: usize,
) -> Result<Self, BoxedError> {
if self.is_partition_set {
warn!("Setting partition ranges more than once for RegionScanExec");
@@ -123,8 +124,11 @@ impl RegionScanExec {
{
let mut scanner = self.scanner.lock().unwrap();
let distinguish_partition_range = scanner.properties().distinguish_partition_range();
scanner.prepare(partitions, distinguish_partition_range)?;
scanner.prepare(
PrepareRequest::default()
.with_ranges(partitions)
.with_target_partitions(target_partitions),
)?;
}
Ok(Self {
@@ -141,9 +145,10 @@ impl RegionScanExec {
pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
let mut scanner = self.scanner.lock().unwrap();
let partition_ranges = scanner.properties().partitions.clone();
// set distinguish_partition_range won't fail
let _ = scanner.prepare(partition_ranges, distinguish_partition_range);
let _ = scanner.prepare(
PrepareRequest::default().with_distinguish_partition_range(distinguish_partition_range),
);
}
pub fn time_index(&self) -> String {

View File

@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
Affected Rows: 0
@@ -69,8 +69,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -101,9 +101,9 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@1 DESC num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -113,7 +113,7 @@ DROP TABLE test;
Affected Rows: 0
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
Affected Rows: 0
@@ -183,9 +183,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+
@@ -216,9 +216,9 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
|_|_|_|
| 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
|_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=4 fetch=5 REDACTED
|_|_|_PartSortExec: expr=t@2 DESC num_ranges=4 limit=5 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=4 (1 memtable ranges, 3 file 3 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 5_|
+-+-+-+

View File

@@ -1,5 +1,5 @@
-- Test without PK, with a windowed sort query.
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test(i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
INSERT INTO test VALUES (1, 1), (NULL, 2), (1, 3);
@@ -36,7 +36,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
DROP TABLE test;
-- Test with PK, with a windowed sort query.
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX);
CREATE TABLE test_pk(pk INTEGER PRIMARY KEY, i INTEGER, t TIMESTAMP TIME INDEX) WITH('compaction.type'='twcs', 'compaction.twcs.max_inactive_window_files'='4');
INSERT INTO test_pk VALUES (1, 1, 1), (2, NULL, 2), (3, 1, 3);