diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5b16fccd8a..a2d46e3dba 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -17,40 +17,43 @@ use api::v1::query_request::Query; use async_trait::async_trait; use common_query::Output; use query::parser::PromQuery; +use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt}; -use crate::error::{self, Result}; +use crate::error::{Error, IncompleteGrpcResultSnafu, NotSupportedSnafu, Result}; use crate::instance::Instance; #[async_trait] impl GrpcQueryHandler for Instance { - type Error = error::Error; + type Error = Error; async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { + let interceptor_ref = self.plugins.get::>(); + let interceptor = interceptor_ref.as_ref(); + interceptor.pre_execute(&request, ctx.clone())?; + let output = match request { - Request::Insert(request) => self.handle_insert(request, ctx).await?, + Request::Insert(request) => self.handle_insert(request, ctx.clone()).await?, Request::Query(query_request) => { - let query = query_request - .query - .context(error::IncompleteGrpcResultSnafu { - err_msg: "Missing field 'QueryRequest.query'", - })?; + let query = query_request.query.context(IncompleteGrpcResultSnafu { + err_msg: "Missing field 'QueryRequest.query'", + })?; match query { Query::Sql(sql) => { - let mut result = SqlQueryHandler::do_query(self, &sql, ctx).await; + let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await; ensure!( result.len() == 1, - error::NotSupportedSnafu { + NotSupportedSnafu { feat: "execute multiple statements in SQL query string through GRPC interface" } ); result.remove(0)? } Query::LogicalPlan(_) => { - return error::NotSupportedSnafu { + return NotSupportedSnafu { feat: "Execute LogicalPlan in Frontend", } .fail(); @@ -63,10 +66,10 @@ impl GrpcQueryHandler for Instance { step: promql.step, }; let mut result = - SqlQueryHandler::do_promql_query(self, &prom_query, ctx).await; + SqlQueryHandler::do_promql_query(self, &prom_query, ctx.clone()).await; ensure!( result.len() == 1, - error::NotSupportedSnafu { + NotSupportedSnafu { feat: "execute multiple statements in PromQL query string through GRPC interface" } ); @@ -75,9 +78,12 @@ impl GrpcQueryHandler for Instance { } } Request::Ddl(_) | Request::Delete(_) => { - GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx).await? + GrpcQueryHandler::do_query(self.grpc_query_handler.as_ref(), request, ctx.clone()) + .await? } }; + + let output = interceptor.post_execute(output, ctx)?; Ok(output) } } diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index fa1cf83862..7471c45547 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -15,6 +15,7 @@ use std::borrow::Cow; use std::sync::Arc; +use api::v1::greptime_request::Request; use common_error::prelude::ErrorExt; use common_query::Output; use query::plan::LogicalPlan; @@ -126,3 +127,62 @@ where } } } + +/// GrpcQueryInterceptor can track life cycle of a grpc request and customize or +/// abort its execution at given point. +pub trait GrpcQueryInterceptor { + type Error: ErrorExt; + + /// Called before request is actually executed. + fn pre_execute( + &self, + _request: &Request, + _query_ctx: QueryContextRef, + ) -> Result<(), Self::Error> { + Ok(()) + } + + /// Called after execution finished. The implementation can modify the + /// output if needed. + fn post_execute( + &self, + output: Output, + _query_ctx: QueryContextRef, + ) -> Result { + Ok(output) + } +} + +pub type GrpcQueryInterceptorRef = + Arc + Send + Sync + 'static>; + +impl GrpcQueryInterceptor for Option<&GrpcQueryInterceptorRef> +where + E: ErrorExt, +{ + type Error = E; + + fn pre_execute( + &self, + _request: &Request, + _query_ctx: QueryContextRef, + ) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_execute(_request, _query_ctx) + } else { + Ok(()) + } + } + + fn post_execute( + &self, + output: Output, + _query_ctx: QueryContextRef, + ) -> Result { + if let Some(this) = self { + this.post_execute(output, _query_ctx) + } else { + Ok(output) + } + } +} diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 593fa89207..9aca93b561 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -15,9 +15,12 @@ use std::borrow::Cow; use std::sync::Arc; -use servers::error::{self, Result}; -use servers::interceptor::SqlQueryInterceptor; +use api::v1::greptime_request::Request; +use api::v1::InsertRequest; +use servers::error::{self, NotSupportedSnafu, Result}; +use servers::interceptor::{GrpcQueryInterceptor, SqlQueryInterceptor}; use session::context::{QueryContext, QueryContextRef}; +use snafu::ensure; pub struct NoopInterceptor; @@ -38,3 +41,46 @@ fn test_default_interceptor_behaviour() { let query = "SELECT 1"; assert_eq!("SELECT 1;", di.pre_parsing(query, ctx).unwrap()); } + +impl GrpcQueryInterceptor for NoopInterceptor { + type Error = error::Error; + + fn pre_execute( + &self, + req: &Request, + _query_ctx: QueryContextRef, + ) -> std::result::Result<(), Self::Error> { + match req { + Request::Insert(insert) => { + ensure!( + insert.region_number == 0, + NotSupportedSnafu { + feat: "region not 0" + } + ) + } + _ => { + unreachable!() + } + }; + Ok(()) + } +} + +#[test] +fn test_grpc_interceptor() { + let di = NoopInterceptor; + let ctx = Arc::new(QueryContext::new()); + + let req = Request::Insert(InsertRequest { + region_number: 1, + ..Default::default() + }); + + let fail = GrpcQueryInterceptor::pre_execute(&di, &req, ctx.clone()); + assert!(fail.is_err()); + + let req = Request::Insert(InsertRequest::default()); + let success = GrpcQueryInterceptor::pre_execute(&di, &req, ctx); + assert!(success.is_ok()); +}