chore: merge error files under pipeline crate (#5738)

shuiyisong authored on 2025-03-19 17:55:51 +08:00, committed by GitHub
parent cd730e0486
commit 2431cd3bdf
34 changed files with 252 additions and 245 deletions

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline, Result};
use pipeline::error::Result;
use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline};
use serde_json::{Deserializer, Value};
fn processor_mut(

View File

@@ -16,7 +16,7 @@ use common_telemetry::debug;
use snafu::OptionExt;
use yaml_rust::Yaml;
use crate::etl::error::{
use crate::error::{
Error, FieldRequiredForDispatcherSnafu, Result, TableSuffixRequiredForDispatcherRuleSnafu,
ValueRequiredForDispatcherRuleSnafu,
};

View File

@@ -17,6 +17,7 @@ use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
#[derive(Snafu)]
@@ -51,7 +52,7 @@ pub enum Error {
#[snafu(display("Processor {processor}: expect string value, but got {v:?}"))]
ProcessorExpectString {
processor: String,
v: crate::etl::Value,
v: crate::Value,
#[snafu(implicit)]
location: Location,
},
@@ -607,13 +608,197 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline table not found"))]
PipelineTableNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
PipelineNotFound {
name: String,
version: Option<TimestampNanosecond>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch"))]
CollectRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create dataframe"))]
DataFrame {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("General catalog error"))]
Catalog {
#[snafu(source)]
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create table"))]
CreateTable {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
StatusCode::InvalidArguments
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. } | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
EmptyInputField { .. }
| MissingInputField { .. }
| ProcessorMustBeMap { .. }
| ProcessorMissingField { .. }
| ProcessorExpectString { .. }
| ProcessorUnsupportedValue { .. }
| ProcessorKeyMustBeString { .. }
| ProcessorFailedToParseString { .. }
| ProcessorMustHaveStringKey { .. }
| UnsupportedProcessor { .. }
| FieldMustBeType { .. }
| FailedParseFieldFromString { .. }
| FailedToParseIntKey { .. }
| FailedToParseInt { .. }
| FailedToParseFloatKey { .. }
| IntermediateKeyIndex { .. }
| CmcdMissingValue { .. }
| CmcdMissingKey { .. }
| KeyMustBeString { .. }
| CsvRead { .. }
| CsvNoRecord { .. }
| CsvSeparatorName { .. }
| CsvQuoteName { .. }
| DateParseTimezone { .. }
| DateParse { .. }
| DateFailedToGetLocalTimezone { .. }
| DateFailedToGetTimestamp { .. }
| DateInvalidFormat { .. }
| DissectInvalidPattern { .. }
| DissectEmptyPattern { .. }
| DissectSplitExceedsInput { .. }
| DissectSplitNotMatchInput { .. }
| DissectConsecutiveNames { .. }
| DissectNoMatchingPattern { .. }
| DissectModifierAlreadySet { .. }
| DissectAppendOrderAlreadySet { .. }
| DissectOrderOnlyAppend { .. }
| DissectOrderOnlyAppendModifier { .. }
| DissectEndModifierAlreadySet { .. }
| EpochInvalidResolution { .. }
| GsubPatternRequired { .. }
| GsubReplacementRequired { .. }
| Regex { .. }
| JoinSeparatorRequired { .. }
| LetterInvalidMethod { .. }
| RegexNamedGroupNotFound { .. }
| RegexNoValidField { .. }
| RegexNoValidPattern { .. }
| UrlEncodingInvalidMethod { .. }
| DigestPatternInvalid { .. }
| UrlEncodingDecode { .. }
| TransformOnFailureInvalidValue { .. }
| TransformElementMustBeMap { .. }
| TransformTypeMustBeSet { .. }
| TransformEmpty { .. }
| TransformColumnNameMustBeUnique { .. }
| TransformMultipleTimestampIndex { .. }
| TransformTimestampIndexCount { .. }
| CoerceUnsupportedNullType { .. }
| CoerceUnsupportedNullTypeTo { .. }
| CoerceUnsupportedEpochType { .. }
| CoerceStringToType { .. }
| CoerceJsonTypeTo { .. }
| CoerceTypeToJson { .. }
| CoerceIncompatibleTypes { .. }
| ValueInvalidResolution { .. }
| ValueParseType { .. }
| ValueParseInt { .. }
| ValueParseFloat { .. }
| ValueParseBoolean { .. }
| ValueDefaultValueUnsupported { .. }
| ValueUnsupportedNumberType { .. }
| ValueUnsupportedYamlType { .. }
| ValueYamlKeyMustBeString { .. }
| YamlLoad { .. }
| YamlParse { .. }
| PrepareValueMustBeObject { .. }
| ColumnOptions { .. }
| UnsupportedIndexType { .. }
| UnsupportedNumberType { .. }
| IdentifyPipelineColumnTypeMismatch { .. }
| JsonPathParse { .. }
| JsonPathParseResultIndex { .. }
| FieldRequiredForDispatcher
| TableSuffixRequiredForDispatcherRule
| ValueRequiredForDispatcherRule
| ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments,
}
}
fn as_any(&self) -> &dyn Any {
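
The hunk above folds the former manager-side variants (PipelineTableNotFound, InsertPipeline, and friends) into the single ETL-side enum and gives the combined enum one status_code mapping. As a point of reference, here is a minimal, self-contained sketch of the snafu pattern this enum relies on; the variant names and helper functions below are illustrative stand-ins, not the crate's actual definitions:

```rust
use std::collections::HashMap;

use snafu::{OptionExt, ResultExt, Snafu};

// One enum for every failure mode, with per-variant display messages.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    #[snafu(display("Processor {processor}: missing field {field}"))]
    ProcessorMissingField { processor: String, field: String },

    #[snafu(display("Failed to parse int: {input}"))]
    FailedToParseInt {
        input: String,
        #[snafu(source)]
        error: std::num::ParseIntError,
    },
}

pub type Result<T> = std::result::Result<T, Error>;

// `OptionExt::context` turns a missing value into a typed variant.
fn require_field<'a>(map: &'a HashMap<String, String>, key: &str) -> Result<&'a String> {
    map.get(key).context(ProcessorMissingFieldSnafu {
        processor: "example",
        field: key,
    })
}

// `ResultExt::context` attaches the underlying parse error as the variant's source.
fn parse_port(raw: &str) -> Result<u16> {
    raw.parse::<u16>().context(FailedToParseIntSnafu { input: raw })
}

fn main() {
    let map = HashMap::from([("port".to_string(), "8080".to_string())]);
    println!("{:?}", require_field(&map, "port").and_then(|v| parse_port(v)));
    println!("{:?}", require_field(&map, "host"));
}
```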

View File

@@ -13,16 +13,11 @@
// limitations under the License.
#![allow(dead_code)]
pub mod error;
pub mod field;
pub mod processor;
pub mod transform;
pub mod value;
use error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use processor::{Processor, Processors};
use snafu::{ensure, OptionExt, ResultExt};
use transform::{Transformer, Transforms};
@@ -30,7 +25,9 @@ use value::Value;
use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::etl::error::Result;
use crate::error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
};
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";

View File

@@ -17,8 +17,7 @@ use std::str::FromStr;
use snafu::OptionExt;
use super::error::{EmptyInputFieldSnafu, MissingInputFieldSnafu};
use crate::etl::error::{Error, Result};
use crate::error::{EmptyInputFieldSnafu, Error, MissingInputFieldSnafu, Result};
/// Raw processor-defined inputs and outputs
#[derive(Debug, Default, Clone)]

View File

@@ -45,15 +45,13 @@ use snafu::{OptionExt, ResultExt};
use timestamp::TimestampProcessor;
use urlencoding::UrlEncodingProcessor;
use super::error::{
FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu,
};
use super::field::{Field, Fields};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::error::{
Error, FailedParseFieldFromStringSnafu, FieldMustBeTypeSnafu, ProcessorKeyMustBeStringSnafu,
ProcessorMustBeMapSnafu, ProcessorMustHaveStringKeySnafu, Result, UnsupportedProcessorSnafu,
};
use crate::etl::processor::simple_extract::SimpleExtractProcessor;
use crate::etl_error::UnsupportedProcessorSnafu;
const FIELD_NAME: &str = "field";
const FIELDS_NAME: &str = "fields";

View File

@@ -19,7 +19,7 @@
use snafu::{OptionExt, ResultExt};
use urlencoding::decode;
use crate::etl::error::{
use crate::error::{
CmcdMissingKeySnafu, CmcdMissingValueSnafu, Error, FailedToParseFloatKeySnafu,
FailedToParseIntKeySnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,

View File

@@ -19,7 +19,7 @@ use itertools::EitherOrBoth::{Both, Left, Right};
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
CsvNoRecordSnafu, CsvQuoteNameSnafu, CsvReadSnafu, CsvSeparatorNameSnafu, Error,
KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};

View File

@@ -19,7 +19,7 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateParseSnafu,
DateParseTimezoneSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu, Result,

View File

@@ -22,7 +22,7 @@ use once_cell::sync::Lazy;
use regex::Regex;
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;

View File

@@ -24,8 +24,9 @@ use std::borrow::Cow;
use regex::Regex;
use snafu::OptionExt;
use crate::etl::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
use crate::error::{
DigestPatternInvalidSnafu, Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;
use crate::etl::processor::{
@@ -33,7 +34,6 @@ use crate::etl::processor::{
};
use crate::etl::value::Value;
use crate::etl::PipelineMap;
use crate::etl_error::DigestPatternInvalidSnafu;
pub(crate) const PROCESSOR_DIGEST: &str = "digest";

View File

@@ -18,7 +18,7 @@ use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use itertools::Itertools;
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
DissectAppendOrderAlreadySetSnafu, DissectConsecutiveNamesSnafu, DissectEmptyPatternSnafu,
DissectEndModifierAlreadySetSnafu, DissectInvalidPatternSnafu, DissectModifierAlreadySetSnafu,
DissectNoMatchingPatternSnafu, DissectOrderOnlyAppendModifierSnafu,

View File

@@ -14,7 +14,7 @@
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
EpochInvalidResolutionSnafu, Error, FailedToParseIntSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, ProcessorUnsupportedValueSnafu, Result,
};

View File

@@ -15,7 +15,7 @@
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
Error, GsubPatternRequiredSnafu, GsubReplacementRequiredSnafu, KeyMustBeStringSnafu,
ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, RegexSnafu, Result,
};

View File

@@ -14,7 +14,7 @@
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, JoinSeparatorRequiredSnafu, KeyMustBeStringSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};

View File

@@ -19,12 +19,11 @@ use super::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, PipelineMap, Processor, FIELDS_NAME,
FIELD_NAME, IGNORE_MISSING_NAME, JSON_PATH_NAME, JSON_PATH_RESULT_INDEX_NAME,
};
use crate::etl::error::{Error, Result};
use crate::etl::field::Fields;
use crate::etl_error::{
JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu,
use crate::error::{
Error, JsonPathParseResultIndexSnafu, JsonPathParseSnafu, KeyMustBeStringSnafu,
ProcessorMissingFieldSnafu, Result,
};
use crate::etl::field::Fields;
use crate::Value;
pub(crate) const PROCESSOR_JSON_PATH: &str = "json_path";

View File

@@ -14,7 +14,7 @@
use snafu::OptionExt;
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, LetterInvalidMethodSnafu, ProcessorExpectStringSnafu,
ProcessorMissingFieldSnafu, Result,
};

View File

@@ -22,7 +22,7 @@ use lazy_static::lazy_static;
use regex::Regex;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu,
RegexNamedGroupNotFoundSnafu, RegexNoValidFieldSnafu, RegexNoValidPatternSnafu, RegexSnafu,
Result,

View File

@@ -14,13 +14,12 @@
use snafu::OptionExt as _;
use crate::etl::error::{Error, Result};
use crate::error::{Error, KeyMustBeStringSnafu, ProcessorMissingFieldSnafu, Result};
use crate::etl::field::Fields;
use crate::etl::processor::{
yaml_bool, yaml_new_field, yaml_new_fields, yaml_string, FIELDS_NAME, FIELD_NAME,
IGNORE_MISSING_NAME, SIMPLE_EXTRACT_KEY_NAME,
};
use crate::etl_error::{KeyMustBeStringSnafu, ProcessorMissingFieldSnafu};
use crate::{PipelineMap, Processor, Value};
pub(crate) const PROCESSOR_SIMPLE_EXTRACT: &str = "simple_extract";

View File

@@ -19,7 +19,7 @@ use chrono_tz::Tz;
use lazy_static::lazy_static;
use snafu::{OptionExt, ResultExt};
use crate::etl::error::{
use crate::error::{
DateFailedToGetLocalTimezoneSnafu, DateFailedToGetTimestampSnafu, DateInvalidFormatSnafu,
DateParseSnafu, DateParseTimezoneSnafu, EpochInvalidResolutionSnafu, Error,
KeyMustBeStringSnafu, ProcessorFailedToParseStringSnafu, ProcessorMissingFieldSnafu,

View File

@@ -15,7 +15,7 @@
use snafu::{OptionExt, ResultExt};
use urlencoding::{decode, encode};
use crate::etl::error::{
use crate::error::{
Error, KeyMustBeStringSnafu, ProcessorExpectStringSnafu, ProcessorMissingFieldSnafu, Result,
UrlEncodingDecodeSnafu, UrlEncodingInvalidMethodSnafu,
};

View File

@@ -17,7 +17,14 @@ pub mod transformer;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
use super::PipelineMap;
use crate::error::{
Error, KeyMustBeStringSnafu, Result, TransformElementMustBeMapSnafu,
TransformOnFailureInvalidValueSnafu, TransformTypeMustBeSetSnafu,
};
use crate::etl::processor::yaml_bool;
use crate::etl::transform::index::Index;
use crate::etl::value::Value;
@@ -32,15 +39,6 @@ const TRANSFORM_ON_FAILURE: &str = "on_failure";
pub use transformer::greptime::GreptimeTransformer;
use super::error::{
KeyMustBeStringSnafu, TransformElementMustBeMapSnafu, TransformOnFailureInvalidValueSnafu,
TransformTypeMustBeSetSnafu,
};
use super::field::Fields;
use super::processor::{yaml_new_field, yaml_new_fields, yaml_string};
use super::value::Timestamp;
use super::PipelineMap;
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
type Output;
type VecOutput;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::etl::error::{Error, Result, UnsupportedIndexTypeSnafu};
use crate::error::{Error, Result, UnsupportedIndexTypeSnafu};
const INDEX_TIMESTAMP: &str = "timestamp";
const INDEX_TIMEINDEX: &str = "time";

View File

@@ -27,7 +27,7 @@ use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::Number;
use crate::etl::error::{
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,

View File

@@ -20,7 +20,7 @@ use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType};
use snafu::ResultExt;
use crate::etl::error::{
use crate::error::{
CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu,
CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu,
CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result,

View File

@@ -28,13 +28,12 @@ use regex::Regex;
use snafu::{OptionExt, ResultExt};
pub use time::Timestamp;
use super::error::{
ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu, ValueParseBooleanSnafu,
ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu, ValueUnsupportedNumberTypeSnafu,
ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
use super::PipelineMap;
use crate::etl::error::{Error, Result};
use crate::error::{
Error, Result, ValueDefaultValueUnsupportedSnafu, ValueInvalidResolutionSnafu,
ValueParseBooleanSnafu, ValueParseFloatSnafu, ValueParseIntSnafu, ValueParseTypeSnafu,
ValueUnsupportedNumberTypeSnafu, ValueUnsupportedYamlTypeSnafu, ValueYamlKeyMustBeStringSnafu,
};
/// Value can be used as type
/// acts as value: the enclosed value is the actual value

View File

@@ -13,22 +13,22 @@
// limitations under the License.
mod dispatcher;
pub mod error;
mod etl;
mod manager;
mod metrics;
pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{
error as etl_error, json_array_to_intermediate_state, json_to_intermediate_state, parse,
Content, DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
json_array_to_intermediate_state, json_to_intermediate_state, parse, Content, DispatchedTo,
Pipeline, PipelineExecOutput, PipelineMap,
};
pub use manager::{
error, pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef,
PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
};
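
The lib.rs hunk above drops the `etl_error` alias and the nested `manager::error` re-export, leaving one crate-level `pub mod error`. A compact sketch of the resulting layout, with illustrative module contents rather than the real ones:

```rust
// Illustrative crate layout after the merge: a single crate-level error module,
// with submodules importing from it instead of defining their own.
pub mod error {
    #[derive(Debug)]
    pub enum Error {
        Etl(String),     // failures formerly in etl::error::Error
        Manager(String), // failures formerly in manager::error::Error
    }

    pub type Result<T> = std::result::Result<T, Error>;
}

mod etl {
    use crate::error::{Error, Result};

    pub fn parse(input: &str) -> Result<&str> {
        if input.is_empty() {
            return Err(Error::Etl("empty input".to_string()));
        }
        Ok(input)
    }
}

// The crate root re-exports the API; callers reach errors via `error::...` only.
pub use etl::parse;

fn main() {
    println!("{:?}", parse("processors: []"));
    println!("{:?}", parse(""));
}
```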

View File

@@ -19,10 +19,10 @@ use datatypes::timestamp::TimestampNanosecond;
use itertools::Itertools;
use util::to_pipeline_version;
use crate::error::Result;
use crate::table::PipelineTable;
use crate::{GreptimeTransformer, Pipeline};
pub mod error;
pub mod pipeline_operator;
pub mod table;
pub mod util;
@@ -99,7 +99,7 @@ impl PipelineWay {
name: Option<&str>,
version: Option<&str>,
default_pipeline: PipelineWay,
) -> error::Result<PipelineWay> {
) -> Result<PipelineWay> {
if let Some(pipeline_name) = name {
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
Ok(PipelineWay::OtlpTraceDirectV1)

View File

@@ -1,153 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::timestamp::TimestampNanosecond;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Pipeline table not found"))]
PipelineTableNotFound {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to insert pipeline to pipelines table"))]
InsertPipeline {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to parse pipeline"))]
CompilePipeline {
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
PipelineNotFound {
name: String,
version: Option<TimestampNanosecond>,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to collect record batch"))]
CollectRecords {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to build DataFusion logical plan"))]
BuildDfLogicalPlan {
#[snafu(source)]
error: datafusion_common::DataFusionError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute internal statement"))]
ExecuteInternalStatement {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create dataframe"))]
DataFrame {
#[snafu(source)]
source: query::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("General catalog error"))]
Catalog {
#[snafu(source)]
source: catalog::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to create table"))]
CreateTable {
#[snafu(source)]
source: operator::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to execute pipeline"))]
PipelineTransform {
#[snafu(source)]
source: crate::etl::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid pipeline version format: {}", version))]
InvalidPipelineVersion {
version: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
CastType { .. } => StatusCode::Unexpected,
PipelineTableNotFound { .. } => StatusCode::TableNotFound,
InsertPipeline { source, .. } => source.status_code(),
CollectRecords { source, .. } => source.status_code(),
PipelineNotFound { .. }
| CompilePipeline { .. }
| PipelineTransform { .. }
| InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
BuildDfLogicalPlan { .. } => StatusCode::Internal,
ExecuteInternalStatement { source, .. } => source.status_code(),
DataFrame { source, .. } => source.status_code(),
Catalog { source, .. } => source.status_code(),
CreateTable { source, .. } => source.status_code(),
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -41,9 +41,9 @@ use table::metadata::TableInfo;
use table::TableRef;
use crate::error::{
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
DataFrameSnafu, ExecuteInternalStatementSnafu, InsertPipelineSnafu,
InvalidPipelineVersionSnafu, PipelineNotFoundSnafu, Result,
BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
PipelineNotFoundSnafu, Result,
};
use crate::etl::transform::GreptimeTransformer;
use crate::etl::{parse, Content, Pipeline};
@@ -204,7 +204,7 @@ impl PipelineTable {
/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline);
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
parse::<GreptimeTransformer>(&yaml_content)
}
/// Insert a pipeline into the pipeline table.
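
Since `parse` now returns the same crate-wide `Result` that `compile_pipeline` exposes, the old `CompilePipelineSnafu` wrapping step disappears. A generic, illustrative sketch of why the conversion drops out when caller and callee share one error type:

```rust
// When two layers share one error type, errors are forwarded as-is; a
// `.context(...)` / map_err conversion is only needed across error-type boundaries.
#[derive(Debug)]
struct Error(String);

type Result<T> = std::result::Result<T, Error>;

fn parse(input: &str) -> Result<u32> {
    input
        .trim()
        .parse::<u32>()
        .map_err(|e| Error(format!("failed to parse '{input}': {e}")))
}

// Previously this function had its own error type and wrapped parse()'s error;
// with a shared Result alias it simply returns the callee's result.
fn compile_pipeline(input: &str) -> Result<u32> {
    parse(input)
}

fn main() {
    assert_eq!(compile_pipeline(" 42 ").unwrap(), 42);
    assert!(compile_pipeline("not a number").is_err());
}
```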

View File

@@ -151,7 +151,7 @@ pub enum Error {
#[snafu(display("Failed to describe statement"))]
DescribeStatement { source: BoxedError },
#[snafu(display("Pipeline management api error"))]
#[snafu(display("Pipeline error"))]
Pipeline {
#[snafu(source)]
source: pipeline::error::Error,
@@ -159,14 +159,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Pipeline transform error"))]
PipelineTransform {
#[snafu(source)]
source: pipeline::etl_error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Not supported: {}", feat))]
NotSupported { feat: String },
@@ -661,7 +653,6 @@ impl ErrorExt for Error {
| CheckDatabaseValidity { source, .. } => source.status_code(),
Pipeline { source, .. } => source.status_code(),
PipelineTransform { source, .. } => source.status_code(),
NotSupported { .. }
| InvalidParameter { .. }

View File

@@ -32,7 +32,6 @@ use common_telemetry::{error, warn};
use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion};
use serde::{Deserialize, Serialize};
@@ -284,9 +283,7 @@ async fn dryrun_pipeline_inner(
&pipeline_handler,
PipelineDefinition::Resolved(pipeline),
&params,
pipeline::json_array_to_intermediate_state(value)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?,
pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?,
"dry_run".to_owned(),
query_ctx,
true,
@@ -636,9 +633,7 @@ pub(crate) async fn ingest_logs_inner(
&state,
PipelineDefinition::from_name(&pipeline_name, version),
&pipeline_params,
pipeline::json_array_to_intermediate_state(request.values)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?,
pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?,
request.table,
&query_ctx,
true,
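
On the servers side, the separate `PipelineTransform` variant is gone, so wrapping a pipeline failure now takes a single `.context(PipelineSnafu)` instead of the former two-step chain. A self-contained sketch of that single-wrap pattern, with simplified stand-in types rather than the actual servers::error definitions:

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for the pipeline crate's merged error type.
mod pipeline {
    use snafu::Snafu;

    #[derive(Debug, Snafu)]
    #[snafu(visibility(pub))]
    pub enum Error {
        #[snafu(display("Failed to parse value: {input}"))]
        Parse { input: String },
    }

    pub fn json_array_to_intermediate_state(input: &str) -> Result<usize, Error> {
        if input.trim_start().starts_with('[') {
            Ok(input.len())
        } else {
            ParseSnafu { input }.fail()
        }
    }
}

// Stand-in for the servers crate's error enum: one variant carries the
// pipeline error as its source.
#[derive(Debug, Snafu)]
enum ServerError {
    #[snafu(display("Pipeline error"))]
    Pipeline { source: pipeline::Error },
}

fn ingest(input: &str) -> Result<usize, ServerError> {
    // A single context call converts the pipeline error into the server error.
    pipeline::json_array_to_intermediate_state(input).context(PipelineSnafu)
}

fn main() {
    assert!(ingest("[1, 2]").is_ok());
    assert!(ingest("oops").is_err());
}
```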

View File

@@ -32,7 +32,7 @@ use snafu::{ensure, ResultExt};
use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineTransformSnafu, Result,
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
UnsupportedJsonDataTypeForTagSnafu,
};
use crate::pipeline::run_pipeline;
@@ -72,8 +72,7 @@ pub async fn to_grpc_insert_requests(
}
PipelineWay::Pipeline(pipeline_def) => {
let data = parse_export_logs_service_request(request);
let array =
pipeline::json_array_to_intermediate_state(data).context(PipelineTransformSnafu)?;
let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?;
let inserts = run_pipeline(
&pipeline_handler,

View File

@@ -23,7 +23,7 @@ use pipeline::{
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{CatalogSnafu, PipelineTransformSnafu, Result};
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
@@ -74,7 +74,7 @@ pub(crate) async fn run_pipeline(
table_name,
}]
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)
} else {
let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?;
@@ -91,7 +91,7 @@ pub(crate) async fn run_pipeline(
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)?;
.context(PipelineSnafu)?;
match r {
PipelineExecOutput::Transformed(row) => {