feat: exact partition filter (#7571)

* feat(mito2): add repartition tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: filter(VIBED NOT REVIEW YET)

Signed-off-by: discord9 <discord9@163.com>

* feat: only use related columns

Signed-off-by: discord9 <discord9@163.com>

* feat: add partition filter tests and enhance pruning logic

Signed-off-by: discord9 <discord9@163.com>

* pre review

Signed-off-by: discord9 <discord9@163.com>

* feat: refine partition filter logic and update related function names

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* c

Signed-off-by: discord9 <discord9@163.com>

* rm useless test

Signed-off-by: discord9 <discord9@163.com>

* feat: enhance partition filter error handling to skip failures

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* test: use real column

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

* feat: add TagDecodeState initialization to filter processing

Signed-off-by: discord9 <discord9@163.com>

* chore: update test doc

Signed-off-by: discord9 <discord9@163.com>

* per review

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
discord9
2026-01-19 21:06:32 +08:00
committed by GitHub
parent 653740b42a
commit d916409d04
11 changed files with 1014 additions and 60 deletions

View File

@@ -80,6 +80,8 @@ mod remap_manifests_test;
#[cfg(test)]
mod apply_staging_manifest_test;
#[cfg(test)]
mod partition_filter_test;
mod puffin_index;
use std::any::Any;

View File

@@ -16,6 +16,7 @@ use std::assert_matches::assert_matches;
use std::fs;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use partition::expr::{PartitionExpr, col};
use store_api::region_engine::{
@@ -26,6 +27,7 @@ use store_api::region_request::{
};
use store_api::storage::{FileId, RegionId};
use super::ScanRequest;
use crate::config::MitoConfig;
use crate::error::Error;
use crate::manifest::action::RegionManifest;
@@ -451,3 +453,364 @@ async fn test_apply_staging_manifest_invalid_files_to_add_with_format(flat_forma
Error::SerdeJson { .. }
);
}
/// Splits one region into two target regions through staging manifests and checks
/// what each target returns on scan.
///
/// The first target reuses the source region id, so the split keeps the source region
/// and adds one new region. After the staging manifests are applied, both targets hold
/// the same number of files as the source, yet each scan is expected to return only the
/// rows matching that target's own partition expression. Despite the test name, the
/// assertions verify that rows are NOT duplicated across the two targets.
#[tokio::test]
async fn test_split_repartition_causes_duplicate_data() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("split-duplicate").await;
let engine = env.create_engine(MitoConfig::default()).await;
let source_region_id = RegionId::new(1, 1);
// Target 1 has the same region id as the source.
let target_region_id_1 = RegionId::new(1, 1);
let target_region_id_2 = RegionId::new(1, 3);
// Use field_0 for partitioning. The bounds are float literals; field_0 is rendered
// as 0.0, 1.0, ... in the expected scan output below.
let source_partition_expr = col("field_0").lt(Value::from(10.00));
let target_partition_expr_1 = col("field_0").lt(Value::from(5.00));
let target_partition_expr_2 = col("field_0")
.gt_eq(Value::from(5.00))
.and(col("field_0").lt(Value::from(10.00)));
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(source_partition_expr.as_json_str().unwrap()))
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(source_region_id, RegionRequest::Create(request))
.await
.unwrap();
// Insert 0..10
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 10),
};
put_rows(&engine, source_region_id, rows_data).await;
engine
.handle_request(
source_region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// The flush is expected to leave exactly one file in the source manifest.
let source_region = engine.get_region(source_region_id).unwrap();
let source_manifest = source_region.manifest_ctx.manifest().await;
let source_file_count = source_manifest.files.len();
assert_eq!(source_file_count, 1);
// Enter staging on the source region (which is also target 1) with target 1's expr.
engine
.handle_request(
source_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr_1.as_json_str().unwrap(),
}),
)
.await
.unwrap();
// Remap: source -> [target 1, target 2], each with its own new partition expr.
let remap_result = engine
.remap_manifests(RemapManifestsRequest {
region_id: source_region_id,
input_regions: vec![source_region_id],
region_mapping: [(
source_region_id,
vec![target_region_id_1, target_region_id_2],
)]
.into_iter()
.collect(),
new_partition_exprs: [
(
target_region_id_1,
target_partition_expr_1.as_json_str().unwrap(),
),
(
target_region_id_2,
target_partition_expr_2.as_json_str().unwrap(),
),
]
.into_iter()
.collect(),
})
.await
.unwrap();
// Create target 2 and put it into staging before applying its staging manifest.
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(target_partition_expr_2.as_json_str().unwrap()))
.build();
engine
.handle_request(target_region_id_2, RegionRequest::Create(request))
.await
.unwrap();
engine
.handle_request(
target_region_id_2,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr_2.as_json_str().unwrap(),
}),
)
.await
.unwrap();
// Apply the remapped staging manifests to both targets.
engine
.handle_request(
target_region_id_2,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: target_partition_expr_2.as_json_str().unwrap(),
central_region_id: source_region_id,
manifest_path: remap_result.manifest_paths[&target_region_id_2].clone(),
}),
)
.await
.unwrap();
engine
.handle_request(
target_region_id_1,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: target_partition_expr_1.as_json_str().unwrap(),
central_region_id: source_region_id,
manifest_path: remap_result.manifest_paths[&target_region_id_1].clone(),
}),
)
.await
.unwrap();
// Both targets are expected to list as many files as the source did.
let target_region_1 = engine.get_region(target_region_id_1).unwrap();
let target_region_2 = engine.get_region(target_region_id_2).unwrap();
let manifest_1 = target_region_1.manifest_ctx.manifest().await;
let manifest_2 = target_region_2.manifest_ctx.manifest().await;
assert_eq!(manifest_1.files.len(), source_file_count);
assert_eq!(manifest_2.files.len(), source_file_count);
// Verify via scan that target 1 only returns rows with field_0 < 5, i.e. the rows
// belonging to target 2 are not duplicated here.
let scan_request = ScanRequest::default();
let stream = engine
.scan_to_stream(target_region_id_1, scan_request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
assert_eq!(
batches.pretty_print().unwrap(),
expected,
"actual: {}",
batches.pretty_print().unwrap()
);
// Target 2 must only return rows with 5 <= field_0 < 10.
let stream = engine
.scan_to_stream(target_region_id_2, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+";
assert_eq!(
batches.pretty_print().unwrap(),
expected,
"actual: {}",
batches.pretty_print().unwrap()
);
}
/// Merges two source regions into one target region through staging manifests and
/// checks that the target ends up with the files and rows of both sources.
///
/// The target reuses the id of the first source region. Its new partition expression
/// (`field_0 >= 0`) covers the ranges of both sources, so the final scan is expected
/// to return every row written to either source.
#[tokio::test]
async fn test_merge_repartition_data_integrity() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("merge-data").await;
let engine = env.create_engine(MitoConfig::default()).await;
// Merge R1 and R2 into R1.
let source_region_id_1 = RegionId::new(1, 1);
let source_region_id_2 = RegionId::new(1, 2);
// Target is same as Source 1
let target_region_id = source_region_id_1;
let source_partition_expr_1 = col("field_0")
.gt_eq(Value::from(0.00))
.and(col("field_0").lt(Value::from(10.00)));
let source_partition_expr_2 = col("field_0").gt_eq(Value::from(10.00));
// Target covers both
let target_partition_expr = col("field_0").gt_eq(Value::from(0.00));
// Create R1 with its source partition expr; reuse its schema for all inserts.
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(source_partition_expr_1.as_json_str().unwrap()))
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(source_region_id_1, RegionRequest::Create(request))
.await
.unwrap();
// Create R2 with its source partition expr.
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(source_partition_expr_2.as_json_str().unwrap()))
.build();
engine
.handle_request(source_region_id_2, RegionRequest::Create(request))
.await
.unwrap();
// Insert data into R1: 0..5
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 5),
};
put_rows(&engine, source_region_id_1, rows_data).await;
engine
.handle_request(
source_region_id_1,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Insert data into R2: 10..15
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(10, 15),
};
put_rows(&engine, source_region_id_2, rows_data).await;
engine
.handle_request(
source_region_id_2,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Each source is expected to hold exactly one flushed file.
let source_region_1 = engine.get_region(source_region_id_1).unwrap();
let source_region_2 = engine.get_region(source_region_id_2).unwrap();
let source_manifest_1 = source_region_1.manifest_ctx.manifest().await;
let source_manifest_2 = source_region_2.manifest_ctx.manifest().await;
let source_1_file_count = source_manifest_1.files.len();
let source_2_file_count = source_manifest_2.files.len();
assert_eq!(source_1_file_count, 1);
assert_eq!(source_2_file_count, 1);
// Enter staging for target (R1)
engine
.handle_request(
target_region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: target_partition_expr.as_json_str().unwrap(),
}),
)
.await
.unwrap();
// Remap: R1 -> R1, R2 -> R1
let remap_result = engine
.remap_manifests(RemapManifestsRequest {
region_id: target_region_id,
input_regions: vec![source_region_id_1, source_region_id_2],
region_mapping: [
(source_region_id_1, vec![target_region_id]),
(source_region_id_2, vec![target_region_id]),
]
.into_iter()
.collect(),
new_partition_exprs: [(
target_region_id,
target_partition_expr.as_json_str().unwrap(),
)]
.into_iter()
.collect(),
})
.await
.unwrap();
assert_eq!(remap_result.manifest_paths.len(), 1);
// Read the remapped manifest back from the staging blob storage and check that it
// lists the files of both sources. The manifest manager guard is held only for
// this inspection and dropped explicitly before the next engine request.
let target_region = engine.get_region(target_region_id).unwrap();
let manager = target_region.manifest_ctx.manifest_manager.write().await;
let manifest_storage = manager.store();
let blob_store = manifest_storage.staging_storage().blob_storage();
let target_manifest_bytes = blob_store
.get(&remap_result.manifest_paths[&target_region_id])
.await
.unwrap();
let target_manifest = serde_json::from_slice::<RegionManifest>(&target_manifest_bytes).unwrap();
assert_eq!(
target_manifest.files.len(),
source_1_file_count + source_2_file_count,
"Target manifest should have all files from both source regions"
);
drop(manager);
// Apply the staging manifest to the target region.
engine
.handle_request(
target_region_id,
RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
partition_expr: target_partition_expr.as_json_str().unwrap(),
central_region_id: target_region_id,
manifest_path: remap_result.manifest_paths[&target_region_id].clone(),
}),
)
.await
.unwrap();
let target_region = engine.get_region(target_region_id).unwrap();
let manifest = target_region.manifest_ctx.manifest().await;
assert_eq!(
manifest.files.len(),
source_1_file_count + source_2_file_count,
"After applying staging manifest, target region should have all files"
);
// Scan the merged region: rows from both sources (0..5 and 10..15) must be present.
let scan_request = ScanRequest::default();
let stream = engine
.scan_to_stream(target_region_id, scan_request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 10 | 10.0 | 1970-01-01T00:00:10 |
| 11 | 11.0 | 1970-01-01T00:00:11 |
| 12 | 12.0 | 1970-01-01T00:00:12 |
| 13 | 13.0 | 1970-01-01T00:00:13 |
| 14 | 14.0 | 1970-01-01T00:00:14 |
+-------+---------+---------------------+";
assert_eq!(
batches.pretty_print().unwrap(),
expected,
"actual: {}",
batches.pretty_print().unwrap()
);
}

