diff --git a/Cargo.lock b/Cargo.lock
index 8bae194f43..2a185231ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5158,7 +5158,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=96c733f8472284d3c83a4c011dc6de9cf830c353#96c733f8472284d3c83a4c011dc6de9cf830c353"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a5d256ba4abb7393e0859ffbf7fca1e38f3433dc#a5d256ba4abb7393e0859ffbf7fca1e38f3433dc"
 dependencies = [
  "prost 0.13.5",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index ffe5b9d282..8ba667e562 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -134,7 +134,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "96c733f8472284d3c83a4c011dc6de9cf830c353" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a5d256ba4abb7393e0859ffbf7fca1e38f3433dc" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/common/substrait/src/extension_serializer.rs b/src/common/substrait/src/extension_serializer.rs
index 48d86e2c59..bfae6e881f 100644
--- a/src/common/substrait/src/extension_serializer.rs
+++ b/src/common/substrait/src/extension_serializer.rs
@@ -19,7 +19,8 @@ use datafusion::execution::registry::SerializerRegistry;
 use datafusion_common::DataFusionError;
 use datafusion_expr::UserDefinedLogicalNode;
 use promql::extension_plan::{
-    EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize,
+    Absent, EmptyMetric, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
+    SeriesNormalize,
 };
 
 #[derive(Debug)]
@@ -65,6 +66,13 @@ impl SerializerRegistry for ExtensionSerializer {
                     .expect("Failed to downcast to SeriesDivide");
                 Ok(series_divide.serialize())
            }
+            name if name == Absent::name() => {
+                let absent = node
+                    .as_any()
+                    .downcast_ref::<Absent>()
+                    .expect("Failed to downcast to Absent");
+                Ok(absent.serialize())
+            }
             name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
                 "EmptyMetric should not be serialized".to_string(),
             )),
@@ -103,6 +111,10 @@ impl SerializerRegistry for ExtensionSerializer {
                 let scalar_calculate = ScalarCalculate::deserialize(bytes)?;
                 Ok(Arc::new(scalar_calculate))
             }
+            name if name == Absent::name() => {
+                let absent = Absent::deserialize(bytes)?;
+                Ok(Arc::new(absent))
+            }
             name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
                 "EmptyMetric should not be deserialized".to_string(),
             )),
diff --git a/src/promql/src/extension_plan.rs b/src/promql/src/extension_plan.rs
index c526676d14..dbb83eb9f8 100644
--- a/src/promql/src/extension_plan.rs
+++ b/src/promql/src/extension_plan.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+mod absent;
 mod empty_metric;
 mod histogram_fold;
 mod instant_manipulate;
@@ -24,6 +25,7 @@ mod series_divide;
 mod test_util;
 mod union_distinct_on;
 
+pub use absent::{Absent, AbsentExec, AbsentStream};
 use datafusion::arrow::datatypes::{ArrowPrimitiveType, TimestampMillisecondType};
 pub use empty_metric::{build_special_time_expr, EmptyMetric, EmptyMetricExec, EmptyMetricStream};
 pub use histogram_fold::{HistogramFold, HistogramFoldExec, HistogramFoldStream};
diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs
new file mode 100644
index 0000000000..ff051d244a
--- /dev/null
+++ b/src/promql/src/extension_plan/absent.rs
@@ -0,0 +1,664 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::cmp::Ordering;
+use std::collections::HashMap;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use datafusion::arrow::array::Array;
+use datafusion::common::{DFSchemaRef, Result as DataFusionResult};
+use datafusion::execution::context::TaskContext;
+use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
+use datafusion::physical_expr::{EquivalenceProperties, LexRequirement, PhysicalSortRequirement};
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::expressions::Column as ColumnExpr;
+use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PlanProperties,
+    RecordBatchStream, SendableRecordBatchStream,
+};
+use datafusion_common::DFSchema;
+use datafusion_expr::EmptyRelation;
+use datatypes::arrow;
+use datatypes::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
+use datatypes::arrow::datatypes::{DataType, Field, SchemaRef, TimeUnit};
+use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::arrow_array::StringArray;
+use datatypes::compute::SortOptions;
+use futures::{ready, Stream, StreamExt};
+use greptime_proto::substrait_extension as pb;
+use prost::Message;
+use snafu::ResultExt;
+
+use crate::error::DeserializeSnafu;
+use crate::extension_plan::Millisecond;
+
+/// Maximum number of rows per output batch
+const ABSENT_BATCH_SIZE: usize = 8192;
+
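+/// Logical plan node implementing the PromQL `absent()` function: for every
+/// step-aligned timestamp in `[start, end]` that has no sample in the input,
+/// it emits a row with value `1.0` and the equality-matcher labels
+/// (`fake_labels`). If the input covers every step, the output is empty.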
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub struct Absent {
+    start: Millisecond,
+    end: Millisecond,
+    step: Millisecond,
+    time_index_column: String,
+    value_column: String,
+    fake_labels: Vec<(String, String)>,
+    input: LogicalPlan,
+    output_schema: DFSchemaRef,
+}
+
+impl PartialOrd for Absent {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // compare on fields except schema and input
+        (
+            self.start,
+            self.end,
+            self.step,
+            &self.time_index_column,
+            &self.value_column,
+            &self.fake_labels,
+        )
+            .partial_cmp(&(
+                other.start,
+                other.end,
+                other.step,
+                &other.time_index_column,
+                &other.value_column,
+                &other.fake_labels,
+            ))
+    }
+}
+
+impl UserDefinedLogicalNodeCore for Absent {
+    fn name(&self) -> &str {
+        Self::name()
+    }
+
+    fn inputs(&self) -> Vec<&LogicalPlan> {
+        vec![&self.input]
+    }
+
+    fn schema(&self) -> &DFSchemaRef {
+        &self.output_schema
+    }
+
+    fn expressions(&self) -> Vec<Expr> {
+        vec![]
+    }
+
+    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "PromAbsent: start={}, end={}, step={}",
+            self.start, self.end, self.step
+        )
+    }
+
+    fn with_exprs_and_inputs(
+        &self,
+        _exprs: Vec<Expr>,
+        inputs: Vec<LogicalPlan>,
+    ) -> DataFusionResult<Self> {
+        if inputs.is_empty() {
+            return Err(datafusion::error::DataFusionError::Internal(
+                "Absent must have at least one input".to_string(),
+            ));
+        }
+
+        Ok(Self {
+            start: self.start,
+            end: self.end,
+            step: self.step,
+            time_index_column: self.time_index_column.clone(),
+            value_column: self.value_column.clone(),
+            fake_labels: self.fake_labels.clone(),
+            input: inputs[0].clone(),
+            output_schema: self.output_schema.clone(),
+        })
+    }
+}
+
+impl Absent {
+    pub fn try_new(
+        start: Millisecond,
+        end: Millisecond,
+        step: Millisecond,
+        time_index_column: String,
+        value_column: String,
+        fake_labels: Vec<(String, String)>,
+        input: LogicalPlan,
+    ) -> DataFusionResult<Self> {
+        let mut fields = vec![
+            Field::new(
+                &time_index_column,
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new(&value_column, DataType::Float64, true),
+        ];
+
+        // remove duplicate fake labels
+        let mut fake_labels = fake_labels
+            .into_iter()
+            .collect::<HashMap<_, _>>()
+            .into_iter()
+            .collect::<Vec<_>>();
+        fake_labels.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+        for (name, _) in fake_labels.iter() {
+            fields.push(Field::new(name, DataType::Utf8, true));
+        }
+
+        let output_schema = Arc::new(DFSchema::from_unqualified_fields(
+            fields.into(),
+            HashMap::new(),
+        )?);
+
+        Ok(Self {
+            start,
+            end,
+            step,
+            time_index_column,
+            value_column,
+            fake_labels,
+            input,
+            output_schema,
+        })
+    }
+
+    pub const fn name() -> &'static str {
+        "prom_absent"
+    }
+
+    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        let output_schema = Arc::new(self.output_schema.as_arrow().clone());
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(output_schema.clone()),
+            Partitioning::UnknownPartitioning(1),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        );
+        Arc::new(AbsentExec {
+            start: self.start,
+            end: self.end,
+            step: self.step,
+            time_index_column: self.time_index_column.clone(),
+            value_column: self.value_column.clone(),
+            fake_labels: self.fake_labels.clone(),
+            output_schema: output_schema.clone(),
+            input: exec_input,
+            properties,
+            metric: ExecutionPlanMetricsSet::new(),
+        })
+    }
+
+    pub fn serialize(&self) -> Vec<u8> {
+        pb::Absent {
+            start: self.start,
+            end: self.end,
+            step: self.step,
+            time_index_column: self.time_index_column.clone(),
+            value_column: self.value_column.clone(),
+            fake_labels: self
+                .fake_labels
+                .iter()
+                .map(|(name, value)| pb::LabelPair {
+                    key: name.clone(),
+                    value: value.clone(),
+                })
+                .collect(),
+        }
+        .encode_to_vec()
+    }
+
+    pub fn deserialize(bytes: &[u8]) -> DataFusionResult<Self> {
+        let pb_absent = pb::Absent::decode(bytes).context(DeserializeSnafu)?;
+        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
+            produce_one_row: false,
+            schema: Arc::new(DFSchema::empty()),
+        });
+        Self::try_new(
+            pb_absent.start,
+            pb_absent.end,
+            pb_absent.step,
+            pb_absent.time_index_column,
+            pb_absent.value_column,
+            pb_absent
+                .fake_labels
+                .iter()
+                .map(|label| (label.key.clone(), label.value.clone()))
+                .collect(),
+            placeholder_plan,
+        )
+    }
+}
+
+#[derive(Debug)]
+pub struct AbsentExec {
+    start: Millisecond,
+    end: Millisecond,
+    step: Millisecond,
+    time_index_column: String,
+    value_column: String,
+    fake_labels: Vec<(String, String)>,
+    output_schema: SchemaRef,
+    input: Arc<dyn ExecutionPlan>,
+    properties: PlanProperties,
+    metric: ExecutionPlanMetricsSet,
+}
+
+impl ExecutionPlan for AbsentExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.output_schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
+        vec![Some(LexRequirement::new(vec![PhysicalSortRequirement {
+            expr: Arc::new(
+                ColumnExpr::new_with_schema(&self.time_index_column, &self.input.schema()).unwrap(),
+            ),
+            options: Some(SortOptions {
+                descending: false,
+                nulls_first: false,
+            }),
+        }]))]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
+        assert!(!children.is_empty());
+        Ok(Arc::new(Self {
+            start: self.start,
+            end: self.end,
+            step: self.step,
+            time_index_column: self.time_index_column.clone(),
+            value_column: self.value_column.clone(),
+            fake_labels: self.fake_labels.clone(),
+            output_schema: self.output_schema.clone(),
+            input: children[0].clone(),
+            properties: self.properties.clone(),
+            metric: self.metric.clone(),
+        }))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DataFusionResult<SendableRecordBatchStream> {
+        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
+        let input = self.input.execute(partition, context)?;
+
+        Ok(Box::pin(AbsentStream {
+            end: self.end,
+            step: self.step,
+            time_index_column_index: self
+                .input
+                .schema()
+                .column_with_name(&self.time_index_column)
+                .unwrap() // Safety: we have checked the column name in `try_new`
+                .0,
+            output_schema: self.output_schema.clone(),
+            fake_labels: self.fake_labels.clone(),
+            input,
+            metric: baseline_metric,
+            // Buffer for streaming output timestamps
+            output_timestamps: Vec::new(),
+            // Current timestamp in the output range
+            output_ts_cursor: self.start,
+            input_finished: false,
+        }))
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metric.clone_inner())
+    }
+
+    fn name(&self) -> &str {
+        "AbsentExec"
+    }
+}
+
+impl DisplayAs for AbsentExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "PromAbsentExec: start={}, end={}, step={}",
+                    self.start, self.end, self.step
+                )
+            }
+        }
+    }
+}
+
+pub struct AbsentStream {
+    end: Millisecond,
+    step: Millisecond,
+    time_index_column_index: usize,
+    output_schema: SchemaRef,
+    fake_labels: Vec<(String, String)>,
+    input: SendableRecordBatchStream,
+    metric: BaselineMetrics,
+    // Buffer for streaming output timestamps
+    output_timestamps: Vec<Millisecond>,
+    // Current timestamp in the output range
+    output_ts_cursor: Millisecond,
+    input_finished: bool,
+}
+
+impl RecordBatchStream for AbsentStream {
+    fn schema(&self) -> SchemaRef {
+        self.output_schema.clone()
+    }
+}
+
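+// The stream drains its input while advancing a step-aligned cursor over
+// [start, end]; timestamps the cursor passes without a matching input sample
+// are buffered and flushed as output batches of ABSENT_BATCH_SIZE rows.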
+impl Stream for AbsentStream {
+    type Item = DataFusionResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        loop {
+            if !self.input_finished {
+                match ready!(self.input.poll_next_unpin(cx)) {
+                    Some(Ok(batch)) => {
+                        let timer = std::time::Instant::now();
+                        if let Err(e) = self.process_input_batch(&batch) {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        self.metric.elapsed_compute().add_elapsed(timer);
+
+                        // If we have enough data for a batch, output it
+                        if self.output_timestamps.len() >= ABSENT_BATCH_SIZE {
+                            let timer = std::time::Instant::now();
+                            let result = self.flush_output_batch();
+                            self.metric.elapsed_compute().add_elapsed(timer);
+
+                            match result {
+                                Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
+                                Ok(None) => continue,
+                                Err(e) => return Poll::Ready(Some(Err(e))),
+                            }
+                        }
+                    }
+                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+                    None => {
+                        self.input_finished = true;
+
+                        let timer = std::time::Instant::now();
+                        // Process any remaining absent timestamps
+                        if let Err(e) = self.process_remaining_absent_timestamps() {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        let result = self.flush_output_batch();
+                        self.metric.elapsed_compute().add_elapsed(timer);
+                        return Poll::Ready(result.transpose());
+                    }
+                }
+            } else {
+                return Poll::Ready(None);
+            }
+        }
+    }
+}
+
+impl AbsentStream {
+    fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> {
+        // Extract timestamps from this batch
+        let timestamp_array = batch.column(self.time_index_column_index);
+        let milli_ts_array = arrow::compute::cast(
+            timestamp_array,
+            &DataType::Timestamp(TimeUnit::Millisecond, None),
+        )?;
+        let timestamp_array = milli_ts_array
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+
+        // Process against current output cursor position
+        for &input_ts in timestamp_array.values() {
+            // Generate absent timestamps up to this input timestamp
+            while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end {
+                self.output_timestamps.push(self.output_ts_cursor);
+                self.output_ts_cursor += self.step;
+            }
+
+            // Skip the input timestamp if it matches our cursor
+            if self.output_ts_cursor == input_ts {
+                self.output_ts_cursor += self.step;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> {
+        // Generate all remaining absent timestamps (input is finished)
+        while self.output_ts_cursor <= self.end {
+            self.output_timestamps.push(self.output_ts_cursor);
+            self.output_ts_cursor += self.step;
+        }
+        Ok(())
+    }
+
+    fn flush_output_batch(&mut self) -> DataFusionResult<Option<RecordBatch>> {
+        if self.output_timestamps.is_empty() {
+            return Ok(None);
+        }
+
+        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
+        let num_rows = self.output_timestamps.len();
+        columns.push(Arc::new(TimestampMillisecondArray::from(
+            self.output_timestamps.clone(),
+        )) as _);
+        columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _);
+
+        for (_, value) in self.fake_labels.iter() {
+            columns.push(Arc::new(StringArray::from_iter(std::iter::repeat_n(
+                Some(value.clone()),
+                num_rows,
+            ))) as _);
+        }
+
+        let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?;
+
+        self.output_timestamps.clear();
+        Ok(Some(batch))
+    }
+}
+
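+// Worked example (mirrored by `test_absent_basic` below): with start=0,
+// end=5000, step=1000 and input timestamps [0, 2000, 4000], the cursor emits
+// 1000 and 3000 while scanning the input, and 5000 once the input is drained.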
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use datafusion::physical_plan::memory::MemoryExec;
+    use datafusion::prelude::SessionContext;
+    use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray};
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_absent_basic() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        // Input has timestamps: 0, 2000, 4000
+        let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![0, 2000, 4000]));
+        let value_array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0]));
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap();
+
+        let memory_exec = MemoryExec::try_new(&[vec![batch]], schema, None).unwrap();
+
+        let output_schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        let absent_exec = AbsentExec {
+            start: 0,
+            end: 5000,
+            step: 1000,
+            time_index_column: "timestamp".to_string(),
+            value_column: "value".to_string(),
+            fake_labels: vec![],
+            output_schema: output_schema.clone(),
+            input: Arc::new(memory_exec),
+            properties: PlanProperties::new(
+                EquivalenceProperties::new(output_schema.clone()),
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            ),
+            metric: ExecutionPlanMetricsSet::new(),
+        };
+
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let mut stream = absent_exec.execute(0, task_ctx).unwrap();
+
+        // Collect all output batches
+        let mut output_timestamps = Vec::new();
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result.unwrap();
+            let ts_array = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            for i in 0..ts_array.len() {
+                if !ts_array.is_null(i) {
+                    let ts = ts_array.value(i);
+                    output_timestamps.push(ts);
+                }
+            }
+        }
+
+        // Should output absent timestamps: 1000, 3000, 5000
+        // (0, 2000, 4000 exist in input, so 1000, 3000, 5000 are absent)
+        assert_eq!(output_timestamps, vec![1000, 3000, 5000]);
+    }
+
+    #[tokio::test]
+    async fn test_absent_empty_input() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+
+        // Empty input
+        let memory_exec = MemoryExec::try_new(&[vec![]], schema, None).unwrap();
+
+        let output_schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new("value", DataType::Float64, true),
+        ]));
+        let absent_exec = AbsentExec {
+            start: 0,
+            end: 2000,
+            step: 1000,
+            time_index_column: "timestamp".to_string(),
+            value_column: "value".to_string(),
+            fake_labels: vec![],
+            output_schema: output_schema.clone(),
+            input: Arc::new(memory_exec),
+            properties: PlanProperties::new(
+                EquivalenceProperties::new(output_schema.clone()),
+                Partitioning::UnknownPartitioning(1),
+                EmissionType::Incremental,
+                Boundedness::Bounded,
+            ),
+            metric: ExecutionPlanMetricsSet::new(),
+        };
+
+        let session_ctx = SessionContext::new();
+        let task_ctx = session_ctx.task_ctx();
+        let mut stream = absent_exec.execute(0, task_ctx).unwrap();
+
+        // Collect all output timestamps
+        let mut output_timestamps = Vec::new();
+        while let Some(batch_result) = stream.next().await {
+            let batch = batch_result.unwrap();
+            let ts_array = batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap();
+            for i in 0..ts_array.len() {
+                if !ts_array.is_null(i) {
+                    let ts = ts_array.value(i);
+                    output_timestamps.push(ts);
+                }
+            }
+        }
+
+        // Should output all timestamps in range: 0, 1000, 2000
+        assert_eq!(output_timestamps, vec![0, 1000, 2000]);
+    }
+}
diff --git a/src/promql/src/extension_plan/planner.rs b/src/promql/src/extension_plan/planner.rs
index c1af9b4731..1e6914a78d 100644
--- a/src/promql/src/extension_plan/planner.rs
+++ b/src/promql/src/extension_plan/planner.rs
@@ -22,8 +22,8 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 
 use crate::extension_plan::{
-    EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate, SeriesDivide,
-    SeriesNormalize, UnionDistinctOn,
+    Absent, EmptyMetric, HistogramFold, InstantManipulate, RangeManipulate, ScalarCalculate,
+    SeriesDivide, SeriesNormalize, UnionDistinctOn,
 };
 
 pub struct PromExtensionPlanner;
@@ -57,6 +57,8 @@ impl ExtensionPlanner for PromExtensionPlanner {
                 physical_inputs[0].clone(),
                 physical_inputs[1].clone(),
             )))
+        } else if let Some(node) = node.as_any().downcast_ref::<Absent>() {
+            Ok(Some(node.to_execution_plan(physical_inputs[0].clone())))
         } else {
             Ok(None)
         }
diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs
index a5d7ce38ca..04b702779c 100644
--- a/src/query/src/promql/planner.rs
+++ b/src/query/src/promql/planner.rs
@@ -27,6 +27,7 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion::execution::context::SessionState;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::functions_aggregate::count::count_udaf;
+use datafusion::functions_aggregate::expr_fn::first_value;
 use datafusion::functions_aggregate::grouping::grouping_udaf;
 use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
 use datafusion::functions_aggregate::stddev::stddev_pop_udaf;
@@ -50,7 +51,7 @@ use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTi
 use datatypes::data_type::ConcreteDataType;
 use itertools::Itertools;
 use promql::extension_plan::{
-    build_special_time_expr, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
+    build_special_time_expr, Absent, EmptyMetric, HistogramFold, InstantManipulate, Millisecond,
     RangeManipulate, ScalarCalculate, SeriesDivide, SeriesNormalize, UnionDistinctOn,
 };
 use promql::functions::{
@@ -86,6 +87,8 @@ use crate::promql::error::{
 const SPECIAL_TIME_FUNCTION: &str = "time";
 /// `scalar()` function in PromQL.
 const SCALAR_FUNCTION: &str = "scalar";
+/// `absent()` function in PromQL
+const SPECIAL_ABSENT_FUNCTION: &str = "absent";
 /// `histogram_quantile` function in PromQL
 const SPECIAL_HISTOGRAM_QUANTILE: &str = "histogram_quantile";
 /// `vector` function in PromQL
@@ -124,7 +127,10 @@ struct PromPlannerContext {
     time_index_column: Option<String>,
     field_columns: Vec<String>,
     tag_columns: Vec<String>,
+    /// The matcher for field columns `__field__`.
     field_column_matcher: Option<Vec<Matcher>>,
+    /// The matchers from plain selectors (normal label matchers).
+    selector_matcher: Vec<Matcher>,
     schema_name: Option<String>,
     /// The range in millisecond of range selector. None if there is no range selector.
     range: Option<Millisecond>,
@@ -148,6 +154,7 @@ impl PromPlannerContext {
         self.field_columns = vec![];
         self.tag_columns = vec![];
         self.field_column_matcher = None;
+        self.selector_matcher.clear();
        self.schema_name = None;
         self.range = None;
     }
@@ -830,6 +837,7 @@ impl PromPlanner {
             }
             SPECIAL_VECTOR_FUNCTION => return self.create_vector_plan(args).await,
             SCALAR_FUNCTION => return self.create_scalar_plan(args, session_state).await,
+            SPECIAL_ABSENT_FUNCTION => return self.create_absent_plan(args, session_state).await,
             _ => {}
         }
 
@@ -1001,6 +1009,7 @@
             );
             self.ctx.schema_name = Some(matcher.value.clone());
         } else if matcher.name != METRIC_NAME {
+            self.ctx.selector_matcher.push(matcher.clone());
             let _ = matchers.insert(matcher.clone());
         }
     }
@@ -2449,6 +2458,74 @@
         Ok(scalar_plan)
     }
 
+    /// Create a [SPECIAL_ABSENT_FUNCTION] plan
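+    ///
+    /// The input is aggregated with `first_value` per time index and sorted
+    /// on it, so the `Absent` node sees at most one row per instant, in
+    /// ascending time order. Labels from equality matchers are attached to
+    /// the synthetic output series.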
+    async fn create_absent_plan(
+        &mut self,
+        args: &PromFunctionArgs,
+        session_state: &SessionState,
+    ) -> Result<LogicalPlan> {
+        if args.args.len() != 1 {
+            return FunctionInvalidArgumentSnafu {
+                fn_name: SPECIAL_ABSENT_FUNCTION.to_string(),
+            }
+            .fail();
+        }
+        let input = self.prom_expr_to_plan(&args.args[0], session_state).await?;
+
+        let time_index_expr = self.create_time_index_column_expr()?;
+        let first_field_expr =
+            self.create_field_column_exprs()?
+                .pop()
+                .with_context(|| ValueNotFoundSnafu {
+                    table: self.ctx.table_name.clone().unwrap_or_default(),
+                })?;
+        let first_value_expr = first_value(first_field_expr, None);
+
+        let ordered_aggregated_input = LogicalPlanBuilder::from(input)
+            .aggregate(
+                vec![time_index_expr.clone()],
+                vec![first_value_expr.clone()],
+            )
+            .context(DataFusionPlanningSnafu)?
+            .sort(vec![time_index_expr.sort(true, false)])
+            .context(DataFusionPlanningSnafu)?
+            .build()
+            .context(DataFusionPlanningSnafu)?;
+
+        let fake_labels = self
+            .ctx
+            .selector_matcher
+            .iter()
+            .filter_map(|matcher| match matcher.op {
+                MatchOp::Equal => Some((matcher.name.clone(), matcher.value.clone())),
+                _ => None,
+            })
+            .collect::<Vec<_>>();
+
+        // Create the absent plan
+        let absent_plan = LogicalPlan::Extension(Extension {
+            node: Arc::new(
+                Absent::try_new(
+                    self.ctx.start,
+                    self.ctx.end,
+                    self.ctx.interval,
+                    self.ctx.time_index_column.as_ref().unwrap().clone(),
+                    self.ctx.field_columns[0].clone(),
+                    fake_labels,
+                    ordered_aggregated_input,
+                )
+                .context(DataFusionPlanningSnafu)?,
+            ),
+        });
+
+        Ok(absent_plan)
+    }
+
     /// Try to build a DataFusion Literal Expression from PromQL Expr, return
     /// `None` if the input is not a literal expression.
     fn try_build_literal_expr(expr: &PromExpr) -> Option<Expr> {
diff --git a/tests/cases/standalone/common/promql/absent.result b/tests/cases/standalone/common/promql/absent.result
new file mode 100644
index 0000000000..9c4fb4a2f7
--- /dev/null
+++ b/tests/cases/standalone/common/promql/absent.result
@@ -0,0 +1,122 @@
+create table t (
+    ts timestamp(3) time index,
+    job STRING,
+    instance STRING,
+    val DOUBLE,
+    PRIMARY KEY(job, instance),
+);
+
+Affected Rows: 0
+
+insert into t values
+    (0, 'job1', 'instance1', 1),
+    (0, 'job2', 'instance2', 2),
+    (5000, 'job1', 'instance3',3),
+    (5000, 'job2', 'instance4',4),
+    (10000, 'job1', 'instance5',5),
+    (10000, 'job2', 'instance6',6),
+    (15000, 'job1', 'instance7',7),
+    (15000, 'job2', 'instance8',8);
+
+Affected Rows: 8
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job1"});
+
+++
+++
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job2"});
+
+++
+++
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job3"});
+
++---------------------+-----+------+
+| ts                  | val | job  |
++---------------------+-----+------+
+| 1970-01-01T00:00:00 | 1.0 | job3 |
+| 1970-01-01T00:00:05 | 1.0 | job3 |
+| 1970-01-01T00:00:10 | 1.0 | job3 |
+| 1970-01-01T00:00:15 | 1.0 | job3 |
++---------------------+-----+------+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(nonexistent_table);
+
++---------------------+-------+
+| time                | value |
++---------------------+-------+
+| 1970-01-01T00:00:00 | 1.0   |
+| 1970-01-01T00:00:05 | 1.0   |
+| 1970-01-01T00:00:10 | 1.0   |
+| 1970-01-01T00:00:15 | 1.0   |
++---------------------+-------+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});
+
++---------------------+-----+-----------------+
+| ts                  | val | job             |
++---------------------+-----+-----------------+
+| 1970-01-01T00:00:00 | 1.0 | nonexistent_job |
+| 1970-01-01T00:00:05 | 1.0 | nonexistent_job |
+| 1970-01-01T00:00:10 | 1.0 | nonexistent_job |
+| 1970-01-01T00:00:15 | 1.0 | nonexistent_job |
++---------------------+-----+-----------------+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (1000, 1000, '1s') absent(t{job="job1"});
+
++---------------------+-----+------+
+| ts                  | val | job  |
++---------------------+-----+------+
+| 1970-01-01T00:16:40 | 1.0 | job1 |
++---------------------+-----+------+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});
+
++---------------------+-----+------------------+
+| ts                  | val | job              |
++---------------------+-----+------------------+
+| 1970-01-01T00:00:00 | 1.0 | nonexistent_job2 |
+| 1970-01-01T00:00:05 | 1.0 | nonexistent_job2 |
+| 1970-01-01T00:00:10 | 1.0 | nonexistent_job2 |
+| 1970-01-01T00:00:15 | 1.0 | nonexistent_job2 |
++---------------------+-----+------------------+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});
+
++---------------------+-----+
+| ts                  | val |
++---------------------+-----+
+| 1970-01-01T00:00:00 | 1.0 |
+| 1970-01-01T00:00:05 | 1.0 |
+| 1970-01-01T00:00:10 | 1.0 |
+| 1970-01-01T00:00:15 | 1.0 |
++---------------------+-----+
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') sum(t{job="job2"});
+
++---------------------+------------+
+| ts                  | sum(t.val) |
++---------------------+------------+
+| 1970-01-01T00:00:00 | 2.0        |
+| 1970-01-01T00:00:05 | 6.0        |
+| 1970-01-01T00:00:10 | 12.0       |
+| 1970-01-01T00:00:15 | 20.0       |
++---------------------+------------+
+
+-- ABSENT is not supported for aggregation functions for now
+-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
+-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));
+drop table t;
+
+Affected Rows: 0
+
diff --git a/tests/cases/standalone/common/promql/absent.sql b/tests/cases/standalone/common/promql/absent.sql
new file mode 100644
index 0000000000..e3b5719f26
--- /dev/null
+++ b/tests/cases/standalone/common/promql/absent.sql
@@ -0,0 +1,50 @@
+create table t (
+    ts timestamp(3) time index,
+    job STRING,
+    instance STRING,
+    val DOUBLE,
+    PRIMARY KEY(job, instance),
+);
+
+insert into t values
+    (0, 'job1', 'instance1', 1),
+    (0, 'job2', 'instance2', 2),
+    (5000, 'job1', 'instance3',3),
+    (5000, 'job2', 'instance4',4),
+    (10000, 'job1', 'instance5',5),
+    (10000, 'job2', 'instance6',6),
+    (15000, 'job1', 'instance7',7),
+    (15000, 'job2', 'instance8',8);
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job1"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job2"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="job3"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(nonexistent_table);
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="nonexistent_job"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (1000, 1000, '1s') absent(t{job="job1"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job="nonexistent_job1", job="nonexistent_job2"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') absent(t{job=~"nonexistent_job1", job!="nonexistent_job2"});
+
+-- SQLNESS SORT_RESULT 3 1
+tql eval (0, 15, '5s') sum(t{job="job2"});
+
+-- ABSENT is not supported for aggregation functions for now
+-- tql eval (0, 15, '5s') absent(sum(t{job="job2"}));
+-- tql eval (0, 15, '5s') absent(sum(t{job="job3"}));
+
+drop table t;