feat: Implement RegionScanner for SeqScan (#4060)

* feat: ordered builder wip

* feat: impl RegionScanner for SeqScan

* feat: implement scan_partition and build_stream

* chore: return SeqScan as RegionScanner

* fix: group parts

* feat: split parts

* chore: reader metrics

* chore: metrics

* chore: remove unused codes

* chore: support holding a group of ranges in ScanPart

* feat: group ScanParts to ScanParts

* feat: impl SeqScanner again

* chore: observe build cost in ScannerMetrics

* chore: fix compiler warnings

* style: fix clippy

* docs: update config docs

* chore: forward DisplayAs to scanner

* test: update sqlness tests

* chore: update debug fmt

* chore: custom debug for timestamp

fix test compiling issue with common-macro when running
cargo nextest -p common-time

* chore: update debug format

* feat: update fmt for scan part

* chore: fix warning

* fix: sanitize parallelism

* feat: split parts

* test: fix config api test

* feat: update logs

* chore: Revert "chore: remove unused codes"

This reverts commit b548b30a01eeded59b1a0a8d89f9293ca63afc41.

* chore: Revert "docs: update config docs"

This reverts commit a7997e78d6ddcf635560574de8c1948c495bdd12.

* feat: each partition scan files in parallel

* test: fix config api test

* docs: fix typo

* chore: address comments, simplify tests

* feat: global semaphore

* feat: always spawn task

* chore: simplify default explain output format

* handle output partiton number is 0

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Yingwen
2024-06-12 16:21:30 +08:00
committed by GitHub
parent 9473daab8b
commit 65f8b72d34
29 changed files with 884 additions and 348 deletions

View File

@@ -14,7 +14,7 @@
use core::default::Default;
use std::cmp::Ordering;
use std::fmt::{Display, Formatter, Write};
use std::fmt::{self, Display, Formatter, Write};
use std::hash::{Hash, Hasher};
use std::time::Duration;
@@ -41,7 +41,7 @@ use crate::{error, Interval};
/// # Note:
/// For values out of range, you can still store these timestamps, but while performing arithmetic
/// or formatting operations, it will return an error or just overflow.
#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)]
#[derive(Clone, Default, Copy, Serialize, Deserialize)]
pub struct Timestamp {
value: i64,
unit: TimeUnit,
@@ -498,6 +498,12 @@ impl From<Timestamp> for serde_json::Value {
}
}
impl fmt::Debug for Timestamp {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{}::{}", self.value, self.unit)
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimeUnit {
Second,
@@ -1382,4 +1388,24 @@ mod tests {
Timestamp::MAX_SECOND.to_timezone_aware_string(Some(&Timezone::Named(Tz::UTC)))
);
}
#[test]
fn test_debug_timestamp() {
assert_eq!(
"1000::Second",
format!("{:?}", Timestamp::new(1000, TimeUnit::Second))
);
assert_eq!(
"1001::Millisecond",
format!("{:?}", Timestamp::new(1001, TimeUnit::Millisecond))
);
assert_eq!(
"1002::Microsecond",
format!("{:?}", Timestamp::new(1002, TimeUnit::Microsecond))
);
assert_eq!(
"1003::Nanosecond",
format!("{:?}", Timestamp::new(1003, TimeUnit::Nanosecond))
);
}
}

View File

@@ -25,7 +25,7 @@ use crate::compaction::buckets::infer_time_bucket;
use crate::compaction::picker::{CompactionTask, Picker};
use crate::compaction::task::CompactionTaskImpl;
use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::file::{overlaps, FileHandle, FileId};
use crate::sst::version::LevelMeta;
/// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction
@@ -271,15 +271,6 @@ fn assign_to_windows<'a>(
windows.into_iter().map(|w| (w.time_window, w)).collect()
}
/// Checks if two inclusive timestamp ranges overlap with each other.
fn overlaps(l: &(Timestamp, Timestamp), r: &(Timestamp, Timestamp)) -> bool {
let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
let (_, l_end) = l;
let (r_start, _) = r;
r_start <= l_end
}
/// Finds the latest active writing window among all files.
/// Returns `None` when there are no files or all files are corrupted.
fn find_latest_window_in_seconds<'a>(

View File

