diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 0172924db5..97ecc40491 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -561,8 +561,8 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
-    #[snafu(display("Prepare value must be an object"))]
-    PrepareValueMustBeObject {
+    #[snafu(display("Input value must be an object"))]
+    InputValueMustBeObject {
         #[snafu(implicit)]
         location: Location,
     },
@@ -833,7 +833,7 @@ impl ErrorExt for Error {
             | ValueYamlKeyMustBeString { .. }
             | YamlLoad { .. }
             | YamlParse { .. }
-            | PrepareValueMustBeObject { .. }
+            | InputValueMustBeObject { .. }
             | ColumnOptions { .. }
             | UnsupportedIndexType { .. }
             | UnsupportedNumberType { .. }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index daa94c87bf..db5453310f 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -29,7 +29,7 @@ use yaml_rust::YamlLoader;
 
 use crate::dispatcher::{Dispatcher, Rule};
 use crate::error::{
-    IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result,
+    InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
     TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
 };
 use crate::etl::processor::ProcessorKind;
@@ -186,7 +186,7 @@ pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
             }
             Ok(intermediate_state)
         }
-        _ => PrepareValueMustBeObjectSnafu.fail(),
+        _ => InputValueMustBeObjectSnafu.fail(),
     }
 }
@@ -203,7 +203,7 @@ pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
             }
             Ok(intermediate_state)
         }
-        _ => PrepareValueMustBeObjectSnafu.fail(),
+        _ => InputValueMustBeObjectSnafu.fail(),
     }
 }
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 86f8e11074..e044fcff61 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::Display;
 use std::io::BufRead;
 use std::str::FromStr;
 use std::sync::Arc;
@@ -38,10 +39,10 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Deserializer, Map, Value};
 use session::context::{Channel, QueryContext, QueryContextRef};
 use snafu::{ensure, OptionExt, ResultExt};
+use strum::{EnumIter, IntoEnumIterator};
 
 use crate::error::{
-    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu,
-    Result, UnsupportedContentTypeSnafu,
+    status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
 };
 use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
 use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
@@ -300,7 +301,7 @@ fn transform_ndjson_array_factory(
         if !ignore_error {
             warn!("invalid item in array: {:?}", item_value);
             return InvalidParameterSnafu {
-                reason: format!("invalid item:{} in array", item_value),
+                reason: format!("invalid item: {} in array", item_value),
             }
             .fail();
         }
@@ -431,7 +432,8 @@ pub struct PipelineDryrunParams {
     pub pipeline_name: Option<String>,
     pub pipeline_version: Option<String>,
     pub pipeline: Option<String>,
-    pub data: Vec<Value>,
+    pub data_type: Option<String>,
+    pub data: String,
 }
 
 /// Check if the payload is valid json
@@ -474,6 +476,24 @@ fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response
     (status_code_to_http_status(&e.status_code()), body).into_response()
 }
 
