diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 962606666b..5f9a5fc4e3 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -40,15 +40,17 @@ pub enum Error { actual: String, }, - #[snafu(display("Failed to start log store gc task"))] - StartGcTask { + #[snafu(display("Failed to start log store task: {}", name))] + StartWalTask { + name: String, #[snafu(implicit)] location: Location, source: RuntimeError, }, - #[snafu(display("Failed to stop log store gc task"))] - StopGcTask { + #[snafu(display("Failed to stop log store task: {}", name))] + StopWalTask { + name: String, #[snafu(implicit)] location: Location, source: RuntimeError, diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index 456c6e2c88..3d41e5298d 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -35,7 +35,7 @@ use common_runtime::RepeatedTask; use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode}; use snafu::{IntoError, ResultExt}; -use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartGcTaskSnafu}; +use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartWalTaskSnafu}; use crate::raft_engine::log_store::PurgeExpiredFilesFunction; pub(crate) const SYSTEM_NAMESPACE: u64 = 0; @@ -93,7 +93,8 @@ impl RaftEngineBackend { ); gc_task .start(common_runtime::global_runtime()) - .context(StartGcTaskSnafu)?; + .context(StartWalTaskSnafu { name: "gc_task" })?; + Ok(Self { engine: RwLock::new(engine), _gc_task: gc_task, diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 3e2ff5023b..c7df8be66c 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -14,7 +14,6 @@ use std::collections::{hash_map, HashMap}; use std::fmt::{Debug, Formatter}; -use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -32,7 +31,7 @@ use store_api::storage::RegionId; use crate::error::{ AddEntryLogBatchSnafu, DiscontinuousLogIndexSnafu, Error, FetchEntrySnafu, IllegalNamespaceSnafu, IllegalStateSnafu, InvalidProviderSnafu, OverrideCompactedEntrySnafu, - RaftEngineSnafu, Result, StartGcTaskSnafu, StopGcTaskSnafu, + RaftEngineSnafu, Result, StartWalTaskSnafu, StopWalTaskSnafu, }; use crate::metrics; use crate::raft_engine::backend::SYSTEM_NAMESPACE; @@ -46,7 +45,7 @@ pub struct RaftEngineLogStore { read_batch_size: usize, engine: Arc, gc_task: RepeatedTask, - last_sync_time: AtomicI64, + sync_task: RepeatedTask, } pub struct PurgeExpiredFilesFunction { @@ -83,6 +82,31 @@ impl TaskFunction for PurgeExpiredFilesFunction { } } +pub struct SyncWalTaskFunction { + engine: Arc, +} + +#[async_trait::async_trait] +impl TaskFunction for SyncWalTaskFunction { + async fn call(&mut self) -> std::result::Result<(), Error> { + let engine = self.engine.clone(); + if let Err(e) = tokio::task::spawn_blocking(move || engine.sync()).await { + error!(e; "Failed to sync raft engine log files"); + }; + Ok(()) + } + + fn name(&self) -> &str { + "SyncWalTaskFunction" + } +} + +impl SyncWalTaskFunction { + pub fn new(engine: Arc) -> Self { + Self { engine } + } +} + impl RaftEngineLogStore { pub async fn try_new(dir: String, config: &RaftEngineConfig) -> Result { let raft_engine_config = Config { @@ -104,13 +128,18 @@ impl RaftEngineLogStore { }), ); + let sync_task = RepeatedTask::new( + config.sync_period.unwrap_or(Duration::from_secs(5)), + Box::new(SyncWalTaskFunction::new(engine.clone())), + ); + let log_store = Self { sync_write: config.sync_write, sync_period: config.sync_period, read_batch_size: config.read_batch_size, engine, gc_task, - last_sync_time: AtomicI64::new(0), + sync_task, }; log_store.start()?; Ok(log_store) @@ -123,7 +152,10 @@ impl RaftEngineLogStore { fn start(&self) -> Result<()> { self.gc_task .start(common_runtime::global_runtime()) - .context(StartGcTaskSnafu) + .context(StartWalTaskSnafu { name: "gc_task" })?; + self.sync_task + .start(common_runtime::global_runtime()) + .context(StartWalTaskSnafu { name: "sync_task" }) } fn span(&self, provider: &RaftEngineProvider) -> (Option, Option) { @@ -220,7 +252,14 @@ impl LogStore for RaftEngineLogStore { type Error = Error; async fn stop(&self) -> Result<()> { - self.gc_task.stop().await.context(StopGcTaskSnafu) + self.gc_task + .stop() + .await + .context(StopWalTaskSnafu { name: "gc_task" })?; + self.sync_task + .stop() + .await + .context(StopWalTaskSnafu { name: "sync_task" }) } /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of @@ -240,20 +279,9 @@ impl LogStore for RaftEngineLogStore { } let (mut batch, last_entry_ids) = self.entries_to_batch(entries)?; - - let mut sync = self.sync_write; - - if let Some(sync_period) = &self.sync_period { - let now = common_time::util::current_time_millis(); - if now - self.last_sync_time.load(Ordering::Relaxed) >= sync_period.as_millis() as i64 { - self.last_sync_time.store(now, Ordering::Relaxed); - sync = true; - } - } - let _ = self .engine - .write(&mut batch, sync) + .write(&mut batch, self.sync_write) .context(RaftEngineSnafu)?; Ok(AppendBatchResponse { last_entry_ids })