it works on my machine ¯\_(ツ)_/¯

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Ruihang Xia
2024-04-18 21:33:06 +08:00
parent 182e22dda9
commit 9e1e4a5181
17 changed files with 336 additions and 36 deletions

Cargo.lock

@@ -2550,7 +2550,6 @@ checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5"
[[package]]
name = "datafusion"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"ahash 0.8.6",
"arrow",
@@ -2597,7 +2596,6 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"ahash 0.8.6",
"arrow",
@@ -2615,7 +2613,6 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"arrow",
"chrono",
@@ -2635,7 +2632,6 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"ahash 0.8.6",
"arrow",
@@ -2649,7 +2645,6 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"arrow",
"async-trait",
@@ -2666,7 +2661,6 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"ahash 0.8.6",
"arrow",
@@ -2699,7 +2693,6 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"ahash 0.8.6",
"arrow",
@@ -2729,7 +2722,6 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"arrow",
"arrow-schema",
@@ -2742,7 +2734,6 @@ dependencies = [
[[package]]
name = "datafusion-substrait"
version = "32.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=26e43acac3a96cec8dd4c8365f22dfb1a84306e9#26e43acac3a96cec8dd4c8365f22dfb1a84306e9"
dependencies = [
"async-recursion",
"chrono",
@@ -9934,6 +9925,7 @@ dependencies = [
"bytes",
"catalog",
"common-error",
"common-function",
"common-macro",
"datafusion",
"datafusion-common",
@@ -9942,6 +9934,7 @@ dependencies = [
"datatypes",
"promql",
"prost 0.12.3",
"session",
"snafu",
"substrait 0.17.1",
"tokio",
@@ -10183,6 +10176,7 @@ dependencies = [
"time",
"uuid",
"winapi",
"zstd 0.13.0",
]
[[package]]


@@ -91,13 +91,20 @@ bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
# datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion = { path = "../arrow-datafusion/datafusion/core" }
datafusion-common = { path = "../arrow-datafusion/datafusion/common" }
datafusion-expr = { path = "../arrow-datafusion/datafusion/expr" }
datafusion-optimizer = { path = "../arrow-datafusion/datafusion/optimizer" }
datafusion-physical-expr = { path = "../arrow-datafusion/datafusion/physical-expr" }
datafusion-sql = { path = "../arrow-datafusion/datafusion/sql" }
datafusion-substrait = { path = "../arrow-datafusion/datafusion/substrait" }
derive_builder = "0.12"
dotenv = "0.15"
etcd-client = "0.12"

feed.py

@@ -0,0 +1,54 @@
# Read lines from target/log-1000.txt and POST them to
# http://localhost:4000/v1/influxdb/write?db=public&precision=ms
# POST data format: "many_logs,host=1 log=<FILE CONTENT> <INCREMENT ID>"
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

batch_size = 3000
worker = 8

# Define the URL
url = "http://localhost:4000/v1/influxdb/write?db=public&precision=ms"


def send_data(start, data):
    # Send the POST request
    response = requests.post(url, data=data)
    # Check the response
    if response.status_code >= 300:
        print(
            f"Failed to send log line {start}: {response.status_code} {response.text}"
        )


# Open the file
with open("target/log-1000.txt", "r") as file:
    lines = file.readlines()

# Create a progress bar
with tqdm(
    total=len(lines),
    desc="Processing lines",
    bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt}",
) as pbar:
    data = ""
    with ThreadPoolExecutor(max_workers=worker) as executor:
        for i, line in enumerate(lines):
            # Prepare the POST data: strip characters that break the line protocol
            content = line.strip()
            content = content.replace('"', " ")
            content = content.replace("'", " ")
            content = content.replace("=", " ")
            content = content.replace(".", " ")
            data = data + f'many_logs,host=1 log="{content}" {i}\n'
            if (i + 1) % batch_size == 0:
                executor.submit(send_data, i, data)
                data = ""
                # Update the progress bar
                pbar.update(batch_size)
        # Flush the last partial batch
        if data:
            executor.submit(send_data, len(lines), data)
            pbar.update(len(lines) % batch_size)
        # Close the executor
        executor.shutdown(wait=True)


@@ -15,6 +15,7 @@
pub mod aggregate;
pub(crate) mod date;
pub mod expression;
pub mod matches;
pub mod math;
pub mod numpy;
#[cfg(test)]


@@ -0,0 +1,61 @@
// Copyright 2024 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use common_query::error::Result;
use common_query::prelude::{Signature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{BooleanVector, VectorRef};
use crate::function::{Function, FunctionContext};
const NAME: &str = "matches";
/// The `matches` function for full text search (placeholder implementation)
#[derive(Clone, Debug, Default)]
pub struct MatchesFunction;
impl Display for MatchesFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", NAME.to_ascii_uppercase())
}
}
impl Function for MatchesFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::boolean_datatype())
}
fn signature(&self) -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
],
Volatility::Immutable,
)
}
fn eval(&self, _func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
let num_rows = columns[1].len();
Ok(Arc::new(BooleanVector::from(vec![true; num_rows])))
}
}
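Note that the `eval` above is only a placeholder that marks every row as a match; the actual filtering happens later in the full text index applier. A minimal sketch of exercising the stub, assuming `StringVector` offers a `From<Vec<&str>>` constructor and `FunctionContext` implements `Default` (both assumptions, not verified against this crate), could look like:

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::vectors::{StringVector, VectorRef};

    use super::MatchesFunction;
    use crate::function::{Function, FunctionContext};

    #[test]
    fn matches_stub_returns_all_true() {
        let function = MatchesFunction;
        // Two string columns: the text to search and the query pattern.
        let column: VectorRef = Arc::new(StringVector::from(vec!["a", "b", "c"]));
        let pattern: VectorRef = Arc::new(StringVector::from(vec!["a", "a", "a"]));
        let result = function
            .eval(FunctionContext::default(), &[column, pattern])
            .unwrap();
        // The placeholder implementation marks every row as matching.
        assert_eq!(result.len(), 3);
    }
}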


@@ -31,6 +31,7 @@ pub use pow::PowFunction;
pub use rate::RateFunction;
use snafu::ResultExt;
use super::matches::MatchesFunction;
use crate::function::{Function, FunctionContext};
use crate::function_registry::FunctionRegistry;
use crate::scalars::math::modulo::ModuloFunction;
@@ -44,6 +45,7 @@ impl MathFunction {
registry.register(Arc::new(RateFunction));
registry.register(Arc::new(RangeFunction));
registry.register(Arc::new(ClampFunction));
registry.register(Arc::new(MatchesFunction));
}
}


@@ -12,14 +12,16 @@ async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
common-error.workspace = true
common-function.workspace = true
common-macro.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-substrait.workspace = true
datafusion.workspace = true
datatypes.workspace = true
promql.workspace = true
prost.workspace = true
session.workspace = true
snafu.workspace = true
[dependencies.substrait_proto]


@@ -16,6 +16,9 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::udf::create_udf;
use common_function::state::FunctionState;
use datafusion::catalog::CatalogList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
@@ -24,6 +27,7 @@ use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use datafusion_substrait::logical_plan::producer::to_substrait_plan;
use prost::Message;
use session::context::QueryContext;
use snafu::ResultExt;
use substrait_proto::proto::Plan;
@@ -50,6 +54,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
let state = SessionState::new_with_config_rt(state_config, Arc::new(RuntimeEnv::default()))
.with_serializer_registry(Arc::new(ExtensionSerializer));
let mut context = SessionContext::new_with_state(state);
let udf = create_udf(
Arc::new(MatchesFunction),
QueryContext::arc(),
Arc::new(FunctionState::default()),
);
context.register_udf(udf.into());
context.register_catalog_list(catalog_list);
let plan = Plan::decode(message).context(DecodeRelSnafu)?;
let df_plan = from_substrait_plan(&mut context, &plan)
@@ -65,6 +76,13 @@ impl SubstraitPlan for DFLogicalSubstraitConvertor {
.with_serializer_registry(Arc::new(ExtensionSerializer));
let context = SessionContext::new_with_state(session_state);
let udf = create_udf(
Arc::new(MatchesFunction),
QueryContext::arc(),
Arc::new(FunctionState::default()),
);
context.register_udf(udf.into());
let substrait_plan = to_substrait_plan(plan, &context).context(EncodeDfPlanSnafu)?;
substrait_plan.encode(&mut buf).context(EncodeRelSnafu)?;


@@ -22,13 +22,13 @@ greptime-proto.workspace = true
mockall.workspace = true
pin-project.workspace = true
prost.workspace = true
regex.workspace = true
regex-automata.workspace = true
regex.workspace = true
snafu.workspace = true
tantivy = "0.22"
tantivy = { version = "0.22", features = ["zstd-compression"] }
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio.workspace = true


@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::path::Path;
use snafu::ResultExt;
use tantivy::schema::{OwnedValue, Schema, INDEXED, STORED, TEXT};
use tantivy::{Document, Index, IndexWriter, TantivyDocument};
use tantivy::schema::{Schema, INDEXED, STORED, TEXT};
use tantivy::store::{Compressor, ZstdCompressor};
use tantivy::{Index, IndexWriter, TantivyDocument};
use super::error::TantivySnafu;
use crate::full_text_index::error::Result;
@@ -43,12 +43,20 @@ impl FullTextIndexCreater {
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
// create path
std::fs::create_dir_all(&path).unwrap();
common_telemetry::info!("[DEBUG] create full text index in {:?}", path.as_ref());
// build index
let index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?;
// tune
index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default());
index.settings_mut().docstore_blocksize = 65_536;
// build writer
// writer memory budget: 400 MB (was 100 MB)
let writer = index.writer(100_000_000).context(TantivySnafu)?;
let writer = index.writer(400_000_000).context(TantivySnafu)?;
Ok(Self {
index,
@@ -67,11 +75,14 @@ impl FullTextIndexCreater {
self.writer.add_document(doc).context(TantivySnafu)?;
self.row_count += 1;
self.writer.commit().context(TantivySnafu)?;
Ok(())
}
pub fn finish(&mut self) -> Result<()> {
common_telemetry::info!(
"[DEBUG] full text index finish with {} entries",
self.row_count
);
self.row_count = 0;
self.writer.commit().context(TantivySnafu)?;
Ok(())


@@ -55,7 +55,7 @@ impl FullTextIndexSearcher {
let query_parser = QueryParser::for_index(&self.index, vec![self.text_field]);
let query = query_parser.parse_query(query).context(ParseQuerySnafu)?;
let top_docs = searcher
.search(&query, &tantivy::collector::TopDocs::with_limit(100))
.search(&query, &tantivy::collector::TopDocs::with_limit(10_000_000))
.context(TantivySnafu)?;
let mut result = HashSet::new();
for (_score, doc_address) in top_docs {


@@ -96,6 +96,13 @@ impl AccessLayer {
})?;
}
let full_text_index_dir = format!(
"/tmp/greptimedb/{}index/{}/full_text_index",
self.region_dir, file_meta.file_id
);
common_telemetry::info!("[DEBUG] removing {}", full_text_index_dir);
tokio::fs::remove_dir_all(full_text_index_dir).await.unwrap();
Ok(())
}


@@ -39,7 +39,7 @@ use crate::read::{compat, Batch, Source};
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::index::applier::{FullTextIndexApplier, SstIndexApplierRef};
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -269,6 +269,7 @@ impl ScanRegion {
);
let index_applier = self.build_index_applier();
let full_text_index_applier = self.build_full_text_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
@@ -283,6 +284,7 @@ impl ScanRegion {
.with_files(files)
.with_cache(self.cache_manager)
.with_index_applier(index_applier)
.with_full_index_applier(full_text_index_applier)
.with_parallelism(self.parallelism)
.with_start_time(self.start_time)
.with_append_mode(self.version.options.append_mode)
@@ -337,9 +339,12 @@ impl ScanRegion {
.map(Arc::new)
}
fn build_full_text_index_applier(&self) -> Option<SstIndexApplierRef> {
// start here
todo!()
fn build_full_text_index_applier(&self) -> Option<FullTextIndexApplier> {
FullTextIndexApplier::new(
self.access_layer.region_dir().to_string(),
self.version.metadata.region_id,
&self.request.filters,
)
}
}
@@ -398,6 +403,8 @@ pub(crate) struct ScanInput {
pub(crate) append_mode: bool,
/// Whether to remove deletion markers.
pub(crate) filter_deleted: bool,
full_text_index_applier: Option<FullTextIndexApplier>,
}
impl ScanInput {
@@ -418,6 +425,7 @@ impl ScanInput {
query_start: None,
append_mode: false,
filter_deleted: true,
full_text_index_applier: None,
}
}
@@ -477,6 +485,15 @@ impl ScanInput {
self
}
#[must_use]
pub(crate) fn with_full_index_applier(
mut self,
index_applier: Option<FullTextIndexApplier>,
) -> Self {
self.full_text_index_applier = index_applier;
self
}
/// Sets start time of the query.
#[must_use]
pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
@@ -514,6 +531,7 @@ impl ScanInput {
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.full_text_index_applier(self.full_text_index_applier.clone())
.build()
.await;
let reader = match maybe_reader {


@@ -63,6 +63,8 @@ impl Indexer {
// Skip index creation if error occurs.
self.inner = None;
}
} else {
common_telemetry::info!("[DEBUG] Indexer::update: inner is None");
}
if let Some(creator) = self.inner.as_ref() {


@@ -16,7 +16,10 @@ pub mod builder;
use std::sync::Arc;
use common_query::logical_plan::Expr;
use datafusion_expr::Expr as DfExpr;
use futures::{AsyncRead, AsyncSeek};
use index::full_text_index::search::FullTextIndexSearcher;
use index::inverted_index::format::reader::InvertedIndexBlobReader;
use index::inverted_index::search::index_apply::{
ApplyOutput, IndexApplier, IndexNotFoundStrategy, SearchContext,
@@ -172,6 +175,53 @@ impl Drop for SstIndexApplier {
}
}
#[derive(Debug, Clone)]
pub(crate) struct FullTextIndexApplier {
region_dir: String,
region_id: RegionId,
query: String,
}
impl FullTextIndexApplier {
pub fn new(region_dir: String, region_id: RegionId, filters: &[Expr]) -> Option<Self> {
let query = Self::extract_from_filter(filters)?;
Some(Self {
region_dir,
region_id,
query,
})
}
fn extract_from_filter(filters: &[Expr]) -> Option<String> {
common_telemetry::info!("[DEBUG] filters in scan request: {:?}", filters);
for filter in filters {
if let DfExpr::ScalarUDF(udf) = filter.df_expr()
&& udf.fun.name == "matches"
{
let pattern = &udf.args[0];
if let DfExpr::Literal(literal) = pattern {
return Some(literal.to_string());
} else {
return None;
}
}
}
None
}
/// Returns the offsets of the selected rows within the file
pub fn apply(&self, file_id: FileId) -> Result<Vec<usize>> {
let index_path = format!(
"/tmp/greptimedb/{}index/{}/full_text_index",
self.region_dir, file_id
);
common_telemetry::info!("[DEBUG] open index at {index_path}");
let searcher = FullTextIndexSearcher::open(index_path).unwrap();
Ok(searcher.search(&self.query).unwrap())
}
}
#[cfg(test)]
mod tests {
use common_base::BitVec;


@@ -121,9 +121,11 @@ impl SstIndexCreator {
);
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
let full_text_index_path = format!("{file_path}/full_text_index");
let full_text_index_creater =
FullTextIndexCreater::new(segment_row_count.get(), full_text_index_path).unwrap();
let file_id = file_path.trim_end_matches(".puffin");
let full_text_index_path = format!("/tmp/greptimedb/{file_id}/full_text_index");
// let full_text_index_creater =
// FullTextIndexCreater::new(segment_row_count.get(), full_text_index_path).unwrap();
let full_text_index_creater = FullTextIndexCreater::new(1, full_text_index_path).unwrap();
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
Self {
@@ -250,6 +252,7 @@ impl SstIndexCreator {
}
// try to find the column named "log" and feed its values into the full text index
common_telemetry::info!("[DEBUG] do_update: log_column_id: {:?}", self.log_column_id);
if let Some(log_column_id) = self.log_column_id {
for col in batch.fields() {
if col.column_id == log_column_id {


@@ -29,7 +29,7 @@ use datafusion_common::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection, RowSelector};
use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
use parquet::file::metadata::ParquetMetaData;
use parquet::format::KeyValue;
@@ -50,7 +50,7 @@ use crate::metrics::{
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::index::applier::{FullTextIndexApplier, SstIndexApplierRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;
use crate::sst::parquet::row_group::InMemoryRowGroup;
@@ -77,6 +77,8 @@ pub(crate) struct ParquetReaderBuilder {
cache_manager: Option<CacheManagerRef>,
/// Index applier.
index_applier: Option<SstIndexApplierRef>,
full_text_index_applier: Option<FullTextIndexApplier>,
}
impl ParquetReaderBuilder {
@@ -95,6 +97,7 @@ impl ParquetReaderBuilder {
projection: None,
cache_manager: None,
index_applier: None,
full_text_index_applier: None,
}
}
@@ -131,6 +134,15 @@ impl ParquetReaderBuilder {
self
}
#[must_use]
pub fn full_text_index_applier(
mut self,
full_text_index_applier: Option<FullTextIndexApplier>,
) -> Self {
self.full_text_index_applier = full_text_index_applier;
self
}
/// Builds and initializes a [ParquetReader].
///
/// This needs to perform IO operation.
@@ -280,6 +292,11 @@ impl ParquetReaderBuilder {
}
metrics.num_row_groups_before_filtering += num_row_groups;
if let Some(full_text_index_result) = self.prune_row_groups_by_full_text_index(parquet_meta)
{
return full_text_index_result;
}
self.prune_row_groups_by_inverted_index(parquet_meta, metrics)
.await
.or_else(|| self.prune_row_groups_by_minmax(read_format, parquet_meta, metrics))
@@ -408,6 +425,59 @@ impl ParquetReaderBuilder {
Some(row_groups)
}
fn prune_row_groups_by_full_text_index(
&self,
parquet_meta: &ParquetMetaData,
) -> Option<BTreeMap<usize, Option<RowSelection>>> {
let applier = self.full_text_index_applier.as_ref()?;
let file_id = self.file_handle.file_id();
let mut selected_row = applier.apply(file_id).unwrap();
common_telemetry::info!("[DEBUG] selected_row: {:?}", selected_row.len());
// Let's assume that the number of rows in the first row group
// can represent the `row_group_size` of the Parquet file.
//
// If the file contains only one row group, i.e. the number of rows is
// less than the `row_group_size`, the calculation of `row_group_id`
// and `rg_row_id` is still correct.
let row_group_size = parquet_meta.row_group(0).num_rows() as usize;
if row_group_size == 0 {
return None;
}
// translate `selected_row` into row groups selection
selected_row.sort_unstable();
let mut row_groups_selected = BTreeMap::new();
for row_id in selected_row.iter() {
let row_group_id = row_id / row_group_size;
let rg_row_id = row_id % row_group_size;
row_groups_selected
.entry(row_group_id)
.or_insert_with(Vec::new)
.push(rg_row_id);
}
let row_group = row_groups_selected
.into_iter()
.map(|(row_group_id, row_ids)| {
let mut current_row = 0;
let mut selection = vec![];
for row_id in row_ids {
selection.push(RowSelector::skip(row_id - current_row));
selection.push(RowSelector::select(1));
current_row = row_id + 1;
}
(row_group_id, Some(RowSelection::from(selection)))
})
.collect();
// common_telemetry::info!("[DEBUG] row_group: {:?}", row_group);
Some(row_group)
}
}
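For a concrete picture of the translation above: with `row_group_size = 100`, selected rows `[5, 205, 207]` land in row group 0 (skip 5, select 1) and row group 2 (skip 5, select 1, skip 1, select 1). The following is a standalone sketch of that mapping, mirroring the loop in `prune_row_groups_by_full_text_index` and using only the parquet crate's `RowSelector`/`RowSelection` types; it is illustrative and not part of the commit.

use std::collections::BTreeMap;

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Translates absolute selected row ids into per-row-group selections,
/// assuming every row group holds `row_group_size` rows.
fn to_row_group_selection(
    mut selected_rows: Vec<usize>,
    row_group_size: usize,
) -> BTreeMap<usize, RowSelection> {
    selected_rows.sort_unstable();
    let mut grouped: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
    for row_id in selected_rows {
        grouped
            .entry(row_id / row_group_size)
            .or_default()
            .push(row_id % row_group_size);
    }
    grouped
        .into_iter()
        .map(|(row_group_id, row_ids)| {
            let mut current_row = 0;
            let mut selectors = Vec::new();
            for row_id in row_ids {
                // Skip the gap before this selected row, then take exactly one row.
                selectors.push(RowSelector::skip(row_id - current_row));
                selectors.push(RowSelector::select(1));
                current_row = row_id + 1;
            }
            (row_group_id, RowSelection::from(selectors))
        })
        .collect()
}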
/// Parquet reader metrics.