fix: disable field pruning in last non null mode (#4740)

* fix: don't prune fields in last non null mode

* test: add sqlness test for field pruning

* test: add flush

* refine implementation

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Yingwen
2024-09-20 08:35:37 +08:00
committed by GitHub
parent f5cf25b0db
commit f02410c39b
3 changed files with 131 additions and 1 deletions

View File

@@ -14,6 +14,7 @@
//! Scans a region according to the scan request.
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -24,6 +25,7 @@ use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use datafusion_expr::utils::expr_to_columns;
use smallvec::SmallVec;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
@@ -295,6 +297,9 @@ impl ScanRegion {
self.version.options.append_mode,
);
// Remove field filters for LastNonNull mode after logging the request.
self.maybe_remove_field_filters();
let inverted_index_applier = self.build_invereted_index_applier();
let fulltext_index_applier = self.build_fulltext_index_applier();
let predicate = Predicate::new(self.request.filters.clone());
@@ -321,7 +326,7 @@ impl ScanRegion {
Ok(input)
}
/// Build time range predicate from filters.
/// Build time range predicate from filters, also remove time filters from request.
fn build_time_range_predicate(&mut self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
@@ -337,6 +342,38 @@ impl ScanRegion {
)
}
/// Drops every filter that references a field column when the region's merge mode is
/// [MergeMode::LastNonNull]; does nothing for any other merge mode.
///
/// A filter is kept only if its referenced columns can be collected and none of them
/// is a field column of the region metadata.
fn maybe_remove_field_filters(&mut self) {
    if self.version.options.merge_mode() == MergeMode::LastNonNull {
        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
        let field_names: HashSet<_> = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect();

        // Scratch set reused across filters; cleared before each expression is inspected.
        let mut referenced = HashSet::new();
        self.request.filters.retain(|filter| {
            referenced.clear();
            // A filter whose columns cannot be collected is dropped, as is any filter
            // that touches at least one field column.
            expr_to_columns(filter, &mut referenced).is_ok()
                && !referenced
                    .iter()
                    .any(|column| field_names.contains(&column.name))
        });
    }
}
/// Use the latest schema to build the inverted index applier.
fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
if self.ignore_inverted_index {