mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: ingest jsonbench data through pipeline
Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -13076,6 +13076,7 @@ dependencies = [
|
||||
"prost 0.13.5",
|
||||
"query",
|
||||
"rand 0.9.1",
|
||||
"regex",
|
||||
"rstest",
|
||||
"rstest_reuse",
|
||||
"sea-query",
|
||||
|
||||
@@ -894,7 +894,7 @@ pub fn is_column_type_value_eq(
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
|
||||
pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
|
||||
fn helper(json: JsonVariant) -> v1::JsonValue {
|
||||
let value = match json {
|
||||
JsonVariant::Null => None,
|
||||
|
||||
@@ -17,8 +17,8 @@ use std::collections::HashMap;
|
||||
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
|
||||
use datatypes::schema::{
|
||||
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
|
||||
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
|
||||
SkippingIndexType,
|
||||
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
|
||||
SkippingIndexOptions, SkippingIndexType,
|
||||
};
|
||||
use greptime_proto::v1::{
|
||||
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
|
||||
@@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
|
||||
})
|
||||
}
|
||||
|
||||
/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
|
||||
pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
|
||||
let mut metadata = Metadata::default();
|
||||
let Some(ColumnOptions { options }) = column_options else {
|
||||
return metadata;
|
||||
};
|
||||
|
||||
if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
|
||||
metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
|
||||
}
|
||||
if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) {
|
||||
metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone());
|
||||
}
|
||||
if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) {
|
||||
metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone());
|
||||
}
|
||||
if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) {
|
||||
metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone());
|
||||
}
|
||||
if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) {
|
||||
metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone());
|
||||
}
|
||||
metadata
|
||||
}
|
||||
|
||||
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
|
||||
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
|
||||
let mut options = ColumnOptions::default();
|
||||
|
||||
@@ -13,7 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
|
||||
|
||||
impl Display for JsonNativeType {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
JsonNativeType::Null => write!(f, "Null"),
|
||||
JsonNativeType::Bool => write!(f, "Bool"),
|
||||
JsonNativeType::Number(t) => {
|
||||
write!(f, "Number({t:?})")
|
||||
}
|
||||
JsonNativeType::String => write!(f, "String"),
|
||||
JsonNativeType::Array(item_type) => {
|
||||
write!(f, "Array[{}]", item_type)
|
||||
}
|
||||
JsonNativeType::Object(object) => {
|
||||
write!(
|
||||
f,
|
||||
"Object{{{}}}",
|
||||
fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
|
||||
match t {
|
||||
JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
|
||||
JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
|
||||
JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
|
||||
JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
|
||||
JsonNativeType::Array(item_type) => {
|
||||
serde_json::Value::Array(vec![to_serde_value(item_type)])
|
||||
}
|
||||
JsonNativeType::Object(object) => serde_json::Value::Object(
|
||||
object
|
||||
.iter()
|
||||
.map(|(k, v)| format!(r#""{k}": {v}"#))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
)
|
||||
.map(|(k, v)| (k.clone(), to_serde_value(v)))
|
||||
.collect(),
|
||||
),
|
||||
}
|
||||
}
|
||||
write!(f, "{}", to_serde_value(self))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -183,7 +179,11 @@ impl JsonType {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn native_type(&self) -> &JsonNativeType {
|
||||
pub fn is_native_type(&self) -> bool {
|
||||
matches!(self.format, JsonFormat::Native(_))
|
||||
}
|
||||
|
||||
pub fn native_type(&self) -> &JsonNativeType {
|
||||
match &self.format {
|
||||
JsonFormat::Jsonb => &JsonNativeType::String,
|
||||
JsonFormat::Native(x) => x.as_ref(),
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::alter_table_expr::Kind;
|
||||
use api::v1::column_def::options_from_skipping;
|
||||
use api::v1::region::{
|
||||
@@ -23,7 +24,7 @@ use api::v1::region::{
|
||||
};
|
||||
use api::v1::{
|
||||
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
|
||||
RowInsertRequest, RowInsertRequests, SemanticType,
|
||||
ModifyColumnType, ModifyColumnTypes, RowInsertRequest, RowInsertRequests, SemanticType,
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::{OutputData, OutputMeta};
|
||||
@@ -40,6 +41,7 @@ use common_query::Output;
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use common_telemetry::tracing_context::TracingContext;
|
||||
use common_telemetry::{error, info, warn};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::SkippingIndexOptions;
|
||||
use futures_util::future;
|
||||
use meter_macros::write_meter;
|
||||
@@ -67,8 +69,9 @@ use table::requests::{
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
|
||||
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
|
||||
CatalogSnafu, ColumnDataTypeSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu,
|
||||
FindRegionLeaderSnafu, InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result,
|
||||
TableNotFoundSnafu,
|
||||
};
|
||||
use crate::expr_helper;
|
||||
use crate::region_req_factory::RegionRequestFactory;
|
||||
@@ -475,6 +478,7 @@ impl Inserter {
|
||||
/// Creates or alter tables on demand:
|
||||
/// - if table does not exist, create table by inferred CreateExpr
|
||||
/// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
|
||||
/// or align the json columns' datatypes with insert values.
|
||||
///
|
||||
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
|
||||
/// This mapping is used in the conversion of RowToRegion.
|
||||
@@ -559,6 +563,10 @@ impl Inserter {
|
||||
)? {
|
||||
alter_tables.push(alter_expr);
|
||||
}
|
||||
|
||||
if let Some(expr) = maybe_alter_json_column_type(ctx, &table, req)? {
|
||||
alter_tables.push(expr);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
let create_expr =
|
||||
@@ -981,6 +989,86 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
fn maybe_alter_json_column_type(
|
||||
query_context: &QueryContextRef,
|
||||
table: &TableRef,
|
||||
request: &RowInsertRequest,
|
||||
) -> Result<Option<AlterTableExpr>> {
|
||||
let Some(rows) = request.rows.as_ref() else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
// Fast path: skip altering json column type if insert request doesn't contain any json values.
|
||||
let row_schema = &rows.schema;
|
||||
if row_schema
|
||||
.iter()
|
||||
.all(|x| x.datatype() != ColumnDataType::Json)
|
||||
{
|
||||
return Ok(None);
|
||||
}
|
||||
let table_schema = table.schema_ref();
|
||||
let mut modify_column_types = vec![];
|
||||
|
||||
for value_schema in row_schema {
|
||||
if let Some(column_schema) = table_schema.column_schema_by_name(&value_schema.column_name)
|
||||
&& let Some(column_type) = column_schema.data_type.as_json()
|
||||
{
|
||||
let value_type: ConcreteDataType = ColumnDataTypeWrapper::new(
|
||||
value_schema.datatype(),
|
||||
value_schema.datatype_extension.clone(),
|
||||
)
|
||||
.into();
|
||||
let Some(value_type) = value_type.as_json() else {
|
||||
return InvalidInsertRequestSnafu {
|
||||
reason: format!(
|
||||
"expecting json value for json column '{}', but found type {}",
|
||||
column_schema.name, value_type
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
if column_type.is_include(value_type) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let merged = {
|
||||
let mut column_type = column_type.clone();
|
||||
column_type.merge(value_type).map_err(|e| {
|
||||
InvalidInsertRequestSnafu {
|
||||
reason: format!("insert json value is conflicting with column type: {e}"),
|
||||
}
|
||||
.build()
|
||||
})?;
|
||||
column_type
|
||||
};
|
||||
|
||||
let (target_type, target_type_extension) =
|
||||
ColumnDataTypeWrapper::try_from(ConcreteDataType::Json(merged))
|
||||
.map(|x| x.into_parts())
|
||||
.context(ColumnDataTypeSnafu)?;
|
||||
modify_column_types.push(ModifyColumnType {
|
||||
column_name: column_schema.name.clone(),
|
||||
target_type: target_type as i32,
|
||||
target_type_extension,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if modify_column_types.is_empty() {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(AlterTableExpr {
|
||||
catalog_name: query_context.current_catalog().to_string(),
|
||||
schema_name: query_context.current_schema(),
|
||||
table_name: table.table_info().name.clone(),
|
||||
kind: Some(Kind::ModifyColumnTypes(ModifyColumnTypes {
|
||||
modify_column_types,
|
||||
})),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
|
||||
for request in &requests.inserts {
|
||||
let rows = request.rows.as_ref().unwrap();
|
||||
|
||||
@@ -776,6 +776,20 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(transparent)]
|
||||
GreptimeProto {
|
||||
source: api::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(transparent)]
|
||||
Datatypes {
|
||||
source: datatypes::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -893,6 +907,9 @@ impl ErrorExt for Error {
|
||||
FloatIsNan { .. }
|
||||
| InvalidEpochForResolution { .. }
|
||||
| UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
GreptimeProto { source, .. } => source.status_code(),
|
||||
Datatypes { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,13 +19,16 @@ use std::collections::{BTreeMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::helper::proto_value_type;
|
||||
use api::v1::column_data_type_extension::TypeExt;
|
||||
use api::helper::{ColumnDataTypeWrapper, encode_json_value};
|
||||
use api::v1::column_def::{collect_column_options, options_from_column_schema};
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
|
||||
use api::v1::{ColumnDataType, SemanticType};
|
||||
use coerce::{coerce_columns, coerce_value};
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use common_telemetry::warn;
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::json::JsonStructureSettings;
|
||||
use datatypes::value::Value;
|
||||
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
|
||||
use itertools::Itertools;
|
||||
use jsonb::Number;
|
||||
@@ -33,12 +36,14 @@ use once_cell::sync::OnceCell;
|
||||
use serde_json as serde_json_crate;
|
||||
use session::context::Channel;
|
||||
use snafu::OptionExt;
|
||||
use table::Table;
|
||||
use vrl::prelude::{Bytes, VrlValueConvert};
|
||||
use vrl::value::value::StdError;
|
||||
use vrl::value::{KeyString, Value as VrlValue};
|
||||
|
||||
use crate::error::{
|
||||
IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
|
||||
TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
|
||||
CoerceIncompatibleTypesSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
|
||||
Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
|
||||
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
|
||||
};
|
||||
use crate::etl::PipelineDocVersion;
|
||||
@@ -269,15 +274,75 @@ impl GreptimeTransformer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ColumnMetadata {
|
||||
column_schema: datatypes::schema::ColumnSchema,
|
||||
semantic_type: SemanticType,
|
||||
}
|
||||
|
||||
impl From<ColumnSchema> for ColumnMetadata {
|
||||
fn from(value: ColumnSchema) -> Self {
|
||||
let datatype = value.datatype();
|
||||
let semantic_type = value.semantic_type();
|
||||
let ColumnSchema {
|
||||
column_name,
|
||||
datatype: _,
|
||||
semantic_type: _,
|
||||
datatype_extension,
|
||||
options,
|
||||
} = value;
|
||||
|
||||
let column_schema = datatypes::schema::ColumnSchema::new(
|
||||
column_name,
|
||||
ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
|
||||
semantic_type != SemanticType::Timestamp,
|
||||
);
|
||||
|
||||
let metadata = collect_column_options(options.as_ref());
|
||||
let column_schema = column_schema.with_metadata(metadata);
|
||||
|
||||
Self {
|
||||
column_schema,
|
||||
semantic_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<ColumnMetadata> for ColumnSchema {
|
||||
type Error = api::error::Error;
|
||||
|
||||
fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
|
||||
let ColumnMetadata {
|
||||
column_schema,
|
||||
semantic_type,
|
||||
} = value;
|
||||
|
||||
let options = options_from_column_schema(&column_schema);
|
||||
|
||||
let (datatype, datatype_extension) =
|
||||
ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
|
||||
|
||||
Ok(ColumnSchema {
|
||||
column_name: column_schema.name,
|
||||
datatype: datatype as _,
|
||||
semantic_type: semantic_type as _,
|
||||
datatype_extension,
|
||||
options,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// This is used to record the current state schema information and a sequential cache of field names.
|
||||
/// As you traverse the user input JSON, this will change.
|
||||
/// It will record a superset of all user input schemas.
|
||||
#[derive(Debug, Default)]
|
||||
#[derive(Default)]
|
||||
pub struct SchemaInfo {
|
||||
/// schema info
|
||||
pub schema: Vec<ColumnSchema>,
|
||||
pub schema: Vec<ColumnMetadata>,
|
||||
/// index of the column name
|
||||
pub index: HashMap<String, usize>,
|
||||
/// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
|
||||
table: Option<Arc<Table>>,
|
||||
}
|
||||
|
||||
impl SchemaInfo {
|
||||
@@ -285,6 +350,7 @@ impl SchemaInfo {
|
||||
Self {
|
||||
schema: Vec::with_capacity(capacity),
|
||||
index: HashMap::with_capacity(capacity),
|
||||
table: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,46 +360,91 @@ impl SchemaInfo {
|
||||
index.insert(schema.column_name.clone(), i);
|
||||
}
|
||||
Self {
|
||||
schema: schema_list,
|
||||
schema: schema_list.into_iter().map(Into::into).collect(),
|
||||
index,
|
||||
table: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_table(&mut self, table: Option<Arc<Table>>) {
|
||||
self.table = table;
|
||||
}
|
||||
|
||||
fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
|
||||
if let Some(table) = &self.table
|
||||
&& let Some(i) = table.schema_ref().column_index_by_name(column_name)
|
||||
{
|
||||
let column_schema = table.schema_ref().column_schemas()[i].clone();
|
||||
|
||||
let semantic_type = if column_schema.is_time_index() {
|
||||
SemanticType::Timestamp
|
||||
} else if table.table_info().meta.primary_key_indices.contains(&i) {
|
||||
SemanticType::Tag
|
||||
} else {
|
||||
SemanticType::Field
|
||||
};
|
||||
|
||||
Some(ColumnMetadata {
|
||||
column_schema,
|
||||
semantic_type,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
|
||||
self.schema
|
||||
.iter()
|
||||
.map(|x| x.clone().try_into())
|
||||
.collect::<api::error::Result<Vec<_>>>()
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_schema(
|
||||
index: Option<usize>,
|
||||
value_data: ValueData,
|
||||
column_schema: ColumnSchema,
|
||||
row: &mut Vec<GreptimeValue>,
|
||||
pipeline_context: &PipelineContext,
|
||||
column: &str,
|
||||
value_type: &ConcreteDataType,
|
||||
schema_info: &mut SchemaInfo,
|
||||
) -> Result<()> {
|
||||
if let Some(index) = index {
|
||||
let api_value = GreptimeValue {
|
||||
value_data: Some(value_data),
|
||||
};
|
||||
// Safety unwrap is fine here because api_value is always valid
|
||||
let value_column_data_type = proto_value_type(&api_value).unwrap();
|
||||
// Safety unwrap is fine here because index is always valid
|
||||
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
|
||||
if value_column_data_type != schema_column_data_type {
|
||||
IdentifyPipelineColumnTypeMismatchSnafu {
|
||||
column: column_schema.column_name,
|
||||
expected: schema_column_data_type.as_str_name(),
|
||||
actual: value_column_data_type.as_str_name(),
|
||||
let column_type = &mut schema_info.schema[index].column_schema.data_type;
|
||||
if value_type != column_type {
|
||||
if let ConcreteDataType::Json(value_type) = value_type
|
||||
&& let ConcreteDataType::Json(column_type) = column_type
|
||||
{
|
||||
if !column_type.is_include(value_type) {
|
||||
column_type.merge(value_type)?;
|
||||
}
|
||||
} else {
|
||||
return IdentifyPipelineColumnTypeMismatchSnafu {
|
||||
column,
|
||||
expected: column_type.to_string(),
|
||||
actual: value_type.to_string(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
.fail()
|
||||
} else {
|
||||
row[index] = api_value;
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
let key = column_schema.column_name.clone();
|
||||
let column_schema = schema_info
|
||||
.find_column_schema_in_table(column)
|
||||
.unwrap_or_else(|| {
|
||||
let semantic_type = decide_semantic(pipeline_context, column);
|
||||
let column_schema = datatypes::schema::ColumnSchema::new(
|
||||
column,
|
||||
value_type.clone(),
|
||||
semantic_type != SemanticType::Timestamp,
|
||||
);
|
||||
ColumnMetadata {
|
||||
column_schema,
|
||||
semantic_type,
|
||||
}
|
||||
});
|
||||
let key = column.to_string();
|
||||
schema_info.schema.push(column_schema);
|
||||
schema_info.index.insert(key, schema_info.schema.len() - 1);
|
||||
let api_value = GreptimeValue {
|
||||
value_data: Some(value_data),
|
||||
};
|
||||
row.push(api_value);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -411,11 +522,11 @@ pub(crate) fn values_to_row(
|
||||
Ok(Row { values: row })
|
||||
}
|
||||
|
||||
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
|
||||
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
|
||||
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
|
||||
SemanticType::Tag as i32
|
||||
SemanticType::Tag
|
||||
} else {
|
||||
SemanticType::Field as i32
|
||||
SemanticType::Field
|
||||
}
|
||||
}
|
||||
|
||||
@@ -427,55 +538,54 @@ fn resolve_value(
|
||||
p_ctx: &PipelineContext,
|
||||
) -> Result<()> {
|
||||
let index = schema_info.index.get(&column_name).copied();
|
||||
let mut resolve_simple_type =
|
||||
|value_data: ValueData, column_name: String, data_type: ColumnDataType| {
|
||||
let semantic_type = decide_semantic(p_ctx, &column_name);
|
||||
resolve_schema(
|
||||
index,
|
||||
value_data,
|
||||
ColumnSchema {
|
||||
column_name,
|
||||
datatype: data_type as i32,
|
||||
semantic_type,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
},
|
||||
row,
|
||||
schema_info,
|
||||
)
|
||||
};
|
||||
|
||||
match value {
|
||||
VrlValue::Null => {}
|
||||
let value_data = match value {
|
||||
VrlValue::Null => None,
|
||||
|
||||
VrlValue::Integer(v) => {
|
||||
// safe unwrap after type matched
|
||||
resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::int64_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::I64Value(v))
|
||||
}
|
||||
|
||||
VrlValue::Float(v) => {
|
||||
// safe unwrap after type matched
|
||||
resolve_simple_type(
|
||||
ValueData::F64Value(v.into()),
|
||||
column_name,
|
||||
ColumnDataType::Float64,
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::float64_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::F64Value(v.into()))
|
||||
}
|
||||
|
||||
VrlValue::Boolean(v) => {
|
||||
resolve_simple_type(
|
||||
ValueData::BoolValue(v),
|
||||
column_name,
|
||||
ColumnDataType::Boolean,
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::boolean_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::BoolValue(v))
|
||||
}
|
||||
|
||||
VrlValue::Bytes(v) => {
|
||||
resolve_simple_type(
|
||||
ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
|
||||
column_name,
|
||||
ColumnDataType::String,
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::binary_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::BinaryValue(v.into()))
|
||||
}
|
||||
|
||||
VrlValue::Regex(v) => {
|
||||
@@ -483,42 +593,75 @@ fn resolve_value(
|
||||
"Persisting regex value in the table, this should not happen, column_name: {}",
|
||||
column_name
|
||||
);
|
||||
resolve_simple_type(
|
||||
ValueData::StringValue(v.to_string()),
|
||||
column_name,
|
||||
ColumnDataType::String,
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::string_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::StringValue(v.to_string()))
|
||||
}
|
||||
|
||||
VrlValue::Timestamp(ts) => {
|
||||
let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
|
||||
input: ts.to_rfc3339(),
|
||||
})?;
|
||||
resolve_simple_type(
|
||||
ValueData::TimestampNanosecondValue(ns),
|
||||
column_name,
|
||||
ColumnDataType::TimestampNanosecond,
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::timestamp_nanosecond_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
Some(ValueData::TimestampNanosecondValue(ns))
|
||||
}
|
||||
|
||||
VrlValue::Array(_) | VrlValue::Object(_) => {
|
||||
let data = vrl_value_to_jsonb_value(&value);
|
||||
resolve_schema(
|
||||
index,
|
||||
ValueData::BinaryValue(data.to_vec()),
|
||||
ColumnSchema {
|
||||
column_name,
|
||||
datatype: ColumnDataType::Binary as i32,
|
||||
semantic_type: SemanticType::Field as i32,
|
||||
datatype_extension: Some(ColumnDataTypeExtension {
|
||||
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
|
||||
}),
|
||||
options: None,
|
||||
},
|
||||
row,
|
||||
schema_info,
|
||||
)?;
|
||||
let is_json_native_type = schema_info
|
||||
.find_column_schema_in_table(&column_name)
|
||||
.is_some_and(|x| {
|
||||
if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
|
||||
column_type.is_native_type()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
let value = if is_json_native_type {
|
||||
let settings = JsonStructureSettings::Structured(None);
|
||||
let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
|
||||
CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
|
||||
})?;
|
||||
let value = settings.encode(value)?;
|
||||
|
||||
resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
|
||||
|
||||
let Value::Json(value) = value else {
|
||||
unreachable!()
|
||||
};
|
||||
ValueData::JsonValue(encode_json_value(*value))
|
||||
} else {
|
||||
resolve_schema(
|
||||
index,
|
||||
p_ctx,
|
||||
&column_name,
|
||||
&ConcreteDataType::json_datatype(),
|
||||
schema_info,
|
||||
)?;
|
||||
|
||||
let value = vrl_value_to_jsonb_value(&value);
|
||||
ValueData::BinaryValue(value.to_vec())
|
||||
};
|
||||
Some(value)
|
||||
}
|
||||
};
|
||||
|
||||
let value = GreptimeValue { value_data };
|
||||
if let Some(index) = index {
|
||||
row[index] = value;
|
||||
} else {
|
||||
row.push(value);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -556,20 +699,24 @@ fn identity_pipeline_inner(
|
||||
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
|
||||
|
||||
// set time index column schema first
|
||||
schema_info.schema.push(ColumnSchema {
|
||||
column_name: custom_ts
|
||||
let column_schema = datatypes::schema::ColumnSchema::new(
|
||||
custom_ts
|
||||
.map(|ts| ts.get_column_name().to_string())
|
||||
.unwrap_or_else(|| greptime_timestamp().to_string()),
|
||||
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
|
||||
if pipeline_ctx.channel == Channel::Prometheus {
|
||||
ColumnDataType::TimestampMillisecond
|
||||
} else {
|
||||
ColumnDataType::TimestampNanosecond
|
||||
}
|
||||
}) as i32,
|
||||
semantic_type: SemanticType::Timestamp as i32,
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
custom_ts
|
||||
.map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
|
||||
.unwrap_or_else(|| {
|
||||
if pipeline_ctx.channel == Channel::Prometheus {
|
||||
ConcreteDataType::timestamp_millisecond_datatype()
|
||||
} else {
|
||||
ConcreteDataType::timestamp_nanosecond_datatype()
|
||||
}
|
||||
}),
|
||||
false,
|
||||
);
|
||||
schema_info.schema.push(ColumnMetadata {
|
||||
column_schema,
|
||||
semantic_type: SemanticType::Timestamp,
|
||||
});
|
||||
|
||||
let mut opt_map = HashMap::new();
|
||||
@@ -627,28 +774,29 @@ pub fn identity_pipeline(
|
||||
input.push(result);
|
||||
}
|
||||
|
||||
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
|
||||
identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
|
||||
if let Some(table) = table {
|
||||
let table_info = table.table_info();
|
||||
for tag_name in table_info.meta.row_key_column_names() {
|
||||
if let Some(index) = schema.index.get(tag_name) {
|
||||
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
|
||||
schema.schema[*index].semantic_type = SemanticType::Tag;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opt_map
|
||||
let column_schemas = schema.column_schemas()?;
|
||||
Ok(opt_map
|
||||
.into_iter()
|
||||
.map(|(opt, rows)| {
|
||||
(
|
||||
opt,
|
||||
Rows {
|
||||
schema: schema.schema.clone(),
|
||||
schema: column_schemas.clone(),
|
||||
rows,
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect::<HashMap<ContextOpt, Rows>>()
|
||||
.collect::<HashMap<ContextOpt, Rows>>())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -872,7 +1020,7 @@ mod tests {
|
||||
.map(|(mut schema, mut rows)| {
|
||||
for name in tag_column_names {
|
||||
if let Some(index) = schema.index.get(&name) {
|
||||
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
|
||||
schema.schema[*index].semantic_type = SemanticType::Tag;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -880,7 +1028,7 @@ mod tests {
|
||||
let rows = rows.remove(&ContextOpt::default()).unwrap();
|
||||
|
||||
Rows {
|
||||
schema: schema.schema,
|
||||
schema: schema.column_schemas().unwrap(),
|
||||
rows,
|
||||
}
|
||||
});
|
||||
|
||||
@@ -57,7 +57,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
|
||||
}
|
||||
|
||||
Rows {
|
||||
schema: schema_info.schema.clone(),
|
||||
schema: schema_info.column_schemas().unwrap(),
|
||||
rows,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -651,6 +651,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(transparent)]
|
||||
GreptimeProto {
|
||||
source: api::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -777,6 +784,8 @@ impl ErrorExt for Error {
|
||||
HandleOtelArrowRequest { .. } => StatusCode::Internal,
|
||||
|
||||
Cancelled { .. } => StatusCode::Cancelled,
|
||||
|
||||
GreptimeProto { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
|
||||
use common_catalog::consts::default_engine;
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_query::{Output, OutputData};
|
||||
use common_telemetry::{debug, error, warn};
|
||||
use common_telemetry::{error, warn};
|
||||
use headers::ContentType;
|
||||
use lazy_static::lazy_static;
|
||||
use mime_guess::mime;
|
||||
@@ -738,11 +738,6 @@ pub async fn log_ingester(
|
||||
|
||||
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
|
||||
|
||||
debug!(
|
||||
"receiving logs: {:?}",
|
||||
serde_json::to_string(&value).unwrap()
|
||||
);
|
||||
|
||||
query_ctx.set_channel(Channel::Log);
|
||||
let query_ctx = Arc::new(query_ctx);
|
||||
|
||||
|
||||
@@ -152,7 +152,7 @@ pub async fn loki_ingest(
|
||||
rows.push(row);
|
||||
}
|
||||
|
||||
let schemas = schema_info.schema;
|
||||
let schemas = schema_info.column_schemas()?;
|
||||
// fill Null for missing values
|
||||
for row in rows.iter_mut() {
|
||||
row.resize(schemas.len(), GreptimeValue::default());
|
||||
@@ -746,13 +746,16 @@ fn process_labels(
|
||||
} else {
|
||||
// not exist
|
||||
// add schema and append to values
|
||||
schemas.push(ColumnSchema {
|
||||
column_name: k.clone(),
|
||||
datatype: ColumnDataType::String.into(),
|
||||
semantic_type: SemanticType::Tag.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
});
|
||||
schemas.push(
|
||||
ColumnSchema {
|
||||
column_name: k.clone(),
|
||||
datatype: ColumnDataType::String.into(),
|
||||
semantic_type: SemanticType::Tag.into(),
|
||||
datatype_extension: None,
|
||||
options: None,
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
column_indexer.insert(k, schemas.len() - 1);
|
||||
|
||||
row.push(GreptimeValue {
|
||||
|
||||
@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
|
||||
|
||||
if let Some(index) = select_schema.index.get(key) {
|
||||
let column_schema = &select_schema.schema[*index];
|
||||
let column_schema: ColumnSchema = column_schema.clone().try_into()?;
|
||||
// datatype of the same column name should be the same
|
||||
ensure!(
|
||||
column_schema.datatype == schema.datatype,
|
||||
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
|
||||
);
|
||||
extracted_values[*index] = value;
|
||||
} else {
|
||||
select_schema.schema.push(schema);
|
||||
select_schema.schema.push(schema.into());
|
||||
select_schema
|
||||
.index
|
||||
.insert(key.clone(), select_schema.schema.len() - 1);
|
||||
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
|
||||
let mut parse_ctx = ParseContext::new(select_info);
|
||||
let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
|
||||
|
||||
schemas.extend(parse_ctx.select_schema.schema);
|
||||
schemas.extend(parse_ctx.select_schema.column_schemas()?);
|
||||
|
||||
rows.iter_mut().for_each(|row| {
|
||||
row.values.resize(schemas.len(), GreptimeValue::default());
|
||||
|
||||
@@ -136,12 +136,18 @@ async fn run_custom_pipeline(
|
||||
let mut schema_info = SchemaInfo::default();
|
||||
schema_info
|
||||
.schema
|
||||
.push(time_index_column_schema(ts_name, timeunit));
|
||||
.push(time_index_column_schema(ts_name, timeunit).into());
|
||||
|
||||
schema_info
|
||||
}
|
||||
};
|
||||
|
||||
let table = handler
|
||||
.get_table(&table_name, query_ctx)
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
schema_info.set_table(table);
|
||||
|
||||
for pipeline_map in pipeline_maps {
|
||||
let result = pipeline
|
||||
.exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
|
||||
@@ -186,7 +192,7 @@ async fn run_custom_pipeline(
|
||||
RowInsertRequest {
|
||||
rows: Some(Rows {
|
||||
rows,
|
||||
schema: schema_info.schema.clone(),
|
||||
schema: schema_info.column_schemas()?,
|
||||
}),
|
||||
table_name,
|
||||
},
|
||||
|
||||
@@ -105,6 +105,7 @@ paste.workspace = true
|
||||
pipeline.workspace = true
|
||||
prost.workspace = true
|
||||
rand.workspace = true
|
||||
regex.workspace = true
|
||||
session = { workspace = true, features = ["testing"] }
|
||||
store-api.workspace = true
|
||||
tokio-postgres = { workspace = true }
|
||||
|
||||
@@ -42,6 +42,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequ
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use pipeline::GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME;
|
||||
use prost::Message;
|
||||
use regex::Regex;
|
||||
use serde_json::{Value, json};
|
||||
use servers::http::GreptimeQueryOutput;
|
||||
use servers::http::handler::HealthResponse;
|
||||
@@ -126,6 +127,7 @@ macro_rules! http_tests {
|
||||
test_pipeline_skip_error,
|
||||
test_pipeline_filter,
|
||||
test_pipeline_create_table,
|
||||
test_pipeline_ingest_jsonbench_data,
|
||||
|
||||
test_otlp_metrics_new,
|
||||
test_otlp_traces_v0,
|
||||
@@ -2701,6 +2703,95 @@ transform:
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_pipeline_ingest_jsonbench_data(store_type: StorageType) {
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(store_type, "test_pipeline_ingest_jsonbench_data").await;
|
||||
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
// Create the pipeline for ingesting jsonbench data.
|
||||
let pipeline = r#"
|
||||
version: 2
|
||||
|
||||
processors:
|
||||
- json_parse:
|
||||
fields:
|
||||
- message, log
|
||||
ignore_missing: true
|
||||
- simple_extract:
|
||||
fields:
|
||||
- log, time_us
|
||||
key: "time_us"
|
||||
ignore_missing: false
|
||||
- epoch:
|
||||
fields:
|
||||
- time_us
|
||||
resolution: microsecond
|
||||
- select:
|
||||
fields:
|
||||
- time_us
|
||||
- log
|
||||
|
||||
transform:
|
||||
- fields:
|
||||
- time_us
|
||||
type: epoch, us
|
||||
index: timestamp
|
||||
"#;
|
||||
let response = client
|
||||
.post("/v1/pipelines/jsonbench")
|
||||
.header("Content-Type", "application/x-yaml")
|
||||
.body(pipeline)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
let pattern =
|
||||
r#"\{"pipelines":\[\{"name":"jsonbench","version":"[^"]*"}],"execution_time_ms":\d+}"#
|
||||
.parse::<Regex>()
|
||||
.unwrap();
|
||||
assert!(pattern.is_match(&response.text().await));
|
||||
|
||||
// Create the table for storing jsonbench data.
|
||||
let sql = r#"
|
||||
CREATE TABLE jsonbench(time_us TimestampMicrosecond TIME INDEX, `log` Json())
|
||||
"#;
|
||||
let response = client
|
||||
.post("/v1/sql")
|
||||
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||
.body(format!("sql={sql}"))
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
let pattern = r#"\{"output":\[\{"affectedrows":0}],"execution_time_ms":\d+}"#
|
||||
.parse::<Regex>()
|
||||
.unwrap();
|
||||
assert!(pattern.is_match(&response.text().await));
|
||||
|
||||
// Start ingesting jsonbench data.
|
||||
// The input file only contains head 100 lines of the whole jsonbench test dataset.
|
||||
let path = common_test_util::find_workspace_path(
|
||||
"/tests-integration/resources/jsonbench-head-100.ndjson",
|
||||
);
|
||||
// Jsonbench data do contain some malformed jsons that are meant to skip inserting.
|
||||
let skip_error = true;
|
||||
let response = client
|
||||
.post(&format!(
|
||||
"/v1/ingest?table=jsonbench&pipeline_name=jsonbench&skip_error={skip_error}"
|
||||
))
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(std::fs::read(path).unwrap())
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
// Note that this patten also matches the inserted rows: "74".
|
||||
let pattern = r#"\{"output":\[\{"affectedrows":74}],"execution_time_ms":\d+}"#
|
||||
.parse::<Regex>()
|
||||
.unwrap();
|
||||
assert!(pattern.is_match(&response.text().await));
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_pipeline_dispatcher(storage_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
|
||||
Reference in New Issue
Block a user