diff --git a/config/config.md b/config/config.md index fe318c91b9..423e05a668 100644 --- a/config/config.md +++ b/config/config.md @@ -154,6 +154,7 @@ | `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | | `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. | +| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | | `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | @@ -494,6 +495,7 @@ | `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. | | `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. | | `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. | +| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index result. | | `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. | | `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.
- `auto`: automatically (default)
- `disable`: never | | `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 6b00cf90bd..aa1c5b7c54 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -499,6 +499,9 @@ content_cache_size = "128MiB" ## Page size for inverted index content cache. content_cache_page_size = "64KiB" +## Cache size for index result. +result_cache_size = "128MiB" + ## The options for inverted index in Mito engine. [region_engine.mito.inverted_index] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 649b570350..4093ab4423 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -590,6 +590,9 @@ content_cache_size = "128MiB" ## Page size for inverted index content cache. content_cache_page_size = "64KiB" +## Cache size for index result. +result_cache_size = "128MiB" + ## The options for inverted index in Mito engine. [region_engine.mito.inverted_index] diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index f625283607..4e29eb538b 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::BTreeSet; use std::ops::Range; use fastbloom::BloomFilter; @@ -25,10 +25,10 @@ use crate::Bytes; /// `InListPredicate` contains a list of acceptable values. A value needs to match at least /// one of the elements (logical OR semantic) for the predicate to be satisfied. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InListPredicate { /// List of acceptable values. - pub list: HashSet, + pub list: BTreeSet, } pub struct BloomFilterApplier { @@ -277,21 +277,21 @@ mod tests { // Single value predicates ( vec![InListPredicate { - list: HashSet::from_iter([b"row00".to_vec()]), + list: BTreeSet::from_iter([b"row00".to_vec()]), }], 0..28, vec![0..4], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"row05".to_vec()]), + list: BTreeSet::from_iter([b"row05".to_vec()]), }], 4..8, vec![4..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"row03".to_vec()]), + list: BTreeSet::from_iter([b"row03".to_vec()]), }], 4..8, vec![], @@ -299,14 +299,14 @@ mod tests { // Multiple values in a single predicate (OR logic) ( vec![InListPredicate { - list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]), + list: BTreeSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]), }], 0..28, vec![0..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]), + list: BTreeSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]), }], 0..28, vec![4..12], @@ -314,7 +314,7 @@ mod tests { // Non-existent values ( vec![InListPredicate { - list: HashSet::from_iter([b"row99".to_vec()]), + list: BTreeSet::from_iter([b"row99".to_vec()]), }], 0..28, vec![], @@ -322,7 +322,7 @@ mod tests { // Empty range ( vec![InListPredicate { - list: HashSet::from_iter([b"row00".to_vec()]), + list: BTreeSet::from_iter([b"row00".to_vec()]), }], 12..12, vec![], @@ -330,21 +330,21 @@ mod tests { // Multiple values in a single predicate within specific ranges ( vec![InListPredicate { - list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]), + list: BTreeSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]), }], 0..12, vec![4..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"seg01".to_vec()]), + list: BTreeSet::from_iter([b"seg01".to_vec()]), }], 0..28, vec![4..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"seg01".to_vec()]), + list: BTreeSet::from_iter([b"seg01".to_vec()]), }], 6..28, vec![6..8], @@ -352,21 +352,21 @@ mod tests { // Values spanning multiple segments ( vec![InListPredicate { - list: HashSet::from_iter([b"overl".to_vec()]), + list: BTreeSet::from_iter([b"overl".to_vec()]), }], 0..28, vec![0..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"overl".to_vec()]), + list: BTreeSet::from_iter([b"overl".to_vec()]), }], 2..28, vec![2..8], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"overp".to_vec()]), + list: BTreeSet::from_iter([b"overp".to_vec()]), }], 0..10, vec![4..10], @@ -374,21 +374,21 @@ mod tests { // Duplicate values ( vec![InListPredicate { - list: HashSet::from_iter([b"dup".to_vec()]), + list: BTreeSet::from_iter([b"dup".to_vec()]), }], 0..12, vec![], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"dup".to_vec()]), + list: BTreeSet::from_iter([b"dup".to_vec()]), }], 0..16, vec![12..16], ), ( vec![InListPredicate { - list: HashSet::from_iter([b"dup".to_vec()]), + list: BTreeSet::from_iter([b"dup".to_vec()]), }], 0..28, vec![12..28], @@ -397,10 +397,10 @@ mod tests { ( vec![ InListPredicate { - list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]), + list: BTreeSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]), }, InListPredicate { - list: HashSet::from_iter([b"seg00".to_vec()]), + list: BTreeSet::from_iter([b"seg00".to_vec()]), }, ], 0..28, @@ -409,10 +409,10 @@ mod tests { ( vec![ InListPredicate { - list: HashSet::from_iter([b"overl".to_vec()]), + list: BTreeSet::from_iter([b"overl".to_vec()]), }, InListPredicate { - list: HashSet::from_iter([b"seg01".to_vec()]), + list: BTreeSet::from_iter([b"seg01".to_vec()]), }, ], 0..28, diff --git a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs index ed8d483230..5bc227a982 100644 --- a/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/intersection_apply.rs @@ -183,7 +183,7 @@ impl TryFrom> for IntersectionFstApplier { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::BTreeSet; use super::*; use crate::inverted_index::error::Error; @@ -405,7 +405,7 @@ mod tests { #[test] fn test_intersection_fst_applier_with_in_list_predicate() { let result = IntersectionFstApplier::try_from(vec![Predicate::InList(InListPredicate { - list: HashSet::from_iter([b"one".to_vec(), b"two".to_vec()]), + list: BTreeSet::from_iter([b"one".to_vec(), b"two".to_vec()]), })]); assert!(matches!( result, diff --git a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs index 79da9b0e0c..eb30f38f5c 100644 --- a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::BTreeSet; use std::mem::size_of; use snafu::{ensure, ResultExt}; @@ -93,7 +93,7 @@ impl KeysFstApplier { fn intersect_with_lists(in_lists: &mut [Predicate]) -> Vec { #[inline] - fn get_list(p: &Predicate) -> &HashSet { + fn get_list(p: &Predicate) -> &BTreeSet { match p { Predicate::InList(i) => &i.list, _ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists` @@ -229,7 +229,7 @@ mod tests { fn test_keys_fst_applier_try_from() { let predicates = vec![ Predicate::InList(InListPredicate { - list: HashSet::from_iter(vec![b("foo"), b("bar")]), + list: BTreeSet::from_iter(vec![b("foo"), b("bar")]), }), Predicate::Range(RangePredicate { range: Range { @@ -252,7 +252,7 @@ mod tests { fn test_keys_fst_applier_try_from_filter_out_unmatched_keys() { let predicates = vec![ Predicate::InList(InListPredicate { - list: HashSet::from_iter(vec![b("foo"), b("bar")]), + list: BTreeSet::from_iter(vec![b("foo"), b("bar")]), }), Predicate::Range(RangePredicate { range: Range { @@ -300,7 +300,7 @@ mod tests { fn test_keys_fst_applier_try_from_with_invalid_regex() { let predicates = vec![ Predicate::InList(InListPredicate { - list: HashSet::from_iter(vec![b("foo"), b("bar")]), + list: BTreeSet::from_iter(vec![b("foo"), b("bar")]), }), Predicate::RegexMatch(RegexMatchPredicate { pattern: "*invalid regex".to_string(), diff --git a/src/index/src/inverted_index/search/predicate.rs b/src/index/src/inverted_index/search/predicate.rs index dbbc361270..1ae8acebba 100644 --- a/src/index/src/inverted_index/search/predicate.rs +++ b/src/index/src/inverted_index/search/predicate.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::BTreeSet; use crate::Bytes; /// Enumerates types of predicates for value filtering. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum Predicate { /// Predicate for matching values in a list. InList(InListPredicate), @@ -31,14 +31,14 @@ pub enum Predicate { /// `InListPredicate` contains a list of acceptable values. A value needs to match at least /// one of the elements (logical OR semantic) for the predicate to be satisfied. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct InListPredicate { /// List of acceptable values. - pub list: HashSet, + pub list: BTreeSet, } /// `Bound` is a sub-component of a range, representing a single-sided limit that could be inclusive or exclusive. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Bound { /// Whether the bound is inclusive or exclusive. pub inclusive: bool, @@ -48,7 +48,7 @@ pub struct Bound { /// `Range` defines a single continuous range which can optionally have a lower and/or upper limit. /// Both the lower and upper bounds must be satisfied for the range condition to be true. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Range { /// The lower bound of the range. pub lower: Option, @@ -58,7 +58,7 @@ pub struct Range { /// `RangePredicate` encapsulates a range condition that must be satisfied /// for the predicate to hold true (logical AND semantic between the bounds). -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RangePredicate { /// The range condition. pub range: Range, @@ -66,7 +66,7 @@ pub struct RangePredicate { /// `RegexMatchPredicate` encapsulates a single regex pattern. A value must match /// the pattern for the predicate to be satisfied. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct RegexMatchPredicate { /// The regex pattern. pub pattern: String, diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index bc3ce9cbff..0bbb8400a1 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -29,6 +29,7 @@ use bytes::Bytes; use datatypes::value::Value; use datatypes::vectors::VectorRef; use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef}; +use index::result_cache::IndexResultCache; use moka::notification::RemovalCause; use moka::sync::Cache; use parquet::column::page::Page; @@ -242,6 +243,15 @@ impl CacheStrategy { CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } + + /// Calls [CacheManager::index_result_cache()]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. + pub fn index_result_cache(&self) -> Option<&IndexResultCache> { + match self { + CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(), + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, + } + } } /// Manages cached data for the engine. @@ -258,13 +268,15 @@ pub struct CacheManager { /// A Cache for writing files to object stores. write_cache: Option, /// Cache for inverted index. - index_cache: Option, + inverted_index_cache: Option, /// Cache for bloom filter index. bloom_filter_index_cache: Option, /// Puffin metadata cache. puffin_metadata_cache: Option, /// Cache for time series selectors. selector_result_cache: Option, + /// Cache for index result. + index_result_cache: Option, } pub type CacheManagerRef = Arc; @@ -410,7 +422,7 @@ impl CacheManager { } pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> { - self.index_cache.as_ref() + self.inverted_index_cache.as_ref() } pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> { @@ -420,6 +432,10 @@ impl CacheManager { pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { self.puffin_metadata_cache.as_ref() } + + pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> { + self.index_result_cache.as_ref() + } } /// Increases selector cache miss metrics. @@ -441,6 +457,7 @@ pub struct CacheManagerBuilder { index_metadata_size: u64, index_content_size: u64, index_content_page_size: u64, + index_result_cache_size: u64, puffin_metadata_size: u64, write_cache: Option, selector_result_cache_size: u64, @@ -489,6 +506,12 @@ impl CacheManagerBuilder { self } + /// Sets cache size for index result. + pub fn index_result_cache_size(mut self, bytes: u64) -> Self { + self.index_result_cache_size = bytes; + self + } + /// Sets cache size for puffin metadata. pub fn puffin_metadata_size(mut self, bytes: u64) -> Self { self.puffin_metadata_size = bytes; @@ -566,6 +589,8 @@ impl CacheManagerBuilder { self.index_content_size, self.index_content_page_size, ); + let index_result_cache = (self.index_result_cache_size != 0) + .then(|| IndexResultCache::new(self.index_result_cache_size)); let puffin_metadata_cache = PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES); let selector_result_cache = (self.selector_result_cache_size != 0).then(|| { @@ -588,10 +613,11 @@ impl CacheManagerBuilder { vector_cache, page_cache, write_cache: self.write_cache, - index_cache: Some(Arc::new(inverted_index_cache)), + inverted_index_cache: Some(Arc::new(inverted_index_cache)), bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)), puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)), selector_result_cache, + index_result_cache, } } } diff --git a/src/mito2/src/cache/index.rs b/src/mito2/src/cache/index.rs index c28ac063b3..5cb7770e62 100644 --- a/src/mito2/src/cache/index.rs +++ b/src/mito2/src/cache/index.rs @@ -14,6 +14,7 @@ pub mod bloom_filter_index; pub mod inverted_index; +pub mod result_cache; use std::future::Future; use std::hash::Hash; diff --git a/src/mito2/src/cache/index/result_cache.rs b/src/mito2/src/cache/index/result_cache.rs new file mode 100644 index 0000000000..b82788763d --- /dev/null +++ b/src/mito2/src/cache/index/result_cache.rs @@ -0,0 +1,423 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use index::bloom_filter::applier::InListPredicate; +use index::inverted_index::search::predicate::{Predicate, RangePredicate}; +use moka::notification::RemovalCause; +use moka::sync::Cache; +use store_api::storage::ColumnId; + +use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS}; +use crate::sst::file::FileId; +use crate::sst::index::fulltext_index::applier::builder::{ + FulltextQuery, FulltextRequest, FulltextTerm, +}; +use crate::sst::parquet::row_selection::RowGroupSelection; + +const INDEX_RESULT_TYPE: &str = "index_result"; + +/// Cache for storing index query results. +pub struct IndexResultCache { + cache: Cache<(PredicateKey, FileId), Arc>, +} + +impl IndexResultCache { + /// Creates a new cache with the given capacity. + pub fn new(capacity: u64) -> Self { + fn to_str(cause: RemovalCause) -> &'static str { + match cause { + RemovalCause::Expired => "expired", + RemovalCause::Explicit => "explicit", + RemovalCause::Replaced => "replaced", + RemovalCause::Size => "size", + } + } + + let cache = Cache::builder() + .max_capacity(capacity) + .weigher(Self::index_result_cache_weight) + .eviction_listener(|k, v, cause| { + let size = Self::index_result_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[INDEX_RESULT_TYPE]) + .sub(size.into()); + CACHE_EVICTION + .with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)]) + .inc(); + }) + .build(); + Self { cache } + } + + /// Puts a query result into the cache. + pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc) { + let key = (key, file_id); + let size = Self::index_result_cache_weight(&key, &result); + CACHE_BYTES + .with_label_values(&[INDEX_RESULT_TYPE]) + .add(size.into()); + self.cache.insert(key, result); + } + + /// Gets a query result from the cache. + pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option> { + let res = self.cache.get(&(key.clone(), file_id)); + if res.is_some() { + CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc(); + } else { + CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc() + } + res + } + + /// Calculates the memory usage of a cache entry. + fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc) -> u32 { + k.0.mem_usage() as u32 + v.mem_usage() as u32 + } +} + +/// Key for different types of index predicates. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum PredicateKey { + /// Fulltext index predicate. + Fulltext(FulltextIndexKey), + /// Bloom filter predicate. + Bloom(BloomFilterKey), + /// Inverted index predicate. + Inverted(InvertedIndexKey), +} + +impl PredicateKey { + /// Creates a new fulltext index key. + pub fn new_fulltext(predicates: Arc>) -> Self { + Self::Fulltext(FulltextIndexKey::new(predicates)) + } + + /// Creates a new bloom filter key. + pub fn new_bloom(predicates: Arc>>) -> Self { + Self::Bloom(BloomFilterKey::new(predicates)) + } + + /// Creates a new inverted index key. + pub fn new_inverted(predicates: Arc>>) -> Self { + Self::Inverted(InvertedIndexKey::new(predicates)) + } + + /// Returns the memory usage of this key. + pub fn mem_usage(&self) -> usize { + match self { + Self::Fulltext(key) => key.mem_usage, + Self::Bloom(key) => key.mem_usage, + Self::Inverted(key) => key.mem_usage, + } + } +} + +/// Key for fulltext index queries. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct FulltextIndexKey { + predicates: Arc>, + mem_usage: usize, +} + +impl FulltextIndexKey { + /// Creates a new fulltext index key with the given predicates. + /// Calculates memory usage based on the size of queries and terms. + pub fn new(predicates: Arc>) -> Self { + let mem_usage = predicates + .values() + .map(|request| { + let query_size = request + .queries + .iter() + .map(|query| query.0.len() + size_of::()) + .sum::(); + let term_size = request + .terms + .iter() + .map(|term| term.term.len() + size_of::()) + .sum::(); + query_size + term_size + }) + .sum(); + Self { + predicates, + mem_usage, + } + } +} + +/// Key for bloom filter queries. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct BloomFilterKey { + predicates: Arc>>, + mem_usage: usize, +} + +impl BloomFilterKey { + /// Creates a new bloom filter key with the given predicates. + /// Calculates memory usage based on the size of predicate lists. + pub fn new(predicates: Arc>>) -> Self { + let mem_usage = predicates + .values() + .map(|predicates| { + predicates + .iter() + .map(|predicate| predicate.list.iter().map(|list| list.len()).sum::()) + .sum::() + }) + .sum(); + Self { + predicates, + mem_usage, + } + } +} + +/// Key for inverted index queries. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct InvertedIndexKey { + predicates: Arc>>, + mem_usage: usize, +} + +impl InvertedIndexKey { + /// Creates a new inverted index key with the given predicates. + /// Calculates memory usage based on the type and size of predicates. + pub fn new(predicates: Arc>>) -> Self { + let mem_usage = predicates + .values() + .map(|predicates| { + predicates + .iter() + .map(|predicate| match predicate { + Predicate::InList(predicate) => { + predicate.list.iter().map(|list| list.len()).sum::() + } + Predicate::Range(_) => size_of::(), + Predicate::RegexMatch(predicate) => predicate.pattern.len(), + }) + .sum::() + }) + .sum(); + + Self { + predicates, + mem_usage, + } + } +} + +#[cfg(test)] +#[allow(clippy::single_range_in_vec_init)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + use std::sync::Arc; + + use index::bloom_filter::applier::InListPredicate as BloomInListPredicate; + use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate}; + + use super::*; + use crate::sst::index::fulltext_index::applier::builder::{ + FulltextQuery, FulltextRequest, FulltextTerm, + }; + use crate::sst::parquet::row_selection::RowGroupSelection; + + #[test] + fn test_cache_basic_operations() { + let cache = IndexResultCache::new(1000); + let file_id = FileId::random(); + + // Create a test key and value + let predicates = BTreeMap::new(); + let key = PredicateKey::new_fulltext(Arc::new(predicates)); + let selection = Arc::new(RowGroupSelection::from_row_ids( + [1, 2, 3].into_iter().collect(), + 1, + 10, + )); + + // Test put and get + cache.put(key.clone(), file_id, selection.clone()); + let retrieved = cache.get(&key, file_id); + assert!(retrieved.is_some()); + assert_eq!( + retrieved.unwrap().as_ref().row_count(), + selection.as_ref().row_count() + ); + + // Test get non-existent key + let non_existent_file_id = FileId::random(); + assert!(cache.get(&key, non_existent_file_id).is_none()); + } + + #[test] + fn test_cache_capacity_limit() { + // Create a cache with small capacity (100 bytes) + let cache = IndexResultCache::new(100); + let file_id1 = FileId::random(); + let file_id2 = FileId::random(); + + // Create two large keys that will exceed capacity + let mut predicates1 = BTreeMap::new(); + let request1 = FulltextRequest { + queries: vec![ + FulltextQuery( + "test query 1 with a very long string to ensure large weight".to_string(), + ), + FulltextQuery("another long query string".to_string()), + ], + terms: vec![], + }; + predicates1.insert(1, request1); + let key1 = PredicateKey::new_fulltext(Arc::new(predicates1)); + let selection1 = Arc::new(RowGroupSelection::default()); + + let mut predicates2 = BTreeMap::new(); + let request2 = FulltextRequest { + queries: vec![ + FulltextQuery( + "test query 2 with a very long string to ensure large weight".to_string(), + ), + FulltextQuery("another long query string".to_string()), + ], + terms: vec![], + }; + predicates2.insert(1, request2); + let key2 = PredicateKey::new_fulltext(Arc::new(predicates2)); + let selection2 = Arc::new(RowGroupSelection::default()); + + // Calculate weights + let weight1 = + IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id1), &selection1); + let weight2 = + IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id2), &selection2); + assert!(weight1 > 100); + assert!(weight2 > 100); + + // Put first key-value pair + cache.put(key1.clone(), file_id1, selection1.clone()); + + // Verify first key is in cache + let retrieved1 = cache.get(&key1, file_id1); + assert!(retrieved1.is_some()); + assert_eq!( + retrieved1.unwrap().as_ref().row_count(), + selection1.as_ref().row_count() + ); + + // Put second key-value pair, which should trigger eviction + cache.put(key2.clone(), file_id2, selection2.clone()); + + // Verify second key is in cache + let retrieved2 = cache.get(&key2, file_id2); + assert!(retrieved2.is_some()); + assert_eq!( + retrieved2.unwrap().as_ref().row_count(), + selection2.as_ref().row_count() + ); + + // Verify first key was evicted + cache.cache.run_pending_tasks(); + let retrieved1_after_eviction = cache.get(&key1, file_id1); + assert!( + retrieved1_after_eviction.is_none(), + "First key should have been evicted" + ); + } + + #[test] + fn test_index_result_cache_weight() { + let file_id = FileId::random(); + + // Test empty values + let empty_predicates = BTreeMap::new(); + let empty_key = PredicateKey::new_fulltext(Arc::new(empty_predicates)); + let empty_selection = Arc::new(RowGroupSelection::default()); + let empty_weight = IndexResultCache::index_result_cache_weight( + &(empty_key.clone(), file_id), + &empty_selection, + ); + assert_eq!(empty_weight, 0); + assert_eq!( + empty_weight, + empty_key.mem_usage() as u32 + empty_selection.mem_usage() as u32 + ); + + // Test 1: FulltextIndexKey + let mut predicates1 = BTreeMap::new(); + let request1 = FulltextRequest { + queries: vec![FulltextQuery("test query".to_string())], + terms: vec![FulltextTerm { + col_lowered: false, + term: "test term".to_string(), + }], + }; + predicates1.insert(1, request1); + let key1 = PredicateKey::new_fulltext(Arc::new(predicates1)); + let selection1 = Arc::new(RowGroupSelection::new(100, 250)); + let weight1 = + IndexResultCache::index_result_cache_weight(&(key1.clone(), file_id), &selection1); + assert!(weight1 > 0); + assert_eq!( + weight1, + key1.mem_usage() as u32 + selection1.mem_usage() as u32 + ); + + // Test 2: BloomFilterKey + let mut predicates2 = BTreeMap::new(); + let predicate2 = BloomInListPredicate { + list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]), + }; + predicates2.insert(1, vec![predicate2]); + let key2 = PredicateKey::new_bloom(Arc::new(predicates2)); + let selection2 = Arc::new(RowGroupSelection::from_row_ids( + [1, 2, 3].into_iter().collect(), + 100, + 1, + )); + let weight2 = + IndexResultCache::index_result_cache_weight(&(key2.clone(), file_id), &selection2); + assert!(weight2 > 0); + assert_eq!( + weight2, + key2.mem_usage() as u32 + selection2.mem_usage() as u32 + ); + + // Test 3: InvertedIndexKey + let mut predicates3 = BTreeMap::new(); + let predicate3 = Predicate::Range(RangePredicate { + range: Range { + lower: None, + upper: None, + }, + }); + predicates3.insert(1, vec![predicate3]); + let key3 = PredicateKey::new_inverted(Arc::new(predicates3)); + let selection3 = Arc::new(RowGroupSelection::from_row_ranges( + vec![(0, vec![5..15])], + 20, + )); + let weight3 = + IndexResultCache::index_result_cache_weight(&(key3.clone(), file_id), &selection3); + assert!(weight3 > 0); + assert_eq!( + weight3, + key3.mem_usage() as u32 + selection3.mem_usage() as u32 + ); + } +} diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index abdfec912e..b9613292c8 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -265,6 +265,8 @@ impl MitoConfig { self.vector_cache_size = mem_cache_size; self.page_cache_size = page_cache_size; self.selector_result_cache_size = mem_cache_size; + + self.index.adjust_buffer_and_cache_size(sys_memory); } /// Enable write cache. @@ -315,6 +317,8 @@ pub struct IndexConfig { pub content_cache_size: ReadableSize, /// Page size for inverted index content. pub content_cache_page_size: ReadableSize, + /// Cache size for index result. Setting it to 0 to disable the cache. + pub result_cache_size: ReadableSize, } impl Default for IndexConfig { @@ -327,6 +331,7 @@ impl Default for IndexConfig { metadata_cache_size: ReadableSize::mb(64), content_cache_size: ReadableSize::mb(128), content_cache_page_size: ReadableSize::kb(64), + result_cache_size: ReadableSize::mb(128), } } } @@ -365,6 +370,18 @@ impl IndexConfig { Ok(()) } + + pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) { + let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128)); + self.result_cache_size = cmp::min(self.result_cache_size, cache_size); + self.content_cache_size = cmp::min(self.content_cache_size, cache_size); + + let metadata_cache_size = cmp::min( + sys_memory / SST_META_CACHE_SIZE_FACTOR, + ReadableSize::mb(64), + ); + self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size); + } } /// Operational mode for certain actions. diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index fac5db5405..6f7c587397 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -14,7 +14,7 @@ mod builder; -use std::collections::HashMap; +use std::collections::BTreeMap; use std::ops::Range; use std::sync::Arc; @@ -33,6 +33,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, }; +use crate::cache::index::result_cache::PredicateKey; use crate::error::{ ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, @@ -71,7 +72,10 @@ pub struct BloomFilterIndexApplier { /// Bloom filter predicates. /// For each column, the value will be retained only if it contains __all__ predicates. - predicates: HashMap>, + predicates: Arc>>, + + /// Predicate key. Used to identify the predicate and fetch result from cache. + predicate_key: PredicateKey, } impl BloomFilterIndexApplier { @@ -83,8 +87,9 @@ impl BloomFilterIndexApplier { region_id: RegionId, object_store: ObjectStore, puffin_manager_factory: PuffinManagerFactory, - predicates: HashMap>, + predicates: BTreeMap>, ) -> Self { + let predicates = Arc::new(predicates); Self { region_dir, region_id, @@ -93,6 +98,7 @@ impl BloomFilterIndexApplier { puffin_manager_factory, puffin_metadata_cache: None, bloom_filter_index_cache: None, + predicate_key: PredicateKey::new_bloom(predicates.clone()), predicates, } } @@ -150,7 +156,7 @@ impl BloomFilterIndexApplier { .map(|(i, range)| (*i, vec![range.clone()])) .collect::>(); - for (column_id, predicates) in &self.predicates { + for (column_id, predicates) in self.predicates.iter() { let blob = match self .blob_reader(file_id, *column_id, file_size_hint) .await? @@ -320,6 +326,11 @@ impl BloomFilterIndexApplier { Ok(()) } + + /// Returns the predicate key. + pub fn predicate_key(&self) -> &PredicateKey { + &self.predicate_key + } } fn is_blob_not_found(err: &Error) -> bool { diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index 8d6a8db6f2..a9e43ebe32 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet}; use common_telemetry::warn; use datafusion_common::ScalarValue; @@ -44,7 +44,7 @@ pub struct BloomFilterIndexApplierBuilder<'a> { file_cache: Option, puffin_metadata_cache: Option, bloom_filter_index_cache: Option, - predicates: HashMap>, + predicates: BTreeMap>, } impl<'a> BloomFilterIndexApplierBuilder<'a> { @@ -62,7 +62,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { file_cache: None, puffin_metadata_cache: None, bloom_filter_index_cache: None, - predicates: HashMap::default(), + predicates: BTreeMap::default(), } } @@ -168,7 +168,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { .entry(column_id) .or_default() .push(InListPredicate { - list: HashSet::from([value]), + list: BTreeSet::from([value]), }); Ok(()) @@ -196,7 +196,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> { .map(|lit| encode_lit(lit, data_type.clone())); // Collect successful conversions - let mut valid_predicates = HashSet::new(); + let mut valid_predicates = BTreeSet::new(); for predicate in predicates { match predicate { Ok(p) => { @@ -323,7 +323,7 @@ mod tests { ConcreteDataType::string_datatype(), ) .unwrap(); - assert_eq!(column_predicates[0].list, HashSet::from([expected])); + assert_eq!(column_predicates[0].list, BTreeSet::from([expected])); } fn int64_lit(i: i64) -> Expr { diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 063227a89f..20a15937ac 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeSet, HashMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::iter; use std::ops::Range; use std::sync::Arc; @@ -34,6 +34,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::bloom_filter_index::{ BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag, }; +use crate::cache::index::result_cache::PredicateKey; use crate::error::{ ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, @@ -52,13 +53,16 @@ pub mod builder; /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files pub struct FulltextIndexApplier { /// Requests to be applied. - requests: HashMap, + requests: Arc>, /// The source of the index. index_source: IndexSource, /// Cache for bloom filter index. bloom_filter_index_cache: Option, + + /// Predicate key. Used to identify the predicate and fetch result from cache. + predicate_key: PredicateKey, } pub type FulltextIndexApplierRef = Arc; @@ -69,12 +73,14 @@ impl FulltextIndexApplier { region_dir: String, region_id: RegionId, store: ObjectStore, - requests: HashMap, + requests: BTreeMap, puffin_manager_factory: PuffinManagerFactory, ) -> Self { + let requests = Arc::new(requests); let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store); Self { + predicate_key: PredicateKey::new_fulltext(requests.clone()), requests, index_source, bloom_filter_index_cache: None, @@ -105,6 +111,11 @@ impl FulltextIndexApplier { self.bloom_filter_index_cache = bloom_filter_index_cache; self } + + /// Returns the predicate key. + pub fn predicate_key(&self) -> &PredicateKey { + &self.predicate_key + } } impl FulltextIndexApplier { @@ -120,7 +131,7 @@ impl FulltextIndexApplier { .start_timer(); let mut row_ids: Option> = None; - for (column_id, request) in &self.requests { + for (column_id, request) in self.requests.iter() { if request.queries.is_empty() && request.terms.is_empty() { continue; } @@ -233,7 +244,7 @@ impl FulltextIndexApplier { let (input, mut output) = Self::init_coarse_output(row_groups); let mut applied = false; - for (column_id, request) in &self.requests { + for (column_id, request) in self.requests.iter() { if request.terms.is_empty() { // only apply terms continue; diff --git a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs index 3297275f26..d3054306e6 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier/builder.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier/builder.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::BTreeMap; use datafusion_common::ScalarValue; use datafusion_expr::expr::ScalarFunction; @@ -31,7 +31,7 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory; /// A request for fulltext index. /// /// It contains all the queries and terms for a column. -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)] pub struct FulltextRequest { pub queries: Vec, pub terms: Vec, @@ -65,14 +65,14 @@ impl FulltextRequest { /// A query to be matched in fulltext index. /// /// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FulltextQuery(pub String); /// A term to be matched in fulltext index. /// /// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`. /// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FulltextTerm { pub col_lowered: bool, pub term: String, @@ -137,7 +137,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> { /// Builds `SstIndexApplier` from the given expressions. pub fn build(self, exprs: &[Expr]) -> Result> { - let mut requests = HashMap::new(); + let mut requests = BTreeMap::new(); for expr in exprs { Self::extract_requests(expr, self.metadata, &mut requests); } @@ -164,7 +164,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> { fn extract_requests( expr: &Expr, metadata: &'a RegionMetadata, - requests: &mut HashMap, + requests: &mut BTreeMap, ) { match expr { Expr::BinaryExpr(BinaryExpr { @@ -526,7 +526,7 @@ mod tests { func: matches_func(), }); - let mut requests = HashMap::new(); + let mut requests = BTreeMap::new(); FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests); assert_eq!(requests.len(), 1); @@ -565,7 +565,7 @@ mod tests { right: Box::new(matches_term_expr), }); - let mut requests = HashMap::new(); + let mut requests = BTreeMap::new(); FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests); assert_eq!(requests.len(), 1); diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index fc9aae9f42..dfc4127435 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -356,7 +356,7 @@ impl AltFulltextCreator { #[cfg(test)] mod tests { - use std::collections::BTreeSet; + use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use api::v1::SemanticType; @@ -573,7 +573,7 @@ mod tests { let object_store = object_store.clone(); let factory = factory.clone(); - let mut requests: HashMap = HashMap::new(); + let mut requests: BTreeMap = BTreeMap::new(); // Add queries for (column_id, query) in queries { diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index c1f2325130..8a5d76d578 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -14,6 +14,7 @@ pub mod builder; +use std::collections::BTreeMap; use std::sync::Arc; use common_base::range_read::RangeReader; @@ -22,15 +23,17 @@ use index::inverted_index::format::reader::InvertedIndexBlobReader; use index::inverted_index::search::index_apply::{ ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext, }; +use index::inverted_index::search::predicate::Predicate; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use puffin::puffin_manager::{PuffinManager, PuffinReader}; use snafu::ResultExt; -use store_api::storage::RegionId; +use store_api::storage::{ColumnId, RegionId}; use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider}; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef}; +use crate::cache::index::result_cache::PredicateKey; use crate::error::{ ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result, }; @@ -67,6 +70,9 @@ pub(crate) struct InvertedIndexApplier { /// Puffin metadata cache. puffin_metadata_cache: Option, + + /// Predicate key. Used to identify the predicate and fetch result from cache. + predicate_key: PredicateKey, } pub(crate) type InvertedIndexApplierRef = Arc; @@ -79,6 +85,7 @@ impl InvertedIndexApplier { store: ObjectStore, index_applier: Box, puffin_manager_factory: PuffinManagerFactory, + predicates: BTreeMap>, ) -> Self { INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); @@ -91,6 +98,7 @@ impl InvertedIndexApplier { puffin_manager_factory, inverted_index_cache: None, puffin_metadata_cache: None, + predicate_key: PredicateKey::new_inverted(Arc::new(predicates)), } } @@ -218,6 +226,11 @@ impl InvertedIndexApplier { .await .context(PuffinBuildReaderSnafu) } + + /// Returns the predicate key. + pub fn predicate_key(&self) -> &PredicateKey { + &self.predicate_key + } } impl Drop for InvertedIndexApplier { @@ -276,6 +289,7 @@ mod tests { object_store, Box::new(mock_index_applier), puffin_manager_factory, + Default::default(), ); let output = sst_index_applier.apply(file_id, None).await.unwrap(); assert_eq!( @@ -323,6 +337,7 @@ mod tests { object_store, Box::new(mock_index_applier), puffin_manager_factory, + Default::default(), ); let res = sst_index_applier.apply(file_id, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder.rs b/src/mito2/src/sst/index/inverted_index/applier/builder.rs index 5daa01b9f5..eeda08e47a 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder.rs @@ -18,7 +18,7 @@ mod eq_list; mod in_list; mod regex_match; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashSet}; use common_telemetry::warn; use datafusion_common::ScalarValue; @@ -59,7 +59,7 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> { indexed_column_ids: HashSet, /// Stores predicates during traversal on the Expr tree. - output: HashMap>, + output: BTreeMap>, /// The puffin manager factory. puffin_manager_factory: PuffinManagerFactory, @@ -85,7 +85,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> { object_store, metadata, indexed_column_ids, - output: HashMap::default(), + output: BTreeMap::default(), puffin_manager_factory, file_cache: None, inverted_index_cache: None, @@ -130,8 +130,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> { let predicates = self .output - .into_iter() - .map(|(column_id, predicates)| (column_id.to_string(), predicates)) + .iter() + .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone())) .collect(); let applier = PredicatesIndexApplier::try_from(predicates); @@ -142,6 +142,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> { self.object_store, Box::new(applier.context(BuildIndexApplierSnafu)?), self.puffin_manager_factory, + self.output, ) .with_file_cache(self.file_cache) .with_puffin_metadata_cache(self.puffin_metadata_cache) diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 765d872500..9b08118eea 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::BTreeSet; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::data_type::ConcreteDataType; @@ -36,7 +36,7 @@ impl InvertedIndexApplierBuilder<'_> { }; let predicate = Predicate::InList(InListPredicate { - list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]), + list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]), }); self.add_predicate(column_id, predicate); Ok(()) @@ -64,7 +64,7 @@ impl InvertedIndexApplierBuilder<'_> { }; let bytes = Self::encode_lit(lit, data_type.clone())?; - let mut inlist = HashSet::from_iter([bytes]); + let mut inlist = BTreeSet::from_iter([bytes]); if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? { let predicate = Predicate::InList(InListPredicate { list: inlist }); @@ -82,7 +82,7 @@ impl InvertedIndexApplierBuilder<'_> { column_name: &str, data_type: &ConcreteDataType, expr: &DfExpr, - inlist: &mut HashSet, + inlist: &mut BTreeSet, ) -> Result { let DfExpr::BinaryExpr(BinaryExpr { left, @@ -122,6 +122,8 @@ impl InvertedIndexApplierBuilder<'_> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::error::Error; use crate::sst::index::inverted_index::applier::builder::tests::{ @@ -154,13 +156,13 @@ mod tests { assert_eq!( predicates[0], Predicate::InList(InListPredicate { - list: HashSet::from_iter([encoded_string("foo")]) + list: BTreeSet::from_iter([encoded_string("foo")]) }) ); assert_eq!( predicates[1], Predicate::InList(InListPredicate { - list: HashSet::from_iter([encoded_string("bar")]) + list: BTreeSet::from_iter([encoded_string("bar")]) }) ); } @@ -187,7 +189,7 @@ mod tests { assert_eq!( predicates[0], Predicate::InList(InListPredicate { - list: HashSet::from_iter([encoded_string("abc")]) + list: BTreeSet::from_iter([encoded_string("abc")]) }) ); } @@ -275,7 +277,7 @@ mod tests { assert_eq!( predicates[0], Predicate::InList(InListPredicate { - list: HashSet::from_iter([ + list: BTreeSet::from_iter([ encoded_string("abc"), encoded_string("foo"), encoded_string("bar"), diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs index 224e10c452..cb9993b6fb 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/in_list.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::BTreeSet; use datafusion_expr::expr::InList; use index::inverted_index::search::predicate::{InListPredicate, Predicate}; @@ -34,7 +34,7 @@ impl InvertedIndexApplierBuilder<'_> { }; let mut predicate = InListPredicate { - list: HashSet::with_capacity(inlist.list.len()), + list: BTreeSet::new(), }; for lit in &inlist.list { let Some(lit) = Self::nonnull_lit(lit) else { @@ -53,6 +53,8 @@ impl InvertedIndexApplierBuilder<'_> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::error::Error; use crate::sst::index::inverted_index::applier::builder::tests::{ @@ -86,7 +88,7 @@ mod tests { assert_eq!( predicates[0], Predicate::InList(InListPredicate { - list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")]) + list: BTreeSet::from_iter([encoded_string("foo"), encoded_string("bar")]) }) ); } @@ -140,7 +142,7 @@ mod tests { assert_eq!( predicates[0], Predicate::InList(InListPredicate { - list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")]) + list: BTreeSet::from_iter([encoded_string("foo"), encoded_string("bar")]) }) ); } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index bf5c99d7b2..6f3feeec07 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -36,6 +36,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; +use crate::cache::index::result_cache::PredicateKey; use crate::cache::CacheStrategy; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, @@ -48,7 +49,7 @@ use crate::metrics::{ use crate::read::prune::{PruneReader, Source}; use crate::read::{Batch, BatchReader}; use crate::row_converter::build_primary_key_codec; -use crate::sst::file::FileHandle; +use crate::sst::file::{FileHandle, FileId}; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; @@ -60,6 +61,31 @@ use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; +const INDEX_TYPE_FULLTEXT: &str = "fulltext"; +const INDEX_TYPE_INVERTED: &str = "inverted"; +const INDEX_TYPE_BLOOM: &str = "bloom filter"; + +macro_rules! handle_index_error { + ($err:expr, $file_handle:expr, $index_type:expr) => { + if cfg!(any(test, feature = "test")) { + panic!( + "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}", + $index_type, + $file_handle.region_id(), + $file_handle.file_id(), + $err + ); + } else { + warn!( + $err; "Failed to apply {} index, region_id: {}, file_id: {}", + $index_type, + $file_handle.region_id(), + $file_handle.file_id() + ); + } + }; +} + /// Parquet SST reader builder. pub struct ParquetReaderBuilder { /// SST directory. @@ -346,34 +372,39 @@ impl ParquetReaderBuilder { let mut output = RowGroupSelection::new(row_group_size, num_rows as _); - self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics) + self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); + if output.is_empty() { + return output; + } + + let fulltext_filtered = self + .prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics) .await; if output.is_empty() { return output; } - let inverted_filtered = self - .prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics) + self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics) .await; if output.is_empty() { return output; } - if !inverted_filtered { - self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics); - if output.is_empty() { - return output; - } - } - self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics) .await; if output.is_empty() { return output; } - self.prune_row_groups_by_fulltext_bloom(row_group_size, parquet_meta, &mut output, metrics) + if !fulltext_filtered { + self.prune_row_groups_by_fulltext_bloom( + row_group_size, + parquet_meta, + &mut output, + metrics, + ) .await; + } output } @@ -392,46 +423,42 @@ impl ParquetReaderBuilder { return false; } - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_res = match index_applier - .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) - .await - { - Ok(Some(res)) => res, - Ok(None) => { - return false; - } - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply full-text index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + if self.index_result_cache_get( + predicate_key, + self.file_handle.file_id(), + output, + metrics, + INDEX_TYPE_FULLTEXT, + ) { + return true; + } + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = index_applier + .apply_fine(self.file_handle.file_id(), Some(file_size_hint)) + .await; + let selection = match apply_res { + Ok(Some(res)) => { + RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups()) + } + Ok(None) => return false, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); return false; } }; - let selection = RowGroupSelection::from_row_ids( - apply_res, - row_group_size, - parquet_meta.num_row_groups(), + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id(), + selection, + output, + metrics, + INDEX_TYPE_FULLTEXT, ); - let intersection = output.intersect(&selection); - - metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count(); - metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count(); - - *output = intersection; - true } @@ -449,44 +476,158 @@ impl ParquetReaderBuilder { let Some(index_applier) = &self.inverted_index_applier else { return false; }; - if !self.file_handle.meta_ref().inverted_index_available() { return false; } - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_output = match index_applier - .apply(self.file_handle.file_id(), Some(file_size_hint)) - .await - { - Ok(output) => output, - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply inverted index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + if self.index_result_cache_get( + predicate_key, + self.file_handle.file_id(), + output, + metrics, + INDEX_TYPE_INVERTED, + ) { + return true; + } + + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let apply_res = index_applier + .apply(self.file_handle.file_id(), Some(file_size_hint)) + .await; + let selection = match apply_res { + Ok(output) => { + RowGroupSelection::from_inverted_index_apply_output(row_group_size, output) + } + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED); return false; } }; - let selection = - RowGroupSelection::from_inverted_index_apply_output(row_group_size, apply_output); - let intersection = output.intersect(&selection); + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id(), + selection, + output, + metrics, + INDEX_TYPE_INVERTED, + ); + true + } - metrics.rg_inverted_filtered += output.row_group_count() - intersection.row_group_count(); - metrics.rows_inverted_filtered += output.row_count() - intersection.row_count(); + async fn prune_row_groups_by_bloom_filter( + &self, + row_group_size: usize, + parquet_meta: &ParquetMetaData, + output: &mut RowGroupSelection, + metrics: &mut ReaderFilterMetrics, + ) -> bool { + let Some(index_applier) = &self.bloom_filter_index_applier else { + return false; + }; + if !self.file_handle.meta_ref().bloom_filter_index_available() { + return false; + } - *output = intersection; + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + if self.index_result_cache_get( + predicate_key, + self.file_handle.file_id(), + output, + metrics, + INDEX_TYPE_BLOOM, + ) { + return true; + } + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let rgs = parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))); + let apply_res = index_applier + .apply(self.file_handle.file_id(), Some(file_size_hint), rgs) + .await; + let selection = match apply_res { + Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size), + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM); + return false; + } + }; + + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id(), + selection, + output, + metrics, + INDEX_TYPE_BLOOM, + ); + true + } + + async fn prune_row_groups_by_fulltext_bloom( + &self, + row_group_size: usize, + parquet_meta: &ParquetMetaData, + output: &mut RowGroupSelection, + metrics: &mut ReaderFilterMetrics, + ) -> bool { + let Some(index_applier) = &self.fulltext_index_applier else { + return false; + }; + if !self.file_handle.meta_ref().fulltext_index_available() { + return false; + } + + let predicate_key = index_applier.predicate_key(); + // Fast path: return early if the result is in the cache. + if self.index_result_cache_get( + predicate_key, + self.file_handle.file_id(), + output, + metrics, + INDEX_TYPE_FULLTEXT, + ) { + return true; + } + + // Slow path: apply the index from the file. + let file_size_hint = self.file_handle.meta_ref().index_file_size(); + let rgs = parquet_meta + .row_groups() + .iter() + .enumerate() + .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))); + let apply_res = index_applier + .apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs) + .await; + let selection = match apply_res { + Ok(Some(apply_output)) => { + RowGroupSelection::from_row_ranges(apply_output, row_group_size) + } + Ok(None) => return false, + Err(err) => { + handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT); + return false; + } + }; + + self.apply_index_result_and_update_cache( + predicate_key, + self.file_handle.file_id(), + selection, + output, + metrics, + INDEX_TYPE_FULLTEXT, + ); true } @@ -533,126 +674,57 @@ impl ParquetReaderBuilder { true } - async fn prune_row_groups_by_bloom_filter( + fn index_result_cache_get( &self, - row_group_size: usize, - parquet_meta: &ParquetMetaData, + predicate_key: &PredicateKey, + file_id: FileId, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, + index_type: &str, ) -> bool { - let Some(index_applier) = &self.bloom_filter_index_applier else { - return false; - }; - - if !self.file_handle.meta_ref().bloom_filter_index_available() { - return false; - } - - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_output = match index_applier - .apply( - self.file_handle.file_id(), - Some(file_size_hint), - parquet_meta - .row_groups() - .iter() - .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))), - ) - .await - { - Ok(apply_output) => apply_output, - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply bloom filter index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } - - return false; + if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() { + let result = index_result_cache.get(predicate_key, file_id); + if let Some(result) = result { + apply_selection_and_update_metrics(output, &result, metrics, index_type); + return true; } - }; - - let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size); - let intersection = output.intersect(&selection); - - metrics.rg_bloom_filtered += output.row_group_count() - intersection.row_group_count(); - metrics.rows_bloom_filtered += output.row_count() - intersection.row_count(); - - *output = intersection; - - true + } + false } - async fn prune_row_groups_by_fulltext_bloom( + fn apply_index_result_and_update_cache( &self, - row_group_size: usize, - parquet_meta: &ParquetMetaData, + predicate_key: &PredicateKey, + file_id: FileId, + result: RowGroupSelection, output: &mut RowGroupSelection, metrics: &mut ReaderFilterMetrics, - ) -> bool { - let Some(index_applier) = &self.fulltext_index_applier else { - return false; - }; + index_type: &str, + ) { + apply_selection_and_update_metrics(output, &result, metrics, index_type); - if !self.file_handle.meta_ref().fulltext_index_available() { - return false; + if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() { + index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result)); } - - let file_size_hint = self.file_handle.meta_ref().index_file_size(); - let apply_output = match index_applier - .apply_coarse( - self.file_handle.file_id(), - Some(file_size_hint), - parquet_meta - .row_groups() - .iter() - .enumerate() - .map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))), - ) - .await - { - Ok(Some(apply_output)) => apply_output, - Ok(None) => return false, - Err(err) => { - if cfg!(any(test, feature = "test")) { - panic!( - "Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}", - self.file_handle.region_id(), - self.file_handle.file_id(), - err - ); - } else { - warn!( - err; "Failed to apply fulltext index, region_id: {}, file_id: {}", - self.file_handle.region_id(), self.file_handle.file_id() - ); - } - - return false; - } - }; - - let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size); - let intersection = output.intersect(&selection); - - metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count(); - metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count(); - - *output = intersection; - - true } } +fn apply_selection_and_update_metrics( + output: &mut RowGroupSelection, + result: &RowGroupSelection, + metrics: &mut ReaderFilterMetrics, + index_type: &str, +) { + let intersection = output.intersect(result); + + let row_group_count = output.row_group_count() - intersection.row_group_count(); + let row_count = output.row_count() - intersection.row_count(); + + metrics.update_index_metrics(index_type, row_group_count, row_count); + + *output = intersection; +} + /// Metrics of filtering rows groups and rows. #[derive(Debug, Default, Clone, Copy)] pub(crate) struct ReaderFilterMetrics { @@ -729,6 +801,24 @@ impl ReaderFilterMetrics { .with_label_values(&["bloom_filter_index_filtered"]) .inc_by(self.rows_bloom_filtered as u64); } + + fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) { + match index_type { + INDEX_TYPE_FULLTEXT => { + self.rg_fulltext_filtered += row_group_count; + self.rows_fulltext_filtered += row_count; + } + INDEX_TYPE_INVERTED => { + self.rg_inverted_filtered += row_group_count; + self.rows_inverted_filtered += row_count; + } + INDEX_TYPE_BLOOM => { + self.rg_bloom_filtered += row_group_count; + self.rows_bloom_filtered += row_count; + } + _ => {} + } + } } /// Parquet reader metrics. diff --git a/src/mito2/src/sst/parquet/row_selection.rs b/src/mito2/src/sst/parquet/row_selection.rs index 964f148cb1..ce8ba377d3 100644 --- a/src/mito2/src/sst/parquet/row_selection.rs +++ b/src/mito2/src/sst/parquet/row_selection.rs @@ -26,6 +26,8 @@ pub struct RowGroupSelection { selection_in_rg: BTreeMap, /// Total number of rows in the selection. row_count: usize, + /// Total length of the selectors. + selector_len: usize, } /// A row selection with its count. @@ -35,6 +37,8 @@ struct RowSelectionWithCount { selection: RowSelection, /// Number of rows in the selection. row_count: usize, + /// Length of the selectors. + selector_len: usize, } impl RowGroupSelection { @@ -61,6 +65,7 @@ impl RowGroupSelection { RowSelectionWithCount { selection, row_count: row_group_size, + selector_len: 1, }, ); } @@ -68,6 +73,7 @@ impl RowGroupSelection { Self { selection_in_rg, row_count: total_row_count, + selector_len: row_group_count, } } @@ -109,6 +115,7 @@ impl RowGroupSelection { // Step 2: Group ranges by row group ID and create row selections let mut total_row_count = 0; + let mut total_selector_len = 0; let selection_in_rg = row_group_ranges .chunk_by(|(row_group_id, _)| *row_group_id) .into_iter() @@ -122,12 +129,15 @@ impl RowGroupSelection { // by the min() operation above let selection = row_selection_from_row_ranges(ranges, row_group_size); let row_count = selection.row_count(); + let selector_len = selector_len(&selection); total_row_count += row_count; + total_selector_len += selector_len; ( row_group_id, RowSelectionWithCount { selection, row_count, + selector_len, }, ) }) @@ -136,6 +146,7 @@ impl RowGroupSelection { Self { selection_in_rg, row_count: total_row_count, + selector_len: total_selector_len, } } @@ -161,18 +172,22 @@ impl RowGroupSelection { // Step 2: Create row selections for each row group let mut total_row_count = 0; + let mut total_selector_len = 0; let selection_in_rg = row_group_to_row_ids .into_iter() .map(|(row_group_id, row_ids)| { let selection = row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size); let row_count = selection.row_count(); + let selector_len = selector_len(&selection); total_row_count += row_count; + total_selector_len += selector_len; ( row_group_id, RowSelectionWithCount { selection, row_count, + selector_len, }, ) }) @@ -181,6 +196,7 @@ impl RowGroupSelection { Self { selection_in_rg, row_count: total_row_count, + selector_len: total_selector_len, } } @@ -201,17 +217,21 @@ impl RowGroupSelection { row_group_size: usize, ) -> Self { let mut total_row_count = 0; + let mut total_selector_len = 0; let selection_in_rg = row_ranges .into_iter() .map(|(row_group_id, ranges)| { let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size); let row_count = selection.row_count(); + let selector_len = selector_len(&selection); total_row_count += row_count; + total_selector_len += selector_len; ( row_group_id, RowSelectionWithCount { selection, row_count, + selector_len, }, ) }) @@ -220,6 +240,7 @@ impl RowGroupSelection { Self { selection_in_rg, row_count: total_row_count, + selector_len: total_selector_len, } } @@ -262,6 +283,7 @@ impl RowGroupSelection { pub fn intersect(&self, other: &Self) -> Self { let mut res = BTreeMap::new(); let mut total_row_count = 0; + let mut total_selector_len = 0; for (rg_id, x) in other.selection_in_rg.iter() { let Some(y) = self.selection_in_rg.get(rg_id) else { @@ -269,13 +291,16 @@ impl RowGroupSelection { }; let selection = x.selection.intersection(&y.selection); let row_count = selection.row_count(); + let selector_len = selector_len(&selection); if row_count > 0 { total_row_count += row_count; + total_selector_len += selector_len; res.insert( *rg_id, RowSelectionWithCount { selection, row_count, + selector_len, }, ); } @@ -284,6 +309,7 @@ impl RowGroupSelection { Self { selection_in_rg: res, row_count: total_row_count, + selector_len: total_selector_len, } } @@ -304,21 +330,27 @@ impl RowGroupSelection { RowSelectionWithCount { selection, row_count, + selector_len, }, ) = self.selection_in_rg.pop_first()?; self.row_count -= row_count; + self.selector_len -= selector_len; Some((row_group_id, selection)) } /// Removes a row group from the selection. pub fn remove_row_group(&mut self, row_group_id: usize) { - let Some(RowSelectionWithCount { row_count, .. }) = - self.selection_in_rg.remove(&row_group_id) + let Some(RowSelectionWithCount { + row_count, + selector_len, + .. + }) = self.selection_in_rg.remove(&row_group_id) else { return; }; self.row_count -= row_count; + self.selector_len -= selector_len; } /// Returns true if the selection is empty. @@ -337,6 +369,12 @@ impl RowGroupSelection { .iter() .map(|(row_group_id, x)| (row_group_id, &x.selection)) } + + /// Returns the memory usage of the selection. + pub fn mem_usage(&self) -> usize { + self.selector_len * size_of::() + + self.selection_in_rg.len() * size_of::() + } } /// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s. @@ -420,11 +458,32 @@ fn add_or_merge_selector(selectors: &mut Vec, count: usize, is_skip selectors.push(new_selector); } +/// Returns the length of the selectors in the selection. +fn selector_len(selection: &RowSelection) -> usize { + selection.iter().size_hint().0 +} + #[cfg(test)] #[allow(clippy::single_range_in_vec_init)] mod tests { use super::*; + #[test] + fn test_selector_len() { + let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]); + assert_eq!(selector_len(&selection), 2); + + let selection = RowSelection::from(vec![ + RowSelector::select(5), + RowSelector::skip(5), + RowSelector::select(5), + ]); + assert_eq!(selector_len(&selection), 3); + + let selection = RowSelection::from(vec![]); + assert_eq!(selector_len(&selection), 0); + } + #[test] fn test_single_contiguous_range() { let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10); diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 9cc81da453..a45dc5277f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -177,6 +177,7 @@ impl WorkerGroup { .index_metadata_size(config.index.metadata_cache_size.as_bytes()) .index_content_size(config.index.content_cache_size.as_bytes()) .index_content_page_size(config.index.content_cache_page_size.as_bytes()) + .index_result_cache_size(config.index.result_cache_size.as_bytes()) .puffin_metadata_size(config.index.metadata_cache_size.as_bytes()) .write_cache(write_cache) .build(), diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index aa373e1bbc..e67f939fc5 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1187,6 +1187,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "selector_result_cache_size =", "metadata_cache_size =", "content_cache_size =", + "result_cache_size =", "name =", "recovery_parallelism =", "max_background_flushes =",