diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index fa828b9f33..16b2ee0878 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -163,6 +163,9 @@ pub enum Error { source: datatypes::error::Error, }, + #[snafu(display("Failure during SchemaProvider operation, source: {}", source))] + SchemaProviderOperation { source: BoxedError }, + #[snafu(display("Failed to execute system catalog table scan, source: {}", source))] SystemCatalogTableScanExec { #[snafu(backtrace)] @@ -240,7 +243,9 @@ impl ErrorExt for Error { Error::SystemCatalogTableScanExec { source } => source.status_code(), Error::InvalidTableSchema { source, .. } => source.status_code(), Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected, - Error::Internal { source, .. } => source.status_code(), + Error::Internal { source, .. } | Error::SchemaProviderOperation { source } => { + source.status_code() + } Error::Unimplemented { .. } => StatusCode::Unsupported, } @@ -263,7 +268,6 @@ impl From for DataFusionError { #[cfg(test)] mod tests { - use common_error::mock::MockError; use snafu::GenerateImplicitData; use super::*; @@ -284,22 +288,6 @@ mod tests { InvalidKeySnafu { key: None }.build().status_code() ); - assert_eq!( - StatusCode::StorageUnavailable, - Error::OpenSystemCatalog { - source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) - } - .status_code() - ); - - assert_eq!( - StatusCode::StorageUnavailable, - Error::CreateSystemCatalog { - source: table::error::Error::new(MockError::new(StatusCode::StorageUnavailable)) - } - .status_code() - ); - assert_eq!( StatusCode::StorageUnavailable, Error::SystemCatalog { diff --git a/src/common/error/src/ext.rs b/src/common/error/src/ext.rs index aca7f9e821..573411e535 100644 --- a/src/common/error/src/ext.rs +++ b/src/common/error/src/ext.rs @@ -33,72 +33,60 @@ pub trait ErrorExt: std::error::Error { fn as_any(&self) -> &dyn Any; } -/// A helper macro to define a opaque boxed error based on errors that implement [ErrorExt] trait. -#[macro_export] -macro_rules! define_opaque_error { - ($Error:ident) => { - /// An error behaves like `Box`. - /// - /// Define this error as a new type instead of using `Box` directly so we can implement - /// more methods or traits for it. - pub struct $Error { - inner: Box, - } - - impl $Error { - pub fn new(err: E) -> Self { - Self { - inner: Box::new(err), - } - } - } - - impl std::fmt::Debug for $Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // Use the pretty debug format of inner error for opaque error. - let debug_format = $crate::format::DebugFormat::new(&*self.inner); - debug_format.fmt(f) - } - } - - impl std::fmt::Display for $Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.inner) - } - } - - impl std::error::Error for $Error { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - self.inner.source() - } - } - - impl $crate::ext::ErrorExt for $Error { - fn status_code(&self) -> $crate::status_code::StatusCode { - self.inner.status_code() - } - - fn backtrace_opt(&self) -> Option<&$crate::snafu::Backtrace> { - self.inner.backtrace_opt() - } - - fn as_any(&self) -> &dyn std::any::Any { - self.inner.as_any() - } - } - - // Implement ErrorCompat for this opaque error so the backtrace is also available - // via `ErrorCompat::backtrace()`. - impl $crate::snafu::ErrorCompat for $Error { - fn backtrace(&self) -> Option<&$crate::snafu::Backtrace> { - self.inner.backtrace_opt() - } - } - }; +/// An opaque boxed error based on errors that implement [ErrorExt] trait. +pub struct BoxedError { + inner: Box, } -// Define a general boxed error. -define_opaque_error!(BoxedError); +impl BoxedError { + pub fn new(err: E) -> Self { + Self { + inner: Box::new(err), + } + } +} + +impl std::fmt::Debug for BoxedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Use the pretty debug format of inner error for opaque error. + let debug_format = crate::format::DebugFormat::new(&*self.inner); + debug_format.fmt(f) + } +} + +impl std::fmt::Display for BoxedError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl std::error::Error for BoxedError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.inner.source() + } +} + +impl crate::ext::ErrorExt for BoxedError { + fn status_code(&self) -> crate::status_code::StatusCode { + self.inner.status_code() + } + + fn backtrace_opt(&self) -> Option<&crate::snafu::Backtrace> { + self.inner.backtrace_opt() + } + + fn as_any(&self) -> &dyn std::any::Any { + self.inner.as_any() + } +} + +// Implement ErrorCompat for this opaque error so the backtrace is also available +// via `ErrorCompat::backtrace()`. +impl crate::snafu::ErrorCompat for BoxedError { + fn backtrace(&self) -> Option<&crate::snafu::Backtrace> { + self.inner.backtrace_opt() + } +} #[cfg(test)] mod tests { diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 72c70f085d..011e6de10f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -412,9 +412,10 @@ mod tests { use super::*; fn throw_query_error() -> std::result::Result<(), query::error::Error> { - Err(query::error::Error::new(MockError::with_backtrace( - StatusCode::Internal, - ))) + query::error::CatalogNotFoundSnafu { + catalog: String::new(), + } + .fail() } fn throw_catalog_error() -> catalog::error::Result<()> { @@ -428,6 +429,11 @@ mod tests { assert_eq!(StatusCode::Internal, err.status_code()); } + fn assert_invalid_argument_error(err: &Error) { + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); + } + fn assert_tonic_internal_error(err: Error) { let s: tonic::Status = err.into(); assert_eq!(s.code(), tonic::Code::Internal); @@ -436,7 +442,7 @@ mod tests { #[test] fn test_error() { let err = throw_query_error().context(ExecuteSqlSnafu).err().unwrap(); - assert_internal_error(&err); + assert_invalid_argument_error(&err); assert_tonic_internal_error(err); let err = throw_catalog_error() .context(NewCatalogSnafu) diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index c50bd2395d..b8518141f3 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -22,6 +22,7 @@ use api::v1::AlterExpr; use async_trait::async_trait; use client::{Database, RpcOutput}; use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_error::prelude::BoxedError; use common_query::error::Result as QueryResult; use common_query::logical_plan::Expr; use common_query::physical_plan::{PhysicalPlan, PhysicalPlanRef}; @@ -40,7 +41,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use meta_client::rpc::{Peer, TableName}; use snafu::prelude::*; use store_api::storage::RegionNumber; -use table::error::Error as TableError; +use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfoRef}; use table::requests::InsertRequest; use table::Table; @@ -83,12 +84,23 @@ impl Table for DistTable { } async fn insert(&self, request: InsertRequest) -> table::Result { - let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?; + let partition_rule = self + .find_partition_rule() + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; let spliter = WriteSpliter::with_partition_rule(partition_rule); - let inserts = spliter.split(request).map_err(TableError::new)?; + let inserts = spliter + .split(request) + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; - let output = self.dist_insert(inserts).await.map_err(TableError::new)?; + let output = self + .dist_insert(inserts) + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; let RpcOutput::AffectedRows(rows) = output else { unreachable!() }; Ok(rows) } @@ -99,15 +111,21 @@ impl Table for DistTable { filters: &[Expr], limit: Option, ) -> table::Result { - let partition_rule = self.find_partition_rule().await.map_err(TableError::new)?; + let partition_rule = self + .find_partition_rule() + .await + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; let regions = self .find_regions(partition_rule, filters) - .map_err(TableError::new)?; + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; let datanodes = self .find_datanodes(regions) .await - .map_err(TableError::new)?; + .map_err(BoxedError::new) + .context(TableOperationSnafu)?; let mut partition_execs = Vec::with_capacity(datanodes.len()); for (datanode, _regions) in datanodes.iter() { diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 8f0b3a6f25..2db2fee811 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -31,7 +31,7 @@ use table::engine::{EngineContext, TableEngine, TableReference}; use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; use table::table::TableRef; -use table::{Result as TableResult, Table}; +use table::{error as table_error, Result as TableResult, Table}; use tokio::sync::Mutex; use crate::config::EngineConfig; @@ -90,7 +90,11 @@ impl TableEngine for MitoEngine { ctx: &EngineContext, request: CreateTableRequest, ) -> TableResult { - Ok(self.inner.create_table(ctx, request).await?) + self.inner + .create_table(ctx, request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) } async fn open_table( @@ -98,7 +102,11 @@ impl TableEngine for MitoEngine { ctx: &EngineContext, request: OpenTableRequest, ) -> TableResult> { - Ok(self.inner.open_table(ctx, request).await?) + self.inner + .open_table(ctx, request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) } async fn alter_table( @@ -106,7 +114,11 @@ impl TableEngine for MitoEngine { ctx: &EngineContext, req: AlterTableRequest, ) -> TableResult { - Ok(self.inner.alter_table(ctx, req).await?) + self.inner + .alter_table(ctx, req) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) } fn get_table( @@ -126,7 +138,11 @@ impl TableEngine for MitoEngine { _ctx: &EngineContext, request: DropTableRequest, ) -> TableResult { - Ok(self.inner.drop_table(request).await?) + self.inner + .drop_table(request) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu) } } @@ -437,14 +453,17 @@ impl MitoEngineInner { .open_region(&engine_ctx, ®ion_name, &opts) .await .map_err(BoxedError::new) - .context(error::OpenRegionSnafu { region_name })? + .context(table_error::TableOperationSnafu)? { None => return Ok(None), Some(region) => region, }; let table = Arc::new( - MitoTable::open(table_name, &table_dir, region, self.object_store.clone()).await?, + MitoTable::open(table_name, &table_dir, region, self.object_store.clone()) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?, ); self.tables diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index dc65d09507..9a93ef729e 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -27,13 +27,6 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("Failed to open region, region: {}, source: {}", region_name, source))] - OpenRegion { - region_name: String, - #[snafu(backtrace)] - source: BoxedError, - }, - #[snafu(display( "Failed to build table meta for table: {}, source: {}", table_name, @@ -179,12 +172,6 @@ pub enum Error { }, } -impl From for table::error::Error { - fn from(e: Error) -> Self { - table::error::Error::new(e) - } -} - pub type Result = std::result::Result; impl ErrorExt for Error { @@ -192,7 +179,7 @@ impl ErrorExt for Error { use Error::*; match self { - CreateRegion { source, .. } | OpenRegion { source, .. } => source.status_code(), + CreateRegion { source, .. } => source.status_code(), AlterTable { source, .. } => source.status_code(), @@ -243,12 +230,4 @@ mod tests { assert_eq!(StatusCode::InvalidArguments, err.status_code()); assert!(err.backtrace_opt().is_some()); } - - #[test] - pub fn test_opaque_error() { - let error = throw_create_table(StatusCode::InvalidSyntax).err().unwrap(); - let table_engine_error: table::error::Error = error.into(); - assert!(table_engine_error.backtrace_opt().is_some()); - assert_eq!(StatusCode::InvalidSyntax, table_engine_error.status_code()); - } } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 5489297228..12d4900294 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -36,7 +36,8 @@ use store_api::storage::{ AddColumn, AlterOperation, AlterRequest, ChunkReader, ReadContext, Region, RegionMeta, ScanRequest, SchemaRef, Snapshot, WriteContext, WriteRequest, }; -use table::error::{Error as TableError, Result as TableResult}; +use table::error as table_error; +use table::error::Result as TableResult; use table::metadata::{ FilterPushDownType, RawTableInfo, TableInfo, TableInfoRef, TableMeta, TableType, }; @@ -94,13 +95,17 @@ impl Table for MitoTable { columns_values ); - write_request.put(columns_values).map_err(TableError::new)?; + write_request + .put(columns_values) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; let _resp = self .region .write(&WriteContext::default(), write_request) .await - .map_err(TableError::new)?; + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; Ok(rows_num) } @@ -120,9 +125,16 @@ impl Table for MitoTable { _limit: Option, ) -> TableResult { let read_ctx = ReadContext::default(); - let snapshot = self.region.snapshot(&read_ctx).map_err(TableError::new)?; + let snapshot = self + .region + .snapshot(&read_ctx) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; - let projection = self.transform_projection(&self.region, projection.cloned())?; + let projection = self + .transform_projection(&self.region, projection.cloned()) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; let filters = filters.into(); let scan_request = ScanRequest { projection, @@ -132,7 +144,8 @@ impl Table for MitoTable { let mut reader = snapshot .scan(&read_ctx, scan_request) .await - .map_err(TableError::new)? + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)? .reader; let schema = reader.schema().clone(); @@ -158,7 +171,9 @@ impl Table for MitoTable { let mut new_meta = table_meta .builder_with_alter_kind(table_name, &req.alter_kind)? .build() - .context(error::BuildTableMetaSnafu { table_name })?; + .context(error::BuildTableMetaSnafu { table_name }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; let alter_op = create_alter_operation(table_name, &req.alter_kind, &mut new_meta)?; @@ -182,7 +197,9 @@ impl Table for MitoTable { .await .context(UpdateTableManifestSnafu { table_name: &self.table_info().name, - })?; + }) + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; // TODO(yingwen): Error handling. Maybe the region need to provide a method to // validate the request first. @@ -199,7 +216,11 @@ impl Table for MitoTable { table_name, alter_req, ); - region.alter(alter_req).await.map_err(TableError::new)?; + region + .alter(alter_req) + .await + .map_err(BoxedError::new) + .context(table_error::TableOperationSnafu)?; // Update in memory metadata of the table. self.set_table_info(new_info); diff --git a/src/promql/src/error.rs b/src/promql/src/error.rs index 5d98d29c7a..0ec503fa4c 100644 --- a/src/promql/src/error.rs +++ b/src/promql/src/error.rs @@ -16,18 +16,16 @@ use std::any::Any; use common_error::prelude::*; -common_error::define_opaque_error!(Error); - #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum InnerError { +pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String, backtrace: Backtrace }, } -impl ErrorExt for InnerError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { - use InnerError::*; + use Error::*; match self { UnsupportedExpr { .. } => StatusCode::InvalidArguments, } @@ -41,10 +39,4 @@ impl ErrorExt for InnerError { } } -impl From for Error { - fn from(e: InnerError) -> Error { - Error::new(e) - } -} - pub type Result = std::result::Result; diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 932cbe5149..bbe735f18c 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -21,6 +21,7 @@ mod planner; use std::sync::Arc; use catalog::CatalogListRef; +use common_error::prelude::BoxedError; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::udf::create_udf; use common_function::scalars::FunctionRef; @@ -33,14 +34,14 @@ use common_telemetry::timer; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::ExecutionPlan; use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{IntoError, OptionExt, ResultExt}; use sql::dialect::GenericDialect; use sql::parser::ParserContext; use sql::statements::statement::Statement; pub use crate::datafusion::catalog_adapter::DfCatalogListAdapter; use crate::datafusion::planner::{DfContextProviderAdapter, DfPlanner}; -use crate::error::Result; +use crate::error::{QueryExecutionSnafu, Result}; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; use crate::physical_optimizer::PhysicalOptimizer; @@ -71,9 +72,16 @@ impl QueryEngine for DatafusionQueryEngine { fn sql_to_statement(&self, sql: &str) -> Result { let mut statement = ParserContext::create_with_dialect(sql, &GenericDialect {}) - .context(error::ParseSqlSnafu)?; - ensure!(1 == statement.len(), error::MultipleStatementsSnafu { sql }); - Ok(statement.remove(0)) + .context(error::ParseSqlSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + if statement.len() != 1 { + Err(QueryExecutionSnafu {}.into_error(BoxedError::new( + error::MultipleStatementsSnafu { sql }.build(), + ))) + } else { + Ok(statement.remove(0)) + } } fn statement_to_plan( @@ -138,12 +146,14 @@ impl LogicalOptimizer for DatafusionQueryEngine { let _timer = timer!(metric::METRIC_OPTIMIZE_LOGICAL_ELAPSED); match plan { LogicalPlan::DfPlan(df_plan) => { - let optimized_plan = - self.state - .optimize(df_plan) - .context(error::DatafusionSnafu { - msg: "Fail to optimize logical plan", - })?; + let optimized_plan = self + .state + .optimize(df_plan) + .context(error::DatafusionSnafu { + msg: "Fail to optimize logical plan", + }) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; Ok(LogicalPlan::DfPlan(optimized_plan)) } @@ -161,18 +171,24 @@ impl PhysicalPlanner for DatafusionQueryEngine { let _timer = timer!(metric::METRIC_CREATE_PHYSICAL_ELAPSED); match logical_plan { LogicalPlan::DfPlan(df_plan) => { - let physical_plan = self.state.create_physical_plan(df_plan).await.context( - error::DatafusionSnafu { + let physical_plan = self + .state + .create_physical_plan(df_plan) + .await + .context(error::DatafusionSnafu { msg: "Fail to create physical plan", - }, - )?; + }) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; Ok(Arc::new(PhysicalPlanAdapter::new( Arc::new( physical_plan .schema() .try_into() - .context(error::ConvertSchemaSnafu)?, + .context(error::ConvertSchemaSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?, ), physical_plan, ))) @@ -192,15 +208,19 @@ impl PhysicalOptimizer for DatafusionQueryEngine { let new_plan = plan .as_any() .downcast_ref::() - .context(error::PhysicalPlanDowncastSnafu)? + .context(error::PhysicalPlanDowncastSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)? .df_plan(); - let new_plan = - self.state - .optimize_physical_plan(new_plan) - .context(error::DatafusionSnafu { - msg: "Fail to optimize physical plan", - })?; + let new_plan = self + .state + .optimize_physical_plan(new_plan) + .context(error::DatafusionSnafu { + msg: "Fail to optimize physical plan", + }) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; Ok(Arc::new(PhysicalPlanAdapter::new(plan.schema(), new_plan))) } } @@ -217,20 +237,26 @@ impl QueryExecutor for DatafusionQueryEngine { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), 1 => Ok(plan .execute(0, ctx.state().task_ctx()) - .context(error::ExecutePhysicalPlanSnafu)?), + .context(error::ExecutePhysicalPlanSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu))?, _ => { // merge into a single partition let plan = CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone()))); // CoalescePartitionsExec must produce a single partition assert_eq!(1, plan.output_partitioning().partition_count()); - let df_stream = - plan.execute(0, ctx.state().task_ctx()) - .context(error::DatafusionSnafu { - msg: "Failed to execute DataFusion merge exec", - })?; + let df_stream = plan + .execute(0, ctx.state().task_ctx()) + .context(error::DatafusionSnafu { + msg: "Failed to execute DataFusion merge exec", + }) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; let stream = RecordBatchStreamAdapter::try_new(df_stream) - .context(error::ConvertDfRecordBatchStreamSnafu)?; + .context(error::ConvertDfRecordBatchStreamSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; Ok(Box::pin(stream)) } } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index 9957bca99a..ce16df8c65 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -17,10 +17,11 @@ use std::any::Any; use std::sync::Arc; -use catalog::error::Error; +use catalog::error::{self as catalog_error, Error}; use catalog::{ CatalogListRef, CatalogProvider, CatalogProviderRef, SchemaProvider, SchemaProviderRef, }; +use common_error::prelude::BoxedError; use datafusion::catalog::catalog::{ CatalogList as DfCatalogList, CatalogProvider as DfCatalogProvider, }; @@ -224,7 +225,9 @@ impl SchemaProvider for SchemaProviderAdapter { .register_table(name, table_provider) .context(error::DatafusionSnafu { msg: "Fail to register table to datafusion", - })? + }) + .map_err(BoxedError::new) + .context(catalog_error::SchemaProviderOperationSnafu)? .map(|_| table)) } @@ -233,9 +236,14 @@ impl SchemaProvider for SchemaProviderAdapter { .deregister_table(name) .context(error::DatafusionSnafu { msg: "Fail to deregister table from datafusion", - })? + }) + .map_err(BoxedError::new) + .context(catalog_error::SchemaProviderOperationSnafu)? .map(|table| { - let adapter = TableAdapter::new(table).context(error::TableSchemaMismatchSnafu)?; + let adapter = TableAdapter::new(table) + .context(error::TableSchemaMismatchSnafu) + .map_err(BoxedError::new) + .context(catalog_error::SchemaProviderOperationSnafu)?; Ok(Arc::new(adapter) as _) }) .transpose() diff --git a/src/query/src/datafusion/error.rs b/src/query/src/datafusion/error.rs index 526973d228..9b7cdf8ced 100644 --- a/src/query/src/datafusion/error.rs +++ b/src/query/src/datafusion/error.rs @@ -17,8 +17,6 @@ use std::any::Any; use common_error::prelude::*; use datafusion::error::DataFusionError; -use crate::error::Error; - /// Inner error of datafusion based query engine. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -106,23 +104,8 @@ impl ErrorExt for InnerError { } } -impl From for catalog::error::Error { - fn from(e: InnerError) -> Self { - catalog::error::Error::RegisterTable { - source: BoxedError::new(e), - } - } -} - -impl From for Error { - fn from(err: InnerError) -> Self { - Self::new(err) - } -} - #[cfg(test)] mod tests { - use common_error::mock::MockError; use super::*; @@ -168,15 +151,4 @@ mod tests { let sql_err = raise_sql_error().err().unwrap(); assert_eq!(sql_err.status_code(), err.status_code()); } - - #[test] - pub fn test_from_inner_error() { - let err = InnerError::TableSchemaMismatch { - source: table::error::Error::new(MockError::new(StatusCode::Unexpected)), - }; - - let catalog_error = catalog::error::Error::from(err); - // [InnerError] to [catalog::error::Error] is considered as Internal error - assert_eq!(StatusCode::Internal, catalog_error.status_code()); - } } diff --git a/src/query/src/datafusion/planner.rs b/src/query/src/datafusion/planner.rs index 6ca3223ab8..95fc3016b4 100644 --- a/src/query/src/datafusion/planner.rs +++ b/src/query/src/datafusion/planner.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_error::prelude::BoxedError; use common_query::logical_plan::create_aggregate_function; use datafusion::catalog::TableReference; use datafusion::error::Result as DfResult; @@ -30,7 +31,7 @@ use sql::statements::query::Query; use sql::statements::statement::Statement; use crate::datafusion::error; -use crate::error::Result; +use crate::error::{QueryPlanSnafu, Result}; use crate::plan::LogicalPlan; use crate::planner::Planner; use crate::query_engine::QueryEngineState; @@ -53,7 +54,9 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { let result = self .sql_to_rel .query_to_plan(query.inner, &mut PlannerContext::default()) - .context(error::PlanSqlSnafu { sql })?; + .context(error::PlanSqlSnafu { sql }) + .map_err(BoxedError::new) + .context(QueryPlanSnafu)?; Ok(LogicalPlan::DfPlan(result)) } @@ -65,7 +68,9 @@ impl<'a, S: ContextProvider + Send + Sync> DfPlanner<'a, S> { .sql_statement_to_plan(explain.inner.clone()) .context(error::PlanSqlSnafu { sql: explain.to_string(), - })?; + }) + .map_err(BoxedError::new) + .context(QueryPlanSnafu)?; Ok(LogicalPlan::DfPlan(result)) } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index 4ca16f10f8..5ae1f01522 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -18,11 +18,9 @@ use common_error::prelude::*; use datafusion::error::DataFusionError; use snafu::{Backtrace, ErrorCompat, Snafu}; -common_error::define_opaque_error!(Error); - #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum InnerError { +pub enum Error { #[snafu(display("Unsupported expr type: {}", name))] UnsupportedExpr { name: String, backtrace: Backtrace }, @@ -58,11 +56,17 @@ pub enum InnerError { #[snafu(backtrace)] source: common_recordbatch::error::Error, }, + + #[snafu(display("Failure during query execution, source: {}", source))] + QueryExecution { source: BoxedError }, + + #[snafu(display("Failure during query planning, source: {}", source))] + QueryPlan { source: BoxedError }, } -impl ErrorExt for InnerError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { - use InnerError::*; + use Error::*; match self { UnsupportedExpr { .. } @@ -72,6 +76,7 @@ impl ErrorExt for InnerError { Catalog { source } => source.status_code(), VectorComputation { source } => source.status_code(), CreateRecordBatch { source } => source.status_code(), + QueryExecution { source } | QueryPlan { source } => source.status_code(), } } @@ -84,12 +89,6 @@ impl ErrorExt for InnerError { } } -impl From for Error { - fn from(e: InnerError) -> Error { - Error::new(e) - } -} - pub type Result = std::result::Result; impl From for DataFusionError { @@ -97,9 +96,3 @@ impl From for DataFusionError { DataFusionError::External(Box::new(e)) } } - -impl From for Error { - fn from(e: catalog::error::Error) -> Self { - Error::new(e) - } -} diff --git a/src/query/src/sql.rs b/src/query/src/sql.rs index 7a68b52b69..e8e95f1328 100644 --- a/src/query/src/sql.rs +++ b/src/query/src/sql.rs @@ -292,9 +292,9 @@ mod test { let stmt = DescribeTable::new("unknown".to_string(), schema_name, table_name.to_string()); let err = describe_table(stmt, catalog_manager).err().unwrap(); - let err = err.as_any().downcast_ref::().unwrap(); + let err = err.as_any().downcast_ref::().unwrap(); - if let error::InnerError::CatalogNotFound { catalog, .. } = err { + if let error::Error::CatalogNotFound { catalog, .. } = err { assert_eq!(catalog, "unknown"); } else { panic!("describe table returned incorrect error"); @@ -320,9 +320,9 @@ mod test { let stmt = DescribeTable::new(catalog_name, "unknown".to_string(), table_name.to_string()); let err = describe_table(stmt, catalog_manager).err().unwrap(); - let err = err.as_any().downcast_ref::().unwrap(); + let err = err.as_any().downcast_ref::().unwrap(); - if let error::InnerError::SchemaNotFound { schema, .. } = err { + if let error::Error::SchemaNotFound { schema, .. } = err { assert_eq!(schema, "unknown"); } else { panic!("describe table returned incorrect error"); @@ -348,9 +348,9 @@ mod test { let stmt = DescribeTable::new(catalog_name, schema_name, "unknown".to_string()); let err = describe_table(stmt, catalog_manager).err().unwrap(); - let err = err.as_any().downcast_ref::().unwrap(); + let err = err.as_any().downcast_ref::().unwrap(); - if let error::InnerError::TableNotFound { table, .. } = err { + if let error::Error::TableNotFound { table, .. } = err { assert_eq!(table, "unknown"); } else { panic!("describe table returned incorrect error"); diff --git a/src/query/tests/query_engine_test.rs b/src/query/tests/query_engine_test.rs index 010bee1176..86c25431dd 100644 --- a/src/query/tests/query_engine_test.rs +++ b/src/query/tests/query_engine_test.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use catalog::local::{MemoryCatalogProvider, MemorySchemaProvider}; use catalog::{CatalogList, CatalogProvider, SchemaProvider}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_error::prelude::BoxedError; use common_query::prelude::{create_udf, make_scalar_function, Volatility}; use common_query::Output; use common_recordbatch::{util, RecordBatch}; @@ -32,10 +33,11 @@ use datafusion_expr::logical_plan::builder::LogicalPlanBuilder; use datatypes::prelude::*; use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::UInt32Vector; -use query::error::Result; +use query::error::{QueryExecutionSnafu, Result}; use query::plan::LogicalPlan; use query::query_engine::QueryEngineFactory; use session::context::QueryContext; +use snafu::ResultExt; use table::table::adapter::DfTableProviderAdapter; use table::table::numbers::NumbersTable; use table::test_util::MemTable; @@ -45,7 +47,9 @@ use crate::pow::pow; #[tokio::test] async fn test_datafusion_query_engine() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = catalog::local::new_memory_catalog_list()?; + let catalog_list = catalog::local::new_memory_catalog_list() + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; let factory = QueryEngineFactory::new(catalog_list); let engine = factory.query_engine(); @@ -105,7 +109,9 @@ async fn test_datafusion_query_engine() -> Result<()> { #[tokio::test] async fn test_udf() -> Result<()> { common_telemetry::init_default_ut_logging(); - let catalog_list = catalog::local::new_memory_catalog_list()?; + let catalog_list = catalog::local::new_memory_catalog_list() + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; let default_schema = Arc::new(MemorySchemaProvider::new()); default_schema diff --git a/src/script/src/python/error.rs b/src/script/src/python/error.rs index 05ecdc2ac2..70e3b89ab3 100644 --- a/src/script/src/python/error.rs +++ b/src/script/src/python/error.rs @@ -227,20 +227,21 @@ pub fn get_error_reason_loc(err: &Error) -> (String, Option) { #[cfg(test)] mod tests { - use common_error::mock::MockError; use snafu::ResultExt; use super::*; fn throw_query_error() -> query::error::Result<()> { - let mock_err = MockError::with_backtrace(StatusCode::TableColumnNotFound); - Err(query::error::Error::new(mock_err)) + query::error::TableNotFoundSnafu { + table: String::new(), + } + .fail() } #[test] fn test_error() { let err = throw_query_error().context(DatabaseQuerySnafu).unwrap_err(); - assert_eq!(StatusCode::TableColumnNotFound, err.status_code()); + assert_eq!(StatusCode::InvalidArguments, err.status_code()); assert!(err.backtrace_opt().is_some()); } } diff --git a/src/table/src/error.rs b/src/table/src/error.rs index 0b1b424e86..32e31a5250 100644 --- a/src/table/src/error.rs +++ b/src/table/src/error.rs @@ -19,20 +19,12 @@ use common_recordbatch::error::Error as RecordBatchError; use datafusion::error::DataFusionError; use datatypes::arrow::error::ArrowError; -common_error::define_opaque_error!(Error); - pub type Result = std::result::Result; -impl From for DataFusionError { - fn from(e: Error) -> Self { - Self::External(Box::new(e)) - } -} - /// Default error implementation of table. #[derive(Debug, Snafu)] #[snafu(visibility(pub))] -pub enum InnerError { +pub enum Error { #[snafu(display("Datafusion error: {}", source))] Datafusion { source: DataFusionError, @@ -107,22 +99,26 @@ pub enum InnerError { column_name: String, backtrace: Backtrace, }, + + #[snafu(display("Failed to operate table, source: {}", source))] + TableOperation { source: BoxedError }, } -impl ErrorExt for InnerError { +impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { - InnerError::Datafusion { .. } - | InnerError::PollStream { .. } - | InnerError::SchemaConversion { .. } - | InnerError::TableProjection { .. } => StatusCode::EngineExecuteQuery, - InnerError::RemoveColumnInIndex { .. } | InnerError::BuildColumnDescriptor { .. } => { + Error::Datafusion { .. } + | Error::PollStream { .. } + | Error::SchemaConversion { .. } + | Error::TableProjection { .. } => StatusCode::EngineExecuteQuery, + Error::RemoveColumnInIndex { .. } | Error::BuildColumnDescriptor { .. } => { StatusCode::InvalidArguments } - InnerError::TablesRecordBatch { .. } => StatusCode::Unexpected, - InnerError::ColumnExists { .. } => StatusCode::TableColumnExists, - InnerError::SchemaBuild { source, .. } => source.status_code(), - InnerError::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, + Error::TablesRecordBatch { .. } => StatusCode::Unexpected, + Error::ColumnExists { .. } => StatusCode::TableColumnExists, + Error::SchemaBuild { source, .. } => source.status_code(), + Error::TableOperation { source } => source.status_code(), + Error::ColumnNotExists { .. } => StatusCode::TableColumnNotFound, } } @@ -135,20 +131,14 @@ impl ErrorExt for InnerError { } } -impl From for Error { - fn from(err: InnerError) -> Self { - Self::new(err) - } -} - -impl From for DataFusionError { - fn from(e: InnerError) -> DataFusionError { +impl From for DataFusionError { + fn from(e: Error) -> DataFusionError { DataFusionError::External(Box::new(e)) } } -impl From for RecordBatchError { - fn from(e: InnerError) -> RecordBatchError { +impl From for RecordBatchError { + fn from(e: Error) -> RecordBatchError { RecordBatchError::External { source: BoxedError::new(e), } @@ -163,7 +153,7 @@ mod tests { Err(DataFusionError::NotImplemented("table test".to_string())).context(DatafusionSnafu)? } - fn throw_column_exists_inner() -> std::result::Result<(), InnerError> { + fn throw_column_exists_inner() -> std::result::Result<(), Error> { ColumnExistsSnafu { column_name: "col", table_name: "test", @@ -172,7 +162,7 @@ mod tests { } fn throw_missing_column() -> Result<()> { - Ok(throw_column_exists_inner()?) + throw_column_exists_inner() } fn throw_arrow() -> Result<()> {