refactor: remove macro define_opaque_error (#812)

* refactor: remove macro define_opaque_error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl BoxedError

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove open-region error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-01-03 15:50:27 +08:00
committed by GitHub
parent 334fd26bc5
commit 0566f812d3
17 changed files with 280 additions and 268 deletions

View File

@@ -163,6 +163,9 @@ pub enum Error {
source: datatypes::error::Error,
},
#[snafu(display("Failure during SchemaProvider operation, source: {}", source))]
SchemaProviderOperation { source: BoxedError },
#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
#[snafu(backtrace)]
@@ -240,7 +243,9 @@ impl ErrorExt for Error {
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } => source.status_code(),
Error::Internal { source, .. } | Error::SchemaProviderOperation { source } => {
source.status_code()
}
Error::Unimplemented { .. } => StatusCode::Unsupported,
}
@@ -263,7 +268,6 @@ impl From<Error> for DataFusionError {
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use snafu::GenerateImplicitData;
use super::*;
@@ -284,22 +288,6 @@ mod tests {
InvalidKeySnafu { key: None }.build().status_code()
);
assert_eq!(
StatusCode::StorageUnavailable,
Error::OpenSystemCatalog {
source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable))
}
.status_code()
);
assert_eq!(
StatusCode::StorageUnavailable,
Error::CreateSystemCatalog {
source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable))
}
.status_code()
);
assert_eq!(
StatusCode::StorageUnavailable,
Error::SystemCatalog {

View File

@@ -33,72 +33,60 @@ pub trait ErrorExt: std::error::Error {
fn as_any(&self) -> &dyn Any;
}
/// A helper macro to define a opaque boxed error based on errors that implement [ErrorExt] trait.
#[macro_export]
macro_rules! define_opaque_error {
($Error:ident) => {
/// An error behaves like `Box<dyn Error>`.
///
/// Define this error as a new type instead of using `Box<dyn Error>` directly so we can implement
/// more methods or traits for it.
pub struct $Error {
inner: Box<dyn $crate::ext::ErrorExt + Send + Sync>,
}
impl $Error {
pub fn new<E: $crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
Self {
inner: Box::new(err),
}
}
}
impl std::fmt::Debug for $Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Use the pretty debug format of inner error for opaque error.
let debug_format = $crate::format::DebugFormat::new(&*self.inner);
debug_format.fmt(f)
}
}
impl std::fmt::Display for $Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
}
}
impl std::error::Error for $Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.inner.source()
}
}
impl $crate::ext::ErrorExt for $Error {
fn status_code(&self) -> $crate::status_code::StatusCode {
self.inner.status_code()
}
fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> {
self.inner.backtrace_opt()
}
fn as_any(&self) -> &dyn std::any::Any {
self.inner.as_any()
}
}
// Implement ErrorCompat for this opaque error so the backtrace is also available
// via `ErrorCompat::backtrace()`.
impl $crate::snafu::ErrorCompat for $Error {
fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> {
self.inner.backtrace_opt()
}
}
};
/// An opaque boxed error based on errors that implement [ErrorExt] trait.
pub struct BoxedError {
inner: Box<dyn crate::ext::ErrorExt + Send + Sync>,
}
// Define a general boxed error.
define_opaque_error!(BoxedError);
impl BoxedError {
pub fn new<E: crate::ext::ErrorExt + Send + Sync + 'static>(err: E) -> Self {
Self {
inner: Box::new(err),
}
}
}
impl std::fmt::Debug for BoxedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Use the pretty debug format of inner error for opaque error.
let debug_format = crate::format::DebugFormat::new(&*self.inner);
debug_format.fmt(f)
}
}
impl std::fmt::Display for BoxedError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.inner)
}
}
impl std::error::Error for BoxedError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.inner.source()
}
}
impl crate::ext::ErrorExt for BoxedError {
fn status_code(&self) -> crate::status_code::StatusCode {
self.inner.status_code()
}
fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace> {
self.inner.backtrace_opt()
}
fn as_any(&self) -> &dyn std::any::Any {
self.inner.as_any()
}
}
// Implement ErrorCompat for this opaque error so the backtrace is also available
// via `ErrorCompat::backtrace()`.
impl crate::snafu::ErrorCompat for BoxedError {
fn backtrace(&self) -> Option<&crate::snafu::Backtrace> {
self.inner.backtrace_opt()
}
}
#[cfg(test)]
mod tests {

View File

@@ -412,9 +412,10 @@ mod tests {
use super::*;
fn throw_query_error() -> std::result::Result<(), query::error::Error> {
Err(query::error::Error::new(MockError::with_backtrace(
StatusCode::Internal,
)))
query::error::CatalogNotFoundSnafu {
catalog: String::new(),
}
.fail()
}
fn throw_catalog_error() -> catalog::error::Result<()> {
@@ -428,6 +429,11 @@ mod tests {
assert_eq!(StatusCode::Internal, err.status_code());
}
fn assert_invalid_argument_error(err: &Error) {
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
fn assert_tonic_internal_error(err: Error) {
let s: tonic::Status = err.into();
assert_eq!(s.code(), tonic::Code::Internal);
@@ -436,7 +442,7 @@ mod tests {
#[test]
fn test_error() {
let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap();
assert_internal_error(&err);
assert_invalid_argument_error(&err);
assert_tonic_internal_error(err);
let err = throw_catalog_error()
.context(NewCatalogSnafu)

View File

@@ -22,6 +22,7 @@ use api::v1::AlterExpr;
use async_trait::async_trait;
use client::{Database, RpcOutput};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_query::error::Result as QueryResult;
use common_query::logical_plan::Expr;
use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef};
@@ -40,7 +41,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use meta_client::rpc::{Peer, TableName};
use snafu::prelude::*;
use store_api::storage::RegionNumber;
use table::error::Error as TableError;
use table::error::TableOperationSnafu;
use table::metadata::{FilterPushDownType, TableInfoRef};
use table::requests::InsertRequest;
use table::Table;
@@ -83,12 +84,23 @@ impl Table for DistTable {
}
async fn insert(&self, request: InsertRequest) -> table::Result<usize> {
let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?;
let partition_rule = self
.find_partition_rule()
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let spliter = WriteSpliter::with_partition_rule(partition_rule);
let inserts = spliter.split(request).map_err(TableError::new)?;
let inserts = spliter
.split(request)
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let output = self.dist_insert(inserts).await.map_err(TableError::new)?;
let output = self
.dist_insert(inserts)
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let RpcOutput::AffectedRows(rows) = output else { unreachable!() };
Ok(rows)
}
@@ -99,15 +111,21 @@ impl Table for DistTable {
filters: &[Expr],
limit: Option<usize>,
) -> table::Result<PhysicalPlanRef> {
let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?;
let partition_rule = self
.find_partition_rule()
.await
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let regions = self
.find_regions(partition_rule, filters)
.map_err(TableError::new)?;
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let datanodes = self
.find_datanodes(regions)
.await
.map_err(TableError::new)?;
.map_err(BoxedError::new)
.context(TableOperationSnafu)?;
let mut partition_execs = Vec::with_capacity(datanodes.len());
for (datanode, _regions) in datanodes.iter() {

View File

@@ -31,7 +31,7 @@ use table::engine::{EngineContext, TableEngine, TableReference};
use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::table::TableRef;
use table::{Result as TableResult, Table};
use table::{error as table_error, Result as TableResult, Table};
use tokio::sync::Mutex;
use crate::config::EngineConfig;
@@ -90,7 +90,11 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
ctx: &EngineContext,
request: CreateTableRequest,
) -> TableResult<TableRef> {
Ok(self.inner.create_table(ctx, request).await?)
self.inner
.create_table(ctx, request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn open_table(
@@ -98,7 +102,11 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
ctx: &EngineContext,
request: OpenTableRequest,
) -> TableResult<Option<TableRef>> {
Ok(self.inner.open_table(ctx, request).await?)
self.inner
.open_table(ctx, request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
async fn alter_table(
@@ -106,7 +114,11 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
ctx: &EngineContext,
req: AlterTableRequest,
) -> TableResult<TableRef> {
Ok(self.inner.alter_table(ctx, req).await?)
self.inner
.alter_table(ctx, req)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
fn get_table(
@@ -126,7 +138,11 @@ impl<S: StorageEngine> TableEngine for MitoEngine<S> {
_ctx: &EngineContext,
request: DropTableRequest,
) -> TableResult<bool> {
Ok(self.inner.drop_table(request).await?)
self.inner
.drop_table(request)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)
}
}
@@ -437,14 +453,17 @@ impl<S: StorageEngine> MitoEngineInner<S> {
.open_region(&engine_ctx, &region_name, &opts)
.await
.map_err(BoxedError::new)
.context(error::OpenRegionSnafu { region_name })?
.context(table_error::TableOperationSnafu)?
{
None => return Ok(None),
Some(region) => region,
};
let table = Arc::new(
MitoTable::open(table_name, &table_dir, region, self.object_store.clone()).await?,
MitoTable::open(table_name, &table_dir, region, self.object_store.clone())
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?,
);
self.tables

View File

@@ -27,13 +27,6 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to open region, region: {}, source: {}", region_name, source))]
OpenRegion {
region_name: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display(
"Failed to build table meta for table: {}, source: {}",
table_name,
@@ -179,12 +172,6 @@ pub enum Error {
},
}
impl From<Error> for table::error::Error {
fn from(e: Error) -> Self {
table::error::Error::new(e)
}
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
@@ -192,7 +179,7 @@ impl ErrorExt for Error {
use Error::*;
match self {
CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(),
CreateRegion { source, .. } => source.status_code(),
AlterTable { source, .. } => source.status_code(),
@@ -243,12 +230,4 @@ mod tests {
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert!(err.backtrace_opt().is_some());
}
#[test]
pub fn test_opaque_error() {
let error = throw_create_table(StatusCode::InvalidSyntax).err().unwrap();
let table_engine_error: table::error::Error = error.into();
assert!(table_engine_error.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidSyntax, table_engine_error.status_code());
}
}

View File

@@ -36,7 +36,8 @@ use store_api::storage::{
AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta,
ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest,
};
use table::error::{Error as TableError, Result as TableResult};
use table::error as table_error;
use table::error::Result as TableResult;
use table::metadata::{
FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType,
};
@@ -94,13 +95,17 @@ impl<R: Region> Table for MitoTable<R> {
columns_values
);
write_request.put(columns_values).map_err(TableError::new)?;
write_request
.put(columns_values)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let _resp = self
.region
.write(&WriteContext::default(), write_request)
.await
.map_err(TableError::new)?;
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
Ok(rows_num)
}
@@ -120,9 +125,16 @@ impl<R: Region> Table for MitoTable<R> {
_limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
let read_ctx = ReadContext::default();
let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?;
let snapshot = self
.region
.snapshot(&read_ctx)
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let projection = self.transform_projection(&self.region, projection.cloned())?;
let projection = self
.transform_projection(&self.region, projection.cloned())
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let filters = filters.into();
let scan_request = ScanRequest {
projection,
@@ -132,7 +144,8 @@ impl<R: Region> Table for MitoTable<R> {
let mut reader = snapshot
.scan(&read_ctx, scan_request)
.await
.map_err(TableError::new)?
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?
.reader;
let schema = reader.schema().clone();
@@ -158,7 +171,9 @@ impl<R: Region> Table for MitoTable<R> {
let mut new_meta = table_meta
.builder_with_alter_kind(table_name, &req.alter_kind)?
.build()
.context(error::BuildTableMetaSnafu { table_name })?;
.context(error::BuildTableMetaSnafu { table_name })
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
let alter_op = create_alter_operation(table_name, &req.alter_kind, &mut new_meta)?;
@@ -182,7 +197,9 @@ impl<R: Region> Table for MitoTable<R> {
.await
.context(UpdateTableManifestSnafu {
table_name: &self.table_info().name,
})?;
})
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
// TODO(yingwen): Error handling. Maybe the region need to provide a method to
// validate the request first.
@@ -199,7 +216,11 @@ impl<R: Region> Table for MitoTable<R> {
table_name,
alter_req,
);
region.alter(alter_req).await.map_err(TableError::new)?;
region
.alter(alter_req)
.await
.map_err(BoxedError::new)
.context(table_error::TableOperationSnafu)?;
// Update in memory metadata of the table.
self.set_table_info(new_info);

View File

@@ -16,18 +16,16 @@ use std::any::Any;
use common_error::prelude::*;
common_error::define_opaque_error!(Error);
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, backtrace: Backtrace },
}
impl ErrorExt for InnerError {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use InnerError::*;
use Error::*;
match self {
UnsupportedExpr { .. } => StatusCode::InvalidArguments,
}
@@ -41,10 +39,4 @@ impl ErrorExt for InnerError {
}
}
impl From<InnerError> for Error {
fn from(e: InnerError) -> Error {
Error::new(e)
}
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -21,6 +21,7 @@ mod planner;
use std::sync::Arc;
use catalog::CatalogListRef;
use common_error::prelude::BoxedError;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::udf::create_udf;
use common_function::scalars::FunctionRef;
@@ -33,14 +34,14 @@ use common_telemetry::timer;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{IntoError, OptionExt, ResultExt};
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
use sql::statements::statement::Statement;
pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter;
use crate::datafusion::planner::{DfContextProviderAdapter, DfPlanner};
use crate::error::Result;
use crate::error::{QueryExecutionSnafu, Result};
use crate::executor::QueryExecutor;
use crate::logical_optimizer::LogicalOptimizer;
use crate::physical_optimizer::PhysicalOptimizer;
@@ -71,9 +72,16 @@ impl QueryEngine for DatafusionQueryEngine {
fn sql_to_statement(&self, sql: &str) -> Result<Statement> {
let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {})
.context(error::ParseSqlSnafu)?;
ensure!(1 == statement.len(), error::MultipleStatementsSnafu { sql });
Ok(statement.remove(0))
.context(error::ParseSqlSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
if statement.len() != 1 {
Err(QueryExecutionSnafu {}.into_error(BoxedError::new(
error::MultipleStatementsSnafu { sql }.build(),
)))
} else {
Ok(statement.remove(0))
}
}
fn statement_to_plan(
@@ -138,12 +146,14 @@ impl LogicalOptimizer for DatafusionQueryEngine {
let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED);
match plan {
LogicalPlan::DfPlan(df_plan) => {
let optimized_plan =
self.state
.optimize(df_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize logical plan",
})?;
let optimized_plan = self
.state
.optimize(df_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize logical plan",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(LogicalPlan::DfPlan(optimized_plan))
}
@@ -161,18 +171,24 @@ impl PhysicalPlanner for DatafusionQueryEngine {
let _timer = timer!(metric::METRIC_CREATE_PHYSICAL_ELAPSED);
match logical_plan {
LogicalPlan::DfPlan(df_plan) => {
let physical_plan = self.state.create_physical_plan(df_plan).await.context(
error::DatafusionSnafu {
let physical_plan = self
.state
.create_physical_plan(df_plan)
.await
.context(error::DatafusionSnafu {
msg: "Fail to create physical plan",
},
)?;
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(
physical_plan
.schema()
.try_into()
.context(error::ConvertSchemaSnafu)?,
.context(error::ConvertSchemaSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?,
),
physical_plan,
)))
@@ -192,15 +208,19 @@ impl PhysicalOptimizer for DatafusionQueryEngine {
let new_plan = plan
.as_any()
.downcast_ref::<PhysicalPlanAdapter>()
.context(error::PhysicalPlanDowncastSnafu)?
.context(error::PhysicalPlanDowncastSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?
.df_plan();
let new_plan =
self.state
.optimize_physical_plan(new_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize physical plan",
})?;
let new_plan = self
.state
.optimize_physical_plan(new_plan)
.context(error::DatafusionSnafu {
msg: "Fail to optimize physical plan",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan)))
}
}
@@ -217,20 +237,26 @@ impl QueryExecutor for DatafusionQueryEngine {
0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
1 => Ok(plan
.execute(0, ctx.state().task_ctx())
.context(error::ExecutePhysicalPlanSnafu)?),
.context(error::ExecutePhysicalPlanSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu))?,
_ => {
// merge into a single partition
let plan =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone())));
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream =
plan.execute(0, ctx.state().task_ctx())
.context(error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
})?;
let df_stream = plan
.execute(0, ctx.state().task_ctx())
.context(error::DatafusionSnafu {
msg: "Failed to execute DataFusion merge exec",
})
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let stream = RecordBatchStreamAdapter::try_new(df_stream)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
Ok(Box::pin(stream))
}
}

View File

@@ -17,10 +17,11 @@
use std::any::Any;
use std::sync::Arc;
use catalog::error::Error;
use catalog::error::{self as catalog_error, Error};
use catalog::{
CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef,
};
use common_error::prelude::BoxedError;
use datafusion::catalog::catalog::{
CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider,
};
@@ -224,7 +225,9 @@ impl SchemaProvider for SchemaProviderAdapter {
.register_table(name, table_provider)
.context(error::DatafusionSnafu {
msg: "Fail to register table to datafusion",
})?
})
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?
.map(|_| table))
}
@@ -233,9 +236,14 @@ impl SchemaProvider for SchemaProviderAdapter {
.deregister_table(name)
.context(error::DatafusionSnafu {
msg: "Fail to deregister table from datafusion",
})?
})
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?
.map(|table| {
let adapter = TableAdapter::new(table).context(error::TableSchemaMismatchSnafu)?;
let adapter = TableAdapter::new(table)
.context(error::TableSchemaMismatchSnafu)
.map_err(BoxedError::new)
.context(catalog_error::SchemaProviderOperationSnafu)?;
Ok(Arc::new(adapter) as _)
})
.transpose()

View File

@@ -17,8 +17,6 @@ use std::any::Any;
use common_error::prelude::*;
use datafusion::error::DataFusionError;
use crate::error::Error;
/// Inner error of datafusion based query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -106,23 +104,8 @@ impl ErrorExt for InnerError {
}
}
impl From<InnerError> for catalog::error::Error {
fn from(e: InnerError) -> Self {
catalog::error::Error::RegisterTable {
source: BoxedError::new(e),
}
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use super::*;
@@ -168,15 +151,4 @@ mod tests {
let sql_err = raise_sql_error().err().unwrap();
assert_eq!(sql_err.status_code(), err.status_code());
}
#[test]
pub fn test_from_inner_error() {
let err = InnerError::TableSchemaMismatch {
source: table::error::Error::new(MockError::new(StatusCode::Unexpected)),
};
let catalog_error = catalog::error::Error::from(err);
// [InnerError] to [catalog::error::Error] is considered as Internal error
assert_eq!(StatusCode::Internal, catalog_error.status_code());
}
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use common_error::prelude::BoxedError;
use common_query::logical_plan::create_aggregate_function;
use datafusion::catalog::TableReference;
use datafusion::error::Result as DfResult;
@@ -30,7 +31,7 @@ use sql::statements::query::Query;
use sql::statements::statement::Statement;
use crate::datafusion::error;
use crate::error::Result;
use crate::error::{QueryPlanSnafu, Result};
use crate::plan::LogicalPlan;
use crate::planner::Planner;
use crate::query_engine::QueryEngineState;
@@ -53,7 +54,9 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
let result = self
.sql_to_rel
.query_to_plan(query.inner, &mut PlannerContext::default())
.context(error::PlanSqlSnafu { sql })?;
.context(error::PlanSqlSnafu { sql })
.map_err(BoxedError::new)
.context(QueryPlanSnafu)?;
Ok(LogicalPlan::DfPlan(result))
}
@@ -65,7 +68,9 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> {
.sql_statement_to_plan(explain.inner.clone())
.context(error::PlanSqlSnafu {
sql: explain.to_string(),
})?;
})
.map_err(BoxedError::new)
.context(QueryPlanSnafu)?;
Ok(LogicalPlan::DfPlan(result))
}

View File

@@ -18,11 +18,9 @@ use common_error::prelude::*;
use datafusion::error::DataFusionError;
use snafu::{Backtrace, ErrorCompat, Snafu};
common_error::define_opaque_error!(Error);
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
pub enum Error {
#[snafu(display("Unsupported expr type: {}", name))]
UnsupportedExpr { name: String, backtrace: Backtrace },
@@ -58,11 +56,17 @@ pub enum InnerError {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failure during query execution, source: {}", source))]
QueryExecution { source: BoxedError },
#[snafu(display("Failure during query planning, source: {}", source))]
QueryPlan { source: BoxedError },
}
impl ErrorExt for InnerError {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use InnerError::*;
use Error::*;
match self {
UnsupportedExpr { .. }
@@ -72,6 +76,7 @@ impl ErrorExt for InnerError {
Catalog { source } => source.status_code(),
VectorComputation { source } => source.status_code(),
CreateRecordBatch { source } => source.status_code(),
QueryExecution { source } | QueryPlan { source } => source.status_code(),
}
}
@@ -84,12 +89,6 @@ impl ErrorExt for InnerError {
}
}
impl From<InnerError> for Error {
fn from(e: InnerError) -> Error {
Error::new(e)
}
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
@@ -97,9 +96,3 @@ impl From<Error> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}
impl From<catalog::error::Error> for Error {
fn from(e: catalog::error::Error) -> Self {
Error::new(e)
}
}

View File

@@ -292,9 +292,9 @@ mod test {
let stmt = DescribeTable::new("unknown".to_string(), schema_name, table_name.to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::InnerError>().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::InnerError::CatalogNotFound { catalog, .. } = err {
if let error::Error::CatalogNotFound { catalog, .. } = err {
assert_eq!(catalog, "unknown");
} else {
panic!("describe table returned incorrect error");
@@ -320,9 +320,9 @@ mod test {
let stmt = DescribeTable::new(catalog_name, "unknown".to_string(), table_name.to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::InnerError>().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::InnerError::SchemaNotFound { schema, .. } = err {
if let error::Error::SchemaNotFound { schema, .. } = err {
assert_eq!(schema, "unknown");
} else {
panic!("describe table returned incorrect error");
@@ -348,9 +348,9 @@ mod test {
let stmt = DescribeTable::new(catalog_name, schema_name, "unknown".to_string());
let err = describe_table(stmt, catalog_manager).err().unwrap();
let err = err.as_any().downcast_ref::<error::InnerError>().unwrap();
let err = err.as_any().downcast_ref::<error::Error>().unwrap();
if let error::InnerError::TableNotFound { table, .. } = err {
if let error::Error::TableNotFound { table, .. } = err {
assert_eq!(table, "unknown");
} else {
panic!("describe table returned incorrect error");

View File

@@ -24,6 +24,7 @@ use std::sync::Arc;
use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider};
use catalog::{CatalogList, CatalogProvider, SchemaProvider};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::BoxedError;
use common_query::prelude::{create_udf, make_scalar_function, Volatility};
use common_query::Output;
use common_recordbatch::{util, RecordBatch};
@@ -32,10 +33,11 @@ use datafusion_expr::logical_plan::builder::LogicalPlanBuilder;
use datatypes::prelude::*;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::UInt32Vector;
use query::error::Result;
use query::error::{QueryExecutionSnafu, Result};
use query::plan::LogicalPlan;
use query::query_engine::QueryEngineFactory;
use session::context::QueryContext;
use snafu::ResultExt;
use table::table::adapter::DfTableProviderAdapter;
use table::table::numbers::NumbersTable;
use table::test_util::MemTable;
@@ -45,7 +47,9 @@ use crate::pow::pow;
#[tokio::test]
async fn test_datafusion_query_engine() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog::local::new_memory_catalog_list()?;
let catalog_list = catalog::local::new_memory_catalog_list()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let factory = QueryEngineFactory::new(catalog_list);
let engine = factory.query_engine();
@@ -105,7 +109,9 @@ async fn test_datafusion_query_engine() -> Result<()> {
#[tokio::test]
async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog::local::new_memory_catalog_list()?;
let catalog_list = catalog::local::new_memory_catalog_list()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let default_schema = Arc::new(MemorySchemaProvider::new());
default_schema

View File

@@ -227,20 +227,21 @@ pub fn get_error_reason_loc(err: &Error) -> (String, Option<Location>) {
#[cfg(test)]
mod tests {
use common_error::mock::MockError;
use snafu::ResultExt;
use super::*;
fn throw_query_error() -> query::error::Result<()> {
let mock_err = MockError::with_backtrace(StatusCode::TableColumnNotFound);
Err(query::error::Error::new(mock_err))
query::error::TableNotFoundSnafu {
table: String::new(),
}
.fail()
}
#[test]
fn test_error() {
let err = throw_query_error().context(DatabaseQuerySnafu).unwrap_err();
assert_eq!(StatusCode::TableColumnNotFound, err.status_code());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert!(err.backtrace_opt().is_some());
}
}

View File

@@ -19,20 +19,12 @@ use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use datatypes::arrow::error::ArrowError;
common_error::define_opaque_error!(Error);
pub type Result<T> = std::result::Result<T, Error>;
impl From<Error> for DataFusionError {
fn from(e: Error) -> Self {
Self::External(Box::new(e))
}
}
/// Default error implementation of table.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum InnerError {
pub enum Error {
#[snafu(display("Datafusion error: {}", source))]
Datafusion {
source: DataFusionError,
@@ -107,22 +99,26 @@ pub enum InnerError {
column_name: String,
backtrace: Backtrace,
},
#[snafu(display("Failed to operate table, source: {}", source))]
TableOperation { source: BoxedError },
}
impl ErrorExt for InnerError {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
InnerError::Datafusion { .. }
| InnerError::PollStream { .. }
| InnerError::SchemaConversion { .. }
| InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery,
InnerError::RemoveColumnInIndex { .. } | InnerError::BuildColumnDescriptor { .. } => {
Error::Datafusion { .. }
| Error::PollStream { .. }
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => {
StatusCode::InvalidArguments
}
InnerError::TablesRecordBatch { .. } => StatusCode::Unexpected,
InnerError::ColumnExists { .. } => StatusCode::TableColumnExists,
InnerError::SchemaBuild { source, .. } => source.status_code(),
InnerError::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
Error::TablesRecordBatch { .. } => StatusCode::Unexpected,
Error::ColumnExists { .. } => StatusCode::TableColumnExists,
Error::SchemaBuild { source, .. } => source.status_code(),
Error::TableOperation { source } => source.status_code(),
Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound,
}
}
@@ -135,20 +131,14 @@ impl ErrorExt for InnerError {
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
impl From<InnerError> for DataFusionError {
fn from(e: InnerError) -> DataFusionError {
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}
impl From<InnerError> for RecordBatchError {
fn from(e: InnerError) -> RecordBatchError {
impl From<Error> for RecordBatchError {
fn from(e: Error) -> RecordBatchError {
RecordBatchError::External {
source: BoxedError::new(e),
}
@@ -163,7 +153,7 @@ mod tests {
Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)?
}
fn throw_column_exists_inner() -> std::result::Result<(), InnerError> {
fn throw_column_exists_inner() -> std::result::Result<(), Error> {
ColumnExistsSnafu {
column_name: "col",
table_name: "test",
@@ -172,7 +162,7 @@ mod tests {
}
fn throw_missing_column() -> Result<()> {
Ok(throw_column_exists_inner()?)
throw_column_exists_inner()
}
fn throw_arrow() -> Result<()> {