diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index e41a8b9c62..011cf4136c 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -101,6 +101,7 @@ use crate::manifest::action::RegionEdit;
 use crate::memtable::MemtableStats;
 use crate::metrics::HANDLE_REQUEST_ELAPSED;
 use crate::read::scan_region::{ScanRegion, Scanner};
+use crate::read::stream::ScanBatchStream;
 use crate::region::MitoRegionRef;
 use crate::request::{RegionEditRequest, WorkerRequest};
 use crate::sst::file::FileMeta;
@@ -183,6 +184,18 @@ impl MitoEngine {
         .await
     }
 
+    /// Scans [`Batch`]es by [`ScanRequest`].
+    pub async fn scan_batch(
+        &self,
+        region_id: RegionId,
+        request: ScanRequest,
+        filter_deleted: bool,
+    ) -> Result<ScanBatchStream> {
+        let mut scan_region = self.scan_region(region_id, request)?;
+        scan_region.set_filter_deleted(filter_deleted);
+        scan_region.scanner().await?.scan_batch()
+    }
+
     /// Returns a scanner to scan for `request`.
     async fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
         self.scan_region(region_id, request)?.scanner().await
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 5b62fcfc40..8c10ceb8f2 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -21,11 +21,12 @@ pub mod merge;
 pub mod plain_batch;
 pub mod projection;
 pub(crate) mod prune;
-pub(crate) mod range;
-pub(crate) mod scan_region;
-pub(crate) mod scan_util;
+pub mod range;
+pub mod scan_region;
+pub mod scan_util;
 pub(crate) mod seq_scan;
-pub(crate) mod series_scan;
+pub mod series_scan;
+pub mod stream;
 pub(crate) mod unordered_scan;
 
 use std::collections::{HashMap, HashSet};
@@ -41,12 +42,14 @@ use datatypes::arrow::array::{Array, ArrayRef, UInt64Array};
 use datatypes::arrow::compute::SortOptions;
 use datatypes::arrow::row::{RowConverter, SortField};
 use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
+use datatypes::scalars::ScalarVectorBuilder;
 use datatypes::types::TimestampType;
 use datatypes::value::{Value, ValueRef};
 use datatypes::vectors::{
     BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
-    TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
-    Vector, VectorRef,
+    TimestampMillisecondVectorBuilder, TimestampNanosecondVector, TimestampSecondVector,
+    UInt32Vector, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, Vector,
+    VectorRef,
 };
 use futures::stream::BoxStream;
 use futures::TryStreamExt;
@@ -161,6 +164,19 @@ impl Batch {
         self.sequences.len()
     }
 
+    /// Creates an empty [`Batch`].
+    pub(crate) fn empty() -> Self {
+        Self {
+            primary_key: vec![],
+            pk_values: None,
+            timestamps: Arc::new(TimestampMillisecondVectorBuilder::with_capacity(0).finish()),
+            sequences: Arc::new(UInt64VectorBuilder::with_capacity(0).finish()),
+            op_types: Arc::new(UInt8VectorBuilder::with_capacity(0).finish()),
+            fields: vec![],
+            fields_idx: None,
+        }
+    }
+
     /// Returns true if the number of rows in the batch is 0.
     pub fn is_empty(&self) -> bool {
         self.num_rows() == 0
@@ -1011,8 +1027,6 @@ pub(crate) struct ScannerMetrics {
     build_reader_cost: Duration,
     /// Duration to scan data.
     scan_cost: Duration,
-    /// Duration to convert batches.
-    convert_cost: Duration,
     /// Duration while waiting for `yield`.
     yield_cost: Duration,
     /// Number of batches returned.
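Reviewer note (not part of the patch): a minimal sketch of how the new `MitoEngine::scan_batch` entry point might be driven. The `dump_region` helper and its names are hypothetical; only `scan_batch`, `ScanBatch`, and `ScanBatchStream` come from this diff, and an already-opened engine plus an existing region are assumed.

    use futures::TryStreamExt;
    use store_api::storage::{RegionId, ScanRequest};

    use crate::engine::MitoEngine;
    use crate::error::Result;
    use crate::read::stream::ScanBatch;

    // Hypothetical helper: prints a summary for every batch in a region.
    async fn dump_region(engine: &MitoEngine, region_id: RegionId) -> Result<()> {
        // `filter_deleted = false` asks the scanner not to drop delete markers,
        // which matters for consumers that need to observe deletions.
        let mut stream = engine
            .scan_batch(region_id, ScanRequest::default(), false)
            .await?;
        while let Some(batch) = stream.try_next().await? {
            match batch {
                ScanBatch::Normal(batch) => println!("normal: {} rows", batch.num_rows()),
                ScanBatch::Series(series) => println!("series: {} batches", series.batches.len()),
            }
        }
        Ok(())
    }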
@@ -1048,7 +1062,8 @@ mod tests {
 
     #[test]
     fn test_empty_batch() {
-        let batch = new_batch(&[], &[], &[], &[]);
+        let batch = Batch::empty();
+        assert!(batch.is_empty());
         assert_eq!(None, batch.first_timestamp());
         assert_eq!(None, batch.last_timestamp());
         assert_eq!(None, batch.first_sequence());
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index b03e3289b4..ad74b525b0 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -30,7 +30,7 @@ use datafusion_common::Column;
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::Expr;
 use smallvec::SmallVec;
-use store_api::metadata::RegionMetadata;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
 use store_api::region_engine::{PartitionRange, RegionScannerRef};
 use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
 use table::predicate::{build_time_range_predicate, Predicate};
@@ -48,6 +48,7 @@ use crate::read::projection::ProjectionMapper;
 use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
 use crate::read::seq_scan::SeqScan;
 use crate::read::series_scan::SeriesScan;
+use crate::read::stream::ScanBatchStream;
 use crate::read::unordered_scan::UnorderedScan;
 use crate::read::{Batch, Source};
 use crate::region::options::MergeMode;
@@ -82,6 +83,15 @@ impl Scanner {
             Scanner::Series(series_scan) => series_scan.build_stream().await,
         }
     }
+
+    /// Creates a stream of [`Batch`]es from this scanner.
+    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
+        match self {
+            Scanner::Seq(x) => x.scan_all_partitions(),
+            Scanner::Unordered(x) => x.scan_all_partitions(),
+            Scanner::Series(x) => x.scan_all_partitions(),
+        }
+    }
 }
 
 #[cfg(test)]
@@ -259,7 +269,6 @@ impl ScanRegion {
         self
     }
 
-    #[cfg(test)]
     pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
         self.filter_deleted = filter_deleted;
     }
@@ -897,6 +906,10 @@ impl ScanInput {
     pub(crate) fn num_files(&self) -> usize {
         self.files.len()
     }
+
+    pub fn region_metadata(&self) -> &RegionMetadataRef {
+        self.mapper.metadata()
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs
index e24e633511..5e965cb83c 100644
--- a/src/mito2/src/read/scan_util.rs
+++ b/src/mito2/src/read/scan_util.rs
@@ -45,8 +45,6 @@ struct ScanMetricsSet {
     build_reader_cost: Duration,
     /// Duration to scan data.
     scan_cost: Duration,
-    /// Duration to convert batches.
-    convert_cost: Duration,
     /// Duration while waiting for `yield`.
     yield_cost: Duration,
     /// Duration of the scan.
@@ -111,7 +109,6 @@ impl fmt::Debug for ScanMetricsSet {
             prepare_scan_cost,
             build_reader_cost,
             scan_cost,
-            convert_cost,
             yield_cost,
             total_cost,
             num_rows,
@@ -145,7 +142,6 @@ impl fmt::Debug for ScanMetricsSet {
             "{{\"prepare_scan_cost\":\"{prepare_scan_cost:?}\", \
             \"build_reader_cost\":\"{build_reader_cost:?}\", \
             \"scan_cost\":\"{scan_cost:?}\", \
-            \"convert_cost\":\"{convert_cost:?}\", \
             \"yield_cost\":\"{yield_cost:?}\", \
             \"total_cost\":\"{total_cost:?}\", \
             \"num_rows\":{num_rows}, \
@@ -188,7 +184,6 @@ impl ScanMetricsSet {
             prepare_scan_cost,
             build_reader_cost,
             scan_cost,
-            convert_cost,
             yield_cost,
             num_batches,
             num_rows,
@@ -199,7 +194,6 @@ impl ScanMetricsSet {
         self.prepare_scan_cost += *prepare_scan_cost;
         self.build_reader_cost += *build_reader_cost;
         self.scan_cost += *scan_cost;
-        self.convert_cost += *convert_cost;
        self.yield_cost += *yield_cost;
         self.num_rows += *num_rows;
         self.num_batches += *num_batches;
@@ -274,9 +268,6 @@ impl ScanMetricsSet {
         READ_STAGE_ELAPSED
             .with_label_values(&["build_reader"])
             .observe(self.build_reader_cost.as_secs_f64());
-        READ_STAGE_ELAPSED
-            .with_label_values(&["convert_rb"])
-            .observe(self.convert_cost.as_secs_f64());
         READ_STAGE_ELAPSED
             .with_label_values(&["scan"])
             .observe(self.scan_cost.as_secs_f64());
@@ -348,6 +339,8 @@ struct PartitionMetricsInner {
     scan_cost: Time,
     /// Duration while waiting for `yield`.
     yield_cost: Time,
+    /// Duration to convert [`Batch`]es.
+    convert_cost: Time,
 }
 
 impl PartitionMetricsInner {
@@ -367,8 +360,8 @@ impl Drop for PartitionMetricsInner {
         self.in_progress_scan.dec();
 
         debug!(
-            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
-            self.scanner_type, self.region_id, self.partition, metrics
+            "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
+            self.scanner_type, self.region_id, self.partition, metrics, self.convert_cost,
         );
     }
 }
@@ -400,7 +393,7 @@ impl PartitionMetricsList {
 
 /// Metrics while reading a partition.
 #[derive(Clone)]
-pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
+pub struct PartitionMetrics(Arc<PartitionMetricsInner>);
 
 impl PartitionMetrics {
     pub(crate) fn new(
@@ -427,6 +420,7 @@ impl PartitionMetrics {
                 .subset_time("build_reader_cost", partition),
             scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
             yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
+            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
         };
         Self(Arc::new(inner))
     }
@@ -441,7 +435,7 @@ impl PartitionMetrics {
         metrics.num_mem_ranges += num;
     }
 
-    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
+    pub fn inc_num_file_ranges(&self, num: usize) {
         let mut metrics = self.0.metrics.lock().unwrap();
         metrics.num_file_ranges += num;
     }
@@ -454,6 +448,10 @@ impl PartitionMetrics {
         metrics.build_reader_cost += cost;
     }
 
+    pub(crate) fn inc_convert_batch_cost(&self, cost: Duration) {
+        self.0.convert_cost.add_duration(cost);
+    }
+
     /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
     pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
         self.0
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index e48ca633da..1e61c9bdb1 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -20,14 +20,14 @@ use std::time::Instant;
 
 use async_stream::try_stream;
 use common_error::ext::BoxedError;
-use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::util::ChainedRecordBatchStream;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use common_telemetry::tracing;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
-use snafu::ResultExt;
+use futures::StreamExt;
+use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
 use store_api::storage::TimeSeriesRowSelector;
@@ -42,7 +42,8 @@ use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
     scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
 };
-use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
+use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
+use crate::read::{Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
 use crate::region::options::MergeMode;
 
 /// Scans a region and returns rows in a sorted sequence.
@@ -93,6 +94,20 @@ impl SeqScan {
         Ok(Box::pin(aggr_stream))
     }
 
+    /// Scans [`Batch`]es in all partitions one by one.
+    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
+        let metrics_set = ExecutionPlanMetricsSet::new();
+
+        let streams = (0..self.properties.partitions.len())
+            .map(|partition| {
+                let metrics = self.new_partition_metrics(&metrics_set, partition);
+                self.scan_batch_in_partition(partition, metrics)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Box::pin(futures::stream::iter(streams).flatten()))
+    }
+
     /// Builds a [BoxedBatchReader] from sequential scan for compaction.
     ///
     /// # Panics
@@ -196,23 +211,40 @@ impl SeqScan {
         &self,
         metrics_set: &ExecutionPlanMetricsSet,
         partition: usize,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        if partition >= self.properties.partitions.len() {
-            return Err(BoxedError::new(
-                PartitionOutOfRangeSnafu {
-                    given: partition,
-                    all: self.properties.partitions.len(),
-                }
-                .build(),
-            ));
-        }
+    ) -> Result<SendableRecordBatchStream> {
+        let metrics = self.new_partition_metrics(metrics_set, partition);
+
+        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
+
+        let input = &self.stream_ctx.input;
+        let record_batch_stream = ConvertBatchStream::new(
+            batch_stream,
+            input.mapper.clone(),
+            input.cache_strategy.clone(),
+            metrics,
+        );
+
+        Ok(Box::pin(RecordBatchStreamWrapper::new(
+            input.mapper.output_schema(),
+            Box::pin(record_batch_stream),
+        )))
+    }
+
+    fn scan_batch_in_partition(
+        &self,
+        partition: usize,
+        part_metrics: PartitionMetrics,
+    ) -> Result<ScanBatchStream> {
+        ensure!(
+            partition < self.properties.partitions.len(),
+            PartitionOutOfRangeSnafu {
+                given: partition,
+                all: self.properties.partitions.len(),
+            }
+        );
+
         if self.properties.partitions[partition].is_empty() {
-            return Ok(Box::pin(RecordBatchStreamWrapper::new(
-                self.stream_ctx.input.mapper.output_schema(),
-                common_recordbatch::EmptyRecordBatchStream::new(
-                    self.stream_ctx.input.mapper.output_schema(),
-                ),
-            )));
+            return Ok(Box::pin(futures::stream::empty()));
         }
 
         let stream_ctx = self.stream_ctx.clone();
@@ -220,7 +252,6 @@ impl SeqScan {
         let partition_ranges = self.properties.partitions[partition].clone();
         let compaction = self.compaction;
         let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = self.new_partition_metrics(metrics_set, partition);
 
         let stream = try_stream! {
             part_metrics.on_first_poll();
@@ -245,21 +276,13 @@ impl SeqScan {
                 let mut fetch_start = Instant::now();
                 let mut reader =
                     Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
-                        .await
-                        .map_err(BoxedError::new)
-                        .context(ExternalSnafu)?;
-                let cache = &stream_ctx.input.cache_strategy;
+                        .await?;
 
                 #[cfg(debug_assertions)]
                 let mut checker = crate::read::BatchChecker::default()
                     .with_start(Some(part_range.start))
                     .with_end(Some(part_range.end));
 
-                while let Some(batch) = reader
-                    .next_batch()
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?
-                {
+                while let Some(batch) = reader.next_batch().await? {
                     metrics.scan_cost += fetch_start.elapsed();
                     metrics.num_batches += 1;
                     metrics.num_rows += batch.num_rows();
@@ -278,11 +301,8 @@ impl SeqScan {
                         &batch,
                     );
 
-                    let convert_start = Instant::now();
-                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
-                    metrics.convert_cost += convert_start.elapsed();
                     let yield_start = Instant::now();
-                    yield record_batch;
+                    yield ScanBatch::Normal(batch);
                     metrics.yield_cost += yield_start.elapsed();
 
                     fetch_start = Instant::now();
@@ -292,7 +312,7 @@ impl SeqScan {
                 // The query engine can use this to optimize some queries.
                 if distinguish_range {
                     let yield_start = Instant::now();
-                    yield stream_ctx.input.mapper.empty_record_batch();
+                    yield ScanBatch::Normal(Batch::empty());
                     metrics.yield_cost += yield_start.elapsed();
                 }
 
@@ -302,13 +322,7 @@ impl SeqScan {
 
             part_metrics.on_finish();
         };
-
-        let stream = Box::pin(RecordBatchStreamWrapper::new(
-            self.stream_ctx.input.mapper.output_schema(),
-            Box::pin(stream),
-        ));
-
-        Ok(stream)
+        Ok(Box::pin(stream))
     }
 
     fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
@@ -368,6 +382,7 @@ impl RegionScanner for SeqScan {
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         self.scan_partition_impl(metrics_set, partition)
+            .map_err(BoxedError::new)
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index e198930384..4fab1fa52e 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -20,13 +20,12 @@ use std::time::{Duration, Instant};
 
 use async_stream::try_stream;
 use common_error::ext::BoxedError;
-use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::util::ChainedRecordBatchStream;
-use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
+use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
-use datatypes::compute::concat_batches;
 use datatypes::schema::SchemaRef;
+use futures::StreamExt;
 use smallvec::{smallvec, SmallVec};
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
@@ -36,13 +35,14 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
 use tokio::sync::Semaphore;
 
 use crate::error::{
-    ComputeArrowSnafu, Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result,
-    ScanMultiTimesSnafu, ScanSeriesSnafu,
+    Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
+    ScanSeriesSnafu,
 };
 use crate::read::range::RangeBuilderList;
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
 use crate::read::seq_scan::{build_sources, SeqScan};
+use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
 use crate::read::{Batch, ScannerMetrics};
 
 /// Timeout to send a batch to a sender.
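Reviewer note (not part of the patch): `SeqScan::scan_all_partitions` above, and the `SeriesScan`/`UnorderedScan` counterparts below, share one shape: build every per-partition `ScanBatchStream` eagerly, so a bad partition index surfaces before polling starts, then chain the streams lazily with `futures::stream::iter(..).flatten()`. A self-contained illustration of that flattening behavior, assuming a tokio runtime:

    use futures::{stream, StreamExt};

    #[tokio::main]
    async fn main() {
        // Three per-"partition" streams, already built.
        let parts = vec![
            stream::iter(vec![1, 2]),
            stream::iter(vec![3]),
            stream::iter(vec![4, 5]),
        ];
        // `flatten` polls each inner stream to completion before moving on,
        // so partitions are emitted one by one, in order.
        let merged: Vec<i32> = stream::iter(parts).flatten().collect().await;
        assert_eq!(merged, vec![1, 2, 3, 4, 5]);
    }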
@@ -89,71 +89,65 @@ impl SeriesScan {
         &self,
         metrics_set: &ExecutionPlanMetricsSet,
         partition: usize,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        if partition >= self.properties.num_partitions() {
-            return Err(BoxedError::new(
-                PartitionOutOfRangeSnafu {
-                    given: partition,
-                    all: self.properties.num_partitions(),
-                }
-                .build(),
-            ));
-        }
+    ) -> Result<SendableRecordBatchStream> {
+        let metrics =
+            new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
+
+        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone(), metrics_set)?;
+
+        let input = &self.stream_ctx.input;
+        let record_batch_stream = ConvertBatchStream::new(
+            batch_stream,
+            input.mapper.clone(),
+            input.cache_strategy.clone(),
+            metrics,
+        );
+
+        Ok(Box::pin(RecordBatchStreamWrapper::new(
+            input.mapper.output_schema(),
+            Box::pin(record_batch_stream),
+        )))
+    }
+
+    fn scan_batch_in_partition(
+        &self,
+        partition: usize,
+        part_metrics: PartitionMetrics,
+        metrics_set: &ExecutionPlanMetricsSet,
+    ) -> Result<ScanBatchStream> {
+        ensure!(
+            partition < self.properties.num_partitions(),
+            PartitionOutOfRangeSnafu {
+                given: partition,
+                all: self.properties.num_partitions(),
+            }
+        );
 
         self.maybe_start_distributor(metrics_set, &self.metrics_list);
 
-        let part_metrics =
-            new_partition_metrics(&self.stream_ctx, metrics_set, partition, &self.metrics_list);
-        let mut receiver = self.take_receiver(partition).map_err(BoxedError::new)?;
-        let stream_ctx = self.stream_ctx.clone();
-
+        let mut receiver = self.take_receiver(partition)?;
         let stream = try_stream! {
             part_metrics.on_first_poll();
 
-            let cache = &stream_ctx.input.cache_strategy;
-            let mut df_record_batches = Vec::new();
             let mut fetch_start = Instant::now();
-            while let Some(result) = receiver.recv().await {
+            while let Some(series) = receiver.recv().await {
+                let series = series?;
+
                 let mut metrics = ScannerMetrics::default();
-                let series = result.map_err(BoxedError::new).context(ExternalSnafu)?;
                 metrics.scan_cost += fetch_start.elapsed();
                 fetch_start = Instant::now();
 
-                let convert_start = Instant::now();
-                df_record_batches.reserve(series.batches.len());
-                for batch in series.batches {
-                    metrics.num_batches += 1;
-                    metrics.num_rows += batch.num_rows();
-
-                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
-                    df_record_batches.push(record_batch.into_df_record_batch());
-                }
-
-                let output_schema = stream_ctx.input.mapper.output_schema();
-                let df_record_batch =
-                    concat_batches(output_schema.arrow_schema(), &df_record_batches)
-                        .context(ComputeArrowSnafu)
-                        .map_err(BoxedError::new)
-                        .context(ExternalSnafu)?;
-                df_record_batches.clear();
-                let record_batch =
-                    RecordBatch::try_from_df_record_batch(output_schema, df_record_batch)?;
-                metrics.convert_cost += convert_start.elapsed();
+                metrics.num_batches += series.batches.len();
+                metrics.num_rows += series.batches.iter().map(|x| x.num_rows()).sum::<usize>();
 
                 let yield_start = Instant::now();
-                yield record_batch;
+                yield ScanBatch::Series(series);
                 metrics.yield_cost += yield_start.elapsed();
 
                 part_metrics.merge_metrics(&metrics);
             }
         };
-
-        let stream = Box::pin(RecordBatchStreamWrapper::new(
-            self.stream_ctx.input.mapper.output_schema(),
-            Box::pin(stream),
-        ));
-
-        Ok(stream)
+        Ok(Box::pin(stream))
     }
 
     /// Takes the receiver for the partition.
@@ -201,6 +195,26 @@ impl SeriesScan {
         let chained_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
         Ok(Box::pin(chained_stream))
     }
+
+    /// Scans [`Batch`]es in all partitions one by one.
+    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
+        let metrics_set = ExecutionPlanMetricsSet::new();
+
+        let streams = (0..self.properties.partitions.len())
+            .map(|partition| {
+                let metrics = new_partition_metrics(
+                    &self.stream_ctx,
+                    &metrics_set,
+                    partition,
+                    &self.metrics_list,
+                );
+
+                self.scan_batch_in_partition(partition, metrics, &metrics_set)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Box::pin(futures::stream::iter(streams).flatten()))
+    }
 }
 
 fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -232,6 +246,7 @@ impl RegionScanner for SeriesScan {
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         self.scan_partition_impl(metrics_set, partition)
+            .map_err(BoxedError::new)
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
@@ -393,8 +408,8 @@ impl SeriesDistributor {
 
 /// Batches of the same series.
 #[derive(Default)]
-struct SeriesBatch {
-    batches: SmallVec<[Batch; 4]>,
+pub struct SeriesBatch {
+    pub batches: SmallVec<[Batch; 4]>,
 }
 
 impl SeriesBatch {
diff --git a/src/mito2/src/read/stream.rs b/src/mito2/src/read/stream.rs
new file mode 100644
index 0000000000..7bde4be6f6
--- /dev/null
+++ b/src/mito2/src/read/stream.rs
@@ -0,0 +1,120 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::time::Instant;
+
+use common_error::ext::BoxedError;
+use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
+use common_recordbatch::{DfRecordBatch, RecordBatch};
+use datatypes::compute;
+use futures::stream::BoxStream;
+use futures::{Stream, StreamExt};
+use snafu::ResultExt;
+
+use crate::cache::CacheStrategy;
+use crate::error::Result;
+use crate::read::projection::ProjectionMapper;
+use crate::read::scan_util::PartitionMetrics;
+use crate::read::series_scan::SeriesBatch;
+use crate::read::Batch;
+
+/// All kinds of [`Batch`]es a scanner can produce.
+pub enum ScanBatch {
+    Normal(Batch),
+    Series(SeriesBatch),
+}
+
+pub type ScanBatchStream = BoxStream<'static, Result<ScanBatch>>;
+
+/// A stream that converts [`ScanBatch`]es into [`RecordBatch`]es.
+pub(crate) struct ConvertBatchStream {
+    inner: ScanBatchStream,
+    projection_mapper: Arc<ProjectionMapper>,
+    cache_strategy: CacheStrategy,
+    partition_metrics: PartitionMetrics,
+    buffer: Vec<DfRecordBatch>,
+}
+
+impl ConvertBatchStream {
+    pub(crate) fn new(
+        inner: ScanBatchStream,
+        projection_mapper: Arc<ProjectionMapper>,
+        cache_strategy: CacheStrategy,
+        partition_metrics: PartitionMetrics,
+    ) -> Self {
+        Self {
+            inner,
+            projection_mapper,
+            cache_strategy,
+            partition_metrics,
+            buffer: Vec::new(),
+        }
+    }
+
+    fn convert(&mut self, batch: ScanBatch) -> common_recordbatch::error::Result<RecordBatch> {
+        match batch {
+            ScanBatch::Normal(batch) => {
+                if batch.is_empty() {
+                    Ok(self.projection_mapper.empty_record_batch())
+                } else {
+                    self.projection_mapper.convert(&batch, &self.cache_strategy)
+                }
+            }
+            ScanBatch::Series(series) => {
+                self.buffer.clear();
+                self.buffer.reserve(series.batches.len());
+
+                for batch in series.batches {
+                    let record_batch = self
+                        .projection_mapper
+                        .convert(&batch, &self.cache_strategy)?;
+                    self.buffer.push(record_batch.into_df_record_batch());
+                }
+
+                let output_schema = self.projection_mapper.output_schema();
+                let record_batch =
+                    compute::concat_batches(output_schema.arrow_schema(), &self.buffer)
+                        .context(ArrowComputeSnafu)?;
+
+                RecordBatch::try_from_df_record_batch(output_schema, record_batch)
+            }
+        }
+    }
+}
+
+impl Stream for ConvertBatchStream {
+    type Item = common_recordbatch::error::Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let batch = futures::ready!(self.inner.poll_next_unpin(cx));
+        let Some(batch) = batch else {
+            return Poll::Ready(None);
+        };
+
+        let record_batch = match batch {
+            Ok(batch) => {
+                let start = Instant::now();
+                let record_batch = self.convert(batch);
+                self.partition_metrics
+                    .inc_convert_batch_cost(start.elapsed());
+                record_batch
+            }
+            Err(e) => Err(BoxedError::new(e)).context(ExternalSnafu),
+        };
+
+        Poll::Ready(Some(record_batch))
+    }
+}
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 3724075998..eb58c79562 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -20,13 +20,12 @@ use std::time::Instant;
 
 use async_stream::{stream, try_stream};
 use common_error::ext::BoxedError;
-use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::{Stream, StreamExt};
-use snafu::ResultExt;
+use snafu::ensure;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
 
@@ -36,6 +35,7 @@ use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
     scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
 };
+use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
 use crate::read::{Batch, ScannerMetrics};
 
 /// Scans a region without providing any output ordering guarantee.
@@ -124,21 +124,25 @@ impl UnorderedScan {
         }
     }
 
-    fn scan_partition_impl(
-        &self,
-        metrics_set: &ExecutionPlanMetricsSet,
-        partition: usize,
-    ) -> Result<SendableRecordBatchStream, BoxedError> {
-        if partition >= self.properties.partitions.len() {
-            return Err(BoxedError::new(
-                PartitionOutOfRangeSnafu {
-                    given: partition,
-                    all: self.properties.partitions.len(),
-                }
-                .build(),
-            ));
-        }
+    /// Scans [`Batch`]es in all partitions one by one.
+    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
+        let metrics_set = ExecutionPlanMetricsSet::new();
+
+        let streams = (0..self.properties.partitions.len())
+            .map(|partition| {
+                let metrics = self.partition_metrics(partition, &metrics_set);
+                self.scan_batch_in_partition(partition, metrics)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(Box::pin(futures::stream::iter(streams).flatten()))
+    }
+
+    fn partition_metrics(
+        &self,
+        partition: usize,
+        metrics_set: &ExecutionPlanMetricsSet,
+    ) -> PartitionMetrics {
         let part_metrics = PartitionMetrics::new(
             self.stream_ctx.input.mapper.metadata().region_id,
             partition,
@@ -147,6 +151,45 @@ impl UnorderedScan {
             metrics_set,
         );
         self.metrics_list.set(partition, part_metrics.clone());
+        part_metrics
+    }
+
+    fn scan_partition_impl(
+        &self,
+        metrics_set: &ExecutionPlanMetricsSet,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream> {
+        let metrics = self.partition_metrics(partition, metrics_set);
+
+        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
+
+        let input = &self.stream_ctx.input;
+        let record_batch_stream = ConvertBatchStream::new(
+            batch_stream,
+            input.mapper.clone(),
+            input.cache_strategy.clone(),
+            metrics,
+        );
+
+        Ok(Box::pin(RecordBatchStreamWrapper::new(
+            input.mapper.output_schema(),
+            Box::pin(record_batch_stream),
+        )))
+    }
+
+    fn scan_batch_in_partition(
+        &self,
+        partition: usize,
+        part_metrics: PartitionMetrics,
+    ) -> Result<ScanBatchStream> {
+        ensure!(
+            partition < self.properties.partitions.len(),
+            PartitionOutOfRangeSnafu {
+                given: partition,
+                all: self.properties.partitions.len(),
+            }
+        );
+
         let stream_ctx = self.stream_ctx.clone();
         let part_ranges = self.properties.partitions[partition].clone();
         let distinguish_range = self.properties.distinguish_partition_range;
@@ -154,7 +197,6 @@ impl UnorderedScan {
 
         let stream = try_stream! {
             part_metrics.on_first_poll();
-            let cache = &stream_ctx.input.cache_strategy;
 
             let range_builder_list = Arc::new(RangeBuilderList::new(
                 stream_ctx.input.num_memtables(),
                 stream_ctx.input.num_files(),
@@ -175,7 +217,7 @@ impl UnorderedScan {
                     range_builder_list.clone(),
                 );
                 for await batch in stream {
-                    let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
+                    let batch = batch?;
                     metrics.scan_cost += fetch_start.elapsed();
                     metrics.num_batches += 1;
                     metrics.num_rows += batch.num_rows();
@@ -194,11 +236,8 @@ impl UnorderedScan {
                         &batch,
                     );
 
-                    let convert_start = Instant::now();
-                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
-                    metrics.convert_cost += convert_start.elapsed();
                     let yield_start = Instant::now();
-                    yield record_batch;
+                    yield ScanBatch::Normal(batch);
                     metrics.yield_cost += yield_start.elapsed();
 
                     fetch_start = Instant::now();
@@ -208,22 +247,15 @@ impl UnorderedScan {
                 // The query engine can use this to optimize some queries.
                 if distinguish_range {
                     let yield_start = Instant::now();
-                    yield stream_ctx.input.mapper.empty_record_batch();
+                    yield ScanBatch::Normal(Batch::empty());
                     metrics.yield_cost += yield_start.elapsed();
                 }
 
                 metrics.scan_cost += fetch_start.elapsed();
                 part_metrics.merge_metrics(&metrics);
             }
-
-            part_metrics.on_finish();
         };
-
-        let stream = Box::pin(RecordBatchStreamWrapper::new(
-            self.stream_ctx.input.mapper.output_schema(),
-            Box::pin(stream),
-        ));
-
-        Ok(stream)
+        Ok(Box::pin(stream))
     }
 }
 
@@ -251,6 +283,7 @@ impl RegionScanner for UnorderedScan {
         partition: usize,
     ) -> Result<SendableRecordBatchStream, BoxedError> {
         self.scan_partition_impl(metrics_set, partition)
+            .map_err(BoxedError::new)
     }
 
     fn has_predicate(&self) -> bool {
diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs
index aa9740e395..99498bf316 100644
--- a/src/servers/src/grpc/flight/stream.rs
+++ b/src/servers/src/grpc/flight/stream.rs
@@ -16,11 +16,12 @@ use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use arrow_flight::FlightData;
+use common_error::ext::ErrorExt;
 use common_grpc::flight::{FlightEncoder, FlightMessage};
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::tracing::{info_span, Instrument};
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
-use common_telemetry::warn;
+use common_telemetry::{error, warn};
 use futures::channel::mpsc;
 use futures::channel::mpsc::Sender;
 use futures::{SinkExt, Stream, StreamExt};
@@ -90,6 +91,10 @@ impl FlightRecordBatchStream {
                 }
             }
             Err(e) => {
+                if e.status_code().should_log_error() {
+                    error!("{e:?}");
+                }
+
                 let e = Err(e).context(error::CollectRecordbatchSnafu);
                 if let Err(e) = tx.send(e.map_err(|x| x.into())).await {
                     warn!(e; "stop sending Flight data");
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index 4e1292cc3d..8e7f5c1c53 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -67,14 +67,34 @@ pub struct ScanRequest {
 
 impl Display for ScanRequest {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ScanRequest {{")?;
+        enum Delimiter {
+            None,
+            Init,
+        }
+
+        impl Delimiter {
+            fn as_str(&mut self) -> &str {
+                match self {
+                    Delimiter::None => {
+                        *self = Delimiter::Init;
+                        ""
+                    }
+                    Delimiter::Init => ", ",
+                }
+            }
+        }
+
+        let mut delimiter = Delimiter::None;
+
+        write!(f, "ScanRequest {{ ")?;
         if let Some(projection) = &self.projection {
-            write!(f, "projection: {:?},", projection)?;
+            write!(f, "{}projection: {:?}", delimiter.as_str(), projection)?;
         }
         if !self.filters.is_empty() {
             write!(
                 f,
-                ", filters: [{}]",
+                "{}filters: [{}]",
+                delimiter.as_str(),
                 self.filters
                     .iter()
                     .map(|f| f.to_string())
@@ -83,23 +103,90 @@ impl Display for ScanRequest {
             )?;
         }
         if let Some(output_ordering) = &self.output_ordering {
-            write!(f, ", output_ordering: {:?}", output_ordering)?;
+            write!(
+                f,
+                "{}output_ordering: {:?}",
+                delimiter.as_str(),
+                output_ordering
+            )?;
         }
         if let Some(limit) = &self.limit {
-            write!(f, ", limit: {}", limit)?;
+            write!(f, "{}limit: {}", delimiter.as_str(), limit)?;
        }
         if let Some(series_row_selector) = &self.series_row_selector {
-            write!(f, ", series_row_selector: {}", series_row_selector)?;
+            write!(
+                f,
+                "{}series_row_selector: {}",
+                delimiter.as_str(),
+                series_row_selector
+            )?;
         }
         if let Some(sequence) = &self.sequence {
-            write!(f, ", sequence: {}", sequence)?;
+            write!(f, "{}sequence: {}", delimiter.as_str(), sequence)?;
         }
         if let Some(sst_min_sequence) = &self.sst_min_sequence {
-            write!(f, ", sst_min_sequence: {}", sst_min_sequence)?;
+            write!(
+                f,
+                "{}sst_min_sequence: {}",
+                delimiter.as_str(),
+                sst_min_sequence
+            )?;
         }
         if let Some(distribution) = &self.distribution {
-            write!(f, ", distribution: {}", distribution)?;
+            write!(f, "{}distribution: {}", delimiter.as_str(), distribution)?;
         }
-        write!(f, "}}")
+        write!(f, " }}")
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use datafusion_expr::{binary_expr, col, lit, Operator};
+
+    use super::*;
+
+    #[test]
+    fn test_display_scan_request() {
+        let request = ScanRequest {
+            ..Default::default()
+        };
+        assert_eq!(request.to_string(), "ScanRequest {  }");
+
+        let request = ScanRequest {
+            projection: Some(vec![1, 2]),
+            filters: vec![
+                binary_expr(col("i"), Operator::Gt, lit(1)),
+                binary_expr(col("s"), Operator::Eq, lit("x")),
+            ],
+            limit: Some(10),
+            ..Default::default()
+        };
+        assert_eq!(
+            request.to_string(),
+            r#"ScanRequest { projection: [1, 2], filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
+        );
+
+        let request = ScanRequest {
+            filters: vec![
+                binary_expr(col("i"), Operator::Gt, lit(1)),
+                binary_expr(col("s"), Operator::Eq, lit("x")),
+            ],
+            limit: Some(10),
+            ..Default::default()
+        };
+        assert_eq!(
+            request.to_string(),
+            r#"ScanRequest { filters: [i > Int32(1), s = Utf8("x")], limit: 10 }"#
+        );
+
+        let request = ScanRequest {
+            projection: Some(vec![1, 2]),
+            limit: Some(10),
+            ..Default::default()
+        };
+        assert_eq!(
+            request.to_string(),
+            "ScanRequest { projection: [1, 2], limit: 10 }"
+        );
+    }
+}
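Editor note (not part of the patch): the `Delimiter` state machine fixes the old `Display` output, which wrote a trailing comma after `projection` and a leading ", " before `filters` even when `filters` was the first field present. The same first-field-skips-the-separator idea, reduced to a standalone program around a hypothetical `join_fields` helper:

    // Hypothetical helper mirroring the Delimiter logic: the separator is
    // emitted only between present fields, never before the first one.
    fn join_fields(fields: &[Option<(&str, String)>]) -> String {
        let mut out = String::from("ScanRequest { ");
        let mut first = true;
        for (name, value) in fields.iter().flatten() {
            if !first {
                out.push_str(", ");
            }
            first = false;
            out.push_str(name);
            out.push_str(": ");
            out.push_str(value);
        }
        out.push_str(" }");
        out
    }

    fn main() {
        let fields = [
            Some(("projection", "[1, 2]".to_string())),
            None, // absent field: no stray comma
            Some(("limit", "10".to_string())),
        ];
        assert_eq!(
            join_fields(&fields),
            "ScanRequest { projection: [1, 2], limit: 10 }"
        );
    }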