diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 3ad71d2a61..e232489768 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -49,6 +49,7 @@ use crate::cache::write_cache::WriteCacheRef; use crate::memtable::record_batch_estimated_size; use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; use crate::read::Batch; +use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue}; use crate::sst::file::{RegionFileId, RegionIndexId}; use crate::sst::parquet::reader::MetadataCacheMetrics; @@ -64,6 +65,8 @@ const FILE_TYPE: &str = "file"; const INDEX_TYPE: &str = "index"; /// Metrics type key for selector result cache. const SELECTOR_RESULT_TYPE: &str = "selector_result"; +/// Metrics type key for range scan result cache. +const RANGE_RESULT_TYPE: &str = "range_result"; /// Cache strategies that may only enable a subset of caches. #[derive(Clone)] @@ -223,6 +226,32 @@ impl CacheStrategy { } } + /// Calls [CacheManager::get_range_result()]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn get_range_result( + &self, + key: &RangeScanCacheKey, + ) -> Option> { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } + + /// Calls [CacheManager::put_range_result()]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn put_range_result( + &self, + key: RangeScanCacheKey, + result: Arc, + ) { + if let CacheStrategy::EnableAll(cache_manager) = self { + cache_manager.put_range_result(key, result); + } + } + /// Calls [CacheManager::write_cache()]. /// It returns None if the strategy is [CacheStrategy::Disabled]. pub fn write_cache(&self) -> Option<&WriteCacheRef> { @@ -324,6 +353,9 @@ pub struct CacheManager { puffin_metadata_cache: Option, /// Cache for time series selectors. selector_result_cache: Option, + /// Cache for range scan outputs in flat format. + #[cfg_attr(not(test), allow(dead_code))] + range_result_cache: Option, /// Cache for index result. index_result_cache: Option, } @@ -512,6 +544,32 @@ impl CacheManager { } } + /// Gets cached result for range scan. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn get_range_result( + &self, + key: &RangeScanCacheKey, + ) -> Option> { + self.range_result_cache + .as_ref() + .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE)) + } + + /// Puts range scan result into the cache. + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn put_range_result( + &self, + key: RangeScanCacheKey, + result: Arc, + ) { + if let Some(cache) = &self.range_result_cache { + CACHE_BYTES + .with_label_values(&[RANGE_RESULT_TYPE]) + .add(range_result_cache_weight(&key, &result).into()); + cache.insert(key, result); + } + } + /// Gets the write cache. pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> { self.write_cache.as_ref() @@ -562,6 +620,7 @@ pub struct CacheManagerBuilder { puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, + range_result_cache_size: u64, } impl CacheManagerBuilder { @@ -625,6 +684,12 @@ impl CacheManagerBuilder { self } + /// Sets range result cache size. + pub fn range_result_cache_size(mut self, bytes: u64) -> Self { + self.range_result_cache_size = bytes; + self + } + /// Builds the [CacheManager]. pub fn build(self) -> CacheManager { fn to_str(cause: RemovalCause) -> &'static str { @@ -712,6 +777,21 @@ impl CacheManagerBuilder { }) .build() }); + let range_result_cache = (self.range_result_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.range_result_cache_size) + .weigher(range_result_cache_weight) + .eviction_listener(|k, v, cause| { + let size = range_result_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[RANGE_RESULT_TYPE]) + .sub(size.into()); + CACHE_EVICTION + .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)]) + .inc(); + }) + .build() + }); CacheManager { sst_meta_cache, vector_cache, @@ -723,6 +803,7 @@ impl CacheManagerBuilder { vector_index_cache, puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, + range_result_cache, index_result_cache, } } @@ -746,6 +827,10 @@ fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc) -> u32 { + (k.estimated_size() + v.estimated_size()) as u32 +} + /// Updates cache hit/miss metrics. fn update_hit_miss(value: Option, cache_type: &str) -> Option { if value.is_some() { @@ -902,6 +987,8 @@ type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>; type PageCache = Cache>; /// Maps (file id, row group id, time series row selector) to [SelectorResultValue]. type SelectorResultCache = Cache>; +/// Maps partition-range scan key to cached flat batches. +type RangeResultCache = Cache>; #[cfg(test)] mod tests { @@ -916,6 +1003,9 @@ mod tests { use crate::cache::index::bloom_filter_index::Tag; use crate::cache::index::result_cache::PredicateKey; use crate::cache::test_util::parquet_meta; + use crate::read::range_cache::{ + RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder, + }; use crate::sst::parquet::row_selection::RowGroupSelection; #[tokio::test] @@ -1028,6 +1118,50 @@ mod tests { assert!(cache.get_selector_result(&key).is_some()); } + #[test] + fn test_range_result_cache() { + let cache = Arc::new( + CacheManager::builder() + .range_result_cache_size(1024 * 1024) + .build(), + ); + + let key = RangeScanCacheKey { + region_id: RegionId::new(1, 1), + row_groups: vec![(FileId::random(), 0)], + scan: ScanRequestFingerprintBuilder { + read_column_ids: vec![], + read_column_types: vec![], + filters: vec!["tag_0 = 1".to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: crate::region::options::MergeMode::LastRow, + partition_expr_version: 0, + } + .build(), + }; + let value = Arc::new(RangeScanCacheValue::new(Vec::new())); + + assert!(cache.get_range_result(&key).is_none()); + cache.put_range_result(key.clone(), value.clone()); + assert!(cache.get_range_result(&key).is_some()); + + let enable_all = CacheStrategy::EnableAll(cache.clone()); + assert!(enable_all.get_range_result(&key).is_some()); + + let compaction = CacheStrategy::Compaction(cache.clone()); + assert!(compaction.get_range_result(&key).is_none()); + compaction.put_range_result(key.clone(), value.clone()); + assert!(cache.get_range_result(&key).is_some()); + + let disabled = CacheStrategy::Disabled; + assert!(disabled.get_range_result(&key).is_none()); + disabled.put_range_result(key.clone(), value); + assert!(cache.get_range_result(&key).is_some()); + } + #[tokio::test] async fn test_evict_puffin_cache_clears_all_entries() { use std::collections::{BTreeMap, HashMap}; diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 5fbd63ce8b..240a99c247 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -27,6 +27,7 @@ pub mod projection; pub(crate) mod prune; pub(crate) mod pruner; pub mod range; +pub(crate) mod range_cache; pub mod scan_region; pub mod scan_util; pub(crate) mod seq_scan; diff --git a/src/mito2/src/read/range_cache.rs b/src/mito2/src/read/range_cache.rs new file mode 100644 index 0000000000..5b90e68bae --- /dev/null +++ b/src/mito2/src/read/range_cache.rs @@ -0,0 +1,252 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for the partition range scan result cache. + +use std::mem; +use std::sync::Arc; + +use datatypes::arrow::record_batch::RecordBatch; +use datatypes::prelude::ConcreteDataType; +use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector}; + +use crate::memtable::record_batch_estimated_size; +use crate::region::options::MergeMode; + +/// Fingerprint of the scan request fields that affect partition range cache reuse. +/// +/// It records a normalized view of the projected columns and filters, plus +/// scan options that can change the returned rows. Schema-dependent metadata +/// and the partition expression version are included so cached results are not +/// reused across incompatible schema or partitioning changes. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct ScanRequestFingerprint { + /// Projection and filters without the time index and partition exprs. + inner: Arc, + /// Filters with the time index column. + time_filters: Option>>, + series_row_selector: Option, + append_mode: bool, + filter_deleted: bool, + merge_mode: MergeMode, + /// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr. + /// We store the version instead of the whole partition expr or partition expr filters. + partition_expr_version: u64, +} + +#[derive(Debug)] +pub(crate) struct ScanRequestFingerprintBuilder { + pub(crate) read_column_ids: Vec, + pub(crate) read_column_types: Vec>, + pub(crate) filters: Vec, + pub(crate) time_filters: Vec, + pub(crate) series_row_selector: Option, + pub(crate) append_mode: bool, + pub(crate) filter_deleted: bool, + pub(crate) merge_mode: MergeMode, + pub(crate) partition_expr_version: u64, +} + +impl ScanRequestFingerprintBuilder { + pub(crate) fn build(self) -> ScanRequestFingerprint { + let Self { + read_column_ids, + read_column_types, + filters, + time_filters, + series_row_selector, + append_mode, + filter_deleted, + merge_mode, + partition_expr_version, + } = self; + + ScanRequestFingerprint { + inner: Arc::new(SharedScanRequestFingerprint { + read_column_ids, + read_column_types, + filters, + }), + time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)), + series_row_selector, + append_mode, + filter_deleted, + merge_mode, + partition_expr_version, + } + } +} + +/// Non-copiable struct of the fingerprint. +#[derive(Debug, PartialEq, Eq, Hash)] +struct SharedScanRequestFingerprint { + /// Column ids of the projection. + read_column_ids: Vec, + /// Column types of the projection. + /// We keep this to ensure we won't reuse the fingerprint after a schema change. + read_column_types: Vec>, + /// Filters without the time index column and region partition exprs. + filters: Vec, +} + +impl ScanRequestFingerprint { + #[cfg(test)] + pub(crate) fn read_column_ids(&self) -> &[ColumnId] { + &self.inner.read_column_ids + } + + #[cfg(test)] + pub(crate) fn read_column_types(&self) -> &[Option] { + &self.inner.read_column_types + } + + #[cfg(test)] + pub(crate) fn filters(&self) -> &[String] { + &self.inner.filters + } + + #[cfg(test)] + pub(crate) fn time_filters(&self) -> &[String] { + self.time_filters + .as_deref() + .map(Vec::as_slice) + .unwrap_or(&[]) + } + + #[cfg(test)] + pub(crate) fn without_time_filters(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + time_filters: None, + series_row_selector: self.series_row_selector, + append_mode: self.append_mode, + filter_deleted: self.filter_deleted, + merge_mode: self.merge_mode, + partition_expr_version: self.partition_expr_version, + } + } + + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.inner.read_column_ids.capacity() * mem::size_of::() + + self.inner.read_column_types.capacity() * mem::size_of::>() + + self.inner.filters.capacity() * mem::size_of::() + + self + .inner + .filters + .iter() + .map(|filter| filter.capacity()) + .sum::() + + self.time_filters.as_ref().map_or(0, |filters| { + mem::size_of::>() + + filters.capacity() * mem::size_of::() + + filters + .iter() + .map(|filter| filter.capacity()) + .sum::() + }) + } +} + +/// Cache key for range scan outputs. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct RangeScanCacheKey { + pub(crate) region_id: RegionId, + /// Sorted (file_id, row_group_index) pairs that uniquely identify the covered data. + pub(crate) row_groups: Vec<(FileId, i64)>, + pub(crate) scan: ScanRequestFingerprint, +} + +impl RangeScanCacheKey { + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>() + + self.scan.estimated_size() + } +} + +/// Cached result for one range scan. +pub(crate) struct RangeScanCacheValue { + pub(crate) batches: Vec, +} + +impl RangeScanCacheValue { + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) fn new(batches: Vec) -> Self { + Self { batches } + } + + pub(crate) fn estimated_size(&self) -> usize { + mem::size_of::() + + self.batches.capacity() * mem::size_of::() + + self + .batches + .iter() + .map(record_batch_estimated_size) + .sum::() + } +} + +#[cfg(test)] +mod tests { + use store_api::storage::TimeSeriesRowSelector; + + use super::*; + + #[test] + fn normalizes_and_clears_time_filters() { + let normalized = ScanRequestFingerprintBuilder { + read_column_ids: vec![1, 2], + read_column_types: vec![None, None], + filters: vec!["k0 = 'foo'".to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: 0, + } + .build(); + + assert!(normalized.time_filters().is_empty()); + + let fingerprint = ScanRequestFingerprintBuilder { + read_column_ids: vec![1, 2], + read_column_types: vec![None, None], + filters: vec!["k0 = 'foo'".to_string()], + time_filters: vec!["ts >= 1000".to_string()], + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: 7, + } + .build(); + + let reset = fingerprint.without_time_filters(); + + assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids()); + assert_eq!(reset.read_column_types(), fingerprint.read_column_types()); + assert_eq!(reset.filters(), fingerprint.filters()); + assert!(reset.time_filters().is_empty()); + assert_eq!(reset.series_row_selector, fingerprint.series_row_selector); + assert_eq!(reset.append_mode, fingerprint.append_mode); + assert_eq!(reset.filter_deleted, fingerprint.filter_deleted); + assert_eq!(reset.merge_mode, fingerprint.merge_mode); + assert_eq!( + reset.partition_expr_version, + fingerprint.partition_expr_version + ); + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5d934afd2d..5cb2d75e25 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -55,6 +55,7 @@ use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch}; use crate::read::projection::ProjectionMapper; use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; +use crate::read::range_cache::ScanRequestFingerprint; use crate::read::seq_scan::SeqScan; use crate::read::series_scan::SeriesScan; use crate::read::stream::ScanBatchStream; @@ -815,7 +816,7 @@ pub struct ScanInput { /// But this read columns might also include non-projected columns needed for filtering. pub(crate) read_column_ids: Vec, /// Time range filter for time index. - time_range: Option, + pub(crate) time_range: Option, /// Predicate to push down. pub(crate) predicate: PredicateGroup, /// Region partition expr applied at read time. @@ -1417,6 +1418,92 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { } } +/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible +/// for partition range caching. +#[cfg_attr(not(test), allow(dead_code))] +pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option { + let eligible = input.flat_format + && !input.compaction + && !input.files.is_empty() + && matches!(input.cache_strategy, CacheStrategy::EnableAll(_)); + + if !eligible { + return None; + } + + let metadata = input.region_metadata(); + let tag_names: HashSet<&str> = metadata + .column_metadatas + .iter() + .filter(|col| col.semantic_type == SemanticType::Tag) + .map(|col| col.column_schema.name.as_str()) + .collect(); + + let time_index_name = metadata.time_index_column().column_schema.name.clone(); + + let exprs = input + .predicate_group() + .predicate_without_region() + .map(|predicate| predicate.exprs()) + .unwrap_or_default(); + + let mut filters = Vec::new(); + let mut time_filters = Vec::new(); + let mut has_tag_filter = false; + let mut columns = HashSet::new(); + + for expr in exprs { + columns.clear(); + let is_time_only = match expr_to_columns(expr, &mut columns) { + Ok(()) if !columns.is_empty() => { + has_tag_filter |= columns + .iter() + .any(|col| tag_names.contains(col.name.as_str())); + columns.iter().all(|col| col.name == time_index_name) + } + _ => false, + }; + + if is_time_only { + time_filters.push(expr.to_string()); + } else { + filters.push(expr.to_string()); + } + } + + if !has_tag_filter { + // We only cache requests that have tag filters to avoid caching all series. + return None; + } + + // Ensure the filters are sorted for consistent fingerprinting. + filters.sort_unstable(); + time_filters.sort_unstable(); + + Some( + crate::read::range_cache::ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: input + .read_column_ids + .iter() + .map(|id| { + metadata + .column_by_id(*id) + .map(|col| col.column_schema.data_type.clone()) + }) + .collect(), + filters, + time_filters, + series_row_selector: input.series_row_selector, + append_mode: input.append_mode, + filter_deleted: input.filter_deleted, + merge_mode: input.merge_mode, + partition_expr_version: metadata.partition_expr_version, + } + .build(), + ) +} + /// Context shared by different streams from a scanner. /// It contains the input and ranges to scan. pub struct StreamContext { @@ -1763,10 +1850,15 @@ mod tests { use datafusion::physical_plan::expressions::lit as physical_lit; use datafusion_expr::{col, lit}; - use store_api::storage::ScanRequest; + use datatypes::value::Value; + use partition::expr::col as partition_col; + use store_api::metadata::RegionMetadataBuilder; + use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; use super::*; + use crate::cache::CacheManager; use crate::memtable::time_partition::TimePartitions; + use crate::read::range_cache::ScanRequestFingerprintBuilder; use crate::region::options::RegionOptions; use crate::region::version::VersionBuilder; use crate::sst::FormatType; @@ -1804,6 +1896,26 @@ mod tests { ) } + async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec) -> ScanInput { + let env = SchedulerEnv::new().await; + let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(); + let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap(); + let file = FileHandle::new( + crate::sst::file::FileMeta::default(), + Arc::new(crate::sst::file_purger::NoopFilePurger), + ); + + ScanInput::new(env.access_layer.clone(), mapper) + .with_predicate(predicate) + .with_cache(CacheStrategy::EnableAll(Arc::new( + CacheManager::builder() + .range_result_cache_size(1024) + .build(), + ))) + .with_flat_format(true) + .with_files(vec![file]) + } + #[tokio::test] async fn test_build_read_column_ids_includes_filters() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); @@ -1923,6 +2035,133 @@ mod tests { assert!(scan_region.use_flat_format()); } + #[tokio::test] + async fn test_build_scan_fingerprint_for_eligible_scan() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let input = new_scan_input( + metadata.clone(), + vec![ + col("ts").gt_eq(lit(1000)), + col("k0").eq(lit("foo")), + col("v0").gt(lit(1)), + ], + ) + .await + .with_distribution(Some(TimeSeriesDistribution::PerSeries)) + .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow)) + .with_merge_mode(MergeMode::LastNonNull) + .with_filter_deleted(false); + + let fingerprint = build_scan_fingerprint(&input).unwrap(); + + let expected = ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: vec![ + metadata + .column_by_id(0) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(2) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(3) + .map(|col| col.column_schema.data_type.clone()), + ], + filters: vec![ + col("k0").eq(lit("foo")).to_string(), + col("v0").gt(lit(1)).to_string(), + ], + time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()], + series_row_selector: Some(TimeSeriesRowSelector::LastRow), + append_mode: false, + filter_deleted: false, + merge_mode: MergeMode::LastNonNull, + partition_expr_version: 0, + } + .build(); + assert_eq!(expected, fingerprint); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_requires_tag_filter() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let input = new_scan_input( + metadata, + vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))], + ) + .await; + + assert!(build_scan_fingerprint(&input).is_none()); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_respects_scan_eligibility() { + let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); + let filters = vec![col("k0").eq(lit("foo"))]; + + let disabled = ScanInput::new( + SchedulerEnv::new().await.access_layer.clone(), + ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(), + ) + .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap()) + .with_flat_format(true); + assert!(build_scan_fingerprint(&disabled).is_none()); + + let non_flat = new_scan_input(metadata.clone(), filters.clone()) + .await + .with_flat_format(false); + assert!(build_scan_fingerprint(&non_flat).is_none()); + + let compaction = new_scan_input(metadata.clone(), filters.clone()) + .await + .with_compaction(true); + assert!(build_scan_fingerprint(&compaction).is_none()); + + // No files to read. + let no_files = new_scan_input(metadata, filters).await.with_files(vec![]); + assert!(build_scan_fingerprint(&no_files).is_none()); + } + + #[tokio::test] + async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() { + let base = metadata_with_primary_key(vec![0, 1], false); + let mut builder = RegionMetadataBuilder::from_existing(base); + let partition_expr = partition_col("k0") + .gt_eq(Value::String("foo".into())) + .as_json_str() + .unwrap(); + builder.partition_expr_json(Some(partition_expr)); + let metadata = Arc::new(builder.build_without_validation().unwrap()); + + let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await; + let fingerprint = build_scan_fingerprint(&input).unwrap(); + + let expected = ScanRequestFingerprintBuilder { + read_column_ids: input.read_column_ids.clone(), + read_column_types: vec![ + metadata + .column_by_id(0) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(2) + .map(|col| col.column_schema.data_type.clone()), + metadata + .column_by_id(3) + .map(|col| col.column_schema.data_type.clone()), + ], + filters: vec![col("k0").eq(lit("foo")).to_string()], + time_filters: vec![], + series_row_selector: None, + append_mode: false, + filter_deleted: true, + merge_mode: MergeMode::LastRow, + partition_expr_version: metadata.partition_expr_version, + } + .build(); + assert_eq!(expected, fingerprint); + assert_ne!(0, metadata.partition_expr_version); + } + #[test] fn test_update_dyn_filters_with_empty_base_predicates() { let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false)); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 0fe0a8f12a..fcf68a9216 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -50,7 +50,7 @@ pub(crate) fn parse_wal_options( } /// Mode to handle duplicate rows while merging. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)] +#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, EnumString)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum MergeMode {