feat: start datanode with config (#2312)

* remove memory-catalog and procedure

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* derive serde for MitoConfig

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* start datanode with configs

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove dir in WalConfig

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add rename field attr

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add stupid duplicated mito config

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove wrong import

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* wired compile error

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-04 01:48:40 -05:00
parent 50fca2400e
commit f71aa373c1
27 changed files with 296 additions and 233 deletions

View File

@@ -19,6 +19,7 @@ bytes = "1.1"
catalog = { workspace = true }
common-base = { workspace = true }
common-catalog = { workspace = true }
common-config = { workspace = true }
common-datasource = { workspace = true }
common-error = { workspace = true }
common-function = { workspace = true }
@@ -48,6 +49,7 @@ meta-client = { workspace = true }
meta-srv = { workspace = true }
metrics.workspace = true
mito = { workspace = true }
mito2 = { workspace = true }
object-store = { workspace = true }
pin-project = "1.0"
prost.workspace = true

View File

@@ -14,18 +14,24 @@
//! Datanode configurations
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::local::MemoryCatalogManager;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_config::WalConfig;
use common_error::ext::BoxedError;
pub use common_procedure::options::ProcedureConfig;
use common_runtime::Runtime;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
@@ -38,11 +44,18 @@ use storage::config::{
DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE,
};
use storage::scheduler::SchedulerConfig;
use store_api::logstore::LogStore;
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::RegionEngineRef;
use tokio::fs;
use crate::error::{Result, RuntimeResourceSnafu, ShutdownInstanceSnafu};
use crate::error::{
CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu,
};
use crate::heartbeat::HeartbeatTask;
use crate::region_server::RegionServer;
use crate::server::Services;
use crate::store;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024);
@@ -218,37 +231,6 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct WalConfig {
// wal directory
pub dir: Option<String>,
// wal file size in bytes
pub file_size: ReadableSize,
// wal purge threshold in bytes
pub purge_threshold: ReadableSize,
// purge interval in seconds
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
// read batch size
pub read_batch_size: usize,
// whether to sync log file after every write
pub sync_write: bool,
}
impl Default for WalConfig {
fn default() -> Self {
Self {
dir: None,
file_size: ReadableSize::mb(256), // log file size 256MB
purge_threshold: ReadableSize::gb(4), // purge threshold 4GB
purge_interval: Duration::from_secs(600),
read_batch_size: 128,
sync_write: false,
}
}
}
/// Options for region manifest
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(default)]
@@ -360,7 +342,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
#[serde(default)]
pub struct DatanodeOptions {
pub mode: Mode,
pub enable_memory_catalog: bool,
pub node_id: Option<u64>,
pub rpc_addr: String,
pub rpc_hostname: Option<String>,
@@ -370,7 +351,8 @@ pub struct DatanodeOptions {
pub meta_client_options: Option<MetaClientOptions>,
pub wal: WalConfig,
pub storage: StorageConfig,
pub procedure: ProcedureConfig,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
pub enable_telemetry: bool,
}
@@ -379,7 +361,6 @@ impl Default for DatanodeOptions {
fn default() -> Self {
Self {
mode: Mode::Standalone,
enable_memory_catalog: false,
node_id: None,
rpc_addr: "127.0.0.1:3001".to_string(),
rpc_hostname: None,
@@ -388,7 +369,7 @@ impl Default for DatanodeOptions {
meta_client_options: None,
wal: WalConfig::default(),
storage: StorageConfig::default(),
procedure: ProcedureConfig::default(),
region_engine: vec![],
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::default(),
enable_telemetry: true,
@@ -406,6 +387,12 @@ impl DatanodeOptions {
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum RegionEngineConfig {
#[serde(rename = "mito")]
Mito(MitoConfig),
}
/// Datanode service.
pub struct Datanode {
opts: DatanodeOptions,
@@ -433,7 +420,13 @@ impl Datanode {
.context(RuntimeResourceSnafu)?,
);
let region_server = RegionServer::new(query_engine, runtime);
let mut region_server = RegionServer::new(query_engine, runtime);
let log_store = Self::build_log_store(&opts).await?;
let object_store = store::new_object_store(&opts).await?;
let engines = Self::build_store_engines(&opts, log_store, object_store).await?;
for engine in engines {
region_server.register_engine(engine);
}
// build optional things with different modes
let services = match opts.mode {
@@ -489,6 +482,51 @@ impl Datanode {
}
Ok(())
}
// internal utils
/// Build [RaftEngineLogStore]
async fn build_log_store(opts: &DatanodeOptions) -> Result<Arc<RaftEngineLogStore>> {
let data_home = normalize_dir(&opts.storage.data_home);
let wal_dir = format!("{}{WAL_DIR}", data_home);
let wal_config = opts.wal.clone();
// create WAL directory
fs::create_dir_all(Path::new(&wal_dir))
.await
.context(CreateDirSnafu { dir: &wal_dir })?;
info!(
"Creating logstore with config: {:?} and storage path: {}",
wal_config, &wal_dir
);
let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)?;
Ok(Arc::new(logstore))
}
/// Build [RegionEngineRef] from `store_engine` section in `opts`
async fn build_store_engines<S>(
opts: &DatanodeOptions,
log_store: Arc<S>,
object_store: object_store::ObjectStore,
) -> Result<Vec<RegionEngineRef>>
where
S: LogStore,
{
let mut engines = vec![];
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
let engine: MitoEngine =
MitoEngine::new(config.clone(), log_store.clone(), object_store.clone());
engines.push(Arc::new(engine) as _);
}
}
}
Ok(engines)
}
}
#[cfg(test)]

View File

@@ -20,9 +20,10 @@ use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_config::WalConfig;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -35,7 +36,6 @@ use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::{debug, info};
use file_table_engine::engine::immutable::ImmutableFileTableEngine;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::LogConfig;
use meta_client::client::{MetaClient, MetaClientBuilder};
use meta_client::MetaClientOptions;
use mito::config::EngineConfig as TableEngineConfig;
@@ -54,10 +54,9 @@ use store_api::path_utils::{CLUSTER_DIR, WAL_DIR};
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{TableEngine, TableEngineProcedureRef};
use table::requests::FlushTableRequest;
use table::table::numbers::NumbersTable;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig};
use crate::datanode::{DatanodeOptions, ProcedureConfig};
use crate::error::{
self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu,
MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result,
@@ -159,9 +158,8 @@ impl Instance {
) -> Result<(InstanceRef, Option<HeartbeatTask>)> {
let data_home = util::normalize_dir(&opts.storage.data_home);
info!("The working home directory is: {}", data_home);
let object_store = store::new_object_store(&data_home, &opts.storage.store).await?;
let log_store =
Arc::new(create_log_store(&data_home, &opts.storage.store, &opts.wal).await?);
let object_store = store::new_object_store(opts).await?;
let log_store = Arc::new(create_log_store(&data_home, opts.wal.clone()).await?);
let mito_engine = Arc::new(DefaultEngine::new(
TableEngineConfig {
@@ -203,39 +201,17 @@ impl Instance {
// create remote catalog manager
let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode {
Mode::Standalone => {
if opts.enable_memory_catalog {
let catalog = catalog::local::MemoryCatalogManager::with_default_setup();
let table = NumbersTable::table(MIN_USER_TABLE_ID);
let _ = catalog
.register_table(RegisterTableRequest {
table_id: MIN_USER_TABLE_ID,
table_name: table.table_info().name.to_string(),
table,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
})
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
.await
.expect("Failed to register numbers");
.context(CatalogSnafu)?,
);
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
} else {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
.await
.context(CatalogSnafu)?,
);
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
}
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
}
Mode::Distributed => {
@@ -273,9 +249,12 @@ impl Instance {
plugins,
);
let query_engine = factory.query_engine();
let procedure_manager =
create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store)
.await?;
let procedure_manager = create_procedure_manager(
opts.node_id.unwrap_or(0),
&ProcedureConfig::default(),
object_store,
)
.await?;
let sql_handler = SqlHandler::new(
engine_manager.clone(),
catalog_manager.clone(),
@@ -451,16 +430,9 @@ pub async fn new_metasrv_client(
pub(crate) async fn create_log_store(
data_home: &str,
store_config: &ObjectStoreConfig,
wal_config: &WalConfig,
wal_config: WalConfig,
) -> Result<RaftEngineLogStore> {
let wal_dir = match (&wal_config.dir, store_config) {
(Some(dir), _) => dir.to_string(),
(None, ObjectStoreConfig::File(_file_config)) => {
format!("{}{WAL_DIR}", data_home)
}
_ => return error::MissingWalDirConfigSnafu {}.fail(),
};
let wal_dir = format!("{}{WAL_DIR}", data_home);
// create WAL directory
fs::create_dir_all(path::Path::new(&wal_dir))
@@ -469,16 +441,7 @@ pub(crate) async fn create_log_store(
"Creating logstore with config: {:?} and storage path: {}",
wal_config, &wal_dir
);
let log_config = LogConfig {
file_size: wal_config.file_size.0,
log_file_dir: wal_dir,
purge_interval: wal_config.purge_interval,
purge_threshold: wal_config.purge_threshold.0,
read_batch_size: wal_config.read_batch_size,
sync_write: wal_config.sync_write,
};
let logstore = RaftEngineLogStore::try_new(log_config)
let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)?;

View File

@@ -28,19 +28,18 @@ use common_base::readable_size::ReadableSize;
use common_telemetry::logging::info;
use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer};
use object_store::services::Fs as FsBuilder;
use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, Result};
pub(crate) async fn new_object_store(
data_home: &str,
store_config: &ObjectStoreConfig,
) -> Result<ObjectStore> {
let object_store = match store_config {
pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result<ObjectStore> {
let data_home = normalize_dir(&opts.storage.data_home);
let object_store = match &opts.storage.store {
ObjectStoreConfig::File(file_config) => {
fs::new_fs_object_store(data_home, file_config).await
fs::new_fs_object_store(&data_home, file_config).await
}
ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await,
ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await,
@@ -51,8 +50,9 @@ pub(crate) async fn new_object_store(
}?;
// Enable retry layer and cache layer for non-fs object storages
let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) {
let object_store = create_object_store_with_cache(object_store, store_config).await?;
let object_store = if !matches!(opts.storage.store, ObjectStoreConfig::File(..)) {
let object_store =
create_object_store_with_cache(object_store, &opts.storage.store).await?;
object_store.layer(RetryLayer::new().with_jitter())
} else {
object_store

View File

@@ -16,6 +16,7 @@ use catalog::RegisterTableRequest;
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE,
};
use common_config::WalConfig;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -25,9 +26,7 @@ use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
use table::TableRef;
use crate::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
};
use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig};
use crate::error::{CreateTableSnafu, Result};
use crate::heartbeat::HeartbeatTask;
use crate::instance::{Instance, InstanceRef};
@@ -69,17 +68,13 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let opts = DatanodeOptions {
wal: WalConfig {
dir: Some(wal_tmp_dir.path().to_str().unwrap().to_string()),
..Default::default()
},
wal: WalConfig::default(),
storage: StorageConfig {
data_home: data_tmp_dir.path().to_str().unwrap().to_string(),
store: ObjectStoreConfig::File(FileConfig {}),
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::default(),
..Default::default()
};
(