diff --git a/Cargo.lock b/Cargo.lock
index 5e5d205753..ad3076094f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9170,6 +9170,12 @@ dependencies = [
  "proc-macro2",
 ]
 
+[[package]]
+name = "quoted-string"
+version = "0.6.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5a206a30ce37189d1340e7da2ee0b4d65e342590af676541c23a4f3959ba272e"
+
 [[package]]
 name = "radium"
 version = "0.7.0"
@@ -10907,6 +10913,7 @@ dependencies = [
  "promql-parser",
  "prost 0.12.6",
  "query",
+ "quoted-string",
  "rand",
  "regex",
  "reqwest",
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index a450829199..674cd3f7df 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -63,7 +63,6 @@ humantime-serde.workspace = true
 hyper = { version = "0.14", features = ["full"] }
 influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
 itertools.workspace = true
-json5 = "0.4"
 jsonb.workspace = true
 lazy_static.workspace = true
 log-query.workspace = true
@@ -86,6 +85,7 @@ prometheus.workspace = true
 promql-parser.workspace = true
 prost.workspace = true
 query.workspace = true
+quoted-string = "0.6"
 rand.workspace = true
 regex.workspace = true
 reqwest.workspace = true
@@ -123,6 +123,7 @@ client = { workspace = true, features = ["testing"] }
 common-base.workspace = true
 common-test-util.workspace = true
 criterion = "0.5"
+json5 = "0.4"
 mysql_async = { version = "0.33", default-features = false, features = [
     "default-rustls",
 ] }
@@ -149,3 +150,7 @@ harness = false
 [[bench]]
 name = "to_http_output"
 harness = false
+
+[[bench]]
+name = "loki_labels"
+harness = false
diff --git a/src/servers/benches/loki_labels.rs b/src/servers/benches/loki_labels.rs
new file mode 100644
index 0000000000..e0d64976ef
--- /dev/null
+++ b/src/servers/benches/loki_labels.rs
@@ -0,0 +1,41 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::BTreeMap;
+
+use criterion::{black_box, criterion_group, criterion_main, Criterion};
+use servers::error::Result;
+use servers::http::loki::parse_loki_labels;
+
+// cargo bench loki_labels
+
+fn json5_parse(input: &str) -> Result<BTreeMap<String, String>> {
+    let input = input.replace("=", ":");
+    let result: BTreeMap<String, String> = json5::from_str(&input).unwrap();
+    Ok(result)
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let mut group = c.benchmark_group("loki_labels");
+    let input = r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
+
+    group.bench_function("json5", |b| b.iter(|| json5_parse(black_box(input))));
+    group.bench_function("hand_parse", |b| {
+        b.iter(|| parse_loki_labels(black_box(input)))
+    });
+    group.finish(); // Important to call finish() on the group
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 31aa5342be..621211cf86 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -506,10 +506,9 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to parse payload as json5"))]
-    ParseJson5 {
-        #[snafu(source)]
-        error: json5::Error,
+    #[snafu(display("Invalid Loki labels: {}", msg))]
+    InvalidLokiLabels {
+        msg: String,
         #[snafu(implicit)]
         location: Location,
     },
@@ -666,7 +665,7 @@ impl ErrorExt for Error {
             | MissingQueryContext { .. }
             | MysqlValueConversion { .. }
             | ParseJson { .. }
-            | ParseJson5 { .. }
+            | InvalidLokiLabels { .. }
             | InvalidLokiPayload { .. }
             | UnsupportedContentType { .. }
             | TimestampOverflow { .. }
diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs
index b101411061..272d2867db 100644
--- a/src/servers/src/http/loki.rs
+++ b/src/servers/src/http/loki.rs
@@ -27,17 +27,18 @@ use axum::{Extension, TypedHeader};
 use bytes::Bytes;
 use common_query::prelude::GREPTIME_TIMESTAMP;
 use common_query::{Output, OutputData};
-use common_telemetry::warn;
+use common_telemetry::{error, warn};
 use hashbrown::HashMap;
 use lazy_static::lazy_static;
 use loki_api::prost_types::Timestamp;
 use prost::Message;
+use quoted_string::test_utils::TestSpec;
 use session::context::{Channel, QueryContext};
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
-    DecodeOtlpRequestSnafu, InvalidLokiPayloadSnafu, ParseJson5Snafu, ParseJsonSnafu, Result,
-    UnsupportedContentTypeSnafu,
+    DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
+    Result, UnsupportedContentTypeSnafu,
 };
 use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
 use crate::http::extractor::LogTableName;
@@ -191,8 +192,7 @@ async fn handle_json_req(
             l.iter()
                 .filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
                 .collect::<BTreeMap<String, String>>()
-        })
-        .unwrap_or_default();
+        });
 
     // process each line
     for (line_index, line) in lines.iter().enumerate() {
@@ -230,7 +230,7 @@ async fn handle_json_req(
         // TODO(shuiyisong): we'll ignore structured metadata for now
 
         let mut row = init_row(schemas.len(), ts, line_text);
-        process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
+        process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
 
         rows.push(row);
     }
@@ -255,13 +255,11 @@ async fn handle_pb_req(
     let mut rows = Vec::with_capacity(cnt);
 
     for stream in req.streams {
-        // parse labels for each row
-        // encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
-        // use very dirty hack to parse labels
-        // TODO(shuiyisong): remove json5 and parse the string directly
-        let labels = stream.labels.replace("=", ":");
-        // use btreemap to keep order
-        let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
+        let labels = parse_loki_labels(&stream.labels)
+            .inspect_err(|e| {
+                error!(e; "failed to parse loki labels");
+            })
+            .ok();
 
         // process entries
         for entry in stream.entries {
@@ -273,7 +271,7 @@
             let line = entry.line;
 
             let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line);
-            process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
+            process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
 
             rows.push(row);
         }
@@ -282,6 +280,81 @@
     Ok(rows)
 }
 
+/// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
+/// note: pub here for bench usage
+/// ref:
+/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
+/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
+pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
+    let mut labels = labels.trim();
+    ensure!(
+        labels.len() >= 2,
+        InvalidLokiLabelsSnafu {
+            msg: "labels string too short"
+        }
+    );
+    ensure!(
+        labels.starts_with("{"),
+        InvalidLokiLabelsSnafu {
+            msg: "missing `{` at the beginning"
+        }
+    );
+    ensure!(
+        labels.ends_with("}"),
+        InvalidLokiLabelsSnafu {
+            msg: "missing `}` at the end"
+        }
+    );
+
+    let mut result = BTreeMap::new();
+    labels = &labels[1..labels.len() - 1];
+
+    while !labels.is_empty() {
+        // parse key
+        let first_index = labels.find("=").context(InvalidLokiLabelsSnafu {
+            msg: format!("missing `=` near: {}", labels),
+        })?;
+        let key = &labels[..first_index];
+        labels = &labels[first_index + 1..];
+
+        // parse value
+        let qs = quoted_string::parse::<TestSpec>(labels)
+            .map_err(|e| {
+                InvalidLokiLabelsSnafu {
+                    msg: format!(
+                        "failed to parse quoted string near: {}, reason: {}",
+                        labels, e.1
+                    ),
+                }
+                .build()
+            })?
+            .quoted_string;
+
+        labels = &labels[qs.len()..];
+
+        let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
+            InvalidLokiLabelsSnafu {
+                msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
+            }
+            .build()
+        })?;
+
+        // insert key and value
+        result.insert(key.to_string(), value.to_string());
+
+        if labels.is_empty() {
+            break;
+        }
+        ensure!(
+            labels.starts_with(","),
+            InvalidLokiLabelsSnafu { msg: "missing `,`" }
+        );
+        labels = labels[1..].trim_start();
+    }
+
+    Ok(result)
+}
+
 #[inline]
 fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
     ts.seconds * 1_000_000_000 + ts.nanos as i64
@@ -303,12 +376,16 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
     row
 }
 
-fn process_labels<'a>(
+fn process_labels(
     column_indexer: &mut HashMap<String, u16>,
     schemas: &mut Vec<ColumnSchema>,
     row: &mut Vec<GreptimeValue>,
-    labels: impl Iterator<Item = (&'a String, &'a String)>,
+    labels: Option<&BTreeMap<String, String>>,
 ) {
+    let Some(labels) = labels else {
+        return;
+    };
+
     // insert labels
     for (k, v) in labels {
         if let Some(index) = column_indexer.get(k) {
@@ -359,9 +436,12 @@ macro_rules! unwrap_or_warn_continue {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::BTreeMap;
+
     use loki_api::prost_types::Timestamp;
 
-    use crate::http::loki::prost_ts_to_nano;
+    use crate::error::Error::InvalidLokiLabels;
+    use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};
 
     #[test]
     fn test_ts_to_nano() {
@@ -374,4 +454,50 @@
         };
         assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
     }
+
+    #[test]
+    fn test_parse_loki_labels() {
+        let mut expected = BTreeMap::new();
+        expected.insert("job".to_string(), "foobar".to_string());
+        expected.insert("cluster".to_string(), "foo-central1".to_string());
+        expected.insert("namespace".to_string(), "bar".to_string());
+        expected.insert("container_name".to_string(), "buzz".to_string());
+
+        // perfect case
+        let valid_labels =
+            r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
+        let re = parse_loki_labels(valid_labels);
+        assert!(re.is_ok());
+        assert_eq!(re.unwrap(), expected);
+
+        // too short
+        let too_short = r#"}"#;
+        let re = parse_loki_labels(too_short);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+
+        // missing start
+        let missing_start = r#"job="foobar"}"#;
+        let re = parse_loki_labels(missing_start);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+
+        // missing end
+        let missing_end = r#"{job="foobar""#;
+        let re = parse_loki_labels(missing_end);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+
+        // missing equal
+        let missing_equal = r#"{job"foobar"}"#;
+        let re = parse_loki_labels(missing_equal);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+
+        // missing quote
+        let missing_quote = r#"{job=foobar}"#;
+        let re = parse_loki_labels(missing_quote);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+
+        // missing comma
+        let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
+        let re = parse_loki_labels(missing_comma);
+        assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
+    }
 }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 0027700e80..8b252e71e4 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1873,7 +1873,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
     // init loki request
     let req: PushRequest = PushRequest {
         streams: vec![StreamAdapter {
-            labels: r#"{service="test",source="integration","wadaxi"="do anything"}"#.to_string(),
+            labels: r#"{service="test",source="integration",wadaxi="do anything"}"#.to_string(),
             entries: vec![
                 EntryAdapter {
                     timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
@@ -1953,7 +1953,8 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
         "streams": [
             {
                 "stream": {
-                    "source": "test"
+                    "source": "test",
+                    "sender": "integration"
                 },
                 "values": [
                     [ "1735901380059465984", "this is line one" ],
@@ -1987,7 +1988,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
     assert_eq!(StatusCode::OK, res.status());
 
     // test schema
-    let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n  \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n  \\\"line\\\" STRING NULL,\\n  \\\"source\\\" STRING NULL,\\n  TIME INDEX (\\\"greptime_timestamp\\\"),\\n  PRIMARY KEY (\\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true'\\n)\"]]";
+    let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n  \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n  \\\"line\\\" STRING NULL,\\n  \\\"sender\\\" STRING NULL,\\n  \\\"source\\\" STRING NULL,\\n  TIME INDEX (\\\"greptime_timestamp\\\"),\\n  PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true'\\n)\"]]";
     validate_data(
         "loki_json_schema",
         &client,
@@ -1997,7 +1998,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
     .await;
 
     // test content
-    let expected = "[[1735901380059465984,\"this is line one\",\"test\"],[1735901398478897920,\"this is line two\",\"test\"]]";
+    let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]";
     validate_data(
         "loki_json_content",
         &client,
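For reference, a minimal sketch of how the hand-written parser introduced above is meant to be consumed, assuming the `servers` crate is on the dependency path exactly as in the `loki_labels` benchmark (the label string mirrors the test data; everything else here is illustrative, not part of the patch):

use std::collections::BTreeMap;

use servers::http::loki::parse_loki_labels;

fn main() {
    // Loki clients encode stream labels as `{key="value", ...}`; parse_loki_labels
    // returns them as an ordered BTreeMap<String, String> or an InvalidLokiLabels error.
    let labels = r#"{job="foobar", cluster="foo-central1"}"#;
    let parsed: BTreeMap<String, String> = parse_loki_labels(labels).expect("labels should parse");
    assert_eq!(parsed.get("job").map(String::as_str), Some("foobar"));
    assert_eq!(parsed.get("cluster").map(String::as_str), Some("foo-central1"));
}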