From fed6cb08066d1dc365575b2710e3691b029f60de Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Fri, 19 Dec 2025 15:36:44 +0800
Subject: [PATCH] fix: flat format use correct encoding in indexer for tags (#7440)

* test: add inverted and skipping test

Signed-off-by: evenyag

* test: Add tests for fulltext index

Signed-off-by: evenyag

* fix: index dictionary type in correct encoding in flat format

Signed-off-by: evenyag

* refactor: use encode_data_type() in SortField

Signed-off-by: evenyag

* refactor: refine imports

Signed-off-by: evenyag

* test: add tests for sparse encoding

Signed-off-by: evenyag

* chore: remove logs

Signed-off-by: evenyag

* test: update list test

Signed-off-by: evenyag

* test: simplify tests

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/mito-codec/src/index.rs               |   2 +-
 src/mito-codec/src/row_converter/dense.rs |  36 +-
 src/mito2/src/engine/basic_test.rs        |   6 +-
 src/mito2/src/sst/parquet.rs              | 873 +++++++++++++++++++++-
 src/mito2/src/test_util/scheduler_util.rs |   2 +-
 src/mito2/src/test_util/sst_util.rs       |  94 ++-
 6 files changed, 976 insertions(+), 37 deletions(-)

diff --git a/src/mito-codec/src/index.rs b/src/mito-codec/src/index.rs
index d98a6d3a51..539a3adedd 100644
--- a/src/mito-codec/src/index.rs
+++ b/src/mito-codec/src/index.rs
@@ -48,7 +48,7 @@ impl IndexValueCodec {
     ) -> Result<()> {
         ensure!(!value.is_null(), IndexEncodeNullSnafu);
 
-        if field.data_type().is_string() {
+        if field.encode_data_type().is_string() {
             let value = value
                 .try_into_string()
                 .context(FieldTypeMismatchSnafu)?
diff --git a/src/mito-codec/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs
index 76c2d65d5a..961e36115a 100644
--- a/src/mito-codec/src/row_converter/dense.rs
+++ b/src/mito-codec/src/row_converter/dense.rs
@@ -57,15 +57,20 @@ impl SortField {
         &self.data_type
     }
 
-    pub fn estimated_size(&self) -> usize {
+    /// Returns the physical data type to encode of the field.
+    ///
+    /// For example, a dictionary field will be encoded as its value type.
+    pub fn encode_data_type(&self) -> &ConcreteDataType {
         match &self.data_type {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::estimated_size_by_type(dict_type.value_type())
-            }
-            data_type => Self::estimated_size_by_type(data_type),
+            ConcreteDataType::Dictionary(dict_type) => dict_type.value_type(),
+            _ => &self.data_type,
         }
     }
 
+    pub fn estimated_size(&self) -> usize {
+        Self::estimated_size_by_type(self.encode_data_type())
+    }
+
     fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
         match data_type {
             ConcreteDataType::Boolean(_) => 2,
@@ -98,12 +103,7 @@ impl SortField {
         serializer: &mut Serializer<&mut Vec<u8>>,
         value: &ValueRef,
     ) -> Result<()> {
-        match self.data_type() {
-            ConcreteDataType::Dictionary(dict_type) => {
-                Self::serialize_by_type(dict_type.value_type(), serializer, value)
-            }
-            data_type => Self::serialize_by_type(data_type, serializer, value),
-        }
+        Self::serialize_by_type(self.encode_data_type(), serializer, value)
     }
 
     fn serialize_by_type(
@@ -194,12 +194,7 @@ impl SortField {
     /// Deserialize a value from the deserializer.
pub fn deserialize(&self, deserializer: &mut Deserializer) -> Result { - match &self.data_type { - ConcreteDataType::Dictionary(dict_type) => { - Self::deserialize_by_type(dict_type.value_type(), deserializer) - } - data_type => Self::deserialize_by_type(data_type, deserializer), - } + Self::deserialize_by_type(self.encode_data_type(), deserializer) } fn deserialize_by_type( @@ -301,12 +296,7 @@ impl SortField { return Ok(1); } - match &self.data_type { - ConcreteDataType::Dictionary(dict_type) => { - Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer) - } - data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer), - } + Self::skip_deserialize_by_type(self.encode_data_type(), bytes, deserializer) } fn skip_deserialize_by_type( diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 88303d3f70..75da86ff80 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -872,9 +872,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/.puffin", file_s StorageSstEntry { file_path: "test/22_0000000042/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/22_0000000042/index/.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await; test_list_ssts_with_format(true, r#" -ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } -ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, +ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000001/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, 
region_group: 0, region_sequence: 2, file_id: "", index_version: 0, level: 0, file_path: "test/11_0000000002/.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true } +ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "", index_version: 0, level: 0, file_path: "test/22_0000000042/.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#, r#" StorageSstEntry { file_path: "test/11_0000000001/.parquet", file_size: None, last_modified_ms: None, node_id: None } StorageSstEntry { file_path: "test/11_0000000001/index/.puffin", file_size: None, last_modified_ms: None, node_id: None } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 21187bedd3..41e3113d55 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -95,21 +95,32 @@ mod tests { use std::collections::HashSet; use std::sync::Arc; - use api::v1::OpType; + use api::v1::{OpType, SemanticType}; + use common_function::function::FunctionRef; + use common_function::function_factory::ScalarFunctionFactory; + use common_function::scalars::matches::MatchesFunction; + use common_function::scalars::matches_term::MatchesTermFunction; use common_time::Timestamp; use datafusion_common::{Column, ScalarValue}; + use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit}; use datatypes::arrow; use datatypes::arrow::array::{ - ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder, + ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder, TimestampMillisecondArray, UInt8Array, UInt64Array, }; use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type}; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions}; + use object_store::ObjectStore; use parquet::arrow::AsyncArrowWriter; use parquet::basic::{Compression, Encoding, ZstdLevel}; use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; + use store_api::codec::PrimaryKeyEncoding; + use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; use store_api::region_request::PathType; + use store_api::storage::{ColumnSchema, RegionId}; use table::predicate::Predicate; use tokio_util::compat::FuturesAsyncWriteCompatExt; @@ -122,6 +133,7 @@ mod tests { use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId}; use crate::sst::file_purger::NoopFilePurger; use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder; + use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl}; use crate::sst::parquet::format::PrimaryKeyWriteFormat; @@ -133,11 +145,13 @@ mod tests { use crate::test_util::sst_util::{ 
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range, new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source, - sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata, + new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata, + sst_region_metadata_with_encoding, }; use crate::test_util::{TestEnv, check_reader_result}; const FILE_DIR: &str = "/"; + const REGION_ID: RegionId = RegionId::new(0, 0); #[derive(Clone)] struct FixedPathProvider { @@ -1064,6 +1078,154 @@ mod tests { FlatSource::Iter(Box::new(batches.into_iter().map(Ok))) } + /// Creates a flat format RecordBatch for testing with sparse primary key encoding. + /// Similar to `new_record_batch_by_range` but without individual primary key columns. + fn new_record_batch_by_range_sparse( + tags: &[&str], + start: usize, + end: usize, + metadata: &Arc, + ) -> RecordBatch { + assert!(end >= start); + let flat_schema = to_flat_sst_arrow_schema( + metadata, + &FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse), + ); + + let num_rows = end - start; + let mut columns: Vec = Vec::new(); + + // NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format + + // Add field column (field_0) + let field_values: Vec = (start..end).map(|v| v as u64).collect(); + columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef); + + // Add time index column (ts) + let timestamps: Vec = (start..end).map(|v| v as i64).collect(); + columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef); + + // Add encoded primary key column using sparse encoding + let table_id = 1u32; // Test table ID + let tsid = 100u64; // Base TSID + let pk = new_sparse_primary_key(tags, metadata, table_id, tsid); + + let mut pk_builder = BinaryDictionaryBuilder::::new(); + for _ in 0..num_rows { + pk_builder.append(&pk).unwrap(); + } + columns.push(Arc::new(pk_builder.finish()) as ArrayRef); + + // Add sequence column + columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef); + + // Add op_type column + columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef); + + RecordBatch::try_new(flat_schema, columns).unwrap() + } + + /// Helper function to create IndexerBuilderImpl for tests. + fn create_test_indexer_builder( + env: &TestEnv, + object_store: ObjectStore, + file_path: RegionFilePathFactory, + metadata: Arc, + row_group_size: usize, + ) -> IndexerBuilderImpl { + let puffin_manager = env.get_puffin_manager().build(object_store, file_path); + let intermediate_manager = env.get_intermediate_manager(); + + IndexerBuilderImpl { + build_type: IndexBuildType::Flush, + metadata, + row_group_size, + puffin_manager, + write_cache_enabled: false, + intermediate_manager, + index_options: IndexOptions { + inverted_index: InvertedIndexOptions { + segment_row_count: 1, + ..Default::default() + }, + }, + inverted_index_config: Default::default(), + fulltext_index_config: Default::default(), + bloom_filter_index_config: Default::default(), + } + } + + /// Helper function to write flat SST and return SstInfo. 
+ async fn write_flat_sst( + object_store: ObjectStore, + metadata: Arc, + indexer_builder: IndexerBuilderImpl, + file_path: RegionFilePathFactory, + flat_source: FlatSource, + write_opts: &WriteOptions, + ) -> SstInfo { + let mut metrics = Metrics::new(WriteType::Flush); + let mut writer = ParquetWriter::new_with_object_store( + object_store, + metadata, + IndexConfig::default(), + indexer_builder, + file_path, + &mut metrics, + ) + .await; + + writer + .write_all_flat(flat_source, write_opts) + .await + .unwrap() + .remove(0) + } + + /// Helper function to create FileHandle from SstInfo. + fn create_file_handle_from_sst_info( + info: &SstInfo, + metadata: &Arc, + ) -> FileHandle { + FileHandle::new( + FileMeta { + region_id: metadata.region_id, + file_id: info.file_id, + time_range: info.time_range, + level: 0, + file_size: info.file_size, + max_row_group_uncompressed_size: info.max_row_group_uncompressed_size, + available_indexes: info.index_metadata.build_available_indexes(), + indexes: info.index_metadata.build_indexes(), + index_file_size: info.index_metadata.file_size, + index_version: 0, + num_row_groups: info.num_row_groups, + num_rows: info.num_rows as u64, + sequence: None, + partition_expr: match &metadata.partition_expr { + Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str) + .expect("partition expression should be valid JSON"), + None => None, + }, + num_series: 0, + }, + Arc::new(NoopFilePurger), + ) + } + + /// Helper function to create test cache with standard settings. + fn create_test_cache() -> Arc { + Arc::new( + CacheManager::builder() + .index_result_cache_size(1024 * 1024) + .index_metadata_size(1024 * 1024) + .index_content_page_size(1024 * 1024) + .index_content_size(1024 * 1024) + .puffin_metadata_size(1024 * 1024) + .build(), + ) + } + #[tokio::test] async fn test_write_flat_with_index() { let mut env = TestEnv::new().await; @@ -1238,4 +1400,709 @@ mod tests { assert_eq!(*override_batch, expected_batch); } } + + #[tokio::test] + async fn test_write_flat_read_with_inverted_index() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata()); + let row_group_size = 100; + + // Create flat format RecordBatches with non-overlapping timestamp ranges + // Each batch becomes one row group (row_group_size = 100) + // Data: ts tag_0 tag_1 + // RG 0: 0-50 [a, d] + // RG 0: 50-100 [b, d] + // RG 1: 100-150 [c, d] + // RG 1: 150-200 [c, f] + let flat_batches = vec![ + new_record_batch_by_range(&["a", "d"], 0, 50), + new_record_batch_by_range(&["b", "d"], 50, 100), + new_record_batch_by_range(&["c", "d"], 100, 150), + new_record_batch_by_range(&["c", "f"], 150, 200), + ]; + + let flat_source = new_flat_source_from_record_batches(flat_batches); + + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + flat_source, + &write_opts, + ) + .await; + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let cache = create_test_cache(); + + // Test 1: Filter by tag_0 = "b" + // 
Expected: Only rows with tag_0="b" + let preds = vec![col("tag_0").eq(lit("b"))]; + let inverted_index_applier = InvertedIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + &metadata, + HashSet::from_iter([0]), + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_inverted_index_cache(cache.inverted_index_cache().cloned()) + .build(&preds) + .unwrap() + .map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .inverted_index_appliers([inverted_index_applier.clone(), None]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + + // Verify selection contains only RG 0 (tag_0="b", ts 0-100) + assert_eq!(selection.row_group_count(), 1); + assert_eq!(50, selection.get(0).unwrap().row_count()); + + // Verify filtering metrics + assert_eq!(metrics.filter_metrics.rg_total, 2); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1); + assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0); + assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50); + } + + #[tokio::test] + async fn test_write_flat_read_with_bloom_filter() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata()); + let row_group_size = 100; + + // Create flat format RecordBatches with non-overlapping timestamp ranges + // Each batch becomes one row group (row_group_size = 100) + // Data: ts tag_0 tag_1 + // RG 0: 0-50 [a, d] + // RG 0: 50-100 [b, e] + // RG 1: 100-150 [c, d] + // RG 1: 150-200 [c, f] + let flat_batches = vec![ + new_record_batch_by_range(&["a", "d"], 0, 50), + new_record_batch_by_range(&["b", "e"], 50, 100), + new_record_batch_by_range(&["c", "d"], 100, 150), + new_record_batch_by_range(&["c", "f"], 150, 200), + ]; + + let flat_source = new_flat_source_from_record_batches(flat_batches); + + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + flat_source, + &write_opts, + ) + .await; + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let cache = create_test_cache(); + + // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d" + // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d" + let preds = vec![ + col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))), + col("tag_1").eq(lit("d")), + ]; + let bloom_filter_applier = BloomFilterIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + &metadata, + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned()) + .build(&preds) + .unwrap() + 
.map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .bloom_filter_index_appliers([None, bloom_filter_applier.clone()]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + + // Verify selection contains RG 0 and RG 1 + assert_eq!(selection.row_group_count(), 2); + assert_eq!(50, selection.get(0).unwrap().row_count()); + assert_eq!(50, selection.get(1).unwrap().row_count()); + + // Verify filtering metrics + assert_eq!(metrics.filter_metrics.rg_total, 2); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); + assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0); + assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100); + } + + #[tokio::test] + async fn test_write_flat_read_with_inverted_index_sparse() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let row_group_size = 100; + + // Create flat format RecordBatches with non-overlapping timestamp ranges + // Each batch becomes one row group (row_group_size = 100) + // Data: ts tag_0 tag_1 + // RG 0: 0-50 [a, d] + // RG 0: 50-100 [b, d] + // RG 1: 100-150 [c, d] + // RG 1: 150-200 [c, f] + let flat_batches = vec![ + new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata), + new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata), + new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata), + new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata), + ]; + + let flat_source = new_flat_source_from_record_batches(flat_batches); + + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + flat_source, + &write_opts, + ) + .await; + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let cache = create_test_cache(); + + // Test 1: Filter by tag_0 = "b" + // Expected: Only rows with tag_0="b" + let preds = vec![col("tag_0").eq(lit("b"))]; + let inverted_index_applier = InvertedIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + &metadata, + HashSet::from_iter([0]), + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_inverted_index_cache(cache.inverted_index_cache().cloned()) + .build(&preds) + .unwrap() + .map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .inverted_index_appliers([inverted_index_applier.clone(), None]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, selection) = 
builder.build_reader_input(&mut metrics).await.unwrap(); + + // RG 0 has 50 matching rows (tag_0="b") + assert_eq!(selection.row_group_count(), 1); + assert_eq!(50, selection.get(0).unwrap().row_count()); + + // Verify filtering metrics + // Note: With sparse encoding, tag columns aren't stored separately, + // so minmax filtering on tags doesn't work (only inverted index) + assert_eq!(metrics.filter_metrics.rg_total, 2); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format + assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1); + assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150); + } + + #[tokio::test] + async fn test_write_flat_read_with_bloom_filter_sparse() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(sst_region_metadata_with_encoding( + PrimaryKeyEncoding::Sparse, + )); + let row_group_size = 100; + + // Create flat format RecordBatches with non-overlapping timestamp ranges + // Each batch becomes one row group (row_group_size = 100) + // Data: ts tag_0 tag_1 + // RG 0: 0-50 [a, d] + // RG 0: 50-100 [b, e] + // RG 1: 100-150 [c, d] + // RG 1: 150-200 [c, f] + let flat_batches = vec![ + new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata), + new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata), + new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata), + new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata), + ]; + + let flat_source = new_flat_source_from_record_batches(flat_batches); + + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + + let info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + flat_source, + &write_opts, + ) + .await; + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let cache = create_test_cache(); + + // Filter by ts >= 50 AND ts < 200 AND tag_1 = "d" + // Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d" + let preds = vec![ + col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))), + col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))), + col("tag_1").eq(lit("d")), + ]; + let bloom_filter_applier = BloomFilterIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + &metadata, + env.get_puffin_manager(), + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned()) + .build(&preds) + .unwrap() + .map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .bloom_filter_index_appliers([None, bloom_filter_applier.clone()]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + + // Verify selection contains RG 0 and RG 1 + assert_eq!(selection.row_group_count(), 2); + 
assert_eq!(50, selection.get(0).unwrap().row_count()); + assert_eq!(50, selection.get(1).unwrap().row_count()); + + // Verify filtering metrics + assert_eq!(metrics.filter_metrics.rg_total, 2); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); + assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0); + assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100); + } + + /// Creates region metadata for testing fulltext indexes. + /// Schema: tag_0, text_bloom, text_tantivy, field_0, ts + fn fulltext_region_metadata() -> RegionMetadata { + let mut builder = RegionMetadataBuilder::new(REGION_ID); + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "tag_0".to_string(), + ConcreteDataType::string_datatype(), + true, + ), + semantic_type: SemanticType::Tag, + column_id: 0, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "text_bloom".to_string(), + ConcreteDataType::string_datatype(), + true, + ) + .with_fulltext_options(FulltextOptions { + enable: true, + analyzer: FulltextAnalyzer::English, + case_sensitive: false, + backend: FulltextBackend::Bloom, + granularity: 1, + false_positive_rate_in_10000: 50, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 1, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "text_tantivy".to_string(), + ConcreteDataType::string_datatype(), + true, + ) + .with_fulltext_options(FulltextOptions { + enable: true, + analyzer: FulltextAnalyzer::English, + case_sensitive: false, + backend: FulltextBackend::Tantivy, + granularity: 1, + false_positive_rate_in_10000: 50, + }) + .unwrap(), + semantic_type: SemanticType::Field, + column_id: 2, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "field_0".to_string(), + ConcreteDataType::uint64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 3, + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 4, + }) + .primary_key(vec![0]); + builder.build().unwrap() + } + + /// Creates a flat format RecordBatch with string fields for fulltext testing. 
+ fn new_fulltext_record_batch_by_range( + tag: &str, + text_bloom: &str, + text_tantivy: &str, + start: usize, + end: usize, + ) -> RecordBatch { + assert!(end >= start); + let metadata = Arc::new(fulltext_region_metadata()); + let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default()); + + let num_rows = end - start; + let mut columns = Vec::new(); + + // Add primary key column (tag_0) as dictionary array + let mut tag_builder = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + tag_builder.append_value(tag); + } + columns.push(Arc::new(tag_builder.finish()) as ArrayRef); + + // Add text_bloom field (fulltext with bloom backend) + let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect(); + columns.push(Arc::new(StringArray::from(text_bloom_values))); + + // Add text_tantivy field (fulltext with tantivy backend) + let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect(); + columns.push(Arc::new(StringArray::from(text_tantivy_values))); + + // Add field column (field_0) + let field_values: Vec = (start..end).map(|v| v as u64).collect(); + columns.push(Arc::new(UInt64Array::from(field_values))); + + // Add time index column (ts) + let timestamps: Vec = (start..end).map(|v| v as i64).collect(); + columns.push(Arc::new(TimestampMillisecondArray::from(timestamps))); + + // Add encoded primary key column + let pk = new_primary_key(&[tag]); + let mut pk_builder = BinaryDictionaryBuilder::::new(); + for _ in 0..num_rows { + pk_builder.append(&pk).unwrap(); + } + columns.push(Arc::new(pk_builder.finish())); + + // Add sequence column + columns.push(Arc::new(UInt64Array::from_value(1000, num_rows))); + + // Add op_type column + columns.push(Arc::new(UInt8Array::from_value( + OpType::Put as u8, + num_rows, + ))); + + RecordBatch::try_new(flat_schema, columns).unwrap() + } + + #[tokio::test] + async fn test_write_flat_read_with_fulltext_index() { + let mut env = TestEnv::new().await; + let object_store = env.init_object_store_manager(); + let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare); + let metadata = Arc::new(fulltext_region_metadata()); + let row_group_size = 50; + + // Create flat format RecordBatches with different text content + // RG 0: 0-50 tag="a", bloom="hello world", tantivy="quick brown fox" + // RG 1: 50-100 tag="b", bloom="hello world", tantivy="quick brown fox" + // RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog" + // RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog" + let flat_batches = vec![ + new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50), + new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100), + new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150), + new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200), + ]; + + let flat_source = new_flat_source_from_record_batches(flat_batches); + + let write_opts = WriteOptions { + row_group_size, + ..Default::default() + }; + + let indexer_builder = create_test_indexer_builder( + &env, + object_store.clone(), + file_path.clone(), + metadata.clone(), + row_group_size, + ); + + let mut info = write_flat_sst( + object_store.clone(), + metadata.clone(), + indexer_builder, + file_path.clone(), + flat_source, + &write_opts, + ) + .await; + assert_eq!(200, info.num_rows); + assert!(info.file_size > 0); + assert!(info.index_metadata.file_size > 0); + + // Verify fulltext indexes were created + 
assert!(info.index_metadata.fulltext_index.index_size > 0); + assert_eq!(info.index_metadata.fulltext_index.row_count, 200); + // text_bloom (column_id 1) and text_tantivy (column_id 2) + info.index_metadata.fulltext_index.columns.sort_unstable(); + assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]); + + assert_eq!( + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(199) + ), + info.time_range + ); + + let handle = create_file_handle_from_sst_info(&info, &metadata); + + let cache = create_test_cache(); + + // Helper functions to create fulltext function expressions + let matches_func = || { + Arc::new( + ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef) + .provide(Default::default()), + ) + }; + + let matches_term_func = || { + Arc::new( + ScalarFunctionFactory::from( + Arc::new(MatchesTermFunction::default()) as FunctionRef, + ) + .provide(Default::default()), + ) + }; + + // Test 1: Filter by text_bloom field using matches_term (bloom backend) + // Expected: RG 0 and RG 1 (rows 0-100) which have "hello" term + let preds = vec![Expr::ScalarFunction(ScalarFunction { + args: vec![col("text_bloom"), "hello".lit()], + func: matches_term_func(), + })]; + + let fulltext_applier = FulltextIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + env.get_puffin_manager(), + &metadata, + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned()) + .build(&preds) + .unwrap() + .map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .fulltext_index_appliers([None, fulltext_applier.clone()]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + + // Verify selection contains RG 0 and RG 1 (text_bloom="hello world") + assert_eq!(selection.row_group_count(), 2); + assert_eq!(50, selection.get(0).unwrap().row_count()); + assert_eq!(50, selection.get(1).unwrap().row_count()); + + // Verify filtering metrics + assert_eq!(metrics.filter_metrics.rg_total, 4); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); + assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2); + assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100); + + // Test 2: Filter by text_tantivy field using matches (tantivy backend) + // Expected: RG 2 and RG 3 (rows 100-200) which have "lazy" in query + let preds = vec![Expr::ScalarFunction(ScalarFunction { + args: vec![col("text_tantivy"), "lazy".lit()], + func: matches_func(), + })]; + + let fulltext_applier = FulltextIndexApplierBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + object_store.clone(), + env.get_puffin_manager(), + &metadata, + ) + .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned()) + .with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned()) + .build(&preds) + .unwrap() + .map(Arc::new); + + let builder = ParquetReaderBuilder::new( + FILE_DIR.to_string(), + PathType::Bare, + handle.clone(), + object_store.clone(), + ) + .flat_format(true) + .predicate(Some(Predicate::new(preds))) + .fulltext_index_appliers([None, fulltext_applier.clone()]) + .cache(CacheStrategy::EnableAll(cache.clone())); + + let mut metrics = ReaderMetrics::default(); + let (_context, 
selection) = builder.build_reader_input(&mut metrics).await.unwrap(); + + // Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog") + assert_eq!(selection.row_group_count(), 2); + assert_eq!(50, selection.get(2).unwrap().row_count()); + assert_eq!(50, selection.get(3).unwrap().row_count()); + + // Verify filtering metrics + assert_eq!(metrics.filter_metrics.rg_total, 4); + assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); + assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2); + assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100); + } } diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 9f91a51747..94a16f0b1d 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -29,7 +29,7 @@ use tokio::sync::mpsc::Sender; use crate::access_layer::{AccessLayer, AccessLayerRef}; use crate::cache::CacheManager; use crate::compaction::CompactionScheduler; -use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager}; +use crate::compaction::memory_manager::new_compaction_memory_manager; use crate::config::MitoConfig; use crate::error::Result; use crate::flush::FlushScheduler; diff --git a/src/mito2/src/test_util/sst_util.rs b/src/mito2/src/test_util/sst_util.rs index 9f7b6a0658..d7b45b9237 100644 --- a/src/mito2/src/test_util/sst_util.rs +++ b/src/mito2/src/test_util/sst_util.rs @@ -27,6 +27,10 @@ use parquet::file::metadata::ParquetMetaData; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; +use store_api::storage::consts::ReservedColumnId; use store_api::storage::{FileId, RegionId}; use crate::read::{Batch, BatchBuilder, Source}; @@ -36,11 +40,44 @@ use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger}; /// Test region id. const REGION_ID: RegionId = RegionId::new(0, 0); -/// Creates a new region metadata for testing SSTs. +/// Creates a new region metadata for testing SSTs with specified encoding. 
/// -/// Schema: tag_0, tag_1, field_0, ts -pub fn sst_region_metadata() -> RegionMetadata { +/// Dense schema: tag_0, tag_1, field_0, ts +/// Sparse schema: __table_id, __tsid, tag_0, tag_1, field_0, ts +pub fn sst_region_metadata_with_encoding( + encoding: store_api::codec::PrimaryKeyEncoding, +) -> RegionMetadata { let mut builder = RegionMetadataBuilder::new(REGION_ID); + + // For sparse encoding, add internal columns first + if encoding == store_api::codec::PrimaryKeyEncoding::Sparse { + builder + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(), + ConcreteDataType::uint32_datatype(), + false, + ) + .with_skipping_options(SkippingIndexOptions { + granularity: 1, + ..Default::default() + }) + .unwrap(), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::table_id(), + }) + .push_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + DATA_SCHEMA_TSID_COLUMN_NAME.to_string(), + ConcreteDataType::uint64_datatype(), + false, + ), + semantic_type: SemanticType::Tag, + column_id: ReservedColumnId::tsid(), + }); + } + + // Add user-defined columns (tag_0, tag_1, field_0, ts) builder .push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new( @@ -83,12 +120,32 @@ pub fn sst_region_metadata() -> RegionMetadata { ), semantic_type: SemanticType::Timestamp, column_id: 3, - }) - .primary_key(vec![0, 1]); + }); + + // Set primary key based on encoding + if encoding == store_api::codec::PrimaryKeyEncoding::Sparse { + builder.primary_key(vec![ + ReservedColumnId::table_id(), + ReservedColumnId::tsid(), + 0, // tag_0 + 1, // tag_1 + ]); + } else { + builder.primary_key(vec![0, 1]); // Dense: just user tags + } + + builder.primary_key_encoding(encoding); builder.build().unwrap() } -/// Encodes a primary key for specific tags. +/// Creates a new region metadata for testing SSTs. +/// +/// Schema: tag_0, tag_1, field_0, ts +pub fn sst_region_metadata() -> RegionMetadata { + sst_region_metadata_with_encoding(store_api::codec::PrimaryKeyEncoding::Dense) +} + +/// Encodes a primary key for specific tags using dense encoding. pub fn new_primary_key(tags: &[&str]) -> Vec { let fields = (0..tags.len()) .map(|idx| { @@ -104,6 +161,31 @@ pub fn new_primary_key(tags: &[&str]) -> Vec { .unwrap() } +/// Encodes a primary key for specific tags using sparse encoding. +/// Includes internal columns (table_id, tsid) required by sparse format. +pub fn new_sparse_primary_key( + tags: &[&str], + metadata: &Arc, + table_id: u32, + tsid: u64, +) -> Vec { + use mito_codec::row_converter::PrimaryKeyCodec; + + let codec = mito_codec::row_converter::SparsePrimaryKeyCodec::new(metadata); + + // Sparse encoding requires internal columns first, then user tags + let values = vec![ + (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)), + (ReservedColumnId::tsid(), ValueRef::UInt64(tsid)), + (0, ValueRef::String(tags[0])), // tag_0 + (1, ValueRef::String(tags[1])), // tag_1 + ]; + + let mut buffer = Vec::new(); + codec.encode_value_refs(&values, &mut buffer).unwrap(); + buffer +} + /// Creates a [Source] from `batches`. pub fn new_source(batches: &[Batch]) -> Source { let reader = VecBatchReader::new(batches);
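A closing note on the core change, which is easy to lose in the test churn: in the flat SST format, tag columns reach the indexer as Arrow dictionary arrays, so the SortField seen by IndexValueCodec carries a Dictionary data type whose value type is String. Before this patch, the `is_string()` check ran against the dictionary type itself, so tag values missed the string-encoding path. The sketch below illustrates the `encode_data_type()` mapping using a simplified stand-in enum; it is not the real ConcreteDataType API from the datatypes crate.

// DemoType is a hypothetical, simplified stand-in for ConcreteDataType.
#[derive(Debug, PartialEq)]
enum DemoType {
    String,
    UInt64,
    // A dictionary type wraps its value type; only the value type matters for encoding.
    Dictionary { value: Box<DemoType> },
}

impl DemoType {
    // Mirrors SortField::encode_data_type(): unwrap a dictionary to its value type.
    fn encode_data_type(&self) -> &DemoType {
        match self {
            DemoType::Dictionary { value } => value,
            other => other,
        }
    }

    fn is_string(&self) -> bool {
        matches!(self, DemoType::String)
    }
}

fn main() {
    // A tag column in the flat format: a dictionary whose value type is String.
    let tag = DemoType::Dictionary {
        value: Box::new(DemoType::String),
    };
    // The old check looked at the dictionary type itself, which is not a string type.
    assert!(!tag.is_string());
    // The fixed check looks at the encoded (value) type, so tags take the
    // string path in IndexValueCodec.
    assert!(tag.encode_data_type().is_string());

    // Non-dictionary fields are unchanged.
    let field = DemoType::UInt64;
    assert_eq!(field.encode_data_type(), &DemoType::UInt64);
}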
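The sparse-format tests rely on the new `new_sparse_primary_key` helper, which puts the metric-engine internal columns (__table_id, __tsid) ahead of the user tags and encodes them with SparsePrimaryKeyCodec. A minimal usage sketch, assuming the helpers added in this patch (`sst_region_metadata_with_encoding`, `new_sparse_primary_key`) are in scope and reusing the same table_id/tsid constants as the tests:

use std::sync::Arc;

use datatypes::arrow::array::{ArrayRef, BinaryDictionaryBuilder};
use datatypes::arrow::datatypes::UInt32Type;
use store_api::codec::PrimaryKeyEncoding;

use crate::test_util::sst_util::{new_sparse_primary_key, sst_region_metadata_with_encoding};

fn build_sparse_pk_column(tags: &[&str], num_rows: usize) -> ArrayRef {
    // Region metadata whose primary key is [__table_id, __tsid, tag_0, tag_1].
    let metadata = Arc::new(sst_region_metadata_with_encoding(
        PrimaryKeyEncoding::Sparse,
    ));

    // Encode one key: internal columns first, then the user tags.
    let pk = new_sparse_primary_key(tags, &metadata, 1, 100);

    // The flat format stores the encoded key as a binary dictionary column,
    // so every row of the batch shares a single dictionary entry.
    let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
    for _ in 0..num_rows {
        builder.append(&pk).unwrap();
    }
    Arc::new(builder.finish()) as ArrayRef
}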
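A last usage note on the reworked sst_util metadata helpers: `sst_region_metadata()` is now a thin wrapper over `sst_region_metadata_with_encoding(PrimaryKeyEncoding::Dense)`, while the sparse variant prepends the reserved __table_id/__tsid columns to the primary key and marks the metadata as sparse-encoded. A rough sketch of what a test can expect; it assumes RegionMetadata exposes `primary_key` and `primary_key_encoding` as public fields, as the builder calls in this patch suggest:

use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::consts::ReservedColumnId;

use crate::test_util::sst_util::{sst_region_metadata, sst_region_metadata_with_encoding};

fn check_primary_keys() {
    // Dense encoding: the primary key is just the user tags (tag_0 = 0, tag_1 = 1).
    let dense = sst_region_metadata();
    assert_eq!(dense.primary_key, vec![0, 1]);

    // Sparse encoding: reserved internal columns come first, then the user tags.
    let sparse = sst_region_metadata_with_encoding(PrimaryKeyEncoding::Sparse);
    assert_eq!(
        sparse.primary_key,
        vec![ReservedColumnId::table_id(), ReservedColumnId::tsid(), 0, 1]
    );
    assert_eq!(sparse.primary_key_encoding, PrimaryKeyEncoding::Sparse);
}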