diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 98929a7ba6..bd04d58941 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
-use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
+use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;

 use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -207,10 +207,6 @@ impl SeqScan {
             ));
         }

-        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
-            return self.scan_partition_by_series(metrics_set, partition);
-        }
-
         let stream_ctx = self.stream_ctx.clone();
         let semaphore = self.new_semaphore();
         let partition_ranges = self.properties.partitions[partition].clone();
@@ -307,97 +303,6 @@ impl SeqScan {
         Ok(stream)
     }

-    /// Scans all ranges in the given partition and merge by time series.
-    /// Otherwise the returned stream might not contains any data.
-    fn scan_partition_by_series(
-        &self,
-        metrics_set: &ExecutionPlanMetricsSet,
-        partition: usize,
-    ) -> Result<SendableRecordBatchStream> {
-        let stream_ctx = self.stream_ctx.clone();
-        let semaphore = self.new_semaphore();
-        let partition_ranges = self.properties.partitions[partition].clone();
-        let distinguish_range = self.properties.distinguish_partition_range;
-        let part_metrics = self.new_partition_metrics(metrics_set, partition);
-        debug_assert!(!self.compaction);
-
-        let stream = try_stream! {
-            part_metrics.on_first_poll();
-
-            let range_builder_list = Arc::new(RangeBuilderList::new(
-                stream_ctx.input.num_memtables(),
-                stream_ctx.input.num_files(),
-            ));
-            // Scans all parts.
-            let mut sources = Vec::with_capacity(partition_ranges.len());
-            for part_range in partition_ranges {
-                build_sources(
-                    &stream_ctx,
-                    &part_range,
-                    false,
-                    &part_metrics,
-                    range_builder_list.clone(),
-                    &mut sources,
-                );
-            }
-
-            // Builds a reader that merge sources from all parts.
-            let mut reader =
-                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
-                    .await
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu)?;
-            let cache = &stream_ctx.input.cache_strategy;
-            let mut metrics = ScannerMetrics::default();
-            let mut fetch_start = Instant::now();
-
-            while let Some(batch) = reader
-                .next_batch()
-                .await
-                .map_err(BoxedError::new)
-                .context(ExternalSnafu)?
-            {
-                metrics.scan_cost += fetch_start.elapsed();
-                metrics.num_batches += 1;
-                metrics.num_rows += batch.num_rows();
-
-                debug_assert!(!batch.is_empty());
-                if batch.is_empty() {
-                    continue;
-                }
-
-                let convert_start = Instant::now();
-                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
-                metrics.convert_cost += convert_start.elapsed();
-                let yield_start = Instant::now();
-                yield record_batch;
-                metrics.yield_cost += yield_start.elapsed();
-
-                fetch_start = Instant::now();
-            }
-
-            // Yields an empty part to indicate this range is terminated.
-            // The query engine can use this to optimize some queries.
-            if distinguish_range {
-                let yield_start = Instant::now();
-                yield stream_ctx.input.mapper.empty_record_batch();
-                metrics.yield_cost += yield_start.elapsed();
-            }
-
-            metrics.scan_cost += fetch_start.elapsed();
-            part_metrics.merge_metrics(&metrics);
-
-            part_metrics.on_finish();
-        };
-
-        let stream = Box::pin(RecordBatchStreamWrapper::new(
-            self.stream_ctx.input.mapper.output_schema(),
-            Box::pin(stream),
-        ));
-
-        Ok(stream)
-    }
-
     fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
         if self.properties.target_partitions() > self.properties.num_partitions() {
             // We can use additional tasks to read the data if we have more target partitions than actual partitions.
@@ -509,8 +414,8 @@ pub(crate) fn build_sources(
     // Gets range meta.
     let range_meta = &stream_ctx.ranges[part_range.identifier];
     #[cfg(debug_assertions)]
-    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
-        // Compaction or per series distribution expects input sources are not been split.
+    if compaction {
+        // Compaction expects input sources that are not split.
         debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
         for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
             // It should scan all row groups.
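
The removed `scan_partition_by_series` path built sources for every `PartitionRange` of a partition, merged them through a single reader, and then drained that reader inside a `try_stream!` block while accumulating scan/convert/yield timings. The sketch below is a minimal, self-contained illustration of that drain-and-time pattern only; `MergedReader`, `Metrics`, and `into_stream` are stand-in names invented for this example, not GreptimeDB's actual `ScanInput`, `ScannerMetrics`, or reader APIs.

```rust
// Minimal sketch (stand-in types, not GreptimeDB's APIs) of the pattern used by the
// removed scan_partition_by_series: drain a merged batch reader, time each phase,
// and expose the result as an async stream of converted batches.
use std::collections::VecDeque;
use std::time::{Duration, Instant};

use async_stream::try_stream;
use futures::{Stream, StreamExt};

/// Stand-in for a reader that merges sources from all ranges of a partition.
struct MergedReader {
    batches: VecDeque<Vec<i64>>,
}

impl MergedReader {
    /// Returns the next batch, or `None` when the reader is exhausted.
    async fn next_batch(&mut self) -> Result<Option<Vec<i64>>, String> {
        Ok(self.batches.pop_front())
    }
}

/// Stand-in for the per-partition scanner metrics.
#[derive(Default, Debug)]
struct Metrics {
    scan_cost: Duration,
    convert_cost: Duration,
    yield_cost: Duration,
    num_batches: usize,
    num_rows: usize,
}

/// Wraps the reader into a stream of "record batches" (plain strings here),
/// timing the fetch, convert, and yield phases separately.
fn into_stream(mut reader: MergedReader) -> impl Stream<Item = Result<String, String>> {
    try_stream! {
        let mut metrics = Metrics::default();
        let mut fetch_start = Instant::now();
        while let Some(batch) = reader.next_batch().await? {
            metrics.scan_cost += fetch_start.elapsed();
            metrics.num_batches += 1;
            metrics.num_rows += batch.len();

            let convert_start = Instant::now();
            let record_batch = format!("{batch:?}"); // stand-in for mapper.convert()
            metrics.convert_cost += convert_start.elapsed();

            let yield_start = Instant::now();
            yield record_batch;
            metrics.yield_cost += yield_start.elapsed();

            fetch_start = Instant::now();
        }
        metrics.scan_cost += fetch_start.elapsed();
        println!("partition finished: {metrics:?}");
    }
}

#[tokio::main]
async fn main() {
    let reader = MergedReader {
        batches: VecDeque::from(vec![vec![1, 2], vec![3]]),
    };
    let mut stream = Box::pin(into_stream(reader));
    while let Some(rb) = stream.next().await {
        println!("{}", rb.unwrap());
    }
}
```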