diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index e8241a453f..e8dbca04fb 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -36,7 +36,9 @@ use crate::read::Batch; use crate::row_converter::{CompositeValues, PrimaryKeyCodec}; use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; -use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; +use crate::sst::parquet::reader::{ + MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext, +}; /// A range of a parquet SST. Now it is a row group. /// We can read different file ranges in parallel. @@ -255,8 +257,15 @@ impl RangeBase { // Run filter one by one and combine them result // TODO(ruihang): run primary key filter first. It may short circuit other filters - for filter in &self.filters { - let result = match filter.semantic_type() { + for filter_ctx in &self.filters { + let filter = match filter_ctx.filter() { + MaybeFilter::Filter(f) => f, + // Column matches. + MaybeFilter::Matched => continue, + // Column doesn't match, filter the entire batch. + MaybeFilter::Pruned => return Ok(None), + }; + let result = match filter_ctx.semantic_type() { SemanticType::Tag => { let pk_values = if let Some(pk_values) = input.pk_values() { pk_values @@ -270,21 +279,20 @@ impl RangeBase { let pk_index = self .read_format .metadata() - .primary_key_index(filter.column_id()) + .primary_key_index(filter_ctx.column_id()) .unwrap(); v[pk_index] .1 - .try_to_scalar_value(filter.data_type()) + .try_to_scalar_value(filter_ctx.data_type()) .context(FieldTypeMismatchSnafu)? } CompositeValues::Sparse(v) => { - let v = v.get_or_null(filter.column_id()); - v.try_to_scalar_value(filter.data_type()) + let v = v.get_or_null(filter_ctx.column_id()); + v.try_to_scalar_value(filter_ctx.data_type()) .context(FieldTypeMismatchSnafu)? 
} }; if filter - .filter() .evaluate_scalar(&pk_value) .context(FilterRecordBatchSnafu)? { @@ -295,18 +303,17 @@ impl RangeBase { } } SemanticType::Field => { - let Some(field_index) = self.read_format.field_index_by_id(filter.column_id()) + let Some(field_index) = + self.read_format.field_index_by_id(filter_ctx.column_id()) else { continue; }; let field_col = &input.fields()[field_index].data; filter - .filter() .evaluate_vector(field_col) .context(FilterRecordBatchSnafu)? } SemanticType::Timestamp => filter - .filter() .evaluate_vector(input.timestamps()) .context(FilterRecordBatchSnafu)?, }; diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 5c2ab17591..ffa5c5d003 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -34,7 +34,7 @@ use parquet::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask} use parquet::file::metadata::ParquetMetaData; use parquet::format::KeyValue; use snafu::{OptionExt, ResultExt}; -use store_api::metadata::{RegionMetadata, RegionMetadataRef}; +use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -191,6 +191,7 @@ impl ParquetReaderBuilder { let file_path = self.file_handle.file_path(&self.file_dir); let file_size = self.file_handle.meta_ref().file_size; + // Loads parquet metadata of the file. let parquet_meta = self.read_parquet_metadata(&file_path, file_size).await?; // Decodes region metadata. @@ -550,11 +551,17 @@ impl ParquetReaderBuilder { let row_groups = parquet_meta.row_groups(); let stats = RowGroupPruningStats::new(row_groups, read_format, self.expected_metadata.clone()); + let prune_schema = self + .expected_metadata + .as_ref() + .map(|meta| meta.schema.arrow_schema()) + .unwrap_or_else(|| region_meta.schema.arrow_schema()); + // Here we use the schema of the SST to build the physical expression. 
If the column // in the SST doesn't have the same column id as the column in the expected metadata, // we will get a None statistics for that column. let res = predicate - .prune_with_stats(&stats, region_meta.schema.arrow_schema()) + .prune_with_stats(&stats, prune_schema) .iter() .zip(0..parquet_meta.num_row_groups()) .filter_map(|(mask, row_group)| { @@ -1009,10 +1016,20 @@ impl ReaderState { } } -/// Context to evaluate the column filter. +/// The filter to evaluate or the prune result of the default value. +pub(crate) enum MaybeFilter { + /// The filter to evaluate. + Filter(SimpleFilterEvaluator), + /// The filter matches the default value. + Matched, + /// The filter is pruned. + Pruned, +} + +/// Context to evaluate the column filter for a parquet file. pub(crate) struct SimpleFilterContext { /// Filter to evaluate. - filter: SimpleFilterEvaluator, + filter: MaybeFilter, /// Id of the column to evaluate. column_id: ColumnId, /// Semantic type of the column. @@ -1032,22 +1049,38 @@ impl SimpleFilterContext { expr: &Expr, ) -> Option { let filter = SimpleFilterEvaluator::try_new(expr)?; - let column_metadata = match expected_meta { + let (column_metadata, maybe_filter) = match expected_meta { Some(meta) => { // Gets the column metadata from the expected metadata. let column = meta.column_by_name(filter.column_name())?; // Checks if the column is present in the SST metadata. We still uses the // column from the expected metadata. - let sst_column = sst_meta.column_by_id(column.column_id)?; - debug_assert_eq!(column.semantic_type, sst_column.semantic_type); + match sst_meta.column_by_id(column.column_id) { + Some(sst_column) => { + debug_assert_eq!(column.semantic_type, sst_column.semantic_type); - column + (column, MaybeFilter::Filter(filter)) + } + None => { + // If the column is not present in the SST metadata, we evaluate the filter + // against the default value of the column. + // If we can't evaluate the filter, we return None. 
+ if pruned_by_default(&filter, column)? { + (column, MaybeFilter::Pruned) + } else { + (column, MaybeFilter::Matched) + } + } + } + } + None => { + let column = sst_meta.column_by_name(filter.column_name())?; + (column, MaybeFilter::Filter(filter)) } - None => sst_meta.column_by_name(filter.column_name())?, }; Some(Self { - filter, + filter: maybe_filter, column_id: column_metadata.column_id, semantic_type: column_metadata.semantic_type, data_type: column_metadata.column_schema.data_type.clone(), @@ -1055,7 +1088,7 @@ impl SimpleFilterContext { } /// Returns the filter to evaluate. - pub(crate) fn filter(&self) -> &SimpleFilterEvaluator { + pub(crate) fn filter(&self) -> &MaybeFilter { &self.filter } @@ -1075,6 +1108,17 @@ impl SimpleFilterContext { } } +/// Prune a column by its default value. +/// Returns `None` if we can't create the default value or evaluate the filter. +fn pruned_by_default(filter: &SimpleFilterEvaluator, column: &ColumnMetadata) -> Option { + let value = column.column_schema.create_default().ok().flatten()?; + let scalar_value = value + .try_to_scalar_value(&column.column_schema.data_type) + .ok()?; + let matches = filter.evaluate_scalar(&scalar_value).ok()?; + Some(!matches) +} + type RowGroupMap = BTreeMap>; /// Parquet batch reader to read our SST format. 
diff --git a/src/mito2/src/sst/parquet/stats.rs b/src/mito2/src/sst/parquet/stats.rs index 09b837698c..ead3679397 100644 --- a/src/mito2/src/sst/parquet/stats.rs +++ b/src/mito2/src/sst/parquet/stats.rs @@ -16,10 +16,11 @@ use std::borrow::Borrow; use std::collections::HashSet; +use std::sync::Arc; use datafusion::physical_optimizer::pruning::PruningStatistics; use datafusion_common::{Column, ScalarValue}; -use datatypes::arrow::array::{ArrayRef, BooleanArray}; +use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array}; use parquet::file::metadata::RowGroupMetaData; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; @@ -54,25 +55,62 @@ impl<'a, T> RowGroupPruningStats<'a, T> { } /// Returns the column id of specific column name if we need to read it. + /// Prefers the column id in the expected metadata if it exists. fn column_id_to_prune(&self, name: &str) -> Option { let metadata = self .expected_metadata .as_ref() .unwrap_or_else(|| self.read_format.metadata()); - // Only use stats when the column to read has the same id as the column in the SST. metadata.column_by_name(name).map(|col| col.column_id) } + + /// Returns the default value of all row groups for `column` according to the metadata. + fn compat_default_value(&self, column: &str) -> Option { + let metadata = self.expected_metadata.as_ref()?; + let col_metadata = metadata.column_by_name(column)?; + col_metadata + .column_schema + .create_default_vector(self.row_groups.len()) + .unwrap_or(None) + .map(|vector| vector.to_arrow_array()) + } +} + +impl> RowGroupPruningStats<'_, T> { + /// Returns the null count of all row groups for `column` according to the metadata. 
+ fn compat_null_count(&self, column: &str) -> Option { + let metadata = self.expected_metadata.as_ref()?; + let col_metadata = metadata.column_by_name(column)?; + let value = col_metadata + .column_schema + .create_default() + .unwrap_or(None)?; + let values = self.row_groups.iter().map(|meta| { + if value.is_null() { + u64::try_from(meta.borrow().num_rows()).ok() + } else { + Some(0) + } + }); + Some(Arc::new(UInt64Array::from_iter(values))) + } } impl> PruningStatistics for RowGroupPruningStats<'_, T> { fn min_values(&self, column: &Column) -> Option { let column_id = self.column_id_to_prune(&column.name)?; - self.read_format.min_values(self.row_groups, column_id) + match self.read_format.min_values(self.row_groups, column_id) { + Some(values) => Some(values), + None => self.compat_default_value(&column.name), + } } fn max_values(&self, column: &Column) -> Option { let column_id = self.column_id_to_prune(&column.name)?; - self.read_format.max_values(self.row_groups, column_id) + match self.read_format.max_values(self.row_groups, column_id) { + Some(values) => Some(values), + None => self.compat_default_value(&column.name), + } } fn num_containers(&self) -> usize { @@ -80,7 +118,9 @@ impl> PruningStatistics for RowGroupPruningStats<'_, } fn null_counts(&self, column: &Column) -> Option { - let column_id = self.column_id_to_prune(&column.name)?; + let Some(column_id) = self.column_id_to_prune(&column.name) else { + return self.compat_null_count(&column.name); + }; self.read_format.null_counts(self.row_groups, column_id) } diff --git a/tests/cases/standalone/common/alter/add_col_default.result b/tests/cases/standalone/common/alter/add_col_default.result index 6d8c523ba3..5a9baf7186 100644 --- a/tests/cases/standalone/common/alter/add_col_default.result +++ b/tests/cases/standalone/common/alter/add_col_default.result @@ -6,19 +6,56 @@ INSERT INTO test VALUES (1, 1), (2, 2); Affected Rows: 2 +ADMIN FLUSH_TABLE('test'); + ++---------------------------+ +| ADMIN 
FLUSH_TABLE('test') | ++---------------------------+ +| 0 | ++---------------------------+ + +ALTER TABLE test MODIFY COLUMN i SET INVERTED INDEX; + +Affected Rows: 0 + +INSERT INTO test VALUES (3, 3), (4, 4); + +Affected Rows: 2 + ALTER TABLE test ADD COLUMN k INTEGER DEFAULT 3; Affected Rows: 0 -SELECT * FROM test; +SELECT * FROM test order by j; +---+-------------------------+---+ | i | j | k | +---+-------------------------+---+ | 1 | 1970-01-01T00:00:00.001 | 3 | | 2 | 1970-01-01T00:00:00.002 | 3 | +| 3 | 1970-01-01T00:00:00.003 | 3 | +| 4 | 1970-01-01T00:00:00.004 | 3 | +---+-------------------------+---+ +SELECT * FROM test where k != 3; + +++ +++ + +ALTER TABLE test ADD COLUMN host STRING DEFAULT '' PRIMARY KEY; + +Affected Rows: 0 + +SELECT * FROM test where host != ''; + +++ +++ + +SELECT * FROM test where host != '' AND i = 3; + +++ +++ + DROP TABLE test; Affected Rows: 0 diff --git a/tests/cases/standalone/common/alter/add_col_default.sql b/tests/cases/standalone/common/alter/add_col_default.sql index 187e96821e..258ec1683a 100644 --- a/tests/cases/standalone/common/alter/add_col_default.sql +++ b/tests/cases/standalone/common/alter/add_col_default.sql @@ -2,8 +2,22 @@ CREATE TABLE test(i INTEGER, j TIMESTAMP TIME INDEX); INSERT INTO test VALUES (1, 1), (2, 2); +ADMIN FLUSH_TABLE('test'); + +ALTER TABLE test MODIFY COLUMN i SET INVERTED INDEX; + +INSERT INTO test VALUES (3, 3), (4, 4); + ALTER TABLE test ADD COLUMN k INTEGER DEFAULT 3; -SELECT * FROM test; +SELECT * FROM test order by j; + +SELECT * FROM test where k != 3; + +ALTER TABLE test ADD COLUMN host STRING DEFAULT '' PRIMARY KEY; + +SELECT * FROM test where host != ''; + +SELECT * FROM test where host != '' AND i = 3; DROP TABLE test;