feat: use partition range cache in scan (#7873)

* feat: use range cache in scan

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: rename dedup to skip_dedup

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use background concat for buffered batches

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: store permits

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix potential panic

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: skip range-cache wrapping when cache is disabled

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: avoid potential deadlock

Deadlock Chain

1. Range-level merge tasks: Each concurrent build_flat_partition_range_read (line 494-506) calls
build_flat_reader_from_sources → create_parallel_flat_sources → spawn_flat_scan_task. These
background tasks loop: acquire permit → input.next() → release permit.
2. Final merge tasks: After all range tasks return streams (line 509-511), the distributor calls
build_flat_reader_from_sources again (line 520-527) → create_parallel_flat_sources → more
spawn_flat_scan_task tasks. These also loop: acquire permit → input.next() → release permit.
3. Circular wait: The final merge tasks' input.next() reads from ReceiverStreams backed by
range-level merge tasks. If all num_partitions permits are held by final merge tasks blocked on
input.next(), the range-level merge tasks can't acquire permits to produce data → deadlock.

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for small permits

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: use avg batch size for channel size

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: fix test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comments

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-13 16:27:53 +08:00
committed by GitHub
parent 9f7ffb4d26
commit 01a73105b8
6 changed files with 845 additions and 286 deletions

View File

@@ -28,6 +28,7 @@ use std::ops::Range;
use std::sync::Arc;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::Value;
@@ -72,6 +73,46 @@ const INDEX_TYPE: &str = "index";
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
permit_bytes: usize,
}
impl Default for RangeResultMemoryLimiter {
fn default() -> Self {
Self::new(
RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
)
}
}
impl RangeResultMemoryLimiter {
pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
let permit_bytes = permit_bytes.max(1);
let permits = limit_bytes.div_ceil(permit_bytes).max(1);
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
permit_bytes,
}
}
pub(crate) fn permit_bytes(&self) -> usize {
self.permit_bytes
}
pub(crate) async fn acquire(
&self,
bytes: usize,
) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32;
self.semaphore.acquire_many(permits).await
}
}
/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
@@ -373,6 +414,23 @@ impl CacheStrategy {
}
}
/// Returns true if the range result cache is enabled.
pub(crate) fn has_range_result_cache(&self) -> bool {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
}
}
pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
Some(cache_manager.range_result_memory_limiter())
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
@@ -476,6 +534,8 @@ pub struct CacheManager {
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
range_result_cache: Option<RangeResultCache>,
/// Shared memory limiter for async range-result cache tasks.
range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
}
@@ -735,6 +795,15 @@ impl CacheManager {
}
}
/// Returns true if the range result cache is enabled.
pub(crate) fn has_range_result_cache(&self) -> bool {
self.range_result_cache.is_some()
}
pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
&self.range_result_memory_limiter
}
/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
@@ -969,6 +1038,7 @@ impl CacheManagerBuilder {
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
index_result_cache,
}
}

View File

