diff --git a/Cargo.lock b/Cargo.lock index f447ea1148..d4db1185f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,12 +200,6 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" -[[package]] -name = "anymap" -version = "1.0.0-beta.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1f8f5a6f3d50d89e3797d7593a50f96bb2aaa20ca0cc7be1fb673232c91d72" - [[package]] name = "anymap2" version = "0.13.0" @@ -6424,16 +6418,6 @@ dependencies = [ "url", ] -[[package]] -name = "meter-core" -version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" -dependencies = [ - "anymap", - "once_cell", - "parking_lot 0.12.3", -] - [[package]] name = "meter-core" version = "0.1.0" @@ -6447,9 +6431,9 @@ dependencies = [ [[package]] name = "meter-macros" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd#80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac" dependencies = [ - "meter-core 0.1.0 (git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd)", + "meter-core", ] [[package]] @@ -7614,7 +7598,7 @@ dependencies = [ "futures-util", "lazy_static", "meta-client", - "meter-core 0.1.0 (git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac)", + "meter-core", "meter-macros", "moka", "object-store", @@ -9018,7 +9002,7 @@ dependencies = [ "humantime", "itertools 0.10.5", "lazy_static", - "meter-core 0.1.0 (git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac)", + "meter-core", "meter-macros", "num", "num-traits", @@ -10903,7 +10887,7 @@ dependencies = [ "common-telemetry", "common-time", "derive_builder 0.12.0", - "meter-core 0.1.0 (git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac)", + "meter-core", "snafu 0.8.5", "sql", ] diff --git a/Cargo.toml b/Cargo.toml index 60d8215634..daf79b17f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -261,7 +261,7 @@ tokio-rustls = { git = "https://github.com/GreptimeTeam/tokio-rustls" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" -rev = "80eb97c24c88af4dd9a86f8bbaf50e741d4eb8cd" +rev = "a10facb353b41460eeb98578868ebf19c2084fac" [profile.release] debug = 1 diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index a2245d8fb8..0c12658b37 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -113,6 +113,7 @@ impl OpenTelemetryProtocolHandler for Instance { .plugins .get::>(); interceptor_ref.pre_execute(ctx.clone())?; + let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?; self.handle_log_inserts(requests, ctx) .await diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 51f2683e1b..6e5a583c0d 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -41,7 +41,7 @@ use snafu::prelude::*; use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; -use crate::error::{self, Result}; +use crate::error::{self, PipelineSnafu, Result}; use crate::http::header::constants::{ GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, GREPTIME_LOG_TABLE_NAME_HEADER_NAME, @@ -227,15 +227,9 @@ pub async fn logs( .start_timer(); let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; - let pipeline_way; - if let Some(pipeline_name) = &pipeline_info.pipeline_name { + let pipeline_way = if let Some(pipeline_name) = &pipeline_info.pipeline_name { let pipeline_version = - to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| { - error::InvalidParameterSnafu { - reason: GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - } - .build() - })?; + to_pipeline_version(pipeline_info.pipeline_version).context(PipelineSnafu)?; let pipeline = match handler .get_pipeline(pipeline_name, pipeline_version, query_ctx.clone()) .await @@ -245,10 +239,10 @@ pub async fn logs( return Err(e); } }; - pipeline_way = PipelineWay::Custom(pipeline); + PipelineWay::Custom(pipeline) } else { - pipeline_way = PipelineWay::OtlpLog(Box::new(select_info)); - } + PipelineWay::OtlpLog(Box::new(select_info)) + }; handler .logs(request, pipeline_way, table_info.table_name, query_ctx) diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 5faaced461..8f31a1db06 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -185,47 +185,6 @@ fn log_to_pipeline_value( fn build_otlp_logs_identity_schema() -> Vec { [ - ( - "scope_name", - ColumnDataType::String, - SemanticType::Tag, - None, - None, - ), - ( - "scope_version", - ColumnDataType::String, - SemanticType::Field, - None, - None, - ), - ( - "scope_attributes", - ColumnDataType::Binary, - SemanticType::Field, - Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), - None, - ), - ( - "resource_attributes", - ColumnDataType::Binary, - SemanticType::Field, - Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), - None, - ), - ( - "log_attributes", - ColumnDataType::Binary, - SemanticType::Field, - Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), - None, - ), ( "timestamp", ColumnDataType::TimestampNanosecond, @@ -233,30 +192,16 @@ fn build_otlp_logs_identity_schema() -> Vec { None, None, ), - ( - "observed_timestamp", - ColumnDataType::TimestampNanosecond, - SemanticType::Field, - None, - None, - ), ( "trace_id", ColumnDataType::String, - SemanticType::Tag, + SemanticType::Field, None, None, ), ( "span_id", ColumnDataType::String, - SemanticType::Tag, - None, - None, - ), - ( - "trace_flags", - ColumnDataType::Uint32, SemanticType::Field, None, None, @@ -287,6 +232,68 @@ fn build_otlp_logs_identity_schema() -> Vec { )]), }), ), + ( + "log_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "trace_flags", + ColumnDataType::Uint32, + SemanticType::Field, + None, + None, + ), + ( + "scope_name", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "scope_version", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "scope_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "scope_schema_url", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "resource_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "resource_schema_url", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), ] .into_iter() .map( @@ -301,39 +308,17 @@ fn build_otlp_logs_identity_schema() -> Vec { .collect::>() } -fn build_otlp_build_in_row( - log: LogRecord, - resource_attr: JsonbValue<'static>, - scope_name: Option, - scope_version: Option, - scope_attrs: JsonbValue<'static>, -) -> Row { +fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row { let log_attr = key_value_to_jsonb(log.attributes); + let ts = if log.time_unix_nano != 0 { + log.time_unix_nano + } else { + log.observed_time_unix_nano + }; + let row = vec![ GreptimeValue { - value_data: scope_name.map(ValueData::StringValue), - }, - GreptimeValue { - value_data: scope_version.map(ValueData::StringValue), - }, - GreptimeValue { - value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())), - }, - GreptimeValue { - value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())), - }, - GreptimeValue { - value_data: Some(ValueData::BinaryValue(log_attr.to_vec())), - }, - GreptimeValue { - value_data: Some(ValueData::TimestampNanosecondValue( - log.time_unix_nano as i64, - )), - }, - GreptimeValue { - value_data: Some(ValueData::TimestampNanosecondValue( - log.observed_time_unix_nano as i64, - )), + value_data: Some(ValueData::TimestampNanosecondValue(ts as i64)), }, GreptimeValue { value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))), @@ -341,9 +326,6 @@ fn build_otlp_build_in_row( GreptimeValue { value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))), }, - GreptimeValue { - value_data: Some(ValueData::U32Value(log.flags)), - }, GreptimeValue { value_data: Some(ValueData::StringValue(log.severity_text)), }, @@ -356,6 +338,30 @@ fn build_otlp_build_in_row( .as_ref() .map(|x| ValueData::StringValue(log_body_to_string(x))), }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(log_attr.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::U32Value(log.flags)), + }, + GreptimeValue { + value_data: parse_ctx.scope_name.clone().map(ValueData::StringValue), + }, + GreptimeValue { + value_data: parse_ctx.scope_version.clone().map(ValueData::StringValue), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(parse_ctx.scope_attrs.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(parse_ctx.scope_url.clone())), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(parse_ctx.resource_attr.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())), + }, ]; Row { values: row } } @@ -363,7 +369,7 @@ fn build_otlp_build_in_row( fn extract_field_from_attr_and_combine_schema( schema_info: &mut SchemaInfo, log_select: &SelectInfo, - jsonb: &jsonb::Value<'static>, + jsonb: &jsonb::Value, ) -> Result> { if log_select.keys.is_empty() { return Ok(Vec::new()); @@ -498,13 +504,12 @@ fn parse_export_logs_service_request_to_rows( let mut extra_resource_schema = SchemaInfo::default(); let mut extra_scope_schema = SchemaInfo::default(); let mut extra_log_schema = SchemaInfo::default(); - let parse_infos = parse_resource( - &select_info, + let mut parse_ctx = ParseContext::new( &mut extra_resource_schema, &mut extra_scope_schema, &mut extra_log_schema, - request.resource_logs, - )?; + ); + let parse_infos = parse_resource(&select_info, &mut parse_ctx, request.resource_logs)?; // order of schema is important // resource < scope < log @@ -557,28 +562,27 @@ fn parse_export_logs_service_request_to_rows( fn parse_resource( select_info: &SelectInfo, - extra_resource_schema: &mut SchemaInfo, - extra_scope_schema: &mut SchemaInfo, - extra_log_schema: &mut SchemaInfo, + parse_ctx: &mut ParseContext, resource_logs_vec: Vec, ) -> Result> { let mut results = Vec::new(); + for r in resource_logs_vec { - let resource_attr = r + parse_ctx.resource_attr = r .resource .map(|resource| key_value_to_jsonb(resource.attributes)) .unwrap_or(JsonbValue::Null); + parse_ctx.resource_url = r.schema_url; + let resource_extracted_values = extract_field_from_attr_and_combine_schema( - extra_resource_schema, + parse_ctx.extra_resource_schema, select_info, - &resource_attr, + &parse_ctx.resource_attr, )?; let rows = parse_scope( - extra_scope_schema, - extra_log_schema, select_info, r.scope_logs, - resource_attr, + parse_ctx, resource_extracted_values, )?; results.extend(rows); @@ -586,38 +590,65 @@ fn parse_resource( Ok(results) } -struct ScopeInfo { +struct ParseContext<'a> { + // selector schema + extra_resource_schema: &'a mut SchemaInfo, + extra_scope_schema: &'a mut SchemaInfo, + extra_log_schema: &'a mut SchemaInfo, + + // passdown values + resource_url: String, + resource_attr: JsonbValue<'a>, scope_name: Option, scope_version: Option, - scope_attrs: JsonbValue<'static>, + scope_url: String, + scope_attrs: JsonbValue<'a>, +} + +impl<'a> ParseContext<'a> { + pub fn new( + extra_resource_schema: &'a mut SchemaInfo, + extra_scope_schema: &'a mut SchemaInfo, + extra_log_schema: &'a mut SchemaInfo, + ) -> ParseContext<'a> { + ParseContext { + extra_resource_schema, + extra_scope_schema, + extra_log_schema, + resource_url: String::new(), + resource_attr: JsonbValue::Null, + scope_name: None, + scope_version: None, + scope_url: String::new(), + scope_attrs: JsonbValue::Null, + } + } } fn parse_scope( - extra_scope_schema: &mut SchemaInfo, - extra_log_schema: &mut SchemaInfo, select_info: &SelectInfo, scopes_log_vec: Vec, - resource_attr: JsonbValue<'static>, + parse_ctx: &mut ParseContext, resource_extracted_values: Vec, ) -> Result> { let mut results = Vec::new(); for scope_logs in scopes_log_vec { let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); + parse_ctx.scope_name = scope_name; + parse_ctx.scope_version = scope_version; + parse_ctx.scope_attrs = scope_attrs; + parse_ctx.scope_url = scope_logs.schema_url; + let scope_extracted_values = extract_field_from_attr_and_combine_schema( - extra_scope_schema, + parse_ctx.extra_scope_schema, select_info, - &scope_attrs, + &parse_ctx.scope_attrs, )?; + let rows = parse_log( - extra_log_schema, select_info, scope_logs.log_records, - &resource_attr, - ScopeInfo { - scope_name, - scope_version, - scope_attrs, - }, + parse_ctx, &resource_extracted_values, &scope_extracted_values, )?; @@ -627,15 +658,9 @@ fn parse_scope( } fn parse_log( - extra_log_schema: &mut SchemaInfo, select_info: &SelectInfo, log_records: Vec, - resource_attr: &JsonbValue<'static>, - ScopeInfo { - scope_name, - scope_version, - scope_attrs, - }: ScopeInfo, + parse_ctx: &mut ParseContext, resource_extracted_values: &[GreptimeValue], scope_extracted_values: &[GreptimeValue], ) -> Result> { @@ -644,16 +669,13 @@ fn parse_log( for log in log_records { let log_attr = key_value_to_jsonb(log.attributes.clone()); - let row = build_otlp_build_in_row( - log, - resource_attr.clone(), - scope_name.clone(), - scope_version.clone(), - scope_attrs.clone(), - ); + let row = build_otlp_build_in_row(log, parse_ctx); - let log_extracted_values = - extract_field_from_attr_and_combine_schema(extra_log_schema, select_info, &log_attr)?; + let log_extracted_values = extract_field_from_attr_and_combine_schema( + parse_ctx.extra_log_schema, + select_info, + &log_attr, + )?; let parse_info = ParseInfo { values: row, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 131ba8363f..c426af4f5f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1245,7 +1245,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) { ); let expected = r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#; - validate_data(&client, "desc logs", expected).await; + validate_data("identity_schema", &client, "desc logs", expected).await; guard.remove_all().await; } @@ -1527,7 +1527,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { // select metrics data let expected = r#"[[1726053452870391000,9471.0]]"#; - validate_data(&client, "select * from gen;", expected).await; + validate_data("otlp_metrics", &client, "select * from gen;", expected).await; // drop table let res = client.get("/v1/sql?sql=drop table gen;").send().await; @@ -1538,7 +1538,13 @@ pub async fn test_otlp_metrics(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // select metrics data again - validate_data(&client, "select * from gen;", expected).await; + validate_data( + "otlp_metrics_with_gzip", + &client, + "select * from gen;", + expected, + ) + .await; guard.remove_all().await; } @@ -1564,7 +1570,13 @@ pub async fn test_otlp_traces(store_type: StorageType) { // select traces data let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; - validate_data(&client, "select * from traces_preview_v01;", expected).await; + validate_data( + "otlp_traces", + &client, + "select * from traces_preview_v01;", + expected, + ) + .await; // drop table let res = client @@ -1573,12 +1585,18 @@ pub async fn test_otlp_traces(store_type: StorageType) { .await; assert_eq!(res.status(), StatusCode::OK); - // write metrics data with gzip + // write traces data with gzip let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); - // select metrics data again - validate_data(&client, "select * from traces_preview_v01;", expected).await; + // select traces data again + validate_data( + "otlp_traces_with_gzip", + &client, + "select * from traces_preview_v01;", + expected, + ) + .await; guard.remove_all().await; } @@ -1594,8 +1612,27 @@ pub async fn test_otlp_logs(store_type: StorageType) { let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); let body = req.encode_to_vec(); + { // write log data + let res = send_req( + &client, + vec![( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs1"), + )], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#"[[1581452773000000789,"30","30","Info",9,"something happened",{"customer":"acme","env":"dev"},1,"","",{},"https://opentelemetry.io/schemas/1.0.0/scopeLogs",{"resource-attr":"resource-attr-val-1"},"https://opentelemetry.io/schemas/1.0.0/resourceLogs"],[1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030","Info",9,"This is a log message",{"app":"server","instance_num":1},1,"","",{},"https://opentelemetry.io/schemas/1.0.0/scopeLogs",{"resource-attr":"resource-attr-val-1"},"https://opentelemetry.io/schemas/1.0.0/resourceLogs"]]"#; + validate_data("otlp_logs", &client, "select * from logs1;", expected).await; + } + + { + // write log data with selector let res = send_req( &client, vec![ @@ -1615,32 +1652,20 @@ pub async fn test_otlp_logs(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened",null,null,"resource-attr-val-1"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message","server",1,"resource-attr-val-1"]]"#; - validate_data(&client, "select * from logs;", expected).await; - } - - { - // write log data - let res = send_req( + let expected = r#"[[1581452773000000789,"30","30","Info",9,"something happened",{"customer":"acme","env":"dev"},1,"","",{},"https://opentelemetry.io/schemas/1.0.0/scopeLogs",{"resource-attr":"resource-attr-val-1"},"https://opentelemetry.io/schemas/1.0.0/resourceLogs",null,null,"resource-attr-val-1"],[1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030","Info",9,"This is a log message",{"app":"server","instance_num":1},1,"","",{},"https://opentelemetry.io/schemas/1.0.0/scopeLogs",{"resource-attr":"resource-attr-val-1"},"https://opentelemetry.io/schemas/1.0.0/resourceLogs","server",1,"resource-attr-val-1"]]"#; + validate_data( + "otlp_logs_with_selector", &client, - vec![( - HeaderName::from_static("x-greptime-log-table-name"), - HeaderValue::from_static("logs1"), - )], - "/v1/otlp/v1/logs?db=public", - body.clone(), - false, + "select * from logs;", + expected, ) .await; - assert_eq!(StatusCode::OK, res.status()); - let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; - validate_data(&client, "select * from logs1;", expected).await; } guard.remove_all().await; } -async fn validate_data(client: &TestClient, sql: &str, expected: &str) { +async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str()) .send() @@ -1649,7 +1674,7 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) { let resp = res.text().await; let v = get_rows_from_output(&resp); - assert_eq!(v, expected); + assert_eq!(v, expected, "validate {test_name} fail"); } async fn send_req(