From acbaa4c599dece63a7b77ac6a17ae81f75bf790f Mon Sep 17 00:00:00 2001 From: shuiyisong Date: Fri, 26 Dec 2025 19:12:08 +0800 Subject: [PATCH] fix: load with latest Signed-off-by: shuiyisong --- src/frontend/src/instance/log_handler.rs | 1 + src/pipeline/src/manager/pipeline_cache.rs | 21 ++++++++++++------- src/pipeline/src/manager/pipeline_operator.rs | 2 +- src/pipeline/src/manager/table.rs | 12 +++++------ 4 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 179d5e098f..3f12df9612 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -115,6 +115,7 @@ impl PipelineHandler for Instance { .get_pipeline_str(name, version, query_ctx) .await .context(PipelineSnafu) + .map(|(p, _)| p) } } diff --git a/src/pipeline/src/manager/pipeline_cache.rs b/src/pipeline/src/manager/pipeline_cache.rs index ecd9bd9224..e3b7fda155 100644 --- a/src/pipeline/src/manager/pipeline_cache.rs +++ b/src/pipeline/src/manager/pipeline_cache.rs @@ -109,7 +109,7 @@ impl PipelineCache { schema: &str, name: &str, version: PipelineVersion, - ) -> Result>> { + ) -> Result, String)>> { get_cache_generic(&self.pipelines, schema, name, version) } @@ -118,7 +118,7 @@ impl PipelineCache { schema: &str, name: &str, version: PipelineVersion, - ) -> Result> { + ) -> Result> { get_cache_generic(&self.failover_cache, schema, name, version) } @@ -127,7 +127,7 @@ impl PipelineCache { schema: &str, name: &str, version: PipelineVersion, - ) -> Result> { + ) -> Result> { get_cache_generic(&self.original_pipelines, schema, name, version) } @@ -178,16 +178,16 @@ fn get_cache_generic( schema: &str, name: &str, version: PipelineVersion, -) -> Result> { +) -> Result> { // lets try empty schema first let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version); if let Some(value) = cache.get(&emp_key) { - return Ok(Some(value)); + return Ok(Some((value, EMPTY_SCHEMA_NAME.to_string()))); } // use input schema let schema_k = generate_pipeline_cache_key(schema, name, version); if let Some(value) = cache.get(&schema_k) { - return Ok(Some(value)); + return Ok(Some((value, schema.to_string()))); } // try all schemas @@ -199,7 +199,14 @@ fn get_cache_generic( match ks.len() { 0 => Ok(None), - 1 => Ok(Some(ks.remove(0).1)), + 1 => { + let (key, value) = ks.remove(0); + if let Some((key_schema, _)) = key.split_once("/") { + Ok(Some((value, key_schema.to_string()))) + } else { + Ok(Some((value, EMPTY_SCHEMA_NAME.to_string()))) + } + } _ => { debug!( "caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}", diff --git a/src/pipeline/src/manager/pipeline_operator.rs b/src/pipeline/src/manager/pipeline_operator.rs index 6ad190cf23..b6317fa13c 100644 --- a/src/pipeline/src/manager/pipeline_operator.rs +++ b/src/pipeline/src/manager/pipeline_operator.rs @@ -205,7 +205,7 @@ impl PipelineOperator { name: &str, version: PipelineVersion, query_ctx: QueryContextRef, - ) -> Result<(String, TimestampNanosecond)> { + ) -> Result<((String, TimestampNanosecond), String)> { let schema = query_ctx.current_schema(); self.create_pipeline_table_if_not_exists(query_ctx.clone()) .await?; diff --git a/src/pipeline/src/manager/table.rs b/src/pipeline/src/manager/table.rs index 53db9c1c26..0e130b0bb7 100644 --- a/src/pipeline/src/manager/table.rs +++ b/src/pipeline/src/manager/table.rs @@ -261,14 +261,14 @@ impl PipelineTable { version: PipelineVersion, ) -> Result> { if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? { - return Ok(pipeline); + return Ok(pipeline.0); } - let pipeline = self.get_pipeline_str(schema, name, version).await?; + let (pipeline, found_schema) = self.get_pipeline_str(schema, name, version).await?; let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?); self.cache.insert_pipeline_cache( - schema, + &found_schema, name, version, compiled_pipeline.clone(), @@ -284,7 +284,7 @@ impl PipelineTable { schema: &str, name: &str, input_version: PipelineVersion, - ) -> Result<(String, TimestampNanosecond)> { + ) -> Result<((String, TimestampNanosecond), String)> { if let Some(pipeline) = self .cache .get_pipeline_str_cache(schema, name, input_version)? @@ -345,7 +345,7 @@ impl PipelineTable { p.clone(), input_version.is_none(), ); - return Ok(p); + return Ok((p, found_schema)); } // check if there's empty schema pipeline @@ -374,7 +374,7 @@ impl PipelineTable { p.clone(), input_version.is_none(), ); - Ok(p) + Ok((p, found_schema.clone())) } /// Insert a pipeline into the pipeline table and compile it.