diff --git a/Cargo.lock b/Cargo.lock index aaeced3556..5955a1cf59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10904,7 +10904,6 @@ dependencies = [ "derive_builder 0.20.1", "futures", "futures-util", - "hashbrown 0.15.2", "headers", "hostname", "http 1.1.0", diff --git a/src/common/function/Cargo.toml b/src/common/function/Cargo.toml index 73821a896a..292fc07cf0 100644 --- a/src/common/function/Cargo.toml +++ b/src/common/function/Cargo.toml @@ -13,7 +13,7 @@ default = ["geo"] geo = ["geohash", "h3o", "s2", "wkt", "geo-types", "dep:geo"] [dependencies] -ahash = "0.8" +ahash.workspace = true api.workspace = true arc-swap = "1.0" async-trait.workspace = true diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml index 32fa4eaa52..7a96ff82ce 100644 --- a/src/pipeline/Cargo.toml +++ b/src/pipeline/Cargo.toml @@ -10,7 +10,7 @@ license.workspace = true workspace = true [dependencies] -ahash = "0.8" +ahash.workspace = true api.workspace = true arrow.workspace = true async-trait.workspace = true diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 099d2b5100..72edfa8e03 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -395,17 +395,19 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("Transform cannot be empty"))] - TransformEmpty { - #[snafu(implicit)] - location: Location, - }, #[snafu(display("Column name must be unique, but got duplicated: {duplicates}"))] TransformColumnNameMustBeUnique { duplicates: String, #[snafu(implicit)] location: Location, }, + #[snafu(display( + "At least one timestamp-related processor is required to use auto transform" + ))] + TransformNoTimestampProcessor { + #[snafu(implicit)] + location: Location, + }, #[snafu(display( "Illegal to set multiple timestamp Index columns, please set only one: {columns}" ))] @@ -421,6 +423,11 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Exactly one timestamp value is required to use auto transform"))] + AutoTransformOneTimestamp { + #[snafu(implicit)] + location: Location, + }, #[snafu(display("Null type not supported"))] CoerceUnsupportedNullType { #[snafu(implicit)] @@ -793,10 +800,11 @@ impl ErrorExt for Error { | TransformOnFailureInvalidValue { .. } | TransformElementMustBeMap { .. } | TransformTypeMustBeSet { .. } - | TransformEmpty { .. } | TransformColumnNameMustBeUnique { .. } | TransformMultipleTimestampIndex { .. } + | TransformNoTimestampProcessor { .. } | TransformTimestampIndexCount { .. } + | AutoTransformOneTimestamp { .. } | CoerceUnsupportedNullType { .. } | CoerceUnsupportedNullTypeTo { .. } | CoerceUnsupportedEpochType { .. } diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index 676600d0a4..daa94c87bf 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -18,7 +18,9 @@ pub mod processor; pub mod transform; pub mod value; +use ahash::{HashMap, HashMapExt}; use api::v1::Row; +use common_time::timestamp::TimeUnit; use processor::{Processor, Processors}; use snafu::{ensure, OptionExt, ResultExt}; use transform::Transforms; @@ -27,8 +29,10 @@ use yaml_rust::YamlLoader; use crate::dispatcher::{Dispatcher, Rule}; use crate::error::{ - IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu, + IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, + TransformNoTimestampProcessorSnafu, YamlLoadSnafu, YamlParseSnafu, }; +use crate::etl::processor::ProcessorKind; use crate::tablesuffix::TableSuffixTemplate; use crate::GreptimeTransformer; @@ -70,7 +74,25 @@ pub fn parse(input: &Content) -> Result { Transforms::default() }; - let transformer = GreptimeTransformer::new(transformers)?; + let transformer = if transformers.is_empty() { + // use auto transform + // check processors have at least one timestamp-related processor + let cnt = processors + .iter() + .filter(|p| { + matches!( + p, + ProcessorKind::Date(_) + | ProcessorKind::Timestamp(_) + | ProcessorKind::Epoch(_) + ) + }) + .count(); + ensure!(cnt > 0, TransformNoTimestampProcessorSnafu); + None + } else { + Some(GreptimeTransformer::new(transformers)?) + }; let dispatcher = if !doc[DISPATCHER].is_badvalue() { Some(Dispatcher::try_from(&doc[DISPATCHER])?) @@ -101,7 +123,7 @@ pub struct Pipeline { description: Option, processors: processor::Processors, dispatcher: Option, - transformer: GreptimeTransformer, + transformer: Option, tablesuffix: Option, } @@ -132,6 +154,8 @@ impl DispatchedTo { #[derive(Debug)] pub enum PipelineExecOutput { Transformed((Row, Option)), + // table_suffix, ts_key -> unit + AutoTransform(Option, HashMap), DispatchedTo(DispatchedTo), } @@ -199,25 +223,35 @@ impl Pipeline { return Ok(PipelineExecOutput::DispatchedTo(rule.into())); } - // transform - let row = self.transformer.transform_mut(val)?; - - // generate table name - let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val)); - - Ok(PipelineExecOutput::Transformed((row, table_suffix))) + if let Some(transformer) = self.transformer() { + let row = transformer.transform_mut(val)?; + let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val)); + Ok(PipelineExecOutput::Transformed((row, table_suffix))) + } else { + let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val)); + let mut ts_unit_map = HashMap::with_capacity(4); + // get all ts values + for (k, v) in val { + if let Value::Timestamp(ts) = v { + if !ts_unit_map.contains_key(k) { + ts_unit_map.insert(k.clone(), ts.get_unit()); + } + } + } + Ok(PipelineExecOutput::AutoTransform(table_suffix, ts_unit_map)) + } } pub fn processors(&self) -> &processor::Processors { &self.processors } - pub fn transformer(&self) -> &GreptimeTransformer { - &self.transformer + pub fn transformer(&self) -> Option<&GreptimeTransformer> { + self.transformer.as_ref() } - pub fn schemas(&self) -> &Vec { - self.transformer.schemas() + pub fn schemas(&self) -> Option<&Vec> { + self.transformer.as_ref().map(|t| t.schemas()) } } @@ -315,7 +349,7 @@ transform: .unwrap() .into_transformed() .unwrap(); - let sechema = pipeline.schemas(); + let sechema = pipeline.schemas().unwrap(); assert_eq!(sechema.len(), result.0.values.len()); let test = vec![ @@ -427,7 +461,7 @@ transform: "#; let pipeline: Pipeline = parse(&Content::Yaml(pipeline_yaml)).unwrap(); - let schema = pipeline.schemas().clone(); + let schema = pipeline.schemas().unwrap().clone(); let mut result = json_to_map(input_value).unwrap(); let row = pipeline diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 033feda0c5..8cdd0268c9 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -30,16 +30,15 @@ use serde_json::Number; use crate::error::{ IdentifyPipelineColumnTypeMismatchSnafu, ReachedMaxNestedLevelsSnafu, Result, - TransformColumnNameMustBeUniqueSnafu, TransformEmptySnafu, - TransformMultipleTimestampIndexSnafu, TransformTimestampIndexCountSnafu, - UnsupportedNumberTypeSnafu, + TransformColumnNameMustBeUniqueSnafu, TransformMultipleTimestampIndexSnafu, + TransformTimestampIndexCountSnafu, UnsupportedNumberTypeSnafu, }; use crate::etl::field::{Field, Fields}; use crate::etl::transform::index::Index; use crate::etl::transform::{Transform, Transforms}; use crate::etl::value::{Timestamp, Value}; use crate::etl::PipelineMap; -use crate::IdentityTimeIndex; +use crate::{IdentityTimeIndex, PipelineContext}; const DEFAULT_GREPTIME_TIMESTAMP_COLUMN: &str = "greptime_timestamp"; const DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING: usize = 10; @@ -124,10 +123,7 @@ impl GreptimeTransformer { impl GreptimeTransformer { pub fn new(mut transforms: Transforms) -> Result { - if transforms.is_empty() { - return TransformEmptySnafu.fail(); - } - + // empty check is done in the caller let mut column_names_set = HashSet::new(); let mut timestamp_columns = vec![]; @@ -491,11 +487,12 @@ fn resolve_value( } fn identity_pipeline_inner( - array: Vec, - custom_ts: Option<&IdentityTimeIndex>, + pipeline_maps: Vec, + pipeline_ctx: &PipelineContext<'_>, ) -> Result<(SchemaInfo, Vec)> { - let mut rows = Vec::with_capacity(array.len()); + let mut rows = Vec::with_capacity(pipeline_maps.len()); let mut schema_info = SchemaInfo::default(); + let custom_ts = pipeline_ctx.pipeline_definition.get_custom_ts(); // set time index column schema first schema_info.schema.push(ColumnSchema { @@ -510,7 +507,7 @@ fn identity_pipeline_inner( options: None, }); - for values in array { + for values in pipeline_maps { let row = values_to_row(&mut schema_info, values, custom_ts)?; rows.push(row); } @@ -537,10 +534,9 @@ fn identity_pipeline_inner( pub fn identity_pipeline( array: Vec, table: Option>, - params: &GreptimePipelineParams, - custom_ts: Option<&IdentityTimeIndex>, + pipeline_ctx: &PipelineContext<'_>, ) -> Result { - let input = if params.flatten_json_object() { + let input = if pipeline_ctx.pipeline_param.flatten_json_object() { array .into_iter() .map(|item| flatten_object(item, DEFAULT_MAX_NESTED_LEVELS_FOR_JSON_FLATTENING)) @@ -549,7 +545,7 @@ pub fn identity_pipeline( array }; - identity_pipeline_inner(input, custom_ts).map(|(mut schema, rows)| { + identity_pipeline_inner(input, pipeline_ctx).map(|(mut schema, rows)| { if let Some(table) = table { let table_info = table.table_info(); for tag_name in table_info.meta.row_key_column_names() { @@ -621,10 +617,13 @@ mod tests { use super::*; use crate::etl::{json_array_to_map, json_to_map}; - use crate::identity_pipeline; + use crate::{identity_pipeline, PipelineDefinition}; #[test] fn test_identify_pipeline() { + let params = GreptimePipelineParams::default(); + let pipeline_ctx = + PipelineContext::new(&PipelineDefinition::GreptimeIdentityPipeline(None), ¶ms); { let array = vec![ serde_json::json!({ @@ -647,7 +646,7 @@ mod tests { }), ]; let array = json_array_to_map(array).unwrap(); - let rows = identity_pipeline(array, None, &GreptimePipelineParams::default(), None); + let rows = identity_pipeline(array, None, &pipeline_ctx); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -675,12 +674,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline( - json_array_to_map(array).unwrap(), - None, - &GreptimePipelineParams::default(), - None, - ); + let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx); assert!(rows.is_err()); assert_eq!( rows.err().unwrap().to_string(), @@ -708,12 +702,7 @@ mod tests { "gaga": "gaga" }), ]; - let rows = identity_pipeline( - json_array_to_map(array).unwrap(), - None, - &GreptimePipelineParams::default(), - None, - ); + let rows = identity_pipeline(json_array_to_map(array).unwrap(), None, &pipeline_ctx); assert!(rows.is_ok()); let rows = rows.unwrap(); assert_eq!(rows.schema.len(), 8); @@ -744,8 +733,8 @@ mod tests { ]; let tag_column_names = ["name".to_string(), "address".to_string()]; - let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), None).map( - |(mut schema, rows)| { + let rows = identity_pipeline_inner(json_array_to_map(array).unwrap(), &pipeline_ctx) + .map(|(mut schema, rows)| { for name in tag_column_names { if let Some(index) = schema.index.get(&name) { schema.schema[*index].semantic_type = SemanticType::Tag as i32; @@ -755,8 +744,7 @@ mod tests { schema: schema.schema, rows, } - }, - ); + }); assert!(rows.is_ok()); let rows = rows.unwrap(); diff --git a/src/pipeline/src/etl/value/time.rs b/src/pipeline/src/etl/value/time.rs index f6afe32b87..414b5558ca 100644 --- a/src/pipeline/src/etl/value/time.rs +++ b/src/pipeline/src/etl/value/time.rs @@ -95,6 +95,15 @@ impl Timestamp { TimeUnit::Nanosecond => self.timestamp_nanos(), } } + + pub fn get_unit(&self) -> TimeUnit { + match self { + Timestamp::Nanosecond(_) => TimeUnit::Nanosecond, + Timestamp::Microsecond(_) => TimeUnit::Microsecond, + Timestamp::Millisecond(_) => TimeUnit::Millisecond, + Timestamp::Second(_) => TimeUnit::Second, + } + } } impl Default for Timestamp { diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs index 3928e00b35..b0aca8f8fb 100644 --- a/src/pipeline/src/manager.rs +++ b/src/pipeline/src/manager.rs @@ -102,6 +102,18 @@ impl PipelineDefinition { Ok(Self::ByNameAndValue((name.to_owned(), version))) } } + + pub fn is_identity(&self) -> bool { + matches!(self, Self::GreptimeIdentityPipeline(_)) + } + + pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> { + if let Self::GreptimeIdentityPipeline(custom_ts) = self { + custom_ts.as_ref() + } else { + None + } + } } pub struct PipelineContext<'a> { diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs index 781f70f0d5..d2435b13f3 100644 --- a/src/pipeline/tests/common.rs +++ b/src/pipeline/tests/common.rs @@ -22,7 +22,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows { let yaml_content = Content::Yaml(pipeline_yaml); let pipeline: Pipeline = parse(&yaml_content).expect("failed to parse pipeline"); - let schema = pipeline.schemas().clone(); + let schema = pipeline.schemas().unwrap().clone(); let mut rows = Vec::new(); diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index 1ac86864fb..fd64e60feb 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -428,7 +428,7 @@ transform: .expect("expect transformed result "); let output = Rows { - schema: pipeline.schemas().clone(), + schema: pipeline.schemas().unwrap().clone(), rows: vec![row.0], }; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index b29ff0bd40..3be6a59204 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -18,7 +18,7 @@ workspace = true local-ip-address.workspace = true [dependencies] -ahash = "0.8" +ahash.workspace = true api.workspace = true arrow.workspace = true arrow-flight.workspace = true @@ -58,7 +58,6 @@ datatypes.workspace = true derive_builder.workspace = true futures.workspace = true futures-util.workspace = true -hashbrown = "0.15" headers = "0.4" hostname = "0.3" http.workspace = true diff --git a/src/servers/src/http/loki.rs b/src/servers/src/http/loki.rs index f010b10c5d..7b5abf7857 100644 --- a/src/servers/src/http/loki.rs +++ b/src/servers/src/http/loki.rs @@ -16,6 +16,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use std::time::Instant; +use ahash::{HashMap, HashMapExt}; use api::v1::value::ValueData; use api::v1::{ ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, @@ -28,7 +29,6 @@ use bytes::Bytes; use common_query::prelude::GREPTIME_TIMESTAMP; use common_query::{Output, OutputData}; use common_telemetry::{error, warn}; -use hashbrown::HashMap; use headers::ContentType; use jsonb::Value; use lazy_static::lazy_static; diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index 2a7970e2f7..bc5aeec164 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -15,14 +15,16 @@ use std::collections::BTreeMap; use std::sync::Arc; +use ahash::{HashMap, HashMapExt}; use api::v1::{RowInsertRequest, Rows}; -use hashbrown::HashMap; +use itertools::Itertools; +use pipeline::error::AutoTransformOneTimestampSnafu; use pipeline::{ - DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineContext, - PipelineDefinition, PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, + DispatchedTo, IdentityTimeIndex, Pipeline, PipelineContext, PipelineDefinition, + PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME, }; use session::context::QueryContextRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::error::{CatalogSnafu, PipelineSnafu, Result}; use crate::http::event::PipelineIngestRequest; @@ -31,6 +33,14 @@ use crate::metrics::{ }; use crate::query_handler::PipelineHandlerRef; +macro_rules! push_to_map { + ($map:expr, $key:expr, $value:expr, $capacity:expr) => { + $map.entry($key) + .or_insert_with(|| Vec::with_capacity($capacity)) + .push($value); + }; +} + /// Never call this on `GreptimeIdentityPipeline` because it's a real pipeline pub async fn get_pipeline( pipeline_def: &PipelineDefinition, @@ -57,27 +67,16 @@ pub(crate) async fn run_pipeline( query_ctx: &QueryContextRef, is_top_level: bool, ) -> Result> { - match &pipeline_ctx.pipeline_definition { - PipelineDefinition::GreptimeIdentityPipeline(custom_ts) => { - run_identity_pipeline( - handler, - custom_ts.as_ref(), - pipeline_ctx.pipeline_param, - pipeline_req, - query_ctx, - ) - .await - } - _ => { - run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await - } + if pipeline_ctx.pipeline_definition.is_identity() { + run_identity_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx).await + } else { + run_custom_pipeline(handler, pipeline_ctx, pipeline_req, query_ctx, is_top_level).await } } async fn run_identity_pipeline( handler: &PipelineHandlerRef, - custom_ts: Option<&IdentityTimeIndex>, - pipeline_parameters: &GreptimePipelineParams, + pipeline_ctx: &PipelineContext<'_>, pipeline_req: PipelineIngestRequest, query_ctx: &QueryContextRef, ) -> Result> { @@ -89,7 +88,7 @@ async fn run_identity_pipeline( .get_table(&table_name, query_ctx) .await .context(CatalogSnafu)?; - pipeline::identity_pipeline(data_array, table, pipeline_parameters, custom_ts) + pipeline::identity_pipeline(data_array, table, pipeline_ctx) .map(|rows| { vec![RowInsertRequest { rows: Some(rows), @@ -113,15 +112,17 @@ async fn run_custom_pipeline( let PipelineIngestRequest { table: table_name, - values: data_array, + values: pipeline_maps, } = pipeline_req; - let arr_len = data_array.len(); - let mut req_map = HashMap::new(); + let arr_len = pipeline_maps.len(); + let mut transformed_map = HashMap::new(); let mut dispatched: BTreeMap> = BTreeMap::new(); + let mut auto_map = HashMap::new(); + let mut auto_map_ts_keys = HashMap::new(); - for mut values in data_array { + for mut pipeline_map in pipeline_maps { let r = pipeline - .exec_mut(&mut values) + .exec_mut(&mut pipeline_map) .inspect_err(|_| { METRIC_HTTP_LOGS_TRANSFORM_ELAPSED .with_label_values(&[db.as_str(), METRIC_FAILURE_VALUE]) @@ -131,38 +132,76 @@ async fn run_custom_pipeline( match r { PipelineExecOutput::Transformed((row, table_suffix)) => { - let act_table_name = match table_suffix { - Some(suffix) => format!("{}{}", table_name, suffix), - None => table_name.clone(), - }; - - req_map + let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); + push_to_map!(transformed_map, act_table_name, row, arr_len); + } + PipelineExecOutput::AutoTransform(table_suffix, ts_keys) => { + let act_table_name = table_suffix_to_table_name(&table_name, table_suffix); + push_to_map!(auto_map, act_table_name.clone(), pipeline_map, arr_len); + auto_map_ts_keys .entry(act_table_name) - .or_insert_with(|| Vec::with_capacity(arr_len)) - .push(row); + .or_insert_with(HashMap::new) + .extend(ts_keys); } PipelineExecOutput::DispatchedTo(dispatched_to) => { - if let Some(coll) = dispatched.get_mut(&dispatched_to) { - coll.push(values); - } else { - dispatched.insert(dispatched_to, vec![values]); - } + push_to_map!(dispatched, dispatched_to, pipeline_map, arr_len); } } } let mut results = Vec::new(); - // if current pipeline generates some transformed results, build it as - // `RowInsertRequest` and append to results. If the pipeline doesn't - // have dispatch, this will be only output of the pipeline. - for (table_name, rows) in req_map { - results.push(RowInsertRequest { - rows: Some(Rows { - rows, - schema: pipeline.schemas().clone(), - }), - table_name, - }); + + if let Some(s) = pipeline.schemas() { + // transformed + + // if current pipeline generates some transformed results, build it as + // `RowInsertRequest` and append to results. If the pipeline doesn't + // have dispatch, this will be only output of the pipeline. + for (table_name, rows) in transformed_map { + results.push(RowInsertRequest { + rows: Some(Rows { + rows, + schema: s.clone(), + }), + table_name, + }); + } + } else { + // auto map + for (table_name, pipeline_maps) in auto_map { + if pipeline_maps.is_empty() { + continue; + } + + let ts_unit_map = auto_map_ts_keys + .remove(&table_name) + .context(AutoTransformOneTimestampSnafu) + .context(PipelineSnafu)?; + // only one timestamp key is allowed + // which will be converted to ts index + let (ts_key, unit) = ts_unit_map + .into_iter() + .exactly_one() + .map_err(|_| AutoTransformOneTimestampSnafu.build()) + .context(PipelineSnafu)?; + + let ident_ts_index = IdentityTimeIndex::Epoch(ts_key.to_string(), unit, false); + let new_def = PipelineDefinition::GreptimeIdentityPipeline(Some(ident_ts_index)); + let next_pipeline_ctx = PipelineContext::new(&new_def, pipeline_ctx.pipeline_param); + + let reqs = run_identity_pipeline( + handler, + &next_pipeline_ctx, + PipelineIngestRequest { + table: table_name, + values: pipeline_maps, + }, + query_ctx, + ) + .await?; + + results.extend(reqs); + } } // if current pipeline contains dispatcher and has several rules, we may @@ -204,3 +243,11 @@ async fn run_custom_pipeline( Ok(results) } + +#[inline] +fn table_suffix_to_table_name(table_name: &String, table_suffix: Option) -> String { + match table_suffix { + Some(suffix) => format!("{}{}", table_name, suffix), + None => table_name.clone(), + } +} diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs index 97f2c47712..e58c683d01 100644 --- a/src/servers/src/prom_row_builder.rs +++ b/src/servers/src/prom_row_builder.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::string::ToString; +use ahash::HashMap; use api::prom_store::remote::Sample; use api::v1::value::ValueData; use api::v1::{ @@ -21,8 +23,6 @@ use api::v1::{ Value, }; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; -use hashbrown::hash_map::Entry; -use hashbrown::HashMap; use prost::DecodeError; use crate::proto::PromLabel; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 2c663e99b9..43b76fcedf 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -96,6 +96,7 @@ macro_rules! http_tests { test_pipeline_api, test_test_pipeline_api, test_plain_text_ingestion, + test_pipeline_auto_transform, test_identity_pipeline, test_identity_pipeline_with_flatten, test_identity_pipeline_with_custom_ts, @@ -2328,6 +2329,85 @@ transform: guard.remove_all().await; } +pub async fn test_pipeline_auto_transform(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = + setup_test_http_app_with_frontend(store_type, "test_pipeline_auto_transform").await; + + // handshake + let client = TestClient::new(app).await; + + let body = r#" +processors: + - dissect: + fields: + - message + patterns: + - "%{+ts} %{+ts} %{http_status_code} %{content}" + - date: + fields: + - ts + formats: + - "%Y-%m-%d %H:%M:%S%.3f" +"#; + + // 1. create pipeline + let res = client + .post("/v1/pipelines/test") + .header("Content-Type", "application/x-yaml") + .body(body) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + let content = res.text().await; + + let content = serde_json::from_str(&content); + assert!(content.is_ok()); + // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]} + let content: Value = content.unwrap(); + + let version_str = content + .get("pipelines") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap() + .get("version") + .unwrap() + .as_str() + .unwrap() + .to_string(); + assert!(!version_str.is_empty()); + + // 2. write data + let data_body = r#" +2024-05-25 20:16:37.217 404 hello +2024-05-25 20:16:37.218 200 hello world +"#; + let res = client + .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") + .header("Content-Type", "text/plain") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // 3. select data + let expected = "[[1716668197217000000,\"hello\",\"404\",\"2024-05-25 20:16:37.217 404 hello\"],[1716668197218000000,\"hello world\",\"200\",\"2024-05-25 20:16:37.218 200 hello world\"]]"; + validate_data( + "test_pipeline_auto_transform", + &client, + "select * from logs1", + expected, + ) + .await; + + guard.remove_all().await; +} + pub async fn test_otlp_metrics(store_type: StorageType) { // init common_telemetry::init_default_ut_logging();