From f81e37f508db98b7ade460f323dbc1640e0520af Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Wed, 24 Jan 2024 11:45:08 +0800 Subject: [PATCH] refactor: make http server built flexibly (#3225) * refactor: make http server built flexibly * Apply suggestions from code review Co-authored-by: JeremyHi * fix: resolve PR comments * Fix CI. --------- Co-authored-by: JeremyHi --- Cargo.lock | 3 - Cargo.toml | 1 + src/cmd/src/frontend.rs | 10 +- src/cmd/src/lib.rs | 2 + src/cmd/src/standalone.rs | 6 +- src/datanode/Cargo.toml | 3 - src/frontend/src/instance.rs | 8 +- src/frontend/src/instance/builder.rs | 4 +- src/frontend/src/server.rs | 189 ++++++++------ src/mito2/src/lib.rs | 2 +- src/servers/Cargo.toml | 2 +- src/servers/src/grpc/builder.rs | 16 +- src/servers/src/http.rs | 304 ++++++++++------------ src/servers/tests/http/http_test.rs | 12 +- src/servers/tests/http/influxdb_test.rs | 3 +- src/servers/tests/http/opentsdb_test.rs | 3 +- src/servers/tests/http/prom_store_test.rs | 5 +- tests-integration/Cargo.toml | 2 +- tests-integration/src/test_util.rs | 30 +-- 19 files changed, 315 insertions(+), 290 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9e001227d..42bbadab06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2659,9 +2659,6 @@ dependencies = [ "async-compat", "async-stream", "async-trait", - "axum", - "axum-macros", - "axum-test-helper", "bytes", "catalog", "client", diff --git a/Cargo.toml b/Cargo.toml index dfa339b4b8..617c66bb65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,6 +73,7 @@ arrow-flight = "47.0" arrow-schema = { version = "47.0", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" +axum = { version = "0.6", features = ["headers"] } base64 = "0.21" bigdecimal = "0.4.2" bitflags = "2.4.1" diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 94718630c6..cf710c8c05 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -46,6 +46,10 @@ impl Instance { fn new(frontend: FeInstance) -> Self { Self { frontend } } + + pub fn mut_inner(&mut self) -> &mut FeInstance { + &mut self.frontend + } } #[async_trait] @@ -253,13 +257,11 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let servers = Services::new(plugins) - .build(opts.clone(), Arc::new(instance.clone())) - .await + let servers = Services::new(opts.clone(), Arc::new(instance.clone()), plugins) + .build() .context(StartFrontendSnafu)?; instance .build_servers(opts, servers) - .await .context(StartFrontendSnafu)?; Ok(Instance::new(instance)) diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 08bc2e66f3..349fd910bd 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -48,6 +48,8 @@ pub trait App { pub async fn start_app(mut app: Box) -> error::Result<()> { let name = app.name().to_string(); + app.pre_start()?; + tokio::select! { result = app.start() => { if let Err(err) = result { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index f9a49e75bd..40611a11d3 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -437,13 +437,11 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let servers = Services::new(fe_plugins) - .build(opts.clone(), Arc::new(frontend.clone())) - .await + let servers = Services::new(opts.clone(), Arc::new(frontend.clone()), fe_plugins) + .build() .context(StartFrontendSnafu)?; frontend .build_servers(opts, servers) - .await .context(StartFrontendSnafu)?; Ok(Instance { diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b06f5d8d36..64c8d03064 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -13,8 +13,6 @@ arrow-flight.workspace = true async-compat = "0.2" async-stream.workspace = true async-trait.workspace = true -axum = "0.6" -axum-macros = "0.3" bytes.workspace = true catalog.workspace = true client.workspace = true @@ -77,7 +75,6 @@ url = "2.3.1" uuid.workspace = true [dev-dependencies] -axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } client.workspace = true common-meta = { workspace = true, features = ["testing"] } common-query.workspace = true diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index ae22121100..e8ae74c7b4 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -32,6 +32,7 @@ use common_base::Plugins; use common_config::KvBackendConfig; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_meta::state_store::KvStateStore; use common_procedure::local::{LocalManager, ManagerConfig}; @@ -119,6 +120,7 @@ pub struct Instance { inserter: InserterRef, deleter: DeleterRef, export_metrics_task: Option, + table_metadata_manager: TableMetadataManagerRef, } impl Instance { @@ -186,7 +188,7 @@ impl Instance { Ok((kv_backend, procedure_manager)) } - pub async fn build_servers( + pub fn build_servers( &mut self, opts: impl Into + TomlSerializable, servers: ServerHandlers, @@ -219,6 +221,10 @@ impl Instance { pub fn statement_executor(&self) -> Arc { self.statement_executor.clone() } + + pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { + &self.table_metadata_manager + } } #[async_trait] diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 15711f9a7b..8e891f41fa 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -20,6 +20,7 @@ use common_base::Plugins; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::DdlTaskExecutorRef; +use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use operator::delete::Deleter; use operator::insert::Inserter; @@ -127,7 +128,7 @@ impl FrontendBuilder { catalog_manager.clone(), query_engine.clone(), self.ddl_task_executor, - kv_backend, + kv_backend.clone(), catalog_manager.clone(), inserter.clone(), )); @@ -145,6 +146,7 @@ impl FrontendBuilder { inserter, deleter, export_metrics_task: None, + table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), }) } } diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 5653905732..245e92cb85 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -21,8 +21,8 @@ use common_runtime::Builder as RuntimeBuilder; use servers::error::InternalIoSnafu; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; -use servers::grpc::GrpcServerConfig; -use servers::http::HttpServerBuilder; +use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; use servers::opentsdb::OpentsdbServer; @@ -37,16 +37,34 @@ use crate::frontend::{FrontendOptions, TomlSerializable}; use crate::instance::FrontendInstance; use crate::service_config::GrpcOptions; -pub struct Services { +pub struct Services +where + T: Into + TomlSerializable + Clone, + U: FrontendInstance, +{ + opts: T, + instance: Arc, + grpc_server_builder: Option, + http_server_builder: Option, plugins: Plugins, } -impl Services { - pub fn new(plugins: Plugins) -> Self { - Self { plugins } +impl Services +where + T: Into + TomlSerializable + Clone, + U: FrontendInstance, +{ + pub fn new(opts: T, instance: Arc, plugins: Plugins) -> Self { + Self { + opts, + instance, + grpc_server_builder: None, + http_server_builder: None, + plugins, + } } - pub fn grpc_server_builder(opts: &GrpcOptions) -> Result { + pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result { let grpc_runtime = Arc::new( RuntimeBuilder::default() .worker_threads(opts.runtime_size) @@ -63,26 +81,93 @@ impl Services { Ok(GrpcServerBuilder::new(grpc_config, grpc_runtime)) } - pub async fn build(&self, opts: T, instance: Arc) -> Result - where - T: Into + TomlSerializable + Clone, - U: FrontendInstance, - { - let grpc_options = &opts.clone().into().grpc; - let builder = Self::grpc_server_builder(grpc_options)?; - self.build_with(opts, instance, builder).await + pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder { + let mut builder = HttpServerBuilder::new(opts.http.clone()).with_sql_handler( + ServerSqlQueryHandlerAdapter::arc(self.instance.clone()), + Some(self.instance.clone()), + ); + + if let Some(user_provider) = self.plugins.get::() { + builder = builder.with_user_provider(user_provider); + } + + if opts.opentsdb.enable { + builder = builder.with_opentsdb_handler(self.instance.clone()); + } + + if opts.influxdb.enable { + builder = builder.with_influxdb_handler(self.instance.clone()); + } + + if opts.prom_store.enable { + builder = builder + .with_prom_handler(self.instance.clone(), opts.prom_store.with_metric_engine) + .with_prometheus_handler(self.instance.clone()); + } + + if opts.otlp.enable { + builder = builder.with_otlp_handler(self.instance.clone()); + } + builder } - pub async fn build_with( - &self, - opts: T, - instance: Arc, - builder: GrpcServerBuilder, - ) -> Result - where - T: Into + TomlSerializable, - U: FrontendInstance, - { + pub fn with_grpc_server_builder(self, builder: GrpcServerBuilder) -> Self { + Self { + grpc_server_builder: Some(builder), + ..self + } + } + + pub fn with_http_server_builder(self, builder: HttpServerBuilder) -> Self { + Self { + http_server_builder: Some(builder), + ..self + } + } + + fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result { + let builder = if let Some(builder) = self.grpc_server_builder.take() { + builder + } else { + self.grpc_server_builder(&opts.grpc)? + }; + + let user_provider = self.plugins.get::(); + + let greptime_request_handler = GreptimeRequestHandler::new( + ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), + user_provider.clone(), + builder.runtime().clone(), + ); + + let grpc_server = builder + .database_handler(greptime_request_handler.clone()) + .prometheus_handler(self.instance.clone(), user_provider.clone()) + .otlp_handler(self.instance.clone(), user_provider) + .flight_handler(Arc::new(greptime_request_handler)) + .build(); + Ok(grpc_server) + } + + fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result { + let builder = if let Some(builder) = self.http_server_builder.take() { + builder + } else { + self.http_server_builder(opts) + }; + + let http_server = builder + .with_metrics_handler(MetricsHandler) + .with_plugins(self.plugins.clone()) + .with_greptime_config_options(toml) + .build(); + Ok(http_server) + } + + pub fn build(mut self) -> Result { + let opts = self.opts.clone(); + let instance = self.instance.clone(); + let toml = opts.to_toml()?; let opts: FrontendOptions = opts.into(); @@ -92,21 +177,8 @@ impl Services { { // Always init GRPC server - let opts = &opts.grpc; - let grpc_addr = parse_addr(&opts.addr)?; - - let greptime_request_handler = GreptimeRequestHandler::new( - ServerGrpcQueryHandlerAdapter::arc(instance.clone()), - user_provider.clone(), - builder.runtime().clone(), - ); - let grpc_server = builder - .database_handler(greptime_request_handler.clone()) - .prometheus_handler(instance.clone(), user_provider.clone()) - .otlp_handler(instance.clone(), user_provider.clone()) - .flight_handler(Arc::new(greptime_request_handler)) - .build(); - + let grpc_addr = parse_addr(&opts.grpc.addr)?; + let grpc_server = self.build_grpc_server(&opts)?; result.push((Box::new(grpc_server), grpc_addr)); } @@ -114,42 +186,7 @@ impl Services { // Always init HTTP server let http_options = &opts.http; let http_addr = parse_addr(&http_options.addr)?; - - let mut http_server_builder = HttpServerBuilder::new(http_options.clone()); - let _ = http_server_builder - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone())); - - if let Some(user_provider) = user_provider.clone() { - let _ = http_server_builder.with_user_provider(user_provider); - } - - if opts.opentsdb.enable { - let _ = http_server_builder.with_opentsdb_handler(instance.clone()); - } - - if opts.influxdb.enable { - let _ = http_server_builder.with_influxdb_handler(instance.clone()); - } - - if opts.prom_store.enable { - let _ = http_server_builder - .with_prom_handler(instance.clone()) - .with_prometheus_handler(instance.clone()); - http_server_builder - .set_prom_store_with_metric_engine(opts.prom_store.with_metric_engine); - } - - if opts.otlp.enable { - let _ = http_server_builder.with_otlp_handler(instance.clone()); - } - - let http_server = http_server_builder - .with_metrics_handler(MetricsHandler) - .with_script_handler(instance.clone()) - .with_plugins(self.plugins.clone()) - .with_greptime_config_options(toml) - .build(); + let http_server = self.build_http_server(&opts, toml)?; result.push((Box::new(http_server), http_addr)); } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 10b275c481..be785e81c3 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -37,7 +37,7 @@ pub mod read; pub mod region; mod region_write_ctx; pub mod request; -mod row_converter; +pub mod row_converter; pub(crate) mod schedule; pub mod sst; pub mod wal; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 2ee22a32f0..b7da8935f1 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -16,8 +16,8 @@ api.workspace = true arrow-flight.workspace = true async-trait = "0.1" auth.workspace = true -axum = { version = "0.6", features = ["headers"] } axum-macros = "0.3.8" +axum.workspace = true base64.workspace = true bytes.workspace = true catalog.workspace = true diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index 7a5baa40ea..e0e8e81374 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -37,12 +37,18 @@ use crate::grpc::prom_query_gateway::PrometheusGatewayService; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::OpenTelemetryProtocolHandlerRef; +/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]). +/// This macro will automatically add some gRPC properties to the service. +#[macro_export] macro_rules! add_service { ($builder: ident, $service: expr) => { - $builder.routes_builder.add_service( + let max_recv_message_size = $builder.config().max_recv_message_size; + let max_send_message_size = $builder.config().max_send_message_size; + + $builder.routes_builder_mut().add_service( $service - .max_decoding_message_size($builder.config.max_recv_message_size) - .max_encoding_message_size($builder.config.max_send_message_size), + .max_decoding_message_size(max_recv_message_size) + .max_encoding_message_size(max_send_message_size), ) }; } @@ -62,6 +68,10 @@ impl GrpcServerBuilder { } } + pub fn config(&self) -> &GrpcServerConfig { + &self.config + } + pub fn runtime(&self) -> &Arc { &self.runtime } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index e114425e52..d84b991fa0 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -14,6 +14,7 @@ use std::fmt::Display; use std::net::SocketAddr; +use std::sync::Mutex as StdMutex; use std::time::{Duration, Instant}; use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse}; @@ -62,7 +63,6 @@ use crate::metrics::{ }; use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; use crate::query_handler::{ InfluxdbLineProtocolHandlerRef, OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, @@ -99,26 +99,15 @@ pub static PUBLIC_APIS: [&str; 2] = ["/v1/influxdb/ping", "/v1/influxdb/health"] #[derive(Default)] pub struct HttpServer { - // server handlers - sql_handler: Option, - grpc_handler: Option, - influxdb_handler: Option, - opentsdb_handler: Option, - prom_handler: Option, - prometheus_handler: Option, - otlp_handler: Option, - script_handler: Option, + router: StdMutex, shutdown_tx: Mutex>>, user_provider: Option, - metrics_handler: Option, // plugins plugins: Plugins, // server configs options: HttpOptions, - greptime_config_options: Option, - prom_store_with_metric_engine: bool, } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -406,105 +395,16 @@ pub struct GreptimeOptionsConfigState { #[derive(Default)] pub struct HttpServerBuilder { - inner: HttpServer, + options: HttpOptions, + plugins: Plugins, + user_provider: Option, + api: OpenApi, + router: Router, } impl HttpServerBuilder { pub fn new(options: HttpOptions) -> Self { - Self { - inner: HttpServer { - sql_handler: None, - grpc_handler: None, - options, - opentsdb_handler: None, - influxdb_handler: None, - prom_handler: None, - prometheus_handler: None, - otlp_handler: None, - user_provider: None, - script_handler: None, - metrics_handler: None, - shutdown_tx: Mutex::new(None), - greptime_config_options: None, - plugins: Default::default(), - prom_store_with_metric_engine: false, - }, - } - } - - pub fn with_sql_handler(&mut self, handler: ServerSqlQueryHandlerRef) -> &mut Self { - let _ = self.inner.sql_handler.get_or_insert(handler); - self - } - - pub fn with_grpc_handler(&mut self, handler: ServerGrpcQueryHandlerRef) -> &mut Self { - let _ = self.inner.grpc_handler.get_or_insert(handler); - self - } - - pub fn with_opentsdb_handler(&mut self, handler: OpentsdbProtocolHandlerRef) -> &mut Self { - let _ = self.inner.opentsdb_handler.get_or_insert(handler); - self - } - - pub fn with_script_handler(&mut self, handler: ScriptHandlerRef) -> &mut Self { - let _ = self.inner.script_handler.get_or_insert(handler); - self - } - - pub fn with_influxdb_handler(&mut self, handler: InfluxdbLineProtocolHandlerRef) -> &mut Self { - let _ = self.inner.influxdb_handler.get_or_insert(handler); - self - } - - pub fn with_prom_handler(&mut self, handler: PromStoreProtocolHandlerRef) -> &mut Self { - let _ = self.inner.prom_handler.get_or_insert(handler); - self - } - - pub fn with_prometheus_handler(&mut self, handler: PrometheusHandlerRef) -> &mut Self { - let _ = self.inner.prometheus_handler.get_or_insert(handler); - self - } - - pub fn with_otlp_handler(&mut self, handler: OpenTelemetryProtocolHandlerRef) -> &mut Self { - let _ = self.inner.otlp_handler.get_or_insert(handler); - self - } - - pub fn with_user_provider(&mut self, user_provider: UserProviderRef) -> &mut Self { - let _ = self.inner.user_provider.get_or_insert(user_provider); - self - } - - pub fn with_metrics_handler(&mut self, handler: MetricsHandler) -> &mut Self { - let _ = self.inner.metrics_handler.get_or_insert(handler); - self - } - - pub fn with_plugins(&mut self, plugins: Plugins) -> &mut Self { - self.inner.plugins = plugins; - self - } - - pub fn with_greptime_config_options(&mut self, opts: String) -> &mut Self { - self.inner.greptime_config_options = Some(opts); - self - } - - pub fn set_prom_store_with_metric_engine(&mut self, with_metric_engine: bool) -> &mut Self { - self.inner.prom_store_with_metric_engine = with_metric_engine; - self - } - - pub fn build(&mut self) -> HttpServer { - std::mem::take(self).inner - } -} - -impl HttpServer { - pub fn make_app(&self) -> Router { - let mut api = OpenApi { + let api = OpenApi { info: Info { title: "GreptimeDB HTTP API".to_string(), description: Some("HTTP APIs to interact with GreptimeDB".to_string()), @@ -517,72 +417,149 @@ impl HttpServer { }], ..OpenApi::default() }; - - let mut router = Router::new(); - - if let Some(sql_handler) = self.sql_handler.clone() { - let sql_router = self - .route_sql(ApiState { - sql_handler, - script_handler: self.script_handler.clone(), - }) - .finish_api(&mut api) - .layer(Extension(api.clone())); - router = router.nest(&format!("/{HTTP_API_VERSION}"), sql_router); + Self { + options, + plugins: Plugins::default(), + user_provider: None, + api, + router: Router::new(), } + } - if let Some(opentsdb_handler) = self.opentsdb_handler.clone() { - router = router.nest( + pub fn with_sql_handler( + mut self, + sql_handler: ServerSqlQueryHandlerRef, + script_handler: Option, + ) -> Self { + let sql_router = HttpServer::route_sql(ApiState { + sql_handler, + script_handler, + }) + .finish_api(&mut self.api) + .layer(Extension(self.api.clone())); + + Self { + router: self + .router + .nest(&format!("/{HTTP_API_VERSION}"), sql_router), + ..self + } + } + + pub fn with_opentsdb_handler(self, handler: OpentsdbProtocolHandlerRef) -> Self { + Self { + router: self.router.nest( &format!("/{HTTP_API_VERSION}/opentsdb"), - self.route_opentsdb(opentsdb_handler), - ); + HttpServer::route_opentsdb(handler), + ), + ..self } + } - if let Some(influxdb_handler) = self.influxdb_handler.clone() { - router = router.nest( + pub fn with_influxdb_handler(self, handler: InfluxdbLineProtocolHandlerRef) -> Self { + Self { + router: self.router.nest( &format!("/{HTTP_API_VERSION}/influxdb"), - self.route_influxdb(influxdb_handler), - ); + HttpServer::route_influxdb(handler), + ), + ..self } + } - if let Some(prom_handler) = self.prom_handler.clone() { - router = router.nest( + pub fn with_prom_handler( + self, + handler: PromStoreProtocolHandlerRef, + prom_store_with_metric_engine: bool, + ) -> Self { + Self { + router: self.router.nest( &format!("/{HTTP_API_VERSION}/prometheus"), - self.route_prom(prom_handler), - ); + HttpServer::route_prom(handler, prom_store_with_metric_engine), + ), + ..self } + } - if let Some(prometheus_handler) = self.prometheus_handler.clone() { - router = router.nest( + pub fn with_prometheus_handler(self, handler: PrometheusHandlerRef) -> Self { + Self { + router: self.router.nest( &format!("/{HTTP_API_VERSION}/prometheus/api/v1"), - self.route_prometheus(prometheus_handler), - ); + HttpServer::route_prometheus(handler), + ), + ..self } + } - if let Some(otlp_handler) = self.otlp_handler.clone() { - router = router.nest( + pub fn with_otlp_handler(self, handler: OpenTelemetryProtocolHandlerRef) -> Self { + Self { + router: self.router.nest( &format!("/{HTTP_API_VERSION}/otlp"), - self.route_otlp(otlp_handler), - ); + HttpServer::route_otlp(handler), + ), + ..self } + } - if let Some(metrics_handler) = self.metrics_handler { - router = router.nest("", self.route_metrics(metrics_handler)); + pub fn with_user_provider(self, user_provider: UserProviderRef) -> Self { + Self { + user_provider: Some(user_provider), + ..self } + } + + pub fn with_metrics_handler(self, handler: MetricsHandler) -> Self { + Self { + router: self.router.nest("", HttpServer::route_metrics(handler)), + ..self + } + } + + pub fn with_plugins(self, plugins: Plugins) -> Self { + Self { plugins, ..self } + } + + pub fn with_greptime_config_options(mut self, opts: String) -> Self { + let config_router = HttpServer::route_config(GreptimeOptionsConfigState { + greptime_config_options: opts, + }) + .finish_api(&mut self.api); + + Self { + router: self.router.nest("", config_router), + ..self + } + } + + pub fn with_extra_router(self, router: Router) -> Self { + Self { + router: self.router.nest("", router), + ..self + } + } + + pub fn build(self) -> HttpServer { + HttpServer { + options: self.options, + user_provider: self.user_provider, + shutdown_tx: Mutex::new(None), + plugins: self.plugins, + router: StdMutex::new(self.router), + } + } +} + +impl HttpServer { + pub fn make_app(&self) -> Router { + let mut router = { + let router = self.router.lock().unwrap(); + router.clone() + }; router = router.route( "/health", routing::get(handler::health).post(handler::health), ); - let config_router = self - .route_config(GreptimeOptionsConfigState { - greptime_config_options: self.greptime_config_options.clone().unwrap_or_default(), - }) - .finish_api(&mut api); - - router = router.nest("", config_router); - router = router.route("/status", routing::get(handler::status)); #[cfg(feature = "dashboard")] @@ -643,13 +620,13 @@ impl HttpServer { ) } - fn route_metrics(&self, metrics_handler: MetricsHandler) -> Router { + fn route_metrics(metrics_handler: MetricsHandler) -> Router { Router::new() .route("/metrics", routing::get(handler::metrics)) .with_state(metrics_handler) } - fn route_sql(&self, api_state: ApiState) -> ApiRouter { + fn route_sql(api_state: ApiState) -> ApiRouter { ApiRouter::new() .api_route( "/sql", @@ -668,7 +645,7 @@ impl HttpServer { .with_state(api_state) } - fn route_prometheus(&self, prometheus_handler: PrometheusHandlerRef) -> Router { + fn route_prometheus(prometheus_handler: PrometheusHandlerRef) -> Router { Router::new() .route( "/format_query", @@ -685,9 +662,12 @@ impl HttpServer { .with_state(prometheus_handler) } - fn route_prom(&self, prom_handler: PromStoreProtocolHandlerRef) -> Router { + fn route_prom( + prom_handler: PromStoreProtocolHandlerRef, + prom_store_with_metric_engine: bool, + ) -> Router { let mut router = Router::new().route("/read", routing::post(prom_store::remote_read)); - if self.prom_store_with_metric_engine { + if prom_store_with_metric_engine { router = router.route("/write", routing::post(prom_store::remote_write)); } else { router = router.route( @@ -698,7 +678,7 @@ impl HttpServer { router.with_state(prom_handler) } - fn route_influxdb(&self, influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router { + fn route_influxdb(influxdb_handler: InfluxdbLineProtocolHandlerRef) -> Router { Router::new() .route("/write", routing::post(influxdb_write_v1)) .route("/api/v2/write", routing::post(influxdb_write_v2)) @@ -707,20 +687,20 @@ impl HttpServer { .with_state(influxdb_handler) } - fn route_opentsdb(&self, opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router { + fn route_opentsdb(opentsdb_handler: OpentsdbProtocolHandlerRef) -> Router { Router::new() .route("/api/put", routing::post(opentsdb::put)) .with_state(opentsdb_handler) } - fn route_otlp(&self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router { + fn route_otlp(otlp_handler: OpenTelemetryProtocolHandlerRef) -> Router { Router::new() .route("/v1/metrics", routing::post(otlp::metrics)) .route("/v1/traces", routing::post(otlp::traces)) .with_state(otlp_handler) } - fn route_config(&self, state: GreptimeOptionsConfigState) -> ApiRouter { + fn route_config(state: GreptimeOptionsConfigState) -> ApiRouter { ApiRouter::new() .route("/config", apirouting::get(handler::config)) .with_state(state) @@ -841,7 +821,7 @@ mod test { use super::*; use crate::error::Error; - use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdapter}; + use crate::query_handler::grpc::GrpcQueryHandler; use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler}; struct DummyInstance { @@ -909,10 +889,8 @@ mod test { fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { _tx: tx }); let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone()); - let grpc_instance = ServerGrpcQueryHandlerAdapter::arc(instance); let server = HttpServerBuilder::new(HttpOptions::default()) - .with_sql_handler(sql_instance) - .with_grpc_handler(grpc_instance) + .with_sql_handler(sql_instance, None) .build(); server.build(server.make_app()).route( "/test/timeout", diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index b65a6c76f0..b9f9dc5fa2 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -18,7 +18,7 @@ use common_test_util::ports; use servers::http::{HttpOptions, HttpServerBuilder}; use table::test_util::MemTable; -use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; +use crate::create_testing_sql_query_handler; fn make_test_app() -> Router { let http_opts = HttpOptions { @@ -27,12 +27,10 @@ fn make_test_app() -> Router { }; let server = HttpServerBuilder::new(http_opts) - .with_sql_handler(create_testing_sql_query_handler( - MemTable::default_numbers_table(), - )) - .with_grpc_handler(create_testing_grpc_query_handler( - MemTable::default_numbers_table(), - )) + .with_sql_handler( + create_testing_sql_query_handler(MemTable::default_numbers_table()), + None, + ) .build(); server.build(server.make_app()) } diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index a1a0015fa6..81f1718109 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -120,8 +120,7 @@ fn make_test_app(tx: Arc>, db_name: Option<&str>) }) } let server = HttpServerBuilder::new(http_opts) - .with_sql_handler(instance.clone()) - .with_grpc_handler(instance.clone()) + .with_sql_handler(instance.clone(), None) .with_user_provider(Arc::new(user_provider)) .with_influxdb_handler(instance) .build(); diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 388e8b6c0e..9563f25422 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -109,8 +109,7 @@ fn make_test_app(tx: mpsc::Sender) -> Router { let instance = Arc::new(DummyInstance { tx }); let server = HttpServerBuilder::new(http_opts) - .with_grpc_handler(instance.clone()) - .with_sql_handler(instance.clone()) + .with_sql_handler(instance.clone(), None) .with_opentsdb_handler(instance) .build(); server.build(server.make_app()) diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 9b5fe6a37e..44564c367b 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -133,9 +133,8 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { let instance = Arc::new(DummyInstance { tx }); let server = HttpServerBuilder::new(http_opts) - .with_grpc_handler(instance.clone()) - .with_sql_handler(instance.clone()) - .with_prom_handler(instance) + .with_sql_handler(instance.clone(), None) + .with_prom_handler(instance, true) .build(); server.build(server.make_app()) } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 4b49d0c6a5..e94bf487f4 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -12,8 +12,8 @@ api.workspace = true arrow-flight.workspace = true async-trait = "0.1" auth.workspace = true -axum = "0.6" axum-test-helper = { git = "https://github.com/sunng87/axum-test-helper.git", branch = "patch-1" } +axum.workspace = true catalog.workspace = true chrono.workspace = true client = { workspace = true, features = ["testing"] } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 623ffc27ca..c3eb104cc6 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -387,10 +387,10 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router ..Default::default() }; let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc( - instance.instance.clone(), - )) + .with_sql_handler( + ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), + None, + ) .with_metrics_handler(MetricsHandler) .with_greptime_config_options(instance.datanode_opts.to_toml_string()) .build(); @@ -420,16 +420,15 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider( let mut http_server = HttpServerBuilder::new(http_opts); - http_server - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc( - instance.instance.clone(), - )) - .with_script_handler(instance.instance.clone()) + http_server = http_server + .with_sql_handler( + ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()), + Some(instance.instance.clone()), + ) .with_greptime_config_options(instance.mix_options.to_toml().unwrap()); if let Some(user_provider) = user_provider { - http_server.with_user_provider(user_provider); + http_server = http_server.with_user_provider(user_provider); } let http_server = http_server.build(); @@ -458,10 +457,11 @@ pub async fn setup_test_prom_app_with_frontend( }; let frontend_ref = instance.instance.clone(); let http_server = HttpServerBuilder::new(http_opts) - .with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone())) - .with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(frontend_ref.clone())) - .with_script_handler(frontend_ref.clone()) - .with_prom_handler(frontend_ref.clone()) + .with_sql_handler( + ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()), + Some(frontend_ref.clone()), + ) + .with_prom_handler(frontend_ref.clone(), true) .with_prometheus_handler(frontend_ref) .with_greptime_config_options(instance.datanode_opts.to_toml_string()) .build();