refactor: abstract index source from fulltext index applier (#5845)

* feat: add term as fulltext index request

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: abstract index source from fulltext index applier

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-04-09 12:27:41 +08:00
committed by GitHub
parent 95d0c650ec
commit 6c66ec3ffc
4 changed files with 225 additions and 104 deletions

View File

@@ -19,7 +19,7 @@ use common_telemetry::warn;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader};
use puffin::puffin_manager::{BlobWithMetadata, DirGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
@@ -30,33 +30,20 @@ use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::builder::FulltextRequest;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::puffin_manager::{
PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader,
};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
pub mod builder;
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier {
/// The root directory of the region.
region_dir: String,
/// The region ID.
region_id: RegionId,
/// Requests to be applied.
requests: HashMap<ColumnId, FulltextRequest>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
/// Store responsible for accessing index files.
store: ObjectStore,
/// File cache to be used by the `FulltextIndexApplier`.
file_cache: Option<FileCacheRef>,
/// The puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
/// The source of the index.
index_source: IndexSource,
}
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
@@ -70,20 +57,17 @@ impl FulltextIndexApplier {
requests: HashMap<ColumnId, FulltextRequest>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store);
Self {
region_dir,
region_id,
store,
requests,
puffin_manager_factory,
file_cache: None,
puffin_metadata_cache: None,
index_source,
}
}
/// Sets the file cache.
pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
self.file_cache = file_cache;
self.index_source.set_file_cache(file_cache);
self
}
@@ -92,7 +76,8 @@ impl FulltextIndexApplier {
mut self,
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
) -> Self {
self.puffin_metadata_cache = puffin_metadata_cache;
self.index_source
.set_puffin_metadata_cache(puffin_metadata_cache);
self
}
@@ -101,44 +86,33 @@ impl FulltextIndexApplier {
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BTreeSet<RowId>> {
) -> Result<Option<BTreeSet<RowId>>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let mut inited = false;
let mut row_ids = BTreeSet::new();
let mut row_ids: Option<BTreeSet<RowId>> = None;
for (column_id, request) in &self.requests {
if request.queries.is_empty() {
continue;
}
'outer: for (column_id, request) in &self.requests {
let dir = self
.index_dir_path(file_id, *column_id, file_size_hint)
.await?;
let path = match &dir {
Some(dir) => dir.path(),
None => {
// Return empty set if the index not found.
return Ok(BTreeSet::new());
}
let Some(result) = self
.apply_one_column(file_size_hint, file_id, *column_id, request)
.await?
else {
continue;
};
let searcher =
TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
if let Some(ids) = row_ids.as_mut() {
ids.retain(|id| result.contains(id));
} else {
row_ids = Some(result);
}
for query in &request.queries {
let result = searcher
.search(&query.0)
.await
.context(ApplyFulltextIndexSnafu)?;
if !inited {
row_ids = result;
inited = true;
continue;
}
row_ids.retain(|id| result.contains(id));
if row_ids.is_empty() {
break 'outer;
if let Some(ids) = row_ids.as_ref() {
if ids.is_empty() {
break;
}
}
}
@@ -146,84 +120,218 @@ impl FulltextIndexApplier {
Ok(row_ids)
}
/// Returns `None` if the index is not found.
async fn index_dir_path(
async fn apply_one_column(
&self,
file_size_hint: Option<u64>,
file_id: FileId,
column_id: ColumnId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
request: &FulltextRequest,
) -> Result<Option<BTreeSet<RowId>>> {
let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
let dir = self
.index_source
.dir(file_id, &blob_key, file_size_hint)
.await?;
// FAST PATH: Try to read the index from the file cache.
if let Some(file_cache) = &self.file_cache {
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_some() {
match self
.get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key)
.await
{
Ok(dir) => return Ok(dir),
Err(err) => {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}
let path = match &dir {
Some(dir) => dir.path(),
None => {
return Ok(None);
}
};
let searcher = TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
let mut row_ids: Option<BTreeSet<RowId>> = None;
for query in &request.queries {
let result = searcher
.search(&query.0)
.await
.context(ApplyFulltextIndexSnafu)?;
if let Some(ids) = row_ids.as_mut() {
ids.retain(|id| result.contains(id));
} else {
row_ids = Some(result);
}
if let Some(ids) = row_ids.as_ref() {
if ids.is_empty() {
break;
}
}
}
// SLOW PATH: Try to read the index from the remote file.
self.get_index_from_remote_file(file_id, file_size_hint, &blob_key)
.await
Ok(row_ids)
}
}
/// The source of the index.
///
/// Bundles everything needed to open a puffin reader for an index file, either
/// through the local file cache or through the remote object store.
struct IndexSource {
/// Region directory; handed to `RegionFilePathFactory` when building the remote reader.
region_dir: String,
/// Region ID; used to build the `IndexKey` for file cache lookups and the
/// `WriteCachePathProvider` for the cached reader.
region_id: RegionId,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
/// Store responsible for accessing remote index files.
remote_store: ObjectStore,
/// Local file cache.
file_cache: Option<FileCacheRef>,
/// The puffin metadata cache.
puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
impl IndexSource {
/// Creates an index source backed by `remote_store`.
///
/// Both the file cache and the puffin metadata cache start out unset; use
/// `set_file_cache` and `set_puffin_metadata_cache` to configure them.
fn new(
    region_dir: String,
    region_id: RegionId,
    puffin_manager_factory: PuffinManagerFactory,
    remote_store: ObjectStore,
) -> Self {
    Self {
        // Caches are opt-in and configured after construction.
        file_cache: None,
        puffin_metadata_cache: None,
        region_dir,
        region_id,
        puffin_manager_factory,
        remote_store,
    }
}
async fn get_index_from_file_cache(
/// Replaces the local file cache; passing `None` clears it.
fn set_file_cache(&mut self, file_cache: Option<FileCacheRef>) {
self.file_cache = file_cache;
}
/// Replaces the puffin metadata cache; passing `None` clears it.
fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option<PuffinMetadataCacheRef>) {
self.puffin_metadata_cache = puffin_metadata_cache;
}
/// Fetches the blob stored under `key` for the given file.
///
/// The reader comes from `ensure_reader`. If the read fails with anything other
/// than a "blob not found" error, and that reader was not already the remote one,
/// a warning is logged and the read is retried once with a reader from
/// `build_remote`.
///
/// Returns `Ok(None)` when the blob is not found.
#[allow(unused)]
async fn blob(
    &self,
    file_id: FileId,
    key: &str,
    file_size_hint: Option<u64>,
) -> Result<Option<BlobWithMetadata<SstPuffinBlob>>> {
    let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;

    // First attempt: success and "not found" are both final answers.
    let err = match reader.blob(key).await {
        Ok(blob) => return Ok(Some(blob)),
        Err(err) if err.is_blob_not_found() => return Ok(None),
        Err(err) => err,
    };

    // The failing reader was already the remote one, so there is nothing left to try.
    if fallbacked {
        return Err(err).context(PuffinReadBlobSnafu);
    }

    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
    let remote_reader = self.build_remote(file_id, file_size_hint).await?;
    match remote_reader.blob(key).await {
        Ok(blob) => Ok(Some(blob)),
        Err(err) if err.is_blob_not_found() => Ok(None),
        Err(err) => Err(err).context(PuffinReadBlobSnafu),
    }
}
/// Fetches the directory stored under `key` for the given file.
///
/// The reader comes from `ensure_reader`. If the read fails with anything other
/// than a "blob not found" error, and that reader was not already the remote one,
/// a warning is logged and the read is retried once with a reader from
/// `build_remote`.
///
/// Returns `Ok(None)` when the directory is not found.
async fn dir(
    &self,
    file_id: FileId,
    key: &str,
    file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
    let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;

    // First attempt: success and "not found" are both final answers.
    let err = match reader.dir(key).await {
        Ok(dir) => return Ok(Some(dir)),
        Err(err) if err.is_blob_not_found() => return Ok(None),
        Err(err) => err,
    };

    // The failing reader was already the remote one, so there is nothing left to try.
    if fallbacked {
        return Err(err).context(PuffinReadBlobSnafu);
    }

    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.");
    let remote_reader = self.build_remote(file_id, file_size_hint).await?;
    match remote_reader.dir(key).await {
        Ok(dir) => Ok(Some(dir)),
        Err(err) if err.is_blob_not_found() => Ok(None),
        Err(err) => Err(err).context(PuffinReadBlobSnafu),
    }
}
/// Returns the reader and whether it fell back to the remote store.
async fn ensure_reader(
&self,
file_cache: &FileCacheRef,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
) -> Result<(SstPuffinReader, bool)> {
match self.build_local_cache(file_id, file_size_hint).await {
Ok(Some(r)) => Ok((r, false)),
Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)),
Err(err) => Err(err),
}
}
async fn build_local_cache(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinReader>> {
let Some(file_cache) = &self.file_cache else {
return Ok(None);
};
let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
if file_cache.get(index_key).await.is_none() {
return Ok(None);
};
let puffin_manager = self
.puffin_manager_factory
.build(
file_cache.local_store(),
WriteCachePathProvider::new(self.region_id, file_cache.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
.with_file_size_hint(file_size_hint);
Ok(Some(reader))
}
async fn get_index_from_remote_file(
async fn build_remote(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
blob_key: &str,
) -> Result<Option<SstPuffinDir>> {
match self
) -> Result<SstPuffinReader> {
let puffin_manager = self
.puffin_manager_factory
.build(
self.store.clone(),
self.remote_store.clone(),
RegionFilePathFactory::new(self.region_dir.clone()),
)
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
let reader = puffin_manager
.reader(&file_id)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.dir(blob_key)
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
.with_file_size_hint(file_size_hint);
Ok(reader)
}
}

View File

@@ -570,7 +570,7 @@ mod tests {
factory.clone(),
);
async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
async move { applier.apply(sst_file_id, None).await.unwrap().unwrap() }.boxed()
}
}

View File

@@ -392,7 +392,10 @@ impl ParquetReaderBuilder {
.apply(self.file_handle.file_id(), Some(file_size_hint))
.await
{
Ok(res) => res,
Ok(Some(res)) => res,
Ok(None) => {
return false;
}
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(

View File

@@ -226,6 +226,16 @@ pub enum Error {
},
}
impl Error {
pub fn is_blob_not_found(&self) -> bool {
match self {
Error::BlobNotFound { .. } => true,
Error::CacheGet { source } => source.is_blob_not_found(),
_ => false,
}
}
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;