Merge pull request #14 from GrepTimeTeam/refactor-error

refactor: Refactor error usage
This commit is contained in:
dennis zhuang
2022-05-09 14:17:52 +08:00
committed by GitHub
38 changed files with 1339 additions and 502 deletions

57
Cargo.lock generated
View File

@@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "addr2line"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
@@ -223,6 +232,21 @@ dependencies = [
"syn",
]
[[package]]
name = "backtrace"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61"
dependencies = [
"addr2line",
"cc",
"cfg-if",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.13.0"
@@ -433,6 +457,13 @@ dependencies = [
name = "common-base"
version = "0.1.0"
[[package]]
name = "common-error"
version = "0.1.0"
dependencies = [
"snafu",
]
[[package]]
name = "common-query"
version = "0.1.0"
@@ -620,6 +651,7 @@ dependencies = [
"arrow2",
"axum",
"axum-macros",
"common-error",
"common-recordbatch",
"hyper",
"query",
@@ -818,6 +850,12 @@ dependencies = [
"wasi 0.10.2+wasi-snapshot-preview1",
]
[[package]]
name = "gimli"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
[[package]]
name = "h2"
version = "0.3.13"
@@ -1300,6 +1338,15 @@ dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456"
dependencies = [
"memchr",
]
[[package]]
name = "object-store"
version = "0.1.0"
@@ -1486,6 +1533,7 @@ version = "0.1.0"
dependencies = [
"arrow2",
"async-trait",
"common-error",
"common-recordbatch",
"datafusion",
"datatypes",
@@ -1578,6 +1626,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
[[package]]
name = "rustversion"
version = "1.0.6"
@@ -1684,6 +1738,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5"
dependencies = [
"backtrace",
"doc-comment",
"snafu-derive",
]
@@ -1720,6 +1775,7 @@ dependencies = [
name = "sql"
version = "0.1.0"
dependencies = [
"common-error",
"snafu",
"sqlparser",
]
@@ -1815,6 +1871,7 @@ dependencies = [
"arrow2",
"async-trait",
"chrono",
"common-error",
"common-query",
"common-recordbatch",
"datafusion",

View File

@@ -1,6 +1,7 @@
[workspace]
members = [
"src/common/base",
"src/common/error",
"src/common/query",
"src/common/recordbatch",
"src/cmd",

View File

@@ -0,0 +1,9 @@
[package]
name = "common-error"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
snafu = { version = "0.7", features = ["backtraces"] }

185
src/common/error/src/ext.rs Normal file
View File

@@ -0,0 +1,185 @@
use std::any::Any;
use crate::status_code::StatusCode;
/// Extension to [`Error`](std::error::Error) in std.
///
/// On top of the standard error trait it adds a [StatusCode] mapping, optional access to a
/// backtrace and [Any](std::any::Any)-based downcasting.
pub trait ErrorExt: std::error::Error {
/// Map this error to [StatusCode].
///
/// The default implementation returns [StatusCode::Unknown].
fn status_code(&self) -> StatusCode {
StatusCode::Unknown
}
/// Get the reference to the backtrace of this error, `None` if the backtrace is unavailable.
// The `_opt` suffix avoids confusion with the similarly named method in `std::error::Error`;
// once backtrace support in std is stable, this method can be deprecated.
fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace>;
/// Returns the error as [Any](std::any::Any) so that it can be
/// downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;
}
/// A helper macro to define an opaque boxed error based on errors that implement the
/// [ErrorExt] trait.
///
/// `define_opaque_error!(Name)` generates a public struct `Name` that boxes any
/// `ErrorExt + Send + Sync + 'static` value and forwards `Display`, `source()`,
/// `status_code()`, `backtrace_opt()` and `as_any()` to it. Its `Debug` output is the
/// pretty format produced by `DebugFormat`.
#[macro_export]
macro_rules! define_opaque_error {
    ($Error:ident) => {
        /// An error behaves like `Box<dyn Error>`.
        ///
        /// Define this error as a new type instead of using `Box<dyn Error>` directly so we can
        /// implement more methods or traits for it.
        pub struct $Error {
            inner: Box<dyn $crate::ext::ErrorExt + Send + Sync>,
        }

        impl $Error {
            /// Wraps `err` into the opaque error.
            pub fn new<E: $crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
                Self {
                    inner: Box::new(err),
                }
            }
        }

        impl std::fmt::Debug for $Error {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Use the pretty debug format of inner error for opaque error.
                let debug_format = $crate::format::DebugFormat::new(&*self.inner);
                // Call the trait method by its full path: method-call syntax would only
                // resolve if a `fmt` trait happened to be imported where the macro is expanded.
                std::fmt::Debug::fmt(&debug_format, f)
            }
        }

        impl std::fmt::Display for $Error {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", self.inner)
            }
        }

        impl std::error::Error for $Error {
            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
                // The opaque error is transparent: report the source of the wrapped error.
                self.inner.source()
            }
        }

        impl $crate::ext::ErrorExt for $Error {
            fn status_code(&self) -> $crate::status_code::StatusCode {
                self.inner.status_code()
            }

            fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> {
                self.inner.backtrace_opt()
            }

            fn as_any(&self) -> &dyn std::any::Any {
                // Expose the wrapped error so callers can downcast to the concrete type.
                self.inner.as_any()
            }
        }

        // Implement ErrorCompat for this opaque error so the backtrace is also available
        // via `ErrorCompat::backtrace()`.
        impl $crate::snafu::ErrorCompat for $Error {
            fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> {
                self.inner.backtrace_opt()
            }
        }
    };
}
#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use snafu::{prelude::*, Backtrace, ErrorCompat};

    use super::*;
    use crate::prelude::*;

    define_opaque_error!(Error);

    // Concrete error wrapped by the opaque `Error` in these tests.
    #[derive(Debug, Snafu)]
    enum InnerError {
        #[snafu(display("This is a leaf error, val: {}", val))]
        Leaf { val: i32, backtrace: Backtrace },

        #[snafu(display("This is an internal error"))]
        Internal {
            source: std::io::Error,
            backtrace: Backtrace,
        },
    }

    impl ErrorExt for InnerError {
        fn status_code(&self) -> StatusCode {
            StatusCode::Internal
        }

        fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
            ErrorCompat::backtrace(self)
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    impl From<InnerError> for Error {
        fn from(err: InnerError) -> Self {
            Error::new(err)
        }
    }

    /// Always fails with `InnerError::Leaf`.
    fn throw_leaf() -> std::result::Result<(), InnerError> {
        LeafSnafu { val: 10 }.fail()
    }

    /// Always fails with a plain I/O error.
    fn throw_io() -> std::result::Result<(), std::io::Error> {
        let io_err = std::io::Error::new(std::io::ErrorKind::Other, "oh no!");
        Err(io_err)
    }

    /// Always fails with `InnerError::Internal` caused by the I/O error.
    fn throw_internal() -> std::result::Result<(), InnerError> {
        throw_io().context(InternalSnafu)
    }

    /// Runs the checks shared by every opaque error case and returns its debug output.
    fn check_opaque_error(err: &Error, expected_display: &str) -> String {
        let debug_msg = format!("{:?}", err);
        assert!(debug_msg.contains("\nBacktrace:\n"));
        let fmt_msg = format!("{:?}", DebugFormat::new(err));
        assert_eq!(debug_msg, fmt_msg);
        assert!(ErrorCompat::backtrace(err).is_some());
        assert!(err.backtrace_opt().is_some());
        assert_eq!(expected_display, err.to_string());
        assert_eq!(StatusCode::Internal, err.status_code());
        err.as_any().downcast_ref::<InnerError>().unwrap();
        debug_msg
    }

    #[test]
    fn test_inner_error() {
        let leaf_err = throw_leaf().err().unwrap();
        assert!(leaf_err.backtrace_opt().is_some());
        assert!(leaf_err.source().is_none());

        let internal_err = throw_internal().err().unwrap();
        assert!(internal_err.backtrace_opt().is_some());
        assert!(internal_err.source().is_some());
    }

    #[test]
    fn test_opaque_error() {
        // Test leaf error.
        let leaf: Error = throw_leaf().map_err(Error::from).err().unwrap();
        check_opaque_error(&leaf, "This is a leaf error, val: 10");

        // Test internal error.
        let internal: Error = throw_internal().map_err(Error::from).err().unwrap();
        let debug_msg = check_opaque_error(&internal, "This is an internal error");
        assert!(debug_msg.contains("Caused by"));
    }
}

View File

@@ -0,0 +1,109 @@
use std::fmt;
use crate::ext::ErrorExt;
/// Wrapper whose `Debug` output is a pretty rendering of the wrapped error, including its
/// source and backtrace.
pub struct DebugFormat<'a, E: ?Sized>(&'a E);

impl<'a, E: ErrorExt + ?Sized> DebugFormat<'a, E> {
    /// Builds a formatter that borrows `err`.
    pub fn new(err: &'a E) -> Self {
        DebugFormat(err)
    }
}
impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
    /// Writes `<display>.`, then ` Caused by: <source debug>` when a source exists, then
    /// `\nBacktrace:\n<backtrace>` when a backtrace is available.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let err = self.0;
        write!(f, "{}.", err)?;

        // Source error uses debug format for more verbose info.
        err.source()
            .map_or(Ok(()), |cause| write!(f, " Caused by: {:?}", cause))?;

        // Add a newline to separate causes and backtrace.
        err.backtrace_opt()
            .map_or(Ok(()), |backtrace| write!(f, "\nBacktrace:\n{}", backtrace))
    }
}
#[cfg(test)]
mod tests {
    use std::any::Any;

