diff --git a/src/servers/src/http/extractor.rs b/src/servers/src/http/extractor.rs index ae578f21d3..1cb073ebdb 100644 --- a/src/servers/src/http/extractor.rs +++ b/src/servers/src/http/extractor.rs @@ -23,7 +23,8 @@ use pipeline::{GreptimePipelineParams, SelectInfo}; use crate::http::header::constants::{ GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME, - GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, + GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER, + GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, }; /// Axum extractor for optional target log table name from HTTP header @@ -38,7 +39,7 @@ where async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { let headers = &parts.headers; - string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName) + string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName) } } @@ -54,7 +55,8 @@ where async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { let headers = &parts.headers; - string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName) + string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME]) + .map(TraceTableName) } } @@ -71,7 +73,7 @@ where async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { let select = - string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?; + string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?; match select { Some(name) => { @@ -102,12 +104,22 @@ where async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { let headers = &parts.headers; - let pipeline_name = - string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?; - let pipeline_version = - string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?; + let pipeline_name = string_value_from_header( + headers, + &[ + GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, + GREPTIME_PIPELINE_NAME_HEADER_NAME, + ], + )?; + let pipeline_version = string_value_from_header( + headers, + &[ + GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + GREPTIME_PIPELINE_VERSION_HEADER_NAME, + ], + )?; let pipeline_parameters = - string_value_from_header(headers, GREPTIME_PIPELINE_PARAMS_HEADER)?; + string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?; Ok(PipelineInfo { pipeline_name, @@ -120,17 +132,19 @@ where #[inline] fn string_value_from_header( headers: &HeaderMap, - header_key: &str, + header_keys: &[&str], ) -> Result, (StatusCode, String)> { - headers - .get(header_key) - .map(|value| { - String::from_utf8(value.as_bytes().to_vec()).map_err(|_| { + for header_key in header_keys { + if let Some(value) = headers.get(*header_key) { + return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| { ( StatusCode::BAD_REQUEST, format!("`{}` header is not valid UTF-8 string type.", header_key), ) - }) - }) - .transpose() + })) + .transpose(); + } + } + + Ok(None) } diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index bd7f35b9ae..e620e7dd9b 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -45,8 +45,15 @@ pub mod constants { pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name"; pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone"; pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE; + + // Deprecated: pipeline is also used with trace, so we remove log from it. pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name"; pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version"; + + // More generic pipeline header name + pub const GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name"; + pub const GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version"; + pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name"; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c48547b25c..8d16726f58 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -2182,7 +2182,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { HeaderValue::from_static("application/x-protobuf"), ), ( - HeaderName::from_static("x-greptime-log-pipeline-name"), + HeaderName::from_static("x-greptime-pipeline-name"), HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), ), ( @@ -2223,7 +2223,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) { HeaderValue::from_static("application/x-protobuf"), ), ( - HeaderName::from_static("x-greptime-log-pipeline-name"), + HeaderName::from_static("x-greptime-pipeline-name"), HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME), ), (