diff --git a/Cargo.lock b/Cargo.lock
index b67a23db9a..8181207532 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7720,6 +7720,7 @@ dependencies = [
  "common-time",
  "crossbeam-utils",
  "csv",
+ "dashmap",
  "datafusion",
  "datafusion-common",
  "datafusion-expr",
@@ -11575,6 +11576,7 @@ dependencies = [
  "tokio-stream",
  "tonic 0.11.0",
  "tower",
+ "url",
  "uuid",
  "zstd 0.13.1",
 ]
diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs
index 2b431d832e..7edda5ccf1 100644
--- a/src/frontend/src/instance/log_handler.rs
+++ b/src/frontend/src/instance/log_handler.rs
@@ -19,12 +19,8 @@
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
-use pipeline::table::{PipelineInfo, PipelineVersion};
-use pipeline::{GreptimeTransformer, Pipeline};
-use servers::error::{
-    AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
-    UnsupportedDeletePipelineSnafu,
-};
+use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
+use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult};
 use servers::query_handler::LogHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -72,9 +68,16 @@ impl LogHandler for Instance {
             .context(PipelineSnafu)
     }
 
-    async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
-        // TODO(qtang): impl delete
-        Err(UnsupportedDeletePipelineSnafu {}.build())
+    async fn delete_pipeline(
+        &self,
+        name: &str,
+        version: PipelineVersion,
+        ctx: QueryContextRef,
+    ) -> ServerResult<Option<()>> {
+        self.pipeline_operator
+            .delete_pipeline(name, version, ctx)
+            .await
+            .context(PipelineSnafu)
     }
 }
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index 30828bf67c..a804150761 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -22,6 +22,7 @@
 use common_runtime::Builder as RuntimeBuilder;
 use servers::grpc::builder::GrpcServerBuilder;
 use servers::grpc::greptime_handler::GreptimeRequestHandler;
 use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
+use servers::http::event::LogValidatorRef;
 use servers::http::{HttpServer, HttpServerBuilder};
 use servers::metrics_handler::MetricsHandler;
 use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -89,7 +90,8 @@ where
             Some(self.instance.clone()),
         );
 
-        builder = builder.with_log_ingest_handler(self.instance.clone());
+        builder = builder
+            .with_log_ingest_handler(self.instance.clone(), self.plugins.get::<LogValidatorRef>());
 
         if let Some(user_provider) = self.plugins.get::<UserProviderRef>() {
             builder = builder.with_user_provider(user_provider);
diff --git a/src/pipeline/Cargo.toml b/src/pipeline/Cargo.toml
index 03096b47a7..168471d756 100644
--- a/src/pipeline/Cargo.toml
+++ b/src/pipeline/Cargo.toml
@@ -28,6 +28,7 @@
 common-telemetry.workspace = true
 common-time.workspace = true
 crossbeam-utils.workspace = true
 csv = "1.3.0"
+dashmap.workspace = true
 datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
diff --git a/src/pipeline/src/etl/mod.rs b/src/pipeline/src/etl.rs
similarity index 100%
rename from src/pipeline/src/etl/mod.rs
rename to src/pipeline/src/etl.rs
diff --git a/src/pipeline/src/etl/processor/mod.rs b/src/pipeline/src/etl/processor.rs
similarity index 100%
rename from src/pipeline/src/etl/processor/mod.rs
rename to src/pipeline/src/etl/processor.rs
diff --git a/src/pipeline/src/etl/transform/mod.rs b/src/pipeline/src/etl/transform.rs
similarity index 100%
rename from src/pipeline/src/etl/transform/mod.rs
rename to src/pipeline/src/etl/transform.rs
diff --git a/src/pipeline/src/etl/transform/transformer/mod.rs b/src/pipeline/src/etl/transform/transformer.rs
similarity index 100%
rename from src/pipeline/src/etl/transform/transformer/mod.rs
rename to src/pipeline/src/etl/transform/transformer.rs
diff --git a/src/pipeline/src/etl/transform/transformer/greptime/mod.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs
similarity index 100%
rename from src/pipeline/src/etl/transform/transformer/greptime/mod.rs
rename to src/pipeline/src/etl/transform/transformer/greptime.rs
diff --git a/src/pipeline/src/etl/value/mod.rs b/src/pipeline/src/etl/value.rs
similarity index 100%
rename from src/pipeline/src/etl/value/mod.rs
rename to src/pipeline/src/etl/value.rs
diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs
index 86ed9c7ea7..23c9d2c488 100644
--- a/src/pipeline/src/lib.rs
+++ b/src/pipeline/src/lib.rs
@@ -14,8 +14,12 @@
 mod etl;
 mod manager;
+mod metrics;
 
 pub use etl::transform::GreptimeTransformer;
 pub use etl::value::Value;
 pub use etl::{parse, Content, Pipeline};
-pub use manager::{error, pipeline_operator, table};
+pub use manager::{
+    error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
+    PipelineVersion,
+};
diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs
new file mode 100644
index 0000000000..960197e083
--- /dev/null
+++ b/src/pipeline/src/manager.rs
@@ -0,0 +1,38 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_time::Timestamp;
+use datatypes::timestamp::TimestampNanosecond;
+
+use crate::table::PipelineTable;
+use crate::{GreptimeTransformer, Pipeline};
+
+pub mod error;
+pub mod pipeline_operator;
+pub mod table;
+pub mod util;
+
+/// Pipeline version. An optional timestamp with nanosecond precision.
+/// If the version is None, it means the latest version of the pipeline.
+/// Users can specify the version by providing a timestamp string formatted as ISO 8601.
+/// When used in the cache key, it is converted to an i64 representing the number of nanoseconds since the epoch.
+pub type PipelineVersion = Option<TimestampNanosecond>;
+
+/// Pipeline info. A tuple of timestamp and pipeline reference.
+pub type PipelineInfo = (Timestamp, PipelineRef);
+
+pub type PipelineTableRef = Arc<PipelineTable>;
+pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
diff --git a/src/pipeline/src/manager/error.rs b/src/pipeline/src/manager/error.rs
index ad5d8a96be..07332590f1 100644
--- a/src/pipeline/src/manager/error.rs
+++ b/src/pipeline/src/manager/error.rs
@@ -101,6 +101,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Invalid pipeline version format: {}", version))]
+    InvalidPipelineVersion {
+        version: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
@@ -113,9 +120,10 @@ impl ErrorExt for Error {
             PipelineTableNotFound { .. } => StatusCode::TableNotFound,
             InsertPipeline { source, .. } => source.status_code(),
             CollectRecords { source, .. } => source.status_code(),
-            PipelineNotFound { .. } | CompilePipeline { .. } | PipelineTransform { .. } => {
-                StatusCode::InvalidArguments
-            }
+            PipelineNotFound { .. }
+            | CompilePipeline { .. }
+            | PipelineTransform { .. }
+            | InvalidPipelineVersion { .. } => StatusCode::InvalidArguments,
             BuildDfLogicalPlan { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
             Catalog { source, .. } => source.status_code(),
diff --git a/src/pipeline/src/manager/mod.rs b/src/pipeline/src/manager/mod.rs
deleted file mode 100644
index 95ffb5822e..0000000000
--- a/src/pipeline/src/manager/mod.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub mod error;
-pub mod pipeline_operator;
-pub mod table;
diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs
index 5ae81a97a2..049cd80b45 100644
--- a/src/pipeline/src/manager/pipeline_operator.rs
+++ b/src/pipeline/src/manager/pipeline_operator.rs
@@ -14,11 +14,13 @@
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
+use std::time::Instant;
 
 use api::v1::CreateTableExpr;
 use catalog::{CatalogManagerRef, RegisterSystemTableRequest};
 use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
 use common_telemetry::info;
+use futures::FutureExt;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
 use query::QueryEngineRef;
@@ -27,11 +29,14 @@
 use snafu::{OptionExt, ResultExt};
 use table::TableRef;
 
 use crate::error::{CatalogSnafu, CreateTableSnafu, PipelineTableNotFoundSnafu, Result};
-use crate::table::{PipelineInfo, PipelineTable, PipelineTableRef, PipelineVersion};
+use crate::manager::{PipelineInfo, PipelineTableRef, PipelineVersion};
+use crate::metrics::{
+    METRIC_PIPELINE_CREATE_HISTOGRAM, METRIC_PIPELINE_DELETE_HISTOGRAM,
+    METRIC_PIPELINE_RETRIEVE_HISTOGRAM,
+};
+use crate::table::{PipelineTable, PIPELINE_TABLE_NAME};
 use crate::{GreptimeTransformer, Pipeline};
 
-pub const PIPELINE_TABLE_NAME: &str = "pipelines";
-
 /// PipelineOperator is responsible for managing pipelines.
 /// It provides the ability to:
 /// - Create a pipeline table if it does not exist
@@ -50,7 +55,7 @@ pub struct PipelineOperator {
 
 impl PipelineOperator {
     /// Create a table request for the pipeline table.
-    pub fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
+    fn create_table_request(&self, catalog: &str) -> RegisterSystemTableRequest {
         let (time_index, primary_keys, column_defs) = PipelineTable::build_pipeline_schema();
 
         let create_table_expr = CreateTableExpr {
@@ -146,20 +151,6 @@ impl PipelineOperator {
     pub fn get_pipeline_table_from_cache(&self, catalog: &str) -> Option<PipelineTableRef> {
         self.tables.read().unwrap().get(catalog).cloned()
     }
-
-    async fn insert_and_compile(
-        &self,
-        ctx: QueryContextRef,
-        name: &str,
-        content_type: &str,
-        pipeline: &str,
-    ) -> Result<PipelineInfo> {
-        let schema = ctx.current_schema();
-        self.get_pipeline_table_from_cache(ctx.current_catalog())
-            .context(PipelineTableNotFoundSnafu)?
-            .insert_and_compile(&schema, name, content_type, pipeline)
-            .await
-    }
 }
 
 impl PipelineOperator {
@@ -189,9 +180,16 @@ impl PipelineOperator {
         let schema = query_ctx.current_schema();
         self.create_pipeline_table_if_not_exists(query_ctx.clone())
             .await?;
+
+        let timer = Instant::now();
         self.get_pipeline_table_from_cache(query_ctx.current_catalog())
             .context(PipelineTableNotFoundSnafu)?
             .get_pipeline(&schema, name, version)
+            .inspect(|re| {
+                METRIC_PIPELINE_RETRIEVE_HISTOGRAM
+                    .with_label_values(&[&re.is_ok().to_string()])
+                    .observe(timer.elapsed().as_secs_f64())
+            })
             .await
     }
 
@@ -206,7 +204,38 @@ impl PipelineOperator {
         self.create_pipeline_table_if_not_exists(query_ctx.clone())
             .await?;
 
-        self.insert_and_compile(query_ctx, name, content_type, pipeline)
+        let timer = Instant::now();
+        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
+            .context(PipelineTableNotFoundSnafu)?
+            .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
+            .inspect(|re| {
+                METRIC_PIPELINE_CREATE_HISTOGRAM
+                    .with_label_values(&[&re.is_ok().to_string()])
+                    .observe(timer.elapsed().as_secs_f64())
+            })
+            .await
+    }
+
+    /// Delete a pipeline by name from the pipeline table.
+    pub async fn delete_pipeline(
+        &self,
+        name: &str,
+        version: PipelineVersion,
+        query_ctx: QueryContextRef,
+    ) -> Result<Option<()>> {
+        // trigger loading the pipeline table
+        self.create_pipeline_table_if_not_exists(query_ctx.clone())
+            .await?;
+
+        let timer = Instant::now();
+        self.get_pipeline_table_from_cache(query_ctx.current_catalog())
+            .context(PipelineTableNotFoundSnafu)?
+            .delete_pipeline(&query_ctx.current_schema(), name, version)
+            .inspect(|re| {
+                METRIC_PIPELINE_DELETE_HISTOGRAM
+                    .with_label_values(&[&re.is_ok().to_string()])
+                    .observe(timer.elapsed().as_secs_f64())
+            })
             .await
     }
 }
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 365b3f4634..d3197123cc 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -25,9 +25,9 @@
 use common_recordbatch::util as record_util;
 use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
 use datafusion::datasource::DefaultTableSource;
-use datafusion::logical_expr::{and, col, lit};
-use datafusion_common::TableReference;
-use datafusion_expr::LogicalPlanBuilder;
+use datafusion::logical_expr::col;
+use datafusion_common::{TableReference, ToDFSchema};
+use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, LogicalPlanBuilder};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
@@ -44,36 +44,25 @@
 use table::TableRef;
 
 use crate::error::{
     BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, CompilePipelineSnafu,
-    ExecuteInternalStatementSnafu, InsertPipelineSnafu, PipelineNotFoundSnafu, Result,
+    ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
+    PipelineNotFoundSnafu, Result,
 };
 use crate::etl::transform::GreptimeTransformer;
 use crate::etl::{parse, Content, Pipeline};
+use crate::manager::{PipelineInfo, PipelineVersion};
+use crate::util::{build_plan_filter, generate_pipeline_cache_key};
 
-/// Pipeline version. An optional timestamp with nanosecond precision.
-/// If the version is None, it means the latest version of the pipeline.
-/// User can specify the version by providing a timestamp string formatted as iso8601.
-/// When it used in cache key, it will be converted to i64 meaning the number of nanoseconds since the epoch.
-pub type PipelineVersion = Option<TimestampNanosecond>;
-
-pub type PipelineTableRef = Arc<PipelineTable>;
-
-pub type PipelineRef = Arc<Pipeline<GreptimeTransformer>>;
-
-/// Pipeline info. A tuple of timestamp and pipeline reference.
-pub type PipelineInfo = (Timestamp, PipelineRef);
-
-pub const PIPELINE_TABLE_NAME: &str = "pipelines";
-
-pub const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
-pub const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
-pub const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
-pub const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
-pub const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
+pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
+pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
+pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
+const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
+const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
+pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
+
 /// Pipeline table cache size.
-pub const PIPELINES_CACHE_SIZE: u64 = 10000;
+const PIPELINES_CACHE_SIZE: u64 = 10000;
 /// Pipeline table cache time to live.
-pub const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
+const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
 
 /// PipelineTable is a table that stores the pipeline schema and content.
 /// Every catalog has its own pipeline table.
@@ -216,23 +205,6 @@ impl PipelineTable {
             .map_err(|e| CompilePipelineSnafu { reason: e }.build())
     }
 
-    fn generate_pipeline_cache_key(schema: &str, name: &str, version: PipelineVersion) -> String {
-        match version {
-            Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)),
-            None => format!("{}/{}/latest", schema, name),
-        }
-    }
-
-    fn get_compiled_pipeline_from_cache(
-        &self,
-        schema: &str,
-        name: &str,
-        version: PipelineVersion,
-    ) -> Option<Arc<Pipeline<GreptimeTransformer>>> {
-        self.pipelines
-            .get(&Self::generate_pipeline_cache_key(schema, name, version))
-    }
-
     /// Insert a pipeline into the pipeline table.
     async fn insert_pipeline_to_pipeline_table(
         &self,
@@ -276,9 +248,8 @@
             .context(InsertPipelineSnafu)?;
 
         info!(
-            "Inserted pipeline: {} into {} table: {}, output: {:?}.",
+            "Insert pipeline success, name: {:?}, table: {:?}, output: {:?}",
             name,
-            PIPELINE_TABLE_NAME,
             table_info.full_table_name(),
             output
         );
@@ -294,15 +265,21 @@
         name: &str,
         version: PipelineVersion,
     ) -> Result<Arc<Pipeline<GreptimeTransformer>>> {
-        if let Some(pipeline) = self.get_compiled_pipeline_from_cache(schema, name, version) {
+        if let Some(pipeline) = self
+            .pipelines
+            .get(&generate_pipeline_cache_key(schema, name, version))
+        {
             return Ok(pipeline);
         }
 
-        let pipeline = self.find_pipeline_by_name(schema, name, version).await?;
+        let pipeline = self
+            .find_pipeline(schema, name, version)
+            .await?
+            .context(PipelineNotFoundSnafu { name, version })?;
         let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
 
         self.pipelines.insert(
-            Self::generate_pipeline_cache_key(schema, name, version),
+            generate_pipeline_cache_key(schema, name, version),
             compiled_pipeline.clone(),
         );
         Ok(compiled_pipeline)
@@ -325,11 +302,11 @@
         {
             self.pipelines.insert(
-                Self::generate_pipeline_cache_key(schema, name, None),
+                generate_pipeline_cache_key(schema, name, None),
                 compiled_pipeline.clone(),
             );
             self.pipelines.insert(
-                Self::generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
+                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
                 compiled_pipeline.clone(),
             );
         }
@@ -337,12 +314,91 @@
         Ok((version, compiled_pipeline))
     }
 
-    async fn find_pipeline_by_name(
+    pub async fn delete_pipeline(
         &self,
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<(String, TimestampNanosecond)> {
+    ) -> Result<Option<()>> {
+        // 0. the version is ensured to be Some at the HTTP API level
+        ensure!(
+            version.is_some(),
+            InvalidPipelineVersionSnafu { version: "None" }
+        );
+
+        // 1. check that the pipeline exists in the catalog
+        let pipeline = self.find_pipeline(schema, name, version).await?;
+        if pipeline.is_none() {
+            return Ok(None);
+        }
+
+        // 2. do delete
+        let table_info = self.table.table_info();
+        let table_name = TableReference::full(
+            table_info.catalog_name.clone(),
+            table_info.schema_name.clone(),
+            table_info.name.clone(),
+        );
+
+        let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
+        let table_source = Arc::new(DefaultTableSource::new(table_provider));
+
+        let df_schema = Arc::new(
+            table_info
+                .meta
+                .schema
+                .arrow_schema()
+                .clone()
+                .to_dfschema()
+                .context(BuildDfLogicalPlanSnafu)?,
+        );
+
+        // create scan plan
+        let logical_plan = LogicalPlanBuilder::scan(table_name.clone(), table_source, None)
+            .context(BuildDfLogicalPlanSnafu)?
+            .filter(build_plan_filter(schema, name, version))
+            .context(BuildDfLogicalPlanSnafu)?
+            .build()
+            .context(BuildDfLogicalPlanSnafu)?;
+
+        // create dml stmt
+        let stmt = DmlStatement::new(
+            table_name,
+            df_schema,
+            datafusion_expr::WriteOp::Delete,
+            Arc::new(logical_plan),
+        );
+
+        let plan = LogicalPlan::DfPlan(DfLogicalPlan::Dml(stmt));
+
+        let output = self
+            .query_engine
+            .execute(plan, Self::query_ctx(&table_info))
+            .await
+            .context(ExecuteInternalStatementSnafu)?;
+
+        info!(
+            "Delete pipeline success, name: {:?}, version: {:?}, table: {:?}, output: {:?}",
+            name,
+            version,
+            table_info.full_table_name(),
+            output
+        );
+
+        // remove cache entries for both the exact version and "latest"
+        self.pipelines
+            .remove(&generate_pipeline_cache_key(schema, name, version));
+        self.pipelines
+            .remove(&generate_pipeline_cache_key(schema, name, None));
+
+        Ok(Some(()))
+    }
+
+    async fn find_pipeline(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+    ) -> Result<Option<(String, TimestampNanosecond)>> {
         let table_info = self.table.table_info();
 
         let table_name = TableReference::full(
@@ -353,22 +409,10 @@
         let table_provider = Arc::new(DfTableProviderAdapter::new(self.table.clone()));
         let table_source = Arc::new(DefaultTableSource::new(table_provider));
 
-        let schema_and_name_filter = and(
-            col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
-            col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
-        );
-        let filter = if let Some(v) = version {
-            and(
-                schema_and_name_filter,
-                col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())),
-            )
-        } else {
-            schema_and_name_filter
-        };
 
         let plan = LogicalPlanBuilder::scan(table_name, table_source, None)
             .context(BuildDfLogicalPlanSnafu)?
-            .filter(filter)
+            .filter(build_plan_filter(schema, name, version))
             .context(BuildDfLogicalPlanSnafu)?
             .project(vec![
                 col(PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME),
@@ -401,8 +445,11 @@
             .await
             .context(CollectRecordsSnafu)?;
 
-        ensure!(!records.is_empty(), PipelineNotFoundSnafu { name, version });
+        if records.is_empty() {
+            return Ok(None);
+        }
 
+        // limit 1
         ensure!(
             records.len() == 1 && records[0].num_columns() == 2,
             PipelineNotFoundSnafu { name, version }
@@ -441,9 +488,9 @@
         );
 
         // Safety: asserted above
-        Ok((
+        Ok(Some((
             pipeline_content.get_data(0).unwrap().to_string(),
             pipeline_created_at.get_data(0).unwrap(),
-        ))
+        )))
     }
 }
diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs
new file mode 100644
index 0000000000..6133c64215
--- /dev/null
+++ b/src/pipeline/src/manager/util.rs
@@ -0,0 +1,98 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_time::Timestamp;
+use datafusion_expr::{and, col, lit, Expr};
+use datatypes::timestamp::TimestampNanosecond;
+
+use crate::error::{InvalidPipelineVersionSnafu, Result};
+use crate::table::{
+    PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME,
+    PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
+};
+use crate::PipelineVersion;
+
+pub fn to_pipeline_version(version_str: Option<String>) -> Result<PipelineVersion> {
+    match version_str {
+        Some(version) => {
+            let ts = Timestamp::from_str_utc(&version)
+                .map_err(|_| InvalidPipelineVersionSnafu { version }.build())?;
+            Ok(Some(TimestampNanosecond(ts)))
+        }
+        None => Ok(None),
+    }
+}
+
+pub(crate) fn build_plan_filter(schema: &str, name: &str, version: PipelineVersion) -> Expr {
+    let schema_and_name_filter = and(
+        col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)),
+        col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)),
+    );
+    if let Some(v) = version {
+        and(
+            schema_and_name_filter,
+            col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).eq(lit(v.0.to_iso8601_string())),
+        )
+    } else {
+        schema_and_name_filter
+    }
+}
+
+pub(crate) fn generate_pipeline_cache_key(
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+) -> String {
+    match version {
+        Some(version) => format!("{}/{}/{}", schema, name, i64::from(version)),
+        None => format!("{}/{}/latest", schema, name),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_to_pipeline_version() {
+        let none_result = to_pipeline_version(None);
+        assert!(none_result.is_ok());
+        assert!(none_result.unwrap().is_none());
+
+        let some_result = to_pipeline_version(Some("2023-01-01 00:00:00Z".to_string()));
+        assert!(some_result.is_ok());
+        assert_eq!(
+            some_result.unwrap(),
+            Some(TimestampNanosecond::new(1672531200000000000))
+        );
+
+        let invalid = to_pipeline_version(Some("invalid".to_string()));
+        assert!(invalid.is_err());
+    }
+
+    #[test]
+    fn test_generate_pipeline_cache_key() {
+        let schema = "test_schema";
+        let name = "test_name";
+        let latest = generate_pipeline_cache_key(schema, name, None);
+        assert_eq!(latest, "test_schema/test_name/latest");
+
+        let versioned = generate_pipeline_cache_key(
+            schema,
+            name,
+            Some(TimestampNanosecond::new(1672531200000000000)),
+        );
+        assert_eq!(versioned, "test_schema/test_name/1672531200000000000");
+    }
+}
diff --git a/src/pipeline/src/metrics.rs b/src/pipeline/src/metrics.rs
new file mode 100644
index 0000000000..280f5619d4
--- /dev/null
+++ b/src/pipeline/src/metrics.rs
@@ -0,0 +1,37 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use lazy_static::lazy_static;
+use prometheus::{register_histogram_vec, HistogramVec};
+
+lazy_static! {
+    pub static ref METRIC_PIPELINE_CREATE_HISTOGRAM: HistogramVec = register_histogram_vec!(
+        "greptime_pipeline_create_duration_seconds",
+        "Histogram of the pipeline creation duration",
+        &["success"]
+    )
+    .unwrap();
+    pub static ref METRIC_PIPELINE_DELETE_HISTOGRAM: HistogramVec = register_histogram_vec!(
+        "greptime_pipeline_delete_duration_seconds",
+        "Histogram of the pipeline deletion duration",
+        &["success"]
+    )
+    .unwrap();
+    pub static ref METRIC_PIPELINE_RETRIEVE_HISTOGRAM: HistogramVec = register_histogram_vec!(
+        "greptime_pipeline_retrieve_duration_seconds",
+        "Histogram of the pipeline retrieval duration",
+        &["success"]
+    )
+    .unwrap();
+}
diff --git a/src/pipeline/tests/gsub.rs b/src/pipeline/tests/gsub.rs
index 5d25bf188b..f1209a6f88 100644
--- a/src/pipeline/tests/gsub.rs
+++ b/src/pipeline/tests/gsub.rs
@@ -29,7 +29,7 @@ fn test_gsub() {
     let pipeline_yaml = r#"
 ---
-description: Pipeline for Akamai DataStream2 Log
+description: Pipeline for Demo Log
 
 processors:
   - gsub:
diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs
index ff9cad1bde..08f2ad3811 100644
--- a/src/pipeline/tests/pipeline.rs
+++ b/src/pipeline/tests/pipeline.rs
@@ -81,7 +81,7 @@ fn test_complex_data() {
     let pipeline_yaml = r#"
 ---
-description: Pipeline for Akamai DataStream2 Log
+description: Pipeline for Demo Log
 
 processors:
   - urlencoding:
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 04b6fa196c..80c9344415 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -156,12 +156,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Unsupported delete pipeline."))]
-    UnsupportedDeletePipeline {
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Failed to execute script by name: {}", name))]
     ExecuteScript {
         name: String,
@@ -635,7 +629,6 @@ impl ErrorExt for Error {
             | FileWatch { .. } => StatusCode::Internal,
 
             UnsupportedDataType { .. } => StatusCode::Unsupported,
-            UnsupportedDeletePipeline { .. } => StatusCode::Unsupported,
 
             #[cfg(not(windows))]
             UpdateJemallocMetrics { .. } => StatusCode::Internal,
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 26e0349de2..6204908c03 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -36,6 +36,7 @@
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
 use datatypes::data_type::DataType;
 use datatypes::schema::SchemaRef;
+use event::{LogState, LogValidatorRef};
 use futures::FutureExt;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
@@ -91,6 +92,7 @@
 pub mod csv_result;
 #[cfg(feature = "dashboard")]
 mod dashboard;
 pub mod error_result;
+pub mod greptime_manage_resp;
 pub mod greptime_result_v1;
 pub mod influxdb_result_v1;
 pub mod table_result;
@@ -589,11 +591,15 @@ impl HttpServerBuilder {
         }
     }
 
-    pub fn with_log_ingest_handler(self, handler: LogHandlerRef) -> Self {
+    pub fn with_log_ingest_handler(
+        self,
+        handler: LogHandlerRef,
+        validator: Option<LogValidatorRef>,
+    ) -> Self {
         Self {
             router: self.router.nest(
                 &format!("/{HTTP_API_VERSION}/events"),
-                HttpServer::route_log(handler),
+                HttpServer::route_log(handler, validator),
             ),
             ..self
         }
@@ -721,19 +727,29 @@ impl HttpServer {
         .with_state(metrics_handler)
     }
 
-    fn route_log(log_handler: LogHandlerRef) -> Router {
+    fn route_log(
+        log_handler: LogHandlerRef,
+        log_validator: Option<LogValidatorRef>,
+    ) -> Router {
         Router::new()
             .route("/logs", routing::post(event::log_ingester))
             .route(
                 "/pipelines/:pipeline_name",
                 routing::post(event::add_pipeline),
             )
+            .route(
+                "/pipelines/:pipeline_name",
+                routing::delete(event::delete_pipeline),
+            )
             .layer(
                 ServiceBuilder::new()
                     .layer(HandleErrorLayer::new(handle_error))
                     .layer(RequestDecompressionLayer::new()),
             )
-            .with_state(log_handler)
+            .with_state(LogState {
+                log_handler,
+                log_validator,
+            })
     }
 
     fn route_sql(api_state: ApiState) -> ApiRouter {
diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs
index 536337233c..ea436009b0 100644
--- a/src/servers/src/http/event.rs
+++ b/src/servers/src/http/event.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 use std::result::Result as StdResult;
+use std::sync::Arc;
+use std::time::Instant;
 
 use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
 use axum::body::HttpBody;
@@ -23,22 +25,20 @@
 use axum::http::{Request, StatusCode};
 use axum::response::{IntoResponse, Response};
 use axum::{async_trait, BoxError, Extension, TypedHeader};
 use common_telemetry::{error, warn};
-use common_time::{Timestamp, Timezone};
-use datatypes::timestamp::TimestampNanosecond;
-use http::{HeaderMap, HeaderValue};
 use mime_guess::mime;
 use pipeline::error::{CastTypeSnafu, PipelineTransformSnafu};
-use pipeline::table::PipelineVersion;
-use pipeline::Value as PipelineValue;
+use pipeline::util::to_pipeline_version;
+use pipeline::{PipelineVersion, Value as PipelineValue};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use serde_json::{json, Deserializer, Value};
+use serde_json::{Deserializer, Value};
 use session::context::QueryContextRef;
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 
 use crate::error::{
     InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu,
 };
+use crate::http::greptime_manage_resp::GreptimedbManageResponse;
 use crate::http::greptime_result_v1::GreptimedbV1Response;
 use crate::http::HttpResponse;
 use crate::query_handler::LogHandlerRef;
@@ -51,6 +51,7 @@ pub struct LogIngesterQueryParams {
     pub ignore_errors: Option<bool>,
     pub version: Option<String>,
+    pub source: Option<String>,
 }
 
 pub struct PipelineContent(String);
@@ -100,11 +101,13 @@
 
 #[axum_macros::debug_handler]
 pub async fn add_pipeline(
-    State(handler): State<LogHandlerRef>,
+    State(state): State<LogState>,
     Path(pipeline_name): Path<String>,
     Extension(query_ctx): Extension<QueryContextRef>,
     PipelineContent(payload): PipelineContent,
-) -> Result<impl IntoResponse> {
+) -> Result<GreptimedbManageResponse> {
+    let start = Instant::now();
+    let handler = state.log_handler;
     if pipeline_name.is_empty() {
         return Err(InvalidParameterSnafu {
             reason: "pipeline_name is required in path",
@@ -126,22 +129,10 @@
 
     result
         .map(|pipeline| {
-            let json_header =
-                HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap();
-            let mut headers = HeaderMap::new();
-            headers.append(CONTENT_TYPE, json_header);
-            // Safety check: unwrap is safe here because we have checked the format of the timestamp
-            let version = pipeline
-                .0
-                .as_formatted_string(
-                    "%Y-%m-%d %H:%M:%S%.fZ",
-                    // Safety check: unwrap is safe here because we have checked the format of the timezone
-                    Some(Timezone::from_tz_string("UTC").unwrap()).as_ref(),
-                )
-                .unwrap();
-            (
-                headers,
-                json!({"version": version, "name": pipeline_name}).to_string(),
+            GreptimedbManageResponse::from_pipeline(
+                pipeline_name,
+                pipeline.0.to_timezone_aware_string(None),
+                start.elapsed().as_millis() as u64,
             )
         })
         .map_err(|e| {
@@ -150,6 +141,48 @@
         })
 }
 
+#[axum_macros::debug_handler]
+pub async fn delete_pipeline(
+    State(state): State<LogState>,
+    Extension(query_ctx): Extension<QueryContextRef>,
+    Query(query_params): Query<LogIngesterQueryParams>,
+    Path(pipeline_name): Path<String>,
+) -> Result<GreptimedbManageResponse> {
+    let start = Instant::now();
+    let handler = state.log_handler;
+    ensure!(
+        !pipeline_name.is_empty(),
+        InvalidParameterSnafu {
+            reason: "pipeline_name is required",
+        }
+    );
+
+    let version_str = query_params.version.context(InvalidParameterSnafu {
+        reason: "version is required",
+    })?;
+
+    let version = to_pipeline_version(Some(version_str.clone())).context(PipelineSnafu)?;
+
+    handler
+        .delete_pipeline(&pipeline_name, version, query_ctx)
+        .await
+        .map(|v| {
+            if v.is_some() {
+                GreptimedbManageResponse::from_pipeline(
+                    pipeline_name,
+                    version_str,
+                    start.elapsed().as_millis() as u64,
+                )
+            } else {
+                GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64)
+            }
+        })
+        .map_err(|e| {
+            error!(e; "failed to delete pipeline");
+            e
+        })
+}
+
 /// Transform NDJSON array into a single array
 fn transform_ndjson_array_factory(
     values: impl IntoIterator<Item = StdResult<Value, serde_json::Error>>,
@@ -192,12 +225,20 @@
 
 #[axum_macros::debug_handler]
 pub async fn log_ingester(
-    State(handler): State<LogHandlerRef>,
+    State(log_state): State<LogState>,
     Query(query_params): Query<LogIngesterQueryParams>,
     Extension(query_ctx): Extension<QueryContextRef>,
     TypedHeader(content_type): TypedHeader<ContentType>,
     payload: String,
 ) -> Result<HttpResponse> {
+    if let Some(log_validator) = log_state.log_validator {
+        if let Some(response) = log_validator.validate(query_params.source.clone(), &payload) {
+            return response;
+        }
+    }
+
+    let handler = log_state.log_handler;
+
     let pipeline_name = query_params.pipeline_name.context(InvalidParameterSnafu {
         reason: "pipeline_name is required",
     })?;
@@ -205,18 +246,7 @@
         reason: "table is required",
     })?;
 
-    let version = match query_params.version {
-        Some(version) => {
-            let ts = Timestamp::from_str_utc(&version).map_err(|e| {
-                InvalidParameterSnafu {
-                    reason: format!("invalid pipeline version: {} with error: {}", &version, e),
-                }
-                .build()
-            })?;
-            Some(TimestampNanosecond(ts))
-        }
-        None => None,
-    };
+    let version = to_pipeline_version(query_params.version).context(PipelineSnafu)?;
 
     let ignore_errors = query_params.ignore_errors.unwrap_or(false);
 
@@ -276,3 +306,18 @@
         .with_execution_time(start.elapsed().as_millis() as u64);
     Ok(response)
 }
+
+pub trait LogValidator {
+    /// Validate the payload (optionally by source) before processing.
+    /// Return a `Some` result to indicate validation failure.
+    fn validate(&self, source: Option<String>, payload: &str) -> Option<Result<HttpResponse>>;
+}
+
+pub type LogValidatorRef = Arc<dyn LogValidator + Send + Sync>;
+
+/// Axum state struct holding the log handler and the optional validator.
+#[derive(Clone)]
+pub struct LogState {
+    pub log_handler: LogHandlerRef,
+    pub log_validator: Option<LogValidatorRef>,
+}
diff --git a/src/servers/src/http/greptime_manage_resp.rs b/src/servers/src/http/greptime_manage_resp.rs
new file mode 100644
index 0000000000..d2f61715b5
--- /dev/null
+++ b/src/servers/src/http/greptime_manage_resp.rs
@@ -0,0 +1,136 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use axum::response::IntoResponse;
+use axum::Json;
+use http::header::CONTENT_TYPE;
+use http::HeaderValue;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
+
+/// GreptimeDB manage API response struct.
+/// Currently `Pipelines` and `Scripts` are the control-panel APIs.
+#[derive(Serialize, Deserialize, Debug, JsonSchema)]
+pub struct GreptimedbManageResponse {
+    #[serde(flatten)]
+    pub(crate) manage_result: ManageResult,
+    pub(crate) execution_time_ms: u64,
+}
+
+impl GreptimedbManageResponse {
+    pub fn from_pipeline(name: String, version: String, execution_time_ms: u64) -> Self {
+        GreptimedbManageResponse {
+            manage_result: ManageResult::Pipelines {
+                pipelines: vec![PipelineOutput { name, version }],
+            },
+            execution_time_ms,
+        }
+    }
+
+    pub fn from_pipelines(pipelines: Vec<PipelineOutput>, execution_time_ms: u64) -> Self {
+        GreptimedbManageResponse {
+            manage_result: ManageResult::Pipelines { pipelines },
+            execution_time_ms,
+        }
+    }
+
+    pub fn with_execution_time(mut self, execution_time: u64) -> Self {
+        self.execution_time_ms = execution_time;
+        self
+    }
+
+    pub fn execution_time_ms(&self) -> u64 {
+        self.execution_time_ms
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, JsonSchema)]
+#[serde(untagged)]
+pub enum ManageResult {
+    Pipelines { pipelines: Vec<PipelineOutput> },
+    // todo(shuiyisong): refactor scripts api
+    Scripts(),
+}
+
+#[derive(Serialize, Deserialize, Debug, JsonSchema)]
+pub struct PipelineOutput {
+    name: String,
+    version: String,
+}
+
+impl IntoResponse for GreptimedbManageResponse {
+    fn into_response(self) -> axum::response::Response {
+        let execution_time = self.execution_time_ms;
+
+        let mut resp = Json(self).into_response();
+
+        // We deliberately don't add this format into [`crate::http::ResponseFormat`]
+        // because this is a format for manage api other than the data query api
+        resp.headers_mut().insert(
+            &GREPTIME_DB_HEADER_FORMAT,
+            HeaderValue::from_static("greptimedb_manage"),
+        );
+        resp.headers_mut().insert(
+            &GREPTIME_DB_HEADER_EXECUTION_TIME,
+            HeaderValue::from(execution_time),
+        );
+        resp.headers_mut().insert(
+            CONTENT_TYPE,
+            HeaderValue::from_str(mime_guess::mime::APPLICATION_JSON.as_ref()).unwrap(),
+        );
+
+        resp
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use arrow::datatypes::ToByteSlice;
+    use http_body::Body;
+    use hyper::body::to_bytes;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_into_response() {
+        let resp = GreptimedbManageResponse {
+            manage_result: ManageResult::Pipelines {
+                pipelines: vec![PipelineOutput {
+                    name: "test_name".to_string(),
+                    version: "test_version".to_string(),
+                }],
+            },
+            execution_time_ms: 42,
+        };
+
+        let mut re = resp.into_response();
+        let data = re.data();
+
+        let data_str = format!("{:?}", data);
+        assert_eq!(
+            data_str,
+            r#"Data(Response { status: 200, version: HTTP/1.1, headers: {"content-type": "application/json", "x-greptime-format": "greptimedb_manage", "x-greptime-execution-time": "42"}, body: UnsyncBoxBody })"#
+        );
+
+        let body_bytes = to_bytes(re.into_body()).await.unwrap();
+        let body_str = String::from_utf8_lossy(body_bytes.to_byte_slice());
+        assert_eq!(
+            body_str,
+            r#"{"pipelines":[{"name":"test_name","version":"test_version"}],"execution_time_ms":42}"#
+        );
+    }
+}
diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs
index cdd628b4bb..1fe64e6522 100644
--- a/src/servers/src/query_handler.rs
+++ b/src/servers/src/query_handler.rs
@@ -35,8 +35,7 @@
 use common_query::Output;
 use headers::HeaderValue;
 use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
-use pipeline::table::{PipelineInfo, PipelineVersion};
-use pipeline::{GreptimeTransformer, Pipeline};
+use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion};
 use serde_json::Value;
 use session::context::QueryContextRef;
@@ -145,5 +144,10 @@ pub trait LogHandler {
         query_ctx: QueryContextRef,
     ) -> Result<PipelineInfo>;
 
-    async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> Result<()>;
+    async fn delete_pipeline(
+        &self,
+        name: &str,
+        version: PipelineVersion,
+        query_ctx: QueryContextRef,
+    ) -> Result<Option<()>>;
 }
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 887f04a3b2..905b03bdd1 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -90,3 +90,4 @@
 script.workspace = true
 session = { workspace = true, features = ["testing"] }
 store-api.workspace = true
 tokio-postgres = "0.7"
+url = "2.3"
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 04054b524d..76f523071b 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -426,6 +426,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
         ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()),
         Some(instance.instance.clone()),
     )
+    .with_log_ingest_handler(instance.instance.clone(), None)
     .with_greptime_config_options(instance.opts.to_toml().unwrap());
 
     if let Some(user_provider) = user_provider {
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index c9c8468078..57ad46dfe0 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -19,7 +19,7 @@
 use auth::user_provider_from_option;
 use axum::http::{HeaderName, StatusCode};
 use common_error::status_code::StatusCode as ErrorCode;
 use prost::Message;
-use serde_json::json;
+use serde_json::{json, Value};
 use servers::http::error_result::ErrorResponse;
 use servers::http::greptime_result_v1::GreptimedbV1Response;
 use servers::http::handler::HealthResponse;
@@ -76,6 +76,8 @@ macro_rules! http_tests {
             test_dashboard_path,
             test_prometheus_remote_write,
             test_vm_proto_remote_write,
+
+            test_pipeline_api,
         );
     )*
     };
@@ -1000,3 +1002,119 @@ pub async fn test_vm_proto_remote_write(store_type: StorageType) {
 
     guard.remove_all().await;
 }
+
+pub async fn test_pipeline_api(store_type: StorageType) {
+    common_telemetry::init_default_ut_logging();
+    let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_pipeline_api").await;
+
+    // handshake
+    let client = TestClient::new(app);
+
+    let body = r#"
+processors:
+  - date:
+      field: time
+      formats:
+        - "%Y-%m-%d %H:%M:%S%.3f"
+      ignore_missing: true
+
+transform:
+  - fields:
+      - id1
+      - id2
+    type: int32
+  - fields:
+      - type
+      - log
+      - logger
+    type: string
+  - field: time
+    type: time
+    index: timestamp
+"#;
+
+    // 1. create pipeline
+    let res = client
+        .post("/v1/events/pipelines/test")
+        .header("Content-Type", "application/x-yaml")
+        .body(body)
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let content = res.text().await;
+
+    let content = serde_json::from_str(&content);
+    assert!(content.is_ok());
+    // {"execution_time_ms":13,"pipelines":[{"name":"test","version":"2024-07-04 08:31:00.987136"}]}
+    let content: Value = content.unwrap();
+
+    let execution_time = content.get("execution_time_ms");
+    assert!(execution_time.unwrap().is_number());
+    let pipelines = content.get("pipelines");
+    let pipelines = pipelines.unwrap().as_array().unwrap();
+    assert_eq!(pipelines.len(), 1);
+    let pipeline = pipelines.first().unwrap();
+    assert_eq!(pipeline.get("name").unwrap(), "test");
+
+    let version_str = pipeline
+        .get("version")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .to_string();
+
+    // 2. write data
+    let data_body = r#"
+[
+  {
+    "id1": "2436",
+    "id2": "2528",
+    "logger": "INTERACT.MANAGER",
+    "type": "I",
+    "time": "2024-05-25 20:16:37.217",
+    "log": "ClusterAdapter:enter sendTextDataToCluster\\n"
+  }
+]
+"#;
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    assert_eq!(res.status(), StatusCode::OK);
+
+    let encoded: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect();
+
+    // 3. remove pipeline
+    let res = client
+        .delete(format!("/v1/events/pipelines/test?version={}", encoded).as_str())
+        .send()
+        .await;
+
+    assert_eq!(res.status(), StatusCode::OK);
+
+    // {"pipelines":[{"name":"test","version":"2024-07-04 08:55:29.038347"}],"execution_time_ms":22}
+    let content = res.text().await;
+    let content: Value = serde_json::from_str(&content).unwrap();
+    assert!(content.get("execution_time_ms").unwrap().is_number());
+
+    assert_eq!(
+        content.get("pipelines").unwrap().to_string(),
+        format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str()
+    );
+
+    // 4. write data failed
+    let res = client
+        .post("/v1/events/logs?db=public&table=logs1&pipeline_name=test")
+        .header("Content-Type", "application/json")
+        .body(data_body)
+        .send()
+        .await;
+    // todo(shuiyisong): refactor http error handling
+    assert_ne!(res.status(), StatusCode::OK);
+
+    guard.remove_all().await;
+}
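Reviewer note: for anyone wanting to try the new `LogValidator` extension point introduced above, here is a minimal, hypothetical sketch of an implementation. Only the `LogValidator` trait, the `LogValidatorRef` alias, `HttpResponse`, and `InvalidParameterSnafu` come from this diff; the `MaxSizeValidator` type, its size limit, and the helper function name are made up for illustration, and wiring it up assumes the validator is registered in the frontend's `Plugins` so that `self.plugins.get::<LogValidatorRef>()` in `src/frontend/src/server.rs` returns it.

```rust
use std::sync::Arc;

use servers::error::{InvalidParameterSnafu, Result};
use servers::http::event::{LogValidator, LogValidatorRef};
use servers::http::HttpResponse;

/// Hypothetical validator: rejects log payloads larger than a fixed size.
struct MaxSizeValidator {
    max_bytes: usize,
}

impl LogValidator for MaxSizeValidator {
    fn validate(&self, _source: Option<String>, payload: &str) -> Option<Result<HttpResponse>> {
        if payload.len() > self.max_bytes {
            // Returning `Some(Err(..))` makes `log_ingester` short-circuit with this result.
            return Some(Err(InvalidParameterSnafu {
                reason: format!("payload exceeds {} bytes", self.max_bytes),
            }
            .build()));
        }
        // `None` means no objection; ingestion proceeds normally.
        None
    }
}

/// Build the trait object that `HttpServerBuilder::with_log_ingest_handler` accepts.
fn max_size_validator() -> LogValidatorRef {
    Arc::new(MaxSizeValidator {
        max_bytes: 1024 * 1024,
    })
}
```

A validator registered this way runs on every `POST /v1/events/logs` request before the pipeline is applied; the optional `source` query parameter added to `LogIngesterQueryParams` is what gets passed as the `source` argument.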