@@ -82,7 +82,7 @@ async fn test_append_mode_write_query() {
.scan_region(region_id, ScanRequest::default())
.unwrap();
let seq_scan = scan.seq_scan().unwrap();
let stream = seq_scan.build_stream().await.unwrap();
let stream = seq_scan.build_stream().unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -330,6 +330,8 @@ async fn test_different_order_and_type() {
#[tokio::test]
async fn test_put_delete() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

View File

@@ -87,7 +87,7 @@ async fn test_scan_without_filtering_deleted() {
let seq_scan = scan.scan_without_filter_deleted().unwrap();
let stream = seq_scan.build_stream().await.unwrap();
let stream = seq_scan.build_stream().unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+

View File

@@ -64,11 +64,19 @@ impl Default for MemtableConfig {
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
estimated_bytes: usize,
/// The time range that this memtable contains.
/// The time range that this memtable contains. It is None if
/// and only if the memtable is empty.
time_range: Option<(Timestamp, Timestamp)>,
}
impl MemtableStats {
/// Attaches the time range to the stats.
#[cfg(any(test, feature = "test"))]
pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self {
self.time_range = time_range;
self
}
/// Returns the estimated bytes allocated by this memtable.
pub fn bytes_allocated(&self) -> usize {
self.estimated_bytes

View File

@@ -23,6 +23,7 @@ pub(crate) mod unordered_scan;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use api::v1::OpType;
use async_trait::async_trait;
@@ -50,6 +51,7 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::sst::parquet::reader::RowGroupReader;
/// Storage internal representation of a batch of rows for a primary key (time series).
@@ -744,6 +746,55 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
}
}
/// Metrics for scanners.
#[derive(Debug, Default)]
pub(crate) struct ScannerMetrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build parts.
build_parts_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of batches returned.
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
}
impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration) {
self.build_parts_cost = build_parts_cost;
// Observes metrics.
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(self.prepare_scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());
}
/// Observes metrics on scanner finish.
fn observe_metrics_on_finish(&self) {
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(self.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(self.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(self.total_cost.as_secs_f64());
READ_ROWS_RETURN.observe(self.num_rows as f64);
READ_BATCHES_RETURN.observe(self.num_batches as f64);
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -16,16 +16,19 @@
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::storage::ScanRequest;
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
@@ -34,13 +37,13 @@ use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{CompatBatch, CompatReader};
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{compat, Batch, Source};
use crate::read::{Batch, Source};
use crate::region::version::VersionRef;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::file::{overlaps, FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
@@ -57,7 +60,7 @@ impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build_stream().await.map_err(BoxedError::new),
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}
@@ -65,11 +68,7 @@ impl Scanner {
/// Returns a [RegionScanner] to scan the region.
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => {
let stream = seq_scan.build_stream().await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}
Scanner::Seq(seq_scan) => Ok(Arc::new(seq_scan)),
Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)),
}
}
@@ -221,9 +220,7 @@ impl ScanRegion {
/// Scan sequentially.
pub(crate) fn seq_scan(self) -> Result<SeqScan> {
let input = self.scan_input(true)?;
let seq_scan = SeqScan::new(input);
Ok(seq_scan)
Ok(SeqScan::new(input))
}
/// Unordered scan.
@@ -235,8 +232,7 @@ impl ScanRegion {
#[cfg(test)]
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
let input = self.scan_input(false)?;
let scan = SeqScan::new(input);
Ok(scan)
Ok(SeqScan::new(input))
}
/// Creates a scan input.
@@ -263,9 +259,8 @@ impl ScanRegion {
return false;
}
let stats = mem.stats();
let Some((start, end)) = stats.time_range() else {
return true;
};
// Safety: the memtable is not empty.
let (start, end) = stats.time_range().unwrap();
// The time range of the memtable is inclusive.
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
@@ -364,13 +359,6 @@ pub(crate) struct ScanParallism {
pub(crate) channel_size: usize,
}
impl ScanParallism {
/// Returns true if we allow parallel scan.
pub(crate) fn allow_parallel_scan(&self) -> bool {
self.parallelism > 1
}
}
/// Returns true if the time range of a SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
if predicate == &TimestampRange::min_to_max() {
@@ -509,60 +497,15 @@ impl ScanInput {
self
}
/// Builds and returns sources to read.
pub(crate) async fn build_sources(&self) -> Result<Vec<Source>> {
let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len());
for mem in &self.memtables {
let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone())?;
sources.push(Source::Iter(iter));
}
for file in &self.files {
let maybe_reader = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build()
.await;
let reader = match maybe_reader {
Ok(reader) => reader,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
continue;
} else {
return Err(e);
}
}
};
if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) {
sources.push(Source::Reader(Box::new(reader)));
} else {
// They have different schema. We need to adapt the batch first so the
// mapper can convert it.
let compat_reader =
CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?;
sources.push(Source::Reader(Box::new(compat_reader)));
}
}
READ_SST_COUNT.observe(self.files.len() as f64);
Ok(sources)
}
/// Scans sources in parallel.
///
/// # Panics if the input doesn't allow parallel scan.
pub(crate) async fn build_parallel_sources(&self) -> Result<Vec<Source>> {
assert!(self.parallelism.allow_parallel_scan());
// Scall all memtables and SSTs.
let sources = self.build_sources().await?;
let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism));
pub(crate) fn create_parallel_sources(
&self,
sources: Vec<Source>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<Source>> {
debug_assert!(self.parallelism.parallelism > 1);
// Spawn a task for each source.
let sources = sources
.into_iter()
@@ -576,7 +519,7 @@ impl ScanInput {
Ok(sources)
}
/// Prunes file ranges to scan and adds them tothe `collector`.
/// Prunes file ranges to scan and adds them to the `collector`.
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
@@ -641,7 +584,7 @@ impl ScanInput {
common_runtime::spawn_read(async move {
loop {
// We release the permit before sending result to avoid the task waiting on
// the channel with the permit holded
// the channel with the permit held.
let maybe_batch = {
// Safety: We never close the semaphore.
let _permit = semaphore.acquire().await.unwrap();
@@ -680,6 +623,10 @@ impl ScanInput {
}
}
/// Groups of file ranges. Each group in the list contains multiple file
/// ranges to scan. File ranges in the same group may come from different files.
pub(crate) type FileRangesGroup = SmallVec<[Vec<FileRange>; 4]>;
/// A partition of a scanner to read.
/// It contains memtables and file ranges to scan.
#[derive(Default)]
@@ -688,17 +635,60 @@ pub(crate) struct ScanPart {
/// We scan the whole memtable now. We might scan a range of the memtable in the future.
pub(crate) memtables: Vec<MemtableRef>,
/// File ranges to scan.
pub(crate) file_ranges: Vec<FileRange>,
pub(crate) file_ranges: FileRangesGroup,
/// Optional time range of the part (inclusive).
pub(crate) time_range: Option<(Timestamp, Timestamp)>,
}
impl fmt::Debug for ScanPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ScanPart({} memtables, {} file ranges)",
"ScanPart({} memtables, {} file ranges",
self.memtables.len(),
self.file_ranges.len()
)
self.file_ranges
.iter()
.map(|ranges| ranges.len())
.sum::<usize>(),
)?;
if let Some(time_range) = &self.time_range {
write!(f, ", time range: {:?})", time_range)
} else {
write!(f, ")")
}
}
}
impl ScanPart {
/// Returns true if the time range given `part` overlaps with this part.
pub(crate) fn overlaps(&self, part: &ScanPart) -> bool {
let (Some(current_range), Some(part_range)) = (self.time_range, part.time_range) else {
return true;
};
overlaps(&current_range, &part_range)
}
/// Merges given `part` to this part.
pub(crate) fn merge(&mut self, mut part: ScanPart) {
self.memtables.append(&mut part.memtables);
self.file_ranges.append(&mut part.file_ranges);
let Some(part_range) = part.time_range else {
return;
};
let Some(current_range) = self.time_range else {
self.time_range = part.time_range;
return;
};
let start = current_range.0.min(part_range.0);
let end = current_range.1.max(part_range.1);
self.time_range = Some((start, end));
}
/// Returns true if the we can split the part into multiple parts
/// and preserving order.
pub(crate) fn can_split_preserve_order(&self) -> bool {
self.memtables.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1
}
}
@@ -711,3 +701,105 @@ pub(crate) trait FileRangeCollector {
file_ranges: impl Iterator<Item = FileRange>,
);
}
/// Optional list of [ScanPart]s.
#[derive(Default)]
pub(crate) struct ScanPartList(pub(crate) Option<Vec<ScanPart>>);
impl fmt::Debug for ScanPartList {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
Some(parts) => write!(f, "{:?}", parts),
None => write!(f, "[]"),
}
}
}
impl ScanPartList {
/// Returns true if the list is None.
pub(crate) fn is_none(&self) -> bool {
self.0.is_none()
}
/// Sets parts to the list.
pub(crate) fn set_parts(&mut self, parts: Vec<ScanPart>) {
self.0 = Some(parts);
}
/// Gets the part by index, returns None if the index is out of bound.
/// # Panics
/// Panics if parts are not initialized.
pub(crate) fn get_part(&mut self, index: usize) -> Option<&ScanPart> {
let parts = self.0.as_ref().unwrap();
parts.get(index)
}
/// Returns the number of parts.
pub(crate) fn len(&self) -> usize {
self.0.as_ref().map_or(0, |parts| parts.len())
}
/// Returns the number of memtables.
pub(crate) fn num_memtables(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.memtables.len()).sum()
})
}
/// Returns the number of file ranges.
pub(crate) fn num_file_ranges(&self) -> usize {
self.0.as_ref().map_or(0, |parts| {
parts.iter().map(|part| part.file_ranges.len()).sum()
})
}
}
/// Context shared by different streams from a scanner.
/// It contains the input and distributes input to multiple parts
/// to scan.
pub(crate) struct StreamContext {
/// Input memtables and files.
pub(crate) input: ScanInput,
/// Parts to scan.
/// The scanner builds parts to scan from the input lazily.
/// The mutex is used to ensure the parts are only built once.
pub(crate) parts: Mutex<ScanPartList>,
// Metrics:
/// The start time of the query.
pub(crate) query_start: Instant,
/// Time elapsed before creating the scanner.
pub(crate) prepare_scan_cost: Duration,
}
impl StreamContext {
/// Creates a new [StreamContext].
pub(crate) fn new(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let prepare_scan_cost = query_start.elapsed();
Self {
input,
parts: Mutex::new(ScanPartList::default()),
query_start,
prepare_scan_cost,
}
}
/// Format parts for explain.
pub(crate) fn format_parts(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match self.parts.try_lock() {
Ok(inner) => match t {
DisplayFormatType::Default => write!(
f,
"partition_count={} ({} memtables, {} file ranges)",
inner.len(),
inner.num_memtables(),
inner.num_file_ranges()
),
DisplayFormatType::Verbose => write!(f, "{:?}", &*inner),
},
Err(_) => write!(f, "<locked>"),
}
}
}

