chore: enhance add pipeline http api return data (#4167)

* chore: enhance add pipeline http api return data

* chore: replacing hard-coded header value
Author: localhost
Date: 2024-06-20 10:19:31 +08:00
Committed by: GitHub
Parent: 8abebad458
Commit: 48a0f39b19

5 changed files with 42 additions and 17 deletions


@@ -19,7 +19,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
-use pipeline::table::PipelineVersion;
+use pipeline::table::{PipelineInfo, PipelineVersion};
use pipeline::{GreptimeTransformer, Pipeline};
use servers::error::{
AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
@@ -65,7 +65,7 @@ impl LogHandler for Instance {
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
-) -> ServerResult<()> {
+) -> ServerResult<PipelineInfo> {
self.pipeline_operator
.insert_pipeline(name, content_type, pipeline, query_ctx)
.await


@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
use table::TableRef;
use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
-use crate::table::{PipelineTable, PipelineTableRef, PipelineVersion};
+use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion};
use crate::{GreptimeTransformer, Pipeline};
pub const PIPELINE_TABLE_NAME: &str = "pipelines";
@@ -153,7 +153,7 @@ impl PipelineOperator {
name: &str,
content_type: &str,
pipeline: &str,
-) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
+) -> Result<PipelineInfo> {
self.get_pipeline_table_from_cache(ctx.current_catalog())
.context(PipelineTableNotFoundSnafu)?
.insert_and_compile(ctx.current_schema(), name, content_type, pipeline)
@@ -200,12 +200,11 @@ impl PipelineOperator {
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
-) -> Result<()> {
+) -> Result<PipelineInfo> {
self.create_pipeline_table_if_not_exists(query_ctx.clone())
.await?;
self.insert_and_compile(query_ctx, name, content_type, pipeline)
.await
-.map(|_| ())
}
}
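
Note: the deleted `.map(|_| ())` is what previously discarded the compiled pipeline, so with the new `Result<PipelineInfo>` signature the tuple from `insert_and_compile` now flows straight through to the caller. A minimal before/after sketch of that pattern, using stand-in types rather than the real crate (assumes tokio for the async runtime):

```rust
use std::sync::Arc;

// Stand-ins for the pipeline crate's types (not the real definitions).
type PipelineRef = Arc<String>;
type PipelineInfo = (i64, PipelineRef); // (version timestamp, compiled pipeline)

async fn insert_and_compile() -> Result<PipelineInfo, String> {
    Ok((1_718_841_571, Arc::new("compiled".to_string())))
}

// Before this change: the tuple was flattened away to satisfy Result<()>.
async fn insert_pipeline_old() -> Result<(), String> {
    insert_and_compile().await.map(|_| ())
}

// After: the PipelineInfo reaches the caller unchanged.
async fn insert_pipeline_new() -> Result<PipelineInfo, String> {
    insert_and_compile().await
}

#[tokio::main]
async fn main() -> Result<(), String> {
    insert_pipeline_old().await?;
    let (version, _pipeline) = insert_pipeline_new().await?;
    println!("stored pipeline version: {version}");
    Ok(())
}
```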


@@ -57,6 +57,11 @@ pub type PipelineVersion = Option<TimestampNanosecond>;
pub type PipelineTableRef = Arc<PipelineTable>;
pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
+/// Pipeline info. A tuple of timestamp and pipeline reference.
+pub type PipelineInfo = (Timestamp, PipelineRef);
pub const PIPELINE_TABLE_NAME: &str = "pipelines";
pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
@@ -311,7 +316,7 @@ impl PipelineTable {
name: &str,
content_type: &str,
pipeline: &str,
-) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
+) -> Result<PipelineInfo> {
let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
// we will use the version in the future
let version = self
@@ -329,7 +334,7 @@ impl PipelineTable {
);
}
-Ok(compiled_pipeline)
+Ok((version, compiled_pipeline))
}
async fn find_pipeline_by_name(

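Note: the new alias pairs the insertion timestamp with the compiled pipeline so `insert_and_compile` can hand both back in one value. A self-contained sketch of that shape with stand-in types (the real `Timestamp` is common_time's nanosecond type, not a bare i64):

```rust
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-ins mirroring the aliases in the diff.
struct Pipeline; // compiled pipeline
type PipelineRef = Arc<Pipeline>;
type Timestamp = i64; // nanosecond precision in the real crate
type PipelineInfo = (Timestamp, PipelineRef);

fn compile_pipeline(_source: &str) -> Result<Pipeline, String> {
    Ok(Pipeline)
}

fn insert_and_compile(source: &str) -> Result<PipelineInfo, String> {
    let compiled_pipeline = Arc::new(compile_pipeline(source)?);
    // The real code takes the version from the row inserted into the
    // pipelines table; a wall-clock timestamp stands in here.
    let version = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| e.to_string())?
        .as_nanos() as i64;
    Ok((version, compiled_pipeline))
}

fn main() {
    let (version, _pipeline) = insert_and_compile("processors: []").unwrap();
    println!("version = {version}");
}
```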

@@ -23,15 +23,16 @@ use axum::http::{Request, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::{async_trait, BoxError, Extension, TypedHeader};
use common_telemetry::{error, warn};
-use common_time::Timestamp;
+use common_time::{Timestamp, Timezone};
use datatypes::timestamp::TimestampNanosecond;
+use http::{HeaderMap, HeaderValue};
use mime_guess::mime;
use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
use pipeline::table::PipelineVersion;
use pipeline::Value as PipelineValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
-use serde_json::{Deserializer, Value};
+use serde_json::{json, Deserializer, Value};
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
@@ -103,7 +104,7 @@ pub async fn add_pipeline(
Path(pipeline_name): Path<String>,
Extension(query_ctx): Extension<QueryContextRef>,
PipelineContent(payload): PipelineContent,
-) -> Result<String> {
+) -> Result<impl IntoResponse> {
if pipeline_name.is_empty() {
return Err(InvalidParameterSnafu {
reason: "pipeline_name is required in path",
@@ -123,10 +124,30 @@ pub async fn add_pipeline(
.insert_pipeline(&pipeline_name, content_type, &payload, query_ctx)
.await;
-    result.map(|_| "ok".to_string()).map_err(|e| {
-        error!(e; "failed to insert pipeline");
-        e
-    })
+    result
+        .map(|pipeline| {
+            let json_header =
+                HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap();
+            let mut headers = HeaderMap::new();
+            headers.append(CONTENT_TYPE, json_header);
+            // Safety check: unwrap is safe here because we have checked the format of the timestamp
+            let version = pipeline
+                .0
+                .as_formatted_string(
+                    "%Y-%m-%d %H:%M:%S%.fZ",
+                    // Safety check: unwrap is safe here because we have checked the format of the timezone
+                    Some(Timezone::from_tz_string("UTC").unwrap()).as_ref(),
+                )
+                .unwrap();
+            (
+                headers,
+                json!({"version": version, "name": pipeline_name}).to_string(),
+            )
+        })
+        .map_err(|e| {
+            error!(e; "failed to insert pipeline");
+            e
+        })
}
/// Transform NDJSON array into a single array

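Note: the handler now replies with an explicit `application/json` Content-Type and a JSON body instead of the bare string "ok". A small sketch of the body construction as the diff's `json!` call produces it (the example name and version values are made up):

```rust
use serde_json::json;

fn main() {
    // Example values; the real handler formats the insertion timestamp
    // as "%Y-%m-%d %H:%M:%S%.fZ" in UTC.
    let version = "2024-06-20 02:19:31.000000000Z";
    let pipeline_name = "my_pipeline";

    // Mirrors the json! call in the diff; with serde_json's default
    // BTreeMap-backed maps, keys serialize in alphabetical order.
    let body = json!({ "version": version, "name": pipeline_name }).to_string();
    assert_eq!(
        body,
        r#"{"name":"my_pipeline","version":"2024-06-20 02:19:31.000000000Z"}"#
    );
    println!("{body}");
}
```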

@@ -35,7 +35,7 @@ use common_query::Output;
use headers::HeaderValue;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::table::PipelineVersion;
+use pipeline::table::{PipelineInfo, PipelineVersion};
use pipeline::{GreptimeTransformer, Pipeline};
use serde_json::Value;
use session::context::QueryContextRef;
@@ -143,7 +143,7 @@ pub trait LogHandler {
content_type: &str,
pipeline: &str,
query_ctx: QueryContextRef,
-) -> Result<()>;
+) -> Result<PipelineInfo>;
async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>;
}
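
Note: taken together, every `LogHandler` implementation must now surface the stored pipeline's version to its caller. A compile-checkable sketch of the updated contract with stand-in types (assumes the async-trait crate; the real signatures also take a QueryContextRef):

```rust
use std::sync::Arc;

// Stand-ins for the real pipeline/session types.
struct Pipeline;
type PipelineInfo = (i64, Arc<Pipeline>);
type Result<T> = std::result::Result<T, String>;

#[async_trait::async_trait]
pub trait LogHandler {
    // Previously Result<()>; callers can now read back the version.
    async fn insert_pipeline(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
    ) -> Result<PipelineInfo>;

    async fn delete_pipeline(&self, name: &str) -> Result<()>;
}

struct MockHandler;

#[async_trait::async_trait]
impl LogHandler for MockHandler {
    async fn insert_pipeline(
        &self,
        _name: &str,
        _content_type: &str,
        _pipeline: &str,
    ) -> Result<PipelineInfo> {
        Ok((0, Arc::new(Pipeline)))
    }

    async fn delete_pipeline(&self, _name: &str) -> Result<()> {
        Ok(())
    }
}

fn main() {
    // async_trait keeps the trait object-safe, as the server handlers require.
    let _handler: Box<dyn LogHandler> = Box::new(MockHandler);
}
```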