From af6cf999c1b443e63fae8576cd79c0abe2fd3323 Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Fri, 30 May 2025 15:19:32 +0800
Subject: [PATCH] chore: shared pipeline under same catalog with compatibility (#6143)

* chore: support shared pipeline under catalog with compatibility
* test: add test for cross schema ref
* chore: use empty string schema by default
* chore: remove unwrap in the patch
* fix: df check
---
 src/pipeline/src/error.rs                     |  21 +-
 src/pipeline/src/manager.rs                   |   1 +
 src/pipeline/src/manager/pipeline_cache.rs    | 184 +++++++++++++
 src/pipeline/src/manager/pipeline_operator.rs |   4 +-
 src/pipeline/src/manager/table.rs             | 242 ++++++++++--------
 src/pipeline/src/manager/util.rs              |  19 +-
 src/servers/src/http/event.rs                 |   2 +-
 tests-integration/tests/http.rs               |  48 +++-
 8 files changed, 390 insertions(+), 131 deletions(-)
 create mode 100644 src/pipeline/src/manager/pipeline_cache.rs

diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 97ecc40491..60ac3c945c 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -650,6 +650,24 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display(
+        "Multiple pipelines with different schemas found, but none under the current schema. Please replicate one of them or delete pipelines until only one schema is left. schemas: {}",
+        schemas
+    ))]
+    MultiPipelineWithDiffSchema {
+        schemas: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display(
+        "The column lengths of the returned record batch do not match; see the debug log for details"
+    ))]
+    RecordBatchLenNotMatch {
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Failed to collect record batch"))]
     CollectRecords {
         #[snafu(implicit)]
@@ -750,7 +768,8 @@ impl ErrorExt for Error {
             PipelineNotFound { .. }
             | InvalidPipelineVersion { .. }
             | InvalidCustomTimeIndex { .. } => StatusCode::InvalidArguments,
-            BuildDfLogicalPlan { .. } => StatusCode::Internal,
+            MultiPipelineWithDiffSchema { .. } => StatusCode::IllegalState,
+            BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
             ExecuteInternalStatement { source, .. } => source.status_code(),
             DataFrame { source, .. } => source.status_code(),
             Catalog { source, .. } => source.status_code(),
diff --git a/src/pipeline/src/manager.rs b/src/pipeline/src/manager.rs
index 306ffd9afe..70ed8860ac 100644
--- a/src/pipeline/src/manager.rs
+++ b/src/pipeline/src/manager.rs
@@ -29,6 +29,7 @@ use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RES
 use crate::table::PipelineTable;
 use crate::{GreptimePipelineParams, Pipeline, Value};
 
+mod pipeline_cache;
 pub mod pipeline_operator;
 pub mod table;
 pub mod util;
diff --git a/src/pipeline/src/manager/pipeline_cache.rs b/src/pipeline/src/manager/pipeline_cache.rs
new file mode 100644
index 0000000000..68a7d482d2
--- /dev/null
+++ b/src/pipeline/src/manager/pipeline_cache.rs
@@ -0,0 +1,184 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+use std::time::Duration;
+
+use datatypes::timestamp::TimestampNanosecond;
+use moka::sync::Cache;
+
+use crate::error::{MultiPipelineWithDiffSchemaSnafu, Result};
+use crate::etl::Pipeline;
+use crate::manager::PipelineVersion;
+use crate::table::EMPTY_SCHEMA_NAME;
+use crate::util::{generate_pipeline_cache_key, generate_pipeline_cache_key_suffix};
+
+/// Pipeline table cache size.
+const PIPELINES_CACHE_SIZE: u64 = 10000;
+/// Pipeline table cache time to live.
+const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
+
+/// The pipeline cache lives in a separate file on purpose,
+/// to encapsulate the inner caches. Only public methods are exposed.
+pub(crate) struct PipelineCache {
+    pipelines: Cache<String, Arc<Pipeline>>,
+    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+}
+
+impl PipelineCache {
+    pub(crate) fn new() -> Self {
+        Self {
+            pipelines: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .time_to_live(PIPELINES_CACHE_TTL)
+                .build(),
+            original_pipelines: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .time_to_live(PIPELINES_CACHE_TTL)
+                .build(),
+        }
+    }
+
+    pub(crate) fn insert_pipeline_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+        pipeline: Arc<Pipeline>,
+        with_latest: bool,
+    ) {
+        insert_cache_generic(
+            &self.pipelines,
+            schema,
+            name,
+            version,
+            pipeline,
+            with_latest,
+        );
+    }
+
+    pub(crate) fn insert_pipeline_str_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+        pipeline: (String, TimestampNanosecond),
+        with_latest: bool,
+    ) {
+        insert_cache_generic(
+            &self.original_pipelines,
+            schema,
+            name,
+            version,
+            pipeline,
+            with_latest,
+        );
+    }
+
+    pub(crate) fn get_pipeline_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+    ) -> Result<Option<Arc<Pipeline>>> {
+        get_cache_generic(&self.pipelines, schema, name, version)
+    }
+
+    pub(crate) fn get_pipeline_str_cache(
+        &self,
+        schema: &str,
+        name: &str,
+        version: PipelineVersion,
+    ) -> Result<Option<(String, TimestampNanosecond)>> {
+        get_cache_generic(&self.original_pipelines, schema, name, version)
+    }
+
+    // Remove both the versioned and the latest cache entries in all schemas.
+    pub(crate) fn remove_cache(&self, name: &str, version: PipelineVersion) {
+        let version_suffix = generate_pipeline_cache_key_suffix(name, version);
+        let latest_suffix = generate_pipeline_cache_key_suffix(name, None);
+
+        let ks = self
+            .pipelines
+            .iter()
+            .filter_map(|(k, _)| {
+                if k.ends_with(&version_suffix) || k.ends_with(&latest_suffix) {
+                    Some(k.clone())
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        for k in ks {
+            let k = k.as_str();
+            self.pipelines.remove(k);
+            self.original_pipelines.remove(k);
+        }
+    }
+}
+
+fn insert_cache_generic<T: Clone + Send + Sync + 'static>(
+    cache: &Cache<String, T>,
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+    value: T,
+    with_latest: bool,
+) {
+    let k = generate_pipeline_cache_key(schema, name, version);
+    cache.insert(k, value.clone());
+    if with_latest {
+        let k = generate_pipeline_cache_key(schema, name, None);
+        cache.insert(k, value);
+    }
+}
+
+fn get_cache_generic<T: Clone + Send + Sync + 'static>(
+    cache: &Cache<String, T>,
+    schema: &str,
+    name: &str,
+    version: PipelineVersion,
+) -> Result<Option<T>> {
+    // try the empty schema first
+    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
+    }
+    // then try the input schema
+    let k = generate_pipeline_cache_key(schema, name, version);
+    if let Some(value) = cache.get(&k) {
+        return Ok(Some(value));
+    }
+
+    // finally, try all schemas
+    let suffix_key = generate_pipeline_cache_key_suffix(name, version);
+    let mut ks = cache
+        .iter()
+        .filter(|e| 
e.0.ends_with(&suffix_key))
+        .collect::<Vec<_>>();
+
+    match ks.len() {
+        0 => Ok(None),
+        1 => Ok(Some(ks.remove(0).1)),
+        _ => MultiPipelineWithDiffSchemaSnafu {
+            schemas: ks
+                .iter()
+                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
+                .collect::<Vec<_>>()
+                .join(","),
+        }
+        .fail()?,
+    }
+}
diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs
index c5d11574b7..6ad190cf23 100644
--- a/src/pipeline/src/manager/pipeline_operator.rs
+++ b/src/pipeline/src/manager/pipeline_operator.rs
@@ -236,7 +236,7 @@ impl PipelineOperator {
         let timer = Instant::now();
         self.get_pipeline_table_from_cache(query_ctx.current_catalog())
             .context(PipelineTableNotFoundSnafu)?
-            .insert_and_compile(&query_ctx.current_schema(), name, content_type, pipeline)
+            .insert_and_compile(name, content_type, pipeline)
             .inspect(|re| {
                 METRIC_PIPELINE_CREATE_HISTOGRAM
                     .with_label_values(&[&re.is_ok().to_string()])
@@ -259,7 +259,7 @@ impl PipelineOperator {
         let timer = Instant::now();
         self.get_pipeline_table_from_cache(query_ctx.current_catalog())
             .context(PipelineTableNotFoundSnafu)?
-            .delete_pipeline(&query_ctx.current_schema(), name, version)
+            .delete_pipeline(name, version)
             .inspect(|re| {
                 METRIC_PIPELINE_DELETE_HISTOGRAM
                     .with_label_values(&[&re.is_ok().to_string()])
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index bfd1a8c626..c2ddf439a1 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -13,7 +13,6 @@
 // limitations under the License.
 
 use std::sync::Arc;
-use std::time::Duration;
 
 use api::v1::value::ValueData;
 use api::v1::{
@@ -24,13 +23,12 @@ use common_query::OutputData;
 use common_recordbatch::util as record_util;
 use common_telemetry::{debug, info};
 use common_time::timestamp::{TimeUnit, Timestamp};
-use datafusion::logical_expr::col;
 use datafusion_common::{TableReference, ToDFSchema};
-use datafusion_expr::{DmlStatement, LogicalPlan};
+use datafusion_expr::{col, DmlStatement, LogicalPlan};
 use datatypes::prelude::ScalarVector;
 use datatypes::timestamp::TimestampNanosecond;
 use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
-use moka::sync::Cache;
+use itertools::Itertools;
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
 use query::dataframe::DataFrame;
@@ -43,23 +41,20 @@ use table::TableRef;
 use crate::error::{
     BuildDfLogicalPlanSnafu, CastTypeSnafu, CollectRecordsSnafu, DataFrameSnafu,
     ExecuteInternalStatementSnafu, InsertPipelineSnafu, InvalidPipelineVersionSnafu,
-    PipelineNotFoundSnafu, Result,
+    MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
 };
 use crate::etl::{parse, Content, Pipeline};
+use crate::manager::pipeline_cache::PipelineCache;
 use crate::manager::{PipelineInfo, PipelineVersion};
-use crate::util::{generate_pipeline_cache_key, prepare_dataframe_conditions};
+use crate::util::prepare_dataframe_conditions;
 
 pub(crate) const PIPELINE_TABLE_NAME: &str = "pipelines";
 
 pub(crate) const PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME: &str = "name";
-pub(crate) const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
+const PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME: &str = "schema";
 const PIPELINE_TABLE_PIPELINE_CONTENT_TYPE_COLUMN_NAME: &str = "content_type";
 const PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME: &str = "pipeline";
 pub(crate) const PIPELINE_TABLE_CREATED_AT_COLUMN_NAME: &str = "created_at";
-
-/// Pipeline table cache size.
-const PIPELINES_CACHE_SIZE: u64 = 10000;
-/// Pipeline table cache time to live.
-const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
+pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
 
 /// PipelineTable is a table that stores the pipeline schema and content.
 /// Every catalog has its own pipeline table.
@@ -68,8 +63,7 @@ pub struct PipelineTable {
     statement_executor: StatementExecutorRef,
     table: TableRef,
     query_engine: QueryEngineRef,
-    pipelines: Cache<String, Arc<Pipeline>>,
-    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+    cache: PipelineCache,
 }
 
 impl PipelineTable {
@@ -85,14 +79,7 @@ impl PipelineTable {
             statement_executor,
             table,
             query_engine,
-            pipelines: Cache::builder()
-                .max_capacity(PIPELINES_CACHE_SIZE)
-                .time_to_live(PIPELINES_CACHE_TTL)
-                .build(),
-            original_pipelines: Cache::builder()
-                .max_capacity(PIPELINES_CACHE_SIZE)
-                .time_to_live(PIPELINES_CACHE_TTL)
-                .build(),
+            cache: PipelineCache::new(),
         }
     }
 
@@ -214,7 +201,6 @@ impl PipelineTable {
     /// Insert a pipeline into the pipeline table.
     async fn insert_pipeline_to_pipeline_table(
         &self,
-        schema: &str,
         name: &str,
         content_type: &str,
         pipeline: &str,
@@ -230,7 +216,7 @@ impl PipelineTable {
             rows: vec![Row {
                 values: vec![
                     ValueData::StringValue(name.to_string()).into(),
-                    ValueData::StringValue(schema.to_string()).into(),
+                    ValueData::StringValue(EMPTY_SCHEMA_NAME.to_string()).into(),
                     ValueData::StringValue(content_type.to_string()).into(),
                     ValueData::StringValue(pipeline.to_string()).into(),
                     ValueData::TimestampNanosecondValue(now.value()).into(),
@@ -272,20 +258,15 @@ impl PipelineTable {
         name: &str,
         version: PipelineVersion,
     ) -> Result<Arc<Pipeline>> {
-        if let Some(pipeline) = self
-            .pipelines
-            .get(&generate_pipeline_cache_key(schema, name, version))
-        {
+        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
             return Ok(pipeline);
         }
 
         let pipeline = self.get_pipeline_str(schema, name, version).await?;
        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
 
-        self.pipelines.insert(
-            generate_pipeline_cache_key(schema, name, version),
-            compiled_pipeline.clone(),
-        );
+        self.cache
+            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
         Ok(compiled_pipeline)
     }
 
@@ -297,28 +278,56 @@ impl PipelineTable {
         name: &str,
         version: PipelineVersion,
     ) -> Result<(String, TimestampNanosecond)> {
-        if let Some(pipeline) = self
-            .original_pipelines
-            .get(&generate_pipeline_cache_key(schema, name, version))
-        {
+        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
            return Ok(pipeline);
        }
-        let pipeline = self
-            .find_pipeline(schema, name, version)
-            .await?
-            .context(PipelineNotFoundSnafu { name, version })?;
-        self.original_pipelines.insert(
-            generate_pipeline_cache_key(schema, name, version),
-            pipeline.clone(),
+
+        let mut pipeline_vec = self.find_pipeline(name, version).await?;
+        ensure!(
+            !pipeline_vec.is_empty(),
+            PipelineNotFoundSnafu { name, version }
         );
-        Ok(pipeline)
+
+        // if there is exactly one result, use it
+        if pipeline_vec.len() == 1 {
+            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
+            let p = (pipeline_content, version);
+            self.cache.insert_pipeline_str_cache(
+                &found_schema,
+                name,
+                Some(version),
+                p.clone(),
+                false,
+            );
+            return Ok(p);
+        }
+
+        // check if there is a pipeline under the empty schema;
+        // if there isn't, check the current schema
+        let pipeline = pipeline_vec
+            .iter()
+            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
+            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
+
+        // multiple pipelines, but none under the empty or current schema:
+        // throw an error
+        let (pipeline_content, found_schema, version) =
+            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
+                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
+            })?;
+
+        let v = *version;
+        let p = (pipeline_content.clone(), v);
+        self.cache
+            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
+        Ok(p)
     }
 
     /// Insert a pipeline into the pipeline table and compile it.
     /// The compiled pipeline will be inserted into the cache.
+    /// Newly created pipelines will be saved under the empty schema.
     pub async fn insert_and_compile(
         &self,
-        schema: &str,
         name: &str,
         content_type: &str,
         pipeline: &str,
@@ -326,26 +335,24 @@ impl PipelineTable {
         let compiled_pipeline = Arc::new(Self::compile_pipeline(pipeline)?);
         // we will use the version in the future
         let version = self
-            .insert_pipeline_to_pipeline_table(schema, name, content_type, pipeline)
+            .insert_pipeline_to_pipeline_table(name, content_type, pipeline)
             .await?;
 
         {
-            self.pipelines.insert(
-                generate_pipeline_cache_key(schema, name, None),
-                compiled_pipeline.clone(),
-            );
-            self.pipelines.insert(
-                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
+            self.cache.insert_pipeline_cache(
+                EMPTY_SCHEMA_NAME,
+                name,
+                Some(TimestampNanosecond(version)),
                 compiled_pipeline.clone(),
+                true,
             );
 
-            self.original_pipelines.insert(
-                generate_pipeline_cache_key(schema, name, None),
-                (pipeline.to_owned(), TimestampNanosecond(version)),
-            );
-            self.original_pipelines.insert(
-                generate_pipeline_cache_key(schema, name, Some(TimestampNanosecond(version))),
+            self.cache.insert_pipeline_str_cache(
+                EMPTY_SCHEMA_NAME,
+                name,
+                Some(TimestampNanosecond(version)),
                 (pipeline.to_owned(), TimestampNanosecond(version)),
+                true,
             );
         }
 
@@ -354,7 +361,6 @@ impl PipelineTable {
 
     pub async fn delete_pipeline(
         &self,
-        schema: &str,
         name: &str,
         version: PipelineVersion,
     ) -> Result<Option<()>> {
@@ -365,8 +371,8 @@ impl PipelineTable {
         );
 
         // 1. check that the pipeline exists in the catalog
-        let pipeline = self.find_pipeline(schema, name, version).await?;
-        if pipeline.is_none() {
+        let pipeline = self.find_pipeline(name, version).await?;
+        if pipeline.is_empty() {
             return Ok(None);
         }
 
@@ -378,7 +384,7 @@ impl PipelineTable {
         let DataFrame::DataFusion(dataframe) = dataframe;
 
         let dataframe = dataframe
-            .filter(prepare_dataframe_conditions(schema, name, version))
+            .filter(prepare_dataframe_conditions(name, version))
             .context(BuildDfLogicalPlanSnafu)?;
 
         // 3. 
prepare dml stmt
@@ -425,24 +431,19 @@ impl PipelineTable {
         );
 
         // remove cache with version and latest
-        self.pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, version));
-        self.pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, None));
-        self.original_pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, version));
-        self.original_pipelines
-            .remove(&generate_pipeline_cache_key(schema, name, None));
+        self.cache.remove_cache(name, version);
 
         Ok(Some(()))
     }
 
+    // Find all pipelines with the given name and version.
+    // There could be multiple pipelines under different schemas.
+    // Return format: (pipeline content, schema, created_at)
     async fn find_pipeline(
         &self,
-        schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
         // 1. prepare dataframe
         let dataframe = self
             .query_engine
             .context(DataFrameSnafu)?;
         let DataFrame::DataFusion(dataframe) = dataframe;
 
+        // select all pipelines with name and version
         let dataframe = dataframe
-            .filter(prepare_dataframe_conditions(schema, name, version))
+            .filter(prepare_dataframe_conditions(name, version))
             .context(BuildDfLogicalPlanSnafu)?
             .select_columns(&[
                 PIPELINE_TABLE_PIPELINE_CONTENT_COLUMN_NAME,
+                PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME,
                 PIPELINE_TABLE_CREATED_AT_COLUMN_NAME,
             ])
             .context(BuildDfLogicalPlanSnafu)?
             .sort(vec![
                 col(PIPELINE_TABLE_CREATED_AT_COLUMN_NAME).sort(false, true)
             ])
-            .context(BuildDfLogicalPlanSnafu)?
-            .limit(0, Some(1))
             .context(BuildDfLogicalPlanSnafu)?;
 
         let plan = dataframe.into_parts().1;
 
@@ -489,51 +490,70 @@ impl PipelineTable {
             .context(CollectRecordsSnafu)?;
 
         if records.is_empty() {
-            return Ok(None);
+            return Ok(vec![]);
         }
 
-        // limit 1
         ensure!(
-            records.len() == 1 && records[0].num_columns() == 2,
+            !records.is_empty() && records.iter().all(|r| r.num_columns() == 3),
             PipelineNotFoundSnafu { name, version }
         );
 
-        let pipeline_content_column = records[0].column(0);
-        let pipeline_content = pipeline_content_column
-            .as_any()
-            .downcast_ref::<StringVector>()
-            .with_context(|| CastTypeSnafu {
-                msg: format!(
-                    "can't downcast {:?} array into string vector",
-                    pipeline_content_column.data_type()
-                ),
-            })?;
+        let mut re = Vec::with_capacity(records.len());
+        for r in records {
+            let pipeline_content_column = r.column(0);
+            let pipeline_content = pipeline_content_column
+                .as_any()
+                .downcast_ref::<StringVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into string vector",
+                        pipeline_content_column.data_type()
+                    ),
+                })?;
 
-        let pipeline_created_at_column = records[0].column(1);
-        let pipeline_created_at = pipeline_created_at_column
-            .as_any()
-            .downcast_ref::<TimestampNanosecondVector>()
-            .with_context(|| CastTypeSnafu {
-                msg: format!(
-                    "can't downcast {:?} array into scalar vector",
-                    pipeline_created_at_column.data_type()
-                ),
-            })?;
+            let pipeline_schema_column = r.column(1);
+            let pipeline_schema = pipeline_schema_column
+                .as_any()
+                .downcast_ref::<StringVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into string vector",
+                        pipeline_schema_column.data_type()
+                    ),
+                })?;
 
-        debug!(
-            "find_pipeline_by_name: pipeline_content: {:?}, pipeline_created_at: {:?}",
-            pipeline_content, pipeline_created_at
-        );
+            let pipeline_created_at_column = r.column(2);
+            let pipeline_created_at = pipeline_created_at_column
+                .as_any()
+                .downcast_ref::<TimestampNanosecondVector>()
+                .with_context(|| CastTypeSnafu {
+                    msg: format!(
+                        "can't downcast {:?} array into scalar vector",
+                        pipeline_created_at_column.data_type()
+                    ),
+                })?;
 
-        ensure!(
-            
pipeline_content.len() == 1, - PipelineNotFoundSnafu { name, version } - ); + debug!( + "find_pipeline_by_name: pipeline_content: {:?}, pipeline_schema: {:?}, pipeline_created_at: {:?}", + pipeline_content, pipeline_schema, pipeline_created_at + ); - // Safety: asserted above - Ok(Some(( - pipeline_content.get_data(0).unwrap().to_string(), - pipeline_created_at.get_data(0).unwrap(), - ))) + ensure!( + pipeline_content.len() == pipeline_schema.len() + && pipeline_schema.len() == pipeline_created_at.len(), + RecordBatchLenNotMatchSnafu + ); + + let len = pipeline_content.len(); + for i in 0..len { + re.push(( + pipeline_content.get_data(i).unwrap().to_string(), + pipeline_schema.get_data(i).unwrap().to_string(), + pipeline_created_at.get_data(i).unwrap(), + )); + } + } + + Ok(re) } } diff --git a/src/pipeline/src/manager/util.rs b/src/pipeline/src/manager/util.rs index 37aa5967e8..49537c011e 100644 --- a/src/pipeline/src/manager/util.rs +++ b/src/pipeline/src/manager/util.rs @@ -19,7 +19,6 @@ use datatypes::timestamp::TimestampNanosecond; use crate::error::{InvalidPipelineVersionSnafu, Result}; use crate::table::{ PIPELINE_TABLE_CREATED_AT_COLUMN_NAME, PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME, - PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME, }; use crate::PipelineVersion; @@ -34,15 +33,8 @@ pub fn to_pipeline_version(version_str: Option<&str>) -> Result } } -pub(crate) fn prepare_dataframe_conditions( - schema: &str, - name: &str, - version: PipelineVersion, -) -> Expr { - let mut conditions = vec![ - col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name)), - col(PIPELINE_TABLE_PIPELINE_SCHEMA_COLUMN_NAME).eq(lit(schema)), - ]; +pub(crate) fn prepare_dataframe_conditions(name: &str, version: PipelineVersion) -> Expr { + let mut conditions = vec![col(PIPELINE_TABLE_PIPELINE_NAME_COLUMN_NAME).eq(lit(name))]; if let Some(v) = version { conditions @@ -63,6 +55,13 @@ pub(crate) fn generate_pipeline_cache_key( } } +pub(crate) fn generate_pipeline_cache_key_suffix(name: &str, version: PipelineVersion) -> String { + match version { + Some(version) => format!("/{}/{}", name, i64::from(version)), + None => format!("/{}/latest", name), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index e044fcff61..80922cf904 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -177,7 +177,7 @@ pub async fn query_pipeline( pipeline_name, query_params .version - .unwrap_or(pipeline_version.0.to_iso8601_string()), + .unwrap_or(pipeline_version.0.to_timezone_aware_string(None)), start.elapsed().as_millis() as u64, Some(pipeline), )) diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 6cf765ae12..8cb8575fa7 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1403,7 +1403,7 @@ pub async fn test_pipeline_api(store_type: StorageType) { // handshake let client = TestClient::new(app).await; - let body = r#" + let pipeline_body = r#" processors: - date: field: time @@ -1437,7 +1437,7 @@ transform: let res = client .post("/v1/pipelines/greptime_guagua") .header("Content-Type", "application/x-yaml") - .body(body) + .body(pipeline_body) .send() .await; @@ -1452,7 +1452,7 @@ transform: let res = client .post("/v1/pipelines/test") .header("Content-Type", "application/x-yaml") - .body(body) + .body(pipeline_body) .send() .await; @@ -1505,7 +1505,7 @@ transform: .as_str() .unwrap(); let docs = YamlLoader::load_from_str(pipeline_yaml).unwrap(); - let body_yaml = 
YamlLoader::load_from_str(body).unwrap(); + let body_yaml = YamlLoader::load_from_str(pipeline_body).unwrap(); assert_eq!(docs, body_yaml); // Do not specify version, get the latest version @@ -1560,7 +1560,43 @@ transform: ) .await; - // 4. remove pipeline + // 4. cross-ref pipeline + // create database test_db + let res = client + .post("/v1/sql?sql=create database test_db") + .header("Content-Type", "application/x-www-form-urlencoded") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // check test_db created + validate_data( + "pipeline_db_schema", + &client, + "show databases", + "[[\"greptime_private\"],[\"information_schema\"],[\"public\"],[\"test_db\"]]", + ) + .await; + + // cross ref using public's pipeline + let res = client + .post("/v1/ingest?db=test_db&table=logs1&pipeline_name=test") + .header("Content-Type", "application/json") + .body(data_body) + .send() + .await; + assert_eq!(res.status(), StatusCode::OK); + + // check write success + validate_data( + "pipeline_db_schema", + &client, + "select * from test_db.logs1", + "[[2436,2528,\"INTERACT.MANAGER\",\"I\",\"ClusterAdapter:enter sendTextDataToCluster\\\\n\",1716668197217000000]]", + ) + .await; + + // 5. remove pipeline let encoded_ver_str: String = url::form_urlencoded::byte_serialize(version_str.as_bytes()).collect(); let res = client @@ -1580,7 +1616,7 @@ transform: format!(r#"[{{"name":"test","version":"{}"}}]"#, version_str).as_str() ); - // 5. write data failed + // 6. write data failed let res = client .post("/v1/ingest?db=public&table=logs1&pipeline_name=test") .header("Content-Type", "application/json")