Compare commits

...

4 Commits

Author SHA1 Message Date
discord9
c69c40a9ee chore: make internal grpc optional (#6789)
* chore: make internal grpc optional

Signed-off-by: discord9 <discord9@163.com>

* revert sqlness runner too

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-24 15:38:20 +08:00
Ning Sun
c9bc3de9aa feat: add cli option for internal grpc 2025-08-24 15:11:59 +08:00
discord9
71d54431c6 feat: frontend internal grpc port (#6784)
* feat: frontend internal grpc port

Signed-off-by: discord9 <discord9@163.com>

* fix: grpc server naming

Signed-off-by: discord9 <discord9@163.com>

* test: sqlness test fix

Signed-off-by: discord9 <discord9@163.com>

* fix: internal not use process manager

Signed-off-by: discord9 <discord9@163.com>

* test: test integration port alloc

Signed-off-by: discord9 <discord9@163.com>

* feat: skip auth for internal grpc

Signed-off-by: discord9 <discord9@163.com>

* test: is distributed

Signed-off-by: discord9 <discord9@163.com>

* what:

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: Ning Sun <sunning@greptime.com>
2025-08-24 10:54:43 +08:00
discord9
524c0ced44 fix: truncate manifest action compat (#6742)
* fix: truncate manifest action compat

Signed-off-by: discord9 <discord9@163.com>

* refactor: use simpler compat

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
2025-08-21 16:23:30 +08:00
8 changed files with 193 additions and 15 deletions

View File

@@ -41,6 +41,7 @@ use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::addrs;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
@@ -144,6 +145,14 @@ pub struct StartCommand {
/// on the host, with the same port number as the one specified in `rpc_bind_addr`.
#[clap(long, alias = "rpc-hostname")]
rpc_server_addr: Option<String>,
/// The address to bind the internal gRPC server.
#[clap(long, alias = "internal-rpc-addr")]
internal_rpc_bind_addr: Option<String>,
/// The address advertised to the metasrv, and used for connections from outside the host.
/// If left empty or unset, the server will automatically use the IP address of the first network interface
/// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
#[clap(long, alias = "internal-rpc-hostname")]
internal_rpc_server_addr: Option<String>,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
@@ -241,6 +250,31 @@ impl StartCommand {
opts.grpc.server_addr.clone_from(addr);
}
if let Some(addr) = &self.internal_rpc_bind_addr {
if let Some(internal_grpc) = &mut opts.internal_grpc {
internal_grpc.bind_addr = addr.to_string();
} else {
let grpc_options = GrpcOptions {
bind_addr: addr.to_string(),
..Default::default()
};
opts.internal_grpc = Some(grpc_options);
}
}
if let Some(addr) = &self.internal_rpc_server_addr {
if let Some(internal_grpc) = &mut opts.internal_grpc {
internal_grpc.server_addr = addr.to_string();
} else {
let grpc_options = GrpcOptions {
server_addr: addr.to_string(),
..Default::default()
};
opts.internal_grpc = Some(grpc_options);
}
}
if let Some(addr) = &self.mysql_addr {
opts.mysql.enable = true;
opts.mysql.addr.clone_from(addr);
@@ -448,6 +482,8 @@ mod tests {
http_addr: Some("127.0.0.1:1234".to_string()),
mysql_addr: Some("127.0.0.1:5678".to_string()),
postgres_addr: Some("127.0.0.1:5432".to_string()),
internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
influxdb_enable: Some(false),
disable_dashboard: Some(false),
..Default::default()
@@ -460,6 +496,10 @@ mod tests {
assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
let internal_grpc = opts.internal_grpc.as_ref().unwrap();
assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
let default_opts = FrontendOptions::default().component;
assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);

View File

@@ -47,6 +47,9 @@ pub struct FrontendOptions {
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub grpc: GrpcOptions,
/// The internal gRPC options for the frontend service.
/// it provide the same service as the public gRPC service, just only for internal use.
pub internal_grpc: Option<GrpcOptions>,
pub mysql: MysqlOptions,
pub postgres: PostgresOptions,
pub opentsdb: OpentsdbOptions,
@@ -74,6 +77,7 @@ impl Default for FrontendOptions {
heartbeat: HeartbeatOptions::frontend_default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
internal_grpc: None,
mysql: MysqlOptions::default(),
postgres: PostgresOptions::default(),
opentsdb: OpentsdbOptions::default(),

View File

@@ -56,7 +56,14 @@ impl HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
// if internal grpc is configured, use its address as the peer address
// otherwise use the public grpc address, because peer address only promises to be reachable
// by other components, it doesn't matter whether it's internal or external
peer_addr: if let Some(internal) = &opts.internal_grpc {
addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
} else {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
},
meta_client,
report_interval: heartbeat_opts.interval.as_millis() as u64,
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use meta_client::MetaClientOptions;
use servers::error::Error as ServerError;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
@@ -131,17 +132,28 @@ where
}
}
fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
fn build_grpc_server(
&mut self,
grpc: &GrpcOptions,
meta_client: &Option<MetaClientOptions>,
name: Option<String>,
external: bool,
) -> Result<GrpcServer> {
let builder = if let Some(builder) = self.grpc_server_builder.take() {
builder
} else {
self.grpc_server_builder(&opts.grpc)?
self.grpc_server_builder(grpc)?
};
let user_provider = self.plugins.get::<UserProviderRef>();
let user_provider = if external {
self.plugins.get::<UserProviderRef>()
} else {
// skip authentication for internal grpc port
None
};
// Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
let runtime = if opts.meta_client.is_none() {
let runtime = if meta_client.is_none() {
Some(builder.runtime().clone())
} else {
None
@@ -151,18 +163,25 @@ where
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
runtime,
opts.grpc.flight_compression,
grpc.flight_compression,
);
let frontend_grpc_handler =
FrontendGrpcHandler::new(self.instance.process_manager().clone());
let grpc_server = builder
.name(name)
.database_handler(greptime_request_handler.clone())
.prometheus_handler(self.instance.clone(), user_provider.clone())
.otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
.flight_handler(Arc::new(greptime_request_handler))
.frontend_grpc_handler(frontend_grpc_handler)
.build();
.flight_handler(Arc::new(greptime_request_handler));
let grpc_server = if external {
let frontend_grpc_handler =
FrontendGrpcHandler::new(self.instance.process_manager().clone());
grpc_server.frontend_grpc_handler(frontend_grpc_handler)
} else {
grpc_server
}
.build();
Ok(grpc_server)
}
@@ -195,7 +214,19 @@ where
{
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(&opts)?;
let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
if let Some(internal_grpc) = &opts.internal_grpc {
// Always init Internal GRPC server
let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(
internal_grpc,
&opts.meta_client,
Some("INTERNAL_GRPC_SERVER".to_string()),
false,
)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}

View File

@@ -62,15 +62,16 @@ pub struct RegionRemove {
pub region_id: RegionId,
}
/// Last data truncated in the region.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionTruncate {
pub region_id: RegionId,
#[serde(flatten)]
pub kind: TruncateKind,
}
/// The kind of truncate operation.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum TruncateKind {
/// Truncate all data in the region, marked by all data before the given entry id&sequence.
All {
@@ -259,6 +260,8 @@ impl RegionMetaActionList {
#[cfg(test)]
mod tests {
use common_time::Timestamp;
use super::*;
// These tests are used to ensure backward compatibility of manifest files.
@@ -419,4 +422,66 @@ mod tests {
assert_eq!(manifest, deserialized_manifest);
assert_ne!(serialized_manifest, region_manifest_json);
}
#[test]
fn test_region_truncate_compat() {
// Test deserializing RegionTruncate from old schema
let region_truncate_json = r#"{
"region_id": 4402341478400,
"truncated_entry_id": 10,
"truncated_sequence": 20
}"#;
let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
assert_eq!(truncate_v1.region_id, 4402341478400);
assert_eq!(
truncate_v1.kind,
TruncateKind::All {
truncated_entry_id: 10,
truncated_sequence: 20,
}
);
// Test deserializing RegionTruncate from new schema
let region_truncate_v2_json = r#"{
"region_id": 4402341478400,
"files_to_remove": [
{
"region_id": 4402341478400,
"file_id": "4b220a70-2b03-4641-9687-b65d94641208",
"time_range": [
{
"value": 1451609210000,
"unit": "Millisecond"
},
{
"value": 1451609520000,
"unit": "Millisecond"
}
],
"level": 1,
"file_size": 100
}
]
}"#;
let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
assert_eq!(truncate_v2.region_id, 4402341478400);
assert_eq!(
truncate_v2.kind,
TruncateKind::Partial {
files_to_remove: vec![FileMeta {
region_id: RegionId::from_u64(4402341478400),
file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
time_range: (
Timestamp::new_millisecond(1451609210000),
Timestamp::new_millisecond(1451609520000)
),
level: 1,
file_size: 100,
..Default::default()
}]
}
);
}
}

View File

@@ -118,6 +118,8 @@ impl GrpcOptions {
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
impl Default for GrpcOptions {
fn default() -> Self {
Self {
@@ -134,6 +136,22 @@ impl Default for GrpcOptions {
}
impl GrpcOptions {
/// Default options for internal gRPC server.
/// The internal gRPC server is used for communication between different nodes in cluster.
/// It is not exposed to the outside world.
pub fn internal_default() -> Self {
Self {
bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
// If hostname is not set, the server will use the local ip address as the hostname.
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
}
}
pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
self.bind_addr = bind_addr.to_string();
self
@@ -189,6 +207,7 @@ pub struct GrpcServer {
>,
>,
bind_addr: Option<SocketAddr>,
name: Option<String>,
}
/// Grpc Server configuration
@@ -300,7 +319,7 @@ impl Server for GrpcServer {
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
let incoming =
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
info!("gRPC server is bound to {}", addr);
info!("gRPC server(name={}) is bound to {}", self.name(), addr);
*shutdown_tx = Some(tx);
@@ -342,7 +361,11 @@ impl Server for GrpcServer {
}
fn name(&self) -> &str {
GRPC_SERVER
if let Some(name) = &self.name {
name
} else {
GRPC_SERVER
}
}
fn bind_addr(&self) -> Option<SocketAddr> {

View File

@@ -62,6 +62,7 @@ macro_rules! add_service {
}
pub struct GrpcServerBuilder {
name: Option<String>,
config: GrpcServerConfig,
runtime: Runtime,
routes_builder: RoutesBuilder,
@@ -77,6 +78,7 @@ pub struct GrpcServerBuilder {
impl GrpcServerBuilder {
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
Self {
name: None,
config,
runtime,
routes_builder: RoutesBuilder::default(),
@@ -93,6 +95,10 @@ impl GrpcServerBuilder {
&self.runtime
}
pub fn name(self, name: Option<String>) -> Self {
Self { name, ..self }
}
/// Add handler for [DatabaseService] service.
pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
add_service!(
@@ -190,6 +196,7 @@ impl GrpcServerBuilder {
tls_config: self.tls_config,
otel_arrow_service: Mutex::new(self.otel_arrow_service),
bind_addr: None,
name: self.name,
}
}
}

View File

@@ -462,6 +462,7 @@ impl GreptimeDbClusterBuilder {
let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
fe_opts.grpc.bind_addr = construct_addr(grpc_port);
fe_opts.grpc.server_addr = construct_addr(grpc_port);
fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port(
port_range.clone(),
max_attempts,