chore: support Loki's structured metadata for ingestion (#5541)

* chore: support Loki's structured metadata

* test: update test

* chore: revert some code change

* chore: address CR comment
This commit is contained in:
shuiyisong
2025-02-20 00:44:26 +08:00
committed by GitHub
parent 62a8b8b9dc
commit 53b25c04a2
2 changed files with 81 additions and 17 deletions

View File

@@ -18,8 +18,8 @@ use std::time::Instant;
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType,
Value as GreptimeValue,
ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
};
use axum::extract::State;
use axum::Extension;
@@ -30,6 +30,7 @@ use common_query::{Output, OutputData};
use common_telemetry::{error, warn};
use hashbrown::HashMap;
use headers::ContentType;
use jsonb::Value;
use lazy_static::lazy_static;
use loki_proto::prost_types::Timestamp;
use prost::Message;
@@ -53,6 +54,7 @@ use crate::{prom_store, unwrap_or_warn_continue};
const LOKI_TABLE_NAME: &str = "loki_logs";
const LOKI_LINE_COLUMN: &str = "line";
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";
const STREAMS_KEY: &str = "streams";
const LABEL_KEY: &str = "stream";
@@ -74,6 +76,17 @@ lazy_static! {
datatype_extension: None,
options: None,
},
ColumnSchema {
column_name: LOKI_STRUCTURED_METADATA_COLUMN.to_string(),
datatype: ColumnDataType::Binary.into(),
semantic_type: SemanticType::Field.into(),
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(api::v1::column_data_type_extension::TypeExt::JsonType(
JsonTypeExtension::JsonBinary.into()
))
}),
options: None,
}
];
}
@@ -224,9 +237,20 @@ async fn handle_json_req(
stream_index,
line_index
);
// TODO(shuiyisong): we'll ignore structured metadata for now
let mut row = init_row(schemas.len(), ts, line_text);
let structured_metadata = match line.get(2) {
Some(sdata) if sdata.is_object() => sdata
.as_object()
.unwrap()
.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into()))))
.collect(),
_ => BTreeMap::new(),
};
let structured_metadata = Value::Object(structured_metadata);
let mut row = init_row(schemas.len(), ts, line_text, structured_metadata);
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
rows.push(row);
@@ -267,7 +291,20 @@ async fn handle_pb_req(
};
let line = entry.line;
let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line);
let structured_metadata = entry
.structured_metadata
.into_iter()
.map(|d| (d.name, Value::String(d.value.into())))
.collect::<BTreeMap<String, Value>>();
let structured_metadata = Value::Object(structured_metadata);
let mut row = init_row(
schemas.len(),
prost_ts_to_nano(&ts),
line,
structured_metadata,
);
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
rows.push(row);
@@ -357,7 +394,12 @@ fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
}
fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
fn init_row(
schema_len: usize,
ts: i64,
line: String,
structured_metadata: Value,
) -> Vec<GreptimeValue> {
// create and init row
let mut row = Vec::with_capacity(schema_len);
// set ts and line
@@ -367,7 +409,10 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(line)),
});
for _ in 0..(schema_len - 2) {
row.push(GreptimeValue {
value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())),
});
for _ in 0..(schema_len - 3) {
row.push(GreptimeValue { value_data: None });
}
row

View File

@@ -23,7 +23,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_proto::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
use loki_proto::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
@@ -2226,12 +2226,30 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
structured_metadata: vec![],
structured_metadata: vec![
LabelPairAdapter {
name: "key1".to_string(),
value: "value1".to_string(),
},
LabelPairAdapter {
name: "key2".to_string(),
value: "value2".to_string(),
},
],
parsed: vec![],
},
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:51").unwrap()),
line: "this is a log message 2".to_string(),
structured_metadata: vec![LabelPairAdapter {
name: "key3".to_string(),
value: "value3".to_string(),
}],
parsed: vec![],
},
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:52").unwrap()),
line: "this is a log message 2".to_string(),
structured_metadata: vec![],
parsed: vec![],
},
@@ -2271,7 +2289,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
@@ -2281,7 +2299,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
.await;
// test content
let expected = r#"[[1730976830000000000,"this is a log message","test","integration","do anything"],[1730976830000000000,"this is a log message","test","integration","do anything"]]"#;
let expected = "[[1730976830000000000,\"this is a log message\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"test\",\"integration\",\"do anything\"],[1730976831000000000,\"this is a log message 2\",{\"key3\":\"value3\"},\"test\",\"integration\",\"do anything\"],[1730976832000000000,\"this is a log message 2\",{},\"test\",\"integration\",\"do anything\"]]";
validate_data(
"loki_pb_content",
&client,
@@ -2309,8 +2327,9 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
"sender": "integration"
},
"values": [
[ "1735901380059465984", "this is line one" ],
[ "1735901398478897920", "this is line two" ]
[ "1735901380059465984", "this is line one", {"key1":"value1","key2":"value2"}],
[ "1735901398478897920", "this is line two", {"key3":"value3"}],
[ "1735901398478897921", "this is line two updated"]
]
}
]
@@ -2340,7 +2359,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"structured_metadata\\\" JSON NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -2350,7 +2369,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
.await;
// test content
let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]";
let expected = "[[1735901380059465984,\"this is line one\",{\"key1\":\"value1\",\"key2\":\"value2\"},\"integration\",\"test\"],[1735901398478897920,\"this is line two\",{\"key3\":\"value3\"},\"integration\",\"test\"],[1735901398478897921,\"this is line two updated\",{},\"integration\",\"test\"]]";
validate_data(
"loki_json_content",
&client,