diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 857c321d33..d656094c2e 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -29,7 +29,7 @@ pub use etl::{
     DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
 };
 pub use manager::{
-    pipeline_operator, table, util, IdentityTimeIndex, PipelineDefinition, PipelineInfo,
-    PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
+    pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,
+    PipelineInfo, PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
     GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
 };
diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs
index b929cf3c23..3928e00b35 100644
--- a/src/pipeline/src/manager.rs
+++ b/src/pipeline/src/manager.rs
@@ -26,7 +26,7 @@ use util::to_pipeline_version;
 use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
 use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
 use crate::table::PipelineTable;
-use crate::{Pipeline, Value};
+use crate::{GreptimePipelineParams, Pipeline, Value};
 
 pub mod pipeline_operator;
 pub mod table;
@@ -104,6 +104,22 @@ impl PipelineDefinition {
     }
 }
 
+pub struct PipelineContext<'a> {
+    pub pipeline_definition: &'a PipelineDefinition,
+    pub pipeline_param: &'a GreptimePipelineParams,
+}
+
+impl<'a> PipelineContext<'a> {
+    pub fn new(
+        pipeline_definition: &'a PipelineDefinition,
+        pipeline_param: &'a GreptimePipelineParams,
+    ) -> Self {
+        Self {
+            pipeline_definition,
+            pipeline_param,
+        }
+    }
+}
 pub enum PipelineWay {
     OtlpLogDirect(Box<SelectInfo>),
     Pipeline(PipelineDefinition),
diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs
index ba0e97acbf..23d7f1b2dc 100644
--- a/src/servers/src/elasticsearch.rs
+++ b/src/servers/src/elasticsearch.rs
@@ -33,7 +33,9 @@ use crate::error::{
     status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
     Result as ServersResult,
 };
-use crate::http::event::{ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState};
+use crate::http::event::{
+    ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest,
+};
 use crate::metrics::{
     METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
 };
@@ -276,7 +278,7 @@ fn parse_bulk_request(
     input: &str,
     index_from_url: &Option<String>,
     msg_field: &Option<String>,
-) -> ServersResult<Vec<LogIngestRequest>> {
+) -> ServersResult<Vec<PipelineIngestRequest>> {
     // Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
     let values: Vec<Value> = Deserializer::from_str(input)
         .into_iter::<Value>()
@@ -291,7 +293,7 @@ fn parse_bulk_request(
         }
     );
 
-    let mut requests: Vec<LogIngestRequest> = Vec::with_capacity(values.len() / 2);
+    let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
     let mut values = values.into_iter();
 
     // Read the ndjson payload and convert it to a (index, value) vector.
@@ -331,7 +333,7 @@ fn parse_bulk_request(
         );
 
         let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
-        requests.push(LogIngestRequest {
+        requests.push(PipelineIngestRequest {
            table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
            values: vec![log_value],
        });
@@ -402,13 +404,13 @@ mod tests {
            None,
            None,
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![
                        pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
                    ],
                },
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                },
@@ -425,11 +427,11 @@ mod tests {
            Some("logs".to_string()),
            None,
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                },
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "logs".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                },
@@ -446,11 +448,11 @@ mod tests {
            Some("logs".to_string()),
            None,
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                },
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "logs".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                },
@@ -466,7 +468,7 @@ mod tests {
            Some("logs".to_string()),
            None,
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                },
@@ -483,11 +485,11 @@ mod tests {
            None,
            Some("data".to_string()),
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
                },
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "test".to_string(),
                    values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
                },
@@ -504,13 +506,13 @@ mod tests {
            None,
            Some("message".to_string()),
            Ok(vec![
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "logs-generic-default".to_string(),
                    values: vec![
                        pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
                    ],
                },
-                LogIngestRequest {
+                PipelineIngestRequest {
                    table: "logs-generic-default".to_string(),
                    values: vec![
                        pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index cfc0694ae0..fba3e33582 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -33,7 +33,7 @@ use datatypes::value::column_data_to_json;
 use headers::ContentType;
 use lazy_static::lazy_static;
 use pipeline::util::to_pipeline_version;
-use pipeline::{GreptimePipelineParams, PipelineDefinition, PipelineMap};
+use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
 use serde::{Deserialize, Serialize};
 use serde_json::{json, Deserializer, Map, Value};
 use session::context::{Channel, QueryContext, QueryContextRef};
@@ -100,7 +100,7 @@ pub struct LogIngesterQueryParams {
 /// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests.
 /// Multiple LogIngestRequests will be ingested into the same database with the same pipeline.
 #[derive(Debug, PartialEq)]
-pub(crate) struct LogIngestRequest {
+pub(crate) struct PipelineIngestRequest {
     /// The table where the log data will be written to.
     pub table: String,
     /// The log data to be ingested.
@@ -325,12 +325,15 @@ async fn dryrun_pipeline_inner(
 ) -> Result<Response> {
     let params = GreptimePipelineParams::default();
 
+    let pipeline_def = PipelineDefinition::Resolved(pipeline);
+    let pipeline_ctx = PipelineContext::new(&pipeline_def, &params);
     let results = run_pipeline(
         &pipeline_handler,
-        &PipelineDefinition::Resolved(pipeline),
-        &params,
-        value,
-        "dry_run".to_owned(),
+        &pipeline_ctx,
+        PipelineIngestRequest {
+            table: "dry_run".to_owned(),
+            values: value,
+        },
         query_ctx,
         true,
     )
@@ -603,7 +606,7 @@ pub async fn log_ingester(
     ingest_logs_inner(
         handler,
         pipeline,
-        vec![LogIngestRequest {
+        vec![PipelineIngestRequest {
             table: table_name,
             values: value,
         }],
@@ -673,9 +676,9 @@ fn extract_pipeline_value_by_content_type(
 }
 
 pub(crate) async fn ingest_logs_inner(
-    state: PipelineHandlerRef,
+    handler: PipelineHandlerRef,
     pipeline: PipelineDefinition,
-    log_ingest_requests: Vec<LogIngestRequest>,
+    log_ingest_requests: Vec<PipelineIngestRequest>,
     query_ctx: QueryContextRef,
     headers: HeaderMap,
 ) -> Result<HttpResponse> {
@@ -690,22 +693,15 @@ pub(crate) async fn ingest_logs_inner(
             .and_then(|v| v.to_str().ok()),
     );
 
-    for request in log_ingest_requests {
-        let requests = run_pipeline(
-            &state,
-            &pipeline,
-            &pipeline_params,
-            request.values,
-            request.table,
-            &query_ctx,
-            true,
-        )
-        .await?;
+    let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params);
+    for pipeline_req in log_ingest_requests {
+        let requests =
+            run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;
 
         insert_requests.extend(requests);
     }
 
-    let output = state
+    let output = handler
         .insert(
             RowInsertRequests {
                 inserts: insert_requests,
diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs
index caafd0f1f3..8c849bf735 100644
--- a/src/servers/src/http/prom_store.rs
+++ b/src/servers/src/http/prom_store.rs
@@ -83,33 +83,17 @@ impl Default for RemoteWriteQuery {
 )]
 pub async fn remote_write(
     State(state): State<PromStoreState>,
-    query: Query<RemoteWriteQuery>,
-    extension: Extension<QueryContext>,
-    content_encoding: TypedHeader<headers::ContentEncoding>,
-    raw_body: Bytes,
-) -> Result<impl IntoResponse> {
-    remote_write_impl(
-        state.prom_store_handler,
-        query,
-        extension,
-        content_encoding,
-        raw_body,
-        state.is_strict_mode,
-        state.prom_store_with_metric_engine,
-    )
-    .await
-}
-
-async fn remote_write_impl(
-    handler: PromStoreProtocolHandlerRef,
     Query(params): Query<RemoteWriteQuery>,
     Extension(mut query_ctx): Extension<QueryContext>,
     content_encoding: TypedHeader<headers::ContentEncoding>,
     body: Bytes,
-    is_strict_mode: bool,
-    is_metric_engine: bool,
 ) -> Result<impl IntoResponse> {
-    // VictoriaMetrics handshake
+    let PromStoreState {
+        prom_store_handler,
+        prom_store_with_metric_engine,
+        is_strict_mode,
+    } = state;
+
     if let Some(_vm_handshake) = params.get_vm_proto_version {
         return Ok(VM_PROTO_VERSION.into_response());
     }
@@ -128,7 +112,9 @@ async fn remote_write_impl(
     }
 
     let query_ctx = Arc::new(query_ctx);
-    let output = handler.write(request, query_ctx, is_metric_engine).await?;
+    let output = prom_store_handler
+        .write(request, query_ctx, prom_store_with_metric_engine)
+        .await?;
     crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
     Ok((
         StatusCode::NO_CONTENT,
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
index 95b3b21e99..cb6a3d89cf 100644
--- a/src/servers/src/otlp/logs.rs
+++ b/src/servers/src/otlp/logs.rs
@@ -24,7 +24,7 @@ use jsonb::{Number as JsonbNumber, Value as JsonbValue};
 use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
 use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
-use pipeline::{GreptimePipelineParams, PipelineWay, SchemaInfo, SelectInfo};
+use pipeline::{GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo};
 use serde_json::{Map, Value};
 use session::context::QueryContextRef;
 use snafu::{ensure, ResultExt};
@@ -33,6 +33,7 @@ use crate::error::{
     IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
     UnsupportedJsonDataTypeForTagSnafu,
 };
+use crate::http::event::PipelineIngestRequest;
 use crate::otlp::trace::attributes::OtlpAnyValue;
 use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
 use crate::pipeline::run_pipeline;
@@ -74,12 +75,14 @@ pub async fn to_grpc_insert_requests(
     let data = parse_export_logs_service_request(request);
     let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;
 
+    let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_params);
     let inserts = run_pipeline(
         &pipeline_handler,
-        &pipeline_def,
-        &pipeline_params,
-        array,
-        table_name,
+        &pipeline_ctx,
+        PipelineIngestRequest {
+            table: table_name,
+            values: array,
+        },
         query_ctx,
         true,
     )
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 8d8538a319..2a7970e2f7 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -18,13 +18,14 @@ use std::sync::Arc;
 use api::v1::{RowInsertRequest, Rows};
 use hashbrown::HashMap;
 use pipeline::{
-    DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineDefinition,
-    PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+    DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineContext,
+    PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
 };
 use session::context::QueryContextRef;
 use snafu::ResultExt;
 
 use crate::error::{CatalogSnafu, PipelineSnafu, Result};
+use crate::http::event::PipelineIngestRequest;
 use crate::metrics::{
     METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
 };
@@ -51,36 +52,24 @@ pub async fn get_pipeline(
 
 pub(crate) async fn run_pipeline(
     handler: &PipelineHandlerRef,
-    pipeline_definition: &PipelineDefinition,
-    pipeline_parameters: &GreptimePipelineParams,
-    data_array: Vec<PipelineMap>,
-    table_name: String,
+    pipeline_ctx: &PipelineContext<'_>,
+    pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
     is_top_level: bool,
 ) -> Result<Vec<RowInsertRequest>> {
-    match pipeline_definition {
+    match &pipeline_ctx.pipeline_definition {
         PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => {
             run_identity_pipeline(
                 handler,
                 custom_ts.as_ref(),
-                pipeline_parameters,
-                data_array,
-                table_name,
+                pipeline_ctx.pipeline_param,
+                pipeline_req,
                 query_ctx,
             )
             .await
         }
         _ => {
-            run_custom_pipeline(
-                handler,
-                pipeline_definition,
-                pipeline_parameters,
-                data_array,
-                table_name,
-                query_ctx,
-                is_top_level,
-            )
-            .await
+            run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
         }
     }
 }
@@ -89,10 +78,13 @@ async fn run_identity_pipeline(
     handler: &PipelineHandlerRef,
     custom_ts: Option<&IdentityTimeIndex>,
     pipeline_parameters: &GreptimePipelineParams,
-    data_array: Vec<PipelineMap>,
-    table_name: String,
+    pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
 ) -> Result<Vec<RowInsertRequest>> {
+    let PipelineIngestRequest {
+        table: table_name,
+        values: data_array,
+    } = pipeline_req;
     let table = handler
         .get_table(&table_name, query_ctx)
         .await
@@ -109,18 +101,20 @@ async fn run_custom_pipeline(
 async fn run_custom_pipeline(
     handler: &PipelineHandlerRef,
-    pipeline_definition: &PipelineDefinition,
-    pipeline_parameters: &GreptimePipelineParams,
-    data_array: Vec<PipelineMap>,
-    table_name: String,
+    pipeline_ctx: &PipelineContext<'_>,
+    pipeline_req: PipelineIngestRequest,
     query_ctx: &QueryContextRef,
     is_top_level: bool,
 ) -> Result<Vec<RowInsertRequest>> {
     let db = query_ctx.get_db_string();
-    let pipeline = get_pipeline(pipeline_definition, handler, query_ctx).await?;
+    let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;
 
     let transform_timer = std::time::Instant::now();
 
+    let PipelineIngestRequest {
+        table: table_name,
+        values: data_array,
+    } = pipeline_req;
     let arr_len = data_array.len();
     let mut req_map = HashMap::new();
     let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
@@ -185,12 +179,15 @@ async fn run_custom_pipeline(
         // run pipeline recursively.
         let next_pipeline_def =
             PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
+        let next_pipeline_ctx =
+            PipelineContext::new(&next_pipeline_def, pipeline_ctx.pipeline_param);
         let requests = Box::pin(run_pipeline(
            handler,
-            &next_pipeline_def,
-            pipeline_parameters,
-            coll,
-            table_name,
+            &next_pipeline_ctx,
+            PipelineIngestRequest {
+                table: table_name,
+                values: coll,
+            },
            query_ctx,
            false,
        ))
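
Note for reviewers (not part of the patch): a minimal sketch of the calling convention after this refactor, mirroring the call sites in event.rs and otlp/logs.rs above. The `handler`, `query_ctx`, `rows`, and the pipeline/table names are hypothetical placeholders; `GreptimePipelineParams::default()`, `PipelineDefinition::from_name`, `PipelineContext::new`, `PipelineIngestRequest`, and `run_pipeline` are the items introduced or touched by this diff.

    // Hypothetical call site: the definition and params are bundled into a PipelineContext,
    // and the target table now travels together with its rows in a PipelineIngestRequest.
    let pipeline_params = GreptimePipelineParams::default();
    let pipeline_def =
        PipelineDefinition::from_name("my_pipeline", None, None).context(PipelineSnafu)?; // placeholder name
    let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_params);

    let inserts = run_pipeline(
        &handler,       // PipelineHandlerRef owned by the server state (placeholder)
        &pipeline_ctx,  // replaces the old (definition, params) argument pair
        PipelineIngestRequest {
            table: "my_table".to_owned(), // placeholder; previously a separate String argument
            values: rows,                 // Vec<PipelineMap> parsed from the request payload
        },
        &query_ctx,
        true, // is_top_level
    )
    .await?;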