chore: support custom time index selector for identity pipeline (#5750)

* chore: minor refactor

* chore: minor refactor

* chore: support custom ts for identity pipeline

* chore: fix clippy

* chore: minor refactor & update tests

* chore: use ref on identity pipeline param
This commit is contained in:
shuiyisong
2025-03-24 12:27:22 +08:00
committed by GitHub
parent 5ad2d8b3b8
commit c77ce958a3
15 changed files with 733 additions and 382 deletions

View File

@@ -24,7 +24,7 @@ use common_error::ext::ErrorExt;
use common_telemetry::{debug, error};
use headers::ContentType;
use once_cell::sync::Lazy;
use pipeline::GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME;
use pipeline::{PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME};
use serde_json::{json, Deserializer, Value};
use session::context::{Channel, QueryContext};
use snafu::{ensure, ResultExt};
@@ -135,7 +135,7 @@ async fn do_handle_bulk_api(
.start_timer();
// If pipeline_name is not provided, use the internal pipeline.
let pipeline = if let Some(pipeline) = params.pipeline_name {
let pipeline_name = if let Some(pipeline) = params.pipeline_name {
pipeline
} else {
GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string()
@@ -159,10 +159,26 @@ async fn do_handle_bulk_api(
};
let log_num = requests.len();
let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) {
Ok(pipeline) => pipeline,
Err(e) => {
// should be unreachable
error!(e; "Failed to ingest logs");
return (
status_code_to_http_status(&e.status_code()),
elasticsearch_headers(),
axum::Json(write_bulk_response(
start.elapsed().as_millis() as i64,
0,
e.status_code() as u32,
e.to_string().as_str(),
)),
);
}
};
if let Err(e) = ingest_logs_inner(
log_state.log_handler,
pipeline,
None,
requests,
Arc::new(query_ctx),
headers,

View File

@@ -33,7 +33,7 @@ use datatypes::value::column_data_to_json;
use headers::ContentType;
use lazy_static::lazy_static;
use pipeline::util::to_pipeline_version;
use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion};
use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition};
use serde::{Deserialize, Serialize};
use serde_json::{json, Deserializer, Map, Value};
use session::context::{Channel, QueryContext, QueryContextRef};
@@ -86,6 +86,15 @@ pub struct LogIngesterQueryParams {
/// The JSON field name of the log message. If not provided, it will take the whole log as the message.
/// The field must be at the top level of the JSON structure.
pub msg_field: Option<String>,
/// Specify a custom time index from the input data rather than server's arrival time.
/// Valid formats:
/// - <field_name>;epoch;<resolution>
/// - <field_name>;datestr;<format>
///
/// If an error occurs while parsing the config, the error will be returned in the response.
/// If an error occurs while ingesting the data, the `ignore_errors` will be used to determine if the error should be ignored.
/// If so, use the current server's timestamp as the event time.
pub custom_time_index: Option<String>,
}
/// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests.
@@ -281,9 +290,9 @@ async fn dryrun_pipeline_inner(
let results = run_pipeline(
&pipeline_handler,
PipelineDefinition::Resolved(pipeline),
&PipelineDefinition::Resolved(pipeline),
&params,
pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?,
pipeline::json_array_to_map(value).context(PipelineSnafu)?,
"dry_run".to_owned(),
query_ctx,
true,
@@ -527,17 +536,23 @@ pub async fn log_ingester(
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let table_name = query_params.table.context(InvalidParameterSnafu {
reason: "table is required",
})?;
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?;
let pipeline = PipelineDefinition::from_name(
&pipeline_name,
version,
query_params.custom_time_index.map(|s| (s, ignore_errors)),
)
.context(PipelineSnafu)?;
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
query_ctx.set_channel(Channel::Http);
@@ -550,8 +565,7 @@ pub async fn log_ingester(
ingest_logs_inner(
handler,
pipeline_name,
version,
pipeline,
vec![LogIngestRequest {
table: table_name,
values: value,
@@ -611,8 +625,7 @@ fn extract_pipeline_value_by_content_type(
pub(crate) async fn ingest_logs_inner(
state: PipelineHandlerRef,
pipeline_name: String,
version: PipelineVersion,
pipeline: PipelineDefinition,
log_ingest_requests: Vec<LogIngestRequest>,
query_ctx: QueryContextRef,
headers: HeaderMap,
@@ -631,9 +644,9 @@ pub(crate) async fn ingest_logs_inner(
for request in log_ingest_requests {
let requests = run_pipeline(
&state,
PipelineDefinition::from_name(&pipeline_name, version),
&pipeline,
&pipeline_params,
pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?,
pipeline::json_array_to_map(request.values).context(PipelineSnafu)?,
request.table,
&query_ctx,
true,

View File

@@ -72,11 +72,11 @@ pub async fn to_grpc_insert_requests(
}
PipelineWay::Pipeline(pipeline_def) => {
let data = parse_export_logs_service_request(request);
let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?;
let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?;
let inserts = run_pipeline(
&pipeline_handler,
pipeline_def,
&pipeline_def,
&pipeline_params,
array,
table_name,

View File

@@ -17,8 +17,8 @@ use std::sync::Arc;
use api::v1::{RowInsertRequest, Rows};
use pipeline::{
DispatchedTo, GreptimePipelineParams, GreptimeTransformer, Pipeline, PipelineDefinition,
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
DispatchedTo, GreptimePipelineParams, GreptimeTransformer, IdentityTimeIndex, Pipeline,
PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
};
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -31,15 +31,15 @@ use crate::query_handler::PipelineHandlerRef;
/// Never call this on `GreptimeIdentityPipeline` because it's a real pipeline
pub async fn get_pipeline(
pipeline_def: PipelineDefinition,
pipeline_def: &PipelineDefinition,
handler: &PipelineHandlerRef,
query_ctx: &QueryContextRef,
) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
match pipeline_def {
PipelineDefinition::Resolved(pipeline) => Ok(pipeline),
PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()),
PipelineDefinition::ByNameAndValue((name, version)) => {
handler
.get_pipeline(&name, version, query_ctx.clone())
.get_pipeline(name, *version, query_ctx.clone())
.await
}
_ => {
@@ -49,110 +49,151 @@ pub async fn get_pipeline(
}
pub(crate) async fn run_pipeline(
state: &PipelineHandlerRef,
pipeline_definition: PipelineDefinition,
handler: &PipelineHandlerRef,
pipeline_definition: &PipelineDefinition,
pipeline_parameters: &GreptimePipelineParams,
array: Vec<PipelineMap>,
data_array: Vec<PipelineMap>,
table_name: String,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
match pipeline_definition {
PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => {
run_identity_pipeline(
handler,
custom_ts.as_ref(),
pipeline_parameters,
data_array,
table_name,
query_ctx,
)
.await
}
_ => {
run_custom_pipeline(
handler,
pipeline_definition,
pipeline_parameters,
data_array,
table_name,
query_ctx,
is_top_level,
)
.await
}
}
}
async fn run_identity_pipeline(
handler: &PipelineHandlerRef,
custom_ts: Option<&IdentityTimeIndex>,
pipeline_parameters: &GreptimePipelineParams,
data_array: Vec<PipelineMap>,
table_name: String,
query_ctx: &QueryContextRef,
) -> Result<Vec<RowInsertRequest>> {
let table = handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
pipeline::identity_pipeline(data_array, table, pipeline_parameters, custom_ts)
.map(|rows| {
vec![RowInsertRequest {
rows: Some(rows),
table_name,
}]
})
.context(PipelineSnafu)
}
async fn run_custom_pipeline(
handler: &PipelineHandlerRef,
pipeline_definition: &PipelineDefinition,
pipeline_parameters: &GreptimePipelineParams,
data_array: Vec<PipelineMap>,
table_name: String,
query_ctx: &QueryContextRef,
is_top_level: bool,
) -> Result<Vec<RowInsertRequest>> {
let db = query_ctx.get_db_string();
let pipeline = get_pipeline(pipeline_definition, handler, query_ctx).await?;
if matches!(
pipeline_definition,
PipelineDefinition::GreptimeIdentityPipeline
) {
let table = state
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
pipeline::identity_pipeline(array, table, pipeline_parameters)
.map(|rows| {
vec![RowInsertRequest {
rows: Some(rows),
table_name,
}]
let transform_timer = std::time::Instant::now();
let mut transformed = Vec::with_capacity(data_array.len());
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
for mut values in data_array {
let r = pipeline
.exec_mut(&mut values)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineSnafu)
} else {
let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?;
.context(PipelineSnafu)?;
let transform_timer = std::time::Instant::now();
let mut transformed = Vec::with_capacity(array.len());
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
for mut values in array {
let r = pipeline
.exec_mut(&mut values)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineSnafu)?;
match r {
PipelineExecOutput::Transformed(row) => {
transformed.push(row);
}
PipelineExecOutput::DispatchedTo(dispatched_to) => {
if let Some(coll) = dispatched.get_mut(&dispatched_to) {
coll.push(values);
} else {
dispatched.insert(dispatched_to, vec![values]);
}
match r {
PipelineExecOutput::Transformed(row) => {
transformed.push(row);
}
PipelineExecOutput::DispatchedTo(dispatched_to) => {
if let Some(coll) = dispatched.get_mut(&dispatched_to) {
coll.push(values);
} else {
dispatched.insert(dispatched_to, vec![values]);
}
}
}
let mut results = Vec::new();
// if current pipeline generates some transformed results, build it as
// `RowInsertRequest` and append to results. If the pipeline doesn't
// have dispatch, this will be only output of the pipeline.
if !transformed.is_empty() {
results.push(RowInsertRequest {
rows: Some(Rows {
rows: transformed,
schema: pipeline.schemas().clone(),
}),
table_name: table_name.clone(),
})
}
// if current pipeline contains dispatcher and has several rules, we may
// already accumulated several dispatched rules and rows.
for (dispatched_to, coll) in dispatched {
// we generate the new table name according to `table_part` and
// current custom table name.
let table_name = dispatched_to.dispatched_to_table_name(&table_name);
let next_pipeline_name = dispatched_to
.pipeline
.as_deref()
.unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);
// run pipeline recursively.
let requests = Box::pin(run_pipeline(
state,
PipelineDefinition::from_name(next_pipeline_name, None),
pipeline_parameters,
coll,
table_name,
query_ctx,
false,
))
.await?;
results.extend(requests);
}
if is_top_level {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
}
Ok(results)
}
let mut results = Vec::new();
// if current pipeline generates some transformed results, build it as
// `RowInsertRequest` and append to results. If the pipeline doesn't
// have dispatch, this will be only output of the pipeline.
if !transformed.is_empty() {
results.push(RowInsertRequest {
rows: Some(Rows {
rows: transformed,
schema: pipeline.schemas().clone(),
}),
table_name: table_name.clone(),
})
}
// if current pipeline contains dispatcher and has several rules, we may
// already accumulated several dispatched rules and rows.
for (dispatched_to, coll) in dispatched {
// we generate the new table name according to `table_part` and
// current custom table name.
let table_name = dispatched_to.dispatched_to_table_name(&table_name);
let next_pipeline_name = dispatched_to
.pipeline
.as_deref()
.unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME);
// run pipeline recursively.
let next_pipeline_def =
PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?;
let requests = Box::pin(run_pipeline(
handler,
&next_pipeline_def,
pipeline_parameters,
coll,
table_name,
query_ctx,
false,
))
.await?;
results.extend(requests);
}
if is_top_level {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
}
Ok(results)
}