From 01a73105b8e5441ee66170cf3816085f68d9019e Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 13 Apr 2026 16:27:53 +0800 Subject: [PATCH] feat: use partition range cache in scan (#7873) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: use range cache in scan Signed-off-by: evenyag * refactor: rename dedup to skip_dedup Signed-off-by: evenyag * feat: use background concat for buffered batches Signed-off-by: evenyag * chore: fmt Signed-off-by: evenyag * fix: store permits Signed-off-by: evenyag * fix: fix potential panic Signed-off-by: evenyag * fix: skip range-cache wrapping when cache is disabled Signed-off-by: evenyag * fix: avoid potential deadlock Deadlock Chain 1. Range-level merge tasks: Each concurrent build_flat_partition_range_read (line 494-506) calls build_flat_reader_from_sources → create_parallel_flat_sources → spawn_flat_scan_task. These background tasks loop: acquire permit → input.next() → release permit. 2. Final merge tasks: After all range tasks return streams (line 509-511), the distributor calls build_flat_reader_from_sources again (line 520-527) → create_parallel_flat_sources → more spawn_flat_scan_task tasks. These also loop: acquire permit → input.next() → release permit. 3. Circular wait: The final merge tasks' input.next() reads from ReceiverStreams backed by range-level merge tasks. If all num_partitions permits are held by final merge tasks blocked on input.next(), the range-level merge tasks can't acquire permits to produce data → deadlock. Signed-off-by: evenyag * test: add test for small permits Signed-off-by: evenyag * feat: use avg batch size for channel size Signed-off-by: evenyag * test: fix test Signed-off-by: evenyag * chore: address review comments Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/cache.rs | 70 ++++ src/mito2/src/engine/scan_test.rs | 96 +++++ src/mito2/src/read/range_cache.rs | 658 +++++++++++++++++++----------- src/mito2/src/read/scan_util.rs | 139 ++++++- src/mito2/src/read/seq_scan.rs | 83 +++- src/mito2/src/read/series_scan.rs | 85 ++-- 6 files changed, 845 insertions(+), 286 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 35db74eee6..5d2559cba1 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -28,6 +28,7 @@ use std::ops::Range; use std::sync::Arc; use bytes::Bytes; +use common_base::readable_size::ReadableSize; use common_telemetry::warn; use datatypes::arrow::record_batch::RecordBatch; use datatypes::value::Value; @@ -72,6 +73,46 @@ const INDEX_TYPE: &str = "index"; const SELECTOR_RESULT_TYPE: &str = "selector_result"; /// Metrics type key for range scan result cache. const RANGE_RESULT_TYPE: &str = "range_result"; +const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512); +const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1); + +#[derive(Debug)] +pub(crate) struct RangeResultMemoryLimiter { + semaphore: Arc, + permit_bytes: usize, +} + +impl Default for RangeResultMemoryLimiter { + fn default() -> Self { + Self::new( + RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize, + RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize, + ) + } +} + +impl RangeResultMemoryLimiter { + pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self { + let permit_bytes = permit_bytes.max(1); + let permits = limit_bytes.div_ceil(permit_bytes).max(1); + Self { + semaphore: Arc::new(tokio::sync::Semaphore::new(permits)), + permit_bytes, + } + } + + pub(crate) fn permit_bytes(&self) -> usize { + self.permit_bytes + } + + pub(crate) async fn acquire( + &self, + bytes: usize, + ) -> std::result::Result, tokio::sync::AcquireError> { + let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32; + self.semaphore.acquire_many(permits).await + } +} /// Cached SST metadata combines the parquet footer with the decoded region metadata. /// @@ -373,6 +414,23 @@ impl CacheStrategy { } } + /// Returns true if the range result cache is enabled. + pub(crate) fn has_range_result_cache(&self) -> bool { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false, + } + } + + pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc> { + match self { + CacheStrategy::EnableAll(cache_manager) => { + Some(cache_manager.range_result_memory_limiter()) + } + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } + /// Calls [CacheManager::write_cache()]. /// It returns None if the strategy is [CacheStrategy::Disabled]. pub fn write_cache(&self) -> Option<&WriteCacheRef> { @@ -476,6 +534,8 @@ pub struct CacheManager { selector_result_cache: Option, /// Cache for range scan outputs in flat format. range_result_cache: Option, + /// Shared memory limiter for async range-result cache tasks. + range_result_memory_limiter: Arc, /// Cache for index result. index_result_cache: Option, } @@ -735,6 +795,15 @@ impl CacheManager { } } + /// Returns true if the range result cache is enabled. + pub(crate) fn has_range_result_cache(&self) -> bool { + self.range_result_cache.is_some() + } + + pub(crate) fn range_result_memory_limiter(&self) -> &Arc { + &self.range_result_memory_limiter + } + /// Gets the write cache. pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { self.write_cache.as_ref() @@ -969,6 +1038,7 @@ impl CacheManagerBuilder { puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, range_result_cache, + range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()), index_result_cache, } } diff --git a/src/mito2/src/engine/scan_test.rs b/src/mito2/src/engine/scan_test.rs index 119b4493fd..a39761ad01 100644 --- a/src/mito2/src/engine/scan_test.rs +++ b/src/mito2/src/engine/scan_test.rs @@ -403,3 +403,99 @@ fn collect_and_assert_partition_rows( actual_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2))); actual_rows } + +/// Tests series scan with multiple partition ranges (each with multiple overlapping sources) +/// and small semaphore permits (controlled by num_partitions). +#[tokio::test] +async fn test_series_scan_flat_small_permits() { + let mut env = TestEnv::with_prefix("test_series_scan_small_permits").await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: true, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .insert_option("compaction.twcs.time_window", "1h") + .build(); + let column_schemas = test_util::rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Create overlapping SSTs in each time window so partition ranges have multiple sources. + let put_flush_rows = async |start, end| { + let rows = Rows { + schema: column_schemas.clone(), + rows: test_util::build_rows(start, end), + }; + test_util::put_rows(&engine, region_id, rows).await; + test_util::flush_region(&engine, region_id, None).await; + }; + // Window 0 (0s-999s): 3 overlapping SSTs + put_flush_rows(0, 3).await; + put_flush_rows(1, 5).await; + put_flush_rows(3, 7).await; + // Window 1 (3600s-4599s): 2 overlapping SSTs + put_flush_rows(3600, 3603).await; + put_flush_rows(3601, 3605).await; + // Window 2 (7200s-8199s): 2 overlapping SSTs + put_flush_rows(7200, 7203).await; + put_flush_rows(7201, 7204).await; + + let mut expected_rows = Vec::new(); + for value in [ + 0_i64, 1, 2, 3, 4, 5, 6, 3600, 3601, 3602, 3603, 3604, 7200, 7201, 7202, 7203, + ] { + expected_rows.push((value.to_string(), value as f64, value * 1000)); + } + expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2))); + + // Test with different semaphore sizes (num_partitions controls Semaphore::new(num_partitions)). + for num_partitions in [1, 2] { + let request = ScanRequest { + distribution: Some(TimeSeriesDistribution::PerSeries), + ..Default::default() + }; + let scanner = engine.scanner(region_id, request).await.unwrap(); + let Scanner::Series(mut scanner) = scanner else { + panic!("Scanner should be series scan"); + }; + + // Collect all partition ranges and redistribute into `num_partitions` partitions. + let raw_ranges: Vec<_> = scanner + .properties() + .partitions + .iter() + .flatten() + .cloned() + .collect(); + assert!( + raw_ranges.len() >= 3, + "expected at least 3 partition ranges, got {}", + raw_ranges.len() + ); + + let mut new_ranges = vec![vec![]; num_partitions]; + for (i, range) in raw_ranges.into_iter().enumerate() { + new_ranges[i % num_partitions].push(range); + } + scanner + .prepare(PrepareRequest { + ranges: Some(new_ranges), + ..Default::default() + }) + .unwrap(); + + let actual_rows = collect_partition_rows_round_robin(&scanner, num_partitions).await; + assert_eq!( + expected_rows, actual_rows, + "mismatch with num_partitions={num_partitions}" + ); + } +} diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs index 2431a21f6a..1daaa6399b 100644 --- a/src/mito2/src/read/range_cache.rs +++ b/src/mito2/src/read/range_cache.rs @@ -18,22 +18,28 @@ use std::mem; use std::sync::Arc; use async_stream::try_stream; +use common_telemetry::warn; use common_time::range::TimestampRange; -use datatypes::arrow::array::{Array, AsArray, DictionaryArray}; -use datatypes::arrow::datatypes::UInt32Type; +use datatypes::arrow::compute::concat_batches; use datatypes::arrow::record_batch::RecordBatch; use datatypes::prelude::ConcreteDataType; use futures::TryStreamExt; +use snafu::ResultExt; use store_api::region_engine::PartitionRange; use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector}; +use tokio::sync::{mpsc, oneshot}; use crate::cache::CacheStrategy; +use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu}; use crate::read::BoxedRecordBatchStream; use crate::read::scan_region::StreamContext; use crate::read::scan_util::PartitionMetrics; use crate::region::options::MergeMode; use crate::sst::file::FileTimeRange; -use crate::sst::parquet::flat_format::primary_key_column_index; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; + +const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024; +const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024; /// Fingerprint of the scan request fields that affect partition range cache reuse. /// @@ -187,29 +193,48 @@ impl RangeScanCacheKey { } /// Cached result for one range scan. +#[derive(Debug)] +pub(crate) struct CachedBatchSlice { + batch: RecordBatch, + slice_lengths: Vec, +} + +impl CachedBatchSlice { + fn metadata_size(&self) -> usize { + self.slice_lengths.capacity() * mem::size_of::() + } +} + pub(crate) struct RangeScanCacheValue { - pub(crate) batches: Vec, - /// Precomputed size of all batches, accounting for shared dictionary values. + cached_batches: Vec, + /// Precomputed size of all compacted batches. estimated_batches_size: usize, } impl RangeScanCacheValue { - pub(crate) fn new(batches: Vec, estimated_batches_size: usize) -> Self { + pub(crate) fn new( + cached_batches: Vec, + estimated_batches_size: usize, + ) -> Self { Self { - batches, + cached_batches, estimated_batches_size, } } pub(crate) fn estimated_size(&self) -> usize { mem::size_of::() - + self.batches.capacity() * mem::size_of::() + + self.cached_batches.capacity() * mem::size_of::() + + self + .cached_batches + .iter() + .map(CachedBatchSlice::metadata_size) + .sum::() + self.estimated_batches_size } } /// Row groups and whether all sources are file-only for a partition range. -#[allow(dead_code)] pub(crate) struct PartitionRangeRowGroups { /// Sorted (file_id, row_group_index) pairs. pub(crate) row_groups: Vec<(FileId, i64)>, @@ -217,7 +242,6 @@ pub(crate) struct PartitionRangeRowGroups { } /// Collects (file_id, row_group_index) pairs from a partition range's row group indices. -#[allow(dead_code)] pub(crate) fn collect_partition_range_row_groups( stream_ctx: &StreamContext, part_range: &PartitionRange, @@ -244,11 +268,14 @@ pub(crate) fn collect_partition_range_row_groups( } /// Builds a cache key for the given partition range if it is eligible for caching. -#[allow(dead_code)] pub(crate) fn build_range_cache_key( stream_ctx: &StreamContext, part_range: &PartitionRange, ) -> Option { + if !stream_ctx.input.cache_strategy.has_range_result_cache() { + return None; + } + let fingerprint = stream_ctx.scan_fingerprint.as_ref()?; // Dyn filters can change at runtime, so we can't cache when they're present. @@ -283,7 +310,6 @@ pub(crate) fn build_range_cache_key( }) } -#[allow(dead_code)] fn query_time_range_covers_partition_range( query_time_range: Option<&TimestampRange>, partition_time_range: FileTimeRange, @@ -297,117 +323,232 @@ fn query_time_range_covers_partition_range( } /// Returns a stream that replays cached record batches. -#[allow(dead_code)] pub(crate) fn cached_flat_range_stream(value: Arc) -> BoxedRecordBatchStream { - Box::pin(futures::stream::iter( - value.batches.clone().into_iter().map(Ok), - )) + Box::pin(try_stream! { + for cached_batch in &value.cached_batches { + let mut offset = 0; + for &len in &cached_batch.slice_lengths { + yield cached_batch.batch.slice(offset, len); + offset += len; + } + } + }) } -/// Returns true if two primary key dictionary arrays share the same underlying -/// values buffers by pointer comparison. -/// -/// The primary key column is always `DictionaryArray` with `Binary` values. -fn pk_values_ptr_eq(a: &DictionaryArray, b: &DictionaryArray) -> bool { - let a = a.values().as_binary::(); - let b = b.values().as_binary::(); - let values_eq = a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets()); - match (a.nulls(), b.nulls()) { - (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()), - (None, None) => values_eq, - _ => false, +enum CacheConcatCommand { + Compact(Vec), + Finish { + pending: Vec, + key: RangeScanCacheKey, + cache_strategy: CacheStrategy, + part_metrics: PartitionMetrics, + result_tx: Option>>>, + }, +} + +#[derive(Default)] +struct CacheConcatState { + cached_batches: Vec, + estimated_size: usize, +} + +impl CacheConcatState { + async fn compact( + &mut self, + batches: Vec, + limiter: &crate::cache::RangeResultMemoryLimiter, + ) -> Result<()> { + if batches.is_empty() { + return Ok(()); + } + + let input_size = batches + .iter() + .map(RecordBatch::get_array_memory_size) + .sum::(); + let _permit = limiter.acquire(input_size).await.map_err(|_| { + UnexpectedSnafu { + reason: "range result memory limiter is unexpectedly closed", + } + .build() + })?; + + let compacted = compact_record_batches(batches)?; + self.estimated_size += compacted.batch.get_array_memory_size(); + self.cached_batches.push(compacted); + Ok(()) + } + + fn finish(self) -> RangeScanCacheValue { + RangeScanCacheValue::new(self.cached_batches, self.estimated_size) } } -/// Buffers record batches for caching, tracking memory size while deduplicating -/// shared dictionary values across batches. -/// -/// Uses the primary key column as a proxy to detect dictionary sharing: if the PK -/// column's dictionary values are pointer-equal across batches, we assume all -/// dictionary columns share their values and deduct the total dictionary values size. -struct CacheBatchBuffer { +fn compact_record_batches(batches: Vec) -> Result { + debug_assert!(!batches.is_empty()); + + let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect(); + build_cached_batch_slice(batches, slice_lengths) +} + +fn build_cached_batch_slice( batches: Vec, - /// Running total of batch memory. - total_size: usize, - /// The first batch's PK dictionary array, for pointer comparison. - /// `None` if no dictionary PK column exists or no batch has been added yet. - first_pk_dict: Option>, - /// Sum of `get_array_memory_size()` of all dictionary value arrays from the first batch. - total_dict_values_size: usize, - /// Whether the PK dictionary is still shared across all batches seen so far. - shared: bool, + slice_lengths: Vec, +) -> Result { + let batch = if batches.len() == 1 { + batches.into_iter().next().unwrap() + } else { + let schema = batches[0].schema(); + concat_batches(&schema, &batches).context(ComputeArrowSnafu)? + }; + + Ok(CachedBatchSlice { + batch, + slice_lengths, + }) +} + +async fn run_cache_concat_task( + mut rx: mpsc::UnboundedReceiver, + limiter: Arc, +) { + let mut state = CacheConcatState::default(); + + while let Some(cmd) = rx.recv().await { + match cmd { + CacheConcatCommand::Compact(batches) => { + if let Err(err) = state.compact(batches, &limiter).await { + warn!(err; "Failed to compact range cache batches"); + return; + } + } + CacheConcatCommand::Finish { + pending, + key, + cache_strategy, + part_metrics, + result_tx, + } => { + let result = state + .compact(pending, &limiter) + .await + .map(|()| state.finish()); + if let Err(err) = &result { + warn!(err; "Failed to finalize range cache batches"); + } + + let value = result.ok().map(Arc::new); + if let Some(value) = &value { + part_metrics + .inc_range_cache_size(key.estimated_size() + value.estimated_size()); + cache_strategy.put_range_result(key, value.clone()); + } + if let Some(tx) = result_tx { + let _ = tx.send(value); + } + return; + } + } + } +} + +struct CacheBatchBuffer { + buffered_batches: Vec, + buffered_rows: usize, + buffered_size: usize, + total_weight: usize, + sender: Option>, } impl CacheBatchBuffer { - fn new() -> Self { + fn new(cache_strategy: &CacheStrategy) -> Self { + let sender = cache_strategy.range_result_memory_limiter().map(|limiter| { + let (tx, rx) = mpsc::unbounded_channel(); + common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone())); + tx + }); + Self { - batches: Vec::new(), - total_size: 0, - first_pk_dict: None, - total_dict_values_size: 0, - shared: true, + buffered_batches: Vec::new(), + buffered_rows: 0, + buffered_size: 0, + total_weight: 0, + sender, } } - fn push(&mut self, batch: RecordBatch) { - if self.batches.is_empty() { - self.init_first_batch(&batch); - } else { - self.add_subsequent_batch(&batch); + fn push(&mut self, batch: RecordBatch) -> Result<()> { + if self.sender.is_none() { + return Ok(()); } - self.batches.push(batch); - } - fn init_first_batch(&mut self, batch: &RecordBatch) { - self.total_size += batch.get_array_memory_size(); - - let pk_col_idx = primary_key_column_index(batch.num_columns()); - let mut total_dict_values_size = 0; - for col_idx in 0..batch.num_columns() { - let col = batch.column(col_idx); - if let Some(dict) = col.as_any().downcast_ref::>() { - total_dict_values_size += dict.values().get_array_memory_size(); - if col_idx == pk_col_idx { - self.first_pk_dict = Some(dict.clone()); - } - } - } - self.total_dict_values_size = total_dict_values_size; - } - - fn add_subsequent_batch(&mut self, batch: &RecordBatch) { let batch_size = batch.get_array_memory_size(); - - if self.shared - && let Some(first_pk_dict) = &self.first_pk_dict - { - let pk_col_idx = primary_key_column_index(batch.num_columns()); - let col = batch.column(pk_col_idx); - if let Some(dict) = col.as_any().downcast_ref::>() - && pk_values_ptr_eq(first_pk_dict, dict) - { - // PK dict is shared, deduct all dict values sizes. - self.total_size += batch_size - self.total_dict_values_size; - return; - } - // Dictionary diverged. - self.shared = false; + self.total_weight += batch_size; + if self.total_weight > RANGE_CACHE_SKIP_BYTES { + self.buffered_batches.clear(); + self.buffered_rows = 0; + self.buffered_size = 0; + self.sender = None; + return Ok(()); } - self.total_size += batch_size; + self.buffered_rows += batch.num_rows(); + self.buffered_size += batch_size; + self.buffered_batches.push(batch); + + if self.buffered_rows > DEFAULT_READ_BATCH_SIZE + || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES + { + self.notify_compact(); + } + + Ok(()) } - fn estimated_batches_size(&self) -> usize { - self.total_size + fn notify_compact(&mut self) { + if self.buffered_batches.is_empty() || self.sender.is_none() { + return; + } + + let batches = mem::take(&mut self.buffered_batches); + self.buffered_rows = 0; + self.buffered_size = 0; + + let Some(sender) = &self.sender else { + return; + }; + if sender.send(CacheConcatCommand::Compact(batches)).is_err() { + self.sender = None; + } } - fn into_batches(self) -> Vec { - self.batches + fn finish( + mut self, + key: RangeScanCacheKey, + cache_strategy: CacheStrategy, + part_metrics: PartitionMetrics, + result_tx: Option>>>, + ) { + let Some(sender) = self.sender.take() else { + return; + }; + + if sender + .send(CacheConcatCommand::Finish { + pending: mem::take(&mut self.buffered_batches), + key, + cache_strategy, + part_metrics, + result_tx, + }) + .is_err() + { + self.sender = None; + } } } /// Wraps a stream to cache its output for future range cache hits. -#[allow(dead_code)] pub(crate) fn cache_flat_range_stream( mut stream: BoxedRecordBatchStream, cache_strategy: CacheStrategy, @@ -415,17 +556,13 @@ pub(crate) fn cache_flat_range_stream( part_metrics: PartitionMetrics, ) -> BoxedRecordBatchStream { Box::pin(try_stream! { - let mut buffer = CacheBatchBuffer::new(); + let mut buffer = CacheBatchBuffer::new(&cache_strategy); while let Some(batch) = stream.try_next().await? { - buffer.push(batch.clone()); + buffer.push(batch.clone())?; yield batch; } - let estimated_size = buffer.estimated_batches_size(); - let batches = buffer.into_batches(); - let value = Arc::new(RangeScanCacheValue::new(batches, estimated_size)); - part_metrics.inc_range_cache_size(key.estimated_size() + value.estimated_size()); - cache_strategy.put_range_result(key, value); + buffer.finish(key, cache_strategy, part_metrics, None); }) } @@ -486,10 +623,11 @@ mod tests { use common_time::Timestamp; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; + use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_common::ScalarValue; use datafusion_expr::{Expr, col, lit}; use smallvec::smallvec; - use store_api::storage::FileId; + use store_api::storage::{FileId, RegionId}; use super::*; use crate::cache::CacheManager; @@ -508,6 +646,44 @@ mod tests { )) } + fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) { + let region_id = RegionId::new(1, 1); + let key = RangeScanCacheKey { + region_id, + row_groups: vec![], + scan: ScanRequestFingerprintBuilder { + read_column_ids: vec![], + read_column_types: vec![], + filters: vec![], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: false, + merge_mode: MergeMode::LastRow, + partition_expr_version: 0, + } + .build(), + }; + + let metrics_set = ExecutionPlanMetricsSet::new(); + let part_metrics = + PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set); + + assert!(strategy.get_range_result(&key).is_none()); + (key, part_metrics) + } + + async fn finish_cache_batch_buffer( + buffer: CacheBatchBuffer, + key: RangeScanCacheKey, + cache_strategy: CacheStrategy, + part_metrics: PartitionMetrics, + ) -> Option> { + let (tx, rx) = oneshot::channel(); + buffer.finish(key, cache_strategy, part_metrics, Some(tx)); + rx.await.context(crate::error::RecvSnafu).ok().flatten() + } + async fn new_stream_context( filters: Vec, query_time_range: Option, @@ -687,169 +863,175 @@ mod tests { ); } - /// Creates a test schema with 5 columns where the primary key dictionary column - /// is at index 2 (`num_columns - 3`), matching the flat format layout. - /// - /// Layout: `[field0: Int64, field1: Int64, pk: Dictionary, ts: Int64, seq: Int64]` - fn dict_test_schema() -> Arc { + fn test_schema() -> Arc { use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; - Arc::new(Schema::new(vec![ - Field::new("field0", ArrowDataType::Int64, false), - Field::new("field1", ArrowDataType::Int64, false), - Field::new( - "pk", - ArrowDataType::Dictionary( - Box::new(ArrowDataType::UInt32), - Box::new(ArrowDataType::Binary), - ), - false, - ), - Field::new("ts", ArrowDataType::Int64, false), - Field::new("seq", ArrowDataType::Int64, false), - ])) + + Arc::new(Schema::new(vec![Field::new( + "value", + ArrowDataType::Int64, + false, + )])) } - /// Helper to create a record batch with a dictionary column at the primary key position. - fn make_dict_batch( - schema: Arc, - dict_values: &datatypes::arrow::array::BinaryArray, - keys: &[u32], - int_values: &[i64], - ) -> RecordBatch { - use datatypes::arrow::array::{Int64Array, UInt32Array}; + fn make_batch(values: &[i64]) -> RecordBatch { + use datatypes::arrow::array::Int64Array; - let key_array = UInt32Array::from(keys.to_vec()); - let dict_array: DictionaryArray = - DictionaryArray::new(key_array, Arc::new(dict_values.clone())); - let int_array = Int64Array::from(int_values.to_vec()); - let zeros = Int64Array::from(vec![0i64; int_values.len()]); RecordBatch::try_new( - schema, - vec![ - Arc::new(zeros.clone()), - Arc::new(int_array), - Arc::new(dict_array), - Arc::new(zeros.clone()), - Arc::new(zeros), - ], + test_schema(), + vec![Arc::new(Int64Array::from(values.to_vec()))], ) .unwrap() } - /// Computes the total `get_array_memory_size()` of all dictionary value arrays in a batch. - fn compute_total_dict_values_size(batch: &RecordBatch) -> usize { - batch - .columns() - .iter() - .filter_map(|col| { - col.as_any() - .downcast_ref::>() - .map(|dict| dict.values().get_array_memory_size()) - }) - .sum() - } - - #[test] - fn cache_batch_buffer_empty() { - let buffer = CacheBatchBuffer::new(); - assert_eq!(buffer.estimated_batches_size(), 0); - assert!(buffer.into_batches().is_empty()); - } - - #[test] - fn cache_batch_buffer_single_batch() { + fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch { use datatypes::arrow::array::BinaryArray; + use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; - let schema = dict_test_schema(); - let dict_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]); - let batch = make_dict_batch(schema, &dict_values, &[0, 1, 2], &[10, 20, 30]); + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + ArrowDataType::Binary, + false, + )])); + let payload = vec![b'x'; bytes_per_row]; + let values = (0..rows).map(|_| payload.as_slice()).collect::>(); - let full_size = batch.get_array_memory_size(); - - let mut buffer = CacheBatchBuffer::new(); - buffer.push(batch); - assert_eq!(buffer.estimated_batches_size(), full_size); - assert_eq!(buffer.into_batches().len(), 1); + RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap() } #[test] - fn cache_batch_buffer_shared_dictionary() { - use datatypes::arrow::array::BinaryArray; + fn compact_record_batches_keeps_original_boundaries() { + let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])]; - let schema = dict_test_schema(); - let dict_values = BinaryArray::from_vec(vec![b"alpha", b"beta", b"gamma"]); + let compacted = compact_record_batches(batches).unwrap(); - // Two batches sharing the same dictionary values array. - let batch1 = make_dict_batch(schema.clone(), &dict_values, &[0, 1], &[10, 20]); - let batch2 = make_dict_batch(schema, &dict_values, &[1, 2], &[30, 40]); + assert_eq!(compacted.batch.num_rows(), 5); + assert_eq!(compacted.slice_lengths, vec![2, 1, 2]); + } - let batch1_full = batch1.get_array_memory_size(); - let batch2_full = batch2.get_array_memory_size(); + #[tokio::test] + async fn cached_flat_range_stream_replays_original_batches() { + let value = Arc::new(RangeScanCacheValue::new( + vec![CachedBatchSlice { + batch: make_batch(&[1, 2, 3]), + slice_lengths: vec![2, 1], + }], + make_batch(&[1, 2, 3]).get_array_memory_size(), + )); - // The total dictionary values size that should be deduplicated for the second batch. - let dict_values_size = compute_total_dict_values_size(&batch2); + let replayed = cached_flat_range_stream(value) + .try_collect::>() + .await + .unwrap(); - let mut buffer = CacheBatchBuffer::new(); - buffer.push(batch1); - buffer.push(batch2); + assert_eq!(replayed.len(), 2); + assert_eq!(replayed[0].num_rows(), 2); + assert_eq!(replayed[1].num_rows(), 1); + } - // Second batch's dict values should not be counted again. + #[tokio::test] + async fn cache_batch_buffer_finishes_pending_batches() { + let strategy = test_cache_strategy(); + let batch = make_batch(&[1, 2, 3]); + let expected_size = batch.get_array_memory_size(); + let (key, part_metrics) = test_cache_context(&strategy); + + let mut buffer = CacheBatchBuffer::new(&strategy); + buffer.push(batch).unwrap(); + + let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics) + .await + .unwrap(); + assert_eq!(value.cached_batches.len(), 1); + assert_eq!(value.cached_batches[0].slice_lengths, vec![3]); + assert_eq!(value.estimated_batches_size, expected_size); + assert!(Arc::ptr_eq( + &value, + &strategy.get_range_result(&key).unwrap() + )); + } + + #[tokio::test] + async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() { + let strategy = test_cache_strategy(); + let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]); + let (key, part_metrics) = test_cache_context(&strategy); + + let mut buffer = CacheBatchBuffer::new(&strategy); + buffer.push(batch.clone()).unwrap(); + buffer.push(batch).unwrap(); + + assert_eq!(buffer.buffered_rows, 0); + assert!(buffer.buffered_batches.is_empty()); + + let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics) + .await + .unwrap(); + assert_eq!(value.cached_batches.len(), 1); assert_eq!( - buffer.estimated_batches_size(), - batch1_full + batch2_full - dict_values_size + value.cached_batches[0].slice_lengths, + vec![ + DEFAULT_READ_BATCH_SIZE / 2 + 1, + DEFAULT_READ_BATCH_SIZE / 2 + 1 + ] ); - assert_eq!(buffer.into_batches().len(), 2); } - #[test] - fn cache_batch_buffer_non_shared_dictionary() { - use datatypes::arrow::array::BinaryArray; + #[tokio::test] + async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() { + let strategy = test_cache_strategy(); + let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096); + let (key, part_metrics) = test_cache_context(&strategy); - let schema = dict_test_schema(); - let dict_values1 = BinaryArray::from_vec(vec![b"a", b"b"]); - let dict_values2 = BinaryArray::from_vec(vec![b"x", b"y"]); + let mut buffer = CacheBatchBuffer::new(&strategy); + buffer.push(large_batch.clone()).unwrap(); - let batch1 = make_dict_batch(schema.clone(), &dict_values1, &[0, 1], &[10, 20]); - let batch2 = make_dict_batch(schema, &dict_values2, &[0, 1], &[30, 40]); + assert_eq!(buffer.buffered_rows, 0); + assert!(buffer.buffered_batches.is_empty()); - let batch1_full = batch1.get_array_memory_size(); - let batch2_full = batch2.get_array_memory_size(); - - let mut buffer = CacheBatchBuffer::new(); - buffer.push(batch1); - buffer.push(batch2); - - // Different dictionaries: full size for both. - assert_eq!(buffer.estimated_batches_size(), batch1_full + batch2_full); - } - - #[test] - fn cache_batch_buffer_shared_then_diverged() { - use datatypes::arrow::array::BinaryArray; - - let schema = dict_test_schema(); - let shared_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]); - let different_values = BinaryArray::from_vec(vec![b"x", b"y"]); - - let batch1 = make_dict_batch(schema.clone(), &shared_values, &[0], &[1]); - let batch2 = make_dict_batch(schema.clone(), &shared_values, &[1], &[2]); - let batch3 = make_dict_batch(schema, &different_values, &[0], &[3]); - - let size1 = batch1.get_array_memory_size(); - let size2 = batch2.get_array_memory_size(); - let size3 = batch3.get_array_memory_size(); - - let dict_values_size = compute_total_dict_values_size(&batch2); - - let mut buffer = CacheBatchBuffer::new(); - buffer.push(batch1); - buffer.push(batch2); - buffer.push(batch3); - - // batch2 shares dict with batch1 (dedup), batch3 does not (full size). + let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics) + .await + .unwrap(); + assert_eq!(value.cached_batches.len(), 1); assert_eq!( - buffer.estimated_batches_size(), - size1 + (size2 - dict_values_size) + size3 + value.cached_batches[0].slice_lengths, + vec![large_batch.num_rows()] + ); + } + + #[tokio::test] + async fn cache_batch_buffer_uses_compacted_size_for_weight() { + let strategy = test_cache_strategy(); + let batch1 = make_batch(&[1, 2]); + let batch2 = make_batch(&[3, 4]); + let (key, part_metrics) = test_cache_context(&strategy); + let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()]) + .unwrap() + .get_array_memory_size(); + + let mut buffer = CacheBatchBuffer::new(&strategy); + buffer.push(batch1).unwrap(); + buffer.push(batch2).unwrap(); + + let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics) + .await + .unwrap(); + assert_eq!(value.estimated_batches_size, expected); + } + + #[tokio::test] + async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() { + let strategy = test_cache_strategy(); + let (key, part_metrics) = test_cache_context(&strategy); + let mut buffer = CacheBatchBuffer::new(&strategy); + buffer.total_weight = RANGE_CACHE_SKIP_BYTES; + + buffer.push(make_batch(&[1])).unwrap(); + + assert!(buffer.sender.is_none()); + assert!( + finish_cache_batch_buffer(buffer, key, strategy, part_metrics) + .await + .is_none() ); } } diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 8fc946b3d3..80563f32a9 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -1258,13 +1258,25 @@ pub(crate) fn should_split_flat_batches_for_merge( // This is a file range. let file_index = index.index - stream_ctx.input.num_memtables(); let file = &stream_ctx.input.files[file_index]; - if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 { + let file_meta = file.meta_ref(); + if file_meta.level == 0 { + // Always split level 0 files. + num_files_to_split += 1; + continue; + } else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 { // If the file doesn't have enough rows, or the number of series is unavailable, skips it. continue; } - debug_assert!(file.meta_ref().num_rows > 0); - if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) { + debug_assert!(file_meta.num_rows > 0); + if !can_split_series(file_meta.num_rows, file_meta.num_series) { // We can't split batches in a file. + common_telemetry::trace!( + "Can't split series for file {}, level: {}, num_rows: {}, num_series: {}", + file_meta.file_id, + file_meta.level, + file_meta.num_rows, + file_meta.num_series, + ); return None; } else { num_files_to_split += 1; @@ -1310,14 +1322,108 @@ pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) -> size.clamp(2, 64) } +/// Computes the average estimated rows per batch across multiple range readers. +pub(crate) fn compute_average_batch_size( + estimated_rows_per_batch: impl IntoIterator, +) -> usize { + let mut total = 0usize; + let mut count = 0usize; + for size in estimated_rows_per_batch { + total += size; + count += 1; + } + + if count == 0 { + return DEFAULT_READ_BATCH_SIZE; + } + + (total / count).clamp(1, DEFAULT_READ_BATCH_SIZE) +} + fn can_split_series(num_rows: u64, num_series: u64) -> bool { - assert!(num_series > 0); - assert!(num_rows > 0); + if num_rows == 0 || num_series == 0 { + return false; + } // It doesn't have too many series or it will have enough rows for each batch. num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD } +#[cfg(test)] +mod split_tests { + use std::sync::Arc; + + use common_time::Timestamp; + use smallvec::smallvec; + use store_api::storage::FileId; + + use super::*; + use crate::read::projection::ProjectionMapper; + use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex}; + use crate::read::scan_region::{ScanInput, StreamContext}; + use crate::sst::file::FileHandle; + use crate::test_util::memtable_util::metadata_with_primary_key; + use crate::test_util::scheduler_util::SchedulerEnv; + use crate::test_util::sst_util::sst_file_handle_with_file_id; + + async fn new_stream_context_with_files(files: Vec) -> StreamContext { + let env = SchedulerEnv::new().await; + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(); + let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files); + + StreamContext { + input, + ranges: vec![], + scan_fingerprint: None, + query_start: std::time::Instant::now(), + } + } + + fn single_file_range_meta() -> RangeMeta { + RangeMeta { + time_range: ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + ), + indices: smallvec![SourceIndex { + index: 0, + num_row_groups: 1, + }], + row_group_indices: smallvec![RowGroupIndex { + index: 0, + row_group_index: 0, + }], + num_rows: 1024, + } + } + + #[tokio::test] + async fn should_split_level_zero_file_even_when_series_stats_are_missing() { + let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000) + .meta_ref() + .clone(); + file.level = 0; + file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64; + file.num_row_groups = 1; + file.num_series = 0; + + let file = FileHandle::new(file, crate::test_util::new_noop_file_purger()); + let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await); + + assert!( + should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some() + ); + } + + #[test] + fn can_split_series_returns_false_for_zero_inputs() { + assert!(!can_split_series(0, 1)); + assert!(!can_split_series(1, 0)); + assert!(!can_split_series(0, 0)); + } +} + /// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized /// based on the `explain_verbose` flag. fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics { @@ -1653,6 +1759,7 @@ mod tests { let meta = FileMeta { region_id: RegionId::new(123, 456), file_id: Default::default(), + level: 1, time_range: ( Timestamp::new_millisecond(0), Timestamp::new_millisecond(1000), @@ -1816,4 +1923,26 @@ mod tests { compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2) ); } + + #[test] + fn test_compute_average_batch_size_uses_arithmetic_mean() { + assert_eq!(24, compute_average_batch_size([16, 24, 32])); + } + + #[test] + fn test_compute_average_batch_size_clamps_values() { + assert_eq!( + DEFAULT_READ_BATCH_SIZE, + compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2]) + ); + assert_eq!(1, compute_average_batch_size([0, 1])); + } + + #[test] + fn test_compute_average_batch_size_falls_back_when_empty() { + assert_eq!( + DEFAULT_READ_BATCH_SIZE, + compute_average_batch_size(std::iter::empty()) + ); + } } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 15ab435425..932d382834 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -41,6 +41,9 @@ use crate::read::flat_merge::FlatMergeReader; use crate::read::last_row::FlatLastRowReader; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::range::RangeMeta; +use crate::read::range_cache::{ + build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream, +}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size, @@ -181,19 +184,22 @@ impl SeqScan { sources, None, None, + false, compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE), ) .await } - /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel - /// if possible. + /// Builds a flat reader to read sources that returns RecordBatch. + /// If `semaphore` is provided, reads sources in parallel if possible. + /// If `skip_dedup` is true, the merged stream is returned without applying flat dedup. #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn build_flat_reader_from_sources( stream_ctx: &StreamContext, mut sources: Vec, semaphore: Option>, part_metrics: Option<&PartitionMetrics>, + skip_dedup: bool, channel_size: usize, ) -> Result { if let Some(semaphore) = semaphore.as_ref() { @@ -215,7 +221,7 @@ impl SeqScan { FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter) .await?; - let dedup = !stream_ctx.input.append_mode; + let dedup = !skip_dedup && !stream_ctx.input.append_mode; let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter()); let reader = if dedup { match stream_ctx.input.merge_mode { @@ -253,6 +259,62 @@ impl SeqScan { Ok(reader) } + /// Builds a flat read stream for one partition range. + pub(crate) async fn build_flat_partition_range_read( + stream_ctx: &Arc, + part_range: &PartitionRange, + compaction: bool, + part_metrics: &PartitionMetrics, + partition_pruner: Arc, + file_scan_semaphore: Option>, + merge_semaphore: Option>, + ) -> Result<(BoxedRecordBatchStream, usize)> { + let cache_key = build_range_cache_key(stream_ctx, part_range); + + if let Some(key) = cache_key.as_ref() { + if let Some(value) = stream_ctx.input.cache_strategy.get_range_result(key) { + part_metrics.inc_range_cache_hit(); + return Ok((cached_flat_range_stream(value), DEFAULT_READ_BATCH_SIZE)); + } + part_metrics.inc_range_cache_miss(); + } + + let mut sources = Vec::new(); + let split_batch_size = build_flat_sources( + stream_ctx, + part_range, + compaction, + part_metrics, + partition_pruner, + &mut sources, + file_scan_semaphore, + ) + .await?; + let estimated_rows_per_batch = split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE); + let channel_size = compute_parallel_channel_size(estimated_rows_per_batch); + let stream = Self::build_flat_reader_from_sources( + stream_ctx, + sources, + merge_semaphore, + Some(part_metrics), + false, + channel_size, + ) + .await?; + + let stream = match cache_key { + Some(key) => cache_flat_range_stream( + stream, + stream_ctx.input.cache_strategy.clone(), + key, + part_metrics.clone(), + ), + None => stream, + }; + + Ok((stream, estimated_rows_per_batch)) + } + /// Scans the given partition when the part list is set properly. /// Otherwise the returned stream might not contains any data. fn scan_partition_impl( @@ -331,23 +393,16 @@ impl SeqScan { // Scans each part. for part_range in partition_ranges { - let mut sources = Vec::new(); - let split_batch_size = build_flat_sources( + let (mut reader, _) = Self::build_flat_partition_range_read( &stream_ctx, &part_range, compaction, &part_metrics, partition_pruner.clone(), - &mut sources, file_scan_semaphore.clone(), - ).await?; - - let channel_size = compute_parallel_channel_size( - split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE), - ); - let mut reader = - Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size) - .await?; + semaphore.clone(), + ) + .await?; let mut metrics = ScannerMetrics { scan_cost: fetch_start.elapsed(), diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs index bf7ed072ab..7883c1d553 100644 --- a/src/mito2/src/read/series_scan.rs +++ b/src/mito2/src/read/series_scan.rs @@ -41,18 +41,18 @@ use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; use tokio::sync::mpsc::{self, Receiver, Sender}; use crate::error::{ - Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu, + Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu, ScanSeriesSnafu, TooManyFilesToReadSnafu, }; use crate::read::ScannerMetrics; use crate::read::pruner::{PartitionPruner, Pruner}; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{ - PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_parallel_channel_size, + PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size, + compute_parallel_channel_size, }; -use crate::read::seq_scan::{SeqScan, build_flat_sources}; +use crate::read::seq_scan::SeqScan; use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream}; -use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::PrimaryKeyArray; @@ -227,7 +227,8 @@ impl SeriesScan { let (senders, receivers) = new_channel_list(self.properties.num_partitions()); let mut distributor = SeriesDistributor { stream_ctx: self.stream_ctx.clone(), - semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))), + range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))), + final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))), partitions: self.properties.partitions.clone(), pruner: self.pruner.clone(), senders, @@ -420,8 +421,13 @@ impl SeriesScan { struct SeriesDistributor { /// Context for the scan stream. stream_ctx: Arc, - /// Optional semaphore for limiting the number of concurrent scans. - semaphore: Option>, + /// Semaphore for file scanning and range-level merging. + range_semaphore: Option>, + /// Semaphore for the final merge across all range streams. + /// Must be separate from `range_semaphore` to avoid deadlock: final merge tasks + /// hold a permit while waiting for data from range-level merge tasks, which also + /// need permits to produce data. + final_merge_semaphore: Option>, /// Partition ranges to scan. partitions: Vec>, /// Shared pruner for file range building. @@ -483,36 +489,57 @@ impl SeriesDistributor { // build part cost. let mut fetch_start = Instant::now(); - // Scans all parts. - let mut sources = Vec::with_capacity(self.partitions.len()); - let mut min_batch_size: Option = None; + // Builds one deduped stream per partition range, then merges across ranges. + let build_start = Instant::now(); + let mut tasks = Vec::new(); for partition in &self.partitions { - sources.reserve(partition.len()); for part_range in partition { - let split_batch_size = build_flat_sources( - &self.stream_ctx, - part_range, - false, - &part_metrics, - partition_pruner.clone(), - &mut sources, - self.semaphore.clone(), - ) - .await?; - if let Some(size) = split_batch_size { - min_batch_size = Some(min_batch_size.map_or(size, |cur| cur.min(size))); - } + let stream_ctx = self.stream_ctx.clone(); + let part_range = *part_range; + let part_metrics = part_metrics.clone(); + let partition_pruner = partition_pruner.clone(); + let file_scan_semaphore = self.range_semaphore.clone(); + let merge_semaphore = self.range_semaphore.clone(); + tasks.push(common_runtime::spawn_global(async move { + SeqScan::build_flat_partition_range_read( + &stream_ctx, + &part_range, + false, + &part_metrics, + partition_pruner, + file_scan_semaphore, + merge_semaphore, + ) + .await + })); } } - - // Builds a flat reader that merge sources from all parts. + let mut range_streams = Vec::with_capacity(tasks.len()); + let mut estimated_batch_sizes = Vec::with_capacity(tasks.len()); + for task in tasks { + let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??; + range_streams.push(stream); + estimated_batch_sizes.push(estimated_batch_size); + } let channel_size = - compute_parallel_channel_size(min_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE)); + compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes)); + common_telemetry::debug!( + "SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}", + range_streams.len(), + self.stream_ctx.input.region_metadata().region_id, + build_start.elapsed(), + channel_size, + ); + + // Each partition range stream is already deduped, so skip dedup here. + // Use a separate semaphore for the final merge to avoid deadlock with + // range-level merge tasks that share the range_semaphore. let mut reader = SeqScan::build_flat_reader_from_sources( &self.stream_ctx, - sources, - self.semaphore.clone(), + range_streams, + self.final_merge_semaphore.clone(), Some(&part_metrics), + true, channel_size, ) .await?;