diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 92a6432baa..dd1e9695fa 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -43,7 +43,7 @@ use crate::error::{ Result, UnsupportedContentTypeSnafu, }; use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER; -use crate::http::header::CONTENT_TYPE_PROTOBUF_STR; +use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR}; use crate::http::result::greptime_manage_resp::GreptimedbManageResponse; use crate::http::result::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; @@ -63,6 +63,8 @@ lazy_static! { pub static ref TEXT_UTF8_CONTENT_TYPE: ContentType = ContentType::text_utf8(); pub static ref PB_CONTENT_TYPE: ContentType = ContentType::from_str(CONTENT_TYPE_PROTOBUF_STR).unwrap(); + pub static ref NDJSON_CONTENT_TYPE: ContentType = + ContentType::from_str(CONTENT_TYPE_NDJSON_STR).unwrap(); } /// LogIngesterQueryParams is used for query params of log ingester API. @@ -571,6 +573,29 @@ fn extract_pipeline_value_by_content_type( Deserializer::from_str(&payload).into_iter(), ignore_errors, )?, + ct if ct == *NDJSON_CONTENT_TYPE => { + let mut result = Vec::with_capacity(1000); + for (index, line) in payload.lines().enumerate() { + match serde_json::from_str(line) { + Ok(v) => { + result.push(v); + } + Err(_) => { + if !ignore_errors { + warn!( + "invalid json item in array, index: {:?}, value: {:?}", + index, line + ); + return InvalidParameterSnafu { + reason: format!("invalid item:{} in array", line), + } + .fail(); + } + } + } + } + result + } ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload .lines() .filter(|line| !line.is_empty()) @@ -699,4 +724,28 @@ mod tests { .to_string(); assert_eq!(a, "[{\"a\":1},{\"b\":2}]"); } + + #[test] + fn test_extract_by_content() { + let payload = r#" + {"a": 1} + {"b": 2"} + {"c": 1} +"#; + let fail_rest = + extract_pipeline_value_by_content_type(ContentType::json(), payload.to_string(), true); + assert!(fail_rest.is_ok()); + assert_eq!(fail_rest.unwrap(), vec![json!({"a": 1})]); + + let fail_only_wrong = extract_pipeline_value_by_content_type( + NDJSON_CONTENT_TYPE.clone(), + payload.to_string(), + true, + ); + assert!(fail_only_wrong.is_ok()); + assert_eq!( + fail_only_wrong.unwrap(), + vec![json!({"a": 1}), json!({"c": 1})] + ); + } } diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index e14ce61729..bd7f35b9ae 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -73,6 +73,8 @@ pub static CONTENT_TYPE_PROTOBUF_STR: &str = "application/x-protobuf"; pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF_STR); pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy"); +pub static CONTENT_TYPE_NDJSON_STR: &str = "application/x-ndjson"; + pub struct GreptimeDbName(Option); impl Header for GreptimeDbName {