mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: add mysql protocol handler back to datanode for debugging (#479)
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
node_id = 42
|
||||
http_addr = '0.0.0.0:3000'
|
||||
rpc_addr = '0.0.0.0:3001'
|
||||
wal_dir = '/tmp/greptimedb/wal'
|
||||
rpc_runtime_size = 8
|
||||
@@ -7,10 +6,6 @@ mode = "standalone"
|
||||
mysql_addr = '0.0.0.0:3306'
|
||||
mysql_runtime_size = 4
|
||||
|
||||
# applied when postgres feature enbaled
|
||||
postgres_addr = '0.0.0.0:5432'
|
||||
postgres_runtime_size = 4
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
data_dir = '/tmp/greptimedb/data/'
|
||||
|
||||
@@ -37,14 +37,10 @@ struct StartCommand {
|
||||
#[clap(long)]
|
||||
node_id: Option<u64>,
|
||||
#[clap(long)]
|
||||
http_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
rpc_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
mysql_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
postgres_addr: Option<String>,
|
||||
#[clap(long)]
|
||||
metasrv_addr: Option<String>,
|
||||
#[clap(short, long)]
|
||||
config_file: Option<String>,
|
||||
@@ -76,18 +72,12 @@ impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
DatanodeOptions::default()
|
||||
};
|
||||
|
||||
if let Some(addr) = cmd.http_addr {
|
||||
opts.http_addr = addr;
|
||||
}
|
||||
if let Some(addr) = cmd.rpc_addr {
|
||||
opts.rpc_addr = addr;
|
||||
}
|
||||
if let Some(addr) = cmd.mysql_addr {
|
||||
opts.mysql_addr = addr;
|
||||
}
|
||||
if let Some(addr) = cmd.postgres_addr {
|
||||
opts.postgres_addr = addr;
|
||||
}
|
||||
|
||||
match (cmd.metasrv_addr, cmd.node_id) {
|
||||
(Some(meta_addr), Some(node_id)) => {
|
||||
@@ -129,10 +119,8 @@ mod tests {
|
||||
fn test_read_from_config_file() {
|
||||
let cmd = StartCommand {
|
||||
node_id: None,
|
||||
http_addr: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
postgres_addr: None,
|
||||
metasrv_addr: None,
|
||||
config_file: Some(format!(
|
||||
"{}/../../config/datanode.example.toml",
|
||||
@@ -140,7 +128,6 @@ mod tests {
|
||||
)),
|
||||
};
|
||||
let options: DatanodeOptions = cmd.try_into().unwrap();
|
||||
assert_eq!("0.0.0.0:3000".to_string(), options.http_addr);
|
||||
assert_eq!("0.0.0.0:3001".to_string(), options.rpc_addr);
|
||||
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
|
||||
assert_eq!("0.0.0.0:3306".to_string(), options.mysql_addr);
|
||||
@@ -153,9 +140,6 @@ mod tests {
|
||||
assert_eq!(3000, options.meta_client_opts.timeout_millis);
|
||||
assert!(options.meta_client_opts.tcp_nodelay);
|
||||
|
||||
assert_eq!("0.0.0.0:5432".to_string(), options.postgres_addr);
|
||||
assert_eq!(4, options.postgres_runtime_size);
|
||||
|
||||
match options.storage {
|
||||
ObjectStoreConfig::File { data_dir } => {
|
||||
assert_eq!("/tmp/greptimedb/data/".to_string(), data_dir)
|
||||
@@ -169,10 +153,8 @@ mod tests {
|
||||
Mode::Standalone,
|
||||
DatanodeOptions::try_from(StartCommand {
|
||||
node_id: None,
|
||||
http_addr: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
postgres_addr: None,
|
||||
metasrv_addr: None,
|
||||
config_file: None
|
||||
})
|
||||
@@ -184,10 +166,8 @@ mod tests {
|
||||
Mode::Distributed,
|
||||
DatanodeOptions::try_from(StartCommand {
|
||||
node_id: Some(42),
|
||||
http_addr: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
postgres_addr: None,
|
||||
metasrv_addr: Some("127.0.0.1:3002".to_string()),
|
||||
config_file: None
|
||||
})
|
||||
@@ -197,20 +177,16 @@ mod tests {
|
||||
|
||||
assert!(DatanodeOptions::try_from(StartCommand {
|
||||
node_id: None,
|
||||
http_addr: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
postgres_addr: None,
|
||||
metasrv_addr: Some("127.0.0.1:3002".to_string()),
|
||||
config_file: None,
|
||||
})
|
||||
.is_err());
|
||||
assert!(DatanodeOptions::try_from(StartCommand {
|
||||
node_id: Some(42),
|
||||
http_addr: None,
|
||||
rpc_addr: None,
|
||||
mysql_addr: None,
|
||||
postgres_addr: None,
|
||||
metasrv_addr: None,
|
||||
config_file: None,
|
||||
})
|
||||
|
||||
@@ -26,13 +26,10 @@ impl Default for ObjectStoreConfig {
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct DatanodeOptions {
|
||||
pub node_id: u64,
|
||||
pub http_addr: String,
|
||||
pub rpc_addr: String,
|
||||
pub rpc_runtime_size: usize,
|
||||
pub mysql_addr: String,
|
||||
pub mysql_runtime_size: usize,
|
||||
pub postgres_addr: String,
|
||||
pub postgres_runtime_size: usize,
|
||||
pub meta_client_opts: MetaClientOpts,
|
||||
pub wal_dir: String,
|
||||
pub storage: ObjectStoreConfig,
|
||||
@@ -43,13 +40,10 @@ impl Default for DatanodeOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
node_id: 0,
|
||||
http_addr: "0.0.0.0:3000".to_string(),
|
||||
rpc_addr: "0.0.0.0:3001".to_string(),
|
||||
rpc_runtime_size: 8,
|
||||
mysql_addr: "0.0.0.0:3306".to_string(),
|
||||
mysql_runtime_size: 2,
|
||||
postgres_addr: "0.0.0.0:5432".to_string(),
|
||||
postgres_runtime_size: 2,
|
||||
meta_client_opts: MetaClientOpts::default(),
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
|
||||
@@ -4,8 +4,10 @@ use std::sync::Arc;
|
||||
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use servers::grpc::GrpcServer;
|
||||
use servers::mysql::server::MysqlServer;
|
||||
use servers::server::Server;
|
||||
use snafu::ResultExt;
|
||||
use tokio::try_join;
|
||||
|
||||
use crate::datanode::DatanodeOptions;
|
||||
use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
|
||||
@@ -16,6 +18,7 @@ pub mod grpc;
|
||||
/// All rpc services.
|
||||
pub struct Services {
|
||||
grpc_server: GrpcServer,
|
||||
mysql_server: Box<dyn Server>,
|
||||
}
|
||||
|
||||
impl Services {
|
||||
@@ -28,8 +31,17 @@ impl Services {
|
||||
.context(RuntimeResourceSnafu)?,
|
||||
);
|
||||
|
||||
let mysql_io_runtime = Arc::new(
|
||||
RuntimeBuilder::default()
|
||||
.worker_threads(opts.mysql_runtime_size as usize)
|
||||
.thread_name("mysql-io-handlers")
|
||||
.build()
|
||||
.context(RuntimeResourceSnafu)?,
|
||||
);
|
||||
|
||||
Ok(Self {
|
||||
grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
|
||||
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
|
||||
mysql_server: MysqlServer::create_server(instance, mysql_io_runtime),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -37,10 +49,15 @@ impl Services {
|
||||
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
|
||||
addr: &opts.rpc_addr,
|
||||
})?;
|
||||
self.grpc_server
|
||||
.start(grpc_addr)
|
||||
.await
|
||||
.context(StartServerSnafu)?;
|
||||
let mysql_addr = &opts.mysql_addr;
|
||||
let mysql_addr: SocketAddr = mysql_addr
|
||||
.parse()
|
||||
.context(ParseAddrSnafu { addr: mysql_addr })?;
|
||||
try_join!(
|
||||
self.grpc_server.start(grpc_addr),
|
||||
self.mysql_server.start(mysql_addr),
|
||||
)
|
||||
.context(StartServerSnafu)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user