From be837ddc2428cb8ca7a055ffd37b2ad03cc00495 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Mon, 14 Apr 2025 11:13:46 +0800 Subject: [PATCH] test: add tests to ensure nested data structure for identity pipeline (#5888) --- tests-integration/tests/http.rs | 40 ++++++++++++++++----------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index b4690cf9e1..ffb74e1b16 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -95,9 +95,9 @@ macro_rules! http_tests { test_pipeline_api, test_test_pipeline_api, test_plain_text_ingestion, - test_identify_pipeline, - test_identify_pipeline_with_flatten, - test_identify_pipeline_with_custom_ts, + test_identity_pipeline, + test_identity_pipeline_with_flatten, + test_identity_pipeline_with_custom_ts, test_pipeline_dispatcher, test_pipeline_suffix_template, @@ -1413,15 +1413,15 @@ transform: guard.remove_all().await; } -pub async fn test_identify_pipeline(store_type: StorageType) { +pub async fn test_identity_pipeline(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = - setup_test_http_app_with_frontend(store_type, "test_identify_pipeline").await; + setup_test_http_app_with_frontend(store_type, "test_identity_pipeline").await; // handshake let client = TestClient::new(app).await; - let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"} -{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#; + let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java", "json_object": {"a":1,"b":2}, "json_array":[1,2,3]} +{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua", "json_object": {"a":1,"b":2}, "json_array":[1,2,3]}"#; let res = client .post("/v1/ingest?db=public&table=logs&pipeline_name=greptime_identity") .header("Content-Type", "application/json") @@ -1440,8 +1440,8 @@ pub async fn test_identify_pipeline(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); - let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null]"#; - let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#; + let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null]"#; + let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***",[1,2,3],{"a":1,"b":2},"200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#; let res = client.get("/v1/sql?sql=select * from logs").send().await; assert_eq!(res.status(), StatusCode::OK); let resp: serde_json::Value = res.json().await; @@ -1464,7 +1464,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) { serde_json::from_str::>(line2_expected).unwrap() ); - let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#; + let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["json_array","Json","","YES","","FIELD"],["json_object","Json","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#; validate_data("identity_schema", &client, "desc logs", expected).await; guard.remove_all().await; @@ -1792,10 +1792,10 @@ table_suffix: _${type} guard.remove_all().await; } -pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { +pub async fn test_identity_pipeline_with_flatten(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = - setup_test_http_app_with_frontend(store_type, "test_identify_pipeline_with_flatten").await; + setup_test_http_app_with_frontend(store_type, "test_identity_pipeline_with_flatten").await; let client = TestClient::new(app).await; let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","custom_map":{"value_a":["a","b","c"],"value_b":"b"}}"#; @@ -1822,7 +1822,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#; validate_data( - "test_identify_pipeline_with_flatten_desc_logs", + "test_identity_pipeline_with_flatten_desc_logs", &client, "desc logs", expected, @@ -1831,7 +1831,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { let expected = "[[[\"a\",\"b\",\"c\"]]]"; validate_data( - "test_identify_pipeline_with_flatten_select_json", + "test_identity_pipeline_with_flatten_select_json", &client, "select `custom_map.value_a` from logs", expected, @@ -1841,10 +1841,10 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { guard.remove_all().await; } -pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { +pub async fn test_identity_pipeline_with_custom_ts(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = - setup_test_http_app_with_frontend(store_type, "test_identify_pipeline_with_custom_ts") + setup_test_http_app_with_frontend(store_type, "test_identity_pipeline_with_custom_ts") .await; let client = TestClient::new(app).await; @@ -1868,7 +1868,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { let expected = r#"[["__time__","TimestampSecond","PRI","NO","","TIMESTAMP"],["__name__","String","","YES","","FIELD"],["__source__","String","","YES","","FIELD"]]"#; validate_data( - "test_identify_pipeline_with_custom_ts_desc_logs", + "test_identity_pipeline_with_custom_ts_desc_logs", &client, "desc logs", expected, @@ -1877,7 +1877,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { let expected = r#"[[1453809242,"hello","10.170.***.***"],[1453809252,null,"10.170.***.***"]]"#; validate_data( - "test_identify_pipeline_with_custom_ts_data", + "test_identity_pipeline_with_custom_ts_data", &client, "select * from logs", expected, @@ -1908,7 +1908,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { let expected = r#"[["__time__","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__name__","String","","YES","","FIELD"]]"#; validate_data( - "test_identify_pipeline_with_custom_ts_desc_logs", + "test_identity_pipeline_with_custom_ts_desc_logs", &client, "desc logs", expected, @@ -1917,7 +1917,7 @@ pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { let expected = r#"[[1547577721000000000,"10.170.***.***",null],[1547577724000000000,"10.170.***.***","hello"]]"#; validate_data( - "test_identify_pipeline_with_custom_ts_data", + "test_identity_pipeline_with_custom_ts_data", &client, "select * from logs", expected,