@@ -403,3 +403,99 @@ fn collect_and_assert_partition_rows(
actual_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
actual_rows
}
/// Tests series scan with multiple partition ranges (each with multiple overlapping sources)
/// and small semaphore permits (controlled by num_partitions).
#[tokio::test]
async fn test_series_scan_flat_small_permits() {
let mut env = TestEnv::with_prefix("test_series_scan_small_permits").await;
let engine = env
.create_engine(MitoConfig {
default_flat_format: true,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.time_window", "1h")
.build();
let column_schemas = test_util::rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Create overlapping SSTs in each time window so partition ranges have multiple sources.
let put_flush_rows = async |start, end| {
let rows = Rows {
schema: column_schemas.clone(),
rows: test_util::build_rows(start, end),
};
test_util::put_rows(&engine, region_id, rows).await;
test_util::flush_region(&engine, region_id, None).await;
};
// Window 0 (0s-999s): 3 overlapping SSTs
put_flush_rows(0, 3).await;
put_flush_rows(1, 5).await;
put_flush_rows(3, 7).await;
// Window 1 (3600s-4599s): 2 overlapping SSTs
put_flush_rows(3600, 3603).await;
put_flush_rows(3601, 3605).await;
// Window 2 (7200s-8199s): 2 overlapping SSTs
put_flush_rows(7200, 7203).await;
put_flush_rows(7201, 7204).await;
let mut expected_rows = Vec::new();
for value in [
0_i64, 1, 2, 3, 4, 5, 6, 3600, 3601, 3602, 3603, 3604, 7200, 7201, 7202, 7203,
] {
expected_rows.push((value.to_string(), value as f64, value * 1000));
}
expected_rows.sort_by(|a, b| a.0.cmp(&b.0).then(a.2.cmp(&b.2)));
// Test with different semaphore sizes (num_partitions controls Semaphore::new(num_partitions)).
for num_partitions in [1, 2] {
let request = ScanRequest {
distribution: Some(TimeSeriesDistribution::PerSeries),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let Scanner::Series(mut scanner) = scanner else {
panic!("Scanner should be series scan");
};
// Collect all partition ranges and redistribute into `num_partitions` partitions.
let raw_ranges: Vec<_> = scanner
.properties()
.partitions
.iter()
.flatten()
.cloned()
.collect();
assert!(
raw_ranges.len() >= 3,
"expected at least 3 partition ranges, got {}",
raw_ranges.len()
);
let mut new_ranges = vec![vec![]; num_partitions];
for (i, range) in raw_ranges.into_iter().enumerate() {
new_ranges[i % num_partitions].push(range);
}
scanner
.prepare(PrepareRequest {
ranges: Some(new_ranges),
..Default::default()
})
.unwrap();
let actual_rows = collect_partition_rows_round_robin(&scanner, num_partitions).await;
assert_eq!(
expected_rows, actual_rows,
"mismatch with num_partitions={num_partitions}"
);
}
}

View File

@@ -18,22 +18,28 @@ use std::mem;
use std::sync::Arc;
use async_stream::try_stream;
use common_telemetry::warn;
use common_time::range::TimestampRange;
use datatypes::arrow::array::{Array, AsArray, DictionaryArray};
use datatypes::arrow::datatypes::UInt32Type;
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
use tokio::sync::{mpsc, oneshot};
use crate::cache::CacheStrategy;
use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
use crate::read::BoxedRecordBatchStream;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::region::options::MergeMode;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024;
const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024;
/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
@@ -187,29 +193,48 @@ impl RangeScanCacheKey {
}
/// Cached result for one range scan.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
batch: RecordBatch,
slice_lengths: Vec<usize>,
}
impl CachedBatchSlice {
fn metadata_size(&self) -> usize {
self.slice_lengths.capacity() * mem::size_of::<usize>()
}
}
pub(crate) struct RangeScanCacheValue {
pub(crate) batches: Vec<RecordBatch>,
/// Precomputed size of all batches, accounting for shared dictionary values.
cached_batches: Vec<CachedBatchSlice>,
/// Precomputed size of all compacted batches.
estimated_batches_size: usize,
}
impl RangeScanCacheValue {
pub(crate) fn new(batches: Vec<RecordBatch>, estimated_batches_size: usize) -> Self {
pub(crate) fn new(
cached_batches: Vec<CachedBatchSlice>,
estimated_batches_size: usize,
) -> Self {
Self {
batches,
cached_batches,
estimated_batches_size,
}
}
pub(crate) fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
+ self.batches.capacity() * mem::size_of::<RecordBatch>()
+ self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
+ self
.cached_batches
.iter()
.map(CachedBatchSlice::metadata_size)
.sum::<usize>()
+ self.estimated_batches_size
}
}
/// Row groups and whether all sources are file-only for a partition range.
#[allow(dead_code)]
pub(crate) struct PartitionRangeRowGroups {
/// Sorted (file_id, row_group_index) pairs.
pub(crate) row_groups: Vec<(FileId, i64)>,
@@ -217,7 +242,6 @@ pub(crate) struct PartitionRangeRowGroups {
}
/// Collects (file_id, row_group_index) pairs from a partition range's row group indices.
#[allow(dead_code)]
pub(crate) fn collect_partition_range_row_groups(
stream_ctx: &StreamContext,
part_range: &PartitionRange,
@@ -244,11 +268,14 @@ pub(crate) fn collect_partition_range_row_groups(
}
/// Builds a cache key for the given partition range if it is eligible for caching.
#[allow(dead_code)]
pub(crate) fn build_range_cache_key(
stream_ctx: &StreamContext,
part_range: &PartitionRange,
) -> Option<RangeScanCacheKey> {
if !stream_ctx.input.cache_strategy.has_range_result_cache() {
return None;
}
let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
// Dyn filters can change at runtime, so we can't cache when they're present.
@@ -283,7 +310,6 @@ pub(crate) fn build_range_cache_key(
})
}
#[allow(dead_code)]
fn query_time_range_covers_partition_range(
query_time_range: Option<&TimestampRange>,
partition_time_range: FileTimeRange,
@@ -297,117 +323,232 @@ fn query_time_range_covers_partition_range(
}
/// Returns a stream that replays cached record batches.
#[allow(dead_code)]
pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
Box::pin(futures::stream::iter(
value.batches.clone().into_iter().map(Ok),
))
Box::pin(try_stream! {
for cached_batch in &value.cached_batches {
let mut offset = 0;
for &len in &cached_batch.slice_lengths {
yield cached_batch.batch.slice(offset, len);
offset += len;
}
}
})
}
/// Returns true if two primary key dictionary arrays share the same underlying
/// values buffers by pointer comparison.
///
/// The primary key column is always `DictionaryArray<UInt32Type>` with `Binary` values.
fn pk_values_ptr_eq(a: &DictionaryArray<UInt32Type>, b: &DictionaryArray<UInt32Type>) -> bool {
let a = a.values().as_binary::<i32>();
let b = b.values().as_binary::<i32>();
let values_eq = a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets());
match (a.nulls(), b.nulls()) {
(Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()),
(None, None) => values_eq,
_ => false,
enum CacheConcatCommand {
Compact(Vec<RecordBatch>),
Finish {
pending: Vec<RecordBatch>,
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
},
}
#[derive(Default)]
struct CacheConcatState {
cached_batches: Vec<CachedBatchSlice>,
estimated_size: usize,
}
impl CacheConcatState {
async fn compact(
&mut self,
batches: Vec<RecordBatch>,
limiter: &crate::cache::RangeResultMemoryLimiter,
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
let input_size = batches
.iter()
.map(RecordBatch::get_array_memory_size)
.sum::<usize>();
let _permit = limiter.acquire(input_size).await.map_err(|_| {
UnexpectedSnafu {
reason: "range result memory limiter is unexpectedly closed",
}
.build()
})?;
let compacted = compact_record_batches(batches)?;
self.estimated_size += compacted.batch.get_array_memory_size();
self.cached_batches.push(compacted);
Ok(())
}
fn finish(self) -> RangeScanCacheValue {
RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
}
}
/// Buffers record batches for caching, tracking memory size while deduplicating
/// shared dictionary values across batches.
///
/// Uses the primary key column as a proxy to detect dictionary sharing: if the PK
/// column's dictionary values are pointer-equal across batches, we assume all
/// dictionary columns share their values and deduct the total dictionary values size.
struct CacheBatchBuffer {
fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
debug_assert!(!batches.is_empty());
let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
build_cached_batch_slice(batches, slice_lengths)
}
fn build_cached_batch_slice(
batches: Vec<RecordBatch>,
/// Running total of batch memory.
total_size: usize,
/// The first batch's PK dictionary array, for pointer comparison.
/// `None` if no dictionary PK column exists or no batch has been added yet.
first_pk_dict: Option<DictionaryArray<UInt32Type>>,
/// Sum of `get_array_memory_size()` of all dictionary value arrays from the first batch.
total_dict_values_size: usize,
/// Whether the PK dictionary is still shared across all batches seen so far.
shared: bool,
slice_lengths: Vec<usize>,
) -> Result<CachedBatchSlice> {
let batch = if batches.len() == 1 {
batches.into_iter().next().unwrap()
} else {
let schema = batches[0].schema();
concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
};
Ok(CachedBatchSlice {
batch,
slice_lengths,
})
}
async fn run_cache_concat_task(
mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
) {
let mut state = CacheConcatState::default();
while let Some(cmd) = rx.recv().await {
match cmd {
CacheConcatCommand::Compact(batches) => {
if let Err(err) = state.compact(batches, &limiter).await {
warn!(err; "Failed to compact range cache batches");
return;
}
}
CacheConcatCommand::Finish {
pending,
key,
cache_strategy,
part_metrics,
result_tx,
} => {
let result = state
.compact(pending, &limiter)
.await
.map(|()| state.finish());
if let Err(err) = &result {
warn!(err; "Failed to finalize range cache batches");
}
let value = result.ok().map(Arc::new);
if let Some(value) = &value {
part_metrics
.inc_range_cache_size(key.estimated_size() + value.estimated_size());
cache_strategy.put_range_result(key, value.clone());
}
if let Some(tx) = result_tx {
let _ = tx.send(value);
}
return;
}
}
}
}
struct CacheBatchBuffer {
buffered_batches: Vec<RecordBatch>,
buffered_rows: usize,
buffered_size: usize,
total_weight: usize,
sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
impl CacheBatchBuffer {
fn new() -> Self {
fn new(cache_strategy: &CacheStrategy) -> Self {
let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
let (tx, rx) = mpsc::unbounded_channel();
common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone()));
tx
});
Self {
batches: Vec::new(),
total_size: 0,
first_pk_dict: None,
total_dict_values_size: 0,
shared: true,
buffered_batches: Vec::new(),
buffered_rows: 0,
buffered_size: 0,
total_weight: 0,
sender,
}
}
fn push(&mut self, batch: RecordBatch) {
if self.batches.is_empty() {
self.init_first_batch(&batch);
} else {
self.add_subsequent_batch(&batch);
fn push(&mut self, batch: RecordBatch) -> Result<()> {
if self.sender.is_none() {
return Ok(());
}
self.batches.push(batch);
}
fn init_first_batch(&mut self, batch: &RecordBatch) {
self.total_size += batch.get_array_memory_size();
let pk_col_idx = primary_key_column_index(batch.num_columns());
let mut total_dict_values_size = 0;
for col_idx in 0..batch.num_columns() {
let col = batch.column(col_idx);
if let Some(dict) = col.as_any().downcast_ref::<DictionaryArray<UInt32Type>>() {
total_dict_values_size += dict.values().get_array_memory_size();
if col_idx == pk_col_idx {
self.first_pk_dict = Some(dict.clone());
}
}
}
self.total_dict_values_size = total_dict_values_size;
}
fn add_subsequent_batch(&mut self, batch: &RecordBatch) {
let batch_size = batch.get_array_memory_size();
if self.shared
&& let Some(first_pk_dict) = &self.first_pk_dict
{
let pk_col_idx = primary_key_column_index(batch.num_columns());
let col = batch.column(pk_col_idx);
if let Some(dict) = col.as_any().downcast_ref::<DictionaryArray<UInt32Type>>()
&& pk_values_ptr_eq(first_pk_dict, dict)
{
// PK dict is shared, deduct all dict values sizes.
self.total_size += batch_size - self.total_dict_values_size;
return;
}
// Dictionary diverged.
self.shared = false;
self.total_weight += batch_size;
if self.total_weight > RANGE_CACHE_SKIP_BYTES {
self.buffered_batches.clear();
self.buffered_rows = 0;
self.buffered_size = 0;
self.sender = None;
return Ok(());
}
self.total_size += batch_size;
self.buffered_rows += batch.num_rows();
self.buffered_size += batch_size;
self.buffered_batches.push(batch);
if self.buffered_rows > DEFAULT_READ_BATCH_SIZE
|| self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES
{
self.notify_compact();
}
Ok(())
}
fn estimated_batches_size(&self) -> usize {
self.total_size
fn notify_compact(&mut self) {
if self.buffered_batches.is_empty() || self.sender.is_none() {
return;
}
let batches = mem::take(&mut self.buffered_batches);
self.buffered_rows = 0;
self.buffered_size = 0;
let Some(sender) = &self.sender else {
return;
};
if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
self.sender = None;
}
}
fn into_batches(self) -> Vec<RecordBatch> {
self.batches
fn finish(
mut self,
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
) {
let Some(sender) = self.sender.take() else {
return;
};
if sender
.send(CacheConcatCommand::Finish {
pending: mem::take(&mut self.buffered_batches),
key,
cache_strategy,
part_metrics,
result_tx,
})
.is_err()
{
self.sender = None;
}
}
}
/// Wraps a stream to cache its output for future range cache hits.
#[allow(dead_code)]
pub(crate) fn cache_flat_range_stream(
mut stream: BoxedRecordBatchStream,
cache_strategy: CacheStrategy,
@@ -415,17 +556,13 @@ pub(crate) fn cache_flat_range_stream(
part_metrics: PartitionMetrics,
) -> BoxedRecordBatchStream {
Box::pin(try_stream! {
let mut buffer = CacheBatchBuffer::new();
let mut buffer = CacheBatchBuffer::new(&cache_strategy);
while let Some(batch) = stream.try_next().await? {
buffer.push(batch.clone());
buffer.push(batch.clone())?;
yield batch;
}
let estimated_size = buffer.estimated_batches_size();
let batches = buffer.into_batches();
let value = Arc::new(RangeScanCacheValue::new(batches, estimated_size));
part_metrics.inc_range_cache_size(key.estimated_size() + value.estimated_size());
cache_strategy.put_range_result(key, value);
buffer.finish(key, cache_strategy, part_metrics, None);
})
}
@@ -486,10 +623,11 @@ mod tests {
use common_time::Timestamp;
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, col, lit};
use smallvec::smallvec;
use store_api::storage::FileId;
use store_api::storage::{FileId, RegionId};
use super::*;
use crate::cache::CacheManager;
@@ -508,6 +646,44 @@ mod tests {
))
}
fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
let region_id = RegionId::new(1, 1);
let key = RangeScanCacheKey {
region_id,
row_groups: vec![],
scan: ScanRequestFingerprintBuilder {
read_column_ids: vec![],
read_column_types: vec![],
filters: vec![],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: false,
merge_mode: MergeMode::LastRow,
partition_expr_version: 0,
}
.build(),
};
let metrics_set = ExecutionPlanMetricsSet::new();
let part_metrics =
PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);
assert!(strategy.get_range_result(&key).is_none());
(key, part_metrics)
}
async fn finish_cache_batch_buffer(
buffer: CacheBatchBuffer,
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
) -> Option<Arc<RangeScanCacheValue>> {
let (tx, rx) = oneshot::channel();
buffer.finish(key, cache_strategy, part_metrics, Some(tx));
rx.await.context(crate::error::RecvSnafu).ok().flatten()
}
async fn new_stream_context(
filters: Vec<Expr>,
query_time_range: Option<TimestampRange>,
@@ -687,169 +863,175 @@ mod tests {
);
}
/// Creates a test schema with 5 columns where the primary key dictionary column
/// is at index 2 (`num_columns - 3`), matching the flat format layout.
///
/// Layout: `[field0: Int64, field1: Int64, pk: Dictionary<UInt32,Binary>, ts: Int64, seq: Int64]`
fn dict_test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
Arc::new(Schema::new(vec![
Field::new("field0", ArrowDataType::Int64, false),
Field::new("field1", ArrowDataType::Int64, false),
Field::new(
"pk",
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt32),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("ts", ArrowDataType::Int64, false),
Field::new("seq", ArrowDataType::Int64, false),
]))
Arc::new(Schema::new(vec![Field::new(
"value",
ArrowDataType::Int64,
false,
)]))
}
/// Helper to create a record batch with a dictionary column at the primary key position.
fn make_dict_batch(
schema: Arc<datatypes::arrow::datatypes::Schema>,
dict_values: &datatypes::arrow::array::BinaryArray,
keys: &[u32],
int_values: &[i64],
) -> RecordBatch {
use datatypes::arrow::array::{Int64Array, UInt32Array};
fn make_batch(values: &[i64]) -> RecordBatch {
use datatypes::arrow::array::Int64Array;
let key_array = UInt32Array::from(keys.to_vec());
let dict_array: DictionaryArray<UInt32Type> =
DictionaryArray::new(key_array, Arc::new(dict_values.clone()));
let int_array = Int64Array::from(int_values.to_vec());
let zeros = Int64Array::from(vec![0i64; int_values.len()]);
RecordBatch::try_new(
schema,
vec![
Arc::new(zeros.clone()),
Arc::new(int_array),
Arc::new(dict_array),
Arc::new(zeros.clone()),
Arc::new(zeros),
],
test_schema(),
vec![Arc::new(Int64Array::from(values.to_vec()))],
)
.unwrap()
}
/// Computes the total `get_array_memory_size()` of all dictionary value arrays in a batch.
fn compute_total_dict_values_size(batch: &RecordBatch) -> usize {
batch
.columns()
.iter()
.filter_map(|col| {
col.as_any()
.downcast_ref::<DictionaryArray<UInt32Type>>()
.map(|dict| dict.values().get_array_memory_size())
})
.sum()
}
#[test]
fn cache_batch_buffer_empty() {
let buffer = CacheBatchBuffer::new();
assert_eq!(buffer.estimated_batches_size(), 0);
assert!(buffer.into_batches().is_empty());
}
#[test]
fn cache_batch_buffer_single_batch() {
fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
use datatypes::arrow::array::BinaryArray;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
let schema = dict_test_schema();
let dict_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]);
let batch = make_dict_batch(schema, &dict_values, &[0, 1, 2], &[10, 20, 30]);
let schema = Arc::new(Schema::new(vec![Field::new(
"value",
ArrowDataType::Binary,
false,
)]));
let payload = vec![b'x'; bytes_per_row];
let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();
let full_size = batch.get_array_memory_size();
let mut buffer = CacheBatchBuffer::new();
buffer.push(batch);
assert_eq!(buffer.estimated_batches_size(), full_size);
assert_eq!(buffer.into_batches().len(), 1);
RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
}
#[test]
fn cache_batch_buffer_shared_dictionary() {
use datatypes::arrow::array::BinaryArray;
fn compact_record_batches_keeps_original_boundaries() {
let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];
let schema = dict_test_schema();
let dict_values = BinaryArray::from_vec(vec![b"alpha", b"beta", b"gamma"]);
let compacted = compact_record_batches(batches).unwrap();
// Two batches sharing the same dictionary values array.
let batch1 = make_dict_batch(schema.clone(), &dict_values, &[0, 1], &[10, 20]);
let batch2 = make_dict_batch(schema, &dict_values, &[1, 2], &[30, 40]);
assert_eq!(compacted.batch.num_rows(), 5);
assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
}
let batch1_full = batch1.get_array_memory_size();
let batch2_full = batch2.get_array_memory_size();
#[tokio::test]
async fn cached_flat_range_stream_replays_original_batches() {
let value = Arc::new(RangeScanCacheValue::new(
vec![CachedBatchSlice {
batch: make_batch(&[1, 2, 3]),
slice_lengths: vec![2, 1],
}],
make_batch(&[1, 2, 3]).get_array_memory_size(),
));
// The total dictionary values size that should be deduplicated for the second batch.
let dict_values_size = compute_total_dict_values_size(&batch2);
let replayed = cached_flat_range_stream(value)
.try_collect::<Vec<_>>()
.await
.unwrap();
let mut buffer = CacheBatchBuffer::new();
buffer.push(batch1);
buffer.push(batch2);
assert_eq!(replayed.len(), 2);
assert_eq!(replayed[0].num_rows(), 2);
assert_eq!(replayed[1].num_rows(), 1);
}
// Second batch's dict values should not be counted again.
#[tokio::test]
async fn cache_batch_buffer_finishes_pending_batches() {
let strategy = test_cache_strategy();
let batch = make_batch(&[1, 2, 3]);
let expected_size = batch.get_array_memory_size();
let (key, part_metrics) = test_cache_context(&strategy);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(batch).unwrap();
let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
.await
.unwrap();
assert_eq!(value.cached_batches.len(), 1);
assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
assert_eq!(value.estimated_batches_size, expected_size);
assert!(Arc::ptr_eq(
&value,
&strategy.get_range_result(&key).unwrap()
));
}
#[tokio::test]
async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
let strategy = test_cache_strategy();
let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
let (key, part_metrics) = test_cache_context(&strategy);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(batch.clone()).unwrap();
buffer.push(batch).unwrap();
assert_eq!(buffer.buffered_rows, 0);
assert!(buffer.buffered_batches.is_empty());
let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
.await
.unwrap();
assert_eq!(value.cached_batches.len(), 1);
assert_eq!(
buffer.estimated_batches_size(),
batch1_full + batch2_full - dict_values_size
value.cached_batches[0].slice_lengths,
vec![
DEFAULT_READ_BATCH_SIZE / 2 + 1,
DEFAULT_READ_BATCH_SIZE / 2 + 1
]
);
assert_eq!(buffer.into_batches().len(), 2);
}
#[test]
fn cache_batch_buffer_non_shared_dictionary() {
use datatypes::arrow::array::BinaryArray;
#[tokio::test]
async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
let strategy = test_cache_strategy();
let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
let (key, part_metrics) = test_cache_context(&strategy);
let schema = dict_test_schema();
let dict_values1 = BinaryArray::from_vec(vec![b"a", b"b"]);
let dict_values2 = BinaryArray::from_vec(vec![b"x", b"y"]);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(large_batch.clone()).unwrap();
let batch1 = make_dict_batch(schema.clone(), &dict_values1, &[0, 1], &[10, 20]);
let batch2 = make_dict_batch(schema, &dict_values2, &[0, 1], &[30, 40]);
assert_eq!(buffer.buffered_rows, 0);
assert!(buffer.buffered_batches.is_empty());
let batch1_full = batch1.get_array_memory_size();
let batch2_full = batch2.get_array_memory_size();
let mut buffer = CacheBatchBuffer::new();
buffer.push(batch1);
buffer.push(batch2);
// Different dictionaries: full size for both.
assert_eq!(buffer.estimated_batches_size(), batch1_full + batch2_full);
}
#[test]
fn cache_batch_buffer_shared_then_diverged() {
use datatypes::arrow::array::BinaryArray;
let schema = dict_test_schema();
let shared_values = BinaryArray::from_vec(vec![b"a", b"b", b"c"]);
let different_values = BinaryArray::from_vec(vec![b"x", b"y"]);
let batch1 = make_dict_batch(schema.clone(), &shared_values, &[0], &[1]);
let batch2 = make_dict_batch(schema.clone(), &shared_values, &[1], &[2]);
let batch3 = make_dict_batch(schema, &different_values, &[0], &[3]);
let size1 = batch1.get_array_memory_size();
let size2 = batch2.get_array_memory_size();
let size3 = batch3.get_array_memory_size();
let dict_values_size = compute_total_dict_values_size(&batch2);
let mut buffer = CacheBatchBuffer::new();
buffer.push(batch1);
buffer.push(batch2);
buffer.push(batch3);
// batch2 shares dict with batch1 (dedup), batch3 does not (full size).
let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
.await
.unwrap();
assert_eq!(value.cached_batches.len(), 1);
assert_eq!(
buffer.estimated_batches_size(),
size1 + (size2 - dict_values_size) + size3
value.cached_batches[0].slice_lengths,
vec![large_batch.num_rows()]
);
}
#[tokio::test]
async fn cache_batch_buffer_uses_compacted_size_for_weight() {
let strategy = test_cache_strategy();
let batch1 = make_batch(&[1, 2]);
let batch2 = make_batch(&[3, 4]);
let (key, part_metrics) = test_cache_context(&strategy);
let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()])
.unwrap()
.get_array_memory_size();
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(batch1).unwrap();
buffer.push(batch2).unwrap();
let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
.await
.unwrap();
assert_eq!(value.estimated_batches_size, expected);
}
#[tokio::test]
async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() {
let strategy = test_cache_strategy();
let (key, part_metrics) = test_cache_context(&strategy);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.total_weight = RANGE_CACHE_SKIP_BYTES;
buffer.push(make_batch(&[1])).unwrap();
assert!(buffer.sender.is_none());
assert!(
finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
.await
.is_none()
);
}
}

