feat: frontend internal grpc port (#6784)

* feat: frontend internal grpc port

Signed-off-by: discord9 <discord9@163.com>

* fix: grpc server naming

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness test fix

Signed-off-by: discord9 <discord9@163.com>

* fix: internal not use process manager

Signed-off-by: discord9 <discord9@163.com>

* test: test integration port alloc

Signed-off-by: discord9 <discord9@163.com>

* feat: skip auth for internal grpc

Signed-off-by: discord9 <discord9@163.com>

* test: is distributed

Signed-off-by: discord9 <discord9@163.com>

* what:

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
discord9
2025-08-22 10:46:35 +08:00
committed by GitHub
parent cdc168e753
commit eaceae4c91
8 changed files with 111 additions and 15 deletions

View File

@@ -48,6 +48,9 @@ pub struct FrontendOptions {
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub grpc: GrpcOptions,
/// The internal gRPC options for the frontend service.
/// it provide the same service as the public gRPC service, just only for internal use.
pub internal_grpc: GrpcOptions,
pub mysql: MysqlOptions,
pub postgres: PostgresOptions,
pub opentsdb: OpentsdbOptions,
@@ -77,6 +80,7 @@ impl Default for FrontendOptions {
heartbeat: HeartbeatOptions::frontend_default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
internal_grpc: GrpcOptions::internal_default(),
mysql: MysqlOptions::default(),
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),

View File

@@ -56,7 +56,10 @@ impl HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
peer_addr: addrs::resolve_addr(
&opts.internal_grpc.bind_addr,
Some(&opts.internal_grpc.server_addr),
),
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use meta_client::MetaClientOptions;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
@@ -131,17 +132,28 @@ where
}
}
fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
fn build_grpc_server(
&mut self,
grpc: &GrpcOptions,
meta_client: &Option<MetaClientOptions>,
name: Option<String>,
external: bool,
) -> Result<GrpcServer> {
let builder = if let Some(builder) = self.grpc_server_builder.take() {
builder
} else {
self.grpc_server_builder(&opts.grpc)?
self.grpc_server_builder(grpc)?
};
let user_provider = self.plugins.get::<UserProviderRef>();
let user_provider = if external {
self.plugins.get::<UserProviderRef>()
} else {
// skip authentication for internal grpc port
None
};
// Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
let runtime = if opts.meta_client.is_none() {
let runtime = if meta_client.is_none() {
Some(builder.runtime().clone())
} else {
None
@@ -151,18 +163,25 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
runtime,
opts.grpc.flight_compression,
grpc.flight_compression,
);
let frontend_grpc_handler =
FrontendGrpcHandler::new(self.instance.process_manager().clone());
let grpc_server = builder
.name(name)
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.frontend_grpc_handler(frontend_grpc_handler)
.build();
.flight_handler(Arc::new(greptime_request_handler));
let grpc_server = if external {
let frontend_grpc_handler =
FrontendGrpcHandler::new(self.instance.process_manager().clone());
grpc_server.frontend_grpc_handler(frontend_grpc_handler)
} else {
grpc_server
}
.build();
Ok(grpc_server)
}
@@ -195,7 +214,19 @@ where
{
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(&opts)?;
let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
if opts.meta_client.is_some() {
// Always init Internal GRPC server
let grpc_addr = parse_addr(&opts.internal_grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(
&opts.grpc,
&opts.meta_client,
Some("INTERNAL_GRPC_SERVER".to_string()),
false,
)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}

View File

@@ -116,6 +116,8 @@ impl GrpcOptions {
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
impl Default for GrpcOptions {
fn default() -> Self {
Self {
@@ -132,6 +134,22 @@ impl Default for GrpcOptions {
}
impl GrpcOptions {
/// Default options for internal gRPC server.
/// The internal gRPC server is used for communication between different nodes in cluster.
/// It is not exposed to the outside world.
pub fn internal_default() -> Self {
Self {
bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
// If hostname is not set, the server will use the local ip address as the hostname.
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
}
}
pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
self.bind_addr = bind_addr.to_string();
self
@@ -187,6 +205,7 @@ pub struct GrpcServer {
>,
>,
bind_addr: Option<SocketAddr>,
name: Option<String>,
}
/// Grpc Server configuration
@@ -297,7 +316,7 @@ impl Server for GrpcServer {
.context(TcpBindSnafu { addr })?;
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming = TcpIncoming::from(listener).with_nodelay(Some(true));
info!("gRPC server is bound to {}", addr);
info!("gRPC server(name={}) is bound to {}", self.name(), addr);
*shutdown_tx = Some(tx);
@@ -339,7 +358,11 @@ impl Server for GrpcServer {
}
fn name(&self) -> &str {
GRPC_SERVER
if let Some(name) = &self.name {
name
} else {
GRPC_SERVER
}
}
fn bind_addr(&self) -> Option<SocketAddr> {

View File

@@ -62,6 +62,7 @@ macro_rules! add_service {
}
pub struct GrpcServerBuilder {
name: Option<String>,
config: GrpcServerConfig,
runtime: Runtime,
routes_builder: RoutesBuilder,
@@ -77,6 +78,7 @@ pub struct GrpcServerBuilder {
impl GrpcServerBuilder {
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
Self {
name: None,
config,
runtime,
routes_builder: RoutesBuilder::default(),
@@ -93,6 +95,10 @@ impl GrpcServerBuilder {
&self.runtime
}
pub fn name(self, name: Option<String>) -> Self {
Self { name, ..self }
}
/// Add handler for [DatabaseService] service.
pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
add_service!(
@@ -190,6 +196,7 @@ impl GrpcServerBuilder {
tls_config: self.tls_config,
otel_arrow_service: Mutex::new(self.otel_arrow_service),
bind_addr: None,
name: self.name,
}
}
}

View File

@@ -445,7 +445,10 @@ impl GreptimeDbClusterBuilder {
}
fn build_frontend_options(&self) -> FrontendOptions {
let mut fe_opts = FrontendOptions::default();
let mut fe_opts = FrontendOptions {
meta_client: Some(Default::default()),
..Default::default()
};
// Choose a random unused port between [14000, 24000] for local test to avoid conflicts.
let port_range = 14000..=24000;
@@ -462,6 +465,12 @@ impl GreptimeDbClusterBuilder {
let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
fe_opts.grpc.bind_addr = construct_addr(grpc_port);
fe_opts.grpc.server_addr = construct_addr(grpc_port);
let internal_grpc_port =
self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
fe_opts.internal_grpc.bind_addr = construct_addr(internal_grpc_port);
fe_opts.internal_grpc.server_addr = construct_addr(internal_grpc_port);
fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port(
port_range.clone(),
max_attempts,

View File

@@ -1,3 +1,7 @@
[grpc]
bind_addr = "{grpc_addr}"
server_addr = "{grpc_addr}"
[internal_grpc]
bind_addr = "{internal_grpc_addr}"
server_addr = "{internal_grpc_addr}"

View File

@@ -60,6 +60,7 @@ pub enum ServerMode {
Frontend {
http_addr: String,
rpc_bind_addr: String,
internal_rpc_bind_addr: String,
mysql_addr: String,
postgres_addr: String,
metasrv_addr: String,
@@ -100,6 +101,8 @@ struct ConfigContext {
metasrv_addr: String,
// for frontend and standalone
grpc_addr: String,
// for frontend in distributed mode
internal_grpc_addr: String,
// for standalone
mysql_addr: String,
// for standalone
@@ -124,12 +127,14 @@ impl ServerMode {
pub fn random_frontend(metasrv_port: u16) -> Self {
let http_port = get_unique_random_port();
let rpc_port = get_unique_random_port();
let internal_rpc_port = get_unique_random_port();
let mysql_port = get_unique_random_port();
let postgres_port = get_unique_random_port();
ServerMode::Frontend {
http_addr: format!("127.0.0.1:{http_port}"),
rpc_bind_addr: format!("127.0.0.1:{rpc_port}"),
internal_rpc_bind_addr: format!("127.0.0.1:{internal_rpc_port}"),
mysql_addr: format!("127.0.0.1:{mysql_port}"),
postgres_addr: format!("127.0.0.1:{postgres_port}"),
metasrv_addr: format!("127.0.0.1:{metasrv_port}"),
@@ -324,6 +329,15 @@ impl ServerMode {
instance_id: id,
metasrv_addr,
grpc_addr,
internal_grpc_addr: if let ServerMode::Frontend {
internal_rpc_bind_addr,
..
} = self
{
internal_rpc_bind_addr.clone()
} else {
String::new()
},
mysql_addr,
postgres_addr,
};
@@ -381,6 +395,7 @@ impl ServerMode {
ServerMode::Frontend {
http_addr,
rpc_bind_addr,
internal_rpc_bind_addr: _,
mysql_addr,
postgres_addr,
metasrv_addr,