feat(pipeline): support table name suffix templating in pipeline (#5775)

* chore: add table name template in pipeline yaml

* chore: implement apply function and add simple test

* chore: add comment and integration test

* chore: minor update

* fix: typos

* chore: change to table suffix

* chore: update comment and test

* chore: change name to table_suffix
Author: shuiyisong
Date: 2025-03-29 02:12:46 +08:00
Committed by: GitHub
Parent: a9e990768d
Commit: bef45ed0e8
11 changed files with 411 additions and 36 deletions
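In short, a pipeline can now declare a `table_suffix` template at the top level of its YAML. `${variable}` placeholders are resolved from the pipeline context and the rendered string is appended to the target table name. A minimal illustrative config, adapted from the tests in this change:

processors:
  - dissect:
      fields:
        - line
      patterns:
        - "%{+ts} %{+ts} [%{logger}] %{content}"
  - date:
      fields:
        - ts
      formats:
        - "%Y-%m-%d %H:%M:%S%.3f"
transform:
  - fields:
      - content
    type: string
  - field: ts
    type: time
    index: timestamp
table_suffix: _${logger}

With input `{"line": "2024-05-25 20:16:37.217 [http] hello world"}` and target table `d_table`, the row is written to `d_table_http`; when the template cannot be resolved, the suffix is simply omitted and the original table name is used.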

Cargo.lock generated
View File

@@ -3711,6 +3711,15 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "document-features"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95249b50c6c185bee49034bcb378a49dc2b5dff0be90ff6616d31d64febab05d"
dependencies = [
"litrs",
]
[[package]]
name = "dotenv"
version = "0.15.0"
@@ -3755,6 +3764,15 @@ version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
[[package]]
name = "dyn-fmt"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c992f591dfce792a9bc2d1880ab67ffd4acc04551f8e551ca3b6233efb322f00"
dependencies = [
"document-features",
]
[[package]]
name = "earcutr"
version = "0.4.3"
@@ -6265,6 +6283,12 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
[[package]]
name = "litrs"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
[[package]]
name = "local-ip-address"
version = "0.6.3"
@@ -8342,6 +8366,7 @@ dependencies = [
"datafusion-functions",
"datafusion-physical-expr",
"datatypes",
"dyn-fmt",
"enum_dispatch",
"futures",
"greptime-proto",

View File

@@ -36,6 +36,7 @@ datafusion-expr.workspace = true
datafusion-functions.workspace = true
datafusion-physical-expr.workspace = true
datatypes.workspace = true
dyn-fmt = "0.4"
enum_dispatch = "0.3"
futures.workspace = true
greptime-proto.workspace = true

View File

@@ -29,7 +29,7 @@ fn processor_mut(
.exec_mut(&mut payload)?
.into_transformed()
.expect("expect transformed result ");
result.push(r);
result.push(r.0);
}
Ok(result)

View File

@@ -639,6 +639,16 @@ pub enum Error {
source: common_recordbatch::error::Error,
},
#[snafu(display("A valid table suffix template is required for tablesuffix section"))]
RequiredTableSuffixTemplate,
#[snafu(display("Invalid table suffix template, input: {}", input))]
InvalidTableSuffixTemplate {
input: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast type, msg: {}", msg))]
CastType {
msg: String,
@@ -807,7 +817,9 @@ impl ErrorExt for Error {
| FieldRequiredForDispatcher
| TableSuffixRequiredForDispatcherRule
| ValueRequiredForDispatcherRule
| ReachedMaxNestedLevels { .. } => StatusCode::InvalidArguments,
| ReachedMaxNestedLevels { .. }
| RequiredTableSuffixTemplate
| InvalidTableSuffixTemplate { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -29,6 +29,7 @@ use crate::dispatcher::{Dispatcher, Rule};
use crate::error::{
IntermediateKeyIndexSnafu, PrepareValueMustBeObjectSnafu, Result, YamlLoadSnafu, YamlParseSnafu,
};
use crate::tablesuffix::TableSuffixTemplate;
use crate::GreptimeTransformer;
const DESCRIPTION: &str = "description";
@@ -36,6 +37,7 @@ const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
const TABLESUFFIX: &str = "table_suffix";
pub type PipelineMap = std::collections::BTreeMap<String, Value>;
@@ -76,11 +78,18 @@ pub fn parse(input: &Content) -> Result<Pipeline> {
None
};
let tablesuffix = if !doc[TABLESUFFIX].is_badvalue() {
Some(TableSuffixTemplate::try_from(&doc[TABLESUFFIX])?)
} else {
None
};
Ok(Pipeline {
description,
processors,
transformer,
dispatcher,
tablesuffix,
})
}
Content::Json(_) => unimplemented!(),
@@ -93,6 +102,7 @@ pub struct Pipeline {
processors: processor::Processors,
dispatcher: Option<Dispatcher>,
transformer: GreptimeTransformer,
tablesuffix: Option<TableSuffixTemplate>,
}
/// Where the pipeline executed is dispatched to, with context information
@@ -121,12 +131,12 @@ impl DispatchedTo {
/// The result of pipeline execution
#[derive(Debug)]
pub enum PipelineExecOutput {
Transformed(Row),
Transformed((Row, Option<String>)),
DispatchedTo(DispatchedTo),
}
impl PipelineExecOutput {
pub fn into_transformed(self) -> Option<Row> {
pub fn into_transformed(self) -> Option<(Row, Option<String>)> {
if let Self::Transformed(o) = self {
Some(o)
} else {
@@ -162,22 +172,23 @@ pub fn json_array_to_map(val: Vec<serde_json::Value>) -> Result<Vec<PipelineMap>
impl Pipeline {
pub fn exec_mut(&self, val: &mut PipelineMap) -> Result<PipelineExecOutput> {
// process
for processor in self.processors.iter() {
processor.exec_mut(val)?;
}
let matched_rule = self
.dispatcher
.as_ref()
.and_then(|dispatcher| dispatcher.exec(val));
match matched_rule {
None => self
.transformer
.transform_mut(val)
.map(PipelineExecOutput::Transformed),
Some(rule) => Ok(PipelineExecOutput::DispatchedTo(rule.into())),
// dispatch, fast return if matched
if let Some(rule) = self.dispatcher.as_ref().and_then(|d| d.exec(val)) {
return Ok(PipelineExecOutput::DispatchedTo(rule.into()));
}
// transform
let row = self.transformer.transform_mut(val)?;
// generate table name
let table_suffix = self.tablesuffix.as_ref().and_then(|t| t.apply(val));
Ok(PipelineExecOutput::Transformed((row, table_suffix)))
}
pub fn processors(&self) -> &processor::Processors {
@@ -237,9 +248,9 @@ transform:
.into_transformed()
.unwrap();
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.values[2].value_data {
assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
}
@@ -289,7 +300,7 @@ transform:
.unwrap();
let sechema = pipeline.schemas();
assert_eq!(sechema.len(), result.values.len());
assert_eq!(sechema.len(), result.0.values.len());
let test = vec![
(
ColumnDataType::String as i32,
@@ -328,7 +339,7 @@ transform:
];
for i in 0..sechema.len() {
let schema = &sechema[i];
let value = &result.values[i];
let value = &result.0.values[i];
assert_eq!(schema.datatype, test[i].0);
assert_eq!(value.value_data, test[i].1);
}
@@ -364,9 +375,9 @@ transform:
.unwrap()
.into_transformed()
.unwrap();
assert_eq!(result.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.values[2].value_data {
assert_eq!(result.0.values[0].value_data, Some(ValueData::U32Value(1)));
assert_eq!(result.0.values[1].value_data, Some(ValueData::U32Value(2)));
match &result.0.values[2].value_data {
Some(ValueData::TimestampNanosecondValue(v)) => {
assert_ne!(*v, 0);
}
@@ -409,7 +420,7 @@ transform:
.unwrap();
let output = Rows {
schema,
rows: vec![row],
rows: vec![row.0],
};
let schemas = output.schema;

View File

@@ -17,6 +17,7 @@ pub mod error;
mod etl;
mod manager;
mod metrics;
mod tablesuffix;
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::{GreptimePipelineParams, SchemaInfo};

View File

@@ -0,0 +1,126 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use dyn_fmt::AsStrFormatExt;
use regex::Regex;
use snafu::{ensure, OptionExt};
use yaml_rust::Yaml;
use crate::error::{
Error, InvalidTableSuffixTemplateSnafu, RequiredTableSuffixTemplateSnafu, Result,
};
use crate::{PipelineMap, Value};
const REPLACE_KEY: &str = "{}";
lazy_static::lazy_static! {
static ref NAME_TPL: Regex = Regex::new(r"\$\{([^}]+)\}").unwrap();
}
/// TableSuffixTemplate is used to generate a suffix for the table name, so that the input data can be written to multiple tables.
/// The config should be placed at the end of the pipeline.
/// Use `${variable}` to refer to a variable in the pipeline context; the variable can come from the input data or from a processed result.
/// Note that the variable must be an integer or a string.
/// If any error occurs at runtime, no suffix is added to the table name.
///
/// ```yaml
/// table_suffix: _${xxx}_${b}
/// ```
///
/// For example, if the template is `_${xxx}_${b}` and the pipeline context is
/// `{"xxx": "123", "b": "456"}`, the generated table suffix will be `_123_456`.
#[derive(Debug, PartialEq)]
pub(crate) struct TableSuffixTemplate {
pub template: String,
pub keys: Vec<String>,
}
impl TableSuffixTemplate {
pub fn apply(&self, val: &PipelineMap) -> Option<String> {
let values = self
.keys
.iter()
.filter_map(|key| {
let v = val.get(key)?;
match v {
Value::Int8(v) => Some(v.to_string()),
Value::Int16(v) => Some(v.to_string()),
Value::Int32(v) => Some(v.to_string()),
Value::Int64(v) => Some(v.to_string()),
Value::Uint8(v) => Some(v.to_string()),
Value::Uint16(v) => Some(v.to_string()),
Value::Uint32(v) => Some(v.to_string()),
Value::Uint64(v) => Some(v.to_string()),
Value::String(v) => Some(v.clone()),
_ => None,
}
})
.collect::<Vec<_>>();
if values.len() != self.keys.len() {
return None;
}
Some(self.template.format(&values))
}
}
impl TryFrom<&Yaml> for TableSuffixTemplate {
type Error = Error;
fn try_from(value: &Yaml) -> Result<Self> {
let name_template = value
.as_str()
.context(RequiredTableSuffixTemplateSnafu)?
.to_string();
let mut keys = Vec::new();
for cap in NAME_TPL.captures_iter(&name_template) {
ensure!(
cap.len() >= 2,
InvalidTableSuffixTemplateSnafu {
input: name_template.clone(),
}
);
let key = cap[1].trim().to_string();
keys.push(key);
}
let template = NAME_TPL
.replace_all(&name_template, REPLACE_KEY)
.to_string();
Ok(TableSuffixTemplate { template, keys })
}
}
#[cfg(test)]
mod tests {
use yaml_rust::YamlLoader;
use crate::tablesuffix::TableSuffixTemplate;
#[test]
fn test_table_suffix_parsing() {
let yaml = r#"
table_suffix: _${xxx}_${b}
"#;
let config = YamlLoader::load_from_str(yaml);
assert!(config.is_ok());
let config = config.unwrap()[0]["table_suffix"].clone();
let name_template = TableSuffixTemplate::try_from(&config);
assert!(name_template.is_ok());
let name_template = name_template.unwrap();
assert_eq!(name_template.template, "_{}_{}");
assert_eq!(name_template.keys, vec!["xxx", "b"]);
}
}
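Outside the diff itself, a quick sketch of how `apply` behaves at runtime, continuing from the `name_template` parsed in the test above and assuming `use crate::{PipelineMap, Value};` is in scope:

// Sketch only, not part of this change.
let mut ctx = PipelineMap::new();
ctx.insert("xxx".to_string(), Value::String("123".to_string()));
ctx.insert("b".to_string(), Value::String("456".to_string()));
// Both keys resolve, so the "_{}_{}" template renders into a suffix.
assert_eq!(name_template.apply(&ctx), Some("_123_456".to_string()));
// If any referenced key is missing (or not an integer/string), apply
// returns None and the table name is left unchanged.
ctx.remove("b");
assert_eq!(name_template.apply(&ctx), None);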

View File

@@ -35,7 +35,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
rows.push(row);
rows.push(row.0);
}
}
serde_json::Value::Object(_) => {
@@ -45,7 +45,7 @@ pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
.expect("failed to exec pipeline")
.into_transformed()
.expect("expect transformed result ");
rows.push(row);
rows.push(row.0);
}
_ => {
panic!("invalid input value");

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use api::v1::value::ValueData;
use api::v1::Rows;
use api::v1::{Rows, Value};
use common_telemetry::tracing::info;
use greptime_proto::v1::value::ValueData::{
BinaryValue, BoolValue, F64Value, StringValue, TimestampNanosecondValue, TimestampSecondValue,
@@ -429,7 +429,7 @@ transform:
let output = Rows {
schema: pipeline.schemas().clone(),
rows: vec![row],
rows: vec![row.0],
};
assert_eq!(output.rows.len(), 1);
@@ -495,6 +495,7 @@ transform:
.into_transformed()
.expect("expect transformed result ");
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -604,6 +605,7 @@ transform:
.expect("expect transformed result ");
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -668,6 +670,7 @@ transform:
.into_transformed()
.expect("expect transformed result ");
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -709,6 +712,7 @@ transform:
.expect("expect transformed result ");
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -767,6 +771,7 @@ transform:
.expect("expect transformed result ");
let mut r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -806,7 +811,7 @@ transform:
.into_transformed()
.expect("expect transformed result ");
row.values.into_iter().for_each(|v| {
row.0.values.into_iter().for_each(|v| {
if let ValueData::TimestampNanosecondValue(v) = v.value_data.unwrap() {
let now = chrono::Utc::now().timestamp_nanos_opt().unwrap();
assert!(now - v < 1_000_000);
@@ -877,6 +882,7 @@ transform:
.into_transformed()
.expect("expect transformed result ");
let r = row
.0
.values
.into_iter()
.map(|v| v.value_data.unwrap())
@@ -889,3 +895,54 @@ transform:
assert_eq!(expected, r);
}
#[test]
fn test_table_suffix_template() {
let input_value = r#"
{
"line": "2024-05-25 20:16:37.217 [http] hello world"
}
"#;
let input_value = serde_json::from_str::<serde_json::Value>(input_value).unwrap();
let pipeline_yaml = r#"
processors:
- dissect:
fields:
- line
patterns:
- "%{+ts} %{+ts} [%{logger}] %{content}"
- date:
fields:
- ts
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
transform:
- fields:
- content
type: string
- field: ts
type: time
index: timestamp
table_suffix: _${logger}
"#;
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline = parse(&yaml_content).unwrap();
let mut status = json_to_map(input_value).unwrap();
let exec_re = pipeline.exec_mut(&mut status).unwrap();
let (row, table_name) = exec_re.into_transformed().unwrap();
let values = row.values;
let expected_values = vec![
Value {
value_data: Some(ValueData::StringValue("hello world".into())),
},
Value {
value_data: Some(ValueData::TimestampNanosecondValue(1716668197217000000)),
},
];
assert_eq!(expected_values, values);
assert_eq!(table_name, Some("_http".to_string()));
}

View File

@@ -16,6 +16,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use api::v1::{RowInsertRequest, Rows};
use hashbrown::HashMap;
use pipeline::{
DispatchedTo, GreptimePipelineParams, IdentityTimeIndex, Pipeline, PipelineDefinition,
PipelineExecOutput, PipelineMap, GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME,
@@ -120,7 +121,8 @@ async fn run_custom_pipeline(
let transform_timer = std::time::Instant::now();
let mut transformed = Vec::with_capacity(data_array.len());
let arr_len = data_array.len();
let mut req_map = HashMap::new();
let mut dispatched: BTreeMap<DispatchedTo, Vec<PipelineMap>> = BTreeMap::new();
for mut values in data_array {
@@ -134,8 +136,16 @@ async fn run_custom_pipeline(
.context(PipelineSnafu)?;
match r {
PipelineExecOutput::Transformed(row) => {
transformed.push(row);
PipelineExecOutput::Transformed((row, table_suffix)) => {
let act_table_name = match table_suffix {
Some(suffix) => format!("{}{}", table_name, suffix),
None => table_name.clone(),
};
req_map
.entry(act_table_name)
.or_insert_with(|| Vec::with_capacity(arr_len))
.push(row);
}
PipelineExecOutput::DispatchedTo(dispatched_to) => {
if let Some(coll) = dispatched.get_mut(&dispatched_to) {
@@ -151,14 +161,14 @@ async fn run_custom_pipeline(
// if current pipeline generates some transformed results, build it as
// `RowInsertRequest` and append to results. If the pipeline doesn't
// have dispatch, this will be only output of the pipeline.
if !transformed.is_empty() {
for (table_name, rows) in req_map {
results.push(RowInsertRequest {
rows: Some(Rows {
rows: transformed,
rows,
schema: pipeline.schemas().clone(),
}),
table_name: table_name.clone(),
})
table_name,
});
}
// if current pipeline contains dispatcher and has several rules, we may

View File

@@ -96,6 +96,7 @@ macro_rules! http_tests {
test_identify_pipeline_with_flatten,
test_identify_pipeline_with_custom_ts,
test_pipeline_dispatcher,
test_pipeline_suffix_template,
test_otlp_metrics,
test_otlp_traces_v0,
@@ -1657,6 +1658,137 @@ transform:
guard.remove_all().await;
}
pub async fn test_pipeline_suffix_template(storage_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_http_app_with_frontend(storage_type, "test_pipeline_suffix_template").await;
// handshake
let client = TestClient::new(app).await;
let root_pipeline = r#"
processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
transform:
- fields:
- id1, id1_root
- id2, id2_root
type: int32
- fields:
- type
- log
- logger
type: string
- field: time
type: time
index: timestamp
table_suffix: _${type}
"#;
// 1. create pipeline
let res = client
.post("/v1/events/pipelines/root")
.header("Content-Type", "application/x-yaml")
.body(root_pipeline)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 2. write data
let data_body = r#"
[
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "http",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "http",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "db",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "http",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
}
]
"#;
let res = client
.post("/v1/events/logs?db=public&table=d_table&pipeline_name=root")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// 3. check table list
validate_data(
"test_pipeline_suffix_template_table_list",
&client,
"show tables",
"[[\"d_table\"],[\"d_table_db\"],[\"d_table_http\"],[\"demo\"],[\"numbers\"]]",
)
.await;
// 4. check each table's data
validate_data(
"test_pipeline_suffix_template_default",
&client,
"select * from d_table",
"[[2436,2528,null,\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
)
.await;
validate_data(
"test_pipeline_name_template_db",
&client,
"select * from d_table_db",
"[[2436,2528,\"db\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
)
.await;
validate_data(
"test_pipeline_name_template_http",
&client,
"select * from d_table_http",
"[[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000],[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000],[2436,2528,\"http\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",\"INTERACT.MANAGER\",1716668197217000000]]",
)
.await;
guard.remove_all().await;
}
pub async fn test_identify_pipeline_with_flatten(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =