From 63d9aa1bffb8c49c46821cb727892ab9041f48e3 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 7 May 2022 11:33:45 +0800 Subject: [PATCH] refactor: Refactor datanode error and impl ErrorExt for it --- Cargo.lock | 1 + src/common/error/src/ext.rs | 8 +++++++ src/common/error/src/lib.rs | 2 +- src/common/error/src/status_code.rs | 2 ++ src/datanode/Cargo.toml | 4 ++-- src/datanode/src/datanode.rs | 4 ++-- src/datanode/src/error.rs | 33 +++++++++++++++++++++++------ src/datanode/src/instance.rs | 9 +++++--- src/datanode/src/server/http.rs | 4 ++-- src/query/src/error.rs | 1 - 10 files changed, 50 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe9a95184e..a776c55494 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -651,6 +651,7 @@ dependencies = [ "arrow2", "axum", "axum-macros", + "common-error", "common-recordbatch", "hyper", "query", diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index deaf728601..d78e35326c 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -35,6 +35,7 @@ macro_rules! define_opaque_error { impl std::fmt::Debug for $Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Use the pretty debug format of inner error for opaque error. let debug_format = $crate::format::DebugFormat::new(&*self.inner); debug_format.fmt(f) } @@ -62,6 +63,8 @@ macro_rules! define_opaque_error { } } + // Implement ErrorCompat for this opaque error so the backtrace is also available + // via `ErrorCompat::backtrace()`. impl $crate::snafu::ErrorCompat for $Error { fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> { self.inner.backtrace_opt() @@ -77,6 +80,7 @@ mod tests { use snafu::{prelude::*, Backtrace, ErrorCompat}; use super::*; + use crate::prelude::*; define_opaque_error!(Error); @@ -133,11 +137,15 @@ mod tests { let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); + assert_eq!(msg, fmt_msg); let err: Error = throw_internal().map_err(Into::into).err().unwrap(); let msg = format!("{:?}", err); assert!(msg.contains("\nBacktrace:\n")); assert!(msg.contains("Caused by")); assert!(ErrorCompat::backtrace(&err).is_some()); + let fmt_msg = format!("{:?}", DebugFormat::new(&err)); + assert_eq!(msg, fmt_msg); } } diff --git a/src/common/error/src/lib.rs b/src/common/error/src/lib.rs index 9ac962b859..3f8cb939da 100644 --- a/src/common/error/src/lib.rs +++ b/src/common/error/src/lib.rs @@ -3,7 +3,7 @@ pub mod format; pub mod status_code; pub mod prelude { - pub use snafu::Backtrace; + pub use snafu::{Backtrace, ErrorCompat}; pub use crate::ext::ErrorExt; pub use crate::format::DebugFormat; diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index 92f0ec2620..2afcbc278d 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -6,6 +6,8 @@ pub enum StatusCode { Unknown, /// Unsupported operation. Unsupported, + /// Internal server error. + Internal, // ====== End of common status code ============= // ====== Begin of SQL related status code ====== diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index a6aa9a8fa6..11bde84dfe 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -8,7 +8,8 @@ edition = "2021" [dependencies] axum = "0.5" axum-macros = "0.2" -common-recordbatch = {path = "../common/recordbatch" } +common-error = { path = "../common/error" } +common-recordbatch = { path = "../common/recordbatch" } hyper = { version = "0.14", features = ["full"] } query = { path = "../query" } serde = "1.0" @@ -23,4 +24,3 @@ tower-http = { version ="0.3", features = ["full"]} package = "arrow2" version="0.10" features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"] - diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 454db221be..cef5f35745 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -4,7 +4,7 @@ use query::catalog::memory; use query::catalog::CatalogListRef; use snafu::ResultExt; -use crate::error::{QuerySnafu, Result}; +use crate::error::{NewCatalogSnafu, Result}; use crate::instance::{Instance, InstanceRef}; use crate::server::Services; @@ -17,7 +17,7 @@ pub struct DataNode { impl DataNode { pub fn new() -> Result { - let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?; + let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?; let instance = Arc::new(Instance::new(catalog_list.clone())); Ok(Self { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 04cf8ab34c..6b4762d008 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -1,15 +1,34 @@ -use hyper::Error as HyperError; -use query::error::Error as QueryError; +use common_error::prelude::*; use snafu::Snafu; -/// business error of datanode. +/// Business error of datanode. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Query error: {}", source))] - Query { source: QueryError }, - #[snafu(display("Http error: {}", source))] - Hyper { source: HyperError }, + #[snafu(display("Fail to execute sql, source: {}", source))] + ExecuteSql { source: query::error::Error }, + + #[snafu(display("Fail to create catalog list, source: {}", source))] + NewCatalog { source: query::error::Error }, + + // The error source of http error is clear even without backtrace now so + // a backtrace is not carried in this varaint. + #[snafu(display("Fail to start HTTP server, source: {}", source))] + StartHttp { source: hyper::Error }, } pub type Result = std::result::Result; + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + match self { + Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(), + // TODO(yingwen): Further categorize http error. + Error::StartHttp { .. } => StatusCode::Internal, + } + } + + fn backtrace_opt(&self) -> Option<&Backtrace> { + ErrorCompat::backtrace(self) + } +} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 6f04239c6d..b2ee532977 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -4,7 +4,7 @@ use query::catalog::CatalogListRef; use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef}; use snafu::ResultExt; -use crate::error::{QuerySnafu, Result}; +use crate::error::{ExecuteSqlSnafu, Result}; // An abstraction to read/write services. pub struct Instance { @@ -27,12 +27,15 @@ impl Instance { } pub async fn execute_sql(&self, sql: &str) -> Result { - let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?; + let logical_plan = self + .query_engine + .sql_to_plan(sql) + .context(ExecuteSqlSnafu)?; self.query_engine .execute(&logical_plan) .await - .context(QuerySnafu) + .context(ExecuteSqlSnafu) } } diff --git a/src/datanode/src/server/http.rs b/src/datanode/src/server/http.rs index a1f22c57ef..2061f05a6c 100644 --- a/src/datanode/src/server/http.rs +++ b/src/datanode/src/server/http.rs @@ -17,7 +17,7 @@ use snafu::ResultExt; use tower::{timeout::TimeoutLayer, ServiceBuilder}; use tower_http::trace::TraceLayer; -use crate::error::{HyperSnafu, Result}; +use crate::error::{Result, StartHttpSnafu}; use crate::server::InstanceRef; /// Http server @@ -106,7 +106,7 @@ impl HttpServer { let server = axum::Server::bind(&addr).serve(app.into_make_service()); let graceful = server.with_graceful_shutdown(shutdown_signal()); - graceful.await.context(HyperSnafu)?; + graceful.await.context(StartHttpSnafu)?; Ok(()) } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 76cedc1872..1253da97d2 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -1,5 +1,4 @@ use datafusion::error::DataFusionError; -use snafu::Snafu; common_error::define_opaque_error!(Error);