fix: loki write row len error (#5161)

This commit is contained in:
localhost
2024-12-13 18:10:59 +08:00
committed by Yingwen
parent b90d8f7dbd
commit 6066ce2c4a
2 changed files with 15 additions and 9 deletions

View File

@@ -514,8 +514,8 @@ pub async fn loki_ingest(
let line = entry.line;
// create and init row
let mut row = Vec::with_capacity(schemas.capacity());
for _ in 0..row.capacity() {
let mut row = Vec::with_capacity(schemas.len());
for _ in 0..schemas.len() {
row.push(GreptimeValue { value_data: None });
}
// insert ts and line

View File

@@ -1816,11 +1816,17 @@ pub async fn test_loki_logs(store_type: StorageType) {
// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: "{service=\"test\",source=\"integration\"}".to_string(),
entries: vec![EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
}],
labels: r#"{service="test",source="integration","wadaxi"="do anything"}"#.to_string(),
entries: vec![
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
},
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
},
],
hash: rand::random(),
}],
};
@@ -1848,7 +1854,7 @@ pub async fn test_loki_logs(store_type: StorageType) {
assert_eq!(StatusCode::OK, res.status());
// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n \\\"wadaxi\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\", \\\"wadaxi\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_schema",
&client,
@@ -1858,7 +1864,7 @@ pub async fn test_loki_logs(store_type: StorageType) {
.await;
// test content
let expected = r#"[[1730976830000000000,"this is a log message","test","integration"]]"#;
let expected = r#"[[1730976830000000000,"this is a log message","test","integration","do anything"],[1730976830000000000,"this is a log message","test","integration","do anything"]]"#;
validate_data(
"loki_content",
&client,