From a922dcd9dffc786224861105efa94593f43ae7f6 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 10 Mar 2025 16:22:35 +0800 Subject: [PATCH] refactor(mito): move wal sync task to background (#5677) chore/move-wal-sync-to-bg: ### Refactor Log Store Task Management - **Error Handling Enhancements**: Updated error handling for task management in `error.rs` by renaming `StartGcTask` and `StopGcTask` to `StartWalTask` and `StopWalTask`, respectively, and added a `name` field for more descriptive error messages. - **Task Management Improvements**: Introduced `SyncWalTaskFunction` in `log_store.rs` to handle periodic synchronization of WAL tasks, replacing the previous atomic-based sync logic. - **Backend Adjustments**: Modified `backend.rs` to use the new `StartWalTaskSnafu` for starting tasks, ensuring consistency with the updated error handling approach. --- src/log-store/src/error.rs | 10 ++-- src/log-store/src/raft_engine/backend.rs | 5 +- src/log-store/src/raft_engine/log_store.rs | 64 ++++++++++++++++------ 3 files changed, 55 insertions(+), 24 deletions(-) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 962606666b..5f9a5fc4e3 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -40,15 +40,17 @@ pub enum Error { actual: String, }, - #[snafu(display("Failed to start log store gc task"))] - StartGcTask { + #[snafu(display("Failed to start log store task: {}", name))] + StartWalTask { + name: String, #[snafu(implicit)] location: Location, source: RuntimeError, }, - #[snafu(display("Failed to stop log store gc task"))] - StopGcTask { + #[snafu(display("Failed to stop log store task: {}", name))] + StopWalTask { + name: String, #[snafu(implicit)] location: Location, source: RuntimeError, diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 456c6e2c88..3d41e5298d 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -35,7 +35,7 @@ use common_runtime::RepeatedTask; use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode}; use snafu::{IntoError, ResultExt}; -use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu}; +use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartWalTaskSnafu}; use crate::raft_engine::log_store::PurgeExpiredFilesFunction; pub(crate) const SYSTEM_NAMESPACE: u64 = 0; @@ -93,7 +93,8 @@ impl RaftEngineBackend { ); gc_task .start(common_runtime::global_runtime()) - .context(StartGcTaskSnafu)?; + .context(StartWalTaskSnafu { name: "gc_task" })?; + Ok(Self { engine: RwLock::new(engine), _gc_task: gc_task, diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 3e2ff5023b..c7df8be66c 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -14,7 +14,6 @@ use std::collections::{hash_map, HashMap}; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -32,7 +31,7 @@ use store_api::storage::RegionId; use crate::error::{ AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu, - RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, + RaftEngineSnafu, Result, StartWalTaskSnafu, StopWalTaskSnafu, }; use crate::metrics; use crate::raft_engine::backend::SYSTEM_NAMESPACE; @@ -46,7 +45,7 @@ pub struct RaftEngineLogStore { read_batch_size: usize, engine: Arc, gc_task: RepeatedTask, - last_sync_time: AtomicI64, + sync_task: RepeatedTask, } pub struct PurgeExpiredFilesFunction { @@ -83,6 +82,31 @@ impl TaskFunction for PurgeExpiredFilesFunction { } } +pub struct SyncWalTaskFunction { + engine: Arc, +} + +#[async_trait::async_trait] +impl TaskFunction for SyncWalTaskFunction { + async fn call(&mut self) -> std::result::Result<(), Error> { + let engine = self.engine.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || engine.sync()).await { + error!(e; "Failed to sync raft engine log files"); + }; + Ok(()) + } + + fn name(&self) -> &str { + "SyncWalTaskFunction" + } +} + +impl SyncWalTaskFunction { + pub fn new(engine: Arc) -> Self { + Self { engine } + } +} + impl RaftEngineLogStore { pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result { let raft_engine_config = Config { @@ -104,13 +128,18 @@ impl RaftEngineLogStore { }), ); + let sync_task = RepeatedTask::new( + config.sync_period.unwrap_or(Duration::from_secs(5)), + Box::new(SyncWalTaskFunction::new(engine.clone())), + ); + let log_store = Self { sync_write: config.sync_write, sync_period: config.sync_period, read_batch_size: config.read_batch_size, engine, gc_task, - last_sync_time: AtomicI64::new(0), + sync_task, }; log_store.start()?; Ok(log_store) @@ -123,7 +152,10 @@ impl RaftEngineLogStore { fn start(&self) -> Result<()> { self.gc_task .start(common_runtime::global_runtime()) - .context(StartGcTaskSnafu) + .context(StartWalTaskSnafu { name: "gc_task" })?; + self.sync_task + .start(common_runtime::global_runtime()) + .context(StartWalTaskSnafu { name: "sync_task" }) } fn span(&self, provider: &RaftEngineProvider) -> (Option, Option) { @@ -220,7 +252,14 @@ impl LogStore for RaftEngineLogStore { type Error = Error; async fn stop(&self) -> Result<()> { - self.gc_task.stop().await.context(StopGcTaskSnafu) + self.gc_task + .stop() + .await + .context(StopWalTaskSnafu { name: "gc_task" })?; + self.sync_task + .stop() + .await + .context(StopWalTaskSnafu { name: "sync_task" }) } /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of @@ -240,20 +279,9 @@ impl LogStore for RaftEngineLogStore { } let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?; - - let mut sync = self.sync_write; - - if let Some(sync_period) = &self.sync_period { - let now = common_time::util::current_time_millis(); - if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 { - self.last_sync_time.store(now, Ordering::Relaxed); - sync = true; - } - } - let _ = self .engine - .write(&mut batch, sync) + .write(&mut batch, self.sync_write) .context(RaftEngineSnafu)?; Ok(AppendBatchResponse { last_entry_ids })