// mito2/read/range_cache.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for the partition range scan result cache.
16
17use std::mem;
18use std::sync::Arc;
19
20use async_stream::try_stream;
21use common_telemetry::warn;
22use datatypes::arrow::compute::concat_batches;
23use datatypes::arrow::record_batch::RecordBatch;
24use datatypes::prelude::ConcreteDataType;
25use futures::TryStreamExt;
26use snafu::ResultExt;
27use store_api::region_engine::PartitionRange;
28use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector};
29use tokio::sync::{mpsc, oneshot};
30
31use crate::cache::CacheStrategy;
32use crate::error::{ComputeArrowSnafu, Result};
33use crate::read::BoxedRecordBatchStream;
34use crate::read::read_columns::ReadColumns;
35use crate::read::scan_region::StreamContext;
36use crate::read::scan_util::PartitionMetrics;
37use crate::region::options::MergeMode;
38use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
39
/// Byte threshold above which buffered batches are compacted into one batch (8 MiB).
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
41
42/// Fingerprint of the scan request fields that affect partition range cache reuse.
43///
44/// It records a normalized view of the projected columns and filters, plus
45/// scan options that can change the returned rows. Schema-dependent metadata
46/// and the partition expression version are included so cached results are not
47/// reused across incompatible schema or partitioning changes.
/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
/// It records a normalized view of the projected columns and filters, plus
/// scan options that can change the returned rows. Schema-dependent metadata
/// and the partition expression version are included so cached results are not
/// reused across incompatible schema or partitioning changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Projection and filters without the time index and partition exprs.
    /// Shared behind an `Arc` so cloning the fingerprint is cheap.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Filters with the time index column. `None` when there are no time filters.
    time_filters: Option<Arc<Vec<String>>>,
    /// Optional per-series row selection requested by the scan.
    series_row_selector: Option<TimeSeriesRowSelector>,
    /// Append mode flag of the scan.
    append_mode: bool,
    /// Whether the scan filters out deleted rows.
    filter_deleted: bool,
    /// Merge mode of the scan.
    merge_mode: MergeMode,
    /// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr.
    /// We store the version instead of the whole partition expr or partition expr filters.
    partition_expr_version: u64,
}
62
/// Builder collecting the raw inputs of a [`ScanRequestFingerprint`].
///
/// Fields are crate-visible so scan construction code can fill them directly;
/// call [`ScanRequestFingerprintBuilder::build`] to obtain the fingerprint.
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    /// Logical columns of the projection.
    pub(crate) read_columns: ReadColumns,
    /// Column types of the projection.
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    /// Filters without the time index column and region partition exprs.
    pub(crate) filters: Vec<String>,
    /// Filters that reference the time index column.
    pub(crate) time_filters: Vec<String>,
    /// Optional per-series row selection requested by the scan.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Append mode flag of the scan.
    pub(crate) append_mode: bool,
    /// Whether the scan filters out deleted rows.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the scan.
    pub(crate) merge_mode: MergeMode,
    /// Partition expr version; see [`ScanRequestFingerprint`].
    pub(crate) partition_expr_version: u64,
}
75
76impl ScanRequestFingerprintBuilder {
77    pub(crate) fn build(self) -> ScanRequestFingerprint {
78        let Self {
79            read_columns,
80            read_column_types,
81            filters,
82            time_filters,
83            series_row_selector,
84            append_mode,
85            filter_deleted,
86            merge_mode,
87            partition_expr_version,
88        } = self;
89
90        ScanRequestFingerprint {
91            inner: Arc::new(SharedScanRequestFingerprint {
92                read_columns,
93                read_column_types,
94                filters,
95            }),
96            time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
97            series_row_selector,
98            append_mode,
99            filter_deleted,
100            merge_mode,
101            partition_expr_version,
102        }
103    }
104}
105
106/// Non-copiable struct of the fingerprint.
107#[derive(Debug, PartialEq, Eq, Hash)]
108struct SharedScanRequestFingerprint {
109    /// Logical columns of the projection.
110    read_columns: ReadColumns,
111    /// Column types of the projection.
112    /// We keep this to ensure we won't reuse the fingerprint after a schema change.
113    read_column_types: Vec<Option<ConcreteDataType>>,
114    /// Filters without the time index column and region partition exprs.
115    filters: Vec<String>,
116}
117
118impl ScanRequestFingerprint {
119    #[cfg(test)]
120    pub(crate) fn read_columns(&self) -> &ReadColumns {
121        &self.inner.read_columns
122    }
123
124    #[cfg(test)]
125    pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
126        &self.inner.read_column_types
127    }
128
129    #[cfg(test)]
130    pub(crate) fn filters(&self) -> &[String] {
131        &self.inner.filters
132    }
133
134    #[cfg(test)]
135    pub(crate) fn time_filters(&self) -> &[String] {
136        self.time_filters
137            .as_deref()
138            .map(Vec::as_slice)
139            .unwrap_or(&[])
140    }
141
142    #[allow(dead_code)]
143    pub(crate) fn without_time_filters(&self) -> Self {
144        Self {
145            inner: Arc::clone(&self.inner),
146            time_filters: None,
147            series_row_selector: self.series_row_selector,
148            append_mode: self.append_mode,
149            filter_deleted: self.filter_deleted,
150            merge_mode: self.merge_mode,
151            partition_expr_version: self.partition_expr_version,
152        }
153    }
154
155    pub(crate) fn estimated_size(&self) -> usize {
156        mem::size_of::<SharedScanRequestFingerprint>()
157            + self.inner.read_columns.estimated_size()
158            + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
159            + self.inner.filters.capacity() * mem::size_of::<String>()
160            + self
161                .inner
162                .filters
163                .iter()
164                .map(|filter| filter.capacity())
165                .sum::<usize>()
166            + self.time_filters.as_ref().map_or(0, |filters| {
167                mem::size_of::<Vec<String>>()
168                    + filters.capacity() * mem::size_of::<String>()
169                    + filters
170                        .iter()
171                        .map(|filter| filter.capacity())
172                        .sum::<usize>()
173            })
174    }
175}
176
177/// Cache key for range scan outputs.
178#[derive(Debug, Clone, PartialEq, Eq, Hash)]
179pub(crate) struct RangeScanCacheKey {
180    pub(crate) region_id: RegionId,
181    /// Sorted (file_id, row_group_index) pairs that uniquely identify the data this range covers.
182    pub(crate) row_groups: Vec<(FileId, i64)>,
183    pub(crate) scan: ScanRequestFingerprint,
184}
185
186impl RangeScanCacheKey {
187    pub(crate) fn estimated_size(&self) -> usize {
188        mem::size_of::<Self>()
189            + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
190            + self.scan.estimated_size()
191    }
192}
193
194/// Cached result for one range scan.
195#[derive(Debug)]
196pub(crate) struct CachedBatchSlice {
197    batch: RecordBatch,
198    slice_lengths: Vec<usize>,
199}
200
201impl CachedBatchSlice {
202    fn metadata_size(&self) -> usize {
203        self.slice_lengths.capacity() * mem::size_of::<usize>()
204    }
205}
206
/// Cached output of a whole partition range scan.
pub(crate) struct RangeScanCacheValue {
    /// Compacted batches in stream order.
    cached_batches: Vec<CachedBatchSlice>,
    /// Precomputed size of all compacted batches.
    estimated_batches_size: usize,
}
212
213impl RangeScanCacheValue {
214    pub(crate) fn new(
215        cached_batches: Vec<CachedBatchSlice>,
216        estimated_batches_size: usize,
217    ) -> Self {
218        Self {
219            cached_batches,
220            estimated_batches_size,
221        }
222    }
223
224    pub(crate) fn estimated_size(&self) -> usize {
225        mem::size_of::<Self>()
226            + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
227            + self
228                .cached_batches
229                .iter()
230                .map(CachedBatchSlice::metadata_size)
231                .sum::<usize>()
232            + self.estimated_batches_size
233    }
234}
235
236/// Row groups and whether all sources are file-only for a partition range.
237pub(crate) struct PartitionRangeRowGroups {
238    /// Sorted (file_id, row_group_index) pairs.
239    pub(crate) row_groups: Vec<(FileId, i64)>,
240    pub(crate) only_file_sources: bool,
241}
242
243/// Collects (file_id, row_group_index) pairs from a partition range's row group indices.
244pub(crate) fn collect_partition_range_row_groups(
245    stream_ctx: &StreamContext,
246    part_range: &PartitionRange,
247) -> PartitionRangeRowGroups {
248    let range_meta = &stream_ctx.ranges[part_range.identifier];
249    let mut row_groups = Vec::new();
250    let mut only_file_sources = true;
251
252    for index in &range_meta.row_group_indices {
253        if stream_ctx.is_file_range_index(*index) {
254            let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
255            row_groups.push((file_id, index.row_group_index));
256        } else {
257            only_file_sources = false;
258        }
259    }
260
261    row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));
262
263    PartitionRangeRowGroups {
264        row_groups,
265        only_file_sources,
266    }
267}
268
269/// Builds a cache key for the given partition range if it is eligible for caching.
270pub(crate) fn build_range_cache_key(
271    stream_ctx: &StreamContext,
272    part_range: &PartitionRange,
273) -> Option<RangeScanCacheKey> {
274    if !stream_ctx.input.cache_strategy.has_range_result_cache() {
275        return None;
276    }
277
278    let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
279
280    // Dyn filters can change at runtime, so we can't cache when they're present.
281    let has_dyn_filters = stream_ctx
282        .input
283        .predicate_group()
284        .predicate_without_region()
285        .is_some_and(|p| !p.dyn_filters().is_empty());
286    if has_dyn_filters {
287        return None;
288    }
289
290    let rg = collect_partition_range_row_groups(stream_ctx, part_range);
291    if !rg.only_file_sources || rg.row_groups.is_empty() {
292        return None;
293    }
294
295    // TODO(yingwen): We used to call `fingerprint.without_time_filters()` when the query's
296    // `TimestampRange` fully covered the partition's `FileTimeRange`, so different queries that
297    // all enclosed the same partition could share a cache entry. The cover check turned out to
298    // be too coarse: it returned true in cases where the dropped time predicates would still
299    // have excluded rows, so the cache served results that should have been filtered. Reviving
300    // the optimization needs a per-predicate implication check that walks each time-only `Expr`
301    // (recursing through AND/OR/NOT) and proves the predicate is satisfied for every timestamp
302    // inside the partition's `FileTimeRange` — not the looser "does `extract_time_range_from_expr`
303    // return a range that covers the partition" used previously. Until then, always carry the
304    // full fingerprint so cache reuse stays correct.
305    let scan = fingerprint.clone();
306
307    Some(RangeScanCacheKey {
308        region_id: stream_ctx.input.region_metadata().region_id,
309        row_groups: rg.row_groups,
310        scan,
311    })
312}
313
314/// Returns a stream that replays cached record batches.
315pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
316    Box::pin(try_stream! {
317        for cached_batch in &value.cached_batches {
318            let mut offset = 0;
319            for &len in &cached_batch.slice_lengths {
320                yield cached_batch.batch.slice(offset, len);
321                offset += len;
322            }
323        }
324    })
325}
326
/// Commands handled by the background concat task ([`run_cache_concat_task`]).
enum CacheConcatCommand {
    /// Compact the given batches into one cached slice.
    Compact(Vec<RecordBatch>),
    /// Compact the remaining `pending` batches, publish the finished value to
    /// the cache under `key`, and optionally report the result via `result_tx`.
    Finish {
        pending: Vec<RecordBatch>,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    },
}
337
/// Accumulated compaction state owned by the background concat task.
#[derive(Default)]
struct CacheConcatState {
    /// Compacted slices produced so far, in stream order.
    cached_batches: Vec<CachedBatchSlice>,
    /// Running total of the compacted batches' array memory size.
    estimated_size: usize,
}
343
344impl CacheConcatState {
345    async fn compact(
346        &mut self,
347        batches: Vec<RecordBatch>,
348        limiter: &crate::cache::RangeResultMemoryLimiter,
349    ) -> Result<()> {
350        if batches.is_empty() {
351            return Ok(());
352        }
353
354        let input_size = batches
355            .iter()
356            .map(RecordBatch::get_array_memory_size)
357            .sum::<usize>();
358        let _permit = limiter.acquire(input_size).await?;
359
360        let compacted = compact_record_batches(batches)?;
361        self.estimated_size += compacted.batch.get_array_memory_size();
362        self.cached_batches.push(compacted);
363        Ok(())
364    }
365
366    fn finish(self) -> RangeScanCacheValue {
367        RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
368    }
369}
370
371fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
372    debug_assert!(!batches.is_empty());
373
374    let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
375    build_cached_batch_slice(batches, slice_lengths)
376}
377
378fn build_cached_batch_slice(
379    batches: Vec<RecordBatch>,
380    slice_lengths: Vec<usize>,
381) -> Result<CachedBatchSlice> {
382    let batch = if batches.len() == 1 {
383        batches.into_iter().next().unwrap()
384    } else {
385        let schema = batches[0].schema();
386        concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
387    };
388
389    Ok(CachedBatchSlice {
390        batch,
391        slice_lengths,
392    })
393}
394
/// Background task that owns a [`CacheConcatState`] and serves commands from
/// `rx` until the range result is finished or abandoned.
///
/// The task exits — dropping `rx` and thereby closing the channel — when:
/// - a compaction fails,
/// - the accumulated size exceeds `skip_threshold_bytes` (the result would
///   not fit the cache budget), or
/// - a `Finish` command has been handled.
async fn run_cache_concat_task(
    mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
    limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
    skip_threshold_bytes: usize,
) {
    let mut state = CacheConcatState::default();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            CacheConcatCommand::Compact(batches) => {
                if let Err(err) = state.compact(batches, &limiter).await {
                    warn!(err; "Failed to compact range cache batches");
                    return;
                }
                // Close the channel to stop further work as soon as the cached
                // size exceeds the configured cache budget.
                if state.estimated_size > skip_threshold_bytes {
                    return;
                }
            }
            CacheConcatCommand::Finish {
                pending,
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            } => {
                // Compact any remaining buffered batches, then seal the state
                // into the final cache value.
                let compact_result = state
                    .compact(pending, &limiter)
                    .await
                    .map(|()| state.finish());
                let result = match compact_result {
                    Ok(v) => {
                        let value = Arc::new(v);
                        // Account for both key and value sizes before
                        // publishing the entry to the cache.
                        part_metrics
                            .inc_range_cache_size(key.estimated_size() + value.estimated_size());
                        cache_strategy.put_range_result(key, value.clone());

                        Ok(value)
                    }
                    Err(e) => {
                        warn!(e; "Failed to finalize range cache batches");

                        Err(e)
                    }
                };

                // Report the outcome if a receiver was provided (tests use this).
                if let Some(tx) = result_tx {
                    let _ = tx.send(result);
                }

                break;
            }
        }
    }
}
451
/// Client-side buffer that batches stream output before handing it to the
/// background concat task.
struct CacheBatchBuffer {
    /// Batches waiting to be sent for compaction.
    buffered_batches: Vec<RecordBatch>,
    /// Total rows in `buffered_batches`.
    buffered_rows: usize,
    /// Total array memory size of `buffered_batches`.
    buffered_size: usize,
    /// Channel to the concat task; `None` when caching is disabled or the
    /// task has exited.
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
458
459impl CacheBatchBuffer {
460    fn new(cache_strategy: &CacheStrategy) -> Self {
461        let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
462            let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
463            let (tx, rx) = mpsc::unbounded_channel();
464            common_runtime::spawn_global(run_cache_concat_task(
465                rx,
466                limiter.clone(),
467                skip_threshold_bytes,
468            ));
469            tx
470        });
471
472        Self {
473            buffered_batches: Vec::new(),
474            buffered_rows: 0,
475            buffered_size: 0,
476            sender,
477        }
478    }
479
480    fn push(&mut self, batch: RecordBatch) -> Result<()> {
481        if self.sender.is_none() {
482            return Ok(());
483        }
484
485        self.buffered_rows += batch.num_rows();
486        self.buffered_size += batch.get_array_memory_size();
487        self.buffered_batches.push(batch);
488
489        if self.buffered_batches.len() > 1
490            && (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
491                || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
492        {
493            self.notify_compact();
494        }
495
496        Ok(())
497    }
498
499    fn notify_compact(&mut self) {
500        if self.buffered_batches.is_empty() || self.sender.is_none() {
501            return;
502        }
503
504        let batches = mem::take(&mut self.buffered_batches);
505        self.buffered_rows = 0;
506        self.buffered_size = 0;
507
508        let Some(sender) = &self.sender else {
509            return;
510        };
511        if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
512            self.sender = None;
513        }
514    }
515
516    fn finish(
517        mut self,
518        key: RangeScanCacheKey,
519        cache_strategy: CacheStrategy,
520        part_metrics: PartitionMetrics,
521        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
522    ) {
523        let Some(sender) = self.sender.take() else {
524            return;
525        };
526
527        if sender
528            .send(CacheConcatCommand::Finish {
529                pending: mem::take(&mut self.buffered_batches),
530                key,
531                cache_strategy,
532                part_metrics,
533                result_tx,
534            })
535            .is_err()
536        {
537            self.sender = None;
538        }
539    }
540}
541
542/// Wraps a stream to cache its output for future range cache hits.
543pub(crate) fn cache_flat_range_stream(
544    mut stream: BoxedRecordBatchStream,
545    cache_strategy: CacheStrategy,
546    key: RangeScanCacheKey,
547    part_metrics: PartitionMetrics,
548) -> BoxedRecordBatchStream {
549    Box::pin(try_stream! {
550        let mut buffer = CacheBatchBuffer::new(&cache_strategy);
551        while let Some(batch) = stream.try_next().await? {
552            buffer.push(batch.clone())?;
553            yield batch;
554        }
555
556        buffer.finish(key, cache_strategy, part_metrics, None);
557    })
558}
559
560/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking.
561///
562/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and
563/// `PartitionMetrics` publicly.
/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking.
///
/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and
/// `PartitionMetrics` publicly.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    let cache_strategy = CacheStrategy::EnableAll(Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    ));

    // An empty fingerprint: no projection columns, filters, or scan options.
    let scan = ScanRequestFingerprintBuilder {
        read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
        read_column_types: Vec::new(),
        filters: Vec::new(),
        time_filters: Vec::new(),
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let key = RangeScanCacheKey {
        region_id,
        row_groups: Vec::new(),
        scan,
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, cache_strategy, key, part_metrics)
}
608
609#[cfg(test)]
610mod tests {
611    use std::sync::Arc;
612    use std::time::Instant;
613
614    use common_time::Timestamp;
615    use common_time::range::TimestampRange;
616    use common_time::timestamp::TimeUnit;
617    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
618    use datafusion_common::ScalarValue;
619    use datafusion_expr::{Expr, col, lit};
620    use smallvec::smallvec;
621    use store_api::storage::{FileId, RegionId};
622
623    use super::*;
624    use crate::cache::CacheManager;
625    use crate::read::flat_projection::FlatProjectionMapper;
626    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
627    use crate::read::scan_region::{PredicateGroup, ScanInput};
628    use crate::sst::file::FileTimeRange;
629    use crate::test_util::memtable_util::metadata_with_primary_key;
630    use crate::test_util::scheduler_util::SchedulerEnv;
631    use crate::test_util::sst_util::sst_file_handle_with_file_id;
632
633    fn test_cache_strategy() -> CacheStrategy {
634        CacheStrategy::EnableAll(Arc::new(
635            CacheManager::builder()
636                .range_result_cache_size(1024 * 1024)
637                .build(),
638        ))
639    }
640
641    fn test_scan_fingerprint(
642        filters: Vec<String>,
643        time_filters: Vec<String>,
644        series_row_selector: Option<TimeSeriesRowSelector>,
645        filter_deleted: bool,
646        partition_expr_version: u64,
647    ) -> ScanRequestFingerprint {
648        let read_columns = ReadColumns::from_deduped_column_ids([1, 2]);
649        ScanRequestFingerprintBuilder {
650            read_columns,
651            read_column_types: vec![None, None],
652            filters,
653            time_filters,
654            series_row_selector,
655            append_mode: false,
656            filter_deleted,
657            merge_mode: MergeMode::LastRow,
658            partition_expr_version,
659        }
660        .build()
661    }
662
    /// Builds a cache key and partition metrics for cache tests, asserting the
    /// key is not present in `strategy` yet.
    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        // Sanity check: the cache must start empty for this key.
        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }
