chore(deps): bump datafusion to the latest commit (#1967)

* bump deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix compile except pyo3 backend

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix promql-parser metric name matcher

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix pyarrow convert

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix pyo3 compiling

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deadcode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update stream adapter display format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix physical optimizer rule

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2023-08-01 10:10:49 +08:00
Committed by: GitHub
Parent: 5bd80a74ab
Commit: 44f3ed2f74
59 changed files with 1140 additions and 1047 deletions

Cargo.lock (generated): 1458 changes. Diff suppressed because it is too large.


@@ -56,23 +56,22 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
arrow = { version = "40.0" }
arrow-array = "40.0"
arrow-flight = "40.0"
arrow-schema = { version = "40.0", features = ["serde"] }
arrow = { version = "43.0" }
etcd-client = "0.11"
arrow-array = "43.0"
arrow-flight = "43.0"
arrow-schema = { version = "43.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
# TODO(ruihang): use arrow-datafusion when it contains https://github.com/apache/arrow-datafusion/pull/6032
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
datafusion = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-common = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-optimizer = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-physical-expr = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "2ceb7f927c40787773fdc466d6a4b79f3a6c0001" }
derive_builder = "0.12"
etcd-client = "0.11"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "356694a72f12ad9e15008d4245a0b4fe48f982ad" }
@@ -80,7 +79,7 @@ itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"
opentelemetry-proto = { version = "0.2", features = ["gen-tonic", "metrics"] }
parquet = "40.0"
parquet = "43.0"
paste = "1.0"
prost = "0.11"
rand = "0.8"
@@ -88,7 +87,7 @@ regex = "1.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.34"
sqlparser = "0.35"
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }


@@ -22,8 +22,8 @@ use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::scalars::ScalarVectorBuilder;


@@ -20,8 +20,8 @@ use common_error::ext::BoxedError;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::datasource::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::prelude::{ConcreteDataType, ScalarVectorBuilder, VectorRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};


@@ -30,8 +30,8 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::{ArrowError, Schema as ArrowSchema};
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::file_format::FileOpenFuture;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use object_store::ObjectStore;


@@ -23,8 +23,8 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use common_runtime;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::physical_plan::SendableRecordBatchStream;
use derive_builder::Builder;
use object_store::ObjectStore;


@@ -26,8 +26,8 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use common_runtime;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::physical_plan::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::ResultExt;


@@ -20,8 +20,8 @@ use arrow::compute::cast;
use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::physical_plan::RecordBatchStream;
use futures::{Stream, StreamExt, TryStreamExt};
use object_store::ObjectStore;


@@ -18,13 +18,13 @@ use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use async_trait::async_trait;
use datafusion::datasource::physical_plan::{FileMeta, ParquetFileReaderFactory};
use datafusion::error::Result as DatafusionResult;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::arrow::{parquet_to_arrow_schema, ArrowWriter};
use datafusion::parquet::errors::{ParquetError, Result as ParquetResult};
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::parquet::format::FileMetaData;
use datafusion::physical_plan::file_format::{FileMeta, ParquetFileReaderFactory};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use object_store::{ObjectStore, Reader};


@@ -18,8 +18,8 @@ use std::sync::Arc;
use std::vec;
use datafusion::assert_batches_eq;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;


@@ -19,7 +19,7 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::physical_plan::file_format::{FileScanConfig, FileStream};
use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::services::Fs;
use object_store::ObjectStore;
@@ -86,7 +86,7 @@ pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str)
projection: None,
limit,
table_partition_cols: vec![],
output_ordering: None,
output_ordering: vec![],
infinite_source: false,
}
}


@@ -17,7 +17,7 @@ use std::sync::Arc;
use api::v1::{AffectedRows, FlightMetadata};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, IpcMessage, SchemaAsIpc};
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
use common_recordbatch::{RecordBatch, RecordBatches};
use datatypes::arrow;
@@ -25,6 +25,7 @@ use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};
@@ -86,12 +87,12 @@ impl FlightEncoder {
affected_rows: Some(AffectedRows { value: rows as _ }),
}
.encode_to_vec();
FlightData::new(
None,
IpcMessage(build_none_flight_msg().into()),
metadata,
vec![],
)
FlightData {
flight_descriptor: None,
data_header: build_none_flight_msg().into(),
app_metadata: metadata.into(),
data_body: ProstBytes::default(),
}
}
}
}


@@ -20,8 +20,8 @@ use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
use datafusion_expr::{
AccumulatorFunctionImplementation as DfAccumulatorFunctionImplementation,
AggregateUDF as DfAggregateUdf, StateTypeFunction as DfStateTypeFunction,
AccumulatorFactoryFunction, AggregateUDF as DfAggregateUdf,
StateTypeFunction as DfStateTypeFunction,
};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::prelude::*;
@@ -103,11 +103,11 @@ impl From<AggregateFunction> for DfAggregateUdf {
fn to_df_accumulator_func(
accumulator: AccumulatorFunctionImpl,
creator: AggregateFunctionCreatorRef,
) -> DfAccumulatorFunctionImplementation {
) -> AccumulatorFactoryFunction {
Arc::new(move |_| {
let accumulator = accumulator()?;
let creator = creator.clone();
Ok(Box::new(DfAccumulatorAdaptor::new(accumulator, creator)))
Ok(Box::new(DfAccumulatorAdaptor::new(accumulator, creator)) as _)
})
}


@@ -13,7 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::sync::Arc;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
@@ -24,7 +24,7 @@ pub use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
@@ -218,6 +218,12 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
}
}
impl DisplayAs for DfPhysicalPlanAdapter {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}
#[cfg(test)]
mod test {
use async_trait::async_trait;


@@ -26,7 +26,7 @@ table = { path = "../../table" }
[dependencies.substrait_proto]
package = "substrait"
version = "0.10"
version = "0.12"
[dev-dependencies]
datatypes = { path = "../../datatypes" }


@@ -16,7 +16,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use datafusion::catalog::catalog::CatalogList;
use datafusion::catalog::CatalogList;
use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::{SessionConfig, SessionContext};


@@ -23,7 +23,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion::catalog::catalog::CatalogList;
use datafusion::catalog::CatalogList;
pub use crate::df_substrait::DFLogicalSubstraitConvertor;


@@ -23,10 +23,8 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_grpc_expr::insert::to_table_insert_request;
use common_query::Output;
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider};
use datafusion::datasource::TableProvider;
use futures::future;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};


@@ -635,7 +635,7 @@ impl TryFrom<ScalarValue> for Value {
ScalarValue::Binary(b)
| ScalarValue::LargeBinary(b)
| ScalarValue::FixedSizeBinary(_, b) => Value::from(b.map(Bytes::from)),
ScalarValue::List(vs, field) => {
ScalarValue::List(vs, field) | ScalarValue::Fixedsizelist(vs, field, _) => {
let items = if let Some(vs) = vs {
let vs = vs
.into_iter()
@@ -687,6 +687,10 @@ impl TryFrom<ScalarValue> for Value {
.map(|x| Value::Interval(Interval::from_i128(x)))
.unwrap_or(Value::Null),
ScalarValue::Decimal128(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
return error::UnsupportedArrowTypeSnafu {


@@ -27,7 +27,6 @@ mod tests {
use arrow::array::{Array, PrimitiveArray};
use arrow_array::ArrayRef;
use common_time::DateTime;
use datafusion_common::from_slice::FromSlice;
use super::*;
use crate::data_type::DataType;
@@ -39,7 +38,7 @@ mod tests {
#[test]
fn test_datetime_vector() {
std::env::set_var("TZ", "Asia/Shanghai");
let v = DateTimeVector::new(PrimitiveArray::from_slice([1, 2, 3]));
let v = DateTimeVector::new(PrimitiveArray::from(vec![1, 2, 3]));
assert_eq!(ConcreteDataType::datetime_datatype(), v.data_type());
assert_eq!(3, v.len());
assert_eq!("DateTimeVector", v.vector_type_name());
@@ -57,7 +56,7 @@ mod tests {
assert_eq!(Some(DateTime::new(2)), iter.next().unwrap());
assert_eq!(Some(DateTime::new(3)), iter.next().unwrap());
assert!(!v.is_null(0));
assert_eq!(64, v.memory_size());
assert_eq!(24, v.memory_size());
if let Value::DateTime(d) = v.get(0) {
assert_eq!(1, d.val());


@@ -159,7 +159,7 @@ impl Helper {
| ScalarValue::FixedSizeBinary(_, v) => {
ConstantVector::new(Arc::new(BinaryVector::from(vec![v])), length)
}
ScalarValue::List(v, field) => {
ScalarValue::List(v, field) | ScalarValue::Fixedsizelist(v, field, _) => {
let item_type = ConcreteDataType::try_from(field.data_type())?;
let mut builder = ListVectorBuilder::with_type_capacity(item_type.clone(), 1);
if let Some(values) = v {
@@ -219,6 +219,10 @@ impl Helper {
ConstantVector::new(Arc::new(IntervalMonthDayNanoVector::from(vec![v])), length)
}
ScalarValue::Decimal128(_, _, _)
| ScalarValue::DurationSecond(_)
| ScalarValue::DurationMillisecond(_)
| ScalarValue::DurationMicrosecond(_)
| ScalarValue::DurationNanosecond(_)
| ScalarValue::Struct(_, _)
| ScalarValue::Dictionary(_, _) => {
return error::ConversionSnafu {


@@ -84,24 +84,28 @@ impl<T: LogicalPrimitiveType> PrimitiveVector<T> {
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.clone()
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Millisecond => array
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.clone()
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Microsecond => array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.clone()
.with_timezone_opt(None::<String>)
.to_data(),
arrow_schema::TimeUnit::Nanosecond => array
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.clone()
.with_timezone_opt(None::<String>)
.to_data(),
},


@@ -312,7 +312,7 @@ mod tests {
assert!(!v.is_const());
assert!(v.validity().is_all_valid());
assert!(!v.only_null());
assert_eq!(128, v.memory_size());
assert_eq!(1088, v.memory_size());
for (i, s) in strs.iter().enumerate() {
assert_eq!(Value::from(*s), v.get(i));


@@ -26,10 +26,10 @@ use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::prelude::SessionContext;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
@@ -113,7 +113,7 @@ fn build_record_batch_stream<T: FileOpener + Send + 'static>(
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: None,
output_ordering: vec![],
infinite_source: false,
},
0, // partition: hard-code
@@ -185,7 +185,7 @@ fn new_parquet_stream_with_exec_plan(
projection: projection.cloned(),
limit: *limit,
table_partition_cols: vec![],
output_ordering: None,
output_ordering: vec![],
infinite_source: false,
};


@@ -31,8 +31,8 @@ use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter;
use common_recordbatch::DfSendableRecordBatchStream;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream};
use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::compute::can_cast_types;
use datatypes::arrow::datatypes::{Schema, SchemaRef};
@@ -143,7 +143,7 @@ impl StatementExecutor {
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
output_ordering: vec![],
infinite_source: false,
},
0,


@@ -113,6 +113,12 @@ pub enum Error {
#[snafu(display("Cannot find column {col}, location: {}", location))]
ColumnNotFound { col: String, location: Location },
#[snafu(display("Found multiple metric matchers in selector, location: {}", location))]
MultipleMetricMatchers { location: Location },
#[snafu(display("Expect a metric matcher, but not found, location: {}", location))]
NoMetricMatcher { location: Location },
}
impl ErrorExt for Error {
@@ -138,6 +144,8 @@ impl ErrorExt for Error {
TableNameNotFound { .. } => StatusCode::TableNotFound,
MultipleMetricMatchers { .. } | NoMetricMatcher { .. } => StatusCode::InvalidSyntax,
Catalog { source, .. } => source.status_code(),
}
}


@@ -27,9 +27,10 @@ use datafusion::logical_expr::{ExprSchemable, LogicalPlan, UserDefinedLogicalNod
use datafusion::physical_expr::{PhysicalExprRef, PhysicalSortExpr};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, PhysicalPlanner, RecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use datafusion::physical_planner::PhysicalPlanner;
use datafusion::prelude::{col, lit, Expr};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
@@ -207,16 +208,6 @@ impl ExecutionPlan for EmptyMetricExec {
}))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => write!(
f,
"EmptyMetric: range=[{}..{}], interval=[{}]",
self.start, self.end, self.interval,
),
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
@@ -234,6 +225,18 @@ impl ExecutionPlan for EmptyMetricExec {
}
}
impl DisplayAs for EmptyMetricExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => write!(
f,
"EmptyMetric: range=[{}..{}], interval=[{}]",
self.start, self.end, self.interval,
),
}
}
}
pub struct EmptyMetricStream {
start: Millisecond,
end: Millisecond,
@@ -319,7 +322,7 @@ pub fn build_special_time_expr(time_index_column_name: &str) -> Expr {
#[cfg(test)]
mod test {
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_planner::DefaultPhysicalPlanner;
use datafusion::prelude::SessionContext;
use super::*;


@@ -28,8 +28,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
use datatypes::arrow::error::Result as ArrowResult;
@@ -258,18 +258,6 @@ impl ExecutionPlan for InstantManipulateExec {
}))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
self.start,self.end, self.lookback_delta, self.interval, self.time_index_column
)
}
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
@@ -294,6 +282,20 @@ impl ExecutionPlan for InstantManipulateExec {
}
}
impl DisplayAs for InstantManipulateExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
self.start,self.end, self.lookback_delta, self.interval, self.time_index_column
)
}
}
}
}
pub struct InstantManipulateStream {
start: Millisecond,
end: Millisecond,
@@ -442,7 +444,6 @@ mod test {
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
@@ -458,13 +459,13 @@ mod test {
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
let field_column = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],
@@ -748,16 +749,11 @@ mod test {
Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
Field::new("value", DataType::Float64, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
])) as _;
let field_column = Arc::new(Float64Array::from_slice([
0.0,
f64::NAN,
6.0,
f64::NAN,
12.0,
])) as _;
let field_column =
Arc::new(Float64Array::from(vec![0.0, f64::NAN, 6.0, f64::NAN, 12.0])) as _;
let data =
RecordBatch::try_new(schema.clone(), vec![timestamp_column, field_column]).unwrap();


@@ -26,7 +26,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream,
};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
@@ -214,9 +215,19 @@ impl ExecutionPlan for SeriesNormalizeExec {
}))
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn statistics(&self) -> Statistics {
self.input.statistics()
}
}
impl DisplayAs for SeriesNormalizeExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
@@ -225,14 +236,6 @@ impl ExecutionPlan for SeriesNormalizeExec {
}
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
fn statistics(&self) -> Statistics {
self.input.statistics()
}
}
pub struct SeriesNormalizeStream {
@@ -324,7 +327,6 @@ mod test {
use datafusion::arrow::datatypes::{
ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
};
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
@@ -340,12 +342,11 @@ mod test {
Field::new("value", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
60_000, 120_000, 0, 30_000, 90_000,
])) as _;
let field_column = Arc::new(Float64Array::from_slice([0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
let path_column =
Arc::new(StringArray::from_slice(["foo", "foo", "foo", "foo", "foo"])) as _;
let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![timestamp_column, field_column, path_column],


@@ -18,8 +18,8 @@ use async_trait::async_trait;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use crate::extension_plan::{
EmptyMetric, InstantManipulate, RangeManipulate, SeriesDivide, SeriesNormalize,


@@ -30,8 +30,8 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
Statistics,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use futures::{Stream, StreamExt};
@@ -321,18 +321,6 @@ impl ExecutionPlan for RangeManipulateExec {
}))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(
f,
"PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
self.start, self.end, self.interval, self.range, self.time_index_column
)
}
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
@@ -357,6 +345,20 @@ impl ExecutionPlan for RangeManipulateExec {
}
}
impl DisplayAs for RangeManipulateExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
self.start, self.end, self.interval, self.range, self.time_index_column
)
}
}
}
}
pub struct RangeManipulateStream {
start: Millisecond,
end: Millisecond,
@@ -490,7 +492,6 @@ mod test {
ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
};
use datafusion::common::ToDFSchema;
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
use datatypes::arrow::array::TimestampMillisecondArray;
@@ -506,13 +507,13 @@ mod test {
Field::new("value_2", DataType::Float64, true),
Field::new("path", DataType::Utf8, true),
]));
let timestamp_column = Arc::new(TimestampMillisecondArray::from_slice([
let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
0, 30_000, 60_000, 90_000, 120_000, // every 30s
180_000, 240_000, // every 60s
241_000, 271_000, 291_000, // others
])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from_slice([1.0; 10])) as _;
let path_column = Arc::new(StringArray::from_slice(["foo"; 10])) as _;
let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
let data = RecordBatch::try_new(
schema.clone(),
vec![


@@ -27,7 +27,7 @@ use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogi
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datatypes::arrow::compute;
@@ -190,14 +190,6 @@ impl ExecutionPlan for SeriesDivideExec {
}))
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default => {
write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
}
}
}
fn metrics(&self) -> Option<MetricsSet> {
Some(self.metric.clone_inner())
}
@@ -213,6 +205,16 @@ impl ExecutionPlan for SeriesDivideExec {
}
}
impl DisplayAs for SeriesDivideExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(f, "PromSeriesDivideExec: tags={:?}", self.tag_columns)
}
}
}
}
/// Assume the input stream is ordered on the tag columns.
pub struct SeriesDivideStream {
tag_indices: Vec<usize>,
@@ -312,7 +314,6 @@ impl SeriesDivideStream {
#[cfg(test)]
mod test {
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::prelude::SessionContext;
@@ -324,20 +325,20 @@ mod test {
Field::new("path", DataType::Utf8, true),
]));
let path_column_1 = Arc::new(StringArray::from_slice([
let path_column_1 = Arc::new(StringArray::from(vec![
"foo", "foo", "foo", "bar", "bar", "bar", "bar", "bar", "bar", "bla", "bla", "bla",
])) as _;
let host_column_1 = Arc::new(StringArray::from_slice([
let host_column_1 = Arc::new(StringArray::from(vec![
"000", "000", "001", "002", "002", "002", "002", "002", "003", "005", "005", "005",
])) as _;
let path_column_2 = Arc::new(StringArray::from_slice(["bla", "bla", "bla"])) as _;
let host_column_2 = Arc::new(StringArray::from_slice(["005", "005", "005"])) as _;
let path_column_2 = Arc::new(StringArray::from(vec!["bla", "bla", "bla"])) as _;
let host_column_2 = Arc::new(StringArray::from(vec!["005", "005", "005"])) as _;
let path_column_3 = Arc::new(StringArray::from_slice([
let path_column_3 = Arc::new(StringArray::from(vec![
"bla", "🥺", "🥺", "🥺", "🥺", "🥺", "🫠", "🫠",
])) as _;
let host_column_3 = Arc::new(StringArray::from_slice([
let host_column_3 = Arc::new(StringArray::from(vec![
"005", "001", "001", "001", "001", "001", "001", "001",
])) as _;


@@ -21,7 +21,7 @@ use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use datafusion::common::{DFSchemaRef, OwnedTableReference, Result as DfResult};
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::expr::{AggregateFunction, ScalarFunction, ScalarUDF};
use datafusion::logical_expr::expr::{AggregateFunction, Alias, ScalarFunction, ScalarUDF};
use datafusion::logical_expr::expr_rewriter::normalize_cols;
use datafusion::logical_expr::{
AggregateFunction as AggregateFunctionEnum, BinaryExpr, BuiltinScalarFunction, Cast, Extension,
@@ -44,9 +44,10 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::error::{
CatalogSnafu, ColumnNotFoundSnafu, DataFusionPlanningSnafu, ExpectExprSnafu,
ExpectRangeSelectorSnafu, MultipleVectorSnafu, Result, TableNameNotFoundSnafu,
TimeIndexNotFoundSnafu, UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu,
UnsupportedExprSnafu, ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
ExpectRangeSelectorSnafu, MultipleMetricMatchersSnafu, MultipleVectorSnafu,
NoMetricMatcherSnafu, Result, TableNameNotFoundSnafu, TimeIndexNotFoundSnafu,
UnexpectedPlanExprSnafu, UnexpectedTokenSnafu, UnknownTableSnafu, UnsupportedExprSnafu,
ValueNotFoundSnafu, ZeroRangeSelectorSnafu,
};
use crate::extension_plan::{
build_special_time_expr, EmptyMetric, InstantManipulate, Millisecond, RangeManipulate,
@@ -338,12 +339,12 @@ impl PromPlanner {
})
}
PromExpr::VectorSelector(VectorSelector {
name: _,
name,
offset,
matchers,
at: _,
}) => {
let matchers = self.preprocess_label_matchers(matchers)?;
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
let normalize = self
.selector_to_series_normalize_plan(offset, matchers, false)
@@ -364,14 +365,14 @@ impl PromPlanner {
node: Arc::new(manipulate),
})
}
PromExpr::MatrixSelector(MatrixSelector {
vector_selector,
range,
}) => {
PromExpr::MatrixSelector(MatrixSelector { vs, range }) => {
let VectorSelector {
offset, matchers, ..
} = vector_selector;
let matchers = self.preprocess_label_matchers(matchers)?;
name,
offset,
matchers,
..
} = vs;
let matchers = self.preprocess_label_matchers(matchers, name)?;
self.setup_context().await?;
ensure!(!range.is_zero(), ZeroRangeSelectorSnafu);
@@ -471,13 +472,35 @@ impl PromPlanner {
/// Extract metric name from `__name__` matcher and set it into [PromPlannerContext].
/// Returns a new [Matchers] that doesn't contains metric name matcher.
fn preprocess_label_matchers(&mut self, label_matchers: &Matchers) -> Result<Matchers> {
///
/// Name rule:
/// - if `name` is some, then the matchers MUST NOT contains `__name__` matcher.
/// - if `name` is none, then the matchers MAY contains NONE OR MULTIPLE `__name__` matchers.
fn preprocess_label_matchers(
&mut self,
label_matchers: &Matchers,
name: &Option<String>,
) -> Result<Matchers> {
let metric_name;
if let Some(name) = name.clone() {
metric_name = Some(name);
ensure!(
label_matchers.find_matcher(METRIC_NAME).is_none(),
MultipleMetricMatchersSnafu
);
} else {
metric_name = Some(
label_matchers
.find_matcher(METRIC_NAME)
.context(NoMetricMatcherSnafu)?,
);
}
self.ctx.table_name = metric_name;
let mut matchers = HashSet::new();
for matcher in &label_matchers.matchers {
// TODO(ruihang): support other metric match ops
if matcher.name == METRIC_NAME && matches!(matcher.op, MatchOp::Equal) {
self.ctx.table_name = Some(matcher.value.clone());
} else if matcher.name == FIELD_COLUMN_MATCHER {
if matcher.name == FIELD_COLUMN_MATCHER {
self.ctx
.field_column_matcher
.get_or_insert_default()
@@ -486,6 +509,7 @@ impl PromPlanner {
let _ = matchers.insert(matcher.clone());
}
}
let matchers = matchers.into_iter().collect();
Ok(Matchers { matchers })
}
@@ -640,8 +664,8 @@ impl PromPlanner {
) -> Result<Vec<DfExpr>> {
match modifier {
LabelModifier::Include(labels) => {
let mut exprs = Vec::with_capacity(labels.len());
for label in labels {
let mut exprs = Vec::with_capacity(labels.labels.len());
for label in &labels.labels {
// nonexistence label will be ignored
if let Ok(field) = input_schema.field_with_unqualified_name(label) {
exprs.push(DfExpr::Column(Column::from(field.name())));
@@ -649,7 +673,7 @@ impl PromPlanner {
}
// change the tag columns in context
self.ctx.tag_columns = labels.iter().cloned().collect();
self.ctx.tag_columns = labels.labels.clone();
// add timestamp column
exprs.push(self.create_time_index_column_expr()?);
@@ -665,7 +689,7 @@ impl PromPlanner {
// remove "without"-ed fields
// nonexistence label will be ignored
for label in labels {
for label in &labels.labels {
let _ = all_fields.remove(label);
}
@@ -1250,7 +1274,7 @@ impl PromPlanner {
let field_columns_iter = result_field_columns
.into_iter()
.zip(self.ctx.field_columns.iter())
.map(|(expr, name)| Ok(DfExpr::Alias(Box::new(expr), name.to_string())));
.map(|(expr, name)| Ok(DfExpr::Alias(Alias::new(expr, name.to_string()))));
// chain non-value columns (unchanged) and value columns (applied computation then alias)
let project_fields = non_field_columns_iter
@@ -1315,6 +1339,7 @@ mod test {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use promql_parser::label::Labels;
use promql_parser::parser;
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
@@ -1615,7 +1640,7 @@ mod test {
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt.clone())
.await
.unwrap();
let expected_no_without = String::from(
let expected_no_without = String::from(
"Sort: some_metric.tag_1 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_1, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_1:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\
@@ -1632,15 +1657,15 @@ mod test {
// test group without
if let PromExpr::Aggregate(AggregateExpr { modifier, .. }) = &mut eval_stmt.expr {
*modifier = Some(LabelModifier::Exclude(
vec![String::from("tag_1")].into_iter().collect(),
));
*modifier = Some(LabelModifier::Exclude(Labels {
labels: vec![String::from("tag_1")].into_iter().collect(),
}));
}
let table_provider = build_test_table_provider("some_metric".to_string(), 2, 2).await;
let plan = PromPlanner::stmt_to_plan(table_provider, eval_stmt)
.await
.unwrap();
let expected_without = String::from(
let expected_without = String::from(
"Sort: some_metric.tag_0 ASC NULLS LAST, some_metric.timestamp ASC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n Aggregate: groupBy=[[some_metric.tag_0, some_metric.timestamp]], aggr=[[TEMPLATE(some_metric.field_0), TEMPLATE(some_metric.field_1)]] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), TEMPLATE(some_metric.field_0):Float64;N, TEMPLATE(some_metric.field_1):Float64;N]\
\n PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N, field_1:Float64;N]\


@@ -27,7 +27,7 @@ use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::ContextProvider;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{DataFusionError, OwnedTableReference};
use datafusion_expr::TableSource;
use datafusion_expr::{TableSource, WindowUDF};
use datafusion_physical_expr::var_provider::{is_system_variables, VarType};
use datafusion_sql::parser::Statement as DfStatement;
use session::context::QueryContextRef;
@@ -115,6 +115,10 @@ impl ContextProvider for DfContextProviderAdapter {
})
}
fn get_window_meta(&self, _name: &str) -> Option<Arc<WindowUDF>> {
None
}
fn get_variable_type(&self, variable_names: &[String]) -> Option<DataType> {
if variable_names.is_empty() {
return None;


@@ -118,7 +118,6 @@ impl Categorizer {
| Expr::Exists(_) => Commutativity::Commutative,
Expr::Like(_)
| Expr::ILike(_)
| Expr::SimilarTo(_)
| Expr::IsUnknown(_)
| Expr::IsNotUnknown(_)
@@ -136,7 +135,7 @@ impl Categorizer {
| Expr::ScalarSubquery(_)
| Expr::Wildcard => Commutativity::Unimplemented,
Expr::Alias(_, _)
Expr::Alias(_)
| Expr::QualifiedWildcard { .. }
| Expr::GroupingSet(_)
| Expr::Placeholder(_)


@@ -30,7 +30,7 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
};
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::PhysicalSortExpr;
@@ -243,7 +243,9 @@ impl ExecutionPlan for MergeScanExec {
fn statistics(&self) -> Statistics {
Statistics::default()
}
}
impl DisplayAs for MergeScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MergeScanExec: peers=[")?;
for peer in self.peers.iter() {


@@ -26,8 +26,8 @@ use common_meta::table_name::TableName;
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::planner::ExtensionPlanner;
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};


@@ -337,11 +337,7 @@ mod test {
expr: VectorSelector(VectorSelector { \
name: Some(\"http_request\"), \
matchers: Matchers { \
matchers: {Matcher { \
op: Equal, \
name: \"__name__\", \
value: \"http_request\" \
}} }, \
matchers: [] }, \
offset: None, at: None }), \
start: SystemTime { tv_sec: 1644772440, tv_nsec: 0 }, \
end: SystemTime { tv_sec: 1676308440, tv_nsec: 0 }, \


@@ -47,6 +47,7 @@ impl QueryEngineContext {
state.config().clone(),
state.scalar_functions().clone(),
state.aggregate_functions().clone(),
state.window_functions().clone(),
state.runtime_env().clone(),
))
}


@@ -23,7 +23,7 @@ use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::SessionContext;
use common_query::prelude::ScalarUdf;
use datafusion::catalog::catalog::MemoryCatalogList;
use datafusion::catalog::MemoryCatalogList;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result as DfResult;
use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
@@ -32,8 +32,8 @@ use datafusion::physical_optimizer::dist_enforcement::EnforceDistribution;
use datafusion::physical_optimizer::repartition::Repartition;
use datafusion::physical_optimizer::sort_enforcement::EnforceSorting;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;


@@ -61,7 +61,7 @@ rustpython-vm = { git = "https://github.com/discord9/RustPython", optional = tru
"default",
"codegen",
] }
pyo3 = { version = "0.18", optional = true, features = ["abi3", "abi3-py37"] }
pyo3 = { version = "0.19", optional = true, features = ["abi3", "abi3-py37"] }
session = { path = "../session" }
snafu = { version = "0.7", features = ["backtraces"] }
sql = { path = "../sql" }


@@ -178,24 +178,24 @@ coprocessor = copr
/// 4. a single constant, will be expanded to a PyVector of length of `col_len`
fn py_any_to_vec(obj: &PyAny, col_len: usize) -> PyResult<Vec<VectorRef>> {
let is_literal = |obj: &PyAny| -> PyResult<bool> {
Ok(obj.is_instance_of::<PyInt>()?
|| obj.is_instance_of::<PyFloat>()?
|| obj.is_instance_of::<PyString>()?
|| obj.is_instance_of::<PyBool>()?)
Ok(obj.is_instance_of::<PyInt>()
|| obj.is_instance_of::<PyFloat>()
|| obj.is_instance_of::<PyString>()
|| obj.is_instance_of::<PyBool>())
};
let check = if obj.is_instance_of::<PyTuple>()? {
let check = if obj.is_instance_of::<PyTuple>() {
let tuple = obj.downcast::<PyTuple>()?;
(0..tuple.len())
.map(|idx| {
tuple.get_item(idx).map(|i| -> PyResult<bool> {
Ok(i.is_instance_of::<PyVector>()?
|| i.is_instance_of::<PyList>()?
Ok(i.is_instance_of::<PyVector>()
|| i.is_instance_of::<PyList>()
|| is_literal(i)?)
})
})
.all(|i| matches!(i, Ok(Ok(true))))
} else {
obj.is_instance_of::<PyVector>()? || obj.is_instance_of::<PyList>()? || is_literal(obj)?
obj.is_instance_of::<PyVector>() || obj.is_instance_of::<PyList>() || is_literal(obj)?
};
if !check {
return Err(PyRuntimeError::new_err(format!(
@@ -241,13 +241,13 @@ fn py_list_to_vec(list: &PyList) -> PyResult<VectorRef> {
let mut expected_type = None;
let mut v = Vec::with_capacity(list.len());
for (idx, elem) in list.iter().enumerate() {
let (elem_ty, con_type) = if elem.is_instance_of::<PyBool>()? {
let (elem_ty, con_type) = if elem.is_instance_of::<PyBool>() {
(ExpectType::Bool, ConcreteDataType::boolean_datatype())
} else if elem.is_instance_of::<PyInt>()? {
} else if elem.is_instance_of::<PyInt>() {
(ExpectType::Int, ConcreteDataType::int64_datatype())
} else if elem.is_instance_of::<PyFloat>()? {
} else if elem.is_instance_of::<PyFloat>() {
(ExpectType::Float, ConcreteDataType::float64_datatype())
} else if elem.is_instance_of::<PyString>()? {
} else if elem.is_instance_of::<PyString>() {
(ExpectType::String, ConcreteDataType::string_datatype())
} else {
return Err(PyRuntimeError::new_err(format!(


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::Not;
use arrow::compute;
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datafusion::dataframe::DataFrame as DfDataFrame;


@@ -246,8 +246,8 @@ pub fn try_into_columnar_value(py: Python<'_>, obj: PyObject) -> PyResult<Column
}
if let Ok(v) = obj.extract::<PyVector>(py) {
Ok(ColumnarValue::Array(v.to_arrow_array()))
} else if obj.as_ref(py).is_instance_of::<PyList>()?
|| obj.as_ref(py).is_instance_of::<PyTuple>()?
} else if obj.as_ref(py).is_instance_of::<PyList>()
|| obj.as_ref(py).is_instance_of::<PyTuple>()
{
let ret: Vec<ScalarValue> = {
if let Ok(val) = obj.downcast::<PyList>(py) {


@@ -13,7 +13,7 @@
// limitations under the License.
use arrow::array::{make_array, ArrayData};
use arrow::pyarrow::PyArrowConvert;
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use datafusion::arrow::array::BooleanArray;
use datafusion::arrow::compute;
use datafusion::arrow::compute::kernels::{arithmetic, comparison};
@@ -33,10 +33,10 @@ use crate::python::pyo3::utils::{pyo3_obj_try_to_typed_val, to_py_err};
macro_rules! get_con_type {
($obj:ident, $($pyty:ident => $con_ty:ident),*$(,)?) => {
$(
if $obj.is_instance_of::<$pyty>()?{
if $obj.is_instance_of::<$pyty>() {
Ok(ConcreteDataType::$con_ty())
}
)else* else{
) else* else{
Err(PyValueError::new_err("Unsupported pyobject type: {obj:?}"))
}
};


@@ -374,7 +374,7 @@ pub(crate) mod data_frame {
#[pymethod(magic)]
fn invert(zelf: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyExpr> {
let zelf = obj_cast_to::<Self>(zelf, vm)?;
Ok(zelf.inner.clone().not().into())
Ok((!zelf.inner.clone()).into())
}
/// sort ascending&nulls_first


@@ -752,14 +752,12 @@ fn promql_expr_to_metric_name(expr: &PromqlExpr) -> Option<String> {
PromqlExpr::NumberLiteral(_) => Some(String::new()),
PromqlExpr::StringLiteral(_) => Some(String::new()),
PromqlExpr::Extension(_) => None,
PromqlExpr::VectorSelector(VectorSelector { matchers, .. }) => {
matchers.find_matchers(METRIC_NAME).pop().cloned()
PromqlExpr::VectorSelector(VectorSelector { name, matchers, .. }) => {
name.clone().or(matchers.find_matcher(METRIC_NAME))
}
PromqlExpr::MatrixSelector(MatrixSelector {
vector_selector, ..
}) => {
let VectorSelector { matchers, .. } = vector_selector;
matchers.find_matchers(METRIC_NAME).pop().cloned()
PromqlExpr::MatrixSelector(MatrixSelector { vs, .. }) => {
let VectorSelector { name, matchers, .. } = vs;
name.clone().or(matchers.find_matcher(METRIC_NAME))
}
PromqlExpr::Call(Call { args, .. }) => {
args.args.iter().find_map(|e| promql_expr_to_metric_name(e))


@@ -20,11 +20,11 @@ use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::DfPhysicalPlan;
use common_recordbatch::OrderOption;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::datasource::datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
use datafusion::datasource::{TableProvider, TableType as DfTableType};
use datafusion::error::Result as DfResult;
use datafusion::execution::context::SessionState;
use datafusion_expr::expr::Expr as DfExpr;
use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
use store_api::storage::ScanRequest;


@@ -20,7 +20,6 @@ use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, NUMBERS_
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion_common::from_slice::FromSlice;
use datatypes::arrow::array::UInt32Array;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef};
@@ -147,7 +146,7 @@ impl Stream for NumbersStream {
let numbers: Vec<u32> = (0..self.limit).collect();
let batch = DfRecordBatch::try_new(
self.schema.arrow_schema().clone(),
vec![Arc::new(UInt32Array::from_slice(numbers))],
vec![Arc::new(UInt32Array::from(numbers))],
)
.unwrap();


@@ -42,7 +42,7 @@ impl Debug for StreamScanAdapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamScanAdapter")
.field("stream", &"<SendableRecordBatchStream>")
.field("schema", &self.schema)
.field("schema", &self.schema.arrow_schema().fields)
.finish()
}
}


@@ -20,7 +20,7 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
|_|_TableScan: integers projection=[i]_|
| physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[] |
|_|_CoalesceBatchesExec: target_batch_size=8192_|
|_|_RepartitionExec: partitioning=REDACTED
|_|_AggregateExec: mode=Partial, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[]_|


@@ -56,7 +56,7 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=1 WHERE i1.i>
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NOT NULL ORDER BY 2;
Error: 1003(Internal), This feature is not implemented: Unsupported expression: greptime.public.integers.i IS NOT NULL
Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" }))
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>1 ORDER BY 2;
@@ -65,7 +65,7 @@ SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>
SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE CASE WHEN i2.i IS NULL THEN False ELSE True END ORDER BY 2;
Error: 1003(Internal), This feature is not implemented: Unsupported expression: greptime.public.integers.i IS NOT NULL
Error: 1003(Internal), This feature is not implemented: Unsupported expression: IsNotNull(Column(Column { relation: Some(Full { catalog: "greptime", schema: "public", table: "integers" }), name: "i" }))
SELECT DISTINCT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i IS NULL ORDER BY 1;


@@ -1,67 +1,67 @@
explain select * from numbers;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number desc;
+---------------+--------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+--------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number asc;
+---------------+--------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+--------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number desc limit 10;
+---------------+---------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number asc limit 10;
+---------------+------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | MergeScan [is_placeholder=false] |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
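
In each `explain` block above, the `physical_plan` row changes because the scan node now describes itself as `StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [...] }` instead of the old `ExecutionPlan(PlaceHolder)` placeholder. The displayed shape can be reproduced with a plain formatter over an Arrow schema; the sketch below only illustrates that format, is not GreptimeDB's actual adapter, and assumes `arrow-schema` as a dependency:

```rust
use std::fmt;

use arrow_schema::{DataType, Field, Schema};

// Purely illustrative stand-in: the real adapter wraps a
// SendableRecordBatchStream inside the query engine.
struct StreamScanAdapterRepr {
    schema: Schema,
}

impl fmt::Display for StreamScanAdapterRepr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Debug-formatting the schema fields yields the
        // `[Field { name: ..., data_type: ..., ... }]` listing seen in the
        // expected output (the exact field list may vary across arrow versions).
        write!(
            f,
            "StreamScanAdapter {{ stream: \"<SendableRecordBatchStream>\", schema: {:?} }}",
            self.schema.fields()
        )
    }
}

fn main() {
    let schema = Schema::new(vec![Field::new("number", DataType::UInt32, false)]);
    println!("{}", StreamScanAdapterRepr { schema });
}
```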

View File

@@ -18,8 +18,7 @@ TQL ANALYZE (0, 10, '5s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
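
Relative to the old expected output, the `CoalescePartitionsExec` that used to sit at the root of the analyzed plan is gone, so the top-level merge step no longer appears for this TQL query. For orientation only, a DataFusion physical optimizer rule that strips such a root-level merge would look roughly like the sketch below; this is hypothetical (not the rule touched in this commit), and a real rule would also verify the child's output partitioning before dropping the node:

```rust
use std::sync::Arc;

use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

#[derive(Debug)]
struct StripRootCoalesce;

impl PhysicalOptimizerRule for StripRootCoalesce {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Only inspect the root node: if it is a CoalescePartitionsExec,
        // hand back its child so the merge disappears from EXPLAIN output.
        if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
            return Ok(coalesce.input().clone());
        }
        Ok(plan)
    }

    fn name(&self) -> &str {
        "strip_root_coalesce"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
```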

View File

@@ -64,7 +64,7 @@ SELECT DISTINCT MAX(b) FROM test GROUP BY a;
SELECT DISTINCT CASE WHEN a > 11 THEN 11 ELSE a END FROM test;
+-------------------------------------------------------------+
| CASE WHEN test.a > Int64(11) THEN Int64(11) ELSE test.a END |
| CASE WHEN test.a > Int32(11) THEN Int64(11) ELSE test.a END |
+-------------------------------------------------------------+
| 11 |
+-------------------------------------------------------------+

View File

@@ -54,7 +54,11 @@ with cte1 as (Select i as j from a) select * from (with cte2 as (select max(j) a
-- this feature is not implemented in datafusion
with cte1 as (Select i as j from a) select * from cte1 where j = (with cte2 as (select max(j) as j from cte1) select j from cte2);
Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression (<subquery>)
+----+
| j |
+----+
| 42 |
+----+
-- Refer to same-named CTE in a subquery expression
-- this feature is not implemented in datafusion
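
The hunk above shows that the query referring to a same-named CTE from a scalar subquery now returns a row (`j = 42`) instead of failing with `Physical plan does not support logical expression (<subquery>)`; the trailing comments belong to the next case, whose result is not shown here. A standalone check against plain DataFusion could look like the sketch below (hypothetical repro: the `tokio` runtime, the in-memory table `a`, and its single value are assumptions chosen to mirror the test's `42` result):

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // One-column table `a(i)` holding the value the test expects back.
    let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![42_i64]))],
    )?;
    let ctx = SessionContext::new();
    ctx.register_table("a", Arc::new(MemTable::try_new(schema, vec![vec![batch]])?))?;

    // The same-named-CTE-in-subquery query from the test; with the bumped
    // DataFusion this is expected to plan and print the row instead of erroring.
    let df = ctx
        .sql(
            "WITH cte1 AS (SELECT i AS j FROM a) \
             SELECT * FROM cte1 \
             WHERE j = (WITH cte2 AS (SELECT max(j) AS j FROM cte1) SELECT j FROM cte2)",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```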

View File

@@ -1,62 +1,62 @@
explain select * from numbers;
+---------------+----------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------+
| logical_plan | TableScan: numbers projection=[number] |
| physical_plan | ExecutionPlan(PlaceHolder) |
| | |
+---------------+----------------------------------------+
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | TableScan: numbers projection=[number] |
| physical_plan | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number desc;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number DESC NULLS FIRST |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number asc;
+---------------+------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------+
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Sort: numbers.number ASC NULLS LAST |
| | TableScan: numbers projection=[number] |
| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number desc limit 10;
+---------------+---------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number DESC NULLS FIRST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 DESC] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
explain select * from numbers order by number asc limit 10;
+---------------+------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Limit: skip=0, fetch=10 |
| | Sort: numbers.number ASC NULLS LAST, fetch=10 |
| | TableScan: numbers projection=[number] |
| physical_plan | GlobalLimitExec: skip=0, fetch=10 |
| | SortExec: fetch=10, expr=[number@0 ASC NULLS LAST] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

View File

@@ -17,13 +17,12 @@ TQL ANALYZE (0, 10, '5s') test;
+-+-+
| plan_type_| plan_|
+-+-+
| Plan with Metrics | CoalescePartitionsExec, REDACTED
|_|_PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
| Plan with Metrics | PromInstantManipulateExec: range=[0..10000], lookback=[300000], interval=[5000], time index=[j], REDACTED
|_|_PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false], REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_RepartitionExec: partitioning=REDACTED
|_|_PromSeriesDivideExec: tags=["k"], REDACTED
|_|_ExecutionPlan(PlaceHolder), REDACTED
|_|_StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] }, REDACTED
|_|_|
+-+-+

View File

@@ -11,21 +11,21 @@ Affected Rows: 3
-- SQLNESS REPLACE (RoundRobinBatch.*) REDACTED
TQL EXPLAIN (0, 10, '5s') test;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | PromInstantManipulate: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalize: offset=[0], time index=[j], filter NaN: [false] |
| | PromSeriesDivide: tags=["k"] |
| | Sort: test.k DESC NULLS LAST, test.j DESC NULLS LAST |
| | TableScan: test projection=[i, j, k], partial_filters=[j >= TimestampMillisecond(-300000, None), j <= TimestampMillisecond(300000, None)] |
| physical_plan | PromInstantManipulateExec: range=[0..0], lookback=[300000], interval=[300000], time index=[j] |
| | PromSeriesNormalizeExec: offset=[0], time index=[j], filter NaN: [false] |
| | RepartitionExec: partitioning=REDACTED
| | PromSeriesDivideExec: tags=["k"] |
| | ExecutionPlan(PlaceHolder) |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
| | PromSeriesDivideExec: tags=["k"] |
| | StreamScanAdapter { stream: "<SendableRecordBatchStream>", schema: [Field { name: "i", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "j", data_type: Timestamp(Millisecond, None), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"greptime:time_index": "true"} }, Field { name: "k", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }] } |
| | |
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
DROP TABLE test;