feat(fulltext_index): integrate full-text indexer with parquet reader (#4348)

* feat(fulltext_index): integrate full-text indexer with parquet reader

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* disable reload

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* address comments

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: range allow exceeding total row

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: unit tests in index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: prune row groups

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: rename creator

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: sst fulltext index

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-15 16:14:44 +08:00
committed by GitHub
parent 64cad4e891
commit 04ac0c8da0
30 changed files with 1778 additions and 190 deletions

2
Cargo.lock generated
View File

@@ -6274,6 +6274,7 @@ dependencies = [
"common-datasource",
"common-decimal",
"common-error",
"common-function",
"common-macro",
"common-procedure-test",
"common-query",
@@ -6315,6 +6316,7 @@ dependencies = [
"serde",
"serde_json",
"serde_with",
"session",
"smallvec",
"snafu 0.8.3",
"store-api",

View File

@@ -16,6 +16,10 @@ use serde::{Deserialize, Serialize};
pub mod create;
pub mod error;
pub mod search;
#[cfg(test)]
mod tests;
/// Configuration for fulltext index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]

View File

@@ -15,7 +15,7 @@
mod tantivy;
use async_trait::async_trait;
pub use tantivy::TantivyFulltextIndexCreator;
pub use tantivy::{TantivyFulltextIndexCreator, TEXT_FIELD_NAME};
use crate::fulltext_index::error::Result;

View File

@@ -40,6 +40,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Tantivy parser error"))]
TantivyParser {
#[snafu(source)]
error: tantivy::query::QueryParserError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Operate on a finished creator"))]
Finished {
#[snafu(implicit)]
@@ -60,6 +68,8 @@ impl ErrorExt for Error {
match self {
Tantivy { .. } => StatusCode::Internal,
TantivyParser { .. } => StatusCode::InvalidSyntax,
Io { .. } | Finished { .. } => StatusCode::Unexpected,
External { source, .. } => source.status_code(),

View File

@@ -0,0 +1,30 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod tantivy;
use std::collections::BTreeSet;
use async_trait::async_trait;
use crate::fulltext_index::error::Result;
pub use crate::fulltext_index::search::tantivy::TantivyFulltextIndexSearcher;
pub type RowId = u32;
/// `FulltextIndexSearcher` is a trait for searching fulltext index.
#[async_trait]
pub trait FulltextIndexSearcher {
async fn search(&self, query: &str) -> Result<BTreeSet<RowId>>;
}

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::path::Path;
use std::time::Instant;
use async_trait::async_trait;
use common_telemetry::debug;
use snafu::ResultExt;
use tantivy::collector::DocSetCollector;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::{Index, IndexReader, ReloadPolicy};
use crate::fulltext_index::create::TEXT_FIELD_NAME;
use crate::fulltext_index::error::{Result, TantivyParserSnafu, TantivySnafu};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId};
/// `TantivyFulltextIndexSearcher` is a searcher using Tantivy.
pub struct TantivyFulltextIndexSearcher {
/// Tanitvy index.
index: Index,
/// Tanitvy index reader.
reader: IndexReader,
/// The default field used to build `QueryParser`
default_field: Field,
}
impl TantivyFulltextIndexSearcher {
/// Creates a new `TantivyFulltextIndexSearcher`.
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let now = Instant::now();
let index = Index::open_in_dir(path.as_ref()).context(TantivySnafu)?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)
.num_warming_threads(0)
.try_into()
.context(TantivySnafu)?;
let default_field = index
.schema()
.get_field(TEXT_FIELD_NAME)
.context(TantivySnafu)?;
debug!(
"Opened tantivy index on {:?} in {:?}",
path.as_ref(),
now.elapsed()
);
Ok(Self {
index,
reader,
default_field,
})
}
}
#[async_trait]
impl FulltextIndexSearcher for TantivyFulltextIndexSearcher {
async fn search(&self, query: &str) -> Result<BTreeSet<RowId>> {
let searcher = self.reader.searcher();
let query_parser = QueryParser::for_index(&self.index, vec![self.default_field]);
let query = query_parser
.parse_query(query)
.context(TantivyParserSnafu)?;
let docs = searcher
.search(&query, &DocSetCollector)
.context(TantivySnafu)?;
Ok(docs.into_iter().map(|d| d.doc_id).collect())
}
}

View File

@@ -0,0 +1,167 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use crate::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use crate::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use crate::fulltext_index::{Analyzer, Config};
async fn create_index(prefix: &str, texts: Vec<&str>, config: Config) -> TempDir {
let tempdir = create_temp_dir(prefix);
let mut creator = TantivyFulltextIndexCreator::new(tempdir.path(), config, 1024 * 1024)
.await
.unwrap();
for text in texts {
creator.push_text(text).await.unwrap();
}
creator.finish().await.unwrap();
tempdir
}
async fn test_search(
prefix: &str,
config: Config,
texts: Vec<&str>,
query: &str,
expected: impl IntoIterator<Item = RowId>,
) {
let index_path = create_index(prefix, texts, config).await;
let searcher = TantivyFulltextIndexSearcher::new(index_path.path()).unwrap();
let results = searcher.search(query).await.unwrap();
let expected = expected.into_iter().collect::<BTreeSet<_>>();
assert_eq!(results, expected);
}
#[tokio::test]
async fn test_simple_term() {
test_search(
"test_simple_term_",
Config::default(),
vec![
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"Barack Obama",
[0, 1],
)
.await;
}
#[tokio::test]
async fn test_negative_term() {
test_search(
"test_negative_term_",
Config::default(),
vec!["apple is a fruit", "I like apple", "fruit is healthy"],
"apple -fruit",
[1],
)
.await;
}
#[tokio::test]
async fn test_must_term() {
test_search(
"test_must_term_",
Config::default(),
vec![
"apple is tasty",
"I love apples and fruits",
"apple and fruit are good",
],
"+apple +fruit",
[2],
)
.await;
}
#[tokio::test]
async fn test_boolean_operators() {
test_search(
"test_boolean_operators_",
Config::default(),
vec!["a b c", "a b", "b c", "c"],
"a AND b OR c",
[0, 1, 2, 3],
)
.await;
}
#[tokio::test]
async fn test_phrase_term() {
test_search(
"test_phrase_term_",
Config::default(),
vec![
"This is a sample text containing Barack Obama",
"Another document mentioning Barack",
],
"\"Barack Obama\"",
[0],
)
.await;
}
#[tokio::test]
async fn test_config_english_analyzer_case_insensitive() {
test_search(
"test_config_english_analyzer_case_insensitive_",
Config {
case_sensitive: false,
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[0],
)
.await;
}
#[tokio::test]
async fn test_config_english_analyzer_case_sensitive() {
test_search(
"test_config_english_analyzer_case_sensitive_",
Config {
case_sensitive: true,
..Config::default()
},
vec!["Banana is a fruit", "I like apple", "Fruit is healthy"],
"banana",
[],
)
.await;
}
#[tokio::test]
async fn test_config_chinese_analyzer() {
test_search(
"test_config_chinese_analyzer_",
Config {
analyzer: Analyzer::Chinese,
..Default::default()
},
vec!["苹果是一种水果", "我喜欢苹果", "水果很健康"],
"苹果",
[0, 1],
)
.await;
}

View File

@@ -73,6 +73,7 @@ tokio-util.workspace = true
uuid.workspace = true
[dev-dependencies]
common-function.workspace = true
common-procedure-test.workspace = true
common-test-util.workspace = true
criterion = "0.4"
@@ -82,6 +83,7 @@ object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true
rstest_reuse.workspace = true
session.workspace = true
toml.workspace = true
[[bench]]

View File

@@ -431,6 +431,7 @@ impl EngineInner {
)
.with_parallelism(scan_parallelism)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_start_time(query_start);
Ok(scan_region)

View File

@@ -556,8 +556,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(display("Failed to apply inverted index"))]
ApplyInvertedIndex {
source: index::inverted_index::error::Error,
#[snafu(implicit)]
location: Location,
@@ -821,6 +821,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to apply fulltext index"))]
ApplyFulltextIndex {
source: index::fulltext_index::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -924,7 +931,7 @@ impl ErrorExt for Error {
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. }
| PushIndexValue { source, .. }
| ApplyIndex { source, .. }
| ApplyInvertedIndex { source, .. }
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
@@ -948,7 +955,9 @@ impl ErrorExt for Error {
FulltextOptions { source, .. } => source.status_code(),
CreateFulltextCreator { source, .. } => source.status_code(),
CastVector { source, .. } => source.status_code(),
FulltextPushText { source, .. } | FulltextFinish { source, .. } => source.status_code(),
FulltextPushText { source, .. }
| FulltextFinish { source, .. }
| ApplyFulltextIndex { source, .. } => source.status_code(),
}
}

View File

@@ -194,9 +194,10 @@ lazy_static! {
// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_ELAPSED: Histogram = register_histogram!(
pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_apply_elapsed",
"index apply elapsed",
&[TYPE_LABEL],
)
.unwrap();
/// Gauge of index apply memory usage.

View File

@@ -45,8 +45,10 @@ use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::{overlaps, FileHandle, FileMeta};
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::SstIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
/// A scanner scans a region and returns a [SendableRecordBatchStream].
@@ -165,6 +167,8 @@ pub(crate) struct ScanRegion {
parallelism: ScanParallism,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Whether to ignore fulltext index.
ignore_fulltext_index: bool,
/// Start time of the scan task.
start_time: Option<Instant>,
}
@@ -184,6 +188,7 @@ impl ScanRegion {
cache_manager,
parallelism: ScanParallism::default(),
ignore_inverted_index: false,
ignore_fulltext_index: false,
start_time: None,
}
}
@@ -195,12 +200,20 @@ impl ScanRegion {
self
}
/// Sets whether to ignore inverted index.
#[must_use]
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
self.ignore_inverted_index = ignore;
self
}
/// Sets whether to ignore fulltext index.
#[must_use]
pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
self.ignore_fulltext_index = ignore;
self
}
#[must_use]
pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
self.start_time = Some(now);
@@ -278,7 +291,8 @@ impl ScanRegion {
self.version.options.append_mode,
);
let index_applier = self.build_index_applier();
let inverted_index_applier = self.build_invereted_index_applier();
let fulltext_index_applier = self.build_fulltext_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
@@ -292,7 +306,8 @@ impl ScanRegion {
.with_memtables(memtables)
.with_files(files)
.with_cache(self.cache_manager)
.with_index_applier(index_applier)
.with_inverted_index_applier(inverted_index_applier)
.with_fulltext_index_applier(fulltext_index_applier)
.with_parallelism(self.parallelism)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
@@ -318,8 +333,8 @@ impl ScanRegion {
)
}
/// Use the latest schema to build the index applier.
fn build_index_applier(&self) -> Option<SstIndexApplierRef> {
/// Use the latest schema to build the inveretd index applier.
fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
if self.ignore_inverted_index {
return None;
}
@@ -337,7 +352,7 @@ impl ScanRegion {
.and_then(|c| c.index_cache())
.cloned();
SstIndexApplierBuilder::new(
InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
file_cache,
@@ -354,7 +369,26 @@ impl ScanRegion {
self.access_layer.puffin_manager_factory().clone(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build index applier"))
.inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
.ok()
.flatten()
.map(Arc::new)
}
/// Use the latest schema to build the fulltext index applier.
fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
if self.ignore_fulltext_index {
return None;
}
FulltextIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
self.access_layer.object_store().clone(),
self.access_layer.puffin_manager_factory().clone(),
self.version.metadata.as_ref(),
)
.build(&self.request.filters)
.inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
.ok()
.flatten()
.map(Arc::new)
@@ -401,8 +435,9 @@ pub(crate) struct ScanInput {
ignore_file_not_found: bool,
/// Parallelism to scan data.
pub(crate) parallelism: ScanParallism,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
/// Start time of the query.
pub(crate) query_start: Option<Instant>,
/// The region is using append mode.
@@ -429,7 +464,8 @@ impl ScanInput {
cache_manager: None,
ignore_file_not_found: false,
parallelism: ScanParallism::default(),
index_applier: None,
inverted_index_applier: None,
fulltext_index_applier: None,
query_start: None,
append_mode: false,
filter_deleted: true,
@@ -487,10 +523,23 @@ impl ScanInput {
self
}
/// Sets index applier.
/// Sets invereted index applier.
#[must_use]
pub(crate) fn with_index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_applier = index_applier;
pub(crate) fn with_inverted_index_applier(
mut self,
applier: Option<InvertedIndexApplierRef>,
) -> Self {
self.inverted_index_applier = applier;
self
}
/// Sets fulltext index applier.
#[must_use]
pub(crate) fn with_fulltext_index_applier(
mut self,
applier: Option<FulltextIndexApplierRef>,
) -> Self {
self.fulltext_index_applier = applier;
self
}
@@ -566,7 +615,8 @@ impl ScanInput {
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.await;

View File

@@ -34,9 +34,9 @@ use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod applier;
pub(crate) mod creator;
const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1";

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeSet;
use std::sync::Arc;
use index::fulltext_index::search::{FulltextIndexSearcher, RowId, TantivyFulltextIndexSearcher};
use object_store::ObjectStore;
use puffin::puffin_manager::{DirGuard, PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::ColumnId;
use crate::error::{ApplyFulltextIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinDir};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
use crate::sst::location;
pub mod builder;
/// `FulltextIndexApplier` is responsible for applying fulltext index to the provided SST files
pub struct FulltextIndexApplier {
/// The root directory of the region.
region_dir: String,
/// Queries to apply to the index.
queries: Vec<(ColumnId, String)>,
/// The puffin manager factory.
puffin_manager_factory: PuffinManagerFactory,
/// Store responsible for accessing index files.
store: ObjectStore,
}
pub type FulltextIndexApplierRef = Arc<FulltextIndexApplier>;
impl FulltextIndexApplier {
/// Creates a new `FulltextIndexApplier`.
pub fn new(
region_dir: String,
store: ObjectStore,
queries: Vec<(ColumnId, String)>,
puffin_manager_factory: PuffinManagerFactory,
) -> Self {
Self {
region_dir,
store,
queries,
puffin_manager_factory,
}
}
/// Applies the queries to the fulltext index of the specified SST file.
pub async fn apply(&self, file_id: FileId) -> Result<BTreeSet<RowId>> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_FULLTEXT_INDEX])
.start_timer();
let mut inited = false;
let mut row_ids = BTreeSet::new();
for (column_id, query) in &self.queries {
let dir = self.index_dir_path(file_id, *column_id).await?;
let path = match &dir {
Some(dir) => dir.path(),
None => {
// Return empty set if the index not found.
return Ok(BTreeSet::new());
}
};
let searcher =
TantivyFulltextIndexSearcher::new(path).context(ApplyFulltextIndexSnafu)?;
let result = searcher
.search(query)
.await
.context(ApplyFulltextIndexSnafu)?;
if !inited {
row_ids = result;
inited = true;
continue;
}
row_ids.retain(|id| result.contains(id));
if row_ids.is_empty() {
break;
}
}
Ok(row_ids)
}
/// Returns `None` if the index not found.
async fn index_dir_path(
&self,
file_id: FileId,
column_id: ColumnId,
) -> Result<Option<SstPuffinDir>> {
let puffin_manager = self.puffin_manager_factory.build(self.store.clone());
let file_path = location::index_file_path(&self.region_dir, file_id);
match puffin_manager
.reader(&file_path)
.await
.context(PuffinBuildReaderSnafu)?
.dir(&format!("{INDEX_BLOB_TYPE}-{column_id}"))
.await
{
Ok(dir) => Ok(Some(dir)),
Err(puffin::error::Error::BlobNotFound { .. }) => Ok(None),
Err(err) => Err(err).context(PuffinReadBlobSnafu),
}
}
}

View File

@@ -0,0 +1,234 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion_common::ScalarValue;
use datafusion_expr::Expr;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadata;
use store_api::storage::{ColumnId, ConcreteDataType};
use crate::error::Result;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// `FulltextIndexApplierBuilder` is a builder for `FulltextIndexApplier`.
pub struct FulltextIndexApplierBuilder<'a> {
region_dir: String,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
}
impl<'a> FulltextIndexApplierBuilder<'a> {
/// Creates a new `FulltextIndexApplierBuilder`.
pub fn new(
region_dir: String,
store: ObjectStore,
puffin_manager_factory: PuffinManagerFactory,
metadata: &'a RegionMetadata,
) -> Self {
Self {
region_dir,
store,
puffin_manager_factory,
metadata,
}
}
/// Builds `SstIndexApplier` from the given expressions.
pub fn build(self, exprs: &[Expr]) -> Result<Option<FulltextIndexApplier>> {
let mut queries = Vec::with_capacity(exprs.len());
for expr in exprs {
if let Some((column_id, query)) = Self::expr_to_query(self.metadata, expr) {
queries.push((column_id, query));
}
}
Ok((!queries.is_empty()).then(|| {
FulltextIndexApplier::new(
self.region_dir,
self.store,
queries,
self.puffin_manager_factory,
)
}))
}
fn expr_to_query(metadata: &RegionMetadata, expr: &Expr) -> Option<(ColumnId, String)> {
let Expr::ScalarFunction(f) = expr else {
return None;
};
if f.name() != "matches" {
return None;
}
if f.args.len() != 2 {
return None;
}
let Expr::Column(c) = &f.args[0] else {
return None;
};
let column = metadata.column_by_name(&c.name)?;
if column.column_schema.data_type != ConcreteDataType::string_datatype() {
return None;
}
let Expr::Literal(ScalarValue::Utf8(Some(query))) = &f.args[1] else {
return None;
};
Some((column.column_id, query.to_string()))
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::scalars::udf::create_udf;
use datafusion_common::Column;
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::ScalarUDF;
use datatypes::schema::ColumnSchema;
use session::context::QueryContext;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
fn mock_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("text", ConcreteDataType::string_datatype(), true),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
});
builder.build().unwrap()
}
fn matches_func() -> Arc<ScalarUDF> {
Arc::new(
create_udf(
FUNCTION_REGISTRY.get_function("matches").unwrap(),
QueryContext::arc(),
Default::default(),
)
.into(),
)
}
#[test]
fn test_expr_to_query_basic() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
let (column_id, query) =
FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).unwrap();
assert_eq!(column_id, 1);
assert_eq!(query, "foo".to_string());
}
#[test]
fn test_expr_to_query_wrong_num_args() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
args: vec![Expr::Column(Column {
name: "text".to_string(),
relation: None,
})],
func: matches_func(),
});
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
}
#[test]
fn test_expr_to_query_not_found_column() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "not_found".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
}
#[test]
fn test_expr_to_query_column_wrong_data_type() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "ts".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Utf8(Some("foo".to_string()))),
],
func: matches_func(),
});
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
}
#[test]
fn test_expr_to_query_pattern_not_string() {
let metadata = mock_metadata();
let expr = Expr::ScalarFunction(ScalarFunction {
args: vec![
Expr::Column(Column {
name: "text".to_string(),
relation: None,
}),
Expr::Literal(ScalarValue::Int64(Some(42))),
],
func: matches_func(),
});
assert!(FulltextIndexApplierBuilder::expr_to_query(&metadata, &expr).is_none());
}
}