678
679    async fn finish_cache_batch_buffer(
680        buffer: CacheBatchBuffer,
681        key: RangeScanCacheKey,
682        cache_strategy: CacheStrategy,
683        part_metrics: PartitionMetrics,
684    ) -> Result<Arc<RangeScanCacheValue>> {
685        let (tx, rx) = oneshot::channel();
686        common_telemetry::info!("finish start");
687        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
688        common_telemetry::info!("finish end");
689        rx.await.context(crate::error::RecvSnafu)?
690    }
691
    /// Builds a [`StreamContext`] over a single SST file with one row group,
    /// plus the matching [`PartitionRange`], for cache-key tests.
    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        // One SST file spanning exactly the partition's time range.
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        // A single range with one row group from source index 0.
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }
735
    /// Helper to create a timestamp millisecond literal (no timezone).
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }
740
741    fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
742        let mut exprs = exprs
743            .into_iter()
744            .map(|expr| expr.to_string())
745            .collect::<Vec<_>>();
746        exprs.sort_unstable();
747        exprs
748    }
749
750    async fn assert_range_cache_filters(
751        filters: Vec<Expr>,
752        query_time_range: Option<TimestampRange>,
753        partition_time_range: FileTimeRange,
754        expected_filters: Vec<Expr>,
755        expected_time_filters: Vec<Expr>,
756    ) {
757        let (stream_ctx, part_range) =
758            new_stream_context(filters, query_time_range, partition_time_range).await;
759
760        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();
761
762        assert_eq!(
763            key.scan.filters(),
764            normalized_exprs(expected_filters).as_slice()
765        );
766        assert_eq!(
767            key.scan.time_filters(),
768            normalized_exprs(expected_time_filters).as_slice()
769        );
770    }
771
    #[tokio::test]
    async fn preserves_time_filters_when_query_covers_partition_range() {
        // Even when the query time range spans the partition's file time
        // range, the time filters must stay in the cache key (see the TODO
        // in `build_range_cache_key`).
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![col("ts").gt_eq(ts_lit(1000)), col("ts").lt(ts_lit(2001))],
        )
        .await;
    }