+/// Parse the data with the given content type.
+/// Returns an error if the content type is invalid; it must be one of
+/// application/json, text/plain, or application/x-ndjson.
+fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineMap>> {
+    if let Ok(content_type) = ContentType::from_str(&data_type) {
+        extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
+    } else {
+        InvalidParameterSnafu {
+            reason: format!(
+                "invalid content type: {}, expected: one of {}",
+                data_type,
+                EventPayloadResolver::support_content_type_list().join(", ")
+            ),
+        }
+        .fail()
+    }
+}
+
 #[axum_macros::debug_handler]
 pub async fn pipeline_dryrun(
     State(log_state): State<LogState>,
@@ -489,7 +509,10 @@
     match check_pipeline_dryrun_params_valid(&payload) {
         Some(params) => {
-            let data = pipeline::json_array_to_map(params.data).context(PipelineSnafu)?;
+            let data = parse_dryrun_data(
+                params.data_type.unwrap_or("application/json".to_string()),
+                params.data,
+            )?;
 
             check_data_valid(data.len())?;
@@ -616,62 +639,152 @@ pub async fn log_ingester(
         .await
 }
 
+#[derive(Debug, EnumIter)]
+enum EventPayloadResolverInner {
+    Json,
+    Ndjson,
+    Text,
+}
+
+impl Display for EventPayloadResolverInner {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
+            EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
+            EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
+        }
+    }
+}
+
+impl TryFrom<&ContentType> for EventPayloadResolverInner {
+    type Error = Error;
+
+    fn try_from(content_type: &ContentType) -> Result<Self> {
+        match content_type {
+            x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
+            x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
+            x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
+                Ok(EventPayloadResolverInner::Text)
+            }
+            _ => InvalidParameterSnafu {
+                reason: format!(
+                    "invalid content type: {}, expected: one of {}",
+                    content_type,
+                    EventPayloadResolver::support_content_type_list().join(", ")
+                ),
+            }
+            .fail(),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct EventPayloadResolver<'a> {
+    inner: EventPayloadResolverInner,
+    /// The content type of the payload.
+    /// Kept for logging the original content type.
+    #[allow(dead_code)]
+    content_type: &'a ContentType,
+}
+
+impl EventPayloadResolver<'_> {
+    pub(super) fn support_content_type_list() -> Vec<String> {
+        EventPayloadResolverInner::iter()
+            .map(|x| x.to_string())
+            .collect()
+    }
+}
+
+impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
+    type Error = Error;
+
+    fn try_from(content_type: &'a ContentType) -> Result<Self> {
+        let inner = EventPayloadResolverInner::try_from(content_type)?;
+        Ok(EventPayloadResolver {
+            inner,
+            content_type,
+        })
+    }
+}
+
+impl EventPayloadResolver<'_> {
+    fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineMap>> {
+        match self.inner {
+            EventPayloadResolverInner::Json => {
+                pipeline::json_array_to_map(transform_ndjson_array_factory(
+                    Deserializer::from_slice(&payload).into_iter(),
+                    ignore_errors,
+                )?)
+                .context(PipelineSnafu)
+            }
+            EventPayloadResolverInner::Ndjson => {
+                let mut result = Vec::with_capacity(1000);
+                for (index, line) in payload.lines().enumerate() {
+                    let mut line = match line {
+                        Ok(line) if !line.is_empty() => line,
+                        Ok(_) => continue, // Skip empty lines
+                        Err(_) if ignore_errors => continue,
+                        Err(e) => {
+                            warn!(e; "invalid string at index: {}", index);
+                            return InvalidParameterSnafu {
+                                reason: format!("invalid line at index: {}", index),
+                            }
+                            .fail();
+                        }
+                    };
+
+                    // simd_json, according to its docs, only de-escapes strings at the
+                    // character level, like any other JSON parser, so this is safe.
+                    if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
+                        let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
+                        result.push(v);
+                    } else if !ignore_errors {
+                        warn!("invalid JSON at index: {}, content: {:?}", index, line);
+                        return InvalidParameterSnafu {
+                            reason: format!("invalid JSON at index: {}", index),
+                        }
+                        .fail();
+                    }
+                }
+                Ok(result)
+            }
+            EventPayloadResolverInner::Text => {
+                let result = payload
+                    .lines()
+                    .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
+                    .map(|line| {
+                        let mut map = PipelineMap::new();
+                        map.insert("message".to_string(), pipeline::Value::String(line));
+                        map
+                    })
+                    .collect::<Vec<_>>();
+                Ok(result)
+            }
+        }
+    }
+}
+
 fn extract_pipeline_value_by_content_type(
     content_type: ContentType,
     payload: Bytes,
     ignore_errors: bool,
 ) -> Result<Vec<PipelineMap>> {
-    Ok(match content_type {
-        ct if ct == *JSON_CONTENT_TYPE => {
-            // `simd_json` have not support stream and ndjson, see https://github.com/simd-lite/simd-json/issues/349
-            pipeline::json_array_to_map(transform_ndjson_array_factory(
-                Deserializer::from_slice(&payload).into_iter(),
-                ignore_errors,
-            )?)
-            .context(PipelineSnafu)?
-        }
-        ct if ct == *NDJSON_CONTENT_TYPE => {
-            let mut result = Vec::with_capacity(1000);
-            for (index, line) in payload.lines().enumerate() {
-                let mut line = match line {
-                    Ok(line) if !line.is_empty() => line,
-                    Ok(_) => continue, // Skip empty lines
-                    Err(_) if ignore_errors => continue,
-                    Err(e) => {
-                        warn!(e; "invalid string at index: {}", index);
-                        return InvalidParameterSnafu {
-                            reason: format!("invalid line at index: {}", index),
+    EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
+        resolver
+            .parse_payload(payload, ignore_errors)
+            .map_err(|e| match &e {
+                Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
+                    if reason.contains("invalid item:") {
+                        InvalidParameterSnafu {
+                            reason: "json format error, please check the data is valid JSON.",
                         }
-                        .fail();
+                        .build()
+                    } else {
+                        e
                     }
-                };
-
-                // simd_json, according to description, only de-escapes string at character level,
-                // like any other json parser. So it should be safe here.
-                if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
-                    let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
-                    result.push(v);
-                } else if !ignore_errors {
-                    warn!("invalid JSON at index: {}, content: {:?}", index, line);
-                    return InvalidParameterSnafu {
-                        reason: format!("invalid JSON at index: {}", index),
-                    }
-                    .fail();
-                }
-            }
-            result
-        }
-        ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
-            .lines()
-            .filter_map(|line| line.ok().filter(|line| !line.is_empty()))
-            .map(|line| {
-                let mut map = PipelineMap::new();
-                map.insert("message".to_string(), pipeline::Value::String(line));
-                map
+                }
+                _ => e,
             })
-            .collect::<Vec<_>>(),
-
-        _ => UnsupportedContentTypeSnafu { content_type }.fail()?,
     })
 }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index e67f939fc5..0c3a39a40c 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -2206,7 +2206,7 @@ transform:
             "data_type": "STRING",
             "key": "log",
             "semantic_type": "FIELD",
-            "value": "ClusterAdapter:enter sendTextDataToCluster\\n"
+            "value": "ClusterAdapter:enter sendTextDataToCluster"
         },
         {
             "data_type": "STRING",
             "key": "logger",
             "semantic_type": "FIELD",
             "value": "INTERACT.MANAGER"
         },
         {
             "data_type": "TIMESTAMP_NANOSECOND",
             "key": "time",
@@ -2220,6 +2220,44 @@ transform:
             "semantic_type": "TIMESTAMP",
             "value": "2024-05-25 20:16:37.217+0000"
         }
+    ],
+    [
+        {
+            "data_type": "INT32",
+            "key": "id1",
+            "semantic_type": "FIELD",
+            "value": 1111
+        },
+        {
+            "data_type": "INT32",
+            "key": "id2",
+            "semantic_type": "FIELD",
+            "value": 2222
+        },
+        {
+            "data_type": "STRING",
+            "key": "type",
+            "semantic_type": "FIELD",
+            "value": "D"
+        },
+        {
+            "data_type": "STRING",
+            "key": "log",
+            "semantic_type": "FIELD",
+            "value": "ClusterAdapter:enter sendTextDataToCluster ggg"
+        },
+        {
+            "data_type": "STRING",
+            "key": "logger",
+            "semantic_type": "FIELD",
+            "value": "INTERACT.MANAGER"
+        },
+        {
+            "data_type": "TIMESTAMP_NANOSECOND",
+            "key": "time",
+            "semantic_type": "TIMESTAMP",
+            "value": "2024-05-25 20:16:38.217+0000"
+        }
     ]
 ]);
 {
@@ -2232,7 +2270,15 @@ transform:
                 "logger": "INTERACT.MANAGER",
                 "type": "I",
                 "time": "2024-05-25 20:16:37.217",
-                "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                "log": "ClusterAdapter:enter sendTextDataToCluster"
+            },
+            {
+                "id1": "1111",
+                "id2": "2222",
+                "logger": "INTERACT.MANAGER",
+                "type": "D",
+                "time": "2024-05-25 20:16:38.217",
+                "log": "ClusterAdapter:enter sendTextDataToCluster ggg"
             }
         ]
     "#;
