From f71aa373c1fdf47b2160295ab34e8341f856f613 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Mon, 4 Sep 2023 01:48:40 -0500 Subject: [PATCH] feat: start datanode with config (#2312) * remove memory-catalog and procedure Signed-off-by: Ruihang Xia * derive serde for MitoConfig Signed-off-by: Ruihang Xia * start datanode with configs Signed-off-by: Ruihang Xia * remove dir in WalConfig Signed-off-by: Ruihang Xia * add rename field attr Signed-off-by: Ruihang Xia * add stupid duplicated mito config Signed-off-by: Ruihang Xia * remove wrong import Signed-off-by: Ruihang Xia * wired compile error Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 17 +++ Cargo.toml | 2 + config/datanode.example.toml | 27 ++++- config/standalone.example.toml | 9 -- src/cmd/Cargo.toml | 1 + src/cmd/src/datanode.rs | 11 -- src/cmd/src/options.rs | 3 - src/cmd/src/standalone.rs | 12 +- src/common/config/Cargo.toml | 10 ++ src/common/config/src/lib.rs | 46 +++++++ src/common/datasource/Cargo.toml | 1 + src/common/datasource/src/compression.rs | 3 +- src/datanode/Cargo.toml | 2 + src/datanode/src/datanode.rs | 112 ++++++++++++------ src/datanode/src/instance.rs | 85 ++++--------- src/datanode/src/store.rs | 18 +-- src/datanode/src/tests/test_util.rs | 11 +- src/log-store/Cargo.toml | 1 + src/log-store/src/lib.rs | 2 - src/log-store/src/raft_engine/log_store.rs | 96 +++++++-------- src/log-store/src/test_util/log_store_util.rs | 12 +- src/mito2/Cargo.toml | 1 + src/mito2/src/config.rs | 4 +- src/script/Cargo.toml | 1 + src/script/src/manager.rs | 29 ++--- src/storage/Cargo.toml | 1 + src/storage/src/test_util/config_util.rs | 12 +- 27 files changed, 296 insertions(+), 233 deletions(-) create mode 100644 src/common/config/Cargo.toml create mode 100644 src/common/config/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index a4b276b639..e2bc1ce016 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1587,6 +1587,7 @@ dependencies = [ "clap 3.2.25", "client", "common-base", + "common-config", "common-error", "common-meta", "common-query", @@ -1672,6 +1673,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "common-config" +version = "0.4.0-nightly" +dependencies = [ + "common-base", + "humantime-serde", + "serde", +] + [[package]] name = "common-datasource" version = "0.4.0-nightly" @@ -1691,6 +1701,7 @@ dependencies = [ "orc-rust", "paste", "regex", + "serde", "snafu", "strum 0.25.0", "tokio", @@ -2611,6 +2622,7 @@ dependencies = [ "client", "common-base", "common-catalog", + "common-config", "common-datasource", "common-error", "common-function", @@ -2641,6 +2653,7 @@ dependencies = [ "meta-srv", "metrics", "mito", + "mito2", "object-store", "pin-project", "prost", @@ -4903,6 +4916,7 @@ dependencies = [ "byteorder", "bytes", "common-base", + "common-config", "common-error", "common-meta", "common-runtime", @@ -5457,6 +5471,7 @@ dependencies = [ "datafusion-common", "datatypes", "futures", + "humantime-serde", "lazy_static", "log-store", "memcomparable", @@ -8363,6 +8378,7 @@ dependencies = [ "async-trait", "catalog", "common-catalog", + "common-config", "common-error", "common-function", "common-query", @@ -9119,6 +9135,7 @@ dependencies = [ "atomic_float", "bytes", "common-base", + "common-config", "common-datasource", "common-error", "common-query", diff --git a/Cargo.toml b/Cargo.toml index c2dc6ac05f..80c6f21673 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "src/cmd", "src/common/base", "src/common/catalog", + "src/common/config", "src/common/datasource", "src/common/error", "src/common/function", @@ -118,6 +119,7 @@ client = { path = "src/client" } cmd = { path = "src/cmd" } common-base = { path = "src/common/base" } common-catalog = { path = "src/common/catalog" } +common-config = { path = "src/common/config" } common-datasource = { path = "src/common/datasource" } common-error = { path = "src/common/error" } common-function = { path = "src/common/function" } diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bc795a16a8..a5b4f7d226 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,7 +1,5 @@ # Node running mode, see `standalone.example.toml`. mode = "distributed" -# Whether to use in-memory catalog, see `standalone.example.toml`. -enable_memory_catalog = false # The datanode identifier, should be unique. node_id = 42 # gRPC server address, "127.0.0.1:3001" by default. @@ -71,10 +69,27 @@ auto_flush_interval = "1h" # Global write buffer size for all regions. global_write_buffer_size = "1GB" -# Procedure storage options, see `standalone.example.toml`. -[procedure] -max_retry_times = 3 -retry_delay = "500ms" +# Mito engine options +[[region_engine]] +[region_engine.mito] +# Number of region workers +num_workers = 8 +# Request channel size of each worker +worker_channel_size = 128 +# Max batch size for a worker to handle requests +worker_request_batch_size = 64 +# Number of meta action updated to trigger a new checkpoint for the manifest +manifest_checkpoint_distance = 10 +# Manifest compression type +manifest_compress_type = "Uncompressed" +# Max number of running background jobs +max_background_jobs = 4 +# Interval to auto flush a region if it has not flushed yet. +auto_flush_interval = "1h" +# Global write buffer size for all regions. +global_write_buffer_size = "1GB" +# Global write buffer size threshold to reject write requests (default 2G). +global_write_buffer_reject_size = "2GB" # Log options # [logging] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index b4d3fa30e4..5e6757777e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -1,7 +1,5 @@ # Node running mode, "standalone" or "distributed". mode = "standalone" -# Whether to use in-memory catalog, `false` by default. -enable_memory_catalog = false # Whether to enable greptimedb telemetry, true by default. enable_telemetry = true @@ -136,13 +134,6 @@ auto_flush_interval = "1h" # Global write buffer size for all regions. global_write_buffer_size = "1GB" -# Procedure storage options. -[procedure] -# Procedure max retry time. -max_retry_times = 3 -# Initial retry delay of procedures, increases exponentially -retry_delay = "500ms" - # Log options # [logging] # Specify logs directory. diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index a458e74949..1c49432352 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -23,6 +23,7 @@ chrono.workspace = true clap = { version = "3.1", features = ["derive"] } client = { workspace = true } common-base = { workspace = true } +common-config = { workspace = true } common-error = { workspace = true } common-meta = { workspace = true } common-query = { workspace = true } diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 4c9b075a5e..0ed19fc755 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -91,8 +91,6 @@ struct StartCommand { #[clap(long)] data_home: Option, #[clap(long)] - wal_dir: Option, - #[clap(long)] http_addr: Option, #[clap(long)] http_timeout: Option, @@ -146,10 +144,6 @@ impl StartCommand { opts.storage.data_home = data_home.clone(); } - if let Some(wal_dir) = &self.wal_dir { - opts.wal.dir = Some(wal_dir.clone()); - } - if let Some(http_addr) = &self.http_addr { opts.http_opts.addr = http_addr.clone(); } @@ -250,7 +244,6 @@ mod tests { assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); assert_eq!(Some(42), options.node_id); - assert_eq!("/other/wal", options.wal.dir.unwrap()); assert_eq!(Duration::from_secs(600), options.wal.purge_interval); assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0); assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0); @@ -426,7 +419,6 @@ mod tests { || { let command = StartCommand { config_file: Some(file.path().to_str().unwrap().to_string()), - wal_dir: Some("/other/wal/dir".to_string()), env_prefix: env_prefix.to_string(), ..Default::default() }; @@ -454,9 +446,6 @@ mod tests { // Should be read from config file, config file > env > default values. assert_eq!(opts.storage.compaction.max_purge_tasks, 32); - // Should be read from cli, cli > config file > env > default values. - assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir"); - // Should be default value. assert_eq!( opts.storage.manifest.checkpoint_margin, diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index f52100e900..7058b63c3b 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -251,9 +251,6 @@ mod tests { ] ); - // Should be the values from config file, not environment variables. - assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal"); - // Should be default values. assert_eq!(opts.node_id, None); }, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 39c109dd0d..c373524516 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,9 +16,10 @@ use std::sync::Arc; use clap::Parser; use common_base::Plugins; +use common_config::WalConfig; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; -use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig, WalConfig}; +use datanode::datanode::{Datanode, DatanodeOptions, ProcedureConfig, StorageConfig}; use datanode::instance::InstanceRef; use frontend::frontend::FrontendOptions; use frontend::instance::{FrontendInstance, Instance as FeInstance}; @@ -81,7 +82,6 @@ impl SubCommand { #[serde(default)] pub struct StandaloneOptions { pub mode: Mode, - pub enable_memory_catalog: bool, pub enable_telemetry: bool, pub http_options: HttpOptions, pub grpc_options: GrpcOptions, @@ -100,7 +100,6 @@ impl Default for StandaloneOptions { fn default() -> Self { Self { mode: Mode::Standalone, - enable_memory_catalog: false, enable_telemetry: true, http_options: HttpOptions::default(), grpc_options: GrpcOptions::default(), @@ -136,11 +135,9 @@ impl StandaloneOptions { fn datanode_options(self) -> DatanodeOptions { DatanodeOptions { - enable_memory_catalog: self.enable_memory_catalog, enable_telemetry: self.enable_telemetry, wal: self.wal, storage: self.storage, - procedure: self.procedure, ..Default::default() } } @@ -193,8 +190,6 @@ struct StartCommand { influxdb_enable: bool, #[clap(short, long)] config_file: Option, - #[clap(short = 'm', long = "memory-catalog")] - enable_memory_catalog: bool, #[clap(long)] tls_mode: Option, #[clap(long)] @@ -215,8 +210,6 @@ impl StartCommand { None, )?; - opts.enable_memory_catalog = self.enable_memory_catalog; - opts.mode = Mode::Standalone; if let Some(dir) = top_level_options.log_dir { @@ -427,7 +420,6 @@ mod tests { assert_eq!(None, fe_opts.mysql_options.reject_no_database); assert!(fe_opts.influxdb_options.enable); - assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap()); match &dn_opts.storage.store { datanode::datanode::ObjectStoreConfig::S3(s3_config) => { assert_eq!( diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml new file mode 100644 index 0000000000..d4511f124c --- /dev/null +++ b/src/common/config/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "common-config" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +common-base.workspace = true +humantime-serde.workspace = true +serde.workspace = true diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs new file mode 100644 index 0000000000..785e66ca40 --- /dev/null +++ b/src/common/config/src/lib.rs @@ -0,0 +1,46 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct WalConfig { + // wal file size in bytes + pub file_size: ReadableSize, + // wal purge threshold in bytes + pub purge_threshold: ReadableSize, + // purge interval in seconds + #[serde(with = "humantime_serde")] + pub purge_interval: Duration, + // read batch size + pub read_batch_size: usize, + // whether to sync log file after every write + pub sync_write: bool, +} + +impl Default for WalConfig { + fn default() -> Self { + Self { + file_size: ReadableSize::mb(256), // log file size 256MB + purge_threshold: ReadableSize::gb(4), // purge threshold 4GB + purge_interval: Duration::from_secs(600), + read_batch_size: 128, + sync_write: false, + } + } +} diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index 48e52b4373..5b21ebb070 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -26,6 +26,7 @@ object-store = { workspace = true } orc-rust = "0.2" paste = "1.0" regex = "1.7" +serde.workspace = true snafu.workspace = true strum.workspace = true tokio-util.workspace = true diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs index 23c28786a5..21c566172c 100644 --- a/src/common/datasource/src/compression.rs +++ b/src/common/datasource/src/compression.rs @@ -20,12 +20,13 @@ use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdD use async_compression::tokio::write; use bytes::Bytes; use futures::Stream; +use serde::{Deserialize, Serialize}; use strum::EnumIter; use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; use tokio_util::io::{ReaderStream, StreamReader}; use crate::error::{self, Error, Result}; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter, Serialize, Deserialize)] pub enum CompressionType { /// Gzip-ed file Gzip, diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 8a904f241d..a6e36f792a 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -19,6 +19,7 @@ bytes = "1.1" catalog = { workspace = true } common-base = { workspace = true } common-catalog = { workspace = true } +common-config = { workspace = true } common-datasource = { workspace = true } common-error = { workspace = true } common-function = { workspace = true } @@ -48,6 +49,7 @@ meta-client = { workspace = true } meta-srv = { workspace = true } metrics.workspace = true mito = { workspace = true } +mito2 = { workspace = true } object-store = { workspace = true } pin-project = "1.0" prost.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index fb7ba3df67..d9ff9756c6 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -14,18 +14,24 @@ //! Datanode configurations +use std::path::Path; use std::sync::Arc; use std::time::Duration; use catalog::local::MemoryCatalogManager; use common_base::readable_size::ReadableSize; use common_base::Plugins; +use common_config::WalConfig; use common_error::ext::BoxedError; pub use common_procedure::options::ProcedureConfig; use common_runtime::Runtime; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; +use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::MetaClientOptions; +use mito2::config::MitoConfig; +use mito2::engine::MitoEngine; +use object_store::util::normalize_dir; use query::QueryEngineFactory; use secrecy::SecretString; use serde::{Deserialize, Serialize}; @@ -38,11 +44,18 @@ use storage::config::{ DEFAULT_PICKER_SCHEDULE_INTERVAL, DEFAULT_REGION_WRITE_BUFFER_SIZE, }; use storage::scheduler::SchedulerConfig; +use store_api::logstore::LogStore; +use store_api::path_utils::WAL_DIR; +use store_api::region_engine::RegionEngineRef; +use tokio::fs; -use crate::error::{Result, RuntimeResourceSnafu, ShutdownInstanceSnafu}; +use crate::error::{ + CreateDirSnafu, OpenLogStoreSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, +}; use crate::heartbeat::HeartbeatTask; use crate::region_server::RegionServer; use crate::server::Services; +use crate::store; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); @@ -218,37 +231,6 @@ impl Default for ObjectStoreConfig { } } -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -pub struct WalConfig { - // wal directory - pub dir: Option, - // wal file size in bytes - pub file_size: ReadableSize, - // wal purge threshold in bytes - pub purge_threshold: ReadableSize, - // purge interval in seconds - #[serde(with = "humantime_serde")] - pub purge_interval: Duration, - // read batch size - pub read_batch_size: usize, - // whether to sync log file after every write - pub sync_write: bool, -} - -impl Default for WalConfig { - fn default() -> Self { - Self { - dir: None, - file_size: ReadableSize::mb(256), // log file size 256MB - purge_threshold: ReadableSize::gb(4), // purge threshold 4GB - purge_interval: Duration::from_secs(600), - read_batch_size: 128, - sync_write: false, - } - } -} - /// Options for region manifest #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] #[serde(default)] @@ -360,7 +342,6 @@ impl From<&DatanodeOptions> for StorageEngineConfig { #[serde(default)] pub struct DatanodeOptions { pub mode: Mode, - pub enable_memory_catalog: bool, pub node_id: Option, pub rpc_addr: String, pub rpc_hostname: Option, @@ -370,7 +351,8 @@ pub struct DatanodeOptions { pub meta_client_options: Option, pub wal: WalConfig, pub storage: StorageConfig, - pub procedure: ProcedureConfig, + /// Options for different store engines. + pub region_engine: Vec, pub logging: LoggingOptions, pub enable_telemetry: bool, } @@ -379,7 +361,6 @@ impl Default for DatanodeOptions { fn default() -> Self { Self { mode: Mode::Standalone, - enable_memory_catalog: false, node_id: None, rpc_addr: "127.0.0.1:3001".to_string(), rpc_hostname: None, @@ -388,7 +369,7 @@ impl Default for DatanodeOptions { meta_client_options: None, wal: WalConfig::default(), storage: StorageConfig::default(), - procedure: ProcedureConfig::default(), + region_engine: vec![], logging: LoggingOptions::default(), heartbeat: HeartbeatOptions::default(), enable_telemetry: true, @@ -406,6 +387,12 @@ impl DatanodeOptions { } } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum RegionEngineConfig { + #[serde(rename = "mito")] + Mito(MitoConfig), +} + /// Datanode service. pub struct Datanode { opts: DatanodeOptions, @@ -433,7 +420,13 @@ impl Datanode { .context(RuntimeResourceSnafu)?, ); - let region_server = RegionServer::new(query_engine, runtime); + let mut region_server = RegionServer::new(query_engine, runtime); + let log_store = Self::build_log_store(&opts).await?; + let object_store = store::new_object_store(&opts).await?; + let engines = Self::build_store_engines(&opts, log_store, object_store).await?; + for engine in engines { + region_server.register_engine(engine); + } // build optional things with different modes let services = match opts.mode { @@ -489,6 +482,51 @@ impl Datanode { } Ok(()) } + + // internal utils + + /// Build [RaftEngineLogStore] + async fn build_log_store(opts: &DatanodeOptions) -> Result> { + let data_home = normalize_dir(&opts.storage.data_home); + let wal_dir = format!("{}{WAL_DIR}", data_home); + let wal_config = opts.wal.clone(); + + // create WAL directory + fs::create_dir_all(Path::new(&wal_dir)) + .await + .context(CreateDirSnafu { dir: &wal_dir })?; + info!( + "Creating logstore with config: {:?} and storage path: {}", + wal_config, &wal_dir + ); + let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config) + .await + .map_err(Box::new) + .context(OpenLogStoreSnafu)?; + Ok(Arc::new(logstore)) + } + + /// Build [RegionEngineRef] from `store_engine` section in `opts` + async fn build_store_engines( + opts: &DatanodeOptions, + log_store: Arc, + object_store: object_store::ObjectStore, + ) -> Result> + where + S: LogStore, + { + let mut engines = vec![]; + for engine in &opts.region_engine { + match engine { + RegionEngineConfig::Mito(config) => { + let engine: MitoEngine = + MitoEngine::new(config.clone(), log_store.clone(), object_store.clone()); + engines.push(Arc::new(engine) as _); + } + } + } + Ok(engines) + } } #[cfg(test)] diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index f55e9bc7a0..4985c2bb17 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -20,9 +20,10 @@ use std::{fs, path}; use api::v1::meta::Role; use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; -use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; +use catalog::CatalogManagerRef; use common_base::Plugins; -use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_config::WalConfig; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -35,7 +36,6 @@ use common_procedure::ProcedureManagerRef; use common_telemetry::logging::{debug, info}; use file_table_engine::engine::immutable::ImmutableFileTableEngine; use log_store::raft_engine::log_store::RaftEngineLogStore; -use log_store::LogConfig; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; use mito::config::EngineConfig as TableEngineConfig; @@ -54,10 +54,9 @@ use store_api::path_utils::{CLUSTER_DIR, WAL_DIR}; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{TableEngine, TableEngineProcedureRef}; use table::requests::FlushTableRequest; -use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig}; +use crate::datanode::{DatanodeOptions, ProcedureConfig}; use crate::error::{ self, CatalogSnafu, IncorrectInternalStateSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, @@ -159,9 +158,8 @@ impl Instance { ) -> Result<(InstanceRef, Option)> { let data_home = util::normalize_dir(&opts.storage.data_home); info!("The working home directory is: {}", data_home); - let object_store = store::new_object_store(&data_home, &opts.storage.store).await?; - let log_store = - Arc::new(create_log_store(&data_home, &opts.storage.store, &opts.wal).await?); + let object_store = store::new_object_store(opts).await?; + let log_store = Arc::new(create_log_store(&data_home, opts.wal.clone()).await?); let mito_engine = Arc::new(DefaultEngine::new( TableEngineConfig { @@ -203,39 +201,17 @@ impl Instance { // create remote catalog manager let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode { Mode::Standalone => { - if opts.enable_memory_catalog { - let catalog = catalog::local::MemoryCatalogManager::with_default_setup(); - let table = NumbersTable::table(MIN_USER_TABLE_ID); - - let _ = catalog - .register_table(RegisterTableRequest { - table_id: MIN_USER_TABLE_ID, - table_name: table.table_info().name.to_string(), - table, - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - }) + let catalog = Arc::new( + catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) .await - .expect("Failed to register numbers"); + .context(CatalogSnafu)?, + ); - ( - catalog.clone() as CatalogManagerRef, - Some(catalog as TableIdProviderRef), - None, - ) - } else { - let catalog = Arc::new( - catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) - .await - .context(CatalogSnafu)?, - ); - - ( - catalog.clone() as CatalogManagerRef, - Some(catalog as TableIdProviderRef), - None, - ) - } + ( + catalog.clone() as CatalogManagerRef, + Some(catalog as TableIdProviderRef), + None, + ) } Mode::Distributed => { @@ -273,9 +249,12 @@ impl Instance { plugins, ); let query_engine = factory.query_engine(); - let procedure_manager = - create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store) - .await?; + let procedure_manager = create_procedure_manager( + opts.node_id.unwrap_or(0), + &ProcedureConfig::default(), + object_store, + ) + .await?; let sql_handler = SqlHandler::new( engine_manager.clone(), catalog_manager.clone(), @@ -451,16 +430,9 @@ pub async fn new_metasrv_client( pub(crate) async fn create_log_store( data_home: &str, - store_config: &ObjectStoreConfig, - wal_config: &WalConfig, + wal_config: WalConfig, ) -> Result { - let wal_dir = match (&wal_config.dir, store_config) { - (Some(dir), _) => dir.to_string(), - (None, ObjectStoreConfig::File(_file_config)) => { - format!("{}{WAL_DIR}", data_home) - } - _ => return error::MissingWalDirConfigSnafu {}.fail(), - }; + let wal_dir = format!("{}{WAL_DIR}", data_home); // create WAL directory fs::create_dir_all(path::Path::new(&wal_dir)) @@ -469,16 +441,7 @@ pub(crate) async fn create_log_store( "Creating logstore with config: {:?} and storage path: {}", wal_config, &wal_dir ); - let log_config = LogConfig { - file_size: wal_config.file_size.0, - log_file_dir: wal_dir, - purge_interval: wal_config.purge_interval, - purge_threshold: wal_config.purge_threshold.0, - read_batch_size: wal_config.read_batch_size, - sync_write: wal_config.sync_write, - }; - - let logstore = RaftEngineLogStore::try_new(log_config) + let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config) .await .map_err(Box::new) .context(OpenLogStoreSnafu)?; diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index cae8e69705..a9d13050a1 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -28,19 +28,18 @@ use common_base::readable_size::ReadableSize; use common_telemetry::logging::info; use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer}; use object_store::services::Fs as FsBuilder; +use object_store::util::normalize_dir; use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; -use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; use crate::error::{self, Result}; -pub(crate) async fn new_object_store( - data_home: &str, - store_config: &ObjectStoreConfig, -) -> Result { - let object_store = match store_config { +pub(crate) async fn new_object_store(opts: &DatanodeOptions) -> Result { + let data_home = normalize_dir(&opts.storage.data_home); + let object_store = match &opts.storage.store { ObjectStoreConfig::File(file_config) => { - fs::new_fs_object_store(data_home, file_config).await + fs::new_fs_object_store(&data_home, file_config).await } ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, @@ -51,8 +50,9 @@ pub(crate) async fn new_object_store( }?; // Enable retry layer and cache layer for non-fs object storages - let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) { - let object_store = create_object_store_with_cache(object_store, store_config).await?; + let object_store = if !matches!(opts.storage.store, ObjectStoreConfig::File(..)) { + let object_store = + create_object_store_with_cache(object_store, &opts.storage.store).await?; object_store.layer(RetryLayer::new().with_jitter()) } else { object_store diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 9176f5fb6d..28b597721b 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -16,6 +16,7 @@ use catalog::RegisterTableRequest; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, }; +use common_config::WalConfig; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; @@ -25,9 +26,7 @@ use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; use table::TableRef; -use crate::datanode::{ - DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, -}; +use crate::datanode::{DatanodeOptions, FileConfig, ObjectStoreConfig, StorageConfig}; use crate::error::{CreateTableSnafu, Result}; use crate::heartbeat::HeartbeatTask; use crate::instance::{Instance, InstanceRef}; @@ -69,17 +68,13 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let opts = DatanodeOptions { - wal: WalConfig { - dir: Some(wal_tmp_dir.path().to_str().unwrap().to_string()), - ..Default::default() - }, + wal: WalConfig::default(), storage: StorageConfig { data_home: data_tmp_dir.path().to_str().unwrap().to_string(), store: ObjectStoreConfig::File(FileConfig {}), ..Default::default() }, mode: Mode::Standalone, - procedure: ProcedureConfig::default(), ..Default::default() }; ( diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 56e4dfd4d7..0ad9a63d35 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -16,6 +16,7 @@ base64 = "0.13" byteorder = "1.4" bytes = "1.1" common-base = { workspace = true } +common-config = { workspace = true } common-error = { workspace = true } common-meta = { workspace = true } common-runtime = { workspace = true } diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index 7f2bbfc3c5..4c1d80d5a4 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -14,11 +14,9 @@ #![feature(let_chains)] -mod config; pub mod error; mod noop; pub mod raft_engine; pub mod test_util; -pub use config::LogConfig; pub use noop::NoopLogStore; diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 68a0601249..7f43a8b8c8 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -16,6 +16,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use async_stream::stream; +use common_config::WalConfig; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; @@ -25,7 +26,6 @@ use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; use store_api::logstore::{AppendResponse, LogStore}; -use crate::config::LogConfig; use crate::error; use crate::error::{ AddEntryLogBatchSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, @@ -37,7 +37,7 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace const NAMESPACE_PREFIX: &str = "$sys/"; pub struct RaftEngineLogStore { - config: LogConfig, + config: WalConfig, engine: Arc, gc_task: RepeatedTask, } @@ -72,13 +72,13 @@ impl TaskFunction for PurgeExpiredFilesFunction { } impl RaftEngineLogStore { - pub async fn try_new(config: LogConfig) -> Result { + pub async fn try_new(dir: String, config: WalConfig) -> Result { let raft_engine_config = Config { - dir: config.log_file_dir.clone(), - purge_threshold: ReadableSize(config.purge_threshold), + dir, + purge_threshold: ReadableSize(config.purge_threshold.0), recovery_mode: RecoveryMode::TolerateTailCorruption, batch_compression_threshold: ReadableSize::kb(8), - target_file_size: ReadableSize(config.file_size), + target_file_size: ReadableSize(config.file_size.0), ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); @@ -368,15 +368,15 @@ mod tests { use std::collections::HashSet; use std::time::Duration; + use common_base::readable_size::ReadableSize; use common_telemetry::debug; use common_test_util::temp_dir::create_temp_dir; use futures_util::StreamExt; - use raft_engine::ReadableSize; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Namespace as NamespaceTrait; use store_api::logstore::LogStore; - use crate::config::LogConfig; + use super::*; use crate::error::Error; use crate::raft_engine::log_store::RaftEngineLogStore; use crate::raft_engine::protos::logstore::{EntryImpl as Entry, NamespaceImpl as Namespace}; @@ -384,10 +384,10 @@ mod tests { #[tokio::test] async fn test_open_logstore() { let dir = create_temp_dir("raft-engine-logstore-test"); - let logstore = RaftEngineLogStore::try_new(LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) + let logstore = RaftEngineLogStore::try_new( + dir.path().to_str().unwrap().to_string(), + WalConfig::default(), + ) .await .unwrap(); let namespaces = logstore.list_namespaces().await.unwrap(); @@ -397,10 +397,10 @@ mod tests { #[tokio::test] async fn test_manage_namespace() { let dir = create_temp_dir("raft-engine-logstore-test"); - let logstore = RaftEngineLogStore::try_new(LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) + let logstore = RaftEngineLogStore::try_new( + dir.path().to_str().unwrap().to_string(), + WalConfig::default(), + ) .await .unwrap(); assert!(logstore.list_namespaces().await.unwrap().is_empty()); @@ -423,10 +423,10 @@ mod tests { #[tokio::test] async fn test_append_and_read() { let dir = create_temp_dir("raft-engine-logstore-test"); - let logstore = RaftEngineLogStore::try_new(LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) + let logstore = RaftEngineLogStore::try_new( + dir.path().to_str().unwrap().to_string(), + WalConfig::default(), + ) .await .unwrap(); @@ -464,10 +464,10 @@ mod tests { async fn test_reopen() { let dir = create_temp_dir("raft-engine-logstore-reopen-test"); { - let logstore = RaftEngineLogStore::try_new(LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) + let logstore = RaftEngineLogStore::try_new( + dir.path().to_str().unwrap().to_string(), + WalConfig::default(), + ) .await .unwrap(); assert!(logstore @@ -484,10 +484,10 @@ mod tests { logstore.stop().await.unwrap(); } - let logstore = RaftEngineLogStore::try_new(LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - ..Default::default() - }) + let logstore = RaftEngineLogStore::try_new( + dir.path().to_str().unwrap().to_string(), + WalConfig::default(), + ) .await .unwrap(); @@ -519,16 +519,16 @@ mod tests { async fn test_compaction() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("raft-engine-logstore-test"); + let path = dir.path().to_str().unwrap().to_string(); - let config = LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - file_size: ReadableSize::mb(2).0, - purge_threshold: ReadableSize::mb(4).0, + let config = WalConfig { + file_size: ReadableSize::mb(2), + purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..4096 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -551,16 +551,16 @@ mod tests { async fn test_obsolete() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("raft-engine-logstore-test"); + let path = dir.path().to_str().unwrap().to_string(); - let config = LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - file_size: ReadableSize::mb(2).0, - purge_threshold: ReadableSize::mb(4).0, + let config = WalConfig { + file_size: ReadableSize::mb(2), + purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let namespace = Namespace::with_id(42); for id in 0..1024 { let entry = Entry::create(id, namespace.id(), [b'x'; 4096].to_vec()); @@ -580,16 +580,16 @@ mod tests { async fn test_append_batch() { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); + let path = dir.path().to_str().unwrap().to_string(); - let config = LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - file_size: ReadableSize::mb(2).0, - purge_threshold: ReadableSize::mb(4).0, + let config = WalConfig { + file_size: ReadableSize::mb(2), + purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let entries = (0..8) .flat_map(|ns_id| { @@ -612,15 +612,15 @@ mod tests { common_telemetry::init_default_ut_logging(); let dir = create_temp_dir("logstore-append-batch-test"); - let config = LogConfig { - log_file_dir: dir.path().to_str().unwrap().to_string(), - file_size: ReadableSize::mb(2).0, - purge_threshold: ReadableSize::mb(4).0, + let path = dir.path().to_str().unwrap().to_string(); + let config = WalConfig { + file_size: ReadableSize::mb(2), + purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), ..Default::default() }; - let logstore = RaftEngineLogStore::try_new(config).await.unwrap(); + let logstore = RaftEngineLogStore::try_new(path, config).await.unwrap(); let entries = vec![ Entry::create(0, 0, [b'0'; 4096].to_vec()), diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index e97b9cb1ce..652c899c88 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -14,15 +14,17 @@ use std::path::Path; +use common_base::readable_size::ReadableSize; +use common_config::WalConfig; + use crate::raft_engine::log_store::RaftEngineLogStore; -use crate::LogConfig; /// Create a write log for the provided path, used for test. pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEngineLogStore { - let cfg = LogConfig { - file_size: 128 * 1024, - log_file_dir: path.as_ref().display().to_string(), + let path = path.as_ref().display().to_string(); + let cfg = WalConfig { + file_size: ReadableSize::kb(128), ..Default::default() }; - RaftEngineLogStore::try_new(cfg).await.unwrap() + RaftEngineLogStore::try_new(path, cfg).await.unwrap() } diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index af1ae7e61b..f455c53a0e 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -35,6 +35,7 @@ datafusion-common.workspace = true datafusion.workspace = true datatypes = { workspace = true } futures.workspace = true +humantime-serde = { workspace = true } lazy_static = "1.4" log-store = { workspace = true } memcomparable = "0.2" diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 99b9ff9ffb..868b6d6647 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -19,6 +19,7 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_datasource::compression::CompressionType; use common_telemetry::warn; +use serde::{Deserialize, Serialize}; /// Default region worker num. const DEFAULT_NUM_WORKERS: usize = 1; @@ -28,7 +29,7 @@ const DEFAULT_MAX_BG_JOB: usize = 4; pub(crate) const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(32); /// Configuration for [MitoEngine](crate::engine::MitoEngine). -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct MitoConfig { // Worker configs: /// Number of region workers (default 1). @@ -51,6 +52,7 @@ pub struct MitoConfig { // Flush configs: /// Interval to auto flush a region if it has not flushed yet (default 30 min). + #[serde(with = "humantime_serde")] pub auto_flush_interval: Duration, /// Global write buffer size threshold to trigger flush (default 512M). pub global_write_buffer_size: ReadableSize, diff --git a/src/script/Cargo.toml b/src/script/Cargo.toml index 20dc96b09f..bc002c681e 100644 --- a/src/script/Cargo.toml +++ b/src/script/Cargo.toml @@ -28,6 +28,7 @@ arrow.workspace = true async-trait.workspace = true catalog = { workspace = true } common-catalog = { workspace = true } +common-config = { workspace = true } common-error = { workspace = true } common-function = { workspace = true } common-query = { workspace = true } diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs index f202ae60c6..9a7b4130e9 100644 --- a/src/script/src/manager.rs +++ b/src/script/src/manager.rs @@ -123,35 +123,32 @@ impl ScriptManager { #[cfg(test)] mod tests { use catalog::CatalogManager; - use mito::config::EngineConfig as TableEngineConfig; - use mito::table::test_util::new_test_object_store; - use query::QueryEngineFactory; - use table::engine::manager::MemoryTableEngineManager; - - use super::*; - type DefaultEngine = MitoEngine>; - + use common_config::WalConfig; use common_test_util::temp_dir::create_temp_dir; use log_store::raft_engine::log_store::RaftEngineLogStore; - use log_store::LogConfig; + use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; + use mito::table::test_util::new_test_object_store; + use query::QueryEngineFactory; use storage::compaction::noop::NoopCompactionScheduler; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; + use table::engine::manager::MemoryTableEngineManager; + + use super::*; + + type DefaultEngine = MitoEngine>; #[tokio::test] async fn test_insert_find_compile_script() { let wal_dir = create_temp_dir("test_insert_find_compile_script_wal"); - let wal_dir_str = wal_dir.path().to_string_lossy(); + let wal_dir_str = wal_dir.path().to_string_lossy().to_string(); common_telemetry::init_default_ut_logging(); let (_dir, object_store) = new_test_object_store("test_insert_find_compile_script").await; - let log_config = LogConfig { - log_file_dir: wal_dir_str.to_string(), - ..Default::default() - }; - - let log_store = RaftEngineLogStore::try_new(log_config).await.unwrap(); + let log_store = RaftEngineLogStore::try_new(wal_dir_str, WalConfig::default()) + .await + .unwrap(); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); let mock_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index ce6eb012b4..547a76019b 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -14,6 +14,7 @@ async-stream.workspace = true async-trait = "0.1" bytes = "1.1" common-base = { workspace = true } +common-config = { workspace = true } common-datasource = { workspace = true } common-error = { workspace = true } common-query = { workspace = true } diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs index 45ddc177b5..c71a23feb7 100644 --- a/src/storage/src/test_util/config_util.rs +++ b/src/storage/src/test_util/config_util.rs @@ -14,9 +14,9 @@ use std::sync::Arc; +use common_config::WalConfig; use common_datasource::compression::CompressionType; use log_store::raft_engine::log_store::RaftEngineLogStore; -use log_store::LogConfig; use object_store::services::Fs; use object_store::ObjectStore; use store_api::manifest::Manifest; @@ -91,11 +91,11 @@ pub async fn new_store_config_with_object_store( None, ); manifest.start().await.unwrap(); - let log_config = LogConfig { - log_file_dir: log_store_dir(store_dir), - ..Default::default() - }; - let log_store = Arc::new(RaftEngineLogStore::try_new(log_config).await.unwrap()); + let log_store = Arc::new( + RaftEngineLogStore::try_new(log_store_dir(store_dir), WalConfig::default()) + .await + .unwrap(), + ); let compaction_scheduler = Arc::new(LocalScheduler::new( SchedulerConfig::default(),