diff --git a/src/common/query/src/logical_plan/mod.rs b/src/common/query/src/logical_plan.rs similarity index 99% rename from src/common/query/src/logical_plan/mod.rs rename to src/common/query/src/logical_plan.rs index 5f57cd96aa..fbf746c5be 100644 --- a/src/common/query/src/logical_plan/mod.rs +++ b/src/common/query/src/logical_plan.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use datatypes::prelude::ConcreteDataType; pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef}; -pub use self::expr::Expr; +pub use self::expr::{DfExpr, Expr}; pub use self::udaf::AggregateFunction; pub use self::udf::ScalarUdf; use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation}; diff --git a/src/common/query/src/logical_plan/accumulator.rs b/src/common/query/src/logical_plan/accumulator.rs index 717214f3ff..483fb74ac8 100644 --- a/src/common/query/src/logical_plan/accumulator.rs +++ b/src/common/query/src/logical_plan/accumulator.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use common_time::timestamp::TimeUnit; use datafusion_common::Result as DfResult; -use datafusion_expr::Accumulator as DfAccumulator; +use datafusion_expr::{Accumulator as DfAccumulator, AggregateState}; use datatypes::arrow::array::ArrayRef; use datatypes::prelude::*; use datatypes::value::ListValue; @@ -128,7 +128,7 @@ impl DfAccumulatorAdaptor { } impl DfAccumulator for DfAccumulatorAdaptor { - fn state(&self) -> DfResult> { + fn state(&self) -> DfResult> { let state_values = self.accumulator.state()?; let state_types = self.creator.state_types()?; if state_values.len() != state_types.len() { @@ -141,7 +141,10 @@ impl DfAccumulator for DfAccumulatorAdaptor { Ok(state_values .into_iter() .zip(state_types.iter()) - .map(|(v, t)| try_into_scalar_value(v, t)) + .map(|(v, t)| { + let scalar = try_into_scalar_value(v, t)?; + Ok(AggregateState::Scalar(scalar)) + }) .collect::>>() .map_err(Error::from)?) } @@ -234,7 +237,7 @@ fn try_convert_null_value(datatype: &ConcreteDataType) -> Result { ConcreteDataType::Float64(_) => ScalarValue::Float64(None), ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None), ConcreteDataType::String(_) => ScalarValue::Utf8(None), - ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit, None), + ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None), _ => { return error::BadAccumulatorImplSnafu { err_msg: format!( diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index 45cb12cdeb..bafce9f65a 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use datafusion::logical_plan::Expr as DfExpr; +pub use datafusion_expr::expr::Expr as DfExpr; /// Central struct of query API. /// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`. diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs index fae0443897..7d9861c329 100644 --- a/src/common/query/src/physical_plan.rs +++ b/src/common/query/src/physical_plan.rs @@ -21,7 +21,7 @@ use common_recordbatch::adapter::{AsyncRecordBatchStreamAdapter, DfRecordBatchSt use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream}; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; -pub use datafusion::execution::runtime_env::RuntimeEnv; +pub use datafusion::execution::context::TaskContext; use datafusion::physical_plan::expressions::PhysicalSortExpr; pub use datafusion::physical_plan::Partitioning; use datafusion::physical_plan::Statistics; @@ -63,7 +63,7 @@ pub trait PhysicalPlan: Debug + Send + Sync { fn execute( &self, partition: usize, - runtime: Arc, + context: Arc, ) -> Result; } @@ -119,10 +119,10 @@ impl PhysicalPlan for PhysicalPlanAdapter { fn execute( &self, partition: usize, - runtime: Arc, + context: Arc, ) -> Result { let df_plan = self.df_plan.clone(); - let stream = Box::pin(async move { df_plan.execute(partition, runtime).await }); + let stream = Box::pin(async move { df_plan.execute(partition, context).await }); let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream); Ok(Box::pin(stream)) @@ -132,7 +132,6 @@ impl PhysicalPlan for PhysicalPlanAdapter { #[derive(Debug)] pub struct DfPhysicalPlanAdapter(pub PhysicalPlanRef); -#[async_trait] impl DfPhysicalPlan for DfPhysicalPlanAdapter { fn as_any(&self) -> &dyn Any { self @@ -159,7 +158,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { } fn with_new_children( - &self, + self: Arc, children: Vec>, ) -> DfResult> { let df_schema = self.schema(); @@ -177,12 +176,12 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { Ok(Arc::new(DfPhysicalPlanAdapter(plan))) } - async fn execute( + fn execute( &self, partition: usize, - runtime: Arc, + context: Arc, ) -> DfResult { - let stream = self.0.execute(partition, runtime)?; + let stream = self.0.execute(partition, context)?; Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))) } @@ -195,15 +194,14 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter { #[cfg(test)] mod test { use common_recordbatch::{RecordBatch, RecordBatches}; - use datafusion::arrow_print; - use datafusion::datasource::TableProvider as DfTableProvider; - use datafusion::logical_plan::LogicalPlanBuilder; + use datafusion::datasource::{TableProvider as DfTableProvider, TableType}; + use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::physical_plan::collect; use datafusion::physical_plan::empty::EmptyExec; - use datafusion::prelude::ExecutionContext; - use datafusion_common::field_util::SchemaExt; + use datafusion_expr::logical_plan::builder::LogicalPlanBuilder; use datafusion_expr::Expr; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datatypes::arrow::util::pretty; use datatypes::schema::Schema; use datatypes::vectors::Int32Vector; @@ -225,8 +223,13 @@ mod test { )])) } + fn table_type(&self) -> TableType { + TableType::Base + } + async fn scan( &self, + _ctx: &SessionState, _projection: &Option>, _filters: &[Expr], _limit: Option, @@ -269,7 +272,7 @@ mod test { fn execute( &self, _partition: usize, - _runtime: Arc, + _context: Arc, ) -> Result { let schema = self.schema(); let recordbatches = RecordBatches::try_new( @@ -295,16 +298,16 @@ mod test { // Test our physical plan can be executed by DataFusion, through adapters. #[tokio::test] async fn test_execute_physical_plan() { - let ctx = ExecutionContext::new(); + let ctx = SessionContext::new(); let logical_plan = LogicalPlanBuilder::scan("test", Arc::new(MyDfTableProvider), None) .unwrap() .build() .unwrap(); let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap(); - let df_recordbatches = collect(physical_plan, Arc::new(RuntimeEnv::default())) + let df_recordbatches = collect(physical_plan, Arc::new(TaskContext::from(&ctx))) .await .unwrap(); - let pretty_print = arrow_print::write(&df_recordbatches); + let pretty_print = pretty::pretty_format_batches(&df_recordbatches).unwrap(); let pretty_print = pretty_print.lines().collect::>(); assert_eq!( pretty_print, diff --git a/src/common/query/src/signature.rs b/src/common/query/src/signature.rs index c8d4963b6e..1d57ee7992 100644 --- a/src/common/query/src/signature.rs +++ b/src/common/query/src/signature.rs @@ -15,7 +15,7 @@ //! Signature module contains foundational types that are used to represent signatures, types, //! and return types of functions. //! Copied and modified from datafusion. -pub use datafusion::physical_plan::functions::Volatility; +pub use datafusion_expr::Volatility; use datafusion_expr::{Signature as DfSignature, TypeSignature as DfTypeSignature}; use datatypes::arrow::datatypes::DataType as ArrowDataType; use datatypes::data_type::DataType; diff --git a/src/common/recordbatch/src/recordbatch.rs b/src/common/recordbatch/src/recordbatch.rs index 534722e988..c3b90ba379 100644 --- a/src/common/recordbatch/src/recordbatch.rs +++ b/src/common/recordbatch/src/recordbatch.rs @@ -168,11 +168,9 @@ mod tests { let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); let numbers: Vec = (0..10).collect(); - let df_batch = DfRecordBatch::try_new( - arrow_schema, - vec![Arc::new(UInt32Array::from(numbers))], - ) - .unwrap(); + let df_batch = + DfRecordBatch::try_new(arrow_schema, vec![Arc::new(UInt32Array::from(numbers))]) + .unwrap(); let batch = RecordBatch { schema, diff --git a/src/common/recordbatch/src/util.rs b/src/common/recordbatch/src/util.rs index 461a64d8ae..d2c2987f46 100644 --- a/src/common/recordbatch/src/util.rs +++ b/src/common/recordbatch/src/util.rs @@ -30,7 +30,9 @@ mod tests { use datatypes::arrow::array::UInt32Array; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; - use datatypes::schema::{Schema, SchemaRef}; + use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; + use datatypes::prelude::*; + use datatypes::vectors::UInt32Vector; use futures::task::{Context, Poll}; use futures::Stream; @@ -64,12 +66,13 @@ mod tests { #[tokio::test] async fn test_collect() { - let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new( + let column_schemas = vec![ColumnSchema::new( "number", - DataType::UInt32, + ConcreteDataType::uint32_datatype(), false, - )])); - let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); + )]; + + let schema = Arc::new(Schema::try_new(column_schemas).unwrap()); let stream = MockRecordBatchStream { schema: schema.clone(), @@ -80,19 +83,17 @@ mod tests { assert_eq!(0, batches.len()); let numbers: Vec = (0..10).collect(); - let df_batch = DfRecordBatch::try_new( - arrow_schema.clone(), - vec![Arc::new(UInt32Array::from_slice(&numbers))], + let columns = [ + Arc::new(UInt32Vector::from_vec(numbers)) as _, + ]; + let batch = RecordBatch::new( + schema.clone(), + columns, ) .unwrap(); - let batch = RecordBatch { - schema: schema.clone(), - df_recordbatch: df_batch, - }; - let stream = MockRecordBatchStream { - schema: Arc::new(Schema::try_from(arrow_schema).unwrap()), + schema: schema.clone(), batch: Some(batch.clone()), }; let batches = collect(Box::pin(stream)).await.unwrap(); diff --git a/src/datatypes/src/types/timestamp_type.rs b/src/datatypes/src/types/timestamp_type.rs index fe86eeb8fd..8f38a57b55 100644 --- a/src/datatypes/src/types/timestamp_type.rs +++ b/src/datatypes/src/types/timestamp_type.rs @@ -50,6 +50,18 @@ pub enum TimestampType { Nanosecond(TimestampNanosecondType), } +impl TimestampType { + /// Returns the [`TimeUnit`] of this type. + pub fn unit(&self) -> TimeUnit { + match self { + TimestampType::Second(_) => TimeUnit::Second, + TimestampType::Millisecond(_) => TimeUnit::Millisecond, + TimestampType::Microsecond(_) => TimeUnit::Microsecond, + TimestampType::Nanosecond(_) => TimeUnit::Nanosecond, + } + } +} + macro_rules! impl_data_type_for_timestamp { ($unit: ident) => { paste! { @@ -82,7 +94,6 @@ macro_rules! impl_data_type_for_timestamp { } } - impl LogicalPrimitiveType for [] { type ArrowPrimitive = []; type Native = i64; @@ -138,3 +149,28 @@ impl_data_type_for_timestamp!(Nanosecond); impl_data_type_for_timestamp!(Second); impl_data_type_for_timestamp!(Millisecond); impl_data_type_for_timestamp!(Microsecond); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_timestamp_type_unit() { + assert_eq!( + TimeUnit::Second, + TimestampType::Second(TimestampSecondType).unit() + ); + assert_eq!( + TimeUnit::Millisecond, + TimestampType::Millisecond(TimestampMillisecondType).unit() + ); + assert_eq!( + TimeUnit::Microsecond, + TimestampType::Microsecond(TimestampMicrosecondType).unit() + ); + assert_eq!( + TimeUnit::Nanosecond, + TimestampType::Nanosecond(TimestampNanosecondType).unit() + ); + } +} diff --git a/src/query/src/expr.rs b/src/query/src/expr.rs deleted file mode 100644 index 3a2a59181e..0000000000 --- a/src/query/src/expr.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2022 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License.