mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 15:40:02 +00:00
chore: add more info for pipeline dryrun API (#5232)
This commit is contained in:
@@ -159,6 +159,7 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Pipeline management api error"))]
|
||||
Pipeline {
|
||||
#[snafu(source)]
|
||||
source: pipeline::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
|
||||
@@ -30,6 +30,7 @@ use axum::http::{Request, StatusCode};
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::{async_trait, BoxError, Extension, Json, TypedHeader};
|
||||
use bytes::Bytes;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_query::prelude::GREPTIME_TIMESTAMP;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_telemetry::{error, warn};
|
||||
@@ -41,13 +42,13 @@ use pipeline::util::to_pipeline_version;
|
||||
use pipeline::{GreptimeTransformer, PipelineVersion};
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Deserializer, Map, Value};
|
||||
use serde_json::{json, Deserializer, Map, Value};
|
||||
use session::context::{Channel, QueryContext, QueryContextRef};
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::error::{
|
||||
CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu, ParseJson5Snafu,
|
||||
ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
|
||||
status_code_to_http_status, CatalogSnafu, DecodeOtlpRequestSnafu, Error, InvalidParameterSnafu,
|
||||
ParseJson5Snafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
|
||||
};
|
||||
use crate::http::extractor::LogTableName;
|
||||
use crate::http::header::CONTENT_TYPE_PROTOBUF_STR;
|
||||
@@ -404,6 +405,14 @@ fn check_data_valid(data_len: usize) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_step_info_for_pipeline_dryrun_error(step_msg: &str, e: Error) -> Response {
|
||||
let body = Json(json!({
|
||||
"error": format!("{}: {}", step_msg,e.output_msg()),
|
||||
}));
|
||||
|
||||
(status_code_to_http_status(&e.status_code()), body).into_response()
|
||||
}
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
pub async fn pipeline_dryrun(
|
||||
State(log_state): State<LogState>,
|
||||
@@ -431,8 +440,20 @@ pub async fn pipeline_dryrun(
|
||||
dryrun_pipeline_inner(data, &pipeline)
|
||||
}
|
||||
Some(pipeline) => {
|
||||
let pipeline = handler.build_pipeline(&pipeline)?;
|
||||
dryrun_pipeline_inner(data, &pipeline)
|
||||
let pipeline = handler.build_pipeline(&pipeline);
|
||||
match pipeline {
|
||||
Ok(pipeline) => match dryrun_pipeline_inner(data, &pipeline) {
|
||||
Ok(response) => Ok(response),
|
||||
Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
|
||||
"Failed to exec pipeline",
|
||||
e,
|
||||
)),
|
||||
},
|
||||
Err(e) => Ok(add_step_info_for_pipeline_dryrun_error(
|
||||
"Failed to build pipeline",
|
||||
e,
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user