diff --git a/Cargo.lock b/Cargo.lock index 02f99d7290..57542b71a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9474,6 +9474,7 @@ dependencies = [ "ahash 0.8.12", "api", "arrow", + "arrow-schema", "async-trait", "catalog", "chrono", diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index b64e6d0265..4664c0434b 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -895,7 +895,7 @@ pub fn is_column_type_value_eq( .unwrap_or(false) } -fn encode_json_value(value: JsonValue) -> v1::JsonValue { +pub fn encode_json_value(value: JsonValue) -> v1::JsonValue { fn helper(json: JsonVariant) -> v1::JsonValue { let value = match json { JsonVariant::Null => None, diff --git a/src/api/src/v1/column_def.rs b/src/api/src/v1/column_def.rs index 88ee0c5749..f7ec3d525f 100644 --- a/src/api/src/v1/column_def.rs +++ b/src/api/src/v1/column_def.rs @@ -17,8 +17,8 @@ use std::collections::HashMap; use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY}; use datatypes::schema::{ COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer, - FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions, - SkippingIndexType, + FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY, + SkippingIndexOptions, SkippingIndexType, }; use greptime_proto::v1::{ Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType, @@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) -> Result<ColumnDef> { }) } +/// Collects the [ColumnOptions] into [Metadata] that can be used in, for example, a [ColumnSchema]. +pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata { + let mut metadata = Metadata::default(); + let Some(ColumnOptions { options }) = column_options else { + return metadata; + }; + + if let Some(v) = options.get(FULLTEXT_GRPC_KEY) { + metadata.insert(FULLTEXT_KEY.to_string(), v.clone()); + } + if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) { + metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone()); + } + if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) { + metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone()); + } + if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) { + metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone()); + } + if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) { + metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone()); + } + metadata +} + /// Constructs a `ColumnOptions` from the given `ColumnSchema`. pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> { let mut options = ColumnOptions::default();
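The new `collect_column_options` is the inverse of the existing `options_from_column_schema`. A minimal sketch of the intended round trip (illustrative only, not part of the patch; it assumes `ColumnSchema::with_metadata` consumes and returns the schema, as the pipeline code further down uses it):

```rust
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use datatypes::schema::ColumnSchema;

// Sketch: carry index options from a schema into gRPC ColumnOptions and back.
fn rebuild_metadata(column_schema: &ColumnSchema) -> ColumnSchema {
    // Schema metadata -> gRPC ColumnOptions (None when no recognized keys exist).
    let options = options_from_column_schema(column_schema);
    // gRPC ColumnOptions -> Metadata; only the fulltext, inverted-index,
    // skipping-index and extension-type keys are carried over.
    let metadata = collect_column_options(options.as_ref());
    column_schema.clone().with_metadata(metadata)
}
```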
diff --git a/src/datatypes/src/json.rs b/src/datatypes/src/json.rs index a790b900af..3bebbf89aa 100644 --- a/src/datatypes/src/json.rs +++ b/src/datatypes/src/json.rs @@ -816,7 +816,7 @@ mod tests { let result = encode_by_struct(&json_struct, json); assert_eq!( result.unwrap_err().to_string(), - "Cannot cast value bar to Number(I64)" + r#"Cannot cast value bar to "<number>""# ); let json = json!({ diff --git a/src/datatypes/src/types/json_type.rs b/src/datatypes/src/types/json_type.rs index 4c838b78d1..cbcc9d79e0 100644 --- a/src/datatypes/src/types/json_type.rs +++ b/src/datatypes/src/types/json_type.rs @@ -13,7 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; -use std::fmt::{Display, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::str::FromStr; use std::sync::Arc; @@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType { impl Display for JsonNativeType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - JsonNativeType::Null => write!(f, "Null"), - JsonNativeType::Bool => write!(f, "Bool"), - JsonNativeType::Number(t) => { - write!(f, "Number({t:?})") - } - JsonNativeType::String => write!(f, "String"), - JsonNativeType::Array(item_type) => { - write!(f, "Array[{}]", item_type) - } - JsonNativeType::Object(object) => { - write!( - f, - "Object{{{}}}", + fn to_serde_value(t: &JsonNativeType) -> serde_json::Value { + match t { + JsonNativeType::Null => serde_json::Value::String("<null>".to_string()), + JsonNativeType::Bool => serde_json::Value::String("<bool>".to_string()), + JsonNativeType::Number(_) => serde_json::Value::String("<number>".to_string()), + JsonNativeType::String => serde_json::Value::String("<string>".to_string()), + JsonNativeType::Array(item_type) => { + serde_json::Value::Array(vec![to_serde_value(item_type)]) + } + JsonNativeType::Object(object) => serde_json::Value::Object( object .iter() - .map(|(k, v)| format!(r#""{k}": {v}"#)) - .collect::<Vec<_>>() - .join(", ") - ) + .map(|(k, v)| (k.clone(), to_serde_value(v))) + .collect(), + ), } } + write!(f, "{}", to_serde_value(self)) } } @@ -183,7 +179,11 @@ impl JsonType { } } - pub(crate) fn native_type(&self) -> &JsonNativeType { + pub fn is_native_type(&self) -> bool { + matches!(self.format, JsonFormat::Native(_)) + } + + pub fn native_type(&self) -> &JsonNativeType { match &self.format { JsonFormat::Jsonb => &JsonNativeType::String, JsonFormat::Native(x) => x.as_ref(), @@ -650,15 +650,16 @@ mod tests { "list": [1, 2, 3], "object": {"a": 1} }"#; - let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#; + let expected = + r#"Json<{"hello":"<string>","list":["<number>"],"object":{"a":"<number>"}}>"#; test(json, json_type, Ok(expected))?; // cannot merge with other non-object json values: let jsons = [r#""s""#, "1", "[1]"]; let expects = [ - r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#, - r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#, - r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#, + r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<string>","list":["<number>"],"object":{"a":"<number>"}}, that: "<string>""#, + r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<string>","list":["<number>"],"object":{"a":"<number>"}}, that: "<number>""#, + r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<string>","list":["<number>"],"object":{"a":"<number>"}}, that: ["<number>"]"#, ]; for (json, expect) in jsons.into_iter().zip(expects.into_iter()) { test(json, json_type, Err(expect))?; @@ -670,7 +671,7 @@ "float": 0.123, "no": 42 }"#; - let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#; + let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: "<string>", that: "<number>""#; test(json, json_type, Err(expected))?; // can merge with another json object: @@ -679,7 +680,7 @@ "float": 0.123, "int": 42 }"#; - let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#; + let expected = r#"Json<{"float":"<number>","hello":"<string>","int":"<number>","list":["<number>"],"object":{"a":"<number>"}}>"#; test(json, json_type, Ok(expected))?; // can merge with some complex nested json object: @@ -689,7 +690,7 @@ "float": 0.456, "int": 0 }"#; - let expected = r#"Json"#; + let expected = r#"Json<{"float":"<number>","hello":"<string>","int":"<number>","list":["<number>"],"object":{"a":"<number>","foo":"","l":[""],"o":{"key":""}}}>"#; test(json, json_type, Ok(expected))?; Ok(())
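The rewritten `Display` renders a JSON native type as a JSON skeleton: scalars collapse to quoted placeholders, arrays keep one representative element, and objects keep their (sorted) keys. A small illustration with plain `serde_json`, assuming the placeholder spellings reconstructed above:

```rust
// Illustrative only: mirrors what `to_serde_value` produces for the type
// {"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}.
fn main() {
    let rendered = serde_json::json!({
        "hello": "<string>",
        "list": ["<number>"],
        "object": { "a": "<number>" },
    });
    // Compact rendering with sorted keys, exactly as the tests expect.
    assert_eq!(
        rendered.to_string(),
        r#"{"hello":"<string>","list":["<number>"],"object":{"a":"<number>"}}"#
    );
}
```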
diff --git a/src/datatypes/src/vectors/json/builder.rs b/src/datatypes/src/vectors/json/builder.rs index 3a32dda171..ecc19f4bdd 100644 --- a/src/datatypes/src/vectors/json/builder.rs +++ b/src/datatypes/src/vectors/json/builder.rs @@ -321,10 +321,10 @@ mod tests { Ok(()), Ok(()), Err( - "Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String", + r#"Failed to merge JSON datatype: datatypes have conflict, this: "<number>", that: "<string>""#, ), Err( - "Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]", + r#"Failed to merge JSON datatype: datatypes have conflict, this: "<number>", that: ["<bool>"]"#, ), ]; let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1); @@ -396,12 +396,12 @@ // test children builders: assert_eq!(builder.builders.len(), 6); let expect_types = [ - r#"Json"#, - r#"Json"#, - r#"Json"#, - r#"Json"#, - r#"Json"#, - r#"Json"#, + r#"Json<{"list":[""],"s":""}>"#, + r#"Json<{"float":"","s":""}>"#, + r#"Json<{"float":"","int":""}>"#, + r#"Json<{"int":"","object":{"hello":"","timestamp":""}}>"#, + r#"Json<{"nested":{"a":{"b":{"b":{"a":""}}}},"object":{"timestamp":""}}>"#, + r#"Json<{"nested":{"a":{"b":{"a":{"b":""}}}},"object":{"timestamp":""}}>"#, ]; let expect_vectors = [ r#" @@ -456,7 +456,7 @@ } // test final merged json type: - let expected = r#"Json"#; + let expected = r#"Json<{"float":"","int":"","list":[""],"nested":{"a":{"b":{"a":{"b":""},"b":{"a":""}}}},"object":{"hello":"","timestamp":""},"s":""}>"#; assert_eq!(builder.data_type().to_string(), expected); // test final produced vector: diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index 7c856a86bd..33ac51112d 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -13,6 +13,7 @@ workspace = true ahash.workspace = true api.workspace = true arrow.workspace = true +arrow-schema.workspace = true async-trait.workspace = true catalog.workspace = true chrono.workspace = true diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 651f1cd4a9..06ab7cc4b9 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -800,6 +800,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(transparent)] + GreptimeProto { + source: api::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(transparent)] + Datatypes { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -920,6 +934,9 @@ impl ErrorExt for Error { FloatIsNan { .. } | InvalidEpochForResolution { .. } | UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments, + + GreptimeProto { source, .. } => source.status_code(), + Datatypes { source, .. } => source.status_code(), } }
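Both new variants rely on `#[snafu(transparent)]`, which derives a `From` impl for the source error so `?` forwards it without a context selector; `status_code()` is then delegated manually, as in the match arms above. A minimal sketch (the callee is hypothetical):

```rust
// Sketch: `?` converts datatypes::error::Error into Error::Datatypes via the
// From impl that `transparent` derives; no `.context(...)` is needed.
fn parse_type(s: &str) -> Result<datatypes::data_type::ConcreteDataType> {
    // `hypothetical_parse` stands in for any datatypes call returning
    // datatypes::error::Result; it is not a real API.
    Ok(hypothetical_parse(s)?)
}
```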
diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 85494b24dc..35db9aa5e7 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -19,13 +19,17 @@ use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; use ahash::{HashMap, HashMapExt}; -use api::helper::proto_value_type; -use api::v1::column_data_type_extension::TypeExt; +use api::helper::{ColumnDataTypeWrapper, encode_json_value}; +use api::v1::column_def::{collect_column_options, options_from_column_schema}; use api::v1::value::ValueData; -use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType}; +use api::v1::{ColumnDataType, SemanticType}; +use arrow_schema::extension::ExtensionType; use coerce::{coerce_columns, coerce_value}; use common_query::prelude::{greptime_timestamp, greptime_value}; use common_telemetry::warn; +use datatypes::data_type::ConcreteDataType; +use datatypes::extension::json::JsonExtensionType; +use datatypes::value::Value; use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue}; use itertools::Itertools; use jsonb::Number; @@ -33,12 +37,15 @@ use once_cell::sync::OnceCell; use serde_json as serde_json_crate; use session::context::Channel; use snafu::OptionExt; +use table::Table; use vrl::prelude::{Bytes, VrlValueConvert}; +use vrl::value::value::StdError; use vrl::value::{KeyString, Value as VrlValue}; use crate::error::{ - ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, - Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, + ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu, + IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result, + TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu, }; use crate::etl::PipelineDocVersion; @@ -272,15 +279,75 @@ impl GreptimeTransformer { } } +#[derive(Clone)] +pub struct ColumnMetadata { + column_schema: datatypes::schema::ColumnSchema, + semantic_type: SemanticType, +} + +impl From<ColumnSchema> for ColumnMetadata { + fn from(value: ColumnSchema) -> Self { + let datatype = value.datatype(); + let semantic_type = value.semantic_type(); + let ColumnSchema { + column_name, + datatype: _, + semantic_type: _, + datatype_extension, + options, + } = value; + + let column_schema = datatypes::schema::ColumnSchema::new( + column_name, + ColumnDataTypeWrapper::new(datatype, datatype_extension).into(), + semantic_type != SemanticType::Timestamp, + ); + + let metadata = collect_column_options(options.as_ref()); + let column_schema = column_schema.with_metadata(metadata); + + Self { + column_schema, + semantic_type, + } + } +} + +impl TryFrom<ColumnMetadata> for ColumnSchema { + type Error = api::error::Error; + + fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> { + let ColumnMetadata { + column_schema, + semantic_type, + } = value; + + let options = options_from_column_schema(&column_schema); + + let (datatype, datatype_extension) = + ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?; + + Ok(ColumnSchema { + column_name: column_schema.name, + datatype: datatype as _, + semantic_type: semantic_type as _, + datatype_extension, + options, + }) + } +}
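A sketch of how this conversion pair is meant to be used: proto to `ColumnMetadata` is infallible (and pulls index options into schema metadata via `collect_column_options`), while the reverse is fallible because not every `ConcreteDataType` has a wire representation:

```rust
// Sketch only: round-trip a wire schema through the in-memory representation.
fn roundtrip(proto: ColumnSchema) -> std::result::Result<ColumnSchema, api::error::Error> {
    let meta = ColumnMetadata::from(proto); // infallible, keeps options as metadata
    ColumnSchema::try_from(meta) // re-encodes datatype, extension and options
}
```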
/// This records the current schema state and a sequential cache of field names. /// As you traverse the user input JSON, this will change. /// It will record a superset of all user input schemas. -#[derive(Debug, Default)] +#[derive(Default)] pub struct SchemaInfo { /// schema info - pub schema: Vec<ColumnSchema>, + pub schema: Vec<ColumnMetadata>, /// index of the column name pub index: HashMap<String, usize>, + /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas. + table: Option<Arc<Table>>, } impl SchemaInfo { Self { schema: Vec::with_capacity(capacity), index: HashMap::with_capacity(capacity), + table: None, } } @@ -297,46 +365,88 @@ index.insert(schema.column_name.clone(), i); } Self { - schema: schema_list, + schema: schema_list.into_iter().map(Into::into).collect(), index, + table: None, } } + + pub fn set_table(&mut self, table: Option<Arc<Table>>) { + self.table = table; + } + + fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> { + if let Some(table) = &self.table + && let Some(i) = table.schema_ref().column_index_by_name(column_name) + { + let column_schema = table.schema_ref().column_schemas()[i].clone(); + + let semantic_type = if column_schema.is_time_index() { + SemanticType::Timestamp + } else if table.table_info().meta.primary_key_indices.contains(&i) { + SemanticType::Tag + } else { + SemanticType::Field + }; + + Some(ColumnMetadata { + column_schema, + semantic_type, + }) + } else { + None + } + } + + pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> { + self.schema + .iter() + .map(|x| x.clone().try_into()) + .collect::<api::error::Result<Vec<_>>>() + } } fn resolve_schema( index: Option<usize>, - value_data: ValueData, - column_schema: ColumnSchema, - row: &mut Vec<GreptimeValue>, + pipeline_context: &PipelineContext, + column: &str, + value_type: &ConcreteDataType, schema_info: &mut SchemaInfo, ) -> Result<()> { if let Some(index) = index { - let api_value = GreptimeValue { - value_data: Some(value_data), - }; - // Safety unwrap is fine here because api_value is always valid - let value_column_data_type = proto_value_type(&api_value).unwrap(); - // Safety unwrap is fine here because index is always valid - let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype(); - if value_column_data_type != schema_column_data_type { - IdentifyPipelineColumnTypeMismatchSnafu { - column: column_schema.column_name, - expected: schema_column_data_type.as_str_name(), - actual: value_column_data_type.as_str_name(), + let column_type = &mut schema_info.schema[index].column_schema.data_type; + match (column_type, value_type) { + (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type)) + if column_type.is_include(value_type) => + { + Ok(()) } - .fail() - } else { - row[index] = api_value; - Ok(()) + (column_type, value_type) if column_type == value_type => Ok(()), + (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu { + column, + expected: column_type.to_string(), + actual: value_type.to_string(), + } + .fail(), } } else { - let key = column_schema.column_name.clone(); + let column_schema = schema_info + .find_column_schema_in_table(column) + .unwrap_or_else(|| { + let semantic_type = decide_semantic(pipeline_context, column); + let column_schema = datatypes::schema::ColumnSchema::new( + column, + value_type.clone(), + semantic_type != SemanticType::Timestamp, + ); + ColumnMetadata { + column_schema, + semantic_type, + } + }); + let key = column.to_string(); schema_info.schema.push(column_schema); schema_info.index.insert(key, schema_info.schema.len() - 1); - let api_value = GreptimeValue {
value_data: Some(value_data), - }; - row.push(api_value); Ok(()) } } @@ -481,11 +591,11 @@ pub(crate) fn values_to_row( Ok(Row { values: row }) } -fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 { +fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType { if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() { - SemanticType::Tag as i32 + SemanticType::Tag } else { - SemanticType::Field as i32 + SemanticType::Field } } @@ -497,55 +607,56 @@ fn resolve_value( p_ctx: &PipelineContext, ) -> Result<()> { let index = schema_info.index.get(&column_name).copied(); - let mut resolve_simple_type = - |value_data: ValueData, column_name: String, data_type: ColumnDataType| { - let semantic_type = decide_semantic(p_ctx, &column_name); - resolve_schema( - index, - value_data, - ColumnSchema { - column_name, - datatype: data_type as i32, - semantic_type, - datatype_extension: None, - options: None, - }, - row, - schema_info, - ) - }; - match value { - VrlValue::Null => {} + let value_data = match value { + VrlValue::Null => None, VrlValue::Integer(v) => { // safe unwrap after type matched - resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?; + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::int64_datatype(), + schema_info, + )?; + Some(ValueData::I64Value(v)) } VrlValue::Float(v) => { // safe unwrap after type matched - resolve_simple_type( - ValueData::F64Value(v.into()), - column_name, - ColumnDataType::Float64, + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::float64_datatype(), + schema_info, )?; + Some(ValueData::F64Value(v.into())) } VrlValue::Boolean(v) => { - resolve_simple_type( - ValueData::BoolValue(v), - column_name, - ColumnDataType::Boolean, + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::boolean_datatype(), + schema_info, )?; + Some(ValueData::BoolValue(v)) } VrlValue::Bytes(v) => { - resolve_simple_type( - ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())), - column_name, - ColumnDataType::String, + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::string_datatype(), + schema_info, )?; + Some(ValueData::StringValue(String::from_utf8_lossy_owned( + v.to_vec(), + ))) } VrlValue::Regex(v) => { @@ -553,42 +664,83 @@ fn resolve_value( "Persisting regex value in the table, this should not happen, column_name: {}", column_name ); - resolve_simple_type( - ValueData::StringValue(v.to_string()), - column_name, - ColumnDataType::String, + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::string_datatype(), + schema_info, )?; + Some(ValueData::StringValue(v.to_string())) } VrlValue::Timestamp(ts) => { let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu { input: ts.to_rfc3339(), })?; - resolve_simple_type( - ValueData::TimestampNanosecondValue(ns), - column_name, - ColumnDataType::TimestampNanosecond, + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::timestamp_nanosecond_datatype(), + schema_info, )?; + Some(ValueData::TimestampNanosecondValue(ns)) } VrlValue::Array(_) | VrlValue::Object(_) => { - let data = vrl_value_to_jsonb_value(&value); - resolve_schema( - index, - ValueData::BinaryValue(data.to_vec()), - ColumnSchema { - column_name, - datatype: ColumnDataType::Binary as i32, - semantic_type: SemanticType::Field as i32, - datatype_extension: Some(ColumnDataTypeExtension { - type_ext: 
Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), - }), - options: None, - }, - row, - schema_info, - )?; + let is_json_native_type = schema_info + .find_column_schema_in_table(&column_name) + .is_some_and(|x| { + if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type { + column_type.is_native_type() + } else { + false + } + }); + + let value = if is_json_native_type { + let json_extension_type: Option<JsonExtensionType> = + if let Some(x) = schema_info.find_column_schema_in_table(&column_name) { + x.column_schema.extension_type()? + } else { + None + }; + let settings = json_extension_type + .and_then(|x| x.metadata().json_structure_settings.clone()) + .unwrap_or_default(); + let value: serde_json::Value = value.try_into().map_err(|e: StdError| { + CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build() + })?; + let value = settings.encode(value)?; + + resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?; + + let Value::Json(value) = value else { + unreachable!() + }; + ValueData::JsonValue(encode_json_value(*value)) + } else { + resolve_schema( + index, + p_ctx, + &column_name, + &ConcreteDataType::binary_datatype(), + schema_info, + )?; + + let value = vrl_value_to_jsonb_value(&value); + ValueData::BinaryValue(value.to_vec()) + }; + Some(value) } + }; + + let value = GreptimeValue { value_data }; + if let Some(index) = index { + row[index] = value; + } else { + row.push(value); } Ok(()) } @@ -626,20 +778,24 @@ fn identity_pipeline_inner( let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts(); // set time index column schema first - schema_info.schema.push(ColumnSchema { - column_name: custom_ts + let column_schema = datatypes::schema::ColumnSchema::new( + custom_ts .map(|ts| ts.get_column_name().to_string()) .unwrap_or_else(|| greptime_timestamp().to_string()), - datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| { - if pipeline_ctx.channel == Channel::Prometheus { - ColumnDataType::TimestampMillisecond - } else { - ColumnDataType::TimestampNanosecond - } - }) as i32, - semantic_type: SemanticType::Timestamp as i32, - datatype_extension: None, - options: None, + custom_ts + .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None))) + .unwrap_or_else(|| { + if pipeline_ctx.channel == Channel::Prometheus { + ConcreteDataType::timestamp_millisecond_datatype() + } else { + ConcreteDataType::timestamp_nanosecond_datatype() + } + }), + false, + ); + schema_info.schema.push(ColumnMetadata { + column_schema, + semantic_type: SemanticType::Timestamp, }); let mut opt_map = HashMap::new(); @@ -697,28 +853,29 @@ pub fn identity_pipeline( input.push(result); } - identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| { + identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| { if let Some(table) = table { let table_info = table.table_info(); for tag_name in table_info.meta.row_key_column_names() { if let Some(index) = schema.index.get(tag_name) { - schema.schema[*index].semantic_type = SemanticType::Tag as i32; + schema.schema[*index].semantic_type = SemanticType::Tag; } } } - opt_map + let column_schemas = schema.column_schemas()?; + Ok(opt_map .into_iter() .map(|(opt, rows)| { ( opt, Rows { - schema: schema.schema.clone(), + schema: column_schemas.clone(), rows, }, ) }) - .collect::<HashMap<ContextOpt, Rows>>() + .collect::<HashMap<ContextOpt, Rows>>()) }) } @@ -850,7 +1007,7 @@ mod tests { assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), - "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(), + "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(), ); } { @@ -879,7 +1036,7 @@ assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), - "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(), + "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(), ); } { @@ -942,7 +1099,7 @@ .map(|(mut schema, mut rows)| { for name in tag_column_names { if let Some(index) = schema.index.get(&name) { - schema.schema[*index].semantic_type = SemanticType::Tag as i32; + schema.schema[*index].semantic_type = SemanticType::Tag; } } @@ -950,7 +1107,7 @@ let rows = rows.remove(&ContextOpt::default()).unwrap(); Rows { - schema: schema.schema, + schema: schema.column_schemas().unwrap(), rows, } });
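`SchemaInfo` now stores `ColumnMetadata` and derives the wire schema on demand, so producing `Rows` becomes fallible. A sketch of the consuming pattern used throughout the callers below:

```rust
// Sketch: derive the proto schema lazily instead of cloning a stored Vec<ColumnSchema>.
fn to_rows(schema_info: &SchemaInfo, rows: Vec<Row>) -> api::error::Result<Rows> {
    let schema = schema_info.column_schemas()?;
    Ok(Rows { schema, rows })
}
```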
diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index b102bede02..976999fb6b 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -61,7 +61,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { } Rows { - schema: schema_info.schema.clone(), + schema: schema_info.column_schemas().unwrap(), rows, } } diff --git a/src/pipeline/tests/json_parse.rs b/src/pipeline/tests/json_parse.rs index 9bb5ee4f95..37979745ed 100644 --- a/src/pipeline/tests/json_parse.rs +++ b/src/pipeline/tests/json_parse.rs @@ -52,7 +52,7 @@ transform: // check schema assert_eq!(output.schema[0].column_name, "commit"); - let type_id: i32 = ColumnDataType::Binary.into(); + let type_id: i32 = ColumnDataType::Json.into(); assert_eq!(output.schema[0].datatype, type_id); // check value @@ -91,7 +91,7 @@ transform: // check schema assert_eq!(output.schema[0].column_name, "commit_json"); - let type_id: i32 = ColumnDataType::Binary.into(); + let type_id: i32 = ColumnDataType::Json.into(); assert_eq!(output.schema[0].datatype, type_id); // check value @@ -160,7 +160,7 @@ transform: // check schema assert_eq!(output.schema[0].column_name, "commit"); - let type_id: i32 = ColumnDataType::Binary.into(); + let type_id: i32 = ColumnDataType::Json.into(); assert_eq!(output.schema[0].datatype, type_id); // check value diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index edbc0e2038..cdcb91bd13 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -664,6 +664,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(transparent)] + GreptimeProto { + source: api::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -794,6 +801,8 @@ impl ErrorExt for Error { Suspended { .. } => StatusCode::Suspended, MemoryLimitExceeded { .. } => StatusCode::RateLimited, + + GreptimeProto { source, .. } => source.status_code(), } }
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index dbb2975533..ab4caafc34 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -23,6 +23,7 @@ pub mod memory_limit; pub mod prom_query_gateway; pub mod region_server; +use std::any::Any; use std::net::SocketAddr; use std::time::Duration; @@ -399,4 +400,8 @@ impl Server for GrpcServer { fn bind_addr(&self) -> Option<SocketAddr> { self.bind_addr } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index b99e8136df..2991824994 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1285,6 +1285,10 @@ impl Server for HttpServer { fn bind_addr(&self) -> Option<SocketAddr> { self.bind_addr } + + fn as_any(&self) -> &dyn std::any::Any { + self + } } #[cfg(test)] diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 2390e374a1..24bb844dc7 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -31,7 +31,7 @@ use axum_extra::TypedHeader; use common_catalog::consts::default_engine; use common_error::ext::{BoxedError, ErrorExt}; use common_query::{Output, OutputData}; -use common_telemetry::{debug, error, warn}; +use common_telemetry::{error, warn}; use headers::ContentType; use lazy_static::lazy_static; use mime_guess::mime; @@ -738,11 +738,6 @@ pub async fn log_ingester( let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?; - debug!( - "receiving logs: {:?}", - serde_json::to_string(&value).unwrap() - ); - query_ctx.set_channel(Channel::Log); let query_ctx = Arc::new(query_ctx); diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index e6f1b064a3..f86959f325 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -152,7 +152,7 @@ pub async fn loki_ingest( rows.push(row); } - let schemas = schema_info.schema; + let schemas = schema_info.column_schemas()?; // fill Null for missing values for row in rows.iter_mut() { row.resize(schemas.len(), GreptimeValue::default()); @@ -746,13 +746,16 @@ fn process_labels( } else { // not exist // add schema and append to values - schemas.push(ColumnSchema { - column_name: k.clone(), - datatype: ColumnDataType::String.into(), - semantic_type: SemanticType::Tag.into(), - datatype_extension: None, - options: None, - }); + schemas.push( + ColumnSchema { + column_name: k.clone(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Tag.into(), + datatype_extension: None, + options: None, + } + .into(), + ); column_indexer.insert(k, schemas.len() - 1); row.push(GreptimeValue { diff --git a/src/servers/src/mysql/server.rs b/src/servers/src/mysql/server.rs index bda027ca55..9f827c7021 100644 --- a/src/servers/src/mysql/server.rs +++ b/src/servers/src/mysql/server.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
+use std::any::Any; use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; @@ -265,4 +266,8 @@ impl Server for MysqlServer { fn bind_addr(&self) -> Option<SocketAddr> { self.bind_addr } + + fn as_any(&self) -> &dyn Any { + self + } } diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index c60990077b..41a715ddf7 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema( if let Some(index) = select_schema.index.get(key) { let column_schema = &select_schema.schema[*index]; + let column_schema: ColumnSchema = column_schema.clone().try_into()?; // datatype of the same column name should be the same ensure!( column_schema.datatype == schema.datatype, @@ -393,7 +394,7 @@ ); extracted_values[*index] = value; } else { - select_schema.schema.push(schema); + select_schema.schema.push(schema.into()); select_schema .index .insert(key.clone(), select_schema.schema.len() - 1); @@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows( let mut parse_ctx = ParseContext::new(select_info); let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?; - schemas.extend(parse_ctx.select_schema.schema); + schemas.extend(parse_ctx.select_schema.column_schemas()?); rows.iter_mut().for_each(|row| { row.values.resize(schemas.len(), GreptimeValue::default()); diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index f1e4138e63..fe7a2f48f3 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -135,12 +135,18 @@ async fn run_custom_pipeline( let mut schema_info = SchemaInfo::default(); schema_info .schema - .push(time_index_column_schema(ts_name, timeunit)); + .push(time_index_column_schema(ts_name, timeunit).into()); schema_info } }; + let table = handler + .get_table(&table_name, query_ctx) + .await + .context(CatalogSnafu)?; + schema_info.set_table(table); + for pipeline_map in pipeline_maps { let result = pipeline .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info) @@ -194,7 +200,7 @@ RowInsertRequest { rows: Some(Rows { rows, - schema: schema_info.schema.clone(), + schema: schema_info.column_schemas()?, }), table_name: table_name.clone(), }, diff --git a/src/servers/src/postgres/server.rs b/src/servers/src/postgres/server.rs index 3478a6da78..09bc5015a0 100644 --- a/src/servers/src/postgres/server.rs +++ b/src/servers/src/postgres/server.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; @@ -144,4 +145,8 @@ impl Server for PostgresServer { fn bind_addr(&self) -> Option<SocketAddr> { self.bind_addr } + + fn as_any(&self) -> &dyn Any { + self + } }
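Each concrete server now exposes itself through `as_any`, added to the `Server` trait just below; this is what lets the new jsonbench test downcast a `&dyn Server` back to `HttpServer`. A sketch of the pattern:

```rust
use servers::http::HttpServer;
use servers::server::Server;

// Sketch: recover the concrete server type from a trait object.
fn as_http(server: &dyn Server) -> Option<&HttpServer> {
    server.as_any().downcast_ref::<HttpServer>()
}
```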
diff --git a/src/servers/src/server.rs b/src/servers/src/server.rs index 6a3028f63c..611f208e0e 100644 --- a/src/servers/src/server.rs +++ b/src/servers/src/server.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::any::Any; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::SocketAddr; @@ -147,6 +148,8 @@ pub trait Server: Send + Sync { fn bind_addr(&self) -> Option<SocketAddr> { None } + + fn as_any(&self) -> &dyn Any; } struct AcceptTask { diff --git a/tests-integration/resources/jsonbench-select-all.txt b/tests-integration/resources/jsonbench-select-all.txt index adee15ac99..25423f9e80 100644 --- a/tests-integration/resources/jsonbench-select-all.txt +++ b/tests-integration/resources/jsonbench-select-all.txt @@ -1,14 +1,14 @@ -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ -| data | ts | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ -| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah.
LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 | -| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 | -| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 | -| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 | -| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 | -| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 | -| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 | -| {_raw: 
{"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 | -| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 | -| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 | -+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+ 
++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+ +| data | time_us | ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+ +| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. 
LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 2024-11-21T16:25:49.000167 | +| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 2024-11-21T16:25:49.000644 | +| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 2024-11-21T16:25:49.001108 | +| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 2024-11-21T16:25:49.001372 | +| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 2024-11-21T16:25:49.001905 | +| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 2024-11-21T16:25:49.002758 | +| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 2024-11-21T16:25:49.003106 | +| {_raw: 
{"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 2024-11-21T16:25:49.003461 | +| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 2024-11-21T16:25:49.003907 | +| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 2024-11-21T16:25:49.004769 | ++------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+ diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index f50bf67e33..b6e12903e9 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -48,9 +48,9 @@ use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError}; use frontend::frontend::Frontend; use frontend::instance::builder::FrontendBuilder; use frontend::instance::{Instance, StandaloneDatanodeManager}; +use frontend::server::Services; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; use servers::grpc::GrpcOptions; -use servers::server::ServerHandlers; use snafu::ResultExt; use 
standalone::options::StandaloneOptions; @@ -249,7 +249,7 @@ impl GreptimeDbStandaloneBuilder { procedure_executor.clone(), Arc::new(ProcessManager::new(server_addr, None)), ) - .with_plugin(plugins) + .with_plugin(plugins.clone()) .try_build() .await .unwrap(); @@ -282,14 +282,15 @@ test_util::prepare_another_catalog_and_schema(&instance).await; - let mut frontend = Frontend { + let servers = Services::new(opts.clone(), instance.clone(), plugins) + .build() + .unwrap(); + let frontend = Frontend { instance, - servers: ServerHandlers::default(), + servers, heartbeat_task: None, }; - frontend.start().await.unwrap(); - GreptimeDbStandalone { frontend: Arc::new(frontend), opts, diff --git a/tests-integration/tests/jsonbench.rs b/tests-integration/tests/jsonbench.rs index 076096814d..60f699c4ce 100644 --- a/tests-integration/tests/jsonbench.rs +++ b/tests-integration/tests/jsonbench.rs @@ -18,14 +18,114 @@ use std::{fs, io}; use common_test_util::find_workspace_path; use frontend::instance::Instance; +use http::StatusCode; +use servers::http::test_helpers::TestClient; +use servers::http::{HTTP_SERVER, HttpServer}; +use servers::server::ServerHandlers; use tests_integration::standalone::GreptimeDbStandaloneBuilder; use tests_integration::test_util::execute_sql_and_expect; -#[tokio::test] -async fn test_load_jsonbench_data() { +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_load_jsonbench_data_by_pipeline() -> io::Result<()> { common_telemetry::init_default_ut_logging(); - let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data") + let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_pipeline") .build() .await; let frontend = instance.fe_instance(); + + let ServerHandlers::Init(handlers) = instance.frontend.server_handlers() else { + unreachable!() + }; + let router = { + let handlers = handlers.lock().unwrap(); + let server = handlers + .get(HTTP_SERVER) + .and_then(|x| x.0.as_any().downcast_ref::<HttpServer>()) + .unwrap(); + server.build(server.make_app()).unwrap() + }; + let client = TestClient::new(router).await; + + create_table(frontend).await; + + desc_table(frontend).await; + + create_pipeline(&client).await; + + insert_data_by_pipeline(&client).await?; + + query_data(frontend).await +} + +async fn insert_data_by_pipeline(client: &TestClient) -> io::Result<()> { + let file = fs::read(find_workspace_path( + "tests-integration/resources/jsonbench-head-10.ndjson", + ))?; + + let response = client + .post("/v1/ingest?table=bluesky&pipeline_name=jsonbench") + .header("Content-Type", "text/plain") + .body(file) + .send() + .await; + assert_eq!(response.status(), StatusCode::OK); + + let response = response.text().await; + // Note that this pattern also matches the number of inserted rows: "10".
+ let pattern = r#"{"output":[{"affectedrows":10}]"#; + assert!(response.starts_with(pattern)); + Ok(()) +} + +async fn create_pipeline(client: &TestClient) { + let pipeline = r#" +version: 2 + +processors: + - json_parse: + fields: + - message, data + ignore_missing: true + - simple_extract: + fields: + - data, time_us + key: "time_us" + ignore_missing: false + - epoch: + fields: + - time_us + resolution: microsecond + - select: + fields: + - time_us + - data + +transform: + - fields: + - time_us + type: epoch, us + index: timestamp +"#; + + let response = client + .post("/v1/pipelines/jsonbench") + .header("Content-Type", "application/x-yaml") + .body(pipeline) + .send() + .await; + assert_eq!(response.status(), StatusCode::OK); + + let response = response.text().await; + let pattern = r#"{"pipelines":[{"name":"jsonbench""#; + assert!(response.starts_with(pattern)); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_load_jsonbench_data_by_sql() -> io::Result<()> { + common_telemetry::init_default_ut_logging(); + + let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_sql") .build() .await; let frontend = instance.fe_instance(); @@ -34,9 +134,9 @@ desc_table(frontend).await; - insert_data(frontend).await.unwrap(); + insert_data_by_sql(frontend).await?; - query_data(frontend).await.unwrap(); + query_data(frontend).await } async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> { | count(*) | +----------+ | 10 | -+----------+ -"#; ++----------+"#; execute_sql_and_expect(frontend, sql, expected).await; - let sql = "SELECT * FROM bluesky ORDER BY ts"; + let sql = "SELECT * FROM bluesky ORDER BY time_us"; let expected = fs::read_to_string(find_workspace_path( "tests-integration/resources/jsonbench-select-all.txt", ))?; execute_sql_and_expect(frontend, sql, &expected).await; // query 1: - let sql = "\ -SELECT \ - json_get_string(data, '$.commit.collection') AS event, count() AS count \ -FROM bluesky \ -GROUP BY event \ + let sql = " +SELECT + json_get_string(data, '$.commit.collection') AS event, count() AS count +FROM bluesky +GROUP BY event ORDER BY count DESC, event ASC"; let expected = r#" +-----------------------+-------+ execute_sql_and_expect(frontend, sql, expected).await; // query 2: - let sql = "\ -SELECT \ - json_get_string(data, '$.commit.collection') AS event, \ - count() AS count, \ - count(DISTINCT json_get_string(data, '$.did')) AS users \ -FROM bluesky \ -WHERE \ - (json_get_string(data, '$.kind') = 'commit') AND \ - (json_get_string(data, '$.commit.operation') = 'create') \ -GROUP BY event \ + let sql = " +SELECT + json_get_string(data, '$.commit.collection') AS event, + count() AS count, + count(DISTINCT json_get_string(data, '$.did')) AS users +FROM bluesky +WHERE + (json_get_string(data, '$.kind') = 'commit') AND + (json_get_string(data, '$.commit.operation') = 'create') +GROUP BY event ORDER BY count DESC, event ASC"; let expected = r#" +-----------------------+-------+-------+ execute_sql_and_expect(frontend, sql, expected).await; // query 3: - let sql = "\ -SELECT \ - json_get_string(data, '$.commit.collection') AS event, \ - date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \ - count() AS count \ -FROM bluesky \ -WHERE \ - (json_get_string(data, '$.kind') =
+async fn create_pipeline(client: &TestClient) {
+    let pipeline = r#"
+version: 2
+
+processors:
+  - json_parse:
+      fields:
+        - message, data
+      ignore_missing: true
+  - simple_extract:
+      fields:
+        - data, time_us
+      key: "time_us"
+      ignore_missing: false
+  - epoch:
+      fields:
+        - time_us
+      resolution: microsecond
+  - select:
+      fields:
+        - time_us
+        - data
+
+transform:
+  - fields:
+      - time_us
+    type: epoch, us
+    index: timestamp
+"#;
+
+    let response = client
+        .post("/v1/pipelines/jsonbench")
+        .header("Content-Type", "application/x-yaml")
+        .body(pipeline)
+        .send()
+        .await;
+    assert_eq!(response.status(), StatusCode::OK);
+
+    let response = response.text().await;
+    let pattern = r#"{"pipelines":[{"name":"jsonbench""#;
+    assert!(response.starts_with(pattern));
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn test_load_jsonbench_data_by_sql() -> io::Result<()> {
+    common_telemetry::init_default_ut_logging();
+
+    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_sql")
         .build()
         .await;
     let frontend = instance.fe_instance();
@@ -34,9 +134,9 @@ async fn test_load_jsonbench_data() {
 
     desc_table(frontend).await;
 
-    insert_data(frontend).await.unwrap();
+    insert_data_by_sql(frontend).await?;
 
-    query_data(frontend).await.unwrap();
+    query_data(frontend).await
 }
 
 async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
@@ -46,22 +146,21 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
 | count(*) |
 +----------+
 | 10       |
-+----------+
-"#;
++----------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 
-    let sql = "SELECT * FROM bluesky ORDER BY ts";
+    let sql = "SELECT * FROM bluesky ORDER BY time_us";
     let expected = fs::read_to_string(find_workspace_path(
         "tests-integration/resources/jsonbench-select-all.txt",
     ))?;
     execute_sql_and_expect(frontend, sql, &expected).await;
 
     // query 1:
-    let sql = "\
-SELECT \
-    json_get_string(data, '$.commit.collection') AS event, count() AS count \
-FROM bluesky \
-GROUP BY event \
+    let sql = "
+SELECT
+    json_get_string(data, '$.commit.collection') AS event, count() AS count
+FROM bluesky
+GROUP BY event
 ORDER BY count DESC, event ASC";
     let expected = r#"
+-----------------------+-------+
@@ -75,16 +174,16 @@ ORDER BY count DESC, event ASC";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 2:
-    let sql = "\
-SELECT \
-    json_get_string(data, '$.commit.collection') AS event, \
-    count() AS count, \
-    count(DISTINCT json_get_string(data, '$.did')) AS users \
-FROM bluesky \
-WHERE \
-    (json_get_string(data, '$.kind') = 'commit') AND \
-    (json_get_string(data, '$.commit.operation') = 'create') \
-GROUP BY event \
+    let sql = "
+SELECT
+    json_get_string(data, '$.commit.collection') AS event,
+    count() AS count,
+    count(DISTINCT json_get_string(data, '$.did')) AS users
+FROM bluesky
+WHERE
+    (json_get_string(data, '$.kind') = 'commit') AND
+    (json_get_string(data, '$.commit.operation') = 'create')
+GROUP BY event
 ORDER BY count DESC, event ASC";
     let expected = r#"
+-----------------------+-------+-------+
@@ -98,18 +197,18 @@ ORDER BY count DESC, event ASC";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 3:
-    let sql = "\
-SELECT \
-    json_get_string(data, '$.commit.collection') AS event, \
-    date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \
-    count() AS count \
-FROM bluesky \
-WHERE \
-    (json_get_string(data, '$.kind') = 'commit') AND \
-    (json_get_string(data, '$.commit.operation') = 'create') AND \
-    json_get_string(data, '$.commit.collection') IN \
-    ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \
-GROUP BY event, hour_of_day \
+    let sql = "
+SELECT
+    json_get_string(data, '$.commit.collection') AS event,
+    date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
+    count() AS count
+FROM bluesky
+WHERE
+    (json_get_string(data, '$.kind') = 'commit') AND
+    (json_get_string(data, '$.commit.operation') = 'create') AND
+    json_get_string(data, '$.commit.collection') IN
+    ('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
+GROUP BY event, hour_of_day
 ORDER BY hour_of_day, event";
     let expected = r#"
+----------------------+-------------+-------+
@@ -122,7 +221,7 @@ ORDER BY hour_of_day, event";
     execute_sql_and_expect(frontend, sql, expected).await;
 
     // query 4:
-    let sql = "\
+    let sql = "
 SELECT
     json_get_string(data, '$.did') as user_id,
     min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
@@ -174,19 +273,23 @@ LIMIT 3";
 
     Ok(())
 }
 
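+/// Inserts the fixture over plain SQL, lifting each line's top-level "time_us"
+/// into the time index column instead of a synthetic row number.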
-async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
+async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
     let file = fs::File::open(find_workspace_path(
         "tests-integration/resources/jsonbench-head-10.ndjson",
     ))?;
     let reader = io::BufReader::new(file);
 
-    for (i, line) in reader.lines().enumerate() {
+    for line in reader.lines() {
         let line = line?;
         if line.is_empty() {
             continue;
         }
+
+        let json: serde_json::Value = serde_json::from_str(&line)?;
+        let time_us = json.pointer("/time_us").and_then(|x| x.as_u64()).unwrap();
+
         let sql = format!(
-            "INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
-            i + 1,
+            "INSERT INTO bluesky (time_us, data) VALUES ({}, '{}')",
+            time_us,
             line.replace("'", "''"), // the standard SQL way to escape a single quote
         );
         execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
@@ -197,12 +300,12 @@ async fn desc_table(frontend: &Arc<Instance>) {
     let sql = "DESC TABLE bluesky";
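+    // The displayed Json type enumerates the struct fields of the `data`
+    // column, and the time index is now the microsecond-precision `time_us`.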
     let expected = r#"
-+--------+----------------------+-----+------+---------+---------------+
-| Column | Type                 | Key | Null | Default | Semantic Type |
-+--------+----------------------+-----+------+---------+---------------+
-| data   | Json                 |     | YES  |         | FIELD         |
-| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
-+--------+----------------------+-----+------+---------+---------------+"#;
++---------+------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| Column  | Type                                                                                           | Key | Null | Default | Semantic Type |
++---------+------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| data    | Json<{"_raw":"","commit.collection":"","commit.operation":"","did":"","kind":"","time_us":""}> |     | YES  |         | FIELD         |
+| time_us | TimestampMicrosecond                                                                           | PRI | NO   |         | TIMESTAMP     |
++---------+------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
     execute_sql_and_expect(frontend, sql, expected).await;
 }
 
@@ -219,7 +322,7 @@ CREATE TABLE bluesky (
         time_us Bigint
       >,
     ),
-    ts Timestamp TIME INDEX,
+    time_us TimestampMicrosecond TIME INDEX,
 )
 "#;
     execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;
diff --git a/tests/cases/standalone/common/types/json/json-structured.result b/tests/cases/standalone/common/types/json/json-structured.result
index 0553831e90..be04e2652d 100644
--- a/tests/cases/standalone/common/types/json/json-structured.result
+++ b/tests/cases/standalone/common/types/json/json-structured.result
@@ -12,7 +12,7 @@ DESC TABLE t;
 | Column | Type                 | Key | Null | Default | Semantic Type |
 +--------+----------------------+-----+------+---------+---------------+
 | ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
-| j      | Json                 |     | YES  |         | FIELD         |
+| j      | Json<"">             |     | YES  |         | FIELD         |
 +--------+----------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
@@ -24,12 +24,12 @@ Affected Rows: 3
 
 DESC TABLE t;
 
-+--------+----------------------+-----+------+---------+---------------+
-| Column | Type                 | Key | Null | Default | Semantic Type |
-+--------+----------------------+-----+------+---------+---------------+
-| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
-| j      | Json                 |     | YES  |         | FIELD         |
-+--------+----------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------+-----+------+---------+---------------+
+| Column | Type                                                              | Key | Null | Default | Semantic Type |
++--------+-------------------------------------------------------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond                                              | PRI | NO   |         | TIMESTAMP     |
+| j      | Json<{"int":"","list":[""],"nested":{"a":{"x":""},"b":{"y":""}}}> |     | YES  |         | FIELD         |
++--------+-------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
     (1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
@@ -39,12 +39,12 @@ Affected Rows: 2
 
 DESC TABLE t;
 
-+--------+----------------------+-----+------+---------+---------------+
-| Column | Type                 | Key | Null | Default | Semantic Type |
-+--------+----------------------+-----+------+---------+---------------+
-| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
-| j      | Json                 |     | YES  |         | FIELD         |
-+--------+----------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| Column | Type                                                                                      | Key | Null | Default | Semantic Type |
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond                                                                      | PRI | NO   |         | TIMESTAMP     |
+| j      | Json<{"bool":"","int":"","list":[""],"nested":{"a":{"x":"","y":""},"b":{"x":"","y":""}}}> |     | YES  |         | FIELD         |
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
     (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
@@ -52,12 +52,12 @@ Affected Rows: 1
 
 DESC TABLE t;
 
-+--------+----------------------+-----+------+---------+---------------+
-| Column | Type                 | Key | Null | Default | Semantic Type |
-+--------+----------------------+-----+------+---------+---------------+
-| ts     | TimestampMillisecond | PRI | NO   |         | TIMESTAMP     |
-| j      | Json                 |     | YES  |         | FIELD         |
-+--------+----------------------+-----+------+---------+---------------+
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| Column | Type                                                                                      | Key | Null | Default | Semantic Type |
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+| ts     | TimestampMillisecond                                                                      | PRI | NO   |         | TIMESTAMP     |
+| j      | Json<{"bool":"","int":"","list":[""],"nested":{"a":{"x":"","y":""},"b":{"x":"","y":""}}}> |     | YES  |         | FIELD         |
++--------+-------------------------------------------------------------------------------------------+-----+------+---------+---------------+
 
 INSERT INTO t VALUES
     (1762128011000, '{}');