chore: support application/x-ndjson for log ingest (#5697)

chore: support ndjson content type
This commit is contained in:
shuiyisong
2025-03-13 12:29:22 +08:00
committed by GitHub
parent 25645a3303
commit e0ff701e51
2 changed files with 52 additions and 1 deletions

View File

@@ -43,7 +43,7 @@ use crate::error::{
Result, UnsupportedContentTypeSnafu,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
use crate::http::result::greptime_manage_resp::GreptimedbManageResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
@@ -63,6 +63,8 @@ lazy_static! {
pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8();
pub static ref PB_CONTENT_TYPE: ContentType =
ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap();
pub static ref NDJSON_CONTENT_TYPE: ContentType =
ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap();
}
/// LogIngesterQueryParams is used for query params of log ingester API.
@@ -571,6 +573,29 @@ fn extract_pipeline_value_by_content_type(
Deserializer::from_str(&payload).into_iter(),
ignore_errors,
)?,
ct if ct == *NDJSON_CONTENT_TYPE => {
let mut result = Vec::with_capacity(1000);
for (index, line) in payload.lines().enumerate() {
match serde_json::from_str(line) {
Ok(v) => {
result.push(v);
}
Err(_) => {
if !ignore_errors {
warn!(
"invalid json item in array, index: {:?}, value: {:?}",
index, line
);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", line),
}
.fail();
}
}
}
}
result
}
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
.lines()
.filter(|line| !line.is_empty())
@@ -699,4 +724,28 @@ mod tests {
.to_string();
assert_eq!(a, "[{\"a\":1},{\"b\":2}]");
}
#[test]
fn test_extract_by_content() {
let payload = r#"
{"a": 1}
{"b": 2"}
{"c": 1}
"#;
let fail_rest =
extract_pipeline_value_by_content_type(ContentType::json(), payload.to_string(), true);
assert!(fail_rest.is_ok());
assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]);
let fail_only_wrong = extract_pipeline_value_by_content_type(
NDJSON_CONTENT_TYPE.clone(),
payload.to_string(),
true,
);
assert!(fail_only_wrong.is_ok());
assert_eq!(
fail_only_wrong.unwrap(),
vec![json!({"a": 1}), json!({"c": 1})]
);
}
}

View File

@@ -73,6 +73,8 @@ pub static CONTENT_TYPE_PROTOBUF_STR: &str = "application/x-protobuf";
pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF_STR);
pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy");
pub static CONTENT_TYPE_NDJSON_STR: &str = "application/x-ndjson";
pub struct GreptimeDbName(Option<String>);
impl Header for GreptimeDbName {