refactor: otlp logs insertion (#5479)

* chore: add test for selector overlapping

* refactor: simplify otlp logs insertion

* fix: use layered extracted value array

* fix: wrong len

* chore: minor renaming and update

* chore: rename

* fix: clippy

* fix: typos

* chore: update test

* chore: address CR comment & update meter-deps version
This commit is contained in:
shuiyisong
2025-02-07 15:21:20 +08:00
committed by GitHub
parent 79acc9911e
commit 88c3d331a1
9 changed files with 202 additions and 218 deletions

5
Cargo.lock generated
View File

@@ -6610,17 +6610,18 @@ dependencies = [
[[package]]
name = "meter-core"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=5618e779cf2bb4755b499c630fba4c35e91898cb#5618e779cf2bb4755b499c630fba4c35e91898cb"
dependencies = [
"anymap2",
"once_cell",
"parking_lot 0.12.3",
"tracing",
]
[[package]]
name = "meter-macros"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=a10facb353b41460eeb98578868ebf19c2084fac#a10facb353b41460eeb98578868ebf19c2084fac"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=5618e779cf2bb4755b499c630fba4c35e91898cb#5618e779cf2bb4755b499c630fba4c35e91898cb"
dependencies = [
"meter-core",
]

View File

@@ -139,7 +139,7 @@ jsonb = { git = "https://github.com/databendlabs/jsonb.git", rev = "8c8d2fc294a3
lazy_static = "1.4"
local-ip-address = "0.6"
loki-api = { git = "https://github.com/shuiyisong/tracing-loki", branch = "chore/prost_version" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "a10facb353b41460eeb98578868ebf19c2084fac" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "5618e779cf2bb4755b499c630fba4c35e91898cb" }
mockall = "0.11.4"
moka = "0.12"
nalgebra = "0.33"
@@ -283,7 +283,7 @@ pprof = { git = "https://github.com/GreptimeTeam/pprof-rs", rev = "1bd1e21" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "a10facb353b41460eeb98578868ebf19c2084fac"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"
[profile.release]
debug = 1

View File

@@ -312,6 +312,7 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
}
/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
/// The key is used to uplift value from the attributes and serve as column name in the table
#[derive(Default)]
pub struct SelectInfo {
pub keys: Vec<String>,

View File

@@ -17,7 +17,7 @@ pub mod coerce;
use std::collections::HashSet;
use std::sync::Arc;
use ahash::HashMap;
use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
@@ -245,6 +245,15 @@ pub struct SchemaInfo {
pub index: HashMap<String, usize>,
}
impl SchemaInfo {
pub fn with_capacity(capacity: usize) -> Self {
Self {
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
}
}
}
fn resolve_schema(
index: Option<usize>,
value_data: ValueData,

View File

@@ -103,11 +103,7 @@ pub async fn loki_ingest(
// fill Null for missing values
for row in rows.iter_mut() {
if row.len() < schemas.len() {
for _ in row.len()..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
}
row.resize(schemas.len(), GreptimeValue::default());
}
let rows = Rows {

View File

@@ -38,6 +38,7 @@ use snafu::prelude::*;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::http::extractor::{LogTableName, PipelineInfo, SelectInfoWrapper, TraceTableName};
use crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
@@ -112,7 +113,7 @@ pub async fn logs(
let db = query_ctx.get_db_string();
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
let _timer = METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();
let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::{BTreeMap, HashMap as StdHashMap};
use std::mem;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
@@ -310,7 +309,10 @@ fn build_otlp_logs_identity_schema() -> Vec<ColumnSchema> {
.collect::<Vec<ColumnSchema>>()
}
fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row {
fn build_otlp_build_in_row(
log: LogRecord,
parse_ctx: &mut ParseContext,
) -> (Row, JsonbValue<'static>) {
let log_attr = key_value_to_jsonb(log.attributes);
let ts = if log.time_unix_nano != 0 {
log.time_unix_nano
@@ -365,50 +367,52 @@ fn build_otlp_build_in_row(log: LogRecord, parse_ctx: &mut ParseContext) -> Row
value_data: Some(ValueData::StringValue(parse_ctx.resource_url.clone())),
},
];
Row { values: row }
(Row { values: row }, log_attr)
}
fn extract_field_from_attr_and_combine_schema(
schema_info: &mut SchemaInfo,
log_select: &SelectInfo,
jsonb: &jsonb::Value,
select_info: &SelectInfo,
select_schema: &mut SchemaInfo,
attrs: &jsonb::Value,
) -> Result<Vec<GreptimeValue>> {
if log_select.keys.is_empty() {
return Ok(Vec::new());
}
let mut append_value = Vec::with_capacity(schema_info.schema.len());
for _ in schema_info.schema.iter() {
append_value.push(GreptimeValue { value_data: None });
}
for k in &log_select.keys {
let index = schema_info.index.get(k).copied();
if let Some(value) = jsonb.get_by_name_ignore_case(k).cloned() {
if let Some((schema, value)) = decide_column_schema(k, value)? {
if let Some(index) = index {
let column_schema = &schema_info.schema[index];
ensure!(
column_schema.datatype == schema.datatype,
IncompatibleSchemaSnafu {
column_name: k.clone(),
datatype: column_schema.datatype().as_str_name(),
expected: column_schema.datatype,
actual: schema.datatype,
}
);
append_value[index] = value;
} else {
let key = k.clone();
schema_info.schema.push(schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
append_value.push(value);
// note we use schema.len instead of select_keys.len
// because the len of the row value should always matches the len of the schema
let mut extracted_values = vec![GreptimeValue::default(); select_schema.schema.len()];
for key in select_info.keys.iter() {
let Some(value) = attrs.get_by_name_ignore_case(key).cloned() else {
continue;
};
let Some((schema, value)) = decide_column_schema_and_convert_value(key, value)? else {
continue;
};
if let Some(index) = select_schema.index.get(key) {
let column_schema = &select_schema.schema[*index];
// datatype of the same column name should be the same
ensure!(
column_schema.datatype == schema.datatype,
IncompatibleSchemaSnafu {
column_name: key,
datatype: column_schema.datatype().as_str_name(),
expected: column_schema.datatype,
actual: schema.datatype,
}
}
);
extracted_values[*index] = value;
} else {
select_schema.schema.push(schema);
select_schema
.index
.insert(key.clone(), select_schema.schema.len() - 1);
extracted_values.push(value);
}
}
Ok(append_value)
Ok(extracted_values)
}
fn decide_column_schema(
fn decide_column_schema_and_convert_value(
column_name: &str,
value: JsonbValue,
) -> Result<Option<(ColumnSchema, GreptimeValue)>> {
@@ -475,128 +479,69 @@ fn decide_column_schema(
})
}
#[derive(Debug, Clone, Copy)]
enum OpenTelemetryLogRecordAttrType {
Resource,
Scope,
Log,
}
fn merge_schema(
input_schemas: Vec<(&SchemaInfo, OpenTelemetryLogRecordAttrType)>,
) -> BTreeMap<&String, (OpenTelemetryLogRecordAttrType, usize, &ColumnSchema)> {
let mut schemas = BTreeMap::new();
input_schemas
.into_iter()
.for_each(|(schema_info, attr_type)| {
for (key, index) in schema_info.index.iter() {
if let Some(col_schema) = schema_info.schema.get(*index) {
schemas.insert(key, (attr_type, *index, col_schema));
}
}
});
schemas
}
fn parse_export_logs_service_request_to_rows(
request: ExportLogsServiceRequest,
select_info: Box<SelectInfo>,
) -> Result<Rows> {
let mut schemas = build_otlp_logs_identity_schema();
let mut extra_resource_schema = SchemaInfo::default();
let mut extra_scope_schema = SchemaInfo::default();
let mut extra_log_schema = SchemaInfo::default();
let mut parse_ctx = ParseContext::new(
&mut extra_resource_schema,
&mut extra_scope_schema,
&mut extra_log_schema,
);
let parse_infos = parse_resource(&select_info, &mut parse_ctx, request.resource_logs)?;
// order of schema is important
// resource < scope < log
// do not change the order
let final_extra_schema_info = merge_schema(vec![
(
&extra_resource_schema,
OpenTelemetryLogRecordAttrType::Resource,
),
(&extra_scope_schema, OpenTelemetryLogRecordAttrType::Scope),
(&extra_log_schema, OpenTelemetryLogRecordAttrType::Log),
]);
let mut parse_ctx = ParseContext::new(select_info);
let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
let final_extra_schema = final_extra_schema_info
.iter()
.map(|(_, (_, _, v))| (*v).clone())
.collect::<Vec<_>>();
schemas.extend(parse_ctx.select_schema.schema);
let extra_schema_len = final_extra_schema.len();
schemas.extend(final_extra_schema);
rows.iter_mut().for_each(|row| {
row.values.resize(schemas.len(), GreptimeValue::default());
});
let mut results = Vec::with_capacity(parse_infos.len());
for parse_info in parse_infos.into_iter() {
let mut row = parse_info.values;
let mut resource_values = parse_info.resource_extracted_values;
let mut scope_values = parse_info.scope_extracted_values;
let mut log_values = parse_info.log_extracted_values;
let mut final_extra_values = vec![GreptimeValue { value_data: None }; extra_schema_len];
for (idx, (_, (attr_type, index, _))) in final_extra_schema_info.iter().enumerate() {
let value = match attr_type {
OpenTelemetryLogRecordAttrType::Resource => resource_values.get_mut(*index),
OpenTelemetryLogRecordAttrType::Scope => scope_values.get_mut(*index),
OpenTelemetryLogRecordAttrType::Log => log_values.get_mut(*index),
};
if let Some(value) = value {
// swap value to final_extra_values
mem::swap(&mut final_extra_values[idx], value);
}
}
row.values.extend(final_extra_values);
results.push(row);
}
Ok(Rows {
schema: schemas,
rows: results,
rows,
})
}
fn parse_resource(
select_info: &SelectInfo,
parse_ctx: &mut ParseContext,
resource_logs_vec: Vec<ResourceLogs>,
) -> Result<Vec<ParseInfo>> {
let mut results = Vec::new();
) -> Result<Vec<Row>> {
let total_len = resource_logs_vec
.iter()
.flat_map(|r| r.scope_logs.iter())
.map(|s| s.log_records.len())
.sum();
let mut results = Vec::with_capacity(total_len);
for r in resource_logs_vec {
parse_ctx.resource_attr = r
.resource
.map(|resource| key_value_to_jsonb(resource.attributes))
.unwrap_or(JsonbValue::Null);
parse_ctx.resource_url = r.schema_url;
let resource_extracted_values = extract_field_from_attr_and_combine_schema(
parse_ctx.extra_resource_schema,
select_info,
parse_ctx.resource_uplift_values = extract_field_from_attr_and_combine_schema(
&parse_ctx.select_info,
&mut parse_ctx.select_schema,
&parse_ctx.resource_attr,
)?;
let rows = parse_scope(
select_info,
r.scope_logs,
parse_ctx,
resource_extracted_values,
)?;
let rows = parse_scope(r.scope_logs, parse_ctx)?;
results.extend(rows);
}
Ok(results)
}
struct ParseContext<'a> {
// selector schema
extra_resource_schema: &'a mut SchemaInfo,
extra_scope_schema: &'a mut SchemaInfo,
extra_log_schema: &'a mut SchemaInfo,
// input selected keys
select_info: Box<SelectInfo>,
// schema infos for selected keys from resource/scope/log for current request
// since the value override from bottom to top, the max capacity is the length of the keys
select_schema: SchemaInfo,
// extracted and uplifted values using select keys
resource_uplift_values: Vec<GreptimeValue>,
scope_uplift_values: Vec<GreptimeValue>,
// passdown values
resource_url: String,
@@ -608,15 +553,13 @@ struct ParseContext<'a> {
}
impl<'a> ParseContext<'a> {
pub fn new(
extra_resource_schema: &'a mut SchemaInfo,
extra_scope_schema: &'a mut SchemaInfo,
extra_log_schema: &'a mut SchemaInfo,
) -> ParseContext<'a> {
pub fn new(select_info: Box<SelectInfo>) -> ParseContext<'a> {
let len = select_info.keys.len();
ParseContext {
extra_resource_schema,
extra_scope_schema,
extra_log_schema,
select_info,
select_schema: SchemaInfo::with_capacity(len),
resource_uplift_values: vec![],
scope_uplift_values: vec![],
resource_url: String::new(),
resource_attr: JsonbValue::Null,
scope_name: None,
@@ -627,74 +570,68 @@ impl<'a> ParseContext<'a> {
}
}
fn parse_scope(
select_info: &SelectInfo,
scopes_log_vec: Vec<ScopeLogs>,
parse_ctx: &mut ParseContext,
resource_extracted_values: Vec<GreptimeValue>,
) -> Result<Vec<ParseInfo>> {
let mut results = Vec::new();
fn parse_scope(scopes_log_vec: Vec<ScopeLogs>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
let len = scopes_log_vec.iter().map(|l| l.log_records.len()).sum();
let mut results = Vec::with_capacity(len);
for scope_logs in scopes_log_vec {
let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope);
parse_ctx.scope_name = scope_name;
parse_ctx.scope_version = scope_version;
parse_ctx.scope_attrs = scope_attrs;
parse_ctx.scope_url = scope_logs.schema_url;
parse_ctx.scope_attrs = scope_attrs;
let scope_extracted_values = extract_field_from_attr_and_combine_schema(
parse_ctx.extra_scope_schema,
select_info,
parse_ctx.scope_uplift_values = extract_field_from_attr_and_combine_schema(
&parse_ctx.select_info,
&mut parse_ctx.select_schema,
&parse_ctx.scope_attrs,
)?;
let rows = parse_log(
select_info,
scope_logs.log_records,
parse_ctx,
&resource_extracted_values,
&scope_extracted_values,
)?;
let rows = parse_log(scope_logs.log_records, parse_ctx)?;
results.extend(rows);
}
Ok(results)
}
fn parse_log(
select_info: &SelectInfo,
log_records: Vec<LogRecord>,
parse_ctx: &mut ParseContext,
resource_extracted_values: &[GreptimeValue],
scope_extracted_values: &[GreptimeValue],
) -> Result<Vec<ParseInfo>> {
fn parse_log(log_records: Vec<LogRecord>, parse_ctx: &mut ParseContext) -> Result<Vec<Row>> {
let mut result = Vec::with_capacity(log_records.len());
for log in log_records {
let log_attr = key_value_to_jsonb(log.attributes.clone());
let (mut row, log_attr) = build_otlp_build_in_row(log, parse_ctx);
let row = build_otlp_build_in_row(log, parse_ctx);
let log_extracted_values = extract_field_from_attr_and_combine_schema(
parse_ctx.extra_log_schema,
select_info,
let log_values = extract_field_from_attr_and_combine_schema(
&parse_ctx.select_info,
&mut parse_ctx.select_schema,
&log_attr,
)?;
let parse_info = ParseInfo {
values: row,
resource_extracted_values: resource_extracted_values.to_vec(),
scope_extracted_values: scope_extracted_values.to_vec(),
log_extracted_values,
};
result.push(parse_info);
let extracted_values = merge_values(
log_values,
&parse_ctx.scope_uplift_values,
&parse_ctx.resource_uplift_values,
);
row.values.extend(extracted_values);
result.push(row);
}
Ok(result)
}
struct ParseInfo {
values: Row,
resource_extracted_values: Vec<GreptimeValue>,
scope_extracted_values: Vec<GreptimeValue>,
log_extracted_values: Vec<GreptimeValue>,
fn merge_values(
log: Vec<GreptimeValue>,
scope: &[GreptimeValue],
resource: &[GreptimeValue],
) -> Vec<GreptimeValue> {
log.into_iter()
.enumerate()
.map(|(i, value)| GreptimeValue {
value_data: value
.value_data
.or_else(|| scope.get(i).and_then(|x| x.value_data.clone()))
.or_else(|| resource.get(i).and_then(|x| x.value_data.clone())),
})
.collect()
}
/// transform otlp logs request to pipeline value

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use api::v1::value::ValueData;
use api::v1::ColumnDataType;
use itertools::Itertools;
@@ -47,18 +45,19 @@ pub fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
}
pub fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
let mut map = BTreeMap::new();
for kv in key_values {
let value = match kv.value {
Some(value) => match value.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
},
None => JsonbValue::Null,
};
map.insert(kv.key.clone(), value);
}
JsonbValue::Object(map)
JsonbValue::Object(
key_values
.into_iter()
.map(|kv| {
(
kv.key,
kv.value
.and_then(|v| v.value)
.map_or(JsonbValue::Null, any_value_to_jsonb),
)
})
.collect(),
)
}
#[inline]

View File

@@ -1905,7 +1905,7 @@ pub async fn test_otlp_logs(store_type: StorageType) {
let client = TestClient::new(app).await;
let content = r#"
{"resourceLogs":[{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"},{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568538897000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line two"},"attributes":[{"key":"app","value":{"stringValue":"server"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
{"resourceLogs":[{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[{"key":"instance_num","value":{"stringValue":"10"}}],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server1"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"},{"resource":{"attributes":[],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568538897000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line two"},"attributes":[{"key":"app","value":{"stringValue":"server2"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
"#;
let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap();
@@ -1915,24 +1915,24 @@ pub async fn test_otlp_logs(store_type: StorageType) {
// write log data
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-table-name"),
HeaderValue::from_static("logs1"),
),
],
vec![(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
)],
"/v1/otlp/v1/logs?db=public",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"]]";
validate_data("otlp_logs", &client, "select * from logs1;", expected).await;
let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server1\"},0,\"\",\"\",{\"instance_num\":\"10\"},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server2\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\"]]";
validate_data(
"otlp_logs",
&client,
"select * from opentelemetry_logs;",
expected,
)
.await;
}
{
@@ -1946,7 +1946,7 @@ pub async fn test_otlp_logs(store_type: StorageType) {
),
(
HeaderName::from_static("x-greptime-log-table-name"),
HeaderValue::from_static("logs"),
HeaderValue::from_static("cus_logs"),
),
(
HeaderName::from_static("x-greptime-log-extract-keys"),
@@ -1960,11 +1960,51 @@ pub async fn test_otlp_logs(store_type: StorageType) {
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"server\"],[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"server\"]]";
let expected = "[[1736413568538897000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line two\",{\"app\":\"server2\"},0,\"\",\"\",{},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",null,\"server2\"],[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server1\"},0,\"\",\"\",{\"instance_num\":\"10\"},\"\",{},\"https://opentelemetry.io/schemas/1.4.0\",\"10\",\"server1\"]]";
validate_data(
"otlp_logs_with_selector",
&client,
"select * from logs;",
"select * from cus_logs;",
expected,
)
.await;
}
{
// test same selector with multiple value
let content = r#"
{"resourceLogs":[{"resource":{"attributes":[{"key":"fromwhere","value":{"stringValue":"resource"}}],"droppedAttributesCount":0},"scopeLogs":[{"scope":{"name":"","version":"","attributes":[{"key":"fromwhere","value":{"stringValue":"scope"}}],"droppedAttributesCount":0},"logRecords":[{"timeUnixNano":"1736413568497632000","observedTimeUnixNano":"0","severityNumber":9,"severityText":"Info","body":{"stringValue":"the message line one"},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"fromwhere","value":{"stringValue":"log_attr"}}],"droppedAttributesCount":0,"flags":0,"traceId":"f665100a612542b69cc362fe2ae9d3bf","spanId":"e58f01c4c69f4488"}],"schemaUrl":""}],"schemaUrl":"https://opentelemetry.io/schemas/1.4.0"}]}
"#;
let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap();
let body = req.encode_to_vec();
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("x-greptime-log-table-name"),
HeaderValue::from_static("logs2"),
),
(
HeaderName::from_static("x-greptime-log-extract-keys"),
HeaderValue::from_static("fromwhere"),
),
],
"/v1/otlp/v1/logs?db=public",
body.clone(),
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
let expected = "[[1736413568497632000,\"f665100a612542b69cc362fe2ae9d3bf\",\"e58f01c4c69f4488\",\"Info\",9,\"the message line one\",{\"app\":\"server\",\"fromwhere\":\"log_attr\"},0,\"\",\"\",{\"fromwhere\":\"scope\"},\"\",{\"fromwhere\":\"resource\"},\"https://opentelemetry.io/schemas/1.4.0\",\"log_attr\"]]";
validate_data(
"otlp_logs_with_selector_overlapping",
&client,
"select * from logs2;",
expected,
)
.await;