View File

@@ -38,8 +38,8 @@ use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::TYPE_FULLTEXT_INDEX;
/// `SstIndexCreator` is responsible for creating fulltext indexes for SST files.
pub struct SstIndexCreator {
/// `FulltextIndexer` is responsible for creating fulltext indexes for SST files.
pub struct FulltextIndexer {
/// Creators for each column.
creators: HashMap<ColumnId, SingleCreator>,
/// Whether the index creation was aborted.
@@ -48,8 +48,8 @@ pub struct SstIndexCreator {
stats: Statistics,
}
impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
impl FulltextIndexer {
/// Creates a new `FulltextIndexer`.
pub async fn new(
region_id: &RegionId,
sst_file_id: &FileId,
@@ -173,7 +173,7 @@ impl SstIndexCreator {
}
}
impl SstIndexCreator {
impl FulltextIndexer {
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
let mut guard = self.stats.record_update();
guard.inc_row_count(batch.num_rows());
@@ -293,5 +293,267 @@ impl SingleCreator {
#[cfg(test)]
mod tests {
// TODO(zhongzc): After search is implemented, add tests for full-text indexer.
use std::collections::BTreeSet;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::data_type::DataType;
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextOptions};
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use futures::future::BoxFuture;
use futures::FutureExt;
use index::fulltext_index::search::RowId;
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::PuffinManager;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::storage::{ConcreteDataType, RegionId};
use super::*;
use crate::read::{Batch, BatchColumn};
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplier;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
fn mock_object_store() -> ObjectStore {
ObjectStore::new(Memory::default()).unwrap().finish()
}
async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
IntermediateManager::init_fs(path).await.unwrap()
}
fn mock_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"text_english_case_sensitive",
ConcreteDataType::string_datatype(),
true,
)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: true,
})
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"text_english_case_insensitive",
ConcreteDataType::string_datatype(),
true,
)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
})
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"text_chinese",
ConcreteDataType::string_datatype(),
true,
)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::Chinese,
case_sensitive: false,
})
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 4,
});
Arc::new(builder.build().unwrap())
}
fn new_batch(
rows: &[(
Option<&str>, // text_english_case_sensitive
Option<&str>, // text_english_case_insensitive
Option<&str>, // text_chinese
)],
) -> Batch {
let mut vec_english_sensitive =
ConcreteDataType::string_datatype().create_mutable_vector(0);
let mut vec_english_insensitive =
ConcreteDataType::string_datatype().create_mutable_vector(0);
let mut vec_chinese = ConcreteDataType::string_datatype().create_mutable_vector(0);
for (text_english_case_sensitive, text_english_case_insensitive, text_chinese) in rows {
match text_english_case_sensitive {
Some(s) => vec_english_sensitive.push_value_ref((*s).into()),
None => vec_english_sensitive.push_null(),
}
match text_english_case_insensitive {
Some(s) => vec_english_insensitive.push_value_ref((*s).into()),
None => vec_english_insensitive.push_null(),
}
match text_chinese {
Some(s) => vec_chinese.push_value_ref((*s).into()),
None => vec_chinese.push_null(),
}
}
let num_rows = vec_english_sensitive.len();
Batch::new(
vec![],
Arc::new(UInt64Vector::from_iter_values(
(0..num_rows).map(|n| n as u64),
)),
Arc::new(UInt64Vector::from_iter_values(
std::iter::repeat(0).take(num_rows),
)),
Arc::new(UInt8Vector::from_iter_values(
std::iter::repeat(1).take(num_rows),
)),
vec![
BatchColumn {
column_id: 1,
data: vec_english_sensitive.to_vector(),
},
BatchColumn {
column_id: 2,
data: vec_english_insensitive.to_vector(),
},
BatchColumn {
column_id: 3,
data: vec_chinese.to_vector(),
},
],
)
.unwrap()
}
async fn build_applier_factory(
prefix: &str,
rows: &[(
Option<&str>, // text_english_case_sensitive
Option<&str>, // text_english_case_insensitive
Option<&str>, // text_chinese
)],
) -> impl Fn(Vec<(ColumnId, &str)>) -> BoxFuture<'static, BTreeSet<RowId>> {
let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
let region_dir = "region0".to_string();
let sst_file_id = FileId::random();
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
let mut indexer = FulltextIndexer::new(
&region_metadata.region_id,
&sst_file_id,
&intm_mgr,
&region_metadata,
true,
1024,
)
.await
.unwrap();
let batch = new_batch(rows);
indexer.update(&batch).await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let _ = indexer.finish(&mut writer).await.unwrap();
writer.finish().await.unwrap();
move |queries| {
let _d = &d;
let applier = FulltextIndexApplier::new(
region_dir.clone(),
object_store.clone(),
queries
.into_iter()
.map(|(a, b)| (a, b.to_string()))
.collect(),
factory.clone(),
);
async move { applier.apply(sst_file_id).await.unwrap() }.boxed()
}
}
#[tokio::test]
async fn test_fulltext_index_basic() {
let applier_factory = build_applier_factory(
"test_fulltext_index_basic_",
&[
(Some("hello"), None, Some("你好")),
(Some("world"), Some("world"), None),
(None, Some("World"), Some("世界")),
(
Some("Hello, World"),
Some("Hello, World"),
Some("你好,世界"),
),
],
)
.await;
let row_ids = applier_factory(vec![(1, "hello")]).await;
assert_eq!(row_ids, vec![0].into_iter().collect());
let row_ids = applier_factory(vec![(1, "world")]).await;
assert_eq!(row_ids, vec![1].into_iter().collect());
let row_ids = applier_factory(vec![(2, "hello")]).await;
assert_eq!(row_ids, vec![3].into_iter().collect());
let row_ids = applier_factory(vec![(2, "world")]).await;
assert_eq!(row_ids, vec![1, 2, 3].into_iter().collect());
let row_ids = applier_factory(vec![(3, "你好")]).await;
assert_eq!(row_ids, vec![0, 3].into_iter().collect());
let row_ids = applier_factory(vec![(3, "世界")]).await;
assert_eq!(row_ids, vec![2, 3].into_iter().collect());
}
#[tokio::test]
async fn test_fulltext_index_multi_columns() {
let applier_factory = build_applier_factory(
"test_fulltext_index_multi_columns_",
&[
(Some("hello"), None, Some("你好")),
(Some("world"), Some("world"), None),
(None, Some("World"), Some("世界")),
(
Some("Hello, World"),
Some("Hello, World"),
Some("你好,世界"),
),
],
)
.await;
let row_ids = applier_factory(vec![(1, "hello"), (3, "你好")]).await;
assert_eq!(row_ids, vec![0].into_iter().collect());
let row_ids = applier_factory(vec![(1, "world"), (3, "世界")]).await;
assert_eq!(row_ids, vec![].into_iter().collect());
let row_ids = applier_factory(vec![(2, "world"), (3, "世界")]).await;
assert_eq!(row_ids, vec![2, 3].into_iter().collect());
}
}

