From 7b1c3503d01571260dd5ba8bb193bcf85dca2eab Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 5 Mar 2024 17:38:47 +0800 Subject: [PATCH] fix: complete interceptors for all frontend entry (#3428) --- src/frontend/src/instance/influxdb.rs | 6 +- src/frontend/src/instance/otlp.rs | 12 +++ src/frontend/src/instance/prom_store.rs | 9 ++ src/frontend/src/instance/script.rs | 8 ++ src/servers/src/interceptor.rs | 119 ++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 1 deletion(-) diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 36276b71e2..72fe0d92b4 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -15,8 +15,9 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; -use servers::error::AuthSnafu; +use servers::error::{AuthSnafu, Error}; use servers::influxdb::InfluxdbRequest; +use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; use session::context::QueryContextRef; use snafu::ResultExt; @@ -36,6 +37,9 @@ impl InfluxdbLineProtocolHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::LineProtocol) .context(AuthSnafu)?; + let interceptor_ref = self.plugins.get::>(); + interceptor_ref.pre_execute(&request.lines, ctx.clone())?; + let requests = request.try_into()?; let _ = self .handle_row_inserts(requests, ctx) diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 22b2d3307d..7817bb69f2 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -23,6 +23,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; use servers::error::{self, AuthSnafu, Result as ServerResult}; +use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; use servers::otlp::plugin::TraceParserRef; use servers::query_handler::OpenTelemetryProtocolHandler; @@ -45,6 +46,12 @@ impl OpenTelemetryProtocolHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::Otlp) .context(AuthSnafu)?; + + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_execute(ctx.clone())?; + let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?; let _ = self .handle_row_inserts(requests, ctx) @@ -73,6 +80,11 @@ impl OpenTelemetryProtocolHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::Otlp) .context(AuthSnafu)?; + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_execute(ctx.clone())?; + let (table_name, spans) = match self.plugins.get::() { Some(parser) => (parser.table_name(), parser.parse(request)), None => ( diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 88d350e4c5..5e21188294 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -29,6 +29,7 @@ use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; +use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef}; use servers::prom_store::{self, Metrics}; use servers::query_handler::{ PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse, @@ -168,6 +169,10 @@ impl PromStoreProtocolHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) .context(AuthSnafu)?; + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_write(&request, ctx.clone())?; let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?; if with_metric_engine { @@ -202,6 +207,10 @@ impl PromStoreProtocolHandler for Instance { .as_ref() .check_permission(ctx.current_user(), PermissionReq::PromStoreRead) .context(AuthSnafu)?; + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_read(&request, ctx.clone())?; let response_type = negotiate_response_type(&request.accepted_response_types)?; diff --git a/src/frontend/src/instance/script.rs b/src/frontend/src/instance/script.rs index b8eac4b17c..6aacb8c51b 100644 --- a/src/frontend/src/instance/script.rs +++ b/src/frontend/src/instance/script.rs @@ -16,6 +16,8 @@ use std::collections::HashMap; use async_trait::async_trait; use common_query::Output; +use servers::error::Error; +use servers::interceptor::{ScriptInterceptor, ScriptInterceptorRef}; use servers::query_handler::ScriptHandler; use session::context::QueryContextRef; @@ -30,6 +32,9 @@ impl ScriptHandler for Instance { name: &str, script: &str, ) -> servers::error::Result<()> { + let interceptor_ref = self.plugins.get::>(); + interceptor_ref.pre_execute(name, query_ctx.clone())?; + let _timer = metrics::INSERT_SCRIPTS_ELAPSED.start_timer(); self.script_executor .insert_script(query_ctx, name, script) @@ -42,6 +47,9 @@ impl ScriptHandler for Instance { name: &str, params: HashMap, ) -> servers::error::Result { + let interceptor_ref = self.plugins.get::>(); + interceptor_ref.pre_execute(name, query_ctx.clone())?; + let _timer = metrics::EXECUTE_SCRIPT_ELAPSED.start_timer(); self.script_executor .execute_script(query_ctx, name, params) diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index 0612e9e51d..04bf76c142 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -15,6 +15,7 @@ use std::borrow::Cow; use std::sync::Arc; +use api::prom_store::remote::{ReadRequest, WriteRequest}; use api::v1::greptime_request::Request; use common_error::ext::ErrorExt; use common_query::Output; @@ -246,3 +247,121 @@ where } } } + +/// ScriptInterceptor can track life cycle of a script request and customize or +/// abort its execution at given point. +pub trait ScriptInterceptor { + type Error: ErrorExt; + + /// Called before script request is actually executed. + fn pre_execute(&self, _name: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> { + Ok(()) + } +} + +pub type ScriptInterceptorRef = Arc + Send + Sync + 'static>; + +impl ScriptInterceptor for Option> { + type Error = E; + + fn pre_execute(&self, name: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_execute(name, query_ctx) + } else { + Ok(()) + } + } +} + +/// LineProtocolInterceptor can track life cycle of a line protocol request +/// and customize or abort its execution at given point. +pub trait LineProtocolInterceptor { + type Error: ErrorExt; + + fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> { + Ok(()) + } +} + +pub type LineProtocolInterceptorRef = + Arc + Send + Sync + 'static>; + +impl LineProtocolInterceptor for Option> { + type Error = E; + + fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_execute(line, query_ctx) + } else { + Ok(()) + } + } +} + +/// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request +/// and customize or abort its execution at given point. +pub trait OpenTelemetryProtocolInterceptor { + type Error: ErrorExt; + + fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> { + Ok(()) + } +} + +pub type OpenTelemetryProtocolInterceptorRef = + Arc + Send + Sync + 'static>; + +impl OpenTelemetryProtocolInterceptor + for Option> +{ + type Error = E; + + fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_execute(query_ctx) + } else { + Ok(()) + } + } +} + +/// PromStoreProtocolInterceptor can track life cycle of a prom store request +/// and customize or abort its execution at given point. +pub trait PromStoreProtocolInterceptor { + type Error: ErrorExt; + + fn pre_write( + &self, + _write_req: &WriteRequest, + _ctx: QueryContextRef, + ) -> Result<(), Self::Error> { + Ok(()) + } + + fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> { + Ok(()) + } +} + +pub type PromStoreProtocolInterceptorRef = + Arc + Send + Sync + 'static>; + +impl PromStoreProtocolInterceptor for Option> { + type Error = E; + + fn pre_write(&self, write_req: &WriteRequest, ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_write(write_req, ctx) + } else { + Ok(()) + } + } + + fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> { + if let Some(this) = self { + this.pre_read(read_req, ctx) + } else { + Ok(()) + } + } +}