From c77ce958a3b936e1065abe51fa10832b0ea2ac99 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Mon, 24 Mar 2025 12:27:22 +0800 Subject: [PATCH] chore: support custom time index selector for identity pipeline (#5750) * chore: minor refactor * chore: minor refactor * chore: support custom ts for identity pipeline * chore: fix clippy * chore: minor refactor & update tests * chore: use ref on identity pipeline param --- src/pipeline/benches/processor.rs | 4 +- src/pipeline/src/error.rs | 12 +- src/pipeline/src/etl.rs | 12 +- .../src/etl/transform/transformer/greptime.rs | 419 +++++++++--------- src/pipeline/src/etl/value/time.rs | 13 +- src/pipeline/src/lib.rs | 8 +- src/pipeline/src/manager.rs | 209 ++++++++- src/pipeline/tests/common.rs | 6 +- src/pipeline/tests/dissect.rs | 4 +- src/pipeline/tests/pipeline.rs | 20 +- src/servers/src/elasticsearch.rs | 22 +- src/servers/src/http/event.rs | 41 +- src/servers/src/otlp/logs.rs | 4 +- src/servers/src/pipeline.rs | 237 ++++++---- tests-integration/tests/http.rs | 104 ++++- 15 files changed, 733 insertions(+), 382 deletions(-) diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs index b26ef7e63f..4dfd487554 100644 --- a/src/pipeline/benches/processor.rs +++ b/src/pipeline/benches/processor.rs @@ -14,7 +14,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; use pipeline::error::Result; -use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline}; +use pipeline::{json_to_map, parse, Content, GreptimeTransformer, Pipeline}; use serde_json::{Deserializer, Value}; fn processor_mut( @@ -24,7 +24,7 @@ fn processor_mut( let mut result = Vec::with_capacity(input_values.len()); for v in input_values { - let mut payload = json_to_intermediate_state(v).unwrap(); + let mut payload = json_to_map(v).unwrap(); let r = pipeline .exec_mut(&mut payload)? .into_transformed() diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index ac653daacd..7a714252c5 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -692,6 +692,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid custom time index config: {}, reason: {}", config, reason))] + InvalidCustomTimeIndex { + config: String, + reason: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -704,7 +712,9 @@ impl ErrorExt for Error { PipelineTableNotFound { .. } => StatusCode::TableNotFound, InsertPipeline { source, .. } => source.status_code(), CollectRecords { source, .. } => source.status_code(), - PipelineNotFound { .. } | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments, + PipelineNotFound { .. } + | InvalidPipelineVersion { .. } + | InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments, BuildDfLogicalPlan { .. } => StatusCode::Internal, ExecuteInternalStatement { source, .. } => source.status_code(), DataFrame { source, .. 
} => source.status_code(), diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 42af8af22e..6a38a3562e 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -147,7 +147,7 @@ impl PipelineExecOutput { } } -pub fn json_to_intermediate_state(val: serde_json::Value) -> Result { +pub fn json_to_map(val: serde_json::Value) -> Result { match val { serde_json::Value::Object(map) => { let mut intermediate_state = PipelineMap::new(); @@ -160,8 +160,8 @@ pub fn json_to_intermediate_state(val: serde_json::Value) -> Result } } -pub fn json_array_to_intermediate_state(val: Vec) -> Result> { - val.into_iter().map(json_to_intermediate_state).collect() +pub fn json_array_to_map(val: Vec) -> Result> { + val.into_iter().map(json_to_map).collect() } impl Pipeline @@ -238,7 +238,7 @@ transform: type: uint32 "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); - let mut payload = json_to_intermediate_state(input_value).unwrap(); + let mut payload = json_to_map(input_value).unwrap(); let result = pipeline .exec_mut(&mut payload) .unwrap() @@ -366,7 +366,7 @@ transform: "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); - let mut payload = json_to_intermediate_state(input_value).unwrap(); + let mut payload = json_to_map(input_value).unwrap(); let result = pipeline .exec_mut(&mut payload) .unwrap() @@ -408,7 +408,7 @@ transform: let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); let schema = pipeline.schemas().clone(); - let mut result = json_to_intermediate_state(input_value).unwrap(); + let mut result = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut result) diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 008964dc79..1c2285a33a 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -38,6 +38,7 @@ use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transformer, Transforms}; use crate::etl::value::{Timestamp, Value}; use crate::etl::PipelineMap; +use crate::IdentityTimeIndex; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -328,237 +329,194 @@ fn resolve_number_schema( ) } -fn values_to_row(schema_info: &mut SchemaInfo, values: PipelineMap) -> Result { +fn values_to_row( + schema_info: &mut SchemaInfo, + values: PipelineMap, + custom_ts: Option<&IdentityTimeIndex>, +) -> Result { let mut row: Vec = Vec::with_capacity(schema_info.schema.len()); - for _ in 0..schema_info.schema.len() { + + // set time index value + let value_data = match custom_ts { + Some(ts) => { + let ts_field = values.get(ts.get_column_name()); + Some(ts.get_timestamp(ts_field)?) 
+ } + None => Some(ValueData::TimestampNanosecondValue( + chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default(), + )), + }; + + row.push(GreptimeValue { value_data }); + + for _ in 1..schema_info.schema.len() { row.push(GreptimeValue { value_data: None }); } - for (column_name, value) in values.into_iter() { - if column_name == DEFAULT_GREPTIME_TIMESTAMP_COLUMN { + for (column_name, value) in values { + // skip ts column + let ts_column = custom_ts + .as_ref() + .map_or(DEFAULT_GREPTIME_TIMESTAMP_COLUMN, |ts| ts.get_column_name()); + if column_name == ts_column { continue; } let index = schema_info.index.get(&column_name).copied(); - - match value { - Value::Null => {} - - Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => { - // safe unwrap after type matched - let v = value.as_i64().unwrap(); - resolve_schema( - index, - ValueData::I64Value(v), - ColumnSchema { - column_name, - datatype: ColumnDataType::Int64 as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - - Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => { - // safe unwrap after type matched - let v = value.as_u64().unwrap(); - resolve_schema( - index, - ValueData::U64Value(v), - ColumnSchema { - column_name, - datatype: ColumnDataType::Uint64 as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - - Value::Float32(_) | Value::Float64(_) => { - // safe unwrap after type matched - let v = value.as_f64().unwrap(); - resolve_schema( - index, - ValueData::F64Value(v), - ColumnSchema { - column_name, - datatype: ColumnDataType::Float64 as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - - Value::Boolean(v) => { - resolve_schema( - index, - ValueData::BoolValue(v), - ColumnSchema { - column_name, - datatype: ColumnDataType::Boolean as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - Value::String(v) => { - resolve_schema( - index, - ValueData::StringValue(v), - ColumnSchema { - column_name, - datatype: ColumnDataType::String as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - - Value::Timestamp(Timestamp::Nanosecond(ns)) => { - resolve_schema( - index, - ValueData::TimestampNanosecondValue(ns), - ColumnSchema { - column_name, - datatype: ColumnDataType::TimestampNanosecond as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - Value::Timestamp(Timestamp::Microsecond(us)) => { - resolve_schema( - index, - ValueData::TimestampMicrosecondValue(us), - ColumnSchema { - column_name, - datatype: ColumnDataType::TimestampMicrosecond as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - Value::Timestamp(Timestamp::Millisecond(ms)) => { - resolve_schema( - index, - ValueData::TimestampMillisecondValue(ms), - ColumnSchema { - column_name, - datatype: ColumnDataType::TimestampMillisecond as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - Value::Timestamp(Timestamp::Second(s)) => { - 
resolve_schema( - index, - ValueData::TimestampSecondValue(s), - ColumnSchema { - column_name, - datatype: ColumnDataType::TimestampSecond as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: None, - options: None, - }, - &mut row, - schema_info, - )?; - } - - Value::Array(_) | Value::Map(_) => { - let data: jsonb::Value = value.into(); - resolve_schema( - index, - ValueData::BinaryValue(data.to_vec()), - ColumnSchema { - column_name, - datatype: ColumnDataType::Binary as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: Some(ColumnDataTypeExtension { - type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), - options: None, - }, - &mut row, - schema_info, - )?; - } - } + resolve_value(index, value, column_name, &mut row, schema_info)?; } Ok(Row { values: row }) } -fn identity_pipeline_inner<'a>( +fn resolve_value( + index: Option, + value: Value, + column_name: String, + row: &mut Vec, + schema_info: &mut SchemaInfo, +) -> Result<()> { + let mut resolve_simple_type = + |value_data: ValueData, column_name: String, data_type: ColumnDataType| { + resolve_schema( + index, + value_data, + ColumnSchema { + column_name, + datatype: data_type as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + options: None, + }, + row, + schema_info, + ) + }; + + match value { + Value::Null => {} + + Value::Int8(_) | Value::Int16(_) | Value::Int32(_) | Value::Int64(_) => { + // safe unwrap after type matched + let v = value.as_i64().unwrap(); + resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?; + } + + Value::Uint8(_) | Value::Uint16(_) | Value::Uint32(_) | Value::Uint64(_) => { + // safe unwrap after type matched + let v = value.as_u64().unwrap(); + resolve_simple_type(ValueData::U64Value(v), column_name, ColumnDataType::Uint64)?; + } + + Value::Float32(_) | Value::Float64(_) => { + // safe unwrap after type matched + let v = value.as_f64().unwrap(); + resolve_simple_type(ValueData::F64Value(v), column_name, ColumnDataType::Float64)?; + } + + Value::Boolean(v) => { + resolve_simple_type( + ValueData::BoolValue(v), + column_name, + ColumnDataType::Boolean, + )?; + } + + Value::String(v) => { + resolve_simple_type( + ValueData::StringValue(v), + column_name, + ColumnDataType::String, + )?; + } + + Value::Timestamp(Timestamp::Nanosecond(ns)) => { + resolve_simple_type( + ValueData::TimestampNanosecondValue(ns), + column_name, + ColumnDataType::TimestampNanosecond, + )?; + } + + Value::Timestamp(Timestamp::Microsecond(us)) => { + resolve_simple_type( + ValueData::TimestampMicrosecondValue(us), + column_name, + ColumnDataType::TimestampMicrosecond, + )?; + } + + Value::Timestamp(Timestamp::Millisecond(ms)) => { + resolve_simple_type( + ValueData::TimestampMillisecondValue(ms), + column_name, + ColumnDataType::TimestampMillisecond, + )?; + } + + Value::Timestamp(Timestamp::Second(s)) => { + resolve_simple_type( + ValueData::TimestampSecondValue(s), + column_name, + ColumnDataType::TimestampSecond, + )?; + } + + Value::Array(_) | Value::Map(_) => { + let data: jsonb::Value = value.into(); + resolve_schema( + index, + ValueData::BinaryValue(data.to_vec()), + ColumnSchema { + column_name, + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + options: None, + }, + row, + schema_info, + )?; + } + } + Ok(()) +} + +fn 
identity_pipeline_inner( array: Vec, - tag_column_names: Option>, - _params: &GreptimePipelineParams, -) -> Result { + custom_ts: Option<&IdentityTimeIndex>, +) -> Result<(SchemaInfo, Vec)> { let mut rows = Vec::with_capacity(array.len()); let mut schema_info = SchemaInfo::default(); - for values in array { - let row = values_to_row(&mut schema_info, values)?; - rows.push(row); - } - - let greptime_timestamp_schema = ColumnSchema { - column_name: DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(), - datatype: ColumnDataType::TimestampNanosecond as i32, + // set time index column schema first + schema_info.schema.push(ColumnSchema { + column_name: custom_ts + .map(|ts| ts.get_column_name().clone()) + .unwrap_or_else(|| DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string()), + datatype: custom_ts + .map(|c| c.get_datatype()) + .unwrap_or(ColumnDataType::TimestampNanosecond) as i32, semantic_type: SemanticType::Timestamp as i32, datatype_extension: None, options: None, - }; - let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0); - let ts = GreptimeValue { - value_data: Some(ValueData::TimestampNanosecondValue(ns)), - }; + }); + + for values in array { + let row = values_to_row(&mut schema_info, values, custom_ts)?; + rows.push(row); + } + let column_count = schema_info.schema.len(); for row in rows.iter_mut() { let diff = column_count - row.values.len(); for _ in 0..diff { row.values.push(GreptimeValue { value_data: None }); } - row.values.push(ts.clone()); } - schema_info.schema.push(greptime_timestamp_schema); - // set the semantic type of the row key column to Tag - if let Some(tag_column_names) = tag_column_names { - tag_column_names.for_each(|tag_column_name| { - if let Some(index) = schema_info.index.get(tag_column_name) { - schema_info.schema[*index].semantic_type = SemanticType::Tag as i32; - } - }); - } - Ok(Rows { - schema: schema_info.schema, - rows, - }) + Ok((schema_info, rows)) } /// Identity pipeline for Greptime @@ -573,6 +531,7 @@ pub fn identity_pipeline( array: Vec, table: Option>, params: &GreptimePipelineParams, + custom_ts: Option<&IdentityTimeIndex>, ) -> Result { let input = if params.flatten_json_object() { array @@ -583,14 +542,20 @@ pub fn identity_pipeline( array }; - match table { - Some(table) => { + identity_pipeline_inner(input, custom_ts).map(|(mut schema, rows)| { + if let Some(table) = table { let table_info = table.table_info(); - let tag_column_names = table_info.meta.row_key_column_names(); - identity_pipeline_inner(input, Some(tag_column_names), params) + for tag_name in table_info.meta.row_key_column_names() { + if let Some(index) = schema.index.get(tag_name) { + schema.schema[*index].semantic_type = SemanticType::Tag as i32; + } + } } - None => identity_pipeline_inner(input, None::>, params), - } + Rows { + schema: schema.schema, + rows, + } + }) } /// Consumes the JSON object and consumes it into a single-level object. 
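As a worked example of the reworked identity path above (a sketch against the APIs this patch exports, not code taken from the diff; the sample JSON line is invented): with a custom time index configured as `<field>;epoch;<resolution>`, the selected field becomes the first column of the generated schema instead of the server-assigned `greptime_timestamp`.

use pipeline::{identity_pipeline, json_array_to_map, GreptimePipelineParams, IdentityTimeIndex};

fn identity_with_custom_ts() -> pipeline::error::Result<()> {
    // "__time__;epoch;s": read the time index from the `__time__` field as epoch seconds.
    // `false` = do not ignore conversion errors (they are returned instead of falling back to now()).
    let ts = IdentityTimeIndex::from_config("__time__;epoch;s".to_string(), false)?;

    let input = vec![serde_json::json!({"__time__": 1453809242, "msg": "hello"})];
    let rows = identity_pipeline(
        json_array_to_map(input)?,
        None, // no existing table, so no columns are promoted to tags
        &GreptimePipelineParams::default(),
        Some(&ts),
    )?;

    // The time-index column is always emitted first in the schema.
    assert_eq!(rows.schema[0].column_name, "__time__");
    Ok(())
}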
@@ -648,7 +613,7 @@ mod tests { use api::v1::SemanticType; use super::*; - use crate::etl::{json_array_to_intermediate_state, json_to_intermediate_state}; + use crate::etl::{json_array_to_map, json_to_map}; use crate::identity_pipeline; #[test] @@ -674,8 +639,8 @@ mod tests { "gaga": "gaga" }), ]; - let array = json_array_to_intermediate_state(array).unwrap(); - let rows = identity_pipeline(array, None, &GreptimePipelineParams::default()); + let array = json_array_to_map(array).unwrap(); + let rows = identity_pipeline(array, None, &GreptimePipelineParams::default(), None); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -704,9 +669,10 @@ mod tests { }), ]; let rows = identity_pipeline( - json_array_to_intermediate_state(array).unwrap(), + json_array_to_map(array).unwrap(), None, &GreptimePipelineParams::default(), + None, ); assert!(rows.is_err()); assert_eq!( @@ -736,9 +702,10 @@ mod tests { }), ]; let rows = identity_pipeline( - json_array_to_intermediate_state(array).unwrap(), + json_array_to_map(array).unwrap(), None, &GreptimePipelineParams::default(), + None, ); assert!(rows.is_ok()); let rows = rows.unwrap(); @@ -769,11 +736,21 @@ mod tests { }), ]; let tag_column_names = ["name".to_string(), "address".to_string()]; - let rows = identity_pipeline_inner( - json_array_to_intermediate_state(array).unwrap(), - Some(tag_column_names.iter()), - &GreptimePipelineParams::default(), + + let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), None).map( + |(mut schema, rows)| { + for name in tag_column_names { + if let Some(index) = schema.index.get(&name) { + schema.schema[*index].semantic_type = SemanticType::Tag as i32; + } + } + Rows { + schema: schema.schema, + rows, + } + }, ); + assert!(rows.is_ok()); let rows = rows.unwrap(); assert_eq!(rows.schema.len(), 8); @@ -869,8 +846,8 @@ mod tests { ]; for (input, max_depth, expected) in test_cases { - let input = json_to_intermediate_state(input).unwrap(); - let expected = expected.map(|e| json_to_intermediate_state(e).unwrap()); + let input = json_to_map(input).unwrap(); + let expected = expected.map(|e| json_to_map(e).unwrap()); let flattened_object = flatten_object(input, max_depth).ok(); assert_eq!(flattened_object, expected); diff --git a/src/pipeline/src/etl/value/time.rs b/src/pipeline/src/etl/value/time.rs index 72a6ef3593..f6afe32b87 100644 --- a/src/pipeline/src/etl/value/time.rs +++ b/src/pipeline/src/etl/value/time.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use common_time::timestamp::TimeUnit; + #[derive(Debug, Clone, PartialEq)] pub enum Timestamp { Nanosecond(i64), @@ -84,11 +86,20 @@ impl Timestamp { Timestamp::Second(v) => *v, } } + + pub(crate) fn to_unit(&self, unit: &TimeUnit) -> i64 { + match unit { + TimeUnit::Second => self.timestamp(), + TimeUnit::Millisecond => self.timestamp_millis(), + TimeUnit::Microsecond => self.timestamp_micros(), + TimeUnit::Nanosecond => self.timestamp_nanos(), + } + } } impl Default for Timestamp { fn default() -> Self { - Timestamp::Nanosecond(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)) + Timestamp::Nanosecond(chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default()) } } diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index a471ce721e..fa4c3026d1 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -24,11 +24,11 @@ pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; pub use etl::{ - json_array_to_intermediate_state, json_to_intermediate_state, parse, Content, DispatchedTo, - Pipeline, PipelineExecOutput, PipelineMap, + json_array_to_map, json_to_map, parse, Content, DispatchedTo, Pipeline, PipelineExecOutput, + PipelineMap, }; pub use manager::{ - pipeline_operator, table, util, PipelineDefinition, PipelineInfo, PipelineRef, - PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo, + pipeline_operator, table, util, IdentityTimeIndex, PipelineDefinition, PipelineInfo, + PipelineRef, PipelineTableRef, PipelineVersion, PipelineWay, SelectInfo, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME, }; diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index 479f5fb340..ffb2a95e72 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -14,14 +14,19 @@ use std::sync::Arc; +use api::v1::value::ValueData; +use api::v1::ColumnDataType; +use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::timestamp::TimestampNanosecond; use itertools::Itertools; +use snafu::ensure; use util::to_pipeline_version; -use crate::error::Result; +use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, Result}; +use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION}; use crate::table::PipelineTable; -use crate::{GreptimeTransformer, Pipeline}; +use crate::{GreptimeTransformer, Pipeline, Value}; pub mod pipeline_operator; pub mod table; @@ -71,18 +76,29 @@ pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1"; /// Enum for holding information of a pipeline, which is either pipeline itself, /// or information that be used to retrieve a pipeline from `PipelineHandler` +#[derive(Debug, Clone)] pub enum PipelineDefinition { Resolved(Arc>), ByNameAndValue((String, PipelineVersion)), - GreptimeIdentityPipeline, + GreptimeIdentityPipeline(Option), } impl PipelineDefinition { - pub fn from_name(name: &str, version: PipelineVersion) -> Self { + pub fn from_name( + name: &str, + version: PipelineVersion, + custom_time_index: Option<(String, bool)>, + ) -> Result { if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME { - Self::GreptimeIdentityPipeline + Ok(Self::GreptimeIdentityPipeline( + custom_time_index + .map(|(config, ignore_errors)| { + IdentityTimeIndex::from_config(config, ignore_errors) + }) + .transpose()?, + )) } else { - Self::ByNameAndValue((name.to_owned(), version)) + Ok(Self::ByNameAndValue((name.to_owned(), 
version))) } } } @@ -107,10 +123,189 @@ impl PipelineWay { Ok(PipelineWay::Pipeline(PipelineDefinition::from_name( pipeline_name, to_pipeline_version(version)?, - ))) + None, + )?)) } } else { Ok(default_pipeline) } } } + +const IDENTITY_TS_EPOCH: &str = "epoch"; +const IDENTITY_TS_DATESTR: &str = "datestr"; + +#[derive(Debug, Clone)] +pub enum IdentityTimeIndex { + Epoch(String, TimeUnit, bool), + DateStr(String, String, bool), +} + +impl IdentityTimeIndex { + pub fn from_config(config: String, ignore_errors: bool) -> Result { + let parts = config.split(';').collect::>(); + ensure!( + parts.len() == 3, + InvalidCustomTimeIndexSnafu { + config, + reason: "config format: ';;'", + } + ); + + let field = parts[0].to_string(); + match parts[1] { + IDENTITY_TS_EPOCH => match parts[2] { + NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch( + field, + TimeUnit::Nanosecond, + ignore_errors, + )), + US_RESOLUTION => Ok(IdentityTimeIndex::Epoch( + field, + TimeUnit::Microsecond, + ignore_errors, + )), + MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch( + field, + TimeUnit::Millisecond, + ignore_errors, + )), + S_RESOLUTION => Ok(IdentityTimeIndex::Epoch( + field, + TimeUnit::Second, + ignore_errors, + )), + _ => InvalidCustomTimeIndexSnafu { + config, + reason: "epoch type must be one of ns, us, ms, s", + } + .fail(), + }, + IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr( + field, + parts[2].to_string(), + ignore_errors, + )), + _ => InvalidCustomTimeIndexSnafu { + config, + reason: "identity time index type must be one of epoch, datestr", + } + .fail(), + } + } + + pub fn get_column_name(&self) -> &String { + match self { + IdentityTimeIndex::Epoch(field, _, _) => field, + IdentityTimeIndex::DateStr(field, _, _) => field, + } + } + + pub fn get_ignore_errors(&self) -> bool { + match self { + IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors, + IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors, + } + } + + pub fn get_datatype(&self) -> ColumnDataType { + match self { + IdentityTimeIndex::Epoch(_, unit, _) => match unit { + TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond, + TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond, + TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond, + TimeUnit::Second => ColumnDataType::TimestampSecond, + }, + IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond, + } + } + + pub fn get_timestamp(&self, value: Option<&Value>) -> Result { + match self { + IdentityTimeIndex::Epoch(_, unit, ignore_errors) => { + let v = match value { + Some(Value::Int32(v)) => *v as i64, + Some(Value::Int64(v)) => *v, + Some(Value::Uint32(v)) => *v as i64, + Some(Value::Uint64(v)) => *v as i64, + Some(Value::String(s)) => match s.parse::() { + Ok(v) => v, + Err(_) => { + return if_ignore_errors( + *ignore_errors, + *unit, + format!("failed to convert {} to number", s), + ) + } + }, + Some(Value::Timestamp(timestamp)) => timestamp.to_unit(unit), + Some(v) => { + return if_ignore_errors( + *ignore_errors, + *unit, + format!("unsupported value type to convert to timestamp: {}", v), + ) + } + None => { + return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string()) + } + }; + Ok(time_unit_to_value_data(*unit, v)) + } + IdentityTimeIndex::DateStr(_, format, ignore_errors) => { + let v = match value { + Some(Value::String(s)) => s, + Some(v) => { + return if_ignore_errors( + *ignore_errors, + TimeUnit::Nanosecond, + format!("unsupported value type to convert to date string: {}", v), + ); + } + None => { 
+ return if_ignore_errors( + *ignore_errors, + TimeUnit::Nanosecond, + "missing field".to_string(), + ) + } + }; + + let timestamp = match chrono::DateTime::parse_from_str(v, format) { + Ok(ts) => ts, + Err(_) => { + return if_ignore_errors( + *ignore_errors, + TimeUnit::Nanosecond, + format!("failed to parse date string: {}, format: {}", v, format), + ) + } + }; + + Ok(ValueData::TimestampNanosecondValue( + timestamp.timestamp_nanos_opt().unwrap_or_default(), + )) + } + } + } +} + +fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result { + if ignore_errors { + Ok(time_unit_to_value_data( + unit, + Timestamp::current_time(unit).value(), + )) + } else { + CastTypeSnafu { msg }.fail() + } +} + +fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData { + match unit { + TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v), + TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v), + TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v), + TimeUnit::Second => ValueData::TimestampSecondValue(v), + } +} diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index 89bebbf85b..5eb3f27692 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -13,7 +13,7 @@ // limitations under the License. use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType}; -use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline}; +use pipeline::{json_to_map, parse, Content, GreptimeTransformer, Pipeline}; /// test util function to parse and execute pipeline pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { @@ -30,7 +30,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { match input_value { serde_json::Value::Array(array) => { for value in array { - let mut intermediate_status = json_to_intermediate_state(value).unwrap(); + let mut intermediate_status = json_to_map(value).unwrap(); let row = pipeline .exec_mut(&mut intermediate_status) .expect("failed to exec pipeline") @@ -40,7 +40,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { } } serde_json::Value::Object(_) => { - let mut intermediate_status = json_to_intermediate_state(input_value).unwrap(); + let mut intermediate_status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut intermediate_status) .expect("failed to exec pipeline") diff --git a/src/pipeline/tests/dissect.rs b/src/pipeline/tests/dissect.rs index a93112d689..51b9e1bd59 100644 --- a/src/pipeline/tests/dissect.rs +++ b/src/pipeline/tests/dissect.rs @@ -16,7 +16,7 @@ mod common; use greptime_proto::v1::value::ValueData::StringValue; use greptime_proto::v1::{ColumnDataType, SemanticType}; -use pipeline::json_to_intermediate_state; +use pipeline::json_to_map; fn make_string_column_schema(name: String) -> greptime_proto::v1::ColumnSchema { common::make_column_schema(name, ColumnDataType::String, SemanticType::Field) @@ -274,7 +274,7 @@ transform: let yaml_content = pipeline::Content::Yaml(pipeline_yaml); let pipeline: pipeline::Pipeline = pipeline::parse(&yaml_content).expect("failed to parse pipeline"); - let mut result = json_to_intermediate_state(input_value).unwrap(); + let mut result = json_to_map(input_value).unwrap(); let row = pipeline.exec_mut(&mut result); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index 3f3a90c55f..cac4a8a1bd 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -20,7 +20,7 @@ use 
greptime_proto::v1::value::ValueData::{ U32Value, U64Value, U8Value, }; use greptime_proto::v1::Value as GreptimeValue; -use pipeline::{json_to_intermediate_state, parse, Content, GreptimeTransformer, Pipeline}; +use pipeline::{json_to_map, parse, Content, GreptimeTransformer, Pipeline}; #[test] fn test_complex_data() { @@ -420,7 +420,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); - let mut stats = json_to_intermediate_state(input_value).unwrap(); + let mut stats = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut stats) @@ -489,7 +489,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() @@ -597,7 +597,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() @@ -662,7 +662,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() @@ -701,7 +701,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) @@ -760,7 +760,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() @@ -800,7 +800,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value).unwrap(); + let mut status = json_to_map(input_value).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() @@ -862,7 +862,7 @@ transform: let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).unwrap(); - let mut status = json_to_intermediate_state(input_value1).unwrap(); + let mut status = json_to_map(input_value1).unwrap(); let dispatched_to = pipeline .exec_mut(&mut status) .unwrap() @@ -871,7 +871,7 @@ transform: assert_eq!(dispatched_to.table_suffix, "http"); assert_eq!(dispatched_to.pipeline.unwrap(), "access_log_pipeline"); - let mut status = json_to_intermediate_state(input_value2).unwrap(); + let mut status = json_to_map(input_value2).unwrap(); let row = pipeline .exec_mut(&mut status) .unwrap() diff --git a/src/servers/src/elasticsearch.rs b/src/servers/src/elasticsearch.rs index baa25ba776..60b9834b90 100644 --- a/src/servers/src/elasticsearch.rs +++ b/src/servers/src/elasticsearch.rs @@ -24,7 +24,7 @@ use common_error::ext::ErrorExt; use common_telemetry::{debug, error}; use headers::ContentType; use once_cell::sync::Lazy; -use pipeline::GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME; +use 
pipeline::{PipelineDefinition, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME}; use serde_json::{json, Deserializer, Value}; use session::context::{Channel, QueryContext}; use snafu::{ensure, ResultExt}; @@ -135,7 +135,7 @@ async fn do_handle_bulk_api( .start_timer(); // If pipeline_name is not provided, use the internal pipeline. - let pipeline = if let Some(pipeline) = params.pipeline_name { + let pipeline_name = if let Some(pipeline) = params.pipeline_name { pipeline } else { GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME.to_string() @@ -159,10 +159,26 @@ async fn do_handle_bulk_api( }; let log_num = requests.len(); + let pipeline = match PipelineDefinition::from_name(&pipeline_name, None, None) { + Ok(pipeline) => pipeline, + Err(e) => { + // should be unreachable + error!(e; "Failed to ingest logs"); + return ( + status_code_to_http_status(&e.status_code()), + elasticsearch_headers(), + axum::Json(write_bulk_response( + start.elapsed().as_millis() as i64, + 0, + e.status_code() as u32, + e.to_string().as_str(), + )), + ); + } + }; if let Err(e) = ingest_logs_inner( log_state.log_handler, pipeline, - None, requests, Arc::new(query_ctx), headers, diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 93bc5f8f73..a6398716a5 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -33,7 +33,7 @@ use datatypes::value::column_data_to_json; use headers::ContentType; use lazy_static::lazy_static; use pipeline::util::to_pipeline_version; -use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition, PipelineVersion}; +use pipeline::{GreptimePipelineParams, GreptimeTransformer, PipelineDefinition}; use serde::{Deserialize, Serialize}; use serde_json::{json, Deserializer, Map, Value}; use session::context::{Channel, QueryContext, QueryContextRef}; @@ -86,6 +86,15 @@ pub struct LogIngesterQueryParams { /// The JSON field name of the log message. If not provided, it will take the whole log as the message. /// The field must be at the top level of the JSON structure. pub msg_field: Option, + /// Specify a custom time index from the input data rather than server's arrival time. + /// Valid formats: + /// - ;epoch; + /// - ;datestr; + /// + /// If an error occurs while parsing the config, the error will be returned in the response. + /// If an error occurs while ingesting the data, the `ignore_errors` will be used to determine if the error should be ignored. + /// If so, use the current server's timestamp as the event time. + pub custom_time_index: Option, } /// LogIngestRequest is the internal request for log ingestion. The raw log input can be transformed into multiple LogIngestRequests. 
@@ -281,9 +290,9 @@ async fn dryrun_pipeline_inner( let results = run_pipeline( &pipeline_handler, - PipelineDefinition::Resolved(pipeline), + &PipelineDefinition::Resolved(pipeline), ¶ms, - pipeline::json_array_to_intermediate_state(value).context(PipelineSnafu)?, + pipeline::json_array_to_map(value).context(PipelineSnafu)?, "dry_run".to_owned(), query_ctx, true, @@ -527,17 +536,23 @@ pub async fn log_ingester( let handler = log_state.log_handler; - let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { - reason: "pipeline_name is required", - })?; let table_name = query_params.table.context(InvalidParameterSnafu { reason: "table is required", })?; - let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?; - let ignore_errors = query_params.ignore_errors.unwrap_or(false); + let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu { + reason: "pipeline_name is required", + })?; + let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?; + let pipeline = PipelineDefinition::from_name( + &pipeline_name, + version, + query_params.custom_time_index.map(|s| (s, ignore_errors)), + ) + .context(PipelineSnafu)?; + let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; query_ctx.set_channel(Channel::Http); @@ -550,8 +565,7 @@ pub async fn log_ingester( ingest_logs_inner( handler, - pipeline_name, - version, + pipeline, vec![LogIngestRequest { table: table_name, values: value, @@ -611,8 +625,7 @@ fn extract_pipeline_value_by_content_type( pub(crate) async fn ingest_logs_inner( state: PipelineHandlerRef, - pipeline_name: String, - version: PipelineVersion, + pipeline: PipelineDefinition, log_ingest_requests: Vec, query_ctx: QueryContextRef, headers: HeaderMap, @@ -631,9 +644,9 @@ pub(crate) async fn ingest_logs_inner( for request in log_ingest_requests { let requests = run_pipeline( &state, - PipelineDefinition::from_name(&pipeline_name, version), + &pipeline, &pipeline_params, - pipeline::json_array_to_intermediate_state(request.values).context(PipelineSnafu)?, + pipeline::json_array_to_map(request.values).context(PipelineSnafu)?, request.table, &query_ctx, true, diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 10dc8b5cc4..afaf289fcb 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -72,11 +72,11 @@ pub async fn to_grpc_insert_requests( } PipelineWay::Pipeline(pipeline_def) => { let data = parse_export_logs_service_request(request); - let array = pipeline::json_array_to_intermediate_state(data).context(PipelineSnafu)?; + let array = pipeline::json_array_to_map(data).context(PipelineSnafu)?; let inserts = run_pipeline( &pipeline_handler, - pipeline_def, + &pipeline_def, &pipeline_params, array, table_name, diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 0b51da94ea..a3151aeee4 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use api::v1::{RowInsertRequest, Rows}; use pipeline::{ - DispatchedTo, GreptimePipelineParams, GreptimeTransformer, Pipeline, PipelineDefinition, - PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + DispatchedTo, GreptimePipelineParams, GreptimeTransformer, IdentityTimeIndex, Pipeline, + PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; use session::context::QueryContextRef; use snafu::ResultExt; @@ -31,15 
+31,15 @@ use crate::query_handler::PipelineHandlerRef; /// Never call this on `GreptimeIdentityPipeline` because it's a real pipeline pub async fn get_pipeline( - pipeline_def: PipelineDefinition, + pipeline_def: &PipelineDefinition, handler: &PipelineHandlerRef, query_ctx: &QueryContextRef, ) -> Result>> { match pipeline_def { - PipelineDefinition::Resolved(pipeline) => Ok(pipeline), + PipelineDefinition::Resolved(pipeline) => Ok(pipeline.clone()), PipelineDefinition::ByNameAndValue((name, version)) => { handler - .get_pipeline(&name, version, query_ctx.clone()) + .get_pipeline(name, *version, query_ctx.clone()) .await } _ => { @@ -49,110 +49,151 @@ pub async fn get_pipeline( } pub(crate) async fn run_pipeline( - state: &PipelineHandlerRef, - pipeline_definition: PipelineDefinition, + handler: &PipelineHandlerRef, + pipeline_definition: &PipelineDefinition, pipeline_parameters: &GreptimePipelineParams, - array: Vec, + data_array: Vec, + table_name: String, + query_ctx: &QueryContextRef, + is_top_level: bool, +) -> Result> { + match pipeline_definition { + PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => { + run_identity_pipeline( + handler, + custom_ts.as_ref(), + pipeline_parameters, + data_array, + table_name, + query_ctx, + ) + .await + } + _ => { + run_custom_pipeline( + handler, + pipeline_definition, + pipeline_parameters, + data_array, + table_name, + query_ctx, + is_top_level, + ) + .await + } + } +} + +async fn run_identity_pipeline( + handler: &PipelineHandlerRef, + custom_ts: Option<&IdentityTimeIndex>, + pipeline_parameters: &GreptimePipelineParams, + data_array: Vec, + table_name: String, + query_ctx: &QueryContextRef, +) -> Result> { + let table = handler + .get_table(&table_name, query_ctx) + .await + .context(CatalogSnafu)?; + pipeline::identity_pipeline(data_array, table, pipeline_parameters, custom_ts) + .map(|rows| { + vec![RowInsertRequest { + rows: Some(rows), + table_name, + }] + }) + .context(PipelineSnafu) +} + +async fn run_custom_pipeline( + handler: &PipelineHandlerRef, + pipeline_definition: &PipelineDefinition, + pipeline_parameters: &GreptimePipelineParams, + data_array: Vec, table_name: String, query_ctx: &QueryContextRef, is_top_level: bool, ) -> Result> { let db = query_ctx.get_db_string(); + let pipeline = get_pipeline(pipeline_definition, handler, query_ctx).await?; - if matches!( - pipeline_definition, - PipelineDefinition::GreptimeIdentityPipeline - ) { - let table = state - .get_table(&table_name, query_ctx) - .await - .context(CatalogSnafu)?; - pipeline::identity_pipeline(array, table, pipeline_parameters) - .map(|rows| { - vec![RowInsertRequest { - rows: Some(rows), - table_name, - }] + let transform_timer = std::time::Instant::now(); + + let mut transformed = Vec::with_capacity(data_array.len()); + let mut dispatched: BTreeMap> = BTreeMap::new(); + + for mut values in data_array { + let r = pipeline + .exec_mut(&mut values) + .inspect_err(|_| { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); }) - .context(PipelineSnafu) - } else { - let pipeline = get_pipeline(pipeline_definition, state, query_ctx).await?; + .context(PipelineSnafu)?; - let transform_timer = std::time::Instant::now(); - - let mut transformed = Vec::with_capacity(array.len()); - let mut dispatched: BTreeMap> = BTreeMap::new(); - - for mut values in array { - let r = pipeline - .exec_mut(&mut values) - .inspect_err(|_| { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - 
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - }) - .context(PipelineSnafu)?; - - match r { - PipelineExecOutput::Transformed(row) => { - transformed.push(row); - } - PipelineExecOutput::DispatchedTo(dispatched_to) => { - if let Some(coll) = dispatched.get_mut(&dispatched_to) { - coll.push(values); - } else { - dispatched.insert(dispatched_to, vec![values]); - } + match r { + PipelineExecOutput::Transformed(row) => { + transformed.push(row); + } + PipelineExecOutput::DispatchedTo(dispatched_to) => { + if let Some(coll) = dispatched.get_mut(&dispatched_to) { + coll.push(values); + } else { + dispatched.insert(dispatched_to, vec![values]); } } } - - let mut results = Vec::new(); - // if current pipeline generates some transformed results, build it as - // `RowInsertRequest` and append to results. If the pipeline doesn't - // have dispatch, this will be only output of the pipeline. - if !transformed.is_empty() { - results.push(RowInsertRequest { - rows: Some(Rows { - rows: transformed, - schema: pipeline.schemas().clone(), - }), - table_name: table_name.clone(), - }) - } - - // if current pipeline contains dispatcher and has several rules, we may - // already accumulated several dispatched rules and rows. - for (dispatched_to, coll) in dispatched { - // we generate the new table name according to `table_part` and - // current custom table name. - let table_name = dispatched_to.dispatched_to_table_name(&table_name); - let next_pipeline_name = dispatched_to - .pipeline - .as_deref() - .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME); - - // run pipeline recursively. - let requests = Box::pin(run_pipeline( - state, - PipelineDefinition::from_name(next_pipeline_name, None), - pipeline_parameters, - coll, - table_name, - query_ctx, - false, - )) - .await?; - - results.extend(requests); - } - - if is_top_level { - METRIC_HTTP_LOGS_TRANSFORM_ELAPSED - .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) - .observe(transform_timer.elapsed().as_secs_f64()); - } - - Ok(results) } + + let mut results = Vec::new(); + // if current pipeline generates some transformed results, build it as + // `RowInsertRequest` and append to results. If the pipeline doesn't + // have dispatch, this will be only output of the pipeline. + if !transformed.is_empty() { + results.push(RowInsertRequest { + rows: Some(Rows { + rows: transformed, + schema: pipeline.schemas().clone(), + }), + table_name: table_name.clone(), + }) + } + + // if current pipeline contains dispatcher and has several rules, we may + // already accumulated several dispatched rules and rows. + for (dispatched_to, coll) in dispatched { + // we generate the new table name according to `table_part` and + // current custom table name. + let table_name = dispatched_to.dispatched_to_table_name(&table_name); + let next_pipeline_name = dispatched_to + .pipeline + .as_deref() + .unwrap_or(GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME); + + // run pipeline recursively. 
+ let next_pipeline_def = + PipelineDefinition::from_name(next_pipeline_name, None, None).context(PipelineSnafu)?; + let requests = Box::pin(run_pipeline( + handler, + &next_pipeline_def, + pipeline_parameters, + coll, + table_name, + query_ctx, + false, + )) + .await?; + + results.extend(requests); + } + + if is_top_level { + METRIC_HTTP_LOGS_TRANSFORM_ELAPSED + .with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE]) + .observe(transform_timer.elapsed().as_secs_f64()); + } + + Ok(results) } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 6de8a49adc..0d2c397e02 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -94,6 +94,7 @@ macro_rules! http_tests { test_plain_text_ingestion, test_identify_pipeline, test_identify_pipeline_with_flatten, + test_identify_pipeline_with_custom_ts, test_pipeline_dispatcher, test_otlp_metrics, @@ -1422,8 +1423,8 @@ pub async fn test_identify_pipeline(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); - let line1_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei",null]"#; - let line2_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null,null]"#; + let line1_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java",null,null]"#; + let line2_expected = r#"[null,"10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei"]"#; let res = client.get("/v1/sql?sql=select * from logs").send().await; assert_eq!(res.status(), StatusCode::OK); let resp: serde_json::Value = res.json().await; @@ -1431,10 +1432,11 @@ pub async fn test_identify_pipeline(store_type: StorageType) { assert_eq!(result.len(), 2); let mut line1 = result[0].as_array().unwrap().clone(); let mut line2 = result[1].as_array().unwrap().clone(); - assert!(line1.last().unwrap().is_i64()); - assert!(line2.last().unwrap().is_i64()); - *line1.last_mut().unwrap() = serde_json::Value::Null; - *line2.last_mut().unwrap() = serde_json::Value::Null; + assert!(line1.first().unwrap().is_i64()); + assert!(line2.first().unwrap().is_i64()); + // set time index to null for assertion + *line1.first_mut().unwrap() = serde_json::Value::Null; + *line2.first_mut().unwrap() = serde_json::Value::Null; assert_eq!( line1, @@ -1445,7 +1447,7 @@ pub async fn test_identify_pipeline(store_type: StorageType) { serde_json::from_str::>(line2_expected).unwrap() ); - let expected = 
r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#; + let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"]]"#; validate_data("identity_schema", &client, "desc logs", expected).await; guard.remove_all().await; @@ -1670,7 +1672,7 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); - let expected = r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#; + let expected = r#"[["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["custom_map.value_a","Json","","YES","","FIELD"],["custom_map.value_b","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"]]"#; validate_data( "test_identify_pipeline_with_flatten_desc_logs", &client, @@ -1691,6 +1693,92 @@ pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_identify_pipeline_with_custom_ts(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_identify_pipeline_with_custom_ts") + .await; + + let client = TestClient::new(app).await; + let body = r#" + [{"__time__":1453809242,"__source__":"10.170.***.***", "__name__":"hello"}, + {"__time__":1453809252,"__source__":"10.170.***.***"}] + "#; + + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/json"), + )], + "/v1/ingest?table=logs&pipeline_name=greptime_identity&custom_time_index=__time__;epoch;s", + body.as_bytes().to_vec(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = r#"[["__time__","TimestampSecond","PRI","NO","","TIMESTAMP"],["__name__","String","","YES","","FIELD"],["__source__","String","","YES","","FIELD"]]"#; + validate_data( + "test_identify_pipeline_with_custom_ts_desc_logs", + &client, + "desc logs", + expected, + ) + .await; + + 
let expected = r#"[[1453809242,"hello","10.170.***.***"],[1453809252,null,"10.170.***.***"]]"#; + validate_data( + "test_identify_pipeline_with_custom_ts_data", + &client, + "select * from logs", + expected, + ) + .await; + + // drop table + let res = client.get("/v1/sql?sql=drop table logs").send().await; + assert_eq!(res.status(), StatusCode::OK); + + let body = r#" + [{"__time__":"2019-01-16 02:42:01+08:00","__source__":"10.170.***.***"}, + {"__time__":"2019-01-16 02:42:04+08:00","__source__":"10.170.***.***", "__name__":"hello"}] + "#; + + let res = send_req( + &client, + vec![( + HeaderName::from_static("content-type"), + HeaderValue::from_static("application/json"), + )], + "/v1/ingest?table=logs&pipeline_name=greptime_identity&custom_time_index=__time__;datestr;%Y-%m-%d %H:%M:%S%z", + body.as_bytes().to_vec(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = r#"[["__time__","TimestampNanosecond","PRI","NO","","TIMESTAMP"],["__source__","String","","YES","","FIELD"],["__name__","String","","YES","","FIELD"]]"#; + validate_data( + "test_identify_pipeline_with_custom_ts_desc_logs", + &client, + "desc logs", + expected, + ) + .await; + + let expected = r#"[[1547577721000000000,"10.170.***.***",null],[1547577724000000000,"10.170.***.***","hello"]]"#; + validate_data( + "test_identify_pipeline_with_custom_ts_data", + &client, + "select * from logs", + expected, + ) + .await; + + guard.remove_all().await; +} + pub async fn test_test_pipeline_api(store_type: StorageType) { common_telemetry::init_default_ut_logging(); let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;