View File

@@ -14,172 +14,550 @@
//! Sequential scan.
use std::time::{Duration, Instant};
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::{debug, tracing};
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::debug;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties};
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use tokio::sync::Semaphore;
use crate::cache::CacheManager;
use crate::error::Result;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::merge::MergeReaderBuilder;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::ScanInput;
use crate::read::{BatchReader, BoxedBatchReader};
use crate::memtable::MemtableRef;
use crate::read::merge::{MergeReader, MergeReaderBuilder};
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::sst::file::FileMeta;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary key, time index`.
/// The output order is always `order by primary keys, time index`.
pub struct SeqScan {
input: ScanInput,
/// Properties of the scanner.
properties: ScannerProperties,
/// Context of streams.
stream_ctx: Arc<StreamContext>,
/// Semaphore to control scan parallelism of files.
/// Streams created by the scanner share the same semaphore.
semaphore: Arc<Semaphore>,
}
impl SeqScan {
/// Creates a new [SeqScan].
#[must_use]
pub(crate) fn new(input: ScanInput) -> SeqScan {
SeqScan { input }
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::new(ScannerPartitioning::Unknown(parallelism));
let stream_ctx = Arc::new(StreamContext::new(input));
Self {
properties,
stream_ctx,
semaphore: Arc::new(Semaphore::new(parallelism)),
}
}
/// Builds a stream for the query.
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
let mut metrics = Metrics::default();
let build_start = Instant::now();
let query_start = self.input.query_start.unwrap_or(build_start);
metrics.prepare_scan_cost = query_start.elapsed();
let use_parallel = self.use_parallel_reader();
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut reader = if use_parallel {
self.build_parallel_reader().await?
} else {
self.build_reader().await?
};
metrics.build_reader_cost = build_start.elapsed();
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(metrics.prepare_scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(metrics.build_reader_cost.as_secs_f64());
pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_opt(None)
}
// Creates a stream to poll the batch reader and convert batch into record batch.
let mapper = self.input.mapper.clone();
let cache_manager = self.input.cache_manager.clone();
let parallelism = self.input.parallelism.parallelism;
let stream = try_stream! {
let cache = cache_manager.as_ref().map(|cache| cache.as_ref());
while let Some(batch) =
Self::fetch_record_batch(&mut reader, &mapper, cache, &mut metrics).await?
{
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
yield batch;
/// Builds a [BoxedBatchReader] from sequential scan for compaction.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
..Default::default()
};
let maybe_reader =
Self::build_merge_reader(&self.stream_ctx, None, self.semaphore.clone(), &mut metrics)
.await?;
// Safety: `build_merge_reader()` always returns a reader if partition is None.
let reader = maybe_reader.unwrap();
Ok(Box::new(reader))
}
/// Builds sources from a [ScanPart].
fn build_part_sources(
part: &ScanPart,
projection: Option<&[ColumnId]>,
predicate: Option<&Predicate>,
sources: &mut Vec<Source>,
) -> Result<()> {
sources.reserve(part.memtables.len() + part.file_ranges.len());
// Read memtables.
for mem in &part.memtables {
let iter = mem.iter(projection, predicate.cloned())?;
sources.push(Source::Iter(iter));
}
// Read files.
for file in &part.file_ranges {
if file.is_empty() {
continue;
}
// Update metrics.
metrics.total_cost = query_start.elapsed();
READ_STAGE_ELAPSED.with_label_values(&["convert_rb"]).observe(metrics.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED.with_label_values(&["scan"]).observe(metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.total_cost.as_secs_f64());
READ_ROWS_RETURN.observe(metrics.num_rows as f64);
READ_BATCHES_RETURN.observe(metrics.num_batches as f64);
// Creates a stream to read the file.
let ranges = file.clone();
let stream = try_stream! {
let mut reader_metrics = ReaderMetrics::default();
// Safety: We checked whether it is empty before.
let file_id = ranges[0].file_handle().file_id();
let region_id = ranges[0].file_handle().region_id();
let range_num = ranges.len();
for range in ranges {
let mut reader = range.reader().await?;
let compat_batch = range.compat_batch();
while let Some(mut batch) = reader.next_batch().await? {
if let Some(compat) = compat_batch {
batch = compat
.compat_batch(batch)?;
}
yield batch;
}
reader_metrics.merge_from(reader.metrics());
}
debug!(
"Seq scan region {}, file {}, {} ranges finished, metrics: {:?}",
region_id, file_id, range_num, reader_metrics
);
};
let stream = Box::pin(stream);
sources.push(Source::Stream(stream));
}
Ok(())
}
/// Builds a merge reader.
/// If `partition` is None, reads all partitions.
/// If the `partition` is out of bound, returns None.
async fn build_merge_reader(
stream_ctx: &StreamContext,
partition: Option<usize>,
semaphore: Arc<Semaphore>,
metrics: &mut ScannerMetrics,
) -> Result<Option<MergeReader>> {
let mut parts = stream_ctx.parts.lock().await;
maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?;
let input = &stream_ctx.input;
let mut sources = Vec::new();
if let Some(index) = partition {
let Some(part) = parts.get_part(index) else {
return Ok(None);
};
Self::build_part_sources(
part,
Some(input.mapper.column_ids()),
input.predicate.as_ref(),
&mut sources,
)?;
} else {
// Safety: We initialized parts before.
for part in parts.0.as_ref().unwrap() {
Self::build_part_sources(
part,
Some(input.mapper.column_ids()),
input.predicate.as_ref(),
&mut sources,
)?;
}
}
if stream_ctx.input.parallelism.parallelism > 1 {
// Read sources in parallel. We always spawn a task so we can control the parallelism
// by the semaphore.
sources = stream_ctx
.input
.create_parallel_sources(sources, semaphore.clone())?;
}
let dedup = !stream_ctx.input.append_mode;
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, stream_ctx.input.filter_deleted);
builder.build().await.map(Some)
}
/// Scans one partition or all partitions.
fn scan_partition_opt(
&self,
partition: Option<usize>,
) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.semaphore.clone();
let stream = try_stream! {
let maybe_reader = Self::build_merge_reader(&stream_ctx, partition, semaphore, &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let Some(mut reader) = maybe_reader else {
return;
};
let cache = stream_ctx.input.cache_manager.as_deref();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
yield record_batch;
fetch_start = Instant::now();
}
metrics.scan_cost += fetch_start.elapsed();
metrics.total_cost = stream_ctx.query_start.elapsed();
metrics.observe_metrics_on_finish();
debug!(
"Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}, parallelism: {}",
mapper.metadata().region_id, metrics, use_parallel, parallelism,
"Seq scan finished, region_id: {:?}, partition: {:?}, metrics: {:?}",
stream_ctx.input.mapper.metadata().region_id, partition, metrics,
);
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.input.mapper.output_schema(),
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
}
/// Builds a [BoxedBatchReader] from sequential scan.
pub async fn build_reader(&self) -> Result<BoxedBatchReader> {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let sources = self.input.build_sources().await?;
let dedup = !self.input.append_mode;
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted);
let reader = builder.build().await?;
Ok(Box::new(reader))
impl RegionScanner for SeqScan {
fn properties(&self) -> &ScannerProperties {
&self.properties
}
/// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel.
async fn build_parallel_reader(&self) -> Result<BoxedBatchReader> {
let sources = self.input.build_parallel_sources().await?;
let dedup = !self.input.append_mode;
let mut builder =
MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted);
let reader = builder.build().await?;
Ok(Box::new(reader))
fn schema(&self) -> SchemaRef {
self.stream_ctx.input.mapper.output_schema()
}
/// Returns whether to use a parallel reader.
fn use_parallel_reader(&self) -> bool {
self.input.parallelism.allow_parallel_scan()
&& (self.input.files.len() + self.input.memtables.len()) > 1
}
/// Fetch a batch from the reader and convert it into a record batch.
#[tracing::instrument(skip_all, level = "trace")]
async fn fetch_record_batch(
reader: &mut dyn BatchReader,
mapper: &ProjectionMapper,
cache: Option<&CacheManager>,
metrics: &mut Metrics,
) -> common_recordbatch::error::Result<Option<RecordBatch>> {
let start = Instant::now();
let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
else {
metrics.scan_cost += start.elapsed();
return Ok(None);
};
let convert_start = Instant::now();
let record_batch = mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
metrics.scan_cost += start.elapsed();
Ok(Some(record_batch))
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_opt(Some(partition))
}
}
/// Metrics for [SeqScan].
#[derive(Debug, Default)]
struct Metrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build the reader.
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of batches returned.
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
impl DisplayAs for SeqScan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SeqScan: ")?;
self.stream_ctx.format_parts(t, f)
}
}
impl fmt::Debug for SeqScan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("SeqScan")
.field("parts", &self.stream_ctx.parts)
.field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost)
.finish()
}
}
#[cfg(test)]
impl SeqScan {
/// Returns the input.
pub(crate) fn input(&self) -> &ScanInput {
&self.input
&self.stream_ctx.input
}
}
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
input: &ScanInput,
part_list: &mut ScanPartList,
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
part_list
.set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism));
metrics.observe_init_part(now.elapsed());
}
Ok(())
}
/// Builds [ScanPart]s that preserves order.
#[derive(Default)]
pub(crate) struct SeqDistributor {
parts: Vec<ScanPart>,
}
impl FileRangeCollector for SeqDistributor {
fn append_file_ranges(
&mut self,
file_meta: &FileMeta,
file_ranges: impl Iterator<Item = FileRange>,
) {
// Creates a [ScanPart] for each file.
let ranges: Vec<_> = file_ranges.collect();
if ranges.is_empty() {
// No ranges to read.
return;
}
let part = ScanPart {
memtables: Vec::new(),
file_ranges: smallvec![ranges],
time_range: Some(file_meta.time_range),
};
self.parts.push(part);
}
}
impl SeqDistributor {
/// Groups file ranges and memtables by time ranges.
/// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1.
///
/// Output parts have non-overlapping time ranges.
fn build_parts(mut self, memtables: &[MemtableRef], parallelism: usize) -> Vec<ScanPart> {
// Creates a part for each memtable.
for mem in memtables {
let stats = mem.stats();
let part = ScanPart {
memtables: vec![mem.clone()],
file_ranges: smallvec![],
time_range: stats.time_range(),
};
self.parts.push(part);
}
let parallelism = parallelism.max(1);
let parts = group_parts_by_range(self.parts);
let parts = maybe_split_parts(parts, parallelism);
// Ensures it doesn't returns parts more than `parallelism`.
maybe_merge_parts(parts, parallelism)
}
}
/// Groups parts by time range. It may generate parts more than parallelism.
/// All time ranges are not None.
fn group_parts_by_range(mut parts: Vec<ScanPart>) -> Vec<ScanPart> {
if parts.is_empty() {
return Vec::new();
}
// Sorts parts by time range.
parts.sort_unstable_by(|a, b| {
// Safety: time ranges of parts from [SeqPartBuilder] are not None.
let a = a.time_range.unwrap();
let b = b.time_range.unwrap();
a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1))
});
let mut part_in_range = None;
// Parts with exclusive time ranges.
let mut part_groups = Vec::new();
for part in parts {
let Some(mut prev_part) = part_in_range.take() else {
part_in_range = Some(part);
continue;
};
if prev_part.overlaps(&part) {
prev_part.merge(part);
part_in_range = Some(prev_part);
} else {
// A new group.
part_groups.push(prev_part);
part_in_range = Some(part);
}
}
if let Some(part) = part_in_range {
part_groups.push(part);
}
part_groups
}
/// Merges parts by parallelism.
/// It merges parts if the number of parts is greater than `parallelism`.
fn maybe_merge_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart> {
assert!(parallelism > 0);
if parts.len() <= parallelism {
// No need to merge parts.
return parts;
}
// Sort parts by number of memtables and ranges in reverse order.
parts.sort_unstable_by(|a, b| {
a.memtables
.len()
.cmp(&b.memtables.len())
.then_with(|| {
let a_ranges_len = a
.file_ranges
.iter()
.map(|ranges| ranges.len())
.sum::<usize>();
let b_ranges_len = b
.file_ranges
.iter()
.map(|ranges| ranges.len())
.sum::<usize>();
a_ranges_len.cmp(&b_ranges_len)
})
.reverse()
});
let parts_to_reduce = parts.len() - parallelism;
for _ in 0..parts_to_reduce {
// Safety: We ensure `parts.len() > parallelism`.
let part = parts.pop().unwrap();
parts.last_mut().unwrap().merge(part);
}
parts
}
/// Splits parts by parallelism.
/// It splits a part if it only scans one file and doesn't scan any memtable.
fn maybe_split_parts(mut parts: Vec<ScanPart>, parallelism: usize) -> Vec<ScanPart> {
assert!(parallelism > 0);
if parts.len() >= parallelism {
// No need to split parts.
return parts;
}
let has_part_to_split = parts.iter().any(|part| part.can_split_preserve_order());
if !has_part_to_split {
// No proper parts to scan.
return parts;
}
// Sorts parts by the number of ranges in the first file.
parts.sort_unstable_by(|a, b| {
let a_len = a.file_ranges.first().map(|file| file.len()).unwrap_or(0);
let b_len = b.file_ranges.first().map(|file| file.len()).unwrap_or(0);
a_len.cmp(&b_len).reverse()
});
let num_parts_to_split = parallelism - parts.len();
let mut output_parts = Vec::with_capacity(parallelism);
// Split parts up to num_parts_to_split.
for part in parts.iter_mut() {
if !part.can_split_preserve_order() {
continue;
}
// Safety: `can_split_preserve_order()` ensures file_ranges.len() == 1.
// Splits part into `num_parts_to_split + 1` new parts if possible.
let target_part_num = num_parts_to_split + 1;
let ranges_per_part = (part.file_ranges[0].len() + target_part_num - 1) / target_part_num;
// `can_split_preserve_order()` ensures part.file_ranges[0].len() > 1.
assert!(ranges_per_part > 0);
for ranges in part.file_ranges[0].chunks(ranges_per_part) {
let new_part = ScanPart {
memtables: Vec::new(),
file_ranges: smallvec![ranges.to_vec()],
time_range: part.time_range,
};
output_parts.push(new_part);
}
// Replace the current part with the last output part as we will put the current part
// into the output parts later.
*part = output_parts.pop().unwrap();
if output_parts.len() >= num_parts_to_split {
// We already split enough parts.
break;
}
}
// Put the remaining parts into the output parts.
output_parts.append(&mut parts);
output_parts
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use super::*;
use crate::memtable::MemtableId;
use crate::test_util::memtable_util::EmptyMemtable;
type Output = (Vec<MemtableId>, i64, i64);
fn run_group_parts_test(input: &[(MemtableId, i64, i64)], expect: &[Output]) {
let parts = input
.iter()
.map(|(id, start, end)| {
let range = (
Timestamp::new(*start, TimeUnit::Second),
Timestamp::new(*end, TimeUnit::Second),
);
ScanPart {
memtables: vec![Arc::new(
EmptyMemtable::new(*id).with_time_range(Some(range)),
)],
file_ranges: smallvec![],
time_range: Some(range),
}
})
.collect();
let output = group_parts_by_range(parts);
let actual: Vec<_> = output
.iter()
.map(|part| {
let ids: Vec<_> = part.memtables.iter().map(|mem| mem.id()).collect();
let range = part.time_range.unwrap();
(ids, range.0.value(), range.1.value())
})
.collect();
assert_eq!(expect, actual);
}
#[test]
fn test_group_parts() {
// Group 1 part.
run_group_parts_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
// 1, 2, 3, 4 => [3, 1, 4], [2]
run_group_parts_test(
&[
(1, 1000, 2000),
(2, 6000, 7000),
(3, 0, 1500),
(4, 1500, 3000),
],
&[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
);
// 1, 2, 3 => [3], [1], [2],
run_group_parts_test(
&[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
&[
(vec![3], 0, 1000),
(vec![1], 3000, 4000),
(vec![2], 4001, 6000),
],
);
}
}

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
@@ -26,18 +26,19 @@ use common_telemetry::debug;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties};
use tokio::sync::Mutex;
use crate::cache::CacheManager;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::compat::CompatBatch;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_region::{FileRangeCollector, ScanInput, ScanPart};
use crate::read::Source;
use crate::read::scan_region::{
FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext,
};
use crate::read::{ScannerMetrics, Source};
use crate::sst::file::FileMeta;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;
@@ -55,22 +56,10 @@ pub struct UnorderedScan {
impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let query_start = input.query_start.unwrap_or_else(Instant::now);
let prepare_scan_cost = query_start.elapsed();
let properties =
ScannerProperties::new(ScannerPartitioning::Unknown(input.parallelism.parallelism));
// Observes metrics.
READ_STAGE_ELAPSED
.with_label_values(&["prepare_scan"])
.observe(prepare_scan_cost.as_secs_f64());
let stream_ctx = Arc::new(StreamContext {
input,
parts: Mutex::new(ScanPartList::default()),
query_start,
prepare_scan_cost,
});
let properties = ScannerProperties::new(ScannerPartitioning::Unknown(
input.parallelism.parallelism.max(1),
));
let stream_ctx = Arc::new(StreamContext::new(input));
Self {
properties,
@@ -104,7 +93,7 @@ impl UnorderedScan {
mapper: &ProjectionMapper,
cache: Option<&CacheManager>,
compat_batch: Option<&CompatBatch>,
metrics: &mut Metrics,
metrics: &mut ScannerMetrics,
) -> common_recordbatch::error::Result<Option<RecordBatch>> {
let start = Instant::now();
@@ -133,20 +122,6 @@ impl UnorderedScan {
Ok(Some(record_batch))
}
fn observe_metrics_on_finish(metrics: &Metrics) {
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(metrics.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(metrics.scan_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["total"])
.observe(metrics.total_cost.as_secs_f64());
READ_ROWS_RETURN.observe(metrics.num_rows as f64);
READ_BATCHES_RETURN.observe(metrics.num_batches as f64);
}
}
impl RegionScanner for UnorderedScan {
@@ -159,15 +134,14 @@ impl RegionScanner for UnorderedScan {
}
fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
let mut metrics = Metrics {
let mut metrics = ScannerMetrics {
prepare_scan_cost: self.stream_ctx.prepare_scan_cost,
..Default::default()
};
let stream_ctx = self.stream_ctx.clone();
let stream = try_stream! {
let mut parts = stream_ctx.parts.lock().await;
parts
.maybe_init_parts(&stream_ctx.input, &mut metrics)
maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -201,7 +175,8 @@ impl RegionScanner for UnorderedScan {
}
// Then scans file ranges.
let mut reader_metrics = ReaderMetrics::default();
for file_range in &part.file_ranges {
// Safety: UnorderedDistributor::build_parts() ensures this.
for file_range in &part.file_ranges[0] {
let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?;
let compat_batch = file_range.compat_batch();
let mut source = Source::RowGroupReader(reader);
@@ -216,7 +191,7 @@ impl RegionScanner for UnorderedScan {
}
metrics.total_cost = query_start.elapsed();
Self::observe_metrics_on_finish(&metrics);
metrics.observe_metrics_on_finish();
debug!(
"Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}",
partition, mapper.metadata().region_id, metrics, reader_metrics
@@ -232,8 +207,9 @@ impl RegionScanner for UnorderedScan {
}
impl DisplayAs for UnorderedScan {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "UnorderedScan: [{:?}]", self.stream_ctx.parts)
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "UnorderedScan: ")?;
self.stream_ctx.format_parts(t, f)
}
}
@@ -254,73 +230,22 @@ impl UnorderedScan {
}
}
/// List of [ScanPart]s.
#[derive(Debug, Default)]
struct ScanPartList(Option<Vec<ScanPart>>);
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(
part_list: &mut ScanPartList,
input: &ScanInput,
metrics: &mut ScannerMetrics,
) -> Result<()> {
if part_list.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
part_list
.set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism));
impl ScanPartList {
/// Initializes parts if they are not built yet.
async fn maybe_init_parts(&mut self, input: &ScanInput, metrics: &mut Metrics) -> Result<()> {
if self.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
self.0 = Some(distributor.build_parts(&input.memtables, input.parallelism.parallelism));
metrics.build_parts_cost = now.elapsed();
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(metrics.build_parts_cost.as_secs_f64());
}
Ok(())
metrics.observe_init_part(now.elapsed());
}
/// Gets the part by index, returns None if the index is out of bound.
/// # Panics
/// Panics if parts are not initialized.
fn get_part(&mut self, index: usize) -> Option<&ScanPart> {
let parts = self.0.as_ref().unwrap();
parts.get(index)
}
}
/// Context shared by different streams.
/// It contains the input and distributes input to multiple parts
/// to scan.
struct StreamContext {
/// Input memtables and files.
input: ScanInput,
/// Parts to scan.
/// The scanner builds parts to scan from the input lazily.
/// The mutex is used to ensure the parts are only built once.
parts: Mutex<ScanPartList>,
// Metrics:
/// The start time of the query.
query_start: Instant,
/// Time elapsed before creating the scanner.
prepare_scan_cost: Duration,
}
/// Metrics for [UnorderedScan].
// We print all fields in logs so we disable the dead_code lint.
#[allow(dead_code)]
#[derive(Debug, Default)]
struct Metrics {
/// Duration to prepare the scan task.
prepare_scan_cost: Duration,
/// Duration to build parts.
build_parts_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration of the scan.
total_cost: Duration,
/// Number of batches returned.
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
Ok(())
}
/// Builds [ScanPart]s without preserving order. It distributes file ranges and memtables
@@ -344,12 +269,15 @@ impl FileRangeCollector for UnorderedDistributor {
impl UnorderedDistributor {
/// Distributes file ranges and memtables across partitions according to the `parallelism`.
/// The output number of parts may be `<= parallelism`.
///
/// [ScanPart] created by this distributor only contains one group of file ranges.
fn build_parts(self, memtables: &[MemtableRef], parallelism: usize) -> Vec<ScanPart> {
if parallelism <= 1 {
// Returns a single part.
let part = ScanPart {
memtables: memtables.to_vec(),
file_ranges: self.file_ranges,
file_ranges: smallvec![self.file_ranges],
time_range: None,
};
return vec![part];
}
@@ -368,17 +296,19 @@ impl UnorderedDistributor {
.chunks(mems_per_part)
.map(|mems| ScanPart {
memtables: mems.to_vec(),
file_ranges: Vec::new(),
file_ranges: smallvec![Vec::new()], // Ensures there is always one group.
time_range: None,
})
.collect::<Vec<_>>();
for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() {
if i == scan_parts.len() {
scan_parts.push(ScanPart {
memtables: Vec::new(),
file_ranges: ranges.to_vec(),
file_ranges: smallvec![ranges.to_vec()],
time_range: None,
});
} else {
scan_parts[i].file_ranges = ranges.to_vec();
scan_parts[i].file_ranges = smallvec![ranges.to_vec()];
}
}

View File

@@ -82,6 +82,15 @@ impl FromStr for FileId {
/// Time range of a SST file.
pub type FileTimeRange = (Timestamp, Timestamp);
/// Checks if two inclusive timestamp ranges overlap with each other.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
let (_, l_end) = l;
let (r_start, _) = r;
r_start <= l_end
}
/// Metadata of a SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]

View File

@@ -28,6 +28,7 @@ use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result};
use crate::read::compat::CompatBatch;
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec};
use crate::sst::file::FileHandle;
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext};
@@ -72,6 +73,11 @@ impl FileRange {
pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
self.context.compat_batch()
}
/// Returns the file handle of the file range.
pub(crate) fn file_handle(&self) -> &FileHandle {
self.context.reader_builder.file_handle()
}
}
/// Context shared by ranges of the same parquet SST.

View File

@@ -62,7 +62,7 @@ use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
/// Parquet SST reader builder.
pub(crate) struct ParquetReaderBuilder {
pub struct ParquetReaderBuilder {
/// SST directory.
file_dir: String,
file_handle: FileHandle,
@@ -138,7 +138,7 @@ impl ParquetReaderBuilder {
/// Attaches the index applier to the builder.
#[must_use]
pub fn index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
pub(crate) fn index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_applier = index_applier;
self
}
@@ -570,6 +570,11 @@ impl RowGroupReaderBuilder {
&self.file_path
}
/// Handle of the file to read.
pub(crate) fn file_handle(&self) -> &FileHandle {
&self.file_handle
}
/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
pub(crate) async fn build(
&self,

View File

@@ -19,6 +19,7 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{Row, Rows, SemanticType};
use common_time::Timestamp;
use datatypes::arrow::array::UInt64Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::scalars::ScalarVector;
@@ -42,12 +43,26 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
pub(crate) struct EmptyMemtable {
/// Id of this memtable.
id: MemtableId,
/// Time range to return.
time_range: Option<(Timestamp, Timestamp)>,
}
impl EmptyMemtable {
/// Returns a new memtable with specific `id`.
pub(crate) fn new(id: MemtableId) -> EmptyMemtable {
EmptyMemtable { id }
EmptyMemtable {
id,
time_range: None,
}
}
/// Attaches the time range to the memtable.
pub(crate) fn with_time_range(
mut self,
time_range: Option<(Timestamp, Timestamp)>,
) -> EmptyMemtable {
self.time_range = time_range;
self
}
}
@@ -81,7 +96,7 @@ impl Memtable for EmptyMemtable {
}
fn stats(&self) -> MemtableStats {
MemtableStats::default()
MemtableStats::default().with_time_range(self.time_range)
}
fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef {

View File

@@ -50,9 +50,16 @@ impl RegionScanExec {
pub fn new(scanner: RegionScannerRef) -> Self {
let arrow_schema = scanner.schema().arrow_schema().clone();
let scanner_props = scanner.properties();
let mut num_output_partition = scanner_props.partitioning().num_partitions();
// The meaning of word "partition" is different in different context. For datafusion
// it's about "parallelism" and for storage it's about "data range". Thus here we add
// a special case to handle the situation where the number of storage partition is 0.
if num_output_partition == 0 {
num_output_partition = 1;
}
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema.clone()),
Partitioning::UnknownPartitioning(scanner_props.partitioning().num_partitions()),
Partitioning::UnknownPartitioning(num_output_partition),
ExecutionMode::Bounded,
);
Self {
@@ -122,9 +129,9 @@ impl ExecutionPlan for RegionScanExec {
}
impl DisplayAs for RegionScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
// The scanner contains all information needed to display the plan.
write!(f, "{:?}", self.scanner)
self.scanner.fmt_as(t, f)
}
}

View File

@@ -652,7 +652,7 @@ CREATE TABLE {table_name} (
let request = Request::Query(QueryRequest {
query: Some(Query::Sql(
"SELECT ts, a, b FROM auto_created_table".to_string(),
"SELECT ts, a, b FROM auto_created_table order by ts".to_string(),
)),
});
let output = query(instance, request.clone()).await;

View File

@@ -35,7 +35,7 @@ explain analyze SELECT count(*) FROM system_metrics;
|_|_|_CoalescePartitionsExec REDACTED
|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 1_|
+-+-+-+

View File

@@ -25,6 +25,7 @@ INSERT INTO test1 values (3, 3, DEFAULT), (4, 4, '2024-01-31 00:01:01');
Affected Rows: 2
-- SQLNESS SORT_RESULT 3 1
SELECT i, ts1 FROM test1;
+---+---------------------+
@@ -49,6 +50,7 @@ INSERT INTO test1 values (5, 5, DEFAULT, DEFAULT), (6, 6, DEFAULT, '2024-01-31 0
Affected Rows: 2
-- SQLNESS SORT_RESULT 3 1
SELECT i, ts1, ts2 FROM test1;
+---+---------------------+---------------------+

View File

@@ -11,6 +11,7 @@ ALTER TABLE test1 ADD COLUMN ts1 TIMESTAMP DEFAULT '2024-01-30 00:01:01' PRIMARY
INSERT INTO test1 values (3, 3, DEFAULT), (4, 4, '2024-01-31 00:01:01');
-- SQLNESS SORT_RESULT 3 1
SELECT i, ts1 FROM test1;
SET time_zone = 'Asia/Shanghai';
@@ -20,6 +21,7 @@ ALTER TABLE test1 ADD COLUMN ts2 TIMESTAMP DEFAULT '2024-01-30 00:01:01' PRIMARY
INSERT INTO test1 values (5, 5, DEFAULT, DEFAULT), (6, 6, DEFAULT, '2024-01-31 00:01:01');
-- SQLNESS SORT_RESULT 3 1
SELECT i, ts1, ts2 FROM test1;
SET time_zone = 'UTC';

View File

@@ -108,6 +108,7 @@ DESC TABLE t;
| m | Int32 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM t;
+---+---+-------------------------+---+---+---+
@@ -155,6 +156,7 @@ DESC TABLE t;
| m | Int32 | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM t;
+---+---+---+-------------------------+---+---+---+---+

View File

@@ -29,6 +29,7 @@ ALTER TABLE t ADD COLUMN y INTEGER AFTER j;
DESC TABLE t;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM t;
-- SQLNESS ARG restart=true
@@ -40,6 +41,7 @@ ALTER TABLE t ADD COLUMN b INTEGER AFTER j;
DESC TABLE t;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM t;
ALTER TABLE t ADD COLUMN x int xxx;

View File

@@ -39,6 +39,7 @@ INSERT INTO test VALUES (3, "greptime", 3, true);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
+----+----------+-------------------------+-------+
@@ -64,6 +65,7 @@ ALTER TABLE test MODIFY I INTEGER;
Affected Rows: 0
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
+----+---+-------------------------+-------+

View File

@@ -16,12 +16,14 @@ SELECT * FROM test;
INSERT INTO test VALUES (3, "greptime", 3, true);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
DESCRIBE test;
ALTER TABLE test MODIFY I INTEGER;
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
DESCRIBE test;

View File

@@ -23,6 +23,7 @@ INSERT INTO test VALUES (3);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
+-------------------------+

View File

@@ -8,6 +8,7 @@ ALTER TABLE test DROP COLUMN j;
INSERT INTO test VALUES (3);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
DROP TABLE test;

View File

@@ -28,6 +28,7 @@ INSERT INTO test VALUES (3, 13);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
+-------------------------+----+

View File

@@ -11,6 +11,7 @@ INSERT INTO test VALUES (3, NULL);
INSERT INTO test VALUES (3, 13);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test;
DROP TABLE test;

View File

@@ -74,7 +74,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s';
| 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED
|_|_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
| 1_| 0_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 10_|
+-+-+-+

View File

@@ -30,7 +30,7 @@ TQL ANALYZE (0, 10, '5s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -59,7 +59,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -87,7 +87,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+
@@ -117,7 +117,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test;
|_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED
|_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED
|_|_|_RepartitionExec: partitioning=REDACTED
|_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED
|_|_|_|
|_|_| Total rows: 4_|
+-+-+-+