feat(pipeline): support Loki API (#6390)

* chore: use schema_info

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* refactor: abstract loki item generator

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: introduce middle item

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* feat: introduce pipeline in loki api

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* test: add tests

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: minor update

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update prefix and test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: change recursion to loop

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* fix: cr issue

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Signed-off-by: evenyag <realevenyag@gmail.com>
shuiyisong authored on 2025-06-27 18:01:08 -07:00, committed by Yingwen
parent 80fae1c559, commit 8a0e554e5a
4 changed files with 743 additions and 215 deletions


@@ -220,6 +220,10 @@ impl ContextReq {
pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
self.req.values().flatten()
}
pub fn map_len(&self) -> usize {
self.req.len()
}
}
// ContextReqIter iterates over the per-context RowInsertRequests grouped in a ContextReq.
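For orientation, a minimal sketch (not part of the diff) of how the new loki_ingest handler further down drives these accessors: map_len() pre-sizes the output buffer, and as_req_iter() yields one (QueryContext, RowInsertRequests) pair per map entry.

// Sketch only, mirroring the handler code later in this commit.
let ctx_req = ContextReq::default_opt_with_reqs(vec![ins_req]);
let mut outputs = Vec::with_capacity(ctx_req.map_len());
for (temp_ctx, req) in ctx_req.as_req_iter(ctx) {
    outputs.push(handler.insert(req, temp_ctx).await);
}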


@@ -24,7 +24,7 @@ pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::GreptimeTransformer;
pub use etl::value::{Array, Map, Value};
pub use etl::value::{Array, Map, Timestamp, Value};
pub use etl::{
json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content,
DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput, TransformerMode,
@@ -50,24 +50,3 @@ macro_rules! unwrap_or_continue_if_err {
}
}};
}
#[macro_export]
macro_rules! unwrap_or_warn_continue {
($expr:expr, $msg:expr) => {
if let Some(value) = $expr {
value
} else {
warn!($msg);
continue;
}
};
($expr:expr, $fmt:expr, $($arg:tt)*) => {
if let Some(value) = $expr {
value
} else {
warn!($fmt, $($arg)*);
continue;
}
};
}
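The unwrap_or_warn_continue macro removed in this hunk is superseded by native let-else in the rewritten iterators below; roughly:

// The same warn-and-skip control flow, written with let-else
// (as in the new LokiJsonParser iterator further down).
while let Some(stream) = streams.pop_front() {
    let Some(lines) = stream.get(LINES_KEY) else {
        warn!("missing values on stream");
        continue;
    };
    // ... handle `lines` ...
}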


@@ -12,15 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use ahash::{HashMap, HashMapExt};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
};
use axum::extract::State;
use axum::Extension;
@@ -32,8 +31,10 @@ use common_telemetry::{error, warn};
use headers::ContentType;
use jsonb::Value;
use lazy_static::lazy_static;
use loki_proto::prost_types::Timestamp;
use pipeline::unwrap_or_warn_continue;
use loki_proto::logproto::LabelPairAdapter;
use loki_proto::prost_types::Timestamp as LokiTimestamp;
use pipeline::util::to_pipeline_version;
use pipeline::{ContextReq, PipelineContext, PipelineDefinition, SchemaInfo};
use prost::Message;
use quoted_string::test_utils::TestSpec;
use session::context::{Channel, QueryContext};
@@ -41,22 +42,28 @@ use snafu::{ensure, OptionExt, ResultExt};
use crate::error::{
DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
Result, UnsupportedContentTypeSnafu,
PipelineSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
use crate::http::extractor::LogTableName;
use crate::http::event::{LogState, PipelineIngestRequest, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
use crate::http::extractor::{LogTableName, PipelineInfo};
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
METRIC_SUCCESS_VALUE,
};
use crate::pipeline::run_pipeline;
use crate::prom_store;
const LOKI_TABLE_NAME: &str = "loki_logs";
const LOKI_LINE_COLUMN: &str = "line";
const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";
const LOKI_LINE_COLUMN_NAME: &str = "loki_line";
const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";
const STREAMS_KEY: &str = "streams";
const LABEL_KEY: &str = "stream";
const LINES_KEY: &str = "values";
@@ -97,6 +104,7 @@ pub async fn loki_ingest(
Extension(mut ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
LogTableName(table_name): LogTableName,
pipeline_info: PipelineInfo,
bytes: Bytes,
) -> Result<HttpResponse> {
ctx.set_channel(Channel::Loki);
@@ -106,213 +114,507 @@ pub async fn loki_ingest(
let db_str = db.as_str();
let exec_timer = Instant::now();
// init schemas
let mut schemas = LOKI_INIT_SCHEMAS.clone();
let mut rows = match content_type {
x if x == *JSON_CONTENT_TYPE => handle_json_req(bytes, &mut schemas).await,
x if x == *PB_CONTENT_TYPE => handle_pb_req(bytes, &mut schemas).await,
_ => UnsupportedContentTypeSnafu { content_type }.fail(),
}?;
// fill Null for missing values
for row in rows.iter_mut() {
row.resize(schemas.len(), GreptimeValue::default());
}
let rows = Rows {
rows: rows.into_iter().map(|values| Row { values }).collect(),
schema: schemas,
};
let ins_req = RowInsertRequest {
table_name,
rows: Some(rows),
};
let ins_reqs = RowInsertRequests {
inserts: vec![ins_req],
};
let handler = log_state.log_handler;
let output = handler.insert(ins_reqs, ctx).await;
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
let ctx_req = if let Some(pipeline_name) = pipeline_info.pipeline_name {
// go pipeline
let version = to_pipeline_version(pipeline_info.pipeline_version.as_deref())
.context(PipelineSnafu)?;
let def =
PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
let pipeline_ctx =
PipelineContext::new(&def, &pipeline_info.pipeline_params, Channel::Loki);
let v = extract_item::<LokiPipeline>(content_type, bytes)?
.map(|i| i.map)
.collect::<Vec<_>>();
let req = PipelineIngestRequest {
table: table_name,
values: v,
};
run_pipeline(&handler, &pipeline_ctx, req, &ctx, true).await?
} else {
// init schemas
let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
let mut rows = Vec::with_capacity(256);
for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
let mut row = init_row(
schema_info.schema.len(),
loki_row.ts,
loki_row.line,
loki_row.structured_metadata,
);
process_labels(&mut schema_info, &mut row, loki_row.labels);
rows.push(row);
}
let schemas = schema_info.schema;
// fill Null for missing values
for row in rows.iter_mut() {
row.resize(schemas.len(), GreptimeValue::default());
}
let rows = Rows {
rows: rows.into_iter().map(|values| Row { values }).collect(),
schema: schemas,
};
let ins_req = RowInsertRequest {
table_name,
rows: Some(rows),
};
ContextReq::default_opt_with_reqs(vec![ins_req])
};
let mut outputs = Vec::with_capacity(ctx_req.map_len());
for (temp_ctx, req) in ctx_req.as_req_iter(ctx) {
let output = handler.insert(req, temp_ctx).await;
if let Ok(Output {
data: OutputData::AffectedRows(rows),
meta: _,
}) = &output
{
METRIC_LOKI_LOGS_INGESTION_COUNTER
.with_label_values(&[db_str])
.inc_by(*rows as u64);
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
} else {
METRIC_LOKI_LOGS_INGESTION_ELAPSED
.with_label_values(&[db_str, METRIC_FAILURE_VALUE])
.observe(exec_timer.elapsed().as_secs_f64());
}
outputs.push(output);
}
let response = GreptimedbV1Response::from_output(vec![output])
let response = GreptimedbV1Response::from_output(outputs)
.await
.with_execution_time(exec_timer.elapsed().as_millis() as u64);
Ok(response)
}
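For reference, a hedged client-side sketch of exercising this endpoint. The literal header names are assumptions here; the integration tests below reference them only through the GREPTIME_LOG_TABLE_NAME_HEADER_NAME and GREPTIME_PIPELINE_NAME_HEADER_NAME constants.

// Sketch, assuming reqwest with the "blocking" feature and a local server.
use reqwest::blocking::Client;

fn push_loki_json() -> reqwest::Result<()> {
    let body = r#"{"streams":[{"stream":{"source":"test"},"values":[["1735901380059465984","hello"]]}]}"#;
    let resp = Client::new()
        .post("http://127.0.0.1:4000/v1/loki/api/v1/push")
        .header("content-type", "application/json")
        .header("x-greptime-log-table-name", "loki_table_name") // assumed header name
        .header("x-greptime-pipeline-name", "loki_pipe") // assumed header name
        .body(body)
        .send()?;
    assert!(resp.status().is_success());
    Ok(())
}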
async fn handle_json_req(
bytes: Bytes,
schemas: &mut Vec<ColumnSchema>,
) -> Result<Vec<Vec<GreptimeValue>>> {
let mut column_indexer: HashMap<String, u16> = HashMap::new();
column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
let payload: serde_json::Value =
serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
let streams = payload
.get(STREAMS_KEY)
.context(InvalidLokiPayloadSnafu {
msg: "missing streams",
})?
.as_array()
.context(InvalidLokiPayloadSnafu {
msg: "streams is not an array",
})?;
let mut rows = Vec::with_capacity(1000);
for (stream_index, stream) in streams.iter().enumerate() {
// parse lines first
// do not use `?` in case there are multiple streams
let lines = unwrap_or_warn_continue!(
stream.get(LINES_KEY),
"missing values on stream {}",
stream_index
);
let lines = unwrap_or_warn_continue!(
lines.as_array(),
"values is not an array on stream {}",
stream_index
);
// get labels
let labels = stream
.get(LABEL_KEY)
.and_then(|label| label.as_object())
.map(|l| {
l.iter()
.filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
.collect::<BTreeMap<String, String>>()
});
// process each line
for (line_index, line) in lines.iter().enumerate() {
let line = unwrap_or_warn_continue!(
line.as_array(),
"missing line on stream {} index {}",
stream_index,
line_index
);
if line.len() < 2 {
warn!(
"line on stream {} index {} is too short",
stream_index, line_index
);
continue;
}
// get ts
let ts = unwrap_or_warn_continue!(
line.first()
.and_then(|ts| ts.as_str())
.and_then(|ts| ts.parse::<i64>().ok()),
"missing or invalid timestamp on stream {} index {}",
stream_index,
line_index
);
// get line
let line_text = unwrap_or_warn_continue!(
line.get(1)
.and_then(|line| line.as_str())
.map(|line| line.to_string()),
"missing or invalid line on stream {} index {}",
stream_index,
line_index
);
let structured_metadata = match line.get(2) {
Some(sdata) if sdata.is_object() => sdata
.as_object()
.unwrap()
.iter()
.filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into()))))
.collect(),
_ => BTreeMap::new(),
};
let structured_metadata = Value::Object(structured_metadata);
let mut row = init_row(schemas.len(), ts, line_text, structured_metadata);
process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
rows.push(row);
}
}
Ok(rows)
/// This is the holder of the Loki lines parsed from JSON or protobuf.
/// The generic here is either [serde_json::Value] or [Vec<LabelPairAdapter>].
/// Depending on the target destination, this can be converted to [LokiRawItem] or [LokiPipeline].
pub struct LokiMiddleItem<T> {
pub ts: i64,
pub line: String,
pub structured_metadata: Option<T>,
pub labels: Option<BTreeMap<String, String>>,
}
async fn handle_pb_req(
bytes: Bytes,
schemas: &mut Vec<ColumnSchema>,
) -> Result<Vec<Vec<GreptimeValue>>> {
let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;
/// This is the line item for Loki raw ingestion.
/// We'll persist the line as a whole, store the labels as tags,
/// and pack the structured metadata into a single JSON value.
pub struct LokiRawItem {
pub ts: i64,
pub line: String,
pub structured_metadata: Vec<u8>,
pub labels: Option<BTreeMap<String, String>>,
}
let mut column_indexer: HashMap<String, u16> = HashMap::new();
column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
/// This is the line item prepared for the pipeline engine.
pub struct LokiPipeline {
pub map: pipeline::Value,
}
let cnt = req.streams.iter().map(|s| s.entries.len()).sum::<usize>();
let mut rows = Vec::with_capacity(cnt);
/// This is the flow of the Loki ingestion.
///
///                        +-------+
///                        | bytes |
///                        +-------+
///                            |
///              +-------------+--------------+
///              |                            |
///      JSON content type             PB content type
///              |                            |
///       JsonStreamItem                PbStreamItem
///     (stream: serde_json)          (stream: adapter)
///              |                            |
///  LokiMiddleItem<serde_json>    LokiMiddleItem<entry>
///               \                          /
///                +------------+-----------+
///                             |
///                    LokiMiddleItem<T>
///                             |
///              +--------------+-------------+
///              |                            |
///      +--------------+             +--------------+
///      | LokiRawItem  |             | LokiPipeline |
///      +--------------+             +--------------+
///              |                            |
///      +--------------+             +--------------+
///      | Loki ingest  |             | run_pipeline |
///      +--------------+             +--------------+
fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
where
LokiMiddleItem<serde_json::Value>: Into<T>,
LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
{
match content_type {
x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
)),
x if x == *PB_CONTENT_TYPE => Ok(Box::new(
LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
)),
_ => UnsupportedContentTypeSnafu { content_type }.fail(),
}
}
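The where bounds are what let one extract_item call serve both targets: each parser's middle item only has to implement Into<T>. A self-contained sketch of the same dispatch pattern, with illustrative names not taken from this commit:

struct Json(i64);
struct Pb(i64);
struct Raw(i64);

impl From<Json> for Raw {
    fn from(j: Json) -> Raw {
        Raw(j.0)
    }
}
impl From<Pb> for Raw {
    fn from(p: Pb) -> Raw {
        Raw(p.0)
    }
}

// One entry point, two sources, any target T both sources convert into.
fn extract<T: 'static>(json: bool) -> Box<dyn Iterator<Item = T>>
where
    Json: Into<T>,
    Pb: Into<T>,
{
    if json {
        Box::new((0i64..3).map(Json).map(Into::into))
    } else {
        Box::new((0i64..3).map(Pb).map(Into::into))
    }
}

fn main() {
    let raws: Vec<Raw> = extract::<Raw>(true).collect();
    assert_eq!(raws.len(), 3);
}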
struct LokiJsonParser {
pub streams: VecDeque<serde_json::Value>,
}
impl LokiJsonParser {
pub fn from_bytes(bytes: Bytes) -> Result<Self> {
let payload: serde_json::Value =
serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
let serde_json::Value::Object(mut map) = payload else {
return InvalidLokiPayloadSnafu {
msg: "payload is not an object",
}
.fail();
};
let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
msg: "missing streams",
})?;
let serde_json::Value::Array(streams) = streams else {
return InvalidLokiPayloadSnafu {
msg: "streams is not an array",
}
.fail();
};
Ok(Self {
streams: streams.into(),
})
}
}
impl Iterator for LokiJsonParser {
type Item = JsonStreamItem;
fn next(&mut self) -> Option<Self::Item> {
while let Some(stream) = self.streams.pop_front() {
// get lines from the map
let serde_json::Value::Object(mut map) = stream else {
warn!("stream is not an object, {:?}", stream);
continue;
};
let Some(lines) = map.remove(LINES_KEY) else {
warn!("missing lines on stream, {:?}", map);
continue;
};
let serde_json::Value::Array(lines) = lines else {
warn!("lines is not an array, {:?}", lines);
continue;
};
// get labels
let labels = map
.remove(LABEL_KEY)
.and_then(|m| match m {
serde_json::Value::Object(labels) => Some(labels),
_ => None,
})
.map(|m| {
m.into_iter()
.filter_map(|(k, v)| match v {
serde_json::Value::String(v) => Some((k, v)),
_ => None,
})
.collect::<BTreeMap<String, String>>()
});
return Some(JsonStreamItem {
lines: lines.into(),
labels,
});
}
None
}
}
struct JsonStreamItem {
pub lines: VecDeque<serde_json::Value>,
pub labels: Option<BTreeMap<String, String>>,
}
impl Iterator for JsonStreamItem {
type Item = LokiMiddleItem<serde_json::Value>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(line) = self.lines.pop_front() {
let serde_json::Value::Array(line) = line else {
warn!("line is not an array, {:?}", line);
continue;
};
if line.len() < 2 {
warn!("line is too short, {:?}", line);
continue;
}
let mut line: VecDeque<serde_json::Value> = line.into();
// get ts
let ts = line.pop_front().and_then(|ts| match ts {
serde_json::Value::String(ts) => ts.parse::<i64>().ok(),
_ => {
warn!("missing or invalid timestamp, {:?}", ts);
None
}
});
let Some(ts) = ts else {
continue;
};
let line_text = line.pop_front().and_then(|l| match l {
serde_json::Value::String(l) => Some(l),
_ => {
warn!("missing or invalid line, {:?}", l);
None
}
});
let Some(line_text) = line_text else {
continue;
};
let structured_metadata = line.pop_front();
return Some(LokiMiddleItem {
ts,
line: line_text,
structured_metadata,
labels: self.labels.clone(),
});
}
None
}
}
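Concretely, each "values" entry is a [timestamp, line, optional metadata] triple. A hedged sketch of the middle item this iterator would produce for the first entry of the JSON integration test below:

use serde_json::json;

fn example_item() -> LokiMiddleItem<serde_json::Value> {
    LokiMiddleItem {
        ts: 1_735_901_380_059_465_984,
        line: "this is line one".to_string(),
        structured_metadata: Some(json!({"key1": "value1", "key2": "value2"})),
        labels: None, // the stream's labels are cloned in here in practice
    }
}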
impl From<LokiMiddleItem<serde_json::Value>> for LokiRawItem {
fn from(val: LokiMiddleItem<serde_json::Value>) -> Self {
let LokiMiddleItem {
ts,
line,
structured_metadata,
labels,
} = val;
let structured_metadata = structured_metadata
.and_then(|m| match m {
serde_json::Value::Object(m) => Some(m),
_ => None,
})
.map(|m| {
m.into_iter()
.filter_map(|(k, v)| match v {
serde_json::Value::String(v) => Some((k, Value::String(v.into()))),
_ => None,
})
.collect::<BTreeMap<String, Value>>()
})
.unwrap_or_default();
let structured_metadata = Value::Object(structured_metadata).to_vec();
LokiRawItem {
ts,
line,
structured_metadata,
labels,
}
}
}
impl From<LokiMiddleItem<serde_json::Value>> for LokiPipeline {
fn from(value: LokiMiddleItem<serde_json::Value>) -> Self {
let LokiMiddleItem {
ts,
line,
structured_metadata,
labels,
} = value;
let mut map = BTreeMap::new();
map.insert(
GREPTIME_TIMESTAMP.to_string(),
pipeline::Value::Timestamp(pipeline::Timestamp::Nanosecond(ts)),
);
map.insert(
LOKI_LINE_COLUMN_NAME.to_string(),
pipeline::Value::String(line),
);
if let Some(serde_json::Value::Object(m)) = structured_metadata {
for (k, v) in m {
match pipeline::Value::try_from(v) {
Ok(v) => {
map.insert(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, k), v);
}
Err(e) => {
warn!("not a valid value, {:?}", e);
}
}
}
}
if let Some(v) = labels {
v.into_iter().for_each(|(k, v)| {
map.insert(
format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k),
pipeline::Value::String(v),
);
});
}
LokiPipeline {
map: pipeline::Value::Map(pipeline::Map::from(map)),
}
}
}
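A small sanity sketch of the key-prefixing scheme above, which keeps labels and structured metadata from colliding with each other or with pipeline-produced fields (matching the column names in the expected schemas below):

#[test]
fn prefixing_scheme() {
    assert_eq!(
        format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, "service"),
        "loki_label_service"
    );
    assert_eq!(
        format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, "key1"),
        "loki_metadata_key1"
    );
}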
pub struct LokiPbParser {
pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
}
impl LokiPbParser {
pub fn from_bytes(bytes: Bytes) -> Result<Self> {
let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
.context(DecodeOtlpRequestSnafu)?;
Ok(Self {
streams: req.streams.into(),
})
}
}
impl Iterator for LokiPbParser {
type Item = PbStreamItem;
fn next(&mut self) -> Option<Self::Item> {
let stream = self.streams.pop_front()?;
let labels = parse_loki_labels(&stream.labels)
.inspect_err(|e| {
error!(e; "failed to parse loki labels, {:?}", stream.labels);
})
.ok();
Some(PbStreamItem {
entries: stream.entries.into(),
labels,
})
}
}
pub struct PbStreamItem {
pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
pub labels: Option<BTreeMap<String, String>>,
}
impl Iterator for PbStreamItem {
type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
fn next(&mut self) -> Option<Self::Item> {
while let Some(entry) = self.entries.pop_front() {
let ts = if let Some(ts) = entry.timestamp {
ts
} else {
warn!("missing timestamp, {:?}", entry);
continue;
};
let line = entry.line;
let structured_metadata = entry.structured_metadata;
return Some(LokiMiddleItem {
ts: prost_ts_to_nano(&ts),
line,
structured_metadata: Some(structured_metadata),
labels: self.labels.clone(),
});
}
None
}
}
impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
let LokiMiddleItem {
ts,
line,
structured_metadata,
labels,
} = val;
let structured_metadata = structured_metadata
.unwrap_or_default()
.into_iter()
.map(|d| (d.name, Value::String(d.value.into())))
.collect::<BTreeMap<String, Value>>();
let structured_metadata = Value::Object(structured_metadata).to_vec();
LokiRawItem {
ts,
line,
structured_metadata,
labels,
}
}
}
impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
let LokiMiddleItem {
ts,
line,
structured_metadata,
labels,
} = value;
let mut map = BTreeMap::new();
map.insert(
GREPTIME_TIMESTAMP.to_string(),
pipeline::Value::Timestamp(pipeline::Timestamp::Nanosecond(ts)),
);
map.insert(
LOKI_LINE_COLUMN_NAME.to_string(),
pipeline::Value::String(line),
);
structured_metadata
.unwrap_or_default()
.into_iter()
.for_each(|d| {
map.insert(
format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, d.name),
pipeline::Value::String(d.value),
);
});
if let Some(v) = labels {
v.into_iter().for_each(|(k, v)| {
map.insert(
format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k),
pipeline::Value::String(v),
);
});
}
LokiPipeline {
map: pipeline::Value::Map(pipeline::Map::from(map)),
}
}
}
/// Since we're hand-parsing the labels, any error encountered means we simply skip the labels.
@@ -391,7 +693,7 @@ pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
}
#[inline]
fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
ts.seconds * 1_000_000_000 + ts.nanos as i64
}
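A worked example of the conversion, using the 2024-11-07T10:53:50 timestamp from the pb test below (the extra nanos are illustrative):

#[test]
fn ts_to_nano_example() {
    let ts = LokiTimestamp {
        seconds: 1_730_976_830,
        nanos: 500,
    };
    assert_eq!(prost_ts_to_nano(&ts), 1_730_976_830_000_000_500);
}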
@@ -399,7 +701,7 @@ fn init_row(
schema_len: usize,
ts: i64,
line: String,
structured_metadata: Value,
structured_metadata: Vec<u8>,
) -> Vec<GreptimeValue> {
// create and init row
let mut row = Vec::with_capacity(schema_len);
@@ -411,7 +713,7 @@ fn init_row(
value_data: Some(ValueData::StringValue(line)),
});
row.push(GreptimeValue {
value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())),
value_data: Some(ValueData::BinaryValue(structured_metadata)),
});
for _ in 0..(schema_len - 3) {
row.push(GreptimeValue { value_data: None });
@@ -420,22 +722,24 @@ fn init_row(
}
fn process_labels(
column_indexer: &mut HashMap<String, u16>,
schemas: &mut Vec<ColumnSchema>,
schema_info: &mut SchemaInfo,
row: &mut Vec<GreptimeValue>,
labels: Option<&BTreeMap<String, String>>,
labels: Option<BTreeMap<String, String>>,
) {
let Some(labels) = labels else {
return;
};
let column_indexer = &mut schema_info.index;
let schemas = &mut schema_info.schema;
// insert labels
for (k, v) in labels {
if let Some(index) = column_indexer.get(k) {
if let Some(index) = column_indexer.get(&k) {
// exist in schema
// insert value using index
row[*index as usize] = GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
row[*index] = GreptimeValue {
value_data: Some(ValueData::StringValue(v)),
};
} else {
// not exist
@@ -447,10 +751,10 @@ fn process_labels(
datatype_extension: None,
options: None,
});
column_indexer.insert(k.clone(), (schemas.len() - 1) as u16);
column_indexer.insert(k, schemas.len() - 1);
row.push(GreptimeValue {
value_data: Some(ValueData::StringValue(v.clone())),
value_data: Some(ValueData::StringValue(v)),
});
}
}
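The index-or-append move in process_labels is the core of the dynamic schema: the first sighting of a label appends a column and records its position, and shorter rows are padded with nulls afterwards by the row.resize call in loki_ingest. A minimal standalone sketch of that pattern:

use std::collections::HashMap;

// Return the column index for `name`, appending a new column on first sight.
fn upsert_column(
    indexer: &mut HashMap<String, usize>,
    schema: &mut Vec<String>,
    name: &str,
) -> usize {
    if let Some(i) = indexer.get(name) {
        *i
    } else {
        schema.push(name.to_string());
        let i = schema.len() - 1;
        indexer.insert(name.to_string(), i);
        i
    }
}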


@@ -34,7 +34,9 @@ use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
use prost::Message;
use serde_json::{json, Value};
use servers::http::handler::HealthResponse;
use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
use servers::http::header::constants::{
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
};
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER;
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
@@ -116,7 +118,9 @@ macro_rules! http_tests {
test_otlp_traces_v1,
test_otlp_logs,
test_loki_pb_logs,
test_loki_pb_logs_with_pipeline,
test_loki_json_logs,
test_loki_json_logs_with_pipeline,
test_elasticsearch_logs,
test_elasticsearch_logs_with_index,
test_log_query,
@@ -3976,6 +3980,140 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_loki_pb_logs_with_pipeline(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_loki_pb_logs_with_pipeline").await;
let client = TestClient::new(app).await;
let pipeline = r#"
processors:
- epoch:
field: greptime_timestamp
resolution: ms
"#;
let res = client
.post("/v1/pipelines/loki_pipe")
.header("content-type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: r#"{service="test",source="integration",wadaxi="do anything"}"#.to_string(),
entries: vec![
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
structured_metadata: vec![
LabelPairAdapter {
name: "key1".to_string(),
value: "value1".to_string(),
},
LabelPairAdapter {
name: "key2".to_string(),
value: "value2".to_string(),
},
],
parsed: vec![],
},
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:51").unwrap()),
line: "this is a log message 2".to_string(),
structured_metadata: vec![LabelPairAdapter {
name: "key3".to_string(),
value: "value3".to_string(),
}],
parsed: vec![],
},
EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:52").unwrap()),
line: "this is a log message 2".to_string(),
structured_metadata: vec![],
parsed: vec![],
},
],
hash: rand::random(),
}],
};
let encode = req.encode_to_vec();
let body = prom_store::snappy_compress(&encode).unwrap();
// write to loki
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static("content-encoding"),
HeaderValue::from_static("snappy"),
),
(
HeaderName::from_static("accept-encoding"),
HeaderValue::from_static("identity"),
),
(
HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_table_name"),
),
(
HeaderName::from_static(GREPTIME_PIPELINE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_pipe"),
),
],
"/v1/loki/api/v1/push",
body,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// test schema
// CREATE TABLE IF NOT EXISTS "loki_table_name" (
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
// "loki_label_service" STRING NULL,
// "loki_label_source" STRING NULL,
// "loki_label_wadaxi" STRING NULL,
// "loki_line" STRING NULL,
// "loki_metadata_key1" STRING NULL,
// "loki_metadata_key2" STRING NULL,
// "loki_metadata_key3" STRING NULL,
// TIME INDEX ("greptime_timestamp")
// )
// ENGINE=mito
// WITH(
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_service\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_label_wadaxi\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_pb_schema",
&client,
"show create table loki_table_name;",
expected,
)
.await;
// test content
let expected = "[[1730976830000,\"test\",\"integration\",\"do anything\",\"this is a log message\",\"value1\",\"value2\",null],[1730976831000,\"test\",\"integration\",\"do anything\",\"this is a log message 2\",null,null,\"value3\"],[1730976832000,\"test\",\"integration\",\"do anything\",\"this is a log message 2\",null,null,null]]";
validate_data(
"loki_pb_content",
&client,
"select * from loki_table_name;",
expected,
)
.await;
guard.remove_all().await;
}
pub async fn test_loki_json_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
@@ -4046,6 +4184,109 @@ pub async fn test_loki_json_logs(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_loki_json_logs_with_pipeline(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_loki_json_logs_with_pipeline").await;
let client = TestClient::new(app).await;
let pipeline = r#"
processors:
- epoch:
field: greptime_timestamp
resolution: ms
"#;
let res = client
.post("/v1/pipelines/loki_pipe")
.header("content-type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(StatusCode::OK, res.status());
let body = r#"
{
"streams": [
{
"stream": {
"source": "test",
"sender": "integration"
},
"values": [
[ "1735901380059465984", "this is line one", {"key1":"value1","key2":"value2"}],
[ "1735901398478897920", "this is line two", {"key3":"value3"}],
[ "1735901398478897921", "this is line two updated"]
]
}
]
}
"#;
let body = body.as_bytes().to_vec();
// write plain to loki
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/json"),
),
(
HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_table_name"),
),
(
HeaderName::from_static(GREPTIME_PIPELINE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_pipe"),
),
],
"/v1/loki/api/v1/push",
body,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());
// test schema
// CREATE TABLE IF NOT EXISTS "loki_table_name" (
// "greptime_timestamp" TIMESTAMP(3) NOT NULL,
// "loki_label_sender" STRING NULL,
// "loki_label_source" STRING NULL,
// "loki_line" STRING NULL,
// "loki_metadata_key1" STRING NULL,
// "loki_metadata_key2" STRING NULL,
// "loki_metadata_key3" STRING NULL,
// TIME INDEX ("greptime_timestamp")
// )
// ENGINE=mito
// WITH(
// append_mode = 'true'
// )
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_json_schema",
&client,
"show create table loki_table_name;",
expected,
)
.await;
// test content
let expected = "[[1735901380059,\"integration\",\"test\",\"this is line one\",\"value1\",\"value2\",null],[1735901398478,\"integration\",\"test\",\"this is line two updated\",null,null,null],[1735901398478,\"integration\",\"test\",\"this is line two\",null,null,\"value3\"]]";
validate_data(
"loki_json_content",
&client,
"select * from loki_table_name;",
expected,
)
.await;
guard.remove_all().await;
}
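Why the expected values are in milliseconds: the Loki timestamps arrive as nanoseconds, and the loki_pipe pipeline's epoch processor re-resolves them to ms, which, judging by the expected rows, is plain truncation:

// 1735901380059465984 ns -> 1735901380059 ms, the first row of the
// expected content above.
fn ns_to_ms(ns: i64) -> i64 {
    ns / 1_000_000
}

#[test]
fn ns_truncation() {
    assert_eq!(ns_to_ms(1_735_901_380_059_465_984), 1_735_901_380_059);
}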
pub async fn test_elasticsearch_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =