diff --git a/Cargo.lock b/Cargo.lock index f0b659d66e..efe48899a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5301,9 +5301,11 @@ dependencies = [ "futures", "greptime-proto", "itertools 0.10.5", + "jieba-rs", "mockall", "pin-project", "prost 0.12.6", + "puffin", "rand", "regex", "regex-automata 0.4.8", diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs index fb0fc61fb0..5fabc8cacb 100644 --- a/src/common/base/src/range_read.rs +++ b/src/common/base/src/range_read.rs @@ -223,7 +223,6 @@ impl FileReader { } } -#[cfg(any(test, feature = "testing"))] impl SizeAwareRangeReader for FileReader { fn with_file_size_hint(&mut self, _file_size_hint: u64) { // do nothing diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index 4186834d46..f149c76565 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -22,9 +22,11 @@ fst.workspace = true futures.workspace = true greptime-proto.workspace = true itertools.workspace = true +jieba-rs = "0.7" mockall.workspace = true pin-project.workspace = true prost.workspace = true +puffin.workspace = true regex.workspace = true regex-automata.workspace = true serde.workspace = true @@ -33,6 +35,7 @@ snafu.workspace = true tantivy = { version = "0.22", features = ["zstd-compression"] } tantivy-jieba = "0.11.0" tokio.workspace = true +tokio-util.workspace = true uuid.workspace = true [dev-dependencies] diff --git a/src/index/src/bloom_filter.rs b/src/index/src/bloom_filter.rs index 69f6cb94e0..eb818a0f5a 100644 --- a/src/index/src/bloom_filter.rs +++ b/src/index/src/bloom_filter.rs @@ -17,8 +17,5 @@ pub mod creator; pub mod error; pub mod reader; -pub type Bytes = Vec; -pub type BytesRef<'a> = &'a [u8]; - /// The seed used for the Bloom filter. 
pub const SEED: u128 = 42; diff --git a/src/index/src/bloom_filter/applier.rs b/src/index/src/bloom_filter/applier.rs index b847edb18f..c60b99d008 100644 --- a/src/index/src/bloom_filter/applier.rs +++ b/src/index/src/bloom_filter/applier.rs @@ -20,7 +20,7 @@ use itertools::Itertools; use crate::bloom_filter::error::Result; use crate::bloom_filter::reader::BloomFilterReader; -use crate::bloom_filter::Bytes; +use crate::Bytes; pub struct BloomFilterApplier { reader: Box, diff --git a/src/index/src/bloom_filter/creator.rs b/src/index/src/bloom_filter/creator.rs index b4030d28fd..0b6810a688 100644 --- a/src/index/src/bloom_filter/creator.rs +++ b/src/index/src/bloom_filter/creator.rs @@ -26,8 +26,9 @@ use prost::Message; use snafu::ResultExt; use crate::bloom_filter::error::{IoSnafu, Result}; -use crate::bloom_filter::{Bytes, SEED}; +use crate::bloom_filter::SEED; use crate::external_provider::ExternalTempFileProvider; +use crate::Bytes; /// The false positive rate of the Bloom filter. pub const FALSE_POSITIVE_RATE: f64 = 0.01; diff --git a/src/index/src/bloom_filter/creator/finalize_segment.rs b/src/index/src/bloom_filter/creator/finalize_segment.rs index 072d661f56..84f358f053 100644 --- a/src/index/src/bloom_filter/creator/finalize_segment.rs +++ b/src/index/src/bloom_filter/creator/finalize_segment.rs @@ -25,8 +25,8 @@ use snafu::ResultExt; use super::intermediate_codec::IntermediateBloomFilterCodecV1; use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED}; use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result}; -use crate::bloom_filter::Bytes; use crate::external_provider::ExternalTempFileProvider; +use crate::Bytes; /// The minimum memory usage threshold for flushing in-memory Bloom filters to disk. 
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs index 8b0bde3d64..3a7f58c8ab 100644 --- a/src/index/src/fulltext_index.rs +++ b/src/index/src/fulltext_index.rs @@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize}; pub mod create; pub mod error; pub mod search; +pub mod tokenizer; #[cfg(test)] mod tests; diff --git a/src/index/src/fulltext_index/create.rs b/src/index/src/fulltext_index/create.rs index 99567a3f72..46f18999cc 100644 --- a/src/index/src/fulltext_index/create.rs +++ b/src/index/src/fulltext_index/create.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod bloom_filter; mod tantivy; use async_trait::async_trait; -pub use tantivy::{TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME}; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +pub use crate::fulltext_index::create::bloom_filter::BloomFilterFulltextIndexCreator; +pub use crate::fulltext_index::create::tantivy::{ + TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME, +}; use crate::fulltext_index::error::Result; /// `FulltextIndexCreator` is for creating a fulltext index. @@ -26,7 +31,15 @@ pub trait FulltextIndexCreator: Send { async fn push_text(&mut self, text: &str) -> Result<()>; /// Finalizes the creation of the index. - async fn finish(&mut self) -> Result<()>; + async fn finish( + &mut self, + puffin_writer: &mut (impl PuffinWriter + Send), + blob_key: &str, + put_options: PutOptions, + ) -> Result; + + /// Aborts the creation of the index. + async fn abort(&mut self) -> Result<()>; /// Returns the memory usage in bytes during the creation of the index. 
fn memory_usage(&self) -> usize; diff --git a/src/index/src/fulltext_index/create/bloom_filter.rs b/src/index/src/fulltext_index/create/bloom_filter.rs new file mode 100644 index 0000000000..ba6d4eceed --- /dev/null +++ b/src/index/src/fulltext_index/create/bloom_filter.rs @@ -0,0 +1,127 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::BoxedError; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use snafu::{OptionExt, ResultExt}; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; + +use crate::bloom_filter::creator::BloomFilterCreator; +use crate::external_provider::ExternalTempFileProvider; +use crate::fulltext_index::create::FulltextIndexCreator; +use crate::fulltext_index::error::{ + AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result, +}; +use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer}; +use crate::fulltext_index::Config; + +const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192; + +/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter. 
+pub struct BloomFilterFulltextIndexCreator { + inner: Option, + analyzer: Analyzer, +} + +impl BloomFilterFulltextIndexCreator { + pub fn new( + config: Config, + rows_per_segment: usize, + intermediate_provider: Arc, + global_memory_usage: Arc, + global_memory_usage_threshold: Option, + ) -> Self { + let tokenizer = match config.analyzer { + crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _, + crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _, + }; + let analyzer = Analyzer::new(tokenizer, config.case_sensitive); + + let inner = BloomFilterCreator::new( + rows_per_segment, + intermediate_provider, + global_memory_usage, + global_memory_usage_threshold, + ); + Self { + inner: Some(inner), + analyzer, + } + } +} + +#[async_trait] +impl FulltextIndexCreator for BloomFilterFulltextIndexCreator { + async fn push_text(&mut self, text: &str) -> Result<()> { + let tokens = self.analyzer.analyze_text(text)?; + self.inner + .as_mut() + .context(AbortedSnafu)? 
+ .push_row_elems(tokens) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + Ok(()) + } + + async fn finish( + &mut self, + puffin_writer: &mut (impl PuffinWriter + Send), + blob_key: &str, + put_options: PutOptions, + ) -> Result { + let creator = self.inner.as_mut().context(AbortedSnafu)?; + + let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB); + + let (index_finish, puffin_add_blob) = futures::join!( + creator.finish(tx.compat_write()), + puffin_writer.put_blob(blob_key, rx.compat(), put_options) + ); + + match ( + puffin_add_blob.context(PuffinAddBlobSnafu), + index_finish.context(BloomFilterFinishSnafu), + ) { + (Err(e1), Err(e2)) => BiErrorsSnafu { + first: Box::new(e1), + second: Box::new(e2), + } + .fail()?, + + (Ok(_), e @ Err(_)) => e?, + (e @ Err(_), Ok(_)) => e.map(|_| ())?, + (Ok(written_bytes), Ok(_)) => { + return Ok(written_bytes); + } + } + Ok(0) + } + + async fn abort(&mut self) -> Result<()> { + self.inner.take().context(AbortedSnafu)?; + Ok(()) + } + + fn memory_usage(&self) -> usize { + self.inner + .as_ref() + .map(|i| i.memory_usage()) + .unwrap_or_default() + } +} diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index aa5966b218..2ddc8299ae 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::path::Path; +use std::path::{Path, PathBuf}; use async_trait::async_trait; +use common_error::ext::BoxedError; +use puffin::puffin_manager::{PuffinWriter, PutOptions}; use snafu::{OptionExt, ResultExt}; use tantivy::indexer::NoMergePolicy; use tantivy::schema::{Schema, STORED, TEXT}; @@ -24,7 +26,9 @@ use tantivy::{doc, Index, IndexWriter}; use tantivy_jieba::JiebaTokenizer; use crate::fulltext_index::create::FulltextIndexCreator; -use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu}; +use crate::fulltext_index::error::{ + ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu, +}; use crate::fulltext_index::{Analyzer, Config}; pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text"; @@ -43,6 +47,9 @@ pub struct TantivyFulltextIndexCreator { /// The current max row id. max_rowid: u64, + + /// The directory path in filesystem to store the index. + path: PathBuf, } impl TantivyFulltextIndexCreator { @@ -59,7 +66,7 @@ impl TantivyFulltextIndexCreator { let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED); let schema = schema_builder.build(); - let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?; + let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?; index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default()); index.set_tokenizers(Self::build_tokenizer(&config)); @@ -76,6 +83,7 @@ impl TantivyFulltextIndexCreator { text_field, rowid_field, max_rowid: 0, + path: path.as_ref().to_path_buf(), }) } @@ -115,14 +123,37 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator { Ok(()) } - async fn finish(&mut self) -> Result<()> { + async fn finish( + &mut self, + puffin_writer: &mut (impl PuffinWriter + Send), + blob_key: &str, + put_options: PutOptions, + ) -> Result { let mut writer = self.writer.take().context(FinishedSnafu)?; common_runtime::spawn_blocking_global(move || { 
writer.commit().context(TantivySnafu)?; writer.wait_merging_threads().context(TantivySnafu) }) .await - .context(JoinSnafu)? + .context(JoinSnafu)??; + + puffin_writer + .put_dir(blob_key, self.path.clone(), put_options) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu) + } + + async fn abort(&mut self) -> Result<()> { + let mut writer = self.writer.take().context(FinishedSnafu)?; + common_runtime::spawn_blocking_global(move || { + writer.commit().context(TantivySnafu)?; + writer.wait_merging_threads().context(TantivySnafu) + }) + .await + .context(JoinSnafu)??; + + tokio::fs::remove_dir_all(&self.path).await.context(IoSnafu) } fn memory_usage(&self) -> usize { @@ -134,6 +165,7 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator { #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; + use futures::AsyncRead; use tantivy::collector::DocSetCollector; use tantivy::query::QueryParser; use tantivy::schema::Value; @@ -141,6 +173,39 @@ mod tests { use super::*; + struct MockPuffinWriter; + + #[async_trait] + impl PuffinWriter for MockPuffinWriter { + async fn put_blob( + &mut self, + _key: &str, + _raw_data: R, + _options: PutOptions, + ) -> puffin::error::Result + where + R: AsyncRead + Send, + { + unreachable!() + } + + async fn put_dir( + &mut self, + _key: &str, + _dir: PathBuf, + _options: PutOptions, + ) -> puffin::error::Result { + Ok(0) + } + fn set_footer_lz4_compressed(&mut self, _lz4_compressed: bool) { + unreachable!() + } + + async fn finish(self) -> puffin::error::Result { + Ok(0) + } + } + #[tokio::test] async fn test_creator_basic() { let memory_limits = [1, 64_000_000, usize::MAX]; @@ -241,7 +306,10 @@ mod tests { for text in texts { creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); + creator + .finish(&mut MockPuffinWriter, "", PutOptions::default()) + .await + .unwrap(); } async fn query_and_check(path: &Path, cases: &[(&str, Vec)]) { diff --git a/src/index/src/fulltext_index/error.rs 
b/src/index/src/fulltext_index/error.rs index 26a4331104..6cf7f74943 100644 --- a/src/index/src/fulltext_index/error.rs +++ b/src/index/src/fulltext_index/error.rs @@ -76,6 +76,34 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Aborted creator"))] + Aborted { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to add blob to puffin file"))] + PuffinAddBlob { + source: puffin::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to finish bloom filter"))] + BloomFilterFinish { + source: crate::bloom_filter::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("BiErrors, first: {first}, second: {second}"))] + BiErrors { + first: Box, + second: Box, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -86,7 +114,12 @@ impl ErrorExt for Error { Tantivy { .. } | TantivyDocNotFound { .. } => StatusCode::Internal, TantivyParser { .. } => StatusCode::InvalidSyntax, - Io { .. } | Finished { .. } | Join { .. } => StatusCode::Unexpected, + BiErrors { .. } => StatusCode::Internal, + + Io { .. } | Finished { .. } | Join { .. } | Aborted { .. } => StatusCode::Unexpected, + + BloomFilterFinish { source, .. } => source.status_code(), + PuffinAddBlob { source, .. } => source.status_code(), External { source, .. } => source.status_code(), } diff --git a/src/index/src/fulltext_index/tests.rs b/src/index/src/fulltext_index/tests.rs index 3e7c88c6a2..90449f9dde 100644 --- a/src/index/src/fulltext_index/tests.rs +++ b/src/index/src/fulltext_index/tests.rs @@ -13,17 +13,37 @@ // limitations under the License. 
use std::collections::BTreeSet; +use std::sync::Arc; use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use puffin::puffin_manager::file_accessor::MockFileAccessor; +use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager; +use puffin::puffin_manager::stager::BoundedStager; +use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions}; use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator}; use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher}; use crate::fulltext_index::{Analyzer, Config}; -async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir { - let tempdir = create_temp_dir(prefix); +async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc) { + let staging_dir = create_temp_dir(prefix); + let path = staging_dir.path().to_path_buf(); + ( + staging_dir, + Arc::new(BoundedStager::new(path, 102400).await.unwrap()), + ) +} - let mut creator = TantivyFulltextIndexCreator::new(tempdir.path(), config, 1024 * 1024) +async fn create_index( + prefix: &str, + puffin_writer: &mut (impl PuffinWriter + Send), + blob_key: &str, + texts: Vec<&str>, + config: Config, +) { + let tantivy_path = create_temp_dir(prefix); + + let mut creator = TantivyFulltextIndexCreator::new(tantivy_path.path(), config, 1024 * 1024) .await .unwrap(); @@ -31,8 +51,10 @@ async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); - tempdir + creator + .finish(puffin_writer, blob_key, PutOptions::default()) + .await + .unwrap(); } async fn test_search( @@ -42,9 +64,18 @@ async fn test_search( query: &str, expected: impl IntoIterator, ) { - let index_path = create_index(prefix, texts, config).await; + let (_staging_dir, stager) = new_bounded_stager(prefix).await; + let file_accessor = Arc::new(MockFileAccessor::new(prefix)); + let puffin_manager = 
FsPuffinManager::new(stager, file_accessor); - let searcher = TantivyFulltextIndexSearcher::new(index_path.path()).unwrap(); + let file_name = "fulltext_index"; + let blob_key = "fulltext_index"; + let mut writer = puffin_manager.writer(file_name).await.unwrap(); + create_index(prefix, &mut writer, blob_key, texts, config).await; + + let reader = puffin_manager.reader(file_name).await.unwrap(); + let index_dir = reader.dir(blob_key).await.unwrap(); + let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap(); let results = searcher.search(query).await.unwrap(); let expected = expected.into_iter().collect::>(); diff --git a/src/index/src/fulltext_index/tokenizer.rs b/src/index/src/fulltext_index/tokenizer.rs new file mode 100644 index 0000000000..721ffdd3b9 --- /dev/null +++ b/src/index/src/fulltext_index/tokenizer.rs @@ -0,0 +1,125 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use jieba_rs::Jieba; + +use crate::fulltext_index::error::Result; +use crate::Bytes; + +/// `Tokenizer` tokenizes a text into a list of tokens. +pub trait Tokenizer: Send { + fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str>; +} + +/// `EnglishTokenizer` tokenizes an English text. +/// +/// It splits the text by non-alphanumeric characters.
+#[derive(Debug, Default)] +pub struct EnglishTokenizer; + +impl Tokenizer for EnglishTokenizer { + fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> { + text.split(|c: char| !c.is_alphanumeric()) + .filter(|s| !s.is_empty()) + .collect() + } +} + +/// `ChineseTokenizer` tokenizes a Chinese text. +/// +/// It uses a process-wide Jieba instance, built once on first use, to split the text into words. +#[derive(Debug, Default)] +pub struct ChineseTokenizer; + +impl Tokenizer for ChineseTokenizer { + fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> { + static JIEBA: std::sync::OnceLock<Jieba> = std::sync::OnceLock::new(); + JIEBA.get_or_init(Jieba::new).cut(text, false) + } +} + +/// `Analyzer` analyzes a text into a list of tokens. +/// +/// It uses a `Tokenizer` to tokenize the text and optionally lowercases the tokens. +pub struct Analyzer { + tokenizer: Box<dyn Tokenizer>, + case_sensitive: bool, +} + +impl Analyzer { + /// Creates a new `Analyzer` with the given `Tokenizer` and case sensitivity. + pub fn new(tokenizer: Box<dyn Tokenizer>, case_sensitive: bool) -> Self { + Self { + tokenizer, + case_sensitive, + } + } + + /// Analyzes the given text into a list of tokens. + pub fn analyze_text(&self, text: &str) -> Result<Vec<Bytes>> { + let res = self + .tokenizer + .tokenize(text) + .iter() + .map(|s| { + if self.case_sensitive { + s.as_bytes().to_vec() + } else { + s.to_lowercase().into_bytes() + } + }) + .collect(); + Ok(res) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_english_tokenizer() { + let tokenizer = EnglishTokenizer; + let text = "Hello, world! This is a test0."; + let tokens = tokenizer.tokenize(text); + assert_eq!(tokens, vec!["Hello", "world", "This", "is", "a", "test0"]); + } + + #[test] + fn test_chinese_tokenizer() { + let tokenizer = ChineseTokenizer; + let text = "我喜欢苹果"; + let tokens = tokenizer.tokenize(text); + assert_eq!(tokens, vec!["我", "喜欢", "苹果"]); + } + + #[test] + fn test_analyzer() { + let tokenizer = EnglishTokenizer; + let analyzer = Analyzer::new(Box::new(tokenizer), false); + let text = "Hello, world! 
This is a test."; + let tokens = analyzer.analyze_text(text).unwrap(); + assert_eq!( + tokens, + vec![ + b"hello".to_vec(), + b"world".to_vec(), + b"this".to_vec(), + b"is".to_vec(), + b"a".to_vec(), + b"test".to_vec() + ] + ); + } +} diff --git a/src/index/src/inverted_index.rs b/src/index/src/inverted_index.rs index 7a34bae213..9dc5c87014 100644 --- a/src/index/src/inverted_index.rs +++ b/src/index/src/inverted_index.rs @@ -18,5 +18,3 @@ pub mod format; pub mod search; pub type FstMap = fst::Map>; -pub type Bytes = Vec; -pub type BytesRef<'a> = &'a [u8]; diff --git a/src/index/src/inverted_index/create.rs b/src/index/src/inverted_index/create.rs index 15674d696c..b56d09dc99 100644 --- a/src/index/src/inverted_index/create.rs +++ b/src/index/src/inverted_index/create.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use crate::inverted_index::error::Result; use crate::inverted_index::format::writer::InvertedIndexWriter; -use crate::inverted_index::BytesRef; +use crate::BytesRef; /// `InvertedIndexCreator` provides functionality to construct an inverted index #[async_trait] diff --git a/src/index/src/inverted_index/create/sort.rs b/src/index/src/inverted_index/create/sort.rs index 81ca9aeca6..cb92bfa1ad 100644 --- a/src/index/src/inverted_index/create/sort.rs +++ b/src/index/src/inverted_index/create/sort.rs @@ -21,7 +21,7 @@ use common_base::BitVec; use futures::Stream; use crate::inverted_index::error::Result; -use crate::inverted_index::{Bytes, BytesRef}; +use crate::{Bytes, BytesRef}; /// A stream of sorted values along with their associated bitmap pub type SortedStream = Box> + Send + Unpin>; diff --git a/src/index/src/inverted_index/create/sort/external_sort.rs b/src/index/src/inverted_index/create/sort/external_sort.rs index f4e1d9f910..cdd6e848c9 100644 --- a/src/index/src/inverted_index/create/sort/external_sort.rs +++ b/src/index/src/inverted_index/create/sort/external_sort.rs @@ -33,7 +33,7 @@ use 
crate::inverted_index::create::sort::merge_stream::MergeSortedStream; use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter}; use crate::inverted_index::create::sort_create::SorterFactory; use crate::inverted_index::error::{IntermediateSnafu, Result}; -use crate::inverted_index::{Bytes, BytesRef}; +use crate::{Bytes, BytesRef}; /// `ExternalSorter` manages the sorting of data using both in-memory structures and external files. /// It dumps data to external files when the in-memory buffer crosses a certain memory threshold. diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw.rs b/src/index/src/inverted_index/create/sort/intermediate_rw.rs index dbadd5498b..85fc76e951 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw.rs @@ -46,7 +46,7 @@ use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::{ CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu, }; -use crate::inverted_index::Bytes; +use crate::Bytes; /// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer` pub struct IntermediateWriter { diff --git a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs index 8e4feb9902..05a4eeb57d 100644 --- a/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs +++ b/src/index/src/inverted_index/create/sort/intermediate_rw/codec_v1.rs @@ -20,7 +20,7 @@ use common_base::BitVec; use snafu::ResultExt; use crate::inverted_index::error::{CommonIoSnafu, Error, Result}; -use crate::inverted_index::Bytes; +use crate::Bytes; const U64_LENGTH: usize = std::mem::size_of::(); diff --git a/src/index/src/inverted_index/create/sort/merge_stream.rs b/src/index/src/inverted_index/create/sort/merge_stream.rs index 84debecb8a..0e60f7d8af 100644 --- 
a/src/index/src/inverted_index/create/sort/merge_stream.rs +++ b/src/index/src/inverted_index/create/sort/merge_stream.rs @@ -22,7 +22,7 @@ use pin_project::pin_project; use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::Result; -use crate::inverted_index::Bytes; +use crate::Bytes; /// A [`Stream`] implementation that merges two sorted streams into a single sorted stream #[pin_project] diff --git a/src/index/src/inverted_index/create/sort_create.rs b/src/index/src/inverted_index/create/sort_create.rs index b491c0a8b4..46c0c76269 100644 --- a/src/index/src/inverted_index/create/sort_create.rs +++ b/src/index/src/inverted_index/create/sort_create.rs @@ -22,7 +22,7 @@ use crate::inverted_index::create::sort::{SortOutput, Sorter}; use crate::inverted_index::create::InvertedIndexCreator; use crate::inverted_index::error::{InconsistentRowCountSnafu, Result}; use crate::inverted_index::format::writer::InvertedIndexWriter; -use crate::inverted_index::BytesRef; +use crate::BytesRef; type IndexName = String; type SegmentRowCount = NonZeroUsize; @@ -120,7 +120,7 @@ mod tests { use crate::inverted_index::create::sort::SortedStream; use crate::inverted_index::error::Error; use crate::inverted_index::format::writer::MockInvertedIndexWriter; - use crate::inverted_index::Bytes; + use crate::Bytes; #[tokio::test] async fn test_sort_index_creator_basic() { diff --git a/src/index/src/inverted_index/format/writer.rs b/src/index/src/inverted_index/format/writer.rs index 176b1f1561..f167766f6f 100644 --- a/src/index/src/inverted_index/format/writer.rs +++ b/src/index/src/inverted_index/format/writer.rs @@ -23,7 +23,7 @@ use futures::Stream; use crate::inverted_index::error::Result; pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter; -use crate::inverted_index::Bytes; +use crate::Bytes; pub type ValueStream = Box> + Send + Unpin>; diff --git a/src/index/src/inverted_index/format/writer/blob.rs 
b/src/index/src/inverted_index/format/writer/blob.rs index d53dfee855..ff4898d0dd 100644 --- a/src/index/src/inverted_index/format/writer/blob.rs +++ b/src/index/src/inverted_index/format/writer/blob.rs @@ -103,7 +103,7 @@ mod tests { use super::*; use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; - use crate::inverted_index::Bytes; + use crate::Bytes; fn unpack(fst_value: u64) -> [u32; 2] { bytemuck::cast::(fst_value) diff --git a/src/index/src/inverted_index/format/writer/single.rs b/src/index/src/inverted_index/format/writer/single.rs index 07d10b3880..e101873203 100644 --- a/src/index/src/inverted_index/format/writer/single.rs +++ b/src/index/src/inverted_index/format/writer/single.rs @@ -19,7 +19,7 @@ use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats}; use snafu::ResultExt; use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu}; -use crate::inverted_index::Bytes; +use crate::Bytes; /// `SingleIndexWriter` writes values to the blob storage for an individual inverted index pub struct SingleIndexWriter { @@ -149,7 +149,7 @@ mod tests { use super::*; use crate::inverted_index::error::Error; - use crate::inverted_index::Bytes; + use crate::Bytes; #[tokio::test] async fn test_single_index_writer_write_empty() { diff --git a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs index 118ba1edba..79da9b0e0c 100644 --- a/src/index/src/inverted_index/search/fst_apply/keys_apply.rs +++ b/src/index/src/inverted_index/search/fst_apply/keys_apply.rs @@ -23,7 +23,8 @@ use crate::inverted_index::error::{ }; use crate::inverted_index::search::fst_apply::FstApplier; use crate::inverted_index::search::predicate::Predicate; -use crate::inverted_index::{Bytes, FstMap}; +use crate::inverted_index::FstMap; +use crate::Bytes; /// `KeysFstApplier` is responsible for applying a search using a set of predefined keys /// against 
an FstMap to fetch associated values. diff --git a/src/index/src/inverted_index/search/predicate.rs b/src/index/src/inverted_index/search/predicate.rs index 25101e0ece..dbbc361270 100644 --- a/src/index/src/inverted_index/search/predicate.rs +++ b/src/index/src/inverted_index/search/predicate.rs @@ -14,7 +14,7 @@ use std::collections::HashSet; -use crate::inverted_index::Bytes; +use crate::Bytes; /// Enumerates types of predicates for value filtering. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index e490dbc064..91850424ad 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -20,3 +20,6 @@ pub mod error; pub mod external_provider; pub mod fulltext_index; pub mod inverted_index; + +pub type Bytes = Vec; +pub type BytesRef<'a> = &'a [u8]; diff --git a/src/mito2/src/cache/index/inverted_index.rs b/src/mito2/src/cache/index/inverted_index.rs index aaedcd8f89..1cca175f59 100644 --- a/src/mito2/src/cache/index/inverted_index.rs +++ b/src/mito2/src/cache/index/inverted_index.rs @@ -131,7 +131,7 @@ mod test { use futures::stream; use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader}; use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter}; - use index::inverted_index::Bytes; + use index::Bytes; use prometheus::register_int_counter_vec; use rand::{Rng, RngCore}; diff --git a/src/mito2/src/sst/index/bloom_filter/applier.rs b/src/mito2/src/sst/index/bloom_filter/applier.rs index 887832ce47..2ae85e0594 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier.rs @@ -441,8 +441,9 @@ mod tests { // - column_id: 3 let region_metadata = mock_region_metadata(); let prefix = "test_bloom_filter_applier_"; + let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let object_store = mock_object_store(); - let intm_mgr = new_intm_mgr(prefix).await; + let intm_mgr = 
new_intm_mgr(d.path().to_string_lossy()).await; let memory_usage_threshold = Some(1024); let file_id = FileId::random(); let region_dir = "region_dir".to_string(); @@ -459,7 +460,6 @@ mod tests { let batch = new_batch("tag2", 10..20); indexer.update(&batch).await.unwrap(); - let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await; let puffin_manager = factory.build(object_store.clone()); let mut puffin_writer = puffin_manager.writer(&path).await.unwrap(); diff --git a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs index 14b55cb047..956c5ce38e 100644 --- a/src/mito2/src/sst/index/bloom_filter/applier/builder.rs +++ b/src/mito2/src/sst/index/bloom_filter/applier/builder.rs @@ -20,7 +20,7 @@ use datafusion_expr::expr::InList; use datafusion_expr::{BinaryExpr, Expr, Operator}; use datatypes::data_type::ConcreteDataType; use datatypes::value::Value; -use index::bloom_filter::Bytes; +use index::Bytes; use object_store::ObjectStore; use puffin::puffin_manager::cache::PuffinMetadataCacheRef; use snafu::{OptionExt, ResultExt}; diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 6676375cb6..0f97ea1027 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -441,8 +441,9 @@ pub(crate) mod tests { #[tokio::test] async fn test_bloom_filter_indexer() { let prefix = "test_bloom_filter_indexer_"; + let tempdir = common_test_util::temp_dir::create_temp_dir(prefix); let object_store = mock_object_store(); - let intm_mgr = new_intm_mgr(prefix).await; + let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await; let region_metadata = mock_region_metadata(); let memory_usage_threshold = Some(1024); diff --git a/src/mito2/src/sst/index/fulltext_index.rs b/src/mito2/src/sst/index/fulltext_index.rs index 04c2e6daba..86d8a35b9d 100644 --- 
a/src/mito2/src/sst/index/fulltext_index.rs +++ b/src/mito2/src/sst/index/fulltext_index.rs @@ -15,4 +15,5 @@ pub(crate) mod applier; pub(crate) mod creator; -const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1"; +const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1"; +const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom"; diff --git a/src/mito2/src/sst/index/fulltext_index/applier.rs b/src/mito2/src/sst/index/fulltext_index/applier.rs index 8eda101608..7d3230781e 100644 --- a/src/mito2/src/sst/index/fulltext_index/applier.rs +++ b/src/mito2/src/sst/index/fulltext_index/applier.rs @@ -24,7 +24,7 @@ use store_api::storage::ColumnId; use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result}; use crate::metrics::INDEX_APPLY_ELAPSED; use crate::sst::file::FileId; -use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE; +use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY; use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir}; use crate::sst::index::TYPE_FULLTEXT_INDEX; use crate::sst::location; @@ -118,7 +118,7 @@ impl FulltextIndexApplier { .reader(&file_path) .await .context(PuffinBuildReaderSnafu)? - .dir(&format!("{INDEX_BLOB_TYPE}-{column_id}")) + .dir(&format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}")) .await { Ok(dir) => Ok(Some(dir)), diff --git a/src/mito2/src/sst/index/fulltext_index/creator.rs b/src/mito2/src/sst/index/fulltext_index/creator.rs index 41fa15bd7c..3275f00a14 100644 --- a/src/mito2/src/sst/index/fulltext_index/creator.rs +++ b/src/mito2/src/sst/index/fulltext_index/creator.rs @@ -13,25 +13,26 @@ // limitations under the License. 
use std::collections::HashMap; -use std::path::PathBuf; use common_telemetry::warn; use datatypes::schema::FulltextAnalyzer; -use index::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator}; +use index::fulltext_index::create::{ + BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator, +}; use index::fulltext_index::{Analyzer, Config}; use puffin::blob_metadata::CompressionCodec; -use puffin::puffin_manager::{PuffinWriter, PutOptions}; +use puffin::puffin_manager::PutOptions; use snafu::{ensure, ResultExt}; use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, ConcreteDataType, RegionId}; use crate::error::{ CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu, - FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result, + FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result, }; use crate::read::Batch; use crate::sst::file::FileId; -use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE; +use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::SstPuffinWriter; use crate::sst::index::statistics::{ByteCount, RowCount, Statistics}; @@ -85,16 +86,17 @@ impl FulltextIndexer { case_sensitive: options.case_sensitive, }; + // TODO(zhongzc): according to fulltext options, choose in the Tantivy flavor or Bloom Filter flavor. let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit) .await .context(CreateFulltextCreatorSnafu)?; + let inner = AltFulltextCreator::Tantivy(creator); creators.insert( column_id, SingleCreator { column_id, - inner: Box::new(creator), - intm_path, + inner, compress, }, ); @@ -209,9 +211,7 @@ struct SingleCreator { /// Column ID. column_id: ColumnId, /// Inner creator. - inner: Box, - /// Intermediate path where the index is written to. 
- intm_path: PathBuf, + inner: AltFulltextCreator, /// Whether the index should be compressed. compress: bool, } @@ -238,10 +238,7 @@ impl SingleCreator { .as_string() .context(FieldTypeMismatchSnafu)? .unwrap_or_default(); - self.inner - .push_text(text) - .await - .context(FulltextPushTextSnafu)?; + self.inner.push_text(text).await?; } } _ => { @@ -249,10 +246,7 @@ impl SingleCreator { // Ensure that the number of texts pushed is the same as the number of rows in the SST, // so that the texts are aligned with the row ids. for _ in 0..batch.num_rows() { - self.inner - .push_text("") - .await - .context(FulltextPushTextSnafu)?; + self.inner.push_text("").await?; } } } @@ -261,30 +255,82 @@ impl SingleCreator { } async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result { - self.inner.finish().await.context(FulltextFinishSnafu)?; - let options = PutOptions { compression: self.compress.then_some(CompressionCodec::Zstd), }; - - let key = format!("{INDEX_BLOB_TYPE}-{}", self.column_id); - puffin_writer - .put_dir(&key, self.intm_path.clone(), options) + self.inner + .finish(puffin_writer, &self.column_id, options) .await - .context(PuffinAddBlobSnafu) } async fn abort(&mut self) -> Result<()> { - if let Err(err) = self.inner.finish().await { - warn!(err; "Failed to finish fulltext index creator, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path); - } - if let Err(err) = tokio::fs::remove_dir_all(&self.intm_path).await { - warn!(err; "Failed to remove fulltext index directory, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path); - } + self.inner.abort(&self.column_id).await; Ok(()) } } +#[allow(dead_code, clippy::large_enum_variant)] +/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter. 
+enum AltFulltextCreator { + Tantivy(TantivyFulltextIndexCreator), + Bloom(BloomFilterFulltextIndexCreator), +} + +impl AltFulltextCreator { + async fn push_text(&mut self, text: &str) -> Result<()> { + match self { + Self::Tantivy(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu), + Self::Bloom(creator) => creator.push_text(text).await.context(FulltextPushTextSnafu), + } + } + + fn memory_usage(&self) -> usize { + match self { + Self::Tantivy(creator) => creator.memory_usage(), + Self::Bloom(creator) => creator.memory_usage(), + } + } + + async fn finish( + &mut self, + puffin_writer: &mut SstPuffinWriter, + column_id: &ColumnId, + put_options: PutOptions, + ) -> Result { + match self { + Self::Tantivy(creator) => { + let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{}", column_id); + creator + .finish(puffin_writer, &key, put_options) + .await + .context(FulltextFinishSnafu) + } + Self::Bloom(creator) => { + let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{}", column_id); + creator + .finish(puffin_writer, &key, put_options) + .await + .context(FulltextFinishSnafu) + } + } + } + + async fn abort(&mut self, column_id: &ColumnId) { + match self { + Self::Tantivy(creator) => { + if let Err(err) = creator.abort().await { + warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id); + } + } + Self::Bloom(creator) => { + if let Err(err) = creator.abort().await { + warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id); + } + } + } + } +} + #[cfg(test)] mod tests { use std::collections::BTreeSet; @@ -299,7 +345,7 @@ mod tests { use index::fulltext_index::search::RowId; use object_store::services::Memory; use object_store::ObjectStore; - use puffin::puffin_manager::PuffinManager; + use puffin::puffin_manager::{PuffinManager, PuffinWriter}; use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; use 
store_api::storage::{ConcreteDataType, RegionId}; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs index 138b15b82e..b09ac93f71 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/comparison.rs @@ -14,7 +14,7 @@ use datafusion_expr::{Expr as DfExpr, Operator}; use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate}; -use index::inverted_index::Bytes; +use index::Bytes; use crate::error::Result; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; diff --git a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs index 35a5caad56..765d872500 100644 --- a/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/inverted_index/applier/builder/eq_list.rs @@ -17,7 +17,7 @@ use std::collections::HashSet; use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator}; use datatypes::data_type::ConcreteDataType; use index::inverted_index::search::predicate::{InListPredicate, Predicate}; -use index::inverted_index::Bytes; +use index::Bytes; use crate::error::Result; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; diff --git a/src/puffin/Cargo.toml b/src/puffin/Cargo.toml index 7116bbef52..078e3aebeb 100644 --- a/src/puffin/Cargo.toml +++ b/src/puffin/Cargo.toml @@ -20,6 +20,7 @@ common-error.workspace = true common-macro.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +common-test-util.workspace = true derive_builder.workspace = true futures.workspace = true lz4_flex = "0.11" @@ -36,4 +37,3 @@ uuid.workspace = true [dev-dependencies] common-base = { workspace = true, features = ["testing"] } -common-test-util.workspace = true diff 
--git a/src/puffin/src/puffin_manager/file_accessor.rs b/src/puffin/src/puffin_manager/file_accessor.rs index 351423b054..193aa037f5 100644 --- a/src/puffin/src/puffin_manager/file_accessor.rs +++ b/src/puffin/src/puffin_manager/file_accessor.rs @@ -13,8 +13,11 @@ // limitations under the License. use async_trait::async_trait; -use common_base::range_read::SizeAwareRangeReader; +use common_base::range_read::{FileReader, SizeAwareRangeReader}; +use common_test_util::temp_dir::{create_temp_dir, TempDir}; use futures::AsyncWrite; +use tokio::fs::File; +use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use crate::error::Result; @@ -31,3 +34,37 @@ pub trait PuffinFileAccessor: Send + Sync + 'static { /// Creates a writer for the given puffin file. async fn writer(&self, puffin_file_name: &str) -> Result; } + +pub struct MockFileAccessor { + tempdir: TempDir, +} + +impl MockFileAccessor { + pub fn new(prefix: &str) -> Self { + let tempdir = create_temp_dir(prefix); + Self { tempdir } + } +} + +#[async_trait] +impl PuffinFileAccessor for MockFileAccessor { + type Reader = FileReader; + type Writer = Compat; + + async fn reader(&self, puffin_file_name: &str) -> Result { + Ok(FileReader::new(self.tempdir.path().join(puffin_file_name)) + .await + .unwrap()) + } + + async fn writer(&self, puffin_file_name: &str) -> Result { + let p = self.tempdir.path().join(puffin_file_name); + if let Some(p) = p.parent() { + if !tokio::fs::try_exists(p).await.unwrap() { + tokio::fs::create_dir_all(p).await.unwrap(); + } + } + let f = tokio::fs::File::create(p).await.unwrap(); + Ok(f.compat()) + } +} diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index c4057a5f5b..23756aec64 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -15,16 +15,12 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; -use common_base::range_read::{FileReader, RangeReader}; +use 
common_base::range_read::RangeReader; use common_test_util::temp_dir::{create_temp_dir, TempDir}; -use tokio::fs::File; use tokio::io::AsyncReadExt as _; -use tokio_util::compat::{Compat, TokioAsyncReadCompatExt}; use crate::blob_metadata::CompressionCodec; -use crate::error::Result; -use crate::puffin_manager::file_accessor::PuffinFileAccessor; +use crate::puffin_manager::file_accessor::MockFileAccessor; use crate::puffin_manager::fs_puffin_manager::FsPuffinManager; use crate::puffin_manager::stager::BoundedStager; use crate::puffin_manager::{ @@ -371,37 +367,3 @@ async fn check_dir( assert_eq!(buf, *raw_data); } } - -pub struct MockFileAccessor { - tempdir: TempDir, -} - -impl MockFileAccessor { - pub fn new(prefix: &str) -> Self { - let tempdir = create_temp_dir(prefix); - Self { tempdir } - } -} - -#[async_trait] -impl PuffinFileAccessor for MockFileAccessor { - type Reader = FileReader; - type Writer = Compat; - - async fn reader(&self, puffin_file_name: &str) -> Result { - Ok(FileReader::new(self.tempdir.path().join(puffin_file_name)) - .await - .unwrap()) - } - - async fn writer(&self, puffin_file_name: &str) -> Result { - let p = self.tempdir.path().join(puffin_file_name); - if let Some(p) = p.parent() { - if !tokio::fs::try_exists(p).await.unwrap() { - tokio::fs::create_dir_all(p).await.unwrap(); - } - } - let f = tokio::fs::File::create(p).await.unwrap(); - Ok(f.compat()) - } -}