Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-06 05:12:54 +00:00
feat: http server and cmd crate etc. (#15)
* feat: adds cmd crate and http server
* feat: impl sql http handler
* feat: convert all arrow array types
* feat: adds query test
* feat: adds test for datanode
* fix: format
* feat: refactor state.rs
* feat: adds collect test
* fix: by code review
* fix: style
src/cmd/Cargo.toml (new file, 13 lines)
@@ -0,0 +1,13 @@
+[package]
+name = "cmd"
+version = "0.1.0"
+edition = "2021"
+
+[[bin]]
+name = "greptime"
+path = "src/bin/greptime.rs"
+
+[dependencies]
+clap = { version = "3.1", features = ["derive"] }
+datanode = { path = "../datanode" }
+tokio = { version = "1.18.0", features = ["full"] }
src/cmd/src/bin/greptime.rs (new file, 20 lines)
@@ -0,0 +1,20 @@
+use clap::Parser;
+use cmd::opts::{GrepTimeOpts, NodeType};
+use datanode::DataNode;
+
+async fn datanode_main(_opts: &GrepTimeOpts) {
+    let data_node = DataNode::new().unwrap();
+
+    if let Err(e) = data_node.start().await {
+        println!("Fail to start data node, error: {:?}", e);
+    }
+}
+
+#[tokio::main]
+async fn main() {
+    let opts = GrepTimeOpts::parse();
+
+    match opts.node_type {
+        NodeType::Data => datanode_main(&opts).await,
+    }
+}
src/cmd/src/lib.rs (new file, 1 line)
@@ -0,0 +1 @@
+pub mod opts;
src/cmd/src/opts.rs (new file, 15 lines)
@@ -0,0 +1,15 @@
+//! greptime commandline options
+use clap::{ArgEnum, Parser};
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ArgEnum)]
+pub enum NodeType {
+    /// Data node
+    Data,
+}
+
+#[derive(Parser, Debug)]
+#[clap(author, version, about, long_about = None)]
+pub struct GrepTimeOpts {
+    #[clap(name = "type", short, long, arg_enum)]
+    pub node_type: NodeType,
+}
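The derive above means the binary is launched as greptime --type data. A minimal sketch of how these options parse, using clap's parse_from to stand in for real command-line arguments (the assertion is illustrative only):

use clap::Parser;
use cmd::opts::{GrepTimeOpts, NodeType};

fn main() {
    // Equivalent to invoking `greptime --type data` from a shell.
    let opts = GrepTimeOpts::parse_from(["greptime", "--type", "data"]);
    assert_eq!(NodeType::Data, opts.node_type);
}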
src/common/recordbatch/Cargo.toml
@@ -6,11 +6,21 @@ edition = "2021"
 [dependencies.arrow]
 package = "arrow2"
 version = "0.10"
-features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
 
 [dependencies]
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
 datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
 datatypes = { path = "../../datatypes" }
 futures = "0.3"
-snafu = "0.7.0"
+paste = "1.0"
+serde = "1.0"
+snafu = "0.7"
+
+[dev-dependencies.arrow]
+package = "arrow2"
+version = "0.10"
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
+
+[dev-dependencies]
+tokio = { version = "1.18", features = ["full"] }
src/common/recordbatch/src/lib.rs
@@ -1,5 +1,6 @@
 pub mod error;
 mod recordbatch;
+pub mod util;
 
 use std::pin::Pin;
src/common/recordbatch/src/recordbatch.rs
@@ -1,10 +1,77 @@
 use std::sync::Arc;
 
+use arrow::array::{
+    BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
+    UInt16Array, UInt32Array, UInt64Array, UInt8Array, Utf8Array,
+};
+use arrow::datatypes::DataType;
 use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
 use datatypes::schema::Schema;
+use paste::paste;
+use serde::ser::SerializeStruct;
+use serde::{Serialize, Serializer};
 
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq)]
 pub struct RecordBatch {
     pub schema: Arc<Schema>,
     pub df_recordbatch: DfRecordBatch,
 }
+
+macro_rules! collect_columns {
+    ($array: ident, $columns: ident, $($data_type: expr), +) => {
+        paste! {
+            match $array.data_type() {
+                $(DataType::$data_type => {
+                    if let Some(array) = $array.as_any().downcast_ref::<[<$data_type Array>]>() {
+                        $columns.push(Column::$data_type(array.values().as_slice()));
+                    }
+                })+,
+                DataType::Utf8 => {
+                    if let Some(array) = $array.as_any().downcast_ref::<Utf8Array<i32>>() {
+                        $columns.push(Column::Utf8(array.values().as_slice()));
+                    }
+                },
+                _ => unimplemented!(),
+            }
+        }
+    };
+}
+
+#[derive(Serialize)]
+enum Column<'a> {
+    Int64(&'a [i64]),
+    Int32(&'a [i32]),
+    Int16(&'a [i16]),
+    Int8(&'a [i8]),
+    UInt64(&'a [u64]),
+    UInt32(&'a [u32]),
+    UInt16(&'a [u16]),
+    UInt8(&'a [u8]),
+    Float64(&'a [f64]),
+    Float32(&'a [f32]),
+    Boolean((&'a [u8], usize, usize)),
+    Utf8(&'a [u8]),
+}
+
+/// TODO(dennis): should be implemented in datatypes
+impl Serialize for RecordBatch {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut s = serializer.serialize_struct("record", 2)?;
+        s.serialize_field("schema", &self.schema.arrow_schema())?;
+
+        let df_columns = self.df_recordbatch.columns();
+        let mut columns: Vec<Column> = Vec::with_capacity(df_columns.len());
+        for array in df_columns {
+            collect_columns!(
+                array, columns, Int64, Int32, Int16, Int8, UInt64, UInt32, UInt16, UInt8, Float64,
+                Float32, Boolean
+            );
+        }
+        s.serialize_field("columns", &columns)?;
+
+        s.end()
+    }
+}
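For a sense of the wire format, a sketch of pushing a single-column batch through the Serialize impl above (it mirrors the fixture used in util.rs below; assumes serde_json is available as a dev-dependency, and the schema JSON in the comment is abbreviated):

use std::sync::Arc;

use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use common_recordbatch::RecordBatch;
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;

fn main() {
    let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        "number",
        DataType::UInt32,
        false,
    )]));
    let df_batch = DfRecordBatch::try_new(
        arrow_schema.clone(),
        vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3]))],
    )
    .unwrap();

    let batch = RecordBatch {
        schema: Arc::new(Schema::new(arrow_schema)),
        df_recordbatch: df_batch,
    };

    // Columns serialize under their enum variant names, roughly:
    // {"schema":{...},"columns":[{"UInt32":[1,2,3]}]}
    println!("{}", serde_json::to_string(&batch).unwrap());
}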
src/common/recordbatch/src/util.rs (new file, 90 lines)
@@ -0,0 +1,90 @@
+use futures::TryStreamExt;
+
+use crate::{error::Result, RecordBatch, SendableRecordBatchStream};
+
+pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
+    stream.try_collect::<Vec<_>>().await
+}
+
+#[cfg(test)]
+mod tests {
+    use std::mem;
+    use std::pin::Pin;
+    use std::sync::Arc;
+
+    use arrow::array::UInt32Array;
+    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+    use datafusion_common::field_util::SchemaExt;
+    use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
+    use datatypes::schema::Schema;
+    use datatypes::schema::SchemaRef;
+    use futures::task::{Context, Poll};
+    use futures::Stream;
+
+    use super::*;
+    use crate::RecordBatchStream;
+
+    struct MockRecordBatchStream {
+        batch: Option<RecordBatch>,
+        schema: SchemaRef,
+    }
+
+    impl RecordBatchStream for MockRecordBatchStream {
+        fn schema(&self) -> SchemaRef {
+            self.schema.clone()
+        }
+    }
+
+    impl Stream for MockRecordBatchStream {
+        type Item = Result<RecordBatch>;
+
+        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+            let batch = mem::replace(&mut self.batch, None);
+
+            if let Some(batch) = batch {
+                Poll::Ready(Some(Ok(batch)))
+            } else {
+                Poll::Ready(None)
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_collect() {
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "number",
+            DataType::UInt32,
+            false,
+        )]));
+        let schema = Arc::new(Schema::new(arrow_schema.clone()));
+
+        let stream = MockRecordBatchStream {
+            schema: schema.clone(),
+            batch: None,
+        };
+
+        let batches = collect(Box::pin(stream)).await.unwrap();
+        assert_eq!(0, batches.len());
+
+        let numbers: Vec<u32> = (0..10).collect();
+        let df_batch = DfRecordBatch::try_new(
+            arrow_schema.clone(),
+            vec![Arc::new(UInt32Array::from_slice(&numbers))],
+        )
+        .unwrap();
+
+        let batch = RecordBatch {
+            schema: schema.clone(),
+            df_recordbatch: df_batch,
+        };
+
+        let stream = MockRecordBatchStream {
+            schema: Arc::new(Schema::new(arrow_schema)),
+            batch: Some(batch.clone()),
+        };
+        let batches = collect(Box::pin(stream)).await.unwrap();
+        assert_eq!(1, batches.len());
+
+        assert_eq!(batch, batches[0]);
+    }
+}
src/datanode/Cargo.toml
@@ -6,4 +6,20 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+axum = "0.5"
+axum-macros = "0.2"
+common-recordbatch = { path = "../common/recordbatch" }
+hyper = { version = "0.14", features = ["full"] }
+query = { path = "../query" }
+serde = "1.0"
+serde_json = "1.0"
+snafu = "0.7"
+table = { path = "../table" }
+tokio = { version = "1.18", features = ["full"] }
+tower = { version = "0.4", features = ["full"] }
+tower-http = { version = "0.3", features = ["full"] }
+
+[dev-dependencies.arrow]
+package = "arrow2"
+version = "0.10"
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
src/datanode/src/datanode.rs (new file, 33 lines)
@@ -0,0 +1,33 @@
+use std::sync::Arc;
+
+use query::catalog::memory;
+use query::catalog::CatalogListRef;
+use snafu::ResultExt;
+
+use crate::error::{QuerySnafu, Result};
+use crate::instance::{Instance, InstanceRef};
+use crate::server::Services;
+
+/// DataNode service.
+pub struct DataNode {
+    services: Services,
+    _catalog_list: CatalogListRef,
+    _instance: InstanceRef,
+}
+
+impl DataNode {
+    pub fn new() -> Result<DataNode> {
+        let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?;
+        let instance = Arc::new(Instance::new(catalog_list.clone()));
+
+        Ok(Self {
+            services: Services::new(instance.clone()),
+            _catalog_list: catalog_list,
+            _instance: instance,
+        })
+    }
+
+    pub async fn start(&self) -> Result<()> {
+        self.services.start().await
+    }
+}
src/datanode/src/error.rs
@@ -1,8 +1,15 @@
+use hyper::Error as HyperError;
+use query::error::Error as QueryError;
 use snafu::Snafu;
 
 /// business error of datanode.
 #[derive(Debug, Snafu)]
-#[snafu(display("DataNode error"))]
-pub struct Error;
+#[snafu(visibility(pub))]
+pub enum Error {
+    #[snafu(display("Query error: {}", source))]
+    Query { source: QueryError },
+    #[snafu(display("Http error: {}", source))]
+    Hyper { source: HyperError },
+}
 
 pub type Result<T> = std::result::Result<T, Error>;
src/datanode/src/instance.rs (new file, 73 lines)
@@ -0,0 +1,73 @@
+use std::sync::Arc;
+
+use query::catalog::CatalogListRef;
+use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
+use snafu::ResultExt;
+
+use crate::error::{QuerySnafu, Result};
+
+// An abstraction to read/write services.
+pub struct Instance {
+    // Query service
+    query_engine: QueryEngineRef,
+    // Catalog list
+    _catalog_list: CatalogListRef,
+}
+
+pub type InstanceRef = Arc<Instance>;
+
+impl Instance {
+    pub fn new(catalog_list: CatalogListRef) -> Self {
+        let factory = QueryEngineFactory::new(catalog_list.clone());
+        let query_engine = factory.query_engine().clone();
+        Self {
+            query_engine,
+            _catalog_list: catalog_list,
+        }
+    }
+
+    pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
+        let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?;
+
+        self.query_engine
+            .execute(&logical_plan)
+            .await
+            .context(QuerySnafu)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::UInt64Array;
+    use common_recordbatch::util;
+    use query::catalog::memory;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_execute_sql() {
+        let catalog_list = memory::new_memory_catalog_list().unwrap();
+
+        let instance = Instance::new(catalog_list);
+
+        let output = instance
+            .execute_sql("select sum(number) from numbers limit 20")
+            .await
+            .unwrap();
+
+        match output {
+            Output::RecordBatch(recordbatch) => {
+                let numbers = util::collect(recordbatch).await.unwrap();
+                let columns = numbers[0].df_recordbatch.columns();
+                assert_eq!(1, columns.len());
+                assert_eq!(columns[0].len(), 1);
+
+                assert_eq!(
+                    *columns[0].as_any().downcast_ref::<UInt64Array>().unwrap(),
+                    UInt64Array::from_slice(&[4950])
+                );
+            }
+            _ => unreachable!(),
+        }
+    }
+}
src/datanode/src/lib.rs
@@ -1,21 +1,7 @@
-mod catalog;
+pub mod datanode;
 mod error;
-mod processors;
-mod rpc;
+mod instance;
+mod server;
 
-use crate::error::Result;
-use crate::rpc::Services;
-
-/// DataNode service.
-pub struct DataNode {
-    services: Services,
-}
-
-impl DataNode {
-    /// Shutdown the datanode service gracefully.
-    pub async fn shutdown(&self) -> Result<()> {
-        self.services.shutdown().await?;
-
-        unimplemented!()
-    }
-}
+pub use crate::datanode::DataNode;
src/datanode/src/rpc.rs (deleted)
@@ -1,10 +0,0 @@
-use crate::error::Result;
-
-/// All rpc services.
-pub struct Services {}
-
-impl Services {
-    pub async fn shutdown(&self) -> Result<()> {
-        unimplemented!()
-    }
-}
src/datanode/src/server.rs (new file, 24 lines)
@@ -0,0 +1,24 @@
+mod grpc;
+mod http;
+
+use http::HttpServer;
+
+use crate::error::Result;
+use crate::instance::InstanceRef;
+
+/// All rpc services.
+pub struct Services {
+    http_server: HttpServer,
+}
+
+impl Services {
+    pub fn new(instance: InstanceRef) -> Self {
+        Self {
+            http_server: HttpServer::new(instance),
+        }
+    }
+
+    pub async fn start(&self) -> Result<()> {
+        self.http_server.start().await
+    }
+}
src/datanode/src/server/grpc.rs (new file, 1 line)
@@ -0,0 +1 @@
+mod processors;
src/datanode/src/server/http.rs (new file, 122 lines)
@@ -0,0 +1,122 @@
+mod handler;
+
+use std::net::SocketAddr;
+use std::time::Duration;
+
+use axum::{
+    error_handling::HandleErrorLayer,
+    response::IntoResponse,
+    response::{Json, Response},
+    routing::get,
+    BoxError, Extension, Router,
+};
+use common_recordbatch::{util, RecordBatch};
+use query::Output;
+use serde::Serialize;
+use snafu::ResultExt;
+use tower::{timeout::TimeoutLayer, ServiceBuilder};
+use tower_http::trace::TraceLayer;
+
+use crate::error::{HyperSnafu, Result};
+use crate::server::InstanceRef;
+
+/// Http server
+pub struct HttpServer {
+    instance: InstanceRef,
+}
+
+#[derive(Serialize)]
+pub enum JsonOutput {
+    AffectedRows(usize),
+    Rows(Vec<RecordBatch>),
+}
+
+#[derive(Serialize)]
+pub struct JsonResponse {
+    success: bool,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    error: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    output: Option<JsonOutput>,
+}
+
+impl IntoResponse for JsonResponse {
+    fn into_response(self) -> Response {
+        Json(self).into_response()
+    }
+}
+
+impl JsonResponse {
+    fn with_error(error: Option<String>) -> Self {
+        JsonResponse {
+            success: false,
+            error,
+            output: None,
+        }
+    }
+    fn with_output(output: Option<JsonOutput>) -> Self {
+        JsonResponse {
+            success: true,
+            error: None,
+            output,
+        }
+    }
+
+    /// Create a json response from query result
+    async fn from_output(output: Result<Output>) -> Self {
+        match output {
+            Ok(Output::AffectedRows(rows)) => {
+                Self::with_output(Some(JsonOutput::AffectedRows(rows)))
+            }
+            Ok(Output::RecordBatch(stream)) => match util::collect(stream).await {
+                Ok(rows) => Self::with_output(Some(JsonOutput::Rows(rows))),
+                Err(e) => Self::with_error(Some(format!("Recordbatch error: {}", e))),
+            },
+            Err(e) => Self::with_error(Some(format!("Query engine output error: {}", e))),
+        }
+    }
+}
+
+async fn shutdown_signal() {
+    // Wait for the CTRL+C signal
+    // It has an issue on chrome: https://github.com/sigp/lighthouse/issues/478
+    tokio::signal::ctrl_c()
+        .await
+        .expect("failed to install CTRL+C signal handler");
+}
+
+impl HttpServer {
+    pub fn new(instance: InstanceRef) -> Self {
+        Self { instance }
+    }
+
+    pub async fn start(&self) -> Result<()> {
+        let app = Router::new().route("/sql", get(handler::sql)).layer(
+            ServiceBuilder::new()
+                .layer(HandleErrorLayer::new(handle_error))
+                .layer(TraceLayer::new_for_http())
+                .layer(Extension(self.instance.clone()))
+                // TODO configure timeout
+                .layer(TimeoutLayer::new(Duration::from_secs(30))),
+        );
+
+        let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
+        // TODO(dennis): log
+        println!("Datanode HTTP server is listening on {}", addr);
+        let server = axum::Server::bind(&addr).serve(app.into_make_service());
+        let graceful = server.with_graceful_shutdown(shutdown_signal());
+
+        graceful.await.context(HyperSnafu)?;
+
+        Ok(())
+    }
+}
+
+/// handle error middleware
+async fn handle_error(err: BoxError) -> Json<JsonResponse> {
+    Json(JsonResponse {
+        success: false,
+        error: Some(format!("Unhandled internal error: {}", err)),
+        output: None,
+    })
+}
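Once a datanode is running (greptime --type data), the route above answers GET /sql with the JsonResponse shape defined earlier. A client sketch using the hyper 0.14 already in the dependency list; the address and query string are assumptions based on the hard-coded bind address above:

use hyper::{body, Client};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumes the server above is listening locally on port 3000.
    let uri = "http://127.0.0.1:3000/sql?sql=select%20sum(number)%20from%20numbers".parse()?;
    let resp = Client::new().get(uri).await?;

    // Expect a body like: {"success":true,"output":{"Rows":[...]}}
    let bytes = body::to_bytes(resp.into_body()).await?;
    println!("{}", String::from_utf8_lossy(&bytes));
    Ok(())
}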
src/datanode/src/server/http/handler.rs (new file, 75 lines)
@@ -0,0 +1,75 @@
+// http handlers
+
+use std::collections::HashMap;
+
+use axum::extract::{Extension, Query};
+
+use crate::instance::InstanceRef;
+use crate::server::http::JsonResponse;
+
+/// Handler to execute sql
+#[axum_macros::debug_handler]
+pub async fn sql(
+    Extension(instance): Extension<InstanceRef>,
+    Query(params): Query<HashMap<String, String>>,
+) -> JsonResponse {
+    if let Some(sql) = params.get("sql") {
+        JsonResponse::from_output(instance.execute_sql(sql).await).await
+    } else {
+        JsonResponse::with_error(Some("sql parameter is required.".to_string()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use query::catalog::memory;
+
+    use super::*;
+    use crate::instance::Instance;
+    use crate::server::http::JsonOutput;
+
+    fn create_params() -> Query<HashMap<String, String>> {
+        let mut map = HashMap::new();
+        map.insert(
+            "sql".to_string(),
+            "select sum(number) from numbers limit 20".to_string(),
+        );
+        Query(map)
+    }
+
+    fn create_extension() -> Extension<InstanceRef> {
+        let catalog_list = memory::new_memory_catalog_list().unwrap();
+        let instance = Arc::new(Instance::new(catalog_list));
+        Extension(instance)
+    }
+
+    #[tokio::test]
+    async fn test_sql_not_provided() {
+        let extension = create_extension();
+
+        let json = sql(extension, Query(HashMap::default())).await;
+        assert!(!json.success);
+        assert_eq!(Some("sql parameter is required.".to_string()), json.error);
+        assert!(json.output.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_sql_output_rows() {
+        let query = create_params();
+        let extension = create_extension();
+
+        let json = sql(extension, query).await;
+        assert!(json.success);
+        assert!(json.error.is_none());
+        assert!(json.output.is_some());
+
+        match json.output.unwrap() {
+            JsonOutput::Rows(rows) => {
+                assert_eq!(1, rows.len());
+            }
+            _ => unreachable!(),
+        }
+    }
+}
src/datatypes/Cargo.toml
@@ -3,8 +3,12 @@ name = "datatypes"
 version = "0.1.0"
 edition = "2021"
 
+[dependencies.arrow]
+package = "arrow2"
+version = "0.10"
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
+
 [dependencies]
-arrow2 = "0.10"
 common-base = { path = "../common/base" }
 paste = "1.0"
 serde = { version = "1.0.136", features = ["derive"] }
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use arrow2::datatypes::DataType as ArrowDataType;
+use arrow::datatypes::DataType as ArrowDataType;
 
 use crate::type_id::LogicalTypeId;
 use crate::value::Value;
src/datatypes/src/lib.rs
@@ -8,7 +8,7 @@ mod types;
 pub mod value;
 pub mod vectors;
 
-use arrow2::array::{BinaryArray, MutableBinaryArray};
+use arrow::array::{BinaryArray, MutableBinaryArray};
 
 pub type LargeBinaryArray = BinaryArray<i64>;
 pub type MutableLargeBinaryArray = MutableBinaryArray<i64>;
src/datatypes/src/schema.rs
@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
-use arrow2::datatypes::Schema as ArrowSchema;
+use arrow::datatypes::Schema as ArrowSchema;
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub struct Schema {
     arrow_schema: Arc<ArrowSchema>,
 }
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use arrow2::datatypes::DataType as ArrowDataType;
+use arrow::datatypes::DataType as ArrowDataType;
 use common_base::bytes::StringBytes;
 
 use crate::data_type::{DataType, DataTypeRef};
@@ -1,4 +1,4 @@
-use arrow2::types::NativeType;
+use arrow::types::NativeType;
 
 use crate::value::Value;
@@ -1,7 +1,7 @@
 use std::marker::PhantomData;
 use std::sync::Arc;
 
-use arrow2::datatypes::DataType as ArrowDataType;
+use arrow::datatypes::DataType as ArrowDataType;
 
 use crate::data_type::{DataType, DataTypeRef};
 use crate::type_id::LogicalTypeId;
src/datatypes/src/vectors.rs
@@ -4,7 +4,7 @@ pub mod primitive;
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow2::array::ArrayRef;
+use arrow::array::ArrayRef;
 pub use binary::*;
 pub use primitive::*;
src/datatypes/src/vectors/binary.rs
@@ -1,9 +1,9 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow2::array::ArrayRef;
-use arrow2::array::BinaryValueIter;
-use arrow2::bitmap::utils::ZipValidity;
+use arrow::array::ArrayRef;
+use arrow::array::BinaryValueIter;
+use arrow::bitmap::utils::ZipValidity;
 
 use crate::data_type::DataTypeRef;
 use crate::scalars::{ScalarVector, ScalarVectorBuilder};
src/datatypes/src/vectors/primitive.rs
@@ -2,8 +2,8 @@ use std::any::Any;
 use std::slice::Iter;
 use std::sync::Arc;
 
-use arrow2::array::{ArrayRef, MutablePrimitiveArray, PrimitiveArray};
-use arrow2::bitmap::utils::ZipValidity;
+use arrow::array::{ArrayRef, MutablePrimitiveArray, PrimitiveArray};
+use arrow::bitmap::utils::ZipValidity;
 
 use crate::data_type::DataTypeRef;
 use crate::scalars::{ScalarVector, ScalarVectorBuilder};
src/query/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 [dependencies.arrow]
 package = "arrow2"
 version = "0.10"
-features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
 
 [dependencies]
 async-trait = "0.1"
@@ -21,5 +21,5 @@ tokio = "1.0"
 sql = { path = "../sql" }
 
 [dev-dependencies]
-tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
+tokio = { version = "1.0", features = ["full"] }
 tokio-stream = "0.1"
src/query/src/catalog.rs
@@ -38,3 +38,9 @@ pub trait CatalogProvider: Sync + Send {
     /// Retrieves a specific schema from the catalog by name, provided it exists.
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
 }
+
+pub type CatalogListRef = Arc<dyn CatalogList>;
+pub type CatalogProviderRef = Arc<dyn CatalogProvider>;
+
+pub const DEFAULT_CATALOG_NAME: &str = "greptime";
+pub const DEFAULT_SCHEMA_NAME: &str = "public";
src/query/src/catalog/memory.rs
@@ -3,14 +3,21 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::sync::RwLock;
 
+use table::table::numbers::NumbersTable;
 use table::TableRef;
 
 use crate::catalog::schema::SchemaProvider;
-use crate::catalog::{CatalogList, CatalogProvider};
+use crate::catalog::{
+    CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
+    DEFAULT_SCHEMA_NAME,
+};
+use crate::error::{ExecutionSnafu, Result};
 
 /// Simple in-memory list of catalogs
 #[derive(Default)]
 pub struct MemoryCatalogList {
-    /// Collection of catalogs containing schemas and ultimately TableProviders
-    pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
+    /// Collection of catalogs containing schemas and ultimately Tables
+    pub catalogs: RwLock<HashMap<String, CatalogProviderRef>>,
 }
 
 impl CatalogList for MemoryCatalogList {
@@ -21,8 +28,8 @@ impl CatalogList for MemoryCatalogList {
     fn register_catalog(
         &self,
         name: String,
-        catalog: Arc<dyn CatalogProvider>,
-    ) -> Option<Arc<dyn CatalogProvider>> {
+        catalog: CatalogProviderRef,
+    ) -> Option<CatalogProviderRef> {
         let mut catalogs = self.catalogs.write().unwrap();
         catalogs.insert(name, catalog)
     }
@@ -32,7 +39,7 @@ impl CatalogList for MemoryCatalogList {
         catalogs.keys().map(|s| s.to_string()).collect()
     }
 
-    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+    fn catalog(&self, name: &str) -> Option<CatalogProviderRef> {
         let catalogs = self.catalogs.read().unwrap();
         catalogs.get(name).cloned()
     }
@@ -82,3 +89,113 @@ impl CatalogProvider for MemoryCatalogProvider {
         schemas.get(name).cloned()
     }
 }
+
+/// Simple in-memory implementation of a schema.
+pub struct MemorySchemaProvider {
+    tables: RwLock<HashMap<String, TableRef>>,
+}
+
+impl MemorySchemaProvider {
+    /// Instantiates a new MemorySchemaProvider with an empty collection of tables.
+    pub fn new() -> Self {
+        Self {
+            tables: RwLock::new(HashMap::new()),
+        }
+    }
+}
+
+impl Default for MemorySchemaProvider {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SchemaProvider for MemorySchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        let tables = self.tables.read().unwrap();
+        tables.keys().cloned().collect()
+    }
+
+    fn table(&self, name: &str) -> Option<TableRef> {
+        let tables = self.tables.read().unwrap();
+        tables.get(name).cloned()
+    }
+
+    fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
+        if self.table_exist(name.as_str()) {
+            return ExecutionSnafu {
+                message: format!("The table {} already exists", name),
+            }
+            .fail();
+        }
+        let mut tables = self.tables.write().unwrap();
+        Ok(tables.insert(name, table))
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
+        let mut tables = self.tables.write().unwrap();
+        Ok(tables.remove(name))
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        let tables = self.tables.read().unwrap();
+        tables.contains_key(name)
+    }
+}
+
+/// Create a memory catalog list contains a numbers table for test
+pub fn new_memory_catalog_list() -> Result<CatalogListRef> {
+    let schema_provider = Arc::new(MemorySchemaProvider::new());
+    let catalog_provider = Arc::new(MemoryCatalogProvider::new());
+    let catalog_list = Arc::new(MemoryCatalogList::default());
+
+    // Add numbers table for test
+    let table = Arc::new(NumbersTable::default());
+    schema_provider.register_table("numbers".to_string(), table)?;
+    catalog_provider.register_schema(DEFAULT_SCHEMA_NAME, schema_provider);
+    catalog_list.register_catalog(DEFAULT_CATALOG_NAME.to_string(), catalog_provider);
+
+    Ok(catalog_list)
+}
+
+#[cfg(test)]
+mod tests {
+    use table::table::numbers::NumbersTable;
+
+    use super::*;
+
+    #[test]
+    fn test_new_memory_catalog_list() {
+        let catalog_list = new_memory_catalog_list().unwrap();
+
+        let catalog_provider = catalog_list.catalog(DEFAULT_CATALOG_NAME).unwrap();
+        let schema_provider = catalog_provider.schema(DEFAULT_SCHEMA_NAME).unwrap();
+
+        let table = schema_provider.table("numbers");
+        assert!(table.is_some());
+
+        assert!(schema_provider.table("not_exists").is_none());
+    }
+
+    #[tokio::test]
+    async fn test_mem_provider() {
+        let provider = MemorySchemaProvider::new();
+        let table_name = "numbers";
+        assert!(!provider.table_exist(table_name));
+        assert!(provider.deregister_table(table_name).unwrap().is_none());
+        let test_table = NumbersTable::default();
+        // register table successfully
+        assert!(provider
+            .register_table(table_name.to_string(), Arc::new(test_table))
+            .unwrap()
+            .is_none());
+        assert!(provider.table_exist(table_name));
+        let other_table = NumbersTable::default();
+        let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
+        assert!(result.is_err());
+    }
+}
src/query/src/catalog/schema.rs
@@ -1,4 +1,5 @@
 use std::any::Any;
+use std::sync::Arc;
 
 use table::TableRef;
 
@@ -33,3 +34,5 @@ pub trait SchemaProvider: Sync + Send {
     /// Otherwise, return true.
     fn table_exist(&self, name: &str) -> bool;
 }
+
+pub type SchemaProviderRef = Arc<dyn SchemaProvider>;
src/query/src/error.rs
@@ -1,6 +1,7 @@
 use common_recordbatch::error::Error as RecordBatchError;
 use datafusion::error::DataFusionError;
 use snafu::Snafu;
+use sql::errors::ParserError;
 
 /// business error of query engine
 #[derive(Debug, Snafu)]
@@ -12,6 +13,15 @@ pub enum Error {
     PhysicalPlanDowncast,
     #[snafu(display("RecordBatch error: {}", source))]
     RecordBatch { source: RecordBatchError },
+    #[snafu(display("Execution error: {}", message))]
+    Execution { message: String },
+    #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))]
+    ParseSql { sql: String, source: ParserError },
+    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
+    Planner {
+        sql: String,
+        source: DataFusionError,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -21,13 +31,3 @@ impl From<Error> for DataFusionError {
         DataFusionError::External(Box::new(e))
     }
 }
-
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub))]
-pub enum PlannerError {
-    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
-    DfPlan {
-        sql: String,
-        source: DataFusionError,
-    },
-}
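The new ParseSql and Planner variants lean on snafu's generated context selectors (ParseSqlSnafu, PlannerSnafu), which is how datafusion.rs further down attaches the offending SQL string to an error. A self-contained sketch of the pattern; ParseIntError here is a stand-in source error, not part of this commit:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))]
    ParseSql {
        sql: String,
        // Stand-in source error type for the sketch.
        source: std::num::ParseIntError,
    },
}

fn parse(sql: &str) -> Result<i32, Error> {
    // `context` wraps the source error in the ParseSql variant via the
    // generated `ParseSqlSnafu` selector, capturing `sql` along the way.
    sql.parse::<i32>().context(ParseSqlSnafu { sql })
}

fn main() {
    let err = parse("not-a-number").unwrap_err();
    println!("{}", err);
}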
src/query/src/lib.rs
@@ -8,3 +8,7 @@ pub mod physical_planner;
 pub mod plan;
 pub mod planner;
 pub mod query_engine;
+
+pub use crate::query_engine::{
+    Output, QueryContext, QueryEngine, QueryEngineFactory, QueryEngineRef,
+};
src/query/src/plan.rs
@@ -16,7 +16,7 @@ use crate::executor::Runtime;
 /// an output relation (table) with a (potentially) different
 /// schema. A plan represents a dataflow tree where data flows
 /// from leaves up to the root to produce the query result.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub enum LogicalPlan {
     DfPlan(DfLogicalPlan),
 }
src/query/src/planner.rs
@@ -1,23 +1,31 @@
 use std::sync::Arc;
 
+use arrow::datatypes::DataType;
+use datafusion::catalog::TableReference;
+use datafusion::datasource::TableProvider;
+use datafusion::physical_plan::udaf::AggregateUDF;
+use datafusion::physical_plan::udf::ScalarUDF;
 use datafusion::sql::planner::{ContextProvider, SqlToRel};
 use snafu::ResultExt;
 use sql::statements::query::Query;
 use sql::statements::statement::Statement;
+use table::table::adapter::DfTableProviderAdapter;
 
-use crate::error;
-use crate::error::PlannerError;
-use crate::plan::LogicalPlan;
+use crate::{
+    catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME},
+    error::{PlannerSnafu, Result},
+    plan::LogicalPlan,
+};
 
-pub trait Planner {
+pub trait Planner: Send + Sync {
     fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
 }
 
-type Result<T> = std::result::Result<T, PlannerError>;
-
 pub struct DfPlanner<'a, S: ContextProvider> {
     sql_to_rel: SqlToRel<'a, S>,
 }
 
-impl<'a, S: ContextProvider> DfPlanner<'a, S> {
+impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
     /// Creates a DataFusion planner instance
     pub fn new(schema_provider: &'a S) -> Self {
         let rel = SqlToRel::new(schema_provider);
@@ -31,7 +39,7 @@ impl<'a, S: ContextProvider> DfPlanner<'a, S> {
         let result = self
             .sql_to_rel
             .query_to_plan(query.inner)
-            .context(error::DfPlanSnafu { sql })?;
+            .context(PlannerSnafu { sql })?;
 
         Ok(LogicalPlan::DfPlan(result))
     }
@@ -39,7 +47,7 @@
 
 impl<'a, S> Planner for DfPlanner<'a, S>
 where
-    S: ContextProvider,
+    S: ContextProvider + Send + Sync,
 {
     /// Converts statement to logical plan using datafusion planner
     fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
@@ -54,3 +62,48 @@ where
         }
     }
 }
+
+pub(crate) struct DfContextProviderAdapter<'a> {
+    catalog_list: &'a CatalogListRef,
+}
+
+impl<'a> DfContextProviderAdapter<'a> {
+    pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self {
+        Self { catalog_list }
+    }
+}
+
+impl<'a> ContextProvider for DfContextProviderAdapter<'a> {
+    fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
+        let (catalog, schema, table) = match name {
+            TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table),
+            TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table),
+            TableReference::Full {
+                catalog,
+                schema,
+                table,
+            } => (catalog, schema, table),
+        };
+
+        self.catalog_list
+            .catalog(catalog)
+            .and_then(|catalog_provider| catalog_provider.schema(schema))
+            .and_then(|schema_provider| schema_provider.table(table))
+            .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
+    }
+
+    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
+        // TODO(dennis)
+        None
+    }
+
+    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
+        // TODO(dennis)
+        None
+    }
+
+    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
+        // TODO(dennis)
+        None
+    }
+}
src/query/src/query_engine.rs
@@ -1,22 +1,28 @@
-mod context;
-mod datafusion;
-mod state;
-
 use std::sync::Arc;
 
 use common_recordbatch::SendableRecordBatchStream;
-pub use context::QueryContext;
 
 use crate::catalog::CatalogList;
 use crate::error::Result;
 use crate::plan::LogicalPlan;
 
+mod context;
+mod datafusion;
+mod state;
+pub use context::QueryContext;
+
+use crate::query_engine::datafusion::DatafusionQueryEngine;
+
+/// Sql output
+pub enum Output {
+    AffectedRows(usize),
+    RecordBatch(SendableRecordBatchStream),
+}
+
 #[async_trait::async_trait]
-pub trait QueryEngine {
+pub trait QueryEngine: Send + Sync {
     fn name(&self) -> &str;
-    async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream>;
+    fn sql_to_plan(&self, sql: &str) -> Result<LogicalPlan>;
+    async fn execute(&self, plan: &LogicalPlan) -> Result<Output>;
 }
 
 pub struct QueryEngineFactory {
@@ -36,3 +42,21 @@ impl QueryEngineFactory {
         &self.query_engine
     }
 }
+
+pub type QueryEngineRef = Arc<dyn QueryEngine>;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::catalog::memory;
+
+    #[test]
+    fn test_query_engine_factory() {
+        let catalog_list = memory::new_memory_catalog_list().unwrap();
+        let factory = QueryEngineFactory::new(catalog_list);
+
+        let engine = factory.query_engine();
+
+        assert_eq!("datafusion", engine.name());
+    }
+}
src/query/src/query_engine/datafusion.rs
@@ -1,30 +1,33 @@
-mod adapter;
-
 use std::sync::Arc;
 
 use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
 use snafu::{OptionExt, ResultExt};
+use sql::{dialect::GenericDialect, parser::ParserContext};
 
 use super::{context::QueryContext, state::QueryEngineState};
 use crate::{
-    catalog::CatalogList,
-    error::{self, Result},
+    catalog::CatalogListRef,
+    error::{self, ParseSqlSnafu, Result},
     executor::QueryExecutor,
     logical_optimizer::LogicalOptimizer,
     physical_optimizer::PhysicalOptimizer,
    physical_planner::PhysicalPlanner,
     plan::{LogicalPlan, PhysicalPlan},
+    planner::{DfContextProviderAdapter, DfPlanner, Planner},
    query_engine::datafusion::adapter::PhysicalPlanAdapter,
-    query_engine::QueryEngine,
+    query_engine::{Output, QueryEngine},
 };
+mod adapter;
 
 pub(crate) struct DatafusionQueryEngine {
     state: QueryEngineState,
 }
 
 impl DatafusionQueryEngine {
-    pub fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
+    pub fn new(catalog_list: CatalogListRef) -> Self {
         Self {
-            state: QueryEngineState::new(catalog_list),
+            state: QueryEngineState::new(catalog_list.clone()),
         }
     }
 }
@@ -34,13 +37,28 @@ impl QueryEngine for DatafusionQueryEngine {
     fn name(&self) -> &str {
         "datafusion"
     }
-    async fn execute(&self, plan: &LogicalPlan) -> Result<SendableRecordBatchStream> {
+
+    fn sql_to_plan(&self, sql: &str) -> Result<LogicalPlan> {
+        let context_provider = DfContextProviderAdapter::new(self.state.catalog_list());
+        let planner = DfPlanner::new(&context_provider);
+        let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
+            .with_context(|_| ParseSqlSnafu {
+                sql: sql.to_string(),
+            })?;
+        // TODO(dennis): supports multi statement in one sql?
+        assert!(1 == statement.len());
+        planner.statement_to_plan(statement.remove(0))
+    }
+
+    async fn execute(&self, plan: &LogicalPlan) -> Result<Output> {
         let mut ctx = QueryContext::new(self.state.clone());
         let logical_plan = self.optimize_logical_plan(&mut ctx, plan)?;
         let physical_plan = self.create_physical_plan(&mut ctx, &logical_plan).await?;
         let physical_plan = self.optimize_physical_plan(&mut ctx, physical_plan)?;
 
-        Ok(self.execute_stream(&ctx, &physical_plan).await?)
+        Ok(Output::RecordBatch(
+            self.execute_stream(&ctx, &physical_plan).await?,
+        ))
     }
 }
@@ -130,3 +148,69 @@ impl QueryExecutor for DatafusionQueryEngine {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use arrow::array::UInt64Array;
+    use common_recordbatch::util;
+    use datafusion::field_util::FieldExt;
+    use datafusion::field_util::SchemaExt;
+
+    use crate::catalog::memory;
+    use crate::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
+
+    fn create_test_engine() -> QueryEngineRef {
+        let catalog_list = memory::new_memory_catalog_list().unwrap();
+        let factory = QueryEngineFactory::new(catalog_list);
+        factory.query_engine().clone()
+    }
+
+    #[test]
+    fn test_sql_to_plan() {
+        let engine = create_test_engine();
+        let sql = "select sum(number) from numbers limit 20";
+
+        let plan = engine.sql_to_plan(sql).unwrap();
+
+        println!("{:?}", plan);
+        assert_eq!(
+            format!("{:?}", plan),
+            r#"DfPlan(Limit: 20
+  Projection: #SUM(numbers.number)
+    Aggregate: groupBy=[[]], aggr=[[SUM(#numbers.number)]]
+      TableScan: numbers projection=None)"#
+        );
+    }
+
+    #[tokio::test]
+    async fn test_execute() {
+        let engine = create_test_engine();
+        let sql = "select sum(number) from numbers limit 20";
+
+        let plan = engine.sql_to_plan(sql).unwrap();
+        let output = engine.execute(&plan).await.unwrap();
+
+        match output {
+            Output::RecordBatch(recordbatch) => {
+                let numbers = util::collect(recordbatch).await.unwrap();
+                assert_eq!(1, numbers.len());
+                assert_eq!(numbers[0].df_recordbatch.num_columns(), 1);
+                assert_eq!(1, numbers[0].schema.arrow_schema().fields().len());
+                assert_eq!(
+                    "SUM(numbers.number)",
+                    numbers[0].schema.arrow_schema().field(0).name()
+                );
+
+                let columns = numbers[0].df_recordbatch.columns();
+                assert_eq!(1, columns.len());
+                assert_eq!(columns[0].len(), 1);
+
+                assert_eq!(
+                    *columns[0].as_any().downcast_ref::<UInt64Array>().unwrap(),
+                    UInt64Array::from_slice(&[4950])
+                );
+            }
+            _ => unreachable!(),
+        }
    }
+}
src/query/src/query_engine/state.rs
@@ -16,18 +16,18 @@ use table::{
     Table,
 };
 
-use crate::catalog::{schema::SchemaProvider, CatalogList, CatalogProvider};
+use crate::catalog::{
+    schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME,
+    DEFAULT_SCHEMA_NAME,
+};
 use crate::error::{self, Result};
 use crate::executor::Runtime;
 
-const DEFAULT_CATALOG_NAME: &str = "greptime";
-const DEFAULT_SCHEMA_NAME: &str = "public";
-
 /// Query engine global state
 #[derive(Clone)]
 pub struct QueryEngineState {
     df_context: ExecutionContext,
-    catalog_list: Arc<dyn CatalogList>,
+    catalog_list: CatalogListRef,
 }
 
 impl fmt::Debug for QueryEngineState {
@@ -38,7 +38,7 @@ impl fmt::Debug for QueryEngineState {
 }
 
 impl QueryEngineState {
-    pub(crate) fn new(catalog_list: Arc<dyn CatalogList>) -> Self {
+    pub(crate) fn new(catalog_list: CatalogListRef) -> Self {
         let config = ExecutionConfig::new()
             .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
         let df_context = ExecutionContext::with_config(config);
@@ -54,6 +54,11 @@
         }
     }
 
+    #[inline]
+    pub(crate) fn catalog_list(&self) -> &CatalogListRef {
+        &self.catalog_list
+    }
+
     #[inline]
     pub(crate) fn df_context(&self) -> &ExecutionContext {
         &self.df_context
@@ -63,19 +68,12 @@
     pub(crate) fn runtime(&self) -> Runtime {
         self.df_context.runtime_env().into()
    }
-
-    #[allow(dead_code)]
-    pub(crate) fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
-        self.catalog_list
-            .catalog(DEFAULT_CATALOG_NAME)
-            .and_then(|c| c.schema(schema_name))
-    }
 }
 
 /// Adapters between datafusion and greptime query engine.
 struct DfCatalogListAdapter {
     runtime: Arc<RuntimeEnv>,
-    catalog_list: Arc<dyn CatalogList>,
+    catalog_list: CatalogListRef,
 }
 
 impl DfCatalogList for DfCatalogListAdapter {
@@ -92,13 +90,14 @@ impl DfCatalogList for DfCatalogListAdapter {
             df_cataglog_provider: catalog,
             runtime: self.runtime.clone(),
         });
-        match self.catalog_list.register_catalog(name, catalog_adapter) {
-            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
-                catalog_provider,
-                runtime: self.runtime.clone(),
-            })),
-            None => None,
-        }
+        self.catalog_list
+            .register_catalog(name, catalog_adapter)
+            .map(|catalog_provider| {
+                Arc::new(DfCatalogProviderAdapter {
+                    catalog_provider,
+                    runtime: self.runtime.clone(),
+                }) as _
+            })
     }
 
     fn catalog_names(&self) -> Vec<String> {
@@ -106,13 +105,12 @@ impl DfCatalogList for DfCatalogListAdapter {
    }
 
     fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
-        match self.catalog_list.catalog(name) {
-            Some(catalog_provider) => Some(Arc::new(DfCatalogProviderAdapter {
-                catalog_provider,
-                runtime: self.runtime.clone(),
-            })),
-            None => None,
-        }
+        self.catalog_list.catalog(name).map(|catalog_provider| {
+            Arc::new(DfCatalogProviderAdapter {
+                catalog_provider,
+                runtime: self.runtime.clone(),
+            }) as _
+        })
     }
 }
@@ -132,13 +130,14 @@ impl CatalogProvider for CatalogProviderAdapter {
     }
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
-        match self.df_cataglog_provider.schema(name) {
-            Some(df_schema_provider) => Some(Arc::new(SchemaProviderAdapter {
-                df_schema_provider,
-                runtime: self.runtime.clone(),
-            })),
-            None => None,
-        }
+        self.df_cataglog_provider
+            .schema(name)
+            .map(|df_schema_provider| {
+                Arc::new(SchemaProviderAdapter {
+                    df_schema_provider,
+                    runtime: self.runtime.clone(),
+                }) as _
+            })
     }
 }
@@ -158,13 +157,12 @@ impl DfCatalogProvider for DfCatalogProviderAdapter {
     }
 
     fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
-        match self.catalog_provider.schema(name) {
-            Some(schema_provider) => Some(Arc::new(DfSchemaProviderAdapter {
-                schema_provider,
-                runtime: self.runtime.clone(),
-            })),
-            None => None,
-        }
+        self.catalog_provider.schema(name).map(|schema_provider| {
+            Arc::new(DfSchemaProviderAdapter {
+                schema_provider,
+                runtime: self.runtime.clone(),
+            }) as _
+        })
     }
 }
@@ -184,10 +182,9 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
-        match self.schema_provider.table(name) {
-            Some(table) => Some(Arc::new(DfTableProviderAdapter::new(table))),
-            None => None,
-        }
+        self.schema_provider
+            .table(name)
+            .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
     }
 
     fn register_table(
@@ -233,13 +230,9 @@ impl SchemaProvider for SchemaProviderAdapter {
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
-        match self.df_schema_provider.table(name) {
-            Some(table_provider) => Some(Arc::new(TableAdapter::new(
-                table_provider,
-                self.runtime.clone(),
-            ))),
-            None => None,
-        }
+        self.df_schema_provider.table(name).map(|table_provider| {
+            Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _
+        })
     }
 
     fn register_table(
@@ -248,31 +241,19 @@
         table: Arc<dyn Table>,
     ) -> Result<Option<Arc<dyn Table>>> {
         let table_provider = Arc::new(DfTableProviderAdapter::new(table));
-        match self
+        Ok(self
             .df_schema_provider
             .register_table(name, table_provider)
             .context(error::DatafusionSnafu)?
-        {
-            Some(table) => Ok(Some(Arc::new(TableAdapter::new(
-                table,
-                self.runtime.clone(),
-            )))),
-            None => Ok(None),
-        }
+            .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
     }
 
     fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
-        match self
+        Ok(self
            .df_schema_provider
            .deregister_table(name)
            .context(error::DatafusionSnafu)?
-        {
-            Some(table) => Ok(Some(Arc::new(TableAdapter::new(
-                table,
-                self.runtime.clone(),
-            )))),
-            None => Ok(None),
-        }
+            .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
    }
 
     fn table_exist(&self, name: &str) -> bool {
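A pattern worth calling out in this hunk: every match x { Some(v) => Some(Arc::new(...)), None => None } collapses into Option::map, and the trailing `as _` is what coerces the concrete Arc<Adapter> into the Arc<dyn Trait> the signature expects. A standalone sketch of that coercion (the trait and types here are illustrative, not from the codebase):

use std::sync::Arc;

trait Greet {
    fn hi(&self) -> String;
}

struct En;

impl Greet for En {
    fn hi(&self) -> String {
        "hi".to_string()
    }
}

// `as _` lets the compiler infer the unsizing cast from Arc<En> to
// Arc<dyn Greet> inside the closure, mirroring the adapter code above.
fn wrap(x: Option<En>) -> Option<Arc<dyn Greet>> {
    x.map(|e| Arc::new(e) as _)
}

fn main() {
    assert_eq!("hi", wrap(Some(En)).unwrap().hi());
    assert!(wrap(None).is_none());
}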
@@ -1,22 +1,20 @@
 use std::sync::Arc;
 
 use arrow::array::UInt32Array;
-use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
+use common_recordbatch::util;
 use datafusion::field_util::FieldExt;
 use datafusion::field_util::SchemaExt;
 use datafusion::logical_plan::LogicalPlanBuilder;
-use futures_util::stream::TryStreamExt;
-use query::catalog::memory::MemoryCatalogList;
-use query::error::{RecordBatchSnafu, Result};
+use query::catalog::memory;
+use query::error::Result;
 use query::plan::LogicalPlan;
-use query::query_engine::QueryEngineFactory;
-use snafu::ResultExt;
+use query::query_engine::{Output, QueryEngineFactory};
 use table::table::adapter::DfTableProviderAdapter;
 use table::table::numbers::NumbersTable;
 
 #[tokio::test]
 async fn test_datafusion_query_engine() -> Result<()> {
-    let catalog_list = Arc::new(MemoryCatalogList::default());
+    let catalog_list = memory::new_memory_catalog_list()?;
     let factory = QueryEngineFactory::new(catalog_list);
     let engine = factory.query_engine();
@@ -32,9 +30,14 @@ async fn test_datafusion_query_engine() -> Result<()> {
         .unwrap(),
     );
 
-    let ret = engine.execute(&plan).await;
+    let output = engine.execute(&plan).await?;
 
-    let numbers = collect(ret.unwrap()).await.unwrap();
+    let recordbatch = match output {
+        Output::RecordBatch(recordbach) => recordbach,
+        _ => unreachable!(),
+    };
+
+    let numbers = util::collect(recordbatch).await.unwrap();
 
     assert_eq!(1, numbers.len());
     assert_eq!(numbers[0].df_recordbatch.num_columns(), 1);
@@ -52,10 +55,3 @@ async fn test_datafusion_query_engine() -> Result<()> {
 
     Ok(())
 }
-
-pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
-    stream
-        .try_collect::<Vec<_>>()
-        .await
-        .context(RecordBatchSnafu)
-}
src/sql/src/dialect.rs
@@ -1 +1,3 @@
 // todo(hl) wrap sqlparser dialects
+
+pub use sqlparser::dialect::{Dialect, GenericDialect};
src/table/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
 [dependencies.arrow]
 package = "arrow2"
 version = "0.10"
-features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]
+features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
 
 [dependencies]
 async-trait = "0.1"
src/table/src/table.rs
@@ -1,3 +1,6 @@
+pub mod adapter;
+pub mod numbers;
+
 use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -10,9 +13,6 @@ use datatypes::schema::{Schema, SchemaRef};
 
 use crate::error::Result;
 
-pub mod adapter;
-pub mod numbers;
-
 pub type TableId = u64;
 pub type TableVersion = u64;
src/table/src/table/numbers.rs
@@ -16,6 +16,7 @@ use crate::error::Result;
 use crate::table::{Expr, Table};
 
 /// numbers table for test
+#[derive(Debug, Clone, Eq, PartialEq)]
 pub struct NumbersTable {
     schema: SchemaRef,
 }