diff --git a/config/datanode.example.toml b/config/datanode.example.toml index aa4b47d187..16063edee0 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,5 +1,4 @@ node_id = 42 -http_addr = '0.0.0.0:3000' rpc_addr = '0.0.0.0:3001' wal_dir = '/tmp/greptimedb/wal' rpc_runtime_size = 8 @@ -7,10 +6,6 @@ mode = "standalone" mysql_addr = '0.0.0.0:3306' mysql_runtime_size = 4 -# applied when postgres feature enbaled -postgres_addr = '0.0.0.0:5432' -postgres_runtime_size = 4 - [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 43280a895a..851282896e 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -37,14 +37,10 @@ struct StartCommand { #[clap(long)] node_id: Option, #[clap(long)] - http_addr: Option, - #[clap(long)] rpc_addr: Option, #[clap(long)] mysql_addr: Option, #[clap(long)] - postgres_addr: Option, - #[clap(long)] metasrv_addr: Option, #[clap(short, long)] config_file: Option, @@ -76,18 +72,12 @@ impl TryFrom for DatanodeOptions { DatanodeOptions::default() }; - if let Some(addr) = cmd.http_addr { - opts.http_addr = addr; - } if let Some(addr) = cmd.rpc_addr { opts.rpc_addr = addr; } if let Some(addr) = cmd.mysql_addr { opts.mysql_addr = addr; } - if let Some(addr) = cmd.postgres_addr { - opts.postgres_addr = addr; - } match (cmd.metasrv_addr, cmd.node_id) { (Some(meta_addr), Some(node_id)) => { @@ -129,10 +119,8 @@ mod tests { fn test_read_from_config_file() { let cmd = StartCommand { node_id: None, - http_addr: None, rpc_addr: None, mysql_addr: None, - postgres_addr: None, metasrv_addr: None, config_file: Some(format!( "{}/../../config/datanode.example.toml", @@ -140,7 +128,6 @@ mod tests { )), }; let options: DatanodeOptions = cmd.try_into().unwrap(); - assert_eq!("0.0.0.0:3000".to_string(), options.http_addr); assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr); assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir); assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr); @@ -153,9 +140,6 @@ mod tests { assert_eq!(3000, options.meta_client_opts.timeout_millis); assert!(options.meta_client_opts.tcp_nodelay); - assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr); - assert_eq!(4, options.postgres_runtime_size); - match options.storage { ObjectStoreConfig::File { data_dir } => { assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir) @@ -169,10 +153,8 @@ mod tests { Mode::Standalone, DatanodeOptions::try_from(StartCommand { node_id: None, - http_addr: None, rpc_addr: None, mysql_addr: None, - postgres_addr: None, metasrv_addr: None, config_file: None }) @@ -184,10 +166,8 @@ mod tests { Mode::Distributed, DatanodeOptions::try_from(StartCommand { node_id: Some(42), - http_addr: None, rpc_addr: None, mysql_addr: None, - postgres_addr: None, metasrv_addr: Some("127.0.0.1:3002".to_string()), config_file: None }) @@ -197,20 +177,16 @@ mod tests { assert!(DatanodeOptions::try_from(StartCommand { node_id: None, - http_addr: None, rpc_addr: None, mysql_addr: None, - postgres_addr: None, metasrv_addr: Some("127.0.0.1:3002".to_string()), config_file: None, }) .is_err()); assert!(DatanodeOptions::try_from(StartCommand { node_id: Some(42), - http_addr: None, rpc_addr: None, mysql_addr: None, - postgres_addr: None, metasrv_addr: None, config_file: None, }) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 83f4fbcbd2..59ceb64234 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -26,13 +26,10 @@ impl Default for ObjectStoreConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DatanodeOptions { pub node_id: u64, - pub http_addr: String, pub rpc_addr: String, pub rpc_runtime_size: usize, pub mysql_addr: String, pub mysql_runtime_size: usize, - pub postgres_addr: String, - pub postgres_runtime_size: usize, pub meta_client_opts: MetaClientOpts, pub wal_dir: String, pub storage: ObjectStoreConfig, @@ -43,13 +40,10 @@ impl Default for DatanodeOptions { fn default() -> Self { Self { node_id: 0, - http_addr: "0.0.0.0:3000".to_string(), rpc_addr: "0.0.0.0:3001".to_string(), rpc_runtime_size: 8, mysql_addr: "0.0.0.0:3306".to_string(), mysql_runtime_size: 2, - postgres_addr: "0.0.0.0:5432".to_string(), - postgres_runtime_size: 2, meta_client_opts: MetaClientOpts::default(), wal_dir: "/tmp/greptimedb/wal".to_string(), storage: ObjectStoreConfig::default(), diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index e15811c728..caeb1c936f 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -4,8 +4,10 @@ use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; use servers::grpc::GrpcServer; +use servers::mysql::server::MysqlServer; use servers::server::Server; use snafu::ResultExt; +use tokio::try_join; use crate::datanode::DatanodeOptions; use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu}; @@ -16,6 +18,7 @@ pub mod grpc; /// All rpc services. pub struct Services { grpc_server: GrpcServer, + mysql_server: Box, } impl Services { @@ -28,8 +31,17 @@ impl Services { .context(RuntimeResourceSnafu)?, ); + let mysql_io_runtime = Arc::new( + RuntimeBuilder::default() + .worker_threads(opts.mysql_runtime_size as usize) + .thread_name("mysql-io-handlers") + .build() + .context(RuntimeResourceSnafu)?, + ); + Ok(Self { - grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime), + grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime), + mysql_server: MysqlServer::create_server(instance, mysql_io_runtime), }) } @@ -37,10 +49,15 @@ impl Services { let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu { addr: &opts.rpc_addr, })?; - self.grpc_server - .start(grpc_addr) - .await - .context(StartServerSnafu)?; + let mysql_addr = &opts.mysql_addr; + let mysql_addr: SocketAddr = mysql_addr + .parse() + .context(ParseAddrSnafu { addr: mysql_addr })?; + try_join!( + self.grpc_server.start(grpc_addr), + self.mysql_server.start(mysql_addr), + ) + .context(StartServerSnafu)?; Ok(()) } }