diff --git a/src/mito2/src/engine/staging_test.rs b/src/mito2/src/engine/staging_test.rs index d54f8e4780..2d5f57c6f2 100644 --- a/src/mito2/src/engine/staging_test.rs +++ b/src/mito2/src/engine/staging_test.rs @@ -50,6 +50,12 @@ fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr { .and(col(col_name).lt(Value::Int64(end))) } +fn float_range_expr(col_name: &str, start: f64, end: f64) -> PartitionExpr { + col(col_name) + .gt_eq(Value::Float64(start.into())) + .and(col(col_name).lt(Value::Float64(end.into()))) +} + #[tokio::test] async fn test_staging_state_integration() { test_staging_state_integration_with_format(false).await; @@ -388,7 +394,7 @@ async fn test_staging_exit_success_with_manifests() { async fn test_staging_exit_success_with_manifests_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); - let partition_expr = range_expr("field_0", 0, 100).as_json_str().unwrap(); + let partition_expr = float_range_expr("field_0", 0., 100.).as_json_str().unwrap(); let mut env = TestEnv::new().await; let engine = env .create_engine(MitoConfig { diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index bf75b15f12..52b706fa0e 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -529,22 +529,10 @@ impl RangeBase { // Apply partition filter if let Some(partition_filter) = &self.partition_filter { - let partition_result = self - .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema) - .and_then(|record_batch| { - self.evaluate_partition_filter(&record_batch, partition_filter) - }); - match partition_result { - Ok(partition_mask) => { - mask = mask.bitand(&partition_mask); - } - Err(err) => { - // FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection - // we will fail to evaluate partition filter since column is missing. - // we had to overlook the error for now. - error!(err; "Failed to evaluate partition filter, skip partition filtering"); - } - } + let record_batch = self + .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?; + let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?; + mask = mask.bitand(&partition_mask); } if mask.count_set_bits() == 0 { @@ -577,26 +565,13 @@ impl RangeBase { // Apply partition filter if let Some(partition_filter) = &self.partition_filter { - let partition_result = self - .project_record_batch_for_pruning_flat( - &input, - &partition_filter.partition_schema, - &mut tag_decode_state, - ) - .and_then(|record_batch| { - self.evaluate_partition_filter(&record_batch, partition_filter) - }); - match partition_result { - Ok(partition_mask) => { - mask = mask.bitand(&partition_mask); - } - Err(err) => { - // FIXME(yingwen): due to a known bug, if partition expr include field column, and this field column is not in projection - // we will fail to evaluate partition filter since column is missing. - // we had to overlook the error for now. - error!(err; "Failed to evaluate partition filter, skip partition filtering"); - } - } + let record_batch = self.project_record_batch_for_pruning_flat( + &input, + &partition_filter.partition_schema, + &mut tag_decode_state, + )?; + let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?; + mask = mask.bitand(&partition_mask); } if mask.count_set_bits() == 0 {