Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2025-12-22 22:20:02 +00:00
feat: Add common-error crate and implement opaque error type.
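The commit message itself is terse, so here is a condensed sketch of the pattern the diff below introduces, written as it might appear in a crate inside this workspace that depends on common-error and snafu. The names InnerError, TableExistsSnafu and register_table are illustrative only, not part of the commit:

use common_error::ext::ErrorExt;
use snafu::{Backtrace, ErrorCompat, Snafu};

// The macro from src/common/error expands to an opaque, boxed `Error` newtype.
common_error::define_opaque_error!(Error);

// Concrete errors stay private to the crate and implement ErrorExt.
#[derive(Debug, Snafu)]
pub enum InnerError {
    #[snafu(display("The table {} already exists", name))]
    TableExists { name: String, backtrace: Backtrace },
}

impl ErrorExt for InnerError {
    fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
        ErrorCompat::backtrace(self)
    }
}

// The From impl is what lets `?` erase the concrete type at the crate boundary.
impl From<InnerError> for Error {
    fn from(err: InnerError) -> Self {
        Self::new(err)
    }
}

pub type Result<T> = std::result::Result<T, Error>;

fn register_table(name: &str, exists: bool) -> Result<()> {
    if exists {
        // `.fail()` yields Result<_, InnerError>; the trailing `?` converts it into the
        // opaque `Error`, mirroring the `.fail()?` change in the query crate below.
        return TableExistsSnafu { name }.fail()?;
    }
    Ok(())
}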

Cargo.lock (generated), 55 lines changed

@@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3

[[package]]
name = "addr2line"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b"
dependencies = [
 "gimli",
]

[[package]]
name = "adler"
version = "1.0.2"
@@ -223,6 +232,21 @@ dependencies = [
 "syn",
]

[[package]]
name = "backtrace"
version = "0.3.65"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11a17d453482a265fd5f8479f2a3f405566e6ca627837aaddb85af8b1ab8ef61"
dependencies = [
 "addr2line",
 "cc",
 "cfg-if",
 "libc",
 "miniz_oxide",
 "object",
 "rustc-demangle",
]

[[package]]
name = "base64"
version = "0.13.0"
@@ -433,6 +457,13 @@ dependencies = [
name = "common-base"
version = "0.1.0"

[[package]]
name = "common-error"
version = "0.1.0"
dependencies = [
 "snafu",
]

[[package]]
name = "common-query"
version = "0.1.0"
@@ -818,6 +849,12 @@ dependencies = [
 "wasi 0.10.2+wasi-snapshot-preview1",
]

[[package]]
name = "gimli"
version = "0.26.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4"

[[package]]
name = "h2"
version = "0.3.13"
@@ -1300,6 +1337,15 @@ dependencies = [
 "libc",
]

[[package]]
name = "object"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40bec70ba014595f99f7aa110b84331ffe1ee9aece7fe6f387cc7e3ecda4d456"
dependencies = [
 "memchr",
]

[[package]]
name = "object-store"
version = "0.1.0"
@@ -1486,6 +1532,7 @@ version = "0.1.0"
dependencies = [
 "arrow2",
 "async-trait",
 "common-error",
 "common-recordbatch",
 "datafusion",
 "datatypes",
@@ -1578,6 +1625,12 @@ dependencies = [
 "winapi",
]

[[package]]
name = "rustc-demangle"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342"

[[package]]
name = "rustversion"
version = "1.0.6"
@@ -1684,6 +1737,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eba135d2c579aa65364522eb78590cdf703176ef71ad4c32b00f58f7afb2df5"
dependencies = [
 "backtrace",
 "doc-comment",
 "snafu-derive",
]
@@ -1815,6 +1869,7 @@ dependencies = [
 "arrow2",
 "async-trait",
 "chrono",
 "common-error",
 "common-query",
 "common-recordbatch",
 "datafusion",

@@ -1,6 +1,7 @@
[workspace]
members = [
    "src/common/base",
    "src/common/error",
    "src/common/query",
    "src/common/recordbatch",
    "src/cmd",

src/common/error/Cargo.toml (new file, 9 lines)

@@ -0,0 +1,9 @@
[package]
name = "common-error"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
snafu = { version = "0.7", features = ["backtraces"] }

src/common/error/src/ext.rs (new file, 123 lines)

@@ -0,0 +1,123 @@
use crate::status_code::StatusCode;

/// Extension to [`Error`](std::error::Error) in std.
pub trait ErrorExt: std::error::Error {
    /// Returns the [StatusCode] of this error.
    fn status_code(&self) -> StatusCode {
        StatusCode::Unknown
    }

    /// Get the reference to the backtrace of this error.
    // Add the `_opt` suffix to avoid confusion with the similar method in `std::error::Error`.
    fn backtrace_opt(&self) -> Option<&snafu::Backtrace>;
}

/// A helper macro to define an opaque boxed error based on errors that implement the [ErrorExt] trait.
#[macro_export]
macro_rules! define_opaque_error {
    ($Error:ident) => {
        /// An error that behaves like `Box<dyn Error>`.
        ///
        /// This error is defined as a new type instead of using `Box<dyn Error>` directly
        /// so we can implement more methods and traits for it.
        pub struct $Error {
            inner: Box<dyn $crate::ext::ErrorExt + Send + Sync>,
        }

        impl $Error {
            /// Create a new error.
            pub fn new<E: $crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
                Self {
                    inner: Box::new(err),
                }
            }
        }

        impl std::fmt::Debug for $Error {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                let debug_format = $crate::format::DebugFormat::new(&*self.inner);
                debug_format.fmt(f)
            }
        }

        impl std::fmt::Display for $Error {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", self.inner)
            }
        }

        impl std::error::Error for $Error {
            fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
                self.inner.source()
            }
        }

        impl $crate::ext::ErrorExt for $Error {
            fn status_code(&self) -> $crate::status_code::StatusCode {
                self.inner.status_code()
            }

            fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
                self.inner.backtrace_opt()
            }
        }
    };
}

#[cfg(test)]
mod tests {
    use std::error::Error as StdError;

    use snafu::{prelude::*, Backtrace};

    use super::*;

    define_opaque_error!(Error);

    #[derive(Debug, Snafu)]
    enum InnerError {
        #[snafu(display("This is a leaf error, val: {}", val))]
        Leaf { val: i32, backtrace: Backtrace },

        #[snafu(display("This is an internal error"))]
        Internal {
            source: std::io::Error,
            backtrace: Backtrace,
        },
    }

    impl ErrorExt for InnerError {
        fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
            snafu::ErrorCompat::backtrace(self)
        }
    }

    impl From<InnerError> for Error {
        fn from(err: InnerError) -> Self {
            Self::new(err)
        }
    }

    fn throw_leaf() -> std::result::Result<(), InnerError> {
        LeafSnafu { val: 10 }.fail()
    }

    fn throw_io() -> std::result::Result<(), std::io::Error> {
        Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"))
    }

    fn throw_internal() -> std::result::Result<(), InnerError> {
        throw_io().context(InternalSnafu)
    }

    #[test]
    fn test_opaque_error() {
        let leaf = throw_leaf().err().unwrap();
        assert!(leaf.backtrace_opt().is_some());
        assert!(leaf.source().is_none());

        let internal = throw_internal().err().unwrap();
        assert!(internal.backtrace_opt().is_some());
        assert!(internal.source().is_some());
    }
}

src/common/error/src/format.rs (new file, 95 lines)

@@ -0,0 +1,95 @@
use std::fmt;

use crate::ext::ErrorExt;

/// Pretty debug format for errors, which also prints the source and backtrace.
pub struct DebugFormat<'a, E: ?Sized>(&'a E);

impl<'a, E: ErrorExt + ?Sized> DebugFormat<'a, E> {
    /// Create a new format struct from `err`.
    pub fn new(err: &'a E) -> Self {
        Self(err)
    }
}

impl<'a, E: ErrorExt + ?Sized> fmt::Debug for DebugFormat<'a, E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}.", self.0)?;
        if let Some(source) = self.0.source() {
            // Use the debug format for the source error to get more verbose info.
            write!(f, " Caused by: {:?}", source)?;
        }
        if let Some(backtrace) = self.0.backtrace_opt() {
            // Add a newline to separate causes and backtrace.
            write!(f, "\nBacktrace:\n{}", backtrace)?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use snafu::{prelude::*, Backtrace, GenerateImplicitData};

    use super::*;

    #[derive(Debug, Snafu)]
    #[snafu(display("This is a leaf error"))]
    struct Leaf;

    impl ErrorExt for Leaf {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            None
        }
    }

    #[derive(Debug, Snafu)]
    #[snafu(display("This is a leaf with backtrace"))]
    struct LeafWithBacktrace {
        backtrace: Backtrace,
    }

    impl ErrorExt for LeafWithBacktrace {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            Some(&self.backtrace)
        }
    }

    #[derive(Debug, Snafu)]
    #[snafu(display("Internal error"))]
    struct Internal {
        #[snafu(source)]
        source: Leaf,
        backtrace: Backtrace,
    }

    impl ErrorExt for Internal {
        fn backtrace_opt(&self) -> Option<&Backtrace> {
            Some(&self.backtrace)
        }
    }

    #[test]
    fn test_debug_format() {
        let err = Leaf;

        let msg = format!("{:?}", DebugFormat::new(&err));
        assert_eq!("This is a leaf error.", msg);

        let err = LeafWithBacktrace {
            backtrace: Backtrace::generate(),
        };

        let msg = format!("{:?}", DebugFormat::new(&err));
        assert!(msg.starts_with("This is a leaf with backtrace.\nBacktrace:\n"));

        let err = Internal {
            source: Leaf,
            backtrace: Backtrace::generate(),
        };

        let msg = format!("{:?}", DebugFormat::new(&err));
        assert!(msg.contains("Internal error. Caused by: Leaf\nBacktrace:\n"));
    }
}
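For orientation: per the assertions in the tests above, an opaque error printed with `{:?}` (whose Debug impl delegates to DebugFormat) comes out roughly as follows, with the real backtrace frames in place of the placeholder:

Internal error. Caused by: Leaf
Backtrace:
<backtrace frames>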

src/common/error/src/lib.rs (new file, 3 lines)

@@ -0,0 +1,3 @@
pub mod ext;
pub mod format;
pub mod status_code;

src/common/error/src/status_code.rs (new file, 6 lines)

@@ -0,0 +1,6 @@
/// Common status code for public API.
#[derive(Debug, Clone, Copy)]
pub enum StatusCode {
    /// Unknown status.
    Unknown,
}

@@ -15,7 +15,7 @@ datatypes = {path ="../../datatypes" }
futures = "0.3"
paste = "1.0"
serde = "1.0"
snafu = "0.7"
snafu = { version = "0.7", features = ["backtraces"] }

[dev-dependencies.arrow]
package = "arrow2"
@@ -24,3 +24,4 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc

[dev-dependencies]
tokio = { version = "1.18", features = ["full"] }

@@ -1,10 +1,14 @@
use arrow::error::ArrowError;
use snafu::Snafu;
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    #[snafu(display("Arrow error: {}", source))]
    Arrow { source: ArrowError },
    Arrow {
        source: ArrowError,
        backtrace: Backtrace,
    },
}

pub type Result<T> = std::result::Result<T, Error>;

@@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] }
query = { path = "../query" }
serde = "1.0"
serde_json = "1.0"
snafu = "0.7"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = { version = "1.18", features = ["full"] }
tower = { version = "0.4", features = ["full"]}
@@ -23,3 +23,4 @@ tower-http = { version ="0.3", features = ["full"]}
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]

@@ -10,12 +10,13 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc

[dependencies]
async-trait = "0.1"
common-error = { path = "../common/error" }
common-recordbatch = {path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datatypes = {path = "../datatypes" }
futures = "0.3"
futures-util = "0.3.21"
snafu = "0.7.0"
snafu = { version = "0.7", features = ["backtraces"] }
table = { path = "../table" }
tokio = "1.0"
sql = { path = "../sql" }

@@ -11,7 +11,8 @@ use crate::catalog::{
    CatalogList, CatalogListRef, CatalogProvider, CatalogProviderRef, DEFAULT_CATALOG_NAME,
    DEFAULT_SCHEMA_NAME,
};
use crate::error::{ExecutionSnafu, Result};
use crate::error::Result;
use crate::query_engine::datafusion::error;

/// Simple in-memory list of catalogs
#[derive(Default)]
@@ -127,10 +128,11 @@ impl SchemaProvider for MemorySchemaProvider {

    fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
        if self.table_exist(name.as_str()) {
            return ExecutionSnafu {
            // FIXME(yingwen): Define another error.
            return error::ExecutionSnafu {
                message: format!("The table {} already exists", name),
            }
            .fail();
            .fail()?;
        }
        let mut tables = self.tables.write().unwrap();
        Ok(tables.insert(name, table))

@@ -1,28 +1,6 @@
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use snafu::Snafu;
use sql::errors::ParserError;

/// business error of query engine
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    #[snafu(display("Datafusion query engine error: {}", source))]
    Datafusion { source: DataFusionError },
    #[snafu(display("PhysicalPlan downcast_ref failed"))]
    PhysicalPlanDowncast,
    #[snafu(display("RecordBatch error: {}", source))]
    RecordBatch { source: RecordBatchError },
    #[snafu(display("Execution error: {}", message))]
    Execution { message: String },
    #[snafu(display("Cannot parse SQL: {}, source: {}", sql, source))]
    ParseSql { sql: String, source: ParserError },
    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
    Planner {
        sql: String,
        source: DataFusionError,
    },
}
common_error::define_opaque_error!(Error);

pub type Result<T> = std::result::Result<T, Error>;

@@ -13,8 +13,9 @@ use table::table::adapter::DfTableProviderAdapter;

use crate::{
    catalog::{CatalogListRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME},
    error::{PlannerSnafu, Result},
    error::Result,
    plan::LogicalPlan,
    query_engine::datafusion::error,
};

pub trait Planner: Send + Sync {
@@ -39,7 +40,8 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
        let result = self
            .sql_to_rel
            .query_to_plan(query.inner)
            .context(PlannerSnafu { sql })?;
            // FIXME(yingwen): Move DfPlanner to datafusion mod.
            .context(error::PlannerSnafu { sql })?;

        Ok(LogicalPlan::DfPlan(result))
    }

@@ -1,5 +1,5 @@
mod context;
mod datafusion;
pub mod datafusion;
mod state;

use std::sync::Arc;

@@ -1,4 +1,5 @@
mod adapter;
pub mod error;

use std::sync::Arc;

@@ -9,7 +10,7 @@ use sql::{dialect::GenericDialect, parser::ParserContext};
use super::{context::QueryContext, state::QueryEngineState};
use crate::{
    catalog::CatalogListRef,
    error::{self, ParseSqlSnafu, Result},
    error::Result,
    executor::QueryExecutor,
    logical_optimizer::LogicalOptimizer,
    physical_optimizer::PhysicalOptimizer,
@@ -17,7 +18,7 @@ use crate::{
    plan::{LogicalPlan, PhysicalPlan},
    planner::{DfContextProviderAdapter, DfPlanner, Planner},
    query_engine::datafusion::adapter::PhysicalPlanAdapter,
    query_engine::{Output, QueryEngine},
    Output, QueryEngine,
};

pub(crate) struct DatafusionQueryEngine {
@@ -42,9 +43,7 @@ impl QueryEngine for DatafusionQueryEngine {
        let context_provider = DfContextProviderAdapter::new(self.state.catalog_list());
        let planner = DfPlanner::new(&context_provider);
        let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
            .with_context(|_| ParseSqlSnafu {
                sql: sql.to_string(),
            })?;
            .context(error::ParseSqlSnafu)?;
        // TODO(dennis): supports multi statement in one sql?
        assert!(1 == statement.len());
        planner.statement_to_plan(statement.remove(0))
@@ -70,11 +69,13 @@ impl LogicalOptimizer for DatafusionQueryEngine {
    ) -> Result<LogicalPlan> {
        match plan {
            LogicalPlan::DfPlan(df_plan) => {
                let optimized_plan = self
                    .state
                    .df_context()
                    .optimize(df_plan)
                    .context(error::DatafusionSnafu)?;
                let optimized_plan =
                    self.state
                        .df_context()
                        .optimize(df_plan)
                        .context(error::DatafusionSnafu {
                            msg: "Fail to optimize logical plan",
                        })?;

                Ok(LogicalPlan::DfPlan(optimized_plan))
            }
@@ -96,7 +97,9 @@ impl PhysicalPlanner for DatafusionQueryEngine {
            .df_context()
            .create_physical_plan(df_plan)
            .await
            .context(error::DatafusionSnafu)?;
            .context(error::DatafusionSnafu {
                msg: "Fail to create physical plan",
            })?;

        Ok(Arc::new(PhysicalPlanAdapter::new(
            Arc::new(physical_plan.schema().into()),
@@ -126,7 +129,9 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
        for optimizer in optimizers {
            new_plan = optimizer
                .optimize(new_plan, config)
                .context(error::DatafusionSnafu)?;
                .context(error::DatafusionSnafu {
                    msg: "Fail to optimize physical plan",
                })?;
        }
        Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan)))
    }

@@ -16,9 +16,10 @@ use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use table::table::adapter::{DfRecordBatchStreamAdapter, RecordBatchStreamAdapter};

use crate::error::{self, Result};
use crate::error::Result;
use crate::executor::Runtime;
use crate::plan::{Partitioning, PhysicalPlan};
use crate::query_engine::datafusion::error;

/// Datafusion ExecutionPlan -> greptime PhysicalPlan
pub struct PhysicalPlanAdapter {
@@ -74,7 +75,9 @@ impl PhysicalPlan for PhysicalPlanAdapter {
        let plan = self
            .plan
            .with_new_children(df_children)
            .context(error::DatafusionSnafu)?;
            .context(error::DatafusionSnafu {
                msg: "Fail to add children to plan",
            })?;
        Ok(Arc::new(PhysicalPlanAdapter::new(
            self.schema.clone(),
            plan,
@@ -86,11 +89,13 @@ impl PhysicalPlan for PhysicalPlanAdapter {
        runtime: &Runtime,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let df_stream = self
            .plan
            .execute(partition, runtime.into())
            .await
            .context(error::DatafusionSnafu)?;
        let df_stream =
            self.plan
                .execute(partition, runtime.into())
                .await
                .context(error::DatafusionSnafu {
                    msg: "Fail to execute physical plan",
                })?;

        Ok(Box::pin(RecordBatchStreamAdapter::new(df_stream)))
    }

src/query/src/query_engine/datafusion/error.rs (new file, 46 lines)

@@ -0,0 +1,46 @@
use common_error::ext::ErrorExt;
use datafusion::error::DataFusionError;
use snafu::{Backtrace, ErrorCompat, Snafu};

use crate::error::Error;

/// Inner error of datafusion based query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
    #[snafu(display("{}: {}", msg, source))]
    Datafusion {
        msg: &'static str,
        source: DataFusionError,
        backtrace: Backtrace,
    },

    #[snafu(display("PhysicalPlan downcast failed"))]
    PhysicalPlanDowncast { backtrace: Backtrace },

    // The sql error already contains the SQL.
    #[snafu(display("Cannot parse SQL, source: {}", source))]
    ParseSql { source: sql::errors::ParserError },

    #[snafu(display("Cannot plan SQL: {}, source: {}", sql, source))]
    Planner {
        sql: String,
        source: DataFusionError,
        backtrace: Backtrace,
    },

    #[snafu(display("Execution error: {}", message))]
    Execution { message: String },
}

impl ErrorExt for InnerError {
    fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
        ErrorCompat::backtrace(self)
    }
}

impl From<InnerError> for Error {
    fn from(err: InnerError) -> Self {
        Self::new(err)
    }
}

@@ -16,12 +16,10 @@ use table::{
    Table,
};

use crate::catalog::{
    schema::SchemaProvider, CatalogListRef, CatalogProvider, DEFAULT_CATALOG_NAME,
    DEFAULT_SCHEMA_NAME,
};
use crate::error::{self, Result};
use crate::catalog::{self, schema::SchemaProvider, CatalogListRef, CatalogProvider};
use crate::error::Result;
use crate::executor::Runtime;
use crate::query_engine::datafusion::error;

/// Query engine global state
#[derive(Clone)]
@@ -39,8 +37,10 @@ impl fmt::Debug for QueryEngineState {

impl QueryEngineState {
    pub(crate) fn new(catalog_list: CatalogListRef) -> Self {
        let config = ExecutionConfig::new()
            .with_default_catalog_and_schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME);
        let config = ExecutionConfig::new().with_default_catalog_and_schema(
            catalog::DEFAULT_CATALOG_NAME,
            catalog::DEFAULT_SCHEMA_NAME,
        );
        let df_context = ExecutionContext::with_config(config);

        df_context.state.lock().catalog_list = Arc::new(DfCatalogListAdapter {
@@ -193,18 +193,16 @@ impl DfSchemaProvider for DfSchemaProviderAdapter {
        table: Arc<dyn DfTableProvider>,
    ) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        let table = Arc::new(TableAdapter::new(table, self.runtime.clone()));
        match self.schema_provider.register_table(name, table) {
            Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
            Ok(None) => Ok(None),
            Err(e) => Err(e.into()),
        match self.schema_provider.register_table(name, table)? {
            Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
            None => Ok(None),
        }
    }

    fn deregister_table(&self, name: &str) -> DataFusionResult<Option<Arc<dyn DfTableProvider>>> {
        match self.schema_provider.deregister_table(name) {
            Ok(Some(p)) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
            Ok(None) => Ok(None),
            Err(e) => Err(e.into()),
        match self.schema_provider.deregister_table(name)? {
            Some(p) => Ok(Some(Arc::new(DfTableProviderAdapter::new(p)))),
            None => Ok(None),
        }
    }

@@ -244,7 +242,9 @@ impl SchemaProvider for SchemaProviderAdapter {
        Ok(self
            .df_schema_provider
            .register_table(name, table_provider)
            .context(error::DatafusionSnafu)?
            .context(error::DatafusionSnafu {
                msg: "Fail to register table to datafusion",
            })?
            .map(|table| (Arc::new(TableAdapter::new(table, self.runtime.clone())) as _)))
    }

@@ -252,7 +252,9 @@ impl SchemaProvider for SchemaProviderAdapter {
        Ok(self
            .df_schema_provider
            .deregister_table(name)
            .context(error::DatafusionSnafu)?
            .context(error::DatafusionSnafu {
                msg: "Fail to deregister table from datafusion",
            })?
            .map(|table| Arc::new(TableAdapter::new(table, self.runtime.clone())) as _))
    }

@@ -6,5 +6,5 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
snafu = "0.7.0"
snafu = { version = "0.7", features = ["backtraces"] }
sqlparser = "0.15.0"

@@ -2,14 +2,20 @@ use snafu::prelude::*;
use sqlparser::parser::ParserError as SpParserError;

/// SQL parser errors.
// Now the error in the parser does not contain a backtrace, to avoid generating a backtrace
// every time the parser parses an invalid SQL.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum ParserError {
    #[snafu(display("SQL statement is not supported: {sql}, keyword: {keyword}"))]
    #[snafu(display("SQL statement is not supported: {}, keyword: {}", sql, keyword))]
    Unsupported { sql: String, keyword: String },

    #[snafu(display(
        "Unexpected token while parsing SQL statement: {sql}, expected: {expected}, found: {actual}, source: {source}"
        "Unexpected token while parsing SQL statement: {}, expected: {}, found: {}, source: {}",
        sql,
        expected,
        actual,
        source
    ))]
    Unexpected {
        sql: String,
@@ -18,34 +24,30 @@ pub enum ParserError {
        source: SpParserError,
    },

    #[snafu(display("SQL syntax error: {msg}"))]
    SyntaxError { msg: String },

    #[snafu(display("Unknown inner parser error, sql: {sql}, source: {source}"))]
    InnerError { sql: String, source: SpParserError },
    // Syntax error from sql parser.
    #[snafu(display("Syntax error, sql: {}, source: {}", sql, source))]
    SpSyntax { sql: String, source: SpParserError },
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use snafu::ResultExt;
    use super::*;

    #[test]
    pub fn test_error_conversion() {
        pub fn raise_error() -> Result<(), sqlparser::parser::ParserError> {
            Err(sqlparser::parser::ParserError::ParserError(
                "parser error".to_string(),
            ))
        pub fn raise_error() -> Result<(), SpParserError> {
            Err(SpParserError::ParserError("parser error".to_string()))
        }

        assert_matches!(
            raise_error().context(crate::errors::InnerSnafu {
            raise_error().context(SpSyntaxSnafu {
                sql: "".to_string(),
            }),
            Err(super::ParserError::InnerError {
            Err(ParserError::SpSyntax {
                sql: _,
                source: sqlparser::parser::ParserError::ParserError { .. }
                source: SpParserError::ParserError { .. }
            })
        )
    }

@@ -14,7 +14,7 @@ impl<'a> ParserContext<'a> {
        let spstatement = self
            .parser
            .parse_insert()
            .context(errors::InnerSnafu { sql: self.sql })?;
            .context(errors::SpSyntaxSnafu { sql: self.sql })?;

        match spstatement {
            SpStatement::Insert { .. } => {

@@ -12,7 +12,7 @@ impl<'a> ParserContext<'a> {
        let spquery = self
            .parser
            .parse_query()
            .context(errors::InnerSnafu { sql: self.sql })?;
            .context(errors::SpSyntaxSnafu { sql: self.sql })?;

        Ok(Statement::Query(Box::new(Query::try_from(spquery)?)))
    }

@@ -11,6 +11,7 @@ features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
common-error = {path = "../common/error" }
common-query = {path = "../common/query" }
common-recordbatch = {path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
@@ -18,4 +19,4 @@ datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , b
datatypes = { path = "../datatypes" }
futures = "0.3"
serde = "1.0.136"
snafu = "0.7.0"
snafu = { version = "0.7", features = ["backtraces"] }

@@ -1,18 +1,45 @@
use common_error::ext::ErrorExt;
use datafusion::error::DataFusionError;
use snafu::Snafu;
use snafu::{Backtrace, ErrorCompat, Snafu};

common_error::define_opaque_error!(Error);

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    #[snafu(display("Datafusion error: {}", source))]
    Datafusion { source: DataFusionError },
    #[snafu(display("Not expected to run ExecutionPlan more than once."))]
    ExecuteRepeatedly,
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for DataFusionError {
    fn from(e: Error) -> DataFusionError {
    fn from(e: Error) -> Self {
        Self::External(Box::new(e))
    }
}

/// Default error implementation of table.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
    #[snafu(display("Datafusion error: {}", source))]
    Datafusion {
        source: DataFusionError,
        backtrace: Backtrace,
    },

    #[snafu(display("Not expected to run ExecutionPlan more than once"))]
    ExecuteRepeatedly { backtrace: Backtrace },
}

impl ErrorExt for InnerError {
    fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
        ErrorCompat::backtrace(self)
    }
}

impl From<InnerError> for Error {
    fn from(err: InnerError) -> Self {
        Self::new(err)
    }
}

impl From<InnerError> for DataFusionError {
    fn from(e: InnerError) -> DataFusionError {
        DataFusionError::External(Box::new(e))
    }
}
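A note on the two From impls above: converting InnerError into the opaque Error serves the crate's own Result type, while converting InnerError into DataFusionError (wrapped as External) lets the same inner errors cross back into DataFusion APIs. A minimal sketch of that second path, assuming a function inside the table crate (module path and names illustrative only):

use datafusion::error::Result as DfResult;

use crate::error;

fn take_stream_once(already_taken: bool) -> DfResult<()> {
    if already_taken {
        // `.fail()` yields Result<_, InnerError>; the trailing `?` converts it into
        // DataFusionError::External through the From<InnerError> for DataFusionError
        // impl above, which is why the adapter below can drop its explicit map_err.
        return error::ExecuteRepeatedlySnafu.fail()?;
    }
    Ok(())
}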

@@ -16,7 +16,7 @@ use datafusion::datasource::{
    datasource::TableProviderFilterPushDown as DfTableProviderFilterPushDown, TableProvider,
    TableType as DfTableType,
};
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::error::Result as DfResult;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr as DfExpr;
use datafusion::physical_plan::{
@@ -90,9 +90,7 @@ impl ExecutionPlan for ExecutionPlanAdapter {
            let stream = mem::replace(&mut *stream, None);
            Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream.unwrap())))
        } else {
            error::ExecuteRepeatedlySnafu
                .fail()
                .map_err(|e| DataFusionError::External(Box::new(e)))
            error::ExecuteRepeatedlySnafu.fail()?
        }
    }

@@ -139,28 +137,23 @@ impl TableProvider for DfTableProviderAdapter {
    ) -> DfResult<Arc<dyn ExecutionPlan>> {
        let filters: Vec<Expr> = filters.iter().map(Clone::clone).map(Expr::new).collect();

        match self.table.scan(projection, &filters, limit).await {
            Ok(stream) => Ok(Arc::new(ExecutionPlanAdapter {
                schema: stream.schema(),
                stream: Mutex::new(Some(stream)),
            })),
            Err(e) => Err(e.into()),
        }
        let stream = self.table.scan(projection, &filters, limit).await?;
        Ok(Arc::new(ExecutionPlanAdapter {
            schema: stream.schema(),
            stream: Mutex::new(Some(stream)),
        }))
    }

    fn supports_filter_pushdown(&self, filter: &DfExpr) -> DfResult<DfTableProviderFilterPushDown> {
        match self
        let p = self
            .table
            .supports_filter_pushdown(&Expr::new(filter.clone()))
        {
            Ok(p) => match p {
                TableProviderFilterPushDown::Unsupported => {
                    Ok(DfTableProviderFilterPushDown::Unsupported)
                }
                TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
                TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
            },
            Err(e) => Err(e.into()),
            .supports_filter_pushdown(&Expr::new(filter.clone()))?;
        match p {
            TableProviderFilterPushDown::Unsupported => {
                Ok(DfTableProviderFilterPushDown::Unsupported)
            }
            TableProviderFilterPushDown::Inexact => Ok(DfTableProviderFilterPushDown::Inexact),
            TableProviderFilterPushDown::Exact => Ok(DfTableProviderFilterPushDown::Exact),
        }
    }
}