diff --git a/src/index/src/fulltext_index/create/bloom_filter.rs b/src/index/src/fulltext_index/create/bloom_filter.rs index 4fd2593e8e..2249b2f2ee 100644 --- a/src/index/src/fulltext_index/create/bloom_filter.rs +++ b/src/index/src/fulltext_index/create/bloom_filter.rs @@ -89,8 +89,12 @@ impl FulltextIndexCreator for BloomFilterFulltextIndexCreator { &mut self, puffin_writer: &mut (impl PuffinWriter + Send), blob_key: &str, - put_options: PutOptions, + mut put_options: PutOptions, ) -> Result { + // Compressing the bloom filter doesn't reduce the size but hurts read performance. + // Always disable compression here. + put_options.compression = None; + let creator = self.inner.as_mut().context(AbortedSnafu)?; let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); diff --git a/src/mito2/src/cache/index/bloom_filter_index.rs b/src/mito2/src/cache/index/bloom_filter_index.rs index 61df853573..ef8f8aecf2 100644 --- a/src/mito2/src/cache/index/bloom_filter_index.rs +++ b/src/mito2/src/cache/index/bloom_filter_index.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use api::v1::index::BloomFilterMeta; use async_trait::async_trait; use bytes::Bytes; -use futures::future::try_join_all; use index::bloom_filter::error::Result; use index::bloom_filter::reader::BloomFilterReader; use store_api::storage::ColumnId; @@ -120,21 +119,24 @@ impl BloomFilterReader for CachedBloomFilterIndexBl } async fn read_vec(&self, ranges: &[Range]) -> Result> { - let fetch = ranges.iter().map(|range| { + let mut pages = Vec::with_capacity(ranges.len()); + for range in ranges { let inner = &self.inner; - self.cache.get_or_load( - (self.file_id, self.column_id, self.tag), - self.blob_size, - range.start, - (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, - ) - }); - Ok(try_join_all(fetch) - .await? - .into_iter() - .map(Bytes::from) - .collect::>()) + let page = self + .cache + .get_or_load( + (self.file_id, self.column_id, self.tag), + self.blob_size, + range.start, + (range.end - range.start) as u32, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await?; + + pages.push(Bytes::from(page)); + } + + Ok(pages) } /// Reads the meta information of the bloom filter. diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index 10a9f96307..869aa699da 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use api::v1::index::InvertedIndexMetas; use async_trait::async_trait; use bytes::Bytes; -use futures::future::try_join_all; use index::inverted_index::error::Result; use index::inverted_index::format::reader::InvertedIndexReader; use prost::Message; @@ -93,21 +92,24 @@ impl InvertedIndexReader for CachedInvertedIndexBlobRead } async fn read_vec(&self, ranges: &[Range]) -> Result> { - let fetch = ranges.iter().map(|range| { + let mut pages = Vec::with_capacity(ranges.len()); + for range in ranges { let inner = &self.inner; - self.cache.get_or_load( - self.file_id, - self.blob_size, - range.start, - (range.end - range.start) as u32, - move |ranges| async move { inner.read_vec(&ranges).await }, - ) - }); - Ok(try_join_all(fetch) - .await? - .into_iter() - .map(Bytes::from) - .collect::>()) + let page = self + .cache + .get_or_load( + self.file_id, + self.blob_size, + range.start, + (range.end - range.start) as u32, + move |ranges| async move { inner.read_vec(&ranges).await }, + ) + .await?; + + pages.push(Bytes::from(page)); + } + + Ok(pages) } async fn metadata(&self) -> Result> {