diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 6ef48205cc..2b431d832e 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -19,7 +19,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; -use pipeline::table::PipelineVersion; +use pipeline::table::{PipelineInfo, PipelineVersion}; use pipeline::{GreptimeTransformer, Pipeline}; use servers::error::{ AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, @@ -65,7 +65,7 @@ impl LogHandler for Instance { content_type: &str, pipeline: &str, query_ctx: QueryContextRef, - ) -> ServerResult<()> { + ) -> ServerResult { self.pipeline_operator .insert_pipeline(name, content_type, pipeline, query_ctx) .await diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 390a48d834..364bc13745 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt}; use table::TableRef; use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result}; -use crate::table::{PipelineTable, PipelineTableRef, PipelineVersion}; +use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion}; use crate::{GreptimeTransformer, Pipeline}; pub const PIPELINE_TABLE_NAME: &str = "pipelines"; @@ -153,7 +153,7 @@ impl PipelineOperator { name: &str, content_type: &str, pipeline: &str, - ) -> Result>> { + ) -> Result { self.get_pipeline_table_from_cache(ctx.current_catalog()) .context(PipelineTableNotFoundSnafu)? .insert_and_compile(ctx.current_schema(), name, content_type, pipeline) @@ -200,12 +200,11 @@ impl PipelineOperator { content_type: &str, pipeline: &str, query_ctx: QueryContextRef, - ) -> Result<()> { + ) -> Result { self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; self.insert_and_compile(query_ctx, name, content_type, pipeline) .await - .map(|_| ()) } } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index d037ae3d48..365b3f4634 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -57,6 +57,11 @@ pub type PipelineVersion = Option; pub type PipelineTableRef = Arc; +pub type PipelineRef = Arc>; + +/// Pipeline info. A tuple of timestamp and pipeline reference. +pub type PipelineInfo = (Timestamp, PipelineRef); + pub const PIPELINE_TABLE_NAME: &str = "pipelines"; pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name"; @@ -311,7 +316,7 @@ impl PipelineTable { name: &str, content_type: &str, pipeline: &str, - ) -> Result>> { + ) -> Result { let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?); // we will use the version in the future let version = self @@ -329,7 +334,7 @@ impl PipelineTable { ); } - Ok(compiled_pipeline) + Ok((version, compiled_pipeline)) } async fn find_pipeline_by_name( diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index f9939b8057..536337233c 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -23,15 +23,16 @@ use axum::http::{Request, StatusCode}; use axum::response::{IntoResponse, Response}; use axum::{async_trait, BoxError, Extension, TypedHeader}; use common_telemetry::{error, warn}; -use common_time::Timestamp; +use common_time::{Timestamp, Timezone}; use datatypes::timestamp::TimestampNanosecond; +use http::{HeaderMap, HeaderValue}; use mime_guess::mime; use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu}; use pipeline::table::PipelineVersion; use pipeline::Value as PipelineValue; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use serde_json::{Deserializer, Value}; +use serde_json::{json, Deserializer, Value}; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; @@ -103,7 +104,7 @@ pub async fn add_pipeline( Path(pipeline_name): Path, Extension(query_ctx): Extension, PipelineContent(payload): PipelineContent, -) -> Result { +) -> Result { if pipeline_name.is_empty() { return Err(InvalidParameterSnafu { reason: "pipeline_name is required in path", @@ -123,10 +124,30 @@ pub async fn add_pipeline( .insert_pipeline(&pipeline_name, content_type, &payload, query_ctx) .await; - result.map(|_| "ok".to_string()).map_err(|e| { - error!(e; "failed to insert pipeline"); - e - }) + result + .map(|pipeline| { + let json_header = + HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(); + let mut headers = HeaderMap::new(); + headers.append(CONTENT_TYPE, json_header); + // Safety check: unwrap is safe here because we have checked the format of the timestamp + let version = pipeline + .0 + .as_formatted_string( + "%Y-%m-%d %H:%M:%S%.fZ", + // Safety check: unwrap is safe here because we have checked the format of the timezone + Some(Timezone::from_tz_string("UTC").unwrap()).as_ref(), + ) + .unwrap(); + ( + headers, + json!({"version": version, "name": pipeline_name}).to_string(), + ) + }) + .map_err(|e| { + error!(e; "failed to insert pipeline"); + e + }) } /// Transform NDJSON array into a single array diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index f0c1170e07..cdd628b4bb 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -35,7 +35,7 @@ use common_query::Output; use headers::HeaderValue; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use pipeline::table::PipelineVersion; +use pipeline::table::{PipelineInfo, PipelineVersion}; use pipeline::{GreptimeTransformer, Pipeline}; use serde_json::Value; use session::context::QueryContextRef; @@ -143,7 +143,7 @@ pub trait LogHandler { content_type: &str, pipeline: &str, query_ctx: QueryContextRef, - ) -> Result<()>; + ) -> Result; async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>; }