refactor: Refactor usage of BoxedError (#48)

* feat: Define a general boxed error

* refactor: common_function use Error in common_query

* feat: Add tests to define_opaque_error macro

* refactor: Refactor table and table engine error

* refactor: recordbatch remove arrow dev-dependency

* refactor: datanode crate use common_error::BoxedError

* chore: Fix clippy

* feat: Returning source status code when using BoxedError

* test: Fix opaque error test

* test: Add tests for table::Error & table_engine::Error

* test: Add test for RecordBatch::new()

* test: Remove generated tests from define_opaque_error

* chore: Address cr comment
This commit is contained in:
evenyag
2022-06-21 15:24:45 +08:00
committed by GitHub
parent 4071b0cff2
commit 6ec870625f
18 changed files with 268 additions and 191 deletions

View File

@@ -83,101 +83,64 @@ macro_rules! define_opaque_error {
};
}
// Define a general boxed error.
define_opaque_error!(BoxedError);
#[cfg(test)]
mod tests {
use std::error::Error as StdError;
use std::error::Error;
use snafu::ErrorCompat;
use super::*;
use crate::prelude::*;
use crate::format::DebugFormat;
use crate::mock::MockError;
define_opaque_error!(Error);
#[test]
fn test_opaque_error_without_backtrace() {
let err = BoxedError::new(MockError::new(StatusCode::Internal));
assert!(err.backtrace_opt().is_none());
assert_eq!(StatusCode::Internal, err.status_code());
assert!(err.as_any().downcast_ref::<MockError>().is_some());
assert!(err.source().is_none());
#[derive(Debug, Snafu)]
enum InnerError {
#[snafu(display("This is a leaf error, val: {}", val))]
Leaf { val: i32, backtrace: Backtrace },
#[snafu(display("This is an internal error"))]
Internal {
source: std::io::Error,
backtrace: Backtrace,
},
}
impl ErrorExt for InnerError {
fn status_code(&self) -> StatusCode {
StatusCode::Internal
}
fn backtrace_opt(&self) -> Option<&snafu::Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<InnerError> for Error {
fn from(err: InnerError) -> Self {
Self::new(err)
}
}
fn throw_leaf() -> std::result::Result<(), InnerError> {
LeafSnafu { val: 10 }.fail()
}
fn throw_io() -> std::result::Result<(), std::io::Error> {
Err(std::io::Error::new(std::io::ErrorKind::Other, "oh no!"))
}
fn throw_internal() -> std::result::Result<(), InnerError> {
throw_io().context(InternalSnafu)
assert!(ErrorCompat::backtrace(&err).is_none());
}
#[test]
fn test_inner_error() {
let leaf = throw_leaf().err().unwrap();
assert!(leaf.backtrace_opt().is_some());
assert!(leaf.source().is_none());
fn test_opaque_error_with_backtrace() {
let err = BoxedError::new(MockError::with_backtrace(StatusCode::Internal));
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::Internal, err.status_code());
assert!(err.as_any().downcast_ref::<MockError>().is_some());
assert!(err.source().is_none());
let internal = throw_internal().err().unwrap();
assert!(internal.backtrace_opt().is_some());
assert!(internal.source().is_some());
}
assert!(ErrorCompat::backtrace(&err).is_some());
#[test]
fn test_opaque_error() {
// Test leaf error.
let err: Error = throw_leaf().map_err(Into::into).err().unwrap();
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
assert!(ErrorCompat::backtrace(&err).is_some());
let msg = err.to_string();
msg.contains("Internal");
}
#[test]
fn test_opaque_error_with_source() {
let leaf_err = MockError::with_backtrace(StatusCode::Internal);
let internal_err = MockError::with_source(leaf_err);
let err = BoxedError::new(internal_err);
assert!(err.backtrace_opt().is_some());
assert_eq!("This is a leaf error, val: 10", err.to_string());
assert_eq!(StatusCode::Internal, err.status_code());
assert!(err.as_any().downcast_ref::<MockError>().is_some());
assert!(err.source().is_some());
err.as_any().downcast_ref::<InnerError>().unwrap();
// Test internal error.
let err: Error = throw_internal().map_err(Into::into).err().unwrap();
let msg = format!("{:?}", err);
assert!(msg.contains("\nBacktrace:\n"));
assert!(msg.contains("Caused by"));
let fmt_msg = format!("{:?}", DebugFormat::new(&err));
assert_eq!(msg, fmt_msg);
assert!(ErrorCompat::backtrace(&err).is_some());
assert!(err.backtrace_opt().is_some());
assert_eq!("This is an internal error", err.to_string());
assert_eq!(StatusCode::Internal, err.status_code());
err.as_any().downcast_ref::<InnerError>().unwrap();
}
}

View File

@@ -6,7 +6,7 @@ pub mod status_code;
pub mod prelude {
pub use snafu::{prelude::*, Backtrace, ErrorCompat};
pub use crate::ext::ErrorExt;
pub use crate::ext::{BoxedError, ErrorExt};
pub use crate::format::DebugFormat;
pub use crate::status_code::StatusCode;
}

View File

@@ -1,13 +1,8 @@
use std::any::Any;
use common_error::prelude::*;
use common_query::error::Error as QueryError;
pub use common_query::error::{Error, Result};
use datatypes::error::Error as DataTypeError;
use snafu::GenerateImplicitData;
common_error::define_opaque_error!(Error);
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -35,17 +30,10 @@ impl From<InnerError> for Error {
}
}
impl From<Error> for QueryError {
fn from(err: Error) -> Self {
QueryError::External {
msg: err.to_string(),
backtrace: Backtrace::generate(),
}
}
}
#[cfg(test)]
mod tests {
use snafu::GenerateImplicitData;
use super::*;
fn raise_datatype_error() -> std::result::Result<(), DataTypeError> {
@@ -57,13 +45,11 @@ mod tests {
#[test]
fn test_get_scalar_vector_error() {
let err = raise_datatype_error()
let err: Error = raise_datatype_error()
.context(GetScalarVectorSnafu)
.err()
.unwrap();
.unwrap()
.into();
assert!(err.backtrace_opt().is_some());
let query_error = QueryError::from(Error::from(err));
assert!(matches!(query_error, QueryError::External { .. }));
}
}

View File

@@ -41,13 +41,15 @@ pub fn create_udf(func: FunctionRef) -> ScalarUdf {
let result = func_cloned.eval(func_ctx, &args.context(FromScalarValueSnafu)?);
if len.is_some() {
result.map(ColumnarValue::Vector).map_err(|e| e.into())
let udf = if len.is_some() {
result.map(ColumnarValue::Vector)?
} else {
ScalarValue::try_from_array(&result?.to_arrow_array(), 0)
.map(ColumnarValue::Scalar)
.context(ExecuteFunctionSnafu)
}
.context(ExecuteFunctionSnafu)?
};
Ok(udf)
});
ScalarUdf::new(func.name(), &func.signature(), &return_type, &fun)

View File

@@ -5,38 +5,39 @@ use common_error::prelude::*;
use datafusion_common::DataFusionError;
use datatypes::error::Error as DataTypeError;
common_error::define_opaque_error!(Error);
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
pub enum InnerError {
#[snafu(display("Fail to execute function, source: {}", source))]
ExecuteFunction {
source: DataFusionError,
backtrace: Backtrace,
},
#[snafu(display("Fail to cast scalar value into vector: {}", source))]
FromScalarValue {
#[snafu(backtrace)]
source: DataTypeError,
},
#[snafu(display("Fail to cast arrow array into vector: {:?}, {}", data_type, source))]
IntoVector {
#[snafu(backtrace)]
source: DataTypeError,
data_type: ArrowDatatype,
},
#[snafu(display("External error: {}, {}", msg, backtrace))]
External { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
impl ErrorExt for InnerError {
fn status_code(&self) -> StatusCode {
match self {
Error::ExecuteFunction { .. } => StatusCode::EngineExecuteQuery,
Error::IntoVector { source, .. } => source.status_code(),
Error::FromScalarValue { source } => source.status_code(),
Error::External { .. } => StatusCode::Internal,
InnerError::ExecuteFunction { .. } => StatusCode::EngineExecuteQuery,
InnerError::IntoVector { source, .. } => source.status_code(),
InnerError::FromScalarValue { source } => source.status_code(),
}
}
@@ -49,6 +50,12 @@ impl ErrorExt for Error {
}
}
impl From<InnerError> for Error {
fn from(e: InnerError) -> Error {
Error::new(e)
}
}
impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
@@ -66,16 +73,18 @@ mod tests {
}
fn assert_error(err: &Error, code: StatusCode) {
assert_eq!(code, err.status_code());
assert!(err.backtrace_opt().is_some());
let inner_err = err.as_any().downcast_ref::<InnerError>().unwrap();
assert_eq!(code, inner_err.status_code());
assert!(inner_err.backtrace_opt().is_some());
}
#[test]
fn test_datafusion_as_source() {
let err = throw_df_error()
.context(ExecuteFunctionSnafu {})
let err: Error = throw_df_error()
.context(ExecuteFunctionSnafu)
.err()
.unwrap();
.unwrap()
.into();
assert_error(&err, StatusCode::EngineExecuteQuery);
}
@@ -88,12 +97,13 @@ mod tests {
#[test]
fn test_into_vector_error() {
let err = raise_datatype_error()
let err: Error = raise_datatype_error()
.context(IntoVectorSnafu {
data_type: ArrowDatatype::Int32,
})
.err()
.unwrap();
.unwrap()
.into();
assert!(err.backtrace_opt().is_some());
let datatype_err = raise_datatype_error().err().unwrap();
assert_eq!(datatype_err.status_code(), err.status_code());

View File

@@ -62,9 +62,9 @@ where
if len.is_some() {
result.map(ColumnarValue::Vector)
} else {
ScalarValue::try_from_array(&result?.to_arrow_array(), 0)
Ok(ScalarValue::try_from_array(&result?.to_arrow_array(), 0)
.map(ColumnarValue::Scalar)
.context(ExecuteFunctionSnafu)
.context(ExecuteFunctionSnafu)?)
}
})
}

View File

@@ -4,6 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
common-error = { path = "../error" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2", features = ["simd"]}
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git" , branch = "arrow2"}
datatypes = { path = "../../datatypes" }
@@ -12,13 +13,6 @@ paste = "1.0"
serde = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
[dev-dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute", "serde_types"]
[dev-dependencies]
serde_json = "1.0"
tokio = { version = "1.18", features = ["full"] }

View File

@@ -1,20 +1,40 @@
use datatypes::arrow::error::ArrowError;
use snafu::{Backtrace, Snafu};
//! Error of record batch.
use std::any::Any;
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
use common_error::prelude::*;
common_error::define_opaque_error!(Error);
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Arrow error: {}", source))]
Arrow {
source: ArrowError,
pub enum InnerError {
#[snafu(display("Fail to create datafusion record batch, source: {}", source))]
NewDfRecordBatch {
source: datatypes::arrow::error::ArrowError,
backtrace: Backtrace,
},
#[snafu(display("Storage error: {}, source: {}", msg, source))]
Storage { source: BoxedError, msg: String },
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for InnerError {
fn status_code(&self) -> StatusCode {
match self {
InnerError::NewDfRecordBatch { .. } => StatusCode::InvalidArguments,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl From<InnerError> for Error {
fn from(e: InnerError) -> Error {
Error::new(e)
}
}

View File

@@ -1,8 +1,11 @@
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::SchemaRef;
use datatypes::vectors::Helper;
use datatypes::vectors::{Helper, VectorRef};
use serde::ser::{Error, SerializeStruct};
use serde::{Serialize, Serializer};
use snafu::ResultExt;
use crate::error::{self, Result};
#[derive(Clone, Debug, PartialEq)]
pub struct RecordBatch {
@@ -10,8 +13,25 @@ pub struct RecordBatch {
pub df_recordbatch: DfRecordBatch,
}
impl RecordBatch {
pub fn new<I: IntoIterator<Item = VectorRef>>(
schema: SchemaRef,
columns: I,
) -> Result<RecordBatch> {
let arrow_arrays = columns.into_iter().map(|v| v.to_arrow_array()).collect();
let df_recordbatch = DfRecordBatch::try_new(schema.arrow_schema().clone(), arrow_arrays)
.context(error::NewDfRecordBatchSnafu)?;
Ok(RecordBatch {
schema,
df_recordbatch,
})
}
}
impl Serialize for RecordBatch {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
@@ -23,7 +43,7 @@ impl Serialize for RecordBatch {
let vec = df_columns
.iter()
.map(|c| Helper::try_into_vector(c.clone())?.serialize_to_json())
.collect::<Result<Vec<_>, _>>()
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(S::Error::custom)?;
s.serialize_field("columns", &vec)?;
@@ -35,14 +55,38 @@ impl Serialize for RecordBatch {
mod tests {
use std::sync::Arc;
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::UInt32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::Schema;
use datatypes::vectors::{UInt32Vector, Vector};
use super::*;
#[test]
fn test_new_record_batch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt32, false),
]));
let schema = Arc::new(Schema::try_from(arrow_schema).unwrap());
let v = Arc::new(UInt32Vector::from_slice(&[1, 2, 3]));
let columns: Vec<VectorRef> = vec![v.clone(), v.clone()];
let batch = RecordBatch::new(schema.clone(), columns).unwrap();
let expect = v.to_arrow_array();
for column in batch.df_recordbatch.columns() {
let array = column.as_any().downcast_ref::<UInt32Array>().unwrap();
assert_eq!(
expect.as_any().downcast_ref::<UInt32Array>().unwrap(),
array
);
}
assert_eq!(schema, batch.schema);
}
#[test]
pub fn test_serialize_recordbatch() {
let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
@@ -64,12 +108,10 @@ mod tests {
df_recordbatch: df_batch,
};
let mut output = vec![];
let mut serializer = serde_json::Serializer::new(&mut output);
batch.serialize(&mut serializer).unwrap();
let output = serde_json::to_string(&batch).unwrap();
assert_eq!(
r#"{"schema":{"fields":[{"name":"number","data_type":"UInt32","is_nullable":false,"metadata":{}}],"metadata":{}},"columns":[[0,1,2,3,4,5,6,7,8,9]]}"#,
String::from_utf8_lossy(&output)
output
);
}
}

View File

@@ -12,10 +12,10 @@ mod tests {
use std::pin::Pin;
use std::sync::Arc;
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion_common::field_util::SchemaExt;
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::arrow::array::UInt32Array;
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::schema::Schema;
use datatypes::schema::SchemaRef;
use futures::task::{Context, Poll};

View File

@@ -1,13 +1,11 @@
use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
use datatypes::prelude::ConcreteDataType;
use table::error::Error as TableError;
use table_engine::error::Error as TableEngineError;
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
/// Business error of datanode.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -27,12 +25,14 @@ pub enum Error {
#[snafu(display("Fail to create table: {}, {}", table_name, source))]
CreateTable {
table_name: String,
#[snafu(backtrace)]
source: TableEngineError,
},
#[snafu(display("Fail to get table: {}, {}", table_name, source))]
GetTable {
table_name: String,
#[snafu(backtrace)]
source: BoxedError,
},
@@ -94,7 +94,7 @@ impl ErrorExt for Error {
// TODO(yingwen): Further categorize http error.
Error::StartHttp { .. } | Error::ParseAddr { .. } => StatusCode::Internal,
Error::CreateTable { source, .. } => source.status_code(),
Error::GetTable { .. } => StatusCode::Internal,
Error::GetTable { source, .. } => source.status_code(),
Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
Error::ColumnValuesNumberMismatch { .. }

View File

@@ -1,6 +1,7 @@
//! sql handler
mod insert;
use common_error::ext::BoxedError;
use query::catalog::schema::SchemaProviderRef;
use query::query_engine::Output;
use snafu::{OptionExt, ResultExt};
@@ -34,7 +35,7 @@ impl<Engine: TableEngine> SqlHandler<Engine> {
pub(crate) fn get_table(&self, table_name: &str) -> Result<TableRef> {
self.table_engine
.get_table(&EngineContext::default(), table_name)
.map_err(|e| Box::new(e) as _)
.map_err(BoxedError::new)
.context(GetTableSnafu { table_name })?
.context(TableNotFoundSnafu { table_name })
}

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use snafu::ResultExt;
use store_api::storage::ConcreteDataType;
use store_api::storage::{
@@ -18,7 +19,7 @@ use table::{
table::TableRef,
};
use crate::error::{CreateTableSnafu, Error, Result};
use crate::error::{self, Error, Result};
use crate::table::MitoTable;
pub const DEFAULT_ENGINE: &str = "mito";
@@ -150,8 +151,8 @@ impl<Store: StorageEngine> MitoEngineInner<Store> {
},
)
.await
.map_err(|e| Box::new(e) as _)
.context(CreateTableSnafu)?;
.map_err(BoxedError::new)
.context(error::CreateRegionSnafu)?;
// Use region meta schema instead of request schema
let table_meta = TableMetaBuilder::new(region.in_memory_metadata().schema().clone())

View File

@@ -1,17 +1,15 @@
use std::any::Any;
use common_error::ext::BoxedError;
use common_error::prelude::*;
// TODO(dennis): use ErrorExt instead.
pub type BoxedError = Box<dyn std::error::Error + Send + Sync>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fail to create table, source: {}", source))]
CreateTable {
#[snafu(display("Fail to create region, source: {}", source))]
CreateRegion {
#[snafu(backtrace)]
source: BoxedError,
backtrace: Backtrace,
},
}
@@ -20,8 +18,7 @@ pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
//TODO: should return the source's status code after use ErrorExt in BoxedError.
Error::CreateTable { .. } => StatusCode::InvalidArguments,
Error::CreateRegion { source, .. } => source.status_code(),
}
}
@@ -33,3 +30,25 @@ impl ErrorExt for Error {
self
}
}
#[cfg(test)]
mod tests {
use common_error::ext::BoxedError;
use common_error::mock::MockError;
use super::*;
fn throw_create_table(code: StatusCode) -> Result<()> {
let mock_err = MockError::with_backtrace(code);
Err(BoxedError::new(mock_err)).context(CreateRegionSnafu)
}
#[test]
fn test_error() {
let err = throw_create_table(StatusCode::InvalidArguments)
.err()
.unwrap();
assert_eq!(StatusCode::InvalidArguments, err.status_code());
assert!(err.backtrace_opt().is_some());
}
}

View File

@@ -1,20 +1,18 @@
#[cfg(test)]
pub mod test;
use std::any::Any;
use std::pin::Pin;
use async_trait::async_trait;
use common_query::logical_plan::Expr;
use common_recordbatch::error::{Result as RecordBatchResult, StorageSnafu};
use common_recordbatch::error::{Error as RecordBatchError, Result as RecordBatchResult};
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use futures::task::{Context, Poll};
use futures::Stream;
use snafu::OptionExt;
use snafu::ResultExt;
use store_api::storage::SchemaRef;
use store_api::storage::{
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, Snapshot, WriteContext,
ChunkReader, PutOperation, ReadContext, Region, ScanRequest, SchemaRef, Snapshot, WriteContext,
WriteRequest,
};
use table::error::{Error as TableError, MissingColumnSnafu, Result as TableResult};
@@ -110,27 +108,9 @@ impl<R: Region> Table for MitoTable<R> {
for chunk in reader.next_chunk()
.await
.map_err(|e| Box::new(e) as _)
.context(StorageSnafu {
msg: "Fail to reader chunk",
})?
.map_err(RecordBatchError::new)?
{
let batch = DfRecordBatch::try_new(
stream_schema.arrow_schema().clone(),
chunk.columns
.into_iter()
.map(|v| v.to_arrow_array())
.collect());
let batch = batch
.map_err(|e| Box::new(e) as _)
.context(StorageSnafu {
msg: "Fail to new datafusion record batch",
})?;
yield RecordBatch {
schema: stream_schema.clone(),
df_recordbatch: batch,
}
yield RecordBatch::new(stream_schema.clone(), chunk.columns)?
}
});

View File

@@ -1,7 +1,9 @@
use std::any::Any;
use common_error::prelude::*;
use common_recordbatch::error::Error as RecordBatchError;
use datafusion::error::DataFusionError;
use datatypes::arrow::error::ArrowError;
common_error::define_opaque_error!(Error);
@@ -23,14 +25,30 @@ pub enum InnerError {
backtrace: Backtrace,
},
#[snafu(display("Missing column when insert, column : {}", name))]
#[snafu(display("Missing column when insert, column: {}", name))]
MissingColumn { name: String, backtrace: Backtrace },
#[snafu(display("Not expected to run ExecutionPlan more than once"))]
ExecuteRepeatedly { backtrace: Backtrace },
#[snafu(display("Poll stream failed, source: {}", source))]
PollStream {
source: ArrowError,
backtrace: Backtrace,
},
}
impl ErrorExt for InnerError {
fn status_code(&self) -> StatusCode {
match self {
InnerError::Datafusion { .. } | InnerError::PollStream { .. } => {
StatusCode::EngineExecuteQuery
}
InnerError::MissingColumn { .. } => StatusCode::InvalidArguments,
InnerError::ExecuteRepeatedly { .. } => StatusCode::Unexpected,
}
}
fn backtrace_opt(&self) -> Option<&Backtrace> {
ErrorCompat::backtrace(self)
}
@@ -52,6 +70,12 @@ impl From<InnerError> for DataFusionError {
}
}
impl From<InnerError> for RecordBatchError {
fn from(e: InnerError) -> RecordBatchError {
RecordBatchError::new(e)
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -64,14 +88,49 @@ mod tests {
ExecuteRepeatedlySnafu {}.fail()?
}
fn throw_missing_column_inner() -> std::result::Result<(), InnerError> {
MissingColumnSnafu { name: "test" }.fail()
}
fn throw_missing_column() -> Result<()> {
Ok(throw_missing_column_inner()?)
}
fn throw_arrow() -> Result<()> {
Err(ArrowError::Overflow).context(PollStreamSnafu)?
}
#[test]
fn test_error() {
let err = throw_df_error().err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::Unknown, err.status_code());
assert_eq!(StatusCode::EngineExecuteQuery, err.status_code());
let err = throw_repeatedly().err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::Unknown, err.status_code());
assert_eq!(StatusCode::Unexpected, err.status_code());
let err = throw_missing_column().err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
let err = throw_arrow().err().unwrap();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::EngineExecuteQuery, err.status_code());
}
#[test]
fn test_into_record_batch_error() {
let err = throw_missing_column_inner().err().unwrap();
let err: RecordBatchError = err.into();
assert!(err.backtrace_opt().is_some());
assert_eq!(StatusCode::InvalidArguments, err.status_code());
}
#[test]
fn test_into_df_error() {
let err = throw_missing_column_inner().err().unwrap();
let err: DataFusionError = err.into();
assert!(matches!(err, DataFusionError::External(_)));
}
}

View File

@@ -7,7 +7,7 @@ use std::mem;
use std::sync::{Arc, Mutex};
use common_query::logical_plan::Expr;
use common_recordbatch::error::{self as recordbatch_error, Result as RecordBatchResult};
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
/// Datafusion table adpaters
@@ -297,7 +297,7 @@ impl Stream for RecordBatchStreamAdapter {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(df_recordbatch)) => Poll::Ready(Some(Ok(RecordBatch {
schema: self.schema(),
df_recordbatch: df_recordbatch.context(recordbatch_error::ArrowSnafu)?,
df_recordbatch: df_recordbatch.context(error::PollStreamSnafu)?,
}))),
Poll::Ready(None) => Poll::Ready(None),
}