    use snafu::{prelude::*, Backtrace, GenerateImplicitData};

    use super::*;

    // Error without source or backtrace.
    #[derive(Debug, Snafu)]
    #[snafu(display("This is a leaf error"))]
    struct Leaf;

    impl ErrorExt for Leaf {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            None
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // Error without source but carrying a backtrace.
    #[derive(Debug, Snafu)]
    #[snafu(display("This is a leaf with backtrace"))]
    struct LeafWithBacktrace {
        backtrace: Backtrace,
    }

    impl ErrorExt for LeafWithBacktrace {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            Some(&self.backtrace)
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    // Error with both a source and a backtrace.
    #[derive(Debug, Snafu)]
    #[snafu(display("Internal error"))]
    struct Internal {
        #[snafu(source)]
        source: Leaf,
        backtrace: Backtrace,
    }

    impl ErrorExt for Internal {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            Some(&self.backtrace)
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[test]
    fn test_debug_format() {
        let leaf = Leaf;
        let rendered = format!("{:?}", DebugFormat::new(&leaf));
        assert_eq!("This is a leaf error.", rendered);

        let with_backtrace = LeafWithBacktrace {
            backtrace: Backtrace::generate(),
        };
        let rendered = format!("{:?}", DebugFormat::new(&with_backtrace));
        assert!(rendered.starts_with("This is a leaf with backtrace.\nBacktrace:\n"));

        let internal = Internal {
            source: Leaf,
            backtrace: Backtrace::generate(),
        };
        let rendered = format!("{:?}", DebugFormat::new(&internal));
        assert!(rendered.contains("Internal error. Caused by: Leaf\nBacktrace:\n"));
    }
}

View File

@@ -0,0 +1,14 @@
pub mod ext;
pub mod format;
pub mod mock;
pub mod status_code;
/// Convenience re-exports: the snafu prelude plus `Backtrace`/`ErrorCompat`, and this crate's
/// `ErrorExt`, `DebugFormat` and `StatusCode`.
pub mod prelude {
pub use snafu::{prelude::*, Backtrace, ErrorCompat};
pub use crate::ext::ErrorExt;
pub use crate::format::DebugFormat;
pub use crate::status_code::StatusCode;
}
pub use snafu;

View File

@@ -0,0 +1,79 @@
//! Utils for mock.
use std::any::Any;
use std::fmt;
use snafu::GenerateImplicitData;
use crate::prelude::*;
/// A mock error mainly for test.
///
/// It carries a caller-chosen [StatusCode] and, optionally, a backtrace.
#[derive(Debug)]
pub struct MockError {
    pub code: StatusCode,
    backtrace: Option<Backtrace>,
}

impl MockError {
    /// Create a new [MockError] without backtrace.
    pub fn new(code: StatusCode) -> MockError {
        Self {
            code,
            backtrace: None,
        }
    }

    /// Create a new [MockError] with backtrace.
    pub fn with_backtrace(code: StatusCode) -> MockError {
        let backtrace = Some(Backtrace::generate());
        Self { code, backtrace }
    }
}
impl fmt::Display for MockError {
    /// Displays the error as its status code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.code))
    }
}
impl std::error::Error for MockError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None
}
}
impl ErrorExt for MockError {
    /// Returns the status code given at construction.
    fn status_code(&self) -> StatusCode {
        self.code
    }

