mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 07:20:41 +00:00
feat: support syntax parsing of drop trigger (#6371)
* feat: trigger drop * Update Cargo.toml * Update Cargo.lock --------- Co-authored-by: Ning Sun <classicning@gmail.com>
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5144,7 +5144,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "greptime-proto"
|
||||
version = "0.1.0"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=819a9495507c7e33186b70814dc356a14e1ffb45#819a9495507c7e33186b70814dc356a14e1ffb45"
|
||||
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=464226cf8a4a22696503536a123d0b9e318582f4#464226cf8a4a22696503536a123d0b9e318582f4"
|
||||
dependencies = [
|
||||
"prost 0.13.5",
|
||||
"serde",
|
||||
|
||||
@@ -134,7 +134,7 @@ etcd-client = "0.14"
|
||||
fst = "0.4.7"
|
||||
futures = "0.3"
|
||||
futures-util = "0.3"
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "819a9495507c7e33186b70814dc356a14e1ffb45" }
|
||||
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "464226cf8a4a22696503536a123d0b9e318582f4" }
|
||||
hex = "0.4"
|
||||
http = "1"
|
||||
humantime = "2.1"
|
||||
|
||||
@@ -31,6 +31,7 @@ excludes = [
|
||||
"src/operator/src/expr_helper/trigger.rs",
|
||||
"src/sql/src/statements/create/trigger.rs",
|
||||
"src/sql/src/statements/show/trigger.rs",
|
||||
"src/sql/src/statements/drop/trigger.rs",
|
||||
"src/sql/src/parsers/create_parser/trigger.rs",
|
||||
"src/sql/src/parsers/show_parser/trigger.rs",
|
||||
]
|
||||
|
||||
@@ -50,7 +50,11 @@ use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::rpc::ddl::trigger::CreateTriggerTask;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::rpc::ddl::trigger::DropTriggerTask;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::rpc::ddl::DdlTask::CreateTrigger;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::rpc::ddl::DdlTask::DropTrigger;
|
||||
use crate::rpc::ddl::DdlTask::{
|
||||
AlterDatabase, AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables,
|
||||
CreateTable, CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView,
|
||||
@@ -91,6 +95,14 @@ pub trait TriggerDdlManager: Send + Sync {
|
||||
query_context: QueryContext,
|
||||
) -> Result<SubmitDdlTaskResponse>;
|
||||
|
||||
async fn drop_trigger(
|
||||
&self,
|
||||
drop_trigger_task: DropTriggerTask,
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
ddl_context: DdlContext,
|
||||
query_context: QueryContext,
|
||||
) -> Result<SubmitDdlTaskResponse>;
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any;
|
||||
}
|
||||
|
||||
@@ -640,6 +652,28 @@ async fn handle_drop_flow_task(
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
async fn handle_drop_trigger_task(
|
||||
ddl_manager: &DdlManager,
|
||||
drop_trigger_task: DropTriggerTask,
|
||||
query_context: QueryContext,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let Some(m) = ddl_manager.trigger_ddl_manager.as_ref() else {
|
||||
return UnsupportedSnafu {
|
||||
operation: "drop trigger",
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
|
||||
m.drop_trigger(
|
||||
drop_trigger_task,
|
||||
ddl_manager.procedure_manager.clone(),
|
||||
ddl_manager.ddl_context.clone(),
|
||||
query_context,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn handle_drop_view_task(
|
||||
ddl_manager: &DdlManager,
|
||||
drop_view_task: DropViewTask,
|
||||
@@ -827,6 +861,11 @@ impl ProcedureExecutor for DdlManager {
|
||||
handle_create_flow_task(self, create_flow_task, request.query_context.into())
|
||||
.await
|
||||
}
|
||||
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
|
||||
CreateView(create_view_task) => {
|
||||
handle_create_view_task(self, create_view_task).await
|
||||
}
|
||||
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
|
||||
#[cfg(feature = "enterprise")]
|
||||
CreateTrigger(create_trigger_task) => {
|
||||
handle_create_trigger_task(
|
||||
@@ -836,11 +875,11 @@ impl ProcedureExecutor for DdlManager {
|
||||
)
|
||||
.await
|
||||
}
|
||||
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
|
||||
CreateView(create_view_task) => {
|
||||
handle_create_view_task(self, create_view_task).await
|
||||
#[cfg(feature = "enterprise")]
|
||||
DropTrigger(drop_trigger_task) => {
|
||||
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
|
||||
.await
|
||||
}
|
||||
DropView(drop_view_task) => handle_drop_view_task(self, drop_view_task).await,
|
||||
}
|
||||
}
|
||||
.trace(span)
|
||||
|
||||
@@ -69,6 +69,8 @@ pub enum DdlTask {
|
||||
AlterDatabase(AlterDatabaseTask),
|
||||
CreateFlow(CreateFlowTask),
|
||||
DropFlow(DropFlowTask),
|
||||
#[cfg(feature = "enterprise")]
|
||||
DropTrigger(trigger::DropTriggerTask),
|
||||
CreateView(CreateViewTask),
|
||||
DropView(DropViewTask),
|
||||
#[cfg(feature = "enterprise")]
|
||||
@@ -259,6 +261,18 @@ impl TryFrom<Task> for DdlTask {
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
Task::DropTriggerTask(drop_trigger) => {
|
||||
#[cfg(feature = "enterprise")]
|
||||
return Ok(DdlTask::DropTrigger(drop_trigger.try_into()?));
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
{
|
||||
let _ = drop_trigger;
|
||||
crate::error::UnsupportedSnafu {
|
||||
operation: "drop trigger",
|
||||
}
|
||||
.fail()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -311,6 +325,8 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
|
||||
DdlTask::DropView(task) => Task::DropViewTask(task.into()),
|
||||
#[cfg(feature = "enterprise")]
|
||||
DdlTask::CreateTrigger(task) => Task::CreateTriggerTask(task.into()),
|
||||
#[cfg(feature = "enterprise")]
|
||||
DdlTask::DropTrigger(task) => Task::DropTriggerTask(task.into()),
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::CreateTriggerTask as PbCreateTriggerTask;
|
||||
use api::v1::meta::{
|
||||
CreateTriggerTask as PbCreateTriggerTask, DropTriggerTask as PbDropTriggerTask,
|
||||
};
|
||||
use api::v1::notify_channel::ChannelType as PbChannelType;
|
||||
use api::v1::{
|
||||
CreateTriggerExpr, NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
|
||||
CreateTriggerExpr as PbCreateTriggerExpr, DropTriggerExpr as PbDropTriggerExpr,
|
||||
NotifyChannel as PbNotifyChannel, WebhookOptions as PbWebhookOptions,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::OptionExt;
|
||||
@@ -56,7 +59,7 @@ impl From<CreateTriggerTask> for PbCreateTriggerTask {
|
||||
.map(PbNotifyChannel::from)
|
||||
.collect();
|
||||
|
||||
let expr = CreateTriggerExpr {
|
||||
let expr = PbCreateTriggerExpr {
|
||||
catalog_name: task.catalog_name,
|
||||
trigger_name: task.trigger_name,
|
||||
create_if_not_exists: task.if_not_exists,
|
||||
@@ -139,17 +142,86 @@ impl TryFrom<PbNotifyChannel> for NotifyChannel {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DropTriggerTask {
|
||||
pub catalog_name: String,
|
||||
pub trigger_name: String,
|
||||
pub drop_if_exists: bool,
|
||||
}
|
||||
|
||||
impl From<DropTriggerTask> for PbDropTriggerTask {
|
||||
fn from(task: DropTriggerTask) -> Self {
|
||||
let expr = PbDropTriggerExpr {
|
||||
catalog_name: task.catalog_name,
|
||||
trigger_name: task.trigger_name,
|
||||
drop_if_exists: task.drop_if_exists,
|
||||
};
|
||||
|
||||
PbDropTriggerTask {
|
||||
drop_trigger: Some(expr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<PbDropTriggerTask> for DropTriggerTask {
|
||||
type Error = error::Error;
|
||||
|
||||
fn try_from(task: PbDropTriggerTask) -> Result<Self> {
|
||||
let expr = task.drop_trigger.context(error::InvalidProtoMsgSnafu {
|
||||
err_msg: "expected drop_trigger",
|
||||
})?;
|
||||
|
||||
Ok(DropTriggerTask {
|
||||
catalog_name: expr.catalog_name,
|
||||
trigger_name: expr.trigger_name,
|
||||
drop_if_exists: expr.drop_if_exists,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl DdlTask {
|
||||
/// Creates a [`DdlTask`] to create a trigger.
|
||||
pub fn new_create_trigger(expr: CreateTriggerTask) -> Self {
|
||||
DdlTask::CreateTrigger(expr)
|
||||
}
|
||||
|
||||
/// Creates a [`DdlTask`] to drop a trigger.
|
||||
pub fn new_drop_trigger(expr: DropTriggerTask) -> Self {
|
||||
DdlTask::DropTrigger(expr)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_convert_drop_trigger_task() {
|
||||
let original = DropTriggerTask {
|
||||
catalog_name: "test_catalog".to_string(),
|
||||
trigger_name: "test_trigger".to_string(),
|
||||
drop_if_exists: true,
|
||||
};
|
||||
|
||||
let pb_task: PbDropTriggerTask = original.clone().into();
|
||||
|
||||
let expr = pb_task.drop_trigger.as_ref().unwrap();
|
||||
assert_eq!(expr.catalog_name, "test_catalog");
|
||||
assert_eq!(expr.trigger_name, "test_trigger");
|
||||
assert!(expr.drop_if_exists);
|
||||
|
||||
let round_tripped = DropTriggerTask::try_from(pb_task).unwrap();
|
||||
|
||||
assert_eq!(original.catalog_name, round_tripped.catalog_name);
|
||||
assert_eq!(original.trigger_name, round_tripped.trigger_name);
|
||||
assert_eq!(original.drop_if_exists, round_tripped.drop_if_exists);
|
||||
|
||||
// Test invalid case where drop_trigger is None
|
||||
let invalid_task = PbDropTriggerTask { drop_trigger: None };
|
||||
let result = DropTriggerTask::try_from(invalid_task);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_convert_create_trigger_task() {
|
||||
let original = CreateTriggerTask {
|
||||
|
||||
@@ -513,6 +513,8 @@ pub fn check_permission(
|
||||
| Statement::AlterDatabase(_)
|
||||
| Statement::DropFlow(_)
|
||||
| Statement::Use(_) => {}
|
||||
#[cfg(feature = "enterprise")]
|
||||
Statement::DropTrigger(_) => {}
|
||||
Statement::ShowCreateDatabase(stmt) => {
|
||||
validate_database(&stmt.database_name, query_ctx)?;
|
||||
}
|
||||
|
||||
@@ -244,6 +244,16 @@ impl StatementExecutor {
|
||||
)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "enterprise")]
|
||||
Statement::DropTrigger(stmt) => {
|
||||
self.drop_trigger(
|
||||
query_ctx.current_catalog().to_string(),
|
||||
format_raw_object_name(stmt.trigger_name()),
|
||||
stmt.drop_if_exists(),
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Statement::CreateView(stmt) => {
|
||||
let _ = self.create_view(stmt, query_ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
|
||||
@@ -37,6 +37,8 @@ use common_meta::key::schema_name::{SchemaName, SchemaNameKey};
|
||||
use common_meta::key::NAME_PATTERN;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use common_meta::rpc::ddl::trigger::CreateTriggerTask;
|
||||
#[cfg(feature = "enterprise")]
|
||||
use common_meta::rpc::ddl::trigger::DropTriggerTask;
|
||||
use common_meta::rpc::ddl::{
|
||||
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
|
||||
SubmitDdlTaskResponse,
|
||||
@@ -848,6 +850,41 @@ impl StatementExecutor {
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn drop_trigger(
|
||||
&self,
|
||||
catalog_name: String,
|
||||
trigger_name: String,
|
||||
drop_if_exists: bool,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<Output> {
|
||||
let task = DropTriggerTask {
|
||||
catalog_name,
|
||||
trigger_name,
|
||||
drop_if_exists,
|
||||
};
|
||||
self.drop_trigger_procedure(task, query_context).await?;
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
async fn drop_trigger_procedure(
|
||||
&self,
|
||||
expr: DropTriggerTask,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest {
|
||||
query_context,
|
||||
task: DdlTask::new_drop_trigger(expr),
|
||||
};
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
.await
|
||||
.context(error::ExecuteDdlSnafu)
|
||||
}
|
||||
|
||||
/// Drop a view
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(crate) async fn drop_view(
|
||||
|
||||
@@ -195,6 +195,14 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
#[snafu(display("Invalid trigger name: {}", name))]
|
||||
InvalidTriggerName {
|
||||
name: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid flow query: {}", reason))]
|
||||
InvalidFlowQuery {
|
||||
reason: String,
|
||||
@@ -445,6 +453,9 @@ impl ErrorExt for Error {
|
||||
| UnsupportedUnaryOp { .. }
|
||||
| ConvertStr { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
InvalidTriggerName { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
InvalidTriggerWebhookOption { .. } | NegativeInterval { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
|
||||
@@ -18,6 +18,8 @@ use sqlparser::tokenizer::Token;
|
||||
|
||||
use crate::error::{self, InvalidFlowNameSnafu, InvalidTableNameSnafu, Result};
|
||||
use crate::parser::{ParserContext, FLOW};
|
||||
#[cfg(feature = "enterprise")]
|
||||
use crate::statements::drop::trigger::DropTrigger;
|
||||
use crate::statements::drop::{DropDatabase, DropFlow, DropTable, DropView};
|
||||
use crate::statements::statement::Statement;
|
||||
|
||||
@@ -29,6 +31,8 @@ impl ParserContext<'_> {
|
||||
Token::Word(w) => match w.keyword {
|
||||
Keyword::TABLE => self.parse_drop_table(),
|
||||
Keyword::VIEW => self.parse_drop_view(),
|
||||
#[cfg(feature = "enterprise")]
|
||||
Keyword::TRIGGER => self.parse_drop_trigger(),
|
||||
Keyword::SCHEMA | Keyword::DATABASE => self.parse_drop_database(),
|
||||
Keyword::NoKeyword => {
|
||||
let uppercase = w.value.to_uppercase();
|
||||
@@ -43,6 +47,31 @@ impl ParserContext<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
fn parse_drop_trigger(&mut self) -> Result<Statement> {
|
||||
let _ = self.parser.next_token();
|
||||
|
||||
let if_exists = self.parser.parse_keywords(&[Keyword::IF, Keyword::EXISTS]);
|
||||
let raw_trigger_ident =
|
||||
self.parse_object_name()
|
||||
.with_context(|_| error::UnexpectedSnafu {
|
||||
expected: "a trigger name",
|
||||
actual: self.peek_token_as_string(),
|
||||
})?;
|
||||
let trigger_ident = Self::canonicalize_object_name(raw_trigger_ident);
|
||||
ensure!(
|
||||
!trigger_ident.0.is_empty(),
|
||||
error::InvalidTriggerNameSnafu {
|
||||
name: trigger_ident.to_string()
|
||||
}
|
||||
);
|
||||
|
||||
Ok(Statement::DropTrigger(DropTrigger::new(
|
||||
trigger_ident,
|
||||
if_exists,
|
||||
)))
|
||||
}
|
||||
|
||||
fn parse_drop_view(&mut self) -> Result<Statement> {
|
||||
let _ = self.parser.next_token();
|
||||
|
||||
|
||||
@@ -12,6 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
pub mod trigger;
|
||||
|
||||
use std::fmt::Display;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
70
src/sql/src/statements/drop/trigger.rs
Normal file
70
src/sql/src/statements/drop/trigger.rs
Normal file
@@ -0,0 +1,70 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use serde::Serialize;
|
||||
use sqlparser::ast::ObjectName;
|
||||
use sqlparser_derive::{Visit, VisitMut};
|
||||
|
||||
/// `DROP TRIGGER` statement.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Visit, VisitMut, Serialize)]
|
||||
pub struct DropTrigger {
|
||||
trigger_name: ObjectName,
|
||||
drop_if_exists: bool,
|
||||
}
|
||||
|
||||
impl DropTrigger {
|
||||
pub fn new(trigger_name: ObjectName, if_exists: bool) -> Self {
|
||||
Self {
|
||||
trigger_name,
|
||||
drop_if_exists: if_exists,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn trigger_name(&self) -> &ObjectName {
|
||||
&self.trigger_name
|
||||
}
|
||||
|
||||
pub fn drop_if_exists(&self) -> bool {
|
||||
self.drop_if_exists
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for DropTrigger {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str("DROP TRIGGER")?;
|
||||
if self.drop_if_exists() {
|
||||
f.write_str(" IF EXISTS")?;
|
||||
}
|
||||
let trigger_name = self.trigger_name();
|
||||
write!(f, r#" {trigger_name}"#)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sqlparser::ast::Ident;
|
||||
use sqlparser::tokenizer::Span;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_drop_trigger_display() {
|
||||
let ident = Ident {
|
||||
value: "my_trigger".to_string(),
|
||||
quote_style: None,
|
||||
span: Span::empty(),
|
||||
};
|
||||
let trigger_name = ObjectName(vec![ident]);
|
||||
|
||||
let drop_trigger = DropTrigger::new(trigger_name.clone(), true);
|
||||
assert_eq!(
|
||||
drop_trigger.to_string(),
|
||||
"DROP TRIGGER IF EXISTS my_trigger"
|
||||
);
|
||||
|
||||
let drop_trigger_no_if_exists = DropTrigger::new(trigger_name, false);
|
||||
assert_eq!(
|
||||
drop_trigger_no_if_exists.to_string(),
|
||||
"DROP TRIGGER my_trigger"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -72,6 +72,9 @@ pub enum Statement {
|
||||
DropDatabase(DropDatabase),
|
||||
// DROP FLOW
|
||||
DropFlow(DropFlow),
|
||||
// DROP Trigger
|
||||
#[cfg(feature = "enterprise")]
|
||||
DropTrigger(crate::statements::drop::trigger::DropTrigger),
|
||||
// DROP View
|
||||
DropView(DropView),
|
||||
// CREATE DATABASE
|
||||
@@ -158,6 +161,8 @@ impl Display for Statement {
|
||||
#[cfg(feature = "enterprise")]
|
||||
Statement::CreateTrigger(s) => s.fmt(f),
|
||||
Statement::DropFlow(s) => s.fmt(f),
|
||||
#[cfg(feature = "enterprise")]
|
||||
Statement::DropTrigger(s) => s.fmt(f),
|
||||
Statement::DropTable(s) => s.fmt(f),
|
||||
Statement::DropDatabase(s) => s.fmt(f),
|
||||
Statement::DropView(s) => s.fmt(f),
|
||||
|
||||
Reference in New Issue
Block a user