diff --git a/src/pipeline/src/etl/ctx_req.rs b/src/pipeline/src/etl/ctx_req.rs
index ebf8c3b539..2ce60a16d0 100644
--- a/src/pipeline/src/etl/ctx_req.rs
+++ b/src/pipeline/src/etl/ctx_req.rs
@@ -220,6 +220,10 @@ impl ContextReq {
     pub fn ref_all_req(&self) -> impl Iterator<Item = &RowInsertRequest> {
         self.req.values().flatten()
     }
+
+    pub fn map_len(&self) -> usize {
+        self.req.len()
+    }
 }
 
 // ContextReqIter is an iterator that iterates over the ContextReq.
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 30ceb6866e..709a93dfab 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -24,7 +24,7 @@ pub use etl::processor::Processor;
 pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
 pub use etl::transform::transformer::identity_pipeline;
 pub use etl::transform::GreptimeTransformer;
-pub use etl::value::{Array, Map, Value};
+pub use etl::value::{Array, Map, Timestamp, Value};
 pub use etl::{
     json_array_to_map, json_to_map, parse, simd_json_array_to_map, simd_json_to_map, Content,
     DispatchedTo, Pipeline, PipelineExecOutput, TransformedOutput, TransformerMode,
@@ -50,24 +50,3 @@ macro_rules! unwrap_or_continue_if_err {
         }
     }};
 }
-
-#[macro_export]
-macro_rules! unwrap_or_warn_continue {
-    ($expr:expr, $msg:expr) => {
-        if let Some(value) = $expr {
-            value
-        } else {
-            warn!($msg);
-            continue;
-        }
-    };
-
-    ($expr:expr, $fmt:expr, $($arg:tt)*) => {
-        if let Some(value) = $expr {
-            value
-        } else {
-            warn!($fmt, $($arg)*);
-            continue;
-        }
-    };
-}
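The removed `unwrap_or_warn_continue!` macro had a single consumer, the Loki handlers rewritten below; its warn-and-skip behavior now lives directly inside the new parsers' `Iterator::next` loops. A minimal sketch of that pattern (illustrative only, not part of the diff; `eprintln!` stands in for `common_telemetry::warn!`):

    use std::collections::VecDeque;

    /// Yields only well-formed timestamps, logging and skipping the rest --
    /// the same shape as the `while let ... continue` loops in the new parsers.
    struct SkipInvalid {
        input: VecDeque<String>,
    }

    impl Iterator for SkipInvalid {
        type Item = i64;

        fn next(&mut self) -> Option<i64> {
            while let Some(s) = self.input.pop_front() {
                match s.parse::<i64>() {
                    Ok(ts) => return Some(ts),
                    // previously: unwrap_or_warn_continue!(expr, "msg")
                    Err(e) => eprintln!("invalid timestamp {s:?}: {e}"),
                }
            }
            None // input exhausted
        }
    }

    fn main() {
        let it = SkipInvalid {
            input: VecDeque::from(vec!["123".into(), "oops".into()]),
        };
        assert_eq!(it.collect::<Vec<_>>(), vec![123]);
    }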
diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs
index d2920ce109..090b70fe92 100644
--- a/src/servers/src/http/loki.rs
+++ b/src/servers/src/http/loki.rs
@@ -12,15 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, VecDeque};
 use std::sync::Arc;
 use std::time::Instant;
 
-use ahash::{HashMap, HashMapExt};
 use api::v1::value::ValueData;
 use api::v1::{
     ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
-    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue,
+    RowInsertRequest, Rows, SemanticType, Value as GreptimeValue,
 };
 use axum::extract::State;
 use axum::Extension;
@@ -32,8 +31,10 @@ use common_telemetry::{error, warn};
 use headers::ContentType;
 use jsonb::Value;
 use lazy_static::lazy_static;
-use loki_proto::prost_types::Timestamp;
-use pipeline::unwrap_or_warn_continue;
+use loki_proto::logproto::LabelPairAdapter;
+use loki_proto::prost_types::Timestamp as LokiTimestamp;
+use pipeline::util::to_pipeline_version;
+use pipeline::{ContextReq, PipelineContext, PipelineDefinition, SchemaInfo};
 use prost::Message;
 use quoted_string::test_utils::TestSpec;
 use session::context::{Channel, QueryContext};
@@ -41,22 +42,28 @@ use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
     DecodeOtlpRequestSnafu, InvalidLokiLabelsSnafu, InvalidLokiPayloadSnafu, ParseJsonSnafu,
-    Result, UnsupportedContentTypeSnafu,
+    PipelineSnafu, Result, UnsupportedContentTypeSnafu,
 };
-use crate::http::event::{LogState, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
-use crate::http::extractor::LogTableName;
+use crate::http::event::{LogState, PipelineIngestRequest, JSON_CONTENT_TYPE, PB_CONTENT_TYPE};
+use crate::http::extractor::{LogTableName, PipelineInfo};
 use crate::http::result::greptime_result_v1::GreptimedbV1Response;
 use crate::http::HttpResponse;
 use crate::metrics::{
     METRIC_FAILURE_VALUE, METRIC_LOKI_LOGS_INGESTION_COUNTER, METRIC_LOKI_LOGS_INGESTION_ELAPSED,
     METRIC_SUCCESS_VALUE,
 };
+use crate::pipeline::run_pipeline;
 use crate::prom_store;
 
 const LOKI_TABLE_NAME: &str = "loki_logs";
 const LOKI_LINE_COLUMN: &str = "line";
 const LOKI_STRUCTURED_METADATA_COLUMN: &str = "structured_metadata";
+const LOKI_LINE_COLUMN_NAME: &str = "loki_line";
+
+const LOKI_PIPELINE_METADATA_PREFIX: &str = "loki_metadata_";
+const LOKI_PIPELINE_LABEL_PREFIX: &str = "loki_label_";
+
 const STREAMS_KEY: &str = "streams";
 const LABEL_KEY: &str = "stream";
 const LINES_KEY: &str = "values";
@@ -97,6 +104,7 @@ pub async fn loki_ingest(
     Extension(mut ctx): Extension<QueryContext>,
     TypedHeader(content_type): TypedHeader<ContentType>,
     LogTableName(table_name): LogTableName,
+    pipeline_info: PipelineInfo,
     bytes: Bytes,
 ) -> Result<HttpResponse> {
     ctx.set_channel(Channel::Loki);
@@ -106,213 +114,507 @@ pub async fn loki_ingest(
     let db_str = db.as_str();
     let exec_timer = Instant::now();
 
-    // init schemas
-    let mut schemas = LOKI_INIT_SCHEMAS.clone();
-
-    let mut rows = match content_type {
-        x if x == *JSON_CONTENT_TYPE => handle_json_req(bytes, &mut schemas).await,
-        x if x == *PB_CONTENT_TYPE => handle_pb_req(bytes, &mut schemas).await,
-        _ => UnsupportedContentTypeSnafu { content_type }.fail(),
-    }?;
-
-    // fill Null for missing values
-    for row in rows.iter_mut() {
-        row.resize(schemas.len(), GreptimeValue::default());
-    }
-
-    let rows = Rows {
-        rows: rows.into_iter().map(|values| Row { values }).collect(),
-        schema: schemas,
-    };
-    let ins_req = RowInsertRequest {
-        table_name,
-        rows: Some(rows),
-    };
-    let ins_reqs = RowInsertRequests {
-        inserts: vec![ins_req],
-    };
     let handler = log_state.log_handler;
-    let output = handler.insert(ins_reqs, ctx).await;
-    if let Ok(Output {
-        data: OutputData::AffectedRows(rows),
-        meta: _,
-    }) = &output
-    {
-        METRIC_LOKI_LOGS_INGESTION_COUNTER
-            .with_label_values(&[db_str])
-            .inc_by(*rows as u64);
-        METRIC_LOKI_LOGS_INGESTION_ELAPSED
-            .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
-            .observe(exec_timer.elapsed().as_secs_f64());
+
+    let ctx_req = if let Some(pipeline_name) = pipeline_info.pipeline_name {
+        // go pipeline
+        let version = to_pipeline_version(pipeline_info.pipeline_version.as_deref())
+            .context(PipelineSnafu)?;
+        let def =
+            PipelineDefinition::from_name(&pipeline_name, version, None).context(PipelineSnafu)?;
+        let pipeline_ctx =
+            PipelineContext::new(&def, &pipeline_info.pipeline_params, Channel::Loki);
+
+        let v = extract_item::<LokiPipeline>(content_type, bytes)?
+            .map(|i| i.map)
+            .collect::<Vec<_>>();
+
+        let req = PipelineIngestRequest {
+            table: table_name,
+            values: v,
+        };
+
+        run_pipeline(&handler, &pipeline_ctx, req, &ctx, true).await?
     } else {
-        METRIC_LOKI_LOGS_INGESTION_ELAPSED
-            .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
-            .observe(exec_timer.elapsed().as_secs_f64());
+        // init schemas
+        let mut schema_info = SchemaInfo::from_schema_list(LOKI_INIT_SCHEMAS.clone());
+        let mut rows = Vec::with_capacity(256);
+        for loki_row in extract_item::<LokiRawItem>(content_type, bytes)? {
+            let mut row = init_row(
+                schema_info.schema.len(),
+                loki_row.ts,
+                loki_row.line,
+                loki_row.structured_metadata,
+            );
+            process_labels(&mut schema_info, &mut row, loki_row.labels);
+            rows.push(row);
+        }
+
+        let schemas = schema_info.schema;
+        // fill Null for missing values
+        for row in rows.iter_mut() {
+            row.resize(schemas.len(), GreptimeValue::default());
+        }
+        let rows = Rows {
+            rows: rows.into_iter().map(|values| Row { values }).collect(),
+            schema: schemas,
+        };
+        let ins_req = RowInsertRequest {
+            table_name,
+            rows: Some(rows),
+        };
+
+        ContextReq::default_opt_with_reqs(vec![ins_req])
+    };
+
+    let mut outputs = Vec::with_capacity(ctx_req.map_len());
+    for (temp_ctx, req) in ctx_req.as_req_iter(ctx) {
+        let output = handler.insert(req, temp_ctx).await;
+
+        if let Ok(Output {
+            data: OutputData::AffectedRows(rows),
+            meta: _,
+        }) = &output
+        {
+            METRIC_LOKI_LOGS_INGESTION_COUNTER
+                .with_label_values(&[db_str])
+                .inc_by(*rows as u64);
+            METRIC_LOKI_LOGS_INGESTION_ELAPSED
+                .with_label_values(&[db_str, METRIC_SUCCESS_VALUE])
+                .observe(exec_timer.elapsed().as_secs_f64());
+        } else {
+            METRIC_LOKI_LOGS_INGESTION_ELAPSED
+                .with_label_values(&[db_str, METRIC_FAILURE_VALUE])
+                .observe(exec_timer.elapsed().as_secs_f64());
+        }
+        outputs.push(output);
     }
 
-    let response = GreptimedbV1Response::from_output(vec![output])
+    let response = GreptimedbV1Response::from_output(outputs)
         .await
         .with_execution_time(exec_timer.elapsed().as_millis() as u64);
     Ok(response)
 }
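Both branches now converge on a `ContextReq`: the raw path wraps its single `RowInsertRequest` with default options, while `run_pipeline` may produce several requests carrying different per-request table options, and `map_len()` sizes the `outputs` buffer with one slot per option group. A simplified sketch of that grouping idea (stand-in types, not the real `ContextReq` internals, which key requests by their option map and derive one query context per group):

    use std::collections::HashMap;

    type Options = Vec<(String, String)>; // e.g. ("ttl", "7d") -- hypothetical option
    struct Req; // stands in for RowInsertRequest

    #[derive(Default)]
    struct GroupedReqs {
        req: HashMap<Options, Vec<Req>>,
    }

    impl GroupedReqs {
        /// Mirrors ContextReq::map_len: one insert call per distinct option set.
        fn map_len(&self) -> usize {
            self.req.len()
        }

        /// Mirrors as_req_iter: yields each option group for a separate insert.
        fn into_groups(self) -> impl Iterator<Item = (Options, Vec<Req>)> {
            self.req.into_iter()
        }
    }

    fn main() {
        let mut g = GroupedReqs::default();
        g.req.entry(Vec::new()).or_default().push(Req);
        assert_eq!(g.map_len(), 1);
        let _ = g.into_groups();
    }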
 
-async fn handle_json_req(
-    bytes: Bytes,
-    schemas: &mut Vec<ColumnSchema>,
-) -> Result<Vec<Vec<GreptimeValue>>> {
-    let mut column_indexer: HashMap<String, u16> = HashMap::new();
-    column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
-    column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
-
-    let payload: serde_json::Value =
-        serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
-
-    let streams = payload
-        .get(STREAMS_KEY)
-        .context(InvalidLokiPayloadSnafu {
-            msg: "missing streams",
-        })?
-        .as_array()
-        .context(InvalidLokiPayloadSnafu {
-            msg: "streams is not an array",
-        })?;
-
-    let mut rows = Vec::with_capacity(1000);
-
-    for (stream_index, stream) in streams.iter().enumerate() {
-        // parse lines first
-        // do not use `?` in case there are multiple streams
-        let lines = unwrap_or_warn_continue!(
-            stream.get(LINES_KEY),
-            "missing values on stream {}",
-            stream_index
-        );
-        let lines = unwrap_or_warn_continue!(
-            lines.as_array(),
-            "values is not an array on stream {}",
-            stream_index
-        );
-
-        // get labels
-        let labels = stream
-            .get(LABEL_KEY)
-            .and_then(|label| label.as_object())
-            .map(|l| {
-                l.iter()
-                    .filter_map(|(k, v)| v.as_str().map(|v| (k.clone(), v.to_string())))
-                    .collect::<BTreeMap<String, String>>()
-            });
-
-        // process each line
-        for (line_index, line) in lines.iter().enumerate() {
-            let line = unwrap_or_warn_continue!(
-                line.as_array(),
-                "missing line on stream {} index {}",
-                stream_index,
-                line_index
-            );
-            if line.len() < 2 {
-                warn!(
-                    "line on stream {} index {} is too short",
-                    stream_index, line_index
-                );
-                continue;
-            }
-            // get ts
-            let ts = unwrap_or_warn_continue!(
-                line.first()
-                    .and_then(|ts| ts.as_str())
-                    .and_then(|ts| ts.parse::<i64>().ok()),
-                "missing or invalid timestamp on stream {} index {}",
-                stream_index,
-                line_index
-            );
-            // get line
-            let line_text = unwrap_or_warn_continue!(
-                line.get(1)
-                    .and_then(|line| line.as_str())
-                    .map(|line| line.to_string()),
-                "missing or invalid line on stream {} index {}",
-                stream_index,
-                line_index
-            );
-
-            let structured_metadata = match line.get(2) {
-                Some(sdata) if sdata.is_object() => sdata
-                    .as_object()
-                    .unwrap()
-                    .iter()
-                    .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), Value::String(s.into()))))
-                    .collect(),
-                _ => BTreeMap::new(),
-            };
-            let structured_metadata = Value::Object(structured_metadata);
-
-            let mut row = init_row(schemas.len(), ts, line_text, structured_metadata);
-
-            process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
-
-            rows.push(row);
-        }
-    }
-
-    Ok(rows)
+/// This is the holder of the loki lines parsed from json or protobuf.
+/// The generic here is either [serde_json::Value] or [Vec<LabelPairAdapter>].
+/// Depending on the target destination, this can be converted to [LokiRawItem] or [LokiPipeline].
+pub struct LokiMiddleItem<T> {
+    pub ts: i64,
+    pub line: String,
+    pub structured_metadata: Option<T>,
+    pub labels: Option<BTreeMap<String, String>>,
 }
 
-async fn handle_pb_req(
-    bytes: Bytes,
-    schemas: &mut Vec<ColumnSchema>,
-) -> Result<Vec<Vec<GreptimeValue>>> {
-    let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
-    let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
-        .context(DecodeOtlpRequestSnafu)?;
+/// This is the line item for the Loki raw ingestion.
+/// We'll persist the line in its whole, set labels into tags,
+/// and structured metadata into a big JSON.
+pub struct LokiRawItem {
+    pub ts: i64,
+    pub line: String,
+    pub structured_metadata: Vec<u8>,
+    pub labels: Option<BTreeMap<String, String>>,
+}
 
-    let mut column_indexer: HashMap<String, u16> = HashMap::new();
-    column_indexer.insert(GREPTIME_TIMESTAMP.to_string(), 0);
-    column_indexer.insert(LOKI_LINE_COLUMN.to_string(), 1);
+/// This is the line item prepared for the pipeline engine.
+pub struct LokiPipeline {
+    pub map: pipeline::Value,
+}
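The `extract_item` function that follows is generic over the destination: it only asks that both format-specific middle items convert into the requested type, so adding a new sink is two `From` impls rather than a new parsing path. A compressed, self-contained sketch of that shape (toy types, not the real ones):

    // Middle item parameterized by the format-specific metadata payload.
    struct Middle<T> {
        ts: i64,
        meta: T,
    }

    struct Raw(String);
    struct Piped(String);

    impl From<Middle<u32>> for Raw {
        fn from(m: Middle<u32>) -> Self {
            Raw(format!("{}:{}", m.ts, m.meta))
        }
    }

    impl From<Middle<u32>> for Piped {
        fn from(m: Middle<u32>) -> Self {
            Piped(format!("{}|{}", m.ts, m.meta))
        }
    }

    // Mirrors extract_item: generic over the sink, bounded by the conversions.
    fn extract<T>(items: Vec<Middle<u32>>) -> impl Iterator<Item = T>
    where
        Middle<u32>: Into<T>,
    {
        items.into_iter().map(Into::into)
    }

    fn main() {
        let raws: Vec<Raw> = extract(vec![Middle { ts: 1, meta: 2 }]).collect();
        assert_eq!(raws.len(), 1);
    }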
 
-    let cnt = req.streams.iter().map(|s| s.entries.len()).sum::<usize>();
-    let mut rows = Vec::with_capacity(cnt);
+/// This is the flow of the Loki ingestion.
+///
+///                   +--------+
+///                   | bytes  |
+///                   +--------+
+///                       |
+/// +----------------------+----------------------+
+/// |                      |                      |
+/// |  JSON content type   |   PB content type    |
+/// +----------------------+----------------------+
+/// |                      |                      |
+/// |    JsonStreamItem    |     PbStreamItem     |
+/// |  stream: serde_json  |   stream: adapter    |
+/// +----------------------+----------------------+
+/// |                      |                      |
+/// |      MiddleItem      |      MiddleItem      |
+/// +----------------------+----------------------+
+///             \                     /
+///              \                   /
+///               \                 /
+///           +----------------------+
+///           |      MiddleItem      |
+///           +----------------------+
+///                       |
+///      +----------------+----------------+
+///      |                                 |
+/// +------------------+      +---------------------+
+/// |   LokiRawItem    |      |    LokiPipeline     |
+/// +------------------+      +---------------------+
+///      |                                 |
+/// +------------------+      +---------------------+
+/// |   Loki ingest    |      |    run_pipeline     |
+/// +------------------+      +---------------------+
+fn extract_item<T>(content_type: ContentType, bytes: Bytes) -> Result<Box<dyn Iterator<Item = T>>>
+where
+    LokiMiddleItem<serde_json::Value>: Into<T>,
+    LokiMiddleItem<Vec<LabelPairAdapter>>: Into<T>,
+{
+    match content_type {
+        x if x == *JSON_CONTENT_TYPE => Ok(Box::new(
+            LokiJsonParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
+        )),
+        x if x == *PB_CONTENT_TYPE => Ok(Box::new(
+            LokiPbParser::from_bytes(bytes)?.flat_map(|item| item.into_iter().map(|i| i.into())),
+        )),
+        _ => UnsupportedContentTypeSnafu { content_type }.fail(),
+    }
+}
+
+struct LokiJsonParser {
+    pub streams: VecDeque<serde_json::Value>,
+}
+
+impl LokiJsonParser {
+    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
+        let payload: serde_json::Value =
+            serde_json::from_slice(bytes.as_ref()).context(ParseJsonSnafu)?;
+
+        let serde_json::Value::Object(mut map) = payload else {
+            return InvalidLokiPayloadSnafu {
+                msg: "payload is not an object",
+            }
+            .fail();
+        };
+
+        let streams = map.remove(STREAMS_KEY).context(InvalidLokiPayloadSnafu {
+            msg: "missing streams",
+        })?;
+
+        let serde_json::Value::Array(streams) = streams else {
+            return InvalidLokiPayloadSnafu {
+                msg: "streams is not an array",
+            }
+            .fail();
+        };
+
+        Ok(Self {
+            streams: streams.into(),
+        })
+    }
+}
+
+impl Iterator for LokiJsonParser {
+    type Item = JsonStreamItem;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some(stream) = self.streams.pop_front() {
+            // get lines from the map
+            let serde_json::Value::Object(mut map) = stream else {
+                warn!("stream is not an object, {:?}", stream);
+                continue;
+            };
+            let Some(lines) = map.remove(LINES_KEY) else {
+                warn!("missing lines on stream, {:?}", map);
+                continue;
+            };
+            let serde_json::Value::Array(lines) = lines else {
+                warn!("lines is not an array, {:?}", lines);
+                continue;
+            };
+
+            // get labels
+            let labels = map
+                .remove(LABEL_KEY)
+                .and_then(|m| match m {
+                    serde_json::Value::Object(labels) => Some(labels),
+                    _ => None,
+                })
+                .map(|m| {
+                    m.into_iter()
+                        .filter_map(|(k, v)| match v {
+                            serde_json::Value::String(v) => Some((k, v)),
+                            _ => None,
+                        })
+                        .collect::<BTreeMap<String, String>>()
+                });
+
+            return Some(JsonStreamItem {
+                lines: lines.into(),
+                labels,
+            });
+        }
+        None
+    }
+}
+
+struct JsonStreamItem {
+    pub lines: VecDeque<serde_json::Value>,
+    pub labels: Option<BTreeMap<String, String>>,
+}
+
+impl Iterator for JsonStreamItem {
+    type Item = LokiMiddleItem<serde_json::Value>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some(line) = self.lines.pop_front() {
+            let serde_json::Value::Array(line) = line else {
+                warn!("line is not an array, {:?}", line);
+                continue;
+            };
+            if line.len() < 2 {
+                warn!("line is too short, {:?}", line);
+                continue;
+            }
+            let mut line: VecDeque<serde_json::Value> = line.into();
+
+            // get ts
+            let ts = line.pop_front().and_then(|ts| match ts {
+                serde_json::Value::String(ts) => ts.parse::<i64>().ok(),
+                _ => {
+                    warn!("missing or invalid timestamp, {:?}", ts);
+                    None
+                }
+            });
+            let Some(ts) = ts else {
+                continue;
+            };
+
+            let line_text = line.pop_front().and_then(|l| match l {
+                serde_json::Value::String(l) => Some(l),
+                _ => {
+                    warn!("missing or invalid line, {:?}", l);
+                    None
+                }
+            });
+            let Some(line_text) = line_text else {
+                continue;
+            };
+
+            let structured_metadata = line.pop_front();
+
+            return Some(LokiMiddleItem {
+                ts,
+                line: line_text,
+                structured_metadata,
+                labels: self.labels.clone(),
+            });
+        }
+        None
+    }
+}
+
+impl From<LokiMiddleItem<serde_json::Value>> for LokiRawItem {
+    fn from(val: LokiMiddleItem<serde_json::Value>) -> Self {
+        let LokiMiddleItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
+        } = val;
+
+        let structured_metadata = structured_metadata
+            .and_then(|m| match m {
+                serde_json::Value::Object(m) => Some(m),
+                _ => None,
+            })
+            .map(|m| {
+                m.into_iter()
+                    .filter_map(|(k, v)| match v {
+                        serde_json::Value::String(v) => Some((k, Value::String(v.into()))),
+                        _ => None,
+                    })
+                    .collect::<BTreeMap<String, Value>>()
+            })
+            .unwrap_or_default();
+        let structured_metadata = Value::Object(structured_metadata).to_vec();
+
+        LokiRawItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
+        }
+    }
+}
+
+impl From<LokiMiddleItem<serde_json::Value>> for LokiPipeline {
+    fn from(value: LokiMiddleItem<serde_json::Value>) -> Self {
+        let LokiMiddleItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
+        } = value;
+
+        let mut map = BTreeMap::new();
+        map.insert(
+            GREPTIME_TIMESTAMP.to_string(),
+            pipeline::Value::Timestamp(pipeline::Timestamp::Nanosecond(ts)),
+        );
+        map.insert(
+            LOKI_LINE_COLUMN_NAME.to_string(),
+            pipeline::Value::String(line),
+        );
+
+        if let Some(serde_json::Value::Object(m)) = structured_metadata {
+            for (k, v) in m {
+                match pipeline::Value::try_from(v) {
+                    Ok(v) => {
+                        map.insert(format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, k), v);
+                    }
+                    Err(e) => {
+                        warn!("not a valid value, {:?}", e);
+                    }
+                }
+            }
+        }
+        if let Some(v) = labels {
+            v.into_iter().for_each(|(k, v)| {
+                map.insert(
+                    format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k),
+                    pipeline::Value::String(v),
+                );
+            });
+        }
+
+        LokiPipeline {
+            map: pipeline::Value::Map(pipeline::Map::from(map)),
+        }
+    }
+}
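For the pipeline path, each Loki line is flattened into a single key/value map before it reaches the engine, with labels and structured metadata disambiguated purely by key prefix. What one JSON line would roughly flatten to (illustrative; plain strings stand in for `pipeline::Value`):

    use std::collections::BTreeMap;

    fn main() {
        let mut map = BTreeMap::new();
        // time index, nanoseconds from the Loki timestamp string
        map.insert("greptime_timestamp", "1735901380059465984");
        // LOKI_LINE_COLUMN_NAME
        map.insert("loki_line", "this is line one");
        // LOKI_PIPELINE_LABEL_PREFIX + label key
        map.insert("loki_label_source", "test");
        // LOKI_PIPELINE_METADATA_PREFIX + metadata key
        map.insert("loki_metadata_key1", "value1");
        assert_eq!(map.len(), 4);
    }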
+
+pub struct LokiPbParser {
+    pub streams: VecDeque<loki_proto::logproto::StreamAdapter>,
+}
+
+impl LokiPbParser {
+    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
+        let decompressed = prom_store::snappy_decompress(&bytes).unwrap();
+        let req = loki_proto::logproto::PushRequest::decode(&decompressed[..])
+            .context(DecodeOtlpRequestSnafu)?;
+
+        Ok(Self {
+            streams: req.streams.into(),
+        })
+    }
+}
+
+impl Iterator for LokiPbParser {
+    type Item = PbStreamItem;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let stream = self.streams.pop_front()?;
 
-    for stream in req.streams {
         let labels = parse_loki_labels(&stream.labels)
             .inspect_err(|e| {
-                error!(e; "failed to parse loki labels");
+                error!(e; "failed to parse loki labels, {:?}", stream.labels);
             })
             .ok();
 
-        // process entries
-        for entry in stream.entries {
+        Some(PbStreamItem {
+            entries: stream.entries.into(),
+            labels,
+        })
+    }
+}
+
+pub struct PbStreamItem {
+    pub entries: VecDeque<loki_proto::logproto::EntryAdapter>,
+    pub labels: Option<BTreeMap<String, String>>,
+}
+
+impl Iterator for PbStreamItem {
+    type Item = LokiMiddleItem<Vec<LabelPairAdapter>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        while let Some(entry) = self.entries.pop_front() {
             let ts = if let Some(ts) = entry.timestamp {
                 ts
             } else {
+                warn!("missing timestamp, {:?}", entry);
                 continue;
             };
             let line = entry.line;
-            let structured_metadata = entry
-                .structured_metadata
-                .into_iter()
-                .map(|d| (d.name, Value::String(d.value.into())))
-                .collect::<BTreeMap<String, Value>>();
-            let structured_metadata = Value::Object(structured_metadata);
+            let structured_metadata = entry.structured_metadata;
 
-            let mut row = init_row(
-                schemas.len(),
-                prost_ts_to_nano(&ts),
+            return Some(LokiMiddleItem {
+                ts: prost_ts_to_nano(&ts),
                 line,
-                structured_metadata,
-            );
+                structured_metadata: Some(structured_metadata),
+                labels: self.labels.clone(),
+            });
+        }
+        None
+    }
+}
 
-            process_labels(&mut column_indexer, schemas, &mut row, labels.as_ref());
+impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiRawItem {
+    fn from(val: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
+        let LokiMiddleItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
+        } = val;
 
-            rows.push(row);
+        let structured_metadata = structured_metadata
+            .unwrap_or_default()
+            .into_iter()
+            .map(|d| (d.name, Value::String(d.value.into())))
+            .collect::<BTreeMap<String, Value>>();
+        let structured_metadata = Value::Object(structured_metadata).to_vec();
+
+        LokiRawItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
         }
     }
+}
 
-    Ok(rows)
+impl From<LokiMiddleItem<Vec<LabelPairAdapter>>> for LokiPipeline {
+    fn from(value: LokiMiddleItem<Vec<LabelPairAdapter>>) -> Self {
+        let LokiMiddleItem {
+            ts,
+            line,
+            structured_metadata,
+            labels,
+        } = value;
+
+        let mut map = BTreeMap::new();
+        map.insert(
+            GREPTIME_TIMESTAMP.to_string(),
+            pipeline::Value::Timestamp(pipeline::Timestamp::Nanosecond(ts)),
+        );
+        map.insert(
+            LOKI_LINE_COLUMN_NAME.to_string(),
+            pipeline::Value::String(line),
+        );
+
+        structured_metadata
+            .unwrap_or_default()
+            .into_iter()
+            .for_each(|d| {
+                map.insert(
+                    format!("{}{}", LOKI_PIPELINE_METADATA_PREFIX, d.name),
+                    pipeline::Value::String(d.value),
+                );
+            });
+
+        if let Some(v) = labels {
+            v.into_iter().for_each(|(k, v)| {
+                map.insert(
+                    format!("{}{}", LOKI_PIPELINE_LABEL_PREFIX, k),
+                    pipeline::Value::String(v),
+                );
+            });
+        }
+
+        LokiPipeline {
+            map: pipeline::Value::Map(pipeline::Map::from(map)),
+        }
+    }
 }
 
 /// since we're hand-parsing the labels, if any error is encountered, we'll just skip the label
@@ -391,7 +693,7 @@ pub fn parse_loki_labels(labels: &str) -> Result<BTreeMap<String, String>> {
 }
 
 #[inline]
-fn prost_ts_to_nano(ts: &Timestamp) -> i64 {
+fn prost_ts_to_nano(ts: &LokiTimestamp) -> i64 {
     ts.seconds * 1_000_000_000 + ts.nanos as i64
 }
 
@@ -399,7 +701,7 @@ fn init_row(
     schema_len: usize,
     ts: i64,
     line: String,
-    structured_metadata: Value,
+    structured_metadata: Vec<u8>,
 ) -> Vec<GreptimeValue> {
     // create and init row
     let mut row = Vec::with_capacity(schema_len);
@@ -411,7 +713,7 @@ fn init_row(
         value_data: Some(ValueData::StringValue(line)),
     });
     row.push(GreptimeValue {
-        value_data: Some(ValueData::BinaryValue(structured_metadata.to_vec())),
+        value_data: Some(ValueData::BinaryValue(structured_metadata)),
    });
     for _ in 0..(schema_len - 3) {
         row.push(GreptimeValue { value_data: None });
     }
@@ -420,22 +722,24 @@
 }
 
 fn process_labels(
-    column_indexer: &mut HashMap<String, u16>,
-    schemas: &mut Vec<ColumnSchema>,
+    schema_info: &mut SchemaInfo,
     row: &mut Vec<GreptimeValue>,
-    labels: Option<&BTreeMap<String, String>>,
+    labels: Option<BTreeMap<String, String>>,
 ) {
     let Some(labels) = labels else {
         return;
    };
 
+    let column_indexer = &mut schema_info.index;
+    let schemas = &mut schema_info.schema;
+
     // insert labels
     for (k, v) in labels {
-        if let Some(index) = column_indexer.get(k) {
+        if let Some(index) = column_indexer.get(&k) {
             // exist in schema
             // insert value using index
-            row[*index as usize] = GreptimeValue {
-                value_data: Some(ValueData::StringValue(v.clone())),
+            row[*index] = GreptimeValue {
+                value_data: Some(ValueData::StringValue(v)),
             };
         } else {
             // not exist
             // add schema and append to values
             schemas.push(ColumnSchema {
                 column_name: k.clone(),
                 datatype: ColumnDataType::String as i32,
                 semantic_type: SemanticType::Field as i32,
                 datatype_extension: None,
                 options: None,
             });
-            column_indexer.insert(k.clone(), (schemas.len() - 1) as u16);
+            column_indexer.insert(k, schemas.len() - 1);
 
             row.push(GreptimeValue {
-                value_data: Some(ValueData::StringValue(v.clone())),
+                value_data: Some(ValueData::StringValue(v)),
             });
         }
     }
 }
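`process_labels` grows the schema the first time it sees a label and back-fills earlier rows with nulls afterwards via `row.resize`. A standalone sketch of that dynamic-column bookkeeping (simplified types, same mechanics):

    use std::collections::HashMap;

    fn main() {
        let mut index: HashMap<String, usize> = HashMap::new();
        let mut schema: Vec<String> = vec!["greptime_timestamp".into(), "line".into()];
        let mut row: Vec<Option<String>> = vec![Some("123".into()), Some("msg".into())];

        // First occurrence of a label appends a column and records its index.
        let (k, v) = ("service".to_string(), "test".to_string());
        if let Some(i) = index.get(&k) {
            row[*i] = Some(v);
        } else {
            schema.push(k.clone());
            index.insert(k, schema.len() - 1);
            row.push(Some(v));
        }

        // Rows built before the column existed are padded afterwards,
        // matching `row.resize(schemas.len(), GreptimeValue::default())`.
        let mut earlier_row: Vec<Option<String>> = vec![Some("456".into()), Some("msg2".into())];
        earlier_row.resize(schema.len(), None);
        assert_eq!(earlier_row.len(), 3);
    }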
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index eea1065835..d1d84a1198 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -34,7 +34,9 @@ use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
 use prost::Message;
 use serde_json::{json, Value};
 use servers::http::handler::HealthResponse;
-use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
+use servers::http::header::constants::{
+    GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
+};
 use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
 use servers::http::jaeger::JAEGER_TIME_RANGE_FOR_OPERATIONS_HEADER;
 use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
@@ -116,7 +118,9 @@ macro_rules! http_tests {
         test_otlp_traces_v1,
         test_otlp_logs,
         test_loki_pb_logs,
+        test_loki_pb_logs_with_pipeline,
         test_loki_json_logs,
+        test_loki_json_logs_with_pipeline,
         test_elasticsearch_logs,
         test_elasticsearch_logs_with_index,
         test_log_query,
@@ -3976,6 +3980,140 @@ pub async fn test_loki_pb_logs(store_type: StorageType) {
     guard.remove_all().await;
 }
 
+pub async fn test_loki_pb_logs_with_pipeline(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_loki_pb_logs_with_pipeline").await;
+
+    let client = TestClient::new(app).await;
+
+    let pipeline = r#"
+processors:
+  - epoch:
+      field: greptime_timestamp
+      resolution: ms
+    "#;
+
+    let res = client
+        .post("/v1/pipelines/loki_pipe")
+        .header("content-type", "application/x-yaml")
+        .body(pipeline)
+        .send()
+        .await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    // init loki request
+    let req: PushRequest = PushRequest {
+        streams: vec![StreamAdapter {
+            labels: r#"{service="test",source="integration",wadaxi="do anything"}"#.to_string(),
+            entries: vec![
+                EntryAdapter {
+                    timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
+                    line: "this is a log message".to_string(),
+                    structured_metadata: vec![
+                        LabelPairAdapter {
+                            name: "key1".to_string(),
+                            value: "value1".to_string(),
+                        },
+                        LabelPairAdapter {
+                            name: "key2".to_string(),
+                            value: "value2".to_string(),
+                        },
+                    ],
+                    parsed: vec![],
+                },
+                EntryAdapter {
+                    timestamp: Some(Timestamp::from_str("2024-11-07T10:53:51").unwrap()),
+                    line: "this is a log message 2".to_string(),
+                    structured_metadata: vec![LabelPairAdapter {
+                        name: "key3".to_string(),
+                        value: "value3".to_string(),
+                    }],
+                    parsed: vec![],
+                },
+                EntryAdapter {
+                    timestamp: Some(Timestamp::from_str("2024-11-07T10:53:52").unwrap()),
+                    line: "this is a log message 2".to_string(),
+                    structured_metadata: vec![],
+                    parsed: vec![],
+                },
+            ],
+            hash: rand::random(),
+        }],
+    };
+    let encode = req.encode_to_vec();
+    let body = prom_store::snappy_compress(&encode).unwrap();
+
+    // write to loki
+    let res = send_req(
+        &client,
+        vec![
+            (
+                HeaderName::from_static("content-type"),
+                HeaderValue::from_static("application/x-protobuf"),
+            ),
+            (
+                HeaderName::from_static("content-encoding"),
+                HeaderValue::from_static("snappy"),
+            ),
+            (
+                HeaderName::from_static("accept-encoding"),
+                HeaderValue::from_static("identity"),
+            ),
+            (
+                HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
+                HeaderValue::from_static("loki_table_name"),
+            ),
+            (
+                HeaderName::from_static(GREPTIME_PIPELINE_NAME_HEADER_NAME),
+                HeaderValue::from_static("loki_pipe"),
+            ),
+        ],
+        "/v1/loki/api/v1/push",
+        body,
+        false,
+    )
+    .await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    // test schema
+    // CREATE TABLE IF NOT EXISTS "loki_table_name" (
+    //     "greptime_timestamp" TIMESTAMP(3) NOT NULL,
+    //     "loki_label_service" STRING NULL,
+    //     "loki_label_source" STRING NULL,
+    //     "loki_label_wadaxi" STRING NULL,
+    //     "loki_line" STRING NULL,
+    //     "loki_metadata_key1" STRING NULL,
+    //     "loki_metadata_key2" STRING NULL,
+    //     "loki_metadata_key3" STRING NULL,
+    //     TIME INDEX ("greptime_timestamp")
+    // )
+    // ENGINE=mito
+    // WITH(
+    //     append_mode = 'true'
+    // )
+    let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n  \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n  \\\"loki_label_service\\\" STRING NULL,\\n  \\\"loki_label_source\\\" STRING NULL,\\n  \\\"loki_label_wadaxi\\\" STRING NULL,\\n  \\\"loki_line\\\" STRING NULL,\\n  \\\"loki_metadata_key1\\\" STRING NULL,\\n  \\\"loki_metadata_key2\\\" STRING NULL,\\n  \\\"loki_metadata_key3\\\" STRING NULL,\\n  TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n  append_mode = 'true'\\n)\"]]";
+    validate_data(
+        "loki_pb_schema",
+        &client,
+        "show create table loki_table_name;",
+        expected,
+    )
+    .await;
+
+    // test content
+    let expected = "[[1730976830000,\"test\",\"integration\",\"do anything\",\"this is a log message\",\"value1\",\"value2\",null],[1730976831000,\"test\",\"integration\",\"do anything\",\"this is a log message 2\",null,null,\"value3\"],[1730976832000,\"test\",\"integration\",\"do anything\",\"this is a log message 2\",null,null,null]]";
+    validate_data(
+        "loki_pb_content",
+        &client,
+        "select * from loki_table_name;",
+        expected,
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_loki_json_logs(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =
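Both `_with_pipeline` tests use the same `epoch` processor with `resolution: ms`, so the nanosecond timestamps Loki sends become millisecond values in the time index; the expected rows above and below follow from that truncation. A quick check of the arithmetic (assumed conversion, matching the expected output):

    fn main() {
        // JSON test: nanosecond string -> millisecond time index.
        let ns: i64 = 1735901380059465984;
        assert_eq!(ns / 1_000_000, 1735901380059);

        // PB test: "2024-11-07T10:53:50" is 1730976830 s since the epoch,
        // stored as 1730976830000 ms.
        assert_eq!(1730976830_i64 * 1000, 1730976830000);
    }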
HeaderValue::from_static("loki_pipe"), + ), + ], + "/v1/loki/api/v1/push", + body, + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + // test schema + // CREATE TABLE IF NOT EXISTS "loki_table_name" ( + // "greptime_timestamp" TIMESTAMP(3) NOT NULL, + // "loki_label_sender" STRING NULL, + // "loki_label_source" STRING NULL, + // "loki_line" STRING NULL, + // "loki_metadata_key1" STRING NULL, + // "loki_metadata_key2" STRING NULL, + // "loki_metadata_key3" STRING NULL, + // TIME INDEX ("greptime_timestamp") + // ) + // ENGINE=mito + // WITH( + // append_mode = 'true' + // ) + let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"greptime_timestamp\\\" TIMESTAMP(3) NOT NULL,\\n \\\"loki_label_sender\\\" STRING NULL,\\n \\\"loki_label_source\\\" STRING NULL,\\n \\\"loki_line\\\" STRING NULL,\\n \\\"loki_metadata_key1\\\" STRING NULL,\\n \\\"loki_metadata_key2\\\" STRING NULL,\\n \\\"loki_metadata_key3\\\" STRING NULL,\\n TIME INDEX (\\\"greptime_timestamp\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]"; + validate_data( + "loki_json_schema", + &client, + "show create table loki_table_name;", + expected, + ) + .await; + + // test content + let expected = "[[1735901380059,\"integration\",\"test\",\"this is line one\",\"value1\",\"value2\",null],[1735901398478,\"integration\",\"test\",\"this is line two updated\",null,null,null],[1735901398478,\"integration\",\"test\",\"this is line two\",null,null,\"value3\"]]"; + validate_data( + "loki_json_content", + &client, + "select * from loki_table_name;", + expected, + ) + .await; + + guard.remove_all().await; +} + pub async fn test_elasticsearch_logs(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) =