fix: complete interceptors for all frontend entry (#3428)

This commit is contained in:
shuiyisong
2024-03-05 17:38:47 +08:00
committed by GitHub
parent 6fd2ff49d5
commit 7b1c3503d0
5 changed files with 153 additions and 1 deletions

View File

@@ -15,8 +15,9 @@
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use servers::error::AuthSnafu;
use servers::error::{AuthSnafu, Error};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -36,6 +37,9 @@ impl InfluxdbLineProtocolHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::LineProtocol)
.context(AuthSnafu)?;
let interceptor_ref = self.plugins.get::<LineProtocolInterceptorRef<Error>>();
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
let requests = request.try_into()?;
let _ = self
.handle_row_inserts(requests, ctx)

View File

@@ -23,6 +23,7 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
@@ -45,6 +46,12 @@ impl OpenTelemetryProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
@@ -73,6 +80,11 @@ impl OpenTelemetryProtocolHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (

View File

@@ -29,6 +29,7 @@ use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
use servers::prom_store::{self, Metrics};
use servers::query_handler::{
PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
@@ -168,6 +169,10 @@ impl PromStoreProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
if with_metric_engine {
@@ -202,6 +207,10 @@ impl PromStoreProtocolHandler for Instance {
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreRead)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_read(&request, ctx.clone())?;
let response_type = negotiate_response_type(&request.accepted_response_types)?;

View File

@@ -16,6 +16,8 @@ use std::collections::HashMap;
use async_trait::async_trait;
use common_query::Output;
use servers::error::Error;
use servers::interceptor::{ScriptInterceptor, ScriptInterceptorRef};
use servers::query_handler::ScriptHandler;
use session::context::QueryContextRef;
@@ -30,6 +32,9 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let interceptor_ref = self.plugins.get::<ScriptInterceptorRef<Error>>();
interceptor_ref.pre_execute(name, query_ctx.clone())?;
let _timer = metrics::INSERT_SCRIPTS_ELAPSED.start_timer();
self.script_executor
.insert_script(query_ctx, name, script)
@@ -42,6 +47,9 @@ impl ScriptHandler for Instance {
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let interceptor_ref = self.plugins.get::<ScriptInterceptorRef<Error>>();
interceptor_ref.pre_execute(name, query_ctx.clone())?;
let _timer = metrics::EXECUTE_SCRIPT_ELAPSED.start_timer();
self.script_executor
.execute_script(query_ctx, name, params)

View File

@@ -15,6 +15,7 @@
use std::borrow::Cow;
use std::sync::Arc;
use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::greptime_request::Request;
use common_error::ext::ErrorExt;
use common_query::Output;
@@ -246,3 +247,121 @@ where
}
}
}
/// ScriptInterceptor can track life cycle of a script request and customize or
/// abort its execution at given point.
pub trait ScriptInterceptor {
type Error: ErrorExt;
/// Called before script request is actually executed.
fn pre_execute(&self, _name: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}
}
pub type ScriptInterceptorRef<E> = Arc<dyn ScriptInterceptor<Error = E> + Send + Sync + 'static>;
impl<E: ErrorExt> ScriptInterceptor for Option<ScriptInterceptorRef<E>> {
type Error = E;
fn pre_execute(&self, name: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(name, query_ctx)
} else {
Ok(())
}
}
}
/// LineProtocolInterceptor can track life cycle of a line protocol request
/// and customize or abort its execution at given point.
pub trait LineProtocolInterceptor {
type Error: ErrorExt;
fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}
}
pub type LineProtocolInterceptorRef<E> =
Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
type Error = E;
fn pre_execute(&self, line: &str, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(line, query_ctx)
} else {
Ok(())
}
}
}
/// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request
/// and customize or abort its execution at given point.
pub trait OpenTelemetryProtocolInterceptor {
type Error: ErrorExt;
fn pre_execute(&self, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}
}
pub type OpenTelemetryProtocolInterceptorRef<E> =
Arc<dyn OpenTelemetryProtocolInterceptor<Error = E> + Send + Sync + 'static>;
impl<E: ErrorExt> OpenTelemetryProtocolInterceptor
for Option<OpenTelemetryProtocolInterceptorRef<E>>
{
type Error = E;
fn pre_execute(&self, query_ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_execute(query_ctx)
} else {
Ok(())
}
}
}
/// PromStoreProtocolInterceptor can track life cycle of a prom store request
/// and customize or abort its execution at given point.
pub trait PromStoreProtocolInterceptor {
type Error: ErrorExt;
fn pre_write(
&self,
_write_req: &WriteRequest,
_ctx: QueryContextRef,
) -> Result<(), Self::Error> {
Ok(())
}
fn pre_read(&self, _read_req: &ReadRequest, _ctx: QueryContextRef) -> Result<(), Self::Error> {
Ok(())
}
}
pub type PromStoreProtocolInterceptorRef<E> =
Arc<dyn PromStoreProtocolInterceptor<Error = E> + Send + Sync + 'static>;
impl<E: ErrorExt> PromStoreProtocolInterceptor for Option<PromStoreProtocolInterceptorRef<E>> {
type Error = E;
fn pre_write(&self, write_req: &WriteRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_write(write_req, ctx)
} else {
Ok(())
}
}
fn pre_read(&self, read_req: &ReadRequest, ctx: QueryContextRef) -> Result<(), Self::Error> {
if let Some(this) = self {
this.pre_read(read_req, ctx)
} else {
Ok(())
}
}
}