feat: improve datafusion external error and mysql error (#4362)

* feat: improve datafusion external error and mysql error

* chore: address CR comments and fix tests

---------

Co-authored-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
dennis zhuang
2024-07-16 00:01:09 -07:00
committed by GitHub
parent 7b28da277d
commit be3ea0fae7
20 changed files with 159 additions and 92 deletions

View File

@@ -18,6 +18,7 @@ use std::fmt::Debug;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use snafu::{Location, Snafu};
@@ -282,9 +283,8 @@ impl ErrorExt for Error {
}
Error::QueryAccessDenied { .. } => StatusCode::AccessDenied,
Error::ProjectViewColumns { .. } | Error::Datafusion { .. } => {
StatusCode::EngineExecuteQuery
}
Error::Datafusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::ProjectViewColumns { .. } => StatusCode::EngineExecuteQuery,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::GetViewCache { source, .. } | Error::GetTableCache { source, .. } => {
source.status_code()
@@ -299,7 +299,7 @@ impl ErrorExt for Error {
impl From<Error> for DataFusionError {
fn from(e: Error) -> Self {
DataFusionError::Internal(e.to_string())
DataFusionError::External(Box::new(e))
}
}
@@ -338,7 +338,7 @@ mod tests {
}
.into();
match e {
DataFusionError::Internal(_) => {}
DataFusionError::External(_) => {}
_ => {
panic!("catalog error should be converted to DataFusionError::Internal")
}

View File

@@ -143,9 +143,7 @@ impl InformationTable for InformationSchemaTables {
.make_tables(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(|err| {
datafusion::error::DataFusionError::External(format!("{err:?}").into())
})
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
}),
));
Ok(Box::pin(

View File

@@ -124,9 +124,7 @@ impl InformationTable for InformationSchemaViews {
.make_views(Some(request))
.await
.map(|x| x.into_df_record_batch())
.map_err(|err| {
datafusion::error::DataFusionError::External(format!("{err:?}").into())
})
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))
}),
));
Ok(Box::pin(

View File

@@ -261,17 +261,20 @@ impl ErrorExt for Error {
match self {
Error::UdfTempRecordBatch { .. }
| Error::PyUdf { .. }
| Error::ExecuteFunction { .. }
| Error::GenerateFunction { .. }
| Error::CreateAccumulator { .. }
| Error::DowncastVector { .. }
| Error::InvalidInputState { .. }
| Error::InvalidInputCol { .. }
| Error::GenerateFunction { .. }
| Error::BadAccumulatorImpl { .. }
| Error::ToScalarValue { .. }
| Error::GetScalarVector { .. }
| Error::ArrowCompute { .. } => StatusCode::EngineExecuteQuery,
Error::ExecuteFunction { error, .. } | Error::GeneralDataFusion { error, .. } => {
datafusion_status_code::<Self>(error, None)
}
Error::InvalidInputType { source, .. }
| Error::IntoVector { source, .. }
| Error::FromScalarValue { source, .. }
@@ -281,8 +284,7 @@ impl ErrorExt for Error {
Error::MissingTableMutationHandler { .. }
| Error::MissingProcedureServiceHandler { .. }
| Error::ExecuteRepeatedly { .. }
| Error::ThreadJoin { .. }
| Error::GeneralDataFusion { .. } => StatusCode::Unexpected,
| Error::ThreadJoin { .. } => StatusCode::Unexpected,
Error::UnsupportedInputDataType { .. }
| Error::TypeCast { .. }
@@ -310,3 +312,24 @@ impl From<Error> for DataFusionError {
DataFusionError::External(Box::new(e))
}
}
/// Maps a [`DataFusionError`] to the most fitting [`StatusCode`].
///
/// The well-known variants are translated directly:
/// `Internal` -> `Internal`, `NotImplemented` -> `Unsupported`,
/// `ResourcesExhausted` -> `RuntimeResourcesExhausted` and `Plan` -> `PlanQuery`.
///
/// For `External` errors the boxed source is downcast to `T`; when that
/// succeeds, the status code reported by the wrapped error is returned.
///
/// Every other case (including an `External` error that is not a `T`) yields
/// `default_status`, or [`StatusCode::EngineExecuteQuery`] when no default
/// is supplied.
pub fn datafusion_status_code<T: ErrorExt + 'static>(
    e: &DataFusionError,
    default_status: Option<StatusCode>,
) -> StatusCode {
    // Shared fallback for everything that cannot be classified more precisely.
    let fallback = || default_status.unwrap_or(StatusCode::EngineExecuteQuery);

    match e {
        DataFusionError::Internal(_) => StatusCode::Internal,
        DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
        DataFusionError::ResourcesExhausted(_) => StatusCode::RuntimeResourcesExhausted,
        DataFusionError::Plan(_) => StatusCode::PlanQuery,
        // Recover the status code of the original error if it is a `T`.
        DataFusionError::External(source) => match source.downcast_ref::<T>() {
            Some(ext) => ext.status_code(),
            None => fallback(),
        },
        _ => fallback(),
    }
}

View File

@@ -26,12 +26,11 @@ use datafusion::execution::context::SessionState;
use datafusion::sql::planner::ContextProvider;
use datafusion::variable::VarType;
use datafusion_common::config::ConfigOptions;
use datafusion_common::DataFusionError;
use datafusion_expr::var_provider::is_system_variables;
use datafusion_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use datafusion_sql::parser::Statement as DfStatement;
use session::context::QueryContextRef;
use snafu::ResultExt;
use snafu::{Location, ResultExt};
use crate::error::{CatalogSnafu, DataFusionSnafu, Result};
use crate::query_engine::{DefaultPlanDecoder, QueryEngineState};
@@ -119,7 +118,13 @@ impl ContextProvider for DfContextProviderAdapter {
self.tables
.get(&table_ref.to_string())
.cloned()
.ok_or_else(|| DataFusionError::Plan(format!("Table not found: {}", table_ref)))
.ok_or_else(|| {
crate::error::Error::TableNotFound {
table: table_ref.to_string(),
location: Location::default(),
}
.into()
})
}
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {

View File

@@ -18,6 +18,7 @@ use std::time::Duration;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
@@ -333,7 +334,6 @@ impl ErrorExt for Error {
}
UnsupportedExpr { .. }
| Unimplemented { .. }
| TableNotFound { .. }
| UnknownTable { .. }
| TimeIndexNotFound { .. }
| ParseTimestamp { .. }
@@ -348,6 +348,8 @@ impl ErrorExt for Error {
BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
TableNotFound { .. } => StatusCode::TableNotFound,
ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(),
QueryAccessDenied { .. } => StatusCode::AccessDenied,
@@ -355,16 +357,15 @@ impl ErrorExt for Error {
ConvertDatafusionSchema { source, .. } => source.status_code(),
CreateRecordBatch { source, .. } => source.status_code(),
QueryExecution { source, .. } | QueryPlan { source, .. } => source.status_code(),
DataFusion { error, .. } => match error {
DataFusionError::Internal(_) => StatusCode::Internal,
DataFusionError::NotImplemented(_) => StatusCode::Unsupported,
DataFusionError::ResourcesExhausted(_) => StatusCode::RuntimeResourcesExhausted,
DataFusionError::Plan(_) => StatusCode::PlanQuery,
_ => StatusCode::EngineExecuteQuery,
},
PlanSql { error, .. } => {
datafusion_status_code::<Self>(error, Some(StatusCode::PlanQuery))
}
DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
MissingTimestampColumn { .. } => StatusCode::EngineExecuteQuery,
Sql { source, .. } => source.status_code(),
PlanSql { .. } => StatusCode::PlanQuery,
ConvertSqlType { source, .. } | ConvertSqlValue { source, .. } => source.status_code(),
RegionQuery { source, .. } => source.status_code(),

View File

@@ -49,7 +49,7 @@ use crate::mysql::helper::{
self, format_placeholder, replace_placeholders, transform_placeholders,
};
use crate::mysql::writer;
use crate::mysql::writer::create_mysql_column;
use crate::mysql::writer::{create_mysql_column, handle_err};
use crate::query_handler::sql::ServerSqlQueryHandlerRef;
use crate::SqlPlan;
@@ -332,18 +332,13 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
let plan = match replace_params_with_values(&plan, param_types, &params) {
Ok(plan) => plan,
Err(e) => {
if e.status_code().should_log_error() {
error!(e; "params: {}", params
.iter()
.map(|x| format!("({:?}, {:?})", x.value, x.coltype))
.join(", "));
}
let (kind, err) = handle_err(e);
debug!(
"Failed to replace params on execute, kind: {:?}, err: {}",
kind, err
);
w.error(kind, err.as_bytes()).await?;
w.error(
ErrorKind::ER_TRUNCATED_WRONG_VALUE,
e.output_msg().as_bytes(),
)
.await?;
return Ok(());
}
};
@@ -454,19 +449,13 @@ impl<W: AsyncWrite + Send + Sync + Unpin> AsyncMysqlShim<W> for MysqlInstanceShi
{
Ok(plan) => plan,
Err(e) => {
if e.status_code().should_log_error() {
error!(e; "params: {}", params
.iter()
.map(|x| format!("({:?})", x))
.join(", "));
}
let (kind, err) = handle_err(e);
debug!(
"Failed to replace params on query, kind: {:?}, err: {}",
kind, err
);
writer.error(kind, err.as_bytes()).await?;
writer
.error(
ErrorKind::ER_TRUNCATED_WRONG_VALUE,
e.output_msg().as_bytes(),
)
.await?;
return Ok(());
}
};

View File

@@ -15,6 +15,7 @@
use std::ops::Deref;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, error};
@@ -51,6 +52,26 @@ pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
Ok(())
}
/// Converts a GreptimeDB error into a MySQL error kind and message.
///
/// The error's status code selects the MySQL [`ErrorKind`] via
/// `mysql_error_kind`. The failure is logged at error level when the status
/// code asks for it (`should_log_error`), otherwise at debug level.
///
/// Returns the error kind together with a message of the form
/// `(<status code>): <output message>`.
pub fn handle_err(e: impl ErrorExt) -> (ErrorKind, String) {
    let status_code = e.status_code();
    let kind = mysql_error_kind(&status_code);

    if !status_code.should_log_error() {
        debug!(
            "Failed to handle mysql query, code: {}, kind: {:?}, error: {:?}",
            status_code, kind, e
        );
    } else {
        error!(e; "Failed to handle mysql query, code: {}, kind: {:?}", status_code, kind);
    }

    // Inline the status code to output message for MySQL
    let msg = e.output_msg();
    (kind, format!("({status_code}): {msg}"))
}
struct QueryResult {
schema: SchemaRef,
stream: SendableRecordBatchStream,
@@ -148,15 +169,9 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
.await?
}
Err(e) => {
if e.status_code().should_log_error() {
error!(e; "Failed to handle mysql query");
} else {
debug!("Failed to handle mysql query, error: {e:?}");
}
let err = e.output_msg();
row_writer
.finish_error(ErrorKind::ER_INTERNAL_ERROR, &err.as_bytes())
.await?;
let (kind, err) = handle_err(e);
debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
row_writer.finish_error(kind, &err.as_bytes()).await?;
return Ok(());
}
@@ -224,15 +239,9 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
.with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
.inc();
if error.status_code().should_log_error() {
error!(error; "Failed to handle mysql query");
} else {
debug!("Failed to handle mysql query, error: {error:?}");
}
let kind = ErrorKind::ER_INTERNAL_ERROR;
let error = error.output_msg();
w.error(kind, error.as_bytes()).await?;
let (kind, err) = handle_err(error);
debug!("Write query error, kind: {:?}, err: {}", kind, err);
w.error(kind, err.as_bytes()).await?;
Ok(())
}
}
@@ -298,3 +307,46 @@ pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
.map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
.collect()
}
fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
match status_code {
StatusCode::Success => ErrorKind::ER_YES,
StatusCode::Unknown => ErrorKind::ER_UNKNOWN_ERROR,
StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
StatusCode::Cancelled => ErrorKind::ER_QUERY_INTERRUPTED,
StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
ErrorKind::ER_TABLE_EXISTS_ERROR
}
StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
StatusCode::PermissionDenied | StatusCode::AccessDenied => {
ErrorKind::ER_ACCESS_DENIED_ERROR
}
StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
}
StatusCode::Unexpected
| StatusCode::Internal
| StatusCode::IllegalState
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery
| StatusCode::RegionNotReady
| StatusCode::RegionBusy
| StatusCode::TableUnavailable
| StatusCode::StorageUnavailable
| StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
}
}

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_query::error::datafusion_status_code;
use datafusion::error::DataFusionError;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
@@ -141,9 +142,10 @@ pub enum Error {
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
match self {
Error::Datafusion { .. }
| Error::SchemaConversion { .. }
| Error::TableProjection { .. } => StatusCode::EngineExecuteQuery,
Error::Datafusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::SchemaConversion { .. } | Error::TableProjection { .. } => {
StatusCode::EngineExecuteQuery
}
Error::RemoveColumnInIndex { .. }
| Error::BuildColumnDescriptor { .. }
| Error::InvalidAlterRequest { .. } => StatusCode::InvalidArguments,

View File

@@ -264,7 +264,7 @@ pub async fn test_sql_api(store_type: StorageType) {
let body = serde_json::from_str::<ErrorResponse>(&res.text().await).unwrap();
assert!(body.error().contains("Table not found"));
assert_eq!(body.code(), ErrorCode::PlanQuery as u32);
assert_eq!(body.code(), ErrorCode::TableNotFound as u32);
// test database given
let res = client

View File

@@ -213,15 +213,14 @@ pub async fn test_mysql_crud(store_type: StorageType) {
.fetch_all(&pool)
.await;
assert!(query_re.is_err());
let err = query_re.unwrap_err();
common_telemetry::info!("Error is {}", err);
assert_eq!(
query_re
.err()
.unwrap()
.into_database_error()
err.into_database_error()
.unwrap()
.downcast::<MySqlDatabaseError>()
.code(),
Some("22007")
.number(),
1210,
);
let _ = sqlx::query("delete from demo")

View File

@@ -39,7 +39,7 @@ Error: 4001(TableNotFound), Table not found: t
SELECT * FROM t;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.t
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.t
CREATE TABLE t(i INTEGER, j TIMESTAMP TIME INDEX);

View File

@@ -132,7 +132,7 @@ Affected Rows: 0
SELECT * FROM test_public_schema.hello;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.test_public_schema.hello
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.test_public_schema.hello
USE public;

View File

@@ -16,7 +16,7 @@ Affected Rows: 2
select * from system_Metric;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.upper_case_table_name.system_metric
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.upper_case_table_name.system_metric
select * from "system_Metric";

View File

@@ -62,7 +62,7 @@ Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: WITH query na
-- reference to CTE before its actually defined, it's not supported by datafusion
with cte3 as (select ref2.j as i from cte1 as ref2), cte1 as (Select i as j from a), cte2 as (select ref.j+1 as k from cte1 as ref) select * from cte2 union all select * FROM cte3;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.cte1
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.cte1
with cte1 as (Select i as j from a) select * from cte1 cte11, cte1 cte12;
@@ -109,7 +109,7 @@ WITH RECURSIVE cte(d) AS (
)
SELECT max(d) FROM cte;
Error: 3000(PlanQuery), Failed to plan SQL: This feature is not implemented: Recursive CTE is not implemented
Error: 1001(Unsupported), Failed to plan SQL: This feature is not implemented: Recursive CTE is not implemented
-- Nested aliases is not supported in datafusion
with cte (a) as (

View File

@@ -69,7 +69,7 @@ Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: WITH query na
-- self-refer to non-existent cte-
with cte as (select * from cte) select * from cte;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.cte
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.cte
drop table a;

View File

@@ -84,7 +84,7 @@ SELECT date_format('2023-12-06 07:39:46.222'::TIMESTAMP_S, '%Y-%m-%d %H:%M:%S:%3
--- datetime not supported yet ---
SELECT date_format('2023-12-06 07:39:46.222'::DATETIME, '%Y-%m-%d %H:%M:%S:%3f');
Error: 3000(PlanQuery), Failed to plan SQL: This feature is not implemented: Unsupported SQL type Datetime(None)
Error: 1001(Unsupported), Failed to plan SQL: This feature is not implemented: Unsupported SQL type Datetime(None)
SELECT date_format('2023-12-06'::DATE, '%m-%d');

View File

@@ -49,15 +49,15 @@ SELECT INTERVAL '1 year 2 months 3 days 4 hours' - INTERVAL '1 year';
SELECT INTERVAL '6 years' * 2;
Error: 3000(PlanQuery), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Multiply
Error: 1001(Unsupported), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Multiply
SELECT INTERVAL '6 years' / 2;
Error: 3000(PlanQuery), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Divide
Error: 1001(Unsupported), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Divide
SELECT INTERVAL '6 years' = INTERVAL '72 months';
Error: 3000(PlanQuery), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Eq
Error: 1001(Unsupported), Failed to plan SQL: This feature is not implemented: Unsupported interval operator: Eq
SELECT arrow_typeof(INTERVAL '1 month');

View File

@@ -158,7 +158,7 @@ Affected Rows: 0
SELECT * FROM test_view LIMIT 10;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.test_view
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.test_view
SHOW TABLES;

View File

@@ -50,7 +50,7 @@ Error: 1004(InvalidArguments), Invalid SQL, error: column count mismatch, column
CREATE VIEW v1 AS SELECT * FROM dontexist;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.dontexist
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.dontexist
SHOW VIEWS;
@@ -66,7 +66,7 @@ Affected Rows: 0
SELECT * FROM v1;
Error: 3000(PlanQuery), Failed to plan SQL: Error during planning: Table not found: greptime.public.v1
Error: 4001(TableNotFound), Failed to plan SQL: Table not found: greptime.public.v1
--- view not exists ---
DROP VIEW v2;