refactor: run_pipeline parameters (#5954)

* refactor: simplify run_pipeline params

* refactor: remove unnecessary function wrapper
Authored by shuiyisong on 2025-04-22 19:34:19 +08:00, committed by GitHub
parent 9fb0487e67
commit 5c07f0dec7
7 changed files with 98 additions and 98 deletions

View File

@@ -29,7 +29,7 @@ pub use etl::{
DispatchedTo, Pipeline, PipelineExecOutput, PipelineMap,
};
pub use manager::{
- pipeline_operator, table, util, IdentityTimeIndex, PipelineDefinition, PipelineInfo,
- PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
+ pipeline_operator, table, util, IdentityTimeIndex, PipelineContext, PipelineDefinition,
+ PipelineInfo, PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo,
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME,
};

View File

@@ -26,7 +26,7 @@ use util::to_pipeline_version;
use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
use crate::table::PipelineTable;
- use crate::{Pipeline, Value};
+ use crate::{GreptimePipelineParams, Pipeline, Value};
pub mod pipeline_operator;
pub mod table;
@@ -104,6 +104,22 @@ impl PipelineDefinition {
}
}
+ pub struct PipelineContext<'a> {
+ pub pipeline_definition: &'a PipelineDefinition,
+ pub pipeline_param: &'a GreptimePipelineParams,
+ }
+ impl<'a> PipelineContext<'a> {
+ pub fn new(
+ pipeline_definition: &'a PipelineDefinition,
+ pipeline_param: &'a GreptimePipelineParams,
+ ) -> Self {
+ Self {
+ pipeline_definition,
+ pipeline_param,
+ }
+ }
+ }
pub enum PipelineWay {
OtlpLogDirect(Box<SelectInfo>),
Pipeline(PipelineDefinition),
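
The new PipelineContext is the heart of this refactor: it bundles the pipeline definition and its parameters into one borrowed value, so run_pipeline and its helpers take a single context argument instead of threading two references through every call. A minimal, self-contained sketch of the pattern; the unit structs are stand-ins for the real crate types, and only the PipelineContext shape is taken from the hunk above.

// Stand-ins for pipeline::PipelineDefinition and pipeline::GreptimePipelineParams.
struct PipelineDefinition;
struct GreptimePipelineParams;

// Same shape as the PipelineContext introduced above: two borrows, one lifetime.
struct PipelineContext<'a> {
    pipeline_definition: &'a PipelineDefinition,
    pipeline_param: &'a GreptimePipelineParams,
}

impl<'a> PipelineContext<'a> {
    fn new(
        pipeline_definition: &'a PipelineDefinition,
        pipeline_param: &'a GreptimePipelineParams,
    ) -> Self {
        Self {
            pipeline_definition,
            pipeline_param,
        }
    }
}

// Before the refactor a helper would take both references; now one context carries them.
fn run(ctx: &PipelineContext<'_>) {
    let _ = (ctx.pipeline_definition, ctx.pipeline_param);
}

fn main() {
    let (def, params) = (PipelineDefinition, GreptimePipelineParams);
    run(&PipelineContext::new(&def, &params));
}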

View File

@@ -33,7 +33,9 @@ use crate::error::{
status_code_to_http_status, InvalidElasticsearchInputSnafu, ParseJsonSnafu, PipelineSnafu,
Result as ServersResult,
};
- use crate::http::event::{ingest_logs_inner, LogIngestRequest, LogIngesterQueryParams, LogState};
+ use crate::http::event::{
+ ingest_logs_inner, LogIngesterQueryParams, LogState, PipelineIngestRequest,
+ };
use crate::metrics::{
METRIC_ELASTICSEARCH_LOGS_DOCS_COUNT, METRIC_ELASTICSEARCH_LOGS_INGESTION_ELAPSED,
};
@@ -276,7 +278,7 @@ fn parse_bulk_request(
input: &str,
index_from_url: &Option<String>,
msg_field: &Option<String>,
- ) -> ServersResult<Vec<LogIngestRequest>> {
+ ) -> ServersResult<Vec<PipelineIngestRequest>> {
// Read the ndjson payload and convert it to `Vec<Value>`. Return error if the input is not a valid JSON.
let values: Vec<Value> = Deserializer::from_str(input)
.into_iter::<Value>()
@@ -291,7 +293,7 @@ fn parse_bulk_request(
}
);
- let mut requests: Vec<LogIngestRequest> = Vec::with_capacity(values.len() / 2);
+ let mut requests: Vec<PipelineIngestRequest> = Vec::with_capacity(values.len() / 2);
let mut values = values.into_iter();
// Read the ndjson payload and convert it to a (index, value) vector.
@@ -331,7 +333,7 @@ fn parse_bulk_request(
);
let log_value = pipeline::json_to_map(log_value).context(PipelineSnafu)?;
- requests.push(LogIngestRequest {
+ requests.push(PipelineIngestRequest {
table: index.unwrap_or_else(|| index_from_url.as_ref().unwrap().clone()),
values: vec![log_value],
});
@@ -402,13 +404,13 @@ mod tests {
None,
None,
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![
pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap(),
],
},
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
},
@@ -425,11 +427,11 @@ mod tests {
Some("logs".to_string()),
None,
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
},
- LogIngestRequest {
+ PipelineIngestRequest {
table: "logs".to_string(),
values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
},
@@ -446,11 +448,11 @@ mod tests {
Some("logs".to_string()),
None,
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
},
- LogIngestRequest {
+ PipelineIngestRequest {
table: "logs".to_string(),
values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
},
@@ -466,7 +468,7 @@ mod tests {
Some("logs".to_string()),
None,
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
},
@@ -483,11 +485,11 @@ mod tests {
None,
Some("data".to_string()),
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo1": "foo1_value", "bar1": "bar1_value"})).unwrap()],
},
- LogIngestRequest {
+ PipelineIngestRequest {
table: "test".to_string(),
values: vec![pipeline::json_to_map(json!({"foo2": "foo2_value", "bar2": "bar2_value"})).unwrap()],
},
@@ -504,13 +506,13 @@ mod tests {
None,
Some("message".to_string()),
Ok(vec![
- LogIngestRequest {
+ PipelineIngestRequest {
table: "logs-generic-default".to_string(),
values: vec![
pipeline::json_to_map(json!({"message": "172.16.0.1 - - [25/May/2024:20:19:37 +0000] \"GET /contact HTTP/1.1\" 404 162 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0 Mobile/15E148 Safari/604.1\""})).unwrap(),
],
},
- LogIngestRequest {
+ PipelineIngestRequest {
table: "logs-generic-default".to_string(),
values: vec![
pipeline::json_to_map(json!({"message": "10.0.0.1 - - [25/May/2024:20:18:37 +0000] \"GET /images/logo.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:89.0) Gecko/20100101 Firefox/89.0\""})).unwrap(),

View File

@@ -33,7 +33,7 @@ use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
- use pipeline::{GreptimePipelineParams, PipelineDefinition, PipelineMap};
+ use pipeline::{GreptimePipelineParams, PipelineContext, PipelineDefinition, PipelineMap};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
@@ -100,7 +100,7 @@ pub struct LogIngesterQueryParams {
/// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests.
/// Multiple LogIngestRequests will be ingested into the same database with the same pipeline.
#[derive(Debug, PartialEq)]
- pub(crate) struct LogIngestRequest {
+ pub(crate) struct PipelineIngestRequest {
/// The table where the log data will be written to.
pub table: String,
/// The log data to be ingested.
@@ -325,12 +325,15 @@ async fn dryrun_pipeline_inner(
) -> Result<Response> {
let params = GreptimePipelineParams::default();
+ let pipeline_def = PipelineDefinition::Resolved(pipeline);
+ let pipeline_ctx = PipelineContext::new(&pipeline_def, &params);
let results = run_pipeline(
&pipeline_handler,
- &PipelineDefinition::Resolved(pipeline),
- &params,
- value,
- "dry_run".to_owned(),
+ &pipeline_ctx,
+ PipelineIngestRequest {
+ table: "dry_run".to_owned(),
+ values: value,
+ },
query_ctx,
true,
)
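
The dry-run path now builds a PipelineIngestRequest up front instead of passing the table name and the parsed values as separate positional arguments to run_pipeline. An illustrative, self-contained sketch of that call shape; PipelineMap is simplified to a string map and run_pipeline is reduced to a stub here, only the request struct's fields mirror the diff.

use std::collections::BTreeMap;

// Simplified stand-in for pipeline::PipelineMap.
type PipelineMap = BTreeMap<String, String>;

// Same fields as the struct renamed to PipelineIngestRequest in this commit.
struct PipelineIngestRequest {
    table: String,
    values: Vec<PipelineMap>,
}

// Stub for the refactored entry point: table and values now arrive together.
fn run_pipeline(req: PipelineIngestRequest) -> usize {
    println!("building row inserts for table `{}`", req.table);
    req.values.len()
}

fn main() {
    let value = vec![PipelineMap::from([("message".to_string(), "hello".to_string())])];
    let rows = run_pipeline(PipelineIngestRequest {
        table: "dry_run".to_owned(),
        values: value,
    });
    assert_eq!(rows, 1);
}
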
@@ -603,7 +606,7 @@ pub async fn log_ingester(
ingest_logs_inner(
handler,
pipeline,
- vec![LogIngestRequest {
+ vec![PipelineIngestRequest {
table: table_name,
values: value,
}],
@@ -673,9 +676,9 @@ fn extract_pipeline_value_by_content_type(
}
pub(crate) async fn ingest_logs_inner(
- state: PipelineHandlerRef,
+ handler: PipelineHandlerRef,
pipeline: PipelineDefinition,
- log_ingest_requests: Vec<LogIngestRequest>,
+ log_ingest_requests: Vec<PipelineIngestRequest>,
query_ctx: QueryContextRef,
headers: HeaderMap,
) -> Result<HttpResponse> {
@@ -690,22 +693,15 @@ pub(crate) async fn ingest_logs_inner(
.and_then(|v| v.to_str().ok()),
);
- for request in log_ingest_requests {
- let requests = run_pipeline(
- &state,
- &pipeline,
- &pipeline_params,
- request.values,
- request.table,
- &query_ctx,
- true,
- )
- .await?;
+ let pipeline_ctx = PipelineContext::new(&pipeline, &pipeline_params);
+ for pipeline_req in log_ingest_requests {
+ let requests =
+ run_pipeline(&handler, &pipeline_ctx, pipeline_req, &query_ctx, true).await?;
insert_requests.extend(requests);
}
- let output = state
+ let output = handler
.insert(
RowInsertRequests {
inserts: insert_requests,
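
Worth noting in this hunk: the PipelineContext is constructed once, before the loop over log_ingest_requests, and the same borrowed context is reused for every request in the batch; only each PipelineIngestRequest is moved per iteration. A small illustrative sketch of that hoisting, with stand-in types throughout:

struct PipelineDefinition;
struct GreptimePipelineParams;
struct PipelineContext<'a>(&'a PipelineDefinition, &'a GreptimePipelineParams);
struct PipelineIngestRequest {
    table: String,
}

fn run_pipeline(ctx: &PipelineContext<'_>, req: PipelineIngestRequest) -> String {
    let PipelineContext(_def, _params) = ctx; // both halves stay available to the callee
    format!("insert request for `{}`", req.table)
}

fn main() {
    let (def, params) = (PipelineDefinition, GreptimePipelineParams);
    // Built once outside the loop, borrowed on every iteration.
    let pipeline_ctx = PipelineContext(&def, &params);
    let batch = vec![
        PipelineIngestRequest { table: "t1".into() },
        PipelineIngestRequest { table: "t2".into() },
    ];
    let inserts: Vec<String> = batch
        .into_iter()
        .map(|req| run_pipeline(&pipeline_ctx, req))
        .collect();
    assert_eq!(inserts.len(), 2);
}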

View File

@@ -83,33 +83,17 @@ impl Default for RemoteWriteQuery {
)]
pub async fn remote_write(
State(state): State<PromStoreState>,
- query: Query<RemoteWriteQuery>,
- extension: Extension<QueryContext>,
- content_encoding: TypedHeader<headers::ContentEncoding>,
- raw_body: Bytes,
- ) -> Result<impl IntoResponse> {
- remote_write_impl(
- state.prom_store_handler,
- query,
- extension,
- content_encoding,
- raw_body,
- state.is_strict_mode,
- state.prom_store_with_metric_engine,
- )
- .await
- }
- async fn remote_write_impl(
- handler: PromStoreProtocolHandlerRef,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContext>,
content_encoding: TypedHeader<headers::ContentEncoding>,
body: Bytes,
- is_strict_mode: bool,
- is_metric_engine: bool,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
+ let PromStoreState {
+ prom_store_handler,
+ prom_store_with_metric_engine,
+ is_strict_mode,
+ } = state;
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}
@@ -128,7 +112,9 @@ async fn remote_write_impl(
}
let query_ctx = Arc::new(query_ctx);
- let output = handler.write(request, query_ctx, is_metric_engine).await?;
+ let output = prom_store_handler
+ .write(request, query_ctx, prom_store_with_metric_engine)
+ .await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
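
The remote_write_impl indirection is gone: remote_write now destructures PromStoreState at the top of the handler instead of forwarding each field as its own argument. A plain-Rust sketch of the pattern with stand-in types; the axum extractors and the actual write path are omitted.

// Stand-ins mirroring the fields destructured in the hunk above.
struct PromStoreHandler;
struct PromStoreState {
    prom_store_handler: PromStoreHandler,
    prom_store_with_metric_engine: bool,
    is_strict_mode: bool,
}

fn remote_write(state: PromStoreState) {
    // One destructuring replaces a helper that took every field separately.
    let PromStoreState {
        prom_store_handler,
        prom_store_with_metric_engine,
        is_strict_mode,
    } = state;
    let _ = (&prom_store_handler, prom_store_with_metric_engine, is_strict_mode);
}

fn main() {
    remote_write(PromStoreState {
        prom_store_handler: PromStoreHandler,
        prom_store_with_metric_engine: true,
        is_strict_mode: false,
    });
}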

View File

@@ -24,7 +24,7 @@ use jsonb::{Number as JsonbNumber, Value as JsonbValue};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue};
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};
- use pipeline::{GreptimePipelineParams, PipelineWay, SchemaInfo, SelectInfo};
+ use pipeline::{GreptimePipelineParams, PipelineContext, PipelineWay, SchemaInfo, SelectInfo};
use serde_json::{Map, Value};
use session::context::QueryContextRef;
use snafu::{ensure, ResultExt};
@@ -33,6 +33,7 @@ use crate::error::{
IncompatibleSchemaSnafu, NotSupportedSnafu, PipelineSnafu, Result,
UnsupportedJsonDataTypeForTagSnafu,
};
+ use crate::http::event::PipelineIngestRequest;
use crate::otlp::trace::attributes::OtlpAnyValue;
use crate::otlp::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::pipeline::run_pipeline;
@@ -74,12 +75,14 @@ pub async fn to_grpc_insert_requests(
let data = parse_export_logs_service_request(request);
let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;
+ let pipeline_ctx = PipelineContext::new(&pipeline_def, &pipeline_params);
let inserts = run_pipeline(
&pipeline_handler,
- &pipeline_def,
- &pipeline_params,
- array,
- table_name,
+ &pipeline_ctx,
+ PipelineIngestRequest {
+ table: table_name,
+ values: array,
+ },
query_ctx,
true,
)

View File

@@ -18,13 +18,14 @@ use std::sync::Arc;
use api::v1::{RowInsertRequest, Rows};
use hashbrown::HashMap;
use pipeline::{
- DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineDefinition,
- PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
+ DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineContext,
+ PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
+ use crate::http::event::PipelineIngestRequest;
use crate::metrics::{
METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
};
@@ -51,36 +52,24 @@ pub async fn get_pipeline(
pub(crate) async fn run_pipeline(
handler: &PipelineHandlerRef,
- pipeline_definition: &PipelineDefinition,
- pipeline_parameters: &GreptimePipelineParams,
- data_array: Vec<PipelineMap>,
- table_name: String,
+ pipeline_ctx: &PipelineContext<'_>,
+ pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
- match pipeline_definition {
+ match &pipeline_ctx.pipeline_definition {
PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => {
run_identity_pipeline(
handler,
custom_ts.as_ref(),
- pipeline_parameters,
- data_array,
- table_name,
+ pipeline_ctx.pipeline_param,
+ pipeline_req,
query_ctx,
)
.await
}
_ => {
- run_custom_pipeline(
- handler,
- pipeline_definition,
- pipeline_parameters,
- data_array,
- table_name,
- query_ctx,
- is_top_level,
- )
- .await
+ run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
}
}
}
@@ -89,10 +78,13 @@ async fn run_identity_pipeline(
handler: &PipelineHandlerRef,
custom_ts: Option<&IdentityTimeIndex>,
pipeline_parameters: &GreptimePipelineParams,
- data_array: Vec<PipelineMap>,
- table_name: String,
+ pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
) -> Result<Vec<RowInsertRequest>> {
+ let PipelineIngestRequest {
+ table: table_name,
+ values: data_array,
+ } = pipeline_req;
let table = handler
.get_table(&table_name, query_ctx)
.await
@@ -109,18 +101,20 @@ async fn run_identity_pipeline(
async fn run_custom_pipeline(
handler: &PipelineHandlerRef,
- pipeline_definition: &PipelineDefinition,
- pipeline_parameters: &GreptimePipelineParams,
- data_array: Vec<PipelineMap>,
- table_name: String,
+ pipeline_ctx: &PipelineContext<'_>,
+ pipeline_req: PipelineIngestRequest,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
let db = query_ctx.get_db_string();
- let pipeline = get_pipeline(pipeline_definition, handler, query_ctx).await?;
+ let pipeline = get_pipeline(pipeline_ctx.pipeline_definition, handler, query_ctx).await?;
let transform_timer = std::time::Instant::now();
+ let PipelineIngestRequest {
+ table: table_name,
+ values: data_array,
+ } = pipeline_req;
let arr_len = data_array.len();
let mut req_map = HashMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
@@ -185,12 +179,15 @@ async fn run_custom_pipeline(
// run pipeline recursively.
let next_pipeline_def =
PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
+ let next_pipeline_ctx =
+ PipelineContext::new(&next_pipeline_def, pipeline_ctx.pipeline_param);
let requests = Box::pin(run_pipeline(
handler,
- &next_pipeline_def,
- pipeline_parameters,
- coll,
- table_name,
+ &next_pipeline_ctx,
+ PipelineIngestRequest {
+ table: table_name,
+ values: coll,
+ },
query_ctx,
false,
))
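
Net effect on the entry point: the separate definition, parameters, values and table-name arguments collapse into two structured ones, taking run_pipeline from seven parameters to five. Below, the before and after signatures are written out with unit stand-ins so they can be compared side by side; only the parameter lists are taken from the hunks above.

#![allow(dead_code)]

use std::collections::BTreeMap;

// Unit stand-ins so both signatures compile in isolation.
type PipelineHandlerRef = ();
type QueryContextRef = ();
type RowInsertRequest = ();
type PipelineMap = BTreeMap<String, String>;
struct PipelineDefinition;
struct GreptimePipelineParams;
struct PipelineContext<'a> {
    pipeline_definition: &'a PipelineDefinition,
    pipeline_param: &'a GreptimePipelineParams,
}
struct PipelineIngestRequest {
    table: String,
    values: Vec<PipelineMap>,
}

// Before: every ingredient is a separate parameter.
async fn run_pipeline_old(
    _handler: &PipelineHandlerRef,
    _pipeline_definition: &PipelineDefinition,
    _pipeline_parameters: &GreptimePipelineParams,
    _data_array: Vec<PipelineMap>,
    _table_name: String,
    _query_ctx: &QueryContextRef,
    _is_top_level: bool,
) -> Vec<RowInsertRequest> {
    vec![]
}

// After: definition and params ride in PipelineContext, table and values in PipelineIngestRequest.
async fn run_pipeline_new(
    _handler: &PipelineHandlerRef,
    _pipeline_ctx: &PipelineContext<'_>,
    _pipeline_req: PipelineIngestRequest,
    _query_ctx: &QueryContextRef,
    _is_top_level: bool,
) -> Vec<RowInsertRequest> {
    vec![]
}

fn main() {}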