diff --git a/Cargo.lock b/Cargo.lock
index fdfaef032c..3f965e195c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1052,14 +1052,17 @@ dependencies = [
 name = "query"
 version = "0.1.0"
 dependencies = [
+ "arrow2",
  "async-trait",
  "common-recordbatch",
  "datafusion",
  "datatypes",
  "futures",
+ "futures-util",
  "snafu",
  "table",
  "tokio",
+ "tokio-stream",
 ]
 
 [[package]]
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 85e75ed4e3..d785487e13 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -5,6 +5,7 @@ use std::pin::Pin;
 
 use datatypes::schema::SchemaRef;
 use error::Result;
+use futures::task::{Context, Poll};
 use futures::Stream;
 pub use recordbatch::RecordBatch;
 
@@ -13,3 +14,31 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
 }
 
 pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
+
+/// EmptyRecordBatchStream can be used to create a RecordBatchStream
+/// that will produce no results
+pub struct EmptyRecordBatchStream {
+    /// Schema wrapped by Arc
+    schema: SchemaRef,
+}
+
+impl EmptyRecordBatchStream {
+    /// Create an empty RecordBatchStream
+    pub fn new(schema: SchemaRef) -> Self {
+        Self { schema }
+    }
+}
+
+impl RecordBatchStream for EmptyRecordBatchStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for EmptyRecordBatchStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        Poll::Ready(None)
+    }
+}
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 3757bbb7a0..31f608e922 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -3,7 +3,10 @@ name = "query"
 version = "0.1.0"
 edition = "2021"
 
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[dependencies.arrow]
+package = "arrow2"
+version = "0.10"
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
 
 [dependencies]
 async-trait = "0.1"
@@ -11,6 +14,11 @@ common-recordbatch = {path = "../common/recordbatch" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
 datatypes = {path = "../datatypes" }
 futures = "0.3"
+futures-util = "0.3.21"
 snafu = "0.7.0"
 table = { path = "../table" }
 tokio = "1.0"
+
+[dev-dependencies]
+tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
+tokio-stream = "0.1"
diff --git a/src/query/src/catalog.rs b/src/query/src/catalog.rs
index 4eaf4b5dbd..b6c802ad07 100644
--- a/src/query/src/catalog.rs
+++ b/src/query/src/catalog.rs
@@ -1,3 +1,4 @@
+pub mod memory;
 pub mod schema;
 
 use std::any::Any;
 use std::sync::Arc;
diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs
new file mode 100644
index 0000000000..cd75224b3b
--- /dev/null
+++ b/src/query/src/catalog/memory.rs
@@ -0,0 +1,84 @@
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::sync::RwLock;
+
+use crate::catalog::schema::SchemaProvider;
+use crate::catalog::{CatalogList, CatalogProvider};
+
+/// Simple in-memory list of catalogs
+#[derive(Default)]
+pub struct MemoryCatalogList {
+    /// Collection of catalogs containing schemas and ultimately TableProviders
+    pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
+}
+
+impl CatalogList for MemoryCatalogList {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        name: String,
+        catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        let mut catalogs = self.catalogs.write().unwrap();
+        catalogs.insert(name, catalog)
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        let catalogs = self.catalogs.read().unwrap();
+        catalogs.keys().map(|s| s.to_string()).collect()
+    }
+
+    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        let catalogs = self.catalogs.read().unwrap();
+        catalogs.get(name).cloned()
+    }
+}
+
+impl Default for MemoryCatalogProvider {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Simple in-memory implementation of a catalog.
+pub struct MemoryCatalogProvider {
+    schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
+}
+
+impl MemoryCatalogProvider {
+    /// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
+    pub fn new() -> Self {
+        Self {
+            schemas: RwLock::new(HashMap::new()),
+        }
+    }
+
+    pub fn register_schema(
+        &self,
+        name: impl Into<String>,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Option<Arc<dyn SchemaProvider>> {
+        let mut schemas = self.schemas.write().unwrap();
+        schemas.insert(name.into(), schema)
+    }
+}
+
+impl CatalogProvider for MemoryCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        let schemas = self.schemas.read().unwrap();
+        schemas.keys().cloned().collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        let schemas = self.schemas.read().unwrap();
+        schemas.get(name).cloned()
+    }
+}
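For context, a minimal sketch of wiring the new in-memory catalog types together; `my_schema` is a placeholder for any `SchemaProvider` implementation, not part of this patch:

```rust
// Illustrative wiring of MemoryCatalogList / MemoryCatalogProvider.
use std::sync::Arc;

use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider};
use query::catalog::schema::SchemaProvider;
use query::catalog::CatalogList;

fn wire_catalog(my_schema: Arc<dyn SchemaProvider>) {
    let catalog = Arc::new(MemoryCatalogProvider::new());
    // register_schema returns the previously registered provider, if any.
    let _ = catalog.register_schema("public", my_schema);

    let catalogs = MemoryCatalogList::default();
    let _ = catalogs.register_catalog("greptime".to_string(), catalog);
    assert_eq!(catalogs.catalog_names(), vec!["greptime".to_string()]);
}
```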
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index 9dd8d7506e..8cda8b87cb 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -1,3 +1,4 @@
+use common_recordbatch::error::Error as RecordBatchError;
 use datafusion::error::DataFusionError;
 use snafu::Snafu;
 
@@ -9,6 +10,8 @@ pub enum Error {
     Datafusion { source: DataFusionError },
     #[snafu(display("PhysicalPlan downcast_ref failed"))]
     PhysicalPlanDowncast,
+    #[snafu(display("RecordBatch error: {}", source))]
+    RecordBatch { source: RecordBatchError },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/query/src/executor.rs b/src/query/src/executor.rs
index af472a419f..dabb713942 100644
--- a/src/query/src/executor.rs
+++ b/src/query/src/executor.rs
@@ -1,9 +1,40 @@
 use std::sync::Arc;
 
+use common_recordbatch::SendableRecordBatchStream;
+use datafusion::execution::runtime_env::RuntimeEnv;
+
 use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
 
 /// Executor to run [ExecutionPlan].
 #[async_trait::async_trait]
 pub trait QueryExecutor {
-    async fn execute_stream(&self, ctx: &QueryContext, plan: &Arc<dyn PhysicalPlan>) -> Result<()>;
+    async fn execute_stream(
+        &self,
+        ctx: &QueryContext,
+        plan: &Arc<dyn PhysicalPlan>,
+    ) -> Result<SendableRecordBatchStream>;
+}
+
+/// Execution runtime environment
+#[derive(Clone, Default)]
+pub struct Runtime {
+    runtime: Arc<RuntimeEnv>,
+}
+
+impl From<Arc<RuntimeEnv>> for Runtime {
+    fn from(runtime: Arc<RuntimeEnv>) -> Self {
+        Runtime { runtime }
+    }
+}
+
+impl From<Runtime> for Arc<RuntimeEnv> {
+    fn from(r: Runtime) -> Arc<RuntimeEnv> {
+        r.runtime
+    }
+}
+
+impl From<&Runtime> for Arc<RuntimeEnv> {
+    fn from(r: &Runtime) -> Arc<RuntimeEnv> {
+        r.runtime.clone()
+    }
 }
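A quick illustration of the `Runtime` newtype's `From` conversions, which let the engine hand one wrapped `Arc<RuntimeEnv>` back and forth across the greptime/DataFusion boundary (illustrative only):

```rust
// Illustrative: round-tripping between Runtime and Arc<RuntimeEnv>.
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use query::executor::Runtime;

fn demo(env: Arc<RuntimeEnv>) {
    // Wrap a DataFusion runtime for the greptime-side API...
    let runtime: Runtime = env.into();
    // ...and convert back (by reference, via a cheap Arc clone) when
    // calling into DataFusion.
    let _env_again: Arc<RuntimeEnv> = (&runtime).into();
}
```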
diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs
index 7f37f417d8..505028de67 100644
--- a/src/query/src/lib.rs
+++ b/src/query/src/lib.rs
@@ -5,5 +5,5 @@ pub mod executor;
 pub mod logical_optimizer;
 pub mod physical_optimizer;
 pub mod physical_planner;
-mod plan;
+pub mod plan;
 pub mod query_engine;
diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs
index c259a6886e..25c3418ec1 100644
--- a/src/query/src/plan.rs
+++ b/src/query/src/plan.rs
@@ -6,6 +6,7 @@ use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;
 use datatypes::schema::SchemaRef;
 
 use crate::error::Result;
+use crate::executor::Runtime;
 
 /// A LogicalPlan represents the different types of relational
 /// operators (such as Projection, Filter, etc) and can be created by
@@ -20,11 +21,31 @@ pub enum LogicalPlan {
     DfPlan(DfLogicalPlan),
 }
 
+/// Partitioning schemes supported by operators.
+#[derive(Debug, Clone)]
+pub enum Partitioning {
+    /// Unknown partitioning scheme with a known number of partitions
+    UnknownPartitioning(usize),
+}
+
+impl Partitioning {
+    /// Returns the number of partitions in this partitioning scheme
+    pub fn partition_count(&self) -> usize {
+        use Partitioning::*;
+        match self {
+            UnknownPartitioning(n) => *n,
+        }
+    }
+}
+
 #[async_trait::async_trait]
 pub trait PhysicalPlan: Send + Sync + Any {
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef;
 
+    /// Specifies the output partitioning scheme of this plan
+    fn output_partitioning(&self) -> Partitioning;
+
     /// Get a list of child execution plans that provide the input for this plan. The returned list
     /// will be empty for leaf nodes, will contain a single value for unary nodes, or two
     /// values for binary nodes (such as joins).
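A tiny sketch of what `partition_count` reports (illustrative; see the dispatch in `execute_stream` further down):

```rust
#[test]
fn partition_count_reports_partitions() {
    use query::plan::Partitioning;

    // partition_count is what execute_stream later dispatches on: 0 partitions
    // yield an EmptyRecordBatchStream, exactly 1 is executed directly.
    assert_eq!(Partitioning::UnknownPartitioning(4).partition_count(), 4);
}
```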
@@ -38,7 +59,11 @@ pub trait PhysicalPlan: Send + Sync + Any {
     ) -> Result<Arc<dyn PhysicalPlan>>;
 
     /// creates an iterator
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
+    async fn execute(
+        &self,
+        _runtime: &Runtime,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream>;
 
     fn as_any(&self) -> &dyn Any;
 }
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index 76ae018c06..6746af567d 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -1,5 +1,7 @@
 use std::sync::Arc;
 
+use common_recordbatch::SendableRecordBatchStream;
+
 use crate::catalog::CatalogList;
 use crate::error::Result;
 use crate::plan::LogicalPlan;
@@ -14,7 +16,7 @@ use crate::query_engine::datafusion::DatafusionQueryEngine;
 #[async_trait::async_trait]
 pub trait QueryEngine {
     fn name(&self) -> &str;
-    async fn execute(&self, plan: &LogicalPlan) -> Result<()>;
+    async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream>;
 }
 
 pub struct QueryEngineFactory {
diff --git a/src/query/src/query_engine/context.rs b/src/query/src/query_engine/context.rs
index 97bf384a59..6606b65048 100644
--- a/src/query/src/query_engine/context.rs
+++ b/src/query/src/query_engine/context.rs
@@ -1,3 +1,18 @@
 /// Query engine execution context
-#[derive(Default, Debug)]
-pub struct QueryContext;
+use crate::query_engine::state::QueryEngineState;
+
+#[derive(Debug)]
+pub struct QueryContext {
+    state: QueryEngineState,
+}
+
+impl QueryContext {
+    pub fn new(state: QueryEngineState) -> Self {
+        Self { state }
+    }
+
+    #[inline]
+    pub fn state(&self) -> &QueryEngineState {
+        &self.state
+    }
+}
diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/query_engine/datafusion.rs
index 5f16b21de0..2b223f0c03 100644
--- a/src/query/src/query_engine/datafusion.rs
+++ b/src/query/src/query_engine/datafusion.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
 use snafu::{OptionExt, ResultExt};
 
 use super::{context::QueryContext, state::QueryEngineState};
@@ -33,8 +34,8 @@ impl QueryEngine for DatafusionQueryEngine {
     fn name(&self) -> &str {
         "datafusion"
     }
-    async fn execute(&self, plan: &LogicalPlan) -> Result<()> {
-        let mut ctx = QueryContext::default();
+    async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream> {
+        let mut ctx = QueryContext::new(self.state.clone());
         let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?;
         let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
@@ -117,10 +118,15 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
 impl QueryExecutor for DatafusionQueryEngine {
     async fn execute_stream(
         &self,
-        _ctx: &QueryContext,
-        _plan: &Arc<dyn PhysicalPlan>,
-    ) -> Result<()> {
-        let _runtime = self.state.df_context().runtime_env();
-        Ok(())
+        ctx: &QueryContext,
+        plan: &Arc<dyn PhysicalPlan>,
+    ) -> Result<SendableRecordBatchStream> {
+        match plan.output_partitioning().partition_count() {
+            0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
+            1 => Ok(plan.execute(&ctx.state().runtime(), 0).await?),
+            _ => {
+                unimplemented!();
+            }
+        }
     }
 }
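Since `QueryEngine::execute` now returns a `SendableRecordBatchStream`, callers drain it with `futures-util`'s `TryStreamExt` (hence the new dependency); a minimal sketch, assuming a tokio context:

```rust
// Illustrative: draining the stream returned by QueryEngine::execute into
// memory; mirrors the `collect` helper in the new integration test below.
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use futures_util::stream::TryStreamExt;

async fn drain(stream: SendableRecordBatchStream) -> Vec<RecordBatch> {
    // try_collect surfaces the first RecordBatch error; unwrap kept for brevity.
    stream.try_collect::<Vec<_>>().await.unwrap()
}
```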
diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/query_engine/datafusion/adapter.rs
index c60f183d5d..5e432ce892 100644
--- a/src/query/src/query_engine/datafusion/adapter.rs
+++ b/src/query/src/query_engine/datafusion/adapter.rs
@@ -4,11 +4,11 @@ use std::sync::Arc;
 
 use common_recordbatch::SendableRecordBatchStream;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::{
     error::Result as DfResult,
     physical_plan::{
-        expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
+        expressions::PhysicalSortExpr, ExecutionPlan, Partitioning as DfPartitioning,
         SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
     },
 };
@@ -17,7 +17,8 @@ use snafu::ResultExt;
 use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
 
 use crate::error::{self, Result};
-use crate::plan::PhysicalPlan;
+use crate::executor::Runtime;
+use crate::plan::{Partitioning, PhysicalPlan};
 
 /// Datafusion ExecutionPlan -> greptime PhysicalPlan
 pub struct PhysicalPlanAdapter {
@@ -42,6 +43,11 @@ impl PhysicalPlan for PhysicalPlanAdapter {
         self.schema.clone()
     }
 
+    fn output_partitioning(&self) -> Partitioning {
+        //FIXME(dennis)
+        Partitioning::UnknownPartitioning(1)
+    }
+
     fn children(&self) -> Vec<Arc<dyn PhysicalPlan>> {
         let mut plans: Vec<Arc<dyn PhysicalPlan>> = vec![];
         for p in self.plan.children() {
@@ -75,12 +81,14 @@ impl PhysicalPlan for PhysicalPlanAdapter {
         )))
     }
 
-    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        // FIXME(dennis) runtime
-        let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
+    async fn execute(
+        &self,
+        runtime: &Runtime,
+        partition: usize,
+    ) -> Result<SendableRecordBatchStream> {
         let df_stream = self
             .plan
-            .execute(partition, Arc::new(runtime))
+            .execute(partition, runtime.into())
             .await
             .context(error::DatafusionSnafu)?;
 
@@ -118,9 +126,9 @@ impl ExecutionPlan for ExecutionPlanAdapter {
         self.schema.arrow_schema().clone()
     }
 
-    fn output_partitioning(&self) -> Partitioning {
+    fn output_partitioning(&self) -> DfPartitioning {
         // FIXME(dennis)
-        Partitioning::UnknownPartitioning(1)
+        DfPartitioning::UnknownPartitioning(1)
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -156,9 +164,9 @@ impl ExecutionPlan for ExecutionPlanAdapter {
     async fn execute(
         &self,
         partition: usize,
-        _runtime: Arc<RuntimeEnv>,
+        runtime: Arc<RuntimeEnv>,
     ) -> DfResult<DfSendableRecordBatchStream> {
-        match self.plan.execute(partition).await {
+        match self.plan.execute(&runtime.into(), partition).await {
             Ok(stream) => Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))),
             Err(e) => Err(e.into()),
         }
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index d1f467f357..ef5fd34bfd 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -1,4 +1,5 @@
 use std::any::Any;
+use std::fmt;
 use std::sync::Arc;
 
 use datafusion::catalog::{
@@ -7,6 +8,7 @@ use datafusion::catalog::{
 };
 use datafusion::datasource::TableProvider as DfTableProvider;
 use datafusion::error::Result as DataFusionResult;
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use snafu::ResultExt;
 use table::{
@@ -16,6 +18,7 @@ use table::{
 
 use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider};
 use crate::error::{self, Result};
+use crate::executor::Runtime;
 
 /// Query engine global state
 #[derive(Clone)]
@@ -23,6 +26,13 @@ pub struct QueryEngineState {
     df_context: ExecutionContext,
 }
 
+impl fmt::Debug for QueryEngineState {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        // TODO(dennis) better debug info
+        write!(f, "QueryEngineState: ")
+    }
+}
+
 impl QueryEngineState {
     pub(crate) fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
         let config = ExecutionConfig::new().with_default_catalog_and_schema("greptime", "public");
@@ -30,6 +40,7 @@ impl QueryEngineState {
         df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
             catalog_list: catalog_list.clone(),
+            runtime: df_context.runtime_env(),
         });
 
         Self { df_context }
     }
@@ -39,10 +50,16 @@ impl QueryEngineState {
     pub(crate) fn df_context(&self) -> &ExecutionContext {
         &self.df_context
     }
+
+    #[inline]
+    pub(crate) fn runtime(&self) -> Runtime {
+        self.df_context.runtime_env().into()
+    }
 }
 
 /// Adapters between datafusion and greptime query engine.
 struct DfCatalogListAdapter {
+    runtime: Arc<RuntimeEnv>,
     catalog_list: Arc<dyn CatalogList>,
 }
 
@@ -58,9 +75,13 @@ impl DfCatalogList for DfCatalogListAdapter {
     ) -> Option<Arc<dyn DfCatalogProvider>> {
         let catalog_adapter = Arc::new(CatalogProviderAdapter {
             df_cataglog_provider: catalog,
+            runtime: self.runtime.clone(),
         });
         match self.catalog_list.register_catalog(name, catalog_adapter) {
-            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
+            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
+                catalog_provider,
+                runtime: self.runtime.clone(),
+            })),
             None => None,
         }
     }
@@ -71,7 +92,10 @@ impl DfCatalogList for DfCatalogListAdapter {
 
     fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
         match self.catalog_list.catalog(name) {
-            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
+            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
+                catalog_provider,
+                runtime: self.runtime.clone(),
+            })),
             None => None,
         }
     }
@@ -80,6 +104,7 @@ impl DfCatalogList for DfCatalogListAdapter {
 /// Datafusion's CatalogProvider -> greptime CatalogProvider
 struct CatalogProviderAdapter {
     df_cataglog_provider: Arc<dyn DfCatalogProvider>,
+    runtime: Arc<RuntimeEnv>,
 }
 
 impl CatalogProvider for CatalogProviderAdapter {
@@ -93,9 +118,10 @@ impl CatalogProvider for CatalogProviderAdapter {
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         match self.df_cataglog_provider.schema(name) {
-            Some(df_schema_provider) => {
-                Some(Arc::new(SchemaProviderAdapter { df_schema_provider }))
-            }
+            Some(df_schema_provider) => Some(Arc::new(SchemaProviderAdapter {
+                df_schema_provider,
+                runtime: self.runtime.clone(),
+            })),
             None => None,
         }
     }
@@ -104,6 +130,7 @@ impl CatalogProvider for CatalogProviderAdapter {
 ///Greptime CatalogProvider -> datafusion's CatalogProvider
 struct DfCatalogProviderAdapter {
     catalog_provider: Arc<dyn CatalogProvider>,
+    runtime: Arc<RuntimeEnv>,
 }
 
 impl DfCatalogProvider for DfCatalogProviderAdapter {
@@ -117,7 +144,10 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
 
     fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
         match self.catalog_provider.schema(name) {
-            Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter { schema_provider })),
+            Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter {
+                schema_provider,
+                runtime: self.runtime.clone(),
+            })),
             None => None,
         }
     }
@@ -126,6 +156,7 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
 /// Greptime SchemaProvider -> datafusion SchemaProvider
 struct DfSchemaProviderAdapter {
     schema_provider: Arc<dyn SchemaProvider>,
+    runtime: Arc<RuntimeEnv>,
 }
 
 impl DfSchemaProvider for DfSchemaProviderAdapter {
@@ -149,7 +180,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
         name: String,
         table: Arc<dyn DfTableProvider>,
     ) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
-        let table = Arc::new(TableAdapter::new(table));
+        let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
         match self.schema_provider.register_table(name, table) {
             Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
             Ok(None) => Ok(None),
@@ -173,6 +204,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
 /// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter
 struct SchemaProviderAdapter {
     df_schema_provider: Arc<dyn DfSchemaProvider>,
+    runtime: Arc<RuntimeEnv>,
 }
 
 impl SchemaProvider for SchemaProviderAdapter {
@@ -187,7 +219,10 @@ impl SchemaProvider for SchemaProviderAdapter {
 
     fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
         match self.df_schema_provider.table(name) {
-            Some(table_provider) => Some(Arc::new(TableAdapter::new(table_provider))),
+            Some(table_provider) => Some(Arc::new(TableAdapter::new(
+                table_provider,
+                self.runtime.clone(),
+            ))),
             None => None,
         }
     }
@@ -203,7 +238,10 @@ impl SchemaProvider for SchemaProviderAdapter {
             .register_table(name, table_provider)
             .context(error::DatafusionSnafu)?
         {
-            Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
+            Some(table) => Ok(Some(Arc::new(TableAdapter::new(
+                table,
+                self.runtime.clone(),
+            )))),
             None => Ok(None),
         }
     }
@@ -214,7 +252,10 @@ impl SchemaProvider for SchemaProviderAdapter {
             .deregister_table(name)
             .context(error::DatafusionSnafu)?
         {
-            Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
+            Some(table) => Ok(Some(Arc::new(TableAdapter::new(
+                table,
+                self.runtime.clone(),
+            )))),
             None => Ok(None),
         }
     }
diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs
new file mode 100644
index 0000000000..e03abc2206
--- /dev/null
+++ b/src/query/tests/query_engine_test.rs
@@ -0,0 +1,61 @@
+use std::sync::Arc;
+
+use arrow::array::UInt32Array;
+use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
+use datafusion::field_util::FieldExt;
+use datafusion::field_util::SchemaExt;
+use datafusion::logical_plan::LogicalPlanBuilder;
+use futures_util::stream::TryStreamExt;
+use query::catalog::memory::MemoryCatalogList;
+use query::error::{RecordBatchSnafu, Result};
+use query::plan::LogicalPlan;
+use query::query_engine::QueryEngineFactory;
+use snafu::ResultExt;
+use table::table::adapter::DfTableProviderAdapter;
+use table::table::numbers::NumbersTable;
+
+#[tokio::test]
+async fn test_datafusion_query_engine() -> Result<()> {
+    let catalog_list = Arc::new(MemoryCatalogList::default());
+    let factory = QueryEngineFactory::new(catalog_list);
+    let engine = factory.query_engine();
+
+    let limit = 10;
+    let table = Arc::new(NumbersTable::default());
+    let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
+    let plan = LogicalPlan::DfPlan(
+        LogicalPlanBuilder::scan("numbers", table_provider, None)
+            .unwrap()
+            .limit(limit)
+            .unwrap()
+            .build()
+            .unwrap(),
+    );
+
+    let ret = engine.execute(&plan).await;
+
+    let numbers = collect(ret.unwrap()).await.unwrap();
+
+    assert_eq!(1, numbers.len());
+    assert_eq!(numbers[0].df_recordbatch.num_columns(), 1);
+    assert_eq!(1, numbers[0].schema.arrow_schema().fields().len());
+    assert_eq!("number", numbers[0].schema.arrow_schema().field(0).name());
+
+    let columns = numbers[0].df_recordbatch.columns();
+    assert_eq!(1, columns.len());
+    assert_eq!(columns[0].len(), limit);
+    let expected: Vec<u32> = (0u32..limit as u32).collect();
+    assert_eq!(
+        *columns[0].as_any().downcast_ref::<UInt32Array>().unwrap(),
+        UInt32Array::from_slice(&expected)
+    );
+
+    Ok(())
+}
+
+pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
+    stream
+        .try_collect::<Vec<_>>()
+        .await
+        .context(RecordBatchSnafu)
+}
diff --git a/src/table/src/table.rs b/src/table/src/table.rs
index 31ae9d2532..2a039e8ba8 100644
--- a/src/table/src/table.rs
+++ b/src/table/src/table.rs
@@ -11,7 +11,7 @@ use datatypes::schema::{Schema, SchemaRef};
 use crate::error::Result;
 
 pub mod adapter;
-pub mod memory;
+pub mod numbers;
 
 pub type TableId = u64;
 pub type TableVersion = u64;
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index 2a3145eea1..ba266a9ae8 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -17,7 +17,7 @@ use datafusion::datasource::{
     TableType as DfTableType,
 };
 use datafusion::error::{DataFusionError, Result as DfResult};
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::Expr as DfExpr;
 use datafusion::physical_plan::{
     expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
@@ -168,11 +168,15 @@ impl TableProvider for DfTableProviderAdapter {
 /// Datafusion TableProvider -> greptime Table
 pub struct TableAdapter {
     table_provider: Arc<dyn DfTableProvider>,
+    runtime: Arc<RuntimeEnv>,
 }
 
 impl TableAdapter {
-    pub fn new(table_provider: Arc<dyn DfTableProvider>) -> Self {
-        Self { table_provider }
+    pub fn new(table_provider: Arc<dyn DfTableProvider>, runtime: Arc<RuntimeEnv>) -> Self {
+        Self {
+            table_provider,
+            runtime,
+        }
     }
 }
@@ -208,10 +212,9 @@ impl Table for TableAdapter {
             .await
             .context(error::DatafusionSnafu)?;
 
-        // FIXME(dennis) Partitioning and runtime
-        let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
+        // FIXME(dennis) Partitioning
         let df_stream = execution_plan
-            .execute(0, Arc::new(runtime))
+            .execute(0, self.runtime.clone())
             .await
             .context(error::DatafusionSnafu)?;
diff --git a/src/table/src/table/memory.rs b/src/table/src/table/memory.rs
deleted file mode 100644
index 8b13789179..0000000000
--- a/src/table/src/table/memory.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/table/src/table/numbers.rs b/src/table/src/table/numbers.rs
new file mode 100644
index 0000000000..c3f0c668fe
--- /dev/null
+++ b/src/table/src/table/numbers.rs
@@ -0,0 +1,93 @@
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use arrow::array::UInt32Array;
+use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+use common_recordbatch::error::Result as RecordBatchResult;
+use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
+use datafusion::field_util::SchemaExt;
+use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+use datatypes::schema::{Schema, SchemaRef};
+use futures::task::{Context, Poll};
+use futures::Stream;
+
+use crate::error::Result;
+use crate::table::{Expr, Table};
+
+/// numbers table for test
+pub struct NumbersTable {
+    schema: SchemaRef,
+}
+
+impl Default for NumbersTable {
+    fn default() -> Self {
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "number",
+            DataType::UInt32,
+            false,
+        )]));
+        Self {
+            schema: Arc::new(Schema::new(arrow_schema)),
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl Table for NumbersTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    async fn scan(
+        &self,
+        _projection: &Option<Vec<usize>>,
+        _filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(NumbersStream {
+            limit: limit.unwrap_or(100) as u32,
+            schema: self.schema.clone(),
+            already_run: false,
+        }))
+    }
+}
+
+// Limited numbers stream
+struct NumbersStream {
+    limit: u32,
+    schema: SchemaRef,
+    already_run: bool,
+}
+
+impl RecordBatchStream for NumbersStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+impl Stream for NumbersStream {
+    type Item = RecordBatchResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.already_run {
+            return Poll::Ready(None);
+        }
+        self.already_run = true;
+        let numbers: Vec<u32> = (0..self.limit).collect();
+        let batch = DfRecordBatch::try_new(
+            self.schema.arrow_schema().clone(),
+            vec![Arc::new(UInt32Array::from_slice(&numbers))],
+        )
+        .unwrap();
+
+        Poll::Ready(Some(Ok(RecordBatch {
+            schema: self.schema.clone(),
+            df_recordbatch: batch,
+        })))
+    }
+}
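For reference, a hedged sketch of scanning `NumbersTable` directly, without going through the query engine (assumes tokio's `macros`/`rt` features, as in the dev-dependencies above):

```rust
// Illustrative: scanning the test-only NumbersTable on its own.
use futures_util::stream::TryStreamExt;
use table::table::numbers::NumbersTable;
use table::table::Table;

#[tokio::main]
async fn main() {
    let table = NumbersTable::default();
    // No projection, no filters, cap the stream at 5 rows.
    let stream = table.scan(&None, &[], Some(5)).await.unwrap();
    // NumbersStream produces a single batch, then ends.
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(1, batches.len());
    assert_eq!(5, batches[0].df_recordbatch.num_rows());
}
```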