diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index e50eebe73a..0f5beaa16b 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -53,6 +53,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to perform IO on path: {}", path))] + Io { + path: String, + #[snafu(source)] + error: std::io::Error, + location: Location, + }, + #[snafu(display("Log store not started yet"))] IllegalState { location: Location }, diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index 7c57d6d2e7..a57e76850e 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. #![feature(let_chains)] +#![feature(io_error_more)] pub mod error; pub mod kafka; diff --git a/src/log-store/src/raft_engine/backend.rs b/src/log-store/src/raft_engine/backend.rs index b4320b1852..ab2f9896f9 100644 --- a/src/log-store/src/raft_engine/backend.rs +++ b/src/log-store/src/raft_engine/backend.rs @@ -16,6 +16,7 @@ use std::any::Any; use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::path::Path; use std::sync::RwLock; use common_error::ext::BoxedError; @@ -30,9 +31,9 @@ use common_meta::rpc::store::{ use common_meta::rpc::KeyValue; use common_meta::util::get_next_prefix_key; use raft_engine::{Config, Engine, LogBatch}; -use snafu::ResultExt; +use snafu::{IntoError, ResultExt}; -use crate::error::{self, RaftEngineSnafu}; +use crate::error::{self, IoSnafu, RaftEngineSnafu}; pub(crate) const SYSTEM_NAMESPACE: u64 = 0; @@ -41,8 +42,35 @@ pub struct RaftEngineBackend { engine: RwLock, } +fn ensure_dir(dir: &str) -> error::Result<()> { + let io_context = |err| { + IoSnafu { + path: dir.to_string(), + } + .into_error(err) + }; + + let path = Path::new(dir); + if !path.exists() { + // create the directory to ensure the permission + return std::fs::create_dir_all(path).map_err(io_context); + } + + let metadata = std::fs::metadata(path).map_err(io_context)?; + if !metadata.is_dir() { + return Err(io_context(std::io::ErrorKind::NotADirectory.into())); + } + + Ok(()) +} + impl RaftEngineBackend { pub fn try_open_with_cfg(config: Config) -> error::Result { + ensure_dir(&config.dir)?; + if let Some(spill_dir) = &config.spill_dir { + ensure_dir(spill_dir)?; + } + let engine = Engine::open(config).context(RaftEngineSnafu)?; Ok(Self { engine: RwLock::new(engine),