diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 2b621b7dba..b590e6847d 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -710,6 +710,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Pipeline is required for this API."))] + PipelineMissing { + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -819,7 +825,8 @@ impl ErrorExt for Error { | ValueRequiredForDispatcherRule | ReachedMaxNestedLevels { .. } | RequiredTableSuffixTemplate - | InvalidTableSuffixTemplate { .. } => StatusCode::InvalidArguments, + | InvalidTableSuffixTemplate { .. } + | PipelineMissing { .. } => StatusCode::InvalidArguments, } } diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index b10b22c983..b929cf3c23 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -23,7 +23,7 @@ use itertools::Itertools; use snafu::ensure; use util::to_pipeline_version; -use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, Result}; +use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result}; use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION}; use crate::table::PipelineTable; use crate::{Pipeline, Value}; @@ -72,6 +72,7 @@ impl SelectInfo { } pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity"; +pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0"; pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1"; /// Enum for holding information of a pipeline, which is either pipeline itself, @@ -114,11 +115,13 @@ impl PipelineWay { pub fn from_name_and_default( name: Option<&str>, version: Option<&str>, - default_pipeline: PipelineWay, + default_pipeline: Option, ) -> Result { if let Some(pipeline_name) = name { if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME { Ok(PipelineWay::OtlpTraceDirectV1) + } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME { + Ok(PipelineWay::OtlpTraceDirectV0) } else { Ok(PipelineWay::Pipeline(PipelineDefinition::from_name( pipeline_name, @@ -126,8 +129,10 @@ impl PipelineWay { None, )?)) } - } else { + } else if let Some(default_pipeline) = default_pipeline { Ok(default_pipeline) + } else { + PipelineMissingSnafu.fail() } } } diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 491e6e4868..90f97843f3 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -92,7 +92,7 @@ pub async fn traces( let pipeline = PipelineWay::from_name_and_default( pipeline_info.pipeline_name.as_deref(), pipeline_info.pipeline_version.as_deref(), - PipelineWay::OtlpTraceDirectV0, + None, ) .context(PipelineSnafu)?; @@ -142,7 +142,7 @@ pub async fn logs( let pipeline = PipelineWay::from_name_and_default( pipeline_info.pipeline_name.as_deref(), pipeline_info.pipeline_version.as_deref(), - PipelineWay::OtlpLogDirect(Box::new(select_info)), + Some(PipelineWay::OtlpLogDirect(Box::new(select_info))), ) .context(PipelineSnafu)?; let pipeline_params = pipeline_info.pipeline_params; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1d4e795c9e..018d4e2196 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -2337,10 +2337,16 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) { // write traces data let res = send_req( &client, - vec![( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/x-protobuf"), - )], + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static("greptime_trace_v0"), + ), + ], "/v1/otlp/v1/traces", body.clone(), false, @@ -2366,6 +2372,26 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write traces data with gzip + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static("greptime_trace_v0"), + ), + ], + "/v1/otlp/v1/traces", + body.clone(), + true, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // write traces data without pipeline let res = send_req( &client, vec![( @@ -2377,7 +2403,7 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) { true, ) .await; - assert_eq!(StatusCode::OK, res.status()); + assert_eq!(StatusCode::BAD_REQUEST, res.status()); // select traces data again validate_data( @@ -3045,10 +3071,16 @@ pub async fn test_jaeger_query_api(store_type: StorageType) { // write traces data. let res = send_req( &client, - vec![( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/x-protobuf"), - )], + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-pipeline-name"), + HeaderValue::from_static("greptime_trace_v0"), + ), + ], "/v1/otlp/v1/traces", body.clone(), false,