View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Tests for partition expression filtering in SST readers.
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use datatypes::value::Value;
use partition::expr::col;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{EnterStagingRequest, RegionFlushRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::{CreateRequestBuilder, TestEnv, build_rows, put_rows, rows_schema};
/// Builds the JSON form of a half-open range partition expression for tests.
///
/// The resulting expression is `col_name >= start AND col_name < end`, with both
/// bounds encoded as `Float64` values. Panics if the expression cannot be serialized.
fn range_expr_string(col_name: &str, start: f64, end: f64) -> String {
    let lower_bound = col(col_name).gt_eq(Value::Float64(start.into()));
    let upper_bound = col(col_name).lt(Value::Float64(end.into()));
    let range = lower_bound.and(upper_bound);
    range.as_json_str().unwrap()
}
/// Runs the basic partition filter scenario with the flat format disabled, then enabled.
#[tokio::test]
async fn test_partition_filter_basic() {
    for flat_format in [false, true] {
        test_partition_filter_basic_with_format(flat_format).await;
    }
}
/// Test that partition filter correctly filters rows from SST.
///
/// This test:
/// 1. Creates a region with partition expr `field_0 >= 0 AND field_0 < 99`
/// 2. Writes rows `build_rows(0, 5)` and flushes - this SST is written under the initial expr
/// 3. Enters staging mode with the narrower partition expr `field_0 >= 5 AND field_0 < 99`
/// 4. Writes rows `build_rows(5, 11)` and flushes in staging mode
/// 5. Exits staging mode by setting the region role state to `Leader`
/// 6. Scans all data - old SST rows not matching the new expr should be filtered
///
/// `flat_format` is forwarded to `MitoConfig::default_experimental_flat_format`.
async fn test_partition_filter_basic_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1024, 0);
// Create region with initial partition expr: 0 <= field_0 < 99 (covers all data
// written by this test)
let initial_partition_expr = range_expr_string("field_0", 0., 99.);
let request = CreateRequestBuilder::new()
.partition_expr_json(Some(initial_partition_expr))
.build();
let column_schemas = rows_schema(&request);
// Create region
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Write initial data (rows 0..5) and flush to SST.
// This SST is written before the region's partition expr is narrowed.
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 5),
};
put_rows(&engine, region_id, rows_data).await;
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Enter staging mode with narrower partition expr: 5 <= field_0 < 99.
// The rows flushed above fall outside of this new range.
let new_partition_expr = range_expr_string("field_0", 5., 99.);
engine
.handle_request(
region_id,
RegionRequest::EnterStaging(EnterStagingRequest {
partition_expr: new_partition_expr.clone(),
}),
)
.await
.unwrap();
// Write data in staging mode (rows 5..11, i.e. 6 rows)
let rows_data = Rows {
schema: column_schemas.clone(),
rows: build_rows(5, 11),
};
put_rows(&engine, region_id, rows_data).await;
// Flush in staging mode
engine
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
// Scan data in staging mode - should only see initial 5 rows (staging SST not visible)
let request = ScanRequest {
projection: Some(vec![1]),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
assert_eq!(
total_rows, 5,
"Should see 5 rows before exiting staging mode"
);
// Exit staging mode
use store_api::region_engine::SettableRegionRoleState;
engine
.set_region_role_state_gracefully(region_id, SettableRegionRoleState::Leader)
.await
.unwrap();
// Scan after exiting staging - the old SST (rows 0..5) should have all of its
// rows filtered by the new partition expr (field_0 >= 5), which means none of them pass.
// But the staging SST (rows 5..11) satisfies the partition expr.
let request = ScanRequest {
projection: Some(vec![1]),
..Default::default()
};
let scanner = engine.scanner(region_id, request).await.unwrap();
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let total_rows: usize = batches.iter().map(|rb| rb.num_rows()).sum();
// After exit:
// - Old SST (rows 0-4): new partition expr requires field_0 >= 5, so these are filtered out
// - Staging SST (rows 5-10): These satisfy partition expr, so they're visible
assert_eq!(
total_rows, 6,
"Should see 6 rows after exiting staging (rows 5-10 from staging SST), flat_format: {}",
flat_format
);
}

