diff --git a/Cargo.lock b/Cargo.lock index 81af3bfe4e..a776c55494 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -223,6 +232,21 @@ dependencies = [ "syn", ] +[[package]] +name = "backtrace" +version = "0.3.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.13.0" @@ -433,6 +457,13 @@ dependencies = [ name = "common-base" version = "0.1.0" +[[package]] +name = "common-error" +version = "0.1.0" +dependencies = [ + "snafu", +] + [[package]] name = "common-query" version = "0.1.0" @@ -620,6 +651,7 @@ dependencies = [ "arrow2", "axum", "axum-macros", + "common-error", "common-recordbatch", "hyper", "query", @@ -818,6 +850,12 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "h2" version = "0.3.13" @@ -1300,6 +1338,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.28.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456" +dependencies = [ + "memchr", +] + [[package]] name = "object-store" version = "0.1.0" @@ -1486,6 +1533,7 @@ version = "0.1.0" dependencies = [ "arrow2", "async-trait", + "common-error", "common-recordbatch", "datafusion", "datatypes", @@ -1578,6 +1626,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustversion" version = "1.0.6" @@ -1684,6 +1738,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5" dependencies = [ + "backtrace", "doc-comment", "snafu-derive", ] @@ -1720,6 +1775,7 @@ dependencies = [ name = "sql" version = "0.1.0" dependencies = [ + "common-error", "snafu", "sqlparser", ] @@ -1815,6 +1871,7 @@ dependencies = [ "arrow2", "async-trait", "chrono", + "common-error", "common-query", "common-recordbatch", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index daed9802d6..89d06f082b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "src/common/base", + "src/common/error", "src/common/query", "src/common/recordbatch", "src/cmd", diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml new file mode 100644 index 0000000000..b41e6d1f0d --- /dev/null +++ b/src/common/error/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "common-error" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/error/src/ext.rs 
b/src/common/error/src/ext.rs
new file mode 100644
index 0000000000..d69e4f1e94
--- /dev/null
+++ b/src/common/error/src/ext.rs
@@ -0,0 +1,185 @@
+use std::any::Any;
+
+use crate::status_code::StatusCode;
+
+/// Extension to [`Error`](std::error::Error) in std.
+pub trait ErrorExt: std::error::Error {
+    /// Map this error to [StatusCode].
+    fn status_code(&self) -> StatusCode {
+        StatusCode::Unknown
+    }
+
+    /// Get the reference to the backtrace of this error, or None if the backtrace is unavailable.
+    // Add an `_opt` suffix to avoid confusion with the similar method in `std::error::Error`; once
+    // backtrace in std is stable, we can deprecate this method.
+    fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace>;
+
+    /// Returns the error as [Any](std::any::Any) so that it can be
+    /// downcast to a specific implementation.
+    fn as_any(&self) -> &dyn Any;
+}
+
+/// A helper macro to define an opaque boxed error based on errors that implement the [ErrorExt] trait.
+#[macro_export]
+macro_rules! define_opaque_error {
+    ($Error:ident) => {
+        /// An error that behaves like a boxed `dyn ErrorExt`.
+        ///
+        /// Define this error as a new type instead of using a `Box` directly so we can implement
+        /// more methods or traits for it.
+        pub struct $Error {
+            inner: Box<dyn $crate::ext::ErrorExt + Send + Sync>,
+        }
+
+        impl $Error {
+            pub fn new<E: $crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
+                Self {
+                    inner: Box::new(err),
+                }
+            }
+        }
+
+        impl std::fmt::Debug for $Error {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                // Use the pretty debug format of the inner error for the opaque error.
+                let debug_format = $crate::format::DebugFormat::new(&*self.inner);
+                debug_format.fmt(f)
+            }
+        }
+
+        impl std::fmt::Display for $Error {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(f, "{}", self.inner)
+            }
+        }
+
+        impl std::error::Error for $Error {
+            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+                self.inner.source()
+            }
+        }
+
+        impl $crate::ext::ErrorExt for $Error {
+            fn status_code(&self) -> $crate::status_code::StatusCode {
+                self.inner.status_code()
+            }
+
+            fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> {
+                self.inner.backtrace_opt()
+            }
+
+            fn as_any(&self) -> &dyn std::any::Any {
+                self.inner.as_any()
+            }
+        }
+
+        // Implement ErrorCompat for this opaque error so the backtrace is also available
+        // via `ErrorCompat::backtrace()`.
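+        // With this impl in place, code that only knows about snafu can still reach
+        // the backtrace, e.g. via `ErrorCompat::backtrace(&err)` as the tests below do.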
+        impl $crate::snafu::ErrorCompat for $Error {
+            fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> {
+                self.inner.backtrace_opt()
+            }
+        }
+    };
+}
+
+#[cfg(test)]
+mod tests {
+    use std::error::Error as StdError;
+
+    use snafu::{prelude::*, Backtrace, ErrorCompat};
+
+    use super::*;
+    use crate::prelude::*;
+
+    define_opaque_error!(Error);
+
+    #[derive(Debug, Snafu)]
+    enum InnerError {
+        #[snafu(display("This is a leaf error, val: {}", val))]
+        Leaf { val: i32, backtrace: Backtrace },
+
+        #[snafu(display("This is an internal error"))]
+        Internal {
+            source: std::io::Error,
+            backtrace: Backtrace,
+        },
+    }
+
+    impl ErrorExt for InnerError {
+        fn status_code(&self) -> StatusCode {
+            StatusCode::Internal
+        }
+
+        fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
+            ErrorCompat::backtrace(self)
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    impl From<InnerError> for Error {
+        fn from(err: InnerError) -> Self {
+            Self::new(err)
+        }
+    }
+
+    fn throw_leaf() -> std::result::Result<(), InnerError> {
+        LeafSnafu { val: 10 }.fail()
+    }
+
+    fn throw_io() -> std::result::Result<(), std::io::Error> {
+        Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"))
+    }
+
+    fn throw_internal() -> std::result::Result<(), InnerError> {
+        throw_io().context(InternalSnafu)
+    }
+
+    #[test]
+    fn test_inner_error() {
+        let leaf = throw_leaf().err().unwrap();
+        assert!(leaf.backtrace_opt().is_some());
+        assert!(leaf.source().is_none());
+
+        let internal = throw_internal().err().unwrap();
+        assert!(internal.backtrace_opt().is_some());
+        assert!(internal.source().is_some());
+    }
+
+    #[test]
+    fn test_opaque_error() {
+        // Test leaf error.
+        let err: Error = throw_leaf().map_err(Into::into).err().unwrap();
+        let msg = format!("{:?}", err);
+        assert!(msg.contains("\nBacktrace:\n"));
+
+        let fmt_msg = format!("{:?}", DebugFormat::new(&err));
+        assert_eq!(msg, fmt_msg);
+
+        assert!(ErrorCompat::backtrace(&err).is_some());
+        assert!(err.backtrace_opt().is_some());
+        assert_eq!("This is a leaf error, val: 10", err.to_string());
+        assert_eq!(StatusCode::Internal, err.status_code());
+
+        err.as_any().downcast_ref::<InnerError>().unwrap();
+
+        // Test internal error.
+        let err: Error = throw_internal().map_err(Into::into).err().unwrap();
+        let msg = format!("{:?}", err);
+        assert!(msg.contains("\nBacktrace:\n"));
+        assert!(msg.contains("Caused by"));
+
+        let fmt_msg = format!("{:?}", DebugFormat::new(&err));
+        assert_eq!(msg, fmt_msg);
+
+        assert!(ErrorCompat::backtrace(&err).is_some());
+        assert!(err.backtrace_opt().is_some());
+        assert_eq!("This is an internal error", err.to_string());
+        assert_eq!(StatusCode::Internal, err.status_code());
+
+        err.as_any().downcast_ref::<InnerError>().unwrap();
+    }
+}
diff --git a/src/common/error/src/format.rs b/src/common/error/src/format.rs
new file mode 100644
index 0000000000..f4e592dc21
--- /dev/null
+++ b/src/common/error/src/format.rs
@@ -0,0 +1,109 @@
+use std::fmt;
+
+use crate::ext::ErrorExt;
+
+/// Pretty debug format for an error that also prints its source and backtrace.
+pub struct DebugFormat<'a, E: ?Sized>(&'a E);
+
+impl<'a, E: ErrorExt + ?Sized> DebugFormat<'a, E> {
+    /// Create a new format struct from `err`.
+    pub fn new(err: &'a E) -> Self {
+        Self(err)
+    }
+}
+
+impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}.", self.0)?;
+        if let Some(source) = self.0.source() {
+            // Use the debug format for the source error to get more verbose info.
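+            // e.g. an `Internal` error wrapping a `Leaf` source renders as
+            // "Internal error. Caused by: Leaf" (see `test_debug_format` below).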
+            write!(f, " Caused by: {:?}", source)?;
+        }
+        if let Some(backtrace) = self.0.backtrace_opt() {
+            // Add a newline to separate the causes from the backtrace.
+            write!(f, "\nBacktrace:\n{}", backtrace)?;
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::any::Any;
+
+    use snafu::{prelude::*, Backtrace, GenerateImplicitData};
+
+    use super::*;
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("This is a leaf error"))]
+    struct Leaf;
+
+    impl ErrorExt for Leaf {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            None
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("This is a leaf with backtrace"))]
+    struct LeafWithBacktrace {
+        backtrace: Backtrace,
+    }
+
+    impl ErrorExt for LeafWithBacktrace {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            Some(&self.backtrace)
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("Internal error"))]
+    struct Internal {
+        #[snafu(source)]
+        source: Leaf,
+        backtrace: Backtrace,
+    }
+
+    impl ErrorExt for Internal {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            Some(&self.backtrace)
+        }
+
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+    }
+
+    #[test]
+    fn test_debug_format() {
+        let err = Leaf;
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert_eq!("This is a leaf error.", msg);
+
+        let err = LeafWithBacktrace {
+            backtrace: Backtrace::generate(),
+        };
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert!(msg.starts_with("This is a leaf with backtrace.\nBacktrace:\n"));
+
+        let err = Internal {
+            source: Leaf,
+            backtrace: Backtrace::generate(),
+        };
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert!(msg.contains("Internal error. Caused by: Leaf\nBacktrace:\n"));
+    }
+}
diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs
new file mode 100644
index 0000000000..50ef09d166
--- /dev/null
+++ b/src/common/error/src/lib.rs
@@ -0,0 +1,14 @@
+pub mod ext;
+pub mod format;
+pub mod mock;
+pub mod status_code;
+
+pub mod prelude {
+    pub use snafu::{prelude::*, Backtrace, ErrorCompat};
+
+    pub use crate::ext::ErrorExt;
+    pub use crate::format::DebugFormat;
+    pub use crate::status_code::StatusCode;
+}
+
+pub use snafu;
diff --git a/src/common/error/src/mock.rs b/src/common/error/src/mock.rs
new file mode 100644
index 0000000000..f33dcb88ed
--- /dev/null
+++ b/src/common/error/src/mock.rs
@@ -0,0 +1,79 @@
+//! Utils for mock.
+
+use std::any::Any;
+use std::fmt;
+
+use snafu::GenerateImplicitData;
+
+use crate::prelude::*;
+
+/// A mock error mainly for tests.
+#[derive(Debug)]
+pub struct MockError {
+    pub code: StatusCode,
+    backtrace: Option<Backtrace>,
+}
+
+impl MockError {
+    /// Create a new [MockError] without backtrace.
+    pub fn new(code: StatusCode) -> MockError {
+        MockError {
+            code,
+            backtrace: None,
+        }
+    }
+
+    /// Create a new [MockError] with backtrace.
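+    /// The backtrace is captured eagerly via `Backtrace::generate()`, so this is
+    /// handy for asserting backtrace propagation in tests, e.g.
+    /// `MockError::with_backtrace(StatusCode::Internal)`.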
+    pub fn with_backtrace(code: StatusCode) -> MockError {
+        MockError {
+            code,
+            backtrace: Some(Backtrace::generate()),
+        }
+    }
+}
+
+impl fmt::Display for MockError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.code)
+    }
+}
+
+impl std::error::Error for MockError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        None
+    }
+}
+
+impl ErrorExt for MockError {
+    fn status_code(&self) -> StatusCode {
+        self.code
+    }
+
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        self.backtrace.as_ref()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl ErrorCompat for MockError {
+    fn backtrace(&self) -> Option<&Backtrace> {
+        self.backtrace_opt()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_mock_error() {
+        let err = MockError::new(StatusCode::Unknown);
+        assert!(err.backtrace_opt().is_none());
+
+        let err = MockError::with_backtrace(StatusCode::Unknown);
+        assert!(err.backtrace_opt().is_some());
+    }
+}
diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs
new file mode 100644
index 0000000000..0f3286e69e
--- /dev/null
+++ b/src/common/error/src/status_code.rs
@@ -0,0 +1,56 @@
+use std::fmt;
+
+/// Common status code for public API.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum StatusCode {
+    // ====== Begin of common status code ==============
+    /// Unknown error.
+    Unknown,
+    /// Unsupported operation.
+    Unsupported,
+    /// Unexpected error, maybe there is a BUG.
+    Unexpected,
+    /// Internal server error.
+    Internal,
+    // ====== End of common status code ================
+
+    // ====== Begin of SQL related status code =========
+    /// SQL syntax error.
+    InvalidSyntax,
+    // ====== End of SQL related status code ===========
+
+    // ====== Begin of query related status code =======
+    /// Failed to create a plan for the query.
+    PlanQuery,
+    /// The query engine failed to execute the query.
+    EngineExecuteQuery,
+    // ====== End of query related status code =========
+
+    // ====== Begin of catalog related status code =====
+    /// Table already exists.
+    TableAlreadyExists,
+    // ====== End of catalog related status code =======
+}
+
+impl fmt::Display for StatusCode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // The current debug format is suitable for display.
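+        // e.g. `StatusCode::TableAlreadyExists` is displayed as "TableAlreadyExists",
+        // as `test_display_status_code` below asserts.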
+        write!(f, "{:?}", self)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn assert_status_code_display(code: StatusCode, msg: &str) {
+        let code_msg = format!("{}", code);
+        assert_eq!(msg, code_msg);
+    }
+
+    #[test]
+    fn test_display_status_code() {
+        assert_status_code_display(StatusCode::Unknown, "Unknown");
+        assert_status_code_display(StatusCode::TableAlreadyExists, "TableAlreadyExists");
+    }
+}
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index d9e2570a45..da17d5c90d 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -15,7 +15,7 @@ datatypes = {path ="../../datatypes" }
 futures = "0.3"
 paste = "1.0"
 serde = "1.0"
-snafu = "0.7"
+snafu = { version = "0.7", features = ["backtraces"] }
 
 [dev-dependencies.arrow]
 package = "arrow2"
@@ -24,3 +24,4 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc"]
 
 [dev-dependencies]
 tokio = { version = "1.18", features = ["full"] }
+
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index e5666f485b..0d621a668b 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -1,10 +1,14 @@
 use arrow::error::ArrowError;
-use snafu::Snafu;
+use snafu::{Backtrace, Snafu};
 
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
     #[snafu(display("Arrow error: {}", source))]
-    Arrow { source: ArrowError },
+    Arrow {
+        source: ArrowError,
+        backtrace: Backtrace,
+    },
 }
+
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index ebf3b8aa02..11bde84dfe 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -8,12 +8,13 @@ edition = "2021"
 
 [dependencies]
 axum = "0.5"
 axum-macros = "0.2"
-common-recordbatch = {path = "../common/recordbatch" }
+common-error = { path = "../common/error" }
+common-recordbatch = { path = "../common/recordbatch" }
 hyper = { version = "0.14", features = ["full"] }
 query = { path = "../query" }
 serde = "1.0"
 serde_json = "1.0"
-snafu = "0.7"
+snafu = { version = "0.7", features = ["backtraces"] }
 table = { path = "../table" }
 tokio = { version = "1.18", features = ["full"] }
 tower = { version = "0.4", features = ["full"]}
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 454db221be..cef5f35745 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -4,7 +4,7 @@ use query::catalog::memory;
 use query::catalog::CatalogListRef;
 use snafu::ResultExt;
 
-use crate::error::{QuerySnafu, Result};
+use crate::error::{NewCatalogSnafu, Result};
 use crate::instance::{Instance, InstanceRef};
 use crate::server::Services;
 
@@ -17,7 +17,7 @@ pub struct DataNode {
 
 impl DataNode {
     pub fn new() -> Result<DataNode> {
-        let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?;
+        let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
         let instance = Arc::new(Instance::new(catalog_list.clone()));
 
         Ok(Self {
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 04cf8ab34c..537d5c75dc 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -1,15 +1,71 @@
-use hyper::Error as HyperError;
-use query::error::Error as QueryError;
-use snafu::Snafu;
+use std::any::Any;
 
-/// business error of datanode.
+use common_error::prelude::*;
+
+/// Business error of datanode.
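+// Each variant either delegates its status code to the error source (`ExecuteSql`,
+// `NewCatalog`) or maps to a fixed code (`StartHttp` => `StatusCode::Internal`);
+// see the `ErrorExt` impl below.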
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
-    #[snafu(display("Query error: {}", source))]
-    Query { source: QueryError },
-    #[snafu(display("Http error: {}", source))]
-    Hyper { source: HyperError },
+    #[snafu(display("Fail to execute sql, source: {}", source))]
+    ExecuteSql {
+        #[snafu(backtrace)]
+        source: query::error::Error,
+    },
+
+    #[snafu(display("Fail to create catalog list, source: {}", source))]
+    NewCatalog {
+        #[snafu(backtrace)]
+        source: query::error::Error,
+    },
+
+    // The error source of an http error is clear even without a backtrace now, so
+    // a backtrace is not carried in this variant.
+    #[snafu(display("Fail to start HTTP server, source: {}", source))]
+    StartHttp { source: hyper::Error },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
+
+impl ErrorExt for Error {
+    fn status_code(&self) -> StatusCode {
+        match self {
+            Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
+            // TODO(yingwen): Further categorize http error.
+            Error::StartHttp { .. } => StatusCode::Internal,
+        }
+    }
+
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use common_error::mock::MockError;
+
+    use super::*;
+
+    fn throw_query_error() -> std::result::Result<(), query::error::Error> {
+        Err(query::error::Error::new(MockError::with_backtrace(
+            StatusCode::Internal,
+        )))
+    }
+
+    fn assert_internal_error(err: &Error) {
+        assert!(err.backtrace_opt().is_some());
+        assert_eq!(StatusCode::Internal, err.status_code());
+    }
+
+    #[test]
+    fn test_error() {
+        let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
+        assert_internal_error(&err);
+
+        let err = throw_query_error().context(NewCatalogSnafu).err().unwrap();
+        assert_internal_error(&err);
+    }
+}
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 6f04239c6d..b2ee532977 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -4,7 +4,7 @@ use query::catalog::CatalogListRef;
 use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
 use snafu::ResultExt;
 
-use crate::error::{QuerySnafu, Result};
+use crate::error::{ExecuteSqlSnafu, Result};
 
 // An abstraction to read/write services.
pub struct Instance { @@ -27,12 +27,15 @@ impl Instance { } pub async fn execute_sql(&self, sql: &str) -> Result { - let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?; + let logical_plan = self + .query_engine + .sql_to_plan(sql) + .context(ExecuteSqlSnafu)?; self.query_engine .execute(&logical_plan) .await - .context(QuerySnafu) + .context(ExecuteSqlSnafu) } } diff --git a/src/datanode/src/server/http.rs b/src/datanode/src/server/http.rs index a1f22c57ef..2061f05a6c 100644 --- a/src/datanode/src/server/http.rs +++ b/src/datanode/src/server/http.rs @@ -17,7 +17,7 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use crate::error::{HyperSnafu, Result}; +use crate::error::{Result, StartHttpSnafu}; use crate::server::InstanceRef; /// Http server @@ -106,7 +106,7 @@ impl HttpServer { let server = axum::Server::bind(&addr).serve(app.into_make_service()); let graceful = server.with_graceful_shutdown(shutdown_signal()); - graceful.await.context(HyperSnafu)?; + graceful.await.context(StartHttpSnafu)?; Ok(()) } diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 131af06ab0..9462ce3753 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -10,12 +10,13 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] async-trait = "0.1" +common-error = { path = "../common/error" } common-recordbatch = {path = "../common/recordbatch" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} datatypes = {path = "../datatypes" } futures = "0.3" futures-util = "0.3.21" -snafu = "0.7.0" +snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } tokio = "1.0" sql = { path = "../sql" } diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index 13418ecb67..5d1eb5aa4a 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; +use common_error::prelude::*; use table::table::numbers::NumbersTable; use table::TableRef; @@ -11,7 +12,36 @@ use crate::catalog::{ CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, }; -use crate::error::{ExecutionSnafu, Result}; +use crate::error::{Error, Result}; + +/// Error implementation of memory catalog. +#[derive(Debug, Snafu)] +pub enum InnerError { + #[snafu(display("Table {} already exists", table))] + TableExists { table: String, backtrace: Backtrace }, +} + +impl ErrorExt for InnerError { + fn status_code(&self) -> StatusCode { + match self { + InnerError::TableExists { .. 
} => StatusCode::TableAlreadyExists, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } +} /// Simple in-memory list of catalogs #[derive(Default)] @@ -127,10 +157,7 @@ impl SchemaProvider for MemorySchemaProvider { fn register_table(&self, name: String, table: TableRef) -> Result> { if self.table_exist(name.as_str()) { - return ExecutionSnafu { - message: format!("The table {} already exists", name), - } - .fail(); + return TableExistsSnafu { table: name }.fail()?; } let mut tables = self.tables.write().unwrap(); Ok(tables.insert(name, table)) @@ -196,6 +223,8 @@ mod tests { assert!(provider.table_exist(table_name)); let other_table = NumbersTable::default(); let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); - assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } } diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/datafusion.rs similarity index 85% rename from src/query/src/query_engine/datafusion.rs rename to src/query/src/datafusion.rs index 043585a89d..700a33534e 100644 --- a/src/query/src/query_engine/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -1,4 +1,9 @@ -mod adapter; +//! Planner, QueryEngine implementations based on DataFusion. + +mod catalog_adapter; +mod error; +mod plan_adapter; +mod planner; use std::sync::Arc; @@ -6,18 +11,20 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use snafu::{OptionExt, ResultExt}; use sql::{dialect::GenericDialect, parser::ParserContext}; -use super::{context::QueryContext, state::QueryEngineState}; +pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; +use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ catalog::CatalogListRef, - error::{self, ParseSqlSnafu, Result}, + datafusion::plan_adapter::PhysicalPlanAdapter, + datafusion::planner::{DfContextProviderAdapter, DfPlanner}, + error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, physical_planner::PhysicalPlanner, plan::{LogicalPlan, PhysicalPlan}, - planner::{DfContextProviderAdapter, DfPlanner, Planner}, - query_engine::datafusion::adapter::PhysicalPlanAdapter, - query_engine::{Output, QueryEngine}, + planner::Planner, + Output, QueryEngine, }; pub(crate) struct DatafusionQueryEngine { @@ -42,9 +49,7 @@ impl QueryEngine for DatafusionQueryEngine { let context_provider = DfContextProviderAdapter::new(self.state.catalog_list()); let planner = DfPlanner::new(&context_provider); let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {}) - .with_context(|_| ParseSqlSnafu { - sql: sql.to_string(), - })?; + .context(error::ParseSqlSnafu)?; // TODO(dennis): supports multi statement in one sql? 
assert!(1 == statement.len()); planner.statement_to_plan(statement.remove(0)) @@ -70,11 +75,13 @@ impl LogicalOptimizer for DatafusionQueryEngine { ) -> Result { match plan { LogicalPlan::DfPlan(df_plan) => { - let optimized_plan = self - .state - .df_context() - .optimize(df_plan) - .context(error::DatafusionSnafu)?; + let optimized_plan = + self.state + .df_context() + .optimize(df_plan) + .context(error::DatafusionSnafu { + msg: "Fail to optimize logical plan", + })?; Ok(LogicalPlan::DfPlan(optimized_plan)) } @@ -96,7 +103,9 @@ impl PhysicalPlanner for DatafusionQueryEngine { .df_context() .create_physical_plan(df_plan) .await - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to create physical plan", + })?; Ok(Arc::new(PhysicalPlanAdapter::new( Arc::new(physical_plan.schema().into()), @@ -126,7 +135,9 @@ impl PhysicalOptimizer for DatafusionQueryEngine { for optimizer in optimizers { new_plan = optimizer .optimize(new_plan, config) - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to optimize physical plan", + })?; } Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan))) } @@ -172,7 +183,6 @@ mod tests { let plan = engine.sql_to_plan(sql).unwrap(); - println!("{:?}", plan); assert_eq!( format!("{:?}", plan), r#"DfPlan(Limit: 20 diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs new file mode 100644 index 0000000000..e357b28edb --- /dev/null +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -0,0 +1,222 @@ +//! Catalog adapter between datafusion and greptime query engine. + +use std::any::Any; +use std::sync::Arc; + +use datafusion::catalog::{ + catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, + schema::SchemaProvider as DfSchemaProvider, +}; +use datafusion::datasource::TableProvider as DfTableProvider; +use datafusion::error::Result as DataFusionResult; +use datafusion::execution::runtime_env::RuntimeEnv; +use snafu::ResultExt; +use table::{ + table::adapter::{DfTableProviderAdapter, TableAdapter}, + Table, +}; + +use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider}; +use crate::datafusion::error; +use crate::error::Result; + +pub struct DfCatalogListAdapter { + runtime: Arc, + catalog_list: CatalogListRef, +} + +impl DfCatalogListAdapter { + pub fn new(runtime: Arc, catalog_list: CatalogListRef) -> DfCatalogListAdapter { + DfCatalogListAdapter { + runtime, + catalog_list, + } + } +} + +impl DfCatalogList for DfCatalogListAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let catalog_adapter = Arc::new(CatalogProviderAdapter { + df_cataglog_provider: catalog, + runtime: self.runtime.clone(), + }); + self.catalog_list + .register_catalog(name, catalog_adapter) + .map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } + + fn catalog_names(&self) -> Vec { + self.catalog_list.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + self.catalog_list.catalog(name).map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Datafusion's CatalogProvider -> greptime CatalogProvider +struct CatalogProviderAdapter { + df_cataglog_provider: Arc, + runtime: Arc, +} + +impl CatalogProvider for CatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { 
+ self + } + + fn schema_names(&self) -> Vec { + self.df_cataglog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.df_cataglog_provider + .schema(name) + .map(|df_schema_provider| { + Arc::new(SchemaProviderAdapter { + df_schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +///Greptime CatalogProvider -> datafusion's CatalogProvider +struct DfCatalogProviderAdapter { + catalog_provider: Arc, + runtime: Arc, +} + +impl DfCatalogProvider for DfCatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.catalog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.catalog_provider.schema(name).map(|schema_provider| { + Arc::new(DfSchemaProviderAdapter { + schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Greptime SchemaProvider -> datafusion SchemaProvider +struct DfSchemaProviderAdapter { + schema_provider: Arc, + runtime: Arc, +} + +impl DfSchemaProvider for DfSchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.schema_provider + .table(name) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> DataFusionResult>> { + let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); + match self.schema_provider.register_table(name, table)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn deregister_table(&self, name: &str) -> DataFusionResult>> { + match self.schema_provider.deregister_table(name)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn table_exist(&self, name: &str) -> bool { + self.schema_provider.table_exist(name) + } +} + +/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter +struct SchemaProviderAdapter { + df_schema_provider: Arc, + runtime: Arc, +} + +impl SchemaProvider for SchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + /// Retrieves the list of available table names in this schema. + fn table_names(&self) -> Vec { + self.df_schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.df_schema_provider.table(name).map(|table_provider| { + Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ + }) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + Ok(self + .df_schema_provider + .register_table(name, table_provider) + .context(error::DatafusionSnafu { + msg: "Fail to register table to datafusion", + })? + .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) + } + + fn deregister_table(&self, name: &str) -> Result>> { + Ok(self + .df_schema_provider + .deregister_table(name) + .context(error::DatafusionSnafu { + msg: "Fail to deregister table from datafusion", + })? 
+ .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) + } + + fn table_exist(&self, name: &str) -> bool { + self.df_schema_provider.table_exist(name) + } +} diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs new file mode 100644 index 0000000000..7ed2d319f1 --- /dev/null +++ b/src/query/src/datafusion/error.rs @@ -0,0 +1,112 @@ +use std::any::Any; + +use common_error::prelude::*; +use datafusion::error::DataFusionError; + +use crate::error::Error; + +/// Inner error of datafusion based query engine. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum InnerError { + #[snafu(display("{}: {}", msg, source))] + Datafusion { + msg: &'static str, + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("PhysicalPlan downcast failed"))] + PhysicalPlanDowncast { backtrace: Backtrace }, + + // The sql error already contains the SQL. + #[snafu(display("Cannot parse SQL, source: {}", source))] + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, + + #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] + PlanSql { + sql: String, + source: DataFusionError, + backtrace: Backtrace, + }, +} + +impl ErrorExt for InnerError { + fn status_code(&self) -> StatusCode { + use InnerError::*; + + match self { + // TODO(yingwen): Further categorize datafusion error. + Datafusion { .. } => StatusCode::EngineExecuteQuery, + // This downcast should not fail in usual case. + PhysicalPlanDowncast { .. } => StatusCode::Unexpected, + ParseSql { source, .. } => source.status_code(), + PlanSql { .. } => StatusCode::PlanQuery, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn throw_df_error() -> Result<(), DataFusionError> { + Err(DataFusionError::NotImplemented("test".to_string())) + } + + fn assert_error(err: &InnerError, code: StatusCode) { + assert_eq!(code, err.status_code()); + assert!(err.backtrace_opt().is_some()); + } + + #[test] + fn test_datafusion_as_source() { + let err = throw_df_error() + .context(DatafusionSnafu { msg: "test df" }) + .err() + .unwrap(); + assert_error(&err, StatusCode::EngineExecuteQuery); + + let err = throw_df_error() + .context(PlanSqlSnafu { sql: "" }) + .err() + .unwrap(); + assert_error(&err, StatusCode::PlanQuery); + + let res: Result<(), InnerError> = PhysicalPlanDowncastSnafu {}.fail(); + let err = res.err().unwrap(); + assert_error(&err, StatusCode::Unexpected); + } + + fn raise_sql_error() -> Result<(), sql::error::Error> { + Err(sql::error::Error::Unsupported { + sql: "".to_string(), + keyword: "".to_string(), + }) + } + + #[test] + fn test_parse_error() { + let err = raise_sql_error().context(ParseSqlSnafu).err().unwrap(); + assert!(err.backtrace_opt().is_none()); + let sql_err = raise_sql_error().err().unwrap(); + assert_eq!(sql_err.status_code(), err.status_code()); + } +} diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/datafusion/plan_adapter.rs similarity index 91% rename from src/query/src/query_engine/datafusion/adapter.rs rename to src/query/src/datafusion/plan_adapter.rs index 5e432ce892..f2e64b68ca 100644 --- a/src/query/src/query_engine/datafusion/adapter.rs +++ b/src/query/src/datafusion/plan_adapter.rs @@ -16,7 +16,8 @@ use datatypes::schema::SchemaRef; use snafu::ResultExt; use 
table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter}; -use crate::error::{self, Result}; +use crate::datafusion::error; +use crate::error::Result; use crate::executor::Runtime; use crate::plan::{Partitioning, PhysicalPlan}; @@ -74,7 +75,9 @@ impl PhysicalPlan for PhysicalPlanAdapter { let plan = self .plan .with_new_children(df_children) - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to add children to plan", + })?; Ok(Arc::new(PhysicalPlanAdapter::new( self.schema.clone(), plan, @@ -86,11 +89,13 @@ impl PhysicalPlan for PhysicalPlanAdapter { runtime: &Runtime, partition: usize, ) -> Result { - let df_stream = self - .plan - .execute(partition, runtime.into()) - .await - .context(error::DatafusionSnafu)?; + let df_stream = + self.plan + .execute(partition, runtime.into()) + .await + .context(error::DatafusionSnafu { + msg: "Fail to execute physical plan", + })?; Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream))) } @@ -113,9 +118,6 @@ impl Debug for ExecutionPlanAdapter { } } -unsafe impl Send for ExecutionPlanAdapter {} -unsafe impl Sync for ExecutionPlanAdapter {} - #[async_trait::async_trait] impl ExecutionPlan for ExecutionPlanAdapter { fn as_any(&self) -> &dyn Any { diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs new file mode 100644 index 0000000000..179b1099da --- /dev/null +++ b/src/query/src/datafusion/planner.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion::catalog::TableReference; +use datafusion::datasource::TableProvider; +use datafusion::physical_plan::udaf::AggregateUDF; +use datafusion::physical_plan::udf::ScalarUDF; +use datafusion::sql::planner::{ContextProvider, SqlToRel}; +use snafu::ResultExt; +use sql::statements::query::Query; +use sql::statements::statement::Statement; +use table::table::adapter::DfTableProviderAdapter; + +use crate::{ + catalog::{self, CatalogListRef}, + datafusion::error, + error::Result, + plan::LogicalPlan, + planner::Planner, +}; + +pub struct DfPlanner<'a, S: ContextProvider> { + sql_to_rel: SqlToRel<'a, S>, +} + +impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { + /// Creates a DataFusion planner instance + pub fn new(schema_provider: &'a S) -> Self { + let rel = SqlToRel::new(schema_provider); + Self { sql_to_rel: rel } + } + + /// Converts QUERY statement to logical plan. 
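+    /// On success, the resulting DataFusion plan is wrapped as `LogicalPlan::DfPlan`.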
+ pub fn query_to_plan(&self, query: Box) -> Result { + // todo(hl): original SQL should be provided as an argument + let sql = query.inner.to_string(); + let result = self + .sql_to_rel + .query_to_plan(query.inner) + .context(error::PlanSqlSnafu { sql })?; + + Ok(LogicalPlan::DfPlan(result)) + } +} + +impl<'a, S> Planner for DfPlanner<'a, S> +where + S: ContextProvider + Send + Sync, +{ + /// Converts statement to logical plan using datafusion planner + fn statement_to_plan(&self, statement: Statement) -> Result { + match statement { + Statement::ShowDatabases(_) => { + todo!("Currently not supported") + } + Statement::Query(qb) => self.query_to_plan(qb), + Statement::Insert(_) => { + todo!() + } + } + } +} + +pub(crate) struct DfContextProviderAdapter<'a> { + catalog_list: &'a CatalogListRef, +} + +impl<'a> DfContextProviderAdapter<'a> { + pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { + Self { catalog_list } + } +} + +impl<'a> ContextProvider for DfContextProviderAdapter<'a> { + fn get_table_provider(&self, name: TableReference) -> Option> { + let (catalog, schema, table) = match name { + TableReference::Bare { table } => ( + catalog::DEFAULT_CATALOG_NAME, + catalog::DEFAULT_SCHEMA_NAME, + table, + ), + TableReference::Partial { schema, table } => { + (catalog::DEFAULT_CATALOG_NAME, schema, table) + } + TableReference::Full { + catalog, + schema, + table, + } => (catalog, schema, table), + }; + + self.catalog_list + .catalog(catalog) + .and_then(|catalog_provider| catalog_provider.schema(schema)) + .and_then(|schema_provider| schema_provider.table(table)) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn get_function_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + // TODO(dennis) + None + } +} diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 3d240d1ac4..1253da97d2 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,28 +1,6 @@ -use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; -use snafu::Snafu; -use sql::errors::ParserError; -/// business error of query engine -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Datafusion query engine error: {}", source))] - Datafusion { source: DataFusionError }, - #[snafu(display("PhysicalPlan downcast_ref failed"))] - PhysicalPlanDowncast, - #[snafu(display("RecordBatch error: {}", source))] - RecordBatch { source: RecordBatchError }, - #[snafu(display("Execution error: {}", message))] - Execution { message: String }, - #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))] - ParseSql { sql: String, source: ParserError }, - #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] - Planner { - sql: String, - source: DataFusionError, - }, -} +common_error::define_opaque_error!(Error); pub type Result = std::result::Result; diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index c065b8857e..c77401c603 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -1,5 +1,6 @@ pub mod catalog; pub mod database; +mod datafusion; pub mod error; pub mod executor; pub mod logical_optimizer; diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index ee2645bb3a..0814a03711 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -1,109 +1,8 @@ -use std::sync::Arc; 
- -use arrow::datatypes::DataType; -use datafusion::catalog::TableReference; -use datafusion::datasource::TableProvider; -use datafusion::physical_plan::udaf::AggregateUDF; -use datafusion::physical_plan::udf::ScalarUDF; -use datafusion::sql::planner::{ContextProvider, SqlToRel}; -use snafu::ResultExt; -use sql::statements::query::Query; use sql::statements::statement::Statement; -use table::table::adapter::DfTableProviderAdapter; -use crate::{ - catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, - error::{PlannerSnafu, Result}, - plan::LogicalPlan, -}; +use crate::{error::Result, plan::LogicalPlan}; +/// SQL logical planner. pub trait Planner: Send + Sync { fn statement_to_plan(&self, statement: Statement) -> Result; } - -pub struct DfPlanner<'a, S: ContextProvider> { - sql_to_rel: SqlToRel<'a, S>, -} - -impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { - /// Creates a DataFusion planner instance - pub fn new(schema_provider: &'a S) -> Self { - let rel = SqlToRel::new(schema_provider); - Self { sql_to_rel: rel } - } - - /// Converts QUERY statement to logical plan. - pub fn query_to_plan(&self, query: Box) -> Result { - // todo(hl): original SQL should be provided as an argument - let sql = query.inner.to_string(); - let result = self - .sql_to_rel - .query_to_plan(query.inner) - .context(PlannerSnafu { sql })?; - - Ok(LogicalPlan::DfPlan(result)) - } -} - -impl<'a, S> Planner for DfPlanner<'a, S> -where - S: ContextProvider + Send + Sync, -{ - /// Converts statement to logical plan using datafusion planner - fn statement_to_plan(&self, statement: Statement) -> Result { - match statement { - Statement::ShowDatabases(_) => { - todo!("Currently not supported") - } - Statement::Query(qb) => self.query_to_plan(qb), - Statement::Insert(_) => { - todo!() - } - } - } -} - -pub(crate) struct DfContextProviderAdapter<'a> { - catalog_list: &'a CatalogListRef, -} - -impl<'a> DfContextProviderAdapter<'a> { - pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { - Self { catalog_list } - } -} - -impl<'a> ContextProvider for DfContextProviderAdapter<'a> { - fn get_table_provider(&self, name: TableReference) -> Option> { - let (catalog, schema, table) = match name { - TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), - TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table), - TableReference::Full { - catalog, - schema, - table, - } => (catalog, schema, table), - }; - - self.catalog_list - .catalog(catalog) - .and_then(|catalog_provider| catalog_provider.schema(schema)) - .and_then(|schema_provider| schema_provider.table(table)) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn get_function_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_aggregate_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - // TODO(dennis) - None - } -} diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index ac274ef563..215794a1fe 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -1,16 +1,16 @@ mod context; -mod datafusion; mod state; use std::sync::Arc; use common_recordbatch::SendableRecordBatchStream; -pub use context::QueryContext; use crate::catalog::CatalogList; +use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; use crate::plan::LogicalPlan; -use crate::query_engine::datafusion::DatafusionQueryEngine; 
+pub use crate::query_engine::context::QueryContext; +pub use crate::query_engine::state::QueryEngineState; /// Sql output pub enum Output { diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index d9f8ca19f9..f7301f6a99 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -1,29 +1,16 @@ -use std::any::Any; use std::fmt; use std::sync::Arc; -use datafusion::catalog::{ - catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, - schema::SchemaProvider as DfSchemaProvider, -}; -use datafusion::datasource::TableProvider as DfTableProvider; -use datafusion::error::Result as DataFusionResult; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use snafu::ResultExt; -use table::{ - table::adapter::{DfTableProviderAdapter, TableAdapter}, - Table, -}; -use crate::catalog::{ - schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, -}; -use crate::error::{self, Result}; +use crate::catalog::{self, CatalogListRef}; +use crate::datafusion::DfCatalogListAdapter; use crate::executor::Runtime; /// Query engine global state +// TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it, +// which allows different implementation use different engine state. The state can also be an associated +// type in QueryEngine trait. #[derive(Clone)] pub struct QueryEngineState { df_context: ExecutionContext, @@ -39,14 +26,16 @@ impl fmt::Debug for QueryEngineState { impl QueryEngineState { pub(crate) fn new(catalog_list: CatalogListRef) -> Self { - let config = ExecutionConfig::new() - .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let config = ExecutionConfig::new().with_default_catalog_and_schema( + catalog::DEFAULT_CATALOG_NAME, + catalog::DEFAULT_SCHEMA_NAME, + ); let df_context = ExecutionContext::with_config(config); - df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter { - catalog_list: catalog_list.clone(), - runtime: df_context.runtime_env(), - }); + df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new( + df_context.runtime_env(), + catalog_list.clone(), + )); Self { df_context, @@ -69,194 +58,3 @@ impl QueryEngineState { self.df_context.runtime_env().into() } } - -/// Adapters between datafusion and greptime query engine. 
-struct DfCatalogListAdapter { - runtime: Arc, - catalog_list: CatalogListRef, -} - -impl DfCatalogList for DfCatalogListAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option> { - let catalog_adapter = Arc::new(CatalogProviderAdapter { - df_cataglog_provider: catalog, - runtime: self.runtime.clone(), - }); - self.catalog_list - .register_catalog(name, catalog_adapter) - .map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } - - fn catalog_names(&self) -> Vec { - self.catalog_list.catalog_names() - } - - fn catalog(&self, name: &str) -> Option> { - self.catalog_list.catalog(name).map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Datafusion's CatalogProvider -> greptime CatalogProvider -struct CatalogProviderAdapter { - df_cataglog_provider: Arc, - runtime: Arc, -} - -impl CatalogProvider for CatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.df_cataglog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.df_cataglog_provider - .schema(name) - .map(|df_schema_provider| { - Arc::new(SchemaProviderAdapter { - df_schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -///Greptime CatalogProvider -> datafusion's CatalogProvider -struct DfCatalogProviderAdapter { - catalog_provider: Arc, - runtime: Arc, -} - -impl DfCatalogProvider for DfCatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.catalog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.catalog_provider.schema(name).map(|schema_provider| { - Arc::new(DfSchemaProviderAdapter { - schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Greptime SchemaProvider -> datafusion SchemaProvider -struct DfSchemaProviderAdapter { - schema_provider: Arc, - runtime: Arc, -} - -impl DfSchemaProvider for DfSchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - self.schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.schema_provider - .table(name) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); - match self.schema_provider.register_table(name, table) { - Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - Ok(None) => Ok(None), - Err(e) => Err(e.into()), - } - } - - fn deregister_table(&self, name: &str) -> DataFusionResult>> { - match self.schema_provider.deregister_table(name) { - Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - Ok(None) => Ok(None), - Err(e) => Err(e.into()), - } - } - - fn table_exist(&self, name: &str) -> bool { - self.schema_provider.table_exist(name) - } -} - -/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter -struct SchemaProviderAdapter { - df_schema_provider: Arc, - runtime: Arc, -} - -impl SchemaProvider for SchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - /// Retrieves the list of available table names in this schema. 
- fn table_names(&self) -> Vec { - self.df_schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.df_schema_provider.table(name).map(|table_provider| { - Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ - }) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> Result>> { - let table_provider = Arc::new(DfTableProviderAdapter::new(table)); - Ok(self - .df_schema_provider - .register_table(name, table_provider) - .context(error::DatafusionSnafu)? - .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) - } - - fn deregister_table(&self, name: &str) -> Result>> { - Ok(self - .df_schema_provider - .deregister_table(name) - .context(error::DatafusionSnafu)? - .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) - } - - fn table_exist(&self, name: &str) -> bool { - self.df_schema_provider.table_exist(name) - } -} diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index b9b763e398..98365426a0 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -snafu = "0.7.0" +common-error = { path = "../common/error" } +snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs new file mode 100644 index 0000000000..5bf2768d90 --- /dev/null +++ b/src/sql/src/error.rs @@ -0,0 +1,97 @@ +use std::any::Any; + +use common_error::prelude::*; +use sqlparser::parser::ParserError; + +/// SQL parser errors. +// Now the error in parser does not contains backtrace to avoid generating backtrace +// every time the parser parses an invalid SQL. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + #[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))] + Unsupported { sql: String, keyword: String }, + + #[snafu(display( + "Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}", + sql, + expected, + actual, + source + ))] + Unexpected { + sql: String, + expected: String, + actual: String, + source: ParserError, + }, + + // Syntax error from sql parser. + #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))] + Syntax { sql: String, source: ParserError }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + Unsupported { .. } => StatusCode::Unsupported, + Unexpected { .. } | Syntax { .. } => StatusCode::InvalidSyntax, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + + use super::*; + + fn throw_sp_error() -> Result<(), ParserError> { + Err(ParserError::ParserError("parser error".to_string())) + } + + #[test] + fn test_syntax_error() { + let err = throw_sp_error() + .context(SyntaxSnafu { sql: "" }) + .err() + .unwrap(); + assert_matches!( + err, + Error::Syntax { + sql: _, + source: ParserError::ParserError { .. 
} + } + ); + assert_eq!(StatusCode::InvalidSyntax, err.status_code()); + + let err = throw_sp_error() + .context(UnexpectedSnafu { + sql: "", + expected: "", + actual: "", + }) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidSyntax, err.status_code()); + } + + #[test] + fn test_unsupported_error() { + let err = Error::Unsupported { + sql: "".to_string(), + keyword: "".to_string(), + }; + assert_eq!(StatusCode::Unsupported, err.status_code()); + } +} diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs deleted file mode 100644 index 155e4fd55d..0000000000 --- a/src/sql/src/errors.rs +++ /dev/null @@ -1,52 +0,0 @@ -use snafu::prelude::*; -use sqlparser::parser::ParserError as SpParserError; - -/// SQL parser errors. -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum ParserError { - #[snafu(display("SQL statement is not supported: {sql}, keyword: {keyword}"))] - Unsupported { sql: String, keyword: String }, - - #[snafu(display( - "Unexpected token while parsing SQL statement: {sql}, expected: {expected}, found: {actual}, source: {source}" - ))] - Unexpected { - sql: String, - expected: String, - actual: String, - source: SpParserError, - }, - - #[snafu(display("SQL syntax error: {msg}"))] - SyntaxError { msg: String }, - - #[snafu(display("Unknown inner parser error, sql: {sql}, source: {source}"))] - InnerError { sql: String, source: SpParserError }, -} - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - - use snafu::ResultExt; - - #[test] - pub fn test_error_conversion() { - pub fn raise_error() -> Result<(), sqlparser::parser::ParserError> { - Err(sqlparser::parser::ParserError::ParserError( - "parser error".to_string(), - )) - } - - assert_matches!( - raise_error().context(crate::errors::InnerSnafu { - sql: "".to_string(), - }), - Err(super::ParserError::InnerError { - sql: _, - source: sqlparser::parser::ParserError::ParserError { .. } - }) - ) - } -} diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 2ff323e974..6f681719ee 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -4,7 +4,7 @@ extern crate core; pub mod ast; pub mod dialect; -pub mod errors; +pub mod error; pub mod parser; pub mod parsers; pub mod statements; diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 5bb5f2eaf7..cc3660756d 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -4,12 +4,12 @@ use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; use sqlparser::tokenizer::{Token, Tokenizer}; -use crate::errors; +use crate::error::{self, Error}; use crate::statements::show_database::SqlShowDatabase; use crate::statements::show_kind::ShowKind; use crate::statements::statement::Statement; -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser. pub struct ParserContext<'a> { @@ -87,10 +87,11 @@ impl<'a> ParserContext<'a> { /// Raises an "unsupported statement" error. 
diff --git a/src/sql/src/parsers/insert_parser.rs b/src/sql/src/parsers/insert_parser.rs
index 84af1fa78e..a3d14a348c 100644
--- a/src/sql/src/parsers/insert_parser.rs
+++ b/src/sql/src/parsers/insert_parser.rs
@@ -1,7 +1,7 @@
 use snafu::ResultExt;
 use sqlparser::ast::Statement as SpStatement;
 
-use crate::errors;
+use crate::error;
 use crate::parser::ParserContext;
 use crate::parser::Result;
 use crate::statements::insert::Insert;
@@ -14,13 +14,13 @@
         let spstatement = self
            .parser
            .parse_insert()
-            .context(errors::InnerSnafu { sql: self.sql })?;
+            .context(error::SyntaxSnafu { sql: self.sql })?;
 
         match spstatement {
             SpStatement::Insert { .. } => {
                 Ok(Statement::Insert(Box::new(Insert { inner: spstatement })))
             }
-            unexp => errors::UnsupportedSnafu {
+            unexp => error::UnsupportedSnafu {
                 sql: self.sql.to_string(),
                 keyword: unexp.to_string(),
             }
diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs
index adcc4ab302..d5b332eeaf 100644
--- a/src/sql/src/parsers/query_parser.rs
+++ b/src/sql/src/parsers/query_parser.rs
@@ -1,6 +1,6 @@
 use snafu::prelude::*;
 
-use crate::errors;
+use crate::error;
 use crate::parser::ParserContext;
 use crate::parser::Result;
 use crate::statements::query::Query;
@@ -12,7 +12,7 @@
         let spquery = self
            .parser
            .parse_query()
-            .context(errors::InnerSnafu { sql: self.sql })?;
+            .context(error::SyntaxSnafu { sql: self.sql })?;
 
         Ok(Statement::Query(Box::new(Query::try_from(spquery)?)))
     }
diff --git a/src/sql/src/statements/query.rs b/src/sql/src/statements/query.rs
index ec4315b91a..419f2a106a 100644
--- a/src/sql/src/statements/query.rs
+++ b/src/sql/src/statements/query.rs
@@ -1,6 +1,6 @@
 use sqlparser::ast::Query as SpQuery;
 
-use crate::errors::ParserError;
+use crate::error::Error;
 
 /// Query statement instance.
 #[derive(Debug, Clone, PartialEq)]
 pub struct Query {
@@ -10,7 +10,7 @@
 /// Automatically converts from sqlparser Query instance to SqlQuery.
 impl TryFrom<SpQuery> for Query {
-    type Error = ParserError;
+    type Error = Error;
 
     fn try_from(q: SpQuery) -> Result<Self, Self::Error> {
         Ok(Query { inner: q })
@@ -18,7 +18,7 @@
 }
 
 impl TryFrom<Query> for SpQuery {
-    type Error = ParserError;
+    type Error = Error;
 
     fn try_from(value: Query) -> Result<Self, Self::Error> {
         Ok(value.inner)
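Reviewer note: replacing `InnerSnafu` with `SyntaxSnafu` still threads the underlying `sqlparser` failure through as the `source` field, so nothing is lost from the error chain. A sketch of how `context()` attaches the source (the `SpError` stub stands in for `sqlparser::parser::ParserError`, purely for self-containment):

```rust
use snafu::prelude::*;

// Stand-in for sqlparser::parser::ParserError, for this sketch only.
#[derive(Debug, Snafu)]
#[snafu(display("{msg}"))]
struct SpError {
    msg: String,
}

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))]
    Syntax { sql: String, source: SpError },
}

fn parse(sql: &str) -> Result<(), Error> {
    // context() attaches the sql text and keeps the parser failure as `source`.
    Err(SpError { msg: "expected SELECT".to_string() }).context(SyntaxSnafu { sql })
}

fn main() {
    let err = parse("ELECT 1").unwrap_err();
    // The underlying failure stays reachable through the std error chain.
    let cause = std::error::Error::source(&err).expect("source is attached");
    println!("{err}\ncaused by: {cause}");
}
```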
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 505f41b57c..1f20c6a0bb 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -11,6 +11,7 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
 [dependencies]
 async-trait = "0.1"
 chrono = { version = "0.4", features = ["serde"] }
+common-error = {path = "../common/error" }
 common-query = {path = "../common/query" }
 common-recordbatch = {path = "../common/recordbatch" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
@@ -18,4 +19,4 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b
 datatypes = { path = "../datatypes" }
 futures = "0.3"
 serde = "1.0.136"
-snafu = "0.7.0"
+snafu = { version = "0.7", features = ["backtraces"] }
diff --git a/src/table/src/error.rs b/src/table/src/error.rs
index 13248151ca..1e9a0f4903 100644
--- a/src/table/src/error.rs
+++ b/src/table/src/error.rs
@@ -1,18 +1,74 @@
-use datafusion::error::DataFusionError;
-use snafu::Snafu;
+use std::any::Any;
+
+use common_error::prelude::*;
+use datafusion::error::DataFusionError;
+
+common_error::define_opaque_error!(Error);
 
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub))]
-pub enum Error {
-    #[snafu(display("Datafusion error: {}", source))]
-    Datafusion { source: DataFusionError },
-    #[snafu(display("Not expected to run ExecutionPlan more than once."))]
-    ExecuteRepeatedly,
-}
 pub type Result<T> = std::result::Result<T, Error>;
 
 impl From<Error> for DataFusionError {
-    fn from(e: Error) -> DataFusionError {
+    fn from(e: Error) -> Self {
+        Self::External(Box::new(e))
+    }
+}
+
+/// Default error implementation of table.
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum InnerError {
+    #[snafu(display("Datafusion error: {}", source))]
+    Datafusion {
+        source: DataFusionError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Not expected to run ExecutionPlan more than once"))]
+    ExecuteRepeatedly { backtrace: Backtrace },
+}
+
+impl ErrorExt for InnerError {
+    fn backtrace_opt(&self) -> Option<&Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl From<InnerError> for Error {
+    fn from(err: InnerError) -> Self {
+        Self::new(err)
+    }
+}
+
+impl From<InnerError> for DataFusionError {
+    fn from(e: InnerError) -> DataFusionError {
         DataFusionError::External(Box::new(e))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn throw_df_error() -> Result<()> {
+        Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)?
+    }
+
+    fn throw_repeatedly() -> Result<()> {
+        ExecuteRepeatedlySnafu {}.fail()?
+    }
+
+    #[test]
+    fn test_error() {
+        let err = throw_df_error().err().unwrap();
+        assert!(err.backtrace_opt().is_some());
+        assert_eq!(StatusCode::Unknown, err.status_code());
+
+        let err = throw_repeatedly().err().unwrap();
+        assert!(err.backtrace_opt().is_some());
+        assert_eq!(StatusCode::Unknown, err.status_code());
+    }
+}
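Reviewer note: the combination of `define_opaque_error!(Error)` and `impl From<InnerError> for Error` is what lets `throw_df_error` return the opaque `Error` while constructing the detailed `InnerError`: the `?` operator boxes it on the way out. A rough, self-contained sketch of the mechanics (trait, bounds, and names are illustrative, not the real macro expansion):

```rust
use std::any::Any;
use std::fmt;

// Trimmed, illustrative stand-in for common_error's ErrorExt.
trait ErrorExt: std::error::Error {
    fn as_any(&self) -> &dyn Any;
}

// Roughly what define_opaque_error!(Error) produces: a concrete newtype over
// a boxed trait object, so downstream crates see a single `Error` type.
struct Error {
    inner: Box<dyn ErrorExt>,
}

impl Error {
    fn new<E: ErrorExt + 'static>(err: E) -> Self {
        Self { inner: Box::new(err) }
    }
}

impl fmt::Debug for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Error: {}", self.inner)
    }
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.inner)
    }
}

// A detailed crate-local error, playing the role of InnerError.
#[derive(Debug)]
struct ExecuteRepeatedly;

impl fmt::Display for ExecuteRepeatedly {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Not expected to run ExecutionPlan more than once")
    }
}

impl std::error::Error for ExecuteRepeatedly {}

impl ErrorExt for ExecuteRepeatedly {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// With this From impl, `?` boxes the detailed error into the opaque one.
impl From<ExecuteRepeatedly> for Error {
    fn from(err: ExecuteRepeatedly) -> Self {
        Error::new(err)
    }
}

fn execute() -> Result<(), Error> {
    Err(ExecuteRepeatedly)?
}

fn main() {
    let err = execute().unwrap_err();
    // as_any() allows downcasting back to the concrete error when needed.
    assert!(err.inner.as_any().is::<ExecuteRepeatedly>());
    println!("{err}");
}
```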
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index ba266a9ae8..e1328b106b 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -16,7 +16,7 @@
 use datafusion::datasource::{
     datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
     TableType as DfTableType,
 };
-use datafusion::error::{DataFusionError, Result as DfResult};
+use datafusion::error::Result as DfResult;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::Expr as DfExpr;
 use datafusion::physical_plan::{
@@ -30,8 +30,8 @@
 use datatypes::schema::{Schema, SchemaRef};
 use futures::Stream;
 use snafu::prelude::*;
 
-use super::{Table, TableProviderFilterPushDown, TableRef, TableType};
 use crate::error::{self, Result};
+use crate::table::{Table, TableProviderFilterPushDown, TableRef, TableType};
 
 /// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan.
 struct ExecutionPlanAdapter {
@@ -90,9 +90,7 @@ impl ExecutionPlan for ExecutionPlanAdapter {
             let stream = mem::replace(&mut *stream, None);
             Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap())))
         } else {
-            error::ExecuteRepeatedlySnafu
-                .fail()
-                .map_err(|e| DataFusionError::External(Box::new(e)))
+            error::ExecuteRepeatedlySnafu.fail()?
         }
     }
@@ -139,28 +137,23 @@
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
         let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Expr::new).collect();
 
-        match self.table.scan(projection, &filters, limit).await {
-            Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter {
-                schema: stream.schema(),
-                stream: Mutex::new(Some(stream)),
-            })),
-            Err(e) => Err(e.into()),
-        }
+        let stream = self.table.scan(projection, &filters, limit).await?;
+        Ok(Arc::new(ExecutionPlanAdapter {
+            schema: stream.schema(),
+            stream: Mutex::new(Some(stream)),
+        }))
     }
 
     fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
-        match self
+        let p = self
             .table
-            .supports_filter_pushdown(&Expr::new(filter.clone()))
-        {
-            Ok(p) => match p {
-                TableProviderFilterPushDown::Unsupported => {
-                    Ok(DfTableProviderFilterPushDown::Unsupported)
-                }
-                TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
-                TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
-            },
-            Err(e) => Err(e.into()),
+            .supports_filter_pushdown(&Expr::new(filter.clone()))?;
+        match p {
+            TableProviderFilterPushDown::Unsupported => {
+                Ok(DfTableProviderFilterPushDown::Unsupported)
+            }
+            TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
+            TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
         }
     }
 }
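Reviewer note: `error::ExecuteRepeatedlySnafu.fail()?` compiles here only because the patch also adds `impl From<InnerError> for DataFusionError`; `?` then performs the conversion in one hop, making the old explicit `map_err(|e| DataFusionError::External(Box::new(e)))` redundant. A stubbed sketch of that conversion path (all types below are stand-ins, not the real datafusion or table-crate ones):

```rust
use std::fmt;

// Stand-in for the table crate's detailed error.
#[derive(Debug)]
struct InnerError;

impl fmt::Display for InnerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Not expected to run ExecutionPlan more than once")
    }
}

impl std::error::Error for InnerError {}

// Stand-in for datafusion::error::DataFusionError, trimmed to one variant.
#[derive(Debug)]
enum DataFusionError {
    External(Box<dyn std::error::Error + Send + Sync>),
}

// The patch adds the analogous impl so `?` can convert the crate error directly.
impl From<InnerError> for DataFusionError {
    fn from(e: InnerError) -> Self {
        DataFusionError::External(Box::new(e))
    }
}

type DfResult<T> = Result<T, DataFusionError>;

fn execute() -> DfResult<()> {
    // Plays the role of `error::ExecuteRepeatedlySnafu.fail()?`: the `?`
    // applies the From impl, so no explicit map_err is needed.
    Err(InnerError)?
}

fn main() {
    match execute() {
        Err(DataFusionError::External(e)) => println!("external error: {e}"),
        Ok(()) => unreachable!(),
    }
}
```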