From aa1a6b22056cfe0d446248041350c23303be7985 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 28 Apr 2022 17:45:34 +0800 Subject: [PATCH 01/14] feat: Add common-error crate and implement opaque error type. --- Cargo.lock | 55 ++++++++ Cargo.toml | 1 + src/common/error/Cargo.toml | 9 ++ src/common/error/src/ext.rs | 123 ++++++++++++++++++ src/common/error/src/format.rs | 95 ++++++++++++++ src/common/error/src/lib.rs | 3 + src/common/error/src/status_code.rs | 6 + src/common/recordbatch/Cargo.toml | 3 +- src/common/recordbatch/src/error.rs | 8 +- src/datanode/Cargo.toml | 3 +- src/query/Cargo.toml | 3 +- src/query/src/catalog/memory.rs | 8 +- src/query/src/error.rs | 24 +--- src/query/src/planner.rs | 6 +- src/query/src/query_engine.rs | 2 +- src/query/src/query_engine/datafusion.rs | 29 +++-- .../src/query_engine/datafusion/adapter.rs | 19 ++- .../src/query_engine/datafusion/error.rs | 46 +++++++ src/query/src/query_engine/state.rs | 36 ++--- src/sql/Cargo.toml | 2 +- src/sql/src/errors.rs | 32 ++--- src/sql/src/parsers/insert_parser.rs | 2 +- src/sql/src/parsers/query_parser.rs | 2 +- src/table/Cargo.toml | 3 +- src/table/src/error.rs | 47 +++++-- src/table/src/table/adapter.rs | 37 +++--- 26 files changed, 483 insertions(+), 121 deletions(-) create mode 100644 src/common/error/Cargo.toml create mode 100644 src/common/error/src/ext.rs create mode 100644 src/common/error/src/format.rs create mode 100644 src/common/error/src/lib.rs create mode 100644 src/common/error/src/status_code.rs create mode 100644 src/query/src/query_engine/datafusion/error.rs diff --git a/Cargo.lock b/Cargo.lock index 81af3bfe4e..ab314d117d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + [[package]] name = "adler" version = "1.0.2" @@ -223,6 +232,21 @@ dependencies = [ "syn", ] +[[package]] +name = "backtrace" +version = "0.3.65" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.13.0" @@ -433,6 +457,13 @@ dependencies = [ name = "common-base" version = "0.1.0" +[[package]] +name = "common-error" +version = "0.1.0" +dependencies = [ + "snafu", +] + [[package]] name = "common-query" version = "0.1.0" @@ -818,6 +849,12 @@ dependencies = [ "wasi 0.10.2+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "h2" version = "0.3.13" @@ -1300,6 +1337,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.28.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456" +dependencies = [ + "memchr", +] + [[package]] name = "object-store" version = "0.1.0" @@ -1486,6 +1532,7 @@ version = "0.1.0" dependencies = [ "arrow2", "async-trait", + "common-error", "common-recordbatch", "datafusion", "datatypes", @@ -1578,6 +1625,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustversion" version = "1.0.6" @@ -1684,6 +1737,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5" dependencies = [ + "backtrace", "doc-comment", "snafu-derive", ] @@ -1815,6 +1869,7 @@ dependencies = [ "arrow2", "async-trait", "chrono", + "common-error", "common-query", "common-recordbatch", "datafusion", diff --git a/Cargo.toml b/Cargo.toml index daed9802d6..89d06f082b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "src/common/base", + "src/common/error", "src/common/query", "src/common/recordbatch", "src/cmd", diff --git a/src/common/error/Cargo.toml b/src/common/error/Cargo.toml new file mode 100644 index 0000000000..b41e6d1f0d --- /dev/null +++ b/src/common/error/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "common-error" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs new file mode 100644 index 0000000000..be982096a4 --- /dev/null +++ b/src/common/error/src/ext.rs @@ -0,0 +1,123 @@ +use crate::status_code::StatusCode; + +/// Extension to [`Error`](std::error::Error) in std. +pub trait ErrorExt: std::error::Error { + /// Returns the [StatusCode] of this holder. + fn status_code(&self) -> StatusCode { + StatusCode::Unknown + } + + /// Get the reference to the backtrace of this error. + // Add `_opt` suffix to avoid confusing with similar method in `std::error::Error`. + fn backtrace_opt(&self) -> Option<&snafu::Backtrace>; +} + +/// A helper macro to define a opaque boxed error based on errors that implement [ErrorExt] trait. +#[macro_export] +macro_rules! define_opaque_error { + ($Error:ident) => { + /// An error behaves like `Box`. + /// + /// Define this error as a new type instead of using `Box` directly so we can implement + /// more method or trait for it. + pub struct $Error { + inner: Box, + } + + impl $Error { + /// Create a new error. + pub fn new(err: E) -> Self { + Self { + inner: Box::new(err), + } + } + } + + impl std::fmt::Debug for $Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let debug_format = $crate::format::DebugFormat::new(&*self.inner); + debug_format.fmt(f) + } + } + + impl std::fmt::Display for $Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.inner) + } + } + + impl std::error::Error for $Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.inner.source() + } + } + + impl $crate::ext::ErrorExt for $Error { + fn status_code(&self) -> $crate::status_code::StatusCode { + self.inner.status_code() + } + + fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + self.inner.backtrace_opt() + } + } + }; +} + +#[cfg(test)] +mod tests { + use std::error::Error as StdError; + + use snafu::{prelude::*, Backtrace}; + + use super::*; + + define_opaque_error!(Error); + + #[derive(Debug, Snafu)] + enum InnerError { + #[snafu(display("This is a leaf error, val: {}", val))] + Leaf { val: i32, backtrace: Backtrace }, + + #[snafu(display("This is an internal error"))] + Internal { + source: std::io::Error, + backtrace: Backtrace, + }, + } + + impl ErrorExt for InnerError { + fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + snafu::ErrorCompat::backtrace(self) + } + } + + impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } + } + + fn throw_leaf() -> std::result::Result<(), InnerError> { + LeafSnafu { val: 10 }.fail() + } + + fn throw_io() -> std::result::Result<(), std::io::Error> { + Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!")) + } + + fn throw_internal() -> std::result::Result<(), InnerError> { + throw_io().context(InternalSnafu) + } + + #[test] + fn test_opaque_error() { + let leaf = throw_leaf().err().unwrap(); + assert!(leaf.backtrace_opt().is_some()); + assert!(leaf.source().is_none()); + + let internal = throw_internal().err().unwrap(); + assert!(internal.backtrace_opt().is_some()); + assert!(internal.source().is_some()); + } +} diff --git a/src/common/error/src/format.rs b/src/common/error/src/format.rs new file mode 100644 index 0000000000..b994e0aa8b --- /dev/null +++ b/src/common/error/src/format.rs @@ -0,0 +1,95 @@ +use std::fmt; + +use crate::ext::ErrorExt; + +/// Pretty debug format for error, also prints source and backtrace. +pub struct DebugFormat<'a, E: ?Sized>(&'a E); + +impl<'a, E: ErrorExt + ?Sized> DebugFormat<'a, E> { + /// Create a new format struct from `err`. + pub fn new(err: &'a E) -> Self { + Self(err) + } +} + +impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}.", self.0)?; + if let Some(source) = self.0.source() { + // Source error use debug format for more verbose info. + write!(f, " Caused by: {:?}", source)?; + } + if let Some(backtrace) = self.0.backtrace_opt() { + // Add a newline to seperate causes and backtrace. + write!(f, "\nBacktrace:\n{}", backtrace)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use snafu::{prelude::*, Backtrace, GenerateImplicitData}; + + use super::*; + + #[derive(Debug, Snafu)] + #[snafu(display("This is a leaf error"))] + struct Leaf; + + impl ErrorExt for Leaf { + fn backtrace_opt(&self) -> Option<&Backtrace> { + None + } + } + + #[derive(Debug, Snafu)] + #[snafu(display("This is a leaf with backtrace"))] + struct LeafWithBacktrace { + backtrace: Backtrace, + } + + impl ErrorExt for LeafWithBacktrace { + fn backtrace_opt(&self) -> Option<&Backtrace> { + Some(&self.backtrace) + } + } + + #[derive(Debug, Snafu)] + #[snafu(display("Internal error"))] + struct Internal { + #[snafu(source)] + source: Leaf, + backtrace: Backtrace, + } + + impl ErrorExt for Internal { + fn backtrace_opt(&self) -> Option<&Backtrace> { + Some(&self.backtrace) + } + } + + #[test] + fn test_debug_format() { + let err = Leaf; + + let msg = format!("{:?}", DebugFormat::new(&err)); + assert_eq!("This is a leaf error.", msg); + + let err = LeafWithBacktrace { + backtrace: Backtrace::generate(), + }; + + let msg = format!("{:?}", DebugFormat::new(&err)); + assert!(msg.starts_with("This is a leaf with backtrace.\nBacktrace:\n")); + + let err = Internal { + source: Leaf, + backtrace: Backtrace::generate(), + }; + + let msg = format!("{:?}", DebugFormat::new(&err)); + assert!(msg.contains("Internal error. Caused by: Leaf\nBacktrace:\n")); + } +} diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs new file mode 100644 index 0000000000..98f017a44f --- /dev/null +++ b/src/common/error/src/lib.rs @@ -0,0 +1,3 @@ +pub mod ext; +pub mod format; +pub mod status_code; diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs new file mode 100644 index 0000000000..096d88ae4b --- /dev/null +++ b/src/common/error/src/status_code.rs @@ -0,0 +1,6 @@ +/// Common status code for public API. +#[derive(Debug, Clone, Copy)] +pub enum StatusCode { + /// Unknown status. + Unknown, +} diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml index d9e2570a45..da17d5c90d 100644 --- a/src/common/recordbatch/Cargo.toml +++ b/src/common/recordbatch/Cargo.toml @@ -15,7 +15,7 @@ datatypes = {path ="../../datatypes" } futures = "0.3" paste = "1.0" serde = "1.0" -snafu = "0.7" +snafu = { version = "0.7", features = ["backtraces"] } [dev-dependencies.arrow] package = "arrow2" @@ -24,3 +24,4 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dev-dependencies] tokio = { version = "1.18", features = ["full"] } + diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs index e5666f485b..0d621a668b 100644 --- a/src/common/recordbatch/src/error.rs +++ b/src/common/recordbatch/src/error.rs @@ -1,10 +1,14 @@ use arrow::error::ArrowError; -use snafu::Snafu; +use snafu::{Backtrace, Snafu}; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { #[snafu(display("Arrow error: {}", source))] - Arrow { source: ArrowError }, + Arrow { + source: ArrowError, + backtrace: Backtrace, + }, } + pub type Result = std::result::Result; diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index ebf3b8aa02..a6aa9a8fa6 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] } query = { path = "../query" } serde = "1.0" serde_json = "1.0" -snafu = "0.7" +snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } tokio = { version = "1.18", features = ["full"] } tower = { version = "0.4", features = ["full"]} @@ -23,3 +23,4 @@ tower-http = { version ="0.3", features = ["full"]} package = "arrow2" version="0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] + diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 131af06ab0..9462ce3753 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -10,12 +10,13 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] async-trait = "0.1" +common-error = { path = "../common/error" } common-recordbatch = {path = "../common/recordbatch" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} datatypes = {path = "../datatypes" } futures = "0.3" futures-util = "0.3.21" -snafu = "0.7.0" +snafu = { version = "0.7", features = ["backtraces"] } table = { path = "../table" } tokio = "1.0" sql = { path = "../sql" } diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index 13418ecb67..6c4aa8be41 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -11,7 +11,8 @@ use crate::catalog::{ CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, }; -use crate::error::{ExecutionSnafu, Result}; +use crate::error::Result; +use crate::query_engine::datafusion::error; /// Simple in-memory list of catalogs #[derive(Default)] @@ -127,10 +128,11 @@ impl SchemaProvider for MemorySchemaProvider { fn register_table(&self, name: String, table: TableRef) -> Result> { if self.table_exist(name.as_str()) { - return ExecutionSnafu { + // FIXME(yingwen): Define another error. + return error::ExecutionSnafu { message: format!("The table {} already exists", name), } - .fail(); + .fail()?; } let mut tables = self.tables.write().unwrap(); Ok(tables.insert(name, table)) diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 3d240d1ac4..1253da97d2 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,28 +1,6 @@ -use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; -use snafu::Snafu; -use sql::errors::ParserError; -/// business error of query engine -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Datafusion query engine error: {}", source))] - Datafusion { source: DataFusionError }, - #[snafu(display("PhysicalPlan downcast_ref failed"))] - PhysicalPlanDowncast, - #[snafu(display("RecordBatch error: {}", source))] - RecordBatch { source: RecordBatchError }, - #[snafu(display("Execution error: {}", message))] - Execution { message: String }, - #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))] - ParseSql { sql: String, source: ParserError }, - #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] - Planner { - sql: String, - source: DataFusionError, - }, -} +common_error::define_opaque_error!(Error); pub type Result = std::result::Result; diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index ee2645bb3a..cff92dfd1e 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -13,8 +13,9 @@ use table::table::adapter::DfTableProviderAdapter; use crate::{ catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, - error::{PlannerSnafu, Result}, + error::Result, plan::LogicalPlan, + query_engine::datafusion::error, }; pub trait Planner: Send + Sync { @@ -39,7 +40,8 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { let result = self .sql_to_rel .query_to_plan(query.inner) - .context(PlannerSnafu { sql })?; + // FIXME(yingwen): Move DfPlanner to datafusion mod. + .context(error::PlannerSnafu { sql })?; Ok(LogicalPlan::DfPlan(result)) } diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index ac274ef563..4e073a2e2b 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -1,5 +1,5 @@ mod context; -mod datafusion; +pub mod datafusion; mod state; use std::sync::Arc; diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/query_engine/datafusion.rs index 043585a89d..82a15c3568 100644 --- a/src/query/src/query_engine/datafusion.rs +++ b/src/query/src/query_engine/datafusion.rs @@ -1,4 +1,5 @@ mod adapter; +pub mod error; use std::sync::Arc; @@ -9,7 +10,7 @@ use sql::{dialect::GenericDialect, parser::ParserContext}; use super::{context::QueryContext, state::QueryEngineState}; use crate::{ catalog::CatalogListRef, - error::{self, ParseSqlSnafu, Result}, + error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, @@ -17,7 +18,7 @@ use crate::{ plan::{LogicalPlan, PhysicalPlan}, planner::{DfContextProviderAdapter, DfPlanner, Planner}, query_engine::datafusion::adapter::PhysicalPlanAdapter, - query_engine::{Output, QueryEngine}, + Output, QueryEngine, }; pub(crate) struct DatafusionQueryEngine { @@ -42,9 +43,7 @@ impl QueryEngine for DatafusionQueryEngine { let context_provider = DfContextProviderAdapter::new(self.state.catalog_list()); let planner = DfPlanner::new(&context_provider); let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {}) - .with_context(|_| ParseSqlSnafu { - sql: sql.to_string(), - })?; + .context(error::ParseSqlSnafu)?; // TODO(dennis): supports multi statement in one sql? assert!(1 == statement.len()); planner.statement_to_plan(statement.remove(0)) @@ -70,11 +69,13 @@ impl LogicalOptimizer for DatafusionQueryEngine { ) -> Result { match plan { LogicalPlan::DfPlan(df_plan) => { - let optimized_plan = self - .state - .df_context() - .optimize(df_plan) - .context(error::DatafusionSnafu)?; + let optimized_plan = + self.state + .df_context() + .optimize(df_plan) + .context(error::DatafusionSnafu { + msg: "Fail to optimize logical plan", + })?; Ok(LogicalPlan::DfPlan(optimized_plan)) } @@ -96,7 +97,9 @@ impl PhysicalPlanner for DatafusionQueryEngine { .df_context() .create_physical_plan(df_plan) .await - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to create physical plan", + })?; Ok(Arc::new(PhysicalPlanAdapter::new( Arc::new(physical_plan.schema().into()), @@ -126,7 +129,9 @@ impl PhysicalOptimizer for DatafusionQueryEngine { for optimizer in optimizers { new_plan = optimizer .optimize(new_plan, config) - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to optimize physical plan", + })?; } Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan))) } diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/query_engine/datafusion/adapter.rs index 5e432ce892..f6c10c9ba1 100644 --- a/src/query/src/query_engine/datafusion/adapter.rs +++ b/src/query/src/query_engine/datafusion/adapter.rs @@ -16,9 +16,10 @@ use datatypes::schema::SchemaRef; use snafu::ResultExt; use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter}; -use crate::error::{self, Result}; +use crate::error::Result; use crate::executor::Runtime; use crate::plan::{Partitioning, PhysicalPlan}; +use crate::query_engine::datafusion::error; /// Datafusion ExecutionPlan -> greptime PhysicalPlan pub struct PhysicalPlanAdapter { @@ -74,7 +75,9 @@ impl PhysicalPlan for PhysicalPlanAdapter { let plan = self .plan .with_new_children(df_children) - .context(error::DatafusionSnafu)?; + .context(error::DatafusionSnafu { + msg: "Fail to add children to plan", + })?; Ok(Arc::new(PhysicalPlanAdapter::new( self.schema.clone(), plan, @@ -86,11 +89,13 @@ impl PhysicalPlan for PhysicalPlanAdapter { runtime: &Runtime, partition: usize, ) -> Result { - let df_stream = self - .plan - .execute(partition, runtime.into()) - .await - .context(error::DatafusionSnafu)?; + let df_stream = + self.plan + .execute(partition, runtime.into()) + .await + .context(error::DatafusionSnafu { + msg: "Fail to execute physical plan", + })?; Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream))) } diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/query_engine/datafusion/error.rs new file mode 100644 index 0000000000..50d6e96e0d --- /dev/null +++ b/src/query/src/query_engine/datafusion/error.rs @@ -0,0 +1,46 @@ +use common_error::ext::ErrorExt; +use datafusion::error::DataFusionError; +use snafu::{Backtrace, ErrorCompat, Snafu}; + +use crate::error::Error; + +/// Inner error of datafusion based query engine. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum InnerError { + #[snafu(display("{}: {}", msg, source))] + Datafusion { + msg: &'static str, + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("PhysicalPlan downcast failed"))] + PhysicalPlanDowncast { backtrace: Backtrace }, + + // The sql error already contains the SQL. + #[snafu(display("Cannot parse SQL, source: {}", source))] + ParseSql { source: sql::errors::ParserError }, + + #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] + Planner { + sql: String, + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("Execution error: {}", message))] + Execution { message: String }, +} + +impl ErrorExt for InnerError { + fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + ErrorCompat::backtrace(self) + } +} + +impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } +} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index d9f8ca19f9..1341b18318 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -16,12 +16,10 @@ use table::{ Table, }; -use crate::catalog::{ - schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME, - DEFAULT_SCHEMA_NAME, -}; -use crate::error::{self, Result}; +use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider}; +use crate::error::Result; use crate::executor::Runtime; +use crate::query_engine::datafusion::error; /// Query engine global state #[derive(Clone)] @@ -39,8 +37,10 @@ impl fmt::Debug for QueryEngineState { impl QueryEngineState { pub(crate) fn new(catalog_list: CatalogListRef) -> Self { - let config = ExecutionConfig::new() - .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME); + let config = ExecutionConfig::new().with_default_catalog_and_schema( + catalog::DEFAULT_CATALOG_NAME, + catalog::DEFAULT_SCHEMA_NAME, + ); let df_context = ExecutionContext::with_config(config); df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter { @@ -193,18 +193,16 @@ impl DfSchemaProvider for DfSchemaProviderAdapter { table: Arc, ) -> DataFusionResult>> { let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); - match self.schema_provider.register_table(name, table) { - Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - Ok(None) => Ok(None), - Err(e) => Err(e.into()), + match self.schema_provider.register_table(name, table)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), } } fn deregister_table(&self, name: &str) -> DataFusionResult>> { - match self.schema_provider.deregister_table(name) { - Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - Ok(None) => Ok(None), - Err(e) => Err(e.into()), + match self.schema_provider.deregister_table(name)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), } } @@ -244,7 +242,9 @@ impl SchemaProvider for SchemaProviderAdapter { Ok(self .df_schema_provider .register_table(name, table_provider) - .context(error::DatafusionSnafu)? + .context(error::DatafusionSnafu { + msg: "Fail to register table to datafusion", + })? .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) } @@ -252,7 +252,9 @@ impl SchemaProvider for SchemaProviderAdapter { Ok(self .df_schema_provider .deregister_table(name) - .context(error::DatafusionSnafu)? + .context(error::DatafusionSnafu { + msg: "Fail to deregister table from datafusion", + })? .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) } diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index b9b763e398..a3c34f5e47 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -snafu = "0.7.0" +snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs index 155e4fd55d..d483dc2877 100644 --- a/src/sql/src/errors.rs +++ b/src/sql/src/errors.rs @@ -2,14 +2,20 @@ use snafu::prelude::*; use sqlparser::parser::ParserError as SpParserError; /// SQL parser errors. +// Now the error in parser does not contains backtrace to avoid generating backtrace +// every time the parser parses an invalid SQL. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum ParserError { - #[snafu(display("SQL statement is not supported: {sql}, keyword: {keyword}"))] + #[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))] Unsupported { sql: String, keyword: String }, #[snafu(display( - "Unexpected token while parsing SQL statement: {sql}, expected: {expected}, found: {actual}, source: {source}" + "Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}", + sql, + expected, + actual, + source ))] Unexpected { sql: String, @@ -18,34 +24,30 @@ pub enum ParserError { source: SpParserError, }, - #[snafu(display("SQL syntax error: {msg}"))] - SyntaxError { msg: String }, - - #[snafu(display("Unknown inner parser error, sql: {sql}, source: {source}"))] - InnerError { sql: String, source: SpParserError }, + // Syntax error from sql parser. + #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))] + SpSyntax { sql: String, source: SpParserError }, } #[cfg(test)] mod tests { use std::assert_matches::assert_matches; - use snafu::ResultExt; + use super::*; #[test] pub fn test_error_conversion() { - pub fn raise_error() -> Result<(), sqlparser::parser::ParserError> { - Err(sqlparser::parser::ParserError::ParserError( - "parser error".to_string(), - )) + pub fn raise_error() -> Result<(), SpParserError> { + Err(SpParserError::ParserError("parser error".to_string())) } assert_matches!( - raise_error().context(crate::errors::InnerSnafu { + raise_error().context(SpSyntaxSnafu { sql: "".to_string(), }), - Err(super::ParserError::InnerError { + Err(ParserError::SpSyntax { sql: _, - source: sqlparser::parser::ParserError::ParserError { .. } + source: SpParserError::ParserError { .. } }) ) } diff --git a/src/sql/src/parsers/insert_parser.rs b/src/sql/src/parsers/insert_parser.rs index 84af1fa78e..a1699e8b9a 100644 --- a/src/sql/src/parsers/insert_parser.rs +++ b/src/sql/src/parsers/insert_parser.rs @@ -14,7 +14,7 @@ impl<'a> ParserContext<'a> { let spstatement = self .parser .parse_insert() - .context(errors::InnerSnafu { sql: self.sql })?; + .context(errors::SpSyntaxSnafu { sql: self.sql })?; match spstatement { SpStatement::Insert { .. } => { diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs index adcc4ab302..35975f2393 100644 --- a/src/sql/src/parsers/query_parser.rs +++ b/src/sql/src/parsers/query_parser.rs @@ -12,7 +12,7 @@ impl<'a> ParserContext<'a> { let spquery = self .parser .parse_query() - .context(errors::InnerSnafu { sql: self.sql })?; + .context(errors::SpSyntaxSnafu { sql: self.sql })?; Ok(Statement::Query(Box::new(Query::try_from(spquery)?))) } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 505f41b57c..1f20c6a0bb 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -11,6 +11,7 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc [dependencies] async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } +common-error = {path = "../common/error" } common-query = {path = "../common/query" } common-recordbatch = {path = "../common/recordbatch" } datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]} @@ -18,4 +19,4 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b datatypes = { path = "../datatypes" } futures = "0.3" serde = "1.0.136" -snafu = "0.7.0" +snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 13248151ca..8bece2377b 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -1,18 +1,45 @@ +use common_error::ext::ErrorExt; use datafusion::error::DataFusionError; -use snafu::Snafu; +use snafu::{Backtrace, ErrorCompat, Snafu}; + +common_error::define_opaque_error!(Error); -#[derive(Debug, Snafu)] -#[snafu(visibility(pub))] -pub enum Error { - #[snafu(display("Datafusion error: {}", source))] - Datafusion { source: DataFusionError }, - #[snafu(display("Not expected to run ExecutionPlan more than once."))] - ExecuteRepeatedly, -} pub type Result = std::result::Result; impl From for DataFusionError { - fn from(e: Error) -> DataFusionError { + fn from(e: Error) -> Self { + Self::External(Box::new(e)) + } +} + +/// Default error implementation of table. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum InnerError { + #[snafu(display("Datafusion error: {}", source))] + Datafusion { + source: DataFusionError, + backtrace: Backtrace, + }, + + #[snafu(display("Not expected to run ExecutionPlan more than once"))] + ExecuteRepeatedly { backtrace: Backtrace }, +} + +impl ErrorExt for InnerError { + fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + ErrorCompat::backtrace(self) + } +} + +impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } +} + +impl From for DataFusionError { + fn from(e: InnerError) -> DataFusionError { DataFusionError::External(Box::new(e)) } } diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index ba266a9ae8..c6746435f8 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -16,7 +16,7 @@ use datafusion::datasource::{ datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider, TableType as DfTableType, }; -use datafusion::error::{DataFusionError, Result as DfResult}; +use datafusion::error::Result as DfResult; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr as DfExpr; use datafusion::physical_plan::{ @@ -90,9 +90,7 @@ impl ExecutionPlan for ExecutionPlanAdapter { let stream = mem::replace(&mut *stream, None); Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap()))) } else { - error::ExecuteRepeatedlySnafu - .fail() - .map_err(|e| DataFusionError::External(Box::new(e))) + error::ExecuteRepeatedlySnafu.fail()? } } @@ -139,28 +137,23 @@ impl TableProvider for DfTableProviderAdapter { ) -> DfResult> { let filters: Vec = filters.iter().map(Clone::clone).map(Expr::new).collect(); - match self.table.scan(projection, &filters, limit).await { - Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter { - schema: stream.schema(), - stream: Mutex::new(Some(stream)), - })), - Err(e) => Err(e.into()), - } + let stream = self.table.scan(projection, &filters, limit).await?; + Ok(Arc::new(ExecutionPlanAdapter { + schema: stream.schema(), + stream: Mutex::new(Some(stream)), + })) } fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult { - match self + let p = self .table - .supports_filter_pushdown(&Expr::new(filter.clone())) - { - Ok(p) => match p { - TableProviderFilterPushDown::Unsupported => { - Ok(DfTableProviderFilterPushDown::Unsupported) - } - TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), - TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact), - }, - Err(e) => Err(e.into()), + .supports_filter_pushdown(&Expr::new(filter.clone()))?; + match p { + TableProviderFilterPushDown::Unsupported => { + Ok(DfTableProviderFilterPushDown::Unsupported) + } + TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact), + TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact), } } } From 08ccb466cb0d84b71d8eff0de0b60fa07a1a9603 Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 29 Apr 2022 16:28:16 +0800 Subject: [PATCH 02/14] fix(query): Remove unnecessary unsafe impl Send/Sync for ExecutionPlanAdapter --- src/query/src/query_engine/datafusion/adapter.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/query_engine/datafusion/adapter.rs index f6c10c9ba1..32ef992949 100644 --- a/src/query/src/query_engine/datafusion/adapter.rs +++ b/src/query/src/query_engine/datafusion/adapter.rs @@ -118,9 +118,6 @@ impl Debug for ExecutionPlanAdapter { } } -unsafe impl Send for ExecutionPlanAdapter {} -unsafe impl Sync for ExecutionPlanAdapter {} - #[async_trait::async_trait] impl ExecutionPlan for ExecutionPlanAdapter { fn as_any(&self) -> &dyn Any { From 7e2e3e342979941c16d3e6728c2e0a3b0e91c57b Mon Sep 17 00:00:00 2001 From: evenyag Date: Fri, 29 Apr 2022 16:51:55 +0800 Subject: [PATCH 03/14] feat: Impl ErrorExt for opaque error and ParseError --- Cargo.lock | 1 + src/common/error/src/ext.rs | 38 ++++++++++++++----- src/common/error/src/lib.rs | 10 +++++ src/common/error/src/status_code.rs | 11 +++++- src/query/src/error.rs | 1 + .../src/query_engine/datafusion/error.rs | 1 + src/sql/Cargo.toml | 1 + src/sql/src/errors.rs | 16 +++++++- 8 files changed, 68 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab314d117d..fe9a95184e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1774,6 +1774,7 @@ dependencies = [ name = "sql" version = "0.1.0" dependencies = [ + "common-error", "snafu", "sqlparser", ] diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index be982096a4..deaf728601 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -2,14 +2,15 @@ use crate::status_code::StatusCode; /// Extension to [`Error`](std::error::Error) in std. pub trait ErrorExt: std::error::Error { - /// Returns the [StatusCode] of this holder. + /// Map this error to [StatusCode]. fn status_code(&self) -> StatusCode { StatusCode::Unknown } - /// Get the reference to the backtrace of this error. - // Add `_opt` suffix to avoid confusing with similar method in `std::error::Error`. - fn backtrace_opt(&self) -> Option<&snafu::Backtrace>; + /// Get the reference to the backtrace of this error, None if the backtrace is unavailable. + // Add `_opt` suffix to avoid confusing with similar method in `std::error::Error`, once backtrace + // in std is stable, we can deprecate this method. + fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace>; } /// A helper macro to define a opaque boxed error based on errors that implement [ErrorExt] trait. @@ -25,7 +26,6 @@ macro_rules! define_opaque_error { } impl $Error { - /// Create a new error. pub fn new(err: E) -> Self { Self { inner: Box::new(err), @@ -57,7 +57,13 @@ macro_rules! define_opaque_error { self.inner.status_code() } - fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> { + self.inner.backtrace_opt() + } + } + + impl $crate::snafu::ErrorCompat for $Error { + fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> { self.inner.backtrace_opt() } } @@ -68,7 +74,7 @@ macro_rules! define_opaque_error { mod tests { use std::error::Error as StdError; - use snafu::{prelude::*, Backtrace}; + use snafu::{prelude::*, Backtrace, ErrorCompat}; use super::*; @@ -88,7 +94,7 @@ mod tests { impl ErrorExt for InnerError { fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { - snafu::ErrorCompat::backtrace(self) + ErrorCompat::backtrace(self) } } @@ -111,7 +117,7 @@ mod tests { } #[test] - fn test_opaque_error() { + fn test_inner_error() { let leaf = throw_leaf().err().unwrap(); assert!(leaf.backtrace_opt().is_some()); assert!(leaf.source().is_none()); @@ -120,4 +126,18 @@ mod tests { assert!(internal.backtrace_opt().is_some()); assert!(internal.source().is_some()); } + + #[test] + fn test_opaque_error() { + let err: Error = throw_leaf().map_err(Into::into).err().unwrap(); + let msg = format!("{:?}", err); + assert!(msg.contains("\nBacktrace:\n")); + assert!(ErrorCompat::backtrace(&err).is_some()); + + let err: Error = throw_internal().map_err(Into::into).err().unwrap(); + let msg = format!("{:?}", err); + assert!(msg.contains("\nBacktrace:\n")); + assert!(msg.contains("Caused by")); + assert!(ErrorCompat::backtrace(&err).is_some()); + } } diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 98f017a44f..9ac962b859 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -1,3 +1,13 @@ pub mod ext; pub mod format; pub mod status_code; + +pub mod prelude { + pub use snafu::Backtrace; + + pub use crate::ext::ErrorExt; + pub use crate::format::DebugFormat; + pub use crate::status_code::StatusCode; +} + +pub use snafu; diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 096d88ae4b..92f0ec2620 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -1,6 +1,15 @@ /// Common status code for public API. #[derive(Debug, Clone, Copy)] pub enum StatusCode { - /// Unknown status. + // ====== Begin of common status code =========== + /// Unknown error. Unknown, + /// Unsupported operation. + Unsupported, + // ====== End of common status code ============= + + // ====== Begin of SQL related status code ====== + /// SQL Syntax error. + InvalidSyntax, + // ====== End of SQL related status code ======== } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 1253da97d2..76cedc1872 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,4 +1,5 @@ use datafusion::error::DataFusionError; +use snafu::Snafu; common_error::define_opaque_error!(Error); diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/query_engine/datafusion/error.rs index 50d6e96e0d..4cb207fcd1 100644 --- a/src/query/src/query_engine/datafusion/error.rs +++ b/src/query/src/query_engine/datafusion/error.rs @@ -33,6 +33,7 @@ pub enum InnerError { Execution { message: String }, } +// TODO(yingwen): Implement status_code(). impl ErrorExt for InnerError { fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { ErrorCompat::backtrace(self) diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index a3c34f5e47..98365426a0 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -6,5 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +common-error = { path = "../common/error" } snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.15.0" diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs index d483dc2877..1bf4ed48a3 100644 --- a/src/sql/src/errors.rs +++ b/src/sql/src/errors.rs @@ -1,4 +1,5 @@ -use snafu::prelude::*; +use common_error::prelude::*; +use snafu::{prelude::*, ErrorCompat}; use sqlparser::parser::ParserError as SpParserError; /// SQL parser errors. @@ -29,6 +30,19 @@ pub enum ParserError { SpSyntax { sql: String, source: SpParserError }, } +impl ErrorExt for ParserError { + fn status_code(&self) -> StatusCode { + match self { + Self::Unsupported { .. } => StatusCode::Unsupported, + Self::Unexpected { .. } | Self::SpSyntax { .. } => StatusCode::InvalidSyntax, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; From 63d9aa1bffb8c49c46821cb727892ab9041f48e3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 11:33:45 +0800 Subject: [PATCH 04/14] refactor: Refactor datanode error and impl ErrorExt for it --- Cargo.lock | 1 + src/common/error/src/ext.rs | 8 +++++++ src/common/error/src/lib.rs | 2 +- src/common/error/src/status_code.rs | 2 ++ src/datanode/Cargo.toml | 4 ++-- src/datanode/src/datanode.rs | 4 ++-- src/datanode/src/error.rs | 33 +++++++++++++++++++++++------ src/datanode/src/instance.rs | 9 +++++--- src/datanode/src/server/http.rs | 4 ++-- src/query/src/error.rs | 1 - 10 files changed, 50 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe9a95184e..a776c55494 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,6 +651,7 @@ dependencies = [ "arrow2", "axum", "axum-macros", + "common-error", "common-recordbatch", "hyper", "query", diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index deaf728601..d78e35326c 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -35,6 +35,7 @@ macro_rules! define_opaque_error { impl std::fmt::Debug for $Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Use the pretty debug format of inner error for opaque error. let debug_format = $crate::format::DebugFormat::new(&*self.inner); debug_format.fmt(f) } @@ -62,6 +63,8 @@ macro_rules! define_opaque_error { } } + // Implement ErrorCompat for this opaque error so the backtrace is also available + // via `ErrorCompat::backtrace()`. impl $crate::snafu::ErrorCompat for $Error { fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> { self.inner.backtrace_opt() @@ -77,6 +80,7 @@ mod tests { use snafu::{prelude::*, Backtrace, ErrorCompat}; use super::*; + use crate::prelude::*; define_opaque_error!(Error); @@ -133,11 +137,15 @@ mod tests { let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); + assert_eq!(msg, fmt_msg); let err: Error = throw_internal().map_err(Into::into).err().unwrap(); let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); assert!(msg.contains("Caused by")); assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); + assert_eq!(msg, fmt_msg); } } diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 9ac962b859..3f8cb939da 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -3,7 +3,7 @@ pub mod format; pub mod status_code; pub mod prelude { - pub use snafu::Backtrace; + pub use snafu::{Backtrace, ErrorCompat}; pub use crate::ext::ErrorExt; pub use crate::format::DebugFormat; diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 92f0ec2620..2afcbc278d 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -6,6 +6,8 @@ pub enum StatusCode { Unknown, /// Unsupported operation. Unsupported, + /// Internal server error. + Internal, // ====== End of common status code ============= // ====== Begin of SQL related status code ====== diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a6aa9a8fa6..11bde84dfe 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -8,7 +8,8 @@ edition = "2021" [dependencies] axum = "0.5" axum-macros = "0.2" -common-recordbatch = {path = "../common/recordbatch" } +common-error = { path = "../common/error" } +common-recordbatch = { path = "../common/recordbatch" } hyper = { version = "0.14", features = ["full"] } query = { path = "../query" } serde = "1.0" @@ -23,4 +24,3 @@ tower-http = { version ="0.3", features = ["full"]} package = "arrow2" version="0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] - diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 454db221be..cef5f35745 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -4,7 +4,7 @@ use query::catalog::memory; use query::catalog::CatalogListRef; use snafu::ResultExt; -use crate::error::{QuerySnafu, Result}; +use crate::error::{NewCatalogSnafu, Result}; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; @@ -17,7 +17,7 @@ pub struct DataNode { impl DataNode { pub fn new() -> Result { - let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?; + let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?; let instance = Arc::new(Instance::new(catalog_list.clone())); Ok(Self { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 04cf8ab34c..6b4762d008 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,15 +1,34 @@ -use hyper::Error as HyperError; -use query::error::Error as QueryError; +use common_error::prelude::*; use snafu::Snafu; -/// business error of datanode. +/// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Query error: {}", source))] - Query { source: QueryError }, - #[snafu(display("Http error: {}", source))] - Hyper { source: HyperError }, + #[snafu(display("Fail to execute sql, source: {}", source))] + ExecuteSql { source: query::error::Error }, + + #[snafu(display("Fail to create catalog list, source: {}", source))] + NewCatalog { source: query::error::Error }, + + // The error source of http error is clear even without backtrace now so + // a backtrace is not carried in this varaint. + #[snafu(display("Fail to start HTTP server, source: {}", source))] + StartHttp { source: hyper::Error }, } pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), + // TODO(yingwen): Further categorize http error. + Error::StartHttp { .. } => StatusCode::Internal, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 6f04239c6d..b2ee532977 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -4,7 +4,7 @@ use query::catalog::CatalogListRef; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::ResultExt; -use crate::error::{QuerySnafu, Result}; +use crate::error::{ExecuteSqlSnafu, Result}; // An abstraction to read/write services. pub struct Instance { @@ -27,12 +27,15 @@ impl Instance { } pub async fn execute_sql(&self, sql: &str) -> Result { - let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?; + let logical_plan = self + .query_engine + .sql_to_plan(sql) + .context(ExecuteSqlSnafu)?; self.query_engine .execute(&logical_plan) .await - .context(QuerySnafu) + .context(ExecuteSqlSnafu) } } diff --git a/src/datanode/src/server/http.rs b/src/datanode/src/server/http.rs index a1f22c57ef..2061f05a6c 100644 --- a/src/datanode/src/server/http.rs +++ b/src/datanode/src/server/http.rs @@ -17,7 +17,7 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use crate::error::{HyperSnafu, Result}; +use crate::error::{Result, StartHttpSnafu}; use crate::server::InstanceRef; /// Http server @@ -106,7 +106,7 @@ impl HttpServer { let server = axum::Server::bind(&addr).serve(app.into_make_service()); let graceful = server.with_graceful_shutdown(shutdown_signal()); - graceful.await.context(HyperSnafu)?; + graceful.await.context(StartHttpSnafu)?; Ok(()) } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 76cedc1872..1253da97d2 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,5 +1,4 @@ use datafusion::error::DataFusionError; -use snafu::Snafu; common_error::define_opaque_error!(Error); From 83262acc0e34e146e987951ca1476e4071a85e2f Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 11:42:33 +0800 Subject: [PATCH 05/14] refactor: Add more imports to common_error::prelude --- src/common/error/src/lib.rs | 2 +- src/query/src/query_engine/datafusion/error.rs | 2 +- src/sql/src/errors.rs | 1 - src/table/src/error.rs | 3 +-- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 3f8cb939da..9b7ed10113 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -3,7 +3,7 @@ pub mod format; pub mod status_code; pub mod prelude { - pub use snafu::{Backtrace, ErrorCompat}; + pub use snafu::{prelude::*, Backtrace, ErrorCompat}; pub use crate::ext::ErrorExt; pub use crate::format::DebugFormat; diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/query_engine/datafusion/error.rs index 4cb207fcd1..5798b20e9f 100644 --- a/src/query/src/query_engine/datafusion/error.rs +++ b/src/query/src/query_engine/datafusion/error.rs @@ -1,4 +1,4 @@ -use common_error::ext::ErrorExt; +use common_error::prelude::*; use datafusion::error::DataFusionError; use snafu::{Backtrace, ErrorCompat, Snafu}; diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs index 1bf4ed48a3..22446d66dd 100644 --- a/src/sql/src/errors.rs +++ b/src/sql/src/errors.rs @@ -1,5 +1,4 @@ use common_error::prelude::*; -use snafu::{prelude::*, ErrorCompat}; use sqlparser::parser::ParserError as SpParserError; /// SQL parser errors. diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 8bece2377b..5b03f04628 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -1,6 +1,5 @@ -use common_error::ext::ErrorExt; +use common_error::prelude::*; use datafusion::error::DataFusionError; -use snafu::{Backtrace, ErrorCompat, Snafu}; common_error::define_opaque_error!(Error); From 99c7ffb45689967d923b833f85e81d2defa45379 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 14:12:57 +0800 Subject: [PATCH 06/14] refactor: Define error for memory catalog --- src/common/error/src/status_code.rs | 13 ++++--- src/datanode/src/error.rs | 1 - src/query/src/catalog/memory.rs | 34 +++++++++++++++---- .../src/query_engine/datafusion/error.rs | 18 ++++++---- 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 2afcbc278d..2b0a338b31 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -1,17 +1,22 @@ /// Common status code for public API. #[derive(Debug, Clone, Copy)] pub enum StatusCode { - // ====== Begin of common status code =========== + // ====== Begin of common status code ============== /// Unknown error. Unknown, /// Unsupported operation. Unsupported, /// Internal server error. Internal, - // ====== End of common status code ============= + // ====== End of common status code ================ - // ====== Begin of SQL related status code ====== + // ====== Begin of SQL related status code ========= /// SQL Syntax error. InvalidSyntax, - // ====== End of SQL related status code ======== + // ====== End of SQL related status code =========== + + // ====== Begin of catalog related status code ===== + /// Table already exists. + TableAlreadyExists, + // ====== End of catalog related status code ======= } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 6b4762d008..de767ffe8c 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,5 +1,4 @@ use common_error::prelude::*; -use snafu::Snafu; /// Business error of datanode. #[derive(Debug, Snafu)] diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index 6c4aa8be41..e2c194c001 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::RwLock; +use common_error::prelude::*; use table::table::numbers::NumbersTable; use table::TableRef; @@ -11,8 +12,32 @@ use crate::catalog::{ CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, }; -use crate::error::Result; -use crate::query_engine::datafusion::error; +use crate::error::{Error, Result}; + +/// Error implementation of memory catalog. +#[derive(Debug, Snafu)] +pub enum InnerError { + #[snafu(display("Table {} already exists", table))] + TableExists { table: String, backtrace: Backtrace }, +} + +impl ErrorExt for InnerError { + fn status_code(&self) -> StatusCode { + match self { + InnerError::TableExists { .. } => StatusCode::TableAlreadyExists, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } +} + +impl From for Error { + fn from(err: InnerError) -> Self { + Self::new(err) + } +} /// Simple in-memory list of catalogs #[derive(Default)] @@ -129,10 +154,7 @@ impl SchemaProvider for MemorySchemaProvider { fn register_table(&self, name: String, table: TableRef) -> Result> { if self.table_exist(name.as_str()) { // FIXME(yingwen): Define another error. - return error::ExecutionSnafu { - message: format!("The table {} already exists", name), - } - .fail()?; + return TableExistsSnafu { table: name }.fail()?; } let mut tables = self.tables.write().unwrap(); Ok(tables.insert(name, table)) diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/query_engine/datafusion/error.rs index 5798b20e9f..1d89deacd2 100644 --- a/src/query/src/query_engine/datafusion/error.rs +++ b/src/query/src/query_engine/datafusion/error.rs @@ -1,6 +1,5 @@ use common_error::prelude::*; use datafusion::error::DataFusionError; -use snafu::{Backtrace, ErrorCompat, Snafu}; use crate::error::Error; @@ -28,14 +27,21 @@ pub enum InnerError { source: DataFusionError, backtrace: Backtrace, }, - - #[snafu(display("Execution error: {}", message))] - Execution { message: String }, } -// TODO(yingwen): Implement status_code(). impl ErrorExt for InnerError { - fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + fn status_code(&self) -> StatusCode { + use InnerError::*; + + match self { + ParseSql { source, .. } => source.status_code(), + Datafusion { .. } | PhysicalPlanDowncast { .. } | Planner { .. } => { + StatusCode::Internal + } + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } } From 6a206575912759173f3c1c7e181215f821539dd3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 14:38:57 +0800 Subject: [PATCH 07/14] refactor: query::query_engine::datafusion -> query::datafusion --- src/query/src/{query_engine => }/datafusion.rs | 5 ++--- src/query/src/{query_engine => }/datafusion/adapter.rs | 2 +- src/query/src/{query_engine => }/datafusion/error.rs | 0 src/query/src/lib.rs | 1 + src/query/src/planner.rs | 2 +- src/query/src/query_engine.rs | 6 +++--- src/query/src/query_engine/state.rs | 5 ++++- src/table/src/table/adapter.rs | 2 +- 8 files changed, 13 insertions(+), 10 deletions(-) rename src/query/src/{query_engine => }/datafusion.rs (97%) rename src/query/src/{query_engine => }/datafusion/adapter.rs (99%) rename src/query/src/{query_engine => }/datafusion/error.rs (100%) diff --git a/src/query/src/query_engine/datafusion.rs b/src/query/src/datafusion.rs similarity index 97% rename from src/query/src/query_engine/datafusion.rs rename to src/query/src/datafusion.rs index 82a15c3568..29dd339b97 100644 --- a/src/query/src/query_engine/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -7,9 +7,10 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use snafu::{OptionExt, ResultExt}; use sql::{dialect::GenericDialect, parser::ParserContext}; -use super::{context::QueryContext, state::QueryEngineState}; +use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ catalog::CatalogListRef, + datafusion::adapter::PhysicalPlanAdapter, error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, @@ -17,7 +18,6 @@ use crate::{ physical_planner::PhysicalPlanner, plan::{LogicalPlan, PhysicalPlan}, planner::{DfContextProviderAdapter, DfPlanner, Planner}, - query_engine::datafusion::adapter::PhysicalPlanAdapter, Output, QueryEngine, }; @@ -177,7 +177,6 @@ mod tests { let plan = engine.sql_to_plan(sql).unwrap(); - println!("{:?}", plan); assert_eq!( format!("{:?}", plan), r#"DfPlan(Limit: 20 diff --git a/src/query/src/query_engine/datafusion/adapter.rs b/src/query/src/datafusion/adapter.rs similarity index 99% rename from src/query/src/query_engine/datafusion/adapter.rs rename to src/query/src/datafusion/adapter.rs index 32ef992949..f2e64b68ca 100644 --- a/src/query/src/query_engine/datafusion/adapter.rs +++ b/src/query/src/datafusion/adapter.rs @@ -16,10 +16,10 @@ use datatypes::schema::SchemaRef; use snafu::ResultExt; use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter}; +use crate::datafusion::error; use crate::error::Result; use crate::executor::Runtime; use crate::plan::{Partitioning, PhysicalPlan}; -use crate::query_engine::datafusion::error; /// Datafusion ExecutionPlan -> greptime PhysicalPlan pub struct PhysicalPlanAdapter { diff --git a/src/query/src/query_engine/datafusion/error.rs b/src/query/src/datafusion/error.rs similarity index 100% rename from src/query/src/query_engine/datafusion/error.rs rename to src/query/src/datafusion/error.rs diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index c065b8857e..c77401c603 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -1,5 +1,6 @@ pub mod catalog; pub mod database; +mod datafusion; pub mod error; pub mod executor; pub mod logical_optimizer; diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index cff92dfd1e..d00d51a6db 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -13,9 +13,9 @@ use table::table::adapter::DfTableProviderAdapter; use crate::{ catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, + datafusion::error, error::Result, plan::LogicalPlan, - query_engine::datafusion::error, }; pub trait Planner: Send + Sync { diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 4e073a2e2b..215794a1fe 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -1,16 +1,16 @@ mod context; -pub mod datafusion; mod state; use std::sync::Arc; use common_recordbatch::SendableRecordBatchStream; -pub use context::QueryContext; use crate::catalog::CatalogList; +use crate::datafusion::DatafusionQueryEngine; use crate::error::Result; use crate::plan::LogicalPlan; -use crate::query_engine::datafusion::DatafusionQueryEngine; +pub use crate::query_engine::context::QueryContext; +pub use crate::query_engine::state::QueryEngineState; /// Sql output pub enum Output { diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 1341b18318..f5db6dabed 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -17,11 +17,14 @@ use table::{ }; use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider}; +use crate::datafusion::error; use crate::error::Result; use crate::executor::Runtime; -use crate::query_engine::datafusion::error; /// Query engine global state +// TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it, +// which allows different implementation use different engine state. The state can also be an associated +// type in QueryEngine trait. #[derive(Clone)] pub struct QueryEngineState { df_context: ExecutionContext, diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index c6746435f8..e1328b106b 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -30,8 +30,8 @@ use datatypes::schema::{Schema, SchemaRef}; use futures::Stream; use snafu::prelude::*; -use super::{Table, TableProviderFilterPushDown, TableRef, TableType}; use crate::error::{self, Result}; +use crate::table::{Table, TableProviderFilterPushDown, TableRef, TableType}; /// Greptime SendableRecordBatchStream -> datafusion ExecutionPlan. struct ExecutionPlanAdapter { From ec6335336428abec331bd624f329024f25adfbfe Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 15:27:00 +0800 Subject: [PATCH 08/14] refactor: Move planner and adapters to datafusion mod --- src/query/src/datafusion.rs | 14 +- src/query/src/datafusion/catalog_adapter.rs | 222 ++++++++++++++++++ src/query/src/datafusion/error.rs | 4 +- .../{adapter.rs => plan_adapter.rs} | 0 src/query/src/datafusion/planner.rs | 113 +++++++++ src/query/src/planner.rs | 107 +-------- src/query/src/query_engine/state.rs | 219 +---------------- 7 files changed, 355 insertions(+), 324 deletions(-) create mode 100644 src/query/src/datafusion/catalog_adapter.rs rename src/query/src/datafusion/{adapter.rs => plan_adapter.rs} (100%) create mode 100644 src/query/src/datafusion/planner.rs diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 29dd339b97..700a33534e 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -1,5 +1,9 @@ -mod adapter; -pub mod error; +//! Planner, QueryEngine implementations based on DataFusion. + +mod catalog_adapter; +mod error; +mod plan_adapter; +mod planner; use std::sync::Arc; @@ -7,17 +11,19 @@ use common_recordbatch::{EmptyRecordBatchStream, SendableRecordBatchStream}; use snafu::{OptionExt, ResultExt}; use sql::{dialect::GenericDialect, parser::ParserContext}; +pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; use crate::query_engine::{QueryContext, QueryEngineState}; use crate::{ catalog::CatalogListRef, - datafusion::adapter::PhysicalPlanAdapter, + datafusion::plan_adapter::PhysicalPlanAdapter, + datafusion::planner::{DfContextProviderAdapter, DfPlanner}, error::Result, executor::QueryExecutor, logical_optimizer::LogicalOptimizer, physical_optimizer::PhysicalOptimizer, physical_planner::PhysicalPlanner, plan::{LogicalPlan, PhysicalPlan}, - planner::{DfContextProviderAdapter, DfPlanner, Planner}, + planner::Planner, Output, QueryEngine, }; diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs new file mode 100644 index 0000000000..e357b28edb --- /dev/null +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -0,0 +1,222 @@ +//! Catalog adapter between datafusion and greptime query engine. + +use std::any::Any; +use std::sync::Arc; + +use datafusion::catalog::{ + catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, + schema::SchemaProvider as DfSchemaProvider, +}; +use datafusion::datasource::TableProvider as DfTableProvider; +use datafusion::error::Result as DataFusionResult; +use datafusion::execution::runtime_env::RuntimeEnv; +use snafu::ResultExt; +use table::{ + table::adapter::{DfTableProviderAdapter, TableAdapter}, + Table, +}; + +use crate::catalog::{schema::SchemaProvider, CatalogListRef, CatalogProvider}; +use crate::datafusion::error; +use crate::error::Result; + +pub struct DfCatalogListAdapter { + runtime: Arc, + catalog_list: CatalogListRef, +} + +impl DfCatalogListAdapter { + pub fn new(runtime: Arc, catalog_list: CatalogListRef) -> DfCatalogListAdapter { + DfCatalogListAdapter { + runtime, + catalog_list, + } + } +} + +impl DfCatalogList for DfCatalogListAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + let catalog_adapter = Arc::new(CatalogProviderAdapter { + df_cataglog_provider: catalog, + runtime: self.runtime.clone(), + }); + self.catalog_list + .register_catalog(name, catalog_adapter) + .map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } + + fn catalog_names(&self) -> Vec { + self.catalog_list.catalog_names() + } + + fn catalog(&self, name: &str) -> Option> { + self.catalog_list.catalog(name).map(|catalog_provider| { + Arc::new(DfCatalogProviderAdapter { + catalog_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Datafusion's CatalogProvider -> greptime CatalogProvider +struct CatalogProviderAdapter { + df_cataglog_provider: Arc, + runtime: Arc, +} + +impl CatalogProvider for CatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.df_cataglog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.df_cataglog_provider + .schema(name) + .map(|df_schema_provider| { + Arc::new(SchemaProviderAdapter { + df_schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +///Greptime CatalogProvider -> datafusion's CatalogProvider +struct DfCatalogProviderAdapter { + catalog_provider: Arc, + runtime: Arc, +} + +impl DfCatalogProvider for DfCatalogProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.catalog_provider.schema_names() + } + + fn schema(&self, name: &str) -> Option> { + self.catalog_provider.schema(name).map(|schema_provider| { + Arc::new(DfSchemaProviderAdapter { + schema_provider, + runtime: self.runtime.clone(), + }) as _ + }) + } +} + +/// Greptime SchemaProvider -> datafusion SchemaProvider +struct DfSchemaProviderAdapter { + schema_provider: Arc, + runtime: Arc, +} + +impl DfSchemaProvider for DfSchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.schema_provider + .table(name) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> DataFusionResult>> { + let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); + match self.schema_provider.register_table(name, table)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn deregister_table(&self, name: &str) -> DataFusionResult>> { + match self.schema_provider.deregister_table(name)? { + Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), + None => Ok(None), + } + } + + fn table_exist(&self, name: &str) -> bool { + self.schema_provider.table_exist(name) + } +} + +/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter +struct SchemaProviderAdapter { + df_schema_provider: Arc, + runtime: Arc, +} + +impl SchemaProvider for SchemaProviderAdapter { + fn as_any(&self) -> &dyn Any { + self + } + + /// Retrieves the list of available table names in this schema. + fn table_names(&self) -> Vec { + self.df_schema_provider.table_names() + } + + fn table(&self, name: &str) -> Option> { + self.df_schema_provider.table(name).map(|table_provider| { + Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ + }) + } + + fn register_table( + &self, + name: String, + table: Arc, + ) -> Result>> { + let table_provider = Arc::new(DfTableProviderAdapter::new(table)); + Ok(self + .df_schema_provider + .register_table(name, table_provider) + .context(error::DatafusionSnafu { + msg: "Fail to register table to datafusion", + })? + .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) + } + + fn deregister_table(&self, name: &str) -> Result>> { + Ok(self + .df_schema_provider + .deregister_table(name) + .context(error::DatafusionSnafu { + msg: "Fail to deregister table from datafusion", + })? + .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) + } + + fn table_exist(&self, name: &str) -> bool { + self.df_schema_provider.table_exist(name) + } +} diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 1d89deacd2..ba804e5081 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -22,7 +22,7 @@ pub enum InnerError { ParseSql { source: sql::errors::ParserError }, #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] - Planner { + PlanSql { sql: String, source: DataFusionError, backtrace: Backtrace, @@ -35,7 +35,7 @@ impl ErrorExt for InnerError { match self { ParseSql { source, .. } => source.status_code(), - Datafusion { .. } | PhysicalPlanDowncast { .. } | Planner { .. } => { + Datafusion { .. } | PhysicalPlanDowncast { .. } | PlanSql { .. } => { StatusCode::Internal } } diff --git a/src/query/src/datafusion/adapter.rs b/src/query/src/datafusion/plan_adapter.rs similarity index 100% rename from src/query/src/datafusion/adapter.rs rename to src/query/src/datafusion/plan_adapter.rs diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs new file mode 100644 index 0000000000..179b1099da --- /dev/null +++ b/src/query/src/datafusion/planner.rs @@ -0,0 +1,113 @@ +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion::catalog::TableReference; +use datafusion::datasource::TableProvider; +use datafusion::physical_plan::udaf::AggregateUDF; +use datafusion::physical_plan::udf::ScalarUDF; +use datafusion::sql::planner::{ContextProvider, SqlToRel}; +use snafu::ResultExt; +use sql::statements::query::Query; +use sql::statements::statement::Statement; +use table::table::adapter::DfTableProviderAdapter; + +use crate::{ + catalog::{self, CatalogListRef}, + datafusion::error, + error::Result, + plan::LogicalPlan, + planner::Planner, +}; + +pub struct DfPlanner<'a, S: ContextProvider> { + sql_to_rel: SqlToRel<'a, S>, +} + +impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { + /// Creates a DataFusion planner instance + pub fn new(schema_provider: &'a S) -> Self { + let rel = SqlToRel::new(schema_provider); + Self { sql_to_rel: rel } + } + + /// Converts QUERY statement to logical plan. + pub fn query_to_plan(&self, query: Box) -> Result { + // todo(hl): original SQL should be provided as an argument + let sql = query.inner.to_string(); + let result = self + .sql_to_rel + .query_to_plan(query.inner) + .context(error::PlanSqlSnafu { sql })?; + + Ok(LogicalPlan::DfPlan(result)) + } +} + +impl<'a, S> Planner for DfPlanner<'a, S> +where + S: ContextProvider + Send + Sync, +{ + /// Converts statement to logical plan using datafusion planner + fn statement_to_plan(&self, statement: Statement) -> Result { + match statement { + Statement::ShowDatabases(_) => { + todo!("Currently not supported") + } + Statement::Query(qb) => self.query_to_plan(qb), + Statement::Insert(_) => { + todo!() + } + } + } +} + +pub(crate) struct DfContextProviderAdapter<'a> { + catalog_list: &'a CatalogListRef, +} + +impl<'a> DfContextProviderAdapter<'a> { + pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { + Self { catalog_list } + } +} + +impl<'a> ContextProvider for DfContextProviderAdapter<'a> { + fn get_table_provider(&self, name: TableReference) -> Option> { + let (catalog, schema, table) = match name { + TableReference::Bare { table } => ( + catalog::DEFAULT_CATALOG_NAME, + catalog::DEFAULT_SCHEMA_NAME, + table, + ), + TableReference::Partial { schema, table } => { + (catalog::DEFAULT_CATALOG_NAME, schema, table) + } + TableReference::Full { + catalog, + schema, + table, + } => (catalog, schema, table), + }; + + self.catalog_list + .catalog(catalog) + .and_then(|catalog_provider| catalog_provider.schema(schema)) + .and_then(|schema_provider| schema_provider.table(table)) + .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) + } + + fn get_function_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_aggregate_meta(&self, _name: &str) -> Option> { + // TODO(dennis) + None + } + + fn get_variable_type(&self, _variable_names: &[String]) -> Option { + // TODO(dennis) + None + } +} diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index d00d51a6db..0814a03711 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -1,111 +1,8 @@ -use std::sync::Arc; - -use arrow::datatypes::DataType; -use datafusion::catalog::TableReference; -use datafusion::datasource::TableProvider; -use datafusion::physical_plan::udaf::AggregateUDF; -use datafusion::physical_plan::udf::ScalarUDF; -use datafusion::sql::planner::{ContextProvider, SqlToRel}; -use snafu::ResultExt; -use sql::statements::query::Query; use sql::statements::statement::Statement; -use table::table::adapter::DfTableProviderAdapter; -use crate::{ - catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}, - datafusion::error, - error::Result, - plan::LogicalPlan, -}; +use crate::{error::Result, plan::LogicalPlan}; +/// SQL logical planner. pub trait Planner: Send + Sync { fn statement_to_plan(&self, statement: Statement) -> Result; } - -pub struct DfPlanner<'a, S: ContextProvider> { - sql_to_rel: SqlToRel<'a, S>, -} - -impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { - /// Creates a DataFusion planner instance - pub fn new(schema_provider: &'a S) -> Self { - let rel = SqlToRel::new(schema_provider); - Self { sql_to_rel: rel } - } - - /// Converts QUERY statement to logical plan. - pub fn query_to_plan(&self, query: Box) -> Result { - // todo(hl): original SQL should be provided as an argument - let sql = query.inner.to_string(); - let result = self - .sql_to_rel - .query_to_plan(query.inner) - // FIXME(yingwen): Move DfPlanner to datafusion mod. - .context(error::PlannerSnafu { sql })?; - - Ok(LogicalPlan::DfPlan(result)) - } -} - -impl<'a, S> Planner for DfPlanner<'a, S> -where - S: ContextProvider + Send + Sync, -{ - /// Converts statement to logical plan using datafusion planner - fn statement_to_plan(&self, statement: Statement) -> Result { - match statement { - Statement::ShowDatabases(_) => { - todo!("Currently not supported") - } - Statement::Query(qb) => self.query_to_plan(qb), - Statement::Insert(_) => { - todo!() - } - } - } -} - -pub(crate) struct DfContextProviderAdapter<'a> { - catalog_list: &'a CatalogListRef, -} - -impl<'a> DfContextProviderAdapter<'a> { - pub(crate) fn new(catalog_list: &'a CatalogListRef) -> Self { - Self { catalog_list } - } -} - -impl<'a> ContextProvider for DfContextProviderAdapter<'a> { - fn get_table_provider(&self, name: TableReference) -> Option> { - let (catalog, schema, table) = match name { - TableReference::Bare { table } => (DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table), - TableReference::Partial { schema, table } => (DEFAULT_CATALOG_NAME, schema, table), - TableReference::Full { - catalog, - schema, - table, - } => (catalog, schema, table), - }; - - self.catalog_list - .catalog(catalog) - .and_then(|catalog_provider| catalog_provider.schema(schema)) - .and_then(|schema_provider| schema_provider.table(table)) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn get_function_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_aggregate_meta(&self, _name: &str) -> Option> { - // TODO(dennis) - None - } - - fn get_variable_type(&self, _variable_names: &[String]) -> Option { - // TODO(dennis) - None - } -} diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f5db6dabed..f7301f6a99 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -1,24 +1,10 @@ -use std::any::Any; use std::fmt; use std::sync::Arc; -use datafusion::catalog::{ - catalog::{CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider}, - schema::SchemaProvider as DfSchemaProvider, -}; -use datafusion::datasource::TableProvider as DfTableProvider; -use datafusion::error::Result as DataFusionResult; -use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use snafu::ResultExt; -use table::{ - table::adapter::{DfTableProviderAdapter, TableAdapter}, - Table, -}; -use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider}; -use crate::datafusion::error; -use crate::error::Result; +use crate::catalog::{self, CatalogListRef}; +use crate::datafusion::DfCatalogListAdapter; use crate::executor::Runtime; /// Query engine global state @@ -46,10 +32,10 @@ impl QueryEngineState { ); let df_context = ExecutionContext::with_config(config); - df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter { - catalog_list: catalog_list.clone(), - runtime: df_context.runtime_env(), - }); + df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter::new( + df_context.runtime_env(), + catalog_list.clone(), + )); Self { df_context, @@ -72,196 +58,3 @@ impl QueryEngineState { self.df_context.runtime_env().into() } } - -/// Adapters between datafusion and greptime query engine. -struct DfCatalogListAdapter { - runtime: Arc, - catalog_list: CatalogListRef, -} - -impl DfCatalogList for DfCatalogListAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn register_catalog( - &self, - name: String, - catalog: Arc, - ) -> Option> { - let catalog_adapter = Arc::new(CatalogProviderAdapter { - df_cataglog_provider: catalog, - runtime: self.runtime.clone(), - }); - self.catalog_list - .register_catalog(name, catalog_adapter) - .map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } - - fn catalog_names(&self) -> Vec { - self.catalog_list.catalog_names() - } - - fn catalog(&self, name: &str) -> Option> { - self.catalog_list.catalog(name).map(|catalog_provider| { - Arc::new(DfCatalogProviderAdapter { - catalog_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Datafusion's CatalogProvider -> greptime CatalogProvider -struct CatalogProviderAdapter { - df_cataglog_provider: Arc, - runtime: Arc, -} - -impl CatalogProvider for CatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.df_cataglog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.df_cataglog_provider - .schema(name) - .map(|df_schema_provider| { - Arc::new(SchemaProviderAdapter { - df_schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -///Greptime CatalogProvider -> datafusion's CatalogProvider -struct DfCatalogProviderAdapter { - catalog_provider: Arc, - runtime: Arc, -} - -impl DfCatalogProvider for DfCatalogProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema_names(&self) -> Vec { - self.catalog_provider.schema_names() - } - - fn schema(&self, name: &str) -> Option> { - self.catalog_provider.schema(name).map(|schema_provider| { - Arc::new(DfSchemaProviderAdapter { - schema_provider, - runtime: self.runtime.clone(), - }) as _ - }) - } -} - -/// Greptime SchemaProvider -> datafusion SchemaProvider -struct DfSchemaProviderAdapter { - schema_provider: Arc, - runtime: Arc, -} - -impl DfSchemaProvider for DfSchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - fn table_names(&self) -> Vec { - self.schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.schema_provider - .table(name) - .map(|table| Arc::new(DfTableProviderAdapter::new(table)) as _) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> DataFusionResult>> { - let table = Arc::new(TableAdapter::new(table, self.runtime.clone())); - match self.schema_provider.register_table(name, table)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn deregister_table(&self, name: &str) -> DataFusionResult>> { - match self.schema_provider.deregister_table(name)? { - Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))), - None => Ok(None), - } - } - - fn table_exist(&self, name: &str) -> bool { - self.schema_provider.table_exist(name) - } -} - -/// Datafuion SchemaProviderAdapter -> greptime SchemaProviderAdapter -struct SchemaProviderAdapter { - df_schema_provider: Arc, - runtime: Arc, -} - -impl SchemaProvider for SchemaProviderAdapter { - fn as_any(&self) -> &dyn Any { - self - } - - /// Retrieves the list of available table names in this schema. - fn table_names(&self) -> Vec { - self.df_schema_provider.table_names() - } - - fn table(&self, name: &str) -> Option> { - self.df_schema_provider.table(name).map(|table_provider| { - Arc::new(TableAdapter::new(table_provider, self.runtime.clone())) as _ - }) - } - - fn register_table( - &self, - name: String, - table: Arc, - ) -> Result>> { - let table_provider = Arc::new(DfTableProviderAdapter::new(table)); - Ok(self - .df_schema_provider - .register_table(name, table_provider) - .context(error::DatafusionSnafu { - msg: "Fail to register table to datafusion", - })? - .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))) - } - - fn deregister_table(&self, name: &str) -> Result>> { - Ok(self - .df_schema_provider - .deregister_table(name) - .context(error::DatafusionSnafu { - msg: "Fail to deregister table from datafusion", - })? - .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)) - } - - fn table_exist(&self, name: &str) -> bool { - self.df_schema_provider.table_exist(name) - } -} From 56258d6821a05961bcd9cf8dd46782cf9cd122ea Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 15:45:46 +0800 Subject: [PATCH 09/14] test: Add more test for opaque error --- src/common/error/src/ext.rs | 20 ++++++++++++++++++-- src/common/error/src/status_code.rs | 2 +- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index d78e35326c..31c8bf917a 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -97,6 +97,10 @@ mod tests { } impl ErrorExt for InnerError { + fn status_code(&self) -> StatusCode { + StatusCode::Internal + } + fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { ErrorCompat::backtrace(self) } @@ -133,19 +137,31 @@ mod tests { #[test] fn test_opaque_error() { + // Test leaf error. let err: Error = throw_leaf().map_err(Into::into).err().unwrap(); let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); - assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); assert_eq!(msg, fmt_msg); + assert!(ErrorCompat::backtrace(&err).is_some()); + assert!(err.backtrace_opt().is_some()); + assert_eq!("This is a leaf error, val: 10", err.to_string()); + assert_eq!(StatusCode::Internal, err.status_code()); + + // Test internal error. let err: Error = throw_internal().map_err(Into::into).err().unwrap(); let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); assert!(msg.contains("Caused by")); - assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); assert_eq!(msg, fmt_msg); + + assert!(ErrorCompat::backtrace(&err).is_some()); + assert!(err.backtrace_opt().is_some()); + assert_eq!("This is an internal error", err.to_string()); + assert_eq!(StatusCode::Internal, err.status_code()); } } diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 2b0a338b31..e81cc349fc 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -1,5 +1,5 @@ /// Common status code for public API. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum StatusCode { // ====== Begin of common status code ============== /// Unknown error. From fae876ec63c86dfdba2fb679b1c292d399b79e4b Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 16:07:41 +0800 Subject: [PATCH 10/14] test: Add more test for parser error --- src/sql/src/errors.rs | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/src/sql/src/errors.rs b/src/sql/src/errors.rs index 22446d66dd..f7d1b9ed48 100644 --- a/src/sql/src/errors.rs +++ b/src/sql/src/errors.rs @@ -48,20 +48,42 @@ mod tests { use super::*; - #[test] - pub fn test_error_conversion() { - pub fn raise_error() -> Result<(), SpParserError> { - Err(SpParserError::ParserError("parser error".to_string())) - } + fn raise_sp_error() -> Result<(), SpParserError> { + Err(SpParserError::ParserError("parser error".to_string())) + } + #[test] + fn test_syntax_error() { + let err = raise_sp_error() + .context(SpSyntaxSnafu { sql: "" }) + .err() + .unwrap(); assert_matches!( - raise_error().context(SpSyntaxSnafu { - sql: "".to_string(), - }), - Err(ParserError::SpSyntax { + err, + ParserError::SpSyntax { sql: _, source: SpParserError::ParserError { .. } + } + ); + assert_eq!(StatusCode::InvalidSyntax, err.status_code()); + + let err = raise_sp_error() + .context(UnexpectedSnafu { + sql: "", + expected: "", + actual: "", }) - ) + .err() + .unwrap(); + assert_eq!(StatusCode::InvalidSyntax, err.status_code()); + } + + #[test] + fn test_unsupported_error() { + let err = ParserError::Unsupported { + sql: "".to_string(), + keyword: "".to_string(), + }; + assert_eq!(StatusCode::Unsupported, err.status_code()); } } From e0c1bbfd21c99ae6a8f301991478faa29b9a638b Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 16:15:05 +0800 Subject: [PATCH 11/14] refactor: Rename errors::ParserError to error::Error --- src/query/src/datafusion/error.rs | 2 +- src/sql/src/{errors.rs => error.rs} | 28 +++++++++++++++------------- src/sql/src/lib.rs | 2 +- src/sql/src/parser.rs | 15 ++++++++------- src/sql/src/parsers/insert_parser.rs | 6 +++--- src/sql/src/parsers/query_parser.rs | 4 ++-- src/sql/src/statements/query.rs | 6 +++--- 7 files changed, 33 insertions(+), 30 deletions(-) rename src/sql/src/{errors.rs => error.rs} (73%) diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index ba804e5081..57a53a1755 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -19,7 +19,7 @@ pub enum InnerError { // The sql error already contains the SQL. #[snafu(display("Cannot parse SQL, source: {}", source))] - ParseSql { source: sql::errors::ParserError }, + ParseSql { source: sql::error::Error }, #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] PlanSql { diff --git a/src/sql/src/errors.rs b/src/sql/src/error.rs similarity index 73% rename from src/sql/src/errors.rs rename to src/sql/src/error.rs index f7d1b9ed48..1688a396f2 100644 --- a/src/sql/src/errors.rs +++ b/src/sql/src/error.rs @@ -1,12 +1,12 @@ use common_error::prelude::*; -use sqlparser::parser::ParserError as SpParserError; +use sqlparser::parser::ParserError; /// SQL parser errors. // Now the error in parser does not contains backtrace to avoid generating backtrace // every time the parser parses an invalid SQL. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum ParserError { +pub enum Error { #[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))] Unsupported { sql: String, keyword: String }, @@ -21,19 +21,21 @@ pub enum ParserError { sql: String, expected: String, actual: String, - source: SpParserError, + source: ParserError, }, // Syntax error from sql parser. #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))] - SpSyntax { sql: String, source: SpParserError }, + Syntax { sql: String, source: ParserError }, } -impl ErrorExt for ParserError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { + use Error::*; + match self { - Self::Unsupported { .. } => StatusCode::Unsupported, - Self::Unexpected { .. } | Self::SpSyntax { .. } => StatusCode::InvalidSyntax, + Unsupported { .. } => StatusCode::Unsupported, + Unexpected { .. } | Syntax { .. } => StatusCode::InvalidSyntax, } } @@ -48,21 +50,21 @@ mod tests { use super::*; - fn raise_sp_error() -> Result<(), SpParserError> { - Err(SpParserError::ParserError("parser error".to_string())) + fn raise_sp_error() -> Result<(), ParserError> { + Err(ParserError::ParserError("parser error".to_string())) } #[test] fn test_syntax_error() { let err = raise_sp_error() - .context(SpSyntaxSnafu { sql: "" }) + .context(SyntaxSnafu { sql: "" }) .err() .unwrap(); assert_matches!( err, - ParserError::SpSyntax { + Error::Syntax { sql: _, - source: SpParserError::ParserError { .. } + source: ParserError::ParserError { .. } } ); assert_eq!(StatusCode::InvalidSyntax, err.status_code()); @@ -80,7 +82,7 @@ mod tests { #[test] fn test_unsupported_error() { - let err = ParserError::Unsupported { + let err = Error::Unsupported { sql: "".to_string(), keyword: "".to_string(), }; diff --git a/src/sql/src/lib.rs b/src/sql/src/lib.rs index 2ff323e974..6f681719ee 100644 --- a/src/sql/src/lib.rs +++ b/src/sql/src/lib.rs @@ -4,7 +4,7 @@ extern crate core; pub mod ast; pub mod dialect; -pub mod errors; +pub mod error; pub mod parser; pub mod parsers; pub mod statements; diff --git a/src/sql/src/parser.rs b/src/sql/src/parser.rs index 5bb5f2eaf7..cc3660756d 100644 --- a/src/sql/src/parser.rs +++ b/src/sql/src/parser.rs @@ -4,12 +4,12 @@ use sqlparser::keywords::Keyword; use sqlparser::parser::Parser; use sqlparser::tokenizer::{Token, Tokenizer}; -use crate::errors; +use crate::error::{self, Error}; use crate::statements::show_database::SqlShowDatabase; use crate::statements::show_kind::ShowKind; use crate::statements::statement::Statement; -pub type Result = std::result::Result; +pub type Result = std::result::Result; /// GrepTime SQL parser context, a simple wrapper for Datafusion SQL parser. pub struct ParserContext<'a> { @@ -87,10 +87,11 @@ impl<'a> ParserContext<'a> { /// Raises an "unsupported statement" error. pub fn unsupported(&self, keyword: String) -> Result { - Err(errors::ParserError::Unsupported { - sql: self.sql.to_string(), + error::UnsupportedSnafu { + sql: self.sql, keyword, - }) + } + .fail() } /// Parses SHOW statements @@ -137,7 +138,7 @@ impl<'a> ParserContext<'a> { ShowKind::Like( self.parser .parse_identifier() - .context(errors::UnexpectedSnafu { + .context(error::UnexpectedSnafu { sql: self.sql, expected: "LIKE", actual: tok.to_string(), @@ -146,7 +147,7 @@ impl<'a> ParserContext<'a> { ), ))), Keyword::WHERE => Ok(Statement::ShowDatabases(SqlShowDatabase::new( - ShowKind::Where(self.parser.parse_expr().context(errors::UnexpectedSnafu { + ShowKind::Where(self.parser.parse_expr().context(error::UnexpectedSnafu { sql: self.sql.to_string(), expected: "some valid expression".to_string(), actual: self.peek_token_as_string(), diff --git a/src/sql/src/parsers/insert_parser.rs b/src/sql/src/parsers/insert_parser.rs index a1699e8b9a..a3d14a348c 100644 --- a/src/sql/src/parsers/insert_parser.rs +++ b/src/sql/src/parsers/insert_parser.rs @@ -1,7 +1,7 @@ use snafu::ResultExt; use sqlparser::ast::Statement as SpStatement; -use crate::errors; +use crate::error; use crate::parser::ParserContext; use crate::parser::Result; use crate::statements::insert::Insert; @@ -14,13 +14,13 @@ impl<'a> ParserContext<'a> { let spstatement = self .parser .parse_insert() - .context(errors::SpSyntaxSnafu { sql: self.sql })?; + .context(error::SyntaxSnafu { sql: self.sql })?; match spstatement { SpStatement::Insert { .. } => { Ok(Statement::Insert(Box::new(Insert { inner: spstatement }))) } - unexp => errors::UnsupportedSnafu { + unexp => error::UnsupportedSnafu { sql: self.sql.to_string(), keyword: unexp.to_string(), } diff --git a/src/sql/src/parsers/query_parser.rs b/src/sql/src/parsers/query_parser.rs index 35975f2393..d5b332eeaf 100644 --- a/src/sql/src/parsers/query_parser.rs +++ b/src/sql/src/parsers/query_parser.rs @@ -1,6 +1,6 @@ use snafu::prelude::*; -use crate::errors; +use crate::error; use crate::parser::ParserContext; use crate::parser::Result; use crate::statements::query::Query; @@ -12,7 +12,7 @@ impl<'a> ParserContext<'a> { let spquery = self .parser .parse_query() - .context(errors::SpSyntaxSnafu { sql: self.sql })?; + .context(error::SyntaxSnafu { sql: self.sql })?; Ok(Statement::Query(Box::new(Query::try_from(spquery)?))) } diff --git a/src/sql/src/statements/query.rs b/src/sql/src/statements/query.rs index ec4315b91a..419f2a106a 100644 --- a/src/sql/src/statements/query.rs +++ b/src/sql/src/statements/query.rs @@ -1,6 +1,6 @@ use sqlparser::ast::Query as SpQuery; -use crate::errors::ParserError; +use crate::error::Error; /// Query statement instance. #[derive(Debug, Clone, PartialEq)] @@ -10,7 +10,7 @@ pub struct Query { /// Automatically converts from sqlparser Query instance to SqlQuery. impl TryFrom for Query { - type Error = ParserError; + type Error = Error; fn try_from(q: SpQuery) -> Result { Ok(Query { inner: q }) @@ -18,7 +18,7 @@ impl TryFrom for Query { } impl TryFrom for SpQuery { - type Error = ParserError; + type Error = Error; fn try_from(value: Query) -> Result { Ok(value.inner) From d2d4d88c89490fd025365a74e765434c88ed9663 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 17:29:39 +0800 Subject: [PATCH 12/14] test: Add MockError and add more tests for error --- src/common/error/src/lib.rs | 1 + src/common/error/src/mock.rs | 74 +++++++++++++++++++++++++++++++ src/datanode/src/error.rs | 36 ++++++++++++++- src/query/src/catalog/memory.rs | 5 ++- src/query/src/datafusion/error.rs | 53 +++++++++++++++++++++- src/table/src/error.rs | 26 ++++++++++- 6 files changed, 189 insertions(+), 6 deletions(-) create mode 100644 src/common/error/src/mock.rs diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 9b7ed10113..50ef09d166 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -1,5 +1,6 @@ pub mod ext; pub mod format; +pub mod mock; pub mod status_code; pub mod prelude { diff --git a/src/common/error/src/mock.rs b/src/common/error/src/mock.rs new file mode 100644 index 0000000000..e031779b4a --- /dev/null +++ b/src/common/error/src/mock.rs @@ -0,0 +1,74 @@ +//! Utils for mock. + +use std::fmt; + +use snafu::GenerateImplicitData; + +use crate::prelude::*; + +/// A mock error mainly for test. +#[derive(Debug)] +pub struct MockError { + pub code: StatusCode, + backtrace: Option, +} + +impl MockError { + /// Create a new [MockError] without backtrace. + pub fn new(code: StatusCode) -> MockError { + MockError { + code, + backtrace: None, + } + } + + /// Create a new [MockError] with backtrace. + pub fn with_backtrace(code: StatusCode) -> MockError { + MockError { + code, + backtrace: Some(Backtrace::generate()), + } + } +} + +impl fmt::Display for MockError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self.code) + } +} + +impl std::error::Error for MockError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + None + } +} + +impl ErrorExt for MockError { + fn status_code(&self) -> StatusCode { + self.code + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + self.backtrace.as_ref() + } +} + +impl ErrorCompat for MockError { + fn backtrace(&self) -> Option<&Backtrace> { + self.backtrace_opt() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_mock_error() { + let err = MockError::new(StatusCode::Unknown); + assert!(err.backtrace_opt().is_none()); + + let err = MockError::with_backtrace(StatusCode::Unknown); + assert!(err.backtrace_opt().is_some()); + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index de767ffe8c..02c82b0061 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -5,10 +5,16 @@ use common_error::prelude::*; #[snafu(visibility(pub))] pub enum Error { #[snafu(display("Fail to execute sql, source: {}", source))] - ExecuteSql { source: query::error::Error }, + ExecuteSql { + #[snafu(backtrace)] + source: query::error::Error, + }, #[snafu(display("Fail to create catalog list, source: {}", source))] - NewCatalog { source: query::error::Error }, + NewCatalog { + #[snafu(backtrace)] + source: query::error::Error, + }, // The error source of http error is clear even without backtrace now so // a backtrace is not carried in this varaint. @@ -31,3 +37,29 @@ impl ErrorExt for Error { ErrorCompat::backtrace(self) } } + +#[cfg(test)] +mod tests { + use common_error::mock::MockError; + + use super::*; + + fn raise_query_error() -> std::result::Result<(), query::error::Error> { + Err(query::error::Error::new(MockError::with_backtrace( + StatusCode::Internal, + ))) + } + + fn assert_internal_error(err: &Error) { + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::Internal, err.status_code()); + } + + #[test] + fn test_error() { + let err = raise_query_error().context(ExecuteSqlSnafu).err().unwrap(); + assert_internal_error(&err); + let err = raise_query_error().context(NewCatalogSnafu).err().unwrap(); + assert_internal_error(&err); + } +} diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index e2c194c001..1245b855b5 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -153,7 +153,6 @@ impl SchemaProvider for MemorySchemaProvider { fn register_table(&self, name: String, table: TableRef) -> Result> { if self.table_exist(name.as_str()) { - // FIXME(yingwen): Define another error. return TableExistsSnafu { table: name }.fail()?; } let mut tables = self.tables.write().unwrap(); @@ -220,6 +219,8 @@ mod tests { assert!(provider.table_exist(table_name)); let other_table = NumbersTable::default(); let result = provider.register_table(table_name.to_string(), Arc::new(other_table)); - assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } } diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 57a53a1755..425519de02 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -19,7 +19,10 @@ pub enum InnerError { // The sql error already contains the SQL. #[snafu(display("Cannot parse SQL, source: {}", source))] - ParseSql { source: sql::error::Error }, + ParseSql { + #[snafu(backtrace)] + source: sql::error::Error, + }, #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))] PlanSql { @@ -51,3 +54,51 @@ impl From for Error { Self::new(err) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn raise_df_error() -> Result<(), DataFusionError> { + Err(DataFusionError::NotImplemented("test".to_string())) + } + + fn assert_internal_error(err: &InnerError) { + assert_eq!(StatusCode::Internal, err.status_code()); + assert!(err.backtrace_opt().is_some()); + } + + #[test] + fn test_datafusion_as_source() { + let err = raise_df_error() + .context(DatafusionSnafu { msg: "test df" }) + .err() + .unwrap(); + assert_internal_error(&err); + + let err = raise_df_error() + .context(PlanSqlSnafu { sql: "" }) + .err() + .unwrap(); + assert_internal_error(&err); + + let res: Result<(), InnerError> = PhysicalPlanDowncastSnafu {}.fail(); + let err = res.err().unwrap(); + assert_internal_error(&err); + } + + fn raise_sql_error() -> Result<(), sql::error::Error> { + Err(sql::error::Error::Unsupported { + sql: "".to_string(), + keyword: "".to_string(), + }) + } + + #[test] + fn test_parse_error() { + let err = raise_sql_error().context(ParseSqlSnafu).err().unwrap(); + assert!(err.backtrace_opt().is_none()); + let sql_err = raise_sql_error().err().unwrap(); + assert_eq!(sql_err.status_code(), err.status_code()); + } +} diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 5b03f04628..e924a3d0ee 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -26,7 +26,7 @@ pub enum InnerError { } impl ErrorExt for InnerError { - fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { + fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } } @@ -42,3 +42,27 @@ impl From for DataFusionError { DataFusionError::External(Box::new(e)) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn raise_df_error() -> Result<()> { + Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)? + } + + fn raise_repeatedly() -> Result<()> { + ExecuteRepeatedlySnafu {}.fail()? + } + + #[test] + fn test_error() { + let err = raise_df_error().err().unwrap(); + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::Unknown, err.status_code()); + + let err = raise_repeatedly().err().unwrap(); + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::Unknown, err.status_code()); + } +} From 5f48b4996bf92d7ee8fe65096e502955cd034734 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 May 2022 11:52:01 +0800 Subject: [PATCH 13/14] chore: Address CR comments --- src/common/error/src/ext.rs | 2 +- src/common/error/src/mock.rs | 2 +- src/common/error/src/status_code.rs | 34 +++++++++++++++++++++++++++++ src/datanode/src/error.rs | 6 ++--- src/query/src/datafusion/error.rs | 24 ++++++++++---------- src/sql/src/error.rs | 6 ++--- src/table/src/error.rs | 8 +++---- 7 files changed, 59 insertions(+), 23 deletions(-) diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index 31c8bf917a..98bb9a083a 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -20,7 +20,7 @@ macro_rules! define_opaque_error { /// An error behaves like `Box`. /// /// Define this error as a new type instead of using `Box` directly so we can implement - /// more method or trait for it. + /// more methods or traits for it. pub struct $Error { inner: Box, } diff --git a/src/common/error/src/mock.rs b/src/common/error/src/mock.rs index e031779b4a..8568d87e31 100644 --- a/src/common/error/src/mock.rs +++ b/src/common/error/src/mock.rs @@ -33,7 +33,7 @@ impl MockError { impl fmt::Display for MockError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self.code) + write!(f, "{}", self.code) } } diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index e81cc349fc..0f3286e69e 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -1,3 +1,5 @@ +use std::fmt; + /// Common status code for public API. #[derive(Debug, Clone, Copy, PartialEq)] pub enum StatusCode { @@ -6,6 +8,8 @@ pub enum StatusCode { Unknown, /// Unsupported operation. Unsupported, + /// Unexpected error, maybe there is a BUG. + Unexpected, /// Internal server error. Internal, // ====== End of common status code ================ @@ -15,8 +19,38 @@ pub enum StatusCode { InvalidSyntax, // ====== End of SQL related status code =========== + // ====== Begin of query related status code ======= + /// Fail to create a plan for the query. + PlanQuery, + /// The query engine fail to execute query. + EngineExecuteQuery, + // ====== End of query related status code ========= + // ====== Begin of catalog related status code ===== /// Table already exists. TableAlreadyExists, // ====== End of catalog related status code ======= } + +impl fmt::Display for StatusCode { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // The current debug format is suitable to display. + write!(f, "{:?}", self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn assert_status_code_display(code: StatusCode, msg: &str) { + let code_msg = format!("{}", code); + assert_eq!(msg, code_msg); + } + + #[test] + fn test_display_status_code() { + assert_status_code_display(StatusCode::Unknown, "Unknown"); + assert_status_code_display(StatusCode::TableAlreadyExists, "TableAlreadyExists"); + } +} diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 02c82b0061..195f4f0a94 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -44,7 +44,7 @@ mod tests { use super::*; - fn raise_query_error() -> std::result::Result<(), query::error::Error> { + fn throw_query_error() -> std::result::Result<(), query::error::Error> { Err(query::error::Error::new(MockError::with_backtrace( StatusCode::Internal, ))) @@ -57,9 +57,9 @@ mod tests { #[test] fn test_error() { - let err = raise_query_error().context(ExecuteSqlSnafu).err().unwrap(); + let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap(); assert_internal_error(&err); - let err = raise_query_error().context(NewCatalogSnafu).err().unwrap(); + let err = throw_query_error().context(NewCatalogSnafu).err().unwrap(); assert_internal_error(&err); } } diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 425519de02..69c6a01501 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -37,10 +37,12 @@ impl ErrorExt for InnerError { use InnerError::*; match self { + // TODO(yingwen): Further categorize datafusion error. + Datafusion { .. } => StatusCode::EngineExecuteQuery, + // This downcast should not fail in usual case. + PhysicalPlanDowncast { .. } => StatusCode::Unexpected, ParseSql { source, .. } => source.status_code(), - Datafusion { .. } | PhysicalPlanDowncast { .. } | PlanSql { .. } => { - StatusCode::Internal - } + PlanSql { .. } => StatusCode::PlanQuery, } } @@ -59,32 +61,32 @@ impl From for Error { mod tests { use super::*; - fn raise_df_error() -> Result<(), DataFusionError> { + fn throw_df_error() -> Result<(), DataFusionError> { Err(DataFusionError::NotImplemented("test".to_string())) } - fn assert_internal_error(err: &InnerError) { - assert_eq!(StatusCode::Internal, err.status_code()); + fn assert_error(err: &InnerError, code: StatusCode) { + assert_eq!(code, err.status_code()); assert!(err.backtrace_opt().is_some()); } #[test] fn test_datafusion_as_source() { - let err = raise_df_error() + let err = throw_df_error() .context(DatafusionSnafu { msg: "test df" }) .err() .unwrap(); - assert_internal_error(&err); + assert_error(&err, StatusCode::EngineExecuteQuery); - let err = raise_df_error() + let err = throw_df_error() .context(PlanSqlSnafu { sql: "" }) .err() .unwrap(); - assert_internal_error(&err); + assert_error(&err, StatusCode::PlanQuery); let res: Result<(), InnerError> = PhysicalPlanDowncastSnafu {}.fail(); let err = res.err().unwrap(); - assert_internal_error(&err); + assert_error(&err, StatusCode::Unexpected); } fn raise_sql_error() -> Result<(), sql::error::Error> { diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 1688a396f2..df0219c8d6 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -50,13 +50,13 @@ mod tests { use super::*; - fn raise_sp_error() -> Result<(), ParserError> { + fn throw_sp_error() -> Result<(), ParserError> { Err(ParserError::ParserError("parser error".to_string())) } #[test] fn test_syntax_error() { - let err = raise_sp_error() + let err = throw_sp_error() .context(SyntaxSnafu { sql: "" }) .err() .unwrap(); @@ -69,7 +69,7 @@ mod tests { ); assert_eq!(StatusCode::InvalidSyntax, err.status_code()); - let err = raise_sp_error() + let err = throw_sp_error() .context(UnexpectedSnafu { sql: "", expected: "", diff --git a/src/table/src/error.rs b/src/table/src/error.rs index e924a3d0ee..9e48a63248 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -47,21 +47,21 @@ impl From for DataFusionError { mod tests { use super::*; - fn raise_df_error() -> Result<()> { + fn throw_df_error() -> Result<()> { Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)? } - fn raise_repeatedly() -> Result<()> { + fn throw_repeatedly() -> Result<()> { ExecuteRepeatedlySnafu {}.fail()? } #[test] fn test_error() { - let err = raise_df_error().err().unwrap(); + let err = throw_df_error().err().unwrap(); assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::Unknown, err.status_code()); - let err = raise_repeatedly().err().unwrap(); + let err = throw_repeatedly().err().unwrap(); assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::Unknown, err.status_code()); } From d5de0306006106b204b7fa56d2d22d103074a748 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 9 May 2022 12:35:55 +0800 Subject: [PATCH 14/14] feat: Add as_any() to opaque error --- src/common/error/src/ext.rs | 18 ++++++++++++++++++ src/common/error/src/format.rs | 14 ++++++++++++++ src/common/error/src/mock.rs | 5 +++++ src/datanode/src/error.rs | 6 ++++++ src/query/src/catalog/memory.rs | 4 ++++ src/query/src/datafusion/error.rs | 6 ++++++ src/sql/src/error.rs | 6 ++++++ src/table/src/error.rs | 6 ++++++ 8 files changed, 65 insertions(+) diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index 98bb9a083a..d69e4f1e94 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use crate::status_code::StatusCode; /// Extension to [`Error`](std::error::Error) in std. @@ -11,6 +13,10 @@ pub trait ErrorExt: std::error::Error { // Add `_opt` suffix to avoid confusing with similar method in `std::error::Error`, once backtrace // in std is stable, we can deprecate this method. fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace>; + + /// Returns the error as [Any](std::any::Any) so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; } /// A helper macro to define a opaque boxed error based on errors that implement [ErrorExt] trait. @@ -61,6 +67,10 @@ macro_rules! define_opaque_error { fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> { self.inner.backtrace_opt() } + + fn as_any(&self) -> &dyn std::any::Any { + self.inner.as_any() + } } // Implement ErrorCompat for this opaque error so the backtrace is also available @@ -104,6 +114,10 @@ mod tests { fn backtrace_opt(&self) -> Option<&snafu::Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } impl From for Error { @@ -150,6 +164,8 @@ mod tests { assert_eq!("This is a leaf error, val: 10", err.to_string()); assert_eq!(StatusCode::Internal, err.status_code()); + err.as_any().downcast_ref::().unwrap(); + // Test internal error. let err: Error = throw_internal().map_err(Into::into).err().unwrap(); let msg = format!("{:?}", err); @@ -163,5 +179,7 @@ mod tests { assert!(err.backtrace_opt().is_some()); assert_eq!("This is an internal error", err.to_string()); assert_eq!(StatusCode::Internal, err.status_code()); + + err.as_any().downcast_ref::().unwrap(); } } diff --git a/src/common/error/src/format.rs b/src/common/error/src/format.rs index b994e0aa8b..f4e592dc21 100644 --- a/src/common/error/src/format.rs +++ b/src/common/error/src/format.rs @@ -30,6 +30,8 @@ impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> { #[cfg(test)] mod tests { + use std::any::Any; + use snafu::{prelude::*, Backtrace, GenerateImplicitData}; use super::*; @@ -42,6 +44,10 @@ mod tests { fn backtrace_opt(&self) -> Option<&Backtrace> { None } + + fn as_any(&self) -> &dyn Any { + self + } } #[derive(Debug, Snafu)] @@ -54,6 +60,10 @@ mod tests { fn backtrace_opt(&self) -> Option<&Backtrace> { Some(&self.backtrace) } + + fn as_any(&self) -> &dyn Any { + self + } } #[derive(Debug, Snafu)] @@ -68,6 +78,10 @@ mod tests { fn backtrace_opt(&self) -> Option<&Backtrace> { Some(&self.backtrace) } + + fn as_any(&self) -> &dyn Any { + self + } } #[test] diff --git a/src/common/error/src/mock.rs b/src/common/error/src/mock.rs index 8568d87e31..f33dcb88ed 100644 --- a/src/common/error/src/mock.rs +++ b/src/common/error/src/mock.rs @@ -1,5 +1,6 @@ //! Utils for mock. +use std::any::Any; use std::fmt; use snafu::GenerateImplicitData; @@ -51,6 +52,10 @@ impl ErrorExt for MockError { fn backtrace_opt(&self) -> Option<&Backtrace> { self.backtrace.as_ref() } + + fn as_any(&self) -> &dyn Any { + self + } } impl ErrorCompat for MockError { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 195f4f0a94..537d5c75dc 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use common_error::prelude::*; /// Business error of datanode. @@ -36,6 +38,10 @@ impl ErrorExt for Error { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] diff --git a/src/query/src/catalog/memory.rs b/src/query/src/catalog/memory.rs index 1245b855b5..5d1eb5aa4a 100644 --- a/src/query/src/catalog/memory.rs +++ b/src/query/src/catalog/memory.rs @@ -31,6 +31,10 @@ impl ErrorExt for InnerError { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } impl From for Error { diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 69c6a01501..7ed2d319f1 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use common_error::prelude::*; use datafusion::error::DataFusionError; @@ -49,6 +51,10 @@ impl ErrorExt for InnerError { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } impl From for Error { diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index df0219c8d6..5bf2768d90 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use common_error::prelude::*; use sqlparser::parser::ParserError; @@ -42,6 +44,10 @@ impl ErrorExt for Error { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } #[cfg(test)] diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 9e48a63248..1e9a0f4903 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -1,3 +1,5 @@ +use std::any::Any; + use common_error::prelude::*; use datafusion::error::DataFusionError; @@ -29,6 +31,10 @@ impl ErrorExt for InnerError { fn backtrace_opt(&self) -> Option<&Backtrace> { ErrorCompat::backtrace(self) } + + fn as_any(&self) -> &dyn Any { + self + } } impl From for Error {