feat: serialize/deserialize support for PromQL plans (#1684)

* implement serializer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy and CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* register registry

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* enable promql plan for dist planner

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-06-02 16:14:05 +08:00
committed by GitHub
parent 2615718999
commit 8e69aef973
20 changed files with 295 additions and 18 deletions

5
Cargo.lock generated
View File

@@ -4054,7 +4054,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=44c5adf34938d0650c18a14db2a374bdee471ae7#44c5adf34938d0650c18a14db2a374bdee471ae7"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45"
dependencies = [
"prost",
"serde",
@@ -6686,7 +6686,9 @@ dependencies = [
"datafusion",
"datatypes",
"futures",
"greptime-proto",
"promql-parser",
"prost",
"query",
"session",
"snafu",
@@ -6954,6 +6956,7 @@ dependencies = [
"format_num",
"futures",
"futures-util",
"greptime-proto",
"humantime",
"metrics",
"num",

View File

@@ -71,6 +71,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
parquet = "40.0"
paste = "1.0"
prost = "0.11"

View File

@@ -10,7 +10,7 @@ common-base = { path = "../common/base" }
common-error = { path = "../common/error" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "44c5adf34938d0650c18a14db2a374bdee471ae7" }
greptime-proto.workspace = true
prost.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
tonic.workspace = true

View File

@@ -138,7 +138,7 @@ async fn handle_create(
return Ok(RouteResponse {
header: Some(ResponseHeader::failed(
cluster_id,
Error::not_enough_available_datanodes(partitions.len(), peers.len()),
Error::not_enough_active_datanodes(peers.len() as _),
)),
..Default::default()
});

View File

@@ -15,7 +15,9 @@ common-function-macro = { path = "../common/function-macro" }
datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
greptime-proto.workspace = true
promql-parser = "0.1.1"
prost.workspace = true
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }

View File

@@ -84,6 +84,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to deserialize: {}", source))]
Deserialize {
source: prost::DecodeError,
location: Location,
},
#[snafu(display("Empty range is not expected, location: {}", location))]
EmptyRange { location: Location },
@@ -120,7 +126,8 @@ impl ErrorExt for Error {
| ExpectExpr { .. }
| ExpectRangeSelector { .. }
| ZeroRangeSelector { .. }
| ColumnNotFound { .. } => StatusCode::InvalidArguments,
| ColumnNotFound { .. }
| Deserialize { .. } => StatusCode::InvalidArguments,
UnknownTable { .. }
| DataFusionPlanning { .. }

View File

@@ -84,6 +84,10 @@ impl EmptyMetric {
})
}
pub const fn name() -> &'static str {
"EmptyMetric"
}
pub fn to_execution_plan(
&self,
session_state: &SessionState,
@@ -110,7 +114,7 @@ impl EmptyMetric {
impl UserDefinedLogicalNodeCore for EmptyMetric {
fn name(&self) -> &str {
"EmptyMetric"
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {

View File

@@ -21,10 +21,10 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -34,7 +34,11 @@ use datafusion::physical_plan::{
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;
/// Manipulate the input record batch to make it suitable for Instant Operator.
@@ -56,7 +60,7 @@ pub struct InstantManipulate {
impl UserDefinedLogicalNodeCore for InstantManipulate {
fn name(&self) -> &str {
"InstantManipulate"
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -115,6 +119,10 @@ impl InstantManipulate {
}
}
pub const fn name() -> &'static str {
"InstantManipulate"
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(InstantManipulateExec {
start: self.start,
@@ -127,6 +135,41 @@ impl InstantManipulate {
metric: ExecutionPlanMetricsSet::new(),
})
}
pub fn serialize(&self) -> Vec<u8> {
pb::InstantManipulate {
start: self.start,
end: self.end,
interval: self.interval,
lookback_delta: self.lookback_delta,
time_index: self.time_index_column.clone(),
field_index: self.field_column.clone().unwrap_or_default(),
}
.encode_to_vec()
}
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_instant_manipulate =
pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
let field_column = if pb_instant_manipulate.field_index.is_empty() {
None
} else {
Some(pb_instant_manipulate.field_index)
};
Ok(Self {
start: pb_instant_manipulate.start,
end: pb_instant_manipulate.end,
lookback_delta: pb_instant_manipulate.lookback_delta,
interval: pb_instant_manipulate.interval,
time_index_column: pb_instant_manipulate.time_index,
field_column,
input: placeholder_plan,
})
}
}
#[derive(Debug)]

View File

@@ -19,10 +19,10 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -33,7 +33,11 @@ use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::error::Result as ArrowResult;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;
/// Normalize the input record batch. Notice that for simplicity, this method assumes
@@ -54,7 +58,7 @@ pub struct SeriesNormalize {
impl UserDefinedLogicalNodeCore for SeriesNormalize {
fn name(&self) -> &str {
"SeriesNormalize"
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -104,6 +108,10 @@ impl SeriesNormalize {
}
}
pub const fn name() -> &'static str {
"SeriesNormalize"
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(SeriesNormalizeExec {
offset: self.offset,
@@ -113,6 +121,29 @@ impl SeriesNormalize {
metric: ExecutionPlanMetricsSet::new(),
})
}
pub fn serialize(&self) -> Vec<u8> {
pb::SeriesNormalize {
offset: self.offset,
time_index: self.time_index_column_name.clone(),
filter_nan: self.need_filter_out_nan,
}
.encode_to_vec()
}
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Ok(Self::new(
pb_normalize.offset,
pb_normalize.time_index,
pb_normalize.filter_nan,
placeholder_plan,
))
}
}
#[derive(Debug)]

View File

@@ -26,7 +26,7 @@ use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::{DFField, DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -35,7 +35,11 @@ use datafusion::physical_plan::{
};
use datafusion::sql::TableReference;
use futures::{Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DataFusionPlanningSnafu, DeserializeSnafu, Result};
use crate::extension_plan::Millisecond;
use crate::range_array::RangeArray;
@@ -85,6 +89,10 @@ impl RangeManipulate {
})
}
pub const fn name() -> &'static str {
"RangeManipulate"
}
pub fn build_timestamp_range_name(time_index: &str) -> String {
format!("{time_index}_range")
}
@@ -145,11 +153,41 @@ impl RangeManipulate {
metric: ExecutionPlanMetricsSet::new(),
})
}
pub fn serialize(&self) -> Vec<u8> {
pb::RangeManipulate {
start: self.start,
end: self.end,
interval: self.interval,
range: self.range,
time_index: self.time_index.clone(),
tag_columns: self.field_columns.clone(),
}
.encode_to_vec()
}
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Self::new(
pb_range_manipulate.start,
pb_range_manipulate.end,
pb_range_manipulate.interval,
pb_range_manipulate.range,
pb_range_manipulate.time_index,
pb_range_manipulate.tag_columns,
placeholder_plan,
)
.context(DataFusionPlanningSnafu)
}
}
impl UserDefinedLogicalNodeCore for RangeManipulate {
fn name(&self) -> &str {
"RangeManipulate"
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {

View File

@@ -20,10 +20,10 @@ use std::task::{Context, Poll};
use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::DFSchemaRef;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
@@ -32,6 +32,11 @@ use datafusion::physical_plan::{
};
use datatypes::arrow::compute;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;
use crate::error::{DeserializeSnafu, Result};
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SeriesDivide {
@@ -41,7 +46,7 @@ pub struct SeriesDivide {
impl UserDefinedLogicalNodeCore for SeriesDivide {
fn name(&self) -> &str {
"SeriesDivide"
Self::name()
}
fn inputs(&self) -> Vec<&LogicalPlan> {
@@ -75,6 +80,10 @@ impl SeriesDivide {
Self { tag_columns, input }
}
pub const fn name() -> &'static str {
"SeriesDivide"
}
pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
Arc::new(SeriesDivideExec {
tag_columns: self.tag_columns.clone(),
@@ -82,6 +91,25 @@ impl SeriesDivide {
metric: ExecutionPlanMetricsSet::new(),
})
}
pub fn serialize(&self) -> Vec<u8> {
pb::SeriesDivide {
tag_columns: self.tag_columns.clone(),
}
.encode_to_vec()
}
pub fn deserialize(bytes: &[u8]) -> Result<Self> {
let pb_series_divide = pb::SeriesDivide::decode(bytes).context(DeserializeSnafu)?;
let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(DFSchema::empty()),
});
Ok(Self {
tag_columns: pb_series_divide.tag_columns,
input: placeholder_plan,
})
}
}
#[derive(Debug)]

View File

@@ -32,6 +32,7 @@ datafusion-sql.workspace = true
datatypes = { path = "../datatypes" }
futures = "0.3"
futures-util.workspace = true
greptime-proto.workspace = true
humantime = "2.1"
metrics.workspace = true
object-store = { path = "../object-store" }

View File

@@ -15,6 +15,9 @@
use std::sync::Arc;
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
#[allow(dead_code)]
pub enum Commutativity {
@@ -69,8 +72,18 @@ impl Categorizer {
}
}
pub fn check_extension_plan(_plan: &dyn UserDefinedLogicalNode) -> Commutativity {
todo!("enumerate all the extension plans here")
pub fn check_extension_plan(plan: &dyn UserDefinedLogicalNode) -> Commutativity {
match plan.name() {
name if name == EmptyMetric::name()
|| name == InstantManipulate::name()
|| name == SeriesNormalize::name()
|| name == RangeManipulate::name()
|| name == SeriesDivide::name() =>
{
Commutativity::Commutative
}
_ => Commutativity::Unsupported,
}
}
}

View File

@@ -0,0 +1,103 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use datafusion::error::Result;
use datafusion::execution::registry::SerializerRegistry;
use datafusion_common::DataFusionError;
use datafusion_expr::UserDefinedLogicalNode;
use promql::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,
};
pub struct ExtensionSerializer;
impl SerializerRegistry for ExtensionSerializer {
/// Serialize this node to a byte array. This serialization should not include
/// input plans.
fn serialize_logical_plan(&self, node: &dyn UserDefinedLogicalNode) -> Result<Vec<u8>> {
match node.name() {
name if name == InstantManipulate::name() => {
let instant_manipulate = node
.as_any()
.downcast_ref::<InstantManipulate>()
.expect("Failed to downcast to InstantManipulate");
Ok(instant_manipulate.serialize())
}
name if name == SeriesNormalize::name() => {
let series_normalize = node
.as_any()
.downcast_ref::<SeriesNormalize>()
.expect("Failed to downcast to SeriesNormalize");
Ok(series_normalize.serialize())
}
name if name == RangeManipulate::name() => {
let range_manipulate = node
.as_any()
.downcast_ref::<RangeManipulate>()
.expect("Failed to downcast to RangeManipulate");
Ok(range_manipulate.serialize())
}
name if name == SeriesDivide::name() => {
let series_divide = node
.as_any()
.downcast_ref::<SeriesDivide>()
.expect("Failed to downcast to SeriesDivide");
Ok(series_divide.serialize())
}
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be serialized".to_string(),
)),
other => Err(DataFusionError::NotImplemented(format!(
"Serizlize logical plan for {}",
other
))),
}
}
/// Deserialize user defined logical plan node ([UserDefinedLogicalNode]) from
/// bytes.
fn deserialize_logical_plan(
&self,
name: &str,
bytes: &[u8],
) -> Result<Arc<dyn UserDefinedLogicalNode>> {
match name {
name if name == InstantManipulate::name() => {
let instant_manipulate = InstantManipulate::deserialize(bytes)?;
Ok(Arc::new(instant_manipulate))
}
name if name == SeriesNormalize::name() => {
let series_normalize = SeriesNormalize::deserialize(bytes)?;
Ok(Arc::new(series_normalize))
}
name if name == RangeManipulate::name() => {
let range_manipulate = RangeManipulate::deserialize(bytes)?;
Ok(Arc::new(range_manipulate))
}
name if name == SeriesDivide::name() => {
let series_divide = SeriesDivide::deserialize(bytes)?;
Ok(Arc::new(series_divide))
}
name if name == EmptyMetric::name() => Err(DataFusionError::Substrait(
"EmptyMetric should not be deserialized".to_string(),
)),
other => Err(DataFusionError::NotImplemented(format!(
"Deserialize logical plan for {}",
other
))),
}
}
}

View File

@@ -18,6 +18,7 @@ pub mod datafusion;
pub mod dist_plan;
pub mod error;
pub mod executor;
pub mod extension_serializer;
pub mod logical_optimizer;
mod metrics;
mod optimizer;

View File

@@ -36,6 +36,7 @@ use partition::manager::PartitionRuleManager;
use promql::extension_plan::PromExtensionPlanner;
use crate::dist_plan::{DistExtensionPlanner, DistPlannerAnalyzer};
use crate::extension_serializer::ExtensionSerializer;
use crate::optimizer::order_hint::OrderHintRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::query_engine::options::QueryOptions;
@@ -83,6 +84,7 @@ impl QueryEngineState {
runtime_env,
Arc::new(MemoryCatalogList::default()), // pass a dummy catalog list
)
.with_serializer_registry(Arc::new(ExtensionSerializer))
.with_analyzer_rules(analyzer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new(
partition_manager,