From 208afe402b4b38b3b6c7973017a01f1728504f9d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 9 Sep 2024 12:25:24 +0800 Subject: [PATCH] feat(wal): increase recovery parallelism (#4689) * Refactor RaftEngineLogStore to use references for config - Updated `RaftEngineLogStore::try_new` to accept a reference to `RaftEngineConfig` instead of taking ownership. - Replaced direct usage of `config` with individual fields (`sync_write`, `sync_period`, `read_batch_size`). - Adjusted test cases to pass references to `RaftEngineConfig`. * Add parallelism configuration for WAL recovery - Introduced `recovery_parallelism` setting in `datanode.example.toml` and `standalone.example.toml` for configuring parallelism during WAL recovery. - Updated `Cargo.lock` and `Cargo.toml` to include `num_cpus` dependency. - Modified `RaftEngineConfig` to include `recovery_parallelism` with a default value set to the number of CP * feat/wal-recovery-parallelism: Add `wal.recovery_parallelism` configuration option - Introduced `wal.recovery_parallelism` to config.md for specifying parallelism during WAL recovery. - Updated `RaftEngineLogStore` to include `recovery_threads` from the new configuration. * fix: ut --- Cargo.lock | 1 + config/config.md | 2 ++ config/datanode.example.toml | 3 ++ config/standalone.example.toml | 3 ++ src/cmd/tests/load_config_test.rs | 2 ++ src/common/wal/Cargo.toml | 1 + src/common/wal/src/config/raft_engine.rs | 3 ++ src/datanode/src/datanode.rs | 2 +- src/log-store/src/raft_engine/log_store.rs | 34 ++++++++++++------- src/log-store/src/test_util/log_store_util.rs | 2 +- tests-integration/tests/http.rs | 1 + 11 files changed, 39 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d483ec7088..7065971231 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2293,6 +2293,7 @@ dependencies = [ "common-telemetry", "futures-util", "humantime-serde", + "num_cpus", "rskafka", "rustls 0.23.10", "rustls-native-certs", diff --git a/config/config.md b/config/config.md index f0ee9e54f8..a792be5de5 100644 --- a/config/config.md +++ b/config/config.md @@ -68,6 +68,7 @@ | `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. | | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | +| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | | `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` | | `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. | @@ -381,6 +382,7 @@ | `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. | | `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. | | `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. | +| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. | | `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. | | `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. | | `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.
**It's only used when the provider is `kafka`**. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 07c1df3e2a..14fbf914e7 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -170,6 +170,9 @@ prefill_log_files = false ## **It's only used when the provider is `raft_engine`**. sync_period = "10s" +## Parallelism during WAL recovery. +recovery_parallelism = 2 + ## The Kafka broker endpoints. ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index f36c0e2904..f7c7b2af29 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -174,6 +174,9 @@ prefill_log_files = false ## **It's only used when the provider is `raft_engine`**. sync_period = "10s" +## Parallelism during WAL recovery. +recovery_parallelism = 2 + ## The Kafka broker endpoints. ## **It's only used when the provider is `kafka`**. broker_endpoints = ["127.0.0.1:9092"] diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs index 199e23717d..78d0786f7c 100644 --- a/src/cmd/tests/load_config_test.rs +++ b/src/cmd/tests/load_config_test.rs @@ -65,6 +65,7 @@ fn test_load_datanode_example_config() { wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some("/tmp/greptimedb/wal".to_string()), sync_period: Some(Duration::from_secs(10)), + recovery_parallelism: 2, ..Default::default() }), storage: StorageConfig { @@ -207,6 +208,7 @@ fn test_load_standalone_example_config() { wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig { dir: Some("/tmp/greptimedb/wal".to_string()), sync_period: Some(Duration::from_secs(10)), + recovery_parallelism: 2, ..Default::default() }), region_engine: vec![ diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml index 0bced0dd38..202b2825e3 100644 --- a/src/common/wal/Cargo.toml +++ b/src/common/wal/Cargo.toml @@ -17,6 +17,7 @@ common-macro.workspace = true common-telemetry.workspace = true futures-util.workspace = true humantime-serde.workspace = true +num_cpus.workspace = true rskafka.workspace = true rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] } rustls-native-certs = "0.7" diff --git a/src/common/wal/src/config/raft_engine.rs b/src/common/wal/src/config/raft_engine.rs index f54e3f1ba5..af5daa9d38 100644 --- a/src/common/wal/src/config/raft_engine.rs +++ b/src/common/wal/src/config/raft_engine.rs @@ -41,6 +41,8 @@ pub struct RaftEngineConfig { /// Duration for fsyncing log files. #[serde(with = "humantime_serde")] pub sync_period: Option, + /// Parallelism during log recovery. + pub recovery_parallelism: usize, } impl Default for RaftEngineConfig { @@ -55,6 +57,7 @@ impl Default for RaftEngineConfig { enable_log_recycle: true, prefill_log_files: false, sync_period: None, + recovery_parallelism: num_cpus::get(), } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 09bf901d37..149aa44ebe 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -454,7 +454,7 @@ impl DatanodeBuilder { "Creating raft-engine logstore with config: {:?} and storage path: {}", config, &wal_dir ); - let logstore = RaftEngineLogStore::try_new(wal_dir, config.clone()) + let logstore = RaftEngineLogStore::try_new(wal_dir, config) .await .map_err(Box::new) .context(OpenLogStoreSnafu)?; diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 8e9ceec710..a4db95cd57 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -16,6 +16,7 @@ use std::collections::{hash_map, HashMap}; use std::fmt::{Debug, Formatter}; use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; +use std::time::Duration; use async_stream::stream; use common_runtime::{RepeatedTask, TaskFunction}; @@ -40,7 +41,9 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; const NAMESPACE_PREFIX: &str = "$sys/"; pub struct RaftEngineLogStore { - config: RaftEngineConfig, + sync_write: bool, + sync_period: Option, + read_batch_size: usize, engine: Arc, gc_task: RepeatedTask, last_sync_time: AtomicI64, @@ -76,7 +79,7 @@ impl TaskFunction for PurgeExpiredFilesFunction { } impl RaftEngineLogStore { - pub async fn try_new(dir: String, config: RaftEngineConfig) -> Result { + pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result { let raft_engine_config = Config { dir, purge_threshold: ReadableSize(config.purge_threshold.0), @@ -85,6 +88,7 @@ impl RaftEngineLogStore { target_file_size: ReadableSize(config.file_size.0), enable_log_recycle: config.enable_log_recycle, prefill_for_recycle: config.prefill_log_files, + recovery_threads: config.recovery_parallelism, ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); @@ -96,7 +100,9 @@ impl RaftEngineLogStore { ); let log_store = Self { - config, + sync_write: config.sync_write, + sync_period: config.sync_period, + read_batch_size: config.read_batch_size, engine, gc_task, last_sync_time: AtomicI64::new(0), @@ -196,7 +202,9 @@ impl RaftEngineLogStore { impl Debug for RaftEngineLogStore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("RaftEngineLogsStore") - .field("config", &self.config) + .field("sync_write", &self.sync_write) + .field("sync_period", &self.sync_period) + .field("read_batch_size", &self.read_batch_size) .field("started", &self.gc_task.started()) .finish() } @@ -228,9 +236,9 @@ impl LogStore for RaftEngineLogStore { let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?; - let mut sync = self.config.sync_write; + let mut sync = self.sync_write; - if let Some(sync_period) = &self.config.sync_period { + if let Some(sync_period) = &self.sync_period { let now = common_time::util::current_time_millis(); if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 { self.last_sync_time.store(now, Ordering::Relaxed); @@ -276,7 +284,7 @@ impl LogStore for RaftEngineLogStore { entry_id, self.span(ns) ); - let max_batch_size = self.config.read_batch_size; + let max_batch_size = self.read_batch_size; let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size); let _handle = common_runtime::spawn_global(async move { while start_index <= last_index { @@ -489,7 +497,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - RaftEngineConfig::default(), + &RaftEngineConfig::default(), ) .await .unwrap(); @@ -502,7 +510,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - RaftEngineConfig::default(), + &RaftEngineConfig::default(), ) .await .unwrap(); @@ -528,7 +536,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - RaftEngineConfig::default(), + &RaftEngineConfig::default(), ) .await .unwrap(); @@ -570,7 +578,7 @@ mod tests { { let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - RaftEngineConfig::default(), + &RaftEngineConfig::default(), ) .await .unwrap(); @@ -590,7 +598,7 @@ mod tests { let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - RaftEngineConfig::default(), + &RaftEngineConfig::default(), ) .await .unwrap(); @@ -634,7 +642,7 @@ mod tests { ..Default::default() }; - RaftEngineLogStore::try_new(path, config).await.unwrap() + RaftEngineLogStore::try_new(path, &config).await.unwrap() } #[tokio::test] diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index b1fd183fba..98d419acc4 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -29,7 +29,7 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng file_size: ReadableSize::kb(128), ..Default::default() }; - RaftEngineLogStore::try_new(path, cfg).await.unwrap() + RaftEngineLogStore::try_new(path, &cfg).await.unwrap() } /// Create a [KafkaLogStore]. diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 56307e0427..fe28387cd6 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -906,6 +906,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String { "metadata_cache_size =", "content_cache_size =", "name =", + "recovery_parallelism =", ]; input