diff --git a/Cargo.lock b/Cargo.lock index aa00fcf081..6e374f8b6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6610,17 +6610,18 @@ dependencies = [ [[package]] name = "meter-core" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=5618e779cf2bb4755b499c630fba4c35e91898cb#5618e779cf2bb4755b499c630fba4c35e91898cb" dependencies = [ "anymap2", "once_cell", "parking_lot 0.12.3", + "tracing", ] [[package]] name = "meter-macros" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=5618e779cf2bb4755b499c630fba4c35e91898cb#5618e779cf2bb4755b499c630fba4c35e91898cb" dependencies = [ "meter-core", ] diff --git a/Cargo.toml b/Cargo.toml index c094a2d651..de50b357a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,7 +139,7 @@ jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a3 lazy_static = "1.4" local-ip-address = "0.6" loki-api = { git = "https://github.com/shuiyisong/tracing-loki", branch = "chore/prost_version" } -meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" } +meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" } mockall = "0.11.4" moka = "0.12" nalgebra = "0.33" @@ -283,7 +283,7 @@ pprof = { git = "https://github.com/GreptimeTeam/pprof-rs", rev = "1bd1e21" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" -rev = "a10facb353b41460eeb98578868ebf19c2084fac" +rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" [profile.release] debug = 1 diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index d55cf25d54..08ce929fd6 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -312,6 +312,7 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str } /// SelectInfo is used to store the selected keys from OpenTelemetry record attrs +/// The key is used to uplift value from the attributes and serve as column name in the table #[derive(Default)] pub struct SelectInfo { pub keys: Vec, diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index dedb07e842..7bbca8ad77 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -17,7 +17,7 @@ pub mod coerce; use std::collections::HashSet; use std::sync::Arc; -use ahash::HashMap; +use ahash::{HashMap, HashMapExt}; use api::helper::proto_value_type; use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; @@ -245,6 +245,15 @@ pub struct SchemaInfo { pub index: HashMap, } +impl SchemaInfo { + pub fn with_capacity(capacity: usize) -> Self { + Self { + schema: Vec::with_capacity(capacity), + index: HashMap::with_capacity(capacity), + } + } +} + fn resolve_schema( index: Option, value_data: ValueData, diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index 05c2366a01..219db16986 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -103,11 +103,7 @@ pub async fn loki_ingest( // fill Null for missing values for row in rows.iter_mut() { - if row.len() < schemas.len() { - for _ in row.len()..schemas.len() { - row.push(GreptimeValue { value_data: None }); - } - } + row.resize(schemas.len(), GreptimeValue::default()); } let rows = Rows { diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index b5c4607c29..5cc8f777c4 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -38,6 +38,7 @@ use snafu::prelude::*; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, PipelineSnafu, Result}; use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName}; +use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED; use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; @@ -112,7 +113,7 @@ pub async fn logs( let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); - let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED + let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index f11cd4ff3c..71c104666b 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::{BTreeMap, HashMap as StdHashMap}; -use std::mem; use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; @@ -310,7 +309,10 @@ fn build_otlp_logs_identity_schema() -> Vec { .collect::>() } -fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row { +fn build_otlp_build_in_row( + log: LogRecord, + parse_ctx: &mut ParseContext, +) -> (Row, JsonbValue<'static>) { let log_attr = key_value_to_jsonb(log.attributes); let ts = if log.time_unix_nano != 0 { log.time_unix_nano @@ -365,50 +367,52 @@ fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())), }, ]; - Row { values: row } + (Row { values: row }, log_attr) } fn extract_field_from_attr_and_combine_schema( - schema_info: &mut SchemaInfo, - log_select: &SelectInfo, - jsonb: &jsonb::Value, + select_info: &SelectInfo, + select_schema: &mut SchemaInfo, + attrs: &jsonb::Value, ) -> Result> { - if log_select.keys.is_empty() { - return Ok(Vec::new()); - } - let mut append_value = Vec::with_capacity(schema_info.schema.len()); - for _ in schema_info.schema.iter() { - append_value.push(GreptimeValue { value_data: None }); - } - for k in &log_select.keys { - let index = schema_info.index.get(k).copied(); - if let Some(value) = jsonb.get_by_name_ignore_case(k).cloned() { - if let Some((schema, value)) = decide_column_schema(k, value)? { - if let Some(index) = index { - let column_schema = &schema_info.schema[index]; - ensure!( - column_schema.datatype == schema.datatype, - IncompatibleSchemaSnafu { - column_name: k.clone(), - datatype: column_schema.datatype().as_str_name(), - expected: column_schema.datatype, - actual: schema.datatype, - } - ); - append_value[index] = value; - } else { - let key = k.clone(); - schema_info.schema.push(schema); - schema_info.index.insert(key, schema_info.schema.len() - 1); - append_value.push(value); + // note we use schema.len instead of select_keys.len + // because the len of the row value should always matches the len of the schema + let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()]; + + for key in select_info.keys.iter() { + let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else { + continue; + }; + let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else { + continue; + }; + + if let Some(index) = select_schema.index.get(key) { + let column_schema = &select_schema.schema[*index]; + // datatype of the same column name should be the same + ensure!( + column_schema.datatype == schema.datatype, + IncompatibleSchemaSnafu { + column_name: key, + datatype: column_schema.datatype().as_str_name(), + expected: column_schema.datatype, + actual: schema.datatype, } - } + ); + extracted_values[*index] = value; + } else { + select_schema.schema.push(schema); + select_schema + .index + .insert(key.clone(), select_schema.schema.len() - 1); + extracted_values.push(value); } } - Ok(append_value) + + Ok(extracted_values) } -fn decide_column_schema( +fn decide_column_schema_and_convert_value( column_name: &str, value: JsonbValue, ) -> Result> { @@ -475,128 +479,69 @@ fn decide_column_schema( }) } -#[derive(Debug, Clone, Copy)] -enum OpenTelemetryLogRecordAttrType { - Resource, - Scope, - Log, -} - -fn merge_schema( - input_schemas: Vec<(&SchemaInfo, OpenTelemetryLogRecordAttrType)>, -) -> BTreeMap<&String, (OpenTelemetryLogRecordAttrType, usize, &ColumnSchema)> { - let mut schemas = BTreeMap::new(); - input_schemas - .into_iter() - .for_each(|(schema_info, attr_type)| { - for (key, index) in schema_info.index.iter() { - if let Some(col_schema) = schema_info.schema.get(*index) { - schemas.insert(key, (attr_type, *index, col_schema)); - } - } - }); - schemas -} - fn parse_export_logs_service_request_to_rows( request: ExportLogsServiceRequest, select_info: Box, ) -> Result { let mut schemas = build_otlp_logs_identity_schema(); - let mut extra_resource_schema = SchemaInfo::default(); - let mut extra_scope_schema = SchemaInfo::default(); - let mut extra_log_schema = SchemaInfo::default(); - let mut parse_ctx = ParseContext::new( - &mut extra_resource_schema, - &mut extra_scope_schema, - &mut extra_log_schema, - ); - let parse_infos = parse_resource(&select_info, &mut parse_ctx, request.resource_logs)?; - // order of schema is important - // resource < scope < log - // do not change the order - let final_extra_schema_info = merge_schema(vec![ - ( - &extra_resource_schema, - OpenTelemetryLogRecordAttrType::Resource, - ), - (&extra_scope_schema, OpenTelemetryLogRecordAttrType::Scope), - (&extra_log_schema, OpenTelemetryLogRecordAttrType::Log), - ]); + let mut parse_ctx = ParseContext::new(select_info); + let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?; - let final_extra_schema = final_extra_schema_info - .iter() - .map(|(_, (_, _, v))| (*v).clone()) - .collect::>(); + schemas.extend(parse_ctx.select_schema.schema); - let extra_schema_len = final_extra_schema.len(); - schemas.extend(final_extra_schema); + rows.iter_mut().for_each(|row| { + row.values.resize(schemas.len(), GreptimeValue::default()); + }); - let mut results = Vec::with_capacity(parse_infos.len()); - for parse_info in parse_infos.into_iter() { - let mut row = parse_info.values; - let mut resource_values = parse_info.resource_extracted_values; - let mut scope_values = parse_info.scope_extracted_values; - let mut log_values = parse_info.log_extracted_values; - - let mut final_extra_values = vec![GreptimeValue { value_data: None }; extra_schema_len]; - for (idx, (_, (attr_type, index, _))) in final_extra_schema_info.iter().enumerate() { - let value = match attr_type { - OpenTelemetryLogRecordAttrType::Resource => resource_values.get_mut(*index), - OpenTelemetryLogRecordAttrType::Scope => scope_values.get_mut(*index), - OpenTelemetryLogRecordAttrType::Log => log_values.get_mut(*index), - }; - if let Some(value) = value { - // swap value to final_extra_values - mem::swap(&mut final_extra_values[idx], value); - } - } - - row.values.extend(final_extra_values); - results.push(row); - } Ok(Rows { schema: schemas, - rows: results, + rows, }) } fn parse_resource( - select_info: &SelectInfo, parse_ctx: &mut ParseContext, resource_logs_vec: Vec, -) -> Result> { - let mut results = Vec::new(); +) -> Result> { + let total_len = resource_logs_vec + .iter() + .flat_map(|r| r.scope_logs.iter()) + .map(|s| s.log_records.len()) + .sum(); + + let mut results = Vec::with_capacity(total_len); for r in resource_logs_vec { parse_ctx.resource_attr = r .resource .map(|resource| key_value_to_jsonb(resource.attributes)) .unwrap_or(JsonbValue::Null); + parse_ctx.resource_url = r.schema_url; - let resource_extracted_values = extract_field_from_attr_and_combine_schema( - parse_ctx.extra_resource_schema, - select_info, + parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema( + &parse_ctx.select_info, + &mut parse_ctx.select_schema, &parse_ctx.resource_attr, )?; - let rows = parse_scope( - select_info, - r.scope_logs, - parse_ctx, - resource_extracted_values, - )?; + + let rows = parse_scope(r.scope_logs, parse_ctx)?; results.extend(rows); } Ok(results) } struct ParseContext<'a> { - // selector schema - extra_resource_schema: &'a mut SchemaInfo, - extra_scope_schema: &'a mut SchemaInfo, - extra_log_schema: &'a mut SchemaInfo, + // input selected keys + select_info: Box, + // schema infos for selected keys from resource/scope/log for current request + // since the value override from bottom to top, the max capacity is the length of the keys + select_schema: SchemaInfo, + + // extracted and uplifted values using select keys + resource_uplift_values: Vec, + scope_uplift_values: Vec, // passdown values resource_url: String, @@ -608,15 +553,13 @@ struct ParseContext<'a> { } impl<'a> ParseContext<'a> { - pub fn new( - extra_resource_schema: &'a mut SchemaInfo, - extra_scope_schema: &'a mut SchemaInfo, - extra_log_schema: &'a mut SchemaInfo, - ) -> ParseContext<'a> { + pub fn new(select_info: Box) -> ParseContext<'a> { + let len = select_info.keys.len(); ParseContext { - extra_resource_schema, - extra_scope_schema, - extra_log_schema, + select_info, + select_schema: SchemaInfo::with_capacity(len), + resource_uplift_values: vec![], + scope_uplift_values: vec![], resource_url: String::new(), resource_attr: JsonbValue::Null, scope_name: None, @@ -627,74 +570,68 @@ impl<'a> ParseContext<'a> { } } -fn parse_scope( - select_info: &SelectInfo, - scopes_log_vec: Vec, - parse_ctx: &mut ParseContext, - resource_extracted_values: Vec, -) -> Result> { - let mut results = Vec::new(); +fn parse_scope(scopes_log_vec: Vec, parse_ctx: &mut ParseContext) -> Result> { + let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum(); + let mut results = Vec::with_capacity(len); + for scope_logs in scopes_log_vec { let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); parse_ctx.scope_name = scope_name; parse_ctx.scope_version = scope_version; - parse_ctx.scope_attrs = scope_attrs; parse_ctx.scope_url = scope_logs.schema_url; + parse_ctx.scope_attrs = scope_attrs; - let scope_extracted_values = extract_field_from_attr_and_combine_schema( - parse_ctx.extra_scope_schema, - select_info, + parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema( + &parse_ctx.select_info, + &mut parse_ctx.select_schema, &parse_ctx.scope_attrs, )?; - let rows = parse_log( - select_info, - scope_logs.log_records, - parse_ctx, - &resource_extracted_values, - &scope_extracted_values, - )?; + let rows = parse_log(scope_logs.log_records, parse_ctx)?; results.extend(rows); } Ok(results) } -fn parse_log( - select_info: &SelectInfo, - log_records: Vec, - parse_ctx: &mut ParseContext, - resource_extracted_values: &[GreptimeValue], - scope_extracted_values: &[GreptimeValue], -) -> Result> { +fn parse_log(log_records: Vec, parse_ctx: &mut ParseContext) -> Result> { let mut result = Vec::with_capacity(log_records.len()); for log in log_records { - let log_attr = key_value_to_jsonb(log.attributes.clone()); + let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx); - let row = build_otlp_build_in_row(log, parse_ctx); - - let log_extracted_values = extract_field_from_attr_and_combine_schema( - parse_ctx.extra_log_schema, - select_info, + let log_values = extract_field_from_attr_and_combine_schema( + &parse_ctx.select_info, + &mut parse_ctx.select_schema, &log_attr, )?; - let parse_info = ParseInfo { - values: row, - resource_extracted_values: resource_extracted_values.to_vec(), - scope_extracted_values: scope_extracted_values.to_vec(), - log_extracted_values, - }; - result.push(parse_info); + let extracted_values = merge_values( + log_values, + &parse_ctx.scope_uplift_values, + &parse_ctx.resource_uplift_values, + ); + + row.values.extend(extracted_values); + + result.push(row); } Ok(result) } -struct ParseInfo { - values: Row, - resource_extracted_values: Vec, - scope_extracted_values: Vec, - log_extracted_values: Vec, +fn merge_values( + log: Vec, + scope: &[GreptimeValue], + resource: &[GreptimeValue], +) -> Vec { + log.into_iter() + .enumerate() + .map(|(i, value)| GreptimeValue { + value_data: value + .value_data + .or_else(|| scope.get(i).and_then(|x| x.value_data.clone())) + .or_else(|| resource.get(i).and_then(|x| x.value_data.clone())), + }) + .collect() } /// transform otlp logs request to pipeline value diff --git a/src/servers/src/otlp/utils.rs b/src/servers/src/otlp/utils.rs index be3741666d..1ed37de452 100644 --- a/src/servers/src/otlp/utils.rs +++ b/src/servers/src/otlp/utils.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use api::v1::value::ValueData; use api::v1::ColumnDataType; use itertools::Itertools; @@ -47,18 +45,19 @@ pub fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> { } pub fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { - let mut map = BTreeMap::new(); - for kv in key_values { - let value = match kv.value { - Some(value) => match value.value { - Some(value) => any_value_to_jsonb(value), - None => JsonbValue::Null, - }, - None => JsonbValue::Null, - }; - map.insert(kv.key.clone(), value); - } - JsonbValue::Object(map) + JsonbValue::Object( + key_values + .into_iter() + .map(|kv| { + ( + kv.key, + kv.value + .and_then(|v| v.value) + .map_or(JsonbValue::Null, any_value_to_jsonb), + ) + }) + .collect(), + ) } #[inline] diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index a74b00ea0f..81b31fe767 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1905,7 +1905,7 @@ pub async fn test_otlp_logs(store_type: StorageType) { let client = TestClient::new(app).await; let content = r#" -{"resourceLogs":[{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"},{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568538897000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line two"},"attributes":[{"key":"app","value":{"stringValue":"server"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} +{"resourceLogs":[{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[{"key":"instance_num","value":{"stringValue":"10"}}],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server1"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"},{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568538897000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line two"},"attributes":[{"key":"app","value":{"stringValue":"server2"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} "#; let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); @@ -1915,24 +1915,24 @@ pub async fn test_otlp_logs(store_type: StorageType) { // write log data let res = send_req( &client, - vec![ - ( - HeaderName::from_static("content-type"), - HeaderValue::from_static("application/x-protobuf"), - ), - ( - HeaderName::from_static("x-greptime-log-table-name"), - HeaderValue::from_static("logs1"), - ), - ], + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + )], "/v1/otlp/v1/logs?db=public", body.clone(), false, ) .await; assert_eq!(StatusCode::OK, res.status()); - let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"]]"; - validate_data("otlp_logs", &client, "select * from logs1;", expected).await; + let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server1\"},0,\"\",\"\",{\"instance_num\":\"10\"},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server2\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"]]"; + validate_data( + "otlp_logs", + &client, + "select * from opentelemetry_logs;", + expected, + ) + .await; } { @@ -1946,7 +1946,7 @@ pub async fn test_otlp_logs(store_type: StorageType) { ), ( HeaderName::from_static("x-greptime-log-table-name"), - HeaderValue::from_static("logs"), + HeaderValue::from_static("cus_logs"), ), ( HeaderName::from_static("x-greptime-log-extract-keys"), @@ -1960,11 +1960,51 @@ pub async fn test_otlp_logs(store_type: StorageType) { .await; assert_eq!(StatusCode::OK, res.status()); - let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"server\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"server\"]]"; + let expected = "[[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server2\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",null,\"server2\"],[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server1\"},0,\"\",\"\",{\"instance_num\":\"10\"},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"10\",\"server1\"]]"; validate_data( "otlp_logs_with_selector", &client, - "select * from logs;", + "select * from cus_logs;", + expected, + ) + .await; + } + + { + // test same selector with multiple value + let content = r#" + {"resourceLogs":[{"resource":{"attributes":[{"key":"fromwhere","value":{"stringValue":"resource"}}],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[{"key":"fromwhere","value":{"stringValue":"scope"}}],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"fromwhere","value":{"stringValue":"log_attr"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]} + "#; + let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/x-protobuf"), + ), + ( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs2"), + ), + ( + HeaderName::from_static("x-greptime-log-extract-keys"), + HeaderValue::from_static("fromwhere"), + ), + ], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\",\"fromwhere\":\"log_attr\"},0,\"\",\"\",{\"fromwhere\":\"scope\"},\"\",{\"fromwhere\":\"resource\"},\"https://opentelemetry.io/schemas/1.4.0\",\"log_attr\"]]"; + validate_data( + "otlp_logs_with_selector_overlapping", + &client, + "select * from logs2;", expected, ) .await;