diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 23b355f60c..3829cda718 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -48,6 +48,9 @@ pub struct FrontendOptions { pub heartbeat: HeartbeatOptions, pub http: HttpOptions, pub grpc: GrpcOptions, + /// The internal gRPC options for the frontend service. + /// it provide the same service as the public gRPC service, just only for internal use. + pub internal_grpc: GrpcOptions, pub mysql: MysqlOptions, pub postgres: PostgresOptions, pub opentsdb: OpentsdbOptions, @@ -77,6 +80,7 @@ impl Default for FrontendOptions { heartbeat: HeartbeatOptions::frontend_default(), http: HttpOptions::default(), grpc: GrpcOptions::default(), + internal_grpc: GrpcOptions::internal_default(), mysql: MysqlOptions::default(), postgres: PostgresOptions::default(), opentsdb: OpentsdbOptions::default(), diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 47dac786b2..dd0e89399d 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -56,7 +56,10 @@ impl HeartbeatTask { resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { HeartbeatTask { - peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), + peer_addr: addrs::resolve_addr( + &opts.internal_grpc.bind_addr, + Some(&opts.internal_grpc.server_addr), + ), meta_client, report_interval: heartbeat_opts.interval.as_millis() as u64, retry_interval: heartbeat_opts.retry_interval.as_millis() as u64, diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 0c6493f79b..b198604e42 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use auth::UserProviderRef; use common_base::Plugins; use common_config::Configurable; +use meta_client::MetaClientOptions; use servers::error::Error as ServerError; use servers::grpc::builder::GrpcServerBuilder; use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler; @@ -131,17 +132,28 @@ where } } - fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result { + fn build_grpc_server( + &mut self, + grpc: &GrpcOptions, + meta_client: &Option, + name: Option, + external: bool, + ) -> Result { let builder = if let Some(builder) = self.grpc_server_builder.take() { builder } else { - self.grpc_server_builder(&opts.grpc)? + self.grpc_server_builder(grpc)? }; - let user_provider = self.plugins.get::(); + let user_provider = if external { + self.plugins.get::() + } else { + // skip authentication for internal grpc port + None + }; // Determine whether it is Standalone or Distributed mode based on whether the meta client is configured. - let runtime = if opts.meta_client.is_none() { + let runtime = if meta_client.is_none() { Some(builder.runtime().clone()) } else { None @@ -151,18 +163,25 @@ where ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()), user_provider.clone(), runtime, - opts.grpc.flight_compression, + grpc.flight_compression, ); - let frontend_grpc_handler = - FrontendGrpcHandler::new(self.instance.process_manager().clone()); let grpc_server = builder + .name(name) .database_handler(greptime_request_handler.clone()) .prometheus_handler(self.instance.clone(), user_provider.clone()) .otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone())) - .flight_handler(Arc::new(greptime_request_handler)) - .frontend_grpc_handler(frontend_grpc_handler) - .build(); + .flight_handler(Arc::new(greptime_request_handler)); + + let grpc_server = if external { + let frontend_grpc_handler = + FrontendGrpcHandler::new(self.instance.process_manager().clone()); + grpc_server.frontend_grpc_handler(frontend_grpc_handler) + } else { + grpc_server + } + .build(); + Ok(grpc_server) } @@ -195,7 +214,19 @@ where { // Always init GRPC server let grpc_addr = parse_addr(&opts.grpc.bind_addr)?; - let grpc_server = self.build_grpc_server(&opts)?; + let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?; + handlers.insert((Box::new(grpc_server), grpc_addr)); + } + + if opts.meta_client.is_some() { + // Always init Internal GRPC server + let grpc_addr = parse_addr(&opts.internal_grpc.bind_addr)?; + let grpc_server = self.build_grpc_server( + &opts.grpc, + &opts.meta_client, + Some("INTERNAL_GRPC_SERVER".to_string()), + false, + )?; handlers.insert((Box::new(grpc_server), grpc_addr)); } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index a93a82b50e..18254e9ec7 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -116,6 +116,8 @@ impl GrpcOptions { const DEFAULT_GRPC_ADDR_PORT: &str = "4001"; +const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010"; + impl Default for GrpcOptions { fn default() -> Self { Self { @@ -132,6 +134,22 @@ impl Default for GrpcOptions { } impl GrpcOptions { + /// Default options for internal gRPC server. + /// The internal gRPC server is used for communication between different nodes in cluster. + /// It is not exposed to the outside world. + pub fn internal_default() -> Self { + Self { + bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT), + // If hostname is not set, the server will use the local ip address as the hostname. + server_addr: String::new(), + max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, + max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE, + flight_compression: FlightCompression::ArrowIpc, + runtime_size: 8, + tls: TlsOption::default(), + } + } + pub fn with_bind_addr(mut self, bind_addr: &str) -> Self { self.bind_addr = bind_addr.to_string(); self @@ -187,6 +205,7 @@ pub struct GrpcServer { >, >, bind_addr: Option, + name: Option, } /// Grpc Server configuration @@ -297,7 +316,7 @@ impl Server for GrpcServer { .context(TcpBindSnafu { addr })?; let addr = listener.local_addr().context(TcpBindSnafu { addr })?; let incoming = TcpIncoming::from(listener).with_nodelay(Some(true)); - info!("gRPC server is bound to {}", addr); + info!("gRPC server(name={}) is bound to {}", self.name(), addr); *shutdown_tx = Some(tx); @@ -339,7 +358,11 @@ impl Server for GrpcServer { } fn name(&self) -> &str { - GRPC_SERVER + if let Some(name) = &self.name { + name + } else { + GRPC_SERVER + } } fn bind_addr(&self) -> Option { diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs index f39a76a576..a9ebbacce4 100644 --- a/src/servers/src/grpc/builder.rs +++ b/src/servers/src/grpc/builder.rs @@ -62,6 +62,7 @@ macro_rules! add_service { } pub struct GrpcServerBuilder { + name: Option, config: GrpcServerConfig, runtime: Runtime, routes_builder: RoutesBuilder, @@ -77,6 +78,7 @@ pub struct GrpcServerBuilder { impl GrpcServerBuilder { pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self { Self { + name: None, config, runtime, routes_builder: RoutesBuilder::default(), @@ -93,6 +95,10 @@ impl GrpcServerBuilder { &self.runtime } + pub fn name(self, name: Option) -> Self { + Self { name, ..self } + } + /// Add handler for [DatabaseService] service. pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self { add_service!( @@ -190,6 +196,7 @@ impl GrpcServerBuilder { tls_config: self.tls_config, otel_arrow_service: Mutex::new(self.otel_arrow_service), bind_addr: None, + name: self.name, } } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index a7847394a5..4a037413bd 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -445,7 +445,10 @@ impl GreptimeDbClusterBuilder { } fn build_frontend_options(&self) -> FrontendOptions { - let mut fe_opts = FrontendOptions::default(); + let mut fe_opts = FrontendOptions { + meta_client: Some(Default::default()), + ..Default::default() + }; // Choose a random unused port between [14000, 24000] for local test to avoid conflicts. let port_range = 14000..=24000; @@ -462,6 +465,12 @@ impl GreptimeDbClusterBuilder { let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost); fe_opts.grpc.bind_addr = construct_addr(grpc_port); fe_opts.grpc.server_addr = construct_addr(grpc_port); + + let internal_grpc_port = + self.choose_random_unused_port(port_range.clone(), max_attempts, localhost); + fe_opts.internal_grpc.bind_addr = construct_addr(internal_grpc_port); + fe_opts.internal_grpc.server_addr = construct_addr(internal_grpc_port); + fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port( port_range.clone(), max_attempts, diff --git a/tests/conf/frontend-test.toml.template b/tests/conf/frontend-test.toml.template index de4ce86adc..9d8bbdfd5e 100644 --- a/tests/conf/frontend-test.toml.template +++ b/tests/conf/frontend-test.toml.template @@ -1,3 +1,7 @@ [grpc] bind_addr = "{grpc_addr}" server_addr = "{grpc_addr}" + +[internal_grpc] +bind_addr = "{internal_grpc_addr}" +server_addr = "{internal_grpc_addr}" diff --git a/tests/runner/src/server_mode.rs b/tests/runner/src/server_mode.rs index b3d471da46..3b04299142 100644 --- a/tests/runner/src/server_mode.rs +++ b/tests/runner/src/server_mode.rs @@ -60,6 +60,7 @@ pub enum ServerMode { Frontend { http_addr: String, rpc_bind_addr: String, + internal_rpc_bind_addr: String, mysql_addr: String, postgres_addr: String, metasrv_addr: String, @@ -100,6 +101,8 @@ struct ConfigContext { metasrv_addr: String, // for frontend and standalone grpc_addr: String, + // for frontend in distributed mode + internal_grpc_addr: String, // for standalone mysql_addr: String, // for standalone @@ -124,12 +127,14 @@ impl ServerMode { pub fn random_frontend(metasrv_port: u16) -> Self { let http_port = get_unique_random_port(); let rpc_port = get_unique_random_port(); + let internal_rpc_port = get_unique_random_port(); let mysql_port = get_unique_random_port(); let postgres_port = get_unique_random_port(); ServerMode::Frontend { http_addr: format!("127.0.0.1:{http_port}"), rpc_bind_addr: format!("127.0.0.1:{rpc_port}"), + internal_rpc_bind_addr: format!("127.0.0.1:{internal_rpc_port}"), mysql_addr: format!("127.0.0.1:{mysql_port}"), postgres_addr: format!("127.0.0.1:{postgres_port}"), metasrv_addr: format!("127.0.0.1:{metasrv_port}"), @@ -324,6 +329,15 @@ impl ServerMode { instance_id: id, metasrv_addr, grpc_addr, + internal_grpc_addr: if let ServerMode::Frontend { + internal_rpc_bind_addr, + .. + } = self + { + internal_rpc_bind_addr.clone() + } else { + String::new() + }, mysql_addr, postgres_addr, }; @@ -381,6 +395,7 @@ impl ServerMode { ServerMode::Frontend { http_addr, rpc_bind_addr, + internal_rpc_bind_addr: _, mysql_addr, postgres_addr, metasrv_addr,