chore: pipeline dryrun api can currently receives pipeline raw content (#5142)

* chore: pipeline dryrun api can currently receives pipeline raw content

* chore: remove dryrun v1 and add test

* chore: change dryrun pipeline api body schema

* chore: remove useless struct PipelineInfo

* chore: update PipelineDryrunParams doc

* chore: increase code readability

* chore: add some comment for pipeline dryrun test

* Apply suggestions from code review

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>

* chore: format code

---------

Co-authored-by: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
This commit is contained in:
localhost
2024-12-12 19:47:21 +08:00
committed by GitHub
parent fee75a1fad
commit e8e9526738
11 changed files with 311 additions and 147 deletions

View File

@@ -19,6 +19,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
@@ -97,6 +98,10 @@ impl PipelineHandler for Instance {
.table(catalog, &schema, table, None)
.await
}
fn build_pipeline(&self, pipeline: &str) -> ServerResult<Pipeline<GreptimeTransformer>> {
PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu)
}
}
impl Instance {

View File

@@ -223,7 +223,7 @@ transform:
type: uint32
"#;
parse(&Content::Yaml(pipeline_yaml.into())).unwrap()
parse(&Content::Yaml(pipeline_yaml)).unwrap()
}
fn criterion_benchmark(c: &mut Criterion) {

View File

@@ -37,9 +37,9 @@ const PROCESSORS: &str = "processors";
const TRANSFORM: &str = "transform";
const TRANSFORMS: &str = "transforms";
pub enum Content {
Json(String),
Yaml(String),
pub enum Content<'a> {
Json(&'a str),
Yaml(&'a str),
}
pub fn parse<T>(input: &Content) -> Result<Pipeline<T>>
@@ -379,8 +379,7 @@ transform:
- field: field2
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -432,8 +431,7 @@ transform:
- field: ts
type: timestamp, ns
index: time"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_str.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_str)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline
.prepare(serde_json::Value::String(message), &mut payload)
@@ -509,8 +507,7 @@ transform:
type: uint32
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let mut payload = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut payload).unwrap();
assert_eq!(&["my_field"].to_vec(), pipeline.required_keys());
@@ -554,8 +551,7 @@ transform:
index: time
"#;
let pipeline: Pipeline<GreptimeTransformer> =
parse(&Content::Yaml(pipeline_yaml.into())).unwrap();
let pipeline: Pipeline<GreptimeTransformer> = parse(&Content::Yaml(pipeline_yaml)).unwrap();
let schema = pipeline.schemas().clone();
let mut result = pipeline.init_intermediate_state();
pipeline.prepare(input_value, &mut result).unwrap();

View File

@@ -243,4 +243,9 @@ impl PipelineOperator {
})
.await
}
/// Compile a pipeline.
pub fn build_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
PipelineTable::compile_pipeline(pipeline)
}
}

View File

