diff --git a/Cargo.lock b/Cargo.lock index 64e03cd104..2af5ac466e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -937,6 +937,7 @@ name = "common-query" version = "0.1.0" dependencies = [ "arrow2", + "async-trait", "common-base", "common-error", "common-recordbatch", @@ -955,6 +956,7 @@ name = "common-recordbatch" version = "0.1.0" dependencies = [ "common-error", + "datafusion", "datafusion-common", "datatypes", "futures", @@ -5135,6 +5137,7 @@ dependencies = [ "common-recordbatch", "common-telemetry", "common-time", + "datafusion", "datafusion-common", "datatypes", "futures", diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index a908f6ce65..11c10fa1ce 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -103,6 +103,18 @@ pub enum Error { #[snafu(display("Illegal catalog manager state: {}", msg))] IllegalManagerState { backtrace: Backtrace, msg: String }, + + #[snafu(display("Failed to scan system catalog table, source: {}", source))] + SystemCatalogTableScan { + #[snafu(backtrace)] + source: table::error::Error, + }, + + #[snafu(display("Failed to execute system catalog table scan, source: {}", source))] + SystemCatalogTableScanExec { + #[snafu(backtrace)] + source: common_query::error::Error, + }, } pub type Result = std::result::Result; @@ -131,7 +143,10 @@ impl ErrorExt for Error { | Error::CreateSystemCatalog { source, .. } | Error::InsertTableRecord { source, .. } | Error::OpenTable { source, .. } - | Error::CreateTable { source, .. } => source.status_code(), + | Error::CreateTable { source, .. } + | Error::SystemCatalogTableScan { source } => source.status_code(), + + Error::SystemCatalogTableScanExec { source } => source.status_code(), } } diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index e306b7a8db..b6540361ca 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; use std::sync::Arc; use common_query::logical_plan::Expr; +use common_query::physical_plan::PhysicalPlanRef; +use common_query::physical_plan::RuntimeEnv; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::debug; use common_time::timestamp::Timestamp; @@ -22,7 +24,7 @@ use crate::consts::{ SYSTEM_CATALOG_TABLE_NAME, }; use crate::error::{ - CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, + self, CreateSystemCatalogSnafu, EmptyValueSnafu, Error, InvalidEntryTypeSnafu, InvalidKeySnafu, OpenSystemCatalogSnafu, Result, ValueDeserializeSnafu, }; @@ -51,7 +53,7 @@ impl Table for SystemCatalogTable { _projection: &Option>, _filters: &[Expr], _limit: Option, - ) -> table::Result { + ) -> table::Result { panic!("System catalog table does not support scan!") } @@ -111,7 +113,15 @@ impl SystemCatalogTable { /// Create a stream of all entries inside system catalog table pub async fn records(&self) -> Result { let full_projection = None; - let stream = self.table.scan(&full_projection, &[], None).await.unwrap(); + let scan = self + .table + .scan(&full_projection, &[], None) + .await + .context(error::SystemCatalogTableScanSnafu)?; + let stream = scan + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .context(error::SystemCatalogTableScanExecSnafu)?; Ok(stream) } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index c26c7db286..a22347a018 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -7,8 +7,9 @@ use std::task::{Context, Poll}; use async_stream::stream; use common_query::logical_plan::Expr; +use 
common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use datatypes::prelude::{ConcreteDataType, VectorBuilder}; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::value::Value; @@ -17,6 +18,7 @@ use futures::Stream; use snafu::ResultExt; use table::engine::TableEngineRef; use table::metadata::{TableId, TableInfoRef}; +use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; use crate::consts::{INFORMATION_SCHEMA_NAME, SYSTEM_CATALOG_TABLE_NAME}; @@ -62,7 +64,7 @@ impl Table for Tables { _projection: &Option>, _filters: &[Expr], _limit: Option, - ) -> table::error::Result { + ) -> table::error::Result { let catalogs = self.catalogs.clone(); let schema_ref = self.schema.clone(); let engine_name = self.engine_name.clone(); @@ -89,10 +91,11 @@ impl Table for Tables { } }); - Ok(Box::pin(TablesRecordBatchStream { + let stream = Box::pin(TablesRecordBatchStream { schema: self.schema.clone(), stream: Box::pin(stream), - })) + }); + Ok(Arc::new(SimpleTableScan::new(stream))) } } @@ -277,6 +280,7 @@ fn build_schema_for_tables() -> Schema { #[cfg(test)] mod tests { + use common_query::physical_plan::RuntimeEnv; use datatypes::arrow::array::Utf8Array; use datatypes::arrow::datatypes::DataType; use futures_util::StreamExt; @@ -298,7 +302,11 @@ mod tests { catalog_list.register_catalog("test_catalog".to_string(), catalog_provider); let tables = Tables::new(catalog_list, "test_engine".to_string()); - let mut tables_stream = tables.scan(&None, &[], None).await.unwrap(); + let tables_stream = tables.scan(&None, &[], None).await.unwrap(); + let mut tables_stream = tables_stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); if let Some(t) = tables_stream.next().await { let batch = t.unwrap().df_recordbatch; assert_eq!(1, batch.num_rows()); diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml index 492ba32a18..93031856a1 100644 --- a/src/common/query/Cargo.toml +++ b/src/common/query/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" edition = "2021" [dependencies] +async-trait = "0.1" common-error = { path = "../error" } common-recordbatch = { path = "../recordbatch" } common-time = { path = "../time" } diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index 652b1354e7..0231f7cdc0 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -62,6 +62,36 @@ pub enum InnerError { #[snafu(display("unexpected: not constant column"))] InvalidInputCol { backtrace: Backtrace }, + + #[snafu(display("Not expected to run ExecutionPlan more than once"))] + ExecuteRepeatedly { backtrace: Backtrace }, + + #[snafu(display("General DataFusion error, source: {}", source))] + GeneralDataFusion { + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to execute DataFusion ExecutionPlan, source: {}", source))] + DataFusionExecutionPlan { + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display( + "Failed to convert DataFusion's recordbatch stream, source: {}", + source + ))] + ConvertDfRecordBatchStream { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to convert arrow schema, source: {}", source))] + ConvertArrowSchema { + #[snafu(backtrace)] + source: DataTypeError, + }, } pub type Result = std::result::Result; @@ 
-76,9 +106,17 @@ impl ErrorExt for InnerError { | InnerError::InvalidInputState { .. } | InnerError::InvalidInputCol { .. } | InnerError::BadAccumulatorImpl { .. } => StatusCode::EngineExecuteQuery, - InnerError::InvalidInputs { source, .. } => source.status_code(), - InnerError::IntoVector { source, .. } => source.status_code(), - InnerError::FromScalarValue { source } => source.status_code(), + + InnerError::InvalidInputs { source, .. } + | InnerError::IntoVector { source, .. } + | InnerError::FromScalarValue { source } + | InnerError::ConvertArrowSchema { source } => source.status_code(), + + InnerError::ExecuteRepeatedly { .. } + | InnerError::GeneralDataFusion { .. } + | InnerError::DataFusionExecutionPlan { .. } => StatusCode::Unexpected, + + InnerError::ConvertDfRecordBatchStream { source, .. } => source.status_code(), } } @@ -105,6 +143,7 @@ impl From for DataFusionError { #[cfg(test)] mod tests { + use arrow::error::ArrowError; use snafu::GenerateImplicitData; use super::*; @@ -127,6 +166,48 @@ mod tests { .unwrap() .into(); assert_error(&err, StatusCode::EngineExecuteQuery); + + let err: Error = throw_df_error() + .context(GeneralDataFusionSnafu) + .err() + .unwrap() + .into(); + assert_error(&err, StatusCode::Unexpected); + + let err: Error = throw_df_error() + .context(DataFusionExecutionPlanSnafu) + .err() + .unwrap() + .into(); + assert_error(&err, StatusCode::Unexpected); + } + + #[test] + fn test_execute_repeatedly_error() { + let error: Error = None:: + .context(ExecuteRepeatedlySnafu) + .err() + .unwrap() + .into(); + assert_eq!(error.inner.status_code(), StatusCode::Unexpected); + assert!(error.backtrace_opt().is_some()); + } + + #[test] + fn test_convert_df_recordbatch_stream_error() { + let result: std::result::Result = + Err(common_recordbatch::error::InnerError::PollStream { + source: ArrowError::Overflow, + backtrace: Backtrace::generate(), + } + .into()); + let error: Error = result + .context(ConvertDfRecordBatchStreamSnafu) + .err() + .unwrap() + .into(); + assert_eq!(error.inner.status_code(), StatusCode::Internal); + assert!(error.backtrace_opt().is_some()); } fn raise_datatype_error() -> std::result::Result<(), DataTypeError> { diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 798e99f410..19bb5b8f1b 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -4,6 +4,7 @@ pub mod columnar_value; pub mod error; mod function; pub mod logical_plan; +pub mod physical_plan; pub mod prelude; mod signature; @@ -13,3 +14,5 @@ pub enum Output { RecordBatches(RecordBatches), Stream(SendableRecordBatchStream), } + +pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan; diff --git a/src/common/query/src/logical_plan/expr.rs b/src/common/query/src/logical_plan/expr.rs index b190e095ee..bfebbaa9c2 100644 --- a/src/common/query/src/logical_plan/expr.rs +++ b/src/common/query/src/logical_plan/expr.rs @@ -2,7 +2,7 @@ use datafusion::logical_plan::Expr as DfExpr; /// Central struct of query API. /// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`. 
-#[derive(Clone, PartialEq, Hash)]
+#[derive(Clone, PartialEq, Hash, Debug)]
 pub struct Expr {
     df_expr: DfExpr,
 }
diff --git a/src/common/query/src/physical_plan.rs b/src/common/query/src/physical_plan.rs
new file mode 100644
index 0000000000..2e7972e903
--- /dev/null
+++ b/src/common/query/src/physical_plan.rs
@@ -0,0 +1,325 @@
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
+use common_recordbatch::DfSendableRecordBatchStream;
+use common_recordbatch::SendableRecordBatchStream;
+use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
+use datafusion::error::Result as DfResult;
+pub use datafusion::execution::runtime_env::RuntimeEnv;
+use datafusion::physical_plan::expressions::PhysicalSortExpr;
+pub use datafusion::physical_plan::Partitioning;
+use datafusion::physical_plan::Statistics;
+use datatypes::schema::SchemaRef;
+use snafu::ResultExt;
+
+use crate::error::{self, Result};
+use crate::DfPhysicalPlan;
+
+pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
+
+/// `PhysicalPlan` represents a node in the physical plan.
+///
+/// Each `PhysicalPlan` is partition-aware and is responsible for
+/// creating the actual `async` [`SendableRecordBatchStream`]s
+/// of [`RecordBatch`] that incrementally compute the operator's
+/// output from its input partition.
+#[async_trait]
+pub trait PhysicalPlan: Debug + Send + Sync {
+    /// Returns the physical plan as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+
+    /// Get the schema for this physical plan.
+    fn schema(&self) -> SchemaRef;
+
+    /// Specifies the output partitioning scheme of this plan.
+    fn output_partitioning(&self) -> Partitioning;
+
+    /// Get a list of child physical plans that provide the input for this plan. The returned list
+    /// will be empty for leaf nodes, will contain a single value for unary nodes, or two
+    /// values for binary nodes (such as joins).
+    fn children(&self) -> Vec<PhysicalPlanRef>;
+
+    /// Returns a new plan where all children were replaced by new plans.
+    /// The size of `children` must be equal to the size of `PhysicalPlan::children()`.
+    fn with_new_children(&self, children: Vec<PhysicalPlanRef>) -> Result<PhysicalPlanRef>;
+
+    /// Creates a RecordBatch stream.
+ async fn execute( + &self, + partition: usize, + runtime: Arc, + ) -> Result; +} + +#[derive(Debug)] +pub struct PhysicalPlanAdapter { + schema: SchemaRef, + df_plan: Arc, +} + +impl PhysicalPlanAdapter { + pub fn new(schema: SchemaRef, df_plan: Arc) -> Self { + Self { schema, df_plan } + } + + pub fn df_plan(&self) -> Arc { + self.df_plan.clone() + } +} + +#[async_trait] +impl PhysicalPlan for PhysicalPlanAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + self.df_plan.output_partitioning() + } + + fn children(&self) -> Vec { + self.df_plan + .children() + .into_iter() + .map(|x| Arc::new(PhysicalPlanAdapter::new(self.schema(), x)) as _) + .collect() + } + + fn with_new_children(&self, children: Vec) -> Result { + let children = children + .into_iter() + .map(|x| Arc::new(DfPhysicalPlanAdapter(x)) as _) + .collect(); + let plan = self + .df_plan + .with_new_children(children) + .context(error::GeneralDataFusionSnafu)?; + Ok(Arc::new(PhysicalPlanAdapter::new(self.schema(), plan))) + } + + async fn execute( + &self, + partition: usize, + runtime: Arc, + ) -> Result { + let stream = self + .df_plan + .execute(partition, runtime) + .await + .context(error::DataFusionExecutionPlanSnafu)?; + let stream = RecordBatchStreamAdapter::try_new(stream) + .context(error::ConvertDfRecordBatchStreamSnafu)?; + Ok(Box::pin(stream)) + } +} + +#[derive(Debug)] +pub struct DfPhysicalPlanAdapter(pub PhysicalPlanRef); + +#[async_trait] +impl DfPhysicalPlan for DfPhysicalPlanAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> DfSchemaRef { + self.0.schema().arrow_schema().clone() + } + + fn output_partitioning(&self) -> Partitioning { + self.0.output_partitioning() + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + self.0 + .children() + .into_iter() + .map(|x| Arc::new(DfPhysicalPlanAdapter(x)) as _) + .collect() + } + + fn with_new_children( + &self, + children: Vec>, + ) -> DfResult> { + let df_schema = self.schema(); + let schema: SchemaRef = Arc::new( + df_schema + .try_into() + .context(error::ConvertArrowSchemaSnafu) + .map_err(error::Error::from)?, + ); + let children = children + .into_iter() + .map(|x| Arc::new(PhysicalPlanAdapter::new(schema.clone(), x)) as _) + .collect(); + let plan = self.0.with_new_children(children)?; + Ok(Arc::new(DfPhysicalPlanAdapter(plan))) + } + + async fn execute( + &self, + partition: usize, + runtime: Arc, + ) -> DfResult { + let stream = self.0.execute(partition, runtime).await?; + Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))) + } + + fn statistics(&self) -> Statistics { + // TODO(LFC): impl statistics + Statistics::default() + } +} + +#[cfg(test)] +mod test { + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use common_recordbatch::{RecordBatch, RecordBatches}; + use datafusion::arrow_print; + use datafusion::datasource::TableProvider as DfTableProvider; + use datafusion::logical_plan::LogicalPlanBuilder; + use datafusion::physical_plan::collect; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion::prelude::ExecutionContext; + use datafusion_common::field_util::SchemaExt; + use datafusion_expr::Expr; + use datatypes::schema::Schema; + use datatypes::vectors::Int32Vector; + + use super::*; + + struct MyDfTableProvider; + + #[async_trait] + impl DfTableProvider for MyDfTableProvider { + fn as_any(&self) -> &dyn Any { + self + 
} + + fn schema(&self) -> DfSchemaRef { + Arc::new(ArrowSchema::new(vec![Field::new( + "a", + DataType::Int32, + false, + )])) + } + + async fn scan( + &self, + _projection: &Option>, + _filters: &[Expr], + _limit: Option, + ) -> DfResult> { + let schema = Schema::try_from(self.schema()).unwrap(); + let my_plan = Arc::new(MyExecutionPlan { + schema: Arc::new(schema), + }); + let df_plan = DfPhysicalPlanAdapter(my_plan); + Ok(Arc::new(df_plan)) + } + } + + #[derive(Debug)] + struct MyExecutionPlan { + schema: SchemaRef, + } + + #[async_trait] + impl PhysicalPlan for MyExecutionPlan { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn children(&self) -> Vec { + vec![] + } + + fn with_new_children(&self, _children: Vec) -> Result { + unimplemented!() + } + + async fn execute( + &self, + _partition: usize, + _runtime: Arc, + ) -> Result { + let schema = self.schema(); + let recordbatches = RecordBatches::try_new( + schema.clone(), + vec![ + RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice(vec![1])) as _], + ) + .unwrap(), + RecordBatch::new( + schema, + vec![Arc::new(Int32Vector::from_slice(vec![2, 3])) as _], + ) + .unwrap(), + ], + ) + .unwrap(); + Ok(recordbatches.as_stream()) + } + } + + // Test our physical plan can be executed by DataFusion, through adapters. + #[tokio::test] + async fn test_execute_physical_plan() { + let ctx = ExecutionContext::new(); + let logical_plan = LogicalPlanBuilder::scan("test", Arc::new(MyDfTableProvider), None) + .unwrap() + .build() + .unwrap(); + let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap(); + let df_recordbatches = collect(physical_plan, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); + let pretty_print = arrow_print::write(&df_recordbatches); + let pretty_print = pretty_print.lines().collect::>(); + assert_eq!( + pretty_print, + vec!["+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+",] + ); + } + + #[test] + fn test_physical_plan_adapter() { + let df_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "name", + DataType::Utf8, + true, + )])); + + let plan = PhysicalPlanAdapter::new( + Arc::new(Schema::try_from(df_schema.clone()).unwrap()), + Arc::new(EmptyExec::new(true, df_schema.clone())), + ); + assert!(plan.df_plan.as_any().downcast_ref::().is_some()); + + let df_plan = DfPhysicalPlanAdapter(Arc::new(plan)); + assert_eq!(df_schema, df_plan.schema()); + } +} diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index 69e5f889ce..aa6aa7dfab 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] common-error = { path = "../error" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../../datatypes" } futures = "0.3" diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs new file mode 100644 index 0000000000..18bcd7f848 --- /dev/null +++ b/src/common/recordbatch/src/adapter.rs @@ -0,0 +1,92 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; +use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream; 
+use datafusion_common::record_batch::RecordBatch as DfRecordBatch; +use datatypes::arrow::error::ArrowError; +use datatypes::arrow::error::Result as ArrowResult; +use datatypes::schema::{Schema, SchemaRef}; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::DfSendableRecordBatchStream; +use crate::{RecordBatch, RecordBatchStream, SendableRecordBatchStream, Stream}; + +/// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream +pub struct DfRecordBatchStreamAdapter { + stream: SendableRecordBatchStream, +} + +impl DfRecordBatchStreamAdapter { + pub fn new(stream: SendableRecordBatchStream) -> Self { + Self { stream } + } +} + +impl DfRecordBatchStream for DfRecordBatchStreamAdapter { + fn schema(&self) -> DfSchemaRef { + self.stream.schema().arrow_schema().clone() + } +} + +impl Stream for DfRecordBatchStreamAdapter { + type Item = ArrowResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(recordbatch)) => match recordbatch { + Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))), + Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))), + }, + Poll::Ready(None) => Poll::Ready(None), + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + +/// DataFusion SendableRecordBatchStream -> Greptime RecordBatchStream +pub struct RecordBatchStreamAdapter { + schema: SchemaRef, + stream: DfSendableRecordBatchStream, +} + +impl RecordBatchStreamAdapter { + pub fn try_new(stream: DfSendableRecordBatchStream) -> Result { + let schema = + Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); + Ok(Self { schema, stream }) + } +} + +impl RecordBatchStream for RecordBatchStreamAdapter { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +impl Stream for RecordBatchStreamAdapter { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.stream).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(df_recordbatch)) => Poll::Ready(Some(Ok(RecordBatch { + schema: self.schema(), + df_recordbatch: df_recordbatch.context(error::PollStreamSnafu)?, + }))), + Poll::Ready(None) => Poll::Ready(None), + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index 597e3b6f63..2524d159bd 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -33,16 +33,32 @@ pub enum InnerError { reason: String, backtrace: Backtrace, }, + + #[snafu(display("Failed to convert Arrow schema, source: {}", source))] + SchemaConversion { + source: datatypes::error::Error, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to poll stream, source: {}", source))] + PollStream { + source: datatypes::arrow::error::ArrowError, + backtrace: Backtrace, + }, } impl ErrorExt for InnerError { fn status_code(&self) -> StatusCode { match self { InnerError::NewDfRecordBatch { .. } => StatusCode::InvalidArguments, - InnerError::DataTypes { .. } | InnerError::CreateRecordBatches { .. } => { - StatusCode::Internal - } + + InnerError::DataTypes { .. } + | InnerError::CreateRecordBatches { .. } + | InnerError::PollStream { .. 
} => StatusCode::Internal, + InnerError::External { source } => source.status_code(), + + InnerError::SchemaConversion { source, .. } => source.status_code(), } } diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 988431e8a0..5b93b7a5df 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -1,9 +1,11 @@ +pub mod adapter; pub mod error; mod recordbatch; pub mod util; use std::pin::Pin; +pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; use datatypes::schema::SchemaRef; use error::Result; use futures::task::{Context, Poll}; @@ -74,6 +76,41 @@ impl RecordBatches { pub fn take(self) -> Vec { self.batches } + + pub fn as_stream(&self) -> SendableRecordBatchStream { + Box::pin(SimpleRecordBatchStream { + inner: RecordBatches { + schema: self.schema(), + batches: self.batches.clone(), + }, + index: 0, + }) + } +} + +pub struct SimpleRecordBatchStream { + inner: RecordBatches, + index: usize, +} + +impl RecordBatchStream for SimpleRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + +impl Stream for SimpleRecordBatchStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(if self.index < self.inner.batches.len() { + let batch = self.inner.batches[self.index].clone(); + self.index += 1; + Some(Ok(batch)) + } else { + None + }) + } } #[cfg(test)] @@ -116,4 +153,27 @@ mod tests { assert_eq!(schema1, batches.schema()); assert_eq!(vec![batch1], batches.take()); } + + #[tokio::test] + async fn test_simple_recordbatch_stream() { + let column_a = ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false); + let column_b = ColumnSchema::new("b", ConcreteDataType::string_datatype(), false); + let schema = Arc::new(Schema::new(vec![column_a, column_b])); + + let va1: VectorRef = Arc::new(Int32Vector::from_slice(&[1, 2])); + let vb1: VectorRef = Arc::new(StringVector::from(vec!["a", "b"])); + let batch1 = RecordBatch::new(schema.clone(), vec![va1, vb1]).unwrap(); + + let va2: VectorRef = Arc::new(Int32Vector::from_slice(&[3, 4, 5])); + let vb2: VectorRef = Arc::new(StringVector::from(vec!["c", "d", "e"])); + let batch2 = RecordBatch::new(schema.clone(), vec![va2, vb2]).unwrap(); + + let recordbatches = + RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); + let stream = recordbatches.as_stream(); + let collected = util::collect(stream).await.unwrap(); + assert_eq!(collected.len(), 2); + assert_eq!(collected[0], batch1); + assert_eq!(collected[1], batch2); + } } diff --git a/src/datanode/src/server/grpc/insert.rs b/src/datanode/src/server/grpc/insert.rs index dec4f91bc2..93d229478b 100644 --- a/src/datanode/src/server/grpc/insert.rs +++ b/src/datanode/src/server/grpc/insert.rs @@ -364,8 +364,8 @@ mod tests { insert_expr, Column, ColumnDataType, }; use common_base::BitVec; + use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; - use common_recordbatch::SendableRecordBatchStream; use common_time::timestamp::Timestamp; use datatypes::{ data_type::ConcreteDataType, @@ -552,7 +552,7 @@ mod tests { _projection: &Option>, _filters: &[Expr], _limit: Option, - ) -> TableResult { + ) -> TableResult { unimplemented!(); } } diff --git a/src/datanode/src/server/grpc/plan.rs b/src/datanode/src/server/grpc/plan.rs index 813d4e809b..cf18d43764 100644 --- a/src/datanode/src/server/grpc/plan.rs +++ b/src/datanode/src/server/grpc/plan.rs @@ -2,17 
+2,16 @@ use std::sync::Arc; use common_grpc::AsExcutionPlan; use common_grpc::DefaultAsPlanImpl; +use common_query::physical_plan::PhysicalPlanAdapter; +use common_query::physical_plan::PhysicalPlanRef; use common_query::Output; use datatypes::schema::Schema; -use query::PhysicalPlanAdapter; -use query::{plan::PhysicalPlan, QueryEngineRef}; +use query::QueryEngineRef; use snafu::ResultExt; use crate::error::Result; use crate::error::{ConvertSchemaSnafu, ExecutePhysicalPlanSnafu, IntoPhysicalPlanSnafu}; -pub type PhysicalPlanRef = Arc; - pub struct PhysicalPlanner { query_engine: QueryEngineRef, } diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 0be1dade92..f9f1b3009d 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -92,7 +92,7 @@ mod tests { use catalog::SchemaProvider; use common_query::logical_plan::Expr; - use common_recordbatch::SendableRecordBatchStream; + use common_query::physical_plan::PhysicalPlanRef; use common_time::timestamp::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder, SchemaRef}; @@ -146,7 +146,7 @@ mod tests { _projection: &Option>, _filters: &[Expr], _limit: Option, - ) -> TableResult { + ) -> TableResult { unimplemented!(); } } diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml index 415834f871..2bb57d0111 100644 --- a/src/meta-client/Cargo.toml +++ b/src/meta-client/Cargo.toml @@ -2,7 +2,6 @@ name = "meta-client" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] api = { path = "../api" } diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index dfdad5c761..f5cf35aa81 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -2,7 +2,6 @@ name = "meta-srv" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] api = { path = "../api" } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index d89b176ebc..074ed76efb 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -2,7 +2,6 @@ mod catalog_adapter; mod error; -pub mod plan_adapter; mod planner; use std::sync::Arc; @@ -11,9 +10,14 @@ use catalog::CatalogListRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::udf::create_udf; use common_function::scalars::FunctionRef; +use common_query::physical_plan::PhysicalPlanAdapter; +use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlan}; use common_query::{prelude::ScalarUdf, Output}; +use common_recordbatch::adapter::RecordBatchStreamAdapter; use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::timer; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::ExecutionPlan; use snafu::{OptionExt, ResultExt}; use sql::statements::statement::Statement; use sql::{dialect::GenericDialect, parser::ParserContext}; @@ -22,14 +26,13 @@ pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; use crate::metric; use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ - datafusion::plan_adapter::PhysicalPlanAdapter, datafusion::planner::{DfContextProviderAdapter, DfPlanner}, error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, physical_planner::PhysicalPlanner, - 
plan::{LogicalPlan, PhysicalPlan}, + plan::LogicalPlan, planner::Planner, QueryEngine, }; @@ -179,8 +182,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine { .as_any() .downcast_ref::() .context(error::PhysicalPlanDowncastSnafu)? - .df_plan() - .clone(); + .df_plan(); for optimizer in optimizers { new_plan = optimizer @@ -203,9 +205,24 @@ impl QueryExecutor for DatafusionQueryEngine { let _timer = timer!(metric::METRIC_EXEC_PLAN_ELAPSED); match plan.output_partitioning().partition_count() { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), - 1 => Ok(plan.execute(&ctx.state().runtime(), 0).await?), + 1 => Ok(plan + .execute(0, ctx.state().runtime()) + .await + .context(error::ExecutePhysicalPlanSnafu)?), _ => { - unimplemented!(); + // merge into a single partition + let plan = + CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone()))); + // CoalescePartitionsExec must produce a single partition + assert_eq!(1, plan.output_partitioning().partition_count()); + let df_stream = plan.execute(0, ctx.state().runtime()).await.context( + error::DatafusionSnafu { + msg: "Failed to execute DataFusion merge exec", + }, + )?; + let stream = RecordBatchStreamAdapter::try_new(df_stream) + .context(error::ConvertDfRecordBatchStreamSnafu)?; + Ok(Box::pin(stream)) } } } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index fe6b1127a0..fe704c9aaf 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -12,7 +12,6 @@ use datafusion::catalog::{ }; use datafusion::datasource::TableProvider as DfTableProvider; use datafusion::error::Result as DataFusionResult; -use datafusion::execution::runtime_env::RuntimeEnv; use snafu::ResultExt; use table::{ table::adapter::{DfTableProviderAdapter, TableAdapter}, @@ -22,16 +21,12 @@ use table::{ use crate::datafusion::error; pub struct DfCatalogListAdapter { - runtime: Arc, catalog_list: CatalogListRef, } impl DfCatalogListAdapter { - pub fn new(runtime: Arc, catalog_list: CatalogListRef) -> DfCatalogListAdapter { - DfCatalogListAdapter { - runtime, - catalog_list, - } + pub fn new(catalog_list: CatalogListRef) -> DfCatalogListAdapter { + DfCatalogListAdapter { catalog_list } } } @@ -47,16 +42,10 @@ impl DfCatalogList for DfCatalogListAdapter { ) -> Option> { let catalog_adapter = Arc::new(CatalogProviderAdapter { df_catalog_provider: catalog, - runtime: self.runtime.clone(), }); self.catalog_list .register_catalog(name, catalog_adapter) - .map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) + .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) } fn catalog_names(&self) -> Vec { @@ -64,19 +53,15 @@ impl DfCatalogList for DfCatalogListAdapter { } fn catalog(&self, name: &str) -> Option> { - self.catalog_list.catalog(name).map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) + self.catalog_list + .catalog(name) + .map(|catalog_provider| Arc::new(DfCatalogProviderAdapter { catalog_provider }) as _) } } /// Datafusion's CatalogProvider -> greptime CatalogProvider struct CatalogProviderAdapter { df_catalog_provider: Arc, - runtime: Arc, } impl CatalogProvider for CatalogProviderAdapter { @@ -99,19 +84,13 @@ impl CatalogProvider for CatalogProviderAdapter { fn schema(&self, name: &str) -> Option> { self.df_catalog_provider .schema(name) - 
.map(|df_schema_provider| { - Arc::new(SchemaProviderAdapter { - df_schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) + .map(|df_schema_provider| Arc::new(SchemaProviderAdapter { df_schema_provider }) as _) } } ///Greptime CatalogProvider -> datafusion's CatalogProvider struct DfCatalogProviderAdapter { catalog_provider: CatalogProviderRef, - runtime: Arc, } impl DfCatalogProvider for DfCatalogProviderAdapter { @@ -124,19 +103,15 @@ impl DfCatalogProvider for DfCatalogProviderAdapter { } fn schema(&self, name: &str) -> Option> { - self.catalog_provider.schema(name).map(|schema_provider| { - Arc::new(DfSchemaProviderAdapter { - schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) + self.catalog_provider + .schema(name) + .map(|schema_provider| Arc::new(DfSchemaProviderAdapter { schema_provider }) as _) } } /// Greptime SchemaProvider -> datafusion SchemaProvider struct DfSchemaProviderAdapter { schema_provider: Arc, - runtime: Arc, } impl DfSchemaProvider for DfSchemaProviderAdapter { @@ -159,7 +134,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { name: String, table: Arc, ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table, self.runtime.clone())?); + let table = Arc::new(TableAdapter::new(table)?); match self.schema_provider.register_table(name, table)? { Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), None => Ok(None), @@ -181,7 +156,6 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { /// Datafusion SchemaProviderAdapter -> greptime SchemaProviderAdapter struct SchemaProviderAdapter { df_schema_provider: Arc, - runtime: Arc, } impl SchemaProvider for SchemaProviderAdapter { @@ -203,8 +177,8 @@ impl SchemaProvider for SchemaProviderAdapter { Some(adapter) => adapter.table(), None => { // TODO(yingwen): Avoid panic here. - let adapter = TableAdapter::new(table_provider, self.runtime.clone()) - .expect("convert datafusion table"); + let adapter = + TableAdapter::new(table_provider).expect("convert datafusion table"); Arc::new(adapter) as _ } } @@ -233,8 +207,7 @@ impl SchemaProvider for SchemaProviderAdapter { msg: "Fail to deregister table from datafusion", })? 
.map(|table| { - let adapter = TableAdapter::new(table, self.runtime.clone()) - .context(error::TableSchemaMismatchSnafu)?; + let adapter = TableAdapter::new(table).context(error::TableSchemaMismatchSnafu)?; Ok(Arc::new(adapter) as _) }) .transpose() @@ -259,7 +232,6 @@ mod tests { df_catalog_provider: Arc::new( datafusion::catalog::catalog::MemoryCatalogProvider::new(), ), - runtime: Arc::new(RuntimeEnv::default()), }; adapter.register_schema( @@ -271,7 +243,6 @@ mod tests { #[test] pub fn test_register_table() { let adapter = DfSchemaProviderAdapter { - runtime: Arc::new(RuntimeEnv::default()), schema_provider: Arc::new(MemorySchemaProvider::new()), }; @@ -288,9 +259,7 @@ mod tests { #[test] pub fn test_register_catalog() { - let rt = Arc::new(RuntimeEnv::default()); let catalog_list = DfCatalogListAdapter { - runtime: rt.clone(), catalog_list: new_memory_catalog_list().unwrap(), }; assert!(catalog_list @@ -298,7 +267,6 @@ mod tests { "test_catalog".to_string(), Arc::new(DfCatalogProviderAdapter { catalog_provider: Arc::new(MemoryCatalogProvider::new()), - runtime: rt, }), ) .is_none()); diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 846ec46675..c631eb36ec 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -44,6 +44,21 @@ pub enum InnerError { #[snafu(backtrace)] source: table::error::Error, }, + + #[snafu(display( + "Failed to convert DataFusion's recordbatch stream, source: {}", + source + ))] + ConvertDfRecordBatchStream { + #[snafu(backtrace)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to execute physical plan, source: {}", source))] + ExecutePhysicalPlan { + #[snafu(backtrace)] + source: common_query::error::Error, + }, } impl ErrorExt for InnerError { @@ -59,6 +74,8 @@ impl ErrorExt for InnerError { } ParseSql { source, .. } => source.status_code(), PlanSql { .. 
} => StatusCode::PlanQuery, + ConvertDfRecordBatchStream { source } => source.status_code(), + ExecutePhysicalPlan { source } => source.status_code(), } } diff --git a/src/query/src/datafusion/plan_adapter.rs b/src/query/src/datafusion/plan_adapter.rs deleted file mode 100644 index d71f2b9f6f..0000000000 --- a/src/query/src/datafusion/plan_adapter.rs +++ /dev/null @@ -1,215 +0,0 @@ -use std::any::Any; -use std::fmt::Debug; -use std::sync::Arc; - -use common_recordbatch::SendableRecordBatchStream; -use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; -use datafusion::execution::runtime_env::RuntimeEnv; -use datafusion::{ - error::Result as DfResult, - physical_plan::{ - expressions::PhysicalSortExpr, ExecutionPlan, Partitioning as DfPartitioning, - SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, - }, -}; -use datatypes::schema::SchemaRef; -use snafu::ResultExt; -use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter}; - -use crate::datafusion::error; -use crate::error::Result; -use crate::executor::Runtime; -use crate::plan::{Partitioning, PhysicalPlan}; - -/// Datafusion ExecutionPlan -> greptime PhysicalPlan -#[derive(Debug)] -pub struct PhysicalPlanAdapter { - plan: Arc, - schema: SchemaRef, -} - -impl PhysicalPlanAdapter { - pub fn new(schema: SchemaRef, plan: Arc) -> Self { - Self { schema, plan } - } - - #[inline] - pub fn df_plan(&self) -> &Arc { - &self.plan - } -} - -#[async_trait::async_trait] -impl PhysicalPlan for PhysicalPlanAdapter { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn output_partitioning(&self) -> Partitioning { - //FIXME(dennis) - Partitioning::UnknownPartitioning(1) - } - - fn children(&self) -> Vec> { - let mut plans: Vec> = vec![]; - for p in self.plan.children() { - let plan = PhysicalPlanAdapter::new(self.schema.clone(), p); - plans.push(Arc::new(plan)); - } - plans - } - - fn with_new_children( - &self, - children: Vec>, - ) -> Result> { - let mut df_children: Vec> = Vec::with_capacity(children.len()); - - for plan in children { - let p = Arc::new(ExecutionPlanAdapter { - plan, - schema: self.schema.clone(), - }); - df_children.push(p); - } - - let plan = self - .plan - .with_new_children(df_children) - .context(error::DatafusionSnafu { - msg: "Fail to add children to plan", - })?; - Ok(Arc::new(PhysicalPlanAdapter::new( - self.schema.clone(), - plan, - ))) - } - - async fn execute( - &self, - runtime: &Runtime, - partition: usize, - ) -> Result { - let df_stream = - self.plan - .execute(partition, runtime.into()) - .await - .context(error::DatafusionSnafu { - msg: "Fail to execute physical plan", - })?; - - Ok(Box::pin( - RecordBatchStreamAdapter::try_new(df_stream) - .context(error::TableSchemaMismatchSnafu)?, - )) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -/// Greptime PhysicalPlan -> datafusion ExecutionPlan. 
-#[derive(Debug)] -struct ExecutionPlanAdapter { - plan: Arc, - schema: SchemaRef, -} - -#[async_trait::async_trait] -impl ExecutionPlan for ExecutionPlanAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> DfSchemaRef { - self.schema.arrow_schema().clone() - } - - fn output_partitioning(&self) -> DfPartitioning { - // FIXME(dennis) - DfPartitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - // FIXME(dennis) - None - } - - fn children(&self) -> Vec> { - // TODO(dennis) - vec![] - } - - fn with_new_children( - &self, - children: Vec>, - ) -> DfResult> { - let mut gt_children: Vec> = Vec::with_capacity(children.len()); - - for plan in children { - let p = Arc::new(PhysicalPlanAdapter::new(self.schema.clone(), plan)); - gt_children.push(p); - } - - match self.plan.with_new_children(gt_children) { - Ok(plan) => Ok(Arc::new(ExecutionPlanAdapter { - schema: self.schema.clone(), - plan, - })), - Err(e) => Err(e.into()), - } - } - - async fn execute( - &self, - partition: usize, - runtime: Arc, - ) -> DfResult { - match self.plan.execute(&runtime.into(), partition).await { - Ok(stream) => Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))), - Err(e) => Err(e.into()), - } - } - - fn statistics(&self) -> Statistics { - //TODO(dennis) - Statistics::default() - } -} - -#[cfg(test)] -mod tests { - use arrow::datatypes::Field; - use datafusion::physical_plan::empty::EmptyExec; - use datafusion_common::field_util::SchemaExt; - use datatypes::schema::Schema; - - use super::*; - - #[test] - fn test_physical_plan_adapter() { - let arrow_schema = arrow::datatypes::Schema::new(vec![Field::new( - "name", - arrow::datatypes::DataType::Utf8, - true, - )]); - - let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); - let physical_plan = PhysicalPlanAdapter::new( - schema.clone(), - Arc::new(EmptyExec::new(true, Arc::new(arrow_schema))), - ); - - assert!(physical_plan - .plan - .as_any() - .downcast_ref::() - .is_some()); - let execution_plan_adapter = ExecutionPlanAdapter { - plan: Arc::new(physical_plan), - schema: schema.clone(), - }; - assert_eq!(schema, execution_plan_adapter.schema); - } -} diff --git a/src/query/src/executor.rs b/src/query/src/executor.rs index dabb713942..bc17ddd126 100644 --- a/src/query/src/executor.rs +++ b/src/query/src/executor.rs @@ -1,9 +1,9 @@ use std::sync::Arc; +use common_query::physical_plan::PhysicalPlan; use common_recordbatch::SendableRecordBatchStream; -use datafusion::execution::runtime_env::RuntimeEnv; -use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext}; +use crate::{error::Result, query_engine::QueryContext}; /// Executor to run [ExecutionPlan]. 
#[async_trait::async_trait] @@ -14,27 +14,3 @@ pub trait QueryExecutor { plan: &Arc, ) -> Result; } - -/// Execution runtime environment -#[derive(Clone, Default)] -pub struct Runtime { - runtime: Arc, -} - -impl From> for Runtime { - fn from(runtime: Arc) -> Self { - Runtime { runtime } - } -} - -impl From for Arc { - fn from(r: Runtime) -> Arc { - r.runtime - } -} - -impl From<&Runtime> for Arc { - fn from(r: &Runtime) -> Arc { - r.runtime.clone() - } -} diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index fe487506dc..3d8b4c6cd1 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -12,5 +12,4 @@ pub mod plan; pub mod planner; pub mod query_engine; -pub use crate::datafusion::plan_adapter::PhysicalPlanAdapter; pub use crate::query_engine::{QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef}; diff --git a/src/query/src/physical_optimizer.rs b/src/query/src/physical_optimizer.rs index bf7813a988..ad449043fc 100644 --- a/src/query/src/physical_optimizer.rs +++ b/src/query/src/physical_optimizer.rs @@ -1,6 +1,8 @@ use std::sync::Arc; -use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext}; +use common_query::physical_plan::PhysicalPlan; + +use crate::{error::Result, query_engine::QueryContext}; pub trait PhysicalOptimizer { fn optimize_physical_plan( diff --git a/src/query/src/physical_planner.rs b/src/query/src/physical_planner.rs index 967038afd0..6c8ab6c7b3 100644 --- a/src/query/src/physical_planner.rs +++ b/src/query/src/physical_planner.rs @@ -1,7 +1,9 @@ use std::sync::Arc; +use common_query::physical_plan::PhysicalPlan; + use crate::error::Result; -use crate::plan::{LogicalPlan, PhysicalPlan}; +use crate::plan::LogicalPlan; use crate::query_engine::QueryContext; /// Physical query planner that converts a `LogicalPlan` to an diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index d26c0cfa73..c881bdcb5a 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -1,13 +1,6 @@ -use std::any::Any; use std::fmt::Debug; -use std::sync::Arc; -use common_recordbatch::SendableRecordBatchStream; use datafusion::logical_plan::LogicalPlan as DfLogicalPlan; -use datatypes::schema::SchemaRef; - -use crate::error::Result; -use crate::executor::Runtime; /// A LogicalPlan represents the different types of relational /// operators (such as Projection, Filter, etc) and can be created by @@ -21,50 +14,3 @@ use crate::executor::Runtime; pub enum LogicalPlan { DfPlan(DfLogicalPlan), } - -/// Partitioning schemes supported by operators. -#[derive(Debug, Clone)] -pub enum Partitioning { - /// Unknown partitioning scheme with a known number of partitions - UnknownPartitioning(usize), -} - -impl Partitioning { - /// Returns the number of partitions in this partitioning scheme - pub fn partition_count(&self) -> usize { - use Partitioning::*; - match self { - UnknownPartitioning(n) => *n, - } - } -} - -#[async_trait::async_trait] -pub trait PhysicalPlan: Send + Sync + Any + Debug { - /// Get the schema for this execution plan - fn schema(&self) -> SchemaRef; - - /// Specifies the output partitioning scheme of this plan - fn output_partitioning(&self) -> Partitioning; - - /// Get a list of child execution plans that provide the input for this plan. The returned list - /// will be empty for leaf nodes, will contain a single value for unary nodes, or two - /// values for binary nodes (such as joins). - fn children(&self) -> Vec>; - - /// Returns a new plan where all children were replaced by new plans. 
- /// The size of `children` must be equal to the size of `ExecutionPlan::children()`. - fn with_new_children( - &self, - children: Vec>, - ) -> Result>; - - /// creates an iterator - async fn execute( - &self, - _runtime: &Runtime, - partition: usize, - ) -> Result; - - fn as_any(&self) -> &dyn Any; -} diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 2bd9711dc9..621a9a2ecc 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -6,13 +6,14 @@ use std::sync::Arc; use catalog::CatalogList; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; +use common_query::physical_plan::PhysicalPlan; use common_query::prelude::ScalarUdf; use common_query::Output; use sql::statements::statement::Statement; use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; -use crate::plan::{LogicalPlan, PhysicalPlan}; +use crate::plan::LogicalPlan; pub use crate::query_engine::context::QueryContext; pub use crate::query_engine::state::QueryEngineState; diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 73dfcbe0e7..2902177f99 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -4,6 +4,7 @@ use std::sync::{Arc, RwLock}; use catalog::CatalogListRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; +use common_query::physical_plan::RuntimeEnv; use common_query::prelude::ScalarUdf; use datafusion::optimizer::common_subexpr_eliminate::CommonSubexprEliminate; use datafusion::optimizer::eliminate_limit::EliminateLimit; @@ -15,7 +16,6 @@ use datafusion::optimizer::to_approx_perc::ToApproxPerc; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; use crate::datafusion::DfCatalogListAdapter; -use crate::executor::Runtime; use crate::optimizer::TypeConversionRule; /// Query engine global state @@ -58,10 +58,8 @@ impl QueryEngineState { let df_context = ExecutionContext::with_config(config); - df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new( - df_context.runtime_env(), - catalog_list.clone(), - )); + df_context.state.lock().catalog_list = + Arc::new(DfCatalogListAdapter::new(catalog_list.clone())); Self { df_context, @@ -108,7 +106,7 @@ impl QueryEngineState { } #[inline] - pub(crate) fn runtime(&self) -> Runtime { - self.df_context.runtime_env().into() + pub(crate) fn runtime(&self) -> Arc { + self.df_context.runtime_env() } } diff --git a/src/table-engine/Cargo.toml b/src/table-engine/Cargo.toml index 635a1151bc..401bfe6917 100644 --- a/src/table-engine/Cargo.toml +++ b/src/table-engine/Cargo.toml @@ -17,6 +17,7 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" } datatypes = { path = "../datatypes" } futures = "0.3" diff --git a/src/table-engine/src/engine.rs b/src/table-engine/src/engine.rs index d6ab139463..5185ee3eb8 100644 --- a/src/table-engine/src/engine.rs +++ b/src/table-engine/src/engine.rs @@ -426,6 +426,7 @@ impl MitoEngineInner { #[cfg(test)] mod tests { + use common_query::physical_plan::RuntimeEnv; use common_recordbatch::util; use datafusion_common::field_util::FieldExt; use 
datafusion_common::field_util::SchemaExt; @@ -520,6 +521,10 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -555,6 +560,10 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); @@ -612,6 +621,10 @@ mod tests { assert_eq!(2, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); assert_eq!(batches[0].df_recordbatch.num_columns(), 4); @@ -633,6 +646,10 @@ mod tests { // Scan with projections: cpu and memory let stream = table.scan(&Some(vec![1, 2]), &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); assert_eq!(batches[0].df_recordbatch.num_columns(), 2); @@ -650,6 +667,10 @@ mod tests { // Scan with projections: only ts let stream = table.scan(&Some(vec![3]), &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); assert_eq!(1, batches.len()); assert_eq!(batches[0].df_recordbatch.num_columns(), 1); @@ -692,6 +713,10 @@ mod tests { assert_eq!(test_batch_size, table.insert(insert_req).await.unwrap()); let stream = table.scan(&None, &[], None).await.unwrap(); + let stream = stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let batches = util::collect(stream).await.unwrap(); let mut total = 0; for batch in batches { diff --git a/src/table-engine/src/table.rs b/src/table-engine/src/table.rs index 3a20f82001..04d1471bbe 100644 --- a/src/table-engine/src/table.rs +++ b/src/table-engine/src/table.rs @@ -8,8 +8,9 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_trait::async_trait; use common_query::logical_plan::Expr; +use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult}; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use common_telemetry::logging; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use datatypes::vectors::VectorRef; @@ -25,6 +26,7 @@ use store_api::storage::{ use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult}; use table::metadata::{FilterPushDownType, TableInfoRef, TableMetaBuilder}; use table::requests::{AddColumnRequest, AlterKind, AlterTableRequest, InsertRequest}; +use table::table::scan::SimpleTableScan; use table::{ metadata::{TableInfo, TableType}, table::Table, @@ -154,7 +156,7 @@ impl Table for MitoTable { projection: &Option>, filters: &[Expr], _limit: Option, - ) -> TableResult { + ) -> TableResult { let read_ctx = ReadContext::default(); let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?; @@ -180,7 +182,8 @@ impl Table for MitoTable { } 
}); - Ok(Box::pin(ChunkStream { schema, stream })) + let stream = Box::pin(ChunkStream { schema, stream }); + Ok(Arc::new(SimpleTableScan::new(stream))) } // Alter table changes the schemas of the table. The altering happens as cloning a new schema, diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 08aab5b2a9..e1dba45c6e 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -28,9 +28,6 @@ pub enum InnerError { #[snafu(display("Missing column when insert, column: {}", name))] MissingColumn { name: String, backtrace: Backtrace }, - #[snafu(display("Not expected to run ExecutionPlan more than once"))] - ExecuteRepeatedly { backtrace: Backtrace }, - #[snafu(display("Poll stream failed, source: {}", source))] PollStream { source: ArrowError, @@ -58,7 +55,6 @@ impl ErrorExt for InnerError { | InnerError::SchemaConversion { .. } | InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery, InnerError::MissingColumn { .. } => StatusCode::InvalidArguments, - InnerError::ExecuteRepeatedly { .. } => StatusCode::Unexpected, } } @@ -97,10 +93,6 @@ mod tests { Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)? } - fn throw_repeatedly() -> Result<()> { - ExecuteRepeatedlySnafu {}.fail()? - } - fn throw_missing_column_inner() -> std::result::Result<(), InnerError> { MissingColumnSnafu { name: "test" }.fail() } @@ -119,10 +111,6 @@ mod tests { assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::EngineExecuteQuery, err.status_code()); - let err = throw_repeatedly().err().unwrap(); - assert!(err.backtrace_opt().is_some()); - assert_eq!(StatusCode::Unexpected, err.status_code()); - let err = throw_missing_column().err().unwrap(); assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::InvalidArguments, err.status_code()); diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 7254df239f..7f3fbe854a 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -1,11 +1,13 @@ pub mod adapter; pub mod numbers; +pub mod scan; use std::any::Any; use std::sync::Arc; +use async_trait::async_trait; use common_query::logical_plan::Expr; -use common_recordbatch::SendableRecordBatchStream; +use common_query::physical_plan::PhysicalPlanRef; use datatypes::schema::SchemaRef; use crate::error::Result; @@ -13,7 +15,7 @@ use crate::metadata::{FilterPushDownType, TableInfoRef, TableType}; use crate::requests::{AlterTableRequest, InsertRequest}; /// Table abstraction. -#[async_trait::async_trait] +#[async_trait] pub trait Table: Send + Sync { /// Returns the table as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. @@ -45,7 +47,7 @@ pub trait Table: Send + Sync { // If set, it contains the amount of rows needed by the `LogicalPlan`, // The datasource should return *at least* this number of rows if available. limit: Option, - ) -> Result; + ) -> Result; /// Tests whether the table provider can make use of a filter expression /// to optimise data retrieval. 
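Reviewer note (not part of the patch): with the `Table::scan` signature change above, call sites no longer receive a stream directly; they get a `PhysicalPlanRef` and must execute one of its partitions to obtain the record batch stream. Below is a minimal sketch of the new call-site pattern, mirroring the updated tests in `src/table-engine/src/engine.rs` and `src/catalog/src/tables.rs`; the `scan_all` helper and its unwrap-based error handling are illustrative assumptions only.

```rust
use std::sync::Arc;

use common_query::physical_plan::RuntimeEnv;
use common_recordbatch::util;
use table::TableRef;

// Illustrative helper: scan a table and collect all record batches from
// partition 0 of the physical plan that `scan` now returns.
async fn scan_all(table: TableRef) {
    // `scan` yields a PhysicalPlanRef instead of a SendableRecordBatchStream.
    let plan = table.scan(&None, &[], None).await.unwrap();
    // Executing one partition produces the stream callers previously got
    // straight from `scan`.
    let stream = plan
        .execute(0, Arc::new(RuntimeEnv::default()))
        .await
        .unwrap();
    let batches = util::collect(stream).await.unwrap();
    assert!(!batches.is_empty());
}
```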
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index 42481fcfc0..59a697d1d6 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -1,14 +1,9 @@ -use core::fmt::Formatter; -use core::pin::Pin; -use core::task::{Context, Poll}; use std::any::Any; -use std::fmt::Debug; -use std::mem; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use common_query::logical_plan::Expr; -use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_query::physical_plan::{DfPhysicalPlanAdapter, PhysicalPlanAdapter, PhysicalPlanRef}; +use common_query::DfPhysicalPlan; use common_telemetry::debug; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; /// Datafusion table adpaters @@ -17,92 +12,14 @@ use datafusion::datasource::{ TableType as DfTableType, }; use datafusion::error::Result as DfResult; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr as DfExpr; -use datafusion::physical_plan::{ - expressions::PhysicalSortExpr, ExecutionPlan, Partitioning, - RecordBatchStream as DfRecordBatchStream, - SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics, -}; -use datafusion_common::record_batch::RecordBatch as DfRecordBatch; -use datatypes::arrow::error::{ArrowError, Result as ArrowResult}; -use datatypes::schema::SchemaRef; -use datatypes::schema::{Schema, SchemaRef as TableSchemaRef}; -use futures::Stream; +use datatypes::schema::{SchemaRef as TableSchemaRef, SchemaRef}; use snafu::prelude::*; use crate::error::{self, Result}; use crate::metadata::TableInfoRef; use crate::table::{FilterPushDownType, Table, TableRef, TableType}; -/// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan. -struct ExecutionPlanAdapter { - stream: Mutex>, - schema: SchemaRef, -} - -impl Debug for ExecutionPlanAdapter { - fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { - f.debug_struct("ExecutionPlanAdapter") - .field("schema", &self.schema) - .finish() - } -} - -#[async_trait::async_trait] -impl ExecutionPlan for ExecutionPlanAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> DfSchemaRef { - self.schema.arrow_schema().clone() - } - - fn output_partitioning(&self) -> Partitioning { - // FIXME(dennis) - Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - // FIXME(dennis) - None - } - - fn children(&self) -> Vec> { - // TODO(dennis) - vec![] - } - - fn with_new_children( - &self, - _children: Vec>, - ) -> DfResult> { - // TODO(dennis) - todo!(); - } - - async fn execute( - &self, - _partition: usize, - _runtime: Arc, - ) -> DfResult { - let mut stream = self.stream.lock().unwrap(); - - if stream.is_some() { - let stream = mem::replace(&mut *stream, None); - Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap()))) - } else { - error::ExecuteRepeatedlySnafu.fail()? 
- } - } - - fn statistics(&self) -> Statistics { - //TODO(dennis) - Statistics::default() - } -} - /// Greptime Table -> datafusion TableProvider pub struct DfTableProviderAdapter { table: TableRef, @@ -141,14 +58,10 @@ impl TableProvider for DfTableProviderAdapter { projection: &Option>, filters: &[DfExpr], limit: Option, - ) -> DfResult> { + ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Into::into).collect(); - - let stream = self.table.scan(projection, &filters, limit).await?; - Ok(Arc::new(ExecutionPlanAdapter { - schema: stream.schema(), - stream: Mutex::new(Some(stream)), - })) + let inner = self.table.scan(projection, &filters, limit).await?; + Ok(Arc::new(DfPhysicalPlanAdapter(inner))) } fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult { @@ -167,15 +80,18 @@ impl TableProvider for DfTableProviderAdapter { pub struct TableAdapter { schema: TableSchemaRef, table_provider: Arc, - runtime: Arc, } impl TableAdapter { - pub fn new(table_provider: Arc, runtime: Arc) -> Result { + pub fn new(table_provider: Arc) -> Result { Ok(Self { - schema: Arc::new(table_provider.schema().try_into().unwrap()), + schema: Arc::new( + table_provider + .schema() + .try_into() + .context(error::SchemaConversionSnafu)?, + ), table_provider, - runtime, }) } } @@ -207,7 +123,7 @@ impl Table for TableAdapter { projection: &Option>, filters: &[Expr], limit: Option, - ) -> Result { + ) -> Result { let filters: Vec = filters.iter().map(|e| e.df_expr().clone()).collect(); debug!("TableScan filter size: {}", filters.len()); let execution_plan = self @@ -215,14 +131,13 @@ impl Table for TableAdapter { .scan(projection, &filters, limit) .await .context(error::DatafusionSnafu)?; - - // FIXME(dennis) Partitioning - let df_stream = execution_plan - .execute(0, self.runtime.clone()) - .await - .context(error::DatafusionSnafu)?; - - Ok(Box::pin(RecordBatchStreamAdapter::try_new(df_stream)?)) + let schema: SchemaRef = Arc::new( + execution_plan + .schema() + .try_into() + .context(error::SchemaConversionSnafu)?, + ); + Ok(Arc::new(PhysicalPlanAdapter::new(schema, execution_plan))) } fn supports_filter_pushdown(&self, filter: &Expr) -> Result { @@ -238,83 +153,6 @@ impl Table for TableAdapter { } } -/// Greptime SendableRecordBatchStream -> datafusion RecordBatchStream -pub struct DfRecordBatchStreamAdapter { - stream: SendableRecordBatchStream, -} - -impl DfRecordBatchStreamAdapter { - pub fn new(stream: SendableRecordBatchStream) -> Self { - Self { stream } - } -} - -impl DfRecordBatchStream for DfRecordBatchStreamAdapter { - fn schema(&self) -> DfSchemaRef { - self.stream.schema().arrow_schema().clone() - } -} - -impl Stream for DfRecordBatchStreamAdapter { - type Item = ArrowResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(recordbatch)) => match recordbatch { - Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))), - Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))), - }, - Poll::Ready(None) => Poll::Ready(None), - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - -/// Datafusion SendableRecordBatchStream to greptime RecordBatchStream -pub struct RecordBatchStreamAdapter { - schema: SchemaRef, - stream: DfSendableRecordBatchStream, -} - -impl RecordBatchStreamAdapter { - pub fn try_new(stream: DfSendableRecordBatchStream) -> Result { - let schema = 
- Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?); - Ok(Self { schema, stream }) - } -} - -impl RecordBatchStream for RecordBatchStreamAdapter { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -impl Stream for RecordBatchStreamAdapter { - type Item = RecordBatchResult; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Some(df_recordbatch)) => Poll::Ready(Some(Ok(RecordBatch { - schema: self.schema(), - df_recordbatch: df_recordbatch.context(error::PollStreamSnafu)?, - }))), - Poll::Ready(None) => Poll::Ready(None), - } - } - - #[inline] - fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() - } -} - #[cfg(test)] mod tests { use datafusion::arrow; @@ -328,14 +166,14 @@ mod tests { #[should_panic] fn test_table_adaptor_info() { let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap(); + let table_adapter = TableAdapter::new(df_table).unwrap(); let _ = table_adapter.table_info(); } #[test] fn test_table_adaptor_type() { let df_table = Arc::new(EmptyTable::new(Arc::new(arrow::datatypes::Schema::empty()))); - let table_adapter = TableAdapter::new(df_table, Arc::new(RuntimeEnv::default())).unwrap(); + let table_adapter = TableAdapter::new(df_table).unwrap(); assert_eq!(Base, table_adapter.table_type()); } } diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs index 92b019b6de..119ad78b9c 100644 --- a/src/table/src/table/numbers.rs +++ b/src/table/src/table/numbers.rs @@ -2,8 +2,9 @@ use std::any::Any; use std::pin::Pin; use std::sync::Arc; +use common_query::physical_plan::PhysicalPlanRef; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use datafusion_common::record_batch::RecordBatch as DfRecordBatch; use datatypes::arrow::array::UInt32Array; use datatypes::data_type::ConcreteDataType; @@ -13,6 +14,7 @@ use futures::Stream; use crate::error::Result; use crate::metadata::TableInfoRef; +use crate::table::scan::SimpleTableScan; use crate::table::{Expr, Table}; /// numbers table for test @@ -53,12 +55,13 @@ impl Table for NumbersTable { _projection: &Option>, _filters: &[Expr], limit: Option, - ) -> Result { - Ok(Box::pin(NumbersStream { + ) -> Result { + let stream = Box::pin(NumbersStream { limit: limit.unwrap_or(100) as u32, schema: self.schema.clone(), already_run: false, - })) + }); + Ok(Arc::new(SimpleTableScan::new(stream))) } } diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs new file mode 100644 index 0000000000..372e35c677 --- /dev/null +++ b/src/table/src/table/scan.rs @@ -0,0 +1,123 @@ +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use common_query::error as query_error; +use common_query::error::Result as QueryResult; +use common_query::physical_plan::Partitioning; +use common_query::physical_plan::RuntimeEnv; +use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; +use common_recordbatch::SendableRecordBatchStream; +use datatypes::schema::SchemaRef; +use snafu::OptionExt; + +pub struct SimpleTableScan { + stream: Mutex>, + schema: SchemaRef, +} + +impl Debug for SimpleTableScan { + fn fmt(&self, 
f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimpleTableScan") + .field("stream", &"") + .field("schema", &self.schema) + .finish() + } +} + +impl SimpleTableScan { + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema(); + Self { + stream: Mutex::new(Some(stream)), + schema, + } + } +} + +#[async_trait] +impl PhysicalPlan for SimpleTableScan { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn children(&self) -> Vec { + vec![] + } + + fn with_new_children(&self, _children: Vec) -> QueryResult { + unimplemented!() + } + + async fn execute( + &self, + _partition: usize, + _runtime: Arc, + ) -> QueryResult { + let mut stream = self.stream.lock().unwrap(); + Ok(stream.take().context(query_error::ExecuteRepeatedlySnafu)?) + } +} + +#[cfg(test)] +mod test { + use common_recordbatch::util; + use common_recordbatch::{RecordBatch, RecordBatches}; + use datatypes::data_type::ConcreteDataType; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + + use super::*; + + #[tokio::test] + async fn test_simple_table_scan() { + let schema = Arc::new(Schema::new(vec![ColumnSchema::new( + "a", + ConcreteDataType::int32_datatype(), + false, + )])); + + let batch1 = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice(&[1, 2])) as _], + ) + .unwrap(); + let batch2 = RecordBatch::new( + schema.clone(), + vec![Arc::new(Int32Vector::from_slice(&[3, 4, 5])) as _], + ) + .unwrap(); + + let recordbatches = + RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); + let stream = recordbatches.as_stream(); + + let scan = SimpleTableScan::new(stream); + + assert_eq!(scan.schema(), schema); + + let runtime = Arc::new(RuntimeEnv::default()); + let stream = scan.execute(0, runtime.clone()).await.unwrap(); + let recordbatches = util::collect(stream).await.unwrap(); + assert_eq!(recordbatches[0], batch1); + assert_eq!(recordbatches[1], batch2); + + let result = scan.execute(0, runtime).await; + assert!(result.is_err()); + match result { + Err(e) => assert!(e + .to_string() + .contains("Not expected to run ExecutionPlan more than once")), + _ => unreachable!(), + } + } +} diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index ebe2e418c7..7ef87eda36 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -1,11 +1,13 @@ use std::sync::Arc; use async_trait::async_trait; -use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; +use common_query::physical_plan::PhysicalPlanRef; +use common_recordbatch::EmptyRecordBatchStream; use crate::metadata::TableInfoBuilder; use crate::metadata::TableInfoRef; use crate::requests::InsertRequest; +use crate::table::scan::SimpleTableScan; use crate::Result; use crate::{ metadata::{TableMetaBuilder, TableType}, @@ -64,7 +66,8 @@ impl Table for EmptyTable { _projection: &Option>, _filters: &[common_query::prelude::Expr], _limit: Option, - ) -> Result { - Ok(Box::pin(EmptyRecordBatchStream::new(self.schema()))) + ) -> Result { + let scan = SimpleTableScan::new(Box::pin(EmptyRecordBatchStream::new(self.schema()))); + Ok(Arc::new(scan)) } } diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index dd923fc3fc..08a7634d1c 100644 --- 
a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -3,9 +3,10 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use common_query::physical_plan::PhysicalPlanRef; use common_query::prelude::Expr; use common_recordbatch::error::Result as RecordBatchResult; -use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStream}; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::UInt32Vector; @@ -15,6 +16,7 @@ use snafu::prelude::*; use crate::error::{Result, SchemaConversionSnafu, TableProjectionSnafu}; use crate::metadata::TableInfoRef; +use crate::table::scan::SimpleTableScan; use crate::Table; #[derive(Debug, Clone)] @@ -71,7 +73,7 @@ impl Table for MemTable { projection: &Option>, _filters: &[Expr], limit: Option, - ) -> Result { + ) -> Result { let df_recordbatch = if let Some(indices) = projection { self.recordbatch .df_recordbatch @@ -95,10 +97,10 @@ impl Table for MemTable { ), df_recordbatch, }; - Ok(Box::pin(MemtableStream { + Ok(Arc::new(SimpleTableScan::new(Box::pin(MemtableStream { schema: recordbatch.schema.clone(), recordbatch: Some(recordbatch), - })) + })))) } } @@ -126,6 +128,7 @@ impl Stream for MemtableStream { #[cfg(test)] mod test { + use common_query::physical_plan::RuntimeEnv; use common_recordbatch::util; use datatypes::prelude::*; use datatypes::schema::ColumnSchema; @@ -138,6 +141,10 @@ mod test { let table = build_testing_table(); let scan_stream = table.scan(&Some(vec![1]), &[], None).await.unwrap(); + let scan_stream = scan_stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len()); let columns = recordbatch[0].df_recordbatch.columns(); @@ -157,6 +164,10 @@ mod test { let table = build_testing_table(); let scan_stream = table.scan(&None, &[], Some(2)).await.unwrap(); + let scan_stream = scan_stream + .execute(0, Arc::new(RuntimeEnv::default())) + .await + .unwrap(); let recordbatch = util::collect(scan_stream).await.unwrap(); assert_eq!(1, recordbatch.len()); let columns = recordbatch[0].df_recordbatch.columns();
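For the implementer side of the same change, a `Table::scan` implementation keeps building a `SendableRecordBatchStream` as before and wraps it in the new `SimpleTableScan`, as `NumbersTable`, `EmptyTable` and `MemTable` do above. A minimal sketch over an in-memory batch (the helper name `single_batch_scan` is hypothetical; the types and constructors follow the `scan.rs` tests in this patch):

use std::sync::Arc;

use common_query::physical_plan::PhysicalPlanRef;
use common_recordbatch::{RecordBatch, RecordBatches};
use table::table::scan::SimpleTableScan;

// Hypothetical helper: expose one in-memory RecordBatch through the new scan API.
fn single_batch_scan(batch: RecordBatch) -> PhysicalPlanRef {
    let schema = batch.schema.clone();
    // RecordBatches::as_stream yields the SendableRecordBatchStream that SimpleTableScan wraps;
    // executing the resulting plan a second time now fails with ExecuteRepeatedly.
    let stream = RecordBatches::try_new(schema, vec![batch]).unwrap().as_stream();
    Arc::new(SimpleTableScan::new(stream))
}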