test: add a test load configuration example for flownode (#6397)

* test: add a test load configuration example for flownode

Signed-off-by: liyang <daviderli614@gmail.com>

* format rust

Signed-off-by: liyang <daviderli614@gmail.com>

* fix cargo clippy

Signed-off-by: liyang <daviderli614@gmail.com>

* refine FlownodeOptions visibility

Signed-off-by: liyang <daviderli614@gmail.com>

* format rust

Signed-off-by: liyang <daviderli614@gmail.com>

---------

Signed-off-by: liyang <daviderli614@gmail.com>
This commit is contained in:
liyang
2025-06-26 09:48:53 +08:00
committed by GitHub
parent 4bb5d00a4b
commit 9ab36e9a6f
3 changed files with 63 additions and 2 deletions

View File

@@ -23,6 +23,7 @@ use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::FlownodeOptions;
use frontend::frontend::FrontendOptions;
use meta_client::MetaClientOptions;
use meta_srv::metasrv::MetasrvOptions;
@@ -195,6 +196,54 @@ fn test_load_metasrv_example_config() {
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_flownode_example_config() {
let example_config = common_test_util::find_workspace_path("config/flownode.example.toml");
let options =
GreptimeOptions::<FlownodeOptions>::load_layered_options(example_config.to_str(), "")
.unwrap();
let expected = GreptimeOptions::<FlownodeOptions> {
component: FlownodeOptions {
node_id: Some(14),
flow: Default::default(),
grpc: GrpcOptions {
bind_addr: "127.0.0.1:6800".to_string(),
server_addr: "127.0.0.1:6800".to_string(),
runtime_size: 2,
..Default::default()
},
logging: LoggingOptions {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
query: Default::default(),
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),
heartbeat_timeout: Duration::from_millis(500),
ddl_timeout: Duration::from_secs(10),
connect_timeout: Duration::from_secs(1),
tcp_nodelay: true,
metadata_cache_max_capacity: 100000,
metadata_cache_ttl: Duration::from_secs(600),
metadata_cache_tti: Duration::from_secs(300),
}),
http: HttpOptions {
addr: "127.0.0.1:4000".to_string(),
..Default::default()
},
user_provider: None,
},
..Default::default()
};
similar_asserts::assert_eq!(options, expected);
}
#[test]
fn test_load_standalone_example_config() {
let example_config = common_test_util::find_workspace_path("config/standalone.example.toml");

View File

@@ -95,7 +95,7 @@ impl Default for FlowConfig {
}
/// Options for flow node
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct FlownodeOptions {
pub node_id: Option<u64>,
@@ -251,6 +251,10 @@ impl DiffRequest {
Self::Delete(v) => v.len(),
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
@@ -929,6 +933,12 @@ pub struct FlowTickManager {
start_timestamp: repr::Timestamp,
}
impl Default for FlowTickManager {
fn default() -> Self {
Self::new()
}
}
impl FlowTickManager {
pub fn new() -> Self {
FlowTickManager {

View File

@@ -43,7 +43,7 @@ mod utils;
#[cfg(test)]
mod test_utils;
pub use adapter::{FlowConfig, FlowStreamingEngineRef, FlownodeOptions, StreamingEngine};
pub use adapter::{FlowConfig, FlowStreamingEngineRef, StreamingEngine};
pub use batching_mode::frontend_client::{FrontendClient, GrpcQueryHandlerWithBoxedError};
pub use engine::FlowAuthHeader;
pub(crate) use engine::{CreateFlowArgs, FlowId, TableName};
@@ -52,3 +52,5 @@ pub use server::{
get_flow_auth_options, FlownodeBuilder, FlownodeInstance, FlownodeServer,
FlownodeServiceBuilder, FrontendInvoker,
};
pub use crate::adapter::FlownodeOptions;