feat: initialize partition range from ScanInput (#4635)

* feat: initialize partition range from ScanInput

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use num_rows instead

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add todo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* setup unordered scan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Update src/mito2/src/read/scan_region.rs

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

* leave unordered scan unchanged

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2024-08-30 15:30:37 +08:00
committed by GitHub
parent f641c562c2
commit a37aeb2814
5 changed files with 49 additions and 19 deletions

View File

@@ -25,7 +25,7 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
@@ -705,6 +705,37 @@ impl ScanInput {
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
rows_in_files + rows_in_memtables
}
/// Retrieves [`PartitionRange`] from memtable and files
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
let mut id = 0;
let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());
for memtable in &self.memtables {
let range = PartitionRange {
// TODO(ruihang): filter out empty memtables in the future.
start: memtable.stats().time_range().unwrap().0,
end: memtable.stats().time_range().unwrap().1,
num_rows: memtable.stats().num_rows(),
identifier: id,
};
id += 1;
container.push(range);
}
for file in &self.files {
let range = PartitionRange {
start: file.meta_ref().time_range.0,
end: file.meta_ref().time_range.1,
num_rows: file.meta_ref().num_rows as usize,
identifier: id,
};
id += 1;
container.push(range);
}
container
}
}
#[cfg(test)]

View File

@@ -67,10 +67,11 @@ impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));
Self {

View File

@@ -364,7 +364,7 @@ impl TestEnv {
.as_path()
.display()
.to_string();
let mut builder = Fs::default();
let builder = Fs::default();
let object_store = ObjectStore::new(builder.root(&data_path)).unwrap().finish();
object_store_manager.add(storage_name, object_store);
}

View File

@@ -61,7 +61,6 @@ impl ParallelizeScan {
debug!(
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
);
// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
@@ -114,25 +113,25 @@ mod test {
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
},
];
@@ -146,13 +145,13 @@ mod test {
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
},
],
@@ -160,13 +159,13 @@ mod test {
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
},
],
@@ -180,25 +179,25 @@ mod test {
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
}],
];

View File

@@ -149,9 +149,8 @@ pub struct PartitionRange {
pub start: Timestamp,
/// End time of time index column. Inclusive.
pub end: Timestamp,
/// Estimate size of this range. Is used to balance ranges between partitions.
/// No base unit, just a number.
pub estimated_size: usize,
/// Number of rows in this range. Is used to balance ranges between partitions.
pub num_rows: usize,
/// Identifier to this range. Assigned by storage engine.
pub identifier: usize,
}