View File

@@ -15,8 +15,8 @@
use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput};

View File

@@ -28,16 +28,17 @@ use store_api::storage::RegionId;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::{CachedInvertedIndexBlobReader, InvertedIndexCacheRef};
use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::error::{ApplyInvertedIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_INVERTED_INDEX;
use crate::sst::location;
/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files
/// `InvertedIndexApplier` is responsible for applying predicates to the provided SST files
/// and returning the relevant row group ids for further scan.
pub(crate) struct SstIndexApplier {
pub(crate) struct InvertedIndexApplier {
/// The root directory of the region.
region_dir: String,
@@ -61,10 +62,10 @@ pub(crate) struct SstIndexApplier {
inverted_index_cache: Option<InvertedIndexCacheRef>,
}
pub(crate) type SstIndexApplierRef = Arc<SstIndexApplier>;
pub(crate) type InvertedIndexApplierRef = Arc<InvertedIndexApplier>;
impl SstIndexApplier {
/// Creates a new [`SstIndexApplier`].
impl InvertedIndexApplier {
/// Creates a new `InvertedIndexApplier`.
pub fn new(
region_dir: String,
region_id: RegionId,
@@ -89,7 +90,9 @@ impl SstIndexApplier {
/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED.start_timer();
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
let context = SearchContext {
// Encountering a non-existing column indicates that it doesn't match predicates.
@@ -115,13 +118,13 @@ impl SstIndexApplier {
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
.context(ApplyInvertedIndexSnafu)
} else {
let mut index_reader = InvertedIndexBlobReader::new(blob);
self.index_applier
.apply(context, &mut index_reader)
.await
.context(ApplyIndexSnafu)
.context(ApplyInvertedIndexSnafu)
}
}
@@ -169,7 +172,7 @@ impl SstIndexApplier {
}
}
impl Drop for SstIndexApplier {
impl Drop for InvertedIndexApplier {
fn drop(&mut self) {
INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64);
}
@@ -212,7 +215,7 @@ mod tests {
})
});
let sst_index_applier = SstIndexApplier::new(
let sst_index_applier = InvertedIndexApplier::new(
region_dir.clone(),
RegionId::new(0, 0),
object_store,
@@ -254,7 +257,7 @@ mod tests {
mock_index_applier.expect_memory_usage().returning(|| 100);
mock_index_applier.expect_apply().never();
let sst_index_applier = SstIndexApplier::new(
let sst_index_applier = InvertedIndexApplier::new(
region_dir.clone(),
RegionId::new(0, 0),
object_store,

View File

@@ -37,19 +37,19 @@ use crate::cache::file_cache::FileCacheRef;
use crate::cache::index::InvertedIndexCacheRef;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::inverted_index::applier::SstIndexApplier;
use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
use crate::sst::index::inverted_index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.
pub(crate) struct SstIndexApplierBuilder<'a> {
/// Directory of the region, required argument for constructing [`SstIndexApplier`].
/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
pub(crate) struct InvertedIndexApplierBuilder<'a> {
/// Directory of the region, required argument for constructing [`InvertedIndexApplier`].
region_dir: String,
/// Object store, required argument for constructing [`SstIndexApplier`].
/// Object store, required argument for constructing [`InvertedIndexApplier`].
object_store: ObjectStore,
/// File cache, required argument for constructing [`SstIndexApplier`].
/// File cache, required argument for constructing [`InvertedIndexApplier`].
file_cache: Option<FileCacheRef>,
/// Metadata of the region, used to get metadata like column type.
@@ -68,8 +68,8 @@ pub(crate) struct SstIndexApplierBuilder<'a> {
index_cache: Option<InvertedIndexCacheRef>,
}
impl<'a> SstIndexApplierBuilder<'a> {
/// Creates a new [`SstIndexApplierBuilder`].
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Creates a new [`InvertedIndexApplierBuilder`].
pub fn new(
region_dir: String,
object_store: ObjectStore,
@@ -91,9 +91,9 @@ impl<'a> SstIndexApplierBuilder<'a> {
}
}
/// Consumes the builder to construct an [`SstIndexApplier`], optionally returned based on
/// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
/// the expressions provided. If no predicates match, returns `None`.
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<SstIndexApplier>> {
pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
for expr in exprs {
self.traverse_and_collect(expr);
}
@@ -109,7 +109,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
.collect();
let applier = PredicatesIndexApplier::try_from(predicates);
Ok(Some(SstIndexApplier::new(
Ok(Some(InvertedIndexApplier::new(
self.region_dir,
self.metadata.region_id,
self.object_store,
@@ -324,7 +324,7 @@ mod tests {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -16,9 +16,9 @@ use datafusion_expr::Between;
use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`.
pub(crate) fn collect_between(&mut self, between: &Between) -> Result<()> {
if between.negated {
@@ -72,7 +72,7 @@ mod tests {
fn test_collect_between_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_between_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -115,7 +115,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_negated_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -141,7 +141,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -167,7 +167,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -194,7 +194,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_between_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -17,9 +17,9 @@ use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePre
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Collects a comparison expression in the form of
/// `column < lit`, `column > lit`, `column <= lit`, `column >= lit`,
/// `lit < column`, `lit > column`, `lit <= column`, `lit >= column`.
@@ -228,7 +228,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -257,7 +257,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -277,7 +277,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -298,7 +298,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_comparison_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -20,9 +20,9 @@ use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Collects an eq expression in the form of `column = lit`.
pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
@@ -134,7 +134,7 @@ mod tests {
fn test_collect_eq_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -172,7 +172,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -193,7 +193,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -213,7 +213,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -233,7 +233,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -292,7 +292,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -330,7 +330,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -18,9 +18,9 @@ use datafusion_expr::expr::InList;
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
pub(crate) fn collect_inlist(&mut self, inlist: &InList) -> Result<()> {
if inlist.negated {
@@ -65,7 +65,7 @@ mod tests {
fn test_collect_in_list_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_in_list_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -98,7 +98,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_negated_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -123,7 +123,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -148,7 +148,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_collect_in_list_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -175,7 +175,7 @@ mod tests {
PuffinManagerFactory::new_for_test_block("test_collect_in_list_nonexistent_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -17,9 +17,9 @@ use datafusion_expr::Expr as DfExpr;
use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate};
use crate::error::Result;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
impl<'a> InvertedIndexApplierBuilder<'a> {
/// Collects a regex match expression in the form of `column ~ pattern`.
pub(crate) fn collect_regex_match(&mut self, column: &DfExpr, pattern: &DfExpr) -> Result<()> {
let Some(column_name) = Self::column_name(column) else {
@@ -59,7 +59,7 @@ mod tests {
fn test_regex_match_basic() {
let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_regex_match_basic_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -88,7 +88,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_field_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -110,7 +110,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_mismatch_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,
@@ -132,7 +132,7 @@ mod tests {
let (_d, facotry) =
PuffinManagerFactory::new_for_test_block("test_regex_match_type_nonexist_column_");
let metadata = test_region_metadata();
let mut builder = SstIndexApplierBuilder::new(
let mut builder = InvertedIndexApplierBuilder::new(
"test".to_string(),
test_object_store(),
None,

View File

@@ -51,8 +51,8 @@ const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
/// Creates SST index.
pub struct SstIndexCreator {
/// `InvertedIndexer` creates inverted index for SST files.
pub struct InvertedIndexer {
/// The index creator.
index_creator: Box<dyn InvertedIndexCreator>,
/// The provider of intermediate files.
@@ -75,8 +75,8 @@ pub struct SstIndexCreator {
column_ids: HashSet<ColumnId>,
}
impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
impl InvertedIndexer {
/// Creates a new `InvertedIndexer`.
/// Should ensure that the number of tag columns is greater than 0.
pub fn new(
sst_file_id: FileId,
@@ -298,7 +298,7 @@ mod tests {
use super::*;
use crate::cache::index::InvertedIndexCache;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
@@ -384,7 +384,7 @@ mod tests {
let memory_threshold = None;
let segment_row_count = 2;
let mut creator = SstIndexCreator::new(
let mut creator = InvertedIndexer::new(
sst_file_id,
&region_metadata,
intm_mgr,
@@ -407,7 +407,7 @@ mod tests {
move |expr| {
let _d = &d;
let cache = Arc::new(InvertedIndexCache::new(10, 10));
let applier = SstIndexApplierBuilder::new(
let applier = InvertedIndexApplierBuilder::new(
region_dir.clone(),
object_store.clone(),
None,

View File

@@ -39,7 +39,9 @@ pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;
pub(crate) type SstPuffinReader = <SstPuffinManager as PuffinManager>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type BlobReader = <<SstPuffinReader as PuffinReader>::Blob as BlobGuard>::Reader;
pub(crate) type SstPuffinBlob = <SstPuffinReader as PuffinReader>::Blob;
pub(crate) type SstPuffinDir = <SstPuffinReader as PuffinReader>::Dir;
pub(crate) type BlobReader = <SstPuffinBlob as BlobGuard>::Reader;
const STAGING_DIR: &str = "staging";

View File

@@ -15,6 +15,7 @@
//! Parquet reader.
use std::collections::{BTreeMap, VecDeque};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -52,12 +53,15 @@ use crate::metrics::{
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::inverted_index::applier::SstIndexApplierRef;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
use crate::sst::parquet::row_selection::row_selection_from_row_ranges;
use crate::sst::parquet::row_selection::{
intersect_row_selections, row_selection_from_row_ranges, row_selection_from_sorted_row_ids,
};
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
@@ -78,8 +82,9 @@ pub struct ParquetReaderBuilder {
projection: Option<Vec<ColumnId>>,
/// Manager that caches SST data.
cache_manager: Option<CacheManagerRef>,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
/// Index appliers.
inverted_index_applier: Option<InvertedIndexApplierRef>,
fulltext_index_applier: Option<FulltextIndexApplierRef>,
/// Expected metadata of the region while reading the SST.
/// This is usually the latest metadata of the region. The reader use
/// it get the correct column id of a column by name.
@@ -101,7 +106,8 @@ impl ParquetReaderBuilder {
time_range: None,
projection: None,
cache_manager: None,
index_applier: None,
inverted_index_applier: None,
fulltext_index_applier: None,
expected_metadata: None,
}
}
@@ -136,10 +142,23 @@ impl ParquetReaderBuilder {
self
}
/// Attaches the index applier to the builder.
/// Attaches the inverted index applier to the builder.
#[must_use]
pub(crate) fn index_applier(mut self, index_applier: Option<SstIndexApplierRef>) -> Self {
self.index_applier = index_applier;
pub(crate) fn inverted_index_applier(
mut self,
index_applier: Option<InvertedIndexApplierRef>,
) -> Self {
self.inverted_index_applier = index_applier;
self
}
/// Attaches the fulltext index applier to the builder.
#[must_use]
pub(crate) fn fulltext_index_applier(
mut self,
index_applier: Option<FulltextIndexApplierRef>,
) -> Self {
self.fulltext_index_applier = index_applier;
self
}
@@ -315,15 +334,92 @@ impl ParquetReaderBuilder {
metrics: &mut ReaderMetrics,
) -> BTreeMap<usize, Option<RowSelection>> {
let num_row_groups = parquet_meta.num_row_groups();
if num_row_groups == 0 {
let num_rows = parquet_meta.file_metadata().num_rows();
if num_row_groups == 0 || num_rows == 0 {
return BTreeMap::default();
}
metrics.num_row_groups_before_filtering += num_row_groups;
self.prune_row_groups_by_inverted_index(parquet_meta, metrics)
.await
.or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics))
.unwrap_or_else(|| (0..num_row_groups).map(|i| (i, None)).collect())
// Let's assume that the number of rows in the first row group
// can represent the `row_group_size` of the Parquet file.
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
if row_group_size == 0 {
return BTreeMap::default();
}
metrics.num_row_groups_before_filtering += num_row_groups;
metrics.num_rows_in_row_group_before_filtering += num_rows as usize;
let mut output = (0..num_row_groups).map(|i| (i, None)).collect();
self.prune_row_groups_by_fulltext_index(row_group_size, parquet_meta, &mut output, metrics)
.await;
if output.is_empty() {
return output;
}
let inverted_filtered = self
.prune_row_groups_by_inverted_index(row_group_size, parquet_meta, &mut output, metrics)
.await;
if output.is_empty() {
return output;
}
if !inverted_filtered {
self.prune_row_groups_by_minmax(read_format, parquet_meta, &mut output, metrics);
}
output
}
/// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
async fn prune_row_groups_by_fulltext_index(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
return false;
};
if !self.file_handle.meta_ref().fulltext_index_available() {
return false;
}
let apply_res = match index_applier.apply(self.file_handle.file_id()).await {
Ok(res) => res,
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply full-text index, region_id: {}, file_id: {}, err: {}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply full-text index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}
return false;
}
};
let row_group_to_row_ids = apply_res
.into_iter()
.group_by(|row_id| *row_id as usize / row_group_size);
Self::prune_row_groups_by_rows(
parquet_meta,
row_group_to_row_ids.into_iter(),
output,
&mut metrics.num_row_groups_fulltext_index_filtered,
&mut metrics.num_rows_in_row_group_fulltext_index_filtered,
);
true
}
/// Applies index to prune row groups.
@@ -333,51 +429,42 @@ impl ParquetReaderBuilder {
/// the correctness of the index.
async fn prune_row_groups_by_inverted_index(
&self,
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
) -> Option<BTreeMap<usize, Option<RowSelection>>> {
let Some(index_applier) = &self.index_applier else {
return None;
) -> bool {
let Some(index_applier) = &self.inverted_index_applier else {
return false;
};
if !self.file_handle.meta_ref().inverted_index_available() {
return None;
return false;
}
let output = match index_applier.apply(self.file_handle.file_id()).await {
let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to apply index, region_id: {}, file_id: {}, err: {}",
"Failed to apply inverted index, region_id: {}, file_id: {}, err: {}",
self.file_handle.region_id(),
self.file_handle.file_id(),
err
);
} else {
warn!(
err; "Failed to apply index, region_id: {}, file_id: {}",
err; "Failed to apply inverted index, region_id: {}, file_id: {}",
self.file_handle.region_id(), self.file_handle.file_id()
);
}
return None;
return false;
}
};
// Let's assume that the number of rows in the first row group
// can represent the `row_group_size` of the Parquet file.
//
// If the file contains only one row group, i.e. the number of rows
// less than the `row_group_size`, the calculation of `row_group_id`
// and `rg_begin_row_id` is still correct.
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
if row_group_size == 0 {
return None;
}
let segment_row_count = output.segment_row_count;
let row_groups = output
let segment_row_count = apply_output.segment_row_count;
let grouped_in_row_groups = apply_output
.matched_segment_ids
.iter_ones()
.map(|seg_id| {
@@ -389,26 +476,21 @@ impl ParquetReaderBuilder {
(row_group_id, rg_begin_row_id..rg_end_row_id)
})
.group_by(|(row_group_id, _)| *row_group_id)
.group_by(|(row_group_id, _)| *row_group_id);
let ranges_in_row_groups = grouped_in_row_groups
.into_iter()
.map(|(row_group_id, group)| {
let row_ranges = group.map(|(_, range)| range);
.map(|(row_group_id, group)| (row_group_id, group.map(|(_, ranges)| ranges)));
let total_row_count = parquet_meta.row_group(row_group_id).num_rows() as usize;
let (row_selection, skipped) =
row_selection_from_row_ranges(row_ranges, total_row_count);
Self::prune_row_groups_by_ranges(
parquet_meta,
ranges_in_row_groups,
output,
&mut metrics.num_row_groups_inverted_index_filtered,
&mut metrics.num_rows_in_row_group_inverted_index_filtered,
);
metrics.num_rows_in_row_group_before_filtering += total_row_count;
metrics.num_rows_in_row_group_inverted_index_filtered += skipped;
(row_group_id, Some(row_selection))
})
.collect::<BTreeMap<_, _>>();
let filtered = parquet_meta.num_row_groups() - row_groups.len();
metrics.num_row_groups_inverted_index_filtered += filtered;
Some(row_groups)
true
}
/// Prunes row groups by min-max index.
@@ -416,13 +498,14 @@ impl ParquetReaderBuilder {
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
) -> Option<BTreeMap<usize, Option<RowSelection>>> {
) -> bool {
let Some(predicate) = &self.predicate else {
return None;
return false;
};
let num_row_groups = parquet_meta.num_row_groups();
let row_groups_before = output.len();
let region_meta = read_format.metadata();
let row_groups = parquet_meta.row_groups();
@@ -431,18 +514,129 @@ impl ParquetReaderBuilder {
// Here we use the schema of the SST to build the physical expression. If the column
// in the SST doesn't have the same column id as the column in the expected metadata,
// we will get a None statistics for that column.
let row_groups = predicate
let res = predicate
.prune_with_stats(&stats, region_meta.schema.arrow_schema())
.iter()
.zip(0..num_row_groups)
.filter(|&(mask, _)| *mask)
.map(|(_, id)| (id, None))
.zip(0..parquet_meta.num_row_groups())
.filter_map(|(mask, row_group)| {
if !*mask {
return None;
}
let selection = output.remove(&row_group)?;
Some((row_group, selection))
})
.collect::<BTreeMap<_, _>>();
let filtered = num_row_groups - row_groups.len();
metrics.num_row_groups_min_max_filtered += filtered;
let row_groups_after = res.len();
metrics.num_row_groups_min_max_filtered += row_groups_before - row_groups_after;
Some(row_groups)
*output = res;
true
}
/// Prunes row groups by rows. The `rows_in_row_groups` is like a map from row group to
/// a list of row ids to keep.
fn prune_row_groups_by_rows(
parquet_meta: &ParquetMetaData,
rows_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = u32>)>,
output: &mut BTreeMap<usize, Option<RowSelection>>,
filtered_row_groups: &mut usize,
filtered_rows: &mut usize,
) {
let row_groups_before = output.len();
let mut rows_in_row_group_before = 0;
let mut rows_in_row_group_after = 0;
let mut res = BTreeMap::new();
for (row_group, row_ids) in rows_in_row_groups {
let Some(selection) = output.remove(&row_group) else {
continue;
};
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
let new_selection = row_selection_from_sorted_row_ids(row_ids, total_row_count);
let intersected_selection = intersect_row_selections(selection, Some(new_selection));
let num_rows_after = intersected_selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
rows_in_row_group_after += num_rows_after;
if num_rows_after > 0 {
res.insert(row_group, intersected_selection);
}
}
// Pruned row groups.
while let Some((row_group, selection)) = output.pop_first() {
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
}
let row_groups_after = res.len();
*filtered_row_groups += row_groups_before - row_groups_after;
*filtered_rows += rows_in_row_group_before - rows_in_row_group_after;
*output = res;
}
/// Prunes row groups by ranges. The `ranges_in_row_groups` is like a map from row group to
/// a list of row ranges to keep.
fn prune_row_groups_by_ranges(
parquet_meta: &ParquetMetaData,
ranges_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = Range<usize>>)>,
output: &mut BTreeMap<usize, Option<RowSelection>>,
filtered_row_groups: &mut usize,
filtered_rows: &mut usize,
) {
let row_groups_before = output.len();
let mut rows_in_row_group_before = 0;
let mut rows_in_row_group_after = 0;
let mut res = BTreeMap::new();
for (row_group, row_ranges) in ranges_in_row_groups {
let Some(selection) = output.remove(&row_group) else {
continue;
};
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
let new_selection = row_selection_from_row_ranges(row_ranges, total_row_count);
let intersected_selection = intersect_row_selections(selection, Some(new_selection));
let num_rows_after = intersected_selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
rows_in_row_group_after += num_rows_after;
if num_rows_after > 0 {
res.insert(row_group, intersected_selection);
}
}
// Pruned row groups.
while let Some((row_group, selection)) = output.pop_first() {
let total_row_count = parquet_meta.row_group(row_group).num_rows() as usize;
rows_in_row_group_before += selection
.as_ref()
.map_or(total_row_count, |s| s.row_count());
}
let row_groups_after = res.len();
*filtered_row_groups += row_groups_before - row_groups_after;
*filtered_rows += rows_in_row_group_before - rows_in_row_group_after;
*output = res;
}
}
@@ -504,6 +698,8 @@ fn time_range_to_predicate(
pub(crate) struct ReaderMetrics {
/// Number of row groups before filtering.
num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
num_row_groups_fulltext_index_filtered: usize,
/// Number of row groups filtered by inverted index.
num_row_groups_inverted_index_filtered: usize,
/// Number of row groups filtered by min-max index.
@@ -512,6 +708,8 @@ pub(crate) struct ReaderMetrics {
num_rows_precise_filtered: usize,
/// Number of rows in row group before filtering.
num_rows_in_row_group_before_filtering: usize,
/// Number of rows in row group filtered by fulltext index.
num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
num_rows_in_row_group_inverted_index_filtered: usize,
/// Duration to build the parquet reader.
@@ -530,10 +728,13 @@ impl ReaderMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
self.build_cost += other.build_cost;
@@ -782,6 +983,9 @@ impl Drop for ParquetReader {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_row_groups_fulltext_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_row_groups_inverted_index_filtered as u64);
@@ -794,6 +998,9 @@ impl Drop for ParquetReader {
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64);
@@ -936,3 +1143,294 @@ impl RowGroupReader {
Ok(())
}
}
#[cfg(test)]
mod tests {
use parquet::arrow::arrow_reader::RowSelector;
use parquet::file::metadata::{FileMetaData, RowGroupMetaData};
use parquet::schema::types::{SchemaDescriptor, Type};
use super::*;
fn mock_parquet_metadata_from_row_groups(num_rows_in_row_groups: Vec<i64>) -> ParquetMetaData {
let tp = Arc::new(Type::group_type_builder("test").build().unwrap());
let schema_descr = Arc::new(SchemaDescriptor::new(tp));
let mut row_groups = Vec::new();
for num_rows in &num_rows_in_row_groups {
let row_group = RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(*num_rows)
.build()
.unwrap();
row_groups.push(row_group);
}
let file_meta = FileMetaData::new(
0,
num_rows_in_row_groups.iter().sum(),
None,
None,
schema_descr,
None,
);
ParquetMetaData::new(file_meta, row_groups)
}
#[test]
fn prune_row_groups_by_rows_from_empty() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
// The original output is empty. No row groups are pruned.
let mut output = BTreeMap::new();
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert!(output.is_empty());
assert_eq!(filtered_row_groups, 0);
assert_eq!(filtered_rows, 0);
}
#[test]
fn prune_row_groups_by_rows_from_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
// The original output is full.
let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::skip(5),
RowSelector::select(5),
]))
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
])
);
assert_eq!(filtered_row_groups, 1);
assert_eq!(filtered_rows, 15);
}
#[test]
fn prune_row_groups_by_rows_from_not_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
// The original output is not full.
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
#[test]
fn prune_row_groups_by_ranges_from_empty() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is empty. No row groups are pruned.
let mut output = BTreeMap::new();
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert!(output.is_empty());
assert_eq!(filtered_row_groups, 0);
assert_eq!(filtered_rows, 0);
}
#[test]
fn prune_row_groups_by_ranges_from_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is full.
let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::skip(5),
RowSelector::select(5),
]))
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
])
);
assert_eq!(filtered_row_groups, 1);
assert_eq!(filtered_rows, 15);
}
#[test]
fn prune_row_groups_by_ranges_from_not_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..5).into_iter())];
// The original output is not full.
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
#[test]
fn prune_row_groups_by_ranges_exceed_end() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);
// The range exceeds the end of the row group.
let ranges_in_row_groups = [(0, Some(5..10).into_iter()), (2, Some(0..10).into_iter())];
let mut output = BTreeMap::from([
(
0,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(
1,
Some(RowSelection::from(vec![
RowSelector::select(5),
RowSelector::skip(5),
])),
),
(2, Some(RowSelection::from(vec![RowSelector::select(5)]))),
]);
let mut filtered_row_groups = 0;
let mut filtered_rows = 0;
ParquetReaderBuilder::prune_row_groups_by_ranges(
&parquet_meta,
ranges_in_row_groups.into_iter(),
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
);
assert_eq!(
output,
BTreeMap::from([(2, Some(RowSelection::from(vec![RowSelector::select(5)])))])
);
assert_eq!(filtered_row_groups, 2);
assert_eq!(filtered_rows, 10);
}
}

View File

@@ -16,10 +16,7 @@ use std::ops::Range;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
type SkipRowCount = usize;
/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
/// Returns the `RowSelection` and the number of rows that were skipped.
///
/// This function processes each range in the input and either creates a new selector or merges
/// with the existing one, depending on whether the current range is contiguous with the preceding one
@@ -30,23 +27,68 @@ type SkipRowCount = usize;
pub(crate) fn row_selection_from_row_ranges(
row_ranges: impl Iterator<Item = Range<usize>>,
total_row_count: usize,
) -> (RowSelection, SkipRowCount) {
) -> RowSelection {
let mut selectors: Vec<RowSelector> = Vec::new();
let mut last_processed_end = 0;
let mut skip_row_count = 0;
for Range { start, end } in row_ranges {
let end = end.min(total_row_count);
if start > last_processed_end {
add_or_merge_selector(&mut selectors, start - last_processed_end, true);
skip_row_count += start - last_processed_end;
}
add_or_merge_selector(&mut selectors, end - start, false);
last_processed_end = end;
}
skip_row_count += total_row_count.saturating_sub(last_processed_end);
(RowSelection::from(selectors), skip_row_count)
if last_processed_end < total_row_count {
add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
}
RowSelection::from(selectors)
}
/// Converts an iterator of sorted row IDs into a `RowSelection`.
///
/// Note: the input iterator must be sorted in ascending order and
/// contain unique row IDs in the range [0, total_row_count).
pub(crate) fn row_selection_from_sorted_row_ids(
row_ids: impl Iterator<Item = u32>,
total_row_count: usize,
) -> RowSelection {
let mut selectors: Vec<RowSelector> = Vec::new();
let mut last_processed_end = 0;
for row_id in row_ids {
let start = row_id as usize;
let end = start + 1;
if start > last_processed_end {
add_or_merge_selector(&mut selectors, start - last_processed_end, true);
}
add_or_merge_selector(&mut selectors, end - start, false);
last_processed_end = end;
}
if last_processed_end < total_row_count {
add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
}
RowSelection::from(selectors)
}
/// Intersects two `RowSelection`s.
pub(crate) fn intersect_row_selections(
a: Option<RowSelection>,
b: Option<RowSelection>,
) -> Option<RowSelection> {
match (a, b) {
(Some(a), Some(b)) => Some(a.intersection(&b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
}
}
/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
@@ -74,55 +116,98 @@ mod tests {
#[test]
fn test_single_contiguous_range() {
let (selection, skipped) = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
assert_eq!(selection, expected);
assert_eq!(skipped, 5);
}
#[test]
fn test_non_contiguous_ranges() {
let ranges = [1..3, 5..8];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(2),
RowSelector::skip(2),
RowSelector::select(3),
RowSelector::skip(2),
]);
assert_eq!(selection, expected);
assert_eq!(skipped, 5);
}
#[test]
fn test_empty_range() {
let ranges = [];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![]);
let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(10)]);
assert_eq!(selection, expected);
assert_eq!(skipped, 10);
}
#[test]
fn test_adjacent_ranges() {
let ranges = [1..2, 2..3];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]);
let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(2),
RowSelector::skip(7),
]);
assert_eq!(selection, expected);
assert_eq!(skipped, 8);
}
#[test]
fn test_large_gap_between_ranges() {
let ranges = [1..2, 100..101];
let (selection, skipped) = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(98),
RowSelector::select(1),
RowSelector::skip(10139),
]);
assert_eq!(selection, expected);
assert_eq!(skipped, 10238);
}
#[test]
fn test_range_end_over_total_row_count() {
let ranges = Some(1..10);
let selection = row_selection_from_row_ranges(ranges.into_iter(), 5);
let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]);
assert_eq!(selection, expected);
}
#[test]
fn test_row_ids_to_selection() {
let row_ids = [1, 3, 5, 7, 9].into_iter();
let selection = row_selection_from_sorted_row_ids(row_ids, 10);
let expected = RowSelection::from(vec![
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
RowSelector::skip(1),
RowSelector::select(1),
]);
assert_eq!(selection, expected);
}
#[test]
fn test_row_ids_to_selection_full() {
let row_ids = 0..10;
let selection = row_selection_from_sorted_row_ids(row_ids, 10);
let expected = RowSelection::from(vec![RowSelector::select(10)]);
assert_eq!(selection, expected);
}
#[test]
fn test_row_ids_to_selection_empty() {
let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10);
let expected = RowSelection::from(vec![RowSelector::skip(10)]);
assert_eq!(selection, expected);
}
}

View File

@@ -129,7 +129,7 @@ where
impl<S, F> FsPuffinReader<S, F>
where
S: Stager,
F: PuffinFileAccessor,
F: PuffinFileAccessor + Clone,
{
async fn init_blob_to_stager(
mut reader: PuffinFileReader<F::Reader>,
@@ -164,14 +164,16 @@ where
let dir_meta: DirMetadata =
serde_json::from_slice(buf.as_slice()).context(DeserializeJsonSnafu)?;
let mut size = 0;
let mut tasks = vec![];
for file_meta in dir_meta.files {
let blob_meta = puffin_metadata.blobs.get(file_meta.blob_index).context(
BlobIndexOutOfBoundSnafu {
let blob_meta = puffin_metadata
.blobs
.get(file_meta.blob_index)
.context(BlobIndexOutOfBoundSnafu {
index: file_meta.blob_index,
max_index: puffin_metadata.blobs.len(),
},
)?;
})?
.clone();
ensure!(
blob_meta.blob_type == file_meta.key,
FileKeyNotMatchSnafu {
@@ -180,13 +182,24 @@ where
}
);
let reader = file.blob_reader(blob_meta)?;
let reader = accessor.reader(&puffin_file_name).await?;
let writer = writer_provider.writer(&file_meta.relative_path).await?;
let compression = blob_meta.compression_codec;
size += Self::handle_decompress(reader, writer, compression).await?;
let task = common_runtime::spawn_read(async move {
let mut file = PuffinFileReader::new(reader);
let reader = file.blob_reader(&blob_meta)?;
let compression = blob_meta.compression_codec;
let size = Self::handle_decompress(reader, writer, compression).await?;
Ok(size)
});
tasks.push(task);
}
let size = futures::future::try_join_all(tasks.into_iter())
.await
.into_iter()
.flatten()
.sum::<Result<_>>()?;
Ok(size)
}