mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
feat(inverted_index): inverted index cache (#4309)
* feat/inverted-index-cache: Update dependencies and add caching for inverted index reader - Updated `atomic` to 0.6.0 and `uuid` to 1.9.1 in `Cargo.lock`. - Added `moka` and `uuid` dependencies in `Cargo.toml`. - Introduced `seek_read` method in `InvertedIndexBlobReader` for common seek and read operations. - Added `cache.rs` module to implement caching for inverted index reader using `moka`. - Updated `async-compression` to 0.4.11 in `puffin/Cargo.toml`. * feat/inverted-index-cache: Refactor InvertedIndexReader and Add Index Cache Support - Refactored `InvertedIndexReader` to include `seek_read` method and default implementations for `fst` and `bitmap`. - Implemented `seek_read` in `InvertedIndexBlobReader` and `CachedInvertedIndexBlobReader`. - Introduced `InvertedIndexCache` in `CacheManager` and `SstIndexApplier`. - Updated `SstIndexApplierBuilder` to accept and utilize `InvertedIndexCache`. - Added `From<FileId> for Uuid` implementation. * feat/inverted-index-cache: Update Cargo.toml and refactor SstIndexApplier - Moved `uuid.workspace` entry in Cargo.toml for better organization. * feat/inverted-index-cache: Refactor InvertedIndexCache to use type alias for Arc - Replaced `Arc<InvertedIndexCache>` with `InvertedIndexCacheRef` type alias. * feat/inverted-index-cache: Add Prometheus metrics and caching improvements for inverted index - Introduced `prometheus` and `puffin` dependencies for metrics. * feat/inverted-index-cache: Refactor InvertedIndexReader and Cache handling - Simplified `InvertedIndexReader` trait by removing seek-related comments. * feat/inverted-index-cache: Add configurable cache sizes for inverted index metadata and content - Introduced `index_metadata_size` and `index_content_size` in `CacheManagerBuilder`. * feat/inverted-index-cache: Refactor and optimize inverted index caching - Removed `metrics.rs` and integrated cache metrics into `index.rs`. 
* feat/inverted-index-cache: Remove unused dependencies from Cargo.lock and Cargo.toml - Removed `moka`, `prometheus`, and `puffin` dependencies from both Cargo.lock and Cargo.toml. * feat/inverted-index-cache: Replace Uuid with FileId in CachedInvertedIndexBlobReader - Updated `file_id` type from `Uuid` to `FileId` in `CachedInvertedIndexBlobReader` and related methods. * feat/inverted-index-cache: Refactor cache configuration for inverted index - Moved `inverted_index_metadata_cache_size` and `inverted_index_cache_size` from `MitoConfig` to `InvertedIndexConfig`. * feat/inverted-index-cache: Remove unnecessary conversion of `file_id` in `SstIndexApplier` - Simplified the initialization of `CachedInvertedIndexBlobReader` by removing the redundant `into()` conversion for `file_id`.
This commit is contained in:
16
Cargo.lock
generated
16
Cargo.lock
generated
@@ -738,9 +738,12 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.5.3"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
|
||||
checksum = "8d818003e740b63afc82337e3160717f4f63078720a810b7b903e70a5d1d2994"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
@@ -4993,6 +4996,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -12896,9 +12900,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.8.0"
|
||||
version = "1.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a183cf7feeba97b4dd1c0d46788634f6221d87fa961b305bed08c851829efcc0"
|
||||
checksum = "5de17fd2f7da591098415cff336e12965a28061ddace43b59cb3c430179c9439"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"getrandom",
|
||||
@@ -12909,9 +12913,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "uuid-macro-internal"
|
||||
version = "1.8.0"
|
||||
version = "1.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9881bea7cbe687e36c9ab3b778c36cd0487402e270304e8b1296d5085303c1a2"
|
||||
checksum = "a3ff64d5cde1e2cb5268bdb497235b6bd255ba8244f910dbc3574e59593de68c"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
||||
@@ -29,6 +29,7 @@ snafu.workspace = true
|
||||
tantivy = { version = "0.22", features = ["zstd-compression"] }
|
||||
tantivy-jieba = "0.11.0"
|
||||
tokio.workspace = true
|
||||
uuid.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
common-test-util.workspace = true
|
||||
|
||||
@@ -12,27 +12,41 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
mod blob;
|
||||
mod footer;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::inverted_index::error::Result;
|
||||
use crate::inverted_index::error::{DecodeFstSnafu, Result};
|
||||
pub use crate::inverted_index::format::reader::blob::InvertedIndexBlobReader;
|
||||
use crate::inverted_index::FstMap;
|
||||
|
||||
mod blob;
|
||||
mod footer;
|
||||
|
||||
/// InvertedIndexReader defines an asynchronous reader of inverted index data
|
||||
#[mockall::automock]
|
||||
#[async_trait]
|
||||
pub trait InvertedIndexReader: Send {
|
||||
/// Retrieve metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&mut self) -> Result<InvertedIndexMetas>;
|
||||
/// Reads all data to dest.
|
||||
async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize>;
|
||||
|
||||
/// Retrieve the finite state transducer (FST) map from the given offset and size.
|
||||
async fn fst(&mut self, offset: u64, size: u32) -> Result<FstMap>;
|
||||
/// Seeks to given offset and reads data with exact size as provided.
|
||||
async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;
|
||||
|
||||
/// Retrieve the bitmap from the given offset and size.
|
||||
async fn bitmap(&mut self, offset: u64, size: u32) -> Result<BitVec>;
|
||||
/// Retrieves metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
|
||||
|
||||
/// Retrieves the finite state transducer (FST) map from the given offset and size.
|
||||
async fn fst(&mut self, offset: u64, size: u32) -> Result<FstMap> {
|
||||
let fst_data = self.seek_read(offset, size).await?;
|
||||
FstMap::new(fst_data).context(DecodeFstSnafu)
|
||||
}
|
||||
|
||||
/// Retrieves the bitmap from the given offset and size.
|
||||
async fn bitmap(&mut self, offset: u64, size: u32) -> Result<BitVec> {
|
||||
self.seek_read(offset, size).await.map(BitVec::from_vec)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,18 +13,16 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::io::SeekFrom;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::{ensure, ResultExt};
|
||||
|
||||
use crate::inverted_index::error::{
|
||||
DecodeFstSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu,
|
||||
};
|
||||
use crate::inverted_index::error::{ReadSnafu, Result, SeekSnafu, UnexpectedBlobSizeSnafu};
|
||||
use crate::inverted_index::format::reader::footer::InvertedIndeFooterReader;
|
||||
use crate::inverted_index::format::reader::{FstMap, InvertedIndexReader};
|
||||
use crate::inverted_index::format::reader::InvertedIndexReader;
|
||||
use crate::inverted_index::format::MIN_BLOB_SIZE;
|
||||
|
||||
/// Inverted index blob reader, implements [`InvertedIndexReader`]
|
||||
@@ -52,35 +50,31 @@ impl<R> InvertedIndexBlobReader<R> {
|
||||
|
||||
#[async_trait]
|
||||
impl<R: AsyncRead + AsyncSeek + Unpin + Send> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
async fn metadata(&mut self) -> Result<InvertedIndexMetas> {
|
||||
async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize> {
|
||||
self.source
|
||||
.seek(SeekFrom::Start(0))
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
self.source.read_to_end(dest).await.context(ReadSnafu)
|
||||
}
|
||||
|
||||
async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
|
||||
self.source
|
||||
.seek(SeekFrom::Start(offset))
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; size as usize];
|
||||
self.source.read(&mut buf).await.context(ReadSnafu)?;
|
||||
Ok(buf)
|
||||
}
|
||||
|
||||
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
let end = SeekFrom::End(0);
|
||||
let blob_size = self.source.seek(end).await.context(SeekSnafu)?;
|
||||
Self::validate_blob_size(blob_size)?;
|
||||
|
||||
let mut footer_reader = InvertedIndeFooterReader::new(&mut self.source, blob_size);
|
||||
footer_reader.metadata().await
|
||||
}
|
||||
|
||||
async fn fst(&mut self, offset: u64, size: u32) -> Result<FstMap> {
|
||||
self.source
|
||||
.seek(SeekFrom::Start(offset))
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; size as usize];
|
||||
self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
|
||||
|
||||
FstMap::new(buf).context(DecodeFstSnafu)
|
||||
}
|
||||
|
||||
async fn bitmap(&mut self, offset: u64, size: u32) -> Result<BitVec> {
|
||||
self.source
|
||||
.seek(SeekFrom::Start(offset))
|
||||
.await
|
||||
.context(SeekSnafu)?;
|
||||
let mut buf = vec![0u8; size as usize];
|
||||
self.source.read_exact(&mut buf).await.context(ReadSnafu)?;
|
||||
|
||||
Ok(BitVec::from_vec(buf))
|
||||
footer_reader.metadata().await.map(Arc::new)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -148,6 +148,8 @@ impl TryFrom<Vec<(String, Vec<Predicate>)>> for PredicatesIndexApplier {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_base::bit_vec::prelude::*;
|
||||
use greptime_proto::v1::index::InvertedIndexMeta;
|
||||
|
||||
@@ -161,7 +163,7 @@ mod tests {
|
||||
s.to_owned()
|
||||
}
|
||||
|
||||
fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> InvertedIndexMetas {
|
||||
fn mock_metas(tags: impl IntoIterator<Item = (&'static str, u32)>) -> Arc<InvertedIndexMetas> {
|
||||
let mut metas = InvertedIndexMetas {
|
||||
total_row_count: 8,
|
||||
segment_row_count: 1,
|
||||
@@ -175,7 +177,7 @@ mod tests {
|
||||
};
|
||||
metas.metas.insert(s(tag), meta);
|
||||
}
|
||||
metas
|
||||
Arc::new(metas)
|
||||
}
|
||||
|
||||
fn key_fst_applier(value: &'static str) -> Box<dyn FstApplier> {
|
||||
@@ -300,11 +302,11 @@ mod tests {
|
||||
async fn test_index_applier_with_empty_index() {
|
||||
let mut mock_reader = MockInvertedIndexReader::new();
|
||||
mock_reader.expect_metadata().returning(move || {
|
||||
Ok(InvertedIndexMetas {
|
||||
Ok(Arc::new(InvertedIndexMetas {
|
||||
total_row_count: 0, // No rows
|
||||
segment_row_count: 1,
|
||||
..Default::default()
|
||||
})
|
||||
}))
|
||||
});
|
||||
|
||||
let mut mock_fst_applier = MockFstApplier::new();
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
mod cache_size;
|
||||
|
||||
pub(crate) mod file_cache;
|
||||
pub(crate) mod index;
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_util;
|
||||
pub(crate) mod write_cache;
|
||||
@@ -33,6 +34,7 @@ use store_api::storage::{ConcreteDataType, RegionId};
|
||||
|
||||
use crate::cache::cache_size::parquet_meta_size;
|
||||
use crate::cache::file_cache::{FileType, IndexKey};
|
||||
use crate::cache::index::{InvertedIndexCache, InvertedIndexCacheRef};
|
||||
use crate::cache::write_cache::WriteCacheRef;
|
||||
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
|
||||
use crate::sst::file::FileId;
|
||||
@@ -59,6 +61,8 @@ pub struct CacheManager {
|
||||
page_cache: Option<PageCache>,
|
||||
/// A Cache for writing files to object stores.
|
||||
write_cache: Option<WriteCacheRef>,
|
||||
/// Cache for inverted index.
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
}
|
||||
|
||||
pub type CacheManagerRef = Arc<CacheManager>;
|
||||
@@ -167,6 +171,10 @@ impl CacheManager {
|
||||
pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
|
||||
self.write_cache.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
|
||||
self.index_cache.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder to construct a [CacheManager].
|
||||
@@ -175,6 +183,8 @@ pub struct CacheManagerBuilder {
|
||||
sst_meta_cache_size: u64,
|
||||
vector_cache_size: u64,
|
||||
page_cache_size: u64,
|
||||
index_metadata_size: u64,
|
||||
index_content_size: u64,
|
||||
write_cache: Option<WriteCacheRef>,
|
||||
}
|
||||
|
||||
@@ -203,6 +213,18 @@ impl CacheManagerBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets cache size for index metadata.
|
||||
pub fn index_metadata_size(mut self, bytes: u64) -> Self {
|
||||
self.index_metadata_size = bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets cache size for index content.
|
||||
pub fn index_content_size(mut self, bytes: u64) -> Self {
|
||||
self.index_content_size = bytes;
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds the [CacheManager].
|
||||
pub fn build(self) -> CacheManager {
|
||||
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
|
||||
@@ -240,11 +262,14 @@ impl CacheManagerBuilder {
|
||||
.build()
|
||||
});
|
||||
|
||||
let inverted_index_cache =
|
||||
InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
|
||||
CacheManager {
|
||||
sst_meta_cache,
|
||||
vector_cache,
|
||||
page_cache,
|
||||
write_cache: self.write_cache,
|
||||
index_cache: Some(Arc::new(inverted_index_cache)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
211
src/mito2/src/cache/index.rs
vendored
Normal file
211
src/mito2/src/cache/index.rs
vendored
Normal file
@@ -0,0 +1,211 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::index::InvertedIndexMetas;
|
||||
use async_trait::async_trait;
|
||||
use common_base::BitVec;
|
||||
use index::inverted_index::error::DecodeFstSnafu;
|
||||
use index::inverted_index::format::reader::InvertedIndexReader;
|
||||
use index::inverted_index::FstMap;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
|
||||
use crate::sst::file::FileId;
|
||||
|
||||
/// Metrics for index metadata.
|
||||
const INDEX_METADATA_TYPE: &str = "index_metadata";
|
||||
/// Metrics for index content.
|
||||
const INDEX_CONTENT_TYPE: &str = "index_content";
|
||||
|
||||
/// Inverted index blob reader with cache.
|
||||
pub struct CachedInvertedIndexBlobReader<R> {
|
||||
file_id: FileId,
|
||||
inner: R,
|
||||
cache: InvertedIndexCacheRef,
|
||||
}
|
||||
|
||||
impl<R> CachedInvertedIndexBlobReader<R> {
|
||||
pub fn new(file_id: FileId, inner: R, cache: InvertedIndexCacheRef) -> Self {
|
||||
Self {
|
||||
file_id,
|
||||
inner,
|
||||
cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R> CachedInvertedIndexBlobReader<R>
|
||||
where
|
||||
R: InvertedIndexReader,
|
||||
{
|
||||
/// Gets given range of index data from cache, and loads from source if the file
|
||||
/// is not already cached.
|
||||
async fn get_or_load(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
) -> index::inverted_index::error::Result<Vec<u8>> {
|
||||
let range = offset as usize..(offset + size as u64) as usize;
|
||||
if let Some(cached) = self.cache.get_index(IndexKey {
|
||||
file_id: self.file_id,
|
||||
}) {
|
||||
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
|
||||
Ok(cached[range].to_vec())
|
||||
} else {
|
||||
let mut all_data = Vec::with_capacity(1024 * 1024);
|
||||
self.inner.read_all(&mut all_data).await?;
|
||||
let result = all_data[range].to_vec();
|
||||
self.cache.put_index(
|
||||
IndexKey {
|
||||
file_id: self.file_id,
|
||||
},
|
||||
Arc::new(all_data),
|
||||
);
|
||||
CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
|
||||
async fn read_all(
|
||||
&mut self,
|
||||
dest: &mut Vec<u8>,
|
||||
) -> index::inverted_index::error::Result<usize> {
|
||||
self.inner.read_all(dest).await
|
||||
}
|
||||
|
||||
async fn seek_read(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
) -> index::inverted_index::error::Result<Vec<u8>> {
|
||||
self.inner.seek_read(offset, size).await
|
||||
}
|
||||
|
||||
async fn metadata(&mut self) -> index::inverted_index::error::Result<Arc<InvertedIndexMetas>> {
|
||||
if let Some(cached) = self.cache.get_index_metadata(self.file_id) {
|
||||
CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(cached)
|
||||
} else {
|
||||
let meta = self.inner.metadata().await?;
|
||||
self.cache.put_index_metadata(self.file_id, meta.clone());
|
||||
CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
|
||||
Ok(meta)
|
||||
}
|
||||
}
|
||||
|
||||
async fn fst(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
) -> index::inverted_index::error::Result<FstMap> {
|
||||
self.get_or_load(offset, size)
|
||||
.await
|
||||
.and_then(|r| FstMap::new(r).context(DecodeFstSnafu))
|
||||
}
|
||||
|
||||
async fn bitmap(
|
||||
&mut self,
|
||||
offset: u64,
|
||||
size: u32,
|
||||
) -> index::inverted_index::error::Result<BitVec> {
|
||||
self.get_or_load(offset, size).await.map(BitVec::from_vec)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct IndexKey {
|
||||
file_id: FileId,
|
||||
}
|
||||
|
||||
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
|
||||
|
||||
pub struct InvertedIndexCache {
|
||||
/// Cache for inverted index metadata
|
||||
index_metadata: moka::sync::Cache<IndexKey, Arc<InvertedIndexMetas>>,
|
||||
/// Cache for inverted index content.
|
||||
index: moka::sync::Cache<IndexKey, Arc<Vec<u8>>>,
|
||||
}
|
||||
|
||||
impl InvertedIndexCache {
|
||||
/// Creates `InvertedIndexCache` with provided `index_metadata_cap` and `index_content_cap`.
|
||||
pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self {
|
||||
common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}");
|
||||
let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
|
||||
.name("inverted_index_metadata")
|
||||
.weigher(index_metadata_weight)
|
||||
.eviction_listener(|k, v, _cause| {
|
||||
let size = index_metadata_weight(&k, &v);
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_METADATA_TYPE])
|
||||
.sub(size.into());
|
||||
})
|
||||
.build();
|
||||
let index_cache = moka::sync::CacheBuilder::new(index_content_cap)
|
||||
.name("inverted_index_content")
|
||||
.weigher(index_content_weight)
|
||||
.eviction_listener(|k, v, _cause| {
|
||||
let size = index_content_weight(&k, &v);
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_CONTENT_TYPE])
|
||||
.sub(size.into());
|
||||
})
|
||||
.build();
|
||||
Self {
|
||||
index_metadata,
|
||||
index: index_cache,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InvertedIndexCache {
|
||||
pub fn get_index_metadata(&self, file_id: FileId) -> Option<Arc<InvertedIndexMetas>> {
|
||||
self.index_metadata.get(&IndexKey { file_id })
|
||||
}
|
||||
|
||||
pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc<InvertedIndexMetas>) {
|
||||
let key = IndexKey { file_id };
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_METADATA_TYPE])
|
||||
.add(index_metadata_weight(&key, &metadata).into());
|
||||
self.index_metadata.insert(key, metadata)
|
||||
}
|
||||
|
||||
// todo(hl): align index file content to pages with size like 4096 bytes.
|
||||
pub fn get_index(&self, key: IndexKey) -> Option<Arc<Vec<u8>>> {
|
||||
self.index.get(&key)
|
||||
}
|
||||
|
||||
pub fn put_index(&self, key: IndexKey, value: Arc<Vec<u8>>) {
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_CONTENT_TYPE])
|
||||
.add(index_content_weight(&key, &value).into());
|
||||
self.index.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculates weight for index metadata.
|
||||
fn index_metadata_weight(k: &IndexKey, v: &Arc<InvertedIndexMetas>) -> u32 {
|
||||
(k.file_id.as_bytes().len() + v.encoded_len()) as u32
|
||||
}
|
||||
|
||||
/// Calculates weight for index content.
|
||||
fn index_content_weight(k: &IndexKey, v: &Arc<Vec<u8>>) -> u32 {
|
||||
(k.file_id.as_bytes().len() + v.len()) as u32
|
||||
}
|
||||
@@ -381,6 +381,11 @@ pub struct InvertedIndexConfig {
|
||||
#[deprecated = "use [IndexConfig::write_buffer_size] instead"]
|
||||
#[serde(skip_serializing)]
|
||||
pub write_buffer_size: ReadableSize,
|
||||
|
||||
/// Cache size for metadata of inverted index. Setting it to 0 to disable the cache.
|
||||
pub metadata_cache_size: ReadableSize,
|
||||
/// Cache size for inverted index content. Setting it to 0 to disable the cache.
|
||||
pub content_cache_size: ReadableSize,
|
||||
}
|
||||
|
||||
impl Default for InvertedIndexConfig {
|
||||
@@ -392,9 +397,10 @@ impl Default for InvertedIndexConfig {
|
||||
apply_on_query: Mode::Auto,
|
||||
mem_threshold_on_create: MemoryThreshold::Auto,
|
||||
compress: true,
|
||||
|
||||
write_buffer_size: ReadableSize::mb(8),
|
||||
intermediate_path: String::new(),
|
||||
metadata_cache_size: ReadableSize::mb(32),
|
||||
content_cache_size: ReadableSize::mb(32),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,10 +330,17 @@ impl ScanRegion {
|
||||
Some(file_cache)
|
||||
}();
|
||||
|
||||
let index_cache = self
|
||||
.cache_manager
|
||||
.as_ref()
|
||||
.and_then(|c| c.index_cache())
|
||||
.cloned();
|
||||
|
||||
SstIndexApplierBuilder::new(
|
||||
self.access_layer.region_dir().to_string(),
|
||||
self.access_layer.object_store().clone(),
|
||||
file_cache,
|
||||
index_cache,
|
||||
self.version.metadata.as_ref(),
|
||||
self.version
|
||||
.options
|
||||
|
||||
@@ -63,6 +63,17 @@ impl FileId {
|
||||
pub fn as_puffin(&self) -> String {
|
||||
format!("{}{}", self, ".puffin")
|
||||
}
|
||||
|
||||
/// Converts [FileId] as byte slice.
|
||||
pub fn as_bytes(&self) -> &[u8] {
|
||||
self.0.as_bytes()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FileId> for Uuid {
|
||||
fn from(value: FileId) -> Self {
|
||||
value.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FileId {
|
||||
|
||||
@@ -27,6 +27,7 @@ use snafu::ResultExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
|
||||
use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
|
||||
use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
|
||||
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
|
||||
use crate::sst::file::FileId;
|
||||
@@ -55,6 +56,9 @@ pub(crate) struct SstIndexApplier {
|
||||
|
||||
/// The puffin manager factory.
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
|
||||
/// In-memory cache for inverted index.
|
||||
inverted_index_cache: Option<InvertedIndexCacheRef>,
|
||||
}
|
||||
|
||||
pub(crate) type SstIndexApplierRef = Arc<SstIndexApplier>;
|
||||
@@ -66,6 +70,7 @@ impl SstIndexApplier {
|
||||
region_id: RegionId,
|
||||
store: ObjectStore,
|
||||
file_cache: Option<FileCacheRef>,
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
index_applier: Box<dyn IndexApplier>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
) -> Self {
|
||||
@@ -78,6 +83,7 @@ impl SstIndexApplier {
|
||||
file_cache,
|
||||
index_applier,
|
||||
puffin_manager_factory,
|
||||
inverted_index_cache: index_cache,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,13 +105,24 @@ impl SstIndexApplier {
|
||||
self.remote_blob_reader(file_id).await?
|
||||
}
|
||||
};
|
||||
let mut blob_reader = InvertedIndexBlobReader::new(blob);
|
||||
let output = self
|
||||
.index_applier
|
||||
.apply(context, &mut blob_reader)
|
||||
.await
|
||||
.context(ApplyIndexSnafu)?;
|
||||
Ok(output)
|
||||
|
||||
if let Some(index_cache) = &self.inverted_index_cache {
|
||||
let mut index_reader = CachedInvertedIndexBlobReader::new(
|
||||
file_id,
|
||||
InvertedIndexBlobReader::new(blob),
|
||||
index_cache.clone(),
|
||||
);
|
||||
self.index_applier
|
||||
.apply(context, &mut index_reader)
|
||||
.await
|
||||
.context(ApplyIndexSnafu)
|
||||
} else {
|
||||
let mut index_reader = InvertedIndexBlobReader::new(blob);
|
||||
self.index_applier
|
||||
.apply(context, &mut index_reader)
|
||||
.await
|
||||
.context(ApplyIndexSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a blob reader from the cached index file.
|
||||
@@ -200,6 +217,7 @@ mod tests {
|
||||
RegionId::new(0, 0),
|
||||
object_store,
|
||||
None,
|
||||
None,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
);
|
||||
@@ -241,6 +259,7 @@ mod tests {
|
||||
RegionId::new(0, 0),
|
||||
object_store,
|
||||
None,
|
||||
None,
|
||||
Box::new(mock_index_applier),
|
||||
puffin_manager_factory,
|
||||
);
|
||||
|
||||
@@ -34,6 +34,7 @@ use store_api::metadata::RegionMetadata;
|
||||
use store_api::storage::ColumnId;
|
||||
|
||||
use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::index::InvertedIndexCacheRef;
|
||||
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
|
||||
use crate::row_converter::SortField;
|
||||
use crate::sst::index::inverted_index::applier::SstIndexApplier;
|
||||
@@ -62,6 +63,9 @@ pub(crate) struct SstIndexApplierBuilder<'a> {
|
||||
|
||||
/// The puffin manager factory.
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
|
||||
/// Cache for inverted index.
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
}
|
||||
|
||||
impl<'a> SstIndexApplierBuilder<'a> {
|
||||
@@ -70,6 +74,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
|
||||
region_dir: String,
|
||||
object_store: ObjectStore,
|
||||
file_cache: Option<FileCacheRef>,
|
||||
index_cache: Option<InvertedIndexCacheRef>,
|
||||
metadata: &'a RegionMetadata,
|
||||
ignore_column_ids: HashSet<ColumnId>,
|
||||
puffin_manager_factory: PuffinManagerFactory,
|
||||
@@ -81,6 +86,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
|
||||
metadata,
|
||||
ignore_column_ids,
|
||||
output: HashMap::default(),
|
||||
index_cache,
|
||||
puffin_manager_factory,
|
||||
}
|
||||
}
|
||||
@@ -102,11 +108,13 @@ impl<'a> SstIndexApplierBuilder<'a> {
|
||||
.map(|(column_id, predicates)| (column_id.to_string(), predicates))
|
||||
.collect();
|
||||
let applier = PredicatesIndexApplier::try_from(predicates);
|
||||
|
||||
Ok(Some(SstIndexApplier::new(
|
||||
self.region_dir,
|
||||
self.metadata.region_id,
|
||||
self.object_store,
|
||||
self.file_cache,
|
||||
self.index_cache,
|
||||
Box::new(applier.context(BuildIndexApplierSnafu)?),
|
||||
self.puffin_manager_factory,
|
||||
)))
|
||||
@@ -320,6 +328,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -76,6 +76,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -118,6 +119,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -143,6 +145,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -168,6 +171,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -194,6 +198,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -232,6 +232,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -260,6 +261,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -279,6 +281,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -299,6 +302,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -138,6 +138,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -175,6 +176,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -195,6 +197,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -214,6 +217,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -233,6 +237,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -291,6 +296,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -328,6 +334,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -69,6 +69,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -101,6 +102,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -125,6 +127,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -149,6 +152,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -175,6 +179,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -63,6 +63,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -91,6 +92,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -112,6 +114,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
@@ -133,6 +136,7 @@ mod tests {
|
||||
"test".to_string(),
|
||||
test_object_store(),
|
||||
None,
|
||||
None,
|
||||
&metadata,
|
||||
HashSet::default(),
|
||||
facotry,
|
||||
|
||||
@@ -304,6 +304,7 @@ mod tests {
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
use super::*;
|
||||
use crate::cache::index::InvertedIndexCache;
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
|
||||
use crate::sst::index::puffin_manager::PuffinManagerFactory;
|
||||
@@ -414,10 +415,12 @@ mod tests {
|
||||
|
||||
move |expr| {
|
||||
let _d = &d;
|
||||
let cache = Arc::new(InvertedIndexCache::new(10, 10));
|
||||
let applier = SstIndexApplierBuilder::new(
|
||||
region_dir.clone(),
|
||||
object_store.clone(),
|
||||
None,
|
||||
Some(cache),
|
||||
&region_metadata,
|
||||
Default::default(),
|
||||
factory.clone(),
|
||||
|
||||
@@ -158,6 +158,8 @@ impl WorkerGroup {
|
||||
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
|
||||
.vector_cache_size(config.vector_cache_size.as_bytes())
|
||||
.page_cache_size(config.page_cache_size.as_bytes())
|
||||
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
|
||||
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
|
||||
.write_cache(write_cache)
|
||||
.build(),
|
||||
);
|
||||
|
||||
@@ -839,6 +839,8 @@ create_on_compaction = "auto"
|
||||
apply_on_query = "auto"
|
||||
mem_threshold_on_create = "auto"
|
||||
compress = true
|
||||
metadata_cache_size = "32MiB"
|
||||
content_cache_size = "32MiB"
|
||||
|
||||
[region_engine.mito.fulltext_index]
|
||||
create_on_flush = "auto"
|
||||
|
||||
Reference in New Issue
Block a user