From 6c66ec3ffc9b8cc056b35def0856ca2afd37832d Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 9 Apr 2025 12:27:41 +0800 Subject: [PATCH] refactor: abstract index source from fulltext index applier (#5845) * feat: add term as fulltext index request Signed-off-by: Zhenchi * fix fmt Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi * refactor: abstract index source from fulltext index applier Signed-off-by: Zhenchi * address comments Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- .../src/sst/index/fulltext_index/applier.rs | 312 ++++++++++++------ .../src/sst/index/fulltext_index/creator.rs | 2 +- src/mito2/src/sst/parquet/reader.rs | 5 +- src/puffin/src/error.rs | 10 + 4 files changed, 225 insertions(+), 104 deletions(-) diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 8de040ae66..e463bd0ee8 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -19,7 +19,7 @@ use common_telemetry::warn; use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; -use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader}; +use puffin::puffin_manager::{BlobWithMetadata, DirGuard, PuffinManager, PuffinReader}; use snafu::ResultExt; use store_api::storage::{ColumnId, RegionId}; @@ -30,33 +30,20 @@ use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; use crate::sst::index::fulltext_index::applier::builder::FulltextRequest; use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY; -use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir}; +use crate::sst::index::puffin_manager::{ + PuffinManagerFactory, SstPuffinBlob, SstPuffinDir, SstPuffinReader, +}; use crate::sst::index::TYPE_FULLTEXT_INDEX; pub mod builder; /// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files pub struct FulltextIndexApplier { - /// The root directory of the region. - region_dir: String, - - /// The region ID. - region_id: RegionId, - /// Requests to be applied. requests: HashMap, - /// The puffin manager factory. - puffin_manager_factory: PuffinManagerFactory, - - /// Store responsible for accessing index files. - store: ObjectStore, - - /// File cache to be used by the `FulltextIndexApplier`. - file_cache: Option, - - /// The puffin metadata cache. - puffin_metadata_cache: Option, + /// The source of the index. + index_source: IndexSource, } pub type FulltextIndexApplierRef = Arc; @@ -70,20 +57,17 @@ impl FulltextIndexApplier { requests: HashMap, puffin_manager_factory: PuffinManagerFactory, ) -> Self { + let index_source = IndexSource::new(region_dir, region_id, puffin_manager_factory, store); + Self { - region_dir, - region_id, - store, requests, - puffin_manager_factory, - file_cache: None, - puffin_metadata_cache: None, + index_source, } } /// Sets the file cache. pub fn with_file_cache(mut self, file_cache: Option) -> Self { - self.file_cache = file_cache; + self.index_source.set_file_cache(file_cache); self } @@ -92,7 +76,8 @@ impl FulltextIndexApplier { mut self, puffin_metadata_cache: Option, ) -> Self { - self.puffin_metadata_cache = puffin_metadata_cache; + self.index_source + .set_puffin_metadata_cache(puffin_metadata_cache); self } @@ -101,44 +86,33 @@ impl FulltextIndexApplier { &self, file_id: FileId, file_size_hint: Option, - ) -> Result> { + ) -> Result>> { let _timer = INDEX_APPLY_ELAPSED .with_label_values(&[TYPE_FULLTEXT_INDEX]) .start_timer(); - let mut inited = false; - let mut row_ids = BTreeSet::new(); + let mut row_ids: Option> = None; + for (column_id, request) in &self.requests { + if request.queries.is_empty() { + continue; + } - 'outer: for (column_id, request) in &self.requests { - let dir = self - .index_dir_path(file_id, *column_id, file_size_hint) - .await?; - let path = match &dir { - Some(dir) => dir.path(), - None => { - // Return empty set if the index not found. - return Ok(BTreeSet::new()); - } + let Some(result) = self + .apply_one_column(file_size_hint, file_id, *column_id, request) + .await? + else { + continue; }; - let searcher = - TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?; + if let Some(ids) = row_ids.as_mut() { + ids.retain(|id| result.contains(id)); + } else { + row_ids = Some(result); + } - for query in &request.queries { - let result = searcher - .search(&query.0) - .await - .context(ApplyFulltextIndexSnafu)?; - - if !inited { - row_ids = result; - inited = true; - continue; - } - - row_ids.retain(|id| result.contains(id)); - if row_ids.is_empty() { - break 'outer; + if let Some(ids) = row_ids.as_ref() { + if ids.is_empty() { + break; } } } @@ -146,84 +120,218 @@ impl FulltextIndexApplier { Ok(row_ids) } - /// Returns `None` if the index not found. - async fn index_dir_path( + async fn apply_one_column( &self, + file_size_hint: Option, file_id: FileId, column_id: ColumnId, - file_size_hint: Option, - ) -> Result> { + request: &FulltextRequest, + ) -> Result>> { let blob_key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}"); + let dir = self + .index_source + .dir(file_id, &blob_key, file_size_hint) + .await?; - // FAST PATH: Try to read the index from the file cache. - if let Some(file_cache) = &self.file_cache { - let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); - if file_cache.get(index_key).await.is_some() { - match self - .get_index_from_file_cache(file_cache, file_id, file_size_hint, &blob_key) - .await - { - Ok(dir) => return Ok(dir), - Err(err) => { - warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") - } + let path = match &dir { + Some(dir) => dir.path(), + None => { + return Ok(None); + } + }; + + let searcher = TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?; + let mut row_ids: Option> = None; + for query in &request.queries { + let result = searcher + .search(&query.0) + .await + .context(ApplyFulltextIndexSnafu)?; + + if let Some(ids) = row_ids.as_mut() { + ids.retain(|id| result.contains(id)); + } else { + row_ids = Some(result); + } + + if let Some(ids) = row_ids.as_ref() { + if ids.is_empty() { + break; } } } - // SLOW PATH: Try to read the index from the remote file. - self.get_index_from_remote_file(file_id, file_size_hint, &blob_key) - .await + Ok(row_ids) + } +} + +/// The source of the index. +struct IndexSource { + region_dir: String, + region_id: RegionId, + + /// The puffin manager factory. + puffin_manager_factory: PuffinManagerFactory, + + /// Store responsible for accessing remote index files. + remote_store: ObjectStore, + + /// Local file cache. + file_cache: Option, + + /// The puffin metadata cache. + puffin_metadata_cache: Option, +} + +impl IndexSource { + fn new( + region_dir: String, + region_id: RegionId, + puffin_manager_factory: PuffinManagerFactory, + remote_store: ObjectStore, + ) -> Self { + Self { + region_dir, + region_id, + puffin_manager_factory, + remote_store, + file_cache: None, + puffin_metadata_cache: None, + } } - async fn get_index_from_file_cache( + fn set_file_cache(&mut self, file_cache: Option) { + self.file_cache = file_cache; + } + + fn set_puffin_metadata_cache(&mut self, puffin_metadata_cache: Option) { + self.puffin_metadata_cache = puffin_metadata_cache; + } + + /// Returns the blob with the specified key from local cache or remote store. + /// + /// Returns `None` if the blob is not found. + #[allow(unused)] + async fn blob( + &self, + file_id: FileId, + key: &str, + file_size_hint: Option, + ) -> Result>> { + let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; + let res = reader.blob(key).await; + match res { + Ok(blob) => Ok(Some(blob)), + Err(err) if err.is_blob_not_found() => Ok(None), + Err(err) => { + if fallbacked { + Err(err).context(PuffinReadBlobSnafu) + } else { + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file."); + let reader = self.build_remote(file_id, file_size_hint).await?; + let res = reader.blob(key).await; + match res { + Ok(blob) => Ok(Some(blob)), + Err(err) if err.is_blob_not_found() => Ok(None), + Err(err) => Err(err).context(PuffinReadBlobSnafu), + } + } + } + } + } + + /// Returns the directory with the specified key from local cache or remote store. + /// + /// Returns `None` if the directory is not found. + async fn dir( + &self, + file_id: FileId, + key: &str, + file_size_hint: Option, + ) -> Result> { + let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?; + let res = reader.dir(key).await; + match res { + Ok(dir) => Ok(Some(dir)), + Err(err) if err.is_blob_not_found() => Ok(None), + Err(err) => { + if fallbacked { + Err(err).context(PuffinReadBlobSnafu) + } else { + warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file."); + let reader = self.build_remote(file_id, file_size_hint).await?; + let res = reader.dir(key).await; + match res { + Ok(dir) => Ok(Some(dir)), + Err(err) if err.is_blob_not_found() => Ok(None), + Err(err) => Err(err).context(PuffinReadBlobSnafu), + } + } + } + } + } + + /// Return reader and whether it is fallbacked to remote store. + async fn ensure_reader( &self, - file_cache: &FileCacheRef, file_id: FileId, file_size_hint: Option, - blob_key: &str, - ) -> Result> { - match self + ) -> Result<(SstPuffinReader, bool)> { + match self.build_local_cache(file_id, file_size_hint).await { + Ok(Some(r)) => Ok((r, false)), + Ok(None) => Ok((self.build_remote(file_id, file_size_hint).await?, true)), + Err(err) => Err(err), + } + } + + async fn build_local_cache( + &self, + file_id: FileId, + file_size_hint: Option, + ) -> Result> { + let Some(file_cache) = &self.file_cache else { + return Ok(None); + }; + + let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin); + if file_cache.get(index_key).await.is_none() { + return Ok(None); + }; + + let puffin_manager = self .puffin_manager_factory .build( file_cache.local_store(), WriteCachePathProvider::new(self.region_id, file_cache.clone()), ) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + let reader = puffin_manager .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? - .with_file_size_hint(file_size_hint) - .dir(blob_key) - .await - { - Ok(dir) => Ok(Some(dir)), - Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None), - Err(err) => Err(err).context(PuffinReadBlobSnafu), - } + .with_file_size_hint(file_size_hint); + Ok(Some(reader)) } - async fn get_index_from_remote_file( + async fn build_remote( &self, file_id: FileId, file_size_hint: Option, - blob_key: &str, - ) -> Result> { - match self + ) -> Result { + let puffin_manager = self .puffin_manager_factory .build( - self.store.clone(), + self.remote_store.clone(), RegionFilePathFactory::new(self.region_dir.clone()), ) + .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + + let reader = puffin_manager .reader(&file_id) .await .context(PuffinBuildReaderSnafu)? - .with_file_size_hint(file_size_hint) - .dir(blob_key) - .await - { - Ok(dir) => Ok(Some(dir)), - Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None), - Err(err) => Err(err).context(PuffinReadBlobSnafu), - } + .with_file_size_hint(file_size_hint); + + Ok(reader) } } diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 2d71892874..b6eab05bfa 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -570,7 +570,7 @@ mod tests { factory.clone(), ); - async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed() + async move { applier.apply(sst_file_id, None).await.unwrap().unwrap() }.boxed() } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 7575584e8e..069e10344c 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -392,7 +392,10 @@ impl ParquetReaderBuilder { .apply(self.file_handle.file_id(), Some(file_size_hint)) .await { - Ok(res) => res, + Ok(Some(res)) => res, + Ok(None) => { + return false; + } Err(err) => { if cfg!(any(test, feature = "test")) { panic!( diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 634ede5b13..40d1eff9ad 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -226,6 +226,16 @@ pub enum Error { }, } +impl Error { + pub fn is_blob_not_found(&self) -> bool { + match self { + Error::BlobNotFound { .. } => true, + Error::CacheGet { source } => source.is_blob_not_found(), + _ => false, + } + } +} + impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*;