feat: set global runtime size by config file (#4063)

* set global runtime size

* fix: resolve PR comments

* fix: log the whole option

* fix ci

* debug ci

* debug ci

---------

Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
LFC
2024-06-04 18:03:33 +08:00
committed by GitHub
parent 0a07130931
commit c0aed1d267
28 changed files with 601 additions and 162 deletions

View File

@@ -57,6 +57,7 @@ runs:
greptime/greptimedb-cluster \
--create-namespace \
-n my-greptimedb \
--values ./.github/actions/setup-greptimedb-cluster/values.yaml \
--wait \
--wait-for-jobs
- name: Wait for GreptimeDB

View File

@@ -0,0 +1,18 @@
meta:
config: |-
[runtime]
read_rt_size = 8
write_rt_size = 8
bg_rt_size = 8
datanode:
config: |-
[runtime]
read_rt_size = 8
write_rt_size = 8
bg_rt_size = 8
frontend:
config: |-
[runtime]
read_rt_size = 8
write_rt_size = 8
bg_rt_size = 8

5
Cargo.lock generated
View File

@@ -2080,9 +2080,11 @@ dependencies = [
"common-macro",
"common-telemetry",
"lazy_static",
"num_cpus",
"once_cell",
"paste",
"prometheus",
"serde",
"snafu 0.8.3",
"tokio",
"tokio-metrics",
@@ -11049,8 +11051,7 @@ dependencies = [
[[package]]
name = "tokio-metrics-collector"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d767da47381602cc481653456823b3ebb600e83d5dd4e0293da9b5566c6c00f0"
source = "git+https://github.com/MichaelScofield/tokio-metrics-collector.git?rev=89d692d5753d28564a7aac73c6ac5aba22243ba0#89d692d5753d28564a7aac73c6ac5aba22243ba0"
dependencies = [
"lazy_static",
"parking_lot 0.12.3",

View File

@@ -13,6 +13,10 @@
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `30s` | HTTP request timeout. |
@@ -154,6 +158,10 @@
| --- | -----| ------- | ----------- |
| `mode` | String | `standalone` | The running mode of the datanode. It can be `standalone` or `distributed`. |
| `default_timezone` | String | `None` | The default timezone of the server. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `18s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |
@@ -240,6 +248,10 @@
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. |
| `store_key_prefix` | String | `""` | If it's not empty, the metasrv will store all data with this key prefix. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `12` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
@@ -300,6 +312,10 @@
| `rpc_max_recv_message_size` | String | `512MB` | The maximum receive message size for gRPC server. |
| `rpc_max_send_message_size` | String | `512MB` | The maximum send message size for gRPC server. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.read_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.write_rt_size` | Integer | `8` | The number of threads to execute the runtime for global write operations. |
| `runtime.bg_rt_size` | Integer | `8` | The number of threads to execute the runtime for global background operations. |
| `heartbeat` | -- | -- | The heartbeat options. |
| `heartbeat.interval` | String | `3s` | Interval for sending heartbeat messages to the metasrv. |
| `heartbeat.retry_interval` | String | `3s` | Interval for retrying to send heartbeat messages to the metasrv. |

View File

@@ -32,6 +32,15 @@ rpc_max_send_message_size = "512MB"
## Enable telemetry to collect anonymous usage data.
enable_telemetry = true
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
## The heartbeat options.
[heartbeat]
## Interval for sending heartbeat messages to the metasrv.

View File

@@ -5,6 +5,15 @@ mode = "standalone"
## +toml2docs:none-default
default_timezone = "UTC"
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
## The heartbeat options.
[heartbeat]
## Interval for sending heartbeat messages to the metasrv.

View File

@@ -25,6 +25,15 @@ enable_telemetry = true
## If it's not empty, the metasrv will store all data with this key prefix.
store_key_prefix = ""
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
## Procedure storage options.
[procedure]

View File

@@ -8,6 +8,15 @@ enable_telemetry = true
## +toml2docs:none-default
default_timezone = "UTC"
## The runtime options.
[runtime]
## The number of threads to execute the runtime for global read operations.
read_rt_size = 8
## The number of threads to execute the runtime for global write operations.
write_rt_size = 8
## The number of threads to execute the runtime for global background operations.
bg_rt_size = 8
## The HTTP server options.
[http]
## The address to bind the HTTP server.

View File

@@ -23,7 +23,6 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::service::DatanodeServiceBuilder;
use meta_client::MetaClientOptions;
@@ -34,11 +33,13 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
};
use crate::options::GlobalOptions;
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-datanode";
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
pub struct Instance {
datanode: Datanode,
@@ -97,7 +98,9 @@ impl Command {
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
self.subcmd.load_options(global_options)
match &self.subcmd {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
}
}
@@ -112,12 +115,6 @@ impl SubCommand {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
}
}
#[derive(Debug, Parser, Default)]
@@ -146,22 +143,25 @@ struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
self.merge_with_cli_options(
global_options,
DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
let mut opts = DatanodeOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
Ok(opts)
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: DatanodeOptions,
) -> Result<DatanodeOptions> {
opts: &mut DatanodeOptions,
) -> Result<()> {
let opts = &mut opts.component;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -231,25 +231,28 @@ impl StartCommand {
// Disable dashboard in datanode.
opts.http.disable_dashboard = true;
Ok(opts)
Ok(())
}
async fn build(&self, mut opts: DatanodeOptions) -> Result<Instance> {
async fn build(&self, opts: DatanodeOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
opts.node_id.map(|x| x.to_string()),
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.map(|x| x.to_string()),
);
log_versions(version!(), short_version!());
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let mut opts = opts.component;
let plugins = plugins::setup_datanode_plugins(&mut opts)
.await
.context(StartDatanodeSnafu)?;
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);
let node_id = opts
.node_id
.context(MissingConfigSnafu { msg: "'node_id'" })?;
@@ -353,7 +356,7 @@ mod tests {
..Default::default()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
@@ -414,7 +417,8 @@ mod tests {
fn test_try_from_cmd() {
let opt = StartCommand::default()
.load_options(&GlobalOptions::default())
.unwrap();
.unwrap()
.component;
assert_eq!(Mode::Standalone, opt.mode);
let opt = (StartCommand {
@@ -423,7 +427,8 @@ mod tests {
..Default::default()
})
.load_options(&GlobalOptions::default())
.unwrap();
.unwrap()
.component;
assert_eq!(Mode::Distributed, opt.mode);
assert!((StartCommand {
@@ -454,7 +459,8 @@ mod tests {
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
.unwrap()
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
@@ -536,7 +542,7 @@ mod tests {
..Default::default()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
@@ -562,7 +568,10 @@ mod tests {
assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");
// Should be default value.
assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr);
assert_eq!(
opts.http.addr,
DatanodeOptions::default().component.http.addr
);
},
);
}

View File

@@ -29,7 +29,6 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
@@ -44,9 +43,11 @@ use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
self, InitTimezoneSnafu, LoadLayeredConfigSnafu, MissingConfigSnafu, Result, StartFrontendSnafu,
};
use crate::options::GlobalOptions;
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
pub struct Instance {
frontend: FeInstance,
@@ -164,22 +165,25 @@ pub struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
self.merge_with_cli_options(
global_options,
FrontendOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
let mut opts = FrontendOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
Ok(opts)
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: FrontendOptions,
) -> Result<FrontendOptions> {
opts: &mut FrontendOptions,
) -> Result<()> {
let opts = &mut opts.component;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -242,26 +246,29 @@ impl StartCommand {
opts.user_provider.clone_from(&self.user_provider);
Ok(opts)
Ok(())
}
async fn build(&self, mut opts: FrontendOptions) -> Result<Instance> {
async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.logging,
&opts.tracing,
opts.node_id.clone(),
&opts.component.logging,
&opts.component.tracing,
opts.component.node_id.clone(),
);
log_versions(version!(), short_version!());
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
let mut opts = opts.component;
#[allow(clippy::unnecessary_mut_passed)]
let plugins = plugins::setup_frontend_plugins(&mut opts)
.await
.context(StartFrontendSnafu)?;
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);
set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
@@ -380,14 +387,14 @@ mod tests {
..Default::default()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
let opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!(opts.http.addr, "127.0.0.1:1234");
assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
let default_opts = FrontendOptions::default();
let default_opts = FrontendOptions::default().component;
assert_eq!(opts.grpc.addr, default_opts.grpc.addr);
assert!(opts.mysql.enable);
@@ -428,7 +435,8 @@ mod tests {
..Default::default()
};
let fe_opts = command.load_options(&GlobalOptions::default()).unwrap();
let fe_opts = command.load_options(&Default::default()).unwrap().component;
assert_eq!(Mode::Distributed, fe_opts.mode);
assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
assert_eq!(Duration::from_secs(30), fe_opts.http.timeout);
@@ -442,7 +450,7 @@ mod tests {
#[tokio::test]
async fn test_try_from_start_command_to_anymap() {
let mut fe_opts = FrontendOptions {
let mut fe_opts = frontend::frontend::FrontendOptions {
http: HttpOptions {
disable_dashboard: false,
..Default::default()
@@ -479,7 +487,8 @@ mod tests {
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
.unwrap()
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
@@ -557,7 +566,7 @@ mod tests {
..Default::default()
};
let fe_opts = command.load_options(&GlobalOptions::default()).unwrap();
let fe_opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(fe_opts.mysql.runtime_size, 11);

View File

@@ -21,14 +21,15 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
use crate::options::GlobalOptions;
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
pub const APP_NAME: &str = "greptime-metasrv";
pub struct Instance {
@@ -139,22 +140,25 @@ struct StartCommand {
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
self.merge_with_cli_options(
global_options,
MetasrvOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
let mut opts = MetasrvOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts)?;
Ok(opts)
}
// The precedence order is: cli > config file > environment variables > default values.
fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: MetasrvOptions,
) -> Result<MetasrvOptions> {
opts: &mut MetasrvOptions,
) -> Result<()> {
let opts = &mut opts.component;
if let Some(dir) = &global_options.log_dir {
opts.logging.dir.clone_from(dir);
}
@@ -217,21 +221,28 @@ impl StartCommand {
// Disable dashboard in metasrv.
opts.http.disable_dashboard = true;
Ok(opts)
Ok(())
}
async fn build(&self, mut opts: MetasrvOptions) -> Result<Instance> {
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
log_versions(version!(), short_version!());
async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let plugins = plugins::setup_metasrv_plugins(&mut opts)
.await
.context(StartMetaServerSnafu)?;
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.component.logging,
&opts.component.tracing,
None,
);
log_versions(version!(), short_version!());
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);
let mut opts = opts.component;
let plugins = plugins::setup_metasrv_plugins(&mut opts)
.await
.context(StartMetaServerSnafu)?;
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
.context(error::BuildMetaServerSnafu)?;
@@ -266,7 +277,7 @@ mod tests {
..Default::default()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
assert_eq!(SelectorType::LoadBased, options.selector);
@@ -299,7 +310,7 @@ mod tests {
..Default::default()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
let options = cmd.load_options(&Default::default()).unwrap().component;
assert_eq!("127.0.0.1:3002".to_string(), options.bind_addr);
assert_eq!("127.0.0.1:3002".to_string(), options.server_addr);
assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
@@ -349,7 +360,8 @@ mod tests {
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
.unwrap()
.component;
let logging_opt = options.logging;
assert_eq!("/tmp/greptimedb/test/logs", logging_opt.dir);
@@ -406,7 +418,7 @@ mod tests {
..Default::default()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(opts.bind_addr, "127.0.0.1:14002");

View File

@@ -13,6 +13,9 @@
// limitations under the License.
use clap::Parser;
use common_config::Configurable;
use common_runtime::global::RuntimeOptions;
use serde::{Deserialize, Serialize};
#[derive(Parser, Default, Debug, Clone)]
pub struct GlobalOptions {
@@ -29,3 +32,22 @@ pub struct GlobalOptions {
#[arg(global = true)]
pub tokio_console_addr: Option<String>,
}
// TODO(LFC): Move logging and tracing options into global options, like the runtime options.
/// All the options of GreptimeDB.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct GreptimeOptions<T> {
/// The runtime options.
pub runtime: RuntimeOptions,
/// The options of each component (like Datanode or Standalone) of GreptimeDB.
#[serde(flatten)]
pub component: T,
}
impl<T: Configurable> Configurable for GreptimeOptions<T> {
fn env_list_keys() -> Option<&'static [&'static str]> {
T::env_list_keys()
}
}

View File

@@ -67,7 +67,7 @@ use crate::error::{
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::GlobalOptions;
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{log_versions, App};
pub const APP_NAME: &str = "greptime-standalone";
@@ -79,11 +79,14 @@ pub struct Command {
}
impl Command {
pub async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
self.subcmd.build(opts).await
}
pub fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
pub fn load_options(
&self,
global_options: &GlobalOptions,
) -> Result<GreptimeOptions<StandaloneOptions>> {
self.subcmd.load_options(global_options)
}
}
@@ -94,20 +97,23 @@ enum SubCommand {
}
impl SubCommand {
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
match self {
SubCommand::Start(cmd) => cmd.build(opts).await,
}
}
fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
fn load_options(
&self,
global_options: &GlobalOptions,
) -> Result<GreptimeOptions<StandaloneOptions>> {
match self {
SubCommand::Start(cmd) => cmd.load_options(global_options),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
pub mode: Mode,
@@ -161,7 +167,7 @@ impl Default for StandaloneOptions {
}
}
impl Configurable<'_> for StandaloneOptions {
impl Configurable for StandaloneOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}
@@ -291,23 +297,27 @@ pub struct StartCommand {
}
impl StartCommand {
fn load_options(&self, global_options: &GlobalOptions) -> Result<StandaloneOptions> {
self.merge_with_cli_options(
global_options,
StandaloneOptions::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?,
fn load_options(
&self,
global_options: &GlobalOptions,
) -> Result<GreptimeOptions<StandaloneOptions>> {
let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
self.config_file.as_deref(),
self.env_prefix.as_ref(),
)
.context(LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?;
Ok(opts)
}
// The precedence order is: cli > config file > environment variables > default values.
pub fn merge_with_cli_options(
&self,
global_options: &GlobalOptions,
mut opts: StandaloneOptions,
) -> Result<StandaloneOptions> {
opts: &mut StandaloneOptions,
) -> Result<()> {
// Should always be standalone mode.
opts.mode = Mode::Standalone;
@@ -369,20 +379,27 @@ impl StartCommand {
opts.user_provider.clone_from(&self.user_provider);
Ok(opts)
Ok(())
}
#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(clippy::diverging_sub_expression)]
async fn build(&self, opts: StandaloneOptions) -> Result<Instance> {
let guard =
common_telemetry::init_global_logging(APP_NAME, &opts.logging, &opts.tracing, None);
async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
common_runtime::init_global_runtimes(&opts.runtime);
let guard = common_telemetry::init_global_logging(
APP_NAME,
&opts.component.logging,
&opts.component.tracing,
None,
);
log_versions(version!(), short_version!());
info!("Standalone start command: {:#?}", self);
info!("Building standalone instance with {opts:#?}");
info!("Standalone options: {opts:#?}");
let opts = opts.component;
let mut fe_opts = opts.frontend_options();
#[allow(clippy::unnecessary_mut_passed)]
let fe_plugins = plugins::setup_frontend_plugins(&mut fe_opts) // mut ref is MUST, DO NOT change it
@@ -664,7 +681,10 @@ mod tests {
..Default::default()
};
let options = cmd.load_options(&GlobalOptions::default()).unwrap();
let options = cmd
.load_options(&GlobalOptions::default())
.unwrap()
.component;
let fe_opts = options.frontend_options();
let dn_opts = options.datanode_options();
let logging_opts = options.logging;
@@ -725,7 +745,8 @@ mod tests {
#[cfg(feature = "tokio-console")]
tokio_console_addr: None,
})
.unwrap();
.unwrap()
.component;
assert_eq!("/tmp/greptimedb/test/logs", opts.logging.dir);
assert_eq!("debug", opts.logging.level.unwrap());
@@ -787,7 +808,7 @@ mod tests {
..Default::default()
};
let opts = command.load_options(&GlobalOptions::default()).unwrap();
let opts = command.load_options(&Default::default()).unwrap().component;
// Should be read from env, env > default values.
assert_eq!(opts.logging.dir, "/other/log/dir");

View File

@@ -0,0 +1,218 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use cmd::options::GreptimeOptions;
use cmd::standalone::StandaloneOptions;
use common_config::Configurable;
use common_runtime::global::RuntimeOptions;
use common_telemetry::logging::LoggingOptions;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::{DatanodeWalConfig, StandaloneWalConfig};
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use frontend::frontend::FrontendOptions;
use frontend::service_config::datanode::DatanodeClientOptions;
use meta_client::MetaClientOptions;
use meta_srv::metasrv::MetasrvOptions;
use meta_srv::selector::SelectorType;
use mito2::config::MitoConfig;
use servers::export_metrics::ExportMetricsOption;
#[test]
fn test_load_datanode_example_config() {
let example_config = common_test_util::find_workspace_path("config/datanode.example.toml");
let options =
GreptimeOptions::<DatanodeOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<DatanodeOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
},
component: DatanodeOptions {
node_id: Some(42),
rpc_hostname: Some("127.0.0.1".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
metadata_cache_max_capacity: 100000,
metadata_cache_ttl: Duration::from_secs(600),
metadata_cache_tti: Duration::from_secs(300),
}),
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()
}),
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
..Default::default()
})],
logging: LoggingOptions {
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
..Default::default()
},
};
assert_eq!(options, expected);
}
#[test]
fn test_load_frontend_example_config() {
let example_config = common_test_util::find_workspace_path("config/frontend.example.toml");
let options =
GreptimeOptions::<FrontendOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<FrontendOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
},
component: FrontendOptions {
default_timezone: Some("UTC".to_string()),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
metadata_cache_max_capacity: 100000,
metadata_cache_ttl: Duration::from_secs(600),
metadata_cache_tti: Duration::from_secs(300),
}),
logging: LoggingOptions {
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
datanode: frontend::service_config::DatanodeOptions {
client: DatanodeClientOptions {
connect_timeout: Duration::from_secs(10),
tcp_nodelay: true,
},
},
export_metrics: ExportMetricsOption {
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
..Default::default()
},
};
assert_eq!(options, expected);
}
#[test]
fn test_load_metasrv_example_config() {
let example_config = common_test_util::find_workspace_path("config/metasrv.example.toml");
let options =
GreptimeOptions::<MetasrvOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<MetasrvOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
},
component: MetasrvOptions {
selector: SelectorType::LeaseBased,
data_home: "/tmp/metasrv/".to_string(),
logging: LoggingOptions {
dir: "/tmp/greptimedb/logs".to_string(),
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
..Default::default()
},
};
assert_eq!(options, expected);
}
#[test]
fn test_load_standalone_example_config() {
let example_config = common_test_util::find_workspace_path("config/standalone.example.toml");
let options =
GreptimeOptions::<StandaloneOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<StandaloneOptions> {
runtime: RuntimeOptions {
read_rt_size: 8,
write_rt_size: 8,
bg_rt_size: 8,
},
component: StandaloneOptions {
default_timezone: Some("UTC".to_string()),
wal: StandaloneWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
..Default::default()
}),
region_engine: vec![RegionEngineConfig::Mito(MitoConfig {
num_workers: 8,
auto_flush_interval: Duration::from_secs(3600),
scan_parallelism: 0,
..Default::default()
})],
storage: StorageConfig {
data_home: "/tmp/greptimedb/".to_string(),
..Default::default()
},
logging: LoggingOptions {
level: Some("info".to_string()),
otlp_endpoint: Some("".to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
export_metrics: ExportMetricsOption {
self_import: Some(Default::default()),
remote_write: Some(Default::default()),
..Default::default()
},
..Default::default()
},
};
assert_eq!(options, expected);
}

View File

@@ -13,7 +13,8 @@
// limitations under the License.
use config::{Environment, File, FileFormat};
use serde::{Deserialize, Serialize};
use serde::de::DeserializeOwned;
use serde::Serialize;
use snafu::ResultExt;
use crate::error::{LoadLayeredConfigSnafu, Result, SerdeJsonSnafu, TomlFormatSnafu};
@@ -25,7 +26,7 @@ pub const ENV_VAR_SEP: &str = "__";
pub const ENV_LIST_SEP: &str = ",";
/// Configuration trait defines the common interface for configuration that can be loaded from multiple sources and serialized to TOML.
pub trait Configurable<'de>: Serialize + Deserialize<'de> + Default + Sized {
pub trait Configurable: Serialize + DeserializeOwned + Default + Sized {
/// Load the configuration from multiple sources and merge them.
/// The precedence order is: config file > environment variables > default values.
/// `env_prefix` is the prefix of environment variables, e.g. "FRONTEND__xxx".
@@ -128,7 +129,7 @@ mod tests {
}
}
impl Configurable<'_> for TestDatanodeConfig {
impl Configurable for TestDatanodeConfig {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
}

View File

@@ -13,13 +13,15 @@ common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
paste.workspace = true
prometheus.workspace = true
serde.workspace = true
snafu.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = "0.2"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }
tokio-util.workspace = true
[dev-dependencies]

View File

@@ -19,6 +19,7 @@ use std::sync::{Mutex, Once};
use common_telemetry::info;
use once_cell::sync::Lazy;
use paste::paste;
use serde::{Deserialize, Serialize};
use crate::{Builder, JoinHandle, Runtime};
@@ -26,6 +27,28 @@ const READ_WORKERS: usize = 8;
const WRITE_WORKERS: usize = 8;
const BG_WORKERS: usize = 8;
/// The options for the global runtimes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RuntimeOptions {
/// The number of threads to execute the runtime for global read operations.
pub read_rt_size: usize,
/// The number of threads to execute the runtime for global write operations.
pub write_rt_size: usize,
/// The number of threads to execute the runtime for global background operations.
pub bg_rt_size: usize,
}
impl Default for RuntimeOptions {
fn default() -> Self {
let cpus = num_cpus::get();
Self {
read_rt_size: cpus,
write_rt_size: cpus,
bg_rt_size: cpus,
}
}
}
pub fn create_runtime(runtime_name: &str, thread_name: &str, worker_threads: usize) -> Runtime {
info!("Creating runtime with runtime_name: {runtime_name}, thread_name: {thread_name}, work_threads: {worker_threads}.");
Builder::default()
@@ -112,18 +135,26 @@ static CONFIG_RUNTIMES: Lazy<Mutex<ConfigRuntimes>> =
/// # Panics
/// Panics when the global runtimes are already initialized.
/// You should call this function before using any runtime functions.
pub fn init_global_runtimes(
read: Option<Runtime>,
write: Option<Runtime>,
background: Option<Runtime>,
) {
pub fn init_global_runtimes(options: &RuntimeOptions) {
static START: Once = Once::new();
START.call_once(move || {
let mut c = CONFIG_RUNTIMES.lock().unwrap();
assert!(!c.already_init, "Global runtimes already initialized");
c.read_runtime = read;
c.write_runtime = write;
c.bg_runtime = background;
c.read_runtime = Some(create_runtime(
"global-read",
"global-read-worker",
options.read_rt_size,
));
c.write_runtime = Some(create_runtime(
"global-write",
"global-write-worker",
options.write_rt_size,
));
c.bg_runtime = Some(create_runtime(
"global-bg",
"global-bg-worker",
options.bg_rt_size,
));
});
}

View File

@@ -13,7 +13,7 @@
// limitations under the License.
pub mod error;
mod global;
pub mod global;
mod metrics;
mod repeated_task;
pub mod runtime;

View File

@@ -8,7 +8,7 @@ license.workspace = true
workspace = true
[dependencies]
client.workspace = true
client = { workspace = true, features = ["testing"] }
common-query.workspace = true
common-recordbatch.workspace = true
once_cell.workspace = true

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use client::Database;
use common_query::OutputData;
use common_recordbatch::util;
@@ -29,3 +30,25 @@ pub async fn check_output_stream(output: OutputData, expected: &str) {
let pretty_print = recordbatches.pretty_print().unwrap();
assert_eq!(pretty_print, expected, "actual: \n{}", pretty_print);
}
pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
let output = db.sql(sql).await.unwrap();
let output = output.data;
match (&output, expected) {
(OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
assert_eq!(
*x, y,
r#"
expected: {y}
actual: {x}
"#
)
}
(OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
check_output_stream(output, x).await
}
_ => panic!(),
}
}

View File

@@ -15,7 +15,7 @@
//! Datanode configurations
use common_base::readable_size::ReadableSize;
use common_base::secrets::SecretString;
use common_base::secrets::{ExposeSecret, SecretString};
use common_config::Configurable;
use common_grpc::channel_manager::{
DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE, DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
@@ -38,7 +38,7 @@ pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::mb(256);
const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb";
/// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
File(FileConfig),
@@ -61,7 +61,7 @@ impl ObjectStoreConfig {
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StorageConfig {
/// The working directory of database
@@ -85,7 +85,7 @@ impl Default for StorageConfig {
#[serde(default)]
pub struct FileConfig {}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
/// The local file cache directory
@@ -109,6 +109,18 @@ pub struct S3Config {
pub cache: ObjectStorageCacheConfig,
}
impl PartialEq for S3Config {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.region == other.region
&& self.cache == other.cache
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
@@ -123,6 +135,17 @@ pub struct OssConfig {
pub cache: ObjectStorageCacheConfig,
}
impl PartialEq for OssConfig {
fn eq(&self, other: &Self) -> bool {
self.bucket == other.bucket
&& self.root == other.root
&& self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
&& self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
@@ -138,6 +161,18 @@ pub struct AzblobConfig {
pub cache: ObjectStorageCacheConfig,
}
impl PartialEq for AzblobConfig {
fn eq(&self, other: &Self) -> bool {
self.container == other.container
&& self.root == other.root
&& self.account_name.expose_secret() == other.account_name.expose_secret()
&& self.account_key.expose_secret() == other.account_key.expose_secret()
&& self.endpoint == other.endpoint
&& self.sas_token == other.sas_token
&& self.cache == other.cache
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
@@ -151,6 +186,17 @@ pub struct GcsConfig {
pub cache: ObjectStorageCacheConfig,
}
impl PartialEq for GcsConfig {
fn eq(&self, other: &Self) -> bool {
self.root == other.root
&& self.bucket == other.bucket
&& self.scope == other.scope
&& self.credential_path.expose_secret() == other.credential_path.expose_secret()
&& self.endpoint == other.endpoint
&& self.cache == other.cache
}
}
impl Default for S3Config {
fn default() -> Self {
Self {
@@ -211,7 +257,7 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
@@ -267,7 +313,7 @@ impl Default for DatanodeOptions {
}
}
impl Configurable<'_> for DatanodeOptions {
impl Configurable for DatanodeOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
}

View File

@@ -74,7 +74,7 @@ impl Default for FrontendOptions {
}
}
impl Configurable<'_> for FrontendOptions {
impl Configurable for FrontendOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
}

View File

@@ -188,7 +188,7 @@ impl Instance {
pub fn build_servers(
&mut self,
opts: impl Into<FrontendOptions> + for<'de> Configurable<'de>,
opts: impl Into<FrontendOptions> + Configurable,
servers: ServerHandlers,
) -> Result<()> {
let opts: FrontendOptions = opts.into();

View File

@@ -39,7 +39,7 @@ use crate::service_config::GrpcOptions;
pub struct Services<T, U>
where
T: Into<FrontendOptions> + for<'de> Configurable<'de> + Clone,
T: Into<FrontendOptions> + Configurable + Clone,
U: FrontendInstance,
{
opts: T,
@@ -51,7 +51,7 @@ where
impl<T, U> Services<T, U>
where
T: Into<FrontendOptions> + for<'de> Configurable<'de> + Clone,
T: Into<FrontendOptions> + Configurable + Clone,
U: FrontendInstance,
{
pub fn new(opts: T, instance: Arc<U>, plugins: Plugins) -> Self {

View File

@@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct DatanodeOptions {
client: DatanodeClientOptions,
pub client: DatanodeClientOptions,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -148,7 +148,7 @@ impl Default for MetasrvOptions {
}
}
impl Configurable<'_> for MetasrvOptions {
impl Configurable for MetasrvOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["wal.broker_endpoints"])
}

View File

@@ -21,16 +21,13 @@ use std::time::Duration;
use auth::UserProviderRef;
use axum::Router;
use catalog::kvbackend::KvBackendCatalogManager;
use client::Database;
use common_base::secrets::ExposeSecret;
use common_config::Configurable;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_query::OutputData;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::warn;
use common_test_util::ports;
use common_test_util::recordbatch::{check_output_stream, ExpectedOutput};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{
@@ -690,25 +687,3 @@ where
test(endpoints).await
}
pub async fn execute_and_check_output(db: &Database, sql: &str, expected: ExpectedOutput<'_>) {
let output = db.sql(sql).await.unwrap();
let output = output.data;
match (&output, expected) {
(OutputData::AffectedRows(x), ExpectedOutput::AffectedRows(y)) => {
assert_eq!(
*x, y,
r#"
expected: {y}
actual: {x}
"#
)
}
(OutputData::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (OutputData::Stream(_), ExpectedOutput::QueryResult(x)) => {
check_output_stream(output, x).await
}
_ => panic!(),
}
}

View File

@@ -28,6 +28,7 @@ use common_grpc::channel_manager::ClientTlsOption;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_runtime::Runtime;
use common_test_util::find_workspace_path;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::GrpcServerConfig;
use servers::http::prometheus::{
@@ -732,10 +733,7 @@ async fn to_batch(output: Output) -> String {
}
pub async fn test_grpc_tls_config(store_type: StorageType) {
let comm_dir = std::path::PathBuf::from_iter([
std::env!("CARGO_RUSTC_CURRENT_DIR"),
"src/common/grpc/tests/tls",
]);
let comm_dir = find_workspace_path("/src/common/grpc/tests/tls");
let ca_path = comm_dir.join("ca.pem").to_str().unwrap().to_string();
let server_cert_path = comm_dir.join("server.pem").to_str().unwrap().to_string();
let server_key_path = comm_dir.join("server.key").to_str().unwrap().to_string();