Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
fix: flat format use correct encoding in indexer for tags (#7440)
* test: add inverted and skipping test
* test: Add tests for fulltext index
* fix: index dictionary type in correct encoding in flat format
* refactor: use encode_data_type() in SortField
* refactor: refine imports
* test: add tests for sparse encoding
* chore: remove logs
* test: update list test
* test: simplify tests

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
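The gist of the fix, as a standalone sketch with simplified stand-in types (not GreptimeDB's real ConcreteDataType or SortField): the old index path checked the raw column type, so a dictionary-encoded tag column never took the string branch; the new encode_data_type() first resolves a dictionary column to its value type.

// Simplified stand-ins for illustration only; the real logic lives in
// SortField::encode_data_type() in the hunks below.
#[derive(Debug, PartialEq)]
enum DataType {
    String,
    UInt64,
    Dictionary { value: Box<DataType> },
}

// Dictionary columns are encoded (and indexed) as their value type.
fn encode_data_type(data_type: &DataType) -> &DataType {
    match data_type {
        DataType::Dictionary { value } => value,
        other => other,
    }
}

fn main() {
    // A tag column stored as a dictionary of strings in the flat format.
    let tag = DataType::Dictionary {
        value: Box::new(DataType::String),
    };
    // The indexer now sees a string type and takes the string encoding path.
    assert_eq!(encode_data_type(&tag), &DataType::String);
    // Non-dictionary columns pass through unchanged.
    assert_eq!(encode_data_type(&DataType::UInt64), &DataType::UInt64);
}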
@@ -48,7 +48,7 @@ impl IndexValueCodec {
     ) -> Result<()> {
         ensure!(!value.is_null(), IndexEncodeNullSnafu);
 
-        if field.data_type().is_string() {
+        if field.encode_data_type().is_string() {
             let value = value
                 .try_into_string()
                 .context(FieldTypeMismatchSnafu)?
@@ -57,15 +57,20 @@ impl SortField {
         &self.data_type
     }
 
-    pub fn estimated_size(&self) -> usize {
+    /// Returns the physical data type to encode of the field.
+    ///
+    /// For example, a dictionary field will be encoded as its value type.
+    pub fn encode_data_type(&self) -> &ConcreteDataType {
         match &self.data_type {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::estimated_size_by_type(dict_type.value_type())
-            }
-            data_type => Self::estimated_size_by_type(data_type),
+            ConcreteDataType::Dictionary(dict_type) => dict_type.value_type(),
+            _ => &self.data_type,
         }
     }
 
+    pub fn estimated_size(&self) -> usize {
+        Self::estimated_size_by_type(self.encode_data_type())
+    }
+
     fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
         match data_type {
             ConcreteDataType::Boolean(_) => 2,
@@ -98,12 +103,7 @@ impl SortField {
         serializer: &mut Serializer<&mut Vec<u8>>,
         value: &ValueRef,
     ) -> Result<()> {
-        match self.data_type() {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::serialize_by_type(dict_type.value_type(), serializer, value)
-            }
-            data_type => Self::serialize_by_type(data_type, serializer, value),
-        }
+        Self::serialize_by_type(self.encode_data_type(), serializer, value)
     }
 
     fn serialize_by_type(
@@ -194,12 +194,7 @@ impl SortField {
 
     /// Deserialize a value from the deserializer.
     pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
-        match &self.data_type {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::deserialize_by_type(dict_type.value_type(), deserializer)
-            }
-            data_type => Self::deserialize_by_type(data_type, deserializer),
-        }
+        Self::deserialize_by_type(self.encode_data_type(), deserializer)
     }
 
     fn deserialize_by_type<B: Buf>(
@@ -301,12 +296,7 @@ impl SortField {
             return Ok(1);
         }
 
-        match &self.data_type {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer)
-            }
-            data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer),
-        }
+        Self::skip_deserialize_by_type(self.encode_data_type(), bytes, deserializer)
     }
 
     fn skip_deserialize_by_type(
@@ -872,9 +872,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
|
||||
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
|
||||
test_list_ssts_with_format(true, r#"
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
|
||||
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
|
||||
r#"
|
||||
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
|
||||
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }
|
||||
|
||||
@@ -95,21 +95,32 @@ mod tests {
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::OpType;
|
||||
use api::v1::{OpType, SemanticType};
|
||||
use common_function::function::FunctionRef;
|
||||
use common_function::function_factory::ScalarFunctionFactory;
|
||||
use common_function::scalars::matches::MatchesFunction;
|
||||
use common_function::scalars::matches_term::MatchesTermFunction;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::{Column, ScalarValue};
|
||||
use datafusion_expr::expr::ScalarFunction;
|
||||
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
|
||||
use datatypes::arrow;
|
||||
use datatypes::arrow::array::{
|
||||
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
|
||||
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
|
||||
TimestampMillisecondArray, UInt8Array, UInt64Array,
|
||||
};
|
||||
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
|
||||
use object_store::ObjectStore;
|
||||
use parquet::arrow::AsyncArrowWriter;
|
||||
use parquet::basic::{Compression, Encoding, ZstdLevel};
|
||||
use parquet::file::metadata::KeyValue;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use store_api::codec::PrimaryKeyEncoding;
|
||||
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
|
||||
use store_api::region_request::PathType;
|
||||
use store_api::storage::{ColumnSchema, RegionId};
|
||||
use table::predicate::Predicate;
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
|
||||
@@ -122,6 +133,7 @@ mod tests {
|
||||
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
|
||||
use crate::sst::file_purger::NoopFilePurger;
|
||||
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
|
||||
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
|
||||
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
|
||||
use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
|
||||
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
|
||||
@@ -133,11 +145,13 @@ mod tests {
|
||||
use crate::test_util::sst_util::{
|
||||
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
|
||||
new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
|
||||
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
|
||||
new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
|
||||
sst_region_metadata_with_encoding,
|
||||
};
|
||||
use crate::test_util::{TestEnv, check_reader_result};
|
||||
|
||||
const FILE_DIR: &str = "/";
|
||||
const REGION_ID: RegionId = RegionId::new(0, 0);
|
||||
|
||||
#[derive(Clone)]
|
||||
struct FixedPathProvider {
|
||||
@@ -1064,6 +1078,154 @@ mod tests {
|
||||
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
|
||||
}
|
||||
|
||||
/// Creates a flat format RecordBatch for testing with sparse primary key encoding.
|
||||
/// Similar to `new_record_batch_by_range` but without individual primary key columns.
|
||||
fn new_record_batch_by_range_sparse(
|
||||
tags: &[&str],
|
||||
start: usize,
|
||||
end: usize,
|
||||
metadata: &Arc<RegionMetadata>,
|
||||
) -> RecordBatch {
|
||||
assert!(end >= start);
|
||||
let flat_schema = to_flat_sst_arrow_schema(
|
||||
metadata,
|
||||
&FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
|
||||
);
|
||||
|
||||
let num_rows = end - start;
|
||||
let mut columns: Vec<ArrayRef> = Vec::new();
|
||||
|
||||
// NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
|
||||
|
||||
// Add field column (field_0)
|
||||
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
|
||||
columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
|
||||
|
||||
// Add time index column (ts)
|
||||
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
|
||||
|
||||
// Add encoded primary key column using sparse encoding
|
||||
let table_id = 1u32; // Test table ID
|
||||
let tsid = 100u64; // Base TSID
|
||||
let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
|
||||
|
||||
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
|
||||
for _ in 0..num_rows {
|
||||
pk_builder.append(&pk).unwrap();
|
||||
}
|
||||
columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
|
||||
|
||||
// Add sequence column
|
||||
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
|
||||
|
||||
// Add op_type column
|
||||
columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
|
||||
|
||||
RecordBatch::try_new(flat_schema, columns).unwrap()
|
||||
}
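The layout note in the helper above is easy to check: the sparse batch carries exactly five arrays and no per-tag columns. A hedged sketch of such a check (not part of the patch; the test name is made up):

// Illustrative only: shape check for the sparse flat batch built above.
#[test]
fn sparse_flat_batch_has_no_tag_columns() {
    let metadata = Arc::new(sst_region_metadata_with_encoding(PrimaryKeyEncoding::Sparse));
    let batch = new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata);
    // Columns: field_0, ts, encoded primary key, sequence, op_type.
    assert_eq!(batch.num_columns(), 5);
    assert_eq!(batch.num_rows(), 50);
}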
/// Helper function to create IndexerBuilderImpl for tests.
|
||||
fn create_test_indexer_builder(
|
||||
env: &TestEnv,
|
||||
object_store: ObjectStore,
|
||||
file_path: RegionFilePathFactory,
|
||||
metadata: Arc<RegionMetadata>,
|
||||
row_group_size: usize,
|
||||
) -> IndexerBuilderImpl {
|
||||
let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
|
||||
let intermediate_manager = env.get_intermediate_manager();
|
||||
|
||||
IndexerBuilderImpl {
|
||||
build_type: IndexBuildType::Flush,
|
||||
metadata,
|
||||
row_group_size,
|
||||
puffin_manager,
|
||||
write_cache_enabled: false,
|
||||
intermediate_manager,
|
||||
index_options: IndexOptions {
|
||||
inverted_index: InvertedIndexOptions {
|
||||
segment_row_count: 1,
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
inverted_index_config: Default::default(),
|
||||
fulltext_index_config: Default::default(),
|
||||
bloom_filter_index_config: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to write flat SST and return SstInfo.
|
||||
async fn write_flat_sst(
|
||||
object_store: ObjectStore,
|
||||
metadata: Arc<RegionMetadata>,
|
||||
indexer_builder: IndexerBuilderImpl,
|
||||
file_path: RegionFilePathFactory,
|
||||
flat_source: FlatSource,
|
||||
write_opts: &WriteOptions,
|
||||
) -> SstInfo {
|
||||
let mut metrics = Metrics::new(WriteType::Flush);
|
||||
let mut writer = ParquetWriter::new_with_object_store(
|
||||
object_store,
|
||||
metadata,
|
||||
IndexConfig::default(),
|
||||
indexer_builder,
|
||||
file_path,
|
||||
&mut metrics,
|
||||
)
|
||||
.await;
|
||||
|
||||
writer
|
||||
.write_all_flat(flat_source, write_opts)
|
||||
.await
|
||||
.unwrap()
|
||||
.remove(0)
|
||||
}
|
||||
|
||||
/// Helper function to create FileHandle from SstInfo.
|
||||
fn create_file_handle_from_sst_info(
|
||||
info: &SstInfo,
|
||||
metadata: &Arc<RegionMetadata>,
|
||||
) -> FileHandle {
|
||||
FileHandle::new(
|
||||
FileMeta {
|
||||
region_id: metadata.region_id,
|
||||
file_id: info.file_id,
|
||||
time_range: info.time_range,
|
||||
level: 0,
|
||||
file_size: info.file_size,
|
||||
max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
|
||||
available_indexes: info.index_metadata.build_available_indexes(),
|
||||
indexes: info.index_metadata.build_indexes(),
|
||||
index_file_size: info.index_metadata.file_size,
|
||||
index_version: 0,
|
||||
num_row_groups: info.num_row_groups,
|
||||
num_rows: info.num_rows as u64,
|
||||
sequence: None,
|
||||
partition_expr: match &metadata.partition_expr {
|
||||
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
|
||||
.expect("partition expression should be valid JSON"),
|
||||
None => None,
|
||||
},
|
||||
num_series: 0,
|
||||
},
|
||||
Arc::new(NoopFilePurger),
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper function to create test cache with standard settings.
|
||||
fn create_test_cache() -> Arc<CacheManager> {
|
||||
Arc::new(
|
||||
CacheManager::builder()
|
||||
.index_result_cache_size(1024 * 1024)
|
||||
.index_metadata_size(1024 * 1024)
|
||||
.index_content_page_size(1024 * 1024)
|
||||
.index_content_size(1024 * 1024)
|
||||
.puffin_metadata_size(1024 * 1024)
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_with_index() {
|
||||
let mut env = TestEnv::new().await;
|
||||
@@ -1238,4 +1400,709 @@ mod tests {
|
||||
assert_eq!(*override_batch, expected_batch);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_read_with_inverted_index() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let object_store = env.init_object_store_manager();
|
||||
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let row_group_size = 100;
|
||||
|
||||
// Create flat format RecordBatches with non-overlapping timestamp ranges
|
||||
// Each batch becomes one row group (row_group_size = 100)
|
||||
// Data: ts tag_0 tag_1
|
||||
// RG 0: 0-50 [a, d]
|
||||
// RG 0: 50-100 [b, d]
|
||||
// RG 1: 100-150 [c, d]
|
||||
// RG 1: 150-200 [c, f]
|
||||
let flat_batches = vec![
|
||||
new_record_batch_by_range(&["a", "d"], 0, 50),
|
||||
new_record_batch_by_range(&["b", "d"], 50, 100),
|
||||
new_record_batch_by_range(&["c", "d"], 100, 150),
|
||||
new_record_batch_by_range(&["c", "f"], 150, 200),
|
||||
];
|
||||
|
||||
let flat_source = new_flat_source_from_record_batches(flat_batches);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let indexer_builder = create_test_indexer_builder(
|
||||
&env,
|
||||
object_store.clone(),
|
||||
file_path.clone(),
|
||||
metadata.clone(),
|
||||
row_group_size,
|
||||
);
|
||||
|
||||
let info = write_flat_sst(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
indexer_builder,
|
||||
file_path.clone(),
|
||||
flat_source,
|
||||
&write_opts,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(200, info.num_rows);
|
||||
assert!(info.file_size > 0);
|
||||
assert!(info.index_metadata.file_size > 0);
|
||||
|
||||
let handle = create_file_handle_from_sst_info(&info, &metadata);
|
||||
|
||||
let cache = create_test_cache();
|
||||
|
||||
// Test 1: Filter by tag_0 = "b"
|
||||
// Expected: Only rows with tag_0="b"
|
||||
let preds = vec![col("tag_0").eq(lit("b"))];
|
||||
let inverted_index_applier = InvertedIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
&metadata,
|
||||
HashSet::from_iter([0]),
|
||||
env.get_puffin_manager(),
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.inverted_index_appliers([inverted_index_applier.clone(), None])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// Verify selection contains only RG 0 (tag_0="b", ts 0-100)
|
||||
assert_eq!(selection.row_group_count(), 1);
|
||||
assert_eq!(50, selection.get(0).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 2);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
|
||||
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_read_with_bloom_filter() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let object_store = env.init_object_store_manager();
|
||||
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
|
||||
let metadata = Arc::new(sst_region_metadata());
|
||||
let row_group_size = 100;
|
||||
|
||||
// Create flat format RecordBatches with non-overlapping timestamp ranges
|
||||
// Each batch becomes one row group (row_group_size = 100)
|
||||
// Data: ts tag_0 tag_1
|
||||
// RG 0: 0-50 [a, d]
|
||||
// RG 0: 50-100 [b, e]
|
||||
// RG 1: 100-150 [c, d]
|
||||
// RG 1: 150-200 [c, f]
|
||||
let flat_batches = vec![
|
||||
new_record_batch_by_range(&["a", "d"], 0, 50),
|
||||
new_record_batch_by_range(&["b", "e"], 50, 100),
|
||||
new_record_batch_by_range(&["c", "d"], 100, 150),
|
||||
new_record_batch_by_range(&["c", "f"], 150, 200),
|
||||
];
|
||||
|
||||
let flat_source = new_flat_source_from_record_batches(flat_batches);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let indexer_builder = create_test_indexer_builder(
|
||||
&env,
|
||||
object_store.clone(),
|
||||
file_path.clone(),
|
||||
metadata.clone(),
|
||||
row_group_size,
|
||||
);
|
||||
|
||||
let info = write_flat_sst(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
indexer_builder,
|
||||
file_path.clone(),
|
||||
flat_source,
|
||||
&write_opts,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(200, info.num_rows);
|
||||
assert!(info.file_size > 0);
|
||||
assert!(info.index_metadata.file_size > 0);
|
||||
|
||||
let handle = create_file_handle_from_sst_info(&info, &metadata);
|
||||
|
||||
let cache = create_test_cache();
|
||||
|
||||
// Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
|
||||
// Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
|
||||
let preds = vec![
|
||||
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
|
||||
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
|
||||
col("tag_1").eq(lit("d")),
|
||||
];
|
||||
let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
&metadata,
|
||||
env.get_puffin_manager(),
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// Verify selection contains RG 0 and RG 1
|
||||
assert_eq!(selection.row_group_count(), 2);
|
||||
assert_eq!(50, selection.get(0).unwrap().row_count());
|
||||
assert_eq!(50, selection.get(1).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 2);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_read_with_inverted_index_sparse() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let object_store = env.init_object_store_manager();
|
||||
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
|
||||
let metadata = Arc::new(sst_region_metadata_with_encoding(
|
||||
PrimaryKeyEncoding::Sparse,
|
||||
));
|
||||
let row_group_size = 100;
|
||||
|
||||
// Create flat format RecordBatches with non-overlapping timestamp ranges
|
||||
// Each batch becomes one row group (row_group_size = 100)
|
||||
// Data: ts tag_0 tag_1
|
||||
// RG 0: 0-50 [a, d]
|
||||
// RG 0: 50-100 [b, d]
|
||||
// RG 1: 100-150 [c, d]
|
||||
// RG 1: 150-200 [c, f]
|
||||
let flat_batches = vec![
|
||||
new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
|
||||
new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
|
||||
new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
|
||||
new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
|
||||
];
|
||||
|
||||
let flat_source = new_flat_source_from_record_batches(flat_batches);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let indexer_builder = create_test_indexer_builder(
|
||||
&env,
|
||||
object_store.clone(),
|
||||
file_path.clone(),
|
||||
metadata.clone(),
|
||||
row_group_size,
|
||||
);
|
||||
|
||||
let info = write_flat_sst(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
indexer_builder,
|
||||
file_path.clone(),
|
||||
flat_source,
|
||||
&write_opts,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(200, info.num_rows);
|
||||
assert!(info.file_size > 0);
|
||||
assert!(info.index_metadata.file_size > 0);
|
||||
|
||||
let handle = create_file_handle_from_sst_info(&info, &metadata);
|
||||
|
||||
let cache = create_test_cache();
|
||||
|
||||
// Test 1: Filter by tag_0 = "b"
|
||||
// Expected: Only rows with tag_0="b"
|
||||
let preds = vec![col("tag_0").eq(lit("b"))];
|
||||
let inverted_index_applier = InvertedIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
&metadata,
|
||||
HashSet::from_iter([0]),
|
||||
env.get_puffin_manager(),
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.inverted_index_appliers([inverted_index_applier.clone(), None])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// RG 0 has 50 matching rows (tag_0="b")
|
||||
assert_eq!(selection.row_group_count(), 1);
|
||||
assert_eq!(50, selection.get(0).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
// Note: With sparse encoding, tag columns aren't stored separately,
|
||||
// so minmax filtering on tags doesn't work (only inverted index)
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 2);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
|
||||
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
|
||||
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_read_with_bloom_filter_sparse() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let object_store = env.init_object_store_manager();
|
||||
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
|
||||
let metadata = Arc::new(sst_region_metadata_with_encoding(
|
||||
PrimaryKeyEncoding::Sparse,
|
||||
));
|
||||
let row_group_size = 100;
|
||||
|
||||
// Create flat format RecordBatches with non-overlapping timestamp ranges
|
||||
// Each batch becomes one row group (row_group_size = 100)
|
||||
// Data: ts tag_0 tag_1
|
||||
// RG 0: 0-50 [a, d]
|
||||
// RG 0: 50-100 [b, e]
|
||||
// RG 1: 100-150 [c, d]
|
||||
// RG 1: 150-200 [c, f]
|
||||
let flat_batches = vec![
|
||||
new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
|
||||
new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
|
||||
new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
|
||||
new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
|
||||
];
|
||||
|
||||
let flat_source = new_flat_source_from_record_batches(flat_batches);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let indexer_builder = create_test_indexer_builder(
|
||||
&env,
|
||||
object_store.clone(),
|
||||
file_path.clone(),
|
||||
metadata.clone(),
|
||||
row_group_size,
|
||||
);
|
||||
|
||||
let info = write_flat_sst(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
indexer_builder,
|
||||
file_path.clone(),
|
||||
flat_source,
|
||||
&write_opts,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(200, info.num_rows);
|
||||
assert!(info.file_size > 0);
|
||||
assert!(info.index_metadata.file_size > 0);
|
||||
|
||||
let handle = create_file_handle_from_sst_info(&info, &metadata);
|
||||
|
||||
let cache = create_test_cache();
|
||||
|
||||
// Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
|
||||
// Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
|
||||
let preds = vec![
|
||||
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
|
||||
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
|
||||
col("tag_1").eq(lit("d")),
|
||||
];
|
||||
let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
&metadata,
|
||||
env.get_puffin_manager(),
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// Verify selection contains RG 0 and RG 1
|
||||
assert_eq!(selection.row_group_count(), 2);
|
||||
assert_eq!(50, selection.get(0).unwrap().row_count());
|
||||
assert_eq!(50, selection.get(1).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 2);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
|
||||
}
|
||||
|
||||
/// Creates region metadata for testing fulltext indexes.
|
||||
/// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
|
||||
fn fulltext_region_metadata() -> RegionMetadata {
|
||||
let mut builder = RegionMetadataBuilder::new(REGION_ID);
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"tag_0".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: 0,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"text_bloom".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_fulltext_options(FulltextOptions {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Bloom,
|
||||
granularity: 1,
|
||||
false_positive_rate_in_10000: 50,
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 1,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"text_tantivy".to_string(),
|
||||
ConcreteDataType::string_datatype(),
|
||||
true,
|
||||
)
|
||||
.with_fulltext_options(FulltextOptions {
|
||||
enable: true,
|
||||
analyzer: FulltextAnalyzer::English,
|
||||
case_sensitive: false,
|
||||
backend: FulltextBackend::Tantivy,
|
||||
granularity: 1,
|
||||
false_positive_rate_in_10000: 50,
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 2,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"field_0".to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 3,
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"ts".to_string(),
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 4,
|
||||
})
|
||||
.primary_key(vec![0]);
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
/// Creates a flat format RecordBatch with string fields for fulltext testing.
|
||||
fn new_fulltext_record_batch_by_range(
|
||||
tag: &str,
|
||||
text_bloom: &str,
|
||||
text_tantivy: &str,
|
||||
start: usize,
|
||||
end: usize,
|
||||
) -> RecordBatch {
|
||||
assert!(end >= start);
|
||||
let metadata = Arc::new(fulltext_region_metadata());
|
||||
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
|
||||
|
||||
let num_rows = end - start;
|
||||
let mut columns = Vec::new();
|
||||
|
||||
// Add primary key column (tag_0) as dictionary array
|
||||
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
|
||||
for _ in 0..num_rows {
|
||||
tag_builder.append_value(tag);
|
||||
}
|
||||
columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
|
||||
|
||||
// Add text_bloom field (fulltext with bloom backend)
|
||||
let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
|
||||
columns.push(Arc::new(StringArray::from(text_bloom_values)));
|
||||
|
||||
// Add text_tantivy field (fulltext with tantivy backend)
|
||||
let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
|
||||
columns.push(Arc::new(StringArray::from(text_tantivy_values)));
|
||||
|
||||
// Add field column (field_0)
|
||||
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
|
||||
columns.push(Arc::new(UInt64Array::from(field_values)));
|
||||
|
||||
// Add time index column (ts)
|
||||
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
|
||||
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
|
||||
|
||||
// Add encoded primary key column
|
||||
let pk = new_primary_key(&[tag]);
|
||||
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
|
||||
for _ in 0..num_rows {
|
||||
pk_builder.append(&pk).unwrap();
|
||||
}
|
||||
columns.push(Arc::new(pk_builder.finish()));
|
||||
|
||||
// Add sequence column
|
||||
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
|
||||
|
||||
// Add op_type column
|
||||
columns.push(Arc::new(UInt8Array::from_value(
|
||||
OpType::Put as u8,
|
||||
num_rows,
|
||||
)));
|
||||
|
||||
RecordBatch::try_new(flat_schema, columns).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_write_flat_read_with_fulltext_index() {
|
||||
let mut env = TestEnv::new().await;
|
||||
let object_store = env.init_object_store_manager();
|
||||
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
|
||||
let metadata = Arc::new(fulltext_region_metadata());
|
||||
let row_group_size = 50;
|
||||
|
||||
// Create flat format RecordBatches with different text content
|
||||
// RG 0: 0-50 tag="a", bloom="hello world", tantivy="quick brown fox"
|
||||
// RG 1: 50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
|
||||
// RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
|
||||
// RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
|
||||
let flat_batches = vec![
|
||||
new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
|
||||
new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
|
||||
new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
|
||||
new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
|
||||
];
|
||||
|
||||
let flat_source = new_flat_source_from_record_batches(flat_batches);
|
||||
|
||||
let write_opts = WriteOptions {
|
||||
row_group_size,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let indexer_builder = create_test_indexer_builder(
|
||||
&env,
|
||||
object_store.clone(),
|
||||
file_path.clone(),
|
||||
metadata.clone(),
|
||||
row_group_size,
|
||||
);
|
||||
|
||||
let mut info = write_flat_sst(
|
||||
object_store.clone(),
|
||||
metadata.clone(),
|
||||
indexer_builder,
|
||||
file_path.clone(),
|
||||
flat_source,
|
||||
&write_opts,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(200, info.num_rows);
|
||||
assert!(info.file_size > 0);
|
||||
assert!(info.index_metadata.file_size > 0);
|
||||
|
||||
// Verify fulltext indexes were created
|
||||
assert!(info.index_metadata.fulltext_index.index_size > 0);
|
||||
assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
|
||||
// text_bloom (column_id 1) and text_tantivy (column_id 2)
|
||||
info.index_metadata.fulltext_index.columns.sort_unstable();
|
||||
assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
|
||||
|
||||
assert_eq!(
|
||||
(
|
||||
Timestamp::new_millisecond(0),
|
||||
Timestamp::new_millisecond(199)
|
||||
),
|
||||
info.time_range
|
||||
);
|
||||
|
||||
let handle = create_file_handle_from_sst_info(&info, &metadata);
|
||||
|
||||
let cache = create_test_cache();
|
||||
|
||||
// Helper functions to create fulltext function expressions
|
||||
let matches_func = || {
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
};
|
||||
|
||||
let matches_term_func = || {
|
||||
Arc::new(
|
||||
ScalarFunctionFactory::from(
|
||||
Arc::new(MatchesTermFunction::default()) as FunctionRef,
|
||||
)
|
||||
.provide(Default::default()),
|
||||
)
|
||||
};
|
||||
|
||||
// Test 1: Filter by text_bloom field using matches_term (bloom backend)
|
||||
// Expected: RG 0 and RG 1 (rows 0-100) which have "hello" term
|
||||
let preds = vec![Expr::ScalarFunction(ScalarFunction {
|
||||
args: vec![col("text_bloom"), "hello".lit()],
|
||||
func: matches_term_func(),
|
||||
})];
|
||||
|
||||
let fulltext_applier = FulltextIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
env.get_puffin_manager(),
|
||||
&metadata,
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.fulltext_index_appliers([None, fulltext_applier.clone()])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
|
||||
assert_eq!(selection.row_group_count(), 2);
|
||||
assert_eq!(50, selection.get(0).unwrap().row_count());
|
||||
assert_eq!(50, selection.get(1).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 4);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
|
||||
assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
|
||||
|
||||
// Test 2: Filter by text_tantivy field using matches (tantivy backend)
|
||||
// Expected: RG 2 and RG 3 (rows 100-200) which have "lazy" in query
|
||||
let preds = vec![Expr::ScalarFunction(ScalarFunction {
|
||||
args: vec![col("text_tantivy"), "lazy".lit()],
|
||||
func: matches_func(),
|
||||
})];
|
||||
|
||||
let fulltext_applier = FulltextIndexApplierBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
object_store.clone(),
|
||||
env.get_puffin_manager(),
|
||||
&metadata,
|
||||
)
|
||||
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
|
||||
.with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
|
||||
.build(&preds)
|
||||
.unwrap()
|
||||
.map(Arc::new);
|
||||
|
||||
let builder = ParquetReaderBuilder::new(
|
||||
FILE_DIR.to_string(),
|
||||
PathType::Bare,
|
||||
handle.clone(),
|
||||
object_store.clone(),
|
||||
)
|
||||
.flat_format(true)
|
||||
.predicate(Some(Predicate::new(preds)))
|
||||
.fulltext_index_appliers([None, fulltext_applier.clone()])
|
||||
.cache(CacheStrategy::EnableAll(cache.clone()));
|
||||
|
||||
let mut metrics = ReaderMetrics::default();
|
||||
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
|
||||
|
||||
// Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
|
||||
assert_eq!(selection.row_group_count(), 2);
|
||||
assert_eq!(50, selection.get(2).unwrap().row_count());
|
||||
assert_eq!(50, selection.get(3).unwrap().row_count());
|
||||
|
||||
// Verify filtering metrics
|
||||
assert_eq!(metrics.filter_metrics.rg_total, 4);
|
||||
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
|
||||
assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
|
||||
assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +29,7 @@ use tokio::sync::mpsc::Sender;
 use crate::access_layer::{AccessLayer, AccessLayerRef};
 use crate::cache::CacheManager;
 use crate::compaction::CompactionScheduler;
-use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
+use crate::compaction::memory_manager::new_compaction_memory_manager;
 use crate::config::MitoConfig;
 use crate::error::Result;
 use crate::flush::FlushScheduler;
@@ -27,6 +27,10 @@ use parquet::file::metadata::ParquetMetaData;
 use store_api::metadata::{
     ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
 };
+use store_api::metric_engine_consts::{
+    DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
+};
+use store_api::storage::consts::ReservedColumnId;
 use store_api::storage::{FileId, RegionId};
 
 use crate::read::{Batch, BatchBuilder, Source};
@@ -36,11 +40,44 @@ use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger};
|
||||
/// Test region id.
|
||||
const REGION_ID: RegionId = RegionId::new(0, 0);
|
||||
|
||||
/// Creates a new region metadata for testing SSTs.
|
||||
/// Creates a new region metadata for testing SSTs with specified encoding.
|
||||
///
|
||||
/// Schema: tag_0, tag_1, field_0, ts
|
||||
pub fn sst_region_metadata() -> RegionMetadata {
|
||||
/// Dense schema: tag_0, tag_1, field_0, ts
|
||||
/// Sparse schema: __table_id, __tsid, tag_0, tag_1, field_0, ts
|
||||
pub fn sst_region_metadata_with_encoding(
|
||||
encoding: store_api::codec::PrimaryKeyEncoding,
|
||||
) -> RegionMetadata {
|
||||
let mut builder = RegionMetadataBuilder::new(REGION_ID);
|
||||
|
||||
// For sparse encoding, add internal columns first
|
||||
if encoding == store_api::codec::PrimaryKeyEncoding::Sparse {
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint32_datatype(),
|
||||
false,
|
||||
)
|
||||
.with_skipping_options(SkippingIndexOptions {
|
||||
granularity: 1,
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap(),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: ReservedColumnId::table_id(),
|
||||
})
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
|
||||
ConcreteDataType::uint64_datatype(),
|
||||
false,
|
||||
),
|
||||
semantic_type: SemanticType::Tag,
|
||||
column_id: ReservedColumnId::tsid(),
|
||||
});
|
||||
}
|
||||
|
||||
// Add user-defined columns (tag_0, tag_1, field_0, ts)
|
||||
builder
|
||||
.push_column_metadata(ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
@@ -83,12 +120,32 @@ pub fn sst_region_metadata() -> RegionMetadata {
|
||||
),
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
column_id: 3,
|
||||
})
|
||||
.primary_key(vec![0, 1]);
|
||||
});
|
||||
|
||||
// Set primary key based on encoding
|
||||
if encoding == store_api::codec::PrimaryKeyEncoding::Sparse {
|
||||
builder.primary_key(vec![
|
||||
ReservedColumnId::table_id(),
|
||||
ReservedColumnId::tsid(),
|
||||
0, // tag_0
|
||||
1, // tag_1
|
||||
]);
|
||||
} else {
|
||||
builder.primary_key(vec![0, 1]); // Dense: just user tags
|
||||
}
|
||||
|
||||
builder.primary_key_encoding(encoding);
|
||||
builder.build().unwrap()
|
||||
}
|
||||
|
||||
/// Encodes a primary key for specific tags.
|
||||
/// Creates a new region metadata for testing SSTs.
|
||||
///
|
||||
/// Schema: tag_0, tag_1, field_0, ts
|
||||
pub fn sst_region_metadata() -> RegionMetadata {
|
||||
sst_region_metadata_with_encoding(store_api::codec::PrimaryKeyEncoding::Dense)
|
||||
}
|
||||
|
||||
/// Encodes a primary key for specific tags using dense encoding.
|
||||
pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
|
||||
let fields = (0..tags.len())
|
||||
.map(|idx| {
|
||||
@@ -104,6 +161,31 @@ pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Encodes a primary key for specific tags using sparse encoding.
|
||||
/// Includes internal columns (table_id, tsid) required by sparse format.
|
||||
pub fn new_sparse_primary_key(
|
||||
tags: &[&str],
|
||||
metadata: &Arc<RegionMetadata>,
|
||||
table_id: u32,
|
||||
tsid: u64,
|
||||
) -> Vec<u8> {
|
||||
use mito_codec::row_converter::PrimaryKeyCodec;
|
||||
|
||||
let codec = mito_codec::row_converter::SparsePrimaryKeyCodec::new(metadata);
|
||||
|
||||
// Sparse encoding requires internal columns first, then user tags
|
||||
let values = vec![
|
||||
(ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
|
||||
(ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
|
||||
(0, ValueRef::String(tags[0])), // tag_0
|
||||
(1, ValueRef::String(tags[1])), // tag_1
|
||||
];
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
codec.encode_value_refs(&values, &mut buffer).unwrap();
|
||||
buffer
|
||||
}
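For reference, a sketch of how the parquet writer tests in this patch call the helper; the function name and the table id/tsid constants here are made up for illustration:

// Illustrative only; mirrors the call sites in the flat-format writer tests.
fn example_sparse_key() -> Vec<u8> {
    let metadata = Arc::new(sst_region_metadata_with_encoding(
        store_api::codec::PrimaryKeyEncoding::Sparse,
    ));
    new_sparse_primary_key(&["a", "d"], &metadata, 1, 100)
}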
/// Creates a [Source] from `batches`.
|
||||
pub fn new_source(batches: &[Batch]) -> Source {
|
||||
let reader = VecBatchReader::new(batches);