refactor: cache inverted index with fixed-size page (#5114)

* feat: cache inverted index by page instead of file

* fix: add unit test and fix bugs

* chore: typo

* chore: ci

* fix: math

* chore: apply review comments

* chore: renames

* test: add unit test for index key calculation

* refactor: use ReadableSize

* feat: add config for inverted index page size

* chore: update config file

* refactor: handle multiple range reads and fix some related bugs

* fix: add config

* test: switch to an fs reader to match the behavior of the object store
Authored by Yohan Wal on 2024-12-13 15:34:24 +08:00, committed by Yingwen
parent 8b1484c064, commit fdccf4ff84
18 changed files with 434 additions and 69 deletions

Cargo.lock (generated)

@@ -6643,6 +6643,7 @@ dependencies = [
"async-channel 1.9.0",
"async-stream",
"async-trait",
"bytemuck",
"bytes",
"common-base",
"common-config",


@@ -150,6 +150,7 @@
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
@@ -475,6 +476,9 @@
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.inverted_index.metadata_cache_size` | String | `64MiB` | Cache size for inverted index metadata. |
| `region_engine.mito.inverted_index.content_cache_size` | String | `128MiB` | Cache size for inverted index content. |
| `region_engine.mito.inverted_index.content_cache_page_size` | String | `8MiB` | Page size for inverted index content cache. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |


@@ -543,6 +543,15 @@ mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
## Cache size for inverted index metadata.
metadata_cache_size = "64MiB"
## Cache size for inverted index content.
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]


@@ -588,6 +588,9 @@ metadata_cache_size = "64MiB"
## Cache size for inverted index content.
content_cache_size = "128MiB"
## Page size for inverted index content cache.
content_cache_page_size = "8MiB"
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]


@@ -205,9 +205,7 @@ impl RangeReader for Vec<u8> {
})
}
- async fn read(&mut self, mut range: Range<u64>) -> io::Result<Bytes> {
- range.end = range.end.min(self.len() as u64);
+ async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
let bytes = Bytes::copy_from_slice(&self[range.start as usize..range.end as usize]);
Ok(bytes)
}
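With the clamp gone, `read` no longer silently truncates out-of-range requests, so callers must keep ranges in bounds themselves. A minimal standalone sketch of the behavioral difference (names and values illustrative, not from the diff):

    fn main() {
        let data: Vec<u8> = vec![1, 2, 3, 4];
        // Old behavior: read(0..10) clamped the end to data.len() and returned 4 bytes.
        // New behavior: slicing 0..10 panics, so callers (like the page cache below,
        // via prune_size) clamp the final range against the file size before reading.
        let requested = 0..10u64;
        let clamped = requested.start..requested.end.min(data.len() as u64);
        let bytes = &data[clamped.start as usize..clamped.end as usize];
        assert_eq!(bytes.len(), 4);
    }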


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
@@ -30,23 +31,23 @@ mod footer;
#[mockall::automock]
#[async_trait]
pub trait InvertedIndexReader: Send {
- /// Reads all data to dest.
- async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize>;
/// Seeks to given offset and reads data with exact size as provided.
- async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;
+ async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;
+ /// Reads the bytes in the given ranges.
+ async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>>;
/// Retrieves metadata of all inverted indices stored within the blob.
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
/// Retrieves the finite state transducer (FST) map from the given offset and size.
async fn fst(&mut self, offset: u64, size: u32) -> Result<FstMap> {
- let fst_data = self.seek_read(offset, size).await?;
+ let fst_data = self.range_read(offset, size).await?;
FstMap::new(fst_data).context(DecodeFstSnafu)
}
/// Retrieves the bitmap from the given offset and size.
async fn bitmap(&mut self, offset: u64, size: u32) -> Result<BitVec> {
- self.seek_read(offset, size).await.map(BitVec::from_vec)
+ self.range_read(offset, size).await.map(BitVec::from_vec)
}
}
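Since `fst` and `bitmap` each issue a single `range_read`, the new `read_vec` lets a consumer batch such fetches into one call. A sketch of that usage, assuming only the trait as shown (the function and its offsets are illustrative):

    use std::ops::Range;
    use index::inverted_index::format::reader::InvertedIndexReader;

    // Sketch: fetch an FST block and a bitmap block in one batched call
    // instead of two sequential range_read round trips.
    async fn load_fst_and_bitmap<R: InvertedIndexReader>(
        reader: &mut R,
    ) -> index::inverted_index::error::Result<(Vec<u8>, Vec<u8>)> {
        let ranges: [Range<u64>; 2] = [0..1024, 4096..4352]; // illustrative ranges
        let mut bufs = reader.read_vec(&ranges).await?;
        let bitmap = bufs.pop().unwrap();
        let fst = bufs.pop().unwrap();
        Ok((fst, bitmap))
    }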


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use async_trait::async_trait;
@@ -50,16 +51,7 @@ impl<R> InvertedIndexBlobReader<R> {
#[async_trait]
impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
- async fn read_all(&mut self, dest: &mut Vec<u8>) -> Result<usize> {
- let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
- self.source
- .read_into(0..metadata.content_length, dest)
- .await
- .context(CommonIoSnafu)?;
- Ok(metadata.content_length as usize)
- }
- async fn seek_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
+ async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>> {
let buf = self
.source
.read(offset..offset + size as u64)
@@ -68,6 +60,11 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
Ok(buf.into())
}
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>> {
let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
Ok(bufs.into_iter().map(|buf| buf.into()).collect())
}
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
let blob_size = metadata.content_length;


@@ -17,6 +17,7 @@ aquamarine.workspace = true
async-channel = "1.9"
async-stream.workspace = true
async-trait = "0.1"
bytemuck.workspace = true
bytes.workspace = true
common-base.workspace = true
common-config.workspace = true


@@ -244,6 +244,7 @@ pub struct CacheManagerBuilder {
page_cache_size: u64,
index_metadata_size: u64,
index_content_size: u64,
index_content_page_size: u64,
puffin_metadata_size: u64,
write_cache: Option<WriteCacheRef>,
selector_result_cache_size: u64,
@@ -286,6 +287,12 @@ impl CacheManagerBuilder {
self
}
/// Sets page size for index content.
pub fn index_content_page_size(mut self, bytes: u64) -> Self {
self.index_content_page_size = bytes;
self
}
/// Sets cache size for puffin metadata.
pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
self.puffin_metadata_size = bytes;
@@ -352,8 +359,11 @@ impl CacheManagerBuilder {
})
.build()
});
- let inverted_index_cache =
- InvertedIndexCache::new(self.index_metadata_size, self.index_content_size);
+ let inverted_index_cache = InvertedIndexCache::new(
+ self.index_metadata_size,
+ self.index_content_size,
+ self.index_content_page_size,
+ );
let puffin_metadata_cache =
PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
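For reference, a sketch of how the new knob threads through the builder, using the defaults from the config docs; a `Default` impl on `CacheManagerBuilder` is assumed here, not shown in the diff:

    // Sketch: the page size rides alongside the two existing capacities.
    let cache_manager = CacheManagerBuilder::default()
        .index_metadata_size(64 * 1024 * 1024) // 64 MiB metadata capacity
        .index_content_size(128 * 1024 * 1024) // 128 MiB content capacity
        .index_content_page_size(8 * 1024 * 1024) // 8 MiB pages in the content cache
        .build();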


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Range;
use std::sync::Arc;
use api::v1::index::InvertedIndexMetas;
@@ -34,14 +35,16 @@ const INDEX_CONTENT_TYPE: &str = "index_content";
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
file_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
}
impl<R> CachedInvertedIndexBlobReader<R> {
- pub fn new(file_id: FileId, inner: R, cache: InvertedIndexCacheRef) -> Self {
+ pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
Self {
file_id,
file_size,
inner,
cache,
}
@@ -59,43 +62,77 @@ where
offset: u64,
size: u32,
) -> index::inverted_index::error::Result<Vec<u8>> {
- let range = offset as usize..(offset + size as u64) as usize;
- if let Some(cached) = self.cache.get_index(IndexKey {
- file_id: self.file_id,
- }) {
- CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
- Ok(cached[range].to_vec())
- } else {
- let mut all_data = Vec::with_capacity(1024 * 1024);
- self.inner.read_all(&mut all_data).await?;
- let result = all_data[range].to_vec();
- self.cache.put_index(
- IndexKey {
- file_id: self.file_id,
- },
- Arc::new(all_data),
- );
- CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
- Ok(result)
- }
+ let keys =
+ IndexDataPageKey::generate_page_keys(self.file_id, offset, size, self.cache.page_size);
+ // If size is 0, return empty data.
+ if keys.is_empty() {
+ return Ok(Vec::new());
+ }
+ // TODO: Can be replaced by a non-contiguous structure like opendal::Buffer.
+ let mut data = Vec::with_capacity(keys.len());
+ data.resize(keys.len(), Arc::new(Vec::new()));
+ let mut cache_miss_range = vec![];
+ let mut cache_miss_idx = vec![];
+ let last_index = keys.len() - 1;
+ // TODO: Avoid copying as much as possible.
+ for (i, index) in keys.clone().into_iter().enumerate() {
+ match self.cache.get_index(&index) {
+ Some(page) => {
+ CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+ data[i] = page;
+ }
+ None => {
+ CACHE_MISS.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
+ let base_offset = index.page_id * self.cache.page_size;
+ let pruned_size = if i == last_index {
+ prune_size(&keys, self.file_size, self.cache.page_size)
+ } else {
+ self.cache.page_size
+ };
+ cache_miss_range.push(base_offset..base_offset + pruned_size);
+ cache_miss_idx.push(i);
+ }
+ }
+ }
+ if !cache_miss_range.is_empty() {
+ let pages = self.inner.read_vec(&cache_miss_range).await?;
+ for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
+ let page = Arc::new(page);
+ let key = keys[i].clone();
+ data[i] = page.clone();
+ self.cache.put_index(key, page.clone());
+ }
+ }
+ let mut result = Vec::with_capacity(size as usize);
+ data.iter().enumerate().for_each(|(i, page)| {
+ let range = if i == 0 {
+ IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
+ } else if i == last_index {
+ IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
+ } else {
+ 0..self.cache.page_size as usize
+ };
+ result.extend_from_slice(&page[range]);
+ });
+ Ok(result)
}
}
#[async_trait]
impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
- async fn read_all(
- &mut self,
- dest: &mut Vec<u8>,
- ) -> index::inverted_index::error::Result<usize> {
- self.inner.read_all(dest).await
- }
- async fn seek_read(
+ async fn range_read(
&mut self,
offset: u64,
size: u32,
) -> index::inverted_index::error::Result<Vec<u8>> {
- self.inner.seek_read(offset, size).await
+ self.inner.range_read(offset, size).await
}
async fn read_vec(
&mut self,
ranges: &[Range<u64>],
) -> index::inverted_index::error::Result<Vec<Vec<u8>>> {
self.inner.read_vec(ranges).await
}
async fn metadata(&mut self) -> index::inverted_index::error::Result<Arc<InvertedIndexMetas>> {
@@ -130,22 +167,81 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
- pub struct IndexKey {
+ pub struct IndexMetadataKey {
file_id: FileId,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexDataPageKey {
file_id: FileId,
page_id: u64,
}
impl IndexDataPageKey {
/// Converts an offset to a page ID based on the page size.
fn calculate_page_id(offset: u64, page_size: u64) -> u64 {
offset / page_size
}
/// Calculates the total number of pages that a given size spans, starting from a specific offset.
fn calculate_page_count(offset: u64, size: u32, page_size: u64) -> u32 {
let start_page = Self::calculate_page_id(offset, page_size);
let end_page = Self::calculate_page_id(offset + (size as u64) - 1, page_size);
(end_page + 1 - start_page) as u32
}
/// Computes the byte range in the first page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with page_size of 4096, the first page range is 1000..4096.
fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let start = (offset % page_size) as usize;
let end = if size > page_size as u32 - start as u32 {
page_size as usize
} else {
start + size as usize
};
start..end
}
/// Computes the byte range in the last page based on the offset and size.
/// For example, if offset is 1000 and size is 5000 with page_size of 4096, the last page range is 0..1904.
fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
let offset = offset as usize;
let size = size as usize;
let page_size = page_size as usize;
if (offset + size) % page_size == 0 {
0..page_size
} else {
0..((offset + size) % page_size)
}
}
/// Generates a vector of `IndexDataPageKey` instances for the pages that a given offset and size span.
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<Self> {
let start_page = Self::calculate_page_id(offset, page_size);
let total_pages = Self::calculate_page_count(offset, size, page_size);
(0..total_pages)
.map(|i| Self {
file_id,
page_id: start_page + i as u64,
})
.collect()
}
}
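To make the page math concrete, here is the doc-comment example worked end to end (a sketch; the helpers are private, so this only runs inside this module's tests):

    // offset = 1000, size = 5000, page_size = 4096:
    //   start page = 1000 / 4096 = 0; end page = 5999 / 4096 = 1, so 2 pages.
    //   The first page contributes bytes 1000..4096 (3096 bytes) and the last
    //   page contributes bytes 0..1904 (1904 bytes); 3096 + 1904 = 5000.
    let (offset, size, page_size) = (1000u64, 5000u32, 4096u64);
    assert_eq!(IndexDataPageKey::calculate_page_count(offset, size, page_size), 2);
    assert_eq!(
        IndexDataPageKey::calculate_first_page_range(offset, size, page_size),
        1000..4096
    );
    assert_eq!(
        IndexDataPageKey::calculate_last_page_range(offset, size, page_size),
        0..1904
    );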
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
pub struct InvertedIndexCache {
/// Cache for inverted index metadata
- index_metadata: moka::sync::Cache<IndexKey, Arc<InvertedIndexMetas>>,
+ index_metadata: moka::sync::Cache<IndexMetadataKey, Arc<InvertedIndexMetas>>,
/// Cache for inverted index content.
- index: moka::sync::Cache<IndexKey, Arc<Vec<u8>>>,
+ index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
// Page size for index content.
page_size: u64,
}
impl InvertedIndexCache {
/// Creates `InvertedIndexCache` with provided `index_metadata_cap`, `index_content_cap` and `page_size`.
- pub fn new(index_metadata_cap: u64, index_content_cap: u64) -> Self {
+ pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
common_telemetry::debug!("Building InvertedIndexCache with metadata size: {index_metadata_cap}, content size: {index_content_cap}");
let index_metadata = moka::sync::CacheBuilder::new(index_metadata_cap)
.name("inverted_index_metadata")
@@ -170,29 +266,29 @@ impl InvertedIndexCache {
Self {
index_metadata,
index: index_cache,
page_size,
}
}
}
impl InvertedIndexCache {
pub fn get_index_metadata(&self, file_id: FileId) -> Option<Arc<InvertedIndexMetas>> {
- self.index_metadata.get(&IndexKey { file_id })
+ self.index_metadata.get(&IndexMetadataKey { file_id })
}
pub fn put_index_metadata(&self, file_id: FileId, metadata: Arc<InvertedIndexMetas>) {
- let key = IndexKey { file_id };
+ let key = IndexMetadataKey { file_id };
CACHE_BYTES
.with_label_values(&[INDEX_METADATA_TYPE])
.add(index_metadata_weight(&key, &metadata).into());
self.index_metadata.insert(key, metadata)
}
- // todo(hl): align index file content to pages with size like 4096 bytes.
- pub fn get_index(&self, key: IndexKey) -> Option<Arc<Vec<u8>>> {
- self.index.get(&key)
+ pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Arc<Vec<u8>>> {
+ self.index.get(key)
}
- pub fn put_index(&self, key: IndexKey, value: Arc<Vec<u8>>) {
+ pub fn put_index(&self, key: IndexDataPageKey, value: Arc<Vec<u8>>) {
CACHE_BYTES
.with_label_values(&[INDEX_CONTENT_TYPE])
.add(index_content_weight(&key, &value).into());
@@ -201,11 +297,234 @@ impl InvertedIndexCache {
}
/// Calculates weight for index metadata.
- fn index_metadata_weight(k: &IndexKey, v: &Arc<InvertedIndexMetas>) -> u32 {
+ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc<InvertedIndexMetas>) -> u32 {
(k.file_id.as_bytes().len() + v.encoded_len()) as u32
}
/// Calculates weight for index content.
- fn index_content_weight(k: &IndexKey, v: &Arc<Vec<u8>>) -> u32 {
+ fn index_content_weight(k: &IndexDataPageKey, v: &Arc<Vec<u8>>) -> u32 {
(k.file_id.as_bytes().len() + v.len()) as u32
}
/// Prunes the size of the last page based on the indexes.
/// There are two cases:
/// 1. The remaining file size is less than the page size: read to the end of the file.
/// 2. Otherwise, read a full page.
fn prune_size(indexes: &[IndexDataPageKey], file_size: u64, page_size: u64) -> u64 {
let last_page_start = indexes.last().map(|i| i.page_id * page_size).unwrap_or(0);
page_size.min(file_size - last_page_start)
}
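And the pruning arithmetic on a concrete file (again a sketch scoped to this module's tests, since both helpers are private):

    // A 10_000-byte file with 4096-byte pages spans pages 0..=2; the last page
    // starts at 8192, so only 10_000 - 8192 = 1808 bytes remain to read.
    let file_id = FileId::random();
    let keys = IndexDataPageKey::generate_page_keys(file_id, 0, 10_000, 4096);
    assert_eq!(keys.len(), 3);
    assert_eq!(prune_size(&keys, 10_000, 4096), 1808);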
#[cfg(test)]
mod test {
use std::num::NonZeroUsize;
use common_base::BitVec;
use futures::stream;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
use index::inverted_index::Bytes;
use prometheus::register_int_counter_vec;
use rand::{Rng, RngCore};
use super::*;
use crate::sst::index::store::InstrumentedStore;
use crate::test_util::TestEnv;
// Fuzz test for index data page key
#[test]
fn fuzz_index_calculation() {
// randomly generate a large u8 array
let mut rng = rand::thread_rng();
let mut data = vec![0u8; 1024 * 1024];
rng.fill_bytes(&mut data);
let file_id = FileId::random();
for _ in 0..100 {
let offset = rng.gen_range(0..data.len() as u64);
// Use a non-zero size so at least one page key is generated (size 0 would underflow the page math).
let size = rng.gen_range(1..=data.len() as u32 - offset as u32);
let page_size: usize = rng.gen_range(1..1024);
let indexes =
IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
let page_num = indexes.len();
let mut read = Vec::with_capacity(size as usize);
let last_index = indexes.len() - 1;
for (i, key) in indexes.into_iter().enumerate() {
let start = key.page_id as usize * page_size;
let page = if start + page_size < data.len() {
&data[start..start + page_size]
} else {
&data[start..]
};
let range = if i == 0 {
// first page range
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
} else if i == last_index {
// last page range. when the first page is the last page, the range is not used.
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
} else {
0..page_size
};
read.extend_from_slice(&page[range]);
}
let expected_range = offset as usize..(offset + size as u64) as usize;
if read != data.get(expected_range).unwrap() {
panic!(
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
offset, size, page_size, read.len(), size as usize,
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
);
}
}
}
fn unpack(fst_value: u64) -> [u32; 2] {
bytemuck::cast::<u64, [u32; 2]>(fst_value)
}
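The cast relies on the FST value packing two u32s into a u64; a sketch of the round trip, assuming a little-endian target (which is what the `[offset, size]` destructuring below expects):

    // Sketch: size in the high 32 bits, offset in the low 32 bits.
    let (offset, size) = (1234u32, 56u32);
    let packed = ((size as u64) << 32) | offset as u64;
    assert_eq!(bytemuck::cast::<u64, [u32; 2]>(packed), [offset, size]);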
async fn create_inverted_index_blob() -> Vec<u8> {
let mut blob = Vec::new();
let mut writer = InvertedIndexBlobWriter::new(&mut blob);
writer
.add_index(
"tag0".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
Box::new(stream::iter(vec![
Ok((Bytes::from("a"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("b"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("c"), BitVec::from_slice(&[0b0000_0001]))),
])),
)
.await
.unwrap();
writer
.add_index(
"tag1".to_string(),
BitVec::from_slice(&[0b0000_0001, 0b0000_0000]),
Box::new(stream::iter(vec![
Ok((Bytes::from("x"), BitVec::from_slice(&[0b0000_0001]))),
Ok((Bytes::from("y"), BitVec::from_slice(&[0b0010_0000]))),
Ok((Bytes::from("z"), BitVec::from_slice(&[0b0000_0001]))),
])),
)
.await
.unwrap();
writer
.finish(8, NonZeroUsize::new(1).unwrap())
.await
.unwrap();
blob
}
#[tokio::test]
async fn test_inverted_index_cache() {
let blob = create_inverted_index_blob().await;
// Init a test range reader in local fs.
let mut env = TestEnv::new();
let file_size = blob.len() as u64;
let store = env.init_object_store_manager();
let temp_path = "data";
store.write(temp_path, blob).await.unwrap();
let store = InstrumentedStore::new(store);
let metric =
register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
let counter = metric.with_label_values(&["test"]);
let range_reader = store
.range_reader("data", &counter, &counter)
.await
.unwrap();
let reader = InvertedIndexBlobReader::new(range_reader);
let mut cached_reader = CachedInvertedIndexBlobReader::new(
FileId::random(),
file_size,
reader,
Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
);
let metadata = cached_reader.metadata().await.unwrap();
assert_eq!(metadata.total_row_count, 8);
assert_eq!(metadata.segment_row_count, 1);
assert_eq!(metadata.metas.len(), 2);
// tag0
let tag0 = metadata.metas.get("tag0").unwrap();
let stats0 = tag0.stats.as_ref().unwrap();
assert_eq!(stats0.distinct_count, 3);
assert_eq!(stats0.null_count, 1);
assert_eq!(stats0.min_value, Bytes::from("a"));
assert_eq!(stats0.max_value, Bytes::from("c"));
let fst0 = cached_reader
.fst(
tag0.base_offset + tag0.relative_fst_offset as u64,
tag0.fst_size,
)
.await
.unwrap();
assert_eq!(fst0.len(), 3);
let [offset, size] = unpack(fst0.get(b"a").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
let [offset, size] = unpack(fst0.get(b"b").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
let [offset, size] = unpack(fst0.get(b"c").unwrap());
let bitmap = cached_reader
.bitmap(tag0.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
// tag1
let tag1 = metadata.metas.get("tag1").unwrap();
let stats1 = tag1.stats.as_ref().unwrap();
assert_eq!(stats1.distinct_count, 3);
assert_eq!(stats1.null_count, 1);
assert_eq!(stats1.min_value, Bytes::from("x"));
assert_eq!(stats1.max_value, Bytes::from("z"));
let fst1 = cached_reader
.fst(
tag1.base_offset + tag1.relative_fst_offset as u64,
tag1.fst_size,
)
.await
.unwrap();
assert_eq!(fst1.len(), 3);
let [offset, size] = unpack(fst1.get(b"x").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
let [offset, size] = unpack(fst1.get(b"y").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0010_0000]));
let [offset, size] = unpack(fst1.get(b"z").unwrap());
let bitmap = cached_reader
.bitmap(tag1.base_offset + offset as u64, size)
.await
.unwrap();
assert_eq!(bitmap, BitVec::from_slice(&[0b0000_0001]));
// fuzz test
let mut rng = rand::thread_rng();
for _ in 0..100 {
let offset = rng.gen_range(0..file_size);
// Use a non-zero size so at least one page key is generated.
let size = rng.gen_range(1..=file_size as u32 - offset as u32);
let expected = cached_reader.range_read(offset, size).await.unwrap();
let read = cached_reader.get_or_load(offset, size).await.unwrap();
assert_eq!(read, expected);
}
}
}


@@ -416,6 +416,8 @@ pub struct InvertedIndexConfig {
pub metadata_cache_size: ReadableSize,
/// Cache size for inverted index content. Setting it to 0 to disable the cache.
pub content_cache_size: ReadableSize,
/// Page size for inverted index content.
pub content_cache_page_size: ReadableSize,
}
impl InvertedIndexConfig {
@@ -441,6 +443,7 @@ impl Default for InvertedIndexConfig {
intermediate_path: String::new(),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
content_cache_page_size: ReadableSize::mb(8),
};
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {


@@ -893,6 +893,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to read file metadata"))]
Metadata {
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -965,7 +973,8 @@ impl ErrorExt for Error {
| CreateDir { .. }
| ReadDataPart { .. }
| CorruptedEntry { .. }
- | BuildEntry { .. } => StatusCode::Internal,
+ | BuildEntry { .. }
+ | Metadata { .. } => StatusCode::Internal,
OpenRegion { source, .. } => source.status_code(),


@@ -18,7 +18,7 @@ pub(crate) mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
mod statistics;
- mod store;
+ pub(crate) mod store;
use std::num::NonZeroUsize;


@@ -16,6 +16,7 @@ pub mod builder;
use std::sync::Arc;
use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
@@ -29,7 +30,9 @@ use store_api::storage::RegionId;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
- use crate::error::{ApplyInvertedIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
+ use crate::error::{
+ ApplyInvertedIndexSnafu, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
+ };
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
@@ -123,7 +126,7 @@ impl InvertedIndexApplier {
index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty,
};
- let blob = match self.cached_blob_reader(file_id).await {
+ let mut blob = match self.cached_blob_reader(file_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
@@ -134,8 +137,14 @@ impl InvertedIndexApplier {
};
if let Some(index_cache) = &self.inverted_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id,
file_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);


@@ -448,7 +448,7 @@ mod tests {
move |expr| {
let _d = &d;
- let cache = Arc::new(InvertedIndexCache::new(10, 10));
+ let cache = Arc::new(InvertedIndexCache::new(10, 10, 100));
let puffin_metadata_cache = Arc::new(PuffinMetadataCache::new(10, &CACHE_BYTES));
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),


@@ -35,8 +35,7 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
- use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache};
- use common_meta::key::schema_name::{SchemaName, SchemaNameValue};
+ use common_meta::cache::{new_schema_cache, new_table_schema_cache};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -49,7 +48,7 @@ use datatypes::schema::ColumnSchema;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
- use moka::future::{Cache, CacheBuilder};
+ use moka::future::CacheBuilder;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::ObjectStore;


@@ -170,6 +170,7 @@ impl WorkerGroup {
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.inverted_index.metadata_cache_size.as_bytes())
.index_content_size(config.inverted_index.content_cache_size.as_bytes())
.index_content_page_size(config.inverted_index.content_cache_page_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),


@@ -946,6 +946,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
content_cache_page_size = "8MiB"
[region_engine.mito.fulltext_index]
create_on_flush = "auto"