feat: Seq scanner scans data by time range (#4809)

* feat: seq scan by partition

* feat: part metrics

* chore: remove unused codes

* chore: fmt stream

* feat: build ranges returns smallvec

* feat: move scan mem/file ranges to util and reuse

* feat: log metrics

* chore: correct some metrics

* feat: get explain info from ranges

* test: group test and remove unused codes

* chore: fix clippy

* feat: change PartitionRange end to exclusive

* test: add tests
This commit is contained in:
Yingwen
2024-10-17 19:05:12 +08:00
committed by GitHub
parent 613e07afb4
commit e0c4157ad8
10 changed files with 519 additions and 1026 deletions

View File

@@ -22,6 +22,7 @@ pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod scan_util;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
@@ -57,7 +58,6 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -738,7 +738,7 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
pub(crate) struct ScannerMetrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build parts.
/// Duration to build file ranges.
build_parts_cost: Duration,
/// Duration to build the (merge) reader.
build_reader_cost: Duration,
@@ -758,31 +758,17 @@ pub(crate) struct ScannerMetrics {
num_mem_ranges: usize,
/// Number of file ranges scanned.
num_file_ranges: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}
impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
self.build_parts_cost = build_parts_cost;
// Observes metrics.
/// Observes metrics.
fn observe_metrics(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(self.prepare_scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());
// We only call this once so we overwrite it directly.
self.filter_metrics = reader_metrics.filter_metrics;
// Observes filter metrics.
self.filter_metrics.observe();
}
/// Observes metrics on scanner finish.
fn observe_metrics_on_finish(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(self.build_reader_cost.as_secs_f64());
@@ -801,6 +787,21 @@ impl ScannerMetrics {
READ_ROWS_RETURN.observe(self.num_rows as f64);
READ_BATCHES_RETURN.observe(self.num_batches as f64);
}
/// Merges metrics from another [ScannerMetrics].
fn merge_from(&mut self, other: &ScannerMetrics) {
self.prepare_scan_cost += other.prepare_scan_cost;
self.build_parts_cost += other.build_parts_cost;
self.build_reader_cost += other.build_reader_cost;
self.scan_cost += other.scan_cost;
self.convert_cost += other.convert_cost;
self.yield_cost += other.yield_cost;
self.total_cost += other.total_cost;
self.num_batches += other.num_batches;
self.num_rows += other.num_rows;
self.num_mem_ranges += other.num_mem_ranges;
self.num_file_ranges += other.num_file_ranges;
}
}
#[cfg(test)]

View File

@@ -14,7 +14,9 @@
//! Structs for partition ranges.
use common_time::Timestamp;
use smallvec::{smallvec, SmallVec};
use store_api::region_engine::PartitionRange;
use crate::memtable::MemtableRef;
use crate::read::scan_region::ScanInput;
@@ -48,6 +50,26 @@ pub(crate) struct RangeMeta {
}
impl RangeMeta {
/// Builds the [PartitionRange] describing this range, tagged with `identifier`.
///
/// `time_range` stores an inclusive max timestamp while the returned range uses an
/// exclusive end, so the end is the max timestamp plus one (same time unit).
pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
    let (start, inclusive_end) = self.time_range;
    // Saturate at i64::MAX instead of overflowing. The i64::MAX timestamp may be
    // invisible but we don't guarantee to support this value now.
    let exclusive_end = inclusive_end.value().saturating_add(1);

    PartitionRange {
        start,
        end: Timestamp::new(exclusive_end, inclusive_end.unit()),
        num_rows: self.num_rows,
        identifier,
    }
}
/// Creates a list of ranges from the `input` for seq scan.
pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
@@ -177,7 +199,7 @@ impl RangeMeta {
}
fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
// For non append-only mode, each range only contains one memtable.
// For non append-only mode, each range only contains one memtable by default.
for (i, memtable) in memtables.iter().enumerate() {
let stats = memtable.stats();
let Some(time_range) = stats.time_range() else {
@@ -195,6 +217,7 @@ impl RangeMeta {
}
}
// TODO(yingwen): Support multiple row groups in a range so we can split them later.
fn push_seq_file_ranges(
num_memtables: usize,
files: &[FileHandle],
@@ -264,3 +287,83 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
new_ranges
}
#[cfg(test)]
mod tests {
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use super::*;
/// Expected group: (source indices in the group, start value, end value).
type Output = (Vec<usize>, i64, i64);
/// Builds one single-index [RangeMeta] per `(index, start, end)` input (timestamps in
/// seconds), runs `group_ranges_for_seq_scan` and compares the resulting groups
/// with `expect`.
fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
let ranges = input
.iter()
.map(|(idx, start, end)| {
let time_range = (
Timestamp::new(*start, TimeUnit::Second),
Timestamp::new(*end, TimeUnit::Second),
);
RangeMeta {
time_range,
indices: smallvec![*idx],
row_group_indices: smallvec![RowGroupIndex {
index: *idx,
row_group_index: 0
}],
num_rows: 1,
}
})
.collect();
let output = group_ranges_for_seq_scan(ranges);
let actual: Vec<_> = output
.iter()
.map(|range| {
let indices = range.indices.to_vec();
let group_indices: Vec<_> = range
.row_group_indices
.iter()
.map(|idx| idx.index)
.collect();
// Both index lists of a group must stay in sync after grouping.
assert_eq!(indices, group_indices);
let range = range.time_range;
(indices, range.0.value(), range.1.value())
})
.collect();
assert_eq!(expect, actual);
}
#[test]
fn test_group_ranges() {
// A single range forms a single group.
run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
// Ranges 3, 1 and 4 overlap (transitively) and merge; range 2 stays alone.
// 1, 2, 3, 4 => [3, 1, 4], [2]
run_group_ranges_test(
&[
(1, 1000, 2000),
(2, 6000, 7000),
(3, 0, 1500),
(4, 1500, 3000),
],
&[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
);
// Disjoint ranges (4000 vs 4001) are not merged.
// 1, 2, 3 => [3], [1], [2],
run_group_ranges_test(
&[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
&[
(vec![3], 0, 1000),
(vec![1], 3000, 4000),
(vec![2], 4001, 6000),
],
);
// Ranges sharing a boundary timestamp (4000) are merged.
// 1, 2, 3 => [3], [1, 2]
run_group_ranges_test(
&[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
&[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
);
}
}

View File

@@ -17,14 +17,12 @@
use std::collections::{BTreeMap, HashSet};
use std::fmt;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::{Duration, Instant};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use datafusion_expr::utils::expr_to_columns;
use parquet::arrow::arrow_reader::RowSelection;
use smallvec::SmallVec;
@@ -48,7 +46,7 @@ use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::{overlaps, FileHandle, FileMeta};
use crate::sst::file::FileHandle;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
@@ -700,73 +698,6 @@ impl ScanInput {
})
}
/// Prunes file ranges to scan and adds them to the `collector`.
///
/// For every input file this builds the reader input (applying the predicate, time
/// range, projection, cache and index appliers), converts the returned row groups
/// into [FileRange]s and appends them to `collector`.
///
/// Returns the [ReaderMetrics] accumulated while building the reader inputs.
///
/// # Errors
/// Returns the first error from building a reader input, except "object not found"
/// errors when `ignore_file_not_found` is set (those files are logged and skipped).
/// Also returns an error if a [CompatBatch] cannot be created.
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<ReaderMetrics> {
// Total time spent in `build_reader_input` across all files.
let mut file_prune_cost = Duration::ZERO;
let mut reader_metrics = ReaderMetrics::default();
for file in &self.files {
let prune_start = Instant::now();
let res = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input(&mut reader_metrics)
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
Ok(x) => x,
Err(e) => {
// A missing file is tolerated only when the input opts in.
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
continue;
} else {
return Err(e);
}
}
};
if !compat::has_same_columns(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
) {
// They have different schema. We need to adapt the batch first so the
// mapper can convert it.
let compat = CompatBatch::new(
&self.mapper,
file_range_ctx.read_format().metadata().clone(),
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
// Build ranges from row groups.
// The context is shared by all ranges of the same file.
let file_range_ctx = Arc::new(file_range_ctx);
let file_ranges = row_groups
.into_iter()
.map(|(row_group_idx, row_selection)| {
FileRange::new(file_range_ctx.clone(), row_group_idx, row_selection)
});
collector.append_file_ranges(file.meta_ref(), file_ranges);
}
READ_SST_COUNT.observe(self.files.len() as f64);
common_telemetry::debug!(
"Region {} prune {} files, cost is {:?}",
self.mapper.metadata().region_id,
self.files.len(),
file_prune_cost
);
Ok(reader_metrics)
}
/// Scans the input source in another task and sends batches to the sender.
pub(crate) fn spawn_scan_task(
&self,
@@ -806,10 +737,7 @@ impl ScanInput {
pub(crate) fn predicate(&self) -> Option<Predicate> {
self.predicate.clone()
}
}
#[cfg(test)]
impl ScanInput {
/// Returns number of memtables to scan.
pub(crate) fn num_memtables(&self) -> usize {
self.memtables.len()
@@ -819,166 +747,21 @@ impl ScanInput {
pub(crate) fn num_files(&self) -> usize {
self.files.len()
}
}
#[cfg(test)]
impl ScanInput {
/// Returns SST file ids to scan.
pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
self.files.iter().map(|file| file.file_id()).collect()
}
}
/// Groups of file ranges. Each group in the list contains multiple file
/// ranges to scan. File ranges in the same group may come from different files.
///
/// Up to 4 groups are stored inline before the list spills to the heap.
pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
/// A partition of a scanner to read.
/// It contains the memtable ranges and file ranges to scan, plus the
/// time range they cover when it is known.
#[derive(Clone, Default)]
pub(crate) struct ScanPart {
/// Memtable ranges to scan.
pub(crate) memtable_ranges: Vec<MemtableRange>,
/// Groups of file ranges to scan.
pub(crate) file_ranges: FileRangesGroup,
/// Optional time range of the part (inclusive).
/// `None` means the range is unknown.
pub(crate) time_range: Option<(Timestamp, Timestamp)>,
}
impl fmt::Debug for ScanPart {
    /// Prints the number of memtable ranges and file ranges, followed by the
    /// time range when the part has one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let num_file_ranges: usize = self.file_ranges.iter().map(Vec::len).sum();
        write!(
            f,
            "ScanPart({} memtable ranges, {} file ranges",
            self.memtable_ranges.len(),
            num_file_ranges,
        )?;

        // Close the parenthesis opened above in either case.
        match &self.time_range {
            Some(time_range) => write!(f, ", time range: {:?})", time_range),
            None => write!(f, ")"),
        }
    }
}
impl ScanPart {
    /// Returns true if the time range of the given `part` overlaps with the time
    /// range of this part.
    ///
    /// Parts without a known time range are conservatively treated as overlapping.
    pub(crate) fn overlaps(&self, part: &ScanPart) -> bool {
        match (self.time_range, part.time_range) {
            (Some(own_range), Some(other_range)) => overlaps(&own_range, &other_range),
            _ => true,
        }
    }

    /// Merges the given `part` into this part.
    ///
    /// Ranges of `part` are appended to this part and the time range becomes the
    /// union of both time ranges. A missing time range on either side keeps the
    /// other side's value.
    pub(crate) fn merge(&mut self, mut part: ScanPart) {
        self.memtable_ranges.append(&mut part.memtable_ranges);
        self.file_ranges.append(&mut part.file_ranges);

        self.time_range = match (self.time_range, part.time_range) {
            (Some(own_range), Some(other_range)) => Some((
                own_range.0.min(other_range.0),
                own_range.1.max(other_range.1),
            )),
            // We don't have a range yet, take whatever the other part has.
            (None, incoming) => incoming,
            // The other part has no range, keep ours.
            (current, None) => current,
        };
    }

    /// Returns true if we can split the part into multiple parts while
    /// preserving order.
    ///
    /// That is the case when the part has no memtable range and exactly one
    /// file range group that holds more than one file range.
    pub(crate) fn can_split_preserve_order(&self) -> bool {
        if !self.memtable_ranges.is_empty() {
            return false;
        }
        matches!(self.file_ranges.as_slice(), [only_group] if only_group.len() > 1)
    }
}
/// A trait to collect file ranges to scan.
///
/// Implementors receive the ranges file by file and decide how to organize them.
pub(crate) trait FileRangeCollector {
/// Appends file ranges from the **same file** to the collector.
///
/// `file_meta` is the metadata of the file that all `file_ranges` belong to.
fn append_file_ranges(
&mut self,
file_meta: &FileMeta,
file_ranges: impl Iterator<Item = FileRange>,
);
}
/// Optional list of [ScanPart]s.
///
/// `None` means the parts have not been set yet; the default value is `None`.
#[derive(Default)]
pub(crate) struct ScanPartList(pub(crate) Option<Vec<ScanPart>>);
impl fmt::Debug for ScanPartList {
    /// Prints the parts as a list, or `[]` when the parts are not set.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(parts) = &self.0 {
            write!(f, "{:?}", parts)
        } else {
            write!(f, "[]")
        }
    }
}
impl ScanPartList {
    /// Returns true if the parts are not set.
    pub(crate) fn is_none(&self) -> bool {
        self.0.is_none()
    }

    /// Sets parts to the list, replacing any previous parts.
    pub(crate) fn set_parts(&mut self, parts: Vec<ScanPart>) {
        self.0 = Some(parts);
    }

    /// Gets the part by index, returns None if the index is out of bound.
    /// # Panics
    /// Panics if parts are not initialized.
    pub(crate) fn get_part(&mut self, index: usize) -> Option<&ScanPart> {
        self.0.as_deref().unwrap().get(index)
    }

    /// Returns the number of parts, or 0 if the parts are not set.
    pub(crate) fn len(&self) -> usize {
        self.0.as_ref().map(Vec::len).unwrap_or(0)
    }

    /// Returns the total number of memtable ranges in all parts.
    pub(crate) fn num_mem_ranges(&self) -> usize {
        self.0
            .iter()
            .flatten()
            .map(|part| part.memtable_ranges.len())
            .sum()
    }

    /// Returns the number of files, counted as the number of file range
    /// groups in all parts.
    pub(crate) fn num_files(&self) -> usize {
        self.0
            .iter()
            .flatten()
            .map(|part| part.file_ranges.len())
            .sum()
    }

    /// Returns the total number of file ranges in all groups of all parts.
    pub(crate) fn num_file_ranges(&self) -> usize {
        self.0
            .iter()
            .flatten()
            .flat_map(|part| part.file_ranges.iter())
            .map(Vec::len)
            .sum()
    }
}
/// Context shared by different streams from a scanner.
/// It contains the input and distributes input to multiple parts
/// to scan.
/// It contains the input and ranges to scan.
pub(crate) struct StreamContext {
/// Input memtables and files.
pub(crate) input: ScanInput,
/// Parts to scan and the cost to build parts.
/// The scanner builds parts to scan from the input lazily.
/// The mutex is used to ensure the parts are only built once.
pub(crate) parts: Mutex<(ScanPartList, Duration)>,
/// Metadata for partition ranges.
pub(crate) ranges: Vec<RangeMeta>,
/// Lists of range builders.
@@ -994,12 +777,11 @@ impl StreamContext {
pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::seq_scan_ranges(&input);
READ_SST_COUNT.observe(input.files.len() as f64);
let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
parts: Mutex::new((ScanPartList::default(), Duration::default())),
ranges,
range_builders,
query_start,
@@ -1010,12 +792,11 @@ impl StreamContext {
pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let ranges = RangeMeta::unordered_scan_ranges(&input);
READ_SST_COUNT.observe(input.files.len() as f64);
let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len());
READ_SST_COUNT.observe(input.num_files() as f64);
let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files());
Self {
input,
parts: Mutex::new((ScanPartList::default(), Duration::default())),
ranges,
range_builders,
query_start,
@@ -1024,27 +805,28 @@ impl StreamContext {
/// Returns true if the index refers to a memtable.
pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
self.input.memtables.len() > index.index
self.input.num_memtables() > index.index
}
/// Creates file ranges to scan.
pub(crate) async fn build_file_ranges(
&self,
index: RowGroupIndex,
ranges: &mut Vec<FileRange>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
ranges.clear();
) -> Result<SmallVec<[FileRange; 2]>> {
let mut ranges = SmallVec::new();
self.range_builders
.build_file_ranges(&self.input, index, ranges, reader_metrics)
.await
.build_file_ranges(&self.input, index, &mut ranges, reader_metrics)
.await?;
Ok(ranges)
}
/// Creates memtable ranges to scan.
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex, ranges: &mut Vec<MemtableRange>) {
ranges.clear();
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
let mut ranges = SmallVec::new();
self.range_builders
.build_mem_ranges(&self.input, index, ranges)
.build_mem_ranges(&self.input, index, &mut ranges);
ranges
}
/// Retrieves the partition ranges.
@@ -1052,35 +834,30 @@ impl StreamContext {
self.ranges
.iter()
.enumerate()
.map(|(idx, range_meta)| PartitionRange {
start: range_meta.time_range.0,
end: range_meta.time_range.1,
num_rows: range_meta.num_rows,
identifier: idx,
})
.map(|(idx, range_meta)| range_meta.new_partition_range(idx))
.collect()
}
/// Format the context for explain.
pub(crate) fn format_for_explain(
&self,
t: DisplayFormatType,
f: &mut fmt::Formatter,
) -> fmt::Result {
match self.parts.try_lock() {
Ok(inner) => match t {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtable ranges, {} file {} ranges)",
inner.0.len(),
inner.0.num_mem_ranges(),
inner.0.num_files(),
inner.0.num_file_ranges()
)?,
DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?,
},
Err(_) => write!(f, "<locked>")?,
pub(crate) fn format_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
for range_meta in &self.ranges {
for idx in &range_meta.row_group_indices {
if self.is_mem_range_index(*idx) {
num_mem_ranges += 1;
} else {
num_file_ranges += 1;
}
}
}
write!(
f,
"partition_count={} ({} memtable ranges, {} file {} ranges)",
self.ranges.len(),
num_mem_ranges,
self.input.num_files(),
num_file_ranges,
)?;
if let Some(selector) = &self.input.series_row_selector {
write!(f, ", selector={}", selector)?;
}
@@ -1110,7 +887,7 @@ impl RangeBuilderList {
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut Vec<FileRange>,
ranges: &mut SmallVec<[FileRange; 2]>,
reader_metrics: &mut ReaderMetrics,
) -> Result<()> {
let file_index = index.index - self.mem_builders.len();
@@ -1131,7 +908,7 @@ impl RangeBuilderList {
&self,
input: &ScanInput,
index: RowGroupIndex,
ranges: &mut Vec<MemtableRange>,
ranges: &mut SmallVec<[MemtableRange; 2]>,
) {
let mut builder_opt = self.mem_builders[index.index].lock().unwrap();
match &mut *builder_opt {
@@ -1159,7 +936,7 @@ struct FileRangeBuilder {
impl FileRangeBuilder {
/// Builds file ranges to read.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<FileRange>) {
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
let Some(context) = self.context.clone() else {
return;
};
@@ -1196,7 +973,7 @@ struct MemRangeBuilder {
impl MemRangeBuilder {
/// Builds mem ranges to read in the memtable.
/// Negative `row_group_index` indicates all row groups.
fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec<MemtableRange>) {
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
if row_group_index >= 0 {
let row_group_index = row_group_index as usize;
// Scans one row group.

View File

@@ -0,0 +1,182 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for scanners.
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_telemetry::debug;
use futures::Stream;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::read::range::RowGroupIndex;
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::parquet::reader::ReaderMetrics;
/// Mutable state behind [PartitionMetrics].
///
/// The metrics are observed and logged when this struct is dropped.
struct PartitionMetricsInner {
/// Id of the region to scan.
region_id: RegionId,
/// Index of the partition to scan.
partition: usize,
/// Label to distinguish different scan operation.
scanner_type: &'static str,
/// Query start time.
query_start: Instant,
/// Elapsed time before the first poll operation.
first_poll: Duration,
/// Scanner level metrics of the partition.
metrics: ScannerMetrics,
/// Metrics merged from the file readers of the partition.
reader_metrics: ReaderMetrics,
}
impl PartitionMetricsInner {
    /// Finalizes the metrics.
    ///
    /// Copies the reader build cost into `build_parts_cost` and, if the total cost
    /// is still zero, sets it to the time elapsed since the query started.
    fn on_finish(&mut self) {
        let metrics = &mut self.metrics;
        metrics.build_parts_cost = self.reader_metrics.build_cost;
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
    }
}
impl Drop for PartitionMetricsInner {
    /// Finalizes, observes and logs the metrics once the state is released.
    fn drop(&mut self) {
        self.on_finish();
        self.metrics.observe_metrics();

        let Self {
            region_id,
            partition,
            scanner_type,
            first_poll,
            metrics,
            reader_metrics,
            ..
        } = &*self;
        debug!(
            "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}",
            scanner_type, region_id, partition, first_poll, metrics, reader_metrics
        );
    }
}
/// Metrics while reading a partition.
///
/// Clones share the same underlying state through an `Arc<Mutex<_>>`.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<Mutex<PartitionMetricsInner>>);
impl PartitionMetrics {
    /// Creates metrics for the `partition` of the region, starting from the
    /// given initial `metrics`.
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        metrics: ScannerMetrics,
    ) -> Self {
        Self(Arc::new(Mutex::new(PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            first_poll: Duration::default(),
            metrics,
            reader_metrics: ReaderMetrics::default(),
        })))
    }

    /// Records the elapsed time since the query start as the first poll time.
    pub(crate) fn on_first_poll(&self) {
        let mut guard = self.0.lock().unwrap();
        let elapsed = guard.query_start.elapsed();
        guard.first_poll = elapsed;
    }

    /// Adds `num` to the number of memtable ranges scanned.
    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        self.0.lock().unwrap().metrics.num_mem_ranges += num;
    }

    /// Adds `num` to the number of file ranges scanned.
    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
        self.0.lock().unwrap().metrics.num_file_ranges += num;
    }

    /// Adds `cost` to the time spent building readers.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.lock().unwrap().metrics.build_reader_cost += cost;
    }

    /// Merges the given scanner `metrics` into the partition metrics.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0.lock().unwrap().metrics.merge_from(metrics);
    }

    /// Merges the given reader `metrics` into the partition reader metrics.
    pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.lock().unwrap().reader_metrics.merge_from(metrics);
    }

    /// Finalizes the metrics of the partition.
    pub(crate) fn on_finish(&self) {
        self.0.lock().unwrap().on_finish();
    }
}
/// Scans memtable ranges at `index`.
///
/// Returns a stream that builds the memtable ranges for `index` from `stream_ctx`
/// and yields every batch of every range, one range after another.
/// The number of ranges and the time spent building iterators are recorded in
/// `part_metrics`.
pub(crate) fn scan_mem_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let ranges = stream_ctx.build_mem_ranges(index);
part_metrics.inc_num_mem_ranges(ranges.len());
for range in ranges {
// Only the iterator construction counts as build reader cost.
let build_reader_start = Instant::now();
let iter = range.build_iter()?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let mut source = Source::Iter(iter);
while let Some(batch) = source.next_batch().await? {
yield batch;
}
}
}
}
/// Scans file ranges at `index`.
///
/// Returns a stream that builds the file ranges for `index` from `stream_ctx`
/// and yields every batch of every range, one range after another. Batches are
/// adapted by the range's compat batch when it has one.
///
/// `read_type` is the label passed to `ReaderMetrics::observe_rows`. Reader metrics
/// are observed and merged into `part_metrics` after the last range has been read;
/// the code after the loop is not reached if an error ends the stream earlier.
pub(crate) fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
read_type: &'static str,
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let mut reader_metrics = ReaderMetrics::default();
let ranges = stream_ctx
.build_file_ranges(index, &mut reader_metrics)
.await?;
part_metrics.inc_num_file_ranges(ranges.len());
for range in ranges {
// Only the reader construction counts as build reader cost.
let build_reader_start = Instant::now();
// NOTE(review): no row selector is passed to the reader here; confirm whether
// `stream_ctx.input.series_row_selector` should be forwarded instead.
let reader = range.reader(None).await?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(mut batch) = source.next_batch().await? {
if let Some(compact_batch) = compat_batch {
batch = compact_batch.compat_batch(batch)?;
}
yield batch;
}
// Takes the reader back to collect its metrics once the range is exhausted.
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
}
}
// Reports metrics.
reader_metrics.observe_rows(read_type);
part_metrics.merge_reader_metrics(&reader_metrics);
}
}

