diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 3f0990cb49..4e4d16610e 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -45,7 +45,8 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::{join_dir, normalize_dir}; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; -use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::grpc::builder::GrpcServerBuilder; +use servers::grpc::GrpcServerConfig; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::server::{start_server, ServerHandler, ServerHandlers}; @@ -328,15 +329,13 @@ impl DatanodeBuilder { max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize, }; - let server = Box::new(GrpcServer::new( - Some(config), - None, - None, - Some(Arc::new(region_server.clone()) as _), - Some(Arc::new(region_server.clone()) as _), - None, - region_server.runtime(), - )); + let server = Box::new( + GrpcServerBuilder::new(region_server.runtime()) + .config(config) + .flight_handler(Arc::new(region_server.clone())) + .region_server_handler(Arc::new(region_server.clone())) + .build(), + ); let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &opts.rpc_addr, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 712bbce0b6..a3c047490a 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -19,7 +19,8 @@ use auth::UserProviderRef; use common_base::Plugins; use common_runtime::Builder as RuntimeBuilder; use servers::error::InternalIoSnafu; -use servers::grpc::{GrpcServer, GrpcServerConfig}; +use servers::grpc::builder::GrpcServerBuilder; +use servers::grpc::GrpcServerConfig; use servers::http::HttpServerBuilder; use servers::metrics_handler::MetricsHandler; use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef}; @@ -68,15 +69,14 @@ impl Services { max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize, max_send_message_size: opts.max_send_message_size.as_bytes() as usize, }; - let grpc_server = GrpcServer::new( - Some(grpc_config), - Some(ServerGrpcQueryHandlerAdapter::arc(instance.clone())), - Some(instance.clone()), - None, - None, - user_provider.clone(), - grpc_runtime, - ); + + let grpc_server = GrpcServerBuilder::new(grpc_runtime) + .config(grpc_config) + .query_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone())) + .prometheus_handler(instance.clone()) + .otlp_handler(instance.clone()) + .user_provider(user_provider.clone()) + .build(); result.push((Box::new(grpc_server), grpc_addr)); } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 5be38cee15..022bfc52de 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -421,6 +421,9 @@ pub enum Error { #[snafu(display("Failed to convert Mysql value, error: {}", err_msg))] MysqlValueConversion { err_msg: String, location: Location }, + + #[snafu(display("Missing query context"))] + MissingQueryContext { location: Location }, } pub type Result = std::result::Result; @@ -476,6 +479,7 @@ impl ErrorExt for Error { | TimePrecision { .. } | UrlDecode { .. } | IncompatibleSchema { .. } + | MissingQueryContext { .. } | MysqlValueConversion { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index b58f06e15a..b2ba7e8f9d 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -12,14 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod authorize; +pub mod builder; mod database; pub mod flight; pub mod greptime_handler; +mod otlp; pub mod prom_query_gateway; pub mod region_server; use std::net::SocketAddr; -use std::sync::Arc; use api::v1::greptime_database_server::GreptimeDatabaseServer; use api::v1::health_check_server::{HealthCheck, HealthCheckServer}; @@ -36,10 +38,11 @@ use auth::UserProviderRef; use common_grpc::channel_manager::{ DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, }; -use common_runtime::Runtime; use common_telemetry::logging::info; use common_telemetry::{error, warn}; use futures::FutureExt; +use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer; +use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer; use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; use tokio::sync::oneshot::{self, Receiver, Sender}; @@ -47,17 +50,20 @@ use tokio::sync::Mutex; use tonic::transport::server::TcpIncoming; use tonic::{Request, Response, Status}; use tonic_reflection::server::{ServerReflection, ServerReflectionServer}; +use tower::ServiceBuilder; +use self::authorize::AuthMiddlewareLayer; use self::flight::{FlightCraftRef, FlightCraftWrapper}; +use self::otlp::OtlpService; use self::prom_query_gateway::PrometheusGatewayService; -use self::region_server::{RegionServerHandlerRef, RegionServerRequestHandler}; +use self::region_server::RegionServerRequestHandler; use crate::error::{ AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu, }; use crate::grpc::database::DatabaseService; use crate::grpc::greptime_handler::GreptimeRequestHandler; use crate::prometheus_handler::PrometheusHandlerRef; -use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; +use crate::query_handler::OpenTelemetryProtocolHandlerRef; use crate::server::Server; type TonicResult = std::result::Result; @@ -66,12 +72,10 @@ pub struct GrpcServer { config: GrpcServerConfig, // states shutdown_tx: Mutex>>, - user_provider: Option, - /// gRPC serving state receiver. Only present if the gRPC server is started. /// Used to wait for the server to stop, performing the old blocking fashion. serve_state: Mutex>>>, - + user_provider: Option, // handlers /// Handler for [DatabaseService] service. database_handler: Option, @@ -81,6 +85,8 @@ pub struct GrpcServer { flight_handler: Option, /// Handler for [RegionServer]. region_server_handler: Option, + /// Handler for OpenTelemetry Protocol (OTLP) requests. + otlp_handler: Option, } /// Grpc Server configuration @@ -102,32 +108,6 @@ impl Default for GrpcServerConfig { } impl GrpcServer { - pub fn new( - config: Option, - query_handler: Option, - prometheus_handler: Option, - flight_handler: Option, - region_server_handler: Option, - user_provider: Option, - runtime: Arc, - ) -> Self { - let database_handler = query_handler.map(|handler| { - GreptimeRequestHandler::new(handler, user_provider.clone(), runtime.clone()) - }); - let region_server_handler = region_server_handler - .map(|handler| RegionServerRequestHandler::new(handler, runtime.clone())); - Self { - config: config.unwrap_or_default(), - shutdown_tx: Mutex::new(None), - user_provider, - serve_state: Mutex::new(None), - database_handler, - prometheus_handler, - flight_handler, - region_server_handler, - } - } - #[cfg(feature = "testing")] pub fn create_flight_service(&self) -> FlightServiceServer { FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap())) @@ -245,6 +225,24 @@ impl Server for GrpcServer { builder = builder .add_service(self.create_prom_query_gateway_service(prometheus_handler.clone())) } + + if let Some(otlp_handler) = &self.otlp_handler { + let trace_server = ServiceBuilder::new() + .layer(AuthMiddlewareLayer::with(self.user_provider.clone())) + .service(TraceServiceServer::new(OtlpService::new( + otlp_handler.clone(), + ))); + builder = builder.add_service(trace_server); + + let metrics_server = ServiceBuilder::new() + .layer(AuthMiddlewareLayer::with(self.user_provider.clone())) + .service(MetricsServiceServer::new(OtlpService::new( + otlp_handler.clone(), + ))); + + builder = builder.add_service(metrics_server); + } + if let Some(flight_handler) = &self.flight_handler { builder = builder.add_service( FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone())) diff --git a/src/servers/src/grpc/authorize.rs b/src/servers/src/grpc/authorize.rs new file mode 100644 index 0000000000..878a45f169 --- /dev/null +++ b/src/servers/src/grpc/authorize.rs @@ -0,0 +1,206 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::pin::Pin; +use std::result::Result as StdResult; +use std::task::{Context, Poll}; + +use auth::UserProviderRef; +use hyper::Body; +use session::context::QueryContext; +use tonic::body::BoxBody; +use tonic::transport::NamedService; +use tower::{Layer, Service}; + +use crate::http::authorize::{extract_catalog_and_schema, extract_username_and_password}; + +#[derive(Clone)] +pub struct AuthMiddlewareLayer { + user_provider: Option, +} + +impl AuthMiddlewareLayer { + pub fn with(user_provider: Option) -> Self { + Self { user_provider } + } +} + +impl Layer for AuthMiddlewareLayer { + type Service = AuthMiddleware; + + fn layer(&self, service: S) -> Self::Service { + AuthMiddleware { + inner: service, + user_provider: self.user_provider.clone(), + } + } +} + +/// This middleware is responsible for authenticating the user and setting the user +/// info in the request extension. +/// +/// Detail: Authorization information is passed in through the Authorization request +/// header. +#[derive(Clone)] +pub struct AuthMiddleware { + inner: S, + user_provider: Option, +} + +impl NamedService for AuthMiddleware +where + S: NamedService, +{ + const NAME: &'static str = S::NAME; +} + +type BoxFuture<'a, T> = Pin + Send + 'a>>; + +impl Service> for AuthMiddleware +where + S: Service, Response = hyper::Response> + Clone + Send + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = BoxFuture<'static, StdResult>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, mut req: hyper::Request) -> Self::Future { + // This is necessary because tonic internally uses `tower::buffer::Buffer`. + // See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149 + // for details on why this is necessary. + let clone = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, clone); + + let user_provider = self.user_provider.clone(); + + Box::pin(async move { + if let Err(status) = do_auth(&mut req, user_provider).await { + return Ok(status.to_http()); + } + inner.call(req).await + }) + } +} + +async fn do_auth( + req: &mut hyper::Request, + user_provider: Option, +) -> Result<(), tonic::Status> { + let (catalog, schema) = extract_catalog_and_schema(req); + + let query_ctx = QueryContext::with(catalog, schema); + + let Some(user_provider) = user_provider else { + query_ctx.set_current_user(Some(auth::userinfo_by_name(None))); + let _ = req.extensions_mut().insert(query_ctx); + return Ok(()); + }; + + let (username, password) = extract_username_and_password(false, req) + .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; + + let id = auth::Identity::UserId(&username, None); + let pwd = auth::Password::PlainText(password); + + let user_info = user_provider + .auth(id, pwd, catalog, schema) + .await + .map_err(|e| tonic::Status::unauthenticated(e.to_string()))?; + + query_ctx.set_current_user(Some(user_info)); + let _ = req.extensions_mut().insert(query_ctx); + + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use auth::tests::MockUserProvider; + use base64::engine::general_purpose::STANDARD; + use base64::Engine; + use headers::Header; + use hyper::{Body, Request}; + use session::context::QueryContextRef; + + use crate::grpc::authorize::do_auth; + use crate::http::header::GreptimeDbName; + + #[tokio::test] + async fn test_do_auth_with_user_provider() { + let user_provider = Arc::new(MockUserProvider::default()); + + // auth success + let authorization_val = format!("Basic {}", STANDARD.encode("greptime:greptime")); + let mut req = Request::new(Body::empty()); + req.headers_mut() + .insert("authorization", authorization_val.parse().unwrap()); + + let auth_result = do_auth(&mut req, Some(user_provider.clone())).await; + + assert!(auth_result.is_ok()); + check_req(&req, "greptime", "public", "greptime"); + + // auth failed, err: user not exist. + let authorization_val = format!("Basic {}", STANDARD.encode("greptime2:greptime2")); + let mut req = Request::new(Body::empty()); + req.headers_mut() + .insert("authorization", authorization_val.parse().unwrap()); + + let auth_result = do_auth(&mut req, Some(user_provider)).await; + assert!(auth_result.is_err()); + } + + #[tokio::test] + async fn test_do_auth_without_user_provider() { + let mut req = Request::new(Body::empty()); + req.headers_mut() + .insert("authentication", "pwd".parse().unwrap()); + let auth_result = do_auth(&mut req, None).await; + assert!(auth_result.is_ok()); + check_req(&req, "greptime", "public", "greptime"); + + let mut req = Request::new(Body::empty()); + let auth_result = do_auth(&mut req, None).await; + assert!(auth_result.is_ok()); + check_req(&req, "greptime", "public", "greptime"); + + let mut req = Request::new(Body::empty()); + req.headers_mut() + .insert(GreptimeDbName::name(), "catalog-schema".parse().unwrap()); + let auth_result = do_auth(&mut req, None).await; + assert!(auth_result.is_ok()); + check_req(&req, "catalog", "schema", "greptime"); + } + + fn check_req( + req: &Request, + expected_catalog: &str, + expected_schema: &str, + expected_user_name: &str, + ) { + let ctx = req.extensions().get::().unwrap(); + assert_eq!(expected_catalog, ctx.current_catalog()); + assert_eq!(expected_schema, ctx.current_schema()); + + let user_info = ctx.current_user().unwrap(); + assert_eq!(expected_user_name, user_info.username()); + } +} diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs new file mode 100644 index 0000000000..910fc67ee4 --- /dev/null +++ b/src/servers/src/grpc/builder.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use auth::UserProviderRef; +use common_runtime::Runtime; +use tokio::sync::Mutex; + +use super::flight::FlightCraftRef; +use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler}; +use super::{GrpcServer, GrpcServerConfig}; +use crate::grpc::greptime_handler::GreptimeRequestHandler; +use crate::prometheus_handler::PrometheusHandlerRef; +use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; +use crate::query_handler::OpenTelemetryProtocolHandlerRef; + +pub struct GrpcServerBuilder { + config: Option, + query_handler: Option, + prometheus_handler: Option, + flight_handler: Option, + region_server_handler: Option, + otlp_handler: Option, + user_provider: Option, + runtime: Arc, +} + +impl GrpcServerBuilder { + pub fn new(runtime: Arc) -> Self { + Self { + config: None, + query_handler: None, + prometheus_handler: None, + flight_handler: None, + region_server_handler: None, + otlp_handler: None, + user_provider: None, + runtime, + } + } + + pub fn config(mut self, config: GrpcServerConfig) -> Self { + self.config = Some(config); + self + } + + pub fn option_config(mut self, config: Option) -> Self { + self.config = config; + self + } + + pub fn query_handler(mut self, query_handler: ServerGrpcQueryHandlerRef) -> Self { + self.query_handler = Some(query_handler); + self + } + + pub fn prometheus_handler(mut self, prometheus_handler: PrometheusHandlerRef) -> Self { + self.prometheus_handler = Some(prometheus_handler); + self + } + + pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self { + self.flight_handler = Some(flight_handler); + self + } + + pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self { + self.region_server_handler = Some(region_server_handler); + self + } + + pub fn otlp_handler(mut self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Self { + self.otlp_handler = Some(otlp_handler); + self + } + + pub fn user_provider(mut self, user_provider: Option) -> Self { + self.user_provider = user_provider; + self + } + + pub fn build(self) -> GrpcServer { + let config = self.config.unwrap_or_default(); + + let user_provider = self.user_provider.as_ref(); + + let runtime = self.runtime; + + let database_handler = self.query_handler.map(|handler| { + GreptimeRequestHandler::new(handler, user_provider.cloned(), runtime.clone()) + }); + + let region_server_handler = self + .region_server_handler + .map(|handler| RegionServerRequestHandler::new(handler, runtime)); + + GrpcServer { + config, + prometheus_handler: self.prometheus_handler, + flight_handler: self.flight_handler, + region_server_handler, + database_handler, + otlp_handler: self.otlp_handler, + user_provider: self.user_provider, + shutdown_tx: Mutex::new(None), + serve_state: Mutex::new(None), + } + } +} diff --git a/src/servers/src/grpc/otlp.rs b/src/servers/src/grpc/otlp.rs new file mode 100644 index 0000000000..9edc191e8f --- /dev/null +++ b/src/servers/src/grpc/otlp.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::result::Result as StdResult; + +use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService; +use opentelemetry_proto::tonic::collector::metrics::v1::{ + ExportMetricsServiceRequest, ExportMetricsServiceResponse, +}; +use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService; +use opentelemetry_proto::tonic::collector::trace::v1::{ + ExportTraceServiceRequest, ExportTraceServiceResponse, +}; +use session::context::QueryContextRef; +use snafu::OptionExt; +use tonic::{Request, Response, Status}; + +use crate::error; +use crate::query_handler::OpenTelemetryProtocolHandlerRef; + +pub struct OtlpService { + handler: OpenTelemetryProtocolHandlerRef, +} + +impl OtlpService { + pub fn new(handler: OpenTelemetryProtocolHandlerRef) -> Self { + Self { handler } + } +} + +#[async_trait::async_trait] +impl TraceService for OtlpService { + async fn export( + &self, + request: Request, + ) -> StdResult, Status> { + let (_headers, extensions, req) = request.into_parts(); + + let ctx = extensions + .get::() + .cloned() + .context(error::MissingQueryContextSnafu)?; + + let res = self.handler.traces(req, ctx).await?; + + Ok(Response::new(res)) + } +} + +#[async_trait::async_trait] +impl MetricsService for OtlpService { + async fn export( + &self, + request: Request, + ) -> StdResult, Status> { + let (_headers, extensions, req) = request.into_parts(); + + let ctx = extensions + .get::() + .cloned() + .context(error::MissingQueryContextSnafu)?; + + let res = self.handler.metrics(req, ctx).await?; + + Ok(Response::new(res)) + } +} diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index 1910f0c6de..e91c1316db 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -31,8 +31,9 @@ use session::context::QueryContext; use snafu::OptionExt; use tonic::{Request, Response}; +use super::greptime_handler::create_query_context; use crate::error::InvalidQuerySnafu; -use crate::grpc::greptime_handler::{auth, create_query_context}; +use crate::grpc::greptime_handler::auth; use crate::grpc::TonicResult; use crate::http::prometheus::{retrieve_metric_name_and_result_type, PrometheusJsonResponse}; use crate::prometheus_handler::PrometheusHandlerRef; diff --git a/src/servers/src/http/authorize.rs b/src/servers/src/http/authorize.rs index 0d5908ea6b..a704b54feb 100644 --- a/src/servers/src/http/authorize.rs +++ b/src/servers/src/http/authorize.rs @@ -126,7 +126,7 @@ fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse { (StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err)) } -fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { +pub fn extract_catalog_and_schema(request: &Request) -> (&str, &str) { // parse database from header let dbname = request .headers() @@ -183,7 +183,7 @@ fn get_influxdb_credentials(request: &Request) -> Result( +pub fn extract_username_and_password( is_influxdb: bool, request: &Request, ) -> Result<(Username, Password)> { diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 21276bda98..eb55a8621b 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -45,7 +45,7 @@ use meta_client::client::MetaClientBuilder; use meta_srv::cluster::MetaPeerClientRef; use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use meta_srv::mocks::MockInfo; -use servers::grpc::GrpcServer; +use servers::grpc::builder::GrpcServerBuilder; use servers::heartbeat_options::HeartbeatOptions; use servers::Mode; use tempfile::TempDir; @@ -373,17 +373,11 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) { .unwrap(), ); - let flight_handler = Some(Arc::new(datanode.region_server()) as _); - let region_server_handler = Some(Arc::new(datanode.region_server()) as _); - let grpc_server = GrpcServer::new( - None, - None, - None, - flight_handler, - region_server_handler, - None, - runtime, - ); + let grpc_server = GrpcServerBuilder::new(runtime) + .flight_handler(Arc::new(datanode.region_server())) + .region_server_handler(Arc::new(datanode.region_server())) + .build(); + let _handle = tokio::spawn(async move { Server::builder() .add_service(grpc_server.create_flight_service()) diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 8ec3302dc7..677300edae 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -42,6 +42,7 @@ use object_store::services::{Azblob, Gcs, Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; use secrecy::ExposeSecret; +use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::greptime_handler::GreptimeRequestHandler; use servers::grpc::{GrpcServer, GrpcServerConfig}; use servers::http::{HttpOptions, HttpServerBuilder}; @@ -508,15 +509,15 @@ pub async fn setup_grpc_server_with( runtime.clone(), )); - let fe_grpc_server = Arc::new(GrpcServer::new( - grpc_config, - Some(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone())), - Some(fe_instance_ref.clone()), - Some(flight_handler), - None, - user_provider, - runtime, - )); + let fe_grpc_server = Arc::new( + GrpcServerBuilder::new(runtime) + .option_config(grpc_config) + .query_handler(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone())) + .flight_handler(flight_handler) + .prometheus_handler(fe_instance_ref.clone()) + .user_provider(user_provider) + .build(), + ); let fe_grpc_addr = "127.0.0.1:0".parse::().unwrap(); let fe_grpc_addr = fe_grpc_server