feat: impl QueryEngine#execute, adds numbers table and query engine test (#13)

* feat: impl QueryEngine#execute, adds numbers table and query engine test

* fix: clippy warning

* fix: reuse runtime in context in table adapter

* fix: address CR comments
dennis zhuang
2022-04-27 15:15:26 +08:00
committed by GitHub
parent 12eefc3cd0
commit bf331ec4ac
19 changed files with 455 additions and 43 deletions

Cargo.lock (generated)

@@ -1052,14 +1052,17 @@ dependencies = [
name = "query"
version = "0.1.0"
dependencies = [
"arrow2",
"async-trait",
"common-recordbatch",
"datafusion",
"datatypes",
"futures",
"futures-util",
"snafu",
"table",
"tokio",
"tokio-stream",
]
[[package]]


@@ -5,6 +5,7 @@ use std::pin::Pin;
use datatypes::schema::SchemaRef;
use error::Result;
use futures::task::{Context, Poll};
use futures::Stream;
pub use recordbatch::RecordBatch;
@@ -13,3 +14,31 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
}
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {
/// Schema wrapped by Arc
schema: SchemaRef,
}
impl EmptyRecordBatchStream {
/// Create an empty RecordBatchStream
pub fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}
impl RecordBatchStream for EmptyRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for EmptyRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
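As a quick illustration of the new type, a minimal sketch (the helper name and async wrapper are illustrative, not part of this change): collecting an EmptyRecordBatchStream yields no batches, since poll_next returns Ready(None) immediately.

use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::TryStreamExt;

// Hypothetical helper: the empty stream terminates immediately, so nothing is collected.
async fn collect_empty(schema: SchemaRef) -> common_recordbatch::error::Result<()> {
    let stream: SendableRecordBatchStream = Box::pin(EmptyRecordBatchStream::new(schema));
    let batches = stream.try_collect::<Vec<_>>().await?;
    assert!(batches.is_empty());
    Ok(())
}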


@@ -3,7 +3,10 @@ name = "query"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
[dependencies]
async-trait = "0.1"
@@ -11,6 +14,11 @@ common-recordbatch = {path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datatypes = {path = "../datatypes" }
futures = "0.3"
futures-util = "0.3.21"
snafu = "0.7.0"
table = { path = "../table" }
tokio = "1.0"
[dev-dependencies]
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"


@@ -1,3 +1,4 @@
pub mod memory;
pub mod schema;
use std::any::Any;
use std::sync::Arc;


@@ -0,0 +1,84 @@
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use crate::catalog::schema::SchemaProvider;
use crate::catalog::{CatalogList, CatalogProvider};
/// Simple in-memory list of catalogs
#[derive(Default)]
pub struct MemoryCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
}
impl CatalogList for MemoryCatalogList {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
let mut catalogs = self.catalogs.write().unwrap();
catalogs.insert(name, catalog)
}
fn catalog_names(&self) -> Vec<String> {
let catalogs = self.catalogs.read().unwrap();
catalogs.keys().map(|s| s.to_string()).collect()
}
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let catalogs = self.catalogs.read().unwrap();
catalogs.get(name).cloned()
}
}
impl Default for MemoryCatalogProvider {
fn default() -> Self {
Self::new()
}
}
/// Simple in-memory implementation of a catalog.
pub struct MemoryCatalogProvider {
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}
impl MemoryCatalogProvider {
/// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
pub fn new() -> Self {
Self {
schemas: RwLock::new(HashMap::new()),
}
}
pub fn register_schema(
&self,
name: impl Into<String>,
schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
let mut schemas = self.schemas.write().unwrap();
schemas.insert(name.into(), schema)
}
}
impl CatalogProvider for MemoryCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
let schemas = self.schemas.read().unwrap();
schemas.keys().cloned().collect()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let schemas = self.schemas.read().unwrap();
schemas.get(name).cloned()
}
}
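A short sketch of wiring these catalogs together, assuming MemoryCatalogProvider is exported next to MemoryCatalogList in query::catalog::memory (only the latter's path is visible in this change):

use std::sync::Arc;
use query::catalog::memory::{MemoryCatalogList, MemoryCatalogProvider};
use query::catalog::CatalogList;

fn default_catalogs() -> Arc<MemoryCatalogList> {
    let catalogs = Arc::new(MemoryCatalogList::default());
    // An empty catalog; schemas would be attached via register_schema(name, schema).
    let greptime = Arc::new(MemoryCatalogProvider::new());
    catalogs.register_catalog("greptime".to_string(), greptime);
    catalogs
}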


@@ -1,3 +1,4 @@
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use snafu::Snafu;
@@ -9,6 +10,8 @@ pub enum Error {
Datafusion { source: DataFusionError },
#[snafu(display("PhysicalPlan downcast_ref failed"))]
PhysicalPlanDowncast,
#[snafu(display("RecordBatch error: {}", source))]
RecordBatch { source: RecordBatchError },
}
pub type Result<T> = std::result::Result<T, Error>;


@@ -1,9 +1,40 @@
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::runtime_env::RuntimeEnv;
use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
/// Executor to run a [PhysicalPlan].
#[async_trait::async_trait]
pub trait QueryExecutor {
async fn execute_stream(&self, ctx: &QueryContext, plan: &Arc<dyn PhysicalPlan>) -> Result<()>;
async fn execute_stream(
&self,
ctx: &QueryContext,
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream>;
}
/// Execution runtime environment
#[derive(Clone, Default)]
pub struct Runtime {
runtime: Arc<RuntimeEnv>,
}
impl From<Arc<RuntimeEnv>> for Runtime {
fn from(runtime: Arc<RuntimeEnv>) -> Self {
Runtime { runtime }
}
}
impl From<Runtime> for Arc<RuntimeEnv> {
fn from(r: Runtime) -> Arc<RuntimeEnv> {
r.runtime
}
}
impl From<&Runtime> for Arc<RuntimeEnv> {
fn from(r: &Runtime) -> Arc<RuntimeEnv> {
r.runtime.clone()
}
}
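To show why the wrapper exists, a brief sketch of the conversions (module path query::executor::Runtime assumed from the pub mod executor declaration): the same Arc<RuntimeEnv> is shared instead of building a fresh environment per call.

use std::sync::Arc;
use datafusion::execution::runtime_env::RuntimeEnv;
use query::executor::Runtime;

fn reuse_runtime(env: Arc<RuntimeEnv>) -> (Runtime, Arc<RuntimeEnv>) {
    // Wrap the engine-wide DataFusion runtime once...
    let runtime: Runtime = env.into();
    // ...and hand it back as Arc<RuntimeEnv> wherever DataFusion APIs require it.
    let for_datafusion: Arc<RuntimeEnv> = (&runtime).into();
    (runtime, for_datafusion)
}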


@@ -5,5 +5,5 @@ pub mod executor;
pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_planner;
mod plan;
pub mod plan;
pub mod query_engine;


@@ -6,6 +6,7 @@ use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;
use datatypes::schema::SchemaRef;
use crate::error::Result;
use crate::executor::Runtime;
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
@@ -20,11 +21,31 @@ pub enum LogicalPlan {
DfPlan(DfLogicalPlan),
}
/// Partitioning schemes supported by operators.
#[derive(Debug, Clone)]
pub enum Partitioning {
/// Unknown partitioning scheme with a known number of partitions
UnknownPartitioning(usize),
}
impl Partitioning {
/// Returns the number of partitions in this partitioning scheme
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
UnknownPartitioning(n) => *n,
}
}
}
#[async_trait::async_trait]
pub trait PhysicalPlan: Send + Sync + Any {
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;
/// Specifies the output partitioning scheme of this plan
fn output_partitioning(&self) -> Partitioning;
/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
@@ -38,7 +59,11 @@ pub trait PhysicalPlan: Send + Sync + Any {
) -> Result<Arc<dyn PhysicalPlan>>;
/// creates an iterator
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
async fn execute(
&self,
_runtime: &Runtime,
partition: usize,
) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
}


@@ -1,5 +1,7 @@
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use crate::catalog::CatalogList;
use crate::error::Result;
use crate::plan::LogicalPlan;
@@ -14,7 +16,7 @@ use crate::query_engine::datafusion::DatafusionQueryEngine;
#[async_trait::async_trait]
pub trait QueryEngine {
fn name(&self) -> &str;
async fn execute(&self, plan: &LogicalPlan) -> Result<()>;
async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream>;
}
pub struct QueryEngineFactory {


@@ -1,3 +1,18 @@
/// Query engine execution context
#[derive(Default, Debug)]
pub struct QueryContext;
use crate::query_engine::state::QueryEngineState;
#[derive(Debug)]
pub struct QueryContext {
state: QueryEngineState,
}
impl QueryContext {
pub fn new(state: QueryEngineState) -> Self {
Self { state }
}
#[inline]
pub fn state(&self) -> &QueryEngineState {
&self.state
}
}


@@ -1,5 +1,6 @@
use std::sync::Arc;
use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use snafu::{OptionExt, ResultExt};
use super::{context::QueryContext, state::QueryEngineState};
@@ -33,8 +34,8 @@ impl QueryEngine for DatafusionQueryEngine {
fn name(&self) -> &str {
"datafusion"
}
async fn execute(&self, plan: &LogicalPlan) -> Result<()> {
let mut ctx = QueryContext::default();
async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream> {
let mut ctx = QueryContext::new(self.state.clone());
let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?;
let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?;
let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
@@ -117,10 +118,15 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
impl QueryExecutor for DatafusionQueryEngine {
async fn execute_stream(
&self,
_ctx: &QueryContext,
_plan: &Arc<dyn PhysicalPlan>,
) -> Result<()> {
let _runtime = self.state.df_context().runtime_env();
Ok(())
ctx: &QueryContext,
plan: &Arc<dyn PhysicalPlan>,
) -> Result<SendableRecordBatchStream> {
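// Dispatch on the plan's declared output partition count: an empty plan yields an
// EmptyRecordBatchStream, a single partition is executed directly on the shared
// runtime, and merging results from multiple partitions is still unimplemented.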
match plan.output_partitioning().partition_count() {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan.execute(&ctx.state().runtime(), 0).await?),
_ => {
unimplemented!();
}
}
}
}


@@ -4,11 +4,11 @@ use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::{
error::Result as DfResult,
physical_plan::{
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning as DfPartitioning,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
@@ -17,7 +17,8 @@ use snafu::ResultExt;
use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use crate::error::{self, Result};
use crate::plan::PhysicalPlan;
use crate::executor::Runtime;
use crate::plan::{Partitioning, PhysicalPlan};
/// Datafusion ExecutionPlan -> greptime PhysicalPlan
pub struct PhysicalPlanAdapter {
@@ -42,6 +43,11 @@ impl PhysicalPlan for PhysicalPlanAdapter {
self.schema.clone()
}
fn output_partitioning(&self) -> Partitioning {
//FIXME(dennis)
Partitioning::UnknownPartitioning(1)
}
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>> {
let mut plans: Vec<Arc<dyn PhysicalPlan>> = vec![];
for p in self.plan.children() {
@@ -75,12 +81,14 @@ impl PhysicalPlan for PhysicalPlanAdapter {
)))
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
// FIXME(dennis) runtime
let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
async fn execute(
&self,
runtime: &Runtime,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let df_stream = self
.plan
.execute(partition, Arc::new(runtime))
.execute(partition, runtime.into())
.await
.context(error::DatafusionSnafu)?;
@@ -118,9 +126,9 @@ impl ExecutionPlan for ExecutionPlanAdapter {
self.schema.arrow_schema().clone()
}
fn output_partitioning(&self) -> Partitioning {
fn output_partitioning(&self) -> DfPartitioning {
// FIXME(dennis)
Partitioning::UnknownPartitioning(1)
DfPartitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
@@ -156,9 +164,9 @@ impl ExecutionPlan for ExecutionPlanAdapter {
async fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
runtime: Arc<RuntimeEnv>,
) -> DfResult<DfSendableRecordBatchStream> {
match self.plan.execute(partition).await {
match self.plan.execute(&runtime.into(), partition).await {
Ok(stream) => Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))),
Err(e) => Err(e.into()),
}


@@ -1,4 +1,5 @@
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use datafusion::catalog::{
@@ -7,6 +8,7 @@ use datafusion::catalog::{
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use snafu::ResultExt;
use table::{
@@ -16,6 +18,7 @@ use table::{
use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider};
use crate::error::{self, Result};
use crate::executor::Runtime;
/// Query engine global state
#[derive(Clone)]
@@ -23,6 +26,13 @@ pub struct QueryEngineState {
df_context: ExecutionContext,
}
impl fmt::Debug for QueryEngineState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// TODO(dennis) better debug info
write!(f, "QueryEngineState: <datafusion context>")
}
}
impl QueryEngineState {
pub(crate) fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
let config = ExecutionConfig::new().with_default_catalog_and_schema("greptime", "public");
@@ -30,6 +40,7 @@ impl QueryEngineState {
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
catalog_list: catalog_list.clone(),
runtime: df_context.runtime_env(),
});
Self { df_context }
@@ -39,10 +50,16 @@ impl QueryEngineState {
pub(crate) fn df_context(&self) -> &ExecutionContext {
&self.df_context
}
#[inline]
pub(crate) fn runtime(&self) -> Runtime {
self.df_context.runtime_env().into()
}
}
/// Adapters between datafusion and greptime query engine.
struct DfCatalogListAdapter {
runtime: Arc<RuntimeEnv>,
catalog_list: Arc<dyn CatalogList>,
}
@@ -58,9 +75,13 @@ impl DfCatalogList for DfCatalogListAdapter {
) -> Option<Arc<dyn DfCatalogProvider>> {
let catalog_adapter = Arc::new(CatalogProviderAdapter {
df_cataglog_provider: catalog,
runtime: self.runtime.clone(),
});
match self.catalog_list.register_catalog(name, catalog_adapter) {
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
})),
None => None,
}
}
@@ -71,7 +92,10 @@ impl DfCatalogList for DfCatalogListAdapter {
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
match self.catalog_list.catalog(name) {
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
catalog_provider,
runtime: self.runtime.clone(),
})),
None => None,
}
}
@@ -80,6 +104,7 @@ impl DfCatalogList for DfCatalogListAdapter {
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
df_cataglog_provider: Arc<dyn DfCatalogProvider>,
runtime: Arc<RuntimeEnv>,
}
impl CatalogProvider for CatalogProviderAdapter {
@@ -93,9 +118,10 @@ impl CatalogProvider for CatalogProviderAdapter {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
match self.df_cataglog_provider.schema(name) {
Some(df_schema_provider) => {
Some(Arc::new(SchemaProviderAdapter { df_schema_provider }))
}
Some(df_schema_provider) => Some(Arc::new(SchemaProviderAdapter {
df_schema_provider,
runtime: self.runtime.clone(),
})),
None => None,
}
}
@@ -104,6 +130,7 @@ impl CatalogProvider for CatalogProviderAdapter {
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
catalog_provider: Arc<dyn CatalogProvider>,
runtime: Arc<RuntimeEnv>,
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
@@ -117,7 +144,10 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
match self.catalog_provider.schema(name) {
Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter { schema_provider })),
Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter {
schema_provider,
runtime: self.runtime.clone(),
})),
None => None,
}
}
@@ -126,6 +156,7 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
schema_provider: Arc<dyn SchemaProvider>,
runtime: Arc<RuntimeEnv>,
}
impl DfSchemaProvider for DfSchemaProviderAdapter {
@@ -149,7 +180,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table));
let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
match self.schema_provider.register_table(name, table) {
Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
Ok(None) => Ok(None),
@@ -173,6 +204,7 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
/// Datafusion SchemaProvider -> greptime SchemaProvider
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
runtime: Arc<RuntimeEnv>,
}
impl SchemaProvider for SchemaProviderAdapter {
@@ -187,7 +219,10 @@ impl SchemaProvider for SchemaProviderAdapter {
fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
match self.df_schema_provider.table(name) {
Some(table_provider) => Some(Arc::new(TableAdapter::new(table_provider))),
Some(table_provider) => Some(Arc::new(TableAdapter::new(
table_provider,
self.runtime.clone(),
))),
None => None,
}
}
@@ -203,7 +238,10 @@ impl SchemaProvider for SchemaProviderAdapter {
.register_table(name, table_provider)
.context(error::DatafusionSnafu)?
{
Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
Some(table) => Ok(Some(Arc::new(TableAdapter::new(
table,
self.runtime.clone(),
)))),
None => Ok(None),
}
}
@@ -214,7 +252,10 @@ impl SchemaProvider for SchemaProviderAdapter {
.deregister_table(name)
.context(error::DatafusionSnafu)?
{
Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
Some(table) => Ok(Some(Arc::new(TableAdapter::new(
table,
self.runtime.clone(),
)))),
None => Ok(None),
}
}


@@ -0,0 +1,61 @@
use std::sync::Arc;
use arrow::array::UInt32Array;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::field_util::FieldExt;
use datafusion::field_util::SchemaExt;
use datafusion::logical_plan::LogicalPlanBuilder;
use futures_util::stream::TryStreamExt;
use query::catalog::memory::MemoryCatalogList;
use query::error::{RecordBatchSnafu, Result};
use query::plan::LogicalPlan;
use query::query_engine::QueryEngineFactory;
use snafu::ResultExt;
use table::table::adapter::DfTableProviderAdapter;
use table::table::numbers::NumbersTable;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
let catalog_list = Arc::new(MemoryCatalogList::default());
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();
let limit = 10;
let table = Arc::new(NumbersTable::default());
let table_provider = Arc::new(DfTableProviderAdapter::new(table.clone()));
let plan = LogicalPlan::DfPlan(
LogicalPlanBuilder::scan("numbers", table_provider, None)
.unwrap()
.limit(limit)
.unwrap()
.build()
.unwrap(),
);
let ret = engine.execute(&plan).await;
let numbers = collect(ret.unwrap()).await.unwrap();
assert_eq!(1, numbers.len());
assert_eq!(numbers[0].df_recordbatch.num_columns(), 1);
assert_eq!(1, numbers[0].schema.arrow_schema().fields().len());
assert_eq!("number", numbers[0].schema.arrow_schema().field(0).name());
let columns = numbers[0].df_recordbatch.columns();
assert_eq!(1, columns.len());
assert_eq!(columns[0].len(), limit);
let expected: Vec<u32> = (0u32..limit as u32).collect();
assert_eq!(
*columns[0].as_any().downcast_ref::<UInt32Array>().unwrap(),
UInt32Array::from_slice(&expected)
);
Ok(())
}
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
stream
.try_collect::<Vec<_>>()
.await
.context(RecordBatchSnafu)
}


@@ -11,7 +11,7 @@ use datatypes::schema::{Schema, SchemaRef};
use crate::error::Result;
pub mod adapter;
pub mod memory;
pub mod numbers;
pub type TableId = u64;
pub type TableVersion = u64;


@@ -17,7 +17,7 @@ use datafusion::datasource::{
TableType as DfTableType,
};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::{
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
@@ -168,11 +168,15 @@ impl TableProvider for DfTableProviderAdapter {
/// Datafusion TableProvider -> greptime Table
pub struct TableAdapter {
table_provider: Arc<dyn TableProvider>,
runtime: Arc<RuntimeEnv>,
}
impl TableAdapter {
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
pub fn new(table_provider: Arc<dyn TableProvider>, runtime: Arc<RuntimeEnv>) -> Self {
Self {
table_provider,
runtime,
}
}
}
@@ -208,10 +212,9 @@ impl Table for TableAdapter {
.await
.context(error::DatafusionSnafu)?;
// FIXME(dennis) Partitioning and runtime
let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
// FIXME(dennis) Partitioning
let df_stream = execution_plan
.execute(0, Arc::new(runtime))
.execute(0, self.runtime.clone())
.await
.context(error::DatafusionSnafu)?;
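For reference, a minimal sketch of constructing the adapter with the shared runtime (the TableAdapter export path and the helper below are assumptions for illustration):

use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::prelude::ExecutionContext;
use table::table::adapter::TableAdapter;

// Reuses the ExecutionContext's RuntimeEnv instead of creating one per scan.
fn adapt(provider: Arc<dyn TableProvider>, ctx: &ExecutionContext) -> TableAdapter {
    TableAdapter::new(provider, ctx.runtime_env())
}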


@@ -1 +0,0 @@


@@ -0,0 +1,93 @@
use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::{Schema, SchemaRef};
use futures::task::{Context, Poll};
use futures::Stream;
use crate::error::Result;
use crate::table::{Expr, Table};
/// numbers table for test
pub struct NumbersTable {
schema: SchemaRef,
}
impl Default for NumbersTable {
fn default() -> Self {
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
"number",
DataType::UInt32,
false,
)]));
Self {
schema: Arc::new(Schema::new(arrow_schema)),
}
}
}
#[async_trait::async_trait]
impl Table for NumbersTable {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
async fn scan(
&self,
_projection: &Option<Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
Ok(Box::pin(NumbersStream {
limit: limit.unwrap_or(100) as u32,
schema: self.schema.clone(),
already_run: false,
}))
}
}
// Limited numbers stream
struct NumbersStream {
limit: u32,
schema: SchemaRef,
already_run: bool,
}
impl RecordBatchStream for NumbersStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
impl Stream for NumbersStream {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.already_run {
return Poll::Ready(None);
}
self.already_run = true;
let numbers: Vec<u32> = (0..self.limit).collect();
let batch = DfRecordBatch::try_new(
self.schema.arrow_schema().clone(),
vec![Arc::new(UInt32Array::from_slice(&numbers))],
)
.unwrap();
Poll::Ready(Some(Ok(RecordBatch {
schema: self.schema.clone(),
df_recordbatch: batch,
})))
}
}
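To close the loop, a small sketch of scanning the numbers table directly (the Table trait path and error type are assumed from the imports shown above): the stream yields a single batch containing 0..limit.

use common_recordbatch::SendableRecordBatchStream;
use futures_util::TryStreamExt;
use table::table::numbers::NumbersTable;
use table::table::Table;

// Scans with a limit of 5; NumbersStream emits exactly one batch containing 0..5.
async fn scan_numbers() -> table::error::Result<()> {
    let table = NumbersTable::default();
    let stream: SendableRecordBatchStream = table.scan(&None, &[], Some(5)).await?;
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    assert_eq!(1, batches.len());
    Ok(())
}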