diff --git a/Cargo.lock b/Cargo.lock index 44d4980e1d..d77a2da3ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10828,6 +10828,7 @@ dependencies = [ "common-error", "common-macro", "common-recordbatch", + "common-time", "common-wal", "datafusion-expr 38.0.0", "datafusion-physical-plan 38.0.0", diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index 166a0922e5..271310e80f 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -105,6 +105,10 @@ impl BoxedError { inner: Box::new(err), } } + + pub fn into_inner(self) -> Box { + self.inner + } } impl std::fmt::Debug for BoxedError { diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 745c2280de..4f9136333c 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -20,6 +20,7 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use datafusion_common::ScalarValue; use datatypes::prelude::ConcreteDataType; +use datatypes::schema::SchemaRef; use snafu::{Location, Snafu}; pub type Result = std::result::Result; @@ -138,12 +139,28 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Cannot construct an empty stream"))] + EmptyStream { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Schema not match, left: {:?}, right: {:?}", left, right))] + SchemaNotMatch { + left: SchemaRef, + right: SchemaRef, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - Error::NewDfRecordBatch { .. } => StatusCode::InvalidArguments, + Error::NewDfRecordBatch { .. } + | Error::EmptyStream { .. } + | Error::SchemaNotMatch { .. } => StatusCode::InvalidArguments, Error::DataTypes { .. } | Error::CreateRecordBatches { .. } diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index 723a0f9dca..8732a47e03 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -12,10 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use futures::TryStreamExt; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; -use crate::error::Result; -use crate::{RecordBatch, RecordBatches, SendableRecordBatchStream}; +use arc_swap::ArcSwapOption; +use datatypes::schema::SchemaRef; +use futures::{Stream, StreamExt, TryStreamExt}; +use snafu::ensure; + +use crate::adapter::RecordBatchMetrics; +use crate::error::{EmptyStreamSnafu, Result, SchemaNotMatchSnafu}; +use crate::{ + OrderOption, RecordBatch, RecordBatchStream, RecordBatches, SendableRecordBatchStream, +}; /// Collect all the items from the stream into a vector of [`RecordBatch`]. pub async fn collect(stream: SendableRecordBatchStream) -> Result> { @@ -29,6 +39,91 @@ pub async fn collect_batches(stream: SendableRecordBatchStream) -> Result, + curr_index: usize, + schema: SchemaRef, + metrics: Arc>, +} + +impl ChainedRecordBatchStream { + pub fn new(inputs: Vec) -> Result { + // check length + ensure!(!inputs.is_empty(), EmptyStreamSnafu); + + // check schema + let first_schema = inputs[0].schema(); + for input in inputs.iter().skip(1) { + let schema = input.schema(); + ensure!( + first_schema == schema, + SchemaNotMatchSnafu { + left: first_schema, + right: schema + } + ); + } + + Ok(Self { + inputs, + curr_index: 0, + schema: first_schema, + metrics: Default::default(), + }) + } + + fn sequence_poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll>> { + if self.curr_index >= self.inputs.len() { + return Poll::Ready(None); + } + + let curr_index = self.curr_index; + match self.inputs[curr_index].poll_next_unpin(ctx) { + Poll::Ready(Some(Ok(batch))) => Poll::Ready(Some(Ok(batch))), + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => { + self.curr_index += 1; + if self.curr_index < self.inputs.len() { + self.sequence_poll(ctx) + } else { + Poll::Ready(None) + } + } + Poll::Pending => Poll::Pending, + } + } +} + +impl RecordBatchStream for ChainedRecordBatchStream { + fn name(&self) -> &str { + "ChainedRecordBatchStream" + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + None + } + + fn metrics(&self) -> Option { + self.metrics.load().as_ref().map(|m| m.as_ref().clone()) + } +} + +impl Stream for ChainedRecordBatchStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + self.sequence_poll(ctx) + } +} + #[cfg(test)] mod tests { use std::pin::Pin; diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index 4ecfba9019..22e5f96e7b 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -90,7 +90,7 @@ impl RegionEngine for FileRegionEngine { request: ScanRequest, ) -> Result { let stream = self.handle_query(region_id, request).await?; - let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let scanner = Box::new(SinglePartitionScanner::new(stream)); Ok(scanner) } diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index f12e29aed0..f4e386a053 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -32,14 +32,11 @@ use api::region::RegionResponse; use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; -use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; -use store_api::region_engine::{ - RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner, -}; +use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -174,9 +171,7 @@ impl RegionEngine for MetricEngine { region_id: RegionId, request: ScanRequest, ) -> Result { - let stream = self.handle_query(region_id, request).await?; - let scanner = Arc::new(SinglePartitionScanner::new(stream)); - Ok(scanner) + self.handle_query(region_id, request).await } /// Retrieves region's metadata. @@ -269,7 +264,7 @@ impl MetricEngine { &self, region_id: RegionId, request: ScanRequest, - ) -> Result { + ) -> Result { self.inner .read_region(region_id, request) .await @@ -277,6 +272,17 @@ impl MetricEngine { } } +#[cfg(test)] +impl MetricEngine { + pub async fn scan_to_stream( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + self.inner.scan_to_stream(region_id, request).await + } +} + struct MetricEngineInner { mito: MitoEngine, metadata_region: MetadataRegion, diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 9534a69f7d..1768d141cb 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -235,7 +235,7 @@ mod tests { let request = ScanRequest::default(); let stream = env .metric() - .handle_query(physical_region_id, request) + .scan_to_stream(physical_region_id, request) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -255,7 +255,7 @@ mod tests { let request = ScanRequest::default(); let stream = env .metric() - .handle_query(logical_region_id, request) + .scan_to_stream(logical_region_id, request) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 0b8f680c18..af01e1bb9e 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -15,13 +15,12 @@ use std::sync::Arc; use api::v1::SemanticType; -use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr::{self, Expr}; use snafu::{OptionExt, ResultExt}; use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef}; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; -use store_api::region_engine::RegionEngine; +use store_api::region_engine::{RegionEngine, RegionScannerRef}; use store_api::storage::{RegionId, ScanRequest}; use crate::engine::MetricEngineInner; @@ -37,7 +36,7 @@ impl MetricEngineInner { &self, region_id: RegionId, request: ScanRequest, - ) -> Result { + ) -> Result { let is_reading_physical_region = self.is_physical_region(region_id); if is_reading_physical_region { @@ -55,13 +54,13 @@ impl MetricEngineInner { &self, region_id: RegionId, request: ScanRequest, - ) -> Result { + ) -> Result { let _timer = MITO_OPERATION_ELAPSED .with_label_values(&["read_physical"]) .start_timer(); self.mito - .scan_to_stream(region_id, request) + .handle_query(region_id, request) .await .context(MitoReadOperationSnafu) } @@ -70,7 +69,7 @@ impl MetricEngineInner { &self, logical_region_id: RegionId, request: ScanRequest, - ) -> Result { + ) -> Result { let _timer = MITO_OPERATION_ELAPSED .with_label_values(&["read"]) .start_timer(); @@ -81,7 +80,7 @@ impl MetricEngineInner { .transform_request(physical_region_id, logical_region_id, request) .await?; self.mito - .scan_to_stream(data_region_id, request) + .handle_query(data_region_id, request) .await .context(MitoReadOperationSnafu) } @@ -253,6 +252,37 @@ impl MetricEngineInner { } } +#[cfg(test)] +impl MetricEngineInner { + pub async fn scan_to_stream( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + let is_reading_physical_region = self.is_physical_region(region_id); + + if is_reading_physical_region { + self.mito + .scan_to_stream(region_id, request) + .await + .map_err(common_error::ext::BoxedError::new) + } else { + let physical_region_id = self + .get_physical_region_id(region_id) + .await + .map_err(common_error::ext::BoxedError::new)?; + let request = self + .transform_request(physical_region_id, region_id, request) + .await + .map_err(common_error::ext::BoxedError::new)?; + self.mito + .scan_to_stream(physical_region_id, request) + .await + .map_err(common_error::ext::BoxedError::new) + } + } +} + #[cfg(test)] mod test { use store_api::region_request::RegionRequest; diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 546b2c21e1..86324ec9f0 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -135,6 +135,9 @@ impl MitoEngine { } /// Handle substrait query and return a stream of record batches + /// + /// Notice that the output stream's ordering is not guranateed. If order + /// matter, please use [`scanner`] to build a [`Scanner`] to consume. #[tracing::instrument(skip_all)] pub async fn scan_to_stream( &self, diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index c9320f75e0..7b31f7cfc0 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -697,6 +697,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Partition {} out of range, {} in total", given, all))] + PartitionOutOfRange { + given: usize, + all: usize, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to iter data part"))] ReadDataPart { #[snafu(source)] @@ -796,7 +804,8 @@ impl ErrorExt for Error { | ColumnNotFound { .. } | InvalidMetadata { .. } | InvalidRegionOptions { .. } - | InvalidWalReadRequest { .. } => StatusCode::InvalidArguments, + | InvalidWalReadRequest { .. } + | PartitionOutOfRange { .. } => StatusCode::InvalidArguments, InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated, diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d4200fcb73..4fe783ce9e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -68,8 +68,8 @@ impl Scanner { /// Returns a [RegionScanner] to scan the region. pub(crate) async fn region_scanner(self) -> Result { match self { - Scanner::Seq(seq_scan) => Ok(Arc::new(seq_scan)), - Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)), + Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)), + Scanner::Unordered(unordered_scan) => Ok(Box::new(unordered_scan)), } } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 532d051fa2..17151b624d 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -21,18 +21,19 @@ use std::time::Instant; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; +use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::debug; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use smallvec::smallvec; use snafu::ResultExt; -use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties}; +use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; use store_api::storage::ColumnId; use table::predicate::Predicate; use tokio::sync::Semaphore; -use crate::error::Result; +use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::memtable::MemtableRef; use crate::read::dedup::{DedupReader, LastRow}; use crate::read::merge::MergeReaderBuilder; @@ -46,7 +47,8 @@ use crate::sst::parquet::reader::ReaderMetrics; /// Scans a region and returns rows in a sorted sequence. /// -/// The output order is always `order by primary keys, time index`. +/// The output order is always `order by primary keys, time index` inside every +/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s. pub struct SeqScan { /// Properties of the scanner. properties: ScannerProperties, @@ -61,7 +63,7 @@ impl SeqScan { /// Creates a new [SeqScan]. pub(crate) fn new(input: ScanInput) -> Self { let parallelism = input.parallelism.parallelism.max(1); - let properties = ScannerProperties::new(ScannerPartitioning::Unknown(parallelism)); + let properties = ScannerProperties::new_with_partitions(parallelism); let stream_ctx = Arc::new(StreamContext::new(input)); Self { @@ -72,8 +74,16 @@ impl SeqScan { } /// Builds a stream for the query. + /// + /// The returned stream is not partitioned and will contains all the data. If want + /// partitioned scan, use [`RegionScanner::scan_partition`]. pub fn build_stream(&self) -> Result { - self.scan_partition_opt(None) + let streams = (0..self.properties.partitions.len()) + .map(|partition: usize| self.scan_partition(partition)) + .collect::, _>>()?; + + let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?; + Ok(Box::pin(aggr_stream)) } /// Builds a [BoxedBatchReader] from sequential scan for compaction. @@ -83,7 +93,7 @@ impl SeqScan { ..Default::default() }; let maybe_reader = - Self::build_merge_reader(&self.stream_ctx, None, self.semaphore.clone(), &mut metrics) + Self::build_all_merge_reader(&self.stream_ctx, self.semaphore.clone(), &mut metrics) .await?; // Safety: `build_merge_reader()` always returns a reader if partition is None. let reader = maybe_reader.unwrap(); @@ -137,32 +147,56 @@ impl SeqScan { Ok(()) } - /// Builds a merge reader. - /// If `partition` is None, reads all partitions. - /// If the `partition` is out of bound, returns None. - async fn build_merge_reader( + /// Builds a merge reader that reads all data. + async fn build_all_merge_reader( stream_ctx: &StreamContext, - partition: Option, semaphore: Arc, metrics: &mut ScannerMetrics, ) -> Result> { + // initialize parts list let mut parts = stream_ctx.parts.lock().await; - maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + let parts_len = parts.len(); - let mut sources = Vec::new(); - if let Some(index) = partition { - let Some(part) = parts.get_part(index) else { + let mut sources = Vec::with_capacity(parts_len); + for id in 0..parts_len { + let Some(part) = parts.get_part(id) else { return Ok(None); }; Self::build_part_sources(part, &mut sources)?; - } else { - // Safety: We initialized parts before. - for part in parts.0.as_ref().unwrap() { - Self::build_part_sources(part, &mut sources)?; - } } + Self::build_reader_from_sources(stream_ctx, sources, semaphore).await + } + + /// Builds a merge reader that reads data from one [`PartitionRange`]. + /// + /// If the `range_id` is out of bound, returns None. + async fn build_merge_reader( + stream_ctx: &StreamContext, + range_id: usize, + semaphore: Arc, + metrics: &mut ScannerMetrics, + ) -> Result> { + let mut parts = stream_ctx.parts.lock().await; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + + let mut sources = Vec::new(); + let Some(part) = parts.get_part(range_id) else { + return Ok(None); + }; + + Self::build_part_sources(part, &mut sources)?; + + Self::build_reader_from_sources(stream_ctx, sources, semaphore).await + } + + async fn build_reader_from_sources( + stream_ctx: &StreamContext, + mut sources: Vec, + semaphore: Arc, + ) -> Result> { if stream_ctx.input.parallelism.parallelism > 1 { // Read sources in parallel. We always spawn a task so we can control the parallelism // by the semaphore. @@ -187,52 +221,71 @@ impl SeqScan { } } - /// Scans one partition or all partitions. - fn scan_partition_opt( + /// Scans the given partition when the part list is set properly. + /// Otherwise the returned stream might not contains any data. + // TODO: refactor out `uncached_scan_part_impl`. + #[allow(dead_code)] + fn scan_partition_impl( &self, - partition: Option, + partition: usize, ) -> Result { + if partition >= self.properties.partitions.len() { + return Err(BoxedError::new( + PartitionOutOfRangeSnafu { + given: partition, + all: self.properties.partitions.len(), + } + .build(), + )); + } + let mut metrics = ScannerMetrics { prepare_scan_cost: self.stream_ctx.prepare_scan_cost, ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); let semaphore = self.semaphore.clone(); + let partition_ranges = self.properties.partitions[partition].clone(); let stream = try_stream! { - let maybe_reader = Self::build_merge_reader(&stream_ctx, partition, semaphore, &mut metrics) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let Some(mut reader) = maybe_reader else { - return; - }; - let cache = stream_ctx.input.cache_manager.as_deref(); - let mut fetch_start = Instant::now(); - while let Some(batch) = reader - .next_batch() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? - { + for partition_range in partition_ranges { + let maybe_reader = + Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let Some(mut reader) = maybe_reader else { + return; + }; + let cache = stream_ctx.input.cache_manager.as_deref(); + let mut fetch_start = Instant::now(); + while let Some(batch) = reader + .next_batch() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + { + metrics.scan_cost += fetch_start.elapsed(); + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); + + let convert_start = Instant::now(); + let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; + metrics.convert_cost += convert_start.elapsed(); + yield record_batch; + + fetch_start = Instant::now(); + } metrics.scan_cost += fetch_start.elapsed(); - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); + metrics.total_cost = stream_ctx.query_start.elapsed(); + metrics.observe_metrics_on_finish(); - let convert_start = Instant::now(); - let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; - metrics.convert_cost += convert_start.elapsed(); - yield record_batch; - - fetch_start = Instant::now(); + debug!( + "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}", + stream_ctx.input.mapper.metadata().region_id, + partition, + metrics, + ); } - metrics.scan_cost += fetch_start.elapsed(); - metrics.total_cost = stream_ctx.query_start.elapsed(); - metrics.observe_metrics_on_finish(); - - debug!( - "Seq scan finished, region_id: {:?}, partition: {:?}, metrics: {:?}", - stream_ctx.input.mapper.metadata().region_id, partition, metrics, - ); }; let stream = Box::pin(RecordBatchStreamWrapper::new( @@ -242,6 +295,116 @@ impl SeqScan { Ok(stream) } + + /// Scans the given partition when the part list is not set. + /// This method will do a lazy initialize of part list and + /// ignores the partition settings in `properties`. + fn uncached_scan_part_impl( + &self, + partition: usize, + ) -> Result { + let num_partitions = self.properties.partitions.len(); + if partition >= num_partitions { + return Err(BoxedError::new( + PartitionOutOfRangeSnafu { + given: partition, + all: self.properties.partitions.len(), + } + .build(), + )); + } + let mut metrics = ScannerMetrics { + prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + ..Default::default() + }; + let stream_ctx = self.stream_ctx.clone(); + let semaphore = self.semaphore.clone(); + + // build stream + let stream = try_stream! { + // init parts + let parts_len = { + let mut parts = stream_ctx.parts.lock().await; + Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics).await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + parts.len() + }; + + for id in (0..parts_len).skip(partition).step_by(num_partitions) { + let maybe_reader = Self::build_merge_reader( + &stream_ctx, + id, + semaphore.clone(), + &mut metrics, + ) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let Some(mut reader) = maybe_reader else { + return; + }; + let cache = stream_ctx.input.cache_manager.as_deref(); + let mut fetch_start = Instant::now(); + while let Some(batch) = reader + .next_batch() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? + { + metrics.scan_cost += fetch_start.elapsed(); + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); + + let convert_start = Instant::now(); + let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; + metrics.convert_cost += convert_start.elapsed(); + yield record_batch; + + fetch_start = Instant::now(); + } + metrics.scan_cost += fetch_start.elapsed(); + metrics.total_cost = stream_ctx.query_start.elapsed(); + metrics.observe_metrics_on_finish(); + + debug!( + "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}", + stream_ctx.input.mapper.metadata().region_id, + partition, + metrics, + ); + } + }; + + let stream = Box::pin(RecordBatchStreamWrapper::new( + self.stream_ctx.input.mapper.output_schema(), + Box::pin(stream), + )); + + Ok(stream) + } + + /// Initializes parts if they are not built yet. + async fn maybe_init_parts( + input: &ScanInput, + part_list: &mut ScanPartList, + metrics: &mut ScannerMetrics, + ) -> Result<()> { + if part_list.is_none() { + let now = Instant::now(); + let mut distributor = SeqDistributor::default(); + input.prune_file_ranges(&mut distributor).await?; + distributor.append_mem_ranges( + &input.memtables, + Some(input.mapper.column_ids()), + input.predicate.clone(), + ); + part_list.set_parts(distributor.build_parts(input.parallelism.parallelism)); + + metrics.observe_init_part(now.elapsed()); + } + Ok(()) + } } impl RegionScanner for SeqScan { @@ -254,7 +417,12 @@ impl RegionScanner for SeqScan { } fn scan_partition(&self, partition: usize) -> Result { - self.scan_partition_opt(Some(partition)) + self.uncached_scan_part_impl(partition) + } + + fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + self.properties.partitions = ranges; + Ok(()) } } @@ -282,28 +450,6 @@ impl SeqScan { } } -/// Initializes parts if they are not built yet. -async fn maybe_init_parts( - input: &ScanInput, - part_list: &mut ScanPartList, - metrics: &mut ScannerMetrics, -) -> Result<()> { - if part_list.is_none() { - let now = Instant::now(); - let mut distributor = SeqDistributor::default(); - input.prune_file_ranges(&mut distributor).await?; - distributor.append_mem_ranges( - &input.memtables, - Some(input.mapper.column_ids()), - input.predicate.clone(), - ); - part_list.set_parts(distributor.build_parts(input.parallelism.parallelism)); - - metrics.observe_init_part(now.elapsed()); - } - Ok(()) -} - /// Builds [ScanPart]s that preserves order. #[derive(Default)] pub(crate) struct SeqDistributor { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 5950094b0d..62a173d86c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -28,7 +28,7 @@ use datatypes::schema::SchemaRef; use futures::StreamExt; use smallvec::smallvec; use snafu::ResultExt; -use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties}; +use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -58,9 +58,8 @@ pub struct UnorderedScan { impl UnorderedScan { /// Creates a new [UnorderedScan]. pub(crate) fn new(input: ScanInput) -> Self { - let properties = ScannerProperties::new(ScannerPartitioning::Unknown( - input.parallelism.parallelism.max(1), - )); + let properties = + ScannerProperties::new_with_partitions(input.parallelism.parallelism.max(1)); let stream_ctx = Arc::new(StreamContext::new(input)); Self { @@ -71,7 +70,7 @@ impl UnorderedScan { /// Scans the region and returns a stream. pub(crate) async fn build_stream(&self) -> Result { - let part_num = self.properties.partitioning().num_partitions(); + let part_num = self.properties.num_partitions(); let streams = (0..part_num) .map(|i| self.scan_partition(i)) .collect::, BoxedError>>()?; @@ -135,6 +134,11 @@ impl RegionScanner for UnorderedScan { self.stream_ctx.input.mapper.output_schema() } + fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + self.properties = ScannerProperties::new(ranges); + Ok(()) + } + fn scan_partition(&self, partition: usize) -> Result { let mut metrics = ScannerMetrics { prepare_scan_cost: self.stream_ctx.prepare_scan_cost, diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 4ac5a7c10a..0d4553c944 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -15,6 +15,7 @@ #![feature(let_chains)] #![feature(int_roundings)] #![feature(option_get_or_insert_default)] +#![feature(trait_upcasting)] mod analyze; pub mod dataframe; diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs index 1cb54c7126..15b63d5784 100644 --- a/src/query/src/optimizer.rs +++ b/src/query/src/optimizer.rs @@ -14,6 +14,7 @@ pub mod count_wildcard; pub mod order_hint; +pub mod parallelize_scan; pub mod remove_duplicate; pub mod string_normalization; #[cfg(test)] diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs new file mode 100644 index 0000000000..fa20ca6d38 --- /dev/null +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -0,0 +1,97 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_telemetry::debug; +use datafusion::config::ConfigOptions; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{DataFusionError, Result}; +use store_api::region_engine::PartitionRange; +use table::table::scan::RegionScanExec; + +pub struct ParallelizeScan; + +impl PhysicalOptimizerRule for ParallelizeScan { + fn optimize( + &self, + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + Self::do_optimize(plan, config) + } + + fn name(&self) -> &str { + "parallelize_scan" + } + + fn schema_check(&self) -> bool { + true + } +} + +impl ParallelizeScan { + fn do_optimize( + plan: Arc, + config: &ConfigOptions, + ) -> Result> { + let result = plan + .transform_down(|plan| { + if let Some(region_scan_exec) = plan.as_any().downcast_ref::() { + let ranges = region_scan_exec.get_partition_ranges(); + let total_range_num = ranges.len(); + let expected_partition_num = config.execution.target_partitions; + + // assign ranges to each partition + let partition_ranges = + Self::assign_partition_range(ranges, expected_partition_num); + debug!( + "Assign {total_range_num} ranges to {expected_partition_num} partitions" + ); + + // update the partition ranges + region_scan_exec + .set_partitions(partition_ranges) + .map_err(|e| DataFusionError::External(e.into_inner()))?; + } + + // The plan might be modified, but it's modified in-place so we always return + // Transformed::no(plan) to indicate there is no "new child" + Ok(Transformed::no(plan)) + })? + .data; + + Ok(result) + } + + /// Distribute [`PartitionRange`]s to each partition. + /// + /// Currently we use a simple round-robin strategy to assign ranges to partitions. + fn assign_partition_range( + ranges: Vec, + expected_partition_num: usize, + ) -> Vec> { + let mut partition_ranges = vec![vec![]; expected_partition_num]; + + // round-robin assignment + for (i, range) in ranges.into_iter().enumerate() { + let partition_idx = i % expected_partition_num; + partition_ranges[partition_idx].push(range); + } + + partition_ranges + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 51b3f82ef2..7d164af520 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -43,6 +43,7 @@ use table::TableRef; use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer}; use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule; use crate::optimizer::order_hint::OrderHintRule; +use crate::optimizer::parallelize_scan::ParallelizeScan; use crate::optimizer::remove_duplicate::RemoveDuplicate; use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; @@ -112,6 +113,11 @@ impl QueryEngineState { // add physical optimizer let mut physical_optimizer = PhysicalOptimizer::new(); + // Change TableScan's partition at first + physical_optimizer + .rules + .insert(0, Arc::new(ParallelizeScan)); + // Add rule to remove duplicate nodes generated by other rules. Run this in the last. physical_optimizer.rules.push(Arc::new(RemoveDuplicate)); let session_state = SessionState::new_with_config_rt(session_config, runtime_env) diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index dd49634830..9e8143a9ec 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -15,6 +15,7 @@ common-base.workspace = true common-error.workspace = true common-macro.workspace = true common-recordbatch.workspace = true +common-time.workspace = true common-wal.workspace = true datafusion-expr.workspace = true datafusion-physical-plan.workspace = true diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 7f88fe7857..e37f69f441 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; use common_recordbatch::SendableRecordBatchStream; +use common_time::Timestamp; use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::future::join_all; @@ -141,42 +142,71 @@ impl ScannerPartitioning { } } +/// Represents one data range within a partition +#[derive(Debug, Clone, Copy)] +pub struct PartitionRange { + /// Start time of time index column. Inclusive. + pub start: Timestamp, + /// End time of time index column. Inclusive. + pub end: Timestamp, + /// Estimate size of this range. Is used to balance ranges between partitions. + /// No base unit, just a number. + pub estimated_size: usize, + /// Identifier to this range. Assigned by storage engine. + pub identifier: usize, +} + /// Properties of the [RegionScanner]. #[derive(Debug)] pub struct ScannerProperties { - /// Partitions to scan. - partitioning: ScannerPartitioning, + /// A 2-dim partition ranges. + /// + /// The first dim vector's length represents the output partition number. The second + /// dim is ranges within one partition. + pub partitions: Vec>, } impl ScannerProperties { - /// Creates a new [ScannerProperties] with the given partitioning. - pub fn new(partitioning: ScannerPartitioning) -> Self { - Self { partitioning } + /// Creates a new [`ScannerProperties`] with the given partitioning. + pub fn new(partitions: Vec>) -> Self { + Self { partitions } } - /// Returns properties of partitions to scan. - pub fn partitioning(&self) -> &ScannerPartitioning { - &self.partitioning + /// Creates a new [`ScannerProperties`] with the given number of partitions. + pub fn new_with_partitions(partitions: usize) -> Self { + Self { + partitions: vec![vec![]; partitions], + } + } + + pub fn num_partitions(&self) -> usize { + self.partitions.len() } } /// A scanner that provides a way to scan the region concurrently. /// The scanner splits the region into partitions so that each partition can be scanned concurrently. -/// You can use this trait to implement an [ExecutionPlan](datafusion_physical_plan::ExecutionPlan). -pub trait RegionScanner: Debug + DisplayAs + Send + Sync { +/// You can use this trait to implement an [`ExecutionPlan`](datafusion_physical_plan::ExecutionPlan). +pub trait RegionScanner: Debug + DisplayAs + Send { /// Returns the properties of the scanner. fn properties(&self) -> &ScannerProperties; /// Returns the schema of the record batches. fn schema(&self) -> SchemaRef; + /// Prepares the scanner with the given partition ranges. + /// + /// This method is for the planner to adjust the scanner's behavior based on the partition ranges. + fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError>; + /// Scans the partition and returns a stream of record batches. + /// /// # Panics /// Panics if the `partition` is out of bound. fn scan_partition(&self, partition: usize) -> Result; } -pub type RegionScannerRef = Arc; +pub type RegionScannerRef = Box; pub type BatchResponses = Vec<(RegionId, Result)>; @@ -272,7 +302,7 @@ impl SinglePartitionScanner { Self { stream: Mutex::new(Some(stream)), schema, - properties: ScannerProperties::new(ScannerPartitioning::Unknown(1)), + properties: ScannerProperties::new_with_partitions(1), } } } @@ -292,6 +322,11 @@ impl RegionScanner for SinglePartitionScanner { self.schema.clone() } + fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { + self.properties = ScannerProperties::new(ranges); + Ok(()) + } + fn scan_partition(&self, _partition: usize) -> Result { let mut stream = self.stream.lock().unwrap(); stream.take().ok_or_else(|| { diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 565d3d7e62..8df1fe9d8b 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -110,7 +110,7 @@ impl TableProvider for DfTableProviderAdapter { .collect::>() }); - let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let scanner = Box::new(SinglePartitionScanner::new(stream)); let mut plan = RegionScanExec::new(scanner); if let Some(sort_expr) = sort_expr { plan = plan.with_output_ordering(sort_expr); diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index df71efeaea..70e76586b6 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -14,9 +14,10 @@ use std::any::Any; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; +use common_error::ext::BoxedError; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; use common_telemetry::tracing_context::TracingContext; @@ -31,14 +32,14 @@ use datafusion_common::DataFusionError; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; use futures::{Stream, StreamExt}; -use store_api::region_engine::RegionScannerRef; +use store_api::region_engine::{PartitionRange, RegionScannerRef}; use crate::table::metrics::MemoryUsageMetrics; /// A plan to read multiple partitions from a region of a table. #[derive(Debug)] pub struct RegionScanExec { - scanner: RegionScannerRef, + scanner: Mutex, arrow_schema: ArrowSchemaRef, /// The expected output ordering for the plan. output_ordering: Option>, @@ -50,7 +51,7 @@ impl RegionScanExec { pub fn new(scanner: RegionScannerRef) -> Self { let arrow_schema = scanner.schema().arrow_schema().clone(); let scanner_props = scanner.properties(); - let mut num_output_partition = scanner_props.partitioning().num_partitions(); + let mut num_output_partition = scanner_props.num_partitions(); // The meaning of word "partition" is different in different context. For datafusion // it's about "parallelism" and for storage it's about "data range". Thus here we add // a special case to handle the situation where the number of storage partition is 0. @@ -63,7 +64,7 @@ impl RegionScanExec { ExecutionMode::Bounded, ); Self { - scanner, + scanner: Mutex::new(scanner), arrow_schema, output_ordering: None, metric: ExecutionPlanMetricsSet::new(), @@ -76,6 +77,27 @@ impl RegionScanExec { self.output_ordering = Some(output_ordering); self } + + /// Get the partition ranges of the scanner. This method will collapse the ranges into + /// a single vector. + pub fn get_partition_ranges(&self) -> Vec { + let scanner = self.scanner.lock().unwrap(); + let raw_ranges = &scanner.properties().partitions; + + // collapse the ranges + let mut ranges = Vec::with_capacity(raw_ranges.len()); + for partition in raw_ranges { + ranges.extend_from_slice(partition); + } + + ranges + } + + /// Update the partition ranges of underlying scanner. + pub fn set_partitions(&self, partitions: Vec>) -> Result<(), BoxedError> { + let mut scanner = self.scanner.lock().unwrap(); + scanner.prepare(partitions) + } } impl ExecutionPlan for RegionScanExec { @@ -113,6 +135,8 @@ impl ExecutionPlan for RegionScanExec { let stream = self .scanner + .lock() + .unwrap() .scan_partition(partition) .map_err(|e| DataFusionError::External(Box::new(e)))?; let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); @@ -131,7 +155,10 @@ impl ExecutionPlan for RegionScanExec { impl DisplayAs for RegionScanExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { // The scanner contains all information needed to display the plan. - self.scanner.fmt_as(t, f) + match self.scanner.try_lock() { + Ok(scanner) => scanner.fmt_as(t, f), + Err(_) => write!(f, "RegionScanExec "), + } } } @@ -217,7 +244,7 @@ mod test { RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); let stream = recordbatches.as_stream(); - let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let scanner = Box::new(SinglePartitionScanner::new(stream)); let plan = RegionScanExec::new(scanner); let actual: SchemaRef = Arc::new( plan.properties diff --git a/tests/cases/standalone/common/tql-explain-analyze/explain.result b/tests/cases/standalone/common/tql-explain-analyze/explain.result index c1b55e665c..0cfe77f1a8 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/explain.result +++ b/tests/cases/standalone/common/tql-explain-analyze/explain.result @@ -137,6 +137,11 @@ TQL EXPLAIN VERBOSE (0, 10, '5s') test; |_|_PromSeriesDivideExec: tags=["k"], statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]_| |_|_MergeScanExec: REDACTED |_|_| +| physical_plan after parallelize_scan_| PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| +|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_| +|_|_PromSeriesDivideExec: tags=["k"]_| +|_|_MergeScanExec: REDACTED +|_|_| | physical_plan after OutputRequirements_| OutputRequirementExec_| |_|_PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j]_| |_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false]_|