diff --git a/Cargo.lock b/Cargo.lock
index b4a520fe9c..596eed965f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13076,6 +13076,7 @@ dependencies = [
  "prost 0.13.5",
  "query",
  "rand 0.9.1",
+ "regex",
  "rstest",
  "rstest_reuse",
  "sea-query",
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index f53f3f162b..3ac9cd39a5 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -894,7 +894,7 @@ pub fn is_column_type_value_eq(
         .unwrap_or(false)
 }
 
-fn encode_json_value(value: JsonValue) -> v1::JsonValue {
+pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
     fn helper(json: JsonVariant) -> v1::JsonValue {
         let value = match json {
             JsonVariant::Null => None,
diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs
index 88ee0c5749..f7ec3d525f 100644
--- a/src/api/src/v1/column_def.rs
+++ b/src/api/src/v1/column_def.rs
@@ -17,8 +17,8 @@ use std::collections::HashMap;
 use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
 use datatypes::schema::{
     COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
-    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
-    SkippingIndexType,
+    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
+    SkippingIndexOptions, SkippingIndexType,
 };
 use greptime_proto::v1::{
     Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
     })
 }
 
+/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
+pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
+    let mut metadata = Metadata::default();
+    let Some(ColumnOptions { options }) = column_options else {
+        return metadata;
+    };
+
+    if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
+        metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) {
+        metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) {
+        metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) {
+        metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) {
+        metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone());
+    }
+    metadata
+}
+
 /// Constructs a `ColumnOptions` from the given `ColumnSchema`.
 pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
     let mut options = ColumnOptions::default();
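The new `collect_column_options` is the inverse of the existing `options_from_column_schema`: gRPC option keys are mapped back onto schema metadata keys. A minimal sketch of the intended round trip (illustrative only; crate paths as referenced in the diff, the `roundtrip` helper is hypothetical):

```rust
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use datatypes::schema::ColumnSchema;

// Hypothetical helper: schema metadata -> wire options -> schema metadata.
fn roundtrip(column_schema: &ColumnSchema) {
    // Schema-side metadata becomes gRPC `ColumnOptions` (None if empty).
    let options = options_from_column_schema(column_schema);
    // Recognized keys (fulltext, inverted/skipping index, extension type)
    // are copied back into `Metadata`; unknown keys are dropped.
    let metadata = collect_column_options(options.as_ref());
    // `metadata` can then seed another ColumnSchema via `with_metadata`.
    let _ = metadata;
}
```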
diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs
index 4c838b78d1..388ec6d3bf 100644
--- a/src/datatypes/src/types/json_type.rs
+++ b/src/datatypes/src/types/json_type.rs
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
-use std::fmt::{Display, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 use std::str::FromStr;
 use std::sync::Arc;
 
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
 impl Display for JsonNativeType {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            JsonNativeType::Null => write!(f, "Null"),
-            JsonNativeType::Bool => write!(f, "Bool"),
-            JsonNativeType::Number(t) => {
-                write!(f, "Number({t:?})")
-            }
-            JsonNativeType::String => write!(f, "String"),
-            JsonNativeType::Array(item_type) => {
-                write!(f, "Array[{}]", item_type)
-            }
-            JsonNativeType::Object(object) => {
-                write!(
-                    f,
-                    "Object{{{}}}",
+        fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
+            match t {
+                JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
+                JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
+                JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
+                JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
+                JsonNativeType::Array(item_type) => {
+                    serde_json::Value::Array(vec![to_serde_value(item_type)])
+                }
+                JsonNativeType::Object(object) => serde_json::Value::Object(
                     object
                         .iter()
-                        .map(|(k, v)| format!(r#""{k}": {v}"#))
-                        .collect::<Vec<_>>()
-                        .join(", ")
-                )
+                        .map(|(k, v)| (k.clone(), to_serde_value(v)))
+                        .collect(),
+                ),
             }
         }
+        write!(f, "{}", to_serde_value(self))
     }
 }
 
@@ -183,7 +179,11 @@ impl JsonType {
         }
     }
 
-    pub(crate) fn native_type(&self) -> &JsonNativeType {
+    pub fn is_native_type(&self) -> bool {
+        matches!(self.format, JsonFormat::Native(_))
+    }
+
+    pub fn native_type(&self) -> &JsonNativeType {
         match &self.format {
             JsonFormat::Jsonb => &JsonNativeType::String,
             JsonFormat::Native(x) => x.as_ref(),
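With `is_native_type` and the now-public `native_type`, callers (e.g. the pipeline transformer below) can branch on the JSON storage format. A small usage sketch (module path assumed from the file location; `describe` is hypothetical):

```rust
use datatypes::types::json_type::JsonType;

// Hypothetical: summarize a JSON column's storage format.
fn describe(t: &JsonType) -> String {
    if t.is_native_type() {
        // Native columns carry an inferred structure; Display renders it
        // as a JSON skeleton (see the reworked `Display` impl above).
        format!("native: {}", t.native_type())
    } else {
        // Jsonb columns are reported as a plain string type.
        "jsonb".to_string()
    }
}
```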
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index 201d5d99f4..e229fa37eb 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
+use api::helper::ColumnDataTypeWrapper;
 use api::v1::alter_table_expr::Kind;
 use api::v1::column_def::options_from_skipping;
 use api::v1::region::{
@@ -23,7 +24,7 @@ use api::v1::region::{
 };
 use api::v1::{
     AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
-    RowInsertRequest, RowInsertRequests, SemanticType,
+    ModifyColumnType, ModifyColumnTypes, RowInsertRequest, RowInsertRequests, SemanticType,
 };
 use catalog::CatalogManagerRef;
 use client::{OutputData, OutputMeta};
@@ -40,6 +41,7 @@ use common_query::Output;
 use common_query::prelude::{greptime_timestamp, greptime_value};
 use common_telemetry::tracing_context::TracingContext;
 use common_telemetry::{error, info, warn};
+use datatypes::data_type::ConcreteDataType;
 use datatypes::schema::SkippingIndexOptions;
 use futures_util::future;
 use meter_macros::write_meter;
@@ -67,8 +69,9 @@ use table::requests::{
 use table::table_reference::TableReference;
 
 use crate::error::{
-    CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
-    InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
+    CatalogSnafu, ColumnDataTypeSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu,
+    FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result,
+    TableNotFoundSnafu,
 };
 use crate::expr_helper;
 use crate::region_req_factory::RegionRequestFactory;
@@ -475,6 +478,7 @@ impl Inserter {
     /// Creates or alter tables on demand:
     /// - if table does not exist, create table by inferred CreateExpr
     /// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
+    ///   or align the json columns' datatypes with insert values.
     ///
     /// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
     /// This mapping is used in the conversion of RowToRegion.
@@ -559,6 +563,10 @@
                 )? {
                     alter_tables.push(alter_expr);
                 }
+
+                if let Some(expr) = maybe_alter_json_column_type(ctx, &table, req)? {
+                    alter_tables.push(expr);
+                }
             }
             None => {
                 let create_expr =
@@ -981,6 +989,86 @@ impl Inserter {
     }
 }
 
+fn maybe_alter_json_column_type(
+    query_context: &QueryContextRef,
+    table: &TableRef,
+    request: &RowInsertRequest,
+) -> Result<Option<AlterTableExpr>> {
+    let Some(rows) = request.rows.as_ref() else {
+        return Ok(None);
+    };
+
+    // Fast path: skip altering json column type if insert request doesn't contain any json values.
+    let row_schema = &rows.schema;
+    if row_schema
+        .iter()
+        .all(|x| x.datatype() != ColumnDataType::Json)
+    {
+        return Ok(None);
+    }
+    let table_schema = table.schema_ref();
+    let mut modify_column_types = vec![];
+
+    for value_schema in row_schema {
+        if let Some(column_schema) = table_schema.column_schema_by_name(&value_schema.column_name)
+            && let Some(column_type) = column_schema.data_type.as_json()
+        {
+            let value_type: ConcreteDataType = ColumnDataTypeWrapper::new(
+                value_schema.datatype(),
+                value_schema.datatype_extension.clone(),
+            )
+            .into();
+            let Some(value_type) = value_type.as_json() else {
+                return InvalidInsertRequestSnafu {
+                    reason: format!(
+                        "expecting json value for json column '{}', but found type {}",
+                        column_schema.name, value_type
+                    ),
+                }
+                .fail();
+            };
+
+            if column_type.is_include(value_type) {
+                continue;
+            }
+
+            let merged = {
+                let mut column_type = column_type.clone();
+                column_type.merge(value_type).map_err(|e| {
+                    InvalidInsertRequestSnafu {
+                        reason: format!("insert json value is conflicting with column type: {e}"),
+                    }
+                    .build()
+                })?;
+                column_type
+            };
+
+            let (target_type, target_type_extension) =
+                ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(merged))
+                    .map(|x| x.into_parts())
+                    .context(ColumnDataTypeSnafu)?;
+            modify_column_types.push(ModifyColumnType {
+                column_name: column_schema.name.clone(),
+                target_type: target_type as i32,
+                target_type_extension,
+            });
+        }
+    }
+
+    if modify_column_types.is_empty() {
+        Ok(None)
+    } else {
+        Ok(Some(AlterTableExpr {
+            catalog_name: query_context.current_catalog().to_string(),
+            schema_name: query_context.current_schema(),
+            table_name: table.table_info().name.clone(),
+            kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
+                modify_column_types,
+            })),
+        }))
+    }
+}
+
 fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
     for request in &requests.inserts {
         let rows = request.rows.as_ref().unwrap();
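The widening rule above (`is_include`, then `merge`, then an `AlterTableExpr` carrying `ModifyColumnTypes`) can be pictured with a toy structural type. This is a self-contained model, not GreptimeDB's actual `JsonNativeType::merge`:

```rust
use std::collections::BTreeMap;

// Toy structural JSON type; stands in for `JsonNativeType`.
#[derive(Clone, Debug, PartialEq)]
enum Ty {
    Null,
    Number,
    String,
    Object(BTreeMap<String, Ty>),
}

// Widen `column` until it includes `value`; scalar conflicts are errors,
// mirroring the `is_include`/`merge` pair used above.
fn merge(column: &mut Ty, value: &Ty) -> Result<(), String> {
    match (column, value) {
        (_, Ty::Null) => Ok(()), // null fits any column type
        (c @ Ty::Null, v) => {
            *c = v.clone(); // widen a null column
            Ok(())
        }
        (Ty::Object(c), Ty::Object(v)) => {
            for (k, vt) in v {
                match c.get_mut(k) {
                    Some(ct) => merge(ct, vt)?, // recurse on shared keys
                    None => {
                        c.insert(k.clone(), vt.clone()); // new key widens
                    }
                }
            }
            Ok(())
        }
        (c, v) if *c == *v => Ok(()),
        (c, v) => Err(format!("conflict: {c:?} vs {v:?}")),
    }
}

fn main() {
    let mut column = Ty::Object(BTreeMap::from([("a".into(), Ty::Number)]));
    let value = Ty::Object(BTreeMap::from([("b".into(), Ty::String)]));
    merge(&mut column, &value).unwrap();
    // `column` is now Object{"a": Number, "b": String}: the widened type
    // that would be sent via ModifyColumnTypes above.
}
```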
diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 73ffb711a1..e9f656e545 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -776,6 +776,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(transparent)]
+    Datatypes {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -893,6 +907,9 @@ impl ErrorExt for Error {
             FloatIsNan { .. }
             | InvalidEpochForResolution { .. }
             | UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
+
+            GreptimeProto { source, .. } => source.status_code(),
+            Datatypes { source, .. } => source.status_code(),
         }
     }
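Both new variants rely on `#[snafu(transparent)]`, which forwards `Display`/`source()` to the wrapped error and derives a `From` impl, so call sites convert with a plain `?`. A minimal standalone sketch (assuming snafu 0.8+, with a stand-in source error):

```rust
use snafu::{Location, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    // As above: `transparent` forwards Display and source() to the inner
    // error, and snafu derives `From`, so `?` converts without a context
    // selector.
    #[snafu(transparent)]
    Parse {
        source: std::num::ParseIntError,
        #[snafu(implicit)]
        location: Location,
    },
}

fn parse(s: &str) -> Result<i32, Error> {
    Ok(s.parse::<i32>()?) // auto-converted into Error::Parse
}
```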
diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs
index 6774842ef1..5b66b1faa8 100644
--- a/src/pipeline/src/etl/transform/transformer/greptime.rs
+++ b/src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -19,13 +19,16 @@ use std::collections::{BTreeMap, HashSet};
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt};
-use api::helper::proto_value_type;
-use api::v1::column_data_type_extension::TypeExt;
+use api::helper::{ColumnDataTypeWrapper, encode_json_value};
+use api::v1::column_def::{collect_column_options, options_from_column_schema};
 use api::v1::value::ValueData;
-use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
+use api::v1::{ColumnDataType, SemanticType};
 use coerce::{coerce_columns, coerce_value};
 use common_query::prelude::{greptime_timestamp, greptime_value};
 use common_telemetry::warn;
+use datatypes::data_type::ConcreteDataType;
+use datatypes::json::JsonStructureSettings;
+use datatypes::value::Value;
 use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
 use itertools::Itertools;
 use jsonb::Number;
@@ -33,12 +36,14 @@ use once_cell::sync::OnceCell;
 use serde_json as serde_json_crate;
 use session::context::Channel;
 use snafu::OptionExt;
+use table::Table;
 use vrl::prelude::{Bytes, VrlValueConvert};
+use vrl::value::value::StdError;
 use vrl::value::{KeyString, Value as VrlValue};
 
 use crate::error::{
-    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
-    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
+    CoerceIncompatibleTypesSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
+    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
     TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
 };
 use crate::etl::PipelineDocVersion;
@@ -269,15 +274,75 @@ impl GreptimeTransformer {
     }
 }
 
+#[derive(Clone)]
+pub struct ColumnMetadata {
+    column_schema: datatypes::schema::ColumnSchema,
+    semantic_type: SemanticType,
+}
+
+impl From<ColumnSchema> for ColumnMetadata {
+    fn from(value: ColumnSchema) -> Self {
+        let datatype = value.datatype();
+        let semantic_type = value.semantic_type();
+        let ColumnSchema {
+            column_name,
+            datatype: _,
+            semantic_type: _,
+            datatype_extension,
+            options,
+        } = value;
+
+        let column_schema = datatypes::schema::ColumnSchema::new(
+            column_name,
+            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
+            semantic_type != SemanticType::Timestamp,
+        );
+
+        let metadata = collect_column_options(options.as_ref());
+        let column_schema = column_schema.with_metadata(metadata);
+
+        Self {
+            column_schema,
+            semantic_type,
+        }
+    }
+}
+
+impl TryFrom<ColumnMetadata> for ColumnSchema {
+    type Error = api::error::Error;
+
+    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
+        let ColumnMetadata {
+            column_schema,
+            semantic_type,
+        } = value;
+
+        let options = options_from_column_schema(&column_schema);
+
+        let (datatype, datatype_extension) =
+            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
+
+        Ok(ColumnSchema {
+            column_name: column_schema.name,
+            datatype: datatype as _,
+            semantic_type: semantic_type as _,
+            datatype_extension,
+            options,
+        })
+    }
+}
+
 /// This is used to record the current state schema information and a sequential cache of field names.
 /// As you traverse the user input JSON, this will change.
 /// It will record a superset of all user input schemas.
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct SchemaInfo {
     /// schema info
-    pub schema: Vec<ColumnSchema>,
+    pub schema: Vec<ColumnMetadata>,
     /// index of the column name
     pub index: HashMap<String, usize>,
+    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
+    table: Option<Arc<Table>>,
 }
 
 impl SchemaInfo {
@@ -285,6 +350,7 @@
         Self {
             schema: Vec::with_capacity(capacity),
             index: HashMap::with_capacity(capacity),
+            table: None,
         }
     }
 
@@ -294,46 +360,91 @@
             index.insert(schema.column_name.clone(), i);
         }
         Self {
-            schema: schema_list,
+            schema: schema_list.into_iter().map(Into::into).collect(),
             index,
+            table: None,
         }
     }
+
+    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
+        self.table = table;
+    }
+
+    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
+        if let Some(table) = &self.table
+            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
+        {
+            let column_schema = table.schema_ref().column_schemas()[i].clone();
+
+            let semantic_type = if column_schema.is_time_index() {
+                SemanticType::Timestamp
+            } else if table.table_info().meta.primary_key_indices.contains(&i) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            };
+
+            Some(ColumnMetadata {
+                column_schema,
+                semantic_type,
+            })
+        } else {
+            None
+        }
+    }
+
+    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
+        self.schema
+            .iter()
+            .map(|x| x.clone().try_into())
+            .collect::<api::error::Result<Vec<_>>>()
+    }
 }
 
 fn resolve_schema(
     index: Option<usize>,
-    value_data: ValueData,
-    column_schema: ColumnSchema,
-    row: &mut Vec<GreptimeValue>,
+    pipeline_context: &PipelineContext,
+    column: &str,
+    value_type: &ConcreteDataType,
     schema_info: &mut SchemaInfo,
 ) -> Result<()> {
     if let Some(index) = index {
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        // Safety unwrap is fine here because api_value is always valid
-        let value_column_data_type = proto_value_type(&api_value).unwrap();
-        // Safety unwrap is fine here because index is always valid
-        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
-        if value_column_data_type != schema_column_data_type {
-            IdentifyPipelineColumnTypeMismatchSnafu {
-                column: column_schema.column_name,
-                expected: schema_column_data_type.as_str_name(),
-                actual: value_column_data_type.as_str_name(),
+        let column_type = &mut schema_info.schema[index].column_schema.data_type;
+        if value_type != column_type {
+            if let ConcreteDataType::Json(value_type) = value_type
+                && let ConcreteDataType::Json(column_type) = column_type
+            {
+                if !column_type.is_include(value_type) {
+                    column_type.merge(value_type)?;
+                }
+            } else {
+                return IdentifyPipelineColumnTypeMismatchSnafu {
+                    column,
+                    expected: column_type.to_string(),
+                    actual: value_type.to_string(),
+                }
+                .fail();
             }
-            .fail()
-        } else {
-            row[index] = api_value;
-            Ok(())
         }
+        Ok(())
     } else {
-        let key = column_schema.column_name.clone();
+        let column_schema = schema_info
+            .find_column_schema_in_table(column)
+            .unwrap_or_else(|| {
+                let semantic_type = decide_semantic(pipeline_context, column);
+                let column_schema = datatypes::schema::ColumnSchema::new(
+                    column,
+                    value_type.clone(),
+                    semantic_type != SemanticType::Timestamp,
+                );
+                ColumnMetadata {
+                    column_schema,
+                    semantic_type,
+                }
+            });
+        let key = column.to_string();
         schema_info.schema.push(column_schema);
         schema_info.index.insert(key, schema_info.schema.len() - 1);
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        row.push(api_value);
         Ok(())
     }
 }
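The table attached via `set_table` is what lets `resolve_schema` reuse stored column types (and tag/timestamp semantics) instead of re-inferring them. A hypothetical wiring helper, mirroring the call added in src/servers/src/pipeline.rs further below (the `pipeline::SchemaInfo` re-export path is assumed):

```rust
use std::sync::Arc;

use pipeline::SchemaInfo; // re-export path assumed
use table::Table;

// Hypothetical: attach the target table (if it already exists) before
// executing the pipeline, so column lookups hit the stored schema first.
fn prepare_schema_info(table: Option<Arc<Table>>) -> SchemaInfo {
    let mut schema_info = SchemaInfo::default();
    schema_info.set_table(table);
    schema_info
}
```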
@@ -411,11 +522,11 @@ pub(crate) fn values_to_row(
     Ok(Row { values: row })
 }
 
-fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
+fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
     if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
-        SemanticType::Tag as i32
+        SemanticType::Tag
     } else {
-        SemanticType::Field as i32
+        SemanticType::Field
     }
 }
 
@@ -427,55 +538,54 @@ fn resolve_value(
     p_ctx: &PipelineContext,
 ) -> Result<()> {
     let index = schema_info.index.get(&column_name).copied();
-    let mut resolve_simple_type =
-        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
-            let semantic_type = decide_semantic(p_ctx, &column_name);
-            resolve_schema(
-                index,
-                value_data,
-                ColumnSchema {
-                    column_name,
-                    datatype: data_type as i32,
-                    semantic_type,
-                    datatype_extension: None,
-                    options: None,
-                },
-                row,
-                schema_info,
-            )
-        };
-
-    match value {
-        VrlValue::Null => {}
+    let value_data = match value {
+        VrlValue::Null => None,
         VrlValue::Integer(v) => {
             // safe unwrap after type matched
-            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::int64_datatype(),
+                schema_info,
+            )?;
+            Some(ValueData::I64Value(v))
         }
         VrlValue::Float(v) => {
             // safe unwrap after type matched
-            resolve_simple_type(
-                ValueData::F64Value(v.into()),
-                column_name,
-                ColumnDataType::Float64,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::float64_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::F64Value(v.into()))
         }
         VrlValue::Boolean(v) => {
-            resolve_simple_type(
-                ValueData::BoolValue(v),
-                column_name,
-                ColumnDataType::Boolean,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::boolean_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::BoolValue(v))
         }
         VrlValue::Bytes(v) => {
-            resolve_simple_type(
-                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::binary_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::BinaryValue(v.into()))
         }
         VrlValue::Regex(v) => {
             warn!(
                 "Persisting regex value in the table, this should not happen, column_name: {}",
                 column_name
             );
-            resolve_simple_type(
-                ValueData::StringValue(v.to_string()),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::string_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::StringValue(v.to_string()))
         }
         VrlValue::Timestamp(ts) => {
             let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                 input: ts.to_rfc3339(),
             })?;
-            resolve_simple_type(
-                ValueData::TimestampNanosecondValue(ns),
-                column_name,
-                ColumnDataType::TimestampNanosecond,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::timestamp_nanosecond_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::TimestampNanosecondValue(ns))
         }
         VrlValue::Array(_) | VrlValue::Object(_) => {
-            let data = vrl_value_to_jsonb_value(&value);
-            resolve_schema(
-                index,
-                ValueData::BinaryValue(data.to_vec()),
-                ColumnSchema {
-                    column_name,
-                    datatype: ColumnDataType::Binary as i32,
-                    semantic_type: SemanticType::Field as i32,
-                    datatype_extension: Some(ColumnDataTypeExtension {
-                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
-                    }),
-                    options: None,
-                },
-                row,
-                schema_info,
-            )?;
+            let is_json_native_type = schema_info
+                .find_column_schema_in_table(&column_name)
+                .is_some_and(|x| {
+                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
+                        column_type.is_native_type()
+                    } else {
+                        false
+                    }
+                });
+
+            let value = if is_json_native_type {
+                let settings = JsonStructureSettings::Structured(None);
+                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
+                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
+                })?;
+                let value = settings.encode(value)?;
+
+                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
+
+                let Value::Json(value) = value else {
+                    unreachable!()
+                };
+                ValueData::JsonValue(encode_json_value(*value))
+            } else {
+                resolve_schema(
+                    index,
+                    p_ctx,
+                    &column_name,
+                    &ConcreteDataType::json_datatype(),
+                    schema_info,
+                )?;
+
+                let value = vrl_value_to_jsonb_value(&value);
+                ValueData::BinaryValue(value.to_vec())
+            };
+            Some(value)
         }
+    };
+
+    let value = GreptimeValue { value_data };
+    if let Some(index) = index {
+        row[index] = value;
+    } else {
+        row.push(value);
     }
     Ok(())
 }
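The `Array`/`Object` arm now has two encodings, selected by the target column's format. A toy model of that dispatch (not the pipeline's real types; serde_json bytes stand in for jsonb):

```rust
// Toy model of the branch above.
enum Encoded {
    Native(serde_json::Value), // column has a structural JSON type
    Jsonb(Vec<u8>),            // legacy binary fallback
}

fn encode(column_is_native_json: bool, value: serde_json::Value) -> Encoded {
    if column_is_native_json {
        // Real code: JsonStructureSettings::Structured(None).encode(..),
        // then ValueData::JsonValue(encode_json_value(..)).
        Encoded::Native(value)
    } else {
        // Real code: vrl_value_to_jsonb_value(..) -> ValueData::BinaryValue.
        Encoded::Jsonb(serde_json::to_vec(&value).expect("serializable"))
    }
}
```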
@@ -556,20 +699,24 @@ fn identity_pipeline_inner(
     let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
 
     // set time index column schema first
-    schema_info.schema.push(ColumnSchema {
-        column_name: custom_ts
+    let column_schema = datatypes::schema::ColumnSchema::new(
+        custom_ts
             .map(|ts| ts.get_column_name().to_string())
             .unwrap_or_else(|| greptime_timestamp().to_string()),
-        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
-            if pipeline_ctx.channel == Channel::Prometheus {
-                ColumnDataType::TimestampMillisecond
-            } else {
-                ColumnDataType::TimestampNanosecond
-            }
-        }) as i32,
-        semantic_type: SemanticType::Timestamp as i32,
-        datatype_extension: None,
-        options: None,
+        custom_ts
+            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
+            .unwrap_or_else(|| {
+                if pipeline_ctx.channel == Channel::Prometheus {
+                    ConcreteDataType::timestamp_millisecond_datatype()
+                } else {
+                    ConcreteDataType::timestamp_nanosecond_datatype()
+                }
+            }),
+        false,
+    );
+    schema_info.schema.push(ColumnMetadata {
+        column_schema,
+        semantic_type: SemanticType::Timestamp,
     });
 
     let mut opt_map = HashMap::new();
@@ -627,28 +774,29 @@ pub fn identity_pipeline(
         input.push(result);
     }
 
-    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
+    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
         if let Some(table) = table {
             let table_info = table.table_info();
             for tag_name in table_info.meta.row_key_column_names() {
                 if let Some(index) = schema.index.get(tag_name) {
-                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                    schema.schema[*index].semantic_type = SemanticType::Tag;
                 }
             }
         }
 
-        opt_map
+        let column_schemas = schema.column_schemas()?;
+        Ok(opt_map
             .into_iter()
             .map(|(opt, rows)| {
                 (
                     opt,
                     Rows {
-                        schema: schema.schema.clone(),
+                        schema: column_schemas.clone(),
                         rows,
                     },
                 )
             })
-            .collect::<HashMap<ContextOpt, Rows>>()
+            .collect::<HashMap<ContextOpt, Rows>>())
     })
 }
@@ -872,7 +1020,7 @@ mod tests {
             .map(|(mut schema, mut rows)| {
                 for name in tag_column_names {
                     if let Some(index) = schema.index.get(&name) {
-                        schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                        schema.schema[*index].semantic_type = SemanticType::Tag;
                     }
                 }
 
@@ -880,7 +1028,7 @@
                 let rows = rows.remove(&ContextOpt::default()).unwrap();
 
                 Rows {
-                    schema: schema.schema,
+                    schema: schema.column_schemas().unwrap(),
                     rows,
                 }
             });
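Since `SchemaInfo.schema` now holds `ColumnMetadata`, every producer of wire `Rows` goes through the fallible `column_schemas()` conversion; that is why `identity_pipeline` switched from `map` to `and_then`. A hypothetical caller-side view (import paths assumed):

```rust
use greptime_proto::v1::{Row, Rows};
use pipeline::SchemaInfo; // re-export path assumed

// Hypothetical: build wire `Rows`, surfacing schema-conversion failures.
fn to_rows(schema_info: &SchemaInfo, rows: Vec<Row>) -> api::error::Result<Rows> {
    Ok(Rows {
        schema: schema_info.column_schemas()?, // ColumnMetadata -> ColumnSchema
        rows,
    })
}
```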
diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs
index 09ea340235..0fba1ba2e3 100644
--- a/src/pipeline/tests/common.rs
+++ b/src/pipeline/tests/common.rs
@@ -57,7 +57,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
     }
 
     Rows {
-        schema: schema_info.schema.clone(),
+        schema: schema_info.column_schemas().unwrap(),
         rows,
     }
 }
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 2e39f80c85..ba05e5cd21 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -651,6 +651,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -777,6 +784,8 @@ impl ErrorExt for Error {
             HandleOtelArrowRequest { .. } => StatusCode::Internal,
 
             Cancelled { .. } => StatusCode::Cancelled,
+
+            GreptimeProto { source, .. } => source.status_code(),
         }
     }
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 2390e374a1..24bb844dc7 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
 use common_catalog::consts::default_engine;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_query::{Output, OutputData};
-use common_telemetry::{debug, error, warn};
+use common_telemetry::{error, warn};
 use headers::ContentType;
 use lazy_static::lazy_static;
 use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
 
     let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
 
-    debug!(
-        "receiving logs: {:?}",
-        serde_json::to_string(&value).unwrap()
-    );
-
     query_ctx.set_channel(Channel::Log);
     let query_ctx = Arc::new(query_ctx);
diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs
index e6f1b064a3..f86959f325 100644
--- a/src/servers/src/http/loki.rs
+++ b/src/servers/src/http/loki.rs
@@ -152,7 +152,7 @@ pub async fn loki_ingest(
         rows.push(row);
     }
 
-    let schemas = schema_info.schema;
+    let schemas = schema_info.column_schemas()?;
     // fill Null for missing values
     for row in rows.iter_mut() {
         row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
     } else {
         // not exist
         // add schema and append to values
-        schemas.push(ColumnSchema {
-            column_name: k.clone(),
-            datatype: ColumnDataType::String.into(),
-            semantic_type: SemanticType::Tag.into(),
-            datatype_extension: None,
-            options: None,
-        });
+        schemas.push(
+            ColumnSchema {
+                column_name: k.clone(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Tag.into(),
+                datatype_extension: None,
+                options: None,
+            }
+            .into(),
+        );
         column_indexer.insert(k, schemas.len() - 1);
 
         row.push(GreptimeValue {
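Call sites like `process_labels` keep constructing wire `ColumnSchema` values and rely on the `From<ColumnSchema> for ColumnMetadata` impl added above. A reduced sketch (assuming `ColumnMetadata` is importable from the pipeline crate):

```rust
use api::v1::{ColumnDataType, SemanticType};
use greptime_proto::v1::ColumnSchema;
use pipeline::ColumnMetadata; // re-export path assumed

// Hypothetical: a Loki label column, converted on push.
fn label_column(name: &str) -> ColumnMetadata {
    ColumnSchema {
        column_name: name.to_string(),
        datatype: ColumnDataType::String.into(),
        semantic_type: SemanticType::Tag.into(),
        datatype_extension: None,
        options: None,
    }
    .into()
}
```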
diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs
index c60990077b..41a715ddf7 100644
--- a/src/servers/src/otlp/logs.rs
+++ b/src/servers/src/otlp/logs.rs
@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
 
     if let Some(index) = select_schema.index.get(key) {
         let column_schema = &select_schema.schema[*index];
+        let column_schema: ColumnSchema = column_schema.clone().try_into()?;
         // datatype of the same column name should be the same
         ensure!(
             column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@
         );
         extracted_values[*index] = value;
     } else {
-        select_schema.schema.push(schema);
+        select_schema.schema.push(schema.into());
         select_schema
             .index
             .insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
     let mut parse_ctx = ParseContext::new(select_info);
     let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
-    schemas.extend(parse_ctx.select_schema.schema);
+    schemas.extend(parse_ctx.select_schema.column_schemas()?);
 
     rows.iter_mut().for_each(|row| {
         row.values.resize(schemas.len(), GreptimeValue::default());
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 5a6710f420..67a6940bb1 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -136,12 +136,18 @@ async fn run_custom_pipeline(
             let mut schema_info = SchemaInfo::default();
             schema_info
                 .schema
-                .push(time_index_column_schema(ts_name, timeunit));
+                .push(time_index_column_schema(ts_name, timeunit).into());
             schema_info
         }
     };
 
+    let table = handler
+        .get_table(&table_name, query_ctx)
+        .await
+        .context(CatalogSnafu)?;
+    schema_info.set_table(table);
+
     for pipeline_map in pipeline_maps {
         let result = pipeline
             .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -186,7 +192,7 @@
             RowInsertRequest {
                 rows: Some(Rows {
                     rows,
-                    schema: schema_info.schema.clone(),
+                    schema: schema_info.column_schemas()?,
                 }),
                 table_name,
             },
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 5721f12af9..0b92028a67 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -105,6 +105,7 @@ paste.workspace = true
 pipeline.workspace = true
 prost.workspace = true
 rand.workspace = true
+regex.workspace = true
 session = { workspace = true, features = ["testing"] }
 store-api.workspace = true
 tokio-postgres = { workspace = true }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index f5c1542ef8..147c815e6b 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -42,6 +42,7 @@
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
 use prost::Message;
+use regex::Regex;
 use serde_json::{Value, json};
 use servers::http::GreptimeQueryOutput;
 use servers::http::handler::HealthResponse;
@@ -126,6 +127,7 @@
         test_pipeline_skip_error,
         test_pipeline_filter,
         test_pipeline_create_table,
+        test_pipeline_ingest_jsonbench_data,
 
         test_otlp_metrics_new,
         test_otlp_traces_v0,
@@ -2701,6 +2703,95 @@
     guard.remove_all().await;
 }
 
+pub async fn test_pipeline_ingest_jsonbench_data(store_type: StorageType) {
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(store_type, "test_pipeline_ingest_jsonbench_data").await;
+
+    let client = TestClient::new(app).await;
+
+    // Create the pipeline for ingesting jsonbench data.
+    let pipeline = r#"
+version: 2
+
+processors:
+  - json_parse:
+      fields:
+        - message, log
+      ignore_missing: true
+  - simple_extract:
+      fields:
+        - log, time_us
+      key: "time_us"
+      ignore_missing: false
+  - epoch:
+      fields:
+        - time_us
+      resolution: microsecond
+  - select:
+      fields:
+        - time_us
+        - log
+
+transform:
+  - fields:
+      - time_us
+    type: epoch, us
+    index: timestamp
+"#;
+    let response = client
+        .post("/v1/pipelines/jsonbench")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline)
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+    let pattern =
+        r#"\{"pipelines":\[\{"name":"jsonbench","version":"[^"]*"}],"execution_time_ms":\d+}"#
+            .parse::<Regex>()
+            .unwrap();
+    assert!(pattern.is_match(&response.text().await));
+
+    // Create the table for storing jsonbench data.
+    let sql = r#"
+CREATE TABLE jsonbench(time_us TimestampMicrosecond TIME INDEX, `log` Json())
+"#;
+    let response = client
+        .post("/v1/sql")
+        .header("Content-Type", "application/x-www-form-urlencoded")
+        .body(format!("sql={sql}"))
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+    let pattern = r#"\{"output":\[\{"affectedrows":0}],"execution_time_ms":\d+}"#
+        .parse::<Regex>()
+        .unwrap();
+    assert!(pattern.is_match(&response.text().await));
+
+    // Start ingesting jsonbench data.
+    // The input file only contains head 100 lines of the whole jsonbench test dataset.
+    let path = common_test_util::find_workspace_path(
+        "/tests-integration/resources/jsonbench-head-100.ndjson",
+    );
+    // Jsonbench data does contain some malformed jsons that are meant to be skipped on insert.
+    let skip_error = true;
+    let response = client
+        .post(&format!(
+            "/v1/ingest?table=jsonbench&pipeline_name=jsonbench&skip_error={skip_error}"
+        ))
+        .header("Content-Type", "text/plain")
+        .body(std::fs::read(path).unwrap())
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+    // Note that this pattern also matches the inserted rows: "74".
+    let pattern = r#"\{"output":\[\{"affectedrows":74}],"execution_time_ms":\d+}"#
+        .parse::<Regex>()
+        .unwrap();
+    assert!(pattern.is_match(&response.text().await));
+
+    guard.remove_all().await;
+}
+
 pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =