mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: do not remove time filters in ScanRegion (#5180)
* feat: do not remove time filters
* chore: remove `time_range` from parquet reader
* chore: print more message in the check script
* chore: fix unused error
This commit is contained in:
@@ -58,8 +58,10 @@ def main():
|
||||
if not check_snafu_in_files(branch_name, other_rust_files)
|
||||
]
|
||||
|
||||
for name in unused_snafu:
|
||||
print(name)
|
||||
if unused_snafu:
|
||||
print("Unused error variants:")
|
||||
for name in unused_snafu:
|
||||
print(name)
|
||||
|
||||
if unused_snafu:
|
||||
raise SystemExit(1)
|
||||
|
||||
@@ -756,13 +756,6 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
|
||||
BuildTimeRangeFilter {
|
||||
timestamp: Timestamp,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to open region"))]
|
||||
OpenRegion {
|
||||
#[snafu(implicit)]
|
||||
@@ -1023,7 +1016,6 @@ impl ErrorExt for Error {
|
||||
ChecksumMismatch { .. } => StatusCode::Unexpected,
|
||||
RegionStopped { .. } => StatusCode::RegionNotReady,
|
||||
TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments,
|
||||
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
|
||||
UnsupportedOperation { .. } => StatusCode::Unsupported,
|
||||
RemoteCompaction { .. } => StatusCode::Unexpected,
|
||||
|
||||
|
||||
@@ -355,8 +355,8 @@ impl ScanRegion {
|
||||
Ok(input)
|
||||
}
|
||||
|
||||
/// Build time range predicate from filters, also remove time filters from request.
|
||||
fn build_time_range_predicate(&mut self) -> TimestampRange {
|
||||
/// Build time range predicate from filters.
|
||||
fn build_time_range_predicate(&self) -> TimestampRange {
|
||||
let time_index = self.version.metadata.time_index_column();
|
||||
let unit = time_index
|
||||
.column_schema
|
||||
@@ -364,11 +364,7 @@ impl ScanRegion {
|
||||
.as_timestamp()
|
||||
.expect("Time index must have timestamp-compatible type")
|
||||
.unit();
|
||||
build_time_range_predicate(
|
||||
&time_index.column_schema.name,
|
||||
unit,
|
||||
&mut self.request.filters,
|
||||
)
|
||||
build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
|
||||
}
|
||||
|
||||
/// Remove field filters if the merge mode is [MergeMode::LastNonNull].
|
||||
@@ -695,7 +691,6 @@ impl ScanInput {
|
||||
.access_layer
|
||||
.read_sst(file.clone())
|
||||
.predicate(self.predicate.clone())
|
||||
.time_range(self.time_range)
|
||||
.projection(Some(self.mapper.column_ids().to_vec()))
|
||||
.cache(self.cache_manager.clone())
|
||||
.inverted_index_applier(self.inverted_index_applier.clone())
|
||||
|
||||
@@ -23,11 +23,7 @@ use api::v1::SemanticType;
|
||||
use async_trait::async_trait;
|
||||
use common_recordbatch::filter::SimpleFilterEvaluator;
|
||||
use common_telemetry::{debug, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::ScalarValue;
|
||||
use datafusion_expr::{Expr, Operator};
|
||||
use datafusion_expr::Expr;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use itertools::Itertools;
|
||||
@@ -42,7 +38,6 @@ use store_api::storage::ColumnId;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::error;
|
||||
use crate::error::{
|
||||
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
|
||||
};
|
||||
@@ -74,8 +69,6 @@ pub struct ParquetReaderBuilder {
|
||||
object_store: ObjectStore,
|
||||
/// Predicate to push down.
|
||||
predicate: Option<Predicate>,
|
||||
/// Time range to filter.
|
||||
time_range: Option<TimestampRange>,
|
||||
/// Metadata of columns to read.
|
||||
///
|
||||
/// `None` reads all columns. Due to schema change, the projection
|
||||
@@ -104,7 +97,6 @@ impl ParquetReaderBuilder {
|
||||
file_handle,
|
||||
object_store,
|
||||
predicate: None,
|
||||
time_range: None,
|
||||
projection: None,
|
||||
cache_manager: None,
|
||||
inverted_index_applier: None,
|
||||
@@ -120,13 +112,6 @@ impl ParquetReaderBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
/// Attaches the time range to the builder.
|
||||
#[must_use]
|
||||
pub fn time_range(mut self, time_range: Option<TimestampRange>) -> ParquetReaderBuilder {
|
||||
self.time_range = time_range;
|
||||
self
|
||||
}
|
||||
|
||||
/// Attaches the projection to the builder.
|
||||
///
|
||||
/// The reader only applies the projection to fields.
|
||||
@@ -238,7 +223,7 @@ impl ParquetReaderBuilder {
|
||||
cache_manager: self.cache_manager.clone(),
|
||||
};
|
||||
|
||||
let mut filters = if let Some(predicate) = &self.predicate {
|
||||
let filters = if let Some(predicate) = &self.predicate {
|
||||
predicate
|
||||
.exprs()
|
||||
.iter()
|
||||
@@ -254,10 +239,6 @@ impl ParquetReaderBuilder {
|
||||
vec![]
|
||||
};
|
||||
|
||||
if let Some(time_range) = &self.time_range {
|
||||
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
|
||||
}
|
||||
|
||||
let codec = McmpRowCodec::new(
|
||||
read_format
|
||||
.metadata()
|
||||
@@ -678,59 +659,6 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Transforms time range into [SimpleFilterEvaluator].
|
||||
fn time_range_to_predicate(
|
||||
time_range: TimestampRange,
|
||||
metadata: &RegionMetadataRef,
|
||||
) -> Result<Vec<SimpleFilterContext>> {
|
||||
let ts_col = metadata.time_index_column();
|
||||
let ts_col_id = ts_col.column_id;
|
||||
|
||||
let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
|
||||
let value = match timestamp.unit() {
|
||||
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
|
||||
TimeUnit::Millisecond => {
|
||||
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
|
||||
}
|
||||
TimeUnit::Microsecond => {
|
||||
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
|
||||
}
|
||||
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
|
||||
};
|
||||
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
|
||||
.context(error::BuildTimeRangeFilterSnafu {
|
||||
timestamp: *timestamp,
|
||||
})?;
|
||||
Ok(SimpleFilterContext::new(
|
||||
evaluator,
|
||||
ts_col_id,
|
||||
SemanticType::Timestamp,
|
||||
ts_col.column_schema.data_type.clone(),
|
||||
))
|
||||
};
|
||||
|
||||
let predicates = match (time_range.start(), time_range.end()) {
|
||||
(Some(start), Some(end)) => {
|
||||
vec![
|
||||
ts_to_filter(Operator::GtEq, start)?,
|
||||
ts_to_filter(Operator::Lt, end)?,
|
||||
]
|
||||
}
|
||||
|
||||
(Some(start), None) => {
|
||||
vec![ts_to_filter(Operator::GtEq, start)?]
|
||||
}
|
||||
|
||||
(None, Some(end)) => {
|
||||
vec![ts_to_filter(Operator::Lt, end)?]
|
||||
}
|
||||
(None, None) => {
|
||||
vec![]
|
||||
}
|
||||
};
|
||||
Ok(predicates)
|
||||
}
|
||||
|
||||
/// Metrics of filtering rows groups and rows.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub(crate) struct ReaderFilterMetrics {
|
||||
@@ -939,20 +867,6 @@ pub(crate) struct SimpleFilterContext {
|
||||
}
|
||||
|
||||
impl SimpleFilterContext {
|
||||
fn new(
|
||||
filter: SimpleFilterEvaluator,
|
||||
column_id: ColumnId,
|
||||
semantic_type: SemanticType,
|
||||
data_type: ConcreteDataType,
|
||||
) -> Self {
|
||||
Self {
|
||||
filter,
|
||||
column_id,
|
||||
semantic_type,
|
||||
data_type,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a context for the `expr`.
|
||||
///
|
||||
/// Returns None if the column to filter doesn't exist in the SST metadata or the
|
||||
|
||||
@@ -115,9 +115,9 @@ struct TimeRangeTester {
|
||||
impl TimeRangeTester {
|
||||
async fn check(&self, sql: &str, expect: TimestampRange) {
|
||||
let _ = exec_selection(self.engine.clone(), sql).await;
|
||||
let mut filters = self.take_filters();
|
||||
let filters = self.take_filters();
|
||||
|
||||
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
|
||||
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &filters);
|
||||
assert_eq!(expect, range);
|
||||
}
|
||||
|
||||
|
||||
@@ -135,21 +135,17 @@ impl Predicate {
|
||||
// since it requires query engine to convert sql to filters.
|
||||
/// `build_time_range_predicate` extracts time range from logical exprs to facilitate fast
|
||||
/// time range pruning.
|
||||
pub fn build_time_range_predicate<'a>(
|
||||
ts_col_name: &'a str,
|
||||
pub fn build_time_range_predicate(
|
||||
ts_col_name: &str,
|
||||
ts_col_unit: TimeUnit,
|
||||
filters: &'a mut Vec<Expr>,
|
||||
filters: &[Expr],
|
||||
) -> TimestampRange {
|
||||
let mut res = TimestampRange::min_to_max();
|
||||
let mut filters_remain = vec![];
|
||||
for expr in std::mem::take(filters) {
|
||||
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, &expr) {
|
||||
for expr in filters {
|
||||
if let Some(range) = extract_time_range_from_expr(ts_col_name, ts_col_unit, expr) {
|
||||
res = res.and(&range);
|
||||
} else {
|
||||
filters_remain.push(expr);
|
||||
}
|
||||
}
|
||||
*filters = filters_remain;
|
||||
res
|
||||
}
|
||||
|
||||
@@ -392,7 +388,7 @@ mod tests {
|
||||
fn check_build_predicate(expr: Expr, expect: TimestampRange) {
|
||||
assert_eq!(
|
||||
expect,
|
||||
build_time_range_predicate("ts", TimeUnit::Millisecond, &mut vec![expr])
|
||||
build_time_range_predicate("ts", TimeUnit::Millisecond, &[expr])
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user