perf: parse Loki labels in protobuf write path (#5305)

* chore: parse loki labels

* chore: add bench

* chore: add comment

* chore: add test

* chore: remove unnecessary default value and update test

* fix: typo and test

* chore: cr issue

* chore: cr issue
shuiyisong
2025-01-08 15:55:06 +08:00
committed by GitHub
parent 2cd1b08ff7
commit d1f8ea7880
6 changed files with 207 additions and 28 deletions

Cargo.lock (generated)

@@ -9170,6 +9170,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "quoted-string"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a206a30ce37189d1340e7da2ee0b4d65e342590af676541c23a4f3959ba272e"
[[package]]
name = "radium"
version = "0.7.0"
@@ -10907,6 +10913,7 @@ dependencies = [
"promql-parser",
"prost 0.12.6",
"query",
"quoted-string",
"rand",
"regex",
"reqwest",


@@ -63,7 +63,6 @@ humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
json5 = "0.4"
jsonb.workspace = true
lazy_static.workspace = true
log-query.workspace = true
@@ -86,6 +85,7 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
quoted-string = "0.6"
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
@@ -123,6 +123,7 @@ client = { workspace = true, features = ["testing"] }
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.5"
json5 = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
@@ -149,3 +150,7 @@ harness = false
[[bench]]
name = "to_http_output"
harness = false
[[bench]]
name = "loki_labels"
harness = false


@@ -0,0 +1,41 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use servers::error::Result;
use servers::http::loki::parse_loki_labels;

// cargo bench loki_labels

fn json5_parse(input: &str) -> Result<BTreeMap<String, String>> {
    let input = input.replace("=", ":");
    let result: BTreeMap<String, String> = json5::from_str(&input).unwrap();
    Ok(result)
}

fn criterion_benchmark(c: &mut Criterion) {
    let mut group = c.benchmark_group("loki_labels");
    let input = r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;

    group.bench_function("json5", |b| b.iter(|| json5_parse(black_box(input))));
    group.bench_function("hand_parse", |b| {
        b.iter(|| parse_loki_labels(black_box(input)))
    });

    group.finish(); // Important to call finish() on the group
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
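
For context, the benchmark above compares the old json5 trick with the new hand-rolled parser on the same label set. A minimal sketch of the equivalence being measured, as a standalone program (assumes the `servers` and `json5` crates are available as dependencies, exactly as in this diff; the `main` wrapper is only for illustration):

use std::collections::BTreeMap;

use servers::http::loki::parse_loki_labels;

fn main() {
    let input = r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;

    // new path: hand-parse the Loki label string directly
    let hand: BTreeMap<String, String> = parse_loki_labels(input).unwrap();

    // old path: rewrite `=` to `:` and let json5 read it as an object with unquoted keys
    let via_json5: BTreeMap<String, String> = json5::from_str(&input.replace('=', ":")).unwrap();

    // both approaches yield the same ordered label map; the new one just avoids
    // the string rewrite and the json5 dependency in the hot path
    assert_eq!(hand, via_json5);
}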


@@ -506,10 +506,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse payload as json5"))]
ParseJson5 {
#[snafu(source)]
error: json5::Error,
#[snafu(display("Invalid Loki labels: {}", msg))]
InvalidLokiLabels {
msg: String,
#[snafu(implicit)]
location: Location,
},
@@ -666,7 +665,7 @@ impl ErrorExt for Error {
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| InvalidLokiLabels { .. }
| InvalidLokiPayload { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
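
Both variants follow the usual snafu shape; the difference is that `InvalidLokiLabels` carries only a message instead of wrapping a `json5::Error` source. A self-contained sketch of how such a variant is raised (a toy enum for illustration only, not the actual servers error type; assumes snafu 0.8-style `#[snafu(implicit)]` support):

use snafu::{ensure, Location, Snafu};

// toy mirror of the new variant, for illustration only
#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid Loki labels: {}", msg))]
    InvalidLokiLabels {
        msg: String,
        #[snafu(implicit)]
        location: Location,
    },
}

fn check_braces(labels: &str) -> Result<(), Error> {
    // ensure! builds the error through the generated InvalidLokiLabelsSnafu selector
    ensure!(
        labels.starts_with('{'),
        InvalidLokiLabelsSnafu {
            msg: "missing `{` at the beginning"
        }
    );
    Ok(())
}

fn main() {
    let err = check_braces(r#"job="foobar""#).unwrap_err();
    assert_eq!(
        err.to_string(),
        "Invalid Loki labels: missing `{` at the beginning"
    );
}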


@@ -27,17 +27,18 @@ use axum::{Extension, TypedHeader};
use bytes::Bytes;
use common_query::prelude::GREPTIME_TIMESTAMP;
use common_query::{Output, OutputData};
use common_telemetry::warn;
use common_telemetry::{error, warn};
use hashbrown::HashMap;
use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use prost::Message;
use quoted_string::test_utils::TestSpec;
use session::context::{Channel, QueryContext};
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
DecodeOtlpRequestSnafu, InvalidLokiPayloadSnafu, ParseJson5Snafu, ParseJsonSnafu, Result,
UnsupportedContentTypeSnafu,
DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
Result, UnsupportedContentTypeSnafu,
};
use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
use crate::http::extractor::LogTableName;
@@ -191,8 +192,7 @@ async fn handle_json_req(
l.iter()
.filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
.collect::<BTreeMap<String, String>>()
})
.unwrap_or_default();
});
// process each line
for (line_index, line) in lines.iter().enumerate() {
@@ -230,7 +230,7 @@ async fn handle_json_req(
// TODO(shuiyisong): we'll ignore structured metadata for now
let mut row = init_row(schemas.len(), ts, line_text);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
rows.push(row);
}
@@ -255,13 +255,11 @@ async fn handle_pb_req(
let mut rows = Vec::with_capacity(cnt);
for stream in req.streams {
// parse labels for each row
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
// TODO(shuiyisong): remove json5 and parse the string directly
let labels = stream.labels.replace("=", ":");
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
let labels = parse_loki_labels(&stream.labels)
.inspect_err(|e| {
error!(e; "failed to parse loki labels");
})
.ok();
// process entries
for entry in stream.entries {
@@ -273,7 +271,7 @@ async fn handle_pb_req(
let line = entry.line;
let mut row = init_row(schemas.len(), prost_ts_to_nano(&ts), line);
process_labels(&mut column_indexer, schemas, &mut row, labels.iter());
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
rows.push(row);
}
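
Worth noting: the old code propagated the json5 failure with `?`, failing the whole push request, while the new code logs the error and keeps ingesting the rows without label columns. A minimal sketch of that degraded path, using only `parse_loki_labels` from this diff (the `main` wrapper and the eprintln message are illustrative, not the server's actual logging):

use servers::http::loki::parse_loki_labels;

fn main() {
    // malformed label set: the closing `}` is missing
    let labels = parse_loki_labels(r#"{job="foobar""#)
        .inspect_err(|e| eprintln!("failed to parse loki labels: {e}"))
        .ok();

    // handle_pb_req hands this Option to process_labels, which returns early on None,
    // so the affected rows keep only greptime_timestamp and the log line
    assert!(labels.is_none());
}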
@@ -282,6 +280,81 @@ async fn handle_pb_req(
Ok(rows)
}
/// since we're hand-parsing the labels, if any error is encountered, the caller will just skip the labels
/// note: pub here for bench usage
/// ref:
/// 1. encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
/// 2. test data: https://github.com/grafana/loki/blob/a24ef7b206e0ca63ee74ca6ecb0a09b745cd2258/pkg/push/types_test.go
pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
    let mut labels = labels.trim();
    ensure!(
        labels.len() >= 2,
        InvalidLokiLabelsSnafu {
            msg: "labels string too short"
        }
    );
    ensure!(
        labels.starts_with("{"),
        InvalidLokiLabelsSnafu {
            msg: "missing `{` at the beginning"
        }
    );
    ensure!(
        labels.ends_with("}"),
        InvalidLokiLabelsSnafu {
            msg: "missing `}` at the end"
        }
    );

    let mut result = BTreeMap::new();
    labels = &labels[1..labels.len() - 1];

    while !labels.is_empty() {
        // parse key
        let first_index = labels.find("=").context(InvalidLokiLabelsSnafu {
            msg: format!("missing `=` near: {}", labels),
        })?;
        let key = &labels[..first_index];
        labels = &labels[first_index + 1..];

        // parse value
        let qs = quoted_string::parse::<TestSpec>(labels)
            .map_err(|e| {
                InvalidLokiLabelsSnafu {
                    msg: format!(
                        "failed to parse quoted string near: {}, reason: {}",
                        labels, e.1
                    ),
                }
                .build()
            })?
            .quoted_string;
        labels = &labels[qs.len()..];

        let value = quoted_string::to_content::<TestSpec>(qs).map_err(|e| {
            InvalidLokiLabelsSnafu {
                msg: format!("failed to unquote the string: {}, reason: {}", qs, e),
            }
            .build()
        })?;

        // insert key and value
        result.insert(key.to_string(), value.to_string());

        if labels.is_empty() {
            break;
        }
        ensure!(
            labels.starts_with(","),
            InvalidLokiLabelsSnafu { msg: "missing `,`" }
        );
        labels = labels[1..].trim_start();
    }

    Ok(result)
}
#[inline]
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
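
A quick usage sketch for the `parse_loki_labels` function added above, reusing the label string from the protobuf integration test further down (assumes the `servers` crate path as in the bench; `main` is only for illustration):

use servers::http::loki::parse_loki_labels;

fn main() {
    let labels = r#"{service="test",source="integration",wadaxi="do anything"}"#;
    let parsed = parse_loki_labels(labels).expect("well-formed label set");

    // the BTreeMap keeps keys ordered, and the surrounding quotes are stripped from values
    assert_eq!(parsed.get("service").map(String::as_str), Some("test"));
    assert_eq!(parsed.get("wadaxi").map(String::as_str), Some("do anything"));
}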
@@ -303,12 +376,16 @@ fn init_row(schema_len: usize, ts: i64, line: String) -> Vec<GreptimeValue> {
row
}
fn process_labels<'a>(
fn process_labels(
column_indexer: &mut HashMap<String, u16>,
schemas: &mut Vec<ColumnSchema>,
row: &mut Vec<GreptimeValue>,
labels: impl Iterator<Item = (&'a String, &'a String)>,
labels: Option<&BTreeMap<String, String>>,
) {
let Some(labels) = labels else {
return;
};
// insert labels
for (k, v) in labels {
if let Some(index) = column_indexer.get(k) {
@@ -359,9 +436,12 @@ macro_rules! unwrap_or_warn_continue {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use loki_api::prost_types::Timestamp;
use crate::http::loki::prost_ts_to_nano;
use crate::error::Error::InvalidLokiLabels;
use crate::http::loki::{parse_loki_labels, prost_ts_to_nano};
#[test]
fn test_ts_to_nano() {
@@ -374,4 +454,50 @@ mod tests {
};
assert_eq!(prost_ts_to_nano(&ts), 1731748568804293888);
}
#[test]
fn test_parse_loki_labels() {
    let mut expected = BTreeMap::new();
    expected.insert("job".to_string(), "foobar".to_string());
    expected.insert("cluster".to_string(), "foo-central1".to_string());
    expected.insert("namespace".to_string(), "bar".to_string());
    expected.insert("container_name".to_string(), "buzz".to_string());

    // perfect case
    let valid_labels =
        r#"{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}"#;
    let re = parse_loki_labels(valid_labels);
    assert!(re.is_ok());
    assert_eq!(re.unwrap(), expected);

    // too short
    let too_short = r#"}"#;
    let re = parse_loki_labels(too_short);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

    // missing start
    let missing_start = r#"job="foobar"}"#;
    let re = parse_loki_labels(missing_start);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

    // missing end
    let missing_end = r#"{job="foobar""#;
    let re = parse_loki_labels(missing_end);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

    // missing equal
    let missing_equal = r#"{job"foobar"}"#;
    let re = parse_loki_labels(missing_equal);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

    // missing quote
    let missing_quote = r#"{job=foobar}"#;
    let re = parse_loki_labels(missing_quote);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));

    // missing comma
    let missing_comma = r#"{job="foobar" cluster="foo-central1"}"#;
    let re = parse_loki_labels(missing_comma);
    assert!(matches!(re.err().unwrap(), InvalidLokiLabels { .. }));
}
}


@@ -1873,7 +1873,7 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: r#"{service="test",source="integration","wadaxi"="do anything"}"#.to_string(),
labels: r#"{service="test",source="integration",wadaxi="do anything"}"#.to_string(),
entries: vec![
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
@@ -1953,7 +1953,8 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
"streams": [
{
"stream": {
"source": "test"
"source": "test",
"sender": "integration"
},
"values": [
[ "1735901380059465984", "this is line one" ],
@@ -1987,7 +1988,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"sender\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"sender\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
@@ -1997,7 +1998,7 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
.await;
// test content
let expected = "[[1735901380059465984,\"this is line one\",\"test\"],[1735901398478897920,\"this is line two\",\"test\"]]";
let expected = "[[1735901380059465984,\"this is line one\",\"integration\",\"test\"],[1735901398478897920,\"this is line two\",\"integration\",\"test\"]]";
validate_data(
"loki_json_content",
&client,