    /// Returns the backtrace when the error was built with one.
    fn backtrace_opt(&self) -> Option<&Backtrace> {
        match &self.backtrace {
            Some(backtrace) => Some(backtrace),
            None => None,
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl ErrorCompat for MockError {
fn backtrace(&self) -> Option<&Backtrace> {
self.backtrace_opt()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_mock_error() {
        // Plain constructor: no backtrace.
        let without_backtrace = MockError::new(StatusCode::Unknown);
        assert!(without_backtrace.backtrace_opt().is_none());

        // Backtrace constructor: backtrace present.
        let with_backtrace = MockError::with_backtrace(StatusCode::Unknown);
        assert!(with_backtrace.backtrace_opt().is_some());
    }
}

View File

@@ -0,0 +1,56 @@
use std::fmt;
/// Common status code for public API.
///
/// The enum is a plain, field-less code, so it is `Copy` and supports total equality and
/// hashing (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StatusCode {
    // ====== Begin of common status code ==============
    /// Unknown error.
    Unknown,
    /// Unsupported operation.
    Unsupported,
    /// Unexpected error, maybe there is a BUG.
    Unexpected,
    /// Internal server error.
    Internal,
    // ====== End of common status code ================

    // ====== Begin of SQL related status code =========
    /// SQL Syntax error.
    InvalidSyntax,
    // ====== End of SQL related status code ===========

    // ====== Begin of query related status code =======
    /// Fail to create a plan for the query.
    PlanQuery,
    /// The query engine fail to execute query.
    EngineExecuteQuery,
    // ====== End of query related status code =========

    // ====== Begin of catalog related status code =====
    /// Table already exists.
    TableAlreadyExists,
    // ====== End of catalog related status code =======
}
impl fmt::Display for StatusCode {
    /// Displays the code using its debug representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The current debug format is suitable to display.
        f.write_fmt(format_args!("{:?}", self))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `code` displays exactly as `msg`.
    fn assert_status_code_display(code: StatusCode, msg: &str) {
        let rendered = format!("{}", code);
        assert_eq!(msg, rendered);
    }

    #[test]
    fn test_display_status_code() {
        assert_status_code_display(StatusCode::Unknown, "Unknown");
        assert_status_code_display(StatusCode::TableAlreadyExists, "TableAlreadyExists");
    }
}

View File

@@ -15,7 +15,7 @@ datatypes = {path ="../../datatypes" }
futures = "0.3"
paste = "1.0"
serde = "1.0"
snafu = "0.7"
snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies.arrow]
package = "arrow2"
@@ -24,3 +24,4 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
[dev-dependencies]
tokio = { version = "1.18", features = ["full"] }

View File

@@ -1,10 +1,14 @@
use arrow::error::ArrowError;
use snafu::Snafu;
use snafu::{Backtrace, Snafu};
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Arrow error: {}", source))]
Arrow { source: ArrowError },
Arrow {
source: ArrowError,
backtrace: Backtrace,
},
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -8,12 +8,13 @@ edition = "2021"
[dependencies]
axum = "0.5"
axum-macros = "0.2"
common-recordbatch = {path = "../common/recordbatch" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
hyper = { version = "0.14", features = ["full"] }
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
snafu = "0.7"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
tower = { version = "0.4", features = ["full"]}

View File

@@ -4,7 +4,7 @@ use query::catalog::memory;
use query::catalog::CatalogListRef;
use snafu::ResultExt;
use crate::error::{QuerySnafu, Result};
use crate::error::{NewCatalogSnafu, Result};
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;
@@ -17,7 +17,7 @@ pub struct DataNode {
impl DataNode {
pub fn new() -> Result<DataNode> {
let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?;
let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let instance = Arc::new(Instance::new(catalog_list.clone()));
Ok(Self {

View File

@@ -1,15 +1,71 @@
use hyper::Error as HyperError;
use query::error::Error as QueryError;
use snafu::Snafu;
use std::any::Any;
/// business error of datanode.
use common_error::prelude::*;
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Query error: {}", source))]
Query { source: QueryError },
#[snafu(display("Http error: {}", source))]
Hyper { source: HyperError },
#[snafu(display("Fail to execute sql, source: {}", source))]
ExecuteSql {
#[snafu(backtrace)]
source: query::error::Error,
},
#[snafu(display("Fail to create catalog list, source: {}", source))]
NewCatalog {
#[snafu(backtrace)]
source: query::error::Error,
},
// The source of an HTTP error is clear enough even without a backtrace for now,
// so this variant does not carry one.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
StartHttp { source: hyper::Error },
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
    /// Query-related variants report the status code of their source; failing to start the
    /// HTTP server is reported as an internal error.
    fn status_code(&self) -> StatusCode {
        match self {
            Self::ExecuteSql { source } => source.status_code(),
            Self::NewCatalog { source } => source.status_code(),
            // TODO(yingwen): Further categorize http error.
            Self::StartHttp { .. } => StatusCode::Internal,
        }
    }

    /// Backtrace as exposed through snafu's `ErrorCompat`.
    fn backtrace_opt(&self) -> Option<&Backtrace> {
        ErrorCompat::backtrace(self)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
#[cfg(test)]
mod tests {
    use common_error::mock::MockError;

    use super::*;

    /// Always fails with a query error wrapping an internal mock error that has a backtrace.
    fn throw_query_error() -> std::result::Result<(), query::error::Error> {
        let mock = MockError::with_backtrace(StatusCode::Internal);
        Err(query::error::Error::new(mock))
    }

    /// Checks that `err` exposes a backtrace and maps to `StatusCode::Internal`.
    fn assert_internal_error(err: &Error) {
        assert!(err.backtrace_opt().is_some());
        assert_eq!(StatusCode::Internal, err.status_code());
    }

    #[test]
    fn test_error() {
        let execute_err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
        assert_internal_error(&execute_err);

        let catalog_err = throw_query_error().context(NewCatalogSnafu).err().unwrap();
        assert_internal_error(&catalog_err);
    }
}

View File

@@ -4,7 +4,7 @@ use query::catalog::CatalogListRef;
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::ResultExt;
use crate::error::{QuerySnafu, Result};
use crate::error::{ExecuteSqlSnafu, Result};
// An abstraction to read/write services.
pub struct Instance {
@@ -27,12 +27,15 @@ impl Instance {
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?;
let logical_plan = self
.query_engine
.sql_to_plan(sql)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(QuerySnafu)
.context(ExecuteSqlSnafu)
}
}

View File

@@ -17,7 +17,7 @@ use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use crate::error::{HyperSnafu, Result};
use crate::error::{Result, StartHttpSnafu};
use crate::server::InstanceRef;
/// Http server
@@ -106,7 +106,7 @@ impl HttpServer {
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await.context(HyperSnafu)?;
graceful.await.context(StartHttpSnafu)?;
Ok(())
}

View File

@@ -10,12 +10,13 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
[dependencies]
async-trait = "0.1"
common-error = { path = "../common/error" }
common-recordbatch = {path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datatypes = {path = "../datatypes" }
futures = "0.3"
futures-util = "0.3.21"
snafu = "0.7.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = "1.0"
sql = { path = "../sql" }

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use common_error::prelude::*;
use table::table::numbers::NumbersTable;
use table::TableRef;
@@ -11,7 +12,36 @@ use crate::catalog::{
CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
};
use crate::error::{ExecutionSnafu, Result};
use crate::error::{Error, Result};
/// Error implementation of memory catalog.
#[derive(Debug, Snafu)]
pub enum InnerError {
// A table with the given name already exists; carries the table name and a backtrace.
#[snafu(display("Table {} already exists", table))]
TableExists { table: String, backtrace: Backtrace },
}
impl ErrorExt for InnerError {
    /// Maps every variant to its public status code.
    fn status_code(&self) -> StatusCode {
        match self {
            Self::TableExists { .. } => StatusCode::TableAlreadyExists,
        }
    }

    /// Backtrace as exposed through snafu's `ErrorCompat`.
    fn backtrace_opt(&self) -> Option<&Backtrace> {
        ErrorCompat::backtrace(self)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
/// Simple in-memory list of catalogs
#[derive(Default)]
@@ -127,10 +157,7 @@ impl SchemaProvider for MemorySchemaProvider {
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
if self.table_exist(name.as_str()) {
return ExecutionSnafu {
message: format!("The table {} already exists", name),
}
.fail();
return TableExistsSnafu { table: name }.fail()?;
}
let mut tables = self.tables.write().unwrap();
Ok(tables.insert(name, table))
@@ -196,6 +223,8 @@ mod tests {
assert!(provider.table_exist(table_name));
let other_table = NumbersTable::default();
let result = provider.register_table(table_name.to_string(), Arc::new(other_table));
assert!(result.is_err());
let err = result.err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}
}

View File

@@ -1,4 +1,9 @@
mod adapter;
//! Planner, QueryEngine implementations based on DataFusion.
mod catalog_adapter;
mod error;
mod plan_adapter;
mod planner;
use std::sync::Arc;
@@ -6,18 +11,20 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream};
use snafu::{OptionExt, ResultExt};
use sql::{dialect::GenericDialect, parser::ParserContext};
use super::{context::QueryContext, state::QueryEngineState};
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::query_engine::{QueryContext, QueryEngineState};
use crate::{
catalog::CatalogListRef,
error::{self, ParseSqlSnafu, Result},
datafusion::plan_adapter::PhysicalPlanAdapter,
datafusion::planner::{DfContextProviderAdapter, DfPlanner},
error::Result,
executor::QueryExecutor,
logical_optimizer::LogicalOptimizer,
physical_optimizer::PhysicalOptimizer,
physical_planner::PhysicalPlanner,
plan::{LogicalPlan, PhysicalPlan},
planner::{DfContextProviderAdapter, DfPlanner, Planner},
query_engine::datafusion::adapter::PhysicalPlanAdapter,
query_engine::{Output, QueryEngine},
planner::Planner,
Output, QueryEngine,
};
pub(crate) struct DatafusionQueryEngine {
@@ -42,9 +49,7 @@ impl QueryEngine for DatafusionQueryEngine {
let context_provider = DfContextProviderAdapter::new(self.state.catalog_list());
let planner = DfPlanner::new(&context_provider);
let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
.with_context(|_| ParseSqlSnafu {
sql: sql.to_string(),
})?;
.context(error::ParseSqlSnafu)?;
// TODO(dennis): supports multi statement in one sql?
assert!(1 == statement.len());
planner.statement_to_plan(statement.remove(0))
@@ -70,11 +75,13 @@ impl LogicalOptimizer for DatafusionQueryEngine {
) -> Result<LogicalPlan> {
match plan {
LogicalPlan::DfPlan(df_plan) => {
let optimized_plan = self
.state
.df_context()
.optimize(df_plan)
.context(error::DatafusionSnafu)?;
let optimized_plan =
self.state
.df_context()
.optimize(df_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize logical plan",
})?;
Ok(LogicalPlan::DfPlan(optimized_plan))
}
@@ -96,7 +103,9 @@ impl PhysicalPlanner for DatafusionQueryEngine {
.df_context()
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu)?;
.context(error::DatafusionSnafu {
msg: "Fail to create physical plan",
})?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(physical_plan.schema().into()),
@@ -126,7 +135,9 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
for optimizer in optimizers {
new_plan = optimizer
.optimize(new_plan, config)
.context(error::DatafusionSnafu)?;
.context(error::DatafusionSnafu {
msg: "Fail to optimize physical plan",
})?;
}
Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan)))
}
@@ -172,7 +183,6 @@ mod tests {
let plan = engine.sql_to_plan(sql).unwrap();
println!("{:?}", plan);
assert_eq!(
format!("{:?}", plan),
r#"DfPlan(Limit: 20

View File

@@ -0,0 +1,222 @@
//! Catalog adapter between datafusion and greptime query engine.
use std::any::Any;
use std::sync::Arc;
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
};
use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider};
use crate::datafusion::error;
use crate::error::Result;
/// Greptime catalog list -> datafusion's CatalogList
pub struct DfCatalogListAdapter {
    runtime: Arc<RuntimeEnv>,
    catalog_list: CatalogListRef,
}

impl DfCatalogListAdapter {
    /// Creates an adapter over `catalog_list`; `runtime` is handed to the adapters it creates.
    pub fn new(runtime: Arc<RuntimeEnv>, catalog_list: CatalogListRef) -> DfCatalogListAdapter {
        Self {
            runtime,
            catalog_list,
        }
    }
}
impl DfCatalogList for DfCatalogListAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Wraps the datafusion `catalog`, registers it in the underlying catalog list and
    /// returns the previously registered catalog (re-wrapped for datafusion), if any.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn DfCatalogProvider>,
    ) -> Option<Arc<dyn DfCatalogProvider>> {
        let catalog_adapter = Arc::new(CatalogProviderAdapter {
            df_cataglog_provider: catalog,
            runtime: Arc::clone(&self.runtime),
        });
        let previous = self.catalog_list.register_catalog(name, catalog_adapter);
        previous.map(|catalog_provider| {
            Arc::new(DfCatalogProviderAdapter {
                catalog_provider,
                runtime: Arc::clone(&self.runtime),
            }) as _
        })
    }

    fn catalog_names(&self) -> Vec<String> {
        self.catalog_list.catalog_names()
    }

    /// Looks up the catalog `name` and wraps it for datafusion.
    fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
        let found = self.catalog_list.catalog(name);
        found.map(|catalog_provider| {
            Arc::new(DfCatalogProviderAdapter {
                catalog_provider,
                runtime: Arc::clone(&self.runtime),
            }) as _
        })
    }
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
    df_cataglog_provider: Arc<dyn DfCatalogProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl CatalogProvider for CatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.df_cataglog_provider.schema_names()
    }

    /// Looks up the datafusion schema `name` and wraps it as a greptime schema provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let found = self.df_cataglog_provider.schema(name);
        found.map(|df_schema_provider| {
            Arc::new(SchemaProviderAdapter {
                df_schema_provider,
                runtime: Arc::clone(&self.runtime),
            }) as _
        })
    }
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
    catalog_provider: Arc<dyn CatalogProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl DfCatalogProvider for DfCatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        self.catalog_provider.schema_names()
    }

    /// Looks up the greptime schema `name` and wraps it as a datafusion schema provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
        let found = self.catalog_provider.schema(name);
        found.map(|schema_provider| {
            Arc::new(DfSchemaProviderAdapter {
                schema_provider,
                runtime: Arc::clone(&self.runtime),
            }) as _
        })
    }
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
    schema_provider: Arc<dyn SchemaProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl DfSchemaProvider for DfSchemaProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        self.schema_provider.table_names()
    }

    /// Looks up the greptime table `name` and wraps it as a datafusion table provider.
    fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
        let found = self.schema_provider.table(name);
        found.map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _)
    }

