From 53b25c04a253a419b5fa68dd1ad7d45f739be58c Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Thu, 20 Feb 2025 00:44:26 +0800 Subject: [PATCH] chore: support Loki's structured metadata for ingestion (#5541) * chore: support loki's structured metadata * test: update test * chore: revert some code change * chore: address CR comment --- src/servers/src/http/loki.rs | 59 +++++++++++++++++++++++++++++---- tests-integration/tests/http.rs | 39 ++++++++++++++++------ 2 files changed, 81 insertions(+), 17 deletions(-) diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index 0315c318a7..ac7afe6d45 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -18,8 +18,8 @@ use std::time::Instant; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, - Value as GreptimeValue, + ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue, }; use axum::extract::State; use axum::Extension; @@ -30,6 +30,7 @@ use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; use hashbrown::HashMap; use headers::ContentType; +use jsonb::Value; use lazy_static::lazy_static; use loki_proto::prost_types::Timestamp; use prost::Message; @@ -53,6 +54,7 @@ use crate::{prom_store, unwrap_or_warn_continue}; const LOKI_TABLE_NAME: &str = "loki_logs"; const LOKI_LINE_COLUMN: &str = "line"; +const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata"; const STREAMS_KEY: &str = "streams"; const LABEL_KEY: &str = "stream"; @@ -74,6 +76,17 @@ lazy_static! { datatype_extension: None, options: None, }, + ColumnSchema { + column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(), + datatype: ColumnDataType::Binary.into(), + semantic_type: SemanticType::Field.into(), + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType( + JsonTypeExtension::JsonBinary.into() + )) + }), + options: None, + } ]; } @@ -224,9 +237,20 @@ async fn handle_json_req( stream_index, line_index ); - // TODO(shuiyisong): we'll ignore structured metadata for now - let mut row = init_row(schemas.len(), ts, line_text); + let structured_metadata = match line.get(2) { + Some(sdata) if sdata.is_object() => sdata + .as_object() + .unwrap() + .iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into())))) + .collect(), + _ => BTreeMap::new(), + }; + let structured_metadata = Value::Object(structured_metadata); + + let mut row = init_row(schemas.len(), ts, line_text, structured_metadata); + process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref()); rows.push(row); @@ -267,7 +291,20 @@ async fn handle_pb_req( }; let line = entry.line; - let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line); + let structured_metadata = entry + .structured_metadata + .into_iter() + .map(|d| (d.name, Value::String(d.value.into()))) + .collect::>(); + let structured_metadata = Value::Object(structured_metadata); + + let mut row = init_row( + schemas.len(), + prost_ts_to_nano(&ts), + line, + structured_metadata, + ); + process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref()); rows.push(row); @@ -357,7 +394,12 @@ fn prost_ts_to_nano(ts: &Timestamp) -> i64 { ts.seconds * 1_000_000_000 + ts.nanos as i64 } -fn init_row(schema_len: usize, ts: i64, line: String) -> Vec { +fn init_row( + schema_len: usize, + ts: i64, + line: String, + structured_metadata: Value, +) -> Vec { // create and init row let mut row = Vec::with_capacity(schema_len); // set ts and line @@ -367,7 +409,10 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec { row.push(GreptimeValue { value_data: Some(ValueData::StringValue(line)), }); - for _ in 0..(schema_len - 2) { + row.push(GreptimeValue { + value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())), + }); + for _ in 0..(schema_len - 3) { row.push(GreptimeValue { value_data: None }); } row diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index ed67271527..c2d6567d33 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -23,7 +23,7 @@ use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; use log_query::{Context, Limit, LogQuery, TimeFilter}; -use loki_proto::logproto::{EntryAdapter, PushRequest, StreamAdapter}; +use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter}; use loki_proto::prost_types::Timestamp; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; @@ -2226,12 +2226,30 @@ pub async fn test_loki_pb_logs(store_type: StorageType) { EntryAdapter { timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()), line: "this is a log message".to_string(), - structured_metadata: vec![], + structured_metadata: vec![ + LabelPairAdapter { + name: "key1".to_string(), + value: "value1".to_string(), + }, + LabelPairAdapter { + name: "key2".to_string(), + value: "value2".to_string(), + }, + ], parsed: vec![], }, EntryAdapter { - timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()), - line: "this is a log message".to_string(), + timestamp: Some(Timestamp::from_str("2024-11-07T10:53:51").unwrap()), + line: "this is a log message 2".to_string(), + structured_metadata: vec![LabelPairAdapter { + name: "key3".to_string(), + value: "value3".to_string(), + }], + parsed: vec![], + }, + EntryAdapter { + timestamp: Some(Timestamp::from_str("2024-11-07T10:53:52").unwrap()), + line: "this is a log message 2".to_string(), structured_metadata: vec![], parsed: vec![], }, @@ -2271,7 +2289,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_pb_schema", &client, @@ -2281,7 +2299,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) { .await; // test content - let expected = r#"[[1730976830000000000,"this is a log message","test","integration","do anything"],[1730976830000000000,"this is a log message","test","integration","do anything"]]"#; + let expected = "[[1730976830000000000,\"this is a log message\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"test\",\"integration\",\"do anything\"],[1730976831000000000,\"this is a log message 2\",{\"key3\":\"value3\"},\"test\",\"integration\",\"do anything\"],[1730976832000000000,\"this is a log message 2\",{},\"test\",\"integration\",\"do anything\"]]"; validate_data( "loki_pb_content", &client, @@ -2309,8 +2327,9 @@ pub async fn test_loki_json_logs(store_type: StorageType) { "sender": "integration" }, "values": [ - [ "1735901380059465984", "this is line one" ], - [ "1735901398478897920", "this is line two" ] + [ "1735901380059465984", "this is line one", {"key1":"value1","key2":"value2"}], + [ "1735901398478897920", "this is line two", {"key3":"value3"}], + [ "1735901398478897921", "this is line two updated"] ] } ] @@ -2340,7 +2359,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // test schema - let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; validate_data( "loki_json_schema", &client, @@ -2350,7 +2369,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) { .await; // test content - let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]"; + let expected = "[[1735901380059465984,\"this is line one\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"integration\",\"test\"],[1735901398478897920,\"this is line two\",{\"key3\":\"value3\"},\"integration\",\"test\"],[1735901398478897921,\"this is line two updated\",{},\"integration\",\"test\"]]"; validate_data( "loki_json_content", &client,