mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 12:22:55 +00:00
perf: Reduce fulltext bloom load time (#6651)
* perf: cached reader do not get page concurrently Otherwise they will all fetch the same pages in parallel Signed-off-by: evenyag <realevenyag@gmail.com> * perf: always disable zstd for bloom Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -89,8 +89,12 @@ impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
|
||||
&mut self,
|
||||
puffin_writer: &mut (impl PuffinWriter + Send),
|
||||
blob_key: &str,
|
||||
put_options: PutOptions,
|
||||
mut put_options: PutOptions,
|
||||
) -> Result<u64> {
|
||||
// Compressing the bloom filter doesn't reduce the size but hurts read performance.
|
||||
// Always disable compression here.
|
||||
put_options.compression = None;
|
||||
|
||||
let creator = self.inner.as_mut().context(AbortedSnafu)?;
|
||||
|
||||
let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
|
||||
|
||||
32
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
32
src/mito2/src/cache/index/bloom_filter_index.rs
vendored
@@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
use api::v1::index::BloomFilterMeta;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::future::try_join_all;
|
||||
use index::bloom_filter::error::Result;
|
||||
use index::bloom_filter::reader::BloomFilterReader;
|
||||
use store_api::storage::ColumnId;
|
||||
@@ -120,21 +119,24 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
let fetch = ranges.iter().map(|range| {
|
||||
let mut pages = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
let inner = &self.inner;
|
||||
self.cache.get_or_load(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
)
|
||||
});
|
||||
Ok(try_join_all(fetch)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Bytes::from)
|
||||
.collect::<Vec<_>>())
|
||||
let page = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
(self.file_id, self.column_id, self.tag),
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
)
|
||||
.await?;
|
||||
|
||||
pages.push(Bytes::from(page));
|
||||
}
|
||||
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
/// Reads the meta information of the bloom filter.
|
||||
|
||||
32
src/mito2/src/cache/index/inverted_index.rs
vendored
32
src/mito2/src/cache/index/inverted_index.rs
vendored
@@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
use api::v1::index::InvertedIndexMetas;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::future::try_join_all;
|
||||
use index::inverted_index::error::Result;
|
||||
use index::inverted_index::format::reader::InvertedIndexReader;
|
||||
use prost::Message;
|
||||
@@ -93,21 +92,24 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
|
||||
}
|
||||
|
||||
async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
|
||||
let fetch = ranges.iter().map(|range| {
|
||||
let mut pages = Vec::with_capacity(ranges.len());
|
||||
for range in ranges {
|
||||
let inner = &self.inner;
|
||||
self.cache.get_or_load(
|
||||
self.file_id,
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
)
|
||||
});
|
||||
Ok(try_join_all(fetch)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(Bytes::from)
|
||||
.collect::<Vec<_>>())
|
||||
let page = self
|
||||
.cache
|
||||
.get_or_load(
|
||||
self.file_id,
|
||||
self.blob_size,
|
||||
range.start,
|
||||
(range.end - range.start) as u32,
|
||||
move |ranges| async move { inner.read_vec(&ranges).await },
|
||||
)
|
||||
.await?;
|
||||
|
||||
pages.push(Bytes::from(page));
|
||||
}
|
||||
|
||||
Ok(pages)
|
||||
}
|
||||
|
||||
async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
|
||||
|
||||
Reference in New Issue
Block a user