fix(bloom-filter): skip applying for non-indexed columns (#5246)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Date: 2024-12-30 14:56:58 +08:00
Committed by: Yingwen
Parent: 5b42546204
Commit: a22e8b421c
4 changed files with 67 additions and 31 deletions
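
In short: the applier walks `self.filters` column by column; previously a column without a bloom filter index blob surfaced as an error from the Puffin blob reader and failed the whole apply pass, while the new `blob_reader` helper maps the `BlobNotFound` error to `Ok(None)` so such columns are simply skipped. The commit also renames the cache readers' `file_size` field to `blob_size`, since the value handed to `get_or_load` is the index blob's length, not the SST file's, and it is now always taken from the blob metadata rather than the `file_size_hint`. Below is a minimal, self-contained sketch of the skip pattern; the types (`Blob`, `BlobError`, `read_blob`) are placeholders, not GreptimeDB's real API.

```rust
// A minimal sketch (hypothetical types, not GreptimeDB's real API) of the
// "skip non-indexed columns" pattern this commit introduces: a blob lookup
// that turns "no index blob for this column" into Ok(None) instead of an error.

#[derive(Debug)]
enum BlobError {
    NotFound,   // the column has no index blob
    Io(String), // any other failure
}

struct Blob; // stand-in for the real blob reader

fn read_blob(indexed: bool) -> Result<Blob, BlobError> {
    if indexed {
        Ok(Blob)
    } else {
        Err(BlobError::NotFound)
    }
}

/// Mirrors the new `blob_reader` helper: `Ok(None)` means "skip this column".
fn blob_reader(indexed: bool) -> Result<Option<Blob>, BlobError> {
    match read_blob(indexed) {
        Ok(blob) => Ok(Some(blob)),
        Err(BlobError::NotFound) => Ok(None), // non-indexed column: not an error
        Err(other) => Err(other),
    }
}

fn main() {
    // Columns without an index are skipped instead of failing the whole apply pass.
    for (column, indexed) in [("indexed_col", true), ("plain_col", false)] {
        match blob_reader(indexed) {
            Ok(Some(_)) => println!("{column}: applying bloom filter index"),
            Ok(None) => println!("{column}: no index, skipping"),
            Err(err) => println!("{column}: unexpected error: {err:?}"),
        }
    }
}
```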


@@ -61,7 +61,7 @@ fn bloom_filter_index_content_weight((k, _): &((FileId, ColumnId), PageKey), v:
pub struct CachedBloomFilterIndexBlobReader<R> {
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
}
@@ -71,14 +71,14 @@ impl<R> CachedBloomFilterIndexBlobReader<R> {
pub fn new(
file_id: FileId,
column_id: ColumnId,
file_size: u64,
blob_size: u64,
inner: R,
cache: BloomFilterIndexCacheRef,
) -> Self {
Self {
file_id,
column_id,
file_size,
blob_size,
inner,
cache,
}
@@ -92,7 +92,7 @@ impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBl
self.cache
.get_or_load(
(self.file_id, self.column_id),
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },
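
Both cache readers now pass the blob's length (rather than an SST file size) to `get_or_load`. A rough sketch of why that bound matters, assuming a page-based cache along the lines suggested by the `PageKey` type above: a read is split into page-aligned ranges, and the blob size clamps the final page so it never runs past the end of the blob. The page size constant and the `page_ranges` helper here are illustrative assumptions, not the actual cache code.

```rust
// Sketch only: split a logical read [offset, offset + size) into page-aligned
// byte ranges, clamped to blob_size. PAGE_SIZE is an assumed constant.
const PAGE_SIZE: u64 = 64 * 1024;

fn page_ranges(offset: u64, size: u64, blob_size: u64) -> Vec<std::ops::Range<u64>> {
    let end = (offset + size).min(blob_size);
    let mut ranges = Vec::new();
    let mut page_start = (offset / PAGE_SIZE) * PAGE_SIZE;
    while page_start < end {
        // The last range stops at the blob size, which is why the reader needs
        // blob_size and not the containing file's size.
        let page_end = (page_start + PAGE_SIZE).min(blob_size);
        ranges.push(page_start..page_end);
        page_start += PAGE_SIZE;
    }
    ranges
}

fn main() {
    // Reading 100 KiB starting at 70 KiB of a 150 KiB blob touches two pages;
    // the second range is clamped at 150 KiB.
    let ranges = page_ranges(70 * 1024, 100 * 1024, 150 * 1024);
    println!("{ranges:?}");
}
```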


@@ -58,17 +58,17 @@ fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
file_id: FileId,
file_size: u64,
blob_size: u64,
inner: R,
cache: InvertedIndexCacheRef,
}
impl<R> CachedInvertedIndexBlobReader<R> {
/// Creates a new inverted index blob reader with cache.
pub fn new(file_id: FileId, file_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
Self {
file_id,
file_size,
blob_size,
inner,
cache,
}
@@ -82,7 +82,7 @@ impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobRead
self.cache
.get_or_load(
self.file_id,
self.file_size,
self.blob_size,
offset,
size,
move |ranges| async move { inner.read_vec(&ranges).await },


@@ -33,18 +33,18 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, RegionId};
use super::INDEX_BLOB_TYPE;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader,
};
use crate::error::{
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, MetadataSnafu,
ApplyBloomFilterIndexSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Error, MetadataSnafu,
PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
@@ -118,28 +118,21 @@ impl BloomFilterIndexApplier {
.start_timer();
for (column_id, predicates) in &self.filters {
let mut blob = match self.cached_blob_reader(file_id, *column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
self.remote_blob_reader(file_id, *column_id, file_size_hint)
.await?
}
let mut blob = match self
.blob_reader(file_id, *column_id, file_size_hint)
.await?
{
Some(blob) => blob,
None => continue,
};
// Create appropriate reader based on whether we have caching enabled
if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let reader = CachedBloomFilterIndexBlobReader::new(
file_id,
*column_id,
file_size,
blob_size,
BloomFilterReaderImpl::new(blob),
bloom_filter_cache.clone(),
);
@@ -157,6 +150,43 @@ impl BloomFilterIndexApplier {
Ok(())
}
/// Creates a blob reader from the cached or remote index file.
///
/// Returns `None` if the column does not have an index.
async fn blob_reader(
&self,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<BlobReader>> {
let reader = match self.cached_blob_reader(file_id, column_id).await {
Ok(Some(puffin_reader)) => puffin_reader,
other => {
if let Err(err) = other {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
let res = self
.remote_blob_reader(file_id, column_id, file_size_hint)
.await;
if let Err(err) = res {
// Blob not found means no index for this column
if is_blob_not_found(&err) {
return Ok(None);
}
return Err(err);
}
res?
}
};
Ok(Some(reader))
}
/// Creates a blob reader from the cached index file
async fn cached_blob_reader(
&self,
@@ -242,6 +272,16 @@ impl BloomFilterIndexApplier {
}
}
fn is_blob_not_found(err: &Error) -> bool {
matches!(
err,
Error::PuffinBuildReader {
source: puffin::error::Error::BlobNotFound { .. },
..
}
)
}
pub struct BloomFilterIndexApplierBuilder<'a> {
region_dir: String,
object_store: ObjectStore,


@@ -137,14 +137,10 @@ impl InvertedIndexApplier {
};
if let Some(index_cache) = &self.inverted_index_cache {
let file_size = if let Some(file_size) = file_size_hint {
file_size
} else {
blob.metadata().await.context(MetadataSnafu)?.content_length
};
let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
let mut index_reader = CachedInvertedIndexBlobReader::new(
file_id,
file_size,
blob_size,
InvertedIndexBlobReader::new(blob),
index_cache.clone(),
);