feat: apply terms with fulltext tantivy backend (#5869)

* feat: apply terms with fulltext tantivy backend

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2025-04-10 15:32:15 +08:00
committed by GitHub
parent 54ef29f394
commit dce5e35d7c
13 changed files with 591 additions and 196 deletions

View File

@@ -12,18 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use puffin::blob_metadata::BlobMetadata;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
use tantivy_jieba::JiebaTokenizer;
pub mod create;
pub mod error;
pub mod search;
pub mod tokenizer;
pub const KEY_FULLTEXT_CONFIG: &str = "fulltext_config";
use crate::fulltext_index::error::{DeserializeFromJsonSnafu, Result};
#[cfg(test)]
mod tests;
/// Configuration for the fulltext index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Config {
/// Analyzer to use for tokenization.
pub analyzer: Analyzer,
@@ -33,10 +40,38 @@ pub struct Config {
}
/// Analyzer to use for tokenization.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum Analyzer {
#[default]
English,
Chinese,
}
impl Config {
fn build_tantivy_tokenizer(&self) -> TokenizerManager {
let mut builder = match self.analyzer {
Analyzer::English => TextAnalyzer::builder(SimpleTokenizer::default()).dynamic(),
Analyzer::Chinese => TextAnalyzer::builder(JiebaTokenizer {}).dynamic(),
};
if !self.case_sensitive {
builder = builder.filter_dynamic(LowerCaser);
}
let tokenizer = builder.build();
let tokenizer_manager = TokenizerManager::new();
tokenizer_manager.register("default", tokenizer);
tokenizer_manager
}
/// Extracts the fulltext index configuration from the blob metadata.
pub fn from_blob_metadata(metadata: &BlobMetadata) -> Result<Self> {
if let Some(config) = metadata.properties.get(KEY_FULLTEXT_CONFIG) {
let config = serde_json::from_str(config).context(DeserializeFromJsonSnafu)?;
return Ok(config);
}
Ok(Self::default())
}
}
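
As a quick illustration of the new persistence path: the creator serializes `Config` to JSON and stores it under `KEY_FULLTEXT_CONFIG` in the puffin blob properties, and `from_blob_metadata` recovers it on the read side. A minimal round-trip sketch (the helper function name is illustrative):

fn config_roundtrip_sketch() {
    let config = Config {
        analyzer: Analyzer::Chinese,
        case_sensitive: false,
    };
    // The JSON string is what gets stored as a blob property by the creator.
    let json = serde_json::to_string(&config).unwrap();
    let restored: Config = serde_json::from_str(&json).unwrap();
    assert_eq!(config, restored);
}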

View File

@@ -30,12 +30,10 @@ use crate::fulltext_index::error::{
SerializeToJsonSnafu,
};
use crate::fulltext_index::tokenizer::{Analyzer, ChineseTokenizer, EnglishTokenizer};
use crate::fulltext_index::Config;
use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
pub const KEY_FULLTEXT_CONFIG: &str = "fulltext_config";
/// `BloomFilterFulltextIndexCreator` is for creating a fulltext index using a bloom filter.
pub struct BloomFilterFulltextIndexCreator {
inner: Option<BloomFilterCreator>,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use async_trait::async_trait;
@@ -21,15 +22,13 @@ use snafu::{OptionExt, ResultExt};
use tantivy::indexer::NoMergePolicy;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::store::{Compressor, ZstdCompressor};
use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager};
use tantivy::{doc, Index, IndexWriter};
use tantivy_jieba::JiebaTokenizer;
use crate::fulltext_index::create::FulltextIndexCreator;
use crate::fulltext_index::error::{
ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, TantivySnafu,
ExternalSnafu, FinishedSnafu, IoSnafu, JoinSnafu, Result, SerializeToJsonSnafu, TantivySnafu,
};
use crate::fulltext_index::{Analyzer, Config};
use crate::fulltext_index::{Config, KEY_FULLTEXT_CONFIG};
pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text";
pub const ROWID_FIELD_NAME: &str = "greptime_fulltext_rowid";
@@ -50,6 +49,9 @@ pub struct TantivyFulltextIndexCreator {
/// The directory path in filesystem to store the index.
path: PathBuf,
/// The configuration of the fulltext index.
config: Config,
}
impl TantivyFulltextIndexCreator {
@@ -68,7 +70,7 @@ impl TantivyFulltextIndexCreator {
let mut index = Index::create_in_dir(&path, schema).context(TantivySnafu)?;
index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
index.set_tokenizers(Self::build_tokenizer(&config));
index.set_tokenizers(config.build_tantivy_tokenizer());
let memory_limit = Self::sanitize_memory_limit(memory_limit);
@@ -84,25 +86,10 @@ impl TantivyFulltextIndexCreator {
rowid_field,
max_rowid: 0,
path: path.as_ref().to_path_buf(),
config,
})
}
fn build_tokenizer(config: &Config) -> TokenizerManager {
let mut builder = match config.analyzer {
Analyzer::English => TextAnalyzer::builder(SimpleTokenizer::default()).dynamic(),
Analyzer::Chinese => TextAnalyzer::builder(JiebaTokenizer {}).dynamic(),
};
if !config.case_sensitive {
builder = builder.filter_dynamic(LowerCaser);
}
let tokenizer = builder.build();
let tokenizer_manager = TokenizerManager::new();
tokenizer_manager.register("default", tokenizer);
tokenizer_manager
}
fn sanitize_memory_limit(memory_limit: usize) -> usize {
// Ported from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX}.
const MARGIN_IN_BYTES: usize = 1_000_000;
@@ -137,8 +124,16 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator {
.await
.context(JoinSnafu)??;
let property_key = KEY_FULLTEXT_CONFIG.to_string();
let property_value = serde_json::to_string(&self.config).context(SerializeToJsonSnafu)?;
puffin_writer
.put_dir(blob_key, self.path.clone(), put_options)
.put_dir(
blob_key,
self.path.clone(),
put_options,
HashMap::from([(property_key, property_value)]),
)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
@@ -174,6 +169,7 @@ mod tests {
use tantivy::TantivyDocument;
use super::*;
use crate::fulltext_index::Analyzer;
struct MockPuffinWriter;
@@ -197,6 +193,7 @@ mod tests {
_key: &str,
_dir: PathBuf,
_options: PutOptions,
_properties: HashMap<String, String>,
) -> puffin::error::Result<u64> {
Ok(0)
}
@@ -226,7 +223,7 @@ mod tests {
("foo", vec![3]),
("bar", vec![4]),
];
query_and_check(temp_dir.path(), &cases).await;
query_and_check(temp_dir.path(), config, &cases).await;
}
}
@@ -248,9 +245,13 @@ mod tests {
("hello", vec![0u32, 2]),
("world", vec![1, 2]),
("foo", vec![3]),
("Foo", vec![]),
("FOO", vec![]),
("bar", vec![]),
("Bar", vec![4]),
("BAR", vec![]),
];
query_and_check(temp_dir.path(), &cases).await;
query_and_check(temp_dir.path(), config, &cases).await;
}
}
@@ -274,7 +275,7 @@ mod tests {
("foo", vec![4]),
("bar", vec![5]),
];
query_and_check(temp_dir.path(), &cases).await;
query_and_check(temp_dir.path(), config, &cases).await;
}
}
@@ -297,8 +298,12 @@ mod tests {
("世界", vec![1, 2, 3]),
("foo", vec![4]),
("bar", vec![]),
("Foo", vec![]),
("FOO", vec![]),
("Bar", vec![5]),
("BAR", vec![]),
];
query_and_check(temp_dir.path(), &cases).await;
query_and_check(temp_dir.path(), config, &cases).await;
}
}
@@ -315,8 +320,9 @@ mod tests {
.unwrap();
}
async fn query_and_check(path: &Path, cases: &[(&str, Vec<u32>)]) {
let index = Index::open_in_dir(path).unwrap();
async fn query_and_check(path: &Path, config: Config, cases: &[(&str, Vec<u32>)]) {
let mut index = Index::open_in_dir(path).unwrap();
index.set_tokenizers(config.build_tantivy_tokenizer());
let reader = index.reader().unwrap();
let searcher = reader.searcher();
for (query, expected) in cases {

View File

@@ -29,6 +29,7 @@ use crate::fulltext_index::error::{
Result, TantivyDocNotFoundSnafu, TantivyParserSnafu, TantivySnafu,
};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};
use crate::fulltext_index::Config;
/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
pub struct TantivyFulltextIndexSearcher {
@@ -42,10 +43,11 @@ pub struct TantivyFulltextIndexSearcher {
impl TantivyFulltextIndexSearcher {
/// Creates a new `TantivyFulltextIndexSearcher`.
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
pub fn new(path: impl AsRef<Path>, config: Config) -> Result<Self> {
let now = Instant::now();
let index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
let mut index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
index.set_tokenizers(config.build_tantivy_tokenizer());
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)

View File

@@ -19,7 +19,7 @@ use common_test_util::temp_dir::{create_temp_dir, TempDir};
use puffin::puffin_manager::file_accessor::MockFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::BoundedStager;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
@@ -61,8 +61,7 @@ async fn test_search(
prefix: &str,
config: Config,
texts: Vec<&str>,
query: &str,
expected: impl IntoIterator<Item = RowId>,
query_expected: Vec<(&str, impl IntoIterator<Item = RowId>)>,
) {
let (_staging_dir, stager) = new_bounded_stager(prefix).await;
let file_accessor = Arc::new(MockFileAccessor::new(prefix));
@@ -72,14 +71,16 @@ async fn test_search(
let blob_key = "fulltext_index".to_string();
let mut writer = puffin_manager.writer(&file_name).await.unwrap();
create_index(prefix, &mut writer, &blob_key, texts, config).await;
writer.finish().await.unwrap();
let reader = puffin_manager.reader(&file_name).await.unwrap();
let index_dir = reader.dir(&blob_key).await.unwrap();
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path()).unwrap();
let results = searcher.search(query).await.unwrap();
let expected = expected.into_iter().collect::<BTreeSet<_>>();
assert_eq!(results, expected);
let searcher = TantivyFulltextIndexSearcher::new(index_dir.path(), config).unwrap();
for (query, expected) in query_expected {
let results = searcher.search(query).await.unwrap();
let expected = expected.into_iter().collect::<BTreeSet<_>>();
assert_eq!(results, expected);
}
}
#[tokio::test]
@@ -91,8 +92,7 @@ async fn test_simple_term() {
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"Barack Obama",
[0, 1],
vec![("Barack Obama", [0, 1])],
)
.await;
}
@@ -103,8 +103,7 @@ async fn test_negative_term() {
"test_negative_term_",
Config::default(),
vec!["apple is a fruit", "I like apple", "fruit is healthy"],
"apple -fruit",
[1],
vec![("apple -fruit", [1])],
)
.await;
}
@@ -119,8 +118,7 @@ async fn test_must_term() {
"I love apples and fruits",
"apple and fruit are good",
],
"+apple +fruit",
[2],
vec![("+apple +fruit", [2])],
)
.await;
}
@@ -131,8 +129,7 @@ async fn test_boolean_operators() {
"test_boolean_operators_",
Config::default(),
vec!["a b c", "a b", "b c", "c"],
"a AND b OR c",
[0, 1, 2, 3],
vec![("a AND b OR c", [0, 1, 2, 3])],
)
.await;
}
@@ -146,8 +143,7 @@ async fn test_phrase_term() {
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"\"Barack Obama\"",
[0],
vec![("\"Barack Obama\"", [0])],
)
.await;
}
@@ -161,8 +157,7 @@ async fn test_config_english_analyzer_case_insensitive() {
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[0],
vec![("banana", [0]), ("Banana", [0]), ("BANANA", [0])],
)
.await;
}
@@ -175,9 +170,8 @@ async fn test_config_english_analyzer_case_sensitive() {
case_sensitive: true,
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[],
vec!["Banana is a fruit", "I like banana", "Fruit is healthy"],
vec![("banana", [1]), ("Banana", [0])],
)
.await;
}
@@ -191,8 +185,7 @@ async fn test_config_chinese_analyzer() {
..Default::default()
},
vec!["苹果是一种水果", "我喜欢苹果", "水果很健康"],
"苹果",
[0, 1],
vec![("苹果", [0, 1])],
)
.await;
}

View File

@@ -17,9 +17,10 @@ use std::sync::Arc;
use common_telemetry::warn;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use index::fulltext_index::Config;
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{BlobWithMetadata, DirGuard, PuffinManager, PuffinReader};
use puffin::puffin_manager::{GuardWithMetadata, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};
@@ -93,7 +94,7 @@ impl FulltextIndexApplier {
let mut row_ids: Option<BTreeSet<RowId>> = None;
for (column_id, request) in &self.requests {
if request.queries.is_empty() {
if request.queries.is_empty() && request.terms.is_empty() {
continue;
}
@@ -133,15 +134,21 @@ impl FulltextIndexApplier {
.dir(file_id, &blob_key, file_size_hint)
.await?;
let path = match &dir {
Some(dir) => dir.path(),
let dir = match &dir {
Some(dir) => dir,
None => {
return Ok(None);
}
};
let searcher = TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
let config = Config::from_blob_metadata(dir.metadata()).context(ApplyFulltextIndexSnafu)?;
let path = dir.path();
let searcher =
TantivyFulltextIndexSearcher::new(path, config).context(ApplyFulltextIndexSnafu)?;
let mut row_ids: Option<BTreeSet<RowId>> = None;
// 1. Apply queries
for query in &request.queries {
let result = searcher
.search(&query.0)
@@ -161,6 +168,21 @@ impl FulltextIndexApplier {
}
}
// 2. Apply terms
let query = request.terms_as_query(config.case_sensitive);
if !query.0.is_empty() {
let result = searcher
.search(&query.0)
.await
.context(ApplyFulltextIndexSnafu)?;
if let Some(ids) = row_ids.as_mut() {
ids.retain(|id| result.contains(id));
} else {
row_ids = Some(result);
}
}
Ok(row_ids)
}
}
@@ -217,7 +239,7 @@ impl IndexSource {
file_id: FileId,
key: &str,
file_size_hint: Option<u64>,
) -> Result<Option<BlobWithMetadata<SstPuffinBlob>>> {
) -> Result<Option<GuardWithMetadata<SstPuffinBlob>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
let res = reader.blob(key).await;
match res {
@@ -248,7 +270,7 @@ impl IndexSource {
file_id: FileId,
key: &str,
file_size_hint: Option<u64>,
) -> Result<Option<SstPuffinDir>> {
) -> Result<Option<GuardWithMetadata<SstPuffinDir>>> {
let (reader, fallbacked) = self.ensure_reader(file_id, file_size_hint).await?;
let res = reader.dir(key).await;
match res {

View File

@@ -30,12 +30,37 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// A request for the fulltext index.
///
/// It contains all the queries and terms for a column.
#[derive(Default)]
#[derive(Default, Debug)]
pub struct FulltextRequest {
pub queries: Vec<FulltextQuery>,
pub terms: Vec<FulltextTerm>,
}
impl FulltextRequest {
/// Converts the terms to a single query string.
///
/// For example, if the terms are ["foo", "bar"], the query string will be `r#"+"foo" +"bar""#`.
/// Any `"` inside a term is escaped.
///
/// `skip_lowercased` skips terms whose column reference was lowercased; pass `true`
/// when the index is case sensitive, since lowercased terms are not indexed there.
pub fn terms_as_query(&self, skip_lowercased: bool) -> FulltextQuery {
let mut query = String::new();
for term in &self.terms {
if skip_lowercased && term.col_lowered {
continue;
}
// Escape the `"` in the term.
let escaped_term = term.term.replace("\"", "\\\"");
if query.is_empty() {
query = format!("+\"{escaped_term}\"");
} else {
query.push_str(&format!(" +\"{escaped_term}\""));
}
}
FulltextQuery(query)
}
}
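
A quick worked example of the quoting and escaping rules above, mirroring the unit tests below (values are illustrative):

fn terms_as_query_example() {
    let request = FulltextRequest {
        queries: vec![],
        terms: vec![
            FulltextTerm {
                col_lowered: false,
                term: "foo".to_string(),
            },
            FulltextTerm {
                col_lowered: true,
                term: "say \"hi\"".to_string(),
            },
        ],
    };
    // Case-insensitive index: both terms are kept and inner quotes are escaped.
    assert_eq!(request.terms_as_query(false).0, r#"+"foo" +"say \"hi\"""#);
    // Case-sensitive index: the lowercased term cannot match and is skipped.
    assert_eq!(request.terms_as_query(true).0, r#"+"foo""#);
}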
/// A query to be matched in fulltext index.
///
/// `query` is the query to be matched, e.g. "+foo -bar" in `SELECT * FROM t WHERE matches(text, "+foo -bar")`.
@@ -543,4 +568,92 @@ mod tests {
}
);
}
#[test]
fn test_terms_as_query() {
// Test with empty terms
let request = FulltextRequest::default();
assert_eq!(request.terms_as_query(false), FulltextQuery(String::new()));
assert_eq!(request.terms_as_query(true), FulltextQuery(String::new()));
// Test with a single term (not lowercased)
let mut request = FulltextRequest::default();
request.terms.push(FulltextTerm {
col_lowered: false,
term: "foo".to_string(),
});
assert_eq!(
request.terms_as_query(false),
FulltextQuery("+\"foo\"".to_string())
);
assert_eq!(
request.terms_as_query(true),
FulltextQuery("+\"foo\"".to_string())
);
// Test with a single lowercased term and skip_lowercased=true
let mut request = FulltextRequest::default();
request.terms.push(FulltextTerm {
col_lowered: true,
term: "foo".to_string(),
});
assert_eq!(
request.terms_as_query(false),
FulltextQuery("+\"foo\"".to_string())
);
assert_eq!(request.terms_as_query(true), FulltextQuery(String::new())); // Should skip lowercased term
// Test with multiple terms, mix of lowercased and not
let mut request = FulltextRequest::default();
request.terms.push(FulltextTerm {
col_lowered: false,
term: "foo".to_string(),
});
request.terms.push(FulltextTerm {
col_lowered: true,
term: "bar".to_string(),
});
assert_eq!(
request.terms_as_query(false),
FulltextQuery("+\"foo\" +\"bar\"".to_string())
);
assert_eq!(
request.terms_as_query(true),
FulltextQuery("+\"foo\"".to_string()) // Only the non-lowercased term
);
// Test with term containing quotes that need escaping
let mut request = FulltextRequest::default();
request.terms.push(FulltextTerm {
col_lowered: false,
term: "foo\"bar".to_string(),
});
assert_eq!(
request.terms_as_query(false),
FulltextQuery("+\"foo\\\"bar\"".to_string())
);
// Test with a complex mix of terms
let mut request = FulltextRequest::default();
request.terms.push(FulltextTerm {
col_lowered: false,
term: "foo".to_string(),
});
request.terms.push(FulltextTerm {
col_lowered: true,
term: "bar\"quoted\"".to_string(),
});
request.terms.push(FulltextTerm {
col_lowered: false,
term: "baz\\escape".to_string(),
});
assert_eq!(
request.terms_as_query(false),
FulltextQuery("+\"foo\" +\"bar\\\"quoted\\\"\" +\"baz\\escape\"".to_string())
);
assert_eq!(
request.terms_as_query(true),
FulltextQuery("+\"foo\" +\"baz\\escape\"".to_string()) // Skips the lowercased term
);
}
}

View File

@@ -376,7 +376,9 @@ mod tests {
use crate::access_layer::RegionFilePathFactory;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::builder::{FulltextQuery, FulltextRequest};
use crate::sst::index::fulltext_index::applier::builder::{
FulltextQuery, FulltextRequest, FulltextTerm,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -510,14 +512,25 @@ mod tests {
.unwrap()
}
async fn build_applier_factory(
/// Applier factory that can handle both queries and terms.
///
/// It builds a fulltext index with the given data rows, and returns a function
/// that can handle both queries and terms in a single request.
///
/// The function takes two parameters:
/// - `queries`: A list of (ColumnId, query_string) pairs for fulltext queries
/// - `terms`: A list of (ColumnId, [(bool, String)]) for fulltext terms, where the bool indicates whether the term was lowercased
async fn build_fulltext_applier_factory(
prefix: &str,
rows: &[(
Option<&str>, // text_english_case_sensitive
Option<&str>, // text_english_case_insensitive
Option<&str>, // text_chinese
)],
) -> impl Fn(Vec<(ColumnId, &str)>) -> BoxFuture<'static, BTreeSet<RowId>> {
) -> impl Fn(
Vec<(ColumnId, &str)>,
Vec<(ColumnId, Vec<(bool, &str)>)>,
) -> BoxFuture<'static, Option<BTreeSet<RowId>>> {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
@@ -549,74 +562,253 @@ mod tests {
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
move |queries| {
move |queries: Vec<(ColumnId, &str)>, terms_requests: Vec<(ColumnId, Vec<(bool, &str)>)>| {
let _d = &d;
let applier = FulltextIndexApplier::new(
region_dir.clone(),
region_metadata.region_id,
object_store.clone(),
queries
let region_dir = region_dir.clone();
let object_store = object_store.clone();
let factory = factory.clone();
let mut requests: HashMap<ColumnId, FulltextRequest> = HashMap::new();
// Add queries
for (column_id, query) in queries {
requests
.entry(column_id)
.or_default()
.queries
.push(FulltextQuery(query.to_string()));
}
// Add terms
for (column_id, terms) in terms_requests {
let fulltext_terms = terms
.into_iter()
.map(|(a, b)| {
(
a,
FulltextRequest {
queries: vec![FulltextQuery(b.to_string())],
terms: vec![],
},
)
.map(|(col_lowered, term)| FulltextTerm {
col_lowered,
term: term.to_string(),
})
.collect(),
factory.clone(),
.collect::<Vec<_>>();
requests
.entry(column_id)
.or_default()
.terms
.extend(fulltext_terms);
}
let applier = FulltextIndexApplier::new(
region_dir,
region_metadata.region_id,
object_store,
requests,
factory,
);
async move { applier.apply(sst_file_id, None).await.unwrap().unwrap() }.boxed()
async move { applier.apply(sst_file_id, None).await.unwrap() }.boxed()
}
}
fn rows(row_ids: impl IntoIterator<Item = RowId>) -> BTreeSet<RowId> {
row_ids.into_iter().collect()
}
#[tokio::test]
async fn test_fulltext_index_basic() {
let applier_factory = build_applier_factory(
"test_fulltext_index_basic_",
async fn test_fulltext_index_basic_case_sensitive() {
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_basic_case_sensitive_",
&[
(Some("hello"), None, Some("你好")),
(Some("world"), Some("world"), None),
(None, Some("World"), Some("世界")),
(
Some("Hello, World"),
Some("Hello, World"),
Some("你好,世界"),
),
(Some("hello"), None, None),
(Some("world"), None, None),
(None, None, None),
(Some("Hello, World"), None, None),
],
)
.await;
let row_ids = applier_factory(vec![(1, "hello")]).await;
assert_eq!(row_ids, vec![0].into_iter().collect());
let row_ids = applier_factory(vec![(1, "hello")], vec![]).await;
assert_eq!(row_ids, Some(rows([0])));
let row_ids = applier_factory(vec![(1, "world")]).await;
assert_eq!(row_ids, vec![1].into_iter().collect());
let row_ids = applier_factory(vec![(1, "world")], vec![]).await;
assert_eq!(row_ids, Some(rows([1])));
let row_ids = applier_factory(vec![(2, "hello")]).await;
assert_eq!(row_ids, vec![3].into_iter().collect());
let row_ids = applier_factory(vec![(1, "Hello")], vec![]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids = applier_factory(vec![(2, "world")]).await;
assert_eq!(row_ids, vec![1, 2, 3].into_iter().collect());
let row_ids = applier_factory(vec![(1, "World")], vec![]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids = applier_factory(vec![(3, "你好")]).await;
assert_eq!(row_ids, vec![0, 3].into_iter().collect());
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "hello")])]).await;
assert_eq!(row_ids, Some(rows([0])));
let row_ids = applier_factory(vec![(3, "世界")]).await;
assert_eq!(row_ids, vec![2, 3].into_iter().collect());
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "hello")])]).await;
assert_eq!(row_ids, None);
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "world")])]).await;
assert_eq!(row_ids, Some(rows([1])));
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "world")])]).await;
assert_eq!(row_ids, None);
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello")])]).await;
assert_eq!(row_ids, None);
let row_ids = applier_factory(vec![], vec![(1, vec![(false, "Hello, World")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids = applier_factory(vec![], vec![(1, vec![(true, "Hello, World")])]).await;
assert_eq!(row_ids, None);
}
#[tokio::test]
async fn test_fulltext_index_basic_case_insensitive() {
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_basic_case_insensitive_",
&[
(None, Some("hello"), None),
(None, None, None),
(None, Some("world"), None),
(None, Some("Hello, World"), None),
],
)
.await;
let row_ids = applier_factory(vec![(2, "hello")], vec![]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![(2, "world")], vec![]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![(2, "Hello")], vec![]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![(2, "World")], vec![]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "hello")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "hello")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "world")])]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "world")])]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "Hello")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "Hello")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(false, "World")])]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![], vec![(2, vec![(true, "World")])]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
}
#[tokio::test]
async fn test_fulltext_index_basic_chinese() {
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_basic_chinese_",
&[
(None, None, Some("你好")),
(None, None, None),
(None, None, Some("世界")),
(None, None, Some("你好,世界")),
],
)
.await;
let row_ids = applier_factory(vec![(3, "你好")], vec![]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![(3, "世界")], vec![]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "你好")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids = applier_factory(vec![], vec![(3, vec![(false, "世界")])]).await;
assert_eq!(row_ids, Some(rows([2, 3])));
}
#[tokio::test]
async fn test_fulltext_index_multi_terms_case_sensitive() {
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_multi_terms_case_sensitive_",
&[
(Some("Hello"), None, None),
(Some("World"), None, None),
(None, None, None),
(Some("Hello, World"), None, None),
],
)
.await;
let row_ids =
applier_factory(vec![], vec![(1, vec![(false, "hello"), (false, "world")])]).await;
assert_eq!(row_ids, Some(rows([])));
let row_ids =
applier_factory(vec![], vec![(1, vec![(false, "Hello"), (false, "World")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids =
applier_factory(vec![], vec![(1, vec![(true, "Hello"), (false, "World")])]).await;
assert_eq!(row_ids, Some(rows([1, 3])));
let row_ids =
applier_factory(vec![], vec![(1, vec![(false, "Hello"), (true, "World")])]).await;
assert_eq!(row_ids, Some(rows([0, 3])));
let row_ids =
applier_factory(vec![], vec![(1, vec![(true, "Hello"), (true, "World")])]).await;
assert_eq!(row_ids, None);
}
#[tokio::test]
async fn test_fulltext_index_multi_terms_case_insensitive() {
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_multi_terms_case_insensitive_",
&[
(None, Some("hello"), None),
(None, None, None),
(None, Some("world"), None),
(None, Some("Hello, World"), None),
],
)
.await;
let row_ids =
applier_factory(vec![], vec![(2, vec![(false, "hello"), (false, "world")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids =
applier_factory(vec![], vec![(2, vec![(true, "hello"), (false, "world")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids =
applier_factory(vec![], vec![(2, vec![(false, "hello"), (true, "world")])]).await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids =
applier_factory(vec![], vec![(2, vec![(true, "hello"), (true, "world")])]).await;
assert_eq!(row_ids, Some(rows([3])));
}
#[tokio::test]
async fn test_fulltext_index_multi_columns() {
let applier_factory = build_applier_factory(
let applier_factory = build_fulltext_applier_factory(
"test_fulltext_index_multi_columns_",
&[
(Some("hello"), None, Some("你好")),
(Some("world"), Some("world"), None),
(Some("Hello"), None, Some("你好")),
(Some("World"), Some("world"), None),
(None, Some("World"), Some("世界")),
(
Some("Hello, World"),
@@ -627,13 +819,14 @@ mod tests {
)
.await;
let row_ids = applier_factory(vec![(1, "hello"), (3, "你好")]).await;
assert_eq!(row_ids, vec![0].into_iter().collect());
let row_ids = applier_factory(
vec![(1, "Hello"), (3, "你好")],
vec![(2, vec![(false, "world")])],
)
.await;
assert_eq!(row_ids, Some(rows([3])));
let row_ids = applier_factory(vec![(1, "world"), (3, "世界")]).await;
assert_eq!(row_ids, vec![].into_iter().collect());
let row_ids = applier_factory(vec![(2, "world"), (3, "世界")]).await;
assert_eq!(row_ids, vec![2, 3].into_iter().collect());
let row_ids = applier_factory(vec![(2, "World")], vec![(1, vec![(false, "World")])]).await;
assert_eq!(row_ids, Some(rows([1, 3])));
}
}

View File

@@ -174,12 +174,13 @@ impl PuffinFileAccessor for ObjectStorePuffinFileAccessor {
#[cfg(test)]
mod tests {
use common_base::range_read::RangeReader;
use common_test_util::temp_dir::create_temp_dir;
use futures::io::Cursor;
use object_store::services::Memory;
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use puffin::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use super::*;
@@ -229,6 +230,7 @@ mod tests {
PutOptions {
compression: Some(CompressionCodec::Zstd),
},
Default::default(),
)
.await
.unwrap();

View File

@@ -65,7 +65,13 @@ pub trait PuffinWriter {
/// Returns the number of bytes written.
///
/// The specified `dir` should be accessible from the filesystem.
async fn put_dir(&mut self, key: &str, dir: PathBuf, options: PutOptions) -> Result<u64>;
async fn put_dir(
&mut self,
key: &str,
dir: PathBuf,
options: PutOptions,
properties: HashMap<String, String>,
) -> Result<u64>;
/// Sets whether the footer should be LZ4 compressed.
fn set_footer_lz4_compressed(&mut self, lz4_compressed: bool);
@@ -94,15 +100,15 @@ pub trait PuffinReader {
/// Reads a blob from the Puffin file.
///
/// The returned `BlobWithMetadata` is used to access the blob data and its metadata.
/// Users should hold the `BlobWithMetadata` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>>;
/// The returned `GuardWithMetadata` is used to access the blob data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the blob data.
async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>>;
/// Reads a directory from the Puffin file.
///
/// The returned `DirGuard` is used to access the directory in the filesystem.
/// The caller is responsible for holding the `DirGuard` until they are done with the directory.
async fn dir(&self, key: &str) -> Result<Self::Dir>;
/// The returned `GuardWithMetadata` is used to access the directory data and its metadata.
/// Users should hold the `GuardWithMetadata` until they are done with the directory data.
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>>;
}
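
Putting the two trait changes together, a hedged sketch of the new round-trip (the surrounding function, argument names, and the use of the crate's `Result` alias are assumptions; compression is left disabled):

use std::collections::HashMap;
use std::path::PathBuf;

async fn roundtrip_sketch(
    mut writer: impl PuffinWriter,
    reader: impl PuffinReader,
    index_dir: PathBuf,
) -> Result<()> {
    // Write side: key/value properties now ride along with the directory blob.
    let props = HashMap::from([("fulltext_config".to_string(), "{}".to_string())]);
    writer
        .put_dir("fulltext_index", index_dir, PutOptions { compression: None }, props)
        .await?;
    writer.finish().await?;

    // Read side: the returned guard carries the blob metadata, properties included.
    let dir = reader.dir("fulltext_index").await?;
    let _config_json = dir.metadata().properties.get("fulltext_config");
    Ok(())
}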
/// `BlobGuard` is provided by the `PuffinReader` to access the blob data.
@@ -114,32 +120,41 @@ pub trait BlobGuard {
async fn reader(&self) -> Result<Self::Reader>;
}
/// `BlobWithMetadata` provides access to the blob data and its metadata.
pub struct BlobWithMetadata<B> {
blob: B,
metadata: BlobMetadata,
}
impl<B: BlobGuard> BlobWithMetadata<B> {
/// Creates a new `BlobWithMetadata` instance.
pub fn new(blob: B, metadata: BlobMetadata) -> Self {
Self { blob, metadata }
}
/// Returns the reader for the blob data.
pub async fn reader(&self) -> Result<B::Reader> {
self.blob.reader().await
}
/// Returns the metadata of the blob.
pub fn metadata(&self) -> &BlobMetadata {
&self.metadata
}
}
/// `DirGuard` is provided by the `PuffinReader` to access the directory in the filesystem.
/// Users should hold the `DirGuard` until they are done with the directory.
#[auto_impl::auto_impl(Arc)]
pub trait DirGuard {
fn path(&self) -> &PathBuf;
}
/// `GuardWithMetadata` provides access to the blob or directory data and its metadata.
pub struct GuardWithMetadata<G> {
guard: G,
metadata: BlobMetadata,
}
impl<G> GuardWithMetadata<G> {
/// Creates a new `GuardWithMetadata` instance.
pub fn new(guard: G, metadata: BlobMetadata) -> Self {
Self { guard, metadata }
}
/// Returns the metadata of the underlying blob or directory.
pub fn metadata(&self) -> &BlobMetadata {
&self.metadata
}
}
impl<G: BlobGuard> GuardWithMetadata<G> {
/// Returns the reader for the blob data.
pub async fn reader(&self) -> Result<G::Reader> {
self.guard.reader().await
}
}
impl<G: DirGuard> GuardWithMetadata<G> {
/// Returns the path of the directory.
pub fn path(&self) -> &PathBuf {
self.guard.path()
}
}
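
Since `metadata()` is available for any guard while `reader()` and `path()` are gated by the trait bounds, downstream code can stay generic over blob and dir guards. A minimal sketch (helper names are illustrative):

use std::path::PathBuf;

/// Works for blob and dir guards alike: properties live in the blob metadata.
fn property_of<G>(guard: &GuardWithMetadata<G>, key: &str) -> Option<String> {
    guard.metadata().properties.get(key).cloned()
}

/// Compiles only for directory guards.
fn staged_path<G: DirGuard>(guard: &GuardWithMetadata<G>) -> &PathBuf {
    guard.path()
}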

View File

@@ -36,7 +36,7 @@ use crate::puffin_manager::file_accessor::PuffinFileAccessor;
use crate::puffin_manager::fs_puffin_manager::dir_meta::DirMetadata;
use crate::puffin_manager::fs_puffin_manager::PuffinMetadataCacheRef;
use crate::puffin_manager::stager::{BoxWriter, DirWriterProviderRef, Stager};
use crate::puffin_manager::{BlobGuard, BlobWithMetadata, PuffinReader};
use crate::puffin_manager::{BlobGuard, GuardWithMetadata, PuffinReader};
/// `FsPuffinReader` is a `PuffinReader` that provides fs readers for puffin files.
pub struct FsPuffinReader<S, F>
@@ -96,26 +96,13 @@ where
}
async fn metadata(&self) -> Result<Arc<FileMetadata>> {
let reader = self.puffin_file_accessor.reader(&self.handle).await?;
let mut file = PuffinFileReader::new(reader);
let mut file = self.puffin_reader().await?;
self.get_puffin_file_metadata(&mut file).await
}
async fn blob(&self, key: &str) -> Result<BlobWithMetadata<Self::Blob>> {
let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);
let metadata = self.get_puffin_file_metadata(&mut file).await?;
let blob_metadata = metadata
.blobs
.iter()
.find(|m| m.blob_type == key)
.context(BlobNotFoundSnafu { blob: key })?
.clone();
async fn blob(&self, key: &str) -> Result<GuardWithMetadata<Self::Blob>> {
let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let blob = if blob_metadata.compression_codec.is_none() {
// If the blob is not compressed, we can directly read it from the puffin file.
Either::L(RandomReadBlob {
@@ -140,28 +127,33 @@ where
Either::R(staged_blob)
};
Ok(BlobWithMetadata::new(blob, blob_metadata))
Ok(GuardWithMetadata::new(blob, blob_metadata))
}
async fn dir(&self, key: &str) -> Result<Self::Dir> {
self.stager
async fn dir(&self, key: &str) -> Result<GuardWithMetadata<Self::Dir>> {
let mut file = self.puffin_reader().await?;
let blob_metadata = self.get_blob_metadata(key, &mut file).await?;
let dir = self
.stager
.get_dir(
&self.handle,
key,
Box::new(|writer_provider| {
let accessor = self.puffin_file_accessor.clone();
let handle = self.handle.clone();
let key = key.to_string();
let blob_metadata = blob_metadata.clone();
Box::pin(Self::init_dir_to_stager(
file,
blob_metadata,
handle,
key,
writer_provider,
accessor,
self.file_size_hint,
))
}),
)
.await
.await?;
Ok(GuardWithMetadata::new(dir, blob_metadata))
}
}
@@ -188,6 +180,30 @@ where
Ok(metadata)
}
async fn get_blob_metadata(
&self,
key: &str,
file: &mut PuffinFileReader<F::Reader>,
) -> Result<BlobMetadata> {
let metadata = self.get_puffin_file_metadata(file).await?;
let blob_metadata = metadata
.blobs
.iter()
.find(|m| m.blob_type == key)
.context(BlobNotFoundSnafu { blob: key })?
.clone();
Ok(blob_metadata)
}
async fn puffin_reader(&self) -> Result<PuffinFileReader<F::Reader>> {
let mut reader = self.puffin_file_accessor.reader(&self.handle).await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
Ok(PuffinFileReader::new(reader))
}
async fn init_blob_to_stager(
reader: PuffinFileReader<F::Reader>,
blob_metadata: BlobMetadata,
@@ -201,26 +217,14 @@ where
}
async fn init_dir_to_stager(
mut file: PuffinFileReader<F::Reader>,
blob_metadata: BlobMetadata,
handle: F::FileHandle,
key: String,
writer_provider: DirWriterProviderRef,
accessor: F,
file_size_hint: Option<u64>,
) -> Result<u64> {
let mut reader = accessor.reader(&handle).await?;
if let Some(file_size_hint) = file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);
let puffin_metadata = file.metadata().await?;
let blob_metadata = puffin_metadata
.blobs
.iter()
.find(|m| m.blob_type == key.as_str())
.context(BlobNotFoundSnafu { blob: key })?;
let reader = file.blob_reader(blob_metadata)?;
let reader = file.blob_reader(&blob_metadata)?;
let meta = reader.metadata().await.context(MetadataSnafu)?;
let buf = reader
.read(0..meta.content_length)

View File

@@ -88,7 +88,13 @@ where
Ok(written_bytes)
}
async fn put_dir(&mut self, key: &str, dir_path: PathBuf, options: PutOptions) -> Result<u64> {
async fn put_dir(
&mut self,
key: &str,
dir_path: PathBuf,
options: PutOptions,
properties: HashMap<String, String>,
) -> Result<u64> {
ensure!(
!self.blob_keys.contains(key),
DuplicateBlobSnafu { blob: key }
@@ -150,7 +156,7 @@ where
blob_type: key.to_string(),
compressed_data: encoded.as_slice(),
compression_codec: None,
properties: Default::default(),
properties,
};
written_bytes += self.puffin_file_writer.add_blob(dir_meta_blob).await?;

View File

@@ -23,7 +23,7 @@ use crate::blob_metadata::CompressionCodec;
use crate::puffin_manager::file_accessor::MockFileAccessor;
use crate::puffin_manager::fs_puffin_manager::FsPuffinManager;
use crate::puffin_manager::stager::BoundedStager;
use crate::puffin_manager::{DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions};
use crate::puffin_manager::{PuffinManager, PuffinReader, PuffinWriter, PutOptions};
async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc<BoundedStager<String>>) {
let staging_dir = create_temp_dir(prefix);
@@ -343,6 +343,7 @@ async fn put_dir(
PutOptions {
compression: compression_codec,
},
HashMap::from_iter([("test_key".to_string(), "test_value".to_string())]),
)
.await
.unwrap();
@@ -356,6 +357,11 @@ async fn check_dir(
puffin_reader: &impl PuffinReader,
) {
let res_dir = puffin_reader.dir(key).await.unwrap();
let metadata = res_dir.metadata();
assert_eq!(
metadata.properties,
HashMap::from_iter([("test_key".to_string(), "test_value".to_string())])
);
for (file_name, raw_data) in files_in_dir {
let file_path = if cfg!(windows) {
res_dir.path().join(file_name.replace('/', "\\"))