refactor(mito): move wal sync task to background (#5677)

chore/move-wal-sync-to-bg:
 ### Refactor Log Store Task Management

 - **Error Handling Enhancements**: Updated error handling for task management in `error.rs` by renaming `StartGcTask` and `StopGcTask` to `StartWalTask` and `StopWalTask`, respectively, and adding a `name` field for more descriptive error messages.
 - **Task Management Improvements**: Introduced `SyncWalTaskFunction` in `log_store.rs`, a background task that periodically syncs the WAL, replacing the previous atomic-based sync logic in the write path.
 - **Backend Adjustments**: Modified `backend.rs` to use the new `StartWalTaskSnafu` for starting tasks, ensuring consistency with the updated error handling approach.
This commit is contained in:
Lei, HUANG
2025-03-10 16:22:35 +08:00
committed by GitHub
parent 530ff53422
commit a922dcd9df
3 changed files with 55 additions and 24 deletions

View File

@@ -40,15 +40,17 @@ pub enum Error {
actual: String,
},
#[snafu(display("Failed to start log store gc task"))]
StartGcTask {
#[snafu(display("Failed to start log store task: {}", name))]
StartWalTask {
name: String,
#[snafu(implicit)]
location: Location,
source: RuntimeError,
},
#[snafu(display("Failed to stop log store gc task"))]
StopGcTask {
#[snafu(display("Failed to stop log store task: {}", name))]
StopWalTask {
name: String,
#[snafu(implicit)]
location: Location,
source: RuntimeError,

View File

@@ -35,7 +35,7 @@ use common_runtime::RepeatedTask;
use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
use snafu::{IntoError, ResultExt};
use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu};
use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartWalTaskSnafu};
use crate::raft_engine::log_store::PurgeExpiredFilesFunction;
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
@@ -93,7 +93,8 @@ impl RaftEngineBackend {
);
gc_task
.start(common_runtime::global_runtime())
.context(StartGcTaskSnafu)?;
.context(StartWalTaskSnafu { name: "gc_task" })?;
Ok(Self {
engine: RwLock::new(engine),
_gc_task: gc_task,

View File

@@ -14,7 +14,6 @@
use std::collections::{hash_map, HashMap};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -32,7 +31,7 @@ use store_api::storage::RegionId;
use crate::error::{
AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu,
IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu,
RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu,
RaftEngineSnafu, Result, StartWalTaskSnafu, StopWalTaskSnafu,
};
use crate::metrics;
use crate::raft_engine::backend::SYSTEM_NAMESPACE;
@@ -46,7 +45,7 @@ pub struct RaftEngineLogStore {
read_batch_size: usize,
engine: Arc<Engine>,
gc_task: RepeatedTask<Error>,
last_sync_time: AtomicI64,
sync_task: RepeatedTask<Error>,
}
pub struct PurgeExpiredFilesFunction {
@@ -83,6 +82,31 @@ impl TaskFunction<Error> for PurgeExpiredFilesFunction {
}
}
/// Task function that calls `sync()` on the wrapped engine each time it is invoked.
///
/// It is boxed into a `RepeatedTask` by `RaftEngineLogStore::try_new`, so the sync
/// runs in the background instead of on the write path.
pub struct SyncWalTaskFunction {
// Shared handle to the engine whose log files are synced.
engine: Arc<Engine>,
}
#[async_trait::async_trait]
impl TaskFunction<Error> for SyncWalTaskFunction {
async fn call(&mut self) -> std::result::Result<(), Error> {
let engine = self.engine.clone();
if let Err(e) = tokio::task::spawn_blocking(move || engine.sync()).await {
error!(e; "Failed to sync raft engine log files");
};
Ok(())
}
fn name(&self) -> &str {
"SyncWalTaskFunction"
}
}
impl SyncWalTaskFunction {
pub fn new(engine: Arc<Engine>) -> Self {
Self { engine }
}
}
impl RaftEngineLogStore {
pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result<Self> {
let raft_engine_config = Config {
@@ -104,13 +128,18 @@ impl RaftEngineLogStore {
}),
);
let sync_task = RepeatedTask::new(
config.sync_period.unwrap_or(Duration::from_secs(5)),
Box::new(SyncWalTaskFunction::new(engine.clone())),
);
let log_store = Self {
sync_write: config.sync_write,
sync_period: config.sync_period,
read_batch_size: config.read_batch_size,
engine,
gc_task,
last_sync_time: AtomicI64::new(0),
sync_task,
};
log_store.start()?;
Ok(log_store)
@@ -123,7 +152,10 @@ impl RaftEngineLogStore {
fn start(&self) -> Result<()> {
self.gc_task
.start(common_runtime::global_runtime())
.context(StartGcTaskSnafu)
.context(StartWalTaskSnafu { name: "gc_task" })?;
self.sync_task
.start(common_runtime::global_runtime())
.context(StartWalTaskSnafu { name: "sync_task" })
}
fn span(&self, provider: &RaftEngineProvider) -> (Option<u64>, Option<u64>) {
@@ -220,7 +252,14 @@ impl LogStore for RaftEngineLogStore {
type Error = Error;
async fn stop(&self) -> Result<()> {
self.gc_task.stop().await.context(StopGcTaskSnafu)
self.gc_task
.stop()
.await
.context(StopWalTaskSnafu { name: "gc_task" })?;
self.sync_task
.stop()
.await
.context(StopWalTaskSnafu { name: "sync_task" })
}
/// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of
@@ -240,20 +279,9 @@ impl LogStore for RaftEngineLogStore {
}
let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?;
let mut sync = self.sync_write;
if let Some(sync_period) = &self.sync_period {
let now = common_time::util::current_time_millis();
if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 {
self.last_sync_time.store(now, Ordering::Relaxed);
sync = true;
}
}
let _ = self
.engine
.write(&mut batch, sync)
.write(&mut batch, self.sync_write)
.context(RaftEngineSnafu)?;
Ok(AppendBatchResponse { last_entry_ids })