feat: disable mysql server on datande when running standalone mode (#593)

This commit is contained in:
Lei, HUANG
2022-11-21 12:12:26 +08:00
committed by GitHub
parent 5428ad364e
commit ca5734edb3
16 changed files with 65 additions and 43 deletions

1
Cargo.lock generated
View File

@@ -1098,6 +1098,7 @@ dependencies = [
"meta-client",
"meta-srv",
"serde",
"servers",
"snafu",
"tempdir",
"tokio",

View File

@@ -21,6 +21,7 @@ frontend = { path = "../frontend" }
futures = "0.3"
meta-srv = { path = "../meta-srv" }
serde = "1.0"
servers = {path = "../servers"}
snafu = { version = "0.7", features = ["backtraces"] }
tokio = { version = "1.18", features = ["full"] }
toml = "0.5"

View File

@@ -15,8 +15,8 @@
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{Datanode, DatanodeOptions};
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use servers::Mode;
use snafu::ResultExt;
use crate::error::{Error, MissingConfigSnafu, Result, StartDatanodeSnafu};
@@ -124,7 +124,7 @@ mod tests {
use std::assert_matches::assert_matches;
use datanode::datanode::ObjectStoreConfig;
use frontend::frontend::Mode;
use servers::Mode;
use super::*;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use clap::Parser;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance;
@@ -21,6 +21,7 @@ use frontend::mysql::MysqlOptions;
use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use meta_client::MetaClientOpts;
use servers::Mode;
use snafu::ResultExt;
use crate::error::{self, Result};

View File

@@ -16,7 +16,7 @@ use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions, Mode};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
use frontend::influxdb::InfluxdbOptions;
use frontend::instance::Instance as FeInstance;
@@ -25,6 +25,7 @@ use frontend::opentsdb::OpentsdbOptions;
use frontend::postgres::PostgresOptions;
use frontend::prometheus::PrometheusOptions;
use serde::{Deserialize, Serialize};
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;

View File

@@ -15,9 +15,9 @@
use std::sync::Arc;
use common_telemetry::info;
use frontend::frontend::Mode;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;
use crate::error::Result;
use crate::instance::{Instance, InstanceRef};

View File

@@ -20,7 +20,6 @@ use catalog::remote::MetaKvBackend;
use catalog::CatalogManagerRef;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_telemetry::logging::info;
use frontend::frontend::Mode;
use log_store::fs::config::LogConfig;
use log_store::fs::log::LocalFileLogStore;
use meta_client::client::{MetaClient, MetaClientBuilder};
@@ -31,6 +30,7 @@ use object_store::layers::LoggingLayer;
use object_store::services::fs::Builder;
use object_store::{util, ObjectStore};
use query::query_engine::{QueryEngineFactory, QueryEngineRef};
use servers::Mode;
use snafu::prelude::*;
use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;

View File

@@ -17,11 +17,12 @@ use std::net::SocketAddr;
use std::sync::Arc;
use common_runtime::Builder as RuntimeBuilder;
use common_telemetry::tracing::log::info;
use servers::grpc::GrpcServer;
use servers::mysql::server::MysqlServer;
use servers::server::Server;
use servers::Mode;
use snafu::ResultExt;
use tokio::try_join;
use crate::datanode::DatanodeOptions;
use crate::error::{ParseAddrSnafu, Result, RuntimeResourceSnafu, StartServerSnafu};
@@ -32,7 +33,7 @@ pub mod grpc;
/// All rpc services.
pub struct Services {
grpc_server: GrpcServer,
mysql_server: Box<dyn Server>,
mysql_server: Option<Box<dyn Server>>,
}
impl Services {
@@ -45,17 +46,29 @@ impl Services {
.context(RuntimeResourceSnafu)?,
);
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
let mysql_server = match opts.mode {
Mode::Standalone => {
info!("Disable MySQL server on datanode when running in standalone mode");
None
}
Mode::Distributed => {
let mysql_io_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.mysql_runtime_size as usize)
.thread_name("mysql-io-handlers")
.build()
.context(RuntimeResourceSnafu)?,
);
Some(MysqlServer::create_server(
instance.clone(),
mysql_io_runtime,
))
}
};
Ok(Self {
grpc_server: GrpcServer::new(instance.clone(), instance.clone(), grpc_runtime),
mysql_server: MysqlServer::create_server(instance, mysql_io_runtime),
grpc_server: GrpcServer::new(instance.clone(), instance, grpc_runtime),
mysql_server,
})
}
@@ -63,15 +76,19 @@ impl Services {
let grpc_addr: SocketAddr = opts.rpc_addr.parse().context(ParseAddrSnafu {
addr: &opts.rpc_addr,
})?;
let mysql_addr = &opts.mysql_addr;
let mysql_addr: SocketAddr = mysql_addr
.parse()
.context(ParseAddrSnafu { addr: mysql_addr })?;
try_join!(
self.grpc_server.start(grpc_addr),
self.mysql_server.start(mysql_addr),
)
.context(StartServerSnafu)?;
let mut res = vec![self.grpc_server.start(grpc_addr)];
if let Some(mysql_server) = &self.mysql_server {
let mysql_addr = &opts.mysql_addr;
let mysql_addr: SocketAddr = mysql_addr
.parse()
.context(ParseAddrSnafu { addr: mysql_addr })?;
res.push(mysql_server.start(mysql_addr));
};
futures::future::try_join_all(res)
.await
.context(StartServerSnafu)?;
Ok(())
}
}