    /// Wraps the datafusion `table`, registers it in the greptime schema and returns the
    /// table previously registered under `name` (re-wrapped for datafusion), if any.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn DfTableProvider>,
    ) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let table = Arc::new(TableAdapter::new(table, Arc::clone(&self.runtime)));
        let previous = self.schema_provider.register_table(name, table)?;
        Ok(previous.map(|p| Arc::new(DfTableProviderAdapter::new(p)) as _))
    }

    /// Removes the table `name` from the greptime schema and returns it (wrapped for
    /// datafusion) when the underlying provider returns one.
    fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let removed = self.schema_provider.deregister_table(name)?;
        Ok(removed.map(|p| Arc::new(DfTableProviderAdapter::new(p)) as _))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.schema_provider.table_exist(name)
    }
}
/// Datafusion SchemaProvider -> greptime SchemaProvider
struct SchemaProviderAdapter {
    df_schema_provider: Arc<dyn DfSchemaProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl SchemaProvider for SchemaProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Retrieves the list of available table names in this schema.
    fn table_names(&self) -> Vec<String> {
        self.df_schema_provider.table_names()
    }

    /// Looks up the datafusion table `name` and wraps it as a greptime table.
    fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
        let found = self.df_schema_provider.table(name);
        found.map(|table_provider| {
            Arc::new(TableAdapter::new(table_provider, Arc::clone(&self.runtime))) as _
        })
    }

    /// Wraps the greptime `table`, registers it in the datafusion schema and returns the
    /// table previously registered under `name` (re-wrapped for greptime), if any.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn Table>,
    ) -> Result<Option<Arc<dyn Table>>> {
        let table_provider = Arc::new(DfTableProviderAdapter::new(table));
        let previous = self
            .df_schema_provider
            .register_table(name, table_provider)
            .context(error::DatafusionSnafu {
                msg: "Fail to register table to datafusion",
            })?;
        Ok(previous.map(|table| Arc::new(TableAdapter::new(table, Arc::clone(&self.runtime))) as _))
    }

    /// Removes the table `name` from the datafusion schema and returns it (wrapped for
    /// greptime) when the underlying provider returns one.
    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
        let removed = self
            .df_schema_provider
            .deregister_table(name)
            .context(error::DatafusionSnafu {
                msg: "Fail to deregister table from datafusion",
            })?;
        Ok(removed.map(|table| Arc::new(TableAdapter::new(table, Arc::clone(&self.runtime))) as _))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.df_schema_provider.table_exist(name)
    }
}

View File

