refactor: remove per series scan from SeqScan

This commit is contained in:
evenyag
2025-03-28 21:15:41 +08:00
parent c44ba1aa69
commit 17c797a6d0

View File

@@ -30,7 +30,7 @@ use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -207,10 +207,6 @@ impl SeqScan {
));
}
if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
return self.scan_partition_by_series(metrics_set, partition);
}
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
@@ -307,97 +303,6 @@ impl SeqScan {
Ok(stream)
}
/// Scans all ranges in the given partition and merge by time series.
/// Otherwise the returned stream might not contains any data.
fn scan_partition_by_series(
&self,
metrics_set: &ExecutionPlanMetricsSet,
partition: usize,
) -> Result<SendableRecordBatchStream, BoxedError> {
let stream_ctx = self.stream_ctx.clone();
let semaphore = self.new_semaphore();
let partition_ranges = self.properties.partitions[partition].clone();
let distinguish_range = self.properties.distinguish_partition_range;
let part_metrics = self.new_partition_metrics(metrics_set, partition);
debug_assert!(!self.compaction);
let stream = try_stream! {
part_metrics.on_first_poll();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
));
// Scans all parts.
let mut sources = Vec::with_capacity(partition_ranges.len());
for part_range in partition_ranges {
build_sources(
&stream_ctx,
&part_range,
false,
&part_metrics,
range_builder_list.clone(),
&mut sources,
);
}
// Builds a reader that merge sources from all parts.
let mut reader =
Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let cache = &stream_ctx.input.cache_strategy;
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
while let Some(batch) = reader
.next_batch()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?
{
metrics.scan_cost += fetch_start.elapsed();
metrics.num_batches += 1;
metrics.num_rows += batch.num_rows();
debug_assert!(!batch.is_empty());
if batch.is_empty() {
continue;
}
let convert_start = Instant::now();
let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
metrics.convert_cost += convert_start.elapsed();
let yield_start = Instant::now();
yield record_batch;
metrics.yield_cost += yield_start.elapsed();
fetch_start = Instant::now();
}
// Yields an empty part to indicate this range is terminated.
// The query engine can use this to optimize some queries.
if distinguish_range {
let yield_start = Instant::now();
yield stream_ctx.input.mapper.empty_record_batch();
metrics.yield_cost += yield_start.elapsed();
}
metrics.scan_cost += fetch_start.elapsed();
part_metrics.merge_metrics(&metrics);
part_metrics.on_finish();
};
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.stream_ctx.input.mapper.output_schema(),
Box::pin(stream),
));
Ok(stream)
}
fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
if self.properties.target_partitions() > self.properties.num_partitions() {
// We can use additional tasks to read the data if we have more target partitions than actual partitions.
@@ -509,8 +414,8 @@ pub(crate) fn build_sources(
// Gets range meta.
let range_meta = &stream_ctx.ranges[part_range.identifier];
#[cfg(debug_assertions)]
if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
// Compaction or per series distribution expects input sources are not been split.
if compaction {
// Compaction expects input sources are not been split.
debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
// It should scan all row groups.