diff --git a/Cargo.lock b/Cargo.lock index d8e5f15cdf..b133daa143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12087,6 +12087,7 @@ dependencies = [ "tower 0.5.2", "url", "uuid", + "yaml-rust", "zstd 0.13.2", ] diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 029a40e980..179d5e098f 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; +use datatypes::timestamp::TimestampNanosecond; use pipeline::pipeline_operator::PipelineOperator; use pipeline::{Pipeline, PipelineInfo, PipelineVersion}; use servers::error::{ @@ -103,6 +104,18 @@ impl PipelineHandler for Instance { fn build_pipeline(&self, pipeline: &str) -> ServerResult { PipelineOperator::build_pipeline(pipeline).context(PipelineSnafu) } + + async fn get_pipeline_str( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> ServerResult<(String, TimestampNanosecond)> { + self.pipeline_operator + .get_pipeline_str(name, version, query_ctx) + .await + .context(PipelineSnafu) + } } impl Instance { diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index f98bab6b6c..c5d11574b7 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -20,6 +20,7 @@ use api::v1::CreateTableExpr; use catalog::{CatalogManagerRef, RegisterSystemTableRequest}; use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME}; use common_telemetry::info; +use datatypes::timestamp::TimestampNanosecond; use futures::FutureExt; use operator::insert::InserterRef; use operator::statement::StatementExecutorRef; @@ -198,6 +199,29 @@ impl PipelineOperator { .await } + /// Get the original (uncompiled) pipeline text and its version by name.
+ pub async fn get_pipeline_str( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> Result<(String, TimestampNanosecond)> { + let schema = query_ctx.current_schema(); + self.create_pipeline_table_if_not_exists(query_ctx.clone()) + .await?; + + let timer = Instant::now(); + self.get_pipeline_table_from_cache(query_ctx.current_catalog()) + .context(PipelineTableNotFoundSnafu)? + .get_pipeline_str(&schema, name, version) + .inspect(|re| { + METRIC_PIPELINE_RETRIEVE_HISTOGRAM + .with_label_values(&[&re.is_ok().to_string()]) + .observe(timer.elapsed().as_secs_f64()) + }) + .await + } + /// Insert a pipeline into the pipeline table. pub async fn insert_pipeline( &self, diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index f01742d7c2..649bab9f6e 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -69,6 +69,7 @@ pub struct PipelineTable { table: TableRef, query_engine: QueryEngineRef, pipelines: Cache>, + original_pipelines: Cache, } impl PipelineTable { @@ -88,6 +89,10 @@ impl PipelineTable { .max_capacity(PIPELINES_CACHE_SIZE) .time_to_live(PIPELINES_CACHE_TTL) .build(), + original_pipelines: Cache::builder() + .max_capacity(PIPELINES_CACHE_SIZE) + .time_to_live(PIPELINES_CACHE_TTL) + .build(), } } @@ -273,10 +278,7 @@ impl PipelineTable { return Ok(pipeline); } - let pipeline = self - .find_pipeline(schema, name, version) - .await? - .context(PipelineNotFoundSnafu { name, version })?; + let pipeline = self.get_pipeline_str(schema, name, version).await?; let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?); self.pipelines.insert( @@ -286,6 +288,31 @@ impl PipelineTable { Ok(compiled_pipeline) } + /// Get the original (uncompiled) pipeline text and its version by name. + /// If it is not in the cache, it is read from the pipeline table and cached as-is; nothing is compiled here.
+ pub async fn get_pipeline_str( + &self, + schema: &str, + name: &str, + version: PipelineVersion, + ) -> Result<(String, TimestampNanosecond)> { + // Build the cache key once and reuse it for both the lookup and the insert. + let cache_key = generate_pipeline_cache_key(schema, name, version); + if let Some(pipeline) = self.original_pipelines.get(&cache_key) { + return Ok(pipeline); + } + + // Cache miss: load the raw pipeline text from the pipeline table. + let pipeline = self + .find_pipeline(schema, name, version) + .await? + .context(PipelineNotFoundSnafu { name, version })?; + + // Keep the raw text so later lookups of this key are served from the cache. + self.original_pipelines.insert(cache_key, pipeline.clone()); + Ok(pipeline) + } + /// Insert a pipeline into the pipeline table and compile it. /// The compiled pipeline will be inserted into the cache. pub async fn insert_and_compile( @@ -310,6 +337,15 @@ impl PipelineTable { generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))), compiled_pipeline.clone(), ); + + self.original_pipelines.insert( + generate_pipeline_cache_key(schema, name, None), + (pipeline.to_owned(), TimestampNanosecond(version)), + ); + self.original_pipelines.insert( + generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))), + (pipeline.to_owned(), TimestampNanosecond(version)), + ); } Ok((version, compiled_pipeline)) @@ -392,6 +428,10 @@ impl PipelineTable { .remove(&generate_pipeline_cache_key(schema, name, version)); self.pipelines .remove(&generate_pipeline_cache_key(schema, name, None)); + self.original_pipelines + .remove(&generate_pipeline_cache_key(schema, name, version)); + self.original_pipelines + .remove(&generate_pipeline_cache_key(schema, name, None)); Ok(Some(())) } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 07f589f127..07d1c7ea24 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -928,6 +928,10 @@ impl HttpServer { fn route_log_deprecated(log_state: LogState) -> Router { Router::new() .route("/logs", routing::post(event::log_ingester)) + .route( + "/pipelines/{pipeline_name}", + routing::get(event::query_pipeline), + ) .route(
"/pipelines/{pipeline_name}", routing::post(event::add_pipeline), @@ -947,6 +951,10 @@ impl HttpServer { fn route_pipelines(log_state: LogState) -> Router { Router::new() .route("/ingest", routing::post(event::log_ingester)) + .route( + "/pipelines/{pipeline_name}", + routing::get(event::query_pipeline), + ) .route( "/pipelines/{pipeline_name}", routing::post(event::add_pipeline), diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index bee41b8053..cfc0694ae0 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -147,6 +147,42 @@ where } } +/// HTTP handler that returns the original pipeline text for `pipeline_name`, optionally at a given `version`. +#[axum_macros::debug_handler] +pub async fn query_pipeline( + State(state): State, + Extension(mut query_ctx): Extension, + Query(query_params): Query, + Path(pipeline_name): Path, +) -> Result { + let start = Instant::now(); + let handler = state.log_handler; + ensure!( + !pipeline_name.is_empty(), + InvalidParameterSnafu { + reason: "pipeline_name is required in path", + } + ); + + let version = to_pipeline_version(query_params.version.as_deref()).context(PipelineSnafu)?; + + query_ctx.set_channel(Channel::Http); + let query_ctx = Arc::new(query_ctx); + + let (pipeline, pipeline_version) = handler + .get_pipeline_str(&pipeline_name, version, query_ctx) + .await?; + + Ok(GreptimedbManageResponse::from_pipeline( + pipeline_name, + query_params + .version + .unwrap_or_else(|| pipeline_version.0.to_iso8601_string()), + start.elapsed().as_millis() as u64, + Some(pipeline), + )) +} + #[axum_macros::debug_handler] pub async fn add_pipeline( State(state): State, @@ -189,6 +225,7 @@ pub async fn add_pipeline( pipeline_name, pipeline.0.to_timezone_aware_string(None), start.elapsed().as_millis() as u64, + None, ) }) .map_err(|e| { @@ -231,6 +268,7 @@ pub async fn delete_pipeline( pipeline_name, version_str, start.elapsed().as_millis() as u64, + None, ) } else { GreptimedbManageResponse::from_pipelines(vec![], start.elapsed().as_millis() as u64) diff --git 
a/src/servers/src/http/result/greptime_manage_resp.rs b/src/servers/src/http/result/greptime_manage_resp.rs index a46df298b4..3db07028b3 100644 --- a/src/servers/src/http/result/greptime_manage_resp.rs +++ b/src/servers/src/http/result/greptime_manage_resp.rs @@ -30,10 +30,19 @@ pub struct GreptimedbManageResponse { } impl GreptimedbManageResponse { - pub fn from_pipeline(name: String, version: String, execution_time_ms: u64) -> Self { + pub fn from_pipeline( + name: String, + version: String, + execution_time_ms: u64, + pipeline: Option, + ) -> Self { GreptimedbManageResponse { manage_result: ManageResult::Pipelines { - pipelines: vec![PipelineOutput { name, version }], + pipelines: vec![PipelineOutput { + name, + version, + pipeline, + }], }, execution_time_ms, } @@ -68,6 +77,8 @@ pub enum ManageResult { pub struct PipelineOutput { name: String, version: String, + #[serde(skip_serializing_if = "Option::is_none")] + pipeline: Option, } impl IntoResponse for GreptimedbManageResponse { @@ -109,6 +120,7 @@ mod tests { pipelines: vec![PipelineOutput { name: "test_name".to_string(), version: "test_version".to_string(), + pipeline: None, }], }, execution_time_ms: 42, diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index b4e734fca2..30fd360455 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -34,6 +34,7 @@ use api::v1::RowInsertRequests; use async_trait::async_trait; use catalog::CatalogManager; use common_query::Output; +use datatypes::timestamp::TimestampNanosecond; use headers::HeaderValue; use log_query::LogQuery; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; @@ -165,6 +166,14 @@ pub trait PipelineHandler { //// Build a pipeline from a string. fn build_pipeline(&self, pipeline: &str) -> Result; + + /// Get the original (uncompiled) pipeline text and its version by name.
+ async fn get_pipeline_str( + &self, + name: &str, + version: PipelineVersion, + query_ctx: QueryContextRef, + ) -> Result<(String, TimestampNanosecond)>; } /// Handle log query requests. diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 0ce38049ca..f2f538f528 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -100,3 +100,4 @@ session = { workspace = true, features = ["testing"] } store-api.workspace = true tokio-postgres = { workspace = true } url = "2.3" +yaml-rust = "0.4" diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index d20fe50241..8ef27cdbf0 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -50,6 +50,7 @@ use tests_integration::test_util::{ setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend, StorageType, }; +use yaml_rust::YamlLoader; #[macro_export] macro_rules! http_test { @@ -1348,6 +1349,55 @@ transform: .as_str() .unwrap() .to_string(); + let encoded_ver_str: String = + url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect(); + + // Fetch the pipeline with an explicit, URL-encoded version; the returned YAML must parse equal to the uploaded body + let res = client + .get(format!("/v1/pipelines/test?version={}", encoded_ver_str).as_str()) + .send() + .await; + + assert_eq!(res.status(), StatusCode::OK); + + let content = res.text().await; + let content = serde_json::from_str(&content); 
+ let content: Value = content.unwrap(); + let pipeline_yaml = content + .get("pipelines") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap() + .get("pipeline") + .unwrap() + .as_str() + .unwrap(); + let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap(); + let body_yaml = YamlLoader::load_from_str(body).unwrap(); + assert_eq!(docs, body_yaml); + + // Omit the version parameter: the latest version is expected and must parse equal to the same body + let res = client.get("/v1/pipelines/test").send().await; + assert_eq!(res.status(), StatusCode::OK); + + let content = res.text().await; + let content = serde_json::from_str(&content); + let content: Value = content.unwrap(); + let pipeline_yaml = content + .get("pipelines") + .unwrap() + .as_array() + .unwrap() + .first() + .unwrap() + .get("pipeline") + .unwrap() + .as_str() + .unwrap(); + let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap(); + assert_eq!(docs, body_yaml); // 2. write data let data_body = r#"