diff --git a/Cargo.lock b/Cargo.lock
index f89045bb6b..aa00fcf081 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1967,10 +1967,12 @@ dependencies = [
"common-wal",
"config",
"datanode",
+ "humantime-serde",
"meta-client",
"num_cpus",
"serde",
"serde_json",
+ "serde_with",
"snafu 0.8.5",
"sysinfo",
"temp-env",
@@ -4340,7 +4342,6 @@ dependencies = [
"promql-parser",
"prost 0.13.3",
"query",
- "raft-engine",
"serde",
"serde_json",
"servers",
@@ -6230,6 +6231,7 @@ dependencies = [
"bytes",
"chrono",
"common-base",
+ "common-config",
"common-error",
"common-macro",
"common-meta",
diff --git a/config/config.md b/config/config.md
index 807a8141ba..97a227635f 100644
--- a/config/config.md
+++ b/config/config.md
@@ -65,8 +65,8 @@
| `wal.provider` | String | `raft_engine` | The provider of the WAL.
- `raft_engine`: the wal is stored in the local file system by raft-engine.
- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.dir` | String | Unset | The directory to store the WAL files.
**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.
**It's only used when the provider is `raft_engine`**. |
-| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.
**It's only used when the provider is `raft_engine`**. |
-| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.
**It's only used when the provider is `raft_engine`**. |
+| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.
**It's only used when the provider is `raft_engine`**. |
+| `wal.purge_interval` | String | `1m` | The interval to trigger a purge.
**It's only used when the provider is `raft_engine`**. |
| `wal.read_batch_size` | Integer | `128` | The read batch size.
**It's only used when the provider is `raft_engine`**. |
| `wal.sync_write` | Bool | `false` | Whether to use sync write.
**It's only used when the provider is `raft_engine`**. |
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.
**It's only used when the provider is `raft_engine`**. |
@@ -88,8 +88,9 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.
**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.
**It's only used when the provider is `kafka`**.
This option ensures that when Kafka messages are deleted, the system
can still successfully replay memtable data without throwing an
out-of-range error.
However, enabling this option might lead to unexpected data loss,
as the system will skip over missing entries instead of treating
them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
-| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
-| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
+| `metadata_store.file_size` | String | `64MB` | The size of the metadata store log file. |
+| `metadata_store.purge_threshold` | String | `256MB` | The threshold of the metadata store size to trigger a purge. |
+| `metadata_store.purge_interval` | String | `1m` | The interval of the metadata store to trigger a purge. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 3d6ad64a3e..d751f23a71 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -159,11 +159,11 @@ dir = "/tmp/greptimedb/wal"
## **It's only used when the provider is `raft_engine`**.
file_size = "128MB"
-## The threshold of the WAL size to trigger a flush.
+## The threshold of the WAL size to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB"
-## The interval to trigger a flush.
+## The interval to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m"
@@ -278,10 +278,12 @@ overwrite_entry_start_id = false
## Metadata storage options.
[metadata_store]
-## Kv file size in bytes.
-file_size = "256MB"
-## Kv purge threshold.
-purge_threshold = "4GB"
+## The size of the metadata store log file.
+file_size = "64MB"
+## The threshold of the metadata store size to trigger a purge.
+purge_threshold = "256MB"
+## The interval of the metadata store to trigger a purge.
+purge_interval = "1m"
## Procedure storage options.
[procedure]
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index 23e27d4e71..5ef998e082 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -486,8 +486,8 @@ impl StartCommand {
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
- opts.metadata_store.clone(),
- opts.procedure.clone(),
+ opts.metadata_store,
+ opts.procedure,
)
.await
.context(StartFrontendSnafu)?;
diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml
index ab9149e60a..988eae44df 100644
--- a/src/common/config/Cargo.toml
+++ b/src/common/config/Cargo.toml
@@ -12,9 +12,11 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
config.workspace = true
+humantime-serde.workspace = true
num_cpus.workspace = true
serde.workspace = true
serde_json.workspace = true
+serde_with.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true
diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs
index d6fb9abb45..13191a243c 100644
--- a/src/common/config/src/lib.rs
+++ b/src/common/config/src/lib.rs
@@ -16,6 +16,8 @@ pub mod config;
pub mod error;
pub mod utils;
+use std::time::Duration;
+
use common_base::readable_size::ReadableSize;
pub use config::*;
use serde::{Deserialize, Serialize};
@@ -34,22 +36,27 @@ pub enum Mode {
Distributed,
}
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {
- // Kv file size in bytes
+ /// The size of the metadata store backend log file.
pub file_size: ReadableSize,
- // Kv purge threshold in bytes
+ /// The threshold of the metadata store size to trigger a purge.
pub purge_threshold: ReadableSize,
+ /// The interval of the metadata store to trigger a purge.
+ #[serde(with = "humantime_serde")]
+ pub purge_interval: Duration,
}
impl Default for KvBackendConfig {
fn default() -> Self {
Self {
- // log file size 256MB
- file_size: ReadableSize::mb(256),
- // purge threshold 4GB
- purge_threshold: ReadableSize::gb(4),
+ // The log file size 64MB
+ file_size: ReadableSize::mb(64),
+ // The log purge threshold 256MB
+ purge_threshold: ReadableSize::mb(256),
+ // The log purge interval 1m
+ purge_interval: Duration::from_secs(60),
}
}
}
diff --git a/src/common/procedure/src/options.rs b/src/common/procedure/src/options.rs
index 503812de49..fe54cff1da 100644
--- a/src/common/procedure/src/options.rs
+++ b/src/common/procedure/src/options.rs
@@ -19,7 +19,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
-#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index b6aee42c5c..d0ce4a9d01 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -51,7 +51,6 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
-raft-engine.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index d2b9102929..1654544a48 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -40,7 +40,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
-use common_telemetry::{debug, error, tracing};
+use common_telemetry::{debug, error, info, tracing};
use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
@@ -55,7 +55,6 @@ use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::stats::StatementStatistics;
use query::QueryEngineRef;
-use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::export_metrics::ExportMetricsTask;
@@ -134,19 +133,15 @@ impl Instance {
kv_backend_config: KvBackendConfig,
procedure_config: ProcedureConfig,
) -> Result<(KvBackendRef, ProcedureManagerRef)> {
- let kv_backend = Arc::new(
- RaftEngineBackend::try_open_with_cfg(Config {
- dir,
- purge_threshold: ReadableSize(kv_backend_config.purge_threshold.0),
- recovery_mode: RecoveryMode::TolerateTailCorruption,
- batch_compression_threshold: ReadableSize::kb(8),
- target_file_size: ReadableSize(kv_backend_config.file_size.0),
- ..Default::default()
- })
- .map_err(BoxedError::new)
- .context(error::OpenRaftEngineBackendSnafu)?,
+ info!(
+ "Creating metadata kvbackend with config: {:?}",
+ kv_backend_config
);
+ let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
+ .map_err(BoxedError::new)
+ .context(error::OpenRaftEngineBackendSnafu)?;
+ let kv_backend = Arc::new(kv_backend);
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {
diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml
index d8cbc9b7ec..31571afe3e 100644
--- a/src/log-store/Cargo.toml
+++ b/src/log-store/Cargo.toml
@@ -18,6 +18,7 @@ async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
+common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs
index 37a9af18a4..456c6e2c88 100644
--- a/src/log-store/src/raft_engine/backend.rs
+++ b/src/log-store/src/raft_engine/backend.rs
@@ -17,8 +17,9 @@
use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::Path;
-use std::sync::RwLock;
+use std::sync::{Arc, RwLock};
+use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_meta::error as meta_error;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
@@ -30,16 +31,19 @@ use common_meta::rpc::store::{
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
-use raft_engine::{Config, Engine, LogBatch};
+use common_runtime::RepeatedTask;
+use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
use snafu::{IntoError, ResultExt};
-use crate::error::{self, IoSnafu, RaftEngineSnafu};
+use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu};
+use crate::raft_engine::log_store::PurgeExpiredFilesFunction;
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
/// RaftEngine based [KvBackend] implementation.
pub struct RaftEngineBackend {
- engine: RwLock,
+ engine: RwLock>,
+ _gc_task: RepeatedTask,
}
fn ensure_dir(dir: &str) -> error::Result<()> {
@@ -65,15 +69,34 @@ fn ensure_dir(dir: &str) -> error::Result<()> {
}
impl RaftEngineBackend {
- pub fn try_open_with_cfg(config: Config) -> error::Result {
- ensure_dir(&config.dir)?;
- if let Some(spill_dir) = &config.spill_dir {
+ pub fn try_open_with_cfg(dir: String, config: &KvBackendConfig) -> error::Result {
+ let cfg = Config {
+ dir: dir.to_string(),
+ purge_threshold: ReadableSize(config.purge_threshold.0),
+ recovery_mode: RecoveryMode::TolerateTailCorruption,
+ batch_compression_threshold: ReadableSize::kb(8),
+ target_file_size: ReadableSize(config.file_size.0),
+ ..Default::default()
+ };
+
+ ensure_dir(&dir)?;
+ if let Some(spill_dir) = &cfg.spill_dir {
ensure_dir(spill_dir)?;
}
- let engine = Engine::open(config).context(RaftEngineSnafu)?;
+ let engine = Arc::new(Engine::open(cfg).context(RaftEngineSnafu)?);
+ let gc_task = RepeatedTask::new(
+ config.purge_interval,
+ Box::new(PurgeExpiredFilesFunction {
+ engine: engine.clone(),
+ }),
+ );
+ gc_task
+ .start(common_runtime::global_runtime())
+ .context(StartGcTaskSnafu)?;
Ok(Self {
engine: RwLock::new(engine),
+ _gc_task: gc_task,
})
}
}
@@ -398,21 +421,11 @@ mod tests {
};
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
use common_test_util::temp_dir::create_temp_dir;
- use raft_engine::{Config, ReadableSize, RecoveryMode};
use super::*;
fn build_kv_backend(dir: String) -> RaftEngineBackend {
- let config = Config {
- dir,
- spill_dir: None,
- recovery_mode: RecoveryMode::AbsoluteConsistency,
- target_file_size: ReadableSize::mb(4),
- purge_threshold: ReadableSize::mb(16),
- ..Default::default()
- };
- let engine = RwLock::new(Engine::open(config).unwrap());
- RaftEngineBackend { engine }
+ RaftEngineBackend::try_open_with_cfg(dir, &KvBackendConfig::default()).unwrap()
}
#[tokio::test]
diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs
index 3c5ba75ef6..3e2ff5023b 100644
--- a/src/log-store/src/raft_engine/log_store.rs
+++ b/src/log-store/src/raft_engine/log_store.rs
@@ -50,7 +50,7 @@ pub struct RaftEngineLogStore {
}
pub struct PurgeExpiredFilesFunction {
- engine: Arc,
+ pub engine: Arc,
}
#[async_trait::async_trait]
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index fe12406cd1..f2d6c3df23 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -282,8 +282,8 @@ impl GreptimeDbStandaloneBuilder {
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
- kv_backend_config.clone(),
- procedure_config.clone(),
+ kv_backend_config,
+ procedure_config,
)
.await
.unwrap();
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 6a20a46934..a74b00ea0f 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -963,8 +963,9 @@ prefill_log_files = false
{storage}
[metadata_store]
-file_size = "256MiB"
-purge_threshold = "4GiB"
+file_size = "64MiB"
+purge_threshold = "256MiB"
+purge_interval = "1m"
[procedure]
max_retry_times = 3