feat: support grpc for otlp trace and metrics (#3105)

* feat: add grpc support for otlp trace and metrics

* cr: add some comment

* fix: ut

* fix: cr
This commit is contained in:
fys
2024-01-09 13:01:48 +08:00
committed by GitHub
parent 8bd4a36136
commit 6e860bc0fd
11 changed files with 479 additions and 77 deletions

View File

@@ -45,7 +45,8 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::{join_dir, normalize_dir};
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::GrpcServerConfig;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{start_server, ServerHandler, ServerHandlers};
@@ -328,15 +329,13 @@ impl DatanodeBuilder {
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
};
let server = Box::new(GrpcServer::new(
Some(config),
None,
None,
Some(Arc::new(region_server.clone()) as _),
Some(Arc::new(region_server.clone()) as _),
None,
region_server.runtime(),
));
let server = Box::new(
GrpcServerBuilder::new(region_server.runtime())
.config(config)
.flight_handler(Arc::new(region_server.clone()))
.region_server_handler(Arc::new(region_server.clone()))
.build(),
);
let addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,

View File

@@ -19,7 +19,8 @@ use auth::UserProviderRef;
use common_base::Plugins;
use common_runtime::Builder as RuntimeBuilder;
use servers::error::InternalIoSnafu;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::GrpcServerConfig;
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -68,15 +69,14 @@ impl Services {
max_recv_message_size: opts.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.max_send_message_size.as_bytes() as usize,
};
let grpc_server = GrpcServer::new(
Some(grpc_config),
Some(ServerGrpcQueryHandlerAdapter::arc(instance.clone())),
Some(instance.clone()),
None,
None,
user_provider.clone(),
grpc_runtime,
);
let grpc_server = GrpcServerBuilder::new(grpc_runtime)
.config(grpc_config)
.query_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone()))
.prometheus_handler(instance.clone())
.otlp_handler(instance.clone())
.user_provider(user_provider.clone())
.build();
result.push((Box::new(grpc_server), grpc_addr));
}

View File

