mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
4 Commits
c112cdf241
...
fix/v0.16-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c69c40a9ee | ||
|
|
c9bc3de9aa | ||
|
|
71d54431c6 | ||
|
|
524c0ced44 |
@@ -41,6 +41,7 @@ use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::addrs;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use tracing_appender::non_blocking::WorkerGuard;
|
||||
@@ -144,6 +145,14 @@ pub struct StartCommand {
|
||||
/// on the host, with the same port number as the one specified in `rpc_bind_addr`.
|
||||
#[clap(long, alias = "rpc-hostname")]
|
||||
rpc_server_addr: Option<String>,
|
||||
/// The address to bind the internal gRPC server.
|
||||
#[clap(long, alias = "internal-rpc-addr")]
|
||||
internal_rpc_bind_addr: Option<String>,
|
||||
/// The address advertised to the metasrv, and used for connections from outside the host.
|
||||
/// If left empty or unset, the server will automatically use the IP address of the first network interface
|
||||
/// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
|
||||
#[clap(long, alias = "internal-rpc-hostname")]
|
||||
internal_rpc_server_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
http_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
@@ -241,6 +250,31 @@ impl StartCommand {
|
||||
opts.grpc.server_addr.clone_from(addr);
|
||||
}
|
||||
|
||||
if let Some(addr) = &self.internal_rpc_bind_addr {
|
||||
if let Some(internal_grpc) = &mut opts.internal_grpc {
|
||||
internal_grpc.bind_addr = addr.to_string();
|
||||
} else {
|
||||
let grpc_options = GrpcOptions {
|
||||
bind_addr: addr.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
opts.internal_grpc = Some(grpc_options);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(addr) = &self.internal_rpc_server_addr {
|
||||
if let Some(internal_grpc) = &mut opts.internal_grpc {
|
||||
internal_grpc.server_addr = addr.to_string();
|
||||
} else {
|
||||
let grpc_options = GrpcOptions {
|
||||
server_addr: addr.to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
opts.internal_grpc = Some(grpc_options);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(addr) = &self.mysql_addr {
|
||||
opts.mysql.enable = true;
|
||||
opts.mysql.addr.clone_from(addr);
|
||||
@@ -448,6 +482,8 @@ mod tests {
|
||||
http_addr: Some("127.0.0.1:1234".to_string()),
|
||||
mysql_addr: Some("127.0.0.1:5678".to_string()),
|
||||
postgres_addr: Some("127.0.0.1:5432".to_string()),
|
||||
internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
|
||||
internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
|
||||
influxdb_enable: Some(false),
|
||||
disable_dashboard: Some(false),
|
||||
..Default::default()
|
||||
@@ -460,6 +496,10 @@ mod tests {
|
||||
assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
|
||||
assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
|
||||
|
||||
let internal_grpc = opts.internal_grpc.as_ref().unwrap();
|
||||
assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
|
||||
assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
|
||||
|
||||
let default_opts = FrontendOptions::default().component;
|
||||
|
||||
assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
|
||||
|
||||
@@ -47,6 +47,9 @@ pub struct FrontendOptions {
|
||||
pub heartbeat: HeartbeatOptions,
|
||||
pub http: HttpOptions,
|
||||
pub grpc: GrpcOptions,
|
||||
/// The internal gRPC options for the frontend service.
|
||||
/// it provide the same service as the public gRPC service, just only for internal use.
|
||||
pub internal_grpc: Option<GrpcOptions>,
|
||||
pub mysql: MysqlOptions,
|
||||
pub postgres: PostgresOptions,
|
||||
pub opentsdb: OpentsdbOptions,
|
||||
@@ -74,6 +77,7 @@ impl Default for FrontendOptions {
|
||||
heartbeat: HeartbeatOptions::frontend_default(),
|
||||
http: HttpOptions::default(),
|
||||
grpc: GrpcOptions::default(),
|
||||
internal_grpc: None,
|
||||
mysql: MysqlOptions::default(),
|
||||
postgres: PostgresOptions::default(),
|
||||
opentsdb: OpentsdbOptions::default(),
|
||||
|
||||
@@ -56,7 +56,14 @@ impl HeartbeatTask {
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
) -> Self {
|
||||
HeartbeatTask {
|
||||
peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
|
||||
// if internal grpc is configured, use its address as the peer address
|
||||
// otherwise use the public grpc address, because peer address only promises to be reachable
|
||||
// by other components, it doesn't matter whether it's internal or external
|
||||
peer_addr: if let Some(internal) = &opts.internal_grpc {
|
||||
addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
|
||||
} else {
|
||||
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
|
||||
},
|
||||
meta_client,
|
||||
report_interval: heartbeat_opts.interval.as_millis() as u64,
|
||||
retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use auth::UserProviderRef;
|
||||
use common_base::Plugins;
|
||||
use common_config::Configurable;
|
||||
use meta_client::MetaClientOptions;
|
||||
use servers::error::Error as ServerError;
|
||||
use servers::grpc::builder::GrpcServerBuilder;
|
||||
use servers::grpc::frontend_grpc_handler::FrontendGrpcHandler;
|
||||
@@ -131,17 +132,28 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
fn build_grpc_server(&mut self, opts: &FrontendOptions) -> Result<GrpcServer> {
|
||||
fn build_grpc_server(
|
||||
&mut self,
|
||||
grpc: &GrpcOptions,
|
||||
meta_client: &Option<MetaClientOptions>,
|
||||
name: Option<String>,
|
||||
external: bool,
|
||||
) -> Result<GrpcServer> {
|
||||
let builder = if let Some(builder) = self.grpc_server_builder.take() {
|
||||
builder
|
||||
} else {
|
||||
self.grpc_server_builder(&opts.grpc)?
|
||||
self.grpc_server_builder(grpc)?
|
||||
};
|
||||
|
||||
let user_provider = self.plugins.get::<UserProviderRef>();
|
||||
let user_provider = if external {
|
||||
self.plugins.get::<UserProviderRef>()
|
||||
} else {
|
||||
// skip authentication for internal grpc port
|
||||
None
|
||||
};
|
||||
|
||||
// Determine whether it is Standalone or Distributed mode based on whether the meta client is configured.
|
||||
let runtime = if opts.meta_client.is_none() {
|
||||
let runtime = if meta_client.is_none() {
|
||||
Some(builder.runtime().clone())
|
||||
} else {
|
||||
None
|
||||
@@ -151,18 +163,25 @@ where
|
||||
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
|
||||
user_provider.clone(),
|
||||
runtime,
|
||||
opts.grpc.flight_compression,
|
||||
grpc.flight_compression,
|
||||
);
|
||||
|
||||
let frontend_grpc_handler =
|
||||
FrontendGrpcHandler::new(self.instance.process_manager().clone());
|
||||
let grpc_server = builder
|
||||
.name(name)
|
||||
.database_handler(greptime_request_handler.clone())
|
||||
.prometheus_handler(self.instance.clone(), user_provider.clone())
|
||||
.otel_arrow_handler(OtelArrowServiceHandler::new(self.instance.clone()))
|
||||
.flight_handler(Arc::new(greptime_request_handler))
|
||||
.frontend_grpc_handler(frontend_grpc_handler)
|
||||
.build();
|
||||
.flight_handler(Arc::new(greptime_request_handler));
|
||||
|
||||
let grpc_server = if external {
|
||||
let frontend_grpc_handler =
|
||||
FrontendGrpcHandler::new(self.instance.process_manager().clone());
|
||||
grpc_server.frontend_grpc_handler(frontend_grpc_handler)
|
||||
} else {
|
||||
grpc_server
|
||||
}
|
||||
.build();
|
||||
|
||||
Ok(grpc_server)
|
||||
}
|
||||
|
||||
@@ -195,7 +214,19 @@ where
|
||||
{
|
||||
// Always init GRPC server
|
||||
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
|
||||
let grpc_server = self.build_grpc_server(&opts)?;
|
||||
let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
|
||||
handlers.insert((Box::new(grpc_server), grpc_addr));
|
||||
}
|
||||
|
||||
if let Some(internal_grpc) = &opts.internal_grpc {
|
||||
// Always init Internal GRPC server
|
||||
let grpc_addr = parse_addr(&internal_grpc.bind_addr)?;
|
||||
let grpc_server = self.build_grpc_server(
|
||||
internal_grpc,
|
||||
&opts.meta_client,
|
||||
Some("INTERNAL_GRPC_SERVER".to_string()),
|
||||
false,
|
||||
)?;
|
||||
handlers.insert((Box::new(grpc_server), grpc_addr));
|
||||
}
|
||||
|
||||
|
||||
@@ -62,15 +62,16 @@ pub struct RegionRemove {
|
||||
pub region_id: RegionId,
|
||||
}
|
||||
|
||||
/// Last data truncated in the region.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub struct RegionTruncate {
|
||||
pub region_id: RegionId,
|
||||
#[serde(flatten)]
|
||||
pub kind: TruncateKind,
|
||||
}
|
||||
|
||||
/// The kind of truncate operation.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
|
||||
#[serde(untagged)]
|
||||
pub enum TruncateKind {
|
||||
/// Truncate all data in the region, marked by all data before the given entry id&sequence.
|
||||
All {
|
||||
@@ -259,6 +260,8 @@ impl RegionMetaActionList {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use common_time::Timestamp;
|
||||
|
||||
use super::*;
|
||||
|
||||
// These tests are used to ensure backward compatibility of manifest files.
|
||||
@@ -419,4 +422,66 @@ mod tests {
|
||||
assert_eq!(manifest, deserialized_manifest);
|
||||
assert_ne!(serialized_manifest, region_manifest_json);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_region_truncate_compat() {
|
||||
// Test deserializing RegionTruncate from old schema
|
||||
let region_truncate_json = r#"{
|
||||
"region_id": 4402341478400,
|
||||
"truncated_entry_id": 10,
|
||||
"truncated_sequence": 20
|
||||
}"#;
|
||||
|
||||
let truncate_v1: RegionTruncate = serde_json::from_str(region_truncate_json).unwrap();
|
||||
assert_eq!(truncate_v1.region_id, 4402341478400);
|
||||
assert_eq!(
|
||||
truncate_v1.kind,
|
||||
TruncateKind::All {
|
||||
truncated_entry_id: 10,
|
||||
truncated_sequence: 20,
|
||||
}
|
||||
);
|
||||
|
||||
// Test deserializing RegionTruncate from new schema
|
||||
let region_truncate_v2_json = r#"{
|
||||
"region_id": 4402341478400,
|
||||
"files_to_remove": [
|
||||
{
|
||||
"region_id": 4402341478400,
|
||||
"file_id": "4b220a70-2b03-4641-9687-b65d94641208",
|
||||
"time_range": [
|
||||
{
|
||||
"value": 1451609210000,
|
||||
"unit": "Millisecond"
|
||||
},
|
||||
{
|
||||
"value": 1451609520000,
|
||||
"unit": "Millisecond"
|
||||
}
|
||||
],
|
||||
"level": 1,
|
||||
"file_size": 100
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
|
||||
let truncate_v2: RegionTruncate = serde_json::from_str(region_truncate_v2_json).unwrap();
|
||||
assert_eq!(truncate_v2.region_id, 4402341478400);
|
||||
assert_eq!(
|
||||
truncate_v2.kind,
|
||||
TruncateKind::Partial {
|
||||
files_to_remove: vec![FileMeta {
|
||||
region_id: RegionId::from_u64(4402341478400),
|
||||
file_id: FileId::parse_str("4b220a70-2b03-4641-9687-b65d94641208").unwrap(),
|
||||
time_range: (
|
||||
Timestamp::new_millisecond(1451609210000),
|
||||
Timestamp::new_millisecond(1451609520000)
|
||||
),
|
||||
level: 1,
|
||||
file_size: 100,
|
||||
..Default::default()
|
||||
}]
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,6 +118,8 @@ impl GrpcOptions {
|
||||
|
||||
const DEFAULT_GRPC_ADDR_PORT: &str = "4001";
|
||||
|
||||
const DEFAULT_INTERNAL_GRPC_ADDR_PORT: &str = "4010";
|
||||
|
||||
impl Default for GrpcOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -134,6 +136,22 @@ impl Default for GrpcOptions {
|
||||
}
|
||||
|
||||
impl GrpcOptions {
|
||||
/// Default options for internal gRPC server.
|
||||
/// The internal gRPC server is used for communication between different nodes in cluster.
|
||||
/// It is not exposed to the outside world.
|
||||
pub fn internal_default() -> Self {
|
||||
Self {
|
||||
bind_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
|
||||
// If hostname is not set, the server will use the local ip address as the hostname.
|
||||
server_addr: String::new(),
|
||||
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
|
||||
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
|
||||
flight_compression: FlightCompression::ArrowIpc,
|
||||
runtime_size: 8,
|
||||
tls: TlsOption::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_bind_addr(mut self, bind_addr: &str) -> Self {
|
||||
self.bind_addr = bind_addr.to_string();
|
||||
self
|
||||
@@ -189,6 +207,7 @@ pub struct GrpcServer {
|
||||
>,
|
||||
>,
|
||||
bind_addr: Option<SocketAddr>,
|
||||
name: Option<String>,
|
||||
}
|
||||
|
||||
/// Grpc Server configuration
|
||||
@@ -300,7 +319,7 @@ impl Server for GrpcServer {
|
||||
let addr = listener.local_addr().context(TcpBindSnafu { addr })?;
|
||||
let incoming =
|
||||
TcpIncoming::from_listener(listener, true, None).context(TcpIncomingSnafu)?;
|
||||
info!("gRPC server is bound to {}", addr);
|
||||
info!("gRPC server(name={}) is bound to {}", self.name(), addr);
|
||||
|
||||
*shutdown_tx = Some(tx);
|
||||
|
||||
@@ -342,7 +361,11 @@ impl Server for GrpcServer {
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
GRPC_SERVER
|
||||
if let Some(name) = &self.name {
|
||||
name
|
||||
} else {
|
||||
GRPC_SERVER
|
||||
}
|
||||
}
|
||||
|
||||
fn bind_addr(&self) -> Option<SocketAddr> {
|
||||
|
||||
@@ -62,6 +62,7 @@ macro_rules! add_service {
|
||||
}
|
||||
|
||||
pub struct GrpcServerBuilder {
|
||||
name: Option<String>,
|
||||
config: GrpcServerConfig,
|
||||
runtime: Runtime,
|
||||
routes_builder: RoutesBuilder,
|
||||
@@ -77,6 +78,7 @@ pub struct GrpcServerBuilder {
|
||||
impl GrpcServerBuilder {
|
||||
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
|
||||
Self {
|
||||
name: None,
|
||||
config,
|
||||
runtime,
|
||||
routes_builder: RoutesBuilder::default(),
|
||||
@@ -93,6 +95,10 @@ impl GrpcServerBuilder {
|
||||
&self.runtime
|
||||
}
|
||||
|
||||
pub fn name(self, name: Option<String>) -> Self {
|
||||
Self { name, ..self }
|
||||
}
|
||||
|
||||
/// Add handler for [DatabaseService] service.
|
||||
pub fn database_handler(mut self, database_handler: GreptimeRequestHandler) -> Self {
|
||||
add_service!(
|
||||
@@ -190,6 +196,7 @@ impl GrpcServerBuilder {
|
||||
tls_config: self.tls_config,
|
||||
otel_arrow_service: Mutex::new(self.otel_arrow_service),
|
||||
bind_addr: None,
|
||||
name: self.name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -462,6 +462,7 @@ impl GreptimeDbClusterBuilder {
|
||||
let grpc_port = self.choose_random_unused_port(port_range.clone(), max_attempts, localhost);
|
||||
fe_opts.grpc.bind_addr = construct_addr(grpc_port);
|
||||
fe_opts.grpc.server_addr = construct_addr(grpc_port);
|
||||
|
||||
fe_opts.mysql.addr = construct_addr(self.choose_random_unused_port(
|
||||
port_range.clone(),
|
||||
max_attempts,
|
||||
|
||||
Reference in New Issue
Block a user