View File

@@ -16,36 +16,29 @@
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::{debug, tracing};
use common_telemetry::tracing;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::memtable::MemtableRef;
use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
use crate::read::last_row::LastRowReader;
use crate::read::merge::MergeReaderBuilder;
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;
use crate::sst::file::FileMeta;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
/// Scans a region and returns rows in a sorted sequence.
///
@@ -66,6 +59,8 @@ pub struct SeqScan {
impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
// TODO(yingwen): Set permits according to partition num. But we need to support file
// level parallelism.
let parallelism = input.parallelism.parallelism.max(1);
let mut properties = ScannerProperties::default()
.with_append_mode(input.append_mode)
@@ -102,150 +97,49 @@ impl SeqScan {
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let maybe_reader = Self::build_all_merge_reader(
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
0,
get_scanner_type(self.compaction),
self.stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
);
debug_assert_eq!(1, self.properties.partitions.len());
let partition_ranges = &self.properties.partitions[0];
let reader = Self::build_all_merge_reader(
&self.stream_ctx,
partition_ranges,
self.semaphore.clone(),
&mut metrics,
self.compaction,
self.properties.num_partitions(),
&part_metrics,
)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
Ok(Box::new(reader))
}
/// Builds sources from a [ScanPart].
fn build_part_sources(
part: &ScanPart,
sources: &mut Vec<Source>,
row_selector: Option<TimeSeriesRowSelector>,
compaction: bool,
) -> Result<()> {
sources.reserve(part.memtable_ranges.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.memtable_ranges {
let iter = mem.build_iter()?;
sources.push(Source::Iter(iter));
}
let read_type = if compaction {
"compaction"
} else {
"seq_scan_files"
};
// Read files.
for file in &part.file_ranges {
if file.is_empty() {
continue;
}
// Creates a stream to read the file.
let ranges = file.clone();
let stream = try_stream! {
let mut reader_metrics = ReaderMetrics::default();
// Safety: We checked whether it is empty before.
let file_id = ranges[0].file_handle().file_id();
let region_id = ranges[0].file_handle().region_id();
let range_num = ranges.len();
for range in ranges {
let mut reader = range.reader(row_selector).await?;
let compat_batch = range.compat_batch();
while let Some(mut batch) = reader.next_batch().await? {
if let Some(compat) = compat_batch {
batch = compat
.compat_batch(batch)?;
}
yield batch;
}
reader_metrics.merge_from(reader.metrics());
}
debug!(
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}",
region_id, file_id, range_num, reader_metrics, compaction
);
// Reports metrics.
reader_metrics.observe_rows(read_type);
};
let stream = Box::pin(stream);
sources.push(Source::Stream(stream));
}
Ok(())
}
/// Builds a merge reader that reads all data.
async fn build_all_merge_reader(
stream_ctx: &StreamContext,
stream_ctx: &Arc<StreamContext>,
partition_ranges: &[PartitionRange],
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
// initialize parts list
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;
let parts_len = parts.0.len();
let mut sources = Vec::with_capacity(parts_len);
for id in 0..parts_len {
let Some(part) = parts.0.get_part(id) else {
return Ok(None);
};
Self::build_part_sources(part, &mut sources, None, compaction)?;
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
}
/// Builds a merge reader that reads data from one [`PartitionRange`].
///
/// If the `range_id` is out of bound, returns None.
async fn build_merge_reader(
stream_ctx: &StreamContext,
range_id: usize,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
compaction: bool,
parallelism: usize,
) -> Result<Option<BoxedBatchReader>> {
part_metrics: &PartitionMetrics,
) -> Result<BoxedBatchReader> {
let mut sources = Vec::new();
let build_start = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?;
let Some(part) = parts.0.get_part(range_id) else {
return Ok(None);
};
let build_start = Instant::now();
Self::build_part_sources(
part,
&mut sources,
stream_ctx.input.series_row_selector,
for part_range in partition_ranges {
build_sources(
stream_ctx,
part_range,
compaction,
)?;
build_start
};
let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await;
let build_reader_cost = build_start.elapsed();
metrics.build_reader_cost += build_reader_cost;
debug!(
"Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
range_id,
build_reader_cost,
compaction,
);
maybe_reader
part_metrics,
&mut sources,
);
}
Self::build_reader_from_sources(stream_ctx, sources, semaphore).await
}
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
@@ -253,7 +147,7 @@ impl SeqScan {
stream_ctx: &StreamContext,
mut sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Option<BoxedBatchReader>> {
) -> Result<BoxedBatchReader> {
if stream_ctx.input.parallelism.parallelism > 1 {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
@@ -286,13 +180,11 @@ impl SeqScan {
None => reader,
};
Ok(Some(reader))
Ok(reader)
}
/// Scans the given partition when the part list is set properly.
/// Otherwise the returned stream might not contain any data.
// TODO: refactor out `uncached_scan_part_impl`.
#[allow(dead_code)]
fn scan_partition_impl(
&self,
partition: usize,
@@ -307,28 +199,36 @@ impl SeqScan {
));
}
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();
let stream = try_stream! {
let first_poll = stream_ctx.query_start.elapsed();
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
get_scanner_type(self.compaction),
stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
);
for partition_range in partition_ranges {
let maybe_reader =
Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism)
let stream = try_stream! {
part_metrics.on_first_poll();
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
build_sources(&stream_ctx, &part_range, compaction, &part_metrics, &mut sources);
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
@@ -350,18 +250,10 @@ impl SeqScan {
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
metrics,
first_poll,
compaction,
);
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
@@ -371,133 +263,6 @@ impl SeqScan {
Ok(stream)
}
/// Scans the given partition when the part list is not set.
/// This method lazily initializes the part list and
/// ignores the partition settings in `properties`.
///
/// Only the number of partitions in `properties` is used: the returned stream
/// reads the parts at indices `partition`, `partition + num_partitions`,
/// `partition + 2 * num_partitions`, ... of the lazily built part list.
///
/// Returns a boxed `PartitionOutOfRange` error if `partition` is not less than
/// the number of partitions in `properties`.
fn uncached_scan_part_impl(
&self,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
// Rejects partitions that are out of bound before building the stream.
let num_partitions = self.properties.partitions.len();
if partition >= num_partitions {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
// The prepare cost is the time elapsed since the query started.
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
// Clones everything the stream needs so the stream doesn't borrow `self`.
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let compaction = self.compaction;
let parallelism = self.properties.num_partitions();
// build stream
let stream = try_stream! {
// Time between the query start and the first poll of this stream.
let first_poll = stream_ctx.query_start.elapsed();
// init parts
// The lock guard is dropped at the end of this block, before scanning starts.
let parts_len = {
let mut parts = stream_ctx.parts.lock().await;
Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
parts.0.len()
};
// Visits every `num_partitions`-th part, starting from part `partition`.
for id in (0..parts_len).skip(partition).step_by(num_partitions) {
let maybe_reader = Self::build_merge_reader(
&stream_ctx,
id,
semaphore.clone(),
&mut metrics,
compaction,
parallelism
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
// Ends the whole stream if there is no reader for this id.
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
// `fetch_start` is reset after each yield so `scan_cost` only covers the
// time spent waiting for the reader.
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
// Time spent in `yield` is the time the consumer takes before polling again.
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
// Counts the final poll that returned `None`.
metrics.scan_cost += fetch_start.elapsed();
metrics.total_cost = stream_ctx.query_start.elapsed();
// NOTE(review): `metrics` is shared by all iterations of this loop, so the
// cumulative values are observed once per part id — confirm that
// `observe_metrics_on_finish` is meant to be called more than once.
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}",
stream_ctx.input.mapper.metadata().region_id,
partition,
id,
metrics,
first_poll,
compaction,
);
}
};
// Wraps the stream with the output schema of the projection mapper.
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
/// Builds the part list on first use and records the cost of building it.
///
/// `part_list` holds the part list together with the time it took to build it.
/// When the list is already set, only the cached build cost is copied into
/// `metrics`. Otherwise file ranges and memtable ranges are collected by a
/// [SeqDistributor], grouped into at most `parallelism` parts, and the list and
/// the measured cost are stored back into `part_list`.
async fn maybe_init_parts(
    input: &ScanInput,
    part_list: &mut (ScanPartList, Duration),
    metrics: &mut ScannerMetrics,
    parallelism: usize,
) -> Result<()> {
    let (parts, cached_cost) = part_list;

    if !parts.is_none() {
        // Parts are already built, only reports the cost of building them.
        metrics.build_parts_cost = *cached_cost;
        return Ok(());
    }

    let build_start = Instant::now();
    let mut distributor = SeqDistributor::default();
    let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
    distributor.append_mem_ranges(
        &input.memtables,
        Some(input.mapper.column_ids()),
        input.predicate.clone(),
    );
    parts.set_parts(distributor.build_parts(parallelism));

    // Remembers the cost so later callers can report it without rebuilding.
    let elapsed = build_start.elapsed();
    *cached_cost = elapsed;
    metrics.observe_init_part(elapsed, &reader_metrics);

    Ok(())
}
}
impl RegionScanner for SeqScan {
@@ -510,7 +275,7 @@ impl RegionScanner for SeqScan {
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.uncached_scan_part_impl(partition)
self.scan_partition_impl(partition)
}
fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
@@ -525,24 +290,53 @@ impl RegionScanner for SeqScan {
}
impl DisplayAs for SeqScan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"SeqScan: region={}, ",
self.stream_ctx.input.mapper.metadata().region_id
)?;
self.stream_ctx.format_for_explain(t, f)
self.stream_ctx.format_for_explain(f)
}
}
impl fmt::Debug for SeqScan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SeqScan")
.field("parts", &self.stream_ctx.parts)
.field("num_ranges", &self.stream_ctx.ranges.len())
.finish()
}
}
/// Appends one stream [Source] per row group index of the partition range.
///
/// The range meta is looked up in `stream_ctx.ranges` by the `identifier` of
/// `part_range`. Memtable indices are scanned by `scan_mem_ranges` and file
/// indices by `scan_file_ranges`; the latter is labeled `"compaction"` when
/// `compaction` is true and `"seq_scan_files"` otherwise. New sources are pushed
/// to the end of `sources`, existing elements are kept.
fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    sources: &mut Vec<Source>,
) {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    // The label only depends on the scan mode so we compute it once.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };

    sources.reserve(range_meta.row_group_indices.len());
    sources.extend(range_meta.row_group_indices.iter().map(|index| {
        let stream = if stream_ctx.is_mem_range_index(*index) {
            Box::pin(scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
            )) as _
        } else {
            Box::pin(scan_file_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                read_type,
            )) as _
        };
        Source::Stream(stream)
    }));
}
#[cfg(test)]
impl SeqScan {
/// Returns the input.
@@ -551,266 +345,11 @@ impl SeqScan {
}
}
/// Builds [ScanPart]s that preserve order.
///
/// It collects one [ScanPart] per file (via `append_file_ranges()`) and one per
/// memtable (via `append_mem_ranges()`), then `build_parts()` groups them by
/// time range.
#[derive(Default)]
pub(crate) struct SeqDistributor {
// Parts collected so far. Sources without any range to read are skipped.
parts: Vec<ScanPart>,
}
impl FileRangeCollector for SeqDistributor {
    /// Records the ranges of one file as a single [ScanPart].
    ///
    /// The new part has no memtable range and takes its time range from
    /// `file_meta`. Nothing is recorded if `file_ranges` yields no range.
    fn append_file_ranges(
        &mut self,
        file_meta: &FileMeta,
        file_ranges: impl Iterator<Item = FileRange>,
    ) {
        let collected: Vec<_> = file_ranges.collect();
        if !collected.is_empty() {
            // One part per file.
            self.parts.push(ScanPart {
                memtable_ranges: Vec::new(),
                file_ranges: smallvec![collected],
                time_range: Some(file_meta.time_range),
            });
        }
    }
}
impl SeqDistributor {
    /// Records the ranges of each memtable as a [ScanPart] of its own.
    ///
    /// The part takes its time range from the stats of the memtable and has no
    /// file range. Memtables that return no range for the given `projection`
    /// and `predicate` are skipped.
    fn append_mem_ranges(
        &mut self,
        memtables: &[MemtableRef],
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
    ) {
        self.parts.extend(memtables.iter().filter_map(|mem| {
            let stats = mem.stats();
            let mem_ranges = mem.ranges(projection, predicate.clone());
            if mem_ranges.is_empty() {
                // Nothing to read from this memtable.
                None
            } else {
                Some(ScanPart {
                    memtable_ranges: mem_ranges.into_values().collect(),
                    file_ranges: smallvec![],
                    time_range: stats.time_range(),
                })
            }
        }));
    }