View File

@@ -388,7 +388,7 @@ async fn test_staging_exit_success_with_manifests() {
async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let partition_expr = default_partition_expr();
let partition_expr = range_expr("field_0", 0, 100).as_json_str().unwrap();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {

View File

@@ -413,6 +413,14 @@ pub enum Error {
error: datatypes::arrow::error::ArrowError,
},
#[snafu(display("Failed to evaluate partition filter"))]
EvalPartitionFilter {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: datafusion::error::DataFusionError,
},
#[snafu(display("Failed to compute vector"))]
ComputeVector {
#[snafu(implicit)]
@@ -1266,6 +1274,7 @@ impl ErrorExt for Error {
| Recv { .. }
| DecodeWal { .. }
| ComputeArrow { .. }
| EvalPartitionFilter { .. }
| BiErrors { .. }
| StopScheduler { .. }
| ComputeVector { .. }

View File

@@ -98,6 +98,7 @@ impl BulkIterContext {
// we don't need to compat batch since all batch in memtable have the same schema.
compat_batch: None,
pre_filter_mode,
partition_filter: None,
},
predicate,
})

View File

@@ -28,7 +28,7 @@ use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;
@@ -340,12 +340,15 @@ fn apply_combined_filters(
let num_rows = record_batch.num_rows();
let mut combined_filter = None;
let mut tag_decode_state = TagDecodeState::new();
// First, apply predicate filters using the shared method.
if !context.base.filters.is_empty() {
let predicate_mask = context
.base
.compute_filter_mask_flat(&record_batch, skip_fields)?;
let predicate_mask = context.base.compute_filter_mask_flat(
&record_batch,
skip_fields,
&mut tag_decode_state,
)?;
// If predicate filters out the entire batch, return None early
let Some(mask) = predicate_mask else {
return Ok(None);

View File

@@ -119,7 +119,7 @@ impl PruneReader {
/// Prunes batches by the pushed down predicate.
fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
// fast path
if self.context.filters().is_empty() {
if self.context.filters().is_empty() && !self.context.has_partition_filter() {
return Ok(Some(batch));
}
@@ -315,7 +315,7 @@ impl FlatPruneReader {
/// Prunes batches by the pushed down predicate and returns RecordBatch.
fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
// fast path
if self.context.filters().is_empty() {
if self.context.filters().is_empty() && !self.context.has_partition_filter() {
return Ok(Some(record_batch));
}

View File

@@ -21,10 +21,12 @@ use std::sync::Arc;
use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
@@ -36,8 +38,9 @@ use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
Result, StatsNotPresentSnafu,
ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
UnexpectedSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
@@ -301,6 +304,11 @@ impl FileRangeContext {
&self.base.filters
}
/// Returns true if a partition filter is configured.
///
/// Prune readers check this together with `filters()` to decide whether a batch
/// can be returned as-is without running any filter.
pub(crate) fn has_partition_filter(&self) -> bool {
self.base.partition_filter.is_some()
}
/// Returns the format helper.
pub(crate) fn read_format(&self) -> &ReadFormat {
&self.base.read_format
@@ -323,11 +331,13 @@ impl FileRangeContext {
/// Applies the pushed down predicate to the input batch on a best-effort basis.
/// Returns the filtered batch. If the entire batch is filtered out, returns `None`.
/// If a partition expr filter is configured, it is also applied.
///
/// This delegates to `precise_filter` on the underlying range base; `skip_fields`
/// is forwarded unchanged.
pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
self.base.precise_filter(input, skip_fields)
}
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
/// If a partition expr filter is configured, it is also applied.
pub(crate) fn precise_filter_flat(
&self,
input: RecordBatch,
@@ -367,6 +377,14 @@ pub enum PreFilterMode {
SkipFields,
}
/// Context for partition expression filtering.
///
/// Holds what is needed to evaluate a region's partition expression against the
/// rows read from a file range.
pub(crate) struct PartitionFilterContext {
/// Physical expression of the region's partition rule. It is evaluated against a
/// `RecordBatch` built with `partition_schema`, and the result is expected to be
/// a boolean array.
pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
/// Schema containing only columns referenced by the partition expression.
/// This is used to build a minimal RecordBatch for partition filter evaluation.
pub(crate) partition_schema: Arc<Schema>,
}
/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
/// Filters pushed down.
@@ -384,6 +402,22 @@ pub(crate) struct RangeBase {
pub(crate) compat_batch: Option<CompatBatch>,
/// Mode to pre-filter columns.
pub(crate) pre_filter_mode: PreFilterMode,
/// Partition filter.
pub(crate) partition_filter: Option<PartitionFilterContext>,
}
/// Scratch state for decoding tag columns of a single input `RecordBatch`.
///
/// It lets the predicate filter and the partition filter share the decoded primary
/// keys and tag arrays instead of decoding them again for each column or filter.
pub(crate) struct TagDecodeState {
/// Decoded primary keys of the batch; populated on first use.
decoded_pks: Option<DecodedPrimaryKeys>,
/// Decoded tag arrays keyed by column id, to avoid redundant decoding.
decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}
impl TagDecodeState {
/// Creates an empty state with nothing decoded yet.
pub(crate) fn new() -> Self {
Self {
decoded_pks: None,
decoded_tag_cache: HashMap::new(),
}
}
}
impl RangeBase {
@@ -483,9 +517,36 @@ impl RangeBase {
mask = mask.bitand(&result);
}
input.filter(&BooleanArray::from(mask).into())?;
if mask.count_set_bits() == 0 {
return Ok(None);
}
Ok(Some(input))
// Apply partition filter
if let Some(partition_filter) = &self.partition_filter {
let partition_result = self
.build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)
.and_then(|record_batch| {
self.evaluate_partition_filter(&record_batch, partition_filter)
});
match partition_result {
Ok(partition_mask) => {
mask = mask.bitand(&partition_mask);
}
Err(err) => {
// FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection
// we will fail to evaluate partition filter since column is missing.
// we had to overlook the error for now.
error!(err; "Failed to evaluate partition filter, skip partition filtering");
}
}
}
if mask.count_set_bits() == 0 {
Ok(None)
} else {
input.filter(&BooleanArray::from(mask).into())?;
Ok(Some(input))
}
}
/// Filters the input RecordBatch by the pushed down predicate and returns RecordBatch.
@@ -500,13 +561,42 @@ impl RangeBase {
input: RecordBatch,
skip_fields: bool,
) -> Result<Option<RecordBatch>> {
let mask = self.compute_filter_mask_flat(&input, skip_fields)?;
let mut tag_decode_state = TagDecodeState::new();
let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;
// If mask is None, the entire batch is filtered out
let Some(mask) = mask else {
let Some(mut mask) = mask else {
return Ok(None);
};
// Apply partition filter
if let Some(partition_filter) = &self.partition_filter {
let partition_result = self
.project_record_batch_for_pruning_flat(
&input,
&partition_filter.partition_schema,
&mut tag_decode_state,
)
.and_then(|record_batch| {
self.evaluate_partition_filter(&record_batch, partition_filter)
});
match partition_result {
Ok(partition_mask) => {
mask = mask.bitand(&partition_mask);
}
Err(err) => {
// FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection
// we will fail to evaluate partition filter since column is missing.
// we had to overlook the error for now.
error!(err; "Failed to evaluate partition filter, skip partition filtering");
}
}
}
if mask.count_set_bits() == 0 {
return Ok(None);
}
let filtered_batch =
datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
.context(ComputeArrowSnafu)?;
@@ -519,6 +609,7 @@ impl RangeBase {
}
/// Computes the filter mask for the input RecordBatch based on pushed down predicates.
/// If a partition expr filter is configured, it is applied later in `precise_filter_flat` but **NOT** in this function.
///
/// Returns `None` if the entire batch is filtered out, otherwise returns the boolean mask.
///
@@ -529,6 +620,7 @@ impl RangeBase {
&self,
input: &RecordBatch,
skip_fields: bool,
tag_decode_state: &mut TagDecodeState,
) -> Result<Option<BooleanBuffer>> {
let mut mask = BooleanBuffer::new_set(input.num_rows());
@@ -538,11 +630,7 @@ impl RangeBase {
.context(crate::error::UnexpectedSnafu {
reason: "Expected flat format for precise_filter_flat",
})?;
// Decodes primary keys once if we have any tag filters not in projection
let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
// Cache decoded tag arrays by column id to avoid redundant decoding
let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();
let metadata = flat_format.metadata();
// Run filter one by one and combine them result
for filter_ctx in &self.filters {
@@ -567,47 +655,16 @@ impl RangeBase {
mask = mask.bitand(&result);
} else if filter_ctx.semantic_type() == SemanticType::Tag {
// Column not found in projection, it may be a tag column.
// Decodes primary keys if not already decoded.
if decoded_pks.is_none() {
decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
}
let metadata = flat_format.metadata();
let column_id = filter_ctx.column_id();
// Check cache first
let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
cached_column.clone()
} else {
// For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
None
} else {
metadata.primary_key_index(column_id)
};
let column_index = metadata.column_index_by_id(column_id);
if let (Some(column_index), Some(decoded)) =
(column_index, decoded_pks.as_ref())
{
let column_metadata = &metadata.column_metadatas[column_index];
let tag_column = decoded.get_tag_column(
column_id,
pk_index,
&column_metadata.column_schema.data_type,
)?;
// Cache the decoded tag column
decoded_tag_cache.insert(column_id, tag_column.clone());
tag_column
} else {
continue;
}
};
let result = filter
.evaluate_array(&tag_column)
.context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
if let Some(tag_column) =
self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
{
let result = filter
.evaluate_array(&tag_column)
.context(RecordBatchSnafu)?;
mask = mask.bitand(&result);
}
} else if filter_ctx.semantic_type() == SemanticType::Timestamp {
let time_index_pos = time_index_column_index(input.num_columns());
let column = &input.columns()[time_index_pos];
@@ -619,4 +676,218 @@ impl RangeBase {
Ok(Some(mask))
}
/// Returns the decoded tag column for `column_id`, or `None` if it's not a tag.
///
/// Decoded primary keys and decoded tag arrays are memoized in `tag_decode_state`,
/// so repeated lookups for the same batch decode at most once per column.
fn maybe_decode_tag_column(
    &self,
    metadata: &RegionMetadataRef,
    column_id: ColumnId,
    input: &RecordBatch,
    tag_decode_state: &mut TagDecodeState,
) -> Result<Option<ArrayRef>> {
    // Columns without a primary key index are not tags.
    let Some(dense_pk_index) = metadata.primary_key_index(column_id) else {
        return Ok(None);
    };

    // Serve from the per-batch cache when this tag was decoded before.
    if let Some(hit) = tag_decode_state.decoded_tag_cache.get(&column_id) {
        return Ok(Some(hit.clone()));
    }

    // Decode the primary keys of the batch on first use.
    if tag_decode_state.decoded_pks.is_none() {
        let decoded = decode_primary_keys(self.codec.as_ref(), input)?;
        tag_decode_state.decoded_pks = Some(decoded);
    }

    // For dense encoding, we need pk_index. For sparse encoding, pk_index is None.
    let pk_index =
        (self.codec.encoding() != PrimaryKeyEncoding::Sparse).then_some(dense_pk_index);

    let (Some(column_index), Some(decoded)) = (
        metadata.column_index_by_id(column_id),
        tag_decode_state.decoded_pks.as_ref(),
    ) else {
        return Ok(None);
    };

    let data_type = &metadata.column_metadatas[column_index]
        .column_schema
        .data_type;
    let tag_column = decoded.get_tag_column(column_id, pk_index, data_type)?;

    // Remember the decoded array for later filters on the same batch.
    tag_decode_state
        .decoded_tag_cache
        .insert(column_id, tag_column.clone());

    Ok(Some(tag_column))
}
/// Evaluates the partition filter against the input `RecordBatch`.
fn evaluate_partition_filter(
&self,
record_batch: &RecordBatch,
partition_filter: &PartitionFilterContext,
) -> Result<BooleanBuffer> {
let columnar_value = partition_filter
.region_partition_physical_expr
.evaluate(record_batch)
.context(EvalPartitionFilterSnafu)?;
let array = columnar_value
.into_array(record_batch.num_rows())
.context(EvalPartitionFilterSnafu)?;
let boolean_array =
array
.as_any()
.downcast_ref::<BooleanArray>()
.context(UnexpectedSnafu {
reason: "Failed to downcast to BooleanArray".to_string(),
})?;
Ok(boolean_array.values().clone())
}
/// Builds a `RecordBatch` from the input `Batch` matching the given schema.
///
/// This is used for partition expression evaluation. The schema should only contain
/// the columns referenced by the partition expression to minimize overhead.
///
/// Each schema field is resolved by name in the region metadata and then taken from:
/// 1. the decoded primary key, expanded to a constant array of `input.num_rows()`
///    rows, if the column is a tag;
/// 2. the batch timestamps, if the column is the time index;
/// 3. the matching field column of the batch otherwise.
///
/// The primary key is decoded lazily, only when a tag column is referenced, and the
/// decoded values are stored back into `input`.
///
/// # Errors
///
/// Returns `Unexpected` if a schema column is missing from the region metadata or
/// from the input batch, and propagates decode and conversion errors.
fn build_record_batch_for_pruning(
    &self,
    input: &mut Batch,
    schema: &Arc<Schema>,
) -> Result<RecordBatch> {
    let arrow_schema = schema.arrow_schema();
    // The metadata does not change while iterating; look it up once.
    let metadata = self.read_format.metadata();
    let mut columns = Vec::with_capacity(arrow_schema.fields().len());

    for field in arrow_schema.fields() {
        // Partition pruning schema should be a subset of the input batch schema.
        let Some(column_id) = metadata.column_by_name(field.name()).map(|c| c.column_id)
        else {
            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' but it is missing in \
                     region metadata",
                    field.name()
                ),
            }
            .fail();
        };

        // 1. Check if it's a tag.
        if let Some(pk_index) = metadata.primary_key_index(column_id) {
            // Decode primary key if necessary.
            if input.pk_values().is_none() {
                let decoded = self
                    .codec
                    .decode(input.primary_key())
                    .context(DecodeSnafu)?;
                input.set_pk_values(decoded);
            }
            let Some(pk_values) = input.pk_values() else {
                return UnexpectedSnafu {
                    reason: "Primary key values are missing after decoding",
                }
                .fail();
            };

            let value = match pk_values {
                CompositeValues::Dense(values) => {
                    // Report an error instead of panicking on an out-of-range index.
                    let Some((_, value)) = values.get(pk_index) else {
                        return UnexpectedSnafu {
                            reason: format!(
                                "Primary key index {} of column '{}' is out of range, \
                                 decoded primary key has {} values",
                                pk_index,
                                field.name(),
                                values.len()
                            ),
                        }
                        .fail();
                    };
                    value
                }
                CompositeValues::Sparse(values) => values.get_or_null(column_id),
            };

            // All rows of a batch share one primary key, so the tag is a constant column.
            let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
            let arrow_scalar = value
                .try_to_scalar_value(&concrete_type)
                .context(DataTypeMismatchSnafu)?;
            let array = arrow_scalar
                .to_array_of_size(input.num_rows())
                .context(EvalPartitionFilterSnafu)?;
            columns.push(array);
        } else if metadata.time_index_column().column_id == column_id {
            // 2. Check if it's the timestamp column.
            columns.push(input.timestamps().to_arrow_array());
        } else if let Some(field_index) = self
            .read_format
            .as_primary_key()
            .and_then(|f| f.field_index_by_id(column_id))
        {
            // 3. Check if it's a field column.
            columns.push(input.fields()[field_index].data.to_arrow_array());
        } else {
            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in input batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }
    }

    RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
/// Selects and arranges columns of a flat-format `RecordBatch` so they match `schema`.
///
/// The result feeds partition expression evaluation, so `schema` is expected to hold only
/// the columns the partition expression references.
///
/// Each schema field is resolved by name through the region metadata. The column is then
/// taken from the projected columns of `input` when the read format knows its index, from
/// the time index position of `input` when it is the time index, or otherwise obtained via
/// `maybe_decode_tag_column`, which is given `tag_decode_state`.
///
/// # Errors
///
/// Fails if the read format is not flat, if a schema column is unknown to the region
/// metadata or cannot be obtained from `input`, if tag decoding fails, or if the final
/// `RecordBatch` cannot be constructed.
fn project_record_batch_for_pruning_flat(
    &self,
    input: &RecordBatch,
    schema: &Arc<Schema>,
    tag_decode_state: &mut TagDecodeState,
) -> Result<RecordBatch> {
    let arrow_schema = schema.arrow_schema();
    let mut columns = Vec::with_capacity(arrow_schema.fields().len());

    let flat_format = self
        .read_format
        .as_flat()
        .context(crate::error::UnexpectedSnafu {
            reason: "Expected flat format for precise_filter_flat",
        })?;
    let metadata = flat_format.metadata();

    for field in arrow_schema.fields() {
        let Some(column) = metadata.column_by_name(field.name()) else {
            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' but it is missing in \
                     region metadata",
                    field.name()
                ),
            }
            .fail();
        };
        let column_id = column.column_id;

        let array = if let Some(idx) = flat_format.projected_index_by_id(column_id) {
            // Already part of the projection: reuse the column as is.
            input.column(idx).clone()
        } else if metadata.time_index_column().column_id == column_id {
            let time_index_pos = time_index_column_index(input.num_columns());
            input.column(time_index_pos).clone()
        } else if let Some(tag_column) =
            self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
        {
            tag_column
        } else {
            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        };
        columns.push(array);
    }

    RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
}
}

View File

@@ -16,7 +16,7 @@
#[cfg(feature = "vector_index")]
use std::collections::BTreeSet;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -26,9 +26,11 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, tracing, warn};
use datafusion_expr::Expr;
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::datatypes::Field;
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use mito_codec::row_converter::build_primary_key_codec;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection};
@@ -46,7 +48,7 @@ use crate::cache::index::result_cache::PredicateKey;
use crate::error::ApplyVectorIndexSnafu;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu,
ReadParquetSnafu, Result,
ReadParquetSnafu, Result, SerializePartitionExprSnafu,
};
use crate::metrics::{
PRECISE_FILTER_ROWS_TOTAL, READ_ROW_GROUPS_TOTAL, READ_ROWS_IN_ROW_GROUP_TOTAL,
@@ -67,7 +69,8 @@ use crate::sst::index::inverted_index::applier::{
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::VectorIndexApplierRef;
use crate::sst::parquet::file_range::{
FileRangeContext, FileRangeContextRef, PreFilterMode, RangeBase, row_group_contains_delete,
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
row_group_contains_delete,
};
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
use crate::sst::parquet::metadata::MetadataLoader;
@@ -75,6 +78,7 @@ use crate::sst::parquet::row_group::{InMemoryRowGroup, ParquetFetchMetrics};
use crate::sst::parquet::row_selection::RowGroupSelection;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY};
use crate::sst::tag_maybe_to_dictionary_field;
const INDEX_TYPE_FULLTEXT: &str = "fulltext";
const INDEX_TYPE_INVERTED: &str = "inverted";
@@ -438,6 +442,8 @@ impl ParquetReaderBuilder {
let codec = build_primary_key_codec(read_format.metadata());
let partition_filter = self.build_partition_filter(&read_format, &prune_schema)?;
let context = FileRangeContext::new(
reader_builder,
RangeBase {
@@ -449,6 +455,7 @@ impl ParquetReaderBuilder {
codec,
compat_batch: None,
pre_filter_mode: self.pre_filter_mode,
partition_filter,
},
);
@@ -457,6 +464,76 @@ impl ParquetReaderBuilder {
Ok((context, selection))
}
/// Builds the partition filter for this file when its partition expression differs from
/// the one in the expected region metadata.
///
/// Returns `Ok(None)` when there is no expected metadata or it carries no partition
/// expression, when that expression parses to nothing, or when it equals the partition
/// expression recorded in the file's metadata. Otherwise the returned context holds the
/// region's partition expression as a physical expression, together with the schema it was
/// built against. That schema contains only the columns of `prune_schema` that the
/// expression references.
///
/// In flat format, string tag columns in that schema take the data type chosen by
/// `tag_maybe_to_dictionary_field` instead of their original type.
///
/// # Errors
///
/// Fails if the region partition expression cannot be parsed or cannot be converted into a
/// physical expression over the partition schema.
fn build_partition_filter(
    &self,
    read_format: &ReadFormat,
    prune_schema: &Arc<datatypes::schema::Schema>,
) -> Result<Option<PartitionFilterContext>> {
    // Without an expected partition expression there is nothing to filter by.
    let Some(region_str) = self
        .expected_metadata
        .as_ref()
        .and_then(|meta| meta.partition_expr.as_ref())
    else {
        return Ok(None);
    };
    let Some(region_partition_expr) = crate::region::parse_partition_expr(Some(region_str))?
    else {
        return Ok(None);
    };

    // The file already records the same partition expression, so no filter is needed.
    let file_partition_expr_ref = self.file_handle.meta_ref().partition_expr.as_ref();
    if Some(&region_partition_expr) == file_partition_expr_ref {
        return Ok(None);
    }

    // Collect columns referenced by the partition expression.
    let mut referenced_columns = HashSet::new();
    region_partition_expr.collect_column_names(&mut referenced_columns);

    // Build a partition schema containing only the referenced columns.
    let is_flat = read_format.as_flat().is_some();
    let mut partition_columns = Vec::new();
    for col in prune_schema.column_schemas() {
        if !referenced_columns.contains(&col.name) {
            continue;
        }

        let is_string_tag = is_flat
            && col.data_type.is_string()
            && read_format
                .metadata()
                .column_by_name(&col.name)
                .is_some_and(|column_meta| column_meta.semantic_type == SemanticType::Tag);

        let mut column = col.clone();
        if is_string_tag {
            // Use the type the flat format assigns to this tag so the schema matches the
            // columns the filter will be evaluated on.
            let field = Arc::new(Field::new(
                &col.name,
                col.data_type.as_arrow_type(),
                col.is_nullable(),
            ));
            let dict_field = tag_maybe_to_dictionary_field(&col.data_type, &field);
            column.data_type = ConcreteDataType::from_arrow_type(dict_field.data_type());
        }
        partition_columns.push(column);
    }
    let partition_schema = Arc::new(datatypes::schema::Schema::new(partition_columns));

    let region_partition_physical_expr = region_partition_expr
        .try_as_physical_expr(partition_schema.arrow_schema())
        .context(SerializePartitionExprSnafu)?;

    Ok(Some(PartitionFilterContext {
        region_partition_physical_expr,
        partition_schema,
    }))
}
/// Decodes region metadata from key value.
fn get_region_metadata(
file_path: &str,

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;
@@ -405,6 +406,24 @@ impl PartitionExpr {
..Default::default()
})
}
/// Gathers the name of every column this expression refers to into `columns`.
///
/// Both operands are visited, including any nested expressions. Names are added to
/// whatever the set already contains; nothing is removed.
pub fn collect_column_names(&self, columns: &mut HashSet<String>) {
    for operand in [&self.lhs, &self.rhs] {
        Self::collect_operand_columns(operand, columns);
    }
}
/// Adds the column names reachable from a single operand to `columns`.
fn collect_operand_columns(operand: &Operand, columns: &mut HashSet<String>) {
    match operand {
        // A literal value references no column.
        Operand::Value(_) => {}
        Operand::Column(name) => {
            columns.insert(name.clone());
        }
        // A nested expression contributes the columns of both of its operands.
        Operand::Expr(nested) => nested.collect_column_names(columns),
    }
}
}
impl Display for PartitionExpr {
@@ -620,4 +639,45 @@ mod tests {
let expr5 = PartitionExpr::from_json_str(json).unwrap();
assert!(expr5.is_none());
}
#[test]
fn test_collect_column_names() {
    /// Collects the referenced column names of `expr` into a fresh set.
    fn referenced(expr: &PartitionExpr) -> HashSet<String> {
        let mut names = HashSet::new();
        expr.collect_column_names(&mut names);
        names
    }

    /// Builds the expected set from string literals.
    fn expected(names: &[&str]) -> HashSet<String> {
        names.iter().map(|name| name.to_string()).collect()
    }

    // Simple expression: a = 1 references only {a}.
    let simple = col("a").eq(Value::Int64(1));
    assert_eq!(referenced(&simple), expected(&["a"]));

    // Compound AND over the same column: a >= 0 AND a < 10 still references only {a}.
    let same_column = col("a")
        .gt_eq(Value::Int64(0))
        .and(col("a").lt(Value::Int64(10)));
    assert_eq!(referenced(&same_column), expected(&["a"]));

    // Multiple columns: a >= 0 AND b < 10 references {a, b}.
    let two_columns = col("a")
        .gt_eq(Value::Int64(0))
        .and(col("b").lt(Value::Int64(10)));
    assert_eq!(referenced(&two_columns), expected(&["a", "b"]));

    // Nested expression: (a >= 0 AND b < 10) AND c = 5 references {a, b, c}.
    let nested = col("a")
        .gt_eq(Value::Int64(0))
        .and(col("b").lt(Value::Int64(10)))
        .and(col("c").eq(Value::Int64(5)));
    assert_eq!(referenced(&nested), expected(&["a", "b", "c"]));
}
}