@@ -0,0 +1,112 @@
use std::any::Any;
use common_error::prelude::*;
use datafusion::error::DataFusionError;
use crate::error::Error;
/// Inner error of datafusion based query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
// A datafusion error together with a static message describing the failed operation.
#[snafu(display("{}: {}", msg, source))]
Datafusion {
msg: &'static str,
source: DataFusionError,
backtrace: Backtrace,
},
// Downcasting a physical plan failed.
#[snafu(display("PhysicalPlan downcast failed"))]
PhysicalPlanDowncast { backtrace: Backtrace },
// The sql error already contains the SQL.
// No backtrace field of its own: `#[snafu(backtrace)]` marks the source as the provider of
// the backtrace.
#[snafu(display("Cannot parse SQL, source: {}", source))]
ParseSql {
#[snafu(backtrace)]
source: sql::error::Error,
},
// Planning the given SQL text failed inside datafusion.
#[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
PlanSql {
sql: String,
source: DataFusionError,
backtrace: Backtrace,
},
}
impl ErrorExt for InnerError {
fn status_code(&self) -> StatusCode {
use InnerError::*;
match self {
// TODO(yingwen): Further categorize datafusion error.
Datafusion { .. } => StatusCode::EngineExecuteQuery,
// This downcast should not fail in usual case.
PhysicalPlanDowncast { .. } => StatusCode::Unexpected,
ParseSql { source, .. } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Always fails with a datafusion "not implemented" error.
    fn throw_df_error() -> Result<(), DataFusionError> {
        let df_err = DataFusionError::NotImplemented("test".to_string());
        Err(df_err)
    }

    /// Checks the status code of `err` and that it carries a backtrace.
    fn assert_error(err: &InnerError, code: StatusCode) {
        assert_eq!(code, err.status_code());
        assert!(err.backtrace_opt().is_some());
    }

    #[test]
    fn test_datafusion_as_source() {
        let df_err = throw_df_error()
            .context(DatafusionSnafu { msg: "test df" })
            .err()
            .unwrap();
        assert_error(&df_err, StatusCode::EngineExecuteQuery);

        let plan_err = throw_df_error()
            .context(PlanSqlSnafu { sql: "" })
            .err()
            .unwrap();
        assert_error(&plan_err, StatusCode::PlanQuery);

        let downcast_res: Result<(), InnerError> = PhysicalPlanDowncastSnafu {}.fail();
        let downcast_err = downcast_res.err().unwrap();
        assert_error(&downcast_err, StatusCode::Unexpected);
    }

    /// Always fails with an "unsupported" SQL error.
    fn raise_sql_error() -> Result<(), sql::error::Error> {
        let sql_err = sql::error::Error::Unsupported {
            sql: "".to_string(),
            keyword: "".to_string(),
        };
        Err(sql_err)
    }

    #[test]
    fn test_parse_error() {
        let wrapped = raise_sql_error().context(ParseSqlSnafu).err().unwrap();
        assert!(wrapped.backtrace_opt().is_none());

        let sql_err = raise_sql_error().err().unwrap();
        assert_eq!(sql_err.status_code(), wrapped.status_code());
    }
}

View File

@@ -16,7 +16,8 @@ use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
use crate::error::{self, Result};
use crate::datafusion::error;
use crate::error::Result;
use crate::executor::Runtime;
use crate::plan::{Partitioning, PhysicalPlan};
@@ -74,7 +75,9 @@ impl PhysicalPlan for PhysicalPlanAdapter {
let plan = self
.plan
.with_new_children(df_children)
.context(error::DatafusionSnafu)?;
.context(error::DatafusionSnafu {
msg: "Fail to add children to plan",
})?;
Ok(Arc::new(PhysicalPlanAdapter::new(
self.schema.clone(),
plan,
@@ -86,11 +89,13 @@ impl PhysicalPlan for PhysicalPlanAdapter {
runtime: &Runtime,
partition: usize,
) -> Result<SendableRecordBatchStream> {
let df_stream = self
.plan
.execute(partition, runtime.into())
.await
.context(error::DatafusionSnafu)?;
let df_stream =
self.plan
.execute(partition, runtime.into())
.await
.context(error::DatafusionSnafu {
msg: "Fail to execute physical plan",
})?;
Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
}
@@ -113,9 +118,6 @@ impl Debug for ExecutionPlanAdapter {
}
}
unsafe impl Send for ExecutionPlanAdapter {}
unsafe impl Sync for ExecutionPlanAdapter {}
#[async_trait::async_trait]
impl ExecutionPlan for ExecutionPlanAdapter {
fn as_any(&self) -> &dyn Any {

View File

@@ -0,0 +1,113 @@
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion::catalog::TableReference;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use snafu::ResultExt;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use table::table::adapter::DfTableProviderAdapter;
use crate::{
catalog::{self, CatalogListRef},
datafusion::error,
error::Result,
plan::LogicalPlan,
planner::Planner,
};
pub struct DfPlanner<'a, S: ContextProvider> {
sql_to_rel: SqlToRel<'a, S>,
}
impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
/// Creates a DataFusion planner instance
pub fn new(schema_provider: &'a S) -> Self {
let rel = SqlToRel::new(schema_provider);
Self { sql_to_rel: rel }
}
/// Converts QUERY statement to logical plan.
pub fn query_to_plan(&self, query: Box<Query>) -> Result<LogicalPlan> {
// todo(hl): original SQL should be provided as an argument
let sql = query.inner.to_string();
let result = self
.sql_to_rel
.query_to_plan(query.inner)
.context(error::PlanSqlSnafu { sql })?;
Ok(LogicalPlan::DfPlan(result))
}
}
impl<'a, S> Planner for DfPlanner<'a, S>
where
    S: ContextProvider + Send + Sync,
{
    /// Converts statement to logical plan using datafusion planner.
    ///
    /// Only `Statement::Query` is planned; the `ShowDatabases` and `Insert`
    /// arms are still `todo!` and panic when reached.
    fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        match statement {
            Statement::Query(query) => self.query_to_plan(query),
            Statement::ShowDatabases(_) => todo!("Currently not supported"),
            Statement::Insert(_) => todo!(),
        }
    }
}
/// Holds a borrowed catalog list; implements DataFusion's `ContextProvider` on top of it.
pub(crate) struct DfContextProviderAdapter<'a> {
    catalog_list: &'a CatalogListRef,
}

impl<'a> DfContextProviderAdapter<'a> {
    /// Creates an adapter around the borrowed `catalog_list`.
    pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self {
        DfContextProviderAdapter { catalog_list }
    }
}
impl<'a> ContextProvider for DfContextProviderAdapter<'a> {
    /// Resolves `name` against the catalog list and wraps the table for DataFusion.
    ///
    /// Missing catalog/schema parts fall back to the default catalog and schema names.
    /// Returns `None` as soon as the catalog, schema or table lookup yields nothing.
    fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
        let (catalog_name, schema_name, table_name) = match name {
            TableReference::Bare { table } => (
                catalog::DEFAULT_CATALOG_NAME,
                catalog::DEFAULT_SCHEMA_NAME,
                table,
            ),
            TableReference::Partial { schema, table } => {
                (catalog::DEFAULT_CATALOG_NAME, schema, table)
            }
            TableReference::Full {
                catalog,
                schema,
                table,
            } => (catalog, schema, table),
        };

        let catalog_provider = self.catalog_list.catalog(catalog_name)?;
        let schema_provider = catalog_provider.schema(schema_name)?;
        let table = schema_provider.table(table_name)?;
        Some(Arc::new(DfTableProviderAdapter::new(table)))
    }

    /// Scalar UDF lookup is not implemented yet; always `None`.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        // TODO(dennis)
        None
    }

    /// Aggregate UDF lookup is not implemented yet; always `None`.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        // TODO(dennis)
        None
    }

    /// Variable type lookup is not implemented yet; always `None`.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        // TODO(dennis)
        None
    }
}

View File

@@ -1,28 +1,6 @@
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use snafu::Snafu;
use sql::errors::ParserError;
/// Business errors of the query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
/// Error returned by DataFusion.
#[snafu(display("Datafusion query engine error: {}", source))]
Datafusion { source: DataFusionError },
/// A physical plan `downcast_ref` did not succeed.
#[snafu(display("PhysicalPlan downcast_ref failed"))]
PhysicalPlanDowncast,
/// Error coming from the record batch crate.
#[snafu(display("RecordBatch error: {}", source))]
RecordBatch { source: RecordBatchError },
/// Execution failure described only by a message.
#[snafu(display("Execution error: {}", message))]
Execution { message: String },
/// The SQL text could not be parsed.
#[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))]
ParseSql { sql: String, source: ParserError },
/// DataFusion could not build a plan for the SQL text.
#[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
Planner {
sql: String,
source: DataFusionError,
},
}
common_error::define_opaque_error!(Error);
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -1,5 +1,6 @@
pub mod catalog;
pub mod database;
mod datafusion;
pub mod error;
pub mod executor;
pub mod logical_optimizer;

View File

@@ -1,109 +1,8 @@
use std::sync::Arc;
use arrow::datatypes::DataType;
use datafusion::catalog::TableReference;
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::udaf::AggregateUDF;
use datafusion::physical_plan::udf::ScalarUDF;
use datafusion::sql::planner::{ContextProvider, SqlToRel};
use snafu::ResultExt;
use sql::statements::query::Query;
use sql::statements::statement::Statement;
use table::table::adapter::DfTableProviderAdapter;
use crate::{
catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME},
error::{PlannerSnafu, Result},
plan::LogicalPlan,
};
use crate::{error::Result, plan::LogicalPlan};
/// SQL logical planner.
pub trait Planner: Send + Sync {
/// Converts a parsed SQL `Statement` into a `LogicalPlan`.
fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
}
/// SQL planner that delegates to DataFusion's `SqlToRel`.
pub struct DfPlanner<'a, S: ContextProvider> {
    sql_to_rel: SqlToRel<'a, S>,
}

impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
    /// Creates a DataFusion planner instance built on `schema_provider`.
    pub fn new(schema_provider: &'a S) -> Self {
        Self {
            sql_to_rel: SqlToRel::new(schema_provider),
        }
    }

    /// Converts QUERY statement to logical plan.
    ///
    /// # Errors
    ///
    /// Fails with the `Planner` context (carrying the rendered SQL text) when
    /// DataFusion cannot plan the query.
    pub fn query_to_plan(&self, query: Box<Query>) -> Result<LogicalPlan> {
        // todo(hl): original SQL should be provided as an argument
        // Render the SQL first: `query.inner` is moved into the planner below.
        let sql_text = query.inner.to_string();
        let planned = self
            .sql_to_rel
            .query_to_plan(query.inner)
            .map(LogicalPlan::DfPlan)
            .context(PlannerSnafu { sql: sql_text });
        Ok(planned?)
    }
}
impl<'a, S> Planner for DfPlanner<'a, S>
where
    S: ContextProvider + Send + Sync,
{
    /// Converts statement to logical plan using datafusion planner.
    ///
    /// Only `Statement::Query` is planned; the `ShowDatabases` and `Insert`
    /// arms are still `todo!` and panic when reached.
    fn statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
        match statement {
            Statement::Query(query) => self.query_to_plan(query),
            Statement::ShowDatabases(_) => todo!("Currently not supported"),
            Statement::Insert(_) => todo!(),
        }
    }
}
/// Holds a borrowed catalog list; implements DataFusion's `ContextProvider` on top of it.
pub(crate) struct DfContextProviderAdapter<'a> {
    catalog_list: &'a CatalogListRef,
}

impl<'a> DfContextProviderAdapter<'a> {
    /// Creates an adapter around the borrowed `catalog_list`.
    pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self {
        DfContextProviderAdapter { catalog_list }
    }
}
impl<'a> ContextProvider for DfContextProviderAdapter<'a> {
    /// Resolves `name` against the catalog list and wraps the table for DataFusion.
    ///
    /// Missing catalog/schema parts fall back to the default catalog and schema names.
    /// Returns `None` as soon as the catalog, schema or table lookup yields nothing.
    fn get_table_provider(&self, name: TableReference) -> Option<Arc<dyn TableProvider>> {
        let (catalog_name, schema_name, table_name) = match name {
            TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table),
            TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table),
            TableReference::Full {
                catalog,
                schema,
                table,
            } => (catalog, schema, table),
        };

        let catalog_provider = self.catalog_list.catalog(catalog_name)?;
        let schema_provider = catalog_provider.schema(schema_name)?;
        let table = schema_provider.table(table_name)?;
        Some(Arc::new(DfTableProviderAdapter::new(table)))
    }

    /// Scalar UDF lookup is not implemented yet; always `None`.
    fn get_function_meta(&self, _name: &str) -> Option<Arc<ScalarUDF>> {
        // TODO(dennis)
        None
    }

    /// Aggregate UDF lookup is not implemented yet; always `None`.
    fn get_aggregate_meta(&self, _name: &str) -> Option<Arc<AggregateUDF>> {
        // TODO(dennis)
        None
    }

    /// Variable type lookup is not implemented yet; always `None`.
    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
        // TODO(dennis)
        None
    }
}

View File