    /// Turns the collected file and memtable ranges into the final parts.
    ///
    /// Parts are grouped by time range first, then split and finally merged
    /// according to `parallelism`, so no more than `parallelism` parts are
    /// returned. A `parallelism` of 0 is treated as 1.
    ///
    /// Output parts have non-overlapping time ranges.
    fn build_parts(self, parallelism: usize) -> Vec<ScanPart> {
        let parallelism = parallelism.max(1);
        // The merge step runs last so the number of parts never exceeds `parallelism`.
        maybe_merge_parts(
            maybe_split_parts(group_parts_by_range(self.parts), parallelism),
            parallelism,
        )
    }
}
/// Merges parts with overlapping time ranges into groups.
///
/// The number of groups returned is not bounded by the scan parallelism.
/// Every input part must have a time range, otherwise this function panics.
fn group_parts_by_range(mut parts: Vec<ScanPart>) -> Vec<ScanPart> {
    // Orders parts by start time. For the same start time, the part that
    // ends later comes first.
    parts.sort_unstable_by(|lhs, rhs| {
        // Safety: callers only pass parts with a time range.
        let (lhs_start, lhs_end) = lhs.time_range.unwrap();
        let (rhs_start, rhs_end) = rhs.time_range.unwrap();
        lhs_start.cmp(&rhs_start).then_with(|| rhs_end.cmp(&lhs_end))
    });

    // Groups with exclusive time ranges. The last element is the group that
    // is still open for merging.
    let mut groups: Vec<ScanPart> = Vec::new();
    for part in parts {
        match groups.last_mut() {
            // The part overlaps the open group, so it joins that group.
            Some(open) if open.overlaps(&part) => open.merge(part),
            // Otherwise the part starts a new group.
            _ => groups.push(part),
        }
    }

    groups
}
/// Reduces the number of parts to at most `parallelism` by merging.
///
/// Parts are returned untouched if there are no more than `parallelism` of them.
/// Otherwise the parts with the fewest memtable ranges (then the fewest file
/// ranges) are merged together until exactly `parallelism` parts are left.
///
/// Panics if `parallelism` is 0.
fn maybe_merge_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart> {
    assert!(parallelism > 0);
    if parts.len() <= parallelism {
        // Already within the limit.
        return parts;
    }

    // (number of memtable ranges, total number of file ranges) of a part.
    let weight = |part: &ScanPart| {
        let num_file_ranges: usize = part.file_ranges.iter().map(|ranges| ranges.len()).sum();
        (part.memtable_ranges.len(), num_file_ranges)
    };
    // Heaviest parts first, so the lightest parts are at the tail.
    parts.sort_unstable_by(|lhs, rhs| weight(rhs).cmp(&weight(lhs)));

    while parts.len() > parallelism {
        // Safety: `parts.len() > parallelism >= 1` so there are at least 2 parts.
        let lightest = parts.pop().unwrap();
        parts.last_mut().unwrap().merge(lightest);
    }

    parts
}
/// Splits parts by parallelism.
/// It splits a part if it only scans one file and doesn't scan any memtable.
///
/// Parts are returned unchanged if there are already at least `parallelism`
/// parts or if no part reports `can_split_preserve_order()`.
/// New parts created by a split keep the time range of the part they come from.
/// The output puts the split parts first, followed by the (possibly replaced)
/// input parts. It can contain more than `parallelism` parts since each
/// splittable part is chunked with the same target.
///
/// Panics if `parallelism` is 0.
fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart> {
assert!(parallelism > 0);
if parts.len() >= parallelism {
// No need to split parts.
return parts;
}
let has_part_to_split = parts.iter().any(|part| part.can_split_preserve_order());
if !has_part_to_split {
// No proper parts to scan.
return parts;
}
// Sorts parts by the number of ranges in the first file.
// The order is descending so parts with more ranges are split first.
parts.sort_unstable_by(|a, b| {
let a_len = a.file_ranges.first().map(|file| file.len()).unwrap_or(0);
let b_len = b.file_ranges.first().map(|file| file.len()).unwrap_or(0);
a_len.cmp(&b_len).reverse()
});
// Number of additional parts we want. It is positive as `parts.len() < parallelism`.
let num_parts_to_split = parallelism - parts.len();
let mut output_parts = Vec::with_capacity(parallelism);
// Split parts up to num_parts_to_split.
for part in parts.iter_mut() {
if !part.can_split_preserve_order() {
continue;
}
// Safety: `can_split_preserve_order()` ensures file_ranges.len() == 1.
// Splits part into `num_parts_to_split + 1` new parts if possible.
let target_part_num = num_parts_to_split + 1;
// Ceiling division, so there are at most `target_part_num` chunks.
let ranges_per_part = (part.file_ranges[0].len() + target_part_num - 1) / target_part_num;
// `can_split_preserve_order()` ensures part.file_ranges[0].len() > 1.
assert!(ranges_per_part > 0);
for ranges in part.file_ranges[0].chunks(ranges_per_part) {
let new_part = ScanPart {
memtable_ranges: Vec::new(),
file_ranges: smallvec![ranges.to_vec()],
time_range: part.time_range,
};
output_parts.push(new_part);
}
// Replace the current part with the last output part as we will put the current part
// into the output parts later.
*part = output_parts.pop().unwrap();
if output_parts.len() >= num_parts_to_split {
// We already split enough parts.
break;
}
}
// Put the remaining parts into the output parts.
output_parts.append(&mut parts);
output_parts
}
#[cfg(test)]
mod tests {
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use super::*;
use crate::memtable::MemtableId;
use crate::test_util::memtable_util::mem_range_for_test;
type Output = (Vec<MemtableId>, i64, i64);
fn run_group_parts_test(input: &[(MemtableId, i64, i64)], expect: &[Output]) {
let parts = input
.iter()
.map(|(id, start, end)| {
let range = (
Timestamp::new(*start, TimeUnit::Second),
Timestamp::new(*end, TimeUnit::Second),
);
ScanPart {
memtable_ranges: vec![mem_range_for_test(*id)],
file_ranges: smallvec![],
time_range: Some(range),
}
})
.collect();
let output = group_parts_by_range(parts);
let actual: Vec<_> = output
.iter()
.map(|part| {
let ids: Vec<_> = part.memtable_ranges.iter().map(|mem| mem.id()).collect();
let range = part.time_range.unwrap();
(ids, range.0.value(), range.1.value())
})
.collect();
assert_eq!(expect, actual);
}
#[test]
fn test_group_parts() {
// Group 1 part.
run_group_parts_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
// 1, 2, 3, 4 => [3, 1, 4], [2]
run_group_parts_test(
&[
(1, 1000, 2000),
(2, 6000, 7000),
(3, 0, 1500),
(4, 1500, 3000),
],
&[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
);
// 1, 2, 3 => [3], [1], [2],
run_group_parts_test(
&[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
&[
(vec![3], 0, 1000),
(vec![1], 3000, 4000),
(vec![2], 4001, 6000),
],
);
/// Maps the compaction flag to the name of the scanner type:
/// `"SeqScan(compaction)"` for compaction, `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}

View File

@@ -21,24 +21,17 @@ use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::debug;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use crate::cache::CacheManager;
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::read::compat::CompatBatch;
use crate::read::projection::ProjectionMapper;
use crate::read::range::RowGroupIndex;
use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::{ScannerMetrics, Source};
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics};
use crate::read::{Batch, ScannerMetrics};
/// Scans a region without providing any output ordering guarantee.
///
@@ -85,62 +78,23 @@ impl UnorderedScan {
Ok(stream)
}
/// Pulls the next batch out of `source` and converts it into a [RecordBatch].
///
/// The batch goes through `compat_batch` first when an adapter is given and is
/// then converted by `mapper` using `cache`. Returns `Ok(None)` once the source
/// is exhausted.
///
/// The time to fetch and adapt the batch is added to `metrics.scan_cost` and the
/// time to convert it is added to `metrics.convert_cost`. Nothing is added for a
/// step that fails.
async fn fetch_from_source(
    source: &mut Source,
    mapper: &ProjectionMapper,
    cache: Option<&CacheManager>,
    compat_batch: Option<&CompatBatch>,
    metrics: &mut ScannerMetrics,
) -> common_recordbatch::error::Result<Option<RecordBatch>> {
    let fetch_start = Instant::now();
    let fetched = source
        .next_batch()
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    let batch = match (fetched, compat_batch) {
        (None, _) => {
            // The source is exhausted.
            metrics.scan_cost += fetch_start.elapsed();
            return Ok(None);
        }
        (Some(raw), Some(compat)) => compat
            .compat_batch(raw)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?,
        (Some(raw), None) => raw,
    };
    metrics.scan_cost += fetch_start.elapsed();

    let convert_start = Instant::now();
    let record_batch = mapper.convert(&batch, cache)?;
    metrics.convert_cost += convert_start.elapsed();

    Ok(Some(record_batch))
}
/// Scans a [PartitionRange] and returns a stream.
fn scan_partition_range<'a>(
stream_ctx: &'a StreamContext,
part_range: &'a PartitionRange,
mem_ranges: &'a mut Vec<MemtableRange>,
file_ranges: &'a mut Vec<FileRange>,
reader_metrics: &'a mut ReaderMetrics,
metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
/// Scans a [PartitionRange] by its `identifier` and returns a stream.
fn scan_partition_range(
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
) -> impl Stream<Item = Result<Batch>> {
stream! {
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = Self::scan_mem_ranges(stream_ctx, *index, mem_ranges, metrics);
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
for await batch in stream {
yield batch;
}
} else {
let stream = Self::scan_file_ranges(stream_ctx, *index, file_ranges, reader_metrics, metrics);
let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
for await batch in stream {
yield batch;
}
@@ -149,124 +103,68 @@ impl UnorderedScan {
}
}
/// Scans memtable ranges at `index`.
///
/// The ranges are built into `ranges` by `stream_ctx` and scanned one after
/// another. Each batch is converted into a [RecordBatch] by
/// `fetch_from_source()` before it is yielded.
///
/// Updates `metrics` while scanning:
/// - `num_mem_ranges` grows by the number of ranges in `ranges`.
/// - `build_reader_cost` accumulates the time to build the iterator of every range.
/// - `num_batches`, `num_rows` and `yield_cost` are updated per yielded batch.
///
/// The stream yields an error and stops if it fails to build an iterator or
/// to fetch a batch.
fn scan_mem_ranges<'a>(
    stream_ctx: &'a StreamContext,
    index: RowGroupIndex,
    ranges: &'a mut Vec<MemtableRange>,
    metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
    try_stream! {
        let mapper = &stream_ctx.input.mapper;
        let cache = stream_ctx.input.cache_manager.as_deref();
        stream_ctx.build_mem_ranges(index, ranges);
        metrics.num_mem_ranges += ranges.len();
        for range in ranges {
            let build_reader_start = Instant::now();
            let iter = range.build_iter().map_err(BoxedError::new).context(ExternalSnafu)?;
            // Accumulates the cost. Assigning it would drop the cost of the ranges
            // scanned before this one.
            metrics.build_reader_cost += build_reader_start.elapsed();
            let mut source = Source::Iter(iter);
            while let Some(batch) =
                Self::fetch_from_source(&mut source, mapper, cache, None, metrics).await?
            {
                metrics.num_batches += 1;
                metrics.num_rows += batch.num_rows();
                // Time spent in `yield` is the time the consumer takes before polling again.
                let yield_start = Instant::now();
                yield batch;
                metrics.yield_cost += yield_start.elapsed();
            }
        }
    }
}
/// Scans file ranges at `index`.
///
/// The ranges are built into `ranges` by `stream_ctx` and scanned one after
/// another. Each batch is adapted by the compat batch of its range (if any) and
/// converted into a record batch by `fetch_from_source()` before it is yielded.
///
/// `metrics` collects the number of ranges, batches and rows as well as the
/// reader build cost and the yield cost. `reader_metrics` collects the metrics
/// of `build_file_ranges()` and of every reader that is read to the end.
///
/// The stream yields an error and stops if it fails to build the ranges,
/// to build a reader or to fetch a batch.
fn scan_file_ranges<'a>(
stream_ctx: &'a StreamContext,
index: RowGroupIndex,
ranges: &'a mut Vec<FileRange>,
reader_metrics: &'a mut ReaderMetrics,
metrics: &'a mut ScannerMetrics,
) -> impl Stream<Item = common_recordbatch::error::Result<RecordBatch>> + 'a {
try_stream! {
let mapper = &stream_ctx.input.mapper;
let cache = stream_ctx.input.cache_manager.as_deref();
// NOTE(review): presumably `build_file_ranges()` leaves only the ranges of `index`
// in `ranges` — confirm, as all elements of `ranges` are counted and scanned below.
stream_ctx
.build_file_ranges(index, ranges, reader_metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.num_file_ranges += ranges.len();
for range in ranges {
let build_reader_start = Instant::now();
let reader = range
.reader(None)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
metrics.build_reader_cost += build_reader_start.elapsed();
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);
while let Some(batch) =
Self::fetch_from_source(&mut source, mapper, cache, compat_batch, metrics)
.await?
{
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
// Time spent in `yield` is the time the consumer takes before polling again.
let yield_start = Instant::now();
yield batch;
metrics.yield_cost += yield_start.elapsed();
}
// Takes the reader back from the source to collect its metrics.
// `source` is always a `Source::PruneReader` here as it is built above.
if let Source::PruneReader(mut reader) = source {
reader_metrics.merge_from(reader.metrics());
}
}
}
}
fn scan_partition_impl(
&self,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
};
if partition >= self.properties.partitions.len() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
"UnorderedScan",
self.stream_ctx.query_start,
ScannerMetrics {
prepare_scan_cost: self.stream_ctx.query_start.elapsed(),
..Default::default()
},
);
let stream_ctx = self.stream_ctx.clone();
let ranges_opt = self.properties.partitions.get(partition).cloned();
let part_ranges = self.properties.partitions[partition].clone();
let stream = stream! {
let first_poll = stream_ctx.query_start.elapsed();
let Some(part_ranges) = ranges_opt else {
return;
};
let stream = try_stream! {
part_metrics.on_first_poll();
let mut mem_ranges = Vec::new();
let mut file_ranges = Vec::new();
let mut reader_metrics = ReaderMetrics::default();
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
let stream = Self::scan_partition_range(
&stream_ctx,
&part_range,
&mut mem_ranges,
&mut file_ranges,
&mut reader_metrics,
&mut metrics,
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),
);
for await batch in stream {
yield batch;
let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
reader_metrics.observe_rows("unordered_scan_files");
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
let mapper = &stream_ctx.input.mapper;
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll,
);
part_metrics.on_finish();
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
@@ -302,20 +200,20 @@ impl RegionScanner for UnorderedScan {
}
impl DisplayAs for UnorderedScan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"UnorderedScan: region={}, ",
self.stream_ctx.input.mapper.metadata().region_id
)?;
self.stream_ctx.format_for_explain(t, f)
self.stream_ctx.format_for_explain(f)
}
}
impl fmt::Debug for UnorderedScan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UnorderedScan")
.field("parts", &self.stream_ctx.parts)
.field("num_ranges", &self.stream_ctx.ranges.len())
.finish()
}
}