791
    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        // The query time range ends before the partition's file time range
        // does; the time filter must be kept in the cache key.
        assert_range_cache_filters(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo"))],
            vec![col("ts").gt_eq(ts_lit(1000))],
        )
        .await;
    }
806
    #[tokio::test]
    async fn preserves_time_filters_when_query_has_no_time_range_limit() {
        // No query time range at all; time filters still stay in the key.
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![col("ts").gt_eq(ts_lit(1000))],
        )
        .await;
    }
825
    #[test]
    fn normalizes_and_clears_time_filters() {
        // An empty time-filter list is normalized to "no time filters".
        let normalized =
            test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);

        assert!(normalized.time_filters().is_empty());

        let fingerprint = test_scan_fingerprint(
            vec!["k0 = 'foo'".to_string()],
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            true,
            7,
        );

        // `without_time_filters` must clear only the time filters and keep
        // every other fingerprint field intact.
        let reset = fingerprint.without_time_filters();

        assert_eq!(reset.read_columns(), fingerprint.read_columns());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }
856
857    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
858        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
859
860        Arc::new(Schema::new(vec![Field::new(
861            "value",
862            ArrowDataType::Int64,
863            false,
864        )]))
865    }
866
867    fn make_batch(values: &[i64]) -> RecordBatch {
868        use datatypes::arrow::array::Int64Array;
869
870        RecordBatch::try_new(
871            test_schema(),
872            vec![Arc::new(Int64Array::from(values.to_vec()))],
873        )
874        .unwrap()
875    }
876
877    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
878        use datatypes::arrow::array::BinaryArray;
879        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
880
881        let schema = Arc::new(Schema::new(vec![Field::new(
882            "value",
883            ArrowDataType::Binary,
884            false,
885        )]));
886        let payload = vec![b'x'; bytes_per_row];
887        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();
888
889        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
890    }
891
892    #[test]
893    fn compact_record_batches_keeps_original_boundaries() {
894        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];
895
896        let compacted = compact_record_batches(batches).unwrap();
897
898        assert_eq!(compacted.batch.num_rows(), 5);
899        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
900    }
901
902    #[tokio::test]
903    async fn cached_flat_range_stream_replays_original_batches() {
904        let value = Arc::new(RangeScanCacheValue::new(
905            vec![CachedBatchSlice {
906                batch: make_batch(&[1, 2, 3]),
907                slice_lengths: vec![2, 1],
908            }],
909            make_batch(&[1, 2, 3]).get_array_memory_size(),
910        ));
911
912        let replayed = cached_flat_range_stream(value)
913            .try_collect::<Vec<_>>()
914            .await
915            .unwrap();
916
917        assert_eq!(replayed.len(), 2);
918        assert_eq!(replayed[0].num_rows(), 2);
919        assert_eq!(replayed[1].num_rows(), 1);
920    }
921
922    #[tokio::test]
923    async fn cache_batch_buffer_finishes_pending_batches() {
924        let strategy = test_cache_strategy();
925        let batch = make_batch(&[1, 2, 3]);
926        let expected_size = batch.get_array_memory_size();
927        let (key, part_metrics) = test_cache_context(&strategy);
928
929        let mut buffer = CacheBatchBuffer::new(&strategy);
930        buffer.push(batch).unwrap();
931
932        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
933            .await
934            .unwrap();
935        assert_eq!(value.cached_batches.len(), 1);
936        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
937        assert_eq!(value.estimated_batches_size, expected_size);
938        assert!(Arc::ptr_eq(
939            &value,
940            &strategy.get_range_result(&key).unwrap()
941        ));
942    }
943
944    #[tokio::test]
945    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
946        let strategy = test_cache_strategy();
947        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
948        let (key, part_metrics) = test_cache_context(&strategy);
949
950        let mut buffer = CacheBatchBuffer::new(&strategy);
951        buffer.push(batch.clone()).unwrap();
952        buffer.push(batch).unwrap();
953
954        assert_eq!(buffer.buffered_rows, 0);
955        assert!(buffer.buffered_batches.is_empty());
956
957        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
958            .await
959            .unwrap();
960        assert_eq!(value.cached_batches.len(), 1);
961        assert_eq!(
962            value.cached_batches[0].slice_lengths,
963            vec![
964                DEFAULT_READ_BATCH_SIZE / 2 + 1,
965                DEFAULT_READ_BATCH_SIZE / 2 + 1
966            ]
967        );
968    }
969
970    #[tokio::test]
971    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
972        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
973        let strategy = CacheStrategy::EnableAll(Arc::new(
974            CacheManager::builder()
975                .range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
976                .build(),
977        ));
978        let (key, part_metrics) = test_cache_context(&strategy);
979
980        let mut buffer = CacheBatchBuffer::new(&strategy);
981        buffer.push(large_batch.clone()).unwrap();
982
983        assert_eq!(buffer.buffered_rows, large_batch.num_rows());
984        assert_eq!(buffer.buffered_batches.len(), 1);
985
986        buffer.push(large_batch.clone()).unwrap();
987
988        assert_eq!(buffer.buffered_rows, 0);
989        assert!(buffer.buffered_batches.is_empty());
990
991        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
992            .await
993            .unwrap();
994        assert_eq!(value.cached_batches.len(), 1);
995        assert_eq!(
996            value.cached_batches[0].slice_lengths,
997            vec![large_batch.num_rows(), large_batch.num_rows()]
998        );
999    }
1000
1001    #[tokio::test]
1002    async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
1003        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
1004        // Budget only fits two large batches.
1005        let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
1006        let strategy = CacheStrategy::EnableAll(Arc::new(
1007            CacheManager::builder()
1008                .range_result_cache_size(budget)
1009                .build(),
1010        ));
1011        let (key, part_metrics) = test_cache_context(&strategy);
1012
1013        let mut buffer = CacheBatchBuffer::new(&strategy);
1014        for _ in 0..4 {
1015            buffer.push(large_batch.clone()).unwrap();
1016        }
1017        assert!(
1018            finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
1019                .await
1020                .is_err()
1021        );
1022        assert!(strategy.get_range_result(&key).is_none());
1023    }
1024}