diff --git a/Cargo.lock b/Cargo.lock
index d483ec7088..7065971231 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2293,6 +2293,7 @@ dependencies = [
"common-telemetry",
"futures-util",
"humantime-serde",
+ "num_cpus",
"rskafka",
"rustls 0.23.10",
"rustls-native-certs",
diff --git a/config/config.md b/config/config.md
index f0ee9e54f8..a792be5de5 100644
--- a/config/config.md
+++ b/config/config.md
@@ -68,6 +68,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. |
+| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. |
| `wal.auto_create_topics` | Bool | `true` | Automatically create topics for WAL.
Set to `true` to automatically create topics for WAL.
Otherwise, use topics named `topic_name_prefix_[0..num_topics)` |
| `wal.num_topics` | Integer | `64` | Number of topics.
**It's only used when the provider is `kafka`**. |
@@ -381,6 +382,7 @@
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. |
| `wal.prefill_log_files` | Bool | `false` | Whether to pre-create log files on start up.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_period` | String | `10s` | Duration for fsyncing log files.
**It's only used when the provider is `raft_engine`**. |
+| `wal.recovery_parallelism` | Integer | `2` | Parallelism during WAL recovery. |
| `wal.broker_endpoints` | Array | -- | The Kafka broker endpoints.
**It's only used when the provider is `kafka`**. |
| `wal.max_batch_bytes` | String | `1MB` | The max size of a single producer batch.
Warning: Kafka has a default limit of 1MB per message in a topic.
**It's only used when the provider is `kafka`**. |
| `wal.consumer_wait_timeout` | String | `100ms` | The consumer wait timeout.
**It's only used when the provider is `kafka`**. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 07c1df3e2a..14fbf914e7 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -170,6 +170,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"
+## Parallelism during WAL recovery.
+recovery_parallelism = 2
+
## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index f36c0e2904..f7c7b2af29 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -174,6 +174,9 @@ prefill_log_files = false
## **It's only used when the provider is `raft_engine`**.
sync_period = "10s"
+## Parallelism during WAL recovery.
+recovery_parallelism = 2
+
## The Kafka broker endpoints.
## **It's only used when the provider is `kafka`**.
broker_endpoints = ["127.0.0.1:9092"]
diff --git a/src/cmd/tests/load_config_test.rs b/src/cmd/tests/load_config_test.rs
index 199e23717d..78d0786f7c 100644
--- a/src/cmd/tests/load_config_test.rs
+++ b/src/cmd/tests/load_config_test.rs
@@ -65,6 +65,7 @@ fn test_load_datanode_example_config() {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
+ recovery_parallelism: 2,
..Default::default()
}),
storage: StorageConfig {
@@ -207,6 +208,7 @@ fn test_load_standalone_example_config() {
wal: DatanodeWalConfig::RaftEngine(RaftEngineConfig {
dir: Some("/tmp/greptimedb/wal".to_string()),
sync_period: Some(Duration::from_secs(10)),
+ recovery_parallelism: 2,
..Default::default()
}),
region_engine: vec![
diff --git a/src/common/wal/Cargo.toml b/src/common/wal/Cargo.toml
index 0bced0dd38..202b2825e3 100644
--- a/src/common/wal/Cargo.toml
+++ b/src/common/wal/Cargo.toml
@@ -17,6 +17,7 @@ common-macro.workspace = true
common-telemetry.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
+num_cpus.workspace = true
rskafka.workspace = true
rustls = { version = "0.23", default-features = false, features = ["ring", "logging", "std", "tls12"] }
rustls-native-certs = "0.7"
diff --git a/src/common/wal/src/config/raft_engine.rs b/src/common/wal/src/config/raft_engine.rs
index f54e3f1ba5..af5daa9d38 100644
--- a/src/common/wal/src/config/raft_engine.rs
+++ b/src/common/wal/src/config/raft_engine.rs
@@ -41,6 +41,8 @@ pub struct RaftEngineConfig {
/// Duration for fsyncing log files.
#[serde(with = "humantime_serde")]
pub sync_period: Option,
+ /// Parallelism during log recovery.
+ pub recovery_parallelism: usize,
}
impl Default for RaftEngineConfig {
@@ -55,6 +57,7 @@ impl Default for RaftEngineConfig {
enable_log_recycle: true,
prefill_log_files: false,
sync_period: None,
+ recovery_parallelism: num_cpus::get(),
}
}
}
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 09bf901d37..149aa44ebe 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -454,7 +454,7 @@ impl DatanodeBuilder {
"Creating raft-engine logstore with config: {:?} and storage path: {}",
config, &wal_dir
);
- let logstore = RaftEngineLogStore::try_new(wal_dir, config.clone())
+ let logstore = RaftEngineLogStore::try_new(wal_dir, config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)?;
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index 8e9ceec710..a4db95cd57 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -16,6 +16,7 @@ use std::collections::{hash_map, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use async_stream::stream;
use common_runtime::{RepeatedTask, TaskFunction};
@@ -40,7 +41,9 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl};
const NAMESPACE_PREFIX: &str = "$sys/";
pub struct RaftEngineLogStore {
- config: RaftEngineConfig,
+ sync_write: bool,
+ sync_period: Option,
+ read_batch_size: usize,
engine: Arc,
gc_task: RepeatedTask,
last_sync_time: AtomicI64,
@@ -76,7 +79,7 @@ impl TaskFunction for PurgeExpiredFilesFunction {
}
impl RaftEngineLogStore {
- pub async fn try_new(dir: String, config: RaftEngineConfig) -> Result {
+ pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result {
let raft_engine_config = Config {
dir,
purge_threshold: ReadableSize(config.purge_threshold.0),
@@ -85,6 +88,7 @@ impl RaftEngineLogStore {
target_file_size: ReadableSize(config.file_size.0),
enable_log_recycle: config.enable_log_recycle,
prefill_for_recycle: config.prefill_log_files,
+ recovery_threads: config.recovery_parallelism,
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
@@ -96,7 +100,9 @@ impl RaftEngineLogStore {
);
let log_store = Self {
- config,
+ sync_write: config.sync_write,
+ sync_period: config.sync_period,
+ read_batch_size: config.read_batch_size,
engine,
gc_task,
last_sync_time: AtomicI64::new(0),
@@ -196,7 +202,9 @@ impl RaftEngineLogStore {
impl Debug for RaftEngineLogStore {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RaftEngineLogsStore")
- .field("config", &self.config)
+ .field("sync_write", &self.sync_write)
+ .field("sync_period", &self.sync_period)
+ .field("read_batch_size", &self.read_batch_size)
.field("started", &self.gc_task.started())
.finish()
}
@@ -228,9 +236,9 @@ impl LogStore for RaftEngineLogStore {
let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;
- let mut sync = self.config.sync_write;
+ let mut sync = self.sync_write;
- if let Some(sync_period) = &self.config.sync_period {
+ if let Some(sync_period) = &self.sync_period {
let now = common_time::util::current_time_millis();
if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 {
self.last_sync_time.store(now, Ordering::Relaxed);
@@ -276,7 +284,7 @@ impl LogStore for RaftEngineLogStore {
entry_id,
self.span(ns)
);
- let max_batch_size = self.config.read_batch_size;
+ let max_batch_size = self.read_batch_size;
let (tx, mut rx) = tokio::sync::mpsc::channel(max_batch_size);
let _handle = common_runtime::spawn_global(async move {
while start_index <= last_index {
@@ -489,7 +497,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
- RaftEngineConfig::default(),
+ &RaftEngineConfig::default(),
)
.await
.unwrap();
@@ -502,7 +510,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
- RaftEngineConfig::default(),
+ &RaftEngineConfig::default(),
)
.await
.unwrap();
@@ -528,7 +536,7 @@ mod tests {
let dir = create_temp_dir("raft-engine-logstore-test");
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
- RaftEngineConfig::default(),
+ &RaftEngineConfig::default(),
)
.await
.unwrap();
@@ -570,7 +578,7 @@ mod tests {
{
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
- RaftEngineConfig::default(),
+ &RaftEngineConfig::default(),
)
.await
.unwrap();
@@ -590,7 +598,7 @@ mod tests {
let logstore = RaftEngineLogStore::try_new(
dir.path().to_str().unwrap().to_string(),
- RaftEngineConfig::default(),
+ &RaftEngineConfig::default(),
)
.await
.unwrap();
@@ -634,7 +642,7 @@ mod tests {
..Default::default()
};
- RaftEngineLogStore::try_new(path, config).await.unwrap()
+ RaftEngineLogStore::try_new(path, &config).await.unwrap()
}
#[tokio::test]
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index b1fd183fba..98d419acc4 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -29,7 +29,7 @@ pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEng
file_size: ReadableSize::kb(128),
..Default::default()
};
- RaftEngineLogStore::try_new(path, cfg).await.unwrap()
+ RaftEngineLogStore::try_new(path, &cfg).await.unwrap()
}
/// Create a [KafkaLogStore].
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 56307e0427..fe28387cd6 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -906,6 +906,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"metadata_cache_size =",
"content_cache_size =",
"name =",
+ "recovery_parallelism =",
];
input