fix: invalidate comment DDL cache and lock by object ID (#8390)

* fix: invalidate comment ddl cache locally

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix typos

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-07-01 18:02:57 +08:00
committed by GitHub
parent 8a11ee9462
commit 3aa82567c4
4 changed files with 206 additions and 56 deletions

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use chrono::Utc;
use common_catalog::format_full_table_name;
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use common_telemetry::tracing::info;
@@ -38,8 +38,8 @@ use crate::key::flow::flow_info::{FlowInfoKey, FlowInfoValue};
use crate::key::table_info::{TableInfoKey, TableInfoValue};
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, FlowId, MetadataKey, MetadataValue};
use crate::lock_key::{CatalogLock, FlowNameLock, SchemaLock, TableNameLock};
use crate::rpc::ddl::{CommentObjectType, CommentOnTask};
use crate::lock_key::{CatalogLock, FlowLock, FlowNameLock, SchemaLock, TableLock, TableNameLock};
use crate::rpc::ddl::{CommentObjectId, CommentObjectType, CommentOnTask};
use crate::rpc::store::PutRequest;
pub struct CommentOnProcedure {
@@ -106,26 +106,29 @@ impl CommentOnProcedure {
}
async fn prepare_table_or_column(&mut self) -> Result<()> {
let table_name_key = TableNameKey::new(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
);
let table_id = if let Some(table_id) = self.data.table_id {
table_id
} else {
let table_name_key = TableNameKey::new(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
);
let table_id = self
.context
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?
.table_id();
self.context
.table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format_full_table_name(
&self.data.catalog_name,
&self.data.schema_name,
&self.data.object_name,
),
})?
.table_id()
};
let table_info = self
.context
@@ -198,17 +201,22 @@ impl CommentOnProcedure {
}
async fn prepare_flow(&mut self) -> Result<()> {
let flow_name_value = self
.context
.flow_metadata_manager
.flow_name_manager()
.get(&self.data.catalog_name, &self.data.object_name)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
})?;
let flow_id = flow_name_value.flow_id();
let flow_id = if let Some(flow_id) = self.data.flow_id {
flow_id
} else {
self.context
.flow_metadata_manager
.flow_name_manager()
.get(&self.data.catalog_name, &self.data.object_name)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: format_full_flow_name(
&self.data.catalog_name,
&self.data.object_name,
),
})?
.flow_id()
};
let flow_info = self
.context
.flow_metadata_manager
@@ -216,7 +224,7 @@ impl CommentOnProcedure {
.get_raw(flow_id)
.await?
.with_context(|| FlowNotFoundSnafu {
flow_name: &self.data.object_name,
flow_name: format_full_flow_name(&self.data.catalog_name, &self.data.object_name),
})?;
self.data.flow_id = Some(flow_id);
@@ -411,17 +419,23 @@ impl Procedure for CommentOnProcedure {
let lock_key = match self.data.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
vec![
let mut lock_key = vec![
CatalogLock::Read(catalog).into(),
SchemaLock::read(catalog, schema).into(),
TableNameLock::new(catalog, schema, &self.data.object_name).into(),
]
];
if let Some(table_id) = self.data.table_id {
lock_key.push(TableLock::Write(table_id).into());
}
lock_key.push(TableNameLock::new(catalog, schema, &self.data.object_name).into());
lock_key
}
CommentObjectType::Flow => {
vec![
CatalogLock::Read(catalog).into(),
FlowNameLock::new(catalog, &self.data.object_name).into(),
]
let mut lock_key = vec![CatalogLock::Read(catalog).into()];
if let Some(flow_id) = self.data.flow_id {
lock_key.push(FlowLock::Write(flow_id).into());
}
lock_key.push(FlowNameLock::new(catalog, &self.data.object_name).into());
lock_key
}
};
@@ -466,6 +480,12 @@ pub struct CommentOnData {
impl CommentOnData {
pub fn new(task: CommentOnTask) -> Self {
let (table_id, flow_id) = match task.object_id {
Some(CommentObjectId::Table(table_id)) => (Some(table_id), None),
Some(CommentObjectId::Flow(flow_id)) => (None, Some(flow_id)),
None => (None, None),
};
Self {
state: CommentOnState::Prepare,
catalog_name: task.catalog_name,
@@ -474,9 +494,9 @@ impl CommentOnData {
object_name: task.object_name,
column_name: task.column_name,
comment: task.comment,
table_id: None,
table_id,
table_info: None,
flow_id: None,
flow_id,
flow_info: None,
is_unchanged: false,
}

View File

@@ -574,9 +574,15 @@ impl DdlManager {
#[tracing::instrument(skip_all)]
pub async fn submit_comment_on_task(
&self,
comment_on_task: CommentOnTask,
mut comment_on_task: CommentOnTask,
) -> Result<(ProcedureId, Option<Output>)> {
let context = self.create_context();
comment_on_task
.enrich_object_id(
context.table_metadata_manager.table_name_manager(),
context.flow_metadata_manager.flow_name_manager(),
)
.await?;
let procedure = CommentOnProcedure::new(comment_on_task, context);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

View File

@@ -41,6 +41,7 @@ use api::v1::{
};
use base64::Engine as _;
use base64::engine::general_purpose;
use common_catalog::{format_full_flow_name, format_full_table_name};
use common_error::ext::BoxedError;
use common_time::{DatabaseTimeToLive, Timestamp};
use prost::Message;
@@ -56,7 +57,11 @@ use crate::error::{
self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
InvalidUnsetDatabaseOptionSnafu, Result,
};
use crate::flow_name::FlowName;
use crate::instruction::CacheIdent;
use crate::key::FlowId;
use crate::key::flow::flow_name::FlowNameManager;
use crate::key::table_name::{TableNameKey, TableNameManager};
/// Reserved query-context extension key for the frontend peer address that submitted a DDL request.
pub const ORIGIN_FRONTEND_ADDR_EXTENSION_KEY: &str = "__greptime_origin_frontend.addr";
@@ -1351,13 +1356,99 @@ pub enum CommentObjectType {
}
impl CommentOnTask {
pub fn table_ref(&self) -> TableReference<'_> {
TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: &self.object_name,
pub fn table_id(&self) -> Option<TableId> {
match self.object_id.as_ref() {
Some(CommentObjectId::Table(table_id)) => Some(*table_id),
_ => None,
}
}
pub fn flow_id(&self) -> Option<FlowId> {
match self.object_id.as_ref() {
Some(CommentObjectId::Flow(flow_id)) => Some(*flow_id),
_ => None,
}
}
fn set_table_id(&mut self, table_id: TableId) {
self.object_id = Some(CommentObjectId::Table(table_id));
}
fn set_flow_id(&mut self, flow_id: FlowId) {
self.object_id = Some(CommentObjectId::Flow(flow_id));
}
/// Returns the cache identifiers for the object being commented on.
pub fn cache_idents(&self) -> Vec<CacheIdent> {
match self.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
let mut cache_idents = Vec::with_capacity(2);
if let Some(CommentObjectId::Table(table_id)) = self.object_id.as_ref() {
cache_idents.push(CacheIdent::TableId(*table_id));
}
cache_idents.push(CacheIdent::TableName(TableName {
catalog_name: self.catalog_name.clone(),
schema_name: self.schema_name.clone(),
table_name: self.object_name.clone(),
}));
cache_idents
}
CommentObjectType::Flow => {
let mut cache_idents = Vec::with_capacity(2);
if let Some(CommentObjectId::Flow(flow_id)) = self.object_id.as_ref() {
cache_idents.push(CacheIdent::FlowId(*flow_id));
}
cache_idents.push(CacheIdent::FlowName(FlowName {
catalog_name: self.catalog_name.clone(),
flow_name: self.object_name.clone(),
}));
cache_idents
}
}
}
/// Enriches the `object_id` field of the `CommentOnTask`
/// by looking up the corresponding table or flow ID using the provided managers.
pub async fn enrich_object_id(
&mut self,
table_name_manager: &TableNameManager,
flow_name_manager: &FlowNameManager,
) -> Result<()> {
match self.object_type {
CommentObjectType::Table | CommentObjectType::Column => {
let table_id = table_name_manager
.get(TableNameKey::new(
&self.catalog_name,
&self.schema_name,
&self.object_name,
))
.await?
.with_context(|| error::TableNotFoundSnafu {
table_name: format_full_table_name(
&self.catalog_name,
&self.schema_name,
&self.object_name,
),
})?
.table_id();
self.set_table_id(table_id);
}
CommentObjectType::Flow => {
let flow_id = flow_name_manager
.get(&self.catalog_name, &self.object_name)
.await?
.with_context(|| error::FlowNotFoundSnafu {
flow_name: format_full_flow_name(&self.catalog_name, &self.object_name),
})?
.flow_id();
self.set_flow_id(flow_id);
}
}
Ok(())
}
}
// Proto conversions for CommentObjectType

View File

@@ -14,6 +14,7 @@
use api::v1::CommentOnExpr;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::procedure_executor::ExecutorContext;
use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest};
use common_query::Output;
@@ -23,7 +24,9 @@ use snafu::ResultExt;
use sql::ast::ObjectNamePartExt;
use sql::statements::comment::{Comment, CommentObject};
use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result};
use crate::error::{
self, ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result, TableMetadataManagerSnafu,
};
use crate::statement::StatementExecutor;
use crate::utils::to_meta_query_context;
@@ -39,7 +42,15 @@ impl StatementExecutor {
///
/// A `Result` containing the `Output` of the operation, or an error if the operation fails.
pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?;
let mut comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?;
comment_on_task
.enrich_object_id(
self.table_metadata_manager.table_name_manager(),
self.flow_metadata_manager.flow_name_manager(),
)
.await
.context(TableMetadataManagerSnafu)?;
let cache_idents = comment_on_task.cache_idents();
let request = SubmitDdlTaskRequest::new(
to_meta_query_context(query_ctx),
@@ -49,8 +60,15 @@ impl StatementExecutor {
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
.map(|_| Output::new_with_affected_rows(0))
.context(ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &cache_idents)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
pub async fn comment_by_expr(
@@ -58,7 +76,15 @@ impl StatementExecutor {
expr: CommentOnExpr,
query_ctx: QueryContextRef,
) -> Result<Output> {
let comment_on_task = self.create_comment_on_task_from_expr(expr)?;
let mut comment_on_task = self.create_comment_on_task_from_expr(expr)?;
comment_on_task
.enrich_object_id(
self.table_metadata_manager.table_name_manager(),
self.flow_metadata_manager.flow_name_manager(),
)
.await
.context(TableMetadataManagerSnafu)?;
let cache_idents = comment_on_task.cache_idents();
let request = SubmitDdlTaskRequest::new(
to_meta_query_context(query_ctx),
@@ -68,8 +94,15 @@ impl StatementExecutor {
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(ExecuteDdlSnafu)
.map(|_| Output::new_with_affected_rows(0))
.context(ExecuteDdlSnafu)?;
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate(&Context::default(), &cache_idents)
.await
.context(error::InvalidateTableCacheSnafu)?;
Ok(Output::new_with_affected_rows(0))
}
fn create_comment_on_task_from_expr(&self, expr: CommentOnExpr) -> Result<CommentOnTask> {