mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-31 12:20:38 +00:00
fix: always skip field pruning when using merge mode (#7957)
* test: add prefilter regressions for last_row null filters Signed-off-by: evenyag <realevenyag@gmail.com> * fix: skip fields in all merge mode Signed-off-by: evenyag <realevenyag@gmail.com> * refactor: simplify pre-filter skip fields handling Signed-off-by: evenyag <realevenyag@gmail.com> * test: update test Signed-off-by: evenyag <realevenyag@gmail.com> --------- Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -19,10 +19,11 @@ use std::time::Duration;
|
||||
|
||||
use api::v1::helper::{row, tag_column_schema};
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, Row, Rows, SemanticType};
|
||||
use api::v1::{ColumnDataType, Row, Rows, SemanticType, Value};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::ddl::utils::{parse_column_metadatas, parse_manifest_infos_from_extensions};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datafusion_expr::col;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions};
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
@@ -41,8 +42,8 @@ use crate::error;
|
||||
use crate::sst::FormatType;
|
||||
use crate::test_util::batch_util::sort_batches_and_print;
|
||||
use crate::test_util::{
|
||||
CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows,
|
||||
rows_schema,
|
||||
CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key,
|
||||
column_metadata_to_column_schema, flush_region, put_rows, rows_schema,
|
||||
};
|
||||
|
||||
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
|
||||
@@ -102,6 +103,54 @@ fn alter_column_fulltext_options() -> RegionAlterRequest {
|
||||
}
|
||||
}
|
||||
|
||||
fn add_nullable_field1() -> RegionAlterRequest {
|
||||
RegionAlterRequest {
|
||||
kind: AlterKind::AddColumns {
|
||||
columns: vec![AddColumn {
|
||||
column_metadata: ColumnMetadata {
|
||||
column_schema: ColumnSchema::new(
|
||||
"field_1",
|
||||
ConcreteDataType::float64_datatype(),
|
||||
true,
|
||||
),
|
||||
semantic_type: SemanticType::Field,
|
||||
column_id: 3,
|
||||
},
|
||||
location: None,
|
||||
}],
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn build_row_with_added_field(
|
||||
metadata: &[ColumnMetadata],
|
||||
tag_0: &str,
|
||||
field_0: f64,
|
||||
field_1: Option<f64>,
|
||||
ts_millis: i64,
|
||||
) -> Row {
|
||||
let values = metadata
|
||||
.iter()
|
||||
.map(|column| match column.column_schema.name.as_str() {
|
||||
"tag_0" => Value {
|
||||
value_data: Some(ValueData::StringValue(tag_0.to_string())),
|
||||
},
|
||||
"field_0" => Value {
|
||||
value_data: Some(ValueData::F64Value(field_0)),
|
||||
},
|
||||
"field_1" => Value {
|
||||
value_data: field_1.map(ValueData::F64Value),
|
||||
},
|
||||
"ts" => Value {
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts_millis)),
|
||||
},
|
||||
name => panic!("unexpected column {name}"),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Row { values }
|
||||
}
|
||||
|
||||
fn check_region_version(
|
||||
engine: &MitoEngine,
|
||||
region_id: RegionId,
|
||||
@@ -236,6 +285,105 @@ async fn test_alter_region_with_format(flat_format: bool) {
|
||||
check_region_version(&engine, region_id, 1, 3, 1, 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_filter_is_null_after_alter_add_field() {
|
||||
test_filter_is_null_after_alter_add_field_with_format(false).await;
|
||||
test_filter_is_null_after_alter_add_field_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_filter_is_null_after_alter_add_field_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas,
|
||||
rows: vec![build_rows_for_key("a", 0, 1, 1).into_iter().next().unwrap()],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Alter(add_nullable_field1()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let region = engine.get_region(region_id).unwrap();
|
||||
let metadata = region.metadata().column_metadatas.clone();
|
||||
let schema = metadata
|
||||
.iter()
|
||||
.map(column_metadata_to_column_schema)
|
||||
.collect();
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema,
|
||||
rows: vec![build_row_with_added_field(
|
||||
&metadata,
|
||||
"a",
|
||||
1.0,
|
||||
Some(10.0),
|
||||
0,
|
||||
)],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
// We skip field filters under merge mode because the flushed field values may be stale before
|
||||
// the row is merged with newer field data.
|
||||
let stream = engine
|
||||
.scan_to_stream(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
filters: vec![col("field_1").is_null()],
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+---------+
|
||||
| tag_0 | field_0 | ts | field_1 |
|
||||
+-------+---------+---------------------+---------+
|
||||
| a | 1.0 | 1970-01-01T00:00:00 | 10.0 |
|
||||
+-------+---------+---------------------+---------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
/// Build rows with schema (string, f64, ts_millis, string).
|
||||
fn build_rows_for_tags(
|
||||
tag0: &str,
|
||||
|
||||
@@ -12,8 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::Rows;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{Row, Rows, Value};
|
||||
use common_recordbatch::RecordBatches;
|
||||
use datafusion_expr::{col, lit};
|
||||
use store_api::region_engine::RegionEngine;
|
||||
use store_api::region_request::RegionRequest;
|
||||
use store_api::storage::{RegionId, ScanRequest};
|
||||
@@ -24,6 +26,22 @@ use crate::test_util::{
|
||||
CreateRequestBuilder, TestEnv, build_rows, delete_rows, flush_region, put_rows, rows_schema,
|
||||
};
|
||||
|
||||
fn build_row_with_nullable_field(key: &str, field_0: Option<f64>, ts_millis: i64) -> Row {
|
||||
Row {
|
||||
values: vec![
|
||||
Value {
|
||||
value_data: Some(ValueData::StringValue(key.to_string())),
|
||||
},
|
||||
Value {
|
||||
value_data: field_0.map(ValueData::F64Value),
|
||||
},
|
||||
Value {
|
||||
value_data: Some(ValueData::TimestampMillisecondValue(ts_millis)),
|
||||
},
|
||||
],
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_scan_without_filtering_deleted() {
|
||||
test_scan_without_filtering_deleted_with_format(false).await;
|
||||
@@ -121,3 +139,84 @@ async fn test_scan_without_filtering_deleted_with_format(flat_format: bool) {
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_filter_field_value_after_last_row_update() {
|
||||
test_filter_field_value_after_last_row_update_with_format(false).await;
|
||||
test_filter_field_value_after_last_row_update_with_format(true).await;
|
||||
}
|
||||
|
||||
async fn test_filter_field_value_after_last_row_update_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
let mut env = TestEnv::new().await;
|
||||
let engine = env
|
||||
.create_engine(MitoConfig {
|
||||
default_flat_format: flat_format,
|
||||
..Default::default()
|
||||
})
|
||||
.await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
|
||||
env.get_schema_metadata_manager()
|
||||
.register_region_table_info(
|
||||
region_id.table_id(),
|
||||
"test_table",
|
||||
"test_catalog",
|
||||
"test_schema",
|
||||
None,
|
||||
env.get_kv_backend(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: vec![build_row_with_nullable_field("a", Some(10.0), 0)],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas,
|
||||
rows: vec![build_row_with_nullable_field("a", Some(20.0), 0)],
|
||||
},
|
||||
)
|
||||
.await;
|
||||
flush_region(&engine, region_id, None).await;
|
||||
|
||||
// We skip field filters under merge mode because the flushed field values may be stale before
|
||||
// the last-row update is merged.
|
||||
let stream = engine
|
||||
.scan_to_stream(
|
||||
region_id,
|
||||
ScanRequest {
|
||||
filters: vec![col("field_0").eq(lit(10.0))],
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| a | 20.0 | 1970-01-01T00:00:00 |
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
@@ -138,7 +138,10 @@ async fn test_prune_tag_and_field() {
|
||||
|
||||
async fn test_prune_tag_and_field_with_format(flat_format: bool) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
// prune result: only row group 1
|
||||
// Tag filter prunes to row group 1 (tags "5".."9"). The field filter is
|
||||
// intentionally not applied inside the mito reader (see `PreFilterMode::SkipFields`
|
||||
// for non-append merge modes — DataFusion re-applies it above the engine), so all
|
||||
// rows in the surviving row group are returned.
|
||||
check_prune_row_groups(
|
||||
vec![
|
||||
col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
|
||||
@@ -151,6 +154,8 @@ async fn test_prune_tag_and_field_with_format(flat_format: bool) {
|
||||
| 5 | 5.0 | 1970-01-01T00:00:05 |
|
||||
| 6 | 6.0 | 1970-01-01T00:00:06 |
|
||||
| 7 | 7.0 | 1970-01-01T00:00:07 |
|
||||
| 8 | 8.0 | 1970-01-01T00:00:08 |
|
||||
| 9 | 9.0 | 1970-01-01T00:00:09 |
|
||||
+-------+---------+---------------------+",
|
||||
flat_format,
|
||||
)
|
||||
@@ -443,7 +448,10 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
|
||||
)
|
||||
.await;
|
||||
|
||||
// Scans and filter fields, the field should be deleted.
|
||||
// Scans and filters by a field value. The mito reader skips field filters under
|
||||
// `PreFilterMode::SkipFields` (DataFusion re-applies them above the engine), so
|
||||
// the returned batches still contain all non-deleted rows — the reader's job here
|
||||
// is only to ensure the delete op is honored.
|
||||
let request = ScanRequest {
|
||||
filters: vec![col("field_0").eq(lit(3.0f64))],
|
||||
..Default::default()
|
||||
@@ -454,10 +462,12 @@ async fn test_scan_filter_field_after_delete_with_format(flat_format: bool) {
|
||||
.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+----+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+----+
|
||||
+-------+---------+----+";
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 1 | 1.0 | 1970-01-01T00:00:01 |
|
||||
| 4 | 4.0 | 1970-01-01T00:00:04 |
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(
|
||||
expected,
|
||||
sort_batches_and_print(&batches, &["tag_0", "field_0", "ts"])
|
||||
|
||||
@@ -59,7 +59,6 @@ use crate::memtable::time_series::{ValueBuilder, Values};
|
||||
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
|
||||
use crate::sst::SeriesEstimator;
|
||||
use crate::sst::index::IndexOutput;
|
||||
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
|
||||
use crate::sst::parquet::flat_format::primary_key_column_index;
|
||||
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
|
||||
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
|
||||
@@ -1028,9 +1027,8 @@ impl EncodedBulkPart {
|
||||
sequence: Option<SequenceRange>,
|
||||
mem_scan_metrics: Option<MemScanMetrics>,
|
||||
) -> Result<Option<BoxedRecordBatchIterator>> {
|
||||
// Compute skip_fields for row group pruning using the same approach as compute_skip_fields in reader.rs.
|
||||
let skip_fields_for_pruning =
|
||||
Self::compute_skip_fields(context.pre_filter_mode(), &self.metadata.parquet_metadata);
|
||||
// Compute skip_fields for row group pruning from the configured pre-filter mode.
|
||||
let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
|
||||
|
||||
// use predicate to find row groups to read.
|
||||
let row_groups_to_read =
|
||||
@@ -1050,20 +1048,6 @@ impl EncodedBulkPart {
|
||||
)?;
|
||||
Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
|
||||
}
|
||||
|
||||
/// Computes whether to skip field columns based on PreFilterMode.
|
||||
fn compute_skip_fields(pre_filter_mode: PreFilterMode, parquet_meta: &ParquetMetaData) -> bool {
|
||||
match pre_filter_mode {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => {
|
||||
// Check if any row group contains delete op
|
||||
(0..parquet_meta.num_row_groups()).any(|rg_idx| {
|
||||
row_group_contains_delete(parquet_meta, rg_idx, "memtable").unwrap_or(true)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(yingwen): max_sequence
|
||||
|
||||
@@ -29,7 +29,7 @@ use crate::memtable::bulk::part::EncodedBulkPart;
|
||||
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
|
||||
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
|
||||
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
|
||||
use crate::sst::parquet::file_range::TagDecodeState;
|
||||
use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
|
||||
use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};
|
||||
|
||||
@@ -78,7 +78,7 @@ impl EncodedBulkPartIter {
|
||||
|
||||
let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
|
||||
Some(first_row_group) => {
|
||||
let skip_fields = builder.compute_skip_fields(&context, first_row_group);
|
||||
let skip_fields = context.pre_filter_mode().skip_fields();
|
||||
let reader = builder.build_row_group_reader(first_row_group, None)?;
|
||||
(Some(reader), skip_fields)
|
||||
}
|
||||
@@ -140,9 +140,7 @@ impl EncodedBulkPartIter {
|
||||
// Previous row group exhausted, read next row group
|
||||
while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
|
||||
// Compute skip_fields for this row group
|
||||
self.current_skip_fields = self
|
||||
.builder
|
||||
.compute_skip_fields(&self.context, next_row_group);
|
||||
self.current_skip_fields = self.context.pre_filter_mode().skip_fields();
|
||||
|
||||
let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
|
||||
let current = self.current_reader.insert(next_reader);
|
||||
@@ -299,11 +297,7 @@ impl BulkPartBatchIter {
|
||||
let projected_batch = self.apply_projection(record_batch)?;
|
||||
|
||||
// Apply combined filtering (both predicate and sequence filters)
|
||||
let skip_fields = match self.context.pre_filter_mode() {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => true,
|
||||
};
|
||||
let skip_fields = self.context.pre_filter_mode().skip_fields();
|
||||
|
||||
let Some(filtered_batch) = apply_combined_filters(
|
||||
&self.context,
|
||||
|
||||
@@ -31,7 +31,6 @@ use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
|
||||
|
||||
pub(crate) struct MemtableRowGroupReaderBuilder {
|
||||
projection: ProjectionMask,
|
||||
parquet_metadata: Arc<ParquetMetaData>,
|
||||
arrow_metadata: ArrowReaderMetadata,
|
||||
data: Bytes,
|
||||
}
|
||||
@@ -51,7 +50,6 @@ impl MemtableRowGroupReaderBuilder {
|
||||
.context(ReadDataPartSnafu)?;
|
||||
Ok(Self {
|
||||
projection,
|
||||
parquet_metadata,
|
||||
arrow_metadata,
|
||||
data,
|
||||
})
|
||||
@@ -79,23 +77,4 @@ impl MemtableRowGroupReaderBuilder {
|
||||
|
||||
builder.build().context(ReadDataPartSnafu)
|
||||
}
|
||||
|
||||
/// Computes whether to skip field filters for a specific row group based on PreFilterMode.
|
||||
pub(crate) fn compute_skip_fields(
|
||||
&self,
|
||||
context: &BulkIterContextRef,
|
||||
row_group_idx: usize,
|
||||
) -> bool {
|
||||
use crate::sst::parquet::file_range::{PreFilterMode, row_group_contains_delete};
|
||||
|
||||
match context.pre_filter_mode() {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => {
|
||||
// Check if this specific row group contains delete op
|
||||
row_group_contains_delete(&self.parquet_metadata, row_group_idx, "memtable")
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1296,7 +1296,7 @@ fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
|
||||
}
|
||||
|
||||
match merge_mode {
|
||||
MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
|
||||
MergeMode::LastRow => PreFilterMode::SkipFields,
|
||||
MergeMode::LastNonNull => PreFilterMode::SkipFields,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ impl FileRange {
|
||||
std::slice::from_ref(curr_row_group),
|
||||
read_format,
|
||||
self.context.base.expected_metadata.clone(),
|
||||
self.compute_skip_fields(),
|
||||
self.context.base.pre_filter_mode.skip_fields(),
|
||||
);
|
||||
|
||||
// not costly to create a predicate here since dynamic filters are wrapped in Arc
|
||||
@@ -158,22 +158,6 @@ impl FileRange {
|
||||
.unwrap_or(true) // unexpected, not skip just in case
|
||||
}
|
||||
|
||||
fn compute_skip_fields(&self) -> bool {
|
||||
match self.context.base.pre_filter_mode {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => {
|
||||
// Check if this specific row group contains delete op
|
||||
row_group_contains_delete(
|
||||
self.context.reader_builder.parquet_metadata(),
|
||||
self.row_group_idx,
|
||||
self.context.reader_builder.file_path(),
|
||||
)
|
||||
.unwrap_or(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reader to read the [FileRange].
|
||||
#[allow(dead_code)]
|
||||
pub(crate) async fn reader(
|
||||
@@ -185,7 +169,7 @@ impl FileRange {
|
||||
return Ok(None);
|
||||
}
|
||||
// Compute skip_fields once for this row group
|
||||
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
|
||||
let skip_fields = self.context.base.pre_filter_mode.skip_fields();
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder
|
||||
@@ -247,7 +231,7 @@ impl FileRange {
|
||||
return Ok(None);
|
||||
}
|
||||
// Compute skip_fields once for this row group
|
||||
let skip_fields = self.context.should_skip_fields(self.row_group_idx);
|
||||
let skip_fields = self.context.base.pre_filter_mode.skip_fields();
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder
|
||||
@@ -404,16 +388,8 @@ impl FileRangeContext {
|
||||
)
|
||||
}
|
||||
|
||||
/// Determines whether to skip field filters based on PreFilterMode and row group delete status.
|
||||
pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
|
||||
match self.base.pre_filter_mode {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => {
|
||||
// Check if this specific row group contains delete op
|
||||
self.contains_delete(row_group_idx).unwrap_or(true)
|
||||
}
|
||||
}
|
||||
pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
|
||||
self.base.pre_filter_mode
|
||||
}
|
||||
|
||||
//// Decodes parquet metadata and finds if row group contains delete op.
|
||||
@@ -451,13 +427,16 @@ impl FileRangeContext {
|
||||
pub enum PreFilterMode {
|
||||
/// Filters all columns.
|
||||
All,
|
||||
/// If the range doesn't contain delete op or doesn't have statistics, filters all columns.
|
||||
/// Otherwise, skips filtering fields.
|
||||
SkipFieldsOnDelete,
|
||||
/// Always skip fields.
|
||||
SkipFields,
|
||||
}
|
||||
|
||||
impl PreFilterMode {
|
||||
pub(crate) fn skip_fields(self) -> bool {
|
||||
matches!(self, Self::SkipFields)
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for partition expression filtering.
|
||||
pub(crate) struct PartitionFilterContext {
|
||||
pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
|
||||
@@ -514,7 +493,7 @@ impl RangeBase {
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `input` - The batch to filter
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode
|
||||
pub(crate) fn precise_filter(
|
||||
&self,
|
||||
mut input: Batch,
|
||||
@@ -626,7 +605,7 @@ impl RangeBase {
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `input` - The RecordBatch to filter
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode
|
||||
pub(crate) fn precise_filter_flat(
|
||||
&self,
|
||||
input: RecordBatch,
|
||||
@@ -679,7 +658,7 @@ impl RangeBase {
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `input` - The RecordBatch to compute mask for
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode and row group delete status
|
||||
/// * `skip_fields` - Whether to skip field filters based on PreFilterMode
|
||||
pub(crate) fn compute_filter_mask_flat(
|
||||
&self,
|
||||
input: &RecordBatch,
|
||||
|
||||
@@ -72,7 +72,6 @@ use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
|
||||
use crate::sst::parquet::async_reader::SstAsyncFileReader;
|
||||
use crate::sst::parquet::file_range::{
|
||||
FileRangeContext, FileRangeContextRef, PartitionFilterContext, PreFilterMode, RangeBase,
|
||||
row_group_contains_delete,
|
||||
};
|
||||
use crate::sst::parquet::format::{ReadFormat, need_override_sequence};
|
||||
use crate::sst::parquet::metadata::MetadataLoader;
|
||||
@@ -678,7 +677,7 @@ impl ParquetReaderBuilder {
|
||||
metrics.rows_total += num_rows as usize;
|
||||
|
||||
// Compute skip_fields once for all pruning operations
|
||||
let skip_fields = self.compute_skip_fields(parquet_meta);
|
||||
let skip_fields = self.pre_filter_mode.skip_fields();
|
||||
|
||||
let mut output = self.row_groups_by_minmax(
|
||||
read_format,
|
||||
@@ -1114,25 +1113,6 @@ impl ParquetReaderBuilder {
|
||||
pruned
|
||||
}
|
||||
|
||||
/// Computes whether to skip field columns when building statistics based on PreFilterMode.
|
||||
fn compute_skip_fields(&self, parquet_meta: &ParquetMetaData) -> bool {
|
||||
match self.pre_filter_mode {
|
||||
PreFilterMode::All => false,
|
||||
PreFilterMode::SkipFields => true,
|
||||
PreFilterMode::SkipFieldsOnDelete => {
|
||||
// Check if any row group contains delete op
|
||||
let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
|
||||
(0..parquet_meta.num_row_groups()).any(|rg_idx| {
|
||||
row_group_contains_delete(parquet_meta, rg_idx, &file_path)
|
||||
.inspect_err(|e| {
|
||||
warn!(e; "Failed to decode min value of op_type, fallback to not skipping fields");
|
||||
})
|
||||
.unwrap_or(false)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes row groups selection after min-max pruning.
|
||||
fn row_groups_by_minmax(
|
||||
&self,
|
||||
@@ -1957,7 +1937,7 @@ impl ParquetReader {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let skip_fields = self.context.should_skip_fields(row_group_idx);
|
||||
let skip_fields = self.context.pre_filter_mode().skip_fields();
|
||||
let parquet_reader = self
|
||||
.context
|
||||
.reader_builder()
|
||||
@@ -1990,7 +1970,7 @@ impl ParquetReader {
|
||||
debug_assert!(context.read_format().as_flat().is_some());
|
||||
let fetch_metrics = ParquetFetchMetrics::default();
|
||||
let reader = if let Some((row_group_idx, row_selection)) = selection.pop_first() {
|
||||
let skip_fields = context.should_skip_fields(row_group_idx);
|
||||
let skip_fields = context.pre_filter_mode().skip_fields();
|
||||
let parquet_reader = context
|
||||
.reader_builder()
|
||||
.build(context.build_context(
|
||||
|
||||
@@ -0,0 +1,69 @@
|
||||
CREATE TABLE prefilter_last_row_null(
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
) ENGINE=mito;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, NULL);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
+----------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('prefilter_last_row_null') |
|
||||
+----------------------------------------------+
|
||||
| 0 |
|
||||
+----------------------------------------------+
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, 10.0);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
+----------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('prefilter_last_row_null') |
|
||||
+----------------------------------------------+
|
||||
| 0 |
|
||||
+----------------------------------------------+
|
||||
|
||||
SELECT COUNT(*) FROM prefilter_last_row_null WHERE cpu IS NULL;
|
||||
|
||||
+----------+
|
||||
| count(*) |
|
||||
+----------+
|
||||
| 0 |
|
||||
+----------+
|
||||
|
||||
ALTER TABLE prefilter_last_row_null ADD COLUMN memory DOUBLE NULL;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu, memory) VALUES ('host1', 0, 10.0, 20.0);
|
||||
|
||||
Affected Rows: 1
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
+----------------------------------------------+
|
||||
| ADMIN FLUSH_TABLE('prefilter_last_row_null') |
|
||||
+----------------------------------------------+
|
||||
| 0 |
|
||||
+----------------------------------------------+
|
||||
|
||||
SELECT COUNT(*) FROM prefilter_last_row_null WHERE memory IS NULL;
|
||||
|
||||
+----------+
|
||||
| count(*) |
|
||||
+----------+
|
||||
| 0 |
|
||||
+----------+
|
||||
|
||||
DROP TABLE prefilter_last_row_null;
|
||||
|
||||
Affected Rows: 0
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
CREATE TABLE prefilter_last_row_null(
|
||||
host STRING,
|
||||
ts TIMESTAMP TIME INDEX,
|
||||
cpu DOUBLE,
|
||||
PRIMARY KEY(host)
|
||||
) ENGINE=mito;
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, NULL);
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu) VALUES ('host1', 0, 10.0);
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
SELECT COUNT(*) FROM prefilter_last_row_null WHERE cpu IS NULL;
|
||||
|
||||
ALTER TABLE prefilter_last_row_null ADD COLUMN memory DOUBLE NULL;
|
||||
|
||||
INSERT INTO prefilter_last_row_null(host, ts, cpu, memory) VALUES ('host1', 0, 10.0, 20.0);
|
||||
|
||||
ADMIN FLUSH_TABLE('prefilter_last_row_null');
|
||||
|
||||
SELECT COUNT(*) FROM prefilter_last_row_null WHERE memory IS NULL;
|
||||
|
||||
DROP TABLE prefilter_last_row_null;
|
||||
Reference in New Issue
Block a user