feat(mito2): add partition range cache infrastructure (#7798)

* feat: add partition range cache infra

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: optimize scan request fingerprint cloning

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: merge loops

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: more docs

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update estimated size method and comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: only cache when we scan files

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: address PR review comments for partition range cache

- Remove TimeSeriesDistribution from fingerprint as it only affects yield order
- Disable range cache when dyn filters are present since they change at runtime

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-03-17 11:53:20 +08:00
committed by GitHub
parent dd82fcac00
commit 5a37e58b4f
5 changed files with 629 additions and 3 deletions

View File

@@ -49,6 +49,7 @@ use crate::cache::write_cache::WriteCacheRef;
use crate::memtable::record_batch_estimated_size;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
use crate::sst::file::{RegionFileId, RegionIndexId};
use crate::sst::parquet::reader::MetadataCacheMetrics;
@@ -64,6 +65,8 @@ const FILE_TYPE: &str = "file";
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
@@ -223,6 +226,32 @@ impl CacheStrategy {
}
}
/// Calls [CacheManager::get_range_result()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn get_range_result(
&self,
key: &RangeScanCacheKey,
) -> Option<Arc<RangeScanCacheValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
/// Calls [CacheManager::put_range_result()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn put_range_result(
&self,
key: RangeScanCacheKey,
result: Arc<RangeScanCacheValue>,
) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_range_result(key, result);
}
}
/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
@@ -324,6 +353,9 @@ pub struct CacheManager {
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// Cache for time series selectors.
selector_result_cache: Option<SelectorResultCache>,
/// Cache for range scan outputs in flat format.
#[cfg_attr(not(test), allow(dead_code))]
range_result_cache: Option<RangeResultCache>,
/// Cache for index result.
index_result_cache: Option<IndexResultCache>,
}
@@ -512,6 +544,32 @@ impl CacheManager {
}
}
/// Gets cached result for range scan.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn get_range_result(
&self,
key: &RangeScanCacheKey,
) -> Option<Arc<RangeScanCacheValue>> {
self.range_result_cache
.as_ref()
.and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
}
/// Puts range scan result into the cache.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn put_range_result(
&self,
key: RangeScanCacheKey,
result: Arc<RangeScanCacheValue>,
) {
if let Some(cache) = &self.range_result_cache {
CACHE_BYTES
.with_label_values(&[RANGE_RESULT_TYPE])
.add(range_result_cache_weight(&key, &result).into());
cache.insert(key, result);
}
}
/// Gets the write cache.
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
self.write_cache.as_ref()
@@ -562,6 +620,7 @@ pub struct CacheManagerBuilder {
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
range_result_cache_size: u64,
}
impl CacheManagerBuilder {
@@ -625,6 +684,12 @@ impl CacheManagerBuilder {
self
}
/// Sets range result cache size.
pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
self.range_result_cache_size = bytes;
self
}
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
fn to_str(cause: RemovalCause) -> &'static str {
@@ -712,6 +777,21 @@ impl CacheManagerBuilder {
})
.build()
});
let range_result_cache = (self.range_result_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.range_result_cache_size)
.weigher(range_result_cache_weight)
.eviction_listener(|k, v, cause| {
let size = range_result_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[RANGE_RESULT_TYPE])
.sub(size.into());
CACHE_EVICTION
.with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
.inc();
})
.build()
});
CacheManager {
sst_meta_cache,
vector_cache,
@@ -723,6 +803,7 @@ impl CacheManagerBuilder {
vector_index_cache,
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
selector_result_cache,
range_result_cache,
index_result_cache,
}
}
@@ -746,6 +827,10 @@ fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultVal
(mem::size_of_val(k) + v.estimated_size()) as u32
}
fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
(k.estimated_size() + v.estimated_size()) as u32
}
/// Updates cache hit/miss metrics.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
if value.is_some() {
@@ -902,6 +987,8 @@ type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
#[cfg(test)]
mod tests {
@@ -916,6 +1003,9 @@ mod tests {
use crate::cache::index::bloom_filter_index::Tag;
use crate::cache::index::result_cache::PredicateKey;
use crate::cache::test_util::parquet_meta;
use crate::read::range_cache::{
RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
};
use crate::sst::parquet::row_selection::RowGroupSelection;
#[tokio::test]
@@ -1028,6 +1118,50 @@ mod tests {
assert!(cache.get_selector_result(&key).is_some());
}
#[test]
fn test_range_result_cache() {
let cache = Arc::new(
CacheManager::builder()
.range_result_cache_size(1024 * 1024)
.build(),
);
let key = RangeScanCacheKey {
region_id: RegionId::new(1, 1),
row_groups: vec![(FileId::random(), 0)],
scan: ScanRequestFingerprintBuilder {
read_column_ids: vec![],
read_column_types: vec![],
filters: vec!["tag_0 = 1".to_string()],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: true,
merge_mode: crate::region::options::MergeMode::LastRow,
partition_expr_version: 0,
}
.build(),
};
let value = Arc::new(RangeScanCacheValue::new(Vec::new()));
assert!(cache.get_range_result(&key).is_none());
cache.put_range_result(key.clone(), value.clone());
assert!(cache.get_range_result(&key).is_some());
let enable_all = CacheStrategy::EnableAll(cache.clone());
assert!(enable_all.get_range_result(&key).is_some());
let compaction = CacheStrategy::Compaction(cache.clone());
assert!(compaction.get_range_result(&key).is_none());
compaction.put_range_result(key.clone(), value.clone());
assert!(cache.get_range_result(&key).is_some());
let disabled = CacheStrategy::Disabled;
assert!(disabled.get_range_result(&key).is_none());
disabled.put_range_result(key.clone(), value);
assert!(cache.get_range_result(&key).is_some());
}
#[tokio::test]
async fn test_evict_puffin_cache_clears_all_entries() {
use std::collections::{BTreeMap, HashMap};

View File

@@ -27,6 +27,7 @@ pub mod projection;
pub(crate) mod prune;
pub(crate) mod pruner;
pub mod range;
pub(crate) mod range_cache;
pub mod scan_region;
pub mod scan_util;
pub(crate) mod seq_scan;

View File

@@ -0,0 +1,252 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for the partition range scan result cache.
use std::mem;
use std::sync::Arc;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
use crate::memtable::record_batch_estimated_size;
use crate::region::options::MergeMode;
/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
/// It records a normalized view of the projected columns and filters, plus
/// scan options that can change the returned rows. Schema-dependent metadata
/// and the partition expression version are included so cached results are not
/// reused across incompatible schema or partitioning changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
/// Projection and filters without the time index and partition exprs.
inner: Arc<SharedScanRequestFingerprint>,
/// Filters with the time index column.
time_filters: Option<Arc<Vec<String>>>,
series_row_selector: Option<TimeSeriesRowSelector>,
append_mode: bool,
filter_deleted: bool,
merge_mode: MergeMode,
/// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr.
/// We store the version instead of the whole partition expr or partition expr filters.
partition_expr_version: u64,
}
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
pub(crate) read_column_ids: Vec<ColumnId>,
pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
pub(crate) filters: Vec<String>,
pub(crate) time_filters: Vec<String>,
pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
pub(crate) append_mode: bool,
pub(crate) filter_deleted: bool,
pub(crate) merge_mode: MergeMode,
pub(crate) partition_expr_version: u64,
}
impl ScanRequestFingerprintBuilder {
pub(crate) fn build(self) -> ScanRequestFingerprint {
let Self {
read_column_ids,
read_column_types,
filters,
time_filters,
series_row_selector,
append_mode,
filter_deleted,
merge_mode,
partition_expr_version,
} = self;
ScanRequestFingerprint {
inner: Arc::new(SharedScanRequestFingerprint {
read_column_ids,
read_column_types,
filters,
}),
time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
series_row_selector,
append_mode,
filter_deleted,
merge_mode,
partition_expr_version,
}
}
}
/// Non-copiable struct of the fingerprint.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
/// Column ids of the projection.
read_column_ids: Vec<ColumnId>,
/// Column types of the projection.
/// We keep this to ensure we won't reuse the fingerprint after a schema change.
read_column_types: Vec<Option<ConcreteDataType>>,
/// Filters without the time index column and region partition exprs.
filters: Vec<String>,
}
impl ScanRequestFingerprint {
#[cfg(test)]
pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
&self.inner.read_column_ids
}
#[cfg(test)]
pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
&self.inner.read_column_types
}
#[cfg(test)]
pub(crate) fn filters(&self) -> &[String] {
&self.inner.filters
}
#[cfg(test)]
pub(crate) fn time_filters(&self) -> &[String] {
self.time_filters
.as_deref()
.map(Vec::as_slice)
.unwrap_or(&[])
}
#[cfg(test)]
pub(crate) fn without_time_filters(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
time_filters: None,
series_row_selector: self.series_row_selector,
append_mode: self.append_mode,
filter_deleted: self.filter_deleted,
merge_mode: self.merge_mode,
partition_expr_version: self.partition_expr_version,
}
}
pub(crate) fn estimated_size(&self) -> usize {
mem::size_of::<SharedScanRequestFingerprint>()
+ self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
+ self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
+ self.inner.filters.capacity() * mem::size_of::<String>()
+ self
.inner
.filters
.iter()
.map(|filter| filter.capacity())
.sum::<usize>()
+ self.time_filters.as_ref().map_or(0, |filters| {
mem::size_of::<Vec<String>>()
+ filters.capacity() * mem::size_of::<String>()
+ filters
.iter()
.map(|filter| filter.capacity())
.sum::<usize>()
})
}
}
/// Cache key for range scan outputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
pub(crate) region_id: RegionId,
/// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data.
pub(crate) row_groups: Vec<(FileId, i64)>,
pub(crate) scan: ScanRequestFingerprint,
}
impl RangeScanCacheKey {
pub(crate) fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
+ self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
+ self.scan.estimated_size()
}
}
/// Cached result for one range scan.
pub(crate) struct RangeScanCacheValue {
pub(crate) batches: Vec<RecordBatch>,
}
impl RangeScanCacheValue {
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn new(batches: Vec<RecordBatch>) -> Self {
Self { batches }
}
pub(crate) fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
+ self.batches.capacity() * mem::size_of::<RecordBatch>()
+ self
.batches
.iter()
.map(record_batch_estimated_size)
.sum::<usize>()
}
}
#[cfg(test)]
mod tests {
use store_api::storage::TimeSeriesRowSelector;
use super::*;
#[test]
fn normalizes_and_clears_time_filters() {
let normalized = ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_column_types: vec![None, None],
filters: vec!["k0 = 'foo'".to_string()],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::LastRow,
partition_expr_version: 0,
}
.build();
assert!(normalized.time_filters().is_empty());
let fingerprint = ScanRequestFingerprintBuilder {
read_column_ids: vec![1, 2],
read_column_types: vec![None, None],
filters: vec!["k0 = 'foo'".to_string()],
time_filters: vec!["ts >= 1000".to_string()],
series_row_selector: Some(TimeSeriesRowSelector::LastRow),
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::LastRow,
partition_expr_version: 7,
}
.build();
let reset = fingerprint.without_time_filters();
assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
assert_eq!(reset.filters(), fingerprint.filters());
assert!(reset.time_filters().is_empty());
assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
assert_eq!(reset.append_mode, fingerprint.append_mode);
assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
assert_eq!(reset.merge_mode, fingerprint.merge_mode);
assert_eq!(
reset.partition_expr_version,
fingerprint.partition_expr_version
);
}
}

