chore: revert accident change

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-04-13 14:30:46 +08:00
parent 22a6c8c13f
commit 8b9ee1f279

View File

@@ -23,8 +23,6 @@ use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_function::function::FunctionContext;
use common_query::prelude::greptime_value;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion::common::DFSchemaRef;
use datafusion::datasource::DefaultTableSource;
use datafusion::functions_aggregate::average::avg_udaf;
@@ -93,9 +91,9 @@ use crate::promql::error::{
InvalidRegularExpressionSnafu, InvalidTimeRangeSnafu, MultiFieldsNotSupportedSnafu,
MultipleMetricMatchersSnafu, MultipleVectorSnafu, NoMetricMatcherSnafu, PromqlPlanNodeSnafu,
Result, SameLabelSetSnafu, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
TimestampOutOfRangeSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
UnsupportedMatcherOpSnafu, UnsupportedVectorMatchSnafu, ValueNotFoundSnafu,
ZeroRangeSelectorSnafu,
};
use crate::query_engine::QueryEngineState;
@@ -1223,34 +1221,26 @@ impl PromPlanner {
label_matchers: Matchers,
is_range_selector: bool,
) -> Result<LogicalPlan> {
// make table scan plan
let table_ref = self.table_ref()?;
let mut table_scan = self.create_table_scan_plan(table_ref.clone()).await?;
let table_schema = table_scan.schema();
// make filter exprs
let offset_duration = match offset {
Some(Offset::Pos(duration)) => duration.as_millis() as Millisecond,
Some(Offset::Neg(duration)) => -(duration.as_millis() as Millisecond),
None => 0,
};
// make table scan plan
let table_ref = self.table_ref()?;
let (mut table_scan, time_filter_pushed_down) = self
.create_table_scan_plan(table_ref.clone(), offset_duration)
.await?;
let table_schema = table_scan.schema();
// make filter exprs
let mut scan_filters = Self::matchers_to_expr(label_matchers.clone(), table_schema)?;
if !time_filter_pushed_down
&& let Some(time_index_filter) =
self.build_time_index_filter(offset_duration, TimeUnit::Millisecond)?
{
if let Some(time_index_filter) = self.build_time_index_filter(offset_duration)? {
scan_filters.push(time_index_filter);
}
if let Some(scan_filter) = conjunction(scan_filters) {
table_scan = LogicalPlanBuilder::from(table_scan)
.filter(scan_filter)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}
table_scan = LogicalPlanBuilder::from(table_scan)
.filter(conjunction(scan_filters).unwrap()) // Safety: `scan_filters` is not empty.
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
// make a projection plan if there is any `__field__` matcher
if let Some(field_matchers) = &self.ctx.field_column_matcher {
@@ -1600,11 +1590,7 @@ impl PromPlanner {
Ok(table_ref)
}
fn build_time_index_filter(
&self,
offset_duration: i64,
time_index_unit: TimeUnit,
) -> Result<Option<DfExpr>> {
fn build_time_index_filter(&self, offset_duration: i64) -> Result<Option<DfExpr>> {
let start = self.ctx.start;
let end = self.ctx.end;
if end < start {
@@ -1628,92 +1614,56 @@ impl PromPlanner {
// Scan a continuous time range
if (end - start) / interval > MAX_SCATTER_POINTS || interval <= INTERVAL_1H {
let lower_bound = self.build_scan_time_filter_literal(
start - offset_duration - selector_window + lower_exclusive_adjustment,
time_index_unit,
)?;
let upper_bound = self.build_scan_time_filter_literal(
self.ctx
.end
.checked_sub(offset_duration)
.and_then(|ts| ts.checked_add(1))
.with_context(|| TimestampOutOfRangeSnafu {
timestamp: self.ctx.end - offset_duration,
unit: TimeUnit::Millisecond,
})?,
time_index_unit,
)?;
let single_time_range = time_index_expr
.clone()
.gt_eq(lower_bound)
.and(time_index_expr.lt(upper_bound));
.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(
self.ctx.start - offset_duration - selector_window
+ lower_exclusive_adjustment,
),
None,
),
None,
))
.and(time_index_expr.lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(self.ctx.end - offset_duration), None),
None,
)));
return Ok(Some(single_time_range));
}
// Otherwise scan scatter ranges separately
let mut filters = Vec::with_capacity(num_points as usize + 1);
for timestamp in (start..=end).step_by(interval as usize) {
let lower_bound = self.build_scan_time_filter_literal(
timestamp - offset_duration - selector_window + lower_exclusive_adjustment,
time_index_unit,
)?;
let upper_bound = self.build_scan_time_filter_literal(
timestamp
.checked_sub(offset_duration)
.and_then(|ts| ts.checked_add(1))
.with_context(|| TimestampOutOfRangeSnafu {
timestamp: timestamp - offset_duration,
unit: TimeUnit::Millisecond,
})?,
time_index_unit,
)?;
filters.push(
time_index_expr
.clone()
.gt_eq(lower_bound)
.and(time_index_expr.clone().lt(upper_bound)),
.gt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(
Some(
timestamp - offset_duration - selector_window
+ lower_exclusive_adjustment,
),
None,
),
None,
))
.and(time_index_expr.clone().lt_eq(DfExpr::Literal(
ScalarValue::TimestampMillisecond(Some(timestamp - offset_duration), None),
None,
))),
)
}
Ok(filters.into_iter().reduce(DfExpr::or))
}
fn build_scan_time_filter_literal(
&self,
timestamp_ms: i64,
time_index_unit: TimeUnit,
) -> Result<DfExpr> {
let timestamp = Timestamp::new(timestamp_ms, TimeUnit::Millisecond)
.convert_to_ceil(time_index_unit)
.with_context(|| TimestampOutOfRangeSnafu {
timestamp: timestamp_ms,
unit: time_index_unit,
})?;
Ok(DfExpr::Literal(
Self::timestamp_to_scalar_value(timestamp),
None,
))
}
fn timestamp_to_scalar_value(timestamp: Timestamp) -> ScalarValue {
let value = timestamp.value();
match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(value), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(value), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(value), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(value), None),
}
}
/// Create a table scan plan and a filter plan with given filter.
///
/// # Panic
/// If the filter is empty
async fn create_table_scan_plan(
&mut self,
table_ref: TableReference,
offset_duration: i64,
) -> Result<(LogicalPlan, bool)> {
async fn create_table_scan_plan(&mut self, table_ref: TableReference) -> Result<LogicalPlan> {
let provider = self
.table_provider
.resolve_table(table_ref.clone())
@@ -1806,19 +1756,14 @@ impl PromPlanner {
self.ctx.tag_columns.clone()
};
let time_index_unit = scan_table
let is_time_index_ms = scan_table
.schema()
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: maybe_phy_table_ref.to_quoted_string(),
})?
.data_type
.as_timestamp()
.with_context(|| TimeIndexNotFoundSnafu {
table: maybe_phy_table_ref.to_quoted_string(),
})?
.unit();
let is_time_index_ms = time_index_unit == TimeUnit::Millisecond;
== ConcreteDataType::timestamp_millisecond_datatype();
let scan_projection = if table_id_filter.is_some() {
let mut required_columns = HashSet::new();
@@ -1871,17 +1816,6 @@ impl PromPlanner {
.context(DataFusionPlanningSnafu)?;
}
if !is_time_index_ms
&& let Some(time_index_filter) =
self.build_time_index_filter(offset_duration, time_index_unit)?
{
scan_plan = LogicalPlanBuilder::from(scan_plan)
.filter(time_index_filter)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}
if !is_time_index_ms {
// cast to ms if time_index not in Millisecond precision
let expr: Vec<_> = self
@@ -1948,7 +1882,7 @@ impl PromPlanner {
let result = LogicalPlanBuilder::from(scan_plan)
.build()
.context(DataFusionPlanningSnafu)?;
Ok((result, !is_time_index_ms))
Ok(result)
}
fn collect_row_key_tag_columns_from_plan(
@@ -6151,10 +6085,9 @@ mod test {
"PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.tag = Utf8(\"1\") [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.timestamp >= TimestampNanosecond(-999000000, None) AND metrics.timestamp < TimestampNanosecond(100000001000000, None) [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
);
let plan = PromPlanner::stmt_to_plan(
DfTableSourceProvider::new(
@@ -6185,10 +6118,9 @@ mod test {
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [true] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n PromSeriesDivide: tags=[\"tag\"] [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Sort: metrics.tag ASC NULLS FIRST, metrics.timestamp ASC NULLS FIRST [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.tag = Utf8(\"1\") [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.tag = Utf8(\"1\") AND metrics.timestamp >= TimestampMillisecond(-4999, None) AND metrics.timestamp <= TimestampMillisecond(100000000, None) [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Projection: metrics.field, metrics.tag, CAST(metrics.timestamp AS Timestamp(ms)) AS timestamp [field:Float64;N, tag:Utf8, timestamp:Timestamp(ms)]\
\n Filter: metrics.timestamp >= TimestampNanosecond(-4999000000, None) AND metrics.timestamp < TimestampNanosecond(100000001000000, None) [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]\
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(ns), field:Float64;N]"
);
}