From 7953b090c0fae128d1f7acac3cbba5cdfbd6a31e Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Wed, 25 Jun 2025 02:48:39 +0800 Subject: [PATCH] feat: support syntax parsing of drop trigger (#6371) * feat: trigger drop * Update Cargo.toml * Update Cargo.lock --------- Co-authored-by: Ning Sun --- Cargo.lock | 2 +- Cargo.toml | 2 +- licenserc.toml | 1 + src/common/meta/src/ddl_manager.rs | 47 ++++++++++++++-- src/common/meta/src/rpc/ddl.rs | 16 ++++++ src/common/meta/src/rpc/ddl/trigger.rs | 78 +++++++++++++++++++++++++- src/frontend/src/instance.rs | 2 + src/operator/src/statement.rs | 10 ++++ src/operator/src/statement/ddl.rs | 37 ++++++++++++ src/sql/src/error.rs | 11 ++++ src/sql/src/parsers/drop_parser.rs | 29 ++++++++++ src/sql/src/statements/drop.rs | 3 + src/sql/src/statements/drop/trigger.rs | 70 +++++++++++++++++++++++ src/sql/src/statements/statement.rs | 5 ++ 14 files changed, 304 insertions(+), 9 deletions(-) create mode 100644 src/sql/src/statements/drop/trigger.rs diff --git a/Cargo.lock b/Cargo.lock index fbe70027a3..a9babe2380 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5144,7 +5144,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=819a9495507c7e33186b70814dc356a14e1ffb45#819a9495507c7e33186b70814dc356a14e1ffb45" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=464226cf8a4a22696503536a123d0b9e318582f4#464226cf8a4a22696503536a123d0b9e318582f4" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 4af4c93814..e38447a536 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -134,7 +134,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "819a9495507c7e33186b70814dc356a14e1ffb45" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "464226cf8a4a22696503536a123d0b9e318582f4" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/licenserc.toml b/licenserc.toml index 4a727b6b1b..44d644e76b 100644 --- a/licenserc.toml +++ b/licenserc.toml @@ -31,6 +31,7 @@ excludes = [ "src/operator/src/expr_helper/trigger.rs", "src/sql/src/statements/create/trigger.rs", "src/sql/src/statements/show/trigger.rs", + "src/sql/src/statements/drop/trigger.rs", "src/sql/src/parsers/create_parser/trigger.rs", "src/sql/src/parsers/show_parser/trigger.rs", ] diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index abd739ae4a..d840284c07 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -50,7 +50,11 @@ use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; #[cfg(feature = "enterprise")] use crate::rpc::ddl::trigger::CreateTriggerTask; #[cfg(feature = "enterprise")] +use crate::rpc::ddl::trigger::DropTriggerTask; +#[cfg(feature = "enterprise")] use crate::rpc::ddl::DdlTask::CreateTrigger; +#[cfg(feature = "enterprise")] +use crate::rpc::ddl::DdlTask::DropTrigger; use crate::rpc::ddl::DdlTask::{ AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, @@ -91,6 +95,14 @@ pub trait TriggerDdlManager: Send + Sync { query_context: QueryContext, ) -> Result; + async fn drop_trigger( + &self, + drop_trigger_task: DropTriggerTask, + procedure_manager: ProcedureManagerRef, + ddl_context: DdlContext, + query_context: QueryContext, + ) -> Result; + fn as_any(&self) -> &dyn std::any::Any; } @@ -640,6 +652,28 @@ async fn handle_drop_flow_task( }) } +#[cfg(feature = "enterprise")] +async fn handle_drop_trigger_task( + ddl_manager: &DdlManager, + drop_trigger_task: DropTriggerTask, + query_context: QueryContext, +) -> Result { + let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else { + return UnsupportedSnafu { + operation: "drop trigger", + } + .fail(); + }; + + m.drop_trigger( + drop_trigger_task, + ddl_manager.procedure_manager.clone(), + ddl_manager.ddl_context.clone(), + query_context, + ) + .await +} + async fn handle_drop_view_task( ddl_manager: &DdlManager, drop_view_task: DropViewTask, @@ -827,6 +861,11 @@ impl ProcedureExecutor for DdlManager { handle_create_flow_task(self, create_flow_task, request.query_context.into()) .await } + DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await, + CreateView(create_view_task) => { + handle_create_view_task(self, create_view_task).await + } + DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await, #[cfg(feature = "enterprise")] CreateTrigger(create_trigger_task) => { handle_create_trigger_task( @@ -836,11 +875,11 @@ impl ProcedureExecutor for DdlManager { ) .await } - DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await, - CreateView(create_view_task) => { - handle_create_view_task(self, create_view_task).await + #[cfg(feature = "enterprise")] + DropTrigger(drop_trigger_task) => { + handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into()) + .await } - DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await, } } .trace(span) diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 99eb24cebb..2f07ee68e0 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -69,6 +69,8 @@ pub enum DdlTask { AlterDatabase(AlterDatabaseTask), CreateFlow(CreateFlowTask), DropFlow(DropFlowTask), + #[cfg(feature = "enterprise")] + DropTrigger(trigger::DropTriggerTask), CreateView(CreateViewTask), DropView(DropViewTask), #[cfg(feature = "enterprise")] @@ -259,6 +261,18 @@ impl TryFrom for DdlTask { .fail() } } + Task::DropTriggerTask(drop_trigger) => { + #[cfg(feature = "enterprise")] + return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?)); + #[cfg(not(feature = "enterprise"))] + { + let _ = drop_trigger; + crate::error::UnsupportedSnafu { + operation: "drop trigger", + } + .fail() + } + } } } } @@ -311,6 +325,8 @@ impl TryFrom for PbDdlTaskRequest { DdlTask::DropView(task) => Task::DropViewTask(task.into()), #[cfg(feature = "enterprise")] DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()), + #[cfg(feature = "enterprise")] + DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()), }; Ok(Self { diff --git a/src/common/meta/src/rpc/ddl/trigger.rs b/src/common/meta/src/rpc/ddl/trigger.rs index 8d957efc2a..28bc49d108 100644 --- a/src/common/meta/src/rpc/ddl/trigger.rs +++ b/src/common/meta/src/rpc/ddl/trigger.rs @@ -1,10 +1,13 @@ use std::collections::HashMap; use std::time::Duration; -use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask; +use api::v1::meta::{ + CreateTriggerTask as PbCreateTriggerTask, DropTriggerTask as PbDropTriggerTask, +}; use api::v1::notify_channel::ChannelType as PbChannelType; use api::v1::{ - CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions, + CreateTriggerExpr as PbCreateTriggerExpr, DropTriggerExpr as PbDropTriggerExpr, + NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions, }; use serde::{Deserialize, Serialize}; use snafu::OptionExt; @@ -56,7 +59,7 @@ impl From for PbCreateTriggerTask { .map(PbNotifyChannel::from) .collect(); - let expr = CreateTriggerExpr { + let expr = PbCreateTriggerExpr { catalog_name: task.catalog_name, trigger_name: task.trigger_name, create_if_not_exists: task.if_not_exists, @@ -139,17 +142,86 @@ impl TryFrom for NotifyChannel { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DropTriggerTask { + pub catalog_name: String, + pub trigger_name: String, + pub drop_if_exists: bool, +} + +impl From for PbDropTriggerTask { + fn from(task: DropTriggerTask) -> Self { + let expr = PbDropTriggerExpr { + catalog_name: task.catalog_name, + trigger_name: task.trigger_name, + drop_if_exists: task.drop_if_exists, + }; + + PbDropTriggerTask { + drop_trigger: Some(expr), + } + } +} + +impl TryFrom for DropTriggerTask { + type Error = error::Error; + + fn try_from(task: PbDropTriggerTask) -> Result { + let expr = task.drop_trigger.context(error::InvalidProtoMsgSnafu { + err_msg: "expected drop_trigger", + })?; + + Ok(DropTriggerTask { + catalog_name: expr.catalog_name, + trigger_name: expr.trigger_name, + drop_if_exists: expr.drop_if_exists, + }) + } +} + impl DdlTask { /// Creates a [`DdlTask`] to create a trigger. pub fn new_create_trigger(expr: CreateTriggerTask) -> Self { DdlTask::CreateTrigger(expr) } + + /// Creates a [`DdlTask`] to drop a trigger. + pub fn new_drop_trigger(expr: DropTriggerTask) -> Self { + DdlTask::DropTrigger(expr) + } } #[cfg(test)] mod tests { use super::*; + #[test] + fn test_convert_drop_trigger_task() { + let original = DropTriggerTask { + catalog_name: "test_catalog".to_string(), + trigger_name: "test_trigger".to_string(), + drop_if_exists: true, + }; + + let pb_task: PbDropTriggerTask = original.clone().into(); + + let expr = pb_task.drop_trigger.as_ref().unwrap(); + assert_eq!(expr.catalog_name, "test_catalog"); + assert_eq!(expr.trigger_name, "test_trigger"); + assert!(expr.drop_if_exists); + + let round_tripped = DropTriggerTask::try_from(pb_task).unwrap(); + + assert_eq!(original.catalog_name, round_tripped.catalog_name); + assert_eq!(original.trigger_name, round_tripped.trigger_name); + assert_eq!(original.drop_if_exists, round_tripped.drop_if_exists); + + // Test invalid case where drop_trigger is None + let invalid_task = PbDropTriggerTask { drop_trigger: None }; + let result = DropTriggerTask::try_from(invalid_task); + assert!(result.is_err()); + } + #[test] fn test_convert_create_trigger_task() { let original = CreateTriggerTask { diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index f10ad17a41..ddf728c1ba 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -513,6 +513,8 @@ pub fn check_permission( | Statement::AlterDatabase(_) | Statement::DropFlow(_) | Statement::Use(_) => {} + #[cfg(feature = "enterprise")] + Statement::DropTrigger(_) => {} Statement::ShowCreateDatabase(stmt) => { validate_database(&stmt.database_name, query_ctx)?; } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index 988f0846dc..33d5a75de6 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -244,6 +244,16 @@ impl StatementExecutor { ) .await } + #[cfg(feature = "enterprise")] + Statement::DropTrigger(stmt) => { + self.drop_trigger( + query_ctx.current_catalog().to_string(), + format_raw_object_name(stmt.trigger_name()), + stmt.drop_if_exists(), + query_ctx, + ) + .await + } Statement::CreateView(stmt) => { let _ = self.create_view(stmt, query_ctx).await?; Ok(Output::new_with_affected_rows(0)) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 0e945b9604..264108052c 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -37,6 +37,8 @@ use common_meta::key::schema_name::{SchemaName, SchemaNameKey}; use common_meta::key::NAME_PATTERN; #[cfg(feature = "enterprise")] use common_meta::rpc::ddl::trigger::CreateTriggerTask; +#[cfg(feature = "enterprise")] +use common_meta::rpc::ddl::trigger::DropTriggerTask; use common_meta::rpc::ddl::{ CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, @@ -848,6 +850,41 @@ impl StatementExecutor { .context(error::ExecuteDdlSnafu) } + #[cfg(feature = "enterprise")] + #[tracing::instrument(skip_all)] + pub(super) async fn drop_trigger( + &self, + catalog_name: String, + trigger_name: String, + drop_if_exists: bool, + query_context: QueryContextRef, + ) -> Result { + let task = DropTriggerTask { + catalog_name, + trigger_name, + drop_if_exists, + }; + self.drop_trigger_procedure(task, query_context).await?; + Ok(Output::new_with_affected_rows(0)) + } + + #[cfg(feature = "enterprise")] + async fn drop_trigger_procedure( + &self, + expr: DropTriggerTask, + query_context: QueryContextRef, + ) -> Result { + let request = SubmitDdlTaskRequest { + query_context, + task: DdlTask::new_drop_trigger(expr), + }; + + self.procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu) + } + /// Drop a view #[tracing::instrument(skip_all)] pub(crate) async fn drop_view( diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index b46233fe61..6efc14dff7 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -195,6 +195,14 @@ pub enum Error { location: Location, }, + #[cfg(feature = "enterprise")] + #[snafu(display("Invalid trigger name: {}", name))] + InvalidTriggerName { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid flow query: {}", reason))] InvalidFlowQuery { reason: String, @@ -445,6 +453,9 @@ impl ErrorExt for Error { | UnsupportedUnaryOp { .. } | ConvertStr { .. } => StatusCode::InvalidArguments, + #[cfg(feature = "enterprise")] + InvalidTriggerName { .. } => StatusCode::InvalidArguments, + #[cfg(feature = "enterprise")] InvalidTriggerWebhookOption { .. } | NegativeInterval { .. } => { StatusCode::InvalidArguments diff --git a/src/sql/src/parsers/drop_parser.rs b/src/sql/src/parsers/drop_parser.rs index d40338e85f..f23b31b724 100644 --- a/src/sql/src/parsers/drop_parser.rs +++ b/src/sql/src/parsers/drop_parser.rs @@ -18,6 +18,8 @@ use sqlparser::tokenizer::Token; use crate::error::{self, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result}; use crate::parser::{ParserContext, FLOW}; +#[cfg(feature = "enterprise")] +use crate::statements::drop::trigger::DropTrigger; use crate::statements::drop::{DropDatabase, DropFlow, DropTable, DropView}; use crate::statements::statement::Statement; @@ -29,6 +31,8 @@ impl ParserContext<'_> { Token::Word(w) => match w.keyword { Keyword::TABLE => self.parse_drop_table(), Keyword::VIEW => self.parse_drop_view(), + #[cfg(feature = "enterprise")] + Keyword::TRIGGER => self.parse_drop_trigger(), Keyword::SCHEMA | Keyword::DATABASE => self.parse_drop_database(), Keyword::NoKeyword => { let uppercase = w.value.to_uppercase(); @@ -43,6 +47,31 @@ impl ParserContext<'_> { } } + #[cfg(feature = "enterprise")] + fn parse_drop_trigger(&mut self) -> Result { + let _ = self.parser.next_token(); + + let if_exists = self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]); + let raw_trigger_ident = + self.parse_object_name() + .with_context(|_| error::UnexpectedSnafu { + expected: "a trigger name", + actual: self.peek_token_as_string(), + })?; + let trigger_ident = Self::canonicalize_object_name(raw_trigger_ident); + ensure!( + !trigger_ident.0.is_empty(), + error::InvalidTriggerNameSnafu { + name: trigger_ident.to_string() + } + ); + + Ok(Statement::DropTrigger(DropTrigger::new( + trigger_ident, + if_exists, + ))) + } + fn parse_drop_view(&mut self) -> Result { let _ = self.parser.next_token(); diff --git a/src/sql/src/statements/drop.rs b/src/sql/src/statements/drop.rs index 799722904d..1e97bee25c 100644 --- a/src/sql/src/statements/drop.rs +++ b/src/sql/src/statements/drop.rs @@ -12,6 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[cfg(feature = "enterprise")] +pub mod trigger; + use std::fmt::Display; use serde::Serialize; diff --git a/src/sql/src/statements/drop/trigger.rs b/src/sql/src/statements/drop/trigger.rs new file mode 100644 index 0000000000..8469397bbe --- /dev/null +++ b/src/sql/src/statements/drop/trigger.rs @@ -0,0 +1,70 @@ +use std::fmt::Display; + +use serde::Serialize; +use sqlparser::ast::ObjectName; +use sqlparser_derive::{Visit, VisitMut}; + +/// `DROP TRIGGER` statement. +#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)] +pub struct DropTrigger { + trigger_name: ObjectName, + drop_if_exists: bool, +} + +impl DropTrigger { + pub fn new(trigger_name: ObjectName, if_exists: bool) -> Self { + Self { + trigger_name, + drop_if_exists: if_exists, + } + } + + pub fn trigger_name(&self) -> &ObjectName { + &self.trigger_name + } + + pub fn drop_if_exists(&self) -> bool { + self.drop_if_exists + } +} + +impl Display for DropTrigger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DROP TRIGGER")?; + if self.drop_if_exists() { + f.write_str(" IF EXISTS")?; + } + let trigger_name = self.trigger_name(); + write!(f, r#" {trigger_name}"#) + } +} + +#[cfg(test)] +mod tests { + use sqlparser::ast::Ident; + use sqlparser::tokenizer::Span; + + use super::*; + + #[test] + fn test_drop_trigger_display() { + let ident = Ident { + value: "my_trigger".to_string(), + quote_style: None, + span: Span::empty(), + }; + let trigger_name = ObjectName(vec![ident]); + + let drop_trigger = DropTrigger::new(trigger_name.clone(), true); + assert_eq!( + drop_trigger.to_string(), + "DROP TRIGGER IF EXISTS my_trigger" + ); + + let drop_trigger_no_if_exists = DropTrigger::new(trigger_name, false); + assert_eq!( + drop_trigger_no_if_exists.to_string(), + "DROP TRIGGER my_trigger" + ); + } +} diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index 7ec2779278..546b240757 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -72,6 +72,9 @@ pub enum Statement { DropDatabase(DropDatabase), // DROP FLOW DropFlow(DropFlow), + // DROP Trigger + #[cfg(feature = "enterprise")] + DropTrigger(crate::statements::drop::trigger::DropTrigger), // DROP View DropView(DropView), // CREATE DATABASE @@ -158,6 +161,8 @@ impl Display for Statement { #[cfg(feature = "enterprise")] Statement::CreateTrigger(s) => s.fmt(f), Statement::DropFlow(s) => s.fmt(f), + #[cfg(feature = "enterprise")] + Statement::DropTrigger(s) => s.fmt(f), Statement::DropTable(s) => s.fmt(f), Statement::DropDatabase(s) => s.fmt(f), Statement::DropView(s) => s.fmt(f),