diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 676e2c17ad..309f76b621 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -80,6 +80,8 @@ mod remap_manifests_test; #[cfg(test)] mod apply_staging_manifest_test; +#[cfg(test)] +mod partition_filter_test; mod puffin_index; use std::any::Any; diff --git a/src/mito2/src/engine/apply_staging_manifest_test.rs b/src/mito2/src/engine/apply_staging_manifest_test.rs index d4fc7f7cb7..2010b9ac54 100644 --- a/src/mito2/src/engine/apply_staging_manifest_test.rs +++ b/src/mito2/src/engine/apply_staging_manifest_test.rs @@ -16,6 +16,7 @@ use std::assert_matches::assert_matches; use std::fs; use api::v1::Rows; +use common_recordbatch::RecordBatches; use datatypes::value::Value; use partition::expr::{PartitionExpr, col}; use store_api::region_engine::{ @@ -26,6 +27,7 @@ use store_api::region_request::{ }; use store_api::storage::{FileId, RegionId}; +use super::ScanRequest; use crate::config::MitoConfig; use crate::error::Error; use crate::manifest::action::RegionManifest; @@ -451,3 +453,364 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma Error::SerdeJson { .. } ); } + +#[tokio::test] +async fn test_split_repartition_causes_duplicate_data() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::with_prefix("split-duplicate").await; + let engine = env.create_engine(MitoConfig::default()).await; + + let source_region_id = RegionId::new(1, 1); + let target_region_id_1 = RegionId::new(1, 1); + let target_region_id_2 = RegionId::new(1, 3); + + // Use field_0 (i64) for partitioning. + let source_partition_expr = col("field_0").lt(Value::from(10.00)); + let target_partition_expr_1 = col("field_0").lt(Value::from(5.00)); + let target_partition_expr_2 = col("field_0") + .gt_eq(Value::from(5.00)) + .and(col("field_0").lt(Value::from(10.00))); + + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(source_partition_expr.as_json_str().unwrap())) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(source_region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Insert 0..10 + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 10), + }; + put_rows(&engine, source_region_id, rows_data).await; + engine + .handle_request( + source_region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + let source_region = engine.get_region(source_region_id).unwrap(); + let source_manifest = source_region.manifest_ctx.manifest().await; + let source_file_count = source_manifest.files.len(); + assert_eq!(source_file_count, 1); + + engine + .handle_request( + source_region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: target_partition_expr_1.as_json_str().unwrap(), + }), + ) + .await + .unwrap(); + + let remap_result = engine + .remap_manifests(RemapManifestsRequest { + region_id: source_region_id, + input_regions: vec![source_region_id], + region_mapping: [( + source_region_id, + vec![target_region_id_1, target_region_id_2], + )] + .into_iter() + .collect(), + new_partition_exprs: [ + ( + target_region_id_1, + target_partition_expr_1.as_json_str().unwrap(), + ), + ( + target_region_id_2, + target_partition_expr_2.as_json_str().unwrap(), + ), + ] + .into_iter() + .collect(), + }) + .await + .unwrap(); + + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(target_partition_expr_2.as_json_str().unwrap())) + .build(); + engine + .handle_request(target_region_id_2, RegionRequest::Create(request)) + .await + .unwrap(); + + engine + .handle_request( + target_region_id_2, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: target_partition_expr_2.as_json_str().unwrap(), + }), + ) + .await + .unwrap(); + + engine + .handle_request( + target_region_id_2, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: target_partition_expr_2.as_json_str().unwrap(), + central_region_id: source_region_id, + manifest_path: remap_result.manifest_paths[&target_region_id_2].clone(), + }), + ) + .await + .unwrap(); + + engine + .handle_request( + target_region_id_1, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: target_partition_expr_1.as_json_str().unwrap(), + central_region_id: source_region_id, + manifest_path: remap_result.manifest_paths[&target_region_id_1].clone(), + }), + ) + .await + .unwrap(); + + let target_region_1 = engine.get_region(target_region_id_1).unwrap(); + let target_region_2 = engine.get_region(target_region_id_2).unwrap(); + let manifest_1 = target_region_1.manifest_ctx.manifest().await; + let manifest_2 = target_region_2.manifest_ctx.manifest().await; + + assert_eq!(manifest_1.files.len(), source_file_count); + assert_eq!(manifest_2.files.len(), source_file_count); + + // Verify duplication via Scan. + let scan_request = ScanRequest::default(); + let stream = engine + .scan_to_stream(target_region_id_1, scan_request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + + let expected = "+-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; + assert_eq!( + batches.pretty_print().unwrap(), + expected, + "actual: {}", + batches.pretty_print().unwrap() + ); + + let stream = engine + .scan_to_stream(target_region_id_2, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + + let expected = "+-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 5 | 5.0 | 1970-01-01T00:00:05 | +| 6 | 6.0 | 1970-01-01T00:00:06 | +| 7 | 7.0 | 1970-01-01T00:00:07 | +| 8 | 8.0 | 1970-01-01T00:00:08 | +| 9 | 9.0 | 1970-01-01T00:00:09 | ++-------+---------+---------------------+"; + assert_eq!( + batches.pretty_print().unwrap(), + expected, + "actual: {}", + batches.pretty_print().unwrap() + ); +} + +#[tokio::test] +async fn test_merge_repartition_data_integrity() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::with_prefix("merge-data").await; + let engine = env.create_engine(MitoConfig::default()).await; + + // Merge R1 and R2 into R1. + let source_region_id_1 = RegionId::new(1, 1); + let source_region_id_2 = RegionId::new(1, 2); + // Target is same as Source 1 + let target_region_id = source_region_id_1; + + let source_partition_expr_1 = col("field_0") + .gt_eq(Value::from(0.00)) + .and(col("field_0").lt(Value::from(10.00))); + let source_partition_expr_2 = col("field_0").gt_eq(Value::from(10.00)); + // Target covers both + let target_partition_expr = col("field_0").gt_eq(Value::from(0.00)); + + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(source_partition_expr_1.as_json_str().unwrap())) + .build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(source_region_id_1, RegionRequest::Create(request)) + .await + .unwrap(); + + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(source_partition_expr_2.as_json_str().unwrap())) + .build(); + engine + .handle_request(source_region_id_2, RegionRequest::Create(request)) + .await + .unwrap(); + + // Insert data into R1: 0..5 + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 5), + }; + put_rows(&engine, source_region_id_1, rows_data).await; + engine + .handle_request( + source_region_id_1, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + // Insert data into R2: 10..15 + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(10, 15), + }; + put_rows(&engine, source_region_id_2, rows_data).await; + engine + .handle_request( + source_region_id_2, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + let source_region_1 = engine.get_region(source_region_id_1).unwrap(); + let source_region_2 = engine.get_region(source_region_id_2).unwrap(); + let source_manifest_1 = source_region_1.manifest_ctx.manifest().await; + let source_manifest_2 = source_region_2.manifest_ctx.manifest().await; + let source_1_file_count = source_manifest_1.files.len(); + let source_2_file_count = source_manifest_2.files.len(); + assert_eq!(source_1_file_count, 1); + assert_eq!(source_2_file_count, 1); + + // Enter staging for target (R1) + engine + .handle_request( + target_region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: target_partition_expr.as_json_str().unwrap(), + }), + ) + .await + .unwrap(); + + // Remap: R1 -> R1, R2 -> R1 + let remap_result = engine + .remap_manifests(RemapManifestsRequest { + region_id: target_region_id, + input_regions: vec![source_region_id_1, source_region_id_2], + region_mapping: [ + (source_region_id_1, vec![target_region_id]), + (source_region_id_2, vec![target_region_id]), + ] + .into_iter() + .collect(), + new_partition_exprs: [( + target_region_id, + target_partition_expr.as_json_str().unwrap(), + )] + .into_iter() + .collect(), + }) + .await + .unwrap(); + + assert_eq!(remap_result.manifest_paths.len(), 1); + + let target_region = engine.get_region(target_region_id).unwrap(); + let manager = target_region.manifest_ctx.manifest_manager.write().await; + let manifest_storage = manager.store(); + let blob_store = manifest_storage.staging_storage().blob_storage(); + + let target_manifest_bytes = blob_store + .get(&remap_result.manifest_paths[&target_region_id]) + .await + .unwrap(); + let target_manifest = serde_json::from_slice::(&target_manifest_bytes).unwrap(); + + assert_eq!( + target_manifest.files.len(), + source_1_file_count + source_2_file_count, + "Target manifest should have all files from both source regions" + ); + + drop(manager); + + engine + .handle_request( + target_region_id, + RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest { + partition_expr: target_partition_expr.as_json_str().unwrap(), + central_region_id: target_region_id, + manifest_path: remap_result.manifest_paths[&target_region_id].clone(), + }), + ) + .await + .unwrap(); + + let target_region = engine.get_region(target_region_id).unwrap(); + let manifest = target_region.manifest_ctx.manifest().await; + + assert_eq!( + manifest.files.len(), + source_1_file_count + source_2_file_count, + "After applying staging manifest, target region should have all files" + ); + + let scan_request = ScanRequest::default(); + let stream = engine + .scan_to_stream(target_region_id, scan_request) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + + let expected = "+-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | +| 10 | 10.0 | 1970-01-01T00:00:10 | +| 11 | 11.0 | 1970-01-01T00:00:11 | +| 12 | 12.0 | 1970-01-01T00:00:12 | +| 13 | 13.0 | 1970-01-01T00:00:13 | +| 14 | 14.0 | 1970-01-01T00:00:14 | ++-------+---------+---------------------+"; + assert_eq!( + batches.pretty_print().unwrap(), + expected, + "actual: {}", + batches.pretty_print().unwrap() + ); +} diff --git a/src/mito2/src/engine/partition_filter_test.rs b/src/mito2/src/engine/partition_filter_test.rs new file mode 100644 index 0000000000..da3d472c99 --- /dev/null +++ b/src/mito2/src/engine/partition_filter_test.rs @@ -0,0 +1,168 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Tests for partition expression filtering in SST readers. + +use api::v1::Rows; +use common_recordbatch::RecordBatches; +use datatypes::value::Value; +use partition::expr::col; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{EnterStagingRequest, RegionFlushRequest, RegionRequest}; +use store_api::storage::{RegionId, ScanRequest}; + +use crate::config::MitoConfig; +use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema}; + +/// Helper to create a partition expression for testing. +/// Creates `col_name >= start AND col_name < end`. +fn range_expr_string(col_name: &str, start: f64, end: f64) -> String { + col(col_name) + .gt_eq(Value::Float64(start.into())) + .and(col(col_name).lt(Value::Float64(end.into()))) + .as_json_str() + .unwrap() +} + +#[tokio::test] +async fn test_partition_filter_basic() { + test_partition_filter_basic_with_format(false).await; + test_partition_filter_basic_with_format(true).await; +} + +/// Test that partition filter correctly filters rows from SST. +/// +/// This test: +/// 1. Creates a region with partition expr `tag_0 >= "0"` (covers all data) +/// 2. Writes data (tag_0 = "0"..="9") and flushes - SST has old partition expr +/// 3. Enters staging mode with partition expr `tag_0 >= "5"` (narrower) +/// 4. Writes more data and flushes in staging mode +/// 5. Exits staging mode - region now has new partition expr +/// 6. Scans all data - old SST data not matching new expr should be filtered +async fn test_partition_filter_basic_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1024, 0); + + // Create region with initial partition expr: tag_0 >= "0" (covers all data) + let initial_partition_expr = range_expr_string("field_0", 0., 99.); + let request = CreateRequestBuilder::new() + .partition_expr_json(Some(initial_partition_expr)) + .build(); + let column_schemas = rows_schema(&request); + + // Create region + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Write initial data (tag_0 = "0".."4") and flush to SST + // This SST will have partition_expr = "tag_0 >= 0" + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 5), + }; + put_rows(&engine, region_id, rows_data).await; + + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + // Enter staging mode with narrower partition expr: tag_0 >= "5" + // After staging, region will only accept data where tag_0 >= "5" + let new_partition_expr = range_expr_string("field_0", 5., 99.); + engine + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_expr: new_partition_expr.clone(), + }), + ) + .await + .unwrap(); + + // Write data in staging mode (tag_0 = "5".."11") + let rows_data = Rows { + schema: column_schemas.clone(), + rows: build_rows(5, 11), + }; + put_rows(&engine, region_id, rows_data).await; + + // Flush in staging mode + engine + .handle_request( + region_id, + RegionRequest::Flush(RegionFlushRequest { + row_group_size: None, + }), + ) + .await + .unwrap(); + + // Scan data in staging mode - should only see initial 5 rows (staging SST not visible) + let request = ScanRequest { + projection: Some(vec![1]), + ..Default::default() + }; + let scanner = engine.scanner(region_id, request).await.unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); + assert_eq!( + total_rows, 5, + "Should see 5 rows before exiting staging mode" + ); + + // Exit staging mode + use store_api::region_engine::SettableRegionRoleState; + engine + .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader) + .await + .unwrap(); + + // Scan after exiting staging - the old SST (tag_0 = "0".."4") should have + // rows filtered by partition expr (tag_0 >= "5"), which means none of them pass. + // But the staging SST (tag_0 = "5".."10") satisfies the partition expr. + let request = ScanRequest { + projection: Some(vec![1]), + ..Default::default() + }; + let scanner = engine.scanner(region_id, request).await.unwrap(); + let stream = scanner.scan().await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum(); + // After exit: + // - Old SST (rows 0-4): partition expr is "tag_0 >= 5", so these are filtered out + // - Staging SST (rows 5-10): These satisfy partition expr, so they're visible + assert_eq!( + total_rows, 6, + "Should see 6 rows after exiting staging (rows 5-10 from staging SST), flat_format: {}", + flat_format + ); +} diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index 587e25298f..d54f8e4780 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -388,7 +388,7 @@ async fn test_staging_exit_success_with_manifests() { async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); - let partition_expr = default_partition_expr(); + let partition_expr = range_expr("field_0", 0, 100).as_json_str().unwrap(); let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f6385a292a..22296375d3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -413,6 +413,14 @@ pub enum Error { error: datatypes::arrow::error::ArrowError, }, + #[snafu(display("Failed to evaluate partition filter"))] + EvalPartitionFilter { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: datafusion::error::DataFusionError, + }, + #[snafu(display("Failed to compute vector"))] ComputeVector { #[snafu(implicit)] @@ -1266,6 +1274,7 @@ impl ErrorExt for Error { | Recv { .. } | DecodeWal { .. } | ComputeArrow { .. } + | EvalPartitionFilter { .. } | BiErrors { .. } | StopScheduler { .. } | ComputeVector { .. } diff --git a/src/mito2/src/memtable/bulk/context.rs b/src/mito2/src/memtable/bulk/context.rs index 67187669e3..f1c7d59013 100644 --- a/src/mito2/src/memtable/bulk/context.rs +++ b/src/mito2/src/memtable/bulk/context.rs @@ -98,6 +98,7 @@ impl BulkIterContext { // we don't need to compat batch since all batch in memtable have the same schema. compat_batch: None, pre_filter_mode, + partition_filter: None, }, predicate, }) diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index d779f1ff04..52ea70348a 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -28,7 +28,7 @@ use crate::memtable::bulk::part::EncodedBulkPart; use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder; use crate::memtable::{MemScanMetrics, MemScanMetricsData}; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; -use crate::sst::parquet::file_range::PreFilterMode; +use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState}; use crate::sst::parquet::flat_format::sequence_column_index; use crate::sst::parquet::reader::RowGroupReaderContext; @@ -340,12 +340,15 @@ fn apply_combined_filters( let num_rows = record_batch.num_rows(); let mut combined_filter = None; + let mut tag_decode_state = TagDecodeState::new(); // First, apply predicate filters using the shared method. if !context.base.filters.is_empty() { - let predicate_mask = context - .base - .compute_filter_mask_flat(&record_batch, skip_fields)?; + let predicate_mask = context.base.compute_filter_mask_flat( + &record_batch, + skip_fields, + &mut tag_decode_state, + )?; // If predicate filters out the entire batch, return None early let Some(mask) = predicate_mask else { return Ok(None); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 22cc9fb3ba..82dc475d36 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -119,7 +119,7 @@ impl PruneReader { /// Prunes batches by the pushed down predicate. fn prune(&mut self, batch: Batch) -> Result> { // fast path - if self.context.filters().is_empty() { + if self.context.filters().is_empty() && !self.context.has_partition_filter() { return Ok(Some(batch)); } @@ -315,7 +315,7 @@ impl FlatPruneReader { /// Prunes batches by the pushed down predicate and returns RecordBatch. fn prune_flat(&mut self, record_batch: RecordBatch) -> Result> { // fast path - if self.context.filters().is_empty() { + if self.context.filters().is_empty() && !self.context.has_partition_filter() { return Ok(Some(record_batch)); } diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 1d54eadac6..051c4b6986 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -21,10 +21,12 @@ use std::sync::Arc; use api::v1::{OpType, SemanticType}; use common_telemetry::error; +use datafusion::physical_plan::PhysicalExpr; use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr; use datatypes::arrow::array::{ArrayRef, BooleanArray}; use datatypes::arrow::buffer::BooleanBuffer; use datatypes::arrow::record_batch::RecordBatch; +use datatypes::prelude::ConcreteDataType; use datatypes::schema::Schema; use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec}; use parquet::arrow::arrow_reader::RowSelection; @@ -36,8 +38,9 @@ use store_api::storage::{ColumnId, TimeSeriesRowSelector}; use table::predicate::Predicate; use crate::error::{ - ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, - Result, StatsNotPresentSnafu, + ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, + EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu, + UnexpectedSnafu, }; use crate::read::Batch; use crate::read::compat::CompatBatch; @@ -301,6 +304,11 @@ impl FileRangeContext { &self.base.filters } + /// Returns true if a partition filter is configured. + pub(crate) fn has_partition_filter(&self) -> bool { + self.base.partition_filter.is_some() + } + /// Returns the format helper. pub(crate) fn read_format(&self) -> &ReadFormat { &self.base.read_format @@ -323,11 +331,13 @@ impl FileRangeContext { /// TRY THE BEST to perform pushed down predicate precisely on the input batch. /// Return the filtered batch. If the entire batch is filtered out, return None. + /// If a partition expr filter is configured, it is also applied. pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result> { self.base.precise_filter(input, skip_fields) } /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. + /// If a partition expr filter is configured, it is also applied. pub(crate) fn precise_filter_flat( &self, input: RecordBatch, @@ -367,6 +377,14 @@ pub enum PreFilterMode { SkipFields, } +/// Context for partition expression filtering. +pub(crate) struct PartitionFilterContext { + pub(crate) region_partition_physical_expr: Arc, + /// Schema containing only columns referenced by the partition expression. + /// This is used to build a minimal RecordBatch for partition filter evaluation. + pub(crate) partition_schema: Arc, +} + /// Common fields for a range to read and filter batches. pub(crate) struct RangeBase { /// Filters pushed down. @@ -384,6 +402,22 @@ pub(crate) struct RangeBase { pub(crate) compat_batch: Option, /// Mode to pre-filter columns. pub(crate) pre_filter_mode: PreFilterMode, + /// Partition filter. + pub(crate) partition_filter: Option, +} + +pub(crate) struct TagDecodeState { + decoded_pks: Option, + decoded_tag_cache: HashMap, +} + +impl TagDecodeState { + pub(crate) fn new() -> Self { + Self { + decoded_pks: None, + decoded_tag_cache: HashMap::new(), + } + } } impl RangeBase { @@ -483,9 +517,36 @@ impl RangeBase { mask = mask.bitand(&result); } - input.filter(&BooleanArray::from(mask).into())?; + if mask.count_set_bits() == 0 { + return Ok(None); + } - Ok(Some(input)) + // Apply partition filter + if let Some(partition_filter) = &self.partition_filter { + let partition_result = self + .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema) + .and_then(|record_batch| { + self.evaluate_partition_filter(&record_batch, partition_filter) + }); + match partition_result { + Ok(partition_mask) => { + mask = mask.bitand(&partition_mask); + } + Err(err) => { + // FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection + // we will fail to evaluate partition filter since column is missing. + // we had to overlook the error for now. + error!(err; "Failed to evaluate partition filter, skip partition filtering"); + } + } + } + + if mask.count_set_bits() == 0 { + Ok(None) + } else { + input.filter(&BooleanArray::from(mask).into())?; + Ok(Some(input)) + } } /// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch. @@ -500,13 +561,42 @@ impl RangeBase { input: RecordBatch, skip_fields: bool, ) -> Result> { - let mask = self.compute_filter_mask_flat(&input, skip_fields)?; + let mut tag_decode_state = TagDecodeState::new(); + let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?; // If mask is None, the entire batch is filtered out - let Some(mask) = mask else { + let Some(mut mask) = mask else { return Ok(None); }; + // Apply partition filter + if let Some(partition_filter) = &self.partition_filter { + let partition_result = self + .project_record_batch_for_pruning_flat( + &input, + &partition_filter.partition_schema, + &mut tag_decode_state, + ) + .and_then(|record_batch| { + self.evaluate_partition_filter(&record_batch, partition_filter) + }); + match partition_result { + Ok(partition_mask) => { + mask = mask.bitand(&partition_mask); + } + Err(err) => { + // FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection + // we will fail to evaluate partition filter since column is missing. + // we had to overlook the error for now. + error!(err; "Failed to evaluate partition filter, skip partition filtering"); + } + } + } + + if mask.count_set_bits() == 0 { + return Ok(None); + } + let filtered_batch = datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask)) .context(ComputeArrowSnafu)?; @@ -519,6 +609,7 @@ impl RangeBase { } /// Computes the filter mask for the input RecordBatch based on pushed down predicates. + /// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function. /// /// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask. /// @@ -529,6 +620,7 @@ impl RangeBase { &self, input: &RecordBatch, skip_fields: bool, + tag_decode_state: &mut TagDecodeState, ) -> Result> { let mut mask = BooleanBuffer::new_set(input.num_rows()); @@ -538,11 +630,7 @@ impl RangeBase { .context(crate::error::UnexpectedSnafu { reason: "Expected flat format for precise_filter_flat", })?; - - // Decodes primary keys once if we have any tag filters not in projection - let mut decoded_pks: Option = None; - // Cache decoded tag arrays by column id to avoid redundant decoding - let mut decoded_tag_cache: HashMap = HashMap::new(); + let metadata = flat_format.metadata(); // Run filter one by one and combine them result for filter_ctx in &self.filters { @@ -567,47 +655,16 @@ impl RangeBase { mask = mask.bitand(&result); } else if filter_ctx.semantic_type() == SemanticType::Tag { // Column not found in projection, it may be a tag column. - // Decodes primary keys if not already decoded. - if decoded_pks.is_none() { - decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?); - } - - let metadata = flat_format.metadata(); let column_id = filter_ctx.column_id(); - // Check cache first - let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) { - cached_column.clone() - } else { - // For dense encoding, we need pk_index. For sparse encoding, pk_index is None. - let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse { - None - } else { - metadata.primary_key_index(column_id) - }; - let column_index = metadata.column_index_by_id(column_id); - - if let (Some(column_index), Some(decoded)) = - (column_index, decoded_pks.as_ref()) - { - let column_metadata = &metadata.column_metadatas[column_index]; - let tag_column = decoded.get_tag_column( - column_id, - pk_index, - &column_metadata.column_schema.data_type, - )?; - // Cache the decoded tag column - decoded_tag_cache.insert(column_id, tag_column.clone()); - tag_column - } else { - continue; - } - }; - - let result = filter - .evaluate_array(&tag_column) - .context(RecordBatchSnafu)?; - mask = mask.bitand(&result); + if let Some(tag_column) = + self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)? + { + let result = filter + .evaluate_array(&tag_column) + .context(RecordBatchSnafu)?; + mask = mask.bitand(&result); + } } else if filter_ctx.semantic_type() == SemanticType::Timestamp { let time_index_pos = time_index_column_index(input.num_columns()); let column = &input.columns()[time_index_pos]; @@ -619,4 +676,218 @@ impl RangeBase { Ok(Some(mask)) } + + /// Returns the decoded tag column for `column_id`, or `None` if it's not a tag. + fn maybe_decode_tag_column( + &self, + metadata: &RegionMetadataRef, + column_id: ColumnId, + input: &RecordBatch, + tag_decode_state: &mut TagDecodeState, + ) -> Result> { + let Some(pk_index) = metadata.primary_key_index(column_id) else { + return Ok(None); + }; + + if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) { + return Ok(Some(cached_column.clone())); + } + + if tag_decode_state.decoded_pks.is_none() { + tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?); + } + + let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse { + None + } else { + Some(pk_index) + }; + let Some(column_index) = metadata.column_index_by_id(column_id) else { + return Ok(None); + }; + let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else { + return Ok(None); + }; + + let column_metadata = &metadata.column_metadatas[column_index]; + let tag_column = decoded.get_tag_column( + column_id, + pk_index, + &column_metadata.column_schema.data_type, + )?; + tag_decode_state + .decoded_tag_cache + .insert(column_id, tag_column.clone()); + + Ok(Some(tag_column)) + } + + /// Evaluates the partition filter against the input `RecordBatch`. + fn evaluate_partition_filter( + &self, + record_batch: &RecordBatch, + partition_filter: &PartitionFilterContext, + ) -> Result { + let columnar_value = partition_filter + .region_partition_physical_expr + .evaluate(record_batch) + .context(EvalPartitionFilterSnafu)?; + let array = columnar_value + .into_array(record_batch.num_rows()) + .context(EvalPartitionFilterSnafu)?; + let boolean_array = + array + .as_any() + .downcast_ref::() + .context(UnexpectedSnafu { + reason: "Failed to downcast to BooleanArray".to_string(), + })?; + + Ok(boolean_array.values().clone()) + } + + /// Builds a `RecordBatch` from the input `Batch` matching the given schema. + /// + /// This is used for partition expression evaluation. The schema should only contain + /// the columns referenced by the partition expression to minimize overhead. + fn build_record_batch_for_pruning( + &self, + input: &mut Batch, + schema: &Arc, + ) -> Result { + let arrow_schema = schema.arrow_schema(); + let mut columns = Vec::with_capacity(arrow_schema.fields().len()); + + // Decode primary key if necessary. + if input.pk_values().is_none() { + input.set_pk_values( + self.codec + .decode(input.primary_key()) + .context(DecodeSnafu)?, + ); + } + + for field in arrow_schema.fields() { + let metadata = self.read_format.metadata(); + let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id); + + // Partition pruning schema should be a subset of the input batch schema. + let Some(column_id) = column_id else { + return UnexpectedSnafu { + reason: format!( + "Partition pruning schema expects column '{}' but it is missing in \ + region metadata", + field.name() + ), + } + .fail(); + }; + + // 1. Check if it's a tag. + if let Some(pk_index) = metadata.primary_key_index(column_id) { + let pk_values = input.pk_values().unwrap(); + let value = match pk_values { + CompositeValues::Dense(v) => &v[pk_index].1, + CompositeValues::Sparse(v) => v.get_or_null(column_id), + }; + let concrete_type = ConcreteDataType::from_arrow_type(field.data_type()); + let arrow_scalar = value + .try_to_scalar_value(&concrete_type) + .context(DataTypeMismatchSnafu)?; + let array = arrow_scalar + .to_array_of_size(input.num_rows()) + .context(EvalPartitionFilterSnafu)?; + columns.push(array); + } else if metadata.time_index_column().column_id == column_id { + // 2. Check if it's the timestamp column. + columns.push(input.timestamps().to_arrow_array()); + } else if let Some(field_index) = self + .read_format + .as_primary_key() + .and_then(|f| f.field_index_by_id(column_id)) + { + // 3. Check if it's a field column. + columns.push(input.fields()[field_index].data.to_arrow_array()); + } else { + return UnexpectedSnafu { + reason: format!( + "Partition pruning schema expects column '{}' (id {}) but it is not \ + present in input batch", + field.name(), + column_id + ), + } + .fail(); + } + } + + RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu) + } + + /// Projects the input `RecordBatch` to match the given schema. + /// + /// This is used for partition expression evaluation. The schema should only contain + /// the columns referenced by the partition expression to minimize overhead. + fn project_record_batch_for_pruning_flat( + &self, + input: &RecordBatch, + schema: &Arc, + tag_decode_state: &mut TagDecodeState, + ) -> Result { + let arrow_schema = schema.arrow_schema(); + let mut columns = Vec::with_capacity(arrow_schema.fields().len()); + + let flat_format = self + .read_format + .as_flat() + .context(crate::error::UnexpectedSnafu { + reason: "Expected flat format for precise_filter_flat", + })?; + let metadata = flat_format.metadata(); + + for field in arrow_schema.fields() { + let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id); + + let Some(column_id) = column_id else { + return UnexpectedSnafu { + reason: format!( + "Partition pruning schema expects column '{}' but it is missing in \ + region metadata", + field.name() + ), + } + .fail(); + }; + + if let Some(idx) = flat_format.projected_index_by_id(column_id) { + columns.push(input.column(idx).clone()); + continue; + } + + if metadata.time_index_column().column_id == column_id { + let time_index_pos = time_index_column_index(input.num_columns()); + columns.push(input.column(time_index_pos).clone()); + continue; + } + + if let Some(tag_column) = + self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)? + { + columns.push(tag_column); + continue; + } + + return UnexpectedSnafu { + reason: format!( + "Partition pruning schema expects column '{}' (id {}) but it is not \ + present in projected record batch", + field.name(), + column_id + ), + } + .fail(); + } + + RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu) + } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 1545a240cc..9be2907099 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -16,7 +16,7 @@ #[cfg(feature = "vector_index")] use std::collections::BTreeSet; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -26,9 +26,11 @@ use common_recordbatch::filter::SimpleFilterEvaluator; use common_telemetry::{debug, tracing, warn}; use datafusion_expr::Expr; use datatypes::arrow::array::ArrayRef; +use datatypes::arrow::datatypes::Field; use datatypes::arrow::error::ArrowError; use datatypes::arrow::record_batch::RecordBatch; use datatypes::data_type::ConcreteDataType; +use datatypes::prelude::DataType; use mito_codec::row_converter::build_primary_key_codec; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; @@ -46,7 +48,7 @@ use crate::cache::index::result_cache::PredicateKey; use crate::error::ApplyVectorIndexSnafu; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, - ReadParquetSnafu, Result, + ReadParquetSnafu, Result, SerializePartitionExprSnafu, }; use crate::metrics::{ PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL, @@ -67,7 +69,8 @@ use crate::sst::index::inverted_index::applier::{ #[cfg(feature = "vector_index")] use crate::sst::index::vector_index::applier::VectorIndexApplierRef; use crate::sst::parquet::file_range::{ - FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete, + FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, + row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; @@ -75,6 +78,7 @@ use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics}; use crate::sst::parquet::row_selection::RowGroupSelection; use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; +use crate::sst::tag_maybe_to_dictionary_field; const INDEX_TYPE_FULLTEXT: &str = "fulltext"; const INDEX_TYPE_INVERTED: &str = "inverted"; @@ -438,6 +442,8 @@ impl ParquetReaderBuilder { let codec = build_primary_key_codec(read_format.metadata()); + let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?; + let context = FileRangeContext::new( reader_builder, RangeBase { @@ -449,6 +455,7 @@ impl ParquetReaderBuilder { codec, compat_batch: None, pre_filter_mode: self.pre_filter_mode, + partition_filter, }, ); @@ -457,6 +464,76 @@ impl ParquetReaderBuilder { Ok((context, selection)) } + /// Compare partition expressions from expected metadata and file metadata, + /// and build a partition filter if they differ. + fn build_partition_filter( + &self, + read_format: &ReadFormat, + prune_schema: &Arc, + ) -> Result> { + let region_partition_expr_str = self + .expected_metadata + .as_ref() + .and_then(|meta| meta.partition_expr.as_ref()); + let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref(); + + let Some(region_str) = region_partition_expr_str else { + return Ok(None); + }; + + let Some(region_partition_expr) = crate::region::parse_partition_expr(Some(region_str))? + else { + return Ok(None); + }; + + if Some(®ion_partition_expr) == file_partition_expr_ref { + return Ok(None); + } + + // Collect columns referenced by the partition expression. + let mut referenced_columns = HashSet::new(); + region_partition_expr.collect_column_names(&mut referenced_columns); + + // Build a partition_schema containing only referenced columns. + let is_flat = read_format.as_flat().is_some(); + let partition_schema = Arc::new(datatypes::schema::Schema::new( + prune_schema + .column_schemas() + .iter() + .filter(|col| referenced_columns.contains(&col.name)) + .map(|col| { + if is_flat + && let Some(column_meta) = read_format.metadata().column_by_name(&col.name) + && column_meta.semantic_type == SemanticType::Tag + && col.data_type.is_string() + { + let field = Arc::new(Field::new( + &col.name, + col.data_type.as_arrow_type(), + col.is_nullable(), + )); + let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field); + let mut column = col.clone(); + column.data_type = + ConcreteDataType::from_arrow_type(dict_field.data_type()); + return column; + } + + col.clone() + }) + .collect::>(), + )); + + let region_partition_physical_expr = region_partition_expr + .try_as_physical_expr(partition_schema.arrow_schema()) + .context(SerializePartitionExprSnafu)?; + + Ok(Some(PartitionFilterContext { + region_partition_physical_expr, + partition_schema, + })) + } + /// Decodes region metadata from key value. fn get_region_metadata( file_path: &str, diff --git a/src/partition/src/expr.rs b/src/partition/src/expr.rs index 56e77e3490..1e7b573acb 100644 --- a/src/partition/src/expr.rs +++ b/src/partition/src/expr.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use std::sync::Arc; @@ -405,6 +406,24 @@ impl PartitionExpr { ..Default::default() }) } + + /// Collects all column names referenced by this expression. + pub fn collect_column_names(&self, columns: &mut HashSet) { + Self::collect_operand_columns(&self.lhs, columns); + Self::collect_operand_columns(&self.rhs, columns); + } + + fn collect_operand_columns(operand: &Operand, columns: &mut HashSet) { + match operand { + Operand::Column(c) => { + columns.insert(c.clone()); + } + Operand::Expr(e) => { + e.collect_column_names(columns); + } + Operand::Value(_) => {} + } + } } impl Display for PartitionExpr { @@ -620,4 +639,45 @@ mod tests { let expr5 = PartitionExpr::from_json_str(json).unwrap(); assert!(expr5.is_none()); } + + #[test] + fn test_collect_column_names() { + // Simple expression: col_a = 1 should give {col_a} + let expr = col("a").eq(Value::Int64(1)); + let mut columns = HashSet::new(); + expr.collect_column_names(&mut columns); + assert_eq!(columns.len(), 1); + assert!(columns.contains("a")); + + // Compound AND with same column: col_a >= 0 AND col_a < 10 should give {col_a} + let expr = col("a") + .gt_eq(Value::Int64(0)) + .and(col("a").lt(Value::Int64(10))); + let mut columns = HashSet::new(); + expr.collect_column_names(&mut columns); + assert_eq!(columns.len(), 1); + assert!(columns.contains("a")); + + // Multiple columns: col_a >= 0 AND col_b < 10 should give {col_a, col_b} + let expr = col("a") + .gt_eq(Value::Int64(0)) + .and(col("b").lt(Value::Int64(10))); + let mut columns = HashSet::new(); + expr.collect_column_names(&mut columns); + assert_eq!(columns.len(), 2); + assert!(columns.contains("a")); + assert!(columns.contains("b")); + + // Nested expression: (col_a >= 0 AND col_b < 10) AND col_c = 5 + let expr = col("a") + .gt_eq(Value::Int64(0)) + .and(col("b").lt(Value::Int64(10))) + .and(col("c").eq(Value::Int64(5))); + let mut columns = HashSet::new(); + expr.collect_column_names(&mut columns); + assert_eq!(columns.len(), 3); + assert!(columns.contains("a")); + assert!(columns.contains("b")); + assert!(columns.contains("c")); + } }