feat: bloom filter as fulltext index v2 (Part 1) (#5406)

* feat: bloom filter as fulltext index v2

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* add unit tests for tokenizer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor dup vars

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-01-22 07:33:11 +08:00
committed by GitHub
parent 6f1b5101a3
commit f74a955504
40 changed files with 572 additions and 123 deletions

2
Cargo.lock generated
View File

@@ -5301,9 +5301,11 @@ dependencies = [
"futures",
"greptime-proto",
"itertools 0.10.5",
"jieba-rs",
"mockall",
"pin-project",
"prost 0.12.6",
"puffin",
"rand",
"regex",
"regex-automata 0.4.8",

View File

@@ -223,7 +223,6 @@ impl FileReader {
}
}
#[cfg(any(test, feature = "testing"))]
impl SizeAwareRangeReader for FileReader {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing

View File

@@ -22,9 +22,11 @@ fst.workspace = true
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
jieba-rs = "0.7"
mockall.workspace = true
pin-project.workspace = true
prost.workspace = true
puffin.workspace = true
regex.workspace = true
regex-automata.workspace = true
serde.workspace = true
@@ -33,6 +35,7 @@ snafu.workspace = true
tantivy = { version = "0.22", features = ["zstd-compression"] }
tantivy-jieba = "0.11.0"
tokio.workspace = true
tokio-util.workspace = true
uuid.workspace = true
[dev-dependencies]

View File

@@ -17,8 +17,5 @@ pub mod creator;
pub mod error;
pub mod reader;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];
/// The seed used for the Bloom filter.
pub const SEED: u128 = 42;

View File

@@ -20,7 +20,7 @@ use itertools::Itertools;
use crate::bloom_filter::error::Result;
use crate::bloom_filter::reader::BloomFilterReader;
use crate::bloom_filter::Bytes;
use crate::Bytes;
pub struct BloomFilterApplier {
reader: Box<dyn BloomFilterReader + Send>,

View File

@@ -26,8 +26,9 @@ use prost::Message;
use snafu::ResultExt;
use crate::bloom_filter::error::{IoSnafu, Result};
use crate::bloom_filter::{Bytes, SEED};
use crate::bloom_filter::SEED;
use crate::external_provider::ExternalTempFileProvider;
use crate::Bytes;
/// The false positive rate of the Bloom filter.
pub const FALSE_POSITIVE_RATE: f64 = 0.01;

View File

@@ -25,8 +25,8 @@ use snafu::ResultExt;
use super::intermediate_codec::IntermediateBloomFilterCodecV1;
use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
use crate::bloom_filter::Bytes;
use crate::external_provider::ExternalTempFileProvider;
use crate::Bytes;
/// The minimum memory usage threshold for flushing in-memory Bloom filters to disk.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; // 1MB

View File

@@ -17,6 +17,7 @@ use serde::{Deserialize, Serialize};
pub mod create;
pub mod error;
pub mod search;
pub mod tokenizer;
#[cfg(test)]
mod tests;

View File

@@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod bloom_filter;
mod tantivy;
use async_trait::async_trait;
pub use tantivy::{TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME};
use puffin::puffin_manager::{PuffinWriter, PutOptions};
pub use crate::fulltext_index::create::bloom_filter::BloomFilterFulltextIndexCreator;
pub use crate::fulltext_index::create::tantivy::{
TantivyFulltextIndexCreator, ROWID_FIELD_NAME, TEXT_FIELD_NAME,
};
use crate::fulltext_index::error::Result;
/// `FulltextIndexCreator` is for creating a fulltext index.
@@ -26,7 +31,15 @@ pub trait FulltextIndexCreator: Send {
async fn push_text(&mut self, text: &str) -> Result<()>;
/// Finalizes the creation of the index.
async fn finish(&mut self) -> Result<()>;
async fn finish(
&mut self,
puffin_writer: &mut (impl PuffinWriter + Send),
blob_key: &str,
put_options: PutOptions,
) -> Result<u64>;
/// Aborts the creation of the index.
async fn abort(&mut self) -> Result<()>;
/// Returns the memory usage in bytes during the creation of the index.
fn memory_usage(&self) -> usize;

View File

@@ -0,0 +1,127 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{OptionExt, ResultExt};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::bloom_filter::creator::BloomFilterCreator;
use crate::external_provider::ExternalTempFileProvider;
use crate::fulltext_index::create::FulltextIndexCreator;
use crate::fulltext_index::error::{
AbortedSnafu, BiErrorsSnafu, BloomFilterFinishSnafu, ExternalSnafu, PuffinAddBlobSnafu, Result,
};
use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
use crate::fulltext_index::Config;
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
///
/// Each pushed text is run through the analyzer and the resulting tokens are
/// handed to the wrapped `BloomFilterCreator` as the elements of one row.
pub struct BloomFilterFulltextIndexCreator {
// The underlying bloom filter creator. `abort` takes it out, leaving `None`,
// after which `push_text` and `finish` fail with the `Aborted` error.
inner: Option<BloomFilterCreator>,
// Turns a text into the token list that is pushed into `inner`.
analyzer: Analyzer,
}
impl BloomFilterFulltextIndexCreator {
pub fn new(
config: Config,
rows_per_segment: usize,
intermediate_provider: Arc<dyn ExternalTempFileProvider>,
global_memory_usage: Arc<AtomicUsize>,
global_memory_usage_threshold: Option<usize>,
) -> Self {
let tokenizer = match config.analyzer {
crate::fulltext_index::Analyzer::English => Box::new(EnglishTokenizer) as _,
crate::fulltext_index::Analyzer::Chinese => Box::new(ChineseTokenizer) as _,
};
let analyzer = Analyzer::new(tokenizer, config.case_sensitive);
let inner = BloomFilterCreator::new(
rows_per_segment,
intermediate_provider,
global_memory_usage,
global_memory_usage_threshold,
);
Self {
inner: Some(inner),
analyzer,
}
}
}
#[async_trait]
impl FulltextIndexCreator for BloomFilterFulltextIndexCreator {
    /// Analyzes `text` into tokens and pushes them to the bloom filter creator
    /// as the elements of a single row.
    ///
    /// Fails with `Aborted` if the creator has already been aborted.
    async fn push_text(&mut self, text: &str) -> Result<()> {
        let row_tokens = self.analyzer.analyze_text(text)?;
        let creator = self.inner.as_mut().context(AbortedSnafu)?;

        creator
            .push_row_elems(row_tokens)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;

        Ok(())
    }

    /// Finishes the bloom filter and stores it in the puffin file under `blob_key`.
    ///
    /// The serialized filter is written into one end of an in-memory duplex pipe
    /// while `put_blob` reads from the other end; both futures are driven
    /// concurrently. Returns the byte count reported by `put_blob`.
    ///
    /// If both sides fail the two errors are combined into `BiErrors`; if only
    /// one side fails, that error is returned.
    async fn finish(
        &mut self,
        puffin_writer: &mut (impl PuffinWriter + Send),
        blob_key: &str,
        put_options: PutOptions,
    ) -> Result<u64> {
        let creator = self.inner.as_mut().context(AbortedSnafu)?;

        let (pipe_writer, pipe_reader) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(pipe_writer.compat_write()),
            puffin_writer.put_blob(blob_key, pipe_reader.compat(), put_options)
        );

        let blob_outcome = puffin_add_blob.context(PuffinAddBlobSnafu);
        let index_outcome = index_finish.context(BloomFilterFinishSnafu);

        match (blob_outcome, index_outcome) {
            (Ok(written_bytes), Ok(_)) => Ok(written_bytes),
            (Ok(_), Err(err)) | (Err(err), Ok(_)) => Err(err),
            (Err(blob_err), Err(index_err)) => BiErrorsSnafu {
                first: Box::new(blob_err),
                second: Box::new(index_err),
            }
            .fail(),
        }
    }

    /// Drops the underlying bloom filter creator.
    ///
    /// Fails with `Aborted` if it has already been dropped by a previous call.
    async fn abort(&mut self) -> Result<()> {
        match self.inner.take() {
            Some(_) => Ok(()),
            None => AbortedSnafu.fail(),
        }
    }

    /// Returns the memory usage reported by the underlying creator, or 0 once aborted.
    fn memory_usage(&self) -> usize {
        match &self.inner {
            Some(creator) => creator.memory_usage(),
            None => 0,
        }
    }
}

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::Path;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{OptionExt, ResultExt};
use tantivy::indexer::NoMergePolicy;
use tantivy::schema::{Schema, STORED, TEXT};
@@ -24,7 +26,9 @@ use tantivy::{doc, Index, IndexWriter};
use tantivy_jieba::JiebaTokenizer;
use crate::fulltext_index::create::FulltextIndexCreator;
use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu};
use crate::fulltext_index::error::{
ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu,
};
use crate::fulltext_index::{Analyzer, Config};
pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
@@ -43,6 +47,9 @@ pub struct TantivyFulltextIndexCreator {
/// The current max row id.
max_rowid: u64,
/// The directory path in filesystem to store the index.
path: PathBuf,
}
impl TantivyFulltextIndexCreator {
@@ -59,7 +66,7 @@ impl TantivyFulltextIndexCreator {
let rowid_field = schema_builder.add_u64_field(ROWID_FIELD_NAME, STORED);
let schema = schema_builder.build();
let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?;
index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
index.set_tokenizers(Self::build_tokenizer(&config));
@@ -76,6 +83,7 @@ impl TantivyFulltextIndexCreator {
text_field,
rowid_field,
max_rowid: 0,
path: path.as_ref().to_path_buf(),
})
}
@@ -115,14 +123,37 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator {
Ok(())
}
async fn finish(&mut self) -> Result<()> {
async fn finish(
&mut self,
puffin_writer: &mut (impl PuffinWriter + Send),
blob_key: &str,
put_options: PutOptions,
) -> Result<u64> {
let mut writer = self.writer.take().context(FinishedSnafu)?;
common_runtime::spawn_blocking_global(move || {
writer.commit().context(TantivySnafu)?;
writer.wait_merging_threads().context(TantivySnafu)
})
.await
.context(JoinSnafu)?
.context(JoinSnafu)??;
puffin_writer
.put_dir(blob_key, self.path.clone(), put_options)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
/// Aborts the index creation and removes the on-disk index directory.
///
/// The writer is taken out of `self` (failing with `Finished` if it is already
/// gone), committed and its merging threads awaited on a blocking thread, and
/// only then is the directory at `self.path` deleted.
async fn abort(&mut self) -> Result<()> {
    let mut index_writer = self.writer.take().context(FinishedSnafu)?;

    let commit_task = common_runtime::spawn_blocking_global(move || {
        index_writer.commit().context(TantivySnafu)?;
        index_writer.wait_merging_threads().context(TantivySnafu)
    });
    // The first `?` surfaces the join error, the second the tantivy error.
    commit_task.await.context(JoinSnafu)??;

    tokio::fs::remove_dir_all(&self.path).await.context(IoSnafu)
}
fn memory_usage(&self) -> usize {
@@ -134,6 +165,7 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator {
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures::AsyncRead;
use tantivy::collector::DocSetCollector;
use tantivy::query::QueryParser;
use tantivy::schema::Value;
@@ -141,6 +173,39 @@ mod tests {
use super::*;
// Minimal `PuffinWriter` test double for the Tantivy creator tests: only
// `put_dir` and `finish` may be called; the other methods panic.
struct MockPuffinWriter;
#[async_trait]
impl PuffinWriter for MockPuffinWriter {
// Panics if called.
async fn put_blob<R>(
&mut self,
_key: &str,
_raw_data: R,
_options: PutOptions,
) -> puffin::error::Result<u64>
where
R: AsyncRead + Send,
{
unreachable!()
}
// Ignores its arguments and reports zero bytes written.
async fn put_dir(
&mut self,
_key: &str,
_dir: PathBuf,
_options: PutOptions,
) -> puffin::error::Result<u64> {
Ok(0)
}
// Panics if called.
fn set_footer_lz4_compressed(&mut self, _lz4_compressed: bool) {
unreachable!()
}
// Does nothing and reports zero bytes written.
async fn finish(self) -> puffin::error::Result<u64> {
Ok(0)
}
}
#[tokio::test]
async fn test_creator_basic() {
let memory_limits = [1, 64_000_000, usize::MAX];
@@ -241,7 +306,10 @@ mod tests {
for text in texts {
creator.push_text(text).await.unwrap();
}
creator.finish().await.unwrap();
creator
.finish(&mut MockPuffinWriter, "", PutOptions::default())
.await
.unwrap();
}
async fn query_and_check(path: &Path, cases: &[(&str, Vec<u32>)]) {

View File

@@ -76,6 +76,34 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Aborted creator"))]
Aborted {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
source: puffin::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to finish bloom filter"))]
BloomFilterFinish {
source: crate::bloom_filter::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("BiErrors, first: {first}, second: {second}"))]
BiErrors {
first: Box<Error>,
second: Box<Error>,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -86,7 +114,12 @@ impl ErrorExt for Error {
Tantivy { .. } | TantivyDocNotFound { .. } => StatusCode::Internal,
TantivyParser { .. } => StatusCode::InvalidSyntax,
Io { .. } | Finished { .. } | Join { .. } => StatusCode::Unexpected,
BiErrors { .. } => StatusCode::Internal,
Io { .. } | Finished { .. } | Join { .. } | Aborted { .. } => StatusCode::Unexpected,
BloomFilterFinish { source, .. } => source.status_code(),
PuffinAddBlob { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
}

View File

@@ -13,17 +13,37 @@
// limitations under the License.
use std::collections::BTreeSet;
use std::sync::Arc;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use puffin::puffin_manager::file_accessor::MockFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};
async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir {
let tempdir = create_temp_dir(prefix);
/// Creates a temporary staging directory named after `prefix` and a
/// `BoundedStager` rooted in it.
///
/// The `TempDir` is returned alongside the stager so the caller controls how
/// long the directory handle is kept.
async fn new_bounded_stager(prefix: &str) -> (TempDir, Arc<BoundedStager>) {
    let staging_dir = create_temp_dir(prefix);
    let stager = BoundedStager::new(staging_dir.path().to_path_buf(), 102400)
        .await
        .unwrap();
    (staging_dir, Arc::new(stager))
}
let mut creator = TantivyFulltextIndexCreator::new(tempdir.path(), config, 1024 * 1024)
async fn create_index(
prefix: &str,
puffin_writer: &mut (impl PuffinWriter + Send),
blob_key: &str,
texts: Vec<&str>,
config: Config,
) {
let tantivy_path = create_temp_dir(prefix);
let mut creator = TantivyFulltextIndexCreator::new(tantivy_path.path(), config, 1024 * 1024)
.await
.unwrap();
@@ -31,8 +51,10 @@ async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir
creator.push_text(text).await.unwrap();
}
creator.finish().await.unwrap();
tempdir
creator
.finish(puffin_writer, blob_key, PutOptions::default())
.await
.unwrap();
}
async fn test_search(
@@ -42,9 +64,18 @@ async fn test_search(
query: &str,
expected: impl IntoIterator<Item = RowId>,
) {
let index_path = create_index(prefix, texts, config).await;
let (_staging_dir, stager) = new_bounded_stager(prefix).await;
let file_accessor = Arc::new(MockFileAccessor::new(prefix));
let puffin_manager = FsPuffinManager::new(stager, file_accessor);
let searcher = TantivyFulltextIndexSearcher::new(index_path.path()).unwrap();
let file_name = "fulltext_index";
let blob_key = "fulltext_index";
let mut writer = puffin_manager.writer(file_name).await.unwrap();
create_index(prefix, &mut writer, blob_key, texts, config).await;
let reader = puffin_manager.reader(file_name).await.unwrap();
let index_dir = reader.dir(blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
let results = searcher.search(query).await.unwrap();
let expected = expected.into_iter().collect::<BTreeSet<_>>();

View File

@@ -0,0 +1,125 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use jieba_rs::Jieba;
use crate::fulltext_index::error::Result;
use crate::Bytes;
/// `Tokenizer` tokenizes a text into a list of tokens.
///
/// The returned tokens are slices borrowed from the input text.
pub trait Tokenizer: Send {
    /// Splits `text` into tokens.
    fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str>;
}

/// `EnglishTokenizer` tokenizes an English text.
///
/// It splits the text on every non-alphanumeric character and discards the
/// empty pieces, so letters and digits stay together (e.g. `test0` is one token).
#[derive(Debug, Default)]
pub struct EnglishTokenizer;

impl Tokenizer for EnglishTokenizer {
    fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> {
        let is_separator = |c: char| !c.is_alphanumeric();
        text.split(is_separator)
            .filter(|piece| !piece.is_empty())
            .collect()
    }
}
/// `ChineseTokenizer` tokenizes a Chinese text.
///
/// It uses the Jieba tokenizer to split the text into Chinese words.
#[derive(Debug, Default)]
pub struct ChineseTokenizer;
impl Tokenizer for ChineseTokenizer {
fn tokenize<'a>(&self, text: &'a str) -> Vec<&'a str> {
let jieba = Jieba::new();
jieba.cut(text, false)
}
}
/// `Analyzer` analyzes a text into a list of tokens.
///
/// It uses a `Tokenizer` to tokenize the text and optionally lowercases the tokens.
pub struct Analyzer {
    /// Splits the input text into tokens.
    tokenizer: Box<dyn Tokenizer>,
    /// When `false`, every token is lowercased before being returned.
    case_sensitive: bool,
}

impl Analyzer {
    /// Creates a new `Analyzer` with the given `Tokenizer` and case sensitivity.
    pub fn new(tokenizer: Box<dyn Tokenizer>, case_sensitive: bool) -> Self {
        Self {
            tokenizer,
            case_sensitive,
        }
    }

    /// Analyzes the given text into a list of tokens.
    ///
    /// Each token is returned as its UTF-8 bytes, in tokenizer order. With
    /// `case_sensitive == false` tokens are lowercased first. The current
    /// implementation always returns `Ok`.
    pub fn analyze_text(&self, text: &str) -> Result<Vec<Bytes>> {
        let tokens = self
            .tokenizer
            .tokenize(text)
            .into_iter()
            .map(|token| {
                if self.case_sensitive {
                    token.as_bytes().to_vec()
                } else {
                    // `to_lowercase` already allocates a fresh `String`; reuse its
                    // buffer instead of copying the bytes a second time.
                    token.to_lowercase().into_bytes()
                }
            })
            .collect();
        Ok(tokens)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Punctuation and whitespace separate tokens; digits stay inside a token.
    #[test]
    fn test_english_tokenizer() {
        let tokenizer = EnglishTokenizer;
        let text = "Hello, world! This is a test0.";
        let tokens = tokenizer.tokenize(text);
        assert_eq!(tokens, vec!["Hello", "world", "This", "is", "a", "test0"]);
    }

    /// Tokens are slices of the input, so the leading "我" must be its own token
    /// rather than an empty string.
    #[test]
    fn test_chinese_tokenizer() {
        let tokenizer = ChineseTokenizer;
        let text = "我喜欢苹果";
        let tokens = tokenizer.tokenize(text);
        assert_eq!(tokens, vec!["我", "喜欢", "苹果"]);
    }

    /// A case-insensitive analyzer lowercases every token and returns its bytes.
    #[test]
    fn test_analyzer() {
        let tokenizer = EnglishTokenizer;
        let analyzer = Analyzer::new(Box::new(tokenizer), false);
        let text = "Hello, world! This is a test.";
        let tokens = analyzer.analyze_text(text).unwrap();
        assert_eq!(
            tokens,
            vec![
                b"hello".to_vec(),
                b"world".to_vec(),
                b"this".to_vec(),
                b"is".to_vec(),
                b"a".to_vec(),
                b"test".to_vec()
            ]
        );
    }
}

View File

@@ -18,5 +18,3 @@ pub mod format;
pub mod search;
pub type FstMap = fst::Map<Vec<u8>>;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];

View File

@@ -19,7 +19,7 @@ use async_trait::async_trait;
use crate::inverted_index::error::Result;
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::BytesRef;
use crate::BytesRef;
/// `InvertedIndexCreator` provides functionality to construct an inverted index
#[async_trait]

View File

@@ -21,7 +21,7 @@ use common_base::BitVec;
use futures::Stream;
use crate::inverted_index::error::Result;
use crate::inverted_index::{Bytes, BytesRef};
use crate::{Bytes, BytesRef};
/// A stream of sorted values along with their associated bitmap
pub type SortedStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;

View File

@@ -33,7 +33,7 @@ use crate::inverted_index::create::sort::merge_stream::MergeSortedStream;
use crate::inverted_index::create::sort::{SortOutput, SortedStream, Sorter};
use crate::inverted_index::create::sort_create::SorterFactory;
use crate::inverted_index::error::{IntermediateSnafu, Result};
use crate::inverted_index::{Bytes, BytesRef};
use crate::{Bytes, BytesRef};
/// `ExternalSorter` manages the sorting of data using both in-memory structures and external files.
/// It dumps data to external files when the in-memory buffer crosses a certain memory threshold.

View File

@@ -46,7 +46,7 @@ use crate::inverted_index::create::sort::SortedStream;
use crate::inverted_index::error::{
CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
};
use crate::inverted_index::Bytes;
use crate::Bytes;
/// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer`
pub struct IntermediateWriter<W> {

View File

@@ -20,7 +20,7 @@ use common_base::BitVec;
use snafu::ResultExt;
use crate::inverted_index::error::{CommonIoSnafu, Error, Result};
use crate::inverted_index::Bytes;
use crate::Bytes;
const U64_LENGTH: usize = std::mem::size_of::<u64>();

View File

@@ -22,7 +22,7 @@ use pin_project::pin_project;
use crate::inverted_index::create::sort::SortedStream;
use crate::inverted_index::error::Result;
use crate::inverted_index::Bytes;
use crate::Bytes;
/// A [`Stream`] implementation that merges two sorted streams into a single sorted stream
#[pin_project]

View File

@@ -22,7 +22,7 @@ use crate::inverted_index::create::sort::{SortOutput, Sorter};
use crate::inverted_index::create::InvertedIndexCreator;
use crate::inverted_index::error::{InconsistentRowCountSnafu, Result};
use crate::inverted_index::format::writer::InvertedIndexWriter;
use crate::inverted_index::BytesRef;
use crate::BytesRef;
type IndexName = String;
type SegmentRowCount = NonZeroUsize;
@@ -120,7 +120,7 @@ mod tests {
use crate::inverted_index::create::sort::SortedStream;
use crate::inverted_index::error::Error;
use crate::inverted_index::format::writer::MockInvertedIndexWriter;
use crate::inverted_index::Bytes;
use crate::Bytes;
#[tokio::test]
async fn test_sort_index_creator_basic() {

View File

@@ -23,7 +23,7 @@ use futures::Stream;
use crate::inverted_index::error::Result;
pub use crate::inverted_index::format::writer::blob::InvertedIndexBlobWriter;
use crate::inverted_index::Bytes;
use crate::Bytes;
pub type ValueStream = Box<dyn Stream<Item = Result<(Bytes, BitVec)>> + Send + Unpin>;

View File

@@ -103,7 +103,7 @@ mod tests {
use super::*;
use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use crate::inverted_index::Bytes;
use crate::Bytes;
fn unpack(fst_value: u64) -> [u32; 2] {
bytemuck::cast::<u64, [u32; 2]>(fst_value)

View File

@@ -19,7 +19,7 @@ use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats};
use snafu::ResultExt;
use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu};
use crate::inverted_index::Bytes;
use crate::Bytes;
/// `SingleIndexWriter` writes values to the blob storage for an individual inverted index
pub struct SingleIndexWriter<W, S> {
@@ -149,7 +149,7 @@ mod tests {
use super::*;
use crate::inverted_index::error::Error;
use crate::inverted_index::Bytes;
use crate::Bytes;
#[tokio::test]
async fn test_single_index_writer_write_empty() {

View File

@@ -23,7 +23,8 @@ use crate::inverted_index::error::{
};
use crate::inverted_index::search::fst_apply::FstApplier;
use crate::inverted_index::search::predicate::Predicate;
use crate::inverted_index::{Bytes, FstMap};
use crate::inverted_index::FstMap;
use crate::Bytes;
/// `KeysFstApplier` is responsible for applying a search using a set of predefined keys
/// against an FstMap to fetch associated values.

View File

@@ -14,7 +14,7 @@
use std::collections::HashSet;
use crate::inverted_index::Bytes;
use crate::Bytes;
/// Enumerates types of predicates for value filtering.
#[derive(Debug, Clone, PartialEq, Eq)]

View File

@@ -20,3 +20,6 @@ pub mod error;
pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub type Bytes = Vec<u8>;
pub type BytesRef<'a> = &'a [u8];

View File

@@ -131,7 +131,7 @@ mod test {
use futures::stream;
use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
use index::inverted_index::Bytes;
use index::Bytes;
use prometheus::register_int_counter_vec;
use rand::{Rng, RngCore};

View File

@@ -441,8 +441,9 @@ mod tests {
// - column_id: 3
let region_metadata = mock_region_metadata();
let prefix = "test_bloom_filter_applier_";
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let object_store = mock_object_store();
let intm_mgr = new_intm_mgr(prefix).await;
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
let memory_usage_threshold = Some(1024);
let file_id = FileId::random();
let region_dir = "region_dir".to_string();
@@ -459,7 +460,6 @@ mod tests {
let batch = new_batch("tag2", 10..20);
indexer.update(&batch).await.unwrap();
let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let puffin_manager = factory.build(object_store.clone());
let mut puffin_writer = puffin_manager.writer(&path).await.unwrap();

View File

@@ -20,7 +20,7 @@ use datafusion_expr::expr::InList;
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use index::bloom_filter::Bytes;
use index::Bytes;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use snafu::{OptionExt, ResultExt};

View File

@@ -441,8 +441,9 @@ pub(crate) mod tests {
#[tokio::test]
async fn test_bloom_filter_indexer() {
let prefix = "test_bloom_filter_indexer_";
let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
let object_store = mock_object_store();
let intm_mgr = new_intm_mgr(prefix).await;
let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
let region_metadata = mock_region_metadata();
let memory_usage_threshold = Some(1024);

View File

@@ -15,4 +15,5 @@
pub(crate) mod applier;
pub(crate) mod creator;
const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1";
const INDEX_BLOB_TYPE_TANTIVY: &str = "greptime-fulltext-index-v1";
const INDEX_BLOB_TYPE_BLOOM: &str = "greptime-fulltext-index-bloom";

View File

@@ -24,7 +24,7 @@ use store_api::storage::ColumnId;
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE_TANTIVY;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::location;
@@ -118,7 +118,7 @@ impl FulltextIndexApplier {
.reader(&file_path)
.await
.context(PuffinBuildReaderSnafu)?
.dir(&format!("{INDEX_BLOB_TYPE}-{column_id}"))
.dir(&format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}"))
.await
{
Ok(dir) => Ok(Some(dir)),

View File

@@ -13,25 +13,26 @@
// limitations under the License.
use std::collections::HashMap;
use std::path::PathBuf;
use common_telemetry::warn;
use datatypes::schema::FulltextAnalyzer;
use index::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use index::fulltext_index::create::{
BloomFilterFulltextIndexCreator, FulltextIndexCreator, TantivyFulltextIndexCreator,
};
use index::fulltext_index::{Analyzer, Config};
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use puffin::puffin_manager::PutOptions;
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::error::{
CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, Result,
FulltextPushTextSnafu, IndexOptionsSnafu, OperateAbortedIndexSnafu, Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE;
use crate::sst::index::fulltext_index::{INDEX_BLOB_TYPE_BLOOM, INDEX_BLOB_TYPE_TANTIVY};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
@@ -85,16 +86,17 @@ impl FulltextIndexer {
case_sensitive: options.case_sensitive,
};
// TODO(zhongzc): according to fulltext options, choose in the Tantivy flavor or Bloom Filter flavor.
let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
.await
.context(CreateFulltextCreatorSnafu)?;
let inner = AltFulltextCreator::Tantivy(creator);
creators.insert(
column_id,
SingleCreator {
column_id,
inner: Box::new(creator),
intm_path,
inner,
compress,
},
);
@@ -209,9 +211,7 @@ struct SingleCreator {
/// Column ID.
column_id: ColumnId,
/// Inner creator.
inner: Box<dyn FulltextIndexCreator>,
/// Intermediate path where the index is written to.
intm_path: PathBuf,
inner: AltFulltextCreator,
/// Whether the index should be compressed.
compress: bool,
}
@@ -238,10 +238,7 @@ impl SingleCreator {
.as_string()
.context(FieldTypeMismatchSnafu)?
.unwrap_or_default();
self.inner
.push_text(text)
.await
.context(FulltextPushTextSnafu)?;
self.inner.push_text(text).await?;
}
}
_ => {
@@ -249,10 +246,7 @@ impl SingleCreator {
// Ensure that the number of texts pushed is the same as the number of rows in the SST,
// so that the texts are aligned with the row ids.
for _ in 0..batch.num_rows() {
self.inner
.push_text("")
.await
.context(FulltextPushTextSnafu)?;
self.inner.push_text("").await?;
}
}
}
@@ -261,30 +255,82 @@ impl SingleCreator {
}
async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
self.inner.finish().await.context(FulltextFinishSnafu)?;
let options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),
};
let key = format!("{INDEX_BLOB_TYPE}-{}", self.column_id);
puffin_writer
.put_dir(&key, self.intm_path.clone(), options)
self.inner
.finish(puffin_writer, &self.column_id, options)
.await
.context(PuffinAddBlobSnafu)
}
async fn abort(&mut self) -> Result<()> {
if let Err(err) = self.inner.finish().await {
warn!(err; "Failed to finish fulltext index creator, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path);
}
if let Err(err) = tokio::fs::remove_dir_all(&self.intm_path).await {
warn!(err; "Failed to remove fulltext index directory, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path);
}
self.inner.abort(&self.column_id).await;
Ok(())
}
}
#[allow(dead_code, clippy::large_enum_variant)]
/// `AltFulltextCreator` is an alternative fulltext index creator that can be either Tantivy or BloomFilter.
///
/// Every method dispatches to the wrapped creator and converts its error into
/// this crate's error type.
enum AltFulltextCreator {
    Tantivy(TantivyFulltextIndexCreator),
    Bloom(BloomFilterFulltextIndexCreator),
}

impl AltFulltextCreator {
    /// Pushes one text (one row) into the wrapped creator.
    async fn push_text(&mut self, text: &str) -> Result<()> {
        let pushed = match self {
            Self::Tantivy(tantivy) => tantivy.push_text(text).await,
            Self::Bloom(bloom) => bloom.push_text(text).await,
        };
        pushed.context(FulltextPushTextSnafu)
    }

    /// Returns the memory usage reported by the wrapped creator.
    fn memory_usage(&self) -> usize {
        match self {
            Self::Tantivy(tantivy) => tantivy.memory_usage(),
            Self::Bloom(bloom) => bloom.memory_usage(),
        }
    }

    /// Finishes the wrapped creator and writes the index through `puffin_writer`.
    ///
    /// The key is the flavor-specific blob type followed by `-` and the column id.
    /// Returns the value the wrapped creator's `finish` yields.
    async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
        column_id: &ColumnId,
        put_options: PutOptions,
    ) -> Result<ByteCount> {
        let written = match self {
            Self::Tantivy(tantivy) => {
                let key = format!("{INDEX_BLOB_TYPE_TANTIVY}-{column_id}");
                tantivy.finish(puffin_writer, &key, put_options).await
            }
            Self::Bloom(bloom) => {
                let key = format!("{INDEX_BLOB_TYPE_BLOOM}-{column_id}");
                bloom.finish(puffin_writer, &key, put_options).await
            }
        };
        written.context(FulltextFinishSnafu)
    }

    /// Aborts the wrapped creator; a failure is only logged as a warning.
    async fn abort(&mut self, column_id: &ColumnId) {
        match self {
            Self::Tantivy(tantivy) => {
                let Err(err) = tantivy.abort().await else {
                    return;
                };
                warn!(err; "Failed to abort the fulltext index creator in the Tantivy flavor, col_id: {:?}", column_id);
            }
            Self::Bloom(bloom) => {
                let Err(err) = bloom.abort().await else {
                    return;
                };
                warn!(err; "Failed to abort the fulltext index creator in the Bloom Filter flavor, col_id: {:?}", column_id);
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
@@ -299,7 +345,7 @@ mod tests {
use index::fulltext_index::search::RowId;
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::PuffinManager;
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ConcreteDataType, RegionId};

View File

@@ -14,7 +14,7 @@
use datafusion_expr::{Expr as DfExpr, Operator};
use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
use index::inverted_index::Bytes;
use index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;

View File

@@ -17,7 +17,7 @@ use std::collections::HashSet;
use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
use datatypes::data_type::ConcreteDataType;
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use index::inverted_index::Bytes;
use index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;

View File

@@ -20,6 +20,7 @@ common-error.workspace = true
common-macro.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-test-util.workspace = true
derive_builder.workspace = true
futures.workspace = true
lz4_flex = "0.11"
@@ -36,4 +37,3 @@ uuid.workspace = true
[dev-dependencies]
common-base = { workspace = true, features = ["testing"] }
common-test-util.workspace = true

View File

@@ -13,8 +13,11 @@
// limitations under the License.
use async_trait::async_trait;
use common_base::range_read::SizeAwareRangeReader;
use common_base::range_read::{FileReader, SizeAwareRangeReader};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use futures::AsyncWrite;
use tokio::fs::File;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use crate::error::Result;
@@ -31,3 +34,37 @@ pub trait PuffinFileAccessor: Send + Sync + 'static {
/// Creates a writer for the given puffin file.
async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer>;
}
/// A [`PuffinFileAccessor`] that keeps its puffin files inside a temporary directory.
///
/// NOTE(review): the name and the `unwrap()`-based implementation suggest this is meant for
/// tests only — confirm before using it elsewhere.
pub struct MockFileAccessor {
    /// Directory under which every puffin file name is resolved.
    // Presumably the directory is removed when this handle is dropped — verify against
    // `common_test_util::temp_dir::TempDir`.
    tempdir: TempDir,
}
impl MockFileAccessor {
    /// Builds an accessor whose files live in a temporary directory obtained from
    /// `create_temp_dir(prefix)`.
    pub fn new(prefix: &str) -> Self {
        Self {
            tempdir: create_temp_dir(prefix),
        }
    }
}
/// Reads and writes puffin files relative to the accessor's temporary directory.
///
/// I/O failures are not converted into the crate's error type; they panic via `unwrap()`.
#[async_trait]
impl PuffinFileAccessor for MockFileAccessor {
    type Reader = FileReader;
    type Writer = Compat<File>;

    /// Opens `puffin_file_name`, resolved against the temporary directory, for reading.
    ///
    /// # Panics
    ///
    /// Panics if `FileReader::new` fails for the resolved path.
    async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
        let path = self.tempdir.path().join(puffin_file_name);
        Ok(FileReader::new(path).await.unwrap())
    }

    /// Creates `puffin_file_name` under the temporary directory and returns a writer for it.
    ///
    /// Missing parent directories are created first, so nested file names are accepted.
    ///
    /// # Panics
    ///
    /// Panics if the parent directories or the file itself cannot be created.
    async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
        let path = self.tempdir.path().join(puffin_file_name);
        if let Some(parent) = path.parent() {
            // `create_dir_all` already succeeds when the directory exists, so a separate
            // existence probe would only add a filesystem round trip and a check-then-act gap.
            tokio::fs::create_dir_all(parent).await.unwrap();
        }
        let file = tokio::fs::File::create(path).await.unwrap();
        Ok(file.compat())
    }
}

View File

@@ -15,16 +15,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_base::range_read::{FileReader, RangeReader};
use common_base::range_read::RangeReader;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use tokio::fs::File;
use tokio::io::AsyncReadExt as _;
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
use crate::blob_metadata::CompressionCodec;
use crate::error::Result;
use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::file_accessor::MockFileAccessor;
use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
use crate::puffin_manager::stager::BoundedStager;
use crate::puffin_manager::{
@@ -371,37 +367,3 @@ async fn check_dir(
assert_eq!(buf, *raw_data);
}
}
/// A [`PuffinFileAccessor`] that keeps its puffin files inside a temporary directory.
///
/// NOTE(review): the name and the `unwrap()`-based implementation suggest this is meant for
/// tests only — confirm before using it elsewhere.
pub struct MockFileAccessor {
    /// Directory under which every puffin file name is resolved.
    // Presumably the directory is removed when this handle is dropped — verify against
    // `common_test_util::temp_dir::TempDir`.
    tempdir: TempDir,
}
impl MockFileAccessor {
    /// Builds an accessor whose files live in a temporary directory obtained from
    /// `create_temp_dir(prefix)`.
    pub fn new(prefix: &str) -> Self {
        Self {
            tempdir: create_temp_dir(prefix),
        }
    }
}
/// Reads and writes puffin files relative to the accessor's temporary directory.
///
/// I/O failures are not converted into the crate's error type; they panic via `unwrap()`.
#[async_trait]
impl PuffinFileAccessor for MockFileAccessor {
    type Reader = FileReader;
    type Writer = Compat<File>;

    /// Opens `puffin_file_name`, resolved against the temporary directory, for reading.
    ///
    /// # Panics
    ///
    /// Panics if `FileReader::new` fails for the resolved path.
    async fn reader(&self, puffin_file_name: &str) -> Result<Self::Reader> {
        let path = self.tempdir.path().join(puffin_file_name);
        Ok(FileReader::new(path).await.unwrap())
    }

    /// Creates `puffin_file_name` under the temporary directory and returns a writer for it.
    ///
    /// Missing parent directories are created first, so nested file names are accepted.
    ///
    /// # Panics
    ///
    /// Panics if the parent directories or the file itself cannot be created.
    async fn writer(&self, puffin_file_name: &str) -> Result<Self::Writer> {
        let path = self.tempdir.path().join(puffin_file_name);
        if let Some(parent) = path.parent() {
            // `create_dir_all` already succeeds when the directory exists, so a separate
            // existence probe would only add a filesystem round trip and a check-then-act gap.
            tokio::fs::create_dir_all(parent).await.unwrap();
        }
        let file = tokio::fs::File::create(path).await.unwrap();
        Ok(file.compat())
    }
}