@@ -1,16 +1,16 @@
mod context;
mod datafusion;
mod state;
use std::sync::Arc;
use common_recordbatch::SendableRecordBatchStream;
pub use context::QueryContext;
use crate::catalog::CatalogList;
use crate::datafusion::DatafusionQueryEngine;
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::query_engine::datafusion::DatafusionQueryEngine;
pub use crate::query_engine::context::QueryContext;
pub use crate::query_engine::state::QueryEngineState;
/// Sql output
pub enum Output {

View File

@@ -1,29 +1,16 @@
use std::any::Any;
use std::fmt;
use std::sync::Arc;
use datafusion::catalog::{
catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider},
schema::SchemaProvider as DfSchemaProvider,
};
use datafusion::datasource::TableProvider as DfTableProvider;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use snafu::ResultExt;
use table::{
table::adapter::{DfTableProviderAdapter, TableAdapter},
Table,
};
use crate::catalog::{
schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
};
use crate::error::{self, Result};
use crate::catalog::{self, CatalogListRef};
use crate::datafusion::DfCatalogListAdapter;
use crate::executor::Runtime;
/// Query engine global state
// TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it,
// which allows different implementation use different engine state. The state can also be an associated
// type in QueryEngine trait.
#[derive(Clone)]
pub struct QueryEngineState {
df_context: ExecutionContext,
@@ -39,14 +26,16 @@ impl fmt::Debug for QueryEngineState {
impl QueryEngineState {
pub(crate) fn new(catalog_list: CatalogListRef) -> Self {
let config = ExecutionConfig::new()
.with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
let config = ExecutionConfig::new().with_default_catalog_and_schema(
catalog::DEFAULT_CATALOG_NAME,
catalog::DEFAULT_SCHEMA_NAME,
);
let df_context = ExecutionContext::with_config(config);
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
catalog_list: catalog_list.clone(),
runtime: df_context.runtime_env(),
});
df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new(
df_context.runtime_env(),
catalog_list.clone(),
));
Self {
df_context,
@@ -69,194 +58,3 @@ impl QueryEngineState {
self.df_context.runtime_env().into()
}
}
/// Adapters between datafusion and greptime query engine.
///
/// This one exposes a greptime catalog list through DataFusion's catalog-list trait.
struct DfCatalogListAdapter {
    runtime: Arc<RuntimeEnv>,
    catalog_list: CatalogListRef,
}

impl DfCatalogList for DfCatalogListAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Wraps the DataFusion `catalog` as a greptime provider and registers it under `name`.
    ///
    /// Any provider handed back by the underlying list is wrapped again for DataFusion.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn DfCatalogProvider>,
    ) -> Option<Arc<dyn DfCatalogProvider>> {
        let greptime_catalog = Arc::new(CatalogProviderAdapter {
            df_cataglog_provider: catalog,
            runtime: self.runtime.clone(),
        });
        let returned = self
            .catalog_list
            .register_catalog(name, greptime_catalog)?;
        Some(Arc::new(DfCatalogProviderAdapter {
            catalog_provider: returned,
            runtime: self.runtime.clone(),
        }))
    }

    /// Lists the catalog names known to the wrapped catalog list.
    fn catalog_names(&self) -> Vec<String> {
        self.catalog_list.catalog_names()
    }

    /// Looks up `name` in the wrapped list and adapts the provider for DataFusion.
    fn catalog(&self, name: &str) -> Option<Arc<dyn DfCatalogProvider>> {
        let catalog_provider = self.catalog_list.catalog(name)?;
        Some(Arc::new(DfCatalogProviderAdapter {
            catalog_provider,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Datafusion's CatalogProvider -> greptime CatalogProvider
struct CatalogProviderAdapter {
    df_cataglog_provider: Arc<dyn DfCatalogProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl CatalogProvider for CatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the schema names of the wrapped DataFusion catalog.
    fn schema_names(&self) -> Vec<String> {
        self.df_cataglog_provider.schema_names()
    }

    /// Looks up schema `name` and adapts it to a greptime schema provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let df_schema_provider = self.df_cataglog_provider.schema(name)?;
        Some(Arc::new(SchemaProviderAdapter {
            df_schema_provider,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Greptime CatalogProvider -> datafusion's CatalogProvider
struct DfCatalogProviderAdapter {
    catalog_provider: Arc<dyn CatalogProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl DfCatalogProvider for DfCatalogProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the schema names of the wrapped greptime catalog.
    fn schema_names(&self) -> Vec<String> {
        self.catalog_provider.schema_names()
    }

    /// Looks up schema `name` and adapts it to a DataFusion schema provider.
    fn schema(&self, name: &str) -> Option<Arc<dyn DfSchemaProvider>> {
        let schema_provider = self.catalog_provider.schema(name)?;
        Some(Arc::new(DfSchemaProviderAdapter {
            schema_provider,
            runtime: self.runtime.clone(),
        }))
    }
}
/// Greptime SchemaProvider -> datafusion SchemaProvider
struct DfSchemaProviderAdapter {
    schema_provider: Arc<dyn SchemaProvider>,
    runtime: Arc<RuntimeEnv>,
}

impl DfSchemaProvider for DfSchemaProviderAdapter {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Lists the table names of the wrapped greptime schema.
    fn table_names(&self) -> Vec<String> {
        self.schema_provider.table_names()
    }

    /// Looks up table `name` and adapts it to a DataFusion table provider.
    fn table(&self, name: &str) -> Option<Arc<dyn DfTableProvider>> {
        let table = self.schema_provider.table(name)?;
        Some(Arc::new(DfTableProviderAdapter::new(table)))
    }

    /// Wraps the DataFusion `table` as a greptime table and registers it under `name`.
    ///
    /// A table returned by the wrapped schema is adapted back for DataFusion;
    /// a registration error is converted into a DataFusion error.
    fn register_table(
        &self,
        name: String,
        table: Arc<dyn DfTableProvider>,
    ) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let greptime_table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
        self.schema_provider
            .register_table(name, greptime_table)
            .map(|returned| {
                returned.map(|p| {
                    Arc::new(DfTableProviderAdapter::new(p)) as Arc<dyn DfTableProvider>
                })
            })
            .map_err(Into::into)
    }

    /// Deregisters table `name`, adapting whatever the wrapped schema returns.
    fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        self.schema_provider
            .deregister_table(name)
            .map(|returned| {
                returned.map(|p| {
                    Arc::new(DfTableProviderAdapter::new(p)) as Arc<dyn DfTableProvider>
                })
            })
            .map_err(Into::into)
    }

    /// Reports whether the wrapped schema has a table called `name`.
    fn table_exist(&self, name: &str) -> bool {
        self.schema_provider.table_exist(name)
    }
}
/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter
struct SchemaProviderAdapter {
df_schema_provider: Arc<dyn DfSchemaProvider>,
runtime: Arc<RuntimeEnv>,
}
impl SchemaProvider for SchemaProviderAdapter {
fn as_any(&self) -> &dyn Any {
self
}
/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String> {
self.df_schema_provider.table_names()
}
fn table(&self, name: &str) -> Option<Arc<dyn Table>> {
self.df_schema_provider.table(name).map(|table_provider| {
Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _
})
}
fn register_table(
&self,
name: String,
table: Arc<dyn Table>,
) -> Result<Option<Arc<dyn Table>>> {
let table_provider = Arc::new(DfTableProviderAdapter::new(table));
Ok(self
.df_schema_provider
.register_table(name, table_provider)
.context(error::DatafusionSnafu)?
.map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn Table>>> {
Ok(self
.df_schema_provider
.deregister_table(name)
.context(error::DatafusionSnafu)?
.map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
}
fn table_exist(&self, name: &str) -> bool {
self.df_schema_provider.table_exist(name)
}
}

View File

@@ -6,5 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
snafu = "0.7.0"
common-error = { path = "../common/error" }
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.15.0"

97
src/sql/src/error.rs Normal file
View File

@@ -0,0 +1,97 @@
use std::any::Any;
use common_error::prelude::*;
use sqlparser::parser::ParserError;
/// SQL parser errors.
// For now the parser errors do not contain a backtrace, to avoid generating one
// every time the parser parses an invalid SQL statement.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
/// The statement uses a keyword that is not supported.
#[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))]
Unsupported { sql: String, keyword: String },
/// The parser found a token other than the one it expected.
#[snafu(display(
"Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}",
sql,
expected,
actual,
source
))]
Unexpected {
sql: String,
expected: String,
actual: String,
source: ParserError,
},
// Syntax error from sql parser.
#[snafu(display("Syntax error, sql: {}, source: {}", sql, source))]
Syntax { sql: String, source: ParserError },
}
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
Unsupported { .. } => StatusCode::Unsupported,
Unexpected { .. } | Syntax { .. } => StatusCode::InvalidSyntax,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use super::*;

    /// Produces a raw sqlparser error to act as `source`.
    fn throw_sp_error() -> Result<(), ParserError> {
        Err(ParserError::ParserError("parser error".to_string()))
    }

    #[test]
    fn test_syntax_error() {
        let syntax_err = throw_sp_error()
            .context(SyntaxSnafu { sql: "" })
            .unwrap_err();
        assert_matches!(
            syntax_err,
            Error::Syntax {
                sql: _,
                source: ParserError::ParserError { .. }
            }
        );
        assert_eq!(StatusCode::InvalidSyntax, syntax_err.status_code());

        let unexpected_err = throw_sp_error()
            .context(UnexpectedSnafu {
                sql: "",
                expected: "",
                actual: "",
            })
            .unwrap_err();
        assert_eq!(StatusCode::InvalidSyntax, unexpected_err.status_code());
    }

    #[test]
    fn test_unsupported_error() {
        let unsupported = Error::Unsupported {
            sql: "".to_string(),
            keyword: "".to_string(),
        };
        assert_eq!(StatusCode::Unsupported, unsupported.status_code());
    }
}

View File

@@ -1,52 +0,0 @@
use snafu::prelude::*;
use sqlparser::parser::ParserError as SpParserError;
/// SQL parser errors.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum ParserError {
/// The statement uses a keyword that is not supported.
#[snafu(display("SQL statement is not supported: {sql}, keyword: {keyword}"))]
Unsupported { sql: String, keyword: String },
/// The parser found a token other than the one it expected.
#[snafu(display(
"Unexpected token while parsing SQL statement: {sql}, expected: {expected}, found: {actual}, source: {source}"
))]
Unexpected {
sql: String,
expected: String,
actual: String,
source: SpParserError,
},
/// Syntax error described only by a message.
#[snafu(display("SQL syntax error: {msg}"))]
SyntaxError { msg: String },
/// Any other error raised by the underlying sqlparser parser.
#[snafu(display("Unknown inner parser error, sql: {sql}, source: {source}"))]
InnerError { sql: String, source: SpParserError },
}
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use snafu::ResultExt;

    /// Produces a raw sqlparser error to act as `source`.
    fn raise_error() -> Result<(), sqlparser::parser::ParserError> {
        Err(sqlparser::parser::ParserError::ParserError(
            "parser error".to_string(),
        ))
    }

    #[test]
    pub fn test_error_conversion() {
        // Adding the `InnerSnafu` context must yield `ParserError::InnerError`
        // with the sqlparser error kept as its source.
        let converted = raise_error().context(crate::errors::InnerSnafu {
            sql: "".to_string(),
        });
        assert_matches!(
            converted,
            Err(super::ParserError::InnerError {
                sql: _,
                source: sqlparser::parser::ParserError::ParserError { .. }
            })
        )
    }
}

View File

@@ -4,7 +4,7 @@ extern crate core;
pub mod ast;
pub mod dialect;
pub mod errors;
pub mod error;
pub mod parser;
pub mod parsers;
pub mod statements;

View File

@@ -4,12 +4,12 @@ use sqlparser::keywords::Keyword;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::{Token, Tokenizer};
use crate::errors;
use crate::error::{self, Error};
use crate::statements::show_database::SqlShowDatabase;
use crate::statements::show_kind::ShowKind;
use crate::statements::statement::Statement;
pub type Result<T> = std::result::Result<T, errors::ParserError>;
pub type Result<T> = std::result::Result<T, Error>;
/// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser.
pub struct ParserContext<'a> {
@@ -87,10 +87,11 @@ impl<'a> ParserContext<'a> {
/// Raises an "unsupported statement" error.
pub fn unsupported<T>(&self, keyword: String) -> Result<T> {
Err(errors::ParserError::Unsupported {
sql: self.sql.to_string(),
error::UnsupportedSnafu {
sql: self.sql,
keyword,
})
}
.fail()
}
/// Parses SHOW statements
@@ -137,7 +138,7 @@ impl<'a> ParserContext<'a> {
ShowKind::Like(
self.parser
.parse_identifier()
.context(errors::UnexpectedSnafu {
.context(error::UnexpectedSnafu {
sql: self.sql,
expected: "LIKE",
actual: tok.to_string(),
@@ -146,7 +147,7 @@ impl<'a> ParserContext<'a> {
),
))),
Keyword::WHERE => Ok(Statement::ShowDatabases(SqlShowDatabase::new(
ShowKind::Where(self.parser.parse_expr().context(errors::UnexpectedSnafu {
ShowKind::Where(self.parser.parse_expr().context(error::UnexpectedSnafu {
sql: self.sql.to_string(),
expected: "some valid expression".to_string(),
actual: self.peek_token_as_string(),

View File

@@ -1,7 +1,7 @@
use snafu::ResultExt;
use sqlparser::ast::Statement as SpStatement;
use crate::errors;
use crate::error;
use crate::parser::ParserContext;
use crate::parser::Result;
use crate::statements::insert::Insert;
@@ -14,13 +14,13 @@ impl<'a> ParserContext<'a> {
let spstatement = self
.parser
.parse_insert()
.context(errors::InnerSnafu { sql: self.sql })?;
.context(error::SyntaxSnafu { sql: self.sql })?;
match spstatement {
SpStatement::Insert { .. } => {
Ok(Statement::Insert(Box::new(Insert { inner: spstatement })))
}
unexp => errors::UnsupportedSnafu {
unexp => error::UnsupportedSnafu {
sql: self.sql.to_string(),
keyword: unexp.to_string(),
}

View File

@@ -1,6 +1,6 @@
use snafu::prelude::*;
use crate::errors;
use crate::error;
use crate::parser::ParserContext;
use crate::parser::Result;
use crate::statements::query::Query;
@@ -12,7 +12,7 @@ impl<'a> ParserContext<'a> {
let spquery = self
.parser
.parse_query()
.context(errors::InnerSnafu { sql: self.sql })?;
.context(error::SyntaxSnafu { sql: self.sql })?;
Ok(Statement::Query(Box::new(Query::try_from(spquery)?)))
}

View File

@@ -1,6 +1,6 @@
use sqlparser::ast::Query as SpQuery;
use crate::errors::ParserError;
use crate::error::Error;
/// Query statement instance.
#[derive(Debug, Clone, PartialEq)]
@@ -10,7 +10,7 @@ pub struct Query {
/// Automatically converts from sqlparser Query instance to SqlQuery.
impl TryFrom<SpQuery> for Query {
type Error = ParserError;
type Error = Error;
fn try_from(q: SpQuery) -> Result<Self, Self::Error> {
Ok(Query { inner: q })
@@ -18,7 +18,7 @@ impl TryFrom<SpQuery> for Query {
}
impl TryFrom<Query> for SpQuery {
type Error = ParserError;
type Error = Error;
fn try_from(value: Query) -> Result<Self, Self::Error> {
Ok(value.inner)

View File

@@ -11,6 +11,7 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-error = {path = "../common/error" }
common-query = {path = "../common/query" }
common-recordbatch = {path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
@@ -18,4 +19,4 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b
datatypes = { path = "../datatypes" }
futures = "0.3"
serde = "1.0.136"
snafu = "0.7.0"
snafu = { version = "0.7", features = ["backtraces"] }

View File

@@ -1,18 +1,74 @@
use datafusion::error::DataFusionError;
use snafu::Snafu;
use std::any::Any;
use common_error::prelude::*;
use datafusion::error::DataFusionError;
common_error::define_opaque_error!(Error);
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Datafusion error: {}", source))]
Datafusion { source: DataFusionError },
#[snafu(display("Not expected to run ExecutionPlan more than once."))]
ExecuteRepeatedly,
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
fn from(e: Error) -> Self {
Self::External(Box::new(e))
}
}
/// Default error implementation of table.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
/// Error returned by DataFusion; a backtrace is stored next to the source.
#[snafu(display("Datafusion error: {}", source))]
Datafusion {
source: DataFusionError,
backtrace: Backtrace,
},
/// An `ExecutionPlan` was run more than once.
#[snafu(display("Not expected to run ExecutionPlan more than once"))]
ExecuteRepeatedly { backtrace: Backtrace },
}
// NOTE(review): `status_code` is not overridden here, so it presumably falls back to the
// trait default; the tests in this file expect `StatusCode::Unknown`.
impl ErrorExt for InnerError {
/// Returns the backtrace reported by `ErrorCompat::backtrace`, if any.
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
/// Returns `self` as a `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
impl From<InnerError> for DataFusionError {
fn from(e: InnerError) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Always fails with a DataFusion-sourced error converted into `Error`.
    fn throw_df_error() -> Result<()> {
        let raw: std::result::Result<(), DataFusionError> =
            Err(DataFusionError::NotImplemented("table test".to_string()));
        raw.context(DatafusionSnafu)?;
        Ok(())
    }

    /// Always fails with `ExecuteRepeatedly` converted into `Error`.
    fn throw_repeatedly() -> Result<()> {
        let raw: std::result::Result<(), InnerError> = ExecuteRepeatedlySnafu {}.fail();
        raw?;
        Ok(())
    }

    #[test]
    fn test_error() {
        let df_err = throw_df_error().unwrap_err();
        assert!(df_err.backtrace_opt().is_some());
        assert_eq!(StatusCode::Unknown, df_err.status_code());

        let repeat_err = throw_repeatedly().unwrap_err();
        assert!(repeat_err.backtrace_opt().is_some());
        assert_eq!(StatusCode::Unknown, repeat_err.status_code());
    }
}

View File

@@ -16,7 +16,7 @@ use datafusion::datasource::{
datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
TableType as DfTableType,
};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::error::Result as DfResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::{
@@ -30,8 +30,8 @@ use datatypes::schema::{Schema, SchemaRef};
use futures::Stream;
use snafu::prelude::*;
use super::{Table, TableProviderFilterPushDown, TableRef, TableType};
use crate::error::{self, Result};
use crate::table::{Table, TableProviderFilterPushDown, TableRef, TableType};
/// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan.
struct ExecutionPlanAdapter {
@@ -90,9 +90,7 @@ impl ExecutionPlan for ExecutionPlanAdapter {
let stream = mem::replace(&mut *stream, None);
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap())))
} else {
error::ExecuteRepeatedlySnafu
.fail()
.map_err(|e| DataFusionError::External(Box::new(e)))
error::ExecuteRepeatedlySnafu.fail()?
}
}
@@ -139,28 +137,23 @@ impl TableProvider for DfTableProviderAdapter {
) -> DfResult<Arc<dyn ExecutionPlan>> {
let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Expr::new).collect();
match self.table.scan(projection, &filters, limit).await {
Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter {
schema: stream.schema(),
stream: Mutex::new(Some(stream)),
})),
Err(e) => Err(e.into()),
}
let stream = self.table.scan(projection, &filters, limit).await?;
Ok(Arc::new(ExecutionPlanAdapter {
schema: stream.schema(),
stream: Mutex::new(Some(stream)),
}))
}
fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
match self
let p = self
.table
.supports_filter_pushdown(&Expr::new(filter.clone()))
{
Ok(p) => match p {
TableProviderFilterPushDown::Unsupported => {
Ok(DfTableProviderFilterPushDown::Unsupported)
}
TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
},
Err(e) => Err(e.into()),
.supports_filter_pushdown(&Expr::new(filter.clone()))?;
match p {
TableProviderFilterPushDown::Unsupported => {
Ok(DfTableProviderFilterPushDown::Unsupported)
}
TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
}
}
}