From 8b9ee1f27910493da308ceeb16d2c44823b28778 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 13 Apr 2026 14:30:46 +0800 Subject: [PATCH] chore: revert accident change Signed-off-by: discord9 --- src/query/src/promql/planner.rs | 172 ++++++++++---------------------- 1 file changed, 52 insertions(+), 120 deletions(-) diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 3dfd75fdd4..640994dea2 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -23,8 +23,6 @@ use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_function::function::FunctionContext; use common_query::prelude::greptime_value; -use common_time::Timestamp; -use common_time::timestamp::TimeUnit; use datafusion::common::DFSchemaRef; use datafusion::datasource::DefaultTableSource; use datafusion::functions_aggregate::average::avg_udaf; @@ -93,9 +91,9 @@ use crate::promql::error::{ InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu, Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu, - TimestampOutOfRangeSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, - UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, - ValueNotFoundSnafu, ZeroRangeSelectorSnafu, + UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu, + UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu, + ZeroRangeSelectorSnafu, }; use crate::query_engine::QueryEngineState; @@ -1223,34 +1221,26 @@ impl PromPlanner { label_matchers: Matchers, is_range_selector: bool, ) -> Result { + // make table scan plan + let table_ref = self.table_ref()?; + let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?; + let table_schema = table_scan.schema(); + + // make filter exprs let offset_duration = match offset { Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond, Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond), None => 0, }; - - // make table scan plan - let table_ref = self.table_ref()?; - let (mut table_scan, time_filter_pushed_down) = self - .create_table_scan_plan(table_ref.clone(), offset_duration) - .await?; - let table_schema = table_scan.schema(); - - // make filter exprs let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?; - if !time_filter_pushed_down - && let Some(time_index_filter) = - self.build_time_index_filter(offset_duration, TimeUnit::Millisecond)? - { + if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? { scan_filters.push(time_index_filter); } - if let Some(scan_filter) = conjunction(scan_filters) { - table_scan = LogicalPlanBuilder::from(table_scan) - .filter(scan_filter) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; - } + table_scan = LogicalPlanBuilder::from(table_scan) + .filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty. + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; // make a projection plan if there is any `__field__` matcher if let Some(field_matchers) = &self.ctx.field_column_matcher { @@ -1600,11 +1590,7 @@ impl PromPlanner { Ok(table_ref) } - fn build_time_index_filter( - &self, - offset_duration: i64, - time_index_unit: TimeUnit, - ) -> Result> { + fn build_time_index_filter(&self, offset_duration: i64) -> Result> { let start = self.ctx.start; let end = self.ctx.end; if end < start { @@ -1628,92 +1614,56 @@ impl PromPlanner { // Scan a continuous time range if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H { - let lower_bound = self.build_scan_time_filter_literal( - start - offset_duration - selector_window + lower_exclusive_adjustment, - time_index_unit, - )?; - let upper_bound = self.build_scan_time_filter_literal( - self.ctx - .end - .checked_sub(offset_duration) - .and_then(|ts| ts.checked_add(1)) - .with_context(|| TimestampOutOfRangeSnafu { - timestamp: self.ctx.end - offset_duration, - unit: TimeUnit::Millisecond, - })?, - time_index_unit, - )?; let single_time_range = time_index_expr .clone() - .gt_eq(lower_bound) - .and(time_index_expr.lt(upper_bound)); + .gt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond( + Some( + self.ctx.start - offset_duration - selector_window + + lower_exclusive_adjustment, + ), + None, + ), + None, + )) + .and(time_index_expr.lt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None), + None, + ))); return Ok(Some(single_time_range)); } // Otherwise scan scatter ranges separately let mut filters = Vec::with_capacity(num_points as usize + 1); for timestamp in (start..=end).step_by(interval as usize) { - let lower_bound = self.build_scan_time_filter_literal( - timestamp - offset_duration - selector_window + lower_exclusive_adjustment, - time_index_unit, - )?; - let upper_bound = self.build_scan_time_filter_literal( - timestamp - .checked_sub(offset_duration) - .and_then(|ts| ts.checked_add(1)) - .with_context(|| TimestampOutOfRangeSnafu { - timestamp: timestamp - offset_duration, - unit: TimeUnit::Millisecond, - })?, - time_index_unit, - )?; filters.push( time_index_expr .clone() - .gt_eq(lower_bound) - .and(time_index_expr.clone().lt(upper_bound)), + .gt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond( + Some( + timestamp - offset_duration - selector_window + + lower_exclusive_adjustment, + ), + None, + ), + None, + )) + .and(time_index_expr.clone().lt_eq(DfExpr::Literal( + ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None), + None, + ))), ) } Ok(filters.into_iter().reduce(DfExpr::or)) } - fn build_scan_time_filter_literal( - &self, - timestamp_ms: i64, - time_index_unit: TimeUnit, - ) -> Result { - let timestamp = Timestamp::new(timestamp_ms, TimeUnit::Millisecond) - .convert_to_ceil(time_index_unit) - .with_context(|| TimestampOutOfRangeSnafu { - timestamp: timestamp_ms, - unit: time_index_unit, - })?; - Ok(DfExpr::Literal( - Self::timestamp_to_scalar_value(timestamp), - None, - )) - } - - fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue { - let value = timestamp.value(); - match timestamp.unit() { - TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None), - TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None), - TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None), - TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None), - } - } - /// Create a table scan plan and a filter plan with given filter. /// /// # Panic /// If the filter is empty - async fn create_table_scan_plan( - &mut self, - table_ref: TableReference, - offset_duration: i64, - ) -> Result<(LogicalPlan, bool)> { + async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result { let provider = self .table_provider .resolve_table(table_ref.clone()) @@ -1806,19 +1756,14 @@ impl PromPlanner { self.ctx.tag_columns.clone() }; - let time_index_unit = scan_table + let is_time_index_ms = scan_table .schema() .timestamp_column() .with_context(|| TimeIndexNotFoundSnafu { table: maybe_phy_table_ref.to_quoted_string(), })? .data_type - .as_timestamp() - .with_context(|| TimeIndexNotFoundSnafu { - table: maybe_phy_table_ref.to_quoted_string(), - })? - .unit(); - let is_time_index_ms = time_index_unit == TimeUnit::Millisecond; + == ConcreteDataType::timestamp_millisecond_datatype(); let scan_projection = if table_id_filter.is_some() { let mut required_columns = HashSet::new(); @@ -1871,17 +1816,6 @@ impl PromPlanner { .context(DataFusionPlanningSnafu)?; } - if !is_time_index_ms - && let Some(time_index_filter) = - self.build_time_index_filter(offset_duration, time_index_unit)? - { - scan_plan = LogicalPlanBuilder::from(scan_plan) - .filter(time_index_filter) - .context(DataFusionPlanningSnafu)? - .build() - .context(DataFusionPlanningSnafu)?; - } - if !is_time_index_ms { // cast to ms if time_index not in Millisecond precision let expr: Vec<_> = self @@ -1948,7 +1882,7 @@ impl PromPlanner { let result = LogicalPlanBuilder::from(scan_plan) .build() .context(DataFusionPlanningSnafu)?; - Ok((result, !is_time_index_ms)) + Ok(result) } fn collect_row_key_tag_columns_from_plan( @@ -6151,10 +6085,9 @@ mod test { "PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ - \n Filter: metrics.tag = Utf8(\"1\") [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ + \n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ - \n Filter: metrics.timestamp >= TimestampNanosecond(-999000000, None) AND metrics.timestamp < TimestampNanosecond(100000001000000, None) [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]\ - \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]" + \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]" ); let plan = PromPlanner::stmt_to_plan( DfTableSourceProvider::new( @@ -6185,10 +6118,9 @@ mod test { \n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ - \n Filter: metrics.tag = Utf8(\"1\") [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ + \n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ \n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\ - \n Filter: metrics.timestamp >= TimestampNanosecond(-4999000000, None) AND metrics.timestamp < TimestampNanosecond(100000001000000, None) [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]\ - \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]" + \n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]" ); }