diff --git a/Cargo.lock b/Cargo.lock
index 26d6e0b96d..6a0fefc6ac 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3711,6 +3711,15 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"

+[[package]]
+name = "document-features"
+version = "0.2.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d"
+dependencies = [
+ "litrs",
+]
+
 [[package]]
 name = "dotenv"
 version = "0.15.0"
@@ -3755,6 +3764,15 @@ version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"

+[[package]]
+name = "dyn-fmt"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c992f591dfce792a9bc2d1880ab67ffd4acc04551f8e551ca3b6233efb322f00"
+dependencies = [
+ "document-features",
+]
+
 [[package]]
 name = "earcutr"
 version = "0.4.3"
@@ -6265,6 +6283,12 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"

+[[package]]
+name = "litrs"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
+
 [[package]]
 name = "local-ip-address"
 version = "0.6.3"
@@ -8342,6 +8366,7 @@ dependencies = [
  "datafusion-functions",
  "datafusion-physical-expr",
  "datatypes",
+ "dyn-fmt",
  "enum_dispatch",
  "futures",
  "greptime-proto",
diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml
index 4c2a5e9945..6ce55591a1 100644
--- a/src/pipeline/Cargo.toml
+++ b/src/pipeline/Cargo.toml
@@ -36,6 +36,7 @@ datafusion-expr.workspace = true
 datafusion-functions.workspace = true
 datafusion-physical-expr.workspace = true
 datatypes.workspace = true
+dyn-fmt = "0.4"
 enum_dispatch = "0.3"
 futures.workspace = true
 greptime-proto.workspace = true
diff --git a/src/pipeline/benches/processor.rs b/src/pipeline/benches/processor.rs
index 848bd8a1a1..011f492b63 100644
--- a/src/pipeline/benches/processor.rs
+++ b/src/pipeline/benches/processor.rs
@@ -29,7 +29,7 @@ fn processor_mut(
             .exec_mut(&mut payload)?
             .into_transformed()
             .expect("expect transformed result ");
-        result.push(r);
+        result.push(r.0);
     }

     Ok(result)
diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 7a714252c5..2b621b7dba 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -639,6 +639,16 @@ pub enum Error {
         source: common_recordbatch::error::Error,
     },

+    #[snafu(display("A valid table suffix template is required for tablesuffix section"))]
+    RequiredTableSuffixTemplate,
+
+    #[snafu(display("Invalid table suffix template, input: {}", input))]
+    InvalidTableSuffixTemplate {
+        input: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to cast type, msg: {}", msg))]
     CastType {
         msg: String,
@@ -807,7 +817,9 @@ impl ErrorExt for Error {
             | FieldRequiredForDispatcher
             | TableSuffixRequiredForDispatcherRule
             | ValueRequiredForDispatcherRule
-            | ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments,
+            | ReachedMaxNestedLevels { .. }
+            | RequiredTableSuffixTemplate
+            | InvalidTableSuffixTemplate { .. } => StatusCode::InvalidArguments,
         }
     }
diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs
index 5c2c2795f4..5d98dce4a8 100644
--- a/src/pipeline/src/etl.rs
+++ b/src/pipeline/src/etl.rs
@@ -29,6 +29,7 @@ use crate::dispatcher::{Dispatcher, Rule};
 use crate::error::{
     IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
 };
+use crate::tablesuffix::TableSuffixTemplate;
 use crate::GreptimeTransformer;

 const DESCRIPTION: &str = "description";
@@ -36,6 +37,7 @@ const PROCESSORS: &str = "processors";
 const TRANSFORM: &str = "transform";
 const TRANSFORMS: &str = "transforms";
 const DISPATCHER: &str = "dispatcher";
+const TABLESUFFIX: &str = "table_suffix";

 pub type PipelineMap = std::collections::BTreeMap<String, Value>;
@@ -76,11 +78,18 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
                 None
             };

+            let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
+                Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
+            } else {
+                None
+            };
+
             Ok(Pipeline {
                 description,
                 processors,
                 transformer,
                 dispatcher,
+                tablesuffix,
             })
         }
         Content::Json(_) => unimplemented!(),
     }
 }
@@ -93,6 +102,7 @@ pub struct Pipeline {
     processors: processor::Processors,
     dispatcher: Option<Dispatcher>,
     transformer: GreptimeTransformer,
+    tablesuffix: Option<TableSuffixTemplate>,
 }

 /// Where the pipeline executed is dispatched to, with context information
@@ -121,12 +131,12 @@ impl DispatchedTo {
 /// The result of pipeline execution
 #[derive(Debug)]
 pub enum PipelineExecOutput {
-    Transformed(Row),
+    Transformed((Row, Option<String>)),
     DispatchedTo(DispatchedTo),
 }

 impl PipelineExecOutput {
-    pub fn into_transformed(self) -> Option<Row> {
+    pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
         if let Self::Transformed(o) = self {
             Some(o)
         } else {
@@ -162,22 +172,23 @@ pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>>
 impl Pipeline {
     pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
+        // process
         for processor in self.processors.iter() {
             processor.exec_mut(val)?;
         }

-        let matched_rule = self
-            .dispatcher
-            .as_ref()
-            .and_then(|dispatcher| dispatcher.exec(val));
-
-        match matched_rule {
-            None => self
-                .transformer
-                .transform_mut(val)
-                .map(PipelineExecOutput::Transformed),
-            Some(rule) => Ok(PipelineExecOutput::DispatchedTo(rule.into())),
+        // dispatch, fast return if matched
+        if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
+            return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
         }
+
+        // transform
+        let row = self.transformer.transform_mut(val)?;
+
+        // generate table name
+        let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
+
+        Ok(PipelineExecOutput::Transformed((row, table_suffix)))
     }

     pub fn processors(&self) -> &processor::Processors {
@@ -237,9 +248,9 @@ transform:
         .into_transformed()
         .unwrap();

-    assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
-    assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
-    match &result.values[2].value_data {
+    assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
+    assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
+    match &result.0.values[2].value_data {
         Some(ValueData::TimestampNanosecondValue(v)) => {
             assert_ne!(*v, 0);
         }
@@ -289,7 +300,7 @@ transform:
         .unwrap();

     let sechema = pipeline.schemas();
-    assert_eq!(sechema.len(), result.values.len());
+    assert_eq!(sechema.len(), result.0.values.len());
     let test = vec![
         (
             ColumnDataType::String as i32,
@@ -328,7 +339,7 @@ transform:
     ];
     for i in 0..sechema.len() {
         let schema = &sechema[i];
-        let value = &result.values[i];
+        let value = &result.0.values[i];
         assert_eq!(schema.datatype, test[i].0);
         assert_eq!(value.value_data, test[i].1);
     }
@@ -364,9 +375,9 @@ transform:
         .unwrap()
         .into_transformed()
         .unwrap();
-    assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
-    assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
-    match &result.values[2].value_data {
+    assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
+    assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
+    match &result.0.values[2].value_data {
         Some(ValueData::TimestampNanosecondValue(v)) => {
             assert_ne!(*v, 0);
         }
@@ -409,7 +420,7 @@ transform:
         .unwrap();
     let output = Rows {
         schema,
-        rows: vec![row],
+        rows: vec![row.0],
     };

     let schemas = output.schema;
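Note on the API change above: `Pipeline::exec_mut` now yields the transformed `Row` paired with an optional table suffix, which is why downstream call sites switch from `row`/`result` to `row.0`/`result.0`. A minimal sketch of how a caller might consume the new output (the `handle` function, `base_table` argument, and the `println!` sinks are illustrative placeholders, not code from this patch):

```rust
use pipeline::{Pipeline, PipelineExecOutput, PipelineMap};

// Sketch only: `parsed` is a parsed Pipeline, `intermediate` a prepared PipelineMap,
// and `base_table` is the logical table name chosen by the caller.
fn handle(parsed: &Pipeline, intermediate: &mut PipelineMap, base_table: &str) {
    match parsed.exec_mut(intermediate).expect("pipeline execution failed") {
        // The transformed variant now carries `(Row, Option<String>)`.
        PipelineExecOutput::Transformed((row, table_suffix)) => {
            // Append the suffix (e.g. "_http") to the base table name when present.
            let target_table = match table_suffix {
                Some(suffix) => format!("{}{}", base_table, suffix),
                None => base_table.to_string(),
            };
            println!("would write {} values to {}", row.values.len(), target_table);
        }
        // Dispatcher rules still short-circuit before transformation.
        PipelineExecOutput::DispatchedTo(dest) => {
            println!("dispatched to {:?}", dest);
        }
    }
}
```

Destructuring the tuple directly, as the new `test_table_suffix_template` test further down does with `let (row, table_name) = ...`, avoids the `.0` indexing.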
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 8942c56bd0..6c477ed7e5 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -17,6 +17,7 @@ pub mod error;
 mod etl;
 mod manager;
 mod metrics;
+mod tablesuffix;

 pub use etl::processor::Processor;
 pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};
diff --git a/src/pipeline/src/tablesuffix.rs b/src/pipeline/src/tablesuffix.rs
new file mode 100644
index 0000000000..01f41fbf6e
--- /dev/null
+++ b/src/pipeline/src/tablesuffix.rs
@@ -0,0 +1,126 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use dyn_fmt::AsStrFormatExt;
+use regex::Regex;
+use snafu::{ensure, OptionExt};
+use yaml_rust::Yaml;
+
+use crate::error::{
+    Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
+};
+use crate::{PipelineMap, Value};
+
+const REPLACE_KEY: &str = "{}";
+
+lazy_static::lazy_static! {
+    static ref NAME_TPL: Regex = Regex::new(r"\$\{([^}]+)\}").unwrap();
+}
+
+/// TableSuffixTemplate is used to generate a suffix for the table name, so that the input data can be written to multiple tables.
+/// The config should be placed at the end of the pipeline.
+/// Use `${variable}` to refer to a variable in the pipeline context; the variable can come from the input data or be a processed result.
+/// Note that the variable must be an integer or a string.
+/// If any error occurs at runtime, no suffix is added to the table name.
+///
+/// ```yaml
+/// table_suffix: _${xxx}_${b}
+/// ```
+///
+/// For example, if the template is `_${xxx}_${b}` and the pipeline context is
+/// `{"xxx": "123", "b": "456"}`, the generated suffix will be `_123_456`.
+#[derive(Debug, PartialEq)]
+pub(crate) struct TableSuffixTemplate {
+    pub template: String,
+    pub keys: Vec<String>,
+}
+
+impl TableSuffixTemplate {
+    pub fn apply(&self, val: &PipelineMap) -> Option<String> {
+        let values = self
+            .keys
+            .iter()
+            .filter_map(|key| {
+                let v = val.get(key)?;
+                match v {
+                    Value::Int8(v) => Some(v.to_string()),
+                    Value::Int16(v) => Some(v.to_string()),
+                    Value::Int32(v) => Some(v.to_string()),
+                    Value::Int64(v) => Some(v.to_string()),
+                    Value::Uint8(v) => Some(v.to_string()),
+                    Value::Uint16(v) => Some(v.to_string()),
+                    Value::Uint32(v) => Some(v.to_string()),
+                    Value::Uint64(v) => Some(v.to_string()),
+                    Value::String(v) => Some(v.clone()),
+                    _ => None,
+                }
+            })
+            .collect::<Vec<String>>();
+        if values.len() != self.keys.len() {
+            return None;
+        }
+        Some(self.template.format(&values))
+    }
+}
+
+impl TryFrom<&Yaml> for TableSuffixTemplate {
+    type Error = Error;
+
+    fn try_from(value: &Yaml) -> Result<Self> {
+        let name_template = value
+            .as_str()
+            .context(RequiredTableSuffixTemplateSnafu)?
+            .to_string();
+
+        let mut keys = Vec::new();
+        for cap in NAME_TPL.captures_iter(&name_template) {
+            ensure!(
+                cap.len() >= 2,
+                InvalidTableSuffixTemplateSnafu {
+                    input: name_template.clone(),
+                }
+            );
+            let key = cap[1].trim().to_string();
+            keys.push(key);
+        }
+
+        let template = NAME_TPL
+            .replace_all(&name_template, REPLACE_KEY)
+            .to_string();
+
+        Ok(TableSuffixTemplate { template, keys })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use yaml_rust::YamlLoader;
+
+    use crate::tablesuffix::TableSuffixTemplate;
+
+    #[test]
+    fn test_table_suffix_parsing() {
+        let yaml = r#"
+        table_suffix: _${xxx}_${b}
+        "#;
+        let config = YamlLoader::load_from_str(yaml);
+        assert!(config.is_ok());
+        let config = config.unwrap()[0]["table_suffix"].clone();
+        let name_template = TableSuffixTemplate::try_from(&config);
+        assert!(name_template.is_ok());
+        let name_template = name_template.unwrap();
+        assert_eq!(name_template.template, "_{}_{}");
+        assert_eq!(name_template.keys, vec!["xxx", "b"]);
+    }
+}
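The new `TableSuffixTemplate` works in two steps: `try_from` collects the `${...}` variable names via `NAME_TPL` and rewrites each occurrence into a positional `{}` placeholder, and `apply` later looks the keys up in the pipeline context and lets `dyn-fmt`'s `AsStrFormatExt::format` fill the placeholders; if any key is missing or is not an integer/string, `apply` returns `None` and no suffix is added. A standalone sketch of that two-step idea, using the `regex` and `dyn-fmt` crates directly and independent of the pipeline types:

```rust
use dyn_fmt::AsStrFormatExt;
use regex::Regex;

fn main() {
    // Same pattern as NAME_TPL: capture the identifier inside `${...}`.
    let tpl = Regex::new(r"\$\{([^}]+)\}").unwrap();
    let input = "_${xxx}_${b}";

    // Step 1: collect the referenced keys in order of appearance.
    let keys: Vec<String> = tpl
        .captures_iter(input)
        .map(|cap| cap[1].trim().to_string())
        .collect();
    assert_eq!(keys, vec!["xxx", "b"]);

    // Step 2: rewrite `${...}` into positional `{}` placeholders.
    let template = tpl.replace_all(input, "{}").to_string();
    assert_eq!(template, "_{}_{}");

    // At runtime the looked-up values are substituted positionally.
    let suffix = template.format(&["123", "456"]);
    assert_eq!(suffix, "_123_456");
}
```

Keeping the keys and the `_{}_{}`-style template separate means the regex work happens once at parse time, while per-row evaluation is only a map lookup plus a positional format.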
diff --git a/src/pipeline/tests/common.rs b/src/pipeline/tests/common.rs
index 15b9f6e1cc..0361f4e41c 100644
--- a/src/pipeline/tests/common.rs
+++ b/src/pipeline/tests/common.rs
@@ -35,7 +35,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
                     .expect("failed to exec pipeline")
                     .into_transformed()
                     .expect("expect transformed result ");
-                rows.push(row);
+                rows.push(row.0);
             }
         }
         serde_json::Value::Object(_) => {
@@ -45,7 +45,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
                 .expect("failed to exec pipeline")
                 .into_transformed()
                 .expect("expect transformed result ");
-            rows.push(row);
+            rows.push(row.0);
         }
         _ => {
             panic!("invalid input value");
diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs
index 77361550d4..1ac86864fb 100644
--- a/src/pipeline/tests/pipeline.rs
+++ b/src/pipeline/tests/pipeline.rs
@@ -13,7 +13,7 @@
 // limitations under the License.

 use api::v1::value::ValueData;
-use api::v1::Rows;
+use api::v1::{Rows, Value};
 use common_telemetry::tracing::info;
 use greptime_proto::v1::value::ValueData::{
     BinaryValue, BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue,
@@ -429,7 +429,7 @@ transform:

     let output = Rows {
         schema: pipeline.schemas().clone(),
-        rows: vec![row],
+        rows: vec![row.0],
     };

     assert_eq!(output.rows.len(), 1);
@@ -495,6 +495,7 @@ transform:
         .into_transformed()
         .expect("expect transformed result ");
     let r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -604,6 +605,7 @@ transform:
         .expect("expect transformed result ");

     let r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -668,6 +670,7 @@ transform:
         .into_transformed()
         .expect("expect transformed result ");
     let r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -709,6 +712,7 @@ transform:
         .expect("expect transformed result ");

     let r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -767,6 +771,7 @@ transform:
         .expect("expect transformed result ");

     let mut r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -806,7 +811,7 @@ transform:
         .into_transformed()
         .expect("expect transformed result ");

-    row.values.into_iter().for_each(|v| {
+    row.0.values.into_iter().for_each(|v| {
         if let ValueData::TimestampNanosecondValue(v) = v.value_data.unwrap() {
             let now = chrono::Utc::now().timestamp_nanos_opt().unwrap();
             assert!(now - v < 1_000_000);
@@ -877,6 +882,7 @@ transform:
         .into_transformed()
         .expect("expect transformed result ");
     let r = row
+        .0
         .values
         .into_iter()
         .map(|v| v.value_data.unwrap())
@@ -889,3 +895,54 @@ transform:

     assert_eq!(expected, r);
 }
+
+#[test]
+fn test_table_suffix_template() {
+    let input_value = r#"
+{
+    "line": "2024-05-25 20:16:37.217 [http] hello world"
+}
+"#;
+    let input_value = serde_json::from_str::<serde_json::Value>(input_value).unwrap();
+
+    let pipeline_yaml = r#"
+processors:
+  - dissect:
+      fields:
+        - line
+      patterns:
+        - "%{+ts} %{+ts} [%{logger}] %{content}"
+  - date:
+      fields:
+        - ts
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+transform:
+  - fields:
+      - content
+    type: string
+  - field: ts
+    type: time
+    index: timestamp
+table_suffix: _${logger}
+"#;
+
+    let yaml_content = Content::Yaml(pipeline_yaml);
+    let pipeline: Pipeline = parse(&yaml_content).unwrap();
+
+    let mut status = json_to_map(input_value).unwrap();
+    let exec_re = pipeline.exec_mut(&mut status).unwrap();
+
+    let (row, table_name) = exec_re.into_transformed().unwrap();
+    let values = row.values;
+    let expected_values = vec![
+        Value {
+            value_data: Some(ValueData::StringValue("hello world".into())),
+        },
+        Value {
+            value_data: Some(ValueData::TimestampNanosecondValue(1716668197217000000)),
+        },
+    ];
+    assert_eq!(expected_values, values);
+    assert_eq!(table_name, Some("_http".to_string()));
+}
diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs
index 4928ce44a3..8d8538a319 100644
--- a/src/servers/src/pipeline.rs
+++ b/src/servers/src/pipeline.rs
@@ -16,6 +16,7 @@ use std::collections::BTreeMap;
 use std::sync::Arc;

 use api::v1::{RowInsertRequest, Rows};
+use hashbrown::HashMap;
 use pipeline::{
     DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineDefinition,
     PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
@@ -120,7 +121,8 @@ async fn run_custom_pipeline(

     let transform_timer = std::time::Instant::now();

-    let mut transformed = Vec::with_capacity(data_array.len());
+    let arr_len = data_array.len();
+    let mut req_map = HashMap::new();
     let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();

     for mut values in data_array {
@@ -134,8 +136,16 @@ async fn run_custom_pipeline(
             .context(PipelineSnafu)?;

         match r {
-            PipelineExecOutput::Transformed(row) => {
-                transformed.push(row);
+            PipelineExecOutput::Transformed((row, table_suffix)) => {
+                let act_table_name = match table_suffix {
+                    Some(suffix) => format!("{}{}", table_name, suffix),
+                    None => table_name.clone(),
+                };
+
+                req_map
+                    .entry(act_table_name)
+                    .or_insert_with(|| Vec::with_capacity(arr_len))
+                    .push(row);
             }
             PipelineExecOutput::DispatchedTo(dispatched_to) => {
                 if let Some(coll) = dispatched.get_mut(&dispatched_to) {
@@ -151,14 +161,14 @@ async fn run_custom_pipeline(
     // if current pipeline generates some transformed results, build it as
     // `RowInsertRequest` and append to results. If the pipeline doesn't
     // have dispatch, this will be only output of the pipeline.
-    if !transformed.is_empty() {
+    for (table_name, rows) in req_map {
         results.push(RowInsertRequest {
             rows: Some(Rows {
-                rows: transformed,
+                rows,
                 schema: pipeline.schemas().clone(),
             }),
-            table_name: table_name.clone(),
-        })
+            table_name,
+        });
     }

     // if current pipeline contains dispatcher and has several rules, we may
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 90dc059856..34d3b48083 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -96,6 +96,7 @@ macro_rules! http_tests {
         test_identify_pipeline_with_flatten,
         test_identify_pipeline_with_custom_ts,
         test_pipeline_dispatcher,
+        test_pipeline_suffix_template,

         test_otlp_metrics,
         test_otlp_traces_v0,
@@ -1657,6 +1658,137 @@ transform:
     guard.remove_all().await;
 }

+pub async fn test_pipeline_suffix_template(storage_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) =
+        setup_test_http_app_with_frontend(storage_type, "test_pipeline_suffix_template").await;
+
+    // handshake
+    let client = TestClient::new(app).await;
+
+    let root_pipeline = r#"
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+      ignore_missing: true
+
+transform:
+  - fields:
+      - id1, id1_root
+      - id2, id2_root
+    type: int32
+  - fields:
+      - type
+      - log
+      - logger
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+table_suffix: _${type}
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/root")
+        .header("Content-Type", "application/x-yaml")
+        .body(root_pipeline)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 2. write data
+    let data_body = r#"
+[
+    {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "type": "http",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+    },
+    {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "type": "http",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+    },
+    {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "type": "db",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+    },
+    {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "type": "http",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+    },
+    {
+        "id1": "2436",
+        "id2": "2528",
+        "logger": "INTERACT.MANAGER",
+        "time": "2024-05-25 20:16:37.217",
+        "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+    }
+]
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // 3. check table list
+    validate_data(
+        "test_pipeline_suffix_template_table_list",
+        &client,
+        "show tables",
+        "[[\"d_table\"],[\"d_table_db\"],[\"d_table_http\"],[\"demo\"],[\"numbers\"]]",
+    )
+    .await;
+
+    // 4. check each table's data
+    validate_data(
+        "test_pipeline_suffix_template_default",
+        &client,
+        "select * from d_table",
+        "[[2436,2528,null,\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
+    )
+    .await;
+
+    validate_data(
+        "test_pipeline_name_template_db",
+        &client,
+        "select * from d_table_db",
+        "[[2436,2528,\"db\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
+    )
+    .await;
+
+    validate_data(
+        "test_pipeline_name_template_http",
+        &client,
+        "select * from d_table_http",
+        "[[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000],[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000],[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
+    )
+    .await;
+
+    guard.remove_all().await;
+}
+
 pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
     common_telemetry::init_default_ut_logging();
     let (app, mut guard) =
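Read together with the `req_map` change in `src/servers/src/pipeline.rs`, the integration test's outcome follows directly: each transformed row is bucketed under `table_name + suffix` (or the plain `table_name` for the last record, which has no `type` field), and one `RowInsertRequest` is emitted per bucket, hence the `d_table`, `d_table_db`, and `d_table_http` tables. A condensed, self-contained sketch of that grouping pattern (plain `String`s stand in for rows, and std's `HashMap` replaces the server's `hashbrown` map; names are illustrative):

```rust
use std::collections::HashMap;

fn group_rows(base: &str, outputs: Vec<(String, Option<String>)>) -> HashMap<String, Vec<String>> {
    let mut req_map: HashMap<String, Vec<String>> = HashMap::new();
    for (row, suffix) in outputs {
        // Resolve the physical table name: base name plus optional suffix.
        let table = match suffix {
            Some(s) => format!("{}{}", base, s),
            None => base.to_string(),
        };
        req_map.entry(table).or_default().push(row);
    }
    req_map
}

fn main() {
    let grouped = group_rows(
        "d_table",
        vec![
            ("row-a".to_string(), Some("_http".to_string())),
            ("row-b".to_string(), Some("_db".to_string())),
            ("row-c".to_string(), None), // no `type` field -> base table
        ],
    );
    assert_eq!(grouped["d_table_http"], vec!["row-a"]);
    assert_eq!(grouped["d_table_db"], vec!["row-b"]);
    assert_eq!(grouped["d_table"], vec!["row-c"]);
}
```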