fix: prom ql logical plan use column index not name (#7109)

* feat: use index not col name

Signed-off-by: discord9 <discord9@163.com>

* fix: use name without qualifier & output schema fix

Signed-off-by: discord9 <discord9@163.com>

* proto

Signed-off-by: discord9 <discord9@163.com>

* refactor: resolve column name/index

Signed-off-by: discord9 <discord9@163.com>

* pcr

Signed-off-by: discord9 <discord9@163.com>

* chore: update proto

Signed-off-by: discord9 <discord9@163.com>

* chore: update proto

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Author: discord9
Date: 2025-10-22 17:04:09 +08:00
Committed by: GitHub
Parent: 41ce100624
Commit: a9a3e0b121

11 changed files with 576 additions and 110 deletions
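The core of the change: the PromQL extension plans (`Absent`, `InstantManipulate`, `SeriesNormalize`, `RangeManipulate`, `ScalarCalculate`, `SeriesDivide`) previously serialized column *names* into their protobuf messages. A name may carry a table qualifier (e.g. `count_total.ts`) on one side of the wire and not the other, so resolving it against the deserialized input schema could fail. The plans now serialize the column's *index* in the input schema and resolve it back to a name once the real input is attached. A minimal standalone sketch of the round trip, with plain string slices standing in for the DataFusion schema (illustrative only, not the GreptimeDB code):

    // Serialize: name -> index. u64::MAX marks "not found" so the
    // receiving side reports the error instead of silently misbinding.
    fn column_index(schema: &[&str], name: &str) -> u64 {
        schema
            .iter()
            .position(|c| *c == name)
            .map(|i| i as u64)
            .unwrap_or(u64::MAX)
    }

    // Deserialize: index -> name, against the receiver's input schema.
    fn column_name(schema: &[&str], idx: u64) -> Result<String, String> {
        schema
            .get(idx as usize)
            .map(|c| c.to_string())
            .ok_or_else(|| format!("no column at idx {idx}, columns: {schema:?}"))
    }

    fn main() {
        let sender = ["ts", "tag_a", "val"];
        let receiver = ["ts", "tag_a", "val"]; // same layout, possibly different qualifiers
        let idx = column_index(&sender, "val");
        assert_eq!(column_name(&receiver, idx).unwrap(), "val");
    }

Keeping `u64::MAX` as the "not found" sentinel defers the failure to the deserializing side, where the schema needed for a useful error message is available.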

Cargo.lock (generated)

@@ -5328,7 +5328,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=69a6089933daa573c96808ec4bbc48f447ec6e8c#69a6089933daa573c96808ec4bbc48f447ec6e8c"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=72a0d22e0f5f716b2ee21bca091f87a88c36e5ca#72a0d22e0f5f716b2ee21bca091f87a88c36e5ca"
 dependencies = [
  "prost 0.13.5",
  "prost-types 0.13.5",

@@ -147,7 +147,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "69a6089933daa573c96808ec4bbc48f447ec6e8c" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "72a0d22e0f5f716b2ee21bca091f87a88c36e5ca" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"

@@ -27,6 +27,8 @@ mod union_distinct_on;
 pub use absent::{Absent, AbsentExec, AbsentStream};
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
+use datafusion::common::DFSchemaRef;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
 pub use empty_metric::{EmptyMetric, EmptyMetricExec, EmptyMetricStream, build_special_time_expr};
 pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
 pub use instant_manipulate::{InstantManipulate, InstantManipulateExec, InstantManipulateStream};
@@ -40,3 +42,44 @@ pub use union_distinct_on::{UnionDistinctOn, UnionDistinctOnExec, UnionDistinctOnStream};
 pub type Millisecond = <TimestampMillisecondType as ArrowPrimitiveType>::Native;
 
 const METRIC_NUM_SERIES: &str = "num_series";
+
+/// Utilities for handling the unfix logic in extension plans.
+/// Convert a column name to its index for serialization.
+pub fn serialize_column_index(schema: &DFSchemaRef, column_name: &str) -> u64 {
+    schema
+        .index_of_column_by_name(None, column_name)
+        .map(|idx| idx as u64)
+        .unwrap_or(u64::MAX) // if not found, deserialization will report the error
+}
+
+/// Convert an index back to a column name for deserialization.
+pub fn resolve_column_name(
+    index: u64,
+    schema: &DFSchemaRef,
+    context: &str,
+    column_type: &str,
+) -> DataFusionResult<String> {
+    let columns = schema.columns();
+    columns
+        .get(index as usize)
+        .ok_or_else(|| {
+            DataFusionError::Internal(format!(
+                "Failed to get {} column at idx {} during unfixing {} with columns: {:?}",
                column_type, index, context, columns
+            ))
+        })
+        .map(|field| field.name().to_string())
+}
+
+/// Batch-resolve multiple column indices to names.
+pub fn resolve_column_names(
+    indices: &[u64],
+    schema: &DFSchemaRef,
+    context: &str,
+    column_type: &str,
+) -> DataFusionResult<Vec<String>> {
+    indices
+        .iter()
+        .map(|idx| resolve_column_name(*idx, schema, context, column_type))
+        .collect()
+}
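A sketch of how these helpers round-trip, written as a hypothetical unit test. It assumes DataFusion's `DFSchema::try_from(Schema)` conversion and the arrow `Field`/`Schema` types, and that the test sits in the same module as the helpers above; the setup is illustrative, not an actual test from this commit:

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
    use datafusion::common::{DFSchema, DFSchemaRef};

    #[test]
    fn column_index_round_trip() {
        let schema: DFSchemaRef = Arc::new(
            DFSchema::try_from(Schema::new(vec![
                Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), false),
                Field::new("val", DataType::Float64, true),
            ]))
            .unwrap(),
        );

        // name -> index on the serializing side...
        let idx = serialize_column_index(&schema, "val");
        assert_eq!(idx, 1);

        // ...and index -> name on the deserializing side.
        let name = resolve_column_name(idx, &schema, "Example", "value").unwrap();
        assert_eq!(name, "val");

        // A missing column serializes to u64::MAX and fails loudly on resolve.
        let missing = serialize_column_index(&schema, "no_such_column");
        assert!(resolve_column_name(missing, &schema, "Example", "value").is_err());
    }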

@@ -47,7 +47,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::DeserializeSnafu;
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
 
 /// Maximum number of rows per output batch
 const ABSENT_BATCH_SIZE: usize = 8192;
@@ -62,6 +62,13 @@ pub struct Absent {
     fake_labels: Vec<(String, String)>,
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_column_idx: u64,
+    pub value_column_idx: u64,
 }
 
 impl PartialOrd for Absent {
@@ -122,16 +129,44 @@ impl UserDefinedLogicalNodeCore for Absent {
             ));
         }
 
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            step: self.step,
-            time_index_column: self.time_index_column.clone(),
-            value_column: self.value_column.clone(),
-            fake_labels: self.fake_labels.clone(),
-            input: inputs[0].clone(),
-            output_schema: self.output_schema.clone(),
-        })
+        let input: LogicalPlan = inputs[0].clone();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column = resolve_column_name(
+                unfix.time_index_column_idx,
+                input_schema,
+                "Absent",
+                "time index",
+            )?;
+            let value_column =
+                resolve_column_name(unfix.value_column_idx, input_schema, "Absent", "value")?;
+
+            // Recreate output schema with actual field names
+            Self::try_new(
+                self.start,
+                self.end,
+                self.step,
+                time_index_column,
+                value_column,
+                self.fake_labels.clone(),
+                input,
+            )
+        } else {
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                step: self.step,
+                time_index_column: self.time_index_column.clone(),
+                value_column: self.value_column.clone(),
+                fake_labels: self.fake_labels.clone(),
+                input,
+                output_schema: self.output_schema.clone(),
+                unfix: None,
+            })
+        }
     }
 }
@@ -179,6 +214,7 @@ impl Absent {
             fake_labels,
             input,
             output_schema,
+            unfix: None,
         })
     }
@@ -209,12 +245,17 @@ impl Absent {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_column_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column);
+        let value_column_idx = serialize_column_index(self.input.schema(), &self.value_column);
+
         pb::Absent {
             start: self.start,
             end: self.end,
             step: self.step,
-            time_index_column: self.time_index_column.clone(),
-            value_column: self.value_column.clone(),
+            time_index_column_idx,
+            value_column_idx,
             fake_labels: self
                 .fake_labels
                 .iter()
@@ -223,6 +264,7 @@ impl Absent {
                     value: value.clone(),
                 })
                 .collect(),
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -233,19 +275,27 @@ impl Absent {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        Self::try_new(
-            pb_absent.start,
-            pb_absent.end,
-            pb_absent.step,
-            pb_absent.time_index_column,
-            pb_absent.value_column,
-            pb_absent
-                .fake_labels
-                .iter()
-                .map(|label| (label.key.clone(), label.value.clone()))
-                .collect(),
-            placeholder_plan,
-        )
+
+        let unfix = UnfixIndices {
+            time_index_column_idx: pb_absent.time_index_column_idx,
+            value_column_idx: pb_absent.value_column_idx,
+        };
+
+        Ok(Self {
+            start: pb_absent.start,
+            end: pb_absent.end,
+            step: pb_absent.step,
+            time_index_column: String::new(),
+            value_column: String::new(),
+            fake_labels: pb_absent
+                .fake_labels
+                .iter()
+                .map(|label| (label.key.clone(), label.value.clone()))
+                .collect(),
+            input: placeholder_plan,
+            output_schema: Arc::new(DFSchema::empty()),
+            unfix: Some(unfix),
+        })
     }
 }
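The same two-phase pattern repeats in each plan below: `deserialize` only sees an empty placeholder input, so it parks the raw protobuf indices in `unfix: Some(..)` and leaves the name fields empty; when DataFusion later substitutes the real input via `with_exprs_and_inputs`, the indices are resolved against the actual schema and `unfix` is cleared. A condensed, hypothetical sketch of that state machine (the struct and field names are illustrative, not the real plan types):

    /// Simplified stand-in for one of the extension plans.
    struct PlanNode {
        time_index_column: String, // empty until resolved
        unfix: Option<u64>,        // raw index parked by `deserialize`
    }

    impl PlanNode {
        /// Called by DataFusion once the real input schema is known.
        fn with_input(&self, input_columns: &[String]) -> Result<PlanNode, String> {
            match &self.unfix {
                // Phase 2: resolve the parked index against the real schema.
                Some(idx) => Ok(PlanNode {
                    time_index_column: input_columns
                        .get(*idx as usize)
                        .cloned()
                        .ok_or_else(|| format!("no column at idx {idx}"))?,
                    unfix: None,
                }),
                // Already resolved: plain clone-with-new-input.
                None => Ok(PlanNode {
                    time_index_column: self.time_index_column.clone(),
                    unfix: None,
                }),
            }
        }
    }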

@@ -41,7 +41,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 /// Manipulate the input record batch to make it suitable for Instant Operator.
@@ -59,6 +61,13 @@ pub struct InstantManipulate {
     /// An optional column for validating staleness
     field_column: Option<String>,
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub field_index_idx: u64,
 }
 
 impl UserDefinedLogicalNodeCore for InstantManipulate {
@@ -97,15 +106,51 @@ impl UserDefinedLogicalNodeCore for InstantManipulate {
             ));
         }
 
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            lookback_delta: self.lookback_delta,
-            interval: self.interval,
-            time_index_column: self.time_index_column.clone(),
-            field_column: self.field_column.clone(),
-            input: inputs.into_iter().next().unwrap(),
-        })
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "InstantManipulate",
+                "time index",
+            )?;
+            let field_column = if unfix.field_index_idx == u64::MAX {
+                None
+            } else {
+                Some(resolve_column_name(
+                    unfix.field_index_idx,
+                    input_schema,
+                    "InstantManipulate",
+                    "field",
+                )?)
+            };
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                lookback_delta: self.lookback_delta,
+                interval: self.interval,
+                time_index_column,
+                field_column,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                lookback_delta: self.lookback_delta,
+                interval: self.interval,
+                time_index_column: self.time_index_column.clone(),
+                field_column: self.field_column.clone(),
+                input,
+                unfix: None,
+            })
+        }
     }
 }
@@ -127,6 +172,7 @@ impl InstantManipulate {
             time_index_column,
             field_column,
             input,
+            unfix: None,
         }
     }
@@ -148,13 +194,22 @@ impl InstantManipulate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
+        let field_index_idx = self
+            .field_column
+            .as_ref()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .unwrap_or(u64::MAX);
+
         pb::InstantManipulate {
             start: self.start,
             end: self.end,
             interval: self.interval,
             lookback_delta: self.lookback_delta,
-            time_index: self.time_index_column.clone(),
-            field_index: self.field_column.clone().unwrap_or_default(),
+            time_index_idx,
+            field_index_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -166,19 +221,21 @@ impl InstantManipulate {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
-        let field_column = if pb_instant_manipulate.field_index.is_empty() {
-            None
-        } else {
-            Some(pb_instant_manipulate.field_index)
-        };
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_instant_manipulate.time_index_idx,
+            field_index_idx: pb_instant_manipulate.field_index_idx,
+        };
+
         Ok(Self {
             start: pb_instant_manipulate.start,
             end: pb_instant_manipulate.end,
             lookback_delta: pb_instant_manipulate.lookback_delta,
             interval: pb_instant_manipulate.interval,
-            time_index_column: pb_instant_manipulate.time_index,
-            field_column,
+            time_index_column: String::new(),
+            field_column: None,
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }

@@ -40,7 +40,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 /// Normalize the input record batch. Notice that for simplicity, this method assumes
@@ -58,6 +60,13 @@ pub struct SeriesNormalize {
     tag_columns: Vec<String>,
 
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
 }
 
 impl UserDefinedLogicalNodeCore for SeriesNormalize {
@@ -96,13 +105,42 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
             ));
         }
 
-        Ok(Self {
-            offset: self.offset,
-            time_index_column_name: self.time_index_column_name.clone(),
-            need_filter_out_nan: self.need_filter_out_nan,
-            input: inputs.into_iter().next().unwrap(),
-            tag_columns: self.tag_columns.clone(),
-        })
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index_column_name = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "SeriesNormalize",
+                "time index",
+            )?;
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            Ok(Self {
+                offset: self.offset,
+                time_index_column_name,
+                need_filter_out_nan: self.need_filter_out_nan,
+                tag_columns,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                offset: self.offset,
+                time_index_column_name: self.time_index_column_name.clone(),
+                need_filter_out_nan: self.need_filter_out_nan,
+                tag_columns: self.tag_columns.clone(),
+                input,
+                unfix: None,
+            })
+        }
     }
 }
@@ -120,6 +158,7 @@ impl SeriesNormalize {
             need_filter_out_nan,
             tag_columns,
             input,
+            unfix: None,
         }
     }
@@ -139,11 +178,21 @@ impl SeriesNormalize {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column_name);
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<u64>>();
+
         pb::SeriesNormalize {
             offset: self.offset,
-            time_index: self.time_index_column_name.clone(),
+            time_index_idx,
             filter_nan: self.need_filter_out_nan,
-            tag_columns: self.tag_columns.clone(),
+            tag_column_indices,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -154,13 +203,20 @@ impl SeriesNormalize {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
        });
-        Ok(Self::new(
-            pb_normalize.offset,
-            pb_normalize.time_index,
-            pb_normalize.filter_nan,
-            pb_normalize.tag_columns,
-            placeholder_plan,
-        ))
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_normalize.time_index_idx,
+            tag_column_indices: pb_normalize.tag_column_indices.clone(),
+        };
+
+        Ok(Self {
+            offset: pb_normalize.offset,
+            time_index_column_name: String::new(),
+            need_filter_out_nan: pb_normalize.filter_nan,
+            tag_columns: Vec::new(),
+            input: placeholder_plan,
+            unfix: Some(unfix),
+        })
     }
 }

@@ -18,6 +18,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use common_telemetry::debug;
 use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
 use datafusion::arrow::compute;
 use datafusion::arrow::datatypes::{Field, SchemaRef};
@@ -43,7 +44,9 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
+use crate::extension_plan::{
+    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
+};
 use crate::metrics::PROMQL_SERIES_COUNT;
 use crate::range_array::RangeArray;
@@ -62,11 +65,17 @@ pub struct RangeManipulate {
     end: Millisecond,
     interval: Millisecond,
     range: Millisecond,
     time_index: String,
     field_columns: Vec<String>,
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
 }
 
 impl RangeManipulate {
@@ -90,6 +99,7 @@ impl RangeManipulate {
             field_columns,
             input,
             output_schema,
+            unfix: None,
         })
     }
@@ -181,13 +191,22 @@ impl RangeManipulate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
+        let tag_column_indices = self
+            .field_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<u64>>();
+
         pb::RangeManipulate {
             start: self.start,
             end: self.end,
             interval: self.interval,
             range: self.range,
-            time_index: self.time_index.clone(),
-            tag_columns: self.field_columns.clone(),
+            time_index_idx,
+            tag_column_indices,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -200,6 +219,12 @@ impl RangeManipulate {
             schema: empty_schema.clone(),
         });
 
+        let unfix = UnfixIndices {
+            time_index_idx: pb_range_manipulate.time_index_idx,
+            tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
+        };
+        debug!("RangeManipulate deserialize unfix: {:?}", unfix);
+
         // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
         // because the input schema is empty.
         // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
@@ -209,10 +234,11 @@ impl RangeManipulate {
             end: pb_range_manipulate.end,
             interval: pb_range_manipulate.interval,
             range: pb_range_manipulate.range,
-            time_index: pb_range_manipulate.time_index,
-            field_columns: pb_range_manipulate.tag_columns,
+            time_index: String::new(),
+            field_columns: Vec::new(),
             input: placeholder_plan,
             output_schema: empty_schema,
+            unfix: Some(unfix),
         })
     }
 }
@@ -286,19 +312,52 @@ impl UserDefinedLogicalNodeCore for RangeManipulate {
         let input: LogicalPlan = inputs.pop().unwrap();
         let input_schema = input.schema();
 
-        let output_schema =
-            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
-        Ok(Self {
-            start: self.start,
-            end: self.end,
-            interval: self.interval,
-            range: self.range,
-            time_index: self.time_index.clone(),
-            field_columns: self.field_columns.clone(),
-            input,
-            output_schema,
-        })
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "RangeManipulate",
+                "time index",
+            )?;
+            let field_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+
+            let output_schema =
+                Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                range: self.range,
+                time_index,
+                field_columns,
+                input,
+                output_schema,
+                unfix: None,
+            })
+        } else {
+            let output_schema =
+                Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
+
+            Ok(Self {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                range: self.range,
+                time_index: self.time_index.clone(),
+                field_columns: self.field_columns.clone(),
+                input,
+                output_schema,
+                unfix: None,
+            })
        }
     }
 }

@@ -41,7 +41,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{ColumnNotFoundSnafu, DataFusionPlanningSnafu, DeserializeSnafu, Result};
-use crate::extension_plan::Millisecond;
+use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index};
 
 /// `ScalarCalculate` is the custom logical plan to calculate
 /// [`scalar`](https://prometheus.io/docs/prometheus/latest/querying/functions/#scalar)
@@ -59,6 +59,14 @@ pub struct ScalarCalculate {
     field_column: String,
     input: LogicalPlan,
     output_schema: DFSchemaRef,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub time_index_idx: u64,
+    pub tag_column_indices: Vec<u64>,
+    pub field_column_idx: u64,
 }
 
 impl ScalarCalculate {
@@ -101,6 +109,7 @@ impl ScalarCalculate {
             field_column: field_column.to_string(),
             input,
             output_schema: Arc::new(schema),
+            unfix: None,
         })
     }
@@ -149,13 +158,24 @@ impl ScalarCalculate {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<u64>>();
+        let field_column_idx = serialize_column_index(self.input.schema(), &self.field_column);
+
         pb::ScalarCalculate {
             start: self.start,
             end: self.end,
             interval: self.interval,
-            time_index: self.time_index.clone(),
-            tag_columns: self.tag_columns.clone(),
-            field_column: self.field_column.clone(),
+            time_index_idx,
+            tag_column_indices,
+            field_column_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -166,17 +186,20 @@ impl ScalarCalculate {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
+
+        let unfix = UnfixIndices {
+            time_index_idx: pb_scalar_calculate.time_index_idx,
+            tag_column_indices: pb_scalar_calculate.tag_column_indices.clone(),
+            field_column_idx: pb_scalar_calculate.field_column_idx,
+        };
+
         // TODO(Taylor-lagrange): Supports timestamps of different precisions
         let ts_field = Field::new(
-            &pb_scalar_calculate.time_index,
+            "placeholder_time_index",
             DataType::Timestamp(TimeUnit::Millisecond, None),
             true,
         );
-        let val_field = Field::new(
-            format!("scalar({})", pb_scalar_calculate.field_column),
-            DataType::Float64,
-            true,
-        );
+        let val_field = Field::new("placeholder_field", DataType::Float64, true);
         // TODO(Taylor-lagrange): missing tablename in pb
         let schema = DFSchema::new_with_metadata(
             vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
@@ -188,11 +211,12 @@ impl ScalarCalculate {
             start: pb_scalar_calculate.start,
             end: pb_scalar_calculate.end,
             interval: pb_scalar_calculate.interval,
-            time_index: pb_scalar_calculate.time_index,
-            tag_columns: pb_scalar_calculate.tag_columns,
-            field_column: pb_scalar_calculate.field_column,
+            time_index: String::new(),
+            tag_columns: Vec::new(),
+            field_column: String::new(),
             output_schema: Arc::new(schema),
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }
@@ -259,16 +283,70 @@ impl UserDefinedLogicalNodeCore for ScalarCalculate {
                 "ScalarCalculate should not have any expressions".to_string(),
             ));
         }
-        Ok(ScalarCalculate {
-            start: self.start,
-            end: self.end,
-            interval: self.interval,
-            time_index: self.time_index.clone(),
-            tag_columns: self.tag_columns.clone(),
-            field_column: self.field_column.clone(),
-            input: inputs.into_iter().next().unwrap(),
-            output_schema: self.output_schema.clone(),
-        })
+
+        let input: LogicalPlan = inputs.into_iter().next().unwrap();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let time_index = resolve_column_name(
+                unfix.time_index_idx,
+                input_schema,
+                "ScalarCalculate",
+                "time index",
+            )?;
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "ScalarCalculate", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+            let field_column = resolve_column_name(
+                unfix.field_column_idx,
+                input_schema,
+                "ScalarCalculate",
+                "field",
+            )?;
+
+            // Recreate output schema with actual field names
+            let ts_field = Field::new(
+                &time_index,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            );
+            let val_field =
+                Field::new(format!("scalar({})", field_column), DataType::Float64, true);
+            let schema = DFSchema::new_with_metadata(
+                vec![(None, Arc::new(ts_field)), (None, Arc::new(val_field))],
+                HashMap::new(),
+            )
+            .context(DataFusionPlanningSnafu)?;
+
+            Ok(ScalarCalculate {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                time_index,
+                tag_columns,
+                field_column,
+                input,
+                output_schema: Arc::new(schema),
+                unfix: None,
+            })
+        } else {
+            Ok(ScalarCalculate {
+                start: self.start,
+                end: self.end,
+                interval: self.interval,
+                time_index: self.time_index.clone(),
+                tag_columns: self.tag_columns.clone(),
+                field_column: self.field_column.clone(),
+                input,
+                output_schema: self.output_schema.clone(),
+                unfix: None,
+            })
        }
     }
 }

@@ -41,7 +41,7 @@ use prost::Message;
 use snafu::ResultExt;
 
 use crate::error::{DeserializeSnafu, Result};
-use crate::extension_plan::METRIC_NUM_SERIES;
+use crate::extension_plan::{METRIC_NUM_SERIES, resolve_column_name, serialize_column_index};
 use crate::metrics::PROMQL_SERIES_COUNT;
 
 #[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
@@ -53,6 +53,13 @@ pub struct SeriesDivide {
     /// here can avoid unnecessary sort in follow-on plans.
     time_index_column: String,
     input: LogicalPlan,
+    unfix: Option<UnfixIndices>,
+}
+
+#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
+struct UnfixIndices {
+    pub tag_column_indices: Vec<u64>,
+    pub time_index_column_idx: u64,
 }
 
 impl UserDefinedLogicalNodeCore for SeriesDivide {
@@ -87,11 +94,38 @@ impl UserDefinedLogicalNodeCore for SeriesDivide {
             ));
         }
 
-        Ok(Self {
-            tag_columns: self.tag_columns.clone(),
-            time_index_column: self.time_index_column.clone(),
-            input: inputs[0].clone(),
-        })
+        let input: LogicalPlan = inputs[0].clone();
+        let input_schema = input.schema();
+
+        if let Some(unfix) = &self.unfix {
+            // transform indices to names
+            let tag_columns = unfix
+                .tag_column_indices
+                .iter()
+                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesDivide", "tag"))
+                .collect::<DataFusionResult<Vec<String>>>()?;
+            let time_index_column = resolve_column_name(
+                unfix.time_index_column_idx,
+                input_schema,
+                "SeriesDivide",
+                "time index",
+            )?;
+
+            Ok(Self {
+                tag_columns,
+                time_index_column,
+                input,
+                unfix: None,
+            })
+        } else {
+            Ok(Self {
+                tag_columns: self.tag_columns.clone(),
+                time_index_column: self.time_index_column.clone(),
+                input,
+                unfix: None,
+            })
        }
     }
 }
@@ -101,6 +135,7 @@ impl SeriesDivide {
             tag_columns,
             time_index_column,
             input,
+            unfix: None,
         }
     }
@@ -122,9 +157,19 @@ impl SeriesDivide {
     }
 
     pub fn serialize(&self) -> Vec<u8> {
+        let tag_column_indices = self
+            .tag_columns
+            .iter()
+            .map(|name| serialize_column_index(self.input.schema(), name))
+            .collect::<Vec<u64>>();
+        let time_index_column_idx =
+            serialize_column_index(self.input.schema(), &self.time_index_column);
+
         pb::SeriesDivide {
-            tag_columns: self.tag_columns.clone(),
-            time_index_column: self.time_index_column.clone(),
+            tag_column_indices,
+            time_index_column_idx,
+            ..Default::default()
         }
         .encode_to_vec()
     }
@@ -135,10 +180,17 @@ impl SeriesDivide {
             produce_one_row: false,
             schema: Arc::new(DFSchema::empty()),
         });
+
+        let unfix = UnfixIndices {
+            tag_column_indices: pb_series_divide.tag_column_indices.clone(),
+            time_index_column_idx: pb_series_divide.time_index_column_idx,
+        };
+
         Ok(Self {
-            tag_columns: pb_series_divide.tag_columns,
-            time_index_column: pb_series_divide.time_index_column,
+            tag_columns: Vec::new(),
+            time_index_column: String::new(),
             input: placeholder_plan,
+            unfix: Some(unfix),
         })
     }
 }


@@ -0,0 +1,49 @@
create table count_total (
ts timestamp time index,
tag_a string,
tag_b string,
val double,
primary key (tag_a, tag_b),
);
Affected Rows: 0
-- if `RangeManipulate` can be encoded/decoded correctly in substrait, the following queries should pass
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
tql explain (0, 100, '1s')
increase(count_total{
tag_a="ffa",
}[1h])[12h:1h];
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false, remote_input=[ |
| | PromRangeManipulate: req range=[0..0], interval=[300000], eval range=[43200000], time index=[ts], values=["prom_increase(ts_range,val,ts,Int64(3600000))"] |
| | Filter: prom_increase(ts_range,val,ts,Int64(3600000)) IS NOT NULL |
| | Projection: count_total.ts, prom_increase(ts_range, val, count_total.ts, Int64(3600000)) AS prom_increase(ts_range,val,ts,Int64(3600000)), count_total.tag_a, count_total.tag_b |
| | PromRangeManipulate: req range=[-39600000..0], interval=[3600000], eval range=[3600000], time index=[ts], values=["val"] |
| | PromSeriesNormalize: offset=[0], time index=[ts], filter NaN: [true] |
| | PromSeriesDivide: tags=["tag_a", "tag_b"] |
| | Sort: count_total.tag_a ASC NULLS FIRST, count_total.tag_b ASC NULLS FIRST, count_total.ts ASC NULLS FIRST |
| | Filter: count_total.tag_a = Utf8("ffa") AND count_total.ts >= TimestampMillisecond(-43500000, None) AND count_total.ts <= TimestampMillisecond(300000, None) |
| | TableScan: count_total |
| | ]] |
| physical_plan | CooperativeExec |
| | MergeScanExec: REDACTED
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
tql eval (0, 100, '1s')
increase(count_total{
tag_a="ffa",
}[1h])[12h:1h];
++
++
drop table count_total;
Affected Rows: 0


@@ -0,0 +1,22 @@
create table count_total (
ts timestamp time index,
tag_a string,
tag_b string,
val double,
primary key (tag_a, tag_b),
);
-- if `RangeManipulate` can be encoded/decoded correctly in substrait, the following queries should pass
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (partitioning.*) REDACTED
tql explain (0, 100, '1s')
increase(count_total{
tag_a="ffa",
}[1h])[12h:1h];
tql eval (0, 100, '1s')
increase(count_total{
tag_a="ffa",
}[1h])[12h:1h];
drop table count_total;