chore: Add more data format support to the pipeline dryrun API. (#6115)

* chore: support more data types for the pipeline dryrun API

* chore: add docs for parse_dryrun_data

* chore: fix per PR comments

* chore: add user-friendly error message

* chore: change EventPayloadResolver content_type field from owned to ref

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Author: localhost
Date: 2025-05-20 11:29:28 +08:00
Committed by: GitHub
Parent: 400229c384
Commit: c2e3c3d398
4 changed files with 348 additions and 73 deletions
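The tests below exercise the new request shape: `data` is now a raw string, and an optional `data_type` selects the parser (application/json by default). A minimal sketch of such a request body, built with serde_json as the tests do (the payload content here is illustrative only):

    use serde_json::json;

    fn main() {
        // POST /v1/pipelines/_dryrun with Content-Type: application/json.
        // `data` carries the raw payload; `data_type` picks how it is parsed.
        let body = json!({
            "pipeline_name": "test",
            "data_type": "application/x-ndjson",
            "data": "{\"id1\":\"2436\",\"log\":\"a\"}\n{\"id1\":\"1111\",\"log\":\"b\"}\n",
        });
        println!("{body}");
    }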


@@ -561,8 +561,8 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Prepare value must be an object"))]
PrepareValueMustBeObject {
#[snafu(display("Input value must be an object"))]
InputValueMustBeObject {
#[snafu(implicit)]
location: Location,
},
@@ -833,7 +833,7 @@ impl ErrorExt for Error {
| ValueYamlKeyMustBeString { .. }
| YamlLoad { .. }
| YamlParse { .. }
| PrepareValueMustBeObject { .. }
| InputValueMustBeObject { .. }
| ColumnOptions { .. }
| UnsupportedIndexType { .. }
| UnsupportedNumberType { .. }


@@ -29,7 +29,7 @@ use yaml_rust::YamlLoader;
use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result,
InputValueMustBeObjectSnafu, IntermediateKeyIndexSnafu, Result,
TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
};
use crate::etl::processor::ProcessorKind;
@@ -186,7 +186,7 @@ pub fn json_to_map(val: serde_json::Value) -> Result<PipelineMap> {
}
Ok(intermediate_state)
}
_ => PrepareValueMustBeObjectSnafu.fail(),
_ => InputValueMustBeObjectSnafu.fail(),
}
}
@@ -203,7 +203,7 @@ pub fn simd_json_to_map(val: simd_json::OwnedValue) -> Result<PipelineMap> {
}
Ok(intermediate_state)
}
_ => PrepareValueMustBeObjectSnafu.fail(),
_ => InputValueMustBeObjectSnafu.fail(),
}
}
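Both converters keep the object-only contract, so any non-object top-level value now surfaces the renamed error. A small sketch of that contract (hypothetical assertions; assumes `json_to_map` is re-exported from the crate root like the other helpers used elsewhere in this commit):

    // Only a top-level JSON object converts to a PipelineMap;
    // arrays and scalars fail with InputValueMustBeObject.
    let ok = pipeline::json_to_map(serde_json::json!({"k": "v"}));
    assert!(ok.is_ok());

    let err = pipeline::json_to_map(serde_json::json!([1, 2, 3]));
    assert!(err.is_err()); // displays as "Input value must be an object"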


@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Display;
use std::io::BufRead;
use std::str::FromStr;
use std::sync::Arc;
@@ -38,10 +39,10 @@ use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{EnumIter, IntoEnumIterator};
use crate::error::{
status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu,
Result, UnsupportedContentTypeSnafu,
status_code_to_http_status, Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result,
};
use crate::http::header::constants::GREPTIME_PIPELINE_PARAMS_HEADER;
use crate::http::header::{CONTENT_TYPE_NDJSON_STR, CONTENT_TYPE_PROTOBUF_STR};
@@ -300,7 +301,7 @@ fn transform_ndjson_array_factory(
if !ignore_error {
warn!("invalid item in array: {:?}", item_value);
return InvalidParameterSnafu {
reason: format!("invalid item:{} in array", item_value),
reason: format!("invalid item: {} in array", item_value),
}
.fail();
}
@@ -431,7 +432,8 @@ pub struct PipelineDryrunParams {
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
pub pipeline: Option<String>,
pub data: Vec<Value>,
pub data_type: Option<String>,
pub data: String,
}
/// Check if the payload is valid json
@@ -474,6 +476,24 @@ fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response
(status_code_to_http_status(&e.status_code()), body).into_response()
}
/// Parse the data with the given content type.
/// Returns an error if the content type is invalid.
/// The content type must be one of application/json, text/plain, or application/x-ndjson.
fn parse_dryrun_data(data_type: String, data: String) -> Result<Vec<PipelineMap>> {
if let Ok(content_type) = ContentType::from_str(&data_type) {
extract_pipeline_value_by_content_type(content_type, Bytes::from(data), false)
} else {
InvalidParameterSnafu {
reason: format!(
"invalid content type: {}, expected: one of {}",
data_type,
EventPayloadResolver::support_content_type_list().join(", ")
),
}
.fail()
}
}
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
@@ -489,7 +509,10 @@ pub async fn pipeline_dryrun(
match check_pipeline_dryrun_params_valid(&payload) {
Some(params) => {
let data = pipeline::json_array_to_map(params.data).context(PipelineSnafu)?;
let data = parse_dryrun_data(
params.data_type.unwrap_or("application/json".to_string()),
params.data,
)?;
check_data_valid(data.len())?;
@@ -616,62 +639,152 @@ pub async fn log_ingester(
.await
}
#[derive(Debug, EnumIter)]
enum EventPayloadResolverInner {
Json,
Ndjson,
Text,
}
impl Display for EventPayloadResolverInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EventPayloadResolverInner::Json => write!(f, "{}", *JSON_CONTENT_TYPE),
EventPayloadResolverInner::Ndjson => write!(f, "{}", *NDJSON_CONTENT_TYPE),
EventPayloadResolverInner::Text => write!(f, "{}", *TEXT_CONTENT_TYPE),
}
}
}
impl TryFrom<&ContentType> for EventPayloadResolverInner {
type Error = Error;
fn try_from(content_type: &ContentType) -> Result<Self> {
match content_type {
x if *x == *JSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Json),
x if *x == *NDJSON_CONTENT_TYPE => Ok(EventPayloadResolverInner::Ndjson),
x if *x == *TEXT_CONTENT_TYPE || *x == *TEXT_UTF8_CONTENT_TYPE => {
Ok(EventPayloadResolverInner::Text)
}
_ => InvalidParameterSnafu {
reason: format!(
"invalid content type: {}, expected: one of {}",
content_type,
EventPayloadResolver::support_content_type_list().join(", ")
),
}
.fail(),
}
}
}
#[derive(Debug)]
struct EventPayloadResolver<'a> {
inner: EventPayloadResolverInner,
/// The content type of the payload.
/// keep it for logging original content type
#[allow(dead_code)]
content_type: &'a ContentType,
}
impl EventPayloadResolver<'_> {
pub(super) fn support_content_type_list() -> Vec<String> {
EventPayloadResolverInner::iter()
.map(|x| x.to_string())
.collect()
}
}
impl<'a> TryFrom<&'a ContentType> for EventPayloadResolver<'a> {
type Error = Error;
fn try_from(content_type: &'a ContentType) -> Result<Self> {
let inner = EventPayloadResolverInner::try_from(content_type)?;
Ok(EventPayloadResolver {
inner,
content_type,
})
}
}
impl EventPayloadResolver<'_> {
fn parse_payload(&self, payload: Bytes, ignore_errors: bool) -> Result<Vec<PipelineMap>> {
match self.inner {
EventPayloadResolverInner::Json => {
pipeline::json_array_to_map(transform_ndjson_array_factory(
Deserializer::from_slice(&payload).into_iter(),
ignore_errors,
)?)
.context(PipelineSnafu)
}
EventPayloadResolverInner::Ndjson => {
let mut result = Vec::with_capacity(1000);
for (index, line) in payload.lines().enumerate() {
let mut line = match line {
Ok(line) if !line.is_empty() => line,
Ok(_) => continue, // Skip empty lines
Err(_) if ignore_errors => continue,
Err(e) => {
warn!(e; "invalid string at index: {}", index);
return InvalidParameterSnafu {
reason: format!("invalid line at index: {}", index),
}
.fail();
}
};
// simd_json, according to its documentation, only de-escapes strings at the character level,
// like any other JSON parser. So it should be safe here.
if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
result.push(v);
} else if !ignore_errors {
warn!("invalid JSON at index: {}, content: {:?}", index, line);
return InvalidParameterSnafu {
reason: format!("invalid JSON at index: {}", index),
}
.fail();
}
}
Ok(result)
}
EventPayloadResolverInner::Text => {
let result = payload
.lines()
.filter_map(|line| line.ok().filter(|line| !line.is_empty()))
.map(|line| {
let mut map = PipelineMap::new();
map.insert("message".to_string(), pipeline::Value::String(line));
map
})
.collect::<Vec<_>>();
Ok(result)
}
}
}
}
fn extract_pipeline_value_by_content_type(
content_type: ContentType,
payload: Bytes,
ignore_errors: bool,
) -> Result<Vec<PipelineMap>> {
Ok(match content_type {
ct if ct == *JSON_CONTENT_TYPE => {
// `simd_json` have not support stream and ndjson, see https://github.com/simd-lite/simd-json/issues/349
pipeline::json_array_to_map(transform_ndjson_array_factory(
Deserializer::from_slice(&payload).into_iter(),
ignore_errors,
)?)
.context(PipelineSnafu)?
}
ct if ct == *NDJSON_CONTENT_TYPE => {
let mut result = Vec::with_capacity(1000);
for (index, line) in payload.lines().enumerate() {
let mut line = match line {
Ok(line) if !line.is_empty() => line,
Ok(_) => continue, // Skip empty lines
Err(_) if ignore_errors => continue,
Err(e) => {
warn!(e; "invalid string at index: {}", index);
return InvalidParameterSnafu {
reason: format!("invalid line at index: {}", index),
}
.fail();
}
};
// simd_json, according to description, only de-escapes string at character level,
// like any other json parser. So it should be safe here.
if let Ok(v) = simd_json::to_owned_value(unsafe { line.as_bytes_mut() }) {
let v = pipeline::simd_json_to_map(v).context(PipelineSnafu)?;
result.push(v);
} else if !ignore_errors {
warn!("invalid JSON at index: {}, content: {:?}", index, line);
return InvalidParameterSnafu {
reason: format!("invalid JSON at index: {}", index),
}
.fail();
}
}
result
}
ct if ct == *TEXT_CONTENT_TYPE || ct == *TEXT_UTF8_CONTENT_TYPE => payload
.lines()
.filter_map(|line| line.ok().filter(|line| !line.is_empty()))
.map(|line| {
let mut map = PipelineMap::new();
map.insert("message".to_string(), pipeline::Value::String(line));
map
})
.collect::<Vec<_>>(),
_ => UnsupportedContentTypeSnafu { content_type }.fail()?,
EventPayloadResolver::try_from(&content_type).and_then(|resolver| {
resolver
.parse_payload(payload, ignore_errors)
.map_err(|e| match &e {
Error::InvalidParameter { reason, .. } if content_type == *JSON_CONTENT_TYPE => {
if reason.contains("invalid item:") {
InvalidParameterSnafu {
reason: "json format error, please check the data is valid JSON.",
}
.build()
} else {
e
}
}
_ => e,
})
})
}
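With the resolver in place, all three formats share one entry point. A minimal usage sketch (hypothetical test; assumes `ContentType` here is `headers::ContentType`, which the `FromStr` call in parse_dryrun_data suggests):

    // text/plain: each non-empty line becomes a map under the "message" key.
    let content_type = ContentType::from_str("text/plain").unwrap();
    let payload = Bytes::from("first line\n\nsecond line\n");
    let maps = extract_pipeline_value_by_content_type(content_type, payload, false).unwrap();
    assert_eq!(maps.len(), 2); // the empty line is skipped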


@@ -2206,7 +2206,7 @@ transform:
"data_type": "STRING",
"key": "log",
"semantic_type": "FIELD",
"value": "ClusterAdapter:enter sendTextDataToCluster\\n"
"value": "ClusterAdapter:enter sendTextDataToCluster"
},
{
"data_type": "STRING",
@@ -2220,6 +2220,44 @@ transform:
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:37.217+0000"
}
],
[
{
"data_type": "INT32",
"key": "id1",
"semantic_type": "FIELD",
"value": 1111
},
{
"data_type": "INT32",
"key": "id2",
"semantic_type": "FIELD",
"value": 2222
},
{
"data_type": "STRING",
"key": "type",
"semantic_type": "FIELD",
"value": "D"
},
{
"data_type": "STRING",
"key": "log",
"semantic_type": "FIELD",
"value": "ClusterAdapter:enter sendTextDataToCluster ggg"
},
{
"data_type": "STRING",
"key": "logger",
"semantic_type": "FIELD",
"value": "INTERACT.MANAGER"
},
{
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:38.217+0000"
}
]
]);
{
@@ -2232,7 +2270,15 @@ transform:
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
"log": "ClusterAdapter:enter sendTextDataToCluster"
},
{
"id1": "1111",
"id2": "2222",
"logger": "INTERACT.MANAGER",
"type": "D",
"time": "2024-05-25 20:16:38.217",
"log": "ClusterAdapter:enter sendTextDataToCluster ggg"
}
]
"#;
@@ -2251,25 +2297,29 @@ transform:
}
{
// test new api specify pipeline via pipeline_name
let body = r#"
{
"pipeline_name": "test",
"data": [
let data = r#"[
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
"log": "ClusterAdapter:enter sendTextDataToCluster"
},
{
"id1": "1111",
"id2": "2222",
"logger": "INTERACT.MANAGER",
"type": "D",
"time": "2024-05-25 20:16:38.217",
"log": "ClusterAdapter:enter sendTextDataToCluster ggg"
}
]
}
"#;
]"#;
let body = json!({"pipeline_name":"test","data":data});
let res = client
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body)
.body(body.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
@@ -2280,18 +2330,55 @@ transform:
assert_eq!(rows, &dryrun_rows);
}
{
let pipeline_content_for_text = r#"
processors:
- dissect:
fields:
- message
patterns:
- "%{id1} %{id2} %{logger} %{type} \"%{time}\" \"%{log}\""
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
transform:
- fields:
- id1
- id2
type: int32
- fields:
- type
- log
- logger
type: string
- field: time
type: time
index: timestamp
"#;
// test new api specify pipeline via pipeline raw data
let mut body = json!({
"data": [
let data = r#"[
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
"log": "ClusterAdapter:enter sendTextDataToCluster"
},
{
"id1": "1111",
"id2": "2222",
"logger": "INTERACT.MANAGER",
"type": "D",
"time": "2024-05-25 20:16:38.217",
"log": "ClusterAdapter:enter sendTextDataToCluster ggg"
}
]
]"#;
let mut body = json!({
"data": data
});
body["pipeline"] = json!(pipeline_content);
let res = client
@@ -2306,6 +2393,73 @@ transform:
let rows = &body[0]["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
let mut body_for_text = json!({
"data": r#"2436 2528 INTERACT.MANAGER I "2024-05-25 20:16:37.217" "ClusterAdapter:enter sendTextDataToCluster"
1111 2222 INTERACT.MANAGER D "2024-05-25 20:16:38.217" "ClusterAdapter:enter sendTextDataToCluster ggg"
"#,
});
body_for_text["pipeline"] = json!(pipeline_content_for_text);
body_for_text["data_type"] = json!("text/plain");
let ndjson_content = r#"{"id1":"2436","id2":"2528","logger":"INTERACT.MANAGER","type":"I","time":"2024-05-25 20:16:37.217","log":"ClusterAdapter:enter sendTextDataToCluster"}
{"id1":"1111","id2":"2222","logger":"INTERACT.MANAGER","type":"D","time":"2024-05-25 20:16:38.217","log":"ClusterAdapter:enter sendTextDataToCluster ggg"}
"#;
let body_for_ndjson = json!({
"pipeline":pipeline_content,
"data_type": "application/x-ndjson",
"data": ndjson_content,
});
let res = client
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body_for_ndjson.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body[0]["schema"];
let rows = &body[0]["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
body_for_text["data_type"] = json!("application/yaml");
let res = client
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body_for_text.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
let body: Value = res.json().await;
assert_eq!(body["error"], json!("Invalid request parameter: invalid content type: application/yaml, expected: one of application/json, application/x-ndjson, text/plain"));
body_for_text["data_type"] = json!("application/json");
let res = client
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body_for_text.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
let body: Value = res.json().await;
assert_eq!(
body["error"],
json!("Invalid request parameter: json format error, please check the date is valid JSON.")
);
body_for_text["data_type"] = json!("text/plain");
let res = client
.post("/v1/pipelines/_dryrun")
.header("Content-Type", "application/json")
.body(body_for_text.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body[0]["schema"];
let rows = &body[0]["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
}
{
// failback to old version api
@@ -2319,6 +2473,14 @@ transform:
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"id1": "1111",
"id2": "2222",
"logger": "INTERACT.MANAGER",
"type": "D",
"time": "2024-05-25 20:16:38.217",
"log": "ClusterAdapter:enter sendTextDataToCluster ggg"
}
]
});
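For reference, the text/plain test path relies on the dissect processor to recover the same fields the JSON payloads carry; a sketch of what one input line yields under the pattern in pipeline_content_for_text (field values read off the test data above):

    // Input line and the fields dissect extracts with
    // %{id1} %{id2} %{logger} %{type} "%{time}" "%{log}":
    let line = r#"2436 2528 INTERACT.MANAGER I "2024-05-25 20:16:37.217" "ClusterAdapter:enter sendTextDataToCluster""#;
    // id1    = "2436"              (cast to int32 by the transform)
    // id2    = "2528"              (cast to int32)
    // logger = "INTERACT.MANAGER"
    // type   = "I"
    // time   = "2024-05-25 20:16:37.217" (parsed by the date processor)
    // log    = "ClusterAdapter:enter sendTextDataToCluster"
    let _ = line;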