View File

@@ -1258,13 +1258,25 @@ pub(crate) fn should_split_flat_batches_for_merge(
// This is a file range.
let file_index = index.index - stream_ctx.input.num_memtables();
let file = &stream_ctx.input.files[file_index];
if file.meta_ref().num_rows < SPLIT_ROW_THRESHOLD || file.meta_ref().num_series == 0 {
let file_meta = file.meta_ref();
if file_meta.level == 0 {
// Always split level 0 files.
num_files_to_split += 1;
continue;
} else if file_meta.num_rows < SPLIT_ROW_THRESHOLD || file_meta.num_series == 0 {
// If the file doesn't have enough rows, or the number of series is unavailable, skips it.
continue;
}
debug_assert!(file.meta_ref().num_rows > 0);
if !can_split_series(file.meta_ref().num_rows, file.meta_ref().num_series) {
debug_assert!(file_meta.num_rows > 0);
if !can_split_series(file_meta.num_rows, file_meta.num_series) {
// We can't split batches in a file.
common_telemetry::trace!(
"Can't split series for file {}, level: {}, num_rows: {}, num_series: {}",
file_meta.file_id,
file_meta.level,
file_meta.num_rows,
file_meta.num_series,
);
return None;
} else {
num_files_to_split += 1;
@@ -1310,14 +1322,108 @@ pub(crate) fn compute_parallel_channel_size(estimated_rows_per_batch: usize) ->
size.clamp(2, 64)
}
/// Computes the average estimated rows per batch across multiple range readers.
pub(crate) fn compute_average_batch_size(
estimated_rows_per_batch: impl IntoIterator<Item = usize>,
) -> usize {
let mut total = 0usize;
let mut count = 0usize;
for size in estimated_rows_per_batch {
total += size;
count += 1;
}
if count == 0 {
return DEFAULT_READ_BATCH_SIZE;
}
(total / count).clamp(1, DEFAULT_READ_BATCH_SIZE)
}
fn can_split_series(num_rows: u64, num_series: u64) -> bool {
assert!(num_series > 0);
assert!(num_rows > 0);
if num_rows == 0 || num_series == 0 {
return false;
}
// It doesn't have too many series or it will have enough rows for each batch.
num_series < NUM_SERIES_THRESHOLD || num_rows / num_series >= BATCH_SIZE_THRESHOLD
}
#[cfg(test)]
mod split_tests {
use std::sync::Arc;
use common_time::Timestamp;
use smallvec::smallvec;
use store_api::storage::FileId;
use super::*;
use crate::read::projection::ProjectionMapper;
use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::sst::file::FileHandle;
use crate::test_util::memtable_util::metadata_with_primary_key;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::sst_util::sst_file_handle_with_file_id;
async fn new_stream_context_with_files(files: Vec<FileHandle>) -> StreamContext {
let env = SchedulerEnv::new().await;
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
let input = ScanInput::new(env.access_layer.clone(), mapper).with_files(files);
StreamContext {
input,
ranges: vec![],
scan_fingerprint: None,
query_start: std::time::Instant::now(),
}
}
fn single_file_range_meta() -> RangeMeta {
RangeMeta {
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
),
indices: smallvec![SourceIndex {
index: 0,
num_row_groups: 1,
}],
row_group_indices: smallvec![RowGroupIndex {
index: 0,
row_group_index: 0,
}],
num_rows: 1024,
}
}
#[tokio::test]
async fn should_split_level_zero_file_even_when_series_stats_are_missing() {
let mut file = sst_file_handle_with_file_id(FileId::random(), 0, 1000)
.meta_ref()
.clone();
file.level = 0;
file.num_rows = DEFAULT_ROW_GROUP_SIZE as u64;
file.num_row_groups = 1;
file.num_series = 0;
let file = FileHandle::new(file, crate::test_util::new_noop_file_purger());
let stream_ctx = Arc::new(new_stream_context_with_files(vec![file]).await);
assert!(
should_split_flat_batches_for_merge(&stream_ctx, &single_file_range_meta()).is_some()
);
}
#[test]
fn can_split_series_returns_false_for_zero_inputs() {
assert!(!can_split_series(0, 1));
assert!(!can_split_series(1, 0));
assert!(!can_split_series(0, 0));
}
}
/// Creates a new [ReaderFilterMetrics] with optional apply metrics initialized
/// based on the `explain_verbose` flag.
fn new_filter_metrics(explain_verbose: bool) -> ReaderFilterMetrics {
@@ -1653,6 +1759,7 @@ mod tests {
let meta = FileMeta {
region_id: RegionId::new(123, 456),
file_id: Default::default(),
level: 1,
time_range: (
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(1000),
@@ -1816,4 +1923,26 @@ mod tests {
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE * 2)
);
}
#[test]
fn test_compute_average_batch_size_uses_arithmetic_mean() {
assert_eq!(24, compute_average_batch_size([16, 24, 32]));
}
#[test]
fn test_compute_average_batch_size_clamps_values() {
assert_eq!(
DEFAULT_READ_BATCH_SIZE,
compute_average_batch_size([DEFAULT_READ_BATCH_SIZE, DEFAULT_READ_BATCH_SIZE * 2])
);
assert_eq!(1, compute_average_batch_size([0, 1]));
}
#[test]
fn test_compute_average_batch_size_falls_back_when_empty() {
assert_eq!(
DEFAULT_READ_BATCH_SIZE,
compute_average_batch_size(std::iter::empty())
);
}
}

View File

@@ -41,6 +41,9 @@ use crate::read::flat_merge::FlatMergeReader;
use crate::read::last_row::FlatLastRowReader;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::range::RangeMeta;
use crate::read::range_cache::{
build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
@@ -181,19 +184,22 @@ impl SeqScan {
sources,
None,
None,
false,
compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
)
.await
}
/// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
/// if possible.
/// Builds a flat reader to read sources that returns RecordBatch.
/// If `semaphore` is provided, reads sources in parallel if possible.
/// If `skip_dedup` is true, the merged stream is returned without applying flat dedup.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build_flat_reader_from_sources(
stream_ctx: &StreamContext,
mut sources: Vec<BoxedRecordBatchStream>,
semaphore: Option<Arc<Semaphore>>,
part_metrics: Option<&PartitionMetrics>,
skip_dedup: bool,
channel_size: usize,
) -> Result<BoxedRecordBatchStream> {
if let Some(semaphore) = semaphore.as_ref() {
@@ -215,7 +221,7 @@ impl SeqScan {
FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
.await?;
let dedup = !stream_ctx.input.append_mode;
let dedup = !skip_dedup && !stream_ctx.input.append_mode;
let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
let reader = if dedup {
match stream_ctx.input.merge_mode {
@@ -253,6 +259,62 @@ impl SeqScan {
Ok(reader)
}
/// Builds a flat read stream for one partition range.
pub(crate) async fn build_flat_partition_range_read(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
compaction: bool,
part_metrics: &PartitionMetrics,
partition_pruner: Arc<PartitionPruner>,
file_scan_semaphore: Option<Arc<Semaphore>>,
merge_semaphore: Option<Arc<Semaphore>>,
) -> Result<(BoxedRecordBatchStream, usize)> {
let cache_key = build_range_cache_key(stream_ctx, part_range);
if let Some(key) = cache_key.as_ref() {
if let Some(value) = stream_ctx.input.cache_strategy.get_range_result(key) {
part_metrics.inc_range_cache_hit();
return Ok((cached_flat_range_stream(value), DEFAULT_READ_BATCH_SIZE));
}
part_metrics.inc_range_cache_miss();
}
let mut sources = Vec::new();
let split_batch_size = build_flat_sources(
stream_ctx,
part_range,
compaction,
part_metrics,
partition_pruner,
&mut sources,
file_scan_semaphore,
)
.await?;
let estimated_rows_per_batch = split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE);
let channel_size = compute_parallel_channel_size(estimated_rows_per_batch);
let stream = Self::build_flat_reader_from_sources(
stream_ctx,
sources,
merge_semaphore,
Some(part_metrics),
false,
channel_size,
)
.await?;
let stream = match cache_key {
Some(key) => cache_flat_range_stream(
stream,
stream_ctx.input.cache_strategy.clone(),
key,
part_metrics.clone(),
),
None => stream,
};
Ok((stream, estimated_rows_per_batch))
}
/// Scans the given partition when the part list is set properly.
/// Otherwise the returned stream might not contains any data.
fn scan_partition_impl(
@@ -331,23 +393,16 @@ impl SeqScan {
// Scans each part.
for part_range in partition_ranges {
let mut sources = Vec::new();
let split_batch_size = build_flat_sources(
let (mut reader, _) = Self::build_flat_partition_range_read(
&stream_ctx,
&part_range,
compaction,
&part_metrics,
partition_pruner.clone(),
&mut sources,
file_scan_semaphore.clone(),
).await?;
let channel_size = compute_parallel_channel_size(
split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
);
let mut reader =
Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
.await?;
semaphore.clone(),
)
.await?;
let mut metrics = ScannerMetrics {
scan_cost: fetch_start.elapsed(),

View File

@@ -41,18 +41,18 @@ use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
use tokio::sync::mpsc::{self, Receiver, Sender};
use crate::error::{
Error, InvalidSenderSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
Error, InvalidSenderSnafu, JoinSnafu, PartitionOutOfRangeSnafu, Result, ScanMultiTimesSnafu,
ScanSeriesSnafu, TooManyFilesToReadSnafu,
};
use crate::read::ScannerMetrics;
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_parallel_channel_size,
PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics, compute_average_batch_size,
compute_parallel_channel_size,
};
use crate::read::seq_scan::{SeqScan, build_flat_sources};
use crate::read::seq_scan::SeqScan;
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::PrimaryKeyArray;
@@ -227,7 +227,8 @@ impl SeriesScan {
let (senders, receivers) = new_channel_list(self.properties.num_partitions());
let mut distributor = SeriesDistributor {
stream_ctx: self.stream_ctx.clone(),
semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
range_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
final_merge_semaphore: Some(Arc::new(Semaphore::new(self.properties.num_partitions()))),
partitions: self.properties.partitions.clone(),
pruner: self.pruner.clone(),
senders,
@@ -420,8 +421,13 @@ impl SeriesScan {
struct SeriesDistributor {
/// Context for the scan stream.
stream_ctx: Arc<StreamContext>,
/// Optional semaphore for limiting the number of concurrent scans.
semaphore: Option<Arc<Semaphore>>,
/// Semaphore for file scanning and range-level merging.
range_semaphore: Option<Arc<Semaphore>>,
/// Semaphore for the final merge across all range streams.
/// Must be separate from `range_semaphore` to avoid deadlock: final merge tasks
/// hold a permit while waiting for data from range-level merge tasks, which also
/// need permits to produce data.
final_merge_semaphore: Option<Arc<Semaphore>>,
/// Partition ranges to scan.
partitions: Vec<Vec<PartitionRange>>,
/// Shared pruner for file range building.
@@ -483,36 +489,57 @@ impl SeriesDistributor {
// build part cost.
let mut fetch_start = Instant::now();
// Scans all parts.
let mut sources = Vec::with_capacity(self.partitions.len());
let mut min_batch_size: Option<usize> = None;
// Builds one deduped stream per partition range, then merges across ranges.
let build_start = Instant::now();
let mut tasks = Vec::new();
for partition in &self.partitions {
sources.reserve(partition.len());
for part_range in partition {
let split_batch_size = build_flat_sources(
&self.stream_ctx,
part_range,
false,
&part_metrics,
partition_pruner.clone(),
&mut sources,
self.semaphore.clone(),
)
.await?;
if let Some(size) = split_batch_size {
min_batch_size = Some(min_batch_size.map_or(size, |cur| cur.min(size)));
}
let stream_ctx = self.stream_ctx.clone();
let part_range = *part_range;
let part_metrics = part_metrics.clone();
let partition_pruner = partition_pruner.clone();
let file_scan_semaphore = self.range_semaphore.clone();
let merge_semaphore = self.range_semaphore.clone();
tasks.push(common_runtime::spawn_global(async move {
SeqScan::build_flat_partition_range_read(
&stream_ctx,
&part_range,
false,
&part_metrics,
partition_pruner,
file_scan_semaphore,
merge_semaphore,
)
.await
}));
}
}
// Builds a flat reader that merge sources from all parts.
let mut range_streams = Vec::with_capacity(tasks.len());
let mut estimated_batch_sizes = Vec::with_capacity(tasks.len());
for task in tasks {
let (stream, estimated_batch_size) = task.await.context(JoinSnafu)??;
range_streams.push(stream);
estimated_batch_sizes.push(estimated_batch_size);
}
let channel_size =
compute_parallel_channel_size(min_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE));
compute_parallel_channel_size(compute_average_batch_size(estimated_batch_sizes));
common_telemetry::debug!(
"SeriesDistributor built {} range_streams, region: {}, build cost: {:?}, channel_size: {}",
range_streams.len(),
self.stream_ctx.input.region_metadata().region_id,
build_start.elapsed(),
channel_size,
);
// Each partition range stream is already deduped, so skip dedup here.
// Use a separate semaphore for the final merge to avoid deadlock with
// range-level merge tasks that share the range_semaphore.
let mut reader = SeqScan::build_flat_reader_from_sources(
&self.stream_ctx,
sources,
self.semaphore.clone(),
range_streams,
self.final_merge_semaphore.clone(),
Some(&part_metrics),
true,
channel_size,
)
.await?;