View File

@@ -55,6 +55,7 @@ use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::range_cache::ScanRequestFingerprint;
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
@@ -815,7 +816,7 @@ pub struct ScanInput {
/// But this read columns might also include non-projected columns needed for filtering.
pub(crate) read_column_ids: Vec<ColumnId>,
/// Time range filter for time index.
time_range: Option<TimestampRange>,
pub(crate) time_range: Option<TimestampRange>,
/// Predicate to push down.
pub(crate) predicate: PredicateGroup,
/// Region partition expr applied at read time.
@@ -1417,6 +1418,92 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
}
}
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
let eligible = input.flat_format
&& !input.compaction
&& !input.files.is_empty()
&& matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
if !eligible {
return None;
}
let metadata = input.region_metadata();
let tag_names: HashSet<&str> = metadata
.column_metadatas
.iter()
.filter(|col| col.semantic_type == SemanticType::Tag)
.map(|col| col.column_schema.name.as_str())
.collect();
let time_index_name = metadata.time_index_column().column_schema.name.clone();
let exprs = input
.predicate_group()
.predicate_without_region()
.map(|predicate| predicate.exprs())
.unwrap_or_default();
let mut filters = Vec::new();
let mut time_filters = Vec::new();
let mut has_tag_filter = false;
let mut columns = HashSet::new();
for expr in exprs {
columns.clear();
let is_time_only = match expr_to_columns(expr, &mut columns) {
Ok(()) if !columns.is_empty() => {
has_tag_filter |= columns
.iter()
.any(|col| tag_names.contains(col.name.as_str()));
columns.iter().all(|col| col.name == time_index_name)
}
_ => false,
};
if is_time_only {
time_filters.push(expr.to_string());
} else {
filters.push(expr.to_string());
}
}
if !has_tag_filter {
// We only cache requests that have tag filters to avoid caching all series.
return None;
}
// Ensure the filters are sorted for consistent fingerprinting.
filters.sort_unstable();
time_filters.sort_unstable();
Some(
crate::read::range_cache::ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_column_types: input
.read_column_ids
.iter()
.map(|id| {
metadata
.column_by_id(*id)
.map(|col| col.column_schema.data_type.clone())
})
.collect(),
filters,
time_filters,
series_row_selector: input.series_row_selector,
append_mode: input.append_mode,
filter_deleted: input.filter_deleted,
merge_mode: input.merge_mode,
partition_expr_version: metadata.partition_expr_version,
}
.build(),
)
}
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
@@ -1763,10 +1850,15 @@ mod tests {
use datafusion::physical_plan::expressions::lit as physical_lit;
use datafusion_expr::{col, lit};
use store_api::storage::ScanRequest;
use datatypes::value::Value;
use partition::expr::col as partition_col;
use store_api::metadata::RegionMetadataBuilder;
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use super::*;
use crate::cache::CacheManager;
use crate::memtable::time_partition::TimePartitions;
use crate::read::range_cache::ScanRequestFingerprintBuilder;
use crate::region::options::RegionOptions;
use crate::region::version::VersionBuilder;
use crate::sst::FormatType;
@@ -1804,6 +1896,26 @@ mod tests {
)
}
async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
let env = SchedulerEnv::new().await;
let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
let file = FileHandle::new(
crate::sst::file::FileMeta::default(),
Arc::new(crate::sst::file_purger::NoopFilePurger),
);
ScanInput::new(env.access_layer.clone(), mapper)
.with_predicate(predicate)
.with_cache(CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.range_result_cache_size(1024)
.build(),
)))
.with_flat_format(true)
.with_files(vec![file])
}
#[tokio::test]
async fn test_build_read_column_ids_includes_filters() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
@@ -1923,6 +2035,133 @@ mod tests {
assert!(scan_region.use_flat_format());
}
#[tokio::test]
async fn test_build_scan_fingerprint_for_eligible_scan() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let input = new_scan_input(
metadata.clone(),
vec![
col("ts").gt_eq(lit(1000)),
col("k0").eq(lit("foo")),
col("v0").gt(lit(1)),
],
)
.await
.with_distribution(Some(TimeSeriesDistribution::PerSeries))
.with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
.with_merge_mode(MergeMode::LastNonNull)
.with_filter_deleted(false);
let fingerprint = build_scan_fingerprint(&input).unwrap();
let expected = ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_column_types: vec![
metadata
.column_by_id(0)
.map(|col| col.column_schema.data_type.clone()),
metadata
.column_by_id(2)
.map(|col| col.column_schema.data_type.clone()),
metadata
.column_by_id(3)
.map(|col| col.column_schema.data_type.clone()),
],
filters: vec![
col("k0").eq(lit("foo")).to_string(),
col("v0").gt(lit(1)).to_string(),
],
time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()],
series_row_selector: Some(TimeSeriesRowSelector::LastRow),
append_mode: false,
filter_deleted: false,
merge_mode: MergeMode::LastNonNull,
partition_expr_version: 0,
}
.build();
assert_eq!(expected, fingerprint);
}
#[tokio::test]
async fn test_build_scan_fingerprint_requires_tag_filter() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let input = new_scan_input(
metadata,
vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
)
.await;
assert!(build_scan_fingerprint(&input).is_none());
}
#[tokio::test]
async fn test_build_scan_fingerprint_respects_scan_eligibility() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
let filters = vec![col("k0").eq(lit("foo"))];
let disabled = ScanInput::new(
SchedulerEnv::new().await.access_layer.clone(),
ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
)
.with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
.with_flat_format(true);
assert!(build_scan_fingerprint(&disabled).is_none());
let non_flat = new_scan_input(metadata.clone(), filters.clone())
.await
.with_flat_format(false);
assert!(build_scan_fingerprint(&non_flat).is_none());
let compaction = new_scan_input(metadata.clone(), filters.clone())
.await
.with_compaction(true);
assert!(build_scan_fingerprint(&compaction).is_none());
// No files to read.
let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
assert!(build_scan_fingerprint(&no_files).is_none());
}
#[tokio::test]
async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
let base = metadata_with_primary_key(vec![0, 1], false);
let mut builder = RegionMetadataBuilder::from_existing(base);
let partition_expr = partition_col("k0")
.gt_eq(Value::String("foo".into()))
.as_json_str()
.unwrap();
builder.partition_expr_json(Some(partition_expr));
let metadata = Arc::new(builder.build_without_validation().unwrap());
let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
let fingerprint = build_scan_fingerprint(&input).unwrap();
let expected = ScanRequestFingerprintBuilder {
read_column_ids: input.read_column_ids.clone(),
read_column_types: vec![
metadata
.column_by_id(0)
.map(|col| col.column_schema.data_type.clone()),
metadata
.column_by_id(2)
.map(|col| col.column_schema.data_type.clone()),
metadata
.column_by_id(3)
.map(|col| col.column_schema.data_type.clone()),
],
filters: vec![col("k0").eq(lit("foo")).to_string()],
time_filters: vec![],
series_row_selector: None,
append_mode: false,
filter_deleted: true,
merge_mode: MergeMode::LastRow,
partition_expr_version: metadata.partition_expr_version,
}
.build();
assert_eq!(expected, fingerprint);
assert_ne!(0, metadata.partition_expr_version);
}
#[test]
fn test_update_dyn_filters_with_empty_base_predicates() {
let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));

View File

@@ -50,7 +50,7 @@ pub(crate) fn parse_wal_options(
}
/// Mode to handle duplicate rows while merging.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum MergeMode {