feat: tune range cache (#8006)

* feat: use range cache size in manager

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update compact condition

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: update tests

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: validate permits before acquiring

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Date: 2026-04-23 19:26:23 +08:00
Committed by: GitHub
Parent: e316797fff
Commit: 1fda2f5a35
2 changed files with 236 additions and 158 deletions
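In short: the limiter is now sized from the configured range-result cache size, and `acquire()` validates the requested permit count against total capacity before calling `acquire_many()`, which would otherwise wait indefinitely on a request that can never be satisfied. Below is a minimal, self-contained sketch of that validate-then-acquire pattern; `ByteLimiter` and its `String` error are illustrative stand-ins, not the actual `RangeResultMemoryLimiter` API.

```rust
use std::sync::Arc;
use tokio::sync::{Semaphore, SemaphorePermit};

/// Maps a byte budget onto semaphore permits, mirroring the shape of the
/// limiter in this commit (names and error type are simplified).
struct ByteLimiter {
    semaphore: Arc<Semaphore>,
    permit_bytes: usize,
    total_permits: usize,
}

impl ByteLimiter {
    fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
        let permit_bytes = permit_bytes.max(1);
        // Clamp so the permit count stays within what tokio supports.
        let total_permits = limit_bytes
            .div_ceil(permit_bytes)
            .clamp(1, Semaphore::MAX_PERMITS);
        Self {
            semaphore: Arc::new(Semaphore::new(total_permits)),
            permit_bytes,
            total_permits,
        }
    }

    async fn acquire(&self, bytes: usize) -> Result<SemaphorePermit<'_>, String> {
        let permits = bytes.div_ceil(self.permit_bytes).max(1);
        // Validate first: acquire_many() never errors on an over-capacity
        // request, it just waits for permits that will never exist.
        if permits > self.total_permits {
            return Err(format!("{bytes} bytes exceeds limiter capacity"));
        }
        self.semaphore
            .acquire_many(permits as u32)
            .await
            .map_err(|e| e.to_string())
    }
}

#[tokio::main]
async fn main() {
    let limiter = ByteLimiter::new(2 * 1024, 1024); // 2 permits of 1 KiB each
    assert!(limiter.acquire(10 * 1024).await.is_err()); // rejected up front
    let _permit = limiter.acquire(2 * 1024).await.unwrap(); // fits exactly
}
```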


@@ -50,7 +50,7 @@ use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
#[cfg(feature = "vector_index")]
use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
use crate::memtable::record_batch_estimated_size;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
@@ -80,6 +80,7 @@ const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
pub(crate) struct RangeResultMemoryLimiter {
semaphore: Arc<tokio::sync::Semaphore>,
permit_bytes: usize,
total_permits: usize,
}
impl Default for RangeResultMemoryLimiter {
@@ -94,23 +95,46 @@ impl Default for RangeResultMemoryLimiter {
impl RangeResultMemoryLimiter {
pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
let permit_bytes = permit_bytes.max(1);
let permits = limit_bytes.div_ceil(permit_bytes).max(1);
let total_permits = limit_bytes
.div_ceil(permit_bytes)
.clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
Self {
semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
permit_bytes,
total_permits,
}
}
#[cfg(test)]
pub(crate) fn permit_bytes(&self) -> usize {
self.permit_bytes
}
pub(crate) async fn acquire(
&self,
bytes: usize,
) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32;
self.semaphore.acquire_many(permits).await
#[cfg(test)]
pub(crate) fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}
pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
let permits = bytes.div_ceil(self.permit_bytes).max(1);
if permits > self.total_permits {
return UnexpectedSnafu {
reason: format!(
"range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
self.total_permits.saturating_mul(self.permit_bytes)
),
}
.fail();
}
self.semaphore
.acquire_many(permits as u32)
.await
.map_err(|_| {
UnexpectedSnafu {
reason: "range result memory limiter is unexpectedly closed",
}
.build()
})
}
}
@@ -431,6 +455,15 @@ impl CacheStrategy {
}
}
pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
Some(cache_manager.range_result_cache_size())
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
@@ -534,6 +567,8 @@ pub struct CacheManager {
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
range_result_cache: Option<RangeResultCache>,
/// Configured capacity for range scan outputs in flat format.
range_result_cache_size: u64,
/// Shared memory limiter for async range-result cache tasks.
range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
/// Cache for index result.
@@ -804,6 +839,10 @@ impl CacheManager {
&self.range_result_memory_limiter
}
pub(crate) fn range_result_cache_size(&self) -> usize {
self.range_result_cache_size as usize
}
/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
@@ -1038,7 +1077,11 @@ impl CacheManagerBuilder {
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
range_result_cache_size: self.range_result_cache_size,
range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
self.range_result_cache_size as usize,
RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
)),
index_result_cache,
}
}
@@ -1448,6 +1491,46 @@ mod tests {
assert!(cache.get_range_result(&key).is_some());
}
#[test]
fn test_range_result_cache_size_configures_limiter() {
let cache_size = 3 * 1024_u64;
let cache = CacheManager::builder()
.range_result_cache_size(cache_size)
.build();
assert_eq!(cache.range_result_cache_size(), cache_size as usize);
assert_eq!(
cache.range_result_memory_limiter().permit_bytes(),
RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
);
assert_eq!(
cache.range_result_memory_limiter().available_permits(),
(cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
);
}
#[tokio::test]
async fn range_result_memory_limiter_rejects_oversized_request() {
let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
assert_eq!(limiter.available_permits(), 2);
let err = limiter.acquire(10 * 1024).await.unwrap_err();
assert!(
err.to_string().contains("exceeds limiter capacity"),
"unexpected error: {err}"
);
assert_eq!(limiter.available_permits(), 2);
}
#[tokio::test]
async fn range_result_memory_limiter_allows_request_up_to_capacity() {
let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
let permit = limiter.acquire(2 * 1024).await.unwrap();
assert_eq!(limiter.available_permits(), 0);
drop(permit);
assert_eq!(limiter.available_permits(), 2);
}
#[tokio::test]
async fn test_evict_puffin_cache_clears_all_entries() {
use std::collections::{BTreeMap, HashMap};


@@ -30,7 +30,7 @@ use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
use tokio::sync::{mpsc, oneshot};
use crate::cache::CacheStrategy;
use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
use crate::error::{ComputeArrowSnafu, Result};
use crate::read::BoxedRecordBatchStream;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
@@ -38,8 +38,7 @@ use crate::region::options::MergeMode;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024;
const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024;
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
@@ -342,7 +341,7 @@ enum CacheConcatCommand {
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
},
}
@@ -366,12 +365,7 @@ impl CacheConcatState {
.iter()
.map(RecordBatch::get_array_memory_size)
.sum::<usize>();
let _permit = limiter.acquire(input_size).await.map_err(|_| {
UnexpectedSnafu {
reason: "range result memory limiter is unexpectedly closed",
}
.build()
})?;
let _permit = limiter.acquire(input_size).await?;
let compacted = compact_record_batches(batches)?;
self.estimated_size += compacted.batch.get_array_memory_size();
@@ -411,6 +405,7 @@ fn build_cached_batch_slice(
async fn run_cache_concat_task(
mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
skip_threshold_bytes: usize,
) {
let mut state = CacheConcatState::default();
@@ -421,6 +416,11 @@ async fn run_cache_concat_task(
warn!(err; "Failed to compact range cache batches");
return;
}
// Close the channel to stop further work as soon as the cached
// size exceeds the configured cache budget.
if state.estimated_size > skip_threshold_bytes {
return;
}
}
CacheConcatCommand::Finish {
pending,
@@ -429,24 +429,31 @@ async fn run_cache_concat_task(
part_metrics,
result_tx,
} => {
let result = state
let compact_result = state
.compact(pending, &limiter)
.await
.map(|()| state.finish());
if let Err(err) = &result {
warn!(err; "Failed to finalize range cache batches");
let result = match compact_result {
Ok(v) => {
let value = Arc::new(v);
part_metrics
.inc_range_cache_size(key.estimated_size() + value.estimated_size());
cache_strategy.put_range_result(key, value.clone());
Ok(value)
}
Err(e) => {
warn!(e; "Failed to finalize range cache batches");
Err(e)
}
};
if let Some(tx) = result_tx {
let _ = tx.send(result);
}
let value = result.ok().map(Arc::new);
if let Some(value) = &value {
part_metrics
.inc_range_cache_size(key.estimated_size() + value.estimated_size());
cache_strategy.put_range_result(key, value.clone());
}
if let Some(tx) = result_tx {
let _ = tx.send(value);
}
return;
break;
}
}
}
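Design note: the concat task now stops either by returning once the compacted size exceeds the cache budget, or by breaking out of the loop after `Finish`; both paths drop the mpsc receiver, which closes the channel, so later sends from the buffer fail instead of queueing unbounded work. A minimal sketch of that shutdown-by-drop pattern, with made-up names:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<usize>();
    let budget = 100usize;

    let task = tokio::spawn(async move {
        let mut total = 0;
        while let Some(size) = rx.recv().await {
            total += size;
            // Returning drops `rx` and closes the channel, like the cache
            // task does once the compacted size exceeds the cache budget.
            if total > budget {
                return;
            }
        }
    });

    tx.send(60).unwrap();
    tx.send(60).unwrap(); // pushes the total past the budget; the task exits
    task.await.unwrap();
    assert!(tx.send(1).is_err()); // channel is now closed
}
```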
@@ -456,15 +463,19 @@ struct CacheBatchBuffer {
buffered_batches: Vec<RecordBatch>,
buffered_rows: usize,
buffered_size: usize,
total_weight: usize,
sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
impl CacheBatchBuffer {
fn new(cache_strategy: &CacheStrategy) -> Self {
let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
let (tx, rx) = mpsc::unbounded_channel();
common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone()));
common_runtime::spawn_global(run_cache_concat_task(
rx,
limiter.clone(),
skip_threshold_bytes,
));
tx
});
@@ -472,7 +483,6 @@ impl CacheBatchBuffer {
buffered_batches: Vec::new(),
buffered_rows: 0,
buffered_size: 0,
total_weight: 0,
sender,
}
}
@@ -482,22 +492,13 @@ impl CacheBatchBuffer {
return Ok(());
}
let batch_size = batch.get_array_memory_size();
self.total_weight += batch_size;
if self.total_weight > RANGE_CACHE_SKIP_BYTES {
self.buffered_batches.clear();
self.buffered_rows = 0;
self.buffered_size = 0;
self.sender = None;
return Ok(());
}
self.buffered_rows += batch.num_rows();
self.buffered_size += batch_size;
self.buffered_size += batch.get_array_memory_size();
self.buffered_batches.push(batch);
if self.buffered_rows > DEFAULT_READ_BATCH_SIZE
|| self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES
if self.buffered_batches.len() > 1
&& (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
|| self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
{
self.notify_compact();
}
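The push-side trigger also changed shape: compaction now additionally requires more than one buffered batch, presumably because concatenating a single batch would only copy it (a test later in this diff confirms a lone batch stays buffered). A standalone restatement of the new predicate; the `DEFAULT_READ_BATCH_SIZE` value here is a placeholder, not mito2's actual constant:

```rust
const DEFAULT_READ_BATCH_SIZE: usize = 4096; // placeholder value
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024; // matches this diff

/// The new compact trigger: never fire on a single buffered batch.
fn should_compact(num_batches: usize, buffered_rows: usize, buffered_size: usize) -> bool {
    num_batches > 1
        && (buffered_rows > DEFAULT_READ_BATCH_SIZE
            || buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
}

fn main() {
    // A single oversized batch no longer triggers compaction by itself.
    assert!(!should_compact(1, 10_000, 16 * 1024 * 1024));
    // A second batch over either threshold does.
    assert!(should_compact(2, 10_000, 1_024));
}
```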
@@ -527,7 +528,7 @@ impl CacheBatchBuffer {
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
) {
let Some(sender) = self.sender.take() else {
return;
@@ -641,28 +642,38 @@ mod tests {
fn test_cache_strategy() -> CacheStrategy {
CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.range_result_cache_size(1024)
.range_result_cache_size(1024 * 1024)
.build(),
))
}
fn test_scan_fingerprint(
filters: Vec<String>,
time_filters: Vec<String>,
series_row_selector: Option<TimeSeriesRowSelector>,
filter_deleted: bool,
partition_expr_version: u64,
) -> ScanRequestFingerprint {
ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_column_types: vec![None, None],
filters,
time_filters,
series_row_selector,
append_mode: false,
filter_deleted,
merge_mode: MergeMode::LastRow,
partition_expr_version,
}
.build()
}
fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
let region_id = RegionId::new(1, 1);
let key = RangeScanCacheKey {
region_id,
row_groups: vec![],
scan: ScanRequestFingerprintBuilder {
read_column_ids: vec![],
read_column_types: vec![],
filters: vec![],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: false,
merge_mode: MergeMode::LastRow,
partition_expr_version: 0,
}
.build(),
scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
};
let metrics_set = ExecutionPlanMetricsSet::new();
@@ -678,10 +689,12 @@ mod tests {
key: RangeScanCacheKey,
cache_strategy: CacheStrategy,
part_metrics: PartitionMetrics,
) -> Option<Arc<RangeScanCacheValue>> {
) -> Result<Arc<RangeScanCacheValue>> {
let (tx, rx) = oneshot::channel();
common_telemetry::info!("finish start");
buffer.finish(key, cache_strategy, part_metrics, Some(tx));
rx.await.context(crate::error::RecvSnafu).ok().flatten()
common_telemetry::info!("finish end");
rx.await.context(crate::error::RecvSnafu)?
}
async fn new_stream_context(
@@ -733,9 +746,40 @@ mod tests {
lit(ScalarValue::TimestampMillisecond(Some(val), None))
}
fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
let mut exprs = exprs
.into_iter()
.map(|expr| expr.to_string())
.collect::<Vec<_>>();
exprs.sort_unstable();
exprs
}
async fn assert_range_cache_filters(
filters: Vec<Expr>,
query_time_range: Option<TimestampRange>,
partition_time_range: FileTimeRange,
expected_filters: Vec<Expr>,
expected_time_filters: Vec<Expr>,
) {
let (stream_ctx, part_range) =
new_stream_context(filters, query_time_range, partition_time_range).await;
let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
assert_eq!(
key.scan.filters(),
normalized_exprs(expected_filters).as_slice()
);
assert_eq!(
key.scan.time_filters(),
normalized_exprs(expected_time_filters).as_slice()
);
}
#[tokio::test]
async fn strips_time_only_filters_when_query_covers_partition_range() {
let (stream_ctx, part_range) = new_stream_context(
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").lt(ts_lit(2001)),
@@ -747,50 +791,30 @@ mod tests {
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![],
)
.await;
let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
// Range-reducible time filters should be cleared when query covers partition range.
assert!(key.scan.time_filters().is_empty());
// Non-range time predicates stay in filters.
let mut expected_filters = [
col("k0").eq(lit("foo")).to_string(),
col("ts").is_not_null().to_string(),
];
expected_filters.sort_unstable();
assert_eq!(key.scan.filters(), expected_filters.as_slice());
}
#[tokio::test]
async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
let (stream_ctx, part_range) = new_stream_context(
assert_range_cache_filters(
vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
(
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo"))],
vec![col("ts").gt_eq(ts_lit(1000))],
)
.await;
let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
// Time filters should be preserved when query does not cover partition range.
assert_eq!(
key.scan.time_filters(),
[col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice()
);
assert_eq!(
key.scan.filters(),
[col("k0").eq(lit("foo")).to_string()].as_slice()
);
}
#[tokio::test]
async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
let (stream_ctx, part_range) = new_stream_context(
assert_range_cache_filters(
vec![
col("ts").gt_eq(ts_lit(1000)),
col("ts").is_not_null(),
@@ -801,51 +825,26 @@ mod tests {
Timestamp::new_millisecond(1000),
Timestamp::new_millisecond(2000),
),
vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
vec![],
)
.await;
let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
// Range-reducible time filters should be cleared when query has no time range limit.
assert!(key.scan.time_filters().is_empty());
// Non-range time predicates stay in filters.
let mut expected_filters = [
col("k0").eq(lit("foo")).to_string(),
col("ts").is_not_null().to_string(),
];
expected_filters.sort_unstable();
assert_eq!(key.scan.filters(), expected_filters.as_slice());
}
#[test]
fn normalizes_and_clears_time_filters() {
let normalized = ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_column_types: vec![None, None],
filters: vec!["k0 = 'foo'".to_string()],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::LastRow,
partition_expr_version: 0,
}
.build();
let normalized =
test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);
assert!(normalized.time_filters().is_empty());
let fingerprint = ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_column_types: vec![None, None],
filters: vec!["k0 = 'foo'".to_string()],
time_filters: vec!["ts >= 1000".to_string()],
series_row_selector: Some(TimeSeriesRowSelector::LastRow),
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::LastRow,
partition_expr_version: 7,
}
.build();
let fingerprint = test_scan_fingerprint(
vec!["k0 = 'foo'".to_string()],
vec!["ts >= 1000".to_string()],
Some(TimeSeriesRowSelector::LastRow),
true,
7,
);
let reset = fingerprint.without_time_filters();
@@ -978,13 +977,22 @@ mod tests {
#[tokio::test]
async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
let strategy = test_cache_strategy();
let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
let strategy = CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
.build(),
));
let (key, part_metrics) = test_cache_context(&strategy);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(large_batch.clone()).unwrap();
assert_eq!(buffer.buffered_rows, large_batch.num_rows());
assert_eq!(buffer.buffered_batches.len(), 1);
buffer.push(large_batch.clone()).unwrap();
assert_eq!(buffer.buffered_rows, 0);
assert!(buffer.buffered_batches.is_empty());
@@ -994,44 +1002,31 @@ mod tests {
assert_eq!(value.cached_batches.len(), 1);
assert_eq!(
value.cached_batches[0].slice_lengths,
vec![large_batch.num_rows()]
vec![large_batch.num_rows(), large_batch.num_rows()]
);
}
#[tokio::test]
async fn cache_batch_buffer_uses_compacted_size_for_weight() {
let strategy = test_cache_strategy();
let batch1 = make_batch(&[1, 2]);
let batch2 = make_batch(&[3, 4]);
async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
// Budget only fits two large batches.
let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
let strategy = CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.range_result_cache_size(budget)
.build(),
));
let (key, part_metrics) = test_cache_context(&strategy);
let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()])
.unwrap()
.get_array_memory_size();
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.push(batch1).unwrap();
buffer.push(batch2).unwrap();
let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
.await
.unwrap();
assert_eq!(value.estimated_batches_size, expected);
}
#[tokio::test]
async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() {
let strategy = test_cache_strategy();
let (key, part_metrics) = test_cache_context(&strategy);
let mut buffer = CacheBatchBuffer::new(&strategy);
buffer.total_weight = RANGE_CACHE_SKIP_BYTES;
buffer.push(make_batch(&[1])).unwrap();
assert!(buffer.sender.is_none());
for _ in 0..4 {
buffer.push(large_batch.clone()).unwrap();
}
assert!(
finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
.await
.is_none()
.is_err()
);
assert!(strategy.get_range_result(&key).is_none());
}
}