refactor!: make pipeline a required parameter when ingesting trace (#5828)

* feat: make pipeline a required header for trace

* test: add test case without pipeline
This commit is contained in:
Ning Sun
2025-04-07 14:18:17 +08:00
committed by GitHub
parent 1695919ee7
commit f2907bb009
4 changed files with 59 additions and 15 deletions

View File

@@ -710,6 +710,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Pipeline is required for this API."))]
PipelineMissing {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -819,7 +825,8 @@ impl ErrorExt for Error {
| ValueRequiredForDispatcherRule
| ReachedMaxNestedLevels { .. }
| RequiredTableSuffixTemplate
| InvalidTableSuffixTemplate { .. } => StatusCode::InvalidArguments,
| InvalidTableSuffixTemplate { .. }
| PipelineMissing { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -23,7 +23,7 @@ use itertools::Itertools;
use snafu::ensure;
use util::to_pipeline_version;
use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, Result};
use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
use crate::table::PipelineTable;
use crate::{Pipeline, Value};
@@ -72,6 +72,7 @@ impl SelectInfo {
}
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
/// Enum for holding information of a pipeline, which is either pipeline itself,
@@ -114,11 +115,13 @@ impl PipelineWay {
pub fn from_name_and_default(
name: Option<&str>,
version: Option<&str>,
default_pipeline: PipelineWay,
default_pipeline: Option<PipelineWay>,
) -> Result<PipelineWay> {
if let Some(pipeline_name) = name {
if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
Ok(PipelineWay::OtlpTraceDirectV1)
} else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
Ok(PipelineWay::OtlpTraceDirectV0)
} else {
Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
pipeline_name,
@@ -126,8 +129,10 @@ impl PipelineWay {
None,
)?))
}
} else {
} else if let Some(default_pipeline) = default_pipeline {
Ok(default_pipeline)
} else {
PipelineMissingSnafu.fail()
}
}
}

View File

@@ -92,7 +92,7 @@ pub async fn traces(
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpTraceDirectV0,
None,
)
.context(PipelineSnafu)?;
@@ -142,7 +142,7 @@ pub async fn logs(
let pipeline = PipelineWay::from_name_and_default(
pipeline_info.pipeline_name.as_deref(),
pipeline_info.pipeline_version.as_deref(),
PipelineWay::OtlpLogDirect(Box::new(select_info)),
Some(PipelineWay::OtlpLogDirect(Box::new(select_info))),
)
.context(PipelineSnafu)?;
let pipeline_params = pipeline_info.pipeline_params;

View File

@@ -2337,10 +2337,16 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
// write traces data
let res = send_req(
&client,
vec![(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
)],
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static("greptime_trace_v0"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,
@@ -2366,6 +2372,26 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
// write traces data with gzip
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static("greptime_trace_v0"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
true,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// write traces data without pipeline
let res = send_req(
&client,
vec![(
@@ -2377,7 +2403,7 @@ pub async fn test_otlp_traces_v0(store_type: StorageType) {
true,
)
.await;
assert_eq!(StatusCode::OK, res.status());
assert_eq!(StatusCode::BAD_REQUEST, res.status());
// select traces data again
validate_data(
@@ -3045,10 +3071,16 @@ pub async fn test_jaeger_query_api(store_type: StorageType) {
// write traces data.
let res = send_req(
&client,
vec![(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
)],
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static("greptime_trace_v0"),
),
],
"/v1/otlp/v1/traces",
body.clone(),
false,