@@ -2251,25 +2297,29 @@ transform:
     }
     {
         // test new api specify pipeline via pipeline_name
-        let body = r#"
-        {
-            "pipeline_name": "test",
-            "data": [
+        let data = r#"[
             {
                 "id1": "2436",
                 "id2": "2528",
                 "logger": "INTERACT.MANAGER",
                 "type": "I",
                 "time": "2024-05-25 20:16:37.217",
-                "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                "log": "ClusterAdapter:enter sendTextDataToCluster"
+            },
+            {
+                "id1": "1111",
+                "id2": "2222",
+                "logger": "INTERACT.MANAGER",
+                "type": "D",
+                "time": "2024-05-25 20:16:38.217",
+                "log": "ClusterAdapter:enter sendTextDataToCluster ggg"
             }
-            ]
-        }
-        "#;
+        ]"#;
+        let body = json!({"pipeline_name":"test","data":data});
         let res = client
             .post("/v1/pipelines/_dryrun")
             .header("Content-Type", "application/json")
-            .body(body)
+            .body(body.to_string())
             .send()
             .await;
         assert_eq!(res.status(), StatusCode::OK);
         let body: Value = res.json().await;
         let schema = &body[0]["schema"];
         let rows = &body[0]["rows"];
         assert_eq!(schema, &dryrun_schema);
         assert_eq!(rows, &dryrun_rows);
     }