View File

@@ -90,7 +90,8 @@ impl FromStr for FileId {
}
}
/// Time range of a SST file.
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
/// Checks if two inclusive timestamp ranges overlap with each other.

View File

@@ -238,9 +238,6 @@ impl ParquetReaderBuilder {
cache_manager: self.cache_manager.clone(),
};
// TODO(yingwen): count the cost of the method.
metrics.build_cost = start.elapsed();
let mut filters = if let Some(predicate) = &self.predicate {
predicate
.exprs()
@@ -270,6 +267,9 @@ impl ParquetReaderBuilder {
);
let context = FileRangeContext::new(reader_builder, filters, read_format, codec);
metrics.build_cost += start.elapsed();
Ok((context, row_groups))
}

View File

@@ -35,7 +35,7 @@ use crate::memtable::key_values::KeyValue;
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
use crate::memtable::{
BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId,
MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
MemtableRange, MemtableRef, MemtableStats,
};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -361,11 +361,3 @@ pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec<i64> {
.map(|v| v.unwrap().0.value())
.collect()
}
/// Creates a [MemtableRange] with the given memtable `id` for tests.
/// The range is backed by a default `EmptyIterBuilder`.
pub(crate) fn mem_range_for_test(id: MemtableId) -> MemtableRange {
    MemtableRange::new(Arc::new(MemtableRangeContext::new(
        id,
        Box::new(EmptyIterBuilder::default()),
    )))
}

View File

@@ -180,7 +180,7 @@ impl ScannerPartitioning {
pub struct PartitionRange {
/// Start time of time index column. Inclusive.
pub start: Timestamp,
/// End time of time index column. Inclusive.
/// End time of time index column. Exclusive.
pub end: Timestamp,
/// Number of rows in this range. Is used to balance ranges between partitions.
pub num_rows: usize,