diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs
index 3081f4b75f..6b0e835bbc 100644
--- a/src/cmd/src/frontend.rs
+++ b/src/cmd/src/frontend.rs
@@ -337,6 +337,7 @@ impl StartCommand {
         let client = NodeClients::new(channel_config);
 
         let mut instance = FrontendBuilder::new(
+            opts.clone(),
             cached_meta_backend.clone(),
             layered_cache_registry.clone(),
             catalog_manager,
@@ -350,12 +351,12 @@ impl StartCommand {
             .await
             .context(StartFrontendSnafu)?;
 
-        let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins)
+        let servers = Services::new(opts, Arc::new(instance.clone()), plugins)
             .build()
             .await
             .context(StartFrontendSnafu)?;
         instance
-            .build_servers(opts, servers)
+            .build_servers(servers)
             .context(StartFrontendSnafu)?;
 
         Ok(Instance::new(instance, guard))
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 75be49a623..5544abc83a 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -510,6 +510,7 @@ impl StartCommand {
             .await?;
 
         let mut frontend = FrontendBuilder::new(
+            fe_opts.clone(),
             kv_backend,
             layered_cache_registry,
             catalog_manager,
@@ -529,12 +530,12 @@ impl StartCommand {
         // TODO(discord9): unify with adding `start` and `shutdown` method to flownode too.
         let _handle = flow_worker_manager.run_background();
 
-        let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins)
+        let servers = Services::new(fe_opts, Arc::new(frontend.clone()), fe_plugins)
             .build()
             .await
             .context(StartFrontendSnafu)?;
         frontend
-            .build_servers(fe_opts, servers)
+            .build_servers(servers)
             .context(StartFrontendSnafu)?;
 
         Ok(Instance {
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index ecc04af789..9a909a54c3 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -31,7 +31,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use catalog::CatalogManagerRef;
 use client::OutputData;
 use common_base::Plugins;
-use common_config::{Configurable, KvBackendConfig};
+use common_config::KvBackendConfig;
 use common_error::ext::{BoxedError, ErrorExt};
 use common_frontend::handler::FrontendInvoker;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -114,6 +114,7 @@ pub type FrontendInstanceRef = Arc<dyn FrontendInstance>;
 
 #[derive(Clone)]
 pub struct Instance {
+    options: FrontendOptions,
     catalog_manager: CatalogManagerRef,
     script_executor: Arc<ScriptExecutor>,
     pipeline_operator: Arc<PipelineOperator>,
@@ -189,14 +190,9 @@ impl Instance {
         Ok((kv_backend, procedure_manager))
     }
 
-    pub fn build_servers(
-        &mut self,
-        opts: impl Into<FrontendOptions> + Configurable,
-        servers: ServerHandlers,
-    ) -> Result<()> {
-        let opts: FrontendOptions = opts.into();
+    pub fn build_servers(&mut self, servers: ServerHandlers) -> Result<()> {
         self.export_metrics_task =
-            ExportMetricsTask::try_new(&opts.export_metrics, Some(&self.plugins))
+            ExportMetricsTask::try_new(&self.options.export_metrics, Some(&self.plugins))
                 .context(StartServerSnafu)?;
 
         self.servers = servers;
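Note: the common thread of the hunks above is to hand `FrontendOptions` to the builder exactly once and let the built `Instance` own it, so `build_servers` no longer takes (and can no longer disagree with) a second copy. A minimal, self-contained sketch of that pattern, using stand-in types rather than GreptimeDB APIs:

#[derive(Clone, Default)]
struct Options {
    export_metrics: bool,
}

struct Builder {
    options: Options,
}

struct Built {
    options: Options,
}

impl Builder {
    // Options are supplied exactly once, at construction time.
    fn new(options: Options) -> Self {
        Self { options }
    }

    fn build(self) -> Built {
        Built {
            options: self.options,
        }
    }
}

impl Built {
    // Before the refactor this took `options` as a parameter, so callers had
    // to keep a copy around; now the value owned by `self` is the only copy.
    fn build_servers(&mut self) {
        if self.options.export_metrics {
            // start the export-metrics task here
        }
    }
}

fn main() {
    let mut built = Builder::new(Options {
        export_metrics: true,
    })
    .build();
    built.build_servers();
}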
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index ae8d77dd20..49ffd67e31 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -36,6 +36,7 @@ use servers::server::ServerHandlers;
 use snafu::OptionExt;
 
 use crate::error::{self, Result};
+use crate::frontend::FrontendOptions;
 use crate::heartbeat::HeartbeatTask;
 use crate::instance::region_query::FrontendRegionQueryHandler;
 use crate::instance::Instance;
@@ -43,6 +44,7 @@ use crate::script::ScriptExecutor;
 
 /// The frontend [`Instance`] builder.
 pub struct FrontendBuilder {
+    options: FrontendOptions,
     kv_backend: KvBackendRef,
     layered_cache_registry: LayeredCacheRegistryRef,
     local_cache_invalidator: Option<CacheInvalidatorRef>,
@@ -55,6 +57,7 @@ pub struct FrontendBuilder {
 
 impl FrontendBuilder {
     pub fn new(
+        options: FrontendOptions,
         kv_backend: KvBackendRef,
         layered_cache_registry: LayeredCacheRegistryRef,
         catalog_manager: CatalogManagerRef,
@@ -62,6 +65,7 @@ impl FrontendBuilder {
         procedure_executor: ProcedureExecutorRef,
     ) -> Self {
         Self {
+            options,
             kv_backend,
             layered_cache_registry,
             local_cache_invalidator: None,
@@ -183,6 +187,7 @@ impl FrontendBuilder {
         plugins.insert::<StatementExecutorRef>(statement_executor.clone());
 
         Ok(Instance {
+            options: self.options,
             catalog_manager: self.catalog_manager,
             script_executor,
             pipeline_operator,
diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs
index 23eecd11e7..7985bab9f4 100644
--- a/src/frontend/src/instance/influxdb.rs
+++ b/src/frontend/src/instance/influxdb.rs
@@ -42,6 +42,11 @@ impl InfluxdbLineProtocolHandler for Instance {
         interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
 
         let requests = request.try_into()?;
+
+        let requests = interceptor_ref
+            .post_lines_conversion(requests, ctx.clone())
+            .await?;
+
         self.handle_row_inserts(requests, ctx)
             .await
             .map_err(BoxedError::new)
diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs
index 76b332bf38..ca55abb807 100644
--- a/src/servers/src/influxdb.rs
+++ b/src/servers/src/influxdb.rs
@@ -114,9 +114,8 @@ fn unwrap_or_default_precision(precision: Option<Precision>) -> Precision {
 #[cfg(test)]
 mod tests {
     use api::v1::value::ValueData;
-    use api::v1::{ColumnDataType, Rows, SemanticType};
+    use api::v1::{ColumnDataType, RowInsertRequests, Rows, SemanticType};
 
-    use super::*;
     use crate::influxdb::InfluxdbRequest;
 
     #[test]
diff --git a/src/servers/src/interceptor.rs b/src/servers/src/interceptor.rs
index 04bf76c142..941a424be4 100644
--- a/src/servers/src/interceptor.rs
+++ b/src/servers/src/interceptor.rs
@@ -17,6 +17,8 @@ use std::sync::Arc;
 
 use api::prom_store::remote::{ReadRequest, WriteRequest};
 use api::v1::greptime_request::Request;
+use api::v1::RowInsertRequests;
+use async_trait::async_trait;
 use common_error::ext::ErrorExt;
 use common_query::Output;
 use query::parser::PromQuery;
@@ -275,17 +277,31 @@ impl<E: ErrorExt> ScriptInterceptor for Option<ScriptInterceptorRef<E>> {
 
 /// LineProtocolInterceptor can track life cycle of a line protocol request
 /// and customize or abort its execution at given point.
+#[async_trait]
 pub trait LineProtocolInterceptor {
     type Error: ErrorExt;
 
     fn pre_execute(&self, _line: &str, _query_ctx: QueryContextRef) -> Result<(), Self::Error> {
        Ok(())
     }
+
+    /// Called after the lines are converted to the [RowInsertRequests].
+    /// We can then modify the resulting requests if needed.
+    /// Typically used in some backward compatibility situations.
+    async fn post_lines_conversion(
+        &self,
+        requests: RowInsertRequests,
+        query_context: QueryContextRef,
+    ) -> Result<RowInsertRequests, Self::Error> {
+        let _ = query_context;
+        Ok(requests)
+    }
 }
 
 pub type LineProtocolInterceptorRef<E> =
     Arc<dyn LineProtocolInterceptor<Error = E> + Send + Sync + 'static>;
 
+#[async_trait]
 impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
     type Error = E;
 
@@ -296,6 +312,18 @@ impl<E: ErrorExt> LineProtocolInterceptor for Option<LineProtocolInterceptorRef<E>> {
             Ok(())
         }
     }
+
+    async fn post_lines_conversion(
+        &self,
+        requests: RowInsertRequests,
+        query_context: QueryContextRef,
+    ) -> Result<RowInsertRequests, Self::Error> {
+        if let Some(this) = self {
+            this.post_lines_conversion(requests, query_context).await
+        } else {
+            Ok(requests)
+        }
+    }
 }
 
 /// OpenTelemetryProtocolInterceptor can track life cycle of an open telemetry protocol request
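With the hook in place, a plugin can rewrite the converted requests before they are inserted. A sketch of such an interceptor (the type name and the renaming rule are hypothetical; the `inserts`/`table_name` fields are assumed from the `api::v1` protobuf definitions, and the error type is borrowed from the `servers` crate):

use api::v1::RowInsertRequests;
use async_trait::async_trait;
use servers::error::Error;
use servers::interceptor::LineProtocolInterceptor;
use session::context::QueryContextRef;

/// Hypothetical interceptor that keeps an old table-naming scheme working.
struct RenameLegacyTables;

#[async_trait]
impl LineProtocolInterceptor for RenameLegacyTables {
    type Error = Error;

    async fn post_lines_conversion(
        &self,
        mut requests: RowInsertRequests,
        _query_context: QueryContextRef,
    ) -> Result<RowInsertRequests, Self::Error> {
        // Rewrite the table names produced by the line-protocol conversion.
        for insert in &mut requests.inserts {
            if let Some(stripped) = insert.table_name.strip_prefix("legacy_") {
                insert.table_name = stripped.to_string();
            }
        }
        Ok(requests)
    }
}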
diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs
index 7feb89458a..542a2484b4 100644
--- a/src/servers/src/row_writer.rs
+++ b/src/servers/src/row_writer.rs
@@ -333,6 +333,7 @@ fn write_ts_to(
             ValueData::TimestampNanosecondValue(ts),
         ),
     };
+
     let index = column_indexes.get(&name);
     if let Some(index) = index {
         check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs
index eb5dc2d5cd..adc914e56d 100644
--- a/tests-integration/src/cluster.rs
+++ b/tests-integration/src/cluster.rs
@@ -376,14 +376,16 @@
             Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
         ]);
 
+        let options = FrontendOptions::default();
         let heartbeat_task = HeartbeatTask::new(
-            &FrontendOptions::default(),
+            &options,
             meta_client.clone(),
             HeartbeatOptions::default(),
             Arc::new(handlers_executor),
         );
 
         let instance = FrontendBuilder::new(
+            options,
             cached_meta_backend.clone(),
             cache_registry.clone(),
             catalog_manager,
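An aside on why the hoisted `options` above compiles: `HeartbeatTask::new` only borrows the value, so the same instance can still be moved into `FrontendBuilder::new` afterwards. A stand-in sketch (the function names here are placeholders for the real constructors):

#[derive(Default)]
struct FrontendOptions;

fn new_heartbeat_task(_opts: &FrontendOptions) {} // borrows, like HeartbeatTask::new
fn new_frontend_builder(_opts: FrontendOptions) {} // consumes, like FrontendBuilder::new

fn main() {
    let options = FrontendOptions::default();
    new_heartbeat_task(&options); // shared borrow ends before the move below
    new_frontend_builder(options); // value moved into the builder
}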
diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs
index 09bdf5328a..9db64aec8f 100644
--- a/tests-integration/src/influxdb.rs
+++ b/tests-integration/src/influxdb.rs
@@ -18,52 +18,19 @@ mod test {
 
     use client::OutputData;
     use common_recordbatch::RecordBatches;
-    use frontend::instance::Instance;
+    use rstest::rstest;
+    use rstest_reuse::apply;
     use servers::influxdb::InfluxdbRequest;
     use servers::query_handler::sql::SqlQueryHandler;
     use servers::query_handler::InfluxdbLineProtocolHandler;
     use session::context::QueryContext;
 
-    use crate::standalone::GreptimeDbStandaloneBuilder;
-    use crate::tests;
+    use crate::tests::test_util::{both_instances_cases, distributed, standalone, MockInstance};
 
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_standalone_put_influxdb_lines() {
-        let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_put_influxdb_lines")
-            .build()
-            .await;
-        let instance = &standalone.instance;
+    #[apply(both_instances_cases)]
+    async fn test_put_influxdb_lines_without_time_column(instance: Arc<dyn MockInstance>) {
+        let instance = instance.frontend();
 
-        test_put_influxdb_lines(instance).await;
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_distributed_put_influxdb_lines() {
-        let instance =
-            tests::create_distributed_instance("test_distributed_put_influxdb_lines").await;
-        test_put_influxdb_lines(&instance.frontend()).await;
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_standalone_put_influxdb_lines_without_time_column() {
-        let standalone = GreptimeDbStandaloneBuilder::new(
-            "test_standalone_put_influxdb_lines_without_time_column",
-        )
-        .build()
-        .await;
-        test_put_influxdb_lines_without_time_column(&standalone.instance).await;
-    }
-
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_distributed_put_influxdb_lines_without_time_column() {
-        let instance = tests::create_distributed_instance(
-            "test_distributed_put_influxdb_lines_without_time_column",
-        )
-        .await;
-        test_put_influxdb_lines_without_time_column(&instance.frontend()).await;
-    }
-
-    async fn test_put_influxdb_lines_without_time_column(instance: &Arc<Instance>) {
         let lines = r"
 monitor1,host=host1 cpu=66.6,memory=1024
 monitor1,host=host2 memory=1027";
@@ -92,7 +59,10 @@ monitor1,host=host2 memory=1027";
         assert_eq!(total, 2);
     }
 
-    async fn test_put_influxdb_lines(instance: &Arc<Instance>) {
+    #[apply(both_instances_cases)]
+    async fn test_put_influxdb_lines(instance: Arc<dyn MockInstance>) {
+        let instance = instance.frontend();
+
         let lines = r"
 monitor1,host=host1 cpu=66.6,memory=1024 1663840496100023100
 monitor1,host=host2 memory=1027 1663840496400340001";
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 77a31fb75f..d34604318a 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -207,6 +207,7 @@ impl GreptimeDbStandaloneBuilder {
         );
 
         let instance = FrontendBuilder::new(
+            opts.frontend_options(),
             kv_backend.clone(),
             cache_registry,
             catalog_manager,
diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs
index 65f91e7f5e..b950b602bf 100644
--- a/tests-integration/src/tests.rs
+++ b/tests-integration/src/tests.rs
@@ -15,7 +15,7 @@
 mod instance_kafka_wal_test;
 mod instance_test;
 mod promql_test;
-mod test_util;
+pub(crate) mod test_util;
 
 use std::collections::HashMap;
 use std::sync::Arc;
diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs
index c052b668d8..7bb29ce331 100644
--- a/tests-integration/src/tests/test_util.rs
+++ b/tests-integration/src/tests/test_util.rs
@@ -15,6 +15,7 @@
 use std::env;
 use std::sync::Arc;
 
+use async_trait::async_trait;
 use client::OutputData;
 use common_query::Output;
 use common_recordbatch::util;
@@ -36,7 +37,8 @@ pub(crate) trait RebuildableMockInstance: MockInstance {
     async fn rebuild(&mut self);
 }
 
-pub(crate) trait MockInstance: Sync + Send {
+#[async_trait]
+pub trait MockInstance: Sync + Send {
     fn frontend(&self) -> Arc<Instance>;
 
     fn is_distributed_mode(&self) -> bool;
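The `#[apply(both_instances_cases)]` attribute used above expands each test against an rstest_reuse template exported from `tests/test_util.rs`. The template itself is outside this diff; a plausible reconstruction, inferred from the `rstest`/`rstest_reuse` imports and the `standalone`/`distributed` fixture names rather than taken from the patch:

use std::sync::Arc;

use rstest::rstest;
use rstest_reuse::template;

// Each #[apply(both_instances_cases)] test is expanded into one case per
// fixture: a standalone instance and a distributed cluster.
#[template]
#[rstest]
#[case::standalone(standalone())]
#[case::distributed(distributed())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn both_instances_cases(
    #[future]
    #[case]
    instance: Arc<dyn MockInstance>,
) {
}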