@@ -2280,18 +2330,55 @@ transform:
     {
+        let pipeline_content_for_text = r#"
+processors:
+  - dissect:
+      fields:
+        - message
+      patterns:
+        - "%{id1} %{id2} %{logger} %{type} \"%{time}\" \"%{log}\""
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+      ignore_missing: true
+
+transform:
+  - fields:
+      - id1
+      - id2
+    type: int32
+  - fields:
+      - type
+      - log
+      - logger
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+"#;
+
         // test new api specify pipeline via pipeline raw data
-        let mut body = json!({
-            "data": [
+        let data = r#"[
             {
                 "id1": "2436",
                 "id2": "2528",
                 "logger": "INTERACT.MANAGER",
                 "type": "I",
                 "time": "2024-05-25 20:16:37.217",
-                "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+                "log": "ClusterAdapter:enter sendTextDataToCluster"
+            },
+            {
+                "id1": "1111",
+                "id2": "2222",
+                "logger": "INTERACT.MANAGER",
+                "type": "D",
+                "time": "2024-05-25 20:16:38.217",
+                "log": "ClusterAdapter:enter sendTextDataToCluster ggg"
             }
-            ]
+        ]"#;
+        let mut body = json!({
+            "data": data
         });
         body["pipeline"] = json!(pipeline_content);
         let res = client
@@ -2306,6 +2393,73 @@ transform:
         let rows = &body[0]["rows"];
         assert_eq!(schema, &dryrun_schema);
         assert_eq!(rows, &dryrun_rows);
+        let mut body_for_text = json!({
+            "data": r#"2436 2528 INTERACT.MANAGER I "2024-05-25 20:16:37.217" "ClusterAdapter:enter sendTextDataToCluster"
+1111 2222 INTERACT.MANAGER D "2024-05-25 20:16:38.217" "ClusterAdapter:enter sendTextDataToCluster ggg"
+"#,
+        });
+        body_for_text["pipeline"] = json!(pipeline_content_for_text);
+        body_for_text["data_type"] = json!("text/plain");
+        let ndjson_content = r#"{"id1":"2436","id2":"2528","logger":"INTERACT.MANAGER","type":"I","time":"2024-05-25 20:16:37.217","log":"ClusterAdapter:enter sendTextDataToCluster"}
+{"id1":"1111","id2":"2222","logger":"INTERACT.MANAGER","type":"D","time":"2024-05-25 20:16:38.217","log":"ClusterAdapter:enter sendTextDataToCluster ggg"}
+"#;
+        let body_for_ndjson = json!({
+            "pipeline": pipeline_content,
+            "data_type": "application/x-ndjson",
+            "data": ndjson_content,
+        });
+        let res = client
+            .post("/v1/pipelines/_dryrun")
+            .header("Content-Type", "application/json")
+            .body(body_for_ndjson.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::OK);
+        let body: Value = res.json().await;
+        let schema = &body[0]["schema"];
+        let rows = &body[0]["rows"];
+        assert_eq!(schema, &dryrun_schema);
+        assert_eq!(rows, &dryrun_rows);
+
+        body_for_text["data_type"] = json!("application/yaml");
+        let res = client
+            .post("/v1/pipelines/_dryrun")
+            .header("Content-Type", "application/json")
+            .body(body_for_text.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+        let body: Value = res.json().await;
+        assert_eq!(body["error"], json!("Invalid request parameter: invalid content type: application/yaml, expected: one of application/json, application/x-ndjson, text/plain"));
+
+        body_for_text["data_type"] = json!("application/json");
+        let res = client
+            .post("/v1/pipelines/_dryrun")
+            .header("Content-Type", "application/json")
+            .body(body_for_text.to_string())
+            .send()
+            .await;
+        assert_eq!(res.status(), StatusCode::BAD_REQUEST);
+        let body: Value = res.json().await;
+        assert_eq!(
+            body["error"],
+            json!("Invalid request parameter: json format error, please check the data is valid JSON.")
+        );
+
+        body_for_text["data_type"] = json!("text/plain");
+        let res = client
+            .post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json") + .body(body_for_text.to_string()) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + let body: Value = res.json().await; + + let schema = &body[0]["schema"]; + let rows = &body[0]["rows"]; + assert_eq!(schema, &dryrun_schema); + assert_eq!(rows, &dryrun_rows); } { // failback to old version api @@ -2319,6 +2473,14 @@ transform: "type": "I", "time": "2024-05-25 20:16:37.217", "log": "ClusterAdapter:enter sendTextDataToCluster\\n" + }, + { + "id1": "1111", + "id2": "2222", + "logger": "INTERACT.MANAGER", + "type": "D", + "time": "2024-05-25 20:16:38.217", + "log": "ClusterAdapter:enter sendTextDataToCluster ggg" } ] });