From 0d660e45cf71fc233d0c29e22840c7121b82863a Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" Date: Mon, 9 Jan 2023 13:02:30 +0800 Subject: [PATCH] feat: wal config --- config/datanode.example.toml | 8 +++++ config/standalone.example.toml | 10 +++++- src/cmd/src/datanode.rs | 4 +-- src/cmd/src/standalone.rs | 8 ++--- src/datanode/src/datanode.rs | 33 +++++++++++++++++-- src/datanode/src/instance.rs | 23 +++++++------ src/datanode/src/mock.rs | 2 +- src/datanode/src/tests/test_util.rs | 7 ++-- src/frontend/src/tests.rs | 12 +++++-- src/log-store/src/config.rs | 15 ++++----- src/log-store/src/raft_engine/log_store.rs | 12 +++---- src/log-store/src/test_util/log_store_util.rs | 3 +- tests-integration/src/test_util.rs | 7 ++-- 13 files changed, 100 insertions(+), 44 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a8ed29da4b..8a4999d580 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -7,6 +7,14 @@ mysql_addr = '127.0.0.1:4406' mysql_runtime_size = 4 enable_memory_catalog = false +[wal] +dir = "/tmp/greptimedb/wal" +file_size = 1073741824 +purge_interval = 600 +purge_threshold = 53687091200 +read_batch_size = 128 +sync_write = false + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 54587a6e4d..f2cca574da 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -1,12 +1,20 @@ node_id = 0 mode = 'standalone' -wal_dir = '/tmp/greptimedb/wal/' enable_memory_catalog = false [http_options] addr = '127.0.0.1:4000' timeout = "30s" +[wal] +dir = "/tmp/greptimedb/wal" +file_size = 1073741824 +purge_interval = 600 +purge_threshold = 53687091200 +read_batch_size = 128 +sync_write = false + + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 4afc1ea619..ea454b2258 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -125,7 +125,7 @@ impl TryFrom for DatanodeOptions { } if let Some(wal_dir) = cmd.wal_dir { - opts.wal_dir = wal_dir; + opts.wal.dir = wal_dir; } Ok(opts) } @@ -151,7 +151,7 @@ mod tests { }; let options: DatanodeOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); - assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir); + assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal.dir); assert_eq!("127.0.0.1:4406".to_string(), options.mysql_addr); assert_eq!(4, options.mysql_runtime_size); let MetaClientOpts { diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 83809f0b43..a19a070602 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use clap::Parser; use common_telemetry::info; -use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; @@ -72,7 +72,7 @@ pub struct StandaloneOptions { pub influxdb_options: Option, pub prometheus_options: Option, pub mode: Mode, - pub wal_dir: String, + pub wal: WalConfig, pub storage: ObjectStoreConfig, pub enable_memory_catalog: bool, } @@ -88,7 +88,7 @@ impl Default for StandaloneOptions { influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), mode: Mode::Standalone, - wal_dir: "/tmp/greptimedb/wal".to_string(), + wal: WalConfig::default(), storage: ObjectStoreConfig::default(), enable_memory_catalog: false, } @@ -112,7 +112,7 @@ impl StandaloneOptions { fn datanode_options(self) -> DatanodeOptions { DatanodeOptions { - wal_dir: self.wal_dir, + wal: self.wal, storage: self.storage, enable_memory_catalog: self.enable_memory_catalog, ..Default::default() diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index ccc8b0d3c6..7551bce122 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -45,6 +45,35 @@ impl Default for ObjectStoreConfig { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalConfig { + // wal directory + pub dir: String, + // wal file size in bytes + pub file_size: usize, + // wal purge threshold in bytes + pub purge_threshold: usize, + // purge interval in seconds + pub purge_interval: u64, + // read batch size + pub read_batch_size: usize, + // whether to sync log file after every write + pub sync_write: bool, +} + +impl Default for WalConfig { + fn default() -> Self { + Self { + dir: "/tmp/greptimedb/wal".to_string(), + file_size: 1024 * 1024 * 1024, // log file size 1G + purge_threshold: 1024 * 1024 * 1024 * 50, // purge threshold 50G + purge_interval: 600, + read_batch_size: 128, + sync_write: false, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DatanodeOptions { pub node_id: Option, @@ -53,7 +82,7 @@ pub struct DatanodeOptions { pub mysql_addr: String, pub mysql_runtime_size: usize, pub meta_client_opts: Option, - pub wal_dir: String, + pub wal: WalConfig, pub storage: ObjectStoreConfig, pub enable_memory_catalog: bool, pub mode: Mode, @@ -68,7 +97,7 @@ impl Default for DatanodeOptions { mysql_addr: "127.0.0.1:4406".to_string(), mysql_runtime_size: 2, meta_client_opts: None, - wal_dir: "/tmp/greptimedb/wal".to_string(), + wal: WalConfig::default(), storage: ObjectStoreConfig::default(), enable_memory_catalog: false, mode: Mode::Standalone, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index fc4ff605fe..05259fd2dc 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -38,7 +38,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::table::TableIdProviderRef; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, Result, @@ -68,7 +68,7 @@ pub type InstanceRef = Arc; impl Instance { pub async fn new(opts: &DatanodeOptions) -> Result { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal).await?); let meta_client = match opts.mode { Mode::Standalone => None, @@ -271,16 +271,19 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul Ok(meta_client) } -pub(crate) async fn create_log_store(path: impl AsRef) -> Result { - let path = path.as_ref(); +pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result { // create WAL directory - fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?; - - info!("The WAL directory is: {}", path); - + fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu { + dir: &wal_config.dir, + })?; + info!("Creating logstore with config: {:?}", wal_config); let log_config = LogConfig { - log_file_dir: path.to_string(), - ..Default::default() + file_size: wal_config.file_size, + log_file_dir: wal_config.dir.clone(), + purge_interval: Duration::from_secs(wal_config.purge_interval), + purge_threshold: wal_config.purge_threshold, + read_batch_size: wal_config.read_batch_size, + sync_write: wal_config.sync_write, }; let logstore = RaftEngineLogStore::try_new(log_config) diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 6715911d23..7c14ab1ea6 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -41,7 +41,7 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal).await?); let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 9fba0ff5f1..121d21d960 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -28,7 +28,7 @@ use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; use tempdir::TempDir; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; use crate::sql::SqlHandler; @@ -43,7 +43,10 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap(); let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap(); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index d6a074373b..f2969de00b 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend; use client::Client; use common_grpc::channel_manager::ChannelManager; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::instance::Instance as DatanodeInstance; use meta_client::client::MetaClientBuilder; use meta_client::rpc::Peer; @@ -62,7 +62,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap(); let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap(); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, @@ -142,7 +145,10 @@ async fn create_dist_datanode_instance( let data_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-data-{current}")).unwrap(); let opts = DatanodeOptions { node_id: Some(datanode_id), - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, diff --git a/src/log-store/src/config.rs b/src/log-store/src/config.rs index d6e0063237..9dd55520bd 100644 --- a/src/log-store/src/config.rs +++ b/src/log-store/src/config.rs @@ -16,10 +16,9 @@ use std::time::Duration; #[derive(Debug, Clone)] pub struct LogConfig { - pub append_buffer_size: usize, - pub max_log_file_size: usize, + pub file_size: usize, pub log_file_dir: String, - pub gc_interval: Duration, + pub purge_interval: Duration, pub purge_threshold: usize, pub read_batch_size: usize, pub sync_write: bool, @@ -30,10 +29,9 @@ impl Default for LogConfig { /// in tests. fn default() -> Self { Self { - append_buffer_size: 128, - max_log_file_size: 1024 * 1024 * 1024, + file_size: 1024 * 1024 * 1024, log_file_dir: "/tmp/greptimedb".to_string(), - gc_interval: Duration::from_secs(10 * 60), + purge_interval: Duration::from_secs(10 * 60), purge_threshold: 1024 * 1024 * 1024 * 50, read_batch_size: 128, sync_write: false, @@ -52,9 +50,8 @@ mod tests { common_telemetry::logging::init_default_ut_logging(); let default = LogConfig::default(); info!("LogConfig::default(): {:?}", default); - assert_eq!(1024 * 1024 * 1024, default.max_log_file_size); - assert_eq!(128, default.append_buffer_size); - assert_eq!(Duration::from_secs(600), default.gc_interval); + assert_eq!(1024 * 1024 * 1024, default.file_size); + assert_eq!(Duration::from_secs(600), default.purge_interval); assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold); assert_eq!(128, default.read_batch_size); assert!(!default.sync_write); diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 8d03594c55..f8c1c50358 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -54,7 +54,7 @@ impl RaftEngineLogStore { purge_threshold: ReadableSize(config.purge_threshold as u64), recovery_mode: RecoveryMode::TolerateTailCorruption, batch_compression_threshold: ReadableSize::kb(8), - target_file_size: ReadableSize(config.max_log_file_size as u64), + target_file_size: ReadableSize(config.file_size as u64), ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); @@ -75,7 +75,7 @@ impl RaftEngineLogStore { async fn start(&self) -> Result<(), Error> { let engine_clone = self.engine.clone(); - let interval = self.config.gc_interval; + let interval = self.config.purge_interval; let token = CancellationToken::new(); let child = token.child_token(); // TODO(hl): Maybe spawn to a blocking runtime. @@ -495,9 +495,9 @@ mod tests { let config = LogConfig { log_file_dir: dir.path().to_str().unwrap().to_string(), - max_log_file_size: ReadableSize::mb(2).0 as usize, + file_size: ReadableSize::mb(2).0 as usize, purge_threshold: ReadableSize::mb(4).0 as usize, - gc_interval: Duration::from_secs(5), + purge_interval: Duration::from_secs(5), ..Default::default() }; @@ -528,9 +528,9 @@ mod tests { let config = LogConfig { log_file_dir: dir.path().to_str().unwrap().to_string(), - max_log_file_size: ReadableSize::mb(2).0 as usize, + file_size: ReadableSize::mb(2).0 as usize, purge_threshold: ReadableSize::mb(4).0 as usize, - gc_interval: Duration::from_secs(5), + purge_interval: Duration::from_secs(5), ..Default::default() }; diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index 81a999a713..a6ceedc23c 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -22,8 +22,7 @@ use crate::LogConfig; pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) { let dir = TempDir::new(dir).unwrap(); let cfg = LogConfig { - append_buffer_size: 128, - max_log_file_size: 128, + file_size: 128 * 1024, log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 67feb62269..6c2c9c5042 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -23,7 +23,7 @@ use axum::Router; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; @@ -147,7 +147,10 @@ pub fn create_tmp_dir_and_datanode_opts( let (storage, data_tmp_dir) = get_test_store_config(&store_type, name); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage, mode: Mode::Standalone, ..Default::default()