chore: minor update for using pipeline with prometheus (#6522)

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2025-07-16 11:03:07 +08:00
committed by GitHub
parent 6744f5470b
commit 95d2549007
2 changed files with 11 additions and 4 deletions

View File

@@ -374,6 +374,12 @@ fn calc_ts(p_ctx: &PipelineContext, values: &VrlValue) -> Result<Option<ValueDat
}
}
/// `need_calc_ts` happens in two cases:
/// 1. full greptime_identity
/// 2. auto-transform without transformer
///
/// if transform is present in custom pipeline in v2 mode
/// we dont need to calc ts again, nor do we need to check ts column name
pub(crate) fn values_to_row(
schema_info: &mut SchemaInfo,
values: VrlValue,
@@ -401,7 +407,7 @@ pub(crate) fn values_to_row(
let values = values.into_object().context(ValueMustBeMapSnafu)?;
for (column_name, value) in values {
if column_name.as_str() == ts_column_name {
if need_calc_ts && column_name.as_str() == ts_column_name {
continue;
}

View File

@@ -442,10 +442,11 @@ impl PromSeriesProcessor {
// run pipeline
let mut req = ContextReq::default();
for (table_name, pipeline_maps) in self.table_values.iter_mut() {
let table_values = std::mem::take(&mut self.table_values);
for (table_name, pipeline_maps) in table_values.into_iter() {
let pipeline_req = PipelineIngestRequest {
table: table_name.clone(),
values: pipeline_maps.clone(),
table: table_name,
values: pipeline_maps,
};
let row_req =
run_pipeline(handler, &pipeline_ctx, pipeline_req, query_ctx, true).await?;