View File

@@ -30,10 +30,10 @@ use client::{Client, Database, ObjectResult};
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_runtime::Builder as RuntimeBuilder;
use frontend::frontend::FrontendOptions;
use frontend::frontend::Mode::Standalone;
use frontend::grpc::GrpcOptions;
use servers::grpc::GrpcServer;
use servers::server::Server;
use servers::Mode;
use crate::instance::Instance;
use crate::tests::test_util::{self, TestGuard};
@@ -62,7 +62,7 @@ async fn setup_grpc_server(
let fe_grpc_addr = format!("127.0.0.1:{}", frontend_port);
let fe_opts = FrontendOptions {
mode: Standalone,
mode: Mode::Standalone,
datanode_rpc_addr: datanode_grpc_addr.clone(),
grpc_options: Some(GrpcOptions {
addr: fe_grpc_addr.clone(),

View File

@@ -19,9 +19,9 @@ use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use frontend::frontend::Mode;
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use meta_client::MetaClientOpts;
use serde::{Deserialize, Serialize};
use servers::Mode;
use snafu::prelude::*;
use crate::error::{self, Result};
@@ -97,10 +98,3 @@ where
Services::start(&self.opts, instance).await
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}

View File

@@ -43,11 +43,11 @@ use common_telemetry::{debug, error, info};
use distributed::DistInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::MetaClientOpts;
use servers::error as server_error;
use servers::query_handler::{
GrpcAdminHandler, GrpcQueryHandler, InfluxdbLineProtocolHandler, OpentsdbProtocolHandler,
PrometheusProtocolHandler, ScriptHandler, ScriptHandlerRef, SqlQueryHandler,
};
use servers::{error as server_error, Mode};
use snafu::prelude::*;
use sql::dialect::GenericDialect;
use sql::parser::ParserContext;
@@ -64,7 +64,7 @@ use crate::error::{
SchemaNotFoundSnafu, SelectSnafu,
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::{FrontendOptions, Mode};
use crate::frontend::FrontendOptions;
use crate::sql::insert_to_request;
use crate::table::route::TableRoutes;

View File

@@ -21,15 +21,14 @@ use async_trait::async_trait;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::prelude::BoxedError;
use common_insert::column_to_vector;
use servers::error as server_error;
use servers::influxdb::InfluxdbRequest;
use servers::query_handler::InfluxdbLineProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::{OptionExt, ResultExt};
use table::requests::InsertRequest;
use crate::error;
use crate::error::{DeserializeInsertBatchSnafu, InsertBatchToRequestSnafu, Result};
use crate::frontend::Mode;
use crate::instance::Instance;
#[async_trait]

View File

@@ -14,13 +14,12 @@
use async_trait::async_trait;
use common_error::prelude::BoxedError;
use servers::error as server_error;
use servers::opentsdb::codec::DataPoint;
use servers::query_handler::OpentsdbProtocolHandler;
use servers::{error as server_error, Mode};
use snafu::prelude::*;
use crate::error::Result;
use crate::frontend::Mode;
use crate::instance::Instance;
#[async_trait]

View File

@@ -24,9 +24,9 @@ use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::prometheus::{self, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
use crate::frontend::Mode;
use crate::instance::{parse_stmt, Instance};
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

View File

@@ -14,6 +14,8 @@
#![feature(assert_matches)]
use serde::{Deserialize, Serialize};
pub mod context;
pub mod error;
pub mod grpc;
@@ -27,3 +29,10 @@ pub mod prometheus;
pub mod query_handler;
pub mod server;
mod shutdown;
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}