diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index ba7240b9d5..b26ef7e63f 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -13,7 +13,8 @@ // limitations under the License. use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result}; +use pipeline::error::Result; +use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline}; use serde_json::{Deserializer, Value}; fn processor_mut( diff --git a/src/pipeline/src/dispatcher.rs b/src/pipeline/src/dispatcher.rs index 909b1afa42..37b3469a25 100644 --- a/src/pipeline/src/dispatcher.rs +++ b/src/pipeline/src/dispatcher.rs @@ -16,7 +16,7 @@ use common_telemetry::debug; use snafu::OptionExt; use yaml_rust::Yaml; -use crate::etl::error::{ +use crate::error::{ Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu, ValueRequiredForDispatcherRuleSnafu, }; diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/error.rs similarity index 71% rename from src/pipeline/src/etl/error.rs rename to src/pipeline/src/error.rs index 8365ad6ffb..ac653daacd 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/error.rs @@ -17,6 +17,7 @@ use std::any::Any; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use datatypes::timestamp::TimestampNanosecond; use snafu::{Location, Snafu}; #[derive(Snafu)] @@ -51,7 +52,7 @@ pub enum Error { #[snafu(display("Processor {processor}: expect string value, but got {v:?}"))] ProcessorExpectString { processor: String, - v: crate::etl::Value, + v: crate::Value, #[snafu(implicit)] location: Location, }, @@ -607,13 +608,197 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Pipeline table not found"))] + PipelineTableNotFound { + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to insert pipeline to pipelines table"))] + InsertPipeline { + #[snafu(source)] + source: operator::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))] + PipelineNotFound { + name: String, + version: Option, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to collect record batch"))] + CollectRecords { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + source: common_recordbatch::error::Error, + }, + + #[snafu(display("Failed to cast type, msg: {}", msg))] + CastType { + msg: String, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to build DataFusion logical plan"))] + BuildDfLogicalPlan { + #[snafu(source)] + error: datafusion_common::DataFusionError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to execute internal statement"))] + ExecuteInternalStatement { + #[snafu(source)] + source: query::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to create dataframe"))] + DataFrame { + #[snafu(source)] + source: query::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("General catalog error"))] + Catalog { + #[snafu(source)] + source: catalog::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to create table"))] + CreateTable { + #[snafu(source)] + source: operator::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Invalid pipeline version format: {}", version))] + InvalidPipelineVersion { + version: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; impl ErrorExt for Error { fn status_code(&self) -> StatusCode { - StatusCode::InvalidArguments + use Error::*; + match self { + CastType { .. } => StatusCode::Unexpected, + PipelineTableNotFound { .. } => StatusCode::TableNotFound, + InsertPipeline { source, .. } => source.status_code(), + CollectRecords { source, .. } => source.status_code(), + PipelineNotFound { .. } | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments, + BuildDfLogicalPlan { .. } => StatusCode::Internal, + ExecuteInternalStatement { source, .. } => source.status_code(), + DataFrame { source, .. } => source.status_code(), + Catalog { source, .. } => source.status_code(), + CreateTable { source, .. } => source.status_code(), + + EmptyInputField { .. } + | MissingInputField { .. } + | ProcessorMustBeMap { .. } + | ProcessorMissingField { .. } + | ProcessorExpectString { .. } + | ProcessorUnsupportedValue { .. } + | ProcessorKeyMustBeString { .. } + | ProcessorFailedToParseString { .. } + | ProcessorMustHaveStringKey { .. } + | UnsupportedProcessor { .. } + | FieldMustBeType { .. } + | FailedParseFieldFromString { .. } + | FailedToParseIntKey { .. } + | FailedToParseInt { .. } + | FailedToParseFloatKey { .. } + | IntermediateKeyIndex { .. } + | CmcdMissingValue { .. } + | CmcdMissingKey { .. } + | KeyMustBeString { .. } + | CsvRead { .. } + | CsvNoRecord { .. } + | CsvSeparatorName { .. } + | CsvQuoteName { .. } + | DateParseTimezone { .. } + | DateParse { .. } + | DateFailedToGetLocalTimezone { .. } + | DateFailedToGetTimestamp { .. } + | DateInvalidFormat { .. } + | DissectInvalidPattern { .. } + | DissectEmptyPattern { .. } + | DissectSplitExceedsInput { .. } + | DissectSplitNotMatchInput { .. } + | DissectConsecutiveNames { .. } + | DissectNoMatchingPattern { .. } + | DissectModifierAlreadySet { .. } + | DissectAppendOrderAlreadySet { .. } + | DissectOrderOnlyAppend { .. } + | DissectOrderOnlyAppendModifier { .. } + | DissectEndModifierAlreadySet { .. } + | EpochInvalidResolution { .. } + | GsubPatternRequired { .. } + | GsubReplacementRequired { .. } + | Regex { .. } + | JoinSeparatorRequired { .. } + | LetterInvalidMethod { .. } + | RegexNamedGroupNotFound { .. } + | RegexNoValidField { .. } + | RegexNoValidPattern { .. } + | UrlEncodingInvalidMethod { .. } + | DigestPatternInvalid { .. } + | UrlEncodingDecode { .. } + | TransformOnFailureInvalidValue { .. } + | TransformElementMustBeMap { .. } + | TransformTypeMustBeSet { .. } + | TransformEmpty { .. } + | TransformColumnNameMustBeUnique { .. } + | TransformMultipleTimestampIndex { .. } + | TransformTimestampIndexCount { .. } + | CoerceUnsupportedNullType { .. } + | CoerceUnsupportedNullTypeTo { .. } + | CoerceUnsupportedEpochType { .. } + | CoerceStringToType { .. } + | CoerceJsonTypeTo { .. } + | CoerceTypeToJson { .. } + | CoerceIncompatibleTypes { .. } + | ValueInvalidResolution { .. } + | ValueParseType { .. } + | ValueParseInt { .. } + | ValueParseFloat { .. } + | ValueParseBoolean { .. } + | ValueDefaultValueUnsupported { .. } + | ValueUnsupportedNumberType { .. } + | ValueUnsupportedYamlType { .. } + | ValueYamlKeyMustBeString { .. } + | YamlLoad { .. } + | YamlParse { .. } + | PrepareValueMustBeObject { .. } + | ColumnOptions { .. } + | UnsupportedIndexType { .. } + | UnsupportedNumberType { .. } + | IdentifyPipelineColumnTypeMismatch { .. } + | JsonPathParse { .. } + | JsonPathParseResultIndex { .. } + | FieldRequiredForDispatcher + | TableSuffixRequiredForDispatcherRule + | ValueRequiredForDispatcherRule + | ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments, + } } fn as_any(&self) -> &dyn Any { diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 5493dbbdf4..42af8af22e 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -13,16 +13,11 @@ // limitations under the License. #![allow(dead_code)] - -pub mod error; pub mod field; pub mod processor; pub mod transform; pub mod value; -use error::{ - IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu, -}; use processor::{Processor, Processors}; use snafu::{ensure, OptionExt, ResultExt}; use transform::{Transformer, Transforms}; @@ -30,7 +25,9 @@ use value::Value; use yaml_rust::YamlLoader; use crate::dispatcher::{Dispatcher, Rule}; -use crate::etl::error::Result; +use crate::error::{ + IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu, +}; const DESCRIPTION: &str = "description"; const PROCESSORS: &str = "processors"; diff --git a/src/pipeline/src/etl/field.rs b/src/pipeline/src/etl/field.rs index dd4835ec92..46bc1b770c 100644 --- a/src/pipeline/src/etl/field.rs +++ b/src/pipeline/src/etl/field.rs @@ -17,8 +17,7 @@ use std::str::FromStr; use snafu::OptionExt; -use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu}; -use crate::etl::error::{Error, Result}; +use crate::error::{EmptyInputFieldSnafu, Error, MissingInputFieldSnafu, Result}; /// Raw processor-defined inputs and outputs #[derive(Debug, Default, Clone)] diff --git a/src/pipeline/src/etl/processor.rs b/src/pipeline/src/etl/processor.rs index eeb5a30e9c..6753d0ee8a 100644 --- a/src/pipeline/src/etl/processor.rs +++ b/src/pipeline/src/etl/processor.rs @@ -45,15 +45,13 @@ use snafu::{OptionExt, ResultExt}; use timestamp::TimestampProcessor; use urlencoding::UrlEncodingProcessor; -use super::error::{ - FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu, - ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, -}; use super::field::{Field, Fields}; use super::PipelineMap; -use crate::etl::error::{Error, Result}; +use crate::error::{ + Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu, + ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu, +}; use crate::etl::processor::simple_extract::SimpleExtractProcessor; -use crate::etl_error::UnsupportedProcessorSnafu; const FIELD_NAME: &str = "field"; const FIELDS_NAME: &str = "fields"; diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index 18c6e71998..57e0695098 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -19,7 +19,7 @@ use snafu::{OptionExt, ResultExt}; use urlencoding::decode; -use crate::etl::error::{ +use crate::error::{ CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu, FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, diff --git a/src/pipeline/src/etl/processor/csv.rs b/src/pipeline/src/etl/processor/csv.rs index 2fe130c600..bef4655033 100644 --- a/src/pipeline/src/etl/processor/csv.rs +++ b/src/pipeline/src/etl/processor/csv.rs @@ -19,7 +19,7 @@ use itertools::EitherOrBoth::{Both, Left, Right}; use itertools::Itertools; use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; diff --git a/src/pipeline/src/etl/processor/date.rs b/src/pipeline/src/etl/processor/date.rs index 0af0424423..1ba4dcd605 100644 --- a/src/pipeline/src/etl/processor/date.rs +++ b/src/pipeline/src/etl/processor/date.rs @@ -19,7 +19,7 @@ use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu, DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result, diff --git a/src/pipeline/src/etl/processor/decolorize.rs b/src/pipeline/src/etl/processor/decolorize.rs index fa70f4a288..69053c8743 100644 --- a/src/pipeline/src/etl/processor/decolorize.rs +++ b/src/pipeline/src/etl/processor/decolorize.rs @@ -22,7 +22,7 @@ use once_cell::sync::Lazy; use regex::Regex; use snafu::OptionExt; -use crate::etl::error::{ +use crate::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; use crate::etl::field::Fields; diff --git a/src/pipeline/src/etl/processor/digest.rs b/src/pipeline/src/etl/processor/digest.rs index b93af08d3c..334bfd6e63 100644 --- a/src/pipeline/src/etl/processor/digest.rs +++ b/src/pipeline/src/etl/processor/digest.rs @@ -24,8 +24,9 @@ use std::borrow::Cow; use regex::Regex; use snafu::OptionExt; -use crate::etl::error::{ - Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, +use crate::error::{ + DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, + ProcessorMissingFieldSnafu, Result, }; use crate::etl::field::Fields; use crate::etl::processor::{ @@ -33,7 +34,6 @@ use crate::etl::processor::{ }; use crate::etl::value::Value; use crate::etl::PipelineMap; -use crate::etl_error::DigestPatternInvalidSnafu; pub(crate) const PROCESSOR_DIGEST: &str = "digest"; diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index 2a41d75923..8c31f42ace 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -18,7 +18,7 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; use itertools::Itertools; use snafu::OptionExt; -use crate::etl::error::{ +use crate::error::{ DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu, DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu, DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu, diff --git a/src/pipeline/src/etl/processor/epoch.rs b/src/pipeline/src/etl/processor/epoch.rs index da638def9b..32124f92f4 100644 --- a/src/pipeline/src/etl/processor/epoch.rs +++ b/src/pipeline/src/etl/processor/epoch.rs @@ -14,7 +14,7 @@ use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result, }; diff --git a/src/pipeline/src/etl/processor/gsub.rs b/src/pipeline/src/etl/processor/gsub.rs index 8950b418df..519a6d5f8c 100644 --- a/src/pipeline/src/etl/processor/gsub.rs +++ b/src/pipeline/src/etl/processor/gsub.rs @@ -15,7 +15,7 @@ use regex::Regex; use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result, }; diff --git a/src/pipeline/src/etl/processor/join.rs b/src/pipeline/src/etl/processor/join.rs index 64cf6d425b..e70007ca95 100644 --- a/src/pipeline/src/etl/processor/join.rs +++ b/src/pipeline/src/etl/processor/join.rs @@ -14,7 +14,7 @@ use snafu::OptionExt; -use crate::etl::error::{ +use crate::error::{ Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; diff --git a/src/pipeline/src/etl/processor/json_path.rs b/src/pipeline/src/etl/processor/json_path.rs index 6b0e97f448..436408f9a3 100644 --- a/src/pipeline/src/etl/processor/json_path.rs +++ b/src/pipeline/src/etl/processor/json_path.rs @@ -19,12 +19,11 @@ use super::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME, }; -use crate::etl::error::{Error, Result}; -use crate::etl::field::Fields; -use crate::etl_error::{ - JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu, - ProcessorMissingFieldSnafu, +use crate::error::{ + Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu, + ProcessorMissingFieldSnafu, Result, }; +use crate::etl::field::Fields; use crate::Value; pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path"; diff --git a/src/pipeline/src/etl/processor/letter.rs b/src/pipeline/src/etl/processor/letter.rs index 1d4d248b87..7e439b59e3 100644 --- a/src/pipeline/src/etl/processor/letter.rs +++ b/src/pipeline/src/etl/processor/letter.rs @@ -14,7 +14,7 @@ use snafu::OptionExt; -use crate::etl::error::{ +use crate::error::{ Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, }; diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index a08b944725..d31fe5e189 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -22,7 +22,7 @@ use lazy_static::lazy_static; use regex::Regex; use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu, Result, diff --git a/src/pipeline/src/etl/processor/simple_extract.rs b/src/pipeline/src/etl/processor/simple_extract.rs index 6054120c1b..ebac88f937 100644 --- a/src/pipeline/src/etl/processor/simple_extract.rs +++ b/src/pipeline/src/etl/processor/simple_extract.rs @@ -14,13 +14,12 @@ use snafu::OptionExt as _; -use crate::etl::error::{Error, Result}; +use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result}; use crate::etl::field::Fields; use crate::etl::processor::{ yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME, IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME, }; -use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu}; use crate::{PipelineMap, Processor, Value}; pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract"; diff --git a/src/pipeline/src/etl/processor/timestamp.rs b/src/pipeline/src/etl/processor/timestamp.rs index a4d215ed49..6d4c228025 100644 --- a/src/pipeline/src/etl/processor/timestamp.rs +++ b/src/pipeline/src/etl/processor/timestamp.rs @@ -19,7 +19,7 @@ use chrono_tz::Tz; use lazy_static::lazy_static; use snafu::{OptionExt, ResultExt}; -use crate::etl::error::{ +use crate::error::{ DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu, DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error, KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, diff --git a/src/pipeline/src/etl/processor/urlencoding.rs b/src/pipeline/src/etl/processor/urlencoding.rs index 33d3f521a1..95209bce5a 100644 --- a/src/pipeline/src/etl/processor/urlencoding.rs +++ b/src/pipeline/src/etl/processor/urlencoding.rs @@ -15,7 +15,7 @@ use snafu::{OptionExt, ResultExt}; use urlencoding::{decode, encode}; -use crate::etl::error::{ +use crate::error::{ Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result, UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu, }; diff --git a/src/pipeline/src/etl/transform.rs b/src/pipeline/src/etl/transform.rs index 27afeaa7de..076fd89acf 100644 --- a/src/pipeline/src/etl/transform.rs +++ b/src/pipeline/src/etl/transform.rs @@ -17,7 +17,14 @@ pub mod transformer; use snafu::OptionExt; -use crate::etl::error::{Error, Result}; +use super::field::Fields; +use super::processor::{yaml_new_field, yaml_new_fields, yaml_string}; +use super::value::Timestamp; +use super::PipelineMap; +use crate::error::{ + Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu, + TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu, +}; use crate::etl::processor::yaml_bool; use crate::etl::transform::index::Index; use crate::etl::value::Value; @@ -32,15 +39,6 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure"; pub use transformer::greptime::GreptimeTransformer; -use super::error::{ - KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu, - TransformTypeMustBeSetSnafu, -}; -use super::field::Fields; -use super::processor::{yaml_new_field, yaml_new_fields, yaml_string}; -use super::value::Timestamp; -use super::PipelineMap; - pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static { type Output; type VecOutput; diff --git a/src/pipeline/src/etl/transform/index.rs b/src/pipeline/src/etl/transform/index.rs index 0300c56838..53e16dca22 100644 --- a/src/pipeline/src/etl/transform/index.rs +++ b/src/pipeline/src/etl/transform/index.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu}; +use crate::error::{Error, Result, UnsupportedIndexTypeSnafu}; const INDEX_TIMESTAMP: &str = "timestamp"; const INDEX_TIMEINDEX: &str = "time"; diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 7b4dab958a..008964dc79 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -27,7 +27,7 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; use serde_json::Number; -use crate::etl::error::{ +use crate::error::{ IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index df6ab2fff9..71c83dc477 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use snafu::ResultExt; -use crate::etl::error::{ +use crate::error::{ CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu, CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result, diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index cfe774f8bf..ece852056a 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -28,13 +28,12 @@ use regex::Regex; use snafu::{OptionExt, ResultExt}; pub use time::Timestamp; -use super::error::{ - ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu, - ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu, - ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu, -}; use super::PipelineMap; -use crate::etl::error::{Error, Result}; +use crate::error::{ + Error, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, + ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, + ValueUnsupportedNumberTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu, +}; /// Value can be used as type /// acts as value: the enclosed value is the actual value diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 47ba67561a..a471ce721e 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -13,22 +13,22 @@ // limitations under the License. mod dispatcher; +pub mod error; mod etl; mod manager; mod metrics; -pub use etl::error::Result; pub use etl::processor::Processor; pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo}; pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; pub use etl::{ - error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse, - Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap, + json_array_to_intermediate_state, json_to_intermediate_state, parse, Content, DispatchedTo, + Pipeline, PipelineExecOutput, PipelineMap, }; pub use manager::{ - error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef, + pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME, }; diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index 77d0f9b9f6..479f5fb340 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -19,10 +19,10 @@ use datatypes::timestamp::TimestampNanosecond; use itertools::Itertools; use util::to_pipeline_version; +use crate::error::Result; use crate::table::PipelineTable; use crate::{GreptimeTransformer, Pipeline}; -pub mod error; pub mod pipeline_operator; pub mod table; pub mod util; @@ -99,7 +99,7 @@ impl PipelineWay { name: Option<&str>, version: Option<&str>, default_pipeline: PipelineWay, - ) -> error::Result { + ) -> Result { if let Some(pipeline_name) = name { if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME { Ok(PipelineWay::OtlpTraceDirectV1) diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs deleted file mode 100644 index a51ad61dac..0000000000 --- a/src/pipeline/src/manager/error.rs +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; - -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; -use common_macro::stack_trace_debug; -use datatypes::timestamp::TimestampNanosecond; -use snafu::{Location, Snafu}; - -#[derive(Snafu)] -#[snafu(visibility(pub))] -#[stack_trace_debug] -pub enum Error { - #[snafu(display("Pipeline table not found"))] - PipelineTableNotFound { - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to insert pipeline to pipelines table"))] - InsertPipeline { - #[snafu(source)] - source: operator::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to parse pipeline"))] - CompilePipeline { - #[snafu(source)] - source: crate::etl::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))] - PipelineNotFound { - name: String, - version: Option, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to collect record batch"))] - CollectRecords { - #[snafu(implicit)] - location: Location, - #[snafu(source)] - source: common_recordbatch::error::Error, - }, - - #[snafu(display("Failed to cast type, msg: {}", msg))] - CastType { - msg: String, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to build DataFusion logical plan"))] - BuildDfLogicalPlan { - #[snafu(source)] - error: datafusion_common::DataFusionError, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to execute internal statement"))] - ExecuteInternalStatement { - #[snafu(source)] - source: query::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to create dataframe"))] - DataFrame { - #[snafu(source)] - source: query::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("General catalog error"))] - Catalog { - #[snafu(source)] - source: catalog::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to create table"))] - CreateTable { - #[snafu(source)] - source: operator::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Failed to execute pipeline"))] - PipelineTransform { - #[snafu(source)] - source: crate::etl::error::Error, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Invalid pipeline version format: {}", version))] - InvalidPipelineVersion { - version: String, - #[snafu(implicit)] - location: Location, - }, -} - -pub type Result = std::result::Result; - -impl ErrorExt for Error { - fn status_code(&self) -> StatusCode { - use Error::*; - match self { - CastType { .. } => StatusCode::Unexpected, - PipelineTableNotFound { .. } => StatusCode::TableNotFound, - InsertPipeline { source, .. } => source.status_code(), - CollectRecords { source, .. } => source.status_code(), - PipelineNotFound { .. } - | CompilePipeline { .. } - | PipelineTransform { .. } - | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments, - BuildDfLogicalPlan { .. } => StatusCode::Internal, - ExecuteInternalStatement { source, .. } => source.status_code(), - DataFrame { source, .. } => source.status_code(), - Catalog { source, .. } => source.status_code(), - CreateTable { source, .. } => source.status_code(), - } - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index c2a36c63ec..94d9a199c0 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -41,9 +41,9 @@ use table::metadata::TableInfo; use table::TableRef; use crate::error::{ - BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu, - DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu, - InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result, + BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu, + ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu, + PipelineNotFoundSnafu, Result, }; use crate::etl::transform::GreptimeTransformer; use crate::etl::{parse, Content, Pipeline}; @@ -204,7 +204,7 @@ impl PipelineTable { /// Compile a pipeline from a string. pub fn compile_pipeline(pipeline: &str) -> Result> { let yaml_content = Content::Yaml(pipeline); - parse::(&yaml_content).context(CompilePipelineSnafu) + parse::(&yaml_content) } /// Insert a pipeline into the pipeline table. diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 6c76942d05..b850a4d938 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -151,7 +151,7 @@ pub enum Error { #[snafu(display("Failed to describe statement"))] DescribeStatement { source: BoxedError }, - #[snafu(display("Pipeline management api error"))] + #[snafu(display("Pipeline error"))] Pipeline { #[snafu(source)] source: pipeline::error::Error, @@ -159,14 +159,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Pipeline transform error"))] - PipelineTransform { - #[snafu(source)] - source: pipeline::etl_error::Error, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Not supported: {}", feat))] NotSupported { feat: String }, @@ -661,7 +653,6 @@ impl ErrorExt for Error { | CheckDatabaseValidity { source, .. } => source.status_code(), Pipeline { source, .. } => source.status_code(), - PipelineTransform { source, .. } => source.status_code(), NotSupported { .. } | InvalidParameter { .. } diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 2759a07102..93bc5f8f73 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -32,7 +32,6 @@ use common_telemetry::{error, warn}; use datatypes::value::column_data_to_json; use headers::ContentType; use lazy_static::lazy_static; -use pipeline::error::PipelineTransformSnafu; use pipeline::util::to_pipeline_version; use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion}; use serde::{Deserialize, Serialize}; @@ -284,9 +283,7 @@ async fn dryrun_pipeline_inner( &pipeline_handler, PipelineDefinition::Resolved(pipeline), ¶ms, - pipeline::json_array_to_intermediate_state(value) - .context(PipelineTransformSnafu) - .context(PipelineSnafu)?, + pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?, "dry_run".to_owned(), query_ctx, true, @@ -636,9 +633,7 @@ pub(crate) async fn ingest_logs_inner( &state, PipelineDefinition::from_name(&pipeline_name, version), &pipeline_params, - pipeline::json_array_to_intermediate_state(request.values) - .context(PipelineTransformSnafu) - .context(PipelineSnafu)?, + pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?, request.table, &query_ctx, true, diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index c4b59a866a..10dc8b5cc4 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -32,7 +32,7 @@ use snafu::{ensure, ResultExt}; use super::trace::attributes::OtlpAnyValue; use super::utils::{bytes_to_hex_string, key_value_to_jsonb}; use crate::error::{ - IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result, + IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, }; use crate::pipeline::run_pipeline; @@ -72,8 +72,7 @@ pub async fn to_grpc_insert_requests( } PipelineWay::Pipeline(pipeline_def) => { let data = parse_export_logs_service_request(request); - let array = - pipeline::json_array_to_intermediate_state(data).context(PipelineTransformSnafu)?; + let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?; let inserts = run_pipeline( &pipeline_handler, diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index e952e4ba8a..0b51da94ea 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -23,7 +23,7 @@ use pipeline::{ use session::context::QueryContextRef; use snafu::ResultExt; -use crate::error::{CatalogSnafu, PipelineTransformSnafu, Result}; +use crate::error::{CatalogSnafu, PipelineSnafu, Result}; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, }; @@ -74,7 +74,7 @@ pub(crate) async fn run_pipeline( table_name, }] }) - .context(PipelineTransformSnafu) + .context(PipelineSnafu) } else { let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?; @@ -91,7 +91,7 @@ pub(crate) async fn run_pipeline( .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) .observe(transform_timer.elapsed().as_secs_f64()); }) - .context(PipelineTransformSnafu)?; + .context(PipelineSnafu)?; match r { PipelineExecOutput::Transformed(row) => {