fix: introduce gc task for metadata store (#5461)

* fix: introduce gc task for metadata kvbackend

* refactor: refine KvbackendConfig

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-02-06 21:12:43 +09:00
committed by GitHub
parent 116bdaf690
commit c80d2a3222
14 changed files with 82 additions and 59 deletions

4
Cargo.lock generated
View File

@@ -1967,10 +1967,12 @@ dependencies = [
"common-wal",
"config",
"datanode",
"humantime-serde",
"meta-client",
"num_cpus",
"serde",
"serde_json",
"serde_with",
"snafu 0.8.5",
"sysinfo",
"temp-env",
@@ -4340,7 +4342,6 @@ dependencies = [
"promql-parser",
"prost 0.13.3",
"query",
"raft-engine",
"serde",
"serde_json",
"servers",
@@ -6230,6 +6231,7 @@ dependencies = [
"bytes",
"chrono",
"common-base",
"common-config",
"common-error",
"common-macro",
"common-meta",

View File

@@ -65,8 +65,8 @@
| `wal.provider` | String | `raft_engine` | The provider of the WAL.<br/>- `raft_engine`: the wal is stored in the local file system by raft-engine.<br/>- `kafka`: it's remote wal that data is stored in Kafka. |
| `wal.dir` | String | Unset | The directory to store the WAL files.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.file_size` | String | `128MB` | The size of the WAL segment file.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a flush.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_threshold` | String | `1GB` | The threshold of the WAL size to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.purge_interval` | String | `1m` | The interval to trigger a purge.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.read_batch_size` | Integer | `128` | The read batch size.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.sync_write` | Bool | `false` | Whether to use sync write.<br/>**It's only used when the provider is `raft_engine`**. |
| `wal.enable_log_recycle` | Bool | `true` | Whether to reuse logically truncated log files.<br/>**It's only used when the provider is `raft_engine`**. |
@@ -88,8 +88,9 @@
| `wal.backoff_deadline` | String | `5mins` | The deadline of retries.<br/>**It's only used when the provider is `kafka`**. |
| `wal.overwrite_entry_start_id` | Bool | `false` | Ignore missing entries during read WAL.<br/>**It's only used when the provider is `kafka`**.<br/><br/>This option ensures that when Kafka messages are deleted, the system<br/>can still successfully replay memtable data without throwing an<br/>out-of-range error.<br/>However, enabling this option might lead to unexpected data loss,<br/>as the system will skip over missing entries instead of treating<br/>them as critical errors. |
| `metadata_store` | -- | -- | Metadata storage options. |
| `metadata_store.file_size` | String | `256MB` | Kv file size in bytes. |
| `metadata_store.purge_threshold` | String | `4GB` | Kv purge threshold. |
| `metadata_store.file_size` | String | `64MB` | The size of the metadata store log file. |
| `metadata_store.purge_threshold` | String | `256MB` | The threshold of the metadata store size to trigger a purge. |
| `metadata_store.purge_interval` | String | `1m` | The interval of the metadata store to trigger a purge. |
| `procedure` | -- | -- | Procedure storage options. |
| `procedure.max_retry_times` | Integer | `3` | Procedure max retry time. |
| `procedure.retry_delay` | String | `500ms` | Initial retry delay of procedures, increases exponentially |

View File

@@ -159,11 +159,11 @@ dir = "/tmp/greptimedb/wal"
## **It's only used when the provider is `raft_engine`**.
file_size = "128MB"
## The threshold of the WAL size to trigger a flush.
## The threshold of the WAL size to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_threshold = "1GB"
## The interval to trigger a flush.
## The interval to trigger a purge.
## **It's only used when the provider is `raft_engine`**.
purge_interval = "1m"
@@ -278,10 +278,12 @@ overwrite_entry_start_id = false
## Metadata storage options.
[metadata_store]
## Kv file size in bytes.
file_size = "256MB"
## Kv purge threshold.
purge_threshold = "4GB"
## The size of the metadata store log file.
file_size = "64MB"
## The threshold of the metadata store size to trigger a purge.
purge_threshold = "256MB"
## The interval of the metadata store to trigger a purge.
purge_interval = "1m"
## Procedure storage options.
[procedure]

View File

@@ -486,8 +486,8 @@ impl StartCommand {
let metadata_dir = metadata_store_dir(data_home);
let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
metadata_dir,
opts.metadata_store.clone(),
opts.procedure.clone(),
opts.metadata_store,
opts.procedure,
)
.await
.context(StartFrontendSnafu)?;

View File

@@ -12,9 +12,11 @@ common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true
config.workspace = true
humantime-serde.workspace = true
num_cpus.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
snafu.workspace = true
sysinfo.workspace = true
toml.workspace = true

View File

@@ -16,6 +16,8 @@ pub mod config;
pub mod error;
pub mod utils;
use std::time::Duration;
use common_base::readable_size::ReadableSize;
pub use config::*;
use serde::{Deserialize, Serialize};
@@ -34,22 +36,27 @@ pub enum Mode {
Distributed,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KvBackendConfig {
// Kv file size in bytes
/// The size of the metadata store backend log file.
pub file_size: ReadableSize,
// Kv purge threshold in bytes
/// The threshold of the metadata store size to trigger a purge.
pub purge_threshold: ReadableSize,
/// The interval of the metadata store to trigger a purge.
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
}
impl Default for KvBackendConfig {
fn default() -> Self {
Self {
// log file size 256MB
file_size: ReadableSize::mb(256),
// purge threshold 4GB
purge_threshold: ReadableSize::gb(4),
// The log file size 64MB
file_size: ReadableSize::mb(64),
// The log purge threshold 256MB
purge_threshold: ReadableSize::mb(256),
// The log purge interval 1m
purge_interval: Duration::from_secs(60),
}
}
}

View File

@@ -19,7 +19,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct ProcedureConfig {
/// Max retry times of procedure.

View File

@@ -51,7 +51,6 @@ prometheus.workspace = true
promql-parser.workspace = true
prost.workspace = true
query.workspace = true
raft-engine.workspace = true
serde.workspace = true
servers.workspace = true
session.workspace = true

View File

@@ -40,7 +40,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_query::Output;
use common_telemetry::{debug, error, tracing};
use common_telemetry::{debug, error, info, tracing};
use datafusion_expr::LogicalPlan;
use log_store::raft_engine::RaftEngineBackend;
use operator::delete::DeleterRef;
@@ -55,7 +55,6 @@ use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::stats::StatementStatistics;
use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::export_metrics::ExportMetricsTask;
@@ -134,19 +133,15 @@ impl Instance {
kv_backend_config: KvBackendConfig,
procedure_config: ProcedureConfig,
) -> Result<(KvBackendRef, ProcedureManagerRef)> {
let kv_backend = Arc::new(
RaftEngineBackend::try_open_with_cfg(Config {
dir,
purge_threshold: ReadableSize(kv_backend_config.purge_threshold.0),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(kv_backend_config.file_size.0),
..Default::default()
})
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?,
info!(
"Creating metadata kvbackend with config: {:?}",
kv_backend_config
);
let kv_backend = RaftEngineBackend::try_open_with_cfg(dir, &kv_backend_config)
.map_err(BoxedError::new)
.context(error::OpenRaftEngineBackendSnafu)?;
let kv_backend = Arc::new(kv_backend);
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {

View File

@@ -18,6 +18,7 @@ async-trait.workspace = true
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true

View File

@@ -17,8 +17,9 @@
use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::Path;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_meta::error as meta_error;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
@@ -30,16 +31,19 @@ use common_meta::rpc::store::{
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
use raft_engine::{Config, Engine, LogBatch};
use common_runtime::RepeatedTask;
use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
use snafu::{IntoError, ResultExt};
use crate::error::{self, IoSnafu, RaftEngineSnafu};
use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu};
use crate::raft_engine::log_store::PurgeExpiredFilesFunction;
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
/// RaftEngine based [KvBackend] implementation.
pub struct RaftEngineBackend {
engine: RwLock<Engine>,
engine: RwLock<Arc<Engine>>,
_gc_task: RepeatedTask<Error>,
}
fn ensure_dir(dir: &str) -> error::Result<()> {
@@ -65,15 +69,34 @@ fn ensure_dir(dir: &str) -> error::Result<()> {
}
impl RaftEngineBackend {
pub fn try_open_with_cfg(config: Config) -> error::Result<Self> {
ensure_dir(&config.dir)?;
if let Some(spill_dir) = &config.spill_dir {
pub fn try_open_with_cfg(dir: String, config: &KvBackendConfig) -> error::Result<Self> {
let cfg = Config {
dir: dir.to_string(),
purge_threshold: ReadableSize(config.purge_threshold.0),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(config.file_size.0),
..Default::default()
};
ensure_dir(&dir)?;
if let Some(spill_dir) = &cfg.spill_dir {
ensure_dir(spill_dir)?;
}
let engine = Engine::open(config).context(RaftEngineSnafu)?;
let engine = Arc::new(Engine::open(cfg).context(RaftEngineSnafu)?);
let gc_task = RepeatedTask::new(
config.purge_interval,
Box::new(PurgeExpiredFilesFunction {
engine: engine.clone(),
}),
);
gc_task
.start(common_runtime::global_runtime())
.context(StartGcTaskSnafu)?;
Ok(Self {
engine: RwLock::new(engine),
_gc_task: gc_task,
})
}
}
@@ -398,21 +421,11 @@ mod tests {
};
use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
use common_test_util::temp_dir::create_temp_dir;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use super::*;
fn build_kv_backend(dir: String) -> RaftEngineBackend {
let config = Config {
dir,
spill_dir: None,
recovery_mode: RecoveryMode::AbsoluteConsistency,
target_file_size: ReadableSize::mb(4),
purge_threshold: ReadableSize::mb(16),
..Default::default()
};
let engine = RwLock::new(Engine::open(config).unwrap());
RaftEngineBackend { engine }
RaftEngineBackend::try_open_with_cfg(dir, &KvBackendConfig::default()).unwrap()
}
#[tokio::test]

View File

@@ -50,7 +50,7 @@ pub struct RaftEngineLogStore {
}
pub struct PurgeExpiredFilesFunction {
engine: Arc<Engine>,
pub engine: Arc<Engine>,
}
#[async_trait::async_trait]

View File

@@ -282,8 +282,8 @@ impl GreptimeDbStandaloneBuilder {
let procedure_config = ProcedureConfig::default();
let (kv_backend, procedure_manager) = Instance::try_build_standalone_components(
format!("{}/kv", &opts.storage.data_home),
kv_backend_config.clone(),
procedure_config.clone(),
kv_backend_config,
procedure_config,
)
.await
.unwrap();

View File

@@ -963,8 +963,9 @@ prefill_log_files = false
{storage}
[metadata_store]
file_size = "256MiB"
purge_threshold = "4GiB"
file_size = "64MiB"
purge_threshold = "256MiB"
purge_interval = "1m"
[procedure]
max_retry_times = 3