diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 1d80d436aa..40210e5662 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -333,9 +333,9 @@ impl Pipeline { table_suffix, })); } - // continue v2 process, check ts column and set the rest fields with auto-transform + // continue v2 process, and set the rest fields with auto-transform // if transformer presents, then ts has been set - values_to_row(schema_info, val, pipeline_ctx, Some(values))? + values_to_row(schema_info, val, pipeline_ctx, Some(values), false)? } TransformerMode::AutoTransform(ts_name, time_unit) => { // infer ts from the context @@ -347,7 +347,7 @@ impl Pipeline { )); let n_ctx = PipelineContext::new(&def, pipeline_ctx.pipeline_param, pipeline_ctx.channel); - values_to_row(schema_info, val, &n_ctx, None)? + values_to_row(schema_info, val, &n_ctx, None, true)? } }; diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 8494d638a0..f3a3f175c8 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -420,15 +420,17 @@ pub(crate) fn values_to_row( values: Value, pipeline_ctx: &PipelineContext<'_>, row: Option>, + need_calc_ts: bool, ) -> Result { let mut row: Vec = row.unwrap_or_else(|| Vec::with_capacity(schema_info.schema.len())); let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts(); - // calculate timestamp value based on the channel - let ts = calc_ts(pipeline_ctx, &values)?; - - row.push(GreptimeValue { value_data: ts }); + if need_calc_ts { + // calculate timestamp value based on the channel + let ts = calc_ts(pipeline_ctx, &values)?; + row.push(GreptimeValue { value_data: ts }); + } row.resize(schema_info.schema.len(), GreptimeValue { value_data: None }); @@ -608,7 +610,7 @@ fn identity_pipeline_inner( skip_error ); let row = unwrap_or_continue_if_err!( - values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None), + values_to_row(&mut schema_info, pipeline_map, pipeline_ctx, None, true), skip_error );