mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-25 17:30:41 +00:00
feat(pipeline): auto transform (#6013)
* feat: support auto transform * refactor: replace hashbrown with ahash * refactor: params of run identity pipeline * refactor: minor update * test: add test for auto transform * chore: fix cr issues
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -10904,7 +10904,6 @@ dependencies = [
|
||||
"derive_builder 0.20.1",
|
||||
"futures",
|
||||
"futures-util",
|
||||
"hashbrown 0.15.2",
|
||||
"headers",
|
||||
"hostname",
|
||||
"http 1.1.0",
|
||||
|
||||
@@ -13,7 +13,7 @@ default = ["geo"]
|
||||
geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"]
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.8"
|
||||
ahash.workspace = true
|
||||
api.workspace = true
|
||||
arc-swap = "1.0"
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -10,7 +10,7 @@ license.workspace = true
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.8"
|
||||
ahash.workspace = true
|
||||
api.workspace = true
|
||||
arrow.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -395,17 +395,19 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Transform cannot be empty"))]
|
||||
TransformEmpty {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Column name must be unique, but got duplicated: {duplicates}"))]
|
||||
TransformColumnNameMustBeUnique {
|
||||
duplicates: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"At least one timestamp-related processor is required to use auto transform"
|
||||
))]
|
||||
TransformNoTimestampProcessor {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display(
|
||||
"Illegal to set multiple timestamp Index columns, please set only one: {columns}"
|
||||
))]
|
||||
@@ -421,6 +423,11 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Exactly one timestamp value is required to use auto transform"))]
|
||||
AutoTransformOneTimestamp {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
#[snafu(display("Null type not supported"))]
|
||||
CoerceUnsupportedNullType {
|
||||
#[snafu(implicit)]
|
||||
@@ -793,10 +800,11 @@ impl ErrorExt for Error {
|
||||
| TransformOnFailureInvalidValue { .. }
|
||||
| TransformElementMustBeMap { .. }
|
||||
| TransformTypeMustBeSet { .. }
|
||||
| TransformEmpty { .. }
|
||||
| TransformColumnNameMustBeUnique { .. }
|
||||
| TransformMultipleTimestampIndex { .. }
|
||||
| TransformNoTimestampProcessor { .. }
|
||||
| TransformTimestampIndexCount { .. }
|
||||
| AutoTransformOneTimestamp { .. }
|
||||
| CoerceUnsupportedNullType { .. }
|
||||
| CoerceUnsupportedNullTypeTo { .. }
|
||||
| CoerceUnsupportedEpochType { .. }
|
||||
|
||||
@@ -18,7 +18,9 @@ pub mod processor;
|
||||
pub mod transform;
|
||||
pub mod value;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::Row;
|
||||
use common_time::timestamp::TimeUnit;
|
||||
use processor::{Processor, Processors};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use transform::Transforms;
|
||||
@@ -27,8 +29,10 @@ use yaml_rust::YamlLoader;
|
||||
|
||||
use crate::dispatcher::{Dispatcher, Rule};
|
||||
use crate::error::{
|
||||
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
|
||||
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result,
|
||||
TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu,
|
||||
};
|
||||
use crate::etl::processor::ProcessorKind;
|
||||
use crate::tablesuffix::TableSuffixTemplate;
|
||||
use crate::GreptimeTransformer;
|
||||
|
||||
@@ -70,7 +74,25 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
|
||||
Transforms::default()
|
||||
};
|
||||
|
||||
let transformer = GreptimeTransformer::new(transformers)?;
|
||||
let transformer = if transformers.is_empty() {
|
||||
// use auto transform
|
||||
// check processors have at least one timestamp-related processor
|
||||
let cnt = processors
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
matches!(
|
||||
p,
|
||||
ProcessorKind::Date(_)
|
||||
| ProcessorKind::Timestamp(_)
|
||||
| ProcessorKind::Epoch(_)
|
||||
)
|
||||
})
|
||||
.count();
|
||||
ensure!(cnt > 0, TransformNoTimestampProcessorSnafu);
|
||||
None
|
||||
} else {
|
||||
Some(GreptimeTransformer::new(transformers)?)
|
||||
};
|
||||
|
||||
let dispatcher = if !doc[DISPATCHER].is_badvalue() {
|
||||
Some(Dispatcher::try_from(&doc[DISPATCHER])?)
|
||||
@@ -101,7 +123,7 @@ pub struct Pipeline {
|
||||
description: Option<String>,
|
||||
processors: processor::Processors,
|
||||
dispatcher: Option<Dispatcher>,
|
||||
transformer: GreptimeTransformer,
|
||||
transformer: Option<GreptimeTransformer>,
|
||||
tablesuffix: Option<TableSuffixTemplate>,
|
||||
}
|
||||
|
||||
@@ -132,6 +154,8 @@ impl DispatchedTo {
|
||||
#[derive(Debug)]
|
||||
pub enum PipelineExecOutput {
|
||||
Transformed((Row, Option<String>)),
|
||||
// table_suffix, ts_key -> unit
|
||||
AutoTransform(Option<String>, HashMap<String, TimeUnit>),
|
||||
DispatchedTo(DispatchedTo),
|
||||
}
|
||||
|
||||
@@ -199,25 +223,35 @@ impl Pipeline {
|
||||
return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
|
||||
}
|
||||
|
||||
// transform
|
||||
let row = self.transformer.transform_mut(val)?;
|
||||
|
||||
// generate table name
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
|
||||
Ok(PipelineExecOutput::Transformed((row, table_suffix)))
|
||||
if let Some(transformer) = self.transformer() {
|
||||
let row = transformer.transform_mut(val)?;
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
Ok(PipelineExecOutput::Transformed((row, table_suffix)))
|
||||
} else {
|
||||
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
|
||||
let mut ts_unit_map = HashMap::with_capacity(4);
|
||||
// get all ts values
|
||||
for (k, v) in val {
|
||||
if let Value::Timestamp(ts) = v {
|
||||
if !ts_unit_map.contains_key(k) {
|
||||
ts_unit_map.insert(k.clone(), ts.get_unit());
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn processors(&self) -> &processor::Processors {
|
||||
&self.processors
|
||||
}
|
||||
|
||||
pub fn transformer(&self) -> &GreptimeTransformer {
|
||||
&self.transformer
|
||||
pub fn transformer(&self) -> Option<&GreptimeTransformer> {
|
||||
self.transformer.as_ref()
|
||||
}
|
||||
|
||||
pub fn schemas(&self) -> &Vec<greptime_proto::v1::ColumnSchema> {
|
||||
self.transformer.schemas()
|
||||
pub fn schemas(&self) -> Option<&Vec<greptime_proto::v1::ColumnSchema>> {
|
||||
self.transformer.as_ref().map(|t| t.schemas())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,7 +349,7 @@ transform:
|
||||
.unwrap()
|
||||
.into_transformed()
|
||||
.unwrap();
|
||||
let sechema = pipeline.schemas();
|
||||
let sechema = pipeline.schemas().unwrap();
|
||||
|
||||
assert_eq!(sechema.len(), result.0.values.len());
|
||||
let test = vec![
|
||||
@@ -427,7 +461,7 @@ transform:
|
||||
"#;
|
||||
|
||||
let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap();
|
||||
let schema = pipeline.schemas().clone();
|
||||
let schema = pipeline.schemas().unwrap().clone();
|
||||
let mut result = json_to_map(input_value).unwrap();
|
||||
|
||||
let row = pipeline
|
||||
|
||||
@@ -30,16 +30,15 @@ use serde_json::Number;
|
||||
|
||||
use crate::error::{
|
||||
IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result,
|
||||
TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu,
|
||||
TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu,
|
||||
UnsupportedNumberTypeSnafu,
|
||||
TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu,
|
||||
TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu,
|
||||
};
|
||||
use crate::etl::field::{Field, Fields};
|
||||
use crate::etl::transform::index::Index;
|
||||
use crate::etl::transform::{Transform, Transforms};
|
||||
use crate::etl::value::{Timestamp, Value};
|
||||
use crate::etl::PipelineMap;
|
||||
use crate::IdentityTimeIndex;
|
||||
use crate::{IdentityTimeIndex, PipelineContext};
|
||||
|
||||
const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp";
|
||||
const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10;
|
||||
@@ -124,10 +123,7 @@ impl GreptimeTransformer {
|
||||
|
||||
impl GreptimeTransformer {
|
||||
pub fn new(mut transforms: Transforms) -> Result<Self> {
|
||||
if transforms.is_empty() {
|
||||
return TransformEmptySnafu.fail();
|
||||
}
|
||||
|
||||
// empty check is done in the caller
|
||||
let mut column_names_set = HashSet::new();
|
||||
let mut timestamp_columns = vec![];
|
||||
|
||||
@@ -491,11 +487,12 @@ fn resolve_value(
|
||||
}
|
||||
|
||||
fn identity_pipeline_inner(
|
||||
array: Vec<PipelineMap>,
|
||||
custom_ts: Option<&IdentityTimeIndex>,
|
||||
pipeline_maps: Vec<PipelineMap>,
|
||||
pipeline_ctx: &PipelineContext<'_>,
|
||||
) -> Result<(SchemaInfo, Vec<Row>)> {
|
||||
let mut rows = Vec::with_capacity(array.len());
|
||||
let mut rows = Vec::with_capacity(pipeline_maps.len());
|
||||
let mut schema_info = SchemaInfo::default();
|
||||
let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts();
|
||||
|
||||
// set time index column schema first
|
||||
schema_info.schema.push(ColumnSchema {
|
||||
@@ -510,7 +507,7 @@ fn identity_pipeline_inner(
|
||||
options: None,
|
||||
});
|
||||
|
||||
for values in array {
|
||||
for values in pipeline_maps {
|
||||
let row = values_to_row(&mut schema_info, values, custom_ts)?;
|
||||
rows.push(row);
|
||||
}
|
||||
@@ -537,10 +534,9 @@ fn identity_pipeline_inner(
|
||||
pub fn identity_pipeline(
|
||||
array: Vec<PipelineMap>,
|
||||
table: Option<Arc<table::Table>>,
|
||||
params: &GreptimePipelineParams,
|
||||
custom_ts: Option<&IdentityTimeIndex>,
|
||||
pipeline_ctx: &PipelineContext<'_>,
|
||||
) -> Result<Rows> {
|
||||
let input = if params.flatten_json_object() {
|
||||
let input = if pipeline_ctx.pipeline_param.flatten_json_object() {
|
||||
array
|
||||
.into_iter()
|
||||
.map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING))
|
||||
@@ -549,7 +545,7 @@ pub fn identity_pipeline(
|
||||
array
|
||||
};
|
||||
|
||||
identity_pipeline_inner(input, custom_ts).map(|(mut schema, rows)| {
|
||||
identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| {
|
||||
if let Some(table) = table {
|
||||
let table_info = table.table_info();
|
||||
for tag_name in table_info.meta.row_key_column_names() {
|
||||
@@ -621,10 +617,13 @@ mod tests {
|
||||
|
||||
use super::*;
|
||||
use crate::etl::{json_array_to_map, json_to_map};
|
||||
use crate::identity_pipeline;
|
||||
use crate::{identity_pipeline, PipelineDefinition};
|
||||
|
||||
#[test]
|
||||
fn test_identify_pipeline() {
|
||||
let params = GreptimePipelineParams::default();
|
||||
let pipeline_ctx =
|
||||
PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), ¶ms);
|
||||
{
|
||||
let array = vec![
|
||||
serde_json::json!({
|
||||
@@ -647,7 +646,7 @@ mod tests {
|
||||
}),
|
||||
];
|
||||
let array = json_array_to_map(array).unwrap();
|
||||
let rows = identity_pipeline(array, None, &GreptimePipelineParams::default(), None);
|
||||
let rows = identity_pipeline(array, None, &pipeline_ctx);
|
||||
assert!(rows.is_err());
|
||||
assert_eq!(
|
||||
rows.err().unwrap().to_string(),
|
||||
@@ -675,12 +674,7 @@ mod tests {
|
||||
"gaga": "gaga"
|
||||
}),
|
||||
];
|
||||
let rows = identity_pipeline(
|
||||
json_array_to_map(array).unwrap(),
|
||||
None,
|
||||
&GreptimePipelineParams::default(),
|
||||
None,
|
||||
);
|
||||
let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
|
||||
assert!(rows.is_err());
|
||||
assert_eq!(
|
||||
rows.err().unwrap().to_string(),
|
||||
@@ -708,12 +702,7 @@ mod tests {
|
||||
"gaga": "gaga"
|
||||
}),
|
||||
];
|
||||
let rows = identity_pipeline(
|
||||
json_array_to_map(array).unwrap(),
|
||||
None,
|
||||
&GreptimePipelineParams::default(),
|
||||
None,
|
||||
);
|
||||
let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx);
|
||||
assert!(rows.is_ok());
|
||||
let rows = rows.unwrap();
|
||||
assert_eq!(rows.schema.len(), 8);
|
||||
@@ -744,8 +733,8 @@ mod tests {
|
||||
];
|
||||
let tag_column_names = ["name".to_string(), "address".to_string()];
|
||||
|
||||
let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), None).map(
|
||||
|(mut schema, rows)| {
|
||||
let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx)
|
||||
.map(|(mut schema, rows)| {
|
||||
for name in tag_column_names {
|
||||
if let Some(index) = schema.index.get(&name) {
|
||||
schema.schema[*index].semantic_type = SemanticType::Tag as i32;
|
||||
@@ -755,8 +744,7 @@ mod tests {
|
||||
schema: schema.schema,
|
||||
rows,
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
assert!(rows.is_ok());
|
||||
let rows = rows.unwrap();
|
||||
|
||||
@@ -95,6 +95,15 @@ impl Timestamp {
|
||||
TimeUnit::Nanosecond => self.timestamp_nanos(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_unit(&self) -> TimeUnit {
|
||||
match self {
|
||||
Timestamp::Nanosecond(_) => TimeUnit::Nanosecond,
|
||||
Timestamp::Microsecond(_) => TimeUnit::Microsecond,
|
||||
Timestamp::Millisecond(_) => TimeUnit::Millisecond,
|
||||
Timestamp::Second(_) => TimeUnit::Second,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Timestamp {
|
||||
|
||||
@@ -102,6 +102,18 @@ impl PipelineDefinition {
|
||||
Ok(Self::ByNameAndValue((name.to_owned(), version)))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_identity(&self) -> bool {
|
||||
matches!(self, Self::GreptimeIdentityPipeline(_))
|
||||
}
|
||||
|
||||
pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
|
||||
if let Self::GreptimeIdentityPipeline(custom_ts) = self {
|
||||
custom_ts.as_ref()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PipelineContext<'a> {
|
||||
|
||||
@@ -22,7 +22,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
|
||||
let yaml_content = Content::Yaml(pipeline_yaml);
|
||||
let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline");
|
||||
|
||||
let schema = pipeline.schemas().clone();
|
||||
let schema = pipeline.schemas().unwrap().clone();
|
||||
|
||||
let mut rows = Vec::new();
|
||||
|
||||
|
||||
@@ -428,7 +428,7 @@ transform:
|
||||
.expect("expect transformed result ");
|
||||
|
||||
let output = Rows {
|
||||
schema: pipeline.schemas().clone(),
|
||||
schema: pipeline.schemas().unwrap().clone(),
|
||||
rows: vec![row.0],
|
||||
};
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ workspace = true
|
||||
local-ip-address.workspace = true
|
||||
|
||||
[dependencies]
|
||||
ahash = "0.8"
|
||||
ahash.workspace = true
|
||||
api.workspace = true
|
||||
arrow.workspace = true
|
||||
arrow-flight.workspace = true
|
||||
@@ -58,7 +58,6 @@ datatypes.workspace = true
|
||||
derive_builder.workspace = true
|
||||
futures.workspace = true
|
||||
futures-util.workspace = true
|
||||
hashbrown = "0.15"
|
||||
headers = "0.4"
|
||||
hostname = "0.3"
|
||||
http.workspace = true
|
||||
|
||||
@@ -16,6 +16,7 @@ use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{
|
||||
ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
|
||||
@@ -28,7 +29,6 @@ use bytes::Bytes;
|
||||
use common_query::prelude::GREPTIME_TIMESTAMP;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_telemetry::{error, warn};
|
||||
use hashbrown::HashMap;
|
||||
use headers::ContentType;
|
||||
use jsonb::Value;
|
||||
use lazy_static::lazy_static;
|
||||
|
||||
@@ -15,14 +15,16 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use ahash::{HashMap, HashMapExt};
|
||||
use api::v1::{RowInsertRequest, Rows};
|
||||
use hashbrown::HashMap;
|
||||
use itertools::Itertools;
|
||||
use pipeline::error::AutoTransformOneTimestampSnafu;
|
||||
use pipeline::{
|
||||
DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineContext,
|
||||
PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
|
||||
DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext, PipelineDefinition,
|
||||
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
|
||||
};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{CatalogSnafu, PipelineSnafu, Result};
|
||||
use crate::http::event::PipelineIngestRequest;
|
||||
@@ -31,6 +33,14 @@ use crate::metrics::{
|
||||
};
|
||||
use crate::query_handler::PipelineHandlerRef;
|
||||
|
||||
macro_rules! push_to_map {
|
||||
($map:expr, $key:expr, $value:expr, $capacity:expr) => {
|
||||
$map.entry($key)
|
||||
.or_insert_with(|| Vec::with_capacity($capacity))
|
||||
.push($value);
|
||||
};
|
||||
}
|
||||
|
||||
/// Never call this on `GreptimeIdentityPipeline` because it's a real pipeline
|
||||
pub async fn get_pipeline(
|
||||
pipeline_def: &PipelineDefinition,
|
||||
@@ -57,27 +67,16 @@ pub(crate) async fn run_pipeline(
|
||||
query_ctx: &QueryContextRef,
|
||||
is_top_level: bool,
|
||||
) -> Result<Vec<RowInsertRequest>> {
|
||||
match &pipeline_ctx.pipeline_definition {
|
||||
PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => {
|
||||
run_identity_pipeline(
|
||||
handler,
|
||||
custom_ts.as_ref(),
|
||||
pipeline_ctx.pipeline_param,
|
||||
pipeline_req,
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
_ => {
|
||||
run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
|
||||
}
|
||||
if pipeline_ctx.pipeline_definition.is_identity() {
|
||||
run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await
|
||||
} else {
|
||||
run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_identity_pipeline(
|
||||
handler: &PipelineHandlerRef,
|
||||
custom_ts: Option<&IdentityTimeIndex>,
|
||||
pipeline_parameters: &GreptimePipelineParams,
|
||||
pipeline_ctx: &PipelineContext<'_>,
|
||||
pipeline_req: PipelineIngestRequest,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<Vec<RowInsertRequest>> {
|
||||
@@ -89,7 +88,7 @@ async fn run_identity_pipeline(
|
||||
.get_table(&table_name, query_ctx)
|
||||
.await
|
||||
.context(CatalogSnafu)?;
|
||||
pipeline::identity_pipeline(data_array, table, pipeline_parameters, custom_ts)
|
||||
pipeline::identity_pipeline(data_array, table, pipeline_ctx)
|
||||
.map(|rows| {
|
||||
vec![RowInsertRequest {
|
||||
rows: Some(rows),
|
||||
@@ -113,15 +112,17 @@ async fn run_custom_pipeline(
|
||||
|
||||
let PipelineIngestRequest {
|
||||
table: table_name,
|
||||
values: data_array,
|
||||
values: pipeline_maps,
|
||||
} = pipeline_req;
|
||||
let arr_len = data_array.len();
|
||||
let mut req_map = HashMap::new();
|
||||
let arr_len = pipeline_maps.len();
|
||||
let mut transformed_map = HashMap::new();
|
||||
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
|
||||
let mut auto_map = HashMap::new();
|
||||
let mut auto_map_ts_keys = HashMap::new();
|
||||
|
||||
for mut values in data_array {
|
||||
for mut pipeline_map in pipeline_maps {
|
||||
let r = pipeline
|
||||
.exec_mut(&mut values)
|
||||
.exec_mut(&mut pipeline_map)
|
||||
.inspect_err(|_| {
|
||||
METRIC_HTTP_LOGS_TRANSFORM_ELAPSED
|
||||
.with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE])
|
||||
@@ -131,38 +132,76 @@ async fn run_custom_pipeline(
|
||||
|
||||
match r {
|
||||
PipelineExecOutput::Transformed((row, table_suffix)) => {
|
||||
let act_table_name = match table_suffix {
|
||||
Some(suffix) => format!("{}{}", table_name, suffix),
|
||||
None => table_name.clone(),
|
||||
};
|
||||
|
||||
req_map
|
||||
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
|
||||
push_to_map!(transformed_map, act_table_name, row, arr_len);
|
||||
}
|
||||
PipelineExecOutput::AutoTransform(table_suffix, ts_keys) => {
|
||||
let act_table_name = table_suffix_to_table_name(&table_name, table_suffix);
|
||||
push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len);
|
||||
auto_map_ts_keys
|
||||
.entry(act_table_name)
|
||||
.or_insert_with(|| Vec::with_capacity(arr_len))
|
||||
.push(row);
|
||||
.or_insert_with(HashMap::new)
|
||||
.extend(ts_keys);
|
||||
}
|
||||
PipelineExecOutput::DispatchedTo(dispatched_to) => {
|
||||
if let Some(coll) = dispatched.get_mut(&dispatched_to) {
|
||||
coll.push(values);
|
||||
} else {
|
||||
dispatched.insert(dispatched_to, vec![values]);
|
||||
}
|
||||
push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
// if current pipeline generates some transformed results, build it as
|
||||
// `RowInsertRequest` and append to results. If the pipeline doesn't
|
||||
// have dispatch, this will be only output of the pipeline.
|
||||
for (table_name, rows) in req_map {
|
||||
results.push(RowInsertRequest {
|
||||
rows: Some(Rows {
|
||||
rows,
|
||||
schema: pipeline.schemas().clone(),
|
||||
}),
|
||||
table_name,
|
||||
});
|
||||
|
||||
if let Some(s) = pipeline.schemas() {
|
||||
// transformed
|
||||
|
||||
// if current pipeline generates some transformed results, build it as
|
||||
// `RowInsertRequest` and append to results. If the pipeline doesn't
|
||||
// have dispatch, this will be only output of the pipeline.
|
||||
for (table_name, rows) in transformed_map {
|
||||
results.push(RowInsertRequest {
|
||||
rows: Some(Rows {
|
||||
rows,
|
||||
schema: s.clone(),
|
||||
}),
|
||||
table_name,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// auto map
|
||||
for (table_name, pipeline_maps) in auto_map {
|
||||
if pipeline_maps.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let ts_unit_map = auto_map_ts_keys
|
||||
.remove(&table_name)
|
||||
.context(AutoTransformOneTimestampSnafu)
|
||||
.context(PipelineSnafu)?;
|
||||
// only one timestamp key is allowed
|
||||
// which will be converted to ts index
|
||||
let (ts_key, unit) = ts_unit_map
|
||||
.into_iter()
|
||||
.exactly_one()
|
||||
.map_err(|_| AutoTransformOneTimestampSnafu.build())
|
||||
.context(PipelineSnafu)?;
|
||||
|
||||
let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false);
|
||||
let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index));
|
||||
let next_pipeline_ctx = PipelineContext::new(&new_def, pipeline_ctx.pipeline_param);
|
||||
|
||||
let reqs = run_identity_pipeline(
|
||||
handler,
|
||||
&next_pipeline_ctx,
|
||||
PipelineIngestRequest {
|
||||
table: table_name,
|
||||
values: pipeline_maps,
|
||||
},
|
||||
query_ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
results.extend(reqs);
|
||||
}
|
||||
}
|
||||
|
||||
// if current pipeline contains dispatcher and has several rules, we may
|
||||
@@ -204,3 +243,11 @@ async fn run_custom_pipeline(
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn table_suffix_to_table_name(table_name: &String, table_suffix: Option<String>) -> String {
|
||||
match table_suffix {
|
||||
Some(suffix) => format!("{}{}", table_name, suffix),
|
||||
None => table_name.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,8 +12,10 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::string::ToString;
|
||||
|
||||
use ahash::HashMap;
|
||||
use api::prom_store::remote::Sample;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{
|
||||
@@ -21,8 +23,6 @@ use api::v1::{
|
||||
Value,
|
||||
};
|
||||
use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
|
||||
use hashbrown::hash_map::Entry;
|
||||
use hashbrown::HashMap;
|
||||
use prost::DecodeError;
|
||||
|
||||
use crate::proto::PromLabel;
|
||||
|
||||
@@ -96,6 +96,7 @@ macro_rules! http_tests {
|
||||
test_pipeline_api,
|
||||
test_test_pipeline_api,
|
||||
test_plain_text_ingestion,
|
||||
test_pipeline_auto_transform,
|
||||
test_identity_pipeline,
|
||||
test_identity_pipeline_with_flatten,
|
||||
test_identity_pipeline_with_custom_ts,
|
||||
@@ -2328,6 +2329,85 @@ transform:
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_pipeline_auto_transform(store_type: StorageType) {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let (app, mut guard) =
|
||||
setup_test_http_app_with_frontend(store_type, "test_pipeline_auto_transform").await;
|
||||
|
||||
// handshake
|
||||
let client = TestClient::new(app).await;
|
||||
|
||||
let body = r#"
|
||||
processors:
|
||||
- dissect:
|
||||
fields:
|
||||
- message
|
||||
patterns:
|
||||
- "%{+ts} %{+ts} %{http_status_code} %{content}"
|
||||
- date:
|
||||
fields:
|
||||
- ts
|
||||
formats:
|
||||
- "%Y-%m-%d %H:%M:%S%.3f"
|
||||
"#;
|
||||
|
||||
// 1. create pipeline
|
||||
let res = client
|
||||
.post("/v1/pipelines/test")
|
||||
.header("Content-Type", "application/x-yaml")
|
||||
.body(body)
|
||||
.send()
|
||||
.await;
|
||||
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
let content = res.text().await;
|
||||
|
||||
let content = serde_json::from_str(&content);
|
||||
assert!(content.is_ok());
|
||||
// {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
|
||||
let content: Value = content.unwrap();
|
||||
|
||||
let version_str = content
|
||||
.get("pipelines")
|
||||
.unwrap()
|
||||
.as_array()
|
||||
.unwrap()
|
||||
.first()
|
||||
.unwrap()
|
||||
.get("version")
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap()
|
||||
.to_string();
|
||||
assert!(!version_str.is_empty());
|
||||
|
||||
// 2. write data
|
||||
let data_body = r#"
|
||||
2024-05-25 20:16:37.217 404 hello
|
||||
2024-05-25 20:16:37.218 200 hello world
|
||||
"#;
|
||||
let res = client
|
||||
.post("/v1/ingest?db=public&table=logs1&pipeline_name=test")
|
||||
.header("Content-Type", "text/plain")
|
||||
.body(data_body)
|
||||
.send()
|
||||
.await;
|
||||
assert_eq!(res.status(), StatusCode::OK);
|
||||
|
||||
// 3. select data
|
||||
let expected = "[[1716668197217000000,\"hello\",\"404\",\"2024-05-25 20:16:37.217 404 hello\"],[1716668197218000000,\"hello world\",\"200\",\"2024-05-25 20:16:37.218 200 hello world\"]]";
|
||||
validate_data(
|
||||
"test_pipeline_auto_transform",
|
||||
&client,
|
||||
"select * from logs1",
|
||||
expected,
|
||||
)
|
||||
.await;
|
||||
|
||||
guard.remove_all().await;
|
||||
}
|
||||
|
||||
pub async fn test_otlp_metrics(store_type: StorageType) {
|
||||
// init
|
||||
common_telemetry::init_default_ut_logging();
|
||||
|
||||
Reference in New Issue
Block a user