chore: add json write (#4744)

* chore: add json write

* chore: add test for the JSON log write API

* chore: improve error handling

* chore: address PR review comments

* chore: address PR review comments

* chore: improve error messages and add documentation
Commit 71a66d15f7 (parent 2cdd103874)
Authored by localhost, 2024-10-08 20:11:09 +08:00; committed by GitHub
8 changed files with 505 additions and 91 deletions
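The new built-in greptime_identity pipeline lets clients write raw JSON logs without creating a pipeline first. A rough usage sketch, assuming a locally running GreptimeDB HTTP server on the default port 4000 and the reqwest and tokio crates (none of which are part of this change); the route, query parameters, and pipeline name are taken from the handler and test changes below:

// Sketch only: POST a JSON log record through the built-in identity pipeline.
// The server address, reqwest, and tokio are assumptions; the route and the
// greptime_identity name come from this PR's handler and tests.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let body = r#"{"name": "Alice", "age": 20, "score": 99.5}"#;
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/events/logs?db=public&table=logs&pipeline_name=greptime_identity")
        .header("Content-Type", "application/json")
        .body(body)
        .send()
        .await?;
    // On success the handler reports the number of affected rows.
    println!("{}", resp.text().await?);
    Ok(())
}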

Cargo.lock (generated)

@@ -8141,6 +8141,7 @@ dependencies = [
"futures",
"greptime-proto",
"itertools 0.10.5",
"jsonb",
"lazy_static",
"moka",
"once_cell",


@@ -40,6 +40,7 @@ enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true
itertools.workspace = true
jsonb.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["sync"] }
once_cell.workspace = true


@@ -40,7 +40,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("{processor} processor: missing field: {field}"))]
#[snafu(display("Processor {processor}: missing field: {field}"))]
ProcessorMissingField {
processor: String,
field: String,
@@ -48,7 +48,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("{processor} processor: expect string value, but got {v:?}"))]
#[snafu(display("Processor {processor}: expect string value, but got {v:?}"))]
ProcessorExpectString {
processor: String,
v: crate::etl::Value,
@@ -56,7 +56,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("{processor} processor: unsupported value {val}"))]
#[snafu(display("Processor {processor}: unsupported value {val}"))]
ProcessorUnsupportedValue {
processor: &'static str,
val: String,
@@ -64,13 +64,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("processor key must be a string"))]
#[snafu(display("Processor key must be a string"))]
ProcessorKeyMustBeString {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("{kind} processor: failed to parse {value}"))]
#[snafu(display("Processor {kind}: failed to parse {value}"))]
ProcessorFailedToParseString {
kind: String,
value: String,
@@ -78,13 +78,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("processor must have a string key"))]
#[snafu(display("Processor must have a string key"))]
ProcessorMustHaveStringKey {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("unsupported {processor} processor"))]
#[snafu(display("Unsupported {processor} processor"))]
UnsupportedProcessor {
processor: String,
#[snafu(implicit)]
@@ -108,7 +108,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("failed to parse {key} as int: {value}"))]
#[snafu(display("Failed to parse {key} as int: {value}"))]
FailedToParseIntKey {
key: String,
value: String,
@@ -126,7 +126,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to parse {key} as float: {value}"))]
#[snafu(display("Failed to parse {key} as float: {value}"))]
FailedToParseFloatKey {
key: String,
value: String,
@@ -136,7 +136,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("{kind} processor.{key} not found in intermediate keys"))]
#[snafu(display("Processor {kind}: {key} not found in intermediate keys"))]
IntermediateKeyIndex {
kind: String,
key: String,
@@ -144,41 +144,41 @@ pub enum Error {
location: Location,
},
#[snafu(display("{k} missing value in {s}"))]
#[snafu(display("Cmcd {k} missing value in {s}"))]
CmcdMissingValue {
k: String,
s: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("{part} missing key in {s}"))]
#[snafu(display("Part: {part} missing key in {s}"))]
CmcdMissingKey {
part: String,
s: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("key must be a string, but got {k:?}"))]
#[snafu(display("Key must be a string, but got {k:?}"))]
KeyMustBeString {
k: yaml_rust::Yaml,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("csv read error"))]
#[snafu(display("Csv read error"))]
CsvRead {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: csv::Error,
},
#[snafu(display("expected at least one record from csv format, but got none"))]
#[snafu(display("Expected at least one record from csv format, but got none"))]
CsvNoRecord {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("'{separator}' must be a single character, but got '{value}'"))]
#[snafu(display("Separator '{separator}' must be a single character, but got '{value}'"))]
CsvSeparatorName {
separator: &'static str,
value: String,
@@ -186,7 +186,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("'{quote}' must be a single character, but got '{value}'"))]
#[snafu(display("Quote '{quote}' must be a single character, but got '{value}'"))]
CsvQuoteName {
quote: &'static str,
value: String,
@@ -212,19 +212,19 @@ pub enum Error {
location: Location,
},
#[snafu(display("failed to get local timezone"))]
#[snafu(display("Failed to get local timezone"))]
DateFailedToGetLocalTimezone {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to get timestamp"))]
#[snafu(display("Failed to get timestamp"))]
DateFailedToGetTimestamp {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("{processor} processor: invalid format {s}"))]
#[snafu(display("Processor {processor}: invalid format {s}"))]
DateInvalidFormat {
s: String,
processor: String,
@@ -245,20 +245,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("'{split}' exceeds the input"))]
#[snafu(display("Split: '{split}' exceeds the input"))]
DissectSplitExceedsInput {
split: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("'{split}' does not match the input '{input}'"))]
#[snafu(display("Split: '{split}' does not match the input '{input}'"))]
DissectSplitNotMatchInput {
split: String,
input: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("consecutive names are not allowed: '{name1}' '{name2}'"))]
#[snafu(display("Consecutive names are not allowed: '{name1}' '{name2}'"))]
DissectConsecutiveNames {
name1: String,
name2: String,
@@ -270,7 +270,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("'{m}' modifier already set, but found {modifier}"))]
#[snafu(display("Modifier '{m}' already set, but found {modifier}"))]
DissectModifierAlreadySet {
m: String,
modifier: String,
@@ -304,23 +304,23 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("invalid resolution: {resolution}"))]
#[snafu(display("Invalid resolution: {resolution}"))]
EpochInvalidResolution {
resolution: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("pattern is required"))]
#[snafu(display("Pattern is required"))]
GsubPatternRequired {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("replacement is required"))]
#[snafu(display("Replacement is required"))]
GsubReplacementRequired {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("invalid regex pattern: {pattern}"))]
#[snafu(display("Invalid regex pattern: {pattern}"))]
Regex {
#[snafu(source)]
error: regex::Error,
@@ -328,72 +328,72 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("separator is required"))]
#[snafu(display("Separator is required"))]
JoinSeparatorRequired {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("invalid method: {method}"))]
#[snafu(display("Invalid method: {method}"))]
LetterInvalidMethod {
method: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("no named group found in regex {origin}"))]
#[snafu(display("No named group found in regex {origin}"))]
RegexNamedGroupNotFound {
origin: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("no valid field found in {processor} processor"))]
#[snafu(display("No valid field found in {processor} processor"))]
RegexNoValidField {
processor: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("no valid pattern found in {processor} processor"))]
#[snafu(display("No valid pattern found in {processor} processor"))]
RegexNoValidPattern {
processor: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("invalid method: {s}"))]
#[snafu(display("Invalid method: {s}"))]
UrlEncodingInvalidMethod {
s: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("url decoding error"))]
#[snafu(display("Url decoding error"))]
UrlEncodingDecode {
#[snafu(source)]
error: std::string::FromUtf8Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("invalid transform on_failure value: {value}"))]
#[snafu(display("Invalid transform on_failure value: {value}"))]
TransformOnFailureInvalidValue {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("transform element must be a map"))]
#[snafu(display("Transform element must be a map"))]
TransformElementMustBeMap {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("transform {fields:?} type MUST BE set before default {default}"))]
#[snafu(display("Transform {fields:?} type MUST BE set before default {default}"))]
TransformTypeMustBeSet {
fields: String,
default: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("transform cannot be empty"))]
#[snafu(display("Transform cannot be empty"))]
TransformEmpty {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("column name must be unique, but got duplicated: {duplicates}"))]
#[snafu(display("Column name must be unique, but got duplicated: {duplicates}"))]
TransformColumnNameMustBeUnique {
duplicates: String,
#[snafu(implicit)]
@@ -407,7 +407,7 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))]
#[snafu(display("Transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"))]
TransformTimestampIndexCount {
count: usize,
columns: String,
@@ -425,13 +425,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("{ty} value not supported for Epoch"))]
#[snafu(display("Type: {ty} value not supported for Epoch"))]
CoerceUnsupportedEpochType {
ty: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to coerce string value '{s}' to type '{ty}'"))]
#[snafu(display("Failed to coerce string value '{s}' to type '{ty}'"))]
CoerceStringToType {
s: String,
ty: String,
@@ -440,7 +440,7 @@ pub enum Error {
},
#[snafu(display(
"invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
"Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
))]
ValueInvalidResolution {
resolution: String,
@@ -449,14 +449,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("failed to parse type: '{t}'"))]
#[snafu(display("Failed to parse type: '{t}'"))]
ValueParseType {
t: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to parse {ty}: {v}"))]
#[snafu(display("Failed to parse {ty}: {v}"))]
ValueParseInt {
ty: String,
v: String,
@@ -466,7 +466,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("failed to parse {ty}: {v}"))]
#[snafu(display("Failed to parse {ty}: {v}"))]
ValueParseFloat {
ty: String,
v: String,
@@ -476,7 +476,7 @@ pub enum Error {
location: Location,
},
#[snafu(display("failed to parse {ty}: {v}"))]
#[snafu(display("Failed to parse {ty}: {v}"))]
ValueParseBoolean {
ty: String,
v: String,
@@ -485,19 +485,19 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("default value not unsupported for type {value}"))]
#[snafu(display("Default value not unsupported for type {value}"))]
ValueDefaultValueUnsupported {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("unsupported number type: {value}"))]
#[snafu(display("Unsupported number type: {value}"))]
ValueUnsupportedNumberType {
value: serde_json::Number,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("unsupported yaml type: {value:?}"))]
#[snafu(display("Unsupported yaml type: {value:?}"))]
ValueUnsupportedYamlType {
value: yaml_rust::Yaml,
#[snafu(implicit)]
@@ -531,12 +531,26 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("unsupported index type: {value}"))]
#[snafu(display("Unsupported index type: {value}"))]
UnsupportedIndexType {
value: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Unsupported number type: {value:?}"))]
UnsupportedNumberType {
value: serde_json::Number,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Column datatype mismatch. For column: {column}, expected datatype: {expected}, actual datatype: {actual}"))]
IdentifyPipelineColumnTypeMismatch {
column: String,
expected: String,
actual: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
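For illustration (not part of the diff): the new variants are raised through snafu's generated context selectors, just like the existing ones, and their Display output is what the HTTP tests later in this change assert against. A minimal sketch using a hypothetical helper:

// Hypothetical helper, for illustration only. The context selector and field
// names come from the IdentifyPipelineColumnTypeMismatch variant above; the
// helper itself is not in the PR.
fn ensure_same_type(column: &str, expected: &str, actual: &str) -> Result<()> {
    if expected != actual {
        return IdentifyPipelineColumnTypeMismatchSnafu {
            column,
            expected,
            actual,
        }
        .fail();
    }
    Ok(())
}
// ensure_same_type("score", "FLOAT64", "STRING").unwrap_err().to_string() yields:
// "Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING"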


@@ -13,3 +13,4 @@
// limitations under the License.
pub mod greptime;
pub use greptime::identity_pipeline;


@@ -16,13 +16,20 @@ pub mod coerce;
use std::collections::HashSet;
use ahash::HashMap;
use api::helper::proto_value_type;
use api::v1::column_data_type_extension::TypeExt;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnDataTypeExtension, JsonTypeExtension, SemanticType};
use coerce::{coerce_columns, coerce_value};
use greptime_proto::v1::{ColumnSchema, Row, Rows, Value as GreptimeValue};
use itertools::Itertools;
use serde_json::{Map, Number};
use crate::etl::error::{
Result, TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
IdentifyPipelineColumnTypeMismatchSnafu, Result, TransformColumnNameMustBeUniqueSnafu,
TransformEmptySnafu, TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
UnsupportedNumberTypeSnafu,
};
use crate::etl::field::{InputFieldInfo, OneInputOneOutputField};
use crate::etl::transform::index::Index;
@@ -120,6 +127,7 @@ impl Transformer for GreptimeTransformer {
if let Some(idx) = transform.index {
if idx == Index::Time {
match transform.real_fields.len() {
// Safety: unwrap is fine here because we have checked the length of real_fields
1 => timestamp_columns
.push(transform.real_fields.first().unwrap().input_name()),
_ => {
@@ -194,3 +202,304 @@ impl Transformer for GreptimeTransformer {
&mut self.transforms
}
}
/// Records the current schema information and an ordered cache of field names.
/// It is updated as the user input JSON is traversed,
/// and ends up holding a superset of all schemas seen in the input.
#[derive(Debug, Default)]
struct SchemaInfo {
/// schema info
schema: Vec<ColumnSchema>,
/// index of the column name
index: HashMap<String, usize>,
}
fn resolve_schema(
index: Option<usize>,
value_data: ValueData,
column_schema: ColumnSchema,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
) -> Result<()> {
if let Some(index) = index {
let api_value = GreptimeValue {
value_data: Some(value_data),
};
// Safety unwrap is fine here because api_value is always valid
let value_column_data_type = proto_value_type(&api_value).unwrap();
// Safety unwrap is fine here because index is always valid
let schema_column_data_type = schema_info.schema.get(index).unwrap().datatype();
if value_column_data_type != schema_column_data_type {
IdentifyPipelineColumnTypeMismatchSnafu {
column: column_schema.column_name,
expected: schema_column_data_type.as_str_name(),
actual: value_column_data_type.as_str_name(),
}
.fail()
} else {
row[index] = api_value;
Ok(())
}
} else {
let key = column_schema.column_name.clone();
schema_info.schema.push(column_schema);
schema_info.index.insert(key, schema_info.schema.len() - 1);
let api_value = GreptimeValue {
value_data: Some(value_data),
};
row.push(api_value);
Ok(())
}
}
fn resolve_number_schema(
n: Number,
column_name: String,
index: Option<usize>,
row: &mut Vec<GreptimeValue>,
schema_info: &mut SchemaInfo,
) -> Result<()> {
let (value, datatype, semantic_type) = if n.is_i64() {
(
ValueData::I64Value(n.as_i64().unwrap()),
ColumnDataType::Int64 as i32,
SemanticType::Field as i32,
)
} else if n.is_u64() {
(
ValueData::U64Value(n.as_u64().unwrap()),
ColumnDataType::Uint64 as i32,
SemanticType::Field as i32,
)
} else if n.is_f64() {
(
ValueData::F64Value(n.as_f64().unwrap()),
ColumnDataType::Float64 as i32,
SemanticType::Field as i32,
)
} else {
return UnsupportedNumberTypeSnafu { value: n }.fail();
};
resolve_schema(
index,
value,
ColumnSchema {
column_name,
datatype,
semantic_type,
datatype_extension: None,
options: None,
},
row,
schema_info,
)
}
fn json_value_to_row(
schema_info: &mut SchemaInfo,
map: Map<String, serde_json::Value>,
) -> Result<Row> {
let mut row: Vec<GreptimeValue> = Vec::with_capacity(schema_info.schema.len());
for _ in 0..schema_info.schema.len() {
row.push(GreptimeValue { value_data: None });
}
for (column_name, value) in map {
if column_name == DEFAULT_GREPTIME_TIMESTAMP_COLUMN {
continue;
}
let index = schema_info.index.get(&column_name).copied();
match value {
serde_json::Value::Null => {
// do nothing
}
serde_json::Value::String(s) => {
resolve_schema(
index,
ValueData::StringValue(s),
ColumnSchema {
column_name,
datatype: ColumnDataType::String as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
)?;
}
serde_json::Value::Bool(b) => {
resolve_schema(
index,
ValueData::BoolValue(b),
ColumnSchema {
column_name,
datatype: ColumnDataType::Boolean as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: None,
options: None,
},
&mut row,
schema_info,
)?;
}
serde_json::Value::Number(n) => {
resolve_number_schema(n, column_name, index, &mut row, schema_info)?;
}
serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
resolve_schema(
index,
ValueData::BinaryValue(jsonb::Value::from(value).to_vec()),
ColumnSchema {
column_name,
datatype: ColumnDataType::Binary as i32,
semantic_type: SemanticType::Field as i32,
datatype_extension: Some(ColumnDataTypeExtension {
type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
}),
options: None,
},
&mut row,
schema_info,
)?;
}
}
}
Ok(Row { values: row })
}
/// Identity pipeline for Greptime.
/// This pipeline converts the input JSON array into Greptime Rows.
/// 1. The pipeline adds a default timestamp column to the schema.
/// 2. The pipeline does not resolve NULL values.
/// 3. The pipeline assumes that the JSON format is fixed.
/// 4. The pipeline returns an error if the same column has mismatched datatypes across records.
/// 5. The pipeline analyzes the schema of each JSON record and merges them to produce the final schema.
pub fn identity_pipeline(array: Vec<serde_json::Value>) -> Result<Rows> {
let mut rows = Vec::with_capacity(array.len());
let mut schema = SchemaInfo::default();
for value in array {
if let serde_json::Value::Object(map) = value {
let row = json_value_to_row(&mut schema, map)?;
rows.push(row);
}
}
let greptime_timestamp_schema = ColumnSchema {
column_name: DEFAULT_GREPTIME_TIMESTAMP_COLUMN.to_string(),
datatype: ColumnDataType::TimestampNanosecond as i32,
semantic_type: SemanticType::Timestamp as i32,
datatype_extension: None,
options: None,
};
let ns = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let ts = GreptimeValue {
value_data: Some(ValueData::TimestampNanosecondValue(ns)),
};
let column_count = schema.schema.len();
for row in rows.iter_mut() {
let diff = column_count - row.values.len();
for _ in 0..diff {
row.values.push(GreptimeValue { value_data: None });
}
row.values.push(ts.clone());
}
schema.schema.push(greptime_timestamp_schema);
Ok(Rows {
schema: schema.schema,
rows,
})
}
#[cfg(test)]
mod tests {
use crate::identity_pipeline;
#[test]
fn test_identify_pipeline() {
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": "88.5",
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: STRING".to_string(),
);
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_err());
assert_eq!(
rows.err().unwrap().to_string(),
"Column datatype mismatch. For column: score, expected datatype: FLOAT64, actual datatype: INT64".to_string(),
);
}
{
let array = vec![
serde_json::json!({
"woshinull": null,
"name": "Alice",
"age": 20,
"is_student": true,
"score": 99.5,
"hobbies": "reading",
"address": "Beijing",
}),
serde_json::json!({
"name": "Bob",
"age": 21,
"is_student": false,
"score": 88.5,
"hobbies": "swimming",
"address": "Shanghai",
"gaga": "gaga"
}),
];
let rows = identity_pipeline(array);
assert!(rows.is_ok());
let rows = rows.unwrap();
assert_eq!(rows.schema.len(), 8);
assert_eq!(rows.rows.len(), 2);
assert_eq!(8, rows.rows[0].values.len());
assert_eq!(8, rows.rows[1].values.len());
}
}
}
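As a complement to the tests above, a rough sketch (not part of this commit) of how identity_pipeline handles nested values: per json_value_to_row, objects and arrays are encoded with jsonb into a Binary column carrying the JSON type extension, and a greptime_timestamp column is appended to whatever schema is inferred. The module, test name, and assertions below are illustrative only:

#[cfg(test)]
mod nested_value_sketch {
    // Illustrative sketch; imports mirror those already used in this file.
    use api::v1::ColumnDataType;

    use crate::identity_pipeline;

    #[test]
    fn nested_values_become_jsonb_binary_columns() {
        let array = vec![serde_json::json!({
            "name": "Alice",
            // Nested objects/arrays are serialized with jsonb and stored as Binary.
            "tags": {"team": "db", "labels": ["a", "b"]},
        })];
        let rows = identity_pipeline(array).unwrap();
        // Two top-level keys plus the appended greptime_timestamp column.
        assert_eq!(rows.schema.len(), 3);
        let tags = rows.schema.iter().find(|c| c.column_name == "tags").unwrap();
        assert_eq!(tags.datatype, ColumnDataType::Binary as i32);
        // The JSON type extension marks the column as JSONB-encoded.
        assert!(tags.datatype_extension.is_some());
    }
}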


@@ -18,6 +18,7 @@ mod metrics;
pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{parse, Content, Pipeline};


@@ -50,6 +50,9 @@ use crate::metrics::{
};
use crate::query_handler::LogHandlerRef;
const GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX: &str = "greptime_";
const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
pub table: Option<String>,
@@ -121,6 +124,12 @@ pub async fn add_pipeline(
reason: "pipeline_name is required in path",
}
);
ensure!(
!pipeline_name.starts_with(GREPTIME_INTERNAL_PIPELINE_NAME_PREFIX),
InvalidParameterSnafu {
reason: "pipeline_name cannot start with greptime_",
}
);
ensure!(
!payload.is_empty(),
InvalidParameterSnafu {
@@ -425,47 +434,54 @@ async fn ingest_logs_inner(
let db = query_ctx.get_db_string();
let exec_timer = std::time::Instant::now();
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
let transform_timer = std::time::Instant::now();
let mut intermediate_state = pipeline.init_intermediate_state();
let mut results = Vec::with_capacity(pipeline_data.len());
let transformed_data: Rows;
if pipeline_name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
let rows = pipeline::identity_pipeline(pipeline_data)
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
transformed_data = rows;
} else {
let pipeline = state
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
for v in pipeline_data {
pipeline
.prepare(v, &mut intermediate_state)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);
let transform_timer = std::time::Instant::now();
let mut intermediate_state = pipeline.init_intermediate_state();
for v in pipeline_data {
pipeline
.prepare(v, &mut intermediate_state)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
let r = pipeline
.exec_mut(&mut intermediate_state)
.inspect_err(|_| {
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
})
.context(PipelineTransformSnafu)
.context(PipelineSnafu)?;
results.push(r);
pipeline.reset_intermediate_state(&mut intermediate_state);
}
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
transformed_data = Rows {
rows: results,
schema: pipeline.schemas().clone(),
};
}
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
.with_label_values(&[db.as_str(), METRIC_SUCCESS_VALUE])
.observe(transform_timer.elapsed().as_secs_f64());
let transformed_data: Rows = Rows {
rows: results,
schema: pipeline.schemas().clone(),
};
let insert_request = RowInsertRequest {
rows: Some(transformed_data),
table_name: table_name.clone(),


@@ -86,6 +86,7 @@ macro_rules! http_tests {
test_pipeline_api,
test_test_pipeline_api,
test_plain_text_ingestion,
test_identify_pipeline,
test_otlp_metrics,
test_otlp_traces,
@@ -1076,6 +1077,21 @@ transform:
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/greptime_guagua")
.header("Content-Type", "application/x-yaml")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
assert_eq!(
res.json::<serde_json::Value>().await["error"]
.as_str()
.unwrap(),
"Invalid request parameter: pipeline_name cannot start with greptime_"
);
let res = client
.post("/v1/events/pipelines/test")
.header("Content-Type", "application/x-yaml")
@@ -1161,6 +1177,61 @@ transform:
guard.remove_all().await;
}
pub async fn test_identify_pipeline(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
// handshake
let client = TestClient::new(app);
let body = r#"{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java"}
{"__time__":1453809242,"__topic__":"","__source__":"10.170.***.***","ip":"10.200.**.***","time":"26/Jan/2016:19:54:02 +0800","url":"POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","status":"200","user-agent":"aliyun-sdk-java","hasagei":"hasagei","dongdongdong":"guaguagua"}"#;
let res = client
.post("/v1/events/logs?db=public&table=logs&pipeline_name=greptime_identity")
.header("Content-Type", "application/json")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: serde_json::Value = res.json().await;
assert!(body.get("execution_time_ms").unwrap().is_number());
assert_eq!(body["output"][0]["affectedrows"], 2);
let res = client.get("/v1/sql?sql=select * from logs").send().await;
assert_eq!(res.status(), StatusCode::OK);
let line1_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java","guaguagua","hasagei",null]"#;
let line2_expected = r#"["10.170.***.***",1453809242,"","10.200.**.***","200","26/Jan/2016:19:54:02 +0800","POST/PutData?Category=YunOsAccountOpLog&AccessKeyId=<yourAccessKeyId>&Date=Fri%2C%2028%20Jun%202013%2006%3A53%3A30%20GMT&Topic=raw&Signature=<yourSignature>HTTP/1.1","aliyun-sdk-java",null,null,null]"#;
let res = client.get("/v1/sql?sql=select * from logs").send().await;
assert_eq!(res.status(), StatusCode::OK);
let resp: serde_json::Value = res.json().await;
let result = resp["output"][0]["records"]["rows"].as_array().unwrap();
assert_eq!(result.len(), 2);
let mut line1 = result[0].as_array().unwrap().clone();
let mut line2 = result[1].as_array().unwrap().clone();
assert!(line1.last().unwrap().is_i64());
assert!(line2.last().unwrap().is_i64());
*line1.last_mut().unwrap() = serde_json::Value::Null;
*line2.last_mut().unwrap() = serde_json::Value::Null;
assert_eq!(
line1,
serde_json::from_str::<Vec<Value>>(line1_expected).unwrap()
);
assert_eq!(
line2,
serde_json::from_str::<Vec<Value>>(line2_expected).unwrap()
);
let expected = r#"[["__source__","String","","YES","","FIELD"],["__time__","Int64","","YES","","FIELD"],["__topic__","String","","YES","","FIELD"],["ip","String","","YES","","FIELD"],["status","String","","YES","","FIELD"],["time","String","","YES","","FIELD"],["url","String","","YES","","FIELD"],["user-agent","String","","YES","","FIELD"],["dongdongdong","String","","YES","","FIELD"],["hasagei","String","","YES","","FIELD"],["greptime_timestamp","TimestampNanosecond","PRI","NO","","TIMESTAMP"]]"#;
validate_data(&client, "desc logs", expected).await;
guard.remove_all().await;
}
pub async fn test_test_pipeline_api(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;