refactor!: unify FrontendOptions and DatanodeOptions by using GrpcOptions (#4088)

* refactor: move GrpcOptions to servers/grpc

* fix: optimize code

* fix: docs

* refactor: move DatanodeOptions.rpc_hostname to grpc.hostname

* fix: merge main

* refactor code impl

test: add test_depreacted_cli_options unit test

* Update src/servers/src/grpc.rs

Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
taobo
2024-06-19 06:45:38 +08:00
committed by GitHub
parent fe74efdafe
commit 22d12683b4
17 changed files with 210 additions and 103 deletions

View File

@@ -305,13 +305,24 @@
| `node_id` | Integer | `None` | The datanode identifier and should be unique in the cluster. |
| `require_lease_before_startup` | Bool | `false` | Start services after regions have obtained leases.<br/>It will block the datanode start if it can't receive leases in the heartbeat from metasrv. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `rpc_addr` | String | `127.0.0.1:3001` | The gRPC address of the datanode. |
| `rpc_hostname` | String | `None` | The hostname of the datanode. |
| `rpc_runtime_size` | Integer | `8` | The number of gRPC server worker threads. |
| `rpc_max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `rpc_max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `rpc_addr` | String | `None` | Deprecated, use `grpc.addr` instead. |
| `rpc_hostname` | String | `None` | Deprecated, use `grpc.hostname` instead. |
| `rpc_runtime_size` | Integer | `None` | Deprecated, use `grpc.runtime_size` instead. |
| `rpc_max_recv_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_recv_message_size` instead. |
| `rpc_max_send_message_size` | String | `None` | Deprecated, use `grpc.rpc_max_send_message_size` instead. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.addr` | String | `127.0.0.1:3001` | The address to bind the gRPC server. |
| `grpc.hostname` | String | `127.0.0.1` | The hostname to advertise to the metasrv. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `grpc.max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
| `grpc.tls.cert_path` | String | `None` | Certificate file path. |
| `grpc.tls.key_path` | String | `None` | Private key file path. |
| `grpc.tls.watch` | Bool | `false` | Watch for Certificate and key file change and auto reload.<br/>For now, gRPC tls config does not support auto reload. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |

View File

@@ -13,27 +13,61 @@ require_lease_before_startup = false
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false
## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
## Parallelism of initializing regions.
init_regions_parallelism = 16
## The gRPC address of the datanode.
## Deprecated, use `grpc.addr` instead.
## +toml2docs:none-default
rpc_addr = "127.0.0.1:3001"
## The hostname of the datanode.
## Deprecated, use `grpc.hostname` instead.
## +toml2docs:none-default
rpc_hostname = "127.0.0.1"
## The number of gRPC server worker threads.
## Deprecated, use `grpc.runtime_size` instead.
## +toml2docs:none-default
rpc_runtime_size = 8
## The maximum receive message size for gRPC server.
## Deprecated, use `grpc.rpc_max_recv_message_size` instead.
## +toml2docs:none-default
rpc_max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
## Deprecated, use `grpc.rpc_max_send_message_size` instead.
## +toml2docs:none-default
rpc_max_send_message_size = "512MB"
## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
## The gRPC server options.
[grpc]
## The address to bind the gRPC server.
addr = "127.0.0.1:3001"
## The hostname to advertise to the metasrv.
hostname = "127.0.0.1"
## The number of server worker threads.
runtime_size = 8
## The maximum receive message size for gRPC server.
max_recv_message_size = "512MB"
## The maximum send message size for gRPC server.
max_send_message_size = "512MB"
## gRPC server TLS options, see `mysql.tls` section.
[grpc.tls]
## TLS mode.
mode = "disable"
## Certificate file path.
## +toml2docs:none-default
cert_path = ""
## Private key file path.
## +toml2docs:none-default
key_path = ""
## Watch for Certificate and key file change and auto reload.
## For now, gRPC tls config does not support auto reload.
watch = false
## The runtime options.
[runtime]

View File

@@ -19,8 +19,8 @@ use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::datanode::{Datanode, DatanodeBuilder};
@@ -155,6 +155,7 @@ impl StartCommand {
}
// The precedence order is: cli > config file > environment variables > default values.
#[allow(deprecated)]
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
@@ -176,11 +177,32 @@ impl StartCommand {
};
if let Some(addr) = &self.rpc_addr {
opts.rpc_addr.clone_from(addr);
opts.grpc.addr.clone_from(addr);
} else if let Some(addr) = &opts.rpc_addr {
warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
opts.grpc.addr.clone_from(addr);
}
if self.rpc_hostname.is_some() {
opts.rpc_hostname.clone_from(&self.rpc_hostname);
if let Some(hostname) = &self.rpc_hostname {
opts.grpc.hostname.clone_from(hostname);
} else if let Some(hostname) = &opts.rpc_hostname {
warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
opts.grpc.hostname.clone_from(hostname);
}
if let Some(runtime_size) = opts.rpc_runtime_size {
warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
opts.grpc.runtime_size = runtime_size;
}
if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
opts.grpc.max_recv_message_size = max_recv_message_size;
}
if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
opts.grpc.max_send_message_size = max_send_message_size;
}
if let Some(node_id) = self.node_id {
@@ -302,6 +324,34 @@ mod tests {
use super::*;
use crate::options::GlobalOptions;
#[test]
fn test_deprecated_cli_options() {
common_telemetry::init_default_ut_logging();
let mut file = create_named_temp_file();
let toml_str = r#"
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:4001"
rpc_hostname = "192.168.0.1"
[grpc]
addr = "127.0.0.1:3001"
hostname = "127.0.0.1"
runtime_size = 8
"#;
write!(file, "{}", toml_str).unwrap();
let cmd = StartCommand {
config_file: Some(file.path().to_str().unwrap().to_string()),
..Default::default()
};
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:4001".to_string(), options.grpc.addr);
assert_eq!("192.168.0.1".to_string(), options.grpc.hostname);
}
#[test]
fn test_read_from_config_file() {
let mut file = create_named_temp_file();
@@ -309,9 +359,11 @@ mod tests {
mode = "distributed"
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
[grpc]
addr = "127.0.0.1:3001"
hostname = "127.0.0.1"
runtime_size = 8
[heartbeat]
interval = "300ms"
@@ -358,7 +410,7 @@ mod tests {
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("127.0.0.1:3001".to_string(), options.grpc.addr);
assert_eq!(Some(42), options.node_id);
let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
@@ -475,8 +527,8 @@ mod tests {
enable_memory_catalog = false
node_id = 42
rpc_addr = "127.0.0.1:3001"
rpc_hostname = "127.0.0.1"
rpc_runtime_size = 8
rpc_hostname = "10.103.174.219"
[meta_client]
timeout = "3s"
@@ -572,6 +624,7 @@ mod tests {
opts.http.addr,
DatanodeOptions::default().component.http.addr
);
assert_eq!(opts.grpc.hostname, "10.103.174.219");
},
);
}

View File

@@ -370,7 +370,7 @@ mod tests {
use common_base::readable_size::ReadableSize;
use common_config::ENV_VAR_SEP;
use common_test_util::temp_dir::create_named_temp_file;
use frontend::service_config::GrpcOptions;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use super::*;

View File

@@ -49,12 +49,13 @@ use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
@@ -203,7 +204,7 @@ impl StandaloneOptions {
wal: cloned_opts.wal.into(),
storage: cloned_opts.storage,
region_engine: cloned_opts.region_engine,
rpc_addr: cloned_opts.grpc.addr,
grpc: cloned_opts.grpc,
..Default::default()
}
}
@@ -350,7 +351,7 @@ impl StartCommand {
if let Some(addr) = &self.rpc_addr {
// frontend grpc addr conflict with datanode default grpc addr
let datanode_grpc_addr = DatanodeOptions::default().rpc_addr;
let datanode_grpc_addr = DatanodeOptions::default().grpc.addr;
if addr.eq(&datanode_grpc_addr) {
return IllegalConfigSnafu {
msg: format!(

View File

@@ -18,6 +18,9 @@ use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_base::readable_size::ReadableSize;
use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
@@ -30,7 +33,9 @@ use meta_srv::metasrv::MetasrvOptions;
use meta_srv::selector::SelectorType;
use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
#[allow(deprecated)]
#[test]
fn test_load_datanode_example_config() {
let example_config = common_test_util::find_workspace_path("config/datanode.example.toml");
@@ -46,7 +51,6 @@ fn test_load_datanode_example_config() {
},
component: DatanodeOptions {
node_id: Some(42),
rpc_hostname: Some("127.0.0.1".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
@@ -90,6 +94,12 @@ fn test_load_datanode_example_config() {
remote_write: Some(Default::default()),
..Default::default()
},
grpc: GrpcOptions::default().with_addr("127.0.0.1:3001"),
rpc_addr: Some("127.0.0.1:3001".to_string()),
rpc_hostname: Some("127.0.0.1".to_string()),
rpc_runtime_size: Some(8),
rpc_max_recv_message_size: Some(DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE),
rpc_max_send_message_size: Some(DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE),
..Default::default()
},
};

View File

@@ -17,9 +17,6 @@
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -28,6 +25,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
@@ -265,13 +263,7 @@ pub struct DatanodeOptions {
pub require_lease_before_startup: bool,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
pub rpc_runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub rpc_max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub rpc_max_send_message_size: ReadableSize,
pub grpc: GrpcOptions,
pub heartbeat: HeartbeatOptions,
pub http: HttpOptions,
pub meta_client: Option<MetaClientOptions>,
@@ -283,9 +275,22 @@ pub struct DatanodeOptions {
pub enable_telemetry: bool,
pub export_metrics: ExportMetricsOption,
pub tracing: TracingOptions,
/// Deprecated options, please use the new options instead.
#[deprecated(note = "Please use `grpc.addr` instead.")]
pub rpc_addr: Option<String>,
#[deprecated(note = "Please use `grpc.hostname` instead.")]
pub rpc_hostname: Option<String>,
#[deprecated(note = "Please use `grpc.runtime_size` instead.")]
pub rpc_runtime_size: Option<usize>,
#[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
pub rpc_max_recv_message_size: Option<ReadableSize>,
#[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
pub rpc_max_send_message_size: Option<ReadableSize>,
}
impl Default for DatanodeOptions {
#[allow(deprecated)]
fn default() -> Self {
Self {
mode: Mode::Standalone,
@@ -293,11 +298,7 @@ impl Default for DatanodeOptions {
require_lease_before_startup: false,
init_regions_in_background: false,
init_regions_parallelism: 16,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
rpc_runtime_size: 8,
rpc_max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
rpc_max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
grpc: GrpcOptions::default().with_addr("127.0.0.1:3001"),
http: HttpOptions::default(),
meta_client: None,
wal: DatanodeWalConfig::default(),
@@ -311,6 +312,13 @@ impl Default for DatanodeOptions {
enable_telemetry: true,
export_metrics: ExportMetricsOption::default(),
tracing: TracingOptions::default(),
// Deprecated options
rpc_addr: None,
rpc_hostname: None,
rpc_runtime_size: None,
rpc_max_recv_message_size: None,
rpc_max_send_message_size: None,
}
}
}

View File

@@ -315,7 +315,7 @@ impl DatanodeBuilder {
let runtime = Arc::new(
Runtime::builder()
.worker_threads(opts.rpc_runtime_size)
.worker_threads(opts.grpc.runtime_size)
.thread_name("io-handlers")
.build()
.context(RuntimeResourceSnafu)?,

View File

@@ -84,8 +84,8 @@ impl HeartbeatTask {
node_id: opts.node_id.unwrap_or(0),
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr: opts.rpc_addr.clone(),
server_hostname: opts.rpc_hostname.clone(),
server_addr: opts.grpc.addr.clone(),
server_hostname: Some(opts.grpc.hostname.clone()),
running: Arc::new(AtomicBool::new(false)),
meta_client: Arc::new(meta_client),
region_server,

View File

@@ -21,7 +21,6 @@ use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
use servers::server::{ServerHandler, ServerHandlers};
use servers::tls::TlsOption;
use snafu::ResultExt;
use crate::config::DatanodeOptions;
@@ -67,8 +66,8 @@ impl<'a> DatanodeServiceBuilder<'a> {
let handlers = ServerHandlers::default();
if let Some(grpc_server) = self.grpc_server.take() {
let addr: SocketAddr = self.opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &self.opts.rpc_addr,
let addr: SocketAddr = self.opts.grpc.addr.parse().context(ParseAddrSnafu {
addr: &self.opts.grpc.addr,
})?;
let handler: ServerHandler = (Box::new(grpc_server), addr);
handlers.insert(handler).await;
@@ -94,9 +93,9 @@ impl<'a> DatanodeServiceBuilder<'a> {
region_server: &RegionServer,
) -> GrpcServerBuilder {
let config = GrpcServerConfig {
max_recv_message_size: opts.rpc_max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.rpc_max_send_message_size.as_bytes() as usize,
tls: TlsOption::default(),
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
};
GrpcServerBuilder::new(config, region_server.runtime())

View File

@@ -17,13 +17,14 @@ use common_telemetry::logging::{LoggingOptions, TracingOptions};
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
use crate::service_config::{
DatanodeOptions, GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions,
PostgresOptions, PromStoreOptions,
DatanodeOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
PromStoreOptions,
};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -21,7 +21,7 @@ use common_config::{Configurable, Mode};
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -35,7 +35,6 @@ use snafu::ResultExt;
use crate::error::{self, Result, StartServerSnafu, TomlFormatSnafu};
use crate::frontend::FrontendOptions;
use crate::instance::FrontendInstance;
use crate::service_config::GrpcOptions;
pub struct Services<T, U>
where

View File

@@ -13,7 +13,6 @@
// limitations under the License.
pub mod datanode;
pub mod grpc;
pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
@@ -21,7 +20,6 @@ pub mod otlp;
pub mod postgres;
pub mod prom_store;
pub use grpc::GrpcOptions;
pub use influxdb::InfluxdbOptions;
pub use mysql::MysqlOptions;
pub use opentsdb::OpentsdbOptions;

View File

@@ -1,44 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::readable_size::ReadableSize;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use serde::{Deserialize, Serialize};
use servers::tls::TlsOption;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GrpcOptions {
pub addr: String,
pub runtime_size: usize,
// Max gRPC receiving(decoding) message size
pub max_recv_message_size: ReadableSize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
}
impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4001".to_string(),
runtime_size: 8,
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
tls: TlsOption::default(),
}
}
}

View File

@@ -27,11 +27,13 @@ use std::net::SocketAddr;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::{HealthCheckRequest, HealthCheckResponse};
use async_trait::async_trait;
use common_base::readable_size::ReadableSize;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
};
use common_telemetry::{error, info, warn};
use futures::FutureExt;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot::{self, Receiver, Sender};
@@ -50,6 +52,39 @@ use crate::tls::TlsOption;
type TonicResult<T> = std::result::Result<T, Status>;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct GrpcOptions {
pub addr: String,
pub hostname: String,
/// Max gRPC receiving(decoding) message size
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
pub runtime_size: usize,
#[serde(default = "Default::default")]
pub tls: TlsOption,
}
impl Default for GrpcOptions {
fn default() -> Self {
Self {
addr: "127.0.0.1:4001".to_string(),
hostname: "127.0.0.1".to_string(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
runtime_size: 8,
tls: TlsOption::default(),
}
}
}
impl GrpcOptions {
pub fn with_addr(mut self, addr: &str) -> Self {
self.addr = addr.to_string();
self
}
}
pub struct GrpcServer {
// states
shutdown_tx: Mutex<Option<Sender<()>>>,

View File

@@ -42,7 +42,7 @@ use object_store::test_util::TempFolder;
use object_store::ObjectStore;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::grpc::{GrpcOptions, GrpcServer, GrpcServerConfig};
use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
@@ -347,6 +347,7 @@ pub(crate) fn create_datanode_opts(
providers,
store: default_store,
},
grpc: GrpcOptions::default().with_addr(PEER_PLACEHOLDER_ADDR),
mode,
wal: wal_config,
..Default::default()

View File

@@ -740,9 +740,10 @@ is_strict_mode = false
[grpc]
addr = "127.0.0.1:4001"
runtime_size = 8
hostname = "127.0.0.1"
max_recv_message_size = "512MiB"
max_send_message_size = "512MiB"
runtime_size = 8
[grpc.tls]
mode = "disable"