mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 21:02:58 +00:00
feat: ingest jsonbench data through pipeline
Signed-off-by: luofucong <luofc@foxmail.com>
Cargo.lock (generated)
@@ -9474,6 +9474,7 @@ dependencies = [
  "ahash 0.8.12",
  "api",
  "arrow",
+ "arrow-schema",
  "async-trait",
  "catalog",
  "chrono",
@@ -895,7 +895,7 @@ pub fn is_column_type_value_eq(
         .unwrap_or(false)
 }
 
-fn encode_json_value(value: JsonValue) -> v1::JsonValue {
+pub fn encode_json_value(value: JsonValue) -> v1::JsonValue {
     fn helper(json: JsonVariant) -> v1::JsonValue {
         let value = match json {
             JsonVariant::Null => None,
@@ -17,8 +17,8 @@ use std::collections::HashMap;
 use arrow_schema::extension::{EXTENSION_TYPE_METADATA_KEY, EXTENSION_TYPE_NAME_KEY};
 use datatypes::schema::{
     COMMENT_KEY, ColumnDefaultConstraint, ColumnSchema, FULLTEXT_KEY, FulltextAnalyzer,
-    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, SKIPPING_INDEX_KEY, SkippingIndexOptions,
-    SkippingIndexType,
+    FulltextBackend, FulltextOptions, INVERTED_INDEX_KEY, Metadata, SKIPPING_INDEX_KEY,
+    SkippingIndexOptions, SkippingIndexType,
 };
 use greptime_proto::v1::{
     Analyzer, FulltextBackend as PbFulltextBackend, SkippingIndexType as PbSkippingIndexType,
@@ -131,6 +131,31 @@ pub fn try_as_column_def(column_schema: &ColumnSchema, is_primary_key: bool) ->
     })
 }
 
+/// Collects the [ColumnOptions] into a [Metadata] map that can be used in, for example, a [ColumnSchema].
+pub fn collect_column_options(column_options: Option<&ColumnOptions>) -> Metadata {
+    let mut metadata = Metadata::default();
+    let Some(ColumnOptions { options }) = column_options else {
+        return metadata;
+    };
+
+    if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
+        metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(INVERTED_INDEX_GRPC_KEY) {
+        metadata.insert(INVERTED_INDEX_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(SKIPPING_INDEX_GRPC_KEY) {
+        metadata.insert(SKIPPING_INDEX_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(EXTENSION_TYPE_NAME_KEY) {
+        metadata.insert(EXTENSION_TYPE_NAME_KEY.to_string(), v.clone());
+    }
+    if let Some(v) = options.get(EXTENSION_TYPE_METADATA_KEY) {
+        metadata.insert(EXTENSION_TYPE_METADATA_KEY.to_string(), v.clone());
+    }
+    metadata
+}
+
 /// Constructs a `ColumnOptions` from the given `ColumnSchema`.
 pub fn options_from_column_schema(column_schema: &ColumnSchema) -> Option<ColumnOptions> {
     let mut options = ColumnOptions::default();
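In plain terms, collect_column_options forwards only recognized gRPC option keys into the schema-side metadata map, re-keying them along the way. A minimal standalone sketch of that pattern, using hypothetical stand-in key constants rather than the crate's real definitions:

    use std::collections::HashMap;

    // Hypothetical stand-ins; the real constants live in the api/datatypes crates.
    const FULLTEXT_GRPC_KEY: &str = "fulltext";
    const FULLTEXT_KEY: &str = "greptime:fulltext";

    type Metadata = HashMap<String, String>;

    fn collect(options: &HashMap<String, String>) -> Metadata {
        let mut metadata = Metadata::new();
        // Unrecognized keys are silently dropped; recognized ones are re-keyed.
        if let Some(v) = options.get(FULLTEXT_GRPC_KEY) {
            metadata.insert(FULLTEXT_KEY.to_string(), v.clone());
        }
        metadata
    }

    fn main() {
        let mut options = HashMap::new();
        options.insert(FULLTEXT_GRPC_KEY.to_string(), "{\"enable\":true}".to_string());
        options.insert("unknown".to_string(), "ignored".to_string());
        assert_eq!(collect(&options).len(), 1);
    }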
@@ -816,7 +816,7 @@ mod tests {
         let result = encode_by_struct(&json_struct, json);
         assert_eq!(
             result.unwrap_err().to_string(),
-            "Cannot cast value bar to Number(I64)"
+            r#"Cannot cast value bar to "<Number>""#
         );
 
         let json = json!({
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 use std::collections::BTreeMap;
-use std::fmt::{Display, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 use std::str::FromStr;
 use std::sync::Arc;
@@ -133,28 +133,24 @@ impl From<&ConcreteDataType> for JsonNativeType {
 
 impl Display for JsonNativeType {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            JsonNativeType::Null => write!(f, "Null"),
-            JsonNativeType::Bool => write!(f, "Bool"),
-            JsonNativeType::Number(t) => {
-                write!(f, "Number({t:?})")
-            }
-            JsonNativeType::String => write!(f, "String"),
-            JsonNativeType::Array(item_type) => {
-                write!(f, "Array[{}]", item_type)
-            }
-            JsonNativeType::Object(object) => {
-                write!(
-                    f,
-                    "Object{{{}}}",
-                    object
-                        .iter()
-                        .map(|(k, v)| format!(r#""{k}": {v}"#))
-                        .collect::<Vec<_>>()
-                        .join(", ")
-                )
-            }
-        }
+        fn to_serde_value(t: &JsonNativeType) -> serde_json::Value {
+            match t {
+                JsonNativeType::Null => serde_json::Value::String("<Null>".to_string()),
+                JsonNativeType::Bool => serde_json::Value::String("<Bool>".to_string()),
+                JsonNativeType::Number(_) => serde_json::Value::String("<Number>".to_string()),
+                JsonNativeType::String => serde_json::Value::String("<String>".to_string()),
+                JsonNativeType::Array(item_type) => {
+                    serde_json::Value::Array(vec![to_serde_value(item_type)])
+                }
+                JsonNativeType::Object(object) => serde_json::Value::Object(
+                    object
+                        .iter()
+                        .map(|(k, v)| (k.clone(), to_serde_value(v)))
+                        .collect(),
+                ),
+            }
+        }
+        write!(f, "{}", to_serde_value(self))
     }
 }
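The effect of this rework: a JSON native type now displays as compact, valid JSON with "<Type>" placeholders at the leaves, instead of the old Object{...} notation. A minimal standalone sketch of the idea, with a simplified stand-in type rather than the crate's JsonNativeType:

    use serde_json::Value;

    // Simplified stand-in for JsonNativeType.
    enum NativeType {
        Number,
        String,
        Array(Box<NativeType>),
        Object(Vec<(String, NativeType)>),
    }

    fn to_serde_value(t: &NativeType) -> Value {
        match t {
            NativeType::Number => Value::String("<Number>".to_string()),
            NativeType::String => Value::String("<String>".to_string()),
            NativeType::Array(item) => Value::Array(vec![to_serde_value(item)]),
            NativeType::Object(fields) => Value::Object(
                fields
                    .iter()
                    .map(|(k, v)| (k.clone(), to_serde_value(v)))
                    .collect(),
            ),
        }
    }

    fn main() {
        let t = NativeType::Object(vec![
            ("hello".into(), NativeType::String),
            ("list".into(), NativeType::Array(Box::new(NativeType::Number))),
        ]);
        // Routing Display through serde_json keeps the rendering machine-parseable:
        assert_eq!(
            to_serde_value(&t).to_string(),
            r#"{"hello":"<String>","list":["<Number>"]}"#
        );
    }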
@@ -183,7 +179,11 @@ impl JsonType {
         }
     }
 
-    pub(crate) fn native_type(&self) -> &JsonNativeType {
+    pub fn is_native_type(&self) -> bool {
+        matches!(self.format, JsonFormat::Native(_))
+    }
+
+    pub fn native_type(&self) -> &JsonNativeType {
         match &self.format {
             JsonFormat::Jsonb => &JsonNativeType::String,
             JsonFormat::Native(x) => x.as_ref(),
@@ -650,15 +650,16 @@ mod tests {
             "list": [1, 2, 3],
             "object": {"a": 1}
         }"#;
-        let expected = r#"Json<Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
+        let expected =
+            r#"Json<{"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
         test(json, json_type, Ok(expected))?;
 
         // cannot merge with other non-object json values:
         let jsons = [r#""s""#, "1", "[1]"];
         let expects = [
-            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: String"#,
-            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Number(I64)"#,
-            r#"Failed to merge JSON datatype: datatypes have conflict, this: Object{"hello": String, "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}, that: Array[Number(I64)]"#,
+            r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<String>""#,
+            r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: "<Number>""#,
+            r#"Failed to merge JSON datatype: datatypes have conflict, this: {"hello":"<String>","list":["<Number>"],"object":{"a":"<Number>"}}, that: ["<Number>"]"#,
         ];
         for (json, expect) in jsons.into_iter().zip(expects.into_iter()) {
             test(json, json_type, Err(expect))?;
@@ -670,7 +671,7 @@ mod tests {
             "float": 0.123,
             "no": 42
         }"#;
-        let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: String, that: Number(I64)"#;
+        let expected = r#"Failed to merge JSON datatype: datatypes have conflict, this: "<String>", that: "<Number>""#;
         test(json, json_type, Err(expected))?;
 
         // can merge with another json object:
@@ -679,7 +680,7 @@ mod tests {
             "float": 0.123,
             "int": 42
         }"#;
-        let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64)}}>"#;
+        let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>"}}>"#;
         test(json, json_type, Ok(expected))?;
 
         // can merge with some complex nested json object:
@@ -689,7 +690,7 @@ mod tests {
             "float": 0.456,
             "int": 0
         }"#;
-        let expected = r#"Json<Object{"float": Number(F64), "hello": String, "int": Number(I64), "list": Array[Number(I64)], "object": Object{"a": Number(I64), "foo": String, "l": Array[String], "o": Object{"key": String}}}>"#;
+        let expected = r#"Json<{"float":"<Number>","hello":"<String>","int":"<Number>","list":["<Number>"],"object":{"a":"<Number>","foo":"<String>","l":["<String>"],"o":{"key":"<String>"}}}>"#;
         test(json, json_type, Ok(expected))?;
 
         Ok(())
||||
@@ -321,10 +321,10 @@ mod tests {
|
||||
Ok(()),
|
||||
Ok(()),
|
||||
Err(
|
||||
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: String",
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: "<String>""#,
|
||||
),
|
||||
Err(
|
||||
"Failed to merge JSON datatype: datatypes have conflict, this: Number(I64), that: Array[Bool]",
|
||||
r#"Failed to merge JSON datatype: datatypes have conflict, this: "<Number>", that: ["<Bool>"]"#,
|
||||
),
|
||||
];
|
||||
let mut builder = JsonVectorBuilder::new(JsonNativeType::Null, 1);
|
||||
@@ -396,12 +396,12 @@ mod tests {
         // test children builders:
         assert_eq!(builder.builders.len(), 6);
         let expect_types = [
-            r#"Json<Object{"list": Array[Number(I64)], "s": String}>"#,
-            r#"Json<Object{"float": Number(F64), "s": String}>"#,
-            r#"Json<Object{"float": Number(F64), "int": Number(I64)}>"#,
-            r#"Json<Object{"int": Number(I64), "object": Object{"hello": String, "timestamp": Number(I64)}}>"#,
-            r#"Json<Object{"nested": Object{"a": Object{"b": Object{"b": Object{"a": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
-            r#"Json<Object{"nested": Object{"a": Object{"b": Object{"a": Object{"b": String}}}}, "object": Object{"timestamp": Number(I64)}}>"#,
+            r#"Json<{"list":["<Number>"],"s":"<String>"}>"#,
+            r#"Json<{"float":"<Number>","s":"<String>"}>"#,
+            r#"Json<{"float":"<Number>","int":"<Number>"}>"#,
+            r#"Json<{"int":"<Number>","object":{"hello":"<String>","timestamp":"<Number>"}}>"#,
+            r#"Json<{"nested":{"a":{"b":{"b":{"a":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
+            r#"Json<{"nested":{"a":{"b":{"a":{"b":"<String>"}}}},"object":{"timestamp":"<Number>"}}>"#,
         ];
         let expect_vectors = [
             r#"
@@ -456,7 +456,7 @@ mod tests {
         }
 
         // test final merged json type:
-        let expected = r#"Json<Object{"float": Number(F64), "int": Number(I64), "list": Array[Number(I64)], "nested": Object{"a": Object{"b": Object{"a": Object{"b": String}, "b": Object{"a": String}}}}, "object": Object{"hello": String, "timestamp": Number(I64)}, "s": String}>"#;
+        let expected = r#"Json<{"float":"<Number>","int":"<Number>","list":["<Number>"],"nested":{"a":{"b":{"a":{"b":"<String>"},"b":{"a":"<String>"}}}},"object":{"hello":"<String>","timestamp":"<Number>"},"s":"<String>"}>"#;
         assert_eq!(builder.data_type().to_string(), expected);
 
         // test final produced vector:
@@ -13,6 +13,7 @@ workspace = true
 ahash.workspace = true
 api.workspace = true
 arrow.workspace = true
+arrow-schema.workspace = true
 async-trait.workspace = true
 catalog.workspace = true
 chrono.workspace = true
@@ -800,6 +800,20 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(transparent)]
+    Datatypes {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -920,6 +934,9 @@ impl ErrorExt for Error {
             FloatIsNan { .. }
             | InvalidEpochForResolution { .. }
            | UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
+
+            GreptimeProto { source, .. } => source.status_code(),
+            Datatypes { source, .. } => source.status_code(),
         }
     }
@@ -19,13 +19,17 @@ use std::collections::{BTreeMap, HashSet};
 use std::sync::Arc;
 
 use ahash::{HashMap, HashMapExt};
-use api::helper::proto_value_type;
-use api::v1::column_data_type_extension::TypeExt;
+use api::helper::{ColumnDataTypeWrapper, encode_json_value};
+use api::v1::column_def::{collect_column_options, options_from_column_schema};
 use api::v1::value::ValueData;
-use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
+use api::v1::{ColumnDataType, SemanticType};
+use arrow_schema::extension::ExtensionType;
 use coerce::{coerce_columns, coerce_value};
 use common_query::prelude::{greptime_timestamp, greptime_value};
 use common_telemetry::warn;
 use datatypes::data_type::ConcreteDataType;
+use datatypes::extension::json::JsonExtensionType;
+use datatypes::value::Value;
 use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
 use itertools::Itertools;
 use jsonb::Number;
@@ -33,12 +37,15 @@ use once_cell::sync::OnceCell;
 use serde_json as serde_json_crate;
 use session::context::Channel;
 use snafu::OptionExt;
+use table::Table;
 use vrl::prelude::{Bytes, VrlValueConvert};
+use vrl::value::value::StdError;
 use vrl::value::{KeyString, Value as VrlValue};
 
 use crate::error::{
-    ArrayElementMustBeObjectSnafu, IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu,
-    Result, TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
+    ArrayElementMustBeObjectSnafu, CoerceIncompatibleTypesSnafu,
+    IdentifyPipelineColumnTypeMismatchSnafu, InvalidTimestampSnafu, Result,
+    TimeIndexMustBeNonNullSnafu, TransformColumnNameMustBeUniqueSnafu,
     TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, ValueMustBeMapSnafu,
 };
 use crate::etl::PipelineDocVersion;
@@ -272,15 +279,75 @@ impl GreptimeTransformer {
     }
 }
 
+#[derive(Clone)]
+pub struct ColumnMetadata {
+    column_schema: datatypes::schema::ColumnSchema,
+    semantic_type: SemanticType,
+}
+
+impl From<ColumnSchema> for ColumnMetadata {
+    fn from(value: ColumnSchema) -> Self {
+        let datatype = value.datatype();
+        let semantic_type = value.semantic_type();
+        let ColumnSchema {
+            column_name,
+            datatype: _,
+            semantic_type: _,
+            datatype_extension,
+            options,
+        } = value;
+
+        let column_schema = datatypes::schema::ColumnSchema::new(
+            column_name,
+            ColumnDataTypeWrapper::new(datatype, datatype_extension).into(),
+            semantic_type != SemanticType::Timestamp,
+        );
+
+        let metadata = collect_column_options(options.as_ref());
+        let column_schema = column_schema.with_metadata(metadata);
+
+        Self {
+            column_schema,
+            semantic_type,
+        }
+    }
+}
+
+impl TryFrom<ColumnMetadata> for ColumnSchema {
+    type Error = api::error::Error;
+
+    fn try_from(value: ColumnMetadata) -> std::result::Result<Self, Self::Error> {
+        let ColumnMetadata {
+            column_schema,
+            semantic_type,
+        } = value;
+
+        let options = options_from_column_schema(&column_schema);
+
+        let (datatype, datatype_extension) =
+            ColumnDataTypeWrapper::try_from(column_schema.data_type).map(|x| x.into_parts())?;
+
+        Ok(ColumnSchema {
+            column_name: column_schema.name,
+            datatype: datatype as _,
+            semantic_type: semantic_type as _,
+            datatype_extension,
+            options,
+        })
+    }
+}
+
 /// This is used to record the current state schema information and a sequential cache of field names.
 /// As you traverse the user input JSON, this will change.
 /// It will record a superset of all user input schemas.
-#[derive(Debug, Default)]
+#[derive(Default)]
 pub struct SchemaInfo {
     /// schema info
-    pub schema: Vec<ColumnSchema>,
+    pub schema: Vec<ColumnMetadata>,
     /// index of the column name
     pub index: HashMap<String, usize>,
+    /// The pipeline's corresponding table (if already created). Useful to retrieve column schemas.
+    table: Option<Arc<Table>>,
 }
 
 impl SchemaInfo {
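The shape of this pair of conversions: going from the wire ColumnSchema to the internal, typed ColumnMetadata is infallible, while going back can fail because not every internal datatype has a protobuf encoding. A simplified, self-contained sketch of that asymmetry, with hypothetical stand-in types (not the crate's real ones):

    // Hypothetical stand-ins illustrating the conversion pattern.
    #[derive(Clone, Debug, PartialEq)]
    enum WireType { Int64, Json }

    #[derive(Clone, Debug, PartialEq)]
    enum ConcreteType { Int64, Json, Duration } // Duration has no wire form here

    struct WireColumn { name: String, datatype: WireType }
    struct TypedColumn { name: String, datatype: ConcreteType }

    impl From<WireColumn> for TypedColumn {
        fn from(w: WireColumn) -> Self {
            // Every wire type has an internal counterpart, so this cannot fail.
            let datatype = match w.datatype {
                WireType::Int64 => ConcreteType::Int64,
                WireType::Json => ConcreteType::Json,
            };
            TypedColumn { name: w.name, datatype }
        }
    }

    impl TryFrom<TypedColumn> for WireColumn {
        type Error = String;
        fn try_from(t: TypedColumn) -> Result<Self, Self::Error> {
            let datatype = match t.datatype {
                ConcreteType::Int64 => WireType::Int64,
                ConcreteType::Json => WireType::Json,
                other => return Err(format!("no wire encoding for {other:?}")),
            };
            Ok(WireColumn { name: t.name, datatype })
        }
    }

    fn main() {
        let wire = WireColumn { name: "ts".into(), datatype: WireType::Int64 };
        let typed: TypedColumn = wire.into();
        let back: WireColumn = typed.try_into().unwrap();
        assert_eq!(back.name, "ts");
        assert_eq!(back.datatype, WireType::Int64);
    }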
@@ -288,6 +355,7 @@ impl SchemaInfo {
         Self {
             schema: Vec::with_capacity(capacity),
             index: HashMap::with_capacity(capacity),
+            table: None,
         }
     }
@@ -297,46 +365,88 @@ impl SchemaInfo {
             index.insert(schema.column_name.clone(), i);
         }
         Self {
-            schema: schema_list,
+            schema: schema_list.into_iter().map(Into::into).collect(),
             index,
+            table: None,
         }
     }
 
+    pub fn set_table(&mut self, table: Option<Arc<Table>>) {
+        self.table = table;
+    }
+
+    fn find_column_schema_in_table(&self, column_name: &str) -> Option<ColumnMetadata> {
+        if let Some(table) = &self.table
+            && let Some(i) = table.schema_ref().column_index_by_name(column_name)
+        {
+            let column_schema = table.schema_ref().column_schemas()[i].clone();
+
+            let semantic_type = if column_schema.is_time_index() {
+                SemanticType::Timestamp
+            } else if table.table_info().meta.primary_key_indices.contains(&i) {
+                SemanticType::Tag
+            } else {
+                SemanticType::Field
+            };
+
+            Some(ColumnMetadata {
+                column_schema,
+                semantic_type,
+            })
+        } else {
+            None
+        }
+    }
+
+    pub fn column_schemas(&self) -> api::error::Result<Vec<ColumnSchema>> {
+        self.schema
+            .iter()
+            .map(|x| x.clone().try_into())
+            .collect::<api::error::Result<Vec<_>>>()
+    }
 }
 
 fn resolve_schema(
     index: Option<usize>,
-    value_data: ValueData,
-    column_schema: ColumnSchema,
-    row: &mut Vec<GreptimeValue>,
+    pipeline_context: &PipelineContext,
+    column: &str,
+    value_type: &ConcreteDataType,
     schema_info: &mut SchemaInfo,
 ) -> Result<()> {
     if let Some(index) = index {
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        // Safety unwrap is fine here because api_value is always valid
-        let value_column_data_type = proto_value_type(&api_value).unwrap();
-        // Safety unwrap is fine here because index is always valid
-        let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
-        if value_column_data_type != schema_column_data_type {
-            IdentifyPipelineColumnTypeMismatchSnafu {
-                column: column_schema.column_name,
-                expected: schema_column_data_type.as_str_name(),
-                actual: value_column_data_type.as_str_name(),
-            }
-            .fail()
-        } else {
-            row[index] = api_value;
-            Ok(())
-        }
+        let column_type = &mut schema_info.schema[index].column_schema.data_type;
+        match (column_type, value_type) {
+            (ConcreteDataType::Json(column_type), ConcreteDataType::Json(value_type))
+                if column_type.is_include(value_type) =>
+            {
+                Ok(())
+            }
+            (column_type, value_type) if column_type == value_type => Ok(()),
+            (column_type, value_type) => IdentifyPipelineColumnTypeMismatchSnafu {
+                column,
+                expected: column_type.to_string(),
+                actual: value_type.to_string(),
+            }
+            .fail(),
+        }
     } else {
-        let key = column_schema.column_name.clone();
+        let column_schema = schema_info
+            .find_column_schema_in_table(column)
+            .unwrap_or_else(|| {
+                let semantic_type = decide_semantic(pipeline_context, column);
+                let column_schema = datatypes::schema::ColumnSchema::new(
+                    column,
+                    value_type.clone(),
+                    semantic_type != SemanticType::Timestamp,
+                );
+                ColumnMetadata {
+                    column_schema,
+                    semantic_type,
+                }
+            });
+        let key = column.to_string();
         schema_info.schema.push(column_schema);
         schema_info.index.insert(key, schema_info.schema.len() - 1);
-        let api_value = GreptimeValue {
-            value_data: Some(value_data),
-        };
-        row.push(api_value);
         Ok(())
     }
 }
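Worth noting in the rework above: JSON columns are no longer held to exact type equality. An incoming JSON value type is accepted when the already-recorded column type structurally includes it. A toy sketch of such an "includes" check (hypothetical, much simpler than the crate's is_include):

    use std::collections::BTreeMap;

    #[derive(PartialEq)]
    enum JsonTy {
        Number,
        String,
        Object(BTreeMap<String, JsonTy>),
    }

    // A column type "includes" a value type if every field the value carries is
    // present in the column with an including type; extra column fields are fine.
    fn includes(column: &JsonTy, value: &JsonTy) -> bool {
        match (column, value) {
            (JsonTy::Object(c), JsonTy::Object(v)) => v
                .iter()
                .all(|(k, vt)| c.get(k).is_some_and(|ct| includes(ct, vt))),
            (c, v) => c == v,
        }
    }

    fn main() {
        let column = JsonTy::Object(BTreeMap::from([
            ("a".to_string(), JsonTy::Number),
            ("b".to_string(), JsonTy::String),
        ]));
        let value = JsonTy::Object(BTreeMap::from([("a".to_string(), JsonTy::Number)]));
        assert!(includes(&column, &value)); // a narrower value fits the existing column
    }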
@@ -481,11 +591,11 @@ pub(crate) fn values_to_row(
     Ok(Row { values: row })
 }
 
-fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> i32 {
+fn decide_semantic(p_ctx: &PipelineContext, column_name: &str) -> SemanticType {
     if p_ctx.channel == Channel::Prometheus && column_name != greptime_value() {
-        SemanticType::Tag as i32
+        SemanticType::Tag
     } else {
-        SemanticType::Field as i32
+        SemanticType::Field
     }
 }
@@ -497,55 +607,56 @@ fn resolve_value(
     p_ctx: &PipelineContext,
 ) -> Result<()> {
     let index = schema_info.index.get(&column_name).copied();
-    let mut resolve_simple_type =
-        |value_data: ValueData, column_name: String, data_type: ColumnDataType| {
-            let semantic_type = decide_semantic(p_ctx, &column_name);
-            resolve_schema(
-                index,
-                value_data,
-                ColumnSchema {
-                    column_name,
-                    datatype: data_type as i32,
-                    semantic_type,
-                    datatype_extension: None,
-                    options: None,
-                },
-                row,
-                schema_info,
-            )
-        };
 
-    match value {
-        VrlValue::Null => {}
+    let value_data = match value {
+        VrlValue::Null => None,
 
         VrlValue::Integer(v) => {
-            // safe unwrap after type matched
-            resolve_simple_type(ValueData::I64Value(v), column_name, ColumnDataType::Int64)?;
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::int64_datatype(),
+                schema_info,
+            )?;
+            Some(ValueData::I64Value(v))
         }
 
         VrlValue::Float(v) => {
-            // safe unwrap after type matched
-            resolve_simple_type(
-                ValueData::F64Value(v.into()),
-                column_name,
-                ColumnDataType::Float64,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::float64_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::F64Value(v.into()))
         }
 
         VrlValue::Boolean(v) => {
-            resolve_simple_type(
-                ValueData::BoolValue(v),
-                column_name,
-                ColumnDataType::Boolean,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::boolean_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::BoolValue(v))
         }
 
         VrlValue::Bytes(v) => {
-            resolve_simple_type(
-                ValueData::StringValue(String::from_utf8_lossy_owned(v.to_vec())),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::string_datatype(),
+                schema_info,
            )?;
+            Some(ValueData::StringValue(String::from_utf8_lossy_owned(
+                v.to_vec(),
+            )))
         }
 
         VrlValue::Regex(v) => {
@@ -553,42 +664,83 @@ fn resolve_value(
                 "Persisting regex value in the table, this should not happen, column_name: {}",
                 column_name
             );
-            resolve_simple_type(
-                ValueData::StringValue(v.to_string()),
-                column_name,
-                ColumnDataType::String,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::string_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::StringValue(v.to_string()))
         }
 
         VrlValue::Timestamp(ts) => {
             let ns = ts.timestamp_nanos_opt().context(InvalidTimestampSnafu {
                 input: ts.to_rfc3339(),
             })?;
-            resolve_simple_type(
-                ValueData::TimestampNanosecondValue(ns),
-                column_name,
-                ColumnDataType::TimestampNanosecond,
+            resolve_schema(
+                index,
+                p_ctx,
+                &column_name,
+                &ConcreteDataType::timestamp_nanosecond_datatype(),
+                schema_info,
             )?;
+            Some(ValueData::TimestampNanosecondValue(ns))
         }
 
         VrlValue::Array(_) | VrlValue::Object(_) => {
-            let data = vrl_value_to_jsonb_value(&value);
-            resolve_schema(
-                index,
-                ValueData::BinaryValue(data.to_vec()),
-                ColumnSchema {
-                    column_name,
-                    datatype: ColumnDataType::Binary as i32,
-                    semantic_type: SemanticType::Field as i32,
-                    datatype_extension: Some(ColumnDataTypeExtension {
-                        type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
-                    }),
-                    options: None,
-                },
-                row,
-                schema_info,
-            )?;
+            let is_json_native_type = schema_info
+                .find_column_schema_in_table(&column_name)
+                .is_some_and(|x| {
+                    if let ConcreteDataType::Json(column_type) = &x.column_schema.data_type {
+                        column_type.is_native_type()
+                    } else {
+                        false
+                    }
+                });
+
+            let value = if is_json_native_type {
+                let json_extension_type: Option<JsonExtensionType> =
+                    if let Some(x) = schema_info.find_column_schema_in_table(&column_name) {
+                        x.column_schema.extension_type()?
+                    } else {
+                        None
+                    };
+                let settings = json_extension_type
+                    .and_then(|x| x.metadata().json_structure_settings.clone())
+                    .unwrap_or_default();
+                let value: serde_json::Value = value.try_into().map_err(|e: StdError| {
+                    CoerceIncompatibleTypesSnafu { msg: e.to_string() }.build()
+                })?;
+                let value = settings.encode(value)?;
+
+                resolve_schema(index, p_ctx, &column_name, &value.data_type(), schema_info)?;
+
+                let Value::Json(value) = value else {
+                    unreachable!()
+                };
+                ValueData::JsonValue(encode_json_value(*value))
+            } else {
+                resolve_schema(
+                    index,
+                    p_ctx,
+                    &column_name,
+                    &ConcreteDataType::binary_datatype(),
+                    schema_info,
+                )?;
+
+                let value = vrl_value_to_jsonb_value(&value);
+                ValueData::BinaryValue(value.to_vec())
+            };
+            Some(value)
         }
-    }
+    };
+
+    let value = GreptimeValue { value_data };
+    if let Some(index) = index {
+        row[index] = value;
+    } else {
+        row.push(value);
+    }
     Ok(())
 }
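The branch above is the heart of the change: objects and arrays now take one of two paths depending on how the target column is declared. If the table declares a native JSON column, the value is coerced through the column's structure settings into a typed JSON value; otherwise it falls back to the old jsonb binary encoding. A toy sketch of that dispatch, with illustrative names only (not the crate's API):

    // Illustrative stand-ins for the two encoding paths.
    enum ColumnTy { NativeJson, Binary }
    enum Encoded { Json(String), Binary(Vec<u8>) }

    fn encode_object(declared: Option<&ColumnTy>, raw: &str) -> Encoded {
        match declared {
            // Column declared as native JSON: keep a typed JSON representation.
            Some(ColumnTy::NativeJson) => Encoded::Json(raw.to_string()),
            // Anything else (including no table yet): jsonb-style binary fallback.
            _ => Encoded::Binary(raw.as_bytes().to_vec()),
        }
    }

    fn main() {
        let e = encode_object(Some(&ColumnTy::NativeJson), r#"{"a":1}"#);
        assert!(matches!(e, Encoded::Json(_)));
        let e = encode_object(Some(&ColumnTy::Binary), r#"{"a":1}"#);
        assert!(matches!(e, Encoded::Binary(_)));
        let e = encode_object(None, r#"{"a":1}"#); // no declared column: also binary
        assert!(matches!(e, Encoded::Binary(_)));
    }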
@@ -626,20 +778,24 @@ fn identity_pipeline_inner(
     let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
 
     // set time index column schema first
-    schema_info.schema.push(ColumnSchema {
-        column_name: custom_ts
+    let column_schema = datatypes::schema::ColumnSchema::new(
+        custom_ts
             .map(|ts| ts.get_column_name().to_string())
             .unwrap_or_else(|| greptime_timestamp().to_string()),
-        datatype: custom_ts.map(|c| c.get_datatype()).unwrap_or_else(|| {
-            if pipeline_ctx.channel == Channel::Prometheus {
-                ColumnDataType::TimestampMillisecond
-            } else {
-                ColumnDataType::TimestampNanosecond
-            }
-        }) as i32,
-        semantic_type: SemanticType::Timestamp as i32,
-        datatype_extension: None,
-        options: None,
+        custom_ts
+            .map(|c| ConcreteDataType::from(ColumnDataTypeWrapper::new(c.get_datatype(), None)))
+            .unwrap_or_else(|| {
+                if pipeline_ctx.channel == Channel::Prometheus {
+                    ConcreteDataType::timestamp_millisecond_datatype()
+                } else {
+                    ConcreteDataType::timestamp_nanosecond_datatype()
+                }
+            }),
+        false,
+    );
+    schema_info.schema.push(ColumnMetadata {
+        column_schema,
+        semantic_type: SemanticType::Timestamp,
     });
 
     let mut opt_map = HashMap::new();
@@ -697,28 +853,29 @@ pub fn identity_pipeline(
         input.push(result);
     }
 
-    identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, opt_map)| {
+    identity_pipeline_inner(input, pipeline_ctx).and_then(|(mut schema, opt_map)| {
         if let Some(table) = table {
             let table_info = table.table_info();
             for tag_name in table_info.meta.row_key_column_names() {
                 if let Some(index) = schema.index.get(tag_name) {
-                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                    schema.schema[*index].semantic_type = SemanticType::Tag;
                 }
             }
         }
 
-        opt_map
+        let column_schemas = schema.column_schemas()?;
+        Ok(opt_map
             .into_iter()
             .map(|(opt, rows)| {
                 (
                     opt,
                     Rows {
-                        schema: schema.schema.clone(),
+                        schema: column_schemas.clone(),
                         rows,
                     },
                )
            })
-            .collect::<HashMap<ContextOpt, Rows>>()
+            .collect::<HashMap<ContextOpt, Rows>>())
     })
 }
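The switch from map to and_then is what lets the new fallible column_schemas() call propagate its error instead of being swallowed: and_then's closure may itself return a Result. In miniature:

    fn fallible_step(v: i32) -> Result<i32, String> {
        if v >= 0 { Ok(v * 2) } else { Err("negative".to_string()) }
    }

    fn main() {
        let base: Result<i32, String> = Ok(21);
        // `map` only wraps an infallible closure; `and_then` lets the closure
        // return Result, so inner errors flow out as the outer error.
        let out = base.and_then(|v| {
            let doubled = fallible_step(v)?;
            Ok(doubled)
        });
        assert_eq!(out, Ok(42));
    }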
@@ -850,7 +1007,7 @@ mod tests {
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
-            "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
+            "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: String".to_string(),
         );
     }
     {
@@ -879,7 +1036,7 @@ mod tests {
         assert!(rows.is_err());
         assert_eq!(
             rows.err().unwrap().to_string(),
-            "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
+            "Column datatype mismatch. For column: score, expected datatype: Float64, actual datatype: Int64".to_string(),
         );
     }
     {
@@ -942,7 +1099,7 @@ mod tests {
         .map(|(mut schema, mut rows)| {
             for name in tag_column_names {
                 if let Some(index) = schema.index.get(&name) {
-                    schema.schema[*index].semantic_type = SemanticType::Tag as i32;
+                    schema.schema[*index].semantic_type = SemanticType::Tag;
                }
             }
@@ -950,7 +1107,7 @@ mod tests {
             let rows = rows.remove(&ContextOpt::default()).unwrap();
 
             Rows {
-                schema: schema.schema,
+                schema: schema.column_schemas().unwrap(),
                 rows,
             }
         });
@@ -61,7 +61,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
     }
 
     Rows {
-        schema: schema_info.schema.clone(),
+        schema: schema_info.column_schemas().unwrap(),
         rows,
     }
 }
@@ -52,7 +52,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value
@@ -91,7 +91,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit_json");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value
@@ -160,7 +160,7 @@ transform:
 
     // check schema
     assert_eq!(output.schema[0].column_name, "commit");
-    let type_id: i32 = ColumnDataType::Binary.into();
+    let type_id: i32 = ColumnDataType::Json.into();
     assert_eq!(output.schema[0].datatype, type_id);
 
     // check value
@@ -664,6 +664,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(transparent)]
+    GreptimeProto {
+        source: api::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -794,6 +801,8 @@ impl ErrorExt for Error {
             Suspended { .. } => StatusCode::Suspended,
 
             MemoryLimitExceeded { .. } => StatusCode::RateLimited,
+
+            GreptimeProto { source, .. } => source.status_code(),
         }
     }
@@ -23,6 +23,7 @@ pub mod memory_limit;
 pub mod prom_query_gateway;
 pub mod region_server;
 
+use std::any::Any;
 use std::net::SocketAddr;
 use std::time::Duration;
@@ -399,4 +400,8 @@ impl Server for GrpcServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }
@@ -1285,6 +1285,10 @@ impl Server for HttpServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
 }
 
 #[cfg(test)]
@@ -31,7 +31,7 @@ use axum_extra::TypedHeader;
 use common_catalog::consts::default_engine;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_query::{Output, OutputData};
-use common_telemetry::{debug, error, warn};
+use common_telemetry::{error, warn};
 use headers::ContentType;
 use lazy_static::lazy_static;
 use mime_guess::mime;
@@ -738,11 +738,6 @@ pub async fn log_ingester(
 
     let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
 
-    debug!(
-        "receiving logs: {:?}",
-        serde_json::to_string(&value).unwrap()
-    );
-
     query_ctx.set_channel(Channel::Log);
     let query_ctx = Arc::new(query_ctx);
@@ -152,7 +152,7 @@ pub async fn loki_ingest(
         rows.push(row);
     }
 
-    let schemas = schema_info.schema;
+    let schemas = schema_info.column_schemas()?;
     // fill Null for missing values
     for row in rows.iter_mut() {
         row.resize(schemas.len(), GreptimeValue::default());
@@ -746,13 +746,16 @@ fn process_labels(
     } else {
         // not exist
         // add schema and append to values
-        schemas.push(ColumnSchema {
-            column_name: k.clone(),
-            datatype: ColumnDataType::String.into(),
-            semantic_type: SemanticType::Tag.into(),
-            datatype_extension: None,
-            options: None,
-        });
+        schemas.push(
+            ColumnSchema {
+                column_name: k.clone(),
+                datatype: ColumnDataType::String.into(),
+                semantic_type: SemanticType::Tag.into(),
+                datatype_extension: None,
+                options: None,
+            }
+            .into(),
+        );
         column_indexer.insert(k, schemas.len() - 1);
 
         row.push(GreptimeValue {
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -265,4 +266,8 @@ impl Server for MysqlServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }
@@ -381,6 +381,7 @@ fn extract_field_from_attr_and_combine_schema(
 
     if let Some(index) = select_schema.index.get(key) {
         let column_schema = &select_schema.schema[*index];
+        let column_schema: ColumnSchema = column_schema.clone().try_into()?;
         // datatype of the same column name should be the same
         ensure!(
             column_schema.datatype == schema.datatype,
@@ -393,7 +394,7 @@ fn extract_field_from_attr_and_combine_schema(
         );
         extracted_values[*index] = value;
     } else {
-        select_schema.schema.push(schema);
+        select_schema.schema.push(schema.into());
         select_schema
             .index
             .insert(key.clone(), select_schema.schema.len() - 1);
@@ -480,7 +481,7 @@ fn parse_export_logs_service_request_to_rows(
     let mut parse_ctx = ParseContext::new(select_info);
     let mut rows = parse_resource(&mut parse_ctx, request.resource_logs)?;
 
-    schemas.extend(parse_ctx.select_schema.schema);
+    schemas.extend(parse_ctx.select_schema.column_schemas()?);
 
     rows.iter_mut().for_each(|row| {
         row.values.resize(schemas.len(), GreptimeValue::default());
@@ -135,12 +135,18 @@ async fn run_custom_pipeline(
             let mut schema_info = SchemaInfo::default();
             schema_info
                 .schema
-                .push(time_index_column_schema(ts_name, timeunit));
+                .push(time_index_column_schema(ts_name, timeunit).into());
 
             schema_info
         }
     };
 
+    let table = handler
+        .get_table(&table_name, query_ctx)
+        .await
+        .context(CatalogSnafu)?;
+    schema_info.set_table(table);
+
     for pipeline_map in pipeline_maps {
         let result = pipeline
             .exec_mut(pipeline_map, pipeline_ctx, &mut schema_info)
@@ -194,7 +200,7 @@ async fn run_custom_pipeline(
                 RowInsertRequest {
                     rows: Some(Rows {
                         rows,
-                        schema: schema_info.schema.clone(),
+                        schema: schema_info.column_schemas()?,
                     }),
                     table_name: table_name.clone(),
                 },
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::future::Future;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -144,4 +145,8 @@ impl Server for PostgresServer {
     fn bind_addr(&self) -> Option<SocketAddr> {
         self.bind_addr
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::{Debug, Formatter};
 use std::net::SocketAddr;
@@ -147,6 +148,8 @@ pub trait Server: Send + Sync {
     fn bind_addr(&self) -> Option<SocketAddr> {
         None
     }
+
+    fn as_any(&self) -> &dyn Any;
 }
 
 struct AcceptTask {
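The new as_any method exists so callers holding a dyn Server can recover the concrete server type; the integration test later in this commit uses exactly this to fish the HttpServer out of the handler map. The standalone pattern, with a hypothetical stand-in server:

    use std::any::Any;

    trait Server {
        fn as_any(&self) -> &dyn Any;
    }

    struct HttpServerStandIn { port: u16 } // hypothetical concrete server

    impl Server for HttpServerStandIn {
        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    fn main() {
        let server: Box<dyn Server> = Box::new(HttpServerStandIn { port: 4000 });
        // Downcast from the trait object back to the concrete type:
        let http = server.as_any().downcast_ref::<HttpServerStandIn>().unwrap();
        assert_eq!(http.port, 4000);
    }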
@@ -1,14 +1,14 @@
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| data | ts |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 1970-01-01T00:00:00.001 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 1970-01-01T00:00:00.002 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 1970-01-01T00:00:00.003 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 1970-01-01T00:00:00.004 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 1970-01-01T00:00:00.005 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 1970-01-01T00:00:00.006 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 1970-01-01T00:00:00.007 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 1970-01-01T00:00:00.008 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 1970-01-01T00:00:00.009 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 1970-01-01T00:00:00.010 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
| data | time_us |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
| {_raw: {"commit":{"cid":"bafyreidblutgvj75o4q4akzyyejedjj6l3it6hgqwee6jpwv2wqph5fsgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.095Z","langs":["en"],"reply":{"parent":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"},"root":{"cid":"bafyreibfglofvqou2yiqvwzk4rcgkhhxrbunyemshdjledgwymimqkg24e","uri":"at://did:plc:6tr6tuzlx2db3rduzr2d6r24/app.bsky.feed.post/3lbhqo2rtys2z"}},"text":"aaaaah. LIght shines in a corner of WTF...."},"rev":"3lbhtytnn2k2f","rkey":"3lbhtyteurk2y"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:yj3sjq3blzpynh27cumnp5ks, kind: commit, time_us: 1732206349000167} | 2024-11-21T16:25:49.000167 |
| {_raw: {"commit":{"cid":"bafyreia5l5vrkh5oj4cjyhcqby2dprhyvcyofo2q5562tijlae2pzih23m","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.221Z","subject":{"cid":"bafyreidjvrcmckkm765mct5fph36x7kupkfo35rjklbf2k76xkzwyiauge","uri":"at://did:plc:azrv4rcbws6kmcga4fsbphg2/app.bsky.feed.post/3lbgjdpbiec2l"}},"rev":"3lbhuvzds6d2a","rkey":"3lbhuvzdked2a"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:3i4xf2v4wcnyktgv6satke64, kind: commit, time_us: 1732206349000644} | 2024-11-21T16:25:49.000644 |
| {_raw: {"commit":{"cid":"bafyreiew2p6cgirfaj45qoenm4fgumib7xoloclrap3jgkz5es7g7kby3i","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:27:40.923Z","subject":"did:plc:r7cdh4sgzqbfdc6wcdxxti7c"},"rev":"3lbhuvze3gi2u","rkey":"3lbhuvzdtmi2u"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:gccfnqqizz4urhchsaie6jft, kind: commit, time_us: 1732206349001108} | 2024-11-21T16:25:49.001108 |
| {_raw: {"commit":{"cid":"bafyreidjix4dauj2afjlbzmhj3a7gwftcevvmmy6edww6vrjdbst26rkby","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:15:58.232Z","subject":{"cid":"bafyreiavpshyqzrlo5m7fqodjhs6jevweqnif4phasiwimv4a7mnsqi2fe","uri":"at://did:plc:fusulxqc52zbrc75fi6xrcof/app.bsky.feed.post/3lbhskq5zn22f"}},"rev":"3lbhueija5p22","rkey":"3lbhueiizcx22"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:msxqf3twq7abtdw7dbfskphk, kind: commit, time_us: 1732206349001372} | 2024-11-21T16:25:49.001372 |
| {_raw: {"commit":{"cid":"bafyreigzdsdne3z2xxcakgisieyj7y47hj6eg7lj6v4q25ah5q2qotu5ku","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:09:27.254Z","langs":["en"],"reply":{"parent":{"cid":"bafyreih35fe2jj3gchmgk4amold4l6sfxd2sby5wrg3jrws5fkdypxrbg4","uri":"at://did:plc:6wx2gg5yqgvmlu35r6y3bk6d/app.bsky.feed.post/3lbhtj2eb4s2o"},"root":{"cid":"bafyreifipyt3vctd4ptuoicvio7rbr5xvjv4afwuggnd2prnmn55mu6luu","uri":"at://did:plc:474ldquxwzrlcvjhhbbk2wte/app.bsky.feed.post/3lbhdzrynik27"}},"text":"okay i take mine back because I hadn’t heard this one yet^^"},"rev":"3lbhtytohxc2o","rkey":"3lbhtytjqzk2q"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:l5o3qjrmfztir54cpwlv2eme, kind: commit, time_us: 1732206349001905} | 2024-11-21T16:25:49.001905 |
| {_raw: {"commit":{"cid":"bafyreiaa2vsdr4ckwjg4jq47zfd7mewidywfz3qh3dmglcd6ozi4xwdega","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:15:21.495Z","subject":"did:plc:amsdn2tbjxo3xrwqneqhh4cm"},"rev":"3lbhudfo3yi2w","rkey":"3lbhudfnw4y2w"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:jkaaf5j2yb2pvpx3ualm3vbh, kind: commit, time_us: 1732206349002758} | 2024-11-21T16:25:49.002758 |
| {_raw: {"commit":{"cid":"bafyreihaatlpar3abtx6ck3kde2ksic6zzflk4ppduhf6dxurytqrv33ni","record":{"$type":"app.bsky.graph.follow","createdAt":"2024-11-21T16:18:39.913Z","subject":"did:plc:gf3vum7insztt5rxrpxdz2id"},"rev":"3lbhujcp4ix2n","rkey":"3lbhujcoxmp2n"}}, commit.collection: app.bsky.graph.follow, commit.operation: create, did: did:plc:tdwz2h4id5dxezvohftsmffu, kind: commit, time_us: 1732206349003106} | 2024-11-21T16:25:49.003106 |
| {_raw: {"commit":{"cid":"bafyreid5ycocp5zq2g7fcx2xxzxrbafuh7b5qhtwuwiomzo6vqila2cbpu","record":{"$type":"app.bsky.feed.repost","createdAt":"2024-11-21T16:23:36.714Z","subject":{"cid":"bafyreieaacfiobnuqvjhhsndyi5s3fd6krbzdduxsyrzfv43kczpcmkl6y","uri":"at://did:plc:o5q6dynpme4ndolc3heztasm/app.bsky.feed.post/3lbfli3qsoc2o"}},"rev":"3lbhus5vior2t","rkey":"3lbhus5vbtz2t"}}, commit.collection: app.bsky.feed.repost, commit.operation: create, did: did:plc:cdsd346mwow7aj3tgfkwsct3, kind: commit, time_us: 1732206349003461} | 2024-11-21T16:25:49.003461 |
| {_raw: {"commit":{"cid":"bafyreibugobcike72y4zxvdyz2oopyt6ywwqfielcwojkb27p7s6rlomgm","record":{"$type":"app.bsky.feed.post","createdAt":"2024-11-21T16:25:44.376Z","langs":["en"],"reply":{"parent":{"cid":"bafyreiaev27cfcxxvn2pdhrwwquzwgclujnulzbcfnn4p4fwgb6migjhw4","uri":"at://did:plc:zec6cslvgc3hhdatrhk6pq5p/app.bsky.feed.post/3lbhujvds4c2b"},"root":{"cid":"bafyreif7qjxhvecwnhlynijj6pf47jwvtkahsz3zh2kaipwu2bw2dxwaqq","uri":"at://did:plc:s4bwqchfzm6gjqfeb6mexgbu/app.bsky.feed.post/3lbhug53kkk2m"}},"text":"\n⌜ Blinking. She hadn't realized she spoke out loud. ⌟\n\n‘ It was nothing like that — . I was only thinking . . . ’\n\n⌜ Trailing off, her mind occupied. ⌟\n"},"rev":"3lbhuvzeccx2w","rkey":"3lbhuvxf4qs2m"}}, commit.collection: app.bsky.feed.post, commit.operation: create, did: did:plc:s4bwqchfzm6gjqfeb6mexgbu, kind: commit, time_us: 1732206349003907} | 2024-11-21T16:25:49.003907 |
| {_raw: {"commit":{"cid":"bafyreidjk2svg2fdjiiwohmfmvp3hdxhpb33ycnixzbkyib5m6cocindxq","record":{"$type":"app.bsky.feed.like","createdAt":"2024-11-21T16:25:46.167Z","subject":{"cid":"bafyreiaumopip75nzx2xjbugtwemdppsyx54bd2odf6q45f3o7xkocgari","uri":"at://did:plc:ig2jv6gqup4t7gdq2pmanknw/app.bsky.feed.post/3lbhuvtlaec2c"}},"rev":"3lbhuvzedg52j","rkey":"3lbhuvzdyof2j"}}, commit.collection: app.bsky.feed.like, commit.operation: create, did: did:plc:hbc74dlsxhq53kp5oxges6d7, kind: commit, time_us: 1732206349004769} | 2024-11-21T16:25:49.004769 |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------+
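The pair of expected outputs above captures the point of the commit: the time index is now driven by the extracted time_us field (microseconds since the Unix epoch) instead of a placeholder ts. A quick sanity check of the conversion, assuming the chrono crate (0.4.35 or later, for DateTime::from_timestamp_micros):

    use chrono::{DateTime, Utc};

    fn main() {
        // time_us from the first jsonbench record, microseconds since the Unix epoch.
        let time_us: i64 = 1_732_206_349_000_167;
        let ts = DateTime::<Utc>::from_timestamp_micros(time_us).unwrap();
        assert_eq!(ts.to_string(), "2024-11-21 16:25:49.000167 UTC");
    }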
@@ -48,9 +48,9 @@ use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
 use frontend::frontend::Frontend;
 use frontend::instance::builder::FrontendBuilder;
 use frontend::instance::{Instance, StandaloneDatanodeManager};
+use frontend::server::Services;
 use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
 use servers::grpc::GrpcOptions;
-use servers::server::ServerHandlers;
 use snafu::ResultExt;
 use standalone::options::StandaloneOptions;
@@ -249,7 +249,7 @@ impl GreptimeDbStandaloneBuilder {
             procedure_executor.clone(),
             Arc::new(ProcessManager::new(server_addr, None)),
         )
-        .with_plugin(plugins)
+        .with_plugin(plugins.clone())
         .try_build()
         .await
         .unwrap();
@@ -282,14 +282,15 @@ impl GreptimeDbStandaloneBuilder {

        test_util::prepare_another_catalog_and_schema(&instance).await;

        let mut frontend = Frontend {
        let servers = Services::new(opts.clone(), instance.clone(), plugins)
            .build()
            .unwrap();
        let frontend = Frontend {
            instance,
            servers: ServerHandlers::default(),
            servers,
            heartbeat_task: None,
        };

        frontend.start().await.unwrap();

        GreptimeDbStandalone {
            frontend: Arc::new(frontend),
            opts,

@@ -18,14 +18,114 @@ use std::{fs, io};

use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use http::StatusCode;
use servers::http::test_helpers::TestClient;
use servers::http::{HTTP_SERVER, HttpServer};
use servers::server::ServerHandlers;
use tests_integration::standalone::GreptimeDbStandaloneBuilder;
use tests_integration::test_util::execute_sql_and_expect;

#[tokio::test]
async fn test_load_jsonbench_data() {
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_pipeline() -> io::Result<()> {
    common_telemetry::init_default_ut_logging();

    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data")
    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_pipeline")
        .build()
        .await;
    let frontend = instance.fe_instance();

    let ServerHandlers::Init(handlers) = instance.frontend.server_handlers() else {
        unreachable!()
    };
    let router = {
        let handlers = handlers.lock().unwrap();
        let server = handlers
            .get(HTTP_SERVER)
            .and_then(|x| x.0.as_any().downcast_ref::<HttpServer>())
            .unwrap();
        server.build(server.make_app()).unwrap()
    };
    let client = TestClient::new(router).await;

    create_table(frontend).await;

    desc_table(frontend).await;

    create_pipeline(&client).await;

    insert_data_by_pipeline(&client).await?;

    query_data(frontend).await
}

async fn insert_data_by_pipeline(client: &TestClient) -> io::Result<()> {
    let file = fs::read(find_workspace_path(
        "tests-integration/resources/jsonbench-head-10.ndjson",
    ))?;

    let response = client
        .post("/v1/ingest?table=bluesky&pipeline_name=jsonbench")
        .header("Content-Type", "text/plain")
        .body(file)
        .send()
        .await;
    assert_eq!(response.status(), StatusCode::OK);

    let response = response.text().await;
    // Note that this pattern also checks the number of inserted rows: "10".
    let pattern = r#"{"output":[{"affectedrows":10}]"#;
    assert!(response.starts_with(pattern));
    Ok(())
}

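For context (not part of the diff): the TestClient above drives the router in-process. Against a deployed GreptimeDB the same ingest call is an ordinary HTTP POST. A minimal sketch, assuming the reqwest and tokio crates and a server listening on localhost:4000 (both are assumptions, not from this commit):

use std::fs;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Read the same NDJSON sample the test uses; each non-empty line is one event.
    let payload = fs::read("tests-integration/resources/jsonbench-head-10.ndjson")?;
    let response = reqwest::Client::new()
        // Hypothetical address; the endpoint and query parameters mirror the test.
        .post("http://localhost:4000/v1/ingest?table=bluesky&pipeline_name=jsonbench")
        .header("Content-Type", "text/plain")
        .body(payload)
        .send()
        .await?;
    // On success the body starts with {"output":[{"affectedrows":...}].
    println!("{}: {}", response.status(), response.text().await?);
    Ok(())
}
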
async fn create_pipeline(client: &TestClient) {
    let pipeline = r#"
version: 2

processors:
  - json_parse:
      fields:
        - message, data
      ignore_missing: true
  - simple_extract:
      fields:
        - data, time_us
      key: "time_us"
      ignore_missing: false
  - epoch:
      fields:
        - time_us
      resolution: microsecond
  - select:
      fields:
        - time_us
        - data

transform:
  - fields:
      - time_us
    type: epoch, us
    index: timestamp
"#;

    let response = client
        .post("/v1/pipelines/jsonbench")
        .header("Content-Type", "application/x-yaml")
        .body(pipeline)
        .send()
        .await;
    assert_eq!(response.status(), StatusCode::OK);

    let response = response.text().await;
    let pattern = r#"{"pipelines":[{"name":"jsonbench""#;
    assert!(response.starts_with(pattern));
}

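Reading the pipeline top to bottom: json_parse parses the raw line into `data` (assuming, as the `message, data` field mapping suggests, that each ingested line arrives under `message`); simple_extract copies `data.time_us` to a top-level `time_us`; epoch interprets it at microsecond resolution; select drops everything except `time_us` and `data`; and the transform makes `time_us` the timestamp index. A rough serde_json approximation of that per-line flow (an illustrative sketch only, not the pipeline engine):

use serde_json::{json, Value};

// Illustrative approximation of the jsonbench pipeline above, not the real engine.
fn apply_pipeline(message: &str) -> Option<(i64, Value)> {
    // json_parse: message -> data
    let data: Value = serde_json::from_str(message).ok()?;
    // simple_extract: data.time_us -> time_us (ignore_missing: false)
    let time_us = data.pointer("/time_us")?.as_i64()?;
    // epoch + transform: time_us becomes the microsecond timestamp index;
    // select keeps only time_us and data.
    Some((time_us, data))
}

fn main() {
    let line = r#"{"kind":"commit","time_us":1732206349003461}"#;
    let (ts, data) = apply_pipeline(line).unwrap();
    assert_eq!(ts, 1732206349003461);
    assert_eq!(data["kind"], json!("commit"));
}
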
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_load_jsonbench_data_by_sql() -> io::Result<()> {
    common_telemetry::init_default_ut_logging();

    let instance = GreptimeDbStandaloneBuilder::new("test_load_jsonbench_data_by_sql")
        .build()
        .await;
    let frontend = instance.fe_instance();
@@ -34,9 +34,9 @@ async fn test_load_jsonbench_data() {

    desc_table(frontend).await;

    insert_data(frontend).await.unwrap();
    insert_data_by_sql(frontend).await?;

    query_data(frontend).await.unwrap();
    query_data(frontend).await
}

async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
@@ -46,22 +46,21 @@ async fn query_data(frontend: &Arc<Instance>) -> io::Result<()> {
| count(*) |
+----------+
| 10 |
+----------+
"#;
+----------+"#;
    execute_sql_and_expect(frontend, sql, expected).await;

    let sql = "SELECT * FROM bluesky ORDER BY ts";
    let sql = "SELECT * FROM bluesky ORDER BY time_us";
    let expected = fs::read_to_string(find_workspace_path(
        "tests-integration/resources/jsonbench-select-all.txt",
    ))?;
    execute_sql_and_expect(frontend, sql, &expected).await;

    // query 1:
    let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, count() AS count \
FROM bluesky \
GROUP BY event \
    let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event, count() AS count
FROM bluesky
GROUP BY event
ORDER BY count DESC, event ASC";
    let expected = r#"
+-----------------------+-------+
@@ -75,16 +174,16 @@ ORDER BY count DESC, event ASC";
    execute_sql_and_expect(frontend, sql, expected).await;

    // query 2:
    let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, \
count() AS count, \
count(DISTINCT json_get_string(data, '$.did')) AS users \
FROM bluesky \
WHERE \
(json_get_string(data, '$.kind') = 'commit') AND \
(json_get_string(data, '$.commit.operation') = 'create') \
GROUP BY event \
    let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
count() AS count,
count(DISTINCT json_get_string(data, '$.did')) AS users
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create')
GROUP BY event
ORDER BY count DESC, event ASC";
    let expected = r#"
+-----------------------+-------+-------+
@@ -98,18 +197,18 @@ ORDER BY count DESC, event ASC";
    execute_sql_and_expect(frontend, sql, expected).await;

    // query 3:
    let sql = "\
SELECT \
json_get_string(data, '$.commit.collection') AS event, \
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day, \
count() AS count \
FROM bluesky \
WHERE \
(json_get_string(data, '$.kind') = 'commit') AND \
(json_get_string(data, '$.commit.operation') = 'create') AND \
json_get_string(data, '$.commit.collection') IN \
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like') \
GROUP BY event, hour_of_day \
    let sql = "
SELECT
json_get_string(data, '$.commit.collection') AS event,
date_part('hour', to_timestamp_micros(json_get_int(data, '$.time_us'))) as hour_of_day,
count() AS count
FROM bluesky
WHERE
(json_get_string(data, '$.kind') = 'commit') AND
(json_get_string(data, '$.commit.operation') = 'create') AND
json_get_string(data, '$.commit.collection') IN
('app.bsky.feed.post', 'app.bsky.feed.repost', 'app.bsky.feed.like')
GROUP BY event, hour_of_day
ORDER BY hour_of_day, event";
    let expected = r#"
+----------------------+-------------+-------+
@@ -122,7 +221,7 @@ ORDER BY hour_of_day, event";
    execute_sql_and_expect(frontend, sql, expected).await;

    // query 4:
    let sql = "\
    let sql = "
SELECT
json_get_string(data, '$.did') as user_id,
min(to_timestamp_micros(json_get_int(data, '$.time_us'))) AS first_post_ts
@@ -174,19 +273,23 @@ LIMIT 3";
    Ok(())
}

async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
async fn insert_data_by_sql(frontend: &Arc<Instance>) -> io::Result<()> {
    let file = fs::File::open(find_workspace_path(
        "tests-integration/resources/jsonbench-head-10.ndjson",
    ))?;
    let reader = io::BufReader::new(file);
    for (i, line) in reader.lines().enumerate() {
    for line in reader.lines() {
        let line = line?;
        if line.is_empty() {
            continue;
        }

        let json: serde_json::Value = serde_json::from_str(&line)?;
        let time_us = json.pointer("/time_us").and_then(|x| x.as_u64()).unwrap();

        let sql = format!(
            "INSERT INTO bluesky (ts, data) VALUES ({}, '{}')",
            i + 1,
            "INSERT INTO bluesky (time_us, data) VALUES ({}, '{}')",
            time_us,
            line.replace("'", "''"), // doubling is the standard way to escape a single quote in SQL
        );
        execute_sql_and_expect(frontend, &sql, "Affected Rows: 1").await;
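For illustration (a hypothetical helper, not part of the commit): given one NDJSON line, the loop above effectively builds the following statement, with the line's single quotes doubled before embedding it as a SQL string literal.

// Illustrative only: what the loop builds for one (abbreviated) NDJSON line.
fn build_insert(line: &str, time_us: u64) -> String {
    format!(
        "INSERT INTO bluesky (time_us, data) VALUES ({}, '{}')",
        time_us,
        line.replace('\'', "''"), // double each single quote, the standard SQL escape
    )
}

fn main() {
    let line = r#"{"kind":"commit","time_us":1732206349003461}"#;
    let sql = build_insert(line, 1732206349003461);
    assert!(sql.starts_with("INSERT INTO bluesky (time_us, data) VALUES (1732206349003461,"));
}
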
@@ -197,12 +300,12 @@ async fn insert_data(frontend: &Arc<Instance>) -> io::Result<()> {
async fn desc_table(frontend: &Arc<Instance>) {
    let sql = "DESC TABLE bluesky";
    let expected = r#"
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<Object{"_raw": String, "commit.collection": String, "commit.operation": String, "did": String, "kind": String, "time_us": Number(I64)}> | | YES | | FIELD |
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| data | Json<{"_raw":"<String>","commit.collection":"<String>","commit.operation":"<String>","did":"<String>","kind":"<String>","time_us":"<Number>"}> | | YES | | FIELD |
| time_us | TimestampMicrosecond | PRI | NO | | TIMESTAMP |
+---------+------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+"#;
    execute_sql_and_expect(frontend, sql, expected).await;
}

@@ -219,7 +322,7 @@ CREATE TABLE bluesky (
        time_us Bigint
    >,
),
    ts Timestamp TIME INDEX,
    time_us TimestampMicrosecond TIME INDEX,
)
"#;
    execute_sql_and_expect(frontend, sql, "Affected Rows: 0").await;

@@ -12,7 +12,7 @@ DESC TABLE t;
| Column | Type | Key | Null | Default | Semantic Type |
+--------+----------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Null> | | YES | | FIELD |
| j | Json<"<Null>"> | | YES | | FIELD |
+--------+----------------------+-----+------+---------+---------------+

INSERT INTO t VALUES
@@ -24,12 +24,12 @@ Affected Rows: 3

DESC TABLE t;

+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String}, "b": Object{"y": Number(I64)}}}> | | YES | | FIELD |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>"},"b":{"y":"<Number>"}}}> | | YES | | FIELD |
+--------+---------------------------------------------------------------------------------------------------+-----+------+---------+---------------+

INSERT INTO t VALUES
(1762128004000, '{"int": 4, "bool": true, "nested": {"a": {"y": 1}}}'),
@@ -39,12 +39,12 @@ Affected Rows: 2

DESC TABLE t;

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+

INSERT INTO t VALUES (1762128006000, '{"int": 6, "list": [-6.0], "bool": true, "nested": {"a": {"x": "ax", "y": 66}, "b": {"y": -66, "x": "bx"}}}');

@@ -52,12 +52,12 @@ Affected Rows: 1

DESC TABLE t;

+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<Object{"bool": Bool, "int": Number(I64), "list": Array[Number(F64)], "nested": Object{"a": Object{"x": String, "y": Number(I64)}, "b": Object{"x": String, "y": Number(I64)}}}> | | YES | | FIELD |
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+
| ts | TimestampMillisecond | PRI | NO | | TIMESTAMP |
| j | Json<{"bool":"<Bool>","int":"<Number>","list":["<Number>"],"nested":{"a":{"x":"<String>","y":"<Number>"},"b":{"x":"<String>","y":"<Number>"}}}> | | YES | | FIELD |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------+-----+------+---------+---------------+

INSERT INTO t VALUES (1762128011000, '{}');
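What the successive DESC outputs above demonstrate is schema merging: each INSERT's inferred JSON object type is unioned key-wise with the stored one, so the Json<...> signature only ever widens. A minimal sketch of that union with simplified, hypothetical types (not GreptimeDB's actual implementation):

use std::collections::BTreeMap;

// Simplified, hypothetical JSON value types, for illustration only.
#[derive(Debug)]
enum JsonType {
    Null,
    Bool,
    Number,
    Str,
    Array(Box<JsonType>),
    Object(BTreeMap<String, JsonType>),
}

// Union two inferred types: Null widens to anything, objects merge key-wise,
// arrays merge element types; scalar conflict handling is elided here.
fn merge(a: JsonType, b: JsonType) -> JsonType {
    use JsonType::*;
    match (a, b) {
        (Null, t) | (t, Null) => t,
        (Object(mut x), Object(y)) => {
            for (k, v) in y {
                let merged = match x.remove(&k) {
                    Some(old) => merge(old, v),
                    None => v,
                };
                x.insert(k, merged);
            }
            Object(x)
        }
        (Array(x), Array(y)) => Array(Box::new(merge(*x, *y))),
        (x, _) => x,
    }
}

fn main() {
    use JsonType::*;
    // {"int": 1} then {"int": 2, "bool": true}: the signature widens to both keys.
    let a = Object(BTreeMap::from([("int".to_string(), Number)]));
    let b = Object(BTreeMap::from([
        ("int".to_string(), Number),
        ("bool".to_string(), Bool),
    ]));
    println!("{:?}", merge(a, b)); // Object({"bool": Bool, "int": Number})
}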