diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 05ba5dae25..a7798a7678 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -19,10 +19,11 @@ use std::time::Duration; use api::v1::helper::{row, tag_column_schema}; use api::v1::value::ValueData; -use api::v1::{ColumnDataType, Row, Rows, SemanticType}; +use api::v1::{ColumnDataType, Row, Rows, SemanticType, Value}; use common_error::ext::ErrorExt; use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions}; use common_recordbatch::RecordBatches; +use datafusion_expr::col; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions}; use store_api::metadata::ColumnMetadata; @@ -41,8 +42,8 @@ use crate::error; use crate::sst::FormatType; use crate::test_util::batch_util::sort_batches_and_print; use crate::test_util::{ - CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows, - rows_schema, + CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, + column_metadata_to_column_schema, flush_region, put_rows, rows_schema, }; async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) { @@ -102,6 +103,54 @@ fn alter_column_fulltext_options() -> RegionAlterRequest { } } +fn add_nullable_field1() -> RegionAlterRequest { + RegionAlterRequest { + kind: AlterKind::AddColumns { + columns: vec![AddColumn { + column_metadata: ColumnMetadata { + column_schema: ColumnSchema::new( + "field_1", + ConcreteDataType::float64_datatype(), + true, + ), + semantic_type: SemanticType::Field, + column_id: 3, + }, + location: None, + }], + }, + } +} + +fn build_row_with_added_field( + metadata: &[ColumnMetadata], + tag_0: &str, + field_0: f64, + field_1: Option, + ts_millis: i64, +) -> Row { + let values = metadata + .iter() + .map(|column| match column.column_schema.name.as_str() { + "tag_0" => Value { + value_data: Some(ValueData::StringValue(tag_0.to_string())), + }, + "field_0" => Value { + value_data: Some(ValueData::F64Value(field_0)), + }, + "field_1" => Value { + value_data: field_1.map(ValueData::F64Value), + }, + "ts" => Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts_millis)), + }, + name => panic!("unexpected column {name}"), + }) + .collect(); + + Row { values } +} + fn check_region_version( engine: &MitoEngine, region_id: RegionId, @@ -236,6 +285,105 @@ async fn test_alter_region_with_format(flat_format: bool) { check_region_version(&engine, region_id, 1, 3, 1, 3); } +#[tokio::test] +async fn test_filter_is_null_after_alter_add_field() { + test_filter_is_null_after_alter_add_field_with_format(false).await; + test_filter_is_null_after_alter_add_field_with_format(true).await; +} + +async fn test_filter_is_null_after_alter_add_field_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas, + rows: vec![build_rows_for_key("a", 0, 1, 1).into_iter().next().unwrap()], + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + engine + .handle_request(region_id, RegionRequest::Alter(add_nullable_field1())) + .await + .unwrap(); + + let region = engine.get_region(region_id).unwrap(); + let metadata = region.metadata().column_metadatas.clone(); + let schema = metadata + .iter() + .map(column_metadata_to_column_schema) + .collect(); + + put_rows( + &engine, + region_id, + Rows { + schema, + rows: vec![build_row_with_added_field( + &metadata, + "a", + 1.0, + Some(10.0), + 0, + )], + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + // We skip field filters under merge mode because the flushed field values may be stale before + // the row is merged with newer field data. + let stream = engine + .scan_to_stream( + region_id, + ScanRequest { + filters: vec![col("field_1").is_null()], + ..Default::default() + }, + ) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+---------+ +| tag_0 | field_0 | ts | field_1 | ++-------+---------+---------------------+---------+ +| a | 1.0 | 1970-01-01T00:00:00 | 10.0 | ++-------+---------+---------------------+---------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} + /// Build rows with schema (string, f64, ts_millis, string). fn build_rows_for_tags( tag0: &str, diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 497583b8bc..319410c505 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::Rows; +use api::v1::value::ValueData; +use api::v1::{Row, Rows, Value}; use common_recordbatch::RecordBatches; +use datafusion_expr::{col, lit}; use store_api::region_engine::RegionEngine; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -24,6 +26,22 @@ use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows, delete_rows, flush_region, put_rows, rows_schema, }; +fn build_row_with_nullable_field(key: &str, field_0: Option, ts_millis: i64) -> Row { + Row { + values: vec![ + Value { + value_data: Some(ValueData::StringValue(key.to_string())), + }, + Value { + value_data: field_0.map(ValueData::F64Value), + }, + Value { + value_data: Some(ValueData::TimestampMillisecondValue(ts_millis)), + }, + ], + } +} + #[tokio::test] async fn test_scan_without_filtering_deleted() { test_scan_without_filtering_deleted_with_format(false).await; @@ -121,3 +139,84 @@ async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_filter_field_value_after_last_row_update() { + test_filter_field_value_after_last_row_update_with_format(false).await; + test_filter_field_value_after_last_row_update_with_format(true).await; +} + +async fn test_filter_field_value_after_last_row_update_with_format(flat_format: bool) { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_flat_format: flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: vec![build_row_with_nullable_field("a", Some(10.0), 0)], + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas, + rows: vec![build_row_with_nullable_field("a", Some(20.0), 0)], + }, + ) + .await; + flush_region(&engine, region_id, None).await; + + // We skip field filters under merge mode because the flushed field values may be stale before + // the last-row update is merged. + let stream = engine + .scan_to_stream( + region_id, + ScanRequest { + filters: vec![col("field_0").eq(lit(10.0))], + ..Default::default() + }, + ) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| a | 20.0 | 1970-01-01T00:00:00 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 599547ec8d..a64fcc90ea 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -138,7 +138,10 @@ async fn test_prune_tag_and_field() { async fn test_prune_tag_and_field_with_format(flat_format: bool) { common_telemetry::init_default_ut_logging(); - // prune result: only row group 1 + // Tag filter prunes to row group 1 (tags "5".."9"). The field filter is + // intentionally not applied inside the mito reader (see `PreFilterMode::SkipFields` + // for non-append merge modes — DataFusion re-applies it above the engine), so all + // rows in the surviving row group are returned. check_prune_row_groups( vec![ col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))), @@ -151,6 +154,8 @@ async fn test_prune_tag_and_field_with_format(flat_format: bool) { | 5 | 5.0 | 1970-01-01T00:00:05 | | 6 | 6.0 | 1970-01-01T00:00:06 | | 7 | 7.0 | 1970-01-01T00:00:07 | +| 8 | 8.0 | 1970-01-01T00:00:08 | +| 9 | 9.0 | 1970-01-01T00:00:09 | +-------+---------+---------------------+", flat_format, ) @@ -443,7 +448,10 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) { ) .await; - // Scans and filter fields, the field should be deleted. + // Scans and filters by a field value. The mito reader skips field filters under + // `PreFilterMode::SkipFields` (DataFusion re-applies them above the engine), so + // the returned batches still contain all non-deleted rows — the reader's job here + // is only to ensure the delete op is honored. let request = ScanRequest { filters: vec![col("field_0").eq(lit(3.0f64))], ..Default::default() @@ -454,10 +462,12 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) { .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ -+-------+---------+----+ -| tag_0 | field_0 | ts | -+-------+---------+----+ -+-------+---------+----+"; ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 4 | 4.0 | 1970-01-01T00:00:04 | ++-------+---------+---------------------+"; assert_eq!( expected, sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"]) diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs index 986e9409ee..826474df05 100644 --- a/src/mito2/src/memtable/bulk/part.rs +++ b/src/mito2/src/memtable/bulk/part.rs @@ -59,7 +59,6 @@ use crate::memtable::time_series::{ValueBuilder, Values}; use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats}; use crate::sst::SeriesEstimator; use crate::sst::index::IndexOutput; -use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; use crate::sst::parquet::flat_format::primary_key_column_index; use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder}; use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo}; @@ -1028,9 +1027,8 @@ impl EncodedBulkPart { sequence: Option, mem_scan_metrics: Option, ) -> Result> { - // Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs. - let skip_fields_for_pruning = - Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata); + // Compute skip_fields for row group pruning from the configured pre-filter mode. + let skip_fields_for_pruning = context.pre_filter_mode().skip_fields(); // use predicate to find row groups to read. let row_groups_to_read = @@ -1050,20 +1048,6 @@ impl EncodedBulkPart { )?; Ok(Some(Box::new(iter) as BoxedRecordBatchIterator)) } - - /// Computes whether to skip field columns based on PreFilterMode. - fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool { - match pre_filter_mode { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if any row group contains delete op - (0..parquet_meta.num_row_groups()).any(|rg_idx| { - row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true) - }) - } - } - } } // TODO(yingwen): max_sequence diff --git a/src/mito2/src/memtable/bulk/part_reader.rs b/src/mito2/src/memtable/bulk/part_reader.rs index a9caeef08c..cea3304154 100644 --- a/src/mito2/src/memtable/bulk/part_reader.rs +++ b/src/mito2/src/memtable/bulk/part_reader.rs @@ -29,7 +29,7 @@ use crate::memtable::bulk::part::EncodedBulkPart; use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder; use crate::memtable::{MemScanMetrics, MemScanMetricsData}; use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; -use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState}; +use crate::sst::parquet::file_range::TagDecodeState; use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index}; use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key}; @@ -78,7 +78,7 @@ impl EncodedBulkPartIter { let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() { Some(first_row_group) => { - let skip_fields = builder.compute_skip_fields(&context, first_row_group); + let skip_fields = context.pre_filter_mode().skip_fields(); let reader = builder.build_row_group_reader(first_row_group, None)?; (Some(reader), skip_fields) } @@ -140,9 +140,7 @@ impl EncodedBulkPartIter { // Previous row group exhausted, read next row group while let Some(next_row_group) = self.row_groups_to_read.pop_front() { // Compute skip_fields for this row group - self.current_skip_fields = self - .builder - .compute_skip_fields(&self.context, next_row_group); + self.current_skip_fields = self.context.pre_filter_mode().skip_fields(); let next_reader = self.builder.build_row_group_reader(next_row_group, None)?; let current = self.current_reader.insert(next_reader); @@ -299,11 +297,7 @@ impl BulkPartBatchIter { let projected_batch = self.apply_projection(record_batch)?; // Apply combined filtering (both predicate and sequence filters) - let skip_fields = match self.context.pre_filter_mode() { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => true, - }; + let skip_fields = self.context.pre_filter_mode().skip_fields(); let Some(filtered_batch) = apply_combined_filters( &self.context, diff --git a/src/mito2/src/memtable/bulk/row_group_reader.rs b/src/mito2/src/memtable/bulk/row_group_reader.rs index 40a5b2f85d..36e8e8aac1 100644 --- a/src/mito2/src/memtable/bulk/row_group_reader.rs +++ b/src/mito2/src/memtable/bulk/row_group_reader.rs @@ -31,7 +31,6 @@ use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; pub(crate) struct MemtableRowGroupReaderBuilder { projection: ProjectionMask, - parquet_metadata: Arc, arrow_metadata: ArrowReaderMetadata, data: Bytes, } @@ -51,7 +50,6 @@ impl MemtableRowGroupReaderBuilder { .context(ReadDataPartSnafu)?; Ok(Self { projection, - parquet_metadata, arrow_metadata, data, }) @@ -79,23 +77,4 @@ impl MemtableRowGroupReaderBuilder { builder.build().context(ReadDataPartSnafu) } - - /// Computes whether to skip field filters for a specific row group based on PreFilterMode. - pub(crate) fn compute_skip_fields( - &self, - context: &BulkIterContextRef, - row_group_idx: usize, - ) -> bool { - use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete}; - - match context.pre_filter_mode() { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if this specific row group contains delete op - row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable") - .unwrap_or(true) - } - } - } } diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 99b59ae577..2579183de8 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -1296,7 +1296,7 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode { } match merge_mode { - MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete, + MergeMode::LastRow => PreFilterMode::SkipFields, MergeMode::LastNonNull => PreFilterMode::SkipFields, } } diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index bf86e4a764..384b4f54f2 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -146,7 +146,7 @@ impl FileRange { std::slice::from_ref(curr_row_group), read_format, self.context.base.expected_metadata.clone(), - self.compute_skip_fields(), + self.context.base.pre_filter_mode.skip_fields(), ); // not costly to create a predicate here since dynamic filters are wrapped in Arc @@ -158,22 +158,6 @@ impl FileRange { .unwrap_or(true) // unexpected, not skip just in case } - fn compute_skip_fields(&self) -> bool { - match self.context.base.pre_filter_mode { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if this specific row group contains delete op - row_group_contains_delete( - self.context.reader_builder.parquet_metadata(), - self.row_group_idx, - self.context.reader_builder.file_path(), - ) - .unwrap_or(true) - } - } - } - /// Returns a reader to read the [FileRange]. #[allow(dead_code)] pub(crate) async fn reader( @@ -185,7 +169,7 @@ impl FileRange { return Ok(None); } // Compute skip_fields once for this row group - let skip_fields = self.context.should_skip_fields(self.row_group_idx); + let skip_fields = self.context.base.pre_filter_mode.skip_fields(); let parquet_reader = self .context .reader_builder @@ -247,7 +231,7 @@ impl FileRange { return Ok(None); } // Compute skip_fields once for this row group - let skip_fields = self.context.should_skip_fields(self.row_group_idx); + let skip_fields = self.context.base.pre_filter_mode.skip_fields(); let parquet_reader = self .context .reader_builder @@ -404,16 +388,8 @@ impl FileRangeContext { ) } - /// Determines whether to skip field filters based on PreFilterMode and row group delete status. - pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool { - match self.base.pre_filter_mode { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if this specific row group contains delete op - self.contains_delete(row_group_idx).unwrap_or(true) - } - } + pub(crate) fn pre_filter_mode(&self) -> PreFilterMode { + self.base.pre_filter_mode } //// Decodes parquet metadata and finds if row group contains delete op. @@ -451,13 +427,16 @@ impl FileRangeContext { pub enum PreFilterMode { /// Filters all columns. All, - /// If the range doesn't contain delete op or doesn't have statistics, filters all columns. - /// Otherwise, skips filtering fields. - SkipFieldsOnDelete, /// Always skip fields. SkipFields, } +impl PreFilterMode { + pub(crate) fn skip_fields(self) -> bool { + matches!(self, Self::SkipFields) + } +} + /// Context for partition expression filtering. pub(crate) struct PartitionFilterContext { pub(crate) region_partition_physical_expr: Arc, @@ -514,7 +493,7 @@ impl RangeBase { /// /// # Arguments /// * `input` - The batch to filter - /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode pub(crate) fn precise_filter( &self, mut input: Batch, @@ -626,7 +605,7 @@ impl RangeBase { /// /// # Arguments /// * `input` - The RecordBatch to filter - /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode pub(crate) fn precise_filter_flat( &self, input: RecordBatch, @@ -679,7 +658,7 @@ impl RangeBase { /// /// # Arguments /// * `input` - The RecordBatch to compute mask for - /// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status + /// * `skip_fields` - Whether to skip field filters based on PreFilterMode pub(crate) fn compute_filter_mask_flat( &self, input: &RecordBatch, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 6fdbb6f243..bcc20542c3 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -72,7 +72,6 @@ use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; use crate::sst::parquet::async_reader::SstAsyncFileReader; use crate::sst::parquet::file_range::{ FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase, - row_group_contains_delete, }; use crate::sst::parquet::format::{ReadFormat, need_override_sequence}; use crate::sst::parquet::metadata::MetadataLoader; @@ -678,7 +677,7 @@ impl ParquetReaderBuilder { metrics.rows_total += num_rows as usize; // Compute skip_fields once for all pruning operations - let skip_fields = self.compute_skip_fields(parquet_meta); + let skip_fields = self.pre_filter_mode.skip_fields(); let mut output = self.row_groups_by_minmax( read_format, @@ -1114,25 +1113,6 @@ impl ParquetReaderBuilder { pruned } - /// Computes whether to skip field columns when building statistics based on PreFilterMode. - fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool { - match self.pre_filter_mode { - PreFilterMode::All => false, - PreFilterMode::SkipFields => true, - PreFilterMode::SkipFieldsOnDelete => { - // Check if any row group contains delete op - let file_path = self.file_handle.file_path(&self.table_dir, self.path_type); - (0..parquet_meta.num_row_groups()).any(|rg_idx| { - row_group_contains_delete(parquet_meta, rg_idx, &file_path) - .inspect_err(|e| { - warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields"); - }) - .unwrap_or(false) - }) - } - } - } - /// Computes row groups selection after min-max pruning. fn row_groups_by_minmax( &self, @@ -1957,7 +1937,7 @@ impl ParquetReader { return Ok(None); }; - let skip_fields = self.context.should_skip_fields(row_group_idx); + let skip_fields = self.context.pre_filter_mode().skip_fields(); let parquet_reader = self .context .reader_builder() @@ -1990,7 +1970,7 @@ impl ParquetReader { debug_assert!(context.read_format().as_flat().is_some()); let fetch_metrics = ParquetFetchMetrics::default(); let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() { - let skip_fields = context.should_skip_fields(row_group_idx); + let skip_fields = context.pre_filter_mode().skip_fields(); let parquet_reader = context .reader_builder() .build(context.build_context( diff --git a/tests/cases/standalone/common/alter/prefilter_last_row_null.result b/tests/cases/standalone/common/alter/prefilter_last_row_null.result new file mode 100644 index 0000000000..00aac5d9db --- /dev/null +++ b/tests/cases/standalone/common/alter/prefilter_last_row_null.result @@ -0,0 +1,69 @@ +CREATE TABLE prefilter_last_row_null( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito; + +Affected Rows: 0 + +INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, NULL); + +Affected Rows: 1 + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + ++----------------------------------------------+ +| ADMIN FLUSH_TABLE('prefilter_last_row_null') | ++----------------------------------------------+ +| 0 | ++----------------------------------------------+ + +INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, 10.0); + +Affected Rows: 1 + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + ++----------------------------------------------+ +| ADMIN FLUSH_TABLE('prefilter_last_row_null') | ++----------------------------------------------+ +| 0 | ++----------------------------------------------+ + +SELECT COUNT(*) FROM prefilter_last_row_null WHERE cpu IS NULL; + ++----------+ +| count(*) | ++----------+ +| 0 | ++----------+ + +ALTER TABLE prefilter_last_row_null ADD COLUMN memory DOUBLE NULL; + +Affected Rows: 0 + +INSERT INTO prefilter_last_row_null(host, ts, cpu, memory) VALUES ('host1', 0, 10.0, 20.0); + +Affected Rows: 1 + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + ++----------------------------------------------+ +| ADMIN FLUSH_TABLE('prefilter_last_row_null') | ++----------------------------------------------+ +| 0 | ++----------------------------------------------+ + +SELECT COUNT(*) FROM prefilter_last_row_null WHERE memory IS NULL; + ++----------+ +| count(*) | ++----------+ +| 0 | ++----------+ + +DROP TABLE prefilter_last_row_null; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/prefilter_last_row_null.sql b/tests/cases/standalone/common/alter/prefilter_last_row_null.sql new file mode 100644 index 0000000000..d92dbf7829 --- /dev/null +++ b/tests/cases/standalone/common/alter/prefilter_last_row_null.sql @@ -0,0 +1,26 @@ +CREATE TABLE prefilter_last_row_null( + host STRING, + ts TIMESTAMP TIME INDEX, + cpu DOUBLE, + PRIMARY KEY(host) +) ENGINE=mito; + +INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, NULL); + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + +INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, 10.0); + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + +SELECT COUNT(*) FROM prefilter_last_row_null WHERE cpu IS NULL; + +ALTER TABLE prefilter_last_row_null ADD COLUMN memory DOUBLE NULL; + +INSERT INTO prefilter_last_row_null(host, ts, cpu, memory) VALUES ('host1', 0, 10.0, 20.0); + +ADMIN FLUSH_TABLE('prefilter_last_row_null'); + +SELECT COUNT(*) FROM prefilter_last_row_null WHERE memory IS NULL; + +DROP TABLE prefilter_last_row_null;