refactor: Refactor datanode error and impl ErrorExt for it

This commit is contained in:
evenyag
2022-05-07 11:33:45 +08:00
parent 7e2e3e3429
commit 63d9aa1bff
10 changed files with 50 additions and 18 deletions

1
Cargo.lock generated
View File

@@ -651,6 +651,7 @@ dependencies = [
"arrow2",
"axum",
"axum-macros",
"common-error",
"common-recordbatch",
"hyper",
"query",

View File

@@ -35,6 +35,7 @@ macro_rules! define_opaque_error {
impl std::fmt::Debug for $Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Use the pretty debug format of inner error for opaque error.
let debug_format = $crate::format::DebugFormat::new(&*self.inner);
debug_format.fmt(f)
}
@@ -62,6 +63,8 @@ macro_rules! define_opaque_error {
}
}
// Implement ErrorCompat for this opaque error so the backtrace is also available
// via `ErrorCompat::backtrace()`.
impl $crate::snafu::ErrorCompat for $Error {
fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> {
self.inner.backtrace_opt()
@@ -77,6 +80,7 @@ mod tests {
use snafu::{prelude::*, Backtrace, ErrorCompat};
use super::*;
use crate::prelude::*;
define_opaque_error!(Error);
@@ -133,11 +137,15 @@ mod tests {
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
assert!(ErrorCompat::backtrace(&err).is_some());
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
let err: Error = throw_internal().map_err(Into::into).err().unwrap();
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
assert!(msg.contains("Caused by"));
assert!(ErrorCompat::backtrace(&err).is_some());
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
}
}

View File

@@ -3,7 +3,7 @@ pub mod format;
pub mod status_code;
pub mod prelude {
pub use snafu::Backtrace;
pub use snafu::{Backtrace, ErrorCompat};
pub use crate::ext::ErrorExt;
pub use crate::format::DebugFormat;

View File

@@ -6,6 +6,8 @@ pub enum StatusCode {
Unknown,
/// Unsupported operation.
Unsupported,
/// Internal server error.
Internal,
// ====== End of common status code =============
// ====== Begin of SQL related status code ======

View File

@@ -8,7 +8,8 @@ edition = "2021"
[dependencies]
axum = "0.5"
axum-macros = "0.2"
common-recordbatch = {path = "../common/recordbatch" }
common-error = { path = "../common/error" }
common-recordbatch = { path = "../common/recordbatch" }
hyper = { version = "0.14", features = ["full"] }
query = { path = "../query" }
serde = "1.0"
@@ -23,4 +24,3 @@ tower-http = { version ="0.3", features = ["full"]}
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]

View File

@@ -4,7 +4,7 @@ use query::catalog::memory;
use query::catalog::CatalogListRef;
use snafu::ResultExt;
use crate::error::{QuerySnafu, Result};
use crate::error::{NewCatalogSnafu, Result};
use crate::instance::{Instance, InstanceRef};
use crate::server::Services;
@@ -17,7 +17,7 @@ pub struct DataNode {
impl DataNode {
pub fn new() -> Result<DataNode> {
let catalog_list = memory::new_memory_catalog_list().context(QuerySnafu)?;
let catalog_list = memory::new_memory_catalog_list().context(NewCatalogSnafu)?;
let instance = Arc::new(Instance::new(catalog_list.clone()));
Ok(Self {

View File

@@ -1,15 +1,34 @@
use hyper::Error as HyperError;
use query::error::Error as QueryError;
use common_error::prelude::*;
use snafu::Snafu;
/// business error of datanode.
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Query error: {}", source))]
Query { source: QueryError },
#[snafu(display("Http error: {}", source))]
Hyper { source: HyperError },
#[snafu(display("Fail to execute sql, source: {}", source))]
ExecuteSql { source: query::error::Error },
#[snafu(display("Fail to create catalog list, source: {}", source))]
NewCatalog { source: query::error::Error },
// The error source of http error is clear even without backtrace now so
// a backtrace is not carried in this varaint.
#[snafu(display("Fail to start HTTP server, source: {}", source))]
StartHttp { source: hyper::Error },
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteSql { source } | Error::NewCatalog { source } => source.status_code(),
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. } => StatusCode::Internal,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
}

View File

@@ -4,7 +4,7 @@ use query::catalog::CatalogListRef;
use query::query_engine::{Output, QueryEngineFactory, QueryEngineRef};
use snafu::ResultExt;
use crate::error::{QuerySnafu, Result};
use crate::error::{ExecuteSqlSnafu, Result};
// An abstraction to read/write services.
pub struct Instance {
@@ -27,12 +27,15 @@ impl Instance {
}
pub async fn execute_sql(&self, sql: &str) -> Result<Output> {
let logical_plan = self.query_engine.sql_to_plan(sql).context(QuerySnafu)?;
let logical_plan = self
.query_engine
.sql_to_plan(sql)
.context(ExecuteSqlSnafu)?;
self.query_engine
.execute(&logical_plan)
.await
.context(QuerySnafu)
.context(ExecuteSqlSnafu)
}
}

View File

@@ -17,7 +17,7 @@ use snafu::ResultExt;
use tower::{timeout::TimeoutLayer, ServiceBuilder};
use tower_http::trace::TraceLayer;
use crate::error::{HyperSnafu, Result};
use crate::error::{Result, StartHttpSnafu};
use crate::server::InstanceRef;
/// Http server
@@ -106,7 +106,7 @@ impl HttpServer {
let server = axum::Server::bind(&addr).serve(app.into_make_service());
let graceful = server.with_graceful_shutdown(shutdown_signal());
graceful.await.context(HyperSnafu)?;
graceful.await.context(StartHttpSnafu)?;
Ok(())
}

View File

@@ -1,5 +1,4 @@
use datafusion::error::DataFusionError;
use snafu::Snafu;
common_error::define_opaque_error!(Error);