Compare commits

...

13 Commits

Author SHA1 Message Date
discord9
1d3cfdc0e5 clippy
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 21:07:24 +08:00
discord9
088401c3e9 c
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 21:05:23 +08:00
discord9
4419e0254f refactor: add list test
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 21:05:13 +08:00
discord9
709ccd3e31 c
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 18:39:14 +08:00
discord9
5b50b4824d wt
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 18:35:19 +08:00
discord9
1ef5c2e024 chore
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 18:22:03 +08:00
discord9
d20727f335 test: better fuzz
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 18:15:02 +08:00
discord9
2391ab1941 even more test
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 16:21:27 +08:00
discord9
ec77a5d53a sanity check
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 13:54:19 +08:00
discord9
dbad96eb80 more test
Signed-off-by: discord9 <discord9@163.com>
2025-12-22 13:44:00 +08:00
discord9
c0652f6dd5 chore: release push check against Cargo.toml (#7426)
Signed-off-by: discord9 <discord9@163.com>
2025-12-19 13:16:15 +00:00
Yingwen
fed6cb0806 fix: flat format use correct encoding in indexer for tags (#7440)
* test: add inverted and skipping test

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: Add tests for fulltext index

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: index dictionary type in correct encoding in flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: use encode_data_type() in SortField

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: refine imports

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add tests for sparse encoding

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove logs

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update list test

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: simplify tests

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-12-19 07:36:44 +00:00
discord9
69659211f6 chore: fix bincode version (#7445)
Signed-off-by: discord9 <discord9@163.com>
2025-12-19 07:36:28 +00:00
9 changed files with 1833 additions and 760 deletions

View File

@@ -49,6 +49,17 @@ function create_version() {
echo "GITHUB_REF_NAME is empty in push event" >&2
exit 1
fi
# For tag releases, ensure GITHUB_REF_NAME matches the version in Cargo.toml
CARGO_VERSION=$(grep '^version = ' Cargo.toml | cut -d '"' -f 2 | head -n 1)
EXPECTED_REF_NAME="v${CARGO_VERSION}"
if [ "$GITHUB_REF_NAME" != "$EXPECTED_REF_NAME" ]; then
echo "Error: GITHUB_REF_NAME '$GITHUB_REF_NAME' does not match Cargo.toml version 'v${CARGO_VERSION}'" >&2
echo "Expected tag name: '$EXPECTED_REF_NAME'" >&2
exit 1
fi
echo "$GITHUB_REF_NAME"
elif [ "$GITHUB_EVENT_NAME" = workflow_dispatch ]; then
echo "$NEXT_RELEASE_VERSION-$(git rev-parse --short HEAD)-$(date "+%Y%m%d-%s")"

View File

@@ -19,7 +19,7 @@ arc-swap = "1.0"
arrow.workspace = true
arrow-schema.workspace = true
async-trait.workspace = true
bincode = "1.3"
bincode = "=1.3.3"
catalog.workspace = true
chrono.workspace = true
common-base.workspace = true

View File

@@ -48,7 +48,7 @@ impl IndexValueCodec {
) -> Result<()> {
ensure!(!value.is_null(), IndexEncodeNullSnafu);
if field.data_type().is_string() {
if field.encode_data_type().is_string() {
let value = value
.try_into_string()
.context(FieldTypeMismatchSnafu)?

View File

@@ -57,15 +57,20 @@ impl SortField {
&self.data_type
}
pub fn estimated_size(&self) -> usize {
/// Returns the physical data type to encode for the field.
///
/// For example, a dictionary field will be encoded as its value type.
pub fn encode_data_type(&self) -> &ConcreteDataType {
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::estimated_size_by_type(dict_type.value_type())
}
data_type => Self::estimated_size_by_type(data_type),
ConcreteDataType::Dictionary(dict_type) => dict_type.value_type(),
_ => &self.data_type,
}
}
pub fn estimated_size(&self) -> usize {
Self::estimated_size_by_type(self.encode_data_type())
}
fn estimated_size_by_type(data_type: &ConcreteDataType) -> usize {
match data_type {
ConcreteDataType::Boolean(_) => 2,
@@ -98,12 +103,7 @@ impl SortField {
serializer: &mut Serializer<&mut Vec<u8>>,
value: &ValueRef,
) -> Result<()> {
match self.data_type() {
ConcreteDataType::Dictionary(dict_type) => {
Self::serialize_by_type(dict_type.value_type(), serializer, value)
}
data_type => Self::serialize_by_type(data_type, serializer, value),
}
Self::serialize_by_type(self.encode_data_type(), serializer, value)
}
fn serialize_by_type(
@@ -194,12 +194,7 @@ impl SortField {
/// Deserialize a value from the deserializer.
pub fn deserialize<B: Buf>(&self, deserializer: &mut Deserializer<B>) -> Result<Value> {
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::deserialize_by_type(dict_type.value_type(), deserializer)
}
data_type => Self::deserialize_by_type(data_type, deserializer),
}
Self::deserialize_by_type(self.encode_data_type(), deserializer)
}
fn deserialize_by_type<B: Buf>(
@@ -301,12 +296,7 @@ impl SortField {
return Ok(1);
}
match &self.data_type {
ConcreteDataType::Dictionary(dict_type) => {
Self::skip_deserialize_by_type(dict_type.value_type(), bytes, deserializer)
}
data_type => Self::skip_deserialize_by_type(data_type, bytes, deserializer),
}
Self::skip_deserialize_by_type(self.encode_data_type(), bytes, deserializer)
}
fn skip_deserialize_by_type(

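For orientation, a minimal standalone sketch (deliberately not the real `datatypes` crate API) of the behavior the new `encode_data_type()` centralizes: dictionary columns resolve to their value type, so the serialize/deserialize/skip paths and the `field.encode_data_type().is_string()` check above all see the underlying type. The enum and function names below are illustrative only.

// Standalone sketch of the dictionary-to-value-type resolution performed by
// `SortField::encode_data_type()` in the hunk above; not the real crate API.
#[derive(Debug, Clone, PartialEq)]
enum DataTypeSketch {
    String,
    UInt64,
    // The dictionary key type is irrelevant for encoding; only the value type matters.
    Dictionary { value: Box<DataTypeSketch> },
}

// A dictionary field is encoded as its value type; everything else as itself.
fn encode_data_type(dt: &DataTypeSketch) -> &DataTypeSketch {
    match dt {
        DataTypeSketch::Dictionary { value } => &**value,
        other => other,
    }
}

fn main() {
    let dict_tag = DataTypeSketch::Dictionary {
        value: Box::new(DataTypeSketch::String),
    };
    // A dictionary-encoded tag is treated as a string column, which is what the
    // `encode_data_type().is_string()` change in IndexValueCodec relies on.
    assert_eq!(encode_data_type(&dict_tag), &DataTypeSketch::String);
    assert_eq!(
        encode_data_type(&DataTypeSketch::UInt64),
        &DataTypeSketch::UInt64
    );
}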
View File

@@ -872,9 +872,9 @@ StorageSstEntry { file_path: "test/11_0000000002/index/<file_id>.puffin", file_s
StorageSstEntry { file_path: "test/22_0000000042/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/22_0000000042/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }"#).await;
test_list_ssts_with_format(true, r#"
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(292), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
ManifestSstEntry { table_dir: "test/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000001/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000001/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640257(11, 1), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/11_0000000002/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/11_0000000002/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 47244640258(11, 2), node_id: None, visible: true }
ManifestSstEntry { table_dir: "test/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", index_version: 0, level: 0, file_path: "test/22_0000000042/<file_id>.parquet", file_size: 2837, index_file_path: Some("test/22_0000000042/index/<file_id>.puffin"), index_file_size: Some(250), num_rows: 10, num_row_groups: 1, num_series: Some(1), min_ts: 0::Millisecond, max_ts: 9000::Millisecond, sequence: Some(10), origin_region_id: 94489280554(22, 42), node_id: None, visible: true }"#,
r#"
StorageSstEntry { file_path: "test/11_0000000001/<file_id>.parquet", file_size: None, last_modified_ms: None, node_id: None }
StorageSstEntry { file_path: "test/11_0000000001/index/<file_id>.puffin", file_size: None, last_modified_ms: None, node_id: None }

View File

@@ -95,21 +95,32 @@ mod tests {
use std::collections::HashSet;
use std::sync::Arc;
use api::v1::OpType;
use api::v1::{OpType, SemanticType};
use common_function::function::FunctionRef;
use common_function::function_factory::ScalarFunctionFactory;
use common_function::scalars::matches::MatchesFunction;
use common_function::scalars::matches_term::MatchesTermFunction;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator, col, lit};
use datatypes::arrow;
use datatypes::arrow::array::{
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringDictionaryBuilder,
ArrayRef, BinaryDictionaryBuilder, RecordBatch, StringArray, StringDictionaryBuilder,
TimestampMillisecondArray, UInt8Array, UInt64Array,
};
use datatypes::arrow::datatypes::{DataType, Field, Schema, UInt32Type};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextAnalyzer, FulltextBackend, FulltextOptions};
use object_store::ObjectStore;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_request::PathType;
use store_api::storage::{ColumnSchema, RegionId};
use table::predicate::Predicate;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -122,6 +133,7 @@ mod tests {
use crate::sst::file::{FileHandle, FileMeta, RegionFileId, RegionIndexId};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
@@ -133,11 +145,13 @@ mod tests {
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_batch_with_custom_sequence, new_primary_key, new_source,
sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
new_sparse_primary_key, sst_file_handle, sst_file_handle_with_file_id, sst_region_metadata,
sst_region_metadata_with_encoding,
};
use crate::test_util::{TestEnv, check_reader_result};
const FILE_DIR: &str = "/";
const REGION_ID: RegionId = RegionId::new(0, 0);
#[derive(Clone)]
struct FixedPathProvider {
@@ -1064,6 +1078,154 @@ mod tests {
FlatSource::Iter(Box::new(batches.into_iter().map(Ok)))
}
/// Creates a flat format RecordBatch for testing with sparse primary key encoding.
/// Similar to `new_record_batch_by_range` but without individual primary key columns.
fn new_record_batch_by_range_sparse(
tags: &[&str],
start: usize,
end: usize,
metadata: &Arc<RegionMetadata>,
) -> RecordBatch {
assert!(end >= start);
let flat_schema = to_flat_sst_arrow_schema(
metadata,
&FlatSchemaOptions::from_encoding(PrimaryKeyEncoding::Sparse),
);
let num_rows = end - start;
let mut columns: Vec<ArrayRef> = Vec::new();
// NOTE: Individual primary key columns (tag_0, tag_1) are NOT included in sparse format
// Add field column (field_0)
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
columns.push(Arc::new(UInt64Array::from(field_values)) as ArrayRef);
// Add time index column (ts)
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as ArrayRef);
// Add encoded primary key column using sparse encoding
let table_id = 1u32; // Test table ID
let tsid = 100u64; // Base TSID
let pk = new_sparse_primary_key(tags, metadata, table_id, tsid);
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
pk_builder.append(&pk).unwrap();
}
columns.push(Arc::new(pk_builder.finish()) as ArrayRef);
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)) as ArrayRef);
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(OpType::Put as u8, num_rows)) as ArrayRef);
RecordBatch::try_new(flat_schema, columns).unwrap()
}
/// Helper function to create IndexerBuilderImpl for tests.
fn create_test_indexer_builder(
env: &TestEnv,
object_store: ObjectStore,
file_path: RegionFilePathFactory,
metadata: Arc<RegionMetadata>,
row_group_size: usize,
) -> IndexerBuilderImpl {
let puffin_manager = env.get_puffin_manager().build(object_store, file_path);
let intermediate_manager = env.get_intermediate_manager();
IndexerBuilderImpl {
build_type: IndexBuildType::Flush,
metadata,
row_group_size,
puffin_manager,
write_cache_enabled: false,
intermediate_manager,
index_options: IndexOptions {
inverted_index: InvertedIndexOptions {
segment_row_count: 1,
..Default::default()
},
},
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
}
}
/// Helper function to write flat SST and return SstInfo.
async fn write_flat_sst(
object_store: ObjectStore,
metadata: Arc<RegionMetadata>,
indexer_builder: IndexerBuilderImpl,
file_path: RegionFilePathFactory,
flat_source: FlatSource,
write_opts: &WriteOptions,
) -> SstInfo {
let mut metrics = Metrics::new(WriteType::Flush);
let mut writer = ParquetWriter::new_with_object_store(
object_store,
metadata,
IndexConfig::default(),
indexer_builder,
file_path,
&mut metrics,
)
.await;
writer
.write_all_flat(flat_source, write_opts)
.await
.unwrap()
.remove(0)
}
/// Helper function to create FileHandle from SstInfo.
fn create_file_handle_from_sst_info(
info: &SstInfo,
metadata: &Arc<RegionMetadata>,
) -> FileHandle {
FileHandle::new(
FileMeta {
region_id: metadata.region_id,
file_id: info.file_id,
time_range: info.time_range,
level: 0,
file_size: info.file_size,
max_row_group_uncompressed_size: info.max_row_group_uncompressed_size,
available_indexes: info.index_metadata.build_available_indexes(),
indexes: info.index_metadata.build_indexes(),
index_file_size: info.index_metadata.file_size,
index_version: 0,
num_row_groups: info.num_row_groups,
num_rows: info.num_rows as u64,
sequence: None,
partition_expr: match &metadata.partition_expr {
Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
.expect("partition expression should be valid JSON"),
None => None,
},
num_series: 0,
},
Arc::new(NoopFilePurger),
)
}
/// Helper function to create test cache with standard settings.
fn create_test_cache() -> Arc<CacheManager> {
Arc::new(
CacheManager::builder()
.index_result_cache_size(1024 * 1024)
.index_metadata_size(1024 * 1024)
.index_content_page_size(1024 * 1024)
.index_content_size(1024 * 1024)
.puffin_metadata_size(1024 * 1024)
.build(),
)
}
#[tokio::test]
async fn test_write_flat_with_index() {
let mut env = TestEnv::new().await;
@@ -1238,4 +1400,709 @@ mod tests {
assert_eq!(*override_batch, expected_batch);
}
}
#[tokio::test]
async fn test_write_flat_read_with_inverted_index() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 100;
// Create flat format RecordBatches with non-overlapping timestamp ranges
// Two 50-row batches fill each row group (row_group_size = 100)
// Data: ts tag_0 tag_1
// RG 0: 0-50 [a, d]
// RG 0: 50-100 [b, d]
// RG 1: 100-150 [c, d]
// RG 1: 150-200 [c, f]
let flat_batches = vec![
new_record_batch_by_range(&["a", "d"], 0, 50),
new_record_batch_by_range(&["b", "d"], 50, 100),
new_record_batch_by_range(&["c", "d"], 100, 150),
new_record_batch_by_range(&["c", "f"], 150, 200),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let indexer_builder = create_test_indexer_builder(
&env,
object_store.clone(),
file_path.clone(),
metadata.clone(),
row_group_size,
);
let info = write_flat_sst(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
flat_source,
&write_opts,
)
.await;
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
let handle = create_file_handle_from_sst_info(&info, &metadata);
let cache = create_test_cache();
// Test 1: Filter by tag_0 = "b"
// Expected: Only rows with tag_0="b"
let preds = vec![col("tag_0").eq(lit("b"))];
let inverted_index_applier = InvertedIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
&metadata,
HashSet::from_iter([0]),
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// Verify selection contains only RG 0 (tag_0="b", ts 0-100)
assert_eq!(selection.row_group_count(), 1);
assert_eq!(50, selection.get(0).unwrap().row_count());
// Verify filtering metrics
assert_eq!(metrics.filter_metrics.rg_total, 2);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 1);
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 0);
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 50);
}
#[tokio::test]
async fn test_write_flat_read_with_bloom_filter() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(sst_region_metadata());
let row_group_size = 100;
// Create flat format RecordBatches with non-overlapping timestamp ranges
// Two 50-row batches fill each row group (row_group_size = 100)
// Data: ts tag_0 tag_1
// RG 0: 0-50 [a, d]
// RG 0: 50-100 [b, e]
// RG 1: 100-150 [c, d]
// RG 1: 150-200 [c, f]
let flat_batches = vec![
new_record_batch_by_range(&["a", "d"], 0, 50),
new_record_batch_by_range(&["b", "e"], 50, 100),
new_record_batch_by_range(&["c", "d"], 100, 150),
new_record_batch_by_range(&["c", "f"], 150, 200),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let indexer_builder = create_test_indexer_builder(
&env,
object_store.clone(),
file_path.clone(),
metadata.clone(),
row_group_size,
);
let info = write_flat_sst(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
flat_source,
&write_opts,
)
.await;
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
let handle = create_file_handle_from_sst_info(&info, &metadata);
let cache = create_test_cache();
// Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
// Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
let preds = vec![
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
col("tag_1").eq(lit("d")),
];
let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
&metadata,
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// Verify selection contains RG 0 and RG 1
assert_eq!(selection.row_group_count(), 2);
assert_eq!(50, selection.get(0).unwrap().row_count());
assert_eq!(50, selection.get(1).unwrap().row_count());
// Verify filtering metrics
assert_eq!(metrics.filter_metrics.rg_total, 2);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
}
#[tokio::test]
async fn test_write_flat_read_with_inverted_index_sparse() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(sst_region_metadata_with_encoding(
PrimaryKeyEncoding::Sparse,
));
let row_group_size = 100;
// Create flat format RecordBatches with non-overlapping timestamp ranges
// Two 50-row batches fill each row group (row_group_size = 100)
// Data: ts tag_0 tag_1
// RG 0: 0-50 [a, d]
// RG 0: 50-100 [b, d]
// RG 1: 100-150 [c, d]
// RG 1: 150-200 [c, f]
let flat_batches = vec![
new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
new_record_batch_by_range_sparse(&["b", "d"], 50, 100, &metadata),
new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let indexer_builder = create_test_indexer_builder(
&env,
object_store.clone(),
file_path.clone(),
metadata.clone(),
row_group_size,
);
let info = write_flat_sst(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
flat_source,
&write_opts,
)
.await;
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
let handle = create_file_handle_from_sst_info(&info, &metadata);
let cache = create_test_cache();
// Test 1: Filter by tag_0 = "b"
// Expected: Only rows with tag_0="b"
let preds = vec![col("tag_0").eq(lit("b"))];
let inverted_index_applier = InvertedIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
&metadata,
HashSet::from_iter([0]),
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_inverted_index_cache(cache.inverted_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.inverted_index_appliers([inverted_index_applier.clone(), None])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// RG 0 has 50 matching rows (tag_0="b")
assert_eq!(selection.row_group_count(), 1);
assert_eq!(50, selection.get(0).unwrap().row_count());
// Verify filtering metrics
// Note: With sparse encoding, tag columns aren't stored separately,
// so minmax filtering on tags doesn't work (only inverted index)
assert_eq!(metrics.filter_metrics.rg_total, 2);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0); // No minmax stats for tags in sparse format
assert_eq!(metrics.filter_metrics.rg_inverted_filtered, 1);
assert_eq!(metrics.filter_metrics.rows_inverted_filtered, 150);
}
#[tokio::test]
async fn test_write_flat_read_with_bloom_filter_sparse() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(sst_region_metadata_with_encoding(
PrimaryKeyEncoding::Sparse,
));
let row_group_size = 100;
// Create flat format RecordBatches with non-overlapping timestamp ranges
// Two 50-row batches fill each row group (row_group_size = 100)
// Data: ts tag_0 tag_1
// RG 0: 0-50 [a, d]
// RG 0: 50-100 [b, e]
// RG 1: 100-150 [c, d]
// RG 1: 150-200 [c, f]
let flat_batches = vec![
new_record_batch_by_range_sparse(&["a", "d"], 0, 50, &metadata),
new_record_batch_by_range_sparse(&["b", "e"], 50, 100, &metadata),
new_record_batch_by_range_sparse(&["c", "d"], 100, 150, &metadata),
new_record_batch_by_range_sparse(&["c", "f"], 150, 200, &metadata),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let indexer_builder = create_test_indexer_builder(
&env,
object_store.clone(),
file_path.clone(),
metadata.clone(),
row_group_size,
);
let info = write_flat_sst(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
flat_source,
&write_opts,
)
.await;
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
let handle = create_file_handle_from_sst_info(&info, &metadata);
let cache = create_test_cache();
// Filter by ts >= 50 AND ts < 200 AND tag_1 = "d"
// Expected: RG 0 (ts 0-100) and RG 1 (ts 100-200), both have tag_1="d"
let preds = vec![
col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(50), None))),
col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(200), None))),
col("tag_1").eq(lit("d")),
];
let bloom_filter_applier = BloomFilterIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
&metadata,
env.get_puffin_manager(),
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_index_cache(cache.bloom_filter_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.bloom_filter_index_appliers([None, bloom_filter_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// Verify selection contains RG 0 and RG 1
assert_eq!(selection.row_group_count(), 2);
assert_eq!(50, selection.get(0).unwrap().row_count());
assert_eq!(50, selection.get(1).unwrap().row_count());
// Verify filtering metrics
assert_eq!(metrics.filter_metrics.rg_total, 2);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_bloom_filtered, 0);
assert_eq!(metrics.filter_metrics.rows_bloom_filtered, 100);
}
/// Creates region metadata for testing fulltext indexes.
/// Schema: tag_0, text_bloom, text_tantivy, field_0, ts
fn fulltext_region_metadata() -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(REGION_ID);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_0".to_string(),
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 0,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"text_bloom".to_string(),
ConcreteDataType::string_datatype(),
true,
)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Bloom,
granularity: 1,
false_positive_rate_in_10000: 50,
})
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"text_tantivy".to_string(),
ConcreteDataType::string_datatype(),
true,
)
.with_fulltext_options(FulltextOptions {
enable: true,
analyzer: FulltextAnalyzer::English,
case_sensitive: false,
backend: FulltextBackend::Tantivy,
granularity: 1,
false_positive_rate_in_10000: 50,
})
.unwrap(),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"field_0".to_string(),
ConcreteDataType::uint64_datatype(),
true,
),
semantic_type: SemanticType::Field,
column_id: 3,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 4,
})
.primary_key(vec![0]);
builder.build().unwrap()
}
/// Creates a flat format RecordBatch with string fields for fulltext testing.
fn new_fulltext_record_batch_by_range(
tag: &str,
text_bloom: &str,
text_tantivy: &str,
start: usize,
end: usize,
) -> RecordBatch {
assert!(end >= start);
let metadata = Arc::new(fulltext_region_metadata());
let flat_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
let num_rows = end - start;
let mut columns = Vec::new();
// Add primary key column (tag_0) as dictionary array
let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
tag_builder.append_value(tag);
}
columns.push(Arc::new(tag_builder.finish()) as ArrayRef);
// Add text_bloom field (fulltext with bloom backend)
let text_bloom_values: Vec<_> = (0..num_rows).map(|_| text_bloom).collect();
columns.push(Arc::new(StringArray::from(text_bloom_values)));
// Add text_tantivy field (fulltext with tantivy backend)
let text_tantivy_values: Vec<_> = (0..num_rows).map(|_| text_tantivy).collect();
columns.push(Arc::new(StringArray::from(text_tantivy_values)));
// Add field column (field_0)
let field_values: Vec<u64> = (start..end).map(|v| v as u64).collect();
columns.push(Arc::new(UInt64Array::from(field_values)));
// Add time index column (ts)
let timestamps: Vec<i64> = (start..end).map(|v| v as i64).collect();
columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)));
// Add encoded primary key column
let pk = new_primary_key(&[tag]);
let mut pk_builder = BinaryDictionaryBuilder::<UInt32Type>::new();
for _ in 0..num_rows {
pk_builder.append(&pk).unwrap();
}
columns.push(Arc::new(pk_builder.finish()));
// Add sequence column
columns.push(Arc::new(UInt64Array::from_value(1000, num_rows)));
// Add op_type column
columns.push(Arc::new(UInt8Array::from_value(
OpType::Put as u8,
num_rows,
)));
RecordBatch::try_new(flat_schema, columns).unwrap()
}
#[tokio::test]
async fn test_write_flat_read_with_fulltext_index() {
let mut env = TestEnv::new().await;
let object_store = env.init_object_store_manager();
let file_path = RegionFilePathFactory::new(FILE_DIR.to_string(), PathType::Bare);
let metadata = Arc::new(fulltext_region_metadata());
let row_group_size = 50;
// Create flat format RecordBatches with different text content
// RG 0: 0-50 tag="a", bloom="hello world", tantivy="quick brown fox"
// RG 1: 50-100 tag="b", bloom="hello world", tantivy="quick brown fox"
// RG 2: 100-150 tag="c", bloom="goodbye world", tantivy="lazy dog"
// RG 3: 150-200 tag="d", bloom="goodbye world", tantivy="lazy dog"
let flat_batches = vec![
new_fulltext_record_batch_by_range("a", "hello world", "quick brown fox", 0, 50),
new_fulltext_record_batch_by_range("b", "hello world", "quick brown fox", 50, 100),
new_fulltext_record_batch_by_range("c", "goodbye world", "lazy dog", 100, 150),
new_fulltext_record_batch_by_range("d", "goodbye world", "lazy dog", 150, 200),
];
let flat_source = new_flat_source_from_record_batches(flat_batches);
let write_opts = WriteOptions {
row_group_size,
..Default::default()
};
let indexer_builder = create_test_indexer_builder(
&env,
object_store.clone(),
file_path.clone(),
metadata.clone(),
row_group_size,
);
let mut info = write_flat_sst(
object_store.clone(),
metadata.clone(),
indexer_builder,
file_path.clone(),
flat_source,
&write_opts,
)
.await;
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert!(info.index_metadata.file_size > 0);
// Verify fulltext indexes were created
assert!(info.index_metadata.fulltext_index.index_size > 0);
assert_eq!(info.index_metadata.fulltext_index.row_count, 200);
// text_bloom (column_id 1) and text_tantivy (column_id 2)
info.index_metadata.fulltext_index.columns.sort_unstable();
assert_eq!(info.index_metadata.fulltext_index.columns, vec![1, 2]);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(199)
),
info.time_range
);
let handle = create_file_handle_from_sst_info(&info, &metadata);
let cache = create_test_cache();
// Helper functions to create fulltext function expressions
let matches_func = || {
Arc::new(
ScalarFunctionFactory::from(Arc::new(MatchesFunction::default()) as FunctionRef)
.provide(Default::default()),
)
};
let matches_term_func = || {
Arc::new(
ScalarFunctionFactory::from(
Arc::new(MatchesTermFunction::default()) as FunctionRef,
)
.provide(Default::default()),
)
};
// Test 1: Filter by text_bloom field using matches_term (bloom backend)
// Expected: RG 0 and RG 1 (rows 0-100), which contain the "hello" term
let preds = vec![Expr::ScalarFunction(ScalarFunction {
args: vec![col("text_bloom"), "hello".lit()],
func: matches_term_func(),
})];
let fulltext_applier = FulltextIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
env.get_puffin_manager(),
&metadata,
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.fulltext_index_appliers([None, fulltext_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// Verify selection contains RG 0 and RG 1 (text_bloom="hello world")
assert_eq!(selection.row_group_count(), 2);
assert_eq!(50, selection.get(0).unwrap().row_count());
assert_eq!(50, selection.get(1).unwrap().row_count());
// Verify filtering metrics
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
// Test 2: Filter by text_tantivy field using matches (tantivy backend)
// Expected: RG 2 and RG 3 (rows 100-200), which match the "lazy" query
let preds = vec![Expr::ScalarFunction(ScalarFunction {
args: vec![col("text_tantivy"), "lazy".lit()],
func: matches_func(),
})];
let fulltext_applier = FulltextIndexApplierBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
object_store.clone(),
env.get_puffin_manager(),
&metadata,
)
.with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
.with_bloom_filter_cache(cache.bloom_filter_index_cache().cloned())
.build(&preds)
.unwrap()
.map(Arc::new);
let builder = ParquetReaderBuilder::new(
FILE_DIR.to_string(),
PathType::Bare,
handle.clone(),
object_store.clone(),
)
.flat_format(true)
.predicate(Some(Predicate::new(preds)))
.fulltext_index_appliers([None, fulltext_applier.clone()])
.cache(CacheStrategy::EnableAll(cache.clone()));
let mut metrics = ReaderMetrics::default();
let (_context, selection) = builder.build_reader_input(&mut metrics).await.unwrap();
// Verify selection contains RG 2 and RG 3 (text_tantivy="lazy dog")
assert_eq!(selection.row_group_count(), 2);
assert_eq!(50, selection.get(2).unwrap().row_count());
assert_eq!(50, selection.get(3).unwrap().row_count());
// Verify filtering metrics
assert_eq!(metrics.filter_metrics.rg_total, 4);
assert_eq!(metrics.filter_metrics.rg_minmax_filtered, 0);
assert_eq!(metrics.filter_metrics.rg_fulltext_filtered, 2);
assert_eq!(metrics.filter_metrics.rows_fulltext_filtered, 100);
}
}

View File

@@ -29,7 +29,7 @@ use tokio::sync::mpsc::Sender;
use crate::access_layer::{AccessLayer, AccessLayerRef};
use crate::cache::CacheManager;
use crate::compaction::CompactionScheduler;
use crate::compaction::memory_manager::{CompactionMemoryManager, new_compaction_memory_manager};
use crate::compaction::memory_manager::new_compaction_memory_manager;
use crate::config::MitoConfig;
use crate::error::Result;
use crate::flush::FlushScheduler;

View File

@@ -27,6 +27,10 @@ use parquet::file::metadata::ParquetMetaData;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{FileId, RegionId};
use crate::read::{Batch, BatchBuilder, Source};
@@ -36,11 +40,44 @@ use crate::test_util::{VecBatchReader, new_batch_builder, new_noop_file_purger};
/// Test region id.
const REGION_ID: RegionId = RegionId::new(0, 0);
/// Creates a new region metadata for testing SSTs.
/// Creates a new region metadata for testing SSTs with the specified primary key encoding.
///
/// Schema: tag_0, tag_1, field_0, ts
pub fn sst_region_metadata() -> RegionMetadata {
/// Dense schema: tag_0, tag_1, field_0, ts
/// Sparse schema: __table_id, __tsid, tag_0, tag_1, field_0, ts
pub fn sst_region_metadata_with_encoding(
encoding: store_api::codec::PrimaryKeyEncoding,
) -> RegionMetadata {
let mut builder = RegionMetadataBuilder::new(REGION_ID);
// For sparse encoding, add internal columns first
if encoding == store_api::codec::PrimaryKeyEncoding::Sparse {
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
ConcreteDataType::uint32_datatype(),
false,
)
.with_skipping_options(SkippingIndexOptions {
granularity: 1,
..Default::default()
})
.unwrap(),
semantic_type: SemanticType::Tag,
column_id: ReservedColumnId::table_id(),
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
ConcreteDataType::uint64_datatype(),
false,
),
semantic_type: SemanticType::Tag,
column_id: ReservedColumnId::tsid(),
});
}
// Add user-defined columns (tag_0, tag_1, field_0, ts)
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
@@ -83,12 +120,32 @@ pub fn sst_region_metadata() -> RegionMetadata {
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![0, 1]);
});
// Set primary key based on encoding
if encoding == store_api::codec::PrimaryKeyEncoding::Sparse {
builder.primary_key(vec![
ReservedColumnId::table_id(),
ReservedColumnId::tsid(),
0, // tag_0
1, // tag_1
]);
} else {
builder.primary_key(vec![0, 1]); // Dense: just user tags
}
builder.primary_key_encoding(encoding);
builder.build().unwrap()
}
/// Encodes a primary key for specific tags.
/// Creates a new region metadata for testing SSTs.
///
/// Schema: tag_0, tag_1, field_0, ts
pub fn sst_region_metadata() -> RegionMetadata {
sst_region_metadata_with_encoding(store_api::codec::PrimaryKeyEncoding::Dense)
}
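// (Sketch for orientation, not part of the diff.) The two constructors above are
// used elsewhere in this change roughly as follows; the calls and the
// `store_api::codec::PrimaryKeyEncoding` import mirror code already in the diff.
fn metadata_encodings_sketch() -> (RegionMetadata, RegionMetadata) {
    use store_api::codec::PrimaryKeyEncoding;

    // Dense: primary key is the user tags only -> primary_key(vec![0, 1]).
    let dense = sst_region_metadata();
    // Sparse: ReservedColumnId::table_id() and ::tsid() lead the primary key,
    // followed by tag_0 / tag_1, and primary_key_encoding is set to Sparse.
    let sparse = sst_region_metadata_with_encoding(PrimaryKeyEncoding::Sparse);
    (dense, sparse)
}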
/// Encodes a primary key for specific tags using dense encoding.
pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
let fields = (0..tags.len())
.map(|idx| {
@@ -104,6 +161,31 @@ pub fn new_primary_key(tags: &[&str]) -> Vec<u8> {
.unwrap()
}
/// Encodes a primary key for specific tags using sparse encoding.
/// Includes the internal columns (table_id, tsid) required by the sparse format.
pub fn new_sparse_primary_key(
tags: &[&str],
metadata: &Arc<RegionMetadata>,
table_id: u32,
tsid: u64,
) -> Vec<u8> {
use mito_codec::row_converter::PrimaryKeyCodec;
let codec = mito_codec::row_converter::SparsePrimaryKeyCodec::new(metadata);
// Sparse encoding requires internal columns first, then user tags
let values = vec![
(ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
(ReservedColumnId::tsid(), ValueRef::UInt64(tsid)),
(0, ValueRef::String(tags[0])), // tag_0
(1, ValueRef::String(tags[1])), // tag_1
];
let mut buffer = Vec::new();
codec.encode_value_refs(&values, &mut buffer).unwrap();
buffer
}
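// (Sketch for orientation, not part of the diff.) How this helper is consumed by
// new_record_batch_by_range_sparse earlier in this change: the encoded key bytes
// are appended per row into the BinaryDictionaryBuilder backing the encoded
// primary key column. table_id = 1 and tsid = 100 are the same placeholder
// values the test uses, not meaningful identifiers.
fn sparse_key_sketch(metadata: &Arc<RegionMetadata>) -> Vec<u8> {
    // Encodes (__table_id = 1, __tsid = 100, tag_0 = "a", tag_1 = "d").
    new_sparse_primary_key(&["a", "d"], metadata, 1, 100)
}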
/// Creates a [Source] from `batches`.
pub fn new_source(batches: &[Batch]) -> Source {
let reader = VecBatchReader::new(batches);

File diff suppressed because it is too large.