mito2/read/
range_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for the partition range scan result cache.
16
17use std::mem;
18use std::sync::Arc;
19
20use datatypes::arrow::record_batch::RecordBatch;
21use datatypes::prelude::ConcreteDataType;
22use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
23
24use crate::memtable::record_batch_estimated_size;
25use crate::region::options::MergeMode;
26
27/// Fingerprint of the scan request fields that affect partition range cache reuse.
28///
29/// It records a normalized view of the projected columns and filters, plus
30/// scan options that can change the returned rows. Schema-dependent metadata
31/// and the partition expression version are included so cached results are not
32/// reused across incompatible schema or partitioning changes.
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub(crate) struct ScanRequestFingerprint {
35    /// Projection and filters without the time index and partition exprs.
36    inner: Arc<SharedScanRequestFingerprint>,
37    /// Filters with the time index column.
38    time_filters: Option<Arc<Vec<String>>>,
39    series_row_selector: Option<TimeSeriesRowSelector>,
40    append_mode: bool,
41    filter_deleted: bool,
42    merge_mode: MergeMode,
43    /// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr.
44    /// We store the version instead of the whole partition expr or partition expr filters.
45    partition_expr_version: u64,
46}
47
48#[derive(Debug)]
49pub(crate) struct ScanRequestFingerprintBuilder {
50    pub(crate) read_column_ids: Vec<ColumnId>,
51    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
52    pub(crate) filters: Vec<String>,
53    pub(crate) time_filters: Vec<String>,
54    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
55    pub(crate) append_mode: bool,
56    pub(crate) filter_deleted: bool,
57    pub(crate) merge_mode: MergeMode,
58    pub(crate) partition_expr_version: u64,
59}
60
61impl ScanRequestFingerprintBuilder {
62    pub(crate) fn build(self) -> ScanRequestFingerprint {
63        let Self {
64            read_column_ids,
65            read_column_types,
66            filters,
67            time_filters,
68            series_row_selector,
69            append_mode,
70            filter_deleted,
71            merge_mode,
72            partition_expr_version,
73        } = self;
74
75        ScanRequestFingerprint {
76            inner: Arc::new(SharedScanRequestFingerprint {
77                read_column_ids,
78                read_column_types,
79                filters,
80            }),
81            time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
82            series_row_selector,
83            append_mode,
84            filter_deleted,
85            merge_mode,
86            partition_expr_version,
87        }
88    }
89}
90
91/// Non-copiable struct of the fingerprint.
92#[derive(Debug, PartialEq, Eq, Hash)]
93struct SharedScanRequestFingerprint {
94    /// Column ids of the projection.
95    read_column_ids: Vec<ColumnId>,
96    /// Column types of the projection.
97    /// We keep this to ensure we won't reuse the fingerprint after a schema change.
98    read_column_types: Vec<Option<ConcreteDataType>>,
99    /// Filters without the time index column and region partition exprs.
100    filters: Vec<String>,
101}
102
103impl ScanRequestFingerprint {
104    #[cfg(test)]
105    pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
106        &self.inner.read_column_ids
107    }
108
109    #[cfg(test)]
110    pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
111        &self.inner.read_column_types
112    }
113
114    #[cfg(test)]
115    pub(crate) fn filters(&self) -> &[String] {
116        &self.inner.filters
117    }
118
119    #[cfg(test)]
120    pub(crate) fn time_filters(&self) -> &[String] {
121        self.time_filters
122            .as_deref()
123            .map(Vec::as_slice)
124            .unwrap_or(&[])
125    }
126
127    #[cfg(test)]
128    pub(crate) fn without_time_filters(&self) -> Self {
129        Self {
130            inner: Arc::clone(&self.inner),
131            time_filters: None,
132            series_row_selector: self.series_row_selector,
133            append_mode: self.append_mode,
134            filter_deleted: self.filter_deleted,
135            merge_mode: self.merge_mode,
136            partition_expr_version: self.partition_expr_version,
137        }
138    }
139
140    pub(crate) fn estimated_size(&self) -> usize {
141        mem::size_of::<SharedScanRequestFingerprint>()
142            + self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
143            + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
144            + self.inner.filters.capacity() * mem::size_of::<String>()
145            + self
146                .inner
147                .filters
148                .iter()
149                .map(|filter| filter.capacity())
150                .sum::<usize>()
151            + self.time_filters.as_ref().map_or(0, |filters| {
152                mem::size_of::<Vec<String>>()
153                    + filters.capacity() * mem::size_of::<String>()
154                    + filters
155                        .iter()
156                        .map(|filter| filter.capacity())
157                        .sum::<usize>()
158            })
159    }
160}
161
162/// Cache key for range scan outputs.
163#[derive(Debug, Clone, PartialEq, Eq, Hash)]
164pub(crate) struct RangeScanCacheKey {
165    pub(crate) region_id: RegionId,
166    /// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data.
167    pub(crate) row_groups: Vec<(FileId, i64)>,
168    pub(crate) scan: ScanRequestFingerprint,
169}
170
171impl RangeScanCacheKey {
172    pub(crate) fn estimated_size(&self) -> usize {
173        mem::size_of::<Self>()
174            + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
175            + self.scan.estimated_size()
176    }
177}
178
179/// Cached result for one range scan.
180pub(crate) struct RangeScanCacheValue {
181    pub(crate) batches: Vec<RecordBatch>,
182}
183
184impl RangeScanCacheValue {
185    #[cfg_attr(not(test), allow(dead_code))]
186    pub(crate) fn new(batches: Vec<RecordBatch>) -> Self {
187        Self { batches }
188    }
189
190    pub(crate) fn estimated_size(&self) -> usize {
191        mem::size_of::<Self>()
192            + self.batches.capacity() * mem::size_of::<RecordBatch>()
193            + self
194                .batches
195                .iter()
196                .map(record_batch_estimated_size)
197                .sum::<usize>()
198    }
199}
200
#[cfg(test)]
mod tests {
    use store_api::storage::TimeSeriesRowSelector;

    use super::*;

    /// Builder with the projection, filter and scan flags shared by the cases
    /// below; only the varying parts are passed in.
    fn builder_with(
        time_filters: Vec<String>,
        series_row_selector: Option<TimeSeriesRowSelector>,
        partition_expr_version: u64,
    ) -> ScanRequestFingerprintBuilder {
        ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters,
            series_row_selector,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version,
        }
    }

    #[test]
    fn normalizes_and_clears_time_filters() {
        // Building without time filters reports none back.
        let normalized = builder_with(Vec::new(), None, 0).build();
        assert!(normalized.time_filters().is_empty());

        let fingerprint = builder_with(
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            7,
        )
        .build();
        let reset = fingerprint.without_time_filters();

        // Clearing the time filters leaves every other part untouched.
        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }
}