mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: introduce index result cache (#6110)
* feat: introduce index result cache Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * Update src/mito2/src/sst/index/inverted_index/applier/builder.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * optimize selector_len Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * address comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -154,6 +154,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index results. Set it to `0` to disable the cache. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
@@ -494,6 +495,7 @@
|
||||
| `region_engine.mito.index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
|
||||
| `region_engine.mito.index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
|
||||
| `region_engine.mito.index.content_cache_page_size` | String | `64KiB` | Page size for inverted index content cache. |
|
||||
| `region_engine.mito.index.result_cache_size` | String | `128MiB` | Cache size for index results. Set it to `0` to disable the cache. |
|
||||
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
|
||||
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
|
||||
|
||||
@@ -499,6 +499,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index results. Set it to 0 to disable the cache.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
|
||||
@@ -590,6 +590,9 @@ content_cache_size = "128MiB"
|
||||
## Page size for inverted index content cache.
|
||||
content_cache_page_size = "64KiB"
|
||||
|
||||
## Cache size for index results. Set it to 0 to disable the cache.
|
||||
result_cache_size = "128MiB"
|
||||
|
||||
## The options for inverted index in Mito engine.
|
||||
[region_engine.mito.inverted_index]
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
use std::ops::Range;
|
||||
|
||||
use fastbloom::BloomFilter;
|
||||
@@ -25,10 +25,10 @@ use crate::Bytes;
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct InListPredicate {
|
||||
/// List of acceptable values.
|
||||
pub list: HashSet<Bytes>,
|
||||
pub list: BTreeSet<Bytes>,
|
||||
}
|
||||
|
||||
pub struct BloomFilterApplier {
|
||||
@@ -277,21 +277,21 @@ mod tests {
|
||||
// Single value predicates
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..4],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row05".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row05".to_vec()]),
|
||||
}],
|
||||
4..8,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row03".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row03".to_vec()]),
|
||||
}],
|
||||
4..8,
|
||||
vec![],
|
||||
@@ -299,14 +299,14 @@ mod tests {
|
||||
// Multiple values in a single predicate (OR logic)
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![4..12],
|
||||
@@ -314,7 +314,7 @@ mod tests {
|
||||
// Non-existent values
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row99".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row99".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![],
|
||||
@@ -322,7 +322,7 @@ mod tests {
|
||||
// Empty range
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec()]),
|
||||
}],
|
||||
12..12,
|
||||
vec![],
|
||||
@@ -330,21 +330,21 @@ mod tests {
|
||||
// Multiple values in a single predicate within specific ranges
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
|
||||
}],
|
||||
0..12,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![4..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
}],
|
||||
6..28,
|
||||
vec![6..8],
|
||||
@@ -352,21 +352,21 @@ mod tests {
|
||||
// Values spanning multiple segments
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![0..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
}],
|
||||
2..28,
|
||||
vec![2..8],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"overp".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overp".to_vec()]),
|
||||
}],
|
||||
0..10,
|
||||
vec![4..10],
|
||||
@@ -374,21 +374,21 @@ mod tests {
|
||||
// Duplicate values
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..12,
|
||||
vec![],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..16,
|
||||
vec![12..16],
|
||||
),
|
||||
(
|
||||
vec![InListPredicate {
|
||||
list: HashSet::from_iter([b"dup".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"dup".to_vec()]),
|
||||
}],
|
||||
0..28,
|
||||
vec![12..28],
|
||||
@@ -397,10 +397,10 @@ mod tests {
|
||||
(
|
||||
vec![
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
|
||||
},
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"seg00".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg00".to_vec()]),
|
||||
},
|
||||
],
|
||||
0..28,
|
||||
@@ -409,10 +409,10 @@ mod tests {
|
||||
(
|
||||
vec![
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"overl".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"overl".to_vec()]),
|
||||
},
|
||||
InListPredicate {
|
||||
list: HashSet::from_iter([b"seg01".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"seg01".to_vec()]),
|
||||
},
|
||||
],
|
||||
0..28,
|
||||
|
||||
@@ -183,7 +183,7 @@ impl TryFrom<Vec<Predicate>> for IntersectionFstApplier {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use super::*;
|
||||
use crate::inverted_index::error::Error;
|
||||
@@ -405,7 +405,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_intersection_fst_applier_with_in_list_predicate() {
|
||||
let result = IntersectionFstApplier::try_from(vec![Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([b"one".to_vec(), b"two".to_vec()]),
|
||||
list: BTreeSet::from_iter([b"one".to_vec(), b"two".to_vec()]),
|
||||
})]);
|
||||
assert!(matches!(
|
||||
result,
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
use std::mem::size_of;
|
||||
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -93,7 +93,7 @@ impl KeysFstApplier {
|
||||
|
||||
fn intersect_with_lists(in_lists: &mut [Predicate]) -> Vec<Bytes> {
|
||||
#[inline]
|
||||
fn get_list(p: &Predicate) -> &HashSet<Bytes> {
|
||||
fn get_list(p: &Predicate) -> &BTreeSet<Bytes> {
|
||||
match p {
|
||||
Predicate::InList(i) => &i.list,
|
||||
_ => unreachable!(), // `in_lists` is filtered by `split_at_in_lists`
|
||||
@@ -229,7 +229,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
@@ -252,7 +252,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from_filter_out_unmatched_keys() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::Range(RangePredicate {
|
||||
range: Range {
|
||||
@@ -300,7 +300,7 @@ mod tests {
|
||||
fn test_keys_fst_applier_try_from_with_invalid_regex() {
|
||||
let predicates = vec![
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
list: BTreeSet::from_iter(vec![b("foo"), b("bar")]),
|
||||
}),
|
||||
Predicate::RegexMatch(RegexMatchPredicate {
|
||||
pattern: "*invalid regex".to_string(),
|
||||
|
||||
@@ -12,12 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use crate::Bytes;
|
||||
|
||||
/// Enumerates types of predicates for value filtering.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum Predicate {
|
||||
/// Predicate for matching values in a list.
|
||||
InList(InListPredicate),
|
||||
@@ -31,14 +31,14 @@ pub enum Predicate {
|
||||
|
||||
/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
|
||||
/// one of the elements (logical OR semantic) for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct InListPredicate {
|
||||
/// List of acceptable values.
|
||||
pub list: HashSet<Bytes>,
|
||||
pub list: BTreeSet<Bytes>,
|
||||
}
|
||||
|
||||
/// `Bound` is a sub-component of a range, representing a single-sided limit that could be inclusive or exclusive.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct Bound {
|
||||
/// Whether the bound is inclusive or exclusive.
|
||||
pub inclusive: bool,
|
||||
@@ -48,7 +48,7 @@ pub struct Bound {
|
||||
|
||||
/// `Range` defines a single continuous range which can optionally have a lower and/or upper limit.
|
||||
/// Both the lower and upper bounds must be satisfied for the range condition to be true.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct Range {
|
||||
/// The lower bound of the range.
|
||||
pub lower: Option<Bound>,
|
||||
@@ -58,7 +58,7 @@ pub struct Range {
|
||||
|
||||
/// `RangePredicate` encapsulates a range condition that must be satisfied
|
||||
/// for the predicate to hold true (logical AND semantic between the bounds).
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct RangePredicate {
|
||||
/// The range condition.
|
||||
pub range: Range,
|
||||
@@ -66,7 +66,7 @@ pub struct RangePredicate {
|
||||
|
||||
/// `RegexMatchPredicate` encapsulates a single regex pattern. A value must match
|
||||
/// the pattern for the predicate to be satisfied.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct RegexMatchPredicate {
|
||||
/// The regex pattern.
|
||||
pub pattern: String,
|
||||
|
||||
@@ -29,6 +29,7 @@ use bytes::Bytes;
|
||||
use datatypes::value::Value;
|
||||
use datatypes::vectors::VectorRef;
|
||||
use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
|
||||
use index::result_cache::IndexResultCache;
|
||||
use moka::notification::RemovalCause;
|
||||
use moka::sync::Cache;
|
||||
use parquet::column::page::Page;
|
||||
@@ -242,6 +243,15 @@ impl CacheStrategy {
|
||||
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls [CacheManager::index_result_cache()].
|
||||
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
|
||||
pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
|
||||
match self {
|
||||
CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
|
||||
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Manages cached data for the engine.
|
||||
@@ -258,13 +268,15 @@ pub struct CacheManager {
|
||||
/// A Cache for writing files to object stores.
|
||||
write_cache: Option<WriteCacheRef>,
|
||||
/// Cache for inverted index.
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
inverted_index_cache: Option<InvertedIndexCacheRef>,
|
||||
/// Cache for bloom filter index.
|
||||
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
|
||||
/// Puffin metadata cache.
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
/// Cache for time series selectors.
|
||||
selector_result_cache: Option<SelectorResultCache>,
|
||||
/// Cache for index result.
|
||||
index_result_cache: Option<IndexResultCache>,
|
||||
}
|
||||
|
||||
pub type CacheManagerRef = Arc<CacheManager>;
|
||||
@@ -410,7 +422,7 @@ impl CacheManager {
|
||||
}
|
||||
|
||||
pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
|
||||
self.index_cache.as_ref()
|
||||
self.inverted_index_cache.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
|
||||
@@ -420,6 +432,10 @@ impl CacheManager {
|
||||
pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
|
||||
self.puffin_metadata_cache.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
|
||||
self.index_result_cache.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Increases selector cache miss metrics.
|
||||
@@ -441,6 +457,7 @@ pub struct CacheManagerBuilder {
|
||||
index_metadata_size: u64,
|
||||
index_content_size: u64,
|
||||
index_content_page_size: u64,
|
||||
index_result_cache_size: u64,
|
||||
puffin_metadata_size: u64,
|
||||
write_cache: Option<WriteCacheRef>,
|
||||
selector_result_cache_size: u64,
|
||||
@@ -489,6 +506,12 @@ impl CacheManagerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets cache size for index result.
|
||||
pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
|
||||
self.index_result_cache_size = bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets cache size for puffin metadata.
|
||||
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
|
||||
self.puffin_metadata_size = bytes;
|
||||
@@ -566,6 +589,8 @@ impl CacheManagerBuilder {
|
||||
self.index_content_size,
|
||||
self.index_content_page_size,
|
||||
);
|
||||
let index_result_cache = (self.index_result_cache_size != 0)
|
||||
.then(|| IndexResultCache::new(self.index_result_cache_size));
|
||||
let puffin_metadata_cache =
|
||||
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
|
||||
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
|
||||
@@ -588,10 +613,11 @@ impl CacheManagerBuilder {
|
||||
vector_cache,
|
||||
page_cache,
|
||||
write_cache: self.write_cache,
|
||||
index_cache: Some(Arc::new(inverted_index_cache)),
|
||||
inverted_index_cache: Some(Arc::new(inverted_index_cache)),
|
||||
bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
|
||||
puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
|
||||
selector_result_cache,
|
||||
index_result_cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
1
src/mito2/src/cache/index.rs
vendored
1
src/mito2/src/cache/index.rs
vendored
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod bloom_filter_index;
|
||||
pub mod inverted_index;
|
||||
pub mod result_cache;
|
||||
|
||||
use std::future::Future;
|
||||
use std::hash::Hash;
|
||||
|
||||
423
src/mito2/src/cache/index/result_cache.rs
vendored
Normal file
423
src/mito2/src/cache/index/result_cache.rs
vendored
Normal file
@@ -0,0 +1,423 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use index::bloom_filter::applier::InListPredicate;
|
||||
use index::inverted_index::search::predicate::{Predicate, RangePredicate};
|
||||
use moka::notification::RemovalCause;
|
||||
use moka::sync::Cache;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::index::fulltext_index::applier::builder::{
|
||||
FulltextQuery, FulltextRequest, FulltextTerm,
|
||||
};
|
||||
use crate::sst::parquet::row_selection::RowGroupSelection;
|
||||
|
||||
const INDEX_RESULT_TYPE: &str = "index_result";
|
||||
|
||||
/// Cache for storing index query results.
|
||||
pub struct IndexResultCache {
|
||||
cache: Cache<(PredicateKey, FileId), Arc<RowGroupSelection>>,
|
||||
}
|
||||
|
||||
impl IndexResultCache {
|
||||
/// Creates a new cache with the given capacity.
|
||||
pub fn new(capacity: u64) -> Self {
|
||||
fn to_str(cause: RemovalCause) -> &'static str {
|
||||
match cause {
|
||||
RemovalCause::Expired => "expired",
|
||||
RemovalCause::Explicit => "explicit",
|
||||
RemovalCause::Replaced => "replaced",
|
||||
RemovalCause::Size => "size",
|
||||
}
|
||||
}
|
||||
|
||||
let cache = Cache::builder()
|
||||
.max_capacity(capacity)
|
||||
.weigher(Self::index_result_cache_weight)
|
||||
.eviction_listener(|k, v, cause| {
|
||||
let size = Self::index_result_cache_weight(&k, &v);
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_RESULT_TYPE])
|
||||
.sub(size.into());
|
||||
CACHE_EVICTION
|
||||
.with_label_values(&[INDEX_RESULT_TYPE, to_str(cause)])
|
||||
.inc();
|
||||
})
|
||||
.build();
|
||||
Self { cache }
|
||||
}
|
||||
|
||||
/// Puts a query result into the cache.
|
||||
pub fn put(&self, key: PredicateKey, file_id: FileId, result: Arc<RowGroupSelection>) {
|
||||
let key = (key, file_id);
|
||||
let size = Self::index_result_cache_weight(&key, &result);
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_RESULT_TYPE])
|
||||
.add(size.into());
|
||||
self.cache.insert(key, result);
|
||||
}
|
||||
|
||||
/// Gets a query result from the cache.
|
||||
pub fn get(&self, key: &PredicateKey, file_id: FileId) -> Option<Arc<RowGroupSelection>> {
|
||||
let res = self.cache.get(&(key.clone(), file_id));
|
||||
if res.is_some() {
|
||||
CACHE_HIT.with_label_values(&[INDEX_RESULT_TYPE]).inc();
|
||||
} else {
|
||||
CACHE_MISS.with_label_values(&[INDEX_RESULT_TYPE]).inc()
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
/// Calculates the memory usage of a cache entry.
|
||||
fn index_result_cache_weight(k: &(PredicateKey, FileId), v: &Arc<RowGroupSelection>) -> u32 {
|
||||
k.0.mem_usage() as u32 + v.mem_usage() as u32
|
||||
}
|
||||
}
|
||||
|
||||
/// Key for different types of index predicates.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum PredicateKey {
|
||||
/// Fulltext index predicate.
|
||||
Fulltext(FulltextIndexKey),
|
||||
/// Bloom filter predicate.
|
||||
Bloom(BloomFilterKey),
|
||||
/// Inverted index predicate.
|
||||
Inverted(InvertedIndexKey),
|
||||
}
|
||||
|
||||
impl PredicateKey {
|
||||
/// Creates a new fulltext index key.
|
||||
pub fn new_fulltext(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
|
||||
Self::Fulltext(FulltextIndexKey::new(predicates))
|
||||
}
|
||||
|
||||
/// Creates a new bloom filter key.
|
||||
pub fn new_bloom(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
|
||||
Self::Bloom(BloomFilterKey::new(predicates))
|
||||
}
|
||||
|
||||
/// Creates a new inverted index key.
|
||||
pub fn new_inverted(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
|
||||
Self::Inverted(InvertedIndexKey::new(predicates))
|
||||
}
|
||||
|
||||
/// Returns the memory usage of this key.
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
match self {
|
||||
Self::Fulltext(key) => key.mem_usage,
|
||||
Self::Bloom(key) => key.mem_usage,
|
||||
Self::Inverted(key) => key.mem_usage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Key for fulltext index queries.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
|
||||
pub struct FulltextIndexKey {
|
||||
predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>,
|
||||
mem_usage: usize,
|
||||
}
|
||||
|
||||
impl FulltextIndexKey {
|
||||
/// Creates a new fulltext index key with the given predicates.
|
||||
/// Calculates memory usage based on the size of queries and terms.
|
||||
pub fn new(predicates: Arc<BTreeMap<ColumnId, FulltextRequest>>) -> Self {
|
||||
let mem_usage = predicates
|
||||
.values()
|
||||
.map(|request| {
|
||||
let query_size = request
|
||||
.queries
|
||||
.iter()
|
||||
.map(|query| query.0.len() + size_of::<FulltextQuery>())
|
||||
.sum::<usize>();
|
||||
let term_size = request
|
||||
.terms
|
||||
.iter()
|
||||
.map(|term| term.term.len() + size_of::<FulltextTerm>())
|
||||
.sum::<usize>();
|
||||
query_size + term_size
|
||||
})
|
||||
.sum();
|
||||
Self {
|
||||
predicates,
|
||||
mem_usage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Key for bloom filter queries.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
|
||||
pub struct BloomFilterKey {
|
||||
predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
|
||||
mem_usage: usize,
|
||||
}
|
||||
|
||||
impl BloomFilterKey {
|
||||
/// Creates a new bloom filter key with the given predicates.
|
||||
/// Calculates memory usage based on the size of predicate lists.
|
||||
pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>) -> Self {
|
||||
let mem_usage = predicates
|
||||
.values()
|
||||
.map(|predicates| {
|
||||
predicates
|
||||
.iter()
|
||||
.map(|predicate| predicate.list.iter().map(|list| list.len()).sum::<usize>())
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum();
|
||||
Self {
|
||||
predicates,
|
||||
mem_usage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Key for inverted index queries.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
|
||||
pub struct InvertedIndexKey {
|
||||
predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>,
|
||||
mem_usage: usize,
|
||||
}
|
||||
|
||||
impl InvertedIndexKey {
|
||||
/// Creates a new inverted index key with the given predicates.
|
||||
/// Calculates memory usage based on the type and size of predicates.
|
||||
pub fn new(predicates: Arc<BTreeMap<ColumnId, Vec<Predicate>>>) -> Self {
|
||||
let mem_usage = predicates
|
||||
.values()
|
||||
.map(|predicates| {
|
||||
predicates
|
||||
.iter()
|
||||
.map(|predicate| match predicate {
|
||||
Predicate::InList(predicate) => {
|
||||
predicate.list.iter().map(|list| list.len()).sum::<usize>()
|
||||
}
|
||||
Predicate::Range(_) => size_of::<RangePredicate>(),
|
||||
Predicate::RegexMatch(predicate) => predicate.pattern.len(),
|
||||
})
|
||||
.sum::<usize>()
|
||||
})
|
||||
.sum();
|
||||
|
||||
Self {
|
||||
predicates,
|
||||
mem_usage,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
#[allow(clippy::single_range_in_vec_init)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};
    use std::sync::Arc;

    use index::bloom_filter::applier::InListPredicate as BloomInListPredicate;
    use index::inverted_index::search::predicate::{Predicate, Range, RangePredicate};

    use super::*;
    use crate::sst::index::fulltext_index::applier::builder::{
        FulltextQuery, FulltextRequest, FulltextTerm,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// Weight the cache assigns to the entry `(key, file_id) -> selection`.
    fn weight_of(key: &PredicateKey, file_id: FileId, selection: &Arc<RowGroupSelection>) -> u32 {
        IndexResultCache::index_result_cache_weight(&(key.clone(), file_id), selection)
    }

    /// Weight expected from the key and selection memory usages.
    fn expected_weight(key: &PredicateKey, selection: &Arc<RowGroupSelection>) -> u32 {
        key.mem_usage() as u32 + selection.mem_usage() as u32
    }

    /// Builds a fulltext key for column 1 whose queries are long enough to
    /// outweigh a 100-byte cache; `ordinal` makes the key unique.
    fn long_query_key(ordinal: u32) -> PredicateKey {
        let request = FulltextRequest {
            queries: vec![
                FulltextQuery(format!(
                    "test query {ordinal} with a very long string to ensure large weight"
                )),
                FulltextQuery("another long query string".to_string()),
            ],
            terms: vec![],
        };
        PredicateKey::new_fulltext(Arc::new(BTreeMap::from([(1, request)])))
    }

    #[test]
    fn test_cache_basic_operations() {
        let cache = IndexResultCache::new(1000);
        let stored_file = FileId::random();

        // An empty predicate map is enough to exercise put/get.
        let key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            1,
            10,
        ));

        // A stored entry can be read back.
        cache.put(key.clone(), stored_file, selection.clone());
        let hit = cache
            .get(&key, stored_file)
            .expect("stored entry should be retrievable");
        assert_eq!(hit.as_ref().row_count(), selection.as_ref().row_count());

        // The same predicates under another file id miss.
        let other_file = FileId::random();
        assert!(cache.get(&key, other_file).is_none());
    }

    #[test]
    fn test_cache_capacity_limit() {
        // Small capacity (100 bytes) so that a single entry already exceeds it.
        let cache = IndexResultCache::new(100);
        let (file_id1, file_id2) = (FileId::random(), FileId::random());

        let key1 = long_query_key(1);
        let selection1 = Arc::new(RowGroupSelection::default());
        let key2 = long_query_key(2);
        let selection2 = Arc::new(RowGroupSelection::default());

        // Both entries must be heavier than the whole cache.
        assert!(weight_of(&key1, file_id1, &selection1) > 100);
        assert!(weight_of(&key2, file_id2, &selection2) > 100);

        // The first entry is visible right after insertion.
        cache.put(key1.clone(), file_id1, selection1.clone());
        let first = cache
            .get(&key1, file_id1)
            .expect("first entry should be in the cache");
        assert_eq!(first.as_ref().row_count(), selection1.as_ref().row_count());

        // Inserting the second entry should trigger eviction.
        cache.put(key2.clone(), file_id2, selection2.clone());
        let second = cache
            .get(&key2, file_id2)
            .expect("second entry should be in the cache");
        assert_eq!(second.as_ref().row_count(), selection2.as_ref().row_count());

        // Once pending maintenance has run, the first entry is gone.
        cache.cache.run_pending_tasks();
        assert!(
            cache.get(&key1, file_id1).is_none(),
            "First key should have been evicted"
        );
    }

    #[test]
    fn test_index_result_cache_weight() {
        let file_id = FileId::random();

        // Empty key and empty selection weigh nothing.
        let empty_key = PredicateKey::new_fulltext(Arc::new(BTreeMap::new()));
        let empty_selection = Arc::new(RowGroupSelection::default());
        let empty_weight = weight_of(&empty_key, file_id, &empty_selection);
        assert_eq!(empty_weight, 0);
        assert_eq!(empty_weight, expected_weight(&empty_key, &empty_selection));

        // Test 1: FulltextIndexKey
        let fulltext_request = FulltextRequest {
            queries: vec![FulltextQuery("test query".to_string())],
            terms: vec![FulltextTerm {
                col_lowered: false,
                term: "test term".to_string(),
            }],
        };
        let key1 = PredicateKey::new_fulltext(Arc::new(BTreeMap::from([(1, fulltext_request)])));
        let selection1 = Arc::new(RowGroupSelection::new(100, 250));
        let weight1 = weight_of(&key1, file_id, &selection1);
        assert!(weight1 > 0);
        assert_eq!(weight1, expected_weight(&key1, &selection1));

        // Test 2: BloomFilterKey
        let bloom_predicate = BloomInListPredicate {
            list: BTreeSet::from([b"test1".to_vec(), b"test2".to_vec()]),
        };
        let key2 = PredicateKey::new_bloom(Arc::new(BTreeMap::from([(1, vec![bloom_predicate])])));
        let selection2 = Arc::new(RowGroupSelection::from_row_ids(
            [1, 2, 3].into_iter().collect(),
            100,
            1,
        ));
        let weight2 = weight_of(&key2, file_id, &selection2);
        assert!(weight2 > 0);
        assert_eq!(weight2, expected_weight(&key2, &selection2));

        // Test 3: InvertedIndexKey
        let unbounded_range = Predicate::Range(RangePredicate {
            range: Range {
                lower: None,
                upper: None,
            },
        });
        let key3 =
            PredicateKey::new_inverted(Arc::new(BTreeMap::from([(1, vec![unbounded_range])])));
        let selection3 = Arc::new(RowGroupSelection::from_row_ranges(
            vec![(0, vec![5..15])],
            20,
        ));
        let weight3 = weight_of(&key3, file_id, &selection3);
        assert!(weight3 > 0);
        assert_eq!(weight3, expected_weight(&key3, &selection3));
    }
}
|
||||
@@ -265,6 +265,8 @@ impl MitoConfig {
|
||||
self.vector_cache_size = mem_cache_size;
|
||||
self.page_cache_size = page_cache_size;
|
||||
self.selector_result_cache_size = mem_cache_size;
|
||||
|
||||
self.index.adjust_buffer_and_cache_size(sys_memory);
|
||||
}
|
||||
|
||||
/// Enable write cache.
|
||||
@@ -315,6 +317,8 @@ pub struct IndexConfig {
|
||||
pub content_cache_size: ReadableSize,
|
||||
/// Page size for inverted index content.
|
||||
pub content_cache_page_size: ReadableSize,
|
||||
/// Cache size for index result. Setting it to 0 to disable the cache.
|
||||
pub result_cache_size: ReadableSize,
|
||||
}
|
||||
|
||||
impl Default for IndexConfig {
|
||||
@@ -327,6 +331,7 @@ impl Default for IndexConfig {
|
||||
metadata_cache_size: ReadableSize::mb(64),
|
||||
content_cache_size: ReadableSize::mb(128),
|
||||
content_cache_page_size: ReadableSize::kb(64),
|
||||
result_cache_size: ReadableSize::mb(128),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -365,6 +370,18 @@ impl IndexConfig {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn adjust_buffer_and_cache_size(&mut self, sys_memory: ReadableSize) {
|
||||
let cache_size = cmp::min(sys_memory / MEM_CACHE_SIZE_FACTOR, ReadableSize::mb(128));
|
||||
self.result_cache_size = cmp::min(self.result_cache_size, cache_size);
|
||||
self.content_cache_size = cmp::min(self.content_cache_size, cache_size);
|
||||
|
||||
let metadata_cache_size = cmp::min(
|
||||
sys_memory / SST_META_CACHE_SIZE_FACTOR,
|
||||
ReadableSize::mb(64),
|
||||
);
|
||||
self.metadata_cache_size = cmp::min(self.metadata_cache_size, metadata_cache_size);
|
||||
}
|
||||
}
|
||||
|
||||
/// Operational mode for certain actions.
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
|
||||
mod builder;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -33,6 +33,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::bloom_filter_index::{
|
||||
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
|
||||
};
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::error::{
|
||||
ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
|
||||
Result,
|
||||
@@ -71,7 +72,10 @@ pub struct BloomFilterIndexApplier {
|
||||
|
||||
/// Bloom filter predicates.
|
||||
/// For each column, the value will be retained only if it contains __all__ predicates.
|
||||
predicates: HashMap<ColumnId, Vec<InListPredicate>>,
|
||||
predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
|
||||
|
||||
/// Predicate key. Used to identify the predicate and fetch result from cache.
|
||||
predicate_key: PredicateKey,
|
||||
}
|
||||
|
||||
impl BloomFilterIndexApplier {
|
||||
@@ -83,8 +87,9 @@ impl BloomFilterIndexApplier {
|
||||
region_id: RegionId,
|
||||
object_store: ObjectStore,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
predicates: HashMap<ColumnId, Vec<InListPredicate>>,
|
||||
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
|
||||
) -> Self {
|
||||
let predicates = Arc::new(predicates);
|
||||
Self {
|
||||
region_dir,
|
||||
region_id,
|
||||
@@ -93,6 +98,7 @@ impl BloomFilterIndexApplier {
|
||||
puffin_manager_factory,
|
||||
puffin_metadata_cache: None,
|
||||
bloom_filter_index_cache: None,
|
||||
predicate_key: PredicateKey::new_bloom(predicates.clone()),
|
||||
predicates,
|
||||
}
|
||||
}
|
||||
@@ -150,7 +156,7 @@ impl BloomFilterIndexApplier {
|
||||
.map(|(i, range)| (*i, vec![range.clone()]))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for (column_id, predicates) in &self.predicates {
|
||||
for (column_id, predicates) in self.predicates.iter() {
|
||||
let blob = match self
|
||||
.blob_reader(file_id, *column_id, file_size_hint)
|
||||
.await?
|
||||
@@ -320,6 +326,11 @@ impl BloomFilterIndexApplier {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the predicate key.
|
||||
pub fn predicate_key(&self) -> &PredicateKey {
|
||||
&self.predicate_key
|
||||
}
|
||||
}
|
||||
|
||||
fn is_blob_not_found(err: &Error) -> bool {
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use common_telemetry::warn;
|
||||
use datafusion_common::ScalarValue;
|
||||
@@ -44,7 +44,7 @@ pub struct BloomFilterIndexApplierBuilder<'a> {
|
||||
file_cache: Option<FileCacheRef>,
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
|
||||
predicates: HashMap<ColumnId, Vec<InListPredicate>>,
|
||||
predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
|
||||
}
|
||||
|
||||
impl<'a> BloomFilterIndexApplierBuilder<'a> {
|
||||
@@ -62,7 +62,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
|
||||
file_cache: None,
|
||||
puffin_metadata_cache: None,
|
||||
bloom_filter_index_cache: None,
|
||||
predicates: HashMap::default(),
|
||||
predicates: BTreeMap::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,7 +168,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
|
||||
.entry(column_id)
|
||||
.or_default()
|
||||
.push(InListPredicate {
|
||||
list: HashSet::from([value]),
|
||||
list: BTreeSet::from([value]),
|
||||
});
|
||||
|
||||
Ok(())
|
||||
@@ -196,7 +196,7 @@ impl<'a> BloomFilterIndexApplierBuilder<'a> {
|
||||
.map(|lit| encode_lit(lit, data_type.clone()));
|
||||
|
||||
// Collect successful conversions
|
||||
let mut valid_predicates = HashSet::new();
|
||||
let mut valid_predicates = BTreeSet::new();
|
||||
for predicate in predicates {
|
||||
match predicate {
|
||||
Ok(p) => {
|
||||
@@ -323,7 +323,7 @@ mod tests {
|
||||
ConcreteDataType::string_datatype(),
|
||||
)
|
||||
.unwrap();
|
||||
assert_eq!(column_predicates[0].list, HashSet::from([expected]));
|
||||
assert_eq!(column_predicates[0].list, BTreeSet::from([expected]));
|
||||
}
|
||||
|
||||
fn int64_lit(i: i64) -> Expr {
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::{BTreeSet, HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, BTreeSet, HashSet};
|
||||
use std::iter;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
@@ -34,6 +34,7 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::bloom_filter_index::{
|
||||
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
|
||||
};
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::error::{
|
||||
ApplyBloomFilterIndexSnafu, ApplyFulltextIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu,
|
||||
PuffinReadBlobSnafu, Result,
|
||||
@@ -52,13 +53,16 @@ pub mod builder;
|
||||
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
|
||||
pub struct FulltextIndexApplier {
|
||||
/// Requests to be applied.
|
||||
requests: HashMap<ColumnId, FulltextRequest>,
|
||||
requests: Arc<BTreeMap<ColumnId, FulltextRequest>>,
|
||||
|
||||
/// The source of the index.
|
||||
index_source: IndexSource,
|
||||
|
||||
/// Cache for bloom filter index.
|
||||
bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
|
||||
|
||||
/// Predicate key. Used to identify the predicate and fetch result from cache.
|
||||
predicate_key: PredicateKey,
|
||||
}
|
||||
|
||||
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
|
||||
@@ -69,12 +73,14 @@ impl FulltextIndexApplier {
|
||||
region_dir: String,
|
||||
region_id: RegionId,
|
||||
store: ObjectStore,
|
||||
requests: HashMap<ColumnId, FulltextRequest>,
|
||||
requests: BTreeMap<ColumnId, FulltextRequest>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
) -> Self {
|
||||
let requests = Arc::new(requests);
|
||||
let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
|
||||
|
||||
Self {
|
||||
predicate_key: PredicateKey::new_fulltext(requests.clone()),
|
||||
requests,
|
||||
index_source,
|
||||
bloom_filter_index_cache: None,
|
||||
@@ -105,6 +111,11 @@ impl FulltextIndexApplier {
|
||||
self.bloom_filter_index_cache = bloom_filter_index_cache;
|
||||
self
|
||||
}
|
||||
|
||||
/// Returns the predicate key.
|
||||
pub fn predicate_key(&self) -> &PredicateKey {
|
||||
&self.predicate_key
|
||||
}
|
||||
}
|
||||
|
||||
impl FulltextIndexApplier {
|
||||
@@ -120,7 +131,7 @@ impl FulltextIndexApplier {
|
||||
.start_timer();
|
||||
|
||||
let mut row_ids: Option<BTreeSet<RowId>> = None;
|
||||
for (column_id, request) in &self.requests {
|
||||
for (column_id, request) in self.requests.iter() {
|
||||
if request.queries.is_empty() && request.terms.is_empty() {
|
||||
continue;
|
||||
}
|
||||
@@ -233,7 +244,7 @@ impl FulltextIndexApplier {
|
||||
let (input, mut output) = Self::init_coarse_output(row_groups);
|
||||
let mut applied = false;
|
||||
|
||||
for (column_id, request) in &self.requests {
|
||||
for (column_id, request) in self.requests.iter() {
|
||||
if request.terms.is_empty() {
|
||||
// only apply terms
|
||||
continue;
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
@@ -31,7 +31,7 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
/// A request for fulltext index.
|
||||
///
|
||||
/// It contains all the queries and terms for a column.
|
||||
#[derive(Default, Debug)]
|
||||
#[derive(Default, Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct FulltextRequest {
|
||||
pub queries: Vec<FulltextQuery>,
|
||||
pub terms: Vec<FulltextTerm>,
|
||||
@@ -65,14 +65,14 @@ impl FulltextRequest {
|
||||
/// A query to be matched in fulltext index.
///
/// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
///
/// `Clone`, `Eq` and `Hash` are derived so that a query can be part of a
/// hashable, comparable predicate key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FulltextQuery(pub String);
|
||||
|
||||
/// A term to be matched in fulltext index.
|
||||
///
|
||||
/// `term` is the term to be matched, e.g. "foo" in `SELECT * FROM t WHERE matches_term(text, "foo")`.
|
||||
/// `col_lowered` indicates whether the column is lowercased, e.g. `col_lowered = true` when `matches_term(lower(text), "foo")`.
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct FulltextTerm {
|
||||
pub col_lowered: bool,
|
||||
pub term: String,
|
||||
@@ -137,7 +137,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
|
||||
/// Builds `SstIndexApplier` from the given expressions.
|
||||
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
|
||||
let mut requests = HashMap::new();
|
||||
let mut requests = BTreeMap::new();
|
||||
for expr in exprs {
|
||||
Self::extract_requests(expr, self.metadata, &mut requests);
|
||||
}
|
||||
@@ -164,7 +164,7 @@ impl<'a> FulltextIndexApplierBuilder<'a> {
|
||||
fn extract_requests(
|
||||
expr: &Expr,
|
||||
metadata: &'a RegionMetadata,
|
||||
requests: &mut HashMap<ColumnId, FulltextRequest>,
|
||||
requests: &mut BTreeMap<ColumnId, FulltextRequest>,
|
||||
) {
|
||||
match expr {
|
||||
Expr::BinaryExpr(BinaryExpr {
|
||||
@@ -526,7 +526,7 @@ mod tests {
|
||||
func: matches_func(),
|
||||
});
|
||||
|
||||
let mut requests = HashMap::new();
|
||||
let mut requests = BTreeMap::new();
|
||||
FulltextIndexApplierBuilder::extract_requests(&matches_expr, &metadata, &mut requests);
|
||||
|
||||
assert_eq!(requests.len(), 1);
|
||||
@@ -565,7 +565,7 @@ mod tests {
|
||||
right: Box::new(matches_term_expr),
|
||||
});
|
||||
|
||||
let mut requests = HashMap::new();
|
||||
let mut requests = BTreeMap::new();
|
||||
FulltextIndexApplierBuilder::extract_requests(&binary_expr, &metadata, &mut requests);
|
||||
|
||||
assert_eq!(requests.len(), 1);
|
||||
|
||||
@@ -356,7 +356,7 @@ impl AltFulltextCreator {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::BTreeSet;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::SemanticType;
|
||||
@@ -573,7 +573,7 @@ mod tests {
|
||||
let object_store = object_store.clone();
|
||||
let factory = factory.clone();
|
||||
|
||||
let mut requests: HashMap<ColumnId, FulltextRequest> = HashMap::new();
|
||||
let mut requests: BTreeMap<ColumnId, FulltextRequest> = BTreeMap::new();
|
||||
|
||||
// Add queries
|
||||
for (column_id, query) in queries {
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
pub mod builder;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::range_read::RangeReader;
|
||||
@@ -22,15 +23,17 @@ use index::inverted_index::format::reader::InvertedIndexBlobReader;
|
||||
use index::inverted_index::search::index_apply::{
|
||||
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
|
||||
};
|
||||
use index::inverted_index::search::predicate::Predicate;
|
||||
use object_store::ObjectStore;
|
||||
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
|
||||
use puffin::puffin_manager::{PuffinManager, PuffinReader};
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
use store_api::storage::{ColumnId, RegionId};
|
||||
|
||||
use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
|
||||
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::inverted_index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::error::{
|
||||
ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
|
||||
};
|
||||
@@ -67,6 +70,9 @@ pub(crate) struct InvertedIndexApplier {
|
||||
|
||||
/// Puffin metadata cache.
|
||||
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
|
||||
|
||||
/// Predicate key. Used to identify the predicate and fetch result from cache.
|
||||
predicate_key: PredicateKey,
|
||||
}
|
||||
|
||||
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
|
||||
@@ -79,6 +85,7 @@ impl InvertedIndexApplier {
|
||||
store: ObjectStore,
|
||||
index_applier: Box<dyn IndexApplier>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
predicates: BTreeMap<ColumnId, Vec<Predicate>>,
|
||||
) -> Self {
|
||||
INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64);
|
||||
|
||||
@@ -91,6 +98,7 @@ impl InvertedIndexApplier {
|
||||
puffin_manager_factory,
|
||||
inverted_index_cache: None,
|
||||
puffin_metadata_cache: None,
|
||||
predicate_key: PredicateKey::new_inverted(Arc::new(predicates)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,6 +226,11 @@ impl InvertedIndexApplier {
|
||||
.await
|
||||
.context(PuffinBuildReaderSnafu)
|
||||
}
|
||||
|
||||
/// Returns the predicate key.
|
||||
pub fn predicate_key(&self) -> &PredicateKey {
|
||||
&self.predicate_key
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for InvertedIndexApplier {
|
||||
@@ -276,6 +289,7 @@ mod tests {
|
||||
object_store,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let output = sst_index_applier.apply(file_id, None).await.unwrap();
|
||||
assert_eq!(
|
||||
@@ -323,6 +337,7 @@ mod tests {
|
||||
object_store,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
Default::default(),
|
||||
);
|
||||
let res = sst_index_applier.apply(file_id, None).await;
|
||||
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
|
||||
|
||||
@@ -18,7 +18,7 @@ mod eq_list;
|
||||
mod in_list;
|
||||
mod regex_match;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
|
||||
use common_telemetry::warn;
|
||||
use datafusion_common::ScalarValue;
|
||||
@@ -59,7 +59,7 @@ pub(crate) struct InvertedIndexApplierBuilder<'a> {
|
||||
indexed_column_ids: HashSet<ColumnId>,
|
||||
|
||||
/// Stores predicates during traversal on the Expr tree.
|
||||
output: HashMap<ColumnId, Vec<Predicate>>,
|
||||
output: BTreeMap<ColumnId, Vec<Predicate>>,
|
||||
|
||||
/// The puffin manager factory.
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
@@ -85,7 +85,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
object_store,
|
||||
metadata,
|
||||
indexed_column_ids,
|
||||
output: HashMap::default(),
|
||||
output: BTreeMap::default(),
|
||||
puffin_manager_factory,
|
||||
file_cache: None,
|
||||
inverted_index_cache: None,
|
||||
@@ -130,8 +130,8 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
|
||||
let predicates = self
|
||||
.output
|
||||
.into_iter()
|
||||
.map(|(column_id, predicates)| (column_id.to_string(), predicates))
|
||||
.iter()
|
||||
.map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
|
||||
.collect();
|
||||
let applier = PredicatesIndexApplier::try_from(predicates);
|
||||
|
||||
@@ -142,6 +142,7 @@ impl<'a> InvertedIndexApplierBuilder<'a> {
|
||||
self.object_store,
|
||||
Box::new(applier.context(BuildIndexApplierSnafu)?),
|
||||
self.puffin_manager_factory,
|
||||
self.output,
|
||||
)
|
||||
.with_file_cache(self.file_cache)
|
||||
.with_puffin_metadata_cache(self.puffin_metadata_cache)
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
@@ -36,7 +36,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
};
|
||||
|
||||
let predicate = Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]),
|
||||
list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]),
|
||||
});
|
||||
self.add_predicate(column_id, predicate);
|
||||
Ok(())
|
||||
@@ -64,7 +64,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
};
|
||||
|
||||
let bytes = Self::encode_lit(lit, data_type.clone())?;
|
||||
let mut inlist = HashSet::from_iter([bytes]);
|
||||
let mut inlist = BTreeSet::from_iter([bytes]);
|
||||
|
||||
if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
|
||||
let predicate = Predicate::InList(InListPredicate { list: inlist });
|
||||
@@ -82,7 +82,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
column_name: &str,
|
||||
data_type: &ConcreteDataType,
|
||||
expr: &DfExpr,
|
||||
inlist: &mut HashSet<Bytes>,
|
||||
inlist: &mut BTreeSet<Bytes>,
|
||||
) -> Result<bool> {
|
||||
let DfExpr::BinaryExpr(BinaryExpr {
|
||||
left,
|
||||
@@ -122,6 +122,8 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::sst::index::inverted_index::applier::builder::tests::{
|
||||
@@ -154,13 +156,13 @@ mod tests {
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("foo")])
|
||||
list: BTreeSet::from_iter([encoded_string("foo")])
|
||||
})
|
||||
);
|
||||
assert_eq!(
|
||||
predicates[1],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("bar")])
|
||||
list: BTreeSet::from_iter([encoded_string("bar")])
|
||||
})
|
||||
);
|
||||
}
|
||||
@@ -187,7 +189,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("abc")])
|
||||
list: BTreeSet::from_iter([encoded_string("abc")])
|
||||
})
|
||||
);
|
||||
}
|
||||
@@ -275,7 +277,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([
|
||||
list: BTreeSet::from_iter([
|
||||
encoded_string("abc"),
|
||||
encoded_string("foo"),
|
||||
encoded_string("bar"),
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::collections::BTreeSet;
|
||||
|
||||
use datafusion_expr::expr::InList;
|
||||
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
|
||||
@@ -34,7 +34,7 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
};
|
||||
|
||||
let mut predicate = InListPredicate {
|
||||
list: HashSet::with_capacity(inlist.list.len()),
|
||||
list: BTreeSet::new(),
|
||||
};
|
||||
for lit in &inlist.list {
|
||||
let Some(lit) = Self::nonnull_lit(lit) else {
|
||||
@@ -53,6 +53,8 @@ impl InvertedIndexApplierBuilder<'_> {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use super::*;
|
||||
use crate::error::Error;
|
||||
use crate::sst::index::inverted_index::applier::builder::tests::{
|
||||
@@ -86,7 +88,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")])
|
||||
list: BTreeSet::from_iter([encoded_string("foo"), encoded_string("bar")])
|
||||
})
|
||||
);
|
||||
}
|
||||
@@ -140,7 +142,7 @@ mod tests {
|
||||
assert_eq!(
|
||||
predicates[0],
|
||||
Predicate::InList(InListPredicate {
|
||||
list: HashSet::from_iter([encoded_string("foo"), encoded_string("bar")])
|
||||
list: BTreeSet::from_iter([encoded_string("foo"), encoded_string("bar")])
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
|
||||
use store_api::storage::ColumnId;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::cache::index::result_cache::PredicateKey;
|
||||
use crate::cache::CacheStrategy;
|
||||
use crate::error::{
|
||||
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
|
||||
@@ -48,7 +49,7 @@ use crate::metrics::{
|
||||
use crate::read::prune::{PruneReader, Source};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::row_converter::build_primary_key_codec;
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::file::{FileHandle, FileId};
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierRef;
|
||||
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
|
||||
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
|
||||
@@ -60,6 +61,31 @@ use crate::sst::parquet::row_selection::RowGroupSelection;
|
||||
use crate::sst::parquet::stats::RowGroupPruningStats;
|
||||
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
|
||||
|
||||
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
|
||||
const INDEX_TYPE_INVERTED: &str = "inverted";
|
||||
const INDEX_TYPE_BLOOM: &str = "bloom filter";
|
||||
|
||||
/// Reports a failure to apply an index to an SST file.
///
/// Arguments: the error, a file handle exposing `region_id()` and `file_id()`,
/// and a human-readable index type name (e.g. one of the `INDEX_TYPE_*` constants).
///
/// - In test builds (`cfg(test)` or the `test` feature) the macro panics so the
///   failure cannot go unnoticed.
/// - Otherwise it only logs a warning; the caller decides how to continue.
macro_rules! handle_index_error {
    ($err:expr, $file_handle:expr, $index_type:expr) => {
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to apply {} index, region_id: {}, file_id: {}, err: {:?}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id(),
                $err
            );
        } else {
            warn!(
                $err; "Failed to apply {} index, region_id: {}, file_id: {}",
                $index_type,
                $file_handle.region_id(),
                $file_handle.file_id()
            );
        }
    };
}
|
||||
|
||||
/// Parquet SST reader builder.
|
||||
pub struct ParquetReaderBuilder {
|
||||
/// SST directory.
|
||||
@@ -346,34 +372,39 @@ impl ParquetReaderBuilder {
|
||||
|
||||
let mut output = RowGroupSelection::new(row_group_size, num_rows as _);
|
||||
|
||||
self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
|
||||
self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
|
||||
if output.is_empty() {
|
||||
return output;
|
||||
}
|
||||
|
||||
let fulltext_filtered = self
|
||||
.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
|
||||
.await;
|
||||
if output.is_empty() {
|
||||
return output;
|
||||
}
|
||||
|
||||
let inverted_filtered = self
|
||||
.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
|
||||
self.prune_row_groups_by_inverted_index(row_group_size, &mut output, metrics)
|
||||
.await;
|
||||
if output.is_empty() {
|
||||
return output;
|
||||
}
|
||||
|
||||
if !inverted_filtered {
|
||||
self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
|
||||
if output.is_empty() {
|
||||
return output;
|
||||
}
|
||||
}
|
||||
|
||||
self.prune_row_groups_by_bloom_filter(row_group_size, parquet_meta, &mut output, metrics)
|
||||
.await;
|
||||
if output.is_empty() {
|
||||
return output;
|
||||
}
|
||||
|
||||
self.prune_row_groups_by_fulltext_bloom(row_group_size, parquet_meta, &mut output, metrics)
|
||||
if !fulltext_filtered {
|
||||
self.prune_row_groups_by_fulltext_bloom(
|
||||
row_group_size,
|
||||
parquet_meta,
|
||||
&mut output,
|
||||
metrics,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
output
|
||||
}
|
||||
|
||||
@@ -392,46 +423,42 @@ impl ParquetReaderBuilder {
|
||||
return false;
|
||||
}
|
||||
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = match index_applier
|
||||
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.await
|
||||
{
|
||||
Ok(Some(res)) => res,
|
||||
Ok(None) => {
|
||||
return false;
|
||||
}
|
||||
Err(err) => {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to apply full-text index, region_id: {}, file_id: {}",
|
||||
self.file_handle.region_id(), self.file_handle.file_id()
|
||||
);
|
||||
}
|
||||
let predicate_key = index_applier.predicate_key();
|
||||
// Fast path: return early if the result is in the cache.
|
||||
if self.index_result_cache_get(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_FULLTEXT,
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply_fine(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(Some(res)) => {
|
||||
RowGroupSelection::from_row_ids(res, row_group_size, parquet_meta.num_row_groups())
|
||||
}
|
||||
Ok(None) => return false,
|
||||
Err(err) => {
|
||||
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let selection = RowGroupSelection::from_row_ids(
|
||||
apply_res,
|
||||
row_group_size,
|
||||
parquet_meta.num_row_groups(),
|
||||
self.apply_index_result_and_update_cache(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
selection,
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_FULLTEXT,
|
||||
);
|
||||
let intersection = output.intersect(&selection);
|
||||
|
||||
metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count();
|
||||
metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count();
|
||||
|
||||
*output = intersection;
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
@@ -449,44 +476,158 @@ impl ParquetReaderBuilder {
|
||||
let Some(index_applier) = &self.inverted_index_applier else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if !self.file_handle.meta_ref().inverted_index_available() {
|
||||
return false;
|
||||
}
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_output = match index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.await
|
||||
{
|
||||
Ok(output) => output,
|
||||
Err(err) => {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to apply inverted index, region_id: {}, file_id: {}",
|
||||
self.file_handle.region_id(), self.file_handle.file_id()
|
||||
);
|
||||
}
|
||||
|
||||
let predicate_key = index_applier.predicate_key();
|
||||
// Fast path: return early if the result is in the cache.
|
||||
if self.index_result_cache_get(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_INVERTED,
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_res = index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint))
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(output) => {
|
||||
RowGroupSelection::from_inverted_index_apply_output(row_group_size, output)
|
||||
}
|
||||
Err(err) => {
|
||||
handle_index_error!(err, self.file_handle, INDEX_TYPE_INVERTED);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let selection =
|
||||
RowGroupSelection::from_inverted_index_apply_output(row_group_size, apply_output);
|
||||
let intersection = output.intersect(&selection);
|
||||
self.apply_index_result_and_update_cache(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
selection,
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_INVERTED,
|
||||
);
|
||||
true
|
||||
}
|
||||
|
||||
metrics.rg_inverted_filtered += output.row_group_count() - intersection.row_group_count();
|
||||
metrics.rows_inverted_filtered += output.row_count() - intersection.row_count();
|
||||
async fn prune_row_groups_by_bloom_filter(
|
||||
&self,
|
||||
row_group_size: usize,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
output: &mut RowGroupSelection,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
) -> bool {
|
||||
let Some(index_applier) = &self.bloom_filter_index_applier else {
|
||||
return false;
|
||||
};
|
||||
if !self.file_handle.meta_ref().bloom_filter_index_available() {
|
||||
return false;
|
||||
}
|
||||
|
||||
*output = intersection;
|
||||
let predicate_key = index_applier.predicate_key();
|
||||
// Fast path: return early if the result is in the cache.
|
||||
if self.index_result_cache_get(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_BLOOM,
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let rgs = parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
|
||||
let apply_res = index_applier
|
||||
.apply(self.file_handle.file_id(), Some(file_size_hint), rgs)
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(apply_output) => RowGroupSelection::from_row_ranges(apply_output, row_group_size),
|
||||
Err(err) => {
|
||||
handle_index_error!(err, self.file_handle, INDEX_TYPE_BLOOM);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
self.apply_index_result_and_update_cache(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
selection,
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_BLOOM,
|
||||
);
|
||||
true
|
||||
}
|
||||
|
||||
async fn prune_row_groups_by_fulltext_bloom(
|
||||
&self,
|
||||
row_group_size: usize,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
output: &mut RowGroupSelection,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
) -> bool {
|
||||
let Some(index_applier) = &self.fulltext_index_applier else {
|
||||
return false;
|
||||
};
|
||||
if !self.file_handle.meta_ref().fulltext_index_available() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let predicate_key = index_applier.predicate_key();
|
||||
// Fast path: return early if the result is in the cache.
|
||||
if self.index_result_cache_get(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_FULLTEXT,
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slow path: apply the index from the file.
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let rgs = parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i)));
|
||||
let apply_res = index_applier
|
||||
.apply_coarse(self.file_handle.file_id(), Some(file_size_hint), rgs)
|
||||
.await;
|
||||
let selection = match apply_res {
|
||||
Ok(Some(apply_output)) => {
|
||||
RowGroupSelection::from_row_ranges(apply_output, row_group_size)
|
||||
}
|
||||
Ok(None) => return false,
|
||||
Err(err) => {
|
||||
handle_index_error!(err, self.file_handle, INDEX_TYPE_FULLTEXT);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
self.apply_index_result_and_update_cache(
|
||||
predicate_key,
|
||||
self.file_handle.file_id(),
|
||||
selection,
|
||||
output,
|
||||
metrics,
|
||||
INDEX_TYPE_FULLTEXT,
|
||||
);
|
||||
true
|
||||
}
|
||||
|
||||
@@ -533,126 +674,57 @@ impl ParquetReaderBuilder {
|
||||
true
|
||||
}
|
||||
|
||||
async fn prune_row_groups_by_bloom_filter(
|
||||
fn index_result_cache_get(
|
||||
&self,
|
||||
row_group_size: usize,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
predicate_key: &PredicateKey,
|
||||
file_id: FileId,
|
||||
output: &mut RowGroupSelection,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
index_type: &str,
|
||||
) -> bool {
|
||||
let Some(index_applier) = &self.bloom_filter_index_applier else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if !self.file_handle.meta_ref().bloom_filter_index_available() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_output = match index_applier
|
||||
.apply(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(apply_output) => apply_output,
|
||||
Err(err) => {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to apply bloom filter index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to apply bloom filter index, region_id: {}, file_id: {}",
|
||||
self.file_handle.region_id(), self.file_handle.file_id()
|
||||
);
|
||||
}
|
||||
|
||||
return false;
|
||||
if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
|
||||
let result = index_result_cache.get(predicate_key, file_id);
|
||||
if let Some(result) = result {
|
||||
apply_selection_and_update_metrics(output, &result, metrics, index_type);
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size);
|
||||
let intersection = output.intersect(&selection);
|
||||
|
||||
metrics.rg_bloom_filtered += output.row_group_count() - intersection.row_group_count();
|
||||
metrics.rows_bloom_filtered += output.row_count() - intersection.row_count();
|
||||
|
||||
*output = intersection;
|
||||
|
||||
true
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
async fn prune_row_groups_by_fulltext_bloom(
|
||||
fn apply_index_result_and_update_cache(
|
||||
&self,
|
||||
row_group_size: usize,
|
||||
parquet_meta: &ParquetMetaData,
|
||||
predicate_key: &PredicateKey,
|
||||
file_id: FileId,
|
||||
result: RowGroupSelection,
|
||||
output: &mut RowGroupSelection,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
) -> bool {
|
||||
let Some(index_applier) = &self.fulltext_index_applier else {
|
||||
return false;
|
||||
};
|
||||
index_type: &str,
|
||||
) {
|
||||
apply_selection_and_update_metrics(output, &result, metrics, index_type);
|
||||
|
||||
if !self.file_handle.meta_ref().fulltext_index_available() {
|
||||
return false;
|
||||
if let Some(index_result_cache) = &self.cache_strategy.index_result_cache() {
|
||||
index_result_cache.put(predicate_key.clone(), file_id, Arc::new(result));
|
||||
}
|
||||
|
||||
let file_size_hint = self.file_handle.meta_ref().index_file_size();
|
||||
let apply_output = match index_applier
|
||||
.apply_coarse(
|
||||
self.file_handle.file_id(),
|
||||
Some(file_size_hint),
|
||||
parquet_meta
|
||||
.row_groups()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, rg)| (rg.num_rows() as usize, output.contains_row_group(i))),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(apply_output)) => apply_output,
|
||||
Ok(None) => return false,
|
||||
Err(err) => {
|
||||
if cfg!(any(test, feature = "test")) {
|
||||
panic!(
|
||||
"Failed to apply fulltext index, region_id: {}, file_id: {}, err: {:?}",
|
||||
self.file_handle.region_id(),
|
||||
self.file_handle.file_id(),
|
||||
err
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
err; "Failed to apply fulltext index, region_id: {}, file_id: {}",
|
||||
self.file_handle.region_id(), self.file_handle.file_id()
|
||||
);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let selection = RowGroupSelection::from_row_ranges(apply_output, row_group_size);
|
||||
let intersection = output.intersect(&selection);
|
||||
|
||||
metrics.rg_fulltext_filtered += output.row_group_count() - intersection.row_group_count();
|
||||
metrics.rows_fulltext_filtered += output.row_count() - intersection.row_count();
|
||||
|
||||
*output = intersection;
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_selection_and_update_metrics(
|
||||
output: &mut RowGroupSelection,
|
||||
result: &RowGroupSelection,
|
||||
metrics: &mut ReaderFilterMetrics,
|
||||
index_type: &str,
|
||||
) {
|
||||
let intersection = output.intersect(result);
|
||||
|
||||
let row_group_count = output.row_group_count() - intersection.row_group_count();
|
||||
let row_count = output.row_count() - intersection.row_count();
|
||||
|
||||
metrics.update_index_metrics(index_type, row_group_count, row_count);
|
||||
|
||||
*output = intersection;
|
||||
}
|
||||
|
||||
/// Metrics of filtering rows groups and rows.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub(crate) struct ReaderFilterMetrics {
|
||||
@@ -729,6 +801,24 @@ impl ReaderFilterMetrics {
|
||||
.with_label_values(&["bloom_filter_index_filtered"])
|
||||
.inc_by(self.rows_bloom_filtered as u64);
|
||||
}
|
||||
|
||||
fn update_index_metrics(&mut self, index_type: &str, row_group_count: usize, row_count: usize) {
|
||||
match index_type {
|
||||
INDEX_TYPE_FULLTEXT => {
|
||||
self.rg_fulltext_filtered += row_group_count;
|
||||
self.rows_fulltext_filtered += row_count;
|
||||
}
|
||||
INDEX_TYPE_INVERTED => {
|
||||
self.rg_inverted_filtered += row_group_count;
|
||||
self.rows_inverted_filtered += row_count;
|
||||
}
|
||||
INDEX_TYPE_BLOOM => {
|
||||
self.rg_bloom_filtered += row_group_count;
|
||||
self.rows_bloom_filtered += row_count;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parquet reader metrics.
|
||||
|
||||
@@ -26,6 +26,8 @@ pub struct RowGroupSelection {
|
||||
selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
|
||||
/// Total number of rows in the selection.
|
||||
row_count: usize,
|
||||
/// Total length of the selectors.
|
||||
selector_len: usize,
|
||||
}
|
||||
|
||||
/// A row selection with its count.
|
||||
@@ -35,6 +37,8 @@ struct RowSelectionWithCount {
|
||||
selection: RowSelection,
|
||||
/// Number of rows in the selection.
|
||||
row_count: usize,
|
||||
/// Length of the selectors.
|
||||
selector_len: usize,
|
||||
}
|
||||
|
||||
impl RowGroupSelection {
|
||||
@@ -61,6 +65,7 @@ impl RowGroupSelection {
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count: row_group_size,
|
||||
selector_len: 1,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -68,6 +73,7 @@ impl RowGroupSelection {
|
||||
Self {
|
||||
selection_in_rg,
|
||||
row_count: total_row_count,
|
||||
selector_len: row_group_count,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,6 +115,7 @@ impl RowGroupSelection {
|
||||
|
||||
// Step 2: Group ranges by row group ID and create row selections
|
||||
let mut total_row_count = 0;
|
||||
let mut total_selector_len = 0;
|
||||
let selection_in_rg = row_group_ranges
|
||||
.chunk_by(|(row_group_id, _)| *row_group_id)
|
||||
.into_iter()
|
||||
@@ -122,12 +129,15 @@ impl RowGroupSelection {
|
||||
// by the min() operation above
|
||||
let selection = row_selection_from_row_ranges(ranges, row_group_size);
|
||||
let row_count = selection.row_count();
|
||||
let selector_len = selector_len(&selection);
|
||||
total_row_count += row_count;
|
||||
total_selector_len += selector_len;
|
||||
(
|
||||
row_group_id,
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count,
|
||||
selector_len,
|
||||
},
|
||||
)
|
||||
})
|
||||
@@ -136,6 +146,7 @@ impl RowGroupSelection {
|
||||
Self {
|
||||
selection_in_rg,
|
||||
row_count: total_row_count,
|
||||
selector_len: total_selector_len,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,18 +172,22 @@ impl RowGroupSelection {
|
||||
|
||||
// Step 2: Create row selections for each row group
|
||||
let mut total_row_count = 0;
|
||||
let mut total_selector_len = 0;
|
||||
let selection_in_rg = row_group_to_row_ids
|
||||
.into_iter()
|
||||
.map(|(row_group_id, row_ids)| {
|
||||
let selection =
|
||||
row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
|
||||
let row_count = selection.row_count();
|
||||
let selector_len = selector_len(&selection);
|
||||
total_row_count += row_count;
|
||||
total_selector_len += selector_len;
|
||||
(
|
||||
row_group_id,
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count,
|
||||
selector_len,
|
||||
},
|
||||
)
|
||||
})
|
||||
@@ -181,6 +196,7 @@ impl RowGroupSelection {
|
||||
Self {
|
||||
selection_in_rg,
|
||||
row_count: total_row_count,
|
||||
selector_len: total_selector_len,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,17 +217,21 @@ impl RowGroupSelection {
|
||||
row_group_size: usize,
|
||||
) -> Self {
|
||||
let mut total_row_count = 0;
|
||||
let mut total_selector_len = 0;
|
||||
let selection_in_rg = row_ranges
|
||||
.into_iter()
|
||||
.map(|(row_group_id, ranges)| {
|
||||
let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
|
||||
let row_count = selection.row_count();
|
||||
let selector_len = selector_len(&selection);
|
||||
total_row_count += row_count;
|
||||
total_selector_len += selector_len;
|
||||
(
|
||||
row_group_id,
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count,
|
||||
selector_len,
|
||||
},
|
||||
)
|
||||
})
|
||||
@@ -220,6 +240,7 @@ impl RowGroupSelection {
|
||||
Self {
|
||||
selection_in_rg,
|
||||
row_count: total_row_count,
|
||||
selector_len: total_selector_len,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -262,6 +283,7 @@ impl RowGroupSelection {
|
||||
pub fn intersect(&self, other: &Self) -> Self {
|
||||
let mut res = BTreeMap::new();
|
||||
let mut total_row_count = 0;
|
||||
let mut total_selector_len = 0;
|
||||
|
||||
for (rg_id, x) in other.selection_in_rg.iter() {
|
||||
let Some(y) = self.selection_in_rg.get(rg_id) else {
|
||||
@@ -269,13 +291,16 @@ impl RowGroupSelection {
|
||||
};
|
||||
let selection = x.selection.intersection(&y.selection);
|
||||
let row_count = selection.row_count();
|
||||
let selector_len = selector_len(&selection);
|
||||
if row_count > 0 {
|
||||
total_row_count += row_count;
|
||||
total_selector_len += selector_len;
|
||||
res.insert(
|
||||
*rg_id,
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count,
|
||||
selector_len,
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -284,6 +309,7 @@ impl RowGroupSelection {
|
||||
Self {
|
||||
selection_in_rg: res,
|
||||
row_count: total_row_count,
|
||||
selector_len: total_selector_len,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -304,21 +330,27 @@ impl RowGroupSelection {
|
||||
RowSelectionWithCount {
|
||||
selection,
|
||||
row_count,
|
||||
selector_len,
|
||||
},
|
||||
) = self.selection_in_rg.pop_first()?;
|
||||
|
||||
self.row_count -= row_count;
|
||||
self.selector_len -= selector_len;
|
||||
Some((row_group_id, selection))
|
||||
}
|
||||
|
||||
/// Removes a row group from the selection.
|
||||
pub fn remove_row_group(&mut self, row_group_id: usize) {
|
||||
let Some(RowSelectionWithCount { row_count, .. }) =
|
||||
self.selection_in_rg.remove(&row_group_id)
|
||||
let Some(RowSelectionWithCount {
|
||||
row_count,
|
||||
selector_len,
|
||||
..
|
||||
}) = self.selection_in_rg.remove(&row_group_id)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
self.row_count -= row_count;
|
||||
self.selector_len -= selector_len;
|
||||
}
|
||||
|
||||
/// Returns true if the selection is empty.
|
||||
@@ -337,6 +369,12 @@ impl RowGroupSelection {
|
||||
.iter()
|
||||
.map(|(row_group_id, x)| (row_group_id, &x.selection))
|
||||
}
|
||||
|
||||
/// Returns the memory usage of the selection.
|
||||
pub fn mem_usage(&self) -> usize {
|
||||
self.selector_len * size_of::<RowSelector>()
|
||||
+ self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
|
||||
@@ -420,11 +458,32 @@ fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip
|
||||
selectors.push(new_selector);
|
||||
}
|
||||
|
||||
/// Returns the length of the selectors in the selection.
|
||||
fn selector_len(selection: &RowSelection) -> usize {
|
||||
selection.iter().size_hint().0
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::single_range_in_vec_init)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
fn test_selector_len() {
    // (selectors used to build the selection, expected selector count)
    let cases: [(Vec<RowSelector>, usize); 3] = [
        (vec![RowSelector::skip(5), RowSelector::select(5)], 2),
        (
            vec![
                RowSelector::select(5),
                RowSelector::skip(5),
                RowSelector::select(5),
            ],
            3,
        ),
        (vec![], 0),
    ];

    for (selectors, expected) in cases {
        let selection = RowSelection::from(selectors);
        assert_eq!(selector_len(&selection), expected);
    }
}
|
||||
|
||||
#[test]
|
||||
fn test_single_contiguous_range() {
|
||||
let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
|
||||
|
||||
@@ -177,6 +177,7 @@ impl WorkerGroup {
|
||||
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
|
||||
.index_content_size(config.index.content_cache_size.as_bytes())
|
||||
.index_content_page_size(config.index.content_cache_page_size.as_bytes())
|
||||
.index_result_cache_size(config.index.result_cache_size.as_bytes())
|
||||
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
|
||||
.write_cache(write_cache)
|
||||
.build(),
|
||||
|
||||
@@ -1187,6 +1187,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
|
||||
"selector_result_cache_size =",
|
||||
"metadata_cache_size =",
|
||||
"content_cache_size =",
|
||||
"result_cache_size =",
|
||||
"name =",
|
||||
"recovery_parallelism =",
|
||||
"max_background_flushes =",
|
||||
|
||||
Reference in New Issue
Block a user