feat: query engine impl on datafusion (#10)

* feat: query engine impl on datafusion

* feat: adds physical_optimizer, physical_planner and executor

* feat: impl adapters between datafusion and greptime query engine core APIs.

* feat: impl PhysicalPlanAdapter and ExecutionPlanAdapter

* feat: rename table datafusion mod to adapter

* fix: clippy warning

* fix: conflicts with develop branch

* feat: add database mod

* fix: CR comment

* fix: address CR comments

* fix: conflicts with develop branch

* fix: address CR comments
Author: dennis zhuang
Date: 2022-04-26 15:17:32 +08:00
Committed by: GitHub
Parent: e334e55bf7
Commit: 3a2f794f6c
35 changed files with 2597 additions and 8 deletions

Cargo.lock (generated, 1286 lines): diff suppressed because it is too large.


@@ -1,6 +1,8 @@
[workspace]
members = [
"src/common/base",
"src/common/query",
"src/common/recordbatch",
"src/datanode",
"src/datatypes",
"src/log-store",


@@ -0,0 +1,7 @@
[package]
name = "common-query"
version = "0.1.0"
edition = "2021"
[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }


@@ -0,0 +1 @@
pub mod logical_plan;


@@ -0,0 +1,18 @@
use datafusion::logical_plan::Expr as DfExpr;
/// Central struct of query API.
/// Represent logical expressions such as `A + 1`, or `CAST(c1 AS int)`.
#[derive(Clone, PartialEq, Hash)]
pub struct Expr {
df_expr: DfExpr,
}
impl Expr {
pub fn new(df_expr: DfExpr) -> Self {
Self { df_expr }
}
pub fn df_expr(&self) -> &DfExpr {
&self.df_expr
}
}
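A minimal usage sketch (illustrative, not part of this commit), assuming the `col` and `lit` expression builders from `datafusion::logical_plan` are available on the arrow2 branch:

use common_query::logical_plan::Expr;
use datafusion::logical_plan::{col, lit};

fn wrap_expr_example() -> Expr {
    // Build the DataFusion expression `a > 1` and wrap it in the engine's own Expr type.
    Expr::new(col("a").gt(lit(1)))
}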


@@ -0,0 +1,3 @@
mod expr;
pub use self::expr::Expr;


@@ -0,0 +1,16 @@
[package]
name = "common-recordbatch"
version = "0.1.0"
edition = "2021"
[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"}
datatypes = {path ="../../datatypes" }
futures = "0.3"
snafu = "0.7.0"


@@ -0,0 +1,10 @@
use arrow::error::ArrowError;
use snafu::Snafu;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Arrow error: {}", source))]
Arrow { source: ArrowError },
}
pub type Result<T> = std::result::Result<T, Error>;


@@ -0,0 +1,15 @@
pub mod error;
mod recordbatch;
use std::pin::Pin;
use datatypes::schema::SchemaRef;
use error::Result;
use futures::Stream;
pub use recordbatch::RecordBatch;
pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
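A rough sketch of how a concrete stream might implement this trait, written as if inside the common-recordbatch crate; the `SimpleRecordBatchStream` type is hypothetical and only serves batches held in memory:

use std::pin::Pin;
use std::task::{Context, Poll};

use datatypes::schema::SchemaRef;
use futures::Stream;

use crate::error::Result;
use crate::{RecordBatch, RecordBatchStream};

/// A trivial stream that yields a fixed set of in-memory record batches.
struct SimpleRecordBatchStream {
    schema: SchemaRef,
    batches: Vec<RecordBatch>,
}

impl RecordBatchStream for SimpleRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SimpleRecordBatchStream {
    type Item = Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Pop the next batch; a real implementation would poll an async source here.
        Poll::Ready(self.batches.pop().map(Ok))
    }
}

Boxing such a stream with `Box::pin` then yields a `SendableRecordBatchStream`.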


@@ -0,0 +1,10 @@
use std::sync::Arc;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;
#[derive(Clone, Debug)]
pub struct RecordBatch {
pub schema: Arc<Schema>,
pub df_recordbatch: DfRecordBatch,
}


@@ -3,9 +3,8 @@ name = "datatypes"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow2 = "0.10"
common-base = { path = "../common/base" }
paste = "1.0"
serde ={ version = "1.0.136", features = ["derive"] }


@@ -3,7 +3,6 @@
mod data_type;
pub mod prelude;
mod scalars;
mod schema;
pub mod type_id;
mod types;
pub mod value;
@@ -13,3 +12,4 @@ use arrow2::array::{BinaryArray, MutableBinaryArray};
pub type LargeBinaryArray = BinaryArray<i64>;
pub type MutableLargeBinaryArray = MutableBinaryArray<i64>;
pub mod schema;


@@ -1 +1,25 @@
use std::sync::Arc;
use arrow2::datatypes::Schema as ArrowSchema;
#[derive(Debug, Clone)]
pub struct Schema {
arrow_schema: Arc<ArrowSchema>,
}
impl Schema {
pub fn new(arrow_schema: Arc<ArrowSchema>) -> Self {
Self { arrow_schema }
}
pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
&self.arrow_schema
}
}
pub type SchemaRef = Arc<Schema>;
impl From<Arc<ArrowSchema>> for Schema {
fn from(s: Arc<ArrowSchema>) -> Schema {
Schema::new(s)
}
}
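A construction sketch written as if inside the datatypes crate, assuming arrow2 0.10 exposes `Field::new(name, data_type, is_nullable)` and `Schema::from(Vec<Field>)`:

use std::sync::Arc;

use arrow2::datatypes::{DataType, Field, Schema as ArrowSchema};

use crate::schema::{Schema, SchemaRef};

fn schema_example() -> SchemaRef {
    // Build an arrow2 schema with one non-nullable Int64 column and wrap it in the engine's Schema.
    let arrow_schema = ArrowSchema::from(vec![Field::new("ts", DataType::Int64, false)]);
    Arc::new(Schema::new(Arc::new(arrow_schema)))
}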


@@ -6,3 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
common-recordbatch = { path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datatypes = { path = "../datatypes" }
futures = "0.3"
snafu = "0.7.0"
table = { path = "../table" }
tokio = "1.0"

src/query/src/catalog.rs (new file, 39 lines)

@@ -0,0 +1,39 @@
pub mod schema;
use std::any::Any;
use std::sync::Arc;
use crate::catalog::schema::SchemaProvider;
/// Represent a list of named catalogs
pub trait CatalogList: Sync + Send {
/// Returns the catalog list as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Adds a new catalog to this catalog list
/// If a catalog of the same name existed before, it is replaced in the list and returned.
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>>;
/// Retrieves the list of available catalog names
fn catalog_names(&self) -> Vec<String>;
/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}
/// Represents a catalog, comprising a number of named schemas.
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;
/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
}
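For illustration, a naive in-memory catalog provider (hypothetical, not included in this change, written as if inside the query crate) could look roughly like this:

use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use crate::catalog::schema::SchemaProvider;
use crate::catalog::CatalogProvider;

/// A naive in-memory catalog that keeps schema providers in a map.
struct MemoryCatalogProvider {
    schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}

impl CatalogProvider for MemoryCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.schemas.read().unwrap().keys().cloned().collect()
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.read().unwrap().get(name).cloned()
    }
}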


@@ -0,0 +1,35 @@
use std::any::Any;
use table::TableRef;
use crate::error::Result;
/// Represents a schema, comprising a number of named tables.
pub trait SchemaProvider: Sync + Send {
/// Returns the schema provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String>;
/// Retrieves a specific table from the schema by name, provided it exists.
fn table(&self, name: &str) -> Option<TableRef>;
/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name existed before, a "Table already exists" error is returned.
fn register_table(&self, _name: String, _table: TableRef) -> Result<Option<TableRef>> {
todo!();
}
/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, _name: &str) -> Result<Option<TableRef>> {
todo!();
}
/// If supported by the implementation, checks whether the table exists in this schema provider.
/// Returns false if there is no matching table in the schema provider,
/// and true otherwise.
fn table_exist(&self, name: &str) -> bool;
}


@@ -0,0 +1 @@

src/query/src/error.rs (new file, 20 lines)

@@ -0,0 +1,20 @@
use datafusion::error::DataFusionError;
use snafu::Snafu;
/// Business errors of the query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Datafusion query engine error: {}", source))]
Datafusion { source: DataFusionError },
#[snafu(display("PhysicalPlan downcast_ref failed"))]
PhysicalPlanDowncast,
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}


@@ -1 +1,9 @@
use std::sync::Arc;
use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
/// Executor to run a [`PhysicalPlan`].
#[async_trait::async_trait]
pub trait QueryExecutor {
async fn execute_stream(&self, ctx: &QueryContext, plan: &Arc<dyn PhysicalPlan>) -> Result<()>;
}


@@ -1,4 +1,9 @@
pub mod catalog;
pub mod database;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_planner;
mod plan;
pub mod query_engine;


@@ -1 +1,11 @@
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::query_engine::QueryContext;
pub trait LogicalOptimizer {
fn optimize_logical_plan(
&self,
ctx: &mut QueryContext,
plan: &LogicalPlan,
) -> Result<LogicalPlan>;
}


@@ -1 +1,11 @@
use std::sync::Arc;
use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};
pub trait PhysicalOptimizer {
fn optimize_physical_plan(
&self,
ctx: &mut QueryContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>>;
}


@@ -1 +1,17 @@
use std::sync::Arc;
use crate::error::Result;
use crate::plan::{LogicalPlan, PhysicalPlan};
use crate::query_engine::QueryContext;
/// Physical query planner that converts a `LogicalPlan` to a
/// `PhysicalPlan` suitable for execution.
#[async_trait::async_trait]
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
async fn create_physical_plan(
&self,
ctx: &mut QueryContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>>;
}

src/query/src/plan.rs (new file, 44 lines)

@@ -0,0 +1,44 @@
use std::any::Any;
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;
use datatypes::schema::SchemaRef;
use crate::error::Result;
/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner.
///
/// A LogicalPlan represents transforming an input relation (table) to
/// an output relation (table) with a (potentially) different
/// schema. A plan represents a dataflow tree where data flows
/// from leaves up to the root to produce the query result.
#[derive(Clone)]
pub enum LogicalPlan {
DfPlan(DfLogicalPlan),
}
#[async_trait::async_trait]
pub trait PhysicalPlan: Send + Sync + Any {
/// Get the schema of this physical plan
fn schema(&self) -> SchemaRef;
/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>>;
/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of `PhysicalPlan::children()`.
fn with_new_children(
&self,
children: Vec<Arc<dyn PhysicalPlan>>,
) -> Result<Arc<dyn PhysicalPlan>>;
/// Creates an output stream of record batches for the given partition
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
fn as_any(&self) -> &dyn Any;
}


@@ -0,0 +1,36 @@
use std::sync::Arc;
use crate::catalog::CatalogList;
use crate::error::Result;
use crate::plan::LogicalPlan;
mod context;
mod datafusion;
mod state;
pub use context::QueryContext;
use crate::query_engine::datafusion::DatafusionQueryEngine;
#[async_trait::async_trait]
pub trait QueryEngine {
fn name(&self) -> &str;
async fn execute(&self, plan: &LogicalPlan) -> Result<()>;
}
pub struct QueryEngineFactory {
query_engine: Arc<dyn QueryEngine>,
}
impl QueryEngineFactory {
pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
Self {
query_engine: Arc::new(DatafusionQueryEngine::new(catalog_list)),
}
}
}
impl QueryEngineFactory {
pub fn query_engine(&self) -> &Arc<dyn QueryEngine> {
&self.query_engine
}
}
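A usage sketch written as if inside the query crate (the `plan` module is still private here), assuming some `CatalogList` implementation is already available:

use std::sync::Arc;

use crate::catalog::CatalogList;
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::query_engine::QueryEngineFactory;

async fn execute_example(catalog_list: Arc<dyn CatalogList>, plan: LogicalPlan) -> Result<()> {
    // Build the DataFusion-backed engine from the catalog list and run a logical plan.
    let factory = QueryEngineFactory::new(catalog_list);
    factory.query_engine().execute(&plan).await
}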


@@ -0,0 +1,3 @@
/// Query engine execution context
#[derive(Default, Debug)]
pub struct QueryContext;


@@ -0,0 +1,126 @@
use std::sync::Arc;
use snafu::{OptionExt, ResultExt};
use super::{context::QueryContext, state::QueryEngineState};
use crate::{
catalog::CatalogList,
error::{self, Result},
executor::QueryExecutor,
logical_optimizer::LogicalOptimizer,
physical_optimizer::PhysicalOptimizer,
physical_planner::PhysicalPlanner,
plan::{LogicalPlan, PhysicalPlan},
query_engine::datafusion::adapter::PhysicalPlanAdapter,
query_engine::QueryEngine,
};
mod adapter;
pub(crate) struct DatafusionQueryEngine {
state: QueryEngineState,
}
impl DatafusionQueryEngine {
pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
Self {
state: QueryEngineState::new(catalog_list),
}
}
}
#[async_trait::async_trait]
impl QueryEngine for DatafusionQueryEngine {
fn name(&self) -> &str {
"datafusion"
}
async fn execute(&self, plan: &LogicalPlan) -> Result<()> {
let mut ctx = QueryContext::default();
let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?;
let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?;
let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
Ok(self.execute_stream(&ctx, &physical_plan).await?)
}
}
impl LogicalOptimizer for DatafusionQueryEngine {
fn optimize_logical_plan(
&self,
_ctx: &mut QueryContext,
plan: &LogicalPlan,
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::DfPlan(df_plan) => {
let optimized_plan = self
.state
.df_context()
.optimize(df_plan)
.context(error::DatafusionSnafu)?;
Ok(LogicalPlan::DfPlan(optimized_plan))
}
}
}
}
#[async_trait::async_trait]
impl PhysicalPlanner for DatafusionQueryEngine {
async fn create_physical_plan(
&self,
_ctx: &mut QueryContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>> {
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let physical_plan = self
.state
.df_context()
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(physical_plan.schema().into()),
physical_plan,
)))
}
}
}
}
impl PhysicalOptimizer for DatafusionQueryEngine {
fn optimize_physical_plan(
&self,
_ctx: &mut QueryContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>> {
let config = &self.state.df_context().state.lock().config;
let optimizers = &config.physical_optimizers;
let mut new_plan = plan
.as_any()
.downcast_ref::<PhysicalPlanAdapter>()
.context(error::PhysicalPlanDowncastSnafu)?
.df_plan()
.clone();
for optimizer in optimizers {
new_plan = optimizer
.optimize(new_plan, config)
.context(error::DatafusionSnafu)?;
}
Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan)))
}
}
#[async_trait::async_trait]
impl QueryExecutor for DatafusionQueryEngine {
async fn execute_stream(
&self,
_ctx: &QueryContext,
_plan: &Arc<dyn PhysicalPlan>,
) -> Result<()> {
let _runtime = self.state.df_context().runtime_env();
Ok(())
}
}


@@ -0,0 +1,171 @@
use std::any::Any;
use std::fmt::{self, Debug};
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::{
error::Result as DfResult,
physical_plan::{
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
},
};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use crate::error::{self, Result};
use crate::plan::PhysicalPlan;
/// Datafusion ExecutionPlan -> greptime PhysicalPlan
pub struct PhysicalPlanAdapter {
plan: Arc<dyn ExecutionPlan>,
schema: SchemaRef,
}
impl PhysicalPlanAdapter {
pub fn new(schema: SchemaRef, plan: Arc<dyn ExecutionPlan>) -> Self {
Self { schema, plan }
}
#[inline]
pub fn df_plan(&self) -> &Arc<dyn ExecutionPlan> {
&self.plan
}
}
#[async_trait::async_trait]
impl PhysicalPlan for PhysicalPlanAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>> {
let mut plans: Vec<Arc<dyn PhysicalPlan>> = vec![];
for p in self.plan.children() {
let plan = PhysicalPlanAdapter::new(self.schema.clone(), p);
plans.push(Arc::new(plan));
}
plans
}
fn with_new_children(
&self,
children: Vec<Arc<dyn PhysicalPlan>>,
) -> Result<Arc<dyn PhysicalPlan>> {
let mut df_children: Vec<Arc<dyn ExecutionPlan>> = Vec::with_capacity(children.len());
for plan in children {
let p = Arc::new(ExecutionPlanAdapter {
plan,
schema: self.schema.clone(),
});
df_children.push(p);
}
let plan = self
.plan
.with_new_children(df_children)
.context(error::DatafusionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
self.schema.clone(),
plan,
)))
}
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
// FIXME(dennis) runtime
let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
let df_stream = self
.plan
.execute(partition, Arc::new(runtime))
.await
.context(error::DatafusionSnafu)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Greptime PhysicalPlan -> datafusion ExecutionPlan.
struct ExecutionPlanAdapter {
plan: Arc<dyn PhysicalPlan>,
schema: SchemaRef,
}
impl Debug for ExecutionPlanAdapter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//TODO(dennis) better debug info
write!(f, "ExecutionPlan(PlaceHolder)")
}
}
unsafe impl Send for ExecutionPlanAdapter {}
unsafe impl Sync for ExecutionPlanAdapter {}
#[async_trait::async_trait]
impl ExecutionPlan for ExecutionPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> DfSchemaRef {
self.schema.arrow_schema().clone()
}
fn output_partitioning(&self) -> Partitioning {
// FIXME(dennis)
Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// FIXME(dennis)
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// TODO(dennis)
vec![]
}
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let mut gt_children: Vec<Arc<dyn PhysicalPlan>> = Vec::with_capacity(children.len());
for plan in children {
let p = Arc::new(PhysicalPlanAdapter::new(self.schema.clone(), plan));
gt_children.push(p);
}
match self.plan.with_new_children(gt_children) {
Ok(plan) => Ok(Arc::new(ExecutionPlanAdapter {
schema: self.schema.clone(),
plan,
})),
Err(e) => Err(e.into()),
}
}
async fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> DfResult<DfSendableRecordBatchStream> {
match self.plan.execute(partition).await {
Ok(stream) => Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream))),
Err(e) => Err(e.into()),
}
}
fn statistics(&self) -> Statistics {
//TODO(dennis)
Statistics::default()
}
}


@@ -0,0 +1,225 @@
use std::any::Any;
use std::sync::Arc;
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
};
use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider};
use crate::error::{self, Result};
/// Query engine global state
#[derive(Clone)]
pub struct QueryEngineState {
df_context: ExecutionContext,
}
impl QueryEngineState {
pub(crate) fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
let config = ExecutionConfig::new().with_default_catalog_and_schema("greptime", "public");
let df_context = ExecutionContext::with_config(config);
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
catalog_list: catalog_list.clone(),
});
Self { df_context }
}
#[inline]
pub(crate) fn df_context(&self) -> &ExecutionContext {
&self.df_context
}
}
/// Adapters between datafusion and greptime query engine.
struct DfCatalogListAdapter {
catalog_list: Arc<dyn CatalogList>,
}
impl DfCatalogList for DfCatalogListAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn DfCatalogProvider>,
) -> Option<Arc<dyn DfCatalogProvider>> {
let catalog_adapter = Arc::new(CatalogProviderAdapter {
df_catalog_provider: catalog,
});
match self.catalog_list.register_catalog(name, catalog_adapter) {
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
None => None,
}
}
fn catalog_names(&self) -> Vec<String> {
self.catalog_list.catalog_names()
}
fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
match self.catalog_list.catalog(name) {
Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter { catalog_provider })),
None => None,
}
}
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
df_catalog_provider: Arc<dyn DfCatalogProvider>,
}
impl CatalogProvider for CatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.df_catalog_provider.schema_names()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
match self.df_catalog_provider.schema(name) {
Some(df_schema_provider) => {
Some(Arc::new(SchemaProviderAdapter { df_schema_provider }))
}
None => None,
}
}
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
catalog_provider: Arc<dyn CatalogProvider>,
}
impl DfCatalogProvider for DfCatalogProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.catalog_provider.schema_names()
}
fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
match self.catalog_provider.schema(name) {
Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter { schema_provider })),
None => None,
}
}
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
schema_provider: Arc<dyn SchemaProvider>,
}
impl DfSchemaProvider for DfSchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.schema_provider.table_names()
}
fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
match self.schema_provider.table(name) {
Some(table) => Some(Arc::new(DfTableProviderAdapter::new(table))),
None => None,
}
}
fn register_table(
&self,
name: String,
table: Arc<dyn DfTableProvider>,
) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
let table = Arc::new(TableAdapter::new(table));
match self.schema_provider.register_table(name, table) {
Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
match self.schema_provider.deregister_table(name) {
Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
Ok(None) => Ok(None),
Err(e) => Err(e.into()),
}
}
fn table_exist(&self, name: &str) -> bool {
self.schema_provider.table_exist(name)
}
}
/// Datafusion SchemaProvider -> greptime SchemaProvider
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
}
impl SchemaProvider for SchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String> {
self.df_schema_provider.table_names()
}
fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
match self.df_schema_provider.table(name) {
Some(table_provider) => Some(Arc::new(TableAdapter::new(table_provider))),
None => None,
}
}
fn register_table(
&self,
name: String,
table: Arc<dyn Table>,
) -> Result<Option<Arc<dyn Table>>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
match self
.df_schema_provider
.register_table(name, table_provider)
.context(error::DatafusionSnafu)?
{
Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
None => Ok(None),
}
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
match self
.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu)?
{
Some(table) => Ok(Some(Arc::new(TableAdapter::new(table)))),
None => Ok(None),
}
}
fn table_exist(&self, name: &str) -> bool {
self.df_schema_provider.table_exist(name)
}
}


@@ -3,7 +3,19 @@ name = "table"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../datatypes" }
futures = "0.3"
serde = "1.0.136"
snafu = "0.7.0"

src/table/src/error.rs (new file, 18 lines)

@@ -0,0 +1,18 @@
use datafusion::error::DataFusionError;
use snafu::Snafu;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Datafusion error: {}", source))]
Datafusion { source: DataFusionError },
#[snafu(display("Not expected to run ExecutionPlan more than once."))]
ExecuteRepeatedly,
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}


@@ -1,5 +1,5 @@
mod engine;
pub mod error;
pub mod table;
/// Table abstraction.
#[async_trait::async_trait]
pub trait Table: Send + Sync {}
pub use crate::table::{Table, TableRef};

src/table/src/table.rs (new file, 104 lines)

@@ -0,0 +1,104 @@
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use common_query::logical_plan::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::{Schema, SchemaRef};
use crate::error::Result;
pub mod adapter;
pub mod memory;
pub type TableId = u64;
pub type TableVersion = u64;
/// Indicates whether and how a filter expression can be handled by a
/// Table for table scans.
#[derive(Debug, Clone, PartialEq)]
pub enum TableProviderFilterPushDown {
/// The expression cannot be used by the provider.
Unsupported,
/// The expression can be used to help minimise the data retrieved,
/// but the provider cannot guarantee that all returned tuples
/// satisfy the filter. The Filter plan node containing this expression
/// will be preserved.
Inexact,
/// The provider guarantees that all returned data satisfies this
/// filter expression. The Filter plan node containing this expression
/// will be removed.
Exact,
}
/// Indicates the type of this table for metadata/catalog purposes.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TableType {
/// An ordinary physical table.
Base,
/// A non-materialised table that itself uses a query internally to provide data.
View,
/// A transient table.
Temporary,
}
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq, Default)]
pub struct TableIdent {
pub table_id: TableId,
pub version: TableVersion,
}
#[derive(Debug)]
pub struct TableInfo {
pub ident: TableIdent,
pub name: String,
pub desc: Option<String>,
pub meta: TableMeta,
}
#[derive(Clone, Debug)]
pub struct TableMeta {
pub schema: Arc<Schema>,
pub engine: String,
pub engine_options: HashMap<String, String>,
pub options: HashMap<String, String>,
pub created_on: DateTime<Utc>,
}
/// Table abstraction.
#[async_trait::async_trait]
pub trait Table: Send + Sync {
/// Returns the table as [`Any`](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
/// Get a reference to the schema for this table
fn schema(&self) -> SchemaRef;
/// Get the type of this table for metadata/catalog purposes.
fn table_type(&self) -> TableType {
TableType::Base
}
/// Scans the table and returns a SendableRecordBatchStream.
async fn scan(
&self,
projection: &Option<Vec<usize>>,
filters: &[Expr],
// limit can be used to reduce the amount of data scanned
// from the datasource as a performance optimization.
// If set, it contains the number of rows needed by the `LogicalPlan`;
// the datasource should return *at least* this number of rows if available.
limit: Option<usize>,
) -> Result<SendableRecordBatchStream>;
/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
fn supports_filter_pushdown(&self, _filter: &Expr) -> Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}
pub type TableRef = Arc<dyn Table>;
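A scan sketch with a hypothetical helper, written as if inside the table crate: project all columns, push down no filters, and set no row limit:

use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;
use crate::table::TableRef;

/// Scan every column of the table, with no filters and no row limit.
async fn full_scan(table: &TableRef) -> Result<SendableRecordBatchStream> {
    table.scan(&None, &[], None).await
}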


@@ -0,0 +1,308 @@
use core::pin::Pin;
use core::task::{Context, Poll};
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
use std::mem;
use std::sync::{Arc, Mutex};
use arrow::error::{ArrowError, Result as ArrowResult};
use common_query::logical_plan::Expr;
use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
/// Datafusion table adapters
use datafusion::datasource::{
datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
TableType as DfTableType,
};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::{
expressions::PhysicalSortExpr, ExecutionPlan, Partitioning,
RecordBatchStream as DfRecordBatchStream,
SendableRecordBatchStream as DfSendableRecordBatchStream, Statistics,
};
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::SchemaRef as TableSchemaRef;
use datatypes::schema::{Schema, SchemaRef};
use futures::Stream;
use snafu::prelude::*;
use super::{Table, TableProviderFilterPushDown, TableRef, TableType};
use crate::error::{self, Result};
/// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan.
struct ExecutionPlanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
}
impl Debug for ExecutionPlanAdapter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
//TODO(dennis) better debug info
write!(f, "ExecutionPlan(PlaceHolder)")
}
}
#[async_trait::async_trait]
impl ExecutionPlan for ExecutionPlanAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> DfSchemaRef {
self.schema.arrow_schema().clone()
}
fn output_partitioning(&self) -> Partitioning {
// FIXME(dennis)
Partitioning::UnknownPartitioning(1)
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
// FIXME(dennis)
None
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
// TODO(dennis)
vec![]
}
fn with_new_children(
&self,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
// TODO(dennis)
todo!();
}
async fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
) -> DfResult<DfSendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
if stream.is_some() {
let stream = mem::replace(&mut *stream, None);
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap())))
} else {
error::ExecuteRepeatedlySnafu
.fail()
.map_err(|e| DataFusionError::External(Box::new(e)))
}
}
fn statistics(&self) -> Statistics {
//TODO(dennis)
Statistics::default()
}
}
/// Greptime Table -> datafusion TableProvider
pub struct DfTableProviderAdapter {
table: TableRef,
}
impl DfTableProviderAdapter {
pub fn new(table: TableRef) -> Self {
Self { table }
}
}
#[async_trait::async_trait]
impl TableProvider for DfTableProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> DfSchemaRef {
self.table.schema().arrow_schema().clone()
}
fn table_type(&self) -> DfTableType {
match self.table.table_type() {
TableType::Base => DfTableType::Base,
TableType::View => DfTableType::View,
TableType::Temporary => DfTableType::Temporary,
}
}
async fn scan(
&self,
projection: &Option<Vec<usize>>,
filters: &[DfExpr],
limit: Option<usize>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Expr::new).collect();
match self.table.scan(projection, &filters, limit).await {
Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter {
schema: stream.schema(),
stream: Mutex::new(Some(stream)),
})),
Err(e) => Err(e.into()),
}
}
fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
match self
.table
.supports_filter_pushdown(&Expr::new(filter.clone()))
{
Ok(p) => match p {
TableProviderFilterPushDown::Unsupported => {
Ok(DfTableProviderFilterPushDown::Unsupported)
}
TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
},
Err(e) => Err(e.into()),
}
}
}
/// Datafusion TableProvider -> greptime Table
pub struct TableAdapter {
table_provider: Arc<dyn TableProvider>,
}
impl TableAdapter {
pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
Self { table_provider }
}
}
#[async_trait::async_trait]
impl Table for TableAdapter {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> TableSchemaRef {
Arc::new(self.table_provider.schema().into())
}
fn table_type(&self) -> TableType {
match self.table_provider.table_type() {
DfTableType::Base => TableType::Base,
DfTableType::View => TableType::View,
DfTableType::Temporary => TableType::Temporary,
}
}
async fn scan(
&self,
projection: &Option<Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
let filters: Vec<DfExpr> = filters.iter().map(|e| e.df_expr().clone()).collect();
let execution_plan = self
.table_provider
.scan(projection, &filters, limit)
.await
.context(error::DatafusionSnafu)?;
// FIXME(dennis) Partitioning and runtime
let runtime = RuntimeEnv::new(RuntimeConfig::default()).context(error::DatafusionSnafu)?;
let df_stream = execution_plan
.execute(0, Arc::new(runtime))
.await
.context(error::DatafusionSnafu)?;
Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
}
fn supports_filter_pushdown(&self, filter: &Expr) -> Result<TableProviderFilterPushDown> {
match self
.table_provider
.supports_filter_pushdown(filter.df_expr())
.context(error::DatafusionSnafu)?
{
DfTableProviderFilterPushDown::Unsupported => {
Ok(TableProviderFilterPushDown::Unsupported)
}
DfTableProviderFilterPushDown::Inexact => Ok(TableProviderFilterPushDown::Inexact),
DfTableProviderFilterPushDown::Exact => Ok(TableProviderFilterPushDown::Exact),
}
}
}
/// Greptime SendableRecordBatchStream -> datafusion RecordBatchStream
pub struct DfRecordBatchStreamAdapter {
stream: SendableRecordBatchStream,
}
impl DfRecordBatchStreamAdapter {
pub fn new(stream: SendableRecordBatchStream) -> Self {
Self { stream }
}
}
impl DfRecordBatchStream for DfRecordBatchStreamAdapter {
fn schema(&self) -> DfSchemaRef {
self.stream.schema().arrow_schema().clone()
}
}
impl Stream for DfRecordBatchStreamAdapter {
type Item = ArrowResult<DfRecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(recordbatch)) => match recordbatch {
Ok(recordbatch) => Poll::Ready(Some(Ok(recordbatch.df_recordbatch))),
Err(e) => Poll::Ready(Some(Err(ArrowError::External("".to_owned(), Box::new(e))))),
},
Poll::Ready(None) => Poll::Ready(None),
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
/// Datafusion SendableRecordBatchStream to greptime RecordBatchStream
pub struct RecordBatchStreamAdapter {
stream: DfSendableRecordBatchStream,
}
impl RecordBatchStreamAdapter {
pub fn new(stream: DfSendableRecordBatchStream) -> Self {
Self { stream }
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
Arc::new(Schema::new(self.stream.schema()))
}
}
impl Stream for RecordBatchStreamAdapter {
type Item = RecordBatchResult<RecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_recordbatch)) => Poll::Ready(Some(Ok(RecordBatch {
schema: self.schema(),
df_recordbatch: df_recordbatch.context(recordbatch_error::ArrowSnafu)?,
}))),
Poll::Ready(None) => Poll::Ready(None),
}
}
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}


@@ -0,0 +1 @@