feat: unordered scanner scans data by time ranges (#4757)

* feat: define range meta

* feat: group ranges

* feat: split range

* feat: build ranges from the scan input

* feat: get partition range from range meta

* feat: build file range

* feat: unordered scan read by ranges

* feat: wip for mem ranges

* feat: build ranges

* feat: remove unused codes

* chore: update comments

* feat: update metrics

* chore: address review comments

* chore: debug assertion
Author: Yingwen (committed by GitHub)
Date: 2024-09-29 15:14:48 +08:00
Parent: 50cb59587d
Commit: cd55202136
13 changed files with 711 additions and 276 deletions

View File

@@ -14,6 +14,7 @@
//! Memtables are write buffers for regions.
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -71,6 +72,8 @@ pub struct MemtableStats {
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in memtable
num_rows: usize,
/// Total number of ranges in the memtable.
num_ranges: usize,
}
impl MemtableStats {
@@ -95,6 +98,11 @@ impl MemtableStats {
pub fn num_rows(&self) -> usize {
self.num_rows
}
/// Returns the number of ranges in the memtable.
pub fn num_ranges(&self) -> usize {
self.num_ranges
}
}
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
@@ -123,11 +131,12 @@ pub trait Memtable: Send + Sync + fmt::Debug {
) -> Result<BoxedBatchIterator>;
/// Returns the ranges in the memtable.
/// The returned map is keyed by range id and contains the ranges after applying the predicate.
fn ranges(
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange>;
) -> BTreeMap<usize, MemtableRange>;
/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
@@ -332,7 +341,6 @@ impl MemtableRangeContext {
pub struct MemtableRange {
/// Shared context.
context: MemtableRangeContextRef,
// TODO(yingwen): Id to identify the range in the memtable.
}
impl MemtableRange {

View File

@@ -14,6 +14,7 @@
//! Memtable implementation for bulk load
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use store_api::metadata::RegionMetadataRef;
@@ -67,7 +68,7 @@ impl Memtable for BulkMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
todo!()
}

View File

@@ -24,6 +24,7 @@ mod shard;
mod shard_builder;
mod tree;
use std::collections::BTreeMap;
use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -176,7 +177,7 @@ impl Memtable for PartitionTreeMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = projection.map(|ids| ids.to_vec());
let builder = Box::new(PartitionTreeIterBuilder {
tree: self.tree.clone(),
@@ -185,7 +186,7 @@ impl Memtable for PartitionTreeMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}
fn is_empty(&self) -> bool {
@@ -207,6 +208,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}
@@ -225,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}
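Both `PartitionTreeMemtable` and `TimeSeriesMemtable` currently report a single range keyed by id 0, which is what the `[(0, MemtableRange::new(context))].into()` expression builds. A minimal sketch of the new map-based contract, using a hypothetical `Range` type as a stand-in for `MemtableRange`:

use std::collections::BTreeMap;

// Toy stand-in for MemtableRange; the real type carries a shared context.
#[derive(Clone, Debug, PartialEq)]
struct Range(&'static str);

fn main() {
    // A single-range memtable keys its only range by id 0, mirroring
    // `[(0, MemtableRange::new(context))].into()` above.
    let ranges: BTreeMap<usize, Range> = [(0, Range("whole memtable"))].into();
    assert_eq!(ranges.len(), 1);
    // Callers can now address an individual range by its id.
    assert_eq!(ranges.get(&0), Some(&Range("whole memtable")));
}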

View File

@@ -287,7 +287,7 @@ impl Memtable for TimeSeriesMemtable {
&self,
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
) -> BTreeMap<usize, MemtableRange> {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
@@ -305,7 +305,7 @@ impl Memtable for TimeSeriesMemtable {
});
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
vec![MemtableRange::new(context)]
[(0, MemtableRange::new(context))].into()
}
fn is_empty(&self) -> bool {
@@ -327,6 +327,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: None,
num_rows: 0,
num_ranges: 0,
};
}
let ts_type = self
@@ -343,6 +344,7 @@ impl Memtable for TimeSeriesMemtable {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
num_ranges: 1,
}
}

View File

@@ -20,6 +20,7 @@ pub mod last_row;
pub mod merge;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
@@ -753,6 +754,10 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of mem ranges scanned.
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}

View File

@@ -400,7 +400,6 @@ pub(crate) struct LastNonNull {
impl LastNonNull {
/// Creates a new strategy with the given `filter_deleted` flag.
#[allow(dead_code)]
pub(crate) fn new(filter_deleted: bool) -> Self {
Self {
buffer: None,

src/mito2/src/read/range.rs (new file, 266 lines)
View File

@@ -0,0 +1,266 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Structs for partition ranges.
use smallvec::{smallvec, SmallVec};
use crate::memtable::MemtableRef;
use crate::read::scan_region::ScanInput;
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
const ALL_ROW_GROUPS: i64 = -1;
/// Index to access a row group.
#[derive(Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
/// Index to the memtable/file.
pub(crate) index: usize,
/// Row group index in the file.
/// Negative index indicates all row groups.
pub(crate) row_group_index: i64,
}
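The `ALL_ROW_GROUPS` sentinel lets one `RowGroupIndex` address either a single row group or every row group of a source. A toy sketch of the dispatch that the builders later perform on the sign of the index:

const ALL_ROW_GROUPS: i64 = -1;

// Toy dispatch: a non-negative index selects one row group, the
// negative sentinel selects all of them.
fn selected_row_groups(row_group_index: i64, total: usize) -> Vec<usize> {
    if row_group_index >= 0 {
        vec![row_group_index as usize]
    } else {
        (0..total).collect()
    }
}

fn main() {
    assert_eq!(selected_row_groups(2, 4), vec![2]);
    assert_eq!(selected_row_groups(ALL_ROW_GROUPS, 3), vec![0, 1, 2]);
}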
/// Metadata of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
pub(crate) struct RangeMeta {
/// The time range of the range.
pub(crate) time_range: FileTimeRange,
/// Indices to memtables or files.
indices: SmallVec<[usize; 2]>,
/// Indices to memtable/file row groups that this range scans.
pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
/// Estimated number of rows in the range. This can be 0 if the statistics are not available.
pub(crate) num_rows: usize,
}
impl RangeMeta {
/// Creates a list of ranges from the `input` for seq scan.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
let ranges = group_ranges_for_seq_scan(ranges);
maybe_split_ranges_for_seq_scan(ranges)
}
/// Creates a list of ranges from the `input` for unordered scan.
pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
Self::push_unordered_file_ranges(input.memtables.len(), &input.files, &mut ranges);
ranges
}
/// Returns true if the time range of the given `meta` overlaps with the time range of this meta.
pub(crate) fn overlaps(&self, meta: &RangeMeta) -> bool {
overlaps(&self.time_range, &meta.time_range)
}
/// Merges the given `other` meta into this meta.
/// It assumes that the time ranges overlap and that the two metas don't share any file or memtable index.
pub(crate) fn merge(&mut self, mut other: RangeMeta) {
debug_assert!(self.overlaps(&other));
debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
debug_assert!(self
.row_group_indices
.iter()
.all(|idx| !other.row_group_indices.contains(idx)));
self.time_range = (
self.time_range.0.min(other.time_range.0),
self.time_range.1.max(other.time_range.1),
);
self.indices.append(&mut other.indices);
self.row_group_indices.append(&mut other.row_group_indices);
self.num_rows += other.num_rows;
}
/// Returns true if we can split the range into multiple smaller ranges and
/// still preserve the order for [SeqScan].
pub(crate) fn can_split_preserve_order(&self) -> bool {
// Only one source and multiple row groups.
self.indices.len() == 1 && self.row_group_indices.len() > 1
}
/// Splits the range if it can preserve the order.
pub(crate) fn maybe_split(self, output: &mut Vec<RangeMeta>) {
if self.can_split_preserve_order() {
output.reserve(self.row_group_indices.len());
let num_rows = self.num_rows / self.row_group_indices.len();
// Splits by row group.
for index in self.row_group_indices {
output.push(RangeMeta {
time_range: self.time_range,
indices: self.indices.clone(),
row_group_indices: smallvec![index],
num_rows,
});
}
} else {
output.push(self);
}
}
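When a range can be split, its `num_rows` estimate is divided evenly across the resulting per-row-group ranges. A toy walkthrough of that split (integer division, so the per-range estimate may round down):

fn main() {
    // One source with three row groups and ~1000 rows.
    let row_group_indices = [0i64, 1, 2];
    let num_rows = 1000usize / row_group_indices.len(); // 333 rows per range
    let split: Vec<(i64, usize)> = row_group_indices
        .iter()
        .map(|&i| (i, num_rows)) // each output range scans a single row group
        .collect();
    assert_eq!(split, vec![(0, 333), (1, 333), (2, 333)]);
}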
fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
// For append mode, we can parallelize reading memtables.
for (memtable_index, memtable) in memtables.iter().enumerate() {
let stats = memtable.stats();
let Some(time_range) = stats.time_range() else {
continue;
};
for row_group_index in 0..stats.num_ranges() {
let num_rows = stats.num_rows() / stats.num_ranges();
ranges.push(RangeMeta {
time_range,
indices: smallvec![memtable_index],
row_group_indices: smallvec![RowGroupIndex {
index: memtable_index,
row_group_index: row_group_index as i64,
}],
num_rows,
});
}
}
}
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
if file.meta_ref().num_row_groups > 0 {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: row_group_index as i64,
}],
num_rows: DEFAULT_ROW_GROUP_SIZE,
});
}
} else {
// If we don't know the number of row groups in advance, scan all row groups.
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
// This may be 0.
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
// For non-append-only mode, each range contains only one memtable.
for (i, memtable) in memtables.iter().enumerate() {
let stats = memtable.stats();
let Some(time_range) = stats.time_range() else {
continue;
};
ranges.push(RangeMeta {
time_range,
indices: smallvec![i],
row_group_indices: smallvec![RowGroupIndex {
index: i,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: stats.num_rows(),
});
}
}
fn push_seq_file_ranges(
num_memtables: usize,
files: &[FileHandle],
ranges: &mut Vec<RangeMeta>,
) {
// For non-append-only mode, each range contains only one file.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
ranges.push(RangeMeta {
time_range: file.time_range(),
indices: smallvec![file_index],
row_group_indices: smallvec![RowGroupIndex {
index: file_index,
row_group_index: ALL_ROW_GROUPS,
}],
num_rows: file.meta_ref().num_rows as usize,
});
}
}
}
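For unordered scan, each file row group becomes its own `RangeMeta`, so partitions can scan row groups independently. A toy expansion over two files, with `(source index, row group)` pairs standing in for the real metas:

fn main() {
    let num_memtables = 1;
    let files_row_groups = [3usize, 2]; // row groups per file
    let mut ranges: Vec<(usize, i64)> = Vec::new(); // (source index, row group)
    for (i, &n) in files_row_groups.iter().enumerate() {
        let file_index = num_memtables + i; // files follow memtables in the index space
        for rg in 0..n {
            ranges.push((file_index, rg as i64));
        }
    }
    // One range per row group: 3 + 2 = 5 ranges after the memtable sources.
    assert_eq!(ranges.len(), 5);
    assert_eq!(ranges[0], (1, 0));
}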
/// Groups ranges by time range.
/// It assumes each input range only contains a file or a memtable.
fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
if ranges.is_empty() {
return ranges;
}
// Sorts ranges by time range (start asc, end desc).
ranges.sort_unstable_by(|a, b| {
let l = a.time_range;
let r = b.time_range;
l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
});
let mut range_in_progress = None;
// Parts with exclusive time ranges.
let mut exclusive_ranges = Vec::with_capacity(ranges.len());
for range in ranges {
let Some(mut prev_range) = range_in_progress.take() else {
// This is the new range to process.
range_in_progress = Some(range);
continue;
};
if prev_range.overlaps(&range) {
prev_range.merge(range);
range_in_progress = Some(prev_range);
} else {
exclusive_ranges.push(prev_range);
range_in_progress = Some(range);
}
}
if let Some(range) = range_in_progress {
exclusive_ranges.push(range);
}
exclusive_ranges
}
/// Splits each range into multiple smaller ranges when it can preserve the order.
/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
let mut new_ranges = Vec::with_capacity(ranges.len());
for range in ranges {
range.maybe_split(&mut new_ranges);
}
new_ranges
}
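A toy walkthrough of the grouping sweep above, with `(start, end)` pairs standing in for `RangeMeta` and inclusive overlap standing in for `overlaps()`: after sorting by start ascending (end descending on ties), each range either extends the range in progress or closes it out.

fn group(mut ranges: Vec<(i64, i64)>) -> Vec<(i64, i64)> {
    // Sorts by time range (start asc, end desc), like group_ranges_for_seq_scan().
    ranges.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1)));
    let mut exclusive: Vec<(i64, i64)> = Vec::new();
    for range in ranges {
        match exclusive.last_mut() {
            // Overlapping ranges merge into the range in progress.
            Some(prev) if range.0 <= prev.1 => prev.1 = prev.1.max(range.1),
            // A gap closes the previous range and starts a new one.
            _ => exclusive.push(range),
        }
    }
    exclusive
}

fn main() {
    // [0, 5] and [3, 8] overlap and merge; [10, 12] stays exclusive.
    assert_eq!(group(vec![(3, 8), (0, 5), (10, 12)]), vec![(0, 8), (10, 12)]);
}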

View File

@@ -14,9 +14,9 @@
//! Scans a region according to the scan request.
use std::collections::HashSet;
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::sync::Arc;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use common_error::ext::BoxedError;
@@ -26,6 +26,7 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use datafusion_expr::utils::expr_to_columns;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
@@ -41,6 +42,7 @@ use crate::memtable::{MemtableRange, MemtableRef};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
@@ -51,7 +53,7 @@ use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBui
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
use crate::sst::parquet::reader::ReaderMetrics;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
@@ -643,6 +645,61 @@ impl ScanInput {
Ok(sources)
}
/// Prunes a memtable to scan and returns the builder to build readers.
fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
let memtable = &self.memtables[mem_index];
let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
MemRangeBuilder { row_groups }
}
/// Prunes a file to scan and returns the builder to build readers.
async fn prune_file(
&self,
file_index: usize,
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let file = &self.files[file_index];
let res = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input(reader_metrics)
.await;
let (mut file_range_ctx, row_groups) = match res {
Ok(x) => x,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
return Ok(FileRangeBuilder::default());
} else {
return Err(e);
}
}
};
if !compat::has_same_columns(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
) {
// They have different schemas. We need to adapt the batch first so the
// mapper can convert it.
let compat = CompatBatch::new(
&self.mapper,
file_range_ctx.read_format().metadata().clone(),
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
Ok(FileRangeBuilder {
context: Some(Arc::new(file_range_ctx)),
row_groups,
})
}
/// Prunes file ranges to scan and adds them to the `collector`.
pub(crate) async fn prune_file_ranges(
&self,
@@ -749,51 +806,6 @@ impl ScanInput {
pub(crate) fn predicate(&self) -> Option<Predicate> {
self.predicate.clone()
}
/// Retrieves [`PartitionRange`] from memtable and files
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
let mut id = 0;
let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());
for memtable in &self.memtables {
let range = PartitionRange {
// TODO(ruihang): filter out empty memtables in the future.
start: memtable.stats().time_range().unwrap().0,
end: memtable.stats().time_range().unwrap().1,
num_rows: memtable.stats().num_rows(),
identifier: id,
};
id += 1;
container.push(range);
}
for file in &self.files {
if self.append_mode {
// For append mode, we can parallelize reading row groups.
for _ in 0..file.meta_ref().num_row_groups {
let range = PartitionRange {
start: file.time_range().0,
end: file.time_range().1,
num_rows: file.num_rows(),
identifier: id,
};
id += 1;
container.push(range);
}
} else {
let range = PartitionRange {
start: file.meta_ref().time_range.0,
end: file.meta_ref().time_range.1,
num_rows: file.meta_ref().num_rows as usize,
identifier: id,
};
id += 1;
container.push(range);
}
}
container
}
}
#[cfg(test)]
@@ -967,6 +979,10 @@ pub(crate) struct StreamContext {
/// The scanner builds parts to scan from the input lazily.
/// The mutex is used to ensure the parts are only built once.
pub(crate) parts: Mutex<(ScanPartList, Duration)>,
/// Metadata for partition ranges.
pub(crate) ranges: Vec<RangeMeta>,
/// Lists of range builders.
range_builders: RangeBuilderList,
// Metrics:
/// The start time of the query.
@@ -974,17 +990,77 @@ pub(crate) struct StreamContext {
}
impl StreamContext {
/// Creates a new [StreamContext].
pub(crate) fn new(input: ScanInput) -> Self {
/// Creates a new [StreamContext] for [SeqScan].
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.files.len() as f64);
let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
Self {
input,
parts: Mutex::new((ScanPartList::default(), Duration::default())),
ranges,
range_builders,
query_start,
}
}
/// Creates a new [StreamContext] for [UnorderedScan].
pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::unordered_scan_ranges(&input);
READ_SST_COUNT.observe(input.files.len() as f64);
let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
Self {
input,
parts: Mutex::new((ScanPartList::default(), Duration::default())),
ranges,
range_builders,
query_start,
}
}
/// Returns true if the index refers to a memtable.
pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
self.input.memtables.len() > index.index
}
/// Creates file ranges to scan.
pub(crate) async fn build_file_ranges(
&self,
index: RowGroupIndex,
ranges: &mut Vec<FileRange>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
ranges.clear();
self.range_builders
.build_file_ranges(&self.input, index, ranges, reader_metrics)
.await
}
/// Creates memtable ranges to scan.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex, ranges: &mut Vec<MemtableRange>) {
ranges.clear();
self.range_builders
.build_mem_ranges(&self.input, index, ranges)
}
/// Retrieves the partition ranges.
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
self.ranges
.iter()
.enumerate()
.map(|(idx, range_meta)| PartitionRange {
start: range_meta.time_range.0,
end: range_meta.time_range.1,
num_rows: range_meta.num_rows,
identifier: idx,
})
.collect()
}
/// Formats the context for explain.
pub(crate) fn format_for_explain(
&self,
@@ -1011,3 +1087,125 @@ impl StreamContext {
Ok(())
}
}
/// List that manages the builders used to create file and memtable ranges.
struct RangeBuilderList {
mem_builders: Vec<StdMutex<Option<MemRangeBuilder>>>,
file_builders: Vec<Mutex<Option<FileRangeBuilder>>>,
}
impl RangeBuilderList {
/// Creates a new [RangeBuilderList] with the given number of memtables and files.
fn new(num_memtables: usize, num_files: usize) -> Self {
let mem_builders = (0..num_memtables).map(|_| StdMutex::new(None)).collect();
let file_builders = (0..num_files).map(|_| Mutex::new(None)).collect();
Self {
mem_builders,
file_builders,
}
}
/// Builds file ranges to read the row group at `index`.
async fn build_file_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut Vec<FileRange>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
let file_index = index.index - self.mem_builders.len();
let mut builder_opt = self.file_builders[file_index].lock().await;
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_file(file_index, reader_metrics).await?;
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
Ok(())
}
/// Builds mem ranges to read the row group at `index`.
fn build_mem_ranges(
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut Vec<MemtableRange>,
) {
let mut builder_opt = self.mem_builders[index.index].lock().unwrap();
match &mut *builder_opt {
Some(builder) => builder.build_ranges(index.row_group_index, ranges),
None => {
let builder = input.prune_memtable(index.index);
builder.build_ranges(index.row_group_index, ranges);
*builder_opt = Some(builder);
}
}
}
}
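The builders are cached behind per-source locks and built on first use, so pruning runs at most once per memtable or file no matter how many partitions request ranges from it. A toy sketch of that lazy-init pattern, with a `String` standing in for a builder:

use std::sync::Mutex;

struct BuilderCache {
    slots: Vec<Mutex<Option<String>>>,
}

impl BuilderCache {
    fn new(n: usize) -> Self {
        Self { slots: (0..n).map(|_| Mutex::new(None)).collect() }
    }

    // Builds the value on first access and reuses it afterwards, like
    // RangeBuilderList does for its file and memtable builders.
    fn get_or_build(&self, i: usize, build: impl FnOnce() -> String) -> String {
        let mut slot = self.slots[i].lock().unwrap();
        slot.get_or_insert_with(build).clone()
    }
}

fn main() {
    let cache = BuilderCache::new(2);
    let first = cache.get_or_build(0, || "pruned ranges for source 0".to_string());
    // The second access hits the cache; the build closure never runs.
    let second = cache.get_or_build(0, || unreachable!());
    assert_eq!(first, second);
}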
/// Builder to create file ranges.
#[derive(Default)]
struct FileRangeBuilder {
/// Context for the file.
/// None indicates nothing to read.
context: Option<FileRangeContextRef>,
/// Row selections for each row group to read.
/// It skips the row group if it is not in the map.
row_groups: BTreeMap<usize, Option<RowSelection>>,
}
impl FileRangeBuilder {
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<FileRange>) {
let Some(context) = self.context.clone() else {
return;
};
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(row_selection) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(FileRange::new(
context,
row_group_index,
row_selection.clone(),
));
} else {
// Scans all row groups.
ranges.extend(
self.row_groups
.iter()
.map(|(row_group_index, row_selection)| {
FileRange::new(context.clone(), *row_group_index, row_selection.clone())
}),
);
}
}
}
/// Builder to create mem ranges.
struct MemRangeBuilder {
/// Ranges of a memtable.
row_groups: BTreeMap<usize, MemtableRange>,
}
impl MemRangeBuilder {
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<MemtableRange>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.
let Some(range) = self.row_groups.get(&row_group_index) else {
return;
};
ranges.push(range.clone());
} else {
ranges.extend(self.row_groups.values().cloned());
}
}
}

View File

@@ -68,11 +68,10 @@ impl SeqScan {
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
@@ -594,7 +593,7 @@ impl SeqDistributor {
continue;
}
let part = ScanPart {
memtable_ranges: mem_ranges,
memtable_ranges: mem_ranges.into_values().collect(),
file_ranges: smallvec![],
time_range: stats.time_range(),
};

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
@@ -25,23 +25,18 @@ use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBa
use common_telemetry::debug;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::smallvec;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::cache::CacheManager;
use crate::error::Result;
use crate::memtable::{MemtableRange, MemtableRef};
use crate::memtable::MemtableRange;
use crate::read::compat::CompatBatch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::{ScannerMetrics, Source};
use crate::sst::file::FileMeta;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
@@ -58,13 +53,11 @@ pub struct UnorderedScan {
impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));
let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
properties.partitions = vec![stream_ctx.partition_ranges()];
Self {
properties,
@@ -127,6 +120,161 @@ impl UnorderedScan {
Ok(Some(record_batch))
}
/// Scans a [PartitionRange] and returns a stream.
fn scan_partition_range<'a>(
stream_ctx: &'a StreamContext,
part_range: &'a PartitionRange,
mem_ranges: &'a mut Vec<MemtableRange>,
file_ranges: &'a mut Vec<FileRange>,
reader_metrics: &'a mut ReaderMetrics,
metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = Self::scan_mem_ranges(stream_ctx, *index, mem_ranges, metrics);
for await batch in stream {
yield batch;
}
} else {
let stream = Self::scan_file_ranges(stream_ctx, *index, file_ranges, reader_metrics, metrics);
for await batch in stream {
yield batch;
}
}
}
}
}
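The dispatch above relies on memtables and files sharing one index space: memtables occupy `[0, num_memtables)` and files follow, which is why `is_mem_range_index` is a plain comparison and `RangeBuilderList::build_file_ranges` subtracts the memtable count. A toy illustration:

fn main() {
    let num_memtables = 2; // sources: [mem0, mem1, file0, file1]
    let is_mem = |index: usize| num_memtables > index; // is_mem_range_index
    assert!(is_mem(1)); // index 1 -> mem1
    assert!(!is_mem(2)); // index 2 -> a file
    // Mapping a RowGroupIndex into the file list, as build_file_ranges does.
    let file_index = 3 - num_memtables;
    assert_eq!(file_index, 1); // index 3 -> file1
}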
/// Scans memtable ranges at `index`.
fn scan_mem_ranges<'a>(
stream_ctx: &'a StreamContext,
index: RowGroupIndex,
ranges: &'a mut Vec<MemtableRange>,
metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
try_stream! {
let mapper = &stream_ctx.input.mapper;
let cache = stream_ctx.input.cache_manager.as_deref();
stream_ctx.build_mem_ranges(index, ranges);
metrics.num_mem_ranges += ranges.len();
for range in ranges {
let build_reader_start = Instant::now();
let iter = range.build_iter().map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.build_reader_cost = build_reader_start.elapsed();
let mut source = Source::Iter(iter);
while let Some(batch) =
Self::fetch_from_source(&mut source, mapper, cache, None, metrics).await?
{
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
}
}
}
/// Scans file ranges at `index`.
fn scan_file_ranges<'a>(
stream_ctx: &'a StreamContext,
index: RowGroupIndex,
ranges: &'a mut Vec<FileRange>,
reader_metrics: &'a mut ReaderMetrics,
metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
try_stream! {
let mapper = &stream_ctx.input.mapper;
let cache = stream_ctx.input.cache_manager.as_deref();
stream_ctx
.build_file_ranges(index, ranges, reader_metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.num_file_ranges += ranges.len();
for range in ranges {
let build_reader_start = Instant::now();
let reader = range
.reader(None)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost += build_reader_start.elapsed();
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(batch) =
Self::fetch_from_source(&mut source, mapper, cache, compat_batch, metrics)
.await?
{
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
}
}
}
}
fn scan_partition_impl(
&self,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let ranges_opt = self.properties.partitions.get(partition).cloned();
let stream = stream! {
let first_poll = stream_ctx.query_start.elapsed();
let Some(part_ranges) = ranges_opt else {
return;
};
let mut mem_ranges = Vec::new();
let mut file_ranges = Vec::new();
let mut reader_metrics = ReaderMetrics::default();
// Scans each part.
for part_range in part_ranges {
let stream = Self::scan_partition_range(
&stream_ctx,
&part_range,
&mut mem_ranges,
&mut file_ranges,
&mut reader_metrics,
&mut metrics,
);
for await batch in stream {
yield batch;
}
}
reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
let mapper = &stream_ctx.input.mapper;
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
}
impl RegionScanner for UnorderedScan {
@@ -144,89 +292,7 @@ impl RegionScanner for UnorderedScan {
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
let part = {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// Clone the part and releases the lock.
// TODO(yingwen): We might wrap the part in an Arc in the future if cloning is too expensive.
let Some(part) = parts.0.get_part(partition).cloned() else {
return;
};
part
};
let build_reader_start = Instant::now();
let mapper = &stream_ctx.input.mapper;
let memtable_sources = part
.memtable_ranges
.iter()
.map(|mem| {
let iter = mem.build_iter()?;
Ok(Source::Iter(iter))
})
.collect::<Result<Vec<_>>>()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost = build_reader_start.elapsed();
let query_start = stream_ctx.query_start;
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans memtables first.
for mut source in memtable_sources {
while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, None, &mut metrics).await? {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
}
// Then scans file ranges.
let mut reader_metrics = ReaderMetrics::default();
// Safety: UnorderedDistributor::build_parts() ensures this.
for file_range in &part.file_ranges[0] {
let build_reader_start = Instant::now();
let reader = file_range.reader(None).await.map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.build_reader_cost += build_reader_start.elapsed();
let compat_batch = file_range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(batch) = Self::fetch_from_source(&mut source, mapper, cache, compat_batch, &mut metrics).await? {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
}
}
reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}, ranges: {}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, part.file_ranges[0].len(),
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
self.scan_partition_impl(partition)
}
fn has_predicate(&self) -> bool {
@@ -261,117 +327,3 @@ impl UnorderedScan {
&self.stream_ctx.input
}
}
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut (ScanPartList, Duration),
metrics: &mut ScannerMetrics,
parallelism: usize,
) -> Result<()> {
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
input.predicate.clone(),
);
part_list.0.set_parts(distributor.build_parts(parallelism));
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
}
Ok(())
}
/// Builds [ScanPart]s without preserving order. It distributes file ranges and memtables
/// across partitions. Each partition scans a subset of memtables and file ranges. There
/// is no output ordering guarantee of each partition.
#[derive(Default)]
struct UnorderedDistributor {
mem_ranges: Vec<MemtableRange>,
file_ranges: Vec<FileRange>,
}
impl FileRangeCollector for UnorderedDistributor {
fn append_file_ranges(
&mut self,
_file_meta: &FileMeta,
file_ranges: impl Iterator<Item = FileRange>,
) {
self.file_ranges.extend(file_ranges);
}
}
impl UnorderedDistributor {
/// Appends memtable ranges to the distributor.
fn append_mem_ranges(
&mut self,
memtables: &[MemtableRef],
projection: Option<&[ColumnId]>,
predicate: Option<Predicate>,
) {
for mem in memtables {
let mut mem_ranges = mem.ranges(projection, predicate.clone());
if mem_ranges.is_empty() {
continue;
}
self.mem_ranges.append(&mut mem_ranges);
}
}
/// Distributes file ranges and memtables across partitions according to the `parallelism`.
/// The output number of parts may be `<= parallelism`.
///
/// [ScanPart] created by this distributor only contains one group of file ranges.
fn build_parts(self, parallelism: usize) -> Vec<ScanPart> {
if parallelism <= 1 {
// Returns a single part.
let part = ScanPart {
memtable_ranges: self.mem_ranges.clone(),
file_ranges: smallvec![self.file_ranges],
time_range: None,
};
return vec![part];
}
let mems_per_part = ((self.mem_ranges.len() + parallelism - 1) / parallelism).max(1);
let ranges_per_part = ((self.file_ranges.len() + parallelism - 1) / parallelism).max(1);
debug!(
"Parallel scan is enabled, parallelism: {}, {} mem_ranges, {} file_ranges, mems_per_part: {}, ranges_per_part: {}",
parallelism,
self.mem_ranges.len(),
self.file_ranges.len(),
mems_per_part,
ranges_per_part
);
let mut scan_parts = self
.mem_ranges
.chunks(mems_per_part)
.map(|mems| ScanPart {
memtable_ranges: mems.to_vec(),
file_ranges: smallvec![Vec::new()], // Ensures there is always one group.
time_range: None,
})
.collect::<Vec<_>>();
for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() {
if i == scan_parts.len() {
scan_parts.push(ScanPart {
memtable_ranges: Vec::new(),
file_ranges: smallvec![ranges.to_vec()],
time_range: None,
});
} else {
scan_parts[i].file_ranges = smallvec![ranges.to_vec()];
}
}
scan_parts
}
}

View File

@@ -40,7 +40,7 @@ pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
/// Default batch size to read parquet files.
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
/// Default row group size for parquet files.
const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
/// Parquet write options.
#[derive(Debug)]

View File

@@ -238,6 +238,7 @@ impl ParquetReaderBuilder {
cache_manager: self.cache_manager.clone(),
};
// TODO(yingwen): count the cost of the method.
metrics.build_cost = start.elapsed();
let mut filters = if let Some(predicate) = &self.predicate {

View File

@@ -14,6 +14,7 @@
//! Memtable test utilities.
use std::collections::BTreeMap;
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
@@ -92,8 +93,8 @@ impl Memtable for EmptyMemtable {
&self,
_projection: Option<&[ColumnId]>,
_predicate: Option<Predicate>,
) -> Vec<MemtableRange> {
vec![]
) -> BTreeMap<usize, MemtableRange> {
BTreeMap::new()
}
fn is_empty(&self) -> bool {