feat: Fix some compiler errors in common::query (#710)

* feat: Fix some compiler errors in common::query

* feat: test_collect use vectors api
This commit is contained in:
Yingwen
2022-12-06 15:32:12 +08:00
committed by GitHub
parent 0ccb8b4302
commit b32438e78c
9 changed files with 86 additions and 58 deletions

View File

@@ -22,7 +22,7 @@ use std::sync::Arc;
use datatypes::prelude::ConcreteDataType;
pub use self::accumulator::{Accumulator, AggregateFunctionCreator, AggregateFunctionCreatorRef};
pub use self::expr::Expr;
pub use self::expr::{DfExpr, Expr};
pub use self::udaf::AggregateFunction;
pub use self::udf::ScalarUdf;
use crate::function::{ReturnTypeFunction, ScalarFunctionImplementation};

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use common_time::timestamp::TimeUnit;
use datafusion_common::Result as DfResult;
use datafusion_expr::Accumulator as DfAccumulator;
use datafusion_expr::{Accumulator as DfAccumulator, AggregateState};
use datatypes::arrow::array::ArrayRef;
use datatypes::prelude::*;
use datatypes::value::ListValue;
@@ -128,7 +128,7 @@ impl DfAccumulatorAdaptor {
}
impl DfAccumulator for DfAccumulatorAdaptor {
fn state(&self) -> DfResult<Vec<ScalarValue>> {
fn state(&self) -> DfResult<Vec<AggregateState>> {
let state_values = self.accumulator.state()?;
let state_types = self.creator.state_types()?;
if state_values.len() != state_types.len() {
@@ -141,7 +141,10 @@ impl DfAccumulator for DfAccumulatorAdaptor {
Ok(state_values
.into_iter()
.zip(state_types.iter())
.map(|(v, t)| try_into_scalar_value(v, t))
.map(|(v, t)| {
let scalar = try_into_scalar_value(v, t)?;
Ok(AggregateState::Scalar(scalar))
})
.collect::<Result<Vec<_>>>()
.map_err(Error::from)?)
}
@@ -234,7 +237,7 @@ fn try_convert_null_value(datatype: &ConcreteDataType) -> Result<ScalarValue> {
ConcreteDataType::Float64(_) => ScalarValue::Float64(None),
ConcreteDataType::Binary(_) => ScalarValue::LargeBinary(None),
ConcreteDataType::String(_) => ScalarValue::Utf8(None),
ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit, None),
ConcreteDataType::Timestamp(t) => timestamp_to_scalar_value(t.unit(), None),
_ => {
return error::BadAccumulatorImplSnafu {
err_msg: format!(

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use datafusion::logical_plan::Expr as DfExpr;
pub use datafusion_expr::expr::Expr as DfExpr;
/// Central struct of query API.
/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`.

View File

@@ -21,7 +21,7 @@ use common_recordbatch::adapter::{AsyncRecordBatchStreamAdapter, DfRecordBatchSt
use common_recordbatch::{DfSendableRecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
pub use datafusion::execution::runtime_env::RuntimeEnv;
pub use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
pub use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::Statistics;
@@ -63,7 +63,7 @@ pub trait PhysicalPlan: Debug + Send + Sync {
fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>;
}
@@ -119,10 +119,10 @@ impl PhysicalPlan for PhysicalPlanAdapter {
fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let df_plan = self.df_plan.clone();
let stream = Box::pin(async move { df_plan.execute(partition, runtime).await });
let stream = Box::pin(async move { df_plan.execute(partition, context).await });
let stream = AsyncRecordBatchStreamAdapter::new(self.schema(), stream);
Ok(Box::pin(stream))
@@ -132,7 +132,6 @@ impl PhysicalPlan for PhysicalPlanAdapter {
#[derive(Debug)]
pub struct DfPhysicalPlanAdapter(pub PhysicalPlanRef);
#[async_trait]
impl DfPhysicalPlan for DfPhysicalPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
@@ -159,7 +158,7 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
}
fn with_new_children(
&self,
self: Arc<Self>,
children: Vec<Arc<dyn DfPhysicalPlan>>,
) -> DfResult<Arc<dyn DfPhysicalPlan>> {
let df_schema = self.schema();
@@ -177,12 +176,12 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
Ok(Arc::new(DfPhysicalPlanAdapter(plan)))
}
async fn execute(
fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> DfResult<DfSendableRecordBatchStream> {
let stream = self.0.execute(partition, runtime)?;
let stream = self.0.execute(partition, context)?;
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}
@@ -195,15 +194,14 @@ impl DfPhysicalPlan for DfPhysicalPlanAdapter {
#[cfg(test)]
mod test {
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::arrow_print;
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::logical_plan::LogicalPlanBuilder;
use datafusion::datasource::{TableProvider as DfTableProvider, TableType};
use datafusion::execution::context::{SessionContext, SessionState};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::prelude::ExecutionContext;
use datafusion_common::field_util::SchemaExt;
use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
use datafusion_expr::Expr;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::util::pretty;
use datatypes::schema::Schema;
use datatypes::vectors::Int32Vector;
@@ -225,8 +223,13 @@ mod test {
)]))
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_ctx: &SessionState,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
@@ -269,7 +272,7 @@ mod test {
fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let schema = self.schema();
let recordbatches = RecordBatches::try_new(
@@ -295,16 +298,16 @@ mod test {
// Test our physical plan can be executed by DataFusion, through adapters.
#[tokio::test]
async fn test_execute_physical_plan() {
let ctx = ExecutionContext::new();
let ctx = SessionContext::new();
let logical_plan = LogicalPlanBuilder::scan("test", Arc::new(MyDfTableProvider), None)
.unwrap()
.build()
.unwrap();
let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap();
let df_recordbatches = collect(physical_plan, Arc::new(RuntimeEnv::default()))
let df_recordbatches = collect(physical_plan, Arc::new(TaskContext::from(&ctx)))
.await
.unwrap();
let pretty_print = arrow_print::write(&df_recordbatches);
let pretty_print = pretty::pretty_format_batches(&df_recordbatches).unwrap();
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
assert_eq!(
pretty_print,

View File

@@ -15,7 +15,7 @@
//! Signature module contains foundational types that are used to represent signatures, types,
//! and return types of functions.
//! Copied and modified from datafusion.
pub use datafusion::physical_plan::functions::Volatility;
pub use datafusion_expr::Volatility;
use datafusion_expr::{Signature as DfSignature, TypeSignature as DfTypeSignature};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType;

View File

@@ -168,11 +168,9 @@ mod tests {
let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
let numbers: Vec<u32> = (0..10).collect();
let df_batch = DfRecordBatch::try_new(
arrow_schema,
vec![Arc::new(UInt32Array::from(numbers))],
)
.unwrap();
let df_batch =
DfRecordBatch::try_new(arrow_schema, vec![Arc::new(UInt32Array::from(numbers))])
.unwrap();
let batch = RecordBatch {
schema,

View File

@@ -30,7 +30,9 @@ mod tests {
use datatypes::arrow::array::UInt32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::prelude::*;
use datatypes::vectors::UInt32Vector;
use futures::task::{Context, Poll};
use futures::Stream;
@@ -64,12 +66,13 @@ mod tests {
#[tokio::test]
async fn test_collect() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
let column_schemas = vec![ColumnSchema::new(
"number",
DataType::UInt32,
ConcreteDataType::uint32_datatype(),
false,
)]));
let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap());
)];
let schema = Arc::new(Schema::try_new(column_schemas).unwrap());
let stream = MockRecordBatchStream {
schema: schema.clone(),
@@ -80,19 +83,17 @@ mod tests {
assert_eq!(0, batches.len());
let numbers: Vec<u32> = (0..10).collect();
let df_batch = DfRecordBatch::try_new(
arrow_schema.clone(),
vec![Arc::new(UInt32Array::from_slice(&numbers))],
let columns = [
Arc::new(UInt32Vector::from_vec(numbers)) as _,
];
let batch = RecordBatch::new(
schema.clone(),
columns,
)
.unwrap();
let batch = RecordBatch {
schema: schema.clone(),
df_recordbatch: df_batch,
};
let stream = MockRecordBatchStream {
schema: Arc::new(Schema::try_from(arrow_schema).unwrap()),
schema: schema.clone(),
batch: Some(batch.clone()),
};
let batches = collect(Box::pin(stream)).await.unwrap();

View File

@@ -50,6 +50,18 @@ pub enum TimestampType {
Nanosecond(TimestampNanosecondType),
}
impl TimestampType {
/// Returns the [`TimeUnit`] of this type.
pub fn unit(&self) -> TimeUnit {
match self {
TimestampType::Second(_) => TimeUnit::Second,
TimestampType::Millisecond(_) => TimeUnit::Millisecond,
TimestampType::Microsecond(_) => TimeUnit::Microsecond,
TimestampType::Nanosecond(_) => TimeUnit::Nanosecond,
}
}
}
macro_rules! impl_data_type_for_timestamp {
($unit: ident) => {
paste! {
@@ -82,7 +94,6 @@ macro_rules! impl_data_type_for_timestamp {
}
}
impl LogicalPrimitiveType for [<Timestamp $unit Type>] {
type ArrowPrimitive = [<Arrow Timestamp $unit Type>];
type Native = i64;
@@ -138,3 +149,28 @@ impl_data_type_for_timestamp!(Nanosecond);
impl_data_type_for_timestamp!(Second);
impl_data_type_for_timestamp!(Millisecond);
impl_data_type_for_timestamp!(Microsecond);
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_timestamp_type_unit() {
assert_eq!(
TimeUnit::Second,
TimestampType::Second(TimestampSecondType).unit()
);
assert_eq!(
TimeUnit::Millisecond,
TimestampType::Millisecond(TimestampMillisecondType).unit()
);
assert_eq!(
TimeUnit::Microsecond,
TimestampType::Microsecond(TimestampMicrosecondType).unit()
);
assert_eq!(
TimeUnit::Nanosecond,
TimestampType::Nanosecond(TimestampNanosecondType).unit()
);
}
}

View File

@@ -1,13 +0,0 @@
// Copyright 2022 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.