From 52d627e37db26b9830024de9b005444207299436 Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Thu, 19 Sep 2024 13:14:47 +0800 Subject: [PATCH] chore: add log ingest interceptor (#4734) * chore: add log ingest interceptor * chore: rename * chore: update interceptor signature --- src/frontend/src/instance/log_handler.rs | 11 ++++- src/frontend/src/server.rs | 8 ++- src/pipeline/src/etl/processor/dissect.rs | 8 +-- src/servers/src/http.rs | 18 +++---- src/servers/src/http/event.rs | 10 +++- src/servers/src/interceptor.rs | 59 +++++++++++++++++++++++ tests-integration/src/test_util.rs | 2 +- 7 files changed, 96 insertions(+), 20 deletions(-) diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 7edda5ccf1..441501b242 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -20,7 +20,10 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion}; -use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult}; +use servers::error::{ + AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult, +}; +use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::LogHandler; use session::context::QueryContextRef; use snafu::ResultExt; @@ -40,6 +43,12 @@ impl LogHandler for Instance { .check_permission(ctx.current_user(), PermissionReq::LogWrite) .context(AuthSnafu)?; + let log = self + .plugins + .get::>() + .as_ref() + .pre_ingest(log, ctx.clone())?; + self.handle_log_inserts(log, ctx).await } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 9660b27620..115002c3ab 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -18,11 +18,13 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; use common_config::{Configurable, Mode}; +use servers::error::Error as ServerError; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig}; use servers::http::event::LogValidatorRef; use servers::http::{HttpServer, HttpServerBuilder}; +use servers::interceptor::LogIngestInterceptorRef; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::postgres::PostgresServer; @@ -81,8 +83,10 @@ where Some(self.instance.clone()), ); - builder = builder - .with_log_ingest_handler(self.instance.clone(), self.plugins.get::()); + let validator = self.plugins.get::(); + let ingest_interceptor = self.plugins.get::>(); + builder = + builder.with_log_ingest_handler(self.instance.clone(), validator, ingest_interceptor); if let Some(user_provider) = self.plugins.get::() { builder = builder.with_user_provider(user_provider); diff --git a/src/pipeline/src/etl/processor/dissect.rs b/src/pipeline/src/etl/processor/dissect.rs index dca88d3843..f9925916fc 100644 --- a/src/pipeline/src/etl/processor/dissect.rs +++ b/src/pipeline/src/etl/processor/dissect.rs @@ -15,7 +15,6 @@ use std::ops::Deref; use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; -use common_telemetry::warn; use itertools::Itertools; use crate::etl::field::{Fields, InputFieldInfo, OneInputMultiOutputField}; @@ -742,11 +741,8 @@ impl DissectProcessor { let chs = val.chars().collect::>(); for pattern in &self.patterns { - match self.process_pattern(&chs, pattern) { - Ok(map) => return Ok(map), - Err(e) => { - warn!("dissect processor: {}", e); - } + if let Ok(map) = self.process_pattern(&chs, pattern) { + return Ok(map); } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 956a650fcc..18388998e7 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -63,6 +63,7 @@ use crate::http::prometheus::{ build_info_query, format_query, instant_query, label_values_query, labels_query, range_query, series_query, }; +use crate::interceptor::LogIngestInterceptorRef; use crate::metrics::http_metrics_layer; use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; @@ -596,11 +597,16 @@ impl HttpServerBuilder { self, handler: LogHandlerRef, validator: Option, + ingest_interceptor: Option>, ) -> Self { Self { router: self.router.nest( &format!("/{HTTP_API_VERSION}/events"), - HttpServer::route_log(handler, validator), + HttpServer::route_log(LogState { + log_handler: handler, + log_validator: validator, + ingest_interceptor, + }), ), ..self } @@ -739,10 +745,7 @@ impl HttpServer { .with_state(metrics_handler) } - fn route_log( - log_handler: LogHandlerRef, - log_validator: Option, - ) -> Router { + fn route_log(log_state: LogState) -> Router { Router::new() .route("/logs", routing::post(event::log_ingester)) .route( @@ -759,10 +762,7 @@ impl HttpServer { .layer(HandleErrorLayer::new(handle_error)) .layer(RequestDecompressionLayer::new()), ) - .with_state(LogState { - log_handler, - log_validator, - }) + .with_state(log_state) } fn route_sql(api_state: ApiState) -> ApiRouter { diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index dbd7f1232a..f0a0902837 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -37,11 +37,13 @@ use session::context::{Channel, QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, UnsupportedContentTypeSnafu, + Error, InvalidParameterSnafu, ParseJsonSnafu, PipelineSnafu, Result, + UnsupportedContentTypeSnafu, }; use crate::http::greptime_manage_resp::GreptimedbManageResponse; use crate::http::greptime_result_v1::GreptimedbV1Response; use crate::http::HttpResponse; +use crate::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_INGESTION_COUNTER, METRIC_HTTP_LOGS_INGESTION_ELAPSED, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, @@ -378,6 +380,11 @@ pub async fn log_ingester( query_ctx.set_channel(Channel::Http); let query_ctx = Arc::new(query_ctx); + let value = log_state + .ingest_interceptor + .as_ref() + .pre_pipeline(value, query_ctx.clone())?; + ingest_logs_inner( handler, pipeline_name, @@ -506,6 +513,7 @@ pub type LogValidatorRef = Arc; pub struct LogState { pub log_handler: LogHandlerRef, pub log_validator: Option, + pub ingest_interceptor: Option>, } #[cfg(test)] diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs index e4aceeb442..d3478a56ea 100644 --- a/src/servers/src/interceptor.rs +++ b/src/servers/src/interceptor.rs @@ -23,6 +23,7 @@ use common_error::ext::ErrorExt; use common_query::Output; use query::parser::PromQuery; use query::plan::LogicalPlan; +use serde_json::Value; use session::context::QueryContextRef; use sql::statements::statement::Statement; @@ -397,3 +398,61 @@ impl PromStoreProtocolInterceptor for Option, + _query_ctx: QueryContextRef, + ) -> Result, Self::Error> { + Ok(values) + } + + /// Called before insertion. + fn pre_ingest( + &self, + request: RowInsertRequests, + _query_ctx: QueryContextRef, + ) -> Result { + Ok(request) + } +} + +pub type LogIngestInterceptorRef = + Arc + Send + Sync + 'static>; + +impl LogIngestInterceptor for Option<&LogIngestInterceptorRef> +where + E: ErrorExt, +{ + type Error = E; + + fn pre_pipeline( + &self, + values: Vec, + query_ctx: QueryContextRef, + ) -> Result, Self::Error> { + if let Some(this) = self { + this.pre_pipeline(values, query_ctx) + } else { + Ok(values) + } + } + + fn pre_ingest( + &self, + request: RowInsertRequests, + query_ctx: QueryContextRef, + ) -> Result { + if let Some(this) = self { + this.pre_ingest(request, query_ctx) + } else { + Ok(request) + } + } +} diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index cf125a5776..a055527e2b 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -424,7 +424,7 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), Some(instance.instance.clone()), ) - .with_log_ingest_handler(instance.instance.clone(), None) + .with_log_ingest_handler(instance.instance.clone(), None, None) .with_otlp_handler(instance.instance.clone()) .with_greptime_config_options(instance.opts.to_toml().unwrap());