feat: update pipeline header name to x-greptime-pipeline-name (#5710)

* feat: update pipeline header name to x-greptime-pipeline-name

* refactor: update string_value_from_header
This commit is contained in:
Ning Sun
2025-03-17 19:39:54 -07:00
committed by GitHub
parent 9e63018198
commit 1ab4ddab8d
3 changed files with 40 additions and 19 deletions

View File

@@ -23,7 +23,8 @@ use pipeline::{GreptimePipelineParams, SelectInfo};
use crate::http::header::constants::{
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_PARAMS_HEADER, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_NAME_HEADER_NAME, GREPTIME_PIPELINE_PARAMS_HEADER,
GREPTIME_PIPELINE_VERSION_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
/// Axum extractor for optional target log table name from HTTP header
@@ -38,7 +39,7 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_LOG_TABLE_NAME_HEADER_NAME).map(LogTableName)
string_value_from_header(headers, &[GREPTIME_LOG_TABLE_NAME_HEADER_NAME]).map(LogTableName)
}
}
@@ -54,7 +55,8 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
string_value_from_header(headers, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME).map(TraceTableName)
string_value_from_header(headers, &[GREPTIME_TRACE_TABLE_NAME_HEADER_NAME])
.map(TraceTableName)
}
}
@@ -71,7 +73,7 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let select =
string_value_from_header(&parts.headers, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
string_value_from_header(&parts.headers, &[GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME])?;
match select {
Some(name) => {
@@ -102,12 +104,22 @@ where
async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
let headers = &parts.headers;
let pipeline_name =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME)?;
let pipeline_version =
string_value_from_header(headers, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME)?;
let pipeline_name = string_value_from_header(
headers,
&[
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
GREPTIME_PIPELINE_NAME_HEADER_NAME,
],
)?;
let pipeline_version = string_value_from_header(
headers,
&[
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_PIPELINE_VERSION_HEADER_NAME,
],
)?;
let pipeline_parameters =
string_value_from_header(headers, GREPTIME_PIPELINE_PARAMS_HEADER)?;
string_value_from_header(headers, &[GREPTIME_PIPELINE_PARAMS_HEADER])?;
Ok(PipelineInfo {
pipeline_name,
@@ -120,17 +132,19 @@ where
#[inline]
fn string_value_from_header(
headers: &HeaderMap,
header_key: &str,
header_keys: &[&str],
) -> Result<Option<String>, (StatusCode, String)> {
headers
.get(header_key)
.map(|value| {
String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
for header_key in header_keys {
if let Some(value) = headers.get(*header_key) {
return Some(String::from_utf8(value.as_bytes().to_vec()).map_err(|_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", header_key),
)
})
})
.transpose()
}))
.transpose();
}
}
Ok(None)
}

View File

@@ -45,8 +45,15 @@ pub mod constants {
pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
// Deprecated: pipeline is also used with trace, so we remove log from it.
pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
// More generic pipeline header name
pub const GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
pub const GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";

View File

@@ -2182,7 +2182,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(
@@ -2223,7 +2223,7 @@ pub async fn test_otlp_traces_v1(store_type: StorageType) {
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-pipeline-name"),
HeaderName::from_static("x-greptime-pipeline-name"),
HeaderValue::from_static(GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME),
),
(