mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-30 11:50:38 +00:00
fix: qualify inputs on handling join in promql (#2297)
* add qualifier to join inputs Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * add one more case Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * update test results Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -142,11 +142,7 @@ impl PromPlanner {
|
||||
|
||||
// calculate columns to group by
|
||||
// Need to append time index column into group by columns
|
||||
let group_exprs = modifier
|
||||
.as_ref()
|
||||
.map_or(Ok(vec![self.create_time_index_column_expr()?]), |m| {
|
||||
self.agg_modifier_to_col(input.schema(), m)
|
||||
})?;
|
||||
let group_exprs = self.agg_modifier_to_col(input.schema(), modifier)?;
|
||||
|
||||
// convert op and value columns to aggregate exprs
|
||||
let aggr_exprs = self.create_aggregate_exprs(*op, &input)?;
|
||||
@@ -261,24 +257,34 @@ impl PromPlanner {
|
||||
(None, None) => {
|
||||
let left_input = self.prom_expr_to_plan(*lhs.clone()).await?;
|
||||
let left_field_columns = self.ctx.field_columns.clone();
|
||||
let left_schema = left_input.schema().clone();
|
||||
let left_table_ref: OwnedTableReference =
|
||||
self.ctx.table_name.clone().unwrap_or_default().into();
|
||||
|
||||
let right_input = self.prom_expr_to_plan(*rhs.clone()).await?;
|
||||
let right_field_columns = self.ctx.field_columns.clone();
|
||||
let right_schema = right_input.schema().clone();
|
||||
let right_table_ref: OwnedTableReference =
|
||||
self.ctx.table_name.clone().unwrap_or_default().into();
|
||||
|
||||
// TODO(ruihang): avoid join if left and right are the same table
|
||||
|
||||
let mut field_columns =
|
||||
left_field_columns.iter().zip(right_field_columns.iter());
|
||||
// the new ctx.field_columns for the generated join plan
|
||||
let join_plan = self.join_on_non_field_columns(left_input, right_input)?;
|
||||
let join_plan = self.join_on_non_field_columns(
|
||||
left_input,
|
||||
right_input,
|
||||
left_table_ref.clone(),
|
||||
right_table_ref.clone(),
|
||||
)?;
|
||||
let join_plan_schema = join_plan.schema().clone();
|
||||
|
||||
let bin_expr_builder = |_: &String| {
|
||||
let (left_col_name, right_col_name) = field_columns.next().unwrap();
|
||||
let left_col = left_schema
|
||||
.field_with_name(None, left_col_name)
|
||||
let left_col = join_plan_schema
|
||||
.field_with_name(Some(&left_table_ref), left_col_name)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.qualified_column();
|
||||
let right_col = right_schema
|
||||
.field_with_name(None, right_col_name)
|
||||
let right_col = join_plan_schema
|
||||
.field_with_name(Some(&right_table_ref), right_col_name)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.qualified_column();
|
||||
|
||||
@@ -681,10 +687,14 @@ impl PromPlanner {
|
||||
fn agg_modifier_to_col(
|
||||
&mut self,
|
||||
input_schema: &DFSchemaRef,
|
||||
modifier: &LabelModifier,
|
||||
modifier: &Option<LabelModifier>,
|
||||
) -> Result<Vec<DfExpr>> {
|
||||
match modifier {
|
||||
LabelModifier::Include(labels) => {
|
||||
None => {
|
||||
self.ctx.tag_columns = vec![];
|
||||
Ok(vec![self.create_time_index_column_expr()?])
|
||||
}
|
||||
Some(LabelModifier::Include(labels)) => {
|
||||
let mut exprs = Vec::with_capacity(labels.labels.len());
|
||||
for label in &labels.labels {
|
||||
// nonexistence label will be ignored
|
||||
@@ -701,7 +711,7 @@ impl PromPlanner {
|
||||
|
||||
Ok(exprs)
|
||||
}
|
||||
LabelModifier::Exclude(labels) => {
|
||||
Some(LabelModifier::Exclude(labels)) => {
|
||||
let mut all_fields = input_schema
|
||||
.fields()
|
||||
.iter()
|
||||
@@ -1225,6 +1235,8 @@ impl PromPlanner {
|
||||
&self,
|
||||
left: LogicalPlan,
|
||||
right: LogicalPlan,
|
||||
left_table_ref: OwnedTableReference,
|
||||
right_table_ref: OwnedTableReference,
|
||||
) -> Result<LogicalPlan> {
|
||||
let mut tag_columns = self
|
||||
.ctx
|
||||
@@ -1238,8 +1250,16 @@ impl PromPlanner {
|
||||
tag_columns.push(Column::from_name(time_index_column));
|
||||
}
|
||||
|
||||
let right = LogicalPlanBuilder::from(right)
|
||||
.alias(right_table_ref)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.build()
|
||||
.context(DataFusionPlanningSnafu)?;
|
||||
|
||||
// Inner Join on time index column to concat two operator
|
||||
LogicalPlanBuilder::from(left)
|
||||
.alias(left_table_ref)
|
||||
.context(DataFusionPlanningSnafu)?
|
||||
.join(
|
||||
right,
|
||||
JoinType::Inner,
|
||||
@@ -1810,18 +1830,20 @@ mod test {
|
||||
let expected = String::from(
|
||||
"Projection: some_metric.tag_0, some_metric.timestamp, some_metric.field_0 + some_metric.field_0 AS some_metric.field_0 + some_metric.field_0 [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), some_metric.field_0 + some_metric.field_0:Float64;N]\
|
||||
\n Inner Join: some_metric.tag_0 = some_metric.tag_0, some_metric.timestamp = some_metric.timestamp [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
|
||||
\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Filter: some_metric.tag_0 = Utf8(\"foo\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"foo\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n SubqueryAlias: some_metric [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n PromSeriesDivide: tags=[\"tag_0\"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Sort: some_metric.tag_0 DESC NULLS LAST, some_metric.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n Filter: some_metric.tag_0 = Utf8(\"bar\") [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]\
|
||||
\n TableScan: some_metric, unsupported_filters=[tag_0 = Utf8(\"bar\"), timestamp >= TimestampMillisecond(-1000, None), timestamp <= TimestampMillisecond(100001000, None)] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"
|
||||
);
|
||||
|
||||
assert_eq!(plan.display_indent_schema().to_string(), expected);
|
||||
|
||||
Reference in New Issue
Block a user