@@ -203,7 +203,7 @@ impl PipelineTable {
/// Compile a pipeline from a string.
pub fn compile_pipeline(pipeline: &str) -> Result<Pipeline<GreptimeTransformer>> {
let yaml_content = Content::Yaml(pipeline.into());
let yaml_content = Content::Yaml(pipeline);
parse::<GreptimeTransformer>(&yaml_content).context(CompilePipelineSnafu)
}

View File

@@ -19,7 +19,7 @@ use pipeline::{parse, Content, GreptimeTransformer, Pipeline};
pub fn parse_and_exec(input_str: &str, pipeline_yaml: &str) -> Rows {
let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();

View File

@@ -270,7 +270,7 @@ transform:
let input_value = serde_json::from_str::<serde_json::Value>(input_str).unwrap();
let yaml_content = pipeline::Content::Yaml(pipeline_yaml.into());
let yaml_content = pipeline::Content::Yaml(pipeline_yaml);
let pipeline: pipeline::Pipeline<pipeline::GreptimeTransformer> =
pipeline::parse(&yaml_content).expect("failed to parse pipeline");
let mut result = pipeline.init_intermediate_state();

View File

@@ -417,7 +417,7 @@ transform:
.map(|(_, d)| GreptimeValue { value_data: d })
.collect::<Vec<GreptimeValue>>();
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> =
parse(&yaml_content).expect("failed to parse pipeline");
let mut stats = pipeline.init_intermediate_state();
@@ -487,7 +487,7 @@ transform:
type: json
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
let mut status = pipeline.init_intermediate_state();
@@ -592,7 +592,7 @@ transform:
type: json
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
let mut status = pipeline.init_intermediate_state();
@@ -655,7 +655,7 @@ transform:
index: timestamp
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
let mut status = pipeline.init_intermediate_state();
@@ -691,7 +691,7 @@ transform:
- message
type: string
"#;
let yaml_content = Content::Yaml(pipeline_yaml.into());
let yaml_content = Content::Yaml(pipeline_yaml);
let pipeline: Pipeline<GreptimeTransformer> = parse(&yaml_content).unwrap();
let mut status = pipeline.init_intermediate_state();

View File

@@ -38,7 +38,7 @@ use lazy_static::lazy_static;
use loki_api::prost_types::Timestamp;
use pipeline::error::PipelineTransformSnafu;
use pipeline::util::to_pipeline_version;
use pipeline::PipelineVersion;
use pipeline::{GreptimeTransformer, PipelineVersion};
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_json::{Deserializer, Map, Value};
@@ -276,39 +276,11 @@ fn transform_ndjson_array_factory(
})
}
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
/// Dryrun pipeline with given data
fn dryrun_pipeline_inner(
value: Vec<Value>,
pipeline: &pipeline::Pipeline<GreptimeTransformer>,
) -> Result<Response> {
let handler = log_state.log_handler;
let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
let value = extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
ensure!(
value.len() <= 10,
InvalidParameterSnafu {
reason: "too many rows for dryrun",
}
);
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
let mut intermediate_state = pipeline.init_intermediate_state();
let mut results = Vec::with_capacity(value.len());
@@ -387,6 +359,110 @@ pub async fn pipeline_dryrun(
Ok(Json(result).into_response())
}
/// Dryrun pipeline with given data
/// pipeline_name and pipeline_version to specify pipeline stored in db
/// pipeline to specify pipeline raw content
/// data to specify data
/// data maght be list of string or list of object
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct PipelineDryrunParams {
pub pipeline_name: Option<String>,
pub pipeline_version: Option<String>,
pub pipeline: Option<String>,
pub data: Vec<Value>,
}
/// Check if the payload is valid json
/// Check if the payload contains pipeline or pipeline_name and data
/// Return Some if valid, None if invalid
fn check_pipeline_dryrun_params_valid(payload: &str) -> Option<PipelineDryrunParams> {
match serde_json::from_str::<PipelineDryrunParams>(payload) {
// payload with pipeline or pipeline_name and data is array
Ok(params) if params.pipeline.is_some() || params.pipeline_name.is_some() => Some(params),
// because of the pipeline_name or pipeline is required
Ok(_) => None,
// invalid json
Err(_) => None,
}
}
/// Check if the pipeline_name exists
fn check_pipeline_name_exists(pipeline_name: Option<String>) -> Result<String> {
pipeline_name.context(InvalidParameterSnafu {
reason: "pipeline_name is required",
})
}
/// Check if the data length less than 10
fn check_data_valid(data_len: usize) -> Result<()> {
ensure!(
data_len <= 10,
InvalidParameterSnafu {
reason: "data is required",
}
);
Ok(())
}
#[axum_macros::debug_handler]
pub async fn pipeline_dryrun(
State(log_state): State<LogState>,
Query(query_params): Query<LogIngesterQueryParams>,
Extension(mut query_ctx): Extension<QueryContext>,
TypedHeader(content_type): TypedHeader<ContentType>,
payload: String,
) -> Result<Response> {
let handler = log_state.log_handler;
match check_pipeline_dryrun_params_valid(&payload) {
Some(params) => {
let data = params.data;
check_data_valid(data.len())?;
match params.pipeline {
None => {
let version =
to_pipeline_version(params.pipeline_version).context(PipelineSnafu)?;
let pipeline_name = check_pipeline_name_exists(params.pipeline_name)?;
let pipeline = handler
.get_pipeline(&pipeline_name, version, Arc::new(query_ctx))
.await?;
dryrun_pipeline_inner(data, &pipeline)
}
Some(pipeline) => {
let pipeline = handler.build_pipeline(&pipeline)?;
dryrun_pipeline_inner(data, &pipeline)
}
}
}
None => {
// This path is for back compatibility with the previous dry run code
// where the payload is just data (JSON or plain text) and the pipeline name
// is specified using query param.
let pipeline_name = check_pipeline_name_exists(query_params.pipeline_name)?;
let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
let ignore_errors = query_params.ignore_errors.unwrap_or(false);
let value =
extract_pipeline_value_by_content_type(content_type, payload, ignore_errors)?;
check_data_valid(value.len())?;
query_ctx.set_channel(Channel::Http);
let query_ctx = Arc::new(query_ctx);
let pipeline = handler
.get_pipeline(&pipeline_name, version, query_ctx.clone())
.await?;
dryrun_pipeline_inner(value, &pipeline)
}
}
}
#[axum_macros::debug_handler]
pub async fn loki_ingest(
State(log_state): State<LogState>,

View File

@@ -170,4 +170,7 @@ pub trait PipelineHandler {
table: &str,
query_ctx: &QueryContext,
) -> std::result::Result<Option<Arc<table::Table>>, catalog::error::Error>;
//// Build a pipeline from a string.
fn build_pipeline(&self, pipeline: &str) -> Result<Pipeline<GreptimeTransformer>>;
}

View File

@@ -1319,7 +1319,7 @@ pub async fn test_test_pipeline_api(store_type: StorageType) {
// handshake
let client = TestClient::new(app);
let body = r#"
let pipeline_content = r#"
processors:
- date:
field: time
@@ -1346,7 +1346,7 @@ transform:
let res = client
.post("/v1/events/pipelines/test")
.header("Content-Type", "application/x-yaml")
.body(body)
.body(pipeline_content)
.send()
.await;
@@ -1367,8 +1367,87 @@ transform:
let pipeline = pipelines.first().unwrap();
assert_eq!(pipeline.get("name").unwrap(), "test");
// 2. write data
let data_body = r#"
let dryrun_schema = json!([
{
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id1"
},
{
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id2"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "type"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "log"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "logger"
},
{
"colume_type": "TIMESTAMP",
"data_type": "TIMESTAMP_NANOSECOND",
"fulltext": false,
"name": "time"
}
]);
let dryrun_rows = json!([
[
{
"data_type": "INT32",
"key": "id1",
"semantic_type": "FIELD",
"value": 2436
},
{
"data_type": "INT32",
"key": "id2",
"semantic_type": "FIELD",
"value": 2528
},
{
"data_type": "STRING",
"key": "type",
"semantic_type": "FIELD",
"value": "I"
},
{
"data_type": "STRING",
"key": "log",
"semantic_type": "FIELD",
"value": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"data_type": "STRING",
"key": "logger",
"semantic_type": "FIELD",
"value": "INTERACT.MANAGER"
},
{
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:37.217+0000"
}
]
]);
{
// test original api
let data_body = r#"
[
{
"id1": "2436",
@@ -1380,100 +1459,100 @@ transform:
}
]
"#;
let res = client
.post("/v1/events/pipelines/dryrun?pipeline_name=test")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body["schema"];
let rows = &body["rows"];
assert_eq!(
schema,
&json!([
let res = client
.post("/v1/events/pipelines/dryrun?pipeline_name=test")
.header("Content-Type", "application/json")
.body(data_body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body["schema"];
let rows = &body["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
}
{
// test new api specify pipeline via pipeline_name
let body = r#"
{
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id1"
},
{
"colume_type": "FIELD",
"data_type": "INT32",
"fulltext": false,
"name": "id2"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "type"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "log"
},
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "logger"
},
{
"colume_type": "TIMESTAMP",
"data_type": "TIMESTAMP_NANOSECOND",
"fulltext": false,
"name": "time"
}
])
);
assert_eq!(
rows,
&json!([
[
"pipeline_name": "test",
"data": [
{
"data_type": "INT32",
"key": "id1",
"semantic_type": "FIELD",
"value": 2436
},
{
"data_type": "INT32",
"key": "id2",
"semantic_type": "FIELD",
"value": 2528
},
{
"data_type": "STRING",
"key": "type",
"semantic_type": "FIELD",
"value": "I"
},
{
"data_type": "STRING",
"key": "log",
"semantic_type": "FIELD",
"value": "ClusterAdapter:enter sendTextDataToCluster\\n"
},
{
"data_type": "STRING",
"key": "logger",
"semantic_type": "FIELD",
"value": "INTERACT.MANAGER"
},
{
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:37.217+0000"
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
}
]
])
);
}
"#;
let res = client
.post("/v1/events/pipelines/dryrun")
.header("Content-Type", "application/json")
.body(body)
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body["schema"];
let rows = &body["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
}
{
// test new api specify pipeline via pipeline raw data
let mut body = json!({
"data": [
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
}
]
});
body["pipeline"] = json!(pipeline_content);
let res = client
.post("/v1/events/pipelines/dryrun")
.header("Content-Type", "application/json")
.body(body.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
let body: Value = res.json().await;
let schema = &body["schema"];
let rows = &body["rows"];
assert_eq!(schema, &dryrun_schema);
assert_eq!(rows, &dryrun_rows);
}
{
// failback to old version api
// not pipeline and pipeline_name in the body
let body = json!({
"data": [
{
"id1": "2436",
"id2": "2528",
"logger": "INTERACT.MANAGER",
"type": "I",
"time": "2024-05-25 20:16:37.217",
"log": "ClusterAdapter:enter sendTextDataToCluster\\n"
}
]
});
let res = client
.post("/v1/events/pipelines/dryrun")
.header("Content-Type", "application/json")
.body(body.to_string())
.send()
.await;
assert_eq!(res.status(), StatusCode::BAD_REQUEST);
}
guard.remove_all().await;
}