diff --git a/Cargo.lock b/Cargo.lock
index 81af3bfe4e..ab314d117d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,6 +2,15 @@
 # It is not intended for manual editing.
 version = 3
 
+[[package]]
+name = "addr2line"
+version = "0.17.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
+dependencies = [
+ "gimli",
+]
+
 [[package]]
 name = "adler"
 version = "1.0.2"
@@ -223,6 +232,21 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "backtrace"
+version = "0.3.65"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61"
+dependencies = [
+ "addr2line",
+ "cc",
+ "cfg-if",
+ "libc",
+ "miniz_oxide",
+ "object",
+ "rustc-demangle",
+]
+
 [[package]]
 name = "base64"
 version = "0.13.0"
@@ -433,6 +457,13 @@ dependencies = [
 name = "common-base"
 version = "0.1.0"
 
+[[package]]
+name = "common-error"
+version = "0.1.0"
+dependencies = [
+ "snafu",
+]
+
 [[package]]
 name = "common-query"
 version = "0.1.0"
@@ -818,6 +849,12 @@ dependencies = [
  "wasi 0.10.2+wasi-snapshot-preview1",
 ]
 
+[[package]]
+name = "gimli"
+version = "0.26.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"
+
 [[package]]
 name = "h2"
 version = "0.3.13"
@@ -1300,6 +1337,15 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "object"
+version = "0.28.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "object-store"
 version = "0.1.0"
@@ -1486,6 +1532,7 @@ version = "0.1.0"
 dependencies = [
  "arrow2",
  "async-trait",
+ "common-error",
  "common-recordbatch",
  "datafusion",
  "datatypes",
@@ -1578,6 +1625,12 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "rustc-demangle"
+version = "0.1.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"
+
 [[package]]
 name = "rustversion"
 version = "1.0.6"
@@ -1684,6 +1737,7 @@ version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5"
 dependencies = [
+ "backtrace",
  "doc-comment",
  "snafu-derive",
 ]
@@ -1815,6 +1869,7 @@ dependencies = [
  "arrow2",
  "async-trait",
  "chrono",
+ "common-error",
  "common-query",
  "common-recordbatch",
  "datafusion",
diff --git a/Cargo.toml b/Cargo.toml
index daed9802d6..89d06f082b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,7 @@
 [workspace]
 members = [
     "src/common/base",
+    "src/common/error",
    "src/common/query",
     "src/common/recordbatch",
     "src/cmd",
diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml
new file mode 100644
index 0000000000..b41e6d1f0d
--- /dev/null
+++ b/src/common/error/Cargo.toml
@@ -0,0 +1,9 @@
+[package]
+name = "common-error"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+snafu = { version = "0.7", features = ["backtraces"] }
diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs
new file mode 100644
index 0000000000..be982096a4
--- /dev/null
+++ b/src/common/error/src/ext.rs
@@ -0,0 +1,123 @@
+use crate::status_code::StatusCode;
+
+/// Extension to [`Error`](std::error::Error) in std.
+pub trait ErrorExt: std::error::Error {
+    /// Returns the [StatusCode] of this error.
+    fn status_code(&self) -> StatusCode {
+        StatusCode::Unknown
+    }
+
+    /// Get the reference to the backtrace of this error.
+    // Add `_opt` suffix to avoid confusion with the similar method in `std::error::Error`.
+    fn backtrace_opt(&self) -> Option<&snafu::Backtrace>;
+}
+
+/// A helper macro to define an opaque boxed error based on errors that implement the [ErrorExt] trait.
+#[macro_export]
+macro_rules! define_opaque_error {
+    ($Error:ident) => {
+        /// An error that behaves like `Box<dyn Error>`.
+        ///
+        /// Define this error as a new type instead of using `Box<dyn Error>` directly so we can implement
+        /// more methods or traits for it.
+        pub struct $Error {
+            inner: Box<dyn $crate::ext::ErrorExt + Send + Sync>,
+        }
+
+        impl $Error {
+            /// Create a new error.
+            pub fn new<E: $crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
+                Self {
+                    inner: Box::new(err),
+                }
+            }
+        }
+
+        impl std::fmt::Debug for $Error {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                let debug_format = $crate::format::DebugFormat::new(&*self.inner);
+                debug_format.fmt(f)
+            }
+        }
+
+        impl std::fmt::Display for $Error {
+            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+                write!(f, "{}", self.inner)
+            }
+        }
+
+        impl std::error::Error for $Error {
+            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+                self.inner.source()
+            }
+        }
+
+        impl $crate::ext::ErrorExt for $Error {
+            fn status_code(&self) -> $crate::status_code::StatusCode {
+                self.inner.status_code()
+            }
+
+            fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
+                self.inner.backtrace_opt()
+            }
+        }
+    };
+}
+
+#[cfg(test)]
+mod tests {
+    use std::error::Error as StdError;
+
+    use snafu::{prelude::*, Backtrace};
+
+    use super::*;
+
+    define_opaque_error!(Error);
+
+    #[derive(Debug, Snafu)]
+    enum InnerError {
+        #[snafu(display("This is a leaf error, val: {}", val))]
+        Leaf { val: i32, backtrace: Backtrace },
+
+        #[snafu(display("This is an internal error"))]
+        Internal {
+            source: std::io::Error,
+            backtrace: Backtrace,
+        },
+    }
+
+    impl ErrorExt for InnerError {
+        fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
+            snafu::ErrorCompat::backtrace(self)
+        }
+    }
+
+    impl From<InnerError> for Error {
+        fn from(err: InnerError) -> Self {
+            Self::new(err)
+        }
+    }
+
+    fn throw_leaf() -> std::result::Result<(), InnerError> {
+        LeafSnafu { val: 10 }.fail()
+    }
+
+    fn throw_io() -> std::result::Result<(), std::io::Error> {
+        Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"))
+    }
+
+    fn throw_internal() -> std::result::Result<(), InnerError> {
+        throw_io().context(InternalSnafu)
+    }
+
+    #[test]
+    fn test_opaque_error() {
+        let leaf = throw_leaf().err().unwrap();
+        assert!(leaf.backtrace_opt().is_some());
+        assert!(leaf.source().is_none());
+
+        let internal = throw_internal().err().unwrap();
+        assert!(internal.backtrace_opt().is_some());
+        assert!(internal.source().is_some());
+    }
+}
diff --git a/src/common/error/src/format.rs b/src/common/error/src/format.rs
new file mode 100644
index 0000000000..b994e0aa8b
--- /dev/null
+++ b/src/common/error/src/format.rs
@@ -0,0 +1,95 @@
+use std::fmt;
+
+use crate::ext::ErrorExt;
+
+/// Pretty debug format for an error, which also prints its source and backtrace.
+pub struct DebugFormat<'a, E: ?Sized>(&'a E);
+
+impl<'a, E: ErrorExt + ?Sized> DebugFormat<'a, E> {
+    /// Create a new format struct from `err`.
+    pub fn new(err: &'a E) -> Self {
+        Self(err)
+    }
+}
+
+impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}.", self.0)?;
+        if let Some(source) = self.0.source() {
+            // The source error uses debug format for more verbose info.
+            write!(f, " Caused by: {:?}", source)?;
+        }
+        if let Some(backtrace) = self.0.backtrace_opt() {
+            // Add a newline to separate the cause from the backtrace.
+            write!(f, "\nBacktrace:\n{}", backtrace)?;
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use snafu::{prelude::*, Backtrace, GenerateImplicitData};
+
+    use super::*;
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("This is a leaf error"))]
+    struct Leaf;
+
+    impl ErrorExt for Leaf {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            None
+        }
+    }
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("This is a leaf with backtrace"))]
+    struct LeafWithBacktrace {
+        backtrace: Backtrace,
+    }
+
+    impl ErrorExt for LeafWithBacktrace {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            Some(&self.backtrace)
+        }
+    }
+
+    #[derive(Debug, Snafu)]
+    #[snafu(display("Internal error"))]
+    struct Internal {
+        #[snafu(source)]
+        source: Leaf,
+        backtrace: Backtrace,
+    }
+
+    impl ErrorExt for Internal {
+        fn backtrace_opt(&self) -> Option<&Backtrace> {
+            Some(&self.backtrace)
+        }
+    }
+
+    #[test]
+    fn test_debug_format() {
+        let err = Leaf;
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert_eq!("This is a leaf error.", msg);
+
+        let err = LeafWithBacktrace {
+            backtrace: Backtrace::generate(),
+        };
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert!(msg.starts_with("This is a leaf with backtrace.\nBacktrace:\n"));
+
+        let err = Internal {
+            source: Leaf,
+            backtrace: Backtrace::generate(),
+        };
+
+        let msg = format!("{:?}", DebugFormat::new(&err));
+        assert!(msg.contains("Internal error. Caused by: Leaf\nBacktrace:\n"));
+    }
+}
diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs
new file mode 100644
index 0000000000..98f017a44f
--- /dev/null
+++ b/src/common/error/src/lib.rs
@@ -0,0 +1,3 @@
+pub mod ext;
+pub mod format;
+pub mod status_code;
diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs
new file mode 100644
index 0000000000..096d88ae4b
--- /dev/null
+++ b/src/common/error/src/status_code.rs
@@ -0,0 +1,6 @@
+/// Common status code for public API.
+#[derive(Debug, Clone, Copy)]
+pub enum StatusCode {
+    /// Unknown status.
+    Unknown,
+}
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index d9e2570a45..da17d5c90d 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -15,7 +15,7 @@ datatypes = {path ="../../datatypes" }
 futures = "0.3"
 paste = "1.0"
 serde = "1.0"
-snafu = "0.7"
+snafu = { version = "0.7", features = ["backtraces"] }
 
 [dev-dependencies.arrow]
 package = "arrow2"
@@ -24,3 +24,4 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
 
 [dev-dependencies]
 tokio = { version = "1.18", features = ["full"] }
+
diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index e5666f485b..0d621a668b 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -1,10 +1,14 @@
 use arrow::error::ArrowError;
-use snafu::Snafu;
+use snafu::{Backtrace, Snafu};
 
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
     #[snafu(display("Arrow error: {}", source))]
-    Arrow { source: ArrowError },
+    Arrow {
+        source: ArrowError,
+        backtrace: Backtrace,
+    },
 }
+
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index ebf3b8aa02..a6aa9a8fa6 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] }
 query = { path = "../query" }
 serde = "1.0"
 serde_json = "1.0"
-snafu = "0.7"
+snafu = { version = "0.7", features = ["backtraces"] }
 table = { path = "../table" }
 tokio = { version = "1.18", features = ["full"] }
 tower = { version = "0.4", features = ["full"]}
@@ -23,3 +23,4 @@ tower-http = { version ="0.3", features = ["full"]}
 package = "arrow2"
 version="0.10"
 features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
+
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 131af06ab0..9462ce3753 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -10,12 +10,13 @@
 
 [dependencies]
 async-trait = "0.1"
+common-error = { path = "../common/error" }
 common-recordbatch = {path = "../common/recordbatch" }
 datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
 datatypes = {path = "../datatypes" }
 futures = "0.3"
 futures-util = "0.3.21"
-snafu = "0.7.0"
+snafu = { version = "0.7", features = ["backtraces"] }
 table = { path = "../table" }
 tokio = "1.0"
 sql = { path = "../sql" }
diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs
index 13418ecb67..6c4aa8be41 100644
--- a/src/query/src/catalog/memory.rs
+++ b/src/query/src/catalog/memory.rs
@@ -11,7 +11,8 @@ use crate::catalog::{
     CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
     DEFAULT_SCHEMA_NAME,
 };
-use crate::error::{ExecutionSnafu, Result};
+use crate::error::Result;
+use crate::query_engine::datafusion::error;
 
 /// Simple in-memory list of catalogs
 #[derive(Default)]
@@ -127,10 +128,11 @@ impl SchemaProvider for MemorySchemaProvider {
     fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
         if self.table_exist(name.as_str()) {
-            return ExecutionSnafu {
+            // FIXME(yingwen): Define another error.
+            return error::ExecutionSnafu {
                 message: format!("The table {} already exists", name),
             }
-            .fail();
+            .fail()?;
         }
         let mut tables = self.tables.write().unwrap();
         Ok(tables.insert(name, table))
     }
diff --git a/src/query/src/error.rs b/src/query/src/error.rs
index 3d240d1ac4..1253da97d2 100644
--- a/src/query/src/error.rs
+++ b/src/query/src/error.rs
@@ -1,28 +1,6 @@
-use common_recordbatch::error::Error as RecordBatchError;
 use datafusion::error::DataFusionError;
-use snafu::Snafu;
-use sql::errors::ParserError;
 
-/// business error of query engine
-#[derive(Debug, Snafu)]
-#[snafu(visibility(pub))]
-pub enum Error {
-    #[snafu(display("Datafusion query engine error: {}", source))]
-    Datafusion { source: DataFusionError },
-    #[snafu(display("PhysicalPlan downcast_ref failed"))]
-    PhysicalPlanDowncast,
-    #[snafu(display("RecordBatch error: {}", source))]
-    RecordBatch { source: RecordBatchError },
-    #[snafu(display("Execution error: {}", message))]
-    Execution { message: String },
-    #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))]
-    ParseSql { sql: String, source: ParserError },
-    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
-    Planner {
-        sql: String,
-        source: DataFusionError,
-    },
-}
+common_error::define_opaque_error!(Error);
 
 pub type Result<T> = std::result::Result<T, Error>;
diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs
index ee2645bb3a..cff92dfd1e 100644
--- a/src/query/src/planner.rs
+++ b/src/query/src/planner.rs
@@ -13,8 +13,9 @@ use table::table::adapter::DfTableProviderAdapter;
 
 use crate::{
     catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME},
-    error::{PlannerSnafu, Result},
+    error::Result,
     plan::LogicalPlan,
+    query_engine::datafusion::error,
 };
 
 pub trait Planner: Send + Sync {
@@ -39,7 +40,8 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
         let result = self
             .sql_to_rel
             .query_to_plan(query.inner)
-            .context(PlannerSnafu { sql })?;
+            // FIXME(yingwen): Move DfPlanner to datafusion mod.
+            .context(error::PlannerSnafu { sql })?;
 
         Ok(LogicalPlan::DfPlan(result))
     }
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index ac274ef563..4e073a2e2b 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -1,5 +1,5 @@
 mod context;
-mod datafusion;
+pub mod datafusion;
 mod state;
 
 use std::sync::Arc;
diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/query_engine/datafusion.rs
index 043585a89d..82a15c3568 100644
--- a/src/query/src/query_engine/datafusion.rs
+++ b/src/query/src/query_engine/datafusion.rs
@@ -1,4 +1,5 @@
 mod adapter;
+pub mod error;
 
 use std::sync::Arc;
 
@@ -9,7 +10,7 @@ use sql::{dialect::GenericDialect, parser::ParserContext};
 use super::{context::QueryContext, state::QueryEngineState};
 use crate::{
     catalog::CatalogListRef,
-    error::{self, ParseSqlSnafu, Result},
+    error::Result,
     executor::QueryExecutor,
     logical_optimizer::LogicalOptimizer,
     physical_optimizer::PhysicalOptimizer,
@@ -17,7 +18,7 @@ use crate::{
     plan::{LogicalPlan, PhysicalPlan},
     planner::{DfContextProviderAdapter, DfPlanner, Planner},
     query_engine::datafusion::adapter::PhysicalPlanAdapter,
-    query_engine::{Output, QueryEngine},
+    Output, QueryEngine,
 };
 
 pub(crate) struct DatafusionQueryEngine {
@@ -42,9 +43,7 @@ impl QueryEngine for DatafusionQueryEngine {
         let context_provider = DfContextProviderAdapter::new(self.state.catalog_list());
         let planner = DfPlanner::new(&context_provider);
         let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
-            .with_context(|_| ParseSqlSnafu {
-                sql: sql.to_string(),
-            })?;
+            .context(error::ParseSqlSnafu)?;
         // TODO(dennis): supports multi statement in one sql?
         assert!(1 == statement.len());
         planner.statement_to_plan(statement.remove(0))
@@ -70,11 +69,13 @@
     ) -> Result<LogicalPlan> {
         match plan {
             LogicalPlan::DfPlan(df_plan) => {
-                let optimized_plan = self
-                    .state
-                    .df_context()
-                    .optimize(df_plan)
-                    .context(error::DatafusionSnafu)?;
+                let optimized_plan =
+                    self.state
+                        .df_context()
+                        .optimize(df_plan)
+                        .context(error::DatafusionSnafu {
+                            msg: "Fail to optimize logical plan",
+                        })?;
 
                 Ok(LogicalPlan::DfPlan(optimized_plan))
             }
@@ -96,7 +97,9 @@
             .df_context()
             .create_physical_plan(df_plan)
             .await
-            .context(error::DatafusionSnafu)?;
+            .context(error::DatafusionSnafu {
+                msg: "Fail to create physical plan",
+            })?;
 
         Ok(Arc::new(PhysicalPlanAdapter::new(
             Arc::new(physical_plan.schema().into()),
@@ -126,7 +129,9 @@
         for optimizer in optimizers {
             new_plan = optimizer
                 .optimize(new_plan, config)
-                .context(error::DatafusionSnafu)?;
+                .context(error::DatafusionSnafu {
+                    msg: "Fail to optimize physical plan",
+                })?;
         }
         Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan)))
     }
diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/query_engine/datafusion/adapter.rs
index 5e432ce892..f6c10c9ba1 100644
--- a/src/query/src/query_engine/datafusion/adapter.rs
+++ b/src/query/src/query_engine/datafusion/adapter.rs
@@ -16,9 +16,10 @@ use datatypes::schema::SchemaRef;
 use snafu::ResultExt;
 use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};
 
-use crate::error::{self, Result};
+use crate::error::Result;
 use crate::executor::Runtime;
 use crate::plan::{Partitioning, PhysicalPlan};
+use crate::query_engine::datafusion::error;
 
 /// Datafusion ExecutionPlan -> greptime PhysicalPlan
 pub struct PhysicalPlanAdapter {
@@ -74,7 +75,9 @@ impl PhysicalPlan for PhysicalPlanAdapter {
         let plan = self
             .plan
             .with_new_children(df_children)
-            .context(error::DatafusionSnafu)?;
+            .context(error::DatafusionSnafu {
+                msg: "Fail to add children to plan",
+            })?;
         Ok(Arc::new(PhysicalPlanAdapter::new(
             self.schema.clone(),
             plan,
@@ -86,11 +89,13 @@
         runtime: &Runtime,
         partition: usize,
     ) -> Result<SendableRecordBatchStream> {
-        let df_stream = self
-            .plan
-            .execute(partition, runtime.into())
-            .await
-            .context(error::DatafusionSnafu)?;
+        let df_stream =
+            self.plan
+                .execute(partition, runtime.into())
+                .await
+                .context(error::DatafusionSnafu {
+                    msg: "Fail to execute physical plan",
+                })?;
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
     }
diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/query_engine/datafusion/error.rs
new file mode 100644
index 0000000000..50d6e96e0d
--- /dev/null
+++ b/src/query/src/query_engine/datafusion/error.rs
@@ -0,0 +1,46 @@
+use common_error::ext::ErrorExt;
+use datafusion::error::DataFusionError;
+use snafu::{Backtrace, ErrorCompat, Snafu};
+
+use crate::error::Error;
+
+/// Inner error of the datafusion-based query engine.
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum InnerError {
+    #[snafu(display("{}: {}", msg, source))]
+    Datafusion {
+        msg: &'static str,
+        source: DataFusionError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("PhysicalPlan downcast failed"))]
+    PhysicalPlanDowncast { backtrace: Backtrace },
+
+    // The sql error already contains the SQL.
+    #[snafu(display("Cannot parse SQL, source: {}", source))]
+    ParseSql { source: sql::errors::ParserError },
+
+    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
+    Planner {
+        sql: String,
+        source: DataFusionError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Execution error: {}", message))]
+    Execution { message: String },
+}
+
+impl ErrorExt for InnerError {
+    fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+}
+
+impl From<InnerError> for Error {
+    fn from(err: InnerError) -> Self {
+        Self::new(err)
+    }
+}
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index d9f8ca19f9..1341b18318 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -16,12 +16,10 @@ use table::{
     Table,
 };
 
-use crate::catalog::{
-    schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME,
-    DEFAULT_SCHEMA_NAME,
-};
-use crate::error::{self, Result};
+use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider};
+use crate::error::Result;
 use crate::executor::Runtime;
+use crate::query_engine::datafusion::error;
 
 /// Query engine global state
 #[derive(Clone)]
@@ -39,8 +37,10 @@ impl fmt::Debug for QueryEngineState {
 
 impl QueryEngineState {
     pub(crate) fn new(catalog_list: CatalogListRef) -> Self {
-        let config = ExecutionConfig::new()
-            .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
+        let config = ExecutionConfig::new().with_default_catalog_and_schema(
+            catalog::DEFAULT_CATALOG_NAME,
+            catalog::DEFAULT_SCHEMA_NAME,
+        );
         let df_context = ExecutionContext::with_config(config);
 
         df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
@@ -193,18 +193,16 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
         table: Arc<dyn TableProvider>,
     ) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
         let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
-        match self.schema_provider.register_table(name, table) {
-            Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
-            Ok(None) => Ok(None),
-            Err(e) => Err(e.into()),
+        match self.schema_provider.register_table(name, table)? {
+            Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
+            None => Ok(None),
         }
     }
 
     fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn TableProvider>>> {
-        match self.schema_provider.deregister_table(name) {
-            Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
-            Ok(None) => Ok(None),
-            Err(e) => Err(e.into()),
+        match self.schema_provider.deregister_table(name)? {
+            Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
+            None => Ok(None),
         }
     }
 
@@ -244,7 +242,9 @@ impl SchemaProvider for SchemaProviderAdapter {
         Ok(self
             .df_schema_provider
             .register_table(name, table_provider)
-            .context(error::DatafusionSnafu)?
+            .context(error::DatafusionSnafu {
+                msg: "Fail to register table to datafusion",
+            })?
             .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
     }
 
@@ -252,7 +252,9 @@
         Ok(self
             .df_schema_provider
             .deregister_table(name)
-            .context(error::DatafusionSnafu)?
+            .context(error::DatafusionSnafu {
+                msg: "Fail to deregister table from datafusion",
+            })?
             .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
     }
 
diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml
index b9b763e398..a3c34f5e47 100644
--- a/src/sql/Cargo.toml
+++ b/src/sql/Cargo.toml
@@ -6,5 +6,5 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-snafu = "0.7.0"
+snafu = { version = "0.7", features = ["backtraces"] }
 sqlparser = "0.15.0"
diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs
index 155e4fd55d..d483dc2877 100644
--- a/src/sql/src/errors.rs
+++ b/src/sql/src/errors.rs
@@ -2,14 +2,20 @@ use snafu::prelude::*;
 use sqlparser::parser::ParserError as SpParserError;
 
 /// SQL parser errors.
+// Now the parser error does not contain a backtrace, to avoid generating a backtrace
+// every time the parser parses an invalid SQL.
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum ParserError {
-    #[snafu(display("SQL statement is not supported: {sql}, keyword: {keyword}"))]
+    #[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))]
     Unsupported { sql: String, keyword: String },
 
     #[snafu(display(
-        "Unexpected token while parsing SQL statement: {sql}, expected: {expected}, found: {actual}, source: {source}"
+        "Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}",
+        sql,
+        expected,
+        actual,
+        source
     ))]
     Unexpected {
         sql: String,
@@ -18,34 +24,30 @@ pub enum ParserError {
         expected: String,
         actual: String,
         source: SpParserError,
     },
 
-    #[snafu(display("SQL syntax error: {msg}"))]
-    SyntaxError { msg: String },
-
-    #[snafu(display("Unknown inner parser error, sql: {sql}, source: {source}"))]
-    InnerError { sql: String, source: SpParserError },
+    // Syntax error from sql parser.
+ #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))] + SpSyntax { sql: String, source: SpParserError }, } #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use snafu::ResultExt; + use super::*; #[test] pub fn test_error_conversion() { - pub fn raise_error() -> Result<(), sqlparser::parser::ParserError> { - Err(sqlparser::parser::ParserError::ParserError( - "parser error".to_string(), - )) + pub fn raise_error() -> Result<(), SpParserError> { + Err(SpParserError::ParserError("parser error".to_string())) } assert_matches!( - raise_error().context(crate::errors::InnerSnafu { + raise_error().context(SpSyntaxSnafu { sql: "".to_string(), }), - Err(super::ParserError::InnerError { + Err(ParserError::SpSyntax { sql: _, - source: sqlparser::parser::ParserError::ParserError { .. } + source: SpParserError::ParserError { .. } }) ) } diff --git a/src/sql/src/parsers/insert_parser.rs b/src/sql/src/parsers/insert_parser.rs index 84af1fa78e..a1699e8b9a 100644 --- a/src/sql/src/parsers/insert_parser.rs +++ b/src/sql/src/parsers/insert_parser.rs @@ -14,7 +14,7 @@ impl<'a> ParserContext<'a> { let spstatement = self .parser .parse_insert() - .context(errors::InnerSnafu { sql: self.sql })?; + .context(errors::SpSyntaxSnafu { sql: self.sql })?; match spstatement { SpStatement::Insert { .. } => { diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs index adcc4ab302..35975f2393 100644 --- a/src/sql/src/parsers/query_parser.rs +++ b/src/sql/src/parsers/query_parser.rs @@ -12,7 +12,7 @@ impl<'a> ParserContext<'a> { let spquery = self .parser .parse_query() - .context(errors::InnerSnafu { sql: self.sql })?; + .context(errors::SpSyntaxSnafu { sql: self.sql })?; Ok(Statement::Query(Box::new(Query::try_from(spquery)?))) } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 505f41b57c..1f20c6a0bb 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -11,6 +11,7 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } +common-error = {path = "../common/error" } common-query = {path = "../common/query" } common-recordbatch = {path = "../common/recordbatch" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} @@ -18,4 +19,4 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b datatypes = { path = "../datatypes" } futures = "0.3" serde = "1.0.136" -snafu = "0.7.0" +snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 13248151ca..8bece2377b 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -1,18 +1,45 @@ +use common_error::ext::ErrorExt; use datafusion::error::DataFusionError; -use snafu::Snafu; +use snafu::{Backtrace, ErrorCompat, Snafu}; + +common_error::define_opaque_error!(Error); -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Datafusion error: {}", source))] - Datafusion { source: DataFusionError }, - #[snafu(display("Not expected to run ExecutionPlan more than once."))] - ExecuteRepeatedly, -} pub type Result = std::result::Result; impl From for DataFusionError { - fn from(e: Error) -> DataFusionError { + fn from(e: Error) -> Self { + Self::External(Box::new(e)) + } +} + +/// Default error implementation of table. 
+#[derive(Debug, Snafu)]
+#[snafu(visibility(pub))]
+pub enum InnerError {
+    #[snafu(display("Datafusion error: {}", source))]
+    Datafusion {
+        source: DataFusionError,
+        backtrace: Backtrace,
+    },
+
+    #[snafu(display("Not expected to run ExecutionPlan more than once"))]
+    ExecuteRepeatedly { backtrace: Backtrace },
+}
+
+impl ErrorExt for InnerError {
+    fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
+        ErrorCompat::backtrace(self)
+    }
+}
+
+impl From<InnerError> for Error {
+    fn from(err: InnerError) -> Self {
+        Self::new(err)
+    }
+}
+
+impl From<InnerError> for DataFusionError {
+    fn from(e: InnerError) -> DataFusionError {
         DataFusionError::External(Box::new(e))
     }
 }
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index ba266a9ae8..c6746435f8 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -16,7 +16,7 @@ use datafusion::datasource::{
     datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
     TableType as DfTableType,
 };
-use datafusion::error::{DataFusionError, Result as DfResult};
+use datafusion::error::Result as DfResult;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::Expr as DfExpr;
 use datafusion::physical_plan::{
@@ -90,9 +90,7 @@
             let stream = mem::replace(&mut *stream, None);
             Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap())))
         } else {
-            error::ExecuteRepeatedlySnafu
-                .fail()
-                .map_err(|e| DataFusionError::External(Box::new(e)))
+            error::ExecuteRepeatedlySnafu.fail()?
         }
     }
 
@@ -139,28 +137,23 @@
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
         let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Expr::new).collect();
 
-        match self.table.scan(projection, &filters, limit).await {
-            Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter {
-                schema: stream.schema(),
-                stream: Mutex::new(Some(stream)),
-            })),
-            Err(e) => Err(e.into()),
-        }
+        let stream = self.table.scan(projection, &filters, limit).await?;
+        Ok(Arc::new(ExecutionPlanAdapter {
+            schema: stream.schema(),
+            stream: Mutex::new(Some(stream)),
+        }))
     }
 
     fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
-        match self
+        let p = self
             .table
-            .supports_filter_pushdown(&Expr::new(filter.clone()))
-        {
-            Ok(p) => match p {
-                TableProviderFilterPushDown::Unsupported => {
-                    Ok(DfTableProviderFilterPushDown::Unsupported)
-                }
-                TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
-                TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
-            },
-            Err(e) => Err(e.into()),
+            .supports_filter_pushdown(&Expr::new(filter.clone()))?;
+        match p {
+            TableProviderFilterPushDown::Unsupported => {
+                Ok(DfTableProviderFilterPushDown::Unsupported)
+            }
+            TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
+            TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
+        }
     }
 }
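
A minimal usage sketch (not part of the patch) of how a downstream crate is expected to combine the `define_opaque_error!` macro, the `ErrorExt` trait, and `DebugFormat` introduced above, mirroring `src/table/src/error.rs`. It assumes a path dependency on `common-error` and on `snafu` with the `backtraces` feature; the crate-local `InnerError`, its `CatalogNotFound` variant, and the `lookup` function are hypothetical names used only for illustration.

```rust
use common_error::ext::ErrorExt;
use common_error::format::DebugFormat;
use snafu::{prelude::*, Backtrace, ErrorCompat};

// Crate-public opaque error, like `table::error::Error` in this patch.
common_error::define_opaque_error!(Error);

// Crate-private error enum; the variant name is illustrative, not from the patch.
#[derive(Debug, Snafu)]
enum InnerError {
    #[snafu(display("Catalog not found: {}", name))]
    CatalogNotFound { name: String, backtrace: Backtrace },
}

impl ErrorExt for InnerError {
    fn backtrace_opt(&self) -> Option<&Backtrace> {
        ErrorCompat::backtrace(self)
    }
}

impl From<InnerError> for Error {
    fn from(err: InnerError) -> Self {
        Self::new(err)
    }
}

fn lookup(name: &str) -> Result<(), Error> {
    if name != "greptime" {
        // `fail()` builds the `InnerError`; `?` converts it into the opaque `Error` via `From`,
        // the same pattern used in `MemorySchemaProvider::register_table` above.
        return CatalogNotFoundSnafu { name }.fail()?;
    }
    Ok(())
}

fn main() {
    if let Err(e) = lookup("unknown") {
        // `DebugFormat` (also used by the generated `Debug` impl) prints the display
        // message, the source chain, and the backtrace.
        println!("{:?}", DebugFormat::new(&e));
    }
}
```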