Compare commits

...

3 Commits

Author     SHA1         Message                                       Date
luofucong  5befc62b80   resolve PR comments                           2025-12-31 17:42:06 +08:00
luofucong  e3e7fd13a2   fix ci                                        2025-12-31 17:42:06 +08:00
luofucong  2845c8c2ba   feat: ingest jsonbench data through pipeline  2025-12-30 20:56:21 +08:00

All three commits are signed off by luofucong <luofc@foxmail.com>.
25 changed files with 600 additions and 260 deletions

Cargo.lock generated
View File

@@ -9474,6 +9474,7 @@ dependencies = [
"ahash 0.8.12",
"api",
"arrow",
"arrow-schema",
"async-trait",
"catalog",
"chrono",

View File

@@ -895,7 +895,7 @@ pub fn is_column_type_value_eq(
.unwrap_or(false)
}
fn encode_json_value(value: JsonValue) -> v1::JsonValue {
pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
fn helper(json: JsonVariant) -> v1::JsonValue {
let value = match json {
JsonVariant::Null => None,

View File

@@ -17,8 +17,8 @@ use std::collections::HashMap;
use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
use datatypes::schema::{
COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
SkippingIndexType,
FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
SkippingIndexOptions, SkippingIndexType,
};
use greptime_proto::v1::{
Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -36,6 +36,14 @@ const INVERTED_INDEX_GRPC_KEY: &str = "inverted_index";
/// Key used to store skip index options in gRPC column options.
const SKIPPING_INDEX_GRPC_KEY: &str = "skipping_index";
const COLUMN_OPTION_MAPPINGS: [(&str, &str); 5] = [
(FULLTEXT_GRPC_KEY, FULLTEXT_KEY),
(INVERTED_INDEX_GRPC_KEY, INVERTED_INDEX_KEY),
(SKIPPING_INDEX_GRPC_KEY, SKIPPING_INDEX_KEY),
(EXTENSION_TYPE_NAME_KEY, EXTENSION_TYPE_NAME_KEY),
(EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_METADATA_KEY),
];
/// Tries to construct a `ColumnSchema` from the given `ColumnDef`.
pub fn try_as_column_schema(column_def: &ColumnDef) -> Result<ColumnSchema> {
let data_type = ColumnDataTypeWrapper::try_new(
@@ -131,6 +139,21 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
})
}
/// Collect the [ColumnOptions] into the [Metadata] that can be used in, for example, [ColumnSchema].
pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
let Some(ColumnOptions { options }) = column_options else {
return Metadata::default();
};
let mut metadata = Metadata::with_capacity(options.len());
for (x, y) in COLUMN_OPTION_MAPPINGS {
if let Some(v) = options.get(x) {
metadata.insert(y.to_string(), v.clone());
}
}
metadata
}
/// Constructs a `ColumnOptions` from the given `ColumnSchema`.
pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
let mut options = ColumnOptions::default();
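The new collect_column_options copies only recognized gRPC option keys into schema metadata, renaming them through the fixed lookup table above. A minimal standalone sketch of that step, with hypothetical key names standing in for the real constants (FULLTEXT_GRPC_KEY, FULLTEXT_KEY, etc.):

use std::collections::HashMap;

// Hypothetical key pairs; the real table maps gRPC option keys to
// schema-metadata keys, keeping the extension-type keys unchanged.
const MAPPINGS: [(&str, &str); 2] = [
    ("fulltext", "greptime:fulltext"),
    ("inverted_index", "greptime:inverted_index"),
];

fn collect(options: &HashMap<String, String>) -> HashMap<String, String> {
    let mut metadata = HashMap::with_capacity(options.len());
    for (grpc_key, metadata_key) in MAPPINGS {
        if let Some(v) = options.get(grpc_key) {
            metadata.insert(metadata_key.to_string(), v.clone());
        }
    }
    metadata
}

fn main() {
    let options = HashMap::from([("fulltext".to_string(), "on".to_string())]);
    let metadata = collect(&options);
    assert_eq!(metadata.get("greptime:fulltext").map(String::as_str), Some("on"));
    // Unrecognized keys are silently dropped rather than copied through.
    assert_eq!(metadata.len(), 1);
}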

View File

@@ -816,7 +816,7 @@ mod tests {
let result = encode_by_struct(&json_struct, json);
assert_eq!(
result.unwrap_err().to_string(),
"Cannot cast value bar to Number(I64)"
r#"Cannot cast value bar to "<Number>""#
);
let json = json!({

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter};
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
impl Display for JsonNativeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
JsonNativeType::Null => write!(f, "Null"),
JsonNativeType::Bool => write!(f, "Bool"),
JsonNativeType::Number(t) => {
write!(f, "Number({t:?})")
}
JsonNativeType::String => write!(f, "String"),
JsonNativeType::Array(item_type) => {
write!(f, "Array[{}]", item_type)
}
JsonNativeType::Object(object) => {
write!(
f,
"Object{{{}}}",
fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
match t {
JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
JsonNativeType::Array(item_type) => {
serde_json::Value::Array(vec![to_serde_value(item_type)])
}
JsonNativeType::Object(object) => serde_json::Value::Object(
object
.iter()
.map(|(k, v)| format!(r#""{k}": {v}"#))
.collect::<Vec<_>>()
.join(", ")
)
.map(|(k, v)| (k.clone(), to_serde_value(v)))
.collect(),
),
}
}
write!(f, "{}", to_serde_value(self))
}
}
@@ -183,7 +179,11 @@ impl JsonType {
}
}
pub(crate) fn native_type(&self) -> &JsonNativeType {
pub fn is_native_type(&self) -> bool {
matches!(self.format, JsonFormat::Native(_))
}
pub fn native_type(&self) -> &JsonNativeType {
match &self.format {
JsonFormat::Jsonb => &JsonNativeType::String,
JsonFormat::Native(x) => x.as_ref(),
@@ -650,15 +650,16 @@ mod tests {
"list": [1, 2, 3],
"object": {"a": 1}
}"#;
let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
let expected =
r#"Json<{"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
test(json, json_type, Ok(expected))?;
// cannot merge with other non-object json values:
let jsons = [r#""s""#, "1", "[1]"];
let expects = [
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<String>""#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<Number>""#,
r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: ["<Number>"]"#,
];
for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
test(json, json_type, Err(expect))?;
@@ -670,7 +671,7 @@ mod tests {
"float": 0.123,
"no": 42
}"#;
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: "<String>", that: "<Number>""#;
test(json, json_type, Err(expected))?;
// can merge with another json object:
@@ -679,7 +680,7 @@ mod tests {
"float": 0.123,
"int": 42
}"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
test(json, json_type, Ok(expected))?;
// can merge with some complex nested json object:
@@ -689,7 +690,7 @@ mod tests {
"float": 0.456,
"int": 0
}"#;
let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>","foo":"<String>","l":["<String>"],"o":{"key":"<String>"}}}>"#;
test(json, json_type, Ok(expected))?;
Ok(())
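The rewritten Display implementation delegates to serde_json, so nested native types now render as compact JSON instead of the old Object{...}/Array[...] notation. A self-contained sketch of that behavior, using a trimmed stand-in for JsonNativeType rather than the project type:

use serde_json::Value;

enum NativeType {
    Null,
    Number,
    Array(Box<NativeType>),
    Object(Vec<(String, NativeType)>),
}

fn to_serde_value(t: &NativeType) -> Value {
    match t {
        NativeType::Null => Value::String("<Null>".to_string()),
        NativeType::Number => Value::String("<Number>".to_string()),
        NativeType::Array(item) => Value::Array(vec![to_serde_value(item)]),
        NativeType::Object(fields) => Value::Object(
            fields
                .iter()
                .map(|(k, v)| (k.clone(), to_serde_value(v)))
                .collect(),
        ),
    }
}

fn main() {
    let t = NativeType::Object(vec![
        ("hello".to_string(), NativeType::Number),
        ("list".to_string(), NativeType::Array(Box::new(NativeType::Number))),
    ]);
    // serde_json::Value's Display is compact, which is exactly the format
    // the updated test expectations assert:
    assert_eq!(
        to_serde_value(&t).to_string(),
        r#"{"hello":"<Number>","list":["<Number>"]}"#
    );
}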

View File

@@ -321,10 +321,10 @@ mod tests {
Ok(()),
Ok(()),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: "<String>""#,
),
Err(
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: ["<Bool>"]"#,
),
];
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
@@ -396,12 +396,12 @@ mod tests {
// test children builders:
assert_eq!(builder.builders.len(), 6);
let expect_types = [
r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
r#"Json<Object{"float": Number(F64), "s": String}>"#,
r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
r#"Json<{"list":["<Number>"],"s":"<String>"}>"#,
r#"Json<{"float":"<Number>","s":"<String>"}>"#,
r#"Json<{"float":"<Number>","int":"<Number>"}>"#,
r#"Json<{"int":"<Number>","object":{"hello":"<String>","timestamp":"<Number>"}}>"#,
r#"Json<{"nested":{"a":{"b":{"b":{"a":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
r#"Json<{"nested":{"a":{"b":{"a":{"b":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
];
let expect_vectors = [
r#"
@@ -456,7 +456,7 @@ mod tests {
}
// test final merged json type:
let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
let expected = r#"Json<{"float":"<Number>","int":"<Number>","list":["<Number>"],"nested":{"a":{"b":{"a":{"b":"<String>"},"b":{"a":"<String>"}}}},"object":{"hello":"<String>","timestamp":"<Number>"},"s":"<String>"}>"#;
assert_eq!(builder.data_type().to_string(), expected);
// test final produced vector:

View File

@@ -13,6 +13,7 @@ workspace = true
ahash.workspace = true
api.workspace = true
arrow.workspace = true
arrow-schema.workspace = true
async-trait.workspace = true
catalog.workspace = true
chrono.workspace = true

View File

@@ -800,6 +800,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Datatypes {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -920,6 +934,9 @@ impl ErrorExt for Error {
FloatIsNan { .. }
| InvalidEpochForResolution { .. }
| UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
GreptimeProto { source, .. } => source.status_code(),
Datatypes { source, .. } => source.status_code(),
}
}
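The two new variants use snafu's transparent attribute, which forwards Display and the error source to the wrapped error while still capturing an implicit Location. A minimal sketch of the pattern, assuming snafu 0.8 and a stand-in inner error type:

use snafu::{Location, Snafu};

#[derive(Debug, Snafu)]
enum InnerError {
    #[snafu(display("the inner operation failed"))]
    Failed,
}

#[derive(Debug, Snafu)]
enum Error {
    // Transparent variants get no message of their own; Display and
    // source() delegate to the wrapped error, and a From impl is derived.
    #[snafu(transparent)]
    Inner {
        source: InnerError,
        #[snafu(implicit)]
        location: Location,
    },
}

fn main() {
    // The derived From lets `?` promote the inner error automatically.
    let err: Error = InnerError::Failed.into();
    assert_eq!(err.to_string(), "the inner operation failed");
}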

View File

@@ -19,13 +19,17 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use ahash::{HashMap, HashMapExt};
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::helper::{ColumnDataTypeWrapper, encode_json_value};
use api::v1::column_def::{collect_column_options, options_from_column_schema};
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use api::v1::{ColumnDataType, SemanticType};
use arrow_schema::extension::ExtensionType;
use coerce::{coerce_columns, coerce_value};
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_telemetry::warn;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::JsonExtensionType;
use datatypes::value::Value;
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use jsonb::Number;
@@ -33,12 +37,15 @@ use once_cell::sync::OnceCell;
use serde_json as serde_json_crate;
use session::context::Channel;
use snafu::OptionExt;
use table::Table;
use vrl::prelude::{Bytes, VrlValueConvert};
use vrl::value::value::StdError;
use vrl::value::{KeyString, Value as VrlValue};
use crate::error::{
ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
};
use crate::etl::PipelineDocVersion;
@@ -272,15 +279,75 @@ impl GreptimeTransformer {
}
}
#[derive(Clone)]
pub struct ColumnMetadata {
column_schema: datatypes::schema::ColumnSchema,
semantic_type: SemanticType,
}
impl From<ColumnSchema> for ColumnMetadata {
fn from(value: ColumnSchema) -> Self {
let datatype = value.datatype();
let semantic_type = value.semantic_type();
let ColumnSchema {
column_name,
datatype: _,
semantic_type: _,
datatype_extension,
options,
} = value;
let column_schema = datatypes::schema::ColumnSchema::new(
column_name,
ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
semantic_type != SemanticType::Timestamp,
);
let metadata = collect_column_options(options.as_ref());
let column_schema = column_schema.with_metadata(metadata);
Self {
column_schema,
semantic_type,
}
}
}
impl TryFrom<ColumnMetadata> for ColumnSchema {
type Error = api::error::Error;
fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
let ColumnMetadata {
column_schema,
semantic_type,
} = value;
let options = options_from_column_schema(&column_schema);
let (datatype, datatype_extension) =
ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
Ok(ColumnSchema {
column_name: column_schema.name,
datatype: datatype as _,
semantic_type: semantic_type as _,
datatype_extension,
options,
})
}
}
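The conversion pair above is deliberately asymmetric: decoding the wire-level ColumnSchema into a ColumnMetadata always succeeds, while the reverse can fail because not every in-memory data type has a wire representation. A toy sketch of that shape, with all types as stand-ins:

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int64,
    Unsupported,
}

struct WireSchema {
    name: String,
    datatype: i32,
}

struct ColumnMeta {
    name: String,
    datatype: DataType,
}

impl From<WireSchema> for ColumnMeta {
    fn from(w: WireSchema) -> Self {
        // Decoding never fails; unknown wire codes fall back to a marker.
        let datatype = match w.datatype {
            0 => DataType::Int64,
            _ => DataType::Unsupported,
        };
        ColumnMeta { name: w.name, datatype }
    }
}

impl TryFrom<ColumnMeta> for WireSchema {
    type Error = String;

    fn try_from(m: ColumnMeta) -> Result<Self, Self::Error> {
        // Encoding is fallible: some in-memory types have no wire code.
        match m.datatype {
            DataType::Int64 => Ok(WireSchema { name: m.name, datatype: 0 }),
            other => Err(format!("no wire representation for {other:?}")),
        }
    }
}

fn main() {
    let meta: ColumnMeta = WireSchema { name: "ts".into(), datatype: 0 }.into();
    assert!(WireSchema::try_from(meta).is_ok());
}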
/// Records the current schema information plus a sequential cache of field names.
/// It evolves as the user's input JSON is traversed,
/// accumulating a superset of all schemas seen in the input.

#[derive(Debug, Default)]
#[derive(Default)]
pub struct SchemaInfo {
/// schema info
pub schema: Vec<ColumnSchema>,
pub schema: Vec<ColumnMetadata>,
/// index of the column name
pub index: HashMap<String, usize>,
/// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
table: Option<Arc<Table>>,
}
impl SchemaInfo {
@@ -288,6 +355,7 @@ impl SchemaInfo {
Self {
schema: Vec::with_capacity(capacity),
index: HashMap::with_capacity(capacity),
table: None,
}
}
@@ -297,46 +365,88 @@ impl SchemaInfo {
index.insert(schema.column_name.clone(), i);
}
Self {
schema: schema_list,
schema: schema_list.into_iter().map(Into::into).collect(),
index,
table: None,
}
}
pub fn set_table(&mut self, table: Option<Arc<Table>>) {
self.table = table;
}
fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
if let Some(table) = &self.table
&& let Some(i) = table.schema_ref().column_index_by_name(column_name)
{
let column_schema = table.schema_ref().column_schemas()[i].clone();
let semantic_type = if column_schema.is_time_index() {
SemanticType::Timestamp
} else if table.table_info().meta.primary_key_indices.contains(&i) {
SemanticType::Tag
} else {
SemanticType::Field
};
Some(ColumnMetadata {
column_schema,
semantic_type,
})
} else {
None
}
}
pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
self.schema
.iter()
.map(|x| x.clone().try_into())
.collect::<api::error::Result<Vec<_>>>()
}
}
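column_schemas turns a Vec of fallible per-column conversions into a single fallible Vec by collecting Results, where the first Err short-circuits. The standard-library behavior it relies on, in isolation:

fn main() {
    let all_ok: Vec<Result<i32, String>> = vec![Ok(1), Ok(2)];
    let collected: Result<Vec<i32>, String> = all_ok.into_iter().collect();
    assert_eq!(collected, Ok(vec![1, 2]));

    // One Err aborts the collection and is returned as-is:
    let mixed: Vec<Result<i32, String>> =
        vec![Ok(1), Err("bad column".to_string()), Ok(3)];
    let collected: Result<Vec<i32>, String> = mixed.into_iter().collect();
    assert_eq!(collected, Err("bad column".to_string()));
}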
fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
pipeline_context: &PipelineContext,
column: &str,
value_type: &ConcreteDataType,
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
// Safety unwrap is fine here because api_value is always valid
let value_column_data_type = proto_value_type(&api_value).unwrap();
// Safety unwrap is fine here because index is always valid
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type != schema_column_data_type {
IdentifyPipelineColumnTypeMismatchSnafu {
column: column_schema.column_name,
expected: schema_column_data_type.as_str_name(),
actual: value_column_data_type.as_str_name(),
let column_type = &mut schema_info.schema[index].column_schema.data_type;
match (column_type, value_type) {
(ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
if column_type.is_include(value_type) =>
{
Ok(())
}
.fail()
} else {
row[index] = api_value;
Ok(())
(column_type, value_type) if column_type == value_type => Ok(()),
(column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
column,
expected: column_type.to_string(),
actual: value_type.to_string(),
}
.fail(),
}
} else {
let key = column_schema.column_name.clone();
let column_schema = schema_info
.find_column_schema_in_table(column)
.unwrap_or_else(|| {
let semantic_type = decide_semantic(pipeline_context, column);
let column_schema = datatypes::schema::ColumnSchema::new(
column,
value_type.clone(),
semantic_type != SemanticType::Timestamp,
);
ColumnMetadata {
column_schema,
semantic_type,
}
});
let key = column.to_string();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
Ok(())
}
}
@@ -481,11 +591,11 @@ pub(crate) fn values_to_row(
Ok(Row { values: row })
}
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
SemanticType::Tag as i32
SemanticType::Tag
} else {
SemanticType::Field as i32
SemanticType::Field
}
}
@@ -497,55 +607,56 @@ fn resolve_value(
p_ctx: &PipelineContext,
) -> Result<()> {
let index = schema_info.index.get(&column_name).copied();
let mut resolve_simple_type =
|value_data: ValueData, column_name: String, data_type: ColumnDataType| {
let semantic_type = decide_semantic(p_ctx, &column_name);
resolve_schema(
index,
value_data,
ColumnSchema {
column_name,
datatype: data_type as i32,
semantic_type,
datatype_extension: None,
options: None,
},
row,
schema_info,
)
};
match value {
VrlValue::Null => {}
let value_data = match value {
VrlValue::Null => None,
VrlValue::Integer(v) => {
// safe unwrap after type matched
resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::int64_datatype(),
schema_info,
)?;
Some(ValueData::I64Value(v))
}
VrlValue::Float(v) => {
// safe unwrap after type matched
resolve_simple_type(
ValueData::F64Value(v.into()),
column_name,
ColumnDataType::Float64,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::float64_datatype(),
schema_info,
)?;
Some(ValueData::F64Value(v.into()))
}
VrlValue::Boolean(v) => {
resolve_simple_type(
ValueData::BoolValue(v),
column_name,
ColumnDataType::Boolean,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::boolean_datatype(),
schema_info,
)?;
Some(ValueData::BoolValue(v))
}
VrlValue::Bytes(v) => {
resolve_simple_type(
ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::string_datatype(),
schema_info,
)?;
Some(ValueData::StringValue(String::from_utf8_lossy_owned(
v.to_vec(),
)))
}
VrlValue::Regex(v) => {
@@ -553,42 +664,83 @@ fn resolve_value(
"Persisting regex value in the table, this should not happen, column_name: {}",
column_name
);
resolve_simple_type(
ValueData::StringValue(v.to_string()),
column_name,
ColumnDataType::String,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::string_datatype(),
schema_info,
)?;
Some(ValueData::StringValue(v.to_string()))
}
VrlValue::Timestamp(ts) => {
let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
input: ts.to_rfc3339(),
})?;
resolve_simple_type(
ValueData::TimestampNanosecondValue(ns),
column_name,
ColumnDataType::TimestampNanosecond,
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::timestamp_nanosecond_datatype(),
schema_info,
)?;
Some(ValueData::TimestampNanosecondValue(ns))
}
VrlValue::Array(_) | VrlValue::Object(_) => {
let data = vrl_value_to_jsonb_value(&value);
resolve_schema(
index,
ValueData::BinaryValue(data.to_vec()),
ColumnSchema {
column_name,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
row,
schema_info,
)?;
let is_json_native_type = schema_info
.find_column_schema_in_table(&column_name)
.is_some_and(|x| {
if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
column_type.is_native_type()
} else {
false
}
});
let value = if is_json_native_type {
let json_extension_type: Option<JsonExtensionType> =
if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
x.column_schema.extension_type()?
} else {
None
};
let settings = json_extension_type
.and_then(|x| x.metadata().json_structure_settings.clone())
.unwrap_or_default();
let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
})?;
let value = settings.encode(value)?;
resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
let Value::Json(value) = value else {
unreachable!()
};
ValueData::JsonValue(encode_json_value(*value))
} else {
resolve_schema(
index,
p_ctx,
&column_name,
&ConcreteDataType::binary_datatype(),
schema_info,
)?;
let value = vrl_value_to_jsonb_value(&value);
ValueData::BinaryValue(value.to_vec())
};
Some(value)
}
};
let value = GreptimeValue { value_data };
if let Some(index) = index {
row[index] = value;
} else {
row.push(value);
}
Ok(())
}
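For objects and arrays, the rewritten resolve_value branches on the target column: native JSON columns receive a structured JSON value (run through the column's structure settings and encode_json_value), while everything else keeps the old JSONB binary path. A simplified, self-contained sketch of that dispatch, with stand-in types (the real binary path serializes with the jsonb crate, not plain JSON bytes):

use serde_json::json;

// Stand-in for api::v1::value::ValueData's two relevant variants.
#[derive(Debug)]
enum ValueData {
    JsonValue(String),
    BinaryValue(Vec<u8>),
}

fn encode_object(value: serde_json::Value, target_is_native_json: bool) -> ValueData {
    if target_is_native_json {
        // Native path: keep the value structured.
        ValueData::JsonValue(value.to_string())
    } else {
        // Fallback path: opaque bytes.
        ValueData::BinaryValue(value.to_string().into_bytes())
    }
}

fn main() {
    let v = json!({"kind": "commit"});
    match encode_object(v.clone(), true) {
        ValueData::JsonValue(s) => assert_eq!(s, r#"{"kind":"commit"}"#),
        other => panic!("unexpected: {other:?}"),
    }
    match encode_object(v, false) {
        ValueData::BinaryValue(b) => assert!(!b.is_empty()),
        other => panic!("unexpected: {other:?}"),
    }
}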
@@ -626,20 +778,24 @@ fn identity_pipeline_inner(
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
// set time index column schema first
schema_info.schema.push(ColumnSchema {
column_name: custom_ts
let column_schema = datatypes::schema::ColumnSchema::new(
custom_ts
.map(|ts| ts.get_column_name().to_string())
.unwrap_or_else(|| greptime_timestamp().to_string()),
datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ColumnDataType::TimestampMillisecond
} else {
ColumnDataType::TimestampNanosecond
}
}) as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
custom_ts
.map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
.unwrap_or_else(|| {
if pipeline_ctx.channel == Channel::Prometheus {
ConcreteDataType::timestamp_millisecond_datatype()
} else {
ConcreteDataType::timestamp_nanosecond_datatype()
}
}),
false,
);
schema_info.schema.push(ColumnMetadata {
column_schema,
semantic_type: SemanticType::Timestamp,
});
let mut opt_map = HashMap::new();
@@ -697,28 +853,29 @@ pub fn identity_pipeline(
input.push(result);
}
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
if let Some(table) = table {
let table_info = table.table_info();
for tag_name in table_info.meta.row_key_column_names() {
if let Some(index) = schema.index.get(tag_name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
}
opt_map
let column_schemas = schema.column_schemas()?;
Ok(opt_map
.into_iter()
.map(|(opt, rows)| {
(
opt,
Rows {
schema: schema.schema.clone(),
schema: column_schemas.clone(),
rows,
},
)
})
.collect::<HashMap<ContextOpt, Rows>>()
.collect::<HashMap<ContextOpt, Rows>>())
})
}
@@ -850,7 +1007,7 @@ mod tests {
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
"Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Binary".to_string(),
);
}
{
@@ -879,7 +1036,7 @@ mod tests {
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
"Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
);
}
{
@@ -942,7 +1099,7 @@ mod tests {
.map(|(mut schema, mut rows)| {
for name in tag_column_names {
if let Some(index) = schema.index.get(&name) {
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
schema.schema[*index].semantic_type = SemanticType::Tag;
}
}
@@ -950,7 +1107,7 @@ mod tests {
let rows = rows.remove(&ContextOpt::default()).unwrap();
Rows {
schema: schema.schema,
schema: schema.column_schemas().unwrap(),
rows,
}
});

View File

@@ -61,7 +61,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
}
Rows {
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas().unwrap(),
rows,
}
}

View File

@@ -52,7 +52,7 @@ transform:
// check schema
assert_eq!(output.schema[0].column_name, "commit");
let type_id: i32 = ColumnDataType::Binary.into();
let type_id: i32 = ColumnDataType::Json.into();
assert_eq!(output.schema[0].datatype, type_id);
// check value
@@ -91,7 +91,7 @@ transform:
// check schema
assert_eq!(output.schema[0].column_name, "commit_json");
let type_id: i32 = ColumnDataType::Binary.into();
let type_id: i32 = ColumnDataType::Json.into();
assert_eq!(output.schema[0].datatype, type_id);
// check value
@@ -160,7 +160,7 @@ transform:
// check schema
assert_eq!(output.schema[0].column_name, "commit");
let type_id: i32 = ColumnDataType::Binary.into();
let type_id: i32 = ColumnDataType::Json.into();
assert_eq!(output.schema[0].datatype, type_id);
// check value

View File

@@ -664,6 +664,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
GreptimeProto {
source: api::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -794,6 +801,8 @@ impl ErrorExt for Error {
Suspended { .. } => StatusCode::Suspended,
MemoryLimitExceeded { .. } => StatusCode::RateLimited,
GreptimeProto { source, .. } => source.status_code(),
}
}

View File

@@ -23,6 +23,7 @@ pub mod memory_limit;
pub mod prom_query_gateway;
pub mod region_server;
use std::any::Any;
use std::net::SocketAddr;
use std::time::Duration;
@@ -399,4 +400,8 @@ impl Server for GrpcServer {
fn bind_addr(&self) -> Option<SocketAddr> {
self.bind_addr
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -1285,6 +1285,10 @@ impl Server for HttpServer {
fn bind_addr(&self) -> Option<SocketAddr> {
self.bind_addr
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[cfg(test)]

View File

@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
use common_catalog::consts::default_engine;
use common_error::ext::{BoxedError, ErrorExt};
use common_query::{Output, OutputData};
use common_telemetry::{debug, error, warn};
use common_telemetry::{error, warn};
use headers::ContentType;
use lazy_static::lazy_static;
use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
debug!(
"receiving logs: {:?}",
serde_json::to_string(&value).unwrap()
);
query_ctx.set_channel(Channel::Log);
let query_ctx = Arc::new(query_ctx);

View File

@@ -152,7 +152,7 @@ pub async fn loki_ingest(
rows.push(row);
}
let schemas = schema_info.schema;
let schemas = schema_info.column_schemas()?;
// fill Null for missing values
for row in rows.iter_mut() {
row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
} else {
// not exist
// add schema and append to values
schemas.push(ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
});
schemas.push(
ColumnSchema {
column_name: k.clone(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Tag.into(),
datatype_extension: None,
options: None,
}
.into(),
);
column_indexer.insert(k, schemas.len() - 1);
row.push(GreptimeValue {

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -265,4 +266,8 @@ impl Server for MysqlServer {
fn bind_addr(&self) -> Option<SocketAddr> {
self.bind_addr
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
if let Some(index) = select_schema.index.get(key) {
let column_schema = &select_schema.schema[*index];
let column_schema: ColumnSchema = column_schema.clone().try_into()?;
// datatype of the same column name should be the same
ensure!(
column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
);
extracted_values[*index] = value;
} else {
select_schema.schema.push(schema);
select_schema.schema.push(schema.into());
select_schema
.index
.insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
let mut parse_ctx = ParseContext::new(select_info);
let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
schemas.extend(parse_ctx.select_schema.schema);
schemas.extend(parse_ctx.select_schema.column_schemas()?);
rows.iter_mut().for_each(|row| {
row.values.resize(schemas.len(), GreptimeValue::default());

View File

@@ -135,12 +135,18 @@ async fn run_custom_pipeline(
let mut schema_info = SchemaInfo::default();
schema_info
.schema
.push(time_index_column_schema(ts_name, timeunit));
.push(time_index_column_schema(ts_name, timeunit).into());
schema_info
}
};
let table = handler
.get_table(&table_name, query_ctx)
.await
.context(CatalogSnafu)?;
schema_info.set_table(table);
for pipeline_map in pipeline_maps {
let result = pipeline
.exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -194,7 +200,7 @@ async fn run_custom_pipeline(
RowInsertRequest {
rows: Some(Rows {
rows,
schema: schema_info.schema.clone(),
schema: schema_info.column_schemas()?,
}),
table_name: table_name.clone(),
},

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -144,4 +145,8 @@ impl Server for PostgresServer {
fn bind_addr(&self) -> Option<SocketAddr> {
self.bind_addr
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::net::SocketAddr;
@@ -147,6 +148,8 @@ pub trait Server: Send + Sync {
fn bind_addr(&self) -> Option<SocketAddr> {
None
}
fn as_any(&self) -> &dyn Any;
}
struct AcceptTask {
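The new as_any hook on the Server trait is what lets callers recover a concrete server type from a boxed trait object, as the integration test further below does with HttpServer. The pattern in isolation, with a stand-in server type:

use std::any::Any;

trait Server {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for a concrete server such as HttpServer.
struct EchoServer;

impl Server for EchoServer {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let server: Box<dyn Server> = Box::new(EchoServer);
    // downcast_ref succeeds only for the actual concrete type:
    assert!(server.as_any().downcast_ref::<EchoServer>().is_some());
}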

View File

@@ -1,14 +1,14 @@
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| data | ts |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
| data | time_us |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 2024-11-21T16:25:49.000167 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 2024-11-21T16:25:49.000644 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 2024-11-21T16:25:49.001108 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 2024-11-21T16:25:49.001372 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadnt heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 2024-11-21T16:25:49.001905 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 2024-11-21T16:25:49.002758 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 2024-11-21T16:25:49.003106 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 2024-11-21T16:25:49.003461 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n It was nothing like that — . I was only thinking . . . \n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 2024-11-21T16:25:49.003907 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 2024-11-21T16:25:49.004769 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+

View File

@@ -48,9 +48,9 @@ use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
use frontend::frontend::Frontend;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance, StandaloneDatanodeManager};
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::grpc::GrpcOptions;
use servers::server::ServerHandlers;
use snafu::ResultExt;
use standalone::options::StandaloneOptions;
@@ -249,7 +249,7 @@ impl GreptimeDbStandaloneBuilder {
procedure_executor.clone(),
Arc::new(ProcessManager::new(server_addr, None)),
)
.with_plugin(plugins)
.with_plugin(plugins.clone())
.try_build()
.await
.unwrap();
@@ -282,14 +282,15 @@ impl GreptimeDbStandaloneBuilder {
test_util::prepare_another_catalog_and_schema(&instance).await;
let mut frontend = Frontend {
let servers = Services::new(opts.clone(), instance.clone(), plugins)
.build()
.unwrap();
let frontend = Frontend {
instance,
servers: ServerHandlers::default(),
servers,
heartbeat_task: None,
};
frontend.start().await.unwrap();
GreptimeDbStandalone {
frontend: Arc::new(frontend),
opts,

View File

@@ -18,14 +18,114 @@ use std::{fs, io};
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use http::StatusCode;
use servers::http::test_helpers::TestClient;
use servers::http::{HTTP_SERVER, HttpServer};
use servers::server::ServerHandlers;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql_and_expect;
#[tokio::test]
async fn test_load_jsonbench_data() {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_pipeline() -> io::Result<()> {
common_telemetry::init_default_ut_logging();
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_pipeline")
.build()
.await;
let frontend = instance.fe_instance();
let ServerHandlers::Init(handlers) = instance.frontend.server_handlers() else {
unreachable!()
};
let router = {
let handlers = handlers.lock().unwrap();
let server = handlers
.get(HTTP_SERVER)
.and_then(|x| x.0.as_any().downcast_ref::<HttpServer>())
.unwrap();
server.build(server.make_app()).unwrap()
};
let client = TestClient::new(router).await;
create_table(frontend).await;
desc_table(frontend).await;
create_pipeline(&client).await;
insert_data_by_pipeline(&client).await?;
query_data(frontend).await
}
async fn insert_data_by_pipeline(client: &TestClient) -> io::Result<()> {
let file = fs::read(find_workspace_path(
"tests-integration/resources/jsonbench-head-10.ndjson",
))?;
let response = client
.post("/v1/ingest?table=bluesky&pipeline_name=jsonbench")
.header("Content-Type", "text/plain")
.body(file)
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let response = response.text().await;
// Note that this pattern also checks the number of inserted rows: 10.
let pattern = r#"{"output":[{"affectedrows":10}]"#;
assert!(response.starts_with(pattern));
Ok(())
}
async fn create_pipeline(client: &TestClient) {
let pipeline = r#"
version: 2
processors:
- json_parse:
fields:
- message, data
ignore_missing: true
- simple_extract:
fields:
- data, time_us
key: "time_us"
ignore_missing: false
- epoch:
fields:
- time_us
resolution: microsecond
- select:
fields:
- time_us
- data
transform:
- fields:
- time_us
type: epoch, us
index: timestamp
"#;
let response = client
.post("/v1/pipelines/jsonbench")
.header("Content-Type", "application/x-yaml")
.body(pipeline)
.send()
.await;
assert_eq!(response.status(), StatusCode::OK);
let response = response.text().await;
let pattern = r#"{"pipelines":[{"name":"jsonbench""#;
assert!(response.starts_with(pattern));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_sql() -> io::Result<()> {
common_telemetry::init_default_ut_logging();
let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_sql")
.build()
.await;
let frontend = instance.fe_instance();
@@ -34,9 +134,9 @@ async fn test_load_jsonbench_data() {
desc_table(frontend).await;
insert_data(frontend).await.unwrap();
insert_data_by_sql(frontend).await?;
query_data(frontend).await.unwrap();
query_data(frontend).await
}
async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
@@ -46,22 +146,21 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
| count(*) |
+----------+
| 10 |
+----------+
"#;
+----------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
let sql = "SELECT * FROM bluesky ORDER BY ts";
let sql = "SELECT * FROM bluesky ORDER BY time_us";
let expected = fs::read_to_string(find_workspace_path(
"tests-integration/resources/jsonbench-select-all.txt",
))?;
execute_sql_and_expect(frontend, sql, &expected).await;
// query 1:
let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, count() AS count \
FROM bluesky \
GROUP BY event \
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event, count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC";
let expected = r#"
+-----------------------+-------+
@@ -75,16 +174,16 @@ ORDER BY count DESC, event ASC";
execute_sql_and_expect(frontend, sql, expected).await;
// query 2:
let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, \
count() AS count, \
count(DISTINCT json_get_string(data, '$.did')) AS users \
FROM bluesky \
WHERE \
(json_get_string(data, '$.kind') = 'commit') AND \
(json_get_string(data, '$.commit.operation') = 'create') \
GROUP BY event \
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
count() AS count,
count(DISTINCT json_get_string(data, '$.did')) AS users
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create')
GROUP BY event
ORDER BY count DESC, event ASC";
let expected = r#"
+-----------------------+-------+-------+
@@ -98,18 +197,18 @@ ORDER BY count DESC, event ASC";
execute_sql_and_expect(frontend, sql, expected).await;
// query 3:
let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, \
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \
count() AS count \
FROM bluesky \
WHERE \
(json_get_string(data, '$.kind') = 'commit') AND \
(json_get_string(data, '$.commit.operation') = 'create') AND \
json_get_string(data, '$.commit.collection') IN \
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \
GROUP BY event, hour_of_day \
let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
json_get_string(data, '$.commit.collection') IN
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event";
let expected = r#"
+----------------------+-------------+-------+
@@ -122,7 +221,7 @@ ORDER BY hour_of_day, event";
execute_sql_and_expect(frontend, sql, expected).await;
// query 4:
let sql = "\
let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
@@ -174,19 +273,23 @@ LIMIT 3";
Ok(())
}
async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
let file = fs::File::open(find_workspace_path(
"tests-integration/resources/jsonbench-head-10.ndjson",
))?;
let reader = io::BufReader::new(file);
for (i, line) in reader.lines().enumerate() {
for line in reader.lines() {
let line = line?;
if line.is_empty() {
continue;
}
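// Parse the NDJSON event and use its own "time_us" field as the time index value.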
let json: serde_json::Value = serde_json::from_str(&line)?;
let time_us = json.pointer("/time_us").and_then(|x| x.as_u64()).unwrap();
let sql = format!(
"INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
i + 1,
"INSERT INTO bluesky (time_us, data) VALUES ({}, '{}')",
time_us,
line.replace("'", "''"), // standard SQL escaping: double any embedded single quote
);
execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
@@ -197,12 +300,12 @@ async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
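/// Verifies that DESC TABLE reports the JSON column's inferred field types.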
async fn desc_table(frontend: &Arc<Instance>) {
let sql = "DESC TABLE bluesky";
let expected = r#"
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
execute_sql_and_expect(frontend, sql, expected).await;
}
@@ -219,7 +322,7 @@ CREATE TABLE bluesky (
time_us Bigint
>,
),
ts Timestamp TIME INDEX,
time_us TimestampMicrosecond TIME INDEX,
)
"#;
execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;

View File

@@ -12,7 +12,7 @@ DESC TABLE t;
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Null> | | YES | | FIELD |
| j | Json<"<Null>"> | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+
INSERT INTO t VALUES
@@ -24,12 +24,12 @@ Affected Rows: 3
DESC TABLE t;
+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String}, "b": Object{"y": Number(I64)}}}> | | YES | | FIELD |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>"},"b":{"y":"<Number>"}}}> | | YES | | FIELD |
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
INSERT INTO t VALUES
(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
@@ -39,12 +39,12 @@ Affected Rows: 2
DESC TABLE t;
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');
@@ -52,12 +52,12 @@ Affected Rows: 1
DESC TABLE t;
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
INSERT INTO t VALUES (1762128011000, '{}');