From 9ab36e9a6f06fcebb0b9dbdd2e6600a22131ebc6 Mon Sep 17 00:00:00 2001 From: liyang Date: Thu, 26 Jun 2025 09:48:53 +0800 Subject: [PATCH] test: add a test load configuration example for flownode (#6397) * test: add a test load configuration example for flownode Signed-off-by: liyang * format rust Signed-off-by: liyang * fix cargo clippy Signed-off-by: liyang * refine FlownodeOptions visibility Signed-off-by: liyang * format rust Signed-off-by: liyang --------- Signed-off-by: liyang --- src/cmd/tests/load_config_test.rs | 49 +++++++++++++++++++++++++++++++ src/flow/src/adapter.rs | 12 +++++++- src/flow/src/lib.rs | 4 ++- 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index e3211aba46..be74da35f9 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -23,6 +23,7 @@ use common_wal::config::raft_engine::RaftEngineConfig; use common_wal::config::DatanodeWalConfig; use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig}; use file_engine::config::EngineConfig as FileEngineConfig; +use flow::FlownodeOptions; use frontend::frontend::FrontendOptions; use meta_client::MetaClientOptions; use meta_srv::metasrv::MetasrvOptions; @@ -195,6 +196,54 @@ fn test_load_metasrv_example_config() { similar_asserts::assert_eq!(options, expected); } +#[test] +fn test_load_flownode_example_config() { + let example_config = common_test_util::find_workspace_path("config/flownode.example.toml"); + let options = + GreptimeOptions::::load_layered_options(example_config.to_str(), "") + .unwrap(); + let expected = GreptimeOptions:: { + component: FlownodeOptions { + node_id: Some(14), + flow: Default::default(), + grpc: GrpcOptions { + bind_addr: "127.0.0.1:6800".to_string(), + server_addr: "127.0.0.1:6800".to_string(), + runtime_size: 2, + ..Default::default() + }, + logging: LoggingOptions { + dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR), + level: Some("info".to_string()), + otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()), + tracing_sample_ratio: Some(Default::default()), + ..Default::default() + }, + tracing: Default::default(), + heartbeat: Default::default(), + query: Default::default(), + meta_client: Some(MetaClientOptions { + metasrv_addrs: vec!["127.0.0.1:3002".to_string()], + timeout: Duration::from_secs(3), + heartbeat_timeout: Duration::from_millis(500), + ddl_timeout: Duration::from_secs(10), + connect_timeout: Duration::from_secs(1), + tcp_nodelay: true, + metadata_cache_max_capacity: 100000, + metadata_cache_ttl: Duration::from_secs(600), + metadata_cache_tti: Duration::from_secs(300), + }), + http: HttpOptions { + addr: "127.0.0.1:4000".to_string(), + ..Default::default() + }, + user_provider: None, + }, + ..Default::default() + }; + similar_asserts::assert_eq!(options, expected); +} + #[test] fn test_load_standalone_example_config() { let example_config = common_test_util::find_workspace_path("config/standalone.example.toml"); diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 6a1697a389..59974ab48f 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -95,7 +95,7 @@ impl Default for FlowConfig { } /// Options for flow node -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct FlownodeOptions { pub node_id: Option, @@ -251,6 +251,10 @@ impl DiffRequest { Self::Delete(v) => v.len(), } } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } } pub fn batches_to_rows_req(batches: Vec) -> Result, Error> { @@ -929,6 +933,12 @@ pub struct FlowTickManager { start_timestamp: repr::Timestamp, } +impl Default for FlowTickManager { + fn default() -> Self { + Self::new() + } +} + impl FlowTickManager { pub fn new() -> Self { FlowTickManager { diff --git a/src/flow/src/lib.rs b/src/flow/src/lib.rs index baf06a9cab..055042a6b5 100644 --- a/src/flow/src/lib.rs +++ b/src/flow/src/lib.rs @@ -43,7 +43,7 @@ mod utils; #[cfg(test)] mod test_utils; -pub use adapter::{FlowConfig, FlowStreamingEngineRef, FlownodeOptions, StreamingEngine}; +pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine}; pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError}; pub use engine::FlowAuthHeader; pub(crate) use engine::{CreateFlowArgs, FlowId, TableName}; @@ -52,3 +52,5 @@ pub use server::{ get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServer, FlownodeServiceBuilder, FrontendInvoker, }; + +pub use crate::adapter::FlownodeOptions;