Compare commits

...

1 Commit

Author: luofucong
SHA1: e06ee80057
Message: feat: ingest jsonbench data through pipeline
Signed-off-by: luofucong <luofc@foxmail.com>
Date: 2025-11-28 20:07:26 +08:00
15 changed files with 540 additions and 155 deletions

1
Cargo.lock generated
View File

@@ -13076,6 +13076,7 @@ dependencies = [
"prost 0.13.5",
"query",
"rand 0.9.1",
"regex",
"rstest",
"rstest_reuse",
"sea-query",

View File

@@ -894,7 +894,7 @@ pub fn is_column_type_value_eq(
.unwrap_or(false)
}
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
fn helper(json: JsonVariant) -> v1::JsonValue {
let value = match json {
JsonVariant::Null => None,

View File

@@ -17,8 +17,8 @@ use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
})
}
/// Collects the [ColumnOptions] into a [Metadata] map that can be used in, for example, a [ColumnSchema].
pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
let mut metadata = Metadata::default();
let Some(ColumnOptions { options }) = column_options else {
return metadata;
};
if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) {
metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) {
metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) {
metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone());
}
if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) {
metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone());
}
metadata
}
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
let mut options = ColumnOptions::default();
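
For orientation (hypothetical code, not part of this change set), a minimal sketch of how the new collect_column_options helper could be exercised. It assumes the module's existing FULLTEXT_GRPC_KEY constant is in scope, that Metadata behaves like the string map it is used as above, and the option value is purely illustrative.

fn collect_column_options_sketch() {
    // Hypothetical fulltext option carried over gRPC (the value is illustrative).
    let mut column_options = ColumnOptions::default();
    column_options
        .options
        .insert(FULLTEXT_GRPC_KEY.to_string(), "{\"enable\":true}".to_string());

    // The gRPC-level key is translated into the schema-level FULLTEXT_KEY entry.
    let metadata = collect_column_options(Some(&column_options));
    assert!(metadata.contains_key(FULLTEXT_KEY));
}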

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
JsonNativeType::Null => write!(f, "Null"),
JsonNativeType::Bool => write!(f, "Bool"),
JsonNativeType::Number(t) => {
write!(f, "Number({t:?})")
}
JsonNativeType::String => write!(f, "String"),
JsonNativeType::Array(item_type) => {
write!(f, "Array[{}]", item_type)
}
JsonNativeType::Object(object) => {
write!(
f,
"Object{{{}}}",
fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
match t {
JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
JsonNativeType::Array(item_type) => {
serde_json::Value::Array(vec![to_serde_value(item_type)])
}
JsonNativeType::Object(object) => serde_json::Value::Object(
object
.iter()
.map(|(k, v)| format!(r#""{k}": {v}"#))
.collect::<Vec<_>>()
.join(", ")
)
.map(|(k, v)| (k.clone(), to_serde_value(v)))
.collect(),
),
}
}
write!(f, "{}", to_serde_value(self))
}
}
@@ -183,7 +179,11 @@ impl JsonType {
}
}
pub(crate) fn native_type(&self) -> &JsonNativeType {
pub fn is_native_type(&self) -> bool {
matches!(self.format, JsonFormat::Native(_))
}
pub fn native_type(&self) -> &JsonNativeType {
match &self.format {
JsonFormat::Jsonb => &JsonNativeType::String,
JsonFormat::Native(x) => x.as_ref(),

View File

@@ -15,6 +15,7 @@
use std::sync::Arc;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use api::helper::ColumnDataTypeWrapper;
use api::v1::alter_table_expr::Kind;
use api::v1::column_def::options_from_skipping;
use api::v1::region::{
@@ -23,7 +24,7 @@ use api::v1::region::{
};
use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
ModifyColumnType, ModifyColumnTypes, RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
@@ -40,6 +41,7 @@ use common_query::Output;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{error, info, warn};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::SkippingIndexOptions;
use futures_util::future;
use meter_macros::write_meter;
@@ -67,8 +69,9 @@ use table::requests::{
use table::table_reference::TableReference;
use crate::error::{
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
CatalogSnafu, ColumnDataTypeSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu,
FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result,
TableNotFoundSnafu,
};
use crate::expr_helper;
use crate::region_req_factory::RegionRequestFactory;
@@ -475,6 +478,7 @@ impl Inserter {
/// Creates or alters tables on demand:
/// - if the table does not exist, create it from the inferred CreateExpr
/// - if the table exists, check whether the schema matches. If any new column is found, alter the table by the inferred `AlterExpr`
/// or align the JSON columns' data types with the insert values.
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
@@ -559,6 +563,10 @@ impl Inserter {
)? {
alter_tables.push(alter_expr);
}
if let Some(expr) = maybe_alter_json_column_type(ctx, &table, req)? {
alter_tables.push(expr);
}
}
None => {
let create_expr =
@@ -981,6 +989,86 @@ impl Inserter {
}
}
fn maybe_alter_json_column_type(
query_context: &QueryContextRef,
table: &TableRef,
request: &RowInsertRequest,
) -> Result<Option<AlterTableExpr>> {
let Some(rows) = request.rows.as_ref() else {
return Ok(None);
};
// Fast path: skip altering json column type if insert request doesn't contain any json values.
let row_schema = &rows.schema;
if row_schema
.iter()
.all(|x| x.datatype() != ColumnDataType::Json)
{
return Ok(None);
}
let table_schema = table.schema_ref();
let mut modify_column_types = vec![];
for value_schema in row_schema {
if let Some(column_schema) = table_schema.column_schema_by_name(&value_schema.column_name)
&& let Some(column_type) = column_schema.data_type.as_json()
{
let value_type: ConcreteDataType = ColumnDataTypeWrapper::new(
value_schema.datatype(),
value_schema.datatype_extension.clone(),
)
.into();
let Some(value_type) = value_type.as_json() else {
return InvalidInsertRequestSnafu {
reason: format!(
"expecting json value for json column '{}', but found type {}",
column_schema.name, value_type
),
}
.fail();
};
if column_type.is_include(value_type) {
continue;
}
let merged = {
let mut column_type = column_type.clone();
column_type.merge(value_type).map_err(|e| {
InvalidInsertRequestSnafu {
reason: format!("insert json value is conflicting with column type: {e}"),
}
.build()
})?;
column_type
};
let (target_type, target_type_extension) =
ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(merged))
.map(|x| x.into_parts())
.context(ColumnDataTypeSnafu)?;
modify_column_types.push(ModifyColumnType {
column_name: column_schema.name.clone(),
target_type: target_type as i32,
target_type_extension,
});
}
}
if modify_column_types.is_empty() {
Ok(None)
} else {
Ok(Some(AlterTableExpr {
catalog_name: query_context.current_catalog().to_string(),
schema_name: query_context.current_schema(),
table_name: table.table_info().name.clone(),
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
modify_column_types,
})),
}))
}
}
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
for request in &requests.inserts {
let rows = request.rows.as_ref().unwrap();
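
To make the widening semantics behind maybe_alter_json_column_type easier to follow, here is a small self-contained sketch. Shape is a hypothetical stand-in for the native JSON type descriptor; the real JsonType::is_include/merge handle more cases and report conflicts as errors, so this only models the happy path.

use std::collections::BTreeMap;

// Hypothetical stand-in for the native JSON type descriptor.
#[derive(Clone, PartialEq, Debug)]
enum Shape {
    Number,
    String,
    Object(BTreeMap<String, Shape>),
}

impl Shape {
    // True if every part of `other` is already representable by `self`.
    fn is_include(&self, other: &Shape) -> bool {
        match (self, other) {
            (Shape::Object(a), Shape::Object(b)) => {
                b.iter().all(|(k, v)| a.get(k).is_some_and(|t| t.is_include(v)))
            }
            (a, b) => a == b,
        }
    }

    // Widen `self` so that it also covers `other` (new object keys are added).
    fn merge(&mut self, other: &Shape) {
        if let (Shape::Object(a), Shape::Object(b)) = (self, other) {
            for (k, v) in b {
                a.entry(k.clone())
                    .and_modify(|t| t.merge(v))
                    .or_insert_with(|| v.clone());
            }
        }
    }
}

fn widening_sketch() {
    // Column currently stores objects shaped like {"a": number}.
    let mut column = Shape::Object(BTreeMap::from([("a".to_string(), Shape::Number)]));
    // An incoming row carries {"a": number, "b": string}.
    let value = Shape::Object(BTreeMap::from([
        ("a".to_string(), Shape::Number),
        ("b".to_string(), Shape::String),
    ]));
    if !column.is_include(&value) {
        // In the inserter, this is where a ModifyColumnTypes alter would be issued.
        column.merge(&value);
    }
    assert!(column.is_include(&value));
}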

View File

@@ -776,6 +776,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Datatypes {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -893,6 +907,9 @@ impl ErrorExt for Error {
FloatIsNan { .. }
| InvalidEpochForResolution { .. }
| UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
GreptimeProto { source, .. } => source.status_code(),
Datatypes { source, .. } => source.status_code(),
}
}

View File

@@ -19,13 +19,16 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::helper::{ColumnDataTypeWrapper, encode_json_value};
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use api::v1::{ColumnDataType, SemanticType};
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use datatypes::data_type::ConcreteDataType;
use datatypes::json::JsonStructureSettings;
use datatypes::value::Value;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
@@ -33,12 +36,14 @@ use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use table::Table;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::value::StdError;
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
CoerceIncompatibleTypesSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
@@ -269,15 +274,75 @@ impl GreptimeTransformer {
}
}
#[derive(Clone)]
pub struct ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema,
semantic_type: SemanticType,
}
impl From<ColumnSchema> for ColumnMetadata {
fn from(value: ColumnSchema) -> Self {
let datatype = value.datatype();
let semantic_type = value.semantic_type();
let ColumnSchema {
column_name,
datatype: _,
semantic_type: _,
datatype_extension,
options,
} = value;
let column_schema = datatypes::schema::ColumnSchema::new(
column_name,
ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
semantic_type != SemanticType::Timestamp,
);
let metadata = collect_column_options(options.as_ref());
let column_schema = column_schema.with_metadata(metadata);
Self {
column_schema,
semantic_type,
}
}
}
impl TryFrom<ColumnMetadata> for ColumnSchema {
type Error = api::error::Error;
fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
let ColumnMetadata {
column_schema,
semantic_type,
} = value;
let options = options_from_column_schema(&column_schema);
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
Ok(ColumnSchema {
column_name: column_schema.name,
datatype: datatype as _,
semantic_type: semantic_type as _,
datatype_extension,
options,
})
}
}
/// This is used to record the current state schema information and a sequential cache of field names.
/// As you traverse the user input JSON, this will change.
/// It will record a superset of all user input schemas.
#[derive(Debug, Default)]
#[derive(Default)]
pub struct SchemaInfo {
/// schema info
pub schema: Vec<ColumnSchema>,
pub schema: Vec<ColumnMetadata>,
/// index of the column name
pub index: HashMap<String, usize>,
/// The pipeline's corresponding table (if already created). Useful for retrieving column schemas.
table: Option<Arc<Table>>,
}
impl SchemaInfo {
@@ -285,6 +350,7 @@ impl SchemaInfo {
Self {
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
table: None,
}
}
@@ -294,46 +360,91 @@ impl SchemaInfo {
index.insert(schema.column_name.clone(), i);
}
Self {
schema: schema_list,
schema: schema_list.into_iter().map(Into::into).collect(),
index,
table: None,
}
}
pub fn set_table(&mut self, table: Option<Arc<Table>>) {
self.table = table;
}
fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
if let Some(table) = &self.table
&& let Some(i) = table.schema_ref().column_index_by_name(column_name)
{
let column_schema = table.schema_ref().column_schemas()[i].clone();
let semantic_type = if column_schema.is_time_index() {
SemanticType::Timestamp
} else if table.table_info().meta.primary_key_indices.contains(&i) {
SemanticType::Tag
} else {
SemanticType::Field
};
Some(ColumnMetadata {
column_schema,
semantic_type,
})
} else {
None
}
}
pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
self.schema
.iter()
.map(|x| x.clone().try_into())
.collect::<api::error::Result<Vec<_>>>()
}
}
fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
pipeline_context: &PipelineContext,
column: &str,
value_type: &ConcreteDataType,
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
// Safety unwrap is fine here because api_value is always valid
let value_column_data_type = proto_value_type(&api_value).unwrap();
// Safety unwrap is fine here because index is always valid
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type != schema_column_data_type {
IdentifyPipelineColumnTypeMismatchSnafu {
column: column_schema.column_name,
expected: schema_column_data_type.as_str_name(),
actual: value_column_data_type.as_str_name(),
let column_type = &mut schema_info.schema[index].column_schema.data_type;
if value_type != column_type {
if let ConcreteDataType::Json(value_type) = value_type
&& let ConcreteDataType::Json(column_type) = column_type
{
if !column_type.is_include(value_type) {
column_type.merge(value_type)?;
}
} else {
return IdentifyPipelineColumnTypeMismatchSnafu {
column,
expected: column_type.to_string(),
actual: value_type.to_string(),
}
.fail();
}
.fail()
} else {
row[index] = api_value;
Ok(())
}
Ok(())
} else {
let key = column_schema.column_name.clone();
let column_schema = schema_info
.find_column_schema_in_table(column)
.unwrap_or_else(|| {
let semantic_type = decide_semantic(pipeline_context, column);
let column_schema = datatypes::schema::ColumnSchema::new(
column,
value_type.clone(),
semantic_type != SemanticType::Timestamp,
);
ColumnMetadata {
column_schema,
semantic_type,
}
});
let key = column.to_string();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
Ok(())
}
}
@@ -411,11 +522,11 @@ pub(crate) fn values_to_row(
Ok(Row { values: row })
}
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
SemanticType::Tag as i32
SemanticType::Tag
} else {
SemanticType::Field as i32
SemanticType::Field
}
}
@@ -427,55 +538,54 @@ fn resolve_value(
p_ctx: &PipelineContext,
) -> Result<()> {
let index = schema_info.index.get(&column_name).copied();
let mut resolve_simple_type =
|value_data: ValueData, column_name: String, data_type: ColumnDataType| {
let semantic_type = decide_semantic(p_ctx, &column_name);
resolve_schema(
index,
value_data,
ColumnSchema {
column_name,
datatype: data_type as i32,
semantic_type,
datatype_extension: None,
options: None,
},
row,
schema_info,
)
};
match value {
VrlValue::Null => {}
let value_data = match value {
VrlValue::Null => None,
VrlValue::Integer(v) => {
// safe unwrap after type matched
resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::int64_datatype(),
schema_info,
)?;
Some(ValueData::I64Value(v))
}
VrlValue::Float(v) => {
// safe unwrap after type matched
resolve_simple_type(
ValueData::F64Value(v.into()),
column_name,
ColumnDataType::Float64,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::float64_datatype(),
schema_info,
)?;
Some(ValueData::F64Value(v.into()))
}
VrlValue::Boolean(v) => {
resolve_simple_type(
ValueData::BoolValue(v),
column_name,
ColumnDataType::Boolean,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::boolean_datatype(),
schema_info,
)?;
Some(ValueData::BoolValue(v))
}
VrlValue::Bytes(v) => {
resolve_simple_type(
ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::binary_datatype(),
schema_info,
)?;
Some(ValueData::BinaryValue(v.into()))
}
VrlValue::Regex(v) => {
@@ -483,42 +593,75 @@ fn resolve_value(
"Persisting regex value in the table, this should not happen, column_name: {}",
column_name
);
resolve_simple_type(
ValueData::StringValue(v.to_string()),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::string_datatype(),
schema_info,
)?;
Some(ValueData::StringValue(v.to_string()))
}
VrlValue::Timestamp(ts) => {
let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
input: ts.to_rfc3339(),
})?;
resolve_simple_type(
ValueData::TimestampNanosecondValue(ns),
column_name,
ColumnDataType::TimestampNanosecond,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::timestamp_nanosecond_datatype(),
schema_info,
)?;
Some(ValueData::TimestampNanosecondValue(ns))
}
VrlValue::Array(_) | VrlValue::Object(_) => {
let data = vrl_value_to_jsonb_value(&value);
resolve_schema(
index,
ValueData::BinaryValue(data.to_vec()),
ColumnSchema {
column_name,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
row,
schema_info,
)?;
let is_json_native_type = schema_info
.find_column_schema_in_table(&column_name)
.is_some_and(|x| {
if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
column_type.is_native_type()
} else {
false
}
});
let value = if is_json_native_type {
let settings = JsonStructureSettings::Structured(None);
let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
})?;
let value = settings.encode(value)?;
resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
let Value::Json(value) = value else {
unreachable!()
};
ValueData::JsonValue(encode_json_value(*value))
} else {
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::json_datatype(),
schema_info,
)?;
let value = vrl_value_to_jsonb_value(&value);
ValueData::BinaryValue(value.to_vec())
};
Some(value)
}
};
let value = GreptimeValue { value_data };
if let Some(index) = index {
row[index] = value;
} else {
row.push(value);
}
Ok(())
}
@@ -556,20 +699,24 @@ fn identity_pipeline_inner(
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// set time index column schema first
schema_info.schema.push(ColumnSchema {
column_name: custom_ts
let column_schema = datatypes::schema::ColumnSchema::new(
custom_ts
.map(|ts| ts.get_column_name().to_string())
.unwrap_or_else(|| greptime_timestamp().to_string()),
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ColumnDataType::TimestampMillisecond
} else {
ColumnDataType::TimestampNanosecond
}
}) as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
custom_ts
.map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
.unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ConcreteDataType::timestamp_millisecond_datatype()
} else {
ConcreteDataType::timestamp_nanosecond_datatype()
}
}),
false,
);
schema_info.schema.push(ColumnMetadata {
column_schema,
semantic_type: SemanticType::Timestamp,
});
let mut opt_map = HashMap::new();
@@ -627,28 +774,29 @@ pub fn identity_pipeline(
input.push(result);
}
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
if let Some(table) = table {
let table_info = table.table_info();
for tag_name in table_info.meta.row_key_column_names() {
if let Some(index) = schema.index.get(tag_name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
}
opt_map
let column_schemas = schema.column_schemas()?;
Ok(opt_map
.into_iter()
.map(|(opt, rows)| {
(
opt,
Rows {
schema: schema.schema.clone(),
schema: column_schemas.clone(),
rows,
},
)
})
.collect::<HashMap<ContextOpt, Rows>>()
.collect::<HashMap<ContextOpt, Rows>>())
})
}
@@ -872,7 +1020,7 @@ mod tests {
.map(|(mut schema, mut rows)| {
for name in tag_column_names {
if let Some(index) = schema.index.get(&name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
@@ -880,7 +1028,7 @@ mod tests {
let rows = rows.remove(&ContextOpt::default()).unwrap();
Rows {
schema: schema.schema,
schema: schema.column_schemas().unwrap(),
rows,
}
});
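
As a usage note (hypothetical code, not part of the diff): the new ColumnMetadata gives a two-way bridge between the proto ColumnSchema and the internal column schema plus semantic type. A minimal round-trip sketch, assuming the same imports as this file; "host" is an illustrative column name.

fn column_metadata_round_trip() -> api::error::Result<()> {
    // A proto column schema as it might arrive from a pipeline request.
    let pb = ColumnSchema {
        column_name: "host".to_string(),
        datatype: ColumnDataType::String.into(),
        semantic_type: SemanticType::Tag.into(),
        datatype_extension: None,
        options: None,
    };

    // Into the internal representation used by SchemaInfo...
    let meta = ColumnMetadata::from(pb);
    // ...and back to the proto form when building Rows.
    let back: ColumnSchema = meta.try_into()?;
    assert_eq!(back.column_name, "host");
    Ok(())
}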

View File

@@ -57,7 +57,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
}
Rows {
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas().unwrap(),
rows,
}
}

View File

@@ -651,6 +651,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -777,6 +784,8 @@ impl ErrorExt for Error {
HandleOtelArrowRequest { .. } => StatusCode::Internal,
Cancelled { .. } => StatusCode::Cancelled,
GreptimeProto { source, .. } => source.status_code(),
}
}

View File

@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{debug, error, warn};
use common_telemetry::{error, warn};
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
debug!(
"receiving logs: {:?}",
serde_json::to_string(&value).unwrap()
);
query_ctx.set_channel(Channel::Log);
let query_ctx = Arc::new(query_ctx);

View File

@@ -152,7 +152,7 @@ pub async fn loki_ingest(
rows.push(row);
}
let schemas = schema_info.schema;
let schemas = schema_info.column_schemas()?;
// fill Null for missing values
for row in rows.iter_mut() {
row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
} else {
// not exist
// add schema and append to values
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
schemas.push(
ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
}
.into(),
);
column_indexer.insert(k, schemas.len() - 1);
row.push(GreptimeValue {

View File

@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
if let Some(index) = select_schema.index.get(key) {
let column_schema = &select_schema.schema[*index];
let column_schema: ColumnSchema = column_schema.clone().try_into()?;
// datatype of the same column name should be the same
ensure!(
column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
);
extracted_values[*index] = value;
} else {
select_schema.schema.push(schema);
select_schema.schema.push(schema.into());
select_schema
.index
.insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
let mut parse_ctx = ParseContext::new(select_info);
let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
schemas.extend(parse_ctx.select_schema.schema);
schemas.extend(parse_ctx.select_schema.column_schemas()?);
rows.iter_mut().for_each(|row| {
row.values.resize(schemas.len(), GreptimeValue::default());

View File

@@ -136,12 +136,18 @@ async fn run_custom_pipeline(
let mut schema_info = SchemaInfo::default();
schema_info
.schema
.push(time_index_column_schema(ts_name, timeunit));
.push(time_index_column_schema(ts_name, timeunit).into());
schema_info
}
};
let table = handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
schema_info.set_table(table);
for pipeline_map in pipeline_maps {
let result = pipeline
.exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -186,7 +192,7 @@ async fn run_custom_pipeline(
RowInsertRequest {
rows: Some(Rows {
rows,
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas()?,
}),
table_name,
},

View File

@@ -105,6 +105,7 @@ paste.workspace = true
pipeline.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }

View File

@@ -42,6 +42,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
use prost::Message;
use regex::Regex;
use serde_json::{Value, json};
use servers::http::GreptimeQueryOutput;
use servers::http::handler::HealthResponse;
@@ -126,6 +127,7 @@ macro_rules! http_tests {
test_pipeline_skip_error,
test_pipeline_filter,
test_pipeline_create_table,
test_pipeline_ingest_jsonbench_data,
test_otlp_metrics_new,
test_otlp_traces_v0,
@@ -2701,6 +2703,95 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_ingest_jsonbench_data(store_type: StorageType) {
let (app, mut guard) =
setup_test_http_app_with_frontend(store_type, "test_pipeline_ingest_jsonbench_data").await;
let client = TestClient::new(app).await;
// Create the pipeline for ingesting jsonbench data.
let pipeline = r#"
version: 2
processors:
- json_parse:
fields:
- message, log
ignore_missing: true
- simple_extract:
fields:
- log, time_us
key: "time_us"
ignore_missing: false
- epoch:
fields:
- time_us
resolution: microsecond
- select:
fields:
- time_us
- log
transform:
- fields:
- time_us
type: epoch, us
index: timestamp
"#;
let response = client
.post("/v1/pipelines/jsonbench")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let pattern =
r#"\{"pipelines":\[\{"name":"jsonbench","version":"[^"]*"}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
// Create the table for storing jsonbench data.
let sql = r#"
CREATE TABLE jsonbench(time_us TimestampMicrosecond TIME INDEX, `log` Json())
"#;
let response = client
.post("/v1/sql")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(format!("sql={sql}"))
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let pattern = r#"\{"output":\[\{"affectedrows":0}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
// Start ingesting jsonbench data.
    // The input file contains only the first 100 lines of the full jsonbench test dataset.
let path = common_test_util::find_workspace_path(
"/tests-integration/resources/jsonbench-head-100.ndjson",
);
    // The jsonbench data contains some malformed JSON lines that are intentionally skipped on insert.
let skip_error = true;
let response = client
.post(&format!(
"/v1/ingest?table=jsonbench&pipeline_name=jsonbench&skip_error={skip_error}"
))
.header("Content-Type", "text/plain")
.body(std::fs::read(path).unwrap())
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
    // Note that this pattern also asserts the number of inserted rows: 74.
let pattern = r#"\{"output":\[\{"affectedrows":74}],"execution_time_ms":\d+}"#
.parse::<Regex>()
.unwrap();
assert!(pattern.is_match(&response.text().await));
guard.remove_all().await;
}
pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =