From 2fcd0cff2e7dadfdfb7ba21bf6fd487b21bfe00d Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Thu, 8 Jan 2026 16:10:06 +0800
Subject: [PATCH] fix: pipeline loading issue (#7491) (#7540)

* fix: pipeline loading issue (#7491)

* fix: pipeline loading

Signed-off-by: shuiyisong

* chore: change string to str

Signed-off-by: shuiyisong

* chore: minor fix to save returned version

Signed-off-by: shuiyisong

* refactor: introduce PipelineContent

Signed-off-by: shuiyisong

* fix: use found schema

Signed-off-by: shuiyisong

* chore: update CR

Co-authored-by: Yingwen

* chore: CR issue

Signed-off-by: shuiyisong

---------

Signed-off-by: shuiyisong
Co-authored-by: Yingwen

* fix: etcd sh

Signed-off-by: shuiyisong

* fix: typo

Signed-off-by: shuiyisong

---------

Signed-off-by: shuiyisong
Co-authored-by: Yingwen
---
 .../setup-greptimedb-cluster/action.yml       |   2 +-
 .github/scripts/deploy-greptimedb.sh          |   4 +-
 src/pipeline/src/error.rs                     |   8 +-
 src/pipeline/src/manager/pipeline_cache.rs    |  80 ++++++++-----
 src/pipeline/src/manager/pipeline_operator.rs |   1 +
 src/pipeline/src/manager/table.rs             | 106 ++++++++++--------
 src/query/src/dist_plan/commutativity.rs      |   2 +-
 src/query/src/query_engine/state.rs           |   2 +-
 8 files changed, 123 insertions(+), 82 deletions(-)

diff --git a/.github/actions/setup-greptimedb-cluster/action.yml b/.github/actions/setup-greptimedb-cluster/action.yml
index 8a19a192bf..759d16d80b 100644
--- a/.github/actions/setup-greptimedb-cluster/action.yml
+++ b/.github/actions/setup-greptimedb-cluster/action.yml
@@ -51,7 +51,7 @@ runs:
       run: |
         helm upgrade \
           --install my-greptimedb \
-          --set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
+          --set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
           --set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
           --set image.registry=${{ inputs.image-registry }} \
           --set image.repository=${{ inputs.image-repository }} \
diff --git a/.github/scripts/deploy-greptimedb.sh b/.github/scripts/deploy-greptimedb.sh
index bba7c83a07..a97d6c05ef 100755
--- a/.github/scripts/deploy-greptimedb.sh
+++ b/.github/scripts/deploy-greptimedb.sh
@@ -68,7 +68,7 @@ function deploy_greptimedb_cluster() {

   helm install "$cluster_name" greptime/greptimedb-cluster \
     --set image.tag="$GREPTIMEDB_IMAGE_TAG" \
-    --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
+    --set 'meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379"' \
     -n "$install_namespace"

   # Wait for greptimedb cluster to be ready.
@@ -103,7 +103,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {

   helm install "$cluster_name" greptime/greptimedb-cluster -n "$install_namespace" \
     --set image.tag="$GREPTIMEDB_IMAGE_TAG" \
-    --set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
+    --set 'meta.backendStorage.etcd.endpoints[0]="etcd.$install_namespace:2379"' \
     --set storage.s3.bucket="$AWS_CI_TEST_BUCKET" \
     --set storage.s3.region="$AWS_REGION" \
     --set storage.s3.root="$DATA_ROOT" \
diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs
index 1ded1064f9..709f1fc673 100644
--- a/src/pipeline/src/error.rs
+++ b/src/pipeline/src/error.rs
@@ -646,10 +646,14 @@ pub enum Error {
     },

     #[snafu(display(
-        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
-        schemas
+        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
+        name,
+        current_schema,
+        schemas,
     ))]
     MultiPipelineWithDiffSchema {
+        name: String,
+        current_schema: String,
         schemas: String,
         #[snafu(implicit)]
         location: Location,
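The hunk above threads two new fields through the error variant so the message pinpoints which pipeline, and which session schema, was being resolved when the lookup turned out ambiguous. A minimal, self-contained sketch of how the enriched variant renders, using snafu's generated context selector (assumes snafu 0.7+; the real variant also carries an implicit `location` field and the full "Please replicate..." message, both trimmed here, and the field values below are hypothetical):

```rust
use snafu::Snafu;

// Trimmed-down stand-in for the variant in src/pipeline/src/error.rs.
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display(
        "Multiple pipelines with different schemas found, but none under current schema. \
         name: {name}, current_schema: {current_schema}, schemas: {schemas}"
    ))]
    MultiPipelineWithDiffSchema {
        name: String,
        current_schema: String,
        schemas: String,
    },
}

fn main() {
    // The context selector fills every field, so the rendered message now
    // identifies the failing lookup instead of only listing the schemas.
    let err = MultiPipelineWithDiffSchemaSnafu {
        name: "nginx_access",     // hypothetical pipeline name
        current_schema: "public", // hypothetical session schema
        schemas: "db_a,db_b",
    }
    .build();
    println!("{err}");
}
```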
diff --git a/src/pipeline/src/manager/pipeline_cache.rs b/src/pipeline/src/manager/pipeline_cache.rs
index f2a30857e1..2abe24e94b 100644
--- a/src/pipeline/src/manager/pipeline_cache.rs
+++ b/src/pipeline/src/manager/pipeline_cache.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 use std::time::Duration;

+use common_telemetry::debug;
 use datatypes::timestamp::TimestampNanosecond;
 use moka::sync::Cache;

@@ -33,10 +34,18 @@ const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
 /// to encapsulate inner cache. Only public methods are exposed.
 pub(crate) struct PipelineCache {
     pipelines: Cache<String, Arc<Pipeline>>,
-    original_pipelines: Cache<String, (String, TimestampNanosecond)>,
+    original_pipelines: Cache<String, PipelineContent>,
     /// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
     /// The failover cache never expires, but it will be updated when the pipelines cache is updated.
-    failover_cache: Cache<String, (String, TimestampNanosecond)>,
+    failover_cache: Cache<String, PipelineContent>,
+}
+
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub struct PipelineContent {
+    pub name: String,
+    pub content: String,
+    pub version: TimestampNanosecond,
+    pub schema: String,
 }

 impl PipelineCache {
@@ -43,13 +52,18 @@ impl PipelineCache {
     pub(crate) fn new() -> Self {
         Self {
             pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
+                .name("pipelines")
                 .build(),
             original_pipelines: Cache::builder()
                 .max_capacity(PIPELINES_CACHE_SIZE)
                 .time_to_live(PIPELINES_CACHE_TTL)
+                .name("original_pipelines")
+                .build(),
+            failover_cache: Cache::builder()
+                .max_capacity(PIPELINES_CACHE_SIZE)
+                .name("failover_cache")
                 .build(),
-            failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
         }
     }
@@ -72,19 +86,15 @@ impl PipelineCache {
         );
     }

-    pub(crate) fn insert_pipeline_str_cache(
-        &self,
-        schema: &str,
-        name: &str,
-        version: PipelineVersion,
-        pipeline: (String, TimestampNanosecond),
-        with_latest: bool,
-    ) {
+    pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
+        let schema = pipeline.schema.as_str();
+        let name = pipeline.name.as_str();
+        let version = pipeline.version;
         insert_cache_generic(
             &self.original_pipelines,
             schema,
             name,
-            version,
+            Some(version),
             pipeline.clone(),
             with_latest,
         );
@@ -91,9 +101,9 @@ impl PipelineCache {
         insert_cache_generic(
             &self.failover_cache,
             schema,
             name,
-            version,
-            pipeline,
+            Some(version),
+            pipeline.clone(),
             with_latest,
         );
     }
@@ -112,7 +122,7 @@ pub(crate) fn get_failover_cache(
         &self,
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Option<PipelineContent>> {
         get_cache_generic(&self.failover_cache, schema, name, version)
     }
@@ -121,7 +131,7 @@ pub(crate) fn get_pipeline_str_cache(
         &self,
         schema: &str,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Option<(String, TimestampNanosecond)>> {
+    ) -> Result<Option<PipelineContent>> {
         get_cache_generic(&self.original_pipelines, schema, name, version)
     }
@@ -174,13 +184,13 @@ fn get_cache_generic(
     version: PipelineVersion,
 ) -> Result<Option<T>> {
     // lets try empty schema first
-    let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
-    if let Some(value) = cache.get(&k) {
+    let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
+    if let Some(value) = cache.get(&emp_key) {
         return Ok(Some(value));
     }

     // use input schema
-    let k = generate_pipeline_cache_key(schema, name, version);
-    if let Some(value) = cache.get(&k) {
+    let schema_k = generate_pipeline_cache_key(schema, name, version);
+    if let Some(value) = cache.get(&schema_k) {
         return Ok(Some(value));
     }
@@ -193,14 +203,28 @@ fn get_cache_generic(

     match ks.len() {
         0 => Ok(None),
-        1 => Ok(Some(ks.remove(0).1)),
-        _ => MultiPipelineWithDiffSchemaSnafu {
-            schemas: ks
-                .iter()
-                .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
-                .collect::<Vec<_>>()
-                .join(","),
+        1 => {
+            let (_, value) = ks.remove(0);
+            Ok(Some(value))
+        }
+        _ => {
+            debug!(
+                "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
+                cache.iter().map(|e| e.0).collect::<Vec<_>>(),
+                emp_key,
+                schema_k,
+                suffix_key
+            );
+            MultiPipelineWithDiffSchemaSnafu {
+                name: name.to_string(),
+                current_schema: schema.to_string(),
+                schemas: ks
+                    .iter()
+                    .filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
+                    .collect::<Vec<_>>()
+                    .join(","),
+            }
+            .fail()?
         }
-        .fail()?,
     }
 }
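The builders above are the crux of the failover design: the two primary caches expire after `PIPELINES_CACHE_TTL` (10 seconds), while `failover_cache` is deliberately built without a TTL so its entries survive when the pipeline table itself becomes unreadable. A toy demonstration of that asymmetry (assumes a moka 0.12-style `sync::Cache`; the capacity and key layout are illustrative stand-ins, not the crate's actual constants):

```rust
use std::time::Duration;

use moka::sync::Cache;

fn main() {
    // TTL'd primary cache, mirroring `pipelines` / `original_pipelines`.
    let primary: Cache<String, String> = Cache::builder()
        .max_capacity(10_000) // stand-in for PIPELINES_CACHE_SIZE
        .time_to_live(Duration::from_secs(10))
        .name("pipelines")
        .build();
    // No time_to_live: mirrors `failover_cache`, which never expires.
    let failover: Cache<String, String> = Cache::builder()
        .max_capacity(10_000)
        .name("failover_cache")
        .build();

    let key = "db1/ngx_access/latest".to_string(); // illustrative key layout
    primary.insert(key.clone(), "pipeline body".to_string());
    failover.insert(key.clone(), "pipeline body".to_string());

    std::thread::sleep(Duration::from_secs(11));

    // The primary entry has expired; the failover copy is still served.
    assert!(primary.get(&key).is_none());
    assert!(failover.get(&key).is_some());
    println!("failover copy survived the TTL");
}
```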
diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs
index 6ad190cf23..24df34e506 100644
--- a/src/pipeline/src/manager/pipeline_operator.rs
+++ b/src/pipeline/src/manager/pipeline_operator.rs
@@ -220,6 +220,7 @@ impl PipelineOperator {
                     .observe(timer.elapsed().as_secs_f64())
             })
             .await
+            .map(|p| (p.content, p.version))
     }

     /// Insert a pipeline into the pipeline table.
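Taken together, `get_cache_generic` above and the schema disambiguation in table.rs below implement one resolution order: an exact hit under the empty schema wins, then a hit under the caller's schema, then a suffix scan that is only accepted when it finds a single candidate. A self-contained toy model of that order over a plain `HashMap` (the `schema/name/version` key layout follows the patch; the error value and helper names are simplified):

```rust
use std::collections::HashMap;

const EMPTY_SCHEMA_NAME: &str = "";

fn cache_key(schema: &str, name: &str, version: &str) -> String {
    format!("{schema}/{name}/{version}")
}

fn lookup(
    cache: &HashMap<String, String>,
    schema: &str,
    name: &str,
    version: &str,
) -> Result<Option<String>, String> {
    // 1. exact hit under the empty schema
    if let Some(v) = cache.get(&cache_key(EMPTY_SCHEMA_NAME, name, version)) {
        return Ok(Some(v.clone()));
    }
    // 2. exact hit under the caller's schema
    if let Some(v) = cache.get(&cache_key(schema, name, version)) {
        return Ok(Some(v.clone()));
    }
    // 3. suffix scan across all schemas; more than one hit is ambiguous
    let suffix = format!("/{name}/{version}");
    let hits: Vec<&String> = cache
        .iter()
        .filter(|(k, _)| k.ends_with(&suffix))
        .map(|(_, v)| v)
        .collect();
    match hits.len() {
        0 => Ok(None),
        1 => Ok(Some(hits[0].clone())),
        n => Err(format!("{n} pipelines named {name} under different schemas")),
    }
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(cache_key("db_a", "ngx", "latest"), "body a".to_string());
    cache.insert(cache_key("db_b", "ngx", "latest"), "body b".to_string());
    // Neither the empty schema nor "public" owns the pipeline, and the scan
    // finds two candidates: the MultiPipelineWithDiffSchema situation.
    assert!(lookup(&cache, "public", "ngx", "latest").is_err());
}
```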
diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs
index 4ca656a008..154b803163 100644
--- a/src/pipeline/src/manager/table.rs
+++ b/src/pipeline/src/manager/table.rs
@@ -44,7 +44,7 @@ use crate::error::{
     MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
 };
 use crate::etl::{parse, Content, Pipeline};
-use crate::manager::pipeline_cache::PipelineCache;
+use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
 use crate::manager::{PipelineInfo, PipelineVersion};
 use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
 use crate::util::prepare_dataframe_conditions;
@@ -258,17 +258,22 @@ impl PipelineTable {
         &self,
         schema: &str,
         name: &str,
-        version: PipelineVersion,
+        input_version: PipelineVersion,
     ) -> Result<Arc<Pipeline>> {
-        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
+        if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
             return Ok(pipeline);
         }

-        let pipeline = self.get_pipeline_str(schema, name, version).await?;
-        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
+        let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
+        let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);

-        self.cache
-            .insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
+        self.cache.insert_pipeline_cache(
+            &pipeline_content.schema,
+            name,
+            Some(pipeline_content.version),
+            compiled_pipeline.clone(),
+            input_version.is_none(),
+        );
         Ok(compiled_pipeline)
     }

@@ -278,14 +283,17 @@ impl PipelineTable {
         &self,
         schema: &str,
         name: &str,
-        version: PipelineVersion,
-    ) -> Result<(String, TimestampNanosecond)> {
-        if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
+        input_version: PipelineVersion,
+    ) -> Result<PipelineContent> {
+        if let Some(pipeline) = self
+            .cache
+            .get_pipeline_str_cache(schema, name, input_version)?
+        {
             return Ok(pipeline);
         }

         let mut pipeline_vec;
-        match self.find_pipeline(name, version).await {
+        match self.find_pipeline(name, input_version).await {
             Ok(p) => {
                 METRIC_PIPELINE_TABLE_FIND_COUNT
                     .with_label_values(&["true"])
@@ -302,8 +310,11 @@ impl PipelineTable {
                     .inc();
                 return self
                     .cache
-                    .get_failover_cache(schema, name, version)?
-                    .ok_or(PipelineNotFoundSnafu { name, version }.build());
+                    .get_failover_cache(schema, name, input_version)?
+                    .context(PipelineNotFoundSnafu {
+                        name,
+                        version: input_version,
+                    });
             }
             _ => {
                 // if other error, we should return it
@@ -314,42 +325,40 @@ impl PipelineTable {
         };
         ensure!(
             !pipeline_vec.is_empty(),
-            PipelineNotFoundSnafu { name, version }
+            PipelineNotFoundSnafu {
+                name,
+                version: input_version
+            }
         );

         // if the result is exact one, use it
         if pipeline_vec.len() == 1 {
-            let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
-            let p = (pipeline_content, version);
-            self.cache.insert_pipeline_str_cache(
-                &found_schema,
-                name,
-                Some(version),
-                p.clone(),
-                false,
-            );
-            return Ok(p);
+            let pipeline_content = pipeline_vec.remove(0);
+
+            self.cache
+                .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
+            return Ok(pipeline_content);
         }

         // check if there's empty schema pipeline
         // if there isn't, check current schema
         let pipeline = pipeline_vec
             .iter()
-            .find(|v| v.1 == EMPTY_SCHEMA_NAME)
-            .or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
+            .position(|v| v.schema == EMPTY_SCHEMA_NAME)
+            .or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
+            .map(|idx| pipeline_vec.remove(idx));

         // multiple pipeline with no empty or current schema
         // throw an error
-        let (pipeline_content, found_schema, version) =
-            pipeline.context(MultiPipelineWithDiffSchemaSnafu {
-                schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
-            })?;
+        let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
+            name: name.to_string(),
+            current_schema: schema.to_string(),
+            schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
+        })?;

-        let v = *version;
-        let p = (pipeline_content.clone(), v);
         self.cache
-            .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
-        Ok(p)
+            .insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
+        Ok(pipeline_content)
     }

     /// Insert a pipeline into the pipeline table and compile it.
@@ -376,13 +385,15 @@ impl PipelineTable {
                 true,
             );

-            self.cache.insert_pipeline_str_cache(
-                EMPTY_SCHEMA_NAME,
-                name,
-                Some(TimestampNanosecond(version)),
-                (pipeline.to_owned(), TimestampNanosecond(version)),
-                true,
-            );
+            let pipeline_content = PipelineContent {
+                name: name.to_string(),
+                content: pipeline.to_string(),
+                version: TimestampNanosecond(version),
+                schema: EMPTY_SCHEMA_NAME.to_string(),
+            };
+
+            self.cache
+                .insert_pipeline_str_cache(&pipeline_content, true);
         }

         Ok((version, compiled_pipeline))
@@ -472,7 +483,7 @@ async fn find_pipeline(
         &self,
         name: &str,
         version: PipelineVersion,
-    ) -> Result<Vec<(String, String, TimestampNanosecond)>> {
+    ) -> Result<Vec<PipelineContent>> {
         // 1. prepare dataframe
         let dataframe = self
             .query_engine
@@ -575,11 +586,12 @@ impl PipelineTable {
         let len = pipeline_content.len();

         for i in 0..len {
-            re.push((
-                pipeline_content.get_data(i).unwrap().to_string(),
-                pipeline_schema.get_data(i).unwrap().to_string(),
-                pipeline_created_at.get_data(i).unwrap(),
-            ));
+            re.push(PipelineContent {
+                name: name.to_string(),
+                content: pipeline_content.get_data(i).unwrap().to_string(),
+                version: pipeline_created_at.get_data(i).unwrap(),
+                schema: pipeline_schema.get_data(i).unwrap().to_string(),
+            });
         }
     }
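The recurring `input_version.is_none()` argument above is the heart of the loading fix: a freshly fetched pipeline is always cached under its concrete version, and is additionally published under the un-versioned "latest" slot only when the caller did not pin a version. A hypothetical sketch of that bookkeeping (the key encoding and the `latest` marker are illustrative, not the crate's actual `generate_pipeline_cache_key` output):

```rust
use std::collections::HashMap;

type Version = u64; // stand-in for TimestampNanosecond

fn cache_key(schema: &str, name: &str, version: Option<Version>) -> String {
    match version {
        Some(v) => format!("{schema}/{name}/{v}"),
        None => format!("{schema}/{name}/latest"),
    }
}

fn insert_str_cache(
    cache: &mut HashMap<String, String>,
    schema: &str,
    name: &str,
    version: Version,
    content: &str,
    with_latest: bool, // callers pass input_version.is_none()
) {
    // Always cached under the concrete version...
    cache.insert(cache_key(schema, name, Some(version)), content.to_string());
    // ...but only an un-pinned read may overwrite what "latest" points to.
    if with_latest {
        cache.insert(cache_key(schema, name, None), content.to_string());
    }
}

fn main() {
    let mut cache = HashMap::new();
    // An un-pinned read of the newest pipeline marks it as latest.
    insert_str_cache(&mut cache, "db1", "ngx", 2, "v2 body", true);
    // A later pinned read of an older version must not clobber "latest".
    insert_str_cache(&mut cache, "db1", "ngx", 1, "v1 body", false);
    assert_eq!(cache["db1/ngx/latest"], "v2 body");
}
```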
diff --git a/src/query/src/dist_plan/commutativity.rs b/src/query/src/dist_plan/commutativity.rs
index e8ece184fc..a3394cdbbb 100644
--- a/src/query/src/dist_plan/commutativity.rs
+++ b/src/query/src/dist_plan/commutativity.rs
@@ -263,7 +263,7 @@ impl Categorizer {
             }
         }
         // all group by expressions are partition columns can push down, unless
-        // another push down(including `Limit` or `Sort`) is already in progress(which will then prvent next cond commutative node from being push down).
+        // another push down(including `Limit` or `Sort`) is already in progress(which will then prevent next cond commutative node from being push down).
         // TODO(discord9): This is a temporary solution(that works), a better description of
         // commutativity is needed under this situation.
         Commutativity::ConditionalCommutative(None)
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index 09f5e25d9d..c6737dad69 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -203,7 +203,7 @@ impl QueryEngineState {
         rules.retain(|rule| rule.name() != name);
     }

-    /// Optimize the logical plan by the extension anayzer rules.
+    /// Optimize the logical plan by the extension analyzer rules.
     pub fn optimize_by_extension_rules(
         &self,
         plan: DfLogicalPlan,