// Copyright 2023 Greptime Team // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Error definition for flow module use std::any::Any; use api::v1::CreateTableExpr; use arrow_schema::ArrowError; use common_error::ext::BoxedError; use common_error::{define_into_tonic_status, from_err_code_msg_to_header}; use common_macro::stack_trace_debug; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::common_error::status_code::StatusCode; use snafu::{Location, ResultExt, Snafu}; use tonic::metadata::MetadataMap; use crate::expr::EvalError; use crate::FlowId; /// This error is used to represent all possible errors that can occur in the flow module. #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { #[snafu(display( "Failed to insert into flow: region_id={}, flow_ids={:?}", region_id, flow_ids ))] InsertIntoFlow { region_id: u64, flow_ids: Vec, source: BoxedError, #[snafu(implicit)] location: Location, }, #[snafu(display("Flow engine is still recovering"))] FlowNotRecovered { #[snafu(implicit)] location: Location, }, #[snafu(display("Error encountered while creating flow: {sql}"))] CreateFlow { sql: String, source: BoxedError, #[snafu(implicit)] location: Location, }, #[snafu(display("Error encountered while creating sink table for flow: {create:?}"))] CreateSinkTable { create: CreateTableExpr, source: BoxedError, #[snafu(implicit)] location: Location, }, #[snafu(display("Time error"))] Time { source: common_time::error::Error, #[snafu(implicit)] location: Location, }, #[snafu(display( "No available frontend found after timeout: {timeout:?}, context: {context}" ))] NoAvailableFrontend { timeout: std::time::Duration, context: String, #[snafu(implicit)] location: Location, }, #[snafu(display("External error"))] External { source: BoxedError, #[snafu(implicit)] location: Location, }, #[snafu(display("Internal error"))] Internal { reason: String, #[snafu(implicit)] location: Location, }, /// TODO(discord9): add detailed location of column #[snafu(display("Failed to eval stream"))] Eval { source: EvalError, #[snafu(implicit)] location: Location, }, #[snafu(display("Table not found: {name}"))] TableNotFound { name: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Table not found: {msg}, meta error: {source}"))] TableNotFoundMeta { source: common_meta::error::Error, msg: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Flow not found, id={id}"))] FlowNotFound { id: FlowId, #[snafu(implicit)] location: Location, }, #[snafu(display("Failed to list flows in flownode={id:?}"))] ListFlows { id: Option, source: common_meta::error::Error, #[snafu(implicit)] location: Location, }, #[snafu(display("Flow already exist, id={id}"))] FlowAlreadyExist { id: FlowId, #[snafu(implicit)] location: Location, }, #[snafu(display("Failed to join task"))] JoinTask { #[snafu(source)] error: tokio::task::JoinError, #[snafu(implicit)] location: Location, }, #[snafu(display("Invalid query: {reason}"))] InvalidQuery { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Not implement in flow: {reason}"))] NotImplemented { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Invalid auth config"))] IllegalAuthConfig { source: auth::error::Error }, #[snafu(display("Flow plan error: {reason}"))] Plan { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Unsupported: {reason}"))] Unsupported { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Unsupported temporal filter: {reason}"))] UnsupportedTemporalFilter { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Datatypes error: {source} with extra message: {extra}"))] Datatypes { source: datatypes::Error, extra: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Arrow error: {raw:?} in context: {context}"))] Arrow { #[snafu(source)] raw: ArrowError, context: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Datafusion error: {raw:?} in context: {context}"))] Datafusion { #[snafu(source)] raw: datafusion_common::DataFusionError, context: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Unexpected: {reason}"))] Unexpected { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display("Illegal check task state: {reason}"))] IllegalCheckTaskState { reason: String, #[snafu(implicit)] location: Location, }, #[snafu(display( "Failed to sync with check task for flow {} with allow_drop={}", flow_id, allow_drop ))] SyncCheckTask { flow_id: FlowId, allow_drop: bool, #[snafu(implicit)] location: Location, }, #[snafu(display("Failed to start server"))] StartServer { #[snafu(implicit)] location: Location, source: servers::error::Error, }, #[snafu(display("Failed to shutdown server"))] ShutdownServer { #[snafu(implicit)] location: Location, source: servers::error::Error, }, #[snafu(display("Failed to initialize meta client"))] MetaClientInit { #[snafu(implicit)] location: Location, source: meta_client::error::Error, }, #[snafu(display("Failed to parse address {}", addr))] ParseAddr { addr: String, #[snafu(source)] error: std::net::AddrParseError, }, #[snafu(display("Failed to get cache from cache registry: {}", name))] CacheRequired { #[snafu(implicit)] location: Location, name: String, }, #[snafu(display("Invalid request: {context}"))] InvalidRequest { context: String, source: client::Error, #[snafu(implicit)] location: Location, }, #[snafu(display("Failed to encode logical plan in substrait"))] SubstraitEncodeLogicalPlan { #[snafu(implicit)] location: Location, source: substrait::error::Error, }, #[snafu(display("Failed to convert column schema to proto column def"))] ConvertColumnSchema { #[snafu(implicit)] location: Location, source: operator::error::Error, }, #[snafu(display("Failed to create channel manager for gRPC client"))] InvalidClientConfig { #[snafu(implicit)] location: Location, source: common_grpc::error::Error, }, } /// the outer message is the full error stack, and inner message in header is the last error message that can be show directly to user pub fn to_status_with_last_err(err: impl ErrorExt) -> tonic::Status { let msg = err.to_string(); let last_err_msg = common_error::ext::StackError::last(&err).to_string(); let code = err.status_code() as u32; let header = from_err_code_msg_to_header(code, &last_err_msg); tonic::Status::with_metadata( tonic::Code::InvalidArgument, msg, MetadataMap::from_headers(header), ) } /// Result type for flow module pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { match self { Self::Eval { .. } | Self::JoinTask { .. } | Self::Datafusion { .. } | Self::InsertIntoFlow { .. } | Self::NoAvailableFrontend { .. } | Self::FlowNotRecovered { .. } => StatusCode::Internal, Self::FlowAlreadyExist { .. } => StatusCode::TableAlreadyExists, Self::TableNotFound { .. } | Self::TableNotFoundMeta { .. } | Self::ListFlows { .. } => StatusCode::TableNotFound, Self::FlowNotFound { .. } => StatusCode::FlowNotFound, Self::Plan { .. } | Self::Datatypes { .. } => StatusCode::PlanQuery, Self::CreateFlow { .. } | Self::CreateSinkTable { .. } | Self::Arrow { .. } | Self::Time { .. } => StatusCode::EngineExecuteQuery, Self::Unexpected { .. } | Self::SyncCheckTask { .. } | Self::IllegalCheckTaskState { .. } => StatusCode::Unexpected, Self::NotImplemented { .. } | Self::UnsupportedTemporalFilter { .. } | Self::Unsupported { .. } => StatusCode::Unsupported, Self::External { source, .. } => source.status_code(), Self::Internal { .. } | Self::CacheRequired { .. } => StatusCode::Internal, Self::StartServer { source, .. } | Self::ShutdownServer { source, .. } => { source.status_code() } Self::MetaClientInit { source, .. } => source.status_code(), Self::InvalidQuery { .. } | Self::InvalidRequest { .. } | Self::ParseAddr { .. } | Self::IllegalAuthConfig { .. } | Self::InvalidClientConfig { .. } => StatusCode::InvalidArguments, Error::SubstraitEncodeLogicalPlan { source, .. } => source.status_code(), Error::ConvertColumnSchema { source, .. } => source.status_code(), } } fn as_any(&self) -> &dyn Any { self } } define_into_tonic_status!(Error); impl From for Error { fn from(e: EvalError) -> Self { Err::<(), _>(e).context(EvalSnafu).unwrap_err() } }