@@ -421,6 +421,9 @@ pub enum Error {
#[snafu(display("Failed to convert Mysql value, error: {}", err_msg))]
MysqlValueConversion { err_msg: String, location: Location },
#[snafu(display("Missing query context"))]
MissingQueryContext { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -476,6 +479,7 @@ impl ErrorExt for Error {
| TimePrecision { .. }
| UrlDecode { .. }
| IncompatibleSchema { .. }
| MissingQueryContext { .. }
| MysqlValueConversion { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. }

View File

@@ -12,14 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod authorize;
pub mod builder;
mod database;
pub mod flight;
pub mod greptime_handler;
mod otlp;
pub mod prom_query_gateway;
pub mod region_server;
use std::net::SocketAddr;
use std::sync::Arc;
use api::v1::greptime_database_server::GreptimeDatabaseServer;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
@@ -36,10 +38,11 @@ use auth::UserProviderRef;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_runtime::Runtime;
use common_telemetry::logging::info;
use common_telemetry::{error, warn};
use futures::FutureExt;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsServiceServer;
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -47,17 +50,20 @@ use tokio::sync::Mutex;
use tonic::transport::server::TcpIncoming;
use tonic::{Request, Response, Status};
use tonic_reflection::server::{ServerReflection, ServerReflectionServer};
use tower::ServiceBuilder;
use self::authorize::AuthMiddlewareLayer;
use self::flight::{FlightCraftRef, FlightCraftWrapper};
use self::otlp::OtlpService;
use self::prom_query_gateway::PrometheusGatewayService;
use self::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use self::region_server::RegionServerRequestHandler;
use crate::error::{
AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu,
};
use crate::grpc::database::DatabaseService;
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::server::Server;
type TonicResult<T> = std::result::Result<T, Status>;
@@ -66,12 +72,10 @@ pub struct GrpcServer {
config: GrpcServerConfig,
// states
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
/// gRPC serving state receiver. Only present if the gRPC server is started.
/// Used to wait for the server to stop, performing the old blocking fashion.
serve_state: Mutex<Option<Receiver<Result<()>>>>,
user_provider: Option<UserProviderRef>,
// handlers
/// Handler for [DatabaseService] service.
database_handler: Option<GreptimeRequestHandler>,
@@ -81,6 +85,8 @@ pub struct GrpcServer {
flight_handler: Option<FlightCraftRef>,
/// Handler for [RegionServer].
region_server_handler: Option<RegionServerRequestHandler>,
/// Handler for OpenTelemetry Protocol (OTLP) requests.
otlp_handler: Option<OpenTelemetryProtocolHandlerRef>,
}
/// Grpc Server configuration
@@ -102,32 +108,6 @@ impl Default for GrpcServerConfig {
}
impl GrpcServer {
pub fn new(
config: Option<GrpcServerConfig>,
query_handler: Option<ServerGrpcQueryHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
flight_handler: Option<FlightCraftRef>,
region_server_handler: Option<RegionServerHandlerRef>,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
) -> Self {
let database_handler = query_handler.map(|handler| {
GreptimeRequestHandler::new(handler, user_provider.clone(), runtime.clone())
});
let region_server_handler = region_server_handler
.map(|handler| RegionServerRequestHandler::new(handler, runtime.clone()));
Self {
config: config.unwrap_or_default(),
shutdown_tx: Mutex::new(None),
user_provider,
serve_state: Mutex::new(None),
database_handler,
prometheus_handler,
flight_handler,
region_server_handler,
}
}
#[cfg(feature = "testing")]
pub fn create_flight_service(&self) -> FlightServiceServer<impl FlightService> {
FlightServiceServer::new(FlightCraftWrapper(self.flight_handler.clone().unwrap()))
@@ -245,6 +225,24 @@ impl Server for GrpcServer {
builder = builder
.add_service(self.create_prom_query_gateway_service(prometheus_handler.clone()))
}
if let Some(otlp_handler) = &self.otlp_handler {
let trace_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(self.user_provider.clone()))
.service(TraceServiceServer::new(OtlpService::new(
otlp_handler.clone(),
)));
builder = builder.add_service(trace_server);
let metrics_server = ServiceBuilder::new()
.layer(AuthMiddlewareLayer::with(self.user_provider.clone()))
.service(MetricsServiceServer::new(OtlpService::new(
otlp_handler.clone(),
)));
builder = builder.add_service(metrics_server);
}
if let Some(flight_handler) = &self.flight_handler {
builder = builder.add_service(
FlightServiceServer::new(FlightCraftWrapper(flight_handler.clone()))

View File

@@ -0,0 +1,206 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::result::Result as StdResult;
use std::task::{Context, Poll};
use auth::UserProviderRef;
use hyper::Body;
use session::context::QueryContext;
use tonic::body::BoxBody;
use tonic::transport::NamedService;
use tower::{Layer, Service};
use crate::http::authorize::{extract_catalog_and_schema, extract_username_and_password};
#[derive(Clone)]
pub struct AuthMiddlewareLayer {
user_provider: Option<UserProviderRef>,
}
impl AuthMiddlewareLayer {
pub fn with(user_provider: Option<UserProviderRef>) -> Self {
Self { user_provider }
}
}
impl<S> Layer<S> for AuthMiddlewareLayer {
type Service = AuthMiddleware<S>;
fn layer(&self, service: S) -> Self::Service {
AuthMiddleware {
inner: service,
user_provider: self.user_provider.clone(),
}
}
}
/// This middleware is responsible for authenticating the user and setting the user
/// info in the request extension.
///
/// Detail: Authorization information is passed in through the Authorization request
/// header.
#[derive(Clone)]
pub struct AuthMiddleware<S> {
inner: S,
user_provider: Option<UserProviderRef>,
}
impl<S> NamedService for AuthMiddleware<S>
where
S: NamedService,
{
const NAME: &'static str = S::NAME;
}
type BoxFuture<'a, T> = Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;
impl<S> Service<hyper::Request<Body>> for AuthMiddleware<S>
where
S: Service<hyper::Request<Body>, Response = hyper::Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, mut req: hyper::Request<Body>) -> Self::Future {
// This is necessary because tonic internally uses `tower::buffer::Buffer`.
// See https://github.com/tower-rs/tower/issues/547#issuecomment-767629149
// for details on why this is necessary.
let clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, clone);
let user_provider = self.user_provider.clone();
Box::pin(async move {
if let Err(status) = do_auth(&mut req, user_provider).await {
return Ok(status.to_http());
}
inner.call(req).await
})
}
}
async fn do_auth<T>(
req: &mut hyper::Request<T>,
user_provider: Option<UserProviderRef>,
) -> Result<(), tonic::Status> {
let (catalog, schema) = extract_catalog_and_schema(req);
let query_ctx = QueryContext::with(catalog, schema);
let Some(user_provider) = user_provider else {
query_ctx.set_current_user(Some(auth::userinfo_by_name(None)));
let _ = req.extensions_mut().insert(query_ctx);
return Ok(());
};
let (username, password) = extract_username_and_password(false, req)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
let id = auth::Identity::UserId(&username, None);
let pwd = auth::Password::PlainText(password);
let user_info = user_provider
.auth(id, pwd, catalog, schema)
.await
.map_err(|e| tonic::Status::unauthenticated(e.to_string()))?;
query_ctx.set_current_user(Some(user_info));
let _ = req.extensions_mut().insert(query_ctx);
Ok(())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use auth::tests::MockUserProvider;
use base64::engine::general_purpose::STANDARD;
use base64::Engine;
use headers::Header;
use hyper::{Body, Request};
use session::context::QueryContextRef;
use crate::grpc::authorize::do_auth;
use crate::http::header::GreptimeDbName;
#[tokio::test]
async fn test_do_auth_with_user_provider() {
let user_provider = Arc::new(MockUserProvider::default());
// auth success
let authorization_val = format!("Basic {}", STANDARD.encode("greptime:greptime"));
let mut req = Request::new(Body::empty());
req.headers_mut()
.insert("authorization", authorization_val.parse().unwrap());
let auth_result = do_auth(&mut req, Some(user_provider.clone())).await;
assert!(auth_result.is_ok());
check_req(&req, "greptime", "public", "greptime");
// auth failed, err: user not exist.
let authorization_val = format!("Basic {}", STANDARD.encode("greptime2:greptime2"));
let mut req = Request::new(Body::empty());
req.headers_mut()
.insert("authorization", authorization_val.parse().unwrap());
let auth_result = do_auth(&mut req, Some(user_provider)).await;
assert!(auth_result.is_err());
}
#[tokio::test]
async fn test_do_auth_without_user_provider() {
let mut req = Request::new(Body::empty());
req.headers_mut()
.insert("authentication", "pwd".parse().unwrap());
let auth_result = do_auth(&mut req, None).await;
assert!(auth_result.is_ok());
check_req(&req, "greptime", "public", "greptime");
let mut req = Request::new(Body::empty());
let auth_result = do_auth(&mut req, None).await;
assert!(auth_result.is_ok());
check_req(&req, "greptime", "public", "greptime");
let mut req = Request::new(Body::empty());
req.headers_mut()
.insert(GreptimeDbName::name(), "catalog-schema".parse().unwrap());
let auth_result = do_auth(&mut req, None).await;
assert!(auth_result.is_ok());
check_req(&req, "catalog", "schema", "greptime");
}
fn check_req<T>(
req: &Request<T>,
expected_catalog: &str,
expected_schema: &str,
expected_user_name: &str,
) {
let ctx = req.extensions().get::<QueryContextRef>().unwrap();
assert_eq!(expected_catalog, ctx.current_catalog());
assert_eq!(expected_schema, ctx.current_schema());
let user_info = ctx.current_user().unwrap();
assert_eq!(expected_user_name, user_info.username());
}
}

View File

@@ -0,0 +1,121 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use auth::UserProviderRef;
use common_runtime::Runtime;
use tokio::sync::Mutex;
use super::flight::FlightCraftRef;
use super::region_server::{RegionServerHandlerRef, RegionServerRequestHandler};
use super::{GrpcServer, GrpcServerConfig};
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub struct GrpcServerBuilder {
config: Option<GrpcServerConfig>,
query_handler: Option<ServerGrpcQueryHandlerRef>,
prometheus_handler: Option<PrometheusHandlerRef>,
flight_handler: Option<FlightCraftRef>,
region_server_handler: Option<RegionServerHandlerRef>,
otlp_handler: Option<OpenTelemetryProtocolHandlerRef>,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
}
impl GrpcServerBuilder {
pub fn new(runtime: Arc<Runtime>) -> Self {
Self {
config: None,
query_handler: None,
prometheus_handler: None,
flight_handler: None,
region_server_handler: None,
otlp_handler: None,
user_provider: None,
runtime,
}
}
pub fn config(mut self, config: GrpcServerConfig) -> Self {
self.config = Some(config);
self
}
pub fn option_config(mut self, config: Option<GrpcServerConfig>) -> Self {
self.config = config;
self
}
pub fn query_handler(mut self, query_handler: ServerGrpcQueryHandlerRef) -> Self {
self.query_handler = Some(query_handler);
self
}
pub fn prometheus_handler(mut self, prometheus_handler: PrometheusHandlerRef) -> Self {
self.prometheus_handler = Some(prometheus_handler);
self
}
pub fn flight_handler(mut self, flight_handler: FlightCraftRef) -> Self {
self.flight_handler = Some(flight_handler);
self
}
pub fn region_server_handler(mut self, region_server_handler: RegionServerHandlerRef) -> Self {
self.region_server_handler = Some(region_server_handler);
self
}
pub fn otlp_handler(mut self, otlp_handler: OpenTelemetryProtocolHandlerRef) -> Self {
self.otlp_handler = Some(otlp_handler);
self
}
pub fn user_provider(mut self, user_provider: Option<UserProviderRef>) -> Self {
self.user_provider = user_provider;
self
}
pub fn build(self) -> GrpcServer {
let config = self.config.unwrap_or_default();
let user_provider = self.user_provider.as_ref();
let runtime = self.runtime;
let database_handler = self.query_handler.map(|handler| {
GreptimeRequestHandler::new(handler, user_provider.cloned(), runtime.clone())
});
let region_server_handler = self
.region_server_handler
.map(|handler| RegionServerRequestHandler::new(handler, runtime));
GrpcServer {
config,
prometheus_handler: self.prometheus_handler,
flight_handler: self.flight_handler,
region_server_handler,
database_handler,
otlp_handler: self.otlp_handler,
user_provider: self.user_provider,
shutdown_tx: Mutex::new(None),
serve_state: Mutex::new(None),
}
}
}

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::result::Result as StdResult;
use opentelemetry_proto::tonic::collector::metrics::v1::metrics_service_server::MetricsService;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceService;
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::QueryContextRef;
use snafu::OptionExt;
use tonic::{Request, Response, Status};
use crate::error;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
pub struct OtlpService {
handler: OpenTelemetryProtocolHandlerRef,
}
impl OtlpService {
pub fn new(handler: OpenTelemetryProtocolHandlerRef) -> Self {
Self { handler }
}
}
#[async_trait::async_trait]
impl TraceService for OtlpService {
async fn export(
&self,
request: Request<ExportTraceServiceRequest>,
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let ctx = extensions
.get::<QueryContextRef>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
let res = self.handler.traces(req, ctx).await?;
Ok(Response::new(res))
}
}
#[async_trait::async_trait]
impl MetricsService for OtlpService {
async fn export(
&self,
request: Request<ExportMetricsServiceRequest>,
) -> StdResult<Response<ExportMetricsServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let ctx = extensions
.get::<QueryContextRef>()
.cloned()
.context(error::MissingQueryContextSnafu)?;
let res = self.handler.metrics(req, ctx).await?;
Ok(Response::new(res))
}
}

View File

@@ -31,8 +31,9 @@ use session::context::QueryContext;
use snafu::OptionExt;
use tonic::{Request, Response};
use super::greptime_handler::create_query_context;
use crate::error::InvalidQuerySnafu;
use crate::grpc::greptime_handler::{auth, create_query_context};
use crate::grpc::greptime_handler::auth;
use crate::grpc::TonicResult;
use crate::http::prometheus::{retrieve_metric_name_and_result_type, PrometheusJsonResponse};
use crate::prometheus_handler::PrometheusHandlerRef;

View File

@@ -126,7 +126,7 @@ fn err_response(is_influxdb: bool, err: impl ErrorExt) -> impl IntoResponse {
(StatusCode::UNAUTHORIZED, ErrorResponse::from_error(ty, err))
}
fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
pub fn extract_catalog_and_schema<B>(request: &Request<B>) -> (&str, &str) {
// parse database from header
let dbname = request
.headers()
@@ -183,7 +183,7 @@ fn get_influxdb_credentials<B>(request: &Request<B>) -> Result<Option<(Username,
}
}
fn extract_username_and_password<B>(
pub fn extract_username_and_password<B>(
is_influxdb: bool,
request: &Request<B>,
) -> Result<(Username, Password)> {

View File

@@ -45,7 +45,7 @@ use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
use meta_srv::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef};
use meta_srv::mocks::MockInfo;
use servers::grpc::GrpcServer;
use servers::grpc::builder::GrpcServerBuilder;
use servers::heartbeat_options::HeartbeatOptions;
use servers::Mode;
use tempfile::TempDir;
@@ -373,17 +373,11 @@ async fn create_datanode_client(datanode: &Datanode) -> (String, Client) {
.unwrap(),
);
let flight_handler = Some(Arc::new(datanode.region_server()) as _);
let region_server_handler = Some(Arc::new(datanode.region_server()) as _);
let grpc_server = GrpcServer::new(
None,
None,
None,
flight_handler,
region_server_handler,
None,
runtime,
);
let grpc_server = GrpcServerBuilder::new(runtime)
.flight_handler(Arc::new(datanode.region_server()))
.region_server_handler(Arc::new(datanode.region_server()))
.build();
let _handle = tokio::spawn(async move {
Server::builder()
.add_service(grpc_server.create_flight_service())

View File

@@ -42,6 +42,7 @@ use object_store::services::{Azblob, Gcs, Oss, S3};
use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use secrecy::ExposeSecret;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::{HttpOptions, HttpServerBuilder};
@@ -508,15 +509,15 @@ pub async fn setup_grpc_server_with(
runtime.clone(),
));
let fe_grpc_server = Arc::new(GrpcServer::new(
grpc_config,
Some(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone())),
Some(fe_instance_ref.clone()),
Some(flight_handler),
None,
user_provider,
runtime,
));
let fe_grpc_server = Arc::new(
GrpcServerBuilder::new(runtime)
.option_config(grpc_config)
.query_handler(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()))
.flight_handler(flight_handler)
.prometheus_handler(fe_instance_ref.clone())
.user_provider(user_provider)
.build(),
);
let fe_grpc_addr = "127.0.0.1:0".parse::<SocketAddr>().unwrap();
let fe_grpc_addr = fe_grpc_server