refactor: scan Batches directly (#6369)

* refactor: scan `Batch`es directly

Signed-off-by: luofucong <luofc@foxmail.com>

* fix ci

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
LFC
2025-06-24 15:55:49 +08:00
committed by GitHub
parent 24019334ee
commit bb12be3310
10 changed files with 476 additions and 162 deletions

View File

@@ -101,6 +101,7 @@ use crate::manifest::action::RegionEdit;
use crate::memtable::MemtableStats;
use crate::metrics::HANDLE_REQUEST_ELAPSED;
use crate::read::scan_region::{ScanRegion, Scanner};
use crate::read::stream::ScanBatchStream;
use crate::region::MitoRegionRef;
use crate::request::{RegionEditRequest, WorkerRequest};
use crate::sst::file::FileMeta;
@@ -183,6 +184,18 @@ impl MitoEngine {
.await
}
/// Scans [`Batch`]es by [`ScanRequest`].
pub async fn scan_batch(
&self,
region_id: RegionId,
request: ScanRequest,
filter_deleted: bool,
) -> Result<ScanBatchStream> {
let mut scan_region = self.scan_region(region_id, request)?;
scan_region.set_filter_deleted(filter_deleted);
scan_region.scanner().await?.scan_batch()
}
/// Returns a scanner to scan for `request`.
async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner().await

View File

@@ -21,11 +21,12 @@ pub mod merge;
pub mod plain_batch;
pub mod projection;
pub(crate) mod prune;
pub(crate) mod range;
pub(crate) mod scan_region;
pub(crate) mod scan_util;
pub mod range;
pub mod scan_region;
pub mod scan_util;
pub(crate) mod seq_scan;
pub(crate) mod series_scan;
pub mod series_scan;
pub mod stream;
pub(crate) mod unordered_scan;
use std::collections::{HashMap, HashSet};
@@ -41,12 +42,14 @@ use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::types::TimestampType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
Vector, VectorRef,
TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
VectorRef,
};
use futures::stream::BoxStream;
use futures::TryStreamExt;
@@ -161,6 +164,19 @@ impl Batch {
self.sequences.len()
}
/// Creates an empty [`Batch`].
pub(crate) fn empty() -> Self {
Self {
primary_key: vec![],
pk_values: None,
timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
fields: vec![],
fields_idx: None,
}
}
/// Returns true if the number of rows in the batch is 0.
pub fn is_empty(&self) -> bool {
self.num_rows() == 0
@@ -1011,8 +1027,6 @@ pub(crate) struct ScannerMetrics {
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Number of batches returned.
@@ -1048,7 +1062,8 @@ mod tests {
#[test]
fn test_empty_batch() {
let batch = new_batch(&[], &[], &[], &[]);
let batch = Batch::empty();
assert!(batch.is_empty());
assert_eq!(None, batch.first_timestamp());
assert_eq!(None, batch.last_timestamp());
assert_eq!(None, batch.first_sequence());
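Beyond the unit test, the empty batch doubles as a sentinel: when distinguish_partition_range is enabled, the scanners below yield ScanBatch::Normal(Batch::empty()) between partition ranges. A hedged consumer sketch (on_range_boundary and process are illustrative names):

match scan_batch {
    // An empty batch marks a partition-range boundary, not data.
    ScanBatch::Normal(b) if b.is_empty() => on_range_boundary(),
    ScanBatch::Normal(b) => process(&b),
    ScanBatch::Series(s) => {
        for b in &s.batches {
            process(b);
        }
    }
}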

View File

@@ -30,7 +30,7 @@ use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadata;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
@@ -48,6 +48,7 @@ use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
@@ -82,6 +83,15 @@ impl Scanner {
Scanner::Series(series_scan) => series_scan.build_stream().await,
}
}
/// Creates a stream of [`Batch`]es from this scanner.
pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
match self {
Scanner::Seq(x) => x.scan_all_partitions(),
Scanner::Unordered(x) => x.scan_all_partitions(),
Scanner::Series(x) => x.scan_all_partitions(),
}
}
}
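Taken together with set_filter_deleted losing its #[cfg(test)] gate below, this gives callers the full pipeline; a hedged sketch (variable names illustrative):

// Scan raw Batches with tombstones retained; the scanner kind
// (Seq, Unordered, Series) is erased behind ScanBatchStream.
let mut scan_region = engine.scan_region(region_id, request)?;
scan_region.set_filter_deleted(false);
let stream: ScanBatchStream = scan_region.scanner().await?.scan_batch()?;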
#[cfg(test)]
@@ -259,7 +269,6 @@ impl ScanRegion {
self
}
#[cfg(test)]
pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
self.filter_deleted = filter_deleted;
}
@@ -897,6 +906,10 @@ impl ScanInput {
pub(crate) fn num_files(&self) -> usize {
self.files.len()
}
pub fn region_metadata(&self) -> &RegionMetadataRef {
self.mapper.metadata()
}
}
#[cfg(test)]

View File

@@ -45,8 +45,6 @@ struct ScanMetricsSet {
build_reader_cost: Duration,
/// Duration to scan data.
scan_cost: Duration,
/// Duration to convert batches.
convert_cost: Duration,
/// Duration while waiting for `yield`.
yield_cost: Duration,
/// Duration of the scan.
@@ -111,7 +109,6 @@ impl fmt::Debug for ScanMetricsSet {
prepare_scan_cost,
build_reader_cost,
scan_cost,
convert_cost,
yield_cost,
total_cost,
num_rows,
@@ -145,7 +142,6 @@ impl fmt::Debug for ScanMetricsSet {
"{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
\"build_reader_cost\":\"{build_reader_cost:?}\", \
\"scan_cost\":\"{scan_cost:?}\", \
\"convert_cost\":\"{convert_cost:?}\", \
\"yield_cost\":\"{yield_cost:?}\", \
\"total_cost\":\"{total_cost:?}\", \
\"num_rows\":{num_rows}, \
@@ -188,7 +184,6 @@ impl ScanMetricsSet {
prepare_scan_cost,
build_reader_cost,
scan_cost,
convert_cost,
yield_cost,
num_batches,
num_rows,
@@ -199,7 +194,6 @@ impl ScanMetricsSet {
self.prepare_scan_cost += *prepare_scan_cost;
self.build_reader_cost += *build_reader_cost;
self.scan_cost += *scan_cost;
self.convert_cost += *convert_cost;
self.yield_cost += *yield_cost;
self.num_rows += *num_rows;
self.num_batches += *num_batches;
@@ -274,9 +268,6 @@ impl ScanMetricsSet {
READ_STAGE_ELAPSED
.with_label_values(&["build_reader"])
.observe(self.build_reader_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["convert_rb"])
.observe(self.convert_cost.as_secs_f64());
READ_STAGE_ELAPSED
.with_label_values(&["scan"])
.observe(self.scan_cost.as_secs_f64());
@@ -348,6 +339,8 @@ struct PartitionMetricsInner {
scan_cost: Time,
/// Duration while waiting for `yield`.
yield_cost: Time,
/// Duration to convert [`Batch`]es.
convert_cost: Time,
}
impl PartitionMetricsInner {
@@ -367,8 +360,8 @@ impl Drop for PartitionMetricsInner {
self.in_progress_scan.dec();
debug!(
"{} finished, region_id: {}, partition: {}, metrics: {:?}",
self.scanner_type, self.region_id, self.partition, metrics
"{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
);
}
}
@@ -400,7 +393,7 @@ impl PartitionMetricsList {
/// Metrics while reading a partition.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
impl PartitionMetrics {
pub(crate) fn new(
@@ -427,6 +420,7 @@ impl PartitionMetrics {
.subset_time("build_reader_cost", partition),
scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
};
Self(Arc::new(inner))
}
@@ -441,7 +435,7 @@ impl PartitionMetrics {
metrics.num_mem_ranges += num;
}
pub(crate) fn inc_num_file_ranges(&self, num: usize) {
pub fn inc_num_file_ranges(&self, num: usize) {
let mut metrics = self.0.metrics.lock().unwrap();
metrics.num_file_ranges += num;
}
@@ -454,6 +448,10 @@ impl PartitionMetrics {
metrics.build_reader_cost += cost;
}
pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
self.0.convert_cost.add_duration(cost);
}
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
self.0

View File

@@ -20,14 +20,14 @@ use std::time::Instant;
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use futures::StreamExt;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::TimeSeriesRowSelector;
@@ -42,7 +42,8 @@ use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
use crate::region::options::MergeMode;
/// Scans a region and returns rows in a sorted sequence.
@@ -93,6 +94,20 @@ impl SeqScan {
Ok(Box::pin(aggr_stream))
}
/// Scans [`Batch`]es in all partitions, one partition at a time.
pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.new_partition_metrics(&metrics_set, partition);
self.scan_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::pin(futures::stream::iter(streams).flatten()))
}
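The chaining idiom above is worth isolating: futures::stream::iter(streams).flatten() polls each inner stream to completion before starting the next, so partitions come out strictly in order. A self-contained demonstration (illustrative, not part of the diff):

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let parts = vec![stream::iter(vec![1, 2]), stream::iter(vec![3, 4])];
    // flatten() exhausts partition 0 before polling partition 1.
    let all: Vec<i32> = stream::iter(parts).flatten().collect().await;
    assert_eq!(all, vec![1, 2, 3, 4]);
}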
/// Builds a [BoxedBatchReader] from a sequential scan for compaction.
///
/// # Panics
@@ -196,23 +211,40 @@ impl SeqScan {
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
) -> Result<SendableRecordBatchStream> {
let metrics = self.new_partition_metrics(metrics_set, partition);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
let input = &self.stream_ctx.input;
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
input.cache_strategy.clone(),
metrics,
);
Ok(Box::pin(RecordBatchStreamWrapper::new(
input.mapper.output_schema(),
Box::pin(record_batch_stream),
)))
}
fn scan_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
if self.properties.partitions[partition].is_empty() {
return Ok(Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
common_recordbatch::EmptyRecordBatchStream::new(
self.stream_ctx.input.mapper.output_schema(),
),
)));
return Ok(Box::pin(futures::stream::empty()));
}
let stream_ctx = self.stream_ctx.clone();
@@ -220,7 +252,6 @@ impl SeqScan {
let partition_ranges = self.properties.partitions[partition].clone();
let compaction = self.compaction;
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(metrics_set, partition);
let stream = try_stream! {
part_metrics.on_first_poll();
@@ -245,21 +276,13 @@ impl SeqScan {
let mut fetch_start = Instant::now();
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cache = &stream_ctx.input.cache_strategy;
.await?;
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
.with_end(Some(part_range.end));
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
while let Some(batch) = reader.next_batch().await? {
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
@@ -278,11 +301,8 @@ impl SeqScan {
&batch,
);
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
yield ScanBatch::Normal(batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
@@ -292,7 +312,7 @@ impl SeqScan {
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
yield ScanBatch::Normal(Batch::empty());
metrics.yield_cost += yield_start.elapsed();
}
@@ -302,13 +322,7 @@ impl SeqScan {
part_metrics.on_finish();
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
Ok(Box::pin(stream))
}
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
@@ -368,6 +382,7 @@ impl RegionScanner for SeqScan {
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
.map_err(BoxedError::new)
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {

View File

@@ -20,13 +20,12 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::util::ChainedRecordBatchStream;
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::compute::concat_batches;
use datatypes::schema::SchemaRef;
use futures::StreamExt;
use smallvec::{smallvec, SmallVec};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -36,13 +35,14 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::Semaphore;
use crate::error::{
ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
ScanMultiTimesSnafu, ScanSeriesSnafu,
Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
ScanSeriesSnafu,
};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
use crate::read::seq_scan::{build_sources, SeqScan};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
/// Timeout to send a batch to a sender.
@@ -89,71 +89,65 @@ impl SeriesScan {
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.num_partitions() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.num_partitions(),
}
.build(),
));
}
) -> Result<SendableRecordBatchStream> {
let metrics =
new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?;
let input = &self.stream_ctx.input;
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
input.cache_strategy.clone(),
metrics,
);
Ok(Box::pin(RecordBatchStreamWrapper::new(
input.mapper.output_schema(),
Box::pin(record_batch_stream),
)))
}
fn scan_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
metrics_set: &ExecutionPlanMetricsSet,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.num_partitions(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.num_partitions(),
}
);
self.maybe_start_distributor(metrics_set, &self.metrics_list);
let part_metrics =
new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
let stream_ctx = self.stream_ctx.clone();
let mut receiver = self.take_receiver(partition)?;
let stream = try_stream! {
part_metrics.on_first_poll();
let cache = &stream_ctx.input.cache_strategy;
let mut df_record_batches = Vec::new();
let mut fetch_start = Instant::now();
while let Some(result) = receiver.recv().await {
while let Some(series) = receiver.recv().await {
let series = series?;
let mut metrics = ScannerMetrics::default();
let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.scan_cost += fetch_start.elapsed();
fetch_start = Instant::now();
let convert_start = Instant::now();
df_record_batches.reserve(series.batches.len());
for batch in series.batches {
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
df_record_batches.push(record_batch.into_df_record_batch());
}
let output_schema = stream_ctx.input.mapper.output_schema();
let df_record_batch =
concat_batches(output_schema.arrow_schema(), &df_record_batches)
.context(ComputeArrowSnafu)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
df_record_batches.clear();
let record_batch =
RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
metrics.convert_cost += convert_start.elapsed();
metrics.num_batches += series.batches.len();
metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
let yield_start = Instant::now();
yield record_batch;
yield ScanBatch::Series(series);
metrics.yield_cost += yield_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
Ok(Box::pin(stream))
}
/// Takes the receiver for the partition.
@@ -201,6 +195,26 @@ impl SeriesScan {
let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
Ok(Box::pin(chained_stream))
}
/// Scans [`Batch`]es in all partitions, one partition at a time.
pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = new_partition_metrics(
&self.stream_ctx,
&metrics_set,
partition,
&self.metrics_list,
);
self.scan_batch_in_partition(partition, metrics, &metrics_set)
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::pin(futures::stream::iter(streams).flatten()))
}
}
fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -232,6 +246,7 @@ impl RegionScanner for SeriesScan {
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
.map_err(BoxedError::new)
}
fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
@@ -393,8 +408,8 @@ impl SeriesDistributor {
/// Batches of the same series.
#[derive(Default)]
struct SeriesBatch {
batches: SmallVec<[Batch; 4]>,
pub struct SeriesBatch {
pub batches: SmallVec<[Batch; 4]>,
}
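With SeriesBatch and its batches field now public, code outside the module can aggregate per-series statistics, as the metrics above do; a hedged helper (series_num_rows is illustrative):

// Total rows across all batches of one series.
fn series_num_rows(series: &SeriesBatch) -> usize {
    series.batches.iter().map(|b| b.num_rows()).sum()
}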
impl SeriesBatch {

View File

@@ -0,0 +1,120 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::compute;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use crate::cache::CacheStrategy;
use crate::error::Result;
use crate::read::projection::ProjectionMapper;
use crate::read::scan_util::PartitionMetrics;
use crate::read::series_scan::SeriesBatch;
use crate::read::Batch;
/// All kinds of [`Batch`]es a scanner can produce.
pub enum ScanBatch {
Normal(Batch),
Series(SeriesBatch),
}
pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
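Because ScanBatchStream is just a boxed stream of results, tests can fabricate one from in-memory data; a minimal sketch assuming crate-internal access to Batch::empty (mock_scan_stream is illustrative):

use futures::stream;

fn mock_scan_stream() -> ScanBatchStream {
    let items = vec![Ok(ScanBatch::Normal(Batch::empty()))];
    Box::pin(stream::iter(items))
}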
/// A stream that converts [`ScanBatch`]es into [`RecordBatch`]es.
pub(crate) struct ConvertBatchStream {
inner: ScanBatchStream,
projection_mapper: Arc<ProjectionMapper>,
cache_strategy: CacheStrategy,
partition_metrics: PartitionMetrics,
buffer: Vec<DfRecordBatch>,
}
impl ConvertBatchStream {
pub(crate) fn new(
inner: ScanBatchStream,
projection_mapper: Arc<ProjectionMapper>,
cache_strategy: CacheStrategy,
partition_metrics: PartitionMetrics,
) -> Self {
Self {
inner,
projection_mapper,
cache_strategy,
partition_metrics,
buffer: Vec::new(),
}
}
fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
match batch {
ScanBatch::Normal(batch) => {
if batch.is_empty() {
Ok(self.projection_mapper.empty_record_batch())
} else {
self.projection_mapper.convert(&batch, &self.cache_strategy)
}
}
ScanBatch::Series(series) => {
self.buffer.clear();
self.buffer.reserve(series.batches.len());
for batch in series.batches {
let record_batch = self
.projection_mapper
.convert(&batch, &self.cache_strategy)?;
self.buffer.push(record_batch.into_df_record_batch());
}
let output_schema = self.projection_mapper.output_schema();
let record_batch =
compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
.context(ArrowComputeSnafu)?;
RecordBatch::try_from_df_record_batch(output_schema, record_batch)
}
}
}
}
impl Stream for ConvertBatchStream {
type Item = common_recordbatch::error::Result<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let batch = futures::ready!(self.inner.poll_next_unpin(cx));
let Some(batch) = batch else {
return Poll::Ready(None);
};
let record_batch = match batch {
Ok(batch) => {
let start = Instant::now();
let record_batch = self.convert(batch);
self.partition_metrics
.inc_convert_batch_cost(start.elapsed());
record_batch
}
Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
};
Poll::Ready(Some(record_batch))
}
}
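The timing discipline in poll_next generalizes to any map-like wrapper: measure only the conversion, never the upstream wait. A hedged generic equivalent (timed_map, convert, and record are illustrative; the real type keeps explicit state for the reuse buffer):

use std::time::{Duration, Instant};
use futures::{Stream, StreamExt};

// Wraps a stream so each element's transformation is timed and reported,
// mirroring ConvertBatchStream's bookkeeping.
fn timed_map<S, T, U>(
    inner: S,
    mut convert: impl FnMut(T) -> U,
    mut record: impl FnMut(Duration),
) -> impl Stream<Item = U>
where
    S: Stream<Item = T>,
{
    inner.map(move |item| {
        let start = Instant::now();
        let out = convert(item);
        record(start.elapsed());
        out
    })
}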

View File

@@ -20,13 +20,12 @@ use std::time::Instant;
use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
@@ -36,6 +35,7 @@ use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics};
/// Scans a region without providing any output ordering guarantee.
@@ -124,21 +124,25 @@ impl UnorderedScan {
}
}
fn scan_partition_impl(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
if partition >= self.properties.partitions.len() {
return Err(BoxedError::new(
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
.build(),
));
}
/// Scans [`Batch`]es in all partitions, one partition at a time.
pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
let metrics_set = ExecutionPlanMetricsSet::new();
let streams = (0..self.properties.partitions.len())
.map(|partition| {
let metrics = self.partition_metrics(partition, &metrics_set);
self.scan_batch_in_partition(partition, metrics)
})
.collect::<Result<Vec<_>>>()?;
Ok(Box::pin(futures::stream::iter(streams).flatten()))
}
fn partition_metrics(
&self,
partition: usize,
metrics_set: &ExecutionPlanMetricsSet,
) -> PartitionMetrics {
let part_metrics = PartitionMetrics::new(
self.stream_ctx.input.mapper.metadata().region_id,
partition,
@@ -147,6 +151,45 @@ impl UnorderedScan {
metrics_set,
);
self.metrics_list.set(partition, part_metrics.clone());
part_metrics
}
fn scan_partition_impl(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let metrics = self.partition_metrics(partition, metrics_set);
let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
let input = &self.stream_ctx.input;
let record_batch_stream = ConvertBatchStream::new(
batch_stream,
input.mapper.clone(),
input.cache_strategy.clone(),
metrics,
);
Ok(Box::pin(RecordBatchStreamWrapper::new(
input.mapper.output_schema(),
Box::pin(record_batch_stream),
)))
}
fn scan_batch_in_partition(
&self,
partition: usize,
part_metrics: PartitionMetrics,
) -> Result<ScanBatchStream> {
ensure!(
partition < self.properties.partitions.len(),
PartitionOutOfRangeSnafu {
given: partition,
all: self.properties.partitions.len(),
}
);
let stream_ctx = self.stream_ctx.clone();
let part_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range;
@@ -154,7 +197,6 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();
let cache = &stream_ctx.input.cache_strategy;
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
@@ -175,7 +217,7 @@ impl UnorderedScan {
range_builder_list.clone(),
);
for await batch in stream {
let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
let batch = batch?;
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
@@ -194,11 +236,8 @@ impl UnorderedScan {
&batch,
);
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
yield ScanBatch::Normal(batch);
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
@@ -208,22 +247,15 @@ impl UnorderedScan {
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
yield ScanBatch::Normal(Batch::empty());
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
}
part_metrics.on_finish();
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
Ok(Box::pin(stream))
}
}
@@ -251,6 +283,7 @@ impl RegionScanner for UnorderedScan {
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scan_partition_impl(metrics_set, partition)
.map_err(BoxedError::new)
}
fn has_predicate(&self) -> bool {

View File

@@ -16,11 +16,12 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use arrow_flight::FlightData;
use common_error::ext::ErrorExt;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::tracing::{info_span, Instrument};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::warn;
use common_telemetry::{error, warn};
use futures::channel::mpsc;
use futures::channel::mpsc::Sender;
use futures::{SinkExt, Stream, StreamExt};
@@ -90,6 +91,10 @@ impl FlightRecordBatchStream {
}
}
Err(e) => {
if e.status_code().should_log_error() {
error!("{e:?}");
}
let e = Err(e).context(error::CollectRecordbatchSnafu);
if let Err(e) = tx.send(e.map_err(|x| x.into())).await {
warn!(e; "stop sending Flight data");

View File

@@ -67,14 +67,34 @@ pub struct ScanRequest {
impl Display for ScanRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "ScanRequest {{")?;
enum Delimiter {
None,
Init,
}
impl Delimiter {
fn as_str(&mut self) -> &str {
match self {
Delimiter::None => {
*self = Delimiter::Init;
""
}
Delimiter::Init => ", ",
}
}
}
let mut delimiter = Delimiter::None;
write!(f, "ScanRequest {{ ")?;
if let Some(projection) = &self.projection {
write!(f, "projection: {:?},", projection)?;
write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
}
if !self.filters.is_empty() {
write!(
f,
", filters: [{}]",
"{}filters: [{}]",
delimiter.as_str(),
self.filters
.iter()
.map(|f| f.to_string())
@@ -83,23 +103,90 @@ impl Display for ScanRequest {
)?;
}
if let Some(output_ordering) = &self.output_ordering {
write!(f, ", output_ordering: {:?}", output_ordering)?;
write!(
f,
"{}output_ordering: {:?}",
delimiter.as_str(),
output_ordering
)?;
}
if let Some(limit) = &self.limit {
write!(f, ", limit: {}", limit)?;
write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
}
if let Some(series_row_selector) = &self.series_row_selector {
write!(f, ", series_row_selector: {}", series_row_selector)?;
write!(
f,
"{}series_row_selector: {}",
delimiter.as_str(),
series_row_selector
)?;
}
if let Some(sequence) = &self.sequence {
write!(f, ", sequence: {}", sequence)?;
write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
}
if let Some(sst_min_sequence) = &self.sst_min_sequence {
write!(f, ", sst_min_sequence: {}", sst_min_sequence)?;
write!(
f,
"{}sst_min_sequence: {}",
delimiter.as_str(),
sst_min_sequence
)?;
}
if let Some(distribution) = &self.distribution {
write!(f, ", distribution: {}", distribution)?;
write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
}
write!(f, "}}")
write!(f, " }}")
}
}
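The delimiter trick is reusable wherever optional fields are comma-joined; a standalone copy of the pattern (join_fields is illustrative):

enum Delimiter {
    None,
    Init,
}

impl Delimiter {
    // First call yields "", every later call yields ", ".
    fn as_str(&mut self) -> &'static str {
        match self {
            Delimiter::None => {
                *self = Delimiter::Init;
                ""
            }
            Delimiter::Init => ", ",
        }
    }
}

fn join_fields(fields: &[&str]) -> String {
    let mut delimiter = Delimiter::None;
    let mut out = String::new();
    for field in fields {
        out.push_str(delimiter.as_str());
        out.push_str(field);
    }
    out
}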
#[cfg(test)]
mod tests {
use datafusion_expr::{binary_expr, col, lit, Operator};
use super::*;
#[test]
fn test_display_scan_request() {
let request = ScanRequest {
..Default::default()
};
assert_eq!(request.to_string(), "ScanRequest { }");
let request = ScanRequest {
projection: Some(vec![1, 2]),
filters: vec![
binary_expr(col("i"), Operator::Gt, lit(1)),
binary_expr(col("s"), Operator::Eq, lit("x")),
],
limit: Some(10),
..Default::default()
};
assert_eq!(
request.to_string(),
r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
);
let request = ScanRequest {
filters: vec![
binary_expr(col("i"), Operator::Gt, lit(1)),
binary_expr(col("s"), Operator::Eq, lit("x")),
],
limit: Some(10),
..Default::default()
};
assert_eq!(
request.to_string(),
r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
);
let request = ScanRequest {
projection: Some(vec![1, 2]),
limit: Some(10),
..Default::default()
};
assert_eq!(
request.to_string(),
"ScanRequest { projection: [1, 2], limit: 10 }"
);
}
}