From 4b3e60c1d3049d7fd9e4c9a74ed72e08929670a2 Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Tue, 23 Dec 2025 20:06:23 +0800 Subject: [PATCH] fix: load with latest Signed-off-by: shuiyisong --- src/pipeline/src/error.rs | 8 +++-- src/pipeline/src/manager/pipeline_cache.rs | 41 +++++++++++++++------- src/pipeline/src/manager/table.rs | 31 ++++++++++++---- 3 files changed, 59 insertions(+), 21 deletions(-) diff --git a/src/pipeline/src/error.rs b/src/pipeline/src/error.rs index 1ded1064f9..709f1fc673 100644 --- a/src/pipeline/src/error.rs +++ b/src/pipeline/src/error.rs @@ -646,10 +646,14 @@ pub enum Error { }, #[snafu(display( - "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}", - schemas + "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}", + name, + current_schema, + schemas, ))] MultiPipelineWithDiffSchema { + name: String, + current_schema: String, schemas: String, #[snafu(implicit)] location: Location, diff --git a/src/pipeline/src/manager/pipeline_cache.rs b/src/pipeline/src/manager/pipeline_cache.rs index f2a30857e1..ecd9bd9224 100644 --- a/src/pipeline/src/manager/pipeline_cache.rs +++ b/src/pipeline/src/manager/pipeline_cache.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use std::time::Duration; +use common_telemetry::debug; use datatypes::timestamp::TimestampNanosecond; use moka::sync::Cache; @@ -45,12 +46,17 @@ impl PipelineCache { pipelines: Cache::builder() .max_capacity(PIPELINES_CACHE_SIZE) .time_to_live(PIPELINES_CACHE_TTL) + .name("pipelines") .build(), original_pipelines: Cache::builder() .max_capacity(PIPELINES_CACHE_SIZE) .time_to_live(PIPELINES_CACHE_TTL) + .name("original_pipelines") + .build(), + failover_cache: Cache::builder() + .max_capacity(PIPELINES_CACHE_SIZE) + .name("failover_cache") .build(), - failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(), } } @@ -174,13 +180,13 @@ fn get_cache_generic( version: PipelineVersion, ) -> Result> { // lets try empty schema first - let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version); - if let Some(value) = cache.get(&k) { + let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version); + if let Some(value) = cache.get(&emp_key) { return Ok(Some(value)); } // use input schema - let k = generate_pipeline_cache_key(schema, name, version); - if let Some(value) = cache.get(&k) { + let schema_k = generate_pipeline_cache_key(schema, name, version); + if let Some(value) = cache.get(&schema_k) { return Ok(Some(value)); } @@ -194,13 +200,24 @@ fn get_cache_generic( match ks.len() { 0 => Ok(None), 1 => Ok(Some(ks.remove(0).1)), - _ => MultiPipelineWithDiffSchemaSnafu { - schemas: ks - .iter() - .filter_map(|(k, _)| k.split_once('/').map(|k| k.0)) - .collect::>() - .join(","), + _ => { + debug!( + "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}", + cache.iter().map(|e| e.0).collect::>(), + emp_key, + schema_k, + suffix_key + ); + MultiPipelineWithDiffSchemaSnafu { + name: name.to_string(), + current_schema: schema.to_string(), + schemas: ks + .iter() + .filter_map(|(k, _)| k.split_once('/').map(|k| k.0)) + .collect::>() + .join(","), + } + .fail()? } - .fail()?, } } diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 4ca656a008..6af84a2c14 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -278,14 +278,17 @@ impl PipelineTable { &self, schema: &str, name: &str, - version: PipelineVersion, + input_version: PipelineVersion, ) -> Result<(String, TimestampNanosecond)> { - if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? { + if let Some(pipeline) = self + .cache + .get_pipeline_str_cache(schema, name, input_version)? + { return Ok(pipeline); } let mut pipeline_vec; - match self.find_pipeline(name, version).await { + match self.find_pipeline(name, input_version).await { Ok(p) => { METRIC_PIPELINE_TABLE_FIND_COUNT .with_label_values(&["true"]) @@ -302,8 +305,14 @@ impl PipelineTable { .inc(); return self .cache - .get_failover_cache(schema, name, version)? - .ok_or(PipelineNotFoundSnafu { name, version }.build()); + .get_failover_cache(schema, name, input_version)? + .ok_or( + PipelineNotFoundSnafu { + name, + version: input_version, + } + .build(), + ); } _ => { // if other error, we should return it @@ -314,7 +323,10 @@ impl PipelineTable { }; ensure!( !pipeline_vec.is_empty(), - PipelineNotFoundSnafu { name, version } + PipelineNotFoundSnafu { + name, + version: input_version + } ); // if the result is exact one, use it @@ -342,13 +354,18 @@ impl PipelineTable { // throw an error let (pipeline_content, found_schema, version) = pipeline.context(MultiPipelineWithDiffSchemaSnafu { + name: name.to_string(), + current_schema: schema.to_string(), schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","), })?; let v = *version; let p = (pipeline_content.clone(), v); + + let with_latest = input_version.is_none(); + self.cache - .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false); + .insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), with_latest); Ok(p) }