test: add tests to ensure nested data structure for identity pipeline (#5888)

This commit is contained in:
Ning Sun
2025-04-14 11:13:46 +08:00
committed by GitHub
parent 5b0c75c85f
commit be837ddc24

View File

@@ -95,9 +95,9 @@ macro_rules! http_tests {
test_pipeline_api,
test_test_pipeline_api,
test_plain_text_ingestion,
test_identify_pipeline,
test_identify_pipeline_with_flatten,
test_identify_pipeline_with_custom_ts,
test_identity_pipeline,
test_identity_pipeline_with_flatten,
test_identity_pipeline_with_custom_ts,
test_pipeline_dispatcher,
test_pipeline_suffix_template,
@@ -1413,15 +1413,15 @@ transform:
guard.remove_all().await;
}
pub async fn test_identify_pipeline(store_type: StorageType) {
pub async fn test_identity_pipeline(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_identify_pipeline").await;
setup_test_http_app_with_frontend(store_type, "test_identity_pipeline").await;
// handshake
let client = TestClient::new(app).await;
let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"}
{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#;
let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java", "json_object": {"a":1,"b":2}, "json_array":[1,2,3]}
{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua", "json_object": {"a":1,"b":2}, "json_array":[1,2,3]}"#;
let res = client
.post("/v1/ingest?db=public&table=logs&pipeline_name=greptime_identity")
.header("Content-Type", "application/json")
@@ -1440,8 +1440,8 @@ pub async fn test_identify_pipeline(store_type: StorageType) {
assert_eq!(res.status(), StatusCode::OK);
let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null]"#;
let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#;
let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null]"#;
let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#;
let res = client.get("/v1/sql?sql=select * from logs").send().await;
assert_eq!(res.status(), StatusCode::OK);
let resp: serde_json::Value = res.json().await;
@@ -1464,7 +1464,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) {
serde_json::from_str::<Vec<Value>>(line2_expected).unwrap()
);
let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#;
let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","Json","","YES","","FIELD"],["json_object","Json","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#;
validate_data("identity_schema", &client, "desc logs", expected).await;
guard.remove_all().await;
@@ -1792,10 +1792,10 @@ table_suffix: _${type}
guard.remove_all().await;
}
pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_identify_pipeline_with_flatten").await;
setup_test_http_app_with_frontend(store_type, "test_identity_pipeline_with_flatten").await;
let client = TestClient::new(app).await;
let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","custom_map":{"value_a":["a","b","c"],"value_b":"b"}}"#;
@@ -1822,7 +1822,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#;
validate_data(
"test_identify_pipeline_with_flatten_desc_logs",
"test_identity_pipeline_with_flatten_desc_logs",
&client,
"desc logs",
expected,
@@ -1831,7 +1831,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
let expected = "[[[\"a\",\"b\",\"c\"]]]";
validate_data(
"test_identify_pipeline_with_flatten_select_json",
"test_identity_pipeline_with_flatten_select_json",
&client,
"select `custom_map.value_a` from logs",
expected,
@@ -1841,10 +1841,10 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) {
pub async fn test_identity_pipeline_with_custom_ts(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_identify_pipeline_with_custom_ts")
setup_test_http_app_with_frontend(store_type, "test_identity_pipeline_with_custom_ts")
.await;
let client = TestClient::new(app).await;
@@ -1868,7 +1868,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) {
let expected = r#"[["__time__","TimestampSecond","PRI","NO","","TIMESTAMP"],["__name__","String","","YES","","FIELD"],["__source__","String","","YES","","FIELD"]]"#;
validate_data(
"test_identify_pipeline_with_custom_ts_desc_logs",
"test_identity_pipeline_with_custom_ts_desc_logs",
&client,
"desc logs",
expected,
@@ -1877,7 +1877,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) {
let expected = r#"[[1453809242,"hello","10.170.***.***"],[1453809252,null,"10.170.***.***"]]"#;
validate_data(
"test_identify_pipeline_with_custom_ts_data",
"test_identity_pipeline_with_custom_ts_data",
&client,
"select * from logs",
expected,
@@ -1908,7 +1908,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) {
let expected = r#"[["__time__","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__name__","String","","YES","","FIELD"]]"#;
validate_data(
"test_identify_pipeline_with_custom_ts_desc_logs",
"test_identity_pipeline_with_custom_ts_desc_logs",
&client,
"desc logs",
expected,
@@ -1917,7 +1917,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) {
let expected = r#"[[1547577721000000000,"10.170.***.***",null],[1547577724000000000,"10.170.***.***","hello"]]"#;
validate_data(
"test_identify_pipeline_with_custom_ts_data",
"test_identity_pipeline_with_custom_ts_data",
&client,
"select * from logs",
expected,