feat: pipeline dispatch part 1, add definition (#5359)

* feat: add dispatcher definition

* feat: add dispatcher element in pipelien definition

* fmt: correct format

* test: add negative tests

* fmt: fix format

* refactor: replace consts

* feat: add tostring for dispatcher

* refactor: remove to_string which is actually debug

* Update src/pipeline/src/dispatcher.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

---------

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ning Sun
2025-01-18 19:47:17 +08:00
committed by GitHub
parent c78a492863
commit d73815ba84
6 changed files with 241 additions and 73 deletions

View File

@@ -0,0 +1,107 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use snafu::OptionExt;
use yaml_rust::Yaml;
use crate::etl::error::{Error, Result};
use crate::etl_error::{
FieldRequiredForDispatcherSnafu, TablePartRequiredForDispatcherRuleSnafu,
ValueRequiredForDispatcherRuleSnafu,
};
use crate::Value;
const FIELD: &str = "field";
const TABLE_PARTIAL: &str = "table_part";
const PIPELINE: &str = "pipeline";
const VALUE: &str = "value";
const RULES: &str = "rules";
/// The dispatcher configuration.
///
/// Dispatcher in a pipeline allows user to call another pipeline and specify
/// table name based on field matching.
///
/// ```yaml
/// dispatcher:
/// field: type
/// rules:
/// - value: http
/// pipeline: http_pipeline
/// table_part: http_log
/// - value: db
/// pipeline: db_pipeline
/// table_part: db_log
/// ```
///
/// If none of the rules match the value, this pipeline will continue to process
/// current log entry
#[derive(Debug, PartialEq)]
pub(crate) struct Dispatcher {
pub field: String,
pub rules: Vec<Rule>,
}
/// The rule definition for dispatcher
///
/// - `value`: for pattern matching
/// - `pipeline`: the pipeline to call, if it's unspecified, we use default
/// `greptime_identity`
/// - `table_part`: the table name segment that we use to construct full table
/// name
#[derive(Debug, PartialEq)]
pub(crate) struct Rule {
pub value: Value,
pub table_part: String,
pub pipeline: Option<String>,
}
impl TryFrom<&Yaml> for Dispatcher {
type Error = Error;
fn try_from(value: &Yaml) -> Result<Self> {
let field = value[FIELD]
.as_str()
.map(|s| s.to_string())
.context(FieldRequiredForDispatcherSnafu)?;
let rules = if let Some(rules) = value[RULES].as_vec() {
rules
.iter()
.map(|rule| {
let table_part = rule[TABLE_PARTIAL]
.as_str()
.map(|s| s.to_string())
.context(TablePartRequiredForDispatcherRuleSnafu)?;
let pipeline = rule[PIPELINE].as_str().map(|s| s.to_string());
if rule[VALUE].is_badvalue() {
ValueRequiredForDispatcherRuleSnafu.fail()?;
}
let value = Value::try_from(&rule[VALUE])?;
Ok(Rule {
value,
table_part,
pipeline,
})
})
.collect::<Result<Vec<Rule>>>()?
} else {
vec![]
};
Ok(Dispatcher { field, rules })
}
}

View File

@@ -30,12 +30,14 @@ use transform::{TransformBuilders, Transformer, Transforms};
use value::Value;
use yaml_rust::YamlLoader;
use crate::dispatcher::Dispatcher;
use crate::etl::error::Result;
const DESCRIPTION: &str = "description";
const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
const DISPATCHER: &str = "dispatcher";
pub enum Content<'a> {
Json(&'a str),
@@ -151,10 +153,17 @@ where
let transformer = T::new(transformers)?;
let dispatcher = if !doc[DISPATCHER].is_badvalue() {
Some(Dispatcher::try_from(&doc[DISPATCHER])?)
} else {
None
};
Ok(Pipeline {
description,
processors,
transformer,
dispatcher,
required_keys,
output_keys,
intermediate_keys: final_intermediate_keys,
@@ -171,6 +180,7 @@ where
{
description: Option<String>,
processors: processor::Processors,
dispatcher: Option<Dispatcher>,
transformer: T,
/// required keys for the preprocessing from map data from user
/// include all processor required and transformer required keys
@@ -182,22 +192,6 @@ where
// pub on_failure: processor::Processors,
}
impl<T> std::fmt::Display for Pipeline<T>
where
T: Transformer,
{
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(description) = &self.description {
writeln!(f, "description: {description}")?;
}
let processors = self.processors.iter().map(|p| p.kind()).join(",");
writeln!(f, "processors: {processors}")?;
writeln!(f, "transformer: {}", self.transformer)
}
}
impl<T> Pipeline<T>
where
T: Transformer,
@@ -349,14 +343,12 @@ pub enum PipelineWay {
#[cfg(test)]
mod tests {
use api::v1::Rows;
use greptime_proto::v1::value::ValueData;
use greptime_proto::v1::{self, ColumnDataType, SemanticType};
use super::*;
use crate::etl::transform::GreptimeTransformer;
use crate::etl::{parse, Content, Pipeline};
use crate::Value;
#[test]
fn test_pipeline_prepare() {
@@ -578,4 +570,119 @@ transform:
value_data
);
}
#[test]
fn test_dispatcher() {
let pipeline_yaml = r#"
---
description: Pipeline for Apache Tomcat
processors:
dispatcher:
field: typename
rules:
- value: http
table_part: http_events
- value: database
table_part: db_events
pipeline: database_pipeline
transform:
- field: typename
type: string
"#;
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let dispatcher = pipeline.dispatcher.expect("expect dispatcher");
assert_eq!(dispatcher.field, "typename");
assert_eq!(dispatcher.rules.len(), 2);
assert_eq!(
dispatcher.rules[0],
crate::dispatcher::Rule {
value: Value::String("http".to_string()),
table_part: "http_events".to_string(),
pipeline: None
}
);
assert_eq!(
dispatcher.rules[1],
crate::dispatcher::Rule {
value: Value::String("database".to_string()),
table_part: "db_events".to_string(),
pipeline: Some("database_pipeline".to_string()),
}
);
let bad_yaml1 = r#"
---
description: Pipeline for Apache Tomcat
processors:
dispatcher:
_field: typename
rules:
- value: http
table_part: http_events
- value: database
table_part: db_events
pipeline: database_pipeline
transform:
- field: typename
type: string
"#;
let bad_yaml2 = r#"
---
description: Pipeline for Apache Tomcat
processors:
dispatcher:
field: typename
rules:
- value: http
_table_part: http_events
- value: database
_table_part: db_events
pipeline: database_pipeline
transform:
- field: typename
type: string
"#;
let bad_yaml3 = r#"
---
description: Pipeline for Apache Tomcat
processors:
dispatcher:
field: typename
rules:
- _value: http
table_part: http_events
- _value: database
table_part: db_events
pipeline: database_pipeline
transform:
- field: typename
type: string
"#;
let r: Result<Pipeline<GreptimeTransformer>> = parse(&Content::Yaml(bad_yaml1));
assert!(r.is_err());
let r: Result<Pipeline<GreptimeTransformer>> = parse(&Content::Yaml(bad_yaml2));
assert!(r.is_err());
let r: Result<Pipeline<GreptimeTransformer>> = parse(&Content::Yaml(bad_yaml3));
assert!(r.is_err());
}
}

View File

@@ -588,6 +588,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Field is required for dispatcher"))]
FieldRequiredForDispatcher,
#[snafu(display("table_part is required for dispatcher rule"))]
TablePartRequiredForDispatcherRule,
#[snafu(display("value is required for dispatcher rule"))]
ValueRequiredForDispatcherRule,
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -15,7 +15,6 @@
pub mod index;
pub mod transformer;
use itertools::Itertools;
use snafu::OptionExt;
use crate::etl::error::{Error, Result};
@@ -40,7 +39,7 @@ use super::error::{
use super::field::{Fields, InputFieldInfo, OneInputOneOutputField};
use super::processor::{yaml_new_field, yaml_new_fields};
pub trait Transformer: std::fmt::Display + Sized + Send + Sync + 'static {
pub trait Transformer: std::fmt::Debug + Sized + Send + Sync + 'static {
type Output;
type VecOutput;
@@ -74,14 +73,6 @@ impl std::str::FromStr for OnFailure {
}
}
impl std::fmt::Display for OnFailure {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
OnFailure::Ignore => write!(f, "ignore"),
OnFailure::Default => write!(f, "default"),
}
}
}
#[derive(Debug, Default, Clone)]
pub struct TransformBuilders {
pub(crate) builders: Vec<TransformBuilder>,
@@ -118,18 +109,6 @@ impl Transforms {
}
}
impl std::fmt::Display for Transforms {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let transforms = self
.transforms
.iter()
.map(|field| field.to_string())
.join(", ");
write!(f, "{}", transforms)
}
}
impl std::ops::Deref for Transforms {
type Target = Vec<Transform>;
@@ -230,32 +209,6 @@ pub struct Transform {
pub on_failure: Option<OnFailure>,
}
impl std::fmt::Display for Transform {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let index = if let Some(index) = &self.index {
format!(", index: {}", index)
} else {
"".to_string()
};
let type_ = format!("type: {}", self.type_);
let fields = format!("field(s): {:?}", self.real_fields);
let default = if let Some(default) = &self.default {
format!(", default: {}", default)
} else {
"".to_string()
};
let on_failure = if let Some(on_failure) = &self.on_failure {
format!(", on_failure: {}", on_failure)
} else {
"".to_string()
};
write!(f, "{type_}{index}, {fields}{default}{on_failure}",)
}
}
impl Default for Transform {
fn default() -> Self {
Transform {

View File

@@ -92,12 +92,6 @@ impl GreptimeTransformer {
}
}
impl std::fmt::Display for GreptimeTransformer {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
writeln!(f, "GreptimeTransformer.\nColumns: {}", self.transforms)
}
}
impl Transformer for GreptimeTransformer {
type Output = Rows;
type VecOutput = Row;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod dispatcher;
mod etl;
mod manager;
mod metrics;