mito2/read/range_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for the partition range scan result cache.

use std::mem;
use std::sync::Arc;

use async_stream::try_stream;
use common_telemetry::warn;
use common_time::range::TimestampRange;
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
use tokio::sync::{mpsc, oneshot};

use crate::cache::CacheStrategy;
use crate::error::{ComputeArrowSnafu, Result};
use crate::read::BoxedRecordBatchStream;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::region::options::MergeMode;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;

/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
/// It records a normalized view of the projected columns and filters, plus
/// scan options that can change the returned rows. Schema-dependent metadata
/// and the partition expression version are included so cached results are not
/// reused across incompatible schema or partitioning changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Projection and filters without the time index and partition exprs.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Filters with the time index column.
    time_filters: Option<Arc<Vec<String>>>,
    series_row_selector: Option<TimeSeriesRowSelector>,
    append_mode: bool,
    filter_deleted: bool,
    merge_mode: MergeMode,
    /// Partition expr version, kept so a fingerprint is not reused after the partition expr changes.
    /// Storing the version is cheaper than storing the whole partition expr or its filters.
    partition_expr_version: u64,
}

#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    pub(crate) read_column_ids: Vec<ColumnId>,
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    pub(crate) filters: Vec<String>,
    pub(crate) time_filters: Vec<String>,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) partition_expr_version: u64,
}

impl ScanRequestFingerprintBuilder {
    pub(crate) fn build(self) -> ScanRequestFingerprint {
        let Self {
            read_column_ids,
            read_column_types,
            filters,
            time_filters,
            series_row_selector,
            append_mode,
            filter_deleted,
            merge_mode,
            partition_expr_version,
        } = self;

        ScanRequestFingerprint {
            inner: Arc::new(SharedScanRequestFingerprint {
                read_column_ids,
                read_column_types,
                filters,
            }),
            time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
            series_row_selector,
            append_mode,
            filter_deleted,
            merge_mode,
            partition_expr_version,
        }
    }
}

/// Shared part of the fingerprint, kept behind an `Arc` so cloning the fingerprint does not copy it.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Column ids of the projection.
    read_column_ids: Vec<ColumnId>,
    /// Column types of the projection.
    /// We keep this to ensure we won't reuse the fingerprint after a schema change.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// Filters without the time index column and region partition exprs.
    filters: Vec<String>,
}

impl ScanRequestFingerprint {
    #[cfg(test)]
    pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
        &self.inner.read_column_ids
    }

    #[cfg(test)]
    pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
        &self.inner.read_column_types
    }

    #[cfg(test)]
    pub(crate) fn filters(&self) -> &[String] {
        &self.inner.filters
    }

    #[cfg(test)]
    pub(crate) fn time_filters(&self) -> &[String] {
        self.time_filters
            .as_deref()
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }

    pub(crate) fn without_time_filters(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            time_filters: None,
            series_row_selector: self.series_row_selector,
            append_mode: self.append_mode,
            filter_deleted: self.filter_deleted,
            merge_mode: self.merge_mode,
            partition_expr_version: self.partition_expr_version,
        }
    }

    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<SharedScanRequestFingerprint>()
            + self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
            + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
            + self.inner.filters.capacity() * mem::size_of::<String>()
            + self
                .inner
                .filters
                .iter()
                .map(|filter| filter.capacity())
                .sum::<usize>()
            + self.time_filters.as_ref().map_or(0, |filters| {
                mem::size_of::<Vec<String>>()
                    + filters.capacity() * mem::size_of::<String>()
                    + filters
                        .iter()
                        .map(|filter| filter.capacity())
                        .sum::<usize>()
            })
    }
}

/// Cache key for range scan outputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    pub(crate) region_id: RegionId,
    /// Sorted (file_id, row_group_index) pairs that uniquely identify the data this range covers.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    pub(crate) scan: ScanRequestFingerprint,
}

impl RangeScanCacheKey {
    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
            + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
            + self.scan.estimated_size()
    }
}

/// A compacted record batch plus the lengths of the original batches, so replay can restore the original slicing.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
    batch: RecordBatch,
    slice_lengths: Vec<usize>,
}

impl CachedBatchSlice {
    fn metadata_size(&self) -> usize {
        self.slice_lengths.capacity() * mem::size_of::<usize>()
    }
}

/// Cached result for one range scan.
pub(crate) struct RangeScanCacheValue {
    cached_batches: Vec<CachedBatchSlice>,
    /// Precomputed size of all compacted batches.
    estimated_batches_size: usize,
}

impl RangeScanCacheValue {
    pub(crate) fn new(
        cached_batches: Vec<CachedBatchSlice>,
        estimated_batches_size: usize,
    ) -> Self {
        Self {
            cached_batches,
            estimated_batches_size,
        }
    }

    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
            + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
            + self
                .cached_batches
                .iter()
                .map(CachedBatchSlice::metadata_size)
                .sum::<usize>()
            + self.estimated_batches_size
    }
}

/// Row groups of a partition range, plus whether every source in the range is file-backed.
pub(crate) struct PartitionRangeRowGroups {
    /// Sorted (file_id, row_group_index) pairs.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    pub(crate) only_file_sources: bool,
}

/// Collects (file_id, row_group_index) pairs from a partition range's row group indices.
pub(crate) fn collect_partition_range_row_groups(
    stream_ctx: &StreamContext,
    part_range: &PartitionRange,
) -> PartitionRangeRowGroups {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    let mut row_groups = Vec::new();
    let mut only_file_sources = true;

    for index in &range_meta.row_group_indices {
        if stream_ctx.is_file_range_index(*index) {
            let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
            row_groups.push((file_id, index.row_group_index));
        } else {
            only_file_sources = false;
        }
    }

    row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));

    PartitionRangeRowGroups {
        row_groups,
        only_file_sources,
    }
}

/// Builds a cache key for the given partition range if it is eligible for caching.
pub(crate) fn build_range_cache_key(
    stream_ctx: &StreamContext,
    part_range: &PartitionRange,
) -> Option<RangeScanCacheKey> {
    if !stream_ctx.input.cache_strategy.has_range_result_cache() {
        return None;
    }

    let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;

    // Dyn filters can change at runtime, so we can't cache when they're present.
    let has_dyn_filters = stream_ctx
        .input
        .predicate_group()
        .predicate_without_region()
        .is_some_and(|p| !p.dyn_filters().is_empty());
    if has_dyn_filters {
        return None;
    }

    let rg = collect_partition_range_row_groups(stream_ctx, part_range);
    if !rg.only_file_sources || rg.row_groups.is_empty() {
        return None;
    }

    let range_meta = &stream_ctx.ranges[part_range.identifier];
    let scan = if query_time_range_covers_partition_range(
        stream_ctx.input.time_range.as_ref(),
        range_meta.time_range,
    ) {
        fingerprint.without_time_filters()
    } else {
        fingerprint.clone()
    };

    Some(RangeScanCacheKey {
        region_id: stream_ctx.input.region_metadata().region_id,
        row_groups: rg.row_groups,
        scan,
    })
}

fn query_time_range_covers_partition_range(
    query_time_range: Option<&TimestampRange>,
    partition_time_range: FileTimeRange,
) -> bool {
    let Some(query_time_range) = query_time_range else {
        return true;
    };

    let (part_start, part_end) = partition_time_range;
    query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
}

/// Returns a stream that replays cached record batches.
pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
    Box::pin(try_stream! {
        for cached_batch in &value.cached_batches {
            let mut offset = 0;
            for &len in &cached_batch.slice_lengths {
                yield cached_batch.batch.slice(offset, len);
                offset += len;
            }
        }
    })
}

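/// Commands handled by the background task that compacts record batches for the range cache.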
enum CacheConcatCommand {
    Compact(Vec<RecordBatch>),
    Finish {
        pending: Vec<RecordBatch>,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    },
}

#[derive(Default)]
struct CacheConcatState {
    cached_batches: Vec<CachedBatchSlice>,
    estimated_size: usize,
}

impl CacheConcatState {
    async fn compact(
        &mut self,
        batches: Vec<RecordBatch>,
        limiter: &crate::cache::RangeResultMemoryLimiter,
    ) -> Result<()> {
        if batches.is_empty() {
            return Ok(());
        }

        let input_size = batches
            .iter()
            .map(RecordBatch::get_array_memory_size)
            .sum::<usize>();
        let _permit = limiter.acquire(input_size).await?;

        let compacted = compact_record_batches(batches)?;
        self.estimated_size += compacted.batch.get_array_memory_size();
        self.cached_batches.push(compacted);
        Ok(())
    }

    fn finish(self) -> RangeScanCacheValue {
        RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
    }
}

fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
    debug_assert!(!batches.is_empty());

    let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
    build_cached_batch_slice(batches, slice_lengths)
}

fn build_cached_batch_slice(
    batches: Vec<RecordBatch>,
    slice_lengths: Vec<usize>,
) -> Result<CachedBatchSlice> {
    let batch = if batches.len() == 1 {
        batches.into_iter().next().unwrap()
    } else {
        let schema = batches[0].schema();
        concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
    };

    Ok(CachedBatchSlice {
        batch,
        slice_lengths,
    })
}

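/// Background task that concatenates buffered record batches into compacted slices
/// and, on [`CacheConcatCommand::Finish`], publishes the result to the range cache.
///
/// The task stops early if compaction fails or the accumulated size exceeds the
/// configured cache budget.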
async fn run_cache_concat_task(
    mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
    limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
    skip_threshold_bytes: usize,
) {
    let mut state = CacheConcatState::default();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            CacheConcatCommand::Compact(batches) => {
                if let Err(err) = state.compact(batches, &limiter).await {
                    warn!(err; "Failed to compact range cache batches");
                    return;
                }
                // Stop the task (dropping the receiver closes the channel) so no
                // further work is done once the cached size exceeds the cache budget.
                if state.estimated_size > skip_threshold_bytes {
                    return;
                }
            }
            CacheConcatCommand::Finish {
                pending,
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            } => {
                let compact_result = state
                    .compact(pending, &limiter)
                    .await
                    .map(|()| state.finish());
                let result = match compact_result {
                    Ok(v) => {
                        let value = Arc::new(v);
                        part_metrics
                            .inc_range_cache_size(key.estimated_size() + value.estimated_size());
                        cache_strategy.put_range_result(key, value.clone());

                        Ok(value)
                    }
                    Err(e) => {
                        warn!(e; "Failed to finalize range cache batches");

                        Err(e)
                    }
                };

                if let Some(tx) = result_tx {
                    let _ = tx.send(result);
                }

                break;
            }
        }
    }
}

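/// Buffers record batches produced by a range scan and forwards them to the
/// background concatenation task once enough rows or bytes have accumulated.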
struct CacheBatchBuffer {
    buffered_batches: Vec<RecordBatch>,
    buffered_rows: usize,
    buffered_size: usize,
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}

impl CacheBatchBuffer {
    fn new(cache_strategy: &CacheStrategy) -> Self {
        let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
            let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
            let (tx, rx) = mpsc::unbounded_channel();
            common_runtime::spawn_global(run_cache_concat_task(
                rx,
                limiter.clone(),
                skip_threshold_bytes,
            ));
            tx
        });

        Self {
            buffered_batches: Vec::new(),
            buffered_rows: 0,
            buffered_size: 0,
            sender,
        }
    }

    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        if self.sender.is_none() {
            return Ok(());
        }

        self.buffered_rows += batch.num_rows();
        self.buffered_size += batch.get_array_memory_size();
        self.buffered_batches.push(batch);

        if self.buffered_batches.len() > 1
            && (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
                || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
        {
            self.notify_compact();
        }

        Ok(())
    }

    fn notify_compact(&mut self) {
        if self.buffered_batches.is_empty() || self.sender.is_none() {
            return;
        }

        let batches = mem::take(&mut self.buffered_batches);
        self.buffered_rows = 0;
        self.buffered_size = 0;

        let Some(sender) = &self.sender else {
            return;
        };
        if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
            self.sender = None;
        }
    }

    fn finish(
        mut self,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    ) {
        let Some(sender) = self.sender.take() else {
            return;
        };

        if sender
            .send(CacheConcatCommand::Finish {
                pending: mem::take(&mut self.buffered_batches),
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            })
            .is_err()
        {
            self.sender = None;
        }
    }
}

/// Wraps a stream to cache its output for future range cache hits.
pub(crate) fn cache_flat_range_stream(
    mut stream: BoxedRecordBatchStream,
    cache_strategy: CacheStrategy,
    key: RangeScanCacheKey,
    part_metrics: PartitionMetrics,
) -> BoxedRecordBatchStream {
    Box::pin(try_stream! {
        let mut buffer = CacheBatchBuffer::new(&cache_strategy);
        while let Some(batch) = stream.try_next().await? {
            buffer.push(batch.clone())?;
            yield batch;
        }

        buffer.finish(key, cache_strategy, part_metrics, None);
    })
}

/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking.
///
/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and
/// `PartitionMetrics` publicly.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    let cache_manager = Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    );
    let cache_strategy = CacheStrategy::EnableAll(cache_manager);

    let fingerprint = ScanRequestFingerprintBuilder {
        read_column_ids: vec![],
        read_column_types: vec![],
        filters: vec![],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let key = RangeScanCacheKey {
        region_id,
        row_groups: vec![],
        scan: fingerprint,
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, cache_strategy, key, part_metrics)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use common_time::timestamp::TimeUnit;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};
    use smallvec::smallvec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{PredicateGroup, ScanInput};
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        ))
    }

    fn test_scan_fingerprint(
        filters: Vec<String>,
        time_filters: Vec<String>,
        series_row_selector: Option<TimeSeriesRowSelector>,
        filter_deleted: bool,
        partition_expr_version: u64,
    ) -> ScanRequestFingerprint {
        ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters,
            time_filters,
            series_row_selector,
            append_mode: false,
            filter_deleted,
            merge_mode: MergeMode::LastRow,
            partition_expr_version,
        }
        .build()
    }

    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }

    async fn finish_cache_batch_buffer(
        buffer: CacheBatchBuffer,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
    ) -> Result<Arc<RangeScanCacheValue>> {
        let (tx, rx) = oneshot::channel();
        common_telemetry::info!("finish start");
        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
        common_telemetry::info!("finish end");
        rx.await.context(crate::error::RecvSnafu)?
    }

    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
        let mut exprs = exprs
            .into_iter()
            .map(|expr| expr.to_string())
            .collect::<Vec<_>>();
        exprs.sort_unstable();
        exprs
    }

    async fn assert_range_cache_filters(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
        expected_filters: Vec<Expr>,
        expected_time_filters: Vec<Expr>,
    ) {
        let (stream_ctx, part_range) =
            new_stream_context(filters, query_time_range, partition_time_range).await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert_eq!(
            key.scan.filters(),
            normalized_exprs(expected_filters).as_slice()
        );
        assert_eq!(
            key.scan.time_filters(),
            normalized_exprs(expected_time_filters).as_slice()
        );
    }
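
    // Illustrative sketch: a partition range that is backed only by file sources
    // reports its (file id, row group index) pairs and is marked file-only, which
    // is what makes it eligible for a range cache key. Constants are arbitrary.
    #[tokio::test]
    async fn collects_row_groups_for_file_only_partition_range() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![],
            None,
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
        )
        .await;

        let rg = collect_partition_range_row_groups(&stream_ctx, &part_range);

        assert!(rg.only_file_sources);
        assert_eq!(rg.row_groups.len(), 1);
        assert_eq!(rg.row_groups[0].1, 0);
    }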

    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }

    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        assert_range_cache_filters(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo"))],
            vec![col("ts").gt_eq(ts_lit(1000))],
        )
        .await;
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }
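
    // Illustrative sketch of the coverage rule that decides whether time filters
    // can be dropped from the cache key: a query time range covers a partition
    // range only when it contains both endpoints. Constants are arbitrary.
    #[test]
    fn query_time_range_coverage_check() {
        let part_range = (
            Timestamp::new_millisecond(1000),
            Timestamp::new_millisecond(2000),
        );

        // No query time range restricts nothing, so the partition is covered.
        assert!(query_time_range_covers_partition_range(None, part_range));

        // A range containing both endpoints covers the partition.
        let covering = TimestampRange::with_unit(1000, 2001, TimeUnit::Millisecond).unwrap();
        assert!(query_time_range_covers_partition_range(
            Some(&covering),
            part_range
        ));

        // A range ending before the partition end does not cover it.
        let partial = TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond).unwrap();
        assert!(!query_time_range_covers_partition_range(
            Some(&partial),
            part_range
        ));
    }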

    #[test]
    fn normalizes_and_clears_time_filters() {
        let normalized =
            test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);

        assert!(normalized.time_filters().is_empty());

        let fingerprint = test_scan_fingerprint(
            vec!["k0 = 'foo'".to_string()],
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            true,
            7,
        );

        let reset = fingerprint.without_time_filters();

        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }
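
    // Illustrative sketch: fingerprints that differ only in the partition expr
    // version must not compare equal, so cached results are not reused after
    // the partition rules change. Values are arbitrary.
    #[test]
    fn fingerprint_changes_with_partition_expr_version() {
        let v0 = test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);
        let v1 = test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 1);
        assert_ne!(v0, v1);

        // Identical inputs produce identical fingerprints.
        let v0_again = test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);
        assert_eq!(v0, v0_again);
    }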

    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]))
    }

    fn make_batch(values: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::Int64Array;

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int64Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
        use datatypes::arrow::array::BinaryArray;
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Binary,
            false,
        )]));
        let payload = vec![b'x'; bytes_per_row];
        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();

        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
    }

    #[test]
    fn compact_record_batches_keeps_original_boundaries() {
        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];

        let compacted = compact_record_batches(batches).unwrap();

        assert_eq!(compacted.batch.num_rows(), 5);
        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
    }
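
    // Illustrative sketch: a cache key's estimated size accounts for the key
    // struct plus its row group list and fingerprint, so it is at least the
    // size of the struct itself. Values are arbitrary.
    #[test]
    fn range_scan_cache_key_estimated_size_lower_bound() {
        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, false, 0),
        };

        assert!(key.estimated_size() >= mem::size_of::<RangeScanCacheKey>());
    }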

    #[tokio::test]
    async fn cached_flat_range_stream_replays_original_batches() {
        let value = Arc::new(RangeScanCacheValue::new(
            vec![CachedBatchSlice {
                batch: make_batch(&[1, 2, 3]),
                slice_lengths: vec![2, 1],
            }],
            make_batch(&[1, 2, 3]).get_array_memory_size(),
        ));

        let replayed = cached_flat_range_stream(value)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].num_rows(), 2);
        assert_eq!(replayed[1].num_rows(), 1);
    }
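
    // Illustrative sketch: wrapping a stream with `cache_flat_range_stream`
    // should not change the batches it yields; caching happens on the side.
    // The single-batch input stream here is an arbitrary example.
    #[tokio::test]
    async fn cache_flat_range_stream_passes_batches_through() {
        let strategy = test_cache_strategy();
        let (key, part_metrics) = test_cache_context(&strategy);

        let batches: Vec<Result<RecordBatch>> = vec![Ok(make_batch(&[1, 2, 3]))];
        let stream: BoxedRecordBatchStream = Box::pin(futures::stream::iter(batches));

        let wrapped = cache_flat_range_stream(stream, strategy, key, part_metrics);
        let collected = wrapped.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(collected.len(), 1);
        assert_eq!(collected[0].num_rows(), 3);
    }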

    #[tokio::test]
    async fn cache_batch_buffer_finishes_pending_batches() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&[1, 2, 3]);
        let expected_size = batch.get_array_memory_size();
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch).unwrap();

        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
        assert_eq!(value.estimated_batches_size, expected_size);
        assert!(Arc::ptr_eq(
            &value,
            &strategy.get_range_result(&key).unwrap()
        ));
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch.clone()).unwrap();
        buffer.push(batch).unwrap();

        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![
                DEFAULT_READ_BATCH_SIZE / 2 + 1,
                DEFAULT_READ_BATCH_SIZE / 2 + 1
            ]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(large_batch.clone()).unwrap();

        assert_eq!(buffer.buffered_rows, large_batch.num_rows());
        assert_eq!(buffer.buffered_batches.len(), 1);

        buffer.push(large_batch.clone()).unwrap();

        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![large_batch.num_rows(), large_batch.num_rows()]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
        // Budget only fits two large batches.
        let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(budget)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        for _ in 0..4 {
            buffer.push(large_batch.clone()).unwrap();
        }
        assert!(
            finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
                .await
                .is_err()
        );
        assert!(strategy.get_range_result(&key).is_none());
    }
1032}