mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 00:19:58 +00:00
feat: introduce Buffer for non-continuous bytes (#5164)
* feat: introduce Buffer for non-continuous bytes

* Update src/mito2/src/cache/index.rs

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply review comments

* refactor: use opendal::Buffer

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1904,6 +1904,7 @@ dependencies = [
|
||||
"futures",
|
||||
"paste",
|
||||
"pin-project",
|
||||
"rand",
|
||||
"serde",
|
||||
"snafu 0.8.5",
|
||||
"tokio",
|
||||
|
||||
@@ -17,6 +17,7 @@ common-macro.workspace = true
|
||||
futures.workspace = true
|
||||
paste = "1.0"
|
||||
pin-project.workspace = true
|
||||
rand.workspace = true
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
snafu.workspace = true
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use common_base::BitVec;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::ResultExt;
|
||||
@@ -35,7 +36,7 @@ pub trait InvertedIndexReader: Send {
|
||||
async fn range_read(&mut self, offset: u64, size: u32) -> Result<Vec<u8>>;
|
||||
|
||||
/// Reads the bytes in the given ranges.
|
||||
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>>;
|
||||
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>>;
|
||||
|
||||
/// Retrieves metadata of all inverted indices stored within the blob.
|
||||
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>>;
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use common_base::range_read::RangeReader;
|
||||
use greptime_proto::v1::index::InvertedIndexMetas;
|
||||
use snafu::{ensure, ResultExt};
|
||||
@@ -60,9 +61,8 @@ impl<R: RangeReader> InvertedIndexReader for InvertedIndexBlobReader<R> {
|
||||
Ok(buf.into())
|
||||
}
|
||||
|
||||
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>> {
|
||||
let bufs = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
|
||||
Ok(bufs.into_iter().map(|buf| buf.into()).collect())
|
||||
async fn read_vec(&mut self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
self.source.read_vec(ranges).await.context(CommonIoSnafu)
|
||||
}
|
||||
|
||||
async fn metadata(&mut self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
|
||||
99
src/mito2/src/cache/index.rs
vendored
99
src/mito2/src/cache/index.rs
vendored
@@ -17,10 +17,12 @@ use std::sync::Arc;
|
||||
|
||||
use api::v1::index::InvertedIndexMetas;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use common_base::BitVec;
|
||||
use index::inverted_index::error::DecodeFstSnafu;
|
||||
use index::inverted_index::format::reader::InvertedIndexReader;
|
||||
use index::inverted_index::FstMap;
|
||||
use object_store::Buffer;
|
||||
use prost::Message;
|
||||
use snafu::ResultExt;
|
||||
|
||||
@@ -68,15 +70,14 @@ where
|
||||
if keys.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
// TODO: Can be replaced by an uncontinuous structure like opendal::Buffer.
|
||||
let mut data = Vec::with_capacity(keys.len());
|
||||
data.resize(keys.len(), Arc::new(Vec::new()));
|
||||
data.resize(keys.len(), Bytes::new());
|
||||
let mut cache_miss_range = vec![];
|
||||
let mut cache_miss_idx = vec![];
|
||||
let last_index = keys.len() - 1;
|
||||
// TODO: Avoid copy as much as possible.
|
||||
for (i, index) in keys.clone().into_iter().enumerate() {
|
||||
match self.cache.get_index(&index) {
|
||||
for (i, index) in keys.iter().enumerate() {
|
||||
match self.cache.get_index(index) {
|
||||
Some(page) => {
|
||||
CACHE_HIT.with_label_values(&[INDEX_CONTENT_TYPE]).inc();
|
||||
data[i] = page;
|
||||
@@ -97,24 +98,19 @@ where
|
||||
if !cache_miss_range.is_empty() {
|
||||
let pages = self.inner.read_vec(&cache_miss_range).await?;
|
||||
for (i, page) in cache_miss_idx.into_iter().zip(pages.into_iter()) {
|
||||
let page = Arc::new(page);
|
||||
let key = keys[i].clone();
|
||||
data[i] = page.clone();
|
||||
self.cache.put_index(key, page.clone());
|
||||
}
|
||||
}
|
||||
let mut result = Vec::with_capacity(size as usize);
|
||||
data.iter().enumerate().for_each(|(i, page)| {
|
||||
let range = if i == 0 {
|
||||
IndexDataPageKey::calculate_first_page_range(offset, size, self.cache.page_size)
|
||||
} else if i == last_index {
|
||||
IndexDataPageKey::calculate_last_page_range(offset, size, self.cache.page_size)
|
||||
} else {
|
||||
0..self.cache.page_size as usize
|
||||
};
|
||||
result.extend_from_slice(&page[range]);
|
||||
});
|
||||
Ok(result)
|
||||
let buffer = Buffer::from_iter(data.into_iter());
|
||||
Ok(buffer
|
||||
.slice(IndexDataPageKey::calculate_range(
|
||||
offset,
|
||||
size,
|
||||
self.cache.page_size,
|
||||
))
|
||||
.to_vec())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -131,7 +127,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
|
||||
async fn read_vec(
|
||||
&mut self,
|
||||
ranges: &[Range<u64>],
|
||||
) -> index::inverted_index::error::Result<Vec<Vec<u8>>> {
|
||||
) -> index::inverted_index::error::Result<Vec<Bytes>> {
|
||||
self.inner.read_vec(ranges).await
|
||||
}
|
||||
|
||||
@@ -190,31 +186,19 @@ impl IndexDataPageKey {
|
||||
(end_page + 1 - start_page) as u32
|
||||
}
|
||||
|
||||
/// Computes the byte range in the first page based on the offset and size.
|
||||
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the first page range is 1000..4096.
|
||||
fn calculate_first_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
|
||||
/// Calculates the byte range for data retrieval based on the specified offset and size.
|
||||
///
|
||||
/// This function determines the starting and ending byte positions required for reading data.
|
||||
/// For example, with an offset of 5000 and a size of 5000, using a PAGE_SIZE of 4096,
|
||||
/// the resulting byte range will be 904..5904. This indicates that:
|
||||
/// - The reader will first access fixed-size pages [4096, 8192) and [8192, 12288).
|
||||
/// - To read the range [5000..10000), it only needs to fetch bytes within the range [904, 5904) across two pages.
|
||||
fn calculate_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
|
||||
let start = (offset % page_size) as usize;
|
||||
let end = if size > page_size as u32 - start as u32 {
|
||||
page_size as usize
|
||||
} else {
|
||||
start + size as usize
|
||||
};
|
||||
let end = start + size as usize;
|
||||
start..end
|
||||
}
|
||||
|
||||
/// Computes the byte range in the last page based on the offset and size.
|
||||
/// For example, if offset is 1000 and size is 5000 with PAGE_SIZE of 4096, the last page range is 0..1904.
|
||||
fn calculate_last_page_range(offset: u64, size: u32, page_size: u64) -> Range<usize> {
|
||||
let offset = offset as usize;
|
||||
let size = size as usize;
|
||||
let page_size = page_size as usize;
|
||||
if (offset + size) % page_size == 0 {
|
||||
0..page_size
|
||||
} else {
|
||||
0..((offset + size) % page_size)
|
||||
}
|
||||
}
|
||||
|
||||
/// Generates a vector of IndexKey instances for the pages that a given offset and size span.
|
||||
fn generate_page_keys(file_id: FileId, offset: u64, size: u32, page_size: u64) -> Vec<Self> {
|
||||
let start_page = Self::calculate_page_id(offset, page_size);
|
||||
@@ -234,7 +218,7 @@ pub struct InvertedIndexCache {
|
||||
/// Cache for inverted index metadata
|
||||
index_metadata: moka::sync::Cache<IndexMetadataKey, Arc<InvertedIndexMetas>>,
|
||||
/// Cache for inverted index content.
|
||||
index: moka::sync::Cache<IndexDataPageKey, Arc<Vec<u8>>>,
|
||||
index: moka::sync::Cache<IndexDataPageKey, Bytes>,
|
||||
// Page size for index content.
|
||||
page_size: u64,
|
||||
}
|
||||
@@ -284,11 +268,11 @@ impl InvertedIndexCache {
|
||||
self.index_metadata.insert(key, metadata)
|
||||
}
|
||||
|
||||
pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Arc<Vec<u8>>> {
|
||||
pub fn get_index(&self, key: &IndexDataPageKey) -> Option<Bytes> {
|
||||
self.index.get(key)
|
||||
}
|
||||
|
||||
pub fn put_index(&self, key: IndexDataPageKey, value: Arc<Vec<u8>>) {
|
||||
pub fn put_index(&self, key: IndexDataPageKey, value: Bytes) {
|
||||
CACHE_BYTES
|
||||
.with_label_values(&[INDEX_CONTENT_TYPE])
|
||||
.add(index_content_weight(&key, &value).into());
|
||||
@@ -302,7 +286,7 @@ fn index_metadata_weight(k: &IndexMetadataKey, v: &Arc<InvertedIndexMetas>) -> u
|
||||
}
|
||||
|
||||
/// Calculates weight for index content.
|
||||
fn index_content_weight(k: &IndexDataPageKey, v: &Arc<Vec<u8>>) -> u32 {
|
||||
fn index_content_weight(k: &IndexDataPageKey, v: &Bytes) -> u32 {
|
||||
(k.file_id.as_bytes().len() + v.len()) as u32
|
||||
}
|
||||
|
||||
@@ -331,6 +315,9 @@ mod test {
|
||||
use crate::sst::index::store::InstrumentedStore;
|
||||
use crate::test_util::TestEnv;
|
||||
|
||||
// Repeat times for following little fuzz tests.
|
||||
const FUZZ_REPEAT_TIMES: usize = 100;
|
||||
|
||||
// Fuzz test for index data page key
|
||||
#[test]
|
||||
fn fuzz_index_calculation() {
|
||||
@@ -340,7 +327,7 @@ mod test {
|
||||
rng.fill_bytes(&mut data);
|
||||
let file_id = FileId::random();
|
||||
|
||||
for _ in 0..100 {
|
||||
for _ in 0..FUZZ_REPEAT_TIMES {
|
||||
let offset = rng.gen_range(0..data.len() as u64);
|
||||
let size = rng.gen_range(0..data.len() as u32 - offset as u32);
|
||||
let page_size: usize = rng.gen_range(1..1024);
|
||||
@@ -349,32 +336,24 @@ mod test {
|
||||
IndexDataPageKey::generate_page_keys(file_id, offset, size, page_size as u64);
|
||||
let page_num = indexes.len();
|
||||
let mut read = Vec::with_capacity(size as usize);
|
||||
let last_index = indexes.len() - 1;
|
||||
for (i, key) in indexes.into_iter().enumerate() {
|
||||
for key in indexes.into_iter() {
|
||||
let start = key.page_id as usize * page_size;
|
||||
let page = if start + page_size < data.len() {
|
||||
&data[start..start + page_size]
|
||||
} else {
|
||||
&data[start..]
|
||||
};
|
||||
let range = if i == 0 {
|
||||
// first page range
|
||||
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64)
|
||||
} else if i == last_index {
|
||||
// last page range. when the first page is the last page, the range is not used.
|
||||
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64)
|
||||
} else {
|
||||
0..page_size
|
||||
};
|
||||
read.extend_from_slice(&page[range]);
|
||||
read.extend_from_slice(page);
|
||||
}
|
||||
let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
|
||||
let read =
|
||||
read[IndexDataPageKey::calculate_range(offset, size, page_size as u64)].to_vec();
|
||||
if read != data.get(expected_range).unwrap() {
|
||||
panic!(
|
||||
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nfirst page range: {:?}, last page range: {:?}, page num: {}",
|
||||
"fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
|
||||
offset, size, page_size, read.len(), size as usize,
|
||||
IndexDataPageKey::calculate_first_page_range(offset, size, page_size as u64),
|
||||
IndexDataPageKey::calculate_last_page_range(offset, size, page_size as u64), page_num
|
||||
IndexDataPageKey::calculate_range(offset, size, page_size as u64),
|
||||
page_num
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -519,7 +498,7 @@ mod test {
|
||||
|
||||
// fuzz test
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 0..100 {
|
||||
for _ in 0..FUZZ_REPEAT_TIMES {
|
||||
let offset = rng.gen_range(0..file_size);
|
||||
let size = rng.gen_range(0..file_size as u32 - offset as u32);
|
||||
let expected = cached_reader.range_read(offset, size).await.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user