fix: datanode start in standalone mode by default (#418)

* fix: datanode start in standalone mode by default

* fix: detech misconfig on startup

* fix: some CR comments and add tests
This commit is contained in:
Lei, Huang
2022-11-08 16:18:13 +08:00
committed by GitHub
parent 43f9c40f43
commit a2f9b788f1
6 changed files with 150 additions and 33 deletions

View File

@@ -23,6 +23,13 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Mode {
Standalone,
Distributed,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatanodeOptions {
pub node_id: u64,
@@ -36,6 +43,7 @@ pub struct DatanodeOptions {
pub meta_client_opts: MetaClientOpts,
pub wal_dir: String,
pub storage: ObjectStoreConfig,
pub mode: Mode,
}
impl Default for DatanodeOptions {
@@ -55,6 +63,7 @@ impl Default for DatanodeOptions {
common_time::util::current_time_millis()
),
storage: ObjectStoreConfig::default(),
mode: Mode::Standalone,
}
}
}

View File

@@ -14,8 +14,8 @@ use storage::{config::EngineConfig as StorageEngineConfig, EngineImpl};
use table_engine::config::EngineConfig as TableEngineConfig;
use table_engine::engine::MitoEngine;
use crate::datanode::{DatanodeOptions, MetaClientOpts, ObjectStoreConfig};
use crate::error::{self, MetaClientInitSnafu, NewCatalogSnafu, Result};
use crate::datanode::{DatanodeOptions, MetaClientOpts, Mode, ObjectStoreConfig};
use crate::error::{self, CatalogSnafu, MetaClientInitSnafu, NewCatalogSnafu, Result};
use crate::heartbeat::HeartbeatTask;
use crate::script::ScriptExecutor;
use crate::server::grpc::plan::PhysicalPlanner;
@@ -34,8 +34,8 @@ pub struct Instance {
pub(crate) physical_planner: PhysicalPlanner,
pub(crate) script_executor: ScriptExecutor,
#[allow(unused)]
pub(crate) meta_client: MetaClient,
pub(crate) heartbeat_task: HeartbeatTask,
pub(crate) meta_client: Option<MetaClient>,
pub(crate) heartbeat_task: Option<HeartbeatTask>,
}
pub type InstanceRef = Arc<Instance>;
@@ -44,7 +44,13 @@ impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let log_store = create_local_file_log_store(opts).await?;
let meta_client = new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?;
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
Some(new_metasrv_client(opts.node_id, &opts.meta_client_opts).await?)
}
};
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),
@@ -57,24 +63,42 @@ impl Instance {
));
// create remote catalog manager
let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
Arc::new(MetaKvBackend {
client: meta_client.clone(),
}),
));
let (catalog_manager, factory) = match opts.mode {
Mode::Standalone => {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(table_engine.clone())
.await
.context(CatalogSnafu)?,
);
let factory = QueryEngineFactory::new(catalog.clone());
(catalog as CatalogManagerRef, factory)
}
Mode::Distributed => {
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
table_engine.clone(),
opts.node_id,
Arc::new(MetaKvBackend {
client: meta_client.as_ref().unwrap().clone(),
}),
));
let factory = QueryEngineFactory::new(catalog.clone());
(catalog as CatalogManagerRef, factory)
}
};
let factory = QueryEngineFactory::new(catalog_manager.clone());
let query_engine = factory.query_engine().clone();
let script_executor =
ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?;
let heartbeat_task = HeartbeatTask::new(
opts.node_id, /*node id not set*/
opts.rpc_addr.clone(),
meta_client.clone(),
);
let heartbeat_task = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => Some(HeartbeatTask::new(
opts.node_id, /*node id not set*/
opts.rpc_addr.clone(),
meta_client.as_ref().unwrap().clone(),
)),
};
Ok(Self {
query_engine: query_engine.clone(),
sql_handler: SqlHandler::new(table_engine, catalog_manager.clone()),
@@ -91,7 +115,9 @@ impl Instance {
.start()
.await
.context(NewCatalogSnafu)?;
self.heartbeat_task.start().await?;
if let Some(task) = &self.heartbeat_task {
task.start().await?;
}
Ok(())
}

View File

@@ -23,7 +23,7 @@ impl Instance {
use table_engine::table::test_util::MockEngine;
use table_engine::table::test_util::MockMitoEngine;
let meta_client = mock_meta_client().await;
let meta_client = Some(mock_meta_client().await);
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
TableEngineConfig::default(),
@@ -46,8 +46,11 @@ impl Instance {
.await
.unwrap();
let heartbeat_task =
HeartbeatTask::new(0, "127.0.0.1:3302".to_string(), meta_client.clone());
let heartbeat_task = Some(HeartbeatTask::new(
0,
"127.0.0.1:3302".to_string(),
meta_client.as_ref().unwrap().clone(),
));
Ok(Self {
query_engine,
sql_handler,
@@ -95,8 +98,8 @@ impl Instance {
catalog_manager,
physical_planner: PhysicalPlanner::new(query_engine),
script_executor,
meta_client,
heartbeat_task,
meta_client: Some(meta_client),
heartbeat_task: Some(heartbeat_task),
})
}
}