diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index 5d2559cba1..ee415be366 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -50,7 +50,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCache
 #[cfg(feature = "vector_index")]
 use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
 use crate::cache::write_cache::WriteCacheRef;
-use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
+use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
 use crate::memtable::record_batch_estimated_size;
 use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
 use crate::read::Batch;
@@ -80,6 +80,7 @@ const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
 pub(crate) struct RangeResultMemoryLimiter {
     semaphore: Arc<tokio::sync::Semaphore>,
     permit_bytes: usize,
+    total_permits: usize,
 }
 
 impl Default for RangeResultMemoryLimiter {
@@ -94,23 +95,46 @@ impl RangeResultMemoryLimiter {
     pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
         let permit_bytes = permit_bytes.max(1);
-        let permits = limit_bytes.div_ceil(permit_bytes).max(1);
+        let total_permits = limit_bytes
+            .div_ceil(permit_bytes)
+            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
         Self {
-            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
+            semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
             permit_bytes,
+            total_permits,
         }
     }
 
+    #[cfg(test)]
     pub(crate) fn permit_bytes(&self) -> usize {
         self.permit_bytes
     }
 
-    pub(crate) async fn acquire(
-        &self,
-        bytes: usize,
-    ) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
-        let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32;
-        self.semaphore.acquire_many(permits).await
+    #[cfg(test)]
+    pub(crate) fn available_permits(&self) -> usize {
+        self.semaphore.available_permits()
+    }
+
+    pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
+        let permits = bytes.div_ceil(self.permit_bytes).max(1);
+        if permits > self.total_permits {
+            return UnexpectedSnafu {
+                reason: format!(
+                    "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
+                    self.total_permits.saturating_mul(self.permit_bytes)
+                ),
+            }
+            .fail();
+        }
+        self.semaphore
+            .acquire_many(permits as u32)
+            .await
+            .map_err(|_| {
+                UnexpectedSnafu {
+                    reason: "range result memory limiter is unexpectedly closed",
+                }
+                .build()
+            })
     }
 }
@@ -431,6 +455,15 @@ impl CacheStrategy {
         }
     }
 
+    pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
+        match self {
+            CacheStrategy::EnableAll(cache_manager) => {
+                Some(cache_manager.range_result_cache_size())
+            }
+            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
+        }
+    }
+
     /// Calls [CacheManager::write_cache()].
     /// It returns None if the strategy is [CacheStrategy::Disabled].
     pub fn write_cache(&self) -> Option<&WriteCacheRef> {
@@ -534,6 +567,8 @@ pub struct CacheManager {
     selector_result_cache: Option<SelectorResultCache>,
     /// Cache for range scan outputs in flat format.
     range_result_cache: Option<RangeResultCache>,
+    /// Configured capacity for range scan outputs in flat format.
+    range_result_cache_size: u64,
     /// Shared memory limiter for async range-result cache tasks.
     range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
     /// Cache for index result.
@@ -804,6 +839,10 @@ impl CacheManager {
         &self.range_result_memory_limiter
     }
 
+    pub(crate) fn range_result_cache_size(&self) -> usize {
+        self.range_result_cache_size as usize
+    }
+
     /// Gets the write cache.
     pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
         self.write_cache.as_ref()
@@ -1038,7 +1077,11 @@
             puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
             selector_result_cache,
             range_result_cache,
-            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
+            range_result_cache_size: self.range_result_cache_size,
+            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
+                self.range_result_cache_size as usize,
+                RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
+            )),
             index_result_cache,
         }
     }
@@ -1448,6 +1491,46 @@ mod tests {
         assert!(cache.get_range_result(&key).is_some());
     }
 
+    #[test]
+    fn test_range_result_cache_size_configures_limiter() {
+        let cache_size = 3 * 1024_u64;
+        let cache = CacheManager::builder()
+            .range_result_cache_size(cache_size)
+            .build();
+
+        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
+        assert_eq!(
+            cache.range_result_memory_limiter().permit_bytes(),
+            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
+        );
+        assert_eq!(
+            cache.range_result_memory_limiter().available_permits(),
+            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
+        );
+    }
+
+    #[tokio::test]
+    async fn range_result_memory_limiter_rejects_oversized_request() {
+        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
+        assert_eq!(limiter.available_permits(), 2);
+
+        let err = limiter.acquire(10 * 1024).await.unwrap_err();
+        assert!(
+            err.to_string().contains("exceeds limiter capacity"),
+            "unexpected error: {err}"
+        );
+        assert_eq!(limiter.available_permits(), 2);
+    }
+
+    #[tokio::test]
+    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
+        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
+        let permit = limiter.acquire(2 * 1024).await.unwrap();
+        assert_eq!(limiter.available_permits(), 0);
+        drop(permit);
+        assert_eq!(limiter.available_permits(), 2);
+    }
+
     #[tokio::test]
     async fn test_evict_puffin_cache_clears_all_entries() {
         use std::collections::{BTreeMap, HashMap};
diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs
index 53d491e716..93995bd017 100644
--- a/src/mito2/src/read/range_cache.rs
+++ b/src/mito2/src/read/range_cache.rs
@@ -30,7 +30,7 @@ use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
 use tokio::sync::{mpsc, oneshot};
 
 use crate::cache::CacheStrategy;
-use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
+use crate::error::{ComputeArrowSnafu, Result};
 use crate::read::BoxedRecordBatchStream;
 use crate::read::scan_region::StreamContext;
 use crate::read::scan_util::PartitionMetrics;
@@ -38,8 +38,7 @@ use crate::region::options::MergeMode;
 use crate::sst::file::FileTimeRange;
 use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
 
-const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024;
-const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024;
+const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
 
 /// Fingerprint of the scan request fields that affect partition range cache reuse.
 ///
@@ -342,7 +341,7 @@ enum CacheConcatCommand {
         key: RangeScanCacheKey,
         cache_strategy: CacheStrategy,
         part_metrics: PartitionMetrics,
-        result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
+        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
     },
 }
@@ -366,12 +365,7 @@ impl CacheConcatState {
             .iter()
             .map(RecordBatch::get_array_memory_size)
             .sum::<usize>();
-        let _permit = limiter.acquire(input_size).await.map_err(|_| {
-            UnexpectedSnafu {
-                reason: "range result memory limiter is unexpectedly closed",
-            }
-            .build()
-        })?;
+        let _permit = limiter.acquire(input_size).await?;
 
         let compacted = compact_record_batches(batches)?;
         self.estimated_size += compacted.batch.get_array_memory_size();
@@ -411,6 +405,7 @@ fn build_cached_batch_slice(
 async fn run_cache_concat_task(
     mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
     limiter: Arc<RangeResultMemoryLimiter>,
+    skip_threshold_bytes: usize,
 ) {
     let mut state = CacheConcatState::default();
@@ -421,6 +416,11 @@ async fn run_cache_concat_task(
                     warn!(err; "Failed to compact range cache batches");
                     return;
                 }
+                // Stop once the compacted size exceeds the cache budget;
+                // returning drops the receiver and closes the channel.
+                if state.estimated_size > skip_threshold_bytes {
+                    return;
+                }
             }
             CacheConcatCommand::Finish {
                 pending,
@@ -429,24 +429,31 @@ async fn run_cache_concat_task(
                 key,
                 cache_strategy,
                 part_metrics,
                 result_tx,
             } => {
-                let result = state
+                let compact_result = state
                     .compact(pending, &limiter)
                     .await
                     .map(|()| state.finish());
-                if let Err(err) = &result {
-                    warn!(err; "Failed to finalize range cache batches");
+                let result = match compact_result {
+                    Ok(v) => {
+                        let value = Arc::new(v);
+                        part_metrics
+                            .inc_range_cache_size(key.estimated_size() + value.estimated_size());
+                        cache_strategy.put_range_result(key, value.clone());
+
+                        Ok(value)
+                    }
+                    Err(e) => {
+                        warn!(e; "Failed to finalize range cache batches");
+
+                        Err(e)
+                    }
+                };
+
+                if let Some(tx) = result_tx {
+                    let _ = tx.send(result);
                 }
-                let value = result.ok().map(Arc::new);
-                if let Some(value) = &value {
-                    part_metrics
-                        .inc_range_cache_size(key.estimated_size() + value.estimated_size());
-                    cache_strategy.put_range_result(key, value.clone());
-                }
-                if let Some(tx) = result_tx {
-                    let _ = tx.send(value);
-                }
-                return;
+                break;
             }
         }
     }
@@ -456,15 +463,19 @@ struct CacheBatchBuffer {
     buffered_batches: Vec<RecordBatch>,
     buffered_rows: usize,
     buffered_size: usize,
-    total_weight: usize,
     sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
 }
 
 impl CacheBatchBuffer {
     fn new(cache_strategy: &CacheStrategy) -> Self {
         let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
+            let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
             let (tx, rx) = mpsc::unbounded_channel();
-            common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone()));
+            common_runtime::spawn_global(run_cache_concat_task(
+                rx,
+                limiter.clone(),
+                skip_threshold_bytes,
+            ));
             tx
         });
@@ -472,7 +483,6 @@ impl CacheBatchBuffer {
             buffered_batches: Vec::new(),
             buffered_rows: 0,
             buffered_size: 0,
-            total_weight: 0,
             sender,
         }
     }
@@ -482,22 +492,13 @@ impl CacheBatchBuffer {
             return Ok(());
         }
 
-        let batch_size = batch.get_array_memory_size();
-        self.total_weight += batch_size;
-        if self.total_weight > RANGE_CACHE_SKIP_BYTES {
-            self.buffered_batches.clear();
-            self.buffered_rows = 0;
-            self.buffered_size = 0;
-            self.sender = None;
-            return Ok(());
-        }
-
         self.buffered_rows += batch.num_rows();
-        self.buffered_size += batch_size;
+        self.buffered_size += batch.get_array_memory_size();
         self.buffered_batches.push(batch);
 
-        if self.buffered_rows > DEFAULT_READ_BATCH_SIZE
-            || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES
+        if self.buffered_batches.len() > 1
+            && (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
+                || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
         {
             self.notify_compact();
         }
@@ -527,7 +528,7 @@ impl CacheBatchBuffer {
         key: RangeScanCacheKey,
         cache_strategy: CacheStrategy,
         part_metrics: PartitionMetrics,
-        result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
+        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
     ) {
         let Some(sender) = self.sender.take() else {
             return;
@@ -641,28 +642,38 @@ mod tests {
     fn test_cache_strategy() -> CacheStrategy {
         CacheStrategy::EnableAll(Arc::new(
             CacheManager::builder()
-                .range_result_cache_size(1024)
+                .range_result_cache_size(1024 * 1024)
                 .build(),
         ))
     }
 
+    fn test_scan_fingerprint(
+        filters: Vec<String>,
+        time_filters: Vec<String>,
+        series_row_selector: Option<TimeSeriesRowSelector>,
+        filter_deleted: bool,
+        partition_expr_version: u64,
+    ) -> ScanRequestFingerprint {
+        ScanRequestFingerprintBuilder {
+            read_column_ids: vec![1, 2],
+            read_column_types: vec![None, None],
+            filters,
+            time_filters,
+            series_row_selector,
+            append_mode: false,
+            filter_deleted,
+            merge_mode: MergeMode::LastRow,
+            partition_expr_version,
+        }
+        .build()
+    }
+
     fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
         let region_id = RegionId::new(1, 1);
         let key = RangeScanCacheKey {
             region_id,
             row_groups: vec![],
-            scan: ScanRequestFingerprintBuilder {
-                read_column_ids: vec![],
-                read_column_types: vec![],
-                filters: vec![],
-                time_filters: vec![],
-                series_row_selector: None,
-                append_mode: false,
-                filter_deleted: false,
-                merge_mode: MergeMode::LastRow,
-                partition_expr_version: 0,
-            }
-            .build(),
+            scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
         };
 
         let metrics_set = ExecutionPlanMetricsSet::new();
@@ -678,10 +689,10 @@ mod tests {
         key: RangeScanCacheKey,
         cache_strategy: CacheStrategy,
         part_metrics: PartitionMetrics,
-    ) -> Option<Arc<RangeScanCacheValue>> {
+    ) -> Result<Arc<RangeScanCacheValue>> {
         let (tx, rx) = oneshot::channel();
         buffer.finish(key, cache_strategy, part_metrics, Some(tx));
-        rx.await.context(crate::error::RecvSnafu).ok().flatten()
+        rx.await.context(crate::error::RecvSnafu)?
     }
 
     async fn new_stream_context(
@@ -733,9 +746,40 @@ mod tests {
         lit(ScalarValue::TimestampMillisecond(Some(val), None))
     }
 
+    fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
+        let mut exprs = exprs
+            .into_iter()
+            .map(|expr| expr.to_string())
+            .collect::<Vec<_>>();
+        exprs.sort_unstable();
+        exprs
+    }
+
+    async fn assert_range_cache_filters(
+        filters: Vec<Expr>,
+        query_time_range: Option<TimestampRange>,
+        partition_time_range: FileTimeRange,
+        expected_filters: Vec<Expr>,
+        expected_time_filters: Vec<Expr>,
+    ) {
+        let (stream_ctx, part_range) =
+            new_stream_context(filters, query_time_range, partition_time_range).await;
+
+        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
+
+        assert_eq!(
+            key.scan.filters(),
+            normalized_exprs(expected_filters).as_slice()
+        );
+        assert_eq!(
+            key.scan.time_filters(),
+            normalized_exprs(expected_time_filters).as_slice()
+        );
+    }
+
     #[tokio::test]
     async fn strips_time_only_filters_when_query_covers_partition_range() {
-        let (stream_ctx, part_range) = new_stream_context(
+        assert_range_cache_filters(
             vec![
                 col("ts").gt_eq(ts_lit(1000)),
                 col("ts").lt(ts_lit(2001)),
                 col("k0").eq(lit("foo")),
                 col("ts").is_not_null(),
             ],
             None,
             (
                 Timestamp::new_millisecond(1000),
                 Timestamp::new_millisecond(2000),
             ),
@@ -747,50 +791,30 @@ mod tests {
+            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
+            vec![],
         )
         .await;
-
-        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
-
-        // Range-reducible time filters should be cleared when query covers partition range.
-        assert!(key.scan.time_filters().is_empty());
-        // Non-range time predicates stay in filters.
-        let mut expected_filters = [
-            col("k0").eq(lit("foo")).to_string(),
-            col("ts").is_not_null().to_string(),
-        ];
-        expected_filters.sort_unstable();
-        assert_eq!(key.scan.filters(), expected_filters.as_slice());
     }
 
     #[tokio::test]
     async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
-        let (stream_ctx, part_range) = new_stream_context(
+        assert_range_cache_filters(
             vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
             TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
             (
                 Timestamp::new_millisecond(1000),
                 Timestamp::new_millisecond(2000),
             ),
+            vec![col("k0").eq(lit("foo"))],
+            vec![col("ts").gt_eq(ts_lit(1000))],
         )
         .await;
-
-        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
-
-        // Time filters should be preserved when query does not cover partition range.
-        assert_eq!(
-            key.scan.time_filters(),
-            [col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice()
-        );
-        assert_eq!(
-            key.scan.filters(),
-            [col("k0").eq(lit("foo")).to_string()].as_slice()
-        );
     }
 
     #[tokio::test]
     async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
-        let (stream_ctx, part_range) = new_stream_context(
+        assert_range_cache_filters(
             vec![
                 col("ts").gt_eq(ts_lit(1000)),
                 col("ts").is_not_null(),
                 col("k0").eq(lit("foo")),
             ],
             None,
             (
                 Timestamp::new_millisecond(1000),
                 Timestamp::new_millisecond(2000),
             ),
@@ -801,51 +825,26 @@ mod tests {
+            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
+            vec![],
         )
         .await;
-
-        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
-
-        // Range-reducible time filters should be cleared when query has no time range limit.
-        assert!(key.scan.time_filters().is_empty());
-        // Non-range time predicates stay in filters.
-        let mut expected_filters = [
-            col("k0").eq(lit("foo")).to_string(),
-            col("ts").is_not_null().to_string(),
-        ];
-        expected_filters.sort_unstable();
-        assert_eq!(key.scan.filters(), expected_filters.as_slice());
     }
 
     #[test]
     fn normalizes_and_clears_time_filters() {
-        let normalized = ScanRequestFingerprintBuilder {
-            read_column_ids: vec![1, 2],
-            read_column_types: vec![None, None],
-            filters: vec!["k0 = 'foo'".to_string()],
-            time_filters: vec![],
-            series_row_selector: None,
-            append_mode: false,
-            filter_deleted: true,
-            merge_mode: MergeMode::LastRow,
-            partition_expr_version: 0,
-        }
-        .build();
+        let normalized =
+            test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);
 
         assert!(normalized.time_filters().is_empty());
 
-        let fingerprint = ScanRequestFingerprintBuilder {
-            read_column_ids: vec![1, 2],
-            read_column_types: vec![None, None],
-            filters: vec!["k0 = 'foo'".to_string()],
-            time_filters: vec!["ts >= 1000".to_string()],
-            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
-            append_mode: false,
-            filter_deleted: true,
-            merge_mode: MergeMode::LastRow,
-            partition_expr_version: 7,
-        }
-        .build();
+        let fingerprint = test_scan_fingerprint(
+            vec!["k0 = 'foo'".to_string()],
+            vec!["ts >= 1000".to_string()],
+            Some(TimeSeriesRowSelector::LastRow),
+            true,
+            7,
+        );
 
         let reset = fingerprint.without_time_filters();
@@ -978,13 +977,22 @@ mod tests {
     #[tokio::test]
     async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
-        let strategy = test_cache_strategy();
         let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
+        let strategy = CacheStrategy::EnableAll(Arc::new(
+            CacheManager::builder()
+                .range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
+                .build(),
+        ));
         let (key, part_metrics) = test_cache_context(&strategy);
         let mut buffer = CacheBatchBuffer::new(&strategy);
 
         buffer.push(large_batch.clone()).unwrap();
+        assert_eq!(buffer.buffered_rows, large_batch.num_rows());
+        assert_eq!(buffer.buffered_batches.len(), 1);
+
+        buffer.push(large_batch.clone()).unwrap();
+
         assert_eq!(buffer.buffered_rows, 0);
         assert!(buffer.buffered_batches.is_empty());
@@ -994,44 +1002,31 @@ mod tests {
 
         let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
             .await
             .unwrap();
         assert_eq!(value.cached_batches.len(), 1);
         assert_eq!(
             value.cached_batches[0].slice_lengths,
-            vec![large_batch.num_rows()]
+            vec![large_batch.num_rows(), large_batch.num_rows()]
         );
     }
 
     #[tokio::test]
-    async fn cache_batch_buffer_uses_compacted_size_for_weight() {
-        let strategy = test_cache_strategy();
-        let batch1 = make_batch(&[1, 2]);
-        let batch2 = make_batch(&[3, 4]);
+    async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
+        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
+        // Budget only fits two large batches.
+        let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
+        let strategy = CacheStrategy::EnableAll(Arc::new(
+            CacheManager::builder()
+                .range_result_cache_size(budget)
+                .build(),
+        ));
         let (key, part_metrics) = test_cache_context(&strategy);
-        let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()])
-            .unwrap()
-            .get_array_memory_size();
         let mut buffer = CacheBatchBuffer::new(&strategy);
 
-        buffer.push(batch1).unwrap();
-        buffer.push(batch2).unwrap();
-
-        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
-            .await
-            .unwrap();
-        assert_eq!(value.estimated_batches_size, expected);
-    }
-
-    #[tokio::test]
-    async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() {
-        let strategy = test_cache_strategy();
-        let (key, part_metrics) = test_cache_context(&strategy);
-        let mut buffer = CacheBatchBuffer::new(&strategy);
-        buffer.total_weight = RANGE_CACHE_SKIP_BYTES;
-
-        buffer.push(make_batch(&[1])).unwrap();
-
-        assert!(buffer.sender.is_none());
+        for _ in 0..4 {
+            buffer.push(large_batch.clone()).unwrap();
+        }
 
         assert!(
-            finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
+            finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
                 .await
-                .is_none()
+                .is_err()
         );
+        assert!(strategy.get_range_result(&key).is_none());
     }
 }
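Reviewer sketch (not part of the patch): the core of this change is quantizing a byte budget into tokio semaphore permits, with an up-front rejection of any request larger than the whole budget — without that check, acquire_many would wait forever on permits that can never all exist. The stand-alone program below models that accounting with plain tokio; ByteLimiter and its String errors are illustrative stand-ins for RangeResultMemoryLimiter and the crate's UnexpectedSnafu machinery, not the patch's API.

use std::sync::Arc;

use tokio::sync::{Semaphore, SemaphorePermit};

/// Illustrative stand-in for `RangeResultMemoryLimiter`: a shared byte
/// budget quantized into fixed-size permits.
struct ByteLimiter {
    semaphore: Arc<Semaphore>,
    permit_bytes: usize,
    total_permits: usize,
}

impl ByteLimiter {
    fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
        let permit_bytes = permit_bytes.max(1);
        // Round up so any non-zero budget yields at least one permit, and
        // clamp to tokio's hard ceiling on semaphore permits.
        let total_permits = limit_bytes
            .div_ceil(permit_bytes)
            .clamp(1, Semaphore::MAX_PERMITS);
        Self {
            semaphore: Arc::new(Semaphore::new(total_permits)),
            permit_bytes,
            total_permits,
        }
    }

    /// Reserves enough permits to cover `bytes`; the permits return to the
    /// budget when the returned guard is dropped.
    async fn acquire(&self, bytes: usize) -> Result<SemaphorePermit<'_>, String> {
        let permits = bytes.div_ceil(self.permit_bytes).max(1);
        // Reject requests beyond the whole budget up front instead of
        // parking the caller on an unsatisfiable acquire_many.
        if permits > self.total_permits {
            return Err(format!(
                "{bytes} bytes exceeds capacity of {} bytes",
                self.total_permits * self.permit_bytes
            ));
        }
        self.semaphore
            .acquire_many(permits as u32)
            .await
            .map_err(|_| "limiter closed".to_string())
    }
}

#[tokio::main]
async fn main() {
    // A 4 KiB budget in 1 KiB permits: four permits total.
    let limiter = ByteLimiter::new(4 * 1024, 1024);

    // 2500 bytes rounds up to three permits, leaving one available.
    let permit = limiter.acquire(2500).await.unwrap();
    assert_eq!(limiter.semaphore.available_permits(), 1);

    // A request beyond the whole budget fails fast instead of hanging.
    assert!(limiter.acquire(64 * 1024).await.is_err());

    // Dropping the guard releases its permits back to the budget.
    drop(permit);
    assert_eq!(limiter.semaphore.available_permits(), 4);
}

Under this scheme the patch's 1 KiB permit size (RANGE_RESULT_CONCAT_MEMORY_PERMIT) trades a little per-request rounding slack for a small, fixed permit count derived directly from the configured range_result_cache_size.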