From 3aa82567c44a3b0c408dc0be43ba4b032171c7ce Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 1 Jul 2026 18:02:57 +0800 Subject: [PATCH] fix: invalidate comment DDL cache and lock by object ID (#8390) * fix: invalidate comment ddl cache locally Signed-off-by: WenyXu * fix: fix typos Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/ddl/comment_on.rs | 106 +++++++++++++++----------- src/common/meta/src/ddl_manager.rs | 8 +- src/common/meta/src/rpc/ddl.rs | 101 ++++++++++++++++++++++-- src/operator/src/statement/comment.rs | 47 ++++++++++-- 4 files changed, 206 insertions(+), 56 deletions(-) diff --git a/src/common/meta/src/ddl/comment_on.rs b/src/common/meta/src/ddl/comment_on.rs index 65c2cbc0ec..c4970abb83 100644 --- a/src/common/meta/src/ddl/comment_on.rs +++ b/src/common/meta/src/ddl/comment_on.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use async_trait::async_trait; use chrono::Utc; -use common_catalog::format_full_table_name; +use common_catalog::{format_full_flow_name, format_full_table_name}; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::tracing::info; @@ -38,8 +38,8 @@ use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue}; use crate::key::table_info::{TableInfoKey, TableInfoValue}; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue}; -use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock}; -use crate::rpc::ddl::{CommentObjectType, CommentOnTask}; +use crate::lock_key::{CatalogLock, FlowLock, FlowNameLock, SchemaLock, TableLock, TableNameLock}; +use crate::rpc::ddl::{CommentObjectId, CommentObjectType, CommentOnTask}; use crate::rpc::store::PutRequest; pub struct CommentOnProcedure { @@ -106,26 +106,29 @@ impl CommentOnProcedure { } async fn prepare_table_or_column(&mut self) -> Result<()> { - let table_name_key = TableNameKey::new( - &self.data.catalog_name, - &self.data.schema_name, - &self.data.object_name, - ); + let table_id = if let Some(table_id) = self.data.table_id { + table_id + } else { + let table_name_key = TableNameKey::new( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ); - let table_id = self - .context - .table_metadata_manager - .table_name_manager() - .get(table_name_key) - .await? - .with_context(|| TableNotFoundSnafu { - table_name: format_full_table_name( - &self.data.catalog_name, - &self.data.schema_name, - &self.data.object_name, - ), - })? - .table_id(); + self.context + .table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: format_full_table_name( + &self.data.catalog_name, + &self.data.schema_name, + &self.data.object_name, + ), + })? + .table_id() + }; let table_info = self .context @@ -198,17 +201,22 @@ impl CommentOnProcedure { } async fn prepare_flow(&mut self) -> Result<()> { - let flow_name_value = self - .context - .flow_metadata_manager - .flow_name_manager() - .get(&self.data.catalog_name, &self.data.object_name) - .await? - .with_context(|| FlowNotFoundSnafu { - flow_name: &self.data.object_name, - })?; - - let flow_id = flow_name_value.flow_id(); + let flow_id = if let Some(flow_id) = self.data.flow_id { + flow_id + } else { + self.context + .flow_metadata_manager + .flow_name_manager() + .get(&self.data.catalog_name, &self.data.object_name) + .await? + .with_context(|| FlowNotFoundSnafu { + flow_name: format_full_flow_name( + &self.data.catalog_name, + &self.data.object_name, + ), + })? + .flow_id() + }; let flow_info = self .context .flow_metadata_manager @@ -216,7 +224,7 @@ impl CommentOnProcedure { .get_raw(flow_id) .await? .with_context(|| FlowNotFoundSnafu { - flow_name: &self.data.object_name, + flow_name: format_full_flow_name(&self.data.catalog_name, &self.data.object_name), })?; self.data.flow_id = Some(flow_id); @@ -411,17 +419,23 @@ impl Procedure for CommentOnProcedure { let lock_key = match self.data.object_type { CommentObjectType::Table | CommentObjectType::Column => { - vec![ + let mut lock_key = vec![ CatalogLock::Read(catalog).into(), SchemaLock::read(catalog, schema).into(), - TableNameLock::new(catalog, schema, &self.data.object_name).into(), - ] + ]; + if let Some(table_id) = self.data.table_id { + lock_key.push(TableLock::Write(table_id).into()); + } + lock_key.push(TableNameLock::new(catalog, schema, &self.data.object_name).into()); + lock_key } CommentObjectType::Flow => { - vec![ - CatalogLock::Read(catalog).into(), - FlowNameLock::new(catalog, &self.data.object_name).into(), - ] + let mut lock_key = vec![CatalogLock::Read(catalog).into()]; + if let Some(flow_id) = self.data.flow_id { + lock_key.push(FlowLock::Write(flow_id).into()); + } + lock_key.push(FlowNameLock::new(catalog, &self.data.object_name).into()); + lock_key } }; @@ -466,6 +480,12 @@ pub struct CommentOnData { impl CommentOnData { pub fn new(task: CommentOnTask) -> Self { + let (table_id, flow_id) = match task.object_id { + Some(CommentObjectId::Table(table_id)) => (Some(table_id), None), + Some(CommentObjectId::Flow(flow_id)) => (None, Some(flow_id)), + None => (None, None), + }; + Self { state: CommentOnState::Prepare, catalog_name: task.catalog_name, @@ -474,9 +494,9 @@ impl CommentOnData { object_name: task.object_name, column_name: task.column_name, comment: task.comment, - table_id: None, + table_id, table_info: None, - flow_id: None, + flow_id, flow_info: None, is_unchanged: false, } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index a6192f06ba..a92c9b679a 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -574,9 +574,15 @@ impl DdlManager { #[tracing::instrument(skip_all)] pub async fn submit_comment_on_task( &self, - comment_on_task: CommentOnTask, + mut comment_on_task: CommentOnTask, ) -> Result<(ProcedureId, Option)> { let context = self.create_context(); + comment_on_task + .enrich_object_id( + context.table_metadata_manager.table_name_manager(), + context.flow_metadata_manager.flow_name_manager(), + ) + .await?; let procedure = CommentOnProcedure::new(comment_on_task, context); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 1cca31c364..fd11e4c310 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -41,6 +41,7 @@ use api::v1::{ }; use base64::Engine as _; use base64::engine::general_purpose; +use common_catalog::{format_full_flow_name, format_full_table_name}; use common_error::ext::BoxedError; use common_time::{DatabaseTimeToLive, Timestamp}; use prost::Message; @@ -56,7 +57,11 @@ use crate::error::{ self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result, }; +use crate::flow_name::FlowName; +use crate::instruction::CacheIdent; use crate::key::FlowId; +use crate::key::flow::flow_name::FlowNameManager; +use crate::key::table_name::{TableNameKey, TableNameManager}; /// Reserved query-context extension key for the frontend peer address that submitted a DDL request. pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr"; @@ -1351,13 +1356,99 @@ pub enum CommentObjectType { } impl CommentOnTask { - pub fn table_ref(&self) -> TableReference<'_> { - TableReference { - catalog: &self.catalog_name, - schema: &self.schema_name, - table: &self.object_name, + pub fn table_id(&self) -> Option { + match self.object_id.as_ref() { + Some(CommentObjectId::Table(table_id)) => Some(*table_id), + _ => None, } } + + pub fn flow_id(&self) -> Option { + match self.object_id.as_ref() { + Some(CommentObjectId::Flow(flow_id)) => Some(*flow_id), + _ => None, + } + } + + fn set_table_id(&mut self, table_id: TableId) { + self.object_id = Some(CommentObjectId::Table(table_id)); + } + + fn set_flow_id(&mut self, flow_id: FlowId) { + self.object_id = Some(CommentObjectId::Flow(flow_id)); + } + + /// Returns the cache identifiers for the object being commented on. + pub fn cache_idents(&self) -> Vec { + match self.object_type { + CommentObjectType::Table | CommentObjectType::Column => { + let mut cache_idents = Vec::with_capacity(2); + if let Some(CommentObjectId::Table(table_id)) = self.object_id.as_ref() { + cache_idents.push(CacheIdent::TableId(*table_id)); + } + cache_idents.push(CacheIdent::TableName(TableName { + catalog_name: self.catalog_name.clone(), + schema_name: self.schema_name.clone(), + table_name: self.object_name.clone(), + })); + cache_idents + } + CommentObjectType::Flow => { + let mut cache_idents = Vec::with_capacity(2); + if let Some(CommentObjectId::Flow(flow_id)) = self.object_id.as_ref() { + cache_idents.push(CacheIdent::FlowId(*flow_id)); + } + cache_idents.push(CacheIdent::FlowName(FlowName { + catalog_name: self.catalog_name.clone(), + flow_name: self.object_name.clone(), + })); + cache_idents + } + } + } + + /// Enriches the `object_id` field of the `CommentOnTask` + /// by looking up the corresponding table or flow ID using the provided managers. + pub async fn enrich_object_id( + &mut self, + table_name_manager: &TableNameManager, + flow_name_manager: &FlowNameManager, + ) -> Result<()> { + match self.object_type { + CommentObjectType::Table | CommentObjectType::Column => { + let table_id = table_name_manager + .get(TableNameKey::new( + &self.catalog_name, + &self.schema_name, + &self.object_name, + )) + .await? + .with_context(|| error::TableNotFoundSnafu { + table_name: format_full_table_name( + &self.catalog_name, + &self.schema_name, + &self.object_name, + ), + })? + .table_id(); + + self.set_table_id(table_id); + } + CommentObjectType::Flow => { + let flow_id = flow_name_manager + .get(&self.catalog_name, &self.object_name) + .await? + .with_context(|| error::FlowNotFoundSnafu { + flow_name: format_full_flow_name(&self.catalog_name, &self.object_name), + })? + .flow_id(); + + self.set_flow_id(flow_id); + } + } + + Ok(()) + } } // Proto conversions for CommentObjectType diff --git a/src/operator/src/statement/comment.rs b/src/operator/src/statement/comment.rs index 2a5c48ffbb..dc3eff0b8d 100644 --- a/src/operator/src/statement/comment.rs +++ b/src/operator/src/statement/comment.rs @@ -14,6 +14,7 @@ use api::v1::CommentOnExpr; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::Context; use common_meta::procedure_executor::ExecutorContext; use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest}; use common_query::Output; @@ -23,7 +24,9 @@ use snafu::ResultExt; use sql::ast::ObjectNamePartExt; use sql::statements::comment::{Comment, CommentObject}; -use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result}; +use crate::error::{ + self, ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu, +}; use crate::statement::StatementExecutor; use crate::utils::to_meta_query_context; @@ -39,7 +42,15 @@ impl StatementExecutor { /// /// A `Result` containing the `Output` of the operation, or an error if the operation fails. pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result { - let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?; + let mut comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?; + comment_on_task + .enrich_object_id( + self.table_metadata_manager.table_name_manager(), + self.flow_metadata_manager.flow_name_manager(), + ) + .await + .context(TableMetadataManagerSnafu)?; + let cache_idents = comment_on_task.cache_idents(); let request = SubmitDdlTaskRequest::new( to_meta_query_context(query_ctx), @@ -49,8 +60,15 @@ impl StatementExecutor { self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) .await - .context(ExecuteDdlSnafu) - .map(|_| Output::new_with_affected_rows(0)) + .context(ExecuteDdlSnafu)?; + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate(&Context::default(), &cache_idents) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(Output::new_with_affected_rows(0)) } pub async fn comment_by_expr( @@ -58,7 +76,15 @@ impl StatementExecutor { expr: CommentOnExpr, query_ctx: QueryContextRef, ) -> Result { - let comment_on_task = self.create_comment_on_task_from_expr(expr)?; + let mut comment_on_task = self.create_comment_on_task_from_expr(expr)?; + comment_on_task + .enrich_object_id( + self.table_metadata_manager.table_name_manager(), + self.flow_metadata_manager.flow_name_manager(), + ) + .await + .context(TableMetadataManagerSnafu)?; + let cache_idents = comment_on_task.cache_idents(); let request = SubmitDdlTaskRequest::new( to_meta_query_context(query_ctx), @@ -68,8 +94,15 @@ impl StatementExecutor { self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) .await - .context(ExecuteDdlSnafu) - .map(|_| Output::new_with_affected_rows(0)) + .context(ExecuteDdlSnafu)?; + + // Invalidates local cache ASAP. + self.cache_invalidator + .invalidate(&Context::default(), &cache_idents) + .await + .context(error::InvalidateTableCacheSnafu)?; + + Ok(Output::new_with_affected_rows(0)) } fn create_comment_on_task_from_expr(&self, expr: CommentOnExpr) -> Result {