mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
Compare commits
14 Commits
flow/add_a
...
feat/serie
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3a02effa7 | ||
|
|
52f9fc25ba | ||
|
|
214a16565a | ||
|
|
21790a607e | ||
|
|
b33d8c1bad | ||
|
|
916e1c2d9e | ||
|
|
96ba00d175 | ||
|
|
7173401732 | ||
|
|
17c797a6d0 | ||
|
|
c44ba1aa69 | ||
|
|
843d33f9d0 | ||
|
|
b74e2a7d9b | ||
|
|
4a79c1527d | ||
|
|
b7a6ff9cc3 |
@@ -710,8 +710,8 @@ pub enum Error {
|
|||||||
error: std::io::Error,
|
error: std::io::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Failed to filter record batch"))]
|
#[snafu(display("Record batch error"))]
|
||||||
FilterRecordBatch {
|
RecordBatch {
|
||||||
source: common_recordbatch::error::Error,
|
source: common_recordbatch::error::Error,
|
||||||
#[snafu(implicit)]
|
#[snafu(implicit)]
|
||||||
location: Location,
|
location: Location,
|
||||||
@@ -1021,6 +1021,20 @@ pub enum Error {
|
|||||||
#[snafu(implicit)]
|
#[snafu(implicit)]
|
||||||
location: Location,
|
location: Location,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Failed to scan series"))]
|
||||||
|
ScanSeries {
|
||||||
|
#[snafu(implicit)]
|
||||||
|
location: Location,
|
||||||
|
source: Arc<Error>,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Partition {} scan multiple times", partition))]
|
||||||
|
ScanMultiTimes {
|
||||||
|
partition: usize,
|
||||||
|
#[snafu(implicit)]
|
||||||
|
location: Location,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
@@ -1143,7 +1157,7 @@ impl ErrorExt for Error {
|
|||||||
|
|
||||||
External { source, .. } => source.status_code(),
|
External { source, .. } => source.status_code(),
|
||||||
|
|
||||||
FilterRecordBatch { source, .. } => source.status_code(),
|
RecordBatch { source, .. } => source.status_code(),
|
||||||
|
|
||||||
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
|
Download { .. } | Upload { .. } => StatusCode::StorageUnavailable,
|
||||||
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
||||||
@@ -1172,6 +1186,10 @@ impl ErrorExt for Error {
|
|||||||
ManualCompactionOverride {} => StatusCode::Cancelled,
|
ManualCompactionOverride {} => StatusCode::Cancelled,
|
||||||
|
|
||||||
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,
|
||||||
|
|
||||||
|
ScanSeries { source, .. } => source.status_code(),
|
||||||
|
|
||||||
|
ScanMultiTimes { .. } => StatusCode::InvalidArguments,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ pub(crate) mod range;
|
|||||||
pub(crate) mod scan_region;
|
pub(crate) mod scan_region;
|
||||||
pub(crate) mod scan_util;
|
pub(crate) mod scan_util;
|
||||||
pub(crate) mod seq_scan;
|
pub(crate) mod seq_scan;
|
||||||
|
pub(crate) mod series_scan;
|
||||||
pub(crate) mod unordered_scan;
|
pub(crate) mod unordered_scan;
|
||||||
|
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ use datatypes::arrow::array::BooleanArray;
|
|||||||
use datatypes::arrow::buffer::BooleanBuffer;
|
use datatypes::arrow::buffer::BooleanBuffer;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
|
|
||||||
use crate::error::{FilterRecordBatchSnafu, Result};
|
use crate::error::{RecordBatchSnafu, Result};
|
||||||
use crate::memtable::BoxedBatchIterator;
|
use crate::memtable::BoxedBatchIterator;
|
||||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||||
use crate::read::{Batch, BatchReader};
|
use crate::read::{Batch, BatchReader};
|
||||||
@@ -201,7 +201,7 @@ impl PruneTimeIterator {
|
|||||||
for filter in filters.iter() {
|
for filter in filters.iter() {
|
||||||
let result = filter
|
let result = filter
|
||||||
.evaluate_vector(batch.timestamps())
|
.evaluate_vector(batch.timestamps())
|
||||||
.context(FilterRecordBatchSnafu)?;
|
.context(RecordBatchSnafu)?;
|
||||||
mask = mask.bitand(&result);
|
mask = mask.bitand(&result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ use crate::read::compat::{self, CompatBatch};
|
|||||||
use crate::read::projection::ProjectionMapper;
|
use crate::read::projection::ProjectionMapper;
|
||||||
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
|
||||||
use crate::read::seq_scan::SeqScan;
|
use crate::read::seq_scan::SeqScan;
|
||||||
|
use crate::read::series_scan::SeriesScan;
|
||||||
use crate::read::unordered_scan::UnorderedScan;
|
use crate::read::unordered_scan::UnorderedScan;
|
||||||
use crate::read::{Batch, Source};
|
use crate::read::{Batch, Source};
|
||||||
use crate::region::options::MergeMode;
|
use crate::region::options::MergeMode;
|
||||||
@@ -66,6 +67,8 @@ pub(crate) enum Scanner {
|
|||||||
Seq(SeqScan),
|
Seq(SeqScan),
|
||||||
/// Unordered scan.
|
/// Unordered scan.
|
||||||
Unordered(UnorderedScan),
|
Unordered(UnorderedScan),
|
||||||
|
/// Per-series scan.
|
||||||
|
Series(SeriesScan),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Scanner {
|
impl Scanner {
|
||||||
@@ -75,6 +78,7 @@ impl Scanner {
|
|||||||
match self {
|
match self {
|
||||||
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
|
Scanner::Seq(seq_scan) => seq_scan.build_stream(),
|
||||||
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
|
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
|
||||||
|
Scanner::Series(series_scan) => series_scan.build_stream().await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -86,6 +90,7 @@ impl Scanner {
|
|||||||
match self {
|
match self {
|
||||||
Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
|
Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
|
||||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
|
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
|
||||||
|
Scanner::Series(series_scan) => series_scan.input().num_files(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,6 +99,7 @@ impl Scanner {
|
|||||||
match self {
|
match self {
|
||||||
Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
|
Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
|
||||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
|
Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
|
||||||
|
Scanner::Series(series_scan) => series_scan.input().num_memtables(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,6 +108,7 @@ impl Scanner {
|
|||||||
match self {
|
match self {
|
||||||
Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
|
Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
|
||||||
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
|
Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
|
||||||
|
Scanner::Series(series_scan) => series_scan.input().file_ids(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,6 +120,7 @@ impl Scanner {
|
|||||||
match self {
|
match self {
|
||||||
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
|
Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
|
||||||
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
|
Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
|
||||||
|
Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -248,7 +256,9 @@ impl ScanRegion {
|
|||||||
|
|
||||||
/// Returns a [Scanner] to scan the region.
|
/// Returns a [Scanner] to scan the region.
|
||||||
pub(crate) fn scanner(self) -> Result<Scanner> {
|
pub(crate) fn scanner(self) -> Result<Scanner> {
|
||||||
if self.use_unordered_scan() {
|
if self.use_series_scan() {
|
||||||
|
self.series_scan().map(Scanner::Series)
|
||||||
|
} else if self.use_unordered_scan() {
|
||||||
// If table is append only and there is no series row selector, we use unordered scan in query.
|
// If table is append only and there is no series row selector, we use unordered scan in query.
|
||||||
// We still use seq scan in compaction.
|
// We still use seq scan in compaction.
|
||||||
self.unordered_scan().map(Scanner::Unordered)
|
self.unordered_scan().map(Scanner::Unordered)
|
||||||
@@ -260,7 +270,9 @@ impl ScanRegion {
|
|||||||
/// Returns a [RegionScanner] to scan the region.
|
/// Returns a [RegionScanner] to scan the region.
|
||||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||||
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
|
pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
|
||||||
if self.use_unordered_scan() {
|
if self.use_series_scan() {
|
||||||
|
self.series_scan().map(|scanner| Box::new(scanner) as _)
|
||||||
|
} else if self.use_unordered_scan() {
|
||||||
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
|
self.unordered_scan().map(|scanner| Box::new(scanner) as _)
|
||||||
} else {
|
} else {
|
||||||
self.seq_scan().map(|scanner| Box::new(scanner) as _)
|
self.seq_scan().map(|scanner| Box::new(scanner) as _)
|
||||||
@@ -279,6 +291,12 @@ impl ScanRegion {
|
|||||||
Ok(UnorderedScan::new(input))
|
Ok(UnorderedScan::new(input))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Scans by series.
|
||||||
|
pub(crate) fn series_scan(self) -> Result<SeriesScan> {
|
||||||
|
let input = self.scan_input(true)?;
|
||||||
|
Ok(SeriesScan::new(input))
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
|
pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
|
||||||
let input = self.scan_input(false)?;
|
let input = self.scan_input(false)?;
|
||||||
@@ -299,6 +317,11 @@ impl ScanRegion {
|
|||||||
|| self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
|
|| self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the region can use series scan for current request.
|
||||||
|
fn use_series_scan(&self) -> bool {
|
||||||
|
self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
|
||||||
|
}
|
||||||
|
|
||||||
/// Creates a scan input.
|
/// Creates a scan input.
|
||||||
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
|
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
|
||||||
let time_range = self.build_time_range_predicate();
|
let time_range = self.build_time_range_predicate();
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
|
|||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
use store_api::metadata::RegionMetadataRef;
|
use store_api::metadata::RegionMetadataRef;
|
||||||
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
||||||
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
|
use store_api::storage::TimeSeriesRowSelector;
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
use crate::error::{PartitionOutOfRangeSnafu, Result};
|
||||||
@@ -149,7 +149,7 @@ impl SeqScan {
|
|||||||
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
|
/// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
|
||||||
/// if possible.
|
/// if possible.
|
||||||
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
|
||||||
async fn build_reader_from_sources(
|
pub(crate) async fn build_reader_from_sources(
|
||||||
stream_ctx: &StreamContext,
|
stream_ctx: &StreamContext,
|
||||||
mut sources: Vec<Source>,
|
mut sources: Vec<Source>,
|
||||||
semaphore: Option<Arc<Semaphore>>,
|
semaphore: Option<Arc<Semaphore>>,
|
||||||
@@ -207,10 +207,6 @@ impl SeqScan {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
|
|
||||||
return self.scan_partition_by_series(metrics_set, partition);
|
|
||||||
}
|
|
||||||
|
|
||||||
let stream_ctx = self.stream_ctx.clone();
|
let stream_ctx = self.stream_ctx.clone();
|
||||||
let semaphore = self.new_semaphore();
|
let semaphore = self.new_semaphore();
|
||||||
let partition_ranges = self.properties.partitions[partition].clone();
|
let partition_ranges = self.properties.partitions[partition].clone();
|
||||||
@@ -237,14 +233,14 @@ impl SeqScan {
|
|||||||
&mut sources,
|
&mut sources,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let mut metrics = ScannerMetrics::default();
|
||||||
|
let mut fetch_start = Instant::now();
|
||||||
let mut reader =
|
let mut reader =
|
||||||
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(BoxedError::new)
|
.map_err(BoxedError::new)
|
||||||
.context(ExternalSnafu)?;
|
.context(ExternalSnafu)?;
|
||||||
let cache = &stream_ctx.input.cache_strategy;
|
let cache = &stream_ctx.input.cache_strategy;
|
||||||
let mut metrics = ScannerMetrics::default();
|
|
||||||
let mut fetch_start = Instant::now();
|
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
let mut checker = crate::read::BatchChecker::default()
|
let mut checker = crate::read::BatchChecker::default()
|
||||||
.with_start(Some(part_range.start))
|
.with_start(Some(part_range.start))
|
||||||
@@ -307,97 +303,6 @@ impl SeqScan {
|
|||||||
Ok(stream)
|
Ok(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Scans all ranges in the given partition and merge by time series.
|
|
||||||
/// Otherwise the returned stream might not contains any data.
|
|
||||||
fn scan_partition_by_series(
|
|
||||||
&self,
|
|
||||||
metrics_set: &ExecutionPlanMetricsSet,
|
|
||||||
partition: usize,
|
|
||||||
) -> Result<SendableRecordBatchStream, BoxedError> {
|
|
||||||
let stream_ctx = self.stream_ctx.clone();
|
|
||||||
let semaphore = self.new_semaphore();
|
|
||||||
let partition_ranges = self.properties.partitions[partition].clone();
|
|
||||||
let distinguish_range = self.properties.distinguish_partition_range;
|
|
||||||
let part_metrics = self.new_partition_metrics(metrics_set, partition);
|
|
||||||
debug_assert!(!self.compaction);
|
|
||||||
|
|
||||||
let stream = try_stream! {
|
|
||||||
part_metrics.on_first_poll();
|
|
||||||
|
|
||||||
let range_builder_list = Arc::new(RangeBuilderList::new(
|
|
||||||
stream_ctx.input.num_memtables(),
|
|
||||||
stream_ctx.input.num_files(),
|
|
||||||
));
|
|
||||||
// Scans all parts.
|
|
||||||
let mut sources = Vec::with_capacity(partition_ranges.len());
|
|
||||||
for part_range in partition_ranges {
|
|
||||||
build_sources(
|
|
||||||
&stream_ctx,
|
|
||||||
&part_range,
|
|
||||||
false,
|
|
||||||
&part_metrics,
|
|
||||||
range_builder_list.clone(),
|
|
||||||
&mut sources,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Builds a reader that merge sources from all parts.
|
|
||||||
let mut reader =
|
|
||||||
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
|
|
||||||
.await
|
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.context(ExternalSnafu)?;
|
|
||||||
let cache = &stream_ctx.input.cache_strategy;
|
|
||||||
let mut metrics = ScannerMetrics::default();
|
|
||||||
let mut fetch_start = Instant::now();
|
|
||||||
|
|
||||||
while let Some(batch) = reader
|
|
||||||
.next_batch()
|
|
||||||
.await
|
|
||||||
.map_err(BoxedError::new)
|
|
||||||
.context(ExternalSnafu)?
|
|
||||||
{
|
|
||||||
metrics.scan_cost += fetch_start.elapsed();
|
|
||||||
metrics.num_batches += 1;
|
|
||||||
metrics.num_rows += batch.num_rows();
|
|
||||||
|
|
||||||
debug_assert!(!batch.is_empty());
|
|
||||||
if batch.is_empty() {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let convert_start = Instant::now();
|
|
||||||
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
|
|
||||||
metrics.convert_cost += convert_start.elapsed();
|
|
||||||
let yield_start = Instant::now();
|
|
||||||
yield record_batch;
|
|
||||||
metrics.yield_cost += yield_start.elapsed();
|
|
||||||
|
|
||||||
fetch_start = Instant::now();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Yields an empty part to indicate this range is terminated.
|
|
||||||
// The query engine can use this to optimize some queries.
|
|
||||||
if distinguish_range {
|
|
||||||
let yield_start = Instant::now();
|
|
||||||
yield stream_ctx.input.mapper.empty_record_batch();
|
|
||||||
metrics.yield_cost += yield_start.elapsed();
|
|
||||||
}
|
|
||||||
|
|
||||||
metrics.scan_cost += fetch_start.elapsed();
|
|
||||||
part_metrics.merge_metrics(&metrics);
|
|
||||||
|
|
||||||
part_metrics.on_finish();
|
|
||||||
};
|
|
||||||
|
|
||||||
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
|
||||||
self.stream_ctx.input.mapper.output_schema(),
|
|
||||||
Box::pin(stream),
|
|
||||||
));
|
|
||||||
|
|
||||||
Ok(stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
|
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
|
||||||
if self.properties.target_partitions() > self.properties.num_partitions() {
|
if self.properties.target_partitions() > self.properties.num_partitions() {
|
||||||
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
|
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
|
||||||
@@ -498,7 +403,7 @@ impl fmt::Debug for SeqScan {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Builds sources for the partition range and push them to the `sources` vector.
|
/// Builds sources for the partition range and push them to the `sources` vector.
|
||||||
fn build_sources(
|
pub(crate) fn build_sources(
|
||||||
stream_ctx: &Arc<StreamContext>,
|
stream_ctx: &Arc<StreamContext>,
|
||||||
part_range: &PartitionRange,
|
part_range: &PartitionRange,
|
||||||
compaction: bool,
|
compaction: bool,
|
||||||
@@ -509,8 +414,8 @@ fn build_sources(
|
|||||||
// Gets range meta.
|
// Gets range meta.
|
||||||
let range_meta = &stream_ctx.ranges[part_range.identifier];
|
let range_meta = &stream_ctx.ranges[part_range.identifier];
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
|
if compaction {
|
||||||
// Compaction or per series distribution expects input sources are not been split.
|
// Compaction expects input sources are not been split.
|
||||||
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
|
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
|
||||||
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
|
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
|
||||||
// It should scan all row groups.
|
// It should scan all row groups.
|
||||||
|
|||||||
496
src/mito2/src/read/series_scan.rs
Normal file
496
src/mito2/src/read/series_scan.rs
Normal file
@@ -0,0 +1,496 @@
|
|||||||
|
// Copyright 2023 Greptime Team
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
//! Per-series scan implementation.
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use async_stream::{stream, try_stream};
|
||||||
|
use common_error::ext::BoxedError;
|
||||||
|
use common_recordbatch::error::ExternalSnafu;
|
||||||
|
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
|
||||||
|
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
|
||||||
|
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
|
||||||
|
use datatypes::compute::concat_batches;
|
||||||
|
use datatypes::schema::SchemaRef;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use smallvec::{smallvec, SmallVec};
|
||||||
|
use snafu::{ensure, OptionExt, ResultExt};
|
||||||
|
use store_api::metadata::RegionMetadataRef;
|
||||||
|
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
|
||||||
|
use tokio::sync::mpsc::error::SendTimeoutError;
|
||||||
|
use tokio::sync::mpsc::{self, Receiver, Sender};
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
|
use crate::error::{
|
||||||
|
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
|
||||||
|
ScanMultiTimesSnafu, ScanSeriesSnafu,
|
||||||
|
};
|
||||||
|
use crate::read::range::RangeBuilderList;
|
||||||
|
use crate::read::scan_region::{ScanInput, StreamContext};
|
||||||
|
use crate::read::scan_util::PartitionMetrics;
|
||||||
|
use crate::read::seq_scan::{build_sources, SeqScan};
|
||||||
|
use crate::read::{Batch, ScannerMetrics};
|
||||||
|
|
||||||
|
/// Timeout to send a batch to a sender.
|
||||||
|
const SEND_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
|
/// List of receivers.
|
||||||
|
type ReceiverList = Vec<Option<Receiver<Result<SeriesBatch>>>>;
|
||||||
|
|
||||||
|
/// Scans a region and returns sorted rows of a series in the same partition.
|
||||||
|
///
|
||||||
|
/// The output order is always order by `(primary key, time index)` inside every
|
||||||
|
/// partition.
|
||||||
|
/// Always returns the same series (primary key) to the same partition.
|
||||||
|
pub struct SeriesScan {
|
||||||
|
/// Properties of the scanner.
|
||||||
|
properties: ScannerProperties,
|
||||||
|
/// Context of streams.
|
||||||
|
stream_ctx: Arc<StreamContext>,
|
||||||
|
/// Receivers of each partition.
|
||||||
|
receivers: Mutex<ReceiverList>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SeriesScan {
|
||||||
|
/// Creates a new [SeriesScan].
|
||||||
|
pub(crate) fn new(input: ScanInput) -> Self {
|
||||||
|
let mut properties = ScannerProperties::default()
|
||||||
|
.with_append_mode(input.append_mode)
|
||||||
|
.with_total_rows(input.total_rows());
|
||||||
|
let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, false));
|
||||||
|
properties.partitions = vec![stream_ctx.partition_ranges()];
|
||||||
|
|
||||||
|
Self {
|
||||||
|
properties,
|
||||||
|
stream_ctx,
|
||||||
|
receivers: Mutex::new(Vec::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_partition_impl(
|
||||||
|
&self,
|
||||||
|
metrics_set: &ExecutionPlanMetricsSet,
|
||||||
|
partition: usize,
|
||||||
|
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||||
|
if partition >= self.properties.num_partitions() {
|
||||||
|
return Err(BoxedError::new(
|
||||||
|
PartitionOutOfRangeSnafu {
|
||||||
|
given: partition,
|
||||||
|
all: self.properties.num_partitions(),
|
||||||
|
}
|
||||||
|
.build(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
self.maybe_start_distributor(metrics_set);
|
||||||
|
|
||||||
|
let part_metrics = new_partition_metrics(&self.stream_ctx, metrics_set, partition);
|
||||||
|
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
|
||||||
|
let stream_ctx = self.stream_ctx.clone();
|
||||||
|
|
||||||
|
let stream = try_stream! {
|
||||||
|
part_metrics.on_first_poll();
|
||||||
|
|
||||||
|
let cache = &stream_ctx.input.cache_strategy;
|
||||||
|
let mut df_record_batches = Vec::new();
|
||||||
|
let mut metrics = ScannerMetrics::default();
|
||||||
|
let mut fetch_start = Instant::now();
|
||||||
|
while let Some(result) = receiver.recv().await {
|
||||||
|
let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
|
||||||
|
|
||||||
|
let convert_start = Instant::now();
|
||||||
|
df_record_batches.reserve(series.batches.len());
|
||||||
|
for batch in series.batches {
|
||||||
|
metrics.scan_cost += fetch_start.elapsed();
|
||||||
|
metrics.num_batches += 1;
|
||||||
|
metrics.num_rows += batch.num_rows();
|
||||||
|
|
||||||
|
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
|
||||||
|
df_record_batches.push(record_batch.into_df_record_batch());
|
||||||
|
}
|
||||||
|
|
||||||
|
let output_schema = stream_ctx.input.mapper.output_schema();
|
||||||
|
let df_record_batch =
|
||||||
|
concat_batches(output_schema.arrow_schema(), &df_record_batches)
|
||||||
|
.context(ComputeArrowSnafu)
|
||||||
|
.map_err(BoxedError::new)
|
||||||
|
.context(ExternalSnafu)?;
|
||||||
|
df_record_batches.clear();
|
||||||
|
let record_batch =
|
||||||
|
RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
|
||||||
|
metrics.convert_cost += convert_start.elapsed();
|
||||||
|
|
||||||
|
let yield_start = Instant::now();
|
||||||
|
yield record_batch;
|
||||||
|
metrics.yield_cost += yield_start.elapsed();
|
||||||
|
|
||||||
|
metrics.scan_cost += fetch_start.elapsed();
|
||||||
|
part_metrics.merge_metrics(&metrics);
|
||||||
|
fetch_start = Instant::now();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
||||||
|
self.stream_ctx.input.mapper.output_schema(),
|
||||||
|
Box::pin(stream),
|
||||||
|
));
|
||||||
|
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Takes the receiver for the partition.
|
||||||
|
fn take_receiver(&self, partition: usize) -> Result<Receiver<Result<SeriesBatch>>> {
|
||||||
|
let mut rx_list = self.receivers.lock().unwrap();
|
||||||
|
rx_list[partition]
|
||||||
|
.take()
|
||||||
|
.context(ScanMultiTimesSnafu { partition })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Starts the distributor if the receiver list is empty.
|
||||||
|
fn maybe_start_distributor(&self, metrics_set: &ExecutionPlanMetricsSet) {
|
||||||
|
let mut rx_list = self.receivers.lock().unwrap();
|
||||||
|
if !rx_list.is_empty() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (senders, receivers) = new_channel_list(self.properties.num_partitions());
|
||||||
|
let mut distributor = SeriesDistributor {
|
||||||
|
stream_ctx: self.stream_ctx.clone(),
|
||||||
|
semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
|
||||||
|
partitions: self.properties.partitions.clone(),
|
||||||
|
senders,
|
||||||
|
metrics_set: metrics_set.clone(),
|
||||||
|
};
|
||||||
|
common_runtime::spawn_global(async move {
|
||||||
|
distributor.execute().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
*rx_list = receivers;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(yingwen): Reuse codes.
|
||||||
|
/// Scans the region and returns a stream.
|
||||||
|
pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||||
|
let part_num = self.properties.num_partitions();
|
||||||
|
let metrics_set = ExecutionPlanMetricsSet::default();
|
||||||
|
let streams = (0..part_num)
|
||||||
|
.map(|i| self.scan_partition(&metrics_set, i))
|
||||||
|
.collect::<Result<Vec<_>, BoxedError>>()?;
|
||||||
|
let stream = stream! {
|
||||||
|
for mut stream in streams {
|
||||||
|
while let Some(rb) = stream.next().await {
|
||||||
|
yield rb;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let stream = Box::pin(RecordBatchStreamWrapper::new(
|
||||||
|
self.schema(),
|
||||||
|
Box::pin(stream),
|
||||||
|
));
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
|
||||||
|
let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_partitions)
|
||||||
|
.map(|_| {
|
||||||
|
let (sender, receiver) = mpsc::channel(1);
|
||||||
|
(Some(sender), Some(receiver))
|
||||||
|
})
|
||||||
|
.unzip();
|
||||||
|
(SenderList::new(senders), receivers)
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RegionScanner for SeriesScan {
|
||||||
|
fn properties(&self) -> &ScannerProperties {
|
||||||
|
&self.properties
|
||||||
|
}
|
||||||
|
|
||||||
|
fn schema(&self) -> SchemaRef {
|
||||||
|
self.stream_ctx.input.mapper.output_schema()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn metadata(&self) -> RegionMetadataRef {
|
||||||
|
self.stream_ctx.input.mapper.metadata().clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_partition(
|
||||||
|
&self,
|
||||||
|
metrics_set: &ExecutionPlanMetricsSet,
|
||||||
|
partition: usize,
|
||||||
|
) -> Result<SendableRecordBatchStream, BoxedError> {
|
||||||
|
self.scan_partition_impl(metrics_set, partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
|
||||||
|
self.properties.prepare(request);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_predicate(&self) -> bool {
|
||||||
|
let predicate = self.stream_ctx.input.predicate();
|
||||||
|
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_logical_region(&mut self, logical_region: bool) {
|
||||||
|
self.properties.set_logical_region(logical_region);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DisplayAs for SeriesScan {
|
||||||
|
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"SeriesScan: region={}, ",
|
||||||
|
self.stream_ctx.input.mapper.metadata().region_id
|
||||||
|
)?;
|
||||||
|
match t {
|
||||||
|
DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
|
||||||
|
DisplayFormatType::Verbose => self.stream_ctx.format_for_explain(true, f),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for SeriesScan {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("SeriesScan")
|
||||||
|
.field("num_ranges", &self.stream_ctx.ranges.len())
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
impl SeriesScan {
|
||||||
|
/// Returns the input.
|
||||||
|
pub(crate) fn input(&self) -> &ScanInput {
|
||||||
|
&self.stream_ctx.input
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The distributor scans series and distributes them to different partitions.
|
||||||
|
struct SeriesDistributor {
|
||||||
|
/// Context for the scan stream.
|
||||||
|
stream_ctx: Arc<StreamContext>,
|
||||||
|
/// Optional semaphore for limiting the number of concurrent scans.
|
||||||
|
semaphore: Option<Arc<Semaphore>>,
|
||||||
|
/// Partition ranges to scan.
|
||||||
|
partitions: Vec<Vec<PartitionRange>>,
|
||||||
|
/// Senders of all partitions.
|
||||||
|
senders: SenderList,
|
||||||
|
/// Metrics set to report.
|
||||||
|
/// The distributor report the metrics as an addtional partition.
|
||||||
|
/// This may double the scan cost of the [SeriesScan] metrics. We can
|
||||||
|
/// get per-partition metrics in verbose mode to see the metrics of the
|
||||||
|
/// distributor.
|
||||||
|
metrics_set: ExecutionPlanMetricsSet,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SeriesDistributor {
|
||||||
|
/// Executes the distributor.
|
||||||
|
async fn execute(&mut self) {
|
||||||
|
if let Err(e) = self.scan_partitions().await {
|
||||||
|
self.senders.send_error(e).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Scans all parts.
|
||||||
|
async fn scan_partitions(&mut self) -> Result<()> {
|
||||||
|
let part_metrics =
|
||||||
|
new_partition_metrics(&self.stream_ctx, &self.metrics_set, self.partitions.len());
|
||||||
|
part_metrics.on_first_poll();
|
||||||
|
|
||||||
|
let range_builder_list = Arc::new(RangeBuilderList::new(
|
||||||
|
self.stream_ctx.input.num_memtables(),
|
||||||
|
self.stream_ctx.input.num_files(),
|
||||||
|
));
|
||||||
|
// Scans all parts.
|
||||||
|
let mut sources = Vec::with_capacity(self.partitions.len());
|
||||||
|
for partition in &self.partitions {
|
||||||
|
sources.reserve(partition.len());
|
||||||
|
for part_range in partition {
|
||||||
|
build_sources(
|
||||||
|
&self.stream_ctx,
|
||||||
|
&part_range,
|
||||||
|
false,
|
||||||
|
&part_metrics,
|
||||||
|
range_builder_list.clone(),
|
||||||
|
&mut sources,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Builds a reader that merge sources from all parts.
|
||||||
|
let mut reader =
|
||||||
|
SeqScan::build_reader_from_sources(&self.stream_ctx, sources, self.semaphore.clone())
|
||||||
|
.await?;
|
||||||
|
let mut metrics = ScannerMetrics::default();
|
||||||
|
let mut fetch_start = Instant::now();
|
||||||
|
|
||||||
|
let mut current_series = SeriesBatch::default();
|
||||||
|
while let Some(batch) = reader.next_batch().await? {
|
||||||
|
metrics.scan_cost += fetch_start.elapsed();
|
||||||
|
metrics.num_batches += 1;
|
||||||
|
metrics.num_rows += batch.num_rows();
|
||||||
|
|
||||||
|
debug_assert!(!batch.is_empty());
|
||||||
|
if batch.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(last_key) = current_series.current_key() else {
|
||||||
|
current_series.push(batch);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
if last_key == batch.primary_key() {
|
||||||
|
current_series.push(batch);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// We find a new series, send the current one.
|
||||||
|
let to_send = std::mem::replace(&mut current_series, SeriesBatch::single(batch));
|
||||||
|
let yield_start = Instant::now();
|
||||||
|
self.senders.send_batch(to_send).await?;
|
||||||
|
metrics.yield_cost += yield_start.elapsed();
|
||||||
|
|
||||||
|
fetch_start = Instant::now();
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo: if not empty
|
||||||
|
if !current_series.is_empty() {
|
||||||
|
let yield_start = Instant::now();
|
||||||
|
self.senders.send_batch(current_series).await?;
|
||||||
|
metrics.yield_cost += yield_start.elapsed();
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics.scan_cost += fetch_start.elapsed();
|
||||||
|
part_metrics.merge_metrics(&metrics);
|
||||||
|
|
||||||
|
part_metrics.on_finish();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Batches of the same series.
|
||||||
|
#[derive(Default)]
|
||||||
|
struct SeriesBatch {
|
||||||
|
batches: SmallVec<[Batch; 4]>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SeriesBatch {
|
||||||
|
/// Creates a new [SeriesBatch] from a single [Batch].
|
||||||
|
fn single(batch: Batch) -> Self {
|
||||||
|
Self {
|
||||||
|
batches: smallvec![batch],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn current_key(&self) -> Option<&[u8]> {
|
||||||
|
self.batches.first().map(|batch| batch.primary_key())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push(&mut self, batch: Batch) {
|
||||||
|
self.batches.push(batch);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns true if there is no batch.
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.batches.is_empty()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List of senders.
|
||||||
|
struct SenderList {
|
||||||
|
senders: Vec<Option<Sender<Result<SeriesBatch>>>>,
|
||||||
|
/// Number of None senders.
|
||||||
|
num_nones: usize,
|
||||||
|
/// Index of the current partition to send.
|
||||||
|
sender_idx: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SenderList {
|
||||||
|
fn new(senders: Vec<Option<Sender<Result<SeriesBatch>>>>) -> Self {
|
||||||
|
let num_nones = senders.iter().filter(|sender| sender.is_none()).count();
|
||||||
|
Self {
|
||||||
|
senders,
|
||||||
|
num_nones,
|
||||||
|
sender_idx: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Finds a partition and sends the batch to the partition.
|
||||||
|
async fn send_batch(&mut self, mut batch: SeriesBatch) -> Result<()> {
|
||||||
|
loop {
|
||||||
|
ensure!(self.num_nones < self.senders.len(), InvalidSenderSnafu);
|
||||||
|
|
||||||
|
let sender_idx = self.fetch_add_sender_idx();
|
||||||
|
let Some(sender) = &self.senders[sender_idx] else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
// Adds a timeout to avoid blocking indefinitely and sending
|
||||||
|
// the batch in a round-robin fashion when some partitions
|
||||||
|
// don't poll their inputs. This may happen if we have a
|
||||||
|
// node like sort merging. But it is rare when we are using SeriesScan.
|
||||||
|
match sender.send_timeout(Ok(batch), SEND_TIMEOUT).await {
|
||||||
|
Ok(()) => break,
|
||||||
|
Err(SendTimeoutError::Timeout(res)) => {
|
||||||
|
// Safety: we send Ok.
|
||||||
|
batch = res.unwrap();
|
||||||
|
}
|
||||||
|
Err(SendTimeoutError::Closed(res)) => {
|
||||||
|
self.senders[sender_idx] = None;
|
||||||
|
self.num_nones += 1;
|
||||||
|
// Safety: we send Ok.
|
||||||
|
batch = res.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_error(&self, error: Error) {
|
||||||
|
let error = Arc::new(error);
|
||||||
|
for sender in &self.senders {
|
||||||
|
if let Some(sender) = sender {
|
||||||
|
let result = Err(error.clone()).context(ScanSeriesSnafu);
|
||||||
|
let _ = sender.send(result).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fetch_add_sender_idx(&mut self) -> usize {
|
||||||
|
let sender_idx = self.sender_idx;
|
||||||
|
self.sender_idx = (self.sender_idx + 1) % self.senders.len();
|
||||||
|
sender_idx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new_partition_metrics(
|
||||||
|
stream_ctx: &StreamContext,
|
||||||
|
metrics_set: &ExecutionPlanMetricsSet,
|
||||||
|
partition: usize,
|
||||||
|
) -> PartitionMetrics {
|
||||||
|
PartitionMetrics::new(
|
||||||
|
stream_ctx.input.mapper.metadata().region_id,
|
||||||
|
partition,
|
||||||
|
"SeriesScan",
|
||||||
|
stream_ctx.query_start,
|
||||||
|
metrics_set,
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
|
|||||||
use store_api::storage::TimeSeriesRowSelector;
|
use store_api::storage::TimeSeriesRowSelector;
|
||||||
|
|
||||||
use crate::error::{
|
use crate::error::{
|
||||||
DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
|
DecodeStatsSnafu, FieldTypeMismatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
|
||||||
};
|
};
|
||||||
use crate::read::compat::CompatBatch;
|
use crate::read::compat::CompatBatch;
|
||||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||||
@@ -286,7 +286,7 @@ impl RangeBase {
|
|||||||
if filter
|
if filter
|
||||||
.filter()
|
.filter()
|
||||||
.evaluate_scalar(&pk_value)
|
.evaluate_scalar(&pk_value)
|
||||||
.context(FilterRecordBatchSnafu)?
|
.context(RecordBatchSnafu)?
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
@@ -303,12 +303,12 @@ impl RangeBase {
|
|||||||
filter
|
filter
|
||||||
.filter()
|
.filter()
|
||||||
.evaluate_vector(field_col)
|
.evaluate_vector(field_col)
|
||||||
.context(FilterRecordBatchSnafu)?
|
.context(RecordBatchSnafu)?
|
||||||
}
|
}
|
||||||
SemanticType::Timestamp => filter
|
SemanticType::Timestamp => filter
|
||||||
.filter()
|
.filter()
|
||||||
.evaluate_vector(input.timestamps())
|
.evaluate_vector(input.timestamps())
|
||||||
.context(FilterRecordBatchSnafu)?,
|
.context(RecordBatchSnafu)?,
|
||||||
};
|
};
|
||||||
|
|
||||||
mask = mask.bitand(&result);
|
mask = mask.bitand(&result);
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ impl ParallelizeScan {
|
|||||||
} else if let Some(region_scan_exec) =
|
} else if let Some(region_scan_exec) =
|
||||||
plan.as_any().downcast_ref::<RegionScanExec>()
|
plan.as_any().downcast_ref::<RegionScanExec>()
|
||||||
{
|
{
|
||||||
|
let expected_partition_num = config.execution.target_partitions;
|
||||||
if region_scan_exec.is_partition_set() {
|
if region_scan_exec.is_partition_set() {
|
||||||
return Ok(Transformed::no(plan));
|
return Ok(Transformed::no(plan));
|
||||||
}
|
}
|
||||||
@@ -71,12 +72,17 @@ impl ParallelizeScan {
|
|||||||
region_scan_exec.distribution(),
|
region_scan_exec.distribution(),
|
||||||
Some(TimeSeriesDistribution::PerSeries)
|
Some(TimeSeriesDistribution::PerSeries)
|
||||||
) {
|
) {
|
||||||
return Ok(Transformed::no(plan));
|
let partition_range = region_scan_exec.get_partition_ranges();
|
||||||
|
let mut new_partitions = vec![vec![]; expected_partition_num];
|
||||||
|
new_partitions[0] = partition_range;
|
||||||
|
let new_plan = region_scan_exec
|
||||||
|
.with_new_partitions(new_partitions, expected_partition_num)
|
||||||
|
.map_err(|e| DataFusionError::External(e.into_inner()))?;
|
||||||
|
return Ok(Transformed::yes(Arc::new(new_plan)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let ranges = region_scan_exec.get_partition_ranges();
|
let ranges = region_scan_exec.get_partition_ranges();
|
||||||
let total_range_num = ranges.len();
|
let total_range_num = ranges.len();
|
||||||
let expected_partition_num = config.execution.target_partitions;
|
|
||||||
|
|
||||||
// assign ranges to each partition
|
// assign ranges to each partition
|
||||||
let mut partition_ranges =
|
let mut partition_ranges =
|
||||||
|
|||||||
Reference in New Issue
Block a user